Advanced thread pool

Below is my implementation of the thread pool described in this talk, and a benchmark comparing it against my simple thread pool implementation. The advanced pool is roughly 15x faster at scheduling and dispatching short, random-length work items on my 2018 MacBook Pro (i5 CPU, 4 logical cores). It uses a queue per worker thread and a work-stealing dispatcher: it tries to enqueue each work item onto a queue that is not currently locked by a dispatch thread, and each worker tries to steal work from other unblocked queues. As always, the complete implementation is available on GitHub.

Benchmark program:

#include <iostream>
#include <chrono>
#include <cstdlib>
#include "pool.h"
using namespace std;
using namespace chrono;

const unsigned int COUNT = 10'000'000;
const unsigned int REPS = 10;

int main()
{
	srand(0);
	auto start = high_resolution_clock::now();
	{
		simple_thread_pool tp;
		for(int i = 0; i < COUNT; ++i)
			tp.enqueue_work([i]() {
				int x;
				int reps = REPS + (REPS * (rand() % 5));
				for(int n = 0; n < reps; ++n)
					x = i + rand();
			});
	}
	auto end = high_resolution_clock::now();
	auto duration = duration_cast<milliseconds>(end - start);
	cout << "simple_thread_pool duration = " << duration.count() / 1000.f << " s" << endl;

	srand(0);
	start = high_resolution_clock::now();
	{
		thread_pool tp;
		for(int i = 0; i < COUNT; ++i)
			tp.enqueue_work([i]() {
				int x;
				int reps = REPS + (REPS * (rand() % 5));
				for(int n = 0; n < reps; ++n)
					x = i + rand();
			});
	}
	end = high_resolution_clock::now();
	duration = duration_cast<milliseconds>(end - start);
	cout << "thread_pool duration = " << duration.count() / 1000.f << " s" << endl;
}

* Apple CLANG -Ofast -march=native -std=c++17 -lc++

simple_thread_pool duration = 30.337 s
thread_pool duration = 1.625 s

* LLVM -Ofast -march=native -std=c++17 -lc++

simple_thread_pool duration = 25.785 s
thread_pool duration = 1.615 s

* G++ -Ofast -march=native -std=c++17 -lstdc++

simple_thread_pool duration = 26.28 s
thread_pool duration = 1.614 s

Program output.

thread_pool class:

class thread_pool
{
public:
	thread_pool(unsigned int threads = std::thread::hardware_concurrency())
	: m_queues(threads), m_count(threads)
	{
		assert(threads != 0);
		auto worker = [&](unsigned int i)
		{
			while(true)
			{
				Proc f;
				for(unsigned int n = 0; n < m_count; ++n)
					if(m_queues[(i + n) % m_count].try_pop(f)) break;
				if(!f && !m_queues[i].pop(f)) break;
				f();
			}
		};
		for(unsigned int i = 0; i < threads; ++i)
			m_threads.emplace_back(worker, i);
	}

	~thread_pool() noexcept
	{
		for(auto& queue : m_queues)
			queue.done();
		for(auto& thread : m_threads)
			thread.join();
	}

	template<typename F, typename... Args>
	void enqueue_work(F&& f, Args&&... args)
	{
		auto work = [f,args...]() { f(args...); };
		unsigned int i = m_index++;
		for(unsigned int n = 0; n < m_count * K; ++n)
			if(m_queues[(i + n) % m_count].try_push(work)) return;
		m_queues[i % m_count].push(work);
	}

	template<typename F, typename... Args>
	auto enqueue_task(F&& f, Args&&... args) -> std::future<typename std::result_of<F(Args...)>::type>
	{
		using return_type = typename std::result_of<F(Args...)>::type;
		auto task = std::make_shared<std::packaged_task<return_type()>>(std::bind(std::forward<F>(f), std::forward<Args>(args)...));
		std::future<return_type> res = task->get_future();

		auto work = [task](){ (*task)(); };
		unsigned int i = m_index++;
		for(unsigned int n = 0; n < m_count * K; ++n)
			if(m_queues[(i + n) % m_count].try_push(work)) return res;
		m_queues[i % m_count].push(work);

		return res;
	}

private:
	using Proc = std::function<void(void)>;
	using Queues = std::vector<blocking_queue<Proc>>;
	Queues m_queues;

	using Threads = std::vector<std::thread>;
	Threads m_threads;

	const unsigned int m_count;
	std::atomic_uint m_index = 0;

	inline static const unsigned int K = 3;
};

Better timer class

It bothered me that my previous simple timer implementation fired off a new thread for each timeout and interval. I knew things could be done better, but didn't yet know how. Well, this morning inspiration came and I implemented a new and shiny timer class. The interface is simple: you create a timer with one parameter, its "tick". The tick determines how frequently the internal thread wakes up and looks for work. Work can be a repeating interval event or a one-time timeout event. Each time you register an interval or a timeout you get back a pointer to an event object; using this event object you can cancel the interval or the timeout, if it hasn't fired already. The internal thread lives for as long as the timer object does. It also self-corrects any time drift caused by the firing of events and execution delay. The complete implementation can be found on GitHub.

Here’s how you use it:

#include <iostream>
#include <sstream>
#include <chrono>
#include <thread>
#include "timer.h"
using namespace std;
using namespace chrono;

int main()
{
	auto start = high_resolution_clock::now();
	auto duration = [start]() {
		auto now = high_resolution_clock::now();
		auto msecs = duration_cast<milliseconds>(now - start).count();
		stringstream ss;
		ss << msecs / 1000.0;
		cout << "elapsed " << ss.str() << "s\t: ";
	};

	cout << "start" << endl;
	timer t(1ms);
	auto e1 = t.set_timeout(3s, [&]() { duration(); cout << "timeout 3s" << endl; });
	auto e2 = t.set_interval(1s, [&]() { duration(); cout << "interval 1s" << endl; });
	auto e3 = t.set_timeout(4s, [&]() { duration(); cout << "timeout 4s" << endl; });
	auto e4 = t.set_interval(2s, [&]() { duration(); cout << "interval 2s" << endl; });
	auto e5 = t.set_timeout(5s, [&]() { duration(); cout << "timeout that never happens" << endl; });
	e5->signal(); // cancel this timeout
	this_thread::sleep_for(5s);
	e4->signal(); // cancel this interval
	cout << "cancel interval 2" << endl;
	this_thread::sleep_for(5s);
	cout << "end" << endl;
}

start
elapsed 1s : interval 1s
elapsed 2s : interval 1s
elapsed 2s : interval 2s
elapsed 3s : timeout 3s
elapsed 3s : interval 1s
elapsed 4s : interval 1s
elapsed 4s : timeout 4s
elapsed 4s : interval 2s
elapsed 5s : interval 1s
cancel interval 2
elapsed 6s : interval 1s
elapsed 7s : interval 1s
elapsed 8s : interval 1s
elapsed 9s : interval 1s
elapsed 10s : interval 1s
end

Program output.

The timer class:

#pragma once

#include <set>
#include <chrono>
#include <thread>
#include <memory>
#include <functional>
#include <utility>
#include <cassert>
#include "event.h"

class timer
{
public:
	template<typename T>
	timer(T&& tick)
	: m_tick(std::chrono::duration_cast<std::chrono::nanoseconds>(tick)), m_thread([this]()
	{
		assert(m_tick.count() > 0);
		auto start = std::chrono::high_resolution_clock::now();
		std::chrono::nanoseconds drift{0};
		while(!m_event.wait_for(m_tick - drift))
		{
			++m_ticks;
			auto it = std::begin(m_events);
			auto end = std::end(m_events);
			while(it != end)
			{
				auto& event = *it;
				++event.elapsed;
				if(event.elapsed == event.ticks)
				{
					auto remove = event.proc();
					if(remove)
					{
						m_events.erase(it++);
						continue;
					}
					else
					{
						event.elapsed = 0;
					}
				}
				++it;
			}
			auto now = std::chrono::high_resolution_clock::now();
			auto realDuration = std::chrono::duration_cast(now - start);
			auto fakeDuration = std::chrono::duration_cast(m_tick * m_ticks);
			drift = realDuration - fakeDuration;
		}
	})
	{}

	~timer()
	{
		m_event.signal();
		m_thread.join();
	}

	template<typename T, typename F, typename... Args>
	auto set_timeout(T&& timeout, F f, Args&&... args)
	{
		assert(std::chrono::duration_cast<std::chrono::nanoseconds>(timeout).count() >= m_tick.count());
		auto event = std::make_shared<manual_event>();
		auto proc = [=]() {
			if(event->wait_for(std::chrono::seconds(0))) return true;
			f(args...);
			return true;
		};
		m_events.insert({ event_ctx::kNextSeqNum++, proc,
			static_cast<unsigned long long>(std::chrono::duration_cast<std::chrono::nanoseconds>(timeout).count() / m_tick.count()), 0, event });
		return event;
	}

	template<typename T, typename F, typename... Args>
	auto set_interval(T&& interval, F f, Args&&... args)
	{
		assert(std::chrono::duration_cast<std::chrono::nanoseconds>(interval).count() >= m_tick.count());
		auto event = std::make_shared<manual_event>();
		auto proc = [=]() {
			if(event->wait_for(std::chrono::seconds(0))) return true;
			f(args...);
			return false;
		};
		m_events.insert({ event_ctx::kNextSeqNum++, proc,
			static_cast<unsigned long long>(std::chrono::duration_cast<std::chrono::nanoseconds>(interval).count() / m_tick.count()), 0, event });
		return event;
	}

private:
	std::chrono::nanoseconds m_tick;
	unsigned long long m_ticks = 0;
	manual_event m_event;
	std::thread m_thread;

	struct event_ctx
	{
		bool operator < (const event_ctx& rhs) const { return seq_num < rhs.seq_num; }
		static inline unsigned long long kNextSeqNum = 0;
		unsigned long long seq_num;
		std::function<bool(void)> proc;
		unsigned long long ticks;
		mutable unsigned long long elapsed;
		std::shared_ptr event;
	};

	using set = std::set<event_ctx>;
	set m_events;
};

Random number generator

Examples based on this talk.

Below is the old and rusty way of generating random numbers. Don’t do it!

#include <cstdio>
#include <cstdlib>
#include <ctime>

int main()
{
	srand((unsigned int)time(NULL));
	for(int n = 0; n < 10; ++n)
		printf("%d ", rand() % 1000);
	printf("\n");
}

Below is the new and shiny way of generating random numbers. Do that instead! Comments inline and a benchmark of each random number generator included. Program output first:

random_device min = 0, max = 4294967295
mt19937 min = 0, max = 4294967295
mt19937_64 min = 0, max = 18446744073709551615
10 -1 6 10 5 -4 -3 2 6 -3 
8.73366 3.81724 2.11837 4.14365 9.58442 
vector of ints: 0 1 2 3 4 5 6 7 8 9 
shuffled to   : 3 1 6 7 9 4 8 5 0 2 

generating 100000000 random numbers...
random_device duration in ms = 142080
mt19937 duration in ms = 553.894
uniform_int_distribution duration in ms = 2719.63
uniform_real_distribution duration in ms = 1070.29

Program output.

Complete listing:

#include <iostream>
#include <random>
#include <vector>
#include <algorithm>
#include <chrono>
using namespace std;
using namespace chrono;

const int COUNT = 100'000'000;

int main()
{
	/* random_device MAY BE CRYPTOGRAPHICALLY STRONG */
	random_device rd;
	/* SEED mt19937 and mt19937_64 ENGINE WITH STRONG RANDOM NUMBER */
	mt19937 mt(rd());
	mt19937_64 mt64(rd());
	/* CREATE RANDOM DISTRIBUTION OBJECTS FOR INTS AND DOUBLES */
	uniform_int_distribution<int> int_dist(-10, 10);
	uniform_real_distribution<double> real_dist(1.0, 10.0);

	/* PRINT MIN AND MAX OF EACH RANDOM ENGINE */
	cout << "random_device min = " << rd.min() << ", max = " << rd.max() << endl;
	cout << "mt19937 min = " << mt.min() << ", max = " << mt.max() << endl;
	cout << "mt19937_64 min = " << mt64.min() << ", max = " << mt64.max() << endl;

	/* GENERATE 10 INTEGERS IN RANGE -10 TO 10 USING mt19937 GENERATOR */
	for(int n = 0; n < 10; ++n)
		cout << int_dist(mt) << " ";
	cout << endl;

	/* GENERATE 5 DOUBLES IN RANGE 1.0 TO 10.0 USING mt19937 GENERATOR */
	for(int n = 0; n < 5; ++n)
		cout << real_dist(mt) << " ";
	cout << endl;

	/* GENERATE A VECTOR OF CONSECUTIVE INTEGERS */
	vector<int> v;
	for(int n = 0; n < 10; ++n)
		v.push_back(n);

	/* PRINT IT */
	cout << "vector of ints: ";
	for(auto it : v) cout << it << " ";
	cout << endl;

	/* RANDOMLY SHUFFLE THE VECTOR OF INTEGERS USING mt19937 GENERATOR */
	shuffle(begin(v), end(v), mt);

	/* PRINT IT */
	cout << "shuffled to   : ";
	for(auto it : v) cout << it << " ";
	cout << endl;
	
	/* ********************* */
	/* BENCHMARK STARTS HERE */
	/* ********************* */
	cout << "generating " << COUNT << " random numbers..." << endl;

	/* TEST PERFORMANCE OF random_device */
	auto start = high_resolution_clock::now();
	int result{0};
	for(int i = 0; i < COUNT; ++i)
		result += rd();
	auto end = high_resolution_clock::now();
	auto duration = duration_cast<microseconds>(end - start).count() / 1000.0f;
	cout << "random_device duration in ms = " << duration << endl;

	/* TEST PERFORMANCE OF mt19937 */
	start = high_resolution_clock::now();
	result = 0;
	for(int i = 0; i < COUNT; ++i)
		result += mt();
	end = high_resolution_clock::now();
	duration = duration_cast<microseconds>(end - start).count() / 1000.0f;
	cout << "mt19937 duration in ms = " << duration << endl;

	/* TEST PERFORMANCE OF uniform_int_distribution */
	start = high_resolution_clock::now();
	result = 0;
	for(int i = 0; i < COUNT; ++i)
		result += int_dist(mt);
	end = high_resolution_clock::now();
	duration = duration_cast<microseconds>(end - start).count() / 1000.0f;
	cout << "uniform_int_distribution duration in ms = " << duration << endl;

	/* TEST PERFORMANCE OF uniform_real_distribution */
	start = high_resolution_clock::now();
	result = 0;
	for(int i = 0; i < COUNT; ++i)
		result += real_dist(mt);
	end = high_resolution_clock::now();
	duration = duration_cast<microseconds>(end - start).count() / 1000.0f;
	cout << "uniform_real_distribution duration in ms = " << duration << endl;
}


Time to generate 100,000,000 random numbers on a 2012 MacBook Pro (i7, 2.3GHz), on a logarithmic scale, where RD = std::random_device, MT = std::mt19937, UD = std::uniform_int_distribution, and UD-R = std::uniform_real_distribution.
Graph created using gnuplot.

Relaxed atomics and data races

The following code is a race condition:

#include <atomic>
#include <thread>
#include <iostream>
using namespace std;

int main(int argc, char** argv)
{
	atomic_bool flag{false};
	bool data{false};

	thread t1([&]() {
		data = true;
		flag.store(true, memory_order_relaxed);
	});

	thread t2([&]() {
		while(flag.load(memory_order_relaxed) == false);
		cout << "data = " << boolalpha << data << endl;
	});

	t1.join();
	t2.join();

	return 1;
}

Because memory_order_relaxed is used, the compiler and the CPU are free to reorder the two writes in thread t1. They are also free to reorder the two reads in thread t2. Hence the undefined behavior: t2 can leave the spin loop and still read data as false.

The fix is to either use memory_order_seq_cst on the store and load calls, or memory_order_release on the store call and memory_order_acquire on the load call. A release store prevents any prior loads and stores from being reordered past it; an acquire load guarantees that all stores made by the thread that released the atomic variable are visible to the current thread, and it also prevents subsequent reads and writes from being reordered before it.

Bit fields and race conditions

The following program, despite looking correct, is a race condition and has unpredictable behaviour:

#include <thread>
#include <mutex>
#include <iostream>
using namespace std;

const int COUNT = 10'000'000;

mutex aLock;
mutex bLock;

struct S
{
	unsigned int a:9;
	unsigned int b:7;
} __attribute__((packed));

int main(int argc, char** argv)
{
	S s{};

	thread t1([&]() {
		scoped_lock lock(aLock);
		for(int i = 0; i < COUNT; ++i)
		{
			s.a = 0;
			s.a = 0b111111111;
		}
	});

	thread t2([&]() {
		scoped_lock lock(bLock);
		for(int i = 0; i < COUNT; ++i)
		{
			s.b = 0;
			s.b = 0b1111111;
		}
	});

	t1.join();
	t2.join();

	cout << "sizeof(S) = " << sizeof(S) << ", " << s.a << ", " << s.b << endl;

	return 1;
}

The proof is in the output of running this code several times:

sizeof(S) = 2, 511, 127
sizeof(S) = 2, 511, 127
sizeof(S) = 2, 0, 127
sizeof(S) = 2, 511, 0
sizeof(S) = 2, 511, 127
sizeof(S) = 2, 511, 127
sizeof(S) = 2, 511, 127
sizeof(S) = 2, 0, 127
sizeof(S) = 2, 0, 127
sizeof(S) = 2, 511, 127

The race condition, and the problem here, is that the C++ standard treats adjacent bit fields as a single object in memory; they may be packed to share the same byte. The CPU cannot operate on single bits; it must load at least one byte at a time, and because of the overlap, loading the bits of a also loads the bits of b. So the write to bit field a, even though protected with its own mutex, also overwrites the value of b. And vice versa.

The fix is to use one mutex to protect all adjacent bit fields. I say all because you have no guarantee that the CPU will be able to load one byte at a time; depending on the architecture, it may be limited to working on 32-bit values at a time.

Corrected program:

#include <thread>
#include <mutex>
#include <iostream>
using namespace std;

const int COUNT = 10'000'000;

mutex abLock;

struct S
{
	unsigned int a:9;
	unsigned int b:7;
} __attribute__((packed));

int main(int argc, char** argv)
{
	S s{};

	thread t1([&]() {
		scoped_lock lock(abLock);
		for(int i = 0; i < COUNT; ++i)
		{
			s.a = 0;
			s.a = 0b111111111;
		}
	});

	thread t2([&]() {
		scoped_lock lock(abLock);
		for(int i = 0; i < COUNT; ++i)
		{
			s.b = 0;
			s.b = 0b1111111;
		}
	});

	t1.join();
	t2.join();

	cout << "sizeof(S) = " << sizeof(S) << ", " << s.a << ", " << s.b << endl;

	return 1;
}

Dekker’s algorithm for N threads

In the previous post I described Dekker's algorithm. Its limitation is that it only works for 2 threads. I wanted it to work for N threads, but I can't atomically check the N-1 flags belonging to the other threads 🙁 Now what?

Instead of multiple flags we'll use the bits of one atomic variable to indicate which thread wants entry into the critical section. So if the first bit is set, that means the first thread declares its intent to enter; and so on for N threads.

Complete listing:

#include <iostream>
#include <thread>
#include <atomic>
#include <vector>
using namespace std;

const unsigned int COUNT = 5;
const unsigned int THREADS = thread::hardware_concurrency();
const unsigned int THREAD_MASK = 0b1;

int main(int argc, char** argv)
{
	atomic_uint flag{0};

	auto proc = [&](int t, unsigned int thread_mask) {
		for(int i = 0; i < COUNT;)
		{
			if(flag.fetch_or(thread_mask) == 0)
			{
				cout << "T" << t << " in critical section" << endl;
				++i;
			}

			flag.fetch_xor(thread_mask);
		}
	};

	vector<thread> vt;
	for(int i = 0; i < THREADS; ++i)
		vt.emplace_back(proc, i, THREAD_MASK << i);

	for(auto& t : vt)
		t.join();

	return 1;
}

Dekker’s algorithm

Dekker’s algorithm is a way of synchronizing two threads around a critical section. It is a dance where thread 1 sets its flag, declaring its intent to enter the critical section, then checks the flag of thread 2: if it is not set, thread 1 enters the critical section; if it is set, thread 1 resets its flag and backs off. Thread 2 does the same.

Below is my implementation of the algorithm using C++ atomics.

#include <iostream>
#include <thread>
#include <atomic>
using namespace std;

const int COUNT = 10;

int main(int argc, char** argv)
{
	atomic_bool f1{false}, f2{false};

	auto proc1 = [&]() {
		for(int i = 0; i < COUNT;)
		{
			f1.store(true);
			if(f2.load() == false)
			{
				cout << "T1 in critical section" << endl;
				++i;
			}
			
			f1.store(false);
		}
	};

	auto proc2 = [&]() {
		for(int i = 0; i < COUNT;)
		{
			f2.store(true);
			if(f1.load() == false)
			{
				cout << "T2 in critical section" << endl;
				++i;
			}
			
			f2.store(false);
		}
	};

	thread t1(proc1);
	thread t2(proc2);

	t1.join();
	t2.join();

	return 1;
}

T1 in critical section
T2 in critical section
T1 in critical section
T2 in critical section
T2 in critical section
T1 in critical section
T1 in critical section
T2 in critical section
T1 in critical section
T2 in critical section
T1 in critical section
T2 in critical section
T1 in critical section
T2 in critical section
T1 in critical section
T2 in critical section
T1 in critical section
T2 in critical section
T1 in critical section
T2 in critical section
Program ended with exit code: 1

Program output.

Memory barriers and thread synchronization

We will jump straight to the code. This innocent-looking little program has a major issue (when compiled as a release build with optimizations on my Mac using GCC, Apple's CLANG, and LLVM, as well as on Windows using Visual Studio 2017, and run on a multicore machine). Can you spot the problem?

#include <iostream>
#include <thread>
#include <chrono>
using namespace std;

int main(int argc, char** argv)
{
	bool flag = false;
	
	thread t1([&]() {
		this_thread::sleep_for(100ms);
		cout << "t1 started" << endl;
		flag = true;
		cout << "t1 signals and exits" << endl;
	});
	
	thread t2([&]() {
		cout << "t2 started" << endl;
		while(flag == false) ;
		cout << "t2 got signaled and exits" << endl;
	});
	
	t1.join();
	t2.join();
	
	return 1;
}

That's right! It will never terminate! It will hang forever! The while loop in t2 will never break. But why? Thread t1 sets flag to true after all. Yes, but it does so too late (notice the 100ms sleep). By that point thread t2 has already cached flag in L1 and will never see its updated value. If you think that making flag volatile will help, you're wrong. It may work on your compiler/machine, but it is no guarantee. Now what?

This was one of the hardest lessons in C++ and computer science for me. Before continuing to the fix section I highly recommend you read about the following: memory barriers, C++ memory model as well as C++ Memory Model at Modernest C++, and memory ordering. I'll see you in a couple of days 😉

The fix.

The simplest fix is to wrap access to flag in a mutex lock/unlock, or to make flag an atomic<bool> (both of those solutions will insert appropriate memory barriers). But that's not always an option for other data types...

We need to make sure that t2 can see the actions of t1 that happened later in time. For this we need to force cache synchronization between different CPU cores. We can do it in three ways:
1) By inserting memory barriers in the right places.
2) By inserting loads and stores of an atomic variable using release/acquire semantics.
3) By inserting loads and stores of a dependent atomic variable using release/consume semantics.

Below is the corrected version of our example; uncomment each #define to engage a different fix:

#include <iostream>
#include <thread>
#include <atomic>
#include <chrono>
using namespace std;

//#define ATOMIC_FENCE
//#define ATOMIC_RELEASE
//#define ATOMIC_CONSUME

#if defined ATOMIC_FENCE
#define FENCE_ACQUIRE atomic_thread_fence(memory_order_acquire)
#define FENCE_RELEASE atomic_thread_fence(memory_order_release)
#elif defined ATOMIC_RELEASE
atomic_bool f{false};
#define FENCE_ACQUIRE f.load(memory_order_acquire)
#define FENCE_RELEASE f.store(true, memory_order_release)
#elif defined ATOMIC_CONSUME
atomic_bool f{false};
#define FENCE_ACQUIRE f.load(memory_order_consume)
#define FENCE_RELEASE f.store(flag, memory_order_release)
#else
#define FENCE_ACQUIRE
#define FENCE_RELEASE
#endif

int main(int argc, char** argv)
{
	bool flag = false;

	thread t1([&]() {
		this_thread::sleep_for(100ms);
		cout << "t1 started" << endl;
		flag = true;
		FENCE_RELEASE;
		cout << "t1 signals and exits" << endl;
	});

	thread t2([&]() {
		cout << "t2 started" << endl;
		while(flag == false) FENCE_ACQUIRE;
		cout << "t2 got signaled and exits" << endl;
	});

	t1.join();
	t2.join();

	return 1;
}

Atomic blocking queue

As I learn about atomics and the memory model, I decided to take a stab at rewriting my blocking queue using atomic operations, eliminating the mutex around the critical section of code responsible for pushing and popping elements, effectively creating a fast path through the queue when no blocking is taking place.

Let’s jump straight into the code:

template<typename Q = T>
typename std::enable_if<
	std::is_copy_constructible<Q>::value and
	std::is_nothrow_copy_constructible<Q>::value, void>::type
push(const T& item) noexcept
{
	m_openSlots.wait();

	auto pushIndex = m_pushIndex.fetch_add(1);
	new (m_data + (pushIndex % m_size)) T (item);
	++m_count;

	auto expected = m_pushIndex.load();
	// on failure compare_exchange_strong reloads 'expected' with the current value
	while(!m_pushIndex.compare_exchange_strong(expected, expected % m_size));

	m_fullSlots.post();
}

The enable_if clause at the top of the listing is basically a template concept which specifies that this method is only present if the type is no-throw copy constructible.
The m_openSlots.wait() call is the usual semaphore decrement, and possible blocking if the queue is full. The fast path of this semaphore implementation uses only atomic operations, so if it doesn't block it will not engage a mutex (fast_semaphore code available on GitHub).
The fetch_add is where the magic starts. We atomically increment m_pushIndex while fetching its previous value into a temporary, pushIndex. From now on we work with the temporary. The placement new then inserts the element by copy-constructing it in the right open slot, and the increment of m_count is book-keeping needed during the destruction of the queue.
The compare-exchange loop is where we reduce m_pushIndex modulo m_size so it never overflows: in a loop we check whether it still holds the value we expect; if it does, it is atomically swapped with that value modulo m_size; if it doesn't, the current value becomes the new expectation and we check again.
Finally, m_fullSlots.post() signals to other blocked threads, if there are any, that the queue now has an element available for popping.

The other methods of the queue work in a very similar way, so I will not describe them in detail here. The only crux of this implementation is that it only works for no-throw copyable and movable types; so declare your constructors and assignment operators with noexcept if you want to use them with this queue 🙂

Complete listing:

#pragma once

#include <atomic>
#include <cassert>
#include <utility>
#include <type_traits>
#include "semaphore.h"

template<typename T>
class fast_blocking_queue
{
public:
	explicit fast_blocking_queue(unsigned int size)
	: m_size(size), m_pushIndex(0), m_popIndex(0), m_count(0),
	m_data((T*)operator new(size * sizeof(T))),
	m_openSlots(size), m_fullSlots(0)
	{
		assert(size != 0);
	}

	~fast_blocking_queue() noexcept
	{
		while (m_count--)
		{
			m_data[m_popIndex].~T();
			m_popIndex = ++m_popIndex % m_size;
		}
		operator delete(m_data);
	}

	template<typename Q = T>
	typename std::enable_if<
		std::is_copy_constructible<Q>::value and
		std::is_nothrow_copy_constructible<Q>::value, void>::type
	push(const T& item) noexcept
	{
		m_openSlots.wait();

		auto pushIndex = m_pushIndex.fetch_add(1);
		new (m_data + (pushIndex % m_size)) T (item);
		++m_count;

		auto expected = m_pushIndex.load();
		// on failure compare_exchange_strong reloads 'expected' with the current value
		while(!m_pushIndex.compare_exchange_strong(expected, expected % m_size));

		m_fullSlots.post();
	}

	template<typename Q = T>
	typename std::enable_if<
		std::is_move_constructible<Q>::value and
		std::is_nothrow_move_constructible<Q>::value, void>::type
	push(T&& item) noexcept
	{
		m_openSlots.wait();

		auto pushIndex = m_pushIndex.fetch_add(1);
		new (m_data + (pushIndex % m_size)) T (std::move(item));
		++m_count;

		auto expected = m_pushIndex.load();
		// on failure compare_exchange_strong reloads 'expected' with the current value
		while(!m_pushIndex.compare_exchange_strong(expected, expected % m_size));

		m_fullSlots.post();
	}

	template<typename Q = T>
	typename std::enable_if<
		not std::is_move_assignable<Q>::value and
		std::is_nothrow_copy_assignable<Q>::value, void>::type
	pop(T& item) noexcept
	{
		m_fullSlots.wait();

		auto popIndex = m_popIndex.fetch_add(1);
		item = m_data[popIndex % m_size];
		m_data[popIndex % m_size].~T();
		--m_count;

		auto expected = m_popIndex.load();
		// on failure compare_exchange_strong reloads 'expected' with the current value
		while(!m_popIndex.compare_exchange_strong(expected, expected % m_size));

		m_openSlots.post();
	}

	template<typename Q = T>
	typename std::enable_if<
		std::is_move_assignable<Q>::value and
		std::is_nothrow_move_assignable<Q>::value, void>::type
	pop(T& item) noexcept
	{
		m_fullSlots.wait();

		auto popIndex = m_popIndex.fetch_add(1);
		item = std::move(m_data[popIndex % m_size]);
		m_data[popIndex % m_size].~T();
		--m_count;

		auto expected = m_popIndex.load();
		// on failure compare_exchange_strong reloads 'expected' with the current value
		while(!m_popIndex.compare_exchange_strong(expected, expected % m_size));

		m_openSlots.post();
	}

	T pop() noexcept(std::is_nothrow_default_constructible<T>::value)
	{
		T item;
		pop(item);
		return item;
	}

private:
	const unsigned int m_size;
	std::atomic_uint m_pushIndex;
	std::atomic_uint m_popIndex;
	std::atomic_uint m_count;
	T* m_data;

	fast_semaphore m_openSlots;
	fast_semaphore m_fullSlots;
};

Synchronizing with atomics

I’ve been reading and learning about the C++ memory model and relaxed atomics, and decided to rewrite my Interview question, part 1 using atomic<bool>'s to synchronize the threads. I took it one step further and created three threads that print A, B, C, A, B, C, ...
Each thread spins, waiting for its atomic<bool> to be set to true; prints; then sets the next thread's flag. The wait is a CAS (compare-and-swap) operation.
Next, I will try to figure out how to relax the compare_exchange and store calls with the memory_order_acquire and memory_order_release parameters 🙂

Complete listing:

#include <iostream>
#include <thread>
#include <atomic>
using namespace std;

const int COUNT = 3;

int main(int argc, char** argv)
{
	atomic<bool> e1{false}, e2{false}, e3{false};
	
	thread t1([&](){
		for(int i = 0; i < COUNT; ++i)
		{
			bool e = true;
			while(!e1.compare_exchange_strong(e, false)) e = true;
			cout << "A" << endl;
			e2.store(true);
		}
	});
	
	thread t2([&](){
		for(int i = 0; i < COUNT; ++i)
		{
			bool e = true;
			while(!e2.compare_exchange_strong(e, false)) e = true;
			cout << "B" << endl;
			e3.store(true);
		}
	});
	
	thread t3([&](){
		for(int i = 0; i < COUNT; ++i)
		{
			bool e = true;
			while(!e3.compare_exchange_strong(e, false)) e = true;
			cout << "C" << endl;
			e1.store(true);
		}
	});
	
	e1.store(true);
	
	t1.join();
	t2.join();
	t3.join();
	
	return 1;
}

A
B
C
A
B
C
A
B
C
Program ended with exit code: 1

Program output.

Fun with unordered containers

I’ve been playing around with unordered_map and unordered_set to see how they grow when elements are inserted. Here's what I learned so far: the default load factor is 1; this means the container will grow to accommodate, on average, at most 1 element per bucket. You can change the load factor with a max_load_factor call followed by a rehash call. I also wanted to see how different STL implementations behave, so I tested with GCC, Apple's CLANG, and LLVM on my Mac. The test program creates an unordered_map and an unordered_set, loads each with 10'000'000 entries, and prints out the container's properties: size, bucket count, and load factor. I then clear the container and run the same test, but reserve space for the entries first, and print the properties again. Next I change the max_load_factor to 10 (allowing up to 10 entries per bucket) and rehash with 1/10th the bucket count.
The GCC implementation is more aggressive in terms of container growth; it creates more buckets than Apple's CLANG and LLVM's implementations, which had identical results.

The takeaway here is that when using unordered containers it is good to either reserve the appropriate element count, or rehash after inserting the elements to reduce the bucket count and optimize memory use.

****************************************
* GCC -Ofast -march=native -lstdc++ *
****************************************
unordered_map
initial
size = 10000000
bucket count = 15345007
load factor = 0.651678

reserved
size = 10000000
bucket count = 10352717
load factor = 0.96593

re-hashed
size = 10000000
bucket count = 1056323
load factor = 9.4668

unordered_set
initial
size = 10000000
bucket count = 15345007
load factor = 0.651678

reserved
size = 10000000
bucket count = 10352717
load factor = 0.96593

re-hashed
size = 10000000
bucket count = 1056323
load factor = 9.4668

**********************************************
* Apple CLANG -Ofast -march=native -lc++ *
**********************************************
unordered_map
initial
size = 10000000
bucket count = 13169977
load factor = 0.759303

reserved
size = 10000000
bucket count = 10000019
load factor = 0.999998

re-hashed
size = 10000000
bucket count = 1000003
load factor = 9.99997

unordered_set
initial
size = 10000000
bucket count = 13169977
load factor = 0.759303

reserved
size = 10000000
bucket count = 10000019
load factor = 0.999998

re-hashed
size = 10000000
bucket count = 1000003
load factor = 9.99997

Program output.

Complete listing:

#include <unordered_map>
#include <unordered_set>
#include "trace.h"
using namespace std;

const int COUNT = 10'000'000;

int main(int argc, char** argv)
{
	unordered_map<int, int> m;
	for(int i = 0; i < COUNT; ++i)
		m[i] = i;

	trace("unordered_map");
	trace("initial");
	trace("size         = ", m.size());
	trace("bucket count = ", m.bucket_count());
	trace("load factor  = ", m.load_factor(), "\n");

	m.clear();
	m.reserve(COUNT);
	for(int i = 0; i < COUNT; ++i)
		m[i] = i;

	trace("reserved");
	trace("size         = ", m.size());
	trace("bucket count = ", m.bucket_count());
	trace("load factor  = ", m.load_factor(), "\n");

	m.max_load_factor(10);
	m.rehash(COUNT / 10);

	trace("re-hashed");
	trace("size         = ", m.size());
	trace("bucket count = ", m.bucket_count());
	trace("load factor  = ", m.load_factor(), "\n");

	unordered_set<int> s;
	for(int i = 0; i < COUNT; ++i)
		s.insert(i);

	trace("unordered_set");
	trace("initial");
	trace("size         = ", s.size());
	trace("bucket count = ", s.bucket_count());
	trace("load factor  = ", s.load_factor(), "\n");

	s.clear();
	s.reserve(COUNT);
	for(int i = 0; i < COUNT; ++i)
		s.insert(i);

	trace("reserved");
	trace("size         = ", s.size());
	trace("bucket count = ", s.bucket_count());
	trace("load factor  = ", s.load_factor(), "\n");

	s.max_load_factor(10);
	s.rehash(COUNT / 10);

	trace("re-hashed");
	trace("size         = ", s.size());
	trace("bucket count = ", s.bucket_count());
	trace("load factor  = ", s.load_factor(), "\n");

	return 1;
}

trace.h:

#pragma once

#include <iostream>
#include <mutex>
#include "mutex.h"

namespace { static inline fast_mutex kStdOutLock; }

template<typename... Ts>
inline void trace(Ts&&... args)
{
	std::scoped_lock lock(kStdOutLock);
	(std::cout << ... << args) << std::endl;
}

Short introduction to parameter packs and fold expressions

Docendo discimus.

Seneca the Younger.

I’m new to this topic, since I literally just learned about fold expressions today, and parameter packs a few days before that, so this will be a short post where I’ll explain the basics 🙂

Let’s jump straight to the code:

template<typename... Args>
int sum(Args&&... args)
{
    return (args + ...);
}

Line 1 tells us there will be zero or more parameters.
Line 2 declares a function with said variable number of parameters.
Line 4 is where the magic happens. The fold expression states that for every parameter in the args parameter pack, combine it with the next one using operator+. So it becomes args0 + args1 + ... + argsN.

Another example:

template<typename... Ts>
inline void trace(Ts&&... args)
{
    (cout << ... << args) << endl;
}

Here in line 4 the fold expression takes the form of "(init op ... op pack)" as described by syntax #4 here. It expands to cout << args0 << args1 << ... << argsN.

Another way of doing the same is:

template<typename... Ts>
inline void trace(Ts&&... args)
{
    ((cout << args << " "),...) << endl;
}

Here the cout << args << " " is expanded N times, separated by the comma operator, so it's less efficient but we get a space between each printed argument.

That's it for now 🙂 I'll post more as I learn more!

Complete listing:

#include <iostream>
using namespace std;

template<typename... Args>
int sum(Args&&... args)
{
    return (args + ...);
}

template<typename... Ts>
inline void trace(Ts&&... args)
{
    //(cout << ... << args) << endl;
    ((cout << args << " "),...) << endl;
}

int main(int argc, char** argv)
{
    trace(sum(1));
    trace(sum(1,2,3,4,5));
    trace(sum(1,1,2,3,5,8,13,21,34,55));
    trace(1, 2, 3);

    return 1;
}

Pure virtual destructor

Did you know that a destructor can be pure virtual? It can be declared as such, but it still needs a definition (a body). It is a way of making a class abstract (so instances of it cannot be created) without having to make any other method pure virtual. So use it if you have a base class that you don’t want users to instantiate but you have no other candidates for the pure virtual specifier 🙂

Additional discussion of this topic at Geeks for Geeks: Pure virtual destructor in C++.

Complete listing:

class base
{
public:
	virtual ~base() = 0;
};

base::~base() {}

class derived : public base
{
public:
	virtual ~derived() {}
};

int main(int argc, char** argv)
{
	//base b; // Compile error: Variable type 'base' is an abstract class
	derived d;

	return 1;
}

Read/write mutex

Designed and implemented by Chris M. Thomasson. Complete implementation can be found on GitHub.

mutex.h:

class rw_fast_mutex
{
public:
	rw_fast_mutex()
	: m_wrstate(1), m_count(INT_MAX), m_rdwake(0),
	m_rdwset(0), m_wrwset(0), m_wrmtx(0) {}

	void read_lock()
	{
		if (m_count.fetch_add(-1, std::memory_order_acquire) < 1)
			m_rdwset.wait();
	}

	void read_unlock()
	{
		if (m_count.fetch_add(1, std::memory_order_release) < 0)
			if (m_rdwake.fetch_add(-1, std::memory_order_acq_rel) == 1)
				m_wrwset.post();
	}

	void write_lock()
	{
		if (m_wrstate.fetch_sub(1, std::memory_order_acquire) < 1)
			m_wrmtx.wait();
		int count = m_count.fetch_add(-INT_MAX, std::memory_order_acquire);
		if (count < INT_MAX)
		{
			int rdwake = m_rdwake.fetch_add(INT_MAX - count, std::memory_order_acquire);
			if (rdwake + INT_MAX - count)
				m_wrwset.wait();
		}
	}

	void write_unlock()
	{
		int count = m_count.fetch_add(INT_MAX, std::memory_order_release);
		if (count < 0)
			m_rdwset.post(-count);
		if (m_wrstate.fetch_add(1, std::memory_order_release) < 0)
			m_wrmtx.post();
	}

private:
	std::atomic<int> m_wrstate;
	std::atomic<int> m_count;
	std::atomic<int> m_rdwake;

	semaphore m_rdwset;
	semaphore m_wrwset;
	semaphore m_wrmtx;
};

Templates in STL containers

Storing template classes in STL containers is tricky 🙂 If v is a std::vector and TC is a template class, how can we do the following:

v.push_back(new TC('X'));
v.push_back(new TC(1));
v.push_back(new TC(3.141));

for(auto it : v)
    it->doWork();

The trick is two-fold: 1) we need a non-template base class (let’s call it NTB) and 2) we need to store pointers to the non-template base class in the STL container. Like this:

class NTB
{
public:
    virtual ~NTB() {}
    virtual void doWork() = 0;
};

template<typename T>
class TC : public NTB
{
public:
    TC(T t) : m_T(t) {}
    virtual void doWork() override { cout << m_T << endl; }
private:
    T m_T;
};

Complete listing:

#include <iostream>
#include <vector>
using namespace std;

class NTB
{
public:
    virtual ~NTB() {}
    virtual void doWork() = 0;
};

template<typename T>
class TC : public NTB
{
public:
    TC(T t) : m_T(t) {}
    virtual void doWork() override { cout << m_T << endl; }
private:
    T m_T;
};

int main(int argc, char** argv)
{
    vector<NTB*> v;
    v.push_back(new TC('X'));
    v.push_back(new TC(1));
    v.push_back(new TC(3.141));

    for(auto it : v)
        it->doWork();

    for(auto it : v)
        delete it;

    return 1;
}

Multiple return values

C++17 gives us an elegant way to return multiple values from a function using structured binding.

Here’s how we can declare a function that returns 3 values:

template<typename T>
auto ReturnThree(T t) -> tuple<T, T, T>
{
    return {t / 9, t / 3, t};
}

This function takes one argument of type T, and returns a tuple<T, T, T> of 3 values.

In order to get the return values in C++11 we would have to write the following:

tuple<int, int, int> t = ReturnThree(9);

But with C++17 and structured binding syntax we can write:

auto [t1, t2, t3] = ReturnThree(9);

Much cleaner code this way 🙂

Complete listing:

#include <tuple>
using namespace std;

template<typename T>
auto ReturnThree(T t) -> tuple<T, T, T>
{
    return {t / 9, t / 3, t};
}

int main(int argc, char** argv)
{
    tuple<int, int, int> t = ReturnThree(9); // Old and rusty
    auto [t1, t2, t3] = ReturnThree(9); // New and shiny
    return 1;
}