做動(dòng)態(tài)的網(wǎng)站的參考資料有哪些/seo排名教程
1. 工作原理
1.1 流程圖

1.2 執(zhí)行示意圖

從上圖得知
如果當(dāng)前運(yùn)行的線程數(shù)小于corePoolSize(核心線程數(shù)),則會(huì)創(chuàng)建新線程作為核心線程來(lái)執(zhí)行任務(wù)(注意,執(zhí)行這一步需要獲取全局鎖)。
如果運(yùn)行的線程等于或多于corePoolSize,則將任務(wù)加入BlockingQueue中。
如果無(wú)法將任務(wù)加入BlockingQueue(隊(duì)列已滿),則將創(chuàng)建線程來(lái)執(zhí)行任務(wù)(注意,執(zhí)行這一步驟需要獲取全局鎖)。
如果第三步創(chuàng)建線程使得當(dāng)前線程數(shù)大于maximumPoolSize,任務(wù)將被拒絕,并調(diào)用RejectdExecutionHandler.rejectedExecution()方法。
2. 源碼解析
2.1 核心參數(shù)
corePoolSize 核心線程數(shù),池中所保存的線程數(shù),包括空閑線程。
maximumPoolSize 線程池最大容量,在核心線程數(shù)的基礎(chǔ)上可能會(huì)額外增加一些非核心線程,需要注意的是只有當(dāng)workQueue隊(duì)列填滿時(shí)才會(huì)創(chuàng)建多于corePoolSize的線程(線程池總線程數(shù)不超過(guò)maxPoolSize)
workQueue 用于保存任務(wù)的隊(duì)列
SynchronousQueue 線性安全、capacity是0不存儲(chǔ)任何元素、可以在兩個(gè)線程中傳遞同一個(gè)對(duì)象。synchronousQueue.put(object);synchronousQueue.take();如果不希望任務(wù)在隊(duì)列中等待而是希望將任務(wù)直接移交給工作線程,可使用SynchronousQueue作為等待隊(duì)列。
LinkedBlockingQueue 鏈?zhǔn)疥?duì)列,隊(duì)列容量不足或?yàn)?時(shí)自動(dòng)阻塞。鎖是分離的,高并發(fā)的情況下生產(chǎn)者和消費(fèi)者可以并行地操作隊(duì)列中的數(shù)據(jù),可以提高吞吐量。當(dāng)QPS很高,發(fā)送數(shù)據(jù)很大,大量的任務(wù)被添加到這個(gè)無(wú)界LinkedBlockingQueue 中,導(dǎo)致cpu和內(nèi)存飆升服務(wù)器掛掉。
ArrayBlockingQueue 數(shù)組實(shí)現(xiàn)的有界隊(duì)列,使用獨(dú)占鎖,生產(chǎn)者和消費(fèi)者無(wú)法并行操作,在高并發(fā)場(chǎng)景下會(huì)成為性能瓶頸。
PriorityBlockingQueue PriorityBlockingQueue中的優(yōu)先級(jí)由任務(wù)的Comparator決定。
keepAliveTime 空閑線程的存活時(shí)間 當(dāng)線程空閑時(shí)間達(dá)到keepAliveTime時(shí),線程會(huì)退出,直到線程數(shù)量=corePoolSize。如果allowCoreThreadTimeout=true,則會(huì)直到線程數(shù)量=0。
threadFactory 線程工廠 默認(rèn)DefaultThreadFactory 創(chuàng)建同樣分組(ThreadGroup) 同樣優(yōu)先級(jí)(NORM_PRIORITY) non-daemon的線程。
handler 拒絕策略處理 默認(rèn)AbortPolicy 拋異常RejectedExecutionException
AbortPolicy:不執(zhí)行該任務(wù),并拋出RejectedExecutionException異常。
CallerRunsPolicy:由調(diào)用線程處理該任務(wù),如果執(zhí)行程序已關(guān)閉,則會(huì)丟棄該任務(wù)。
DiscardOldestPolicy:丟棄隊(duì)列中最老的一個(gè)任務(wù),然后重新嘗試執(zhí)行任務(wù)(重復(fù)此過(guò)程。
DiscardPolicy:不執(zhí)行該任務(wù),也不拋異常。
2.2 線程池狀態(tài)
2.2.1 線程池狀態(tài)
成員變量ctl是Integer的原子變量,使用一個(gè)變量同時(shí)記錄線程池狀態(tài)和線程池中線程個(gè)數(shù),假設(shè)計(jì)算機(jī)硬件的Integer類(lèi)型是32位二進(jìn)制標(biāo)示,如下面代碼所示,其中高3位用來(lái)表示線程池狀態(tài),后面29位用來(lái)記錄線程池線程個(gè)數(shù)。
//線程個(gè)數(shù)掩碼位數(shù),并不是所有平臺(tái)int類(lèi)型是32位,所以準(zhǔn)確說(shuō)是具體平臺(tái)下Integer的二進(jìn)制位數(shù)-3后的剩余位數(shù)才是線程的個(gè)數(shù)
private static final int COUNT_BITS = Integer.SIZE - 3;
//線程最大個(gè)數(shù)(低29位)00011111111111111111111111111111
private static final int CAPACITY = (1 << COUNT_BITS) - 1;
//(高3位):11100000000000000000000000000000
private static final int RUNNING = -1 << COUNT_BITS;
//(高3位):00000000000000000000000000000000
private static final int SHUTDOWN = 0 << COUNT_BITS;
//(高3位):00100000000000000000000000000000
private static final int STOP = 1 << COUNT_BITS;
//(高3位):01000000000000000000000000000000
private static final int TIDYING = 2 << COUNT_BITS;
//(高3位):01100000000000000000000000000000
private static final int TERMINATED = 3 << COUNT_BITS;
獲取線程池的運(yùn)行狀態(tài)和線程池中的線程個(gè)數(shù)
// 獲取高三位 運(yùn)行狀態(tài)
private static int runStateOf(int c) { return c & ~CAPACITY; }
//獲取低29位 線程個(gè)數(shù)
private static int workerCountOf(int c) { return c & CAPACITY; }
//計(jì)算ctl新值,線程狀態(tài) 與 線程個(gè)數(shù)
private static int ctlOf(int rs, int wc) { return rs | wc; }
2.2.2 線程池狀態(tài)含義:
RUNNING:接收新任務(wù)并且處理阻塞隊(duì)列里的任務(wù)。
SHUTDOWN:拒絕新任務(wù)但是處理阻塞隊(duì)列里的任務(wù)。
STOP:拒絕新任務(wù)并且拋棄阻塞隊(duì)列里的任務(wù),同時(shí)中斷正在處理的任務(wù)。
TIDYING:所有任務(wù)都執(zhí)行完(包含阻塞隊(duì)列里面任務(wù)),當(dāng)前線程池活動(dòng)線程為0,將要調(diào)用terminated方法。
TERMINATED:終止?fàn)顟B(tài)。terminated方法調(diào)用完成以后的狀態(tài)。
2.3 執(zhí)行方法
2.3.1 線程池中任務(wù)提交方法
execute()方法
public void execute(Runnable command) {//(1) 如果任務(wù)為null,則拋出NPE異常if (command == null)throw new NullPointerException();//(2)獲取當(dāng)前線程池的狀態(tài)+線程個(gè)數(shù)變量的組合值int c = ctl.get();//(3)當(dāng)前線程池線程個(gè)數(shù)是否小于corePoolSize,小于則開(kāi)啟新線程運(yùn)行if (workerCountOf(c) < corePoolSize) {if (addWorker(command, true))return;c = ctl.get();}//(4)如果線程池處于RUNNING狀態(tài),則添加任務(wù)到阻塞隊(duì)列if (isRunning(c) && workQueue.offer(command)) {//(4.1)二次檢查int recheck = ctl.get();//(4.2)如果當(dāng)前線程池狀態(tài)不是RUNNING則從隊(duì)列刪除任務(wù),并執(zhí)行拒絕策略if (! isRunning(recheck) && remove(command))reject(command);//(4.3)如果當(dāng)前線程池線程為空,則添加一個(gè)線程else if (workerCountOf(recheck) == 0)addWorker(null, false);}//(5)如果隊(duì)列滿了,則新增線程,新增失敗則執(zhí)行拒絕策略else if (!addWorker(command, false))reject(command);
}
submit(Runnable task)方法
public Future<?> submit(Runnable task) {// 6 NPE判斷if (task == null) throw new NullPointerException();// 7 包裝任務(wù)為FutureTaskRunnableFuture<Void> ftask = newTaskFor(task, null);// 8 投遞到線程池執(zhí)行execute(ftask);// 9 返回ftaskreturn ftask;
}protected <T> RunnableFuture<T> newTaskFor(Runnable runnable, T value) {return new FutureTask<T>(runnable, value);
}public FutureTask(Runnable runnable, V result) {//將runnable適配為Callable類(lèi)型任務(wù),并且讓result作為執(zhí)行結(jié)果this.callable = Executors.callable(runnable, result);this.state = NEW; // ensure visibility of callable
}
submit(Runnable task,T result)方法
public <T> Future<T> submit(Callable<T> task) {if (task == null) throw new NullPointerException();RunnableFuture<T> ftask = newTaskFor(task);execute(ftask);return ftask;
}protected <T> RunnableFuture<T> newTaskFor(Callable<T> callable) {return new FutureTask<T>(callable);
}
2.3.2 線程池中任務(wù)執(zhí)行方法
當(dāng)用戶線程提交任務(wù)到線程池后,在線程池沒(méi)有執(zhí)行拒絕策略的情況下,用戶線 程會(huì)馬上返回,而提交的任務(wù)要么直接切換到線程池中的Worker線程來(lái)執(zhí)行,要么先放入線程池的阻塞隊(duì)列里面,稍后再由Worker線程來(lái)執(zhí)行。
Worker(Runnable firstTask) {// 在調(diào)用runWorker前禁止中斷 避免當(dāng)前Worker在調(diào)用runWorker方法前被中斷(當(dāng)其他線程調(diào)用了線程池的shutdownNow時(shí),如果Worker狀態(tài)≥0則會(huì)中斷該線程)。status=0標(biāo)示鎖未被獲取的狀態(tài),state=1標(biāo)示鎖已經(jīng)被獲取的狀態(tài),state=–1是創(chuàng)建Worker時(shí)默認(rèn)的狀態(tài)。setState(-1);this.firstTask = firstTask;//使用線程池中指定的線程池工廠創(chuàng)建一個(gè)線程作為該Worker對(duì)象的執(zhí)行線程this.thread = getThreadFactory().newThread(this);
}public void run() {runWorker(this);//委托給runWorker方法
}final void runWorker(Worker w) {Thread wt = Thread.currentThread();Runnable task = w.firstTask;w.firstTask = null;//(1)status設(shè)置為0,允許中斷。這時(shí)候調(diào)用shutdownNow會(huì)中斷Worker線程。w.unlock();boolean completedAbruptly = true;try {//(2)當(dāng)前task==null或者調(diào)用getTask從任務(wù)隊(duì)列獲取的任務(wù)返回nullwhile (task != null || (task = getTask()) != null) {//(2.1)獲取工作線程內(nèi)部持有的獨(dú)占鎖w.lock();...try {//(2.2)任務(wù)執(zhí)行前干一些事情beforeExecute(wt, task);Throwable thrown = null;try {//(2.3)執(zhí)行任務(wù)task.run();} catch (RuntimeException x) {thrown = x; throw x;} catch (Error x) {thrown = x; throw x;} catch (Throwable x) {thrown = x; throw new Error(x);} finally {//(2.4)任務(wù)執(zhí)行完畢后干一些事情afterExecute(task, thrown);}} finally {task = null;//(2.5)統(tǒng)計(jì)當(dāng)前Worker完成了多少個(gè)任務(wù)w.completedTasks++;w.unlock();}}completedAbruptly = false;} finally {//(3)執(zhí)行清工作processWorkerExit(w, completedAbruptly);}
}private void processWorkerExit(Worker w, boolean completedAbruptly) {...//(3.1)統(tǒng)計(jì)整個(gè)線程池完成的任務(wù)個(gè)數(shù),并從工作集里面刪除當(dāng)前woker 加全局鎖final ReentrantLock mainLock = this.mainLock;mainLock.lock();try {completedTaskCount += w.completedTasks;workers.remove(w);} finally {mainLock.unlock();}//(3.2)如果當(dāng)前線程池狀態(tài)是shutdown狀態(tài)并且工作隊(duì)列為空,或者當(dāng)前是stop狀態(tài)并且當(dāng)前線程池里面沒(méi)有活動(dòng)線程,則設(shè)置線程池狀態(tài)為T(mén)ERMINATED。tryTerminate();//(3.3)判斷當(dāng)前線程中的線程個(gè)數(shù)是否小于核心線程個(gè)數(shù),如果是則新增一個(gè)線程。int c = ctl.get();if (runStateLessThan(c, STOP)) {if (!completedAbruptly) {int min = allowCoreThreadTimeOut ? 0 : corePoolSize;if (min == 0 && ! workQueue.isEmpty())min = 1;if (workerCountOf(c) >= min)return; // replacement not needed}addWorker(null, false);}
}
2.3.3 線程池關(guān)閉方法
shutdown()方法
調(diào)用 shutdown后,線程池就不會(huì)再接收新的任務(wù),但是工作隊(duì)列里面的任務(wù)還是要執(zhí)行的,該方法是立刻返回的,并不同步等待隊(duì)列任務(wù)完成再返回。
首先使用CAS設(shè)置當(dāng)前線程池狀態(tài)為T(mén)IDYING,如果成功則執(zhí)行擴(kuò)展接口terminated在線程池狀態(tài)變?yōu)門(mén)ERMINATED前做一些事情,然后設(shè)置當(dāng)前 線程池狀態(tài)為T(mén)ERMINATED,最后調(diào)用termination.signalAll()來(lái)激活調(diào)用線程池的 awaitTermination系列方法被阻塞的所有線程。
public void shutdown() {final ReentrantLock mainLock = this.mainLock;mainLock.lock();try {//(1)權(quán)限檢查如果設(shè)置了安全管理器,則看當(dāng)前調(diào)用shutdown命令的線程是否有關(guān)閉線程的權(quán)限,如果有權(quán)限則還要看調(diào)用線程是否有中斷工作線程的權(quán)限,如果沒(méi)有權(quán)限則拋出SecurityException或者NullPointerException異常。checkShutdownAccess();//(2)設(shè)置當(dāng)前線程池狀態(tài)為SHUTDOWN,如果已經(jīng)是SHUTDOWN則直接返回advanceRunState(SHUTDOWN);//(3)設(shè)置中斷標(biāo)志。這里首先加了全局鎖,同時(shí)只有一個(gè)線程可以調(diào)用shutdown設(shè)置中斷標(biāo)志。然后嘗試獲取Worker本身的鎖,獲取成功則設(shè)置中斷標(biāo)識(shí),由于正在執(zhí)行的任務(wù)已經(jīng)獲取了鎖,所以正在執(zhí)行的任務(wù)沒(méi)有被中斷。這里中斷的是阻塞到getTask()方法,企圖從隊(duì)列里獲取任務(wù)的線程,也就是空閑線程。interruptIdleWorkers();onShutdown();} finally {mainLock.unlock();}//(4)嘗試狀態(tài)變?yōu)門(mén)ERMINATEDtryTerminate();
}private void advanceRunState(int targetState) {for (;;) {int c = ctl.get();if (runStateAtLeast(c, targetState) ||ctl.compareAndSet(c, ctlOf(targetState, workerCountOf(c))))break;}
}private void interruptIdleWorkers(boolean onlyOne) {final ReentrantLock mainLock = this.mainLock;mainLock.lock();try {for (Worker w : workers) {Thread t = w.thread;//如果工作線程沒(méi)有被中斷,并且沒(méi)有正在運(yùn)行則設(shè)置中斷if (!t.isInterrupted() && w.tryLock()) {try {t.interrupt();} catch (SecurityException ignore) {} finally {w.unlock();}}if (onlyOne)break;}} finally {mainLock.unlock();}
}final void tryTerminate() {for (;;) {
...int c = ctl.get();
...final ReentrantLock mainLock = this.mainLock;mainLock.lock();try {//設(shè)置當(dāng)前線程池狀態(tài)為T(mén)IDYINGif (ctl.compareAndSet(c, ctlOf(TIDYING, 0))) {try {terminated();} finally {//設(shè)置當(dāng)前線程池狀態(tài)為T(mén)ERMINATEDctl.set(ctlOf(TERMINATED, 0));//激活調(diào)用條件變量termination的await系列方法被阻塞的所有線程termination.signalAll();}return;}} finally {mainLock.unlock();}}
}
shutdownNow()方法
調(diào)用 shutdownnow后,線程池就不會(huì)再接收新的任務(wù),并且會(huì)丟棄工作隊(duì)列里面的任務(wù),正在執(zhí)行的任務(wù)也會(huì)被中斷,該方法是立刻返回的,并不同步等待激活的任務(wù)執(zhí)行完成再返回。
調(diào)用線程池隊(duì)列的drainTo方法把隊(duì)列中的任務(wù)移除到taskList 里,如果發(fā)現(xiàn)線程池隊(duì)列還不為空(比如DelayQueue或者其他類(lèi)型的隊(duì)列drainTo可能 移除元素失敗),則循環(huán)移除里面的元素,最后返回移除的任務(wù)列表。
public List<Runnable> shutdownNow() {List<Runnable> tasks;final ReentrantLock mainLock = this.mainLock;mainLock.lock();try {//(5)權(quán)限檢查checkShutdownAccess();//(6) 設(shè)置線程池狀態(tài)為stopadvanceRunState(STOP);//(7)中斷所有線程,這里需要注意的是中斷所有線程,包含空閑線程和正在執(zhí)行任務(wù)的線程interruptWorkers();//(8)移動(dòng)隊(duì)列任務(wù)到taskstasks = drainQueue();} finally {mainLock.unlock();}//(9)終止?fàn)顟B(tài)tryTerminate();return tasks;
}private void interruptWorkers() {final ReentrantLock mainLock = this.mainLock;mainLock.lock();try {for (Worker w : workers)w.interruptIfStarted();} finally {mainLock.unlock();}
}private List<Runnable> drainQueue() {//8.1獲取任務(wù)隊(duì)列BlockingQueue<Runnable> q = workQueue;ArrayList<Runnable> taskList = new ArrayList<Runnable>();//8.2 從任務(wù)隊(duì)列移除任務(wù)到taskList列表q.drainTo(taskList);//8.3 如果q還不為空,則說(shuō)明drainTo接口調(diào)用失效,則循環(huán)移除if (!q.isEmpty()) {for (Runnable r : q.toArray(new Runnable[0])) {if (q.remove(r))taskList.add(r);}}//8.4返回異常的任務(wù)列表return taskList;
}
2.4 參數(shù)優(yōu)化
2.4.1 默認(rèn)值
corePoolSize = 1
maxPoolSize = Integer.MAX_VALUE
queueCapacity = Integer.MAX_VALUE
keepAliveTime = 60s
allowCoreThreadTimeout = false
rejectedExecutionHandler = AbortPolicy()
需要根據(jù)幾個(gè)值來(lái)決定
tasks :每秒的任務(wù)數(shù),假設(shè)為500~1000
taskcost:每個(gè)任務(wù)花費(fèi)時(shí)間,假設(shè)為0.1s
responsetime:系統(tǒng)允許容忍的最大響應(yīng)時(shí)間,假設(shè)為1s
corePoolSize
corePoolSize = 每秒需要多少個(gè)線程處理?
threadcount = tasks/(1/taskcost) = taskstaskcout = (500 ~ 1000)0.1 = 50~100 個(gè)線程。
corePoolSize設(shè)置應(yīng)該大于50。根據(jù)8020原則,如果80%的每秒任務(wù)數(shù)小于800,那么corePoolSize設(shè)置為80。
queueCapacity
隊(duì)列容量 queueCapacity = (coreSizePool/taskcost)*responsetime
切記不能設(shè)置為Integer.MAX_VALUE,這樣隊(duì)列會(huì)很大,線程數(shù)只會(huì)保持在corePoolSize大小,當(dāng)任務(wù)陡增時(shí),
不能新開(kāi)線程來(lái)執(zhí)行,響應(yīng)時(shí)間會(huì)隨之陡增。
maxPoolSize
最大線程數(shù)在生產(chǎn)環(huán)境上我們往往設(shè)置成corePoolSize一樣,這樣可以減少在處理過(guò)程中創(chuàng)建線程的開(kāi)銷(xiāo)。
rejectedExecutionHandler:根據(jù)具體情況來(lái)決定,任務(wù)不重要可丟棄,任務(wù)重要?jiǎng)t要利用一些緩沖機(jī)制來(lái)處理。
keepAliveTime和allowCoreThreadTimeout采用默認(rèn)通常能滿足。
2.4.2 參考值
@Configuration
public class ConcurrentThreadGlobalConfig {@Beanpublic ThreadPoolTaskExecutor defaultThreadPool() {ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();//核心線程數(shù)目executor.setCorePoolSize(65);//指定最大線程數(shù)executor.setMaxPoolSize(65);//隊(duì)列中最大的數(shù)目executor.setQueueCapacity(650);//線程名稱前綴executor.setThreadNamePrefix("DefaultThreadPool_");//rejection-policy:當(dāng)pool已經(jīng)達(dá)到max size的時(shí)候,如何處理新任務(wù)//CALLER_RUNS:不在新線程中執(zhí)行任務(wù),而是由調(diào)用者所在的線程來(lái)執(zhí)行//對(duì)拒絕task的處理策略executor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());//線程空閑后的最大存活時(shí)間executor.setKeepAliveSeconds(60);//加載executor.initialize();return executor;}
}
以上