KafkaConsumer 的 subscribe() 與 assign() 方法
1. 概述
Apache Kafka 提供的用戶端程式庫允許開發人員使用底層Java API (應用程式介面)以及其他程式語言來產生和消費訊息。此 API 中的KafkaConsumer類別有兩個用於讀取訊息的方法: subscribe()和assign() 。
在本教學中,我們將討論 Kafka Java 用戶端 API 中的subscribe()和assign()方法之間的差異。我們將看到,它們的主要區別在於自動分區分配和手動分區分配。範例中使用的 Kafka 版本為 4.1.1。
2. 使用subscribe()實現自動分區分配
本節我們將討論KafkaConsumer類別的subscribe()方法。
2.1. KafkaConsumer.subscribe()
KafkaConsumer類別的subscribe()方法用於訂閱一個或多個主題。如果消費者屬於某個消費者群組,Kafka 叢集會自動為消費者分配分區。因此,當有新消費者加入或現有消費者離開時,叢集可以實現動態擴展和負載平衡。這樣一來,分區分配的管理就更簡單了。
以下是subscribe()方法的定義:
public void subscribe(Collection<String> topics)
它會訂閱傳遞給它的主題清單。它還有其他重載版本。然而,其主要思想保持不變:取得動態分配的分區。
2.2 一個例子
讓我們來看一個使用subscribe()方法的範例。為了簡單起見,我們將使用一個單獨的未分組消費者,即不屬於任何消費者組的消費者。以下是 Java 程式碼片段:
// Create Kafka Consumer
KafkaConsumer<String, String> consumer = new KafkaConsumer<>(properties);
// Subscribe Consumer to Our topics
String topics = "test-topic";
consumer.subscribe(List.of(topics));
首先,我們建立一個 Kafka 消費者,然後訂閱一個名為test-topic的單一主題。
然後,我們在一個無限while中取得該主題的傳入樣本:
logger.info("Waiting for messages...");
// Poll the data
while (true) {
ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(1000));
for (ConsumerRecord<String, String> record : records) {
logger.info("Value: " + record.value() + " -- Partition: " + record.partition());
}
}
我們使用KafkaConsumer類別的poll()方法來取得接收到的樣本。如果有樣本,它會立即返回。否則,它會等待超時時間,在本例中為1000 milliseconds 。如果逾時, poll()方法將傳回一個空記錄。我們透過遍歷這些記錄來列印接收到的樣本的值和分區。
2.3. 測試範例
現在,讓我們測試一下消費者的行為。首先,我們需要使用kafka-topics.sh腳本建立主題test-topic :
$ kafka-topics.sh --bootstrap-server localhost:9092 --topic test-topic --create --partitions 3
Created topic test-topic.
我們使用–partitions選項明確指定了分區數為3實際上,預設值就是3現在,讓我們使用kafka-console-producer.sh腳本啟動一個生產者:
$ kafka-console-producer.sh --bootstrap-server localhost:9092 --topic test-topic --producer-property partitioner.class=org.apache.kafka.clients.producer.RoundRobinPartitioner
>
箭頭符號>表示我們已準備好向test -topic發送訊息。我們使用[RoundRobinPartitioner](https://kafka.apache.org/26/javadoc/org/apache/kafka/clients/producer/RoundRobinPartitioner.html)策略,透過–producer-property選項使生產者以輪詢方式寫入主題。否則,寫入主題的鍵為null ,主題只會寫入隨機選擇的分區之一。
現在,我們發送六條訊息:
$ kafka-console-producer.sh --bootstrap-server localhost:9092 --topic test-topic --producer-property partitioner.class=org.apache.kafka.clients.producer.RoundRobinPartitioner
>Message1
>Message2
>Message3
>Message4
>Message5
>Message6
>
接下來,我們來檢查一下消費者應用程式的輸出:
Waiting for messages...
Value: Message1 -- Partition: 0
Value: Message2 -- Partition: 1
Value: Message3 -- Partition: 2
Value: Message4 -- Partition: 0
Value: Message5 -- Partition: 1
Value: Message6 -- Partition: 2
從輸出結果可以看出,由於消費者不屬於任何特定的消費者群組,因此我們收到了所有分區中的所有訊息。如預期,共有三個分區。 Message1和Message4位於第一個分區( Partition 0 。類似地, Message2和Message5位於第二個分區( Partition 1 ,而Message3和Message6位於最後一個分區( Partition 2 。
3. 使用assign()函數手動指派分區
本節我們將討論KafkaConsumer類別的assign()方法。
3.1. KafkaConsumer.assign()
我們使用KafkaConsumer類別的assign()方法手動將分區指派給消費者。因此,它提供了對分區的完全控制。由於新消費者加入或現有消費者離開時不會自動重新平衡分區,因此由於始終從同一分區讀取數據,消費可能更加穩定。但是,它不具備自動擴縮容或容錯能力。
以下是assign()方法的定義:
public void assign(Collection<TopicPartition> partitions)
它接收分區列表作為輸入,並將它們分配給消費者。
3.2 一個例子
讓我們來看一個使用assign()方法的範例。以下是 Java 程式碼片段:
// Create Kafka Consumer
KafkaConsumer<String, String> consumer = new KafkaConsumer<>(properties);
// Subscribe Consumer to Our topics
String topics = "test-topic";
consumer.assign(Arrays.asList(new TopicPartition(topics, 1)));
創建 Kafka 消費者後,我們將test-topic的第二個分區分配給該消費者。正如我們在上一節的範例中看到的,分區編號從0開始。因此,呼叫TopicPartition建構函式時的第二個參數(即1 )對應於第二個分區。
我們使用與上一節相同的while循環來獲取接收到的主題樣本。
3.3 測試範例
為了測試該應用程序,讓我們使用kafka-console-producer.sh腳本啟動一個生產者,並寫入六個訊息:
$ kafka-console-producer.sh --bootstrap-server localhost:9092 --topic test-topic --producer-property partitioner.class=org.apache.kafka.clients.producer.RoundRobinPartitioner
>Message11
>Message12
>Message13
>Message14
>Message15
>Message16
>
訊息寫入完畢後,讓我們檢查一下消費者應用程式的輸出:
Waiting for messages...
Value: Message12 -- Partition: 1
Value: Message15 -- Partition: 1
現在,我們不再接收所有訊息,而是像預期那樣只讀取Partition 1中的訊息、 Message12和Message15 。
4. 結論
本文討論了 Kafka Java 用戶端 API 中subscribe()和assign()方法的差異。首先,我們了解了subscribe()方法,它接收一個主題名稱清單。消費者可以訂閱清單中的主題。
接下來,我們討論了assign()方法,該方法接收一個分區清單。消費者訂閱分區,分區由主題名稱和對應的分區號碼組成。