RM新时代网站-首页

0
  • 聊天消息
  • 系統(tǒng)消息
  • 評(píng)論與回復(fù)
登錄后你可以
  • 下載海量資料
  • 學(xué)習(xí)在線課程
  • 觀看技術(shù)視頻
  • 寫文章/發(fā)帖/加入社區(qū)
會(huì)員中心
創(chuàng)作中心

完善資料讓更多小伙伴認(rèn)識(shí)你,還能領(lǐng)取20積分哦,立即完善>

3天內(nèi)不再提示

線程池的基本概念

科技綠洲 ? 來(lái)源:Linux開發(fā)架構(gòu)之路 ? 作者:Linux開發(fā)架構(gòu)之路 ? 2023-11-10 16:37 ? 次閱讀

線程池的基本概念

不管線程池是什么東西!但是我們必須知道線程池被搞出來(lái)的目的就是:提高程序執(zhí)行效率而設(shè)計(jì)出來(lái)的;

了解了線程池的目的后:我們就可以開始理解線程池:

首先回答一個(gè)問題:為什么會(huì)有線程池?

呃呃,我這么問就很奇怪,因?yàn)榫€程池是什么我都沒說(shuō),怎么會(huì)知道為什么會(huì)有線程池呢?所以我打算帶大家去思考一個(gè)場(chǎng)景:

當(dāng)我們的程序中:有一批任務(wù)到來(lái)時(shí)候(通常該任務(wù)都是從網(wǎng)絡(luò)來(lái)的),我們就會(huì)創(chuàng)建一堆線程去處理這一批任務(wù);

雖然說(shuō)創(chuàng)建線程的成本開銷并不大,但是這里有個(gè)問題:當(dāng)我們?nèi)蝿?wù)來(lái)到時(shí)候,你才去創(chuàng)建線程去處理這個(gè)任務(wù),你不覺得這樣很慢嗎?

是否我們可以換個(gè)思路:假如我們有一種手段:使得任務(wù)一到來(lái),就可以馬上有線程去處理這批任務(wù),這樣是不是相對(duì)于前面等線程來(lái)到,再創(chuàng)建線程去處理時(shí)候快得多;

所以說(shuō):線程池就是基于上面的思路設(shè)計(jì)的;線程池就是:預(yù)先創(chuàng)建好一大批線程,同時(shí)線程池維護(hù)一個(gè)隊(duì)列,來(lái)存放到來(lái)的任務(wù),當(dāng)隊(duì)列中一旦有任務(wù)時(shí)候,預(yù)先創(chuàng)建好的一大批線程就可以并發(fā)處理這一批任務(wù)了;

我們抽象出一個(gè)模型:

任務(wù)派發(fā)者是誰(shuí)? 是生產(chǎn)者;

任務(wù)存儲(chǔ)的隊(duì)列是什么?是一個(gè)容器,數(shù)組,鏈表,只要是可以存放產(chǎn)品(數(shù)據(jù))的東西即可;

拿任務(wù)去處理的是誰(shuí)?是消費(fèi)者;

所以說(shuō):線程池本質(zhì)就是一個(gè)生產(chǎn)者消費(fèi)者的模型;

而我們線程池只需要關(guān)注兩個(gè)點(diǎn):一個(gè)存放任務(wù)的隊(duì)列,和消費(fèi)隊(duì)列任務(wù)的消費(fèi)者即可;而生產(chǎn)者暫時(shí)不用關(guān)注,因?yàn)樯a(chǎn)者是你外部搞出任務(wù)丟給線程池去使用;那么什么時(shí)候可以關(guān)心生產(chǎn)者呢?

也就是當(dāng)我們?nèi)ナ褂镁€程池的時(shí)候咯;這不就是妥妥的生產(chǎn)者消費(fèi)者模型嘛!

圖片

線程池實(shí)現(xiàn)的基本思路:

在各個(gè)編程語(yǔ)言的語(yǔ)種中都有線程池的概念,并且很多語(yǔ)言中直接提供了線程池,作為程序猿直接使用就可以了,下面給大家介紹一下線程池的實(shí)現(xiàn)原理:

線程池的組成主要分為 3 個(gè)部分,這三部分配合工作就可以得到一個(gè)完整的線程池:

任務(wù)隊(duì)列,存儲(chǔ)需要處理的任務(wù),由工作的線程來(lái)處理這些任務(wù)

通過(guò)線程池提供的 API 函數(shù),將一個(gè)待處理的任務(wù)添加到任務(wù)隊(duì)列,或者從任務(wù)隊(duì)列中刪除;

已處理的任務(wù)會(huì)被從任務(wù)隊(duì)列中刪除;

線程池的使用者,也就是調(diào)用線程池函數(shù)往任務(wù)隊(duì)列中添加任務(wù)的線程就是生產(chǎn)者線程;

工作的線程(任務(wù)隊(duì)列任務(wù)的消費(fèi)者) ,N個(gè)

線程池中維護(hù)了一定數(shù)量的工作線程,他們的作用是是不停的讀任務(wù)隊(duì)列,從里邊取出任務(wù)并處理

工作的線程相當(dāng)于是任務(wù)隊(duì)列的消費(fèi)者角色;

如果任務(wù)隊(duì)列為空,工作的線程將會(huì)被阻塞 (使用條件變量 / 信號(hào)量阻塞);

如果阻塞之后有了新的任務(wù),由生產(chǎn)者將阻塞解除,工作線程開始工作;

管理者線程(不處理任務(wù)隊(duì)列中的任務(wù)),1個(gè)

它的任務(wù)是周期性的對(duì)任務(wù)隊(duì)列中的任務(wù)數(shù)量以及處于忙狀態(tài)的工作線程個(gè)數(shù)進(jìn)行檢測(cè)

當(dāng)任務(wù)過(guò)多的時(shí)候,可以適當(dāng)?shù)膭?chuàng)建一些新的工作線程;

當(dāng)任務(wù)過(guò)少的時(shí)候,可以適當(dāng)?shù)匿N毀一些工作的線程;

線程池的代碼

1.任務(wù)隊(duì)列的任務(wù)結(jié)構(gòu)體

對(duì)于任務(wù)隊(duì)列:

里面存放的都是函數(shù)指針,該函數(shù)指針指向的就是處理任務(wù)的函數(shù);

同時(shí)還要維護(hù)一個(gè)任務(wù)函數(shù)的形參;

typedef struct Task
{
    void (*function)(void *args); //任務(wù)的函數(shù)指針
    void *args; //任務(wù)函數(shù)的形參
} Task;

2. 線程池的定義

線程池里面最重要的是:

一個(gè)任務(wù)隊(duì)列;
多個(gè)消費(fèi)者線程IDs;
一個(gè)管理者線程ID;
管理線程池的鎖;
管理任務(wù)隊(duì)列是否為滿和空的條件變量;

還有一些其他的輔助成員變量;

struct ThreadPool
{
    Task *taskQ; //任務(wù)隊(duì)列
    /*對(duì)于一個(gè)任務(wù)隊(duì)列:我們需要知道以下信息*/
    int queueCapacity; //隊(duì)列的容量
    int queueSize;     //當(dāng)前任務(wù)的個(gè)數(shù)
    int queueFront;    //隊(duì)頭取任務(wù)
    int queueRear;     //隊(duì)尾放任務(wù)

    /*有了任務(wù)隊(duì)列后,還要有管理任務(wù)隊(duì)列的線程和從任務(wù)隊(duì)列拿任務(wù)的線程*/
    pthread_t managerID; //管理者線程
    /*設(shè)置為指針的目的:工作線程有多個(gè)*/
    pthread_t *threadIDs; //工作線程(也就是消費(fèi)者)

    /*對(duì)于工作線程我們要知道以下這幾個(gè)消息方便管理*/
    int minNum;  //最少的工作線程數(shù)
    int maxNum;  //最多的工作線程數(shù)
    int busyNum; //正在工作的線程數(shù),也就是正在獲取任務(wù)處理的線程
    int liveNum; //存貨的工作線程數(shù)(也就是被喚醒的線程,卻沒有資格去獲取任務(wù)的線程)
    int exitNum; //銷毀的工作線程數(shù)(因?yàn)榭赡芄ぷ骶€程存在,但是卻不工作,我們需要?dú)⒌粢恍┎槐匾木€程)

    /*  由于任務(wù)隊(duì)列為臨界資源:
        工作線程(消費(fèi)者)可能有多個(gè)會(huì)同時(shí)競(jìng)爭(zhēng)該資源
        同時(shí)多生產(chǎn)者線程之間(也就是往任務(wù)隊(duì)列放任務(wù)的線程)也會(huì)競(jìng)爭(zhēng)該資源
        所以我們要保證互斥訪問線程池的任務(wù)隊(duì)列
    */
    pthread_mutex_t mutexpool;    //鎖整個(gè)線程池
    pthread_mutex_t mutexbusyNum; //鎖增在工作線程的數(shù)量
    /*由于任務(wù)隊(duì)列滿,或者為空:
      生產(chǎn)者和消費(fèi)者都需要阻塞
      所以需要條件變量,來(lái)保證
    */
    pthread_cond_t notFull;  //判斷線程池是否為滿
    pthread_cond_t notEmpty; //判斷線程池是否為空

    /*輔助成員主要判斷該線程池是否還在工作*/
    int shutdown; //判斷是否需要銷毀線程池,是0不銷毀,是1銷毀
};

線程池的頭文件聲明

#pragma once
#include < pthread.h >
#include < string.h >
#include < unistd.h >
#include < malloc.h >
#include< stdio.h >

typedef struct ThreadPool ThreadPool; //線程池結(jié)構(gòu)體,這里聲明的原因是結(jié)構(gòu)體定義在線程池源文件中

//創(chuàng)建線程池并初始化
ThreadPool* threadPoolCreate(int min,int max,int queueSize);

//銷毀線程池
int threadPoolDestroy(ThreadPool* pool);

//給線程池添加任務(wù)
void threadPoolAdd(ThreadPool* pool,void(*functions)(void*),void* args);

//獲取線程池工作線程的個(gè)數(shù)
int threadBusyNum (ThreadPool* pool);

//獲取線程池存活的線程的個(gè)數(shù)
int threadLiveNum (ThreadPool* pool);
//工作線程
void* worker (void* args);
//管理線程
void* manager (void* args);

//線程退出函數(shù)
void threadExit(ThreadPool* pool);

線程池的源文件

#include"thread_pool.h"

const int WORK_THREAD_NUMBER = 2; //管理者線程要添加的工作線程個(gè)數(shù),和銷毀的線程個(gè)數(shù)
/*
線程池:首先要有個(gè)任務(wù)隊(duì)列,在C語(yǔ)言中,
任務(wù)隊(duì)列是需要自己定義的,C++中可以直接使用容器queue
*/
//任務(wù)隊(duì)列存放的任務(wù)就是一個(gè)函數(shù)指針
typedef struct Task
{
    void (*function)(void *args);
    void *args;
} Task;

//再搞出一個(gè)線程池

struct ThreadPool
{
    Task *taskQ; //任務(wù)隊(duì)列
    /*對(duì)于一個(gè)任務(wù)隊(duì)列:我們需要知道以下信息*/
    int queueCapacity; //隊(duì)列的容量
    int queueSize;     //當(dāng)前任務(wù)的個(gè)數(shù)
    int queueFront;    //隊(duì)頭取任務(wù)
    int queueRear;     //隊(duì)尾放任務(wù)

    /*有了任務(wù)隊(duì)列后,還要有管理任務(wù)隊(duì)列的線程和從任務(wù)隊(duì)列拿任務(wù)的線程*/
    pthread_t managerID; //管理者線程
    /*設(shè)置為指針的目的:工作線程有多個(gè)*/
    pthread_t *threadIDs; //工作線程(也就是消費(fèi)者)

    /*對(duì)于工作線程我們要知道以下這幾個(gè)消息方便管理*/
    int minNum;  //最少的工作線程數(shù)
    int maxNum;  //最多的工作線程數(shù)
    int busyNum; //正在工作的線程數(shù),也就是正在獲取任務(wù)處理的線程
    int liveNum; //存貨的工作線程數(shù)(也就是被喚醒的線程,卻沒有資格去獲取任務(wù)的線程)
    int exitNum; //銷毀的工作線程數(shù)(因?yàn)榭赡芄ぷ骶€程存在,但是卻不工作,我們需要?dú)⒌粢恍┎槐匾木€程)

    /*  由于任務(wù)隊(duì)列為臨界資源:
        工作線程(消費(fèi)者)可能有多個(gè)會(huì)同時(shí)競(jìng)爭(zhēng)該資源
        同時(shí)多生產(chǎn)者線程之間(也就是往任務(wù)隊(duì)列放任務(wù)的線程)也會(huì)競(jìng)爭(zhēng)該資源
        所以我們要保證互斥訪問線程池的任務(wù)隊(duì)列
    */
    pthread_mutex_t mutexpool;    //鎖整個(gè)線程池
    pthread_mutex_t mutexbusyNum; //鎖增在工作線程的數(shù)量
    /*由于任務(wù)隊(duì)列滿,或者為空:
      生產(chǎn)者和消費(fèi)者都需要阻塞
      所以需要條件變量,來(lái)保證
    */
    pthread_cond_t notFull;  //判斷線程池是否為滿
    pthread_cond_t notEmpty; //判斷線程池是否為空

    /*輔助成員主要判斷該線程池是否還在工作*/
    int shutdown; //判斷是否需要銷毀線程池,是0不銷毀,是1銷毀
};

//************************************************************************************************

/*由于我們的線程池被創(chuàng)建出來(lái)時(shí)候,就必須保證存在的,
    所以我們返回值要設(shè)計(jì)為指針類型,不能是賦值拷貝的形式
    并且如何考慮線程池需要傳入什么參數(shù)初始化呢?
*/
ThreadPool *threadPoolCreate(int min, int max, int queueSize)
{
    //先搞出一個(gè)線程池
    ThreadPool *pool = (ThreadPool *)malloc(sizeof(ThreadPool));
    do // do while(0)的設(shè)計(jì)是為了,假設(shè)開辟線程池,消費(fèi)者線程IDs,任務(wù)隊(duì)列空間失敗,可以直接跳出循環(huán)統(tǒng)一處理釋放空間
    {
        if (pool == NULL)
        {
            printf("malloc threadPool is failedn");
            break;
        }
        //搞出線程池后開始初始化里面的數(shù)據(jù)成員

        //首先先搞出消費(fèi)者線程出來(lái)
        pool- >threadIDs = (pthread_t *)malloc(sizeof(pthread_t) * max);
        if (pool- >threadIDs == NULL)
        {
            printf("malloc threadIDs is failedn");
            /*如果沒有do while(0)的設(shè)計(jì),這里直接返回,那么前面的pool內(nèi)存池的空間沒有被釋放,這就會(huì)內(nèi)存泄漏了*/
            // return NULL;

            //基于上面的注釋考慮,這里設(shè)計(jì)break;退出dowhile(0)然后處理
            break;
        }
        //初始化消費(fèi)者線程ID
        /*這么做的目的是:在管理者線程中可以通過(guò)判斷線程ID是否為0,來(lái)說(shuō)明該消費(fèi)者線程是否被占用*/
        memset(pool- >threadIDs, 0, sizeof(pthread_t) * max);
        //初始化線程池的其他成員屬性
        pool- >minNum = min;
        pool- >maxNum = max;
        pool- >busyNum = 0;
        pool- >liveNum = min;
        pool- >exitNum = 0;
        //初始化鎖和條件變量
        if (pthread_mutex_init(&pool- >mutexpool, NULL) != 0 ||
            pthread_mutex_init(&pool- >mutexpool, NULL) != 0 ||
            pthread_cond_init(&pool- >notEmpty, NULL) != 0 ||
            pthread_cond_init(&pool- >notFull, NULL) != 0)
        {
            perror("mutex or condition failed:");
        }
        //初始化任務(wù)隊(duì)列
        pool- >taskQ = (Task *)malloc(sizeof(Task) * queueSize);
        if (pool- >taskQ == NULL)
        {
            printf("malloc taskQ is failedn");
            break;
        }
        pool- >queueCapacity = queueSize;
        pool- >queueSize = 0;
        pool- >queueFront = 0;
        pool- >queueRear = 0;
        //剛開始不關(guān)閉線程池
        pool- >shutdown = 0;

        //創(chuàng)建管理者線程和消費(fèi)者線程
        pthread_create(&pool- >managerID, NULL, manager, (void *)pool);
        int i = 0;
        for (; i < min; ++i)
        {
            /*消費(fèi)線程需要消費(fèi)的是任務(wù),
            也就是taskQ,而taskQ又是pool的一個(gè)成員屬性
            所以傳參時(shí)候,我們傳入pool就可以獲得taskQ了
            */
            pthread_create(&pool- >threadIDs[i], NULL, worker, (void *)pool);
        }

        //創(chuàng)建成功初始化后,那么就可以把線程池返回去了
        return pool;
    } while (0);
    //如果break出來(lái),那么就是異常的開辟空間失敗,要釋放資源
    if (pool)
        free(pool);
    if (pool && pool- >threadIDs)
        free(pool- >threadIDs);
    if (pool && pool- >taskQ)
        free(pool- >taskQ);

    return NULL;
}

//判斷任務(wù)隊(duì)列是否為空
static int taskQIsEmpty(ThreadPool *pool)
{
    return pool- >queueSize == 0;
}
//判斷線程池是否還工作
static int isShutDown(ThreadPool *pool)
{
    return pool- >shutdown == 1 ? 1 : 0;
}

//消費(fèi)者線程
void *worker(void *args)
{
    ThreadPool *pool = (ThreadPool *)args;
    /*設(shè)計(jì)為死循環(huán)是:消費(fèi)者要不斷從任務(wù)隊(duì)列拿任務(wù)來(lái)處理*/
    while (1)
    {
        pthread_mutex_lock(&pool- >mutexpool);
        //消費(fèi)數(shù)據(jù)之前,要判斷任務(wù)隊(duì)列是否為空,空就需要掛起該線程
        while (taskQIsEmpty(pool) && !isShutDown(pool))
        {
            pthread_cond_wait(&pool- >notEmpty, &pool- >mutexpool);

            //線程被喚醒后,判斷是否需要銷毀該線程,因?yàn)橛芯€程是多余的
            if (pool- >exitNum > 0)
            {
                pool- >exitNum--;
                if (pool- >liveNum > pool- >minNum)
                {
                    pool- >liveNum--;
                    pthread_mutex_unlock(&pool- >mutexpool); //退出線程前解鎖,防止死鎖問題
                    threadExit(pool);
                }
            }
        }
        //還需要判斷線程池是否關(guān)閉了,關(guān)閉了就退出消費(fèi)者線程即可
        if (isShutDown(pool))
        {
            pthread_mutex_unlock(&pool- >mutexpool);
            threadExit(pool);
        }
        //開始消費(fèi)者拿任務(wù)
        Task task;                                              //保存任務(wù)的變量
        task.function = pool- >taskQ[pool- >queueFront].function; //獲取到任務(wù)隊(duì)列的任務(wù),就是一個(gè)函數(shù)指針
        task.args = pool- >taskQ[pool- >queueFront].args;           //獲取任務(wù)隊(duì)列任務(wù)的函數(shù)指針參數(shù)

        //控制任務(wù)隊(duì)列的指針移動(dòng)
        pool- >queueFront++;
        pool- >queueFront %= pool- >queueCapacity;
        pool- >queueSize--;

        pthread_mutex_unlock(&pool- >mutexpool);
         //喚醒生產(chǎn)者
        pthread_cond_signal(&pool- >notFull);

        //拿到任務(wù)后就是處理任務(wù)

        // 1.處理任務(wù)前,先處理busyNum
        pthread_mutex_lock(&pool- >mutexbusyNum);
        pool- >busyNum++;
        pthread_mutex_unlock(&pool- >mutexbusyNum);

        // 2. 這里處理任務(wù)就是調(diào)用任務(wù)函數(shù)
        task.function(task.args);
        //任務(wù)處理完就釋放參數(shù)的空間
        free(task.args);
        task.args = NULL;

        printf("thread %ld ending working ... n", pthread_self());
        // 3.處理完任務(wù)對(duì)其busyNum操作
        pthread_mutex_lock(&pool- >mutexbusyNum);
        pool- >busyNum--;
        pthread_mutex_unlock(&pool- >mutexbusyNum);
    }
}
//管理者線程
/*
主要是管理創(chuàng)建線程和銷毀線程

*/
void *manager(void *args)
{
    ThreadPool *pool = (ThreadPool *)args;
    //只要線程池沒關(guān)閉,那么管理者線程就一直工作
    while (!isShutDown(pool))
    {
        //自己定制的檢查策略:我設(shè)置每個(gè)三秒檢測(cè)
        sleep(3);

        //取出線程池任務(wù)的數(shù)量和消費(fèi)者的工作線程數(shù)量
        pthread_mutex_lock(&pool- >mutexpool);
        int queueSize = pool- >queueSize;
        int liveNum = pool- >liveNum;
        pthread_mutex_unlock(&pool- >mutexpool);

        //獲取忙的消費(fèi)者線程數(shù)量
        pthread_mutex_lock(&pool- >mutexbusyNum);
        int busyNum = pool- >busyNum;
        pthread_mutex_unlock(&pool- >mutexbusyNum);

        //開始管理線程
        // 1.添加消費(fèi)者線程
        /*制定添加規(guī)則(也是自己設(shè)定的)
            任務(wù)的個(gè)數(shù) > 存活的線程個(gè)數(shù) && 存活的線程個(gè)數(shù) < 最大的線程個(gè)數(shù)
        */
        if (queueSize > liveNum && liveNum < pool- >maxNum)
        {
            pthread_mutex_lock(&pool- >mutexpool); //這個(gè)鎖主要是操作了liveNum這個(gè)資源

            int counter = 0; // counter表示要添加的消費(fèi)者線程數(shù)量
            //遍歷 消費(fèi)者線程IDs數(shù)組,看看哪個(gè)位置可以放入新添加的線程
            int i = 0;
            for (; i < pool- >maxNum &&
                   counter < WORK_THREAD_NUMBER &&
                   pool- >liveNum < pool- >maxNum;
                 i++)
            {
                //為0表示消費(fèi)者線程數(shù)組的位置可以放入線程ID
                if (pool- >threadIDs[i] == 0)
                {
                    pthread_create(&pool- >threadIDs[i], NULL, worker, pool);
                    counter++;
                    liveNum++;
                }
            }
            pthread_mutex_unlock(&pool- >mutexpool);
        }

        //由于線程過(guò)多,可能要進(jìn)行銷毀
        // 2. 銷毀消費(fèi)者線程
        /*
             銷毀線程的策略:
              存活的線程數(shù)量 >忙的線程數(shù)量*2 && 存活線程數(shù)量 >最小線程數(shù)量
        */
        if (liveNum > busyNum * 2 && liveNum > pool- >minNum)
        {
            pthread_mutex_lock(&pool- >mutexpool);
            pool- >exitNum = WORK_THREAD_NUMBER;
            pthread_mutex_unlock(&pool- >mutexpool);

            //讓工作者線程去自殺
            /*如何讓他自殺呢?
              由于線程池有多余的消費(fèi)者線程不工作
              我們可以通過(guò)喚醒消費(fèi)者線程,讓他去自己消亡
            */
            int i = 0;
            for (; i < WORK_THREAD_NUMBER; i++)
            {
                pthread_cond_signal(&pool- >notEmpty);
            }
        }
    }
}

//線程退出函數(shù)
void threadExit(ThreadPool *pool)
{
    pthread_t tid = pthread_self();
    int i = 0;
    //遍歷消費(fèi)者線程的線程個(gè)數(shù),找到退出線程的ID
    for (; i < pool- >maxNum; i++)
    {
        if (pool- >threadIDs[i] == tid)
        {
            pool- >threadIDs[i] = 0;
            printf("threadExit()消費(fèi)者線程 :%ld exit...n", tid);
            break;
        }
    }
    pthread_exit(NULL);
}
static int taskQisFull(ThreadPool* pool)
{
    return pool- >queueCapacity == pool- >queueSize;
}
//給線程池添加任務(wù)
void threadPoolAdd(ThreadPool* pool,void(*function)(void*),void* args)
{
    pthread_mutex_lock(&pool- >mutexpool); 
    //生產(chǎn)者線程:任務(wù)隊(duì)列滿要阻塞自己
    while(taskQisFull(pool) && !isShutDown(pool))
    {
        pthread_cond_wait(&pool- >notFull,&pool- >mutexpool);
    }
    if(isShutDown(pool))
    {
        pthread_mutex_unlock(&pool- >mutexpool);
        return ;
    }

    //添加任務(wù)
    pool- >taskQ[pool- >queueRear].function = function;
    pool- >taskQ[pool- >queueRear].args = args;

    pool- >queueRear++;
    pool- >queueRear %= pool- >queueCapacity;
    pool- >queueSize++;

    pthread_mutex_unlock(&pool- >mutexpool); 
    //喚醒work線程:
    pthread_cond_signal(&pool- >notEmpty);
}

//獲取線程池工作線程的個(gè)數(shù)
int threadBusyNum (ThreadPool* pool)
{
    pthread_mutex_lock(&pool- >mutexbusyNum);
    int busyNum = pool- >busyNum;
    pthread_mutex_unlock(&pool- >mutexbusyNum);
    return busyNum;

}

//獲取線程池存活的線程的個(gè)數(shù)
int threadLiveNum (ThreadPool* pool)
{
    pthread_mutex_lock(&pool- >mutexpool);
    int liveNum = pool- >liveNum;
    pthread_mutex_unlock(&pool- >mutexpool);
    return liveNum;
}

//銷毀線程池
int threadPoolDestroy(ThreadPool* pool)
{
    if(pool == NULL)
    {
        return -1;
    }
    //關(guān)閉線程池
    pool- >shutdown = 1;


    //喚醒阻塞的消費(fèi)者
    //存活的線程有多少就喚醒多少
    int i = 0;
    for(;i < pool- >liveNum;i++)
    {
        pthread_cond_signal(&pool- >notEmpty);
    }
    pthread_join(pool- >managerID,NULL);

    //釋放資源
    if(pool- >taskQ )
        free(pool- >taskQ);
    if(pool- >threadIDs)
        free(pool- >threadIDs);

    pthread_mutex_destroy(&pool- >mutexbusyNum);
    pthread_mutex_destroy(&pool- >mutexpool);
    pthread_cond_destroy(&pool- >notFull);
    pthread_cond_destroy(&pool- >notEmpty);

    free(pool);
    pool = NULL;

    return 0;

}

線程池測(cè)試代碼

#include"thread_pool.h"

//任務(wù)處理函數(shù)
void taskFunction(void* args)
{
    int num = *(int*)args;
    printf("thread: %ld is working,number:%dn",pthread_self(),num);
    sleep(1);
}
int main()
{
    //創(chuàng)建線程池
    ThreadPool* pool = threadPoolCreate(3,10,20);

    //往線程池里面放任務(wù)
    int i = 0;
    for(; i< 20; i++)
    {
        int *num = (int*)malloc(sizeof(int));
        *num = i+1;
        threadPoolAdd(pool,taskFunction,(void*)num);
    }

    sleep(10);

    threadPoolDestroy(pool);
    return 0;
}

測(cè)試線程池結(jié)果

由于我的測(cè)試代碼:只搞了3個(gè)工作線程(消費(fèi)者線程),任務(wù)隊(duì)列大小為20,并且搞了20個(gè)任務(wù)隊(duì)列進(jìn)去,所以線程池就會(huì)有三個(gè)工作線程在搶奪任務(wù)工作!

圖片

聲明:本文內(nèi)容及配圖由入駐作者撰寫或者入駐合作網(wǎng)站授權(quán)轉(zhuǎn)載。文章觀點(diǎn)僅代表作者本人,不代表電子發(fā)燒友網(wǎng)立場(chǎng)。文章及其配圖僅供工程師學(xué)習(xí)之用,如有內(nèi)容侵權(quán)或者其他違規(guī)問題,請(qǐng)聯(lián)系本站處理。 舉報(bào)投訴
  • 程序
    +關(guān)注

    關(guān)注

    117

    文章

    3785

    瀏覽量

    81003
  • 容器
    +關(guān)注

    關(guān)注

    0

    文章

    495

    瀏覽量

    22060
  • 線程池
    +關(guān)注

    關(guān)注

    0

    文章

    57

    瀏覽量

    6844
  • 數(shù)組
    +關(guān)注

    關(guān)注

    1

    文章

    417

    瀏覽量

    25939
收藏 人收藏

    評(píng)論

    相關(guān)推薦

    跨平臺(tái)的線程組件--TP組件

    /銷毀代價(jià)是很高的。那么我們要怎么去設(shè)計(jì)多線程編程呢???答案:對(duì)于長(zhǎng)駐的線程,我們可以創(chuàng)建獨(dú)立的線程去執(zhí)行。但是非長(zhǎng)駐的線程,我們可以通過(guò)線程
    的頭像 發(fā)表于 04-06 15:39 ?871次閱讀

    Java中的線程包括哪些

    線程是用來(lái)統(tǒng)一管理線程的,在 Java 中創(chuàng)建和銷毀線程都是一件消耗資源的事情,線程可以重復(fù)
    的頭像 發(fā)表于 10-11 15:33 ?809次閱讀
    Java中的<b class='flag-5'>線程</b><b class='flag-5'>池</b>包括哪些

    線程是如何實(shí)現(xiàn)的

    線程概念是什么?線程是如何實(shí)現(xiàn)的?
    發(fā)表于 02-28 06:20

    基于線程技術(shù)集群接入點(diǎn)的應(yīng)用研究

    本文在深入研究高級(jí)線程技術(shù)的基礎(chǔ)上,分析、研究了固定線程數(shù)目的線程線程數(shù)目動(dòng)態(tài)變化的
    發(fā)表于 01-22 14:21 ?5次下載

    python創(chuàng)建線程的兩種方法

    在使用多線程處理任務(wù)時(shí)也不是線程越多越好,由于在切換線程的時(shí)候,需要切換上下文環(huán)境,依然會(huì)造成cpu的大量開銷。為解決這個(gè)問題,線程
    的頭像 發(fā)表于 03-16 16:15 ?5975次閱讀

    基于Nacos的簡(jiǎn)單動(dòng)態(tài)化線程實(shí)現(xiàn)

    本文以Nacos作為服務(wù)配置中心,以修改線程核心線程數(shù)、最大線程數(shù)為例,實(shí)現(xiàn)一個(gè)簡(jiǎn)單的動(dòng)態(tài)化線程
    發(fā)表于 01-06 14:14 ?862次閱讀

    線程線程

    線程通常用于服務(wù)器應(yīng)用程序。 每個(gè)傳入請(qǐng)求都將分配給線程池中的一個(gè)線程,因此可以異步處理請(qǐng)求,而不會(huì)占用主線程,也不會(huì)延遲后續(xù)請(qǐng)求的處理
    的頭像 發(fā)表于 02-28 09:53 ?786次閱讀
    多<b class='flag-5'>線程</b>之<b class='flag-5'>線程</b><b class='flag-5'>池</b>

    Java線程核心原理

    看過(guò)Java線程源碼的小伙伴都知道,在Java線程池中最核心的類就是ThreadPoolExecutor,
    的頭像 發(fā)表于 04-21 10:24 ?850次閱讀

    細(xì)數(shù)線程的10個(gè)坑

    JDK開發(fā)者提供了線程的實(shí)現(xiàn)類,我們基于Executors組件,就可以快速創(chuàng)建一個(gè)線程
    的頭像 發(fā)表于 06-16 10:11 ?722次閱讀
    細(xì)數(shù)<b class='flag-5'>線程</b><b class='flag-5'>池</b>的10個(gè)坑

    線程線程怎么釋放

    線程分組看,pool名開頭線程占616條,而且waiting狀態(tài)也是616條,這個(gè)點(diǎn)就非常可疑了,我斷定就是這個(gè)pool開頭線程導(dǎo)致的問題。我們先排查為何這個(gè)
    發(fā)表于 07-31 10:49 ?2274次閱讀
    <b class='flag-5'>線程</b><b class='flag-5'>池</b>的<b class='flag-5'>線程</b>怎么釋放

    Spring 的線程應(yīng)用

    我們?cè)谌粘i_發(fā)中,經(jīng)常跟多線程打交道,Spring 為我們提供了一個(gè)線程方便我們開發(fā),它就是 ThreadPoolTaskExecutor ,接下來(lái)我們就來(lái)聊聊 Spring 的線程
    的頭像 發(fā)表于 10-13 10:47 ?620次閱讀
    Spring 的<b class='flag-5'>線程</b><b class='flag-5'>池</b>應(yīng)用

    線程基本概念與原理

    一、線程基本概念與原理 1.1 線程概念及優(yōu)勢(shì) C++
    的頭像 發(fā)表于 11-10 10:24 ?528次閱讀

    線程七大核心參數(shù)執(zhí)行順序

    線程是一種用于管理和調(diào)度線程執(zhí)行的技術(shù),通過(guò)將任務(wù)分配到線程池中的線程進(jìn)行處理,可以有效地控制并發(fā)線程
    的頭像 發(fā)表于 12-04 16:45 ?1052次閱讀

    線程的創(chuàng)建方式有幾種

    線程是一種用于管理和調(diào)度線程的技術(shù),能夠有效地提高系統(tǒng)的性能和資源利用率。它通過(guò)預(yù)先創(chuàng)建一組線程并維護(hù)一個(gè)工作隊(duì)列,將任務(wù)提交給線程
    的頭像 發(fā)表于 12-04 16:52 ?854次閱讀

    什么是動(dòng)態(tài)線程?動(dòng)態(tài)線程的簡(jiǎn)單實(shí)現(xiàn)思路

    因此,動(dòng)態(tài)可監(jiān)控線程一種針對(duì)以上痛點(diǎn)開發(fā)的線程管理工具。主要可實(shí)現(xiàn)功能有:提供對(duì) Spring 應(yīng)用內(nèi)線程
    的頭像 發(fā)表于 02-28 10:42 ?638次閱讀
    RM新时代网站-首页