使用 Eventuate Tram 實作微服務的事務性訊息傳遞
1.概述
在本教程中,我們將探討在資料庫操作和訊息傳遞之間維護資料一致性的挑戰。我們將首先檢查問題,然後實現事務性發件箱模式來解決關鍵問題。
接下來,我們將引入Eventuate Tram ,用於將準備發佈到特定主題的訊息填入寄件匣表中。最後,我們將在自己的 Docker 容器中執行 Eventuate CDC 服務,以監控寄件匣表中的變更並透過 Kafka 發布相應的訊息。
2.我們什麼時候需要事務性訊息傳遞?
就像我們經常依賴資料庫事務來確保資料操作的原子性一樣,我們可能也需要以原子方式將訊息發佈到訊息代理程式。例如,有時我們需要將資料儲存到資料庫,並將訊息發佈到訊息代理,這可以作為一個單一的原子操作。
雖然這看起來很簡單,但它帶來了一些隱藏的挑戰。讓我們用一個簡單的用例來探討這個問題,我們將嘗試將一個Comment
實體保存到資料庫,並將一個事件發佈到 Kafka 主題baeldung.comment.added
。
一種簡單的方法是在事務塊內發布訊息。例如,如果我們使用 Spring Data JPA 進行資料庫操作,並使用KafkaTemplate
傳送訊息,那麼我們的領域服務可能如下所示:
@Service
class CommentService {
private final CommentRepository comments;
private final KafkaTemplate<Long, CommentAddedEvent> kafkaTemplate;
// constructor
@Transactional
public Long save(Comment comment) {
Comment saved = this.comments.save(comment);
log.info("Comment created: {}", saved);
CommentAddedEvent commentAdded = new CommentAddedEvent(saved.getId(), saved.getArticleSlug());
kafkaTemplate.send("baeldung.comment.added", saved.getId(), commentAdded);
}
}
然而,這種方法會在資料庫提交之前發布 Kafka 訊息。換句話說,即使交易在提交時失敗,操作也會被回滾,我們仍然有發送訊息的風險。
另一方面,如果我們嘗試刪除@Transactional
註解,當發佈到 Kafka 失敗時,Spring 將不會回滾資料庫插入。毋庸置疑,這兩種方法都不是理想的,因為它們都可能導致跨系統的資料不一致。
3. 事務性寄件箱模式
我們可以實現事務性發件箱模式來確保系統的最終一致性。此模式涉及在資料變更的相同交易中將訊息儲存到一個特殊的資料庫表(即「寄件匣」)中。
隨後,一個單獨的進程讀取發件箱並將訊息發佈到訊息代理程式。然後,它會更新、刪除或將記錄標記為已發布,以追蹤已發送的內容:
發布事件並更新寄件匣表時也可能發生類似的問題。我們希望避免在事件發布成功之前更新寄件箱,以防止遺失事件。另一方面,如果事件已發送但資料庫更新失敗,系統可能會重試並再次發送該事件。這可能會導致事件重複,但總比丟失好。
總體而言,這種方法確保「至少一次」交付,優先考慮可靠性而不是避免重複。
4. 演示應用程式概述
在本文中,我們將使用一個簡單的 Spring Boot 應用程式來管理像 Baeldung 這樣的部落格網站的文章評論。使用者可以透過向/api/articles/{slug}/comments
端點發送 POST 請求來為文章添加評論:
curl --location "http://localhost:8080/api/articles/oop-best-practices/comments" \
--header "Content-Type: application/json" \
--data "{
\"articleAuthor\": \"Andrey the Author\",
\"text\": \"Great article!\",
\"commentAuthor\": \"Richard the Reader\"
}"
為了快速測試,我們可以使用位於src/rest/resources
的post-comment.bat
腳本來執行此 curl 指令。
當Comment
實體儲存到資料庫時,系統也會發布一則 Kafka 訊息。該訊息包含新保存的Comment
ID 和文章slug
,並發送到名為baeldung.comment.added
的主題。
為了設定本機環境,我們將使用 Docker 啟動 PostgreSQL、Kafka 和 Eventuate CDC 服務的容器。這可以透過位於src/test/resources
目錄下的eventuate-docker-compose.yml
檔案輕鬆完成。我們也將使用eventuate
設定檔在本機啟動 Spring Boot 應用程式:
為了了解實踐中的一切,我們也可以參考我們的整合測試EventuateTramLiveTest
。
5. Eventuate Tram
Eventuate 是一個 Java 平台,支援 CQRS、事件溯源和事務 Sagas 等核心微服務模式。其元件之一**Eventuate Tram 透過事務寄件匣模式和事件發布實現可靠的服務間通訊。**
讓我們在範例中整合 Eventuate Tram,以確保 Kafka 訊息至少一次投遞。首先,我們在pom.xml
中加入[eventuate-tram-spring-jdbc-kafka](https://mvnrepository.com/artifact/io.eventuate.tram.core/eventuate-tram-spring-jdbc-kafka)
和[eventuate-tram-spring-events](https://mvnrepository.com/artifact/io.eventuate.tram.core/eventuate-tram-spring-events)
的必要依賴項:
<dependency>
<groupId>io.eventuate.tram.core</groupId>
<artifactId>eventuate-tram-spring-jdbc-kafka</artifactId>
<version>0.36.0-RELEASE</version>
</dependency>
<dependency>
<groupId>io.eventuate.tram.core</groupId>
<artifactId>eventuate-tram-spring-events</artifactId>
<version>0.36.0-RELEASE</version>
</dependency>
然後,我們將導入兩個配置類別:
@Configuration
@Import({
TramEventsPublisherConfiguration.class,
TramMessageProducerJdbcConfiguration.class
})
class EventuateConfig {
}
此外,我們需要修改CommentAddedEvent
記錄,並確保它實作了 Eventuate 的DomainEvent
介面:
record CommentAddedEvent(Long id, String articleSlug) implements DomainEvent {
}
最後,我們將重構包含所有邏輯的領域服務。這次,我們不再直接發佈到 Kafka,而是使用DomainEventPublisher
bean 來發佈CommentAddedEvent
:
@Service
class CommentService {
private final CommentRepository comments;
private final DomainEventPublisher domainEvents;
// constructor
@Transactional
public Long save(Comment comment) {
Comment saved = this.comments.save(comment);
log.info("Comment created: {}", saved);
CommentAddedEvent commentAdded = new CommentAddedEvent(saved.getId(), saved.getArticleSlug());
domainEvents.publish(
"baeldung.comment.added",
saved.getId(),
singletonList(commentAdded)
);
return saved.getId();
}
}
因此,每當我們持久化一個Comment
實體時,我們也會在同一事務中的eventuate.
message
中插入一個CommentAddedEvent
條目。
讓我們透過連接資料庫並查詢comment
表來驗證這一點:
mydb=# select * from comment;
id | article_slug | comment_author | text
----+-------------------+--------------------+------------------
1 | oop-best-practices | Richard the Reader | Great article!
(1 row)
我們也可以從eventuate
Schema 中查詢message
表。假設 CDC 服務已關閉,我們預計只會檢索到一條標記為未發布的訊息:
mydb=# select id, destination, published from eventuate.message;
id | destination | published
--------------------------------------+----------------------------+-----------
0000019713d8ffe4-e86a640584cf0000 | baeldung.comment.added | 0
(1 row)
6. Eventuate 的 CDC 服務
變更資料擷取 (CDC) 是一種用於偵測和追蹤資料庫中變更(例如插入、更新和刪除)的技術,以便擷取這些變更並將其傳送到其他系統。因此,Eventuate CDC 服務負責擷取寄件匣表的變更,並將其作為事件發佈到我們的訊息代理程式。
Eventuate CDC 服務目前支援多種訊息代理,包括 Apache Kafka、ActiveMQ、RabbitMQ 和 Redis。對於資料庫,它使用高效的事務日誌追蹤功能(透過 binlog 協定與 MySQL 配合使用)以及使用 WAL 協定(與 Postgres配合使用)。對於其他相容於 JDBC 的資料庫,它則採用效率較低的輪詢方法來偵測變更。
如果我們啟動 CDC 服務並重新執行測試,我們會注意到eventuate.messages
表中的條目將標記為已發布:
mydb=# select id, destination, published from eventuate.message;
id | destination | published
--------------------------------------+----------------------------+-----------
0000019713d8ffe4-e86a640584cf0000 | baeldung.comment.added | 1
(1 row)
最後,我們可以使用kafka-console-consumer.sh
來驗證訊息是否已成功發佈到我們的主題:
{
"payload": "{ \"id\": 1, \"articleSlug\": \"oop-best-practices\" }",
"headers": {
"PARTITION_ID": "1",
"event-aggregate-type": "baeldung.comment.added",
"DATE": "Tue, 27 May 2025 22:24:37 GMT",
"event-aggregate-id": "1",
"event-type": "com.baeldung.eventuate.tram.domain.CommentAddedEvent",
"DESTINATION": "baeldung.comment.added",
"ID": "0000019713d8ffe4-e86a640584cf0000"
}
}
如預期的那樣,訊息已送達,寄件匣表也相應更新。
7. 結論
在本文中,我們探討了事務性訊息傳遞的複雜性,首先探討如何以原子方式執行資料庫操作並發佈領域事件。我們發現了一些隱藏的難題,並了解了事務性寄件箱模式如何幫助解決這些問題。
然後,我們使用了 Eventuate Tram 框架,它為我們實作了這個模式。結合 Eventuate CDC 服務(該服務利用變更資料擷取來監控寄件匣表並向 Kafka 發送訊息),我們實現了最終一致性,並保證了系統中至少一次的交付。
與往常一樣,本文中提供的程式碼可在 GitHub 上找到。