
Spring Boot + Spring Batch + Quartz: Integrating Scheduled Batch Jobs


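I. Introduction

For the past week I have been on loan to another department to rush out an urgent requirement, which is as follows:

A device upgrade record is created from the PC web page (see the figure below), and the backend must then update the devices in batches on a schedule. Quartz handles the scheduling and Spring Batch handles the bulk data processing; combined, the two cover the requirement.

I had never used Spring Batch before, so I searched online, found few usable references, and ended up slowly working through the official documentation.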

https://docs.spring.io/spring-Batch/4.1.x/reference/html/

I ran into quite a few problems along the way, so I am recording them here.

[Figure 1: screenshot of the device upgrade record on the PC web page]

II. Implementation

For company reasons I cannot publish the complete code on GitHub. The main parts are pasted here; if you need the full code, send me a private message [628].

1. The pom file

<dependencies>
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-web</artifactId>
    </dependency>
    <dependency>
        <groupId>org.postgresql</groupId>
        <artifactId>postgresql</artifactId>
    </dependency>
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-jdbc</artifactId>
    </dependency>
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-batch</artifactId>
    </dependency>
    <dependency>
        <groupId>org.projectlombok</groupId>
        <artifactId>lombok</artifactId>
    </dependency>
</dependencies>

2. The application.yaml file

spring:
  datasource:
    username: thinklink
    password: thinklink
    url: jdbc:postgresql://172.16.205.54:5432/thinklink
    driver-class-name: org.postgresql.Driver
  batch:
    job:
      enabled: false

server:
  port: 8073

#upgrade-dispatch-base-url: http://172.16.205.125:8080/api/rpc/dispatch/command/
upgrade-dispatch-base-url: http://172.16.205.211:8080/api/noauth/rpc/dispatch/command/

# Number of items handled per batch; defaults to 5000
batch-size: 5000

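3. The service implementation: the entry point that triggers the batch task by running a job

import java.util.UUID;

import org.springframework.batch.core.Job;
import org.springframework.batch.core.JobParameters;
import org.springframework.batch.core.JobParametersBuilder;
import org.springframework.batch.core.launch.JobLauncher;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;

@Service("batchService")
public class BatchServiceImpl implements BatchService {

    // Injected by the framework
    @Autowired
    private JobLauncher jobLauncher;

    @Autowired
    private Job updateDeviceJob;

    /**
     * Creates and launches a job for the given taskId.
     *
     * @param taskId id of the upgrade task
     * @throws Exception if the job cannot be launched
     */
    @Override
    public void createBatchJob(String taskId) throws Exception {
        JobParameters jobParameters = new JobParametersBuilder()
                .addString("taskId", taskId)
                // Random value, so every launch has a distinct parameter set
                .addString("uuid", UUID.randomUUID().toString().replace("-", ""))
                .toJobParameters();
        // Pass in the job and the parameters it needs
        jobLauncher.run(updateDeviceJob, jobParameters);
    }
}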
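
The random `uuid` parameter in the service above matters because Spring Batch identifies a job instance by the job name plus its identifying parameters, and it refuses to launch an instance that has already completed. The following is only a plain-Java model of that rule, not Spring Batch code; the class `JobIdentitySketch` and its methods are hypothetical names introduced for illustration.

```java
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
import java.util.TreeMap;
import java.util.UUID;

// Hypothetical stand-in for job-instance bookkeeping; not part of Spring Batch.
class JobIdentitySketch {
    private final Set<String> completedInstances = new HashSet<>();

    // Returns true if the launch is accepted, false if an identical
    // (job name + parameters) instance has already been recorded.
    boolean launch(String jobName, Map<String, String> params) {
        // TreeMap sorts the keys, so the identity does not depend on insertion order.
        String key = jobName + new TreeMap<>(params);
        return completedInstances.add(key);
    }

    // Builds the parameter set, optionally with the random uuid used in the article.
    static Map<String, String> params(String taskId, boolean withUuid) {
        Map<String, String> p = new TreeMap<>();
        p.put("taskId", taskId);
        if (withUuid) {
            p.put("uuid", UUID.randomUUID().toString().replace("-", ""));
        }
        return p;
    }

    public static void main(String[] args) {
        JobIdentitySketch repo = new JobIdentitySketch();
        // Same taskId twice without uuid: the second launch is rejected.
        System.out.println(repo.launch("updateDeviceJob", params("42", false)));
        System.out.println(repo.launch("updateDeviceJob", params("42", false)));
        // With a fresh uuid each time, the same taskId can be launched again.
        System.out.println(repo.launch("updateDeviceJob", params("42", true)));
        System.out.println(repo.launch("updateDeviceJob", params("42", true)));
    }
}
```

This is why a retry of the same `taskId` can go through the same `createBatchJob` call without extra handling.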

4. The Spring Batch configuration class; this is the most important part (☆☆☆☆☆)

import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.UUID;

import javax.sql.DataSource;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.batch.core.Job;
import org.springframework.batch.core.JobExecution;
import org.springframework.batch.core.JobExecutionListener;
import org.springframework.batch.core.JobParameter;
import org.springframework.batch.core.Step;
import org.springframework.batch.core.configuration.annotation.JobBuilderFactory;
import org.springframework.batch.core.configuration.annotation.StepBuilderFactory;
import org.springframework.batch.core.configuration.annotation.StepScope;
import org.springframework.batch.item.ItemWriter;
import org.springframework.batch.item.database.JdbcCursorItemReader;
import org.springframework.batch.item.database.builder.JdbcCursorItemReaderBuilder;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.jdbc.core.JdbcTemplate;
import org.springframework.transaction.annotation.Transactional;

@Configuration
public class BatchConfiguration {

    private static final Logger log = LoggerFactory.getLogger(BatchConfiguration.class);

    @Value("${batch-size:5000}")
    private int batchSize;

    // Injected by the framework
    @Autowired
    public JobBuilderFactory jobBuilderFactory;

    // Injected by the framework
    @Autowired
    public StepBuilderFactory stepBuilderFactory;

    // Processor applied, item by item, to the data read from the database
    @Autowired
    public TaskItemProcessor taskItemProcessor;

    // Job parameters, captured in JobListener.beforeJob
    public Map<String, JobParameter> parameters;

    public Object taskId;

    @Autowired
    private JdbcTemplate jdbcTemplate;

    // Reads from the database
    @Bean
    @StepScope
    public JdbcCursorItemReader<DispatchRequest> itemReader(DataSource dataSource) {
        String querySql = " SELECT "
                + " e.id AS taskId, "
                + " e.user_id AS userId, "
                + " e.timing_startup AS startTime, "
                + " u.device_id AS deviceId, "
                + " d.app_name AS appName, "
                + " d.compose_File AS composeFile, "
                + " e.failure_retry AS failureRetry, "
                + " e.tetry_times AS retryTimes, "
                + " e.device_managered AS deviceManagered "
                + " FROM "
                + " eiot_upgrade_task e "
                + " LEFT JOIN eiot_upgrade_device u ON e.id = u.upgrade_task_id "
                + " LEFT JOIN eiot_app_detail d ON e.app_id = d.id "
                + " WHERE "
                + " ( "
                + " u.device_upgrade_status = 0 "
                + " OR u.device_upgrade_status = 2 "
                + " ) "
                + " AND e.tetry_times > u.retry_times "
                + " AND e.id = ?";

        return new JdbcCursorItemReaderBuilder<DispatchRequest>()
                .name("itemReader")
                .sql(querySql)
                .dataSource(dataSource)
                .queryArguments(new Object[]{parameters.get("taskId").getValue()})
                .rowMapper(new DispatchRequest.DispatchRequestRowMapper())
                .build();
    }

    // Writes the results back to the database
    @Bean
    @StepScope
    public ItemWriter<ProcessResult> itemWriter() {
        return new ItemWriter<ProcessResult>() {

            // Updates one device row; returns 1 if its last retry failed, otherwise 0
            private int updateTaskStatus(DispatchRequest dispatchRequest, int status) {
                log.info("update taskId: {}, deviceId: {} to status {}",
                        dispatchRequest.getTaskId(), dispatchRequest.getDeviceId(), status);

                Integer retryTimes = jdbcTemplate.queryForObject(
                        "select retry_times from eiot_upgrade_device where device_id = ? and upgrade_task_id = ?",
                        new Object[]{dispatchRequest.getDeviceId(), dispatchRequest.getTaskId()},
                        Integer.class);
                retryTimes += 1; // count this attempt

                int updateCount = jdbcTemplate.update(
                        "update eiot_upgrade_device set device_upgrade_status = ?, retry_times = ? "
                                + "where device_id = ? and upgrade_task_id = ?",
                        status, retryTimes, dispatchRequest.getDeviceId(), dispatchRequest.getTaskId());
                if (updateCount <= 0) {
                    log.warn("no task updated");
                } else {
                    log.info("count of {} task updated", updateCount);
                }

                // Last retry
                if (status == TaskItemProcessor.STATUS_DISPATCH_FAILED
                        && retryTimes == dispatchRequest.getRetryTimes()) {
                    log.info("the last retry of {} failed, inc deviceManagered", dispatchRequest.getTaskId());
                    return 1;
                } else {
                    return 0;
                }
            }

            @Override
            @Transactional
            public void write(List<? extends ProcessResult> list) throws Exception {
                Map<String, Object> taskMap = jdbcTemplate.queryForMap(
                        "select device_managered, device_count, task_status from eiot_upgrade_task where id = ?",
                        list.get(0).getDispatchRequest().getTaskId() // we assume all items in one chunk share the same taskId
                );
                int deviceManagered = (int) taskMap.get("device_managered");
                Integer deviceCount = (Integer) taskMap.get("device_count");
                if (deviceCount == null) {
                    log.warn("deviceCount of task {} is null", list.get(0).getDispatchRequest().getTaskId());
                }
                int taskStatus = (int) taskMap.get("task_status");
                for (ProcessResult result : list) {
                    deviceManagered += updateTaskStatus(result.getDispatchRequest(), result.getStatus());
                }
                if (deviceCount != null && deviceManagered == deviceCount) {
                    taskStatus = 2; // task status 0: pending, 1: upgrading, 2: done
                }
                jdbcTemplate.update(
                        "update eiot_upgrade_task set device_managered = ?, task_status = ? "
                                + "where id = ?",
                        deviceManagered, taskStatus, list.get(0).getDispatchRequest().getTaskId());
            }
        };
    }

    /**
     * Defines the job that dispatches updates.
     */
    @Bean
    public Job updateDeviceJob(Step updateDeviceStep) {
        return jobBuilderFactory.get(UUID.randomUUID().toString().replace("-", ""))
                .listener(new JobListener()) // job listener
                .flow(updateDeviceStep)      // run the update-dispatching step
                .end()
                .build();
    }

    /**
     * Defines the step that dispatches updates.
     */
    @Bean
    public Step updateDeviceStep(JdbcCursorItemReader<DispatchRequest> itemReader,
                                 ItemWriter<ProcessResult> itemWriter) {
        return stepBuilderFactory.get(UUID.randomUUID().toString().replace("-", ""))
                .<DispatchRequest, ProcessResult>chunk(batchSize)
                .reader(itemReader)           // read the devices to update for this taskId
                .processor(taskItemProcessor) // call the dispatch interface for each record
                .writer(itemWriter)
                .build();
    }

    // Job listener
    public class JobListener implements JobExecutionListener {

        @Override
        public void beforeJob(JobExecution jobExecution) {
            log.info(jobExecution.getJobInstance().getJobName() + " before... ");
            parameters = jobExecution.getJobParameters().getParameters();
            taskId = parameters.get("taskId").getValue();
            log.info("job param taskId : " + parameters.get("taskId"));
        }

        @Override
        public void afterJob(JobExecution jobExecution) {
            log.info(jobExecution.getJobInstance().getJobName() + " after... ");
            // Once the job has finished, check the device update status;
            // if any device failed, schedule the job to run again
            String sql = " SELECT "
                    + " count(*) "
                    + " FROM "
                    + " eiot_upgrade_device d "
                    + " LEFT JOIN eiot_upgrade_task u ON d.upgrade_task_id = u.id "
                    + " WHERE "
                    + " u.id = ? "
                    + " AND d.retry_times < u.tetry_times "
                    + " AND ( "
                    + " d.device_upgrade_status = 0 "
                    + " OR d.device_upgrade_status = 2 "
                    + " ) ";
            // Number of devices whose update failed
            Integer count = jdbcTemplate.queryForObject(sql, new Object[]{taskId}, Integer.class);
            log.info("update device failure count : " + count);

            // Use Quartz to trigger the scheduled re-run.
            // Hard-coded for testing; the retry interval (in seconds) for this
            // taskId should be read from the database.
            Integer millSecond = 10;
            if (count != null && count > 0) {
                String jobName = "UpgradeTask_" + taskId;
                String reTaskId = taskId.toString();
                Map<String, Object> params = new HashMap<>();
                params.put("jobName", jobName);
                params.put("taskId", reTaskId);
                if (QuartzManager.checkNameNotExist(jobName)) {
                    QuartzManager.scheduleRunOnceJob(jobName, RunOnceJobLogic.class, params, millSecond);
                }
            }
        }
    }
}

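The counting done in the writer is easy to misread, so here is a small plain-Java model of it with the database calls removed. It assumes the behaviour described in the configuration class: each write counts one more attempt for the device, only a failure on the final allowed retry adds to `deviceManagered`, and the task is marked done once `deviceManagered` equals the device count. The class `UpgradeBookkeepingSketch`, the `Attempt` type and the method names are hypothetical.

```java
import java.util.Arrays;
import java.util.List;

// Plain-Java model of the writer's bookkeeping; all names are hypothetical.
class UpgradeBookkeepingSketch {
    static final int STATUS_DISPATCH_SUCC = 1;
    static final int STATUS_DISPATCH_FAILED = 2;
    static final int TASK_UPGRADING = 1;
    static final int TASK_DONE = 2;

    // One processed device: dispatch status, retries stored so far, retries allowed.
    static final class Attempt {
        final int status;
        final int retriesBefore;
        final int maxRetries;

        Attempt(int status, int retriesBefore, int maxRetries) {
            this.status = status;
            this.retriesBefore = retriesBefore;
            this.maxRetries = maxRetries;
        }
    }

    // Returns 1 when this attempt was the last allowed retry and it failed, else 0.
    static int managedIncrement(Attempt a) {
        int retryTimes = a.retriesBefore + 1; // count this attempt
        boolean lastRetryFailed =
                a.status == STATUS_DISPATCH_FAILED && retryTimes == a.maxRetries;
        return lastRetryFailed ? 1 : 0;
    }

    // Applies one chunk and returns {deviceManagered, taskStatus}.
    static int[] applyChunk(int deviceManagered, int deviceCount, int taskStatus,
                            List<Attempt> chunk) {
        for (Attempt a : chunk) {
            deviceManagered += managedIncrement(a);
        }
        if (deviceManagered == deviceCount) {
            taskStatus = TASK_DONE;
        }
        return new int[] {deviceManagered, taskStatus};
    }

    public static void main(String[] args) {
        List<Attempt> chunk = Arrays.asList(
                new Attempt(STATUS_DISPATCH_FAILED, 2, 3), // third of three tries failed
                new Attempt(STATUS_DISPATCH_FAILED, 0, 3), // first try failed, retries remain
                new Attempt(STATUS_DISPATCH_SUCC, 2, 3));  // succeeded
        int[] state = applyChunk(0, 1, TASK_UPGRADING, chunk);
        System.out.println("deviceManagered=" + state[0] + ", taskStatus=" + state[1]);
    }
}
```
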
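5. The processor: handles each record, and is also the place where records can be filtered

import java.io.IOException;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.batch.item.ItemProcessor;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.http.HttpEntity;
import org.springframework.http.HttpHeaders;
import org.springframework.http.HttpStatus;
import org.springframework.http.MediaType;
import org.springframework.http.ResponseEntity;
import org.springframework.jdbc.core.JdbcTemplate;
import org.springframework.stereotype.Component;
import org.springframework.util.LinkedMultiValueMap;
import org.springframework.util.MultiValueMap;
import org.springframework.web.client.RestTemplate;
// JSONObject, JSONException and Base64Util come from the project's own dependencies

@Component("taskItemProcessor")
public class TaskItemProcessor implements ItemProcessor<DispatchRequest, ProcessResult> {

    public static final int STATUS_DISPATCH_FAILED = 2;
    public static final int STATUS_DISPATCH_SUCC = 1;

    private static final Logger log = LoggerFactory.getLogger(TaskItemProcessor.class);

    @Value("${upgrade-dispatch-base-url:http://localhost/api/v2/rpc/dispatch/command/}")
    private String dispatchUrl;

    @Autowired
    JdbcTemplate jdbcTemplate;

    /**
     * Dispatches the update command for one device.
     *
     * @param dispatchRequest the record read from the database
     * @return the request together with the dispatch status
     */
    @Override
    public ProcessResult process(final DispatchRequest dispatchRequest) {
        // Call the dispatch interface
        String url = dispatchUrl + dispatchRequest.getDeviceId() + "/" + dispatchRequest.getUserId();
        log.info("request url:" + url);

        RestTemplate restTemplate = new RestTemplate();
        HttpHeaders headers = new HttpHeaders();
        headers.setContentType(MediaType.APPLICATION_JSON_UTF8);
        MultiValueMap<String, String> params = new LinkedMultiValueMap<String, String>();

        JSONObject jsonOuter = new JSONObject();
        JSONObject jsonInner = new JSONObject();
        try {
            jsonInner.put("jobId", dispatchRequest.getTaskId());
            jsonInner.put("name", dispatchRequest.getName());
            jsonInner.put("composeFile", Base64Util.bytesToBase64Str(dispatchRequest.getComposeFile()));
            jsonInner.put("policy", new JSONObject().put("startTime", dispatchRequest.getPolicy()));
            jsonInner.put("timestamp", dispatchRequest.getTimestamp());
            jsonOuter.put("method", "updateApp");
            jsonOuter.put("params", jsonInner);
        } catch (JSONException e) {
            log.info("JSON convert Exception :" + e);
        } catch (IOException e) {
            log.info("Base64Util bytesToBase64Str :" + e);
        }
        log.info("request body json :" + jsonOuter);

        HttpEntity<String> requestEntity = new HttpEntity<String>(jsonOuter.toString(), headers);
        int status;
        try {
            ResponseEntity<String> response = restTemplate.postForEntity(url, requestEntity, String.class);
            log.info("response :" + response);
            if (response.getStatusCode() == HttpStatus.OK) {
                status = STATUS_DISPATCH_SUCC;
            } else {
                status = STATUS_DISPATCH_FAILED;
            }
        } catch (Exception e) {
            // Any exception during the call counts as a failed dispatch
            status = STATUS_DISPATCH_FAILED;
        }
        return new ProcessResult(dispatchRequest, status);
    }
}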
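
To make the shape of the request body concrete, here is a minimal sketch that builds the same nested structure as the processor using only the JDK. It assumes `java.util.Base64` as a stand-in for the project's `Base64Util` (which is not shown in the article), and it formats the JSON by hand without escaping, so it only suits simple values. `DispatchPayloadSketch` and `buildBody` are hypothetical names.

```java
import java.nio.charset.StandardCharsets;
import java.util.Base64;

// Hand-rolled illustration of the request body; not the article's JSON library.
class DispatchPayloadSketch {

    // Builds {"method":"updateApp","params":{...}} with the compose file Base64-encoded.
    // No JSON escaping is performed, so the inputs must not contain quotes or backslashes.
    static String buildBody(String taskId, String name, byte[] composeFile,
                            String startTime, String timestamp) {
        String composeB64 = Base64.getEncoder().encodeToString(composeFile);
        return String.format(
                "{\"method\":\"updateApp\",\"params\":{\"jobId\":\"%s\",\"name\":\"%s\","
                        + "\"composeFile\":\"%s\",\"policy\":{\"startTime\":\"%s\"},"
                        + "\"timestamp\":\"%s\"}}",
                taskId, name, composeB64, startTime, timestamp);
    }

    public static void main(String[] args) {
        byte[] compose = "version: '3'".getBytes(StandardCharsets.UTF_8);
        System.out.println(buildBody("task-1", "demo-app", compose, "0", "2020-01-01 00:00:00"));
    }
}
```

In real code a JSON library should build the body, as the processor does, so that values are escaped properly.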

6. The entity bean that wraps the rows returned by the database; note the static nested class

import java.sql.ResultSet;
import java.sql.SQLException;
import java.util.Date;

import org.springframework.jdbc.core.RowMapper;

public class DispatchRequest {

    private String taskId;
    private String deviceId;
    private String userId;
    private String name;
    private byte[] composeFile;
    private String policy;
    private String timestamp;
    private String md5;
    private int failureRetry;
    private int retryTimes;
    private int deviceManagered;

    // Constructors, setters/getters and toString omitted
    //......

    // Maps one row of the reader's query to a DispatchRequest
    public static class DispatchRequestRowMapper implements RowMapper<DispatchRequest> {
        @Override
        public DispatchRequest mapRow(ResultSet resultSet, int i) throws SQLException {
            DispatchRequest dispatchRequest = new DispatchRequest();
            dispatchRequest.setTaskId(resultSet.getString("taskId"));
            dispatchRequest.setUserId(resultSet.getString("userId"));
            dispatchRequest.setPolicy(resultSet.getString("startTime"));
            dispatchRequest.setDeviceId(resultSet.getString("deviceId"));
            dispatchRequest.setName(resultSet.getString("appName"));
            dispatchRequest.setComposeFile(resultSet.getBytes("composeFile"));
            dispatchRequest.setTimestamp(DateUtil.DateToString(new Date()));
            dispatchRequest.setRetryTimes(resultSet.getInt("retryTimes"));
            dispatchRequest.setFailureRetry(resultSet.getInt("failureRetry"));
            dispatchRequest.setDeviceManagered(resultSet.getInt("deviceManagered"));
            return dispatchRequest;
        }
    }
}

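7. Add the @EnableBatchProcessing annotation to the application class

import org.springframework.batch.core.configuration.annotation.EnableBatchProcessing;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;

@SpringBootApplication
@EnableBatchProcessing
public class Application {
    public static void main(String[] args) {
        SpringApplication.run(Application.class, args);
    }
}

III. Summary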

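Spring Batch turned out to be less convenient than I had imagined. With a chunk size of 5000, the records taken from the database go through the processor one at a time and, in this configuration, cannot be processed in parallel; only after all 5000 have been processed is the ItemWriter invoked once for the whole chunk.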
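
The read, process, write rhythm described above can be pictured with a small single-threaded model. This is a simplified plain-Java sketch, not Spring Batch's implementation: `ChunkLoopSketch` and `run` are hypothetical names, a `null` from the processor is treated as "filter this item out" (which matches the ItemProcessor contract), and skipping the write when a chunk ends up empty is a simplification chosen for the sketch.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.function.Consumer;
import java.util.function.Function;

// Simplified single-threaded model of chunk-oriented processing.
class ChunkLoopSketch {

    // Runs the loop and returns how many times the writer was invoked.
    static <I, O> int run(Iterator<I> reader, Function<I, O> processor,
                          Consumer<List<O>> writer, int chunkSize) {
        int writes = 0;
        while (reader.hasNext()) {
            // 1. Read up to chunkSize items, one at a time.
            List<I> items = new ArrayList<>();
            while (items.size() < chunkSize && reader.hasNext()) {
                items.add(reader.next());
            }
            // 2. Process the items one by one; null means "filtered out".
            List<O> outputs = new ArrayList<>();
            for (I item : items) {
                O out = processor.apply(item);
                if (out != null) {
                    outputs.add(out);
                }
            }
            // 3. Hand the whole chunk to the writer in a single call.
            if (!outputs.isEmpty()) {
                writer.accept(outputs);
                writes++;
            }
        }
        return writes;
    }

    public static void main(String[] args) {
        List<Integer> source = new ArrayList<>();
        for (int i = 0; i < 12; i++) {
            source.add(i);
        }
        int writes = ChunkLoopSketch.<Integer, String>run(
                source.iterator(),
                i -> "device-" + i,
                chunk -> System.out.println("writing " + chunk.size() + " items"),
                5);
        System.out.println("writer calls: " + writes);
    }
}
```

With 12 items and a chunk size of 5 the writer is called for chunks of 5, 5 and 2 items, which is the same pattern as 5000-record chunks in the job above.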

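In practice the trickiest parts were the ItemReader and the ItemWriter, specifically how to run custom SQL in them; the code in this article shows how.

As for the Quartz scheduling, it is simple: create the Spring Batch Job on a schedule and launch it. I won't show that code here, since I have already pasted plenty.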
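
Since the `QuartzManager` helper is not shown, the following only illustrates the idea of "run once after a delay, and do not schedule the same job name twice". It deliberately uses the JDK's `ScheduledExecutorService` as a stand-in rather than Quartz, and `RunOnceSchedulerSketch` with its methods is a hypothetical name; in the real project the task body would call something like `batchService.createBatchJob(taskId)`.

```java
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

// JDK-only stand-in for a "schedule once" helper; this is not Quartz.
class RunOnceSchedulerSketch {
    private final ScheduledExecutorService executor =
            Executors.newSingleThreadScheduledExecutor();
    private final Set<String> pendingNames = ConcurrentHashMap.newKeySet();

    // Schedules the task once after the delay; returns false if a task
    // with the same name is still pending.
    boolean scheduleRunOnce(String jobName, Runnable task, long delay, TimeUnit unit) {
        if (!pendingNames.add(jobName)) {
            return false;
        }
        executor.schedule(() -> {
            try {
                task.run();
            } finally {
                pendingNames.remove(jobName); // the name may be reused afterwards
            }
        }, delay, unit);
        return true;
    }

    // Stops accepting new tasks and waits for the pending ones to finish.
    boolean shutdownAndAwait(long timeout, TimeUnit unit) {
        executor.shutdown();
        try {
            return executor.awaitTermination(timeout, unit);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        }
    }

    public static void main(String[] args) {
        RunOnceSchedulerSketch scheduler = new RunOnceSchedulerSketch();
        String jobName = "UpgradeTask_42";
        Runnable retry = () -> System.out.println("re-running batch job for task 42");
        System.out.println(scheduler.scheduleRunOnce(jobName, retry, 1, TimeUnit.SECONDS));
        System.out.println(scheduler.scheduleRunOnce(jobName, retry, 1, TimeUnit.SECONDS));
        scheduler.shutdownAndAwait(5, TimeUnit.SECONDS);
    }
}
```

Quartz adds persistence, clustering and cron expressions on top of this basic pattern, which is why it is the better fit for production scheduling.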
