微米觀察和 Spring Kafka
1.概述
在本教學中,我們將使用 Micrometer 和 Spring Boot Actuator 來探索 Spring Kafka 提供的監控功能。我們首先會查看 Apache Kafka 為生產者和消費者公開的本機指標,這些指標可提供有關效能、吞吐量、錯誤和延遲的寶貴見解。
接下來,我們將深入研究spring.kafka.listener
和spring.kafka.template
下所揭露的 Spring 特定指標。我們還將學習如何自訂@KafkaListener
和KafkaTemplate
以透過 Spring 配置為這些指標添加自訂標籤。
最後,我們將討論追蹤並了解 Spring Kafka 如何輕鬆傳播 Micrometer 產生的追蹤訊息。這使我們能夠追蹤和關聯訊息,以便更好地調試和監控。
2. 設定環境
對於本文的程式碼範例,我們假設我們正在為部落格和學習網站(如 Baeldung)創建後端應用程式。
我們將使用一個使用spring-kafka
和spring-boot-starter-actuator
依賴項的簡單 Spring Boot 應用程式:
<dependency>
<groupId>org.springframework.kafka</groupId>
<artifactId>spring-kafka</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-actuator</artifactId>
</dependency>
在src/test/resources/docker
資料夾中,我們可以找到一個docker-compose.yml
文件,該文件可用於在本地啟動應用程式。
此外,該應用程式配置為透過執行器端點公開所有指標:
management.endpoints.web.exposure.include: '*'
此時,我們應該能夠在本地啟動應用程序,訪問[http://localhost:8081/actuator](http://localhost:8081/actuator) ,
並查看所有公開的端點和指標:
在功能方面,我們的服務提供了一個 HTTP 端點來建立文章評論。提交評論後,應用程式會向 Kafka 主題baeldung.article-comment.added
發送一條訊息,這讓我們可以追蹤 Kafka 生產者指標。
稍後,我們將設定一個 Kafka 監聽器來使用來自相同主題的訊息。這將幫助我們了解如何監控 Kafka 監聽器以及它公開的指標。
3. 原生 Kafka 指標
只要加入spring-boot-starter-actuator
依賴項,我們就會**開箱即用地公開一些 Kafka 指標。**首先,讓我們專注於 Kafka 生產者和消費者的本機指標。
3.1.生產者指標
我們需要讓應用程式產生一些事件,所以讓我們透過 REST API 建立一些評論:
curl --location 'http://localhost:8081/api/articles/oop-best-practices/comments' \
--header 'Content-Type: application/json' \
--data '{
"articleAuthor": "Andrey the Author",
"comment": "Great article!",
"commentAuthor": "Richard the Reader"
}'
之後,我們可以再檢查[http://localhost:8081/actuator/metrics](http://localhost:8081/actuator/metrics)
端點。這次,我們將看到許多 Kafka 生產者指標,包括延遲和故障率等值:
-
kafka.producer.record.error.rate
– 記錄傳送失敗的頻率 -
kafka.producer.request.latency.avg
– 完成生產請求所需的平均時間 -
kafka.producer.buffer.exhausted.rate
– 生產者耗盡緩衝區空間的頻率 -
kafka.producer.record.send.rate
– 記錄傳送到代理程式的速率 -
kafka.producer.requests.in.flight
– 目前正在執行的未確認生產請求的數量
為了深入了解,我們可以在Apache Kafka 文件中找到所有 Kafka 生產者指標的完整清單。
不用說,我們可以透過將其名稱附加到路徑來探索每個指標。例如,讓我們存取監控生產者錯誤率的端點:
3.2.消費者指標
與生產者指標類似,Micrometer 記錄與 Kafka 消費者相關的指標並透過執行器公開它們。
為了說明這一點,讓我們為我們的應用程式添加一個@KafkaListener
註解。為了簡單起見,它將監聽相同的baeldung.article-comment.added
主題:
@Component
public class ArticleCommentsListener {
@KafkaListener(topics = "baeldung.article-comment.added")
public void onArticleComment(ArticleCommentAddedEvent event) {
// some logic here...
}
}
現在,我們可以運行應用程序,發送更多請求,然後再次檢查執行器指標。這次,我們應該會看到幾個與 Kafka 消費者相關的指標。除此之外,我們會注意到以下指標:
-
kafka.consumer.fetch.manager.records.lag
– 消費者落後多少 -
kafka.consumer.fetch.manager.fetch.latency.avg
– 從代理商取得資料的平均時間 -
kafka.consumer.coordinator.rebalance.rate.per.hour
– 消費者群組重新平衡發生的頻率 -
kafka.consumer.last.poll.seconds.ago
– 自消費者上次輪詢以來的時間 -
kafka.consumer.time.between.poll.avg
– 連續輪詢調用之間的平均時間
我們期望找到的消費者屬性的完整清單可以在Kafka 的官方文件中找到。
3.3.新增自訂標籤
Spring Kafka 還具有內建 API,可使用自訂標籤豐富本機指標。例如,我們可以透過在ProducerFactoryÂ
bean中加入監聽器來客製化生產者指標:
@Bean
ProducerFactory<String, ArticleCommentAddedEvent> producerFactory(
KafkaProperties kafkaProperties, MeterRegistry meterRegistry
) {
ProducerFactory pf = new DefaultKafkaProducerFactory<>(kafkaProperties.buildProducerProperties());
pf.addListener(
new MicrometerProducerListener<String, String>(
meterRegistry,
Collections.singletonList(new ImmutableTag("app-name", "article-comments-app"))
)
);
return pf;
}
我們可以看到,這為所有 Kafka 生產者指標添加了一個自訂名稱。此外,我們可以修改ConsumerFactory,
新增MicrometerConsumerListener
來為消費者指標新增自訂標籤。
4.監控KafkaTemplate
在我們的演示應用程式中,我們使用KafkaTemplate
bean 將訊息發佈到 Kafka。除了生產者指標之外,Micrometer 還記錄特定於KafkaTemplate
指標,並以spring.kafka.template
指標名稱公開它們。
我們可以自訂KafkaTemplate
bean 來新增或更新與該指標相關的標籤。例如, setMicrometerTags()
方法允許我們將標籤定義為鍵值對,並將它們附加到特定的kafkaTemplate
bean:
@Bean
@Qualifier("articleCommentsKafkaTemplate")
KafkaTemplate<String, ArticleCommentAddedEvent> articleCommentsKafkaTemplate(
ProducerFactory<String, ArticleCommentAddedEvent> producerFactory
) {
var template = new KafkaTemplate<>(producerFactory);
template.setMicrometerTags(Map.of(
"topic", "baeldung.article-comment.added"
));
return template;
}
此外,我們可以使用setMicrometerTagsProvider()
為給定記錄動態產生標籤。讓我們使用此方法提取記錄的密鑰並將其附加為標籤:
template.setMicrometerTagsProvider(
record -> Map.of("article-slug", record.key().toString())
);
我們現在可以為不同的文章添加更多評論,然後驗證http://localhost:8081/actuator/metrics/spring.kafka.template
端點:
正如預期的那樣,數據包括有關KafkaTemplate
性能的所有記錄資訊以及我們的自訂標籤。
5.監控KafkaListener
與KafkaTemplate
一樣,Micrometer 也監視KafkaListener
並在spring.kafka.listener
名稱下公開相關指標。 Spring Kafka 維護一致的 API,可以輕鬆為監聽器指標配置自訂標籤。
setMicrometerTags()
和setMicrometerTagsProvider()
方法可用來附加這些自訂 Micrometer 標籤,並在ConcurrentKafkaListenerContainerFactory
層級進行設定:
@Bean
ConcurrentKafkaListenerContainerFactory<String, String> customKafkaListenerContainerFactory(
ConsumerFactory<String, String> consumerFactory
) {
ConcurrentKafkaListenerContainerFactory<String, String> factory =
new ConcurrentKafkaListenerContainerFactory<>();
factory.setConsumerFactory(consumerFactory);
ContainerProperties containerProps = factory.getContainerProperties();
containerProps.setMicrometerTags(Map.of(
"app-name", "article-comments-app"
));
containerProps.setMicrometerTagsProvider(
record -> Map.of("article-slug", record.key().toString())
);
return factory;
}
此外,我們需要更新@KafkaListenerÂ
註解並將其指向修改後的containerFactory:
@KafkaListener(
topics = "baeldung.article-comment.added",
containerFactory = "customKafkaListenerContainerFactory"
)
public void onArticleComment(ArticleCommentAddedEvent event) {
// ...
}
因此,Micrometer 將我們的靜態和動態標籤附加到指標,Actuator 透過http://localhost:8081/actuator/metrics
/ spring.kafka.listener
端點公開它們。
6. 追蹤 Kafka 訊息
Micrometer 的追蹤功能透過向日誌添加追蹤資訊來幫助我們追蹤請求的流程。這使得調試和監控變得更加容易。
然後,我們將使用 Spring Kafka 的內建功能,透過訊息**元資料輕鬆地在我們的系統中傳播追蹤上下文。**
6.1.豐富日誌
Micrometer 使用映射診斷上下文 (MDC) 以兩個 ID 的形式儲存追蹤詳細資訊: traceId
和spanId
。
我們可以透過在 REST 控制器中放置一個斷點並在程式碼暫停時評估表達式MDC.getCopyOfContextMap()
來觀察這一點:
MDC 內部有兩個字段,使我們能夠輕鬆地使用追蹤資訊豐富我們的日誌。讓我們在logback.xml
中配置它:
因此,只要traceId
和spanId
出現在 MDC 上下文中,它們就會記錄在執行緒名稱旁邊。
6.2.傳播上下文
在我們的應用程式中添加追蹤功能的真正好處是能夠在整個系統中傳播追蹤訊息並關聯不同的元件。換句話說,我們需要使用元資料(例如 HTTP 請求頭或 Kafka 訊息頭)來傳遞traceId
和spanId
資訊。
首先,讓我們來看看我們的KafkaTemplate
。對於每個訊息,我們應該對其進行自訂,以從 MDC 中提取traceId
並將其新增為自訂標頭。幸運的是,此功能已受支援 - 我們只需透過呼叫setObservationEnabled(true)
來啟用它。讓我們將其應用到我們的KafkaTemplate
bean 中:
@Bean
KafkaTemplate<String, ArticleCommentAddedEvent> articleCommentsKafkaTemplate(
ProducerFactory<String, ArticleCommentAddedEvent> producerFactory)
{
var template = new KafkaTemplate<>(producerFactory);
template.setObservationEnabled(true);
// other config ...
return template;
}
因此, KafkaTemplate
現在將追蹤資訊作為訊息頭添加,鍵為「 traceparent
」。
在監聽器端,我們已經可以看到新的traceparent
標頭,但我們仍然需要解析它並將其新增至 MDC。與生產者類似,如果我們在容器層級啟用觀察,Spring Kafka 可以為我們處理這個問題:
@Bean
ConcurrentKafkaListenerContainerFactory<String, String> customKafkaListenerContainerFactory(
ConsumerFactory<String, String> consumerFactory
) {
var factory = new ConcurrentKafkaListenerContainerFactory<>();
factory.setConsumerFactory(consumerFactory);
ContainerProperties containerProps = factory.getContainerProperties();
containerProps.setObservationEnabled(true);
// other config...
return factory;
}
透過此設置,我們可以在本地運行應用程序,發送一些請求,並追蹤從初始 HTTP 請求到 Kafka 訊息的流程,然後由偵聽器在不同線程中處理該訊息。這使我們能夠關聯整個流程:
[http-nio-8081-exec-2] __680df9d4fcab49ea0511b54ff0f3ce9f__0511b54ff0f3ce9f INFO HTTP Request received to save article comment: ArticleCommentAddedDto[...]
[org.s.kafka...#0-0-C-1] __680df9d4fcab49ea0511b54ff0f3ce9f__de00d94a8258a1b9 INFO Kafka Message Received: Comment Added: ArticleCommentAddedEvent[...]
不用說,這只是一個簡單的例子,但該功能對於跨具有多個服務的更複雜系統追蹤請求非常有用。
7. 結論
在本文中,我們探討了 Spring Kafka 提供的監控功能。透過將 Apache Kafka 的原生指標與 Spring Kafka 和 Micrometer 提供的擴展監控支援結合,我們可以全面了解訊息系統的健康狀況和效能。
本機指標為我們提供了低階操作洞察力,而 Spring 特定的指標則允許更多的上下文和應用程式感知的可觀察性。最後,我們學習如何使用客製化的標籤來豐富每個指標。
最後,我們學習如何啟用追蹤並使用traceId
和spanId
資訊豐富我們的日誌。我們客製化了 Spring Kafka bean 來透過traceparent
訊息標頭傳播這些字段,這使我們能夠追蹤跨不同元件的訊息流。
與往常一樣,本文中提供的程式碼可在 GitHub 上找到。