貴州高端網(wǎng)站建設(shè)seo管理系統(tǒng)創(chuàng)作
一、簡介
Semaphore(信號量):是用來控制同時(shí)訪問特定資源的線程數(shù)量,它通過協(xié)調(diào)各個(gè)線程,以保證合理的使用公共資源。
Semaphore 一般用于流量的控制,特別是公共資源有限的應(yīng)用場景。例如數(shù)據(jù)庫的連接,假設(shè)數(shù)據(jù)庫的連接數(shù)上線為10個(gè),多個(gè)線程并發(fā)操作數(shù)據(jù)庫可以使用Semaphore來控制并發(fā)操作數(shù)據(jù)庫的線程個(gè)數(shù)最多為10個(gè)。
Semaphore 是一個(gè)有效的流量控制工具,它基于?AQS?共享鎖實(shí)現(xiàn)。我們常常用它來控制對有限資源的訪問。
- 每次使用資源前,先申請一個(gè)信號量,如果資源數(shù)不夠,就會阻塞等待;
- 每次釋放資源后,就釋放一個(gè)信號量。
二、源碼
2.1 類總覽
通過上面的類圖可以看到,Semaphore 與 ReentrantLock 的內(nèi)部類的結(jié)構(gòu)相同,類內(nèi)部總共存在 Sync、NonfairSync、FairSync 三個(gè)類, NonfairSync 與 FairSync 類繼承自 Sync 類,其只有一個(gè) tryAcquireShared() 方法,重寫了AQS的該方法。Sync 類繼承自 AbstractQueuedSynchronizer 抽象類。
與 CountDownLatch 類似,Semaphore 主要是通過 AQS 的共享鎖機(jī)制實(shí)現(xiàn)的,因此它的核心屬性只有一個(gè) Sync。總體源碼如下:
public class Semaphore implements java.io.Serializable {//序列化版本號private static final long serialVersionUID = -3222578661600680210L;//同步隊(duì)列private final Sync sync;//構(gòu)造方法//指定許可數(shù),默認(rèn)為非公平策略public Semaphore(int permits) {sync = new NonfairSync(permits);}//指定許可數(shù)和是否公平策略public Semaphore(int permits, boolean fair) {sync = fair ? new FairSync(permits) : new NonfairSync(permits);}//Semaphore提供了acquire方法來獲取一個(gè)許可,會阻塞線程(有重載方法,可以指定獲取許可的個(gè)數(shù))public void acquire() throws InterruptedException {sync.acquireSharedInterruptibly(1); //調(diào)用AQS的acquireSharedInterruptibly方法, 即共享式獲取響應(yīng)中斷}//tryAcquire的意思是嘗試獲取許可,如果獲取成功返回true,否則返回false,不會阻塞線程,而且不響應(yīng)中斷public boolean tryAcquire() {return sync.nonfairTryAcquireShared(1) >= 0;}//Semaphore提供release來釋放許可public void release() {sync.releaseShared(1); //調(diào)用AQS的releaseShared方法,即釋放共享式同步狀態(tài)}abstract static class Sync extends AbstractQueuedSynchronizer {private static final long serialVersionUID = 1192457210091910933L;Sync(int permits) {setState(permits);}//獲取許可數(shù)目 final int getPermits() {return getState();}//共享模式下非公平策略獲取//本質(zhì)就是一個(gè)自旋方法,通過自旋+CAS來保證修改許可值的線程安全性,該方法返回的情況有如下兩種情況:// 信號量不夠,直接返回,返回值為負(fù)數(shù),表示獲取失敗;// 信號量足夠,且CAS操作成功,返回值為剩余許可值,獲取成功。final int nonfairTryAcquireShared(int acquires) {for (;;) { //自旋int available = getState(); //獲取可用許可值int remaining = available - acquires; //計(jì)算剩余的許可值//如果剩余許可值小于0,說明許可不夠用了,直接返回,否則CAS更新許可值,更新成功返回,否則繼續(xù)自旋if (remaining < 0 ||compareAndSetState(available, remaining))return remaining;}}//共享模式下進(jìn)行釋放//該方法也是一個(gè)自旋方法,通過自旋+CAS原子性地修改許可值protected final boolean tryReleaseShared(int releases) {for (;;) { //自旋int current = getState(); //獲取許可值int next = current + releases; //計(jì)算釋放后的許可值if (next < current) // overflowthrow new Error("Maximum permit count exceeded");if (compareAndSetState(current, next)) //CAS修改許可值,成功則返回,失敗則繼續(xù)自旋return true;}}//根據(jù)指定的縮減量減小可用許可的數(shù)目final void reducePermits(int reductions) {for (;;) {int current = getState();int next = current - reductions;if (next > current) // underflowthrow new Error("Permit count underflow");if (compareAndSetState(current, next))return;}}//獲取并返回立即可用的所有許可數(shù)目final int drainPermits() {for (;;) {int current = getState();if (current == 0 || compareAndSetState(current, 0))return current;}}}//采用非公平策略獲取資源static final class NonfairSync extends Sync {private static final long serialVersionUID = -2694183684443567898L;NonfairSync(int permits) {super(permits);}//獲取許可protected int tryAcquireShared(int acquires) {return nonfairTryAcquireShared(acquires); //共享模式下非公平策略獲取}}//采用公平策略獲取資源static final class FairSync extends Sync {private static final long serialVersionUID = 2014338818796000944L;FairSync(int permits) {super(permits);}//獲取許可protected int tryAcquireShared(int acquires) {for (;;) {//獲取共享鎖之前,先調(diào)用hasQueuedPredecessors方法來判斷隊(duì)列中是否存在其他正在排隊(duì)的節(jié)點(diǎn),// 如果是返回true,否則為false。因此當(dāng)存在其他正在排隊(duì)的節(jié)點(diǎn),當(dāng)前節(jié)點(diǎn)就無法獲取許可,只能排隊(duì)等待,這也是公平策略的體現(xiàn)。if (hasQueuedPredecessors())return -1;int available = getState();int remaining = available - acquires;if (remaining < 0 ||compareAndSetState(available, remaining))return remaining;}}}
}
2.2 核心方法
獲取信號量的方法總共有四個(gè):
釋放信號量的方法有兩個(gè):
獲取信號量四個(gè)方法中后面三個(gè)方法原理同 acquire() ,我們這里來分析一下 acquire() 和 release() 方法。
2.2.1 acquire() 方法
獲取許可,會阻塞線程,響應(yīng)中斷。
// Semaphore
public void acquire() throws InterruptedException {sync.acquireSharedInterruptibly(1);
}
內(nèi)部調(diào)用的是 AQS 的 acquireSharedInterruptibly() 方法, 即共享式獲取響應(yīng)中斷,代碼如下:
// AbstractQueuedSynchronizer
public final void acquireSharedInterruptibly(int arg)throws InterruptedException {if (Thread.interrupted())throw new InterruptedException();if (tryAcquireShared(arg) < 0)doAcquireSharedInterruptibly(arg);
}
除了 tryAcquireShared() 方法由 AQS 子類實(shí)現(xiàn),其他方法在 《AQS實(shí)現(xiàn)原理》中有講解過,這里不再贅述。我們來分析一下子類實(shí)現(xiàn)的 tryAcquireShared() 方法,這里就要分公平和非公平策略兩種情況了。
2.2.1.1 非公平策略下
非公平策略下的 tryAcquireShared() 方法:
// Semaphore#NonfairSync
protected int tryAcquireShared(int acquires) {return nonfairTryAcquireShared(acquires);
}
內(nèi)部調(diào)用 Sync#nonfairTryAcquireShared() 方法:
// Sync
final int nonfairTryAcquireShared(int acquires) {//自旋for (;;) {//獲取可用許可值int available = getState();//計(jì)算剩余的許可值int remaining = available - acquires;//如果剩余許可值小于0,說明許可不夠用了,直接返回,否則CAS更新同步狀態(tài),更新成功返回,否則繼續(xù)自旋if (remaining < 0 ||compareAndSetState(available, remaining))return remaining;}
}
該方法本質(zhì)就是一個(gè)自旋方法,通過自旋+CAS來保證修改許可值的線程安全性。方法返回的情況有如下兩種情況
- 信號量不夠,直接返回,返回值為負(fù)數(shù),表示獲取失敗;
- 信號量足夠,且CAS操作成功,返回值為剩余許可值,獲取成功。
2.2.1.2 公平策略下
公平策略下的 tryAcquireShared() 方法如下:
// Semaphore#FairSync
protected int tryAcquireShared(int acquires) {//自旋for (;;) {if (hasQueuedPredecessors())return -1;int available = getState();int remaining = available - acquires;if (remaining < 0 ||compareAndSetState(available, remaining))return remaining;}
}
我們看到它與非公平策略的唯一區(qū)別就是多了下面這個(gè) if 代碼:
protected int tryAcquireShared(int acquires) {for (;;) {if (hasQueuedPredecessors())return -1;......}
}// AbstractQueuedSynchronizer
public final boolean hasQueuedPredecessors() {// The correctness of this depends on head being initialized// before tail and on head.next being accurate if the current// thread is first in queue.Node t = tail; // Read fields in reverse initialization orderNode h = head;Node s;return h != t &&((s = h.next) == null || s.thread != Thread.currentThread());
}
即在獲取共享鎖之前,先調(diào)用 hasQueuedPredecessors() 方法來判斷隊(duì)列中是否存在其他正在排隊(duì)的節(jié)點(diǎn),如果是返回true,否則為false。因此當(dāng)存在其他正在排隊(duì)的節(jié)點(diǎn),當(dāng)前節(jié)點(diǎn)就無法獲取許可,只能排隊(duì)等待,這也是公平策略的體現(xiàn)。
2.2.2 release() 方法
Semaphore 提供 release() 方法來釋放許可。我們繼續(xù)分析 release() 方法,源碼如下:
// Semaphore
public void release() {sync.releaseShared(1);
}//AbstractQueuedSynchronizer
public final boolean releaseShared(int arg) {if (tryReleaseShared(arg)) {//如果釋放鎖成功,喚醒正在排隊(duì)的節(jié)點(diǎn)doReleaseShared();return true;}return false;
}//Semaphore#Sync
protected final boolean tryReleaseShared(int releases) {//自旋for (;;) {//獲取許可值int current = getState();//計(jì)算釋放后的許可值int next = current + releases;//如果釋放后比釋放前的許可值還小,直接報(bào)Errorif (next < current) // overflowthrow new Error("Maximum permit count exceeded");//CAS修改許可值,成功則返回,失敗則繼續(xù)自旋if (compareAndSetState(current, next))return true;}
}
tryReleaseShared() 方法是一個(gè)自旋方法,通過自旋+CAS原子性地修改同步狀態(tài),邏輯很簡單。
2.2.3 其余方法
獲取信號量的方法有四個(gè):
釋放信號量的方法有兩個(gè):
其余獲取和釋放信號量的方法原理同上問,不再贅述。接下來看看其余的工具方法。
2.2.3.1 tryAcquire() 嘗試獲取許可
該方法一共有四種重載形式:
- tryAcquire() :嘗試獲取許可,如果獲取成功返回true,否則返回false,不會阻塞線程,而且不響應(yīng)中斷。
- tryAcquire(int permits) :同上的基礎(chǔ)上,可以指定獲取許可的個(gè)數(shù)。
- tryAcquire(long timeout, TimeUnit unit) :指定超時(shí)時(shí)間,它調(diào)用AQS的tryAcquireSharedNanos() 方法,即共享式超時(shí)獲取。
- tryAcquire(int permits, long timeout, TimeUnit unit) :可以指定獲取許可的個(gè)數(shù)和超時(shí)時(shí)間。
//Semaphore
public boolean tryAcquire() {return sync.nonfairTryAcquireShared(1) >= 0;
}public boolean tryAcquire(int permits) {if (permits < 0) throw new IllegalArgumentException();return sync.nonfairTryAcquireShared(permits) >= 0;
}public boolean tryAcquire(long timeout, TimeUnit unit)throws InterruptedException {return sync.tryAcquireSharedNanos(1, unit.toNanos(timeout));
}public boolean tryAcquire(int permits, long timeout, TimeUnit unit)throws InterruptedException {if (permits < 0) throw new IllegalArgumentException();return sync.tryAcquireSharedNanos(permits, unit.toNanos(timeout));
}
2.2.3.2 availablePermits() 獲取可用許可數(shù)
源碼如下:
//Semaphore
public int availablePermits() {//獲取可用許可數(shù)return sync.getPermits();
}//Sync
//獲取可用許可數(shù)
final int getPermits() {return getState();
}
2.2.3.3 drainPermits() 耗光信號量
將剩下的信號量一次性消耗光,并且返回所消耗的信號量。
//Semaphore
public int drainPermits() {return sync.drainPermits();
}//Sync
final int drainPermits() {//自旋操作for (;;) {//獲取信號量值int current = getState();//如果信號量為0,直接返回//否則CAS修改為0,成功則返回,否則繼續(xù)自旋if (current == 0 || compareAndSetState(current, 0))return current;}
}
2.2.3.4 reducePermits() 減少信號量
reducePermits() 和 acquire() 方法相比都是減少信號量的值,但是 reducePermits() 不會導(dǎo)致任何線程阻塞,即只要傳遞的參數(shù) reductions(減少的信號量的數(shù)量)大于0,操作就會成功。所以調(diào)用該方法可能會導(dǎo)致信號量最終為負(fù)數(shù)。
//Semaphore
protected void reducePermits(int reduction) {if (reduction < 0) throw new IllegalArgumentException();sync.reducePermits(reduction);
}//Sync
final void reducePermits(int reductions) {//自旋for (;;) {//獲取當(dāng)前信號量值int current = getState();//計(jì)算剩余許可值int next = current - reductions;if (next > current) // underflowthrow new Error("Permit count underflow");//CAS修改同步狀態(tài),成功則返回,失敗則繼續(xù)自旋if (compareAndSetState(current, next))return;}
}
三、使用案例
這里以經(jīng)典的停車作為案例。假設(shè)停車場有3個(gè)停車位,此時(shí)有5輛汽車需要進(jìn)入停車場停車。
public static void main(String[] args) {//定義semaphore實(shí)例,設(shè)置許可數(shù)為3,即停車位為3個(gè)Semaphore semaphore = new Semaphore(3);//創(chuàng)建五個(gè)線程,即有5輛汽車準(zhǔn)備進(jìn)入停車場停車for (int i = 1; i <= 5; i++) {new Thread(() -> {try {System.out.println(Thread.currentThread().getName() + "嘗試進(jìn)入停車場...");//嘗試獲取許可semaphore.acquire();//模擬停車long time = (long) (Math.random() * 10 + 1);System.out.println(Thread.currentThread().getName() + "進(jìn)入了停車場,停車" + time +"秒...");Thread.sleep(time);} catch (InterruptedException e) {e.printStackTrace();} finally {System.out.println(Thread.currentThread().getName() + "開始駛離停車場...");//釋放許可semaphore.release();System.out.println(Thread.currentThread().getName() + "離開了停車場!");}}, i + "號汽車").start();}
}
//執(zhí)行結(jié)果
1號汽車嘗試進(jìn)入停車場...
5號汽車嘗試進(jìn)入停車場...
4號汽車嘗試進(jìn)入停車場...
3號汽車嘗試進(jìn)入停車場...
2號汽車嘗試進(jìn)入停車場...
5號汽車進(jìn)入了停車場,停車5秒...
1號汽車進(jìn)入了停車場,停車8秒...
4號汽車進(jìn)入了停車場,停車9秒...
5號汽車開始駛離停車場...
5號汽車離開了停車場!
3號汽車進(jìn)入了停車場,停車10秒...
1號汽車開始駛離停車場...
1號汽車離開了停車場!
2號汽車進(jìn)入了停車場,停車2秒...
4號汽車開始駛離停車場...
4號汽車離開了停車場!
2號汽車開始駛離停車場...
2號汽車離開了停車場!
3號汽車開始駛離停車場...
3號汽車離開了停車場!