Spring Kafka:在同一主題上配置多個監聽器
一、概述
在本文中,我們將通過一個實際示例來學習如何為同一個 Kafka 主題配置多個偵聽器。
如果這是第一次在 Spring 上配置 Kafka,那麼可以從我們對 Apache Kafka with Spring 的介紹開始。
2.項目設置
讓我們構建一個圖書消費者服務,它監聽圖書館內新到的圖書,並出於不同的目的使用它們,例如全文內容搜索、價格索引或用戶通知。
首先,讓我們創建一個 Spring Boot 服務並使用spring-kafka
依賴:
<dependency>
<groupId>org.springframework.kafka</groupId>
<artifactId>spring-kafka</artifactId>
</dependency>
此外,讓我們定義聽眾將使用的BookEvent
:
public class BookEvent {
private String title;
private String description;
private Double price;
// standard constructors, getters and setters
}
3. 生產消息
Kafka 生產者對生態系統至關重要,因為生產者將消息寫入 Kafka 集群。考慮到這一點,首先,我們需要定義一個生產者,它在一個主題上寫入消息,稍後由消費者應用程序使用。
按照我們的示例,讓我們編寫一個簡單的 Kafka 生產者函數,將新的BookEvent
對象寫入“books”主題。
private static final String TOPIC = "books";
@Autowired
private KafkaTemplate<String, BookEvent> bookEventKafkaTemplate;
public void sentBookEvent(BookEvent book){
bookEventKafkaTemplate.send(TOPIC, UUID.randomUUID().toString(), book);
}
4. 從多個監聽器中消費相同的 Kafka 主題
Kafka 消費者是訂閱 Kafka 集群的一個或多個主題的客戶端應用程序。稍後,我們將研究如何針對同一主題設置多個偵聽器。
4.1.消費者配置
首先,要配置消費者,我們需要定義監聽器需要的ConcurrentKafkaListenerContainerFactory Bean。
現在,讓我們定義我們將用於使用BookEvent
對象的容器工廠:
@EnableKafka
@Configuration
public class KafkaConsumerConfig {
@Bean
public ConcurrentKafkaListenerContainerFactory<String, BookEvent> kafkaListenerContainerFactory() {
ConcurrentKafkaListenerContainerFactory<String, BookEvent> factory = new ConcurrentKafkaListenerContainerFactory<>();
factory.setConsumerFactory(consumerFactory());
return factory;
}
public ConsumerFactory<String, BookEvent> consumerFactory(String groupId) {
Map<String, Object> props = new HashMap<>();
// required consumer factory properties
return new DefaultKafkaConsumerFactory<>(props);
}
}
接下來,我們將研究用於收聽傳入消息的不同策略。
4.2.具有相同消費者組的多個偵聽器
將多個偵聽器添加到同一消費者組的一種策略是提高同一消費者組內的並發級別。因此,我們可以簡單地在@KafkaListener
註釋中指定它。
為了理解它是如何工作的,讓我們為我們的庫定義一個通知監聽器:
@KafkaListener(topics = "books", groupId = "book-notification-consumer", concurrency = "2")
public void bookNotificationConsumer(BookEvent event) {
logger.info("Books event received for notification => {}", event);
}
接下來,我們將在發布三個消息後看到控制台輸出。此外,讓我們了解為什麼消息只被消費一次:
Books event received for notification => BookEvent(title=book 1, description=description 1, price=1.0)
Books event received for notification => BookEvent(title=book 2, description=description 2, price=2.0)
Books event received for notification => BookEvent(title=book 3, description=description 3, price=3.0)
發生這種情況是因為,在內部,對於每個並發級別,Kafka 在同一消費者組中實例化一個新的偵聽器。此外,同一消費者組內的所有偵聽器實例的範圍是在彼此之間分發消息以更快地完成工作並提高吞吐量。
4.3.具有不同消費者群體的多個聽眾
如果我們需要多次使用相同的消息並為每個偵聽器應用不同的處理邏輯,我們必須將@KafkaListener
配置為具有不同的組 ID。通過這樣做, Kafka 將為每個監聽器創建專門的消費者組,並將所有已發布的消息推送給每個監聽器。
要查看此策略的實際效果,讓我們為全文搜索索引定義一個偵聽器,並為價格索引定義一個偵聽器。兩者都將收聽相同的“書籍”主題:
@KafkaListener(topics = "books", groupId = "books-content-search")
public void bookContentSearchConsumer(BookEvent event) {
logger.info("Books event received for full-text search indexing => {}", event);
}
@KafkaListener(topics = "books", groupId = "books-price-index")
public void bookPriceIndexerConsumer(BookEvent event) {
logger.info("Books event received for price indexing => {}", event);
}
現在,讓我們運行上面的代碼並分析輸出:
Books event received for price indexing => BookEvent(title=book 1, description=description 1, price=1.0)
Books event received for full-text search indexing => BookEvent(title=book 1, description=description 1, price=1.0)
Books event received for full-text search indexing => BookEvent(title=book 2, description=description 2, price=2.0)
Books event received for price indexing => BookEvent(title=book 2, description=description 2, price=2.0)
Books event received for full-text search indexing => BookEvent(title=book 3, description=description 3, price=3.0)
Books event received for price indexing => BookEvent(title=book 3, description=description 3, price=3.0)
正如我們所見,兩個偵聽器都接收到每個BookEvent
,並且可以對所有傳入消息應用獨立的處理邏輯。
5. 何時使用不同的偵聽器策略
正如我們已經了解到的,我們可以設置多個偵聽器,方法是將@KafkaListener
註釋的concurrency
配置為大於 1 的值,或者定義多個@KafkaListener
方法來偵聽同一個 Kafka 主題並分配不同的消費者 ID .
選擇一種策略或另一種策略取決於我們想要實現的目標。只要我們解決性能問題以通過更快地處理消息來提高吞吐量,正確的策略就是增加同一消費者組中的偵聽器數量。
然而,為了多次處理同一條消息以滿足不同的需求,我們應該定義專門的監聽器,具有不同的消費者組來監聽同一主題。
根據經驗,我們應該為每個需要滿足的需求使用一個消費者組,如果我們需要使該偵聽器更快,我們可以增加同一消費者組中的偵聽器數量。
六,結論
在本文中,我們學習瞭如何使用 Spring Kafka 庫為同一主題配置多個偵聽器,並查看了圖書庫的實際示例。我們從生產者和消費者配置開始,然後繼續使用不同的方式為同一主題添加多個偵聽器。
與往常一樣,可以在 GitHub 上找到示例的完整源代碼。