在多個測試類別中重複使用嵌入式 Kafka Broker
1. 引言
在使用 Kafka 消費者時,我們經常需要透過整合測試來驗證訊息流。
Spring Kafka 中的[EmbeddedKafkaBroker](https://docs.spring.io/spring-kafka/reference/testing.html#ekb)提供了一個可靠且獨立的記憶體 Kafka 代理,使這一切變得更加容易。
然而,隨著測試套件的成長,每個測試類別往往都會啟動自己的 Kafka broker。這種行為會降低測試執行速度並增加資源消耗。本文將介紹如何在多個測試類別中重複使用嵌入式 Kafka broker,以加快整合測試的速度。
2. 問題陳述
Spring 的EmbeddedKafkaBroker會在每次測試類別載入應用程式上下文時建立一個新的代理程式。
當一個專案包含多個監聽器或多個測試類別時,每個監聽器或測試類別最終都會啟動自己的代理程式。
這種方法會導致:
- 測試啟動時間更長
- CI建置期間的額外開銷
另一方面,共享經紀商則具有幾個明顯的優勢:
- 由於只有一個代理程式啟動,因此測試執行速度更快。
- 多個測試類別中測試行為的一致性
- 更簡單的資源清理
透過這種方法,我們可以跨多個測試類別重複使用嵌入式 Kafka 代理,從而提高速度和一致性。
3. 在多個測試類別中重複使用嵌入式 Kafka Broker
假設我們的應用程式包含一個簡單的訂單監聽器和支付監聽器。它們分別接收訊息,並使用AdminClient API 記錄詳細資訊以及經紀商資訊。
為了幫助驗證測試期間訊息的接收情況,兩個監聽器都維護了一個靜態的CountDownLatch ,這有助於我們追蹤訊息何時被消費。
我們先來建立OrderListener :
@Value("${spring.kafka.bootstrap-servers}")
private String bootstrapServers;
@KafkaListener(topics = "order")
public void receive(ConsumerRecord<String, String> consumerRecord) throws Exception {
try (AdminClient admin = AdminClient.create(Map.of(BOOTSTRAP_SERVERS_CONFIG, bootstrapServers))) {
LOG.info("Received customer order request [{}] from broker [{}]",
consumerRecord.value(),
admin.describeCluster().clusterId().get());
}
latch.countDown();
}
public static CountDownLatch getLatch() {
return latch;
}
接下來,我們來建立PaymentListener:
@Value("${spring.kafka.bootstrap-servers}")
private String bootstrapServers;
@KafkaListener(topics = "payment")
public void receive(ConsumerRecord<String, String> consumerRecord) throws Exception {
try (AdminClient admin = AdminClient.create(Map.of(BOOTSTRAP_SERVERS_CONFIG, bootstrapServers))) {
LOG.info("Received payment request [{}] from broker [{}]",
consumerRecord.value(),
admin.describeCluster().clusterId().get());
}
latch.countDown();
}
public static CountDownLatch getLatch() {
return latch;
}
接下來,我們定義一個名為EmbeddedKafkaHolder的簡單實用程式類別。
此單例模式會延遲初始化並公開一個EmbeddedKafkaBroker實例:
public final class EmbeddedKafkaHolder {
private static final EmbeddedKafkaBroker embeddedKafka =
new EmbeddedKafkaKraftBroker(1, 1, "order", "payment")
.brokerListProperty("spring.kafka.bootstrap-servers");
private static volatile boolean started;
public static EmbeddedKafkaBroker getEmbeddedKafka() {
if (!started) {
synchronized (EmbeddedKafkaBroker.class) {
if (!started) {
try {
embeddedKafka.afterPropertiesSet();
} catch (Exception e) {
throw new KafkaException("Embedded broker failed to start", e);
}
started = true;
}
}
}
return embeddedKafka;
}
}
此實用程式設定測試主題,並保證嵌入式 Kafka 代理程式只啟動一次。
它透過維護一個volatile狀態並呼叫afterPropertiesSet()方法來實現這一點,該方法使用配置的屬性啟動代理程式。
接下來,讓我們在測試中使用此設定來重複使用同一個代理程式。
我們先從OrderListenerTest開始:
private static final EmbeddedKafkaBroker broker = EmbeddedKafkaHolder.getEmbeddedKafka();
@Autowired
private KafkaTemplate<String, String> kafkaTemplate;
@DynamicPropertySource
static void kafkaProps(DynamicPropertyRegistry registry) {
registry.add("spring.kafka.bootstrap-servers", broker::getBrokersAsString);
}
@Test
void givenKafkaBroker_whenOrderMessageIsSent_thenListenerConsumesMessages() {
kafkaTemplate.send("order", "key", "{\"orderId\":%s}".formatted(UUID.randomUUID().toString()));
Awaitility.await()
.atMost(10, TimeUnit.SECONDS)
.pollInterval(500, TimeUnit.MILLISECONDS)
.until(() -> OrderListener.getLatch().getCount() == 0);
}
此測試使用EmbeddedKafkaHolder.getEmbeddedKafka()來擷取共用代理實例。
然後,它將代理詳細資訊注入到spring.kafka.bootstrap-servers屬性中,以便測試應用程式在啟動期間連接到它。
該測試向order主題發送訊息,並等待倒數鎖存器達到零。
這證實監聽器已消費該訊息並記錄了代理詳細資訊。
同樣,我們也可以在任何其他需要的測試中使用同一個代理程式。
例如,我們來看看PaymentListenerTest :
private static final EmbeddedKafkaBroker broker = EmbeddedKafkaHolder.getEmbeddedKafka();
@Autowired
private KafkaTemplate<String, String> kafkaTemplate;
@DynamicPropertySource
static void kafkaProps(DynamicPropertyRegistry registry) {
registry.add("spring.kafka.bootstrap-servers", broker::getBrokersAsString);
}
@Test
void givenKafkaBroker_whenPaymentMessageIsSent_thenListenerConsumesMessages() {
kafkaTemplate.send("payment", "key", "{\"paymentId\":%s}".formatted(UUID.randomUUID().toString()));
Awaitility.await()
.atMost(10, TimeUnit.SECONDS)
.pollInterval(500, TimeUnit.MILLISECONDS)
.until(() -> PaymentListener.getLatch().getCount() == 0);
}
從日誌中我們可以看到兩個監聽器使用了相同的代理 ID,這證實了代理在各個測試中被重複使用:
[org.springframework.kafka.KafkaListenerEndpointContainer#0-0-C-1] INFO cbkafka.sharedbroker.OrderListener - Received customer order request [{"orderId":c6e0c5b7-74c9-4dfb-8683-2c584148d21c}] from broker [PVU6g54OQ4GebLKOwjLVEw]
[org.springframework.kafka.KafkaListenerEndpointContainer#1-0-C-1] INFO cbksharedbroker.PaymentListener - Received payment request [{"paymentId":49f88030-9a6f-4424-a2bd-b47ee658c8c0}] from broker [PVU6g54OQ4GebLKOwjLVEw]
雖然本範例著重於EmbeddedKafkaBroker ,但我們也可以使用Testcontainers來實現更接近生產環境的設定。
4. 結論
在本文中,我們探討了在多個測試類別中重複使用同一個嵌入式 Kafka 代理程式的有效方法。
本文的程式碼可在 GitHub 上找到。