Kafka 中的流拆分
一、簡介
在本教程中,我們將探討如何在 Kafka Streams 中動態路由訊息。當訊息的目標主題取決於其內容時,動態路由特別有用,使我們能夠根據有效負載中的特定條件或屬性來引導訊息。這種條件路由可以在物聯網事件處理、使用者活動追蹤和詐欺偵測等各個領域找到實際應用。
我們將逐步解決從單一 Kafka 主題消費訊息並有條件地將其路由到多個目標主題的問題。主要關注點是如何使用 Kafka Streams 庫在 Spring Boot 應用程式中進行設定。
2.Kafka流路由技術
Kafka Streams 中訊息的動態路由並不局限於單一方法,而是可以使用多種技術來實現。每種方案都有其獨特的優勢、挑戰以及對不同場景的適用性:
-
KStream
條件分支:KStream.split().branch()
方法是基於謂詞分離流的傳統方法。雖然此方法很容易實現,但在擴展條件數量時存在局限性,並且可能變得難以管理。 - 使用
KafkaStreamBrancher
進行分支:此功能出現在 Spring Kafka 2.2.4 版本中。它提供了一種更優雅、更可讀的方式在 Kafka Stream 中創建分支,消除了對「幻數」的需求,並允許更流暢的串流操作連結。 - 使用
TopicNameExtractor
進行動態路由:主題路由的另一種方法是使用TopicNameExtractor
。這允許在運行時根據訊息鍵、值甚至整個記錄上下文進行更動態的主題選擇。但是,它需要提前創建主題。這種方法可以對主題選擇進行更精細的控制,並且更適合複雜的用例。 - 自訂處理器:對於需要複雜路由邏輯或多個鍊式操作的場景,我們可以在Kafka Streams拓撲中套用自訂處理器節點。這種方法是最靈活的,但實施起來也是最複雜的。
在本文中,我們將重點放在前三種方法的實作: KStream
條件分支、使用KafkaStreamBrancher
進行分支以及使用TopicNameExtractor
進行動態路由。
3. 環境搭建
在我們的場景中,我們有一個物聯網感測器網絡,將各種類型的資料(例如溫度、濕度和運動)傳輸到名為iot_sensor_data
的集中式 Kafka 主題。每個傳入訊息都包含一個 JSON 對象,其中包含一個名為sensorType
的字段,該字段指示感測器發送的資料類型。我們的目標是將這些訊息動態路由到每種類型感測器資料的專用主題。
首先,讓我們建立一個正在運行的 Kafka 實例。我們可以使用 Docker 和 Docker Compose 透過建立docker-compose.yml
檔案來設定 Kafka、Zookeeper 和Kafka UI :
version: '3.8'
services:
zookeeper:
image: confluentinc/cp-zookeeper:latest
environment:
ZOOKEEPER_CLIENT_PORT: 2181
ZOOKEEPER_TICK_TIME: 2000
ports:
- 22181:2181
kafka:
image: confluentinc/cp-kafka:latest
depends_on:
- zookeeper
ports:
- 9092:9092
environment:
KAFKA_BROKER_ID: 1
KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
KAFKA_LISTENERS: "INTERNAL://:29092,EXTERNAL://:9092"
KAFKA_ADVERTISED_LISTENERS: "INTERNAL://kafka:29092,EXTERNAL://localhost:9092"
KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: "INTERNAL:PLAINTEXT,EXTERNAL:PLAINTEXT"
KAFKA_INTER_BROKER_LISTENER_NAME: "INTERNAL"
KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
kafka_ui:
image: provectuslabs/kafka-ui:latest
depends_on:
- kafka
ports:
- 8082:8080
environment:
KAFKA_CLUSTERS_0_ZOOKEEPER: zookeeper:2181
KAFKA_CLUSTERS_0_NAME: local
KAFKA_CLUSTERS_0_BOOTSTRAPSERVERS: kafka:29092
kafka-init-topics:
image: confluentinc/cp-kafka:latest
depends_on:
- kafka
command: "bash -c 'echo Waiting for Kafka to be ready... && \
cub kafka-ready -b kafka:29092 1 30 && \
kafka-topics --create --topic iot_sensor_data --partitions 1 --replication-factor 1 --if-not-exists --bootstrap-server kafka:29092'"
在這裡,我們設定所有必需的環境變數和服務之間的依賴關係。此外,我們正在使用kafka-init-topics
服務中的特定命令來建立iot_sensor_data
主題。
現在我們可以透過執行docker-compose up -d
在 Docker 中運行 Kafka。
接下來,我們必須將 Kafka Streams 依賴項新增到pom.xml
檔案中:
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-streams</artifactId>
<version>3.6.0</version>`
</dependency>
<dependency>
<groupId>org.springframework.kafka</groupId>
<artifactId>spring-kafka</artifactId>
<version>3.0.12</version>
</dependency>
第一個依賴項是[org.apache.kafka.kafka-streams](https://mvnrepository.com/artifact/org.apache.kafka/kafka-streams)
套件,它提供 Kafka Streams 功能。隨後的 Maven 套件[org.springframework.kafka.spring-kafka](https://mvnrepository.com/artifact/org.springframework.kafka/spring-kafka)
方便了 Kafka 與 Spring Boot 的配置和整合。
另一個重要方面是配置 Kafka 代理的位址。這通常是透過在應用程式的屬性檔案中指定代理詳細資訊來完成的。讓我們將此配置與其他屬性一起新增到我們的application.properties
檔案中:
spring.kafka.bootstrap-servers=localhost:9092
spring.kafka.streams.application-id=baeldung-streams
spring.kafka.consumer.group-id=baeldung-group
spring.kafka.streams.properties[default.key.serde]=org.apache.kafka.common.serialization.Serdes$StringSerde
kafka.topics.iot=iot_sensor_data
接下來,我們定義一個範例資料類IotSensorData
:
public class IotSensorData {
private String sensorType;
private String value;
private String sensorId;
}
最後,我們需要配置Serde
以用於 Kafka 中類型化訊息的序列化和反序列化:
@Bean
public Serde<IotSensorData> iotSerde() {
return Serdes.serdeFrom(new JsonSerializer<>(), new JsonDeserializer<>(IotSensorData.class));
}
4. 在Kafka Streams中實現動態路由
設定好環境並安裝所需的依賴項後,讓我們專注於在 Kafka Streams 中實現動態路由邏輯。
動態消息路由可以是事件驅動應用程式的重要組成部分,因為它使系統能夠適應各種類型的資料流和條件,而無需更改程式碼。
4.1. KStream
條件分支
Kafka Streams 中的分支允許我們取得單一資料流並根據某些條件將其拆分為多個流。這些條件作為謂詞提供,用於在每個訊息通過流時對其進行評估。
在 Kafka Streams 的最新版本中, branch()
方法已被棄用,取而代之的是較新的split().branch()
方法,該方法旨在提高 API 的整體可用性和靈活性。儘管如此,我們可以以同樣的方式應用它,根據某些謂詞將KStream
拆分為多個流。
這裡我們定義了利用split().branch()
方法進行動態主題路由的設定:
@Bean
public KStream<String, IotSensorData> iotStream(StreamsBuilder streamsBuilder) {
KStream<String, IotSensorData> stream = streamsBuilder.stream(iotTopicName, Consumed.with(Serdes.String(), iotSerde()));
stream.split()
.branch((key, value) -> "temp".equals(value.getSensorType()), Branched.withConsumer((ks) -> ks.to(iotTopicName + "_temp")))
.branch((key, value) -> "move".equals(value.getSensorType()), Branched.withConsumer((ks) -> ks.to(iotTopicName + "_move")))
.branch((key, value) -> "hum".equals(value.getSensorType()), Branched.withConsumer((ks) -> ks.to(iotTopicName + "_hum")))
.noDefaultBranch();
return stream;
}
在上面的範例中,我們根據sensorType
屬性將iot_sensor_data
主題中的初始流拆分為多個流,並相應地將它們路由到其他主題。
如果可以根據訊息內容產生目標主題名稱,我們可以在to
方法中使用 lambda 函數來實現更動態的主題路由:
@Bean
public KStream<String, IotSensorData> iotStreamDynamic(StreamsBuilder streamsBuilder) {
KStream<String, IotSensorData> stream = streamsBuilder.stream(iotTopicName, Consumed.with(Serdes.String(), iotSerde()));
stream.split()
.branch((key, value) -> value.getSensorType() != null,
Branched.withConsumer(ks -> ks.to((key, value, recordContext) -> "%s_%s".formatted(iotTopicName, value.getSensorType()))))
.noDefaultBranch();
return stream;
}
如果可以根據訊息內容產生主題名稱,則此方法可以為根據訊息內容動態路由訊息提供更大的靈活性。
4.2.使用KafkaStreamBrancher
進行路由
KafkaStreamBrancher
類別提供了一個建構器風格的 API,可以更輕鬆地連結分支條件,使程式碼更具可讀性和可維護性。
主要好處是消除了與管理分支流數組相關的複雜性,這就是原始KStream.branch
方法的工作原理。相反, KafkaStreamBrancher
讓我們定義每個分支以及該分支應該發生的操作,從而無需使用幻數或複雜的索引來識別正確的分支。由於引入了split().branch()
方法,這種方法與前面討論的前一種方法密切相關。
讓我們將此方法應用於串流:
@Bean
public KStream<String, IotSensorData> kStream(StreamsBuilder streamsBuilder) {
KStream<String, IotSensorData> stream = streamsBuilder.stream(iotTopicName, Consumed.with(Serdes.String(), iotSerde()));
new KafkaStreamBrancher<String, IotSensorData>()
.branch((key, value) -> "temp".equals(value.getSensorType()), (ks) -> ks.to(iotTopicName + "_temp"))
.branch((key, value) -> "move".equals(value.getSensorType()), (ks) -> ks.to(iotTopicName + "_move"))
.branch((key, value) -> "hum".equals(value.getSensorType()), (ks) -> ks.to(iotTopicName + "_hum"))
.defaultBranch(ks -> ks.to("%s_unknown".formatted(iotTopicName)))
.onTopOf(stream);
return stream;
}
我們應用 Fluent API 將訊息路由到特定主題。類似地,我們可以透過使用內容作為主題名稱的一部分,使用單一branch()
方法呼叫來路由到多個主題:
@Bean
public KStream<String, IotSensorData> iotBrancherStream(StreamsBuilder streamsBuilder) {
KStream<String, IotSensorData> stream = streamsBuilder.stream(iotTopicName, Consumed.with(Serdes.String(), iotSerde()));
new KafkaStreamBrancher<String, IotSensorData>()
.branch((key, value) -> value.getSensorType() != null, (ks) ->
ks.to((key, value, recordContext) -> String.format("%s_%s", iotTopicName, value.getSensorType())))
.defaultBranch(ks -> ks.to("%s_unknown".formatted(iotTopicName)))
.onTopOf(stream);
return stream;
}
透過為分支邏輯提供更高層級的抽象, KafkaStreamBrancher
不僅使程式碼更加簡潔,而且增強了其可管理性,特別是對於具有複雜路由需求的應用程式。
4.3.使用TopicNameExtractor
進行動態主題路由
在 Kafka Streams 中管理條件分支的另一種方法是使用TopicNameExtractor
,顧名思義,它為流中的每個訊息動態提取主題名稱。與先前討論的split().branch()
和KafkaStreamBrancher
方法相比,此方法對於某些用例來說更加簡單。
以下是在 Spring Boot 應用程式中使用TopicNameExtractor
範例配置:
@Bean
public KStream<String, IotSensorData> kStream(StreamsBuilder streamsBuilder) {
KStream<String, IotSensorData> stream = streamsBuilder.stream(iotTopicName, Consumed.with(Serdes.String(), iotSerde()));
TopicNameExtractor<String, IotSensorData> sensorTopicExtractor = (key, value, recordContext) -> "%s_%s".formatted(iotTopicName, value.getSensorType());
stream.to(sensorTopicExtractor);
return stream;
}
雖然TopicNameExtractor
方法精通將記錄路由到特定主題的主要功能,但與split().branch()
和KafkaStreamBrancher
等其他方法相比,它有一些限制。具體來說, TopicNameExtractor
不提供在同一路由步驟中執行其他轉換(例如映射或過濾)的選項。
5. 結論
在本文中,我們了解了使用 Kafka Streams 和 Spring Boot 進行動態主題路由的不同方法。
我們首先探索現代分支機制,例如split().branch()
方法和KafkaStreamBrancher
類別。此外,我們也檢查了TopicNameExtractor
提供的動態主題路由功能。
每種技術都有其優點和挑戰。例如, split().branch(
)在處理大量條件時可能很麻煩,而TopicNameExtractor
提供了結構化流程,但限制了某些內嵌資料處理。因此,掌握每種方法的細微差別對於創建有效的路由實作至關重要。
與往常一樣,完整的源代碼可以在 GitHub 上取得。