如何修復 Apache Kafka 中的未知魔法位元組錯誤
1.概述
在本文中,我們將學習如何處理使用 Spring Kafka 消費 Avro 訊息時出現的「 Unknown magic byte
」錯誤以及其他反序列化問題。我們將探索ErrorHandlingDeserializer
函數,並了解它如何幫助管理「毒丸」訊息。
最後,我們將配置DefaultErrorHandler
和DeadLetterPublishingRecoverer
,將有問題的記錄路由到 DLQ 主題,確保消費者繼續處理而不會卡住。
2. 毒丸和魔法字節
有時,我們會收到一些由於格式問題或內容異常而無法處理的訊息——這些訊息被稱為「毒丸」訊息。與其無休止地處理這些訊息,不如優雅地處理它們。
在 Kafka 中,當消費者期望接收 Avro 編碼資料但實際接收的並非 Avro 編碼資料時,可能會出現毒丸訊息。例如,使用StringSerializer
的生產者可能會向期望接收 Avro 編碼資料的主題發送純文字訊息,導致消費者端的AvroDeserializer
失敗:
因此,我們會收到帶有“Unknown magic byte”
訊息的反序列化錯誤。 「魔法位元組」是 Avro 編碼訊息開頭的標記,用於幫助反序列化器識別並正確處理該訊息。如果該訊息未使用 Avro 序列化器進行序列化,且不以此位元組開頭,則反序列化器會拋出錯誤,指示格式不符。
3. 重現問題
為了重現此問題,我們將使用一個簡單的 Spring Boot 應用程序,該應用程式從 Kafka 主題中使用 Avro 格式的訊息。我們的應用將使用spring-kafka 、 avro和kafka-avro-deserialzier依賴項:
<dependency>
<groupId>org.springframework.kafka</groupId>
<artifactId>spring-kafka</artifactId>
</dependency>
<dependency>
<groupId>org.apache.avro</groupId>
<artifactId>avro</artifactId>
<version>1.12.0</version>
</dependency>
<dependency>
<groupId>io.confluent</groupId>
<artifactId>kafka-avro-serializer</artifactId>
<version>7.9.1</version>
</dependency>
此外,我們的服務使用@KafkaListener
監聽所有來自“baeldung.article.published”
主題的訊息。為了演示,我們將所有傳入訊息的文章名稱儲存在記憶體中的List
中:
@Component
class AvroMagicByteApp {
// logger
List<String> blog = new ArrayList<>();
@KafkaListener(topics = "baeldung.article.published")
public void listen(Article article) {
LOG.info("a new article was published: {}", article);
blog.add(article.getTitle());
}
}
接下來,我們將新增 Kafka 特定的應用程式屬性。由於我們使用 Spring Boot 內建的 Testcontainers 支持,因此可以省略bootstrap-servers
屬性,因為它會自動注入。我們也將schema.registry.url
設定為“ mock://test
”,因為測試期間我們不會使用真實的 schema 註冊表:
spring:
kafka:
# bootstrap-servers <-- it'll be injected in test by Spring and Testcontainers
consumer:
group-id: test-group
auto-offset-reset: earliest
key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
value-deserializer: io.confluent.kafka.serializers.KafkaAvroDeserializer
properties:
schema.registry.url: mock://test
specific.avro.reader: true
就是這樣,我們現在可以使用 Testcontainers 啟動帶有 Kafka 代理程式的 Docker 容器並測試我們簡單應用程式的快樂路徑。
但是,如果我們向測試主題發布毒丸訊息,就會遇到“Unknown magic byte!”
異常。為了產生不合規的訊息,我們將利用一個使用StringSerializer
的KafkaTemplate
實例,並向主題發布一個虛擬String
:
@SpringBootTest
class AvroMagicByteLiveTest {
@Container
@ServiceConnection
static KafkaContainer kafka = new KafkaContainer(DockerImageName.parse("apache/kafka:4.0.0"));
@Test
void whenSendingMalformedMessage_thenSendToDLQ() throws Exception {
stringKafkaTemplate()
.send("baeldung.article.published", "not a valid avro message!")
.get();
Thread.sleep(10_000L);
// manually verify that the poison-pill message is handled correctly
}
private static KafkaTemplate<Object, Object> stringKafkaTemplate() { /* ... */ }
}
此外,我們還暫時添加了一個Thread.sleep()
,用於觀察應用程式日誌。不出所料,我們的服務無法反序列化訊息,並遇到了“Unknown magic byte!”
錯誤:
ERROR osklKafkaMessageListenerContainer - Consumer exception
java.lang.IllegalStateException: This error handler cannot process 'SerializationException's directly; __please consider configuring an 'ErrorHandlingDeserializer'__ in the value and/or key deserializer
at org.springframework.kafka...DefaultErrorHandler.handleOtherException(DefaultErrorHandler.java:192)
[...]
Caused by: org.apache.kafka...RecordDeserializationException:
Error deserializing VALUE for partition baeldung.article.published-0 at offset 1.
__If needed, please seek past the record to continue consumption.__
at org.apache.kafka.clients...CompletedFetch.newRecordDeserializationException(CompletedFetch.java:346)
[...]
Caused by: org.apache.kafka...errors.SerializationException: __Unknown magic byte!__
at io.confluent.kafka.serializers.AbstractKafkaSchemaSerDe.getByteBuffer(AbstractKafkaSchemaSerDe.java:649)
[...]
此外,由於我們沒有正確處理該訊息,並且從未確認過,我們也會反覆看到此錯誤。簡而言之,消費者會卡在該偏移量處,並不斷嘗試處理格式錯誤的訊息。
4. 錯誤處理反序列化器
幸運的是,錯誤日誌很詳細,甚至提出了可能的修復方法:
This error handler cannot process 'SerializationException's directly;
please consider configuring an 'ErrorHandlingDeserializer' in the value and/or key deserializer.
Spring Kafka 中的ErrorHandlingDeserializer
是一個包裝器,用於捕獲反序列化錯誤,並允許我們的應用程式優雅地處理這些錯誤,從而防止消費者崩潰。它的工作原理是將實際的反序列化委託給另一個反序列化器(例如JsonDeserializer
或KafkaAvroDeserializer,
並捕獲在此過程中拋出的任何異常。
要配置它,我們將value-deserializer
屬性更新為ErrorHandlingDeserializer
。此外,我們將在spring.kafka.consumer.spring.deserializer.value.delegate.class
下指定原始反序列化器:
spring.kafka:
consumer:
value-deserializer: org.springframework.kafka.support.serializer.ErrorHandlingDeserializer
properties:
spring.deserializer.value.delegate.class: io.confluent.kafka.serializers.KafkaAvroDeserializer
使用此配置,「 Unknown magic byte!
」異常只會在日誌中出現一次。這一次,應用程式優雅地處理了毒丸訊息,並繼續運行,而無需再次嘗試反序列化。
5.發佈到 DLQ
到目前為止,我們已經為訊息負載配置了ErrorHandlingDeserializer
,並正確處理了毒丸攻擊場景。但是,如果我們只是捕獲異常並繼續處理,檢查或恢復這些故障訊息就會變得非常困難。為了解決這個問題,我們應該考慮將它們發送到 DLQ 主題。
死信佇列 (DLQ) 是一個特殊主題,用於儲存經過一次或多次嘗試仍無法成功處理的訊息。讓我們在應用程式中啟用此行為:
@Configuration
class DlqConfig {
@Bean
DefaultErrorHandler errorHandler(DeadLetterPublishingRecoverer dlqPublishingRecoverer) {
return new DefaultErrorHandler(dlqPublishingRecoverer);
}
@Bean
DeadLetterPublishingRecoverer dlqPublishingRecoverer(KafkaTemplate<byte[], byte[]> bytesKafkaTemplate) {
return new DeadLetterPublishingRecoverer(bytesKafkaTemplate);
}
@Bean("bytesKafkaTemplate")
KafkaTemplate<?, ?> bytesTemplate(ProducerFactory<?, ?> kafkaProducerFactory) {
return new KafkaTemplate<>(kafkaProducerFactory,
Map.of(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, ByteArraySerializer.class.getName()));
}
}
我們可以觀察到,我們定義了一個DefaultErrorHandler
bean,它決定哪些異常可重試。在我們的例子中,反序列化異常被視為不可重試,因此它們將直接發送到 DLQ 。在建立錯誤處理程序時,我們將透過建構函式註入一個DeadLetterPublishingRecoverer
實例。
另一方面,由於毒丸訊息的具體格式未知, dlqPublishingRecoverer
使用帶有ByteArraySerializer
的KafkaTemplate
將失敗的訊息轉發到 DLQ 主題。此外,它還負責解析 DLQ 主題名稱;預設情況下,它會在原始主題名稱後面附加「 -dlt
」:
@Test
void whenSendingMalformedMessage_thenSendToDLQ() throws Exception {
stringKafkaTemplate()
.send("baeldung.article.published", "not a valid avro message!")
.get();
var dlqRecord = listenForOneMessage("baeldung.article.published-dlt", ofSeconds(5L));
assertThat(dlqRecord.value())
.isEqualTo("not a valid avro message!");
}
private static ConsumerRecord<?, ?> listenForOneMessage(String topic, Duration timeout) {
return KafkaTestUtils.getOneRecord(
kafka.getBootstrapServers(), "test-group-id", topic, 0, false, true, timeout);
}
可以看到,配置ErrorHandlingDeserializer
使我們能夠優雅地處理格式錯誤的訊息。之後,自訂的DefaultErrorHandler
和DeadLetterPublishingRecoverer
Bean 使我們能夠將這些錯誤訊息推送到 DLQ 主題。
6. 結論
在本教學中,我們介紹如何解決使用 Spring Kafka 處理 Avro 訊息時可能出現的“Unknown magic byte”
錯誤以及其他反序列化問題。我們也探討了ErrorHandlingDeserializer
如何防止消費者被有問題的消息阻塞。
最後,我們回顧了死信佇列的概念,並配置了 Spring Kafka bean 將毒丸訊息路由到專用 DLQ 主題,確保處理順暢且不間斷。
與往常一樣,本文中提供的程式碼可在 GitHub 上找到。