管理 Kafka 消費者組
一、簡介
消費者群組允許多個消費者讀取相同主題,從而幫助創建更具可擴展性的 Kafka 應用程式。
在本教程中,我們將了解消費者群組以及他們如何重新平衡消費者之間的分區。
2.什麼是消費群?
消費者組是與一個或多個主題相關的一組唯一消費者。每個消費者可以從零個、一個或多個分區讀取資料。此外,每個分區在給定時間只能分配給單一消費者。分區分配隨著群組成員的變化而變化。這稱為組再平衡。
消費者組是Kafka應用程式的重要組成部分。這允許對相似的消費者進行分組,並使他們可以從分區主題中並行讀取。因此,它提高了 Kafka 應用程式的效能和可擴展性。
2.1.小組協調員和小組領導
當我們實例化一個消費者群組時,Kafka 也會創建群組協調器。組協調員定期接收來自消費者的請求,稱為心跳。如果消費者停止發送心跳,協調器就會認為消費者已經離開組或崩潰了。這是分區重新平衡的可能觸發因素之一。
第一個請求組協調員加入組的消費者成為組長。當因任何原因發生重新平衡時,組領導者會從組協調員收到組員清單。然後,群組領導者使用在partition.assignment.strategy
配置中設定的可自訂策略在該清單中的消費者之間重新分配分區。
2.2.承諾抵銷額
Kafka 使用提交的偏移量來追蹤從主題讀取的最後位置。提交的偏移量是消費者確認已成功處理的主題中的位置。換句話說,它是自己和其他消費者在後續輪次中讀取事件的起點。
Kafka 將所有分區的提交偏移量儲存在名為__consumer_offsets
的內部主題中。我們可以安全地信任它的訊息,因為主題對於複製的代理來說是持久且容錯的。
2.3.分區重新平衡
分區重新平衡將分區所有權從一個使用者變更為另一個使用者。當新的消費者加入群組或群組的消費者成員崩潰或取消訂閱時,Kafka會自動執行重新平衡。
為了提高可擴展性,當新的消費者加入該群組時,Kafka 與新添加的消費者公平地共享其他消費者的分區。此外,當消費者崩潰時,必須將其分區分配給群組中的剩餘消費者,以避免丟失任何未處理的訊息。
分區重新平衡使用__consumer_offsets
主題使消費者開始從正確的位置讀取重新分配的分區。
在重新平衡期間,消費者無法消費訊息。換句話說,在重新平衡完成之前,代理將變得不可用。此外,消費者會遺失其狀態並需要重新計算其快取值。分區重新平衡期間的不可用和快取重新計算使得事件消耗更慢。
3. 設定應用程式
在本節中,我們將配置基礎知識以啟動並執行 Spring Kafka 應用程式。
3.1.建立基本配置
首先,讓我們來配置主題及其分區:
@Configuration
public class KafkaTopicConfiguration {
@Value(value = "${spring.kafka.bootstrap-servers}")
private String bootstrapAddress;
public KafkaAdmin kafkaAdmin() {
Map<String, Object> configs = new HashMap<>();
configs.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapAddress);
return new KafkaAdmin(configs);
}
public NewTopic celciusTopic() {
return TopicBuilder.name("topic-1")
.partitions(2)
.build();
}
}
上面的配置很簡單。我們只是配置一個名為topic-1
的新主題,其中包含兩個分區。
現在,讓我們來配置生產者:
@Configuration
public class KafkaProducerConfiguration {
@Value(value = "${spring.kafka.bootstrap-servers}")
private String bootstrapAddress;
@Bean
public ProducerFactory<String, Double> kafkaProducer() {
Map<String, Object> configProps = new HashMap<>();
configProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapAddress);
configProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
configProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, DoubleSerializer.class);
return new DefaultKafkaProducerFactory<>(configProps);
}
@Bean
public KafkaTemplate<String, Double> kafkaProducerTemplate() {
return new KafkaTemplate<>(kafkaProducer());
}
}
在上面的 Kafka 生產者配置中,我們設定代理位址和它們用來寫入訊息的序列化器。
最後,讓我們來配置消費者:
@Configuration
public class KafkaConsumerConfiguration {
@Value(value = "${spring.kafka.bootstrap-servers}")
private String bootstrapAddress;
@Bean
public ConsumerFactory<String, Double> kafkaConsumer() {
Map<String, Object> props = new HashMap<>();
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapAddress);
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, DoubleDeserializer.class);
return new DefaultKafkaConsumerFactory<>(props);
}
@Bean
public ConcurrentKafkaListenerContainerFactory<String, Double> kafkaConsumerContainerFactory() {
ConcurrentKafkaListenerContainerFactory<String, Double> factory = new ConcurrentKafkaListenerContainerFactory<>();
factory.setConsumerFactory(kafkaConsumer());
return factory;
}
}
3.2.設定消費者
在我們的演示應用程式中,我們將從屬於topic-1
中名為group-1
的同一組的兩個消費者開始:
@Service
public class MessageConsumerService {
@KafkaListener(topics = "topic-1", groupId = "group-1")
public void consumer0(ConsumerRecord<?, ?> consumerRecord) {
trackConsumedPartitions("consumer-0", consumerRecord);
}
@KafkaListener(topics = "topic-1", groupId = "group-1")
public void consumer1(ConsumerRecord<?, ?> consumerRecord) {
trackConsumedPartitions("consumer-1", consumerRecord);
}
}
MessageConsumerService
類別使用@KafkaListener
註解註冊兩個消費者來監聽group-1
1 內的topic-
-1。
現在,我們也在MessageConsumerService
類別中定義一個欄位和一個方法來追蹤所使用的分區:
Map<String, Set<Integer>> consumedPartitions = new ConcurrentHashMap<>();
private void trackConsumedPartitions(String key, ConsumerRecord<?, ?> record) {
consumedPartitions.computeIfAbsent(key, k -> new HashSet<>());
consumedPartitions.computeIfPresent(key, (k, v) -> {
v.add(record.partition());
return v;
});
}
在上面的程式碼中,我們使用ConcurrentHashMap
將每個消費者名稱對應到該消費者使用的所有分區的HashSet
。
4. 消費者離開時可視化分區重新平衡
現在我們已經設定了所有配置並註冊了消費者,我們可以想像當其中一個消費者離開group-1
時 Kafka 會做什麼。為此,我們定義使用嵌入式代理程式的 Kafka 整合測試的框架:
@SpringBootTest(classes = ManagingConsumerGroupsApplicationKafkaApp.class)
@EmbeddedKafka(partitions = 2, brokerProperties = {"listeners=PLAINTEXT://localhost:9092", "port=9092"})
public class ManagingConsumerGroupsIntegrationTest {
private static final String CONSUMER_1_IDENTIFIER = "org.springframework.kafka.KafkaListenerEndpointContainer#1";
private static final int TOTAL_PRODUCED_MESSAGES = 50000;
private static final int MESSAGE_WHERE_CONSUMER_1_LEAVES_GROUP = 10000;
@Autowired
KafkaTemplate<String, Double> kafkaTemplate;
@Autowired
KafkaListenerEndpointRegistry kafkaListenerEndpointRegistry;
@Autowired
MessageConsumerService consumerService;
}
在上面的程式碼中,我們注入了必要的bean來產生和消耗訊息: kafkaTemplate
和consumerService
。我們也注入了 bean kafkaListenerEndpointRegistry
來操作註冊的消費者。
最後,我們定義了將在我們的測試案例中使用的三個常數。
現在,讓我們定義測試案例方法:
@Test
public void givenContinuousMessageFlow_whenAConsumerLeavesTheGroup_thenKafkaTriggersPartitionRebalance() throws InterruptedException {
int currentMessage = 0;
do {
kafkaTemplate.send("topic-1", RandomUtils.nextDouble(10.0, 20.0));
currentMessage++;
if (currentMessage == MESSAGE_WHERE_CONSUMER_1_LEAVES_GROUP) {
String containerId = kafkaListenerEndpointRegistry.getListenerContainerIds()
.stream()
.filter(a -> a.equals(CONSUMER_1_IDENTIFIER))
.findFirst()
.orElse("");
MessageListenerContainer container = kafkaListenerEndpointRegistry.getListenerContainer(containerId);
Thread.sleep(2000);
Objects.requireNonNull(container).stop();
kafkaListenerEndpointRegistry.unregisterListenerContainer(containerId);
}
} while (currentMessage != TOTAL_PRODUCED_MESSAGES);
Thread.sleep(2000);
assertEquals(1, consumerService.consumedPartitions.get("consumer-1").size());
assertEquals(2, consumerService.consumedPartitions.get("consumer-0").size());
}
在上面的測試中,我們創建了一個訊息流,在某個時刻,我們刪除了一個消費者,這樣 Kafka 就會將其分區重新分配給剩餘的消費者。讓我們分解一下邏輯以使其更加透明:
- 主循環使用
kafkaTemplate
使用 Apache Commons 的RandomUtils
產生 50,000 個隨機數事件。當產生任意數量的消息(在我們的例子中為 10,000 條)時,我們會停止並從代理中取消註冊一個消費者。 - 要取消註冊消費者,我們首先使用流在容器中搜尋匹配的消費者,並使用
getListenerContainer()
方法檢索它。然後,我們呼叫stop()
來停止container
Spring 元件的執行。最後,我們呼叫unregisterListenerContainer()
以程式設計方式從 Kafka Broker 取消註冊與container
變數關聯的偵聽器。
在討論測試斷言之前,我們先來看看 Kafka 在測試執行過程中產生的幾行日誌。
要看到的第一個重要行是顯示consumer-1
向群組協調員發出的LeaveGroup
請求的行:
INFO oakcciConsumerCoordinator - [Consumer clientId=consumer-group-1-1, groupId=group-1] Member consumer-group-1-1-4eb63bbc-336d-44d6-9d41-0a862029ba95 sending LeaveGroup request to coordinator localhost:9092
然後,群組協調器會自動觸發重新平衡並顯示背後的原因:
INFO k.coordinator.group.GroupCoordinator - [GroupCoordinator 0]: Preparing to rebalance group group-1 in state PreparingRebalance with old generation 2 (__consumer_offsets-4) (reason: Removing member consumer-group-1-1-4eb63bbc-336d-44d6-9d41-0a862029ba95 on LeaveGroup)
回到我們的測試,我們將斷言分區重新平衡正確發生。由於我們註銷了以1
結尾的消費者,因此它的分區應該重新分配給剩餘的消費者,即consumer-0
。因此,我們使用追蹤的消費記錄映射來檢查consumer-1
僅從一個分區消費,而consumer-0
則從兩個分區消費。
4. 有用的消費者配置
現在,我們來討論一些影響分區重新平衡的消費者配置以及為它們設定特定值的權衡。
4.1.會話超時和心跳頻率
session.timeout.ms
參數指示群組協調器在觸發分區重新平衡之前可以等待消費者發送心跳的最長時間(以毫秒為單位)。除了session.timeout.ms
之外, heartbeat.interval.ms
指示消費者向群組協調器發送心跳的頻率(以毫秒為單位)。
我們應該一起修改消費者逾時和心跳頻率,以便heartbeat.interval.ms
始終低於[**session.timeout.ms**](https://docs.confluent.io/platform/current/installation/configuration/consumer-configs.html#heartbeat-interval-ms) .
這是因為我們不想讓消費者在發送心跳之前因超時而死亡。通常,我們將心跳間隔設定為會話逾時的 33%,以確保在消費者死亡之前發送不只一個心跳。
預設消費者會話逾時設定為 45 秒。只要我們了解修改它的權衡,我們就可以修改該值。
當我們將會話逾時設定為低於預設值時,我們可以提高消費者群組從故障中恢復的速度,從而提高群組可用性。但是,在0.10.1.0
之前的 Kafka 版本中,如果消費者在消費訊息的時間超過會話逾時時間時,主執行緒被阻塞,消費者將無法發送心跳。因此,消費者被認為已死亡,並且組協調器觸發了不必要的分區重新平衡。這個問題在KIP-62中得到了修復,引入了一個僅發送心跳的後台線程。
如果我們為會話逾時設定更高的值,我們就會無法更快地偵測到故障。但是,這可能會解決上面提到的早於o.10.1.0
的 Kafka 版本的不必要的分區重新平衡問題。
4.2.最大輪詢間隔時間
另一個配置是**max.poll.interval.ms,
指示代理可以等待空閒消費者的最長時間**。經過該時間後,消費者將停止發送心跳,直到達到配置的會話逾時並離開群組。 max.poll.interval.ms
的預設等待時間為五分鐘。
如果我們為max.poll.interval.ms
設定更高的值,我們就會為消費者提供更多的空閒空間,這可能有助於避免重新平衡。但是,如果沒有消息可供消費,增加該時間也可能會增加空閒消費者的數量。這在低吞吐量環境中可能是一個問題,因為消費者可能會閒置更長時間,從而增加基礎設施成本。
5. 結論
在本文中,我們了解了小組領導者和小組協調員角色的基礎知識。我們也研究了 Kafka 如何管理消費者群組和分區。
我們在實踐中已經看到,當其中一個消費者離開群組時,Kafka 如何自動重新平衡群組內的分區。
必須了解 Kafka 何時觸發分區重新平衡並相應地調整消費者配置。
與往常一樣,本文的源代碼可以在 GitHub 上取得。