Error message here!

Hide Error message here!

忘记密码?

Error message here!

请输入正确邮箱

Hide Error message here!

密码丢失?请输入您的电子邮件地址。您将收到一个重设密码链接。

Error message here!

返回登录

Close

一文揭秘定时任务调度框架quartz

一天不进步,就是退步 2019-01-28 16:49:00 阅读数:240 评论数:0 点赞数:0 收藏数:0

之前写过quartz或者引用过quartz的一些文章,有很多人给我发消息问quartz的相关问题,

quartz 报错:java.lang.classNotFoundException

quartz源码分析之深刻理解job,sheduler,calendar,trigger及listener之间的关系

Quartz框架多个trigger任务执行出现漏执行的问题分析--转

quartz集群调度机制调研及源码分析---转载

分布式定时任务调度系统技术选型--转

趁着年底比较清闲,把quartz的问题整理了一下,顺带翻了翻源码,做了一些总结,希望能帮助到一些人或者减少人们探索的时间。

注意,使用版本为quartz2.2.3  spring boot2.1.3

1.quartz的核心组件

 1.1 Job组件

1.1.1Job

Job负责任务执行的逻辑,所有逻辑在execute()方法中,执行所需要的数据存放在JobExecutionContext 中

Job实例:@PersistJobDataAfterExecution @DisallowConcurrentExecutionpublic class ColorJob implementsJob {private static Logger log = LoggerFactory.getLogger(ColorJob.class);//parameter names specific to this job public static final String FAVORITECOLOR = "favorite color";public static final String EXECUTIONCOUNT = "count";//Since Quartz will re-instantiate a class every time it//gets executed, members non-static member variables can//not be used to maintain state! private int counter = 1;////

/ Empty constructor for job initialization /

/

/ Quartz requires a public empty constructor so that the / scheduler can instantiate the class whenever it needs. /

// publicColorJob() { }////

/ Called by the {@linkorg.quartz.Scheduler} when a / {@linkorg.quartz.Trigger} fires that is associated with / the Job. /

/ /@throwsJobExecutionException / if there is an exception while executing the job./*/ public voidexecute(JobExecutionContext context)throwsJobExecutionException {//This job simply prints out its job name and the//date and time that it is running JobKey jobKey =context.getJobDetail().getKey();//Grab and print passed parameters JobDataMap data =context.getJobDetail().getJobDataMap(); String favoriteColor=data.getString(FAVORITE_COLOR);int count =data.getInt(EXECUTION_COUNT); _log.info("ColorJob: " + jobKey + " executing at " + new Date() + "n" + " favorite color is " + favoriteColor + "n" + " execution count (from job map) is " + count + "n" + " execution count (from job member variable) is " +_counter);//increment the count and store it back into the//job map so that job state can be properly maintained count++; data.put(EXECUTION_COUNT, count);//Increment the local member variable//This serves no real purpose since job state can not//be maintained via member variables! _counter++; } }

1.1.2 JobDetail存储Job的信息

 主要负责

1.指定执行的Job类,唯一标识(job名称和组别 名称)

2.存储JobDataMap信息//job1 will only run 5 times (at start time, plus 4 repeats), every 10 seconds JobDetail job1 = newJob(ColorJob.class).withIdentity("job1", "group1").build();//pass initialization parameters into the job job1.getJobDataMap().put(ColorJob.FAVORITECOLOR, "Green"); job1.getJobDataMap().put(ColorJob.EXECUTIONCOUNT,1);

 数据库存储如下:

1.1.3 Quartz JobBuilder提供了一个链式api创建JobDetail@BeanpublicJobDetail jobDetail() {return JobBuilder.newJob().ofType(SampleJob.class) .storeDurably() .withIdentity("QrtzJobDetail") .withDescription("Invoke Sample Job service...") .build(); }

1.1.4 Spring JobDetailFactoryBean

   spring提供的一个创建JobDetail的方式工厂bean@BeanpublicJobDetailFactoryBean jobDetail() { JobDetailFactoryBean jobDetailFactory= newJobDetailFactoryBean(); jobDetailFactory.setJobClass(SampleJob.class); jobDetailFactory.setDescription("Invoke Sample Job service..."); jobDetailFactory.setDurability(true);returnjobDetailFactory; }

 

1.2 Trigger组件

 

trigger的状态不同trigger的状态 // STATES String STATEWAITING = "WAITING"; String STATEACQUIRED = "ACQUIRED"; String STATEEXECUTING = "EXECUTING"; String STATECOMPLETE = "COMPLETE"; String STATEBLOCKED = "BLOCKED"; String STATEERROR = "ERROR"; String STATEPAUSED = "PAUSED"; String STATEPAUSEDBLOCKED = "PAUSEDBLOCKED"; String STATE_DELETED = "DELETED";

状态的表结构

trigger的类型// TRIGGER TYPES /// Simple Trigger type. //String TTYPE_SIMPLE = "SIMPLE"; /// Cron Trigger type. //String TTYPECRON = "CRON"; /// Calendar Interval Trigger type. /*/String TTYPECALINT = "CALINT"; /// Daily Time Interval Trigger type. //String TTYPEDAILYTIMEINT = "DAILYI"; /// A general blob Trigger type. //String TTYPE_BLOB = "BLOB";

对应表结构

 

1.2.1 trigger实例SimpleTrigger trigger1 = newTrigger().withIdentity("trigger1", "group1").startAt(startTime) .withSchedule(simpleSchedule().withIntervalInSeconds(10).withRepeatCount(4)).build();

Trigger存储在mysql中

 1.2.2 Quartz TriggerBuilder

提供了一个链式创建Trigger的api@BeanpublicTrigger trigger(JobDetail job) {returnTriggerBuilder.newTrigger().forJob(job) .withIdentity("Qrtz_Trigger") .withDescription("Sample trigger") .withSchedule(simpleSchedule().repeatForever().withIntervalInHours(1)) .build(); }

1.2.3 Spring SimpleTriggerFactoryBean

 spring提供的一个创建SimpleTrigger的工厂类@BeanpublicSimpleTriggerFactoryBean trigger(JobDetail job) { SimpleTriggerFactoryBean trigger= newSimpleTriggerFactoryBean(); trigger.setJobDetail(job); trigger.setRepeatInterval(3600000); trigger.setRepeatCount(SimpleTrigger.REPEAT_INDEFINITELY);returntrigger; }

1.3 调度组件

1.3.1 quartz提供的工厂类@BeanpublicScheduler scheduler(Trigger trigger, JobDetail job) {     StdSchedulerFactory factory= newStdSchedulerFactory();     factory.initialize(new ClassPathResource("quartz.properties").getInputStream());       Scheduler scheduler=factory.getScheduler();     scheduler.setJobFactory(springBeanJobFactory());     scheduler.scheduleJob(job, trigger);       scheduler.start();     returnscheduler; }

1.3.2 spring提供的工厂bean

@BeanpublicSchedulerFactoryBean scheduler(Trigger trigger, JobDetail job) { SchedulerFactoryBean schedulerFactory= newSchedulerFactoryBean(); schedulerFactory.setConfigLocation(new ClassPathResource("quartz.properties")); schedulerFactory.setJobFactory(springBeanJobFactory()); schedulerFactory.setJobDetails(job); schedulerFactory.setTriggers(trigger);returnschedulerFactory; }

2.工作原理

  2.1 核心类QuartzScheduler

Scheduler实现类StdScheduler封装了核心工作类QuartzScheduler////

/ Construct a StdScheduler instance to proxy the given / QuartzScheduler instance, and with the given SchedulingContext. /

/*/ publicStdScheduler(QuartzScheduler sched) {this.sched =sched; }

  2.2 JobDetail的存取

public void addJob(JobDetail jobDetail, boolean replace, boolean storeNonDurableWhileAwaitingScheduling) throwsSchedulerException { validateState();if (!storeNonDurableWhileAwaitingScheduling && !jobDetail.isDurable()) {throw newSchedulerException("Jobs added with no trigger must be durable."); } resources.getJobStore().storeJob(jobDetail, replace); notifySchedulerThread(0L); notifySchedulerListenersJobAdded(jobDetail); }

2.2.1 存储JobDetail信息(以mysql Jdbc方式为例)

////

/ Insert or update a job. /

// protected voidstoreJob(Connection conn, JobDetail newJob,booleanreplaceExisting)throwsJobPersistenceException {boolean existingJob =jobExists(conn, newJob.getKey());try{if(existingJob) {if (!replaceExisting) {throw newObjectAlreadyExistsException(newJob); } getDelegate().updateJobDetail(conn, newJob); }else{ getDelegate().insertJobDetail(conn, newJob); } }catch(IOException e) {throw new JobPersistenceException("Couldn't store job: " +e.getMessage(), e); }catch(SQLException e) {throw new JobPersistenceException("Couldn't store job: " +e.getMessage(), e); } }

调用StdJDBCDelegate实现

////

/ Insert the job detail record. /

/ /@paramconn / the DB Connection /@paramjob / the job to insert /@returnnumber of rows inserted /@throwsIOException / if there were problems serializing the JobDataMap// public intinsertJobDetail(Connection conn, JobDetail job)throwsIOException, SQLException { ByteArrayOutputStream baos=serializeJobData(job.getJobDataMap()); PreparedStatement ps= null;int insertResult = 0;try{ ps=conn.prepareStatement(rtp(INSERTJOBDETAIL)); ps.setString(1, job.getKey().getName()); ps.setString(2, job.getKey().getGroup()); ps.setString(3, job.getDescription()); ps.setString(4, job.getJobClass().getName()); setBoolean(ps,5, job.isDurable()); setBoolean(ps,6, job.isConcurrentExectionDisallowed()); setBoolean(ps,7, job.isPersistJobDataAfterExecution()); setBoolean(ps,8, job.requestsRecovery()); setBytes(ps,9, baos); insertResult=ps.executeUpdate(); }finally{ closeStatement(ps); }returninsertResult; }

注意:JobDataMap序列化后以Blob形式存储到数据库中

StdJDBCConstants中执行sql如下:String INSERTJOBDETAIL = "INSERT INTO " + TABLEPREFIXSUBST + TABLEJOBDETAILS + " (" + COLSCHEDULERNAME + ", " +COLJOBNAME+ ", " + COLJOBGROUP + ", " + COLDESCRIPTION + ", " + COLJOBCLASS + ", " + COLISDURABLE + ", " + COLISNONCONCURRENT + ", " + COLISUPDATEDATA + ", " + COLREQUESTSRECOVERY + ", " + COLJOBDATAMAP + ") " + " VALUES(" + SCHEDNAMESUBST + ", ?, ?, ?, ?, ?, ?, ?, ?, ?)";

2.2.2  查询JobDetail

  强调一下,因JobDetail中的JobDataMap是以Blob形式存放到数据库中的(也可以通过useProperties属性修改成string存储,默认是false,Blob形式存储),所以查询时需要特殊处理:StdJDBCDelegate.java////

/ Select the JobDetail object for a given job name / group name. /

/ /@paramconn / the DB Connection /@returnthe populated JobDetail object /@throwsClassNotFoundException / if a class found during deserialization cannot be found or if / the job class could not be found /@throwsIOException / if deserialization causes an error/*/ publicJobDetail selectJobDetail(Connection conn, JobKey jobKey, ClassLoadHelper loadHelper)throwsClassNotFoundException, IOException, SQLException { PreparedStatement ps= null; ResultSet rs= null;try{ ps=conn.prepareStatement(rtp(SELECT_JOB_DETAIL)); ps.setString(1, jobKey.getName()); ps.setString(2, jobKey.getGroup()); rs=ps.executeQuery(); JobDetailImpl job= null;if(rs.next()) { job= newJobDetailImpl(); job.setName(rs.getString(COL_JOB_NAME)); job.setGroup(rs.getString(COL_JOB_GROUP)); job.setDescription(rs.getString(COL_DESCRIPTION)); job.setJobClass( loadHelper.loadClass(rs.getString(COL_JOB_CLASS), Job.class)); job.setDurability(getBoolean(rs, COL_IS_DURABLE)); job.setRequestsRecovery(getBoolean(rs, COL_REQUESTS_RECOVERY)); Map map = null;if(canUseProperties()) { map=getMapFromProperties(rs); }else{ map= (Map) getObjectFromBlob(rs, COL_JOB_DATAMAP); }if (null !=map) { job.setJobDataMap(newJobDataMap(map)); } }returnjob; }finally{ closeResultSet(rs); closeStatement(ps); } }

2.3 查询trigger

////

/ Retrieve the given {@linkorg.quartz.Trigger}. /

/ /@returnThe desired Trigger, or null if there is no / match./*/ public OperableTrigger retrieveTrigger(final TriggerKey triggerKey) throwsJobPersistenceException {return (OperableTrigger)executeWithoutLock( //no locks necessary for read... newTransactionCallback() {public Object execute(Connection conn) throwsJobPersistenceException {returnretrieveTrigger(conn, triggerKey); } }); }protectedOperableTrigger retrieveTrigger(Connection conn, TriggerKey key)throwsJobPersistenceException {try{returngetDelegate().selectTrigger(conn, key); }catch(Exception e) {throw new JobPersistenceException("Couldn't retrieve trigger: " +e.getMessage(), e); } }

StdJDBCDelegate.java

////

/ Select a trigger. /

/ /@paramconn / the DB Connection /@returnthe {@linkorg.quartz.Trigger} object /@throwsJobPersistenceException/*/ public OperableTrigger selectTrigger(Connection conn, TriggerKey triggerKey) throwsSQLException, ClassNotFoundException, IOException, JobPersistenceException { PreparedStatement ps= null; ResultSet rs= null;try{ OperableTrigger trigger= null; ps=conn.prepareStatement(rtp(SELECT_TRIGGER)); ps.setString(1, triggerKey.getName()); ps.setString(2, triggerKey.getGroup()); rs=ps.executeQuery();if(rs.next()) { String jobName=rs.getString(COL_JOB_NAME); String jobGroup=rs.getString(COL_JOB_GROUP); String description=rs.getString(COL_DESCRIPTION);long nextFireTime =rs.getLong(COL_NEXT_FIRE_TIME);long prevFireTime =rs.getLong(COL_PREV_FIRE_TIME); String triggerType=rs.getString(COL_TRIGGER_TYPE);long startTime =rs.getLong(COL_START_TIME);long endTime =rs.getLong(COL_END_TIME); String calendarName=rs.getString(COL_CALENDAR_NAME);int misFireInstr =rs.getInt(COL_MISFIRE_INSTRUCTION);int priority =rs.getInt(COL_PRIORITY); Map map = null;if(canUseProperties()) { map=getMapFromProperties(rs); }else{ map= (Map) getObjectFromBlob(rs, COL_JOB_DATAMAP); } Date nft= null;if (nextFireTime > 0) { nft= newDate(nextFireTime); } Date pft= null;if (prevFireTime > 0) { pft= newDate(prevFireTime); } Date startTimeD= newDate(startTime); Date endTimeD= null;if (endTime > 0) { endTimeD= newDate(endTime); }if(triggerType.equals(TTYPE_BLOB)) { rs.close(); rs= null; ps.close(); ps= null; ps=conn.prepareStatement(rtp(SELECT_BLOB_TRIGGER)); ps.setString(1, triggerKey.getName()); ps.setString(2, triggerKey.getGroup()); rs=ps.executeQuery();if(rs.next()) { trigger=(OperableTrigger) getObjectFromBlob(rs, COL_BLOB); } }else{ TriggerPersistenceDelegate tDel=findTriggerPersistenceDelegate(triggerType);if(tDel == null)throw new JobPersistenceException("No TriggerPersistenceDelegate for trigger discriminator type: " +triggerType); TriggerPropertyBundle triggerProps= null;try{ triggerProps=tDel.loadExtendedTriggerProperties(conn, triggerKey); }catch(IllegalStateException isex) {if(isTriggerStillPresent(ps)) {throwisex; }else{//QTZ-386 Trigger has been deleted return null; } } TriggerBuilder tb =newTrigger() .withDescription(description) .withPriority(priority) .startAt(startTimeD) .endAt(endTimeD) .withIdentity(triggerKey) .modifiedByCalendar(calendarName) .withSchedule(triggerProps.getScheduleBuilder()) .forJob(jobKey(jobName, jobGroup));if (null !=map) { tb.usingJobData(newJobDataMap(map)); } trigger=(OperableTrigger) tb.build(); trigger.setMisfireInstruction(misFireInstr); trigger.setNextFireTime(nft); trigger.setPreviousFireTime(pft); setTriggerStateProperties(trigger, triggerProps); } }returntrigger; }finally{ closeResultSet(rs); closeStatement(ps); } }

执行的sql:

String SELECTTRIGGER = "SELECT /* FROM " + TABLEPREFIXSUBST + TABLETRIGGERS + " WHERE " + COLSCHEDULERNAME + " = " +SCHEDNAMESUBST+ " AND " + COLTRIGGERNAME + " = ? AND " + COLTRIGGERGROUP + " = ?";

和JobDetail一样,也存在Blob的问题,不再赘述。

2.4 调度执行线程QuartzSchedulerThread////

/ The main processing loop of the QuartzSchedulerThread. /

//@Overridepublic voidrun() {boolean lastAcquireFailed = false;while (!halted.get()) {try{//check if we're supposed to pause... synchronized(sigLock) {while (paused && !halted.get()) {try{//wait until togglePause(false) is called... sigLock.wait(1000L); }catch(InterruptedException ignore) { } }if(halted.get()) {break; } }int availThreadCount =qsRsrcs.getThreadPool().blockForAvailableThreads();if(availThreadCount > 0) { //will always be true, due to semantics of blockForAvailableThreads... List triggers = null;long now =System.currentTimeMillis(); clearSignaledSchedulingChange();try{ triggers= qsRsrcs.getJobStore().acquireNextTriggers( now +idleWaitTime, Math.min(availThreadCount, qsRsrcs.getMaxBatchSize()), qsRsrcs.getBatchTimeWindow()); //1. lastAcquireFailed= false;if(log.isDebugEnabled()) log.debug("batch acquisition of " + (triggers == null ? 0 : triggers.size()) + " triggers"); }catch(JobPersistenceException jpe) {if(!lastAcquireFailed) { qs.notifySchedulerListenersError("An error occurred while scanning for the next triggers to fire.", jpe); } lastAcquireFailed= true;continue; }catch(RuntimeException e) {if(!lastAcquireFailed) { getLog().error("quartzSchedulerThreadLoop: RuntimeException " +e.getMessage(), e); } lastAcquireFailed= true;continue; }if (triggers != null && !triggers.isEmpty()) { now=System.currentTimeMillis();long triggerTime = triggers.get(0).getNextFireTime().getTime();long timeUntilTrigger = triggerTime -now;while(timeUntilTrigger > 2) {synchronized(sigLock) {if(halted.get()) {break; }if (!isCandidateNewTimeEarlierWithinReason(triggerTime, false)) {try{//we could have blocked a long while//on 'synchronize', so we must recompute now =System.currentTimeMillis(); timeUntilTrigger= triggerTime -now;if(timeUntilTrigger >= 1) sigLock.wait(timeUntilTrigger); }catch(InterruptedException ignore) { } } }if(releaseIfScheduleChangedSignificantly(triggers, triggerTime)) {break; } now=System.currentTimeMillis(); timeUntilTrigger= triggerTime -now; }//this happens if releaseIfScheduleChangedSignificantly decided to release triggers if(triggers.isEmpty())continue;//set triggers to 'executing' List bndles = new ArrayList();boolean goAhead = true;synchronized(sigLock) { goAhead= !halted.get(); }if(goAhead) {try{ List res = qsRsrcs.getJobStore().triggersFired(triggers); //2if(res != null) bndles=res; }catch(SchedulerException se) { qs.notifySchedulerListenersError("An error occurred while firing triggers '" + triggers + "'", se);//QTZ-179 : a problem occurred interacting with the triggers from the db//we release them and loop again for (int i = 0; i < triggers.size(); i++) { qsRsrcs.getJobStore().releaseAcquiredTrigger(triggers.get(i)); }continue; } }for (int i = 0; i < bndles.size(); i++) { TriggerFiredResult result=bndles.get(i); TriggerFiredBundle bndle=result.getTriggerFiredBundle(); Exception exception=result.getException();if (exception instanceofRuntimeException) { getLog().error("RuntimeException while firing trigger " +triggers.get(i), exception); qsRsrcs.getJobStore().releaseAcquiredTrigger(triggers.get(i));continue; }//it's possible to get 'null' if the triggers was paused,//blocked, or other similar occurrences that prevent it being//fired at this time... or if the scheduler was shutdown (halted) if (bndle == null) { qsRsrcs.getJobStore().releaseAcquiredTrigger(triggers.get(i));continue; } JobRunShell shell= null;try{ shell=qsRsrcs.getJobRunShellFactory().createJobRunShell(bndle); shell.initialize(qs); }catch(SchedulerException se) { qsRsrcs.getJobStore().triggeredJobComplete(triggers.get(i), bndle.getJobDetail(), CompletedExecutionInstruction.SETALLJOBTRIGGERSERROR);continue; }if (qsRsrcs.getThreadPool().runInThread(shell) == false) {//this case should never happen, as it is indicative of the//scheduler being shutdown or a bug in the thread pool or//a thread pool being used concurrently - which the docs//say not to do... getLog().error("ThreadPool.runInThread() return false!"); qsRsrcs.getJobStore().triggeredJobComplete(triggers.get(i), bndle.getJobDetail(), CompletedExecutionInstruction.SETALLJOBTRIGGERSERROR); } }continue; //while (!halted) } }else { //if(availThreadCount > 0)//should never happen, if threadPool.blockForAvailableThreads() follows contract continue; //while (!halted) }long now =System.currentTimeMillis();long waitTime = now +getRandomizedIdleWaitTime();long timeUntilContinue = waitTime -now;synchronized(sigLock) {try{if(!halted.get()) {//QTZ-336 A job might have been completed in the mean time and we might have//missed the scheduled changed signal by not waiting for the notify() yet//Check that before waiting for too long in case this very job needs to be//scheduled very soon if (!isScheduleChanged()) { sigLock.wait(timeUntilContinue); } } }catch(InterruptedException ignore) { } } }catch(RuntimeException re) { getLog().error("Runtime error occurred in main trigger firing loop.", re); } }//while (!halted)//drop references to scheduler stuff to aid garbage collection... qs = null; qsRsrcs= null; }

 

2.4.1 获取trigger(红色1)protected List acquireNextTrigger(Connection conn, long noLaterThan, int maxCount, longtimeWindow)throwsJobPersistenceException {if (timeWindow < 0) {throw newIllegalArgumentException(); } List acquiredTriggers = new ArrayList(); Set acquiredJobKeysForNoConcurrentExec = new HashSet();final int MAXDOLOOPRETRY = 3;int currentLoopCount = 0;do{ currentLoopCount++;try{ List keys = getDelegate().selectTriggerToAcquire(conn, noLaterThan +timeWindow, getMisfireTime(), maxCount);//No trigger is ready to fire yet. if (keys == null || keys.size() == 0)returnacquiredTriggers;long batchEnd =noLaterThan;for(TriggerKey triggerKey: keys) {//If our trigger is no longer available, try a new one. OperableTrigger nextTrigger =retrieveTrigger(conn, triggerKey);if(nextTrigger == null) {continue; //next trigger }//If trigger's job is set as @DisallowConcurrentExecution, and it has already been added to result, then//put it back into the timeTriggers set and continue to search for next trigger. JobKey jobKey =nextTrigger.getJobKey(); JobDetail job;try{ job=retrieveJob(conn, jobKey); }catch(JobPersistenceException jpe) {try{ getLog().error("Error retrieving job, setting trigger state to ERROR.", jpe); getDelegate().updateTriggerState(conn, triggerKey, STATEERROR); }catch(SQLException sqle) { getLog().error("Unable to set trigger state to ERROR.", sqle); }continue; }if(job.isConcurrentExectionDisallowed()) {if(acquiredJobKeysForNoConcurrentExec.contains(jobKey)) {continue; //next trigger } else{ acquiredJobKeysForNoConcurrentExec.add(jobKey); } }if (nextTrigger.getNextFireTime().getTime() >batchEnd) {break; }//We now have a acquired trigger, let's add to return list.//If our trigger was no longer in the expected state, try a new one. int rowsUpdated =getDelegate().updateTriggerStateFromOtherState(conn, triggerKey, STATEACQUIRED, STATEWAITING);if (rowsUpdated <= 0) {continue; //next trigger } nextTrigger.setFireInstanceId(getFiredTriggerRecordId()); getDelegate().insertFiredTrigger(conn, nextTrigger, STATEACQUIRED,null);if(acquiredTriggers.isEmpty()) { batchEnd= Math.max(nextTrigger.getNextFireTime().getTime(), System.currentTimeMillis()) +timeWindow; } acquiredTriggers.add(nextTrigger); }//if we didn't end up with any trigger to fire from that first//batch, try again for another batch. We allow with a max retry count. if(acquiredTriggers.size() == 0 && currentLoopCount DOLOOPRETRY) {continue; }//We are done with the while loop. break; }catch(Exception e) {throw newJobPersistenceException("Couldn't acquire next trigger: " +e.getMessage(), e); } }while (true);//Return the acquired trigger list returnacquiredTriggers; }

2.4.2 触发trigger(红色2)

protectedTriggerFiredBundle triggerFired(Connection conn, OperableTrigger trigger)throwsJobPersistenceException { JobDetail job; Calendar cal= null;//Make sure trigger wasn't deleted, paused, or completed... try { //if trigger was deleted, state will be STATEDELETED String state =getDelegate().selectTriggerState(conn, trigger.getKey());if (!state.equals(STATEACQUIRED)) {return null; } }catch(SQLException e) {throw new JobPersistenceException("Couldn't select trigger state: " +e.getMessage(), e); }try{ job=retrieveJob(conn, trigger.getJobKey());if (job == null) { return null; } }catch(JobPersistenceException jpe) {try{ getLog().error("Error retrieving job, setting trigger state to ERROR.", jpe); getDelegate().updateTriggerState(conn, trigger.getKey(), STATEERROR); }catch(SQLException sqle) { getLog().error("Unable to set trigger state to ERROR.", sqle); }throwjpe; }if (trigger.getCalendarName() != null) { cal=retrieveCalendar(conn, trigger.getCalendarName());if (cal == null) { return null; } }try{ getDelegate().updateFiredTrigger(conn, trigger, STATEEXECUTING, job); }catch(SQLException e) {throw new JobPersistenceException("Couldn't insert fired trigger: " +e.getMessage(), e); } Date prevFireTime=trigger.getPreviousFireTime();//call triggered - to update the trigger's next-fire-time state... trigger.triggered(cal); String state=STATEWAITING;boolean force = true;if(job.isConcurrentExectionDisallowed()) { state=STATEBLOCKED; force= false;try{ getDelegate().updateTriggerStatesForJobFromOtherState(conn, job.getKey(), STATEBLOCKED, STATEWAITING); getDelegate().updateTriggerStatesForJobFromOtherState(conn, job.getKey(), STATEBLOCKED, STATEACQUIRED); getDelegate().updateTriggerStatesForJobFromOtherState(conn, job.getKey(), STATEPAUSEDBLOCKED, STATEPAUSED); }catch(SQLException e) {throw newJobPersistenceException("Couldn't update states of blocked triggers: " +e.getMessage(), e); } }if (trigger.getNextFireTime() == null) { state=STATECOMPLETE; force= true; } storeTrigger(conn, trigger, job,true, state, force, false); job.getJobDataMap().clearDirtyFlag();return newTriggerFiredBundle(job, trigger, cal, trigger.getKey().getGroup() .equals(Scheduler.DEFAULTRECOVERYGROUP),newDate(), trigger .getPreviousFireTime(), prevFireTime, trigger.getNextFireTime()); }

 2.4.3 数据库锁

 StdRowLockSemaphore针对支持select for update的数据库如mysql

UpdateLockRowSemaphore针对不支持select for update的数据库如mssqlserver

 StdRowLockSemaphore的实现如下:public static final String SELECTFORLOCK = "SELECT /* FROM " + TABLE_PREFIX_SUBST + TABLE_LOCKS + " WHERE " + COL_SCHEDULER_NAME + " = " +SCHED_NAME_SUBST+ " AND " + COL_LOCK_NAME + " = ? FOR UPDATE";public static final String INSERT_LOCK = "INSERT INTO " + TABLE_PREFIX_SUBST + TABLE_LOCKS + "(" + COL_SCHEDULER_NAME + ", " + COL_LOCK_NAME + ") VALUES (" + SCHED_NAME_SUBST + ", ?)";

 

总结:

  1.quartz的三大组件Job/trigger/scheduler,job负责业务逻辑,trigger负责执行时机,scheduler负责调度Job和trigger来执行。

  2.使用mysql作为存储的话,使用StdJDBCDelegate和数据库进行交互,交互的sql在StdJDBCConstants中定义

   3.QuartzScheduler是核心类,Scheduler做其代理,真正执行的是QuartzSchedulerThread

   4.JobStore存储控制,JobStoreSupport的两个实现JobStoreCMT容器管理事务,不需要使用commit和rollback;JobStoreTX用在单机环境,需要处理commit和rollback

5.数据库锁使用了悲观锁select for update,定义为Semaphore

6.qrtzschedulerstate定义了扫描间隔集群扫描间隔

 

参考文献:

【1】https://www.baeldung.com/spring-quartz-schedule

 【2】https://blog.csdn.net/xiaojin21cen/article/details/79298883

版权声明
本文为[一天不进步,就是退步]所创,转载请带上原文链接,感谢
https://www.cnblogs.com/davidwang456/p/10329616.html

编程之旅,人生之路,不止于编程,还有诗和远方。
阅代码原理,看框架知识,学企业实践;
赏诗词,读日记,踏人生之路,观世界之行;

支付宝红包,每日可领