转载

Spring Batch(4)——Item概念及使用代码 原 荐

在 批处理概念 中介绍一个标准的批处理分为 Job 和 Step 。本文将结合代码介绍在 StepReaderProcessorWriter 的实际使用。

Reader

Reader 是指从各种各样的外部输入中获取数据,框架为获取各种类型的文件已经预定义了常规的 Reader 实现类。 Reader 通过 ItemReader 接口实现:

public interface ItemReader<T> {
    T read() throws Exception, UnexpectedInputException, ParseException, NonTransientResourceException;
}

read 方法的作用就是读取一条数据,数据以泛型T的实体结构返回, 当read返回null时表示所有数据读取完毕 。返回的数据可以是任何结构,比如文件中的一行字符串,数据库的一行数据,或者xml文件中的一系列元素,只要是一个Java对象即可。

Writer

Writer 通过 ItemWriter 接口实现:

public interface ItemWriter<T> {
    void write(List<? extends T> items) throws Exception;
}

WriterReader 的反向操作,是将数据写入到特定的数据源中。在 Step控制 一文已经介绍 Writer 是根据 chunk 属性设定的值按列表进行操作的,所以传入的是一个 List 结构。 chunk 用于表示批处理的事物分片,因此需要注意的是,在 writer 方法中进行完整数据写入事物操作。例如向数据库写入 List 中的数据,在写入完成之后再提交事物。

读写的组合模式

无论是读还是写,有时会需要从多个不同的来源获取文件,或者写入到不同的数据源,或者是需要在读和写之间处理一些业务。可以使用组合模式来实现这个目的:

public class CompositeItemWriter<T> implements ItemWriter<T> {
    ItemWriter<T> itemWriter;
    public CompositeItemWriter(ItemWriter<T> itemWriter) {
        this.itemWriter = itemWriter;
    }

    public void write(List<? extends T> items) throws Exception {
        //Add business logic here
       itemWriter.write(items);
    }

    public void setDelegate(ItemWriter<T> itemWriter){
        this.itemWriter = itemWriter;
    }
}

Processor

除了使用组合模式,直接使用 Processor 是一种更优雅的方法。 ProcessorStep 中的可选项,但是批处理大部分时候都需要对数据进行处理,因此框架提供了 ItemProcessor 接口来满足 Processor 过程:

public interface ItemProcessor<I, O> {
    O process(I item) throws Exception;
}

Processor 的结构非常简单也是否易于理解。传入一个类型I,然后由 Processor 处理成为O。

Processor链

在一个 Step 中可以使用多个 Processor 来按照顺序处理业务,此时同样可以使用 CompositeItem 模式来实现:

@Bean
public CompositeItemProcessor compositeProcessor() {
    //创建 CompositeItemProcessor
    CompositeItemProcessor<Foo,Foobar> compositeProcessor = new CompositeItemProcessor<Foo,Foobar>();
    List itemProcessors = new ArrayList();
    //添加第一个 Processor
    itemProcessors.add(new FooTransformer());
    //添加第二个 Processor
    itemProcessors.add(new BarTransformer());
    //添加链表
    compositeProcessor.setDelegates(itemProcessors);
    return processor;
}

过滤记录

Reader 读取数据的过程中,并不是所有的数据都可以使用,此时 Processor 还可以用于过滤非必要的数据,同时不会影响 Step 的处理过程。只要 ItemProcesspr 的实现类在 procss 方法中返回 null 即表示改行数据被过滤掉了。

ItemStream

在 Step控制 一文中已经提到了 ItemStream 。在 数据批处理概念 中提到过,Spring Batch的每一步都是无状态的,进而 ReaderWriter 也是无状态的,这种方式能够很好的隔离每行数据的处理,也能将容错的范围收窄到可以空子的范围。但是这并不意味着整个批处理的过程中并不需要控制状态。例如从数据库持续读入或写入数据,每次 ReaderWriter 都单独去申请数据源的链接、维护数据源的状态(打开、关闭等)。因此框架提供了 ItemStream 接口来完善这些操作:

public interface ItemStream {
    void open(ExecutionContext executionContext) throws ItemStreamException;
    void update(ExecutionContext executionContext) throws ItemStreamException;
    void close() throws ItemStreamException;
}

持久化数据

在使用Spring Batch之前需要初始化他的元数据存储(Meta-Data Schema),也就是要将需要用到的表导入到对应的数据库中。当然,Spring Batch支持不使用任何持久化数据库,仅仅将数据放到内存中,不设置 DataSource 即可。

初始化序列

Spring Batch相关的工作需要使用序列 SEQUENCE

CREATE SEQUENCE BATCH_STEP_EXECUTION_SEQ;
CREATE SEQUENCE BATCH_JOB_EXECUTION_SEQ;
CREATE SEQUENCE BATCH_JOB_SEQ;

有些数据库不支持 SEQUENCE ,可以通过表代理,比如在MySql(InnoDB数据库)中:

CREATE TABLE BATCH_STEP_EXECUTION_SEQ (ID BIGINT NOT NULL);
INSERT INTO BATCH_STEP_EXECUTION_SEQ values(0);
CREATE TABLE BATCH_JOB_EXECUTION_SEQ (ID BIGINT NOT NULL);
INSERT INTO BATCH_JOB_EXECUTION_SEQ values(0);
CREATE TABLE BATCH_JOB_SEQ (ID BIGINT NOT NULL);
INSERT INTO BATCH_JOB_SEQ values(0);

关于Version字段

某些表中都有 Version 字段。因为Spring的更新策略是乐观锁,因此在进行数据更新之后都会对表的 Version 字段进行+1处理。在内存与数据库交互的过程中,会使用采用 getVersionincreaseVersion(+1)updateDataAndVersion 的过程,如果在 update 的时候发现Version不是预计的数值(+1),则会抛出 OptimisticLockingFailureException 的异常。当同一个 Job 在进群中不同服务上执行时,需要注意这个问题。

BATCH_JOB_INSTANCE

BATCH_JOB_INSTANCE 用于记录JobInstance,在 数据批处理概念 中介绍了他的工作方式,其结构为:

CREATE TABLE BATCH_JOB_INSTANCE  (
  JOB_INSTANCE_ID BIGINT  PRIMARY KEY ,
  VERSION BIGINT,
  JOB_NAME VARCHAR(100) NOT NULL ,
  JOB_KEY VARCHAR(2500)
);
字段 说明
JOB_INSTANCE_ID 主键,主键与单个 JobInstance 相关。当获取到某个 JobInstance 实例后,通过 getId 方法可以获取到此数据
VERSION
JOB_NAME Job的名称,用于标记运行的Job,在创建Job时候指定
JOB_KEY JobParameters 的序列化数值。在 数据批处理概念 中介绍了一个 JobInstance 相当于 Job + JobParameters 。他用于标记同一个 Job 不同的实例

BATCH_JOB_EXECUTION_PARAMS

BATCH_JOB_EXECUTION_PARAMS 对应的是 JobParameters 对象。其核心功能是存储 Key-Value 结构的各种状态数值。字段中 IDENTIFYING=true 用于标记那些运行过程中必须的数据(可以理解是框架需要用到的数据),为了存储 key-value 结构该表一个列数据格式:

CREATE TABLE BATCH_JOB_EXECUTION_PARAMS  (
	JOB_EXECUTION_ID BIGINT NOT NULL ,
	TYPE_CD VARCHAR(6) NOT NULL ,
	KEY_NAME VARCHAR(100) NOT NULL ,
	STRING_VAL VARCHAR(250) ,
	DATE_VAL DATETIME DEFAULT NULL ,
	LONG_VAL BIGINT ,
	DOUBLE_VAL DOUBLE PRECISION ,
	IDENTIFYING CHAR(1) NOT NULL ,
	constraint JOB_EXEC_PARAMS_FK foreign key (JOB_EXECUTION_ID)
	references BATCH_JOB_EXECUTION(JOB_EXECUTION_ID)
);
字段 说明
JOB_EXECUTION_ID BATCH_JOB_EXECUTION 表关联的外键,详见 数据批处理概念 中 JobJobInstanceJobExecute 的关系
TYPE_CD 用于标记数据的对象类型,例如 stringdatelongdouble ,非空
KEY_NAME key的值
STRING_VAL string 类型的数值
DATE_VAL date 类型的数值
LONG_VAL long 类型的数值
DOUBLE_VAL double 类型的数值
IDENTIFYING 标记这对 key-valuse 是否来自于 JobInstace 自身

BATCH_JOB_EXECUTION

关联 JobExecution ,每当运行一个 Job 都会产生一个新的 JobExecution ,对应的在表中都会新增一行数据。

CREATE TABLE BATCH_JOB_EXECUTION  (
  JOB_EXECUTION_ID BIGINT  PRIMARY KEY ,
  VERSION BIGINT,
  JOB_INSTANCE_ID BIGINT NOT NULL,
  CREATE_TIME TIMESTAMP NOT NULL,
  START_TIME TIMESTAMP DEFAULT NULL,
  END_TIME TIMESTAMP DEFAULT NULL,
  STATUS VARCHAR(10),
  EXIT_CODE VARCHAR(20),
  EXIT_MESSAGE VARCHAR(2500),
  LAST_UPDATED TIMESTAMP,
  JOB_CONFIGURATION_LOCATION VARCHAR(2500) NULL,
  constraint JOB_INSTANCE_EXECUTION_FK foreign key (JOB_INSTANCE_ID)
  references BATCH_JOB_INSTANCE(JOB_INSTANCE_ID)
) ;
字段 说明
JOB_EXECUTION_ID JobExecution 的主键, JobExecution::getId 方法可以获取到该值
VERSION
JOB_INSTANCE_ID 关联到 JobInstace 的外键,详见 数据批处理概念 中 JobJobInstanceJobExecute 的关系
CREATE_TIME 创建时间戳
START_TIME 开始时间戳
END_TIME 结束时间戳,无论成功或失败都会更新这一项数据。如果某行数据该值为空表示运行期间出现错误,并且框架无法更新该值
STATUS JobExecute 的运行状态: COMPLETEDSTARTED 或者其他状态。此数值对应Java中 BatchStatus 枚举值
EXIT_CODE JobExecute 执行完毕之后的退出返回值
EXIT_MESSAGE JobExecute 退出的详细内容,如果是异常退出可能会包括异常堆栈的内容
LAST_UPDATED 最后一次更新的时间戳

BATCH_STEP_EXECUTION

该表对应的是 StepExecution ,其结构和 BATCH_JOB_EXECUTION 基本相似,只是对应的对象是 Step ,增加了与之相对的一些字段数值:

CREATE TABLE BATCH_STEP_EXECUTION  (
  STEP_EXECUTION_ID BIGINT  PRIMARY KEY ,
  VERSION BIGINT NOT NULL,
  STEP_NAME VARCHAR(100) NOT NULL,
  JOB_EXECUTION_ID BIGINT NOT NULL,
  START_TIME TIMESTAMP NOT NULL ,
  END_TIME TIMESTAMP DEFAULT NULL,
  STATUS VARCHAR(10),
  COMMIT_COUNT BIGINT ,
  READ_COUNT BIGINT ,
  FILTER_COUNT BIGINT ,
  WRITE_COUNT BIGINT ,
  READ_SKIP_COUNT BIGINT ,
  WRITE_SKIP_COUNT BIGINT ,
  PROCESS_SKIP_COUNT BIGINT ,
  ROLLBACK_COUNT BIGINT ,
  EXIT_CODE VARCHAR(20) ,
  EXIT_MESSAGE VARCHAR(2500) ,
  LAST_UPDATED TIMESTAMP,
  constraint JOB_EXECUTION_STEP_FK foreign key (JOB_EXECUTION_ID)
  references BATCH_JOB_EXECUTION(JOB_EXECUTION_ID)
) ;

未填入内容部分见 BATCH_JOB_EXECUTION 说明。

字段 说明
STEP_EXECUTION_ID StepExecute 对应的主键
VERSION
STEP_NAME Step 名称
JOB_EXECUTION_ID 关联到 BATCH_JOB_EXECUTION 表的外键,标记该 StepExecute 所属的 JobExecute
START_TIME
END_TIME
STATUS
COMMIT_COUNT 执行过程中,事物提交的次数,该值与数据的规模以及 chunk 的设置有关
READ_COUNT 读取数据的次数
FILTER_COUNT Processor 中过滤记录的次数
WRITE_COUNT 吸入数据的次数
READ_SKIP_COUNT 读数据的跳过次数
WRITE_SKIP_COUNT 写数据的跳过次数
PROCESS_SKIP_COUNT Processor 跳过的次数
ROLLBACK_COUNT 回滚的次数
EXIT_CODE
EXIT_MESSAGE
LAST_UPDATED

BATCH_JOB_EXECUTION_CONTEXT

该表会记录所有与 Job 相关的 ExecutionContext 信息。每个 ExecutionContext 都对应一个 JobExecution ,在运行的过程中它包含了所有 Job 范畴的状态数据,这些数据在执行失败后对于后续处理有中重大意义。

CREATE TABLE BATCH_JOB_EXECUTION_CONTEXT  (
  JOB_EXECUTION_ID BIGINT PRIMARY KEY,
  SHORT_CONTEXT VARCHAR(2500) NOT NULL,
  SERIALIZED_CONTEXT CLOB,
  constraint JOB_EXEC_CTX_FK foreign key (JOB_EXECUTION_ID)
  references BATCH_JOB_EXECUTION(JOB_EXECUTION_ID)
) ;
字段 说明
JOB_EXECUTION_ID 关联到 JobExecution 的外键,建立 JobExecutionExecutionContext 的关系。
SHORT_CONTEXT 标记 SERIALIZED_CONTEXT 的版本号
SERIALIZED_CONTEXT 序列化的 ExecutionContext

BATCH_STEP_EXECUTION_CONTEXT

StepExecutionContext 相关的数据表,结构与 BATCH_JOB_EXECUTION_CONTEXT 完全一样。

表索引建议

上面的所有建表语句都没有提供索引,但是并不代表索引没有价值。当感觉到SQL语句的执行有效率问题时候,可以增加索引。

索引带来的价值取决于SQL查询的频率以及关联关系,下面是Spring Batch框架在运行过程中会用到的一些查询条件语句,用于参考优化索引:

Where条件 执行频率
BATCH_JOB_INSTANCE JOB_NAME = ? and JOB_KEY = ? 每次Job启动执时
BATCH_JOB_EXECUTION JOB_INSTANCE_ID = ? 每次Job重启时
BATCH_EXECUTION_CONTEXT EXECUTION_ID = ? and KEY_NAME = ? chunk 的大小而定
BATCH_STEP_EXECUTION VERSION = ? chunk 的大小而定
BATCH_STEP_EXECUTION STEP_NAME = ? and JOB_EXECUTION_ID = ? 每一个 Step 执行之前

使用案例

下面是Spring Batch一些简单的应用,源码在下列地址的 simple 工程:

  • Gitee: https://gitee.com/chkui-com/spring-batch-sample
  • Github: https://github.com/chkui/spring-batch-sample

Spring Batch提供了2种执行方式:命令行方式或Java内嵌方式。命令行方式是直到需要执行批处理任务的时候才启动程序,内嵌方式是结合Web工程或其他外部化框架来使用。2者最大的差别就是是否直接向IoCs注入一个 Job 实例。

通用基本配置

两种方式的基本配置都是一样的,通过 ReaderProcessorWriter 来组装一个 Step 。代码中 Item 并不涉及文件或数据库的操作,只是简单的模拟数据读取、处理、写入的过程。实体 RecordMsg 用于模拟数据转换,基本配置如下:

public class BatchDefaultConfig {
	@Bean
	//配置Step
	public Step simpleStep(StepBuilderFactory builder, ItemReader<Record> reader, ItemProcessor<Record, Msg> processor,
			ItemWriter<Msg> writer) {
		return builder.get("SimpleStep").<Record, Msg>chunk(10).reader(reader).processor(processor).writer(writer)
				.build();
	}

	@Bean
	//配置 Reader
	public ItemReader<Record> reader() {
		return new ItemReader<Record>() {
			private int count = 0;
			public Record read()
					throws Exception, UnexpectedInputException, ParseException, NonTransientResourceException {
				return ++this.count < 100 ? new Record().setId(this.count).setMsg("Read Number:" + this.count) : null;
			}
		};
	}

	@Bean
	//配置 Processor
	public ItemProcessor<Record, Msg> processor() {
		return new ItemProcessor<Record, Msg>() {
			public Msg process(Record item) throws Exception {
				return new Msg("MSG GET INFO = " + item.getMsg());
			}
		};
	}

	@Bean
	//配置 Writer
	public ItemWriter<Msg> writer() {
		return new ItemWriter<Msg>() {
			private int batchCount = 0;
			public void write(List<? extends Msg> items) throws Exception {
				System.out.println("Batch Count : " + ++batchCount + ". Data:");
				for (Msg msg : items) {
					System.out.println(msg.getInfo());
				}
			}
		};
	}
}

命令行方式运行

有了基本配置之后,命令行运行的方式仅仅是向容器添加一个 Job

@Configuration
//导入依赖配置
@Import({ BatchDefaultConfig.class })
public class BatchCommondConfig {
	@Bean
	public Job simpleJob(Step step, JobBuilderFactory builder) {
		return builder.get("SimpleJob").start(step).build(); //向容器返回一个Job的Bean
	}
}

然后启动Spring Framework则会自动启用Command Runner运行方式运行——先调用 SpringApplication::callRunner 方法,然后使用 JobLauncherCommandLineRunner::execute 运行:

public class CommondSample {
	public static void main(String[] args) throws DuplicateJobException {
		//模拟测试参数, 这些参数值在执行Java时从外部传入的,比如-Dkey=value
		String[] argsExt = new String[2];
		argsExt[0] = "BuilderParam1=Value1";
		argsExt[1] = "BuilderParam2=Value2";
		//运行Spring Framework
		SpringApplication.run(CommondSample.class, argsExt);
	}
}

启用之后观察数据库已经发生了变更。使用命令行需要通过 Java运行参数(-Dkey=value)传递 JobParameters 的数据,上面的代码模拟实现了相关的过程。

Java内嵌运行

Java内嵌的方式主要是用于搭配外部工程化使用,比如使用Web框架或则统一调度平台管之类的结构化框架来统一管理批处理任务。与命令行执行最大的区别就是不向容器注入 Job

@Configuration
//导入进出配置 
@Import({BatchDefaultConfig.class})
public class BatchOperatoConfig {
	@Bean
	//返回JobFactory
	public JobFactory simpleJob(Step step, JobBuilderFactory builder) {
		SimpleJobFactory sampleJobFactory = new SimpleJobFactory();
		sampleJobFactory.setJob(builder.get("SimpleJob").start(step).build());
		return sampleJobFactory;
	}
}

配置代码向容器添加了一个 JobFactory 的实现类, JobFactory 的两个接口一个是获取 Job 一个是获取 Job 的名称, SimpleJobFactory 实现了 JobFactory

public class SimpleJobFactory implements JobFactory {
	private Job job;
	public void setJob(Job job) {
		this.job = job;
	}
	@Override
	public Job createJob() {
		return job;
	}
	@Override
	public String getJobName() {
		return job.getName();
	}
}

最后通过 SimpleJobFactory 来启动一个 Job

@SpringBootApplication
@EnableBatchProcessing
@EnableScheduling
public class OperatorSample {
	public static void main(String[] args) throws DuplicateJobException {
		new SuspendThread().run(); //挂起系统一直运行
		ConfigurableApplicationContext ctx = SpringApplication.run(OperatorSample.class);
		Cron cron = ctx.getBean(Cron.class);
		cron.register(); //注册JobFactory
		cron.runJobLaunch();
	}
}

@Service
class Cron {
	@Autowired
	JobLauncher jobLauncher;

	@Autowired
	private JobOperator jobOperator;

	@Autowired
	private JobRegistry jobRegistry;

	@Autowired
	private JobFactory jobFactory;

	//注册JobFactory
	void register() throws DuplicateJobException {
		jobRegistry.register(jobFactory);
	}

	//使用JobLaunch执行
	void runJobLaunch() {
		Map<String, JobParameter> map = new HashMap<>();
		map.put("Builder", new JobParameter("1"));
		map.put("Timer", new JobParameter("2"));
		jobLauncher.run(jobFactory.createJob(), new JobParameters(map));
	}

	@Scheduled(cron = "30 * * * * ? ")
	void task1() {
		System.out.println("1");
		runOperator();
	}

	//定时任务使用 JobOperator执行
	private void runOperator() {
		jobOperator.start("SimpleJob", "Builder=1,Timer=2");
	}
}

这里使用了2种执行方式: JobLauncherJobOperatorJobLauncher 简单明了的启动一个批处理任务。而 JobOperator 扩展了一些用于 Job 管理的接口方法,观察 JobOperator 的源码可以发现它提供了获取 ExecuteContext 、检查 JobInstance 等功能,如果需要定制开发一个基于Web或者JMX管理批处理任务的系统, JobOperator 更合适。 JobOperator 的第二个参数用于传递 JobParameters ,等号两端分别是 keyvalue ,逗号用于分割多行数据。

在 Job配置与运行 提及过一个 JobInstance 相当于 Job + JobParameters ,因此虽然上面的代码使用了两种不同的运行方式,但是 JobJobParameters 是一样的。在运行被定时任务包裹的 runOperator 方法时,会一直抛出 JobInstanceAlreadyExistsException 异常,因为同一个实例不能运行2次。如果运行失败可以使用对应的 restart 方法。

后续会介绍各种 ReaderWriter 的使用。

原文  https://my.oschina.net/chkui/blog/3071265
正文到此结束
Loading...