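// NOTE: the next three lines are web-page residue captured together with this header; they are not part of the code.
// Sorry, your browser cannot access this site
// This page requires browser support (enable) JavaScript
// Learn more >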
#ifndef _THREADPOOL_H
#define _THREADPOOL_H
#include <thread>
#include <mutex>
#include <condition_variable>
#include <vector>
#include <queue>
#if TIME_KILL_ENABLE == 1
extern bool is_time_out;
#endif // #if TIME_KILL_ENABLE == 1

/*
1. worker() keeps fetching tasks from taskq and executing them until the pool is stopped
2. TaskType requirements: default constructor, move constructor, move assignment operator
3. Task function signature: void TaskFunc(void* pointer_to_threadpool, void* pointer_to_task)
*/
/**
 * Fixed-size pool of worker threads that drain a shared FIFO task queue.
 *
 * Every worker repeatedly pops one task from the queue and invokes
 * TaskFunc(this, &task). The pool pointer is handed to TaskFunc, presumably so
 * that a running task can enqueue follow-up work through push_taskq()
 * (NOTE(review): confirm against the actual TaskFunc implementations).
 *
 * TaskType must be default-constructible, move-constructible and
 * move-assignable (see worker() and push_taskq()).
 *
 * TaskFuncType and ThreadNumType are not declared in this header; they must be
 * visible before the template definition is parsed.
 */
template <typename TaskType>
class ThreadPool
{
    std::vector<std::thread> workers;

    // Set by the destructor (under taskq_mut) to tell workers to leave their loop
    // once the queue has been drained.
    bool stop;
    std::queue<TaskType> taskq;
    std::mutex taskq_mut;                      // guards stop and taskq
    std::condition_variable taskq_nempty_cond; // signalled when a task is queued or stop is set

    TaskFuncType TaskFunc;

private:
    /**
     * Worker thread body: wait for a task (or for shutdown), pop it while
     * holding taskq_mut, then run it with the lock released.
     *
     * Returns when stop is set and the queue is empty. When TIME_KILL_ENABLE
     * is 1, it also returns as soon as it observes is_time_out after waking,
     * even if tasks remain queued.
     * NOTE(review): is_time_out is only read here; this class never writes it
     * nor notifies on it, so the code that sets it is presumably responsible
     * for waking the workers and for any synchronization — confirm.
     */
    void worker()
    {
        for (;;)
        {
            TaskType tsk;
            {
                std::unique_lock<std::mutex> lock(taskq_mut);
                // Sleeps (releasing the lock) until there is work, a stop request,
                // or - if enabled - the time-out flag is set.
                taskq_nempty_cond.wait(lock,
                                       [this]()
                                       { return stop || !taskq.empty()
#if TIME_KILL_ENABLE == 1
                                                || is_time_out
#endif
                                             ; });

                // On stop, remaining tasks are still processed; on time-out they are abandoned.
                if ((stop && taskq.empty())
#if TIME_KILL_ENABLE == 1
                    || is_time_out
#endif
                )
                    return;

                tsk = std::move(taskq.front());
                taskq.pop();
            } // taskq_mut released here so the task runs without holding the lock

            TaskFunc(this, &tsk);
        }
    }

public:
    /**
     * Takes ownership of the initial task queue and starts the workers.
     *
     * @param worker_thread_num number of worker threads to start (in addition to the calling thread)
     * @param taskq             initial tasks; moved into the pool
     * @param TaskFunc          callable invoked as TaskFunc(this, &task) for every task
     */
    ThreadPool(ThreadNumType worker_thread_num, std::queue<TaskType> &&taskq, TaskFuncType TaskFunc)
        : stop(false),
          taskq(std::move(taskq)),
          TaskFunc(TaskFunc)
    {
        // Threads are started in the body, after every member is fully initialized.
        for (ThreadNumType i = 0; i < worker_thread_num; ++i)
            workers.emplace_back(&ThreadPool::worker, this);
    }

    // Workers capture `this`, so the pool must never be copied.
    ThreadPool(const ThreadPool &) = delete;
    ThreadPool &operator=(const ThreadPool &) = delete;

    /**
     * Requests shutdown and blocks until every worker has returned. Workers
     * finish the tasks still in the queue before exiting (unless the optional
     * time-out flag makes them return earlier).
     */
    ~ThreadPool()
    {
        {
            std::lock_guard<std::mutex> lock(taskq_mut);
            stop = true;
        }
        // Wake every sleeping worker so it can observe stop.
        taskq_nempty_cond.notify_all();
        for (auto &th : workers)
        {
            if (th.joinable())
                th.join();
        }
    }

    /**
     * Appends one task to the queue and wakes a single waiting worker.
     *
     * @param tsk task to enqueue; moved into the queue
     */
    void push_taskq(TaskType tsk)
    {
        std::lock_guard<std::mutex> lock(taskq_mut);
        taskq.emplace(std::move(tsk));
        taskq_nempty_cond.notify_one();
    }
};

#endif //_THREADPOOL_H

// Comments (web-page residue, not part of the header)