向 Kafka 消息添加自定義標頭
一、簡介
Apache Kafka 是一個開源分佈式事件存儲和容錯流處理系統。 Kafka 基本上是一個事件流平台,客戶端可以在其中發布和訂閱事件流。通常,生產者應用程序向 Kafka 發布事件,而消費者訂閱這些事件,從而實現發布者-訂閱者模型。
在本教程中,我們將學習如何使用 Kafka 生產者在 Kafka 消息中添加自定義標頭。
2.設置
Kafka 提供了一個易於使用的 Java 庫,我們可以使用它來創建 Kafka 生產者客戶端(Producers)和消費者客戶端(Consumers)。
2.1.依賴關係
首先,讓我們將 Kafka 客戶端 Java 庫的Maven 依賴項添加到我們項目的pom.xml
文件中:
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-clients</artifactId>
<version>3.4.0</version>
</dependency>
2.2.連接初始化
本指南假設我們有一個在本地系統上運行的 Kafka 集群。此外,我們需要創建一個主題並與 Kafka 集群建立連接。
首先,讓我們開始在我們的集群中創建一個 Kafka 主題。我們可以參考我們的 Kafka Topic Creation 指南創建一個主題“ baeldung
”。
其次,讓我們使用將生產者連接到我們的本地代理所需的最低配置創建一個新的Properties
實例:
Properties producerProperties = new Properties();
producerProperties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
producerProperties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
producerProperties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
最後,讓我們創建一個KafkaProducer
實例,我們將使用它來發布消息:
KafkaProducer <String, String> producer = new KafkaProducer<>(producerProperties);
KafkaProducer
類的構造函數接受具有bootstrap.servers
屬性的Properties
對象(或Map)
並返回KafkaProducer
的實例。
以類似的方式,讓我們創建一個KafkaConsumer
的實例,我們將使用它來消費消息:
Properties consumerProperties = new Properties();
consumerProperties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, KAFKA_CONTAINER.getBootstrapServers());
consumerProperties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
consumerProperties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
consumerProperties.put(ConsumerConfig.GROUP_ID_CONFIG, "ConsumerGroup1");
KafkaConsumer<String, String> consumer = new KafkaConsumer<>(consumerProperties);
我們將使用這些生產者和消費者實例來演示我們所有的編碼示例。
現在我們已經配置了所有必要的依賴項和連接,我們可以編寫一個簡單的應用程序來在 Kafka 消息中添加自定義標頭。
3. 發布帶有自定義標頭的消息
在 Kafka 版本 0.11.0.0 中添加了對 Kafka 消息中自定義標頭的支持。要創建 Kafka 消息(Record),我們創建ProducerRecord<K,V>
的實例。 ProducerRecord
基本上標識消息值和消息要發佈到的主題,以及其他元數據。
ProducerRecord
類提供了各種構造函數來將自定義標頭添加到 Kafka 消息中。讓我們看一下我們可以使用的幾個構造函數:
-
ProducerRecord(String topic, Integer partition, K key, V value, Iterable<Header> headers)
-
ProducerRecord(String topic, Integer partition, Long timestamp, K key, V value, Iterable<Header> headers)
ProducerRecord c
類構造函數都接受Iterable<Header>
類型形式的自定義標頭。
為了便於理解,讓我們創建一個ProducerRecord
,它向“baeldung”
主題發布一條消息以及一些自定義標頭:
List <Header> headers = new ArrayList<>();
headers.add(new RecordHeader("website", "baeldung.com".getBytes()));
ProducerRecord <String, String> record = new ProducerRecord <>("baeldung", null, "message", "Hello World", headers);
producer.send(record);
在這裡,我們正在創建一個Header
類型List
,以作為標頭傳遞給構造函數。每個標頭代表RecordHeader(String key, byte[] value)
的一個實例,它接受標頭鍵作為String
和標頭值作為byte
數組。
以類似的方式,我們可以使用第二個構造函數,它另外接受正在發布的記錄的時間戳:
List <Header> headers = new ArrayList<>();
headers.add(new RecordHeader("website", "baeldung.com".getBytes()));
ProducerRecord <String, String> record = new ProducerRecord <>("baeldung", null, System.currentTimeMillis(), "message", "Hello World", headers);
producer.send(record);
到目前為止,我們已經創建了一條帶有自定義標頭的消息並將其發佈到 Kafka。
接下來,讓我們實現消費者代碼來消費消息並驗證其自定義標頭。
4. 使用自定義標頭消費消息
首先,我們將我們的消費者實例訂閱到 Kafka 主題“baeldung”
以消費來自以下來源的消息:
consumer.subscribe(Arrays.asList("baeldung"));
其次,我們使用輪詢機制輪詢來自 Kafka 的新消息:
ConsumerRecords<String, String> records = consumer.poll(Duration.ofMinutes(1));
KafkaConsumer.poll(Duration duration)
方法輪詢 Kafka 主題中的新消息,直到Duration
參數指定的時間。該方法返回一個包含獲取的消息的ConsumerRecords
實例。 ConsumerRecords
基本上是ConsumerRecord
類型的Iterable
實例。
最後,我們遍歷獲取的記錄並獲取自定義標頭以及每條消息:
for (ConsumerRecord<String, String> record : records) {
System.out.println(record.key());
System.out.println(record.value());
Headers consumedHeaders = record.headers();
for (Header header : consumedHeaders) {
System.out.println(header.key());
System.out.println(new String(header.value()));
}
}
在這裡,我們使用ConsumerRecord
類中的各種 getter 方法來獲取消息鍵、值和自定義標頭。 ConsumerRecord.headers()
方法返回一個包含自定義標頭的Headers
實例。 Headers基本上是Header
類型的Iterable
實例。然後,我們遍歷每個Header
實例並分別使用Header.key()
和Header.value()
方法獲取標頭鍵和值。
5.結論
在本文中,我們學習瞭如何向 Kafka 消息添加自定義標頭。我們查看了接受自定義標頭及其相應實現的不同可用構造函數。
然後我們看到瞭如何使用帶有自定義標頭的消息並驗證它們。
一如既往,所有示例的完整代碼都可以在 GitHub 上找到。