如何為 Kafka 消費者訂閱多個主題
1. 概述
在本教程中,我們將學習如何讓 Kafka 消費者訂閱多個主題。當相同的業務邏輯用於不同的主題時,這是一個常見的需求。
2.創建模型類
我們將考慮一個具有兩個 Kafka 主題的簡單支付系統,一個用於卡片支付,另一個用於銀行轉帳。讓我們建立模型類別:
public class PaymentData {
private String paymentReference;
private String type;
private BigDecimal amount;
private Currency currency;
// standard getters and setters
}
3.使用Kafka Consumer API訂閱多個主題
我們將討論的第一種方法是使用 Kafka Consumer API。讓我們新增所需的Maven 依賴項:
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-clients</artifactId>
<version>3.5.1</version>
</dependency>
我們也配置 Kafka 消費者:
Properties properties = new Properties();
properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, KAFKA_CONTAINER.getBootstrapServers());
properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
properties.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
properties.put(ConsumerConfig.GROUP_ID_CONFIG, "payments");
kafkaConsumer = new KafkaConsumer<>(properties);
在消費訊息之前,我們需要使用subscribe()
方法訂閱kafkaConsumer
兩個主題:
kafkaConsumer.subscribe(Arrays.asList("card-payments", "bank-transfers"));
我們現在準備好測試我們的配置。讓我們針對每個主題發布一條訊息:
void publishMessages() throws Exception {
ProducerRecord<String, String> cardPayment = new ProducerRecord<>("card-payments",
"{\"paymentReference\":\"A184028KM0013790\", \"type\":\"card\", \"amount\":\"275\", \"currency\":\"GBP\"}");
kafkaProducer.send(cardPayment).get();
ProducerRecord<String, String> bankTransfer = new ProducerRecord<>("bank-transfers",
"{\"paymentReference\":\"19ae2-18mk73-009\", \"type\":\"bank\", \"amount\":\"150\", \"currency\":\"EUR\"}");
kafkaProducer.send(bankTransfer).get();
}
最後,我們可以編寫整合測試:
@Test
void whenSendingMessagesOnTwoTopics_thenConsumerReceivesMessages() throws Exception {
publishMessages();
kafkaConsumer.subscribe(Arrays.asList("card-payments", "bank-transfers"));
int eventsProcessed = 0;
for (ConsumerRecord<String, String> record : kafkaConsumer.poll(Duration.ofSeconds(10))) {
log.info("Event on topic={}, payload={}", record.topic(), record.value());
eventsProcessed++;
}
assertThat(eventsProcessed).isEqualTo(2);
}
4.使用Spring Kafka訂閱多個主題
我們將討論的第二種方法使用 Spring Kafka。
讓我們將spring-kafka
和jackson-databind
依賴項加入pom.xml
:
<dependency>
<groupId>org.springframework.kafka</groupId>
<artifactId>spring-kafka</artifactId>
<version>3.0.11</version>
</dependency>
<dependency>
<groupId>com.fasterxml.jackson.core</groupId>
<artifactId>jackson-databind</artifactId>
<version>2.15.2</version>
</dependency>
我們也定義ConsumerFactory
和ConcurrentKafkaListenerContainerFactory
bean:
@Bean
public ConsumerFactory<String, PaymentData> consumerFactory() {
List<String, String> config = new HashMap<>();
config.put(BOOTSTRAP_SERVERS_CONFIG, bootstrapAddress);
config.put(VALUE_DESERIALIZER_CLASS_CONFIG, JsonDeserializer.class);
return new DefaultKafkaConsumerFactory<>(
config, new StringDeserializer(), new JsonDeserializer<>(PaymentData.class));
}
@Bean
public ConcurrentKafkaListenerContainerFactory<String, PaymentData> containerFactory() {
ConcurrentKafkaListenerContainerFactory<String, PaymentData> factory =
new ConcurrentKafkaListenerContainerFactory<>();
factory.setConsumerFactory(consumerFactory());
return factory;
}
我們需要使用@KafkaListener
註解的topics
性質來訂閱這兩個主題:
@KafkaListener(topics = { "card-payments", "bank-transfers" }, groupId = "payments")
最後,我們可以創建消費者。此外,我們還包含 Kafka 標頭來標識接收訊息的主題:
@KafkaListener(topics = { "card-payments", "bank-transfers" }, groupId = "payments")
public void handlePaymentEvents(
PaymentData paymentData, @Header(KafkaHeaders.RECEIVED_TOPIC) String topic) {
log.info("Event on topic={}, payload={}", topic, paymentData);
}
讓我們驗證一下我們的配置:
@Test
public void whenSendingMessagesOnTwoTopics_thenConsumerReceivesMessages() throws Exception {
CountDownLatch countDownLatch = new CountDownLatch(2);
doAnswer(invocation -> {
countDownLatch.countDown();
return null;
}).when(paymentsConsumer)
.handlePaymentEvents(any(), any());
kafkaTemplate.send("card-payments", createCardPayment());
kafkaTemplate.send("bank-transfers", createBankTransfer());
assertThat(countDownLatch.await(5, TimeUnit.SECONDS)).isTrue();
}
5.使用Kafka CLI訂閱多個主題
Kafka CLI 是我們要討論的最後一種方法。
首先,讓我們針對每個主題發送一條訊息:
$ bin/kafka-console-producer.sh --bootstrap-server localhost:9092 --topic card-payments
>{"paymentReference":"A184028KM0013790", "type":"card", "amount":"275", "currency":"GBP"}
$ bin/kafka-console-producer.sh --bootstrap-server localhost:9092 --topic bank-transfers
>{"paymentReference":"19ae2-18mk73-009", "type":"bank", "amount":"150", "currency":"EUR"}
現在,我們可以啟動 Kafka CLI 消費者。 include
選項可讓我們指定要包含的訊息消費的主題清單:
$ bin/kafka-console-consumer.sh --from-beginning --bootstrap-server localhost:9092 --include "card-payments|bank-transfers"
這是我們運行上一個命令時的輸出:
{"paymentReference":"A184028KM0013790", "type":"card", "amount":"275", "currency":"GBP"}
{"paymentReference":"19ae2-18mk73-009", "type":"bank", "amount":"150", "currency":"EUR"}
六,結論
在本文中,我們學習了 Kafka 消費者訂閱多個主題的三種不同方法。當為多個主題實現相同的功能時,這非常有用。
前兩種方法基於 Kafka Consumer API 和 Spring Kafka,可以整合到現有應用程式中。最後一種使用Kafka CLI,可以用來快速驗證多個主題。
像往常一樣,完整的程式碼可以在 GitHub 上找到。