獲取 Apache Kafka 主題中的最後 N 條消息
一、簡介
在這個簡短的教程中,我們將了解如何從 Apache Kafka 主題中檢索最後 N 條消息。
在本文的第一部分,我們將重點介紹執行此操作所需的先決條件。在第二部分中,我們將構建一個小型實用程序來使用 Java 和Kafka Java API庫讀取消息。最後,我們將提供簡短的指導,以使用KafkaCat從命令行獲得相同的結果。
2.先決條件
從 Kafka 主題中檢索最後 N 條消息就像從定義明確的偏移量開始消費消息一樣簡單。 Kafka Topic 中的偏移量表示消費者的當前位置.
在上一篇文章中,我們了解瞭如何利用[consumer.seekToEnd()](https://kafka.apache.org/21/javadoc/org/apache/kafka/clients/consumer/KafkaConsumer.html#seekToEnd-java.util.Collection-)
方法在 Apache Kafka 主題中獲取特定數量的消息。
考慮到相同的功能,我們可以通過執行一個簡單的減法得到計算正確偏移量的直覺:offset = lastOffset – N。然後我們可以從這個位置開始輪詢 N 條消息。
儘管如此,如果我們使用Transactional Producer
生成記錄,此方法將不起作用。在這種情況下,偏移量將跳過一些數字以適應 Kafka 主題事務記錄(提交/回滾等)。使用事務性生產者的一種常見情況是我們需要只處理一次 Kafka 消息。簡單地說,如果我們從 (lastOffset – N) 開始讀取消息,我們可能會消耗不到 N 條消息,因為一些偏移量被事務記錄消耗了。
3. 使用 Java 獲取 Kafka 主題中的最後 N 條消息
首先,我們需要創建一個Producer
和一個Consumer
:
Properties producerProperties = new Properties();
producerProperties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, KAFKA_CONTAINER.getBootstrapServers());
producerProperties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
producerProperties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
Properties consumerProperties = new Properties();
consumerProperties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, KAFKA_CONTAINER.getBootstrapServers());
consumerProperties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
consumerProperties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
consumerProperties.put(ConsumerConfig.GROUP_ID_CONFIG, "ConsumerGroup1");
KafkaProducer<String, String> producer = new KafkaProducer<>(producerProperties);
KafkaConsumer<String, String> consumer = new KafkaConsumer<>(consumerProperties);
現在讓我們生成一些消息:
final String TOPIC1 = "baeldung-topic";
int messagesInTopic = 100;
for (int i = 0; i < messagesInTopic; i++) {
producer.send(new ProducerRecord(TOPIC1, null, MESSAGE_KEY, String.valueOf(i))).get();
}
為了清楚和簡單起見,假設我們只需要為我們的消費者註冊一個分區:
TopicPartition partition = new TopicPartition(TOPIC1, 0);
List<TopicPartition> partitions = new ArrayList<>();
partitions.add(partition);
consumer.assign(partitions);
正如我們之前提到的,我們需要將偏移量定位在正確的位置,然後我們就可以開始輪詢了:
int messagesToRetrieve = 10;
consumer.seekToEnd(partitions);
long startIndex = consumer.position(partition) - messagesToRetrieve;
consumer.seek(partition, startIndex);
ConsumerRecords<String, String> records = consumer.poll(Duration.ofMinutes(1));
我們可能希望增加輪詢持續時間,以防網絡特別慢,或者要檢索的消息數量特別大。在這種情況下,我們需要考慮內存中有大量記錄可能會導致資源不足的問題。
現在讓我們最後檢查一下我們是否真的檢索到了正確數量的消息:
for (ConsumerRecord<String, String> record : records) {
assertEquals(MESSAGE_KEY, record.key());
assertTrue(Integer.parseInt(record.value()) >= (messagesInTopic - messagesToRetrieve));
recordsReceived++;
}
assertEquals(messagesToRetrieve, recordsReceived);
4. 使用 KafkaCat 獲取 Kafka 主題中的最後 N 條消息
KafkaCat (kcat)是一個命令行工具,我們可以用它來測試和調試Kafka Topics。 Kafka 本身提供了大量腳本和 shell 工具來執行相同的操作。儘管如此,在執行諸如檢索 Apache Kafka 主題中的最後 N 條消息等操作時, KafkaCat
的簡單性和易用性使其成為事實上的標準。安裝後,可以通過運行以下簡單命令來檢索 Kafka 主題中生成的最新 N 條消息:
$ kafkacat -C -b localhost:9092 -t topic-name -o -<N> -e
-
-C
表示我們需要消費消息 -
-b
表示Kafka Broker的位置 -
-t
表示主題名稱 -
-o
表示我們需要從這個偏移量開始讀取。帶負號意味著我們需要從末尾讀取 N 條消息。 -
-e
選項在閱讀最後一條消息時退出
鏈接到我們討論的上述案例,從名為“baeldung-topic”的主題中檢索last 10 messages
的命令“baeldung-topic” is
:
$ kafkacat -C -b localhost:9092 -t baeldung-topic -o -10 -e
5.結論
在這個簡短的教程中,我們了解瞭如何使用 Kafka 主題的最新 N 條消息。在第一部分中,我們使用了 Java Kafka API 庫。在第二部分中,我們使用了一個名為 KafkaCat 的命令行實用程序。
與往常一樣,代碼可在 GitHub 上獲得。