推薦個做兼職的網(wǎng)站做網(wǎng)上推廣
1.描述問題
在完成線程池核心功能功能時,沒有遇到太大的問題(Any,Result,Semfore的設(shè)計),在做線程池資源回收時,遇到了死鎖的問題????????
1、在ThreadPool的資源回收,等待線程池所有線程退出時,發(fā)生死鎖問題,導(dǎo)致進程無法退出
死鎖代碼:
#include "threadpool.h"#include <thread>
#include <iostream>const int TASK_MAX_THRESHHOLD = INT32_MAX;
const int THREAD_MAX_THRESHHOLD = 100;
const int THREAD_MAX_IDLE_TIME = 60;//單位:秒//線程池構(gòu)造
ThreadPool::ThreadPool(): initThreadSize_(0), taskSize_(0), idleThreadSize_(0)//剛開始時還沒有線程, curThreadSize_(0), taskQueMaxThreshHold_(TASK_MAX_THRESHHOLD), threadSizeThreshHold_(THREAD_MAX_THRESHHOLD), poolMode_(PoolMode::MODE_FIXED), isPoolRunning_(false)
{}//線程池析構(gòu)
ThreadPool::~ThreadPool()
{isPoolRunning_ = false;notEmpty_.notify_all();//等待線程池里面所有的線程返回 有兩種狀態(tài):阻塞 & 正在執(zhí)行任務(wù)中std::unique_lock<std::mutex> lock(taskQueMtx_);exitCond_.wait(lock, [&]()->bool {return threads_.size() == 0; });
}//設(shè)置線程池的工作模式
void ThreadPool::setMode(PoolMode mode)
{if (checkRunningState())return;poolMode_ = mode;
}// 設(shè)置task任務(wù)隊列上限閾值
void ThreadPool::setTaskQueMaxThreshHold(int threshhold)
{if (checkRunningState())return;taskQueMaxThreshHold_ = threshhold;
}//設(shè)置線程池cached模式下線程閾值
void ThreadPool::setThreadSizeThreshHold(int threshhold)
{if (checkRunningState())return;if (poolMode_ == PoolMode::MODE_CACHED){threadSizeThreshHold_ = threshhold;}
}// 給線程池提交任務(wù) 用戶調(diào)用該接口,傳入任務(wù)對象,生產(chǎn)任務(wù)
Result ThreadPool::submitTask(std::shared_ptr<Task> sp)
{//獲取鎖std::unique_lock<std::mutex> lock(taskQueMtx_);//線程的通信 等待任務(wù)隊列有空余// 用戶提交任務(wù),最長不能阻塞超過1s,否則判斷提交任務(wù)失敗,返回//while (taskQue_.size() == taskQueMaxThreshHold_)//{// notFull_.wait(lock);//}/** wait:直到等待滿足條件(第二個參數(shù)lamada)才返回* wait_for:滿足條件返回真,到了約定的時間段(5s)返回假* wait_until:滿足條件返回真,到了約定的時間點(下周一)返回假*/if (!notFull_.wait_for(lock, std::chrono::seconds(1),[&]()->bool {return taskQue_.size() < (size_t)taskQueMaxThreshHold_; }))//等同于上面的語句,參數(shù):需要釋放的鎖 函數(shù)對象(要能滿足條件變量)//任務(wù)隊列中的任務(wù)數(shù)小于上限的閾值,否則就阻塞在這句{//表示notFull_等待1s,條件依然沒有滿足std::cerr << "task queue is full,submit task fail." << std::endl;//return task->getResult(); //Task Result 線程執(zhí)行完task,task對象就被析構(gòu)掉了return Result(sp, false);//返回臨時對象,應(yīng)該自動匹配右值的資源轉(zhuǎn)移,如果編譯不通過,把C++標(biāo)準(zhǔn)調(diào)高一點}//如果有空余,把任務(wù)放入任務(wù)隊列中taskQue_.emplace(sp);taskSize_++;//因為新放了任務(wù),任務(wù)隊列肯定不空了,在notEmpty_上進行通知,趕快分配線程執(zhí)行任務(wù)notEmpty_.notify_all();//cached模式 任務(wù)處理比較緊急 場景:小而快的任務(wù) 需要根據(jù)任務(wù)數(shù)量和空閑線程的數(shù)量,判斷是否需要創(chuàng)建新的線程出來if (poolMode_ == PoolMode::MODE_CACHED&& taskSize_ > idleThreadSize_&& curThreadSize_ < threadSizeThreshHold_){std::cout << ">>> create new thread..." << std::this_thread::get_id() << " exit!" << std::endl;//創(chuàng)建新的線程對象auto ptr = std::make_unique<Thread>(std::bind(&ThreadPool::threadFunc, this, std::placeholders::_1));int threadId = ptr->getId();threads_.emplace(threadId, std::move(ptr));//threads_.emplace_back(std::move(ptr));//啟動線程threads_[threadId]->start();//修改線程個數(shù)相關(guān)的變量curThreadSize_++;idleThreadSize_++;}//返回任務(wù)的Result對象return Result(sp);// return task->getResult();
}//開啟線程池
void ThreadPool::start(int initThreadSize)
{//設(shè)置線程池的運行狀態(tài)isPoolRunning_=true;//記錄初始線程個數(shù)initThreadSize_ = initThreadSize;curThreadSize_ = initThreadSize;//創(chuàng)建線程對象for (int i = 0; i < initThreadSize_; i++){//創(chuàng)建thread線程對象的時候,把線程函數(shù)給到thread線程對象auto ptr = std::make_unique<Thread>(std::bind(&ThreadPool::threadFunc, this, std::placeholders::_1));int threadId = ptr->getId();threads_.emplace(threadId, std::move(ptr));//threads_.emplace_back(std::move(ptr));//unique_ptr將左值引用的拷貝構(gòu)造和賦值都delete了,需要右值(進行資源轉(zhuǎn)移)}//啟動所有線程 std::vector<Thread*> threads_;for (int i = 0; i < initThreadSize_; i++){threads_[i]->start(); //需要去執(zhí)行一個線程函數(shù)idleThreadSize_++;//記錄初始空閑線程的數(shù)量}
}//定義線程函數(shù) 線程池的所有線程從任務(wù)隊列里面消費任務(wù)
void ThreadPool::threadFunc(int threadid) //線程函數(shù)返回,相應(yīng)的線程也就結(jié)束了
{/*std::cout << "begin threadFunc tid:" << std::this_thread::get_id() << std::endl;std::cout << "end threadFunc tid:" << std::this_thread::get_id() << std::endl;*/auto lastTime = std::chrono::high_resolution_clock().now();while (isPoolRunning_){std::shared_ptr<Task> task;{//先獲取鎖std::unique_lock<std::mutex> lock(taskQueMtx_);std::cout << "tid:" << std::this_thread::get_id()<< "嘗試獲取任務(wù)..." << std::endl;//cached模式下,有可能已經(jīng)創(chuàng)建了很多的線程,但是空閑時間超過60s,應(yīng)該把多余的線程結(jié)束回收掉(超過initThreadSize_數(shù)量的線程要進行回收)//當(dāng)前時間-上一次線程執(zhí)行的時間>60s//每一秒中返回一次 怎么區(qū)分:超時返回?還是有任務(wù)待執(zhí)行返回while (taskQue_.size() == 0){if (poolMode_ == PoolMode::MODE_CACHED){//條件變量,超時返回了if (std::cv_status::timeout == notEmpty_.wait_for(lock, std::chrono::seconds(1))){auto now = std::chrono::high_resolution_clock().now();auto dur = std::chrono::duration_cast<std::chrono::seconds>(now - lastTime);if (dur.count() >= THREAD_MAX_IDLE_TIME && curThreadSize_ > initThreadSize_){//開始回收當(dāng)前線程//記錄線程數(shù)量的相關(guān)變量的值修改//把線程對象從線程列表容器中刪除 沒有辦法 threadFunc <=>thread對象//threadid=>thread對象=》刪除threads_.erase(threadid);// 這個id不是std::this_thread::getid() 是自己生成的,我們自定義的curThreadSize_--;idleThreadSize_--;std::cout << "threadid:" << std::this_thread::get_id() << "exit!" << std::endl;return;}}}else{//等待notEmpty條件notEmpty_.wait(lock);}//線程池結(jié)束,回收線程資源if (!isPoolRunning_){threads_.erase(threadid);// 這個id不是std::this_thread::getid() 是自己生成的,我們自定義的std::cout << "threadid:" << std::this_thread::get_id() << "exit!" << std::endl;exitCond_.notify_all();return;}}idleThreadSize_--;//喚醒線程工作,空閑線程-1std::cout << "tid:" << std::this_thread::get_id()<< "獲取任務(wù)成功..." << std::endl;//從任務(wù)隊列中取一個任務(wù)出來task = taskQue_.front();taskQue_.pop();taskSize_--;//如果依然有剩余任務(wù),繼續(xù)通知其它的線程執(zhí)任務(wù)if (taskQue_.size() > 0){notEmpty_.notify_all();}//取出一個任務(wù),進行通知,通知可以繼續(xù)提交生產(chǎn)任務(wù)notFull_.notify_all();}//就應(yīng)該把鎖釋放掉//當(dāng)前線程負(fù)責(zé)執(zhí)行這個任務(wù)if (task != nullptr){//task->run();//執(zhí)行任務(wù);把任務(wù)的返回值setVal方法給到Resulttask->exec();}idleThreadSize_++;//線程執(zhí)行完任務(wù),空閑線程+1lastTime = std::chrono::high_resolution_clock().now();//更新線程執(zhí)行完任務(wù)的時間}threads_.erase(threadid);// 這個id不是std::this_thread::getid() 是自己生成的,我們自定義的std::cout << "threadid:" << std::this_thread::get_id() << "exit!" << std::endl;exitCond_.notify_all();
}bool ThreadPool::checkRunningState() const
{return isPoolRunning_;
}/// 線程方法實現(xiàn)
int Thread::generateId_ = 0;//線程構(gòu)造
Thread::Thread(ThreadFunc func):func_(func),threadId_(generateId_++)
{}//線程析構(gòu)
Thread::~Thread(){}//啟動線程
void Thread::start()
{//創(chuàng)建一個線程來執(zhí)行一個線程函數(shù)std::thread t(func_, threadId_);//C++11來說 線程對象t 和線程函數(shù)func_t.detach();//設(shè)置分離線程,線程對象t出作用域會析構(gòu),但是線程函數(shù)不能結(jié)束否則程序會掛掉,所以要將線程分離出去,做到二者互不影響//pthread_detach pthread_t設(shè)置成分離線程//主線程要用pthread_join回收線程,防止孤兒線程的出現(xiàn)} //獲取線程id
int Thread::getId()const
{return threadId_;
}/// Task方法實現(xiàn)
Task::Task():result_(nullptr)
{}void Task::exec()
{result_->setVal(run());//這里發(fā)生多態(tài)調(diào)用
}void Task::setResult(Result* res)
{result_ = res;
}/// Result方法的實現(xiàn)
Result::Result(std::shared_ptr<Task> task, bool isValid):isValid_(isValid),task_(task)
{task_->setResult(this);
}Any Result::get() // 用戶調(diào)用的
{if (!isValid_){return "";}//task任務(wù)如果沒有執(zhí)行完,這里會阻塞用戶的線程sem_.wait();//用戶調(diào)用get時,如果任務(wù)在線程池中,還沒有被執(zhí)行完,那么調(diào)用get方法的線程就會阻塞住return std::move(any_);//右值引用
}void Result::setVal(Any any)//誰調(diào)用的呢??
{//存儲task的返回值this->any_ = std::move(any);sem_.post();//已經(jīng)獲取的任務(wù)的返回值,增加信號量資源
}
?
我們的資源回收代碼如下:
//線程池析構(gòu)
ThreadPool::~ThreadPool()
{isPoolRunning_ = false;notEmpty_.notify_all();//等待線程池里面所有的線程返回 有兩種狀態(tài):阻塞 & 正在執(zhí)行任務(wù)中std::unique_lock<std::mutex> lock(taskQueMtx_);exitCond_.wait(lock, [&]()->bool {return threads_.size() == 0; });
}
現(xiàn)在,有的線程沒有被回收,線程隊列中還有線程,所以就一直阻塞等待了。
線程池的那個線程為什么沒有被回收掉?
(時而出現(xiàn),時而不出現(xiàn)的問題)
我們通過在windows上調(diào)試:
我們通過在Linux上進行g(shù)db調(diào)試
主要通過gdb attach到正在運行的進程,通過info threads,thread tid,bt等命令查看各個線程的調(diào)用堆棧信息,結(jié)合項目代碼,定位到發(fā)生死鎖的代碼片段,分析死鎖問題發(fā)生的原因
2.分析問題
原先針對上面的2種情況的處理方法如下:
第3種情況:
有的線程執(zhí)行完任務(wù),又進入while循環(huán)了
在這里有2種情況:
1、pool線程先獲取到鎖,線程池的線程獲取不到鎖,阻塞。
此時pool線程看wait條件,size>0,不滿足條件,就進入等待wait狀態(tài)了,并且把互斥鎖mutex釋放掉。
線程池的線程就獲取到鎖了,發(fā)現(xiàn)任務(wù)隊列沒有任務(wù)了,這個任務(wù)就在notEmpty條件變量上wait,但是此時pool線程沒有辦法再對這個條件變量notify了。
發(fā)生死鎖了!!!
2、線程池里的線程先獲取到鎖,發(fā)生任務(wù)隊列為空,在條件變量notEmpty上wait了,釋放鎖,然后pool線程搶到鎖,只是看exitCond條件變量的wait條件,看size還是大于0,還是死鎖了。
解決方法:pool線程獲取到鎖后再notify
//線程池析構(gòu)
ThreadPool::~ThreadPool()
{isPoolRunning_ = false;//等待線程池里面所有的線程返回 有兩種狀態(tài):阻塞 & 正在執(zhí)行任務(wù)中std::unique_lock<std::mutex> lock(taskQueMtx_);notEmpty_.notify_all();exitCond_.wait(lock, [&]()->bool {return threads_.size() == 0; });
}
?我們在消費者線程進行鎖+雙重判斷:
//定義線程函數(shù) 線程池的所有線程從任務(wù)隊列里面消費任務(wù)
void ThreadPool::threadFunc(int threadid)//線程函數(shù)返回,相應(yīng)的線程也就結(jié)束了
{auto lastTime = std::chrono::high_resolution_clock().now();//所有任務(wù)必須執(zhí)行完成,線程池才可以回收所有線程資源for (;;){std::shared_ptr<Task> task;{//先獲取鎖,我們要注意控制鎖的范圍,取完任務(wù),就釋放鎖std::unique_lock<std::mutex> lock(taskQueMtx_);std::cout << "tid:" << std::this_thread::get_id()<< "嘗試獲取任務(wù)..." << std::endl;//cached模式下,有可能已經(jīng)創(chuàng)建了很多的線程,但是空閑時間超過60s,應(yīng)該把多余的線程//結(jié)束回收掉(超過initThreadSize_數(shù)量的線程要進行回收)//當(dāng)前時間 - 上一次線程執(zhí)行的時間 > 60s//每一秒中返回一次 怎么區(qū)分:超時返回?還是有任務(wù)待執(zhí)行返回//鎖 + 雙重判斷while (taskQue_.size() == 0){//線程池要結(jié)束,回收線程資源if (!isPoolRunning_){threads_.erase(threadid);//std::this_thread::getid()std::cout << "threadid:" << std::this_thread::get_id() << " exit!"<< std::endl;exitCond_.notify_all();return;//線程函數(shù)結(jié)束,線程結(jié)束}if (poolMode_ == PoolMode::MODE_CACHED){//條件變量,超時返回了if (std::cv_status::timeout ==notEmpty_.wait_for(lock, std::chrono::seconds(1))){auto now = std::chrono::high_resolution_clock().now();auto dur = std::chrono::duration_cast<std::chrono::seconds>(now - lastTime);if (dur.count() >= THREAD_MAX_IDLE_TIME&& curThreadSize_ > initThreadSize_)//任務(wù)數(shù)量大于空閑線程數(shù)量{//開始回收當(dāng)前線程//記錄線程數(shù)量的相關(guān)變量的值修改//把線程對象從線程列表容器中刪除 沒有辦法 threadFunc《=》thread對象//通過threadid => thread對象 => 刪除threads_.erase(threadid);//std::this_thread::getid()curThreadSize_--;idleThreadSize_--;std::cout << "threadid:" << std::this_thread::get_id() << " exit!"<< std::endl;return;}}}else{//等待notEmpty條件notEmpty_.wait(lock);}//if (!isPoolRunning_)//{// threads_.erase(threadid);//std::this_thread::getid()// std::cout << "threadid:" << std::this_thread::get_id() << " exit!"// << std::endl;// exitCond_.notify_all();// return;//結(jié)束線程函數(shù),就是結(jié)束當(dāng)前線程了!//}}idleThreadSize_--;std::cout << "tid:" << std::this_thread::get_id()<< "獲取任務(wù)成功..." << std::endl;//從任務(wù)隊列種取一個任務(wù)出來task = taskQue_.front();taskQue_.pop();taskSize_--;//如果依然有剩余任務(wù),繼續(xù)通知其它得線程執(zhí)行任務(wù)if (taskQue_.size() > 0){notEmpty_.notify_all();}//取出一個任務(wù),進行通知,通知可以繼續(xù)提交生產(chǎn)任務(wù)notFull_.notify_all();} //就應(yīng)該把鎖釋放掉//當(dāng)前線程負(fù)責(zé)執(zhí)行這個任務(wù)if (task != nullptr){//task->run();//執(zhí)行任務(wù);把任務(wù)的返回值setVal方法給到Result,基類指針調(diào)用派生類對象的同名覆蓋方法task->exec();//用戶還是使用run方法}idleThreadSize_++;lastTime = std::chrono::high_resolution_clock().now();//更新線程執(zhí)行完任務(wù)的時間}
}