用身份證備案網(wǎng)站外貿(mào)營銷渠道
?
我們在前面文章中說到過Quartz涉及到的線程,但是散落在幾篇文章中,不好找。而Quartz涉及到的線程對于理解Quartz也比較重要,所以今天專門提取出來單獨(dú)說一下。
Quartz中的主要線程:
-
任務(wù)調(diào)度線程QuartzSchedulerThread
-
任務(wù)執(zhí)行線程
-
Misfire處理線程
-
ClusterManager線程
任務(wù)調(diào)度線程 QuartzSchedulerThread
QuartzSchedulerThread是任務(wù)調(diào)度線程,他的職責(zé)是對滿足觸發(fā)條件(nextFireTimer到了)的注冊到JobStore的Trigger分配給可用的任務(wù)執(zhí)行線程去執(zhí)行。
QuartzSchedulerThread啟動
任務(wù)調(diào)度線程QuartzSchedulerThread是在調(diào)度器Scheduler創(chuàng)建的時候啟動的。
應(yīng)用層通過以下調(diào)用創(chuàng)建Scheduler:
Scheduler sche = new StdSchedulerFactory().getScheduler();
一般情況下通過StdSchedulerFactory構(gòu)建Scheduler,getScheduler首先嘗試從SchedulerRepository獲取Schedule,首次運(yùn)行獲取不到,則通過instantiate()方法獲取。
public Scheduler getScheduler() throws SchedulerException {if (cfg == null) {initialize();}SchedulerRepository schedRep = SchedulerRepository.getInstance();Scheduler sched = schedRep.lookup(getSchedulerName());if (sched != null) {if (sched.isShutdown()) {schedRep.remove(getSchedulerName());} else {return sched;}} sched = instantiate();return sched;
}
instantiate()方法特別特別特別長,幾乎就是在這里完成Quartz所有相關(guān)組件的初始化的。
其中會創(chuàng)建QuartzScheduler,之后會將QuartzScheduler包裝到stdScheduler中存入SchedulerRepository中。
instantiate()創(chuàng)建QuartzScheduler是調(diào)用的構(gòu)造方法:
public QuartzScheduler(QuartzSchedulerResources resources, long idleWaitTime, @Deprecated long dbRetryInterval)throws SchedulerException {this.resources = resources;if (resources.getJobStore() instanceof JobListener) {addInternalJobListener((JobListener)resources.getJobStore());}this.schedThread = new QuartzSchedulerThread(this, resources); ThreadExecutor schedThreadExecutor = resources.getThreadExecutor();schedThreadExecutor.execute(this.schedThread);if (idleWaitTime > 0) { this.schedThread.setIdleWaitTime(idleWaitTime);}jobMgr = new ExecutingJobsManager();addInternalJobListener(jobMgr);errLogger = new ErrorLogger();addInternalSchedulerListener(errLogger);signaler = new SchedulerSignalerImpl(this, this.schedThread);getLog().info("Quartz Scheduler v." + getVersion() + " created.");
}
可以看到構(gòu)造方法中創(chuàng)建了QuartzSchedulerThread對象,之后獲取ThreadExecutor(一般情況下為DefaultThreadExecutor)并通過調(diào)用其execute方法啟動QuartzSchedulerThread線程:
public class DefaultThreadExecutor implements ThreadExecutor {public void initialize() {}public void execute(Thread thread) {thread.start();}}
QuartzSchedulerThread的運(yùn)行
其實(shí)就是他的run方法,我們也大概分析過,其主要邏輯是:
-
從作業(yè)執(zhí)行線程池獲取availThreadCount,也就是當(dāng)前可用的線程數(shù)
-
調(diào)用JobStore的acquireNextTriggers方法,獲取特定短時間(idleWaitTime,默認(rèn)30秒)內(nèi)可能需要被觸發(fā)的,數(shù)量不超過availThreadCount的觸發(fā)器
-
調(diào)用JobStore的triggersFired方法對獲取到的可能需要被觸發(fā)的觸發(fā)器進(jìn)行二次加工,再次獲取到最終的待觸發(fā)器結(jié)果集
-
循環(huán)處理最終的待處理觸發(fā)器結(jié)果集中的每一個需要被觸發(fā)的觸發(fā)器
-
用JobRunShell包裝該觸發(fā)器,送給線程池執(zhí)行該觸發(fā)器關(guān)聯(lián)的作業(yè)
好了,作業(yè)調(diào)度線程QuartzSchedulerThread我們就基本搞清楚了。
作業(yè)執(zhí)行線程
Quartz的作業(yè)執(zhí)行線程是放在線程池中進(jìn)行管理的,默認(rèn)是SimpleTreadPool,有關(guān)SimpleThreadPool我們前面專門有一篇文章介紹過,這里就不再贅述了。
作業(yè)執(zhí)行線程和作業(yè)調(diào)度線程一樣,也是在作業(yè)調(diào)度器Scheduler創(chuàng)建后立即啟動,這個過程同樣也是在StdSchedulerFactory的instantiate()方法中完成的:
instantiate()創(chuàng)建SimpleThreadPool之后會調(diào)用SimpleThreadPool的initialize方法,根據(jù)配置文件指定的任務(wù)執(zhí)行線程數(shù)完成工作線程的初始化和啟動。比如配置文件設(shè)置為10則初始化10個工作線程并逐個啟動。作業(yè)執(zhí)行線程的初始化及啟動的詳細(xì)過程請參考Quartz - SimpleThreadPool。
MisfireHandler線程
如果你的項目使用RAMJobStore,而不是JDBC-based JobStore(指需要持久化到數(shù)據(jù)庫的JobStore),那么就不存在Misfire處理線程。
因為RAMJobStore在處理正常觸發(fā)的過程中順便就處理了Misfire,所以就不再需要其他處理機(jī)制了,這部分我們在前面的文章中也分析過Quartz - Misfire (for RAMJobstore)。
JDBC-based JobStore在處理正常觸發(fā)的時候只獲取未錯過觸發(fā)時間的觸發(fā)器,對于錯過觸發(fā)時間的、也就是Misfire的觸發(fā)器就需要另外的機(jī)制來處理。
Misfire處理線程就是Quartz采用JDBC-based JobStore的情況下用來處理錯過觸發(fā)時機(jī)的觸發(fā)器的線程。
MisfireHandler啟動
MisfireHandler定義在JobStoreSupport類中,JobStoreSupport是JobStore的JDBC-based JobStore的虛擬類,Quartz主要提供了兩個基于JDBC的JobStore的實(shí)現(xiàn):JobStoreTX、JobStoreCMT。JDBC-based JobStore詳細(xì)內(nèi)容請參考Quartz - JDBC-Based JobStore,今天主要分析MisfireHandler。
我們在應(yīng)用中創(chuàng)建任務(wù)調(diào)度器Scheduler后需要調(diào)用他的start方法:
Scheduler sche = new StdSchedulerFactory().getScheduler();sche.scheduleJob(jobDetail,trigger);sche.start();
這個start方法會調(diào)用到JobStore的schedulerStarted()方法,如果我們應(yīng)用中采用的是JDBC-based JobStore的話,會調(diào)用到JobStoreSupport的schedulerStarted(),其中會創(chuàng)建MisfireHandler之后調(diào)用MisfireHandler的initialize():
misfireHandler = new MisfireHandler();if(initializersLoader != null)misfireHandler.setContextClassLoader(initializersLoader);misfireHandler.initialize();
Misfire的initialize將創(chuàng)建好的MisFireHandle線程交給ThreadExecutor啟動。
public void initialize() {ThreadExecutor executor = getThreadExecutor();executor.execute(MisfireHandler.this);}
MisfireHandle的運(yùn)行
也就是MisfireHandle的run方法。處理邏輯和RAMJobStore處理misfired trigger的邏輯類似,只不過MisfireHandle的所有處理邏輯都是通過數(shù)據(jù)庫操作完成的。
JDBC-Based JobStore對應(yīng)的表結(jié)構(gòu)我們會找機(jī)會專門分析,這里就不詳細(xì)展開了。
MisfireHandle的run方法的主要邏輯為:
-
從數(shù)據(jù)庫中獲取在WAITING(等待執(zhí)行)狀態(tài)、下次執(zhí)行時間小于msifiredtime(當(dāng)前時間 - MisfireThreshold)的觸發(fā)器,也就是錯過觸發(fā)時間的觸發(fā)器
-
為了避免存在大量misfired trigger的情況下,一次處理太多數(shù)據(jù)影響其他正常觸發(fā)器的執(zhí)行,MisfireHandle線程每次僅獲取部分而不是全部misfired trigger(參數(shù)maxToRecoverAtATime指定,默認(rèn)為20)
-
對獲取到的每一個錯過執(zhí)行時間的觸發(fā)器(misfired trigger),調(diào)用觸發(fā)器的updateAfterMisfire方法獲取下次執(zhí)行時間,updateAfterMisfire方法我們在上一篇講Misfire處理策略的文章中說過,就是根據(jù)觸發(fā)器的處理策略獲取下次執(zhí)行時間
-
updateAfterMisfire方法執(zhí)行后,獲取到的觸發(fā)器的下次執(zhí)行時間如果不為空的話,更新到數(shù)據(jù)庫中,等待正常的任務(wù)執(zhí)行線程調(diào)度執(zhí)行
ClusterManager線程
與MisfiredHandle一樣,ClusterManager線程也是JDBC-Based JobStore特有的。
顧名思義,ClusterManager線程與集群有關(guān)系,JDBC-Based JobStore是可以支持Quartz的集群部署的,在集群環(huán)境下,Quartz服務(wù)節(jié)點(diǎn)可能會down機(jī)、掉線,從而影響任務(wù)的執(zhí)行,ClusterManager線程就是負(fù)責(zé)檢查Quartz服務(wù)節(jié)點(diǎn)的在線狀態(tài)的,如果發(fā)生掉線后,將該服務(wù)節(jié)點(diǎn)負(fù)責(zé)的觸發(fā)器交給其他服務(wù)節(jié)點(diǎn)來處理。
具體邏輯等到我們分析完Quartz Cluster之后補(bǔ)充。