網(wǎng)站密碼如何找回密碼百度企業(yè)官網(wǎng)
文章目錄
- 四、阻塞隊列
- 1、基礎概念
- 1.1 生產者消費者概念
- 1.2 JUC阻塞隊列的存取方法
- 2、ArrayBlockingQueue
- 2.1 ArrayBlockingQueue的基本使用
- 2.2 生產者方法實現(xiàn)原理
- 2.2.1 ArrayBlockingQueue的常見屬性
- 2.2.2 add方法實現(xiàn)
- 2.2.3 offer方法實現(xiàn)
- 2.2.4 offer(time,unit)方法
- 2.2.5 put方法
- 2.3 消費者方法實現(xiàn)原理
- 2.3.1 remove方法
- 2.4.2 poll方法
- 2.4.3 poll(time,unit)方法
- 2.4.4 take方法
- 2.4.5 虛假喚醒
個人主頁:道友老李
歡迎加入社區(qū):道友老李的學習社區(qū)
四、阻塞隊列
1、基礎概念
1.1 生產者消費者概念
生產者消費者是設計模式的一種。讓生產者和消費者基于一個容器來解決強耦合問題。
生產者 消費者彼此之間不會直接通訊的,而是通過一個容器(隊列)進行通訊。
所以生產者生產完數(shù)據(jù)后扔到容器中,不通用等待消費者來處理。
消費者不需要去找生產者要數(shù)據(jù),直接從容器中獲取即可。
而這種容器最常用的結構就是隊列。
1.2 JUC阻塞隊列的存取方法
常用的存取方法都是來自于JUC包下的BlockingQueue
生產者存儲方法
add(E) // 添加數(shù)據(jù)到隊列,如果隊列滿了,無法存儲,拋出異常
offer(E) // 添加數(shù)據(jù)到隊列,如果隊列滿了,返回false
offer(E,timeout,unit) // 添加數(shù)據(jù)到隊列,如果隊列滿了,阻塞timeout時間,如果阻塞一段時間,依然沒添加進入,返回false
put(E) // 添加數(shù)據(jù)到隊列,如果隊列滿了,掛起線程,等到隊列中有位置,再扔數(shù)據(jù)進去,死等!
消費者取數(shù)據(jù)方法
remove() // 從隊列中移除數(shù)據(jù),如果隊列為空,拋出異常
poll() // 從隊列中移除數(shù)據(jù),如果隊列為空,返回null,么的數(shù)據(jù)
poll(timeout,unit) // 從隊列中移除數(shù)據(jù),如果隊列為空,掛起線程timeout時間,等生產者扔數(shù)據(jù),再獲取
take() // 從隊列中移除數(shù)據(jù),如果隊列為空,線程掛起,一直等到生產者扔數(shù)據(jù),再獲取
2、ArrayBlockingQueue
2.1 ArrayBlockingQueue的基本使用
ArrayBlockingQueue在初始化的時候,必須指定當前隊列的長度。
因為ArrayBlockingQueue是基于數(shù)組實現(xiàn)的隊列結構,數(shù)組長度不可變,必須提前設置數(shù)組長度信息。
public static void main(String[] args) throws ExecutionException, InterruptedException, IOException {// 必須設置隊列的長度ArrayBlockingQueue queue = new ArrayBlockingQueue(4);// 生產者扔數(shù)據(jù)queue.add("1");queue.offer("2");queue.offer("3",2,TimeUnit.SECONDS);queue.put("2");// 消費者取數(shù)據(jù)System.out.println(queue.remove());System.out.println(queue.poll());System.out.println(queue.poll(2,TimeUnit.SECONDS));System.out.println(queue.take());
}
2.2 生產者方法實現(xiàn)原理
生產者添加數(shù)據(jù)到隊列的方法比較多,需要一個一個查看
2.2.1 ArrayBlockingQueue的常見屬性
ArrayBlockingQueue中的成員變量
lock = 就是一個ReentrantLock
count = 就是當前數(shù)組中元素的個數(shù)
iterms = 就是數(shù)組本身
# 基于putIndex和takeIndex將數(shù)組結構實現(xiàn)為了隊列結構
putIndex = 存儲數(shù)據(jù)時的下標
takeIndex = 去數(shù)據(jù)時的下標
notEmpty = 消費者掛起線程和喚醒線程用到的Condition(看成sync的wait和notify)
notFull = 生產者掛起線程和喚醒線程用到的Condition(看成sync的wait和notify)
2.2.2 add方法實現(xiàn)
add方法本身就是調用了offer方法,如果offer方法返回false,直接拋出異常
public boolean add(E e) {if (offer(e))return true;else// 拋出的異常throw new IllegalStateException("Queue full");
}
2.2.3 offer方法實現(xiàn)
public boolean offer(E e) {// 要求存儲的數(shù)據(jù)不允許為null,為null就拋出空指針checkNotNull(e);// 當前阻塞隊列的lock鎖final ReentrantLock lock = this.lock;// 為了保證線程安全,加鎖lock.lock();try {// 如果隊列中的元素已經(jīng)存滿了,if (count == items.length)// 返回falsereturn false;else {// 隊列沒滿,執(zhí)行enqueue將元素添加到隊列中enqueue(e);// 返回truereturn true;}} finally {// 操作完釋放鎖lock.unlock();}
}//==========================================================
private void enqueue(E x) {// 拿到數(shù)組的引用final Object[] items = this.items;// 將元素放到指定位置items[putIndex] = x;// 對inputIndex進行++操作,并且判斷是否已經(jīng)等于數(shù)組長度,需要歸位if (++putIndex == items.length)// 將索引設置為0putIndex = 0;// 元素添加成功,進行++操作。count++;// 將一個Condition中阻塞的線程喚醒。notEmpty.signal();
}
2.2.4 offer(time,unit)方法
生產者在添加數(shù)據(jù)時,如果隊列已經(jīng)滿了,阻塞一會。
- 阻塞到消費者消費了消息,然后喚醒當前阻塞線程
- 阻塞到了time時間,再次判斷是否可以添加,不能,直接告辭。
// 如果線程在掛起的時候,如果對當前阻塞線程的中斷標記位進行設置,此時會拋出異常直接結束
public boolean offer(E e, long timeout, TimeUnit unit) throws InterruptedException {// 非空檢驗checkNotNull(e);// 將時間單位轉換為納秒long nanos = unit.toNanos(timeout);// 加鎖final ReentrantLock lock = this.lock;// 允許線程中斷并排除異常的加鎖方式lock.lockInterruptibly();try {// 為什么是while(虛假喚醒)// 如果元素個數(shù)和數(shù)組長度一致,隊列慢了while (count == items.length) {// 判斷等待的時間是否還充裕if (nanos <= 0)// 不充裕,直接添加失敗return false;// 掛起等待,會同時釋放鎖資源(對標sync的wait方法)// awaitNanos會掛起線程,并且返回剩余的阻塞時間// 恢復執(zhí)行時,需要重新獲取鎖資源nanos = notFull.awaitNanos(nanos);}// 說明隊列有空間了,enqueue將數(shù)據(jù)扔到阻塞隊列中enqueue(e);return true;} finally {// 釋放鎖資源lock.unlock();}
}
2.2.5 put方法
如果隊列是滿的, 就一直掛起,直到被喚醒,或者被中斷
public void put(E e) throws InterruptedException {checkNotNull(e);final ReentrantLock lock = this.lock;lock.lockInterruptibly();try {while (count == items.length)// await方法一直阻塞,直到被喚醒或者中斷標記位notFull.await();enqueue(e);} finally {lock.unlock();}
}
2.3 消費者方法實現(xiàn)原理
2.3.1 remove方法
// remove方法就是調用了poll
public E remove() {E x = poll();// 如果有數(shù)據(jù),直接返回if (x != null)return x;// 沒數(shù)據(jù)拋出異常elsethrow new NoSuchElementException();
}
2.4.2 poll方法
// 拉取數(shù)據(jù)
public E poll() {// 加鎖操作final ReentrantLock lock = this.lock;lock.lock();try {// 如果沒有數(shù)據(jù),直接返回null,如果有數(shù)據(jù),執(zhí)行dequeue,取出數(shù)據(jù)并返回return (count == 0) ? null : dequeue();} finally {lock.unlock();}
}//==========================================================
// 取出數(shù)據(jù)
private E dequeue() {// 將成員變量引用到局部變量final Object[] items = this.items;// 直接獲取指定索引位置的數(shù)據(jù)E x = (E) items[takeIndex];// 將數(shù)組上指定索引位置設置為nullitems[takeIndex] = null;// 設置下次取數(shù)據(jù)時的索引位置if (++takeIndex == items.length)takeIndex = 0;// 對count進行--操作count--;// 迭代器內容,先跳過if (itrs != null)itrs.elementDequeued();// signal方法,會喚醒當前Condition中排隊的一個Node。// signalAll方法,會將Condition中所有的Node,全都喚醒notFull.signal();// 返回數(shù)據(jù)。return x;
}
2.4.3 poll(time,unit)方法
public E poll(long timeout, TimeUnit unit) throws InterruptedException {// 轉換時間單位long nanos = unit.toNanos(timeout);// 競爭鎖final ReentrantLock lock = this.lock;lock.lockInterruptibly();try {// 如果沒有數(shù)據(jù)while (count == 0) {if (nanos <= 0)// 沒數(shù)據(jù),也無法阻塞了,返回nullreturn null;// 沒數(shù)據(jù),掛起消費者線程nanos = notEmpty.awaitNanos(nanos);}// 取數(shù)據(jù)return dequeue();} finally {lock.unlock();}
}
2.4.4 take方法
public E take() throws InterruptedException {final ReentrantLock lock = this.lock;lock.lockInterruptibly();try {// 虛假喚醒while (count == 0)notEmpty.await();return dequeue();} finally {lock.unlock();}
}
2.4.5 虛假喚醒
阻塞隊列中,如果需要線程掛起操作,判斷有無數(shù)據(jù)的位置采用的是while循環(huán) ,為什么不能換成if
肯定是不能換成if邏輯判斷
線程A,線程B,線程E,線程C。 其中ABE生產者,C屬于消費者
假如線程的隊列是滿的
// E,拿到鎖資源,還沒有走while判斷
while (count == items.length)// A醒了// B掛起notFull.await();
enqueue(e);
C此時消費一條數(shù)據(jù),執(zhí)行notFull.signal()喚醒一個線程,A線程被喚醒
E走判斷,發(fā)現(xiàn)有空余位置,可以添加數(shù)據(jù)到隊列,E添加數(shù)據(jù),走enqueue
如果判斷是if,A在E釋放鎖資源后,拿到鎖資源,直接走enqueue方法。
此時A線程就是在putIndex的位置,覆蓋掉之前的數(shù)據(jù),造成數(shù)據(jù)安全問題