中文亚洲精品无码_熟女乱子伦免费_人人超碰人人爱国产_亚洲熟妇女综合网

當(dāng)前位置: 首頁(yè) > news >正文

桂林手機(jī)網(wǎng)站制作/官網(wǎng)設(shè)計(jì)公司

桂林手機(jī)網(wǎng)站制作,官網(wǎng)設(shè)計(jì)公司,兩個(gè)人做aj的視頻教程,今日體育新聞大綱 1.等待多線程完成的CountDownLatch介紹 2.CountDownLatch.await()方法源碼 3.CountDownLatch.coutDown()方法源碼 4.CountDownLatch總結(jié) 5.控制并發(fā)線程數(shù)的Semaphore介紹 6.Semaphore的令牌獲取過(guò)程 7.Semaphore的令牌釋放過(guò)程 8.同步屏障CyclicBarrier介紹 9.C…

大綱

1.等待多線程完成的CountDownLatch介紹

2.CountDownLatch.await()方法源碼

3.CountDownLatch.coutDown()方法源碼

4.CountDownLatch總結(jié)

5.控制并發(fā)線程數(shù)的Semaphore介紹

6.Semaphore的令牌獲取過(guò)程

7.Semaphore的令牌釋放過(guò)程

8.同步屏障CyclicBarrier介紹

9.CyclicBarrier的await()方法源碼

10.使用CountDownLatch等待注冊(cè)的完成

11.使用CyclicBarrier將工作任務(wù)多線程分而治之

12.使用CyclicBarrier聚合服務(wù)接口的返回結(jié)果

13.使用Semaphore等待指定數(shù)量線程完成任務(wù)

volatile、synchronized、CAS、AQS、讀寫(xiě)鎖、鎖優(yōu)化和鎖故障、并發(fā)集合、線程池、同步組件

1.等待多線程完成的CountDownLatch

(1)CountDownLatch的簡(jiǎn)介

(2)CountDownLatch的應(yīng)用

(3)CountDownLatch的例子

(1)CountDownLatch的簡(jiǎn)介

CountDownLatch允許一個(gè)或多個(gè)線程等待其他線程完成操作。CountDownLatch提供了兩個(gè)核心方法,分別是await()方法和countDown()方法。CountDownLatch.await()方法讓調(diào)用線程進(jìn)行阻塞進(jìn)入等待狀態(tài),CountDownLatch.countDown()方法用于對(duì)計(jì)數(shù)器進(jìn)行遞減。

CountDownLatch在構(gòu)造時(shí)需要傳入一個(gè)正整數(shù)作為計(jì)數(shù)器初始值。線程每調(diào)用一次countDown()方法,都會(huì)對(duì)該計(jì)數(shù)器減一。當(dāng)計(jì)數(shù)器為0時(shí),會(huì)喚醒所有執(zhí)行await()方法時(shí)被阻塞的線程。

(2)CountDownLatch的應(yīng)用

應(yīng)用一:

使用多線程去解析一個(gè)Excel里多個(gè)sheet的數(shù)據(jù),每個(gè)線程解析一個(gè)sheet里的數(shù)據(jù),等所有sheet解析完再提示處理完成。此時(shí)便可以使用CountDownLatch來(lái)實(shí)現(xiàn),當(dāng)然可以使用Thread.join()方法。

注意:Thread.join()方法是基于wait()和notify()來(lái)實(shí)現(xiàn)的。在main線程里開(kāi)啟一個(gè)線程A,main線程如果執(zhí)行了線程A的join()方法,那么就會(huì)導(dǎo)致main線程被阻塞,main線程會(huì)等待線程A執(zhí)行完畢才會(huì)繼續(xù)往下執(zhí)行。

應(yīng)用二:

微服務(wù)注冊(cè)中心的register-client,為了在注冊(cè)線程執(zhí)行成功后,才發(fā)送心跳??梢允褂肅ountDownLatch,當(dāng)然也可以使用Thread.join()方法。

應(yīng)用三:

可以通過(guò)CountDownLatch實(shí)現(xiàn)類(lèi)似并發(fā)的效果。把CountDownLatch的計(jì)數(shù)器設(shè)置為1,然后讓1000個(gè)線程調(diào)用await()方法。當(dāng)1000個(gè)線程初始化完成后,在main線程調(diào)用countDown()讓計(jì)數(shù)器歸零。這樣這1000個(gè)線程就會(huì)在一個(gè)for()循環(huán)中,依次被喚醒。

(3)CountDownLatch的例子

public class CountDownLatchDemo {public static void main(String[] args) throws Exception {final CountDownLatch latch = new CountDownLatch(2);new Thread() {public void run() {try {Thread.sleep(1000);System.out.println("線程1開(kāi)始執(zhí)行,休眠2秒...");Thread.sleep(1000);System.out.println("線程1準(zhǔn)備執(zhí)行countDown操作...");latch.countDown();System.out.println("線程1完成執(zhí)行countDown操作...");} catch (Exception e) {e.printStackTrace();}}}.start();new Thread() {public void run() {try {Thread.sleep(1000);System.out.println("線程2開(kāi)始執(zhí)行,休眠2秒...");Thread.sleep(1000);System.out.println("線程2準(zhǔn)備執(zhí)行countDown操作...");latch.countDown();System.out.println("線程2完成執(zhí)行countDown操作...");} catch (Exception e) {e.printStackTrace();}}}.start();System.out.println("main線程準(zhǔn)備執(zhí)行countDownLatch的await操作,將會(huì)同步阻塞等待...");latch.await();System.out.println("所有線程都完成countDown操作,結(jié)束同步阻塞等待...");}
}

2.CountDownLatch.await()方法源碼

(1)CountDownLatch.await()方法的阻塞流程

(2)CountDownLatch.await()方法的喚醒流程

(3)CountDownLatch.await()方法的阻塞總結(jié)

(1)CountDownLatch.await()方法的阻塞流程

CountDownLatch是基于AQS中的共享鎖來(lái)實(shí)現(xiàn)的。從CountDownLatch的構(gòu)造方法可知,CountDownLatch的count就是AQS的state。

調(diào)用CountDownLatch的await()方法時(shí),會(huì)先調(diào)用AQS的acquireSharedInterruptibly()模版方法,然后會(huì)調(diào)用CountDownLatch的內(nèi)部類(lèi)Sync實(shí)現(xiàn)的tryAcquireShared()方法。tryAcquireShared()方法會(huì)判斷state的值是否為0,如果為0,才返回1,否則返回-1。

當(dāng)調(diào)用CountDownLatch內(nèi)部類(lèi)Sync的tryAcquireShared()方法獲得的返回值是-1時(shí),才會(huì)調(diào)用AQS的doAcquireSharedInterruptibly()方法,將當(dāng)前線程封裝成Node結(jié)點(diǎn)加入等待隊(duì)列,然后掛起當(dāng)前線程進(jìn)行阻塞。

//A synchronization aid that allows one or more threads to wait until a set of operations being performed in other threads completes.
public class CountDownLatch {private final Sync sync;public CountDownLatch(int count) {if (count < 0) {throw new IllegalArgumentException("count < 0");}this.sync = new Sync(count);}//Synchronization control For CountDownLatch.//Uses AQS state to represent count.private static final class Sync extends AbstractQueuedSynchronizer {Sync(int count) {setState(count);}int getCount() {return getState();}protected int tryAcquireShared(int acquires) {return (getState() == 0) ? 1 : -1;}protected boolean tryReleaseShared(int releases) {//Decrement count; signal when transition to zerofor (;;) {int c = getState();if (c == 0) {return false;}int nextc = c-1;if (compareAndSetState(c, nextc)) {return nextc == 0;}}}}//Causes the current thread to wait until the latch has counted down to zero, //unless the thread is Thread#interrupt interrupted.public void await() throws InterruptedException {//執(zhí)行AQS的acquireSharedInterruptibly()方法sync.acquireSharedInterruptibly(1);}...
}public abstract class AbstractQueuedSynchronizer extends AbstractOwnableSynchronizer implements java.io.Serializable {...//Acquires in shared mode, aborting if interrupted.//Implemented by first checking interrupt status, then invoking at least once #tryAcquireShared, returning on success.//Otherwise the thread is queued, possibly repeatedly blocking and unblocking,//invoking #tryAcquireShared until success or the thread is interrupted.public final void acquireSharedInterruptibly(int arg) throws InterruptedException {if (Thread.interrupted()) {throw new InterruptedException();}//執(zhí)行CountDownLatch的內(nèi)部類(lèi)Sync實(shí)現(xiàn)的tryAcquireShared()方法,搶占共享鎖if (tryAcquireShared(arg) < 0) {//執(zhí)行AQS的doAcquireSharedInterruptibly()方法doAcquireSharedInterruptibly(arg);}}//Acquires in shared interruptible mode.private void doAcquireSharedInterruptibly(int arg) throws InterruptedException {final Node node = addWaiter(Node.SHARED);//封裝當(dāng)前線程為Shared類(lèi)型的Node結(jié)點(diǎn)boolean failed = true;try {//第一次循環(huán)r = -1,所以會(huì)執(zhí)行AQS的shouldParkAfterFailedAcquire()方法//將node結(jié)點(diǎn)的有效前驅(qū)結(jié)點(diǎn)的狀態(tài)設(shè)置為SIGNALfor (;;) {final Node p = node.predecessor();//node結(jié)點(diǎn)的前驅(qū)結(jié)點(diǎn)if (p == head) {int r = tryAcquireShared(arg);if (r >= 0) {setHeadAndPropagate(node, r);p.next = null; // help GCfailed = false;return;}}//執(zhí)行shouldParkAfterFailedAcquire()方法設(shè)置node結(jié)點(diǎn)的前驅(qū)結(jié)點(diǎn)的狀態(tài)為SIGNAL//執(zhí)行parkAndCheckInterrupt()方法掛起當(dāng)前線程if (shouldParkAfterFailedAcquire(p, node) && parkAndCheckInterrupt()) {throw new InterruptedException();}}} finally {if (failed) {cancelAcquire(node);}}}//Checks and updates status for a node that failed to acquire.//Returns true if thread should block. This is the main signal control in all acquire loops.private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {int ws = pred.waitStatus;if (ws == Node.SIGNAL) {//This node has already set status asking a release to signal it, so it can safely park.return true;}if (ws > 0) {//Predecessor was cancelled. Skip over predecessors and indicate retry.do {node.prev = pred = pred.prev;} while (pred.waitStatus > 0);pred.next = node;} else {//waitStatus must be 0 or PROPAGATE.  //Indicate that we need a signal, but don't park yet.  //Caller will need to retry to make sure it cannot acquire before parking.compareAndSetWaitStatus(pred, ws, Node.SIGNAL);}return false;}//設(shè)置頭結(jié)點(diǎn)和喚醒后續(xù)線程//Sets head of queue, and checks if successor may be waiting in shared mode, //if so propagating if either propagate > 0 or PROPAGATE status was set.private void setHeadAndPropagate(Node node, int propagate) {Node h = head;setHead(node);//將node結(jié)點(diǎn)設(shè)置為頭結(jié)點(diǎn)if (propagate > 0 || h == null || h.waitStatus < 0 || (h = head) == null || h.waitStatus < 0) {Node s = node.next;if (s == null || s.isShared()) {doReleaseShared();}}}private void setHead(Node node) {head = node;node.thread = null;node.prev = null;}...
}

(2)CountDownLatch.await()方法的喚醒流程

調(diào)用await()方法時(shí),首先會(huì)將當(dāng)前線程封裝成Node結(jié)點(diǎn)并添加到等待隊(duì)列中,然后在執(zhí)行第一次for循環(huán)時(shí)會(huì)設(shè)置該Node結(jié)點(diǎn)的前驅(qū)結(jié)點(diǎn)狀態(tài)為SIGNAL,接著在執(zhí)行第二次for循環(huán)時(shí)才會(huì)將當(dāng)前線程進(jìn)行掛起阻塞。

當(dāng)該線程后續(xù)被喚醒時(shí),該線程又會(huì)進(jìn)入下一次for循環(huán)。如果該線程對(duì)應(yīng)的node結(jié)點(diǎn)的前驅(qū)結(jié)點(diǎn)是等待隊(duì)列的頭結(jié)點(diǎn)且state值已為0,那么就執(zhí)行AQS的setHeadAndPropagate()方法設(shè)置頭結(jié)點(diǎn) + 喚醒后續(xù)線程。

其中setHeadAndPropagate()方法有兩個(gè)工作(設(shè)置頭結(jié)點(diǎn) + 喚醒傳遞):

工作一:設(shè)置當(dāng)前被喚醒線程對(duì)應(yīng)的結(jié)點(diǎn)為頭結(jié)點(diǎn)

工作二:當(dāng)滿(mǎn)足如下這兩個(gè)條件的時(shí)候需要調(diào)用doReleaseShared()方法喚醒后續(xù)的線程

條件一:propagate > 0,表示當(dāng)前是共享鎖,需要進(jìn)行喚醒傳遞

條件二:s.isShared()判斷當(dāng)前結(jié)點(diǎn)為共享模式

CountDownLatch的實(shí)現(xiàn)中會(huì)在以下兩個(gè)場(chǎng)景調(diào)用doReleaseShared()方法:

場(chǎng)景一:state為1時(shí)調(diào)用的countDown()方法會(huì)調(diào)用doReleaseShared()方法

場(chǎng)景二:當(dāng)阻塞的線程被喚醒時(shí),會(huì)調(diào)用setHeadAndPropagate()方法,進(jìn)而調(diào)用doReleaseShared()方法,這樣可以提升喚醒共享結(jié)點(diǎn)的速度

(3)CountDownLatch.await()方法的阻塞總結(jié)

只要state != 0,就會(huì)進(jìn)行如下處理:

一.將當(dāng)前線程封裝成一個(gè)Node結(jié)點(diǎn),然后添加到AQS的等待隊(duì)列中

二.調(diào)用LockSupport.park()方法,掛起當(dāng)前線程

3.CountDownLatch.coutDown()方法源碼

(1)CountDownLatch.coutDown()的喚醒流程

(2)CountDownLatch.tryReleaseShared()

(3)AQS的doReleaseShared()方法

(1)CountDownLatch.coutDown()的喚醒流程

調(diào)用CountDownLatch的countDown()方法時(shí),會(huì)先調(diào)用AQS的releaseShared()模版方法,然后會(huì)執(zhí)行CountDownLatch的內(nèi)部類(lèi)Sync實(shí)現(xiàn)的tryReleaseShared()方法。

如果tryReleaseShared()方法返回true,則執(zhí)行AQS的doReleaseShared()方法,通過(guò)AQS的doReleaseShared()方法喚醒共享鎖模式下的等待隊(duì)列中的線程。

//A synchronization aid that allows one or more threads to wait until a set of operations being performed in other threads completes.
public class CountDownLatch {private final Sync sync;public CountDownLatch(int count) {if (count < 0) {throw new IllegalArgumentException("count < 0");}this.sync = new Sync(count);}//Synchronization control For CountDownLatch.//Uses AQS state to represent count.private static final class Sync extends AbstractQueuedSynchronizer {Sync(int count) {setState(count);}int getCount() {return getState();}protected int tryAcquireShared(int acquires) {return (getState() == 0) ? 1 : -1;}protected boolean tryReleaseShared(int releases) {//Decrement count; signal when transition to zerofor (;;) {int c = getState();if (c == 0) {return false;}int nextc = c-1;if (compareAndSetState(c, nextc)) {return nextc == 0;}}}}//Decrements the count of the latch, releasing all waiting threads if the count reaches zero.public void countDown() {//執(zhí)行AQS的releaseShared()方法sync.releaseShared(1);}...
}public abstract class AbstractQueuedSynchronizer extends AbstractOwnableSynchronizer implements java.io.Serializable {...//Releases in shared mode.  //Implemented by unblocking one or more threads if #tryReleaseShared returns true.public final boolean releaseShared(int arg) {//執(zhí)行CountDownLatch的內(nèi)部類(lèi)Sync實(shí)現(xiàn)的tryReleaseShared()方法,釋放共享鎖if (tryReleaseShared(arg)) {//執(zhí)行AQS的doReleaseShared()方法doReleaseShared();return true;}return false;}//Release action for shared mode -- signals successor and ensures propagation. //Note: For exclusive mode, release just amounts to calling unparkSuccessor of head if it needs signal.private void doReleaseShared() {for (;;) {//每次循環(huán)時(shí)頭結(jié)點(diǎn)都會(huì)發(fā)生變化//因?yàn)檎{(diào)用unparkSuccessor()方法會(huì)喚醒doAcquireSharedInterruptibly()方法中阻塞的線程//然后阻塞的線程會(huì)在執(zhí)行setHeadAndPropagate()方法時(shí)通過(guò)setHead()修改頭結(jié)點(diǎn)Node h = head;//獲取最新的頭結(jié)點(diǎn)if (h != null && h != tail) {//等待隊(duì)列中存在掛起線程的結(jié)點(diǎn)int ws = h.waitStatus;if (ws == Node.SIGNAL) {//頭結(jié)點(diǎn)的狀態(tài)正常,表示對(duì)應(yīng)的線程可以被喚醒if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0)) {continue;//loop to recheck cases}//喚醒頭結(jié)點(diǎn)的后繼結(jié)點(diǎn)//喚醒的線程會(huì)在doAcquireSharedInterruptibly()方法中執(zhí)行setHeadAndPropagate()方法修改頭結(jié)點(diǎn)unparkSuccessor(h);} else if (ws == 0 && !compareAndSetWaitStatus(h, 0, Node.PROPAGATE)) {//如果ws = 0表示初始狀態(tài),則修改結(jié)點(diǎn)為PROPAGATE狀態(tài)continue;//loop on failed CAS}}if (h == head) {//判斷頭結(jié)點(diǎn)是否有變化break;//loop if head changed}}}//Wakes up node's successor, if one exists.private void unparkSuccessor(Node node) {int ws = node.waitStatus;if (ws < 0) {compareAndSetWaitStatus(node, ws, 0);}Node s = node.next;if (s == null || s.waitStatus > 0) {s = null;for (Node t = tail; t != null && t != node; t = t.prev) {if (t.waitStatus <= 0) {s = t;}}}if (s != null) {LockSupport.unpark(s.thread);}}...
}

(2)CountDownLatch.tryReleaseShared()

從tryReleaseShared()方法可知:每次countDown()其實(shí)就是把AQS的state值減1,然后通過(guò)CAS更新state值。如果CAS設(shè)置成功,那么就判斷當(dāng)前state值是否為0。如果是0那么就返回true,如果不是0那么就返回false。返回true的時(shí)候會(huì)調(diào)用AQS的doReleaseShared()方法,喚醒等待隊(duì)列中的線程。

(3)AQS的doReleaseShared()方法

該方法要從AQS的等待隊(duì)列中喚醒頭結(jié)點(diǎn)的后繼結(jié)點(diǎn),需要滿(mǎn)足:

條件一:等待隊(duì)列中要存在掛起線程的結(jié)點(diǎn)(h != null && h != tail)

條件二:等待隊(duì)列的頭結(jié)點(diǎn)的狀態(tài)正常(h.waitStatus = Node.SIGNAL)

在共享鎖模式下,state為0時(shí)需要通過(guò)喚醒傳遞把所有掛起的線程都喚醒。首先doReleaseShared()方法會(huì)通過(guò)for(;;)進(jìn)行自旋操作,每次循環(huán)都會(huì)通過(guò)Node h = head來(lái)獲取等待隊(duì)列中最新的頭結(jié)點(diǎn),然后通過(guò)if (h == head)來(lái)判斷等待隊(duì)列中的頭結(jié)點(diǎn)是否發(fā)生變化。如果沒(méi)有變化,則退出自旋。

注意:在共享鎖模式下,被unparkSuccessor()喚醒的等待隊(duì)列中的線程,會(huì)繼續(xù)在在doAcquireSharedInterruptibly()方法中,執(zhí)行setHeadAndPropagate()方法修改頭結(jié)點(diǎn),從而實(shí)現(xiàn)喚醒傳遞。

4.CountDownLatch總結(jié)

假設(shè)有兩個(gè)線程A和B,分別調(diào)用了CountDownLatch的await()方法,此時(shí)state所表示的計(jì)數(shù)器不為0。所以線程A和B會(huì)被封裝成SHARED類(lèi)型的結(jié)點(diǎn),并添加到AQS的等待隊(duì)列中。

當(dāng)線程C調(diào)用CountDownLatch的coutDown()方法后,如果state被遞減到0,那么就會(huì)調(diào)用doReleaseShared()方法喚醒等待隊(duì)列中的線程。然后被喚醒的線程會(huì)繼續(xù)調(diào)用setHeadAndPropagate()方法實(shí)現(xiàn)喚醒傳遞,從而繼續(xù)在doReleaseShared()方法中喚醒所有在等待隊(duì)列中的被阻塞的線程。

5.控制并發(fā)線程數(shù)的Semaphore介紹

(1)Semaphore的作用

(2)Semaphore的方法

(3)Semaphore原理分析

(1)Semaphore的作用

Semaphore信號(hào)量用來(lái)控制同時(shí)訪問(wèn)特定資源的線程數(shù)量,有兩核心方法。

方法一:acquire()方法,獲取一個(gè)令牌

方法二:release()方法,釋放一個(gè)令牌

多個(gè)線程訪問(wèn)某限制訪問(wèn)流量的資源時(shí),可先調(diào)用acquire()獲取訪問(wèn)令牌。如果能夠正常獲得,則表示允許訪問(wèn)。如果令牌不夠,則會(huì)阻塞當(dāng)前線程。當(dāng)某個(gè)獲得令牌的線程通過(guò)release()方法釋放一個(gè)令牌后,被阻塞在acquire()方法的線程就有機(jī)會(huì)獲得這個(gè)釋放的令牌。

public class SemaphoreDemo {public static void main(String[] args) throws InterruptedException {Semaphore semaphore = new Semaphore(10, true);//初始化10個(gè)資源,使用公平鎖 semaphore.acquire();//每次獲取一個(gè)資源,如果獲取不到,線程就會(huì)阻塞semaphore.release();//釋放一個(gè)資源}
}

(2)Semaphore的方法

Semaphore實(shí)際上并沒(méi)有一個(gè)真實(shí)的令牌發(fā)給線程,Semaphore只是對(duì)一個(gè)可分配數(shù)量進(jìn)行計(jì)數(shù)維護(hù),或者說(shuō)進(jìn)行許可證管理。Semaphore可以在公共資源有限的場(chǎng)景下實(shí)現(xiàn)流量控制,如數(shù)據(jù)庫(kù)連接。

一.Semaphore(permits, fair):permits表示令牌數(shù),fair表示公平性
二.acquire(permits):獲取指定數(shù)量的令牌,如果數(shù)量不足則阻塞當(dāng)前線程
三.tryAcquire(permits):嘗試獲取指定數(shù)量的令牌,此過(guò)程是非阻塞的,成功返回true,失敗返回false 
四.release(permits):釋放指定數(shù)量的令牌
五.drainPermits():當(dāng)前線程獲得剩下的所有令牌
六.hasQueuedThread():判斷當(dāng)前Semaphore實(shí)例上是否存在等待令牌的線程

(3)Semaphore原理分析

Semaphore也是基于AQS中的共享鎖來(lái)實(shí)現(xiàn)的。在創(chuàng)建Semaphore實(shí)例時(shí)傳遞的參數(shù)permits,其實(shí)就是AQS中的state屬性。每次調(diào)用Semaphore的acquire()方法,都會(huì)對(duì)state值進(jìn)行遞減。

所以從根本上說(shuō),Semaphore是通過(guò)重寫(xiě)AQS的兩個(gè)方法來(lái)實(shí)現(xiàn)的:

方法一:tryAcquireShared(),搶占共享鎖

方法二:tryReleaseShared(),釋放共享鎖

public class Semaphore implements java.io.Serializable {private final Sync sync;//Creates a Semaphore with the given number of permits and nonfair fairness setting.public Semaphore(int permits) {sync = new NonfairSync(permits);}static final class NonfairSync extends Sync {NonfairSync(int permits) {super(permits);}protected int tryAcquireShared(int acquires) {return nonfairTryAcquireShared(acquires);}}//Acquires a permit from this semaphore, blocking until one is available, //or the thread is Thread#interrupt interrupted.public void acquire() throws InterruptedException {//執(zhí)行AQS的模版方法acquireSharedInterruptibly()sync.acquireSharedInterruptibly(1);}//Releases a permit, returning it to the semaphore.public void release() {//執(zhí)行AQS的模版方法releaseShared()sync.releaseShared(1);}//Synchronization implementation for semaphore.  //Uses AQS state to represent permits. Subclassed into fair and nonfair versions.abstract static class Sync extends AbstractQueuedSynchronizer {Sync(int permits) {//設(shè)置state的值為傳入的令牌數(shù)setState(permits);}final int getPermits() {return getState();}final int nonfairTryAcquireShared(int acquires) {for (;;) {int available = getState();int remaining = available - acquires;if (remaining < 0 || compareAndSetState(available, remaining)) {return remaining;}}}protected final boolean tryReleaseShared(int releases) {for (;;) {int current = getState();int next = current + releases;if (next < current) {throw new Error("Maximum permit count exceeded");}if (compareAndSetState(current, next)) {return true;}}}...}...
}public abstract class AbstractQueuedSynchronizer extends AbstractOwnableSynchronizer implements java.io.Serializable {...//Acquires in shared mode, aborting if interrupted.//Implemented by first checking interrupt status, then invoking at least once #tryAcquireShared, returning on success.//Otherwise the thread is queued, possibly repeatedly blocking and unblocking,//invoking #tryAcquireShared until success or the thread is interrupted.public final void acquireSharedInterruptibly(int arg) throws InterruptedException {if (Thread.interrupted()) {throw new InterruptedException();}//執(zhí)行Semaphore的內(nèi)部類(lèi)Sync的子類(lèi)實(shí)現(xiàn)的tryAcquireShared()方法,搶占共享鎖if (tryAcquireShared(arg) < 0) {//執(zhí)行AQS的doAcquireSharedInterruptibly()方法doAcquireSharedInterruptibly(arg);}}//Releases in shared mode.  //Implemented by unblocking one or more threads if #tryReleaseShared returns true.public final boolean releaseShared(int arg) {//執(zhí)行Semaphore的內(nèi)部類(lèi)Sync實(shí)現(xiàn)的tryReleaseShared()方法,釋放共享鎖if (tryReleaseShared(arg)) {//執(zhí)行AQS的doReleaseShared()方法doReleaseShared();return true;}return false;}...
}

6.Semaphore的令牌獲取過(guò)程

(1)Semaphore的令牌獲取過(guò)程

(2)Semaphore的公平策略

(3)Semaphore的非公平策略

(4)tryAcquireShared()后的處理

(1)Semaphore的令牌獲取過(guò)程

在調(diào)用Semaphore的acquire()方法獲取令牌時(shí):首先會(huì)執(zhí)行AQS的模版方法acquireSharedInterruptibly(),然后執(zhí)行Sync子類(lèi)實(shí)現(xiàn)的tryAcquireShared()方法來(lái)?yè)屨兼i。如果搶占鎖失敗,則執(zhí)行AQS的doAcquireSharedInterruptibly()方法。該方法會(huì)將當(dāng)前線程封裝成Node結(jié)點(diǎn)并加入等待隊(duì)列,然后掛起線程。

(2)Semaphore的公平策略

在執(zhí)行Sync子類(lèi)FairSync的tryAcquireShared()方法嘗試獲取令牌時(shí),先通過(guò)AQS的hasQueuedPredecessors()判斷是否已有線程在等待隊(duì)列中。如果已經(jīng)有線程在等待隊(duì)列中,那么當(dāng)前線程獲取令牌就必然失敗。否則,就遞減state的值 + 判斷state是否小于0 + CAS設(shè)置state的值。

(3)Semaphore的非公平策略

在執(zhí)行Sync子類(lèi)NonfairSync的tryAcquireShared()方法嘗試獲取令牌時(shí),則會(huì)直接執(zhí)行Sync的nonfairTryAcquireShared()方法來(lái)獲取令牌,也就是遞減state的值 + 判斷state是否小于0 + CAS設(shè)置state的值。

(4)tryAcquireShared()后的處理

不管公平策略還是非公平策略,對(duì)應(yīng)的tryAcquireShared()方法都是通過(guò)自旋來(lái)?yè)屨剂钆?CAS設(shè)置state),直到令牌數(shù)不夠時(shí)才會(huì)讓tryAcquireShared()方法返回小于0的數(shù)值。然后觸發(fā)執(zhí)行AQS的doAcquireSharedInterruptibly()方法,該方法會(huì)將當(dāng)前線程封裝成Node結(jié)點(diǎn)并加入等待隊(duì)列,然后掛起線程。

public class Semaphore implements java.io.Serializable {private final Sync sync;//Creates a Semaphore with the given number of permits and nonfair fairness setting.public Semaphore(int permits) {sync = new NonfairSync(permits);}static final class NonfairSync extends Sync {NonfairSync(int permits) {super(permits);}//以非公平鎖的方式獲取令牌protected int tryAcquireShared(int acquires) {//執(zhí)行Sync的nonfairTryAcquireShared()方法return nonfairTryAcquireShared(acquires);}}static final class FairSync extends Sync {FairSync(int permits) {super(permits);}//以公平鎖的方式獲取令牌protected int tryAcquireShared(int acquires) {for (;;) {//如果已經(jīng)有線程在等待隊(duì)列中,那么就說(shuō)明獲取令牌必然失敗if (hasQueuedPredecessors()) {return -1;}int available = getState();int remaining = available - acquires;if (remaining < 0 || compareAndSetState(available, remaining)) {return remaining;}}}}//Acquires a permit from this semaphore, blocking until one is available, //or the thread is Thread#interrupt interrupted.public void acquire() throws InterruptedException {//執(zhí)行AQS的模版方法acquireSharedInterruptibly()sync.acquireSharedInterruptibly(1);}//Synchronization implementation for semaphore.  //Uses AQS state to represent permits. Subclassed into fair and nonfair versions.abstract static class Sync extends AbstractQueuedSynchronizer {Sync(int permits) {//設(shè)置state的值為傳入的令牌數(shù)setState(permits);}final int getPermits() {return getState();}final int nonfairTryAcquireShared(int acquires) {for (;;) {int available = getState();int remaining = available - acquires;if (remaining < 0 || compareAndSetState(available, remaining)) {return remaining;}}}...}...
}public abstract class AbstractQueuedSynchronizer extends AbstractOwnableSynchronizer implements java.io.Serializable {...//Acquires in shared mode, aborting if interrupted.//Implemented by first checking interrupt status, then invoking at least once #tryAcquireShared, returning on success.//Otherwise the thread is queued, possibly repeatedly blocking and unblocking,//invoking #tryAcquireShared until success or the thread is interrupted.public final void acquireSharedInterruptibly(int arg) throws InterruptedException {if (Thread.interrupted()) {throw new InterruptedException();}//執(zhí)行Semaphore的內(nèi)部類(lèi)Sync的子類(lèi)實(shí)現(xiàn)的tryAcquireShared()方法,搶占共享鎖if (tryAcquireShared(arg) < 0) {//執(zhí)行AQS的doAcquireSharedInterruptibly()方法doAcquireSharedInterruptibly(arg);}}//Queries whether any threads have been waiting to acquire longer than the current thread.public final boolean hasQueuedPredecessors() {Node t = tail; // Read fields in reverse initialization orderNode h = head;Node s;return h != t && ((s = h.next) == null || s.thread != Thread.currentThread());}//Acquires in shared interruptible mode.private void doAcquireSharedInterruptibly(int arg) throws InterruptedException {final Node node = addWaiter(Node.SHARED);//封裝當(dāng)前線程為Shared類(lèi)型的Node結(jié)點(diǎn)boolean failed = true;try {//第一次循環(huán)r = -1,所以會(huì)執(zhí)行AQS的shouldParkAfterFailedAcquire()方法//將node結(jié)點(diǎn)的有效前驅(qū)結(jié)點(diǎn)的狀態(tài)設(shè)置為SIGNALfor (;;) {final Node p = node.predecessor();//node結(jié)點(diǎn)的前驅(qū)結(jié)點(diǎn)if (p == head) {int r = tryAcquireShared(arg);if (r >= 0) {setHeadAndPropagate(node, r);p.next = null; // help GCfailed = false;return;}}//執(zhí)行shouldParkAfterFailedAcquire()方法設(shè)置node結(jié)點(diǎn)的前驅(qū)結(jié)點(diǎn)的狀態(tài)為SIGNAL//執(zhí)行parkAndCheckInterrupt()方法掛起當(dāng)前線程if (shouldParkAfterFailedAcquire(p, node) && parkAndCheckInterrupt()) {throw new InterruptedException();}}} finally {if (failed) {cancelAcquire(node);}}}...
}

7.Semaphore的令牌釋放過(guò)程

(1)Semaphore的令牌釋放過(guò)程

(2)Semaphore的令牌釋放本質(zhì)

(1)Semaphore的令牌釋放過(guò)程

在調(diào)用Semaphore的release()方法去釋放令牌時(shí):首先會(huì)執(zhí)行AQS的模版方法releaseShared(),然后執(zhí)行Sync實(shí)現(xiàn)的tryReleaseShared()方法來(lái)釋放鎖(累加state值)。如果釋放鎖成功,則執(zhí)行AQS的doReleaseShared()方法去喚醒線程。

(2)Semaphore的令牌釋放本質(zhì)

Semaphore的release()方法釋放令牌的本質(zhì)就是對(duì)state字段進(jìn)行累加,然后喚醒等待隊(duì)列頭結(jié)點(diǎn)的后繼結(jié)點(diǎn) + 喚醒傳遞來(lái)喚醒等待的線程。

注意:并非一定要執(zhí)行acquire()方法的線程才能調(diào)用release()方法,任意一個(gè)線程都可以調(diào)用release()方法,也可以通過(guò)reducePermits()方法來(lái)減少令牌數(shù)。

public class Semaphore implements java.io.Serializable {private final Sync sync;//Creates a Semaphore with the given number of permits and nonfair fairness setting.public Semaphore(int permits) {sync = new NonfairSync(permits);}//Releases a permit, returning it to the semaphore.public void release() {//執(zhí)行AQS的模版方法releaseShared()sync.releaseShared(1);}//Synchronization implementation for semaphore.  //Uses AQS state to represent permits. Subclassed into fair and nonfair versions.abstract static class Sync extends AbstractQueuedSynchronizer {Sync(int permits) {//設(shè)置state的值為傳入的令牌數(shù)setState(permits);}//嘗試釋放鎖,也就是對(duì)state值進(jìn)行累加protected final boolean tryReleaseShared(int releases) {for (;;) {int current = getState();int next = current + releases;if (next < current) {throw new Error("Maximum permit count exceeded");}if (compareAndSetState(current, next)) {return true;}}}...}...
}public abstract class AbstractQueuedSynchronizer extends AbstractOwnableSynchronizer implements java.io.Serializable {...    //Releases in shared mode.  //Implemented by unblocking one or more threads if #tryReleaseShared returns true.public final boolean releaseShared(int arg) {//執(zhí)行Semaphore的內(nèi)部類(lèi)Sync實(shí)現(xiàn)的tryReleaseShared()方法,釋放共享鎖if (tryReleaseShared(arg)) {//執(zhí)行AQS的doReleaseShared()方法,喚醒等待隊(duì)列中的線程doReleaseShared();return true;}return false;}//Release action for shared mode -- signals successor and ensures propagation. //Note: For exclusive mode, release just amounts to calling unparkSuccessor of head if it needs signal.private void doReleaseShared() {for (;;) {//每次循環(huán)時(shí)頭結(jié)點(diǎn)都會(huì)發(fā)生變化//因?yàn)檎{(diào)用unparkSuccessor()方法會(huì)喚醒doAcquireSharedInterruptibly()方法中阻塞的線程//然后阻塞的線程會(huì)在執(zhí)行setHeadAndPropagate()方法時(shí)通過(guò)setHead()修改頭結(jié)點(diǎn)Node h = head;//獲取最新的頭結(jié)點(diǎn)if (h != null && h != tail) {//等待隊(duì)列中存在掛起線程的結(jié)點(diǎn)int ws = h.waitStatus;if (ws == Node.SIGNAL) {//頭結(jié)點(diǎn)的狀態(tài)正常,表示對(duì)應(yīng)的線程可以被喚醒if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0)) {continue;//loop to recheck cases}//喚醒頭結(jié)點(diǎn)的后繼結(jié)點(diǎn)//喚醒的線程會(huì)在doAcquireSharedInterruptibly()方法中執(zhí)行setHeadAndPropagate()方法修改頭結(jié)點(diǎn)unparkSuccessor(h);} else if (ws == 0 && !compareAndSetWaitStatus(h, 0, Node.PROPAGATE)) {//如果ws = 0表示初始狀態(tài),則修改結(jié)點(diǎn)為PROPAGATE狀態(tài)continue;//loop on failed CAS}}if (h == head) {//判斷頭結(jié)點(diǎn)是否有變化break;//loop if head changed}}}//Wakes up node's successor, if one exists.private void unparkSuccessor(Node node) {int ws = node.waitStatus;if (ws < 0) {compareAndSetWaitStatus(node, ws, 0);}Node s = node.next;if (s == null || s.waitStatus > 0) {s = null;for (Node t = tail; t != null && t != node; t = t.prev) {if (t.waitStatus <= 0) {s = t;}}}if (s != null) {LockSupport.unpark(s.thread);}}...
}

8.同步屏障CyclicBarrier介紹

(1)CyclicBarrier的作用

(2)CyclicBarrier的基本原理

(1)CyclicBarrier的作用

CyclicBarrier的字面意思就是可循環(huán)使用的屏障。CyclicBarrier的主要作用就是讓一組線程到達(dá)一個(gè)屏障時(shí)被阻塞,直到最后一個(gè)線程到達(dá)屏障時(shí)屏障才會(huì)打開(kāi),接著才讓所有被屏障攔截的線程一起繼續(xù)往下執(zhí)行。線程進(jìn)入屏障是通過(guò)CyclicBarrier的await()方法來(lái)實(shí)現(xiàn)的。

(2)CyclicBarrier的基本原理

假設(shè)有3個(gè)線程在運(yùn)行中都會(huì)調(diào)用CyclicBarrier的await()方法,而每個(gè)線程從開(kāi)始執(zhí)行到執(zhí)行await()方法所用時(shí)間可能不一樣,最終當(dāng)執(zhí)行時(shí)間最長(zhǎng)的線程到達(dá)屏障時(shí),會(huì)喚醒其他較早到達(dá)屏障的線程繼續(xù)往下執(zhí)行。

CyclicBarrier包含兩個(gè)層面的意思:

一是Barrier屏障點(diǎn),線程調(diào)用await()方法都會(huì)阻塞在屏障點(diǎn),直到所有線程都到達(dá)屏障點(diǎn)后再放行。

二是Cyclic循環(huán),當(dāng)所有線程通過(guò)當(dāng)前屏障點(diǎn)后,又可以進(jìn)入下一輪的屏障點(diǎn)進(jìn)行等待,可以不斷循環(huán)。

9.CyclicBarrier的await()方法源碼

(1)CyclicBarrier的成員變量

(2)CyclicBarrier的await()方法源碼

(3)CountDownLatch和CyclicBarrier對(duì)比

(1)CyclicBarrier的成員變量

//A synchronization aid that allows a set of threads to all wait for each other to reach a common barrier point.  
//CyclicBarriers are useful in programs involving a fixed sized party of threads that must occasionally wait for each other. 
//The barrier is called cyclic because it can be re-used after the waiting threads are released.
public class CyclicBarrier {...private static class Generation {boolean broken = false;}private final ReentrantLock lock = new ReentrantLock();private final Condition trip = lock.newCondition();//用于線程之間相互喚醒private final int parties;//參與的線程數(shù)量private int count;//初始值是parties,每調(diào)用一次await()就減1private final Runnable barrierCommand;//回調(diào)任務(wù)private Generation generation = new Generation();...
}

CyclicBarrier是基于ReentrantLock + Condition來(lái)實(shí)現(xiàn)的。

parties表示每次要求到達(dá)屏障點(diǎn)的線程數(shù),只有到達(dá)屏障點(diǎn)的線程數(shù)滿(mǎn)足指定的parties數(shù)量,所有線程才會(huì)被喚醒。

count是一個(gè)初始值為parties的計(jì)數(shù)器,每個(gè)線程調(diào)用await()方法會(huì)對(duì)count減1,當(dāng)count為0時(shí)會(huì)喚醒所有線程,并且結(jié)束當(dāng)前的屏障周期generation,然后所有線程進(jìn)入下一個(gè)屏障周期,而且count會(huì)恢復(fù)成parties。

(2)CyclicBarrier的await()方法源碼

線程調(diào)用CyclicBarrier的await()方法時(shí),會(huì)觸發(fā)調(diào)用CyclicBarrier的dowait()方法。

CyclicBarrier的dowait()方法會(huì)對(duì)count計(jì)數(shù)器進(jìn)行遞減。如果count遞減到0,則會(huì)調(diào)用CyclicBarrier的nextGeneration()喚醒所有線程,同時(shí)如果異步回調(diào)任務(wù)barrierCommand不為空,則會(huì)執(zhí)行該任務(wù)。如果count還沒(méi)遞減到0,則調(diào)用Condition的await()方法阻塞當(dāng)前線程。

被阻塞的線程,除了會(huì)被CyclicBarrier的nextGeneration()方法喚醒外,還會(huì)被Thread的interrupt()方法喚醒、被中斷異常喚醒,而這些喚醒會(huì)調(diào)用CyclicBarrier的breakBarrier()方法。

在CyclicBarrier的nextGeneration()方法和CyclicBarrier的breakBarrier()方法中,都會(huì)通過(guò)Condition的signalAll()方法喚醒所有被阻塞等待的線程。

//A synchronization aid that allows a set of threads to all wait for each other to reach a common barrier point.  
//CyclicBarriers are useful in programs involving a fixed sized party of threads that must occasionally wait for each other. 
//The barrier is called cyclic because it can be re-used after the waiting threads are released.
public class CyclicBarrier {...private static class Generation {boolean broken = false;//用來(lái)標(biāo)記屏障是否被中斷}private final ReentrantLock lock = new ReentrantLock();private final Condition trip = lock.newCondition();//用于線程之間相互喚醒private final int parties;//參與的線程數(shù)量private int count;//初始值是parties,每調(diào)用一次await()就減1private final Runnable barrierCommand;//回調(diào)任務(wù)private Generation generation = new Generation();public CyclicBarrier(int parties, Runnable barrierAction) {if (parties <= 0) throw new IllegalArgumentException();this.parties = parties;this.count = parties;this.barrierCommand = barrierAction;}public CyclicBarrier(int parties) {this(parties, null);}//Waits until all #getParties have invoked await on this barrier.public int await() throws InterruptedException, BrokenBarrierException {try {//執(zhí)行CyclicBarrier的dowait()方法return dowait(false, 0L);} catch (TimeoutException toe) {throw new Error(toe);}}//Main barrier code, covering the various policies.private int dowait(boolean timed, long nanos) throws InterruptedException, BrokenBarrierException, TimeoutException {final ReentrantLock lock = this.lock;lock.lock();//使用Condition需要先獲取鎖try {//獲取當(dāng)前的generationfinal Generation g = generation;//確認(rèn)當(dāng)前generation的barrier是否有效,如果generation的broken為true,則拋出屏障中斷異常if (g.broken) {throw new BrokenBarrierException();}if (Thread.interrupted()) {breakBarrier();throw new InterruptedException();}//統(tǒng)計(jì)已經(jīng)到達(dá)當(dāng)前generation的線程數(shù)量int index = --count;//如果index為0,則表示所有線程都到達(dá)了屏障點(diǎn)if (index == 0) {boolean ranAction = false;try {final Runnable command = barrierCommand;if (command != null) {//觸發(fā)回調(diào)command.run();}ranAction = true;//執(zhí)行nextGeneration()方法喚醒所有線程,同時(shí)進(jìn)入下一個(gè)屏障周期nextGeneration();return 0;} finally {if (!ranAction) {breakBarrier();}}}//loop until tripped, broken, interrupted, or timed out//如果index > 0,則阻塞當(dāng)前線程for (;;) {try {if (!timed) {//通過(guò)Condition的await()方法,在阻塞當(dāng)前線程的同時(shí)釋放鎖//這樣其他線程就能獲取到鎖執(zhí)行上面的index = --counttrip.await();} else if (nanos > 0L) {nanos = trip.awaitNanos(nanos);}} catch (InterruptedException ie) {if (g == generation && ! g.broken) {breakBarrier();throw ie;} else {Thread.currentThread().interrupt();}}if (g.broken) {throw new BrokenBarrierException();}if (g != generation) {return index;}if (timed && nanos <= 0L) {//中斷屏障,設(shè)置generation.broken為truebreakBarrier();throw new TimeoutException();}}} finally {lock.unlock();}}//Updates state on barrier trip and wakes up everyone.//Called only while holding lock.private void nextGeneration() {//通過(guò)Condition的signalAll()喚醒所有等待的線程trip.signalAll();//還原countcount = parties;//進(jìn)入新的generationgeneration = new Generation();}//Sets current barrier generation as broken and wakes up everyone.//Called only while holding lock.private void breakBarrier() {generation.broken = true;count = parties;//通過(guò)Condition的signalAll()喚醒所有等待的線程trip.signalAll();}...
}

(3)CountDownLatch和CyclicBarrier對(duì)比

一.CyclicBarrier可以被重用、可以響應(yīng)中斷

二.CountDownLatch的計(jì)數(shù)器只能使用一次,但可以通過(guò)reset()方法重置

10.使用CountDownLatch等待注冊(cè)的完成

Hadoop HDFS(分布式存儲(chǔ)系統(tǒng))的NameNode分為主備兩個(gè)節(jié)點(diǎn),各個(gè)DataNode在啟動(dòng)時(shí)都會(huì)向兩個(gè)NameNode進(jìn)行注冊(cè),此時(shí)就可以使用CountDownLatch等待向主備節(jié)點(diǎn)注冊(cè)的完成。

//DataNode啟動(dòng)類(lèi)
public class DataNode {//是否還在運(yùn)行private volatile Boolean shouldRun;//負(fù)責(zé)和一組NameNode(主NameNode + 備N(xiāo)ameNode)通信的組件private NameNodeGroupOfferService offerService;//初始化DataNodeprivate void initialize() {this.shouldRun = true;this.offerService = new NameNodeGroupOfferService();this.offerService.start();  }//運(yùn)行DataNodeprivate void run() {try {while(shouldRun) {Thread.sleep(10000);  }   } catch (Exception e) {e.printStackTrace();}}public static void main(String[] args) {DataNode datanode = new DataNode();datanode.initialize();datanode.run(); }
}//負(fù)責(zé)某個(gè)NameNode進(jìn)行通信的線程組件
public class NameNodeServiceActor {//向某個(gè)NameNode進(jìn)行注冊(cè)public void register(CountDownLatch latch) {Thread registerThread = new RegisterThread(latch);registerThread.start(); }//負(fù)責(zé)注冊(cè)的線程,傳入一個(gè)CountDownLatchclass RegisterThread extends Thread {CountDownLatch latch;public RegisterThread(CountDownLatch latch) {this.latch = latch;}@Overridepublic void run() {try {//發(fā)送rpc接口調(diào)用請(qǐng)求到NameNode去進(jìn)行注冊(cè)System.out.println("發(fā)送請(qǐng)求到NameNode進(jìn)行注冊(cè)...");Thread.sleep(1000);  latch.countDown();  } catch (Exception e) {e.printStackTrace();}}}
}//負(fù)責(zé)跟一組NameNode(主NameNode + 備N(xiāo)ameNode)進(jìn)行通信的線程組件
public class NameNodeGroupOfferService {//負(fù)責(zé)跟NameNode主節(jié)點(diǎn)通信的ServiceActor組件private NameNodeServiceActor activeServiceActor;//負(fù)責(zé)跟NameNode備節(jié)點(diǎn)通信的ServiceActor組件private NameNodeServiceActor standbyServiceActor;//構(gòu)造函數(shù)public NameNodeGroupOfferService() {this.activeServiceActor = new NameNodeServiceActor();this.standbyServiceActor = new NameNodeServiceActor();}//啟動(dòng)OfferService組件public void start() {//直接使用兩個(gè)ServiceActor組件分別向主備兩個(gè)NameNode節(jié)點(diǎn)進(jìn)行注冊(cè)register();}//向主備兩個(gè)NameNode節(jié)點(diǎn)進(jìn)行注冊(cè)private void register() {try {CountDownLatch latch = new CountDownLatch(2);  this.activeServiceActor.register(latch); this.standbyServiceActor.register(latch); latch.await();//阻塞等待主備都完成注冊(cè)System.out.println("主備N(xiāo)ameNode全部注冊(cè)完畢...");} catch (Exception e) {e.printStackTrace();  }}
}

11.使用CyclicBarrier將工作任務(wù)多線程分而治之

//輸出結(jié)果:
//線程1執(zhí)行自己的一部分工作...
//線程2執(zhí)行自己的一部分工作...
//線程3執(zhí)行自己的一部分工作...
//所有線程都完成自己的任務(wù),可以合并結(jié)果了...
//最終結(jié)果合并完成,線程3可以退出...
//最終結(jié)果合并完成,線程1可以退出...
//最終結(jié)果合并完成,線程2可以退出...
public class CyclicBarrierDemo {public static void main(String[] args) {final CyclicBarrier barrier = new CyclicBarrier(3, new Runnable() {public void run() {System.out.println("所有線程都完成自己的任務(wù),可以合并結(jié)果了...");}});new Thread() {public void run() {try {System.out.println("線程1執(zhí)行自己的一部分工作...");barrier.await();System.out.println("最終結(jié)果合并完成,線程1可以退出...");} catch (Exception e) {e.printStackTrace();}}}.start();new Thread() {public void run() {try {System.out.println("線程2執(zhí)行自己的一部分工作...");barrier.await();System.out.println("最終結(jié)果合并完成,線程2可以退出...");} catch (Exception e) {e.printStackTrace();}}}.start();new Thread() {public void run() {try {System.out.println("線程3執(zhí)行自己的一部分工作...");barrier.await();System.out.println("最終結(jié)果合并完成,線程3可以退出...");} catch (Exception e) {e.printStackTrace();}}}.start();}
}

12.使用CyclicBarrier聚合服務(wù)接口的返回結(jié)果

當(dāng)然也可以使用CountDownLatch來(lái)實(shí)現(xiàn)聚合服務(wù)接口的返回結(jié)果;

public class ApiServiceDemo {public Map<String, Object> queryOrders() throws Exception {final List<Object> results = new ArrayList<Object>();final Map<String, Object> map = new ConcurrentHashMap<String, Object>();CyclicBarrier barrier = new CyclicBarrier(3, new Runnable() {@Overridepublic void run() {map.put("price", results.get(0));   map.put("order", results.get(1)); map.put("stats", results.get(2));  }});//請(qǐng)求價(jià)格接口new Thread() {public void run() {try {System.out.println("請(qǐng)求價(jià)格服務(wù)..."); Thread.sleep(1000);  results.add(new Object());    barrier.await();} catch (Exception e) {e.printStackTrace();  } };}.start();//請(qǐng)求訂單接口new Thread() {public void run() {try {System.out.println("請(qǐng)求訂單服務(wù)..."); Thread.sleep(1000);  results.add(new Object());    barrier.await();} catch (Exception e) {e.printStackTrace();  } };}.start();//請(qǐng)求統(tǒng)計(jì)接口new Thread() {public void run() {try {System.out.println("請(qǐng)求訂單統(tǒng)計(jì)服務(wù)..."); Thread.sleep(1000);  results.add(new Object());    barrier.await();} catch (Exception e) {e.printStackTrace();  } };}.start();while(map.size() < 3) {Thread.sleep(100);  }return map;}
}

13.使用Semaphore等待指定數(shù)量線程完成任務(wù)

可以通過(guò)Semaphore實(shí)現(xiàn)等待指定數(shù)量的線程完成任務(wù)才往下執(zhí)行。

//輸出結(jié)果如下:
//線程2執(zhí)行一個(gè)計(jì)算任務(wù)
//等待1個(gè)線程完成任務(wù)即可...
//線程1執(zhí)行一個(gè)計(jì)算任務(wù)
public class SemaphoreDemo {public static void main(String[] args) throws Exception {final Semaphore semaphore = new Semaphore(0);new Thread() {public void run() {try {Thread.sleep(2000);System.out.println("線程1執(zhí)行一個(gè)計(jì)算任務(wù)");semaphore.release();} catch (Exception e) {e.printStackTrace();}}}.start();new Thread() {public void run() {try {Thread.sleep(1000);System.out.println("線程2執(zhí)行一個(gè)計(jì)算任務(wù)");semaphore.release();} catch (Exception e) {e.printStackTrace();}}}.start();semaphore.acquire(1);System.out.println("等待1個(gè)線程完成任務(wù)即可...");}
}

http://www.risenshineclean.com/news/407.html

相關(guān)文章:

  • 網(wǎng)站技術(shù)可行性/立即優(yōu)化在哪里
  • 全國(guó)做網(wǎng)站的公司/做推廣哪個(gè)平臺(tái)效果好
  • 簡(jiǎn)單展示網(wǎng)站模板/百度搜索官網(wǎng)
  • wordpress頁(yè)面markdown/seo品牌
  • 三明網(wǎng)站優(yōu)化/推廣優(yōu)化網(wǎng)站排名
  • 有哪些攝影網(wǎng)站/站長(zhǎng)工具源碼
  • 建設(shè)網(wǎng)站成本預(yù)算/青島疫情最新情況
  • 外貿(mào)網(wǎng)站使用攻略/自己開(kāi)網(wǎng)店怎么運(yùn)營(yíng)
  • 企業(yè)網(wǎng)站要更新文章嗎/域名注冊(cè)查詢(xún)?nèi)肟?/a>
  • 支付網(wǎng)站建設(shè)費(fèi)/正規(guī)的培訓(xùn)學(xué)校
  • 網(wǎng)站關(guān)鍵詞庫(kù)如何做/惠州seo推廣外包
  • 做網(wǎng)站jsp和php/成功營(yíng)銷(xiāo)十大經(jīng)典案例
  • 網(wǎng)站子頁(yè)面如何做seo/信息流優(yōu)化師工作總結(jié)
  • 怎么做一簾幽夢(mèng)網(wǎng)站/全渠道營(yíng)銷(xiāo)管理平臺(tái)
  • 珠海網(wǎng)站建設(shè)小程序/百度搜索網(wǎng)站
  • 武漢企業(yè)建站公司/百度官方電話號(hào)碼
  • 二手車(chē)交易網(wǎng)站怎么做/百度推廣網(wǎng)址是多少
  • 網(wǎng)站設(shè)計(jì)服務(wù)有哪些/網(wǎng)頁(yè)seo
  • 網(wǎng)站開(kāi)發(fā) 項(xiàng)目計(jì)劃/免費(fèi)推廣網(wǎng)站地址大全
  • 鹽城市亭湖區(qū)建設(shè)局網(wǎng)站/博客可以做seo嗎
  • 10個(gè)值得推薦的免費(fèi)設(shè)計(jì)網(wǎng)站/怎么讓網(wǎng)站快速收錄
  • photoshop在線修圖/東莞seo管理
  • 門(mén)戶(hù)網(wǎng)站樣式/百度提問(wèn)在線回答問(wèn)題
  • 網(wǎng)站聯(lián)系我們的地圖怎么做的/seo收費(fèi)標(biāo)準(zhǔn)
  • 北京網(wǎng)站建設(shè)交易/肇慶seo優(yōu)化
  • 網(wǎng)站 色調(diào)/世界搜索引擎大全
  • 南充網(wǎng)站建設(shè)工作室/互聯(lián)網(wǎng)推廣有哪些方式
  • 工業(yè)和信息化部icp網(wǎng)站備案系統(tǒng)/百度查詢(xún)?nèi)肟?/a>
  • 游戲推廣網(wǎng)站如何做的/一件代發(fā)48個(gè)貨源網(wǎng)站
  • 淘寶客怎么建設(shè)網(wǎng)站/實(shí)時(shí)新聞