Dapr 工作流程與 PubSub
1. 引言
我們先前的文章探討如何使用 Spring Boot 和 Dapr 為叫車應用程式建立一個鬆散耦合的 PubSub 訊息系統。雖然這種方法適用於簡單的訊息路由,但實際應用場景通常需要更複雜的編排。
在本教程中,我們將透過引入Dapr 工作流程來擴展我們的場景。我們將學習如何編排複雜的、長時間運行的流程,這些流程能夠回應 REST 端點和 PubSub 事件,同時保持持久性。我們還將示範如何使用 Testcontainers 測試這些工作流程,以確保我們的系統在沒有外部基礎架構的情況下也能如預期運作。
2. 理解持久執行
設想一下司機接受訂單後會發生什麼:我們需要驗證司機的身份資訊、計算車費、通知乘客、更新行程狀態,並且需要處理每個步驟中可能出現的故障。這些多步驟流程需要協調和狀態管理,而傳統的發布/訂閱機制無法單獨處理這些。如果任何步驟失敗,我們必須重試、進行補償或回滾之前的操作。
解決這一問題的傳統方法通常包括:
- 資料庫中的手動狀態管理
- 複雜的重試邏輯分散在各個服務中
- 針對每項操作進行自訂錯誤處理
- 難以追蹤工作流程進度
- 服務在進程中途重啟時會遺失上下文訊息
2.1. 利用 Dapr 工作流程解決問題
Dapr Workflows 透過持久執行來解決這些挑戰,而持久執行是多種解決方案的結合:
- 自動狀態持久化:工作流程引擎會在每個步驟結束後持久化狀態。如果流程崩潰或重啟,它會自動從中斷處恢復,不會遺失進度。由於運行時會處理這些複雜性,因此整個過程是透明的。
- 工作流程重播:當工作流程在故障後復原時,Dapr 會從頭開始重播工作流程程式碼。但是,它不會重新執行已完成的活動,而是使用已持久化的結果。這意味著我們的工作流程編排邏輯會運行多次,因此它必須是確定性的。
- 內建彈性:我們無需為每個操作實作自訂重試邏輯和錯誤處理,只需定義一次重試策略。 Dapr會自動套用這些策略、管理逾時並處理瞬態故障。
- 簡化流程編排:複雜的流程可以簡化為簡潔的程式碼。我們可以使用
await()呼叫來編寫順序邏輯,使用條件分支,並循環執行各個步驟。
透過與 Dapr 的其他建置模組(例如發布/訂閱、狀態管理和服務呼叫)集成,工作流程可以協調分散式操作,而無需依賴特定的基礎設施。這意味著我們的業務邏輯可以在不同的環境和雲端提供者之間保持可移植性。
3. 工作流程即代碼
Dapr Workflows 讓我們可以使用熟悉的程式結構來定義編排邏輯,而不是使用 XML 或 JSON 定義。 Spring Boot 整合透過將工作流程和活動視為 Spring Bean 進行管理,使此流程變得無縫銜接。
3.1 Workflow抽象
工作流程代表我們需要執行的一系列步驟。在 Dapr 中,我們實作了Workflow接口,並使用create()方法定義我們的邏輯。工作流程context提供了用於呼叫活動、等待外部事件和管理工作流程狀態的方法。我們還沒有建立活動,所以讓我們從任務選項開始:
@Component
public class RideProcessingWorkflow implements Workflow {
@Override
public WorkflowStub create() {
return context -> {
WorkflowTaskOptions options = taskOptions();
// ...
};
}
}
同樣,工作流程程式碼必須是確定性的,因為它會在每次工作流程恢復時重播。這意味著我們不應該直接執行 I/O 操作、呼叫外部 API 或與資料庫或其他基礎設施互動。相反,與外部系統的互動必須封裝在活動中。這樣可以確保工作流程重播時,能夠一致地利用先前活動執行中持久化的結果做出相同的決策。
3.2 WorkflowActivity抽象
活動封裝了工作流程中的各個工作單元,代表與外部系統、資料庫或服務互動的任務。我們的第一個活動是透過從上下文中取得RideWorkflowRequest來簡單地驗證司機 ID:
@Component
public class ValidateDriverActivity implements WorkflowActivity {
@Autowired
private RestTemplate restTemplate;
@Override
public Object run(WorkflowActivityContext context) {
RideWorkflowRequest request = ctx.getInput(RideWorkflowRequest.class);
logger.info("Validating driver: {}", request.getDriverId());
if (request.getDriverId() != null && !request.getDriverId().isEmpty()) {
logger.info("Driver {} validated successfully", request.getDriverId());
return true;
}
throw new IllegalArgumentException("Invalid driver ID");
}
}
由於 Activity 是元件,我們可以利用依賴注入來存取其他 bean,例如儲存庫或任何我們需要的服務。 Activity可能會失敗,工作流程執行時會處理它們的執行和狀態管理。如果驗證失敗,我們會拋出異常。
3.3 啟用工作流程自動發現
@EnableDaprWorkflows註解透過自動發現類別路徑上的所有工作流程和活動來簡化工作流程註冊。讓我們在 Spring Boot 應用程式類別中使用它:
@EnableDaprWorkflows
@SpringBootApplication
public class DaprWorkflowApp {
public static void main(String[] args) {
SpringApplication.run(DaprWorkflowApp.class, args);
}
}
有了這個註解,Spring Boot 會自動執行以下操作:
- 掃描實作
Workflow和WorkflowActivity類 - 將它們登記為春豆
- 使它們可供 Dapr 工作流程運行時使用
- 在整個工作流程基礎架構中啟用依賴注入
這樣就省去了樣板配置,遵循了 Spring Boot 的約定優於配置的概念。
4. 擴展叫車發布/訂閱範例
讓我們改進我們的叫車應用程序,使其包含一個協調整個乘車生命週期的工作流程。
4.1. 情景
當司機接受乘車請求時,我們希望:
- 驗證駕駛員的資質
- 計算並處理預估票價
- 通知乘客司機正在趕往目的地。
- 將行程狀態更新為“進行中”
當從 PubSub 系統收到「驅動程式已接受」事件時,此工作流程啟動。每個步驟都需要妥善的錯誤處理,整個過程必須具有持久性。如果我們的服務重啟,工作流程應無縫恢復。
4.2. 定義領域模型
我們將把RideRequest模型封裝在RideWorkflowRequest物件中,以包含特定於工作流程的資訊:
public class RideWorkflowRequest {
private RideRequest rideRequest;
private String rideId;
private String driverId;
private String workflowInstanceId;
// constructors, getters, and setters
}
最後,我們將建立一個簡單的模型來定義工作流程完成後的進度:
public record RideWorkflowStatus(String rideId, String status, String message) {}
4.3. 透過 REST 啟動工作流程
首先,我們建立一個 REST 端點來啟動工作流程。我們的控制器只需要DaprWorkflowClient,而我們的端點會接收我們需要的上下文輸入資料:
@RestController
@RequestMapping("/workflow")
public class RideWorkflowController {
@Autowired
DaprWorkflowClient workflowClient;
@PostMapping("/start-ride")
public RideWorkflowRequest startRideWorkflow(
@RequestBody RideWorkflowRequest request) {
// ...
return request;
}
}
DaprWorkflowClient允許我們調度新的工作流程實例並查詢其狀態。每個工作流程都會獲得一個唯一的實例 ID,我們可以使用該 ID 來追蹤其進度。讓我們透過呼叫scheduleNewWorkflow()並傳入我們的工作流程類別和上下文資料來啟動工作流程:
String instanceId = workflowClient.scheduleNewWorkflow(
RideProcessingWorkflow.class, request);
request.setWorkflowInstanceId(instanceId);
4.4. 發起工作流程事件
只要有了工作流程實例 ID,我們就可以從任何地方觸發事件。讓我們建立一個乘客使用的端點,該端點將觸發確認訊息以及一個String有效負載:
@PostMapping("/confirm/{instanceId}")
public void confirmRide(
@PathVariable("instanceId") String instanceId, @RequestBody String confirmation) {
workflowClient.raiseEvent(instanceId, "passenger-confirmation", confirmation);
}
通常使用WorkflowContext中的waitForExternalEvent()來引發事件,以進行流程編排。
4.5. 從發布/訂閱事件觸發工作流程
創建監聽乘車接受事件的訂閱者看起來非常相似。唯一的區別在於我們使用了@Topic註解,並將輸入包裝在CloudEvent中:
@PostMapping("/driver-accepted")
@Topic(pubsubName = "ride-hailing", name = "driver-acceptance")
public void onDriverAcceptance(@RequestBody CloudEvent<RideWorkflowRequest> event) {
RideWorkflowRequest request = event.getData();
workflowClient.scheduleNewWorkflow(RideProcessingWorkflow.class, request);
}
5.實施更多工作流程活動
讓我們繼續為工作流程中的每個步驟建立活動。
5.1 計算票價
接下來,我們將建立一個活動來計算車費:
@Component
public class CalculateFareActivity implements WorkflowActivity {
@Override
public Object run(WorkflowActivityContext context) {
double baseFare = 5.0;
double perMileFare = 2.5;
double estimatedMiles = 10.0;
return baseFare + (perMileFare * estimatedMiles);
}
}
5.2 通知乘客
最後,我們來創建一個活動來通知乘客。我們將使用一個記錄來儲存所有需要的資料:
public record NotificationInput(RideWorkflowRequest request, double fare) {}
在實際應用中,我們會透過電子郵件、推播或其他方式發送通知。為了專注於框架本身,我們僅記錄訊息內容:
@Component
public class NotifyPassengerActivity implements WorkflowActivity {
@Override
public Object run(WorkflowActivityContext context) {
NotificationInput input = context.getInput(NotificationInput.class);
String message = String.format(
"Driver %s is on the way to %s. Estimated fare: $%.2f",
input.request().getDriverId(),
input.request().getRideRequest().getLocation(),
input.fare());
context.getLogger().info("Notification sent: {}", message);
return message;
}
}
6. 協調工作流程
現在我們已經擁有完成工作流程所需的一切,讓我們回到RideProcessingWorkflow類別來實現活動的編排。
6.1 驗證驅動程式
我們將記錄目前步驟並檢查ValidateDriverActivity傳回的boolean :
context.getLogger().info("Step 1: Validating driver {}", request.getDriverId());
boolean isValid = context.callActivity(
ValidateDriverActivity.class.getName(), request, options, boolean.class)
.await();
如果該活動傳回 false,我們將以「失敗」狀態結束工作流程:
if (!isValid) {
context.complete(new RideWorkflowStatus(
request.getRideId(), "FAILED", "Driver validation failed"));
return;
}
6.2 計算票價並通知乘客
首先,我們透過呼叫CalculateFareActivity來取得票價值:
context.getLogger().info("Step 2: Calculating fare");
double fare = context.callActivity(
CalculateFareActivity.class.getName(), request, options, double.class)
.await();
然後使用它來建立我們的NotificationInput並呼叫其 Activity:
context.getLogger().info("Step 3: Notifying passenger");
NotificationInput notificationInput = new NotificationInput(request, fare);
String notification = context.callActivity(
NotifyPassengerActivity.class.getName(), notificationInput, options, String.class)
.await();
6.3 等待乘客確認
為了阻塞直到事件觸發,我們對上下文呼叫waitForExternalEvent()函數,並可選擇性地指定一個逾時值。如果我們想要存取事件的有效負載,還需要指定其類型:
context.getLogger().info("Step 4: Waiting for passenger confirmation");
String confirmation = context.waitForExternalEvent(
"passenger-confirmation", Duration.ofMinutes(5), String.class)
.await();
我們期望此範例中的確認有效載荷會精確地標記為「已確認」。否則,我們將以「已取消」狀態結束:
if (!"confirmed".equalsIgnoreCase(confirmation)) {
context.complete(new RideWorkflowStatus(
request.getRideId(),
"CANCELLED",
"Passenger did not confirm the ride within the timeout period"));
return;
}
6.4 完成工作流程
如果一切順利,我們最終會將工作流程標記為「已完成」:
String message = String.format(
"Ride confirmed and processed successfully. Fare: $%.2f. %s", fare, notification);
RideWorkflowStatus status = new RideWorkflowStatus(
request.getRideId(), "COMPLETED", message);
context.getLogger().info("Workflow completed: {}", message);
context.complete(status);
7. 使用 Testcontainers 測試工作流程
測試工作流程對於確保我們的編排邏輯正確運作至關重要。 dapr-spring-boot-starter-test 模組與 Testcontainers 無縫集成,使我們能夠在無需外部 CLI 工具的情況下測試整個系統。
7.1. 測試我們的快樂路徑
那麼,讓我們從第一次呼叫 REST 服務「start-ride」開始,測試整個工作流程:
@Test
void whenWorkflowStartedViaRest_thenAllActivitiesExecute() {
RideRequest rideRequest = new RideRequest(
"passenger-1", "Downtown", "Airport");
RideWorkflowRequest workflowRequest = new RideWorkflowRequest(
"ride-123", rideRequest, "driver-456", null);
RideWorkflowRequest response = given().contentType(ContentType.JSON)
.body(workflowRequest)
.when()
.post("/workflow/start-ride")
.then()
.statusCode(200)
.extract()
.as(RideWorkflowRequest.class);
String instanceId = response.getWorkflowInstanceId();
assertNotNull(instanceId);
// ...
}
收到回應後,我們儲存工作流程實例 ID,並透過 awaitility 檢查workflowClient.getInstanceState()來等待工作流程開始執行:
await().atMost(Duration.ofSeconds(10))
.pollInterval(Duration.ofMillis(200))
.until(() -> {
WorkflowInstanceStatus status = workflowClient.getInstanceState(instanceId, false);
return status != null && status.getRuntimeStatus() == WorkflowRuntimeStatus.RUNNING;
});
然後,我們觸發乘客確認事件,以便工作流程可以繼續進行:
given().contentType(ContentType.TEXT)
.body("confirmed")
.when()
.post("/workflow/confirm/" + instanceId)
.then()
.statusCode(200);
最後,我們透過重新檢查實例狀態來等待工作流程完成:
await().atMost(Duration.ofSeconds(15))
.pollInterval(Duration.ofMillis(500))
.until(() -> {
WorkflowInstanceStatus status = workflowClient.getInstanceState(instanceId, false);
return status != null && status.getRuntimeStatus() == WorkflowRuntimeStatus.COMPLETED;
});
最後,我們透過檢查實例狀態來驗證工作流程是否成功完成:
WorkflowInstanceStatus finalStatus = workflowClient.getInstanceState(instanceId, true);
assertEquals(WorkflowRuntimeStatus.COMPLETED, finalStatus.getRuntimeStatus());
7.2 建構我們的重試策略
讓我們看看如何利用 Dapr 的持久執行模型以及內建的逾時、重試和補償支援。我們將透過實作我們在RideProcessingWorkflow中定義的taskOptions()方法來定義工作流程中每個步驟所使用的重試策略。我們將透過實例化一個WorkflowTaskRetryPolicy並將其作為可重複使用的WorkflowTaskOptions返回來定義最大重試次數、退避逾時、重試間隔和最大逾時:
private WorkflowTaskOptions taskOptions() {
int maxRetries = 3;
Duration backoffTimeout = Duration.ofSeconds(1);
double backoffCoefficient = 1.5;
Duration maxRetryInterval = Duration.ofSeconds(5);
Duration maxTimeout = Duration.ofSeconds(10);
WorkflowTaskRetryPolicy retryPolicy = new WorkflowTaskRetryPolicy(
maxRetries, backoffTimeout, backoffCoefficient, maxRetryInterval, maxTimeout);
return new WorkflowTaskOptions(retryPolicy);
}
重試策略確定後,我們就可以看看我們的工作流程如何處理出現問題的情況了。
7.3. 測試一條不愉快的路徑
讓我們驗證一下,當某個活動持續失敗時,重試策略是否能正常運作:
@Test
void whenActivityFails_thenRetryPolicyApplies() {
RideWorkflowRequest invalidRequest = new RideWorkflowRequest(
"ride-789", new RideRequest("passenger-3", "Park", "Beach"), "", null);
String instanceId = workflowClient.scheduleNewWorkflow(
RideProcessingWorkflow.class, invalidRequest);
await().atMost(Duration.ofSeconds(20))
.pollInterval(Duration.ofMillis(500))
.until(() -> {
WorkflowInstanceStatus status = workflowClient.getInstanceState(instanceId, false);
return status != null && status.getRuntimeStatus() == WorkflowRuntimeStatus.FAILED;
});
WorkflowInstanceStatus finalStatus = workflowClient.getInstanceState(instanceId, true);
assertEquals(WorkflowRuntimeStatus.FAILED, finalStatus.getRuntimeStatus());
}
在此測試中,我們提供了一個空的司機 ID,這會導致ValidateDriverActivity拋出異常。工作流程引擎會根據我們的重試策略自動重試該活動。重試次數用盡後,工作流程狀態將變成FAILED 。
請注意,我們使用了更長的超時時間來考慮重試嘗試,這表明 Dapr 工作流程如何優雅地處理故障,同時給予瞬態問題時間來解決。
8. 在 Diagrid CLI 上使用 Catalyst 運行
本地開發完成後,自行建置和維護基礎設施會帶來巨大的營運複雜性。 Diagrid Catalyst透過提供完全託管的 Dapr 環境,消除了這項負擔。
8.1. 設定和運行我們的應用程式
註冊免費帳戶後,我們就可以安裝Diagrid CLI並使用以下命令運行我們的應用程式:
diagrid dev run \
--project spring-boot \
--app-id dapr-workflows \
--app-port 60603 \
-- mvn spring-boot:run
這條命令:
- 建立新專案/應用
- 將我們的本機應用程式連接到 Catalyst 的託管 Dapr 服務
- 利用基於雲端的工作流程編排和狀態管理以及生產級基礎設施
9. 結論
在本文中,我們擴展了 Dapr PubSub 範例,使其包含持久化工作流程編排功能。透過將 Dapr PubSub 與 Dapr Workflows 結合,我們創建了一個能夠處理事件驅動型訊息傳遞和複雜多步驟流程的系統。
和往常一樣,原始碼可以在 GitHub 上找到。