中英西班牙網(wǎng)站建設(shè)一鍵優(yōu)化是什么意思
前言
在Java中,AbstractQueuedSynchronizer
(簡稱AQS)是一個用于實現(xiàn)同步器的抽象類,它為實現(xiàn)各種類型的同步器(如鎖、信號量等)提供了基本的框架。AQS通過一個雙向隊列(等待隊列)和一個整數(shù)變量(狀態(tài))來管理線程的排隊和狀態(tài)控制。
AQS的具體實現(xiàn)類主要有以下幾種:
ReentrantLock
:可重入鎖的實現(xiàn)類,支持獨占模式。ReentrantReadWriteLock
:可重入讀寫鎖的實現(xiàn)類,支持共享和獨占模式。Semaphore
:信號量的實現(xiàn)類,用于控制同時訪問某個資源的線程數(shù)量。CountDownLatch
:倒計時門閂的實現(xiàn)類,用于等待一組線程完成某個操作后再執(zhí)行。CyclicBarrier
:循環(huán)屏障的實現(xiàn)類,用于等待一組線程都到達某個狀態(tài)點后再一起繼續(xù)執(zhí)行。Phaser
:分階段屏障的實現(xiàn)類,用于協(xié)調(diào)多個線程在不同階段的同步。LockSupport
:用于創(chuàng)建鎖和其他同步類的基本線程阻塞原語。
其中,ReentrantLock
和ReentrantReadWriteLock
是比較常用的同步器實現(xiàn)類,用于提供可重入的獨占鎖和讀寫鎖。Semaphore
、CountDownLatch
、CyclicBarrier
和Phaser
等同步器類則在特定場景下發(fā)揮重要作用,幫助線程協(xié)調(diào)、控制和同步操作。
這些AQS實現(xiàn)類可以根據(jù)具體的需求選擇使用,通過繼承AbstractQueuedSynchronizer
類并實現(xiàn)其中的抽象方法,可以定制和擴展自定義的同步器。
CountDownLatch
簡介
CountDownLatch是Java中的一個同步輔助類,用于實現(xiàn)線程間的等待和通知機制。它通過一個計數(shù)器來實現(xiàn),該計數(shù)器被初始化為一個正整數(shù),并且只能遞減。線程在等待階段通過調(diào)用await()方法等待計數(shù)器變?yōu)?,而其他線程在完成自己的任務(wù)后通過調(diào)用countDown()方法來減少計數(shù)器的值。當(dāng)計數(shù)器的值變?yōu)?時,所有等待的線程都將被喚醒繼續(xù)執(zhí)行。
CountDownLatch的主要方法包括:
void await():當(dāng)前線程等待計數(shù)器的值變?yōu)?。如果計數(shù)器的值大于0,await()方法將導(dǎo)致線程阻塞,直到計數(shù)器的值變?yōu)?或被中斷。
void countDown():將計數(shù)器的值減1。每個調(diào)用countDown()方法的線程都會使計數(shù)器減少1。
long getCount():獲取當(dāng)前計數(shù)器的值。
CountDownLatch通常用于以下場景:
啟動多個線程等待某個任務(wù)的完成:主線程在創(chuàng)建需要等待的線程后,通過CountDownLatch的構(gòu)造函數(shù)將計數(shù)器的值設(shè)置為等待的線程數(shù)。每個線程在完成任務(wù)后調(diào)用countDown()方法,計數(shù)器的值減1。主線程在需要等待的位置調(diào)用await()方法,直到計數(shù)器的值變?yōu)?,才會繼續(xù)執(zhí)行。
并行任務(wù)的等待和合并:多個線程同時執(zhí)行某個任務(wù),但是要求它們在繼續(xù)執(zhí)行前等待其他所有線程都完成。每個線程在完成自己的任務(wù)后調(diào)用countDown()方法,主線程調(diào)用await()方法等待所有線程完成。
測試并發(fā)性能:可以使用CountDownLatch來同時啟動多個線程,然后在主線程中等待所有線程完成,以測試并發(fā)操作的性能。
通過使用CountDownLatch,我們可以實現(xiàn)線程之間的同步和協(xié)調(diào),確保某些線程在其他線程完成任務(wù)后再繼續(xù)執(zhí)行,以及在需要等待多個線程完成后再進行下一步操作。
其中內(nèi)部有一個類Sync可以看到該類繼承AbstractQueuedSynchronizer,擁有AQS特性。
從構(gòu)造函數(shù)來看Sync(int count),構(gòu)造時傳遞一個線程狀態(tài)存儲。
//CountDownLatch的同步控制。使用AQS狀態(tài)表示計數(shù)。private static final class Sync extends AbstractQueuedSynchronizer {private static final long serialVersionUID = 4982264981922014374L;Sync(int count) {setState(count);}int getCount() {return getState();}protected int tryAcquireShared(int acquires) {return (getState() == 0) ? 1 : -1;}protected boolean tryReleaseShared(int releases) {// Decrement count; signal when transition to zerofor (;;) {int c = getState();if (c == 0)return false;int nextc = c-1;if (compareAndSetState(c, nextc))return nextc == 0;}}}
函數(shù)
await
-
await()函數(shù)是CountDownLatch的核心函數(shù)之一,用于使當(dāng)前線程在計數(shù)器倒計數(shù)至零之前一直等待。如果線程被中斷,則會拋出InterruptedException。
-
函數(shù)內(nèi)部通過調(diào)用sync.acquireSharedInterruptibly(1)將操作轉(zhuǎn)發(fā)給了Sync對象的acquireSharedInterruptibly方法。
public void await() throws InterruptedException {// 轉(zhuǎn)發(fā)到sync對象上sync.acquireSharedInterruptibly(1);
}
acquireSharedInterruptibly
acquireSharedInterruptibly(int arg)方法在AQS中定義,用于在獲取共享資源時可中斷地等待。如果線程被中斷,則會拋出InterruptedException。
acquireSharedInterruptibly(int arg)方法內(nèi)部先檢查線程的中斷狀態(tài),如果被中斷,則拋出InterruptedException。
public final void acquireSharedInterruptibly(int arg)throws InterruptedException {if (Thread.interrupted())throw new InterruptedException();if (tryAcquireShared(arg) < 0)doAcquireSharedInterruptibly(arg);
}
tryAcquireShared
如果通過tryAcquireShared(arg)方法嘗試獲取共享資源失敗(返回值小于0),則調(diào)用doAcquireSharedInterruptibly(arg)方法進行進一步的等待操作。
tryAcquireShared(int acquires)是Sync類的方法,它根據(jù)AQS的狀態(tài)來判斷是否可以獲取共享資源。在CountDownLatch的實現(xiàn)中,該方法簡單地判斷AQS的狀態(tài)是否為0,如果為0,則返回1,表示可以獲取共享資源;否則返回-1,表示不能獲取共享資源
protected int tryAcquireShared(int acquires) {return (getState() == 0) ? 1 : -1;
}
示例1
創(chuàng)建了一個CountDownLatch對象,并將其初始計數(shù)值設(shè)置為3(numOfThreads)。然后,我們創(chuàng)建了3個工作線程(Worker類的實例),每個工作線程模擬執(zhí)行任務(wù)的時間。每個工作線程完成任務(wù)后,調(diào)用countDown()方法來減少計數(shù)器的值。
**主線程通過調(diào)用latch.await()來等待所有工作線程完成。**當(dāng)計數(shù)器的值變?yōu)?時,主線程被喚醒,并打印出"All workers have completed their tasks."的消息。
該示例演示了如何使用CountDownLatch實現(xiàn)主線程等待多個工作線程完成任務(wù)后再繼續(xù)執(zhí)行的場景。
import java.util.concurrent.CountDownLatch;public class CountDownLatchExample {public static void main(String[] args) {int numOfThreads = 3;CountDownLatch latch = new CountDownLatch(numOfThreads);for (int i = 0; i < numOfThreads; i++) {Thread thread = new Thread(new Worker(latch));thread.start();}try {latch.await(); // 主線程等待所有工作線程完成System.out.println("All workers have completed their tasks.");} catch (InterruptedException e) {e.printStackTrace();}}static class Worker implements Runnable {private final CountDownLatch latch;public Worker(CountDownLatch latch) {this.latch = latch;}@Overridepublic void run() {// 模擬每個工作線程執(zhí)行任務(wù)的時間try {Thread.sleep(2000);System.out.println("Worker completed its task.");} catch (InterruptedException e) {e.printStackTrace();} finally {latch.countDown(); // 每個工作線程完成任務(wù)后調(diào)用countDown()}}}
}
}
在上面的示例中,我們創(chuàng)建了一個CountDownLatch對象,并將其初始計數(shù)值設(shè)置為3(numOfThreads)。然后,我們創(chuàng)建了3個工作線程(Worker類的實例),每個工作線程模擬執(zhí)行任務(wù)的時間。每個工作線程完成任務(wù)后,調(diào)用countDown()方法來減少計數(shù)器的值。
主線程通過調(diào)用latch.await()來等待所有工作線程完成。當(dāng)計數(shù)器的值變?yōu)?時,主線程被喚醒,并打印出"All workers have completed their tasks."的消息。
該示例演示了如何使用CountDownLatch實現(xiàn)主線程等待多個工作線程完成任務(wù)后再繼續(xù)執(zhí)行的場景。
示例2
業(yè)務(wù)場景:某個業(yè)務(wù)操作非常耗時,但又必須等這個操作結(jié)束后才能進行后續(xù)操作。
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;
import java.util.stream.Collectors;
import java.util.stream.Stream;/*** 多線程任務(wù)處理工具類*/
public class TaskDisposeUtils {//并行線程數(shù)public static final int POOL_SIZE;static {//判斷核心線程數(shù) 如果機器的核心線程數(shù)大于5則用機器核心線程數(shù)POOL_SIZE = Integer.max(Runtime.getRuntime().availableProcessors(), 5);}/*** 并行處理,并等待結(jié)束**/public static <T> void dispose(List<T> taskList, Consumer<T> consumer) throws InterruptedException {dispose(true, POOL_SIZE, taskList, consumer);}/*** 并行處理,并等待結(jié)束**/public static <T> void dispose(boolean moreThread, int poolSize, List<T> taskList, Consumer<T> consumer) throws InterruptedException {if (CollectionUtils.isEmpty(taskList)) {return;}//如果是多線程且核心線程數(shù)大于一則進入方法if (moreThread && poolSize > 1) {poolSize = Math.min(poolSize, taskList.size());ExecutorService executorService = null;try {//新建一個固定大小的線程池 核心線程數(shù)為poolSizeexecutorService = Executors.newFixedThreadPool(poolSize);//juc工具類 用于讓必須所有任務(wù)都處理完后才進行下一步CountDownLatch countDownLatch = new CountDownLatch(taskList.size());for (T item : taskList) {executorService.execute(() -> {try {//消費任務(wù)consumer.accept(item);} finally {//處理完后減一countDownLatch.countDown();}});}//在此等待 當(dāng)countDownLatch變成0后才繼續(xù)進行下一步countDownLatch.await();} finally {if (executorService != null) {executorService.shutdown();}}} else {for (T item : taskList) {consumer.accept(item);}}}public static void main(String[] args) throws InterruptedException {//生成1-10的10個數(shù)字,放在list中,相當(dāng)于10個任務(wù)List<Integer> list = Stream.iterate(1, a -> a + 1).limit(10).collect(Collectors.toList());JSONObject object=new JSONObject();object.put("name","sss");//啟動多線程處理list中的數(shù)據(jù),每個任務(wù)休眠時間為list中的數(shù)值
// Consumer<Integer> c= item -> {
// try {
// long startTime = System.currentTimeMillis();
// object.put("s",item);
// TimeUnit.SECONDS.sleep(item);
// long endTime = System.currentTimeMillis();
// System.out.println(object.toJSONString());
// System.out.println(System.currentTimeMillis() + ",任務(wù)" + item + "執(zhí)行完畢,耗時:" + (endTime - startTime));
// } catch (InterruptedException e) {
// e.printStackTrace();
// }
// };TaskDisposeUtils.dispose(list, item -> {try {long startTime = System.currentTimeMillis();object.put("s",item);TimeUnit.SECONDS.sleep(item);long endTime = System.currentTimeMillis();System.out.println(object.toJSONString());System.out.println(System.currentTimeMillis() + ",任務(wù)" + item + "執(zhí)行完畢,耗時:" + (endTime - startTime));} catch (InterruptedException e) {e.printStackTrace();}});//上面所有任務(wù)處理完畢完畢之后,程序才能繼續(xù)System.out.println(list + "中的任務(wù)都處理完畢!");}
}