Spring Batch 中一個讀取器具有多個處理器和寫入器
1. 簡介
在本教程中,我們將探討如何使用一個讀取器、多個處理器和多個寫入器來實作 Spring Batch 作業。當我們需要讀取一次數據,以不同的方式處理數據,然後將結果寫入多個目標時,這種方法非常有用。
2. 設定 Spring Batch 項目
在開始之前,我們需要在pom.xml
檔案中包含Spring Boot Starter Batch和Spring Boot Starter Data JPA依賴項:
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-batch</artifactId>
<version>3.5.0</version>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-data-jpa</artifactId>
<version>3.4.2</version>
</dependency>
<dependency>
<groupId>com.h2database</groupId>
<artifactId>h2</artifactId>
<scope>runtime</scope>
</dependency>
這些依賴項引入了 Spring Batch 用於我們的作業處理、Spring Data JPA 用於資料庫操作以及 H2 作為用於開發目的的記憶體資料庫。
2.1. 準備輸入 CSV 文件
在實作批次元件之前,我們需要處理樣本資料。讓我們建立一個名為customers.csv
的簡單CSV文件,其內容如下:
id,name,email,type
1,John,[email protected],A
2,Alice,[email protected],B
3,Bob,[email protected],A
4,Eve,[email protected],B
該文件包含四個欄位的客戶記錄:唯一識別碼、姓名、電子郵件地址以及用於確定處理路徑的類型標識。我們將此檔案儲存在專案的src/main/resources
目錄中。
2.2. 建立資料模型
我們的批次作業需要一個 Java 類別來表示 CSV 檔案中的客戶資料。讓我們建立一個映射到資料庫表的Customer
實體類別:
@Entity
public class Customer {
@Id
private Long id;
private String name;
private String email;
private String type;
// Constructors, getters, and setters
}
3.實作CSV讀取器
現在,我們可以建立從 CSV 檔案讀取記錄的元件。 Spring Batch 透過FlatFileItemReader
類別為平面檔案讀取提供了出色的支援:
@Bean
public FlatFileItemReader<Customer> customerReader() {
return new FlatFileItemReaderBuilder<Customer>()
.name("customerItemReader")
.resource(new ClassPathResource("customers.csv"))
.delimited()
.names("id", "name", "email", "type")
.fieldSetMapper(new BeanWrapperFieldSetMapper<Customer>() {{
setTargetType(Customer.class);
}})
.build();
}
此配置建立了一個讀取器,用於逐行解析 CSV 文件,並將每筆記錄對應到Customer
物件。讀取器會自動處理檔案的開啟和關閉,並以分塊方式處理數據,從而提高記憶體效率。
names()
方法中指定的欄位名稱必須與我們的 CSV 標頭和我們的Customer
類別屬性相符。
4.建立條件處理器
我們將創建兩個獨立的處理器和一個路由機制來在它們之間進行選擇。每個處理器都實作了 Spring Batch 的ItemProcessor
接口,該接口定義了一個方法process()
,用於在寫入輸入資料之前對其進行轉換:
public class TypeAProcessor implements ItemProcessor<Customer, Customer> {
@Override
public Customer process(Customer customer) {
customer.setName(customer.getName().toUpperCase());
customer.setEmail("A_" + customer.getEmail());
return customer;
}
}
public class TypeBProcessor implements ItemProcessor<Customer, Customer> {
@Override
public Customer process(Customer customer) {
customer.setName(customer.getName().toLowerCase());
customer.setEmail("B_" + customer.getEmail());
return customer;
}
}
TypeAProcessor
處理A
類客戶,將客戶的姓名轉換為大寫,並在其電子郵件地址前面加上前綴。 process process()
方法接受一個Customer
對象,對其進行轉換,然後返回修改後的版本。
對於B
類型的客戶, TypeBProcessor
會套用不同的轉換方式,將姓名轉換為小寫,並使用不同的email
前綴。這兩個處理器都實作了相同的ItemProcessor
接口,因此它們在我們的處理流程中可以互換。
5. 實作處理器路由器
為了將我們的處理器連接到適當的記錄,我們需要一個檢查每個客戶的類型欄位的路由機制:
public class CustomerProcessorRouter implements ItemProcessor<Customer, Customer> {
private final TypeAProcessor typeAProcessor;
private final TypeBProcessor typeBProcessor;
public CustomerProcessorRouter(TypeAProcessor typeAProcessor,
TypeBProcessor typeBProcessor) {
this.typeAProcessor = typeAProcessor;
this.typeBProcessor = typeBProcessor;
}
@Override
public Customer process(Customer customer) throws Exception {
if ("A".equals(customer.getType())) {
return typeAProcessor.process(customer);
} else if ("B".equals(customer.getType())) {
return typeBProcessor.process(customer);
}
return customer;
}
}
我們的路由器類別會檢查每個傳入的Customer
對象,並根據其type
欄位將其委託給相應的處理器。這種設計使我們的處理邏輯清晰分離,同時在作業定義中保持單一的處理步驟。
6. 配置多個寫入器
在根據資料類型進行不同處理後,我們希望將結果寫入多個目標。我們將實作資料庫寫入器和平面文件寫入器。
6.1. 資料庫寫入器配置
我們首先建立處理所有 JPA 操作的資料庫寫入器元件:
@Bean
public JpaItemWriter<Customer> dbWriter(EntityManagerFactory entityManagerFactory) {
JpaItemWriter<Customer> writer = new JpaItemWriter<>();
writer.setEntityManagerFactory(entityManagerFactory);
return writer;
}
此寫入器使用 JPA 將處理後的Customer
物件持久化到資料庫中。在作業執行期間,此JpaItemWriter
會將處理後的Customer
物件持久化到已配置的資料庫中,並處理所有必要的 JPA 操作,包括插入和更新。
6.2. 平面檔案寫入器配置
對於我們的輔助輸出目的地,我們實作了一個產生 CSV 檔案的平面檔案寫入器:
@Bean
public FlatFileItemWriter<Customer> fileWriter() {
return new FlatFileItemWriterBuilder<Customer>()
.name("customerItemWriter")
.resource(new FileSystemResource("output/processed_customers.txt"))
.delimited()
.delimiter(",")
.names("id", "name", "email", "type")
.build();
}
FlatFileItemWriter
配置為使用逗號分隔符,並包含與Customer
實體屬性相符的明確欄位命名。在作業執行期間,此寫入器將建立一個結構化的 CSV 文件,其中包含指定格式的所有已處理的客戶記錄。
6.3. 在複合編寫器中組合組件
為了同時寫入兩個目的地,我們將使用 Spring Batch 的CompositeItemWriter
:
@Bean
public CompositeItemWriter<Customer> compositeWriter(
JpaItemWriter<Customer> dbWriter,
FlatFileItemWriter<Customer> fileWriter) {
CompositeItemWriter<Customer> writer = new CompositeItemWriter<>();
writer.setDelegates(List.of(dbWriter, fileWriter));
return writer;
}
此複合寫入器充當寫入器的委託,確保每個已處理的項目都寫入所有目的地。委託的順序決定了寫入的順序。
7. 設定Step
和Job
現在,讓我們透過建立步驟和作業配置將所有內容整合在一起:
@Bean
public Job processCustomersJob(JobBuilderFactory jobs,
StepBuilderFactory steps,
FlatFileItemReader<Customer> reader,
CustomerProcessorRouter processor,
CompositeItemWriter<Customer> writer) {
Step step = steps.get("processCustomersStep")
.<Customer, Customer>chunk(10)
.reader(reader)
.processor(processor)
.writer(writer)
.build();
return jobs.get("customerProcessingJob")
.start(step)
.build();
}
此作業配置定義了一個步驟,以 10 個區塊為單位讀取客戶訊息,透過我們的路由器處理每個客戶信息,然後將結果寫入資料庫和平面檔案。
8. 運行並測試作業
為了驗證我們的批次作業是否按預期工作,讓我們編寫一個整合測試來啟動作業並針對不同客戶類型斷言資料庫和輸出文件結果:
List<Customer> dbCustomers = jdbcTemplate.query(
"SELECT id, name, email, type FROM customer WHERE type = 'A'",
(rs, rowNum) -> new Customer(
rs.getLong("id"),
rs.getString("name"),
rs.getString("email"),
rs.getString("type"))
);
assertFalse(dbCustomers.isEmpty());
dbCustomers.forEach(c -> {
assertEquals(c.getName(), c.getName().toUpperCase());
assertTrue(c.getEmail().startsWith("A_"));
});
Path outputFile = Paths.get("output/processed_customers.txt");
assertTrue(Files.exists(outputFile));
List<String> lines = Files.readAllLines(outputFile);
boolean hasTypeB = lines.stream().anyMatch(line -> line.endsWith(",B"));
assertTrue(hasTypeB);
lines.forEach(line -> {
String[] parts = line.split(",");
if ("B".equals(parts[3])) {
assertEquals(parts[1], parts[1].toLowerCase());
assertTrue(parts[2].startsWith("B_"));
}
});
從測試案例中,我們首先查詢資料庫,檢查A
類型的客戶是否已儲存,其姓名是否已轉換為大寫,且其電子郵件是否以「A_」為前綴。接下來,我們也讀取輸出文件,以確認B
類型的客戶是否已將其姓名轉換為小寫,且其電子郵件是否以「B_」為前綴。
9. 結論
在本文中,我們學習如何使用單一讀取器、多個處理器和寫入器來設定 Spring Batch 作業。我們從 CSV 檔案讀取數據,根據每筆記錄的內容將其路由到特定的處理器,最後將寫入任務委託給多個寫入器。
與往常一樣,原始碼可在 GitHub 上取得。