公司網(wǎng)站建設(shè)需求書網(wǎng)站設(shè)計公司哪家專業(yè)
作者:后端小肥腸
🍇 我寫過的文章中的相關(guān)代碼放到了gitee,地址:xfc-fdw-cloud: 公共解決方案
🍊 有疑問可私信或評論區(qū)聯(lián)系我。
🥑 ?創(chuàng)作不易未經(jīng)允許嚴禁轉(zhuǎn)載。
目錄
1. 前言
2. 為何要使用分布式鎖?
2.1. 單機場景里的鎖
2.2. 分布式場景里的鎖
3. Redis分布式鎖實現(xiàn)
3.1. SpringBoot實現(xiàn)分布式鎖
3.2. 看門狗方案實現(xiàn)
3.2.1. 開門狗方案原理
3.2.2. 看門狗方案核心代碼
4. 如何使用Redission分布式鎖
4.1. Redission簡介
4.2. SpringBoot使用Redission分布式鎖
5. 結(jié)語
1. 前言
在當今快速發(fā)展的分布式系統(tǒng)中,多個節(jié)點之間的協(xié)調(diào)和一致性成為了一個日益重要的挑戰(zhàn)。隨著云計算、微服務(wù)架構(gòu)和大數(shù)據(jù)處理的普及,系統(tǒng)的復(fù)雜性顯著增加,這使得并發(fā)操作的管理愈發(fā)困難。在這樣的背景下,分布式鎖作為一種重要的機制,能夠有效地防止數(shù)據(jù)競爭和不一致性問題,確保系統(tǒng)的穩(wěn)定與可靠。本文將深入探討分布式鎖的原理、實現(xiàn)方式以及在實際應(yīng)用中的重要性。
2. 為何要使用分布式鎖?
在系統(tǒng)開發(fā)中,尤其是高并發(fā)場景下,多個線程同時操作共享資源是常見的需求。例如,多個線程同時售票、更新庫存、扣減余額等。這些操作如果沒有妥善管理,很容易導致資源競爭、數(shù)據(jù)不一致等問題。在單機環(huán)境中,我們可以通過鎖機制(如 synchronized
或 ReentrantLock
)解決這些問題,但在分布式環(huán)境中,這些機制無法直接使用,需要更復(fù)雜的分布式鎖方案。
2.1. 單機場景里的鎖
在單機環(huán)境中,可以使用線程安全的操作來避免多線程競爭。在以下代碼中,我們通過三種方式逐步引入鎖機制來保障線程安全。
以普通的售票代碼為例,原始代碼:
public class SaleTicket {public static void main(String[] args) throws Exception {Ticket ticket = new Ticket();for (int j = 0; j < 5; j++) { // 創(chuàng)建5個線程模擬并發(fā)new Thread(() -> { // 每個線程執(zhí)行售票操作for (int i = 1; i <= 10000; i++) {ticket.sale();}}).start();}Thread.sleep(5000); // 等待線程執(zhí)行完成ticket.print(); // 打印剩余票數(shù)}
}// 無鎖資源類
class Ticket {// 總票數(shù)private Integer number = new Integer(50000);// 售票方法,無線程安全保障public void sale() {if (number > 0) {number--;}}public void print() {System.out.println("剩余票:" + number);}
}
運行以上代碼,可能會出現(xiàn)以下問題:
- 票數(shù)不一致:多個線程可能同時讀取和修改
number
,導致最終票數(shù)小于 0 或大于實際值。 - 數(shù)據(jù)競爭:線程之間沒有同步機制,數(shù)據(jù)容易被破壞。
解決這一問題的關(guān)鍵在于引入鎖機制,下面我們介紹三種常見的單機鎖實現(xiàn)方式。
1. 使用 AtomicInteger
AtomicInteger
是 Java 提供的線程安全類,使用 CAS(Compare-And-Swap)原子操作實現(xiàn)多線程數(shù)據(jù)一致性。它適合簡單場景,例如遞增、遞減等操作。
代碼示例如下:
import java.util.concurrent.atomic.AtomicInteger;public class SaleTicket {public static void main(String[] args) throws Exception {Ticket ticket = new Ticket();for (int j = 0; j < 5; j++) {new Thread(() -> {for (int i = 1; i <= 10000; i++) {ticket.sale();}}).start();}Thread.sleep(5000); // 等待線程完成ticket.print(); // 打印剩余票數(shù)}
}class Ticket {private AtomicInteger number = new AtomicInteger(50000); // 線程安全的票數(shù)public void sale() {if (number.get() > 0) {number.decrementAndGet(); // 原子操作}}public void print() {System.out.println("剩余票:" + number.get());}
}
優(yōu)點:
- 原子操作,無需顯式加鎖。
- 性能較高,適合簡單的并發(fā)場景。
缺點:
- 不適合復(fù)雜業(yè)務(wù)邏輯,例如多個共享資源需要同時操作的場景。
2. 使用 synchronized
Synchronized
是 Java 提供的關(guān)鍵字,可以用來保證方法或代碼塊的線程安全。它通過內(nèi)部鎖(Monitor)機制,確保同一時間只有一個線程能夠執(zhí)行加鎖的代碼。
代碼示例如下:
public class SaleTicket {public static void main(String[] args) throws Exception {Ticket ticket = new Ticket();for (int j = 0; j < 5; j++) {new Thread(() -> {for (int i = 1; i <= 10000; i++) {ticket.sale();}}).start();}Thread.sleep(5000); // 等待線程完成ticket.print(); // 打印剩余票數(shù)}
}class Ticket {private Integer number = new Integer(50000); // 總票數(shù)public synchronized void sale() {if (number > 0) {number--; // 在鎖保護下操作}}public void print() {System.out.println("剩余票:" + number);}
}
?優(yōu)點:
- 簡單易用,內(nèi)置關(guān)鍵字,便于開發(fā)者理解和使用。
- 適合多線程復(fù)雜操作。
缺點:
- 性能較低,因為線程競爭會導致阻塞。
- 粒度較大,可能降低系統(tǒng)并發(fā)性。
3. 使用 ReentrantLock
ReentrantLock
是 Java 并發(fā)包中的顯式鎖,與 synchronized
相比,它提供了更豐富的功能,例如支持公平鎖、非公平鎖、條件變量等。
代碼示例如下:
import java.util.concurrent.locks.ReentrantLock;public class SaleTicket {public static void main(String[] args) throws Exception {Ticket ticket = new Ticket();for (int j = 0; j < 5; j++) {new Thread(() -> {for (int i = 1; i <= 10000; i++) {ticket.sale();}}).start();}Thread.sleep(5000); // 等待線程完成ticket.print(); // 打印剩余票數(shù)}
}class Ticket {private Integer number = new Integer(50000); // 總票數(shù)private final ReentrantLock lock = new ReentrantLock(); // 顯式鎖public void sale() {lock.lock(); // 加鎖try {if (number > 0) {number--; // 線程安全操作}} finally {lock.unlock(); // 確保釋放鎖}}public void print() {System.out.println("剩余票:" + number);}
}
優(yōu)點:
- 靈活,支持公平鎖、非公平鎖等特性。
- 更適合復(fù)雜的并發(fā)場景。
缺點:
- 必須顯式加鎖和釋放鎖,代碼復(fù)雜度較高。
- 需要正確處理異常,防止死鎖。
在單機場景中,使用鎖機制可以有效解決線程安全問題。對于簡單的操作(如計數(shù)器),可以優(yōu)先使用 AtomicInteger
;如果需要保護復(fù)雜的業(yè)務(wù)邏輯,可以選擇 synchronized
或 ReentrantLock
。
2.2. 分布式場景里的鎖
在單機環(huán)境中,使用線程鎖(如 synchronized
、JUC)可以有效地管理并發(fā)操作,保證數(shù)據(jù)的一致性。但在分布式系統(tǒng)中,多個節(jié)點可能并行執(zhí)行相同的操作,訪問的是共享資源(如數(shù)據(jù)庫、緩存、隊列等)。這就帶來了一個新的問題:如何在不同的節(jié)點之間協(xié)調(diào)資源的訪問?
光說可能你不是很理解,我來舉個例子:
假設(shè)你有一個售票系統(tǒng),多個用戶同時請求購買同一張票。如果沒有分布式鎖,可能會發(fā)生如下情況:
- 用戶 A 和用戶 B 同時查詢到有票可買。
- 用戶 A 和用戶 B 分別進行扣款操作,且系統(tǒng)仍認為票數(shù)未減少,這會導致“超賣”情況。
使用分布式鎖后,只有一個請求可以修改票數(shù),其他請求將被阻塞或等待,直到鎖被釋放,從而避免超賣問題。
在分布式環(huán)境中,多個服務(wù)或節(jié)點可能并發(fā)訪問同一份數(shù)據(jù)。如果沒有適當?shù)臋C制來管理這些并發(fā)操作,就會發(fā)生資源競爭和數(shù)據(jù)不一致等問題。因此,分布式鎖應(yīng)運而生,用于控制不同節(jié)點對共享資源的訪問,確保同一時刻只有一個節(jié)點能夠執(zhí)行某項操作。
常見的分布式鎖應(yīng)用場景:
- 防止超賣:比如多個用戶請求同時購買同一票,或者多個服務(wù)同時修改同一份數(shù)據(jù)。通過分布式鎖,確保只有一個請求能夠操作共享資源,從而避免超賣。
- 避免緩存穿透:多個服務(wù)可能同時訪問緩存失效的數(shù)據(jù),使用分布式鎖可以確保只有一個請求去查詢數(shù)據(jù)庫,其他請求需要等待。
- 確保數(shù)據(jù)一致性:多個微服務(wù)可能會并發(fā)修改同一份數(shù)據(jù),通過分布式鎖來確保同一時刻只有一個服務(wù)能夠修改數(shù)據(jù),從而避免數(shù)據(jù)不一致的風險。
如何在分布式環(huán)境中實現(xiàn)鎖?
分布式鎖的目標是確保不同節(jié)點對共享資源的訪問不沖突。以下是幾種常見的分布式鎖實現(xiàn)方式:
-
基于 Redis 的分布式鎖: Redis 提供了高效的鍵值存儲,可以通過
SETNX
命令(Set if Not eXists)來創(chuàng)建分布式鎖。該命令只有在鎖不存在時才會成功設(shè)置,從而保證了只有一個節(jié)點可以獲取鎖。示例:
SETNX lock_key value
該命令如果成功設(shè)置,表示當前節(jié)點獲得了鎖;如果失敗,表示其他節(jié)點已獲得鎖。
-
基于 ZooKeeper 的分布式鎖: ZooKeeper 是一個分布式協(xié)調(diào)服務(wù),提供了可靠的鎖機制。通過在 ZooKeeper 中創(chuàng)建臨時節(jié)點,當一個節(jié)點成功創(chuàng)建鎖節(jié)點時,其他節(jié)點無法重復(fù)創(chuàng)建,從而實現(xiàn)分布式鎖。(這部分會放到ZooKeeper系列說)
-
基于數(shù)據(jù)庫的分布式鎖: 通過在數(shù)據(jù)庫中創(chuàng)建鎖表,使用數(shù)據(jù)庫行鎖來控制并發(fā)訪問。雖然簡單易用,但性能較低,適合低并發(fā)場景。(本文不講)
-
基于 Redisson 的分布式鎖: Redisson 是一個 Java 客戶端,提供了高效的分布式鎖功能,支持多種鎖類型,如公平鎖、讀寫鎖等。它封裝了 Redis 的原子操作,并提供了更易用的 API,使得在分布式系統(tǒng)中實現(xiàn)鎖機制更加方便。
在實踐中,分布式鎖可以應(yīng)用于多個場景,如防止超賣、確保數(shù)據(jù)一致性和避免緩存穿透等問題。在下一節(jié)中,我們將詳細介紹分布式鎖的具體技術(shù)實現(xiàn),包括redis、Redission、Zookeeper的具體實現(xiàn)技術(shù)細節(jié)。
3. Redis分布式鎖實現(xiàn)
想要實現(xiàn)分布式鎖,必須要求 Redis 有互斥的能力,我們可以使用 SETNX 命令,這個命令表示SET if Not Exists,即如果 key 不存在,才會設(shè)置它的值,否則什么也不做。
兩個客戶端進程可以執(zhí)行這個命令,達到互斥,就可以實現(xiàn)一個分布式鎖。
客戶端 1 申請加鎖,加鎖成功:
客戶端 2 申請加鎖,因為它后到達,加鎖失敗:
此時,加鎖成功的客戶端,就可以去操作共享資源,例如,修改 數(shù)據(jù)庫?的某一行數(shù)據(jù),或者調(diào)用一個 API 請求。
操作完成后,還要及時釋放鎖,給后來者讓出操作共享資源的機會。如何釋放鎖呢?
也很簡單,直接使用 DEL 命令刪除這個 key 即可,這個邏輯非常簡單。
但是,它存在一個很大的問題,當客戶端 1 拿到鎖后,如果發(fā)生下面的場景,就會造成死鎖:
1、程序處理業(yè)務(wù)邏輯異常,沒及時釋放鎖
2、進程掛了,沒機會釋放鎖
這時,這個客戶端就會一直占用這個鎖,而其它客戶端就永遠拿不到這把鎖了。怎么解決這個問題呢?
如何避免死鎖?
我們很容易想到的方案是,在申請鎖時,給這把鎖設(shè)置一個租期。
在 Redis 中實現(xiàn)時,就是給這個 key 設(shè)置一個過期時間。這里我們假設(shè),操作共享資源的時間不會超過 10s,那么在加鎖時,給這個 key 設(shè)置 10s 過期即可:
SETNX lock 1 ? // 加鎖
EXPIRE lock 10 // 10s后自動過期
這樣一來,無論客戶端是否異常,這個鎖都可以在 10s 后被自動釋放,其它客戶端依舊可以拿到鎖。
但現(xiàn)在還是有問題:
現(xiàn)在的操作,加鎖、設(shè)置過期是 2 條命令,有沒有可能只執(zhí)行了第一條,第二條卻來不及執(zhí)行的情況發(fā)生呢?例如:
-
SETNX 執(zhí)行成功,執(zhí)行EXPIRE 時由于網(wǎng)絡(luò)問題,執(zhí)行失敗
-
SETNX 執(zhí)行成功,Redis 異常宕機,EXPIRE 沒有機會執(zhí)行
-
SETNX 執(zhí)行成功,客戶端異常崩潰,EXPIRE也沒有機會執(zhí)行
總之,這兩條命令不能保證是原子操作(一起成功),就有潛在的風險導致過期時間設(shè)置失敗,依舊發(fā)生死鎖問題。
在 Redis 2.6.12 之后,Redis 擴展了 SET 命令的參數(shù),用這一條命令就可以了:
SET lock 1 EX 10 NX
鎖被別人釋放怎么辦?
上面的命令執(zhí)行時,每個客戶端在釋放鎖時,都是無腦操作,并沒有檢查這把鎖是否還歸自己持有,所以就會發(fā)生釋放別人鎖的風險,這樣的解鎖流程,很不嚴謹!如何解決這個問題呢?
解決辦法是:客戶端在加鎖時,設(shè)置一個只有自己知道的唯一標識進去。
例如,可以是自己的線程 ID,也可以是一個 UUID(隨機且唯一),這里我們以UUID 舉例:
SET lock $uuid EX 20 NX
之后,在釋放鎖時,要先判斷這把鎖是否還歸自己持有,偽代碼可以這么寫:
if redis.get("lock") == $uuid:redis.del("lock")
這里釋放鎖使用的是 GET + DEL 兩條命令,這時,又會遇到我們前面講的原子性問題了。這里可以使用lua腳本來解決。
安全釋放鎖的 Lua 腳本如下:
if redis.call("GET",KEYS[1]) == ARGV[1]
thenreturn redis.call("DEL",KEYS[1])
elsereturn 0
end
好了,這樣一路優(yōu)化,整個的加鎖、解鎖的流程就更嚴謹了。
這里我們先小結(jié)一下,基于 Redis 實現(xiàn)的分布式鎖,一個嚴謹?shù)牡牧鞒倘缦?#xff1a;
1、加鎖
SET lock_key $unique_id EX $expire_time NX
2、操作共享資源
3、釋放鎖:Lua 腳本,先 GET 判斷鎖是否歸屬自己,再DEL 釋放鎖
3.1. SpringBoot實現(xiàn)分布式鎖
只貼核心代碼,redis配置和maven依賴就不貼了:
/*** 分布式鎖的實現(xiàn)*/
@Component
public class RedisDistLock implements Lock {private final static int LOCK_TIME = 5*1000;//失效時間private final static String RS_DISTLOCK_NS = "tdln:"; //加鎖的key的前綴/*if redis.call('get',KEYS[1])==ARGV[1] thenreturn redis.call('del', KEYS[1])else return 0 end*///釋放鎖的時候,確保原子。lua腳本:確保 釋放鎖的線程就是加鎖的線程,不能被線程的線程無腦調(diào)用釋放private final static String RELEASE_LOCK_LUA ="if redis.call('get',KEYS[1])==ARGV[1] then\n" +" return redis.call('del', KEYS[1])\n" +" else return 0 end";/*保存每個線程的獨有的ID值*/private ThreadLocal<String> lockerId = new ThreadLocal<>();/*解決鎖的重入*/private Thread ownerThread;private String lockName = "lock";@Autowiredprivate JedisPool jedisPool;public String getLockName() {return lockName;}public void setLockName(String lockName) {this.lockName = lockName;}public Thread getOwnerThread() {return ownerThread;}public void setOwnerThread(Thread ownerThread) {//加鎖成功,就會把搶到鎖的線程進行保存this.ownerThread = ownerThread;}@Overridepublic void lock() { //redis的分布式鎖while(!tryLock()){try {Thread.sleep(100); //每隔100ms 都會去嘗試加鎖} catch (InterruptedException e) {e.printStackTrace();}}}@Overridepublic void lockInterruptibly() throws InterruptedException {throw new UnsupportedOperationException("不支持可中斷獲取鎖!");}@Overridepublic boolean tryLock() {Thread t = Thread.currentThread();if(ownerThread==t){/*說明本線程持有鎖*/return true;}else if(ownerThread!=null){/*本進程里有其他線程持有分布式鎖*/return false;}Jedis jedis = jedisPool.getResource();try {String id = UUID.randomUUID().toString();SetParams params = new SetParams();params.px(LOCK_TIME);params.nx();synchronized (this){/*線程們,本地搶鎖*/if((ownerThread==null)&&"OK".equals(jedis.set(RS_DISTLOCK_NS+lockName,id,params))){lockerId.set(id);setOwnerThread(t);return true;}else{return false;}}} catch (Exception e) {throw new RuntimeException("分布式鎖嘗試加鎖失敗!");} finally {jedis.close();}}@Overridepublic boolean tryLock(long time, TimeUnit unit) throws InterruptedException {throw new UnsupportedOperationException("不支持等待嘗試獲取鎖!");}@Overridepublic void unlock() {if(ownerThread!=Thread.currentThread()) {throw new RuntimeException("試圖釋放無所有權(quán)的鎖!");}Jedis jedis = null;try {jedis = jedisPool.getResource();Long result = (Long)jedis.eval(RELEASE_LOCK_LUA,Arrays.asList(RS_DISTLOCK_NS+lockName),Arrays.asList(lockerId.get()));if(result.longValue()!=0L){System.out.println("Redis上的鎖已釋放!");}else{System.out.println("Redis上的鎖釋放失敗!");}} catch (Exception e) {throw new RuntimeException("釋放鎖失敗!",e);} finally {if(jedis!=null) jedis.close();lockerId.remove();setOwnerThread(null);System.out.println("本地鎖所有權(quán)已釋放!");}}@Overridepublic Condition newCondition() {throw new UnsupportedOperationException("不支持等待通知操作!");}}
這段代碼實現(xiàn)了一個基于 Redis 的分布式鎖 (RedisDistLock
) 類。它實現(xiàn)了 Lock
接口,用于在分布式環(huán)境中對資源進行加鎖和解鎖,確保同一時間只有一個線程可以操作共享資源。主要功能包括:
- 加鎖 (
lock
方法):嘗試獲取分布式鎖,如果當前線程未持有鎖,則進入循環(huán)每 100 毫秒重試,直到成功獲得鎖。 - 釋放鎖 (
unlock
方法):只有持有鎖的線程才能釋放鎖,利用 Lua 腳本確保鎖的釋放操作是原子的,避免鎖被錯誤釋放。 - 鎖的重入:通過檢查當前線程是否已經(jīng)持有鎖,來支持鎖的重入機制,避免死鎖。
- 線程唯一標識:每個線程持有一個唯一的
lockerId
,用于標識和驗證鎖的擁有者。 - 不支持中斷和等待超時的加鎖:該實現(xiàn)不支持可中斷的加鎖操作,也不支持在指定時間內(nèi)嘗試獲取鎖。
該鎖使用了 Redis 的 SET
命令來加鎖,并且使用 Lua 腳本確保釋放鎖時不會被其他線程誤釋放。
3.2. 看門狗方案實現(xiàn)
3.2.1. 開門狗方案原理
在分布式系統(tǒng)中,使用 Redis 鎖時,如果業(yè)務(wù)邏輯執(zhí)行時間超過鎖的過期時間,可能會引發(fā)鎖的提前釋放問題,進而導致并發(fā)沖突。為了避免這種情況,可以引入 看門狗機制。
未加看門狗機制時分布式場景的工作流程:
- 加鎖:客戶端(如客戶端 C)請求加鎖,Redis 設(shè)置?
lock_key
?并附加一個過期時間(例如 10 秒)。 - 業(yè)務(wù)執(zhí)行:客戶端在鎖的保護下,執(zhí)行業(yè)務(wù)邏輯。
- 解鎖:業(yè)務(wù)邏輯執(zhí)行完畢后,客戶端主動釋放鎖。
潛在問題:
- 如果業(yè)務(wù)邏輯的執(zhí)行時間超過鎖的過期時間(如大于 10 秒),在客戶端釋放鎖之前,鎖已經(jīng)因為過期而自動釋放。
- 鎖釋放后,其他客戶端(如客戶端 A 或 B)可能搶到鎖,導致多個客戶端同時執(zhí)行相同的業(yè)務(wù)邏輯,發(fā)生并發(fā)沖突。
如下圖所示:
為了解決鎖提前釋放的問題,可以引入?看門狗機制,通過定期續(xù)期保證鎖在業(yè)務(wù)邏輯執(zhí)行完成前不會被自動釋放。
看門狗機制的工作原理:
-
加鎖后啟動看門狗:
- 客戶端C加鎖后,啟動一個守護線程(看門狗)。
- 守護線程會定期檢查鎖的過期時間。
-
定期續(xù)期:
- 守護線程每隔一段時間(例如 5 秒)檢查鎖是否即將過期。
- 如果鎖沒有被解鎖且仍然屬于當前客戶端,則向 Redis 請求續(xù)期(如將鎖的過期時間從 10 秒延長到 20 秒)。
- 這樣,即使業(yè)務(wù)邏輯的執(zhí)行時間超過初始過期時間,鎖也不會過期。
-
主動釋放鎖:
- 客戶端在業(yè)務(wù)邏輯執(zhí)行完畢后,主動釋放鎖。
- 此時,看門狗線程會停止續(xù)期,鎖正常釋放。
如下圖展示了看門狗機制的具體流程:
3.2.2. 看門狗方案核心代碼
只貼核心代碼:
RedisDistLockWithDog
@Component
public class RedisDistLockWithDog implements Lock {private final static int LOCK_TIME = 1*1000;private final static String LOCK_TIME_STR = String.valueOf(LOCK_TIME);private final static String RS_DISTLOCK_NS = "tdln2:";/*if redis.call('get',KEYS[1])==ARGV[1] thenreturn redis.call('del', KEYS[1])else return 0 end*/private final static String RELEASE_LOCK_LUA ="if redis.call('get',KEYS[1])==ARGV[1] then\n" +" return redis.call('del', KEYS[1])\n" +" else return 0 end";/*還有并發(fā)問題,考慮ThreadLocal*/private ThreadLocal<String> lockerId = new ThreadLocal<>();private Thread ownerThread;private String lockName = "lock";@Autowiredprivate JedisPool jedisPool;public String getLockName() {return lockName;}public void setLockName(String lockName) {this.lockName = lockName;}public Thread getOwnerThread() {return ownerThread;}public void setOwnerThread(Thread ownerThread) {this.ownerThread = ownerThread;}@Overridepublic void lock() {while(!tryLock()){try {Thread.sleep(100);} catch (InterruptedException e) {e.printStackTrace();}}}@Overridepublic void lockInterruptibly() throws InterruptedException {throw new UnsupportedOperationException("不支持可中斷獲取鎖!");}@Overridepublic boolean tryLock() {Thread t=Thread.currentThread();/*說明本線程正在持有鎖*/if(ownerThread==t) {return true;}else if(ownerThread!=null){/*說明本進程中有別的線程正在持有分布式鎖*/return false;}Jedis jedis = null;try {jedis = jedisPool.getResource();/*每一個鎖的持有人都分配一個唯一的id,也可采用snowflake算法*/String id = UUID.randomUUID().toString();SetParams params = new SetParams();params.px(LOCK_TIME); //加鎖時間1sparams.nx();synchronized (this){if ((ownerThread==null)&&"OK".equals(jedis.set(RS_DISTLOCK_NS+lockName,id,params))) {lockerId.set(id);setOwnerThread(t);if(expireThread == null){//看門狗線程啟動expireThread = new Thread(new ExpireTask(),"expireThread");expireThread.setDaemon(true);expireThread.start();}//往延遲阻塞隊列中加入元素(讓看門口可以在過期之前一點點的時間去做鎖的續(xù)期)delayDog.add(new ItemVo<>((int)LOCK_TIME,new LockItem(lockName,id)));System.out.println(Thread.currentThread().getName()+"已獲得鎖----");return true;}else{System.out.println(Thread.currentThread().getName()+"無法獲得鎖----");return false;}}} catch (Exception e) {throw new RuntimeException("分布式鎖嘗試加鎖失敗!",e);} finally {jedis.close();}}@Overridepublic boolean tryLock(long time, TimeUnit unit) throws InterruptedException {throw new UnsupportedOperationException("不支持等待嘗試獲取鎖!");}@Overridepublic void unlock() {if(ownerThread!=Thread.currentThread()) {throw new RuntimeException("試圖釋放無所有權(quán)的鎖!");}Jedis jedis = null;try {jedis = jedisPool.getResource();Long result = (Long)jedis.eval(RELEASE_LOCK_LUA,Arrays.asList(RS_DISTLOCK_NS+lockName),Arrays.asList(lockerId.get()));System.out.println(result);if(result.longValue()!=0L){System.out.println("Redis上的鎖已釋放!");}else{System.out.println("Redis上的鎖釋放失敗!");}} catch (Exception e) {throw new RuntimeException("釋放鎖失敗!",e);} finally {if(jedis!=null) jedis.close();lockerId.remove();setOwnerThread(null);}}@Overridepublic Condition newCondition() {throw new UnsupportedOperationException("不支持等待通知操作!");}/*看門狗線程*/private Thread expireThread;//通過delayDog 避免無謂的輪詢,減少看門狗線程的輪序次數(shù) 阻塞延遲隊列 刷1 沒有刷2private static DelayQueue<ItemVo<LockItem>> delayDog = new DelayQueue<>();//續(xù)鎖邏輯:判斷是持有鎖的線程才能續(xù)鎖private final static String DELAY_LOCK_LUA ="if redis.call('get',KEYS[1])==ARGV[1] then\n" +" return redis.call('pexpire', KEYS[1],ARGV[2])\n" +" else return 0 end";private class ExpireTask implements Runnable{@Overridepublic void run() {System.out.println("看門狗線程已啟動......");while(!Thread.currentThread().isInterrupted()) {try {LockItem lockItem = delayDog.take().getData();//只有元素快到期了才能take到 0.9sJedis jedis = null;try {jedis = jedisPool.getResource();Long result = (Long)jedis.eval(DELAY_LOCK_LUA,Arrays.asList(RS_DISTLOCK_NS+lockItem.getKey ()),Arrays.asList(lockItem.getValue(),LOCK_TIME_STR));if(result.longValue()==0L){System.out.println("Redis上的鎖已釋放,無需續(xù)期!");}else{delayDog.add(new ItemVo<>((int)LOCK_TIME,new LockItem(lockItem.getKey(),lockItem.getValue())));System.out.println("Redis上的鎖已續(xù)期:"+LOCK_TIME);}} catch (Exception e) {throw new RuntimeException("鎖續(xù)期失敗!",e);} finally {if(jedis!=null) jedis.close();}} catch (InterruptedException e) {System.out.println("看門狗線程被中斷");break;}}System.out.println("看門狗線程準備關(guān)閉......");}}// @PostConstruct
// public void initExpireThread(){
//
// }@PreDestroypublic void closeExpireThread(){if(null!=expireThread){expireThread.interrupt();}}
}
針對3.1小節(jié)中的分布式鎖而言,看門狗方案主要做了以下改進?:
1. 鎖的過期時間設(shè)計
private?final?static?int?LOCK_TIME?=?1*1000;?//?鎖的過期時間為1秒
- 相比RedisDistLock的5秒,這里故意設(shè)置較短的過期時間
- 通過看門狗機制來自動續(xù)期,避免業(yè)務(wù)執(zhí)行時間過長導致鎖過期
2. 看門狗機制的核心實現(xiàn)
// 看門狗相關(guān)的成員變量
private Thread expireThread;
private static DelayQueue<ItemVo<LockItem>> delayDog = new DelayQueue<>();// 續(xù)期的Lua腳本
private final static String DELAY_LOCK_LUA ="if redis.call('get',KEYS[1])==ARGV[1] then\n" +" return redis.call('pexpire', KEYS[1],ARGV[2])\n" +" else return 0 end";
- 使用DelayQueue來實現(xiàn)延遲任務(wù),避免無效的輪詢
- 通過Lua腳本保證續(xù)期操作的原子性
3. 加鎖時啟動看門狗
if?((ownerThread==null)?&&?"OK".equals(jedis.set(RS_DISTLOCK_NS+lockName,id,params)))?{lockerId.set(id);setOwnerThread(t);if(expireThread?==?null){??//?啟動看門狗線程expireThread?=?new?Thread(new?ExpireTask(),"expireThread");expireThread.setDaemon(true);expireThread.start();}//?添加續(xù)期任務(wù)到延遲隊列delayDog.add(new?ItemVo<>((int)LOCK_TIME,new?LockItem(lockName,id)));return?true;}
4. 自動續(xù)期的實現(xiàn)
public?void?run()?{while(!Thread.currentThread().isInterrupted())?{try?{LockItem?lockItem?=?delayDog.take().getData();??//?阻塞等待直到快過期//?執(zhí)行續(xù)期Long?result?=?(Long)jedis.eval(DELAY_LOCK_LUA,Arrays.asList(RS_DISTLOCK_NS+lockItem.getKey()),Arrays.asList(lockItem.getValue(),LOCK_TIME_STR));if(result.longValue()!=0L){//?續(xù)期成功,繼續(xù)添加下一次續(xù)期任務(wù)delayDog.add(new?ItemVo<>((int)LOCK_TIME,new?LockItem(lockItem.getKey(),lockItem.getValue())));}}?catch?(InterruptedException?e)?{break;}}}
5. 優(yōu)化的續(xù)期時機
public?ItemVo(long?expirationTime,?T?data)?{//?提前100ms進行續(xù)期this.activeTime?=?expirationTime+System.currentTimeMillis()-100;this.data?=?data;}
主要優(yōu)勢在于:
- 解決了長時間業(yè)務(wù)導致鎖過期的問題
- 使用DelayQueue避免了輪詢帶來的性能開銷
- 自動續(xù)期機制更加可靠
- 優(yōu)雅關(guān)閉機制(通過@PreDestroy注解)
這種實現(xiàn)類似于Redis官方客戶端Redisson的實現(xiàn)原理,更適合實際生產(chǎn)環(huán)境使用。
4. 如何使用Redission分布式鎖
4.1. Redission簡介
Redisson 是一個基于 Redis 的高性能工具庫,它簡化了 Redis 的使用,并提供了豐富的分布式工具支持,如分布式鎖、分布式集合、隊列等。在分布式鎖場景下,Redisson 封裝了鎖的創(chuàng)建、續(xù)期和釋放等邏輯,并內(nèi)置了看門狗機制,大大提升了分布式鎖的可靠性和開發(fā)效率。
Redisson 的核心功能:
- 提供可重入鎖(ReentrantLock)和公平鎖(FairLock)。
- 內(nèi)置看門狗機制,自動續(xù)期防止鎖過期。
- 支持集群、哨兵模式和單節(jié)點模式。
- 與 Spring Boot 集成簡單,支持注解形式使用。
4.2. SpringBoot使用Redission分布式鎖
1. 新增 Maven 依賴
在 Spring Boot 項目的 pom.xml
文件中,添加 Redisson 的 Maven 依賴:
<dependency><groupId>org.redisson</groupId><artifactId>redisson-spring-boot-starter</artifactId><version>3.23.5</version> <!-- 使用最新版本 -->
</dependency>
此依賴可以幫助我們輕松將 Redisson 集成到 Spring Boot 項目中。
2. 配置類編寫
為 Redisson 創(chuàng)建一個配置類,用于初始化 Redis 連接。以下是一個基于單機模式的示例:
import org.redisson.Redisson;
import org.redisson.api.RedissonClient;
import org.redisson.config.Config;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;@Configuration
public class RedissonConfig {@Beanpublic RedissonClient redissonClient() {Config config = new Config();// 配置單節(jié)點 Redis 地址config.useSingleServer().setAddress("redis://127.0.0.1:6379").setPassword(null) // 如果有密碼則填寫.setConnectionMinimumIdleSize(10).setConnectionPoolSize(64);return Redisson.create(config);}
}
如果你的 Redis 部署為集群模式或哨兵模式,可以使用以下方法:
- 集群模式:
config.useClusterServers().addNodeAddress(...)
- 哨兵模式:
config.useSentinelServers().addSentinelAddress(...)
3. 使用 Redisson 分布式鎖
下面通過一個具體的業(yè)務(wù)場景(如庫存扣減)來演示如何使用 Redisson 分布式鎖。
示例代碼:
import org.redisson.api.RLock;
import org.redisson.api.RedissonClient;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;import java.util.concurrent.TimeUnit;@Service
public class StockService {@Autowiredprivate RedissonClient redissonClient;public void deductStock(String productId) {// 獲取鎖實例RLock lock = redissonClient.getLock("lock:stock:" + productId);try {// 加鎖,等待時間 5 秒,鎖超時時間 10 秒if (lock.tryLock(5, 10, TimeUnit.SECONDS)) {try {// 模擬業(yè)務(wù)邏輯:扣減庫存System.out.println("扣減庫存邏輯執(zhí)行中...");Thread.sleep(8000); // 模擬耗時操作System.out.println("庫存扣減成功!");} finally {lock.unlock(); // 釋放鎖System.out.println("鎖釋放成功!");}} else {System.out.println("獲取鎖失敗,可能有其他線程在執(zhí)行!");}} catch (InterruptedException e) {e.printStackTrace();}}
}
上述代碼使用 Redisson 實現(xiàn)分布式鎖,通過 tryLock
嘗試獲取商品的鎖,設(shè)置等待時間為 5 秒、鎖超時時間為 10 秒。獲取鎖后,在鎖保護下模擬扣減庫存的操作,完成后釋放鎖。如果未能獲取鎖,則提示可能有其他線程在執(zhí)行。?
5. 結(jié)語
本文從分布式鎖的核心原理入手,結(jié)合手寫實現(xiàn)的 Redis 分布式鎖與看門狗機制,深入剖析了解決鎖過期問題的設(shè)計思路。同時,我們還介紹了 Redisson 的使用方法,通過它的封裝與內(nèi)置的看門狗機制,可以更高效地實現(xiàn)分布式鎖,減少開發(fā)成本,提升可靠性。分布式鎖的實現(xiàn)既是分布式系統(tǒng)中的基礎(chǔ)問題,也是解決高并發(fā)和數(shù)據(jù)一致性挑戰(zhàn)的重要工具。希望本文的講解能為你在實際開發(fā)中提供幫助!