#ifndef _THREADPOOL_H
#define _THREADPOOL_H
#include <thread>
#include <mutex>
#include <condition_variable>
#include <vector>
#include <queue>
#if TIME_KILL_ENABLE == 1
extern bool is_time_out;
#endif // #if TIME_KILL_ENABLE == 1
/*
1. worker() routine keeps fetch task from taskq until stop
2. TaskType requirement: default constructor; move constructor; move assignment operator
3. void TaskFunc(void* pointer_to_threadpool, void* pointer_to_task)
*/
template <typename TaskType>
class ThreadPool
{
std::vector<std::thread> workers;
bool stop; // main_thread->worker_thread: notify worker thread to out of endless loop
std::queue<TaskType> taskq;
std::mutex taskq_mut;
std::condition_variable taskq_nempty_cond; // notify queue not empty
TaskFuncType TaskFunc;
private:
// endless loop to fetch task from taskq, if fetch one then execute the task
void worker()
{
while (1)
{
TaskType tsk;
{
std::unique_lock<std::mutex> lock(taskq_mut); // lock taskq_mut
// wait for taskq be nonempty or stop(will sleep and release lock when waiting)
taskq_nempty_cond.wait(lock,
[this]()
{ return stop || !taskq.empty()
#if TIME_KILL_ENABLE == 1
|| is_time_out
#endif
; });
if ((stop && taskq.empty())
#if TIME_KILL_ENABLE == 1
|| is_time_out
#endif
) // worker stop here
return;
tsk = std::move(taskq.front());
taskq.pop();
// unlock taskq_mut
}
TaskFunc(this, &tsk);
}
}
public:
// worker_thread_num: besides main_thread
ThreadPool(ThreadNumType worker_thread_num, std::queue<TaskType> &&taskq, TaskFuncType TaskFunc) : stop(0),
taskq(std::move(taskq)),
TaskFunc(TaskFunc),
{
for (ThreadNumType i = 0; i < worker_thread_num; ++i)
workers.emplace_back(&ThreadPool::worker, this);
}
~ThreadPool()
{
{
std::lock_guard<std::mutex> lock(taskq_mut);
stop = 1;
}
taskq_nempty_cond.notify_all();
// then all worker thread will wake up and find stop==1 then end their routine(return from func worker, after finishing work queue)
// wait for all workers and output thread ends
for (auto &th : workers)
th.join();
}
void push_taskq(TaskType tsk)
{
std::lock_guard<std::mutex> lock(taskq_mut); // lock taskq_mut
taskq.emplace(std::move(tsk));
taskq_nempty_cond.notify_one();
// unlock taskq_mut
}
};
#endif //_THREADPOOL_H