冪等性對 Kafka 生產者效能的影響
1. 引言
在本教程中,我們將探討冪等性的概念,它如何應用於 Kafka 生產者,以及它對效能的影響。
在介紹核心概念之後,我們將使用 Java Microbenchmark Harness (JMH) 建立基準測試,以衡量冪等生產者對 Kafka 效能的影響。
最後,我們將分析基準測試結果,以確定何時應該停用冪等性,何時應該保持啟用狀態。
2. 卡夫卡作品中的冪等性意義
我們先來了解一下冪等性的意思。一般來說,冪等性是指多次執行相同操作會產生與執行一次相同的結果。
在分散式系統中,網路問題、代理伺服器宕機和逾時等故障會導致 Kafka 生產者重試請求,可能造成重複記錄。幸運的是,Kafka 的冪等性有助於解決這些問題。啟用 Kafka 生產者的冪等性後,可以保證重試傳送作業不會導致重複記錄寫入相同主題。
這是在協定層實現的,無需在應用層進行去重。每個生產者實例都被分配一個唯一標識符,記錄使用序號進行追蹤。如果出現上述問題,這使得代理人能夠丟棄因重試而導致的重複記錄。
3. 冪等生產者
如前所述,冪等生產者在協定層使用去重機制。生產者和代理會協調確保重試不會導致訊息重複發送。也就是說,冪等生產者的目標是確保每個訊息只發送一次。
在本節中,我們將了解如何實現這一結果。
3.1. 創建冪等生產者
Kafka 早已支援冪等生產者,但之前需要明確啟用。為了控制生產者是否啟用冪等性,我們將使用一個布林屬性:
enable.idempotence=true
自3.0 版本起,Kafka 強制執行了最嚴格的交付保證,這意味著它現在默認啟用。但是,冪等性僅在沒有設定衝突配置的情況下預設啟用。換句話說,某些生產者設定可能會禁用冪等性。
為確保啟用該功能,我們需要進行以下配置:
-
acks= all確保我們安全地確認訊息 -
retries > 0以允許重試(預設值為Integer.MAX_VALUE) -
max.in.flight.requests.per.connection<= 5以保持訊息順序不變
需要注意的是,如果明確啟用冪等性並且設定了衝突的配置,Kafka Java 用戶端將會拋出ConfigException 。
3.2. 冪等性如何改變生產者行為
啟用冪等性後,生產者和代理人都會改變其訊息傳遞行為,以確保精確一次語意。
首先,代理伺服器會為每個生產者指派一個唯一的生產者 ID (PID)。然後,生產者會為其發送的給定分區的每批記錄附加一個序號。代理伺服器使用 PID 和序號來識別重試情況,並丟棄重複記錄,而不是將它們再次寫入主題。
設定acks=all後,生產者會等待代理確認記錄已寫入該分區的所有副本後才會發送其他記錄。如果生產者沒有收到確認,則會假定發送操作失敗並自動重試。重試操作將保持相同的進程 ID (PID) 和序號。
同時,該機制也引入了嚴格的順序要求。我們必須保證max.in.flight.requests.per.connection請求數<= 5 。否則,如果允許過多的並發請求,可能會破壞序號驗證。
由於序號必須遞增,且冪等生產者必須等待代理確認已將記錄寫入分區的所有副本,因此效能會有所下降。這種新行為雖然保證了正確性,但卻降低了並行性和效能。
值得注意的是,去重功能僅在生產者處於活動狀態時有效。重啟後,生產者會遺失先前的序號,因此 Kafka 無法再保證記錄不會重複。事務可以解決此問題,它有助於維護生產者的狀態。
4. 衡量績效影響
現在我們已經了解了冪等生產者的工作原理,接下來讓我們看看它在實踐中的應用。在接下來的章節中,我們將配置基準測試,以衡量冪等生產者與非冪等生產者的效能差異。
我們的目標是觀察冪等性如何影響吞吐量。為了提高結果的可靠性,我們將在兩種環境下執行相同的基準測試。首先,我們將使用單一代理伺服器,不啟用複製。然後,我們將在一個包含三個代理伺服器且複製因子為 3 的叢集上重複該基準測試。
4.1 基準設計
為了創造穩定的環境,我們使用 Docker Compose 來運行 Kafka 叢集。為了簡化配置,每個 broker 同時運行 broker 和 controller,並暴露一個外部監聽器以供基準測試程序連接:
services:
kafka-1:
image: apache/kafka:3.9.0
container_name: kafka-1
ports:
- "29092:9092"
environment:
KAFKA_NODE_ID: 1
KAFKA_PROCESS_ROLES: "broker,controller"
KAFKA_LISTENERS: "PLAINTEXT://:19092,CONTROLLER://:19093,EXTERNAL://:9092"
KAFKA_ADVERTISED_LISTENERS: "PLAINTEXT://kafka-1:19092,EXTERNAL://localhost:29092"
KAFKA_MIN_INSYNC_REPLICAS: 2
// omitted irrelevant config
我們對其餘代理重複相同的配置,僅更改節點 ID 和連接埠。對於單代理場景,我們使用相同的設置,但僅運行一個代理,並將主題的複製因子設為 1。
關於生產者配置,除了冪等性標誌(該標誌在 JMH 基準測試中已參數化)之外,兩種配置完全相同。在初始狀態下,我們設定了必要的屬性acks=all 、無限重試以及請求數量的安全限制。隨後,我們配置了顯式批次和逾時,以減少運行之間的差異,從而使我們的基準測試更加穩定可靠:
props.put(ProducerConfig.ACKS_CONFIG, "all");
props.put(ProducerConfig.RETRIES_CONFIG, Integer.toString(Integer.MAX_VALUE));
props.put(ProducerConfig.MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION, "5");
props.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, String.valueOf(idempotent));
props.put(ProducerConfig.LINGER_MS_CONFIG, "5");
props.put(ProducerConfig.BATCH_SIZE_CONFIG, Integer.toString(32 * 1024));
props.put(ProducerConfig.COMPRESSION_TYPE_CONFIG, "none");
props.put(ProducerConfig.REQUEST_TIMEOUT_MS_CONFIG, "30000");
props.put(ProducerConfig.DELIVERY_TIMEOUT_MS_CONFIG, "120000");
最後,基準測試方法非同步發送記錄,收集 future,然後等待每個 future 完成:
@Benchmark
@OperationsPerInvocation(MESSAGES)
public void sendMessages() throws Exception {
Future<RecordMetadata>[] futures = new Future[MESSAGES];
for (int i = 0; i < MESSAGES; i++) {
long key = counter++;
futures[i] = producer.send(new ProducerRecord<>(topic, key, value));
}
for (Future<RecordMetadata> f : futures) {
f.get();
}
}
這種方法的目的是測量生產者發送請求的速度、代理人將請求持久化到各個分區和副本的速度,以及確認事件已送達的速度。這樣,我們既能測量真實的端對端生產者吞吐量,又能確保 Kafka 的高效運作。
4.2 結果解讀
首先來看一個更簡單的場景,使用單一代理且不進行任何複製,我們得到以下結果:
Benchmark (idempotent) Mode Cnt Score Error Units
IdempotenceBenchmark.sendMessages true thrpt 10 24891.897 ± 263.271 ops/s
IdempotenceBenchmark.sendMessages false thrpt 10 24953.439 ± 313.723 ops/s
此分數代表吞吐量,以每秒記錄數衡量。換句話說,它表示生產者在一秒鐘內成功發送的記錄數。冪等生產者和非冪等生產者之間的差異相對於整體吞吐量而言很小。在這種類型的基準測試中,出現一些波動是正常的。
接下來,我們在複製因子為 3 的三代理叢集上執行基準測試:
Benchmark (idempotent) Mode Cnt Score Error Units
IdempotenceBenchmark.sendMessages true thrpt 10 23689.246 ± 568.638 ops/s
IdempotenceBenchmark.sendMessages false thrpt 10 24097.398 ± 500.293 ops/s
兩個生產者的吞吐量值非常接近。這種程度的波動在效能基準測試中是預期的,在某些測試中,我們甚至可以觀察到冪等生產者獲得了略高的分數。在這個吞吐量水準下,Kafka 每秒大約可以處理 24000 筆記錄。在這種情況下,吞吐量的微小差異幾乎可以忽略不計。
啟用冪等性後,即使使用多個代理程式和副本,我們也沒有測量到明顯的吞吐量下降。這些差異可能因運行而異,但兩種情況下結果都保持一致。
5. 何時啟用冪等生產者
根據 Kafka 的交付保證和我們的基準測試結果,我們可以看出冪等性並不會顯著降低吞吐量,反而提供了強大的去重機制。 Kafka的目標是可靠地交付每一筆記錄,因此在新版本中預設啟用冪等性。
為了避免重複記錄,並且當我們想要在不實現自訂應用程式級去重的情況下獲得強大的交付保證時,保持冪等性啟用是安全的。
另一方面,如果我們的主要目標是實現絕對最大吞吐量,並且可以接受重複數據,那麼我們可以停用冪等性。即使如此,效能差異通常也很小,但重複資料的風險卻會增加。
6. 結論
在本文中,我們探討了 Kafka 中的冪等性,並使用 JMH 基準測試衡量了其對效能的影響。
結果表明,冪等性不會導致吞吐量顯著下降。我們觀察到的差異很小,且在正常的測量誤差範圍內。
與往常一樣,完整的程式碼範例可在 GitHub 上找到。