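I have written or reposted several articles about Quartz before, and many readers have messaged me with Quartz-related questions:
Quartz error: java.lang.ClassNotFoundException
Quartz source code analysis: understanding the relationships among Job, Scheduler, Calendar, Trigger, and Listener
Analysis of missed executions when multiple Quartz triggers fire (reposted)
With things quiet at year's end, I collected these Quartz questions, read through the source code along the way, and wrote up a summary, hoping it helps some people or shortens their exploration time.
Note: the versions used are Quartz 2.2.3 and Spring Boot 2.1.3.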
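1. Core components of Quartz
1.1 The Job component
1.1.1 Job
A Job holds the logic of the task. All of that logic lives in the execute() method, and the data needed at execution time is available through the JobExecutionContext.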
Job example:

    import java.util.Date;

    import org.quartz.DisallowConcurrentExecution;
    import org.quartz.Job;
    import org.quartz.JobDataMap;
    import org.quartz.JobExecutionContext;
    import org.quartz.JobExecutionException;
    import org.quartz.JobKey;
    import org.quartz.PersistJobDataAfterExecution;
    import org.slf4j.Logger;
    import org.slf4j.LoggerFactory;

    @PersistJobDataAfterExecution
    @DisallowConcurrentExecution
    public class ColorJob implements Job {

        private static Logger _log = LoggerFactory.getLogger(ColorJob.class);

        // parameter names specific to this job
        public static final String FAVORITE_COLOR = "favorite color";
        public static final String EXECUTION_COUNT = "count";

        // Since Quartz will re-instantiate a class every time it
        // gets executed, non-static member variables can
        // not be used to maintain state!
        private int _counter = 1;

        /**
         * Quartz requires a public empty constructor so that the
         * scheduler can instantiate the class whenever it needs.
         */
        public ColorJob() {
        }

        /**
         * Called by the {@link org.quartz.Scheduler} when a
         * {@link org.quartz.Trigger} fires that is associated with
         * the Job.
         *
         * @throws JobExecutionException
         *             if there is an exception while executing the job.
         */
        public void execute(JobExecutionContext context) throws JobExecutionException {

            // This job simply prints out its job name and the
            // date and time that it is running
            JobKey jobKey = context.getJobDetail().getKey();

            // Grab and print passed parameters
            JobDataMap data = context.getJobDetail().getJobDataMap();
            String favoriteColor = data.getString(FAVORITE_COLOR);
            int count = data.getInt(EXECUTION_COUNT);
            _log.info("ColorJob: " + jobKey + " executing at " + new Date() + "\n"
                    + "  favorite color is " + favoriteColor + "\n"
                    + "  execution count (from job map) is " + count + "\n"
                    + "  execution count (from job member variable) is " + _counter);

            // increment the count and store it back into the
            // job map so that job state can be properly maintained
            count++;
            data.put(EXECUTION_COUNT, count);

            // Increment the local member variable
            // This serves no real purpose since job state can not
            // be maintained via member variables!
            _counter++;
        }
    }
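The comments in ColorJob make a point that is easy to miss: the job class is instantiated again for every firing, so only the data map carries state forward. The following is a minimal plain-Java model of that behaviour, written without any Quartz dependency. The class JobStateSketch and its methods are hypothetical names made up for this illustration, and the HashMap merely stands in for a JobDataMap that is stored back after each run.

```java
import java.util.HashMap;
import java.util.Map;

// Plain-Java model (no Quartz dependency) of job state across executions.
class JobStateSketch {

    // Stand-in for a job class: a new instance is created for every firing.
    static class CountingJob {
        private int counter = 1; // instance state, reset with every new instance

        // Stand-in for Job.execute(); returns the member counter seen by this run.
        int execute(Map<String, Object> jobDataMap) {
            int count = (Integer) jobDataMap.get("count");
            jobDataMap.put("count", count + 1); // survives because the map is kept
            return counter++;
        }
    }

    // Runs the job several times, re-instantiating it each time.
    // Returns { count stored in the map, member counter seen by the last run }.
    static int[] run(int times) {
        Map<String, Object> persisted = new HashMap<String, Object>();
        persisted.put("count", 1);
        int lastMemberCounter = 0;
        for (int i = 0; i < times; i++) {
            CountingJob job = new CountingJob(); // fresh instance per firing
            lastMemberCounter = job.execute(persisted);
        }
        return new int[] { (Integer) persisted.get("count"), lastMemberCounter };
    }

    public static void main(String[] args) {
        int[] result = run(3);
        System.out.println("count in map = " + result[0]
                + ", member counter = " + result[1]);
    }
}
```

In this model the value kept in the map grows with every run, while the member counter always starts again from its initial value.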
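1.1.2 JobDetail: storing the Job's information
JobDetail is mainly responsible for:
1. Specifying the Job class to execute and its unique identity (job name and group name)
2. Storing the JobDataMap

    // newJob is statically imported from org.quartz.JobBuilder
    // job1 will only run 5 times (at start time, plus 4 repeats), every 10 seconds
    JobDetail job1 = newJob(ColorJob.class).withIdentity("job1", "group1").build();

    // pass initialization parameters into the job
    job1.getJobDataMap().put(ColorJob.FAVORITE_COLOR, "Green");
    job1.getJobDataMap().put(ColorJob.EXECUTION_COUNT, 1);

With a JDBC job store, the JobDetail is persisted in the job details table (QRTZ_JOB_DETAILS with the default table prefix).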
1.1.3 Quartz JobBuilder
JobBuilder provides a fluent (chained) API for creating a JobDetail:

    @Bean
    public JobDetail jobDetail() {
        return JobBuilder.newJob().ofType(SampleJob.class)
                .storeDurably()
                .withIdentity("Qrtz_Job_Detail")
                .withDescription("Invoke Sample Job service...")
                .build();
    }

1.1.4 Spring JobDetailFactoryBean
Spring provides a factory bean as another way to create a JobDetail:

    @Bean
    public JobDetailFactoryBean jobDetail() {
        JobDetailFactoryBean jobDetailFactory = new JobDetailFactoryBean();
        jobDetailFactory.setJobClass(SampleJob.class);
        jobDetailFactory.setDescription("Invoke Sample Job service...");
        jobDetailFactory.setDurability(true);
        return jobDetailFactory;
    }
1.2 The Trigger component
Trigger states. A trigger can be in one of the following states:

    // STATES
    String STATE_WAITING = "WAITING";
    String STATE_ACQUIRED = "ACQUIRED";
    String STATE_EXECUTING = "EXECUTING";
    String STATE_COMPLETE = "COMPLETE";
    String STATE_BLOCKED = "BLOCKED";
    String STATE_ERROR = "ERROR";
    String STATE_PAUSED = "PAUSED";
    String STATE_PAUSED_BLOCKED = "PAUSED_BLOCKED";
    String STATE_DELETED = "DELETED";

The state is stored in the TRIGGER_STATE column of the triggers table (QRTZ_TRIGGERS with the default table prefix).
Trigger types:

    // TRIGGER TYPES
    /** Simple Trigger type. */
    String TTYPE_SIMPLE = "SIMPLE";
    /** Cron Trigger type. */
    String TTYPE_CRON = "CRON";
    /** Calendar Interval Trigger type. */
    String TTYPE_CAL_INT = "CAL_INT";
    /** Daily Time Interval Trigger type. */
    String TTYPE_DAILY_TIME_INT = "DAILY_I";
    /** A general blob Trigger type. */
    String TTYPE_BLOB = "BLOB";

The type is stored in the TRIGGER_TYPE column of the triggers table (QRTZ_TRIGGERS with the default table prefix).
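1.2.1 Trigger example

    // newTrigger and simpleSchedule are statically imported from
    // org.quartz.TriggerBuilder and org.quartz.SimpleScheduleBuilder
    SimpleTrigger trigger1 = newTrigger()
            .withIdentity("trigger1", "group1")
            .startAt(startTime)
            .withSchedule(simpleSchedule().withIntervalInSeconds(10).withRepeatCount(4))
            .build();

With the JDBC job store, the trigger is persisted in MySQL.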
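The earlier comment "job1 will only run 5 times (at start time, plus 4 repeats), every 10 seconds" follows from how the repeat count is interpreted: it counts the repeats after the first firing. The sketch below works through that arithmetic in plain Java. It is not Quartz's own computation (misfires, calendars, and end times are ignored), and fireTimes is a hypothetical helper introduced only for this example.

```java
import java.util.ArrayList;
import java.util.Date;
import java.util.List;

// Plain-Java arithmetic for a simple schedule; not the Quartz implementation.
class SimpleScheduleSketch {

    // repeatCount counts the repeats AFTER the first firing,
    // so the total number of firings is repeatCount + 1.
    static List<Date> fireTimes(Date startTime, long intervalMillis, int repeatCount) {
        List<Date> times = new ArrayList<Date>();
        for (int i = 0; i <= repeatCount; i++) {
            times.add(new Date(startTime.getTime() + i * intervalMillis));
        }
        return times;
    }

    public static void main(String[] args) {
        // Same numbers as trigger1: every 10 seconds, 4 repeats.
        for (Date fireTime : fireTimes(new Date(), 10000L, 4)) {
            System.out.println(fireTime);
        }
    }
}
```

With an interval of 10 seconds and a repeat count of 4, the last firing falls 40 seconds after the start time.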
1.2.2 Quartz TriggerBuilder
TriggerBuilder provides a fluent (chained) API for creating a Trigger:

    @Bean
    public Trigger trigger(JobDetail job) {
        return TriggerBuilder.newTrigger().forJob(job)
                .withIdentity("Qrtz_Trigger")
                .withDescription("Sample trigger")
                .withSchedule(simpleSchedule().repeatForever().withIntervalInHours(1))
                .build();
    }

1.2.3 Spring SimpleTriggerFactoryBean
Spring provides a factory bean for creating a SimpleTrigger. The repeat interval is given in milliseconds, so 3600000 is one hour:

    @Bean
    public SimpleTriggerFactoryBean trigger(JobDetail job) {
        SimpleTriggerFactoryBean trigger = new SimpleTriggerFactoryBean();
        trigger.setJobDetail(job);
        trigger.setRepeatInterval(3600000);
        trigger.setRepeatCount(SimpleTrigger.REPEAT_INDEFINITELY);
        return trigger;
    }
1.3 The scheduler component
1.3.1 The factory class provided by Quartz

    @Bean
    public Scheduler scheduler(Trigger trigger, JobDetail job)
            throws SchedulerException, IOException {
        StdSchedulerFactory factory = new StdSchedulerFactory();
        factory.initialize(new ClassPathResource("quartz.properties").getInputStream());

        Scheduler scheduler = factory.getScheduler();
        scheduler.setJobFactory(springBeanJobFactory());
        scheduler.scheduleJob(job, trigger);

        scheduler.start();
        return scheduler;
    }

1.3.2 The factory bean provided by Spring

    @Bean
    public SchedulerFactoryBean scheduler(Trigger trigger, JobDetail job) {
        SchedulerFactoryBean schedulerFactory = new SchedulerFactoryBean();
        schedulerFactory.setConfigLocation(new ClassPathResource("quartz.properties"));

        schedulerFactory.setJobFactory(springBeanJobFactory());
        schedulerFactory.setJobDetails(job);
        schedulerFactory.setTriggers(trigger);
        return schedulerFactory;
    }
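2. How it works
2.1 The core class QuartzScheduler
StdScheduler, the Scheduler implementation, wraps the core worker class QuartzScheduler. The Javadoc on its constructor describes it as a proxy: "... StdScheduler instance to proxy the given QuartzScheduler instance, and with the given SchedulingContext."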
2.2 Storing and retrieving JobDetail

    public void addJob(JobDetail jobDetail, boolean replace,
            boolean storeNonDurableWhileAwaitingScheduling) throws SchedulerException {
        validateState();

        // a job with no trigger must be durable, otherwise it would be cleaned up
        if (!storeNonDurableWhileAwaitingScheduling && !jobDetail.isDurable()) {
            throw new SchedulerException(
                    "Jobs added with no trigger must be durable.");
        }

        resources.getJobStore().storeJob(jobDetail, replace);
        notifySchedulerThread(0L);
        notifySchedulerListenersJobAdded(jobDetail);
    }
2.2.1 Storing JobDetail (using the MySQL JDBC job store as an example)
The store call is implemented by StdJDBCDelegate.
Note: the JobDataMap is serialized and stored in the database as a Blob.
The SQL executed is defined in StdJDBCConstants:

    String INSERT_JOB_DETAIL = "INSERT INTO "
            + TABLE_PREFIX_SUBST + TABLE_JOB_DETAILS + " ("
            + COL_SCHEDULER_NAME + ", " + COL_JOB_NAME
            + ", " + COL_JOB_GROUP + ", " + COL_DESCRIPTION + ", "
            + COL_JOB_CLASS + ", " + COL_IS_DURABLE + ", "
            + COL_IS_NONCONCURRENT + ", " + COL_IS_UPDATE_DATA + ", "
            + COL_REQUESTS_RECOVERY + ", "
            + COL_JOB_DATAMAP + ") " + " VALUES(" + SCHED_NAME_SUBST + ", ?, ?, ?, ?, ?, ?, ?, ?, ?)";
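To make "serialized and stored as a Blob" concrete, here is a minimal sketch of the round trip using standard Java serialization. It only illustrates the idea: BlobSerializationSketch, toBlob, and fromBlob are hypothetical names, not the StdJDBCDelegate code. One practical consequence is that every value placed in the map has to be Serializable, and its class must be loadable when the blob is read back; otherwise deserialization fails with a ClassNotFoundException.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.util.HashMap;
import java.util.Map;

// Sketch of a map <-> byte[] round trip with Java serialization.
class BlobSerializationSketch {

    // Serialize the map into the bytes that would go into a BLOB column.
    static byte[] toBlob(Map<String, Object> data) {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
            out.writeObject(new HashMap<String, Object>(data));
        } catch (IOException e) {
            throw new IllegalStateException("Could not serialize job data", e);
        }
        return bytes.toByteArray();
    }

    // Rebuild the map from the bytes read out of the BLOB column.
    @SuppressWarnings("unchecked")
    static Map<String, Object> fromBlob(byte[] blob) {
        try (ObjectInputStream in = new ObjectInputStream(new ByteArrayInputStream(blob))) {
            return (Map<String, Object>) in.readObject();
        } catch (IOException | ClassNotFoundException e) {
            // ClassNotFoundException: a stored value's class is not on the classpath
            throw new IllegalStateException("Could not deserialize job data", e);
        }
    }

    public static void main(String[] args) {
        Map<String, Object> data = new HashMap<String, Object>();
        data.put("favorite color", "Green");
        data.put("count", 1);
        System.out.println(fromBlob(toBlob(data)));
    }
}
```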
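2.2.2 Querying JobDetail
To emphasize: because the JobDataMap inside a JobDetail is stored as a Blob (this can be switched to string storage through the useProperties property; the default is false, meaning Blob storage), reading it back needs special handling, which is done in StdJDBCDelegate.java.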
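For comparison, the string-based alternative can be sketched with java.util.Properties. This assumes, as the Quartz configuration documentation describes for useProperties, that all values are plain strings so they can be kept as name-value pairs. The class and method names below are hypothetical, and the exact on-disk format Quartz uses is not reproduced here.

```java
import java.io.IOException;
import java.io.StringReader;
import java.io.StringWriter;
import java.io.UncheckedIOException;
import java.util.Map;
import java.util.Properties;
import java.util.TreeMap;

// Sketch of storing string-only job data as name-value pairs.
class PropertiesStorageSketch {

    // Write the map as properties text (keys and values must be strings).
    static String toText(Map<String, String> data) {
        Properties props = new Properties();
        props.putAll(data);
        StringWriter out = new StringWriter();
        try {
            props.store(out, null);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return out.toString();
    }

    // Parse the properties text back into a map.
    static Map<String, String> fromText(String text) {
        Properties props = new Properties();
        try {
            props.load(new StringReader(text));
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        Map<String, String> result = new TreeMap<String, String>();
        for (String name : props.stringPropertyNames()) {
            result.put(name, props.getProperty(name));
        }
        return result;
    }

    public static void main(String[] args) {
        Map<String, String> data = new TreeMap<String, String>();
        data.put("favorite color", "Green");
        data.put("count", "1"); // numbers have to be stored as strings
        System.out.println(toText(data));
    }
}
```

The trade-off is readability and freedom from class-versioning problems on one side, against losing typed values on the other: a counter has to be parsed back from its string form.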
2.3 Querying a trigger

    /**
     * Retrieve the given {@link org.quartz.Trigger}.
     *
     * @return The desired Trigger, or null if there is no match.
     */
    public OperableTrigger retrieveTrigger(final TriggerKey triggerKey)
            throws JobPersistenceException {
        return (OperableTrigger) executeWithoutLock( // no locks necessary for read...
            new TransactionCallback() {
                public Object execute(Connection conn) throws JobPersistenceException {
                    return retrieveTrigger(conn, triggerKey);
                }
            });
    }

    protected OperableTrigger retrieveTrigger(Connection conn, TriggerKey key)
            throws JobPersistenceException {
        try {
            return getDelegate().selectTrigger(conn, key);
        } catch (Exception e) {
            throw new JobPersistenceException("Couldn't retrieve trigger: "
                    + e.getMessage(), e);
        }
    }
StdJDBCDelegate.java

    /**
     * Select a trigger.
     *
     * @return the {@link org.quartz.Trigger} object
     * @throws JobPersistenceException
     */
    public OperableTrigger selectTrigger(Connection conn, TriggerKey triggerKey)
            throws SQLException, ClassNotFoundException, IOException, JobPersistenceException {
        PreparedStatement ps = null;
        ResultSet rs = null;

        try {
            OperableTrigger trigger = null;

            ps = conn.prepareStatement(rtp(SELECT_TRIGGER));
            ps.setString(1, triggerKey.getName());
            ps.setString(2, triggerKey.getGroup());
            rs = ps.executeQuery();

            if (rs.next()) {
                String jobName = rs.getString(COL_JOB_NAME);
                String jobGroup = rs.getString(COL_JOB_GROUP);
                String description = rs.getString(COL_DESCRIPTION);
                long nextFireTime = rs.getLong(COL_NEXT_FIRE_TIME);
                long prevFireTime = rs.getLong(COL_PREV_FIRE_TIME);
                String triggerType = rs.getString(COL_TRIGGER_TYPE);
                long startTime = rs.getLong(COL_START_TIME);
                long endTime = rs.getLong(COL_END_TIME);
                String calendarName = rs.getString(COL_CALENDAR_NAME);
                int misFireInstr = rs.getInt(COL_MISFIRE_INSTRUCTION);
                int priority = rs.getInt(COL_PRIORITY);

                // the trigger's job data: properties form or serialized Blob
                Map<?, ?> map = null;
                if (canUseProperties()) {
                    map = getMapFromProperties(rs);
                } else {
                    map = (Map<?, ?>) getObjectFromBlob(rs, COL_JOB_DATAMAP);
                }

                // times are stored as epoch milliseconds; 0 means "not set"
                Date nft = null;
                if (nextFireTime > 0) {
                    nft = new Date(nextFireTime);
                }

                Date pft = null;
                if (prevFireTime > 0) {
                    pft = new Date(prevFireTime);
                }
                Date startTimeD = new Date(startTime);
                Date endTimeD = null;
                if (endTime > 0) {
                    endTimeD = new Date(endTime);
                }

                if (triggerType.equals(TTYPE_BLOB)) {
                    // blob triggers are stored as a whole serialized object
                    rs.close();
                    rs = null;
                    ps.close();
                    ps = null;

                    ps = conn.prepareStatement(rtp(SELECT_BLOB_TRIGGER));
                    ps.setString(1, triggerKey.getName());
                    ps.setString(2, triggerKey.getGroup());
                    rs = ps.executeQuery();

                    if (rs.next()) {
                        trigger = (OperableTrigger) getObjectFromBlob(rs, COL_BLOB);
                    }
                } else {
                    // other types are rebuilt through a type-specific delegate
                    TriggerPersistenceDelegate tDel = findTriggerPersistenceDelegate(triggerType);

                    if (tDel == null)
                        throw new JobPersistenceException(
                                "No TriggerPersistenceDelegate for trigger discriminator type: " + triggerType);

                    TriggerPropertyBundle triggerProps = null;
                    try {
                        triggerProps = tDel.loadExtendedTriggerProperties(conn, triggerKey);
                    } catch (IllegalStateException isex) {
                        if (isTriggerStillPresent(ps)) {
                            throw isex;
                        } else {
                            // QTZ-386 Trigger has been deleted
                            return null;
                        }
                    }

                    TriggerBuilder<?> tb = newTrigger()
                            .withDescription(description)
                            .withPriority(priority)
                            .startAt(startTimeD)
                            .endAt(endTimeD)
                            .withIdentity(triggerKey)
                            .modifiedByCalendar(calendarName)
                            .withSchedule(triggerProps.getScheduleBuilder())
                            .forJob(jobKey(jobName, jobGroup));

                    if (null != map) {
                        tb.usingJobData(new JobDataMap(map));
                    }

                    trigger = (OperableTrigger) tb.build();

                    trigger.setMisfireInstruction(misFireInstr);
                    trigger.setNextFireTime(nft);
                    trigger.setPreviousFireTime(pft);

                    setTriggerStateProperties(trigger, triggerProps);
                }
            }

            return trigger;
        } finally {
            closeResultSet(rs);
            closeStatement(ps);
        }
    }
The SQL executed:

    String SELECT_TRIGGER = "SELECT * FROM "
            + TABLE_PREFIX_SUBST + TABLE_TRIGGERS + " WHERE "
            + COL_SCHEDULER_NAME + " = " + SCHED_NAME_SUBST
            + " AND " + COL_TRIGGER_NAME + " = ? AND " + COL_TRIGGER_GROUP + " = ?";
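The SQL constants above are templates: the table prefix and the scheduler name are substituted before the statement is prepared, which is what the rtp(...) call in selectTrigger does. A rough sketch of that substitution with java.text.MessageFormat follows. The placeholder values "{0}" and "{1}", the literal column names, and the three-argument rtp signature are assumptions made for this illustration; check them against the Quartz version you use.

```java
import java.text.MessageFormat;

// Sketch of filling in a SQL template before it is prepared.
class SqlTemplateSketch {

    // Assumed placeholder values (not copied from the Quartz sources).
    static final String TABLE_PREFIX_SUBST = "{0}";
    static final String SCHED_NAME_SUBST = "{1}";

    static final String SELECT_TRIGGER = "SELECT * FROM "
            + TABLE_PREFIX_SUBST + "TRIGGERS" + " WHERE "
            + "SCHED_NAME" + " = " + SCHED_NAME_SUBST
            + " AND " + "TRIGGER_NAME" + " = ? AND " + "TRIGGER_GROUP" + " = ?";

    // Hypothetical stand-in for the delegate's rtp(): replaces the table
    // prefix and the quoted scheduler name, leaving the ? parameters alone.
    static String rtp(String query, String tablePrefix, String schedulerName) {
        return MessageFormat.format(query, tablePrefix, "'" + schedulerName + "'");
    }

    public static void main(String[] args) {
        System.out.println(rtp(SELECT_TRIGGER, "QRTZ_", "MyScheduler"));
    }
}
```

The trigger name and group stay as JDBC parameters, so only values fixed by configuration (prefix and scheduler name) are ever spliced into the SQL text.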
As with JobDetail, the trigger's job data has the same Blob handling, so it is not repeated here.
2.4 The scheduling thread: QuartzSchedulerThread
2.4.1 Acquiring triggers (step 1, marked in red)
protected List
2.4.2 Firing a trigger (step 2, marked in red)

    protected TriggerFiredBundle triggerFired(Connection conn, OperableTrigger trigger)
            throws JobPersistenceException {
        JobDetail job;
        Calendar cal = null;

        // Make sure trigger wasn't deleted, paused, or completed...
        try { // if trigger was deleted, state will be STATE_DELETED
            String state = getDelegate().selectTriggerState(conn, trigger.getKey());
            if (!state.equals(STATE_ACQUIRED)) {
                return null;
            }
        } catch (SQLException e) {
            throw new JobPersistenceException("Couldn't select trigger state: "
                    + e.getMessage(), e);
        }

        try {
            job = retrieveJob(conn, trigger.getJobKey());
            if (job == null) { return null; }
        } catch (JobPersistenceException jpe) {
            try {
                getLog().error("Error retrieving job, setting trigger state to ERROR.", jpe);
                getDelegate().updateTriggerState(conn, trigger.getKey(), STATE_ERROR);
            } catch (SQLException sqle) {
                getLog().error("Unable to set trigger state to ERROR.", sqle);
            }
            throw jpe;
        }

        if (trigger.getCalendarName() != null) {
            cal = retrieveCalendar(conn, trigger.getCalendarName());
            if (cal == null) { return null; }
        }

        try {
            getDelegate().updateFiredTrigger(conn, trigger, STATE_EXECUTING, job);
        } catch (SQLException e) {
            throw new JobPersistenceException("Couldn't insert fired trigger: "
                    + e.getMessage(), e);
        }

        Date prevFireTime = trigger.getPreviousFireTime();

        // call triggered - to update the trigger's next-fire-time state...
        trigger.triggered(cal);

        String state = STATE_WAITING;
        boolean force = true;

        // (the method name is spelled this way in the Quartz API)
        if (job.isConcurrentExectionDisallowed()) {
            state = STATE_BLOCKED;
            force = false;
            try {
                getDelegate().updateTriggerStatesForJobFromOtherState(conn, job.getKey(),
                        STATE_BLOCKED, STATE_WAITING);
                getDelegate().updateTriggerStatesForJobFromOtherState(conn, job.getKey(),
                        STATE_BLOCKED, STATE_ACQUIRED);
                getDelegate().updateTriggerStatesForJobFromOtherState(conn, job.getKey(),
                        STATE_PAUSED_BLOCKED, STATE_PAUSED);
            } catch (SQLException e) {
                throw new JobPersistenceException(
                        "Couldn't update states of blocked triggers: "
                                + e.getMessage(), e);
            }
        }

        if (trigger.getNextFireTime() == null) {
            state = STATE_COMPLETE;
            force = true;
        }

        storeTrigger(conn, trigger, job, true, state, force, false);

        job.getJobDataMap().clearDirtyFlag();

        return new TriggerFiredBundle(job, trigger, cal, trigger.getKey().getGroup()
                .equals(Scheduler.DEFAULT_RECOVERY_GROUP), new Date(), trigger
                .getPreviousFireTime(), prevFireTime, trigger.getNextFireTime());
    }
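The tail of triggerFired decides which state the trigger is stored with after firing. Pulled out of the surrounding JDBC code, that decision can be sketched as a small pure function. TriggerStateSketch and stateAfterFiring are hypothetical names used only here; the rule itself is read directly off the listing above.

```java
// The state decision at the end of triggerFired, as a pure function.
class TriggerStateSketch {

    static final String STATE_WAITING = "WAITING";
    static final String STATE_BLOCKED = "BLOCKED";
    static final String STATE_COMPLETE = "COMPLETE";

    static String stateAfterFiring(boolean concurrentExecutionDisallowed,
                                   boolean hasNextFireTime) {
        String state = STATE_WAITING;      // default: wait for the next fire time
        if (concurrentExecutionDisallowed) {
            state = STATE_BLOCKED;         // held back until the running job finishes
        }
        if (!hasNextFireTime) {
            state = STATE_COMPLETE;        // nothing left to fire; overrides BLOCKED
        }
        return state;
    }

    public static void main(String[] args) {
        System.out.println(stateAfterFiring(false, true));
        System.out.println(stateAfterFiring(true, true));
        System.out.println(stateAfterFiring(true, false));
    }
}
```

Note the ordering: the check for a missing next fire time comes last, so a trigger that has run out of firings ends up COMPLETE even when its job disallows concurrent execution.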
2.4.3 Database locks
StdRowLockSemaphore is for databases that support SELECT ... FOR UPDATE, such as MySQL.
UpdateLockRowSemaphore is for databases that do not support SELECT ... FOR UPDATE, such as Microsoft SQL Server.
StdRowLockSemaphore is implemented with the following SQL:

    public static final String SELECT_FOR_LOCK = "SELECT * FROM "
            + TABLE_PREFIX_SUBST + TABLE_LOCKS + " WHERE " + COL_SCHEDULER_NAME + " = " + SCHED_NAME_SUBST
            + " AND " + COL_LOCK_NAME + " = ? FOR UPDATE";

    public static final String INSERT_LOCK = "INSERT INTO "
            + TABLE_PREFIX_SUBST + TABLE_LOCKS + "(" + COL_SCHEDULER_NAME + ", " + COL_LOCK_NAME + ") VALUES ("
            + SCHED_NAME_SUBST + ", ?)";
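The effect of SELECT ... FOR UPDATE on a lock row is that only one caller at a time can hold the named lock, and everyone else waits until the holder's transaction ends. The sketch below models that behaviour inside a single JVM with ReentrantLock. It is an analogy only, not the JDBC implementation: NamedLockSketch, executeInLock, and demo are hypothetical names, and the lock name string is used purely as a map key.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.locks.ReentrantLock;
import java.util.function.Supplier;

// In-process model of a named pessimistic lock.
class NamedLockSketch {

    // One lock per name, like one row per lock name in the locks table.
    private final ConcurrentMap<String, ReentrantLock> locks =
            new ConcurrentHashMap<String, ReentrantLock>();

    <T> T executeInLock(String lockName, Supplier<T> work) {
        ReentrantLock lock = locks.computeIfAbsent(lockName, name -> new ReentrantLock());
        lock.lock();           // waits, like SELECT ... FOR UPDATE on a held row
        try {
            return work.get();
        } finally {
            lock.unlock();     // like commit or rollback releasing the row lock
        }
    }

    // Several threads increment a shared counter, each increment under the lock.
    static int demo(int threads, int incrementsPerThread) {
        final NamedLockSketch semaphore = new NamedLockSketch();
        final int[] counter = { 0 };
        List<Thread> workers = new ArrayList<Thread>();
        for (int t = 0; t < threads; t++) {
            Thread worker = new Thread(() -> {
                for (int i = 0; i < incrementsPerThread; i++) {
                    semaphore.executeInLock("TRIGGER_ACCESS", () -> ++counter[0]);
                }
            });
            workers.add(worker);
            worker.start();
        }
        for (Thread worker : workers) {
            try {
                worker.join();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                throw new IllegalStateException(e);
            }
        }
        return counter[0];
    }

    public static void main(String[] args) {
        System.out.println(demo(4, 1000));
    }
}
```

Because every increment happens while the lock is held, no updates are lost. The database version gives the same guarantee across separate scheduler processes, which an in-memory lock cannot do.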
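Summary:
1. Quartz has three main components, Job, Trigger, and Scheduler. The Job holds the business logic, the Trigger decides when to run, and the Scheduler coordinates Jobs and Triggers to execute them.
2. When MySQL is used for storage, StdJDBCDelegate talks to the database, and the SQL it uses is defined in StdJDBCConstants.
3. QuartzScheduler is the core class and the Scheduler (StdScheduler) acts as its proxy; the actual work is done by QuartzSchedulerThread.
4. The JobStore controls storage. JobStoreSupport has two implementations: JobStoreCMT relies on container-managed transactions, so it does not call commit or rollback itself; JobStoreTX is used in standalone environments and has to handle commit and rollback itself.
5. The database locks are pessimistic locks based on SELECT ... FOR UPDATE, and are modelled as a Semaphore.
6. The qrtz_scheduler_state table records the check-in (scan) interval used in a cluster.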
References:
[1] https://www.baeldung.com/spring-quartz-schedule
[2] https://blog.csdn.net/xiaojin21cen/article/details/79298883