專(zhuān)業(yè)網(wǎng)站建設(shè)首選公司沈陽(yáng)seo網(wǎng)站關(guān)鍵詞優(yōu)化
目錄
1.線程池概
2 為什么要使用線程池
1創(chuàng)建線程問(wèn)題
2解決上面兩個(gè)問(wèn)題思路:
3線程池的好處
4線程池適合應(yīng)用場(chǎng)景
3 線程池的構(gòu)造函數(shù)參數(shù)
1.corePoolSize int 線程池核心線程大小
2.maximumPoolSize int 線程池最大線程數(shù)量
3.keepAliveTime long 空閑線程存活時(shí)間
4.unit 空閑線程存活時(shí)間單位
5.workQueue? BlockingQueue 工作隊(duì)列
①ArrayBlockingQueue
②LinkedBlockingQuene
③SynchronousQuene
④PriorityBlockingQueue
6.threadFactory ThreadFactory? 線程工廠? ? ? ?
7.handler RejectedExecutionHandler 拒絕策略
①CallerRunsPolicy
?8.添加線程例子
例如飯店的桌子的故事:
線程池的而實(shí)際例子:
4增減線程的特點(diǎn)
5線程池手動(dòng)創(chuàng)建還是自動(dòng)創(chuàng)建
1自動(dòng)創(chuàng)建好處
2自動(dòng)創(chuàng)建可能帶來(lái)哪些問(wèn)題
1.newFixedThreadPool
2.newSingleThreadExecutor
?3.newCachedThreadPool
3正確創(chuàng)建線程池的方法
4線程池里的線程數(shù)量設(shè)定多少比較適合
6停止線程的正確方法
1.shutdown
?2isShutdown
3.isTerminated
4.awaitTermination
5.shutdownNow
7.線程池拒絕策略
1.拒絕時(shí)機(jī)
2.四種拒絕策略
1.概述
8.鉤子方法
1.每個(gè)任務(wù)執(zhí)行之前之后暫?;謴?fù)模擬日志、統(tǒng)計(jì)
9.線程池原理
1.線程池組成部分
2.線程管理器
3.工作線程
4.任務(wù)列隊(duì)
5.任務(wù)接口
1.線程池概
軟件中的池,就是提前創(chuàng)建好了東西放在池子里,你直接去池子里拿去用就行了,有現(xiàn)成的可用的,節(jié)省了你臨時(shí)創(chuàng)建的時(shí)間。
我們的資源是有限的,比如就10個(gè)線程,創(chuàng)造一個(gè)10個(gè)線程的線程池。
線程池中創(chuàng)建的線程,可以重復(fù)使用,可以控制資源總量。如果不使用線程池,每個(gè)任務(wù)都開(kāi)一個(gè)新線程處理
1.1個(gè)線程
2.for循環(huán)創(chuàng)建線程
3.當(dāng)任務(wù)量生成1000個(gè)時(shí)
public class EveryTaskOneThread {public static void main(String[] args) {for (int i = 0; i < 1000; i++) {Thread thread = new Thread(new Task());thread.start();}}static class Task implements Runnable{@Overridepublic void run() {System.out.println("執(zhí)行了任務(wù)!");}}
}
這樣開(kāi)銷(xiāo)太大,我們希望有固定數(shù)量的線程,
來(lái)執(zhí)行這1000個(gè)線程,
這樣就避免了反復(fù)創(chuàng)建并銷(xiāo)毀線程所帶來(lái)的開(kāi)銷(xiāo)問(wèn)題。
2 為什么要使用線程池
1創(chuàng)建線程問(wèn)題
- 反復(fù)創(chuàng)建線程開(kāi)銷(xiāo)大
- 過(guò)多的線程會(huì)占用大多內(nèi)存
2解決上面兩個(gè)問(wèn)題思路:
- 用少量線程-避免內(nèi)存占用過(guò)多
- 讓這部分線程都保持工作,且可以反復(fù)執(zhí)行任務(wù)-避免生命周期的損耗
3線程池的好處
- 加快響應(yīng)速度
- 合理利用CPU和內(nèi)存
- 統(tǒng)一歸管理資源
4線程池適合應(yīng)用場(chǎng)景
- 服務(wù)器接收大量請(qǐng)求時(shí),使用線程池 是非常合適的,他可以大大減少線程池的創(chuàng)建和銷(xiāo)毀次數(shù),提高服務(wù)器的工作效率
- 實(shí)際開(kāi)發(fā)中,如果需要?jiǎng)?chuàng)建5個(gè)以上的線程,那么就可以使用線程池來(lái)管理
3 線程池的構(gòu)造函數(shù)參數(shù)
1.corePoolSize int 線程池核心線程大小
線程池中會(huì)維護(hù)一個(gè)最小的線程數(shù)量,即使這些線程處理空閑狀態(tài),他們也不會(huì)被銷(xiāo)毀,除非設(shè)置了allowCoreThreadTimeOut。這里的最小線程數(shù)量即是corePoolSize。任務(wù)提交到線程池后,首先會(huì)檢查當(dāng)前線程數(shù)是否達(dá)到了corePoolSize,如果沒(méi)有達(dá)到的話,則會(huì)創(chuàng)建一個(gè)新線程來(lái)處理這個(gè)任務(wù)。
是否需要添加線程規(guī)則
2.maximumPoolSize int 線程池最大線程數(shù)量
當(dāng)前線程數(shù)達(dá)到corePoolSize后,如果繼續(xù)有任務(wù)被提交到線程池,會(huì)將任務(wù)緩存到工作隊(duì)列(后面會(huì)介紹)中。如果隊(duì)列也已滿(mǎn),則會(huì)去創(chuàng)建一個(gè)新線程來(lái)出來(lái)這個(gè)處理。線程池不會(huì)無(wú)限制的去創(chuàng)建新線程,它會(huì)有一個(gè)最大線程數(shù)量的限制,這個(gè)數(shù)量即由maximunPoolSize指定。
3.keepAliveTime long 空閑線程存活時(shí)間
一個(gè)線程如果處于空閑狀態(tài),并且當(dāng)前的線程數(shù)量大于corePoolSize,那么在指定時(shí)間后,這個(gè)空閑線程會(huì)被銷(xiāo)毀,這里的指定時(shí)間由keepAliveTime來(lái)設(shè)定
4.unit 空閑線程存活時(shí)間單位
keepAliveTime的計(jì)量單位?
5.workQueue? BlockingQueue 工作隊(duì)列
新任務(wù)被提交后,會(huì)先進(jìn)入到此工作隊(duì)列中,任務(wù)調(diào)度時(shí)再?gòu)年?duì)列中取出任務(wù)。jdk中提供了四種工作隊(duì)列:?
①ArrayBlockingQueue
基于數(shù)組的有界阻塞隊(duì)列,按FIFO排序。新任務(wù)進(jìn)來(lái)后,會(huì)放到該隊(duì)列的隊(duì)尾,有界的數(shù)組可以防止資源耗盡問(wèn)題。當(dāng)線程池中線程數(shù)量達(dá)到corePoolSize后,再有新任務(wù)進(jìn)來(lái),則會(huì)將任務(wù)放入該隊(duì)列的隊(duì)尾,等待被調(diào)度。如果隊(duì)列已經(jīng)是滿(mǎn)的,則創(chuàng)建一個(gè)新線程,如果線程數(shù)量已經(jīng)達(dá)到maxPoolSize,則會(huì)執(zhí)行拒絕策略。
②LinkedBlockingQuene
基于鏈表的無(wú)界阻塞隊(duì)列(其實(shí)最大容量為Interger.MAX),按照FIFO排序。由于該隊(duì)列的近似無(wú)界性,當(dāng)線程池中線程數(shù)量達(dá)到corePoolSize后,再有新任務(wù)進(jìn)來(lái),會(huì)一直存入該隊(duì)列,而基本不會(huì)去創(chuàng)建新線程直到maxPoolSize(很難達(dá)到Interger.MAX這個(gè)數(shù)),因此使用該工作隊(duì)列時(shí),參數(shù)maxPoolSize其實(shí)是不起作用的。
③SynchronousQuene
一個(gè)不緩存任務(wù)的阻塞隊(duì)列,生產(chǎn)者放入一個(gè)任務(wù)必須等到消費(fèi)者取出這個(gè)任務(wù)。也就是說(shuō)新任務(wù)進(jìn)來(lái)時(shí),不會(huì)緩存,而是直接被調(diào)度執(zhí)行該任務(wù),如果沒(méi)有可用線程,則創(chuàng)建新線程,如果線程數(shù)量達(dá)到maxPoolSize,則執(zhí)行拒絕策略。
④PriorityBlockingQueue
具有優(yōu)先級(jí)的無(wú)界阻塞隊(duì)列,優(yōu)先級(jí)通過(guò)參數(shù)Comparator實(shí)現(xiàn)。
6.threadFactory ThreadFactory? 線程工廠? ? ? ?
創(chuàng)建一個(gè)新線程時(shí)使用的工廠,可以用來(lái)設(shè)定線程名、是否為daemon線程等等
新的線程是由ThreadFactory創(chuàng)建的,默認(rèn)使用Executors.defaultThreadFactory0,創(chuàng)建出來(lái)的線程都在同個(gè)線程組,擁有同樣的NORM PRIORITY優(yōu)先級(jí)并且都不是守護(hù)線程。如果自己指定ThreadFactory,那么就可以改變線程名、線程組、優(yōu)先級(jí)、是否是守護(hù)線程等
通常使用默認(rèn)的即可
7.handler RejectedExecutionHandler 拒絕策略
當(dāng)工作隊(duì)列中的任務(wù)已到達(dá)最大限制,并且線程池中的線程數(shù)量也達(dá)到最大限制,這時(shí)如果有新任務(wù)提交進(jìn)來(lái),該如何處理呢。這里的拒絕策略,就是解決這個(gè)問(wèn)題的,jdk中提供了4中拒絕策略:?
①CallerRunsPolicy
該策略下,在調(diào)用者線程中直接執(zhí)行被拒絕任務(wù)的run方法,除非線程池已經(jīng)shutdown,則直接拋棄任務(wù)。
②AbortPolicy
該策略下,直接丟棄任務(wù),并拋出RejectedExecutionException異常。
③DiscardPolicy
該策略下,直接丟棄任務(wù),什么都不做。
④DiscardOldestPolicy
該策略下,拋棄進(jìn)入隊(duì)列最早的那個(gè)任務(wù),然后嘗試把這次拒絕的任務(wù)放入隊(duì)列
?8.添加線程例子
例如飯店的桌子的故事:
飯店屋子里面10個(gè)桌子,這10個(gè)桌子是一直存在的。是corePoolSize。
生意火爆,里面桌子坐滿(mǎn)了,需要使用臨時(shí)桌子,放到飯店門(mén)口。是maxPoolSize。在收攤的時(shí)候外面的椅子會(huì)收回來(lái)的。里面的桌子不會(huì)處理,一直會(huì)等待。
線程池的而實(shí)際例子:
線程池:核心池大小5,最大池大小為10,隊(duì)列為100
線程池中的請(qǐng)求最多創(chuàng)建5個(gè),然后任務(wù)被添加到隊(duì)列中,直到達(dá)到100.當(dāng)隊(duì)列已滿(mǎn)時(shí),將創(chuàng)建最新的線程。maxPoolSize。最多10個(gè)線程,如果再來(lái)任務(wù),就拒絕。
4增減線程的特點(diǎn)
- 通過(guò)設(shè)置corePoolSize和maximunPoolSize相同,就可以創(chuàng)建固定大小的線程池
- 線程池希望保持更小的線程數(shù)量,并且只有在負(fù)載變得很大的時(shí)候才去增加他
- 通過(guò)設(shè)置maximunPoolSize為很高的值,可以允許線程池容納任意數(shù)量的并發(fā)任務(wù)
- 只有隊(duì)列滿(mǎn)的時(shí)才會(huì)創(chuàng)建多余corePoolSize的線程,如果使用無(wú)界隊(duì)列(LinkedBlockingQueue),那么線程數(shù)就不會(huì)超過(guò)corePoolSize
5線程池手動(dòng)創(chuàng)建還是自動(dòng)創(chuàng)建
1自動(dòng)創(chuàng)建好處
- 手動(dòng)創(chuàng)建更好,因?yàn)檫@樣可以讓我們更加明確線程池創(chuàng)建的規(guī)則,避免資源耗盡的風(fēng)險(xiǎn)。
2自動(dòng)創(chuàng)建可能帶來(lái)哪些問(wèn)題
1.newFixedThreadPool
傳進(jìn)去的LinkedBlockingQueue是沒(méi)有容量限制的,所以當(dāng)請(qǐng)求數(shù)量越來(lái)越多。并且無(wú)法及時(shí)處理完的時(shí)候,也就會(huì)請(qǐng)求堆積,會(huì)很容易造成占用大量?jī)?nèi)存??赡軙?huì)導(dǎo)致oom
public static ExecutorService newFixedThreadPool(int nThreads) {return new ThreadPoolExecutor(nThreads, nThreads,0L, TimeUnit.MILLISECONDS,new LinkedBlockingQueue<Runnable>()); }
package cn.butool.threadpool;import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;public class FixedThreadPoolTest {public static void main(String[] args) {ExecutorService executorService = Executors.newFixedThreadPool(4);for (int i = 0; i < 100; i++) {executorService.execute(new Task());}}static class Task implements Runnable{@Overridepublic void run() {try {Thread.sleep(500);} catch (InterruptedException e) {e.printStackTrace();}System.out.println(Thread.currentThread());}}
}
錯(cuò)誤案例
修改idea內(nèi)存配置?-Xmx1m -Xms1m
?沒(méi)有點(diǎn)擊Modify options,然后add即可
應(yīng)用之后啟動(dòng)下面代碼
package cn.butool.threadpool;import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;/*** 演示newFixedThreadPool出錯(cuò)的情況*/
public class FixedThreadPoolOOMTest {private static ExecutorService executorService = Executors.newFixedThreadPool(1);public static void main(String[] args) {for (int i = 0; i < Integer.MAX_VALUE; i++) {executorService.execute(new Task());}}static class Task implements Runnable{@Overridepublic void run() {System.out.println(Thread.currentThread());try {Thread.sleep(500000000);} catch (InterruptedException e) {e.printStackTrace();}}}
}
?控制臺(tái)打印如下
2.newSingleThreadExecutor
package cn.butool.threadpool;import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;public class SingleThreadExcutor {public static void main(String[] args) {// 單獨(dú)的線程不需要傳參數(shù),線程數(shù)默認(rèn)1/** public static ExecutorService newSingleThreadExecutor() {return new FinalizableDelegatedExecutorService(new ThreadPoolExecutor(1, 1,0L, TimeUnit.MILLISECONDS,new LinkedBlockingQueue<Runnable>()));}* */ExecutorService executorService = Executors.newSingleThreadExecutor();for (int i = 0; i < 100; i++) {executorService.execute(new Task());}}static class Task implements Runnable{@Overridepublic void run() {System.out.println(Thread.currentThread());}}
}
?3.newCachedThreadPool
package cn.butool.threadpool;import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;public class CachedThreadExcutor {public static void main(String[] args) {// 無(wú)界線程池,具有自動(dòng)回收對(duì)于線程的功能ExecutorService executorService = Executors.newCachedThreadPool();for (int i = 0; i < 100; i++) {executorService.execute(new Task());}}static class Task implements Runnable{@Overridepublic void run() {System.out.println(Thread.currentThread());}}
}
3正確創(chuàng)建線程池的方法
- 根據(jù)不同的業(yè)務(wù)場(chǎng)景,自己設(shè)置線程池參數(shù),比如我們的內(nèi)存有多大,我們想給線程取什么名字等等。
4線程池里的線程數(shù)量設(shè)定多少比較適合
創(chuàng)建多少線程合適,要看多線程具體的應(yīng)用場(chǎng)景。我們的程序一般都是 CPU 計(jì)算和 I/O 操作交叉執(zhí)行的,由于 I/O 設(shè)備的速度相對(duì)于 CPU 來(lái)說(shuō)都很慢,所以大部分情況下,I/O 操作執(zhí)行的時(shí)間相對(duì)于 CPU 計(jì)算來(lái)說(shuō)都非常長(zhǎng),這種場(chǎng)景我們一般都稱(chēng)為 I/O 密集型計(jì)算;和 I/O 密集型計(jì)算相對(duì)的就是 CPU 密集型計(jì)算了,CPU 密集型計(jì)算大部分場(chǎng)景下都是純 CPU 計(jì)算。I/O 密集型程序和 CPU 密集型程序,計(jì)算最佳線程數(shù)的方法是不同的。
下面我們對(duì)這兩個(gè)場(chǎng)景分別說(shuō)明。
對(duì)于 CPU 密集型計(jì)算,多線程本質(zhì)上是提升多核 CPU 的利用率,所以對(duì)于一個(gè) 4 核的 CPU,每個(gè)核一個(gè)線程,理論上創(chuàng)建 4 個(gè)線程就可以了,再多創(chuàng)建線程也只是增加線程切換的成本。所以,對(duì)于 CPU 密集型的計(jì)算場(chǎng)景,理論上“線程的數(shù)量 =CPU 核數(shù)”就是最合適的。不過(guò)在工程上,線程的數(shù)量一般會(huì)設(shè)置為“CPU 核數(shù) +1”,這樣的話,當(dāng)線程因?yàn)榕紶柕膬?nèi)存頁(yè)失效或其他原因?qū)е伦枞麜r(shí),這個(gè)額外的線程可以頂上,從而保證 CPU 的利用率。
對(duì)于 I/O 密集型的計(jì)算場(chǎng)景,比如前面我們的例子中,如果 CPU 計(jì)算和 I/O 操作的耗時(shí)是 1:1,那么 2 個(gè)線程是最合適的。如果 CPU 計(jì)算和 I/O 操作的耗時(shí)是 1:2,那多少個(gè)線程合適呢?是 3 個(gè)線程,如下圖所示:CPU 在 A、B、C 三個(gè)線程之間切換,對(duì)于線程 A,當(dāng) CPU 從 B、C 切換回來(lái)時(shí),線程 A 正好執(zhí)行完 I/O 操作。這樣 CPU 和 I/O 設(shè)備的利用率都達(dá)到了 100%。
CPU密集型(加密、計(jì)算hash等): 最佳線程數(shù)為CPU核心數(shù)的1-2倍左右。
耗時(shí)IO型(讀寫(xiě)數(shù)據(jù)庫(kù)、文件、網(wǎng)絡(luò)讀寫(xiě)等):最佳線程數(shù)般會(huì)大于cpu核心數(shù)很多倍,以JVM線程監(jiān)控顯示繁忙情況頭依據(jù),保證線程空閑可以銜接上
參考Brain Goetz推薦最終終計(jì)算方法: 線程數(shù)=CPU核心數(shù)*( 1+平均等待時(shí)間/平均工作時(shí)間)
6停止線程的正確方法
1.shutdown
對(duì)于線程池而言,拒絕后面新的的任務(wù),存量任務(wù)執(zhí)行完會(huì)關(guān)閉
package cn.butool.threadpool;import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;/*** 演示關(guān)閉線程*/
public class ShuntDown {public static void main(String[] args) throws InterruptedException {// 創(chuàng)建10個(gè)固定的線程池ExecutorService executorService = Executors.newFixedThreadPool(10);for (int i = 0; i < 10; i++) {executorService.execute(new ShuntDownTask());}Thread.sleep(500);executorService.shutdown();// 再提交新的任務(wù),提交不進(jìn)去for (int i = 0; i < 1000; i++) {executorService.execute(new ShuntDownTask());}}static class ShuntDownTask implements Runnable{@Overridepublic void run() {try {Thread.sleep(500);System.out.println(Thread.currentThread().getName());} catch (InterruptedException e) {e.printStackTrace();}}}
}
控制臺(tái)打印,可以看到后面的任務(wù)沒(méi)有被執(zhí)行
Exception in thread "main" java.util.concurrent.RejectedExecutionException: Task cn.butool.threadpool.ShuntDown$ShuntDownTask@29453f44 rejected from java.util.concurrent.ThreadPoolExecutor@5cad8086[Shutting down, pool size = 10, active threads = 10, queued tasks = 0, completed tasks = 0]at java.util.concurrent.ThreadPoolExecutor$AbortPolicy.rejectedExecution(ThreadPoolExecutor.java:2063)at java.util.concurrent.ThreadPoolExecutor.reject(ThreadPoolExecutor.java:830)at java.util.concurrent.ThreadPoolExecutor.execute(ThreadPoolExecutor.java:1379)at cn.butool.threadpool.ShuntDown.main(ShuntDown.java:20)
pool-1-thread-9
pool-1-thread-1
pool-1-thread-2
pool-1-thread-5
pool-1-thread-6
pool-1-thread-3
pool-1-thread-4
pool-1-thread-8
pool-1-thread-7
pool-1-thread-10Process finished with exit code 1
?2isShutdown
isShutDown當(dāng)調(diào)用shutdown()方法后返回為true。
package cn.butool.threadpool;import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;/*** 演示關(guān)閉線程*/
public class ShuntDown {public static void main(String[] args) throws InterruptedException {// 創(chuàng)建10個(gè)固定的線程池ExecutorService executorService = Executors.newFixedThreadPool(10);for (int i = 0; i < 10; i++) {executorService.execute(new ShuntDownTask());}Thread.sleep(500);System.out.println(executorService.isShutdown());executorService.shutdown();System.out.println(executorService.isShutdown());}static class ShuntDownTask implements Runnable{@Overridepublic void run() {try {Thread.sleep(500);System.out.println(Thread.currentThread().getName());} catch (InterruptedException e) {e.printStackTrace();}}}
}
pool-1-thread-2
pool-1-thread-5
pool-1-thread-6
pool-1-thread-10
pool-1-thread-9
pool-1-thread-1
pool-1-thread-4
pool-1-thread-7
pool-1-thread-8
false
pool-1-thread-3
true
3.isTerminated
isTerminated當(dāng)調(diào)用shutdown()方法后,并且所有提交的任務(wù)完成后返回為true
package cn.butool.threadpool;import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;/*** 演示關(guān)閉線程*/
public class ShuntDown {public static void main(String[] args) throws InterruptedException {// 創(chuàng)建10個(gè)固定的線程池ExecutorService executorService = Executors.newFixedThreadPool(10);for (int i = 0; i < 10; i++) {executorService.execute(new ShuntDownTask());}Thread.sleep(500);System.out.println(executorService.isShutdown());executorService.shutdown();System.out.println(executorService.isTerminated());}static class ShuntDownTask implements Runnable{@Overridepublic void run() {try {Thread.sleep(500);System.out.println(Thread.currentThread().getName());} catch (InterruptedException e) {e.printStackTrace();}}}
}
打印
pool-1-thread-10
pool-1-thread-3
pool-1-thread-4
pool-1-thread-7
pool-1-thread-8
false
pool-1-thread-9
pool-1-thread-6
false
pool-1-thread-5
pool-1-thread-2
pool-1-thread-1
縮小任務(wù)查看
package cn.butool.threadpool;import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;/*** 演示關(guān)閉線程*/
public class ShuntDown {public static void main(String[] args) throws InterruptedException {// 創(chuàng)建10個(gè)固定的線程池ExecutorService executorService = Executors.newFixedThreadPool(10);for (int i = 0; i < 1; i++) {executorService.execute(new ShuntDownTask());}Thread.sleep(500);System.out.println(executorService.isShutdown());executorService.shutdown();System.out.println(executorService.isTerminated());}static class ShuntDownTask implements Runnable{@Overridepublic void run() {try {Thread.sleep(500);System.out.println(Thread.currentThread().getName());} catch (InterruptedException e) {e.printStackTrace();}}}
}
打印
false
pool-1-thread-1
true
4.awaitTermination
有時(shí)場(chǎng)景需要主線程等各子線程都運(yùn)行完畢后再執(zhí)行。這時(shí)候就需要用到ExecutorService接口中的awaitTermination方法
package cn.butool.threadpool;import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;/*** 演示關(guān)閉線程*/
public class ShuntDown {public static void main(String[] args) throws InterruptedException {// 創(chuàng)建10個(gè)固定的線程池ExecutorService executorService = Executors.newFixedThreadPool(10);for (int i = 0; i < 10; i++) {executorService.execute(new ShuntDownTask());}Thread.sleep(500);executorService.shutdown();//該方法調(diào)用會(huì)被阻塞,并且在以下幾種情況任意一種發(fā)生時(shí)都會(huì)導(dǎo)致該方法的執(zhí)行:// 即shutdown方法被調(diào)用之后,或者參數(shù)中定義的timeout時(shí)間到達(dá)或者當(dāng)前線程被打斷,// 這幾種情況任意一個(gè)發(fā)生了都會(huì)導(dǎo)致該方法在所有任務(wù)完成之后才執(zhí)行。// 第一個(gè)參數(shù)是long類(lèi)型的超時(shí)時(shí)間,第二個(gè)參數(shù)可以為該時(shí)間指定單位。System.out.println(executorService.awaitTermination(3, TimeUnit.SECONDS));}static class ShuntDownTask implements Runnable{@Overridepublic void run() {try {Thread.sleep(500);System.out.println(Thread.currentThread().getName());} catch (InterruptedException e) {e.printStackTrace();}}}
}
控制臺(tái)打印
pool-1-thread-2
pool-1-thread-3
pool-1-thread-7
pool-1-thread-6
pool-1-thread-4
pool-1-thread-8
pool-1-thread-5
pool-1-thread-1
pool-1-thread-9
pool-1-thread-10
trueProcess finished with exit code 0
5.shutdownNow
shutdownNow調(diào)用的是中斷所有的Workers,shutdownNow會(huì)把所有任務(wù)隊(duì)列中的任務(wù)取出來(lái),返回一個(gè)任務(wù)列表。
package cn.butool.threadpool;import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;/*** 演示關(guān)閉線程*/
public class ShuntDown {public static void main(String[] args) throws InterruptedException {// 創(chuàng)建10個(gè)固定的線程池ExecutorService executorService = Executors.newFixedThreadPool(10);for (int i = 0; i < 100; i++) {executorService.execute(new ShuntDownTask());}Thread.sleep(1500);// 立刻關(guān)閉掉線程池,返會(huì)沒(méi)有執(zhí)行的任務(wù)列表List<Runnable> runnables = executorService.shutdownNow();System.out.println("被中斷數(shù)量"+runnables.size());}static class ShuntDownTask implements Runnable{@Overridepublic void run() {try {Thread.sleep(500);System.out.println(Thread.currentThread().getName());} catch (InterruptedException e) {System.out.println("被中斷了"+Thread.currentThread().getName());}}}
}
控制臺(tái)打印
pool-1-thread-3
pool-1-thread-9
pool-1-thread-10
pool-1-thread-6
pool-1-thread-5
pool-1-thread-2
pool-1-thread-1
pool-1-thread-8
pool-1-thread-7
pool-1-thread-4
pool-1-thread-3
pool-1-thread-1
pool-1-thread-4
pool-1-thread-9
pool-1-thread-10
pool-1-thread-6
pool-1-thread-5
pool-1-thread-2
pool-1-thread-8
pool-1-thread-7
被中斷了pool-1-thread-7
被中斷了pool-1-thread-5
被中斷了pool-1-thread-6
被中斷了pool-1-thread-9
被中斷了pool-1-thread-3
被中斷了pool-1-thread-1
被中斷了pool-1-thread-2
被中斷了pool-1-thread-10
被中斷了pool-1-thread-4
被中斷了pool-1-thread-8
被中斷數(shù)量70Process finished with exit code 0
7.線程池拒絕策略
1.拒絕時(shí)機(jī)
- Executors關(guān)閉時(shí)提交新任務(wù)會(huì)拒絕
- Executors最大線程和工作隊(duì)列容量使用優(yōu)先邊界并且已經(jīng)飽和
2.四種拒絕策略
1.概述
??拒絕策略提供頂級(jí)接口 RejectedExecutionHandler ,其中方法 rejectedExecution 即定制具體的拒絕策略的執(zhí)行邏輯。
jdk默認(rèn)提供了四種拒絕策略
2.AbortPolicy?-?拋出異常,中止任務(wù)。拋出拒絕執(zhí)行 RejectedExecutionException 異常信息。線程池默認(rèn)的拒絕策略。必須處理好拋出的異常,否則會(huì)打斷當(dāng)前的執(zhí)行流程,影響后續(xù)的任務(wù)執(zhí)行
3.CallerRunsPolicy?-?使用調(diào)用線程執(zhí)行任務(wù)。當(dāng)觸發(fā)拒絕策略,只要線程池沒(méi)有關(guān)閉的話,則使用調(diào)用線程直接運(yùn)行任務(wù)。一般并發(fā)比較小,性能要求不高,不允許失敗。但是,由于調(diào)用者自己運(yùn)行任務(wù),如果任務(wù)提交速度過(guò)快,可能導(dǎo)致程序阻塞,性能效率上必然的損失較大
4.DiscardPolicy?-?直接丟棄,其他啥都沒(méi)有
5.DiscardOldestPolicy?-?丟棄隊(duì)列最老任務(wù),添加新任務(wù)。當(dāng)觸發(fā)拒絕策略,只要線程池沒(méi)有關(guān)閉的話,丟棄阻塞隊(duì)列 workQueue 中最老的一個(gè)任務(wù),并將新任務(wù)加入
8.鉤子方法
1.每個(gè)任務(wù)執(zhí)行之前之后暫?;謴?fù)模擬日志、統(tǒng)計(jì)
package cn.butool.threadpool;import java.util.concurrent.*;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;/*** 每個(gè)任務(wù)執(zhí)行的前后放鉤子函數(shù)*/
public class SuspendThreadPoolTest extends ThreadPoolExecutor {public SuspendThreadPoolTest(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue) {super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue);}public SuspendThreadPoolTest(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue, ThreadFactory threadFactory) {super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, threadFactory);}public SuspendThreadPoolTest(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue, RejectedExecutionHandler handler) {super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, handler);}public SuspendThreadPoolTest(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue, ThreadFactory threadFactory, RejectedExecutionHandler handler) {super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, threadFactory, handler);}/*** 重寫(xiě)方法* @param t* @param r*/@Overrideprotected void beforeExecute(Thread t, Runnable r) {super.beforeExecute(t, r);lock.lock();try {while (isPaused){unPaused.await();}} catch (InterruptedException e) {e.printStackTrace();} finally {lock.unlock();}}private boolean isPaused;private final ReentrantLock lock= new ReentrantLock();private Condition unPaused=lock.newCondition();private void usePaused(){// 枷鎖lock.lock();try {isPaused=true;} catch (Exception e) {e.printStackTrace();} finally {// 釋放鎖lock.unlock();}}/*** 恢復(fù)函數(shù)*/private void resume(){// 枷鎖lock.lock();try {isPaused=false;unPaused.signalAll();} catch (Exception e) {e.printStackTrace();} finally {// 釋放鎖lock.unlock();}}public static void main(String[] args) throws InterruptedException {SuspendThreadPoolTest suspendThreadPoolTest = new SuspendThreadPoolTest(5,10,20,TimeUnit.SECONDS,new LinkedBlockingDeque<>());Runnable runnable = new Runnable() {@Overridepublic void run() {System.out.println("我被執(zhí)行"+Thread.currentThread().getName());try {Thread.sleep(10);} catch (Exception e) {e.printStackTrace();} finally {}}};for (int i = 0; i < 10000; i++) {suspendThreadPoolTest.execute(runnable);}Thread.sleep(1500);//線程池暫停suspendThreadPoolTest.usePaused();System.out.println("線程池被暫停了");Thread.sleep(1500);suspendThreadPoolTest.resume();System.out.println("線程池被恢復(fù)了");}
}
9.線程池原理
1.線程池組成部分
1.線程管理器
2.工作線程
3.任務(wù)列隊(duì)
4.任務(wù)接口
2.線程池家族關(guān)系
3.線程池如何實(shí)現(xiàn)線程復(fù)用的
- 相同的線程執(zhí)行不同的任務(wù)
源碼
public void execute(Runnable command) {if (command == null)throw new NullPointerException();/**分三步進(jìn)行:** 1. 如果運(yùn)行的線程少于corePoolSize,請(qǐng)嘗試*以給定的命令作為第一個(gè)命令啟動(dòng)一個(gè)新線程*任務(wù)。對(duì)addWorker的調(diào)用原子地檢查runState和*workerCount,這樣可以防止錯(cuò)誤警報(bào)*在不應(yīng)該執(zhí)行的情況下執(zhí)行線程,返回false。** 2. 如果任務(wù)可以成功排隊(duì),那么我們?nèi)匀恍枰?仔細(xì)檢查我們是否應(yīng)該添加一個(gè)線程*(因?yàn)樽陨洗螜z查以來(lái)已有的死亡)或*池在進(jìn)入此方法后關(guān)閉。所以我們*重新檢查狀態(tài),并在必要時(shí)回滾排隊(duì)*已停止,或者在沒(méi)有線程的情況下啟動(dòng)一個(gè)新線程。** 3. 如果我們不能對(duì)任務(wù)進(jìn)行排隊(duì),那么我們嘗試添加一個(gè)新的*螺紋。如果它失敗了,我們就知道我們已經(jīng)關(guān)閉或飽和了*因此拒絕該任務(wù)。*/int c = ctl.get();if (workerCountOf(c) < corePoolSize) {if (addWorker(command, true))return;c = ctl.get();}if (isRunning(c) && workQueue.offer(command)) {int recheck = ctl.get();if (! isRunning(recheck) && remove(command))reject(command);else if (workerCountOf(recheck) == 0)addWorker(null, false);}else if (!addWorker(command, false))reject(command);}
增加worker核心方法
/** Methods for creating, running and cleaning up after workers*//***檢查是否可以相對(duì)于當(dāng)前工作人員添加新工作人員*池狀態(tài)和給定的界限(核心或最大值)。如果是,*相應(yīng)地調(diào)整工人人數(shù),如果可能的話*將創(chuàng)建并啟動(dòng)新的輔助進(jìn)程,并將firstTask作為其*第一項(xiàng)任務(wù)。如果池已停止或*有資格關(guān)閉。如果線程*當(dāng)被要求時(shí),工廠無(wú)法創(chuàng)建線程。如果螺紋*創(chuàng)建失敗,可能是由于線程工廠返回*null,或由于異常(通常為中的OutOfMemoryError*Thread.start()),我們干凈地回滾。**@param firstTask新線程應(yīng)該首先運(yùn)行的任務(wù)(或*如果沒(méi)有,則為null)。工人是用最初的第一個(gè)任務(wù)創(chuàng)建的*(在方法execute()中),以在數(shù)量較少時(shí)繞過(guò)排隊(duì)*比corePoolSize線程(在這種情況下,我們總是啟動(dòng)一個(gè)線程),*或者當(dāng)隊(duì)列已滿(mǎn)時(shí)(在這種情況下,我們必須繞過(guò)隊(duì)列)。*最初的空閑線程通常通過(guò)*prestartCoreThread或替換其他垂死的工人。**@param core如果為true,則使用corePoolSize作為綁定,否則*最大池大小。(此處使用布爾指示符,而不是*值,以確保在檢查其他池后讀取新值*州)。*@如果成功,返回true*/private boolean addWorker(Runnable firstTask, boolean core) {retry:for (;;) {int c = ctl.get();int rs = runStateOf(c);// Check if queue empty only if necessary.if (rs >= SHUTDOWN &&! (rs == SHUTDOWN &&firstTask == null &&! workQueue.isEmpty()))return false;for (;;) {int wc = workerCountOf(c);if (wc >= CAPACITY ||wc >= (core ? corePoolSize : maximumPoolSize))return false;if (compareAndIncrementWorkerCount(c))break retry;c = ctl.get(); // Re-read ctlif (runStateOf(c) != rs)continue retry;// else CAS failed due to workerCount change; retry inner loop}}boolean workerStarted = false;boolean workerAdded = false;Worker w = null;try {w = new Worker(firstTask);final Thread t = w.thread;if (t != null) {final ReentrantLock mainLock = this.mainLock;mainLock.lock();try {// Recheck while holding lock.// Back out on ThreadFactory failure or if// shut down before lock acquired.int rs = runStateOf(ctl.get());if (rs < SHUTDOWN ||(rs == SHUTDOWN && firstTask == null)) {if (t.isAlive()) // precheck that t is startablethrow new IllegalThreadStateException();workers.add(w);int s = workers.size();if (s > largestPoolSize)largestPoolSize = s;workerAdded = true;}} finally {mainLock.unlock();}if (workerAdded) {t.start();workerStarted = true;}}} finally {if (! workerStarted)addWorkerFailed(w);}return workerStarted;}
worker核心類(lèi),調(diào)用runWorker方法實(shí)現(xiàn)了線程復(fù)用;
/**Class Worker主要維護(hù)運(yùn)行任務(wù)的線程的中斷控制狀態(tài),以及其他次要的記賬。此類(lèi)機(jī)會(huì)主義地?cái)U(kuò)展了AbstractQueuedSynchronizer,以簡(jiǎn)化獲取和釋放每個(gè)任務(wù)執(zhí)行周?chē)逆i。這可以防止旨在喚醒等待任務(wù)的工作線程而不是中斷正在運(yùn)行的任務(wù)的中斷。我們實(shí)現(xiàn)了一個(gè)簡(jiǎn)單的非重入互斥鎖,而不是使用ReentrantLock,因?yàn)槲覀儾幌Mぷ魅蝿?wù)在調(diào)用諸如setCorePoolSize之類(lèi)的池控制方法時(shí)能夠重新獲取鎖。此外,為了在線程真正開(kāi)始運(yùn)行任務(wù)之前抑制中斷,我們將鎖定狀態(tài)初始化為負(fù)值,并在啟動(dòng)時(shí)清除它(在runWorker中)。
*/private final class Workerextends AbstractQueuedSynchronizerimplements Runnable{/*** This class will never be serialized, but we provide a* serialVersionUID to suppress a javac warning.*/private static final long serialVersionUID = 6138294804551838833L;/** Thread this worker is running in. Null if factory fails. */final Thread thread;/** Initial task to run. Possibly null. */Runnable firstTask;/** Per-thread task counter */volatile long completedTasks;/*** Creates with given first task and thread from ThreadFactory.* @param firstTask the first task (null if none)*/Worker(Runnable firstTask) {setState(-1); // inhibit interrupts until runWorkerthis.firstTask = firstTask;this.thread = getThreadFactory().newThread(this);}/** Delegates main run loop to outer runWorker */public void run() {runWorker(this);}// Lock methods//// The value 0 represents the unlocked state.// The value 1 represents the locked state.protected boolean isHeldExclusively() {return getState() != 0;}protected boolean tryAcquire(int unused) {if (compareAndSetState(0, 1)) {setExclusiveOwnerThread(Thread.currentThread());return true;}return false;}protected boolean tryRelease(int unused) {setExclusiveOwnerThread(null);setState(0);return true;}public void lock() { acquire(1); }public boolean tryLock() { return tryAcquire(1); }public void unlock() { release(1); }public boolean isLocked() { return isHeldExclusively(); }void interruptIfStarted() {Thread t;if (getState() >= 0 && (t = thread) != null && !t.isInterrupted()) {try {t.interrupt();} catch (SecurityException ignore) {}}}}
runWorker
/*主要工人運(yùn)行循環(huán)。重復(fù)地從隊(duì)列中獲取任務(wù)并執(zhí)行它們,
同時(shí)處理許多問(wèn)題:
1。我們可以從一個(gè)初始任務(wù)開(kāi)始,在這種情況下,
我們不需要獲得第一個(gè)任務(wù)。否則,只要池正在運(yùn)行,
我們就會(huì)從getTask獲得任務(wù)。如果返回null,
則工作進(jìn)程將由于池狀態(tài)或配置參數(shù)的更改而退出。
其他退出是由外部代碼中的異常拋出引起的,
在這種情況下,completedAbruptly保持不變,
這通常會(huì)導(dǎo)致processWorkerExit替換此線程。
2.在運(yùn)行任何任務(wù)之前,獲取鎖以防止任務(wù)執(zhí)行時(shí)其他池中斷,
然后我們確保除非池停止,否則此線程不會(huì)設(shè)置中斷。
3.每次任務(wù)運(yùn)行之前都會(huì)調(diào)用beforeExecute,
這可能會(huì)引發(fā)異常,在這種情況下,
我們會(huì)導(dǎo)致線程在不處理任務(wù)的情況下死亡(用completedAbruptly true中斷循環(huán))。
4.假設(shè)beforeExecute正常完成,我們運(yùn)行任務(wù),
收集它拋出的任何異常,發(fā)送到afterExecute。
我們分別處理RuntimeException、Error(規(guī)范保證我們捕獲這兩者)和任意Throwables。
因?yàn)槲覀儫o(wú)法在Runnable.run中重新拋出Throwables,所以我們將它們封裝在退出時(shí)的
Errors中(到線程的UncaughtException Handler)。
任何拋出的異常也會(huì)保守地導(dǎo)致線程死亡。
5.task.run完成后,我們調(diào)用afterExecute,
它也可能拋出異常,這也會(huì)導(dǎo)致線程死亡。
根據(jù)JLS Sec 14.20,即使task.run拋出,
這個(gè)異常也將生效。
異常機(jī)制的凈效果是afterExecute和線程的UncaughtException處理程序
具有我們所能提供的關(guān)于用戶(hù)代碼遇到的任何問(wèn)題的準(zhǔn)確信息。參數(shù):w–工人*/
final void runWorker(Worker w) {Thread wt = Thread.currentThread();Runnable task = w.firstTask;w.firstTask = null;w.unlock(); // allow interruptsboolean completedAbruptly = true;try {while (task != null || (task = getTask()) != null) {w.lock();//如果池正在停止,請(qǐng)確保線程被中斷;//如果沒(méi)有,請(qǐng)確保線程沒(méi)有中斷。這//第二種情況需要重新檢查才能處理//shutdown清除中斷時(shí)無(wú)競(jìng)爭(zhēng)if ((runStateAtLeast(ctl.get(), STOP) ||(Thread.interrupted() &&runStateAtLeast(ctl.get(), STOP))) &&!wt.isInterrupted())wt.interrupt();try {beforeExecute(wt, task);Throwable thrown = null;try {task.run();} catch (RuntimeException x) {thrown = x; throw x;} catch (Error x) {thrown = x; throw x;} catch (Throwable x) {thrown = x; throw new Error(x);} finally {afterExecute(task, thrown);}} finally {task = null;w.completedTasks++;w.unlock();}}completedAbruptly = false;} finally {processWorkerExit(w, completedAbruptly);}}