Where the producer-consumer pattern is present, it is often the case that one side is faster than the other: a parsing producer reads records faster than a processing consumer; a disk-reading producer is faster than a network-sending consumer.
Producers and consumers often communicate through queues: the producer puts items on a queue while the consumer pops items off it. What happens when the queue becomes full, or empty? One approach for the producer is to try to put an item on the queue and, if it is full, yield the thread and repeat. Similarly, the consumer can try to pop an item off the queue and, if it is empty, do the same. This try-fail-yield approach can unnecessarily burn CPU cycles in tight loops that constantly try to put or pop items.
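As a rough illustration, the busy loops might look something like this. The try_push and try_pop operations below are hypothetical non-blocking calls that return false on failure; they are not part of the code shown later in this post:

#include <thread>

template<typename Q, typename T>
void spin_push(Q& queue, const T& item)
{
    while (!queue.try_push(item))   // queue full: give up the time slice...
        std::this_thread::yield();  // ...and try again, burning CPU cycles
}

template<typename Q, typename T>
void spin_pop(Q& queue, T& item)
{
    while (!queue.try_pop(item))    // queue empty: yield and retry
        std::this_thread::yield();
}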
Another approach is to temporarily grow the queue, but that does not scale well. When do we stop growing? And once we stop, we have to fall back on the try-fail-yield method.

What if we could implement a blocking queue: a queue whose put operation blocks when the queue is full, and unblocks only when another thread pops an item off the queue; and whose pop operation blocks when the queue is empty, and unblocks only when another thread puts an item on the queue? An example of using such a queue would look like this (notice the fast producer and slow consumer in the code below):
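Here is a minimal sketch of such an example, assuming a blocking_queue<int> of capacity 5 with the push() and pop(T&) operations described above (the class appears later in this post; the exact member signatures are my assumption):

#include <iostream>
#include <thread>
#include <chrono>
#include "queue.h"   // the blocking_queue shown later in this post

int main()
{
    blocking_queue<int> q(5);           // room for 5 items

    std::thread producer([&]() {        // fast producer
        for (int v = 1; v <= 10; ++v) {
            q.push(v);                  // blocks while the queue is full
            std::cout << "push v = " << v << "\n";
        }
    });

    std::thread consumer([&]() {        // slow consumer
        std::this_thread::sleep_for(std::chrono::seconds(1));
        for (int i = 0; i < 10; ++i) {
            int v;
            q.pop(v);                   // blocks while the queue is empty
            std::cout << "pop  v = " << v << "\n";
            std::this_thread::sleep_for(std::chrono::seconds(1));
        }
    });

    producer.join();
    consumer.join();
}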

Notice there is no try-fail-yield code in the example above. The put operation of the fast producer simply blocks until the slow consumer makes more room on the queue. The output of the program is as we expect: the fast producer fills up the queue and blocks; a second later the consumer starts to slowly pop items off the queue; the two go in tandem for a while until the producer exits and the consumer drains the queue:

push v = 1
push v = 2
push v = 3
push v = 4
push v = 5
pop  v = 1
push v = 6
pop  v = 2
push v = 7
pop  v = 3
push v = 8
pop  v = 4
push v = 9
pop  v = 5
push v = 10
pop  v = 6
pop  v = 7
pop  v = 8
pop  v = 9
pop  v = 10
Program ended with exit code: 1

The trick to implementing such a queue is the ability to count both open and full slots of the queue, and block. A semaphore is a perfect mechanism to do just that 🙂 In fact we need two semaphores: one to count the open slots, and another to count the full slots. The open-slot semaphore starts with a count equal to the size of the queue. The full-slot semaphore starts with a count of zero. A push operation waits on the open-slot semaphore and signals the full-slot semaphore. A pop operation waits on the full-slot semaphore and signals the open-slot semaphore.

The blocking queue implementation below uses Boost semaphores to count and std::mutex to protect the critical section. In the next post I will show how to make it safe in the presence of exceptions; currently it misbehaves if T’s copy constructor or assignment operator throw (it assumes T’s destructor will never throw).

queue.h
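What follows is a sketch of what such an implementation might look like: a fixed-size ring buffer guarded by two counting semaphores and a mutex, as described above. The member names and exact signatures are my assumptions, not the post's original source:

#pragma once

#include <mutex>
#include <boost/interprocess/sync/interprocess_semaphore.hpp>

template<typename T>
class blocking_queue
{
public:
    explicit blocking_queue(unsigned int size)
    : m_size(size), m_pushIndex(0), m_popIndex(0),
      m_openSlots(size), m_fullSlots(0), m_data(new T[size]) {}

    ~blocking_queue() { delete [] m_data; }

    blocking_queue(const blocking_queue&) = delete;
    blocking_queue& operator=(const blocking_queue&) = delete;

    void push(const T& item)
    {
        m_openSlots.wait();             // block while the queue is full
        {
            std::lock_guard<std::mutex> lock(m_cs);
            m_data[m_pushIndex] = item;
            m_pushIndex = (m_pushIndex + 1) % m_size;
        }
        m_fullSlots.post();             // signal: one more item available
    }

    void pop(T& item)
    {
        m_fullSlots.wait();             // block while the queue is empty
        {
            std::lock_guard<std::mutex> lock(m_cs);
            item = m_data[m_popIndex];
            m_popIndex = (m_popIndex + 1) % m_size;
        }
        m_openSlots.post();             // signal: one more open slot
    }

private:
    unsigned int m_size;
    unsigned int m_pushIndex;
    unsigned int m_popIndex;
    boost::interprocess::interprocess_semaphore m_openSlots;
    boost::interprocess::interprocess_semaphore m_fullSlots;
    std::mutex m_cs;
    T* m_data;
};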

P.S. It was suggested to me that the use of boost::interprocess::interprocess_semaphore is a heavy-handed approach. I agree. I only used it to keep the example code small and uncluttered with more utility classes. In production you should have a lightweight semaphore class that uses a mutex and a condition variable. Like this 🙂 …
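A minimal sketch of such a lightweight semaphore, built from std::mutex and std::condition_variable (the class and member names are mine):

#include <mutex>
#include <condition_variable>

class semaphore
{
public:
    explicit semaphore(unsigned int count) : m_count(count) {}

    void post()
    {
        std::lock_guard<std::mutex> lock(m_mutex);
        ++m_count;                       // one more slot/item available
        m_cv.notify_one();               // wake a waiting thread, if any
    }

    void wait()
    {
        std::unique_lock<std::mutex> lock(m_mutex);
        m_cv.wait(lock, [this]() { return m_count > 0; });
        --m_count;                       // consume one count
    }

private:
    unsigned int m_count;
    std::mutex m_mutex;
    std::condition_variable m_cv;
};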
