Spring并行批处理

Spring Batch提供了可处理大量记录所必需的可重用功能,包括日志记录/跟踪,事务管理,作业处理统计,作业重启,跳过和资源管理等交叉问题。这里展示一个并行运行多个作业的示例,作业彼此独立并以并行方式完成执行。SpringBatch入口概念是Job,一个Job由多个step步骤组成,通过步骤的不同并行方式实现并行批处理,步骤并行模式有以下几个方式:

  • 步骤step是多线程(单个进程

  • 步骤step是并行的(单个过程)

  • 步骤是远程分块(多进程)

  • 步骤分区数据分片(单个或多个过程)

线程步骤

启动并行处理的最简单方法是在Step配置中添加一个TaskExecutor。

@Bean
public TaskExecutor taskExecutor(){
   return new SimpleAsyncTaskExecutor("spring_batch");
}

@Bean
public Step sampleStep(TaskExecutor taskExecutor) {
   return this.stepBuilderFactory.get("sampleStep")
         .<String, String>chunk(10)
         .reader(itemReader())
         .writer(itemWriter())
         .taskExecutor(taskExecutor)
         .build();
}

TaskExecutor
是一个标准的Spring接口,最简单的多线程TaskExecutor是 SimpleAsyncTaskExecutor,上述配置的结果是Step通过在单独的执行线程中进行读取,处理和输出每个块。请注意,这意味着处理条目是没有固定的顺序。线程池默认为4个线程.你增加此限制以确保线程池是充分利用

@Bean
public Step sampleStep(TaskExecutor taskExecutor) {
   return this.stepBuilderFactory.get("sampleStep")
         .<String, String>chunk(10)
         .reader(itemReader())
         .writer(itemWriter())
         .taskExecutor(taskExecutor)
         .throttleLimit(20)
         .build();
}

如果在步骤中使用了数据库连接池,这些连接池的最大连接数也可能会限制批处理的并发性,确保这些资源池中设置至少与步骤中所需的并发线程数一样大。

步骤step对于多线程使用还是有一定限制,其条目的读入、处理和输出处理器都是有状态的。如果状态没有被线程隔离,那么这些组件在多线程中不可用Step。你可以使用SynchronizedItemStreamReader确保线程安全

并行步骤

如果你的业务逻辑可以分成不同的职责并分配给各个步骤,那么它就可以在一个进程中并行化。并行步执行易于配置和使用。

首先使用FlowBuilder构建一个个小的flow流程,在这个流程里面指定步骤,两个流程flow是并行执行的,下面有两个并行流flow1和flow2,flow1里面有step1 step2先后顺序,flow2有step3,也就是说{step1,step2}一起和step3是并行的:

@Bean
public Job job() {
   return jobBuilderFactory.get("job")
         .start(splitFlow())
         .next(step4())
         .build()        //builds FlowJobBuilder instance
         .build();       //builds Job instance
}

@Bean
public Flow splitFlow() {
   return new FlowBuilder<SimpleFlow>("splitFlow")
         .split(taskExecutor())
         .add(flow1(), flow2())
         .build();
}

@Bean
public Flow flow1() {
   return new FlowBuilder<SimpleFlow>("flow1")
         .start(step1())
         .next(step2())
         .build();
}

@Bean
public Flow flow2() {
   return new FlowBuilder<SimpleFlow>("flow2")
         .start(step3())
         .build();
}

@Bean
public TaskExecutor taskExecutor(){
   return new SimpleAsyncTaskExecutor("spring_batch");
}

需要指定TaskExecutor 应该使用哪个实现来执行各个流。默认值为 SyncTaskExecutor没有用,需要异步TaskExecutor才能并行运行这些步骤。

再看一个并行案例:

@Bean
public Job parallelStepsJob() {

   Flow masterFlow = new FlowBuilder<Flow>("masterFlow").start(taskletStep("step1")).build();


   Flow flowJob1 = new FlowBuilder<Flow>("flow1").start(taskletStep("step2")).build();
   Flow flowJob2 = new FlowBuilder<Flow>("flow2").start(taskletStep("step3")).build();
   Flow flowJob3 = new FlowBuilder<Flow>("flow3").start(taskletStep("step4")).build();

   Flow slaveFlow = new FlowBuilder<Flow>("splitflow")
         .split(new SimpleAsyncTaskExecutor()).add(flowJob1, flowJob2, flowJob3).build();

   return (jobBuilderFactory.get("parallelFlowJob")
         .incrementer(new RunIdIncrementer())
         .start(masterFlow)
         .next(slaveFlow)
         .build()).build();

}


private TaskletStep taskletStep(String step) {
   return stepBuilderFactory.get(step).tasklet((contribution, chunkContext) -> {
      IntStream.range(1, 100).forEach(token -> logger.info("Step:" + step + " token:" + token));
      return RepeatStatus.FINISHED;
   }).build();

}

这里有四个流程,主流程masterFlow是最先开始,然后并行的是flowJob1, flowJob2, flowJob3

下面我们将分析分布式的处理方式。见spring batch批处理

原文 

https://www.jdon.com/springboot/spring-batch-parallel.html

本站部分文章源于互联网,本着传播知识、有益学习和研究的目的进行的转载,为网友免费提供。如有著作权人或出版方提出异议,本站将立即删除。如果您对文章转载有任何疑问请告之我们,以便我们及时纠正。

PS:推荐一个微信公众号: askHarries 或者qq群:474807195,里面会分享一些资深架构师录制的视频录像:有Spring,MyBatis,Netty源码分析,高并发、高性能、分布式、微服务架构的原理,JVM性能优化这些成为架构师必备的知识体系。还能领取免费的学习资源,目前受益良多

转载请注明原文出处:Harries Blog™ » Spring并行批处理

赞 (0)
分享到:更多 ()

评论 0

  • 昵称 (必填)
  • 邮箱 (必填)
  • 网址