使用 Spring Modulith 進行事件外部化
1. 概述
在本文中,我們將討論在@Transactional
區塊中發布訊息的需求以及相關的效能挑戰,例如延長的資料庫連線時間。為了解決這個問題,我們將利用 Spring Modulith 的功能來監聽 Spring 應用程式事件並自動將它們發佈到 Kafka 主題。
2. 事務操作和訊息代理
對於本文的程式碼範例,我們假設我們正在編寫負責在 Baeldung 上保存Article
的功能:
@Service
class Baeldung {
private final ArticleRepository articleRepository;
// constructor
@Transactional
public void createArticle(Article article) {
validateArticle(article);
article = addArticleTags(article);
// ... other business logic
articleRepository.save(article);
}
}
此外,我們需要將這條新Article
通知系統的其他部分。有了這些訊息,其他模組或服務將做出相應反應,創建報告或向網站讀者發送新聞通訊。
實現此目的的最簡單方法是注入知道如何發布此事件的依賴項。對於我們的範例,讓我們使用KafkaOperations
向「 baeldung.articles.published
」主題發送訊息,並使用Article
的slug()
作為鍵:
@Service
class Baeldung {
private final ArticleRepository articleRepository;
private final KafkaOperations<String, ArticlePublishedEvent> messageProducer;
// constructor
@Transactional
public void createArticle(Article article) {
// ... business logic
validateArticle(article);
article = addArticleTags(article);
article = articleRepository.save(article);
messageProducer.send(
"baeldung.articles.published",
article.slug(),
new ArticlePublishedEvent(article.slug(), article.title())
).join();
}
}
**然而,由於一些不同的原因,這種方法並不理想。從設計的角度來看,我們將領域服務與訊息生產者耦合.
**此外,領域服務直接依賴較低層級的元件,這違反了基本的清潔架構規則之一。
此外,這種方法也會產生效能影響,因為一切都發生在@Transacional
方法中。因此,為保存Article
而獲取的資料庫連線將保持開啟狀態,直到訊息成功發布。
最後,保存實體和發布訊息將作為原子操作完成。換句話說,如果生產者未能發布事件,資料庫事務將會回滾。
3. 使用 Spring 事件進行依賴反轉
我們可以利用 Spring Events 來改進我們解決方案的設計。我們的目標是避免直接從我們的網域服務將訊息發佈到Kafka
。讓我們刪除KafkaOperations
依賴項並發佈內部應用程式事件:
@Service
public class Baeldung {
private final ApplicationEventPublisher applicationEvents;
private final ArticleRepository articleRepository;
// constructor
@Transactional
public void createArticle(Article article) {
// ... business logic
validateArticle(article);
article = addArticleTags(article);
article = articleRepository.save(article);
applicationEvents.publishEvent(
new ArticlePublishedEvent(article.slug(), article.title()));
}
}
除此之外,我們還將有一個專門的 Kafka 生產者作為我們基礎設施層的一部分。該元件將偵聽ArticlePublishedEvent
並將發布委託給底層KafkaOperations
bean:
@Component
class ArticlePublishedKafkaProducer {
private final KafkaOperations<String, ArticlePublishedEvent> messageProducer;
// constructor
@EventListener
public void publish(ArticlePublishedEvent article) {
Assert.notNull(article.slug(), "Article Slug must not be null!");
messageProducer.send("baeldung.articles.published", article.splug(), event);
}
}
透過這種抽象,基礎設施元件現在依賴領域服務產生的事件。換句話說,我們已經成功地減少了耦合並反轉了原始碼依賴。此外,如果其他模組對Article
創建感興趣,它們現在可以無縫監聽這些應用程式事件並做出相應反應。
4. 原子與原子非原子操作
現在,讓我們深入研究性能考慮因素。首先,我們必須確定當與訊息代理的通訊失敗時回滾是否是所需的行為。這種選擇根據具體情況而有所不同。
如果我們不需要這種原子性,則必須釋放資料庫連線並非同步發布事件。為了模擬這一點,我們可以嘗試創建一篇沒有slug,
導致ArticlePublishedKafkaProducer::publish
失敗:
@Test
void whenPublishingMessageFails_thenArticleIsStillSavedToDB() {
var article = new Article(null, "Introduction to Spring Boot", "John Doe", "<p> Spring Boot is [...] </p>");
baeldung.createArticle(article);
assertThat(repository.findAll())
.hasSize(1).first()
.extracting(Article::title, Article::author)
.containsExactly("Introduction to Spring Boot", "John Doe");
}
如果我們現在運行測試,它將失敗。發生這種情況是因為ArticlePublishedKafkaProducer
拋出異常,導致域服務回滾交易。但是,我們可以透過將@EventListener
註解替換為@TransactionalEventListener
和@Async
來使事件監聽器非同步:
@Async
@TransactionalEventListener
public void publish(ArticlePublishedEvent event) {
Assert.notNull(event.slug(), "Article Slug must not be null!");
messageProducer.send("baeldung.articles.published", event);
}
如果我們現在重新執行測試,我們會注意到異常已記錄,事件未發布,並且實體已儲存到資料庫中。而且,資料庫連線釋放得更快,允許其他執行緒使用它。
5. 使用 Spring Modulith 進行事件外部化
我們透過兩步驟方法成功解決了原始程式碼範例的設計和效能問題:
- 使用 Spring 應用程式事件進行依賴反轉
- 利用
@TransactionalEventListener
和@Async
進行非同步發布
Spring Modulith 允許我們進一步簡化程式碼,為該模式提供內建支援。讓我們先將[spring-modulith-events-api](https://central.sonatype.com/artifact/org.springframework.modulith/spring-modulith-events-api/versions)
的 Maven 依賴項加入pom.xml
:
<dependency>
<groupId>org.springframework.modulith</groupId>
<artifactId>spring-modulith-events-api</artifactId>
<version>1.1.2</version>
</dependency>
此模組可以配置為偵聽應用程式事件並自動將它們外部化到各種訊息系統。我們將堅持原來的範例並重點關注 Kafka。對於此集成,我們需要添加[spring-modulith-events-kafka](https://central.sonatype.com/artifact/org.springframework.modulith/spring-modulith-events-kafka/versions)
依賴項:
<dependency>
<groupId>org.springframework.modulith</groupId>
<artifactId>spring-modulith-events-kafka</artifactId>
<version>1.1.2</version>
</dependency>
現在,我們需要更新ArticlePublishedEvent
並使用@Externalized
對其進行註解。此註解需要路由目標的名稱和金鑰。換句話說,Kafka 主題和訊息鍵。對於鍵,我們將使用SpEL
表達式來呼叫Article
:: slug()
:
@Externalized("baeldung.article.published::#{slug()}")
public record ArticlePublishedEvent(String slug, String title) {
}
6. 事件外部化配置
儘管@Externalized
註解的值對於簡潔的 SpEL 表達式很有用,但在某些情況下我們可能希望避免使用它:
- 如果表達式變得過於複雜
- 當我們的目標是將有關主題的資訊與應用程式事件分開時
- 如果我們想要應用程式事件和外部化事件的不同模型
對於這些用例,我們可以使用EventExternalizationConfiguration'
建構器來設定必要的路由和事件對應。之後,我們只需將此配置公開為 Spring bean:
@Bean
EventExternalizationConfiguration eventExternalizationConfiguration() {
return EventExternalizationConfiguration.externalizing()
.select(EventExternalizationConfiguration.annotatedAsExternalized())
.route(
ArticlePublishedEvent.class,
it -> RoutingTarget.forTarget("baeldung.articles.published").andKey(it.slug())
)
.mapping(
ArticlePublishedEvent.class,
it -> new ArticlePublishedKafkaEvent(it.slug(), it.title())
)
.build();
}
在這種情況下,我們將從ArticlePublishedEvent
中刪除路由信息,並保留@Externalized
註釋,使其不具有任何值:
@Externalized
public record ArticlePublishedEvent(String slug, String title) {
}
七、結論
在本文中,我們討論了需要我們從事務區塊內發布訊息的場景。我們發現這種模式可能會對效能產生很大的影響,因為它可能會長時間阻塞資料庫連線。
之後,我們使用 Spring Modulith 的功能來監聽 Spring 應用程式事件並自動發佈到 Kafka 主題。這種方法使我們能夠非同步外部化事件並更快地釋放資料庫連線。
完整的源代碼可以在 GitHub 上找到。