Kafka Streams中的異常處理
1. 概述
Kafka Streams 是一個 Apache Kafka 函式庫,用於建立即時、事件驅動型應用程式來處理資料流。
與任何生產系統一樣,流處理可能會因多種原因而失敗,例如網路逾時、資料損壞或處理錯誤。
在本教程中,我們將學習如何在 Kafka 流應用程式中處理各種異常。我們將實作異常處理機制,並測試在流程處理過程中發生異常時會發生什麼。
2. Kafka Streams 的應用範例
假設我們需要建立一個簡單的 Kafka 流服務,用於聚合和發送資料。
2.1. Maven 依賴項
首先,我們將新增kafka-clients和kafka-streams依賴項:
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-clients</artifactId>
<version>3.9.0</version>
</dependency>
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-streams</artifactId>
<version>3.9.0</version>
</dependency>
接下來,我們將定義資料模型。
2.2 定義資料模型
我們將定義一個User記錄,生產者和消費者都將使用該記錄:
public record User(String id, String name, String country) {
}
生產者會將User資料傳送到串流媒體服務。
2.3. 實現序列化器和反序列化器
我們的 Kafka 流服務將需要上述User資料的序列化器和反序列化器類別。
首先,我們將透過實作Serializer介面來實作自訂UserSerializer類別:
public class UserSerializer implements Serializer<User> {
private final ObjectMapper mapper = new ObjectMapper();
@Override
public byte[] serialize(String topic, User user) {
if (user == null) {
return null;
}
try {
return mapper.writeValueAsBytes(user);
} catch (JsonProcessingException ex) {
throw new RuntimeException(ex);
}
}
}
上面的程式碼中,我們將User物件序列化為byte數組。
接下來,我們透過實作Deserializer的deserialize()方法來編寫UserDeserializer類別:
public class UserDeserializer implements Deserializer<User> {
private final ObjectMapper mapper = new ObjectMapper();
@Override
public User deserialize(String topic, byte[] bytes) {
if (bytes == null || bytes.length == 0) {
return null;
}
try {
return mapper.readValue(bytes, User.class);
} catch (IOException ex) {
throw new RuntimeException(ex);
}
}
}
最後,我們將透過擴充Serdes.WrapperSerde類別來實作UserSerde類別:
public class UserSerde extends Serdes.WrapperSerde<User> {
public UserSerde() {
super(new UserSerializer(), new UserDeserializer());
}
}
Serde類別由串流服務使用,用於提供序列化/反序列化實現,從而實現記錄。
2.4. 實現串流服務
串流媒體服務將從user-topic接收User訊息。然後,它將按國家/地區匯總用戶數量,並將其發送到出站主題。
首先,我們將在UserStreamService類別中定義所需的流相關配置:
private static Properties getStreamProperties(String bootstrapServer) {
Properties props = new Properties();
props.put(StreamsConfig.APPLICATION_ID_CONFIG, "user-country-aggregator" + UUID.randomUUID());
props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServer);
props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());
return props;
}
接下來,我們將使用上述類別中的KStream和KTable實例來實作串流拓撲:
public void start(String bootstrapServer) {
StreamsBuilder builder = new StreamsBuilder();
KStream<String, User> userStream = builder.stream(
"user-topic", Consumed.with(Serdes.String(), new UserSerde()));
KTable<String, Long> usersPerCountry = userStream
.filter((key, user) -> Objects.nonNull(user) && user.country() != null && !user.country().isEmpty())
.groupBy((key, user) -> user.country(), Grouped.with(Serdes.String(), new UserSerde()))
.count(Materialized.as("users_per_country_store"));
usersPerCountry.toStream()
.to("users_per_country", Produced.with(Serdes.String(), Serdes.Long()));
}
在上面的程式碼中,我們透過傳遞入站主題和UserSerde實例來建立一個KStream物件。
然後,我們使用KStream實例建立一個KTable對象,用於儲存每個國家/地區的總計使用者數。聚合過程會過濾掉無效數據,按國家/地區分組,並儲存計數結果。
然後,我們將usersPerCountry聚合資料傳送到出站主題users_per_country 。
需要注意的是,上述過濾器/聚合中的任何異常都可以使用標準的 try-catch 語義來處理。
最後,我們將透過傳遞StreamsBuilder和先前定義的屬性來實例化KafkaStreams物件:
Properties props = getStreamProperties(bootstrapServer);
kafkaStreams = new KafkaStreams(builder.build(), props);
kafkaStreams.start();
Kafkastreams start()方法將執行流管道。
3. Kafka Streams 中的異常處理
流處理可能因多種原因失敗,例如反序列化錯誤、代理不可用、處理錯誤或出站主題中的生產異常。 Kafka 提供了內建的異常處理類別。
我們會處理每一種此類異常情況。
3.1 反序列化異常
對於反序列化相關的錯誤,我們可以將預設的反序列化異常處理程序屬性包含在流屬性中:
props.put(StreamsConfig.DEFAULT_DESERIALIZATION_EXCEPTION_HANDLER_CLASS_CONFIG,
LogAndContinueExceptionHandler.class);
LogAndContinueExceptionHandler類別將記錄並繼續處理流中的其他訊息。
我們也可以使用LogAndFailExceptionHandler來使流處理失敗。
也可以透過擴充LogAndContinueExceptionHandler或實作DeserializationExceptionHandler介面來提供自訂處理程序。這增強了異常處理能力,例如,允許我們將失敗的訊息傳送到死信佇列。
3.2 生產異常
在向出站主題發送聚合訊息時,資料流也可能失敗。
我們可以將預設的生產處理程序類別包含在流屬性中:
props.put(StreamsConfig.DEFAULT_PRODUCTION_EXCEPTION_HANDLER_CLASS_CONFIG,
DefaultProductionExceptionHandler.class)
如果出現任何異常,上述預設處理程序將導致整個流處理失敗。
或者,我們可以透過實作ProductionExceptionHandler介面並更新相同的配置來實作自訂處理程序:
public class CustomProductionExceptionHandler implements ProductionExceptionHandler {
@Override
public ProductionExceptionHandlerResponse handle(ErrorHandlerContext context,
ProducerRecord<byte[], byte[]> record, Exception exception) {
log.error("ProductionExceptionHandler Error producing record NodeId: {} | TaskId: {} | Topic: {} | Partition: {} | Exception: {}",
context.processorNodeId(), context.taskId(), record.topic(), record.partition(),
exception.getMessage(), exception);
return ProductionExceptionHandlerResponse.CONTINUE;
}
}
在上面的程式碼中,我們可以根據需要使用CONTINUE或FAIL枚舉。
3.3 處理異常
在處理每筆記錄時,流可能會拋出常見的應用程式異常,例如NullPointerException或其他因其他問題導致的例外狀況。
我們將透過實作ProcessingExceptionHandler介面來實作自訂處理異常處理程序:
public class CustomProductionExceptionHandler implements ProductionExceptionHandler {
@Override
public ProductionExceptionHandlerResponse handle(ErrorHandlerContext context,
ProducerRecord<byte[], byte[]> record, Exception exception) {
log.error("ProductionExceptionHandler Error producing record NodeId: {} | TaskId: {} | Topic: {} | Partition: {} | Exception: {}",
context.processorNodeId(), context.taskId(), record.topic(), record.partition(),
exception.getMessage(), exception);
return ProductionExceptionHandlerResponse.CONTINUE;
}
}
在上面的程式碼中,我們使用CONTINUE枚舉來記錄日誌並繼續處理。
我們還需要將上述處理程序包含在流程屬性中:
props.put(StreamsConfig.PROCESSING_EXCEPTION_HANDLER_CLASS_CONFIG,
CustomProcessingExceptionHandler.class);
如果經過上述處理程序後仍然無法捕獲異常,我們也可以選擇處理未捕獲的異常。
3.4. 未捕獲異常處理程序
為了處理任何未捕獲的異常,我們將使用StreamsUncaughtExceptionHandler介面。
讓我們透過提供實作來實作一個自訂的未捕獲異常處理程序:
public class StreamExceptionHandler implements StreamsUncaughtExceptionHandler {
@Override
public StreamThreadExceptionResponse handle(Throwable exception) {
log.error("Stream encountered fatal exception: {}", exception.getMessage(), exception);
return StreamThreadExceptionResponse.REPLACE_THREAD;
}
}
StreamThreadExceptionResponse的可能回傳值為REPLACE_THREAD 、 SHUTDOWN_CLIENT,和SHUTDOWN_APPLICATION 。REPLACE_THREAD選項僅在發生異常時才會將目前執行的執行緒替換為新執行緒,而**SHUTDOWN_CLIENT則會因本機錯誤而停止本機 Kafka 流用戶端。
**只有當出現嚴重問題,需要關閉所有實例中的整個應用程式時,才會使用SHUTDOWN_APPLICATION選項。
我們也會將其附加到UserStreamService's start()方法中的KafkaStreams實例:
kafkaStreams.setUncaughtExceptionHandler(new StreamExceptionHandler());
此外,我們將在start()方法中新增一個關閉鉤子,以便在 JVM 關閉時優雅地關閉流:
Runtime.getRuntime().addShutdownHook(new Thread(streams::close));
接下來,我們將針對各種場景測試UserStreamService類別。
4. 測試串流服務
我們將使用 Kafka 的測試容器來實作UserStreamService類別的整合測試。
4.1. Maven 依賴項
讓我們加入kafka和junit-jupiter測試容器依賴項:
<dependency>
<groupId>org.testcontainers</groupId>
<artifactId>kafka</artifactId>
<version>1.19.3</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.testcontainers</groupId>
<artifactId>junit-jupiter</artifactId>
<version>1.19.3</version>
<scope>test</scope>
</dependency>
接下來,我們將為應用程式設定整合測試環境。
4.2. 設定測試
首先,我們將設定一個UserStreamLiveTest類,其中包含測試生產者和消費者所需的配置:
private static Properties getProducerConfig() {
Properties producerProperties = new Properties();
producerProperties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, KAFKA_CONTAINER.getBootstrapServers());
producerProperties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
producerProperties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, UserSerializer.class);
return producerProperties;
}
private static Properties getConsumerConfig() {
Properties props = new Properties();
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, KAFKA_CONTAINER.getBootstrapServers());
props.put(ConsumerConfig.GROUP_ID_CONFIG, "test-group-" + UUID.randomUUID());
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, LongDeserializer.class.getName());
props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
return props;
}
然後,我們將新增所需的 Kafka 相關物件依賴項,並啟動容器和串流服務:
@Testcontainers
class UserStreamLiveTest {
@Container
private static final KafkaContainer KAFKA_CONTAINER =
new KafkaContainer(DockerImageName.parse("confluentinc/cp-kafka:7.9.0"));
private static KafkaProducer<String, User> producer;
private static KafkaConsumer<String, Long> consumer;
private static UserStreamService streamService;
@BeforeAll
static void setup() {
KAFKA_CONTAINER.start();
streamService = new UserStreamService();
producer = new KafkaProducer<>(getProducerConfig());
consumer = new KafkaConsumer<>(getConsumerConfig());
new Thread(() -> streamService.start(KAFKA_CONTAINER.getBootstrapServers())).start();
}
}
在上面的程式碼中,我們以 Docker 容器的形式啟動 Kafka 容器,然後在單獨的執行緒中呼叫 stream start()方法。
4.3. 實現串流服務測試
我們將首先測試一個場景,其中發送有效用戶,串流服務聚合資料並將其發送到出站主題。
@Test
void givenValidUserIsSent_whenStreamServiceStarts_thenAggregatedCountIsSent() {
producer.send(new ProducerRecord<>("user-topic", "x1", new User("1", "user1", "US")));
producer.send(new ProducerRecord<>("user-topic", "x2", new User("2", "user2", "DE")));
consumer.subscribe(List.of("users_per_country"));
Awaitility.await()
.atMost(45, TimeUnit.SECONDS)
.pollInterval(Duration.ofSeconds(1))
.untilAsserted(() -> {
ConsumerRecords<String, Long> records = consumer.poll(Duration.ofMillis(500));
Map<String, Long> counts = StreamSupport.stream(records.spliterator(), false)
.collect(Collectors.toMap(ConsumerRecord::key, ConsumerRecord::value));
assertTrue(counts.containsKey("US"));
assertTrue(counts.containsKey("DE"));
assertEquals(1L, counts.get("US"));
assertEquals(1L, counts.get("DE"));
});
}
在上面的測試中,我們看到總計計數與產生的訊息相符。
接下來,我們來測試一下生產者向主題發送無效 JSON 時會發生什麼:
@Test
void givenInvalidUserIsSent_whenStreamServiceStarts_thenAggregatedCountIsEmpty() throws Exception {
Properties props = new Properties();
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, KAFKA_CONTAINER.getBootstrapServers());
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, ByteArraySerializer.class);
KafkaProducer<String, byte[]> producer = new KafkaProducer<>(props);
byte[] invalidJson = "{ invalid json".getBytes(StandardCharsets.UTF_8);
producer.send(new ProducerRecord<>("user-topic", "x3", invalidJson));
consumer.subscribe(List.of("users_per_country"));
Awaitility.await()
.atMost(30, TimeUnit.SECONDS)
.pollInterval(Duration.ofSeconds(1))
.untilAsserted(() -> {
ConsumerRecords<String, Long> records = consumer.poll(Duration.ofMillis(500));
assertTrue(records.isEmpty());
});
}
從日誌中我們驗證,該記錄在流中反序列化失敗:
$ 21:53:28.534 [user-country-aggregatorfc997c10-StreamThread-1] ERROR cbkafkastreams.UserDeserializer - Error deserializing the message for topic user-topic
最後,我們將測試生產者發送空資料的情況,並驗證這些資料是否已在資料流中被過濾掉:
@Test
void givenValidAndNullUserIsSent_whenStreamServiceIsRunning_thenReturnAggregatedCount() {
producer.send(new ProducerRecord<>("user-topic", "x4", new User("4", "user4", "IE")));
producer.send(new ProducerRecord<>("user-topic", "x5", null));
consumer.subscribe(List.of("users_per_country"));
Awaitility.await()
.atMost(30, TimeUnit.SECONDS)
.pollInterval(Duration.ofSeconds(1))
.untilAsserted(() -> {
ConsumerRecords<String, Long> records = consumer.poll(Duration.ofMillis(500));
Map<String, Long> counts = StreamSupport.stream(records.spliterator(), false)
.collect(Collectors.toMap(ConsumerRecord::key, ConsumerRecord::value));
assertTrue(counts.containsKey("IE"));
assertEquals(1L, counts.get("IE"));
});
}
如果UserStreamService類別的過濾器中沒有空值檢查,那麼我們將收到處理程序日誌訊息:
$ 21:59:18.574 [user-country-aggregator41942d3b-StreamThread-1] ERROR cbkCustomProcessingExceptionHandler - ProcessingExceptionHandler Error for record NodeId: KSTREAM-FILTER-0000000001 | TaskId: 0_0 | Key: x4 | Value: null | Exception: Cannot invoke "com.baeldung.kafkastreams.User.country()" because "user" is null
CustomProcessingExceptionHandler的handle()方法傳回上述錯誤訊息。
5. 結論
本文透過範例說明如何實現 Kafka 流服務。我們還實現了各種異常處理程序,例如反序列化異常、生產異常、處理異常和未捕獲異常。
最後,我們透過向流發送訊息並接收聚合計數來測試整個設定。
和往常一樣,範例程式碼可以在 GitHub 上找到。