Kafka 中的重置消費者偏移
1. 概述
在 Apache Kafka 中,消費者偏移量是指示消費者應該從主題分區讀取的下一則訊息的指標。
通常情況下,我們不需要更改消費者應用程式中的已提交偏移量。但是,有時我們需要更改偏移量來重新處理或跳過訊息。更改偏移量的目的多種多樣,例如修復應用程式錯誤、重新處理下游故障、重建狀態或進行測試。
在本教程中,我們將學習如何在 Kafka 中重置消費者偏移量。我們將實作不同的方法,例如使用 Kafka CLI 工具、以程式方式重置消費者偏移量以及使用Kafka Admin API。
2. 使用 Kafka CLI 重置
Kafka CLI工具可以協助檢視、管理和重設消費者偏移量。這對於任何一次性更改或測試期間都非常有用。
2.1. 設定 Kafka
首先,我們將使用docker-compose.yml配置來配置一個以Kraft 模式運作的本機 Kafka 實例:
version: "3.8"
services:
kafka:
image: confluentinc/cp-kafka:7.9.0
hostname: kafka
container_name: kafka
ports:
- "9092:9092"
- "9101:9101"
expose:
- '29092'
environment:
KAFKA_NODE_ID: 1
KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: 'CONTROLLER:PLAINTEXT,PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT'
KAFKA_ADVERTISED_LISTENERS: 'PLAINTEXT://kafka:29092,PLAINTEXT_HOST://localhost:9092'
KAFKA_LISTENERS: 'PLAINTEXT://kafka:29092,CONTROLLER://kafka:29093,PLAINTEXT_HOST://0.0.0.0:9092'
KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
KAFKA_PROCESS_ROLES: 'broker,controller'
KAFKA_CONTROLLER_QUORUM_VOTERS: '1@kafka:29093'
KAFKA_INTER_BROKER_LISTENER_NAME: 'PLAINTEXT'
KAFKA_CONTROLLER_LISTENER_NAMES: 'CONTROLLER'
CLUSTER_ID: 'MkU3OEVBNTcwNTJENDM2Qk'
接下來,我們將在終端機中運行上述服務:
$ docker compose up -d
接下來,我們使用kafka-topics指令建立一個test-topic主題:
$ docker exec kafka kafka-topics --bootstrap-server
localhost:9092 --create --topic test-topic --partitions 2 --replication-factor 1
讓我們產生並接收一些測試訊息。
2.2 生產者和消費者
我們將透過執行kafka-console-producer指令產生兩個測試訊息:
$ docker exec -it kafka kafka-console-producer --bootstrap-server localhost:9092 --topic test-topic
>test1
>test2
然後,我們將執行kafka-console-consumer指令來消費這些訊息:
$ docker exec -it kafka kafka-console-consumer --bootstrap-server localhost:9092 --topic test-topic --group test-group
test1
test2
上述消費者讀取兩個訊息並提交偏移量。
2.3. 使用kafka-consumer-groups指令重置
讓我們執行kafka-consumer-groups的describe指令來確認目前已提交的偏移量:
$ docker exec kafka kafka-consumer-groups --bootstrap-server localhost:9092 --group test-group --describe
GROUP TOPIC PARTITION CURRENT-OFFSET LOG-END-OFFSET LAG CONSUMER-ID HOST CLIENT-ID
test-group test-topic 0 0 0 0 console-consumer-636e6769-eead-4491-ac63-4179e2620dac /127.0.0.1 console-consumer
test-group test-topic 1 2 2 0 console-consumer-636e6769-eead-4491-ac63-4179e2620dac /127.0.0.1 console-consumer
在這個例子中,我們將偏移量重設為最早提交的偏移量值。
在重置任何偏移量之前,我們應該確保消費者服務已停止。 Kafka 不允許在活動消費者中重設偏移量。
首先,我們將使用kafka-consumer-groups –dry-run指令預覽新的偏移:
$ docker exec kafka kafka-consumer-groups --bootstrap-server
localhost:9092 --group test-group --topic test-topic --reset-offsets --to-earliest --dry-run
GROUP TOPIC PARTITION NEW-OFFSET
test-group test-topic 0 0
test-group test-topic 1 0
根據以上數據,我們在應用變更之前確認新的偏移值。
然後,讓我們執行kafka-consumer-groups的execute指令來重置偏移量:
$ docker exec kafka kafka-consumer-groups --bootstrap-server
localhost:9092 --group test-group --topic test-topic --reset-offsets --to-earliest --execute
最後,我們將執行kafka-console-consumer指令來重啟同一個消費者:
$ docker exec -it kafka kafka-console-consumer --bootstrap-server
localhost:9092 --topic test-topic --group test-group
test1
test2
上述消費者在偏移量重置後重播相同的兩個訊息。
以下是**kafka-consumer-group指令的其他–reset-offsets選項**:
- **
–to-latest –**移動到最新的偏移量,使消費者跳過先前的訊息並處理新資料。 -
–to-offset<offset>偏移量> – 將消費者移至指定的固定偏移量。 -
–shift-by<n>– 將偏移量向前 (+n) 或向後 (-n) 移動目前位置 -
–to-datetime<timestamp>– 將偏移量重設為與特定時間戳對應的位置 -
–by-duration<duration>– 重設到最近的某個時間點 -
–from-file<File>– 將偏移量重設為指定 CSV 檔案中指定的值
在生產環境中重置偏移量之前,我們應該謹慎行事,並在更改之前進行測試。
3. 在消費者中以程式方式重置
Kafka 提供了一個客戶端 API,用於以程式設計方式在消費者服務中尋找特定偏移量。新的偏移量位置可以是起始位置、最新位置、特定時間點或一個偏移值。
在這個例子中,我們將實作一個常見的用例:在特定時間後重播所有訊息。
3.1. 實現消費者重新平衡監聽器
要從所需的偏移位置或時間戳記進行查找,我們需要實作ConsumerRebalanceListener介面。 ConsumerRebalanceListener ConsumerRebalanceListener實作將包含分區分配的查找邏輯。
ConsumerRebalanceListener實作將包含一個必要的KafkaConsumer 、一個replayFromTime和一個seekDone標誌,用於檢查重播是否完成。
讓我們實作帶有onPartitionsAssigned ReplayRebalanceListener類別。 方法:
public class ReplayRebalanceListener implements ConsumerRebalanceListener {
private final KafkaConsumer<String, String> consumer;
private final long replayFromTimeInEpoch;
private final AtomicBoolean seekDone = new AtomicBoolean(false);
public ReplayRebalanceListener(KafkaConsumer<String, String> consumer, long replayFromTimeInEpoch) {
this.consumer = consumer;
this.replayFromTimeInEpoch = replayFromTimeInEpoch;
}
@Override
public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
if (seekDone.get() || partitions.isEmpty()) {
return;
}
Map<TopicPartition, Long> partitionsTimestamp = partitions.stream()
.collect(Collectors.toMap(Function.identity(), tp -> replayFromTimeInEpoch));
Map<TopicPartition, OffsetAndTimestamp> offsets = consumer.offsetsForTimes(partitionsTimestamp);
partitions.forEach(tp -> {
OffsetAndTimestamp offsetAndTimestamp = offsets.get(tp);
if (offsetAndTimestamp != null) {
consumer.seek(tp, offsetAndTimestamp.offset());
}
});
seekDone.set(true);
}
}
在上面的程式碼中,我們有意根據每個分區的時間戳來尋找消費者。onPartitionsAssigned方法將在消費者開始輪詢ConsumerRecords之前運行。
此外,如果查找操作已經完成,則直接返回,不再重新查找。這樣做是為了避免因任何錯誤而導致不必要的重複請求。
3.2. 實現 Kafka 消費者服務
我們將使用上述ReplayRebalanceListener類別來實作 Kafka 消費者服務。
首先,我們將定義KafkaConsumerService類,其中包含一個KafkaConsumer和一個running布林標誌:
public class KafkaConsumerService {
private static final Logger log = LoggerFactory.getLogger(KafkaConsumerService.class);
private final KafkaConsumer<String, String> consumer;
private final AtomicBoolean running = new AtomicBoolean(true);
public KafkaConsumerService(Properties consumerProps, String topic, long replayFromTimestampInEpoch) {
this.consumer = new KafkaConsumer<>(consumerProps);
if (replayFromTimestampInEpoch > 0) {
consumer.subscribe(List.of(topic),
new ReplayRebalanceListener(consumer, replayFromTimestampInEpoch));
} else {
consumer.subscribe(List.of(topic));
}
}
}
在上面的程式碼中,我們訂閱了 Kafka 消費者到該主題,並且僅當replayFromTimestampInEpoch非零時才包含ReplayRebalanceListener 。
我們將實作上述類別中的start方法來開始輪詢ConsumerRecords :
public void start() {
try {
while (running.get()) {
ConsumerRecords<String, String> records = consumer.poll(Duration.ofSeconds(1));
records.forEach(record -> log.info("topic={} partition={} offset={} key={} value={}",
record.topic(), record.partition(), record.offset(), record.key(), record.value())););
consumer.commitSync();
}
} catch (WakeupException ex) {
if (running.get()) {
log.error("Error in the Kafka Consumer with exception {}", ex.getMessage(), ex);
throw ex;
}
} finally {
consumer.close();
}
}
在上面的程式碼中,消費者不斷輪詢主題,記錄消費者記錄,最後提交。
此外,我們還要添加shutdown方法,以便優雅地關閉消費者服務:
public void shutdown() {
running.set(false);
consumer.wakeup();
}
在 Spring 應用程式中,我們可以使用ConsumerSeekAware介面來實現相同的功能。
接下來,我們將測試KafkaConsumerService類別。
3.3 測試消費者服務
我們將使用 Kafka 的測試容器來實作KafkaConsumerService類別的整合測試。
讓我們考慮這樣一個測試案例:在特定時間戳之後重播消費者,然後使用與測試消費者相同的消費者群組來驗證消費者是否已被讀取。
我們將實作consumeFromCommittedOffset輔助方法來取得已提交的偏移量:
private List<String> consumeFromCommittedOffset(String topic, String groupId) {
List<String> values = new ArrayList<>();
try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(getConsumerConfig(groupId))) {
consumer.subscribe(Collections.singleton(topic));
ConsumerRecords<String, String> records = consumer.poll(Duration.ofSeconds(2));
for (ConsumerRecord<String, String> r : records) {
values.add(r.value());
}
}
return values;
}
我們將透過發送兩個訊息來實現測試,然後在第一個訊息時間戳記之後重播消費者:
@Test
void givenConsumerReplayIsEnabled_whenReplayTimestampIsProvided_thenConsumesFromTimestamp() {
KafkaProducer<String, String> producer = new KafkaProducer<>(getProducerConfig());
long firstMsgTs = System.currentTimeMillis();
producer.send(new ProducerRecord<>("test-topic-1", 0, firstMsgTs, "x1", "test1"));
producer.flush();
long baseTs = System.currentTimeMillis();
long secondMsgTs = baseTs + 1L;
producer.send(new ProducerRecord<>("test-topic-1", 0, secondMsgTs, "x2", "test2"));
producer.flush();
KafkaConsumerService kafkaConsumerService = new KafkaConsumerService(getConsumerConfig("test-group-1"),
"test-topic-1",
baseTs);
new Thread(kafkaConsumerService::start).start();
Awaitility.await()
.atMost(45, TimeUnit.SECONDS)
.pollInterval(1, TimeUnit.SECONDS)
.untilAsserted(() -> {
List<String> consumed = consumeFromCommittedOffset("test-topic-1", "test-group-1");
assertEquals(0, consumed.size());
assertFalse(consumed.contains("test1"));
assertFalse(consumed.contains("test2"));
});
kafkaConsumerService.shutdown();
}
我們將驗證測試斷言和KafkaConsumerService日誌:
16:49:38.898 [Thread-5] INFO cbkrcKafkaConsumerService - topic=test-topic partition=0 offset=1 key=x2 value=test2
從上面的測試可以看出,消費者在提供的時間戳之後進行輪詢,並跳過先前的訊息。
需要注意的是,建議消費者不要啟用ENABLE_AUTO_COMMIT_CONFIG .因為消費者提交操作是週期性的,並且獨立於重新平衡機制,所以可能會導致訊息重複。
4. 使用 Kafka 管理 API 重置
與其使用 Kafka CLI 命令或更改消費者端程式碼,我們可以考慮將重置偏移量管理整合到現有的自動化工具中。
Kafka 提供 Admin API,用於以程式設計方式管理和維護叢集。使用此 API,我們可以更改任何主題和消費者群組的偏移量。
4.1. 實作 Kafka 偏移量管理服務
使用AdminClient API,我們可以更新任何主題和消費者群組的消費者偏移量。
首先,我們將使用AdminClient實例來實作ResetOffsetService類別:
public class ResetOffsetService {
private final AdminClient adminClient;
public ResetOffsetService(String bootstrapServers) {
this.adminClient = AdminClient.create(Map.of(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers));
}
}
接下來,我們將實作上述類別中的fetchPartitions方法來取得主題分區資料:
private List<TopicPartition> fetchPartitions(String topic) throws ExecutionException, InterruptedException {
return adminClient.describeTopics(List.of(topic))
.values()
.get(topic)
.get()
.partitions()
.stream()
.map(p -> new TopicPartition(topic, p.partition()))
.toList();
}
接下來,我們實作fetchEarliestOffsets方法來取得上述分割區的最早偏移值:
private Map<TopicPartition, OffsetAndMetadata> fetchEarliestOffsets(List<TopicPartition> partitions) {
Map<TopicPartition, OffsetSpec> offsetSpecs = partitions.stream()
.collect(Collectors.toMap(tp -> tp, tp -> OffsetSpec.earliest()));
ListOffsetsResult offsetsResult = adminClient.listOffsets(offsetSpecs);
Map<TopicPartition, OffsetAndMetadata> offsets = new HashMap<>();
partitions.forEach(tp -> {
long offset = Optional.ofNullable(offsetsResult.partitionResult(tp))
.map(kafkaFuture -> {
try {
return kafkaFuture.get();
} catch (InterruptedException | ExecutionException ex) {
log.error("Error in the Kafka Consumer reset with exception {}", ex.getMessage(), ex);
throw new RuntimeException(ex);
}
})
.map(ListOffsetsResult.ListOffsetsResultInfo::offset)
.orElseThrow(() -> new RuntimeException("No offset result returned for partition " + tp));
offsets.put(tp, new OffsetAndMetadata(offset));
});
return offsets;
}
在上面的程式碼中,我們使用AdminClient的listOffsets方法來取得每個分區的偏移值,然後將其作為Map < TopicPartition, OffsetAndMetadata>實例傳回.
要重置偏移量,我們將首先獲取分區信息,獲取earliestOffsets映射,然後使用AdminClient的alterConsumerGroupOffsets方法更改偏移量。
最後,我們將實作上述類別中以topic和consumerGroup為輸入reset方法:
public void reset(String topic, String consumerGroup) {
List<TopicPartition> partitions;
try {
partitions = fetchPartitions(topic);
} catch (ExecutionException | InterruptedException ex) {
log.error("Error in the fetching partitions with exception {}", ex.getMessage(), ex);
throw new RuntimeException(ex);
}
Map<TopicPartition, OffsetAndMetadata> earliestOffsets = fetchEarliestOffsets(partitions);
try {
adminClient.alterConsumerGroupOffsets(consumerGroup, earliestOffsets)
.all()
.get();
} catch (InterruptedException | ExecutionException ex) {
log.error("Error in the Kafka Consumer reset with exception {}", ex.getMessage(), ex);
throw new RuntimeException(ex);
}
}
在上面的程式碼中,如果上述方法出現任何故障,我們將拋出RuntimeException 。
除了重設為最早偏移量之外,我們還可以將偏移量重設為特定時間戳記之後、最新時間戳記之後或任何偏移量值之後。
4.2. 測試管理服務
與先前的測試類似,我們將使用測試容器來實作ResetOffsetService類別的整合測試。
讓我們實作fetchCommitedOffset方法來取得分區的已提交偏移量:
private long fetchCommittedOffset(String groupId) throws ExecutionException, InterruptedException {
Map<TopicPartition, OffsetAndMetadata> offsets = testAdminClient.listConsumerGroupOffsets(groupId)
.partitionsToOffsetAndMetadata()
.get();
return offsets.values()
.iterator()
.next()
.offset();
}
現在,我們將編寫測試來驗證ResetService的reset方法:
@Test
void givenMessagesAreConsumed_whenOffsetIsReset_thenOffsetIsSetToEarliest() {
KafkaProducer<String, String> producer = new KafkaProducer<>(getProducerConfig());
producer.send(new ProducerRecord<>("test-topic-1", "msg-1"));
producer.send(new ProducerRecord<>("test-topic-1", "msg-2"));
producer.flush();
KafkaConsumer<String, String> consumer = new KafkaConsumer<>(getConsumerConfig("test-group-1"));
consumer.subscribe(List.of("test-topic-1"));
int consumed = 0;
while (consumed < 2) {
ConsumerRecords<String, String> records = consumer.poll(Duration.ofSeconds(1));
consumed += records.count();
}
consumer.commitSync();
consumer.close();
await().atMost(5, SECONDS)
.pollInterval(Duration.ofMillis(300))
.untilAsserted(() -> assertEquals(2L, fetchCommittedOffset("test-group-1")));
resetService.reset("test-topic-1", "test-group-1");
await().atMost(5, SECONDS)
.pollInterval(Duration.ofMillis(300))
.untilAsserted(() -> assertEquals(0L, fetchCommittedOffset("test-group-1")));
}
在上面的程式碼中,我們斷言提交的偏移量在重置之前和之後。
為了安全地處理偏移重置而不導致重複處理,消費者應該具有冪等性。
5. 結論
本文介紹如何在 Kafka 中重置消費者偏移量。我們透過多種方式實現了這一目標,例如使用 Kafka CLI 工具、透過KafkaRebalanceListener介面進行重播以及使用AdminClient API。此外,我們也在 Docker 環境中測試了這些實作方式。
和往常一樣,範例程式碼可以在 GitHub 上找到。