具有Spring Batch的Spring Boot
- Spring
- Spring Boot
- Spring Batch
1.概述
Spring Batch是用於開發強大的批處理應用程序的強大框架。在之前的教程中,我們介紹了Spring Batch。
在本教程中,我們將在上一個教程的基礎上,學習如何使用Spring Boot設置和創建基本的批處理驅動的應用程序。
2. Maven依賴
首先,讓我們將spring-boot-starter-batch
到我們的pom.xml
:
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-batch</artifactId>
<version>2.4.0</version>
</dependency>
我們還將添加org.hsqldb
依賴關係,該依賴關係也可以從Maven Central獲得:
<dependency>
<groupId>org.hsqldb</groupId>
<artifactId>hsqldb</artifactId>
<version>2.5.1</version>
<scope>runtime</scope>
</dependency>
3.定義一個簡單的Spring Batch作業
我們將構建一個工作,從CSV文件導入咖啡清單,使用自定義處理器對其進行轉換,並將最終結果存儲在內存數據庫中。
3.1。入門
讓我們從定義應用程序入口點開始:
@SpringBootApplication
public class SpringBootBatchProcessingApplication {
public static void main(String[] args) {
SpringApplication.run(SpringBootBatchProcessingApplication.class, args);
}
}
如我們所見,這是一個標準的Spring Boot應用程序。由於我們希望在可能的情況下使用默認配置值,因此我們將使用一組非常簡單的應用程序配置屬性。
我們將在src/main/resources/application.properties
文件中定義以下屬性:
file.input=coffee-list.csv
此屬性包含我們輸入的咖啡清單的位置。每行都包含我們咖啡的品牌,產地和一些特徵:
Blue Mountain,Jamaica,Fruity
Lavazza,Colombia,Strong
Folgers,America,Smokey
我們將看到,這是一個平面CSV文件,這意味著Spring可以在不進行任何特殊自定義的情況下對其進行處理。
接下來,我們將添加一個SQL腳本schema-all.sql
來創建我們的coffee
桌來存儲數據:
DROP TABLE coffee IF EXISTS;
CREATE TABLE coffee (
coffee_id BIGINT IDENTITY NOT NULL PRIMARY KEY,
brand VARCHAR(20),
origin VARCHAR(20),
characteristics VARCHAR(30)
);
通常,Spring Boot會在啟動過程中自動運行此腳本。
3.2。咖啡領域類
隨後,我們將需要一個簡單的域類來保存我們的咖啡項目:
public class Coffee {
private String brand;
private String origin;
private String characteristics;
public Coffee(String brand, String origin, String characteristics) {
this.brand = brand;
this.origin = origin;
this.characteristics = characteristics;
}
// getters and setters
}
如前所述,我們的Coffee
對象包含三個屬性:
- 一個品牌
- 起源
- 一些其他特徵
4.作業配置
現在,在關鍵組件上,我們的工作配置。我們將逐步進行,建立我們的配置並解釋其中的每個部分:
@Configuration
@EnableBatchProcessing
public class BatchConfiguration {
@Autowired
public JobBuilderFactory jobBuilderFactory;
@Autowired
public StepBuilderFactory stepBuilderFactory;
@Value("${file.input}")
private String fileInput;
// ...
}
首先,我們從標準的Spring @Configuration
類開始。接下來,我們在類中@EnableBatchProcessing
值得注意的是,這使我們可以訪問許多支持工作的有用bean,並可以節省很多日常工作。
此外,使用此批註還使我們能夠訪問兩個有用的工廠,稍後將在構建作業配置和作業步驟時使用它們。
對於初始配置的最後一部分,我們包括對先前聲明file.input
4.1。我們工作的讀者和作家
現在,我們可以繼續在我們的配置中定義一個閱讀器bean:
@Bean
public FlatFileItemReader reader() {
return new FlatFileItemReaderBuilder().name("coffeeItemReader")
.resource(new ClassPathResource(fileInput))
.delimited()
.names(new String[] { "brand", "origin", "characteristics" })
.fieldSetMapper(new BeanWrapperFieldSetMapper() {{
setTargetType(Coffee.class);
}})
.build();
}
簡而言之,上面定義的閱讀器bean將查找名為coffee-list.csv
的文件,並將每個訂單項解析為Coffee
對象。
同樣,我們定義一個writer bean:
@Bean
public JdbcBatchItemWriter writer(DataSource dataSource) {
return new JdbcBatchItemWriterBuilder()
.itemSqlParameterSourceProvider(new BeanPropertyItemSqlParameterSourceProvider<>())
.sql("INSERT INTO coffee (brand, origin, characteristics) VALUES (:brand, :origin, :characteristics)")
.dataSource(dataSource)
.build();
}
這次,我們包含了將單個咖啡項目插入到數據庫中所需的SQL語句,該語句由Coffee
對象的Java bean屬性驅動。方便地, dataSource
@EnableBatchProcessing
註釋自動創建的。
4.2。把我們的工作放在一起
最後,我們需要添加實際的作業步驟和配置:
@Bean
public Job importUserJob(JobCompletionNotificationListener listener, Step step1) {
return jobBuilderFactory.get("importUserJob")
.incrementer(new RunIdIncrementer())
.listener(listener)
.flow(step1)
.end()
.build();
}
@Bean
public Step step1(JdbcBatchItemWriter writer) {
return stepBuilderFactory.get("step1")
.<Coffee, Coffee> chunk(10)
.reader(reader())
.processor(processor())
.writer(writer)
.build();
}
@Bean
public CoffeeItemProcessor processor() {
return new CoffeeItemProcessor();
}
如我們所見,我們的工作相對簡單,由step1
方法中定義的一個步驟組成。
讓我們看一下此步驟正在執行的操作:
- 首先,我們配置步驟,以便使用
chunk(10)
聲明一次最多寫入十條記錄 - 然後,我們使用讀取器bean讀取咖啡數據,該讀取器bean是使用
reader
方法設置的 - 接下來,我們將每個咖啡項目傳遞給自定義處理器,在其中應用一些自定義業務邏輯
- 最後,我們使用之前看到的編寫器將每個咖啡項目寫入數據庫
另一方面,我們的importUserJob
包含我們的作業定義,其中包含使用內置RunIdIncrementer
類的ID。我們還設置了一個JobCompletionNotificationListener,
用來在作業完成時得到通知。
為了完成我們的作業配置,我們列出了每個步驟(儘管此作業只有一個步驟)。現在,我們完成了完美的配置!
5.定制咖啡處理器
讓我們詳細了解一下我們先前在作業配置中定義的自定義處理器:
public class CoffeeItemProcessor implements ItemProcessor<Coffee, Coffee> {
private static final Logger LOGGER = LoggerFactory.getLogger(CoffeeItemProcessor.class);
@Override
public Coffee process(final Coffee coffee) throws Exception {
String brand = coffee.getBrand().toUpperCase();
String origin = coffee.getOrigin().toUpperCase();
String chracteristics = coffee.getCharacteristics().toUpperCase();
Coffee transformedCoffee = new Coffee(brand, origin, chracteristics);
LOGGER.info("Converting ( {} ) into ( {} )", coffee, transformedCoffee);
return transformedCoffee;
}
}
特別感興趣的是, ItemProcessor
接口為我們提供了一種在作業執行過程中應用某些特定業務邏輯的機制。
為了簡單**CoffeeItemProcessor
,我們定義了CoffeeItemProcessor,它接受一個輸入Coffee
對象並將每個屬性轉換為uppercase** 。
6.工作完成
此外,我們還將編寫一個JobCompletionNotificationListener
以在工作完成時提供一些反饋:
@Override
public void afterJob(JobExecution jobExecution) {
if (jobExecution.getStatus() == BatchStatus.COMPLETED) {
LOGGER.info("!!! JOB FINISHED! Time to verify the results");
String query = "SELECT brand, origin, characteristics FROM coffee";
jdbcTemplate.query(query, (rs, row) -> new Coffee(rs.getString(1), rs.getString(2), rs.getString(3)))
.forEach(coffee -> LOGGER.info("Found < {} > in the database.", coffee));
}
}
在上面的示例中,我們重寫afterJob
方法,並檢查作業是否成功完成。此外,我們運行一個簡單查詢以檢查每個咖啡項目是否已成功存儲在數據庫中。
7.做好工作
現在我們已經準備就緒,可以開始工作了,這是有趣的部分。讓我們繼續工作吧:
...
17:41:16.336 [main] INFO cbbJobCompletionNotificationListener -
!!! JOB FINISHED! Time to verify the results
17:41:16.336 [main] INFO cbbJobCompletionNotificationListener -
Found < Coffee [brand=BLUE MOUNTAIN, origin=JAMAICA, characteristics=FRUITY] > in the database.
17:41:16.337 [main] INFO cbbJobCompletionNotificationListener -
Found < Coffee [brand=LAVAZZA, origin=COLOMBIA, characteristics=STRONG] > in the database.
17:41:16.337 [main] INFO cbbJobCompletionNotificationListener -
Found < Coffee [brand=FOLGERS, origin=AMERICA, characteristics=SMOKEY] > in the database.
...
如我們所見,我們的工作成功運行,每項咖啡都按預期存儲在數據庫中。
8.結論
在本文中,我們學習瞭如何使用Spring Boot創建一個簡單的Spring Batch作業。首先,我們從定義一些基本配置開始。
然後,我們看到瞭如何添加文件讀取器和數據庫寫入器。最後,我們看瞭如何應用一些自定義處理並檢查我們的作業是否成功執行。