帶有 Spring 的 Kafka 死信隊列
一、簡介
在本教程中,我們將學習如何使用 Spring 為 Apache Kafka 配置死信佇列機制。
2. 死信隊列
死信佇列 (DLQ) 用於儲存因各種原因(例如間歇性系統故障、無效的訊息架構或損壞的內容)而無法正確處理的訊息。這些訊息稍後可以從 DLQ 中刪除以進行分析或重新處理。
下圖展示了DLQ機制的簡化流程:
使用 DLQ 通常是個好主意,但在某些情況下應該避免使用。 例如,不建議對訊息的確切順序很重要的佇列使用 DLQ,因為重新處理 DLQ 訊息會破壞訊息到達時的順序。
3. Spring Kafka 中的死信隊列
Spring Kafka 中 DLQ 概念的等價物是死信主題(DLT)。在下面的部分中,我們將了解 DLT 機制如何適用於簡單的支付系統。
3.1.模型類
讓我們從模型類別開始:
public class Payment {
private String reference;
private BigDecimal amount;
private Currency currency;
// standard getters and setters
}
我們還實作一個用於建立事件的實用方法:
static Payment createPayment(String reference) {
Payment payment = new Payment();
payment.setAmount(BigDecimal.valueOf(71));
payment.setCurrency(Currency.getInstance("GBP"));
payment.setReference(reference);
return payment;
}
3.2.設定
接下來,讓我們加入所需的spring-kafka
和jackson-databind
依賴項:
<dependency>
<groupId>org.springframework.kafka</groupId>
<artifactId>spring-kafka</artifactId>
<version>2.9.13</version> </dependency>
<dependency>
<groupId>com.fasterxml.jackson.core</groupId>
<artifactId>jackson-databind</artifactId>
<version>2.14.3</version>
</dependency>
我們現在可以建立ConsumerFactory
和ConcurrentKafkaListenerContainerFactory
bean:
@Bean
public ConsumerFactory<String, Payment> consumerFactory() {
Map<String, Object> config = new HashMap<>();
config.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
return new DefaultKafkaConsumerFactory<>(
config, new StringDeserializer(), new JsonDeserializer<>(Payment.class));
}
@Bean
public ConcurrentKafkaListenerContainerFactory<String, Payment> containerFactory() {
ConcurrentKafkaListenerContainerFactory<String, Payment> factory =
new ConcurrentKafkaListenerContainerFactory<>();
factory.setConsumerFactory(consumerFactory());
return factory;
}
最後,讓我們實現主題的消費者:
@KafkaListener(topics = { "payments" }, groupId = "payments")
public void handlePayment(
Payment payment, @Header(KafkaHeaders.RECEIVED_TOPIC) String topic) {
log.info("Event on main topic={}, payload={}", topic, payment);
}
在繼續討論 DLT 範例之前,我們將討論重試配置。
3.3.關閉重試
在現實專案中,在將事件傳送到 DLT 之前,如果發生錯誤,通常會重試處理事件。使用 Spring Kafka 提供的非阻塞重試機制可以輕鬆實現這一點。
然而,在本文中,我們將關閉重試以突出 DLT 機制。當主要主題的消費者無法處理事件時,事件將直接發佈到 DLT。
首先,我們要定義producerFactory
和retryableTopicKafkaTemplate
bean:
@Bean
public ProducerFactory<String, Payment> producerFactory() {
Map<String, Object> config = new HashMap<>();
config.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
return new DefaultKafkaProducerFactory<>(
config, new StringSerializer(), new JsonSerializer<>());
}
@Bean
public KafkaTemplate<String, Payment> retryableTopicKafkaTemplate() {
return new KafkaTemplate<>(producerFactory());
}
現在我們可以定義主要主題的使用者而無需額外重試,如前所述:
@RetryableTopic(attempts = "1", kafkaTemplate = "retryableTopicKafkaTemplate")
@KafkaListener(topics = { "payments"}, groupId = "payments")
public void handlePayment(
Payment payment, @Header(KafkaHeaders.RECEIVED_TOPIC) String topic) {
log.info("Event on main topic={}, payload={}", topic, payment);
}
@RetryableTopic
註解中的attempts
屬性表示在將訊息傳送到 DLT 之前嘗試的嘗試次數。
4. 配置死信主題
我們現在已準備好實施 DLT 消費者:
@DltHandler
public void handleDltPayment(
Payment payment, @Header(KafkaHeaders.RECEIVED_TOPIC) String topic) {
log.info("Event on dlt topic={}, payload={}", topic, payment);
}
使用@DltHandler
註解的方法必須與@KafkaListener
註解的方法放在同一個類別中。
在以下部分中,我們將探討 Spring Kafka 中可用的三種 DLT 配置。我們將為每個策略使用專門的主題和使用者,以使每個範例易於單獨遵循。
4.1.錯誤失敗的 DLT
使用 FAIL_ON_ERROR
策略我們可以設定DLT消費者在DLT處理失敗時結束執行而不重試:
@RetryableTopic(
attempts = "1",
kafkaTemplate = "retryableTopicKafkaTemplate",
dltStrategy = DltStrategy.FAIL_ON_ERROR)
@KafkaListener(topics = { "payments-fail-on-error-dlt"}, groupId = "payments")
public void handlePayment(
Payment payment, @Header(KafkaHeaders.RECEIVED_TOPIC) String topic) {
log.info("Event on main topic={}, payload={}", topic, payment);
}
@DltHandler
public void handleDltPayment(
Payment payment, @Header(KafkaHeaders.RECEIVED_TOPIC) String topic) {
log.info("Event on dlt topic={}, payload={}", topic, payment);
}
值得注意的是, @KafkaListener
消費者從payments-fail-on-error-dlt
主題讀取訊息。
讓我們驗證一下,當主要消費者成功時,該事件沒有發佈到 DLT:
@Test
public void whenMainConsumerSucceeds_thenNoDltMessage() throws Exception {
CountDownLatch mainTopicCountDownLatch = new CountDownLatch(1);
doAnswer(invocation -> {
mainTopicCountDownLatch.countDown();
return null;
}).when(paymentsConsumer)
.handlePayment(any(), any());
kafkaProducer.send(TOPIC, createPayment("dlt-fail-main"));
assertThat(mainTopicCountDownLatch.await(5, TimeUnit.SECONDS)).isTrue();
verify(paymentsConsumer, never()).handleDltPayment(any(), any());
}
讓我們看看當主消費者和 DLT 消費者都無法處理事件時會發生什麼:
@Test
public void whenDltConsumerFails_thenDltProcessingStops() throws Exception {
CountDownLatch mainTopicCountDownLatch = new CountDownLatch(1);
CountDownLatch dlTTopicCountDownLatch = new CountDownLatch(2);
doAnswer(invocation -> {
mainTopicCountDownLatch.countDown();
throw new Exception("Simulating error in main consumer");
}).when(paymentsConsumer)
.handlePayment(any(), any());
doAnswer(invocation -> {
dlTTopicCountDownLatch.countDown();
throw new Exception("Simulating error in dlt consumer");
}).when(paymentsConsumer)
.handleDltPayment(any(), any());
kafkaProducer.send(TOPIC, createPayment("dlt-fail"));
assertThat(mainTopicCountDownLatch.await(5, TimeUnit.SECONDS)).isTrue();
assertThat(dlTTopicCountDownLatch.await(5, TimeUnit.SECONDS)).isFalse();
assertThat(dlTTopicCountDownLatch.getCount()).isEqualTo(1);
}
在上面的測試中,事件由主消費者處理一次,並且僅由 DLT 消費者處理一次。
4.2. DLT重試
我們可以使用以下方法將 DLT 使用者配置為在 DLT 處理失敗時嘗試重新處理事件 ALWAYS_RETRY_ON_ERROR
策略。 這是預設使用的策略:
@RetryableTopic(
attempts = "1",
kafkaTemplate = "retryableTopicKafkaTemplate",
dltStrategy = DltStrategy.ALWAYS_RETRY_ON_ERROR)
@KafkaListener(topics = { "payments-retry-on-error-dlt"}, groupId = "payments")
public void handlePayment(
Payment payment, @Header(KafkaHeaders.RECEIVED_TOPIC) String topic) {
log.info("Event on main topic={}, payload={}", topic, payment);
}
@DltHandler
public void handleDltPayment(
Payment payment, @Header(KafkaHeaders.RECEIVED_TOPIC) String topic) {
log.info("Event on dlt topic={}, payload={}", topic, payment);
}
值得注意的是, @KafkaListener
消費者從payments-retry-on-error-dlt
主題讀取訊息。
接下來,我們來測試一下當 main 和 DLT 消費者處理事件失敗時會發生什麼:
@Test
public void whenDltConsumerFails_thenDltConsumerRetriesMessage() throws Exception {
CountDownLatch mainTopicCountDownLatch = new CountDownLatch(1);
CountDownLatch dlTTopicCountDownLatch = new CountDownLatch(3);
doAnswer(invocation -> {
mainTopicCountDownLatch.countDown();
throw new Exception("Simulating error in main consumer");
}).when(paymentsConsumer)
.handlePayment(any(), any());
doAnswer(invocation -> {
dlTTopicCountDownLatch.countDown();
throw new Exception("Simulating error in dlt consumer");
}).when(paymentsConsumer)
.handleDltPayment(any(), any());
kafkaProducer.send(TOPIC, createPayment("dlt-retry"));
assertThat(mainTopicCountDownLatch.await(5, TimeUnit.SECONDS)).isTrue();
assertThat(dlTTopicCountDownLatch.await(5, TimeUnit.SECONDS)).isTrue();
assertThat(dlTTopicCountDownLatch.getCount()).isEqualTo(0);
}
正如預期的那樣,DLT 消費者嘗試重新處理該事件。
4.3.禁用DLT
DLT 機制也可以使用以下指令關閉 NO_DLT
策略:
@RetryableTopic(
attempts = "1",
kafkaTemplate = "retryableTopicKafkaTemplate",
dltStrategy = DltStrategy.NO_DLT)
@KafkaListener(topics = { "payments-no-dlt" }, groupId = "payments")
public void handlePayment(
Payment payment, @Header(KafkaHeaders.RECEIVED_TOPIC) String topic) {
log.info("Event on main topic={}, payload={}", topic, payment);
}
@DltHandler
public void handleDltPayment(
Payment payment, @Header(KafkaHeaders.RECEIVED_TOPIC) String topic) {
log.info("Event on dlt topic={}, payload={}", topic, payment);
}
值得注意的是, @KafkaListener
消費者從payments-no-dlt
主題讀取訊息。
讓我們檢查一下當主要主題的使用者無法處理事件時,事件不會轉發到 DLT:
@Test
public void whenMainConsumerFails_thenDltConsumerDoesNotReceiveMessage() throws Exception {
CountDownLatch mainTopicCountDownLatch = new CountDownLatch(1);
CountDownLatch dlTTopicCountDownLatch = new CountDownLatch(1);
doAnswer(invocation -> {
mainTopicCountDownLatch.countDown();
throw new Exception("Simulating error in main consumer");
}).when(paymentsConsumer)
.handlePayment(any(), any());
doAnswer(invocation -> {
dlTTopicCountDownLatch.countDown();
return null;
}).when(paymentsConsumer)
.handleDltPayment(any(), any());
kafkaProducer.send(TOPIC, createPayment("no-dlt"));
assertThat(mainTopicCountDownLatch.await(5, TimeUnit.SECONDS)).isTrue();
assertThat(dlTTopicCountDownLatch.await(5, TimeUnit.SECONDS)).isFalse();
assertThat(dlTTopicCountDownLatch.getCount()).isEqualTo(1);
}
正如預期的那樣,事件不會轉發到 DLT,儘管我們已經實現了使用@DltHandler
註釋的用戶。
5. 結論
在本文中,我們學習了三種不同的 DLT 策略。第一個是FAIL_ON_ERROR
策略,即 DLT 使用者在發生故障時不會嘗試重新處理事件。相反, ALWAYS_RETRY_ON_ERROR
策略可確保 DLT 使用者在失敗時嘗試重新處理事件。當沒有明確設定其他策略時,這是用作預設值。最後一種是NO_DLT
策略,完全關閉DLT機制。
像往常一樣,完整的程式碼可以在 GitHub 上找到。