動態(tài)網(wǎng)站完整版百度pc網(wǎng)頁版
1 背景
- STL的容器不是線程安全的,我們經(jīng)常會有需求要求數(shù)據(jù)結(jié)構(gòu)線程安全,比如寫生產(chǎn)者消費(fèi)者模型的時候,就要求隊(duì)列線程安全。
- 利用std::queue和C++線程標(biāo)準(zhǔn)庫的一些組件(mutex,condition_variable),可以寫一個線程安全的隊(duì)列ConcurrenceQueue。
2 思路梳理
需要4個函數(shù)
- push,入隊(duì);
- pop,出隊(duì)并返回原來對頭的元素,如果為隊(duì)空則阻塞;
- tryPop,出隊(duì)并返回原來對頭的元素,如果隊(duì)空返回空(使用智能指針作返回類型),非阻塞;
- empty,返回是否為空,實(shí)則沒啥用,多線程條件下判空,下一瞬間另一線程就可能push進(jìn)去東西了。
3 實(shí)現(xiàn)代碼
#ifndef __CONCURRENCEQUEUE_H__
#define __CONCURRENCEQUEUE_H__
#include <mutex>
#include <condition_variable>
#include <deque>
#include <queue>
#include <memory>template<typename DATATYPE, typename SEQUENCE = std::deque<DATATYPE>>
class ConcurrenceQueue
{
public:ConcurrenceQueue() = default;ConcurrenceQueue(const ConcurrenceQueue & other){std::lock_guard<std::mutex> lg(other.m_mutex);m_data = other.m_data;}ConcurrenceQueue(ConcurrenceQueue &&) = delete;ConcurrenceQueue & operator= (const ConcurrenceQueue &) = delete;~ConcurrenceQueue() = default;bool empty() const {std::lock_guard<std::mutex> lg(m_mutex);return m_data.empty();}void push(const DATATYPE & data) {std::lock_guard<std::mutex> lg(m_mutex);m_data.push(data);m_cond.notify_one();}void push(DATATYPE && data) {std::lock_guard<std::mutex> lg(m_mutex);m_data.push(std::move(data));m_cond.notify_one();}std::shared_ptr<DATATYPE> tryPop() { // 非阻塞std::lock_guard<std::mutex> lg(m_mutex);if (m_data.empty()) return {};auto res = std::make_shared<DATATYPE>(m_data.front());m_data.pop();return res;}std::shared_ptr<DATATYPE> pop() { // 非阻塞std::unique_lock<std::mutex> lg(m_mutex);m_cond.wait(lg, [this] { return !m_data.empty(); });auto res = std::make_shared<DATATYPE>(std::move(m_data.front()));m_data.pop();return res;}private:std::queue<DATATYPE, SEQUENCE> m_data;mutable std::mutex m_mutex;std::condition_variable m_cond;
};
#endif
?4 測試
全局的:
ConcurrenceQueue<int> g_queue;void producer()
{ for (int i = 0; i < 100; ++i) {g_queue.push(i);std::this_thread::sleep_for(std::chrono::seconds(3));}
}void consumer1()
{while (1) {std::printf("[1] ------- %d\n", *g_queue.pop());}
}void consumer2()
{while (1) {auto front = g_queue.tryPop();std::printf("[2] ------- %d\n", front ? *front : -1);std::this_thread::sleep_for(std::chrono::seconds(1));}
}
測試 1:(消費(fèi)者阻塞式消費(fèi))
int main ()
{std::thread t1(producer);std::thread t2(consumer1);t1.join();t2.join();return 0;
}
測試 2:(消費(fèi)者非阻塞式消費(fèi),但要sleep輪詢)
int main ()
{std::thread t1(producer);std::thread t2(consumer2);t1.join();t2.join();return 0;
}