在 Java 中查看 Kafka 標頭
一、簡介
Apache Kafka 是一個分散式串流平台,允許我們發布和訂閱記錄流(通常稱為訊息)。此外,Kafka 標頭提供了一種將元資料附加到 Kafka 訊息的方法,從而在訊息處理中提供額外的上下文和靈活性。
在本教程中,我們將深入研究常用的 Kafka 標頭,並學習如何使用 Java 查看和提取它們。
2.Kafka header 概述
Kafka 標頭表示附加到 Kafka 訊息的鍵值對,提供了一種在主要訊息內容旁邊包含補充元資料的方法。
例如,Kafka 標頭透過提供將訊息導向到特定處理管道或消費者的資料來促進訊息路由。此外,標頭在承載針對應用程式處理邏輯自訂的自訂應用程式元資料方面具有多種用途。
3.Kafka預設標頭
Kafka 會自動在 Kafka 生產者發送的訊息中包含幾個預設標頭。此外,這些標頭提供了有關訊息的重要元資料和上下文。在本節中,我們將深入研究一些常用的標頭及其在 Kafka 訊息處理領域的重要性。
3.1.生產者標題
當在 Kafka 中產生訊息時,生產者會自動包含幾個預設標頭,例如:
-
KafkaHeaders.TOPIC
– 此標頭包含訊息所屬主題的名稱。 -
KafkaHeaders.KEY
– 如果訊息是使用金鑰產生的,Kafka 會自動包含一個名為「key
」的標頭,其中包含序列化的金鑰位元組。 -
KafkaHeaders.PARTITION
– Kafka 新增一個名為「partition
」的標頭來指示訊息所屬的分區 ID。 -
KafkaHeaders.TIMESTAMP
– Kafka 為每個訊息附加一個名為「timestamp
」的標頭,指示生產者產生訊息的時間戳記。
3.2.消費者標題
Kafka 消費者在消費訊息時會加上以RECEIVED_
為前綴的標頭,以提供訊息接收過程的元資料:
-
KafkaHeaders.RECEIVED_TOPIC
– 此標頭包含從中接收訊息的主題的名稱。 -
KafkaHeaders.RECEIVED_KEY
– 此標頭允許消費者存取與訊息關聯的金鑰。 -
KafkaHeaders.RECEIVED_PARTITION_ID
– Kafka 新增此標頭以指示訊息指派到的分割區的 ID。 -
KafkaHeaders.RECEIVED_TIMESTAMP
– 此標頭反映消費者收到訊息的時間。 -
KafkaHeaders.OFFSET
– 偏移量指示訊息在分區日誌中的位置。
4.使用標頭消費訊息
首先,我們實例化一個KafkaConsumer
物件。 KafkaConsumer
負責訂閱 Kafka 主題並從中獲取訊息。實例化KafkaConsumer
後,我們訂閱要從中消費訊息的 Kafka 主題。透過訂閱主題,消費者可以接收在該主題上發布的訊息。
一旦消費者訂閱了該主題,我們就會繼續從 Kafka 取得記錄。在此過程中, KafkaConsumer
從訂閱的主題中檢索訊息及其關聯的標頭。
下面的程式碼範例示範如何使用帶有標頭的訊息:
@KafkaListener(topics = "my-topic")
public void listen(String message, @Headers Map<String, Object> headers) {
System.out.println("Received message: " + message);
System.out.println("Headers:");
headers.forEach((key, value) -> System.out.println(key + ": " + value));
}
當從指定主題(例如「 my-topic
」)接收訊息時,Kafka 偵聽器容器會呼叫listen()
方法。 @Headers
註解指示該參數應填入接收到的訊息的標頭。
下面是一個範例輸出:
Received message: Hello Baeldung!
Headers:
kafka_receivedMessageKey: null
kafka_receivedPartitionId: 0
kafka_receivedTopic: my-topic
kafka_offset: 123
... // other headers
要存取特定標頭,我們可以使用headers
映射的get()
方法,提供所需標頭的鍵。以下是訪問主題名稱的範例:
String topicName = headers.get(KafkaHeaders.TOPIC);
topicName
應回傳my-topic
。
此外,在消費訊息時,如果我們已經知道處理所需的標頭,我們可以直接提取它們作為方法參數。這種方法提供了一種更簡潔、更有針對性的方式來存取特定標頭值,而無需迭代所有標頭。
下面的程式碼範例示範如何使用帶有標頭的訊息,直接提取特定標頭作為方法參數:
@KafkaListener(topics = "my-topic")
public void listen(String message, @Header(KafkaHeaders.RECEIVED_PARTITION_ID) int partition) {
System.out.println("Received message: " + message);
System.out.println("Partition: " + partition);
}
在listen()
方法中,我們使用@Header
註釋直接擷取RECEIVED_PARTITION
標頭。該註釋允許我們指定要提取的標頭及其相應的類型。將標頭的值直接注入方法參數(在本例中為分區)可以在方法體內直接存取。
下面是輸出:
Received message: Hello Baeldung!
Partition: 0
5. 結論
在本文中,我們探討了 Kafka 標頭在 Apache Kafka 訊息處理中的重要性。我們已經探索了生產者和消費者自動包含的預設標頭。此外,我們還學習如何提取和使用這些標頭。
與往常一樣,範例的程式碼可以在 GitHub 上取得。