As I learn about atomics and the memory model, I decided to take a stab at rewriting my blocking queue using atomic operations, eliminating the mutex around the critical section responsible for pushing and popping elements. This effectively creates a fast path through the queue when no blocking takes place.
Let’s jump straight into the code:
template<typename Q = T>
typename std::enable_if<
    std::is_copy_constructible<Q>::value and
    std::is_nothrow_copy_constructible<Q>::value, void>::type
push(const T& item) noexcept
{
    m_openSlots.wait();

    auto pushIndex = m_pushIndex.fetch_add(1);
    new (m_data + (pushIndex % m_size)) T (item);
    ++m_count;

    auto expected = m_pushIndex.load();
    while(!m_pushIndex.compare_exchange_strong(expected, m_pushIndex % m_size))
        expected = m_pushIndex.load();

    m_fullSlots.post();
}
Lines 1-4 are basically a poor man's template concept: a SFINAE constraint built on std::enable_if, which specifies that this method will only be present if the type is no-throw copy constructible.
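To see the constraint in isolation, here is a minimal sketch. The names holder, nothrow_copy, throwing_copy and has_push are hypothetical and exist only for this illustration; they are not part of the queue. The detection trait simply asks whether a call to push would compile.

```cpp
#include <cassert>
#include <type_traits>
#include <utility>

// Hypothetical test types: one with a noexcept copy constructor, one without.
struct nothrow_copy
{
    nothrow_copy() = default;
    nothrow_copy(const nothrow_copy&) noexcept {}
};

struct throwing_copy
{
    throwing_copy() = default;
    throwing_copy(const throwing_copy&) {} // not noexcept, so it may throw
};

// Hypothetical stand-in for the queue, with only the constrained method.
template<typename T>
struct holder
{
    // Q defaults to T so that the condition depends on a template parameter
    // of the method itself, which is what makes SFINAE apply here.
    template<typename Q = T>
    typename std::enable_if<
        std::is_copy_constructible<Q>::value and
        std::is_nothrow_copy_constructible<Q>::value, void>::type
    push(const T&) noexcept {}
};

// Detection trait: true only if h.push(const V&) is a valid expression.
template<typename H, typename V, typename = void>
struct has_push : std::false_type {};

template<typename H, typename V>
struct has_push<H, V,
    decltype(std::declval<H&>().push(std::declval<const V&>()))>
    : std::true_type {};

static_assert(has_push<holder<nothrow_copy>, nothrow_copy>::value,
    "push exists for no-throw copy constructible types");
static_assert(!has_push<holder<throwing_copy>, throwing_copy>::value,
    "push is removed from overload resolution otherwise");
```

With a type whose copy constructor may throw, the method is not a hard error; it simply does not take part in overload resolution.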
Line 7 is the same semaphore decrement and possible blocking if the queue is full. The fast path of this semaphore implementation uses only atomic operations, so if it doesn't block it will not engage a mutex (fast_semaphore code available on GitHub).
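For readers who have not seen such a semaphore, below is a rough sketch of the general idea, not the fast_semaphore from GitHub. The class name sketch_semaphore, its members, and the count() accessor are assumptions made for illustration: an atomic counter handles the uncontended case, and a mutex plus condition variable are touched only when a thread actually has to block or be woken.

```cpp
#include <atomic>
#include <cassert>
#include <condition_variable>
#include <mutex>

// Hypothetical illustration of an atomic fast path in front of a blocking slow path.
class sketch_semaphore
{
public:
    explicit sketch_semaphore(int count) noexcept : m_count(count) {}

    void post()
    {
        // A negative previous value means some thread is blocked (or about to be),
        // so only then do we take the mutex and wake one waiter.
        if (m_count.fetch_add(1, std::memory_order_release) < 0)
        {
            std::lock_guard<std::mutex> lock(m_mutex);
            ++m_wakeups;
            m_cv.notify_one();
        }
    }

    void wait()
    {
        // A positive previous value means a unit was available: fast path, no mutex.
        if (m_count.fetch_sub(1, std::memory_order_acquire) < 1)
        {
            std::unique_lock<std::mutex> lock(m_mutex);
            m_cv.wait(lock, [this] { return m_wakeups > 0; });
            --m_wakeups;
        }
    }

    // Accessor added only so the example can be inspected.
    int count() const noexcept { return m_count.load(); }

private:
    std::atomic<int> m_count;
    int m_wakeups = 0;
    std::mutex m_mutex;
    std::condition_variable m_cv;
};
```

Constructed with a count of 2, two calls to wait() return immediately without touching the mutex; only a third call would block until another thread calls post().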
Line 9 is where the magic starts. We atomically increment m_pushIndex while fetching its previous value into a temporary, pushIndex. From now on we work with the temporary.
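The key property is that fetch_add returns the value held before the increment, so every caller walks away with its own ticket. A small sketch, where claim_slot is a hypothetical helper name and not something the queue defines:

```cpp
#include <atomic>
#include <cassert>

// Hypothetical helper: take the next ticket and map it onto a slot of a
// buffer with `size` slots. fetch_add returns the value *before* the increment.
inline unsigned claim_slot(std::atomic_uint& index, unsigned size) noexcept
{
    const unsigned ticket = index.fetch_add(1);
    return ticket % size;
}
```

Starting from an index of 0 and a size of 3, successive calls yield slots 0, 1, 2 and then 0 again, while the shared index itself keeps counting up.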
Line 10 is where we insert the element by copy constructing it in the right open slot.
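If placement new is unfamiliar, the following sketch shows the same pattern on its own: raw memory is obtained with operator new, objects are constructed into it later, and each one is destroyed by hand before the memory is released. The four helper function names are made up for this example.

```cpp
#include <cassert>
#include <new>
#include <string>

// Hypothetical helpers mirroring what the queue does with m_data.

// Allocate raw, uninitialized storage for n objects of type T.
template<typename T>
T* allocate_slots(unsigned n)
{
    return static_cast<T*>(operator new(n * sizeof(T)));
}

// Copy construct a T directly inside slot i (placement new, no allocation).
template<typename T>
void construct_at_slot(T* data, unsigned i, const T& value)
{
    new (data + i) T(value);
}

// Run the destructor of the object living in slot i; the memory stays allocated.
template<typename T>
void destroy_slot(T* data, unsigned i)
{
    data[i].~T();
}

// Release the raw storage; every constructed slot must be destroyed first.
template<typename T>
void release_slots(T* data)
{
    operator delete(data);
}
```

Only slots that were actually constructed may be read or destroyed, which is why the queue needs the book-keeping counter.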
Line 11 is book-keeping needed during the destruction of the queue.
Lines 13-15 are where we reduce m_pushIndex modulo m_size, so it never overflows. In a loop, we check whether m_pushIndex has changed since it was loaded into expected; if it has, we load it into expected again and retry, until it hasn't changed, in which case m_pushIndex is atomically swapped with m_pushIndex % m_size.
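The compare-and-swap retry loop can also be written as a standalone function. This is a variation for illustration rather than the code from the listing: wrap_index is a hypothetical name, and the new value is computed from the snapshot in expected instead of from a second read of the atomic. It relies on the fact that a failed compare_exchange stores the value it observed into expected.

```cpp
#include <atomic>
#include <cassert>

// Hypothetical helper: reduce `index` modulo `size` without losing
// increments made concurrently by other threads.
inline unsigned wrap_index(std::atomic_uint& index, unsigned size) noexcept
{
    unsigned expected = index.load();
    // If another thread changed `index` after we read it, the exchange fails,
    // `expected` is refreshed with the current value, and the desired value
    // (expected % size) is recomputed on the next iteration.
    while (!index.compare_exchange_weak(expected, expected % size))
    {
    }
    return expected % size;
}
```

For an index of 7 and a size of 4 the stored value becomes 3; an index that is already below the size is left unchanged.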
Line 17 signals to other blocked threads, if there are any, that the queue now has an element available for popping.
Other methods of the queue work in a very similar way, so I will not be describing them in detail here. The only catch with this implementation is that it only works for no-throw copyable and movable types; so declare your constructors and assignment operators with noexcept if you want to use them with this queue 🙂
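One way to check a type up front is a small trait combining the same standard type traits the queue methods test. The sketch below is an assumption about how one might do it: queue_ready, message and legacy are hypothetical names, and the trait asks for all three of no-throw copy construction, move construction and move assignment, which is a simplification of the per-method conditions.

```cpp
#include <cassert>
#include <type_traits>
#include <utility>

// Hypothetical element type with every special member marked noexcept.
struct message
{
    int id = 0;
    message() = default;
    message(const message& other) noexcept : id(other.id) {}
    message(message&& other) noexcept : id(other.id) {}
    message& operator=(const message& other) noexcept { id = other.id; return *this; }
    message& operator=(message&& other) noexcept { id = other.id; return *this; }
};

// Hypothetical element type whose copy operations are allowed to throw.
struct legacy
{
    legacy() = default;
    legacy(const legacy&) {}
    legacy& operator=(const legacy&) { return *this; }
};

// Hypothetical trait: true if both push overloads and the move-assigning
// pop overload would be available for T.
template<typename T>
struct queue_ready : std::integral_constant<bool,
    std::is_nothrow_copy_constructible<T>::value and
    std::is_nothrow_move_constructible<T>::value and
    std::is_nothrow_move_assignable<T>::value> {};

static_assert(queue_ready<message>::value, "message can be used with the queue");
static_assert(!queue_ready<legacy>::value, "legacy lacks noexcept copy operations");
```

A static_assert like this next to the type definition turns a confusing "no matching function" error at the call site into a readable message.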
Complete listing:
#pragma once

#include <atomic>
#include <cassert>
#include <type_traits>
#include <utility>
#include "semaphore.h"

template<typename T>
class fast_blocking_queue
{
public:
    explicit fast_blocking_queue(unsigned int size)
    : m_size(size), m_pushIndex(0), m_popIndex(0), m_count(0),
    m_data((T*)operator new(size * sizeof(T))),
    m_openSlots(size), m_fullSlots(0)
    {
        assert(size != 0);
    }

    ~fast_blocking_queue() noexcept
    {
        while (m_count--)
        {
            m_data[m_popIndex].~T();
            m_popIndex = ++m_popIndex % m_size;
        }
        operator delete(m_data);
    }

    template<typename Q = T>
    typename std::enable_if<
        std::is_copy_constructible<Q>::value and
        std::is_nothrow_copy_constructible<Q>::value, void>::type
    push(const T& item) noexcept
    {
        m_openSlots.wait();

        auto pushIndex = m_pushIndex.fetch_add(1);
        new (m_data + (pushIndex % m_size)) T (item);
        ++m_count;

        auto expected = m_pushIndex.load();
        while(!m_pushIndex.compare_exchange_strong(expected, m_pushIndex % m_size))
            expected = m_pushIndex.load();

        m_fullSlots.post();
    }

    template<typename Q = T>
    typename std::enable_if<
        std::is_move_constructible<Q>::value and
        std::is_nothrow_move_constructible<Q>::value, void>::type
    push(T&& item) noexcept
    {
        m_openSlots.wait();

        auto pushIndex = m_pushIndex.fetch_add(1);
        new (m_data + (pushIndex % m_size)) T (std::move(item));
        ++m_count;

        auto expected = m_pushIndex.load();
        while(!m_pushIndex.compare_exchange_strong(expected, m_pushIndex % m_size))
            expected = m_pushIndex.load();

        m_fullSlots.post();
    }

    template<typename Q = T>
    typename std::enable_if<
        not std::is_move_assignable<Q>::value and
        std::is_nothrow_copy_assignable<Q>::value, void>::type
    pop(T& item) noexcept
    {
        m_fullSlots.wait();

        auto popIndex = m_popIndex.fetch_add(1);
        item = m_data[popIndex % m_size];
        m_data[popIndex % m_size].~T();
        --m_count;

        auto expected = m_popIndex.load();
        while(!m_popIndex.compare_exchange_strong(expected, m_popIndex % m_size))
            expected = m_popIndex.load();

        m_openSlots.post();
    }

    template<typename Q = T>
    typename std::enable_if<
        std::is_move_assignable<Q>::value and
        std::is_nothrow_move_assignable<Q>::value, void>::type
    pop(T& item) noexcept
    {
        m_fullSlots.wait();

        auto popIndex = m_popIndex.fetch_add(1);
        item = std::move(m_data[popIndex % m_size]);
        m_data[popIndex % m_size].~T();
        --m_count;

        auto expected = m_popIndex.load();
        while(!m_popIndex.compare_exchange_strong(expected, m_popIndex % m_size))
            expected = m_popIndex.load();

        m_openSlots.post();
    }

    T pop() noexcept(std::is_nothrow_invocable_r<void, decltype(&fast_blocking_queue<T>::pop<T>), T&>::value)
    {
        T item;
        pop(item);
        return item;
    }

private:
    const unsigned int m_size;
    std::atomic_uint m_pushIndex;
    std::atomic_uint m_popIndex;
    std::atomic_uint m_count;
    T* m_data;

    fast_semaphore m_openSlots;
    fast_semaphore m_fullSlots;
};
Super nice 🙂
thanks 🙂 it was my first attempt at lock free programming 😉