在 Spring Boot 中動態管理 Kafka 監聽器
1. 概述
在當今的事件驅動架構中,有效管理資料流至關重要。 Apache Kafka 是一個流行的選擇,但儘管有 Spring Kafka 等輔助框架,但將其整合到我們的應用程式中仍面臨挑戰。一項主要挑戰是實施適當的動態偵聽器管理,這提供了靈活性和控制力,這對於適應應用程式不斷變化的工作負載和維護至關重要。
在本教程中,我們將學習如何在 Spring Boot 應用程式中動態啟動和停止 Kafka 監聽器。
2. 前提條件
首先,我們將spring-kafka
依賴導入到我們的專案中:
<dependency>
<groupId>org.springframework.kafka</groupId>
<artifactId>spring-kafka</artifactId>
<version>3.1.2</version>
</dependency>
3.配置Kafka消費者
生產者是將事件發布(寫入)到 Kafka 主題的應用程式。
在本教程中,我們將使用單元測試來模擬生產者將事件傳送到 Kafka 主題。訂閱主題並處理事件流的消費者由我們應用程式中的偵聽器代表。此偵聽器配置為處理來自 Kafka 的傳入訊息。
讓我們透過KafkaConsumerConfig
類別來配置我們的 Kafka 消費者,其中包括 Kafka 代理的地址、消費者群組 ID 以及鍵和值的反序列化器:
@Bean
public DefaultKafkaConsumerFactory<String, UserEvent> consumerFactory() {
Map<String, Object> props = new HashMap<>();
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, JsonDeserializer.class);
props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
props.put(JsonDeserializer.TRUSTED_PACKAGES, "com.baeldung.spring.kafka.start.stop.consumer");
return new DefaultKafkaConsumerFactory<>(props, new StringDeserializer(), new JsonDeserializer<>(UserEvent.class));
}
4.配置Kafka監聽器
在Spring Kafka中,**使用@KafkaListener
註解一個方法會建立一個監聽器**,該監聽器會消費來自指定主題的訊息。為了定義它,我們聲明一個UserEventListener
類別:
@KafkaListener(id = Constants.LISTENER_ID, topics = Constants.MULTI_PARTITION_TOPIC, groupId = "test-group",
containerFactory = "kafkaListenerContainerFactory", autoStartup = "false")
public void processUserEvent(UserEvent userEvent) {
logger.info("Received UserEvent: " + userEvent.getUserEventId());
userEventStore.addUserEvent(userEvent);
}
上面的監聽器等待來自主題multi_partition_topic
訊息並使用processUserEvent()
方法處理它們。我們將groupId
指定為test-group
,確保消費者成為更廣泛群組的一部分,從而促進跨多個實例的分散式處理。
我們使用id
屬性為每個偵聽器分配一個唯一識別碼。在此範例中,指派的偵聽器 ID 為listener-id-1
。
autoStartup
屬性使我們能夠控制在應用程式初始化時是否啟動偵聽器。在我們的範例中,我們將其設為false,
這表示偵聽器不會在應用程式啟動時自動啟動。此配置為我們提供了手動啟動偵聽器的靈活性。
此手動啟動可以由各種事件觸發,例如新使用者註冊、應用程式內的特定條件(例如達到特定資料量閾值)或管理操作(例如透過管理介面手動啟動偵聽器)。例如,如果線上零售應用程式在限時搶購期間偵測到流量激增,它可以自動啟動額外的偵聽器來處理增加的負載,從而優化效能。
UserEventStore
充當偵聽器接收到的事件的暫存:
@Component
public class UserEventStore {
private final List<UserEvent> userEvents = new ArrayList<>();
public void addUserEvent(UserEvent userEvent) {
userEvents.add(userEvent);
}
public List<UserEvent> getUserEvents() {
return userEvents;
}
public void clearUserEvents() {
this.userEvents.clear();
}
}
5. 動態控制監聽器
讓我們建立一個使用KafkaListenerEndpointRegistry
動態啟動和停止 Kafka 監聽器的KafkaListenerControlService
:
@Service
public class KafkaListenerControlService {
@Autowired
private KafkaListenerEndpointRegistry registry;
public void startListener(String listenerId) {
MessageListenerContainer listenerContainer = registry.getListenerContainer(listenerId);
if (listenerContainer != null && !listenerContainer.isRunning()) {
listenerContainer.start();
}
}
public void stopListener(String listenerId) {
MessageListenerContainer listenerContainer = registry.getListenerContainer(listenerId);
if (listenerContainer != null && listenerContainer.isRunning()) {
listenerContainer.stop();
}
}
}
KafkaListenerControlService
可以**根據指派的 ID 精確管理各個偵聽器實例**。 startListener()
和stopListener()
方法都使用listenerId
作為參數,讓我們可以根據需要啟動和停止主題的訊息來消費.
KafkaListenerEndpointRegistry
充當 Spring 應用程式上下文中定義的所有 Kafka 偵聽器端點的中央儲存庫。它監視這些偵聽器容器,從而允許對其狀態進行程式控制,無論是啟動、停止還是暫停。對於需要即時調整其訊息處理活動而無需重新啟動整個應用程式的應用程式來說,此功能至關重要。
6. 驗證動態監聽器控件
接下來,讓我們重點測試 Spring Boot 應用程式中 Kafka 偵聽器的動態啟動和停止功能。首先,讓我們啟動監聽器:
kafkaListenerControlService.startListener(Constants.LISTENER_ID);
然後,我們透過發送並處理測試事件來驗證偵聽器是否已啟動:
UserEvent startUserEventTest = new UserEvent(UUID.randomUUID().toString());
producer.send(new ProducerRecord<>(Constants.MULTI_PARTITION_TOPIC, startUserEventTest));
await().untilAsserted(() -> assertEquals(1, this.userEventStore.getUserEvents().size()));
this.userEventStore.clearUserEvents();
現在偵聽器已處於活動狀態,我們將發送一批 10 則訊息進行處理。發送四個訊息後,我們將停止偵聽器,然後將剩餘的訊息傳送到 Kafka 主題:
for (long count = 1; count <= 10; count++) {
UserEvent userEvent = new UserEvent(UUID.randomUUID().toString());
Future<RecordMetadata> future = producer.send(new ProducerRecord<>(Constants.MULTI_PARTITION_TOPIC, userEvent));
RecordMetadata metadata = future.get();
if (count == 4) {
await().untilAsserted(() -> assertEquals(4, this.userEventStore.getUserEvents().size()));
this.kafkaListenerControlService.stopListener(Constants.LISTENER_ID);
this.userEventStore.clearUserEvents();
}
logger.info("User Event ID: " + userEvent.getUserEventId() + ", Partition : " + metadata.partition());
}
在啟動偵聽器之前,我們將驗證事件儲存體中沒有訊息:
assertEquals(0, this.userEventStore.getUserEvents().size());
kafkaListenerControlService.startListener(Constants.LISTENER_ID);
await().untilAsserted(() -> assertEquals(6, this.userEventStore.getUserEvents().size()));
kafkaListenerControlService.stopListener(Constants.LISTENER_ID);
一旦偵聽器再次啟動,它就會處理偵聽器停止後我們發送到 Kafka 主題的剩餘 6 個訊息。該測試展示了 Spring Boot 應用程式動態管理 Kafka 監聽器的能力。
7. 使用案例
動態監聽管理在適應性要求較高的場景中表現出色。例如,在尖峰負載期間,我們可以動態啟動額外的偵聽器以提高吞吐量並減少處理時間。相反,在維護或低流量期間,我們可以停止偵聽器以節省資源。這種靈活性也有利於在功能標誌後面部署新功能,從而允許無縫即時調整,而不會影響整個系統。
讓我們考慮一個場景,其中電子商務平台引入了新的推薦引擎,旨在透過根據瀏覽歷史記錄和購買模式推薦產品來增強用戶體驗。為了在全面啟動之前驗證此功能的有效性,我們決定將其部署在功能標誌後面。
啟動此功能標誌將啟動 Kafka 偵聽器。當最終使用者與平台互動時,由 Kafka 監聽器支援的推薦引擎會處理傳入的使用者活動資料流,以產生個人化的產品推薦。
當我們停用功能標誌時,我們會停止 Kafka 偵聽器,並且平台預設使用其現有的推薦引擎。無論新引擎處於測試階段如何,這都確保了無縫的用戶體驗。
當該功能處於活動狀態時,我們會主動收集數據、監控效能指標並對推薦引擎進行調整。我們在多次迭代中重複此功能測試,直到達到預期結果。
透過這個迭代過程,動態監聽器管理被證明是一個有價值的工具。它允許無縫引入新功能
八、結論
在本文中,我們討論了 Kafka 與 Spring Boot 的集成,重點是動態管理 Kafka 監聽器。此功能對於管理波動的工作負載和執行日常維護至關重要。此外,它還支援功能切換、根據流量模式擴展服務,並使用特定觸發器管理事件驅動的工作流程。
與往常一樣,範例的原始程式碼可以在 GitHub 上取得。