具有Spring Batch的Spring Boot

1.概述

Spring Batch是用於開發強大的批處理應用程序的強大框架。在之前的教程中,我們介紹了Spring Batch。

在本教程中,我們將在上一個教程的基礎上,學習如何使用Spring Boot設置和創建基本的批處理驅動的應用程序。

2. Maven依賴

首先,讓我們將spring-boot-starter-batch到我們的pom.xml

<dependency>

 <groupId>org.springframework.boot</groupId>

 <artifactId>spring-boot-starter-batch</artifactId>

 <version>2.4.0</version>

 </dependency>

我們還將添加org.hsqldb依賴關係,該依賴關係也可以從Maven Central獲得:

<dependency>

 <groupId>org.hsqldb</groupId>

 <artifactId>hsqldb</artifactId>

 <version>2.5.1</version>

 <scope>runtime</scope>

 </dependency>

3.定義一個簡單的Spring Batch作業

我們將構建一個工作,從CSV文件導入咖啡清單,使用自定義處理器對其進行轉換,並將最終結果存儲在內存數據庫中

3.1。入門

讓我們從定義應用程序入口點開始:

@SpringBootApplication

 public class SpringBootBatchProcessingApplication {



 public static void main(String[] args) {

 SpringApplication.run(SpringBootBatchProcessingApplication.class, args);

 }

 }

如我們所見,這是一個標準的Spring Boot應用程序。由於我們希望在可能的情況下使用默認配置值,因此我們將使用一組非常簡單的應用程序配置屬性。

我們將在src/main/resources/application.properties文件中定義以下屬性:

file.input=coffee-list.csv

此屬性包含我們輸入的咖啡清單的位置。每行都包含我們咖啡的品牌,產地和一些特徵:

Blue Mountain,Jamaica,Fruity

 Lavazza,Colombia,Strong

 Folgers,America,Smokey

我們將看到,這是一個平面CSV文件,這意味著Spring可以在不進行任何特殊自定義的情況下對其進行處理。

接下來,我們將添加一個SQL腳本schema-all.sql來創建我們的coffee桌來存儲數據:

DROP TABLE coffee IF EXISTS;



 CREATE TABLE coffee (

 coffee_id BIGINT IDENTITY NOT NULL PRIMARY KEY,

 brand VARCHAR(20),

 origin VARCHAR(20),

 characteristics VARCHAR(30)

 );

通常,Spring Boot會在啟動過程中自動運行此腳本

3.2。咖啡領域類

隨後,我們將需要一個簡單的域類來保存我們的咖啡項目:

public class Coffee {



 private String brand;

 private String origin;

 private String characteristics;



 public Coffee(String brand, String origin, String characteristics) {

 this.brand = brand;

 this.origin = origin;

 this.characteristics = characteristics;

 }



 // getters and setters

 }

如前所述,我們的Coffee對象包含三個屬性:

  • 一個品牌
  • 起源
  • 一些其他特徵

4.作業配置

現在,在關鍵組件上,我們的工作配置。我們將逐步進行,建立我們的配置並解釋其中的每個部分:

@Configuration

 @EnableBatchProcessing

 public class BatchConfiguration {



 @Autowired

 public JobBuilderFactory jobBuilderFactory;



 @Autowired

 public StepBuilderFactory stepBuilderFactory;



 @Value("${file.input}")

 private String fileInput;



 // ...

 }

首先,我們從標準的Spring @Configuration類開始。接下來,我們在類中@EnableBatchProcessing值得注意的是,這使我們可以訪問許多支持工作的有用bean,並可以節省很多日常工作。

此外,使用此批註還使我們能夠訪問兩個有用的工廠,稍後將在構建作業配置和作業步驟時使用它們。

對於初始配置的最後一部分,我們包括對先前聲明file.input

4.1。我們工作的讀者和作家

現在,我們可以繼續在我們的配置中定義一個閱讀器bean:

@Bean

 public FlatFileItemReader reader() {

 return new FlatFileItemReaderBuilder().name("coffeeItemReader")

 .resource(new ClassPathResource(fileInput))

 .delimited()

 .names(new String[] { "brand", "origin", "characteristics" })

 .fieldSetMapper(new BeanWrapperFieldSetMapper() {{

 setTargetType(Coffee.class);

 }})

 .build();

 }

簡而言之,上面定義的閱讀器bean將查找名為coffee-list.csv的文件,並將每個訂單項解析為Coffee對象

同樣,我們定義一個writer bean:

@Bean

 public JdbcBatchItemWriter writer(DataSource dataSource) {

 return new JdbcBatchItemWriterBuilder()

 .itemSqlParameterSourceProvider(new BeanPropertyItemSqlParameterSourceProvider<>())

 .sql("INSERT INTO coffee (brand, origin, characteristics) VALUES (:brand, :origin, :characteristics)")

 .dataSource(dataSource)

 .build();

 }

這次,我們包含了將單個咖啡項目插入到數據庫中所需的SQL語句,該語句由Coffee對象的Java bean屬性驅動。方便地, dataSource @EnableBatchProcessing註釋自動創建的

4.2。把我們的工作放在一起

最後,我們需要添加實際的作業步驟和配置:

@Bean

 public Job importUserJob(JobCompletionNotificationListener listener, Step step1) {

 return jobBuilderFactory.get("importUserJob")

 .incrementer(new RunIdIncrementer())

 .listener(listener)

 .flow(step1)

 .end()

 .build();

 }



 @Bean

 public Step step1(JdbcBatchItemWriter writer) {

 return stepBuilderFactory.get("step1")

 .<Coffee, Coffee> chunk(10)

 .reader(reader())

 .processor(processor())

 .writer(writer)

 .build();

 }



 @Bean

 public CoffeeItemProcessor processor() {

 return new CoffeeItemProcessor();

 }

如我們所見,我們的工作相對簡單,由step1方法中定義的一個步驟組成。

讓我們看一下此步驟正在執行的操作:

  • 首先,我們配置步驟,以便使用chunk(10)聲明一次最多寫入十條記錄
  • 然後,我們使用讀取器bean讀取咖啡數據,該讀取器bean是使用reader方法設置的
  • 接下來,我們將每個咖啡項目傳遞給自定義處理器,在其中應用一些自定義業務邏輯
  • 最後,我們使用之前看到的編寫器將每個咖啡項目寫入數據庫

另一方面,我們的importUserJob包含我們的作業定義,其中包含使用內置RunIdIncrementer類的ID。我們還設置了一個JobCompletionNotificationListener,用來在作業完成時得到通知

為了完成我們的作業配置,我們列出了每個步驟(儘管此作業只有一個步驟)。現在,我們完成了完美的配置!

5.定制咖啡處理器

讓我們詳細了解一下我們先前在作業配置中定義的自定義處理器:

public class CoffeeItemProcessor implements ItemProcessor<Coffee, Coffee> {



 private static final Logger LOGGER = LoggerFactory.getLogger(CoffeeItemProcessor.class);



 @Override

 public Coffee process(final Coffee coffee) throws Exception {

 String brand = coffee.getBrand().toUpperCase();

 String origin = coffee.getOrigin().toUpperCase();

 String chracteristics = coffee.getCharacteristics().toUpperCase();



 Coffee transformedCoffee = new Coffee(brand, origin, chracteristics);

 LOGGER.info("Converting ( {} ) into ( {} )", coffee, transformedCoffee);



 return transformedCoffee;

 }

 }

特別感興趣的是, ItemProcessor接口為我們提供了一種在作業執行過程中應用某些特定業務邏輯的機制。

為了簡單**CoffeeItemProcessor ,我們定義了CoffeeItemProcessor,它接受一個輸入Coffee對象並將每個屬性轉換為uppercase** 。

6.工作完成

此外,我們還將編寫一個JobCompletionNotificationListener以在工作完成時提供一些反饋:

@Override

 public void afterJob(JobExecution jobExecution) {

 if (jobExecution.getStatus() == BatchStatus.COMPLETED) {

 LOGGER.info("!!! JOB FINISHED! Time to verify the results");



 String query = "SELECT brand, origin, characteristics FROM coffee";

 jdbcTemplate.query(query, (rs, row) -> new Coffee(rs.getString(1), rs.getString(2), rs.getString(3)))

 .forEach(coffee -> LOGGER.info("Found < {} > in the database.", coffee));

 }

 }

在上面的示例中,我們重寫afterJob方法,並檢查作業是否成功完成。此外,我們運行一個簡單查詢以檢查每個咖啡項目是否已成功存儲在數據庫中

7.做好工作

現在我們已經準備就緒,可以開始工作了,這是有趣的部分。讓我們繼續工作吧:

...

 17:41:16.336 [main] INFO cbbJobCompletionNotificationListener -

 !!! JOB FINISHED! Time to verify the results

 17:41:16.336 [main] INFO cbbJobCompletionNotificationListener -

 Found < Coffee [brand=BLUE MOUNTAIN, origin=JAMAICA, characteristics=FRUITY] > in the database.

 17:41:16.337 [main] INFO cbbJobCompletionNotificationListener -

 Found < Coffee [brand=LAVAZZA, origin=COLOMBIA, characteristics=STRONG] > in the database.

 17:41:16.337 [main] INFO cbbJobCompletionNotificationListener -

 Found < Coffee [brand=FOLGERS, origin=AMERICA, characteristics=SMOKEY] > in the database.

 ...

如我們所見,我們的工作成功運行,每項咖啡都按預期存儲在數據庫中

8.結論

在本文中,我們學習瞭如何使用Spring Boot創建一個簡單的Spring Batch作業。首先,我們從定義一些基本配置開始。

然後,我們看到瞭如何添加文件讀取器和數據庫寫入器。最後,我們看瞭如何應用一些自定義處理並檢查我們的作業是否成功執行。