了解 Kafka 主題和分區
一、簡介
在本教程中,我們將探討 Kafka 主題和分區以及它們之間的相互關係。
2.什麼是Kafka主題
主題是一系列事件的存儲。主題是持久的日誌文件,它們按照事件發生的時間順序保存事件。因此,每個新事件總是添加到日誌的末尾。此外,事件是不可變的。因此,在將它們添加到主題後我們無法更改它們。
Kafka 主題的一個示例用例是記錄房間的一系列溫度測量值。一旦記錄了溫度值,例如下午 5:02 的 25°C,就無法更改,因為它已經發生了。此外,下午 5:06 的溫度值不能早於下午 5:02 記錄的溫度值。因此,通過將每個溫度測量視為一個事件,Kafka 主題將是存儲該數據的合適選擇。
3.什麼是Kafka分區
Kafka 使用主題分區來提高可擴展性。在對主題進行分區時,Kafka 將其分解為多個部分,並將每個部分存儲在其分佈式系統的不同節點中。分數的數量由我們或集群默認配置確定。
Kafka保證同一主題的分區之間的事件的順序。因此,從分區主題消費應該與從沒有分區的主題消費相同。
例如,為了提高性能,我們可以將主題分為兩個不同的分區,並在消費者端讀取它們。在這種情況下,消費者會按照到達主題的相同順序讀取溫度,甚至跨分區也是如此。
4. 消費者群體
消費者組是從主題讀取數據的一組消費者。 Kafka 將所有主題分區分配給一個組中的消費者。該劃分可能是不平衡的,這意味著可以將多個分區分配給一個使用者。但是,任何給定的分區始終由組中的單個使用者讀取。
例如,讓我們想像一個具有三個分區的主題,一個具有兩個消費者的消費者組應該讀取該主題。因此,一種可能的劃分是第一個消費者獲得分區一和分區二,而另一個消費者僅獲得分區三。
Kafka 在後台使用 Zookeeper 在消費者之間劃分分區。 Zookeper 的重要之處在於它保證了公平的分配。因此,分區在同一組中的消費者之間平均分配。
5. 配置應用程序
在本節中,我們將創建類來配置主題、消費者和生產者服務。
5.1.主題配置
首先,讓我們為我們的主題創建配置類:
@Configuration
public class KafkaTopicConfig {
@Value(value = "${spring.kafka.bootstrap-servers}")
private String bootstrapAddress;
public KafkaAdmin kafkaAdmin() {
Map<String, Object> configs = new HashMap<>();
configs.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapAddress);
return new KafkaAdmin(configs);
}
public NewTopic celciusTopic() {
return TopicBuilder.name("celcius-scale-topic")
.partitions(1)
.build();
}
}
KafkaTopicConfig
類註入兩個 Spring bean。 KafkaAdmin
bean 使用應運行的網絡地址啟動 Kafka 集群,而NewTopic
bean 創建一個名為celcius-scale-topic
的主題,其中包含一個分區。
5.2.消費者和生產者配置
現在,我們需要必要的類來注入我們主題的生產者和消費者配置。
首先,讓我們創建生產者配置類:
public class KafkaProducerConfig {
@Value(value = "${spring.kafka.bootstrap-servers}")
private String bootstrapAddress;
@Bean
public ProducerFactory<String, Double> producerFactory() {
Map<String, Object> configProps = new HashMap<>();
configProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapAddress);
configProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
configProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, DoubleSerializer.class);
return new DefaultKafkaProducerFactory<>(configProps);
}
@Bean
public KafkaTemplate<String, Double> kafkaTemplate() {
return new KafkaTemplate<>(producerFactory());
}
}
KafkaProducerConfig
注入兩個 Spring bean。 ProducerFactory
告訴 Kafka 應該如何序列化消息以及生產者應該監聽哪個服務器。 KafkaTemplate
將在消費者服務類中用於創建消息。
5.3.卡夫卡生產者服務
最後,在初始配置之後,我們可以創建驅動程序應用程序。我們首先創建生產者應用程序:
public class ThermostatService {
private final KafkaTemplate<String, Double> kafkaTemplate;
public ThermostatService(KafkaTemplate<String, Double> kafkaTemplate) {
this.kafkaTemplate = kafkaTemplate;
}
public void measureCelsiusAndPublish(int numMeasurements) {
new Random().doubles(25, 35)
.limit(numMeasurements)
.forEach(tmp -> {
kafkaTemplate.send("celcius-scale-topic", tmp);
});
}
}
ThermostatService
包含一個名為measureCelsiusAndPublish
的方法。此方法生成 [25, 35] 範圍內的隨機溫度測量值,並發佈到celsius-scale-topic
Kafka 主題。為了實現這一點,我們使用Random
類的doubles()
方法來創建隨機數流。然後,我們使用kafkaTemplate
的send
()
方法發布消息。
6. 消息的生產和消費
在本節中,我們將了解如何配置 Kafka 使用者以使用嵌入式 Kafka 代理從主題讀取消息。
6.1.創建消費者服務
為了消費消息,我們需要一個或多個消費者類。讓我們創建一個celcius-scale-topic
的消費者:
@Service
public class TemperatureConsumer {
private CountDownLatch latch = new CountDownLatch(1);
Map<String, Set<String>> consumedRecords = new ConcurrentHashMap<>();
@KafkaListener(topics = "celcius-scale-topic", groupId = "group-1")
public void consumer1(ConsumerRecord<?, ?> consumerRecord) {
computeConsumedRecord("consumer-1", consumerRecord.partition());
}
private void computeConsumedRecord(String key, int consumerRecord) {
consumedRecords.computeIfAbsent(key, k -> new HashSet<>());
consumedRecords.computeIfPresent(key, (k, v) -> {
v.add(String.valueOf(consumerRecord));
return v;
});
}
public CountDownLatch getLatch() {
return latch;
}
}
latch
變量是一個線程安全計數器,我們稍後將在集成測試中使用它來確保我們正確接收消息。
我們的consumer1()
方法使用@KafkaListener
註釋來啟動消費者。 topics
參數是要消費的主題列表,而groupId
參數是消費者所屬的消費者組的標識。
為了稍後可視化結果,我們使用ConcurrentHashMap
來存儲消耗的事件。 key
對應於消費者的名稱,而value
包含其消費的分區。
6.2.創建測試類
現在,讓我們創建集成測試類:
@SpringBootTest(classes = ThermostatApplicationKafkaApp.class)
@EmbeddedKafka(partitions = 2, brokerProperties = {"listeners=PLAINTEXT://localhost:9092", "port=9092"})
public class KafkaTopicsAndPartitionsIntegrationTest {
@ClassRule
public static EmbeddedKafkaBroker embeddedKafka = new EmbeddedKafkaBroker(1, true, "multitype");
@Autowired
private ThermostatService service;
@Autowired
private TemperatureConsumer consumer;
@Test
public void givenTopic_andConsumerGroup_whenConsumersListenToEvents_thenConsumeItCorrectly() throws Exception {
service.measureCelsiusAndPublish(10000);
consumer.getLatch().await(1, TimeUnit.SECONDS);
System.out.println(consumer.consumedRecords);
}
}
我們使用嵌入式 Kafka 代理來運行 Kafka 測試。 @EmbeddedKafka
註釋使用參數brokerProperties
來配置代理將在其上運行的URL 和端口。然後,我們使用EmbeddedKafkaBroker
字段中的 JUnit 規則啟動嵌入式代理。
最後,在測試方法中,我們簡單地調用恆溫器服務來生成 10,000 條消息。當我們運行該測試時,它會輸出類似以下內容的內容:
{consumer-1=[0, 1]}
這意味著同一個消費者處理了分區 0 和 1 中的所有消息,因為我們只有一個消費者和一個消費者組。如果不同消費群體中有更多的消費者,這個結果可能會有所不同。
七、結論
在本文中,我們了解了 Kafka 主題和分區的定義以及它們之間的相互關係。
我們還演示了消費者使用嵌入式 Kafka 代理從主題的兩個分區讀取消息的場景。
與往常一樣,示例代碼可以在 GitHub 上獲取。