確保 Kafka 中的消息排序:策略和配置
1. 概述
在本文中,我們將探討 Apache Kafka 中訊息排序的挑戰和解決方案。以正確的順序處理訊息對於維護分散式系統中的資料完整性和一致性至關重要。雖然 Kafka 提供了維護訊息順序的機制,但在分散式環境中實現這一點存在其自身的複雜性。
2. 分區內的排序及其挑戰
Kafka 透過為每個訊息分配唯一的偏移量來維護單一分割區內的順序。這保證了該分區內的順序訊息附加。然而,當我們擴展並使用多個分區時,維護全局秩序變得複雜。不同的分區以不同的速率接收訊息,這使得它們之間的嚴格排序變得複雜。
2.1.生產者與消費者時序
我們先來說說Kafka是如何處理訊息的順序的。生產者發送訊息的順序和消費者接收訊息的順序略有不同。只堅持一個分區,我們按照訊息到達代理的順序處理訊息。但是,此順序可能與我們最初發送它們的順序不符。由於網路延遲或我們重新發送訊息等原因,可能會發生這種混淆。為了保持一致,我們可以透過確認和重試來實現生產者。這樣,我們確保訊息不僅到達 Kafka,而且以正確的順序到達。
2.2.多個分區的挑戰
這種跨分區的分佈雖然有利於可擴展性和容錯性,但也帶來了實作全域訊息排序的複雜性。例如,我們按順序發送兩個訊息:M1 和 M2。 Kafka 取得它們就像我們發送它們一樣,但它將它們放在不同的分區中。這裡有一個問題,僅僅因為 M1 先發送並不意味著它會在 M2 之前被處理。在處理順序至關重要的場景(例如金融交易)中,這可能具有挑戰性。
2.3.單分區訊息排序
我們建立名為'single_partition_topic'
的主題(有 1 個分區)和'multi_partition_topic',
有 5 個分區)。以下是具有單一分區的主題的範例,其中生產者向該主題發送訊息:
Properties producerProperties = new Properties();
producerProperties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, KAFKA_CONTAINER.getBootstrapServers());
producerProperties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, LongSerializer.class.getName());
producerProperties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, JacksonSerializer.class.getName());
producer = new KafkaProducer<>(producerProperties);
for (long sequenceNumber = 1; sequenceNumber <= 10; sequenceNumber++) {
UserEvent userEvent = new UserEvent(UUID.randomUUID().toString());
userEvent.setGlobalSequenceNumber(sequenceNumber);
userEvent.setEventNanoTime(System.nanoTime());
ProducerRecord<Long, UserEvent> producerRecord = new ProducerRecord<>(Config.SINGLE_PARTITION_TOPIC, userEvent);
Future<RecordMetadata> future = producer.send(producerRecord);
sentUserEventList.add(userEvent);
RecordMetadata metadata = future.get();
logger.info("User Event ID: " + userEvent.getUserEventId() + ", Partition : " + metadata.partition());
}
UserEvent
是實作Comparable
介面的 POJO 類,有助於按globalSequenceNumber
(外部序號)對訊息類進行排序。由於生產者正在發送 POJO 訊息對象,因此我們實作了自訂 Jackson 序列化器和反序列化器。
分區0接收所有使用者事件,事件ID依下列順序出現:
841e593a-bca0-4dd7-9f32-35792ffc522e
9ef7b0c0-6272-4f9a-940d-37ef93c59646
0b09faef-2939-46f9-9c0a-637192e242c5
4158457a-73cc-4e65-957a-bf3f647d490a
fcf531b7-c427-4e80-90fd-b0a10bc096ca
23ed595c-2477-4475-a4f4-62f6cbb05c41
3a36fb33-0850-424c-81b1-dafe4dc3bb18
10bca2be-3f0e-40ef-bafc-eaf055b4ee26
d48dcd66-799c-4645-a977-fa68414ca7c9
7a70bfde-f659-4c34-ba75-9b43a5045d39
在 Kafka 中,每個消費者群體作為一個不同的實體運作。如果兩個消費者屬於不同的消費者群組,那麼他們都會收到該主題的所有訊息。這是因為Kafka 將每個消費者群組視為單獨的訂閱者。
如果兩個消費者屬於同一個消費者群組並訂閱具有多個分區的主題, Kafka將確保**每個消費者從一組唯一的分區中讀取資料**。這是為了允許並發處理訊息。
Kafka 確保在一個消費者群組內,沒有兩個消費者讀取相同的訊息,因此每個群組中每個訊息僅處理一次。
以下的程式碼適用於消費者消費來自同一主題的訊息:
Properties consumerProperties = new Properties();
consumerProperties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, KAFKA_CONTAINER.getBootstrapServers());
consumerProperties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, LongDeserializer.class.getName());
consumerProperties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, JacksonDeserializer.class.getName());
consumerProperties.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
consumerProperties.put(Config.CONSUMER_VALUE_DESERIALIZER_SERIALIZED_CLASS, UserEvent.class);
consumerProperties.put(ConsumerConfig.GROUP_ID_CONFIG, "test-group");
consumer = new KafkaConsumer<>(consumerProperties);
consumer.subscribe(Collections.singletonList(Config.SINGLE_PARTITION_TOPIC));
ConsumerRecords<Long, UserEvent> records = consumer.poll(TIMEOUT_WAIT_FOR_MESSAGES);
records.forEach(record -> {
UserEvent userEvent = record.value();
receivedUserEventList.add(userEvent);
logger.info("User Event ID: " + userEvent.getUserEventId());
});
在這種情況下,我們得到的輸出顯示消費者以相同的順序消費訊息,以下是輸出中的順序事件 ID:
841e593a-bca0-4dd7-9f32-35792ffc522e
9ef7b0c0-6272-4f9a-940d-37ef93c59646
0b09faef-2939-46f9-9c0a-637192e242c5
4158457a-73cc-4e65-957a-bf3f647d490a
fcf531b7-c427-4e80-90fd-b0a10bc096ca
23ed595c-2477-4475-a4f4-62f6cbb05c41
3a36fb33-0850-424c-81b1-dafe4dc3bb18
10bca2be-3f0e-40ef-bafc-eaf055b4ee26
d48dcd66-799c-4645-a977-fa68414ca7c9
7a70bfde-f659-4c34-ba75-9b43a5045d39
2.4.多分區訊息排序
對於具有多個分區的主題,消費者和生產者配置是相同的。唯一的區別是訊息所在的主題和分區,生產者將訊息發送到主題“ multi_partition_topic'
:
Future<RecordMetadata> future = producer.send(new ProducerRecord<>(Config.MULTI_PARTITION_TOPIC, sequenceNumber, userEvent));
sentUserEventList.add(userEvent);
RecordMetadata metadata = future.get();
logger.info("User Event ID: " + userEvent.getUserEventId() + ", Partition : " + metadata.partition());
消費者消費來自同一主題的訊息:
consumer.subscribe(Collections.singletonList(Config.MULTI_PARTITION_TOPIC));
ConsumerRecords<Long, UserEvent> records = consumer.poll(TIMEOUT_WAIT_FOR_MESSAGES);
records.forEach(record -> {
UserEvent userEvent = record.value();
receivedUserEventList.add(userEvent);
logger.info("User Event ID: " + userEvent.getUserEventId());
});
生產者輸出列出了事件 ID 及其各自的分區,如下所示:
939c1760-140e-4d0c-baa6-3b1dd9833a7d, 0
47fdbe4b-e8c9-4b30-8efd-b9e97294bb74, 4
4566a4ec-cae9-4991-a8a2-d7c5a1b3864f, 4
4b061609-aae8-415f-94d7-ae20d4ef1ca9, 3
eb830eb9-e5e9-498f-8618-fb5d9fc519e4, 2
9f2a048f-eec1-4c68-bc33-c307eec7cace, 1
c300f25f-c85f-413c-836e-b9dcfbb444c1, 0
c82efad1-6287-42c6-8309-ae1d60e13f5e, 4
461380eb-4dd6-455c-9c92-ae58b0913954, 4
43bbe38a-5c9e-452b-be43-ebb26d58e782, 3
對於消費者來說,輸出將顯示消費者沒有以相同的順序消費訊息。輸出中的事件 ID 如下:
939c1760-140e-4d0c-baa6-3b1dd9833a7d
47fdbe4b-e8c9-4b30-8efd-b9e97294bb74
4566a4ec-cae9-4991-a8a2-d7c5a1b3864f
c82efad1-6287-42c6-8309-ae1d60e13f5e
461380eb-4dd6-455c-9c92-ae58b0913954
eb830eb9-e5e9-498f-8618-fb5d9fc519e4
4b061609-aae8-415f-94d7-ae20d4ef1ca9
43bbe38a-5c9e-452b-be43-ebb26d58e782
c300f25f-c85f-413c-836e-b9dcfbb444c1
9f2a048f-eec1-4c68-bc33-c307eec7cace
3. 訊息排序策略
3.1.使用單一分區
我們可以在 Kafka 中使用單一分區,如前面使用'single_partition_topic'
的範例所示,這可以確保訊息的排序。然而,這種方法也有其弊端:
- 吞吐量限制:假設我們在一家繁忙的披薩店。如果我們只有一名廚師(生產者)和一名服務員(消費者)在一張桌子(分區)上工作,那麼在事情開始備份之前,他們只能提供這麼多的披薩。在 Kafka 的世界中,當我們處理大量訊息時,堅持使用單一分區就像單表場景一樣。在大容量場景中,單一分割區成為瓶頸,並且訊息處理速率受到限制,因為一次只有一個生產者和一個消費者可以對單一分割區進行操作。
- 減少並行性:在上面的範例中,如果我們有多個廚師(生產者)和服務員(消費者)在多個表格(分區)上工作,那麼完成的訂單數量就會增加。 Kafka 的優勢在於跨多個分區的並行處理。如果只有一個分區,這一優勢就會喪失,導致順序處理並進一步限制訊息流
本質上,雖然單一分區保證了順序,但它是以降低吞吐量為代價的。
3.2.帶有時間視窗緩衝的外部排序
在這種方法中,生產者會用全域序號標記每個訊息。多個消費者實例同時消費來自不同分區的訊息,並使用這些序號對訊息進行重新排序,以確保全域順序。
在具有多個生產者的現實場景中,我們將透過可跨所有生產者程序存取的共享資源來管理全域序列,例如資料庫序列或分散式計數器。這確保了序號在所有訊息中都是唯一的並且是有序的,無論哪個生產者發送它們:
for (long sequenceNumber = 1; sequenceNumber <= 10 ; sequenceNumber++) {
UserEvent userEvent = new UserEvent(UUID.randomUUID().toString());
userEvent.setEventNanoTime(System.nanoTime());
userEvent.setGlobalSequenceNumber(sequenceNumber);
Future<RecordMetadata> future = producer.send(new ProducerRecord<>(Config.MULTI_PARTITION_TOPIC, sequenceNumber, userEvent));
sentUserEventList.add(userEvent);
RecordMetadata metadata = future.get();
logger.info("User Event ID: " + userEvent.getUserEventId() + ", Partition : " + metadata.partition());
}
在消費者方面,我們將訊息分組到時間視窗中,然後按順序處理它們。在特定時間範圍內到達的訊息我們將其一起批處理,一旦視窗過去,我們就會處理該批次。這確保了該時間範圍內的消息按順序處理,即使它們在視窗內的不同時間到達。消費者在處理之前緩衝訊息並根據序號對它們重新排序。我們需要確保訊息按照正確的順序處理,為此,消費者應該有一個緩衝期,在處理緩衝的訊息之前多次輪詢訊息,並且這個緩衝期足夠長以應對潛在的訊息排序問題:
consumer.subscribe(Collections.singletonList(Config.MULTI_PARTITION_TOPIC));
List<UserEvent> buffer = new ArrayList<>();
long lastProcessedTime = System.nanoTime();
ConsumerRecords<Long, UserEvent> records = consumer.poll(TIMEOUT_WAIT_FOR_MESSAGES);
records.forEach(record -> {
buffer.add(record.value());
});
while (!buffer.isEmpty()) {
if (System.nanoTime() - lastProcessedTime > BUFFER_PERIOD_NS) {
processBuffer(buffer, receivedUserEventList);
lastProcessedTime = System.nanoTime();
}
records = consumer.poll(TIMEOUT_WAIT_FOR_MESSAGES);
records.forEach(record -> {
buffer.add(record.value());
});
}
void processBuffer(List buffer, List receivedUserEventList) {
Collections.sort(buffer);
buffer.forEach(userEvent -> {
receivedUserEventList.add(userEvent);
logger.info("Processing message with Global Sequence number: " + userEvent.getGlobalSequenceNumber() + ", User Event Id: " + userEvent.getUserEventId());
});
buffer.clear();
}
每個事件 ID 都出現在輸出中及其對應的分區旁邊,如下所示:
d6ef910f-2e65-410d-8b86-fa0fc69f2333, 0
4d6bfe60-7aad-4d1b-a536-cc735f649e1a, 4
9b68dcfe-a6c8-4cca-874d-cfdda6a93a8f, 4
84bd88f5-9609-4342-a7e5-d124679fa55a, 3
55c00440-84e0-4234-b8df-d474536e9357, 2
8fee6cac-7b8f-4da0-a317-ad38cc531a68, 1
d04c1268-25c1-41c8-9690-fec56397225d, 0
11ba8121-5809-4abf-9d9c-aa180330ac27, 4
8e00173c-b8e1-4cf7-ae8c-8a9e28cfa6b2, 4
e1acd392-db07-4325-8966-0f7c7a48e3d3, 3
具有全域序號和事件 ID 的消費者輸出:
1, d6ef910f-2e65-410d-8b86-fa0fc69f2333
2, 4d6bfe60-7aad-4d1b-a536-cc735f649e1a
3, 9b68dcfe-a6c8-4cca-874d-cfdda6a93a8f
4, 84bd88f5-9609-4342-a7e5-d124679fa55a
5, 55c00440-84e0-4234-b8df-d474536e9357
6, 8fee6cac-7b8f-4da0-a317-ad38cc531a68
7, d04c1268-25c1-41c8-9690-fec56397225d
8, 11ba8121-5809-4abf-9d9c-aa180330ac27
9, 8e00173c-b8e1-4cf7-ae8c-8a9e28cfa6b2
10, e1acd392-db07-4325-8966-0f7c7a48e3d3
3.3.帶有緩衝的外部排序的注意事項
在這種方法中,每個消費者實例緩衝訊息並根據訊息的序號按順序處理它們。但是,有一些注意事項:
- 緩衝區大小:緩衝區的大小可以根據傳入訊息的數量而增加。在按序號優先嚴格排序的實作中,我們可能會看到緩衝區顯著成長,特別是在訊息傳遞存在延遲的情況下。例如,如果我們每分鐘處理 100 個訊息,但由於延遲突然收到 200 個訊息,則緩衝區將意外增長。因此,我們必須有效地管理緩衝區大小,並準備好策略,以防它超出預期限制
- 延遲:當我們緩衝訊息時,我們本質上是讓它們在處理之前等待一段時間(引入延遲)。一方面,它幫助我們維持秩序;另一方面,它會減慢整個過程。這一切都是為了在維持秩序和最小化延遲之間找到適當的平衡
- 失敗:如果消費者失敗,我們可能會失去緩衝的訊息,為了防止這種情況,我們可能需要定期保存緩衝區的狀態
- 遲到的訊息:在其視窗後處理後到達的訊息將不按順序排列。根據用例,我們可能需要策略來處理或丟棄此類訊息
- 狀態管理:如果處理涉及有狀態操作,我們將需要跨視窗管理和持久狀態的機制。
- 資源利用:在緩衝區中保存大量訊息需要記憶體。我們需要確保有足夠的資源來處理這個問題,特別是當訊息在緩衝區中停留較長時間時
3.4.冪等生產者
Kafka 的冪等生產者功能旨在精確地傳遞訊息一次,從而防止任何重複。這在生產者可能由於網路錯誤或其他暫時性故障而重試發送訊息的情況下至關重要。雖然冪等性的主要目標是防止訊息重複,但它間接影響訊息排序。 Kafka 使用兩個東西來實現冪等性:生產者 ID (PID) 和序號,序號充當冪等鍵,並且在特定分區的上下文中是唯一的。
- 序號:Kafka 為生產者發送的每個訊息分配序號。這些序號對於每個分區都是唯一的,確保訊息在由生產者以特定序列發送時,在被 Kafka 接收後以相同的順序寫入特定分區內。序號保證單一分區內的順序。但是,當向多個分區產生訊息時,跨分區沒有全域順序保證。例如,如果生產者分別將訊息 M1、M2 和 M3 傳送到分區 P1、P2 和 P3,則每個訊息都會在其分區內收到唯一的序號。但是,它不保證這些分區之間的相對消費順序
- 生產者 ID (PID):啟用冪等性時,代理程式會為每個生產者指派唯一的生產者 ID (PID)。此 PID 與序號結合,使 Kafka 能夠識別並丟棄生產者重試產生的任何重複訊息
借助序號,Kafka 透過按照訊息產生的順序將訊息寫入分區來保證訊息排序,並使用 PID 和冪等性功能防止重複。要啟用冪等生產者,我們需要在生產者的配置中將“enable.idempotence”
屬性設為 true:
props.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, "true");
4. 生產者和消費者的關鍵配置
Kafka 生產者和消費者的一些關鍵配置會影響訊息排序和吞吐量。
4.1.生產者配置
-
MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION
:如果我們發送一堆訊息,那麼 Kafka 中的此設定有助於確定我們可以發送多少訊息而無需等待「已讀」收據。如果我們將其設為高於 1 而不打開冪等性,那麼如果我們必須重新發送訊息,最終可能會擾亂訊息的順序。但是,如果我們打開冪等性,即使我們一次發送一堆訊息,Kafka 也會保持訊息順序。對於超級嚴格的順序,例如確保在發送下一則訊息之前讀取每條訊息,我們應該將此值設為1。如果我們想要優先考慮速度和完美的順序,那麼我們可以設定為5,但這可能會引入訂購問題。 -
BATCH_SIZE_CONFIG and LINGER_MS_CONFIG
:Kafka 控制預設批量大小(以位元組為單位),旨在將同一分區的記錄分組為更少的請求,以獲得更好的效能。如果我們將此限制設置得太低,我們將派出大量小團體,這可能會減慢我們的速度。但如果我們將其設置得太高,則可能無法充分利用我們的內存。如果群組尚未滿,Kafka 可以在發送群組之前稍等一下。此等待時間由 LINGER_MS_CONFIG 控制。如果更多訊息的速度足以填滿我們設定的限制,它們會立即發送,但如果沒有,Kafka 不會繼續等待 - 它會在時間到時發送我們擁有的任何訊息。這就像平衡速度和效率,確保我們一次發送足夠的訊息,而不會造成不必要的延遲。
props.put(ProducerConfig.MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION, "1");
props.put(ProducerConfig.BATCH_SIZE_CONFIG, "16384");
props.put(ProducerConfig.LINGER_MS_CONFIG, "5");
4.2.消費者配置
-
MAX_POLL_RECORDS_CONFIG
:這是我們的 Kafka 消費者每次要求資料時取得多少記錄的限制。如果我們將這個數字設定得較高,我們就可以一次處理大量數據,從而提高吞吐量。但有個問題——我們拿的越多,保持一切井然有序可能就越困難。因此,我們需要找到高效但又不至於不知所措的最佳點 -
FETCH_MIN_BYTES_CONFIG
:如果我們將此數字設定得較高,Kafka 會等待,直到有足夠的資料來滿足我們的最小位元組數,然後再將其發送出去。這可能意味著更少的行程(或獲取),這對於提高效率非常有用。但如果我們很著急並且想要快速獲取數據,我們可能會將此數字設置得較低,這樣 Kafka 就會更快地向我們發送它擁有的數據。例如,如果我們的消費者應用程式是資源密集型的或需要維護嚴格的訊息順序,尤其是多線程,則較小的批次可能會有益 -
FETCH_MAX_WAIT_MS_CONFIG
:這將決定我們的消費者等待 Kafka 收集足夠資料以滿足我們的 FETCH_MIN_BYTES_CONFIG 需要多長時間。如果我們將此時間設定得較高,我們的消費者願意等待更長時間,有可能一次獲得更多數據。但如果我們很趕時間,我們會將其設定得較低,這樣我們的消費者就能更快地獲取數據,即使數據沒有那麼多。這是等待更大的收穫和讓事情快速進展之間的平衡行為
props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, "500");
props.put(ConsumerConfig.FETCH_MIN_BYTES_CONFIG, "1");
props.put(ConsumerConfig.FETCH_MAX_WAIT_MS_CONFIG, "500");
5. 結論
在本文中,我們深入研究了 Kafka 中訊息排序的複雜性。我們探討了這些挑戰並提出了應對這些挑戰的策略。無論是透過單一分區、帶有時間視窗緩衝的外部排序,還是冪等生產者,Kafka 都提供客製化解決方案來滿足訊息排序的需求。
與往常一樣,範例的原始程式碼可以在GitHub上找到。