專業(yè)的網(wǎng)站建設(shè)哪家快業(yè)務(wù)員用什么軟件找客戶
碼出高效:Java開發(fā)手冊筆記(線程池及其源碼)
碼出高效:Java開發(fā)手冊筆記(線程池及其源碼)
- 碼出高效:Java開發(fā)手冊筆記(線程池及其源碼)
- 前言
- 一、線程池的作用
- 線程的生命周期
- 二、線程池7大參數(shù)
- execute() 與 submit()
- Executors 靜態(tài)工廠方法創(chuàng)建線程池
- 自定義實(shí)現(xiàn)線程工廠 ThreadFactory
- 自定義線程池拒絕策略 實(shí)現(xiàn)*RejectedExecutionHandler*
- 線程池拒絕策略
- 三、線程池源碼詳解
- 總結(jié)
前言
線程池
一、線程池的作用
線程使應(yīng)用能夠更加充分合理地協(xié)調(diào)利用 CPU、內(nèi)存、網(wǎng)絡(luò)、 I/O 等系統(tǒng)資源。線程的創(chuàng)建需要開辟虛擬機(jī)棧、本地方法棧、程序計(jì)數(shù)器等線程私有的內(nèi)存空間。在線程銷毀時(shí)需要回收這些系統(tǒng)資源。頻繁地創(chuàng)建和銷毀線程會(huì)浪費(fèi)大量的系統(tǒng)資源,增加并發(fā)編程風(fēng)險(xiǎn)。另外,在服務(wù)器負(fù)載過大的時(shí)候,如何讓新的線程等待或者友好地拒絕服務(wù)?這些都是線程自身無法解決的。所以需要通過線程池協(xié)調(diào)多個(gè)線程 , 并實(shí)現(xiàn)類似主次線程隔離、定時(shí)執(zhí)行、周期執(zhí)行等任務(wù)。線程池的作用包括:
- 利用線程池管理并復(fù)用線程、控制最大并發(fā)數(shù)等。
- 實(shí)現(xiàn)任務(wù)線程隊(duì)列緩存策略和拒絕機(jī)制。
- 實(shí)現(xiàn)某些與時(shí)間相關(guān)的功能,如定時(shí)執(zhí)行、周期執(zhí)行等。
- 隔離線程環(huán)境。比如,交易服務(wù)和搜索服務(wù)在同一臺(tái)服務(wù)器上,分別開啟兩個(gè)線程池,交易線程的資源消耗明顯要大;因此,通過配置獨(dú)立的線程池 ,將較慢的交易服務(wù)與搜索服務(wù)隔離開,避免各服務(wù)線程相互影響。
線程的生命周期
線程池重要的參數(shù),ctl,是一個(gè)包含兩個(gè)屬性的原子整型;一個(gè)屬性是 workerCount,指的是有效線程數(shù);另一個(gè)是 runState,指的是線程的狀態(tài)(線程的生命周期)
/*** The workerCount is the number of workers that have been* permitted to start and not permitted to stop. The value may be* transiently different from the actual number of live threads,* for example when a ThreadFactory fails to create a thread when* asked, and when exiting threads are still performing* bookkeeping before terminating. The user-visible pool size is* reported as the current size of the workers set.*/
/*** The runState provides the main lifecycle control, taking on values:** RUNNING: Accept new tasks and process queued tasks* SHUTDOWN: Don't accept new tasks, but process queued tasks* STOP: Don't accept new tasks, don't process queued tasks,* and interrupt in-progress tasks* TIDYING: All tasks have terminated, workerCount is zero,* the thread transitioning to state TIDYING* will run the terminated() hook method* TERMINATED: terminated() has completed*/// runState is stored in the high-order bitsprivate static final int RUNNING = -1 << COUNT_BITS;private static final int SHUTDOWN = 0 << COUNT_BITS;private static final int STOP = 1 << COUNT_BITS;private static final int TIDYING = 2 << COUNT_BITS;private static final int TERMINATED = 3 << COUNT_BITS;
Thread類中的線程生命周期
public enum State {/*** Thread state for a thread which has not yet started.*/NEW,/*** Thread state for a runnable thread. A thread in the runnable* state is executing in the Java virtual machine but it may* be waiting for other resources from the operating system* such as processor.*/RUNNABLE,/*** Thread state for a thread blocked waiting for a monitor lock.* A thread in the blocked state is waiting for a monitor lock* to enter a synchronized block/method or* reenter a synchronized block/method after calling* {@link Object#wait() Object.wait}.*/BLOCKED,/*** Thread state for a waiting thread.* A thread is in the waiting state due to calling one of the* following methods:* <ul>* <li>{@link Object#wait() Object.wait} with no timeout</li>* <li>{@link #join() Thread.join} with no timeout</li>* <li>{@link LockSupport#park() LockSupport.park}</li>* </ul>** <p>A thread in the waiting state is waiting for another thread to* perform a particular action.** For example, a thread that has called <tt>Object.wait()</tt>* on an object is waiting for another thread to call* <tt>Object.notify()</tt> or <tt>Object.notifyAll()</tt> on* that object. A thread that has called <tt>Thread.join()</tt>* is waiting for a specified thread to terminate.*/WAITING,/*** Thread state for a waiting thread with a specified waiting time.* A thread is in the timed waiting state due to calling one of* the following methods with a specified positive waiting time:* <ul>* <li>{@link #sleep Thread.sleep}</li>* <li>{@link Object#wait(long) Object.wait} with timeout</li>* <li>{@link #join(long) Thread.join} with timeout</li>* <li>{@link LockSupport#parkNanos LockSupport.parkNanos}</li>* <li>{@link LockSupport#parkUntil LockSupport.parkUntil}</li>* </ul>*/TIMED_WAITING,/*** Thread state for a terminated thread.* The thread has completed execution.*/TERMINATED;
}
在了解線程池的基本作用后,我們學(xué)習(xí)一下線程池是如何創(chuàng)建線程的。首先從 ThreadPoolExecutor 構(gòu)造方法講起 , 學(xué)習(xí)如何自定義 ThreadFactory 和RejectedExecutionHandler , 并編寫一個(gè)最簡單的線程池示例。然后,通過分析ThreadPoolExecutor 的 execute 和 addWorker 兩個(gè)核心方法,學(xué)習(xí)如何把任務(wù)線程加入到線程池中運(yùn)行。 ThreadPoolExecutor 的構(gòu)造方法如下:
二、線程池7大參數(shù)
public ThreadPoolExecutor(int corePoolSize, //(第一個(gè)參數(shù))int maximumPoolSize, //(第二個(gè)參數(shù))long keepAliveTime, //(第三個(gè)參數(shù))TimeUnit unit, //(第四個(gè)參數(shù))BlockingQueue<Runnable> workQueue, //(第五個(gè)參數(shù))ThreadFactory threadFactory, //(第六個(gè)參數(shù))RejectedExecutionHandler handler) { //(第七個(gè)參數(shù))if (corePoolSize < 0 ||// maximumPoolSize必須大于或等于1也要大于或等于corePoolSize (第1處)maximumPoolSize <= 0 ||maximumPoolSize < corePoolSize ||keepAliveTime < 0)throw new IllegalArgumentException();// (第2處)if (workQueue == null || threadFactory == null || handler == null)throw new NullPointerException();this.corePoolSize = corePoolSize;this.maximumPoolSize = maximumPoolSize;this.workQueue = workQueue;this.keepAliveTime = unit.toNanos(keepAliveTime);this.threadFactory = threadFactory;this.handler = handler;
}
???? 第 1 個(gè)參數(shù):corePoolSize 表示常駐核心線程數(shù)。如果等于 0 ,則任務(wù)執(zhí)行完之后 ,沒有任何請求進(jìn)入時(shí)銷毀線程池的線程;如果大于 0 ,即使本地任務(wù)執(zhí)行完畢,核心線程也不會(huì)被銷毀。這個(gè)值的設(shè)置非常關(guān)鍵,設(shè)置過大會(huì)浪費(fèi)資源,設(shè)置過小會(huì)導(dǎo)致線程頻繁地創(chuàng)建或銷毀。
???? 第 2 個(gè)參數(shù): maximumPoolSize 表示線程池能夠容納同時(shí)執(zhí)行的最大線程數(shù)。從上方示例代碼中的第 1 處來看,必須大于或等于 1。如果待執(zhí)行的線程數(shù)大于此值,需要借助第 5 個(gè)參數(shù)的幫助,緩存在隊(duì)列中。如果 maximumPoolSize 與 corePoolSize相等,即是固定大小線程池。
???? 第 3 個(gè)參數(shù): keepAliveTime 表示線程池中的線程空閑時(shí)間,當(dāng)空閑時(shí)間達(dá)到 keepAliveTime 值時(shí),線程會(huì)被銷毀,直到只剩下 corePoolSize 個(gè)線程為止,避免浪費(fèi)內(nèi)存和句柄資源。在默認(rèn)情況下,當(dāng)線程池的線程數(shù)大于 corePoolSize 時(shí),keepAliveTime 才會(huì)起作用。但是當(dāng) ThreadPoolExecutor 的 allowCoreThreadTimeOut變量設(shè)置為 true 時(shí) , 核心線程超時(shí)后也會(huì)被回收。
???? 第 4 個(gè)參數(shù):TimeUnit 表示時(shí)間單位。 keepAliveTime 的時(shí)間單位通常是TimeUnit.SECONDS 。
???? 第 5 個(gè)參數(shù): workQueue 表示緩存隊(duì)列。當(dāng)請求的線程數(shù)大于 maximumPoolSize時(shí) , 線程進(jìn)入 BlockingQueue 阻塞隊(duì)列。后續(xù)示例代碼中使用的 LinkedBlockingQueue是單向鏈表,使用鎖來控制入隊(duì)和出隊(duì)的原子性,兩個(gè)鎖分別控制元素的添加和獲取,是一個(gè)生產(chǎn)消費(fèi)模型隊(duì)列。
???? 第 6 個(gè)參數(shù): threadFactory 表示線程工廠。它用來生產(chǎn)一組相同任務(wù)的線程。線程池的命名是通過給這個(gè) factory 增加組名前綴來實(shí)現(xiàn)的。在虛擬機(jī)棧分析時(shí),就可以知道線程任務(wù)是由哪個(gè)線程工廠產(chǎn)生的。
???? 第 7 個(gè)參數(shù): handler 表示執(zhí)行拒絕策略的對象。當(dāng)超過第 5 個(gè)參數(shù) workQueue的任務(wù)緩存區(qū)上限的時(shí)候,就可以通過該策略處理請求,這是 種簡單的限流保護(hù)。像某年雙十一沒有處理好訪問流量過載時(shí)的拒絕策略,導(dǎo)致內(nèi)部測試頁面被展示出來,使用戶手足無措。友好的拒絕策略可以是如下三種:
???? ( 1 )保存到數(shù)據(jù)庫進(jìn)行削峰填谷。在空間時(shí)再提取出來執(zhí)行。
???? ( 2 )轉(zhuǎn)向某個(gè)提示頁由。
???? ( 3 )打印日志。
???? 從代碼第 2 處來看 , 隊(duì)列、線程工廠、拒絕處理服務(wù)都必須有實(shí)例對象 , 但在實(shí)際編程中,很少有程序員對這三者進(jìn)行實(shí)例化,而通過 Executors 這個(gè)線程池靜態(tài)工廠提供默認(rèn)實(shí)現(xiàn) , 那么 Exceutors 與 ThreadPoolExecutor 是什么關(guān)系呢?線程池相關(guān)類圖如下圖所示。
execute() 與 submit()
execute為ThreadPoolExecutor里的方法,沒有返回值
/*** @param 線程任務(wù)* @throws RejectedExecutionException 如果無法創(chuàng)建任何狀態(tài)的線程任務(wù)* /
void execute (Runnable command );
submit為AbstractExecutorService里的方法,有返回值
public Future<?> submit(Runnable task) {if (task == null) throw new NullPointerException();RunnableFuture<Void> ftask = newTaskFor(task, null);execute(ftask);return ftask;
}
Executors 靜態(tài)工廠方法創(chuàng)建線程池
???? ExecutorService 接口繼承了 Executor 接口,定義了管理線程任務(wù)的方法。ExecutorService 的抽象類 AbstractExecutorService 提供了 submit() 、 invokeAll() 等部分方法的實(shí)現(xiàn) , 但是核心方法Executor.execute() 并沒有在這里實(shí)現(xiàn)。因?yàn)樗械娜蝿?wù)都在這個(gè)方法里執(zhí)行 , 不同實(shí)現(xiàn)會(huì)帶來不同的執(zhí)行策略,這點(diǎn)在后續(xù)的 ThreadPoolExecutor 解析時(shí) ,會(huì)一步步地分析。通過Executors 的靜態(tài)工廠方法可以創(chuàng)建三個(gè)線城池的包裝對象。 ForkJoinPool 、 ThreadPooIExecutor、ScheduledThreadPoolExecutor 。 Executors 核心的方法有五個(gè):
- Executors.newWorkStealingPool: JDK8 引人,創(chuàng)建持有足夠線程的線程池支持給定的并行度 , 并通過使用多個(gè)隊(duì)列減少競爭 , 此構(gòu)造方法中把 CPU 數(shù)量設(shè)置為默認(rèn)的并行度:
/*** Creates a work-stealing thread pool using all* {@link Runtime#availableProcessors available processors}* as its target parallelism level.* @return the newly created thread pool* @see #newWorkStealingPool(int)* @since 1.8*/public static ExecutorService newWorkStealingPool() {// 返回 ForkJoinPool(JDK7引入)對象,它也是 AbstractExecutorService 的子類return new ForkJoinPool(Runtime.getRuntime().availableProcessors(),ForkJoinPool.defaultForkJoinWorkerThreadFactory,null, true);}
- Executors.newCachedThreadPool: maximumPoolSize 最大可以至 Integer.MAX_VALUE, 是高度可伸縮的線程池 ,如果達(dá)到這個(gè)上限 , 相信沒有任何服務(wù)器能夠繼續(xù)工作,肯定會(huì)拋出 OOM 異常。 keepAliveTime 默認(rèn)為 60 秒,工作線程處于空閑狀態(tài) , 則回收工作線程。如果任務(wù)數(shù)增加 ,再次創(chuàng)建出新線程處理任務(wù)。
- Executors.newScheduledThreadPool:線程數(shù)最大至 Integer.MAX_VALUE,與上述相同,存在 OOM 風(fēng)險(xiǎn)。它是ScheduledExecutorService 接口家族的實(shí)現(xiàn)類,支持定時(shí)及周期性任務(wù)執(zhí)行。相 比 Timer , ScheduledExecutorService 更安全,功能更強(qiáng)大,與 newCachedThreadPool 的區(qū)別是不回收工作線程。
- Executors.newSingleThreadExecutor : 創(chuàng)建一個(gè)單線程的線程池,相當(dāng)于單線程串行執(zhí)行所有任務(wù),保證按任務(wù)的提交順序依次執(zhí)行。
- Executors.newFixedThreadPool : 輸入的參數(shù)即是固定線程數(shù),既是核心線程數(shù)也是最大線程數(shù) , 不存在空閑線程,所以keepAliveTime 等于 0:
/*** Creates a thread pool that reuses a fixed number of threads* operating off a shared unbounded queue. At any point, at most* {@code nThreads} threads will be active processing tasks.* If additional tasks are submitted when all threads are active,* they will wait in the queue until a thread is available.* If any thread terminates due to a failure during execution* prior to shutdown, a new one will take its place if needed to* execute subsequent tasks. The threads in the pool will exist* until it is explicitly {@link ExecutorService#shutdown shutdown}.** @param nThreads the number of threads in the pool* @return the newly created thread pool* @throws IllegalArgumentException if {@code nThreads <= 0}*/public static ExecutorService newFixedThreadPool(int nThreads) {return new ThreadPoolExecutor(nThreads, nThreads,0L, TimeUnit.MILLISECONDS,new LinkedBlockingQueue<Runnable>());}
???? 這里 , 輸入的隊(duì)列沒有指明長度,下面介紹 LinkedBlockingQueue 的構(gòu)造方法:
/*** Creates a {@code LinkedBlockingQueue} with a capacity of* {@link Integer#MAX_VALUE}.*/public LinkedBlockingQueue() {this(Integer.MAX_VALUE);}
???? 使用這樣的無界隊(duì)列, 如果瞬間請求非常大 , 會(huì)有 OOM 的風(fēng)險(xiǎn)。除newWorkStealingPool 外 ,其他四個(gè)創(chuàng)建方式都存在資源耗盡的風(fēng)險(xiǎn)。
自定義實(shí)現(xiàn)線程工廠 ThreadFactory
???? Executors 中默認(rèn)的線程工廠和拒絕策略過于簡單,通常對用戶不夠友好。線程工廠需要做創(chuàng)建前的準(zhǔn)備工作,對線程池創(chuàng)建的線程必須明確標(biāo)識(shí),就像藥品的生產(chǎn)批號一樣,為線程本身指定有意義的名稱和相應(yīng)的序列號。拒絕策略應(yīng)該考慮到業(yè)務(wù)場景,返回相應(yīng)的提示或者友好地跳轉(zhuǎn)。以下為簡單的 ThreadFactory 示例:
package com.example.demo.test1;import java.util.concurrent.ThreadFactory;
import java.util.concurrent.atomic.AtomicInteger;/*** @Author: Ron* @Create: 2023-04-24 15:07*/
public class UserThreadFactory implements ThreadFactory {private final String namePrefix;private final AtomicInteger nextId = new AtomicInteger(1);// 定義線程組名稱,在使用jstack來排查線程問題時(shí),可以通過線程組名稱來快速定位問題UserThreadFactory(String whatFeatureOfGroup) {namePrefix = "From UserThreadFactory's " + whatFeatureOfGroup + "-Worker-";}@Overridepublic Thread newThread(Runnable task) {String name = namePrefix + nextId.getAndIncrement();Thread thread = new Thread(null, task, name, 0);
// System.out.println(thread.getName());return thread;}
}
???? 上述示例包括線程工廠和任務(wù)執(zhí)體的定義 , 通過 newThread 方法快速、統(tǒng)一地創(chuàng)建線程任務(wù),強(qiáng)調(diào)線程一定要有特定意義的名稱,方便出錯(cuò)時(shí)回溯。
???? 如圖 7-5 所示為排查底層公共緩存調(diào)用出錯(cuò)時(shí)的截圖,綠色框采用自定義的線程工廠,明顯比藍(lán)色框默認(rèn)的線程工廠創(chuàng)建的線程名稱擁有更多的額外信息 。 如調(diào)用來源、線程的業(yè)務(wù)含義,有助于快速定位到死鎖、 StackOverflowError 等問題。
自定義線程池拒絕策略 實(shí)現(xiàn)RejectedExecutionHandler
???? 下面再簡單地實(shí)現(xiàn)一下 RejectedExecutionHandler,實(shí)現(xiàn)了接口的 rejectedExecution方法,打印出當(dāng)前線程池狀態(tài),源碼如下:
package com.example.demo.test1;import java.util.concurrent.RejectedExecutionHandler;
import java.util.concurrent.ThreadPoolExecutor;/*** @Author: Ron* @Create: 2023-04-24 15:48*/
public class UserRejectHandler implements RejectedExecutionHandler {@Overridepublic void rejectedExecution(Runnable task, ThreadPoolExecutor executor) {System.out.println("Task rejected from " + executor.toString());// 可以將task存入隊(duì)列,等空閑時(shí)再去執(zhí)行}
}
線程池拒絕策略
在ThreadPoolExecutor 中提供了四個(gè)公開的內(nèi)部靜態(tài)類:
- AbortPolicy (默認(rèn)):丟棄任務(wù)并拋出 RejectedExecutionException 異常。
- DiscardPolicy : 丟棄任務(wù),但是不拋出異常 , 這是不推薦的做法。
- DiscardOldestPolicy : 拋棄隊(duì)列中等待最久的任務(wù) , 然后把當(dāng)前任務(wù)加入隊(duì)列中。
- CallerRunsPolicy :調(diào)用任務(wù)的 run() 方法繞過線程池直接執(zhí)行,使用調(diào)用線程執(zhí)行任務(wù)。
new ThreadPoolExecutor.AbortPolicy(); // 默認(rèn),隊(duì)列滿了丟棄任務(wù)并拋出異常new ThreadPoolExecutor.CallerRunsPolicy(); // 如果添加線城池失敗,使用調(diào)用線程執(zhí)行任務(wù)new ThreadPoolExecutor.DiscardOldestPolicy(); // 將最早進(jìn)入隊(duì)列的任務(wù)刪除,之后嘗試加入隊(duì)列new ThreadPoolExecutor.DiscardPolicy(); // 隊(duì)列滿了,丟棄任務(wù)不拋出異常
???? 根據(jù)之前實(shí)現(xiàn)的線程工廠和拒絕策略,線程池的相關(guān)代碼實(shí)現(xiàn)如下:
package com.example.demo.test1;import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;/*** @Author: Ron* @Create: 2023-04-24 15:59*/
public class UserThreadPool {public static void main(String[] args) {// 緩存隊(duì)列設(shè)置固定長度為2,為了快速觸發(fā) rejectHandlerBlockingQueue queue = new LinkedBlockingQueue(2);// 假設(shè)外部任務(wù)線程的來源由機(jī)房1和機(jī)房2的混合調(diào)用UserThreadFactory f1 = new UserThreadFactory("第1機(jī)房");UserThreadFactory f2 = new UserThreadFactory("第2機(jī)房");// 自定義拒絕策略UserRejectHandler handler = new UserRejectHandler();// 核心線程為1,最大線程為2,為了保證觸發(fā)rejectHandlerThreadPoolExecutor threadPoolFirst= new ThreadPoolExecutor(1, 2, 60,TimeUnit.SECONDS, queue, f1, handler);// 利用第二個(gè)線程工廠實(shí)力創(chuàng)建第二個(gè)線程池ThreadPoolExecutor threadPoolSecond= new ThreadPoolExecutor(1, 2, 60,TimeUnit.SECONDS, queue, f2, handler);// 創(chuàng)建400個(gè)任務(wù)線程Runnable task = new Task();for (int i = 0; i < 200; i++) {threadPoolFirst.execute(task);threadPoolSecond.execute(task);threadPoolSecond.submit(task);}}}
執(zhí)行結(jié)果如下:
當(dāng)任務(wù)被拒絕的時(shí)候,拒絕策略會(huì)打印出當(dāng)前線程池的大小已經(jīng)達(dá)到了maximumPoolSize=2 ,且隊(duì)列已滿,完成的任務(wù)數(shù)提示已經(jīng)有 4 個(gè)(最后一行)。
三、線程池源碼詳解
???? 在 ThreadPoolExecutor 的屬性定義中頻繁地用位移運(yùn)算來表示線程池狀態(tài), 位移運(yùn)算是改變當(dāng)前值的一種高效手段 , 包括左移與右移。下面從屬性定義開始閱讀ThreadPoolExecutor 的源碼。
// Integer共有32位,最右邊29位表示工作線程數(shù),最左邊3位表示線程池狀態(tài)// 注:簡單地說,3個(gè)二進(jìn)制可以表示從0到7共8個(gè)不同的數(shù)值(第1處)private static final int COUNT_BITS = Integer.SIZE - 3;// 000-11111...1111(29個(gè)1),類似于子網(wǎng)掩碼,用于位的與運(yùn)算// 得到左邊3位,還是右邊29位private static final int CAPACITY = (1 << COUNT_BITS) - 1;// runState is stored in the high-order bits// 用左邊3位,實(shí)現(xiàn)5種線程池狀態(tài)。(在左3位之后加入中畫線有助于理解)// 111-00000...0000(29個(gè)0),十進(jìn)制值,-536,870,912// 此狀態(tài)表示線程池能接受新任務(wù)private static final int RUNNING = -1 << COUNT_BITS;// 000-00000...0000(29個(gè)0),十進(jìn)制值:0// 此狀態(tài)不再接受新任務(wù),但可以繼續(xù)執(zhí)行隊(duì)列中的任務(wù)private static final int SHUTDOWN = 0 << COUNT_BITS;// 001-00000...0000(29個(gè)0),十進(jìn)制值:536,870,912// 此狀態(tài)全面拒絕,并中斷正在處理的任務(wù)private static final int STOP = 1 << COUNT_BITS;// 010-00000...0000(29個(gè)0),十進(jìn)制值:1,073,741,824// 此狀態(tài)表示所有任務(wù)已經(jīng)被終止private static final int TIDYING = 2 << COUNT_BITS;// 011-00000...0000(29個(gè)0),十進(jìn)制值:1,610,612,736// 此狀態(tài)表示已清理完現(xiàn)場private static final int TERMINATED = 3 << COUNT_BITS;// Packing and unpacking ctl// 與運(yùn)算,比如 001-00000...0000(29個(gè)0),表示67個(gè)工作線程// 掩碼取反: 111-00000...0000(29個(gè)0),即得到左邊3位001// 表示線程池當(dāng)前出于STOP狀態(tài)private static int runStateOf(int c) { return c & ~CAPACITY; }// 同理掩碼 000-11111...1111(29個(gè)1),得到右邊29位,即工作線程數(shù)private static int workerCountOf(int c) { return c & CAPACITY; }// 把左邊3位與右邊29位按或運(yùn)算,合并成一個(gè)值private static int ctlOf(int rs, int wc) { return rs | wc; }
第 l :處說明 , 線程池的狀態(tài)用高 3 位表示,其中包括了符號位。五種狀態(tài)的
十進(jìn)制值按從小到大依次排序?yàn)?RUNNING < SHUTDOWN < STOP < TIDYING <
TERMINATED , 這樣設(shè)計(jì)的好處是可以通過比較值的大小來確定線程池的狀態(tài)。例
如程序中經(jīng)常會(huì)出現(xiàn) isRunning 的判斷
private static boolean isRunning(int c) {return c < SHUTDOWN;
}
我們都知道 Executor 接口有且只有一個(gè)方法 execute , 通過參數(shù)傳入待執(zhí)行線程的對象。下面分析 ThreadPoolExecutor 關(guān)于 execute 方法的實(shí)現(xiàn),
/*** Executes the given task sometime in the future. The task* may execute in a new thread or in an existing pooled thread.** If the task cannot be submitted for execution, either because this* executor has been shutdown or because its capacity has been reached,* the task is handled by the current {@code RejectedExecutionHandler}.** @param command the task to execute* @throws RejectedExecutionException at discretion of* {@code RejectedExecutionHandler}, if the task* cannot be accepted for execution* @throws NullPointerException if {@code command} is null*/public void execute(Runnable command) {if (command == null)throw new NullPointerException();/** Proceed in 3 steps:** 1. If fewer than corePoolSize threads are running, try to* start a new thread with the given command as its first* task. The call to addWorker atomically checks runState and* workerCount, and so prevents false alarms that would add* threads when it shouldn't, by returning false.** 2. If a task can be successfully queued, then we still need* to double-check whether we should have added a thread* (because existing ones died since last checking) or that* the pool shut down since entry into this method. So we* recheck state and if necessary roll back the enqueuing if* stopped, or start a new thread if there are none.** 3. If we cannot queue task, then we try to add a new* thread. If it fails, we know we are shut down or saturated* and so reject the task.*/// 返回包含線程數(shù)及線程池狀態(tài)的Integer類型數(shù)值int c = ctl.get();// 如果工作線程數(shù)小于核心線程數(shù),則創(chuàng)建線程任務(wù)并執(zhí)行if (workerCountOf(c) < corePoolSize) {// addWorker是另一個(gè)極為重要的方法,見下一段源碼解析(第1處)if (addWorker(command, true))return;// 如果創(chuàng)建失敗,防止外部已經(jīng)在線程池中加入新任務(wù),重新獲取一下c = ctl.get();}// 只有線程池處于RUNNING狀態(tài),才執(zhí)行后半句:置入隊(duì)列if (isRunning(c) && workQueue.offer(command)) {int recheck = ctl.get();// 如果線程池不是RUNNING狀態(tài),則將剛加入隊(duì)列的任務(wù)移除if (! isRunning(recheck) && remove(command))reject(command);// 如果之前的線程已被消費(fèi)完,新建一個(gè)線程else if (workerCountOf(recheck) == 0)addWorker(null, false);}// 核心池和隊(duì)列都已滿,嘗試創(chuàng)建一個(gè)新線程else if (!addWorker(command, false))// 如果addWorker返回是false,即創(chuàng)建失敗,則喚醒拒絕策略(第2處)reject(command);}
第 1 處: execute 方法在不同的階段有三次 addWorker 的嘗試動(dòng)作。
第 2 處: 發(fā)生拒絕的理由有兩個(gè)
( 1 )線程池狀態(tài)為非 RUNNING狀態(tài)
( 2 )等待隊(duì)列己滿。
下面繼續(xù)分析 addWorker 方法的源碼:
/*** 根據(jù)當(dāng)前線程池狀態(tài),檢查是否可以添加新的任務(wù)線程,如果則創(chuàng)建并啟動(dòng)任務(wù)* 如果一切正常則返回true。返回false的可能性如下:* 1.線程池沒有處于RUNNING狀態(tài)* 2.線程工廠創(chuàng)建新的任務(wù)線程失敗* @param firstTask 外部啟動(dòng)線程池是需要構(gòu)造的第一個(gè)線程,他是線程的母體* @param core 新增工作線程時(shí)的判斷指標(biāo),解釋如下* true 表示新增工作線程時(shí),需要判斷當(dāng)前RUNNING狀態(tài)的線程是否少于corePoolSize* false 表示新增工作線程時(shí),需要判斷當(dāng)前RUNNING狀態(tài)的線程是否少于maximumPoolSize* @return*/private boolean addWorker(Runnable firstTask, boolean core) {// 不需要任務(wù)預(yù)定義的語法標(biāo)簽,響應(yīng)下文的continue retry,快速退出多層嵌套循環(huán)(第1處)retry:for (;;) {// 參考之前的狀態(tài)分析:如果RUNNING狀態(tài),則條件為假,不執(zhí)行后面的判斷// 如果是STOP及之上的狀態(tài),或者firstTask初始線程不為空,或者工作隊(duì)列不為空,// 都會(huì)直接返回創(chuàng)建失敗(第2處)int c = ctl.get();int rs = runStateOf(c);// Check if queue empty only if necessary.if (rs >= SHUTDOWN && ! (rs == SHUTDOWN && firstTask == null && ! workQueue.isEmpty()))return false;for (;;) {// 如果超過最大允許線程數(shù)則不能再添加新的線程// 最大線程數(shù)不能超過2^29,否則影響左邊3位的線程池狀態(tài)值int wc = workerCountOf(c);if (wc >= CAPACITY ||wc >= (core ? corePoolSize : maximumPoolSize))return false;// 將當(dāng)前活動(dòng)線程數(shù)+1 (第3處)if (compareAndIncrementWorkerCount(c))break retry;// 線程池狀態(tài)和工作線程數(shù)是可變化的,需要經(jīng)常提取這個(gè)最新值c = ctl.get(); // Re-read ctl// 如果已經(jīng)關(guān)閉,則再次從retry標(biāo)簽處進(jìn)入,在第2處再做判斷 (第4處)if (runStateOf(c) != rs)continue retry;// else CAS failed due to workerCount change; retry inner loop// 如果線程還是處于RUNNING狀態(tài),那就在說明僅僅是第3處失敗// 繼續(xù)循環(huán)執(zhí)行 (第5處)}}// 開始創(chuàng)建工作線程boolean workerStarted = false;boolean workerAdded = false;ThreadPoolExecutor.Worker w = null;try {// 利用Worker構(gòu)造方法中的線程池工廠創(chuàng)建線程,并封裝成工作線程Worker對象w = new ThreadPoolExecutor.Worker(firstTask);// 注意這是Worker中的屬性對象thread (第6處)final Thread t = w.thread;if (t != null) {// 在進(jìn)行ThreadPoolExecutor的敏感操作時(shí)// 都需要持有主鎖,避免在添加和啟動(dòng)線程時(shí)被干擾final ReentrantLock mainLock = this.mainLock;mainLock.lock();try {// Recheck while holding lock.// Back out on ThreadFactory failure or if// shut down before lock acquired.int rs = runStateOf(ctl.get());// 當(dāng)線程池狀態(tài)為RUNNING或SHUTDOWN// 且firstTask初始線程為空時(shí)if (rs < SHUTDOWN ||(rs == SHUTDOWN && firstTask == null)) {if (t.isAlive()) // precheck that t is startablethrow new IllegalThreadStateException();workers.add(w);int s = workers.size();// 整個(gè)線程池在運(yùn)行期間的最大并發(fā)任務(wù)個(gè)數(shù)if (s > largestPoolSize)largestPoolSize = s;workerAdded = true;}} finally {mainLock.unlock();}if (workerAdded) {// 終于看到親切的start方法// 注意,并非線程池的execute的command參數(shù)指向的線程t.start();workerStarted = true;}}} finally {if (! workerStarted)// 線程啟動(dòng)失敗,把剛才第3處加上的工作線程計(jì)數(shù)再減回去addWorkerFailed(w);}return workerStarted;}
這段代碼晦澀難懂,部分地方甚至違反了代碼規(guī)約,但其中蘊(yùn)含的豐富的編碼知識(shí)點(diǎn)值得我們?nèi)W(xué)習(xí),下面接序號來依次講解。
第 1 處, 配合循環(huán)語旬出現(xiàn)的 label ,類似于 goto 作用。 label 定義時(shí),必須把標(biāo)簽和冒號的組合語旬緊緊相鄰定義在循環(huán)體之前,否則會(huì)編譯出錯(cuò)。目的是在實(shí)現(xiàn)多重循環(huán)時(shí)能夠快速退出到任何一層。這種做法的出發(fā)點(diǎn)似乎非常貼心,但是在大型軟件項(xiàng)目中 ,濫用標(biāo)簽行跳轉(zhuǎn)的后果將是災(zāi)難性的。示例代碼中,在 retry 下方有兩個(gè)無限循環(huán),在 workerCount 加 1 成功后,直接退出兩層循環(huán)。
第 2 處,這樣的表達(dá)式不利于代碼閱讀,應(yīng)該改成:
Boolean isNotAllowedToCreateTask= runStateLeast(c, SHUTDOWN) && (runStateAtLeast(c, STOP)|| firstTask != null || workQueue.isEmpty());
if (isNotAllowedToCreateTask) {// ...
}
第 3 處, 與第 1 處的標(biāo)簽呼應(yīng) , Atomiclnteger 對象的加1操作是原子性的。break retry 表示直接跳出與 retry 相鄰的這個(gè)循環(huán)體。
第 4 處,此 continue 跳轉(zhuǎn)至標(biāo)簽處,繼續(xù)執(zhí)行循環(huán)。如果條件為假,則說明線程池還處于運(yùn)行狀態(tài),即繼續(xù)在 for( ;;)循環(huán)內(nèi)執(zhí)行。
第 5 處, compareAndlncrementWorkerCount 方法執(zhí)行失敗的概率非常低。即使失敗 , 再次執(zhí)行時(shí)成功的概率也是極高的,類似于自旋鎖原理。這里的處理邏輯是先加 1,創(chuàng)建失敗再減1,這是輕量處理并發(fā)創(chuàng)建線程的方式。如果先創(chuàng)建線程,成功再加1,當(dāng)發(fā)現(xiàn)超出限制后再銷毀線程,那么這樣的處理方式明顯比前者代價(jià)要大。
第 6 處 , Worker 對象是工作線程的核心類實(shí)現(xiàn),部分源碼如下:
/*** 它實(shí)現(xiàn)Runnable接口,并把本對象作為參數(shù)輸入給run()方法中的runWorker(this),* 所以內(nèi)部屬性線程thread在start的時(shí)候,即會(huì)調(diào)用runWorker方法*/private final class Worker extends AbstractQueuedSynchronizer implements Runnable {/*** Creates with given first task and thread from ThreadFactory.* @param firstTask the first task (null if none)*/Worker(Runnable firstTask) {// 它是AbstractQueuedSynchronizer的方法// 在runWorker方法執(zhí)行之前禁止線程被中斷setState(-1); // inhibit interrupts until runWorkerthis.firstTask = firstTask;this.thread = getThreadFactory().newThread(this);}// 當(dāng)thread被start()之后,執(zhí)行runWorker的方法/** Delegates main run loop to outer runWorker */public void run() {runWorker(this);}}
總結(jié)
???? 線程池的相關(guān)源碼比較精煉,還包括線程池的銷毀、任務(wù)提取和消費(fèi)等,與線程狀態(tài)圖一樣,線程池也有自己獨(dú)立的狀態(tài)轉(zhuǎn)化流程,本節(jié)不再展開??偨Y(jié)一下,使用線程池要注意如下幾點(diǎn):
???? ( 1 )合理設(shè)置各類參數(shù),應(yīng)根據(jù)實(shí)際業(yè)務(wù)場景來設(shè)置合理的工作線程數(shù)。
???? ( 2 )線程資源必須通過線城池提供,不允許在應(yīng)用中自行顯式創(chuàng)建線程。
???? ( 3 )創(chuàng)建線程或線城池時(shí)請指定有意義的線程名稱,方便出錯(cuò)時(shí)回溯。
???? 線程池不允許使用 Executors ,而是通過 ThreadPoolExecutor 的方式創(chuàng)建 ,這樣的處理方式能更加明確線程池的運(yùn)行規(guī)則,規(guī)避資源耗盡的風(fēng)險(xiǎn)。