了解 Kafka 消費者抵消
1. 概述
Kafka 消費者偏移量是一個唯一的、單調遞增的整數,用來識別事件記錄在分區中的位置。組中的每個消費者都為每個分區維護一個特定的偏移量以追蹤進度。另一方面,Kafka 消費者群組由負責透過輪詢從跨多個分區的主題讀取訊息的消費者組成。
Kafka 中的群組協調器管理消費者群組並將分區分配給群組內的消費者。當消費者啟動時,它會找到其組的協調員並要求加入。協調器觸發群組重新平衡,為新成員分配其分區份額。
在本教程中,我們將探討這些偏移量的保存位置以及消費者如何使用它們來追蹤和開始或恢復其進度。
2. 設定
**讓我們先使用 Docker Compose 腳本在 Kraft 模式下設定單一實例 Kafka 叢集**:
broker:
image: confluentinc/cp-kafka:7.7.0
hostname: broker
container_name: broker
ports:
- "9092:9092"
- "9101:9101"
expose:
- '29092'
environment:
KAFKA_NODE_ID: 1
KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: 'CONTROLLER:PLAINTEXT,PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT'
KAFKA_ADVERTISED_LISTENERS: 'PLAINTEXT://broker:29092,PLAINTEXT_HOST://localhost:9092'
KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
KAFKA_GROUP_INITIAL_REBALANCE_DELAY_MS: 0
KAFKA_TRANSACTION_STATE_LOG_MIN_ISR: 1
KAFKA_TRANSACTION_STATE_LOG_REPLICATION_FACTOR: 1
KAFKA_JMX_PORT: 9101
KAFKA_JMX_HOSTNAME: localhost
KAFKA_PROCESS_ROLES: 'broker,controller'
KAFKA_CONTROLLER_QUORUM_VOTERS: '1@broker:29093'
KAFKA_LISTENERS: 'PLAINTEXT://broker:29092,CONTROLLER://broker:29093,PLAINTEXT_HOST://0.0.0.0:9092'
KAFKA_INTER_BROKER_LISTENER_NAME: 'PLAINTEXT'
KAFKA_CONTROLLER_LISTENER_NAMES: 'CONTROLLER'
KAFKA_LOG_DIRS: '/tmp/kraft-combined-logs'
CLUSTER_ID: 'MkU3OEVBNTcwNTJENDM2Qk'
KAFKA_LOG_CLEANUP_POLICY: 'delete'
這將使叢集在http://localhost:9092/.
接下來,我們建立一個具有兩個分區的主題:
init-kafka:
image: confluentinc/cp-kafka:7.7.0
depends_on:
- broker
entrypoint: [ '/bin/sh', '-c' ]
command: |
" # blocks until kafka is reachable
kafka-topics --bootstrap-server broker:29092 --list
echo -e 'Creating kafka topics'
kafka-topics --bootstrap-server broker:29092 --create \
--if-not-exists --topic user-data --partitions 2 "
作為可選步驟,讓我們設定 Kafka UI 以輕鬆查看訊息,但在本文中我們將使用 CLI 檢查詳細資訊:
kafka-ui:
image: provectuslabs/kafka-ui:latest
ports:
- "3030:8080"
depends_on:
- broker
- init-kafka
environment:
KAFKA_CLUSTERS_0_NAME: broker
KAFKA_CLUSTERS_0_BOOTSTRAPSERVERS: broker:29092
這使得 Kafka UI 在http://localhost:3030/
可用:
3. 配置中的消費者偏移參考
當消費者第一次加入群組時,它會根據auto.offset.reset
配置確定獲取記錄的偏移位置,設定為earliest
或latest
。
讓我們以生產者的身分推送一些訊息:
docker exec -i <CONTAINER-ID> kafka-console-producer \
--broker-list localhost:9092 \
--topic user-data <<< '{"id": 1, "first_name": "John", "last_name": "Doe"}
{"id": 2, "first_name": "Alice", "last_name": "Johnson"}'
接下來,讓我們透過註冊消費者來從主題user-data
中讀取這些訊息來使用這些訊息,並將所有分區中的auto.offset.reset
設定為earliest
:
docker exec -it <CONTAINER_ID> kafka-console-consumer \
--bootstrap-server localhost:9092 \
--topic user-data \
--consumer-property auto.offset.reset=earliest \
--group consumer-user-data
這會為consumer-user-data
群組新增一個新消費者。我們可以在代理日誌和 Kafka UI 中檢查重新平衡。它還應根據最早的重置策略列出所有訊息。
我們需要記住,消費者在終端機中保持開啟以進行持續的訊息消費。為了檢查中斷後的行為,我們終止此會話。
4.消費者抵銷主題參考
當消費者加入群組時,代理程式會建立一個內部主題__consumer_offsets,
以在主題和分區層級儲存客戶偏移狀態。如果啟用了Kafka自動提交,消費者會定期將最後處理的訊息偏移量提交到該主題。這允許在中斷後恢復消費時使用該狀態。
當群組中的消費者因崩潰或斷開連線而失敗時,Kafka 會偵測到遺失的心跳並觸發重新平衡。它將失敗的消費者的分區重新分配給活動消費者,確保訊息消費持續。內部主題的持久狀態用於恢復消費。
讓我們先驗證內部主題中已提交的偏移量狀態:
docker exec -it <CONTAINER_ID> kafka-console-consumer \
--bootstrap-server localhost:9092 \
--topic __consumer_offsets \
--formatter "kafka.coordinator.group.GroupMetadataManager\$OffsetsMessageFormatter" \
--from-beginning
該腳本使用特定格式以獲得更好的可讀性,因為預設格式為二進制,並且該腳本記錄來自主題的記錄,顯示消費者群組( consumer-user-data
)、主題( user-data
)、分區( 0
和1
) ,和偏移量元資料(offset = 2
):
[consumer-user-data,user-data,0]::OffsetAndMetadata(offset=2, leaderEpoch=Optional[0], metadata=, commitTimestamp=1726601656308, expireTimestamp=None)
[consumer-user-data,user-data,1]::OffsetAndMetadata(offset=0, leaderEpoch=Optional.empty, metadata=, commitTimestamp=1726601661314, expireTimestamp=None)
在這種情況下,分區0
已收到所有訊息,並且消費者提交了用於追蹤進度/恢復的狀態。
接下來,讓我們透過作為生產者推送附加訊息來驗證恢復行為:
docker exec -i <CONTAINER-ID> kafka-console-producer \
--broker-list localhost:9092 \
--topic user-data <<< '{"id": 3, "first_name": "Alice", "last_name": "Johnson"}
{"id": 4, "first_name": "Michael", "last_name": "Brown"}'
然後,讓我們重新啟動之前終止的消費者,檢查它是否從最後一個已知偏移量恢復消費記錄:
docker exec -it <CONTAINER_ID> kafka-console-consumer \
--bootstrap-server localhost:9092 \
--topic user-data \
--consumer-property auto.offset.reset=earliest \
--group consumer-user-data
即使auto.offset.reset
設定為earliest
,這也應該記錄使用者 id 3
和使用者 id 4
的記錄,因為偏移狀態儲存在內部主題中。最後,我們可以透過再次執行相同的命令來驗證__consumer_offsets
主題中的狀態:
[consumer-user-data, user-data, 1] :: OffsetAndMetadata(offset=0, leaderEpoch=Optional.empty, metadata=, commitTimestamp=1726611172398, expireTimestamp=None)
[consumer-user-data, user-data, 0] :: OffsetAndMetadata(offset=4, leaderEpoch=Optional[0], metadata=, commitTimestamp=1726611172398, expireTimestamp=None)
我們可以看到__consumer_offsets
主題更新了已提交的偏移量(值為4
),有效地從上次提交的偏移量恢復消費,因為狀態保留在主題中。
5. 結論
在本文中,我們探討了 Kafka 如何管理消費者偏移量以及消費者首次加入群組時auto.offset.reset
屬性如何運作。
我們也了解如何使用內部__consumer_offsets
主題的狀態在暫停或中斷後恢復消費。