一站式企業(yè)服務(wù)平臺是什么搜索熱度和搜索人氣
文章目錄
- CAS
- Atomic 原子類
- 一般原子類
- 針對aba問題 —— AtomicStampedReference
- 針對大量自旋問題 —— LongAdder
CAS
原理大致如下:
在java的 Unsafe 類里封裝了一些 cas 的api。以 compareAndSetInt 為例,來看看其底層實(shí)現(xiàn)。
可以發(fā)現(xiàn),最終會調(diào)用到 Atomic::cmpxchg 方法(Atomic::cmpxchg 在不同的操作系統(tǒng)中實(shí)現(xiàn)有所不同,上圖所示是 linux_x86 的代碼)。
Atomic::cmpxchg (exchange_value,dest, compare_value)
compare_value:是期望的當(dāng)前值,如果 destination 的當(dāng)前值等于 compare_value,則進(jìn)行替換操作。
dest: 是要進(jìn)行比較和替換的內(nèi)存位置(通常是一個變量或內(nèi)存地址)。
exchange_value: 是要設(shè)置到 destination 的新值。
返回:CAS成功返回替換值,CAS失敗返回原值。
Atomic::cmpxchg 底層是 cmpxchgl 指令,該指令是一個硬件層面上的原子操作。除此之外如果是多核架構(gòu)還加入lock前綴指令以實(shí)現(xiàn)內(nèi)存屏障的效果。
cmpxchgl指令:首先比較 dest 指向的內(nèi)存值是否和 compare_value 值相等,如果相等,則交換 dest 與 exchange_value,否則就單方面將 dest 指向的值賦給exchange_value。
Atomic 原子類
在理解了cas的底層實(shí)現(xiàn)后,再來看原子類。
一般原子類
一般的源自類無非就是直接調(diào)用了 Unsafe 中 cas 相關(guān)api。
以AtomicInteger為例:
而上面是在不考慮CAS自身缺陷情況下的一個封裝,如果考慮上CAS缺陷,原子類該如何應(yīng)對?
cas缺陷:1、aba問題 2、可能大量自旋
針對aba問題 —— AtomicStampedReference
試想這么一個場景:從讀取內(nèi)存值開始到執(zhí)行cas原子指令之前,值被修改了兩次,一次+1,一次-1。此時對于cas指令來說,內(nèi)存中的值是不變的,就好像沒被修改過一樣,進(jìn)而指令操作成功。這就是aba問題。
為解決aba問題,jdk增加了 AtomicStampedReference 類。核心設(shè)計(jì)思想是多加版本號的概念。當(dāng)cas替換時不僅比對內(nèi)存中的值是否是期望值,還會去比對版本號是否跟一開始的相等, 如果不相等,所以在這期間被修改過了,此時就cas失敗。
除了 AtomicStampedReference 還有 AtomicMarkableReference 。區(qū)別在于 AtomicMarkableReference 并不關(guān)心修改了多少次,只關(guān)心是否被修改過。所以mark是一個boolean類型。
針對大量自旋問題 —— LongAdder
試想這么一個場景,假如同一時刻大量線程對庫存數(shù)量-1,此時必然會有大量的自旋線程占用cpu資源。這明顯是一個資源的浪費(fèi)。如果不用cas,加鎖的話,就會有線程切換,對性能必然會有所影響。有沒有那么一種方案,既不加鎖阻塞線程,又可以避免大量的自旋呢?
為解決這類場景的問題,jdk 1.8 引入了 LongAdder。
除 LongAdder 外還有 DoubleAdder、DoubleAccumulator、LongAccumulator,基本原理是一樣的,本文以 LongAdder為例。
LongAccumulator相當(dāng)于LongAdder的增強(qiáng)版。LongAdder只能針對數(shù)值的進(jìn)行加減運(yùn)算,而LongAccumulator提供了自定義的函數(shù)操作。
LongAdder的基本思路:
分散熱點(diǎn),將value值分散到數(shù)組中,不同線程會命中到數(shù)組的不同槽,每個線程只對自己槽中的值進(jìn)行CAS,這樣熱點(diǎn)就被分散了,沖突的概率就小很多。如果要獲取真正的long值,只需將各個槽中的變量值累加返回即可。
重要變量:base變量 + Cells[]數(shù)組
base變量:非競態(tài)條件下,直接累加到該變量上
Cells[]數(shù)組:競態(tài)條件下,累加個各個線程自己的槽Cell[i]中
接下來從源碼看看 LongAdder 是如何累加的:
LongAdder#add
public void add(long x) {Cell[] cs; long b, v; int m; Cell c;if ((cs = cells) != null || !casBase(b = base, b + x)) {boolean uncontended = true;if (cs == null || (m = cs.length - 1) < 0 ||(c = cs[getProbe() & m]) == null ||!(uncontended = c.cas(v = c.value, v + x)))longAccumulate(x, null, uncontended);}}
總之就是當(dāng) cells 還為初始化就直接cas base變量,如果一開始cells就已經(jīng)初始化,或者cas base 變量失敗就cas cells數(shù)組,如果cas cells數(shù)組也失敗就進(jìn)入Striped64#longAccumulate。
Striped64#longAccumulate
final void longAccumulate(long x, LongBinaryOperator fn,boolean wasUncontended) {//...//自旋循環(huán)done: for (;;) {Cell[] cs; Cell c; int n; long v;//如果cells不為空if ((cs = cells) != null && (n = cs.length) > 0) {//映射的槽位為空,新建并設(shè)置槽位if ((c = cs[(n - 1) & h]) == null) {if (cellsBusy == 0) {Cell r = new Cell(x);if (cellsBusy == 0 && casCellsBusy()) {try {Cell[] rs; int m, j;if ((rs = cells) != null &&(m = rs.length) > 0 &&rs[j = (m - 1) & h] == null) {rs[j] = r;break done;}} finally {cellsBusy = 0;}continue;}}collide = false;}//...(自旋重試的標(biāo)記位處理)//不為空就 cas cell 槽位else if (c.cas(v = c.value, (fn == null) ? v + x : fn.applyAsLong(v, x)))break;//...(自旋重試的標(biāo)記位處理)//cas cell 槽位失敗嘗試擴(kuò)容cellselse if (cellsBusy == 0 && casCellsBusy()) {try {if (cells == cs) // Expand table unless stalecells = Arrays.copyOf(cs, n << 1);} finally {cellsBusy = 0;}collide = false;continue; // Retry with expanded table}h = advanceProbe(h);}//如果為空就新建 cells 并設(shè)置 cell 槽位else if (cellsBusy == 0 && cells == cs && casCellsBusy()) {try {if (cells == cs) {Cell[] rs = new Cell[2];rs[h & 1] = new Cell(x);cells = rs;break done;}} finally {cellsBusy = 0;}}//如果正在擴(kuò)容,直接cas baseelse if (casBase(v = base,(fn == null) ? v + x : fn.applyAsLong(v, x)))break done;}}
總之就是如果cells還未初始化就初始化并設(shè)置此次槽位。如果正在初始化就直接cas base。如果已經(jīng)初始化,就映射槽位,如果槽位為null,就創(chuàng)建槽位并設(shè)值,如果槽位不為null,就cas該槽位,如果槽位cas失敗,就嘗試擴(kuò)容(cells大小 < cpu核數(shù) 時擴(kuò)容)。