網(wǎng)站建設(shè) 搜狐seo綜合查詢工具
目錄
- 簡(jiǎn)介
- 手寫(xiě)線程池
- 線程池結(jié)構(gòu)體分析
- task_t
- task_queue_t
- thread_pool_t
- 線程池函數(shù)分析
- thread_pool_create
- thread_pool_post
- thread_worker
- thread_pool_destroy
- wait_all_done
- thread_pool_free
- 主函數(shù)調(diào)用
- 運(yùn)行結(jié)果
簡(jiǎn)介
本線程池采用C語(yǔ)言實(shí)現(xiàn)
線程池的場(chǎng)景:
當(dāng)某些任務(wù)特別耗時(shí)(例如大量的IO讀寫(xiě)操作),嚴(yán)重影響線程其他的任務(wù)的執(zhí)行,可以使用線程池
線程池的一般特點(diǎn):
線程池通常是一個(gè)生產(chǎn)者-消費(fèi)者模型
生產(chǎn)者線程用于發(fā)布任務(wù),任務(wù)通常保存在任務(wù)隊(duì)列中
線程池作為消費(fèi)者,用于取出任務(wù),執(zhí)行任務(wù)
線程池中線程數(shù)量的選擇:
有一個(gè)經(jīng)驗(yàn)公式: 線程數(shù)量 =(io等待時(shí)間+cpu運(yùn)算時(shí)間)*核心數(shù)/cpu運(yùn)算時(shí)間
因此可以根據(jù)經(jīng)驗(yàn)公式得出下面兩種場(chǎng)景的線程數(shù)量:
- cpu密集任務(wù):線程數(shù)量=核心數(shù)(即上面的公式假設(shè)cpu運(yùn)算時(shí)間>>io等待時(shí)間)
- io密集任務(wù):線程數(shù)量=2*n+2
手寫(xiě)線程池
線程池代碼結(jié)構(gòu):
- thread_pool_create:創(chuàng)建線程池所需要的資源,包含不限于任務(wù)隊(duì)列,子線程的創(chuàng)建。
- thread_pool_post:用于任務(wù)的發(fā)布,將執(zhí)行任務(wù)存在任務(wù)隊(duì)列中。
- thread_pool_destroy:用于線程池的退出,以及資源的銷毀。
- wait_all_done:join線程池所有子線程,等待回收子線程。
- thread_worker:用于任務(wù)執(zhí)行。
主要的核心點(diǎn)集中在thread_pool_post和thread_worker兩個(gè)函數(shù)中,這兩個(gè)函數(shù)也構(gòu)成了生產(chǎn)者-消費(fèi)者模型。本文采用隊(duì)列+互斥鎖+條件變量實(shí)現(xiàn)。
線程池結(jié)構(gòu)體分析
由于C語(yǔ)言不像C++可以用類封裝函數(shù),因此線程池會(huì)使用結(jié)構(gòu)體來(lái)封裝一些變量或者函數(shù)指針。
task_t
封裝任務(wù)的入口指針以及參數(shù)。
typedef struct task_t {handler_pt func;void * arg;
} task_t;
task_queue_t
封裝任務(wù)隊(duì)列,為了不頻繁移動(dòng)隊(duì)列中數(shù)據(jù),此處采用頭尾索引來(lái)標(biāo)記任務(wù)。
typedef struct task_queue_t {uint32_t head;uint32_t tail;uint32_t count;task_t *queue;
} task_queue_t;
thread_pool_t
包含互斥鎖,條件變量,任務(wù)隊(duì)列等信息
struct thread_pool_t {pthread_mutex_t mutex;pthread_cond_t condition; //條件變量pthread_t *threads; //線程task_queue_t task_queue; //任務(wù)隊(duì)列int closed; //是否關(guān)閉線程池執(zhí)行的標(biāo)志,為1表示關(guān)閉int started; // 當(dāng)前正在運(yùn)行的線程數(shù)int thrd_count; //線程數(shù)int queue_size; //任務(wù)隊(duì)列大小
};
其中closed:表示是否關(guān)閉線程池執(zhí)行的標(biāo)志,為1表示關(guān)閉。在線程的運(yùn)行函數(shù)中,用來(lái)判斷是否繼續(xù)循環(huán)等待執(zhí)行任務(wù)隊(duì)列中的任務(wù)。
started:表示當(dāng)前正在運(yùn)行的線程數(shù)。在thread_pool_destroy函數(shù)中銷毀線程池時(shí),需要等待所有線程停止才行,即started == 0
線程池函數(shù)分析
thread_pool_create
創(chuàng)建線程池,初始化一些線程池屬性
通過(guò)循環(huán)pthread_create函數(shù)創(chuàng)建子線程。
thread_pool_t *thread_pool_create(int thrd_count, int queue_size) {thread_pool_t *pool;if (thrd_count <= 0 || queue_size <= 0) {return NULL;}pool = (thread_pool_t*) malloc(sizeof(*pool));if (pool == NULL) {return NULL;}pool->thrd_count = 0;pool->queue_size = queue_size;pool->task_queue.head = 0;pool->task_queue.tail = 0;pool->task_queue.count = 0;pool->started = pool->closed = 0;pool->task_queue.queue = (task_t*)malloc(sizeof(task_t)*queue_size);if (pool->task_queue.queue == NULL) {// TODO: free poolreturn NULL;}pool->threads = (pthread_t*) malloc(sizeof(pthread_t) * thrd_count);if (pool->threads == NULL) {// TODO: free poolreturn NULL;}int i = 0;for (; i < thrd_count; i++) {if (pthread_create(&(pool ->threads[i]), NULL, thread_worker, (void*)pool) != 0) {// TODO: free poolreturn NULL;}pool->thrd_count++;pool->started++;}return pool;
}
thread_pool_post
作為生產(chǎn)者,往任務(wù)隊(duì)列里面添加任務(wù)
通過(guò)pthread_cond_signal通知子喚醒子線程的pthread_cond_wait
int thread_pool_post(thread_pool_t *pool, handler_pt func, void *arg) {if (pool == NULL || func == NULL) {return -1;}task_queue_t *task_queue = &(pool->task_queue);
//此處用自旋鎖會(huì)更節(jié)省消耗,因?yàn)殒i里面的邏輯比較簡(jiǎn)單if (pthread_mutex_lock(&(pool->mutex)) != 0) {return -2;}if (pool->closed) {pthread_mutex_unlock(&(pool->mutex));return -3;}if (task_queue->count == pool->queue_size) {pthread_mutex_unlock(&(pool->mutex));return -4;}
//避免queue數(shù)據(jù)的變化,采用頭尾索引來(lái)標(biāo)識(shí)task_queue->queue[task_queue->tail].func = func;task_queue->queue[task_queue->tail].arg = arg;task_queue->tail = (task_queue->tail + 1) % pool->queue_size;task_queue->count++;
//喚醒一個(gè)休眠的線程if (pthread_cond_signal(&(pool->condition)) != 0) {pthread_mutex_unlock(&(pool->mutex));return -5;}pthread_mutex_unlock(&(pool->mutex));return 0;
}
thread_worker
pthread_cond_wait等待任務(wù)的喚醒
作為消費(fèi)者, (*(task.func))(task.arg);執(zhí)行任務(wù)
static void *thread_worker(void *thrd_pool) {thread_pool_t *pool = (thread_pool_t*)thrd_pool;task_queue_t *que;task_t task;for (;;) {pthread_mutex_lock(&(pool->mutex));que = &pool->task_queue;while (que->count == 0 && pool->closed == 0) {// 阻塞在 condition,等待任務(wù)隊(duì)列添加任務(wù)pthread_cond_wait(&(pool->condition), &(pool->mutex));}if (pool->closed == 1 && que->count == 0) break;//沒(méi)有任務(wù),并且關(guān)閉標(biāo)志打開(kāi),即跳出循環(huán)task = que->queue[que->head];que->head = (que->head + 1) % pool->queue_size;que->count--;pthread_mutex_unlock(&(pool->mutex));(*(task.func))(task.arg);//執(zhí)行對(duì)應(yīng)任務(wù)函數(shù)}pool->started--;//跳出循環(huán)之后,運(yùn)行線程數(shù)需要減1pthread_mutex_unlock(&(pool->mutex));pthread_exit(NULL);return NULL;
}
thread_pool_destroy
銷毀釋放線程池,置 pool->closed = 1;
通過(guò)pthread_cond_broadcast喚醒線程池所有線程,這個(gè)和thread_pool_post里的pthread_cond_signal一樣,并且broadcast會(huì)通知到所有的線程
int thread_pool_destroy(thread_pool_t *pool) {if (pool == NULL) {return -1;}if (pthread_mutex_lock(&(pool->mutex)) != 0) {return -2;}if (pool->closed) {thread_pool_free(pool);return -3;}pool->closed = 1;
//廣播形式,通知所有阻塞在condition的線程接觸阻塞if (pthread_cond_broadcast(&(pool->condition)) != 0 || pthread_mutex_unlock(&(pool->mutex)) != 0) {thread_pool_free(pool);return -4;}wait_all_done(pool);thread_pool_free(pool);return 0;
}
wait_all_done
將所有線程通過(guò)pthread_join回收,所有子線程任務(wù)執(zhí)行完畢,回收線程
int wait_all_done(thread_pool_t *pool) {printf("wait_all_done start!pool->thrd_count:%d\n", pool->thrd_count);int i, ret=0;for (i=0; i < pool->thrd_count; i++) {printf("wait_all_done doing! i:%d\n", i);if (pthread_join(pool->threads[i], NULL) != 0) {ret=1;}}printf("wait_all_done end!\n");return ret;
}
thread_pool_free
釋放線程池空間
static void thread_pool_free(thread_pool_t *pool) {if (pool == NULL || pool->started > 0) {return;}if (pool->threads) {free(pool->threads);pool->threads = NULL;pthread_mutex_lock(&(pool->mutex));pthread_mutex_destroy(&pool->mutex);pthread_cond_destroy(&pool->condition);}if (pool->task_queue.queue) {free(pool->task_queue.queue);pool->task_queue.queue = NULL;}free(pool);
}
主函數(shù)調(diào)用
#include <stdio.h>
#include <stdlib.h>
#include <pthread.h>
#include <unistd.h>#include "thrd_pool.h"int nums = 0;
int done = 0;
int task_num = 100;pthread_mutex_t lock;void do_task(void *arg) {usleep(10000);pthread_mutex_lock(&lock);done++;printf("doing %d task\n", done);pthread_mutex_unlock(&lock);
}int main(int argc, char **argv) {int threads = 8;int queue_size = 256;if (argc == 2) {threads = atoi(argv[1]);if (threads <= 0) {printf("threads number error: %d\n", threads);return 1;}} else if (argc > 2) {threads = atoi(argv[1]);queue_size = atoi(argv[1]);if (threads <= 0 || queue_size <= 0) {printf("threads number or queue size error: %d,%d\n", threads, queue_size);return 1;}}thread_pool_t *pool = thread_pool_create(threads, queue_size);if (pool == NULL) {printf("thread pool create error!\n");return 1;}while (thread_pool_post(pool, &do_task, NULL) == 0) {pthread_mutex_lock(&lock);nums++;pthread_mutex_unlock(&lock);if (nums > task_num) break;}printf("add %d tasks\n", nums);usleep(1000000);//延時(shí)等待所有的作業(yè)完成printf("did %d tasks\n", done);thread_pool_destroy(pool);return 0;
}
運(yùn)行結(jié)果
使用指令編譯文件:
gcc main.c thrd_pool.c -o main -lpthread
運(yùn)行執(zhí)行文件得到運(yùn)行結(jié)果
完整代碼下載線程池Linux C語(yǔ)言簡(jiǎn)單版本