使用 Kafka Consumer API 從頭讀取數據
一、簡介
Apache Kafka 是一個開源的分佈式事件流處理系統。它基本上是一個事件流平台,可以發布、訂閱、存儲和處理記錄流。
Kafka 為實時數據處理提供了一個高吞吐量和低延遲的平台。基本上, Kafka 實現了一個發布者-訂閱者模型,其中生產者應用程序向 Kafka 發布事件,而消費者應用程序訂閱這些事件。
在本教程中,我們將學習如何使用 Kafka Consumer API 從 Kafka 主題的開頭讀取數據。
2.設置
在開始之前,我們先設置依賴,初始化Kafka集群連接,發布一些消息到Kafka。
Kafka 提供了一個方便的 Java 客戶端庫,我們可以使用它來對 Kafka 集群執行各種操作。
2.1.依賴關係
首先,讓我們將 Kafka 客戶端 Java 庫的Maven 依賴項添加到我們項目的pom.xml
文件中:
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-clients</artifactId>
<version>3.4.0</version>
</dependency>
2.2.集群和主題初始化
在整個指南中,我們假設 Kafka 集群以默認配置在我們的本地系統上運行。
其次,我們需要創建一個 Kafka 主題,我們可以使用它來發布和消費消息。讓我們參考我們的 Kafka 主題創建指南創建一個名為“ baeldung
”的 Kafka 主題。
現在我們已經啟動並運行了 Kafka 集群並創建了一個主題,讓我們向 Kafka 發布一些消息。
2.3.發布消息
最後,讓我們向 Kafka 主題“ baeldung
”發布一些虛擬消息。
要發布消息,讓我們創建一個KafkaProducer
實例,其基本配置由Properties
實例定義:
Properties producerProperties = new Properties();
producerProperties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, KAFKA_CONTAINER.getBootstrapServers());
producerProperties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
producerProperties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
KafkaProducer<String, String> producer = new KafkaProducer<>(producerProperties);
我們使用KafkaProducer.send(ProducerRecord)
方法將消息發佈到 Kafka 主題“ baeldung
”:
for (int i = 1; i <= 10; i++) {
ProducerRecord<String, String> record = new ProducerRecord<>("baeldung", String.valueOf(i));
producer.send(record);
}
在這裡,我們向 Kafka 集群發布了十條消息。我們將使用這些來演示我們的消費者實現。
3. 從一開始就消費消息
到目前為止,我們已經初始化了 Kafka 集群並向 Kafka 主題發布了一些示例消息。接下來,讓我們看看如何從頭開始讀取消息。
為了演示這一點,我們首先使用Properties
實例定義的一組特定的消費者屬性初始化KafkaConsumer
的實例.
然後,我們使用創建的KafkaConsumer
實例消費消息並再次尋找分區偏移量的開始。
讓我們詳細了解這些步驟中的每一個。
3.1.消費者財產
為了從 Kafka 主題的開頭消費消息,我們創建了一個KafkaConsumer
實例,它具有隨機生成的消費者組 ID。我們通過將消費者的“ group.id
”屬性設置為隨機生成的 UUID 來實現:
Properties consumerProperties = new Properties();
consumerProperties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
consumerProperties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
consumerProperties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
consumerProperties.put(ConsumerConfig.GROUP_ID_CONFIG, UUID.randomUUID().toString());
當我們為消費者生成一個新的消費者組 id 時,消費者將始終屬於由“ group.id
”屬性標識的新消費者組。新的消費者組不會有任何關聯的偏移量。在這種情況下,Kafka 提供了一個屬性“ auto.offset.reset
”,指示當 Kafka 中沒有初始偏移量或者當前偏移量在服務器上不再存在時應該做什麼。
“ auto.offset.reset
”屬性接受以下值:
-
earliest
:此值自動將偏移量重置為最早的偏移量 -
latest
:此值自動將偏移量重置為最新偏移量 -
none
:如果沒有找到消費者組的先前偏移量,則此值向消費者拋出異常 - anything else:如果設置了除前三個值之外的任何其他值,則向消費者拋出異常
由於我們想從 Kafka 主題的開頭讀取,我們將“ auto.offset.reset
”屬性的值設置為“ earliest”:
consumerProperties.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
現在讓我們使用消費者屬性創建KafkaConsumer
的實例:
KafkaConsumer<String, String> consumer = new KafkaConsumer<>(consumerProperties);
我們使用這個KafkaConsumer
實例來消費主題開頭的消息。
3.2.消費消息
要消費消息,我們首先訂閱我們的消費者消費來自主題“baeldung”
的消息:
consumer.subscribe(Arrays.asList("baeldung"));
接下來,我們使用KafkaConsumer.poll(Duration duration)
方法從主題“ baeldung
”中輪詢新消息,直到Duration
參數指定的時間:
ConsumerRecords<String, String> records = consumer.poll(Duration.ofSeconds(10));
for (ConsumerRecord<String, String> record : records) {
logger.info(record.value());
}
至此,我們已經閱讀了從“ baeldung
”主題開始的所有消息。
此外,**要重置現有消費者以從主題的開頭讀取,我們使用KafkaConsumer.seekToBeginning(Collection<TopicPartition> partitions)
方法.
**此方法接受TopicPartition
的集合併將消費者的偏移量指向分區的開頭:
consumer.seekToBeginning(consumer.assignment());
在這裡,我們將KafkaConsumer.assignment()
的值傳遞給seekToBeginning()
方法。 KafkaConsumer.assignment()
方法返回當前分配給消費者的分區集。
最後,再次輪詢同一個消費者的消息現在從分區的開頭讀取所有消息:
ConsumerRecords<String, String> records = consumer.poll(Duration.ofSeconds(10));
for (ConsumerRecord<String, String> record : records) {
logger.info(record.value());
}
4。結論
在本文中,我們學習瞭如何使用 Kafka Consumer API 從 Kafka 主題的開頭讀取消息。
我們首先看看新消費者如何從 Kafka 主題的開頭讀取消息,以及它的實現。然後我們看到了一個已經在消費的消費者如何從頭開始尋找它的偏移量來讀取消息。
一如既往,所有示例的完整代碼都可以在 GitHub 上找到。