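Doing batch jobs well involves a lot of details: a record of every scheduled run, input/output statistics, the success or failure of each task, the data flow, and so on, all the way down to Retry / Skip handling.
This project was developed on Spring Boot 1.5.9, which pairs with Spring Batch 3.0.8. Spring Boot 2 has since been released and pairs with Spring Batch 4.0.0. If you are starting a new project, I recommend going straight to Spring Boot 2. I'll write up the Spring Batch 4.0.0 style later.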
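Basically, the architecture is the one shown in the figure below.
The smallest unit is a Step. Within a Step, ItemReader (reads the source) -> ItemProcessor (processes) -> ItemWriter (writes the result) make up one processing pass.
Several Steps can be combined into a larger Job. That's all there is to it.
For every Step, the framework records the start, the end, success or failure, and how many items were read, processed and written. All of this goes into the database, so you can get a good picture of how each scheduled run went.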
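The following is a minimal plain-Java model of a chunk-oriented step, with no Spring dependency. It makes the Reader -> Processor -> Writer flow concrete. SimpleReader, SimpleProcessor, SimpleWriter and ChunkStepModel are hypothetical stand-ins for ItemReader, ItemProcessor and ItemWriter. The real framework also wraps each chunk in a transaction and records the counts in its metadata tables; this sketch leaves both out.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;

// Hypothetical stand-ins for Spring Batch's ItemReader / ItemProcessor / ItemWriter.
interface SimpleReader<T> { T read(); }                // returns null when the input is exhausted
interface SimpleProcessor<I, O> { O process(I item); } // returning null filters the item out
interface SimpleWriter<T> { void write(List<T> items); }

final class ChunkStepModel {
    /** Runs read -> process -> write in chunks of chunkSize; returns the number of writer calls. */
    static <I, O> int run(SimpleReader<I> reader, SimpleProcessor<I, O> processor,
                          SimpleWriter<O> writer, int chunkSize) {
        int writes = 0;
        while (true) {
            // 1. Read up to chunkSize items.
            List<I> inputs = new ArrayList<>();
            I item;
            while (inputs.size() < chunkSize && (item = reader.read()) != null) {
                inputs.add(item);
            }
            if (inputs.isEmpty()) {
                return writes; // nothing left to read: the step is finished
            }
            // 2. Process each item; null results are dropped.
            List<O> outputs = new ArrayList<>();
            for (I in : inputs) {
                O out = processor.process(in);
                if (out != null) {
                    outputs.add(out);
                }
            }
            // 3. Hand the surviving items of this chunk to the writer in a single call.
            writer.write(outputs);
            writes++;
        }
    }

    /** Convenience reader over an in-memory list. */
    static <T> SimpleReader<T> fromList(List<T> data) {
        Iterator<T> it = data.iterator();
        return () -> it.hasNext() ? it.next() : null;
    }
}
```

With a chunk size of 3, seven input items produce three writer calls. Items for which the processor returns null never reach the writer.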
Below is the main application class that starts everything:
```java
import org.springframework.batch.core.configuration.annotation.EnableBatchProcessing;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.context.ApplicationContext;
import org.springframework.data.jpa.repository.config.EnableJpaAuditing;
import org.springframework.scheduling.annotation.EnableAsync;
import org.springframework.scheduling.annotation.EnableScheduling;

import java.util.TimeZone;

@EnableAsync
@EnableScheduling      // required for the @Scheduled job trigger
@EnableJpaAuditing
@EnableBatchProcessing // turns on Spring Batch (JobBuilderFactory, StepBuilderFactory, JobLauncher, ...)
@SpringBootApplication
public class ImportApplication {
    public static void main(String[] args) {
        // Run the whole JVM in the Asia/Shanghai time zone before Spring starts
        TimeZone.setDefault(TimeZone.getTimeZone("Asia/Shanghai"));
        ApplicationContext context = SpringApplication.run(ImportApplication.class, args);
    }
}
```
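@EnableBatchProcessing turns on Spring Batch. If you don't have the batch metadata tables yet, they are created in your default DataSource. In Spring Boot this is done by the batch auto-configuration, and in Spring Boot 2 it is controlled by the spring.batch.initialize-schema property.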
Now for the code. The class below is a very basic job configuration:
```java
package com.ps.batch.schedule.config;

import com.ps.batch.batch.serivce.ImportFinanceService;
import com.ps.batch.constant.ImportFinanceConstant;
import com.ps.batch.dto.batch.ImportFinanceDto;
import com.ps.batch.dto.batch.ImportUserDto;
import com.ps.batch.schedule.BatchJobCompletionListener;
import com.ps.batch.schedule.processor.ImportFinanceDtoProcessor;
import com.ps.batch.schedule.processor.ImportUserDtoProcessor;
import com.ps.batch.schedule.reader.ImportFinanceDtoReader;
import com.ps.batch.schedule.reader.ImportUserDtoReader;
import com.ps.batch.schedule.writer.ImportFinanceDtoWriter;
import com.ps.batch.schedule.writer.ImportUserDtoWriter;
import com.ps.batch.service.JobService;
import com.ps.batch.service.MemberTagService;
import com.ps.batch.service.NotifyService;
import lombok.Data;
import lombok.extern.slf4j.Slf4j;
import org.springframework.batch.core.Job;
import org.springframework.batch.core.JobParameters;
import org.springframework.batch.core.JobParametersBuilder;
import org.springframework.batch.core.Step;
import org.springframework.batch.core.configuration.annotation.JobBuilderFactory;
import org.springframework.batch.core.configuration.annotation.StepBuilderFactory;
import org.springframework.batch.core.launch.support.RunIdIncrementer;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Configuration;
import org.springframework.core.task.TaskExecutor;
import org.springframework.scheduling.annotation.Scheduled;

/**
 * Member tagging job
 */
@Data
@Slf4j
@Configuration
public class BatchJobMemberTag {
    @Autowired
    private JobBuilderFactory jobBuilderFactory;
    @Autowired
    private StepBuilderFactory stepBuilderFactory;
    @Autowired
    private BatchJobCompletionListener batchJobCompletionListener;
    @Autowired
    private ImportFinanceService importFinanceService;
    @Autowired
    private MemberTagService memberTagService;
    // Non-ASCII job name ("gym member tagging job"); Base64-encoded before it goes into JobParameters
    private String jobName = "健身房 会员 打標 作業";
    @Value("${membertag.api.notify.mailto}")
    private String membertagApiNotifyMailto;
    @Autowired
    private JobService jobService;
    @Autowired
    private ImportFinanceDtoReader importFinanceDtoReader;
    @Autowired
    private ImportFinanceDtoProcessor importFinanceDtoProcessor;
    @Autowired
    private ImportFinanceDtoWriter importFinanceDtoWriter;
    @Autowired
    private ImportUserDtoReader importUserDtoReader;
    @Autowired
    private ImportUserDtoProcessor importUserDtoProcessor;
    @Autowired
    private ImportUserDtoWriter importUserDtoWriter;
    @Autowired
    private NotifyService notifyService;
    @Autowired
    @Qualifier("threadPoolTaskExecutor")
    private TaskExecutor taskExecutor;

    // First run 1 s after startup, then 10 minutes after each previous run finishes
    @Scheduled(initialDelay = 1 * 1000, fixedDelay = 10 * 60 * 1000)
    public void jobRun() {
        try {
            JobParametersBuilder jobParametersBuilder = new JobParametersBuilder();
            jobParametersBuilder.addLong("time", System.currentTimeMillis());
            jobParametersBuilder.addString("jobName", jobService.encodeToBase64Str(jobName));
            jobParametersBuilder.addString("mailTo", jobService.encodeToBase64Str(membertagApiNotifyMailto));
            JobParameters jobParameters = jobParametersBuilder.toJobParameters();
            jobService.runJob(exportMemberTag(), jobParameters, jobName);
        } catch (Exception e) {
            notifyService.notifyError("Abnormal termination notice", "Launch failed=" + e.getMessage(), jobName, null);
        }
    }

    public Job exportMemberTag() throws Exception {
        return jobBuilderFactory.get("exportTag")
                .incrementer(new RunIdIncrementer())
                .start(step_exportMemberTag())
                .next(step_exportImportFinanceDtoTag())
                .listener(batchJobCompletionListener)
                .build();
    }

    public Step step_exportImportFinanceDtoTag() throws Exception {
        return stepBuilderFactory.get("exportImportFinanceTag")
                .<ImportFinanceDto, ImportFinanceDto>chunk(100)
                .reader(importFinanceDtoReader.getUnTaggedImportFinance())
                .processor(importFinanceDtoProcessor.sendTag())
                .writer(importFinanceDtoWriter.updateTaggedWriter(ImportFinanceConstant.membertagged_tagged))
                .taskExecutor(taskExecutor)
                .throttleLimit(10)
                .build();
    }

    public Step step_exportMemberTag() throws Exception {
        return stepBuilderFactory.get("exportImportUserTag")
                .<ImportUserDto, ImportUserDto>chunk(100)
                .reader(importUserDtoReader.findUnTaggedImportUser())
                .processor(importUserDtoProcessor.sendTag())
                .writer(importUserDtoWriter.updateMembertaggedIsTrue())
                .taskExecutor(taskExecutor)
                .throttleLimit(10)
                .build();
    }
}
```
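Start with @Scheduled. It comes from Spring Framework's scheduling support (org.springframework.scheduling.annotation, switched on by @EnableScheduling), not from Spring Batch. You can configure it with a cron expression or with initialDelay and fixedDelay. Note that by default Spring runs @Scheduled methods on a single scheduler thread, and the default JobLauncher runs the job synchronously on that same thread. With fixedDelay, the delay is counted from the end of the previous run, so the next run only starts after the current one has finished.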
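You can reproduce the fixedDelay behaviour with the JDK alone. This sketch uses plain java.util.concurrent, not Spring's scheduler, and FixedDelayDemo is a hypothetical name. A single-threaded ScheduledExecutorService with scheduleWithFixedDelay behaves like @Scheduled(fixedDelay = ...): it measures the delay from the end of one run to the start of the next. As a result, runs never overlap, even when a run takes longer than the delay.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

final class FixedDelayDemo {
    /**
     * Schedules a slow "job" (runMillis) with a short fixed delay (delayMillis),
     * waits for `runs` executions and returns the highest number of executions
     * that were ever in progress at the same time.
     */
    static int maxConcurrentRuns(int runs, long runMillis, long delayMillis) {
        ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor();
        AtomicInteger inProgress = new AtomicInteger();
        AtomicInteger maxSeen = new AtomicInteger();
        CountDownLatch done = new CountDownLatch(runs);
        scheduler.scheduleWithFixedDelay(() -> {
            maxSeen.accumulateAndGet(inProgress.incrementAndGet(), Math::max);
            try {
                Thread.sleep(runMillis); // simulate a job that outlasts the delay
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            } finally {
                inProgress.decrementAndGet();
                done.countDown();
            }
        }, 0, delayMillis, TimeUnit.MILLISECONDS);
        try {
            done.await(10, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        } finally {
            scheduler.shutdownNow();
        }
        return maxSeen.get();
    }
}
```

Even with a 50 ms job and a 10 ms delay, maxConcurrentRuns(3, 50, 10) reports that only one run was ever in progress.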
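When launching the job, I use JobParametersBuilder to pass the values the job needs at run time. Examples are the job name jobName and the notification recipients mailTo.
Note that Spring Batch stores these parameters in the batch_job_execution_params table. In my setup, putting Chinese text into them made the insert fail with an exception, so I Base64-encode them before passing them in. The time parameter (the current timestamp) makes each launch's parameters unique. Because of it, every run creates a new JobInstance instead of being rejected as an instance that has already completed.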
```java
JobParametersBuilder jobParametersBuilder = new JobParametersBuilder();
jobParametersBuilder.addLong("time", System.currentTimeMillis());
jobParametersBuilder.addString("jobName", jobService.encodeToBase64Str(jobName));
jobParametersBuilder.addString("mailTo", jobService.encodeToBase64Str(membertagApiNotifyMailto));
JobParameters jobParameters = jobParametersBuilder.toJobParameters();
```
The rows in batch_job_execution_params look roughly like this:
| JOB_EXECUTION_ID | TYPE_CD | KEY_NAME | STRING_VAL | DATE_VAL | LONG_VAL | DOUBLE_VAL | IDENTIFYING |
|---|---|---|---|---|---|---|---|
| 128992 | LONG | time | | 1970-01-01 08:00:00 | 1521009519606 | 0 | Y |
| 128992 | STRING | jobName | 5YGl6Lqr5oi/IOWinumHjyDkvJrlkZgg5ZCM5q2l | 1970-01-01 08:00:00 | 0 | 0 | Y |
| 128993 | LONG | time | | 1970-01-01 08:00:00 | 1521009530611 | 0 | Y |
| 128993 | STRING | jobName | 5YGl6Lqr5oi/IOWinumHjyDorqLljZUg5ZCM5q2lKDI05bCP5pmCKQ== | 1970-01-01 08:00:00 | 0 | 0 | Y |
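The article doesn't show jobService.encodeToBase64Str. The helper below is a hypothetical equivalent that assumes it Base64-encodes the UTF-8 bytes of the string. The decode direction is what a listener or notifier would need when reading the value back, for example from jobParameters.getString("jobName"). Under that assumption, the STRING_VAL prefix 5YGl6Lqr5oi/ in the table decodes to the first three characters of the job name, 健身房.

```java
import java.nio.charset.StandardCharsets;
import java.util.Base64;

// Hypothetical helper: assumes the job parameters are Base64 over UTF-8 bytes.
final class JobParamCodec {
    /** Encodes to pure ASCII so the value can be stored in batch_job_execution_params. */
    static String encode(String value) {
        return Base64.getEncoder().encodeToString(value.getBytes(StandardCharsets.UTF_8));
    }

    /** Restores the original (possibly Chinese) text, e.g. before putting it in a notification. */
    static String decode(String encoded) {
        return new String(Base64.getDecoder().decode(encoded), StandardCharsets.UTF_8);
    }
}
```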
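So what does my jobService do?

```java
jobService.runJob(exportMemberTag(), jobParameters, jobName);
```

It is essentially a master on/off switch for launching jobs.
Sooner or later you have to deploy a new version. For that I added a switch: before upgrading, turn it off so no new jobs start, wait until the running jobs have finished, then shut down and swap the jar. That is safer than stopping jobs in the middle of a transaction, which is a pain to clean up afterwards XD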
```java
public void runJob(Job job, JobParameters jobParameters, String jobName) {
    // Master switch: only launch new jobs while batch processing is enabled
    if (Boolean.TRUE.equals(BatchConstant.batchEnable)) {
        try {
            jobLauncher.run(job, jobParameters);
        } catch (Exception e) {
            log.error("Failed to launch job {}", jobName, e);
            notifyService.notifyError("Job launch failure notice", e.getMessage(), jobName, null);
        }
    } else {
        log.warn("Job paused, not launched: {}", jobName);
    }
}
```
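The article doesn't show BatchConstant.batchEnable. One way to make such a switch safe to flip at runtime is an AtomicBoolean, together with a counter of running jobs that tells you when it is safe to shut down. The following is a minimal sketch. LaunchGate and its methods are hypothetical names, and the Runnable stands in for jobLauncher.run(job, jobParameters).

```java
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical replacement for a static BatchConstant.batchEnable flag.
final class LaunchGate {
    private final AtomicBoolean enabled = new AtomicBoolean(true);
    private final AtomicInteger running = new AtomicInteger();

    void pause()  { enabled.set(false); } // stop launching new jobs before a deployment
    void resume() { enabled.set(true); }

    /** Runs the job only while enabled; returns false if the launch was skipped. */
    boolean runJob(Runnable job) {
        running.incrementAndGet(); // count first so safeToShutdown() never misses a launch in progress
        try {
            if (!enabled.get()) {
                return false; // paused: the article logs a warning at this point
            }
            job.run(); // stands in for jobLauncher.run(job, jobParameters)
            return true;
        } finally {
            running.decrementAndGet();
        }
    }

    /** True once paused and no job is still running, i.e. it is safe to stop the JVM and swap the jar. */
    boolean safeToShutdown() {
        return !enabled.get() && running.get() == 0;
    }
}
```

The running counter is incremented before the flag is checked. Because of this ordering, a launch that races with pause() either sees the flag and backs out, or is still counted when safeToShutdown() looks.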
Next, let's see how a job is put together:
```java
public Job exportMemberTag() throws Exception {
    return jobBuilderFactory.get("exportTag")
            .incrementer(new RunIdIncrementer())
            .start(step_exportMemberTag())
            .next(step_exportImportFinanceDtoTag())
            .listener(batchJobCompletionListener)
            .build();
}
```
The factory object jobBuilderFactory assembles your Steps in order (start(...) first, then next(...)) and attaches the listener.
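Conceptually, start(...).next(...) builds an ordered list of steps. The job runs them in sequence and stops at the first step that fails, and the listener is called before and after the job. The following is a plain-Java model of that flow. JobModel and JobListener are hypothetical names, and the real framework additionally persists each step's execution in its metadata tables.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.BooleanSupplier;

// Hypothetical mirror of a job listener's before/after callbacks.
interface JobListener {
    void beforeJob();
    void afterJob(String status);
}

final class JobModel {
    private final List<String> names = new ArrayList<>();
    private final List<BooleanSupplier> steps = new ArrayList<>();
    private final JobListener listener;

    JobModel(JobListener listener) { this.listener = listener; }

    /** Like start(...) / next(...): appends a step; the supplier returns false when the step fails. */
    JobModel next(String name, BooleanSupplier step) {
        names.add(name);
        steps.add(step);
        return this;
    }

    /** Runs the steps in order, stopping at the first failure; returns the final status. */
    String run(List<String> executed) {
        listener.beforeJob();
        String status = "COMPLETED";
        for (int i = 0; i < steps.size(); i++) {
            executed.add(names.get(i));
            if (!steps.get(i).getAsBoolean()) {
                status = "FAILED"; // the remaining steps are never run
                break;
            }
        }
        listener.afterJob(status);
        return status;
    }
}
```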
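Now let's see how a Step is assembled, using the second step, step_exportImportFinanceDtoTag, as the example (step_exportMemberTag is built the same way):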
```java
public Step step_exportImportFinanceDtoTag() throws Exception {
    return stepBuilderFactory.get("exportImportFinanceTag")
            .<ImportFinanceDto, ImportFinanceDto>chunk(100)
            .reader(importFinanceDtoReader.getUnTaggedImportFinance())
            .processor(importFinanceDtoProcessor.sendTag())
            .writer(importFinanceDtoWriter.updateTaggedWriter(ImportFinanceConstant.membertagged_tagged))
            .taskExecutor(taskExecutor)
            .throttleLimit(10)
            .build();
}
```
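This is also assembled through a factory, stepBuilderFactory. chunk(100) sets how many items are read and processed before they are handed to the writer together and committed in one transaction.
The generic types in front of chunk, <ImportFinanceDto, ImportFinanceDto>, are the input type the reader produces and the output type the processor passes to the writer.
taskExecutor chooses which thread pool runs the chunks. You can skip it for now, because using it requires other configuration as well.
throttleLimit caps how many chunks are processed concurrently on that pool. With a value of 10, up to 10 threads each work on their own chunk at the same time (the default is 4). It has to be configured together with taskExecutor above.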
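A thread pool plus a Semaphore roughly models how taskExecutor and throttleLimit interact. At most throttleLimit chunks are in flight at once, no matter how many threads the pool has. This is a plain-Java illustration of the idea, not Spring Batch's internal implementation. ThrottleDemo and its parameters are hypothetical.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

final class ThrottleDemo {
    /**
     * Submits `chunks` chunk tasks to a pool of `poolSize` threads, letting at most
     * `throttleLimit` run concurrently. Returns {maxConcurrent, completed}.
     */
    static int[] run(int chunks, int poolSize, int throttleLimit) {
        ExecutorService pool = Executors.newFixedThreadPool(poolSize);
        Semaphore permits = new Semaphore(throttleLimit);
        AtomicInteger inFlight = new AtomicInteger();
        AtomicInteger maxSeen = new AtomicInteger();
        AtomicInteger completed = new AtomicInteger();
        try {
            for (int i = 0; i < chunks; i++) {
                permits.acquire(); // block the submitter until one of the throttleLimit slots is free
                pool.execute(() -> {
                    try {
                        maxSeen.accumulateAndGet(inFlight.incrementAndGet(), Math::max);
                        Thread.sleep(5); // simulate processing one chunk of items
                        completed.incrementAndGet();
                    } catch (InterruptedException e) {
                        Thread.currentThread().interrupt();
                    } finally {
                        inFlight.decrementAndGet();
                        permits.release(); // free the slot for the next chunk
                    }
                });
            }
            pool.shutdown();
            pool.awaitTermination(30, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        } finally {
            pool.shutdownNow();
        }
        return new int[] {maxSeen.get(), completed.get()};
    }
}
```

Even with 16 threads in the pool, ThrottleDemo.run(40, 16, 4) never has more than 4 chunks running at the same time.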
Next, let's see how the reader is written:
```java
public ItemReader<ImportFinanceDto> getUnTaggedImportFinance() {
    JdbcCursorItemReader<ImportFinanceDto> reader = new JdbcCursorItemReader<ImportFinanceDto>();
    reader.setSql("select * from importfinance where membertagged = 0 order by orgcreatetime ");
    reader.setDataSource(batchDataSource);
    reader.setRowMapper(this.getImportFinanceDtoRowMapper());
    return reader;
}

private RowMapper<ImportFinanceDto> getImportFinanceDtoRowMapper() {
    return new RowMapper<ImportFinanceDto>() {
        @Override
        public ImportFinanceDto mapRow(ResultSet resultSet, int i) throws SQLException {
            if (!(resultSet.isAfterLast()) && !(resultSet.isBeforeFirst())) {
                ImportFinanceDto importFinanceDto = new ImportFinanceDto();
                importFinanceDto.setSerid(resultSet.getLong("serid"));
                importFinanceDto.setPlatform(resultSet.getString("platform"));
                importFinanceDto.setPpk(resultSet.getString("ppk"));
                importFinanceDto.setRulecodes(resultSet.getString("rulecodes"));
                importFinanceDto.setSaleamount(resultSet.getDouble("saleamount"));
                importFinanceDto.setTradeamount(resultSet.getDouble("tradeamount"));
                importFinanceDto.setSource(resultSet.getLong("source"));
                importFinanceDto.setSourcetype(resultSet.getInt("sourcetype"));
                importFinanceDto.setOuterid(resultSet.getString("outerid"));
                importFinanceDto.setOutertype(resultSet.getInt("outertype"));
                importFinanceDto.setTerminusUserId(resultSet.getLong("terminususerid"));
                importFinanceDto.setExported(resultSet.getInt("exported"));
                importFinanceDto.setCardtype(resultSet.getString("cardtype"));
                importFinanceDto.setOrgcreatetime(resultSet.getLong("orgcreatetime"));
                importFinanceDto.setMembertagged(resultSet.getInt("membertagged"));
                importFinanceDto.setCreateddate(resultSet.getDate("createddate"));
                importFinanceDto.setLastmodifieddate(resultSet.getDate("lastmodifieddate"));
                return importFinanceDto;
            } else {
                log.info("Returning null from rowMapper");
                return null;
            }
        }
    };
}
```
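That is the most basic and commonly used ItemReader. JdbcCursorItemReader is not thread-safe. If you need to read with multiple threads, the simple option is to add synchronized, like this: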
```java
public ItemReader<ImportFinanceDto> getUnExportImportFinance() {
    JdbcCursorItemReader<ImportFinanceDto> reader = new JdbcCursorItemReader<ImportFinanceDto>() {
        // Only one thread at a time may advance the shared cursor
        @Override
        public synchronized ImportFinanceDto read() throws Exception {
            return super.read();
        }
    };
    reader.setSql("select * from importfinance where exported = 0 order by orgcreatetime ");
    reader.setDataSource(batchDataSource);
    reader.setRowMapper(this.getImportFinanceDtoRowMapper());
    return reader;
}
```
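Making read() synchronized works because each call then hands exactly one row to exactly one thread. The following shows the same idea as a generic decorator in plain Java. SynchronizedReader is a hypothetical name; Spring Batch 3.0 and later ship a comparable SynchronizedItemStreamReader in org.springframework.batch.item.support. A shared cursor's restart position has no clear meaning across threads, so readers used this way are usually configured with setSaveState(false).

```java
import java.util.Iterator;
import java.util.function.Supplier;

// Hypothetical decorator: makes a non-thread-safe reader safe to share between threads.
final class SynchronizedReader<T> {
    private final Supplier<T> delegate; // returns null when the input is exhausted

    SynchronizedReader(Supplier<T> delegate) {
        this.delegate = delegate;
    }

    /** Only one thread at a time may advance the underlying cursor. */
    synchronized T read() {
        return delegate.get();
    }

    /** Wraps a plain (non-thread-safe) iterator as a reader that returns null at the end. */
    static <T> SynchronizedReader<T> over(Iterator<T> it) {
        return new SynchronizedReader<>(() -> it.hasNext() ? it.next() : null);
    }
}
```

When four threads drain one reader, every row is read exactly once. No row is lost and no row is handed out twice.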
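For more efficient reading, you can switch to JdbcPagingItemReader, which reads the database page by page. It is also thread-safe, so it works in a multi-threaded step without the synchronized wrapper. Here is another example: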
```java
public ItemReader<ImportUserDto> findNewImportUserThreadSafe() {
    JdbcPagingItemReader<ImportUserDto> jdbcPagingItemReader = new JdbcPagingItemReader<>();
    jdbcPagingItemReader.setDataSource(batchDataSource);
    jdbcPagingItemReader.setFetchSize(10);
    jdbcPagingItemReader.setPageSize(100);
    try {
        jdbcPagingItemReader.setQueryProvider(new MySqlPagingQueryProvider() {{
            setSelectClause("select * ");
            setFromClause("from importusers");
            setWhereClause("isnew = :isnew");
            // Pages are ordered by this key, so it should be unique
            setSortKeys(new HashMap<String, Order>() {{
                put("serid", Order.ASCENDING);
            }});
        }});
        jdbcPagingItemReader.setParameterValues(new HashMap<String, Object>() {{
            put("isnew", "1");
        }});
        jdbcPagingItemReader.setRowMapper(this.getImportUserDtoRowMapper());
        // Required: this sets up the reader's internal JdbcTemplate; without it there is nothing to query with
        jdbcPagingItemReader.afterPropertiesSet();
    } catch (Exception e) {
        e.printStackTrace();
    }
    return jdbcPagingItemReader;
}
```
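As I understand it, JdbcPagingItemReader doesn't page with OFFSET. After the first page, it asks for rows whose sort key is greater than the last key it saw (here, serid greater than the last serid). This is why the sort key should be unique, and why updating rows that were already read does not shift later pages the way OFFSET paging can. The in-memory sketch below mimics that keyset strategy over a list. KeysetPager is a hypothetical name, and the "query" is a stream filter rather than SQL.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.stream.Collectors;

final class KeysetPager {
    /**
     * Reads every key in pages of pageSize using "key > lastKey ORDER BY key LIMIT pageSize".
     * Returns the pages in the order they were read.
     */
    static List<List<Long>> readAll(List<Long> table, int pageSize) {
        List<List<Long>> pages = new ArrayList<>();
        Long lastKey = null; // nothing read yet: the first query has no key condition
        while (true) {
            final Long after = lastKey;
            List<Long> page = table.stream()
                    .filter(k -> after == null || k > after) // WHERE serid > :lastSerid
                    .sorted()                                 // ORDER BY serid ASC
                    .limit(pageSize)                          // LIMIT pageSize
                    .collect(Collectors.toList());
            if (page.isEmpty()) {
                return pages;
            }
            pages.add(page);
            lastKey = page.get(page.size() - 1); // remember the last key of this page
        }
    }
}
```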
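Next comes the processor, the middle stage. Here you can filter items, or convert them into another object for the writer. If you return null, the item is filtered out and never reaches the writer.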
```java
public ItemProcessor<ImportFinanceDto, ImportFinanceDto> sendTag() {
    return new ItemProcessor<ImportFinanceDto, ImportFinanceDto>() {
        @Override
        public ImportFinanceDto process(ImportFinanceDto importFinanceDto) throws Exception {
            ImportFinanceDto return_obj = null;
            MemberTagCreateResult memberTagCreateResult = null;
            try {
                // Create the tag record
                memberTagCreateResult = memberTagService.tagCreate(importFinanceDto);
                // Everything succeeded, so pass the item on to the writer
                return_obj = importFinanceDto;
            } catch (Exception e) {
                log.error("Unknown error in the tagging system, UnknownException", e);
                bacthJobEvents.appendError("Unknown error in the tagging system: " + e.getMessage());
            }
            // null means "filtered": the item is not written and stays untagged
            return return_obj;
        }
    };
}
```
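In Spring Batch, a processor that returns null filters the item. The item is counted in the step execution's filter count and never reaches the writer. A row whose tagging failed therefore keeps membertagged = 0, and the next scheduled run reads it again. The following plain-Java model shows that bookkeeping. FilterModel is a hypothetical name, and the wrapped Function stands in for the tagging call inside sendTag().

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Function;

final class FilterModel {
    final List<String> written = new ArrayList<>();
    int filterCount;

    /** Mirrors sendTag(): an exception from the tagging call becomes null instead of failing the step. */
    static Function<String, String> swallowErrors(Function<String, String> tagCall) {
        return item -> {
            try {
                return tagCall.apply(item);
            } catch (RuntimeException e) {
                return null; // the article also logs and records the error at this point
            }
        };
    }

    /** Processes one chunk: null results are counted as filtered, the rest are "written". */
    void processChunk(List<String> chunk, Function<String, String> processor) {
        for (String item : chunk) {
            String out = processor.apply(item);
            if (out == null) {
                filterCount++;    // not written, so the row keeps membertagged = 0
            } else {
                written.add(out); // stands in for the JdbcBatchItemWriter update
            }
        }
    }
}
```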
Finally, the writer receives the results and updates the database:
```java
public ItemWriter<ImportFinanceDto> updateTaggedWriter(Integer membertagged) {
    JdbcBatchItemWriter<ImportFinanceDto> databaseItemWriter = new JdbcBatchItemWriter<ImportFinanceDto>();
    databaseItemWriter.setDataSource(batchDataSource);
    databaseItemWriter.setSql("update importfinance set membertagged = ?, lastmodifieddate = ? where serid = ? ");
    databaseItemWriter.setItemPreparedStatementSetter(updateTaggedWriterSetter(membertagged));
    return databaseItemWriter;
}

private ItemPreparedStatementSetter<ImportFinanceDto> updateTaggedWriterSetter(Integer membertagged) {
    // Fill the three ? placeholders, in order, for each item in the chunk
    return (importFinance, ps) -> {
        ps.setInt(1, membertagged);
        ps.setTimestamp(2, new Timestamp(System.currentTimeMillis()));
        ps.setLong(3, importFinance.getSerid());
    };
}
```
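The writer above uses PreparedStatement-style ? placeholders. When there are many columns, it's easy to lose track of which index is which. Instead, you can use named parameters prefixed with :, as in Hibernate-style SQL. In the example below, the values come straight from the properties of InLeftDto: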
```java
public ItemWriter<InLeftDto> saveInLeft() {
    JdbcBatchItemWriter<InLeftDto> databaseItemWriter = new JdbcBatchItemWriter<InLeftDto>();
    NamedParameterJdbcTemplate jdbcTemplate = new NamedParameterJdbcTemplate(erpDataSource);
    databaseItemWriter.setJdbcTemplate(jdbcTemplate);
    databaseItemWriter.setSql("REPLACE into T_INLEFT ( VC_CODE, VC_CLUB) values ( :vcCode, :vcClub)");
    // :vcCode / :vcClub are filled from InLeftDto's getVcCode() / getVcClub()
    ItemSqlParameterSourceProvider<InLeftDto> paramProvider =
            new BeanPropertyItemSqlParameterSourceProvider<InLeftDto>();
    databaseItemWriter.setItemSqlParameterSourceProvider(paramProvider);
    databaseItemWriter.afterPropertiesSet();
    return databaseItemWriter;
}
```
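Under the hood, NamedParameterJdbcTemplate rewrites each :name placeholder into ? and builds the argument list in matching order. BeanPropertyItemSqlParameterSourceProvider supplies the values from the bean's getters. The simplified converter below illustrates the first half of that. NamedSql is a hypothetical class, and unlike Spring's real parser it does not handle quoted strings, comments or :: casts.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

final class NamedSql {
    private static final Pattern NAMED = Pattern.compile(":([A-Za-z_][A-Za-z0-9_]*)");

    final String sql;        // SQL with ? placeholders
    final List<Object> args; // values in placeholder order

    private NamedSql(String sql, List<Object> args) {
        this.sql = sql;
        this.args = args;
    }

    /** Replaces each :name with ? and collects the values from params in the same order. */
    static NamedSql parse(String namedSql, Map<String, ?> params) {
        Matcher m = NAMED.matcher(namedSql);
        StringBuffer out = new StringBuffer();
        List<Object> args = new ArrayList<>();
        while (m.find()) {
            String name = m.group(1);
            if (!params.containsKey(name)) {
                throw new IllegalArgumentException("No value supplied for :" + name);
            }
            args.add(params.get(name));
            m.appendReplacement(out, "?");
        }
        m.appendTail(out);
        return new NamedSql(out.toString(), args);
    }
}
```

With named parameters, the order of the ? values is derived from the SQL text itself. This is what removes the index bookkeeping of the earlier writer.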
And that is a basic Spring Batch job.