快捷搜索:  汽车  科技

系统架构图采集和web展示(基于Quartz设计采集系统并实现系统双活机制)

系统架构图采集和web展示(基于Quartz设计采集系统并实现系统双活机制)一个IP设备 那么这个时候 系统就会自动检测 自动为这个IP设备 生成一个定时任务.都需要有一个与之对应的定时器 而且 这个IP设备 是动态的 有可能将来 会动态的再添加1.自己定义一个QuartzTask普通的接口 然后实现几个这个接口 做为不同的定时任务.2.但这里需要的是 动态添加任务 因为系统启动后 不同的定时器 会做不同的工作并且 采集系统需要连接不同的IP的设置进行接口数据采集 这就要求 每个IP的设备

因为项目仅仅是不停的抓数据 没有弄成分布式的 但依然需要系统双活来保证系统稳定.

这个时候 我的思路是:这个在另一篇博文中有些的详细思路(在Timer定时任务中_基于Redis自己实现一套双机互备_双活系统_基于抢占原则) 可以搜索双活查到 但是实际

实现起来 碰到了一些问题 已经解决 记录下来.

来看一下定时任务Timer_Scheduled_quartz_如何在SpringBoot中整合

首先根据上面这个博文 已经实现了 一个简单的quartz定时:

1.自己定义一个QuartzTask普通的接口 然后实现几个这个接口 做为不同的定时任务.

2.但这里需要的是 动态添加任务 因为系统启动后 不同的定时器 会做不同的工作

并且 采集系统需要连接不同的IP的设置进行接口数据采集 这就要求 每个IP的设备

都需要有一个与之对应的定时器 而且 这个IP设备 是动态的 有可能将来 会动态的再添加

一个IP设备 那么这个时候 系统就会自动检测 自动为这个IP设备 生成一个定时任务.

3.所以可以看到 我们这个系统中的定时任务是动态添加的 但是应该是有2个定时任务是固定的.

4.一个定时任务是 用来检测系统 双活的 这个定时任务做的事情是 实时检测自身状态 没问题 就

更新最新时间 这样别的单点系统 在检测是时候发现 有一台机器上活着的 并且没有超时 就不会

上线 让出定时任务执行的机会 给主机 所有的从机属于待命状态.我们这样做 为了保证 集群

中的所有机器中 每个定时任务应该是只执行一份 而 我们这里还要求 不应该是每个定时任务只有

一份在集群中执行 而是集群中的 所有实例中 同时只有一个实例中的定时任务在执行.

5.然后还需要一个定时任务 是检测IP设备变化的 当新添加了新的IP设备以后 就需要动态 给

这个IP设备 添加一个定时任务 而添加的这定时任务就是需要为这个新的IP设备 添加

6.然后还需要一个定时任务 就是用来获取数据的 周期获取IP设备的数据 而这个定时任务 是需要

为每个IP设备都生成一个定时任务的 所以这里定时任务是需要动态的生成的.

7.然后还需要一个定时任务 用来发送ping命令的 定时给每个IP设备 发送ping命令 这样如果能正常

得到回应 那么就更新一下时间表 {type:ping time:2022-06-20 15:59:34 IP:192.168.1.110} 这样

就说明该IP设备 一直都是活动的.而某些IP设备是 连接上以后 它会自动的推送 ping过来 同时推送

心跳数据过来 这里ping过来指的是连接没有问题了 而心跳过来 才指的是IP设备的接口可以正常获取数据了.

8.还需要一个定时任务 来动态的循环所有的IP设备 然后去连接上所有的IP设备 这个是定时执行

如果定时执行发现设备掉线超时了他就会再去连接一下.

9.这些定时任务 只有一个定时任务是在启动的时候 利用quartz的配置类 走的 其他的都是动态创建的.

10.关于分布式 本来想通过Quartz的分布式功能 来实现的 因为quartz本身自带分布式功能 通过

结合数据库和锁实现的 这个时候 先在数据库表中创建quartz的表 然后在QuartzScheduleConfig中 去配置上数据库dataSource 就可以了 但是这个时候发现有个问题 就是任务是我自己定义的任务

这样的话 比较方便实现自己的业务 因为我都每个定时任务 都是需要有参数的 这样 我自己定义的任务 里面写上自己定义的参数 比如每个任务创建的时候 都需要把它对应的IP设备的IP 传入进去.

而 自己定义的任务需要用:

MethodInvokingJobDetailFactoryBean factoryBean = new MethodInvokingJobDetailFactoryBean(); factoryBean.setTargetObject(context.getBean(name)); factoryBean.setName(name); factoryBean.setTargetMethod(QuartzScheduleTask.Class.getMethods()[0].getName()); factoryBean.afterPropertiesSet(); 这种方式添加任务 而这里factoryBean.setTargetObject(context.getBean(name)); setTargetObject这个就是设置 定时器对象的 这个对象是自己写的一个接口 定义了一个execute方法 但是MethodInvokingJobDetailFactoryBean 这个类是不支持序列化的 也就是说 没办法配合quartz去实现分布式 不用quartz的分布式 就没办法实现系统的多活 因为quartz 提供的分布式功能 本身就可以保证 当一个集群去运行定时器的时候 可以保证一个定时器 只在同一个服务器实例上运行.而MethodInvokingJobDetailFactoryBean 不能序列化 就会导致 对应的task 没办法存入数据库 也就没办法实现分布式多活了.

系统架构图采集和web展示(基于Quartz设计采集系统并实现系统双活机制)(1)

也就是下面的方法就不行了 就考虑使用Job来创建JobDetail 但是这样的话 我们自己的任务

是需要继承Job这个quartz提供的类的. 本来是用下面的这个方法来做配置的.但是有问题

@Configuration @ConfigurationProperties("quartz") @Getter @Setter public class QuartzScheduleConfig { private String schedulerName; private String threadCount; private String threadNamePrefix; private String tasks; private final ApplicationContext context; @Autowired public QuartzScheduleConfig(ApplicationContext context) { this.context = context; } @Bean public SchedulerFactoryBean schedulerFactoryBean() { Properties properties = new Properties(); properties.setProperty("org.quartz.threadPool.threadCount" threadCount); properties.setProperty("org.quartz.threadPool.threadNamePrefix" threadNamePrefix); SchedulerFactoryBean factory = new SchedulerFactoryBean(); factory.setSchedulerName(schedulerName); factory.setQuartzProperties(properties); return factory; } @Bean public Scheduler scheduler() throws Exception { Scheduler scheduler = schedulerFactoryBean().getScheduler(); scheduler.scheduleJobs(createJobDetails() true); return scheduler; } /** * 创建JobDetail * 使用是Spring的MethodInvokingJobDetailFactoryBean来创建JobDetail * 使用Spring的ronTriggerFactoryBean来创建CronTrigger * * @return Map<JobDetail Set<CronTrigger>> */ private Map<JobDetail Set<? extends Trigger>> createJobDetails(){ Set<String> taskSet = StringUtils.commaDelimitedListToSet(tasks); Map<JobDetail Set<? extends Trigger>> map = new HashMap<>(taskSet.size()); for (String task : taskSet) { String[] nameAndCron = task.split(":"); String name = StringUtils.uncapitalize(nameAndCron[0]); String cron = nameAndCron[1]; MethodInvokingJobDetailFactoryBean factoryBean = new MethodInvokingJobDetailFactoryBean(); factoryBean.setTargetObject(context.getBean(name)); factoryBean.setName(name); factoryBean.setTargetMethod(QuartzScheduleTask.class.getMethods()[0].getName()); factoryBean.afterPropertiesSet(); CronTriggerFactoryBean cronTriggerFactoryBean = new CronTriggerFactoryBean(); cronTriggerFactoryBean.setCronExpression(cron); cronTriggerFactoryBean.setJobDetail(factoryBean.getObject()); cronTriggerFactoryBean.setName(name); cronTriggerFactoryBean.afterPropertiesSet(); Set<CronTrigger> cronTriggerSet = new HashSet<>(1); cronTriggerSet.add(cronTriggerFactoryBean.getObject()); map.put(factoryBean.getObject() cronTriggerSet); } return map; } }

系统架构图采集和web展示(基于Quartz设计采集系统并实现系统双活机制)(2)

Spring的MethodInvokingJobDetailFactoryBean是不支持序列化的就导致 quartz无法存入数据库了

会报错 提示MethodInvokingJobDetailFactoryBean无法序列化.

11.这个时候怎么办? 就想到了 使用quartz中支持分布式的JobDetail来创建Job 这个时候我们 实现的自定义的任务 就需要继承Job这个类了 不能自己写一个接口了 这时候因为继承Job的这个定时任务 需要传入上下文context 而且 这个是系统调用的 不知道怎么传入初始化的参数 所以没能实现

但是后来查找资料 发现可以通过jobMapData来给新创建的定时任务传递参数 如果这样可以的话

好像也行 这样的话就可以实现quartz的分布式了 也就是能保证集群中同时只有一个定时任务在执行.这样也就相当于实现了 系统的多活了 因为我们的系统中大部分都是定时任务 而非定时任务部分 在spingboot中做的接口 可以通过nginx这样来做负载均衡就行.

12.也就是使用:jobDataMap 用来给 job 传递数据;如果你的Job不是自己定义的接口 而是用

extends Job来实现的话 那么就可以使用这个jobDataMap 来在创建这个job的时候 给这个job传递

参数 只要有这个功能 你就可以在比如 有新的IP设备来的时候 就把参数先记录在某个HashMap中

然后 在创建job的时候 获取参数放到对应的job中去.

JobDataMap属于JobDetail的一部分
可以在构建JobDataMap时传递参数

首先是在创建JobDetail的时候把参数数据填上:

//创建JobDetail JobDetail jobDetail = JobBuilder.newJob(MyJob.class) //唯一标识 .withIdentity("jobDetail1" "group1") //添加参数通过usingJobData方法,传递给job .usingJobData("name" "yx") .build();

系统架构图采集和web展示(基于Quartz设计采集系统并实现系统双活机制)(3)

然后就是用的时候在继承Job的任务类的execute方法中 从上下文中获取这个参数:

package com.gblfy.job; import org.quartz.Job; import org.quartz.JobDataMap; import org.quartz.JobExecutionContext; import org.quartz.JobExecutionException; import java.time.LocalTime; public class MyJob implements Job { @Override public void execute(JobExecutionContext context) throws JobExecutionException { LocalTime localTime = LocalTime.now(); /* 1.从上下文中先获取getJobDetail 2.再从getJobDetail中获取JobDataMap 3.最后,从JobDataMap中,根据key获取对应属性的值 */ JobDataMap jobDataMap = context.getJobDetail().getJobDataMap(); String name = jobDataMap.getString("name"); System.out.println("hello " name "我正在执行" localTime); } }

系统架构图采集和web展示(基于Quartz设计采集系统并实现系统双活机制)(4)

这样的话 也就是说 我们之前构想的利用quartz实现分布式 多活是可以的 但是 缺不满足我们的另一个想法 我们需要 对系统进行实时的监控 这部分还是需要我们自己来做 所以 后来就没有在用

quartz来做多活了.

12.来看一下 我们做的quartz任务管理的部分代码:

首先QuartzSchduleConfig这个是配置类:

系统启动会自动加载这个类 如果 系统配置了 任务的话会自动在这个地方进行加载:

import lombok.Getter; import lombok.Setter; import org.quartz.*; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.beans.factory.annotation.Qualifier; import org.springframework.beans.factory.annotation.Value; import org.springframework.context.ApplicationContext; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; import org.springframework.scheduling.quartz.CronTriggerFactoryBean; import org.springframework.scheduling.quartz.JobDetailFactoryBean; import org.springframework.scheduling.quartz.MethodInvokingJobDetailFactoryBean; import org.springframework.scheduling.quartz.SchedulerFactoryBean; import org.springframework.util.StringUtils; import javax.sql.DataSource; import java.text.ParseException; import java.util.*; @Configuration public class QuartzScheduleConfig { @Value("${quartz.scheduler-name}") private String schedulerName; @Value("${quartz.thread-count}") private String threadCount; @Value("${quartz.thread-name-prefix}") private String threadNamePrefix; @Value("${quartz.tasks}") private String tasks; static final String jobGroupName = "jobGroup"; static final String triggerGroupName = "triggerGroup"; // @Autowired // @Qualifier(value = "dataSource") // private DataSource dataSource; // 注意这里如果打开的话 quartz在添加任务的时候就会自动的吧任务添加到数据库中的数据表去 // private final ApplicationContext context; @Autowired public QuartzScheduleConfig(ApplicationContext context) { this.context = context; } @Bean public SchedulerFactoryBean schedulerFactoryBean(){ Properties properties = new Properties(); properties.setProperty("org.quartz.threadPool.threadCount" threadCount); properties.setProperty("org.quartz.threadPool.threadNamePrefix" threadNamePrefix); SchedulerFactoryBean factoryBean = new SchedulerFactoryBean(); //factoryBean.setDataSource(dataSource); factoryBean.setSchedulerName(schedulerName); factoryBean.setQuartzProperties(properties); return factoryBean; } @Bean public Scheduler scheduler() throws Exception{ Scheduler scheduler = schedulerFactoryBean().getScheduler(); scheduler.scheduleJobs(createJobDetails() true); return scheduler; } /** * 创建JobDetail * 使用Spring的MethodInvokingJobDetailFactoryBean来创建Detail * @return * @throws NoSuchMethodException * @throws ClassNotFoundException * @throws ParseException */ private Map<JobDetail Set<? extends Trigger>> createJobDetails() throws NoSuchMethodException ClassNotFoundException ParseException { Set<String> taskSet = StringUtils.commaDelimitedListToSet(tasks); Map<JobDetail Set<? extends Trigger>> map = new HashMap<>(taskSet.size()); for (String task : taskSet) { String[] nameAndCron = task.split(":"); String name = StringUtils.uncapitalize(nameAndCron[0]); String cron = nameAndCron[1]; //集群模式下不能使用 MethodInvokingJobDetailFactoryBean 不支持序列化 MethodInvokingJobDetailFactoryBean factoryBean = new MethodInvokingJobDetailFactoryBean(); factoryBean.setTargetObject(context.getBean(name)); factoryBean.setName(name); factoryBean.setTargetMethod(QuartzScheduleTask.class.getMethods()[0].getName()); factoryBean.afterPropertiesSet(); //JobDetail jobDetail = org.quartz.JobBuilder.newJob(ReportTimeTask.class).withDescription(name).withIdentity(name jobGroupName).build(); CronTriggerFactoryBean cronTriggerFactoryBean = new CronTriggerFactoryBean(); cronTriggerFactoryBean.setCronExpression(cron); cronTriggerFactoryBean.setJobDetail(factoryBean.getObject()); cronTriggerFactoryBean.setName(name); cronTriggerFactoryBean.afterPropertiesSet(); Set<CronTrigger> cronTriggerSet = new HashSet<>(); cronTriggerSet.add(cronTriggerFactoryBean.getObject()); map.put(factoryBean.getObject() cronTriggerSet); } return map; } }

系统架构图采集和web展示(基于Quartz设计采集系统并实现系统双活机制)(5)

13.然后我们再来看 如何写一个quartz的管理类 来实现任务的添加 删除 修改.

import org.quartz.*; import org.quartz.impl.StdSchedulerFactory; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.context.ApplicationContext; import org.springframework.scheduling.quartz.CronTriggerFactoryBean; import org.springframework.scheduling.quartz.MethodInvokingJobDetailFactoryBean; import java.util.concurrent.ConcurrentHashMap; /** * 定时任务管理器 管理任务 */ public class QuartzManager { private static SchedulerFactory schedulerFactory = new StdSchedulerFactory(); static final String jobGroupName = "jobGroup"; static final String triggerGroupName = "triggerGroup"; // 保存心跳定时任务管理表对象 public static ConcurrentHashMap<String QuartzTaskEntity> AllTasksManager = new ConcurrentHashMap<>(); private final ApplicationContext context; @Autowired public QuartzManager(ApplicationContext context) { this.context = context; } /** * 添加任务 * @param name 任务名称 * @param cron 任务cron表达式 * @param clazz 任务类 */ public static void addTask(String name String cron QuartzScheduleTask clazz){ try{ //添加到任务管理对象中 QuartzTaskEntity quartzTaskEntity = new QuartzTaskEntity(); quartzTaskEntity.setName(name); quartzTaskEntity.setCron(cron); quartzTaskEntity.setQuartzScheduleTask(clazz); AllTasksManager.put(name quartzTaskEntity); Scheduler scheduler = schedulerFactory.getScheduler(); //任务名 任务组 任务执行类 MethodInvokingJobDetailFactoryBean factoryBean = new MethodInvokingJobDetailFactoryBean(); factoryBean.setTargetObject(clazz); factoryBean.setName(name); factoryBean.setTargetMethod(QuartzScheduleTask.class.getMethods()[0].getName()); factoryBean.afterPropertiesSet(); factoryBean.setGroup(jobGroupName); //JobDetail jobDetail = org.quartz.JobBuilder.newJob(clazz.getClass()). // withDescription(name).withIdentity(name jobGroupName).build(); CronTriggerFactoryBean cronTriggerFactoryBean = new CronTriggerFactoryBean(); cronTriggerFactoryBean.setCronExpression(cron); cronTriggerFactoryBean.setJobDetail(factoryBean.getObject()); cronTriggerFactoryBean.setName(name); cronTriggerFactoryBean.afterPropertiesSet(); cronTriggerFactoryBean.setGroup(triggerGroupName); //JobDetail jobDetail = new JobDetailImpl(name jobGroupName clazz); //scheduler.scheduleJob(factoryBean.getObject() cronTriggerFactoryBean.getObject()); scheduler.scheduleJob(factoryBean.getObject() cronTriggerFactoryBean.getObject()); if(!scheduler.isShutdown()){ scheduler.start(); } }catch (Exception e){ e.printStackTrace(); } } /** * 修改任务 * @return */ public static void modifyTask(String name String cron){ try{ Scheduler scheduler = schedulerFactory.getScheduler(); JobKey jobKey = new JobKey(name); JobDetail jobDetail = scheduler.getJobDetail(jobKey); if(jobDetail!=null){ removeJob(name); QuartzScheduleTask quartzScheduleTask = AllTasksManager.get(name).getQuartzScheduleTask(); addTask(name cron quartzScheduleTask); } }catch (Exception e){ e.printStackTrace(); } } /** * 获取任务实例 * @param name * @return */ public static QuartzScheduleTask getTask(String name){ QuartzScheduleTask quartzScheduleTask = AllTasksManager.get(name).getQuartzScheduleTask(); return quartzScheduleTask; } /** * 移除任务 * @param name */ public static void removeJob(String name){ try{ Scheduler scheduler = schedulerFactory.getScheduler(); //停止触发器 TriggerKey triggerKey = new TriggerKey(name triggerGroupName); scheduler.pauseTrigger(triggerKey); //移除触发器 scheduler.unscheduleJob(triggerKey); //删除任务 JobKey jobKey = new JobKey(name jobGroupName); scheduler.deleteJob(jobKey); }catch (Exception e){ e.printStackTrace(); } } }

系统架构图采集和web展示(基于Quartz设计采集系统并实现系统双活机制)(6)

上面就实现了动态的任务的 添加 删除 修改 但是注意上面这个做法是不支持序列化的 因为用到了

MethodInvokingJobDetailFactoryBean 这个不支持序列化的bean来实现创建jobDetail.

并且 我这里的job 是我自己定义的 可以看看

14.定义task 这里是自定义的task 不是继承自job的.

定义一个顶层抽象的task:

/** * 定时任务接口 * @author hctel */ public abstract class QuartzScheduleTask { /** * 该方法定义某个具体的定时任务 */ public abstract void execute(); }

系统架构图采集和web展示(基于Quartz设计采集系统并实现系统双活机制)(7)

15.然后再去定义一个具体的任务:

import com.alibaba.fastjson.JSON; import com.alibaba.fastjson.JSONObject; import com.sgcc.lnsoft.common.collector.constans.CommonDefine; import com.sgcc.lnsoft.common.collector.util.CommonUtil; import com.sgcc.lnsoft.common.core.redis.RedisCache; import com.sgcc.lnsoft.common.core.utils.DateUtil; import com.sgcc.lnsoft.common.core.utils.WebUtil; import lombok.extern.slf4j.Slf4j; import org.apache.commons.lang3.StringUtils; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Component; import java.util.Date; @Slf4j @Component //这里注意 如果启动的时候就希望加载到内存中去 并通过上面的 //我们说的配置类 加载的 可以加上这个@Component 加上以后 凡是在yml 或properties //文件中配置的任务就会自动加载到内存中 如果不加这个@Component 那么后面我们可以 //通过我们那个管理类中的addTask方法 动态的添加也可以的. @AllArgsConstructor public class ReportTimeTask extends QuartzScheduleTask { Private String IPStr; @Override public void execute() { } }

系统架构图采集和web展示(基于Quartz设计采集系统并实现系统双活机制)(8)

16.然后我们再看一下去利用我们的管理类 去添加一个任务:

ReportTimeTask reportTimeTask = new ReportTimeTask(IPStr); QuartzManager.addTask(IPStr "_reportTimeTask" "cron表达式" reportTimeTask);

系统架构图采集和web展示(基于Quartz设计采集系统并实现系统双活机制)(9)

这样就添加了一个任务 任务添加以后 就会根据cron表达式自动执行了.

17.删除一个任务和修改一个任务就非常简单 这里不说了.

18.上面说的 如果想使用分布式的话 quartz 需要借助数据库 需要先把quartz的mysql表都创建出来

然后在配置类中去 加上dataSource 然后再 创建jobDetail的时候一定要注意 这个时候 需要用:

JobDetail jobDetail = org.quartz.JobBuilder.newJob(ReportTimeTask.class) .withDescription(name) .withIdentity(name jobGroupName) .setJobData(jobDataMap).build();

系统架构图采集和web展示(基于Quartz设计采集系统并实现系统双活机制)(10)

这种方式来创建jobDetail 注意这里的ReportTimeTask 这个类就必须是继承了quartz提供的Job的

这个类才行.同时可以看到 里面有个setJobData() 这里 有个jobDataMap 这个就可以给我们的Job

传递参数了 我们就可以通过这种方式来 动态的给我们的Job传递参数 这样既可以实现分布式 也可以给我们的Job动态传递参数了.

系统架构图采集和web展示(基于Quartz设计采集系统并实现系统双活机制)(11)

系统架构图采集和web展示(基于Quartz设计采集系统并实现系统双活机制)(12)

​编辑

猜您喜欢: