使用 Reactor Kafka 創建 Kafka 消費者
1. 簡介
Apache Kafka 是一個流行的分散式事件流平台,與 Project Reactor 結合使用時,它可以建立一個有彈性且反應靈敏的應用程式。 Reactor Kafka 是建立在 Reactor 和 Kafka 生產者/消費者 API 之上的反應式 API。
Reactor Kafka API 使我們能夠使用具有背壓支援的功能性、非阻塞 API 向 Kafka 發布訊息和從 Kafka 使用訊息。這意味著系統可以根據需求和資源可用性動態調整訊息處理速率,確保高效率、容錯的運作。
在本教程中,我們將探討如何使用Reactor Kafka創建 Kafka 消費者,確保容錯性和可靠性。我們將深入研究以非阻塞方式非同步處理訊息時的關鍵概念,例如背壓、重試和錯誤處理。
2. 設定項目
首先,我們應該在專案中包含Spring Kafka和Reactor Kafka Maven 依賴項:
<dependency>
<groupId>org.springframework.kafka</groupId>
<artifactId>spring-kafka</artifactId>
</dependency>
<dependency>
<groupId>io.projectreactor.kafka</groupId>
<artifactId>reactor-kafka</artifactId>
</dependency>
3. 反應式 Kafka 消費者設置
接下來,我們將使用 Reactor Kafka 設定 Kafka 消費者。我們將首先配置必要的消費者屬性,確保它正確設定以與 Kafka 連接。然後,我們將初始化消費者,最後來看看如何被動地消費訊息。
3.1.配置 Kafka 消費者屬性
現在,讓我們來配置 Reactive Kafka 消費者屬性。 KafkaConfig
配置類別定義了消費者要使用的屬性:
public class KafkaConfig {
@Value("${spring.kafka.bootstrap-servers}")
private String bootstrapServers;
public static Map<String, Object> consumerConfig() {
Map<String, Object> config = new HashMap<>();
config.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
config.put(ConsumerConfig.GROUP_ID_CONFIG, "reactive-consumer-group");
config.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
config.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
config.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
return config;
}
}
ConsumerConfig.GROUP_ID_CONFIG
定義消費者群組並實現跨消費者的訊息負載平衡。同一組內的所有消費者負責處理來自某個主題的訊息。
接下來我們在實例化ReactiveKafkaConsumerTemplate
時使用配置類別來消耗事件:
public ReactiveKafkaConsumerTemplate<String, String> reactiveKafkaConsumerTemplate() {
return new ReactiveKafkaConsumerTemplate<>(receiverOptions());
}
private ReceiverOptions<String, String> receiverOptions() {
Map<String, Object> consumerConfig = consumerConfig();
ReceiverOptions<String, String> receiverOptions = ReceiverOptions.create(consumerConfig);
return receiverOptions.subscription(Collections.singletonList("test-topic"));
}
receiverOptions()
方法使用consumerConfig()
中的設定來配置 Kafka 消費者並訂閱test-topic
,以確保它監聽訊息。 reactiveKafkaConsumerTemplate()
方法初始化一個ReactiveKafkaConsumerTemplate
,為我們的反應式應用程式提供非阻塞、背壓感知的訊息消費功能。
3.2.使用 Reactive Kafka 創建 Kafka 消費者
在 Reactor Kafka 中,Kafka Consumer 上選擇的抽像是入站Flux
,其中從 Kafka 接收的所有事件都由框架發布。透過呼叫 ReactiveKafkaConsumerTemplate 上的receive()
、 receiveAtmostOnce()
、 receiveAutoAck()
和receiveExactlyOnce()
方法之一來建立此Flux
ReactiveKafkaConsumerTemplate.
在這個範例中,我們使用receive()
運算元來使用入站的Flux:
public class ConsumerService {
private final ReactiveKafkaConsumerTemplate<String, String> reactiveKafkaConsumerTemplate;
public Flux<String> consumeRecord() {
return reactiveKafkaConsumerTemplate.receive()
.map(ReceiverRecord::value)
.doOnNext(msg -> log.info("Received: {}", msg))
.doOnError(error -> log.error("Consumer error: {}", error.getMessage()));
}
}
這種方法允許系統在訊息到達時主動處理它們,而不會阻塞或遺失訊息。透過使用反應流,消費者可以按照自己的步調擴展和處理訊息,並在必要時施加背壓。在這裡,我們記錄透過doOnNext()
收到的每個訊息,並使用doOnError().
4.處理背壓
使用 Reactor Kafka 消費者的主要優點之一是它支援背壓。這確保系統不會因高吞吐量而超負荷。我們可以使用limitRate()
來限制處理速率,或使用buffer()
進行批次處理,而不是直接使用訊息:
public Flux<String> consumeWithLimit() {
return reactiveKafkaConsumerTemplate.receive()
.limitRate(2)
.map(ReceiverRecord::value);
}
這裡我們一次請求最多兩個訊息,控制流量。這種方法可確保高效且可感知背壓的訊息處理。最後,它僅提取並返回訊息值。
我們不需要單獨處理它們,而是可以透過緩衝固定數量的記錄,然後將它們作為一個群組發出,從而批量使用它們:
public Flux<String> consumeAsABatch() {
return reactiveKafkaConsumerTemplate.receive()
.buffer(2)
.flatMap(messages -> Flux.fromStream(messages.stream()
.map(ReceiverRecord::value)));
}
在這裡,我們最多緩衝兩條記錄,然後將它們作為一批發出。透過使用buffer(2)
,它將訊息分組並一起處理,從而減少了單獨處理的開銷。
5.錯誤處理策略
在被動的 Kafka 消費者中,管道中的錯誤充當終端訊號。這會導致消費者關閉,從而使服務實例在不消耗事件的情況下運作。 Reactor Kafka 提供了各種策略來解決這個問題,例如使用retryWhen
操作符的重試機制。這會捕獲故障,重新訂閱上游發布者,並重新創建 Kafka 消費者。
Kafka 消費者的另一個常見問題是反序列化錯誤,當消費者因意外的格式而無法反序列化訊息時就會發生這種情況。為了處理所謂的錯誤,我們可以使用 Spring Kafka 提供的ErrorHandlingDeserializer
。
5.1.重試策略
當我們想要重試失敗的操作時,重試策略至關重要。此策略確保以固定的延遲(例如五秒鐘)不斷重試,直到消費者成功重新連接或滿足預先定義的退出條件。
讓我們為消費者實作一個重試策略,以便當發生錯誤時它可以自動重試訊息處理:
public Flux<String> consumeWithRetryWithBackOff(AtomicInteger attempts) {
return reactiveKafkaConsumerTemplate.receive()
.flatMap(msg -> attempts.incrementAndGet() < 3 ?
Flux.error(new RuntimeException("Failure")) : Flux.just(msg))
.retryWhen(Retry.fixedDelay(3, Duration.ofSeconds(1)))
.map(ReceiverRecord::value);
}
在此範例中, Retry.backoff(3, Duration.ofSeconds(1))
指定係統嘗試重試最多3
次,退避時間為1
秒。
5.2.使用ErrorHandlingDeserializer
處理序列化錯誤
當我們從 Kafka 消費訊息時,如果訊息格式與預期的模式不匹配,我們會遇到反序列化錯誤。為了解決這個問題,我們可以使用 Spring Kafka 的**ErrorHandlingDeserializer** .
透過捕捉反序列化錯誤可以防止消費者失敗。然後它將錯誤詳細資訊作為標題添加到ReceiverRecord
,而不是丟棄訊息或引發異常:
private Map<String, Object> errorHandlingConsumerConfig(){
Map<String, Object> config = new HashMap<>();
config.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, ErrorHandlingDeserializer.class);
config.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, ErrorHandlingDeserializer.class);
config.put(ErrorHandlingDeserializer.KEY_DESERIALIZER_CLASS, StringDeserializer.class);
config.put(ErrorHandlingDeserializer.VALUE_DESERIALIZER_CLASS, StringDeserializer.class);
return config;
}
6. 結論
在本文中,我們探討如何使用 Reactor Kafka 建立 Kafka 消費者,並專注於錯誤處理、重試和背壓管理。這些技術使我們的 Kafka 消費者即使在故障情況下也能保持容錯和高效。
與往常一樣,本文使用的所有程式碼範例均可在 GitHub 上找到。