聚合廣告聯(lián)盟公司網(wǎng)站優(yōu)化
什么是阻塞隊列
阻塞隊里是在普通的隊列(先進先出隊列)基礎(chǔ)上,做出了擴充
- 線程安全
- 標(biāo)準(zhǔn)庫中原有的隊列 Queue 和其子類,默認(rèn)都是線程不安全的
- 具有阻塞特性
- 如果隊列為空,進行出隊列操作,此時就會出現(xiàn)阻塞。一直阻塞到其他線程往隊列里添加元素為止
- 如果隊列滿了,進行入隊列操作,此時就會出現(xiàn)阻塞。一直阻塞到其他線程從隊列里取走元素為止
基于阻塞隊列,最大的應(yīng)用場景,就是實現(xiàn)“生產(chǎn)者消費者模型”(日常開發(fā)中,常見的編程手法)
生產(chǎn)者消費者模型
比如:
小豬佩奇一家準(zhǔn)備包餃子,成員有佩奇,豬爸爸和豬媽媽,外加一個桌子
- 佩奇負(fù)責(zé)搟面皮
- 豬爸爸和豬媽媽負(fù)責(zé)包餃子
- 桌子用來放你搟好的面皮
每次佩奇搟好一個面皮后,就放在桌子上,豬爸爸和豬媽媽就用這個面皮包出一個餃子
此時:
- 佩奇就是面皮的生產(chǎn)者——生產(chǎn)者
- 豬爸爸和豬媽媽就是面皮的消費者——消費者
- 桌子就是阻塞隊列——阻塞隊列
為什么是是阻塞隊列而不是普通隊列?
因為阻塞隊列可以很好的協(xié)調(diào)生產(chǎn)者和消費者
- 若佩奇搟面皮很快,不一會桌子上就滿了
- 阻塞隊列:佩奇就休息一下,等面皮被消耗一些之后繼續(xù)再搟
- 普通隊列:不會停,放不下了也一直搟
- 若豬爸爸和豬媽媽包的很快,不一會桌子上就空了
- 阻塞隊列:豬爸爸和豬媽媽休息一下,等到面皮搟出來之后再包
- 普通隊列:不會停,沒面皮了也一直包
好處
上述生產(chǎn)者消費者模型在后端開發(fā)中,經(jīng)常會涉及到
當(dāng)下后端開發(fā),常見的結(jié)構(gòu)——“分布式系統(tǒng)”,不是一臺服務(wù)器解決所有問題,而是分成了多個服務(wù)器,服務(wù)器之間相互調(diào)用
主要有兩方面的好處
1. 服務(wù)器之間解耦合
我們希望見到“低耦合”
- 模塊之間的關(guān)聯(lián)程度/影響程度
通常談到的“阻塞隊列”是代碼中的一個數(shù)據(jù)結(jié)構(gòu)
但是由于這個東西太好用了,以至于會把這樣的數(shù)據(jù)結(jié)構(gòu)單獨封裝成一個服務(wù)器程序,并且在單獨的服務(wù)器機器上進行部署
此時,這樣的餓阻塞隊列有了一個新的名字,“消息隊列”(Message Queue,MQ)
如果是直接調(diào)用:
- 編寫 A 和 B 代碼中,會出現(xiàn)很多對方服務(wù)器相關(guān)的代碼
- 并且,此時如果 B 服務(wù)器掛了,A 可能也會直接受到影響
- 再并且,如果后續(xù)想加入一個 C 服務(wù)器,此時對 A 的改動就很大
如果是通過阻塞隊列:
- A 之和隊列通信
- B 也只和隊列通信
- A 和 B 互相不知道對方的存在,代碼中就更沒有對方的影子
看起來,A 和 B 之間是解耦合了,但是 A 和隊列,B 和隊列之間,不是引入了新的耦合嗎?- 耦合的代碼,在后續(xù)的變更工程中,比較復(fù)雜,容易產(chǎn)生 bug
- 但消息隊列是成熟穩(wěn)定的產(chǎn)品,代碼是穩(wěn)定的,不會頻繁更改。A、B 和隊列之間的耦合,對我們的影響微乎其微
- 再增加 C 服務(wù)器也很方便,也不會影響到原有的 A 和 B 服務(wù)器
2. “削峰填谷”的效果
通過中間的阻塞隊列,可以起到削峰填谷的效果,在遇到請求量激增突發(fā)的情況下,可以有效保護下游服務(wù)器,不會被請求沖垮
阻塞隊列的作用就相當(dāng)與三峽大壩在三峽的防汛作用
- A 向隊列中寫入數(shù)據(jù)變快了,但是 B 仍然可以按照原有的速度來消費數(shù)據(jù)
- 阻塞隊列扛下了這樣的壓力,就像三峽大壩抗住上游的大量水量的壓力
- 如果是直接調(diào)用,A 收到多少請求,B 也收到多少,那很可能直接就把 B 給搞掛了
- 當(dāng) A 不再寫入數(shù)據(jù)的時候,但隊列中還存有數(shù)據(jù),可以繼續(xù)工給 B
問題
- 為啥一個服務(wù)器,收到的請求變多,就容易掛?
- 一臺服務(wù)器,就是一臺“電腦”,上面就提供了一些硬件資源(包括但不限于 CPU,內(nèi)存,硬盤,網(wǎng)絡(luò)帶寬…)
- 就算你這個及其配置再好,硬件資源也是有限的
- 服務(wù)器每次收到一個請求,處理這個請求的過程,就都需要執(zhí)行一系列的代碼,在執(zhí)行這些代碼的過程中,就需要消耗一定的硬件資源(CPU,內(nèi)存,硬盤,網(wǎng)絡(luò)帶寬…)
- 這些請求小號的總的硬件資源的量,超過了及其能提供的上限,那么此時機器就會出現(xiàn)(卡死,程序直接崩潰等…)
- 在請求激增的時候,A 為啥不會掛?隊列為啥不會掛?反而是 B 更容易掛呢?
- A 的角色是一個“網(wǎng)關(guān)服務(wù)器”,收到客戶端的請求,再把請求轉(zhuǎn)發(fā)給其他的服務(wù)器
- 這樣的服務(wù)器里的代碼,做的工作比較簡單(單純的數(shù)據(jù)轉(zhuǎn)發(fā)),消耗的硬件資源通常更少
- 處理一個請求,消耗的資源更少,同樣的配置下,就能支持更多的請求處理
- 同理,隊列其實也是比較簡單的程序,單位請求消耗的硬件資源,也是比較少見的
- B 這個服務(wù)器,是真正干活的服務(wù)器,要真正完成一系列的業(yè)務(wù)邏輯
- 這一系列的工作,代碼量非常龐大,消耗的時間很多,消耗的系統(tǒng)硬件資源,也是更多的
類似的,像 MySQL 這樣的數(shù)據(jù)庫,處理每個請求的時候,做的工作就是比較多的,消耗的硬件資源也是比較多的,因此 MySQL 也是后端系統(tǒng)中,容易掛的部分
對應(yīng)的,像 Redis 這種內(nèi)存數(shù)據(jù)庫,處理請求,做的工作遠遠少于 MySQL,消耗的資源更少,Redis 就比 MySQL 硬朗很多,不容易掛
代價
- 需要更多的機器來部署這樣的消息隊列(小代價)
- A 和 B 之間的通信延遲會變長
- 對于 A 和 B 之間的調(diào)用,要求響應(yīng)時間比較短就不太適合了
每個技術(shù)都有優(yōu)缺點,不能無腦吹,也不能無腦黑
比如:微服務(wù)
- 本質(zhì)上就是把分布式系統(tǒng)服務(wù)拆的更細(xì)了,每個服務(wù)都很小,只做一項功能
- 非常適合大公司,部門分的很細(xì)
- 但需要更多的機器,處理請求需要更多的響應(yīng)時間,更復(fù)雜的后端結(jié)構(gòu),運維成本水漲船高
Java 自帶的阻塞隊列
阻塞隊列在 Java 標(biāo)準(zhǔn)庫中也提供了現(xiàn)成的封裝——BlockingQueue
BlockingQueue
本質(zhì)上是一個接口,不能直接new
,只能new
一個類- 因為是繼承與
Queue
,所以Queue
的一些操作,offer
、poll
這些,在BlockingQueue
中同樣可以使用(不過不建議使用,因為都不能阻塞)BlockingQueue
提供了另外兩個專屬方法,都能阻塞
put
——入列take
——出隊列
BlockingQueue<String> queue = new ArrayBlockingQueue<>(1000);
capacity 指的是容量,是一個需要加上的參數(shù)
public class Demo10 { public static void main(String[] args) throws InterruptedException { BlockingQueue<String> queue = new ArrayBlockingQueue<>(3); queue.put("111"); System.out.println("put成功"); queue.put("111"); System.out.println("put成功"); }
}
//運行結(jié)果
put成功
put成功
put成功
- 只打印了三個,說明第四次 put 的時候容量不夠,阻塞了
public class Demo10 { public static void main(String[] args) throws InterruptedException { BlockingQueue<String> queue = new ArrayBlockingQueue<>(3); queue.put("111"); System.out.println("put 成功"); queue.put("111"); System.out.println("put 成功"); queue.take(); System.out.println("take 成功"); queue.take(); System.out.println("take 成功"); queue.take(); System.out.println("take 成功"); }
}
//運行結(jié)果
put 成功
put 成功
take 成功
take 成功
- 由于只有
put
了兩次,所以也只有兩次take
,隨后阻塞住了
public class Demo11 { public static void main(String[] args) { BlockingQueue<Integer> queue = new ArrayBlockingQueue<>(1000); Thread t1 = new Thread(() -> { int i = 1; while(true){ try { queue.put(i); System.out.println("生產(chǎn)者元素"+i); i++; Thread.sleep(1000); } catch (InterruptedException e) { throw new RuntimeException(e); } } }); Thread t2 = new Thread(() -> { while(true) { try { Integer i = queue.take(); System.out.println("消費者元素"+i); } catch (InterruptedException e) { throw new RuntimeException(e); } } }); t1.start(); t2.start(); }
}
- 上述程序中,一個線程生產(chǎn),一個線程消費
- 實際開發(fā)中,通??赡苁嵌鄠€線程生產(chǎn),多個線程消費
自己實現(xiàn)一個阻塞隊列
普通隊列
基于數(shù)組的隊列
實現(xiàn)一個基礎(chǔ)的隊列
//此處不考慮泛型參數(shù),只是基于 String 進行存儲
class MyBlockingQueue { private String[] data = null; private int head = 0; private int tail = 0; private int size = 0; public MyBlockingQueue(int capacity) { data = new String[capacity]; } public void put(String s) { if(size == data.length) { //隊列滿了 return; } data[tail] = s; tail++; if(tail >= data.length){ tail = 0; } size++; } public String take() { if(size == 0) { //隊列為空 return null; } String ret = data[head]; head++; if(head >= data.length){ head = 0; } size--; return ret; }
}
阻塞隊列
- 隊列為空,
take
就要阻塞,在其他線程put
的時候喚醒 - 隊列未滿,
put
就要阻塞,在其他線程take
的時候喚醒
//此處不考慮泛型參數(shù),只是基于 String 進行存儲
class MyBlockingQueue { private String[] data = null; private int head = 0; private int tail = 0; private int size = 0; private Object locker = new Object(); public MyBlockingQueue(int capacity) { data = new String[capacity]; } public void put(String s) throws InterruptedException { //加鎖的對象,可以單獨定義一個,也可以直接就地使用this synchronized (locker) { if (size == data.length) { //隊列滿了,需要阻塞 //return; locker.wait(); } data[tail] = s; tail++; if (tail >= data.length) { tail = 0; } size++; //喚醒 take 的阻塞 locker.notify(); } } public String take() throws InterruptedException { String ret = ""; synchronized (locker) { if (size == 0) { //隊列為空,需要阻塞 //return null; locker.wait(); } ret = data[head]; head++; if (head >= data.length) { head = 0; } size--; //喚醒 put 的阻塞 locker.notify(); } return ret; }
}