將資料傳送到Kafka中的特定分區
一、簡介
Apache Kafka 是一個分散式串流平台,擅長處理大量即時資料流。 Kafka將資料組織成主題,並進一步將主題劃分為分區。每個分區充當獨立通道,從而實現並行處理和容錯。
在本教程中,我們深入研究將資料傳送到 Kafka 中特定分區的技術。我們將探討與此方法相關的好處、實施方法和潛在挑戰。
2.了解Kafka分區
現在,讓我們探討一下 Kafka 分割區的基本概念。
2.1.什麼是 Kafka 分區
當生產者向 Kafka 主題發送訊息時,Kafka 使用指定的分區策略將這些訊息組織到分區中。分區是表示線性有序訊息序列的基本單位。一旦產生訊息,就會根據所選的分區策略將其指派給特定分區。隨後,該訊息將附加到該分區內日誌的末尾。
2.2.並行性和消費者組
一個Kafka主題可以分成多個分區,並且可以為一個消費者群組分配這些分區的子集。組內的每個消費者獨立於其分配的分區處理訊息。這種平行處理機制增強了整體吞吐量和可擴展性,使 Kafka 能夠有效率地處理大量資料。
2.3.訂購和加工保證
在單一分區內,Kafka 確保按照接收訊息的順序處理訊息。這保證了依賴訊息順序的應用程式(例如金融交易或事件日誌)的順序處理。但請注意,由於網路延遲和其他操作考慮因素,收到的順序訊息可能與最初發送的順序不同。
跨不同分區,Kafka 不強制保證順序。來自不同分區的訊息可以同時處理,從而引入事件順序變更的可能性。在設計依賴嚴格訊息排序的應用程式時,必須考慮此特性。
2.4.容錯和高可用性
分區也有助於 Kafka 卓越的容錯能力。每個分區可以跨多個代理進行複製。當broker發生故障時,複製的分區仍然可以被訪問,並確保對資料的連續訪問。
Kafka叢集可以無縫地將消費者重新導向到健康的經紀人,以保持資料可用性和系統的高可靠性。
3. 為什麼要發送資料到特定分區
在本節中,我們將探討向特定分區發送資料的原因。
3.1.數據親和力
資料親和性是指在同一分區內有意將相關資料分組。透過將相關資料傳送到特定分區,我們確保它們被一起處理,從而提高處理效率。
例如,考慮這樣一個場景:我們可能希望確保客戶的訂單駐留在同一分區中以進行訂單追蹤和分析。確保來自特定客戶的所有訂單最終都位於同一分區中,可以簡化追蹤和分析流程。
3.2.負載平衡
此外,跨分區均勻分佈資料有助於確保最佳的資源利用率。跨分區均勻分佈資料有助於優化 Kafka 叢集內的資源利用率。透過根據負載考慮將資料傳送到分區,我們可以防止資源瓶頸並確保每個分區接收到可管理且平衡的工作負載。
3.3.優先順序
在某些情況下,並非所有資料都具有相同的優先順序或緊迫性。 Kafka 的分區功能可將關鍵資料導向至專用分區以進行快速處理,從而確定關鍵資料的優先順序。與較不重要的訊息相比,這種優先順序可確保高優先權訊息得到及時關注和更快的處理。
4. 發送到特定分區的方法
Kafka 提供了各種將訊息分配給分區的策略,提供資料分佈和處理的靈活性。以下是一些可用於將訊息傳送到特定分區的常用方法。
4.1.黏性分區器
在 Kafka 2.4 及更高版本中,黏性分區器旨在將沒有鍵的訊息保留在同一分區中。然而,這種行為並不是絕對的,並且與批次設定(例如batch.size
和linger.ms
相互作用。
為了優化訊息傳遞,Kafka 在將訊息發送到代理之前將訊息分組。 batch.size
設定(預設 16,384 位元組)控制最大批處理大小,影響訊息在黏性分區程式下的相同分割區中保留的時間。
linger.ms
配置(預設值:0 毫秒)在發送批次之前引入延遲,可能會延長沒有密鑰的訊息的黏性行為。
在下列測試案例中,假設預設批次配置保持不變。我們將發送三個訊息,而無需明確分配密鑰。我們應該期望它們最初被分配到同一個分區:
kafkaProducer.send("default-topic", "message1");
kafkaProducer.send("default-topic", "message2");
kafkaProducer.send("default-topic", "message3");
await().atMost(2, SECONDS)
.until(() -> kafkaMessageConsumer.getReceivedMessages()
.size() >= 3);
List<ReceivedMessage> records = kafkaMessageConsumer.getReceivedMessages();
Set<Integer> uniquePartitions = records.stream()
.map(ReceivedMessage::getPartition)
.collect(Collectors.toSet());
Assert.assertEquals(1, uniquePartitions.size());
4.2.基於鍵的方法
在基於密鑰的方法中, Kafka將具有相同密鑰的訊息定向到同一分區,從而優化了相關資料的處理。這是透過雜湊函數實現的,確保訊息鍵到分區的確定性映射。
在此測試案例中,具有相同鍵partitionA
訊息應始終落在同一分區中。讓我們用以下程式碼片段來說明基於鍵的分區:
kafkaProducer.send("order-topic", "partitionA", "critical data");
kafkaProducer.send("order-topic", "partitionA", "more critical data");
kafkaProducer.send("order-topic", "partitionB", "another critical message");
kafkaProducer.send("order-topic", "partitionA", "another more critical data");
await().atMost(2, SECONDS)
.until(() -> kafkaMessageConsumer.getReceivedMessages()
.size() >= 4);
List<ReceivedMessage> records = kafkaMessageConsumer.getReceivedMessages();
Map<String, List<ReceivedMessage>> messagesByKey = groupMessagesByKey(records);
messagesByKey.forEach((key, messages) -> {
int expectedPartition = messages.get(0)
.getPartition();
for (ReceivedMessage message : messages) {
assertEquals("Messages with key '" + key + "' should be in the same partition", message.getPartition(), expectedPartition);
}
});
此外,使用基於金鑰的方法,共用相同金鑰的訊息將按照它們在特定分區中產生的順序一致地接收。這保證了分區內訊息順序的保存,尤其是相關訊息。
在這個測試案例中,我們以特定的順序產生帶有鍵partitionA
訊息,並且測試主動驗證這些訊息在分區內是否以相同的順序接收:
kafkaProducer.send("order-topic", "partitionA", "message1");
kafkaProducer.send("order-topic", "partitionA", "message3");
kafkaProducer.send("order-topic", "partitionA", "message4");
await().atMost(2, SECONDS)
.until(() -> kafkaMessageConsumer.getReceivedMessages()
.size() >= 3);
List<ReceivedMessage> records = kafkaMessageConsumer.getReceivedMessages();
StringBuilder resultMessage = new StringBuilder();
records.forEach(record -> resultMessage.append(record.getMessage()));
String expectedMessage = "message1message3message4";
assertEquals("Messages with the same key should be received in the order they were produced within a partition",
expectedMessage, resultMessage.toString());
4.3.自訂分區
為了進行細粒度控制,Kafka 允許定義自訂分區器。這些類別實現了Partitioner
接口,使我們能夠根據訊息內容、元資料或其他因素編寫邏輯來確定目標分區。
在本節中,我們將在將訂單分派到 Kafka 主題時根據客戶類型建立自訂分區邏輯。具體來說,高級客戶訂單將被導向到一個分區,而普通客戶訂單將找到另一個分區。
首先,我們建立一個名為CustomPartitioner
的類,繼承自 Kafka Partitioner
介面。在此類中,我們使用自訂邏輯來重寫partition()
方法來確定每個訊息的目標分區:
public class CustomPartitioner implements Partitioner {
private static final int PREMIUM_PARTITION = 0;
private static final int NORMAL_PARTITION = 1;
@Override
public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster) {
String customerType = extractCustomerType(key.toString());
return "premium".equalsIgnoreCase(customerType) ? PREMIUM_PARTITION : NORMAL_PARTITION;
}
private String extractCustomerType(String key) {
String[] parts = key.split("_");
return parts.length > 1 ? parts[1] : "normal";
}
// more methods
}
接下來,要在 Kafka 中套用此自訂分區器,我們需要在生產者設定中設定PARTITIONER_CLASS_CONFIG
屬性。 Kafka 將使用此分區器根據CustomPartitioner
類別中定義的邏輯來確定每個訊息的分區。
setProducerToUseCustomPartitioner()
方法用於設定 Kafka 生產者以使用CustomPartitioner:
private KafkaTemplate<String, String> setProducerToUseCustomPartitioner() {
Map<String, Object> producerProps = KafkaTestUtils.producerProps(embeddedKafkaBroker.getBrokersAsString());
producerProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
producerProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
producerProps.put(ProducerConfig.PARTITIONER_CLASS_CONFIG, CustomPartitioner.class.getName());
DefaultKafkaProducerFactory<String, String> producerFactory = new DefaultKafkaProducerFactory<>(producerProps);
return new KafkaTemplate<>(producerFactory);
}
然後,我們建立一個測試案例,以確保自訂分區邏輯正確地將高級和普通客戶訂單路由到各自的分區:
KafkaTemplate<String, String> kafkaTemplate = setProducerToUseCustomPartitioner();
kafkaTemplate.send("order-topic", "123_premium", "Order 123, Premium order message");
kafkaTemplate.send("order-topic", "456_normal", "Normal order message");
await().atMost(2, SECONDS)
.until(() -> kafkaMessageConsumer.getReceivedMessages()
.size() >= 2);
consumer.assign(Collections.singletonList(new TopicPartition("order-topic", 0)));
ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
for (ConsumerRecord<String, String> record : records) {
assertEquals("Premium order message should be in partition 0", 0, record.partition());
assertEquals("123_premium", record.key());
}
4.4.直接分區分配
當手動在主題之間遷移資料或調整跨分區的資料分佈時,直接分區分配可以幫助控制訊息放置。 Kafka 也提供使用接受分區號碼的ProductRecord
建構函式將訊息直接傳送到特定分割區的功能。透過指定分區號,我們可以明確指定每個訊息的目標分區。
在此測試案例中,我們指定send()
方法中的第二個參數來接收分區號:
kafkaProducer.send("order-topic", 0, "123_premium", "Premium order message");
kafkaProducer.send("order-topic", 1, "456_normal", "Normal order message");
await().atMost(2, SECONDS)
.until(() -> kafkaMessageConsumer.getReceivedMessages()
.size() >= 2);
List<ReceivedMessage> records = kafkaMessageConsumer.getReceivedMessages();
for (ReceivedMessage record : records) {
if ("123_premium".equals(record.getKey())) {
assertEquals("Premium order message should be in partition 0", 0, record.getPartition());
} else if ("456_normal".equals(record.getKey())) {
assertEquals("Normal order message should be in partition 1", 1, record.getPartition());
}
}
5. 從特定分區消費
要在消費者端消費 Kafka 中特定分區的數據,我們可以使用KafkaConsumer.assign()
方法指定要訂閱的分區。這可以對消耗進行細粒度的控制,但需要手動管理分區偏移。
以下是使用assign()
方法從特定分區消費訊息的範例:
KafkaTemplate<String, String> kafkaTemplate = setProducerToUseCustomPartitioner();
kafkaTemplate.send("order-topic", "123_premium", "Order 123, Premium order message");
kafkaTemplate.send("order-topic", "456_normal", "Normal order message");
await().atMost(2, SECONDS)
.until(() -> kafkaMessageConsumer.getReceivedMessages()
.size() >= 2);
consumer.assign(Collections.singletonList(new TopicPartition("order-topic", 0)));
ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
for (ConsumerRecord<String, String> record : records) {
assertEquals("Premium order message should be in partition 0", 0, record.partition());
assertEquals("123_premium", record.key());
}
6. 潛在的挑戰與考慮因素
當向特定分區發送訊息時,存在分區之間負載分佈不均勻的風險。如果用於分割區的邏輯未在所有分割區之間均勻分佈訊息,則可能會發生這種情況。此外,擴展 Kafka 叢集(涉及新增或刪除代理)可能會觸發分割區重新分配。在重新分配期間,代理可能會移動分區,這可能會破壞訊息的順序或導致暫時無法使用。
因此,我們應該使用Kafka工具或指標來定期監控每個分割區上的負載。例如,Kafka 管理用戶端和 Micrometer 可以幫助深入了解分區健康狀況和效能。我們可以使用管理客戶端來檢索有關主題、分區及其當前狀態的資訊;並使用 Micrometer 進行指標監控。
此外,預計需要主動調整分區策略或水平擴展 Kafka 集群,以有效管理特定分區上增加的負載。我們也可以考慮增加分區數量或調整鍵範圍以獲得更均勻的分佈。
七、結論
總而言之,向 Apache Kafka 中的特定分區發送訊息的能力為優化資料處理和提高整體系統效率提供了強大的可能性。
在本教程中,我們探索了將訊息定向到特定分區的各種方法,包括基於鍵的方法、自訂分區和直接分區分配。每種方法都具有獨特的優勢,使我們能夠根據應用的特定要求進行客製化。
與往常一樣,範例的原始程式碼可在 GitHub 上取得。