Lock Free Queue implementation in C++ and C#

23 Feb 2008

Introduction

This article demonstrates an implementation of a "lock free" queue in C# and C++. Lock free queues are typically used in multi-threaded architectures to communicate between threads without fear of data corruption or of performance loss due to threads waiting to use shared memory. The goal of the article is to familiarize readers with lock free queues and to provide a starting point for writing wait-free production architectures. It should be noted that using lock-free queues is only the beginning: a truly lock free architecture also uses a lock free memory allocator. Implementing lock free memory allocators is, however, beyond the scope of this article.

Background

Recent developments in CPU architecture have necessitated a change in thinking in high performance software architecture: multithreaded software. Communication between threads in a multithreaded architecture has traditionally been accomplished using mutexes, critical sections, and locks. Recent research in algorithms and changes in computer architecture have led to the introduction of "wait free", "lock free", or "non-blocking" data structures. The most popular and possibly the most important is the queue, a First In First Out (FIFO) data structure.

The key to the majority of lock free data structures is an instruction known as Compare and Swap (CAS). The flow chart below describes what Compare and Swap does. For the assembler coders out there, the instruction is named CMPXCHG on x86 and Itanium architectures (on multiprocessor x86 systems it is used together with the LOCK prefix). The special thing about this instruction is that it is atomic, meaning that other threads or processes cannot interrupt it until it is finished. Operating systems use atomic operations to implement synchronization primitives such as locks, mutexes, semaphores, and critical sections.

[Image 1: flow chart of the Compare and Swap operation]
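
As a rough illustration of the compare-and-swap logic described above, here is a minimal C++ sketch built on `std::atomic` from C++11 rather than on raw CMPXCHG. The helper name `cas` and the walk-through function `cas_demo` are made up for this example and are not part of the article's download; `compare_exchange_strong` is the standard library call that performs the compare and the store as one atomic step.

```cpp
#include <atomic>
#include <cassert>

// Hypothetical helper for illustration: stores `desired` into `target` and
// returns true only if `target` still holds `expected`. Otherwise `target`
// is left unchanged and false is returned. The whole step is atomic.
template <typename T>
bool cas(std::atomic<T>& target, T expected, T desired)
{
    // On failure compare_exchange_strong overwrites its first argument with
    // the current value; taking `expected` by value hides that from callers.
    return target.compare_exchange_strong(expected, desired);
}

// Small walk-through; returns the final value of the shared variable.
int cas_demo()
{
    std::atomic<int> shared{5};

    bool first = cas(shared, 5, 7);   // shared held 5, so it becomes 7
    bool second = cas(shared, 5, 9);  // shared now holds 7, so nothing changes

    assert(first);
    assert(!second);
    return shared.load();
}
```

Calling `cas_demo()` from `main` exercises both the successful and the failing path. Lock free algorithms typically wrap a call like `cas` in a retry loop: read the current value, compute the new one, and try again if another thread got there first.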

My code draws on research by Maged M. Michael and Michael L. Scott on non-blocking and blocking concurrent queue algorithms. In fact, an implementation of their queue is now part of the Java concurrency library. Their paper demonstrates why the queue is "linearizable" and "lock free". An implementation of the code in C is available here in tar.gz format. The idea is that each pointer is paired with a count that is incremented on every update, and the pair is checked for consistency in a loop. The count is meant to prevent what is referred to as the "ABA problem": if a process or thread reads a value 'A' and then attempts a CAS operation, the CAS operation might succeed incorrectly if a second thread or process has changed the value from 'A' to 'B' and then back again to 'A' in the meantime. If the "ABA problem" never occurs, the code is safe because:

  1. The linked list is always connected.
  2. Nodes are only inserted at the end of the linked list, and only after a node that has a NULL next pointer.
  3. Nodes are only deleted from the beginning of the list, because they are only deleted through the head pointer and head always points to the first element of the list.
  4. Head always points to the first element of the list and only changes its value atomically.
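
To make the ABA problem more concrete, the following C++ sketch replays the interleaving in a single thread. It is only an illustration under stated assumptions: small integers stand in for node pointers, and the layout in `pack` (a 32-bit index plus a 32-bit count in one 64-bit word) is chosen for this example and is not necessarily how the article's download stores its counted pointers. All function names are hypothetical.

```cpp
#include <atomic>
#include <cassert>
#include <cstdint>

// Pack a 32-bit node index and a 32-bit modification count into one word so
// that a single CAS covers both. (Layout chosen for this sketch only.)
constexpr std::uint64_t pack(std::uint32_t index, std::uint32_t count)
{
    return (static_cast<std::uint64_t>(count) << 32) | index;
}

// CAS on the bare index: it cannot tell "still A" from "A again after B".
bool aba_fools_plain_cas()
{
    const std::uint32_t A = 1, B = 2;
    std::atomic<std::uint32_t> head{A};

    std::uint32_t seen = head.load();  // "thread 1" reads A, then stalls
    head.store(B);                     // "thread 2" changes A to B ...
    head.store(A);                     // ... and back to A

    // Thread 1 resumes: the CAS succeeds although head changed twice.
    return head.compare_exchange_strong(seen, B);
}

// CAS on index plus count: the stale snapshot no longer matches.
bool counted_cas_detects_aba()
{
    const std::uint32_t A = 1, B = 2;
    std::atomic<std::uint64_t> head{pack(A, 0)};

    std::uint64_t seen = head.load();  // "thread 1" reads (A, count 0)
    head.store(pack(B, 1));            // every update bumps the count
    head.store(pack(A, 2));

    // The index matches again, but the count is 2 rather than 0, so the
    // CAS fails and thread 1 knows it has to re-read and retry.
    return !head.compare_exchange_strong(seen, pack(B, 1));
}
```

Both functions return `true`: the first shows the plain CAS being fooled, the second shows the counted version rejecting the stale snapshot. The count can in principle wrap around, so this makes the ABA problem very unlikely rather than impossible.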

If CAS or similar instructions are not available, I suggest using the STL queue or a similar queue with traditional synchronization primitives. Michael and Scott also present a "two lock" queue data structure.
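
For readers who want to see what a "two lock" queue can look like, here is a minimal C++11 sketch in the spirit of Michael and Scott's two-lock algorithm: one mutex guards the head, another guards the tail, and a dummy node keeps the two ends apart. The class name `TwoLockQueue` is made up for this example, `T` is assumed to be default constructible and movable, and the code is a sketch rather than the article's implementation.

```cpp
#include <atomic>
#include <cassert>
#include <mutex>
#include <utility>

template <typename T>
class TwoLockQueue
{
    struct Node
    {
        T value{};
        // Atomic because an enqueuer (tail lock) and a dequeuer (head lock)
        // may touch the dummy node's next pointer at the same time.
        std::atomic<Node*> next{nullptr};
    };

    Node* head_;             // always points at a dummy node
    Node* tail_;             // last node in the list
    std::mutex head_lock_;   // serializes dequeuers
    std::mutex tail_lock_;   // serializes enqueuers

public:
    TwoLockQueue() : head_(new Node), tail_(head_) {}

    ~TwoLockQueue()
    {
        while (head_ != nullptr)
        {
            Node* next = head_->next.load();
            delete head_;
            head_ = next;
        }
    }

    TwoLockQueue(const TwoLockQueue&) = delete;
    TwoLockQueue& operator=(const TwoLockQueue&) = delete;

    void enqueue(T value)
    {
        Node* node = new Node;
        node->value = std::move(value);

        std::lock_guard<std::mutex> guard(tail_lock_);
        tail_->next.store(node, std::memory_order_release);  // link at the end
        tail_ = node;                                         // swing tail
    }

    // Returns false if the queue is empty; `out` is then left untouched.
    bool dequeue(T& out)
    {
        Node* old_dummy = nullptr;
        {
            std::lock_guard<std::mutex> guard(head_lock_);
            Node* first = head_->next.load(std::memory_order_acquire);
            if (first == nullptr)
                return false;              // only the dummy node is left
            out = std::move(first->value);
            old_dummy = head_;
            head_ = first;                 // first becomes the new dummy
        }
        delete old_dummy;                  // free outside the lock
        return true;
    }
};
```

The interface mirrors the `enqueue`/`dequeue` pair used in this article, so `TwoLockQueue<int> q; q.enqueue(1); int v; q.dequeue(v);` behaves as a FIFO. Producers and consumers only contend with threads at their own end of the queue, which is the main attraction of the design when CAS is not available.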

Using the code

[Image: UML diagram of the source, LockFreeQueue.jpg]

Using the code provided with this article is simple.

  • C++ class declaration: template< class T > class MSQueue
    • Include the header file: #include "LockFreeQ.h"
    • Declare C++ queues like this: MSQueue< int > Q;
    • Add items to the queue: Q.enqueue(i);
    • Remove items from the queue: bool bIsQEmpty = Q.dequeue(i); dequeue returns false if the queue is empty, in which case the value of i is undefined. (Despite its name, bIsQEmpty is therefore true when an item was removed.)
  • C# class declaration: namespace Lockfreeq { public class MSQueue<T> {
    • Include the Lock Free Queue DLL: using Lockfreeq;
    • Declare a C# queue: MSQueue< int > Q = new MSQueue< int >();
    • Add items to the queue: Q.enqueue(i);
    • Remove items from the queue: bool bIsQEmpty = Q.dequeue(ref i); dequeue returns false if the queue is empty, in which case the value of i is undefined.

Points of Interest

Did you know that Valve Software (makers of the Half-Life computer games) has switched to a wait free architecture?

History

  • This is my first article! (30.1.2008)
  • Revised 21:00 30.1.2008 (Thanks to the early commenters for spotting those mistakes!)
  • Noted need for memory allocator. 31.1.2008
  • Revision to correct inaccuracy with regards to Java Concurrency library.

License

This article, along with any associated source code and files, is licensed under The Code Project Open License (CPOL)


Written By
Web Developer EMC Corporation
United States
Idaho Edokpayi is a Consultant working in EMC's Microsoft Practice specializing in WSS 3.0, MOSS 07, .NET, and ASP.NET technology. He also likes to write C++, Win32/64 API, and DirectX code in his spare time.

Comments and Discussions

 
Performance problem...
jahuh, 6-Feb-08 13:28
It works; however, performance suffers because of the consistency check.

The problem is that the consistency check fails often under high read/write contention, because multiple Interlocked calls cannot synchronize a single state (e.g., the second Interlocked call, which attempts to keep the state consistent through the count variable, is causing the problem).
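
To illustrate the point about two separate interlocked calls, here is a small C++ sketch that replays the update in a single thread. It is an illustration only: the type and function names are hypothetical, small integers stand in for node references, and `std::atomic<Counted>` is assumed to be usable for an 8-byte struct on the target platform.

```cpp
#include <atomic>
#include <cassert>
#include <cstdint>

// Stand-in for a counted pointer; integers stand in for node references.
struct Counted
{
    std::uint32_t ptr;
    std::uint32_t count;
};

// Two independent atomic fields, updated one after the other.
struct TwoStep
{
    std::atomic<std::uint32_t> ptr{1};
    std::atomic<std::uint32_t> count{0};
};

// Returns true if a reader running between the two steps sees a mixed state.
bool two_step_exposes_mixed_state()
{
    TwoStep head;

    std::uint32_t expected = 1;
    head.ptr.compare_exchange_strong(expected, 2);  // step 1: swap the pointer

    // A reader scheduled here sees the new pointer with the old count, a
    // pair that was never written as a unit.
    Counted snapshot{head.ptr.load(), head.count.load()};

    head.count.store(1);                            // step 2: update the count

    return snapshot.ptr == 2 && snapshot.count == 0;
}

// With both fields in one atomic object, a single CAS updates the pair.
bool single_word_update_is_all_or_nothing()
{
    std::atomic<Counted> head{Counted{1, 0}};

    Counted expected{1, 0};
    bool swapped = head.compare_exchange_strong(expected, Counted{2, 1});

    Counted snapshot = head.load();
    return swapped && snapshot.ptr == 2 && snapshot.count == 1;
}
```

Both functions return `true`. The first shows the window in which a concurrent consistency check can observe a half-updated pair and be forced to retry; the second shows that a single compare-and-swap over both fields leaves no such window.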

To show the problem, I ran a simple test on a four core machine using two threads to populate the queue with integers (2 x 1M) and two threads to consume from the queue. I created a second TestQueue class backed by a LinkedList<T> using monitor locking.

MSQueue averages 2.8 seconds
TestQueue averages 0.54 seconds

To run MSQueue, define USEMSQUEUE. To run TestQueue, do not define USEMSQUEUE

///////////////////////////////////////////////////////////////////////

#define USEMSQUEUE

using System;
using System.Collections.Generic;
using System.Threading;

namespace lfq
{
    public class MSQueue<T>
    {
        public class node_t
        {
            public T value;
            public pointer_t next;

            /// <summary>
            /// default constructor
            /// </summary>
            public node_t() { }

            public override string ToString()
            {
                return string.Format("node_t(value={0}):{1}", value.ToString(), next.ToString());
            }
        }

        public struct pointer_t
        {
            public long count;
            public node_t ptr;

            /// <summary>
            /// copy constructor
            /// </summary>
            /// <param name="p"></param>
            public pointer_t(pointer_t p)
            {
                ptr = p.ptr;
                count = p.count;
            }

            /// <summary>
            /// constructor that allows caller to specify ptr and count
            /// </summary>
            /// <param name="node"></param>
            /// <param name="c"></param>
            public pointer_t(node_t node, long c)
            {
                ptr = node;
                count = c;
            }

            public override string ToString()
            {
                return string.Format("pointer_t(count={0}, ptr={1})", count, ptr == null ? "null" : ptr.ToString());
            }
        }

        public pointer_t Head;
        public pointer_t Tail;

        public MSQueue()
        {
            node_t node = new node_t();
            Head.ptr = Tail.ptr = node;
        }

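        /// <summary>
        /// CAS
        /// stands for Compare And Swap
        /// Interlocked Compare and Exchange operation
        /// </summary>
        /// <param name="destination">counted pointer to update</param>
        /// <param name="compared">value the caller expects to find in destination</param>
        /// <param name="exchange">value to store if the comparison succeeds</param>
        /// <returns>true if destination.ptr matched compared.ptr and was replaced</returns>
        private bool CAS(ref pointer_t destination, pointer_t compared, pointer_t exchange)
        {
            // Note: ptr and count are updated by two separate interlocked calls,
            // so the (ptr, count) pair as a whole is not updated atomically.
            if (compared.ptr == Interlocked.CompareExchange(ref destination.ptr, exchange.ptr, compared.ptr))
            {
                Interlocked.Exchange(ref destination.count, exchange.count);
                return true;
            }

            return false;
        }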

        public bool deque(ref T t)
        {
            pointer_t head;

            // Keep trying until deque is done
            bool bDequeNotDone = true;
            while (bDequeNotDone)
            {
                // read head
                head = Head;

                // read tail
                pointer_t tail = Tail;

                // read next
                pointer_t next = head.ptr.next;

                // Are head, tail, and next consistent?
                if (head.count == Head.count && head.ptr == Head.ptr)
                {
                    // is tail falling behind
                    if (head.ptr == tail.ptr)
                    {
                        // is the queue empty?
                        if (null == next.ptr)
                        {
                            // queue is empty, cannot dequeue
                            return false;
                        }

                        // Tail is falling behind. try to advance it
                        CAS(ref Tail, tail, new pointer_t(next.ptr, tail.count + 1));

                    } // endif

                    else // No need to deal with tail
                    {
                        // read value before CAS otherwise another deque might try to free the next node
                        t = next.ptr.value;

                        // try to swing the head to the next node
                        if (CAS(ref Head, head, new pointer_t(next.ptr, head.count + 1)))
                        {
                            bDequeNotDone = false;
                        }
                    }

                } // endif

            } // endloop

            // dispose of head.ptr
            return true;
        }

        public void enqueue(T t)
        {
            // Allocate a new node from the free list
            node_t node = new node_t();

            // copy enqueued value into node
            node.value = t;

            // keep trying until Enqueue is done
            bool bEnqueueNotDone = true;

            while (bEnqueueNotDone)
            {
                // read Tail.ptr and Tail.count together
                pointer_t tail = Tail;

                // read next ptr and next count together
                pointer_t next = tail.ptr.next;

                // are tail and next consistent
                if (tail.count == Tail.count && tail.ptr == Tail.ptr)
                {
                    // was tail pointing to the last node?
                    if (null == next.ptr)
                    {
                        if (CAS(ref tail.ptr.next, next, new pointer_t(node, next.count + 1)))
                        {
                            bEnqueueNotDone = false;
                        } // endif

                    } // endif

                    else // tail was not pointing to last node
                    {
                        // try to swing Tail to the next node
                        CAS(ref Tail, tail, new pointer_t(next.ptr, tail.count + 1));
                    }

                } // endif

            } // endloop
        }

    }

    class TestQueue<T>
    {
        public void enqueue(T item)
        {
            lock (_list)
            {
                _list.AddFirst(item);
            }
        }

        public bool deque(ref T item)
        {
            LinkedListNode<T> last = null;

            lock (_list)
            {
                last = _list.Last;

                if (last != null)
                {
                    item = last.Value;
                    _list.Remove(last);
                }
            }

            return last != null;
        }

        LinkedList<T> _list = new LinkedList<T>();
    }

    class Program
    {
        const int CHECKSIZE = 64;
        const int CONSUMERS = 2;
        const int PRODUCERS = 2;
        const int PRODUCESIZE = 1000000;

#if USEMSQUEUE
        static MSQueue<int> queue = new MSQueue<int>();
#else
        static TestQueue<int> queue = new TestQueue<int>();
#endif

        // Set once the producers have finished; volatile so that the consumer
        // threads are guaranteed to observe the change.
        static volatile bool consumersDone = false;

        static void Main(string[] args)
        {
            Thread[] consumers = new Thread[CONSUMERS];
            Thread[] producers = new Thread[PRODUCERS];

            System.Diagnostics.Stopwatch sw = new System.Diagnostics.Stopwatch();

            sw.Start();
            Start(producers, producer);
            Start(consumers, consumer);
            Join(producers);
            consumersDone = true;
            Join(consumers);
            sw.Stop();

            Console.WriteLine("{0:0.0000} seconds", sw.Elapsed.TotalSeconds);
        }

        static void Join(Thread[] threads)
        {
            foreach (Thread t in threads)
            {
                t.Join();
            }
        }

        static void Start(Thread[] threads, ThreadStart method)
        {
            for (int i = 0; i < threads.Length; i++)
            {
                threads[i] = new Thread(method);
                threads[i].Start();
            }
        }

        static void producer()
        {
            for (int i = 0; i < PRODUCESIZE; i++)
            {
                queue.enqueue(i);
            }
        }

        static void consumer()
        {
            int n = 0;

            for (; ; )
            {
                if (queue.deque(ref n))
                {
                    // item consumed; nothing else to do in this benchmark
                }
                else if (consumersDone)
                {
                    break;
                }
            }
        }
    }
}
