博客
关于我
强烈建议你试试无所不能的chatGPT,快点击我
Spring Batch
阅读量:6259 次
发布时间:2019-06-22

本文共 5967 字,大约阅读时间需要 19 分钟。

hot3.png

Spring Batch是一款基于 Spring 的企业批处理框架,有点像ETL工具。

应用场景

- 数据量大,少则百万,多则上亿的数量级        - 不需要人工干预,由系统根据配置自动完成        - 与时间相关,如每天执行一次或每月执行一次        - 对数据处理的准确性要求高,并且需要容错机制、回滚机制、完善的日志监控等。

应用环节

- 读数据,数据可能来自文件、数据库或消息队列等        - 数据处理,如电信支撑系统的计费处理        - 写数据,将输出结果写入文件、数据库或消息队列等

应用亮点

- 统一的读写接口           - 丰富的任务处理方式、           - 灵活的事务管理及并发处理           - 日志、监控、任务重启与跳过等特性

对象认识

输入图片说明

  1. JobLauncher: 顾名思义,该领域对象就是Job的启动器,其作用就是绑定一组JobParameters到Job上,然后运行该Job。

  2. Job: 定义,配置批处理任务的领域对象,该对象的作用,第一是做Step的容器,配置该批处理任务需要的Step,以及他们之间的逻辑关系。第二是配置该批处理任务的特征,比方说名字,是否可重启,是否对JobParameters进行验证以及验证规则等。

  3. Step: 定义批处理任务中一个对立的逻辑任务处理单元。基本上的业务逻辑处理代码都是封装在Step中的。Step有2种实现形式,一种是Tasklet形式的,这种形式非常自由,开发人员只需要实现Tasklet接口,其中的逻辑完全有自己决定,另一种是Chunk-Oriented形式的,这种形式定义了一个Step的流程必须是“读-处理(可选)-写”,当然Spring Batch也对每一个步骤提供了接口ItemReader, ItemProcessor,ItemWriter还有很多常用的默认实现(读文件,读数据库,写文件,写数据库等等)。 每一个Step只能由一个Tasklet或者一个Chunk构成。

  4. JobRepository: 该领域对象会为Spring Batch的运维数据提供一种持久化机制。其为所有的运维数据的提供CRUD的操作接口,并为所有的操作提供事务支持。

示例代码

@EnableBatchProcessing(modular = true)public class BatchConfiguration {	@Autowired	public JobBuilderFactory jobBuilderFactory;	@Autowired	public StepBuilderFactory stepBuilderFactory;	@Autowired	public SqlSessionFactory sqlSessionFactory;	@Bean	public MyBatisPagingItemReader
reader() { MyBatisPagingItemReader
reader = new MyBatisPagingItemReader
(); reader.setSqlSessionFactory(sqlSessionFactory); reader.setQueryId("selectCompany"); reader.setPageSize(3); return reader; } @Bean public CompanyPricingItemProcessor processor() { return new CompanyPricingItemProcessor(); } @Bean public MyBatisBatchItemWriter
writer() { MyBatisBatchItemWriter
writer = new MyBatisBatchItemWriter
(); writer.setSqlSessionFactory(sqlSessionFactory); writer.setStatementId("saveCompanyPricing"); return writer; } @Bean public Job testBatch(JobCompletionNotificationListener listener) { return jobBuilderFactory.get("testBatch") .incrementer(new RunIdIncrementer()) .listener(listener)//状态监控 .flow(step1()) .end().build(); } @Bean public Step step1() { return stepBuilderFactory.get("step1") .
chunk(100) .reader(reader()) .processor(processor()) .writer(writer()) // .taskExecutor(simpleAsyncTaskExecutor()) // .throttleLimit(3) //开启多少个线程 .allowStartIfComplete(true) .build(); } /** * 异步多线程 * @return */ @Bean public SimpleAsyncTaskExecutor simpleAsyncTaskExecutor() { SimpleAsyncTaskExecutor simpleAsyncTaskExecutor = new SimpleAsyncTaskExecutor(); return simpleAsyncTaskExecutor; }}

代码调用非常简明,只要认清楚各个对象的做什么就能快速的进行项目实践。

demo流程介绍

输入图片说明

主要环节:通过两表查询 -> 处理数据 -> 写入第三张表

reader

/**	 * 	 * @param _skiprows	 * @param _pagesize	 * @return	 */	@Select("select * from  companypricing left join  company on (company.companyCode= companypricing.companyCode) limit #{_skiprows},#{_pagesize}")	CompanyPricing selectCompany(@Param("_skiprows") int _skiprows, @Param("_pagesize") int _pagesize);		/**	 * 写入第三张表	 * @param companyPricing	 */	@Insert("insert into CompanyPricingTest (pricingcode, companycode,fullname,suitusednumber) VALUES (#{pricingcode},#{companycode},#{fullname},#{suitusednumber})")	void saveCompanyPricing(CompanyPricing companyPricing);

数据查询应用的mybatis。这里我用的分页查询,这里有个坑是当前页数和分页数量。由于查询时用的MyBatisPagingItemReader的setQueryId方法。导致这里参数是无法设置的。通过翻看MyBatisPagingItemReader源码发现在这里进行设置

@Override  protected void doReadPage() {    Map
parameters = new HashMap
(); if (parameterValues != null) { parameters.putAll(parameterValues); } parameters.put("_page", getPage()); parameters.put("_pagesize", getPageSize()); parameters.put("_skiprows", getPage() * getPageSize()); if (results == null) { results = new CopyOnWriteArrayList
(); } else { results.clear(); } results.addAll(sqlSessionTemplate.
selectList(queryId, parameters)); }

Processor

public class CompanyPricingItemProcessor implements ItemProcessor
{ private static final Logger log = LoggerFactory.getLogger(CompanyPricingItemProcessor.class); @Override public CompanyPricing process(final CompanyPricing person) throws Exception { final CompanyPricing transformedPerson = new CompanyPricing(); transformedPerson.setCompanycode(person.getCompanycode()); transformedPerson.setSuitname(person.getSuitname()); transformedPerson.setSuitusednumber(person.getSuitusednumber()); transformedPerson.setPricingcode(person.getPricingcode()); transformedPerson.setFullname(person.getFullname()); log.info("Converting (" + person.getPricingcode() + ") into (" + transformedPerson.getFullname() + ")"); return transformedPerson; }

processor 比较简单,当然这里还可以做数据校验。实际的项目中肯定有数据校验的

Writer

@Bean	public MyBatisBatchItemWriter
writer() { MyBatisBatchItemWriter
writer = new MyBatisBatchItemWriter
(); writer.setSqlSessionFactory(sqlSessionFactory); writer.setStatementId("saveCompanyPricing"); return writer; }

这里用到的是MyBatisBatchItemWriter 批量写入。非常棒的一个功能,比如根据机器的内存来定,当数据到达10w时一次性写入数据库。

定时器Scheduled

@Scheduled(fixedRate = 30000)	    public void reportCurrentTime() throws Exception {		 String date = DateTimeFormat.forPattern("yyyyMMddHHmmss").print(new DateTime()) ;	        System.out.println("当前时间:" + date);	        JobExecution execution = jobLauncher.run(job, new JobParametersBuilder().addString("date", date).toJobParameters());  	        System.out.println("Execution status: "+ execution.getStatus());  	    }

采用spring定时器 ,比较方便只需开启就行@EnableScheduling

对于扩展性,框架提供的扩展能力包括如下的四种模式

- Multithreaded Step 多线程执行一个Step        - Parallel Step 通过多线程并行执行多个Step        - Remote Chunking 在远端节点上执行分布式Chunk操作        - Partitioning Step 对数据进行分区,并分开执行

这这种机制非常棒,有兴趣的同学可以自行进行尝试。

参考文章链接

这几年随着互联网的大肆其道,带来的垃圾也是越来越多,比如想搜索一个学习spring batch的文章会出现很多垃圾的信息,通过不停的查看,最终只有几篇有效的文章出现。

spring batch官网链接。不管什么时候官方的第一手资料才是最佳的指导。

转载于:https://my.oschina.net/penghaozhong/blog/1418155

你可能感兴趣的文章
远程共享文件夹
查看>>
[转] C/C++中printf和C++中cout的输出格式
查看>>
swift 如何实现点击view后显示灰色背景
查看>>
【Android】3.9 覆盖物功能
查看>>
MySQL也有潜规则 – Select 语句不加 Order By 如何排序?
查看>>
搭建SolrCloud的详细步骤
查看>>
svn的安装与使用
查看>>
基于Linux下Iptables限制BT下载的研究
查看>>
Android对话框-中篇-之建立自己的对话框
查看>>
华为交换机VRP用户界面配置及Telnet登录实验
查看>>
作为一个程序员我最大的遗憾
查看>>
《SolidWorks 2012中文版从入门到精通》一6.5 综合实例——斜齿圆柱齿轮
查看>>
storm集群的监控
查看>>
RHCE 6.0学习笔记-2 RHEL 6 使用光盘配置本地YUM源
查看>>
Mongodb定期备份
查看>>
Confluence 6 数据库设置
查看>>
刨根问底-struts-怎么加载配置的相应的信息
查看>>
解决mysql数据库大小写敏感问题
查看>>
jsp页面组成
查看>>
LCS记录
查看>>