LESSON RECORDING: How to implement a (simple) thread pool.
PART 2: Advanced Thread Pool.
I know the topic of thread pools has been beaten to death on the internet; nevertheless, I wanted to present my implementation, which uses only standard C++ components 🙂
I will be using the queue and semaphore classes discussed in my earlier posts.
Below you will find a simple thread pool implementation which can be parametrized by the number of worker threads and the depth of the blocking queue of work items. Each thread waits on blocking_queue::pop() until a work item shows up. Whichever thread happens to wake first picks the next work item off the queue, executes it, then goes back to blocking_queue::pop(). Destruction and cleanup of the threads is done with a nullptr sentinel pushed onto the queue. A thread that pops the sentinel pushes it back for the remaining threads and breaks out of its work loop. This way, during destruction of a pool instance, all threads are waited on and allowed to finish all unprocessed work items.
Moreover, a work item can be any callable entity: a lambda, a functor, or a function pointer. A work item can accept any number of parameters thanks to the template parameter pack of pool::enqueue_work().
UPDATE:
Thank you, reddit user sumo952, for bringing to my attention the progschj/ThreadPool project. I have updated my implementation to support futures and the ability to retrieve a work item's result.
Usage example:
```cpp
#include <iostream>
#include <mutex>
#include <cstdlib>
#include <ctime>
#include "pool.h"
using namespace std;

mutex cout_lock;
#define trace(x) { scoped_lock<mutex> lock(cout_lock); cout << x << endl; }

const int COUNT = thread::hardware_concurrency();
const int WORK = 10'000'000;

int main(int argc, char** argv)
{
	srand((unsigned int)time(NULL));
	thread_pool pool;

	auto result = pool.enqueue_task([](int i) { return i; }, 0xFF);
	result.get();

	for(int i = 1; i <= COUNT; ++i)
		pool.enqueue_work([](int workerNumber) {
			int workOutput = 0;
			int work = WORK + (rand() % (WORK));
			trace("work item " << workerNumber << " starting " << work << " iterations...");
			for(int w = 0; w < work; ++w)
				workOutput += rand();
			trace("work item " << workerNumber << " finished");
		}, i);

	return 1;
}
```
Program output:

```
work item 1 starting 170521507 iterations...
work item 2 starting 141859716 iterations...
work item 2 finished
work item 3 starting 189442810 iterations...
work item 1 finished
work item 4 starting 125609749 iterations...
work item 4 finished
work item 3 finished
Program ended with exit code: 1
```
pool.h:
```cpp
#pragma once

#include <vector>
#include <thread>
#include <memory>
#include <future>
#include <functional>
#include <type_traits>
#include <cassert>
#include "queue.h"

class thread_pool
{
public:
	thread_pool(
		unsigned int queueDepth = std::thread::hardware_concurrency(),
		size_t threads = std::thread::hardware_concurrency())
	: m_workQueue(queueDepth)
	{
		assert(queueDepth != 0);
		assert(threads != 0);
		for(size_t i = 0; i < threads; ++i)
			m_threads.emplace_back(std::thread([this]() {
				while(true)
				{
					auto workItem = m_workQueue.pop();
					if(workItem == nullptr)
					{
						m_workQueue.push(nullptr);
						break;
					}
					workItem();
				}
			}));
	}

	~thread_pool() noexcept
	{
		m_workQueue.push(nullptr);
		for(auto& thread : m_threads)
			thread.join();
	}

	using Proc = std::function<void(void)>;

	template<typename F, typename... Args>
	void enqueue_work(F&& f, Args&&... args)
		noexcept(std::is_nothrow_invocable<decltype(&blocking_queue<Proc>::push<Proc&&>)>::value)
	{
		m_workQueue.push([=]() { f(args...); });
	}

	template<typename F, typename... Args>
	auto enqueue_task(F&& f, Args&&... args) -> std::future<typename std::result_of<F(Args...)>::type>
	{
		using return_type = typename std::result_of<F(Args...)>::type;
		auto task = std::make_shared<std::packaged_task<return_type()>>(
			std::bind(std::forward<F>(f), std::forward<Args>(args)...));
		std::future<return_type> res = task->get_future();
		m_workQueue.push([task]() { (*task)(); });
		return res;
	}

private:
	using ThreadPool = std::vector<std::thread>;
	ThreadPool m_threads;
	blocking_queue<Proc> m_workQueue;
};
```
Why does task in enqueue_task have to be wrapped in std::make_shared?
Because copying of std::packaged_task is not allowed, yet "task" needs to be copied into the std::function<void(void)> closure that's pushed onto the queue.
HTH 🙂
How do we handle the following scenario: the thread pool is destroyed while the work queue is not empty, and the main thread then blocks forever on future::get()?
```cpp
std::future<int> result1, result2, result3;
{
	thread_pool pool(2, 100);
	// ... enqueue tasks that produce result1, result2, result3 ...
} // pool goes out of scope and its destructor is called
cout << result3.get(); // blocks forever: the worker threads are stopped and the task never completed
```
The question is how the pool should handle outstanding tasks when it's being destroyed. I recently changed the implementation to disregard pending tasks: the pool's destructor calls m_queue.done(), which causes the pop()'s in the worker thread(s) to return false and exit the thread. You could instead change it to m_queue.push(nullptr); then, in the worker thread, change "if(!m_queue.pop(f)) break;" to "m_queue.pop(f); if(!f) break;" so that the queue is drained before the pool is destroyed. Email me at [email protected] if you have further questions…
The latest pool code is here: https://github.com/mvorbrodt/blog/blob/master/src/pool.hpp