Below is my implementation of the thread pool described in this talk, together with a benchmark comparing it against my simple thread pool implementation. The advanced pool is more than 15x faster at scheduling and dispatching short, random-length work items on my 2018 MacBook Pro with an i5 CPU and 4 logical cores. It uses a queue per worker thread and a work-stealing dispatcher: it tries to enqueue each work item onto a queue that is not currently locked by a dispatching thread, and idle workers try to steal work from other queues that are not currently locked. As always, the complete implementation is available on GitHub.
Benchmark program:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 |
#include <iostream> #include <chrono> #include <cstdlib> #include "pool.h" using namespace std; using namespace chrono; const unsigned int COUNT = 10'000'000; const unsigned int REPS = 10; int main() { srand(0); auto start = high_resolution_clock::now(); { simple_thread_pool tp; for(int i = 0; i < COUNT; ++i) tp.enqueue_work([i]() { int x; int reps = REPS + (REPS * (rand() % 5)); for(int n = 0; n < reps; ++n) x = i + rand(); }); } auto end = high_resolution_clock::now(); auto duration = duration_cast<milliseconds>(end - start); cout << "simple_thread_pool duration = " << duration.count() / 1000.f << " s" << endl; srand(0); start = high_resolution_clock::now(); { thread_pool tp; for(int i = 0; i < COUNT; ++i) tp.enqueue_work([i]() { int x; int reps = REPS + (REPS * (rand() % 5)); for(int n = 0; n < reps; ++n) x = i + rand(); }); } end = high_resolution_clock::now(); duration = duration_cast<milliseconds>(end - start); cout << "thread_pool duration = " << duration.count() / 1000.f << " s" << endl; } |
* Apple Clang: -Ofast -march=native -std=c++17 -lc++
Program output:
simple_thread_pool duration = 30.337 s
thread_pool duration = 1.625 s
* LLVM Clang: -Ofast -march=native -std=c++17 -lc++
simple_thread_pool duration = 25.785 s
thread_pool duration = 1.615 s
* G++: -Ofast -march=native -std=c++17 -lstdc++
simple_thread_pool duration = 26.28 s
thread_pool duration = 1.614 s
thread_pool class:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 |
class thread_pool { public: thread_pool(unsigned int threads = std::thread::hardware_concurrency()) : m_queues(threads), m_count(threads) { assert(threads != 0); auto worker = [&](unsigned int i) { while(true) { Proc f; for(unsigned int n = 0; n < m_count; ++n) if(m_queues[(i + n) % m_count].try_pop(f)) break; if(!f && !m_queues[i].pop(f)) break; f(); } }; for(unsigned int i = 0; i < threads; ++i) m_threads.emplace_back(worker, i); } ~thread_pool() noexcept { for(auto& queue : m_queues) queue.done(); for(auto& thread : m_threads) thread.join(); } template<typename F, typename... Args> void enqueue_work(F&& f, Args&&... args) { auto work = [f,args...]() { f(args...); }; unsigned int i = m_index++; for(unsigned int n = 0; n < m_count * K; ++n) if(m_queues[(i + n) % m_count].try_push(work)) return; m_queues[i % m_count].push(work); } template<typename F, typename... Args> auto enqueue_task(F&& f, Args&&... args) -> std::future<typename std::result_of<F(Args...)>::type> { using return_type = typename std::result_of<F(Args...)>::type; auto task = std::make_shared<std::packaged_task<return_type()>>(std::bind(std::forward<F>(f), std::forward<Args>(args)...)); std::future<return_type> res = task->get_future(); auto work = [task](){ (*task)(); }; unsigned int i = m_index++; for(unsigned int n = 0; n < m_count * K; ++n) if(m_queues[(i + n) % m_count].try_push(work)) return res; m_queues[i % m_count].push(work); return res; } private: using Proc = std::function<void(void)>; using Queues = std::vector<simple_blocking_queue<Proc>>; Queues m_queues; using Threads = std::vector<std::thread>; Threads m_threads; const unsigned int m_count; std::atomic_uint m_index = 0; inline static const unsigned int K = 3; }; |