在哪個(gè)網(wǎng)做免費(fèi)網(wǎng)站好百度客服中心人工在線電話
文章目錄
- 一、POSIX 信號(hào)量
- 二、POSIX 信號(hào)量的接口
- 2.1 sem_init——初始化信號(hào)量
- 2.2 sem_destroy——銷毀信號(hào)量
- 2.3 sem_wait——等待信號(hào)量
- 2.4 sem_post——發(fā)布信號(hào)量
- 三、基于環(huán)形隊(duì)列的生產(chǎn)消費(fèi)者模型
- 3.1 單生產(chǎn)單消費(fèi)模型
- 3.2 多生產(chǎn)多消費(fèi)模型
- 3.3 基于任務(wù)的多生產(chǎn)多消費(fèi)模型
- 四、結(jié)語
一、POSIX 信號(hào)量
共享資源也可以被看成多份,只要規(guī)定好每個(gè)線程的訪問區(qū)域即可,此時(shí)就可以讓多線程去并發(fā)的訪問臨界資源。
POSIX
信號(hào)量和 SystemV
信號(hào)量作用相同,都是用于同步操作,達(dá)到無沖突的訪問共享資源的目的。但 POSIX
可以用于線程間同步。信號(hào)量本質(zhì)是一把計(jì)數(shù)器,用來描述可用資源數(shù)目的,申請信號(hào)量時(shí),其實(shí)就已經(jīng)在間接的做判斷,看資源是否就緒了,只要申請到信號(hào)量,那么說明資源一定是就緒的。
信號(hào)量只能保證,不讓多余的線程來訪問共享資源,即,當(dāng)前共享資源有十份,信號(hào)量不會(huì)允許同時(shí)有十一個(gè)線程來訪問臨界資源。但是具體的資源分配是通過程序員編碼去實(shí)現(xiàn)的。如果出現(xiàn)一個(gè)共享資源同時(shí)被兩個(gè)線程訪問,就屬于程序員的編碼 Bug。
二、POSIX 信號(hào)量的接口
2.1 sem_init——初始化信號(hào)量
#include <semaphore.h>
int sem_init(sem_t *sem, int pshared, unsigned int value);
-
sem
:要初始化的信號(hào)量 -
pshared
:0表示線程間共享,非0表示進(jìn)程間共享。 -
value
:信號(hào)量初始值
2.2 sem_destroy——銷毀信號(hào)量
int sem_destroy(sem_t *sem);
2.3 sem_wait——等待信號(hào)量
int sem_wait(sem_t *sem); //P()
- 功能:會(huì)將信號(hào)量的值減1
2.4 sem_post——發(fā)布信號(hào)量
int sem_post(sem_t *sem);//V()
- 功能:發(fā)布信號(hào)量,表示資源使用完畢,可以歸還資源了。將信號(hào)量的值加1
三、基于環(huán)形隊(duì)列的生產(chǎn)消費(fèi)者模型
只要生產(chǎn)和消費(fèi)不訪問同一個(gè)格子,那么生產(chǎn)和消費(fèi)就可以同時(shí)進(jìn)行。那生產(chǎn)和消費(fèi)什么時(shí)候會(huì)指向同一個(gè)數(shù)據(jù)呢?答案是隊(duì)列為空和為滿的時(shí)候。
基于環(huán)形隊(duì)列的生產(chǎn)消費(fèi)者模型必須遵守以下三個(gè)原則:
-
當(dāng)生產(chǎn)和消費(fèi)指向同一個(gè)資源的時(shí)候,只能一個(gè)人訪問。為空的時(shí)候,由生產(chǎn)者去訪問;為滿的時(shí)候,由消費(fèi)者去訪問
-
消費(fèi)者不能超過生產(chǎn)者
-
生產(chǎn)者不能把消費(fèi)者套圈,因?yàn)檫@樣會(huì)導(dǎo)致數(shù)據(jù)被覆蓋
生產(chǎn)者最關(guān)心還剩多少空間(空間數(shù)量);消費(fèi)者最關(guān)系還剩多少數(shù)據(jù)(數(shù)據(jù)數(shù)量)。因?yàn)橛袃煞N資源,所以需要定義兩個(gè)信號(hào)量。
3.1 單生產(chǎn)單消費(fèi)模型
// RingQueue.hpp
#pragma once#include <pthread.h>
#include <vector>
#include <semaphore.h>template<class T>
class RingQueue
{
private:static const int defaultcap = 5;void P(sem_t *sem) // 申請一個(gè)信號(hào)量{sem_wait(sem); }void V(sem_t *sem) // 歸還一個(gè)信號(hào)量{sem_post(sem);}
public:RingQueue(int cap = defaultcap):ringqueue_(cap), cap_(cap), c_step(0), p_step(0){sem_init(&cdata_sem, 0, 0);sem_init(&pspace_sem, 0, cap_);}void Push(const T &data) // 生產(chǎn)行為{P(&pspace_sem);ringqueue_[p_step] = data;V(&cdata_sem);p_step++;p_step %= cap_;}void Pop(T *out) // 消費(fèi)行為{P(&cdata_sem);*out = ringqueue_[c_step];V(&pspace_sem);c_step++;c_step %= cap_;}~RingQueue(){sem_destroy(&cdata_sem);sem_destroy(&pspace_sem);}
private:std::vector<T> ringqueue_; // 環(huán)形隊(duì)列int cap_; // 容量int c_step; // 消費(fèi)者下一個(gè)要消費(fèi)的位置int p_step; // 生產(chǎn)者下一個(gè)要生產(chǎn)的位置sem_t cdata_sem; // 數(shù)據(jù)資源sem_t pspace_sem; // 空間資源
};
// main.cc
#include "RingQueue.hpp"
#include <iostream>
#include <unistd.h>using namespace std;void *Consumer(void *args)
{RingQueue<int> *rq = static_cast<RingQueue<int>*>(args);while(true){int data = 0;rq->Pop(&data);cout << "Consumer is running... get a data: " << data << endl;// 模擬處理數(shù)據(jù)usleep(1000000);}
}void *Productor(void *args)
{RingQueue<int> *rq = static_cast<RingQueue<int>*>(args);while(true){// 獲取數(shù)據(jù)usleep(10000); // 模擬獲取數(shù)據(jù)int data = rand() % 10;rq->Push(data);cout << "Productor is running... produce a data: " << data << endl;}
}int main()
{srand((unsigned int)time(nullptr));pthread_t c, p;RingQueue<int> *rq = new RingQueue<int>();pthread_create(&c, nullptr, Consumer, rq);pthread_create(&p, nullptr, Productor, rq);pthread_join(c, nullptr);pthread_join(p, nullptr);return 0;
}
互斥與同步的體現(xiàn):當(dāng)生產(chǎn)下標(biāo)和消費(fèi)下標(biāo)相同的時(shí)候,只允許一個(gè)來訪問,這就是互斥性的體現(xiàn)。當(dāng)隊(duì)列為空的時(shí)候,讓生產(chǎn)者去訪問資源,當(dāng)隊(duì)列為滿的時(shí)候,讓消費(fèi)者去訪問資源,這就是在指向同一個(gè)位置時(shí),讓生產(chǎn)和消費(fèi)具有一定的順序性,這就是同步性的體現(xiàn)。當(dāng)隊(duì)列不為空或不為滿的時(shí)候,生產(chǎn)下標(biāo)和消費(fèi)下標(biāo)不同,此時(shí)兩個(gè)線程并發(fā)執(zhí)行,并沒有體現(xiàn)出很強(qiáng)的互斥特性。
3.2 多生產(chǎn)多消費(fèi)模型
此時(shí)需要對下標(biāo)資源進(jìn)行保護(hù)。因?yàn)樯a(chǎn)下標(biāo)和消費(fèi)下標(biāo)各自只有一份,不允許同時(shí)有多個(gè)生產(chǎn)線程去訪問生產(chǎn)下標(biāo),消費(fèi)線程也一樣。因此需要通過加鎖來實(shí)現(xiàn)生產(chǎn)線程之間的互斥和消費(fèi)線程之間的互斥。
先加鎖還是先申請信號(hào)量?答案是先申請信號(hào)量,以生產(chǎn)線程為例,這樣可以讓所有生產(chǎn)線程并發(fā)的去執(zhí)行,什么意思呢?如果是先加鎖再申請信號(hào)量的話,因?yàn)槭冀K只有一個(gè)生產(chǎn)者線程能夠申請到鎖,所以也就只有一個(gè)生產(chǎn)者線程能去申請信號(hào)量,其他生產(chǎn)者線程只能干巴巴的等待鎖被釋放。這時(shí)申請鎖和申請信號(hào)量的動(dòng)作是串行的。而先申請信號(hào)量的話,可以保證雖然只有一個(gè)線程能夠申請到鎖,但是其他沒有鎖的線程也可以不用閑著,可以先去申請信號(hào)量,因?yàn)樾盘?hào)量的申請是原子的,因此也不需要加鎖進(jìn)行保護(hù),只要能申請到信號(hào)量,就說明資源還有,此時(shí)那些申請到信號(hào)量的線程就可能等待鎖被釋放,拿到鎖之后就可以去執(zhí)行相應(yīng)的代碼了。
// RingQueue.hpp
#pragma once#include <pthread.h>
#include <vector>
#include <semaphore.h>
#include <string>template<class T>
class RingQueue
{
private:static const int defaultcap = 5;void P(sem_t *sem) // 申請一個(gè)信號(hào)量{sem_wait(sem); }void V(sem_t *sem) // 歸還一個(gè)信號(hào)量{sem_post(sem);}void Lock(pthread_mutex_t *mutex){pthread_mutex_lock(mutex);}void Unlock(pthread_mutex_t *mutex){pthread_mutex_unlock(mutex);}
public:RingQueue(int cap = defaultcap):ringqueue_(cap), cap_(cap), c_step(0), p_step(0){sem_init(&cdata_sem, 0, 0);sem_init(&pspace_sem, 0, cap_);pthread_mutex_init(&c_mutex, nullptr);pthread_mutex_init(&p_mutex, nullptr);}void Push(const T &data) // 生產(chǎn)行為{P(&pspace_sem);Lock(&p_mutex);ringqueue_[p_step] = data;p_step++;p_step %= cap_;Unlock(&p_mutex);V(&cdata_sem);}void Pop(T *out) // 消費(fèi)行為{P(&cdata_sem); // 信號(hào)量資源是不需要保護(hù)的,因?yàn)樗牟僮魇窃拥?#xff0c;臨界區(qū)中的代碼要盡可能的少,所以不需要把信號(hào)量的申請放在加鎖之后Lock(&c_mutex);*out = ringqueue_[c_step];c_step++;c_step %= cap_;Unlock(&c_mutex);V(&pspace_sem);}~RingQueue(){sem_destroy(&cdata_sem);sem_destroy(&pspace_sem);pthread_mutex_destroy(&c_mutex);pthread_mutex_destroy(&p_mutex);}
private:std::vector<T> ringqueue_; // 環(huán)形隊(duì)列int cap_; // 容量int c_step; // 消費(fèi)者下一個(gè)要消費(fèi)的位置int p_step; // 生產(chǎn)者下一個(gè)要生產(chǎn)的位置sem_t cdata_sem; // 數(shù)據(jù)資源sem_t pspace_sem; // 空間資源pthread_mutex_t c_mutex; // 對消費(fèi)下標(biāo)的保護(hù)pthread_mutex_t p_mutex; // 對生產(chǎn)下標(biāo)的保護(hù)
};template <class T>
class Message
{
public:Message(std::string thread_name, RingQueue<T> *ringqueue):thread_name_(thread_name), ringqueue_(ringqueue){}std::string &get_thread_name(){return thread_name_;}RingQueue<T> *get_ringqueue(){return ringqueue_;}
private:std::string thread_name_;RingQueue<T> *ringqueue_;
};
// main.cc
#include "RingQueue.hpp"
#include <iostream>
#include <unistd.h>
#include <vector>using namespace std;void *Consumer(void *args)
{Message<int> *message = static_cast<Message<int> *>(args);RingQueue<int> *rq = message->get_ringqueue();string name = message->get_thread_name();while (true){int data = 0;rq->Pop(&data);printf("%s is running... get a data: %d\n", name.c_str(), data);// 模擬處理數(shù)據(jù)// usleep(1000000);}
}void *Productor(void *args)
{Message<int> *message = static_cast<Message<int> *>(args);RingQueue<int> *rq = message->get_ringqueue();string name = message->get_thread_name();while (true){// 獲取數(shù)據(jù)// usleep(1000000); // 模擬獲取數(shù)據(jù)int data = rand() % 10;rq->Push(data);printf("%s is running... produce a data: %d\n", name.c_str(), data);usleep(1000000);}
}int main()
{srand((unsigned int)time(nullptr));pthread_t c[3], p[5];RingQueue<int> *rq = new RingQueue<int>(); vector<Message<int>*> messages; for (int i = 0; i < 5; i++){Message<int> *message = new Message<int>("Produttor Thread "+to_string(i), rq);pthread_create(p + i, nullptr, Productor, message);messages.push_back(message);}for (int i = 0; i < 3; i++){Message<int> *message = new Message<int>("Consumer Thread "+to_string(i), rq);pthread_create(c + i, nullptr, Consumer, message);messages.push_back(message);}for (int i = 0; i < 3; i++){pthread_join(c[i], nullptr);}for (int i = 0; i < 5; i++){pthread_join(p[i], nullptr);}for (auto message : messages){delete message;}delete rq;return 0;
}
3.3 基于任務(wù)的多生產(chǎn)多消費(fèi)模型
RingQueue
的內(nèi)容不變
// Task.h
#include <iostream>
#include <string>enum
{DIVERROR = 1,MODERROR,UNKNOWERRROR
};class Task
{
public:Task(int a = 0, int b = 0, char op = '+'):data1_(a), data2_(b), op_(op), result_(0), exitcode_(0){}void run(){switch(op_){case '+':result_ = data1_ + data2_;break;case '-':result_ = data1_ - data2_;break;case '*':result_ = data1_ * data2_;break;case '/':if(data2_ == 0) exitcode_ = DIVERROR;else result_ = data1_ / data2_;break;case '%':if(data2_ == 0) exitcode_ = MODERROR;else result_ = data1_ % data2_;break;default:exitcode_ = UNKNOWERRROR;break;}}std::string result_to_string(){std::string ret = std::to_string(data1_);ret += ' ';ret += op_;ret += ' ';ret += std::to_string(data2_);ret += ' ';ret += '=';ret += ' ';ret += std::to_string(result_);ret += "[exitcode: ";ret += std::to_string(exitcode_);ret += ']';return ret;}std::string get_task(){std::string ret = std::to_string(data1_);ret += ' ';ret += op_;ret += ' ';ret += std::to_string(data2_);ret += ' ';ret += '=';ret += ' ';ret += '?';return ret;}
private:int data1_;int data2_;char op_;int result_;int exitcode_;
};
// main.cc
#include "RingQueue.hpp"
#include <iostream>
#include <unistd.h>
#include <vector>
#include "Task.h"using namespace std;const std::string opers = "+-*/%";void *Consumer(void *args)
{Message<Task> *message = static_cast<Message<Task> *>(args);RingQueue<Task> *rq = message->get_ringqueue();string name = message->get_thread_name();while (true){// 獲取任務(wù)// int data = 0;Task task;rq->Pop(&task);// 對任務(wù)做處理task.run();printf("%s is running... get a data: %s\n", name.c_str(), task.result_to_string().c_str());// 模擬處理數(shù)據(jù)// usleep(1000000);}
}void *Productor(void *args)
{Message<Task> *message = static_cast<Message<Task> *>(args);RingQueue<Task> *rq = message->get_ringqueue();string name = message->get_thread_name();int len = opers.size();while (true){// 獲取數(shù)據(jù)// usleep(1000000); // 模擬獲取數(shù)據(jù)// int data = rand() % 10;// 模擬獲取數(shù)據(jù)int data1 = rand() % 10 + 1; // [1, 10]usleep(10);int data2 = rand() % 13; // [0, 13]usleep(10);char op = opers[rand() % len];Task task(data1, data2, op);// 生產(chǎn)數(shù)據(jù)rq->Push(task);// printf("%s is running... produce a data: %d\n", name.c_str(), data);printf("%s is running... produce a Task: %s\n", name.c_str(), task.get_task().c_str());usleep(1000000);}
}int main()
{srand((unsigned int)time(nullptr));pthread_t c[3], p[2];RingQueue<Task> *rq = new RingQueue<Task>(); vector<Message<Task>*> messages; for (int i = 0; i < 5; i++){Message<Task> *message = new Message<Task>("Produttor Thread "+to_string(i), rq);pthread_create(p + i, nullptr, Productor, message);messages.push_back(message);}for (int i = 0; i < 3; i++){Message<Task> *message = new Message<Task>("Consumer Thread "+to_string(i), rq);pthread_create(c + i, nullptr, Consumer, message);messages.push_back(message);}// 等待子線程for (int i = 0; i < 3; i++){pthread_join(c[i], nullptr);}for (int i = 0; i < 5; i++){pthread_join(p[i], nullptr);}// 釋放資源for (auto message : messages){delete message;}delete rq;return 0;
}
四、結(jié)語
今天的分享到這里就結(jié)束啦!如果覺得文章還不錯(cuò)的話,可以三連支持一下,春人的主頁還有很多有趣的文章,歡迎小伙伴們前去點(diǎn)評(píng),您的支持就是春人前進(jìn)的動(dòng)力!