在 Spring Batch 中從 ItemReader 存取作業參數
1. 概述
Spring Batch 是 Java 中用於批次的強大框架,因此使其成為資料處理活動和排程作業運行的熱門選擇。根據業務邏輯的複雜性,作業可以依賴不同的配置值和動態參數。
在本文中,我們將探討如何使用JobParameters
以及如何從基本批次元件存取它們。
2. 演示設定
我們將為藥局服務開發 Spring Batch 。主要業務任務是找到即將過期的藥品,根據銷售量計算新的價格,並通知消費者即將過期的藥品。此外,我們將從記憶體中的 H2 資料庫中讀取資料並將所有處理詳細資訊寫入日誌以簡化實作。
2.1.依賴關係
要開始示範應用程序,我們需要新增 Spring Batch 和 H2 依賴項:
<dependency>
<groupId>com.h2database</groupId>
<artifactId>h2</artifactId>
<version>2.2.224</version>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-batch</artifactId>
<version>3.2.0</version>
</dependency>
我們可以在 Maven Central 儲存庫中找到最新的H2和Spring Batch版本。
2.2.準備測試數據
讓我們先在schema-all.sql
中定義模式:
DROP TABLE medicine IF EXISTS;
CREATE TABLE medicine (
med_id VARCHAR(36) PRIMARY KEY,
name VARCHAR(30),
type VARCHAR(30),
expiration_date TIMESTAMP,
original_price DECIMAL,
sale_price DECIMAL
);
data.sql
中提供了初步測試資料:
INSERT INTO medicine VALUES ('ec278dd3-87b9-4ad1-858f-dfe5bc34bdb5', 'Lidocaine', 'ANESTHETICS', DATEADD('DAY', 120, CURRENT_DATE), 10, null);
INSERT INTO medicine VALUES ('9d39321d-34f3-4eb7-bb9a-a69734e0e372', 'Flucloxacillin', 'ANTIBACTERIALS', DATEADD('DAY', 40, CURRENT_DATE), 20, null);
INSERT INTO medicine VALUES ('87f4ff13-de40-4c7f-95db-627f309394dd', 'Amoxicillin', 'ANTIBACTERIALS', DATEADD('DAY', 70, CURRENT_DATE), 30, null);
INSERT INTO medicine VALUES ('acd99d6a-27be-4c89-babe-0edf4dca22cb', 'Prozac', 'ANTIDEPRESSANTS', DATEADD('DAY', 30, CURRENT_DATE), 40, null);
Spring Boot 在應用程式啟動過程中執行這些文件,我們將在測試執行中使用這些測試資料。
2.3. Medicine
領域類
對於我們的服務,我們需要一個簡單的Medicine
實體類別:
@AllArgsConstructor
@Data
public class Medicine {
private UUID id;
private String name;
private MedicineCategory type;
private Timestamp expirationDate;
private Double originalPrice;
private Double salePrice;
}
ItemReader
使用expirationDate
欄位來計算藥物是否即將過期。當藥物接近失效日期時, ItemProcessor
將更新salePrice
欄位。
2.4.應用程式屬性
應用程式需要src/main/resources/application.properties
檔案中的多個屬性:
spring.batch.job.enabled=false
batch.medicine.cron=0 */1 * * * *
batch.medicine.alert_type=LOGS
batch.medicine.expiration.default.days=60
batch.medicine.start.sale.default.days=45
batch.medicine.sale=0.1
由於我們只配置一項作業,因此spring.batch.job.enabled
應設定為false
以停用初始作業執行。預設情況下,Spring 在上下文啟動後使用空參數運行作業:
[main] INFO osbabJobLauncherApplicationRunner - Running default command line with: []
batch.medicine.cron
屬性定義計劃運行的 cron 表達式。根據定義的場景,我們應該每天執行該作業。然而,在我們的例子中,作業每分鐘都會啟動,以便能夠輕鬆檢查處理行為。
InputReader
、 InputProcessor
和InpurWriter
需要其他屬性來執行業務邏輯。
3. 作業參數
Spring Batch 包含一個JobParameters
類,旨在儲存特定作業執行的執行時間參數。事實證明,此功能在各種情況下都很有用。例如,它允許傳遞特定運行期間產生的動態變數。此外,它還可以建立一個控制器,該控制器可以根據客戶端提供的參數啟動作業。
在我們的場景中,我們將利用此類來保存應用程式參數和動態運行時參數。
3.1. StepScope
和JobScope
除了常規 Spring 中眾所周知的 bean 作用域之外, Spring Batch 還引入了兩個額外的作用域: StepScope
和JobScope
。有了這些範圍,就可以為工作流程中的每個步驟或作業建立唯一的 bean。 Spring 確保與特定步驟/作業關聯的資源在其整個生命週期中被隔離和獨立管理。
有了這個功能,我們可以輕鬆控制上下文,並在特定運行的讀取、處理和寫入部分之間共享所有所需的屬性。為了能夠注入作業參數,我們需要使用@StepScope
或@JobScope
註解所依賴的 bean。
3.2.在計劃執行中填入作業參數
讓我們定義the MedExpirationBatchRunner
類,它將透過 cron 表達式啟動我們的工作(在我們的例子中每 1 分鐘一次)。我們應該用@EnableScheduling
來註解該類別並定義適當的@Scheduled
入口方法:
@Component
@EnableScheduling
public class MedExpirationBatchRunner {
...
@Scheduled(cron = "${batch.medicine.cron}", zone = "GMT")
public void runJob() {
ZonedDateTime now = ZonedDateTime.now(ZoneOffset.UTC);
launchJob(now);
}
}
由於我們想要手動啟動作業,因此我們應該使用JobLaucher
類別並在JobLauncher#
run()
方法中提供填入的JobParameter
。在我們的範例中,我們提供了application.properties
中的值以及兩個特定於運行的參數(觸發作業的日期和追蹤 ID):
public void launchJob(ZonedDateTime triggerZonedDateTime) {
try {
JobParameters jobParameters = new JobParametersBuilder()
.addString(BatchConstants.TRIGGERED_DATE_TIME, triggerZonedDateTime.toString())
.addString(BatchConstants.ALERT_TYPE, alertType)
.addLong(BatchConstants.DEFAULT_EXPIRATION, defaultExpiration)
.addLong(BatchConstants.SALE_STARTS_DAYS, saleStartDays)
.addDouble(BatchConstants.MEDICINE_SALE, medicineSale)
.addString(BatchConstants.TRACE_ID, UUID.randomUUID().toString())
.toJobParameters();
jobLauncher.run(medExpirationJob, jobParameters);
} catch (Exception e) {
log.error("Failed to run", e);
}
}
配置參數後,我們有幾個選擇如何在程式碼中使用這些值。
3.3.讀取Bean定義中的作業參數
使用 SpEL,我們可以從組態類別中的 bean 定義存取作業參數。 Spring 將所有參數組合到常規String
到Object
映射中:
@Bean
@StepScope
public MedicineProcessor medicineProcessor(@Value("#{jobParameters}") Map<String, Object> jobParameters) {
...
}
在這個方法內部,我們將使用jobParameters
來啟動MedicineProcessor.
3.4.直接讀取Service中的作業參數
另一個選擇是在ItemReader
本身中使用 setter 注入。我們可以像從任何其他映射中一樣透過 SpEL 表達式獲取確切的參數值:
@Setter
public class ExpiresSoonMedicineReader extends AbstractItemCountingItemStreamItemReader<Medicine> {
@Value("#{jobParameters['DEFAULT_EXPIRATION']}")
private long defaultExpiration;
}
我們只需要確保SpEL中使用的key與參數初始化時使用的key相同即可。
3.5.透過Before Step讀取作業參數
Spring Batch 提供了StepExecutionListener
接口,讓我們可以監聽步驟執行階段:步驟開始之前和步驟完成後。我們可以利用此功能,在步驟開始之前存取屬性,並執行任何自訂邏輯。最簡單的方法就是使用@BeforeStep
註釋,它對應於StepExecutionListener
中的beforeStep()
方法:
@BeforeStep
public void beforeStep(StepExecution stepExecution) {
JobParameters parameters = stepExecution.getJobExecution()
.getJobParameters();
...
log.info("Before step params: {}", parameters);
}
4. 作業配置
讓我們把所有的部分結合起來看一下全貌。
讀取器、處理器和寫入器需要兩個屬性: BatchConstants.TRIGGERED_DATE_TIME
和BatchConstants.TRACE_ID.
我們將對所有步驟 bean 定義中的公共參數使用相同的提取邏輯:
private void enrichWithJobParameters(Map<String, Object> jobParameters, ContainsJobParameters container) {
if (jobParameters.get(BatchConstants.TRIGGERED_DATE_TIME) != null) {
container.setTriggeredDateTime(ZonedDateTime.parse(jobParameters.get(BatchConstants.TRIGGERED_DATE_TIME)
.toString()));
}
if (jobParameters.get(BatchConstants.TRACE_ID) != null) {
container.setTraceId(jobParameters.get(BatchConstants.TRACE_ID).toString());
}
}
總的來說,其他參數是特定於組件的,沒有共同的邏輯。
4.1.配置ItemReader
首先,我們要設定ExpiresSoonMedicineReader
並豐富常用參數:
@Bean
@StepScope
public ExpiresSoonMedicineReader expiresSoonMedicineReader(JdbcTemplate jdbcTemplate, @Value("#{jobParameters}") Map<String, Object> jobParameters) {
ExpiresSoonMedicineReader medicineReader = new ExpiresSoonMedicineReader(jdbcTemplate);
enrichWithJobParameters(jobParameters, medicineReader);
return medicineReader;
}
讓我們仔細看看閱讀器的具體實作。 TriggeredDateTime
和traceId
參數在bean建置期間直接注入, defaultExpiration
參數由Spring透過setter注入。為了演示,我們在doOpen()
方法中使用了所有這些:
public class ExpiresSoonMedicineReader extends AbstractItemCountingItemStreamItemReader<Medicine> implements ContainsJobParameters {
private ZonedDateTime triggeredDateTime;
private String traceId;
@Value("#{jobParameters['DEFAULT_EXPIRATION']}")
private long defaultExpiration;
private List<Medicine> expiringMedicineList;
...
@Override
protected void doOpen() {
expiringMedicineList = jdbcTemplate.query(FIND_EXPIRING_SOON_MEDICINE, ps -> ps.setLong(1, defaultExpiration), (rs, row) -> getMedicine(rs));
log.info("Trace = {}. Found {} meds that expires soon", traceId, expiringMedicineList.size());
if (!expiringMedicineList.isEmpty()) {
setMaxItemCount(expiringMedicineList.size());
}
}
@PostConstruct
public void init() {
setName(ClassUtils.getShortName(getClass()));
}
}
ItemReader
不應標記為@Component.
另外,我們需要呼叫setName()
方法來設定所需的讀者名稱。
4.2.配置ItemProcessor
和ItemWriter
ItemProcessor
和ItemWriter
遵循與ItemReader
相同的方法。因此它們不需要任何特定配置來存取參數。 bean定義邏輯透過enrichWithJobParameters()
方法初始化公用參數。其他由單一類別使用且不需要在所有組件中填充的參數,由 Spring 透過在相應類別中進行 setter 注入來豐富。
我們應該使用@StepScope
註解來標記所有依賴屬性的 bean。否則,Spring 將僅在上下文啟動時建立一次 bean,並且不會注入參數值。
4.3.配置完整流程
我們不需要採取任何特定操作來使用參數配置作業。因此我們只需要組合所有的bean:
@Bean
public Job medExpirationJob(JobRepository jobRepository,
PlatformTransactionManager transactionManager,
MedicineWriter medicineWriter,
MedicineProcessor medicineProcessor,
ExpiresSoonMedicineReader expiresSoonMedicineReader) {
Step notifyAboutExpiringMedicine = new StepBuilder("notifyAboutExpiringMedicine", jobRepository).<Medicine, Medicine>chunk(10)
.reader(expiresSoonMedicineReader)
.processor(medicineProcessor)
.writer(medicineWriter)
.faultTolerant()
.transactionManager(transactionManager)
.build();
return new JobBuilder("medExpirationJob", jobRepository)
.incrementer(new RunIdIncrementer())
.start(notifyAboutExpiringMedicine)
.build();
}
5. 運行應用程式
讓我們運行一個完整的範例,看看應用程式如何使用所有參數。我們需要從SpringBatchExpireMedicationApplication
類別啟動 Spring Boot 應用程式。
一旦計劃的方法執行,Spring 就會記錄所有參數:
INFO osbclsupport.SimpleJobLauncher - Job: [SimpleJob: [name=medExpirationJob]] launched with the following parameters: [{'SALE_STARTS_DAYS':'{value=45, type=class java.lang.Long, identifying=true}','MEDICINE_SALE':'{value=0.1, type=class java.lang.Double, identifying=true}','TRACE_ID':'{value=e35a26a4-4d56-4dfe-bf36-c1e5f20940a5, type=class java.lang.String, identifying=true}','ALERT_TYPE':'{value=LOGS, type=class java.lang.String, identifying=true}','TRIGGERED_DATE_TIME':'{value=2023-12-06T22:36:00.011436600Z, type=class java.lang.String, identifying=true}','DEFAULT_EXPIRATION':'{value=60, type=class java.lang.Long, identifying=true}'}]
首先, ItemReader
寫入根據DEFAULT_EXPIRATION
參數找到的藥物資訊:
INFO cbbjob.ExpiresSoonMedicineReader - Trace = e35a26a4-4d56-4dfe-bf36-c1e5f20940a5. Found 2 meds that expires soon
其次, ItemProcessor
使用SALE_STARTS_DAYS
和MEDICINE_SALE
參數來計算新價格:
INFO cbbjob.MedicineProcessor - Trace = e35a26a4-4d56-4dfe-bf36-c1e5f20940a5, calculated new sale price 18.0 for medicine 9d39321d-34f3-4eb7-bb9a-a69734e0e372
INFO cbbjob.MedicineProcessor - Trace = e35a26a4-4d56-4dfe-bf36-c1e5f20940a5, calculated new sale price 36.0 for medicine acd99d6a-27be-4c89-babe-0edf4dca22cb
最後, ItemWriter
將更新的藥物寫入相同追蹤中的日誌:
INFO cbbjob.MedicineWriter - Trace = e35a26a4-4d56-4dfe-bf36-c1e5f20940a5. This medicine is expiring Medicine(id=9d39321d-34f3-4eb7-bb9a-a69734e0e372, name=Flucloxacillin, type=ANTIBACTERIALS, expirationDate=2024-01-16 00:00:00.0, originalPrice=20.0, salePrice=18.0)
INFO cbbjob.MedicineWriter - Trace = e35a26a4-4d56-4dfe-bf36-c1e5f20940a5. This medicine is expiring Medicine(id=acd99d6a-27be-4c89-babe-0edf4dca22cb, name=Prozac, type=ANTIDEPRESSANTS, expirationDate=2024-01-06 00:00:00.0, originalPrice=40.0, salePrice=36.0)
INFO cbbjob.MedicineWriter - Finishing job started at 2023-12-07T11:58:00.014430400Z
六,結論
在本文中,我們學習如何在 Spring Batch 中使用作業參數。 ItemReader
、 ItemProcessor
和ItemWriter
可以在 bean 初始化期間手動豐富參數,也可以由 Spring 透過@BeforeStep
或 setter 注入來豐富。
與往常一樣,完整的範例可以在 GitHub 上找到。