如何捕捉 Spring-Kafka 中的反序列化錯誤?
1. 概述
在本文中,我們將了解 Spring-Kafka 的RecordDeserializationException
。之後,我們將建立一個自訂錯誤處理程序來捕獲此異常並跳過無效訊息,從而允許使用者繼續處理下一個事件。
本文依賴 Spring Boot 的 Kafka 模組,該模組提供了與代理程式互動的便利工具。為了更深入了解 Kafka 的內部結構,我們可以重新審視平台的基本概念。
2. 建立Kafka監聽器
對於本文中的程式碼範例,我們將使用一個小型應用程式來偵聽主題「 baeldung.articles.published”
並處理傳入訊息。為了展示自訂錯誤處理,我們的應用程式應該在遇到反序列化異常後繼續使用訊息。
Spring-Kafka 的版本將由父親 Spring Boot pom 自動解析。因此,我們只需要添加模組依賴即可:
<dependency>
<groupId>org.springframework.kafka</groupId>
<artifactId>spring-kafka</artifactId>
</dependency>
這個模組使我們能夠使用@KafkaListener
註釋,它是 Kafka Consumer API 的抽象。讓我們利用此註解來建立ArticlesPublishedListener
元件。此外,我們將引入另一個元件EmailService
,它將對每個傳入訊息執行操作:
@Component
class ArticlesPublishedListener {
private final EmailService emailService;
// constructor
@KafkaListener(topics = "baeldung.articles.published")
public void onArticlePublished(ArticlePublishedEvent event) {
emailService.sendNewsletter(event.article());
}
}
record ArticlePublishedEvent(String article) {
}
對於消費者配置,我們將專注於僅定義對我們的範例至關重要的屬性。當我們開發生產應用程式時,我們可以調整這些屬性以滿足我們的特定需求或將它們外部化到單獨的配置檔案中:
@Bean
ConsumerFactory<String, ArticlePublishedEvent> consumerFactory(
@Value("${spring.kafka.bootstrap-servers}") String bootstrapServers
) {
Map<String, Object> props = new HashMap<>();
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
props.put(ConsumerConfig.GROUP_ID_CONFIG, "baeldung-app-1");
props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, JsonDeserializer.class);
return new DefaultKafkaConsumerFactory<>(
props,
new StringDeserializer(),
new JsonDeserializer<>(ArticlePublishedEvent.class)
);
}
@Bean
KafkaListenerContainerFactory<String, ArticlePublishedEvent> kafkaListenerContainerFactory(
ConsumerFactory<String, ArticlePublishedEvent> consumerFactory
) {
var factory = new ConcurrentKafkaListenerContainerFactory<String, ArticlePublishedEvent>();
factory.setConsumerFactory(consumerFactory);
return factory;
}
3. 建構測試環境
為了設定我們的測試環境,我們可以利用 Kafka Testcontainer 無縫地啟動 Kafka Docker 容器進行測試:
@Testcontainers
@SpringBootTest(classes = Application.class)
class DeserializationExceptionLiveTest {
@Container
private static KafkaContainer KAFKA = new KafkaContainer(DockerImageName.parse("confluentinc/cp-kafka:latest"));
@DynamicPropertySource
static void setProps(DynamicPropertyRegistry registry) {
registry.add("spring.kafka.bootstrap-servers", KAFKA::getBootstrapServers);
}
// ...
}
除此之外,我們還需要KafkaProducer
和EmailService
來驗證偵聽器的功能。這些元件將向我們的偵聽器發送訊息並驗證其處理是否準確。為了簡化測試並避免模擬,讓我們將所有傳入的文章保存在記憶體中的清單中,然後使用 getter 存取它們:
@Service
class EmailService {
private final List<String> articles = new ArrayList<>();
// logger, getter
public void sendNewsletter(String article) {
log.info("Sending newsletter for article: " + article);
articles.add(article);
}
}
因此,我們只需將EmailService
注入到我們的測試類別中即可。讓我們繼續建立testKafkaProducer
:
@Autowired
EmailService emailService;
static KafkaProducer<String, String> testKafkaProducer;
@BeforeAll
static void beforeAll() {
Properties props = new Properties();
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, KAFKA.getBootstrapServers());
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
testKafkaProducer = new KafkaProducer<>(props);
}
透過這個設置,我們已經可以測試快樂的流程了。讓我們使用有效的 JSON 發布兩篇文章,並驗證我們的應用程式是否成功地為每篇文章呼叫了emailService
:
@Test
void whenPublishingValidArticleEvent_thenProcessWithoutErrors() {
publishArticle("{ \"article\": \"Kotlin for Java Developers\" }");
publishArticle("{ \"article\": \"The SOLID Principles\" }");
await().untilAsserted(() ->
assertThat(emailService.getArticles())
.containsExactlyInAnyOrder(
"Kotlin for Java Developers",
"The SOLID Principles"
));
}
4. 引發RecordDeserializationException
如果配置的反序列化器無法正確解析訊息的鍵或值,Kafka 會拋出RecordDeserializationException
。要重現此錯誤,我們只需發布一條包含無效 JSON 正文的訊息:
@Test
void whenPublishingInvalidArticleEvent_thenCatchExceptionAndContinueProcessing() {
publishArticle("{ \"article\": \"Introduction to Kafka\" }");
publishArticle(" !! Invalid JSON !! ");
publishArticle("{ \"article\": \"Kafka Streams Tutorial\" }");
await().untilAsserted(() ->
assertThat(emailService.getArticles())
.containsExactlyInAnyOrder(
"Kotlin for Java Developers",
"The SOLID Principles"
));
}
如果我們執行此測試並檢查控制台,我們將觀察到記錄的重複錯誤:
ERROR 7716 --- [ntainer#0-0-C-1] osklKafkaMessageListenerContainer : Consumer exception
**java.lang.IllegalStateException: This error handler cannot process 'SerializationException's directly; please consider configuring an 'ErrorHandlingDeserializer' in the value and/or key deserializer**
at org.springframework.kafka.listener.DefaultErrorHandler.handleOtherException(DefaultErrorHandler.java:151) ~[spring-kafka-2.8.11.jar:2.8.11]
at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.handleConsumerException(KafkaMessageListenerContainer.java:1815) ~[spring-kafka-2.8.11.jar:2.8.11]
...
**Caused by: org.apache.kafka.common.errors.RecordDeserializationException: Error deserializing key/value for partition baeldung.articles.published-0 at offset 1. If needed, please seek past the record to continue consumption.**
at org.apache.kafka.clients.consumer.internals.Fetcher.parseRecord(Fetcher.java:1448) ~[kafka-clients-3.1.2.jar:na]
...
**Caused by: org.apache.kafka.common.errors.SerializationException: Can't deserialize data** [[32, 33, 33, 32, 73, 110, 118, 97, 108, 105, 100, 32, 74, 83, 79, 78, 32, 33, 33, 32]] from topic [baeldung.articles.published]
at org.springframework.kafka.support.serializer.JsonDeserializer.deserialize(JsonDeserializer.java:588) ~[spring-kafka-2.8.11.jar:2.8.11]
...
**Caused by: com.fasterxml.jackson.core.JsonParseException: Unexpected character ('!' (code 33))**: expected a valid value (JSON String, Number, Array, Object or token 'null', 'true' or 'false')
**at [Source: (byte[])" !! Invalid JSON !! "; line: 1, column: 3]**
at com.fasterxml.jackson.core.JsonParser._constructError(JsonParser.java:2391) ~[jackson-core-2.13.5.jar:2.13.5]
...
然後,測試最終會逾時並失敗。如果我們檢查斷言錯誤,我們會注意到只有第一個訊息被成功處理:
org.awaitility.core.ConditionTimeoutException: Assertion condition
Expecting actual:
["Introduction to Kafka"]
to contain exactly in any order:
["Introduction to Kafka", "Kafka Streams Tutorial"]
but could not find the following elements:
["Kafka Streams Tutorial"]
within 5 seconds.
正如預期的那樣,第二個訊息的反序列化失敗。因此,偵聽器繼續嘗試使用相同的訊息,導致錯誤重複發生。
5. 建立錯誤處理程序
如果我們仔細分析失敗日誌,我們會注意到兩個建議:
- 考慮配置一個“
ErrorHandlingDeserializer
”; - 如有需要,請翻閱記錄繼續消費;
換句話說,我們可以建立一個自訂錯誤處理程序來處理反序列化異常並增加消費者偏移量。這將使我們能夠跳過無效訊息並繼續消費。
5.1.實作CommonErrorHandler
要實作CommonErrprHandler
接口,我們必須重寫兩個沒有default
實作的公共方法:
-
handleRecord()
– 呼叫以處理單一失敗記錄; -
handleOtherException()
– 拋出異常時調用,但不是針對特定記錄;
我們可以使用類似的方法來處理這兩種情況。讓我們先捕獲異常並記錄錯誤訊息:
class KafkaErrorHandler implements CommonErrorHandler {
@Override
public void handleRecord(Exception exception, ConsumerRecord<?, ?> record, Consumer<?, ?> consumer, MessageListenerContainer container) {
handle(exception, consumer);
}
@Override
public void handleOtherException(Exception exception, Consumer<?, ?> consumer, MessageListenerContainer container, boolean batchListener) {
handle(exception, consumer);
}
private void handle(Exception exception, Consumer<?, ?> consumer) {
log.error("Exception thrown", exception);
// ...
}
}
5.2. Kafka Consumer的seek()
和commitSync()
我們可以使用Consumer
介面中的seek()
方法來手動更改主題中特定分區的目前偏移位置。簡而言之,我們可以使用它根據訊息的偏移量根據需要重新處理或跳過訊息。
在我們的例子中,如果異常是RecordDeserializationException,
我們將使用主題分區和下一個偏移量來呼叫seek()
方法:
void handle(Exception exception, Consumer<?, ?> consumer) {
log.error("Exception thrown", exception);
if (exception instanceof RecordDeserializationException ex) {
consumer.seek(ex.topicPartition(), ex.offset() + 1L);
consumer.commitSync();
} else {
log.error("Exception not handled", exception);
}
}
我們可以注意到,我們需要從Consumer
介面呼叫commitSync()
。這將提交偏移量並確保新位置被 Kafka 代理確認並保留。這一步驟至關重要,因為它更新了消費者群組提交的偏移量,表明調整位置之前的訊息已成功處理。
5.3.更新配置
最後,我們需要將自訂錯誤處理程序新增到我們的消費者配置中。讓我們先將其聲明為@Bean
:
@Bean
CommonErrorHandler commonErrorHandler() {
return new KafkaErrorHandler();
}
之後,我們將使用其專用的 setter 將新 bean 新增至ConcurrentKafkaListenerContainerFactory
:
@Bean
ConcurrentKafkaListenerContainerFactory<String, ArticlePublishedEvent> kafkaListenerContainerFactory(
ConsumerFactory<String, ArticlePublishedEvent> consumerFactory,
CommonErrorHandler commonErrorHandler
) {
var factory = new ConcurrentKafkaListenerContainerFactory<String, ArticlePublishedEvent>();
factory.setConsumerFactory(consumerFactory);
factory.setCommonErrorHandler(commonErrorHandler);
return factory;
}
就是這樣!我們現在可以重新運行測試,並期望偵聽器跳過無效訊息並繼續使用訊息。
六,結論
在本文中,我們討論了 Spring Kafka 的RecordDeserializationException
,我們發現,如果處理不當,它可能會阻塞給定分區的消費者群組。
接下來,我們深入研究了 Kafka 的CommonErrorHandler
介面並實現了它,以使我們的偵聽器能夠在繼續處理訊息的同時處理反序列化失敗。我們利用消費者的API方法,即seek()
和commitSync(),
透過相應地調整消費者偏移量來繞過無效訊息。
與往常一樣,本文的源代碼可在 GitHub 上取得。