在 Kafka 中提交偏移量
1. 概述
在 Kafka 中,消費者從分區讀取訊息。在讀取訊息時,需要考慮一些問題,例如確定從分割區讀取哪些訊息,或防止重複讀取訊息或在發生故障時遺失訊息。解決這些問題的方法是使用偏移量。
在本教程中,我們將了解 Kafka 中的偏移量。我們將了解如何提交偏移量來管理訊息消耗並討論其方法和缺點。
2. 什麼是偏移量?
我們知道Kafka將訊息儲存在主題中,每個主題可以有多個分區。每個消費者從主題的一個分區讀取訊息。在這裡, Kafka 在偏移量的幫助下追蹤消費者閱讀的訊息。偏移量是從零開始的整數,隨著訊息的儲存而增加 1。
假設一個消費者從一個分割區讀取了 5 則訊息。然後,根據配置,Kafka 將直到4
偏移量標記為已提交(從零開始的序列)。消費者下次嘗試讀取訊息時會消費偏移量為5
訊息。
如果沒有偏移,就無法避免重複處理或資料遺失。這就是為什麼它如此重要。
我們可以用資料庫儲存來類比。在資料庫中,我們在執行 SQL 語句後提交以儲存變更。同樣,從分區讀取後,我們提交偏移量來標記已處理訊息的位置。
3. 提交抵銷的方式
有四種方法可以提交偏移量。我們將詳細研究每一個並討論它們的用例、優點和缺點。
讓我們先在pom.xml
中加入 Kafka Client API依賴項:
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-clients</artifactId>
<version>3.6.1</version>
</dependency>
3.1.自動提交
這是提交偏移量最簡單的方法。預設情況下,Kafka 使用自動提交-**每五秒提交poll()
方法傳回的最大偏移量**。 poll()
傳回一組超時時間為10
秒的訊息,我們可以在程式碼中看到:
KafkaConsumer<Long, String> consumer = new KafkaConsumer<>(props);
consumer.subscribe(KafkaConfigProperties.getTopic());
ConsumerRecords<Long, String> messages = consumer.poll(Duration.ofSeconds(10));
for (ConsumerRecord<Long, String> message : messages) {
// processed message
}
自動提交的問題是,如果應用程式發生故障,資料遺失的可能性非常高。當poll()
傳回訊息時, Kafka 可能會在處理訊息之前提交最大的偏移量。
假設poll()
傳回 100 則訊息,而消費者在自動提交發生時處理 60 則訊息。然後,由於某些故障,消費者崩潰了。當新的消費者上線讀取訊息時,它從偏移量 101 開始讀取,導致 61 到 100 之間的訊息遺失。
因此,我們需要其他不存在此缺點的方法。答案是手動提交。
3.2.手動同步提交
在手動提交中,無論是同步或非同步,都需要透過將預設屬性( enabled.auto.commit
屬性)設為false
來停用自動提交:
Properties props = new Properties();
props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false");
停用手動提交後,現在讓我們了解commitSync()
的用法:
KafkaConsumer<Long, String> consumer = new KafkaConsumer<>(props);
consumer.subscribe(KafkaConfigProperties.getTopic());
ConsumerRecords<Long, String> messages = consumer.poll(Duration.ofSeconds(10));
//process the messages
consumer.commitSync();
此方法透過僅在處理訊息後提交偏移量來防止資料遺失。但是,當消費者在提交偏移量之前崩潰時,它並不能防止重複讀取。除此之外,它還會影響應用程式效能。
commitSync()
會阻塞程式碼直到完成。此外,如果出現錯誤,它會繼續重試。這會降低應用程式的吞吐量,這是我們不希望的。因此,Kafka 提供了另一種解決方案,即非同步提交,來解決這些缺點。
3.3.手動異步提交
Kafka 提供commitAsync()
來非同步提交偏移量。它透過在不同線程中提交偏移量來克服手動同步提交的效能開銷。讓我們實作一個非同步提交來理解這一點:
KafkaConsumer<Long, String> consumer = new KafkaConsumer<>(props);
consumer.subscribe(KafkaConfigProperties.getTopic());
ConsumerRecords<Long, String> messages = consumer.poll(Duration.ofSeconds(10));
//process the messages
consumer.commitAsync();
非同步提交的問題是它在失敗時不會重試。它依賴下一次調用commitAsync()
,這將提交最新的偏移量。
假設 300 是我們想要提交的最大偏移量,但由於某些問題,我們的commitAsync()
失敗了。在重試之前, commitAsync()
的另一個呼叫可能會提交最大偏移量 400,因為它是非同步的。當commitAsync()
重試失敗時,如果成功提交偏移量 300,它將覆蓋先前提交的 400,從而導致重複讀取。這就是commitAsync()
不重試的原因。
3.4.提交特定偏移量
有時,我們需要對偏移量進行更多控制。假設我們正在小批量處理訊息,並希望在處理訊息後立即提交偏移量。我們可以使用commitSync()
和commitAsync()
的重載方法,它接受一個映射參數來提交特定的偏移量:
KafkaConsumer<Long, String> consumer = new KafkaConsumer<>(props);
consumer.subscribe(KafkaConfigProperties.getTopic());
Map<TopicPartition, OffsetAndMetadata> currentOffsets = new HashMap<>();
int messageProcessed = 0;
while (true) {
ConsumerRecords<Long, String> messages = consumer.poll(Duration.ofSeconds(10));
for (ConsumerRecord<Long, String> message : messages) {
// processed one message
messageProcessed++;
currentOffsets.put(
new TopicPartition(message.topic(), message.partition()),
new OffsetAndMetadata(message.offset() + 1));
if (messageProcessed%50==0){
consumer.commitSync(currentOffsets);
}
}
}
在此程式碼中,我們管理一個 currentOffsets 映射,它以TopicPartition
為鍵,以OffsetAndMetadata
為值。我們在訊息處理過程中將已處理訊息的TopicPartition
和OffsetAndMetadata
插入到currentOffsets
映射中。當處理的訊息數量達到 50 時,我們使用currentOffsets
映射呼叫commitSync()
將這些訊息標記為已提交。
這種方式的行為與同步和非同步提交相同。唯一的區別是,我們在這裡決定要提交的偏移量,而不是 Kafka。
4。結論
在本文中,我們了解了偏移量及其在 Kafka 中的重要性。此外,我們還探索了四種提交偏移量的方法,包括手動和自動。最後我們分析了它們各自的優缺點。我們可以得出結論,在 Kafka 中沒有明確的最佳提交方式;相反,這取決於具體的用例。
本文中使用的所有程式碼範例都可以在 GitHub 上找到。