靠比較好的軟件下載網(wǎng)站社交媒體營銷三種方式
Java JUC(三) AQS與同步工具詳解
一. ReentrantLock 概述
ReentrantLock
是 java.util.concurrent.locks
包下的一個同步工具類,它實現(xiàn)了 Lock
接口,提供了一種相比synchronized
關(guān)鍵字更靈活的鎖機制。ReentrantLock
是一種獨占式且可重入的鎖,并且支持可中斷、公平鎖/非公平鎖、超時等待、條件變量等高級特性。其特點如下:
- 獨占式: 一把鎖在同一時間只能被一個線程所獲取;
- 可重入: 可重入意味著同一個線程如果已持有某個鎖,則可以繼續(xù)多次獲得該鎖(注意釋放同樣次之后才算完全釋放成功);
- 可中斷: 在線程獲取鎖的等待過程中可以中斷獲取,放棄等待而轉(zhuǎn)去執(zhí)行其他邏輯;
- 公平性:
ReentrantLock
支持公平鎖和非公平鎖(默認)兩種模式。其中,公平鎖會按照線程請求鎖的順序來分配鎖(降低性能),而非公平鎖允許線程搶占已等待的線程的鎖(可能存在饑餓現(xiàn)象); - 條件變量: 通過
Condition
接口的實現(xiàn),允許線程在某些條件下等待或喚醒,即可以實現(xiàn)選擇性通知;
Type | Method | Description |
---|---|---|
/ | ReentrantLock() | 無參構(gòu)造方法,默認為非公平鎖 |
/ | ReentrantLock(boolean fair) | 帶參構(gòu)造方法,其中fair 表示鎖的公平性策略:- true : 公平鎖- false : 非公平鎖 |
void | lock() | 不可中斷式獲取鎖。若當前鎖已被其他線程持有,則阻塞等待;**注意:**該獲鎖過程不可被中斷 |
void | lockInterruptibly() throws InterruptedException | 可中斷式獲取鎖。若當前鎖已被其他線程持有,則阻塞等待;**注意:**該獲鎖等待過程可被中斷,拋出InterruptedException ,并清除當前線程的中斷狀態(tài) |
boolean | tryLock() | 嘗試獲取鎖,該方法會立即返回。若獲鎖成功,則返回true ,否則將返回false ;**注意:**該方法會破壞公平鎖配置,即在公平鎖策略下,該方法也會立即嘗試獲取可用鎖 |
boolean | tryLock(long timeout,TimeUnit unit) throws InterruptedException | 在給定時間timeout 內(nèi)嘗試獲取鎖。若獲鎖成功,則返回true ,否則將阻塞等待直到timeout 過期,返回false ;注意: - 獲鎖等待過程可被中斷,拋出 InterruptedException ,并清除當前線程的中斷狀態(tài)- 遵循公平鎖配置策略,即在公平鎖策略下,該方法會按順序等待獲取鎖 |
void | unlock() | 當前線程嘗試釋放該鎖。若當前線程未持有該鎖,則拋出IllegalMonitorStateException 異常 |
Condition | newCondition() | 返回一個與當前Lock 實例綁定的條件變量集合對象(默認返回AQS 內(nèi)部實現(xiàn)類ConditionObject ),用于實現(xiàn)線程的條件等待/喚醒(詳見后文) |
1. 鎖的基本使用
相比synchronized
關(guān)鍵字來說,ReentrantLock
屬于顯式鎖,其鎖機制都是針對Lock
實例對象本身進行加鎖,并且在使用過程中需要手動釋放,即鎖的獲取與釋放是成對出現(xiàn)的;除此之外,ReentrantLock
屬于JDK API層面實現(xiàn)的互斥鎖,其通過方法調(diào)用實現(xiàn)鎖功能,可以跨方法從而更加靈活。為了避免出現(xiàn)死鎖問題,官方建議的開發(fā)方式如下:
// new lock object
ReentrantLock lock = new ReentrantLock();
lock.lock(); // block until condition holds
try {// ... method body
} finally {lock.unlock(); // release
}
問題背景: 假設(shè)當前有一個賣票系統(tǒng),一共有100張票,有4個窗口同時售賣,請模擬該賣票過程,注意保證出票的正確性。
public class Test_01 {public static void main(String[] args) {ReentrantLock lock = new ReentrantLock();for (int i = 0; i < 4; i++){new Thread(new TicketSeller(lock), "Thread_" + i).start();}}
}
// 售票類實現(xiàn)
class TicketSeller implements Runnable{private static int ticketCount = 100;private ReentrantLock windowLock;public TicketSeller(ReentrantLock lock) {this.windowLock = lock;}@Overridepublic void run() {while (true) {try {Thread.sleep(10);}catch(InterruptedException e){e.printStackTrace();}windowLock.lock();try{if(ticketCount > 0){System.out.println(Thread.currentThread().getName() + ": sale and left " + --ticketCount);}else{System.out.println(Thread.currentThread().getName() + ": sold out...");break;}}finally {windowLock.unlock();}}}
}
2. 條件變量機制
在synchronized
關(guān)鍵字中,我們可以通過wait¬ify
實現(xiàn)線程的等待與喚醒,但在此場景下存在虛假喚醒問題,根本原因就是其等待/喚醒機制只支持單條件(等待線程未作區(qū)分,只能全部喚醒)。相比之下,ReentrantLock
基于Condition
接口也實現(xiàn)了相同的機制,并提供了更細的粒度和更高級的功能;每個Condition
實例都對應(yīng)一個條件隊列,用于維護在該條件場景下等待通知的線程,并且ReentrantLock
支持多條件變量,即一個ReentrantLock
可以關(guān)聯(lián)多個Condition
實例。其常用方法如下:
Type | Method | Description |
---|---|---|
void | await() throws InterruptedException | 使當前線程阻塞等待(進入該條件隊列),并釋放與此條件變量所關(guān)聯(lián)的鎖。注意: 若在等待期間被中斷,則拋出InterruptedException ,并清除當前線程中斷狀態(tài) |
boolean | await(long time,TimeUnit unit) throws InterruptedException | 使當前線程阻塞等待,并釋放與此條件變量所關(guān)聯(lián)的鎖,直到被喚醒、被中斷或等待time 時間過期。其返回值表示:- true : 在等待時間之內(nèi),條件被喚醒;- false : 等待時間過期,條件未被喚醒;注意: 若在等待期間被中斷,則拋出 InterruptedException ,并清除當前線程中斷狀態(tài) |
void | signal() | 喚醒一個等待在Condition 上的線程。如果有多個線程在此條件變量下等待,則選擇任意一個線程喚醒;注意: 從等待方法返回前必須重新獲得Condition 相關(guān)聯(lián)的鎖 |
void | signalAll() | 喚醒所有等待在Condition 上的線程。如果有多個線程在此條件變量下等待,則全部喚醒 ;注意: 從等待方法返回前必須重新獲得Condition 相關(guān)聯(lián)的鎖 |
在使用時需要注意:
- 調(diào)用
await
相關(guān)方法前需要先獲得對應(yīng)條件變量所關(guān)聯(lián)的鎖,否則會拋出IllegalMonitorStateException
異常; - 調(diào)用
signal
相關(guān)方法前需要先獲得對應(yīng)條件變量所關(guān)聯(lián)的鎖,否則會拋出IllegalMonitorStateException
異常; await
線程被喚醒(或等待時間過期、被中斷)后會重新參與鎖的競爭,若成功拿到鎖則將從await
處恢復(fù)繼續(xù)向下執(zhí)行;
/*** 場景模擬:奶茶店和咖啡店共用一個窗口(window)出餐,等待顧客點單...* - 奶茶店(teaWithMilk):顧客需要奶茶,則奶茶店開始工作;* - 咖啡店(coffee):顧客需要咖啡,則咖啡店開始工作;*/
public class Test {// 窗口鎖(ReentrantLock實現(xiàn))static final ReentrantLock window = new ReentrantLock();// 奶茶點單條件變量static Condition teaWithMilk = window.newCondition();// 咖啡點單條件變量static Condition coffee = window.newCondition();public static void main(String[] args) throws InterruptedException {// 奶茶店監(jiān)控線程new Thread(new Runnable() {@Overridepublic void run() {while (true) {window.lock();try {System.out.println("[奶茶店] 等待接單...");try {teaWithMilk.await();} catch (InterruptedException e) {e.printStackTrace();}System.out.println("[奶茶店] 接到訂單...");} finally {window.unlock();}System.out.println("[奶茶店] 開始工作...");try {Thread.sleep(1000);} catch (InterruptedException e) {e.printStackTrace();}System.out.println("[奶茶店] 工作完成...");}}}).start();Thread.sleep(1000);// 咖啡店監(jiān)控線程new Thread(new Runnable() {@Overridepublic void run() {while (true) {window.lock();try {System.out.println("[咖啡店] 等待接單...");try {coffee.await();} catch (InterruptedException e) {e.printStackTrace();}System.out.println("[咖啡店] 接到訂單...");} finally {window.unlock();}System.out.println("[咖啡店] 開始工作...");try {Thread.sleep(1000);} catch (InterruptedException e) {e.printStackTrace();}System.out.println("[咖啡店] 工作完成...");}}}).start();Thread.sleep(1000);// 顧客點單線程: mainwindow.lock();try{System.out.println("[顧客] 點了咖啡!!");coffee.signal(); // 喚醒咖啡條件等待線程}finally {window.unlock();}}
}
二. 從 ReentrantLock 分析 AQS 的原理
1. AQS 框架
AQS
全稱為 AbstractQueuedSynchronizer
,即抽象隊列同步器;AQS
是 java.util.concurrent.locks
包下的一個抽象類,其為構(gòu)建鎖和同步器提供了一系列通用模板與框架的實現(xiàn),大部分JUC
包下的并發(fā)工具都是基于AQS
來構(gòu)建的,比如ReentrantLock
、Semaphore
、CountDownLatch
等。其核心源碼如下:
public abstract class AbstractQueuedSynchronizer extends AbstractOwnableSynchronizer implements java.io.Serializable {// 同步隊列的節(jié)點static final class Node {...}// 指向同步隊列頭部private transient volatile Node head;// 指向同步隊列尾部private transient volatile Node tail;// 同步狀態(tài)private volatile int state;// 提供一系列并發(fā)、同步隊列的基本操作方法// 比如: 掛起、取消、節(jié)點插入、節(jié)點替換等...// 交由子類實現(xiàn)的模板方法(鉤子方法): 自定義同步器的核心實現(xiàn)目標protected boolean tryAcquire(int arg) {throw new UnsupportedOperationException();}protected boolean tryRelease(int arg) {throw new UnsupportedOperationException();}protected int tryAcquireShared(int arg) {throw new UnsupportedOperationException();}protected boolean tryReleaseShared(int arg) {throw new UnsupportedOperationException();}protected boolean isHeldExclusively() {throw new UnsupportedOperationException();}// 條件變量 Condition 接口的內(nèi)部實現(xiàn)類public class ConditionObject implements Condition {...}}
由上可知,AQS內(nèi)部實現(xiàn)了一個核心內(nèi)部類Node
,該內(nèi)部類表示對等待獲取鎖的線程的封裝節(jié)點;在AQS中,基于Node
維護了一個雙向鏈表(模擬同步隊列),其中head
節(jié)點指向同步隊列的頭部,而tail
節(jié)點指向同步隊列的尾部。Node
類的核心源碼如下:
static final class Node {// 共享模式(共享鎖、比如Semaphore)static final Node SHARED = new Node();// 獨占模式(獨占鎖、比如ReentrantLock)static final Node EXCLUSIVE = null;// 標識節(jié)點線程獲取鎖的請求已取消、已結(jié)束static final int CANCELLED = 1;// 標識節(jié)點線程已準備就緒,等待被喚醒獲取資源static final int SIGNAL = -1;// 標識節(jié)點線程在條件變量Condition中等待static final int CONDITION = -2;// 在共享模式下啟用: 標識獲得的同步狀態(tài)會被傳播static final int PROPAGATE = -3;/*** waitStatus 標識節(jié)點線程在同步隊列中的狀態(tài),共存在以下幾種情況:* (1)SIGNAL: 被標記為SIGNAL的節(jié)點處于等待喚醒獲取資源的狀態(tài),只要前驅(qū)節(jié)點釋放鎖就會通知該狀態(tài)的后續(xù)節(jié)點線程執(zhí)行* (2)CANCELLED: 在同步隊列中等待超時、被中斷的線程會進入取消狀態(tài),不再響應(yīng)并會在遍歷過程中被移除* (3)CONDITION: 標識當前節(jié)點線程在Condition下等待,被喚醒后將重新從等待隊列轉(zhuǎn)移到同步隊列* (4)PROPAGATE: 與共享模式有關(guān) * (5)0: 默認初始值狀態(tài),代表節(jié)點初始化*/volatile int waitStatus;// 同步隊列中的前驅(qū)節(jié)點volatile Node prev;// 同步隊列中的后繼節(jié)點volatile Node next;// 等待獲取鎖資源的線程volatile Thread thread;// Condition 等待隊列中的后繼節(jié)點(單向鏈表)Node nextWaiter;// 判斷是否為共享模式final boolean isShared() {return nextWaiter == SHARED;}// 獲取當前節(jié)點在同步隊列中的前驅(qū)節(jié)點final Node predecessor() throws NullPointerException {Node p = prev;if (p == null)throw new NullPointerException();elsereturn p;}// 省略構(gòu)造方法...}
在了解Node
的基本數(shù)據(jù)結(jié)構(gòu)與狀態(tài)之后,AQS
還有一個核心狀態(tài)變量即state
,該全局變量用于表示同步鎖的狀態(tài),其具體含義一般在子類實現(xiàn)中進行定義和維護(獨占鎖和共享鎖不一樣)。但不管獨占模式還是共享模式,簡單來說都是使用一個volatile
的全局變量來表示資源同步狀態(tài)(state
),并通過CAS完成對state
值的修改(修改成功則表示獲鎖成功),當持有鎖的線程數(shù)量超過當前模式(獨占模式一般限制為1)時,則通過內(nèi)置的FIFO同步隊列來完成資源獲取線程的排隊工作(通過LockSupport park/unpark
方法實現(xiàn)掛起與喚醒)。其核心原理圖如下:
綜上,AQS采用了模板方法模式來構(gòu)建同步框架,并提供了一系列并發(fā)操作的公共基礎(chǔ)方法,支持共享模式和獨占模式兩種實現(xiàn);但AQS并不負責對外提供具體的加鎖/解鎖邏輯,因為鎖是千變?nèi)f化的,AQS只關(guān)注基礎(chǔ)組件、頂層模板這些總的概念,具體的鎖邏輯將通過”鉤子“的方式下放給子類實現(xiàn)。也就是說,獨占模式只需要實現(xiàn)tryAcquire-tryRelease
方法、共享模式只需要實現(xiàn)tryAcquireShared-tryReleaseShared
方法,搭配AQS提供的框架和基礎(chǔ)組件就能輕松實現(xiàn)自定義的同步工具。
2. ReentrantLock 源碼分析
由ReentrantLock
的類結(jié)構(gòu)圖可以看出,ReentrantLock
實現(xiàn)了Lock
接口,其內(nèi)部包含一個內(nèi)部類Sync
,該內(nèi)部類繼承了AQS
(AbstractQueuedSynchronizer
),ReentrantLock
大部分的鎖操作都是通過Sync
實現(xiàn)的。除此之外,ReentrantLock
有公平鎖和非公平鎖兩種模式,分別對應(yīng)Sync
的FairSync
和NonfairSync
兩個子類實現(xiàn)。
public class ReentrantLock implements Lock, java.io.Serializable {private final Sync sync;// 默認構(gòu)造函數(shù): 默認創(chuàng)建非公平鎖public ReentrantLock() {sync = new NonfairSync();}// 帶參構(gòu)造函數(shù): true公平鎖/false非公平鎖public ReentrantLock(boolean fair) {sync = fair ? new FairSync() : new NonfairSync();}// 加鎖操作public void lock() {sync.lock();}//...
}
接下來,本節(jié)將首先以ReentrantLock
的非公平鎖為例進行分析,然后再介紹公平鎖與非公平鎖的主要區(qū)別。
2.1 非公平鎖lock加鎖原理
非公平鎖NonfairSync
的源碼如下:
static final class NonfairSync extends Sync {// 加鎖操作final void lock() {// CAS修改state狀態(tài)以獲取鎖資源if (compareAndSetState(0, 1))// 成功則將獨占鎖線程設(shè)置為當前線程setExclusiveOwnerThread(Thread.currentThread());else// 否則再次請求同步狀態(tài)(AQS的模板方法)acquire(1);}// AQS 獲鎖的鉤子方法實現(xiàn)protected final boolean tryAcquire(int acquires) {return nonfairTryAcquire(acquires);}}
在調(diào)用lock
方法獲取鎖的過程中,當前線程會先通過CAS操作嘗試修改state
從0
(表示無鎖)到1
(表示占有鎖),若修改成功則將AQS
中保存的獨占線程exclusiveOwnerThread
修改為當前線程;若失敗則執(zhí)行acquire(1)
方法,該方法是AQS
中的一個模板方法,其源碼如下:
public final void acquire(int arg) {// tryAcquire -> addWaiter -> acquireQueuedif (!tryAcquire(arg) &&acquireQueued(addWaiter(Node.EXCLUSIVE), arg))selfInterrupt();}
可以看到該方法首先調(diào)用了鉤子方法tryAcquire
,該方法是交由子類NonfairSync
實現(xiàn)的,在上面的源碼中我們已經(jīng)給出了tryAcquire
的實現(xiàn)代碼,其直接調(diào)用了父類Sync
中的nonfairTryAcquire
方法,其源碼如下:
abstract static class Sync extends AbstractQueuedSynchronizer {abstract void lock();// nonfairTryAcquire 方法實現(xiàn)final boolean nonfairTryAcquire(int acquires) {// 獲取當前線程及同步隊列狀態(tài)statefinal Thread current = Thread.currentThread();int c = getState();// 若狀態(tài)為0表示鎖已釋放: 重新嘗試獲取鎖if (c == 0) {// CAS嘗試修改state的值if (compareAndSetState(0, acquires)) {// 若成功則設(shè)置獨占線程為當前線程setExclusiveOwnerThread(current);return true;}}// 若獨占線程即當前線程,則屬于重入鎖else if (current == getExclusiveOwnerThread()) {// 修改state為重入值int nextc = c + acquires;if (nextc < 0) // overflowthrow new Error("Maximum lock count exceeded");setState(nextc);return true;}return false;}//省略代碼...
}
由上述代碼可知,該方法首先再次判斷鎖是否已釋放,這是為了避免之前持鎖的線程在這段時間內(nèi)又重新釋放了鎖,若state==0
則會嘗試再次CAS修改同步狀態(tài)以獲取鎖資源;否則的話,則判斷當前線程是否是鎖重入的情況,若兩個判斷都不滿足則返回false
;到目前為止,我們回過頭來想一下非公平鎖的非公平性是在哪體現(xiàn)的?
很明顯,在上述代碼分析中,當有任何線程嘗試獲取鎖時(調(diào)用lock
方法),不論當前同步隊列中是否已有線程排隊等待,NonfairSync
的lock()
方法以及Sync
的nonfairTryAcquire()
方法都沒有對同步隊列中的等待情況進行判斷,而是直接通過CAS嘗試修改state
的值來為當前線程直接占有鎖;這就是非公平性的體現(xiàn),搶占線程可以直接與等待線程競爭鎖資源,而不用按照順序加入隊列。
分析完這部分之后,我們再回到acquire(1)
方法,若tryAcquire(arg)
方法返回false
即獲取不到鎖時會繼續(xù)向下執(zhí)行到addWaiter(Node.EXCLUSIVE)
方法,該方法用于封裝線程入隊,其源碼如下:
private Node addWaiter(Node mode) {// 將請求占鎖失敗的線程封裝為Node節(jié)點Node node = new Node(Thread.currentThread(), mode);Node pred = tail;// 若同步隊列不為空,則嘗試CAS在尾部插入當前節(jié)點(FIFO)if (pred != null) {node.prev = pred;if (compareAndSetTail(pred, node)) {pred.next = node;return node;}}// 若隊列為空或CAS插入節(jié)點失敗則執(zhí)行enq()方法處理入隊enq(node);return node;}
接著,我們繼續(xù)分析enq(node)
方法的實現(xiàn):
private Node enq(final Node node) {// 開啟自旋(循環(huán))for (;;) {Node t = tail;// 若隊列為空if (t == null) { // Must initialize// 則嘗試CAS創(chuàng)建頭節(jié)點(不存儲數(shù)據(jù))// 原因: 隊列為空可能是因為其他線程非公平占有了鎖(當前線程試過沒搶到),因此這里需要先斬后奏,即再次創(chuàng)建頭節(jié)點表示已占鎖線程的占位,來維護同步隊列if (compareAndSetHead(new Node()))tail = head;} else {// 否則嘗試CAS添加尾節(jié)點node.prev = t;if (compareAndSetTail(t, node)) {t.next = node;return t;}}}// 若CAS失敗(可能有多線程并發(fā)操作),則不斷自旋重試直到插入成功}
可以看到頭節(jié)點head
其實是不存儲數(shù)據(jù)的,它只表示一個線程占位(占位了鎖資源),因為位于頭節(jié)點的線程肯定已經(jīng)獲取到了鎖,頭節(jié)點只存儲后繼節(jié)點指向,用于當前線程釋放鎖資源時喚醒后繼節(jié)點;那么到此這個方法也就分析完成了,在節(jié)點入隊成功之后會返回當前節(jié)點node
,然后會繼續(xù)執(zhí)行到acquireQueued(addWaiter(Node.EXCLUSIVE),arg)
方法,其源碼如下:
final boolean acquireQueued(final Node node, int arg) {boolean failed = true;try {boolean interrupted = false; //阻塞掛起標志// 開啟自旋(循環(huán))for (;;) {// 獲取當前節(jié)點的前驅(qū)節(jié)點pfinal Node p = node.predecessor();// 若p是頭節(jié)點則當前節(jié)點嘗試獲取鎖資源if (p == head && tryAcquire(arg)) {// 占鎖成功則設(shè)置當前節(jié)點node為頭節(jié)點: 頭節(jié)點狀態(tài)保持SIGNAL狀態(tài)setHead(node);p.next = null; // help GCfailed = false;return interrupted;}// 若p不是頭節(jié)點或獲取鎖資源失敗,則判斷是否阻塞掛起線程來等待if (shouldParkAfterFailedAcquire(p, node) &&parkAndCheckInterrupt())interrupted = true;}} finally {// 若最終無法獲取鎖,則取消該線程的請求if (failed)cancelAcquire(node);}}// 將傳遞的節(jié)點設(shè)置為同步隊列的頭節(jié)點private void setHead(Node node) {head = node;// 清空當前節(jié)點存儲的線程信息node.thread = null;node.prev = null;}
在acquireQueued
方法中,線程會開啟自旋,若發(fā)現(xiàn)(或被喚醒后發(fā)現(xiàn))當前節(jié)點的前驅(qū)節(jié)點變?yōu)轭^節(jié)點,則說明當前節(jié)點能夠嘗試獲取鎖資源,并嘗試通過tryAcquire
方法獲取同步狀態(tài);需要注意的是,head
頭節(jié)點表示當前占有鎖的線程節(jié)點,只有當head
節(jié)點對應(yīng)的線程釋放鎖資源并喚醒后繼節(jié)點時,后繼節(jié)點線程才會自旋去嘗試占有鎖資源,因此:在同步隊列中,只有前驅(qū)節(jié)點變?yōu)轭^節(jié)點時,當前節(jié)點才有資格嘗試獲取鎖資源,其他時候都將被掛起等待,避免空轉(zhuǎn)CPU
。
除此之外,若在自旋過程中,當前節(jié)點的前驅(qū)節(jié)點不是頭節(jié)點或者節(jié)點嘗試tryAcquire
獲取鎖資源失敗,則會執(zhí)行shouldParkAfterFailedAcquire(p,node)&&parkAndCheckInterrupt()
邏輯;需要注意的是在前驅(qū)節(jié)點是頭節(jié)點但嘗試獲取鎖資源失敗這種特殊情況發(fā)生時(比如非公平鎖模式下被新到來的請求線程搶占),頭節(jié)點head
此時可能有兩種狀態(tài):
-
waitStatus==0
: 處于該狀態(tài)一種情況是初始同步隊列為空時,默認頭節(jié)點狀態(tài)初始化為0;另一種情況是鎖釋放時(見后文)被unparkSuccessor
重置頭節(jié)點狀態(tài); -
waitStatus==SIGNAL
: 處于該狀態(tài)一種情況是waitStatus==0
時更新狀態(tài)后又自旋回來但仍未獲取到鎖(可能釋放鎖后被非公平搶占);另一種情況是釋放鎖時unparkSuccessor
重置失敗;
其源碼如下:
// 判斷節(jié)點線程是否掛起等待private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {// 獲取前驅(qū)節(jié)點的等待狀態(tài)int ws = pred.waitStatus;// 若是SIGNAL狀態(tài),則說明前驅(qū)節(jié)點就緒,當前節(jié)點正常需要繼續(xù)等待即返回trueif (ws == Node.SIGNAL)/** This node has already set status asking a release* to signal it, so it can safely park.*/return true;// 若等待狀態(tài)>0則說明前驅(qū)節(jié)點是結(jié)束狀態(tài),需要遍歷前驅(qū)節(jié)點直到找到非結(jié)束狀態(tài)的有效節(jié)點if (ws > 0) {/** Predecessor was cancelled. Skip over predecessors and* indicate retry.*/do {node.prev = pred = pred.prev;} while (pred.waitStatus > 0);pred.next = node;} else {// 若等待狀態(tài)<=0且非SIGNAL,則嘗試將前驅(qū)節(jié)點設(shè)置為SIGNAL/** waitStatus must be 0 or PROPAGATE. Indicate that we* need a signal, but don't park yet. Caller will need to* retry to make sure it cannot acquire before parking.*/compareAndSetWaitStatus(pred, ws, Node.SIGNAL);}// false 則返回去繼續(xù)自旋return false;}// 執(zhí)行掛起阻塞操作 LockSupport.parkprivate final boolean parkAndCheckInterrupt() {LockSupport.park(this);return Thread.interrupted();}
shouldParkAfterFailedAcquire
方法的執(zhí)行邏輯是判斷前驅(qū)節(jié)點的等待狀態(tài)waitStatus
,用于掛起當前節(jié)點線程等待,結(jié)合上述分析,包括以下幾種情況:
- 前驅(qū)節(jié)點狀態(tài)
waitStatus==SIGNAL
: 若前驅(qū)節(jié)點是頭節(jié)點,則說明占鎖線程還未執(zhí)行結(jié)束,當前節(jié)點線程仍需掛起等待;若前驅(qū)節(jié)點不是頭節(jié)點,則說明前驅(qū)節(jié)點就緒/擁有更高的優(yōu)先級,下次執(zhí)行還輪不到當前節(jié)點,所以也可以安全掛起,直接返回true
; - 前驅(qū)節(jié)點狀態(tài)
waitStatus>0
: 說明前驅(qū)節(jié)點已處于結(jié)束/取消狀態(tài),應(yīng)該從同步隊列中移除,并遍歷所有前驅(qū)節(jié)點直到找到非結(jié)束狀態(tài)的有效節(jié)點作為前驅(qū); - 前驅(qū)節(jié)點狀態(tài)
waitStatus<0
且非SIGNAL: 前驅(qū)節(jié)點剛從Condition
的條件等待隊列被喚醒,從而轉(zhuǎn)移到同步隊列,需要轉(zhuǎn)換為SIGNAL
狀態(tài)等待; - 前驅(qū)節(jié)點狀態(tài)
waitStatus==0
:若前驅(qū)節(jié)點是頭節(jié)點,則說明同步隊列剛初始化(0)或鎖剛被釋放重置,鎖資源可能未被其他線程持有,需判斷能否占有鎖(不管當前線程能否占有,該鎖一定會被占有,都需要轉(zhuǎn)換狀態(tài)為SIGNAL
);若前驅(qū)節(jié)點不是頭節(jié)點,則說明該線程節(jié)點剛初始化并被插入隊列,需要轉(zhuǎn)換為SIGNAL
狀態(tài);
綜上,當shouldParkAfterFailedAcquire()
方法返回true
時會調(diào)用parkAndCheckInterrupt
方法掛起線程等待被喚醒,返回false
時則會繼續(xù)自旋判斷;至此,ReetrantLock
內(nèi)部間接依靠AQS
的FIFO
同步隊列,就完成了lock()
加鎖操作。
2.2 公平鎖lock加鎖原理
公平鎖FairSync
的源碼如下:
static final class FairSync extends Sync {// 加鎖操作final void lock() {acquire(1);}// AQS 獲鎖的鉤子方法實現(xiàn)protected final boolean tryAcquire(int acquires) {// 獲取當前線程final Thread current = Thread.currentThread();// 獲取同步狀態(tài)int c = getState();// 若當前沒有線程持有鎖資源if (c == 0) {// 首先判斷同步隊列是否存在等待節(jié)點if (!hasQueuedPredecessors() &&compareAndSetState(0, acquires)) {setExclusiveOwnerThread(current);return true;}}// 鎖重入else if (current == getExclusiveOwnerThread()) {int nextc = c + acquires;if (nextc < 0)throw new Error("Maximum lock count exceeded");setState(nextc);return true;}return false;}}
與非公平鎖唯一不同的是:公平鎖的tryAcquire
實現(xiàn)中,在嘗試修改state
之前,會先調(diào)用hasQueuedPredecessors()
判斷AQS
內(nèi)部同步隊列中是否已存在等待節(jié)點。如果存在,則說明在此之前,已經(jīng)有線程提交了獲取鎖的請求,那么當前線程就會直接被封裝成Node
節(jié)點,追加到隊尾等待。
2.3 釋放鎖原理
ReetrantLock
顯式鎖需要手動釋放鎖資源,其unlock()
方法直接調(diào)用了Sync
中的release(1)
方法,而該方法又是在其父類AQS
中直接實現(xiàn)的,其源碼如下:
public final boolean release(int arg) {// 嘗試釋放鎖if (tryRelease(arg)) {// 進入該代碼塊則說明鎖已完全釋放(state=0)// 獲取頭節(jié)點Node h = head;if (h != null && h.waitStatus != 0)// 喚醒head頭節(jié)點的后繼節(jié)點線程unparkSuccessor(h);return true;}return false;}// 喚醒node的后繼節(jié)點線程private void unparkSuccessor(Node node) {// 獲取節(jié)點狀態(tài)int ws = node.waitStatus;if (ws < 0) // 重置節(jié)點狀態(tài),允許失敗compareAndSetWaitStatus(node, ws, 0);// 獲取后繼節(jié)點Node s = node.next;// 若后繼節(jié)點為空或已結(jié)束if (s == null || s.waitStatus > 0) {s = null;// 尋找后繼可被喚醒的有效等待節(jié)點for (Node t = tail; t != null && t != node; t = t.prev)if (t.waitStatus <= 0)s = t;}// 執(zhí)行線程喚醒(繼續(xù)去自旋) LockSupport.unparkif (s != null)LockSupport.unpark(s.thread);}
release
方法能否釋放鎖并喚醒后繼節(jié)點線程依賴于tryRelease
鉤子方法,而該方法又下放到了Sync
中實現(xiàn),其源碼如下:
// ReentrantLock -> Sync -> tryRelease(1)protected final boolean tryRelease(int releases) {// 計算釋放鎖后的同步更新狀態(tài)int c = getState() - releases;// 如果當前釋放鎖的線程不為持有鎖的線程則拋出異常if (Thread.currentThread() != getExclusiveOwnerThread())throw new IllegalMonitorStateException();boolean free = false;// 判斷更新狀態(tài)是否為0,如果是則說明已完全釋放鎖if (c == 0) {free = true;// 完全釋放鎖才清空當前占有線程setExclusiveOwnerThread(null);}// 更新state值setState(c);// 完全釋放鎖才返回truereturn free;}
注意:tryRelease
方法執(zhí)行完成返回true
之后,就說明當前線程持有的鎖已被釋放(非公平鎖中就已經(jīng)可以被搶占了),后續(xù)unparkSuccessor
方法只是進行一些善后工作,其中重置頭節(jié)點狀態(tài)的目的是表示邏輯上從持鎖到無鎖的轉(zhuǎn)換,鎖資源目前可能并沒有線程持有,因此在后續(xù)線程喚醒后執(zhí)行acquireQueued
自旋時waitStatus==0
狀態(tài)會再一次判斷并嘗試獲取鎖,而修改為SIGNAL
就表示占鎖線程正在執(zhí)行,其他線程需要掛起等待。至此,整個流程可以結(jié)合起來理解:s
節(jié)點的線程被喚醒后,會繼續(xù)執(zhí)行acquireQueued()
方法中的自旋,判斷if (p == head && tryAcquire(arg))
代碼是否成立,從而執(zhí)行判斷操作。
三. 其他同步工具類
1. Semaphore
1.1 基本概述
Semaphore
是java.util.concurrent
包下的一種計數(shù)信號量,它同樣也是基于AQS
實現(xiàn)的同步工具類。相比ReentrantLock
來說,它應(yīng)該屬于共享鎖,即允許多個線程同時訪問某個共享資源,但會限制同時訪問特定資源的線程數(shù)量;Semaphore
同樣也支持公平模式和非公平模式兩種方式,其構(gòu)造方法如下:
public Semaphore(int permits) {sync = new NonfairSync(permits);
}public Semaphore(int permits, boolean fair) {sync = fair ? new FairSync(permits) : new NonfairSync(permits);
}
Semaphore
默認為非公平模式(搶占式),在構(gòu)造信號量對象時都必須提供permits
參數(shù);permits
可以理解為許可證數(shù)量,只有拿到許可證的線程才能執(zhí)行,該參數(shù)限制了能同時獲取或訪問到共享資源的線程數(shù)量,其他超出線程都將阻塞等待?;诖?#xff0c;Semaphore
通常用于實現(xiàn)資源有明確訪問數(shù)量限制的場景,比如限流、池化等。Semaphore
的常用方法介紹如下:
Type | Method | Description |
---|---|---|
void | acquire() throws InterruptedException | 當前線程請求獲取該信號量的一個許可證。若有可用許可證,則獲得許可證并返回執(zhí)行同步代碼,同時可用許可證數(shù)量將減少一個;若沒有可用許可證,則阻塞等待直到有許可被釋放,或線程中斷。注意: 若當前線程在等待過程中被中斷,則會拋出InterruptedException ,并清除當前線程的中斷狀態(tài)。 |
void | acquire(int permits) throws InterruptedException | 當前線程請求獲取該信號量的permits 個許可證。若有可用數(shù)量的許可證,則獲得許可證并返回執(zhí)行同步代碼,同時可用許可證數(shù)量將減少permits 個;若沒有可用數(shù)量的許可證,則阻塞等待直到可用許可達到指定數(shù)量,或線程中斷。**注意: **若當前線程在等待過程中被中斷,則會拋出InterruptedException ,并清除當前線程的中斷狀態(tài)。 |
void | release() | 釋放該信號量的一個許可證,并使可用許可證數(shù)量增加一個。注意: 沒有通過acquire() 獲取許可的線程甚至也可以直接調(diào)用release() 來為信號量增加許可證數(shù)量,并且可用許可有可能會超出構(gòu)造時限制的permits 值,因此信號量的正確使用必須是通過應(yīng)用程序中的編程約束來建立。 |
void | release(int permits) | 釋放該信號量的permits 個許可證,并使可用許可證數(shù)量增加permits 個。注意: 同release() 方法,信號量的正確使用必須是通過應(yīng)用程序中的編程約束來建立。 |
boolean | tryAcquire() | 嘗試獲取該信號量的一個許可證,但該方法會立即返回。若有可用許可證,則獲得許可證并返回true ,同時可用許可證數(shù)量將減少一個;若沒有可用許可證,則返回false 。注意: 該方法會破壞公平策略,對該方法的調(diào)用會進行搶占式獲取(不管是否有線程在等待)。 |
boolean | tryAcquire(long timeout, TimeUnit unit) throws InterruptedException | 嘗試在指定時間timeout 內(nèi)獲取該信號量的一個許可證(遵循公平策略)。若有可用許可證,則獲得許可證并返回true ,同時可用許可證數(shù)量將減少一個;若沒有可用許可證,則阻塞等待直到timeout 過期,等待時間過期則返回false 。注意: 若當前線程在等待過程中被中斷,則會拋出InterruptedException ,并清除當前線程的中斷狀態(tài)。 |
int | availablePermits() | 獲取當前信號量中的可用許可證數(shù)量。 |
可以看出,許可證是Semaphore
的核心概念,Semaphore
信號量對許可證的獲取是強限制,但對許可證的釋放是弱限制的,即請求線程在執(zhí)行時必須獲取acquire
到指定數(shù)量的許可證,但在釋放release
時并不會對先前是否獲取進行檢查,因此可用許可有時可能會超出構(gòu)造時限制的permits
值。換句話說,構(gòu)造時傳入的permits
參數(shù)只表示信號量的初始許可數(shù)量,并且許可證只決定了線程執(zhí)行的門檻,但并不會對線程作全程限制;當前線程一旦獲取到指定數(shù)量的許可便開始執(zhí)行,即使中途釋放許可也不會影響后續(xù)執(zhí)行過程,這也就是為什么說信號量的正確使用必須是通過應(yīng)用程序中的編程約束來建立。舉例如下:
public class test {public static void main(String[] args) {// 初始化許可數(shù)量 = 5Semaphore semaphore = new Semaphore(5);new Thread(() -> {try {System.out.println(Thread.currentThread().getName() + ": 等待許可...");semaphore.acquire(3); // acquire permits = 3System.out.println(Thread.currentThread().getName() + ": 拿到許可,剩余許可 = " + semaphore.availablePermits());Thread.sleep(3000);semaphore.release(); // release = 1System.out.println(Thread.currentThread().getName() + ": 釋放許可...");} catch (InterruptedException e) {e.printStackTrace();}},"thread-1").start();try {Thread.sleep(10);} catch (InterruptedException e) {e.printStackTrace();}new Thread(() -> {try {System.out.println(Thread.currentThread().getName() + ": 等待許可...");semaphore.acquire(3);// acquire permits = 3System.out.println(Thread.currentThread().getName() + ": 拿到許可,剩余許可 = " + semaphore.availablePermits());Thread.sleep(3000);semaphore.release();// release = 1System.out.println(Thread.currentThread().getName() + ": 釋放許可,剩余許可 = " + semaphore.availablePermits());} catch (InterruptedException e) {e.printStackTrace();}},"thread-2").start();}
}
thread-1: 等待許可...
thread-1: 拿到許可,剩余許可 = 2
thread-2: 等待許可...
thread-1: 釋放許可...
thread-2: 拿到許可,剩余許可 = 0
thread-2: 釋放許可,剩余許可 = 1
前文說過,Semaphore
通常用于實現(xiàn)資源有明確訪問數(shù)量限制的場景,比如限流、池化等;此處通過Semaphore
模擬一個請求限流的場景,其中限制最大并發(fā)數(shù)為3,實現(xiàn)代碼如下:
public class test {public static void main(String[] args) {// 自定義線程池ThreadPoolExecutor threadPool = new ThreadPoolExecutor(2*2, 8,60, TimeUnit.SECONDS,new LinkedBlockingDeque<Runnable>(1024),new ThreadPoolExecutor.AbortPolicy());// 流量控制: 限制最大并發(fā)數(shù)為3final Semaphore semaphore = new Semaphore(3);// 模擬10個客戶端任務(wù)請求for(int index = 0;index < 10;index++){final int serial = index;threadPool.execute(() -> {try {// 請求獲取許可semaphore.acquire();System.out.println(Thread.currentThread().getName() + ": 請求成功!訪問編號 = " + serial);// 模擬IO操作Thread.sleep(1000);} catch (InterruptedException e) {e.printStackTrace();} finally {// 釋放許可semaphore.release();}});}// 等待線程池執(zhí)行結(jié)束關(guān)閉: 不再接受新任務(wù)提交,對已經(jīng)提交了的任務(wù)不會產(chǎn)生影響threadPool.shutdown();}
}
1.2 原理分析
Semaphore
的大部分方法都是基于內(nèi)部類Sync
實現(xiàn)的,而該類又繼承了 AbstractQueuedSynchronizer
即AQS
,并且Sync
對應(yīng)的還有兩個子類 NonfairSync
(非公平模式實現(xiàn)) 和 FairSync
(公平模式實現(xiàn))。在Semaphore
中,AQS
的 state
被定義為 permits
(許可證數(shù)量),對象創(chuàng)建時傳入的參數(shù)permits
實際是在對AQS內(nèi)部的state
進行初始化,初始化完成后state
代表著當前信號量對象的可用許可數(shù)(state>0
)。
以非公平模式為例,當線程調(diào)用Semaphore.acquire(arg)
請求獲取許可時,會首先判斷remaining = getState() - arg
是否大于0,如果是則代表還有滿足可用的許可數(shù),并嘗試對state
進行CAS操作使state=remaining
,若CAS成功則代表獲取許可成功;否則線程需要封裝成Node節(jié)點并加入同步隊列阻塞等待,直到許可釋放被喚醒。
// Semaphore類 -> acquire()方法
public void acquire() throws InterruptedException {// Sync類繼承AQS,此處直接調(diào)用AQS內(nèi)部的acquireSharedInterruptibly()方法sync.acquireSharedInterruptibly(1);}// AbstractQueuedSynchronizer類 -> acquireSharedInterruptibly()方法
public final void acquireSharedInterruptibly(int arg)throws InterruptedException {// 判斷是否出現(xiàn)線程中斷信號(標志)if (Thread.interrupted())throw new InterruptedException();// 如果tryAcquireShared(arg)執(zhí)行結(jié)果不小于0,則線程獲取同步狀態(tài)成功if (tryAcquireShared(arg) < 0)// 未獲取成功加入同步隊列阻塞等待doAcquireSharedInterruptibly(arg);
}
// Semaphore類 -> NofairSync內(nèi)部類 -> tryAcquireShared()方法
protected int tryAcquireShared(int acquires) {// 調(diào)用了父類Sync中的實現(xiàn)方法return nonfairTryAcquireShared(acquires);
}// Syn類 -> nonfairTryAcquireShared()方法
abstract static class Sync extends AbstractQueuedSynchronizer {final int nonfairTryAcquireShared(int acquires) {// 開啟自旋死循環(huán)for (;;) {int available = getState();int remaining = available - acquires;// 判斷信號量中可用許可數(shù)是否已<0或者CAS執(zhí)行是否成功if (remaining < 0 ||compareAndSetState(available, remaining))return remaining;}}
}
釋放邏輯對比獲取許可的邏輯相對來說要簡單許多,只需要更新state
值增加后調(diào)用doReleaseShared()
方法喚醒后繼節(jié)點線程即可;需要注意的是,而在共享模式中可能會存在多條線程同時釋放許可/鎖資源,所以在此處使用了CAS+自旋
的方式保證線程安全問題。
2. CountDownLatch
2.1 基本概述
CountDownLatch
同樣是java.util.concurrent
包下的基于AQS
實現(xiàn)的同步工具類。類似于Semaphore
,CountDownLatch
在初始化時也會傳入一個參數(shù)count
來間接賦值給AQS
的state
,用于表示一個線程計數(shù)值;不過CountDownLatch
并沒有構(gòu)建公平模式和非公平模式(內(nèi)部Sync
沒有子類實現(xiàn)),其構(gòu)造方法如下:
public CountDownLatch(int count) {if (count < 0) throw new IllegalArgumentException("count < 0");this.sync = new Sync(count);
}
CountDownLatch
的主要作用是等待計數(shù)值count
歸零后,喚醒所有的等待線程。基于該特性,CountDownLatch
常被用于控制多線程之間的等待與協(xié)作(多線程條件喚醒);相比join
來說,CountDownLatch
更加靈活且粒度更細,join
是以線程執(zhí)行結(jié)束為條件,而CountDownLatch
是以方法的主動調(diào)用為條件。其常用方法如下:
Type | Method | Description |
---|---|---|
void | await() throws InterruptedException | 使當前線程阻塞等待,直到計數(shù)器count 歸零或線程被中斷。若當前計數(shù)已為零,則此方法立即返回。注意: 若在等待過程中被中斷,則會拋出InterruptedException,并清除當前線程的中斷狀態(tài)。 |
void | countDown() | 使當前計數(shù)器count 遞減。如果新計數(shù)歸零,則喚醒所有await() 等待線程;注意: 若當前計數(shù)已為零,則無事發(fā)生。 |
long | getCount() | 獲取當前計數(shù)器count 的值。 |
需要注意的是,CountDownLatch
是一次性的,即計數(shù)器的值count
只能在構(gòu)造方法中初始化,此外再沒有任何設(shè)置值的方法,當 CountDownLatch
使用完畢后(計數(shù)歸零)將不能重復(fù)被使用;若需要重置計數(shù)的版本,可以考慮使用CyclicBarrier
。CountDownLatch
的常用方法有兩種:
- 多等一:初始化
count=1
,多條線程await()
阻塞等待一條線程調(diào)用countDown()
喚醒所有線程。比如模擬并發(fā)安全、死鎖等; - 一等多:初始化
count=N
,一條線程await()
阻塞等待N條線程調(diào)用countDown()
歸零后喚醒。比如多接口調(diào)用的數(shù)據(jù)合并、多操作完成后的數(shù)據(jù)檢查、主服務(wù)啟動后等待多個組件加載完畢等(注意線程間的通信與數(shù)據(jù)傳遞需結(jié)合Future
實現(xiàn));
public class test {public static void main(String[] args) {// 模擬10人拼團活動final CountDownLatch countDownLatch = new CountDownLatch(10);// 固定數(shù)量線程池ExecutorService threadPool = Executors.newFixedThreadPool(50);// 拼團人員ID集合List<String> ids = new ArrayList<>();// 模擬30人開始搶單拼團for (int i = 0; i < 30; i++) {threadPool.execute(() -> {boolean orderSucess = false;System.out.println(Thread.currentThread().getName() + ": 請求拼團...");if (countDownLatch.getCount() > 0) {synchronized (ids) {if (countDownLatch.getCount() > 0) {ids.add(Thread.currentThread().getName());System.out.println(Thread.currentThread().getName() + ": 拼團成功!");countDownLatch.countDown();orderSucess = true;}}}if (!orderSucess) {System.out.println(Thread.currentThread().getName() + ": 拼團失敗!已無名額...");}});}// 訂單生成線程new Thread(() -> {try {countDownLatch.await();System.out.println(Thread.currentThread().getName() + ": 拼團結(jié)束, 訂單已生成...");System.out.println(Thread.currentThread().getName() + ": 拼團人員id = " + ids);} catch (InterruptedException e) {e.printStackTrace();}}, "拼團").start();// 釋放線程池threadPool.shutdown();}
}
2.2 原理分析
CountDownLatch
的底層實現(xiàn)原理也非常簡單,當線程調(diào)用 await()
的時候,如果 state
不為 0 則證明任務(wù)還沒有執(zhí)行結(jié)束,await()
就會進入阻塞等待,其源碼如下:
// CountDownLatch -> await()
public void await() throws InterruptedException {// 調(diào)用內(nèi)部類sync的acquireSharedInterruptibly方法sync.acquireSharedInterruptibly(1);
}
// CountDownLatch -> Sync -> acquireSharedInterruptibly
public final void acquireSharedInterruptibly(int arg) throws InterruptedException {// 被中斷拋出異常if (Thread.interrupted())throw new InterruptedException();// tryAcquireShared -> 判斷是否阻塞等待if (tryAcquireShared(arg) < 0)// 自旋+阻塞(AQS實現(xiàn))doAcquireSharedInterruptibly(arg);
}
// CountDownLatch -> Sync -> tryAcquireShared
protected int tryAcquireShared(int acquires) {// 判斷當前state是否歸零return (getState() == 0) ? 1 : -1;
}
當線程調(diào)用 countDown()
時,其實最終是調(diào)用了Sync
中重寫的tryReleaseShared
方法,該方法以 CAS 的操作來減少 state
;若更新后state
歸零,則表示所有的計數(shù)任務(wù)線程都執(zhí)行完畢,那么在 CountDownLatch
上等待的線程就會被AQS
的doReleaseShared
方法喚醒并繼續(xù)向下執(zhí)行。
// CountDownLatch -> countDown()
public void countDown() {sync.releaseShared(1);
}
// CountDownLatch -> Sync -> AQS -> releaseShared
public final boolean releaseShared(int arg) {// 判斷遞減后計數(shù)器是否歸零if (tryReleaseShared(arg)) {// 喚醒所有等待線程(AQS實現(xiàn))doReleaseShared();return true;}return false;
}
// CountDownLatch -> Sync -> tryReleaseShared
protected boolean tryReleaseShared(int releases) {// Decrement count; signal when transition to zerofor (;;) {// 獲取當前stateint c = getState();// 若計數(shù)器歸零則返回false,其他什么也不做if (c == 0)return false;// CAS更新state遞減int nextc = c-1;if (compareAndSetState(c, nextc))// 若更新成功則判斷新計數(shù)值是否歸零return nextc == 0;}
}
3. CyclicBarrier
//