As I learn about atomics and the memory model, I decided to take a stab at rewriting my blocking queue using atomic operations, eliminating the mutex around the critical section responsible for pushing and popping elements. This effectively creates a fast path through the queue when no blocking takes place.
Let’s jump straight into the code:
```cpp
template<typename Q = T>
typename std::enable_if<
    std::is_copy_constructible<Q>::value and
    std::is_nothrow_copy_constructible<Q>::value, void>::type
push(const T& item) noexcept
{
    m_openSlots.wait();

    auto pushIndex = m_pushIndex.fetch_add(1);
    new (m_data + (pushIndex % m_size)) T (item);
    ++m_count;

    auto expected = m_pushIndex.load();
    while(!m_pushIndex.compare_exchange_strong(expected, m_pushIndex % m_size))
        expected = m_pushIndex.load();

    m_fullSlots.post();
}
```
Lines 1-4 are a std::enable_if constraint (SFINAE) that plays the role of a concept: it specifies that this overload of push will only be present if the type is no-throw copy constructible.
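To see how such a constraint makes a method appear or disappear, here is a minimal sketch. The `holder` class, the `throwing_copy` type and the `has_copy_push` detection helper are hypothetical names introduced only for this illustration; they are not part of the queue.

```cpp
#include <type_traits>
#include <utility>

// Hypothetical container used only to illustrate the constraint.
template<typename T>
struct holder
{
    // This overload exists only when Q (defaulted to T) is
    // no-throw copy constructible; otherwise substitution fails
    // and the method is silently removed from overload resolution.
    template<typename Q = T>
    typename std::enable_if<
        std::is_nothrow_copy_constructible<Q>::value, void>::type
    push(const T&) noexcept {}
};

// A type whose copy constructor is allowed to throw.
struct throwing_copy
{
    throwing_copy() = default;
    throwing_copy(const throwing_copy&) {} // not declared noexcept
};

// Hypothetical detection helper: true if holder<T>::push(const T&) compiles.
template<typename T, typename = void>
struct has_copy_push : std::false_type {};

template<typename T>
struct has_copy_push<T,
    decltype(std::declval<holder<T>&>().push(std::declval<const T&>()))>
    : std::true_type {};
```

With these definitions, `has_copy_push<int>::value` is true, while `has_copy_push<throwing_copy>::value` is false because the constrained `push` does not exist for that type.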
Line 7 is the same semaphore decrement, which may block if the queue is full. The fast path of this semaphore implementation uses only atomic operations, so if it doesn’t block it will not engage a mutex (fast_semaphore code available on GitHub).
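For readers who don't want to look up the semaphore right away, the idea of an atomic-only fast path can be sketched roughly as follows. This is not the fast_semaphore from GitHub; `slow_semaphore`, `sketch_semaphore` and `value()` are hypothetical names, and the design (an atomic counter in front of a mutex-based semaphore) is just one common way to get this behavior.

```cpp
#include <atomic>
#include <condition_variable>
#include <mutex>

// Slow path: a plain semaphore built on a mutex and a condition variable.
class slow_semaphore
{
public:
    void post()
    {
        std::lock_guard<std::mutex> lock(m_mutex);
        ++m_count;
        m_cv.notify_one();
    }

    void wait()
    {
        std::unique_lock<std::mutex> lock(m_mutex);
        m_cv.wait(lock, [this] { return m_count > 0; });
        --m_count;
    }

private:
    std::mutex m_mutex;
    std::condition_variable m_cv;
    int m_count = 0;
};

// Hypothetical sketch: the atomic counter handles the uncontended case,
// the mutex-based semaphore is touched only when a thread must block.
class sketch_semaphore
{
public:
    explicit sketch_semaphore(int count) : m_count(count) {}

    void post()
    {
        // A negative previous value means at least one thread is waiting.
        if (m_count.fetch_add(1) < 0)
            m_slow.post();
    }

    void wait()
    {
        // A previous value below 1 means nothing is available, so block.
        if (m_count.fetch_sub(1) < 1)
            m_slow.wait();
    }

    // Exposed for illustration only.
    int value() const { return m_count.load(); }

private:
    std::atomic<int> m_count;
    slow_semaphore m_slow;
};
```

As long as the counter stays positive, `wait()` and `post()` each cost a single atomic read-modify-write and never take the mutex.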
Line 9 is where the magic starts. We atomically increment m_pushIndex while fetching its previous value into a temporary, pushIndex. From now on we work with the temporary.
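The important property here is that fetch_add returns the value from before the increment. A small sketch, with a hypothetical helper named `claim_slot` that is not part of the queue:

```cpp
#include <atomic>

// Hypothetical helper: claims the next slot in a ring of `size` slots.
inline unsigned claim_slot(std::atomic_uint& index, unsigned size)
{
    // fetch_add returns the value held *before* the increment, so two
    // threads calling this at the same time receive different values.
    unsigned claimed = index.fetch_add(1);
    return claimed % size;
}
```

Starting from an index of 0 and a size of 3, four consecutive calls yield slots 0, 1, 2 and 0 again, and the shared index ends up at 4.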
Line 10 is where we insert the element by copy constructing it in the right open slot.
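If placement new into raw storage is unfamiliar, the following sketch shows the full life cycle in isolation: allocate uninitialized memory, construct in place, destroy explicitly, release the memory. The function name `round_trip_through_raw_storage` is made up for this example.

```cpp
#include <new>
#include <string>

// Hypothetical helper: copies `item` into raw storage and back out.
template<typename T>
T round_trip_through_raw_storage(const T& item)
{
    // Raw, uninitialized memory for two objects; no constructor runs here.
    T* data = static_cast<T*>(operator new(2 * sizeof(T)));

    // Copy construct the item directly in slot 1.
    new (data + 1) T(item);

    T copy = data[1];

    // Objects created with placement new must be destroyed explicitly
    // before the raw memory is released.
    data[1].~T();
    operator delete(data);

    return copy;
}
```

This is why the queue has to call the destructor by hand when popping and in its own destructor: nothing else knows which slots currently hold live objects.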
Line 11 is book-keeping: m_count tracks how many elements are in the queue, which the destructor needs in order to destroy the ones still left.
Lines 13-15 are where we reduce m_pushIndex modulo m_size, so it never overflows. We load the current value into expected and attempt to atomically swap m_pushIndex with m_pushIndex % m_size. If m_pushIndex has changed in the meantime, the compare-exchange fails, so we load the new value back into expected and try again until the swap succeeds.
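The same compare-exchange loop can be written as a standalone function. Note that this is a variant rather than the code from the queue: it computes the new value from `expected` instead of re-reading the atomic, and it relies on the fact that compare_exchange_strong stores the current value into `expected` when it fails. The name `wrap_index` is hypothetical.

```cpp
#include <atomic>

// Hypothetical helper: folds `index` back into [0, size) with a CAS loop
// and returns the value that was stored.
inline unsigned wrap_index(std::atomic_uint& index, unsigned size)
{
    unsigned expected = index.load();

    // On failure compare_exchange_strong writes the current value of
    // `index` into `expected`, so every retry works on a fresh snapshot.
    while (!index.compare_exchange_strong(expected, expected % size))
    {
    }

    return expected % size;
}
```

For example, with the index at 5 and a size of 4, the call stores and returns 1.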
Line 17 signals to other blocked threads, if there are any, that the queue now has an element available for popping.
Other methods of the queue work in a very similar way, so I will not be describing them in detail here. The only catch with this implementation is that it only works for no-throw copyable and movable types, so declare your constructors and assignment operators noexcept if you want to use them with this queue 🙂
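As a quick illustration of what that means for your own types, here is a sketch of an element type that satisfies the constraints. The `message` struct is hypothetical; the static_asserts check the same type traits the queue uses in its enable_if conditions.

```cpp
#include <type_traits>
#include <utility>

// Hypothetical element type used only for this example.
struct message
{
    message() = default;

    // User-provided special members must be marked noexcept explicitly.
    message(const message& other) noexcept : id(other.id) {}
    message(message&& other) noexcept : id(other.id) { other.id = 0; }

    message& operator=(const message& other) noexcept
    {
        id = other.id;
        return *this;
    }

    message& operator=(message&& other) noexcept
    {
        id = other.id;
        other.id = 0;
        return *this;
    }

    int id = 0;
};

static_assert(std::is_nothrow_copy_constructible<message>::value,
    "copy constructor must be noexcept");
static_assert(std::is_nothrow_move_constructible<message>::value,
    "move constructor must be noexcept");
static_assert(std::is_nothrow_copy_assignable<message>::value,
    "copy assignment must be noexcept");
static_assert(std::is_nothrow_move_assignable<message>::value,
    "move assignment must be noexcept");
```

If any of the noexcept specifiers is removed, the corresponding static_assert fails at compile time.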
Complete listing:
```cpp
#pragma once

#include <atomic>
#include <utility>
#include <type_traits>
#include <cassert>
#include "semaphore.h"

template<typename T>
class fast_blocking_queue
{
public:
    explicit fast_blocking_queue(unsigned int size)
    : m_size(size), m_pushIndex(0), m_popIndex(0), m_count(0),
    m_data((T*)operator new(size * sizeof(T))),
    m_openSlots(size), m_fullSlots(0)
    {
        assert(size != 0);
    }

    ~fast_blocking_queue() noexcept
    {
        while (m_count--)
        {
            m_data[m_popIndex].~T();
            m_popIndex = ++m_popIndex % m_size;
        }
        operator delete(m_data);
    }

    template<typename Q = T>
    typename std::enable_if<
        std::is_copy_constructible<Q>::value and
        std::is_nothrow_copy_constructible<Q>::value, void>::type
    push(const T& item) noexcept
    {
        m_openSlots.wait();

        auto pushIndex = m_pushIndex.fetch_add(1);
        new (m_data + (pushIndex % m_size)) T (item);
        ++m_count;

        auto expected = m_pushIndex.load();
        while(!m_pushIndex.compare_exchange_strong(expected, m_pushIndex % m_size))
            expected = m_pushIndex.load();

        m_fullSlots.post();
    }

    template<typename Q = T>
    typename std::enable_if<
        std::is_move_constructible<Q>::value and
        std::is_nothrow_move_constructible<Q>::value, void>::type
    push(T&& item) noexcept
    {
        m_openSlots.wait();

        auto pushIndex = m_pushIndex.fetch_add(1);
        new (m_data + (pushIndex % m_size)) T (std::move(item));
        ++m_count;

        auto expected = m_pushIndex.load();
        while(!m_pushIndex.compare_exchange_strong(expected, m_pushIndex % m_size))
            expected = m_pushIndex.load();

        m_fullSlots.post();
    }

    template<typename Q = T>
    typename std::enable_if<
        not std::is_move_assignable<Q>::value and
        std::is_nothrow_copy_assignable<Q>::value, void>::type
    pop(T& item) noexcept
    {
        m_fullSlots.wait();

        auto popIndex = m_popIndex.fetch_add(1);
        item = m_data[popIndex % m_size];
        m_data[popIndex % m_size].~T();
        --m_count;

        auto expected = m_popIndex.load();
        while(!m_popIndex.compare_exchange_strong(expected, m_popIndex % m_size))
            expected = m_popIndex.load();

        m_openSlots.post();
    }

    template<typename Q = T>
    typename std::enable_if<
        std::is_move_assignable<Q>::value and
        std::is_nothrow_move_assignable<Q>::value, void>::type
    pop(T& item) noexcept
    {
        m_fullSlots.wait();

        auto popIndex = m_popIndex.fetch_add(1);
        item = std::move(m_data[popIndex % m_size]);
        m_data[popIndex % m_size].~T();
        --m_count;

        auto expected = m_popIndex.load();
        while(!m_popIndex.compare_exchange_strong(expected, m_popIndex % m_size))
            expected = m_popIndex.load();

        m_openSlots.post();
    }

    T pop() noexcept(std::is_nothrow_invocable_r<void, decltype(&fast_blocking_queue<T>::pop<T>), T&>::value)
    {
        T item;
        pop(item);
        return item;
    }

private:
    const unsigned int m_size;
    std::atomic_uint m_pushIndex;
    std::atomic_uint m_popIndex;
    std::atomic_uint m_count;
    T* m_data;

    fast_semaphore m_openSlots;
    fast_semaphore m_fullSlots;
};
```
Super nice 🙂
thanks 🙂 it was my first attempt at lock free programming 😉