Java 中時間工作流引擎入門
1. 簡介
許多應用程式需要編排引擎來實現業務流程。尤其值得一提的是,Java 領域不乏經過實務檢驗的引擎,包括開源引擎和專有引擎。
在本教程中,我們將介紹Temporal ,這是一個於 2020 年首次發布的開源工作流程引擎,其靈感來自其作者在 Uber、Azure 和 Amazon 的早期作品。
2.什麼是時間?
Temporal 支援使用 Java、Go、TypeScript 等支援的語言編寫工作流程。這些工作流程負責協調活動,這些活動可以是任何工作單元,從發送電子郵件到提交付款審批請求,或任何其他類似的業務流程。
從架構角度來看, Temporal 遵循了Saga 微服務模式中著名的orchestrator
變體。此模式的主要特徵是存在一個集中式協調器服務,該服務追蹤工作流程實例的進度,並將命令分派給處理各個活動執行的工作人員。
如果一個工作進程崩潰或某個活動失敗,時間協調器會透明地重試並從停止的地方恢復執行。這在分散式系統中尤其有用,因為故障幾乎是不可避免的。
在部署方面,Temporal 提供了一些選項:
- 獨立伺服器,用於本地開發
- 在 Docker 相容引擎、Kubernetes 或裸機上進行本機部署
- 基於 SaaS,具有託管的 Temporal 實例,提供支援、SLA 等
此處的演示使用了獨立伺服器。線上文件提供了有關其他選項的更多詳細資訊。
3.核心概念
在深入研究具體程式碼之前,讓我們先簡單介紹一下 Temporal 背後的主要元件和想法。
3.1. 工作流程
工作流程定義了執行給定業務用例所需的活動序列。
工作流程實例也可以啟動其他子工作流程。這些子實例可以相對於其父工作流程同步或非同步運行,並且預設情況下,如果父工作流程終止,子工作流程也會終止。或者,父工作流程可以變更此行為,以便子工作流程繼續獨立執行。
工作流程是確定性的,可能需要幾天、幾週甚至幾年才能完成。
讓我們來看一個確定性工作流程要求的例子。確定性意味著,對於給定的輸入,工作流程程式碼必須始終遵循相同的執行路徑。
例如,假設我們有一個簡單的工作流程邏輯:
public selectDeliveryMethod(Order order) {
int weight = deliveryActivities.calculateWeight(order);
if (weight > 100 ) {
deliveryActivities.sendByTrain(order);
}
else {
deliveryActivities.sendByAirplane(order);
}
}
這段程式碼是確定性的,因為只要第一次呼叫的結果相同,它總是執行一條路徑。如果我們將決策邏輯改為使用當前時間,就會失去確定性,因為結果可能會根據執行時間而改變。
這種確定性要求背後的原因是,Temporal 會保存所有活動呼叫的日誌,以便在工作進程發生故障時重播工作流程執行。有關此主題的完整討論,官方文件是一個很好的起點。
3.2. 活動
活動在工作流程中承擔著繁重的任務,執行與給定工作流程步驟相關的必要操作。例如,預訂工作流程可以包含一個活動,用於向航空公司發送訊息以確保座位,另一個活動用於向客戶發送電子郵件,等等。
活動應遵循單一職責規則,並且通常持續時間較短。如果活動失敗,只需拋出異常,並讓 Temporal 處理即可。
預設情況下,由於異常而終止的活動將根據重試策略自動重新提交。 Temporal 還允許開發人員透過拋出ApplicationFailure
異常來指示不應重試給定的活動。
活動的一個重要要求是它們必須是冪等的。根據設計,Temporal 的彈性機制遵循at least once principle
。實際上,這意味著從故障恢復時,給定的活動可能會執行多次。
例如,某個活動可能已成功完成,但網路問題可能導致工作器無法傳回結果。伺服器會逾時並重試執行。在這種情況下,活動程式碼需要偵測這是重複請求,並傳回與第一次呼叫相同的結果。
3.3. 工人
工作器是執行工作流程和活動程式碼的獨立應用程式。它們輪詢 Temporal 的任務並報告結果。
通常,生產環境具有給定工作應用程式的多個實例,以提供吞吐量和彈性。
3.4. 任務隊列
由 Temporal 伺服器管理的邏輯佇列將工作流程和活動任務路由給工作執行緒。由於工作執行緒會輪詢這些佇列以取得任務,因此它們也是在可用工作執行緒之間分配負載的主要方法。
3.5. 臨時伺服器
管理工作流程狀態、持久性和協調的後端服務。該伺服器還提供 Web UI 來監視工作流程、工作器等的執行情況。
4. 設定本地開發的時間
本機開發伺服器嵌入在 Temporal 的 CLI 工具temporal
中,可供下載。該軟體包包含一個可執行文件,我們可以將其保存到任何目錄。
要啟動開發伺服器,我們使用temporal
指令的 server start-dev
指令:
$ temporal server start-dev
CLI 1.4.1 (Server 1.28.0, UI 2.39.0)
Server: localhost:7233
UI: http://localhost:8233
Metrics: http://localhost:34031/metrics
啟動後,我們可以檢查其Web UI
管理介面:
如預期,目前還沒有工作流程實例。值得注意的是,測試伺服器沒有持久化功能,因此停止它可能會導致資料遺失。
5.建立基本工作流程
讓我們建立一個簡單的Hello World
工作流程,它由一個Hello
活動組成。
5.1. 項目設置
首先,我們將所需的 SDK 相依性新增至新的 Java 專案:
<dependency>
<groupId>io.temporal</groupId>
<artifactId>temporal-sdk</artifactId>
<version>1.31.0</version>
</dependency>
<dependency>
<groupId>io.temporal</groupId>
<artifactId>temporal-testing</artifactId>
<version>1.31.0</version>
<scope>test</scope>
</dependency>
這些相依性的最新版本可在 Maven Central 上找到:
接下來我們繼續進行流程啟動。
5.2. 定義工作流程介面
工作流程介面定義了工作流程的入口點。它是一個使用@WorkflowInterface
註解的常規 Java 接口,且只包含一個使用WorkflowMethod
註解的方法。
為此,讓我們看看HelloWorkflow
介面:
@WorkflowInterface
public interface HelloWorkflow {
@WorkflowMethod
String hello(String person);
}
重要的是,Temporal 使用不帶包名的介面名稱作為workflow type
。我們可以透過使用@WorkflowMethod
註解的name
可選屬性來變更此預設行為,以使用其他名稱。
除了入口點方法之外,工作流程介面還可以有其他方法與其互動:
-
@SignalMethod
:向工作流程發送非同步訊息 -
@QueryMethod
:傳回正在運行的工作流程的狀態資訊 -
@UpdateMethod
:變更正在執行的工作流程的狀態 -
@UpdateValidatorMethod
:在將資料傳送到相關處理程序之前,先驗證傳送至@UpdateMethod
數據
對於本指南,我們不涉及這些註釋。
5.3. 定義和實作活動接口
對於給定的工作流程,我們還需要定義一個或多個接口,這些接口定義了與其活動相對應的方法。在本例中,我們的工作流程只包含一個活動,因此我們有一個相當簡單的介面:
@ActivityInterface
public interface SayHelloActivity {
@ActivityMethod
String sayHello(String person);
}
活動介面必須帶有@ActivityInterface
註解。此外,所有活動方法也應該使用@ActivityMethod
註解。預設情況下,方法名稱將成為活動名稱,並顯示在 UI 中。如果需要,我們可以使用註解name
屬性來定義其他名稱。
我們現在可以編寫介面實作:
public class SayHelloActivityImpl implements SayHelloActivity {
public String sayHello(String person) {
return "Hello, " + person;
}
}
值得注意的是,到目前為止,無需任何時間概念的知識即可編寫程式碼。這使得活動程式碼保持整潔,並使單元測試和整合測試更容易實現。
5.4. 實作HelloWorkflow
介面
工作流程實作類別包含協調活動執行以完成給定目標的程式碼。
對於這個例子,我們所要做的就是呼叫它的單一活動:
public class HelloWorkflowImpl implements HelloWorkflow {
private final SayHelloActivity activity = Workflow.newActivityStub(
SayHelloActivity.class,
ActivityOptions.newBuilder()
.setStartToCloseTimeout(Duration.ofSeconds(10))
.build()
);
@Override
public String hello(String person) {
return activity.sayHello(person);
}
}
在初始化過程中,我們使用 Temporal 存根工廠方法來建立SayHelloActivity
的實例。在這裡,我們為每次執行定義了一個截止時間-在本例中為10
秒。
儘管看起來很簡單,但在呼叫sayHello
方法時會發生很多事情:
- 伺服器在工作流程歷史記錄中建立與活動呼叫相對應的條目。
- 新的活動訊息被放入執行隊列。
- 工作人員獲取訊息,執行活動方法,並將結果傳回伺服器。
- 伺服器建立一個條目來記錄活動的成功執行。
- 伺服器將結果傳回給工作流程代碼。
將這些活動隱藏在幕後是 Temporal 提供的便利的一部分。
5.5. 設定工作器
要運行該範例,我們還需要一個Worker
,它承載工作流程和活動的實作:
public class HelloWorkflowApplication {
private static final String QUEUE_NAME = "say-hello-queue";
public static void main(String[] args) {
var service = WorkflowServiceStubs.newLocalServiceStubs();
var client = WorkflowClient.newInstance(service);
var factory = WorkerFactory.newInstance(client);
var worker = factory.newWorker(QUEUE_NAME);
HelloWorkflowRegistrar.newInstance().register(worker);
factory.start();
}
}
這個相當簡約的工作者有一些樣板程式碼,用於初始化時間引擎並建立輪詢工作流程和活動執行請求的本地工作者。
此外,我們使用HelloWorkflowRegistrar
(可在線獲取)助手來告知 Temporal 我們處理哪些工作流程類型和活動。使用這樣的助手有助於將與給定工作流程相關的類別很好地打包在一起,從而提高可維護性。
最後,我們透過呼叫WorkerFactory.start()
來啟動主事件循環。此時,worker 開始輪詢 Temporal 並執行工作流程。
不過,有幾點值得一提:
- 此程式碼假定 Temporal 可在
localhost
預設連接埠上存取。 - QUEUE_NAME 常數定義從中消費訊息的任務佇列。
在現實場景中,我們應該讓使用者使用外部化機制(命令列參數、環境變數等)自訂時間端點和佇列名稱。
此外,雖然將兩種實作放在同一個可部署構件中很方便,但這並非嚴格要求。例如,我們可以讓某些工作執行緒只處理工作流程邏輯,而其他工作執行緒處理活動。事實上,這些工作執行緒甚至不需要用同一種語言編寫。
6. 啟動工作流程
現在我們已經掌握了時間難題的所有部分,我們應該能夠運行工作流程了。
6.1. 啟動 Worker
首先,讓我們啟動 Worker。如果使用 Eclipse 或 IntelliJ 之類的 IDE,只需執行HelloWorkflowApplication
的main()
方法即可。我們也可以使用exec
插件直接從 Maven 運行它:
mvn compile exec:java -Dexec.mainClass=com.baeldung.temporal.workflows.hello.HelloWorkflowApplication
無論選擇哪種方法,我們都應該在控制台上看到表明工作輪詢器已啟動的訊息:
INFO cbtwhHelloWorkflowApplication - Registering workflows and activities...
INFO cbtwhHelloWorkflowApplication - Starting worker...
INFO itiworker.MultiThreadedPoller - start: MultiThreadedPoller{name=Workflow Poller taskQueue="say-hello-queue", namespace="default", identity=992243@sacha}
INFO itiworker.MultiThreadedPoller - start: MultiThreadedPoller{name=Activity Poller taskQueue="say-hello-queue", namespace="default", identity=992243@sacha}
INFO itsWorkflowServiceStubsImpl - Created WorkflowServiceStubs for channel: ManagedChannelOrphanWrapper{delegate=ManagedChannelImpl{logId=1, target=127.0.0.1:7233}}
此時,我們應該能夠使用工作者進行工作流程。
6.2. 使用 CLI 啟動工作流程
啟動工作流程最簡單的方法可能是使用時間CLI 工具:
$ temporal workflow start --task-queue say-hello-queue --type HelloWorkflow -i \"Baeldung\"
輸出應該具有通用格式:
Running execution:
WorkflowId <workflowId>
RunId <runId>
Type HelloWorkflow
Namespace default
TaskQueue say-hello-queue
現在,讓我們使用產生的WorkflowId
來查詢其結果:
$ temporal workflow result -w <workflowId>
Results:
Status COMPLETED
Result "Hello, Baeldung"
ResultEncoding json/plain
因此,輸出顯示了工作流程結果。在本例中,這只是以Hello
和逗號開頭的輸入值。
6.3. Web UI 結果
在 Web UI 中,我們可以看到此工作流程實例有一個條目:
我們可以點擊獲取有關此執行的更多詳細資訊:
UI 和 CLI 都提供了附加功能,使我們能夠在運行或停止時追蹤工作流程。
6.4. 使用 API 啟動工作流程
在實際應用中,我們通常會使用 Temporal API 啟動工作流程。 WorkflowClient 是主要的 API 入口點,使我們能夠WorkflowClient
正在執行的 Temporal 服務進行互動。
有三種方法可以建立WorkflowClient
:
-
TestWorkflowEnvironment.getWorkflowClient()
:連接到適合單元和整合測試的進程內 Temporal 服務 -
WorkflowServiceStubs.newLocalServiceStubs()
:取得預先配置的服務存根以連接到本地運行的 Temporal 開發伺服器,並將其與WorkflowClient.newInstance()
一起使用 -
WorkflowServiceStubs.newServiceStubs()
:連接到臨時伺服器(本地或基於雲端)
一旦我們有了WorkflowClient
,我們就可以使用newWorflowStub()
來建立工作流程介面的實例。從這裡開始,啟動實例最簡單的方法是呼叫@WorkflowMethod
註解的方法。
因此,讓我們來看看一個整合測試範例,它使用第二個選項來建立客戶端,然後啟動Hello
工作流程:
@Test
void givenPerson_whenSayHello_thenSuccess() {
var service = WorkflowServiceStubs.newLocalServiceStubs();
var client = WorkflowClient.newInstance(service);
var wfid = UUID.randomUUID().toString();
var workflow = client.newWorkflowStub(
HelloWorkflow.class,
WorkflowOptions.newBuilder()
.setTaskQueue(QUEUE_NAME)
.setWorkflowId(wfid)
.build()
);
// Run the workflow synchronously
var result = workflow.hello("Baeldung");
assertEquals("Hello, Baeldung", result);
}
此呼叫會阻塞直到工作流程完成,這通常不是我們想要的。
事實上,更常見的情況是啟動工作流程,使其相對於呼叫者非同步運行:
var execution = WorkflowClient.start(workflow::hello,"Baeldung");
var workflowStub = client.newUntypedWorkflowStub(execution.getWorkflowId());
WokflowClient.start()
方法接受工作流程入口點方法以及一個或多個參數。它傳回一個WorkflowExecution
,其中包含與此實例關聯的workflowId
和runId
。
6.6. 透過 API 進行交互
我們在啟動工作流程後立即創建的WorkflowStub
使我們能夠與其進行互動。
例如,我們可以使用getResult()
或getAsyncResult()
來取得工作流程結果。我們使用前者,它回傳一個CompletableFuture
:
var future = workflowStub.getResultAsync(String.class);
var result = future.get();
assertEquals("Hello, Baeldung", result);
此存根具有以下功能:
- 取消或終止正在運行的實例
- 查詢內部狀態
- 發送訊號
- 檢索迄今為止運行的所有步驟的信息
這些在常見的控制場景中很方便。
7. 結論
在本文中,我們介紹了 Temporal 工作流程引擎的基礎知識,重點介紹了初始專案設定並介紹了其核心概念。
總而言之,儘管該引擎提供了許多選項和可能性,但在實踐中使用它卻相當簡單。
與往常一樣,所有程式碼均可在 GitHub 上取得。