使用Spring Cloud數據流進行批處理

1.概述

在該系列的第一篇文章中,我們介紹了Spring Cloud Data Flow的體系結構組件以及如何使用它創建流數據管道。

與流水線處理無限數量的數據流相反,批處理過程使創建按需執行任務的短期服務變得容易

2.本地數據流服務器和shell

Local Data Flow Server是負責部署應用程序的組件,而Data Flow Shell使我們能夠執行與服務器交互所需的DSL命令。

在上一篇文章中,我們使用Spring Initilizr將它們都設置為Spring Boot Application。

@EnableDataFlowServer批註分別添加到server's主類和@ EnableDataFlowShell批註分別添加到Shell的主類之後,可以通過執行以下操作來啟動它們:

mvn spring-boot:run

服務器將在9393端口上啟動,並且外殼程序將根據提示與之交互。

您可以參考上一篇文章,以獲取有關如何獲取和使用Local Data Flow Server及其外殼客戶端的詳細信息。

3.批量程序

與服務器和shell程序一樣,我們可以使用Spring Initilizr來設置root Spring Boot批處理應用程序。

進入網站後,只需選擇GroupArtifact名稱,然後從依賴項搜索框中選擇Cloud Task

完成此操作後,單擊Generate Project按鈕以開始下載Maven工件。

該工件已預先配置並帶有基本代碼。讓我們看看如何對其進行編輯以構建批處理應用程序。

3.1。 Maven依賴

首先,讓我們添加幾個Maven依賴項。由於這是一個批處理應用程序,因此我們需要從Spring Batch Project導入庫:

<dependency>

 <groupId>org.springframework.boot</groupId>

 <artifactId>spring-boot-starter-batch</artifactId>

 </dependency>

另外,由於Spring Cloud Task使用關係數據庫存儲已執行任務的結果,因此我們需要向RDBMS驅動程序添加依賴項:

<dependency>

 <groupId>com.h2database</groupId>

 <artifactId>h2</artifactId>

 </dependency>

我們選擇使用Spring提供的H2內存數據庫。這為我們提供了一種引導開發的簡單方法。但是,在生產環境中,您將需要配置自己的DataSource

請記住,工件的版本將從Spring Boot的父pom.xml文件繼承。

3.2。 Main Class

啟用所需功能的關鍵點是在Spring Boot's主類中添加@EnableTask@EnableBatchProcessing批註。這個類級別的註釋告訴Spring Cloud Task引導所有內容:

@EnableTask

 @EnableBatchProcessing

 @SpringBootApplication

 public class BatchJobApplication {



 public static void main(String[] args) {

 SpringApplication.run(BatchJobApplication.class, args);

 }

 }

3.3。Job配置

最後,讓我們配置一個作業–在這種情況下,將String簡單打印到日誌文件:

@Configuration

 public class JobConfiguration {



 private static Log logger

 = LogFactory.getLog(JobConfiguration.class);



 @Autowired

 public JobBuilderFactory jobBuilderFactory;



 @Autowired

 public StepBuilderFactory stepBuilderFactory;



 @Bean

 public Job job() {

 return jobBuilderFactory.get("job")

 .start(stepBuilderFactory.get("jobStep1")

 .tasklet(new Tasklet() {



 @Override

 public RepeatStatus execute(StepContribution contribution,

 ChunkContext chunkContext) throws Exception {



 logger.info("Job was run");

 return RepeatStatus.FINISHED;

 }

 }).build()).build();

 }

 }

d etails如何配置和定義工作超出這個文章的範圍。有關更多信息,請參見我們的Spring Batch簡介文章。

最後,我們的應用程序已準備就緒。讓我們將其安裝在本地Maven存儲庫中。要將cd插入項目的根目錄並發出命令:

mvn clean install

現在是時候將應用程序放入Data Flow Server.

4.註冊申請

要在App Registry中註冊應用程序,我們需要提供一個唯一的名稱,一個應用程序類型和一個可以解析為應用程序工件的URI。

轉到Spring Cloud Data Flow Shell並從提示符處發出命令:

app register --name batch-job --type task

 --uri maven://com.baeldung.spring.cloud:batch-job:jar:0.0.1-SNAPSHOT

5.創建任務

可以使用以下命令創建任務定義:

task create myjob --definition batch-job

這將創建一個名為myjob的新任務,該任務指向先前已註冊的myjob -job應用程序。

可以使用以下命令獲取當前任務定義的列表:

task list

6.啟動任務

要啟動任務,我們可以使用以下命令:

task launch myjob

啟動任務後,任務的狀態將存儲在關係數據庫中。我們可以使用以下命令檢查任務執行的狀態:

task execution list

7.查看結果

在此示例中,作業僅在日誌文件中打印字符串。日誌文件位於Data Flow Server的日誌輸出中顯示的目錄內。

要查看結果,我們可以拖尾日誌:

tail -f PATH_TO_LOG\spring-cloud-dataflow-2385233467298102321\myjob-1472827120414\myjob

 [...] --- [main] osbatch.core.job.SimpleStepHandler: Executing step: [jobStep1]

 [...] --- [main] obspring.cloud.JobConfiguration: Job was run

 [...] --- [main] osbclsupport.SimpleJobLauncher:

 Job: [SimpleJob: [name=job]] completed with the following parameters:

 [{}] and the following status: [COMPLETED]

8.結論

在本文中,我們展示瞭如何通過使用Spring Cloud Data Flow處理批處理。

示例代碼可以在GitHub項目中找到。