如何清除Apache Kafka主題數據?
- java
- Kafka
1.概述
在本文中,我們將探討一些清除Apache Kafka主題中的數據的策略。
2.清理方案
在學習清理數據的策略之前,讓我們先了解一個需要清除活動的簡單方案。
2.1場景
經過配置的保留時間後,Apache Kafka中的消息會自動過期。但是,在某些情況下,我們可能希望消息立即刪除。
假設在應用程序代碼中引入了一個缺陷,該缺陷在Kafka主題中生成消息。到集成錯誤修復程序時, Kafka主題中已經有許多損壞的消息可供使用。
此類問題在開發環境中最常見,我們希望獲得快速結果。因此,批量刪除消息是一個合理的事情。
2.2。模擬
為了模擬這種情況,讓我們從Kafka安裝目錄中purge-scenario
$ bin/kafka-topics.sh \
--create --topic purge-scenario --if-not-exists \
--partitions 2 --replication-factor 1 \
--zookeeper localhost:2181
接下來,讓我們使用shuf
命令**生成隨機數據並將其提供給kafka-console-producer.sh
**腳本:
$ /usr/bin/shuf -i 1-100000 -n 50000000 \
| tee -a /tmp/kafka-random-data \
| bin/kafka-console-producer.sh \
--bootstrap-server=0.0.0.0:9092 \
--topic purge-scenario
我們必須注意,我們已經使用了tee
命令來保存仿真數據,以備後用。
最後,讓我們驗證使用者是否可以使用該主題中的消息:
$ bin/kafka-console-consumer.sh \
--bootstrap-server=0.0.0.0:9092 \
--from-beginning --topic purge-scenario \
--max-messages 3
76696
49425
1744
Processed a total of 3 messages
3.消息過期
purge-scenario
主題中生成的消息的默認保留期為7天。要清除消息,我們可以**retention.ms
主題級別的屬性臨時重置**為十秒鐘,然後等待消息過期:
$ bin/kafka-configs.sh --alter \
--add-config retention.ms=10000 \
--bootstrap-server=0.0.0.0:9092 \
--topic purge-scenario \
&& sleep 10
接下來,讓我們驗證消息是否已從該主題過期:
$ bin/kafka-console-consumer.sh \
--bootstrap-server=0.0.0.0:9092 \
--from-beginning --topic purge-scenario \
--max-messages 1 --timeout-ms 1000
[2021-02-28 11:20:15,951] ERROR Error processing message, terminating consumer process: (kafka.tools.ConsoleConsumer$)
org.apache.kafka.common.errors.TimeoutException
Processed a total of 0 messages
最後,我們可以將該主題的原始保留期恢復為7天:
$ bin/kafka-configs.sh --alter \
--add-config retention.ms=604800000 \
--bootstrap-server=0.0.0.0:9092 \
--topic purge-scenario
通過這種方法,Kafka將針對purge-scenario
主題清除所有分區中的消息。
4.選擇性記錄刪除
有時,我們可能希望選擇性地刪除特定主題的一個或多個分區中的記錄。我們可以使用kafka-delete-records.sh
腳本滿足這些要求。
delete-config.json
配置文件中指定分區級別的偏移量。
讓我們使用offset=-1
partition=1
的所有消息:
{
"partitions": [
{
"topic": "purge-scenario",
"partition": 1,
"offset": -1
}
],
"version": 1
}
接下來,讓我們繼續刪除記錄:
$ bin/kafka-delete-records.sh \
--bootstrap-server localhost:9092 \
--offset-json-file delete-config.json
我們可以驗證我們仍然能夠從partition=0
讀取:
$ bin/kafka-console-consumer.sh \
--bootstrap-server=0.0.0.0:9092 \
--from-beginning --topic purge-scenario --partition=0 \
--max-messages 1 --timeout-ms 1000
44017
Processed a total of 1 messages
但是,當我們從partition=1
讀取時,將沒有記錄可處理:
$ bin/kafka-console-consumer.sh \
--bootstrap-server=0.0.0.0:9092 \
--from-beginning --topic purge-scenario \
--partition=1 \
--max-messages 1 --timeout-ms 1000
[2021-02-28 11:48:03,548] ERROR Error processing message, terminating consumer process: (kafka.tools.ConsoleConsumer$)
org.apache.kafka.common.errors.TimeoutException
Processed a total of 0 messages
5.刪除並重新創建主題
清除Kafka主題的所有消息的另一種解決方法是刪除並重新創建它。但是,只有將delete.topic.enable
屬性設置為 啟動Kafka服務器時為**true**
:
$ bin/kafka-server-start.sh config/server.properties \
--override delete.topic.enable=true
要刪除主題,我們可以使用kafka-topics.sh
腳本:
$ bin/kafka-topics.sh \
--delete --topic purge-scenario \
--zookeeper localhost:2181
Topic purge-scenario is marked for deletion.
Note: This will have no impact if delete.topic.enable is not set to true.
讓我們通過列出主題來驗證它:
$ bin/kafka-topics.sh --zookeeper localhost:2181 --list
確認不再列出該主題後,我們現在可以繼續並重新創建它。
六,結論
在本教程中,我們模擬了需要清除Apache Kafka主題的場景。此外,我們探索了**多種策略來完全或選擇性地清除分區中的數據**。