基於 Spring Boot 的時間工作流程引擎
1. 引言
在本教程中,我們將探討如何將Temporal Workflow Engine與 Spring Boot 整合程式庫(作為其 Java SDK 的一部分)一起使用。
2. 快速回顧
Temporal 是一個強大的工作流程引擎,它透過確定性執行來高度重視彈性。
我們在「入門」教程中已經介紹了 Temporal 的基本概念,所以這裡我們只重點介紹它的主要功能:
- 基於集中式服務的執行時間架構,該服務負責協調工作流程的執行,這些工作流程「駐留在」分散式工作實例上。
- 工作流程:定義完成給定業務案例的步驟順序,以及更新和/或查詢其狀態的方法。
- 活動:執行與特定步驟相對應的操作
工作流程和活動只是開發人員必須使用相應的 SDK,以受支援的語言之一編寫的程式碼。
3. Temporal 的 Spring Boot 整合概述
Spring Boot Integration 模組為使用 Spring 框架的專案提供了一些提升使用者體驗的功能,是對基礎 Java SDK 的補充:
- 利用標準 DI 功能自動註冊工作流程和活動
- 工作隊列的聲明式配置
- 自動配置的
WorkflowClient實例可作為常規 bean 在整個應用程式中使用。 - 只需更改幾個屬性,即可輕鬆地從本機開發伺服器切換到記憶體測試伺服器或生產伺服器。
實際上,使用此模組只需要在專案的 pom 檔案中添加額外的依賴項:
<dependency>
<groupId>io.temporal</groupId>
<artifactId>temporal-spring-boot-starter</artifactId>
<version>1.32.0</version>
</dependency>
該依賴項的最新版本可在 Maven Central 上找到。
4. 訂單處理範例
在本教程中,我們將使用 Temporal 的 Spring Boot 整合模組建立一個簡單的訂單處理工作流程。下圖是我們將要實現的工作流程的圖形化表示,採用的是 BPMN 表示法:
此圖顯示了我們需要協調的一系列活動和事件,以完全完成訂單處理工作流程實例。
當然,這只是一個非常簡單的模型,但它包含了現實世界中可能出現的特徵:
- 並行執行:建立發貨單和請求付款
- 外部事件處理:付款已接受/已拒絕,出貨進度事件
- 處理超時:取貨超時、送貨超時
- 流程級故障處理:處理返回事件,退款
5. 應用程式結構
訂單應用程式遵循與其他 Spring Boot 應用程式類似的熟悉結構。
最頂層是OrderApplication類,它只包含應用程式的入口點。在其下方,我們有用於不同組件的子包:
-
workflow:工作流程介面與實現 -
activities:活動介面和實施 -
services:用於執行與活動相關的任務的服務 -
domain:應用程式中使用的值對象 -
adapter.rest:控制器,用於公開一個簡單的 REST API,允許客戶端提交訂單、向即時實例發送事件以及查詢有關實例的資訊。 -
config:Spring 配置類
最後,我們還需要定義一些屬性,以便自動配置機制能夠引導引擎的環境。以下是一個範例application.yaml文件,其中定義了運行範例所需的最小屬性集:
spring:
temporal:
connection:
target: local
workers-auto-discovery:
packages:
- "com.baeldung.temporal.workflows.sboot.order"
所有與 Temporal 相關的屬性都使用spring.temporal前綴。在本例中,我們將連線目標定義為local ,它是本機運行的開發伺服器的別名,該伺服器在標準連接埠 7233 上處理請求。
workers-auto-discovery屬性需要一個套件名稱列表,該列表將遞歸掃描,以查找具有 Temporal 相關註解的類別和/或介面。
6. 工作流程介面
首先,我們將建立OrderWorkflow接口,並加入必要的@WorkflowInterface註解來表示我們的訂單業務流程。我們定義一個processOrder()方法,並為其應用@WorkflowMethod註解,以便 Temporal 知道它是工作流程的入口點。此方法接收一個OrderSpec記錄,其中包含有關所購商品的信息,以及帳單、出貨和客戶資訊:
@WorkflowInterface
public interface OrderWorkflow {
@WorkflowMethod
void processOrder(OrderSpec spec);
// ... other methods omitted
}
接下來,我們加入帶有@SignalMethod-註解的方法,這些方法會對應到工作流程期望的訊息:
@WorkflowInterface
public interface OrderWorkflow {
// ... other methods omitted
@SignalMethod
void paymentAuthorized(String transactionId, String authorizationId);
@SignalMethod
void paymentDeclined(String transactionId, String cause);
@SignalMethod
void packagePickup(Instant pickupTime);
@SignalMethod
void packageDelivered(Instant pickupTime);
@SignalMethod
void packageReturned(Instant pickupTime);
}
請注意,我們使用@SignalMethod而不是@UpdateMethod ,因為在我們的場景中,客戶端不需要等待這些呼叫被處理。這也被稱為「即發即棄」模式,與@UpdateMethod呼叫形成對比,後者會阻塞客戶端直到方法完成。
為了完善工作流程介面,我們將添加@QueryMethod方法,這將允許我們觀察實例的內部狀態:
@WorkflowInterface
public interface OrderWorkflow {
// ... other methods omitted
@QueryMethod
Order getOrder();
@QueryMethod
Shipping getShipping();
@QueryMethod
PaymentAuthorization getPayment();
@QueryMethod
RefundRequest getRefund();
}
7. 活動介面
活動介面充當一個外觀層,其方法如其名,對應於工作流程實例在其生命週期中執行的活動。這意味著,通常情況下,我們將擁有一些方法,這些方法或多或少可以直接對應到 BPMN 圖中的活動框:
@ActivityInterface
public interface OrderActivities {
void reserveOrderItems(Order order);
void cancelReservedItems(Order order);
void returnOrderItems(Order order);
void dispatchOrderItems(Order order);
PaymentAuthorization createPaymentRequest(Order order, BillingInfo billingInfo);
RefundRequest createRefundRequest(PaymentAuthorization payment);
Shipping createShipping(Order order);
Shipping updateShipping(Shipping shipping, ShippingStatus status);
}
8. 工作流程實施
接下來進入實施階段,我們可以看到業務流程有兩個需要注意的特點:
- 並行執行:預留訂單商品後,流程分為兩個分支,一個分支建立支付授權請求,另一個分支建立出貨請求。
- 阻塞互斥事件:逾時或(互斥的)訊息
讓我們來看看如何實現這些功能,首先從並行執行開始:
@Service
@WorkflowImpl(taskQueues = "ORDERS")
public class OrderWorkflowImpl implements OrderWorkflow {
// ... fields and constructor omitted
@Override
public void processOrder(OrderSpec spec) {
// ... order initialization omitted
activities.reserveOrderItems(spec.order());
// Create a payment request
Async.function(() -> payment = activities.createPaymentRequest(spec.order(), spec.billingInfo()));
// Create a shipping request
shipping = activities.createShipping(spec.order());
// ... workflow logic omitted
}
// ... other methods omitted
}
關鍵在於使用Async.function()在另一個執行緒中執行 Activity。由於我們只有兩個分支,因此我們在背景處理付款請求,同時繼續在主執行緒上執行其他 Activity。
建立發貨訂單後,我們需要等待付款。這裡我們將使用 Workflow.await(),它接受一個傳回布林值的 lambda 表達式:
Workflow.await(() -> payment != null && payment.status() != PaymentStatus.PENDING);
在這種情況下,我們必須先檢查是否已有付款及其狀態。之所以要進行null檢查,是因為payment類別的實例變數是非同步設定的,所以我們不能假定它已經存在值。
這些 lambda 函數的另一個關鍵方面是它們絕對不能有副作用。這是 Temporal 類別的一項要求,與確定性工作流程執行原則密切相關,而確定性工作流程執行原則是正確錯誤復原的基礎。
9. 訊號和查詢方法實現
與主要工作流程方法相比,這些方法非常簡單。以下是paymentAuthorized訊息的訊號處理程序實作:
@Override
public void paymentAuthorized(String transactionId, String authorizationId) {
Workflow.await(() -> payment != null);
payment = new PaymentAuthorization(
payment.info(),
PaymentStatus.APPROVED,
payment.orderId(),
transactionId,
authorizationId,
null
);
}
請注意使用了Workflow.await()來確保工作流程已經建立了初始支付請求。我們需要進行此檢查,因為客戶端甚至可能在主工作流程方法啟動之前就呼叫 `signal`、`update` 和 `query` 方法。
我們必須注意,與訊號和更新方法不同,查詢方法不能使用Workflow await() ——這會導致運行時異常!如果客戶端請求的資訊不可用,則回傳值必須能夠清楚地表明這種情況:
@Override
public PaymentAuthorization getPayment() {
return payment;
}
如果在工作流程為其賦值之前呼叫該方法,則該方法將直接傳回null 。
10. 活動實施
OrderActivitiesImpl類別實作了OrderActivities介面。真正的操作都發生在這裡,因為它的方法會與工作流程的支援服務互動。
舉個典型的例子,我們來看看reserveInventoryItems方法:
@Service
@ActivityImpl(taskQueues = "ORDERS")
public class OrderActivitiesImpl implements OrderActivities {
// ... fields and constructors omitted
@Override
public void reserveOrderItems(Order order) {
for (OrderItem item : order.items()) {
inventoryService.reserveInventory(item.sku(), item.quantity());
}
}
// ... other activities omitted
}
活動會遍歷訂單項,並請求inventoryService從可用庫存中預留這些訂單項。
11. REST API
此 API 的主要目的是保護客戶端免受任何與時間相關的資訊影響。雖然並非絕對必要,但這種機制的普遍做法是避免過度依賴特定供應商。
如有需要,此 API 可以作為一個完全獨立的模組的一部分。為簡單起見,這裡我們將其添加到同一個專案中。以下是用於建立新工作流程實例的 API 的 @PostMapping 方法:
@RestController
@RequestMapping("/order")
public class OrderApi {
// ...fields and constructor omitted
@PostMapping
public ResponseEntity<OrderCreationResponse> createOrder(@RequestBody OrderSpec orderSpec) {
var execution = orderService.createOrderWorkflow(orderSpec);
var location = UriComponentsBuilder.fromUriString("/order/{orderExecutionId}").build(execution);
return ResponseEntity.created(location).body(new OrderCreationResponse(execution));
}
// ... other endpoint methods omitted
}
OrderService是 Temporal 提供的WorkflowClient bean 的輕量級封裝層,它省略了一些樣板程式碼:
@Service
public class OrderService {
private final WorkflowClient workflowClient;
public OrderService(WorkflowClient workflowClient) {
this.workflowClient = workflowClient;
}
public OrderWorkflow getWorkflow(String orderExecutionId) {
return workflowClient.newWorkflowStub(OrderWorkflow.class, orderExecutionId);
}
public String createOrderWorkflow(OrderSpec orderSpec) {
var uuid = UUID.randomUUID();
var wf = workflowClient.newWorkflowStub(
OrderWorkflow.class,
WorkflowOptions.newBuilder()
.setTaskQueue("ORDERS")
.setWorkflowId(uuid.toString()).build());
var execution = WorkflowClient.start(wf::processOrder, orderSpec);
return execution.getWorkflowId();
}
}
12. 測試
工作流程應用程式需要進行測試,以模擬與被呼叫服務及其相關故障模式的複雜交互作用。在這種情況下,單元測試雖然仍然有助於驗證系統的特定部分,但通常會輔以整合測試。
建構這些測試的一種方法是為「正常路徑」及其相關的替代路徑建立場景。重點在於覆寫給定工作流程實例可能採取的所有路徑,這在實踐中意味著我們應該力求實現主工作流程方法 100% 的程式碼覆蓋率。
本範例展示了「正常路徑」測試案例的實作。需要注意的幾點:
- 該測試使用記憶體中的 Temporal 測試伺服器。為了啟用它,我們將
spring.temporal.test-server.enabled屬性設為true - 所有測試均由對 REST API 的呼叫驅動,這些呼叫由
@SpringBootTest註解建立。 - 為了模擬支付網關的回應,我們需要工作流程內部的支付請求資料。請注意,我們使用循環查詢此信息,因為此資訊可能在工作流程邏輯建立之前不可用。
使用嵌入式測試伺服器時,一個非常實用的功能是能夠跳過特定時間段。這對於測試那些需要花費數小時甚至數天才能完成的活動的工作流程至關重要。
要使用此功能,我們需要使用TestWorkflowEnvironment服務中提供的sleep()方法,該方法可注入到 Spring 測試中。以下是如何使用此方法模擬訂單工作流程中的取貨逾時:
@Test
public void whenPickupTimeout_thenItemsReturnToStock() {
// ... order creation steps (omitted)
// Fast-forward 1 day to force a the delivery timeout
testEnv.sleep(Duration.ofDays(1));
// Wait until the workflow completes
testEnv.getWorkflowClient().newUntypedWorkflowStub(orderExecutionId).getResult(Void.class);
// ... Check for order cancelled and itens returned to stock (omitted)
}
當測試程式碼呼叫sleep(),引擎會將其內部時鐘快轉指定的量,因此任何待處理的 await 呼叫都會相應地完成。
13. 結論
本文介紹如何將 Temporal 工作流引擎與 Spring Boot 結合使用。我們以訂單履行為例,展示如何設定項目以及如何使用自動配置支援來簡化設定。
我們也詳細介紹了實現真實世界工作流程所需的技術,包括並行執行、訊息傳遞和逾時。
和往常一樣,所有程式碼都可以在 GitHub 上找到。