Introduction

This article demonstrates an implementation of a "lock free" queue in C# and C++. Lock free queues are typically used in multi-threaded architectures to communicate between threads without fear of data corruption or performance loss due to threads waiting to use shared memory. The goal of the article is to familiarize readers with lock free queues and provide a starting point for writing wait-free production architectures. It should be noted that using lock-free queues is only the beginning: a true lock free architecture also uses a lock free memory allocator. Implementing lock free memory allocators is, however, beyond the scope of this article.

Background

Recent developments in CPU architecture have necessitated a change in thinking in high performance software architecture: multithreaded software. Communication between threads in multithreaded architecture has traditionally been accomplished using mutexes, critical sections, and locks. Recent research in algorithms and changes in computer architecture have led to the introduction of "wait free", "lock free", or "non-blocking" data structures. The most popular and possibly the most important is the queue, a First In First Out (FIFO) data structure.

The key to the majority of lock free data structures is an instruction known as Compare and Swap (CAS). The flow chart below describes what Compare and Swap does: it compares the contents of a memory location with an expected value and, only if the two match, stores a new value there. For the assembler coders out there, the instruction is named CMPXCHG on X86 and Itanium architectures. The special thing about this instruction is that it is atomic, meaning that other threads or processes cannot interrupt it until it is finished. Operating systems use atomic operations to implement synchronization: locks, mutexes, semaphores, and critical sections.
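
To make the compare-and-swap step concrete, here is a minimal sketch in portable C++. It uses std::atomic rather than the raw CMPXCHG instruction, which is my assumption for illustration; the helper name cas is hypothetical and is not taken from the article's source.

```cpp
#include <atomic>

// Hypothetical helper: atomically replace the contents of `target` with
// `desired`, but only if `target` still holds `expected`.
// Returns true when the swap happened, false when another value was found.
bool cas(std::atomic<int>& target, int expected, int desired) {
    // compare_exchange_strong writes the value it actually found back into
    // `expected` on failure; that copy is local here and simply discarded.
    return target.compare_exchange_strong(expected, desired);
}
```

Starting from a value of 5, cas(v, 5, 7) succeeds and leaves 7 in v, while a second cas(v, 5, 9) fails and leaves v untouched, because v no longer holds the expected 5.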

My code draws on research by Maged M. Michael and Michael L. Scott on non-blocking and blocking concurrent queue algorithms. In fact, an implementation of their queue is now part of the Java concurrency library. Their paper demonstrates why the queue is "linearizable" and "lock free". An implementation of the code in C is available here in tar.gz format. The idea is that pointers are reference counted and checked for consistency in a loop. The reference count is meant to prevent what is referred to as the "ABA problem": if a process or thread reads a value 'A' and then attempts a CAS operation, the CAS operation might succeed incorrectly if a second thread or process changes value 'A' to 'B' and then back again to 'A'. If the "ABA" problem never occurs, the code is safe because:

  1. The linked list is always connected.
  2. Nodes are only inserted at the end of the linked list and only inserted to a node that has a NULL next pointer.
  3. Nodes are deleted from the beginning of the list because they are only deleted from the head pointer and head always points to the first element of the list.
  4. Head always points to the first element of the list and only changes its value atomically.
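
The ABA scenario and the role of the counter can be sketched as follows. This is an illustration under stated assumptions, not the article's implementation: a 32-bit node index stands in for the pointer, the index and counter are packed into one 64-bit word so a single CAS covers both, and every name (TaggedRef, pack, unpack, tagged_cas and the two demo functions) is hypothetical.

```cpp
#include <atomic>
#include <cstdint>

// Hypothetical tagged reference: a node index plus a modification counter.
struct TaggedRef {
    std::uint32_t index;
    std::uint32_t count;
};

// Pack both fields into one 64-bit word so they are compared together.
inline std::uint64_t pack(TaggedRef r) {
    return (static_cast<std::uint64_t>(r.count) << 32) | r.index;
}

inline TaggedRef unpack(std::uint64_t w) {
    return TaggedRef{static_cast<std::uint32_t>(w & 0xFFFFFFFFu),
                     static_cast<std::uint32_t>(w >> 32)};
}

// Swing `slot` from `expected` to `new_index`, bumping the counter by one.
inline bool tagged_cas(std::atomic<std::uint64_t>& slot, TaggedRef expected,
                       std::uint32_t new_index) {
    std::uint64_t old_word = pack(expected);
    std::uint64_t new_word = pack(TaggedRef{new_index, expected.count + 1});
    return slot.compare_exchange_strong(old_word, new_word);
}

// Without a counter: the value goes A -> B -> A and a stale CAS still succeeds.
inline bool plain_cas_is_fooled() {
    std::atomic<std::uint32_t> slot{1};   // index 1 plays the role of 'A'
    std::uint32_t stale = slot.load();    // a slow thread reads 'A'
    slot.store(2);                        // another thread: A -> B
    slot.store(1);                        // and back again: B -> A
    return slot.compare_exchange_strong(stale, 3);
}

// With a counter: the same history leaves a different word behind,
// so the stale CAS is rejected.
inline bool stale_cas_is_rejected() {
    std::atomic<std::uint64_t> slot{pack(TaggedRef{1, 0})};
    TaggedRef stale = unpack(slot.load());     // a slow thread reads 'A'
    tagged_cas(slot, unpack(slot.load()), 2);  // A -> B, count becomes 1
    tagged_cas(slot, unpack(slot.load()), 1);  // B -> A, count becomes 2
    return !tagged_cas(slot, stale, 3);        // stale count 0 no longer matches
}
```

Both demo functions replay the interleaving on a single thread, which keeps the outcome deterministic; the point is only that the counter changes the word being compared even when the index returns to its old value.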

If CAS or similar instructions are not available, I suggest using the STL queue or a similar queue with traditional synchronization primitives. Michael and Scott also present a "two lock" queue data structure.
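
For readers who take the locking route, the following is a rough sketch of the two lock idea: one lock guards the head, another guards the tail, and a dummy node keeps enqueuers and dequeuers from touching the same node. It is written from the general description of the algorithm, not copied from the paper or from this article's source. The class name TwoLockQueue is hypothetical, T is assumed to be default constructible, and the next pointer is made a std::atomic (my addition) so that the one field both sides can touch is free of data races under the C++ memory model.

```cpp
#include <atomic>
#include <mutex>
#include <utility>

template <typename T>
class TwoLockQueue {
    struct Node {
        T value{};
        std::atomic<Node*> next{nullptr};
    };

    Node* head_;             // always points at the dummy node
    Node* tail_;             // always points at the last node
    std::mutex head_lock_;   // serializes dequeuers
    std::mutex tail_lock_;   // serializes enqueuers

public:
    TwoLockQueue() : head_(new Node), tail_(head_) {}
    TwoLockQueue(const TwoLockQueue&) = delete;
    TwoLockQueue& operator=(const TwoLockQueue&) = delete;

    ~TwoLockQueue() {
        // Free the dummy node and anything still queued.
        while (head_ != nullptr) {
            Node* n = head_;
            head_ = n->next.load();
            delete n;
        }
    }

    void enqueue(T v) {
        Node* node = new Node;
        node->value = std::move(v);
        std::lock_guard<std::mutex> guard(tail_lock_);
        tail_->next.store(node, std::memory_order_release);  // link at the end
        tail_ = node;                                        // swing tail
    }

    bool dequeue(T& out) {
        Node* old_head;
        {
            std::lock_guard<std::mutex> guard(head_lock_);
            old_head = head_;
            Node* first = old_head->next.load(std::memory_order_acquire);
            if (first == nullptr) {
                return false;                // queue is empty
            }
            out = std::move(first->value);   // read the value before moving head
            head_ = first;                   // first becomes the new dummy
        }
        delete old_head;                     // free the old dummy outside the lock
        return true;
    }
};
```

Because enqueue only takes the tail lock and dequeue only takes the head lock, one producer and one consumer can proceed at the same time; threads on the same side still wait for each other, so this is a blocking queue, not a lock free one.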

Using the code

[Figure: UML diagram of the source (LockFreeQueue.jpg)]

Using the code provided with this article is simple.

Points of Interest

Did you know that Valve Software (makers of the Half-Life computer game) has switched to a wait free architecture?

History

GeneralMy vote of 1
Bharat Karia
8:38 5 Feb '10  
Extremely buggy code
GeneralMy vote of 1
AAO
19:10 29 Jul '09  
This is a very old algorithm. IT DOES NOT WORK for multiple readers, and it is not likely to work for multiple writers. It reliably works only for one reader and one writer. Just taking the article and compiling it is not a big deal. The author did not even bother to make it work for WIN64. Sloppy work.
Generalstress test core
fly_horse
3:21 8 May '09  
bool dequeue(T& t)
{
...
else // no need to deal with tail
{
// read value before CAS otherwise another deque might try to free the next node
t = next.ptr->value;

// try to swing Head to the next node
if(CAS(Head,head, pointer_t(next.ptr,head.count+1) ) )
{
bDequeNotDone = false;
}
}
...
}

My code crashes every time at the bold line.

My code follows:

#include "stdafx.h"
#include <iostream>

#include "LockFreeQ.h"
using namespace std;

MSQueue< int > Q;

int nReadThreads = 10;
int nWriteThreads = 10;


DWORD WINAPI ReadThreadFunc(LPVOID p)
{
while (true)
{
int i;
Q.dequeue(i);
}
}

DWORD WINAPI WriteThreadFunc(LPVOID p)
{
int i=0;
while (true)
{
Q.enqueue(i++);
Sleep(1);
}
}

int _tmain(int argc, _TCHAR* argv[])
{

for (int i=0; i< nWriteThreads; i++) {
CreateThread(NULL, 0, WriteThreadFunc, NULL, 0, NULL);
}

for (int i=0; i< nReadThreads; i++) {
CreateThread(NULL, 0, ReadThreadFunc, NULL, 0, NULL);
}


int n;
std::cin >> n;

return 0;
}

GeneralC# Test Crashing
irares
6:11 16 Apr '09  
Hi there,

The Assert fails in the following test:

	class LockFreeQueueTest
{
static MSQueue<int> queue = new MSQueue<int>();

static private void producer( object _count )
{
int count = (int) _count;

for( int i = 0; i < count; i++ )
{
queue.enqueue( i );
}

}

static private void consumer( object _count )
{
int count = (int) _count;

for( int i = 0; i < count; i++ )
{
int result = 0;
if( !queue.deque( ref result ) )
continue;

Trace.Assert( result == i );
}
}

public static void test( int count )
{
Thread producer = new Thread( new ParameterizedThreadStart( LockFreeQueueTest.producer ) );
Thread consumer = new Thread( new ParameterizedThreadStart( LockFreeQueueTest.consumer ) );

producer.Start( count );
consumer.Start( count );

producer.Join();
consumer.Join();
}
}

Generalstress test not working
hagai_sela
2:08 28 Jul '08  
Hi,
Following is a simple stress test that crashes after a few seconds. Am I missing something here?

#include "stdafx.h"
#include "LockFreeQ.h"
#include <iostream>
using namespace std;

MSQueue< int > Q;

int nReadThreads = 1;
int nWriteThreads = 1;


DWORD WINAPI ReadThreadFunc(LPVOID p)
{
while (true)
{
int i;
Q.dequeue(i);
}
}

DWORD WINAPI WriteThreadFunc(LPVOID p)
{
int i=0;
while (true)
{
Q.enqueue(i++);
}
}

int _tmain(int argc, _TCHAR* argv[])
{

for (int i=0; i<nWriteThreads; i++) {
CreateThread(NULL, 0, WriteThreadFunc, NULL, 0, NULL);
}

for (int i=0; i<nReadThreads; i++) {
CreateThread(NULL, 0, ReadThreadFunc, NULL, 0, NULL);
}


int n;
std::cin >> n;

return 0;
}


Hagai.

GeneralRe: stress test not working
cool_man_from_Russia
23:25 23 Mar '09  
Hi!
IMHO this stresses the memory manager: the crash is in the 'new' operator.

The function WriteThreadFunc needs the following change:

DWORD WINAPI WriteThreadFunc(LPVOID p)
{
int i=0;
while (true)
{
Q.enqueue(i++);
Sleep(1);
}
}
GeneralHello
nidal102002
7:38 8 Apr '08  
What must I change to make this code compatible with .NET 1.1?
I mean without generics.

thank you

nidal
GeneralStill a bug. [modified]
Patrick Twohig
15:25 7 Apr '08  
Regarding the Michael And Scott lock free queue:

Separating the CAS into separate operations is definitely a bug. In Scott's sample implementation, he shows a CAS function written in assembly that uses a single word compare/swap. In his proof-of-concept code he mocks memory allocation by using a short as his pointer and another short as his counter. His CAS algorithm is implemented as an sc (store/conditional) MIPS instruction written as inlined assembly. This is necessary because the allocator can (and will) recycle memory addresses. If you could assume that the allocator, for the life of the entire queue, will never recycle an address, then the pointer tag is not necessary. Realistically, that is not practical. So the pointers are tagged to ensure that if one node was dequeued (and possibly re-enqueued with the same memory address) it will skip that node. With this implementation, the ABA problem could still occur, because it's possible to atomically write the pointer to memory while its tag lags behind by a few instructions. Not to mention that the copying of the objects from scope to scope can also be racy.

What you need to do is use a double compare/swap, which is available on Vista and Xbox360 as InterlockedCompareExchange64. On Xbox360 it's implemented using locked reads/writes, and on 32 bit Windows it is implemented as cmpxchg8b (yes, the infamous F00F bug instruction). The downside to using InterlockedCompareExchange64 is that you must cast the pointer/tag pair to a LONG64, which is technically undefined behavior. This can cause a failure if the memory isn't aligned on an 8 byte boundary, which is the natural alignment requirement for a LONG64. I've used a struct pair and a union of a LONG64, and that seems to do the trick. To rectify the alignment, you'll need to use __declspec( align(8) ) if you don't use a union, to ensure that the compiler will align the pair of values properly. Also, if InterlockedCompareExchange64 isn't available, then you've gotta roll your own in assembly. If you do that, it's rather simple, but be sure to follow the compiler's ABI.
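
The pair layout described above might be sketched like this in standard C++. The assumptions are mine: std::atomic<std::uint64_t> stands in for InterlockedCompareExchange64, alignas(8) stands in for __declspec( align(8) ), the pointer is assumed to be 32 bits wide as on 32 bit Windows, and std::memcpy replaces the cast or union so the conversion stays well defined. The names PointerTag, to_word, from_word and cas_pair are hypothetical.

```cpp
#include <atomic>
#include <cstdint>
#include <cstring>

// Hypothetical pointer/tag pair, forced onto an 8 byte boundary.
struct alignas(8) PointerTag {
    std::uint32_t ptr_bits;  // 32-bit pointer value (assumed width)
    std::uint32_t tag;       // modification counter
};

static_assert(sizeof(PointerTag) == sizeof(std::uint64_t),
              "pair must fill exactly one 64-bit word");
static_assert(alignof(PointerTag) == 8,
              "pair must be aligned on an 8 byte boundary");

// Copy the pair into a 64-bit word without casting between unrelated types.
inline std::uint64_t to_word(const PointerTag& p) {
    std::uint64_t w;
    std::memcpy(&w, &p, sizeof w);
    return w;
}

inline PointerTag from_word(std::uint64_t w) {
    PointerTag p;
    std::memcpy(&p, &w, sizeof p);
    return p;
}

// Compare and swap pointer and tag together as one 64-bit unit.
inline bool cas_pair(std::atomic<std::uint64_t>& dest, PointerTag expected,
                     PointerTag desired) {
    std::uint64_t e = to_word(expected);
    return dest.compare_exchange_strong(e, to_word(desired));
}
```

The two static_assert lines turn the size and alignment requirements into compile time checks, so a layout that would break the single-word CAS fails to build instead of failing at run time.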

It is also important to note that alignment matters. On most architectures (such as the PPC based Xbox360) the processor will set an exception flag when unaligned memory is accessed and the process causes a bus error, or in my case the console just locks up, the game quits and everybody stops having fun. On Intel architecture, it's a little different. Intel processors will set the exception flag, but then the exception is handled by replacing the single read/write instructions in the pipeline with a series of instructions to correct the erroneous bus transaction. Once that happens, the cmpxchg8b instruction is no longer atomic and we're still left with a race condition.

modified on Monday, April 7, 2008 9:10 PM

GeneralRe: Still a bug.
Idaho Edokpayi
16:36 5 May '08  
Thanks for the comment! At some point, I'll take the time to correct my article. I appreciate your reading and correcting me.

Idaho Edokpayi

GeneralIs there still a chance of bug?
Member 1220
23:00 26 Feb '08  
Hello,

I am still not sure that the following code is really safe:
pointer_t(const pointer_t* p): ptr(NULL),count(0)
{
if(NULL == p)
return;

InterlockedExchange(&count,const_cast< LONG >(p->count));
InterlockedExchangePointer(ptr,const_cast< node_t*>(p->ptr));
}

Imagine, your source pointer being completely modified between both Interlocked lines. The count you have is not consistent with the current one.

then when in your code you have:
...
// Read Tail.ptr and Tail.count together
pointer_t tail(Tail);
...

You may be interrupted between the interlockedExchange of your copy constructor. As Tail is a reference, you may end with tail (the copy) having the count of the tail at moment T0 and the pointer of the Tail at moment T1 (different moments, possibly different pointer_t ...

If the Tails have different count => it will be detected but... If no luck both Tail pointers at various moments may have the same count.
I think after that the queue might be messy.


I have come across an InterlockedExchange64 function that might help to solve this issue. Concatenate both pointer and counter in a 64 bit value, and access it atomically!

Hopefully InterlockedExchange64 is lockfree (I am not sure).


Finally, I may have not gone deep enough to find out my assumptions are false, and that this implementation is correct.

Thanks for the work and for any answer.

Sincerely,

Didier HARRANG
GeneralRe: Is there still a chance of bug?
Idaho Edokpayi
3:35 27 Feb '08  
You could be right. But what of a 64 bit implementation?
I think I prefer taking a copy early in the constructor, then only making the exchange if the pointer is unchanged. Thanks for the comment!

Idaho Edokpayi

GeneralRe: Is there still a chance of bug?
LanceDiduck
10:30 18 Apr '08  
The problem is that what is pointed to can change, but the address does not. Assume Q has a few nodes already, then:

pop invoke(thread A)//gets node pointer X, but suspends before CAS invoked
pop invoke(B)
pop return (B)//node pointer X deleted
push enter (B)//node pointer X' allocated (allocator uses exact same address as X -- why not?)
push return (B)//X' now
B sleeps
A wakes up, sees that X'==X returns X'

and FIFO is not satisfied.

The tags (counters) and the pointers have to be compared and swapped at the same time. Typically double word CAS is used, but there are schemes to pack the tags in the low order bytes of the pointer values too. 128 bit CAS is not part of the MSVC intrinsics, but that doesn't mean that a) it isn't supported by hardware, so you could write assembly to get at it, and b) that incorrect code must be written.

A small note: if your MSQueue is deleted before all items have been popped, you will have a memory leak. The Dtor should have something like T v; while(deque(v)); in it to drain off all the items, if there are any.
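
A sketch of the draining destructor suggested above, using a deliberately simplified single-threaded stand-in for the queue so the example stays self-contained. DrainingQueue and its live_nodes counter are hypothetical; the counter exists only to make the absence of a leak observable.

```cpp
#include <cstddef>

template <typename T>
class DrainingQueue {
public:
    // Number of nodes currently allocated, for demonstration only.
    static std::size_t& live_nodes() {
        static std::size_t n = 0;
        return n;
    }

private:
    struct Node {
        T value{};
        Node* next = nullptr;
        Node() { ++live_nodes(); }
        ~Node() { --live_nodes(); }
    };

    Node* head_;  // dummy node, mirroring the linked list layout of the queue
    Node* tail_;

public:
    DrainingQueue() : head_(new Node), tail_(head_) {}
    DrainingQueue(const DrainingQueue&) = delete;
    DrainingQueue& operator=(const DrainingQueue&) = delete;

    ~DrainingQueue() {
        T v;
        while (deque(v)) {
            // drain off every remaining item
        }
        delete head_;  // release the one dummy node that is left
    }

    void enqueue(const T& t) {
        Node* n = new Node;
        n->value = t;
        tail_->next = n;
        tail_ = n;
    }

    bool deque(T& t) {
        Node* first = head_->next;
        if (first == nullptr) {
            return false;  // queue is empty
        }
        t = first->value;
        delete head_;      // free the old dummy
        head_ = first;     // first becomes the new dummy
        return true;
    }
};
```

If a DrainingQueue<int> goes out of scope with items still queued, live_nodes() returns to zero afterwards, which is the behavior the destructor is meant to guarantee.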

The reason for selecting different allocators isn't strictly performance. Allocators can align the nodes on cache line boundaries, substantially reducing false sharing (thus contention), it can make room for those tag values, and also since the nodes are all the same size, making a lock free free list allocator is very easy. Even a system allocator with fast locks is subject to convoying, priority inversion, and other nasty things.


Lance
GeneralJava Concurrency Library
Mike O'Neill
15:09 21 Feb '08  
The article mentions the research in a 1996 paper by Maged M. Michael and Michael L. Scott on non-blocking and blocking concurrent queue algorithms, and states that ...
In fact, their queue implementation is now part of the Java concurrency library. 
Where are you getting this fact from? It frankly does not appear defensible. The principal author/architect of the Java concurrency library is Doug Lea at Oswego (see http://g.oswego.edu/dl/classes/EDU/oswego/cs/dl/util/concurrent/intro.html[^]), not either of Maged Michael or Michael Scott.

It's true that Michael Scott won the 2006 "Edsger W. Dijkstra Prize in Distributed Computing", for work he did in a 1991 paper which describes a mutual exclusion lock based on a queue. The 1991 paper is different from the 1996 paper that you cited. It's also true that his mutual exclusion lock forms the basis for monitor locks used in Java VMs. See http://www.podc.org/dijkstra/2006.html[^].

But a mutual exclusion lock is very clearly a locking technique, not a lock-free technique. And the paper you cited is not the one that forms the basis for monitor locks in Java.

So, I don't understand the statement in the article, to the effect that the 1996 paper is somehow the basis for the entirety of the Java concurrency library.

Please explain further.

PS: Professor Scott is a remarkable person with many achievements, which you can see from his University of Rochester website: http://www.cs.rochester.edu/~scott/[^]. With my question above, I intend no disrespect for him or any of his accomplishments. I simply seek justification for a fairly sweeping comment.
GeneralRe: Java Concurrency Library
Idaho Edokpayi
15:45 23 Feb '08  
I cannot find my original research which led me to believe that they themselves wrote the queue implementation in the concurrency library. I've been busy since then. However, this is what the concurrency library page said: "This implementation employs an efficient "wait-free" algorithm based on one described in Simple, Fast, and Practical Non-Blocking and Blocking Concurrent Queue Algorithms by Maged M. Michael and Michael L. Scott." Re-reading what I said, I do not understand how you would conclude that I was saying the paper was the basis for the entirety of the Java concurrency library. Your quote of what I said explains pretty clearly what I believed: "...their queue implementation is now part of the Java concurrency library."

Idaho Edokpayi

GeneralCritical Section on the Stack? How Can It Provide Protection/Synchronization?
Mike O'Neill
16:11 20 Feb '08  
In the ArrayQ::Resize() function (C++ file), what is the purpose of the CRITICAL_SECTION?

It's being created on the stack, inside the local scope of the Resize() function. Thus, every single thread that calls the function will get its own copy of a different critical section. As such, it cannot serve any possible thread-synchronization or data-protection functions.

Can you shed any light on this?

Mike
GeneralRe: Critical Section on the Stack? How Can It Provide Protection/Synchronization?
Idaho Edokpayi
15:48 23 Feb '08  
I've since revised the code and removed the ArrayQ class since I believed the queue based on the paper was a stronger and more defensible implementation. I actually don't recommend the ArrayQ class. I am also planning a fix for the performance problem in the C# queue.

Idaho Edokpayi

GeneralRe: Critical Section on the Stack? How Can It Provide Protection/Synchronization?
opflucker
16:13 5 May '08  
You avoided answering the question. The question is not about whether ArrayQ is stronger or not than the other implementation; the question is about a "conceptual mistake" in the implementation of ArrayQ. Why not simply accept an error?
GeneralRe: Critical Section on the Stack? How Can It Provide Protection/Synchronization?
Idaho Edokpayi
16:29 5 May '08  
Looking back, (way back) the ArrayQ wasn't that great an implementation. I noticed the error and decided that the other algorithm was more likely to produce the results I was seeking, so it would have been a waste of my time and the reader's time to fix code that shouldn't have been used at all. I acknowledge that there are weak points still. My intention is to fix them but I don't have the time lately.

Idaho Edokpayi

GeneralPerformance problem...
jahuh
14:28 6 Feb '08  
It works, however performance suffers because of the consistency check.

The problem is that the consistency check fails often under high read/write contention because multiple calls to Interlocks cannot synchronize a single state (e.g., the second Interlock attempting to keep the state consistent through the variable count is causing the problem).

To show the problem, I ran a simple test on a four core machine using two threads to populate the queue with integers (2 x 1M) and two threads to consume from the queue. I created a second TestQueue class backed by a LinkedList using monitor locking.

MSQueue averages 2.8 seconds
TestQueue averages 0.54 seconds

To run MSQueue, define USEMSQUEUE. To run TestQueue, do not define USEMSQUEUE

///////////////////////////////////////////////////////////////////////

#define USEMSQUEUE

using System;
using System.Collections.Generic;
using System.Threading;

namespace lfq
{
public class MSQueue<T>
{
public class node_t
{
public T value;
public pointer_t next;
/// <summary>
/// default constructor
/// </summary>
public node_t() { }

public override
string
ToString()
{
return string.Format("node_t(value={0}):{1}", value.ToString(), next.ToString());
}
}

public struct pointer_t
{
public long count;
public node_t ptr;

/// <summary>
/// copy constructor
/// </summary>
/// <param name="p"></param>
public pointer_t(pointer_t p)
{
ptr = p.ptr;
count = p.count;
}

/// <summary>
/// constructor that allows caller to specify ptr and count
/// </summary>
/// <param name="node"></param>
/// <param name="c"></param>
public pointer_t(node_t node, long c)
{
ptr = node;
count = c;
}

public override
string
ToString()
{
return string.Format("pointer_t(count={0}, ptr={1})", count, ptr == null ? "null" : ptr.ToString());
}
}

public pointer_t Head;
public pointer_t Tail;

public MSQueue()
{
node_t node = new node_t();
Head.ptr = Tail.ptr = node;
}

/// <summary>
/// CAS
/// stands for Compare And Swap
/// Interlocked Compare and Exchange operation
/// </summary>
/// <param name="destination"></param>
/// <param name="compared"></param>
/// <param name="exchange"></param>
/// <returns></returns>
private bool CAS(ref pointer_t destination, pointer_t compared, pointer_t exchange)
{
if (compared.ptr == Interlocked.CompareExchange(ref destination.ptr, exchange.ptr, compared.ptr))
{
Interlocked.Exchange(ref destination.count, exchange.count);
return true;
}

return false;
}

public bool deque(ref T t)
{
pointer_t head;

// Keep trying until deque is done
bool bDequeNotDone = true;
while (bDequeNotDone)
{
// read head
head = Head;

// read tail
pointer_t tail = Tail;

// read next
pointer_t next = head.ptr.next;

// Are head, tail, and next consistent?
if (head.count == Head.count && head.ptr == Head.ptr)
{
// is tail falling behind
if (head.ptr == tail.ptr)
{
// is the queue empty?
if (null == next.ptr)
{
// queue is empty cannnot dequeue
return false;
}

// Tail is falling behind. try to advance it
CAS(ref Tail, tail, new pointer_t(next.ptr, tail.count + 1));

} // endif

else // No need to deal with tail
{
// read value before CAS otherwise another deque might try to free the next node
t = next.ptr.value;

// try to swing the head to the next node
if (CAS(ref Head, head, new pointer_t(next.ptr, head.count + 1)))
{
bDequeNotDone = false;
}
}

} // endif

} // endloop

// dispose of head.ptr
return true;
}

public void enqueue(T t)
{
// Allocate a new node from the free list
node_t node = new node_t();

// copy enqueued value into node
node.value = t;

// keep trying until Enqueue is done
bool bEnqueueNotDone = true;

while (bEnqueueNotDone)
{
// read Tail.ptr and Tail.count together
pointer_t tail = Tail;

// read next ptr and next count together
pointer_t next = tail.ptr.next;

// are tail and next consistent
if (tail.count == Tail.count && tail.ptr == Tail.ptr)
{
// was tail pointing to the last node?
if (null == next.ptr)
{
if (CAS(ref tail.ptr.next, next, new pointer_t(node, next.count + 1)))
{
bEnqueueNotDone = false;
} // endif

} // endif

else // tail was not pointing to last node
{
// try to swing Tail to the next node
CAS(ref Tail, tail, new pointer_t(next.ptr, tail.count + 1));
}

} // endif

} // endloop
}

}

class TestQueue<T>
{
public
void
enqueue(
T item
)
{
lock (_list)
{
_list.AddFirst(item);
}
}

public
bool
deque(
ref T item
)
{
LinkedListNode<T> last = null;

lock (_list)
{
last = _list.Last;

if (last != null)
{
item = last.Value;
_list.Remove(last);
}
}

return last != null;
}

LinkedList<T> _list = new LinkedList<T>();
}

class Program
{
const int CHECKSIZE = 64;
const int CONSUMERS = 2;
const int PRODUCERS = 2;
const int PRODUCESIZE = 1000000;

#if USEMSQUEUE
static MSQueue<int> queue = new MSQueue<int>();
#else
static TestQueue<int> queue = new TestQueue<int>();
#endif

static bool consumersDone = false;

static void Main(string[] args)
{

Thread[] consumers = new Thread[CONSUMERS];
Thread[] producers = new Thread[PRODUCERS];

System.Diagnostics.Stopwatch sw = new System.Diagnostics.Stopwatch();

sw.Start();
Start(producers, producer);
Start(consumers, consumer);
Join(producers);
consumersDone = true;
Join(consumers);
sw.Stop();

Console.WriteLine("{0:0.0000} seconds", sw.Elapsed.TotalSeconds);
}

static void Join(
Thread[] threads
)
{
foreach (Thread t in threads)
{
t.Join();
}
}

static void Start(
Thread[] threads,
ThreadStart method
)
{
for (int i = 0; i < threads.Length; i++)
{
threads[i] = new Thread(method);
threads[i].Start();
}
}

static void producer()
{
for (int i = 0; i < PRODUCESIZE; i++)
{
queue.enqueue(i);
}
}

static void consumer()
{
int n = 0;

for (; ; )
{
if (queue.deque(ref n))
{
}

else if (consumersDone)
{
break;
}
}
}
}
}

GeneralRe: Performance problem...
Idaho Edokpayi
3:39 7 Feb '08  
I'll look into it.

Idaho Edokpayi

GeneralPatents?
The Wizard of Doze
6:58 3 Feb '08  
"Lock free" algorithms are riddled with patents. Everyone considering to use them should be aware of this fact.
GeneralRe: Patents?
Idaho Edokpayi
9:26 3 Feb '08  
Do you know where to find info on such patents?

Idaho Edokpayi

GeneralRe: Patents?
The Wizard of Doze
13:18 5 Feb '08  
Idaho Edokpayi wrote:
Do you know where to find info on such patents?

Not a single source for all of them but try a Google search:

http://www.google.com/search?hl=en&q=%22lock+free%22++patent&btnG=Search[^]
GeneralRe: Patents?
supercat9
8:59 4 Jul '08  
The Wizard of Doze wrote:
"Lock free" algorithms are riddled with patents.

Has someone claimed a patent on the basic pattern:
  1. Latch old pointer to current object
  2. Create new version of object using information at latched pointer
  3. Compare-and-swap the current object pointer to the new one if it was equal to the old one
  4. If the CAS operation failed, repeat
Many lock-free algorithms are formed by taking long-standing existing algorithms and wrapping them in that fashion, so any patent on such algorithm would be void due to obviousness.
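
The four steps of that basic pattern can be sketched as a retry loop. To keep memory reclamation out of the picture, a plain long stands in for the object pointer here, which is my simplification; the function name cas_update and its make_new parameter are hypothetical.

```cpp
#include <atomic>

// Apply `make_new` to the current value and publish the result with CAS,
// retrying until no other thread has changed the value in between.
template <typename F>
long cas_update(std::atomic<long>& current, F make_new) {
    long old_value = current.load();           // 1. latch the old value
    for (;;) {
        long new_value = make_new(old_value);  // 2. build the new version
        // 3. publish only if `current` still equals what was latched
        if (current.compare_exchange_weak(old_value, new_value)) {
            return new_value;
        }
        // 4. the failed CAS reloaded `old_value`; go around again
    }
}
```

compare_exchange_weak is allowed to fail spuriously, which is harmless inside a loop like this one because a failure of any kind just triggers another attempt with the freshly reloaded value.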
GeneralHeap operation is blocking
Ricky Lung
18:34 30 Jan '08  
Just like other lock-free data structures found on the web, the memory allocation/de-allocation in enqueue/dequeue will perform locking during the operation. Therefore the MSQueue is not lock-free unless some lock-free memory allocation scheme is applied. An "almost" (the first few allocations will touch the global heap anyway) lock-free free list memory allocator should not be hard to implement based on the code provided in ArrayQ and MSQueue.

Happy coding.


Last Updated 23 Feb 2008 | Copyright © CodeProject, 1999-2010