Robust C++: Queue Templates
Implementing corruptible queues that don't manage memory
Introduction
If you're using the STL and need a queue, you'll probably opt for std::deque or std::list. For some systems, however, using these templates can be problematic:
- Their memory overhead can be significant in a system with many short queues.
- They allocate and manage memory, which can conflict with the need to provide predictable response times in a hard real-time system.
- They hide the queue data structure, which makes it difficult for a robust system to recover from a queue corruption caused by an unguarded critical region or trampled memory.
This article introduces templates that can be used when these issues make it undesirable to use an STL template.
Background
The templates in this article come from the Robust Services Core (RSC), an open-source framework for developing robust C++ applications. The templates are Q1Way, for one-way queues, and Q2Way, for two-way queues.
Because their implementations are similar, this article only describes Q1Way, which is used more frequently. Q2Way is only used for performance reasons, when elements must often be removed from the middle of a relatively long queue. Q2Way also supports backwards iteration, but applications seldom need that capability.
Using the Code
The code has been edited to remove things that are not central to the templates. These things appear in the full versions of the code in the attached .zip file, so you can remove them if you're not using RSC.
The crux of the design is that a queueable object defines a Q1Link member which provides the link to the next object in the queue. Because of this, the template does not need to allocate memory for elements on the queue.
Here is the code for Q1Link, except for its destructor.
// Link for an item on a one-way queue.  An object that resides on a one-way
// queue includes this as a member and implements a LinkDiff function that
// returns the distance between the top of the object and its Q1Link member.
//
class Q1Link
{
   template<class T> friend class Q1Way;
public:
   // Public because an instance of this class is included in objects that
   // can be queued.
   //
   Q1Link() : next(nullptr) { }

   // Public because an instance of this class is included in objects that
   // can be queued.
   //
   ~Q1Link();

   // Deleted to prohibit copying.
   //
   Q1Link(const Q1Link& that) = delete;
   Q1Link& operator=(const Q1Link& that) = delete;

   // Returns true if the item is on a queue.
   //
   bool IsQueued() const { return next != nullptr; }

   // Returns a string for displaying the link.
   //
   std::string to_str() const
   {
      std::ostringstream stream;
      stream << next;
      return stream.str();
   }
private:
   // The next item in the queue.  Because Q1Way uses circular queues, a
   // value of nullptr means that the item is not on a queue.
   //
   Q1Link* next;
};
An object should be removed from a queue before it is deleted. But if it's still queued, the Q1Link destructor removes it to prevent a broken link from corrupting the queue and orphaning the objects that follow it:
Q1Link::~Q1Link()
{
   // If the item is still queued, exqueue it.  This is a serious problem if
   // it is the tail item, because it will leave the queue head pointing to
   // a deleted item.
   //
   if(next == nullptr) return;

   auto prev = this;
   auto curr = next;
   auto i = INT16_MAX;

   while((i >= 0) && (curr != nullptr))
   {
      if(curr == this)
      {
         auto before = INT16_MAX - i;

         if(before == 0)
         {
            Debug::SwLog("tail item was still queued", before);
            prev->next = nullptr;
         }
         else
         {
            Debug::SwLog("item was still queued", before);
            prev->next = curr->next;
         }
         return;
      }

      --i;
      prev = curr;
      curr = curr->next;
   }

   Debug::SwLog("item is still queued", INT16_MAX - i);
}
Q1Link prohibits copying, because copying a link would duplicate it and corrupt the queue. Because a queueable object defines a Q1Link member, this also makes the object uncopyable. If it must support copying, Q1Link's copy constructor and copy assignment operator could be implemented by invoking its destructor (to make sure the item isn't queued) and then initializing next to nullptr so that the item's new incarnation remains unqueued.
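The following is a minimal sketch of that idea, not code from RSC. CopyableLink is a hypothetical, simplified stand-in for Q1Link, and QueueAlone and Unqueue are hypothetical helpers that only exist to simulate being queued. For brevity, assignment merely asserts that the target is unqueued instead of exqueuing it the way the real destructor does.

```cpp
#include <cassert>

// Hypothetical stand-in for Q1Link that permits copying.  It is not part
// of RSC; it only shows how a copy can be kept off the source's queue.
class CopyableLink
{
public:
   CopyableLink() : next(nullptr) { }

   // A copy never inherits the source's position in a queue.
   CopyableLink(const CopyableLink&) : next(nullptr) { }

   // Assignment leaves the target's link alone.  The target is assumed
   // to be unqueued; a fuller version would exqueue it first.
   CopyableLink& operator=(const CopyableLink&)
   {
      assert(next == nullptr);
      return *this;
   }

   bool IsQueued() const { return next != nullptr; }

   // Helpers that simulate a one-item circular queue for demonstration.
   void QueueAlone() { next = this; }
   void Unqueue() { next = nullptr; }
private:
   CopyableLink* next;
};
```

With this approach, copying a queued object yields an object that is not on any queue, so the original queue still contains each link exactly once.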
Now we can start to look at the Q1Way template itself. Note that it points to the queue's tail element, which makes both enqueue and dequeue operations efficient. The queued objects actually form a circular queue.
// One-way queue.  Recommended unless items are often exqueued, which
// can be expensive.
//  o no items: tail_.next = nullptr
//  o one item: tail_.next = item, item.next = item (points to itself)
//  o two or more items: tail_.next = last item, last item.next = first
//    item, second last item.next = last item (circular queue)
//
template<class T> class Q1Way
{
public:
   // Initializes the queue header to default values.  Before the queue
   // can be used, Init must be invoked.  Q1Link's default constructor
   // sets tail_.next to nullptr (Q1Link has no constructor that takes
   // a pointer).
   //
   Q1Way() : diff_(NilDiff) { }

   // Cleans up the queue.
   //
   ~Q1Way()
   {
      if(tail_.next == nullptr) return;
      Purge();
   }

   // Deleted to prohibit copying.
   //
   Q1Way(const Q1Way& that) = delete;
   Q1Way& operator=(const Q1Way& that) = delete;

   // Initializes the queue so that it can be used.
   //
   void Init(ptrdiff_t diff)
   {
      tail_.next = nullptr;  // queue is empty
      diff_ = diff;          // distance from each item's vptr to its Q1Link
   }
private:
   // For initializing diff_.
   //
   static const ptrdiff_t NilDiff = -1;

   // The queue head, which actually points to the tail item.
   // If the queue is empty, tail_.next is nullptr.
   //
   Q1Link tail_;

   // The distance from an item's vptr to its Q1Link.
   //
   ptrdiff_t diff_;
};
In order to manipulate the queue links, Q1Way needs to know where each object's Q1Link is located. This is the purpose of diff_, which is the distance (in bytes) from the beginning of the object to its Q1Link member. All the objects in a queue must use the same offset, so a common base class is needed if different classes will have objects in the same queue.
The value of diff_ is returned by a grotty function that RSC classes always name LinkDiff. For example, RSC's Thread class defines a msgq_ member where messages for a thread are queued. These messages derive from MsgBuffer, which therefore needs a Q1Link member and a LinkDiff function that allows Thread to initialize msgq_. However, MsgBuffer derives from Pooled, which already provides a Q1Link member and LinkDiff function for all pooled objects. The outline (omitting other members) therefore looks like this:
class Pooled : public Object
{
public:
   // A static member function cannot be const-qualified.
   //
   static ptrdiff_t LinkDiff();
private:
   Q1Link link_;
};

ptrdiff_t Pooled::LinkDiff()
{
   uintptr_t local;
   auto fake = reinterpret_cast<const Pooled*>(&local);
   return ptrdiff(&fake->link_, fake);
}

class Thread : public Permanent
{
private:
   Q1Way<MsgBuffer> msgq_;
};

msgq_.Init(Pooled::LinkDiff());  // in Thread's constructor
Once Init has been invoked, the Q1Way function Item can find the location of each element's Q1Link. This function only needs to be used internally and is therefore private:
Q1Link* Item(const T& elem) const
{
   if(diff_ == NilDiff) return nullptr;
   if(&elem == nullptr) return nullptr;
   return (Q1Link*) getptr2(&elem, diff_);  // adds diff_ bytes to &elem
}
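The article does not show getptr1 and getptr2, so the following sketch only illustrates the byte arithmetic that their comments describe. Every name in it (Link, Widget, AddBytes, SubBytes, WidgetLinkDiff) is hypothetical. It also uses offsetof on a plain standard-layout struct, which is an assumption made for simplicity and not how RSC's LinkDiff functions compute the offset.

```cpp
#include <cassert>
#include <cstddef>
#include <cstdint>

// Simplified stand-in for Q1Link.
struct Link
{
   Link* next = nullptr;
};

// Hypothetical queueable object.  It is a standard-layout type so that
// offsetof is well defined for it.
struct Widget
{
   int value = 0;
   Link link;
};

// Returns the address DIFF bytes after BASE (the role of getptr2).
inline void* AddBytes(const void* base, std::ptrdiff_t diff)
{
   assert(base != nullptr);
   return static_cast<std::uint8_t*>(const_cast<void*>(base)) + diff;
}

// Returns the address DIFF bytes before BASE (the role of getptr1).
inline void* SubBytes(const void* base, std::ptrdiff_t diff)
{
   assert(base != nullptr);
   return static_cast<std::uint8_t*>(const_cast<void*>(base)) - diff;
}

// Plays the role of LinkDiff for Widget.
inline std::ptrdiff_t WidgetLinkDiff()
{
   return static_cast<std::ptrdiff_t>(offsetof(Widget, link));
}

// Usage: maps a Widget to its Link and back again.
inline bool RoundTripWorks()
{
   Widget w;
   auto diff = WidgetLinkDiff();
   auto link = static_cast<Link*>(AddBytes(&w, diff));
   auto back = static_cast<Widget*>(SubBytes(link, diff));
   return (link == &w.link) && (back == &w);
}
```

Because the queue stores only one offset, the same arithmetic has to be valid for every object placed on it, which is why the objects need a common base class that owns the link.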
An item is usually added to the back of the queue:
bool Enq(T& elem)
{
   auto item = Item(elem);
   if(item == nullptr) return false;        // error
   if(item->next != nullptr) return false;  // if item is queued, do nothing
   if(tail_.next != nullptr)                // if queue isn't empty
   {                                        // then
      item->next = tail_.next->next;        // item points to first element
      tail_.next->next = item;              // last element points to item
   }
   else
   {
      item->next = item;                    // else item points to itself
   }
   tail_.next = item;                       // tail points to item
   return true;
}
An item can also be added to the front of the queue:
bool Henq(T& elem)
{
   auto item = Item(elem);
   if(item == nullptr) return false;        // error
   if(item->next != nullptr) return false;  // if item is queued, do nothing
   if(tail_.next != nullptr)                // if queue isn't empty
   {
      item->next = tail_.next->next;        // item points to first element
      tail_.next->next = item;              // last element points to item
   }                                        // tail isn't changed, so item is
   else                                     // after last (and is thus first)
   {
      item->next = item;                    // item points to itself
      tail_.next = item;                    // tail points to item
   }
   return true;
}
An item can even be added to the middle of the queue:
bool Insert(T* prev, T& elem)
{
   if(prev == nullptr)                        // if nothing is previous
      return Henq(elem);                      // then put item at head
   auto item = Item(elem);
   if(item == nullptr) return false;          // error
   auto ante = (Q1Link*)
      getptr2(prev, diff_);                   // put item after previous
   if(item->next != nullptr) return false;    // item must not be queued
   if(ante->next == nullptr) return false;    // prev must be queued
   item->next = ante->next;
   ante->next = item;
   if(tail_.next == ante) tail_.next = item;  // update tail if item is last
   return true;
}
An item is usually removed from the front of the queue:
T* Deq()
{
   if(tail_.next == nullptr)           // if tail points to nothing
      return nullptr;                  // then queue is empty
   Q1Link* item = tail_.next->next;    // set item to first element
   if(tail_.next != item)              // if item wasn't alone
      tail_.next->next = item->next;   // then last now points to second
   else
      tail_.next = nullptr;            // else queue is now empty
   item->next = nullptr;               // item has been removed
   return (T*) getptr1(item, diff_);   // location of item's vptr (subtracts
                                       // diff_ bytes from item)
}
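To see the tail-pointer scheme in isolation, here is a minimal sketch that strips away the template and the offset arithmetic. Node and TinyQueue are hypothetical names that do not appear in RSC, and the link is a plain pointer to the next Node. The logic mirrors Enq and Deq above: the header points to the last node, and the last node points to the first.

```cpp
#include <cassert>

// Hypothetical element with an embedded link.
struct Node
{
   explicit Node(int v) : value(v), next(nullptr) { }
   int value;
   Node* next;  // nullptr means that the node is not queued
};

// Simplified circular queue that tracks only its tail (not RSC's Q1Way).
class TinyQueue
{
public:
   TinyQueue() : tail_(nullptr) { }

   bool Enq(Node& node)
   {
      if(node.next != nullptr) return false;  // already queued
      if(tail_ != nullptr)
      {
         node.next = tail_->next;             // new node points to first
         tail_->next = &node;                 // old last points to new node
      }
      else
      {
         node.next = &node;                   // lone node points to itself
      }
      tail_ = &node;                          // new node is now last
      return true;
   }

   Node* Deq()
   {
      if(tail_ == nullptr) return nullptr;    // queue is empty
      Node* first = tail_->next;
      assert(first != nullptr);               // a queued node has a successor
      if(first != tail_)
         tail_->next = first->next;           // last now points to second
      else
         tail_ = nullptr;                     // queue is now empty
      first->next = nullptr;                  // node has been removed
      return first;
   }

   bool Empty() const { return tail_ == nullptr; }
private:
   Node* tail_;
};

// Usage: three nodes come out in the order in which they went in.
inline bool FifoOrderHolds()
{
   Node a(1), b(2), c(3);
   TinyQueue q;
   q.Enq(a);
   q.Enq(b);
   q.Enq(c);
   return (q.Deq() == &a) && (q.Deq() == &b) && (q.Deq() == &c) && q.Empty();
}
```

Both operations touch a fixed number of pointers regardless of queue length, because the first node is always one step away from the tail.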
If necessary, an item can be removed from anywhere on the queue:
bool Exq(T& elem)
{
   auto item = Item(elem);
   if(item == nullptr) return false;  // error
   if(item->next == nullptr)          // if the item isn't queued
      return true;                    // then return immediately
   if(tail_.next == nullptr)          // if the queue is empty
      return false;                   // then the item can't be there
   if(item->next == item)             // if the item points to itself
   {
      if(tail_.next == item)          // and if the item is also last
      {
         item->next = nullptr;        // then remove the item
         tail_.next = nullptr;        // and the queue is now empty
         return true;
      }
      return false;                   // the item is on another queue
   }
   auto curr = tail_.next;            // starting at the last element,
   while(curr->next != item)          // advance until the item is
   {
      curr = curr->next;              // the next element--but
      if(curr == tail_.next)          // stop after searching the
         return false;                // entire queue
   }
   curr->next = item->next;           // curr's next becomes item's next
   if(tail_.next == item)             // if the item was the tail
      tail_.next = curr;              // then the tail has to back up
   item->next = nullptr;              // the item has been removed
   return true;
}
The functions First and Next support queue traversal. They simply use a pointer to the current item rather than a distinct iterator. Traversal usually takes the form
for(auto item = queue_.First(); item != nullptr; queue_.Next(item))…
T* First() const
{
   if(diff_ == NilDiff) return nullptr;  // queue is not initialized
   Q1Link* item = tail_.next;            // start at the last element
   if(item == nullptr) return nullptr;   // queue is empty
   item = item->next;                    // advance to head
   return (T*) getptr1(item, diff_);     // location of item's vptr
}

// Updates ELEM to the next item in the queue.  If ELEM is nullptr,
// provides the first item.  Returns true if there was a next item.
//
bool Next(T*& elem) const
{
   if(diff_ == NilDiff) return false;    // queue is not initialized
   Q1Link* item;                         // item will hold result
   if(elem == nullptr)                   // nullptr means
   {
      item = tail_.next;                 // start at last element
      if(item == nullptr) return false;  // return if the queue is empty
   }
   else
   {
      item = (Q1Link*)
         getptr2(elem, diff_);           // start at the current item
      if(tail_.next == item)             // if that is the last element
      {
         elem = nullptr;                 // then there are no more
         return false;
      }
   }
   item = item->next;                    // advance to next element
   if(item == nullptr)                   // make sure the item was queued
   {
      elem = nullptr;
      return false;
   }
   elem = (T*) getptr1(item, diff_);     // location of item's vptr
   return true;
}

// Returns the item that follows ELEM.
//
T* Next(const T& elem) const
{
   auto item = Item(elem);
   if(item == nullptr) return nullptr;     // error
   if(tail_.next == item) return nullptr;  // return if item was last
   item = item->next;                      // advance to the next element
   if(item == nullptr) return nullptr;     // return if item wasn't queued
   return (T*) getptr1(item, diff_);       // location of next item's vptr
}
There are also the usual functions for finding the queue's length:
bool Empty() const { return (tail_.next == nullptr); }

size_t Size() const
{
   if(diff_ == NilDiff) return 0;  // queue is not initialized
   Q1Link* item = tail_.next;      // start at the last item
   if(item == nullptr) return 0;   // check for an empty queue
   size_t count = 1;               // there is at least one element
   item = item->next;              // advance to the first element
   while(item != tail_.next)       // stop when we reach the tail
   {
      item = item->next;
      ++count;
   }
   return count;                   // report the result
}
And finally, there is the equivalent of clear:
void Purge()
{
   while(tail_.next != nullptr)
   {
      auto item = Deq();
      delete item;
   }
}
If you want to modify the templates, NtIncrement.cpp contains Q1WayCommands and Q2WayCommands, which allow Q1Way and Q2Way functions to be tested via RSC's CLI.
Points of Interest
The following function is provided to test software that tries to recover after a queue corruption:
// Corrupts ELEM's next pointer for testing purposes.  If ELEM is
// nullptr, the queue's tail pointer is corrupted instead.
//
void Corrupt(T* elem)
{
   if(elem == nullptr)                      // if no item provided
   {
      tail_.next = (Q1Link*) BAD_POINTER;   // corrupt queue header
      return;
   }
   auto item = (Q1Link*)                    // start at the current item
      getptr2(elem, diff_);
   item->next = (Q1Link*) BAD_POINTER;      // corrupt ELEM's next pointer
}
If the queue of available blocks in an object pool gets corrupted, how the object pool audit repairs it is described here.
History
- 13th June, 2020: Initial version