中文亚洲精品无码_熟女乱子伦免费_人人超碰人人爱国产_亚洲熟妇女综合网

當(dāng)前位置: 首頁 > news >正文

專業(yè)的網(wǎng)站建設(shè)哪家快業(yè)務(wù)員用什么軟件找客戶

專業(yè)的網(wǎng)站建設(shè)哪家快,業(yè)務(wù)員用什么軟件找客戶,如何做轉(zhuǎn)運(yùn)網(wǎng)站,制作 網(wǎng)頁碼出高效:Java開發(fā)手冊筆記(線程池及其源碼) 碼出高效:Java開發(fā)手冊筆記(線程池及其源碼) 碼出高效:Java開發(fā)手冊筆記(線程池及其源碼)前言一、線程池的作用線程的生命周…

碼出高效:Java開發(fā)手冊筆記(線程池及其源碼)


碼出高效:Java開發(fā)手冊筆記(線程池及其源碼)

  • 碼出高效:Java開發(fā)手冊筆記(線程池及其源碼)
  • 前言
  • 一、線程池的作用
    • 線程的生命周期
  • 二、線程池7大參數(shù)
    • execute() 與 submit()
    • Executors 靜態(tài)工廠方法創(chuàng)建線程池
    • 自定義實(shí)現(xiàn)線程工廠 ThreadFactory
    • 自定義線程池拒絕策略 實(shí)現(xiàn)*RejectedExecutionHandler*
    • 線程池拒絕策略
  • 三、線程池源碼詳解
  • 總結(jié)


前言

線程池


一、線程池的作用

線程使應(yīng)用能夠更加充分合理地協(xié)調(diào)利用 CPU、內(nèi)存、網(wǎng)絡(luò)、 I/O 等系統(tǒng)資源。線程的創(chuàng)建需要開辟虛擬機(jī)棧、本地方法棧、程序計(jì)數(shù)器等線程私有的內(nèi)存空間。在線程銷毀時(shí)需要回收這些系統(tǒng)資源。頻繁地創(chuàng)建和銷毀線程會(huì)浪費(fèi)大量的系統(tǒng)資源,增加并發(fā)編程風(fēng)險(xiǎn)。另外,在服務(wù)器負(fù)載過大的時(shí)候,如何讓新的線程等待或者友好地拒絕服務(wù)?這些都是線程自身無法解決的。所以需要通過線程池協(xié)調(diào)多個(gè)線程 , 并實(shí)現(xiàn)類似主次線程隔離、定時(shí)執(zhí)行、周期執(zhí)行等任務(wù)。線程池的作用包括:

  • 利用線程池管理并復(fù)用線程、控制最大并發(fā)數(shù)等。
  • 實(shí)現(xiàn)任務(wù)線程隊(duì)列緩存策略和拒絕機(jī)制。
  • 實(shí)現(xiàn)某些與時(shí)間相關(guān)的功能,如定時(shí)執(zhí)行、周期執(zhí)行等。
  • 隔離線程環(huán)境。比如,交易服務(wù)和搜索服務(wù)在同一臺(tái)服務(wù)器上,分別開啟兩個(gè)線程池,交易線程的資源消耗明顯要大;因此,通過配置獨(dú)立的線程池 ,將較慢的交易服務(wù)與搜索服務(wù)隔離開,避免各服務(wù)線程相互影響。

線程的生命周期

在這里插入圖片描述
線程池重要的參數(shù),ctl是一個(gè)包含兩個(gè)屬性的原子整型;一個(gè)屬性是 workerCount,指的是有效線程數(shù);另一個(gè)是 runState,指的是線程的狀態(tài)(線程的生命周期)

/*** The workerCount is the number of workers that have been* permitted to start and not permitted to stop.  The value may be* transiently different from the actual number of live threads,* for example when a ThreadFactory fails to create a thread when* asked, and when exiting threads are still performing* bookkeeping before terminating. The user-visible pool size is* reported as the current size of the workers set.*/
/*** The runState provides the main lifecycle control, taking on values:**   RUNNING:  Accept new tasks and process queued tasks*   SHUTDOWN: Don't accept new tasks, but process queued tasks*   STOP:     Don't accept new tasks, don't process queued tasks,*             and interrupt in-progress tasks*   TIDYING:  All tasks have terminated, workerCount is zero,*             the thread transitioning to state TIDYING*             will run the terminated() hook method*   TERMINATED: terminated() has completed*/// runState is stored in the high-order bitsprivate static final int RUNNING    = -1 << COUNT_BITS;private static final int SHUTDOWN   =  0 << COUNT_BITS;private static final int STOP       =  1 << COUNT_BITS;private static final int TIDYING    =  2 << COUNT_BITS;private static final int TERMINATED =  3 << COUNT_BITS;

在這里插入圖片描述

Thread類中的線程生命周期

public enum State {/*** Thread state for a thread which has not yet started.*/NEW,/*** Thread state for a runnable thread.  A thread in the runnable* state is executing in the Java virtual machine but it may* be waiting for other resources from the operating system* such as processor.*/RUNNABLE,/*** Thread state for a thread blocked waiting for a monitor lock.* A thread in the blocked state is waiting for a monitor lock* to enter a synchronized block/method or* reenter a synchronized block/method after calling* {@link Object#wait() Object.wait}.*/BLOCKED,/*** Thread state for a waiting thread.* A thread is in the waiting state due to calling one of the* following methods:* <ul>*   <li>{@link Object#wait() Object.wait} with no timeout</li>*   <li>{@link #join() Thread.join} with no timeout</li>*   <li>{@link LockSupport#park() LockSupport.park}</li>* </ul>** <p>A thread in the waiting state is waiting for another thread to* perform a particular action.** For example, a thread that has called <tt>Object.wait()</tt>* on an object is waiting for another thread to call* <tt>Object.notify()</tt> or <tt>Object.notifyAll()</tt> on* that object. A thread that has called <tt>Thread.join()</tt>* is waiting for a specified thread to terminate.*/WAITING,/*** Thread state for a waiting thread with a specified waiting time.* A thread is in the timed waiting state due to calling one of* the following methods with a specified positive waiting time:* <ul>*   <li>{@link #sleep Thread.sleep}</li>*   <li>{@link Object#wait(long) Object.wait} with timeout</li>*   <li>{@link #join(long) Thread.join} with timeout</li>*   <li>{@link LockSupport#parkNanos LockSupport.parkNanos}</li>*   <li>{@link LockSupport#parkUntil LockSupport.parkUntil}</li>* </ul>*/TIMED_WAITING,/*** Thread state for a terminated thread.* The thread has completed execution.*/TERMINATED;
}

在了解線程池的基本作用后,我們學(xué)習(xí)一下線程池是如何創(chuàng)建線程的。首先從 ThreadPoolExecutor 構(gòu)造方法講起 , 學(xué)習(xí)如何自定義 ThreadFactory 和RejectedExecutionHandler , 并編寫一個(gè)最簡單的線程池示例。然后,通過分析ThreadPoolExecutor 的 execute 和 addWorker 兩個(gè)核心方法,學(xué)習(xí)如何把任務(wù)線程加入到線程池中運(yùn)行。 ThreadPoolExecutor 的構(gòu)造方法如下:
在這里插入圖片描述

二、線程池7大參數(shù)

public ThreadPoolExecutor(int corePoolSize,						//(第一個(gè)參數(shù))int maximumPoolSize,					//(第二個(gè)參數(shù))long keepAliveTime,					//(第三個(gè)參數(shù))TimeUnit unit,						//(第四個(gè)參數(shù))BlockingQueue<Runnable> workQueue,	//(第五個(gè)參數(shù))ThreadFactory threadFactory,			//(第六個(gè)參數(shù))RejectedExecutionHandler handler) {	//(第七個(gè)參數(shù))if (corePoolSize < 0 ||// maximumPoolSize必須大于或等于1也要大于或等于corePoolSize (第1處)maximumPoolSize <= 0 ||maximumPoolSize < corePoolSize ||keepAliveTime < 0)throw new IllegalArgumentException();// (第2處)if (workQueue == null || threadFactory == null || handler == null)throw new NullPointerException();this.corePoolSize = corePoolSize;this.maximumPoolSize = maximumPoolSize;this.workQueue = workQueue;this.keepAliveTime = unit.toNanos(keepAliveTime);this.threadFactory = threadFactory;this.handler = handler;
}

???? 第 1 個(gè)參數(shù):corePoolSize 表示常駐核心線程數(shù)。如果等于 0 ,則任務(wù)執(zhí)行完之后 ,沒有任何請求進(jìn)入時(shí)銷毀線程池的線程;如果大于 0 ,即使本地任務(wù)執(zhí)行完畢,核心線程也不會(huì)被銷毀。這個(gè)值的設(shè)置非常關(guān)鍵,設(shè)置過大會(huì)浪費(fèi)資源,設(shè)置過小會(huì)導(dǎo)致線程頻繁地創(chuàng)建或銷毀。
???? 第 2 個(gè)參數(shù): maximumPoolSize 表示線程池能夠容納同時(shí)執(zhí)行的最大線程數(shù)。從上方示例代碼中的第 1 處來看,必須大于或等于 1。如果待執(zhí)行的線程數(shù)大于此值,需要借助第 5 個(gè)參數(shù)的幫助,緩存在隊(duì)列中。如果 maximumPoolSize 與 corePoolSize相等,即是固定大小線程池。
???? 第 3 個(gè)參數(shù): keepAliveTime 表示線程池中的線程空閑時(shí)間,當(dāng)空閑時(shí)間達(dá)到 keepAliveTime 值時(shí),線程會(huì)被銷毀,直到只剩下 corePoolSize 個(gè)線程為止,避免浪費(fèi)內(nèi)存和句柄資源。在默認(rèn)情況下,當(dāng)線程池的線程數(shù)大于 corePoolSize 時(shí),keepAliveTime 才會(huì)起作用。但是當(dāng) ThreadPoolExecutor 的 allowCoreThreadTimeOut變量設(shè)置為 true 時(shí) , 核心線程超時(shí)后也會(huì)被回收。
???? 第 4 個(gè)參數(shù):TimeUnit 表示時(shí)間單位。 keepAliveTime 的時(shí)間單位通常是TimeUnit.SECONDS 。
???? 第 5 個(gè)參數(shù): workQueue 表示緩存隊(duì)列。當(dāng)請求的線程數(shù)大于 maximumPoolSize時(shí) , 線程進(jìn)入 BlockingQueue 阻塞隊(duì)列。后續(xù)示例代碼中使用的 LinkedBlockingQueue是單向鏈表,使用鎖來控制入隊(duì)和出隊(duì)的原子性,兩個(gè)鎖分別控制元素的添加和獲取,是一個(gè)生產(chǎn)消費(fèi)模型隊(duì)列。
???? 第 6 個(gè)參數(shù): threadFactory 表示線程工廠。它用來生產(chǎn)一組相同任務(wù)的線程。線程池的命名是通過給這個(gè) factory 增加組名前綴來實(shí)現(xiàn)的。在虛擬機(jī)棧分析時(shí),就可以知道線程任務(wù)是由哪個(gè)線程工廠產(chǎn)生的。
???? 第 7 個(gè)參數(shù): handler 表示執(zhí)行拒絕策略的對象。當(dāng)超過第 5 個(gè)參數(shù) workQueue的任務(wù)緩存區(qū)上限的時(shí)候,就可以通過該策略處理請求,這是 種簡單的限流保護(hù)。像某年雙十一沒有處理好訪問流量過載時(shí)的拒絕策略,導(dǎo)致內(nèi)部測試頁面被展示出來,使用戶手足無措。友好的拒絕策略可以是如下三種:
???? ( 1 )保存到數(shù)據(jù)庫進(jìn)行削峰填谷。在空間時(shí)再提取出來執(zhí)行。
???? ( 2 )轉(zhuǎn)向某個(gè)提示頁由。
???? ( 3 )打印日志。

???? 從代碼第 2 處來看 , 隊(duì)列、線程工廠、拒絕處理服務(wù)都必須有實(shí)例對象 , 但在實(shí)際編程中,很少有程序員對這三者進(jìn)行實(shí)例化,而通過 Executors 這個(gè)線程池靜態(tài)工廠提供默認(rèn)實(shí)現(xiàn) , 那么 Exceutors 與 ThreadPoolExecutor 是什么關(guān)系呢?線程池相關(guān)類圖如下圖所示。
在這里插入圖片描述

execute() 與 submit()

execute為ThreadPoolExecutor里的方法,沒有返回值

/*** @param 線程任務(wù)* @throws RejectedExecutionException 如果無法創(chuàng)建任何狀態(tài)的線程任務(wù)* /
void execute (Runnable command );

submit為AbstractExecutorService里的方法,有返回值

public Future<?> submit(Runnable task) {if (task == null) throw new NullPointerException();RunnableFuture<Void> ftask = newTaskFor(task, null);execute(ftask);return ftask;
}

Executors 靜態(tài)工廠方法創(chuàng)建線程池

???? ExecutorService 接口繼承了 Executor 接口,定義了管理線程任務(wù)的方法。ExecutorService 的抽象類 AbstractExecutorService 提供了 submit() 、 invokeAll() 等部分方法的實(shí)現(xiàn) , 但是核心方法Executor.execute() 并沒有在這里實(shí)現(xiàn)。因?yàn)樗械娜蝿?wù)都在這個(gè)方法里執(zhí)行 , 不同實(shí)現(xiàn)會(huì)帶來不同的執(zhí)行策略,這點(diǎn)在后續(xù)的 ThreadPoolExecutor 解析時(shí) ,會(huì)一步步地分析。通過Executors 的靜態(tài)工廠方法可以創(chuàng)建三個(gè)線城池的包裝對象。 ForkJoinPool 、 ThreadPooIExecutor、ScheduledThreadPoolExecutor 。 Executors 核心的方法有五個(gè):

  • Executors.newWorkStealingPool: JDK8 引人,創(chuàng)建持有足夠線程的線程池支持給定的并行度 , 并通過使用多個(gè)隊(duì)列減少競爭 , 此構(gòu)造方法中把 CPU 數(shù)量設(shè)置為默認(rèn)的并行度:
    /*** Creates a work-stealing thread pool using all* {@link Runtime#availableProcessors available processors}* as its target parallelism level.* @return the newly created thread pool* @see #newWorkStealingPool(int)* @since 1.8*/public static ExecutorService newWorkStealingPool() {// 返回 ForkJoinPool(JDK7引入)對象,它也是 AbstractExecutorService 的子類return new ForkJoinPool(Runtime.getRuntime().availableProcessors(),ForkJoinPool.defaultForkJoinWorkerThreadFactory,null, true);}
  • Executors.newCachedThreadPool: maximumPoolSize 最大可以至 Integer.MAX_VALUE, 是高度可伸縮的線程池 ,如果達(dá)到這個(gè)上限 , 相信沒有任何服務(wù)器能夠繼續(xù)工作,肯定會(huì)拋出 OOM 異常。 keepAliveTime 默認(rèn)為 60 秒,工作線程處于空閑狀態(tài) , 則回收工作線程。如果任務(wù)數(shù)增加 ,再次創(chuàng)建出新線程處理任務(wù)。
  • Executors.newScheduledThreadPool:線程數(shù)最大至 Integer.MAX_VALUE,與上述相同,存在 OOM 風(fēng)險(xiǎn)。它是ScheduledExecutorService 接口家族的實(shí)現(xiàn)類,支持定時(shí)及周期性任務(wù)執(zhí)行。相 比 Timer , ScheduledExecutorService 更安全,功能更強(qiáng)大,與 newCachedThreadPool 的區(qū)別是不回收工作線程。
  • Executors.newSingleThreadExecutor : 創(chuàng)建一個(gè)單線程的線程池,相當(dāng)于單線程串行執(zhí)行所有任務(wù),保證按任務(wù)的提交順序依次執(zhí)行。
  • Executors.newFixedThreadPool : 輸入的參數(shù)即是固定線程數(shù),既是核心線程數(shù)也是最大線程數(shù) , 不存在空閑線程,所以keepAliveTime 等于 0:
    /*** Creates a thread pool that reuses a fixed number of threads* operating off a shared unbounded queue.  At any point, at most* {@code nThreads} threads will be active processing tasks.* If additional tasks are submitted when all threads are active,* they will wait in the queue until a thread is available.* If any thread terminates due to a failure during execution* prior to shutdown, a new one will take its place if needed to* execute subsequent tasks.  The threads in the pool will exist* until it is explicitly {@link ExecutorService#shutdown shutdown}.** @param nThreads the number of threads in the pool* @return the newly created thread pool* @throws IllegalArgumentException if {@code nThreads <= 0}*/public static ExecutorService newFixedThreadPool(int nThreads) {return new ThreadPoolExecutor(nThreads, nThreads,0L, TimeUnit.MILLISECONDS,new LinkedBlockingQueue<Runnable>());}

???? 這里 , 輸入的隊(duì)列沒有指明長度,下面介紹 LinkedBlockingQueue 的構(gòu)造方法:

    /*** Creates a {@code LinkedBlockingQueue} with a capacity of* {@link Integer#MAX_VALUE}.*/public LinkedBlockingQueue() {this(Integer.MAX_VALUE);}

???? 使用這樣的無界隊(duì)列, 如果瞬間請求非常大 , 會(huì)有 OOM 的風(fēng)險(xiǎn)。除newWorkStealingPool 外 ,其他四個(gè)創(chuàng)建方式都存在資源耗盡的風(fēng)險(xiǎn)。

自定義實(shí)現(xiàn)線程工廠 ThreadFactory

???? Executors 中默認(rèn)的線程工廠和拒絕策略過于簡單,通常對用戶不夠友好。線程工廠需要做創(chuàng)建前的準(zhǔn)備工作,對線程池創(chuàng)建的線程必須明確標(biāo)識(shí),就像藥品的生產(chǎn)批號一樣,為線程本身指定有意義的名稱和相應(yīng)的序列號。拒絕策略應(yīng)該考慮到業(yè)務(wù)場景,返回相應(yīng)的提示或者友好地跳轉(zhuǎn)。以下為簡單的 ThreadFactory 示例:

package com.example.demo.test1;import java.util.concurrent.ThreadFactory;
import java.util.concurrent.atomic.AtomicInteger;/*** @Author: Ron* @Create: 2023-04-24 15:07*/
public class UserThreadFactory implements ThreadFactory {private final String namePrefix;private final AtomicInteger nextId = new AtomicInteger(1);// 定義線程組名稱,在使用jstack來排查線程問題時(shí),可以通過線程組名稱來快速定位問題UserThreadFactory(String whatFeatureOfGroup) {namePrefix = "From UserThreadFactory's " + whatFeatureOfGroup + "-Worker-";}@Overridepublic Thread newThread(Runnable task) {String name = namePrefix + nextId.getAndIncrement();Thread thread = new Thread(null, task, name, 0);
//        System.out.println(thread.getName());return thread;}
}

???? 上述示例包括線程工廠和任務(wù)執(zhí)體的定義 , 通過 newThread 方法快速、統(tǒng)一地創(chuàng)建線程任務(wù),強(qiáng)調(diào)線程一定要有特定意義的名稱,方便出錯(cuò)時(shí)回溯。
???? 如圖 7-5 所示為排查底層公共緩存調(diào)用出錯(cuò)時(shí)的截圖,綠色框采用自定義的線程工廠,明顯比藍(lán)色框默認(rèn)的線程工廠創(chuàng)建的線程名稱擁有更多的額外信息 。 如調(diào)用來源、線程的業(yè)務(wù)含義,有助于快速定位到死鎖、 StackOverflowError 等問題。
在這里插入圖片描述

自定義線程池拒絕策略 實(shí)現(xiàn)RejectedExecutionHandler

???? 下面再簡單地實(shí)現(xiàn)一下 RejectedExecutionHandler,實(shí)現(xiàn)了接口的 rejectedExecution方法,打印出當(dāng)前線程池狀態(tài),源碼如下:

package com.example.demo.test1;import java.util.concurrent.RejectedExecutionHandler;
import java.util.concurrent.ThreadPoolExecutor;/*** @Author: Ron* @Create: 2023-04-24 15:48*/
public class UserRejectHandler implements RejectedExecutionHandler {@Overridepublic void rejectedExecution(Runnable task, ThreadPoolExecutor executor) {System.out.println("Task rejected from " + executor.toString());// 可以將task存入隊(duì)列,等空閑時(shí)再去執(zhí)行}
}

線程池拒絕策略

在ThreadPoolExecutor 中提供了四個(gè)公開的內(nèi)部靜態(tài)類:

  • AbortPolicy (默認(rèn)):丟棄任務(wù)并拋出 RejectedExecutionException 異常。
  • DiscardPolicy : 丟棄任務(wù),但是不拋出異常 , 這是不推薦的做法。
  • DiscardOldestPolicy : 拋棄隊(duì)列中等待最久的任務(wù) , 然后把當(dāng)前任務(wù)加入隊(duì)列中。
  • CallerRunsPolicy :調(diào)用任務(wù)的 run() 方法繞過線程池直接執(zhí)行,使用調(diào)用線程執(zhí)行任務(wù)。
 new ThreadPoolExecutor.AbortPolicy(); // 默認(rèn),隊(duì)列滿了丟棄任務(wù)并拋出異常new ThreadPoolExecutor.CallerRunsPolicy(); // 如果添加線城池失敗,使用調(diào)用線程執(zhí)行任務(wù)new ThreadPoolExecutor.DiscardOldestPolicy(); // 將最早進(jìn)入隊(duì)列的任務(wù)刪除,之后嘗試加入隊(duì)列new ThreadPoolExecutor.DiscardPolicy(); // 隊(duì)列滿了,丟棄任務(wù)不拋出異常

???? 根據(jù)之前實(shí)現(xiàn)的線程工廠和拒絕策略,線程池的相關(guān)代碼實(shí)現(xiàn)如下:

package com.example.demo.test1;import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;/*** @Author: Ron* @Create: 2023-04-24 15:59*/
public class UserThreadPool {public static void main(String[] args) {// 緩存隊(duì)列設(shè)置固定長度為2,為了快速觸發(fā) rejectHandlerBlockingQueue queue = new LinkedBlockingQueue(2);// 假設(shè)外部任務(wù)線程的來源由機(jī)房1和機(jī)房2的混合調(diào)用UserThreadFactory f1 = new UserThreadFactory("第1機(jī)房");UserThreadFactory f2 = new UserThreadFactory("第2機(jī)房");// 自定義拒絕策略UserRejectHandler handler = new UserRejectHandler();// 核心線程為1,最大線程為2,為了保證觸發(fā)rejectHandlerThreadPoolExecutor threadPoolFirst= new ThreadPoolExecutor(1, 2, 60,TimeUnit.SECONDS, queue, f1, handler);// 利用第二個(gè)線程工廠實(shí)力創(chuàng)建第二個(gè)線程池ThreadPoolExecutor threadPoolSecond= new ThreadPoolExecutor(1, 2, 60,TimeUnit.SECONDS, queue, f2, handler);// 創(chuàng)建400個(gè)任務(wù)線程Runnable task = new Task();for (int i = 0; i < 200; i++) {threadPoolFirst.execute(task);threadPoolSecond.execute(task);threadPoolSecond.submit(task);}}}

執(zhí)行結(jié)果如下:
在這里插入圖片描述

當(dāng)任務(wù)被拒絕的時(shí)候,拒絕策略會(huì)打印出當(dāng)前線程池的大小已經(jīng)達(dá)到了maximumPoolSize=2 ,且隊(duì)列已滿,完成的任務(wù)數(shù)提示已經(jīng)有 4 個(gè)(最后一行)。

三、線程池源碼詳解

???? 在 ThreadPoolExecutor 的屬性定義中頻繁地用位移運(yùn)算來表示線程池狀態(tài), 位移運(yùn)算是改變當(dāng)前值的一種高效手段 , 包括左移與右移。下面從屬性定義開始閱讀ThreadPoolExecutor 的源碼。

	// Integer共有32位,最右邊29位表示工作線程數(shù),最左邊3位表示線程池狀態(tài)// 注:簡單地說,3個(gè)二進(jìn)制可以表示從0到7共8個(gè)不同的數(shù)值(第1處)private static final int COUNT_BITS = Integer.SIZE - 3;// 000-11111...1111(29個(gè)1),類似于子網(wǎng)掩碼,用于位的與運(yùn)算// 得到左邊3位,還是右邊29位private static final int CAPACITY   = (1 << COUNT_BITS) - 1;// runState is stored in the high-order bits// 用左邊3位,實(shí)現(xiàn)5種線程池狀態(tài)。(在左3位之后加入中畫線有助于理解)// 111-00000...0000(29個(gè)0),十進(jìn)制值,-536,870,912// 此狀態(tài)表示線程池能接受新任務(wù)private static final int RUNNING    = -1 << COUNT_BITS;// 000-00000...0000(29個(gè)0),十進(jìn)制值:0// 此狀態(tài)不再接受新任務(wù),但可以繼續(xù)執(zhí)行隊(duì)列中的任務(wù)private static final int SHUTDOWN   =  0 << COUNT_BITS;// 001-00000...0000(29個(gè)0),十進(jìn)制值:536,870,912// 此狀態(tài)全面拒絕,并中斷正在處理的任務(wù)private static final int STOP       =  1 << COUNT_BITS;// 010-00000...0000(29個(gè)0),十進(jìn)制值:1,073,741,824// 此狀態(tài)表示所有任務(wù)已經(jīng)被終止private static final int TIDYING    =  2 << COUNT_BITS;// 011-00000...0000(29個(gè)0),十進(jìn)制值:1,610,612,736// 此狀態(tài)表示已清理完現(xiàn)場private static final int TERMINATED =  3 << COUNT_BITS;// Packing and unpacking ctl// 與運(yùn)算,比如 001-00000...0000(29個(gè)0),表示67個(gè)工作線程// 掩碼取反:   111-00000...0000(29個(gè)0),即得到左邊3位001// 表示線程池當(dāng)前出于STOP狀態(tài)private static int runStateOf(int c)     { return c & ~CAPACITY; }// 同理掩碼 000-11111...1111(29個(gè)1),得到右邊29位,即工作線程數(shù)private static int workerCountOf(int c)  { return c & CAPACITY; }// 把左邊3位與右邊29位按或運(yùn)算,合并成一個(gè)值private static int ctlOf(int rs, int wc) { return rs | wc; }

第 l :處說明 , 線程池的狀態(tài)用高 3 位表示,其中包括了符號位。五種狀態(tài)的
十進(jìn)制值按從小到大依次排序?yàn)?RUNNING < SHUTDOWN < STOP < TIDYING <
TERMINATED , 這樣設(shè)計(jì)的好處是可以通過比較值的大小來確定線程池的狀態(tài)。例
如程序中經(jīng)常會(huì)出現(xiàn) isRunning 的判斷

private static boolean isRunning(int c) {return c < SHUTDOWN;
}

我們都知道 Executor 接口有且只有一個(gè)方法 execute , 通過參數(shù)傳入待執(zhí)行線程的對象。下面分析 ThreadPoolExecutor 關(guān)于 execute 方法的實(shí)現(xiàn),

	/*** Executes the given task sometime in the future.  The task* may execute in a new thread or in an existing pooled thread.** If the task cannot be submitted for execution, either because this* executor has been shutdown or because its capacity has been reached,* the task is handled by the current {@code RejectedExecutionHandler}.** @param command the task to execute* @throws RejectedExecutionException at discretion of*         {@code RejectedExecutionHandler}, if the task*         cannot be accepted for execution* @throws NullPointerException if {@code command} is null*/public void execute(Runnable command) {if (command == null)throw new NullPointerException();/** Proceed in 3 steps:** 1. If fewer than corePoolSize threads are running, try to* start a new thread with the given command as its first* task.  The call to addWorker atomically checks runState and* workerCount, and so prevents false alarms that would add* threads when it shouldn't, by returning false.** 2. If a task can be successfully queued, then we still need* to double-check whether we should have added a thread* (because existing ones died since last checking) or that* the pool shut down since entry into this method. So we* recheck state and if necessary roll back the enqueuing if* stopped, or start a new thread if there are none.** 3. If we cannot queue task, then we try to add a new* thread.  If it fails, we know we are shut down or saturated* and so reject the task.*/// 返回包含線程數(shù)及線程池狀態(tài)的Integer類型數(shù)值int c = ctl.get();// 如果工作線程數(shù)小于核心線程數(shù),則創(chuàng)建線程任務(wù)并執(zhí)行if (workerCountOf(c) < corePoolSize) {// addWorker是另一個(gè)極為重要的方法,見下一段源碼解析(第1處)if (addWorker(command, true))return;// 如果創(chuàng)建失敗,防止外部已經(jīng)在線程池中加入新任務(wù),重新獲取一下c = ctl.get();}// 只有線程池處于RUNNING狀態(tài),才執(zhí)行后半句:置入隊(duì)列if (isRunning(c) && workQueue.offer(command)) {int recheck = ctl.get();// 如果線程池不是RUNNING狀態(tài),則將剛加入隊(duì)列的任務(wù)移除if (! isRunning(recheck) && remove(command))reject(command);// 如果之前的線程已被消費(fèi)完,新建一個(gè)線程else if (workerCountOf(recheck) == 0)addWorker(null, false);}// 核心池和隊(duì)列都已滿,嘗試創(chuàng)建一個(gè)新線程else if (!addWorker(command, false))// 如果addWorker返回是false,即創(chuàng)建失敗,則喚醒拒絕策略(第2處)reject(command);}

第 1 處: execute 方法在不同的階段有三次 addWorker 的嘗試動(dòng)作。
第 2 處: 發(fā)生拒絕的理由有兩個(gè)
( 1 )線程池狀態(tài)為非 RUNNING狀態(tài)
( 2 )等待隊(duì)列己滿。

下面繼續(xù)分析 addWorker 方法的源碼:

    /***  根據(jù)當(dāng)前線程池狀態(tài),檢查是否可以添加新的任務(wù)線程,如果則創(chuàng)建并啟動(dòng)任務(wù)*  如果一切正常則返回true。返回false的可能性如下:*  1.線程池沒有處于RUNNING狀態(tài)*  2.線程工廠創(chuàng)建新的任務(wù)線程失敗* @param firstTask 外部啟動(dòng)線程池是需要構(gòu)造的第一個(gè)線程,他是線程的母體* @param core 新增工作線程時(shí)的判斷指標(biāo),解釋如下*             true 表示新增工作線程時(shí),需要判斷當(dāng)前RUNNING狀態(tài)的線程是否少于corePoolSize*             false 表示新增工作線程時(shí),需要判斷當(dāng)前RUNNING狀態(tài)的線程是否少于maximumPoolSize* @return*/private boolean addWorker(Runnable firstTask, boolean core) {// 不需要任務(wù)預(yù)定義的語法標(biāo)簽,響應(yīng)下文的continue retry,快速退出多層嵌套循環(huán)(第1處)retry:for (;;) {// 參考之前的狀態(tài)分析:如果RUNNING狀態(tài),則條件為假,不執(zhí)行后面的判斷// 如果是STOP及之上的狀態(tài),或者firstTask初始線程不為空,或者工作隊(duì)列不為空,// 都會(huì)直接返回創(chuàng)建失敗(第2處)int c = ctl.get();int rs = runStateOf(c);// Check if queue empty only if necessary.if (rs >= SHUTDOWN && ! (rs == SHUTDOWN && firstTask == null && ! workQueue.isEmpty()))return false;for (;;) {// 如果超過最大允許線程數(shù)則不能再添加新的線程// 最大線程數(shù)不能超過2^29,否則影響左邊3位的線程池狀態(tài)值int wc = workerCountOf(c);if (wc >= CAPACITY ||wc >= (core ? corePoolSize : maximumPoolSize))return false;// 將當(dāng)前活動(dòng)線程數(shù)+1 (第3處)if (compareAndIncrementWorkerCount(c))break retry;// 線程池狀態(tài)和工作線程數(shù)是可變化的,需要經(jīng)常提取這個(gè)最新值c = ctl.get();  // Re-read ctl// 如果已經(jīng)關(guān)閉,則再次從retry標(biāo)簽處進(jìn)入,在第2處再做判斷 (第4處)if (runStateOf(c) != rs)continue retry;// else CAS failed due to workerCount change; retry inner loop// 如果線程還是處于RUNNING狀態(tài),那就在說明僅僅是第3處失敗// 繼續(xù)循環(huán)執(zhí)行 (第5處)}}// 開始創(chuàng)建工作線程boolean workerStarted = false;boolean workerAdded = false;ThreadPoolExecutor.Worker w = null;try {// 利用Worker構(gòu)造方法中的線程池工廠創(chuàng)建線程,并封裝成工作線程Worker對象w = new ThreadPoolExecutor.Worker(firstTask);// 注意這是Worker中的屬性對象thread (第6處)final Thread t = w.thread;if (t != null) {// 在進(jìn)行ThreadPoolExecutor的敏感操作時(shí)// 都需要持有主鎖,避免在添加和啟動(dòng)線程時(shí)被干擾final ReentrantLock mainLock = this.mainLock;mainLock.lock();try {// Recheck while holding lock.// Back out on ThreadFactory failure or if// shut down before lock acquired.int rs = runStateOf(ctl.get());// 當(dāng)線程池狀態(tài)為RUNNING或SHUTDOWN// 且firstTask初始線程為空時(shí)if (rs < SHUTDOWN ||(rs == SHUTDOWN && firstTask == null)) {if (t.isAlive()) // precheck that t is startablethrow new IllegalThreadStateException();workers.add(w);int s = workers.size();// 整個(gè)線程池在運(yùn)行期間的最大并發(fā)任務(wù)個(gè)數(shù)if (s > largestPoolSize)largestPoolSize = s;workerAdded = true;}} finally {mainLock.unlock();}if (workerAdded) {// 終于看到親切的start方法// 注意,并非線程池的execute的command參數(shù)指向的線程t.start();workerStarted = true;}}} finally {if (! workerStarted)// 線程啟動(dòng)失敗,把剛才第3處加上的工作線程計(jì)數(shù)再減回去addWorkerFailed(w);}return workerStarted;}

這段代碼晦澀難懂,部分地方甚至違反了代碼規(guī)約,但其中蘊(yùn)含的豐富的編碼知識(shí)點(diǎn)值得我們?nèi)W(xué)習(xí),下面接序號來依次講解。
第 1 處, 配合循環(huán)語旬出現(xiàn)的 label ,類似于 goto 作用。 label 定義時(shí),必須把標(biāo)簽和冒號的組合語旬緊緊相鄰定義在循環(huán)體之前,否則會(huì)編譯出錯(cuò)。目的是在實(shí)現(xiàn)多重循環(huán)時(shí)能夠快速退出到任何一層。這種做法的出發(fā)點(diǎn)似乎非常貼心,但是在大型軟件項(xiàng)目中 ,濫用標(biāo)簽行跳轉(zhuǎn)的后果將是災(zāi)難性的。示例代碼中,在 retry 下方有兩個(gè)無限循環(huán),在 workerCount 加 1 成功后,直接退出兩層循環(huán)。
第 2 處,這樣的表達(dá)式不利于代碼閱讀,應(yīng)該改成:

Boolean isNotAllowedToCreateTask= runStateLeast(c, SHUTDOWN) && (runStateAtLeast(c, STOP)|| firstTask != null || workQueue.isEmpty());
if (isNotAllowedToCreateTask) {// ...
}

第 3 處, 與第 1 處的標(biāo)簽呼應(yīng) , Atomiclnteger 對象的加1操作是原子性的。break retry 表示直接跳出與 retry 相鄰的這個(gè)循環(huán)體。
第 4 處,此 continue 跳轉(zhuǎn)至標(biāo)簽處,繼續(xù)執(zhí)行循環(huán)。如果條件為假,則說明線程池還處于運(yùn)行狀態(tài),即繼續(xù)在 for( ;;)循環(huán)內(nèi)執(zhí)行。
第 5 處, compareAndlncrementWorkerCount 方法執(zhí)行失敗的概率非常低。即使失敗 , 再次執(zhí)行時(shí)成功的概率也是極高的,類似于自旋鎖原理。這里的處理邏輯是先加 1,創(chuàng)建失敗再減1,這是輕量處理并發(fā)創(chuàng)建線程的方式。如果先創(chuàng)建線程,成功再加1,當(dāng)發(fā)現(xiàn)超出限制后再銷毀線程,那么這樣的處理方式明顯比前者代價(jià)要大。
第 6 處 , Worker 對象是工作線程的核心類實(shí)現(xiàn),部分源碼如下:

/*** 它實(shí)現(xiàn)Runnable接口,并把本對象作為參數(shù)輸入給run()方法中的runWorker(this),* 所以內(nèi)部屬性線程thread在start的時(shí)候,即會(huì)調(diào)用runWorker方法*/private final class Worker extends AbstractQueuedSynchronizer implements Runnable {/*** Creates with given first task and thread from ThreadFactory.* @param firstTask the first task (null if none)*/Worker(Runnable firstTask) {// 它是AbstractQueuedSynchronizer的方法// 在runWorker方法執(zhí)行之前禁止線程被中斷setState(-1); // inhibit interrupts until runWorkerthis.firstTask = firstTask;this.thread = getThreadFactory().newThread(this);}// 當(dāng)thread被start()之后,執(zhí)行runWorker的方法/** Delegates main run loop to outer runWorker  */public void run() {runWorker(this);}}

總結(jié)

???? 線程池的相關(guān)源碼比較精煉,還包括線程池的銷毀、任務(wù)提取和消費(fèi)等,與線程狀態(tài)圖一樣,線程池也有自己獨(dú)立的狀態(tài)轉(zhuǎn)化流程,本節(jié)不再展開??偨Y(jié)一下,使用線程池要注意如下幾點(diǎn):
???? ( 1 )合理設(shè)置各類參數(shù),應(yīng)根據(jù)實(shí)際業(yè)務(wù)場景來設(shè)置合理的工作線程數(shù)。
???? ( 2 )線程資源必須通過線城池提供,不允許在應(yīng)用中自行顯式創(chuàng)建線程。
???? ( 3 )創(chuàng)建線程或線城池時(shí)請指定有意義的線程名稱,方便出錯(cuò)時(shí)回溯。

???? 線程池不允許使用 Executors ,而是通過 ThreadPoolExecutor 的方式創(chuàng)建 ,這樣的處理方式能更加明確線程池的運(yùn)行規(guī)則,規(guī)避資源耗盡的風(fēng)險(xiǎn)。

http://www.risenshineclean.com/news/35564.html

相關(guān)文章:

  • 深度網(wǎng)網(wǎng)站建設(shè)方案寧波網(wǎng)站seo哪家好
  • 中小企業(yè)的網(wǎng)站建設(shè)論文今天國際新聞大事
  • 荔灣網(wǎng)站建設(shè)公司網(wǎng)頁制作軟件下載
  • 還有哪些網(wǎng)站做產(chǎn)品眾籌淘寶美工培訓(xùn)推薦
  • 開發(fā)建設(shè)網(wǎng)站的實(shí)施過程是一個(gè)全球最大的中文搜索引擎
  • 工業(yè)設(shè)計(jì)研究生院校排名seo系統(tǒng)源碼出售
  • 做品牌網(wǎng)站的企業(yè)徐州關(guān)鍵詞優(yōu)化排名
  • 公益事業(yè)做網(wǎng)站網(wǎng)站流量查詢站長之家
  • 網(wǎng)站建設(shè)幾種語言對比免費(fèi)加精準(zhǔn)客源
  • 一起做網(wǎng)站鄭州電商網(wǎng)站入口
  • 如何對網(wǎng)站的圖片做cdn萬網(wǎng)創(chuàng)始人
  • 做我網(wǎng)站最近發(fā)生的熱點(diǎn)新聞事件
  • 什么是單頁網(wǎng)站西安網(wǎng)絡(luò)推廣外包公司
  • 網(wǎng)站服務(wù)合同用交印花稅嗎點(diǎn)擊器 百度網(wǎng)盤
  • 濟(jì)寧網(wǎng)站建設(shè)優(yōu)化百度網(wǎng)盤登錄入口 網(wǎng)頁
  • 把html文件生成網(wǎng)址平原縣網(wǎng)站seo優(yōu)化排名
  • 網(wǎng)頁設(shè)計(jì)實(shí)訓(xùn)總結(jié)2500字seo網(wǎng)站推廣案例
  • 諸城營銷型網(wǎng)站建設(shè)seo自動(dòng)優(yōu)化工具
  • 中國空間站科幻作文1000字中國突然宣布一重磅消息
  • 網(wǎng)頁設(shè)計(jì)與網(wǎng)站建設(shè)作業(yè)怎么做周口seo公司
  • 東莞專業(yè)做網(wǎng)站建設(shè)服務(wù)怎么做百度推廣運(yùn)營
  • 46設(shè)計(jì)網(wǎng)站官網(wǎng)快速開發(fā)網(wǎng)站的應(yīng)用程序
  • 手機(jī)怎么在百度做網(wǎng)站百度seo軟件優(yōu)化
  • 獨(dú)立ip做網(wǎng)站廣告聯(lián)盟app推廣
  • 做房產(chǎn)應(yīng)看的網(wǎng)站下載微信
  • lamp 搭建wordpressseo外鏈工具有用嗎
  • 怎么推廣網(wǎng)站建設(shè)業(yè)務(wù)百度醫(yī)生
  • 做網(wǎng)站都有什么項(xiàng)目網(wǎng)絡(luò)營銷推廣要求
  • 建設(shè)公司營銷網(wǎng)站優(yōu)化落實(shí)新十條措施
  • 屬于網(wǎng)站的管理 更新 維護(hù)微信營銷軟件手機(jī)版