持久化查詢模型
一、概述
Axon 框架幫助我們構建事件驅動的微服務系統。在 Axon 框架指南中,我們通過一個簡單的 Axon Spring Boot 應用程序了解了 Axon,該應用程序包括構建一個示例Order
模型供我們更新和查詢。在 Axon 框架中調度查詢時,我們添加了所有支持的查詢。
本文將著眼於持久化 Axon 框架的查詢模型。我們將介紹使用 MongoDB 存儲投影,以及測試的挑戰以及如何使流與查詢模型保持同步。
2.持久性考慮
為了創建一個使用數據庫來持久化查詢模型的處理程序,我們實現了OrdersEventHandler
接口。在生產環境中,我們不想每次都從頭構建查詢模型。通過Axon框架,我們可以選擇如何持久化模型,選擇什麼取決於涉及的數據。如果我們想要自由文本搜索,我們可能想要使用 Elasticsearch。當我們有非結構化數據時,我們可能想使用 MongoDB。當實體之間有很多關係時,我們可能希望使用像 Neo4J 這樣的圖數據庫。
2.1.代幣商店
在通過事件構建查詢模型時,Axon 使用TokenStore
進行跟踪。理想情況下,令牌存儲保存在與查詢模型相同的數據庫中以確保一致性。使用持久性令牌存儲還將確保我們可以運行多個實例,其中每個實例只需要處理部分事件。拆分為多個實例適用於segments ,其中一個實例可以聲明所有或部分段以進行處理。如果我們使用 JPA 或 JDBC 進行持久化,請使用[JpaTokenStore](https://apidocs.axoniq.io/4.6/org/axonframework/eventhandling/tokenstore/jpa/JpaTokenStore.html)
或JdbcTokenStore 。兩種令牌存儲實現都可以在 Axon 框架中使用,無需擴展。
2.2.構建查詢模型
在啟動時, 流式事件處理器將開始從事件存儲中讀取事件。使用持久性TokenStore
,處理器從它之前離開的地方開始。否則,默認情況下,它將從頭開始。對於每個事件,處理器將調用事件處理程序註釋的方法。
讓我們進一步構建訂單應用程序,並允許以多種方式創建和更新訂單。通過更新內存模型在InMemoryOrdersEventHandler
中處理ProductAddedEvent
:
@EventHandler
public void on(ProductAddedEvent event) {
orders.computeIfPresent(event.getOrderId(), (orderId, order) -> {
order.addProduct(event.getProductId());
return order;
});
}
這里內存映射中的順序將使用addProduct
函數更新。我們可以將數據存儲在數據庫中,而不是內存模型。
3. Mongo 擴展
讓我們使用 MongoDB 來持久化我們的查詢模型。我們使用Axon 框架 mongo 擴展來持久化 Mongo 中的令牌存儲。由於我們已經添加了[axon-bom](https://search.maven.org/search?q=a:axon-bom)
,因此在將擴展添加到我們的pom.xml
時不需要指定版本:
<dependency>
<groupId>org.axonframework.extensions.mongo</groupId>
<artifactId>axon-mongo</artifactId>
</dependency>
3.1.代幣商店
有了適當的依賴關係,我們可以配置 Axon 以使用MongoTokenStore
:
@Bean
public TokenStore getTokenStore(MongoClient client, Serializer serializer){
return MongoTokenStore.builder()
.mongoTemplate(
DefaultMongoTemplate.builder()
.mongoDatabase(client)
.build()
)
.serializer(serializer)
.build();
}
3.2.事件句柄類
名為mongo
的 Spring Profile 可以在事件處理程序的實現之間切換。當mongo
配置文件處於活動狀態時,將使用MongoOrdersEventHandler
以及令牌存儲配置。這一起構成了事件處理程序類:
@Service
@ProcessingGroup("orders")
@Profile("mongo")
public class MongoOrdersEventHandler implements OrdersEventHandler {
// all methods regarding updating an querying the projection
}
同時,我們在InMemoryOrdersEventHandler
中添加了@Profile("!mongo")
,因此兩者不會同時處於活動狀態。 Spring 配置文件是有條件地啟用組件的絕佳方式。
我們將在構造函數中使用依賴注入來獲取MongoClient
和QueryUpdateEmitter.
我們使用MongoClient
創建 MongoCollection 和索引。我們注入QueryUpdateEmitter
以啟用訂閱查詢:
public MongoOrdersEventHandler(MongoClient client, QueryUpdateEmitter emitter) {
orders = client
.getDatabase(AXON_FRAMEWORK_DATABASE_NAME)
.getCollection(ORDER_COLLECTION_NAME);
orders.createIndex(Indexes.ascending(ORDER_ID_PROPERTY_NAME),
new IndexOptions().unique(true));
this.emitter = emitter;
}
請注意,我們將訂單 ID 設置為唯一。這樣,我們就可以確定不會有兩個文檔具有相同的訂單 ID。
MongoOrdersEventHandler
使用orders
mongo 集合來處理查詢。我們需要使用documentToOrder()
方法將 Mongo 文檔映射到訂單:
@QueryHandler
public List<Order> handle(FindAllOrderedProductsQuery query) {
List<Order> orderList = new ArrayList<>();
orders
.find()
.forEach(d -> orderList.add(documentToOrder(d)));
return orderList;
}
3.3.複雜查詢
為了能夠處理TotalProductsShippedQuery,
我們添加了一個**shippedProductFilter
來過濾出已發貨的訂單並擁有產品:**
private Bson shippedProductFilter(String productId){
return and(
eq(ORDER_STATUS_PROPERTY_NAME, OrderStatus.SHIPPED.toString()),
exists(String.format(PRODUCTS_PROPERTY_NAME + ".%s", productId))
);
}
然後在提取和添加產品計數的查詢處理程序中使用此過濾器:
@QueryHandler
public Integer handle(TotalProductsShippedQuery query) {
AtomicInteger result = new AtomicInteger();
orders
.find(shippedProductFilter(query.getProductId()))
.map(d -> d.get(PRODUCTS_PROPERTY_NAME, Document.class))
.map(d -> d.getInteger(query.getProductId(), 0))
.forEach(result::addAndGet);
return result.get();
}
此查詢將獲取所有已發貨的產品並檢索這些文檔中的所有產品。然後它將計算查詢的特定產品並返回總數。
4. 測試持久化查詢模型
使用持久模型進行測試會使事情變得更加困難,因為我們**希望每個測試都有一個隔離的環境。
**
4.1.單元測試
對於MongoOrdersEventHandler,
我們需要確保刪除集合,這樣我們就不會保留之前測試的狀態。我們通過實現getHandler()
方法來做到這一點:
@Override
protected OrdersEventHandler getHandler() {
mongoClient.getDatabase("axonframework").drop();
return new MongoOrdersEventHandler(mongoClient, emitter);
}
使用@BeforeEach
註釋方法,我們可以確保每個測試都重新開始。在這種情況下,我們使用嵌入式 Mongo 進行測試。另一種選擇是使用測試容器。在這方面,測試查詢模型與其他需要數據庫的應用程序測試沒有什麼不同。
4.2.集成測試
對於集成測試,我們使用類似的方法。但是,由於集成測試使用OrdersEventHandler
接口,我們依賴於已實現的reset()
方法。
reset()
方法的實現是:
@Override
public void reset(List<Order> orderList) {
orders.deleteMany(new Document());
orderList.forEach(o -> orders.insertOne(orderToDocument(o)));
}
reset()
方法確保只有列表中的訂單是集合的一部分。該方法在OrderQueryServiceIntegrationTest
中的每個測試之前執行:
@BeforeEach
void setUp() {
orderId = UUID.randomUUID().toString();
Order order = new Order(orderId);
handler.reset(Collections.singletonList(order));
}
至於測試查詢,我們至少需要一個訂單。通過已經存儲一個訂單,它使測試本身更容易。
5.結論
在本文中,我們展示瞭如何持久化查詢模型。我們學習瞭如何使用 MongoDB 查詢和測試模型。
與往常一樣,本文中使用的完整代碼可在 GitHub 上獲得。