有.net源碼如何做網(wǎng)站/網(wǎng)絡優(yōu)化培訓要多少錢
1. 工作原理
1.1 流程圖

1.2 執(zhí)行示意圖

從上圖得知
如果當前運行的線程數(shù)小于corePoolSize(核心線程數(shù)),則會創(chuàng)建新線程作為核心線程來執(zhí)行任務(注意,執(zhí)行這一步需要獲取全局鎖)。
如果運行的線程等于或多于corePoolSize,則將任務加入BlockingQueue中。
如果無法將任務加入BlockingQueue(隊列已滿),則將創(chuàng)建線程來執(zhí)行任務(注意,執(zhí)行這一步驟需要獲取全局鎖)。
如果第三步創(chuàng)建線程使得當前線程數(shù)大于maximumPoolSize,任務將被拒絕,并調用RejectdExecutionHandler.rejectedExecution()方法。
2. 源碼解析
2.1 核心參數(shù)
corePoolSize 核心線程數(shù),池中所保存的線程數(shù),包括空閑線程。
maximumPoolSize 線程池最大容量,在核心線程數(shù)的基礎上可能會額外增加一些非核心線程,需要注意的是只有當workQueue隊列填滿時才會創(chuàng)建多于corePoolSize的線程(線程池總線程數(shù)不超過maxPoolSize)
workQueue 用于保存任務的隊列
SynchronousQueue 線性安全、capacity是0不存儲任何元素、可以在兩個線程中傳遞同一個對象。synchronousQueue.put(object);synchronousQueue.take();如果不希望任務在隊列中等待而是希望將任務直接移交給工作線程,可使用SynchronousQueue作為等待隊列。
LinkedBlockingQueue 鏈式隊列,隊列容量不足或為0時自動阻塞。鎖是分離的,高并發(fā)的情況下生產者和消費者可以并行地操作隊列中的數(shù)據(jù),可以提高吞吐量。當QPS很高,發(fā)送數(shù)據(jù)很大,大量的任務被添加到這個無界LinkedBlockingQueue 中,導致cpu和內存飆升服務器掛掉。
ArrayBlockingQueue 數(shù)組實現(xiàn)的有界隊列,使用獨占鎖,生產者和消費者無法并行操作,在高并發(fā)場景下會成為性能瓶頸。
PriorityBlockingQueue PriorityBlockingQueue中的優(yōu)先級由任務的Comparator決定。
keepAliveTime 空閑線程的存活時間 當線程空閑時間達到keepAliveTime時,線程會退出,直到線程數(shù)量=corePoolSize。如果allowCoreThreadTimeout=true,則會直到線程數(shù)量=0。
threadFactory 線程工廠 默認DefaultThreadFactory 創(chuàng)建同樣分組(ThreadGroup) 同樣優(yōu)先級(NORM_PRIORITY) non-daemon的線程。
handler 拒絕策略處理 默認AbortPolicy 拋異常RejectedExecutionException
AbortPolicy:不執(zhí)行該任務,并拋出RejectedExecutionException異常。
CallerRunsPolicy:由調用線程處理該任務,如果執(zhí)行程序已關閉,則會丟棄該任務。
DiscardOldestPolicy:丟棄隊列中最老的一個任務,然后重新嘗試執(zhí)行任務(重復此過程。
DiscardPolicy:不執(zhí)行該任務,也不拋異常。
2.2 線程池狀態(tài)
2.2.1 線程池狀態(tài)
成員變量ctl是Integer的原子變量,使用一個變量同時記錄線程池狀態(tài)和線程池中線程個數(shù),假設計算機硬件的Integer類型是32位二進制標示,如下面代碼所示,其中高3位用來表示線程池狀態(tài),后面29位用來記錄線程池線程個數(shù)。
//線程個數(shù)掩碼位數(shù),并不是所有平臺int類型是32位,所以準確說是具體平臺下Integer的二進制位數(shù)-3后的剩余位數(shù)才是線程的個數(shù)
private static final int COUNT_BITS = Integer.SIZE - 3;
//線程最大個數(shù)(低29位)00011111111111111111111111111111
private static final int CAPACITY = (1 << COUNT_BITS) - 1;
//(高3位):11100000000000000000000000000000
private static final int RUNNING = -1 << COUNT_BITS;
//(高3位):00000000000000000000000000000000
private static final int SHUTDOWN = 0 << COUNT_BITS;
//(高3位):00100000000000000000000000000000
private static final int STOP = 1 << COUNT_BITS;
//(高3位):01000000000000000000000000000000
private static final int TIDYING = 2 << COUNT_BITS;
//(高3位):01100000000000000000000000000000
private static final int TERMINATED = 3 << COUNT_BITS;
獲取線程池的運行狀態(tài)和線程池中的線程個數(shù)
// 獲取高三位 運行狀態(tài)
private static int runStateOf(int c) { return c & ~CAPACITY; }
//獲取低29位 線程個數(shù)
private static int workerCountOf(int c) { return c & CAPACITY; }
//計算ctl新值,線程狀態(tài) 與 線程個數(shù)
private static int ctlOf(int rs, int wc) { return rs | wc; }
2.2.2 線程池狀態(tài)含義:
RUNNING:接收新任務并且處理阻塞隊列里的任務。
SHUTDOWN:拒絕新任務但是處理阻塞隊列里的任務。
STOP:拒絕新任務并且拋棄阻塞隊列里的任務,同時中斷正在處理的任務。
TIDYING:所有任務都執(zhí)行完(包含阻塞隊列里面任務),當前線程池活動線程為0,將要調用terminated方法。
TERMINATED:終止狀態(tài)。terminated方法調用完成以后的狀態(tài)。
2.3 執(zhí)行方法
2.3.1 線程池中任務提交方法
execute()方法
public void execute(Runnable command) {//(1) 如果任務為null,則拋出NPE異常if (command == null)throw new NullPointerException();//(2)獲取當前線程池的狀態(tài)+線程個數(shù)變量的組合值int c = ctl.get();//(3)當前線程池線程個數(shù)是否小于corePoolSize,小于則開啟新線程運行if (workerCountOf(c) < corePoolSize) {if (addWorker(command, true))return;c = ctl.get();}//(4)如果線程池處于RUNNING狀態(tài),則添加任務到阻塞隊列if (isRunning(c) && workQueue.offer(command)) {//(4.1)二次檢查int recheck = ctl.get();//(4.2)如果當前線程池狀態(tài)不是RUNNING則從隊列刪除任務,并執(zhí)行拒絕策略if (! isRunning(recheck) && remove(command))reject(command);//(4.3)如果當前線程池線程為空,則添加一個線程else if (workerCountOf(recheck) == 0)addWorker(null, false);}//(5)如果隊列滿了,則新增線程,新增失敗則執(zhí)行拒絕策略else if (!addWorker(command, false))reject(command);
}
submit(Runnable task)方法
public Future<?> submit(Runnable task) {// 6 NPE判斷if (task == null) throw new NullPointerException();// 7 包裝任務為FutureTaskRunnableFuture<Void> ftask = newTaskFor(task, null);// 8 投遞到線程池執(zhí)行execute(ftask);// 9 返回ftaskreturn ftask;
}protected <T> RunnableFuture<T> newTaskFor(Runnable runnable, T value) {return new FutureTask<T>(runnable, value);
}public FutureTask(Runnable runnable, V result) {//將runnable適配為Callable類型任務,并且讓result作為執(zhí)行結果this.callable = Executors.callable(runnable, result);this.state = NEW; // ensure visibility of callable
}
submit(Runnable task,T result)方法
public <T> Future<T> submit(Callable<T> task) {if (task == null) throw new NullPointerException();RunnableFuture<T> ftask = newTaskFor(task);execute(ftask);return ftask;
}protected <T> RunnableFuture<T> newTaskFor(Callable<T> callable) {return new FutureTask<T>(callable);
}
2.3.2 線程池中任務執(zhí)行方法
當用戶線程提交任務到線程池后,在線程池沒有執(zhí)行拒絕策略的情況下,用戶線 程會馬上返回,而提交的任務要么直接切換到線程池中的Worker線程來執(zhí)行,要么先放入線程池的阻塞隊列里面,稍后再由Worker線程來執(zhí)行。
Worker(Runnable firstTask) {// 在調用runWorker前禁止中斷 避免當前Worker在調用runWorker方法前被中斷(當其他線程調用了線程池的shutdownNow時,如果Worker狀態(tài)≥0則會中斷該線程)。status=0標示鎖未被獲取的狀態(tài),state=1標示鎖已經(jīng)被獲取的狀態(tài),state=–1是創(chuàng)建Worker時默認的狀態(tài)。setState(-1);this.firstTask = firstTask;//使用線程池中指定的線程池工廠創(chuàng)建一個線程作為該Worker對象的執(zhí)行線程this.thread = getThreadFactory().newThread(this);
}public void run() {runWorker(this);//委托給runWorker方法
}final void runWorker(Worker w) {Thread wt = Thread.currentThread();Runnable task = w.firstTask;w.firstTask = null;//(1)status設置為0,允許中斷。這時候調用shutdownNow會中斷Worker線程。w.unlock();boolean completedAbruptly = true;try {//(2)當前task==null或者調用getTask從任務隊列獲取的任務返回nullwhile (task != null || (task = getTask()) != null) {//(2.1)獲取工作線程內部持有的獨占鎖w.lock();...try {//(2.2)任務執(zhí)行前干一些事情beforeExecute(wt, task);Throwable thrown = null;try {//(2.3)執(zhí)行任務task.run();} catch (RuntimeException x) {thrown = x; throw x;} catch (Error x) {thrown = x; throw x;} catch (Throwable x) {thrown = x; throw new Error(x);} finally {//(2.4)任務執(zhí)行完畢后干一些事情afterExecute(task, thrown);}} finally {task = null;//(2.5)統(tǒng)計當前Worker完成了多少個任務w.completedTasks++;w.unlock();}}completedAbruptly = false;} finally {//(3)執(zhí)行清工作processWorkerExit(w, completedAbruptly);}
}private void processWorkerExit(Worker w, boolean completedAbruptly) {...//(3.1)統(tǒng)計整個線程池完成的任務個數(shù),并從工作集里面刪除當前woker 加全局鎖final ReentrantLock mainLock = this.mainLock;mainLock.lock();try {completedTaskCount += w.completedTasks;workers.remove(w);} finally {mainLock.unlock();}//(3.2)如果當前線程池狀態(tài)是shutdown狀態(tài)并且工作隊列為空,或者當前是stop狀態(tài)并且當前線程池里面沒有活動線程,則設置線程池狀態(tài)為TERMINATED。tryTerminate();//(3.3)判斷當前線程中的線程個數(shù)是否小于核心線程個數(shù),如果是則新增一個線程。int c = ctl.get();if (runStateLessThan(c, STOP)) {if (!completedAbruptly) {int min = allowCoreThreadTimeOut ? 0 : corePoolSize;if (min == 0 && ! workQueue.isEmpty())min = 1;if (workerCountOf(c) >= min)return; // replacement not needed}addWorker(null, false);}
}
2.3.3 線程池關閉方法
shutdown()方法
調用 shutdown后,線程池就不會再接收新的任務,但是工作隊列里面的任務還是要執(zhí)行的,該方法是立刻返回的,并不同步等待隊列任務完成再返回。
首先使用CAS設置當前線程池狀態(tài)為TIDYING,如果成功則執(zhí)行擴展接口terminated在線程池狀態(tài)變?yōu)門ERMINATED前做一些事情,然后設置當前 線程池狀態(tài)為TERMINATED,最后調用termination.signalAll()來激活調用線程池的 awaitTermination系列方法被阻塞的所有線程。
public void shutdown() {final ReentrantLock mainLock = this.mainLock;mainLock.lock();try {//(1)權限檢查如果設置了安全管理器,則看當前調用shutdown命令的線程是否有關閉線程的權限,如果有權限則還要看調用線程是否有中斷工作線程的權限,如果沒有權限則拋出SecurityException或者NullPointerException異常。checkShutdownAccess();//(2)設置當前線程池狀態(tài)為SHUTDOWN,如果已經(jīng)是SHUTDOWN則直接返回advanceRunState(SHUTDOWN);//(3)設置中斷標志。這里首先加了全局鎖,同時只有一個線程可以調用shutdown設置中斷標志。然后嘗試獲取Worker本身的鎖,獲取成功則設置中斷標識,由于正在執(zhí)行的任務已經(jīng)獲取了鎖,所以正在執(zhí)行的任務沒有被中斷。這里中斷的是阻塞到getTask()方法,企圖從隊列里獲取任務的線程,也就是空閑線程。interruptIdleWorkers();onShutdown();} finally {mainLock.unlock();}//(4)嘗試狀態(tài)變?yōu)門ERMINATEDtryTerminate();
}private void advanceRunState(int targetState) {for (;;) {int c = ctl.get();if (runStateAtLeast(c, targetState) ||ctl.compareAndSet(c, ctlOf(targetState, workerCountOf(c))))break;}
}private void interruptIdleWorkers(boolean onlyOne) {final ReentrantLock mainLock = this.mainLock;mainLock.lock();try {for (Worker w : workers) {Thread t = w.thread;//如果工作線程沒有被中斷,并且沒有正在運行則設置中斷if (!t.isInterrupted() && w.tryLock()) {try {t.interrupt();} catch (SecurityException ignore) {} finally {w.unlock();}}if (onlyOne)break;}} finally {mainLock.unlock();}
}final void tryTerminate() {for (;;) {
...int c = ctl.get();
...final ReentrantLock mainLock = this.mainLock;mainLock.lock();try {//設置當前線程池狀態(tài)為TIDYINGif (ctl.compareAndSet(c, ctlOf(TIDYING, 0))) {try {terminated();} finally {//設置當前線程池狀態(tài)為TERMINATEDctl.set(ctlOf(TERMINATED, 0));//激活調用條件變量termination的await系列方法被阻塞的所有線程termination.signalAll();}return;}} finally {mainLock.unlock();}}
}
shutdownNow()方法
調用 shutdownnow后,線程池就不會再接收新的任務,并且會丟棄工作隊列里面的任務,正在執(zhí)行的任務也會被中斷,該方法是立刻返回的,并不同步等待激活的任務執(zhí)行完成再返回。
調用線程池隊列的drainTo方法把隊列中的任務移除到taskList 里,如果發(fā)現(xiàn)線程池隊列還不為空(比如DelayQueue或者其他類型的隊列drainTo可能 移除元素失敗),則循環(huán)移除里面的元素,最后返回移除的任務列表。
public List<Runnable> shutdownNow() {List<Runnable> tasks;final ReentrantLock mainLock = this.mainLock;mainLock.lock();try {//(5)權限檢查checkShutdownAccess();//(6) 設置線程池狀態(tài)為stopadvanceRunState(STOP);//(7)中斷所有線程,這里需要注意的是中斷所有線程,包含空閑線程和正在執(zhí)行任務的線程interruptWorkers();//(8)移動隊列任務到taskstasks = drainQueue();} finally {mainLock.unlock();}//(9)終止狀態(tài)tryTerminate();return tasks;
}private void interruptWorkers() {final ReentrantLock mainLock = this.mainLock;mainLock.lock();try {for (Worker w : workers)w.interruptIfStarted();} finally {mainLock.unlock();}
}private List<Runnable> drainQueue() {//8.1獲取任務隊列BlockingQueue<Runnable> q = workQueue;ArrayList<Runnable> taskList = new ArrayList<Runnable>();//8.2 從任務隊列移除任務到taskList列表q.drainTo(taskList);//8.3 如果q還不為空,則說明drainTo接口調用失效,則循環(huán)移除if (!q.isEmpty()) {for (Runnable r : q.toArray(new Runnable[0])) {if (q.remove(r))taskList.add(r);}}//8.4返回異常的任務列表return taskList;
}
2.4 參數(shù)優(yōu)化
2.4.1 默認值
corePoolSize = 1
maxPoolSize = Integer.MAX_VALUE
queueCapacity = Integer.MAX_VALUE
keepAliveTime = 60s
allowCoreThreadTimeout = false
rejectedExecutionHandler = AbortPolicy()
需要根據(jù)幾個值來決定
tasks :每秒的任務數(shù),假設為500~1000
taskcost:每個任務花費時間,假設為0.1s
responsetime:系統(tǒng)允許容忍的最大響應時間,假設為1s
corePoolSize
corePoolSize = 每秒需要多少個線程處理?
threadcount = tasks/(1/taskcost) = taskstaskcout = (500 ~ 1000)0.1 = 50~100 個線程。
corePoolSize設置應該大于50。根據(jù)8020原則,如果80%的每秒任務數(shù)小于800,那么corePoolSize設置為80。
queueCapacity
隊列容量 queueCapacity = (coreSizePool/taskcost)*responsetime
切記不能設置為Integer.MAX_VALUE,這樣隊列會很大,線程數(shù)只會保持在corePoolSize大小,當任務陡增時,
不能新開線程來執(zhí)行,響應時間會隨之陡增。
maxPoolSize
最大線程數(shù)在生產環(huán)境上我們往往設置成corePoolSize一樣,這樣可以減少在處理過程中創(chuàng)建線程的開銷。
rejectedExecutionHandler:根據(jù)具體情況來決定,任務不重要可丟棄,任務重要則要利用一些緩沖機制來處理。
keepAliveTime和allowCoreThreadTimeout采用默認通常能滿足。
2.4.2 參考值
@Configuration
public class ConcurrentThreadGlobalConfig {@Beanpublic ThreadPoolTaskExecutor defaultThreadPool() {ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();//核心線程數(shù)目executor.setCorePoolSize(65);//指定最大線程數(shù)executor.setMaxPoolSize(65);//隊列中最大的數(shù)目executor.setQueueCapacity(650);//線程名稱前綴executor.setThreadNamePrefix("DefaultThreadPool_");//rejection-policy:當pool已經(jīng)達到max size的時候,如何處理新任務//CALLER_RUNS:不在新線程中執(zhí)行任務,而是由調用者所在的線程來執(zhí)行//對拒絕task的處理策略executor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());//線程空閑后的最大存活時間executor.setKeepAliveSeconds(60);//加載executor.initialize();return executor;}
}
以上