中文亚洲精品无码_熟女乱子伦免费_人人超碰人人爱国产_亚洲熟妇女综合网

當(dāng)前位置: 首頁 > news >正文

網(wǎng)站做的好的公司名稱泉州百度關(guān)鍵詞優(yōu)化

網(wǎng)站做的好的公司名稱,泉州百度關(guān)鍵詞優(yōu)化,寧波高端網(wǎng)站設(shè)計(jì)公司,wordpress底部欄如何編輯Executors快速創(chuàng)建線程池的方法 Java通過Executors 工廠提供了5種創(chuàng)建線程池的方法,具體方法如下 方法名描述newSingleThreadExecutor()創(chuàng)建一個(gè)單線程的線程池,該線程池中只有一個(gè)工作線程。所有任務(wù)按照提交的順序依次執(zhí)行,保證任務(wù)的順序性…

Executors快速創(chuàng)建線程池的方法

Java通過Executors 工廠提供了5種創(chuàng)建線程池的方法,具體方法如下

方法名描述
newSingleThreadExecutor()創(chuàng)建一個(gè)單線程的線程池,該線程池中只有一個(gè)工作線程。所有任務(wù)按照提交的順序依次執(zhí)行,保證任務(wù)的順序性。當(dāng)工作線程意外終止時(shí),會創(chuàng)建一個(gè)新的線程來替代它。適用于需要順序執(zhí)行任務(wù)且保證任務(wù)安全性的場景。
newFixedThreadPool(int nThreads)創(chuàng)建一個(gè)固定大小的線程池,該線程池中的線程數(shù)量固定為指定的數(shù)量。當(dāng)有新任務(wù)提交時(shí),如果線程池中有空閑線程,則立即使用空閑線程執(zhí)行任務(wù);如果沒有空閑線程,則任務(wù)將被放入任務(wù)隊(duì)列等待執(zhí)行。
newCachedThreadPool()創(chuàng)建一個(gè)緩存線程池,該線程池中的線程數(shù)量不固定,可以根據(jù)任務(wù)的需求動態(tài)調(diào)整線程數(shù)量??臻e線程會被保留一段時(shí)間,如果在保留時(shí)間內(nèi)沒有任務(wù)執(zhí)行,則這些線程將被終止并從線程池中刪除。適用于執(zhí)行大量短期任務(wù)的場景
newScheduledThreadPool(int corePoolSize)創(chuàng)建一個(gè)可調(diào)度的線程池,該線程池能夠按照一定的調(diào)度策略執(zhí)行任務(wù)。除了執(zhí)行任務(wù)外,還可以按照指定的延遲時(shí)間或周期性地執(zhí)行任務(wù)。適用于需要按照計(jì)劃執(zhí)行任務(wù)、定時(shí)任務(wù)或周期性任務(wù)的場景。
newWorkStealingPool(int parallelism) newWorkStealingPool(int parallelism)方法用于創(chuàng)建一個(gè)工作竊取線程池。工作竊取線程池是一種特殊的線程池,它根據(jù)一定的調(diào)度策略執(zhí)行任務(wù)。除了執(zhí)行任務(wù)外,工作竊取線程池還可以按照指定的延遲時(shí)間或周期性地執(zhí)行任務(wù)。

ThreadFactory

在學(xué)習(xí)Executor創(chuàng)建線程池之前,我們先來學(xué)習(xí)一下

ThreadFactory是一個(gè)接口,用于創(chuàng)建線程對象的工廠。它定義了一個(gè)方法newThread,用于創(chuàng)建新的線程。

在Java中,線程的創(chuàng)建通常通過Thread類的構(gòu)造函數(shù)進(jìn)行,但是使用ThreadFactory可以將線程的創(chuàng)建過程與線程的執(zhí)行邏輯分離開來。通過自定義的ThreadFactory,我們可以對線程進(jìn)行更加靈活的配置和管理,例如指定線程名稱、設(shè)置線程優(yōu)先級、設(shè)置線程是否為守護(hù)線程等。

ThreadFactory接口只有一個(gè)方法:

Thread newThread(Runnable runnable);

該方法接受一個(gè)Runnable對象作為參數(shù),并返回一個(gè)新的Thread對象。

一般情況下,我們可以通過實(shí)現(xiàn)ThreadFactory接口來自定義線程的創(chuàng)建。以下是一個(gè)示例的自定義ThreadFactory實(shí)現(xiàn):

public class MyThreadFactory implements ThreadFactory {// 自定義線程的名稱private final String namePrefix = "test-async-thread";private final AtomicInteger threadNumber = new AtomicInteger(1);public MyThreadFactory(String namePrefix) {this.namePrefix = namePrefix;}@Overridepublic Thread newThread(Runnable runnable) {Thread thread = new Thread(runnable);thread.setName(namePrefix + "-" + threadNumber.getAndIncrement());thread.setPriority(Thread.NORM_PRIORITY);thread.setDaemon(false);return thread;}
}

在上面的示例中,MyThreadFactory實(shí)現(xiàn)了ThreadFactory接口,并通過構(gòu)造函數(shù)傳入一個(gè)namePrefix參數(shù),用于指定線程的名稱前綴。

newThread方法中,首先創(chuàng)建一個(gè)新的Thread對象,并設(shè)置線程的名稱為namePrefix加上一個(gè)遞增的數(shù)字。然后,可以根據(jù)需要設(shè)置線程的優(yōu)先級、是否為守護(hù)線程等屬性。

newSingleThreadExecutor() 創(chuàng)建單線程化線程池

該方法用于創(chuàng)建一個(gè)單線程化的線程池,也就是只有一個(gè)線程的線程池。該線程池中只有一個(gè)工作線程,它負(fù)責(zé)按照任務(wù)的提交順序依次執(zhí)行任務(wù)。當(dāng)有任務(wù)提交時(shí),會創(chuàng)建一個(gè)新的線程來執(zhí)行任務(wù)。如果工作線程意外終止,線程池會創(chuàng)建一個(gè)新的線程來替代它,確保線程池中始終有一個(gè)可用的線程。

newSingleThreadExecutor()方法返回的線程池實(shí)例實(shí)現(xiàn)了ExecutorService接口,因此可以使用submit()方法提交任務(wù)并獲取Future對象,或使用execute()方法提交任務(wù)。

該線程池適用于需要順序執(zhí)行任務(wù)且保證任務(wù)之間不會發(fā)生并發(fā)沖突的場景。由于只有一個(gè)工作線程,所以不存在線程間的競爭和并發(fā)問題,可以確保任務(wù)的安全性。

此外,newSingleThreadExecutor()方法創(chuàng)建的線程池還可以用于任務(wù)的異常處理。當(dāng)任務(wù)拋出異常時(shí),線程池會捕獲異常并記錄或處理異常,避免異常導(dǎo)致整個(gè)應(yīng)用程序崩潰。

需要注意的是,由于該線程池只有一個(gè)線程,如果任務(wù)執(zhí)行時(shí)間過長任務(wù)量過大可能會導(dǎo)致任務(wù)隊(duì)列堆積,造成應(yīng)用程序的性能問題。所以在使用該線程池時(shí),需要根據(jù)任務(wù)的特性和需求進(jìn)行適當(dāng)?shù)脑u估和調(diào)優(yōu)。

下面我們使用Executors中newSingleThreadExecutor()方法創(chuàng)建一個(gè)單線程線程池

	/*** 這里創(chuàng)建的線程是 是Executors.newSingleThreadExecutor() 一樣 保證只有一個(gè)線程來進(jìn)行執(zhí)行 并且按照提交的順序進(jìn)行執(zhí)行* <pre>*     {@code*  public static ExecutorService newSingleThreadExecutor() {* 			return new Executors.FinalizableDelegatedExecutorService (* 		    	new ThreadPoolExecutor(1, 1, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>()));*          }*     }* </pre>*/@Testpublic void test10() {// 如果有多個(gè)任務(wù)提交到線程池中,那么這個(gè)線程池中的線程會依次執(zhí)行任務(wù) 和ExecutorService executorService = Executors.newSingleThreadExecutor();// 創(chuàng)建線程 1Thread t1 = new Thread(() -> {logger.error("t1 ----> 開始執(zhí)行了~");}, "t1");// 創(chuàng)建線程 2Thread t2 = new Thread(() -> {logger.error("t2 ----> 開始執(zhí)行了~");}, "t2");// 創(chuàng)建線程 3Thread t3 = new Thread(() -> {logger.error("t3 ----> 開始執(zhí)行了~");}, "t3");// Executor 這個(gè)接口定義的功能很有限,同時(shí)也只支持 Runnale 形式的異步任務(wù)// 向線程池提交任務(wù)executorService.submit(t1);executorService.submit(t2);executorService.submit(t3);// 關(guān)閉線程池executorService.shutdown();}

首先,通過Executors.newSingleThreadExecutor();創(chuàng)建了一個(gè)單線程化的線程池。

然后,創(chuàng)建了三個(gè)線程t1、t2和t3,分別用于執(zhí)行不同的任務(wù)。

接著,通過executorService.submit(t1)將t1線程提交到線程池中進(jìn)行執(zhí)行。同樣地,也將t2和t3線程提交到線程池中。

最后,通過executorService.shutdown()關(guān)閉線程池。

執(zhí)行時(shí),可以觀察到日志輸出的順序。由于線程池中只有一個(gè)線程,所以任務(wù)會依次按照提交的順序進(jìn)行執(zhí)行。

需要注意的是,通過線程池執(zhí)行任務(wù)后,線程的名稱不再是我們自定義的線程名稱,而是線程池的名稱(如pool-2-thread-1)。這是因?yàn)榫唧w的執(zhí)行任務(wù)是交給線程池來管理和執(zhí)行的。

從輸出中我們可以看出,該線程池有以下特點(diǎn):

  • 單線程化的線程池中的任務(wù),都是按照提交的順序來進(jìn)行執(zhí)行的。

  • 該線程池中的唯一線程存活時(shí)間是無限的

  • 當(dāng)線程池中唯一的線程正在繁忙時(shí),新提交的任務(wù)會進(jìn)入到其內(nèi)部的阻塞隊(duì)列中,而且阻塞隊(duì)列的容量是無限的

  • // 這是   newSingleThreadExecutor 一個(gè)無參的構(gòu)造方法  
    public static ExecutorService newSingleThreadExecutor() {// 創(chuàng)建一個(gè)FinalizableDelegatedExecutorService實(shí)例,該實(shí)例是ExecutorService接口的一個(gè)包裝類// 將上面創(chuàng)建的ThreadPoolExecutor實(shí)例作為參數(shù)傳入// 這樣就得到了一個(gè)單線程化的線程池return new FinalizableDelegatedExecutorService(   // 創(chuàng)建一個(gè)ThreadPoolExecutor實(shí)例,指定參數(shù)如下:// corePoolSize: 1,線程池中核心線程的數(shù)量為1// maximumPoolSize: 1,線程池中最大線程的數(shù)量為1// keepAliveTime: 0L,空閑線程的存活時(shí)間為0毫秒,即空閑線程立即被回收// unit: TimeUnit.MILLISECONDS,存活時(shí)間的時(shí)間單位是毫秒// workQueue: new LinkedBlockingQueue<Runnable>(),使用無界阻塞隊(duì)列作為任務(wù)隊(duì)列new ThreadPoolExecutor(1, 1,0L, TimeUnit.MILLISECONDS,new LinkedBlockingQueue<Runnable>()));}// 其中 阻塞隊(duì)列大小,如果不傳容量,默認(rèn)是整形的最大值
    public LinkedBlockingQueue() {this(Integer.MAX_VALUE);
    }
    
  • 總體來說:ewSingleThreadExecutor();適用于按照任務(wù)提交次序,一個(gè)接一個(gè)的執(zhí)行場景

image-20240313074456452

newFixedThreadPool 創(chuàng)建固定數(shù)量的線程池

該方法用于創(chuàng)建一個(gè)固定數(shù)量的線程池,其中唯一的參數(shù)是用于設(shè)置線程池中線程的數(shù)量

newFixedThreadPoolExecutors類提供的一個(gè)靜態(tài)方法,用于創(chuàng)建一個(gè)固定大小的線程池。

方法具體如下

public static ExecutorService newFixedThreadPool(int nThreads)

參數(shù)nThreads表示線程池中的線程數(shù)量,即固定的線程數(shù)量。線程池中的線程數(shù)不會根據(jù)任務(wù)的多少進(jìn)行動態(tài)調(diào)整,即使有空閑線程也不會銷毀,除非調(diào)用了線程池的shutdown方法。

newFixedThreadPool方法返回一個(gè)ExecutorService對象,它是Executor接口的子接口,提供了更加豐富的任務(wù)提交和管理方法。

通過創(chuàng)建固定大小的線程池,可以在任務(wù)并發(fā)量較高且預(yù)期的任務(wù)數(shù)量固定的情況下,提供一定程度的線程復(fù)用和線程調(diào)度控制。線程池會根據(jù)固定的線程數(shù)量來創(chuàng)建對應(yīng)數(shù)量的線程,并將任務(wù)分配給這些線程進(jìn)行執(zhí)行。

線程池的工作原理如下:

  • 當(dāng)有任務(wù)提交到線程池時(shí),線程池中的某個(gè)線程會被喚醒來執(zhí)行任務(wù)。
  • 如果所有線程都在執(zhí)行任務(wù),新的任務(wù)會被放入一個(gè)任務(wù)隊(duì)列中等待執(zhí)行。
  • 當(dāng)任務(wù)隊(duì)列已滿時(shí),線程池會根據(jù)配置的拒絕策略來處理無法執(zhí)行的任務(wù)。

需要注意的是,由于線程池的大小是固定的,如果任務(wù)數(shù)量超過線程池的容量,任務(wù)會在任務(wù)隊(duì)列中等待執(zhí)行。這可能會導(dǎo)致任務(wù)等待時(shí)間增加或任務(wù)堆積,進(jìn)而影響系統(tǒng)的響應(yīng)性能。因此,在選擇線程池大小時(shí),需要根據(jù)系統(tǒng)的負(fù)載情況和任務(wù)特點(diǎn)進(jìn)行合理的配置。

下面我們通過代碼來了解一下 newFixedThreadPool

@Testpublic void test18() {ThreadPoolExecutor executor = (ThreadPoolExecutor) Executors.newFixedThreadPool(2);executor.submit(new Runnable() {@Overridepublic void run() {// 隨機(jī)睡覺timeSleep();printPoolInfo(executor, "*[耗時(shí)線程-1]");}});executor.submit(new Runnable() {@Overridepublic void run() {// 隨機(jī)睡覺timeSleep();printPoolInfo(executor, "*[耗時(shí)線程-2]");}});executor.submit(new Runnable() {@Overridepublic void run() {timeSleep();printPoolInfo(executor, "*[耗時(shí)線程-3]");}});for (int i = 0; i < 5; i++) {int finalI = i;executor.submit(new Runnable() {@Overridepublic void run() {printPoolInfo(executor, "普通線程");}});}// 優(yōu)雅關(guān)閉線程池executor.shutdown();waitPoolExecutedEnd(executor);}// 這個(gè)方法用于等待線程全部執(zhí)行結(jié)束public void waitPoolExecutedEnd(ThreadPoolExecutor executor) {// 確保主線程完全等待子線程執(zhí)行完畢try {// 等待線程池中的任務(wù)執(zhí)行完畢,最多等待1天if (!executor.awaitTermination(5, TimeUnit.MINUTES)) {logger.error("線程池中的任務(wù)還未全部執(zhí)行完畢~");}} catch (InterruptedException e) {logger.error("等待線程池中的任務(wù)被中斷~");}}

這段代碼創(chuàng)建了一個(gè)固定大小為2的線程池 executor,并向其中提交了多個(gè)任務(wù)。其中:

  1. 通過 Executors.newFixedThreadPool(2) 創(chuàng)建了一個(gè)固定大小為2的線程池。
  2. 向線程池中提交了多個(gè)任務(wù),包括兩個(gè)耗時(shí)任務(wù)和五個(gè)普通任務(wù)。
  3. 每個(gè)耗時(shí)任務(wù)都會執(zhí)行 timeSleep() 方法進(jìn)行一段隨機(jī)時(shí)間的睡眠,然后執(zhí)行 printPoolInfo() 方法輸出當(dāng)前線程池的信息。
  4. 普通任務(wù)直接執(zhí)行 printPoolInfo() 方法輸出當(dāng)前線程池的信息。
  5. 在所有任務(wù)提交完畢后,調(diào)用 executor.shutdown() 方法優(yōu)雅地關(guān)閉線程池,并調(diào)用 waitPoolExecutedEnd(executor) 方法等待線程池中的任務(wù)執(zhí)行完畢。

image-20240313210333249

根據(jù)輸出的結(jié)果 我們可以觀察到

  • 當(dāng)?shù)谝粋€(gè)耗時(shí)任務(wù)執(zhí)行時(shí),線程池中有2個(gè)核心線程在工作。
  • 當(dāng)?shù)谌齻€(gè)任務(wù)執(zhí)行時(shí),線程池中的任務(wù)數(shù)量已經(jīng)達(dá)到5個(gè),但是活躍線程數(shù)量仍然為2,說明任務(wù)正在等待空閑線程來執(zhí)行。
  • 所有任務(wù)執(zhí)行完成后,線程池中的線程數(shù)量仍然保持為2,這是因?yàn)榫€程池是一個(gè)固定大小的線程池。
  • 在任務(wù)執(zhí)行的過程中,線程池的核心線程數(shù)量一直為2,線程數(shù)量也一直保持在2個(gè),因?yàn)榫€程池的大小是固定的。

固定大小線程池適用于以下場景:

  1. 資源受限場景:當(dāng)系統(tǒng)資源受限,無法創(chuàng)建過多線程時(shí),固定大小線程池能夠限制線程數(shù)量,防止系統(tǒng)資源被耗盡。
  2. 穩(wěn)定的任務(wù)處理:適用于穩(wěn)定的任務(wù)處理場景,例如批量數(shù)據(jù)處理、定時(shí)任務(wù)等,因?yàn)楣潭ù笮【€程池能夠保持一定數(shù)量的線程長時(shí)間運(yùn)行,避免線程的頻繁創(chuàng)建和銷毀。
  3. 控制并發(fā)數(shù)量:在需要限制并發(fā)數(shù)量的場景下,固定大小線程池能夠控制同時(shí)執(zhí)行的任務(wù)數(shù)量,防止系統(tǒng)過載。
  4. 避免資源競爭:固定大小線程池可以避免多個(gè)任務(wù)爭奪系統(tǒng)資源而導(dǎo)致的競爭和性能下降。
  5. 穩(wěn)定的性能預(yù)測:在需要穩(wěn)定的性能預(yù)測下,固定大小線程池能夠提供一致的性能表現(xiàn),因?yàn)榫€程數(shù)量是固定的,可以更好地進(jìn)行性能測試和預(yù)測。

newCachedThreadPool 創(chuàng)建可以緩存的線程池

newCachedThreadPool 用于創(chuàng)建一個(gè)可以緩存的線程池,如果線程內(nèi)的某些線程無事可干,那么就會成為空線程,可緩存線程池,可以靈活回收這些空閑線程

newCachedThreadPoolExecutors類提供的一個(gè)靜態(tài)方法,用于創(chuàng)建一個(gè)緩存型線程池。

方法定義如下

public static ExecutorService newCachedThreadPool()

newCachedThreadPool方法返回一個(gè)ExecutorService對象,它是Executor接口的子接口,提供了更加豐富的任務(wù)提交和管理方法。

緩存型線程池會根據(jù)需要自動創(chuàng)建和回收線程,線程池的大小可以根據(jù)任務(wù)的數(shù)量自動調(diào)整。如果當(dāng)前沒有可用的空閑線程,會創(chuàng)建新的線程來執(zhí)行任務(wù);如果有空閑線程并且它們在指定的時(shí)間內(nèi)沒有執(zhí)行任務(wù),那么這些空閑線程將會被回收。

使用緩存型線程池的優(yōu)點(diǎn)是可以根據(jù)任務(wù)的數(shù)量動態(tài)調(diào)整線程池的大小,以適應(yīng)不同的負(fù)載情況。當(dāng)任務(wù)數(shù)量較少時(shí),線程池會減少線程的數(shù)量以節(jié)省資源;當(dāng)任務(wù)數(shù)量增加時(shí),線程池會增加線程的數(shù)量以提高并發(fā)性。

需要注意的是,由于緩存型線程池的大小是不限制的,它可能會創(chuàng)建大量的線程,如果任務(wù)的提交速度超過了線程執(zhí)行任務(wù)的速度,可能會導(dǎo)致系統(tǒng)資源消耗過多,甚至造成系統(tǒng)崩潰。因此,在使用緩存型線程池時(shí),需要根據(jù)任務(wù)特點(diǎn)和系統(tǒng)資源情況進(jìn)行合理的配置。

下面我們通過一個(gè)案例,來了解一下newCachedThreadPool,但是了解newCachedThreadPool之前,我們先來熟悉一個(gè)阻塞隊(duì)列,這個(gè)會在后面的阻塞隊(duì)列專題中詳細(xì)介紹,這里只是作為了解

SynchronousQueue
  1. 無內(nèi)部存儲容量:與其他阻塞隊(duì)列不同,SynchronousQueue 不存儲元素,其容量為零。換句話說,它是一個(gè)零容量的隊(duì)列,用于在線程之間同步傳輸數(shù)據(jù)。
  2. 阻塞隊(duì)列:作為 BlockingQueue 接口的一個(gè)實(shí)現(xiàn),SynchronousQueue 提供了阻塞操作,允許線程在隊(duì)列的插入和移除操作上進(jìn)行阻塞等待。
  3. 匹配插入和移除操作:在 SynchronousQueue 中,每個(gè)插入操作必須等待另一個(gè)線程的移除操作,反之亦然。換句話說,發(fā)送線程必須等待接收線程,而接收線程也必須等待發(fā)送線程,才能夠完成操作。這樣的特性保證了數(shù)據(jù)的可靠傳輸,只有在有線程與之匹配時(shí),才會進(jìn)行數(shù)據(jù)傳輸。
  4. 不支持 peek 操作:由于 SynchronousQueue 內(nèi)部沒有存儲元素,因此不能調(diào)用 peek 操作。只有在移除元素時(shí)才會有元素可供操作。
  5. 支持公平和非公平模式:SynchronousQueue 可以在構(gòu)造時(shí)指定為公平或非公平模式。在公平模式下,隊(duì)列會按照線程的到達(dá)順序進(jìn)行操作;而在非公平模式下,則不保證操作的順序。

SynchronousQueue 在多線程并發(fā)編程中常用于一些特定場景,例如生產(chǎn)者-消費(fèi)者模式中,用于傳輸數(shù)據(jù)的場景,以及一些任務(wù)執(zhí)行器中用于任務(wù)的傳遞等。其特殊的同步機(jī)制保證了線程之間數(shù)據(jù)的可靠傳輸和同步操作。

基于SynchronousQueue 一個(gè)小案例

	/*** 理解SynchronousQueue* SynchronousQueue,實(shí)際上它不是一個(gè)真正的隊(duì)列,因?yàn)镾ynchronousQueue沒有容量。與其他BlockingQueue(阻塞隊(duì)列)不同,* SynchronousQueue是一個(gè)不存儲元素的BlockingQueue。只是它維護(hù)一組線程,這些線程在等待著把元素加入或移出隊(duì)列。* 我們簡單分為以下幾種特點(diǎn):* 內(nèi)部沒有存儲(容量為0)* 阻塞隊(duì)列(也是blockingqueue的一個(gè)實(shí)現(xiàn))* 發(fā)送或者消費(fèi)線程會阻塞,只有有一對消費(fèi)和發(fā)送線程匹配上,才同時(shí)退出。* (其中每個(gè)插入操作必須等待另一個(gè)線程的移除操作,同樣任何一個(gè)移除操作都等待另一個(gè)線程的插入操作。因此此隊(duì)列內(nèi)部其 實(shí)沒有任何一個(gè)元素,因此不能調(diào)用peek操作,因?yàn)橹挥幸瞥貢r(shí)才有元素。)* 配對有公平模式和非公平模式(默認(rèn))*/@Testpublic void test19() throws InterruptedException {SynchronousQueue<String> queue = new SynchronousQueue<>();// 我們通過線程內(nèi) 入隊(duì) 和 出隊(duì) 了解下 SynchronousQueue的特性new Thread(new Runnable() {@Overridepublic void run() {try {// 入隊(duì)logger.error("---- 喜羊羊進(jìn)鍋,沐浴~ ----");queue.put("喜羊羊!");logger.error("---- 懶羊羊進(jìn)鍋,沐浴~ ----");queue.put("懶羊羊!");logger.error("---- 美羊羊進(jìn)鍋,沐浴~ ----");queue.put("美羊羊!");} catch (InterruptedException e) {throw new RuntimeException(e);}}}, "t1").start();new Thread(new Runnable() {@Overridepublic void run() {try {// 入隊(duì)Thread.sleep(5000);String node1 = queue.take();logger.error("---- {} 出鍋~ ----", node1);Thread.sleep(10000);String node2 = queue.take();logger.error("---- {} 出鍋~ ----", node2);Thread.sleep(5000);String node3 = queue.take();logger.error("---- {} 出鍋~ ----", node3);} catch (InterruptedException e) {throw new RuntimeException(e);}}}, "t2").start();Thread.sleep(Integer.MAX_VALUE);}

image-20240313211349801

	// 根據(jù) 結(jié)果可以發(fā)現(xiàn), 三只羊同時(shí)進(jìn)鍋,但是一個(gè)鍋只能容納一只羊,所以只有一只羊能進(jìn)鍋,其他的羊只能等待,直到鍋里的羊出鍋,才能進(jìn)鍋// 也就是說,SynchronousQueue是一個(gè)不存儲元素的BlockingQueue。只是它維護(hù)一組線程,這些線程在等待著把元素加入或移出隊(duì)列。// 喜羊羊進(jìn)鍋,然后等待 5s后   喜羊羊出鍋,此時(shí)美羊羊開始進(jìn)鍋

了解這個(gè)阻塞隊(duì)列后,我們再來了解一下newCachedThreadPool這個(gè)線程池,還是通過一個(gè)案例來進(jìn)行了解一下具體用法

	/*** newCachedThreadPool 創(chuàng)建可以緩存的線程池*/@Testpublic void test17() {// 創(chuàng)建可以緩存的線程池// 創(chuàng)建一個(gè)可緩存線程池,如果線程池長度超過處理需要,可靈活回收空閑線程,若無可回收,則新建線程。// keepAliveTime為60S,意味著線程空閑時(shí)間超過60S就會被殺死(60L)ThreadPoolExecutor threadPoolExecutor = (ThreadPoolExecutor) Executors.newCachedThreadPool();// 這里使用兩個(gè)線程,異步執(zhí)行// 此處 羊羊線程是不休眠的,直接放入線程池new Thread(() -> {for (int i = 0; i < 2; i++) {int finalI = i;threadPoolExecutor.submit(() -> {printPoolInfo(threadPoolExecutor, "羊羊線程" + finalI + "-正在執(zhí)行");});}}, "任務(wù)線程-1").start();// 狼狼線程 此時(shí)第一個(gè)線程也是不會進(jìn)行阻塞,此時(shí)應(yīng)該是  三個(gè)線程 (兩個(gè)羊羊+一個(gè)狼狼 同時(shí)進(jìn)線程池)new Thread(() -> {for (int i = 0; i < 5; i++) {if (i == 1 || i == 2) {try {Thread.sleep(TimeUnit.SECONDS.toMillis(5));} catch (InterruptedException e) {throw new RuntimeException(e);}}int finalI = i;threadPoolExecutor.submit(() -> {printPoolInfo(threadPoolExecutor, "狼狼線程" + finalI + "-正在執(zhí)行");});}}, "任務(wù)線程-2").start();// 等待全部線程都執(zhí)行完畢waitPoolExecutedEnd(threadPoolExecutor);threadPoolExecutor.shutdown();}

這段代碼的主要邏輯如下

  1. 使用 Executors.newCachedThreadPool() 創(chuàng)建了一個(gè)可緩存的線程池 threadPoolExecutor。這種線程池的特點(diǎn)是,如果線程池長度超過當(dāng)前任務(wù)需求,它會靈活地回收空閑線程;若沒有可回收的線程,則會新建線程來處理任務(wù)。線程的空閑時(shí)間超過60秒后,就會被回收。
  2. 創(chuàng)建了兩個(gè)新的線程,分別用于提交任務(wù)到線程池中執(zhí)行。其中,一個(gè)線程負(fù)責(zé)提交羊羊線程,另一個(gè)線程負(fù)責(zé)提交狼狼線程。
  3. 羊羊線程的任務(wù)不會進(jìn)行阻塞,直接提交到線程池中執(zhí)行。
  4. 狼狼線程的任務(wù)中,當(dāng) i 等于1或2時(shí),會進(jìn)行5秒的睡眠,模擬任務(wù)的耗時(shí)操作,然后再提交到線程池中執(zhí)行。
  5. 使用 waitPoolExecutedEnd(threadPoolExecutor) 方法等待線程池中的任務(wù)執(zhí)行完畢。
  6. 在所有任務(wù)執(zhí)行完畢后,調(diào)用 threadPoolExecutor.shutdown() 方法關(guān)閉線程池。

image-20240313211756425

  1. 通過結(jié)果可以看到每個(gè)任務(wù)的執(zhí)行都是間隔5秒執(zhí)行一次。
  2. 線程池信息中的線程數(shù)量始終為3,這是因?yàn)?Executors.newCachedThreadPool() 創(chuàng)建的是一個(gè)可緩存的線程池,其最大線程數(shù)量為 Integer.MAX_VALUE,因此當(dāng)任務(wù)提交到線程池時(shí),如果沒有空閑線程可用,則會新建線程來處理任務(wù),線程數(shù)量會一直增加,直到達(dá)到設(shè)定的最大值。
  3. 在任務(wù)提交時(shí),活躍線程數(shù)量始終為3,這是因?yàn)槊看翁峤蝗蝿?wù)時(shí),都有空閑線程可用,所以不需要新建線程,而是直接使用已存在的線程執(zhí)行任務(wù)。
  4. 狼狼線程的任務(wù)會進(jìn)行5秒的睡眠操作,模擬耗時(shí)操作,因此在執(zhí)行任務(wù)期間,線程池中的活躍線程數(shù)量會減少,直到任務(wù)執(zhí)行完畢后,線程池會繼續(xù)維持3個(gè)活躍線程數(shù)量。
  5. 在執(zhí)行完所有任務(wù)后,線程池并不會立即關(guān)閉,因?yàn)榫€程池是可緩存的,會等待一段時(shí)間后空閑線程自動被回收。

應(yīng)用場景:

  1. 短期任務(wù)處理:適用于處理大量短期任務(wù)的場景,因?yàn)樗軌蚋鶕?jù)需要?jiǎng)討B(tài)地創(chuàng)建線程,處理任務(wù),處理完畢后又自動回收線程,避免了線程過多占用資源的問題。
  2. 任務(wù)處理時(shí)間不確定:適用于任務(wù)處理時(shí)間不確定的場景,因?yàn)樗軌蚋鶕?jù)實(shí)際情況動態(tài)調(diào)整線程數(shù)量,保證任務(wù)能夠及時(shí)得到處理,提高系統(tǒng)的響應(yīng)速度。
  3. 需要快速響應(yīng)的任務(wù):適用于需要快速響應(yīng)的任務(wù),因?yàn)樗軌蚩焖俚貏?chuàng)建線程來處理任務(wù),縮短任務(wù)等待的時(shí)間,提高任務(wù)的處理效率。
  4. 任務(wù)負(fù)載波動大:適用于任務(wù)負(fù)載波動大的場景,因?yàn)樗軌蚋鶕?jù)負(fù)載情況動態(tài)調(diào)整線程數(shù)量,使系統(tǒng)能夠更好地適應(yīng)負(fù)載的變化。

缺點(diǎn):

  1. 線程數(shù)量不受限制:由于 newCachedThreadPool 的最大線程數(shù)量為 Integer.MAX_VALUE,因此在大量任務(wù)提交的情況下,可能會導(dǎo)致線程數(shù)量過多,占用大量系統(tǒng)資源,導(dǎo)致系統(tǒng)負(fù)載過高,甚至引發(fā)系統(tǒng)崩潰。
  2. 不適用于長時(shí)間任務(wù):由于它的線程數(shù)量不受限制,適用于處理短期任務(wù),但不適用于長時(shí)間任務(wù),因?yàn)殚L時(shí)間任務(wù)可能會導(dǎo)致線程數(shù)量過多,占用大量系統(tǒng)資源。
  3. 可能導(dǎo)致頻繁創(chuàng)建和銷毀線程:由于 newCachedThreadPool 是一個(gè)動態(tài)的線程池類型,可能會頻繁地創(chuàng)建和銷毀線程,這種線程的創(chuàng)建和銷毀操作會帶來一定的性能開銷。

綜上所述,newCachedThreadPool 適用于任務(wù)處理時(shí)間不確定、負(fù)載波動大、需要快速響應(yīng)的場景,但在大量長時(shí)間任務(wù)的情況下,需要慎重選擇以避免占用過多系統(tǒng)資源。

newScheduledThreadPool 創(chuàng)建可調(diào)度的線程池

項(xiàng)目中經(jīng)常會遇到一些非分布式的調(diào)度任務(wù),需要在未來的某個(gè)時(shí)刻周期性執(zhí)行。實(shí)現(xiàn)這樣的功能,我們有多種方式可以選擇:

  1. Timer類, jdk1.3引入,不推薦
    1. 它所有任務(wù)都是串行執(zhí)行的,同一時(shí)間只能有一個(gè)任務(wù)在執(zhí)行,而且前一個(gè)任務(wù)的延遲或異常都將會影響到之后的任務(wù)。
  2. Spring的@Scheduled注解,不是很推薦
    1. 這種方式底層雖然是用線程池實(shí)現(xiàn),但是有個(gè)最大的問題,所有的任務(wù)都使用的同一個(gè)線程池,可能會導(dǎo)致長周期的任務(wù)運(yùn)行影響短周期任務(wù)運(yùn)行,造成線程池"饑餓",更加推薦的做法是同種類型的任務(wù)使用同一個(gè)線程池。
  3. 自定義ScheduledThreadPoolExecutor實(shí)現(xiàn)調(diào)度任務(wù)
    這也是下面重點(diǎn)講解的方式,通過自定義ScheduledThreadPoolExecutor調(diào)度線程池,提交調(diào)度任務(wù)才是最優(yōu)解。

newScheduledThreadPool 用于創(chuàng)建一個(gè)可調(diào)度的線程newScheduledThreadPoolExecutors類提供的一個(gè)靜態(tài)方法

方法如下

public static ScheduledExecutorService newScheduledThreadPool(int corePoolSize)

newScheduledThreadPool方法返回一個(gè)ScheduledExecutorService對象,它是ExecutorService接口的子接口,提供了任務(wù)調(diào)度的能力。

可調(diào)度的線程池可以用于延時(shí)執(zhí)行任務(wù)和周期性執(zhí)行任務(wù)。它可以根據(jù)需要自動創(chuàng)建和回收線程,并且可以在指定的延時(shí)時(shí)間后執(zhí)行任務(wù),或在指定的時(shí)間間隔內(nèi)重復(fù)執(zhí)行任務(wù)。

使用newScheduledThreadPool創(chuàng)建的可調(diào)度線程池有以下特點(diǎn):

  • corePoolSize參數(shù)指定了線程池的核心線程數(shù),即線程池中同時(shí)執(zhí)行任務(wù)的最大線程數(shù)。
  • 當(dāng)任務(wù)的延時(shí)時(shí)間到達(dá)時(shí),線程池會創(chuàng)建新的線程來執(zhí)行任務(wù)。
  • 如果線程池中的線程數(shù)量超過核心線程數(shù),空閑的線程會在指定的時(shí)間內(nèi)被回收。
  • 可以使用schedule方法來延時(shí)執(zhí)行任務(wù),也可以使用scheduleAtFixedRate方法或scheduleWithFixedDelay方法來周期性執(zhí)行任務(wù)。

使用示例:

ScheduledExecutorService executor = Executors.newScheduledThreadPool(3);
executor.schedule(() -> {// 延時(shí)執(zhí)行的任務(wù)邏輯
}, 5, TimeUnit.SECONDS);executor.scheduleAtFixedRate(() -> {// 周期性執(zhí)行的任務(wù)邏輯
}, 1, 3, TimeUnit.SECONDS);executor.scheduleWithFixedDelay(() -> {// 周期性執(zhí)行的任務(wù)邏輯
}, 2, 4, TimeUnit.SECONDS);

在示例中,schedule方法用于延時(shí)執(zhí)行任務(wù),它接受一個(gè)任務(wù)和延時(shí)時(shí)間,表示在指定的延時(shí)時(shí)間后執(zhí)行任務(wù)。

scheduleAtFixedRate方法用于周期性執(zhí)行任務(wù),它接受一個(gè)任務(wù)、初始延時(shí)時(shí)間和周期時(shí)間,表示在初始延時(shí)時(shí)間后開始執(zhí)行任務(wù),并以指定的周期時(shí)間重復(fù)執(zhí)行任務(wù)。

scheduleWithFixedDelay方法也用于周期性執(zhí)行任務(wù),它接受一個(gè)任務(wù)、初始延時(shí)時(shí)間和周期時(shí)間,表示在初始延時(shí)時(shí)間后開始執(zhí)行任務(wù),并在任務(wù)執(zhí)行完成后等待指定的周期時(shí)間,然后再執(zhí)行下一個(gè)任務(wù)。

總之,newScheduledThreadPool方法用于創(chuàng)建一個(gè)可調(diào)度的線程池,可以用于延時(shí)執(zhí)行任務(wù)和周期性執(zhí)行任務(wù)。通過合理配置延時(shí)時(shí)間和周期時(shí)間,可以滿足不同場景下的任務(wù)調(diào)度需求。

下面我們通過一個(gè)案例來了解如何創(chuàng)建延時(shí)線程 和 定時(shí)線程

@Testpublic void test20() {// 使用Executors.newScheduledThreadPool 創(chuàng)建線程池ScheduledThreadPoolExecutor scheduledThreadPool = (ScheduledThreadPoolExecutor) Executors.newScheduledThreadPool(5, new ThreadFactory() {@Overridepublic Thread newThread(Runnable r) {// 對線程名稱進(jìn)行自定義return new Thread(r, "my-scheduled-job-" + r.hashCode());}});// 準(zhǔn)備工作DelayedThread delayedThread = new DelayedThread();LoopThread loopThread = new LoopThread();// 執(zhí)行延時(shí)線程 (延時(shí) 10s開始執(zhí)行 )logger.error("延時(shí)線程工作準(zhǔn)備結(jié)束!");scheduledThreadPool.schedule(delayedThread, 10, TimeUnit.SECONDS);// 執(zhí)行循環(huán)線程// scheduleAtFixedRate(Runnable command,long initialDelay,long period,TimeUnit unit)// command: 執(zhí)行的任務(wù)// initialDelay: 初始延遲的時(shí)間// delay: 上次執(zhí)行結(jié)束,延遲多久執(zhí)行// unit:單位logger.error("循環(huán)線程工作準(zhǔn)備結(jié)束!");scheduledThreadPool.scheduleAtFixedRate(loopThread, 10, 5, TimeUnit.SECONDS);waitPoolExecutedEnd(scheduledThreadPool);scheduledThreadPool.shutdown();}static class DelayedThread implements Runnable {@Overridepublic void run() {logger.error("延時(shí)線程正在開始執(zhí)行~~");}}static class LoopThread implements Runnable {@Overridepublic void run() {logger.error("循環(huán)線程開始執(zhí)行~~");}}

image-20240313213718558

首先,通過Executors.newScheduledThreadPool方法創(chuàng)建了一個(gè)可調(diào)度的線程池ScheduledThreadPoolExecutor,并指定了線程池的核心線程數(shù)為5。同時(shí),通過自定義的ThreadFactory來創(chuàng)建線程,并給線程指定了自定義的名稱。

接下來,定義了兩個(gè)任務(wù)類DelayedThreadLoopThread,分別實(shí)現(xiàn)Runnable接口。

在測試方法中,首先創(chuàng)建了DelayedThreadLoopThread的實(shí)例。

然后,在測試方法中,首先創(chuàng)建了DelayedThreadLoopThread的實(shí)例。

然后,通過調(diào)用scheduledThreadPool.schedule方法,將DelayedThread任務(wù)提交給線程池,并指定延時(shí)時(shí)間為10秒。這意味著DelayedThread任務(wù)將在10秒后執(zhí)行。

接著,通過調(diào)用scheduledThreadPool.scheduleAtFixedRate方法,將LoopThread任務(wù)提交給線程池,并指定初始延時(shí)時(shí)間為10秒、周期時(shí)間為5秒。這意味著LoopThread任務(wù)將在初始延時(shí)時(shí)間后開始執(zhí)行,并且每隔5秒重復(fù)執(zhí)行一次。

最后,調(diào)用waitPoolExecutedEnd方法等待線程池中的任務(wù)執(zhí)行完畢,并調(diào)用線程池的shutdown方法關(guān)閉線程池。

總結(jié)起來,這段代碼演示了使用Executors.newScheduledThreadPool創(chuàng)建可調(diào)度的線程池,并展示了延時(shí)執(zhí)行和周期性執(zhí)行的例子。通過合理配置延時(shí)時(shí)間和周期時(shí)間,可以實(shí)現(xiàn)在指定的時(shí)間點(diǎn)或時(shí)間間隔內(nèi)執(zhí)行任務(wù)。

newWorkStealingPool 創(chuàng)建一個(gè)可竊取任務(wù)的線程池

newWorkStealingPool(int parallelism)方法用于創(chuàng)建一個(gè)工作竊取線程池。工作竊取線程池是一種特殊的線程池,它根據(jù)一定的調(diào)度策略執(zhí)行任務(wù)。除了執(zhí)行任務(wù)外,工作竊取線程池還可以按照指定的延遲時(shí)間或周期性地執(zhí)行任務(wù)。

工作竊取線程池最早由美國計(jì)算機(jī)科學(xué)家 Charles E. Leiserson 和 John C. Bains 發(fā)明,他們在 1994 年的論文《Scheduling Multithreaded Computations by Work Stealing》中首次提出了這一概念。

工作竊取線程池的設(shè)計(jì)初衷是為了解決并行計(jì)算中獨(dú)立任務(wù)的負(fù)載均衡問題。在并行計(jì)算中,通常存在大量的獨(dú)立任務(wù)需要并行執(zhí)行,而這些獨(dú)立任務(wù)的執(zhí)行時(shí)間往往不一致。如果簡單地將任務(wù)平均分配給每個(gè)線程,那些執(zhí)行時(shí)間較短的任務(wù)將會導(dǎo)致線程空閑,而執(zhí)行時(shí)間較長的任務(wù)則可能導(dǎo)致線程被阻塞,從而降低整體的執(zhí)行效率。

為了解決這個(gè)問題,工作竊取線程池引入了工作竊取算法。該算法允許空閑線程從其他線程的任務(wù)隊(duì)列末尾竊取任務(wù)來執(zhí)行,以實(shí)現(xiàn)負(fù)載均衡。每個(gè)線程都維護(hù)一個(gè)自己的任務(wù)隊(duì)列,當(dāng)線程自己的任務(wù)執(zhí)行完畢后,它會嘗試從其他線程的任務(wù)隊(duì)列末尾竊取任務(wù)執(zhí)行。這樣,任務(wù)的分配和執(zhí)行可以更加均衡,避免線程之間出現(xiàn)明顯的負(fù)載不均衡。

我們了解 newWorkStealingPool 先來了解一下Fork/Join

Fork/Join框架是Java提供的一種并行執(zhí)行任務(wù)的框架,它基于工作竊取算法實(shí)現(xiàn)任務(wù)的自動調(diào)度和負(fù)載均衡。在Fork/Join框架中,工作竊取線程池是其中的核心組件。

Fork/Join框架的工作原理如下:

  1. 每個(gè)任務(wù)被劃分為更小的子任務(wù),這個(gè)過程通常被稱為"fork"。
  2. 當(dāng)一個(gè)線程執(zhí)行"fork"操作時(shí),它會將子任務(wù)放入自己的工作隊(duì)列中。
  3. 當(dāng)一個(gè)線程完成自己的任務(wù)后,它會從其他線程的工作隊(duì)列中"steal"(竊取)任務(wù)來執(zhí)行。
  4. 竊取的任務(wù)通常是其他線程工作隊(duì)列的末尾的任務(wù),這樣可以減少線程之間的競爭。

工作竊取線程池在Fork/Join框架中的應(yīng)用主要體現(xiàn)在以下幾個(gè)方面:

  1. 任務(wù)分割: 在Fork/Join框架中,任務(wù)被遞歸地分割成更小的子任務(wù),直到達(dá)到某個(gè)終止條件。工作竊取線程池中的線程負(fù)責(zé)執(zhí)行這些任務(wù)。每個(gè)線程都有自己的任務(wù)隊(duì)列,當(dāng)一個(gè)線程執(zhí)行完自己的任務(wù)后,會從自己的隊(duì)列中獲取新的任務(wù)來執(zhí)行。
  2. 負(fù)載均衡: 工作竊取線程池通過工作竊取算法實(shí)現(xiàn)負(fù)載均衡。當(dāng)一個(gè)線程的任務(wù)隊(duì)列為空時(shí),它會從其他線程的任務(wù)隊(duì)列中竊取任務(wù)來執(zhí)行,以保持各個(gè)線程的工作量相對均衡。這種負(fù)載均衡策略可以避免線程之間出現(xiàn)明顯的負(fù)載不均衡,提高整體的執(zhí)行效率。
  3. 遞歸任務(wù)執(zhí)行: Fork/Join框架中的任務(wù)通常是遞歸執(zhí)行的。當(dāng)一個(gè)任務(wù)被分割成多個(gè)子任務(wù)時(shí),每個(gè)子任務(wù)會被提交到工作竊取線程池中執(zhí)行。如果子任務(wù)還可以進(jìn)一步分割,線程會繼續(xù)執(zhí)行這個(gè)過程,直到任務(wù)不能再分割為止。這種遞歸的任務(wù)執(zhí)行方式能夠充分利用線程池中的線程資源,提高并行任務(wù)的執(zhí)行效率。
  4. Join操作: 在Fork/Join框架中,一個(gè)任務(wù)可以等待其子任務(wù)執(zhí)行完成后再繼續(xù)執(zhí)行,這個(gè)操作被稱為"join"。工作竊取線程池在執(zhí)行任務(wù)時(shí)會自動進(jìn)行join操作,確保任務(wù)的執(zhí)行順序滿足依賴關(guān)系。這樣可以避免線程之間的競爭和沖突,保證任務(wù)的正確性。

總的來說,工作竊取線程池在Fork/Join框架中扮演著重要的角色。它通過工作竊取算法和負(fù)載均衡策略,實(shí)現(xiàn)了并行任務(wù)的自動調(diào)度和執(zhí)行。通過遞歸任務(wù)執(zhí)行和join操作,工作竊取線程池能夠高效地處理大量的并行任務(wù),并充分利用系統(tǒng)的并行計(jì)算能力。

newWorkStealingPool 是一個(gè)創(chuàng)建工作竊取線程池的方法,它使用了ForkJoinPool,并根據(jù)CPU核心數(shù)動態(tài)調(diào)整線程數(shù)量。這種線程池適用于CPU密集型的任務(wù)。

與其他四種線程池不同,newWorkStealingPool 使用了ForkJoinPool。它的優(yōu)勢在于將一個(gè)任務(wù)拆分成多個(gè)小任務(wù),并將這些小任務(wù)分發(fā)給多個(gè)線程并行執(zhí)行。當(dāng)所有小任務(wù)都執(zhí)行完成后,再將它們的結(jié)果合并。

相較于之前的線程池,newWorkStealingPool 中的每個(gè)線程都擁有自己的任務(wù)隊(duì)列,而不是多個(gè)線程共享一個(gè)阻塞隊(duì)列。

當(dāng)一個(gè)線程發(fā)現(xiàn)自己的任務(wù)隊(duì)列為空時(shí),它會去其他線程的隊(duì)列中竊取任務(wù)來執(zhí)行。可以將這個(gè)過程簡單理解為"竊取"。為了降低沖突,一般情況下,自己的本地隊(duì)列采用后進(jìn)先出(LIFO)的順序,而竊取時(shí)則采用先進(jìn)先出(FIFO)的順序。由于竊取的動作非??焖?#xff0c;這種沖突會大大降低,從而提高了性能。這也是一種優(yōu)化方式。

下面我們通過一個(gè)案例 來了解一下 newWorkStealingPool

	@Testpublic void test21() {// 返回可用的計(jì)算資源int core = Runtime.getRuntime().availableProcessors();logger.error("cpu 可以計(jì)算機(jī)資源 :{}", core);// 無參數(shù)的話,會根據(jù)cpu當(dāng)前核心數(shù)據(jù) 動態(tài)分配// ExecutorService executorService = Executors.newWorkStealingPool();// 當(dāng)傳入?yún)?shù),就可以指定cpu的一個(gè)并行數(shù)量ForkJoinPool forkJoinPool = (ForkJoinPool) Executors.newWorkStealingPool(8);for (int i = 1; i <= core * 2; i++) {WorkStealThread workStealThread = new WorkStealThread(i, forkJoinPool);forkJoinPool.submit(workStealThread);}// 優(yōu)雅關(guān)閉forkJoinPool.shutdown();// 為了防止 主線程 沒有完全執(zhí)行結(jié)束try {Thread.sleep(Integer.MAX_VALUE);} catch (InterruptedException e) {throw new RuntimeException(e);}}static class WorkStealThread implements Runnable {private int i;private ForkJoinPool forkJoinPool;WorkStealThread() {}WorkStealThread(int i, ForkJoinPool forkJoinPool) {this.i = i;this.forkJoinPool = forkJoinPool;}@Overridepublic void run() {try {// 隨機(jī)休眠Thread.sleep(TimeUnit.SECONDS.toMillis(new Random().nextInt(10)));} catch (InterruptedException e) {throw new RuntimeException(e);}logger.error("工作竊取線程-{},線程池的大小:[{}],活動線程數(shù):[{}],總?cè)蝿?wù)竊取次數(shù):[{}]",i, forkJoinPool.getPoolSize(), forkJoinPool.getActiveThreadCount(), forkJoinPool.getStealCount());}}

當(dāng)代碼運(yùn)行時(shí),首先通過Runtime.getRuntime().availableProcessors()獲取可用的計(jì)算資源,即CPU核心數(shù),并將其記錄在日志中。

接下來,使用Executors.newWorkStealingPool(8)創(chuàng)建一個(gè)工作竊取線程池,其中參數(shù)8表示線程池的并行度,即同時(shí)執(zhí)行的線程數(shù)。這個(gè)線程池是基于ForkJoinPool實(shí)現(xiàn)的,具有任務(wù)竊取的特性。

然后,通過一個(gè)循環(huán)將一些工作竊取線程提交到線程池中進(jìn)行并行執(zhí)行。每個(gè)工作竊取線程都有一個(gè)編號,從1到核心數(shù)的兩倍。這些線程使用WorkStealThread類實(shí)現(xiàn)了Runnable接口。

WorkStealThread類中的run方法定義了線程的執(zhí)行邏輯。首先,線程會隨機(jī)休眠一段時(shí)間,模擬執(zhí)行一些耗時(shí)的任務(wù)。然后,它會輸出一些關(guān)于線程池狀態(tài)的信息,包括線程池的大小、活動線程數(shù)和總?cè)蝿?wù)竊取次數(shù)。這些信息會記錄在日志中。

最后,線程池會被優(yōu)雅地關(guān)閉,確保所有任務(wù)都能執(zhí)行完畢。為了防止主線程提前結(jié)束,使用Thread.sleep(Integer.MAX_VALUE)使主線程休眠,直到被中斷或拋出異常。

通過以上代碼,我們可以了解到如何使用newWorkStealingPool方法創(chuàng)建工作竊取線程池,并通過工作竊取線程實(shí)現(xiàn)并行執(zhí)行。同時(shí),通過獲取線程池的狀態(tài)信息,我們可以了解線程池的工作情況,包括活動線程數(shù)和任務(wù)竊取次數(shù)。

通過觀察結(jié)果,我們可以發(fā)現(xiàn),當(dāng)前線程執(zhí)行完畢,如果空閑的話,會去執(zhí)行別的線程任務(wù)

image-20240313215049387

http://www.risenshineclean.com/news/3064.html

相關(guān)文章:

  • 網(wǎng)站開發(fā)公司經(jīng)營范圍手機(jī)百度賬號申請注冊
  • 企業(yè)做網(wǎng)站平臺的好處鶴壁seo推廣
  • 許昌知名網(wǎng)站建設(shè)價(jià)格重慶發(fā)布的最新消息今天
  • 宣傳網(wǎng)站建設(shè)意義查看百度關(guān)鍵詞價(jià)格
  • 阿圖什網(wǎng)站寧波核心關(guān)鍵詞seo收費(fèi)
  • 網(wǎng)站 展示百度搜索風(fēng)云榜總榜
  • 北京澳環(huán)網(wǎng)站拼多多關(guān)鍵詞怎么優(yōu)化
  • 甜品網(wǎng)站設(shè)計(jì)與實(shí)現(xiàn)畢業(yè)設(shè)計(jì)網(wǎng)站首頁不收錄
  • 做英文行程的網(wǎng)站汕頭自動seo
  • 濟(jì)南企業(yè)做網(wǎng)站網(wǎng)絡(luò)營銷策劃案例
  • 寧波網(wǎng)站建設(shè)方案報(bào)價(jià)四川seo平臺
  • 不花錢自己可以做網(wǎng)站嗎專門搜索知乎內(nèi)容的搜索引擎
  • 宣傳冊內(nèi)容排版揚(yáng)州seo
  • 怎樣php網(wǎng)站建設(shè)河南網(wǎng)站建設(shè)公司哪家好
  • 北京智能網(wǎng)站建設(shè)系統(tǒng)加盟怎么用網(wǎng)絡(luò)推廣業(yè)務(wù)
  • 在做網(wǎng)站編代碼網(wǎng)頁導(dǎo)航條中的文字出現(xiàn)在導(dǎo)航條的下方怎莫解決東莞有限公司seo
  • 網(wǎng)站建設(shè)的一般步驟精準(zhǔn)粉絲引流推廣
  • 成都哪里做網(wǎng)站便宜百度的合作網(wǎng)站有哪些
  • 網(wǎng)站建設(shè)實(shí)訓(xùn)內(nèi)容廣東東莞疫情最新消息今天又封了
  • 什么情況下需要建設(shè)網(wǎng)站蘋果自研搜索引擎或?yàn)樘娲雀?/a>
  • 成都網(wǎng)站建設(shè)博客國際新聞最新消息十條摘抄
  • 時(shí)光軸網(wǎng)站模板關(guān)鍵詞在線聽
  • 龍巖新聞網(wǎng)龍巖kk社區(qū)搜索引擎外部鏈接優(yōu)化
  • 地方門戶網(wǎng)站建站流程杭州百度seo優(yōu)化
  • wordpress商城建站教程創(chuàng)建網(wǎng)站要錢嗎
  • 永久免費(fèi)自助建站朋友圈廣告投放價(jià)格表
  • 織夢網(wǎng)站首頁打開慢鄭州企業(yè)網(wǎng)絡(luò)推廣外包
  • 做網(wǎng)站做的好的公司有哪些免費(fèi)網(wǎng)站排名優(yōu)化軟件
  • 甘肅做網(wǎng)站哪家專業(yè)推廣計(jì)劃方案模板
  • 深圳 電子商務(wù)網(wǎng)站開發(fā)在線智能識圖