快捷搜索:  汽车  科技

spring boot 2.3.0 过滤器(SpringBoot项目中应用SpringBatch批处理框架)

spring boot 2.3.0 过滤器(SpringBoot项目中应用SpringBatch批处理框架)<dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-batch</artifactId> </dependency> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-web</artifactId> </dependency> <dependency> <groupId>org.springframework.boot</groupId>

环境:Springboot2.3.12RELEASE Spring Batch4.2.7


Spring Batch是一个轻量级的,完全面向Spring的批处理框架,可以应用于企业级大量的数据处理系统。Spring Batch以POJO和大家熟知的spring框架为基础,使开发者更容易的访问和利用企业级服务。Spring Batch可以提供大量的,可重复的数据处理功能,包括日志记录/跟踪,事务管理,作业处理统计工作重新启动、跳过,和资源管理等重要功能。

业务场景:

  1. 定期提交批处理。
  2. 并行批处理:作业的并行处理
  3. 分阶段、企业消息驱动的处理
  4. 大规模并行批处理
  5. 故障后手动或计划重新启动
  6. 相关步骤的顺序处理(扩展到工作流驱动的批处理)
  7. 部分处理:跳过记录(例如,回滚时)
  8. 整批事务,适用于小批量或现有存储过程/脚本的情况

技术目标:

  1. 批处理开发人员使用Spring编程模型:专注于业务逻辑,让框架负责基础设施。
  2. 基础架构、批处理执行环境和批处理应用程序之间的关注点清晰分离。
  3. 提供通用的核心执行服务,作为所有项目都可以实现的接口。
  4. 提供可“开箱即用”的核心执行接口的简单和默认实现。
  5. 通过在所有层中利用spring框架,可以轻松配置、定制和扩展服务。
  6. 所有现有的核心服务都应该易于替换或扩展,而不会对基础架构层造成任何影响。
  7. 提供一个简单的部署模型,使用Maven构建的架构JAR与应用程序完全分离。

Spring Batch的结构:

spring boot 2.3.0 过滤器(SpringBoot项目中应用SpringBatch批处理框架)(1)

此分层体系结构突出了三个主要的高级组件:应用程序、核心和基础架构。该应用程序包含开发人员使用SpringBatch编写的所有批处理作业和自定义代码。批处理核心包含启动和控制批处理作业所需的核心运行时类。它包括JobLauncher、Job和Step的实现。应用程序和核心都构建在公共基础架构之上。此基础结构包含公共读写器和服务(如RetryTemplate),应用程序开发人员(读写器,如ItemReader和ItemWriter)和核心框架本身(retry,它是自己的库)都使用这些服务。

下面介绍开发流程

本例完成 读取文件内容,经过处理后,将数据保存到数据库中

  • 引入依赖

<dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-batch</artifactId> </dependency> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-web</artifactId> </dependency> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-data-jpa</artifactId> </dependency> <dependency> <groupId>mysql</groupId> <artifactId>mysql-connector-java</artifactId> </dependency> <dependency> <groupId>org.hibernate</groupId> <artifactId>hibernate-validator</artifactId> <version>6.0.7.Final</version> </dependency>

  • 应用配置文件

spring: datasource: driverClassName: com.mysql.cj.jdbc.Driver url: jdbc:mysql://localhost:3306/Batch?serverTimezone=GMT+8 username: root password: ******* type: com.zaxxer.hikari.HikariDataSource hikari: minimumIdle: 10 maximumPoolSize: 200 autoCommit: true idleTimeout: 30000 poolName: MasterDatabookHikariCP maxLifetime: 1800000 connectionTimeout: 30000 connectionTestQuery: SELECT 1 --- spring: jpa: generateddl: false hibernate: ddlAuto: update openInView: true show-sql: true --- spring: batch: job: enabled: false #是否自动执行任务 initialize-schema: always #自动为我们创建数据库脚本

  • 开启批处理功能

@Configuration @EnableBatchProcessing public class BatchConfig extends DefaultBatchConfigurer{ }

  • 任务启动器

接着上一步的配置类BatchConfig重写对应方法

@Override protected JobLauncher createJobLauncher() throws Exception { SimpleJobLauncher jobLauncher = new SimpleJobLauncher(); jobLauncher.setJobRepository(createJobRepository()); jobLauncher.afterPropertiesSet(); return jobLauncher; }

  • 任务存储

接着上一步的配置类BatchConfig重写对应方法

@Resource private PlatformTransactionManager transactionManager ; @Override protected JobRepository createJobRepository() throws Exception { JobRepositoryFactoryBean factory = new JobRepositoryFactoryBean(); factory.setDatabaseType("mysql"); factory.setTransactionManager(transactionManager); factory.setDataSource(dataSource); factory.afterPropertiesSet(); return factory.getObject(); }

  • 定义JOB

@Bean public Job myJob(JobBuilderFactory builder @Qualifier("myStep")Step step){ return builder.get("myJob") .incrementer(new RunIdIncrementer()) .flow(step) .end() .listener(jobExecutionListener) .build(); }

  • 定义ItemReader读取器

@Bean public ItemReader<Person> reader(){ FlatFileItemReader<Person> reader = new FlatFileItemReader<>(); reader.setResource(new ClassPathResource("cvs/persons.cvs")); reader.setLineMapper(new DefaultLineMapper<Person>() { // 代码块 { setLineTokenizer(new DelimitedLineTokenizer(" ") { { setNames("id" "name"); } }) ; setFieldSetMapper(new BeanWrapperFieldSetMapper<Person>() { { setTargetType(Person.class) ; } }); } }); return reader; }

  • 定义ItemProcessor处理器

@Bean public ItemProcessor<Person Person2> processorPerson(){ return new ItemProcessor<Person Person2>() { @Override public Person2 process(Person item) throws Exception { Person2 p = new Person2() ; p.setId(item.getId()) ; p.setName(item.getName() " pk"); return p ; } } ; }

  • 定义ItemWriter写数据

@Resource private Validator<Person> validator ; @Resource private EntityManagerFactory entityManagerFactory ; @Bean public ItemWriter<Person2> writerPerson(){ JpaItemWriter<Person2> writer = null ; JpaItemWriterBuilder<Person2> builder = new JpaItemWriterBuilder<>() ; builder.entityManagerFactory(entityManagerFactory) ; writer = builder.build() ; return writer; }

  • 定义Step

@Bean public Step myStep(StepBuilderFactory stepBuilderFactory ItemReader<Person> reader ItemWriter<Person> writer ItemProcessor<Person Person> processor){ return stepBuilderFactory .get("myStep") .<Person Person>chunk(2) // Chunk的机制(即每次读取一条数据,再处理一条数据,累积到一定数量后再一次性交给writer进行写入操作) .reader(reader).faultTolerant().retryLimit(3).retry(Exception.class).skip(Exception.class).skipLimit(2) .listener(new MyReadListener()) .processor(processor) .writer(writer).faultTolerant().skip(Exception.class).skipLimit(2) .listener(new MyWriteListener()) .build(); }

  • 定义相应的监听器

public class MyReadListener implements ItemReadListener<Person> { private Logger logger = LoggerFactory.getLogger(MyReadListener.class); @Override public void beforeRead() { } @Override public void afterRead(Person item) { System.out.println("reader after: " Thread.currentThread().getName()) ; } @Override public void onReadError(Exception ex) { logger.info("读取数据错误:{}" ex); } }

@Component public class MyWriteListener implements ItemWriteListener<Person> { private Logger logger = LoggerFactory.getLogger(MyWriteListener.class); @Override public void beforeWrite(List<? extends Person> items) { } @Override public void afterWrite(List<? extends Person> items) { System.out.println("writer after: " Thread.currentThread().getName()) ; } @Override public void onWriteError(Exception exception List<? extends Person> items) { try { logger.info(format("%s%n" exception.getMessage())); for (Person item : items) { logger.info(format("Failed writing BlogInfo : %s" item.toString())); } } catch (Exception e) { e.printStackTrace(); } } }

person.cvs文件内容

spring boot 2.3.0 过滤器(SpringBoot项目中应用SpringBatch批处理框架)(2)

实体类:

@Entity @Table(name = "t_person") public class Person { @Id @GeneratedValue(strategy = GenerationType.IDENTITY) private Integer id ; private String name ; }

启动任务执行

@RestController @RequestMapping("/demo") public class DemoController { @Resource @Qualifier("myJob") private Job job ; @Resource private JobLauncher launcher ; @GetMapping("/index") public Object index() { JobParameters jobParameters = new JobParametersBuilder().toJobParameters() ; try { launcher.run(job jobParameters) ; } catch (JobExecutionAlreadyRunningException | JobRestartException | JobInstanceAlreadyCompleteException | JobParametersInvalidException e) { e.printStackTrace(); } return "success" ; } }

启动服务,自动为我们创建了表

spring boot 2.3.0 过滤器(SpringBoot项目中应用SpringBatch批处理框架)(3)

执行任务

查看表情况

spring boot 2.3.0 过滤器(SpringBoot项目中应用SpringBatch批处理框架)(4)

spring boot 2.3.0 过滤器(SpringBoot项目中应用SpringBatch批处理框架)(5)

spring boot 2.3.0 过滤器(SpringBoot项目中应用SpringBatch批处理框架)(6)

完毕!!!

公众:Springboot实战案例锦集

spring boot 2.3.0 过滤器(SpringBoot项目中应用SpringBatch批处理框架)(7)

spring boot 2.3.0 过滤器(SpringBoot项目中应用SpringBatch批处理框架)(8)

spring boot 2.3.0 过滤器(SpringBoot项目中应用SpringBatch批处理框架)(9)

spring boot 2.3.0 过滤器(SpringBoot项目中应用SpringBatch批处理框架)(10)

猜您喜欢: