向 Kafka 發送消息時是否需要密鑰?
一、簡介
Apache Kafka 是一種開源的分佈式流處理系統,具有容錯能力並提供高吞吐量。 Kafka 基本上是一個實現發布者-訂閱者模型的消息傳遞系統。 Kafka 的消息傳遞、存儲和流處理功能使我們能夠大規模地存儲和分析實時數據流。
在本教程中,我們將首先了解密鑰在 Kafka 消息中的重要性。然後,我們將學習如何使用 Kafka 主題的密鑰發布消息。
2. Kafka消息中key的意義
正如我們所知,Kafka 按照我們生成記錄的順序有效地存儲記錄流。
當我們向 Kafka 主題發布消息時,它以循環方式分佈在可用分區中。因此,在 Kafka 主題中,消息的順序在一個分區內得到保證,但不能跨分區。
當我們向 Kafka 主題發布帶有密鑰的消息時,Kafka 保證所有具有相同密鑰的消息存儲在同一個分區中。因此,如果我們想要維護具有相同密鑰的消息的順序,Kafka 消息中的密鑰很有用。
總而言之,作為向 Kafka 發送消息的一部分,密鑰不是強制性的。基本上,如果我們希望使用相同的密鑰維護消息的嚴格順序,那麼我們絕對應該使用消息的密鑰。對於所有其他情況,具有空鍵將在分區之間提供更好的消息分佈。
接下來,讓我們直接深入研究一些具有帶密鑰的 Kafka 消息的實現代碼。
3.設置
在開始之前,我們先初始化一個Kafka集群,設置依賴,並初始化與Kafka集群的連接。
Kafka 的 Java 庫提供了易於使用的 Producer 和 Consumer API,我們可以使用它們來發布和消費來自 Kafka 的消息。
3.1.依賴關係
首先,讓我們將 Kafka 客戶端 Java 庫的Maven 依賴項添加到我們項目的pom.xml
文件中:
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-clients</artifactId>
<version>3.4.0</version>
</dependency>
3.2.集群和主題初始化
其次,我們需要一個正在運行的 Kafka 集群,我們可以連接到它並執行各種 Kafka 操作。本指南假設 Kafka 集群以默認配置在我們的本地系統上運行。
最後,我們將創建一個具有多個分區的 Kafka 主題,我們可以使用它來發布和使用消息。參考我們的 Kafka 主題創建指南,讓我們創建一個名為“ baeldung
”的主題:
Properties adminProperties = new Properties();
adminProperties.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
Admin admin = Admin.create(adminProperties);
在這裡,我們使用Properties
實例定義的基本配置創建了一個 Kafka Admin
實例。接下來,我們將使用此Admin
實例創建一個名為“ baeldung
”的主題,其中包含五個分區:
admin.createTopics(Collections.singleton(new NewTopic("baeldung", 5, (short) 1)));
現在我們已經使用主題初始化了 Kafka 集群設置,讓我們使用密鑰發布一些消息。
4. 使用密鑰發布消息
為了演示我們的編碼示例,我們將首先創建一個KafkaProducer
實例,其中包含一些由Properties
實例定義的基本生產者屬性。接下來,我們將使用創建的KafkaProducer
實例發布帶有密鑰的消息並驗證主題分區。
讓我們深入詳細了解這些步驟中的每一個。
4.1.初始化生產者
首先,讓我們創建一個新的Properties
實例,它包含生產者的屬性以連接到我們的本地代理:
Properties producerProperties = new Properties();
producerProperties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
producerProperties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
producerProperties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
此外,讓我們使用創建的生產者的Properties
實例創建KafkaProducer
的實例:
KafkaProducer <String, String> producer = new KafkaProducer<>(producerProperties);
KafkaProducer
類的構造函數接受Properties
對象(或Map)
並返回KafkaProducer
的實例。
4.2.發布消息
Kafka Publisher API 提供了多個構造函數來創建帶有密鑰的ProducerRecord
實例。我們使用ProducerRecord<K,V>(String topic, K key, V value)
構造函數來創建帶有鍵的消息:
ProducerRecord<String, String> record = new ProducerRecord<>("baeldung", "message-key", "Hello World");
在這裡,我們使用鍵為“ baeldung
”主題創建了一個ProducerRecord
實例。
現在,讓我們向 Kafka 主題發布一些消息並驗證分區:
for (int i = 1; i <= 10; i++) {
ProducerRecord<String, String> record = new ProducerRecord<>("baeldung", "message-key", String.valueOf(i));
Future<RecordMetadata> future = producer.send(record);
RecordMetadata metadata = future.get();
logger.info(String.valueOf(metadata.partition()));
}
我們使用KafkaProducer.send(ProducerRecord<String, String> record)
方法向 Kafka 發布消息。該方法返回RecordMetadata
類型的Future
實例。然後,我們使用對Future<RecordMetadata>.get()
方法的阻塞調用,該方法在消息發佈時返回RecordMetadata
的實例。
接下來,我們使用RecordMetadata.partition()
方法並獲取消息的分區。
上面的代碼片段產生以下記錄結果:
1
1
1
1
1
1
1
1
1
1
使用它,我們驗證了我們使用相同密鑰發布的消息是否發佈到同一分區。
5.結論
在本文中,我們了解了 Kafka 消息中密鑰的重要性。
我們首先看到瞭如何發布帶有主題鍵的消息。然後我們討論瞭如何驗證具有相同密鑰的消息是否發佈到同一分區。
一如既往,所有示例的完整代碼都可以在 GitHub 上找到。