如何在門(mén)戶網(wǎng)站做搜索引擎廈門(mén)網(wǎng)站建設(shè)公司
前置概念
要徹底了解AQS的底層實(shí)現(xiàn)就必須要了解一下線程相關(guān)的知識(shí)。
包括voliate
voliate
我們使用翻譯軟件翻譯一下volatile,會(huì)發(fā)現(xiàn)它有以下幾個(gè)意思:易變的;無(wú)定性的;無(wú)常性的;可能急劇波動(dòng)的;不穩(wěn)定的;易惡化的;易揮發(fā)的;易發(fā)散的。這也正式使用volatile關(guān)鍵字的語(yǔ)義。
當(dāng)你使用volatile去申明個(gè)變量時(shí),就等于告訴了虛擬機(jī),這個(gè)變量極有可能會(huì)被其他程序或者線程修改。為了確保這個(gè)變量被修改后,應(yīng)用程序范圍內(nèi)所有線程都能夠“看到”這個(gè)改動(dòng),虛擬機(jī)就必須采用一些特殊手段,保證這個(gè)變量的可見(jiàn)性等特點(diǎn)
比如,根據(jù)編譯器的優(yōu)化規(guī)則,如果不使用volatile申明變量,這個(gè)變量被修改后其他線程可能并不會(huì)被通知到,甚至在別的線程中,看到變量的修改程序都是反的。但是使用volatile,虛擬機(jī)就會(huì)謹(jǐn)慎的處理這種情況
CAS
CAS 是高并發(fā)的一個(gè)重要的編程實(shí)現(xiàn),即compareAndSet,對(duì)比并且設(shè)置。
意思就是說(shuō),要做一個(gè)修改就必須先教研我改之前的期望值是否和他現(xiàn)在的是否相同,如果相同則修改,不相同則不處理。
什么是AQS
AQS的本質(zhì)是java中的AbstractQueuedSynchronizer類。
AQS是并發(fā)包下的一個(gè)基類,基于它實(shí)現(xiàn)的類包括CountDownLatch,ReentranLock…
下面我們就以ReentranLock為入口詳細(xì)講解下AQS
public abstract class AbstractQueuedSynchronizer extends AbstractOwnableSynchronizer implements java.io.Serializable {static final class Node {/** 用于指示節(jié)點(diǎn)正在共享模式下等待的標(biāo)記 */static final Node SHARED = new Node();/** 用于指示節(jié)點(diǎn)正在以獨(dú)占模式等待的標(biāo)記 */static final Node EXCLUSIVE = null;/** waitStatus 值,用于指示線程已取消 */static final int CANCELLED = 1;/** waitStatus 值,用于指示后續(xù)線程需要取消停放 */static final int SIGNAL = -1;/** waitStatus 值,用于指示線程正在等待條件 */static final int CONDITION = -2;/** waitStatus 值來(lái)指示下一個(gè) acquireShared 應(yīng)無(wú)條件傳播 */static final int PROPAGATE = -3;/** * 狀態(tài)字段,僅采用以下值: * 信號(hào):此節(jié)點(diǎn)的后繼節(jié)點(diǎn)被(或即將)阻止(通過(guò) park),因此當(dāng)前節(jié)點(diǎn)在釋放或取消時(shí)必須取消其后繼節(jié)點(diǎn)的停放。為了避免爭(zhēng)用,acquire 方法必須首先指示它們需要一個(gè)信號(hào),然后重試原子獲取,然后在失敗時(shí)阻止。* CANCELLED:該節(jié)點(diǎn)因超時(shí)或中斷而取消。節(jié)點(diǎn)永遠(yuǎn)不會(huì)離開(kāi)此狀態(tài)。特別是,具有已取消節(jié)點(diǎn)的線程永遠(yuǎn)不會(huì)再次阻塞。條件:此節(jié)點(diǎn)當(dāng)前位于條件隊(duì)列中。在傳輸之前,它不會(huì)用作同步隊(duì)列節(jié)點(diǎn),此時(shí)狀態(tài)將設(shè)置為 0。(此處使用此值與該字段的其他用途無(wú)關(guān),但簡(jiǎn)化了機(jī)制。* 傳播:應(yīng)將 releaseShared 傳播到其他節(jié)點(diǎn)。這是在 doReleaseShared 中設(shè)置的(僅適用于頭節(jié)點(diǎn)),以確保傳播繼續(xù)進(jìn)行,即使其他操作此后進(jìn)行了干預(yù)。* 0:以上都不是 為了簡(jiǎn)化使用,這些值以數(shù)字形式排列。非負(fù)值表示節(jié)點(diǎn)不需要發(fā)出信號(hào)。因此,大多數(shù)代碼不需要檢查特定值,只需檢查符號(hào)即可。對(duì)于正常同步節(jié)點(diǎn),該字段初始化為 0,對(duì)于條件節(jié)點(diǎn),該字段初始化為 CONDITION。它使用 CAS 進(jìn)行修改(或者在可能的情況下,無(wú)條件易失性寫(xiě)入)。*/volatile int waitStatus;/*** 上一個(gè)節(jié)點(diǎn)*/volatile Node prev;/*** 下一個(gè)節(jié)點(diǎn)*/volatile Node next;/*** 將此節(jié)點(diǎn)排隊(duì)的線程。在構(gòu)造時(shí)初始化,使用后清空。*/volatile Thread thread;/*** 鏈接到下一個(gè)節(jié)點(diǎn),等待條件,或特殊值 SHARED。* 因?yàn)闂l件隊(duì)列只有在以獨(dú)占模式保持時(shí)才會(huì)被訪問(wèn),所以我們只需要一個(gè)簡(jiǎn)單的鏈接隊(duì)列來(lái)在節(jié)點(diǎn)等待條件時(shí)保存它們。* 然后,它們被轉(zhuǎn)移到隊(duì)列中以重新獲取。由于條件只能是獨(dú)占的,因此我們通過(guò)使用特殊值來(lái)保存字段來(lái)指示共享模式。*/Node nextWaiter;/*** 如果節(jié)點(diǎn)在共享模式下等待,則返回 true*/final boolean isShared() {return nextWaiter == SHARED;}/*** 返回上一個(gè)節(jié)點(diǎn),如果為 null,則引發(fā) NullPointerException。* 當(dāng) predecessor 不能為 null 時(shí)使用??梢允÷?null 檢查,但存在該檢查以幫助 VM。* 返回:此節(jié)點(diǎn)的前身*/final Node predecessor() throws NullPointerException {Node p = prev;if (p == null)throw new NullPointerException();elsereturn p;}Node() { // Used to establish initial head or SHARED marker}Node(Thread thread, Node mode) { // Used by addWaiterthis.nextWaiter = mode;this.thread = thread;}Node(Thread thread, int waitStatus) { // Used by Conditionthis.waitStatus = waitStatus;this.thread = thread;}}/*** 頭節(jié)點(diǎn)*/private transient volatile Node head;/*** 尾節(jié)點(diǎn)*/private transient volatile Node tail;/*** 狀態(tài)*/private volatile int state;
}
ReentranLock
acquire
/*** 以獨(dú)占模式獲取,忽略中斷。通過(guò)至少調(diào)用一次 tryAcquire來(lái)實(shí)現(xiàn),成功返回。* 否則,線程會(huì)排隊(duì),可能會(huì)反復(fù)阻塞和解除阻塞,調(diào)用 tryAcquire 直到成功。此方法可用于實(shí)現(xiàn)方法Lock.lock。* 參數(shù):arg – acquire 參數(shù)。此值被傳達(dá)給 tryAcquire ,但以其他方式未解釋,可以表示您喜歡的任何內(nèi)容。*/public final void acquire(int arg) {if (!tryAcquire(arg) &&acquireQueued(addWaiter(Node.EXCLUSIVE), arg))selfInterrupt();}
tryAcquire
/*** 1. 獲取當(dāng)前線程 * 2. 獲取當(dāng)前鎖狀態(tài)c * 3. 如果當(dāng)前鎖狀態(tài)為0,則判斷是否有排隊(duì)的前任線程,并嘗試使用compareAndSetState方法將鎖狀態(tài)設(shè)置為acquires,如果成功則將當(dāng)前線程設(shè)置為獨(dú)占所有者并返回true * 4. 如果當(dāng)前鎖狀態(tài)不為0且當(dāng)前線程已經(jīng)是獨(dú)占所有者,則計(jì)算新的鎖狀態(tài)nextc,如果nextc小于0,則拋出異常"Maximum lock count exceeded",否則更新鎖狀態(tài)為nextc并返回true * 5. 如果以上條件都不滿足,則返回false。*/
protected final boolean tryAcquire(int acquires) {final Thread current = Thread.currentThread();int c = getState();if (c == 0) {if (!hasQueuedPredecessors() &&// 用CAS的方式設(shè)置狀態(tài)compareAndSetState(0, acquires)) {setExclusiveOwnerThread(current);return true;}}else if (current == getExclusiveOwnerThread()) {//1. 首先,代碼檢查當(dāng)前線程是否為獨(dú)占鎖的擁有者,如果是則執(zhí)行以下操作: //2. 計(jì)算下一個(gè)鎖的狀態(tài)值 nextc = c + acquires; //3. 如果下一個(gè)狀態(tài)值小于0,則拋出錯(cuò)誤 "Maximum lock count exceeded"; //4. 設(shè)置鎖的狀態(tài)為 nextc; //5. 返回 true 表示成功獲取獨(dú)占鎖。int nextc = c + acquires;if (nextc < 0)throw new Error("Maximum lock count exceeded");setState(nextc);return true;}return false;
}//判斷當(dāng)前線程前面是否還有其他線程在等待。
public final boolean hasQueuedPredecessors() {Node t = tail;Node h = head;Node s;return h != t &&((s = h.next) == null || s.thread != Thread.currentThread());
}
//設(shè)置線程作為獨(dú)占所有者線程。
protected final void setExclusiveOwnerThread(Thread thread) {exclusiveOwnerThread = thread;
}
addWaiter
/*** 這段代碼的作用是向一個(gè)鏈表中添加一個(gè)等待節(jié)點(diǎn)。 * 1. 創(chuàng)建一個(gè)新的節(jié)點(diǎn),將當(dāng)前線程和傳入的模式作為參數(shù)。 * 2. 嘗試使用快速路徑添加節(jié)點(diǎn)到鏈表的末尾,如果失敗則備用完整的添加方法。 * 3. 獲取當(dāng)前尾節(jié)點(diǎn)作為前驅(qū)節(jié)點(diǎn)。 * 4. 如果前驅(qū)節(jié)點(diǎn)不為空,則設(shè)置新節(jié)點(diǎn)的前驅(qū)為前驅(qū)節(jié)點(diǎn)。 * 5. 如果成功將新節(jié)點(diǎn)設(shè)置為尾節(jié)點(diǎn),則將前驅(qū)節(jié)點(diǎn)的后繼指向新節(jié)點(diǎn),然后返回新節(jié)點(diǎn)。 * 6. 如果無(wú)法使用快速路徑,則調(diào)用enq方法完整添加節(jié)點(diǎn)到鏈表末尾,并返回新節(jié)點(diǎn)。*/
private Node addWaiter(Node mode) {Node node = new Node(Thread.currentThread(), mode);// Try the fast path of enq; backup to full enq on failureNode pred = tail;if (pred != null) {node.prev = pred;// 用CAS的方式設(shè)置尾部節(jié)點(diǎn)if (compareAndSetTail(pred, node)) {pred.next = node;return node;}}enq(node);return node;
}
acquireQueued
//以獨(dú)占不間斷模式獲取已在隊(duì)列中的線程。由條件等待方法以及獲取使用
final boolean acquireQueued(final Node node, int arg) {boolean failed = true;try {boolean interrupted = false;for (;;) {// 進(jìn)入一個(gè)無(wú)限循環(huán),不斷嘗試獲取鎖。 // 1.在循環(huán)中,首先獲取前驅(qū)節(jié)點(diǎn)p。 // 2. 如果p是頭結(jié)點(diǎn)并且tryAcquire(arg)成功獲取到鎖,則將當(dāng)前節(jié)點(diǎn)設(shè)置為頭結(jié)點(diǎn),斷開(kāi)p的next引用,將failed設(shè)為false,然后返回interrupted的值。 // 3. 如果無(wú)法獲取鎖,則判斷是否應(yīng)該在獲取失敗后阻塞線程,并檢查是否被中斷。 // 4. 如果被中斷,則將interrupted設(shè)為true。final Node p = node.predecessor();if (p == head && tryAcquire(arg)) {setHead(node);p.next = null; // help GCfailed = false;return interrupted;}if (shouldParkAfterFailedAcquire(p, node) &&// 中斷線程,放入等待隊(duì)列parkAndCheckInterrupt())interrupted = true;}} finally {if (failed)// 取消競(jìng)爭(zhēng)鎖cancelAcquire(node);}
}//判斷在獲取鎖失敗后是否需要進(jìn)行阻塞等待。這是所有采集環(huán)路中的主要信號(hào)控制。需要 pred == node.prev。private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {//1. 首先,獲取前驅(qū)節(jié)點(diǎn)的等待狀態(tài)int ws = pred.waitStatus;//2. 如果等待狀態(tài)(ws)等于Node.SIGNAL,表示前驅(qū)節(jié)點(diǎn)已經(jīng)設(shè)置了狀態(tài)要求釋放鎖以發(fā)信號(hào)通知當(dāng)前節(jié)點(diǎn)可以安全地阻塞等待,返回true。 if (ws == Node.SIGNAL)/** This node has already set status asking a release* to signal it, so it can safely park.*/return true;//3. 如果等待狀態(tài)(ws)大于0,表示前驅(qū)節(jié)點(diǎn)已取消,需要跳過(guò)前驅(qū)節(jié)點(diǎn)并指示重試。在循環(huán)中,將當(dāng)前節(jié)點(diǎn)的prev指向前驅(qū)節(jié)點(diǎn)的prev,直到找到等待狀態(tài)不大于0的前驅(qū)節(jié)點(diǎn),然后將前驅(qū)節(jié)點(diǎn)的next指向當(dāng)前節(jié)點(diǎn)。 if (ws > 0) {//4. 如果等待狀態(tài)(ws)為0或PROPAGATE,則表示需要一個(gè)信號(hào),但暫時(shí)不阻塞等待。調(diào)用者將需要重試以確保在阻塞等待之前不能獲取鎖。 do {node.prev = pred = pred.prev;} while (pred.waitStatus > 0);pred.next = node;} else {// 用CAS的方式設(shè)置等待狀態(tài)compareAndSetWaitStatus(pred, ws, Node.SIGNAL);}//5. 最后返回false,表示不需要阻塞等待。return false;
}