LESSON RECORDING: How to implement a (simple) thread pool.

PART 2: Advanced Thread Pool.

I know the topic of thread pools has been beaten to death on the internet; nevertheless, I wanted to present my implementation, which uses only standard C++ components 🙂

I will be using the queue and semaphore classes discussed in my earlier posts.

Below you will find a simple thread pool implementation that can be parametrized by the number of worker threads and the depth of the blocking queue of work items. Each thread waits in blocking_queue::pop() until a work item shows up. Which thread picks up a given work item is not deterministic; whichever one does executes it, then goes back to blocking_queue::pop(). Destruction and cleanup of threads is done with a nullptr sentinel pushed onto the queue. When a thread pops the sentinel, it pushes it back for the next thread and breaks out of its work loop. This way all threads get to finish the unprocessed work items, and are then joined, during destruction of a pool instance.
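The sentinel scheme described above can be sketched roughly as follows. This is a minimal sketch, not the pool.h listing: simple_blocking_queue is a hypothetical, unbounded mutex/condition-variable stand-in for the semaphore-based blocking queue from the earlier posts, and sentinel_pool and run_sentinel_demo are made-up names.

```cpp
#include <atomic>
#include <cassert>
#include <condition_variable>
#include <functional>
#include <mutex>
#include <queue>
#include <thread>
#include <vector>

// Hypothetical stand-in for the blocking queue from the earlier posts
// (unbounded here; the original is bounded by a queue depth).
template<typename T>
class simple_blocking_queue {
public:
    void push(T item) {
        {
            std::lock_guard<std::mutex> lock(m_mutex);
            m_items.push(std::move(item));
        }
        m_ready.notify_one();
    }

    // Blocks until an item is available, then moves it into 'item'.
    void pop(T& item) {
        std::unique_lock<std::mutex> lock(m_mutex);
        m_ready.wait(lock, [this] { return !m_items.empty(); });
        item = std::move(m_items.front());
        m_items.pop();
    }

private:
    std::mutex m_mutex;
    std::condition_variable m_ready;
    std::queue<T> m_items;
};

class sentinel_pool {
public:
    using work_item = std::function<void()>;

    explicit sentinel_pool(unsigned threads) {
        for (unsigned i = 0; i < threads; ++i)
            m_threads.emplace_back([this] {
                for (;;) {
                    work_item f;
                    m_queue.pop(f);
                    if (!f) {                  // empty function == sentinel
                        m_queue.push(nullptr); // hand it on to the next worker
                        break;
                    }
                    f();
                }
            });
    }

    ~sentinel_pool() {
        m_queue.push(nullptr); // FIFO: lands behind all pending work
        for (auto& t : m_threads) t.join();
    }

    void enqueue_work(work_item f) {
        assert(f && "an empty work item is reserved as the sentinel");
        m_queue.push(std::move(f));
    }

private:
    simple_blocking_queue<work_item> m_queue; // declared first, destroyed last
    std::vector<std::thread> m_threads;
};

// Enqueues 100 increments and lets the destructor drain them.
int run_sentinel_demo() {
    std::atomic<int> counter{0};
    {
        sentinel_pool pool(4);
        for (int i = 0; i < 100; ++i)
            pool.enqueue_work([&counter] { ++counter; });
    } // destructor pushes the sentinel and joins the workers
    return counter.load();
}
```

Because the queue is FIFO, the sentinel is only popped after every earlier work item has been handed to some worker, so run_sentinel_demo() should return 100 in this sketch.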
Moreover, a work item can be any callable entity: a lambda, a functor, or a function pointer. A work item can accept any number of parameters thanks to the template parameter pack of pool::enqueue_work().
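To illustrate how a parameter pack lets one enqueue function accept any callable with any number of arguments, here is a minimal sketch. mini_pool is a hypothetical single-threaded stand-in (run_all() plays the role of the worker threads), and binding the arguments with std::bind is my assumption for the illustration; the real pool::enqueue_work() may package them differently.

```cpp
#include <cassert>
#include <functional>
#include <queue>
#include <utility>

// Hypothetical single-threaded stand-in for the pool.
class mini_pool {
public:
    // Binds the callable to copies of its arguments, producing a
    // zero-argument work item that fits std::function<void()>.
    template<typename F, typename... Args>
    void enqueue_work(F&& f, Args&&... args) {
        m_queue.push(std::bind(std::forward<F>(f), std::forward<Args>(args)...));
    }

    // Runs every queued item in FIFO order on the calling thread.
    void run_all() {
        while (!m_queue.empty()) {
            std::function<void()> item = std::move(m_queue.front());
            m_queue.pop();
            assert(item);
            item();
        }
    }

private:
    std::queue<std::function<void()>> m_queue;
};

// A plain function, used through a function pointer.
void add_into(int* out, int a, int b) { *out += a + b; }

// A functor.
struct scale_into {
    void operator()(int* out, int value, int factor) const { *out += value * factor; }
};

int run_enqueue_demo() {
    int total = 0;
    mini_pool pool;
    pool.enqueue_work(add_into, &total, 1, 2);     // function pointer, three arguments
    pool.enqueue_work(scale_into{}, &total, 4, 6); // functor, three arguments
    pool.enqueue_work([&total] { total += 100; }); // lambda, no arguments
    pool.run_all();
    return total; // 3 + 24 + 100
}
```

Note that std::bind stores copies of the arguments, which is why the sketch passes a pointer to total rather than a reference.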

UPDATE:

Thank you to reddit user sumo952 for bringing progschj/ThreadPool to my attention. I have updated my implementation to support futures and the ability to retrieve a work item’s result.
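A rough sketch of how future support can be layered on top of a queue of void() work items (C++14 or later). The names future_pool_sketch, enqueue_task and run_one are hypothetical, and the single-threaded run_one() stands in for a worker thread; treat this as an illustration of the std::packaged_task technique, not as the updated pool.h.

```cpp
#include <cassert>
#include <functional>
#include <future>
#include <memory>
#include <queue>
#include <utility>

// Hypothetical single-threaded stand-in for the pool.
class future_pool_sketch {
public:
    // Wraps the call in a std::packaged_task and returns its future.
    template<typename F, typename... Args>
    auto enqueue_task(F&& f, Args&&... args) {
        auto bound = std::bind(std::forward<F>(f), std::forward<Args>(args)...);
        using result_t = decltype(bound());
        // packaged_task is move-only, so it is held through a shared_ptr
        // to keep the queued closure copyable.
        auto task = std::make_shared<std::packaged_task<result_t()>>(std::move(bound));
        std::future<result_t> result = task->get_future();
        assert(result.valid());
        m_queue.push([task] { (*task)(); });
        return result;
    }

    // Executes the oldest queued item, as a worker thread would.
    void run_one() {
        std::function<void()> item = std::move(m_queue.front());
        m_queue.pop();
        item();
    }

private:
    std::queue<std::function<void()>> m_queue;
};

int run_future_demo() {
    future_pool_sketch pool;
    std::future<int> answer =
        pool.enqueue_task([](int a, int b) { return a + b; }, 20, 22);
    pool.run_one();      // running the task makes the future ready
    return answer.get(); // retrieves the work item's result
}
```

In a real pool the call to get() would block until some worker thread has executed the task.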

Usage example:

work item 1 starting 170521507 iterations…
work item 2 starting 141859716 iterations…
work item 2 finished
work item 3 starting 189442810 iterations…
work item 1 finished
work item 4 starting 125609749 iterations…
work item 4 finished
work item 3 finished
Program ended with exit code: 1

Program output.

pool.h:

6 Replies to “Simple thread pool”

    1. because copying of std::packaged_task is not allowed, yet “task” needs to be copied into the std::function<void(void)> closure that’s pushed onto the queue.

      HTH 🙂
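The copyability constraint mentioned in this reply can be checked with a short sketch. I am assuming the original question was about why the task is held through a shared pointer (the question is not preserved here); task_t and run_copyable_demo are hypothetical names.

```cpp
#include <cassert>
#include <functional>
#include <future>
#include <memory>
#include <type_traits>

using task_t = std::packaged_task<int()>;

// std::packaged_task can be moved but not copied...
static_assert(!std::is_copy_constructible<task_t>::value,
              "packaged_task is move-only");
// ...while a shared_ptr to it can be copied freely.
static_assert(std::is_copy_constructible<std::shared_ptr<task_t>>::value,
              "shared_ptr is copyable");

int run_copyable_demo() {
    auto task = std::make_shared<task_t>([] { return 7; });
    std::future<int> result = task->get_future();
    assert(result.valid());

    // std::function requires a copyable callable; capturing the
    // shared_ptr by value satisfies that requirement.
    std::function<void()> work = [task] { (*task)(); };
    std::function<void()> copy = work; // both copies refer to the same task

    copy();              // runs the task exactly once
    return result.get();
}
```

Capturing the packaged_task itself by move would make the lambda move-only, and such a lambda cannot be stored in a std::function<void()>.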

  1. How do we handle the following scenario: the thread pool is stopped while the work queue is not empty, and the main thread then blocks on the future’s get() forever?

    std::future result1, result2, result3;
    {
    thread_pool pool(2,100);

    } // pool goes out of scope and its destructor is called

    cout << result3.get(); // blocks forever because the worker threads are stopped and the task is never completed

    1. The question is how the pool should handle outstanding tasks when it is being destroyed. I recently changed the implementation to disregard pending tasks: the pool’s destructor calls m_queue.done(); which causes the pop() calls in the worker threads to return false, so the threads exit. You could instead change it to m_queue.push(nullptr); and then, in the worker thread, change “if(!m_queue.pop(f)) break;” to “m_queue.pop(f); if(!f) break;”. This will drain the queue before destroying the pool. Email me at [email protected] if you have further questions…

      the latest pool code is here: https://github.com/mvorbrodt/blog/blob/master/src/pool.hpp
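The done()-based shutdown described in this reply can be sketched as follows. closable_queue and pending_future_is_broken are hypothetical names, and the mutex/condition-variable queue is an assumption rather than the code behind the link. In this sketch a task that is still queued when the queue is destroyed never runs; because its std::packaged_task is destroyed without being invoked, the matching future becomes ready with a broken_promise error, so get() throws instead of blocking.

```cpp
#include <cassert>
#include <condition_variable>
#include <functional>
#include <future>
#include <memory>
#include <mutex>
#include <queue>
#include <system_error>

// Hypothetical queue whose pop() returns false once done() was called,
// even if items are still pending.
template<typename T>
class closable_queue {
public:
    void push(T item) {
        {
            std::lock_guard<std::mutex> lock(m_mutex);
            m_items.push(std::move(item));
        }
        m_ready.notify_one();
    }

    bool pop(T& item) {
        std::unique_lock<std::mutex> lock(m_mutex);
        m_ready.wait(lock, [this] { return m_done || !m_items.empty(); });
        if (m_done) return false; // pending items are disregarded
        item = std::move(m_items.front());
        m_items.pop();
        return true;
    }

    void done() {
        {
            std::lock_guard<std::mutex> lock(m_mutex);
            m_done = true;
        }
        m_ready.notify_all(); // wake every blocked worker
    }

private:
    std::mutex m_mutex;
    std::condition_variable m_ready;
    std::queue<T> m_items;
    bool m_done = false;
};

bool pending_future_is_broken() {
    std::future<int> result;
    {
        closable_queue<std::function<void()>> queue;
        auto task = std::make_shared<std::packaged_task<int()>>([] { return 42; });
        result = task->get_future();
        queue.push([task] { (*task)(); });

        queue.done(); // what the pool's destructor would call

        std::function<void()> f;
        bool popped = queue.pop(f); // the worker loop's "if(!m_queue.pop(f)) break;"
        assert(!popped);            // so the queued task is never executed
    } // queue and task are destroyed here, abandoning the shared state

    try {
        result.get();
    } catch (const std::future_error& e) {
        return e.code() == std::future_errc::broken_promise;
    }
    return false;
}
```

Whether that exception or a drained queue is preferable depends on the application; the sentinel variant from the reply trades a slower destructor for completed futures.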
