如何清除Apache Kafka主題數據?

1.概述

在本文中,我們將探討一些清除Apache Kafka主題中的數據的策略

2.清理方案

在學習清理數據的策略之前,讓我們先了解一個需要清除活動的簡單方案。

2.1場景

經過配置的保留時間後,Apache Kafka中的消息會自動過期。但是,在某些情況下,我們可能希望消息立即刪除。

假設在應用程序代碼中引入了一個缺陷,該缺陷在Kafka主題中生成消息。到集成錯誤修復程序時, Kafka主題中已經有許多損壞的消息可供使用。

此類問題在開發環境中最常見,我們希望獲得快速結果。因此,批量刪除消息是一個合理的事情。

2.2。模擬

為了模擬這種情況,讓我們從Kafka安裝目錄中purge-scenario

$ bin/kafka-topics.sh \

 --create --topic purge-scenario --if-not-exists \

 --partitions 2 --replication-factor 1 \

 --zookeeper localhost:2181

接下來,讓我們使用shuf命令**生成隨機數據並將其提供給kafka-console-producer.sh**腳本:

$ /usr/bin/shuf -i 1-100000 -n 50000000 \

 | tee -a /tmp/kafka-random-data \

 | bin/kafka-console-producer.sh \

 --bootstrap-server=0.0.0.0:9092 \

 --topic purge-scenario

我們必須注意,我們已經使用了tee命令來保存仿真數據,以備後用。

最後,讓我們驗證使用者是否可以使用該主題中的消息:

$ bin/kafka-console-consumer.sh \

 --bootstrap-server=0.0.0.0:9092 \

 --from-beginning --topic purge-scenario \

 --max-messages 3

 76696

 49425

 1744

 Processed a total of 3 messages

3.消息過期

purge-scenario主題中生成的消息的默認保留期為7天。要清除消息,我們可以**retention.ms主題級別的屬性臨時重置**為十秒鐘,然後等待消息過期:

$ bin/kafka-configs.sh --alter \

 --add-config retention.ms=10000 \

 --bootstrap-server=0.0.0.0:9092 \

 --topic purge-scenario \

 && sleep 10

接下來,讓我們驗證消息是否已從該主題過期:

$ bin/kafka-console-consumer.sh \

 --bootstrap-server=0.0.0.0:9092 \

 --from-beginning --topic purge-scenario \

 --max-messages 1 --timeout-ms 1000

 [2021-02-28 11:20:15,951] ERROR Error processing message, terminating consumer process: (kafka.tools.ConsoleConsumer$)

 org.apache.kafka.common.errors.TimeoutException

 Processed a total of 0 messages

最後,我們可以將該主題的原始保留期恢復為7天:

$ bin/kafka-configs.sh --alter \

 --add-config retention.ms=604800000 \

 --bootstrap-server=0.0.0.0:9092 \

 --topic purge-scenario

通過這種方法,Kafka將針對purge-scenario主題清除所有分區中的消息。

4.選擇性記錄刪除

有時,我們可能希望選擇性地刪除特定主題的一個或多個分區中的記錄。我們可以使用kafka-delete-records.sh腳本滿足這些要求。

delete-config.json配置文件中指定分區級別的偏移量。

讓我們使用offset=-1 partition=1的所有消息:

{

 "partitions": [

 {

 "topic": "purge-scenario",

 "partition": 1,

 "offset": -1

 }

 ],

 "version": 1

 }

接下來,讓我們繼續刪除記錄:

$ bin/kafka-delete-records.sh \

 --bootstrap-server localhost:9092 \

 --offset-json-file delete-config.json

我們可以驗證我們仍然能夠從partition=0讀取:

$ bin/kafka-console-consumer.sh \

 --bootstrap-server=0.0.0.0:9092 \

 --from-beginning --topic purge-scenario --partition=0 \

 --max-messages 1 --timeout-ms 1000

 44017

 Processed a total of 1 messages

但是,當我們從partition=1讀取時,將沒有記錄可處理:

$ bin/kafka-console-consumer.sh \

 --bootstrap-server=0.0.0.0:9092 \

 --from-beginning --topic purge-scenario \

 --partition=1 \

 --max-messages 1 --timeout-ms 1000

 [2021-02-28 11:48:03,548] ERROR Error processing message, terminating consumer process: (kafka.tools.ConsoleConsumer$)

 org.apache.kafka.common.errors.TimeoutException

 Processed a total of 0 messages

5.刪除並重新創建主題

清除Kafka主題的所有消息的另一種解決方法是刪除並重新創建它。但是,只有delete.topic.enable屬性設置為 啟動Kafka服務器時為**true**

$ bin/kafka-server-start.sh config/server.properties \

 --override delete.topic.enable=true

要刪除主題,我們可以使用kafka-topics.sh腳本:

$ bin/kafka-topics.sh \

 --delete --topic purge-scenario \

 --zookeeper localhost:2181

 Topic purge-scenario is marked for deletion.

 Note: This will have no impact if delete.topic.enable is not set to true.

讓我們通過列出主題來驗證它:

$ bin/kafka-topics.sh --zookeeper localhost:2181 --list

確認不再列出該主題後,我們現在可以繼續並重新創建它。

六,結論

在本教程中,我們模擬了需要清除Apache Kafka主題的場景。此外,我們探索了**多種策略來完全或選擇性地清除分區中的數據**。