在Apache Kafka中配置消息保留期限

    1.概述

    當生產者向Apache Kafka發送消息時,它將消息附加到日誌文件中並在配置的持續時間內保留。

    在本教程中,我們將學習為Kafka主題配置基於時間的消息保留屬性

    2.基於時間的保留

    保留期屬性到位後,消息具有TTL(生存時間) 。到期時,郵件將標記為刪除,從而釋放磁盤空間。

    相同的保留期限屬性適用於給定Kafka主題內的所有消息。此外,我們可以在創建主題之前設置這些屬性,也可以在運行時為預先存在的主題更改它們

    在以下各節中,我們將學習如何通過代理配置進行調整,以設置新主題的保留期以及主題級別的配置以在運行時對其進行控制

    3.服務器級配置

    Apache Kafka支持服務器級別的保留策略,我們可以通過完全配置以下三個基於時間的配置屬性之一來進行調整

    • log.retention.hours
    • log.retention.minutes
    • log.retention.ms

    重要的是要了解,Kafka會用較高的值覆蓋較低的精度值。因此, log.retention.ms的優先級最高

    3.1。基本

    首先,讓我們通過執行Apache Kafka目錄中grep命令來檢查默認值是否保留:

    $ grep -i 'log.retention.[hms].*\=' config/server.properties
    
     log.retention.hours=168
    

    在這裡我們可以注意到默認的保留時間是7天

    要僅將消息保留十分鐘,我們可以在config/server.properties log.retention.minutes屬性的值:

    log.retention.minutes=10
    

    3.2。新主題的保留期

    Apache Kafka軟件包包含幾個Shell腳本,我們可以使用它們執行管理任務。我們將使用它們來創建一個輔助腳本functions.sh ,該腳本將在本教程的過程中使用.

    讓我們從在functions.sh添加兩個函數開始,分別創建一個主題並描述其配置:

    function create_topic {
    
     topic_name="$1"
    
     bin/kafka-topics.sh --create --topic ${topic_name} --if-not-exists \
    
     --partitions 1 --replication-factor 1 \
    
     --zookeeper localhost:2181
    
     }
    
    
    
     function describe_topic_config {
    
     topic_name="$1"
    
     ./bin/kafka-configs.sh --describe --all \
    
     --bootstrap-server=0.0.0.0:9092 \
    
     --topic ${topic_name}
    
     }

    接下來,讓我們創建兩個獨立腳本, create-topic.shget-topic-retention-time.sh

    bash-5.1# cat create-topic.sh
    
     #!/bin/bash
    
     . ./functions.sh
    
     topic_name="$1"
    
     create_topic "${topic_name}"
    
     exit $?
    
    bash-5.1# cat get-topic-retention-time.sh
    
     #!/bin/bash
    
     . ./functions.sh
    
     topic_name="$1"
    
     describe_topic_config "${topic_name}" | awk 'BEGIN{IFS="=";IRS=" "} /^[ ]*retention.ms/{print $1}'
    
     exit $?
    

    我們必須注意, describe_topic_config將提供為該主題配置的所有屬性。因此,我們使用了awk retention.ms屬性添加了一個過濾器。

    最後,讓我們啟動Kafka環境,並驗證新示例主題的保留期配置:

    bash-5.1# ./create-topic.sh test-topic
    
     Created topic test-topic.
    
     bash-5.1# ./get-topic-retention-time.sh test-topic
    
     retention.ms=600000

    創建並描述了主題之後,我們會注意到, retention.ms設置為600000 (十分鐘)。這實際上是從我們先前在server.properties文件中**log.retention.minutes屬性派生的。**

    4.主題級配置

    啟動Broker服務器後, log.retention.{hours|minutes|ms}服務器級別的屬性將變為只讀。另一方面,我們可以訪問retention.ms屬性,可以在主題級別進行調整。

    讓我們在functions.sh腳本中添加一個方法來配置主題的屬性:

    function alter_topic_config {
    
     topic_name="$1"
    
     config_name="$2"
    
     config_value="$3"
    
     ./bin/kafka-configs.sh --alter \
    
     --add-config ${config_name}=${config_value} \
    
     --bootstrap-server=0.0.0.0:9092 \
    
     --topic ${topic_name}
    
     }

    然後,我們可以在alter-topic-config.sh腳本中使用它:

    #!/bin/sh
    
     . ./functions.sh
    
    
    
     alter_topic_retention_config $1 $2 $3
    
     exit $?

    test-topic保留時間設置為五分鐘,然後進行驗證:

    bash-5.1# ./alter-topic-config.sh test-topic retention.ms 300000
    
     Completed updating config for topic test-topic.
    
    
    
     bash-5.1# ./get-topic-retention-time.sh test-topic
    
     retention.ms=300000

    5.驗證

    到目前為止,我們已經了解瞭如何在Kafka主題中配置郵件的保留期限。是時候驗證保留超時後消息確實過期了。

    5.1。生產者-消費者

    讓我們在functions.sh中produce_messageconsume_message functions.sh.在內部,它們分別使用kafka-console-producer.shkafka-console-consumer.sh來生成/使用消息:

    function produce_message {
    
     topic_name="$1"
    
     message="$2"
    
     echo "${message}" | ./bin/kafka-console-producer.sh \
    
     --bootstrap-server=0.0.0.0:9092 \
    
     --topic ${topic_name}
    
     }
    
    
    
     function consume_message {
    
     topic_name="$1"
    
     timeout="$2"
    
     ./bin/kafka-console-consumer.sh \
    
     --bootstrap-server=0.0.0.0:9092 \
    
     --from-beginning \
    
     --topic ${topic_name} \
    
     --max-messages 1 \
    
     --timeout-ms $timeout
    
     }

    我們必須注意,使用者總是從頭開始閱讀消息,因為我們需要一個能夠讀取Kafka中任何可用消息的使用者。

    接下來,讓我們創建一個獨立的消息生成器:

    bash-5.1# cat producer.sh
    
     #!/bin/sh
    
     . ./functions.sh
    
     topic_name="$1"
    
     message="$2"
    
    
    
     produce_message ${topic_name} ${message}
    
     exit $?

    最後,讓我們有一個獨立的消息使用者:

    bash-5.1# cat consumer.sh
    
     #!/bin/sh
    
     . ./functions.sh
    
     topic_name="$1"
    
     timeout="$2"
    
    
    
     consume_message ${topic_name} $timeout
    
     exit $?

    5.2。訊息到期

    現在我們已經準備好基本設置,讓我們生成一條消息並立即使用兩次:

    bash-5.1# ./producer.sh "test-topic-2" "message1"
    
     bash-5.1# ./consumer.sh test-topic-2 10000
    
     message1
    
     Processed a total of 1 messages
    
     bash-5.1# ./consumer.sh test-topic-2 10000
    
     message1
    
     Processed a total of 1 messages

    因此,我們可以看到使用者正在重複使用任何可用消息。

    現在,讓我們介紹五分鐘的睡眠延遲,然後嘗試使用該消息:

    bash-5.1# sleep 300 && ./consumer.sh test-topic 10000
    
     [2021-02-06 21:55:00,896] ERROR Error processing message, terminating consumer process: (kafka.tools.ConsoleConsumer$)
    
     org.apache.kafka.common.errors.TimeoutException
    
     Processed a total of 0 messages

    如預期的那樣,使用者沒有找到任何要使用的消息,因為該消息已經超過了保留期

    6.局限性

    在內部,Kafka Broker維護另一個名為log.retention.check.interval.ms.此屬性確定檢查消息是否過期的頻率。

    所以,為了保持保留策略有效,我們必須確保的值log.retention.check.interval.ms比的屬性值低retention.ms對於任何給定的主題。

    7.結論

    在本教程中,我們探索了Apache Kafka以了解基於時間的消息保留策略。在此過程中,我們創建了簡單的Shell腳本來簡化管理活動。後來,我們創建了一個獨立的使用者和生產者,以驗證保留期後郵件的有效期。