獲取 Apache Kafka 主題中的消息數
瀏覽人數:243最近更新:
一、概述
Apache Kafka是一個開源分佈式事件流平台。
在這個快速教程中,我們將學習獲取 Kafka 主題中消息數量的技術。我們將演示編程以及本機命令技術。
2. 程序化技術
一個 Kafka 主題可能有多個分區。我們的技術應該確保我們已經計算了來自每個分區的消息數量。
我們必須遍歷每個分區並檢查它們的最新偏移量。為此,我們將介紹一個消費者:
KafkaConsumer<String, String> consumer = new KafkaConsumer<String, String>(props);
第二步是從這個消費者那裡得到所有的分區:
List<TopicPartition> partitions = consumer.partitionsFor(topic).stream().map(p -> new TopicPartition(topic, p.partition()))
.collect(Collectors.toList());
第三步是在每個分區的末尾偏移消費者,並將結果記錄在分區映射中:
consumer.assign(partitions);
consumer.seekToEnd(Collections.emptySet());
Map<TopicPartition, Long> endPartitions = partitions.stream().collect(Collectors.toMap(Function.identity(), consumer::position));
最後一步是取每個分區中的最後一個位置,並將結果相加得到主題中的消息數:
numberOfMessages = partitions.stream().mapToLong(p -> endPartitions.get(p)).sum();
3. Kafka 原生命令
如果我們想對 Kafka 主題的消息數量執行一些自動化任務,那麼編程技術是很好的選擇。但是,如果僅用於分析目的,則創建這些服務並在機器上運行它們將是開銷。一個簡單的選擇是使用原生 Kafka 命令。它會很快給出結果。
3.1。使用GetoffsetShell
命令
在執行本機命令之前,我們必須導航到機器上 Kafka 的根文件夾。以下命令返回我們在主題baeldung
上發布的消息數量:
$ bin/kafka-run-class.sh kafka.tools.GetOffsetShell --broker-list localhost:9092
--topic baeldung | awk -F ":" '{sum += $3} END {print "Result: "sum}'
Result: 3
3.2.使用消費者控制台
如前所述,我們將在執行任何命令之前導航到 Kafka 的根文件夾。以下命令返回主題baeldung
上發布的消息數:
$ bin/kafka-console-consumer.sh --from-beginning --bootstrap-server localhost:9092
--property print.key=true --property print.value=false --property print.partition
--topic baeldung --timeout-ms 5000 | tail -n 10|grep "Processed a total of"
Processed a total of 3 messages
4。結論
在本文中,我們研究了獲取 Kafka 主題中消息數量的技術。我們學習了一種編程技術,將所有分區分配給消費者並檢查最新的偏移量。
我們還看到了兩種原生 Kafka 命令技術。一個是來自 Kafka 工具的GetoffsetShell
命令。另一個是在控制台上運行一個消費者並從一開始就打印消息的數量。
與往常一樣,本文的源代碼可以在 GitHub 上找到。
本作品係原創或者翻譯,採用《署名-非商業性使用-禁止演繹4.0國際》許可協議