使用 Consumer API 建立 Kafka 監聽器
1. 概述
在本教程中,我們將學習如何建立 Kafka 偵聽器並使用 Kafka 的Consumer
API 來消費來自主題的訊息。之後,我們將使用Producer
API 和 Testcontainers 測試我們的實作。
我們將專注於在不依賴 Spring Boot 模組的情況下設定KafkaConsumer
。
2. 建立自訂 Kafka 監聽器
我們的自訂偵聽器將在內部使用[kafka-clients](https://mvnrepository.com/artifact/org.apache.kafka/kafka-clients)
庫中的生產者和消費者 API。讓我們先將此依賴項新增到我們的pom.xml
檔案中:
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-clients</artifactId>
<version>3.6.1</version>
</dependency>
對於本文中的程式碼範例,我們將建立一個CustomKafkaListener
類,該類別將偵聽名為「 baeldung.articles.published
」的主題。在內部,我們的類別將包裝KafkaConsumer
並利用它來訂閱主題:
class CustomKafkaListener {
private final String topic;
private final KafkaConsumer<String, String> consumer;
// ...
}
2.1.創建一個KafkaConsumer
要建立KafkaConsumer,
我們需要透過Properties
物件提供有效的配置。讓我們建立一個簡單的消費者,在建立CustomKafkaListener
實例時可以將其用作預設消費者:
public CustomKafkaListener(String topic, String bootstrapServers) {
this(topic, defaultKafkaConsumer(bootstrapServers));
}
static KafkaConsumer<String, String> defaultKafkaConsumer(String boostrapServers) {
Properties props = new Properties();
props.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, boostrapServers);
props.setProperty(ConsumerConfig.GROUP_ID_CONFIG, "test_group_id");
props.setProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
props.setProperty(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
props.setProperty(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
return new KafkaConsumer<>(props);
}
對於此範例,我們對大部分屬性進行了硬編碼,但理想情況下,它們應該從設定檔載入。讓我們快速看看每個屬性的含義:
- Boostrap 伺服器:用於建立與 Kafka 叢集的初始連接的主機和連接埠對列表
- Group ID:允許一群消費者共同消費一組主題分區的ID
- 自動偏移重置:當沒有初始偏移時,Kafka日誌中開始讀取資料的位置
- 鍵/值反序列化器:鍵和值的反序列化器類別。對於我們的範例,我們將使用
String
鍵和值以及以下反序列化器:org.apache.kafka.common.serialization.StringDeserializer
透過這個最小的配置,我們將能夠訂閱該主題並輕鬆測試實現。有關可用屬性的完整列表,請參閱官方文件。
2.2.訂閱主題
現在,我們需要提供一種訂閱主題並開始輪詢訊息的方法。這可以使用KafkaConsumer
的subscribe()
方法來實現,然後無限迴圈呼叫poll()
方法。此外,由於此方法會阻塞線程,因此我們可以實作Runnable
介面以提供與CompletableFuture:
class CustomKafkaListener implements Runnable {
private final String topic;
private final KafkaConsumer<String, String> consumer;
// constructors
@Override
void run() {
consumer.subscribe(Arrays.asList(topic));
while (true) {
consumer.poll(Duration.ofMillis(100))
.forEach(record -> log.info("received: " + record));
}
}
}
現在,我們的CustomKafkaListener
可以像這樣啟動,而不會阻塞主執行緒:
String topic = "baeldung.articles.published";
String bootstrapServers = "localhost:9092";
var listener = new CustomKafkaListener(topic, bootstrapServers)
CompletableFuture.runAsync(listener);
2.3.消費活動
目前,我們的應用程式僅偵聽主題並記錄所有傳入訊息。讓我們進一步改進它以允許更複雜的場景並使其更易於測試。例如,我們可以允許定義一個Consumer<String>
來接受主題中的每個新事件:
class CustomKafkaListener implements Runnable {
private final String topic;
private final KafkaConsumer<String, String> consumer;
private Consumer<String> recordConsumer;
CustomKafkaListener(String topic, KafkaConsumer<String, String> consumer) {
this.topic = topic;
this.consumer = consumer;
this.recordConsumer = record -> log.info("received: " + record);
}
// ...
@Override
public void run() {
consumer.subscribe(Arrays.asList(topic));
while (true) {
consumer.poll(Duration.ofMillis(100))
.forEach(record -> recordConsumer.accept(record.value()));
}
}
CustomKafkaListener onEach(Consumer newConsumer) {
recordConsumer = recordConsumer.andThen(newConsumer);
return this;
}
}
將recordConsumer
宣告為Consumer<String>
允許我們使用預設方法andThen()
連結多個函數。對於每條傳入訊息,這些函數都會被一一呼叫。
3. 測試
為了測試我們的實現,我們將創建一個KafkaProducer
並使用它來向“ baeldung.articles.published
”主題發布一些消息。然後,我們將啟動CustomKafkaListener
並驗證它是否準確地處理所有活動。
3.1.設定 Kafka 測試容器
我們可以利用 Testcontainers 函式庫在我們的測試環境中啟動 Kafka 容器。首先,我們需要為JUnit5 extension
和Kafka module
添加 Testcontainer 依賴項:
<dependency>
<groupId>org.testcontainers</groupId>
<artifactId>kafka</artifactId>
<version>1.19.3</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.testcontainers</groupId>
<artifactId>junit-jupiter</artifactId>
<version>1.19.3</version>
<scope>test</scope>
</dependency>
我們現在可以使用特定的 Docker 映像名稱建立一個KafkaContainer
。然後,我們將添加@Container
和@Testcontainers
註釋,以允許 Testcontainers JUnit5 擴展管理容器的生命週期:
@Testcontainers
class CustomKafkaListenerLiveTest {
@Container
private static final KafkaContainer KAFKA_CONTAINER = new KafkaContainer(DockerImageName.parse("confluentinc/cp-kafka:latest"));
// ...
}
3.2.建立並啟動監聽器
首先,我們將主題名稱定義為硬編碼String
,並從KAFKA_CONTAINER
中提取bootstrapServers
。此外,我們將建立一個ArrayList<String>
用於收集訊息:
String topic = "baeldung.articles.published";
String bootstrapServers = KAFKA_CONTAINER.getBootstrapServers();
List<String> consumedMessages = new ArrayList<>();
我們將使用這些屬性來建立CustomKafkaListener
的實例,並指示它捕獲新訊息並將它們add
至consumedMessages
清單:
CustomKafkaListener listener = new CustomKafkaListener(topic, bootstrapServers).onEach(consumedMessages::add);
listener.run();
然而,值得注意的是,按原樣運行它可能會阻塞線程並凍結測試。為了防止這種情況,我們將使用CompletableFuture
非同步執行它:
CompletableFuture.runAsync(listener);
雖然對於測試來說並不重要,但我們也可以先在 try-with-resources 區塊中實例化偵聽器:
var listener = new CustomKafkaListener(topic, bootstrapServers).onEach(consumedMessages::add);
CompletableFuture.runAsync(listener);
3.3.發布訊息
為了將文章名稱傳送到「 baeldung.articles.published
」主題,我們將使用Properties
物件設定一個KafkaProducer
,遵循與我們為消費者所做的類似方法。
static KafkaProducer<String, String> testKafkaProducer() {
Properties props = new Properties();
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, KAFKA_CONTAINER.getBootstrapServers());
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
return new KafkaProducer<>(props);
}
此方法將允許我們發布訊息來測試我們的實作。讓我們建立另一個測試助手,它將為作為參數接收的每篇文章發送一條訊息:
private void publishArticles(String topic, String... articles) {
try (KafkaProducer<String, String> producer = testKafkaProducer()) {
Arrays.stream(articles)
.map(article -> new ProducerRecord<String,String>(topic, article))
.forEach(producer::send);
}
}
3.4.核實
讓我們將所有部分放在一起並運行我們的測試。我們已經討論瞭如何建立CustomKafkaListener
並開始發布資料:
@Test
void givenANewCustomKafkaListener_thenConsumesAllMessages() {
// given
String topic = "baeldung.articles.published";
String bootstrapServers = KAFKA_CONTAINER.getBootstrapServers();
List<String> consumedMessages = new ArrayList<>();
// when
CustomKafkaListener listener = new CustomKafkaListener(topic, bootstrapServers).onEach(consumedMessages::add);
CompletableFuture.runAsync(listener);
// and
publishArticles(topic,
"Introduction to Kafka",
"Kotlin for Java Developers",
"Reactive Spring Boot",
"Deploying Spring Boot Applications",
"Spring Security"
);
// then
// ...
}
我們的最終任務包括等待非同步程式碼完成並確認consumedMessages
清單包含預期內容。為了實現這一點,我們將使用 Awaitility 函式庫,利用它的await().untilAsserted()
:
// then
await().untilAsserted(() ->
assertThat(consumedMessages).containsExactlyInAnyOrder(
"Introduction to Kafka",
"Kotlin for Java Developers",
"Reactive Spring Boot",
"Deploying Spring Boot Applications",
"Spring Security"
));
4。結論
在本教程中,我們學習如何在不依賴更高層級的 Spring 模組的情況下使用 Kafka 的Consumer
和Producer
API。首先,我們使用封裝了 KafkaConsumer 的CustomKafkaListener
創建了一個消費者KafkaConsumer.
為了進行測試,我們實作了KafkaProducer
並使用 Testcontainers 和 Awaitility 驗證了我們的設定。
與往常一樣,範例的原始程式碼可以在 GitHub 上取得。