自建網(wǎng)站營(yíng)銷是什么上海關(guān)鍵詞排名軟件
文章目錄
- AtomicLong實(shí)現(xiàn)原理
- 遞增和遞減操作代碼
- 總結(jié)
- LongAdder實(shí)現(xiàn)原理
- 實(shí)現(xiàn)原理
- LongAdder 代碼分析
- 構(gòu)造方法
- sum方法
- reset方法
- sumThenReset方法
- longValue方法
- add 方法
- longAccumulate 方法
- 總結(jié)
JUC 包提供 了一系列的原子性操作類,這些類都是使用非阻塞算法 CAS 實(shí)現(xiàn)的 ,相比使用鎖實(shí)現(xiàn)原子性操作這在性能上有很大提高。本篇主要以 AtomicLong 為例進(jìn)行原子操作類的講解,找到原子操作類的不足并提出解決方案。
AtomicLong實(shí)現(xiàn)原理
AtomicLong 是原子性遞增或者遞減類,其內(nèi)部使用 Unsafe 來(lái)實(shí)現(xiàn),由于AtomicLong 也是在rt包下,都是由BootStarp 類加載器加載,所以獲取Unsafe 時(shí)直接使用 Unsafe.getUnsafe ( )方法就可以獲取到。
遞增和遞減操作代碼
// ( 6 )調(diào)用 unsafe方法, 原子性設(shè)置value值為原始值+1 ,返回值為遞增后的值
public final long incrementAndGet() {
return unsafe.getAddAddLong(this, valueOffset , lL) + 1L ;
}
// ( 7 )調(diào)用 unsafe方法,原子性設(shè)置 value{直為原始值- 1 ,返回值為遞減之后的值
public final long decrementAndGet() {
return unsafe . getAddAddLong(this, valueOffset , - lL) - 1L ;
}
// (8 )調(diào)用 unsafe方法,原子性設(shè)置value值為原始值+1 , 返回值為原始值
public final long getAndincrement() {
return unsafe .getAndAddLong(this , valueOffset , 1L) ;
}
// ( 9 )調(diào)用 unsafe方法,原子性設(shè)置 value值為原始值- 1 ,返回位為原始值
public final long getAndDecrement( ) {
return unsafe.getAndAddLong (this , valueOffset ,- 1L) ;
}
在如上代碼內(nèi)部都是通過(guò)調(diào)用 Unsafe 的 getAndAddLong 方法來(lái)實(shí)現(xiàn)操作,這個(gè)函數(shù)
是個(gè)原子性操作,這里第一個(gè)參數(shù)是 AtomicLong 實(shí)例的引用 , 第二個(gè)參數(shù)是 value 變量在 AtomicLong 中的偏移值,第三個(gè)參數(shù)是要設(shè)置的第二個(gè)變量的值。
其中, getAndlncrement 方法在 JDK 7 中的實(shí)現(xiàn)邏輯為
public final long getAndincrement () {
while (true) {
long current= get() ;
long next = current + 1 ;
if (compareAndSet(current , next))
return current ;
}
}
在如上代碼中,每個(gè)線程是先拿到變量的當(dāng)前值(由于 value 是 volatile 變量,所以這
里拿到的是最新的值),然后在工作內(nèi)存中對(duì)其進(jìn)行增 1 操作 ,而后使用 CAS 修改變量的值。如果設(shè)置失敗,則循環(huán)繼續(xù)嘗試 , 直到設(shè)置成功 。
而 JDK8 中的邏輯為
public final long getAndAddLong(Object paramObject , long paramLongl , long paramLong2)
{
long l ;
do {
l = getLongvolatile(paramObject , paramLongl) ;
) while (!compareAndSwapLong(param0bject , paramLong1 , 1, 1 + paramLong2) );
return l ;
}
可以看到, JDK 7 的 AtomicLong 中的循環(huán)邏輯已經(jīng)被 JDK 8 中的原子操作類 UNsafe
內(nèi)置了 , 之所以內(nèi)置應(yīng)該是考慮到這個(gè)函數(shù)在其他地方也會(huì)用到,而內(nèi) 置可以提高復(fù)用性 。
public final boolean compareAndSet(long expect , long update) {
return unsafe.compareAddSwapLong ( this , valueOffset , expect , update) ;
}
由如上代碼可知,在內(nèi)部還是調(diào)用了 unsafe.compareAndSwapLong 方法 。 如果原子變量中的 value 值等于expect則使用 update 值更新該值并返回 true,否則返回 false 。
總結(jié)
經(jīng)過(guò)上面代碼的介紹,我們發(fā)現(xiàn)cas的實(shí)現(xiàn)是依賴于 unsafe 實(shí)現(xiàn)的,在進(jìn)行遞增遞減時(shí)會(huì)采用循環(huán)的方式去cas更新原始值。這樣的好處是不需要使用鎖進(jìn)行阻塞來(lái)保障數(shù)據(jù)安全,直接利用硬件自帶的比較替換方式更新數(shù)據(jù),相對(duì)來(lái)說(shuō)更加輕量級(jí)。但是如果在高并發(fā)下,這種比較替換會(huì)十分頻繁,導(dǎo)致CPU不斷被占用,進(jìn)而導(dǎo)致監(jiān)控中出現(xiàn)CPU使用告警。為解決這個(gè)問(wèn)題,在JDK8中出現(xiàn)了LongAdder。
LongAdder實(shí)現(xiàn)原理
既然 AtomicLong 的性能瓶頸是由于過(guò) 多 線程同時(shí)去競(jìng)爭(zhēng)一個(gè)變量的更新而產(chǎn)生的,那么如果把一個(gè)變量分解為多個(gè)變量,讓同樣多的線程去競(jìng)爭(zhēng)多個(gè)資源 ,是不是就解決了性能問(wèn)題?是的, LongAdder 就是這個(gè)思路 。
實(shí)現(xiàn)原理
使用 LongAdder 時(shí),則是在內(nèi)部維護(hù)多個(gè) Ce ll 變量,每個(gè) Cell 里面有一個(gè)初始值為 0 的 long 型變量,這樣,在同等并發(fā)量的情況下,爭(zhēng)奪單個(gè)變量更新操作的線程量會(huì)減少,這變相地減少了 爭(zhēng)奪共享資源的并發(fā)量。另 外,多個(gè)線程在爭(zhēng)奪同一個(gè) Cell 原子變量時(shí)如果失敗了 , 它并不是在當(dāng)前 Cell 變量上一直自旋 CAS 重試,而是嘗試在其他 Cell 的變量上進(jìn)行 CAS 嘗試 ,這個(gè)改變?cè)黾恿水?dāng)前線程重試 CAS 成功的可能性 。最后,在獲取 LongAdder 當(dāng)前值時(shí), 是把所有 Cell 變量的 value 值累加后再加上 base返回的 。
LongAdder 維護(hù)了 一個(gè)延遲初始化的原子性更新數(shù)組(默認(rèn)情況 下 Cell 數(shù)組是 null )和 一個(gè)基值變量 base 。 由于 Cells 占用的內(nèi)存是相對(duì)比較大的,所以一開(kāi)始并不創(chuàng)建它,而是在需要時(shí)創(chuàng)建,也就是惰性加載。
當(dāng)一開(kāi)始判斷 Cell 數(shù)組是 null 并且并發(fā)線程較少時(shí),所有 的 累加操作都是對(duì) base 變量進(jìn)行的 。 保持 Ce ll 數(shù)組的大小為 2 的 N 次方,在初始化時(shí) Cell 數(shù)組中的 Cell 元素個(gè)數(shù)為 2,數(shù)組里面的變量實(shí)體是 Cell 類型。 Cell 類型是 AtomicLong 的一個(gè)改進(jìn),用來(lái)減少緩存的爭(zhēng)用,也就是解決偽共享問(wèn)題 。
對(duì)于大多數(shù)孤立的多個(gè)原子操作進(jìn)行字節(jié)填充是浪費(fèi)的,因?yàn)樵有圆僮鞫际菬o(wú)規(guī)律地分散在內(nèi)存中的 (也就是說(shuō)多個(gè)原子性變量的內(nèi)存地址是不連續(xù) 的), 多個(gè)原子變量被放入同 一個(gè)緩存行的可能性很小 。 但是原子性數(shù)組元素的內(nèi)存地址是連續(xù)的,所以數(shù)組內(nèi)的 多個(gè)元素能經(jīng)常共享緩存行,因此這里使用 @sun.misc.Contended 注解對(duì)Cell 類進(jìn)行字節(jié)填充,這防止了 數(shù)組中多個(gè)元素共享一個(gè)緩存行,在性能上是一個(gè)提升。
LongAdder 代碼分析
下面圍繞以下話題從源碼角度來(lái)分析 LongAdder 的實(shí)現(xiàn):
LongAdder 類繼承自 Striped64 類,在 Striped64 內(nèi)部維護(hù)著三個(gè)變量。LongAdder 的真實(shí)值其實(shí)是 base 的值與 Cell 數(shù)組里面所有 Cell 元素中的 value 值的累加,base 是個(gè)基礎(chǔ)值,默認(rèn)為 0 。 cellsBusy 用來(lái)實(shí)現(xiàn)自旋鎖,狀態(tài)值只有 0 和 l ,當(dāng)創(chuàng)建 Cell 元素,擴(kuò)容 Cell 數(shù)組或者初始化 Cell 數(shù)組時(shí),使用 CAS 操作該變量來(lái)保證同時(shí)只有一個(gè)線程可以進(jìn)行其中之一的操作 。
構(gòu)造方法
``
@sun .misc.Contended static final class Cell {
volatile long value ;
Cell (long x) { value = x; }
final boolean cas(long cmp, long val) {
return UNSAFE.compareAndSwapLong(this , valueOffset , cmp , val) ;
}
// Unsafe mechanics
private static final sun.misc.Unsafe UNSAFE ;
private static final long valueOffset ;
static {
try {
UNSAFE = sun. misc .Unsafe . getUnsafe() ;
Class<?> ak =Cell . class ;
valueOffset = UNSAFE . objectFieldOffset
(ak . getDeclaredField (” value” )) ;
} catch (Exception e) {
throw new Error(e) ;
}
}
}
``
可以看到, Cell 的構(gòu)造很簡(jiǎn)單,其內(nèi)部維護(hù)一個(gè)被聲明為 volatile 的變量 , 這里聲 明
為 volatile 是因?yàn)榫€程操作 value 變量時(shí)沒(méi)有使用鎖 , 為 了 保證變量的內(nèi)存可見(jiàn)性這里將其聲 明為 volatile 的 。另外 cas 函數(shù)通過(guò) CAS 操作,保證了當(dāng)前線程更新時(shí)被分配的 Cell 元素 中 value 值的原子性。另外 , Cell 類使用@sun.misc .Contended 修飾是為了避免偽共享。
sum方法
long sum()返回 當(dāng)前的值 ,內(nèi) 部操作是累加所有 Cell 內(nèi) 部的 value 值后再累加 base 。
例如下面的代碼 , 由于計(jì)算總和時(shí)沒(méi)有對(duì) Cell 數(shù)組進(jìn)行加鎖,所以在累加過(guò)程中
可能有其他線程對(duì) Cell 中 的值進(jìn)行了修改 , 也有可能對(duì)數(shù)組進(jìn)行了擴(kuò)容,所 以 sum
返回的值并不是非常精確的, 其返回值并不是一個(gè)調(diào)用 sum 方法時(shí)的原子快照值 。
``
public long sum() {
Cell[] as = cells; Cell a;long sum = base ;if (as != null) {for (int i = 0 ; i < as.length ; ++i) {if ( (a = as[i] ) != null)sum += a . value;} }
return sum;
}
``
reset方法
void reset()為重置操作 , 如下代碼把 base 置為 0 , 如果 Cell 數(shù)組有元素 ,則元素值被重置為 0 。
``
public void reset () {
Cell[] as= cells ; Cell a;
base = 0L ;
if (as 1= null ) {
for (int i = 0 ; i< as . length ; ++i) {
if ((a=as[i]) != null) {
a.value = 0L ;
}
}
}
``
sumThenReset方法
long sumThenReset() 是 sum 的改造版本,在使用 sum 累加對(duì)應(yīng)的 Cell 值后,
把當(dāng)前 Cell 的值重置為 0, base 重置為 0。這樣 , 當(dāng)多線程調(diào)用該方法時(shí)會(huì)有問(wèn)題,
比如考慮第一個(gè)調(diào)用 線程清空 Cell 的值,則后一個(gè)線程調(diào)用時(shí)累加的都是 0 值 。
longValue方法
long longValue() 等價(jià)于 sum() 。
add 方法
``
public void add(long x) {
Cell[] as; long b, v; int m; Cell a;if ((as = cells) != null || !casBase(b = base, b + x)) {boolean uncontended = true;if (as == null || (m = as.length - 1) < 0 ||(a = as[getProbe() & m]) == null ||!(uncontended = a.cas(v = a.value, v + x)))longAccumulate(x, null, uncontended);}
}
final boolean casBase(long cmp, long val) {
return UNSAFE.compareAndSwapLong(this, BASE, cmp, val);
}
``
代碼 (1)首先看 cells 是否為 null ,如果為 null 則當(dāng)前在基礎(chǔ)變量 base 上進(jìn)行累加 ,
這時(shí)候就類似 AtomicLong 的操作 。
如果 cells 不為 null 或者線程執(zhí)行代碼( 1 )的 CAS 操作失敗了, 則會(huì)去執(zhí)行代碼 。 ) 。代碼 ( 2 )( 3 )決定當(dāng)前線程應(yīng)該訪 問(wèn) cells 數(shù)組里面的哪一個(gè) Cell 元素,如果當(dāng)前線程映射的元素存在則執(zhí)行代碼(4),使用 CAS 操作去更新分配的 Ce ll 元素 的 value 值,如果當(dāng)前線程映射的元素不存在或者存在但是 CAS 操作失敗則執(zhí)行代碼( 5 )。其實(shí)將代碼(2)(3) (4 )合起來(lái)看就是獲取當(dāng)前線程應(yīng)該訪問(wèn)的 cells 數(shù)組的 Cell 元素,然后進(jìn)行 CAS 更新操作,只是在獲取期間如果有些條件不滿足則會(huì)跳轉(zhuǎn)到代碼( 5 )執(zhí)行。另外當(dāng)前線程應(yīng)該訪 問(wèn) cells 數(shù)組的哪一個(gè) Cell 元素是通過(guò) getProbe() & m 進(jìn)行計(jì)算的 , 其中 m 是當(dāng)前cells 數(shù)組元素個(gè)數(shù) - 1 , getProbe()則用于獲取 當(dāng)前線程中變量 threadLocalRandomProbe 的值,這個(gè)值一開(kāi)始為 0,在代碼( 5 )里面會(huì)對(duì)其進(jìn)行初始化。并且當(dāng)前線程通過(guò)分配的Cell 元素的 cas 函數(shù)來(lái)保證對(duì) Cell 元素 value 值更新的原子性。
longAccumulate 方法
這是 cells 數(shù)組被初始化和擴(kuò)容的地方。
``
final void longAccumulate(long x, LongBinaryOperator fn,
boolean wasUncontended) {
int h;if ((h = getProbe()) == 0) {ThreadLocalRandom.current(); // force initializationh = getProbe();wasUncontended = true;}boolean collide = false; // True if last slot nonemptyfor (;;) {Cell[] as; Cell a; int n; long v;if ((as = cells) != null && (n = as.length) > 0) {if ((a = as[(n - 1) & h]) == null) {if (cellsBusy == 0) { // Try to attach new CellCell r = new Cell(x); // Optimistically createif (cellsBusy == 0 && casCellsBusy()) {boolean created = false;try { // Recheck under lockCell[] rs; int m, j;if ((rs = cells) != null &&(m = rs.length) > 0 &&rs[j = (m - 1) & h] == null) {rs[j] = r;created = true;}} finally {cellsBusy = 0;}if (created)break;continue; // Slot is now non-empty}}collide = false;}else if (!wasUncontended) // CAS already known to failwasUncontended = true; // Continue after rehashelse if (a.cas(v = a.value, ((fn == null) ? v + x :fn.applyAsLong(v, x))))break;else if (n >= NCPU || cells != as)collide = false; // At max size or staleelse if (!collide)collide = true;else if (cellsBusy == 0 && casCellsBusy()) {try {if (cells == as) { // Expand table unless staleCell[] rs = new Cell[n << 1];for (int i = 0; i < n; ++i)rs[i] = as[i];cells = rs;}} finally {cellsBusy = 0;}collide = false;continue; // Retry with expanded table}h = advanceProbe(h);}else if (cellsBusy == 0 && cells == as && casCellsBusy()) {boolean init = false;try { // Initialize tableif (cells == as) {Cell[] rs = new Cell[2];rs[h & 1] = new Cell(x);cells = rs;init = true;}} finally {cellsBusy = 0;}if (init)break;}else if (casBase(v = base, ((fn == null) ? v + x :fn.applyAsLong(v, x))))break; // Fall back on using base}
}
``
當(dāng)每 個(gè)線程第 一次 執(zhí)行到代碼 ( 6 )時(shí),會(huì)初始化當(dāng)前線程變 量threadLocalRandomProbe 的值,上面也說(shuō)了,這個(gè)變量在計(jì)算當(dāng)前線程應(yīng)該被分配到 cells數(shù)組的哪一個(gè) Cell 元素時(shí)會(huì)用到 。
cells 數(shù)組的初始化是在代碼(14)的中進(jìn)行的 , 其中 cellsBusy 是一 個(gè)標(biāo)示 , 為 0 說(shuō)明當(dāng)前 cells 數(shù)組沒(méi)有在被初始化或者擴(kuò)容, 也沒(méi)有在新建 Cell 元素,為 1則說(shuō)明 cells 數(shù)組在被初始化或者擴(kuò)容,或者當(dāng)前在創(chuàng)建新的 Cell 元素、通過(guò) CAS 操作來(lái)進(jìn)行 0 或 1狀態(tài)的切換,這里使用 casCellsBusy 函數(shù)。假設(shè)當(dāng) 前線程通過(guò) CAS 設(shè)置 cellsBusy 為 1,則當(dāng)前線程開(kāi)始初始化操作,那么這時(shí)候其他線程就不能進(jìn)行擴(kuò)容了 。 如代碼( 14.1 )初始化cells 數(shù)組元 素個(gè)數(shù)為 2 ,然后使用 h&1 計(jì) 算當(dāng)前線程應(yīng)該訪問(wèn) celll 數(shù)組的哪個(gè)位置,也就是使用當(dāng)前線程的 threadLocalRandomProbe 變量值& ( cells 數(shù)組元素個(gè)數(shù)- 1 ),然后標(biāo)示 cells 數(shù)組已經(jīng)被初始化,最后代碼( 14.3 ) 重置了 cellsBusy 標(biāo)記 。 顯然這里沒(méi)有使用CAS 操作,卻是線程安全的,原因是cellsBusy 是 volatile 類型的,這保證了變量的內(nèi)存可見(jiàn)性,另外此時(shí)其他地方的代碼沒(méi)有機(jī)會(huì)修改 cellsBusy 的值 。 在這里初始化的 cells 數(shù)組里面的兩個(gè)元素的值目前還是 null 。
cells 數(shù)組的擴(kuò)容是在代碼 (12 )中進(jìn)行的,對(duì) cells 擴(kuò)容是有條件的,也就是代碼( 10) ( 11 )的條件都不滿足的時(shí)候 。具體就是當(dāng)前 cells 的元素個(gè)數(shù)小于當(dāng)前機(jī)器 CPU 個(gè)數(shù)并且當(dāng)前多個(gè)線程訪 問(wèn)了 cells 中同 一個(gè)元素從而導(dǎo)致沖突使其中 一個(gè)線程 CAS 失敗時(shí)才會(huì)進(jìn)行擴(kuò)容操作。這里為何要涉及 CPU 個(gè)數(shù)呢?其實(shí)在基礎(chǔ)篇中己經(jīng)講過(guò) , 只有當(dāng)每個(gè) CPU 都運(yùn)行一個(gè)線程時(shí)才會(huì)使多線程的效果最佳,也就是當(dāng) cells 數(shù)組元素個(gè)數(shù)與 CPU 個(gè)數(shù)一致時(shí),每個(gè) Cell 都使用 一個(gè) CPU 進(jìn)行處理,這時(shí)性能才是最佳的 。 代碼 (12 )中的擴(kuò)容操作也是先通過(guò) CAS 設(shè)置 cellsBusy 為 1 ,然后才能進(jìn)行擴(kuò)容 。 假設(shè) CAS 成功則執(zhí)行代碼(l2.1)將容量擴(kuò)充為之前的 2 倍,并復(fù)制 Cell 元素到擴(kuò)容后數(shù)組 。 另外,擴(kuò)容后 cells 數(shù)組里面除了包含復(fù)制過(guò)來(lái)的元素外,還包含其他新元素,這些元素的值目前還是 null 。
在代碼 (7) (8)中,當(dāng)前線程調(diào)用 add 方法并根據(jù)當(dāng)前線程的隨機(jī)數(shù)threadLoca!RandomProbe 和 cells 元 素個(gè)數(shù)計(jì)算要訪問(wèn)的 Cell 元素下標(biāo),然后如果發(fā)現(xiàn)對(duì)應(yīng)下標(biāo)元素的值為 null,則新增一個(gè) Cell 元素到 cells 數(shù)組,并且在將其添加到 cells 數(shù)組之前要競(jìng)爭(zhēng)設(shè)置 cellsBusy 為 1 。
代碼( 13 )對(duì) CAS 失敗的線程重新計(jì)算當(dāng)前線程的隨機(jī)值 threadLocalRandomProbe,
以減少下次訪問(wèn) cells 元素時(shí)的沖突機(jī)會(huì)。
總結(jié)
本節(jié)介紹了 JDK 8 中新增的 LongAdder 原子性操作類,該類通過(guò)內(nèi)部 cells 數(shù)組分擔(dān)了高并發(fā)下多線程同時(shí)對(duì)一個(gè)原子變量進(jìn)行更新時(shí)的競(jìng)爭(zhēng)量,讓多個(gè)線程可 以 同時(shí)對(duì) cells數(shù)組里面的元素進(jìn)行并行的更新操作 。 另外,數(shù)組元素 Cell 使用@sun .misc .Contended 注解進(jìn)行修飾 , 這避免了 cells 數(shù)組 內(nèi) 多個(gè)原子變量被放入 同 一個(gè)緩存行 ,也就是避免了 偽共享,這對(duì)性能也是一個(gè)提升 。