Spring Batch是一款基于 Spring 的企业批处理框架,有点像ETL工具。
应用场景
- 数据量大,少则百万,多则上亿的数量级 - 不需要人工干预,由系统根据配置自动完成 - 与时间相关,如每天执行一次或每月执行一次 - 对数据处理的准确性要求高,并且需要容错机制、回滚机制、完善的日志监控等。
应用环节
- 读数据,数据可能来自文件、数据库或消息队列等 - 数据处理,如电信支撑系统的计费处理 - 写数据,将输出结果写入文件、数据库或消息队列等
应用亮点
- 统一的读写接口 - 丰富的任务处理方式、 - 灵活的事务管理及并发处理 - 日志、监控、任务重启与跳过等特性
对象认识
-
JobLauncher: 顾名思义,该领域对象就是Job的启动器,其作用就是绑定一组JobParameters到Job上,然后运行该Job。
-
Job: 定义,配置批处理任务的领域对象,该对象的作用,第一是做Step的容器,配置该批处理任务需要的Step,以及他们之间的逻辑关系。第二是配置该批处理任务的特征,比方说名字,是否可重启,是否对JobParameters进行验证以及验证规则等。
-
Step: 定义批处理任务中一个对立的逻辑任务处理单元。基本上的业务逻辑处理代码都是封装在Step中的。Step有2种实现形式,一种是Tasklet形式的,这种形式非常自由,开发人员只需要实现Tasklet接口,其中的逻辑完全有自己决定,另一种是Chunk-Oriented形式的,这种形式定义了一个Step的流程必须是“读-处理(可选)-写”,当然Spring Batch也对每一个步骤提供了接口ItemReader, ItemProcessor,ItemWriter还有很多常用的默认实现(读文件,读数据库,写文件,写数据库等等)。 每一个Step只能由一个Tasklet或者一个Chunk构成。
-
JobRepository: 该领域对象会为Spring Batch的运维数据提供一种持久化机制。其为所有的运维数据的提供CRUD的操作接口,并为所有的操作提供事务支持。
示例代码
@EnableBatchProcessing(modular = true)public class BatchConfiguration { @Autowired public JobBuilderFactory jobBuilderFactory; @Autowired public StepBuilderFactory stepBuilderFactory; @Autowired public SqlSessionFactory sqlSessionFactory; @Bean public MyBatisPagingItemReaderreader() { MyBatisPagingItemReader reader = new MyBatisPagingItemReader (); reader.setSqlSessionFactory(sqlSessionFactory); reader.setQueryId("selectCompany"); reader.setPageSize(3); return reader; } @Bean public CompanyPricingItemProcessor processor() { return new CompanyPricingItemProcessor(); } @Bean public MyBatisBatchItemWriter writer() { MyBatisBatchItemWriter writer = new MyBatisBatchItemWriter (); writer.setSqlSessionFactory(sqlSessionFactory); writer.setStatementId("saveCompanyPricing"); return writer; } @Bean public Job testBatch(JobCompletionNotificationListener listener) { return jobBuilderFactory.get("testBatch") .incrementer(new RunIdIncrementer()) .listener(listener)//状态监控 .flow(step1()) .end().build(); } @Bean public Step step1() { return stepBuilderFactory.get("step1") . chunk(100) .reader(reader()) .processor(processor()) .writer(writer()) // .taskExecutor(simpleAsyncTaskExecutor()) // .throttleLimit(3) //开启多少个线程 .allowStartIfComplete(true) .build(); } /** * 异步多线程 * @return */ @Bean public SimpleAsyncTaskExecutor simpleAsyncTaskExecutor() { SimpleAsyncTaskExecutor simpleAsyncTaskExecutor = new SimpleAsyncTaskExecutor(); return simpleAsyncTaskExecutor; }}
代码调用非常简明,只要认清楚各个对象的做什么就能快速的进行项目实践。
demo流程介绍
主要环节:通过两表查询 -> 处理数据 -> 写入第三张表
reader
/** * * @param _skiprows * @param _pagesize * @return */ @Select("select * from companypricing left join company on (company.companyCode= companypricing.companyCode) limit #{_skiprows},#{_pagesize}") CompanyPricing selectCompany(@Param("_skiprows") int _skiprows, @Param("_pagesize") int _pagesize); /** * 写入第三张表 * @param companyPricing */ @Insert("insert into CompanyPricingTest (pricingcode, companycode,fullname,suitusednumber) VALUES (#{pricingcode},#{companycode},#{fullname},#{suitusednumber})") void saveCompanyPricing(CompanyPricing companyPricing);
数据查询应用的mybatis。这里我用的分页查询,这里有个坑是当前页数和分页数量。由于查询时用的MyBatisPagingItemReader的setQueryId方法。导致这里参数是无法设置的。通过翻看MyBatisPagingItemReader源码发现在这里进行设置
@Override protected void doReadPage() { Mapparameters = new HashMap (); if (parameterValues != null) { parameters.putAll(parameterValues); } parameters.put("_page", getPage()); parameters.put("_pagesize", getPageSize()); parameters.put("_skiprows", getPage() * getPageSize()); if (results == null) { results = new CopyOnWriteArrayList (); } else { results.clear(); } results.addAll(sqlSessionTemplate. selectList(queryId, parameters)); }
Processor
public class CompanyPricingItemProcessor implements ItemProcessor{ private static final Logger log = LoggerFactory.getLogger(CompanyPricingItemProcessor.class); @Override public CompanyPricing process(final CompanyPricing person) throws Exception { final CompanyPricing transformedPerson = new CompanyPricing(); transformedPerson.setCompanycode(person.getCompanycode()); transformedPerson.setSuitname(person.getSuitname()); transformedPerson.setSuitusednumber(person.getSuitusednumber()); transformedPerson.setPricingcode(person.getPricingcode()); transformedPerson.setFullname(person.getFullname()); log.info("Converting (" + person.getPricingcode() + ") into (" + transformedPerson.getFullname() + ")"); return transformedPerson; }
processor 比较简单,当然这里还可以做数据校验。实际的项目中肯定有数据校验的
Writer
@Bean public MyBatisBatchItemWriterwriter() { MyBatisBatchItemWriter writer = new MyBatisBatchItemWriter (); writer.setSqlSessionFactory(sqlSessionFactory); writer.setStatementId("saveCompanyPricing"); return writer; }
这里用到的是MyBatisBatchItemWriter 批量写入。非常棒的一个功能,比如根据机器的内存来定,当数据到达10w时一次性写入数据库。
定时器Scheduled
@Scheduled(fixedRate = 30000) public void reportCurrentTime() throws Exception { String date = DateTimeFormat.forPattern("yyyyMMddHHmmss").print(new DateTime()) ; System.out.println("当前时间:" + date); JobExecution execution = jobLauncher.run(job, new JobParametersBuilder().addString("date", date).toJobParameters()); System.out.println("Execution status: "+ execution.getStatus()); }
采用spring定时器 ,比较方便只需开启就行@EnableScheduling
对于扩展性,框架提供的扩展能力包括如下的四种模式
- Multithreaded Step 多线程执行一个Step - Parallel Step 通过多线程并行执行多个Step - Remote Chunking 在远端节点上执行分布式Chunk操作 - Partitioning Step 对数据进行分区,并分开执行
这这种机制非常棒,有兴趣的同学可以自行进行尝试。
参考文章链接
这几年随着互联网的大肆其道,带来的垃圾也是越来越多,比如想搜索一个学习spring batch的文章会出现很多垃圾的信息,通过不停的查看,最终只有几篇有效的文章出现。
spring batch官网链接。不管什么时候官方的第一手资料才是最佳的指导。