Relaxed atomics and data races

The following code contains a data race:

#include <atomic>
#include <iostream>
#include <thread>
using namespace std;

int main(int argc, char** argv)
{
	atomic_bool flag{false};
	bool data{false};

	thread t1([&]() {
		data = true;
		flag.store(true, memory_order_relaxed);
	});

	thread t2([&]() {
		while(flag.load(memory_order_relaxed) == false);
		cout << "data = " << boolalpha << data << endl;
	});

	t1.join();
	t2.join();

	return 1;
}

Because memory_order_relaxed is used, the compiler and the CPU are free to reorder the two writes in thread t1, and likewise the two reads in thread t2. The write to the plain bool data in t1 is then unordered with respect to the read in t2, which makes it a data race. Hence undefined behavior.

The fix is to either use memory_order_seq_cst on both the store and load calls, or memory_order_release on the store call and memory_order_acquire on the load call. A release store prevents any prior loads and stores from being reordered past it; an acquire load guarantees that all stores made by the thread that released the atomic variable are visible to the current thread, and it prevents subsequent reads and writes from being reordered before it.

Bit fields and race conditions

The following program, despite looking correct, contains a race condition and has unpredictable behaviour:

#include <iostream>
#include <mutex>
#include <thread>
using namespace std;

const int COUNT = 10'000'000;

mutex aLock;
mutex bLock;

struct S
{
	unsigned int a:9;
	unsigned int b:7;
} __attribute__((packed));

int main(int argc, char** argv)
{
	S s{};

	thread t1([&]() {
		scoped_lock lock(aLock);
		for(int i = 0; i < COUNT; ++i)
		{
			s.a = 0;
			s.a = 0b111111111;
		}
	});

	thread t2([&]() {
		scoped_lock lock(bLock);
		for(int i = 0; i < COUNT; ++i)
		{
			s.b = 0;
			s.b = 0b1111111;
		}
	});

	t1.join();
	t2.join();

	cout << "sizeof(S) = " << sizeof(S) << ", " << s.a << ", " << s.b << endl;

	return 1;
}

The proof is in the output of running this code several times:

sizeof(S) = 2, 511, 127
sizeof(S) = 2, 511, 127
sizeof(S) = 2, 0, 127
sizeof(S) = 2, 511, 0
sizeof(S) = 2, 511, 127
sizeof(S) = 2, 511, 127
sizeof(S) = 2, 511, 127
sizeof(S) = 2, 0, 127
sizeof(S) = 2, 0, 127
sizeof(S) = 2, 511, 127

The race condition here stems from the fact that, per the C++ standard, adjacent bit fields occupy a single memory location and may be packed to share the same byte. The CPU cannot operate on individual bits; it must load at least one byte at a time, and because of the overlap, loading the bits of a also loads the bits of b. So a write to bit field a, even though protected by its own mutex, can also overwrite the value of b, and vice versa.

The fix is to use one mutex to protect all adjacent bit fields. I say all because you have no guarantee that the CPU will be able to load 1 byte at a time. It may be limited to working on 32-bit values at a time, depending on the architecture.

Corrected program:

#include <iostream>
#include <mutex>
#include <thread>
using namespace std;

const int COUNT = 10'000'000;

mutex abLock;

struct S
{
	unsigned int a:9;
	unsigned int b:7;
} __attribute__((packed));

int main(int argc, char** argv)
{
	S s{};

	thread t1([&]() {
		scoped_lock lock(abLock);
		for(int i = 0; i < COUNT; ++i)
		{
			s.a = 0;
			s.a = 0b111111111;
		}
	});

	thread t2([&]() {
		scoped_lock lock(abLock);
		for(int i = 0; i < COUNT; ++i)
		{
			s.b = 0;
			s.b = 0b1111111;
		}
	});

	t1.join();
	t2.join();

	cout << "sizeof(S) = " << sizeof(S) << ", " << s.a << ", " << s.b << endl;

	return 1;
}

Dekker’s algorithm for N threads

In the previous post I described Dekker’s algorithm. Its limitation is that it only works for 2 threads. I wanted it to work for N threads, but I can’t atomically check N-1 flags belonging to other threads 🙁 now what?

Instead of multiple flags we’ll use the bits of one atomic variable to indicate which thread wants entry into the critical section: if the first bit is set, the first thread has declared its intent to enter, and so on for N threads.

Complete listing:

#include <atomic>
#include <iostream>
#include <thread>
#include <vector>
using namespace std;

const unsigned int COUNT = 5;
const unsigned int THREADS = thread::hardware_concurrency();
const unsigned int THREAD_MASK = 0b1;

int main(int argc, char** argv)
{
	atomic_uint flag{0};

	auto proc = [&](int t, unsigned int thread_mask) {
		for(int i = 0; i < COUNT;)
		{
			if(flag.fetch_or(thread_mask) == 0)
			{
				cout << "T" << t << " in critical section" << endl;
				++i;
			}

			flag.fetch_xor(thread_mask);
		}
	};

	vector<thread> vt;
	for(int i = 0; i < THREADS; ++i)
		vt.emplace_back(proc, i, THREAD_MASK << i);

	for(auto& t : vt)
		t.join();

	return 1;
}

Dekker’s algorithm

Dekker’s algorithm is a way of synchronizing two threads around a critical section. It is a dance where thread 1 sets its flag declaring its intent to enter the critical section, then it checks the flag of thread 2. If it is not set, it enters the critical section, if it is set, it resets its flag and backs off. Thread 2 does the same.

Below is my implementation of the algorithm using C++ atomics.

#include <atomic>
#include <iostream>
#include <thread>
using namespace std;

const int COUNT = 10;

int main(int argc, char** argv)
{
	atomic_bool f1{false}, f2{false};

	auto proc1 = [&]() {
		for(int i = 0; i < COUNT;)
		{
			f1.store(true);
			if(f2.load() == false)
			{
				cout << "T1 in critical section" << endl;
				++i;
			}
			
			f1.store(false);
		}
	};

	auto proc2 = [&]() {
		for(int i = 0; i < COUNT;)
		{
			f2.store(true);
			if(f1.load() == false)
			{
				cout << "T2 in critical section" << endl;
				++i;
			}
			
			f2.store(false);
		}
	};

	thread t1(proc1);
	thread t2(proc2);

	t1.join();
	t2.join();

	return 1;
}

T1 in critical section
T2 in critical section
T1 in critical section
T2 in critical section
T2 in critical section
T1 in critical section
T1 in critical section
T2 in critical section
T1 in critical section
T2 in critical section
T1 in critical section
T2 in critical section
T1 in critical section
T2 in critical section
T1 in critical section
T2 in critical section
T1 in critical section
T2 in critical section
T1 in critical section
T2 in critical section
Program ended with exit code: 1

Program output.

Memory barriers and thread synchronization

We will jump straight to the code. This innocent looking little program has a major issue (when compiled for a release build with optimizations on my Mac using GCC, Apple’s CLANG, and LLVM, as well as on Windows using Visual Studio 2017, and run on a multicore machine). Can you spot the problem?

#include <chrono>
#include <iostream>
#include <thread>
using namespace std;

int main(int argc, char** argv)
{
	bool flag = false;
	
	thread t1([&]() {
		this_thread::sleep_for(100ms);
		cout << "t1 started" << endl;
		flag = true;
		cout << "t1 signals and exits" << endl;
	});
	
	thread t2([&]() {
		cout << "t2 started" << endl;
		while(flag == false) ;
		cout << "t2 got signaled and exits" << endl;
	});
	
	t1.join();
	t2.join();
	
	return 1;
}

That's right! It will never terminate! It will hang forever! The while loop in thread t2 will never break. But why? Thread t1 sets flag to true after all. Yes, but it does so too late (notice the 100ms sleep). By that point the compiler has hoisted the load of flag out of the loop in t2, and nothing forces the updated value to ever be observed, so t2 spins on a stale value forever. If you think that making flag volatile will help, you're wrong. It may work on your compiler/machine, but it is no guarantee. Now what?

This was one of the hardest lessons in C++ and computer science for me. Before continuing to the fix section I highly recommend you read about the following: memory barriers, C++ memory model as well as C++ Memory Model at Modernest C++, and memory ordering. I'll see you in a couple of days 😉

The fix.

The simplest fix is to wrap access to flag in a mutex lock/unlock, or to make flag an atomic<bool> (both of those solutions insert the appropriate memory barriers). But that's not always an option for other data types...

We need to make sure that t2 eventually sees the writes of t1. For this we need to force synchronization between the CPU cores. We can do it in three ways:
1) By inserting memory barriers in the right places.
2) By inserting loads and stores of an atomic variable using release/acquire semantics.
3) By inserting loads and stores of a dependent atomic variable using release/consume semantics.

Below is the corrected version of our example; uncomment each #define to engage a different fix:

#include <atomic>
#include <chrono>
#include <iostream>
#include <thread>
using namespace std;

//#define ATOMIC_FENCE
//#define ATOMIC_RELEASE
//#define ATOMIC_CONSUME

#if defined ATOMIC_FENCE
#define FENCE_ACQUIRE atomic_thread_fence(memory_order_acquire)
#define FENCE_RELEASE atomic_thread_fence(memory_order_release)
#elif defined ATOMIC_RELEASE
atomic_bool f{false};
#define FENCE_ACQUIRE f.load(memory_order_acquire)
#define FENCE_RELEASE f.store(true, memory_order_release)
#elif defined ATOMIC_CONSUME
atomic_bool f{false};
#define FENCE_ACQUIRE f.load(memory_order_consume)
#define FENCE_RELEASE f.store(flag, memory_order_release)
#else
#define FENCE_ACQUIRE
#define FENCE_RELEASE
#endif

int main(int argc, char** argv)
{
	bool flag = false;

	thread t1([&]() {
		this_thread::sleep_for(100ms);
		cout << "t1 started" << endl;
		flag = true;
		FENCE_RELEASE;
		cout << "t1 signals and exits" << endl;
	});

	thread t2([&]() {
		cout << "t2 started" << endl;
		while(flag == false) FENCE_ACQUIRE;
		cout << "t2 got signaled and exits" << endl;
	});

	t1.join();
	t2.join();

	return 1;
}

Atomic blocking queue

As I learn about atomics and memory model I decided to take a stab at rewriting my blocking queue using atomic operations and eliminate the mutex around the critical section of code responsible for pushing and popping elements, effectively creating a fast path through the queue if no blocking is taking place.

Let’s jump straight into the code:

template<typename Q = T>
typename std::enable_if<
	std::is_copy_constructible<Q>::value and
	std::is_nothrow_copy_constructible<Q>::value, void>::type
push(const T& item) noexcept
{
	m_openSlots.wait();

	auto pushIndex = m_pushIndex.fetch_add(1);
	new (m_data + (pushIndex % m_size)) T (item);
	++m_count;

	auto expected = m_pushIndex.load();
	while(!m_pushIndex.compare_exchange_strong(expected, m_pushIndex % m_size))
		expected = m_pushIndex.load();

	m_fullSlots.post();
}

Lines 1-4 are basically a template concept which specifies that this method will only be present if the type is no-throw copy constructible.
Line 7 is the same semaphore decrement and possible blocking if the queue is full. The fast path of this semaphore implementation uses only atomic operations, so if it doesn’t block it will not engage a mutex (fast_semaphore code available on GitHub).
Line 9 is where the magic starts. We atomically increment m_pushIndex while fetching its previous value into a temporary pushIndex. From now on we work with the temporary.
Line 10 is where we insert the element by copy constructing it in the right open slot.
Line 11 is book-keeping needed during the destruction of the queue.
Lines 13-15 are where we take m_pushIndex modulo m_size so it never overflows. In a loop we check whether it has changed; if it has, we load the new value back into expected and check again, until it hasn’t changed, at which point we atomically swap m_pushIndex with m_pushIndex % m_size.
Line 17 signals to other blocked threads, if there are any, that the queue now has an element available for popping.

Other methods of the queue work in a very similar way so I will not be describing them in detail here. The only crux of this implementation is that it only works for no-throw copyable and movable types; so declare your constructors and assignment operators with

noexcept
if you want to use them with this queue 🙂

Complete listing:

#pragma once

#include <atomic>
#include <cassert>
#include <type_traits>
#include <utility>
#include "semaphore.h"

template<typename T>
class fast_blocking_queue
{
public:
	explicit fast_blocking_queue(unsigned int size)
	: m_size(size), m_pushIndex(0), m_popIndex(0), m_count(0),
	m_data((T*)operator new(size * sizeof(T))),
	m_openSlots(size), m_fullSlots(0)
	{
		assert(size != 0);
	}

	~fast_blocking_queue() noexcept
	{
		while (m_count--)
		{
			m_data[m_popIndex].~T();
			m_popIndex = ++m_popIndex % m_size;
		}
		operator delete(m_data);
	}

	template<typename Q = T>
	typename std::enable_if<
		std::is_copy_constructible<Q>::value and
		std::is_nothrow_copy_constructible<Q>::value, void>::type
	push(const T& item) noexcept
	{
		m_openSlots.wait();

		auto pushIndex = m_pushIndex.fetch_add(1);
		new (m_data + (pushIndex % m_size)) T (item);
		++m_count;

		auto expected = m_pushIndex.load();
		while(!m_pushIndex.compare_exchange_strong(expected, m_pushIndex % m_size))
			expected = m_pushIndex.load();

		m_fullSlots.post();
	}

	template<typename Q = T>
	typename std::enable_if<
		std::is_move_constructible<Q>::value and
		std::is_nothrow_move_constructible<Q>::value, void>::type
	push(T&& item) noexcept
	{
		m_openSlots.wait();

		auto pushIndex = m_pushIndex.fetch_add(1);
		new (m_data + (pushIndex % m_size)) T (std::move(item));
		++m_count;

		auto expected = m_pushIndex.load();
		while(!m_pushIndex.compare_exchange_strong(expected, m_pushIndex % m_size))
			expected = m_pushIndex.load();

		m_fullSlots.post();
	}

	template<typename Q = T>
	typename std::enable_if<
		not std::is_move_assignable<Q>::value and
		std::is_nothrow_copy_assignable<Q>::value, void>::type
	pop(T& item) noexcept
	{
		m_fullSlots.wait();

		auto popIndex = m_popIndex.fetch_add(1);
		item = m_data[popIndex % m_size];
		m_data[popIndex % m_size].~T();
		--m_count;

		auto expected = m_popIndex.load();
		while(!m_popIndex.compare_exchange_strong(expected, m_popIndex % m_size))
			expected = m_popIndex.load();

		m_openSlots.post();
	}

	template<typename Q = T>
	typename std::enable_if<
		std::is_move_assignable<Q>::value and
		std::is_nothrow_move_assignable<Q>::value, void>::type
	pop(T& item) noexcept
	{
		m_fullSlots.wait();

		auto popIndex = m_popIndex.fetch_add(1);
		item = std::move(m_data[popIndex % m_size]);
		m_data[popIndex % m_size].~T();
		--m_count;

		auto expected = m_popIndex.load();
		while(!m_popIndex.compare_exchange_strong(expected, m_popIndex % m_size))
			expected = m_popIndex.load();

		m_openSlots.post();
	}

	T pop() noexcept(std::is_nothrow_invocable_r<void, decltype(&fast_blocking_queue<T>::pop), T&>::value)
	{
		T item;
		pop(item);
		return item;
	}

private:
	const unsigned int m_size;
	std::atomic_uint m_pushIndex;
	std::atomic_uint m_popIndex;
	std::atomic_uint m_count;
	T* m_data;

	fast_semaphore m_openSlots;
	fast_semaphore m_fullSlots;
};

Synchronizing with atomics

I’ve been reading and learning about the C++ memory model and relaxed atomics and decided to rewrite my Interview question, part 1 using atomic<bool>’s to synchronize the threads. I took it one step further and created three threads that print A, B, C, A, B, C,…
Each thread spins, waiting for its atomic<bool> to be set to true; it then prints its letter and sets the next thread’s flag. The wait is a CAS (compare-and-swap) operation.
Next, I will try to figure out how to relax the compare_exchange and store calls with memory_order_acquire and memory_order_release parameters 🙂

Complete listing:

#include <atomic>
#include <iostream>
#include <thread>
using namespace std;

const int COUNT = 3;

int main(int argc, char** argv)
{
	atomic<bool> e1{false}, e2{false}, e3{false};
	
	thread t1([&](){
		for(int i = 0; i < COUNT; ++i)
		{
			bool e = true;
			while(!e1.compare_exchange_strong(e, false)) e = true;
			cout << "A" << endl;
			e2.store(true);
		}
	});
	
	thread t2([&](){
		for(int i = 0; i < COUNT; ++i)
		{
			bool e = true;
			while(!e2.compare_exchange_strong(e, false)) e = true;
			cout << "B" << endl;
			e3.store(true);
		}
	});
	
	thread t3([&](){
		for(int i = 0; i < COUNT; ++i)
		{
			bool e = true;
			while(!e3.compare_exchange_strong(e, false)) e = true;
			cout << "C" << endl;
			e1.store(true);
		}
	});
	
	e1.store(true);
	
	t1.join();
	t2.join();
	t3.join();
	
	return 1;
}

A
B
C
A
B
C
A
B
C
Program ended with exit code: 1

Program output.

Fun with unordered containers

I’ve been playing around with unordered_map and unordered_set to see how they grow when elements are inserted. Here’s what I learned so far: the default load factor is 1; this means the container will grow to accommodate at most 1 element per bucket on average. You can change the load factor with a max_load_factor call followed by a rehash call. I also wanted to see how different STL implementations behave, so I tested with GCC, Apple’s CLANG, and LLVM on my Mac. The test program creates an unordered_map and an unordered_set, loads each with 10'000'000 entries, and prints out the container’s properties: size, bucket count, and load factor. I then clear the container and repeat the test, but this time reserve space for the entries first, and print the properties again. Next I change the max_load_factor to 10 (allowing for up to 10 entries per bucket) and rehash with at most 1/10th the bucket count.
The GCC implementation is more aggressive in terms of container growth; it creates more buckets than Apple’s CLANG and LLVM’s implementations, which had identical results.

The takeaway here is that when using unordered containers it is good to either reserve the appropriate element count, or rehash after inserting the elements to reduce the bucket count and optimize memory use.

****************************************
* GCC -Ofast -march=native -lstdc++ *
****************************************
unordered_map
initial
size = 10000000
bucket count = 15345007
load factor = 0.651678

reserved
size = 10000000
bucket count = 10352717
load factor = 0.96593

re-hashed
size = 10000000
bucket count = 1056323
load factor = 9.4668

unordered_set
initial
size = 10000000
bucket count = 15345007
load factor = 0.651678

reserved
size = 10000000
bucket count = 10352717
load factor = 0.96593

re-hashed
size = 10000000
bucket count = 1056323
load factor = 9.4668

**********************************************
* Apple CLANG -Ofast -march=native -lc++ *
**********************************************
unordered_map
initial
size = 10000000
bucket count = 13169977
load factor = 0.759303

reserved
size = 10000000
bucket count = 10000019
load factor = 0.999998

re-hashed
size = 10000000
bucket count = 1000003
load factor = 9.99997

unordered_set
initial
size = 10000000
bucket count = 13169977
load factor = 0.759303

reserved
size = 10000000
bucket count = 10000019
load factor = 0.999998

re-hashed
size = 10000000
bucket count = 1000003
load factor = 9.99997

Program output.

Complete listing:

#include <unordered_map>
#include <unordered_set>
#include "trace.h"
using namespace std;

const int COUNT = 10'000'000;

int main(int argc, char** argv)
{
	unordered_map<int, int> m;
	for(int i = 0; i < COUNT; ++i)
		m[i] = i;

	trace("unordered_map");
	trace("initial");
	trace("size         = ", m.size());
	trace("bucket count = ", m.bucket_count());
	trace("load factor  = ", m.load_factor(), "\n");

	m.clear();
	m.reserve(COUNT);
	for(int i = 0; i < COUNT; ++i)
		m[i] = i;

	trace("reserved");
	trace("size         = ", m.size());
	trace("bucket count = ", m.bucket_count());
	trace("load factor  = ", m.load_factor(), "\n");

	m.max_load_factor(10);
	m.rehash(COUNT / 10);

	trace("re-hashed");
	trace("size         = ", m.size());
	trace("bucket count = ", m.bucket_count());
	trace("load factor  = ", m.load_factor(), "\n");

	unordered_set<int> s;
	for(int i = 0; i < COUNT; ++i)
		s.insert(i);

	trace("unordered_set");
	trace("initial");
	trace("size         = ", s.size());
	trace("bucket count = ", s.bucket_count());
	trace("load factor  = ", s.load_factor(), "\n");

	s.clear();
	s.reserve(COUNT);
	for(int i = 0; i < COUNT; ++i)
		s.insert(i);

	trace("reserved");
	trace("size         = ", s.size());
	trace("bucket count = ", s.bucket_count());
	trace("load factor  = ", s.load_factor(), "\n");

	s.max_load_factor(10);
	s.rehash(COUNT / 10);

	trace("re-hashed");
	trace("size         = ", s.size());
	trace("bucket count = ", s.bucket_count());
	trace("load factor  = ", s.load_factor(), "\n");

	return 1;
}

trace.h:

#pragma once

#include <iostream>
#include <mutex>
#include "mutex.h"

namespace { static inline fast_mutex kStdOutLock; }

template<typename... Ts>
inline void trace(Ts&&... args)
{
	std::scoped_lock lock(kStdOutLock);
	(std::cout << ... << args) << std::endl;
}

Short introduction to parameter packs and fold expressions

Docendo discimus.

Seneca the Younger.

I’m new to this topic since I literally just learned about the fold expressions today, and parameter packs days before that, so it will be a short post where I’ll explain the basics 🙂

Let’s jump straight to the code:

template<typename... Args>
int sum(Args&&... args)
{
    return (args + ...);
}

Line 1 tells us there will be zero or more parameters.
Line 2 declares a function with said variable number of parameters.
Line 4 is where the magic happens. The fold expression states that for every parameter in the args parameter pack, combine it with the next one using operator+. So it becomes args0 + args1 + ... + argsN.

Another example:

template<typename... Ts>
inline void trace(Ts&&... args)
{
    (cout << ... << args) << endl;
}

Here in line 4 the fold expression takes the form "(init op ... op pack)": a binary left fold with cout as the initial value. It expands to cout << args0 << args1 << ... << argsN.

Another way of doing the same is:

template<typename... Ts>
inline void trace(Ts&&... args)
{
    ((cout << args << " "),...) << endl;
}

Here the cout << args << " " expression is expanded N times, separated by the comma operator, so it's less efficient, but we get a space between each printed argument.

That's it for now 🙂 I'll post more as I learn more!

Complete listing:

#include <iostream>
using namespace std;

template<typename... Args>
int sum(Args&&... args)
{
    return (args + ...);
}

template<typename... Ts>
inline void trace(Ts&&... args)
{
    //(cout << ... << args) << endl;
    ((cout << args << " "),...) << endl;
}

int main(int argc, char** argv)
{
    trace(sum(1));
    trace(sum(1,2,3,4,5));
    trace(sum(1,1,2,3,5,8,13,21,34,55));
    trace(1, 2, 3);

    return 1;
}

Pure virtual destructor

Did you know that a destructor can be pure virtual? It can be defined as such, but it still needs a body declaration. It is a way of making a class abstract (instances of it cannot be created) without having to make any pure virtual methods. So use it if you have a base class that you don’t want users to instantiate but you have no other candidates for pure virtual specifier 🙂

Additional discussion of this topic at Geeks for Geeks: Pure virtual destructor in C++.

Complete listing:

class base
{
public:
	virtual ~base() = 0;
};

base::~base() {}

class derived : public base
{
public:
	virtual ~derived() {}
};

int main(int argc, char** argv)
{
	//base b; // Compile error: Variable type 'base' is an abstract class
	derived d;

	return 1;
}

Read/write mutex

Designed and implemented by Chris M. Thomasson. Complete implementation can be found on GitHub.

mutex.h:

class rw_fast_mutex
{
public:
	rw_fast_mutex()
	: m_wrstate(1), m_count(INT_MAX), m_rdwake(0),
	m_rdwset(0), m_wrwset(0), m_wrmtx(0) {}

	void read_lock()
	{
		if (m_count.fetch_add(-1, std::memory_order_acquire) < 1)
			m_rdwset.wait();
	}

	void read_unlock()
	{
		if (m_count.fetch_add(1, std::memory_order_release) < 0)
			if (m_rdwake.fetch_add(-1, std::memory_order_acq_rel) == 1)
				m_wrwset.post();
	}

	void write_lock()
	{
		if (m_wrstate.fetch_sub(1, std::memory_order_acquire) < 1)
			m_wrmtx.wait();
		int count = m_count.fetch_add(-INT_MAX, std::memory_order_acquire);
		if (count < INT_MAX)
		{
			int rdwake = m_rdwake.fetch_add(INT_MAX - count, std::memory_order_acquire);
			if (rdwake + INT_MAX - count)
				m_wrwset.wait();
		}
	}

	void write_unlock()
	{
		int count = m_count.fetch_add(INT_MAX, std::memory_order_release);
		if (count < 0)
			m_rdwset.post(-count);
		if (m_wrstate.fetch_add(1, std::memory_order_release) < 0)
			m_wrmtx.post();
	}

private:
	std::atomic<int> m_wrstate;
	std::atomic<int> m_count;
	std::atomic<int> m_rdwake;

	semaphore m_rdwset;
	semaphore m_wrwset;
	semaphore m_wrmtx;
};

Templates in STL containers

Storing template classes in STL containers is tricky 🙂 If v is a std::vector and TC is a template class, how can we do the following:

v.push_back(new TC<char>('X'));
v.push_back(new TC<int>(1));
v.push_back(new TC<double>(3.141));

for(auto it : v)
    it->doWork();

The trick is two-fold: 1) we need a non-template base class (let’s call it NTB) and 2) we need to store pointers to the non-template base class in the STL container. Like this:

class NTB
{
public:
    virtual ~NTB() {}
    virtual void doWork() = 0;
};

template<typename T>
class TC : public NTB
{
public:
    TC(T t) : m_T(t) {}
    virtual void doWork() override { cout << m_T << endl; }
private:
    T m_T;
};

Complete listing:

#include <iostream>
#include <vector>
using namespace std;

class NTB
{
public:
    virtual ~NTB() {}
    virtual void doWork() = 0;
};

template<typename T>
class TC : public NTB
{
public:
    TC(T t) : m_T(t) {}
    virtual void doWork() override { cout << m_T << endl; }
private:
    T m_T;
};

int main(int argc, char** argv)
{
    vector<NTB*> v;
    v.push_back(new TC<char>('X'));
    v.push_back(new TC<int>(1));
    v.push_back(new TC<double>(3.141));

    for(auto it : v)
        it->doWork();

    for(auto it : v)
        delete it;

    return 1;
}

Multiple return values

C++17 gives us an elegant way to return multiple values from a function using structured binding.

Here’s how we can declare a function that returns 3 values:

template<typename T>
auto ReturnThree(T t) -> tuple<T, T, T>
{
    return {t / 9, t / 3, t};
}

This function takes one argument of type T, and returns a tuple<T, T, T> of 3 values.

In order to get the return values in C++11 we would have to write the following:

tuple<int, int, int> t = ReturnThree(9);

But with C++17 and structured binding syntax we can write:

auto [t1, t2, t3] = ReturnThree(9);

Much cleaner code this way 🙂

Complete listing:

#include 
using namespace std;

template<typename T>
auto ReturnThree(T t) -> tuple<T, T, T>
{
    return {t / 9, t / 3, t};
}

int main(int argc, char** argv)
{
    tuple<int, int, int> t = ReturnThree(9); // Old and rusty
    auto [t1, t2, t3] = ReturnThree(9); // New and shiny
    return 1;
}

Simple timer

Jonathan Boccara over at Fluent{C++} made a post a while ago titled A Simple Timer in C++. I felt things could be done… different 😉 so I decided to write my own version of the timer code.

First, I felt there’s no need to actually instantiate timer objects; a simple function call to set_timeout or set_interval from namespace timer should be sufficient.
Second, I didn’t like the way cancellation was done. A single stop call interrupted all intervals and timeouts. How about a cancellation event per set_timeout or set_interval call?
Finally, I wanted the set_timeout and set_interval functions to accept any callable with any number of arguments.
That’s exactly how I designed my interface.

Usage example:

#include <chrono>
#include <iostream>
#include <mutex>
#include <thread>
#include "timer.h"
using namespace std;

mutex cout_lock;
#define trace(x) { scoped_lock lock(cout_lock); cout << x << endl; }

int main(int argc, char** argv)
{
    auto e1 = timer::set_timeout(1s, []() { trace("timeout"); });
    auto e2 = timer::set_timeout(6s, []() { trace("canceled timeout"); });

    auto e3 = timer::set_interval(1s, []() { trace("interval"); });
    auto e4 = timer::set_interval(6s, []() { trace("canceled interval"); });

    trace("waiting 5s...");
    this_thread::sleep_for(5s);

    e2->signal();
    e4->signal();

    trace("waiting 5s...");
    this_thread::sleep_for(5s);

    return 1;
}

waiting 5s…
timeout
interval
interval
interval
interval
waiting 5s…
interval
interval
interval
interval
interval
Program ended with exit code: 1

Program output.

timer.h:

#pragma once

#include <chrono>
#include <memory>
#include <thread>
#include "event.h"

namespace timer
{
    template<typename D, typename F, typename... Args>
    std::shared_ptr<manual_event> set_timeout(D d, F f, Args&&... args)
    {
        auto event = std::make_shared<manual_event>();
        std::thread([=]()
        {
            if(event->wait_for(d)) return;
            f(args...);
        }).detach();
        return event;
    }

    template<typename D, typename F, typename... Args>
    std::shared_ptr<manual_event> set_interval(D d, F f, Args&&... args)
    {
        auto event = std::make_shared<manual_event>();
        std::thread([=]()
        {
            while(true)
            {
                if(event->wait_for(d)) return;
                f(args...);
            }
        }).detach();
        return event;
    }
}

Updated event.h:

#pragma once

#include <condition_variable>
#include <mutex>

class manual_event
{
public:
    explicit manual_event(bool signaled = false) noexcept
    : m_signaled(signaled) {}

    void signal() noexcept
    {
        {
            std::unique_lock lock(m_mutex);
            m_signaled = true;
        }
        m_cv.notify_all();
    }

    void wait() noexcept
    {
        std::unique_lock lock(m_mutex);
        m_cv.wait(lock, [&](){ return m_signaled != false; });
    }

    template<typename T>
    bool wait_for(T t) noexcept
    {
        std::unique_lock lock(m_mutex);
        return m_cv.wait_for(lock, t, [&](){ return m_signaled != false; });
    }

    template<typename T>
    bool wait_until(T t) noexcept
    {
        std::unique_lock lock(m_mutex);
        return m_cv.wait_until(lock, t, [&](){ return m_signaled != false; });
    }

    void reset() noexcept
    {
        std::unique_lock lock(m_mutex);
        m_signaled = false;
    }

private:
    bool m_signaled = false;
    std::mutex m_mutex;
    std::condition_variable m_cv;
};

Interview question, part 6

Given a set of integer ranges defined as [LO, HI) and a value P, find which range P falls into.

My approach to this programming puzzle was to first define a range as a struct that can be sorted (thanks to operator <), then perform binary search on a vector of sorted ranges. The code is pretty self explanatory 🙂

Example input and output:

LO: 0, HI: 10
LO: 10, HI: 20
LO: 20, HI: 30
LO: 30, HI: 40
LO: 40, HI: 50
LO: 50, HI: 60
LO: 60, HI: 70
LO: 70, HI: 80
LO: 80, HI: 90
LO: 90, HI: 100
P = 15 falls in range LO: 10, HI: 20
P = 16 falls in range LO: 10, HI: 20
P = 4 falls in range LO: 0, HI: 10
P = 73 falls in range LO: 70, HI: 80
P = 25 falls in range LO: 20, HI: 30
P = 28 falls in range LO: 20, HI: 30
P = 19 falls in range LO: 10, HI: 20
P = 60 falls in range LO: 60, HI: 70
P = 83 falls in range LO: 80, HI: 90
P = 76 falls in range LO: 70, HI: 80
Program ended with exit code: 1

The answer:

#include <algorithm>
#include <cstdlib>
#include <ctime>
#include <iostream>
#include <mutex>
#include <vector>
using namespace std;

mutex cout_lock;
#define trace(x) { scoped_lock lock(cout_lock); cout << x << endl; }

const int COUNT = 10;

struct range
{
    unsigned int lo;
    unsigned int hi;

    bool in_range(unsigned int p) const { return lo <= p && p < hi; }
};

bool operator < (const range& lhs, const range& rhs)
{
    return lhs.lo < rhs.lo;
}

ostream& operator << (ostream& os, const range& r)
{
    os << "LO: " << r.lo << ", HI: " << r.hi;
    return os;
}

range BinarySearch(const vector<range>& v, unsigned int p)
{
    size_t index = v.size() / 2;
    size_t step = index / 2 + 1;

    while(true)
    {
        if(v[index].hi <= p) index += step;
        if(v[index].lo > p) index -= step;
        step /= 2;
        if(step == 0) step = 1;
        if(v[index].in_range(p)) break;
    }

    return v[index];
}

int main(int argc, char** argv)
{
    srand((unsigned int)time(NULL));

    vector<range> ranges =
    {
        {50, 60}, {60, 70}, {70, 80}, {80, 90}, {90, 100},
        {0, 10}, {10, 20}, {20, 30}, {30, 40}, {40, 50}
    };

    sort(begin(ranges), end(ranges));

    for(const auto& r : ranges)
        trace(r);

    for(int i = 0; i < COUNT; ++i)
    {
        unsigned int p = rand() % 100;
        trace("P = " << p << " falls in range " << BinarySearch(ranges, p));
    }

    return 1;
}

Fast mutex

Many thanks to Chris M. Thomasson for rewriting the POSIX Threads for Win32 mutex into a standard C++ implementation. It uses the auto_event class from Event objects.

mutex.h:

#pragma once

#include <atomic>
#include "event.h"

class fast_mutex
{
public:
    fast_mutex() : m_state(0) {}

    void lock()
    {
        if (m_state.exchange(1, std::memory_order_acquire))
            while (m_state.exchange(2, std::memory_order_acquire))
                m_waitset.wait();
    }

    void unlock()
    {
        if (m_state.exchange(0, std::memory_order_release) == 2)
            m_waitset.signal();
    }

private:
    std::atomic<unsigned int> m_state;
    auto_event m_waitset;
};

Simple thread pool

LESSON RECORDING: How to implement a (simple) thread pool.

PART 2: Advanced Thread Pool.

I know the topic of thread pools has been beaten to death on the internet; nevertheless, I wanted to present to you my implementation, which uses only standard C++ components 🙂

I will be using queue and semaphore classes discussed in my earlier posts.

Below you will find a simple thread pool implementation which can be parametrized by the number of worker threads and the depth of the blocking queue of work items. Each thread waits on blocking_queue::pop() until a work item shows up. The threads pick work items off the queue in unspecified order, execute them, then go back to blocking_queue::pop(). Destruction and cleanup of the threads is done with a nullptr sentinel pushed onto the queue: if a thread pops the sentinel, it pushes the sentinel back and breaks out of its work loop. This way all threads are joined and allowed to finish all unprocessed work items during destruction of a thread_pool instance.
Moreover, a work item can be any callable entity: a lambda, a functor, or a function pointer. A work item can accept any number of parameters thanks to the template parameter pack of thread_pool::enqueue_work().

UPDATE:

Thank you, reddit user sumo952, for bringing progschj/ThreadPool to my attention. I have updated my implementation to support futures and the ability to retrieve a work item's result.

Usage example:

#include <iostream>
#include <thread>
#include <mutex>
#include <cstdlib>
#include <ctime>
#include "pool.h"
using namespace std;

mutex cout_lock;
#define trace(x) { scoped_lock lock(cout_lock); cout << x << endl; }

const int COUNT = thread::hardware_concurrency();
const int WORK = 10'000'000;

int main(int argc, char** argv)
{
    srand((unsigned int)time(NULL));

    thread_pool pool;

    auto result = pool.enqueue_task([](int i) { return i; }, 0xFF);
    result.get();

    for(int i = 1; i <= COUNT; ++i)
        pool.enqueue_work([](int workerNumber) {
            int workOutput = 0;
            int work = WORK + (rand() % (WORK));
            trace("work item " << workerNumber << " starting " << work << " iterations...");
            for(int w = 0; w < work; ++w)
                workOutput += rand();
            trace("work item " << workerNumber << " finished");
        }, i);

    return 1;
}

work item 1 starting 170521507 iterations...
work item 2 starting 141859716 iterations...
work item 2 finished
work item 3 starting 189442810 iterations...
work item 1 finished
work item 4 starting 125609749 iterations...
work item 4 finished
work item 3 finished
Program ended with exit code: 1

Program output.

pool.h:

#pragma once

#include <cassert>
#include <vector>
#include <thread>
#include <functional>
#include <future>
#include <memory>
#include <utility>
#include "queue.h"

class thread_pool
{
public:
    thread_pool(
        unsigned int queueDepth = std::thread::hardware_concurrency(),
        size_t threads = std::thread::hardware_concurrency())
    : m_workQueue(queueDepth)
    {
        assert(queueDepth != 0);
        assert(threads != 0);
        for(size_t i = 0; i < threads; ++i)
            m_threads.emplace_back(std::thread([this]() {
                while(true)
                {
                    auto workItem = m_workQueue.pop();
                    if(workItem == nullptr)
                    {
                        m_workQueue.push(nullptr);
                        break;
                    }
                    workItem();
                }
            }));
    }

    ~thread_pool() noexcept
    {
        m_workQueue.push(nullptr);
        for(auto& thread : m_threads)
            thread.join();
    }

    using Proc = std::function<void(void)>;

    template<typename F, typename... Args>
    void enqueue_work(F&& f, Args&&... args)
    {
        m_workQueue.push([=]() { f(args...); });
    }

    template<typename F, typename... Args>
    auto enqueue_task(F&& f, Args&&... args) -> std::future<typename std::result_of<F(Args...)>::type>
    {
        using return_type = typename std::result_of<F(Args...)>::type;
        auto task = std::make_shared<std::packaged_task<return_type()>>(std::bind(std::forward<F>(f), std::forward<Args>(args)...));
        std::future<return_type> res = task->get_future();
        m_workQueue.push([task]() { (*task)(); });
        return res;
    }

private:
    using ThreadPool = std::vector<std::thread>;
    ThreadPool m_threads;
    blocking_queue<Proc> m_workQueue;
};