做曖曖視頻網(wǎng)站安全嗎it培訓(xùn)機構(gòu)哪個好一點
- 生產(chǎn)者和消費者概念
- 基于BlockingQueue的生產(chǎn)者消費者模型
- 全部代碼
生產(chǎn)者和消費者概念
生產(chǎn)者消費者模式就是通過一個容器來解決生產(chǎn)者和消費者的強耦合問題。
生產(chǎn)者和消費者彼此之間不直接通訊,而通過這個容器來通訊,所以生產(chǎn)者生產(chǎn)完數(shù)據(jù)之后不用等待消費者處理,直接將生產(chǎn)的數(shù)據(jù)放到這個容器當(dāng)中,消費者也不用找生產(chǎn)者要數(shù)據(jù),而是直接從這個容器里取數(shù)據(jù),這個容器就相當(dāng)于一個緩沖區(qū),平衡了生產(chǎn)者和消費者的處理能力,這個容器實際上就是用來給生產(chǎn)者和消費者解耦的。
生產(chǎn)者消費者模型的特點
生產(chǎn)者消費者模型是多線程同步與互斥的一個經(jīng)典場景,其特點如下:
- 三種關(guān)系: 生產(chǎn)者和生產(chǎn)者(互斥關(guān)系)、消費者和消費者(互斥關(guān)系)、生產(chǎn)者和消費者(互斥關(guān)系、同步關(guān)系)。
- 兩種角色: 生產(chǎn)者和消費者。(通常由進程或線程承擔(dān))
- 一個交易場所: 通常指的是內(nèi)存中的一段緩沖區(qū)。(可以自己通過某種方式組織起來)
生產(chǎn)者和生產(chǎn)者、消費者和消費者、生產(chǎn)者和消費者,它們之間為什么會存在互斥關(guān)系?
因為在生產(chǎn)者和消費者之間存在多種執(zhí)行流同時訪問的問題,,因此我們需要將他們同時訪問的臨界區(qū)進行加互斥保護起來
其中,所有的生產(chǎn)者和消費者都會競爭式的申請鎖,因此生產(chǎn)者和生產(chǎn)者、消費者和消費者、生產(chǎn)者和消費者之間都存在互斥關(guān)系。
生產(chǎn)者和消費者之間為什么會存在同步關(guān)系?
- 如果讓生產(chǎn)者一直生產(chǎn),那么當(dāng)生產(chǎn)者生產(chǎn)的數(shù)據(jù)將容器塞滿后,生產(chǎn)者再生產(chǎn)數(shù)據(jù)就會生產(chǎn)失敗。
- 反之,讓消費者一直消費,那么當(dāng)容器當(dāng)中的數(shù)據(jù)被消費完后,消費者再進行消費就會消費失敗。
雖然這樣不會造成任何數(shù)據(jù)不一致的問題,但是這樣會引起另一方的饑餓問題,是非常低效的。我們應(yīng)該讓生產(chǎn)者和消費者訪問該容器時具有一定的順序性,比如讓生產(chǎn)者先生產(chǎn),然后再讓消費者進行消費。
- 注意: 互斥關(guān)系保證的是數(shù)據(jù)的正確性,而同步關(guān)系是為了讓多線程之間協(xié)同起來。
基于BlockingQueue的生產(chǎn)者消費者模型
當(dāng)多個生產(chǎn)者,消費者同時出現(xiàn)進行搶占線程時,我們可以使用BlockingQueue來進行緩沖,如圖
其與普通的隊列的區(qū)別在于:
- 當(dāng)隊列為空時,從隊列獲取元素的操作將會被阻塞,直到隊列中放入了元素。
- 當(dāng)隊列滿時,往隊列里存放元素的操作會被阻塞,直到有元素從隊列中取出。
知識聯(lián)系: 看到以上阻塞隊列的描述,我們很容易想到的就是管道,而阻塞隊列最典型的應(yīng)用場景實際上就是管道的實現(xiàn)。
put為生產(chǎn)者,take為消費者
全部代碼
task.hpp 用于實現(xiàn)打印和計算
#pragma once
#include <iostream>
#include <string>class Task
{
public:Task(){}Task(int x,int y,char op):_x(x),_y(y),_op(op),_result(0),_exitCode(0){}void operator()(){switch(_op){case '+' :_result = _x + _y;break;case '-':_result = _x - _y;break;case '*':_result = _x * _y;break;case '/':{if (_y == 0)_exitCode = -1;else_result = _x / _y;}break;case '%':{if (_y == 0)_exitCode = -2;else_result = _x % _y;}break;default:break;}}std::string formatArg(){return std::to_string(_x) + _op + std::to_string(_y) + "="; }std::string formatRes(){return std::to_string(_result) + "(" + std::to_string(_exitCode) + ")";}~Task(){}
private:int _x;int _y;char _op;//輸入的符號int _result;int _exitCode;
};
blockQueue.hpp 維護線程之間的同步
#pragma once
#include <iostream>
#include <queue>
#include <pthread.h>const int gcap = 5;//最大容量
template<class T>
class BlockQueue
{
public:BlockQueue(const int cap = gcap):_cap(gcap){pthread_mutex_init(&_mutex,nullptr);//初始化互斥量//初始化用戶和生產(chǎn)者的條件變量pthread_cond_init(&_consumerCond,nullptr);pthread_cond_init(&_consumerCond,nullptr);}//判斷是否為慢bool isFull(){return _q.size() == _cap;}//判斷是否為空bool isEmpty(){return _q.empty();}//插入void push(const T &in){pthread_mutex_lock(&_mutex);//細(xì)節(jié)1:一定要保證,在任何時候,都要符合條件,才進行生產(chǎn)while(isFull()){//1 我們只能在臨界區(qū)內(nèi)部,判斷臨界區(qū)資源是否就緒!注定了我們在當(dāng)前一定持有鎖。//2 要讓線程進行休眠等待,不能持有鎖等待//3 注定了pthread_cond_wait要有鎖的釋放能力pthread_cond_wait(&_productCond,&_mutex);// 4. 當(dāng)線程醒來的時候,注定了繼續(xù)從臨界區(qū)內(nèi)部繼續(xù)運行!因為我是在臨界區(qū)被切走的!// 5. 注定了當(dāng)線程被喚醒的時候,繼續(xù)在pthread_cond_wait函數(shù)出向后運行,又要重新申請鎖,申請成功才會徹底返回}// 沒有滿,就要讓他繼續(xù)運行_q.push(in);//加策略pthread_cond_signal(&_consumerCond);pthread_mutex_unlock(&_mutex);}//取出刪除void pop(T* out){pthread_mutex_lock(&_mutex);while(isEmpty()) {pthread_cond_wait(&_consumerCond, &_mutex);}*out = _q.front();_q.pop();// 加策略pthread_cond_signal(&_productCond);pthread_mutex_unlock(&_mutex);}~BlockQueue(){pthread_mutex_destroy(&_mutex);pthread_cond_destroy(&_consumerCond);pthread_cond_destroy(&_productCond);}
private:std::queue<T> _q;int _cap;//生產(chǎn)容量//為什么我們這份代碼只有一個鎖,根本原因在于//我們生產(chǎn)者和消費者訪問的是同一個queue && queue 被當(dāng)作整體使用pthread_mutex_t _mutex;pthread_cond_t _consumerCond;//消費者對應(yīng)的條件變量pthread_cond_t _productCond;//生產(chǎn)者對應(yīng)的條件變量
};
main.cc
#include <iostream>
#include "task.hpp"
#include "blockQueue.hpp"
#include <pthread.h>
#include <ctime>
#include <unistd.h>void* consumer(void* args)
{BlockQueue<Task>* bq = static_cast<BlockQueue<Task>*>(args);while(1){Task t;// 1. 將數(shù)據(jù)從blockqueue中獲取 -- 獲取到了數(shù)據(jù)bq->pop(&t);t();// 2. 結(jié)合某種業(yè)務(wù)邏輯,處理數(shù)據(jù)! -- TODOstd::cout << pthread_self() << " | consumer data: " << t.formatArg() << t.formatRes() << std::endl;}
}void *productor(void *args)
{BlockQueue<Task> *bq = static_cast<BlockQueue<Task> *>(args);std::string opers = "+-*/%";while(1){int x = rand()%20 +1;int y = rand()%20 +1;char op = opers[rand() % opers.size()];Task t(x,y,op);bq->push(t);std::cout << pthread_self() << " | productor Task: " << t.formatArg() << "?" << std::endl;}
}int main()
{BlockQueue<Task>*bq = new BlockQueue<Task>();pthread_t c[2], p[3];pthread_create(&c[0], nullptr, consumer, bq);pthread_create(&c[1], nullptr, consumer, bq);pthread_create(&p[0], nullptr, productor, bq);pthread_create(&p[1], nullptr, productor, bq);pthread_create(&p[2], nullptr, productor, bq);pthread_join(c[0], nullptr);pthread_join(c[1], nullptr);pthread_join(p[0], nullptr);pthread_join(p[1], nullptr);pthread_join(p[2], nullptr);delete bq;return 0;return 0;
}