通過 REST 使用和測試 Axon 應用程序
一、概述
Axon Framework 幫助我們構建事件驅動的微服務系統。在 Axon 框架指南中,我們通過簡單的 Axon Spring Boot 應用程序了解了 Axon。該應用程序可以創建和更新訂單,還可以確認和運送這些訂單。
在 Axon Framework 中的調度查詢中,我們向OrderQueryService
添加了更多查詢。
查詢通常用於 UI,通常調用 REST 端點。
在本教程中,我們將為所有查詢創建 REST 端點。我們還將使用來自集成測試的這些端點。
2. 在 REST 端點中使用查詢
我們可以通過向@RestController
註釋類添加函數來添加 REST 端點。為此,我們將使用類OrderRestEndpoint
。以前我們直接在控制器中使用QueryGateway
。我們將為QueryGateway
替換注入的OrderQueryService,
這是我們在 Axon Framework 中的 Dispatching Queries 中實現的。這樣,控制器功能唯一關心的是將行為綁定到 REST 路徑。
所有端點都列在項目的order-api.http
文件中。多虧了這個文件,當使用 IntelliJ 作為我們的 IDE 時,端點是可調用的。
2.1.點對點查詢
點對點查詢只有一個響應,因此很容易實現:
@GetMapping("/all-orders")
public CompletableFuture<List<OrderResponse>> findAllOrders() {
return orderQueryService.findAllOrders();
}
Spring 等待CompletableFuture
被解析並以 JSON 格式的負載響應。我們可以通過調用localhost:8080/all-orders
來測試這一點,以獲取一個數組中的所有訂單。
從一個乾淨的設置中,如果我們首先使用一個帖子添加兩個訂單到http://localhost:8080/order/666a1661-474d-4046-8b12-8b5896312768
和http://localhost:8080/ship-order
,我們應該看到當我們在http://localhost:8080/all-orders
上調用 get 時的以下內容:
[
{
"orderId": "72d67527-a27c-416e-a904-396ebf222344",
"products": {
"Deluxe Chair": 1
},
"orderStatus": "SHIPPED"
},
{
"orderId": "666a1661-474d-4046-8b12-8b5896312768",
"products": {},
"orderStatus": "CREATED"
}
]
2.2.流式查詢
流式查詢將返回事件流並最終關閉。我們可以等待流關閉並在完成後發送響應。但是,直接流式傳輸效率更高。我們通過使用服務器發送事件來做到這一點:
@GetMapping(path = "/all-orders-streaming", produces = MediaType.TEXT_EVENT_STREAM_VALUE)
public Flux<OrderResponse> allOrdersStreaming() {
return orderQueryService.allOrdersStreaming();
}
通過添加媒體類型,Spring 了解我們希望將響應作為服務器發送事件。這意味著每個訂單都是單獨發送的。如果客戶端支持server-send事件, localhost:8080/all-orders-streaming
會一一返回所有的訂單。
在數據庫中擁有與點對點查詢相同的項目將得到如下結果:
data:{"orderId":"72d67527-a27c-416e-a904-396ebf222344","products":{"Deluxe Chair":1},"orderStatus":"SHIPPED"}
data:{"orderId":"666a1661-474d-4046-8b12-8b5896312768","products":{},"orderStatus":"CREATED"}
這些都是單獨的服務器發送事件。
2.3.分散-聚集查詢。
組合返回到 Axon 查詢的響應的邏輯已經存在於OrderQueryService
中。這使得實現與點對點查詢非常相似,因為只有一個響應。例如,要使用分散聚集查詢添加端點:
@GetMapping("/total-shipped/{product-id}")
public Integer totalShipped(@PathVariable("product-id") String productId) {
return orderQueryService.totalShipped(productId);
}
調用http://localhost:8080/total-shipped/Deluxe Chair
返回運送的椅子總數,包括來自LegacyQueryHandler
的 234。如果來自ship-order
調用的那個仍在數據庫中,它應該返回 235。
2.4.訂閱查詢
與流式查詢相反,訂閱查詢可能永遠不會結束。因此,等待訂閱查詢完成是不可取的。我們將再次利用服務器發送事件來添加端點:
@GetMapping(path = "/order-updates/{order-id}", produces = MediaType.TEXT_EVENT_STREAM_VALUE)
public Flux<OrderResponse> orderUpdates(@PathVariable("order-id") String orderId) {
return orderQueryService.orderUpdates(orderId);
}
調用http://localhost:8080/order-updates/666a1661-474d-4046-8b12-8b5896312768
將為我們提供該產品的更新流。通過調用http://localhost:8080/order/666a1661-474d-4046-8b12-8b5896312768/product/a6aa01eb-4e38-4dfb-b53b-b5b82961fbf3dd
,我們觸發了更新。該更新作為服務器發送事件發送。
我們將同時看到初始狀態和更新後的狀態。連接保持打開狀態以接收進一步的更新。
data:{"orderId":"666a1661-474d-4046-8b12-8b5896312768","products":{},"orderStatus":"CREATED"}
data:{"orderId":"666a1661-474d-4046-8b12-8b5896312768","products":{"a6aa01eb-4e38-4dfb-b53b-b5b82961fbf3":1},"orderStatus":"CREATED"}
如我們所見,更新包含我們添加的產品。
3. 集成測試
對於集成測試,讓我們使用 WebClient。
對於這些測試,我們將使用**@SpringBootTest**
運行整個應用程序,首先使用其他 REST 端點更改狀態。這些其他 REST 端點觸發一個或多個命令來創建一個或多個事件。要創建訂單,我們將使用 Axon 框架指南中添加的端點。我們在每個測試中使用@DirtiesContext
註釋,因此在一個測試中創建的事件不會影響另一個。
我們在src/test/resources
的application.properties
中設置axon.axonserver.enabled=false
,而不是在集成測試期間運行 Axon Server。這樣,我們將使用非分佈式網關,它運行速度更快並且不需要 Axon 服務器。網關是處理三種不同類型消息的實例。
我們可以創建一些輔助方法來使我們的測試更具可讀性。這些輔助函數提供正確的類型並在需要時設置 HTTP 標頭。例如:
private void verifyVoidPost(WebClient client, String uri) {
StepVerifier.create(retrieveResponse(client.post()
.uri(uri)))
.verifyComplete();
}
這對於調用具有 void 返回類型的 post 端點很有用。它將使用*retrieveResponse()*輔助函數來執行調用並驗證它是否完成。這些東西經常使用,需要幾行代碼。我們通過將測試放在輔助函數中來使測試更具可讀性和可維護性。
3.1.測試點對點查詢
為了測試/all-orders
REST 端點,讓我們創建一個訂單,然後驗證我們是否可以檢索創建的訂單。為了能夠做到這一點,我們首先需要創建一個WebClient
。 Web 客戶端是一個反應式實例,我們可以使用它來進行 HTTP 調用。調用創建訂單後,我們獲取所有訂單並驗證結果:
WebClient client = WebClient.builder()
.clientConnector(httpConnector())
.build();
createRandomNewOrder(client);
StepVerifier.create(retrieveListResponse(client.get()
.uri("http://localhost:" + port + "/all-orders")))
.expectNextMatches(list -> 1 == list.size() && list.get(0)
.getOrderStatus() == OrderStatusResponse.CREATED)
.verifyComplete();
由於它是反應式的,我們可以使用reactor-test中的StepVerifier
來驗證響應。
我們希望列表中只有一個Order
,即我們剛剛創建的訂單。此外,我們希望Order
具有 CREATED 訂單狀態。
3.2.測試流式查詢
流式查詢可能會返回多個訂單。我們還想測試流是否完成。為了測試,我們將創建三個新的隨機訂單,然後測試流式查詢響應:
WebClient client = WebClient.builder()
.clientConnector(httpConnector())
.build();
for (int i = 0; i < 3; i++) {
createRandomNewOrder(client);
}
StepVerifier.create(retrieveStreamingResponse(client.get()
.uri("http://localhost:" + port + "/all-orders-streaming")))
.expectNextMatches(o -> o.getOrderStatus() == OrderStatusResponse.CREATED)
.expectNextMatches(o -> o.getOrderStatus() == OrderStatusResponse.CREATED)
.expectNextMatches(o -> o.getOrderStatus() == OrderStatusResponse.CREATED)
.verifyComplete();
通過最後的verifyComplete()
,我們確保流已關閉。我們應該注意,有可能以不完成的方式實現流式查詢。在這種情況下,確實如此,驗證它很重要。
3.3.測試分散-聚集查詢
要測試分散-聚集查詢,我們需要確保組合多個處理程序的結果。我們使用端點運送一把椅子。然後我們取回所有運送的椅子。由於LegacyQueryHandler
為椅子返回 234,因此結果應為 235 。
WebClient client = WebClient.builder()
.clientConnector(httpConnector())
.build();
verifyVoidPost(client, "http://localhost:" + port + "/ship-order");
StepVerifier.create(retrieveIntegerResponse(client.get()
.uri("http://localhost:" + port + "/total-shipped/Deluxe Chair")))
.assertNext(r -> assertEquals(235, r))
.verifyComplete();
retrieveIntegerResponse()
輔助函數從響應主體返回一個整數。
3.4.測試訂閱查詢
只要我們不關閉連接,訂閱查詢就會保持活動狀態。我們想測試初始結果和更新。因此,我們使用ScheduledExecutorService
以便我們可以在測試中使用多個線程。該服務允許從一個Thread
更新訂單,同時驗證另一個線程中返回的訂單。為了使其更具可讀性,我們使用不同的方法進行更新:
private void addIncrementDecrementConfirmAndShipProduct(String orderId, String productId) {
WebClient client = WebClient.builder()
.clientConnector(httpConnector())
.build();
String base = "http://localhost:" + port + "/order/" + orderId;
verifyVoidPost(client, base + "/product/" + productId);
verifyVoidPost(client, base + "/product/" + productId + "/increment");
// and some more
}
該方法創建並使用自己的 Web 客戶端實例,以不干擾用於驗證響應的實例。
實際測試將從執行程序調用它並驗證更新:
//Create two webclients, creating the id's for the test, and create an order.
ScheduledExecutorService executor = Executors.newSingleThreadScheduledExecutor();
executor.schedule(() -> addIncrementDecrementConfirmAndShipProduct(orderId, productId), 1L, TimeUnit.SECONDS);
try {
StepVerifier.create(retrieveStreamingResponse(receiverClient.get()
.uri("http://localhost:" + port + "/order-updates/" + orderId)))
.assertNext(p -> assertTrue(p.getProducts().isEmpty()))
//Some more assertions.
.assertNext(p -> assertEquals(OrderStatusResponse.SHIPPED, p.getOrderStatus()))
.thenCancel()
.verify();
} finally {
executor.shutdown();
}
我們應該注意,我們在更新前等待一秒鐘,以確保我們不會錯過第一次更新。我們使用隨機UUID
來生成productId
,它用於更新和驗證結果。每次更改都應觸發更新。
根據更新後的預期狀態,我們添加一個斷言。我們需要調用thenCancel()
來結束測試,因為訂閱將在沒有它的情況下保持打開狀態。 finally
塊用於確保我們始終關閉執行程序。
4。結論
在本文中,我們學習瞭如何為查詢添加 REST 端點。這些可用於構建 UI。
我們還學習瞭如何使用WebClient
測試這些端點。
所有這些示例和代碼片段的實現都可以在 GitHub 上找到。
有關此主題的任何其他問題,另請查看討論 AxonIQ 。