網(wǎng)站做的好的公司名稱泉州百度關(guān)鍵詞優(yōu)化
Executors快速創(chuàng)建線程池的方法
Java通過Executors 工廠提供了5種創(chuàng)建線程池的方法,具體方法如下
方法名 | 描述 |
---|---|
newSingleThreadExecutor() | 創(chuàng)建一個(gè)單線程的線程池,該線程池中只有一個(gè)工作線程。所有任務(wù)按照提交的順序依次執(zhí)行,保證任務(wù)的順序性。當(dāng)工作線程意外終止時(shí),會創(chuàng)建一個(gè)新的線程來替代它。適用于需要順序執(zhí)行任務(wù)且保證任務(wù)安全性的場景。 |
newFixedThreadPool(int nThreads) | 創(chuàng)建一個(gè)固定大小的線程池,該線程池中的線程數(shù)量固定為指定的數(shù)量。當(dāng)有新任務(wù)提交時(shí),如果線程池中有空閑線程,則立即使用空閑線程執(zhí)行任務(wù);如果沒有空閑線程,則任務(wù)將被放入任務(wù)隊(duì)列等待執(zhí)行。 |
newCachedThreadPool() | 創(chuàng)建一個(gè)緩存線程池,該線程池中的線程數(shù)量不固定,可以根據(jù)任務(wù)的需求動態(tài)調(diào)整線程數(shù)量??臻e線程會被保留一段時(shí)間,如果在保留時(shí)間內(nèi)沒有任務(wù)執(zhí)行,則這些線程將被終止并從線程池中刪除。適用于執(zhí)行大量短期任務(wù)的場景 |
newScheduledThreadPool(int corePoolSize) | 創(chuàng)建一個(gè)可調(diào)度的線程池,該線程池能夠按照一定的調(diào)度策略執(zhí)行任務(wù)。除了執(zhí)行任務(wù)外,還可以按照指定的延遲時(shí)間或周期性地執(zhí)行任務(wù)。適用于需要按照計(jì)劃執(zhí)行任務(wù)、定時(shí)任務(wù)或周期性任務(wù)的場景。 |
newWorkStealingPool(int parallelism) | newWorkStealingPool(int parallelism) 方法用于創(chuàng)建一個(gè)工作竊取線程池。工作竊取線程池是一種特殊的線程池,它根據(jù)一定的調(diào)度策略執(zhí)行任務(wù)。除了執(zhí)行任務(wù)外,工作竊取線程池還可以按照指定的延遲時(shí)間或周期性地執(zhí)行任務(wù)。 |
ThreadFactory
在學(xué)習(xí)Executor創(chuàng)建線程池之前,我們先來學(xué)習(xí)一下
ThreadFactory
是一個(gè)接口,用于創(chuàng)建線程對象的工廠。它定義了一個(gè)方法newThread
,用于創(chuàng)建新的線程。
在Java中,線程的創(chuàng)建通常通過Thread
類的構(gòu)造函數(shù)進(jìn)行,但是使用ThreadFactory
可以將線程的創(chuàng)建過程與線程的執(zhí)行邏輯分離開來。通過自定義的ThreadFactory
,我們可以對線程進(jìn)行更加靈活的配置和管理,例如指定線程名稱、設(shè)置線程優(yōu)先級、設(shè)置線程是否為守護(hù)線程等。
ThreadFactory
接口只有一個(gè)方法:
Thread newThread(Runnable runnable);
該方法接受一個(gè)Runnable
對象作為參數(shù),并返回一個(gè)新的Thread
對象。
一般情況下,我們可以通過實(shí)現(xiàn)ThreadFactory
接口來自定義線程的創(chuàng)建。以下是一個(gè)示例的自定義ThreadFactory
實(shí)現(xiàn):
public class MyThreadFactory implements ThreadFactory {// 自定義線程的名稱private final String namePrefix = "test-async-thread";private final AtomicInteger threadNumber = new AtomicInteger(1);public MyThreadFactory(String namePrefix) {this.namePrefix = namePrefix;}@Overridepublic Thread newThread(Runnable runnable) {Thread thread = new Thread(runnable);thread.setName(namePrefix + "-" + threadNumber.getAndIncrement());thread.setPriority(Thread.NORM_PRIORITY);thread.setDaemon(false);return thread;}
}
在上面的示例中,MyThreadFactory
實(shí)現(xiàn)了ThreadFactory
接口,并通過構(gòu)造函數(shù)傳入一個(gè)namePrefix
參數(shù),用于指定線程的名稱前綴。
在newThread
方法中,首先創(chuàng)建一個(gè)新的Thread
對象,并設(shè)置線程的名稱為namePrefix
加上一個(gè)遞增的數(shù)字。然后,可以根據(jù)需要設(shè)置線程的優(yōu)先級、是否為守護(hù)線程等屬性。
newSingleThreadExecutor() 創(chuàng)建單線程化線程池
該方法用于創(chuàng)建一個(gè)單線程化的線程池
,也就是只有一個(gè)線程的線程池
。該線程池中只有一個(gè)工作線程,它負(fù)責(zé)按照任務(wù)的提交順序依次執(zhí)行任務(wù)。當(dāng)有任務(wù)提交時(shí),會創(chuàng)建一個(gè)新的線程來執(zhí)行任務(wù)。如果工作線程意外終止,線程池會創(chuàng)建一個(gè)新的線程來替代它,確保線程池中始終有一個(gè)可用的線程。
newSingleThreadExecutor()
方法返回的線程池實(shí)例實(shí)現(xiàn)了ExecutorService
接口,因此可以使用submit()
方法提交任務(wù)并獲取Future
對象,或使用execute()
方法提交任務(wù)。
該線程池適用于需要順序執(zhí)行任務(wù)且保證任務(wù)之間不會發(fā)生并發(fā)沖突的場景。由于只有一個(gè)工作線程,所以不存在線程間的競爭
和并發(fā)問題
,可以確保任務(wù)的安全性。
此外,newSingleThreadExecutor()
方法創(chuàng)建的線程池還可以用于任務(wù)的異常處理。當(dāng)任務(wù)拋出異常時(shí),線程池會捕獲異常并記錄或處理異常,避免異常導(dǎo)致整個(gè)應(yīng)用程序崩潰。
需要注意的是,由于該線程池只有一個(gè)線程,如果任務(wù)執(zhí)行時(shí)間過長
或任務(wù)量過大
,可能會導(dǎo)致任務(wù)隊(duì)列堆積
,造成應(yīng)用程序的性能問題。所以在使用該線程池時(shí),需要根據(jù)任務(wù)的特性和需求進(jìn)行適當(dāng)?shù)脑u估和調(diào)優(yōu)。
下面我們使用Executors中newSingleThreadExecutor()方法創(chuàng)建一個(gè)單線程線程池
/*** 這里創(chuàng)建的線程是 是Executors.newSingleThreadExecutor() 一樣 保證只有一個(gè)線程來進(jìn)行執(zhí)行 并且按照提交的順序進(jìn)行執(zhí)行* <pre>* {@code* public static ExecutorService newSingleThreadExecutor() {* return new Executors.FinalizableDelegatedExecutorService (* new ThreadPoolExecutor(1, 1, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>()));* }* }* </pre>*/@Testpublic void test10() {// 如果有多個(gè)任務(wù)提交到線程池中,那么這個(gè)線程池中的線程會依次執(zhí)行任務(wù) 和ExecutorService executorService = Executors.newSingleThreadExecutor();// 創(chuàng)建線程 1Thread t1 = new Thread(() -> {logger.error("t1 ----> 開始執(zhí)行了~");}, "t1");// 創(chuàng)建線程 2Thread t2 = new Thread(() -> {logger.error("t2 ----> 開始執(zhí)行了~");}, "t2");// 創(chuàng)建線程 3Thread t3 = new Thread(() -> {logger.error("t3 ----> 開始執(zhí)行了~");}, "t3");// Executor 這個(gè)接口定義的功能很有限,同時(shí)也只支持 Runnale 形式的異步任務(wù)// 向線程池提交任務(wù)executorService.submit(t1);executorService.submit(t2);executorService.submit(t3);// 關(guān)閉線程池executorService.shutdown();}
首先,通過Executors.newSingleThreadExecutor();
創(chuàng)建了一個(gè)單線程化的線程池。
然后,創(chuàng)建了三個(gè)線程t1、t2和t3,分別用于執(zhí)行不同的任務(wù)。
接著,通過executorService.submit(t1)
將t1線程提交到線程池中進(jìn)行執(zhí)行。同樣地,也將t2和t3線程提交到線程池中。
最后,通過executorService.shutdown()
關(guān)閉線程池。
執(zhí)行時(shí),可以觀察到日志輸出的順序。由于線程池中只有一個(gè)線程,所以任務(wù)會依次按照提交的順序進(jìn)行執(zhí)行。
需要注意的是,通過線程池執(zhí)行任務(wù)后,線程的名稱不再是我們自定義的線程名稱,而是線程池的名稱(如pool-2-thread-1
)。這是因?yàn)榫唧w的執(zhí)行任務(wù)是交給線程池來管理和執(zhí)行的。
從輸出中我們可以看出,該線程池有以下特點(diǎn):
-
單線程化的線程池中的任務(wù),都是按照提交的順序來進(jìn)行執(zhí)行的。
-
該線程池中的唯一線程存活時(shí)間是無限的
-
當(dāng)線程池中唯一的線程正在繁忙時(shí),新提交的任務(wù)會進(jìn)入到其內(nèi)部的阻塞隊(duì)列中,而且阻塞隊(duì)列的容量是無限的
-
// 這是 newSingleThreadExecutor 一個(gè)無參的構(gòu)造方法 public static ExecutorService newSingleThreadExecutor() {// 創(chuàng)建一個(gè)FinalizableDelegatedExecutorService實(shí)例,該實(shí)例是ExecutorService接口的一個(gè)包裝類// 將上面創(chuàng)建的ThreadPoolExecutor實(shí)例作為參數(shù)傳入// 這樣就得到了一個(gè)單線程化的線程池return new FinalizableDelegatedExecutorService( // 創(chuàng)建一個(gè)ThreadPoolExecutor實(shí)例,指定參數(shù)如下:// corePoolSize: 1,線程池中核心線程的數(shù)量為1// maximumPoolSize: 1,線程池中最大線程的數(shù)量為1// keepAliveTime: 0L,空閑線程的存活時(shí)間為0毫秒,即空閑線程立即被回收// unit: TimeUnit.MILLISECONDS,存活時(shí)間的時(shí)間單位是毫秒// workQueue: new LinkedBlockingQueue<Runnable>(),使用無界阻塞隊(duì)列作為任務(wù)隊(duì)列new ThreadPoolExecutor(1, 1,0L, TimeUnit.MILLISECONDS,new LinkedBlockingQueue<Runnable>()));}// 其中 阻塞隊(duì)列大小,如果不傳容量,默認(rèn)是整形的最大值 public LinkedBlockingQueue() {this(Integer.MAX_VALUE); }
-
總體來說:
ewSingleThreadExecutor();
適用于按照任務(wù)提交次序,一個(gè)接一個(gè)的執(zhí)行場景
newFixedThreadPool 創(chuàng)建固定數(shù)量的線程池
該方法用于創(chuàng)建一個(gè)
固定數(shù)量的線程池
,其中唯一的參數(shù)是用于設(shè)置線程池中線程的數(shù)量
newFixedThreadPool
是Executors
類提供的一個(gè)靜態(tài)方法,用于創(chuàng)建一個(gè)固定大小的線程池。方法具體如下
public static ExecutorService newFixedThreadPool(int nThreads)
參數(shù)
nThreads
表示線程池中的線程數(shù)量,即固定的線程數(shù)量。線程池中的線程數(shù)不會根據(jù)任務(wù)的多少進(jìn)行動態(tài)調(diào)整,即使有空閑線程也不會銷毀,除非調(diào)用了線程池的shutdown
方法。
newFixedThreadPool
方法返回一個(gè)ExecutorService
對象,它是Executor
接口的子接口,提供了更加豐富的任務(wù)提交和管理方法。通過創(chuàng)建固定大小的線程池,可以在任務(wù)并發(fā)量較高且預(yù)期的任務(wù)數(shù)量固定的情況下,提供一定程度的線程復(fù)用和線程調(diào)度控制。線程池會根據(jù)固定的線程數(shù)量來創(chuàng)建對應(yīng)數(shù)量的線程,并將任務(wù)分配給這些線程進(jìn)行執(zhí)行。
線程池的工作原理如下:
- 當(dāng)有任務(wù)提交到線程池時(shí),線程池中的某個(gè)線程會被喚醒來執(zhí)行任務(wù)。
- 如果所有線程都在執(zhí)行任務(wù),新的任務(wù)會被放入一個(gè)任務(wù)隊(duì)列中等待執(zhí)行。
- 當(dāng)任務(wù)隊(duì)列已滿時(shí),線程池會根據(jù)配置的拒絕策略來處理無法執(zhí)行的任務(wù)。
需要注意的是,由于線程池的大小是固定的,如果任務(wù)數(shù)量超過線程池的容量,任務(wù)會在任務(wù)隊(duì)列中等待執(zhí)行。這可能會導(dǎo)致任務(wù)等待時(shí)間增加或任務(wù)堆積,進(jìn)而影響系統(tǒng)的響應(yīng)性能。因此,在選擇線程池大小時(shí),需要根據(jù)系統(tǒng)的負(fù)載情況和任務(wù)特點(diǎn)進(jìn)行合理的配置。
下面我們通過代碼來了解一下 newFixedThreadPool
@Testpublic void test18() {ThreadPoolExecutor executor = (ThreadPoolExecutor) Executors.newFixedThreadPool(2);executor.submit(new Runnable() {@Overridepublic void run() {// 隨機(jī)睡覺timeSleep();printPoolInfo(executor, "*[耗時(shí)線程-1]");}});executor.submit(new Runnable() {@Overridepublic void run() {// 隨機(jī)睡覺timeSleep();printPoolInfo(executor, "*[耗時(shí)線程-2]");}});executor.submit(new Runnable() {@Overridepublic void run() {timeSleep();printPoolInfo(executor, "*[耗時(shí)線程-3]");}});for (int i = 0; i < 5; i++) {int finalI = i;executor.submit(new Runnable() {@Overridepublic void run() {printPoolInfo(executor, "普通線程");}});}// 優(yōu)雅關(guān)閉線程池executor.shutdown();waitPoolExecutedEnd(executor);}// 這個(gè)方法用于等待線程全部執(zhí)行結(jié)束public void waitPoolExecutedEnd(ThreadPoolExecutor executor) {// 確保主線程完全等待子線程執(zhí)行完畢try {// 等待線程池中的任務(wù)執(zhí)行完畢,最多等待1天if (!executor.awaitTermination(5, TimeUnit.MINUTES)) {logger.error("線程池中的任務(wù)還未全部執(zhí)行完畢~");}} catch (InterruptedException e) {logger.error("等待線程池中的任務(wù)被中斷~");}}
這段代碼創(chuàng)建了一個(gè)固定大小為2的線程池 executor
,并向其中提交了多個(gè)任務(wù)。其中:
- 通過
Executors.newFixedThreadPool(2)
創(chuàng)建了一個(gè)固定大小為2的線程池。 - 向線程池中提交了多個(gè)任務(wù),包括兩個(gè)耗時(shí)任務(wù)和五個(gè)普通任務(wù)。
- 每個(gè)耗時(shí)任務(wù)都會執(zhí)行
timeSleep()
方法進(jìn)行一段隨機(jī)時(shí)間的睡眠,然后執(zhí)行printPoolInfo()
方法輸出當(dāng)前線程池的信息。 - 普通任務(wù)直接執(zhí)行
printPoolInfo()
方法輸出當(dāng)前線程池的信息。 - 在所有任務(wù)提交完畢后,調(diào)用
executor.shutdown()
方法優(yōu)雅地關(guān)閉線程池,并調(diào)用waitPoolExecutedEnd(executor)
方法等待線程池中的任務(wù)執(zhí)行完畢。
根據(jù)輸出的結(jié)果 我們可以觀察到
- 當(dāng)?shù)谝粋€(gè)耗時(shí)任務(wù)執(zhí)行時(shí),線程池中有2個(gè)核心線程在工作。
- 當(dāng)?shù)谌齻€(gè)任務(wù)執(zhí)行時(shí),線程池中的任務(wù)數(shù)量已經(jīng)達(dá)到5個(gè),但是活躍線程數(shù)量仍然為2,說明任務(wù)正在等待空閑線程來執(zhí)行。
- 所有任務(wù)執(zhí)行完成后,線程池中的線程數(shù)量仍然保持為2,這是因?yàn)榫€程池是一個(gè)固定大小的線程池。
- 在任務(wù)執(zhí)行的過程中,線程池的核心線程數(shù)量一直為2,線程數(shù)量也一直保持在2個(gè),因?yàn)榫€程池的大小是固定的。
固定大小線程池適用于以下場景:
- 資源受限場景:當(dāng)系統(tǒng)資源受限,無法創(chuàng)建過多線程時(shí),固定大小線程池能夠限制線程數(shù)量,防止系統(tǒng)資源被耗盡。
- 穩(wěn)定的任務(wù)處理:適用于穩(wěn)定的任務(wù)處理場景,例如批量數(shù)據(jù)處理、定時(shí)任務(wù)等,因?yàn)楣潭ù笮【€程池能夠保持一定數(shù)量的線程長時(shí)間運(yùn)行,避免線程的頻繁創(chuàng)建和銷毀。
- 控制并發(fā)數(shù)量:在需要限制并發(fā)數(shù)量的場景下,固定大小線程池能夠控制同時(shí)執(zhí)行的任務(wù)數(shù)量,防止系統(tǒng)過載。
- 避免資源競爭:固定大小線程池可以避免多個(gè)任務(wù)爭奪系統(tǒng)資源而導(dǎo)致的競爭和性能下降。
- 穩(wěn)定的性能預(yù)測:在需要穩(wěn)定的性能預(yù)測下,固定大小線程池能夠提供一致的性能表現(xiàn),因?yàn)榫€程數(shù)量是固定的,可以更好地進(jìn)行性能測試和預(yù)測。
newCachedThreadPool 創(chuàng)建可以緩存的線程池
newCachedThreadPool 用于創(chuàng)建一個(gè)
可以緩存的線程池
,如果線程內(nèi)的某些線程無事可干,那么就會成為空線程,可緩存線程池
,可以靈活回收這些空閑線程
newCachedThreadPool
是Executors
類提供的一個(gè)靜態(tài)方法,用于創(chuàng)建一個(gè)緩存型線程池。方法定義如下
public static ExecutorService newCachedThreadPool()
newCachedThreadPool
方法返回一個(gè)ExecutorService
對象,它是Executor
接口的子接口,提供了更加豐富的任務(wù)提交和管理方法。緩存型線程池會根據(jù)需要自動創(chuàng)建和回收線程,線程池的大小可以根據(jù)任務(wù)的數(shù)量自動調(diào)整。如果當(dāng)前沒有可用的空閑線程,會創(chuàng)建新的線程來執(zhí)行任務(wù);如果有空閑線程并且它們在指定的時(shí)間內(nèi)沒有執(zhí)行任務(wù),那么這些空閑線程將會被回收。
使用緩存型線程池的優(yōu)點(diǎn)是可以根據(jù)任務(wù)的數(shù)量動態(tài)調(diào)整線程池的大小,以適應(yīng)不同的負(fù)載情況。當(dāng)任務(wù)數(shù)量較少時(shí),線程池會減少線程的數(shù)量以節(jié)省資源;當(dāng)任務(wù)數(shù)量增加時(shí),線程池會增加線程的數(shù)量以提高并發(fā)性。
需要注意的是,由于緩存型線程池的大小是不限制的,它可能會創(chuàng)建大量的線程,如果任務(wù)的提交速度超過了線程執(zhí)行任務(wù)的速度,可能會導(dǎo)致系統(tǒng)資源消耗過多,甚至造成系統(tǒng)崩潰。因此,在使用緩存型線程池時(shí),需要根據(jù)任務(wù)特點(diǎn)和系統(tǒng)資源情況進(jìn)行合理的配置。
下面我們通過一個(gè)案例,來了解一下newCachedThreadPool,但是了解newCachedThreadPool之前,我們先來熟悉一個(gè)阻塞隊(duì)列,這個(gè)會在后面的阻塞隊(duì)列專題中詳細(xì)介紹,這里只是作為了解
SynchronousQueue
- 無內(nèi)部存儲容量:與其他阻塞隊(duì)列不同,SynchronousQueue 不存儲元素,其容量為零。換句話說,它是一個(gè)零容量的隊(duì)列,用于在線程之間同步傳輸數(shù)據(jù)。
- 阻塞隊(duì)列:作為 BlockingQueue 接口的一個(gè)實(shí)現(xiàn),SynchronousQueue 提供了阻塞操作,允許線程在隊(duì)列的插入和移除操作上進(jìn)行阻塞等待。
- 匹配插入和移除操作:在 SynchronousQueue 中,每個(gè)插入操作必須等待另一個(gè)線程的移除操作,反之亦然。換句話說,發(fā)送線程必須等待接收線程,而接收線程也必須等待發(fā)送線程,才能夠完成操作。這樣的特性保證了數(shù)據(jù)的可靠傳輸,只有在有線程與之匹配時(shí),才會進(jìn)行數(shù)據(jù)傳輸。
- 不支持 peek 操作:由于 SynchronousQueue 內(nèi)部沒有存儲元素,因此不能調(diào)用 peek 操作。只有在移除元素時(shí)才會有元素可供操作。
- 支持公平和非公平模式:SynchronousQueue 可以在構(gòu)造時(shí)指定為公平或非公平模式。在公平模式下,隊(duì)列會按照線程的到達(dá)順序進(jìn)行操作;而在非公平模式下,則不保證操作的順序。
SynchronousQueue 在多線程并發(fā)編程中常用于一些特定場景,例如生產(chǎn)者-消費(fèi)者模式中,用于傳輸數(shù)據(jù)的場景,以及一些任務(wù)執(zhí)行器中用于任務(wù)的傳遞等。其特殊的同步機(jī)制保證了線程之間數(shù)據(jù)的可靠傳輸和同步操作。
基于SynchronousQueue 一個(gè)小案例
/*** 理解SynchronousQueue* SynchronousQueue,實(shí)際上它不是一個(gè)真正的隊(duì)列,因?yàn)镾ynchronousQueue沒有容量。與其他BlockingQueue(阻塞隊(duì)列)不同,* SynchronousQueue是一個(gè)不存儲元素的BlockingQueue。只是它維護(hù)一組線程,這些線程在等待著把元素加入或移出隊(duì)列。* 我們簡單分為以下幾種特點(diǎn):* 內(nèi)部沒有存儲(容量為0)* 阻塞隊(duì)列(也是blockingqueue的一個(gè)實(shí)現(xiàn))* 發(fā)送或者消費(fèi)線程會阻塞,只有有一對消費(fèi)和發(fā)送線程匹配上,才同時(shí)退出。* (其中每個(gè)插入操作必須等待另一個(gè)線程的移除操作,同樣任何一個(gè)移除操作都等待另一個(gè)線程的插入操作。因此此隊(duì)列內(nèi)部其 實(shí)沒有任何一個(gè)元素,因此不能調(diào)用peek操作,因?yàn)橹挥幸瞥貢r(shí)才有元素。)* 配對有公平模式和非公平模式(默認(rèn))*/@Testpublic void test19() throws InterruptedException {SynchronousQueue<String> queue = new SynchronousQueue<>();// 我們通過線程內(nèi) 入隊(duì) 和 出隊(duì) 了解下 SynchronousQueue的特性new Thread(new Runnable() {@Overridepublic void run() {try {// 入隊(duì)logger.error("---- 喜羊羊進(jìn)鍋,沐浴~ ----");queue.put("喜羊羊!");logger.error("---- 懶羊羊進(jìn)鍋,沐浴~ ----");queue.put("懶羊羊!");logger.error("---- 美羊羊進(jìn)鍋,沐浴~ ----");queue.put("美羊羊!");} catch (InterruptedException e) {throw new RuntimeException(e);}}}, "t1").start();new Thread(new Runnable() {@Overridepublic void run() {try {// 入隊(duì)Thread.sleep(5000);String node1 = queue.take();logger.error("---- {} 出鍋~ ----", node1);Thread.sleep(10000);String node2 = queue.take();logger.error("---- {} 出鍋~ ----", node2);Thread.sleep(5000);String node3 = queue.take();logger.error("---- {} 出鍋~ ----", node3);} catch (InterruptedException e) {throw new RuntimeException(e);}}}, "t2").start();Thread.sleep(Integer.MAX_VALUE);}
// 根據(jù) 結(jié)果可以發(fā)現(xiàn), 三只羊同時(shí)進(jìn)鍋,但是一個(gè)鍋只能容納一只羊,所以只有一只羊能進(jìn)鍋,其他的羊只能等待,直到鍋里的羊出鍋,才能進(jìn)鍋// 也就是說,SynchronousQueue是一個(gè)不存儲元素的BlockingQueue。只是它維護(hù)一組線程,這些線程在等待著把元素加入或移出隊(duì)列。// 喜羊羊進(jìn)鍋,然后等待 5s后 喜羊羊出鍋,此時(shí)美羊羊開始進(jìn)鍋
了解這個(gè)阻塞隊(duì)列后,我們再來了解一下newCachedThreadPool這個(gè)線程池,還是通過一個(gè)案例來進(jìn)行了解一下具體用法
/*** newCachedThreadPool 創(chuàng)建可以緩存的線程池*/@Testpublic void test17() {// 創(chuàng)建可以緩存的線程池// 創(chuàng)建一個(gè)可緩存線程池,如果線程池長度超過處理需要,可靈活回收空閑線程,若無可回收,則新建線程。// keepAliveTime為60S,意味著線程空閑時(shí)間超過60S就會被殺死(60L)ThreadPoolExecutor threadPoolExecutor = (ThreadPoolExecutor) Executors.newCachedThreadPool();// 這里使用兩個(gè)線程,異步執(zhí)行// 此處 羊羊線程是不休眠的,直接放入線程池new Thread(() -> {for (int i = 0; i < 2; i++) {int finalI = i;threadPoolExecutor.submit(() -> {printPoolInfo(threadPoolExecutor, "羊羊線程" + finalI + "-正在執(zhí)行");});}}, "任務(wù)線程-1").start();// 狼狼線程 此時(shí)第一個(gè)線程也是不會進(jìn)行阻塞,此時(shí)應(yīng)該是 三個(gè)線程 (兩個(gè)羊羊+一個(gè)狼狼 同時(shí)進(jìn)線程池)new Thread(() -> {for (int i = 0; i < 5; i++) {if (i == 1 || i == 2) {try {Thread.sleep(TimeUnit.SECONDS.toMillis(5));} catch (InterruptedException e) {throw new RuntimeException(e);}}int finalI = i;threadPoolExecutor.submit(() -> {printPoolInfo(threadPoolExecutor, "狼狼線程" + finalI + "-正在執(zhí)行");});}}, "任務(wù)線程-2").start();// 等待全部線程都執(zhí)行完畢waitPoolExecutedEnd(threadPoolExecutor);threadPoolExecutor.shutdown();}
這段代碼的主要邏輯如下
- 使用
Executors.newCachedThreadPool()
創(chuàng)建了一個(gè)可緩存的線程池threadPoolExecutor
。這種線程池的特點(diǎn)是,如果線程池長度超過當(dāng)前任務(wù)需求,它會靈活地回收空閑線程;若沒有可回收的線程,則會新建線程來處理任務(wù)。線程的空閑時(shí)間超過60秒后,就會被回收。 - 創(chuàng)建了兩個(gè)新的線程,分別用于提交任務(wù)到線程池中執(zhí)行。其中,一個(gè)線程負(fù)責(zé)提交羊羊線程,另一個(gè)線程負(fù)責(zé)提交狼狼線程。
- 羊羊線程的任務(wù)不會進(jìn)行阻塞,直接提交到線程池中執(zhí)行。
- 狼狼線程的任務(wù)中,當(dāng)
i
等于1或2時(shí),會進(jìn)行5秒的睡眠,模擬任務(wù)的耗時(shí)操作,然后再提交到線程池中執(zhí)行。 - 使用
waitPoolExecutedEnd(threadPoolExecutor)
方法等待線程池中的任務(wù)執(zhí)行完畢。 - 在所有任務(wù)執(zhí)行完畢后,調(diào)用
threadPoolExecutor.shutdown()
方法關(guān)閉線程池。
- 通過結(jié)果可以看到每個(gè)任務(wù)的執(zhí)行都是間隔5秒執(zhí)行一次。
- 線程池信息中的線程數(shù)量始終為3,這是因?yàn)?
Executors.newCachedThreadPool()
創(chuàng)建的是一個(gè)可緩存的線程池,其最大線程數(shù)量為 Integer.MAX_VALUE,因此當(dāng)任務(wù)提交到線程池時(shí),如果沒有空閑線程可用,則會新建線程來處理任務(wù),線程數(shù)量會一直增加,直到達(dá)到設(shè)定的最大值。 - 在任務(wù)提交時(shí),活躍線程數(shù)量始終為3,這是因?yàn)槊看翁峤蝗蝿?wù)時(shí),都有空閑線程可用,所以不需要新建線程,而是直接使用已存在的線程執(zhí)行任務(wù)。
- 狼狼線程的任務(wù)會進(jìn)行5秒的睡眠操作,模擬耗時(shí)操作,因此在執(zhí)行任務(wù)期間,線程池中的活躍線程數(shù)量會減少,直到任務(wù)執(zhí)行完畢后,線程池會繼續(xù)維持3個(gè)活躍線程數(shù)量。
- 在執(zhí)行完所有任務(wù)后,線程池并不會立即關(guān)閉,因?yàn)榫€程池是可緩存的,會等待一段時(shí)間后空閑線程自動被回收。
應(yīng)用場景:
- 短期任務(wù)處理:適用于處理大量短期任務(wù)的場景,因?yàn)樗軌蚋鶕?jù)需要?jiǎng)討B(tài)地創(chuàng)建線程,處理任務(wù),處理完畢后又自動回收線程,避免了線程過多占用資源的問題。
- 任務(wù)處理時(shí)間不確定:適用于任務(wù)處理時(shí)間不確定的場景,因?yàn)樗軌蚋鶕?jù)實(shí)際情況動態(tài)調(diào)整線程數(shù)量,保證任務(wù)能夠及時(shí)得到處理,提高系統(tǒng)的響應(yīng)速度。
- 需要快速響應(yīng)的任務(wù):適用于需要快速響應(yīng)的任務(wù),因?yàn)樗軌蚩焖俚貏?chuàng)建線程來處理任務(wù),縮短任務(wù)等待的時(shí)間,提高任務(wù)的處理效率。
- 任務(wù)負(fù)載波動大:適用于任務(wù)負(fù)載波動大的場景,因?yàn)樗軌蚋鶕?jù)負(fù)載情況動態(tài)調(diào)整線程數(shù)量,使系統(tǒng)能夠更好地適應(yīng)負(fù)載的變化。
缺點(diǎn):
- 線程數(shù)量不受限制:由于
newCachedThreadPool
的最大線程數(shù)量為 Integer.MAX_VALUE,因此在大量任務(wù)提交的情況下,可能會導(dǎo)致線程數(shù)量過多,占用大量系統(tǒng)資源,導(dǎo)致系統(tǒng)負(fù)載過高,甚至引發(fā)系統(tǒng)崩潰。 - 不適用于長時(shí)間任務(wù):由于它的線程數(shù)量不受限制,適用于處理短期任務(wù),但不適用于長時(shí)間任務(wù),因?yàn)殚L時(shí)間任務(wù)可能會導(dǎo)致線程數(shù)量過多,占用大量系統(tǒng)資源。
- 可能導(dǎo)致頻繁創(chuàng)建和銷毀線程:由于
newCachedThreadPool
是一個(gè)動態(tài)的線程池類型,可能會頻繁地創(chuàng)建和銷毀線程,這種線程的創(chuàng)建和銷毀操作會帶來一定的性能開銷。
綜上所述,newCachedThreadPool
適用于任務(wù)處理時(shí)間不確定、負(fù)載波動大、需要快速響應(yīng)的場景,但在大量長時(shí)間任務(wù)的情況下,需要慎重選擇以避免占用過多系統(tǒng)資源。
newScheduledThreadPool 創(chuàng)建可調(diào)度的線程池
項(xiàng)目中經(jīng)常會遇到一些非分布式的調(diào)度任務(wù),需要在未來的某個(gè)時(shí)刻周期性執(zhí)行。實(shí)現(xiàn)這樣的功能,我們有多種方式可以選擇:
- Timer類, jdk1.3引入,不推薦
- 它所有任務(wù)都是串行執(zhí)行的,同一時(shí)間只能有一個(gè)任務(wù)在執(zhí)行,而且前一個(gè)任務(wù)的延遲或異常都將會影響到之后的任務(wù)。
- Spring的@Scheduled注解,不是很推薦
- 這種方式底層雖然是用線程池實(shí)現(xiàn),但是有個(gè)最大的問題,所有的任務(wù)都使用的同一個(gè)線程池,可能會導(dǎo)致長周期的任務(wù)運(yùn)行影響短周期任務(wù)運(yùn)行,造成線程池"饑餓",更加推薦的做法是同種類型的任務(wù)使用同一個(gè)線程池。
- 自定義ScheduledThreadPoolExecutor實(shí)現(xiàn)調(diào)度任務(wù)
這也是下面重點(diǎn)講解的方式,通過自定義ScheduledThreadPoolExecutor調(diào)度線程池,提交調(diào)度任務(wù)才是最優(yōu)解。
newScheduledThreadPool
用于創(chuàng)建一個(gè)可調(diào)度的線程
,newScheduledThreadPool
是Executors
類提供的一個(gè)靜態(tài)方法方法如下
public static ScheduledExecutorService newScheduledThreadPool(int corePoolSize)
newScheduledThreadPool
方法返回一個(gè)ScheduledExecutorService
對象,它是ExecutorService
接口的子接口,提供了任務(wù)調(diào)度的能力。可調(diào)度的線程池可以用于延時(shí)執(zhí)行任務(wù)和周期性執(zhí)行任務(wù)。它可以根據(jù)需要自動創(chuàng)建和回收線程,并且可以在指定的延時(shí)時(shí)間后執(zhí)行任務(wù),或在指定的時(shí)間間隔內(nèi)重復(fù)執(zhí)行任務(wù)。
使用
newScheduledThreadPool
創(chuàng)建的可調(diào)度線程池有以下特點(diǎn):
corePoolSize
參數(shù)指定了線程池的核心線程數(shù),即線程池中同時(shí)執(zhí)行任務(wù)的最大線程數(shù)。- 當(dāng)任務(wù)的延時(shí)時(shí)間到達(dá)時(shí),線程池會創(chuàng)建新的線程來執(zhí)行任務(wù)。
- 如果線程池中的線程數(shù)量超過核心線程數(shù),空閑的線程會在指定的時(shí)間內(nèi)被回收。
- 可以使用
schedule
方法來延時(shí)執(zhí)行任務(wù),也可以使用scheduleAtFixedRate
方法或scheduleWithFixedDelay
方法來周期性執(zhí)行任務(wù)。使用示例:
ScheduledExecutorService executor = Executors.newScheduledThreadPool(3); executor.schedule(() -> {// 延時(shí)執(zhí)行的任務(wù)邏輯 }, 5, TimeUnit.SECONDS);executor.scheduleAtFixedRate(() -> {// 周期性執(zhí)行的任務(wù)邏輯 }, 1, 3, TimeUnit.SECONDS);executor.scheduleWithFixedDelay(() -> {// 周期性執(zhí)行的任務(wù)邏輯 }, 2, 4, TimeUnit.SECONDS);
在示例中,
schedule
方法用于延時(shí)執(zhí)行任務(wù),它接受一個(gè)任務(wù)和延時(shí)時(shí)間,表示在指定的延時(shí)時(shí)間后執(zhí)行任務(wù)。
scheduleAtFixedRate
方法用于周期性執(zhí)行任務(wù),它接受一個(gè)任務(wù)、初始延時(shí)時(shí)間和周期時(shí)間,表示在初始延時(shí)時(shí)間后開始執(zhí)行任務(wù),并以指定的周期時(shí)間重復(fù)執(zhí)行任務(wù)。
scheduleWithFixedDelay
方法也用于周期性執(zhí)行任務(wù),它接受一個(gè)任務(wù)、初始延時(shí)時(shí)間和周期時(shí)間,表示在初始延時(shí)時(shí)間后開始執(zhí)行任務(wù),并在任務(wù)執(zhí)行完成后等待指定的周期時(shí)間,然后再執(zhí)行下一個(gè)任務(wù)。總之,
newScheduledThreadPool
方法用于創(chuàng)建一個(gè)可調(diào)度的線程池,可以用于延時(shí)執(zhí)行任務(wù)和周期性執(zhí)行任務(wù)。通過合理配置延時(shí)時(shí)間和周期時(shí)間,可以滿足不同場景下的任務(wù)調(diào)度需求。
下面我們通過一個(gè)案例來了解如何創(chuàng)建延時(shí)線程 和 定時(shí)線程
@Testpublic void test20() {// 使用Executors.newScheduledThreadPool 創(chuàng)建線程池ScheduledThreadPoolExecutor scheduledThreadPool = (ScheduledThreadPoolExecutor) Executors.newScheduledThreadPool(5, new ThreadFactory() {@Overridepublic Thread newThread(Runnable r) {// 對線程名稱進(jìn)行自定義return new Thread(r, "my-scheduled-job-" + r.hashCode());}});// 準(zhǔn)備工作DelayedThread delayedThread = new DelayedThread();LoopThread loopThread = new LoopThread();// 執(zhí)行延時(shí)線程 (延時(shí) 10s開始執(zhí)行 )logger.error("延時(shí)線程工作準(zhǔn)備結(jié)束!");scheduledThreadPool.schedule(delayedThread, 10, TimeUnit.SECONDS);// 執(zhí)行循環(huán)線程// scheduleAtFixedRate(Runnable command,long initialDelay,long period,TimeUnit unit)// command: 執(zhí)行的任務(wù)// initialDelay: 初始延遲的時(shí)間// delay: 上次執(zhí)行結(jié)束,延遲多久執(zhí)行// unit:單位logger.error("循環(huán)線程工作準(zhǔn)備結(jié)束!");scheduledThreadPool.scheduleAtFixedRate(loopThread, 10, 5, TimeUnit.SECONDS);waitPoolExecutedEnd(scheduledThreadPool);scheduledThreadPool.shutdown();}static class DelayedThread implements Runnable {@Overridepublic void run() {logger.error("延時(shí)線程正在開始執(zhí)行~~");}}static class LoopThread implements Runnable {@Overridepublic void run() {logger.error("循環(huán)線程開始執(zhí)行~~");}}
首先,通過Executors.newScheduledThreadPool
方法創(chuàng)建了一個(gè)可調(diào)度的線程池ScheduledThreadPoolExecutor
,并指定了線程池的核心線程數(shù)為5。同時(shí),通過自定義的ThreadFactory
來創(chuàng)建線程,并給線程指定了自定義的名稱。
接下來,定義了兩個(gè)任務(wù)類DelayedThread
和LoopThread
,分別實(shí)現(xiàn)Runnable
接口。
在測試方法中,首先創(chuàng)建了DelayedThread
和LoopThread
的實(shí)例。
然后,在測試方法中,首先創(chuàng)建了DelayedThread
和LoopThread
的實(shí)例。
然后,通過調(diào)用scheduledThreadPool.schedule
方法,將DelayedThread
任務(wù)提交給線程池,并指定延時(shí)時(shí)間為10秒。這意味著DelayedThread
任務(wù)將在10秒后執(zhí)行。
接著,通過調(diào)用scheduledThreadPool.scheduleAtFixedRate
方法,將LoopThread
任務(wù)提交給線程池,并指定初始延時(shí)時(shí)間為10秒、周期時(shí)間為5秒。這意味著LoopThread
任務(wù)將在初始延時(shí)時(shí)間后開始執(zhí)行,并且每隔5秒重復(fù)執(zhí)行一次。
最后,調(diào)用waitPoolExecutedEnd
方法等待線程池中的任務(wù)執(zhí)行完畢,并調(diào)用線程池的shutdown
方法關(guān)閉線程池。
總結(jié)起來,這段代碼演示了使用Executors.newScheduledThreadPool
創(chuàng)建可調(diào)度的線程池,并展示了延時(shí)執(zhí)行和周期性執(zhí)行的例子。通過合理配置延時(shí)時(shí)間和周期時(shí)間,可以實(shí)現(xiàn)在指定的時(shí)間點(diǎn)或時(shí)間間隔內(nèi)執(zhí)行任務(wù)。
newWorkStealingPool 創(chuàng)建一個(gè)可竊取任務(wù)的線程池
newWorkStealingPool(int parallelism)
方法用于創(chuàng)建一個(gè)工作竊取線程池。工作竊取線程池是一種特殊的線程池,它根據(jù)一定的調(diào)度策略執(zhí)行任務(wù)。除了執(zhí)行任務(wù)外,工作竊取線程池還可以按照指定的延遲時(shí)間或周期性地執(zhí)行任務(wù)。工作竊取線程池最早由美國計(jì)算機(jī)科學(xué)家 Charles E. Leiserson 和 John C. Bains 發(fā)明,他們在 1994 年的論文《Scheduling Multithreaded Computations by Work Stealing》中首次提出了這一概念。
工作竊取線程池的設(shè)計(jì)初衷是為了解決并行計(jì)算中獨(dú)立任務(wù)的負(fù)載均衡問題。在并行計(jì)算中,通常存在大量的獨(dú)立任務(wù)需要并行執(zhí)行,而這些獨(dú)立任務(wù)的執(zhí)行時(shí)間往往不一致。如果簡單地將任務(wù)平均分配給每個(gè)線程,那些執(zhí)行時(shí)間較短的任務(wù)將會導(dǎo)致線程空閑,而執(zhí)行時(shí)間較長的任務(wù)則可能導(dǎo)致線程被阻塞,從而降低整體的執(zhí)行效率。
為了解決這個(gè)問題,工作竊取線程池引入了工作竊取算法。該算法允許空閑線程從其他線程的任務(wù)隊(duì)列末尾竊取任務(wù)來執(zhí)行,以實(shí)現(xiàn)負(fù)載均衡。每個(gè)線程都維護(hù)一個(gè)自己的任務(wù)隊(duì)列,當(dāng)線程自己的任務(wù)執(zhí)行完畢后,它會嘗試從其他線程的任務(wù)隊(duì)列末尾竊取任務(wù)執(zhí)行。這樣,任務(wù)的分配和執(zhí)行可以更加均衡,避免線程之間出現(xiàn)明顯的負(fù)載不均衡。
我們了解 newWorkStealingPool 先來了解一下Fork/Join
Fork/Join框架是Java提供的一種并行執(zhí)行任務(wù)的框架,它基于工作竊取算法實(shí)現(xiàn)任務(wù)的自動調(diào)度和負(fù)載均衡。在Fork/Join框架中,工作竊取線程池是其中的核心組件。
Fork/Join框架的工作原理如下:
- 每個(gè)任務(wù)被劃分為更小的子任務(wù),這個(gè)過程通常被稱為"fork"。
- 當(dāng)一個(gè)線程執(zhí)行"fork"操作時(shí),它會將子任務(wù)放入自己的工作隊(duì)列中。
- 當(dāng)一個(gè)線程完成自己的任務(wù)后,它會從其他線程的工作隊(duì)列中"steal"(竊取)任務(wù)來執(zhí)行。
- 竊取的任務(wù)通常是其他線程工作隊(duì)列的末尾的任務(wù),這樣可以減少線程之間的競爭。
工作竊取線程池在Fork/Join框架中的應(yīng)用主要體現(xiàn)在以下幾個(gè)方面:
- 任務(wù)分割: 在Fork/Join框架中,任務(wù)被遞歸地分割成更小的子任務(wù),直到達(dá)到某個(gè)終止條件。工作竊取線程池中的線程負(fù)責(zé)執(zhí)行這些任務(wù)。
每個(gè)線程都有自己的任務(wù)隊(duì)列
,當(dāng)一個(gè)線程執(zhí)行完自己的任務(wù)后,會從自己的隊(duì)列中獲取新的任務(wù)來執(zhí)行。 - 負(fù)載均衡: 工作竊取線程池通過
工作竊取算法實(shí)現(xiàn)負(fù)載均衡
。當(dāng)一個(gè)線程的任務(wù)隊(duì)列為空時(shí),它會從其他線程的任務(wù)隊(duì)列中竊取任務(wù)來執(zhí)行,以保持各個(gè)線程的工作量相對均衡。這種負(fù)載均衡策略可以避免線程之間出現(xiàn)明顯的負(fù)載不均衡,提高整體的執(zhí)行效率。 - 遞歸任務(wù)執(zhí)行: Fork/Join框架中的任務(wù)通常是
遞歸執(zhí)行的
。當(dāng)一個(gè)任務(wù)被分割成多個(gè)子任務(wù)時(shí),每個(gè)子任務(wù)會被提交到工作竊取線程池中執(zhí)行
。如果子任務(wù)還可以進(jìn)一步分割,線程會繼續(xù)執(zhí)行這個(gè)過程,直到任務(wù)不能再分割為止。這種遞歸的任務(wù)執(zhí)行方式能夠充分利用線程池中的線程資源,提高并行任務(wù)的執(zhí)行效率。 - Join操作: 在Fork/Join框架中,
一個(gè)任務(wù)可以等待其子任務(wù)執(zhí)行完成后再繼續(xù)執(zhí)行
,這個(gè)操作被稱為"join"。工作竊取線程池在執(zhí)行任務(wù)時(shí)會自動進(jìn)行join操作,確保任務(wù)的執(zhí)行順序滿足依賴關(guān)系。這樣可以避免線程之間的競爭和沖突,保證任務(wù)的正確性。
總的來說,工作竊取線程池在Fork/Join框架中扮演著重要的角色。它通過工作竊取算法和負(fù)載均衡策略,實(shí)現(xiàn)了并行任務(wù)的自動調(diào)度和執(zhí)行。通過遞歸任務(wù)執(zhí)行和join操作,工作竊取線程池能夠高效地處理大量的并行任務(wù),并充分利用系統(tǒng)的并行計(jì)算能力。
newWorkStealingPool
是一個(gè)創(chuàng)建工作竊取線程池的方法,它使用了ForkJoinPool,并根據(jù)CPU核心數(shù)動態(tài)調(diào)整線程數(shù)量。這種線程池適用于CPU密集型的任務(wù)。
與其他四種線程池不同,newWorkStealingPool
使用了ForkJoinPool。它的優(yōu)勢在于將一個(gè)任務(wù)拆分成多個(gè)小任務(wù),并將這些小任務(wù)分發(fā)給多個(gè)線程并行執(zhí)行。當(dāng)所有小任務(wù)都執(zhí)行完成后,再將它們的結(jié)果合并。
相較于之前的線程池,newWorkStealingPool
中的每個(gè)線程都擁有自己的任務(wù)隊(duì)列,而不是多個(gè)線程共享一個(gè)阻塞隊(duì)列。
當(dāng)一個(gè)線程發(fā)現(xiàn)自己的任務(wù)隊(duì)列為空時(shí),它會去其他線程的隊(duì)列中竊取任務(wù)來執(zhí)行。可以將這個(gè)過程簡單理解為"竊取"。為了降低沖突,一般情況下,自己的本地隊(duì)列采用后進(jìn)先出(LIFO)的順序,而竊取時(shí)則采用先進(jìn)先出(FIFO)的順序。由于竊取的動作非??焖?#xff0c;這種沖突會大大降低,從而提高了性能。這也是一種優(yōu)化方式。
下面我們通過一個(gè)案例 來了解一下 newWorkStealingPool
@Testpublic void test21() {// 返回可用的計(jì)算資源int core = Runtime.getRuntime().availableProcessors();logger.error("cpu 可以計(jì)算機(jī)資源 :{}", core);// 無參數(shù)的話,會根據(jù)cpu當(dāng)前核心數(shù)據(jù) 動態(tài)分配// ExecutorService executorService = Executors.newWorkStealingPool();// 當(dāng)傳入?yún)?shù),就可以指定cpu的一個(gè)并行數(shù)量ForkJoinPool forkJoinPool = (ForkJoinPool) Executors.newWorkStealingPool(8);for (int i = 1; i <= core * 2; i++) {WorkStealThread workStealThread = new WorkStealThread(i, forkJoinPool);forkJoinPool.submit(workStealThread);}// 優(yōu)雅關(guān)閉forkJoinPool.shutdown();// 為了防止 主線程 沒有完全執(zhí)行結(jié)束try {Thread.sleep(Integer.MAX_VALUE);} catch (InterruptedException e) {throw new RuntimeException(e);}}static class WorkStealThread implements Runnable {private int i;private ForkJoinPool forkJoinPool;WorkStealThread() {}WorkStealThread(int i, ForkJoinPool forkJoinPool) {this.i = i;this.forkJoinPool = forkJoinPool;}@Overridepublic void run() {try {// 隨機(jī)休眠Thread.sleep(TimeUnit.SECONDS.toMillis(new Random().nextInt(10)));} catch (InterruptedException e) {throw new RuntimeException(e);}logger.error("工作竊取線程-{},線程池的大小:[{}],活動線程數(shù):[{}],總?cè)蝿?wù)竊取次數(shù):[{}]",i, forkJoinPool.getPoolSize(), forkJoinPool.getActiveThreadCount(), forkJoinPool.getStealCount());}}
當(dāng)代碼運(yùn)行時(shí),首先通過Runtime.getRuntime().availableProcessors()
獲取可用的計(jì)算資源,即CPU核心數(shù),并將其記錄在日志中。
接下來,使用Executors.newWorkStealingPool(8)
創(chuàng)建一個(gè)工作竊取線程池,其中參數(shù)8表示線程池的并行度,即同時(shí)執(zhí)行的線程數(shù)。這個(gè)線程池是基于ForkJoinPool
實(shí)現(xiàn)的,具有任務(wù)竊取的特性。
然后,通過一個(gè)循環(huán)將一些工作竊取線程提交到線程池中進(jìn)行并行執(zhí)行。每個(gè)工作竊取線程都有一個(gè)編號,從1到核心數(shù)的兩倍。這些線程使用WorkStealThread
類實(shí)現(xiàn)了Runnable
接口。
WorkStealThread
類中的run
方法定義了線程的執(zhí)行邏輯。首先,線程會隨機(jī)休眠一段時(shí)間,模擬執(zhí)行一些耗時(shí)的任務(wù)。然后,它會輸出一些關(guān)于線程池狀態(tài)的信息,包括線程池的大小、活動線程數(shù)和總?cè)蝿?wù)竊取次數(shù)。這些信息會記錄在日志中。
最后,線程池會被優(yōu)雅地關(guān)閉,確保所有任務(wù)都能執(zhí)行完畢。為了防止主線程提前結(jié)束,使用Thread.sleep(Integer.MAX_VALUE)
使主線程休眠,直到被中斷或拋出異常。
通過以上代碼,我們可以了解到如何使用newWorkStealingPool
方法創(chuàng)建工作竊取線程池,并通過工作竊取線程實(shí)現(xiàn)并行執(zhí)行。同時(shí),通過獲取線程池的狀態(tài)信息,我們可以了解線程池的工作情況,包括活動線程數(shù)和任務(wù)竊取次數(shù)。
通過觀察結(jié)果,我們可以發(fā)現(xiàn),當(dāng)前線程執(zhí)行完畢,如果空閑的話,會去執(zhí)行別的線程任務(wù)