In the previous post we discussed a multi-threaded blocking queue whose implementation was lacking: it was not exception safe. It left the semaphore counts incorrectly modified when an exception was thrown by T’s copy constructor or assignment operator.
Below is a corrected void push(const T& item) method:
void push(const T& item)
{
    m_openSlots.wait();
    {
        std::lock_guard<std::mutex> lock(m_cs);
        try
        {
            // Copy-construct the item into the slot's raw storage.
            new (m_data + m_pushIndex) T(item);
        }
        catch (...)
        {
            // Construction failed: restore the open-slot count and propagate.
            m_openSlots.post();
            throw;
        }
        m_pushIndex = (m_pushIndex + 1) % m_size;
        ++m_count;
    }
    m_fullSlots.post();
}
In the corrected version we catch any exception that T’s copy constructor may throw and restore the open-slot semaphore count: since the push operation failed, the numbers of open and full slots have not changed. Finally, we re-throw the exception.
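For reference, the wait()/post() calls above assume a counting semaphore along the lines of the one used in the previous post. A minimal sketch of such a class built on std::mutex and std::condition_variable (an assumption about its shape, not the original implementation) could look like this:

#include <condition_variable>
#include <mutex>

// Minimal counting semaphore compatible with the wait()/post() usage above
// (a sketch only; the real class comes from the previous post).
class semaphore
{
public:
    explicit semaphore(unsigned count) : m_count(count) {}

    void wait()                       // block until a slot is available, then take it
    {
        std::unique_lock<std::mutex> lock(m_mutex);
        m_cv.wait(lock, [this] { return m_count > 0; });
        --m_count;
    }

    void post()                       // release a slot and wake one waiter
    {
        {
            std::lock_guard<std::mutex> lock(m_mutex);
            ++m_count;
        }
        m_cv.notify_one();
    }

private:
    std::mutex              m_mutex;
    std::condition_variable m_cv;
    unsigned                m_count;
};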
Below is a corrected void pop(T& item) method:
void pop(T& item)
{
    m_fullSlots.wait();
    {
        std::lock_guard<std::mutex> lock(m_cs);
        try
        {
            item = m_data[m_popIndex];
        }
        catch (...)
        {
            // Assignment failed: restore the full-slot count and propagate.
            m_fullSlots.post();
            throw;
        }
        m_data[m_popIndex].~T();
        m_popIndex = (m_popIndex + 1) % m_size;
        --m_count;
    }
    m_openSlots.post();
}
In the corrected version we catch any exception that T’s assignment operator may throw and restore the full-slot semaphore count: since the pop operation failed, the numbers of open and full slots have not changed. Finally, we re-throw the exception.
We now have a robust queue template class that can handle misbehaving Ts 🙂
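As a quick sanity check, here is a hypothetical usage sketch (the BlockingQueue name and its capacity constructor are assumptions, not taken from the post) with a type whose copy constructor can throw; after a failed push the queue remains usable:

#include <iostream>
#include <stdexcept>

// Hypothetical misbehaving type: its copy constructor throws on demand.
struct Flaky
{
    static bool s_throwOnCopy;
    int value = 0;

    Flaky() = default;
    explicit Flaky(int v) : value(v) {}
    Flaky(const Flaky& other) : value(other.value)
    {
        if (s_throwOnCopy)
            throw std::runtime_error("copy failed");
    }
    Flaky& operator=(const Flaky&) = default;
};
bool Flaky::s_throwOnCopy = false;

int main()
{
    BlockingQueue<Flaky> queue(4);   // assumed class name and capacity constructor

    Flaky::s_throwOnCopy = true;
    try
    {
        queue.push(Flaky(1));        // the copy into the queue throws...
    }
    catch (const std::exception&)
    {
        // ...but the semaphore counts were rolled back, so the queue still works.
    }

    Flaky::s_throwOnCopy = false;
    queue.push(Flaky(2));

    Flaky out;
    queue.pop(out);
    std::cout << out.value << '\n';  // prints 2
}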
It’s possible to write lock-free multi-producer multi-consumer queues. This can be done using various interlocked atomic operations (increment and compare-exchange). You usually pair the pointer with a counter and use an interlocked operation twice the width of a pointer to avoid the ABA problem.
It’s worth a search and some heavy reading, but the performance gains from moving to lock-free structures can be substantial if the queue truly sits on the critical path.
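To illustrate the double-width compare-exchange idea, here is a hedged sketch of the tagged-pointer technique on a Treiber-style stack head (my own illustration of the general pattern, not code from this series; safe memory reclamation is deliberately omitted, and std::atomic on the 16-byte struct is only lock-free on targets with a double-width CAS such as cmpxchg16b):

#include <atomic>
#include <cstdint>

struct Node
{
    int   value;
    Node* next;
};

// Pointer plus generation counter, swapped together with one wide CAS.
struct TaggedHead
{
    Node*          ptr;
    std::uintptr_t tag;              // bumped on every successful update
};

std::atomic<TaggedHead> g_head{ TaggedHead{ nullptr, 0 } };

void push(Node* node)
{
    TaggedHead expected = g_head.load(std::memory_order_relaxed);
    TaggedHead desired;
    do
    {
        node->next = expected.ptr;
        desired    = TaggedHead{ node, expected.tag + 1 };
    } while (!g_head.compare_exchange_weak(expected, desired,
                                           std::memory_order_release,
                                           std::memory_order_relaxed));
}

Node* pop()
{
    TaggedHead expected = g_head.load(std::memory_order_acquire);
    TaggedHead desired;
    do
    {
        if (expected.ptr == nullptr)
            return nullptr;          // empty
        // Without the tag, a node recycled between the load and the CAS could
        // make the CAS succeed with a stale 'next' pointer -- the ABA problem.
        desired = TaggedHead{ expected.ptr->next, expected.tag + 1 };
    } while (!g_head.compare_exchange_weak(expected, desired,
                                           std::memory_order_acquire,
                                           std::memory_order_acquire));
    return expected.ptr;
}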
Another special case is a single-producer single-consumer queue with shadowed control structures. These are useful in a NUMA environment: you can implement the queue such that you only read locally and write remotely, which is a key design decision when dealing with high-performance NUMA architectures.
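A minimal sketch of the shadow-index idea (an illustration of the technique, not the author’s code): each side keeps a private copy of the other side’s published index and re-reads the remotely written value only when the queue looks full or empty, so the common path touches only local data.

#include <atomic>
#include <cstddef>

template <typename T, std::size_t N>
class SpscShadowQueue
{
public:
    bool try_push(const T& item)                 // producer thread only
    {
        const std::size_t next = (m_writeLocal + 1) % N;
        if (next == m_readCache)                 // looks full: refresh the shadow
        {
            m_readCache = m_readShared.load(std::memory_order_acquire);
            if (next == m_readCache)
                return false;                    // really full
        }
        m_buffer[m_writeLocal] = item;
        m_writeLocal = next;
        m_writeShared.store(next, std::memory_order_release);
        return true;
    }

    bool try_pop(T& item)                        // consumer thread only
    {
        if (m_readLocal == m_writeCache)         // looks empty: refresh the shadow
        {
            m_writeCache = m_writeShared.load(std::memory_order_acquire);
            if (m_readLocal == m_writeCache)
                return false;                    // really empty
        }
        item = m_buffer[m_readLocal];
        m_readLocal = (m_readLocal + 1) % N;
        m_readShared.store(m_readLocal, std::memory_order_release);
        return true;
    }

private:
    T m_buffer[N];

    // Producer side: private index plus a shadow of the consumer's index.
    std::size_t m_writeLocal = 0;
    std::size_t m_readCache  = 0;
    std::atomic<std::size_t> m_writeShared{ 0 };

    // Consumer side: private index plus a shadow of the producer's index.
    std::size_t m_readLocal  = 0;
    std::size_t m_writeCache = 0;
    std::atomic<std::size_t> m_readShared{ 0 };
};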