測試Kafka和Spring Boot
1.概述
Apache Kafka是一個功能強大的分佈式容錯流處理系統。
在本教程中,學習如何編寫不依賴於運行外部Kafka服務器的可靠,獨立的集成測試**。
首先,我們將開始研究如何使用和配置Kafka的嵌入式實例。然後,我們將看到如何利用測試中流行的框架Testcontainers 。
2.依存關係
當然,我們需要將標準的spring-kafka
依賴項添加到我們的pom.xml
:
<dependency>
<groupId>org.springframework.kafka</groupId>
<artifactId>spring-kafka</artifactId>
<version>2.6.3.RELEASE</version>
</dependency>
然後,我們將需要另外兩個依賴項專門用於我們的測試。首先,我們將添加spring-kafka-test
工件:
<dependency>
<groupId>org.springframework.kafka</groupId>
<artifactId>spring-kafka-test</artifactId>
<version>2.6.3.RELEASE</version>
<scope>test</scope>
</dependency>
最後,我們將添加Testcontainers Kafka依賴項,該依賴項也可以在Maven Central上使用:
<dependency>
<groupId>org.testcontainers</groupId>
<artifactId>kafka</artifactId>
<version>1.15.0</version>
<scope>test</scope>
</dependency>
現在我們已經配置了所有必需的依賴項,我們可以使用Kafka編寫一個簡單的Spring Boot應用程序。
3.一個簡單的Kafka生產者-消費者應用程序
在整個教程中,我們測試的重點將是一個簡單的生產者-消費者Spring Boot Kafka應用程序。
讓我們從定義應用程序入口點開始:
@SpringBootApplication
@EnableAutoConfiguration
public class KafkaProducerConsumerApplication {
public static void main(String[] args) {
SpringApplication.run(KafkaProducerConsumerApplication.class, args);
}
}
如我們所見,這是一個標準的Spring Boot應用程序。在可能的情況下,我們希望使用默認配置值。考慮到這一點,我們利用@EnableAutoConfiguration
批註自動配置應用程序。
3.1。生產者設置
接下來,讓我們考慮一個生產者bean,我們將使用它將消息發送到給定的Kafka主題:
@Component
public class KafkaProducer {
private static final Logger LOGGER = LoggerFactory.getLogger(KafkaProducer.class);
@Autowired
private KafkaTemplate<String, String> kafkaTemplate;
public void send(String topic, String payload) {
LOGGER.info("sending payload='{}' to topic='{}'", payload, topic);
kafkaTemplate.send(topic, payload);
}
}
上面定義的我們的KafkaProducer
bean僅僅是KafkaTemplate
類的包裝。此類提供高級線程安全操作,例如將數據發送到所提供的主題,這正是我們在send
方法中所做的。
3.2。消費者設置
同樣,我們現在將定義一個簡單的消費者Bean,它將監聽Kafka主題並接收消息:
@Component
public class KafkaConsumer {
private static final Logger LOGGER = LoggerFactory.getLogger(KafkaConsumer.class);
private CountDownLatch latch = new CountDownLatch(1);
private String payload = null;
@KafkaListener(topics = "${test.topic}")
public void receive(ConsumerRecord<?, ?> consumerRecord) {
LOGGER.info("received payload='{}'", consumerRecord.toString());
setPayload(consumerRecord.toString());
latch.countDown();
}
public CountDownLatch getLatch() {
return latch;
}
public String getPayload() {
return payload;
}
}
我們的簡單使用者在receive
方法上使用@KafkaListener
批註來偵聽有關給定主題的消息。稍後我們將看到如何從測試中配置test.topic
。
此外,receive方法將消息內容存儲在我們的bean中,並減少latch
變量的計數。此變量是一個簡單的線程安全計數器字段,稍後我們將在測試中使用該字段,以確保我們成功接收到message 。
現在,我們已經使用Spring Boot實現了簡單的Kafka應用程序,讓我們看看如何編寫集成測試。
4.關於測試的話
通常,在編寫簡潔的集成測試時,我們不應依賴於我們可能無法控製或突然停止工作的外部服務。這可能會對我們的測試結果產生不利影響。
同樣,如果我們依賴外部服務(在這種情況下為運行中的Kafka經紀人),則可能無法按照我們希望通過測試的方式對其進行設置,控制和拆除。
4.1。應用屬性
我們將從測試中使用一組非常簡單的應用程序配置屬性。我們將在src/test/resources/application.yml
文件中定義以下屬性:
spring:
kafka:
consumer:
auto-offset-reset: earliest
group-id: baeldung
test:
topic: embedded-test-topic
這是使用Kafka嵌入式實例或本地代理時所需的最少屬性集。
其中大多數是不言自明的,但是我們應該特別強調的一個是消費者財產的auto-offset-reset: earliest
。此屬性可確保我們的使用者組獲取我們發送的消息,因為容器可能在發送完成後啟動。
另外,我們為主題屬性配置了值embedded-test-topic
,這是我們將在測試中使用的主題。
5.使用嵌入式Kafka進行測試
在本節中,我們將研究如何使用內存中的Kafka實例對我們的測試進行測試。這也稱為嵌入式Kafka。
我們之前添加的依賴項spring-kafka-test
包含一些有用的實用程序,以幫助測試我們的應用程序。最值得注意的是,它包含EmbeddedKafkaBroker
類。
考慮到這一點,讓我們繼續編寫我們的第一個集成測試:
@SpringBootTest
@DirtiesContext
@EmbeddedKafka(partitions = 1, brokerProperties = { "listeners=PLAINTEXT://localhost:9092", "port=9092" })
class EmbeddedKafkaIntegrationTest {
@Autowired
private KafkaConsumer consumer;
@Autowired
private KafkaProducer producer;
@Value("${test.topic}")
private String topic;
@Test
public void givenEmbeddedKafkaBroker_whenSendingtoSimpleProducer_thenMessageReceived()
throws Exception {
producer.send(topic, "Sending with own simple KafkaProducer");
consumer.getLatch().await(10000, TimeUnit.MILLISECONDS);
assertThat(consumer.getLatch().getCount(), equalTo(0L));
assertThat(consumer.getPayload(), containsString("embedded-test-topic"));
}
}
讓我們來看一下測試的關鍵部分。首先,我們先用兩個漂亮的標準Spring註釋裝飾測試類:
-
@SpringBootTest
批註將確保我們的測試引導Spring應用程序上下文 - 我們還使用
@DirtiesContext
批註,以確保清除此上下文並在不同測試之間重置
至關重要的部分到了,我們使用@EmbeddedKafka
批註將EmbeddedKafkaBroker
的實例注入我們的測試中。此外,我們可以使用一些屬性來配置嵌入式Kafka節點:
-
partitions
–這是每個主題使用的分區數。為了使事情變得簡單明了,我們只希望在測試中使用一個 -
brokerProperties
– Kafka經紀人的其他屬性。同樣,我們保持簡單,並指定純文本偵聽器和端口號
接下來,我們自動連接consumer
和producer
類,並配置一個主題以使用application.properties
的值。
對於拼圖的最後一部分,我們僅向測試主題發送一條消息,並驗證是否已收到該消息並包含測試主題的名稱。
運行測試時,我們將在冗長的Spring輸出中看到:
...
12:45:35.099 [main] INFO cbkafka.embedded.KafkaProducer -
sending payload='Sending with our own simple KafkaProducer' to topic='embedded-test-topic'
...
12:45:35.103 [org.springframework.kafka.KafkaListenerEndpointContainer#0-0-C-1]
INFO cbkafka.embedded.KafkaConsumer - received payload=
'ConsumerRecord(topic = embedded-test-topic, partition = 0, leaderEpoch = 0, offset = 1,
CreateTime = 1605267935099, serialized key size = -1,
serialized value size = 41, headers = RecordHeaders(headers = [], isReadOnly = false),
這確認我們的測試正常運行。太棒了!現在,我們有一種使用內存中的Kafka代理編寫獨立的,獨立的集成測試的方法。
6.使用TestContainers測試Kafka
有時,我們可能會看到真正的外部服務與專門為測試目的提供的服務的嵌入式內存實例之間的細微差異。儘管不太可能,但也有可能是我們測試中使用的端口被佔用,從而導致故障。
考慮到這一點,在本節中,我們將看到我們以前使用Testcontainers框架進行測試的方法的一種變化。我們將通過集成測試了解如何實例化和管理託管在Docker容器中的外部Apache Kafka代理。
讓我們定義另一個集成測試,該測試與上一節中看到的非常相似:
@RunWith(SpringRunner.class)
@Import(com.baeldung.kafka.testcontainers.KafkaTestContainersLiveTest.KafkaTestContainersConfiguration.class)
@SpringBootTest(classes = KafkaProducerConsumerApplication.class)
@DirtiesContext
public class KafkaTestContainersLiveTest {
@ClassRule
public static KafkaContainer kafka =
new KafkaContainer(DockerImageName.parse("confluentinc/cp-kafka:5.4.3"));
@Autowired
private KafkaConsumer consumer;
@Autowired
private KafkaProducer producer;
@Value("${test.topic}")
private String topic;
@Test
public void givenKafkaDockerContainer_whenSendingtoSimpleProducer_thenMessageReceived()
throws Exception {
producer.send(topic, "Sending with own controller");
consumer.getLatch().await(10000, TimeUnit.MILLISECONDS);
assertThat(consumer.getLatch().getCount(), equalTo(0L));
assertThat(consumer.getPayload(), containsString("embedded-test-topic"));
}
}
讓我們看看這次的差異。我們聲明的是kafka
字段,它是標準的JUnit @ClassRule
。該字段是KafkaContainer
類的實例,該類將準備和管理運行Kafka的容器的生命週期。
為了避免端口衝突,當我們的Docker容器啟動時,Testcontainers會動態分配端口號。因此,我們使用KafkaTestContainersConfiguration
類提供自定義的使用者和生產者工廠配置:
@Bean
public Map<String, Object> consumerConfigs() {
Map<String, Object> props = new HashMap<>();
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, kafka.getBootstrapServers());
props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
props.put(ConsumerConfig.GROUP_ID_CONFIG, "baeldung");
// more standard configuration
return props;
}
@Bean
public ProducerFactory<String, String> producerFactory() {
Map<String, Object> configProps = new HashMap<>();
configProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, kafka.getBootstrapServers());
// more standard configuration
return new DefaultKafkaProducerFactory<>(configProps);
}
然後,我們在測試開始時通過@Import
批註引用此配置。
原因是我們需要一種將服務器地址注入到應用程序中的方法,如前所述,該地址是動態生成的。我們通過調用getBootstrapServers()
方法實現此目的,該方法將返回引導服務器位置:
bootstrap.servers = [PLAINTEXT://localhost:32789]
現在,當我們運行測試時,我們應該看到Testcontainers做以下幾件事:
- 檢查我們的本地Docker設置。
- 必要時
confluentinc/cp-kafka:5.4.3
docker映像 - 啟動一個新容器並等待其準備就緒
- 最後,在測試完成後關閉並刪除容器
再次通過檢查測試輸出來確認:
13:33:10.396 [main] INFO ? [confluentinc/cp-kafka:5.4.3]
- Creating container for image: confluentinc/cp-kafka:5.4.3
13:33:10.454 [main] INFO ? [confluentinc/cp-kafka:5.4.3]
- Starting container with ID: b22b752cee2e9e9e6ade38e46d0c6d881ad941d17223bda073afe4d2fe0559c3
13:33:10.785 [main] INFO ? [confluentinc/cp-kafka:5.4.3]
- Container confluentinc/cp-kafka:5.4.3 is starting: b22b752cee2e9e9e6ade38e46d0c6d881ad941d17223bda073afe4d2fe0559c3
快點!使用Kafka docker容器的有效集成測試。
7.結論
在本文中,我們了解了幾種使用Spring Boot測試Kafka應用程序的方法。在第一種方法中,我們看到瞭如何配置和使用本地內存Kafka代理。
然後,我們從測試中看到瞭如何使用Testcontainers來設置在docker容器內運行的外部Kafka代理。