使用 Apache Kafka 讀取多個訊息
1. 概述
在本教程中,我們將探討 Kafka Consumer
如何從代理程式檢索訊息。我們將學習可設定的屬性,這些屬性可以直接影響 Kafka Consumer 一次讀取的訊息數量。最後,我們將探討調整這些設定如何影響Consumer
的行為。
2. 搭建環境
Kafka 消費者以可配置大小的批量方式取得給定分區的記錄。我們無法配置一批中要取得的確切記錄數,但我們可以配置這些批次的大小(以位元組為單位)。
對於本文中的程式碼片段,我們需要一個簡單的 Spring 應用程序,該應用程式使用kafka-clients庫與 Kafka 代理進行互動。我們將建立一個 Java 類,該類在內部使用KafkaConsumer
訂閱主題並記錄傳入訊息。如果您想更深入地了解,請隨時閱讀我們專門介紹 Kafka Consumer API 的文章並繼續閱讀。
我們的範例中的主要區別之一是日誌記錄:我們不是一次記錄一條訊息,而是收集它們並記錄整批訊息。這樣,我們就能夠準確地看到每個poll().
此外,讓我們透過合併批次的初始和最終偏移量以及消費者的groupId:
class VariableFetchSizeKafkaListener implements Runnable {
private final String topic;
private final KafkaConsumer<String, String> consumer;
// constructor
@Override
public void run() {
consumer.subscribe(singletonList(topic));
int pollCount = 1;
while (true) {
List<ConsumerRecord<String, String>> records = new ArrayList<>();
for (var record : consumer.poll(ofMillis(500))) {
records.add(record);
}
if (!records.isEmpty()) {
String batchOffsets = String.format("%s -> %s", records.get(0).offset(), records.get(records.size() - 1).offset());
String groupId = consumer.groupMetadata().groupId();
log.info("groupId: {}, poll: #{}, fetched: #{} records, offsets: {}", groupId, pollCount++, records.size(), batchOffsets);
}
}
}
}
Testcontainers 函式庫將協助我們透過執行正在執行的 Kafka 代理程式的 Docker 容器來設定測試環境。如果您想了解有關設定 Testcontainer 的 Kafka 模組的更多信息,請在此處查看我們如何配置測試環境並繼續操作。
在我們的特定情況下,我們可以定義一個附加方法來發布有關給定主題的多個訊息。例如,假設我們將溫度感測器讀取的值串流傳輸到名為「 engine.sensor.temperature
」的主題:
void publishTestData(int recordsCount, String topic) {
List<ProducerRecord<String, String>> records = IntStream.range(0, recordsCount)
.mapToObj(__ -> new ProducerRecord<>(topic, "key1", "temperature=255F"))
.collect(toList());
// publish all to kafka
}
正如我們所看到的,我們對所有訊息使用了相同的密鑰。結果,所有記錄將被傳送到同一個分區。對於有效負載,我們使用了描述溫度測量的簡短固定文字。
3. 測試預設行為
讓我們先使用預設的消費者配置來建立一個 Kafka 監聽器。然後,我們將發布一些訊息來查看偵聽器消耗了多少批次。正如我們所看到的,我們的自訂偵聽器在內部使用 Consumer API。因此,要實例化VariableFetchSizeKafkaListener
,我們必須先配置並建立一個KafkaConsumer
:
Properties props = new Properties();
props.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, KAFKA_CONTAINER.getBootstrapServers());
props.setProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
props.setProperty(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
props.setProperty(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
props.setProperty(ConsumerConfig.GROUP_ID_CONFIG, "default_config");
KafkaConsumer<String, String> kafkaConsumer = new KafkaConsumer<>(props);
現在,我們將使用KafkaConsumer
的預設值作為最小和最大獲取大小。基於這個消費者,我們可以實例化我們的監聽器並非同步運行它以避免阻塞主執行緒:
CompletableFuture.runAsync(
new VariableFetchSizeKafkaListener(topic, kafkaConsumer)
);
最後,讓我們將測試線程阻塞幾秒鐘,為偵聽器提供一些時間來使用訊息。本文的目標是啟動聽眾並觀察他們的表現。我們將使用 Junit5 測試作為設定和探索其行為的便捷方式,但為了簡單起見,我們不會包含任何特定的斷言。因此,這將是我們的起始@Test:
@Test
void whenUsingDefaultConfiguration_thenProcessInBatchesOf() throws Exception {
String topic = "engine.sensors.temperature";
publishTestData(300, topic);
Properties props = new Properties();
props.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, KAFKA_CONTAINER.getBootstrapServers());
props.setProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
props.setProperty(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
props.setProperty(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
props.setProperty(ConsumerConfig.GROUP_ID_CONFIG, "default_config");
KafkaConsumer<String, String> kafkaConsumer = new KafkaConsumer<>(props);
CompletableFuture.runAsync(
new VariableFetchSizeKafkaListener(topic, kafkaConsumer)
);
Thread.sleep(5_000L);
}
現在,讓我們執行測試並檢查日誌以查看單一批次中將獲取多少記錄:
10:48:46.958 [ForkJoinPool.commonPool-worker-2] INFO cbkcVariableFetchSizeKafkaListener - groupId: default_config, poll: #1, fetched: #300 records, offsets: 0 -> 299
正如我們所注意到的,我們在一個批次中獲取了所有 300 條記錄,因為我們的訊息很小。 key 和 body 都是短字串:key 為 4 個字符,body 為 16 個字符長。總共 20 個字節,加上一些額外的記錄元資料。另一方面,最大批次大小的預設值為 1 mebibyte(1.024 x 1.024 位元組),或簡稱 1,048,576 位元組。
4. 配置最大分割區取得大小
Kafka 中的“max.partition.fetch.bytes”
決定了消費者在單一請求中可以從單一分割區取得的最大資料量。因此,即使對於少量的短信,我們也可以透過更改屬性來強制監聽器分批來取得記錄。
為了觀察這一點,我們再建立兩個VariableFetchSizeKafkaListener
並配置它們,將此屬性分別設為 500B 和 5KB。首先,我們用一個專門的方法來提取所有常見的消費者Properties
,以避免程式碼重複:
Properties commonConsumerProperties() {
Properties props = new Properties();
props.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, KAFKA_CONTAINER.getBootstrapServers());
props.setProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
props.setProperty(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
props.setProperty(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
return props;
}
然後,讓我們建立第一個偵聽器並非同步運行它:
Properties fetchSize_500B = commonConsumerProperties();
fetchSize_500B.setProperty(ConsumerConfig.GROUP_ID_CONFIG, "max_fetch_size_500B");
fetchSize_500B.setProperty(ConsumerConfig.MAX_PARTITION_FETCH_BYTES_CONFIG, "500");
CompletableFuture.runAsync(
new VariableFetchSizeKafkaListener("engine.sensors.temperature", new KafkaConsumer<>(fetchSize_500B))
);
正如我們所看到的,我們為不同的監聽器設定了不同的消費者群組 ID。這將使他們能夠使用相同的測試數據。現在,讓我們繼續第二個偵聽器並完成測試:
@Test
void whenChangingMaxPartitionFetchBytesProperty_thenAdjustBatchSizesWhilePolling() throws Exception {
String topic = "engine.sensors.temperature";
publishTestData(300, topic);
Properties fetchSize_500B = commonConsumerProperties();
fetchSize_500B.setProperty(ConsumerConfig.GROUP_ID_CONFIG, "max_fetch_size_500B");
fetchSize_500B.setProperty(ConsumerConfig.MAX_PARTITION_FETCH_BYTES_CONFIG, "500");
CompletableFuture.runAsync(
new VariableFetchSizeKafkaListener(topic, new KafkaConsumer<>(fetchSize_500B))
);
Properties fetchSize_5KB = commonConsumerProperties();
fetchSize_5KB.setProperty(ConsumerConfig.GROUP_ID_CONFIG, "max_fetch_size_5KB");
fetchSize_5KB.setProperty(ConsumerConfig.MAX_PARTITION_FETCH_BYTES_CONFIG, "5000");
CompletableFuture.runAsync(
new VariableFetchSizeKafkaListener(topic, new KafkaConsumer<>(fetchSize_5KB))
);
Thread.sleep(10_000L);
}
如果我們執行此測試,我們可以假設第一個消費者將獲得比第二個消費者大約小十倍的批次。我們來分析一下日誌:
[worker-3] INFO - groupId: max_fetch_size_5KB, poll: #1, fetched: #56 records, offsets: 0 -> 55
[worker-2] INFO - groupId: max_fetch_size_500B, poll: #1, fetched: #5 records, offsets: 0 -> 4
[worker-2] INFO - groupId: max_fetch_size_500B, poll: #2, fetched: #5 records, offsets: 5 -> 9
[worker-3] INFO - groupId: max_fetch_size_5KB, poll: #2, fetched: #56 records, offsets: 56 -> 111
[worker-2] INFO - groupId: max_fetch_size_500B, poll: #3, fetched: #5 records, offsets: 10 -> 14
[worker-3] INFO - groupId: max_fetch_size_5KB, poll: #3, fetched: #56 records, offsets: 112 -> 167
[worker-2] INFO - groupId: max_fetch_size_500B, poll: #4, fetched: #5 records, offsets: 15 -> 19
[worker-3] INFO - groupId: max_fetch_size_5KB, poll: #4, fetched: #51 records, offsets: 168 -> 218
[worker-2] INFO - groupId: max_fetch_size_500B, poll: #5, fetched: #5 records, offsets: 20 -> 24
[...]
正如預期的那樣,其中一個偵聽器確實獲取了幾乎比另一個偵聽器大十倍的資料批次。此外,重要的是要了解批次內的記錄數量取決於這些記錄及其元資料的大小。為了強調這一點,我們可以觀察到groupId
“ max_fetch_size_5KB
”的消費者在第四次輪詢時獲取了更少的記錄。
5. 配置最小獲取大小
Consumer API
還允許透過“fetch.min.bytes”
屬性自訂最小提取大小。我們可以更改此屬性以指定代理需要回應的最小資料量。如果未滿足此最小值,代理將在向消費者的獲取請求發送回應之前等待更長時間。為了強調這一點,我們可以在測試輔助方法中向測試發布者添加延遲。因此,生產者將在發送每個訊息之間等待特定的毫秒數:
@Test
void whenChangingMinFetchBytesProperty_thenAdjustWaitTimeWhilePolling() throws Exception {
String topic = "engine.sensors.temperature";
publishTestData(300, topic, 100L);
// ...
}
void publishTestData(int measurementsCount, String topic, long delayInMillis) {
// ...
}
讓我們先建立一個VariableFetchSizeKafkaListener
,它將使用預設配置,「 fetch.min.bytes
」等於一個位元組。與前面的範例類似,我們將在CompletableFuture
中非同步執行此使用者:
// fetch.min.bytes = 1 byte (default)
Properties minFetchSize_1B = commonConsumerProperties();
minFetchSize_1B.setProperty(ConsumerConfig.GROUP_ID_CONFIG, "min_fetch_size_1B");
CompletableFuture.runAsync(
new VariableFetchSizeKafkaListener(topic, new KafkaConsumer<>(minFetchSize_1B))
);
透過此設置,並且由於我們引入的延遲,我們可以預期每個記錄都將被一個接一個地單獨檢索。換句話說,我們可以預期單一記錄有多個批次。此外,我們預期這些批次的消耗速度與我們的KafkaProducer
發布資料的速度相似,在我們的例子中是每 100 毫秒一次。讓我們運行測試並分析日誌:
14:23:22.368 [worker-2] INFO - groupId: min_fetch_size_1B, poll: #1, fetched: #1 records, offsets: 0 -> 0
14:23:22.472 [worker-2] INFO - groupId: min_fetch_size_1B, poll: #2, fetched: #1 records, offsets: 1 -> 1
14:23:22.582 [worker-2] INFO - groupId: min_fetch_size_1B, poll: #3, fetched: #1 records, offsets: 2 -> 2
14:23:22.689 [worker-2] INFO - groupId: min_fetch_size_1B, poll: #4, fetched: #1 records, offsets: 3 -> 3
[...]
此外,我們可以透過將「 fetch.min.bytes
」值調整為更大的大小來強制消費者等待更多資料累積:
// fetch.min.bytes = 500 bytes
Properties minFetchSize_500B = commonConsumerProperties();
minFetchSize_500B.setProperty(ConsumerConfig.GROUP_ID_CONFIG, "mim_fetch_size_500B");
minFetchSize_500B.setProperty(ConsumerConfig.FETCH_MIN_BYTES_CONFIG, "500");
CompletableFuture.runAsync(
new VariableFetchSizeKafkaListener(topic, new KafkaConsumer<>(minFetchSize_500B))
);
將屬性設定為 500 位元組後,我們可以預期消費者會等待更長時間並取得更多資料。讓我們也運行這個範例並觀察結果:
14:24:49.303 [worker-3] INFO - groupId: mim_fetch_size_500B, poll: #1, fetched: #6 records, offsets: 0 -> 5
14:24:49.812 [worker-3] INFO - groupId: mim_fetch_size_500B, poll: #2, fetched: #4 records, offsets: 6 -> 9
14:24:50.315 [worker-3] INFO - groupId: mim_fetch_size_500B, poll: #3, fetched: #5 records, offsets: 10 -> 14
14:24:50.819 [worker-3] INFO - groupId: mim_fetch_size_500B, poll: #4, fetched: #5 records, offsets: 15 -> 19
[...]
六,結論
在本文中,我們討論了KafkaConsumer
從代理程式取得資料的方式。我們了解到,預設情況下,如果至少有一條新記錄,消費者將獲取數據。另一方面,如果分割區中的新資料超過 1,048,576 位元組,它將被分割成該最大大小的多個批次。我們發現,自訂「 fetch.min.bytes
」和「 max.partition.fetch.bytes
」屬性允許我們自訂 Kafka 的行為以滿足我們的特定要求。
與往常一樣,範例的原始程式碼可以在 GitHub 上取得。