使用 ElasticJob 進行分散式作業調度
1. 引言
在本教程中,我們將了解ElasticJob ,它是Apache ShardingSphere專案的一部分。我們將了解它是什麼,如何使用它以及我們可以用它做什麼。
2. 什麼是 ElasticJob?
ElasticJob是一個分片式分散式作業排程系統。它使我們能夠專注於編寫作業本身,而ElasticJob則處理所有其他細節。
ElasticJob 也支援各種類型的作業,這取決於我們需要做什麼:
- 基於 Java 的作業,它們以類別的形式存在於我們的應用程式中。
- 腳本作業允許我們在主機上執行腳本。
- HTTP 作業,即向遠端端點發出 HTTP 呼叫。
然後,它將處理調度作業並將其分配到應用程式中各個節點所需的一切。 ElasticJob 也會自動處理諸如分片故障時的故障轉移以及處理錯誤觸發的作業等細節。
執行作業時,我們會定義若干分片來分散工作負載。 ElasticJob 會自動將這些分片指派到叢集中所有可用的主機上,以確保負載平衡。如果叢集中新增或移除主機,分片也會自動重新分配,以保持負載在所有主機上的分散。
3. 依賴關係
在使用 ElasticJob 之前,我們需要將最新版本包含在我們的建置中,截至撰寫本文時,最新版本為3.0.5 。
如果我們使用 Maven,我們可以將此依賴項新增到 pom.xml 檔案中:
<dependency>
<groupId>org.apache.shardingsphere.elasticjob</groupId>
<artifactId>elasticjob-bootstrap</artifactId>
<version>3.0.5</version>
</dependency>
我們還需要在運行時有一個 Zookeeper 實例來管理分片之間的協調。
現在,我們已經準備好在我們的應用程式中使用它了。
4. 設定 ElasticJob
一旦我們設定好 ElasticJob 依賴項,就可以開始使用它了。
首先,我們需要確保 ZooKeeper 安裝正常運作。目前我們可以使用 Docker 來實現這一點:
$ docker run --rm -d -p 127.0.0.1:2181:2181 --name elasticjob-zookeeper zookeeper
ZooKeeper JMX enabled by default
Using config: /conf/zoo.cfg
....
2026-02-23 06:33:06,106 [myid:1] - INFO [main:oazsZooKeeperServer@588] - Snapshot taken in 0 ms
2026-02-23 06:33:06,110 [myid:] - INFO [ProcessThread(sid:0 cport:2181)::oazsPrepRequestProcessor@138] - PrepRequestProcessor (sid:0) started, reconfigEnabled=false
然後我們需要配置一個CoordinatorRegistryCenter實例,使其指向我們的 ZooKeeper 實例:
CoordinatorRegistryCenter registryCenter =
new ZookeeperRegistryCenter(new ZookeeperConfiguration("localhost:2181", "my-service"));
registryCenter.init();
至此,ElasticJob 已設定完畢,可以使用了。
5. 撰寫招聘訊息
ElasticJob 準備好後,我們需要寫一些作業來搭配它使用。
5.1 工作實施
我們所有的作業都是基於ElasticJob的某個子類別實現的。在本例中,我們將繼承SimpleJob:
public class MyJob implements SimpleJob {
@Override
public void execute(ShardingContext context) {
// Job implementation
}
}
這樣我們就擁有了一個單獨的execute()方法,可以在其中實作我們的作業。 ElasticJob 會自動呼叫此方法。之後,作業中的具體操作完全由我們自己決定。
5.2 作業配置
有了作業類別之後,我們需要對其進行實際配置。這可以透過建構JobConfiguration實例來實現:
JobConfiguration jobConfig = JobConfiguration.newBuilder("MyJob", 3)
.cron("0 * * * * ?")
.build();
` newBuilder()方法接受兩個參數:作業名稱(無需與類別名稱相符)和要執行作業的分片數。然後,我們可以提供一個 cron 表達式來描述如何排程作業。在本例中,作業的調度時間為每分鐘的第 0 秒。
我們也可以使用jobParameter()方法來配置作業參數:
JobConfiguration jobConfig = JobConfiguration.newBuilder("MyJob", 3)
.jobParameter("Hello")
// ... more configuration
這裡傳入的任何內容都可以使用getJobParameter()方法在作業類別中提取出來。
此外,我們可以使用shardingItemParameters()方法提供分片參數:
JobConfiguration jobConfig = JobConfiguration.newBuilder("MyJob", 3)
.shardingItemParameters("0=a,1=b,2=c")
// ... more configuration
在這種情況下,提供的字串需要採用特殊格式。它是一個以逗號分隔的分片 ID 到值的列表。因此,這裡我們為分片 0 提供值“a” ,為分片 1 提供值“b” ,依此類推。
在我們的工作中, getShardingParameter()呼叫將從這個結構化字串中取得正確的值。如果未找到值,則會傳回null 。
5.3. 安排我們的工作
現在我們已經有了作業和一些作業配置,可以開始安排作業了。這是透過ScheduleJobBootstrap類別完成的:
new ScheduleJobBootstrap(registryCenter, new MyJob(), jobConfig)
.schedule();
在這裡,我們需要提供註冊表中心、作業配置以及作業類別的實例。 ElasticJob 隨後會將作業詳細資料記錄到註冊表中,並安排其按適當的計畫執行。
一旦恢復正常,我們的任務就可以按照預期在整個叢集上運行了。
6. 工作類型
我們已經了解如何建立作業並配置它們以按預期運行。然而,ElasticJob 為我們提供了更大的靈活性,讓我們能夠更好地控製作業的運作方式,從而更好地滿足我們的需求。
6.1 簡單工作
簡單作業是指實作了SimpleJob介面的作業。這為我們提供了一個方法—void void execute(ShardingContext) —我們只需在整個作業中實作該方法即可。之後,我們可以在 Java 程式碼中執行任何我們想要的操作,而該方法會在作業觸發時執行。
提供的ShardingContext實例使我們能夠存取某些詳細資訊。我們可以訪問:
-
getShardingTotalCount()– 此作業配置的分片總數。 -
getShardingItem()– 此特定分片的從 0 開始的索引。 -
getJobParameter()– 已配置的作業參數(如果有)。 -
getShardingParameter()– 此特定分片的分片參數(如果有)。
我們可以在 Java 程式碼中使用這些資訊來影響作業處理。例如,我們可以使用getShardingItem()的值來決定目前正在哪個分片上運行以及要處理哪些資料。
6.2 資料流作業
當需要處理項目清單時,資料流作業提供了一種替代簡單作業的方法。它們實作了DataflowJob<T>接口,其中通用參數T是我們要處理的項目類型。
這個介面要求我們實作兩個方法:一個用於獲取要處理的數據,另一個用於處理這些數據:
public static class MyDataflowJob implements DataflowJob<MyItem> {
private MyItemRepository repository;
@Override
public List<String> fetchData(ShardingContext shardingContext) {
return repository.getUnprocessedItems();
}
@Override
public void processData(ShardingContext shardingContext, List<MyItem> list) {
LOG.info("Processing data {} for job {}", list, shardingContext);
}
}
這樣我們就可以將資料取得與資料處理解耦。我們也可以將作業配置為以串流模式執行:
JobConfiguration jobConfig = JobConfiguration.newBuilder("MyDataflowJob", 3)
.setProperty("DataflowJobProperties.STREAM_PROCESS_KEY", "true")
// ... more configuration
這導致我們在fetchData()和processData()之間循環,直到fetchData()傳回null或空列表為止。
6.3. 腳本作業
除了執行以 Java 編寫的作業外,我們還可以觸發外部腳本來執行所需的操作。這些外部腳本可以是執行作業的主機上的任何可執行腳本。
對於這些情況,我們完全不需要寫作業類別。相反,我們只需提供哨兵值“SCRIPT”以及腳本運行所需的相應配置即可:
JobConfiguration jobConfig = JobConfiguration.newBuilder("MyScriptJob", 3)
.cron("0/5 * * * * ?")
.setProperty(ScriptJobProperties.SCRIPT_KEY, "/script.sh")
.build();
new ScheduleJobBootstrap(registryCenter, "SCRIPT", jobConfig)
.schedule();
這樣,每次作業執行時都會執行/script.sh命令,因此我們可以像這樣執行任何我們需要的操作。我們的ShardingContext將以 JSON 字串的形式作為第一個參數傳遞給腳本。
6.4. HTTP 作業
HTTP 作業可讓我們向已知伺服器發出 HTTP 請求,從而觸發遠端系統上的功能。對於這些作業,我們還需要提供一個哨兵值(這次是“HTTP” )以及關於要發起的 HTTP 呼叫的設定資訊:
JobConfiguration jobConfig = JobConfiguration.newBuilder("MyHttpJob", 3)
.cron("0/5 * * * * ?")
.setProperty(HttpJobProperties.URI_KEY, "https://example.com/job")
.setProperty(HttpJobProperties.METHOD_KEY, "POST")
.setProperty(HttpJobProperties.DATA_KEY, "source=Baeldung")
.build()
new ScheduleJobBootstrap(registryCenter, "HTTP", jobConfig)
.schedule();
這將導致 ElasticJob 在每次觸發作業時向此 URL 發出 HTTP POST請求。此請求的 HTTP 請求體將包含提供的數據,並且 HTTP 標頭hardingContext中還將提供ShardingContext的 JSON 版本:
POST /job HTTP/1.1
Content-Type: application/x-www-form-urlencoded
Content-Length: 15
Host: example.com
ShardingContext: {"jobName":"MyHttpJob","taskId":"MyHttpJob@-@0,1,2@-@READY@[email protected]@-@8253","shardingTotalCount":3,"jobParameter":"Hello","shardingItem":1,"shardingParameter":"b"}
source=Baeldung
處理此請求的任何伺服器都可以根據此資訊按需執行操作。
7. 總結
本文中,我們對 ElasticJob 進行了簡單介紹。它的功能遠不止於此。下次需要管理應用程式的定時任務時,不妨試試。
與往常一樣,本文中的所有範例都可以在 GitHub 上找到。