眉山市住房和城鄉(xiāng)建設(shè)局網(wǎng)站西安推廣平臺排行榜
一、線程池概念
??????? 線程池是一種多線程處理方式,它包含一個(gè)線程工作隊(duì)列和一個(gè)任務(wù)隊(duì)列。當(dāng)有任務(wù)需要處理時(shí),線程池會從線程工作隊(duì)列中取出一個(gè)空閑線程來處理任務(wù),如果線程工作隊(duì)列中沒有空閑線程,則任務(wù)會被放入任務(wù)隊(duì)列中等待處理。當(dāng)線程處理完任務(wù)后,它會返回線程工作隊(duì)列中等待下一個(gè)任務(wù)的到來。線程池的好處是可以避免頻繁創(chuàng)建和銷毀線程,提高程序的效率和穩(wěn)定性。
??????? 在c++中,所謂的任務(wù)隊(duì)列就是一個(gè)雙端隊(duì)列,采用先進(jìn)先出的原則,而任務(wù)則是一些系列的功能函數(shù)。所謂線程工作隊(duì)列,就是一組線程,這些線程通常采用容器管理起來。
????????線程池可以應(yīng)用于需要頻繁創(chuàng)建和銷毀線程的場景,例如網(wǎng)絡(luò)服務(wù)器、數(shù)據(jù)庫連接池等。通過線程池,可以提高線程的復(fù)用率,減少線程創(chuàng)建和銷毀的開銷,從而提高系統(tǒng)的性能和穩(wěn)定性。同時(shí),線程池還可以控制并發(fā)線程的數(shù)量,避免系統(tǒng)資源被過度占用,保證系統(tǒng)的可靠性和可用性。
??????? 線程池一般可以提高程序的效率,尤其是具有延時(shí)等待、復(fù)雜交互、共享線程等情況,因?yàn)樗梢员苊忸l繁地創(chuàng)建和銷毀線程,從而減少了系統(tǒng)開銷。同時(shí),線程池還可以控制線程的數(shù)量,避免線程數(shù)量過多導(dǎo)致系統(tǒng)資源的浪費(fèi)。在使用線程池時(shí),需要注意線程安全問題,例如共享變量的訪問、線程間的同步等??梢酝ㄟ^加鎖、使用原子操作等方式來保證線程安全。但是有時(shí)候采用線程池是個(gè)不明智的舉動,例如那些可以連續(xù)執(zhí)行,無需等待,延時(shí)的純計(jì)算場景。
二、任務(wù)隊(duì)列設(shè)計(jì)
??????? 通常任務(wù)隊(duì)列就是將一個(gè)個(gè)任務(wù)(即函數(shù)對象或函數(shù)指針)裝載進(jìn)一個(gè)隊(duì)列內(nèi),然后各個(gè)線程從該隊(duì)列中取出任務(wù)執(zhí)行。任務(wù)進(jìn)入隊(duì)列遵循先進(jìn)先出原則,當(dāng)然復(fù)雜的可以給任務(wù)執(zhí)行優(yōu)先級,進(jìn)入隊(duì)列時(shí)按優(yōu)先級排序。另外為了隊(duì)列跨線程使用的安全,需要通過互斥變量來約束。
??????? 創(chuàng)建taskqueue.h和taskqueue.cpp源文件,在taskqueue.h創(chuàng)建一個(gè)TaskQueue類模板,其內(nèi)置一個(gè)std::deque容器和一個(gè)std::mutex互斥鎖及一些輔助變量,提供任務(wù)入列、出列、大小及空隊(duì)列判斷等成員函數(shù),內(nèi)容如下:
#ifndef _TASK_QUEUE_H_
#define _TASK_QUEUE_H_
/************************************************************************Copyright 2023-03-06, pyfree**File Name : taskqueue.h*File Mark : *Summary : *任務(wù)數(shù)據(jù)隊(duì)列類,線程安全**Current Version : 1.00*Author : pyfree*FinishDate :**Replace Version :*Author :*FinishDate :************************************************************************/
#include <queue>
#include <mutex>
#include <string>template <typename T>
class TaskQueue
{
public:TaskQueue(std::string desc = "taskqueue_for_threadsafe");~TaskQueue();///*** 獲取隊(duì)列大小* @return {int } 隊(duì)列大小*/size_t size();/*** 判定隊(duì)列是否為空* @return {bool } 是否為空隊(duì)列*/bool empty();/*** 獲取隊(duì)列頭元素* @param it {T&} 頭元素* @return {bool } 是否成功*/bool get_front(T &it);/*** 刪除元素* @return {bool } 是否成功*/bool pop_front();/*** 獲取隊(duì)列頭元素,并從隊(duì)列終刪除* @param it {T&} 頭元素* @return {bool } 是否成功*/bool pop_front(T &it);/*** 從隊(duì)列頭開始逐步獲取多個(gè)元素,并剔除* @param its {queue<T>&} 獲取到的元素集* @param sizel {int} 一次獲取多少個(gè)* @return {bool } 至少獲取一個(gè)元素以上則成功*/bool get_queue(std::queue<T> &its,unsigned int sizel=5);/*** 從隊(duì)列尾部添加元素* @param it {T} 被添加元素* @return {void } 無返回*/void add_back(T it);/*** 從隊(duì)列頭部添加元素* @param it {T} 被添加元素* @return {void } 無返回*/void add_front(T it);/*** 清空元素* @return {void }*/void clear();/*** 重置容器元素大小限制* @param sizel {unsigned int} 容器限制長度* @return {void }*/void reQSize(unsigned int sizel);
private:TaskQueue& operator=(const TaskQueue&) {return this;};void over_log(bool addfront_flag);
protected:std::string queue_desc;
private:/點(diǎn)集轉(zhuǎn)發(fā)//協(xié)議解析結(jié)果緩存std::deque<T> cache_queue; //隊(duì)列容器std::mutex m_Mutex; //線程鎖,c++11unsigned int QSize; //隊(duì)列大小約束,超出是會從隊(duì)列頭剔除舊數(shù)據(jù)騰出空位在對末添加數(shù)據(jù)int queue_overS; //隊(duì)列溢出次數(shù)計(jì)數(shù)
};#include "taskqueue.cpp"#endif //_TASK_QUEUE_H_
??????? 由于這一個(gè)類模板,因此要遵循類模板聲明定義分離一些規(guī)則,因此在頭文件結(jié)尾添加了#include "taskqueue.cpp"(PS,本專欄的無可避免的模板編程實(shí)踐(篇一)課題2.4節(jié)講述過原因)。模板還定義了容器隊(duì)列長度約束,防止容器一直添加內(nèi)容而無線程處理的異常情況出現(xiàn),并會打印容器溢出日志信息。類模板的TaskQueue定義放置在taskqueue.cpp實(shí)現(xiàn),由于頭文件結(jié)尾添加了#include "taskqueue.cpp",其實(shí)相當(dāng)于是在頭文件實(shí)現(xiàn),這樣分離只是為了統(tǒng)一排版。
#include "taskqueue.h"#include <stdio.h>template <typename T>
TaskQueue<T>::TaskQueue(std::string desc): queue_desc(desc), QSize(100), queue_overS(0)
{};template <typename T>
TaskQueue<T>::~TaskQueue()
{}
//
template <typename T>
size_t TaskQueue<T>::size()
{std::unique_lock<std::mutex> lock(m_Mutex);return cache_queue.size();
}template <typename T>
bool TaskQueue<T>::empty()
{std::unique_lock<std::mutex> lock(m_Mutex); return cache_queue.empty();
}template <typename T>
bool TaskQueue<T>::get_front(T &it)
{std::unique_lock<std::mutex> lock(m_Mutex);bool ret = !cache_queue.empty();if (ret) { it = cache_queue.front(); }return ret;
}template <typename T>
bool TaskQueue<T>::pop_front()
{std::unique_lock<std::mutex> lock(m_Mutex);bool ret = !cache_queue.empty();if (ret) { cache_queue.pop_front();}return ret;
}template <typename T>
bool TaskQueue<T>::pop_front(T &it)
{std::unique_lock<std::mutex> lock(m_Mutex);bool ret = !cache_queue.empty();if (ret) {it = cache_queue.front();cache_queue.pop_front();}return ret;
};template <typename T>
bool TaskQueue<T>::get_queue(std::queue<T> &its,unsigned int sizel)
{std::unique_lock<std::mutex> lock(m_Mutex);while (!cache_queue.empty()){its.push(cache_queue.front());cache_queue.pop_front();if (its.size() >= sizel){break;}}return !its.empty();
};template <typename T>
void TaskQueue<T>::add_back(T it)
{{std::unique_lock<std::mutex> lock(m_Mutex);if (cache_queue.size() > QSize) {queue_overS++;cache_queue.pop_front();}cache_queue.push_back(it);} over_log(false);
}template <typename T>
void TaskQueue<T>::add_front(T it)
{{std::unique_lock<std::mutex> lock(m_Mutex);if (cache_queue.size() > QSize) {queue_overS++;cache_queue.pop_front();}cache_queue.push_front(it);}over_log(true);
}template <typename T>
void TaskQueue<T>::clear()
{std::unique_lock<std::mutex> lock(m_Mutex);cache_queue.clear();queue_overS = 0;
}template <typename T>
void TaskQueue<T>::over_log(bool addfront_flag)
{if (queue_overS >= 10) {//每溢出10次,報(bào)告一次if(addfront_flag)printf("add item to queue %s at first,but the size of TaskQueue is up to limmit size: %d.\n", queue_desc.c_str(), QSize);elseprintf("add item to queue %s at end,but the size of TaskQueue is up to limmit size: %d.\n", queue_desc.c_str(), QSize); queue_overS = 0;}
}template <typename T>
void TaskQueue<T>::reQSize(unsigned int sizel)
{QSize = sizel;
}
??????? 其實(shí)一直跟著本人博客的應(yīng)該很好理解這些代碼,畢竟數(shù)據(jù)隊(duì)列在前面將TCP/Socket通信開發(fā)實(shí)戰(zhàn)案例博文內(nèi)就講述過,這里很類似,只是做了一些調(diào)整而已。
三、線程池類設(shè)計(jì)
??????? 線程池的設(shè)計(jì),首選它包含一個(gè)任務(wù)隊(duì)列,然后包括一組任務(wù)線程,這組線程采用std::vector來裝載管理,任務(wù)線程是用來執(zhí)行任務(wù)的。簡單來說就是從任務(wù)隊(duì)列取出任務(wù),把活干了。
#ifndef _THREAD_EPOLL_H_
#define _THREAD_EPOLL_H_
#include "taskqueue.h"#include <functional>
#include <thread>
#include <future>class ThreadPool
{
public:ThreadPool(const int n_threads = 5); //線程池顯式構(gòu)造函數(shù),默認(rèn)創(chuàng)建及初始化5個(gè)線程~ThreadPool();void init(); // 初始化線程池void close(); // 等待正執(zhí)行任務(wù)完成并關(guān)閉線程池void setTaskQSize(unsigned int sizel);// 提交一個(gè)任務(wù)(function)給線程池做異步處理template <typename F, typename... Args>auto addTask(F &&f, Args &&...args) -> std::future<decltype(f(args...))>;//指定auto返回值格式std::future<>friend class ThreadWorker; //友元線程工作類
private:ThreadPool(const ThreadPool &) = delete;ThreadPool(ThreadPool &&) = delete;ThreadPool &operator=(const ThreadPool &) = delete;ThreadPool &operator=(ThreadPool &&) = delete;
private:bool m_running; // 線程池是否處于運(yùn)行態(tài)勢TaskQueue<std::function<void()> > m_taskqueue; // 任務(wù)隊(duì)列,跨線程安全std::vector<std::thread> m_threads; // 作業(yè)線程隊(duì)列std::mutex m_conditional_mutex; // 線程休眠鎖互斥變量std::condition_variable m_conditional_lock; // 線程環(huán)境鎖,可以讓線程處于休眠或者喚醒狀態(tài)
};class ThreadWorker // 工作類線程
{
private:int m_id; // 工作idThreadPool *m_pool; // 所屬線程池
public:ThreadWorker(ThreadPool *pool, const int id);// 構(gòu)造函數(shù)void operator()(); // 重載()操作
};
#include "thread_pool.cpp" //有模板成員函數(shù)存在
#endif //_THREAD_EPOLL_H_
??????? 由于addTask函數(shù)是成員函數(shù)模板,因此同樣在頭文件末尾添加#include "thread_pool.cpp"。
????????下來就是實(shí)現(xiàn)線程池的重頭戲了,其中最關(guān)鍵的就是添加任務(wù)進(jìn)入隊(duì)列的addTask函數(shù),因?yàn)槲覀兊娜蝿?wù)隊(duì)列元素是一個(gè)函數(shù)對象,采用的是通用多態(tài)函數(shù)包裝器std::function類模板包裹的void()函數(shù)對象。而往線程池添加任務(wù)是各種函數(shù)及參數(shù)集。通過轉(zhuǎn)發(fā)調(diào)用包裝器std::bind函數(shù)模板,將傳入函數(shù)及參數(shù)集轉(zhuǎn)為函數(shù)對象,再由類模板 std::packaged_task
包裝可調(diào)用 (Callable) 目標(biāo)函數(shù),即std::bind函數(shù)模板綁定轉(zhuǎn)換后的函數(shù)對象。最后采用std::make_shared函數(shù)模板將函數(shù)包裝為 std::shared_ptr實(shí)例,隨即將該實(shí)例通過lambda表達(dá)式的函數(shù)包裝器,最終轉(zhuǎn)換為滿足可復(fù)制構(gòu)造 (CopyConstructible) 和可復(fù)制賦值 (CopyAssignable)的函數(shù)對象void()加入任務(wù)隊(duì)列。
????????thread_pool.cpp
#include "thread_pool.h"
//ThreadPool類
// 線程池構(gòu)造函數(shù)
ThreadPool::ThreadPool(const int n_threads): m_threads(std::vector<std::thread>(n_threads)), m_running(false)
{
}ThreadPool::~ThreadPool()
{close();std::unique_lock<std::mutex> lock(m_conditional_mutex);m_taskqueue.clear();m_threads.clear();
}
//
void ThreadPool::init()
{for (int i = 0; i < m_threads.size(); ++i){m_threads.at(i) = std::thread(ThreadWorker(this, i)); // 分配工作線程}m_running = true;
}
//
void ThreadPool::close()
{m_running = false;m_conditional_lock.notify_all(); // 通知,喚醒所有工作線程for (int i = 0; i < m_threads.size(); ++i){if (m_threads.at(i).joinable()) // 判斷線程是否在等待{m_threads.at(i).join(); // 將線程加入到等待隊(duì)列}}
}void ThreadPool::setTaskQSize(unsigned int sizel)
{std::unique_lock<std::mutex> lock(m_conditional_mutex);m_taskqueue.reQSize(sizel);
}
// 關(guān)鍵函數(shù)
template <typename F, typename... Args>
auto ThreadPool::addTask(F &&f, Args &&...args) -> std::future<decltype(f(args...))>
{// 采用std::bind綁定函數(shù)f及形參集args...創(chuàng)建函數(shù)對象(decltype獲取函數(shù)對象類型)std::function<decltype(f(args...))()> func = std::bind(std::forward<F>(f), std::forward<Args>(args)...); /*以 args-func 為 <T> 的構(gòu)造函數(shù)參數(shù)列表,構(gòu)造 T 類型對象并將它包裝于 std::shared_ptr,*T是類模板 std::packaged_task 包裝的可調(diào)用 (Callable) 目標(biāo)函數(shù),即std::bind綁定的f(arg...)*可移動的共享互斥體所有權(quán)包裝,封裝到共享指針中,支持復(fù)制構(gòu)造,并隱藏了函數(shù)參數(shù)集*/auto task_ptr = std::make_shared<std::packaged_task<decltype(f(args...))()> >(func);//再次包裝,構(gòu)建一個(gè)lambda表達(dá)式的函數(shù)包裝器,滿足可復(fù)制構(gòu)造 (CopyConstructible) 和可復(fù)制賦值 (CopyAssignable)的函數(shù)對象void()std::function<void()> warpper_func = [task_ptr](){(*task_ptr)();};m_taskqueue.add_back(warpper_func); // 隊(duì)列通用安全封包函數(shù),并壓入安全隊(duì)列m_conditional_lock.notify_one(); // 喚醒一個(gè)等待中的線程return task_ptr->get_future(); // 返回先前注冊的任務(wù)指針,task_ptr類型為std::packaged_task
}
//ThreadWorker類
// 構(gòu)造函數(shù)
ThreadWorker::ThreadWorker(ThreadPool *pool, const int id) : m_pool(pool), m_id(id)
{
}
// 重載()操作
void ThreadWorker::operator()()
{std::function<void()> func; // 定義基礎(chǔ)函數(shù)類funcbool result = false; // 是否成功從隊(duì)列中取出元素while (m_pool->m_running){{//鎖脫離作用域解鎖// 為線程環(huán)境加鎖,互訪問工作線程的休眠和喚醒std::unique_lock<std::mutex> lock(m_pool->m_conditional_mutex);if (m_pool->m_taskqueue.empty()) // 如果任務(wù)隊(duì)列為空,阻塞當(dāng)前線程{m_pool->m_conditional_lock.wait(lock); // 等待條件變量通知,開啟線程}result = m_pool->m_taskqueue.pop_front(func); // 取出任務(wù)隊(duì)列中的首元素}if (result) func(); // 如果成功取出,執(zhí)行工作函數(shù)}
}
??????? 上述代碼可以看到,創(chuàng)建的線程是以工作分配管理線程類ThreadWorker的operator操作函數(shù)(這個(gè)知識點(diǎn)可以參考本主題篇四的博文)為線程功能函數(shù)來實(shí)現(xiàn)線程循環(huán)體的,就是不斷從任務(wù)隊(duì)列中獲取到任務(wù),干活。
四、線程池應(yīng)用測試
??????? 上面代碼就設(shè)計(jì)了一個(gè)基本的線程池了,下來就調(diào)用問題。
????????創(chuàng)建test.h和test.cpp源文件,直接上代碼,如下。有兩個(gè)函數(shù):函數(shù)threadPool_test顧名思義就是測試線程池的,main_doit用來測試純主線程執(zhí)行同等任務(wù)的效率;return_variance函數(shù)就帶返回值的線程功能函數(shù)(即工作任務(wù)),out_variance函數(shù)是返回值為void,但是返回信息是通過參數(shù)引用傳遞的線程功能函數(shù):
//test.h
#ifndef _TEST_H_
#define _TEST_H_void threadPool_test();
void main_doit();#endif //_TEST_H_
//test.cpp
#include <iostream>
#include <math.h>
#include <random>
#include <chrono>
#include <unistd.h> //usleep
#include "thread_pool.h"const unsigned int sizel = 100000;
auto end_poll = std::chrono::system_clock::now();double return_variance(int min, int max)
{if(min>max) return 0.0;int size = (max-min+1);double sum = ((double)(min+max)*size)/2.0;double mean = sum/size;// 求方差double variance = 0.0;for (int i = min ; i <= max ; i++){variance = variance + pow(i-mean,2);}variance = variance/size;// std::cout<<mean<<" \n"; // 均值// std::cout<<variance<<" \n"; // 方差//usleep(10); //讓任務(wù)等待一下...//測試線程池執(zhí)行效率使用,只能大概,畢竟先進(jìn)先出if((min+1)==sizel) end_poll = std::chrono::system_clock::now(); return variance;
};void out_variance(double &out,int min, int max)
{out = return_variance(min,max);
};void main_doit()
{//測試主線程自己執(zhí)行任務(wù)時(shí),間隔時(shí)間// 提交方差操作,總共100個(gè)auto start = std::chrono::system_clock::now();for (size_t i = 0; i < sizel; i++){return_variance(i,i+rand()/sizel);}auto end = std::chrono::system_clock::now();std::chrono::duration<double> diff = end-start;std::cout << "main product and consum diff.count() = " << diff.count() << "\n";//主線程生成加執(zhí)行一條龍
}
//
void threadPool_test()
{//測試線程池執(zhí)行任務(wù)時(shí),主線程經(jīng)歷間隔時(shí)間// 創(chuàng)建10個(gè)線程的線程池ThreadPool pool(10);pool.setTaskQSize(sizel);//重設(shè)隊(duì)列大小限制,防止溢出// 初始化線程池pool.init();// 提交方差操作,總共100個(gè)auto start = std::chrono::system_clock::now();for (size_t i = 0; i < sizel; i++){pool.addTask(return_variance, i, i+rand()/sizel);}auto end = std::chrono::system_clock::now();std::chrono::duration<double> diff = end-start;start = std::chrono::system_clock::now();//生產(chǎn)完成時(shí)刻std::cout << "main product diff.count() = " << diff.count() << "\n";//主線程生成任務(wù)//取得返回值測試(任務(wù)結(jié)果)pool.setTaskQSize(100);//重新設(shè)回容器大小限制// 使用ref傳遞的輸出參數(shù)提交函數(shù)double output_ref;auto future1 = pool.addTask(out_variance, std::ref(output_ref), 9, 99);// 等待操作輸出完成future1.get();std::cout << "[9,99] variance is equals to " << output_ref << std::endl;// 使用return參數(shù)提交函數(shù)auto future2 = pool.addTask(return_variance, 8, 88);// 等待乘法輸出完成double variance = future2.get();std::cout << "[8,88] variance is equals to " << variance << std::endl;// 關(guān)閉線程池pool.close();diff = end_poll-start;//線程池執(zhí)行任務(wù)時(shí)間間隔std::cout << "main consum diff.count() = " << diff.count() << "\n";//線程池執(zhí)行任務(wù)
}
??????? 另外可以通過線程池的addTask返回對象(std::future)實(shí)現(xiàn)對函數(shù)返回值的獲取,std::future的知識點(diǎn)見本課題的篇二的“3.6 線程輔助-Future”。
????????在main.cpp源文件內(nèi),調(diào)用者兩個(gè)函數(shù)來測試:
#include "test.h"int main(int argc, char* argv[])
{threadPool_test();main_doit();return 0;
}
??????? 編譯及測試g++ main.cpp test.cpp -o test.exe -std=c++11,運(yùn)行測程序:
???????? 是不是很驚訝,多線程怎么還比主線程單獨(dú)執(zhí)行這些同等任務(wù)更高效。正如前面所說的,主線程向線程池添加任務(wù)是通過一些列轉(zhuǎn)換的,目的就是為了適應(yīng)各個(gè)功能函數(shù)轉(zhuǎn)換為統(tǒng)一的函數(shù)對象,但是這樣就增加了很多計(jì)算成本,以及線程鎖的使用,各個(gè)線程進(jìn)行了資源競爭,效率當(dāng)然就差了。尤其還是這種純粹計(jì)算、連續(xù)執(zhí)行、無延時(shí)要求無等待要求的功能函數(shù)。
??????? 這就涉及到多線程編程最核心的問題了資源競爭。CPU有多核,可以同時(shí)執(zhí)行多個(gè)線程是沒有問題的。但是控制臺(資源)卻只有一個(gè),同時(shí)只能有一個(gè)線程擁有這個(gè)唯一的控制臺,將數(shù)字輸出。大多情況下,程序可能只分配了一個(gè)CPU內(nèi)核在運(yùn)算(本例子任務(wù)對資源的訴求是沒滿足系統(tǒng)多核調(diào)度的判定),各個(gè)線程只是每次運(yùn)行任務(wù)是搶占CPU的一段運(yùn)行時(shí)間而已,各個(gè)線程之間(包括主線程)相互搶奪及切換要耗費(fèi)大量時(shí)間,當(dāng)然就沒有單個(gè)主線程運(yùn)行來得快。
??????? 為此,又在關(guān)閉線程池前,加入了一句等待:
std::this_thread::sleep_for(std::chrono::seconds(100)); //等待,防止線程池沒完成就退出了
// 關(guān)閉線程池
pool.close();
??????? 重新編譯測試,去資源監(jiān)測器查看,的確創(chuàng)建了10個(gè)子線程,只是CPU使用壓力太小,無法促使系統(tǒng)做出更多CPU資源的傾斜分配。
????????所有我們在開篇時(shí)就指出,線程池可以應(yīng)用于需要頻繁創(chuàng)建和銷毀線程的場景,例如網(wǎng)絡(luò)服務(wù)器、數(shù)據(jù)庫連接池等,這些場景才是它們的主戰(zhàn)場。
??????? 當(dāng)然說到,在多CPU多核情況下,讓各個(gè)線程運(yùn)行在各個(gè)不同的CPU的內(nèi)核上,和控制臺線程(主線程)齊驅(qū)并駕,這就是多核并行,這就另外一個(gè)主題了。