An Asynchronous and Synchronous Worker Queue with Thread Pool

20 Jan 2014

Introduction

A worker queue is a data structure that stores events or requests and then passes them to one or more threads (a thread pool) for processing. The events or requests can come from different calling threads, and all of them go into the same queue. The thread pool is allocated once at initialization and reused to handle the continuous event flow[1], which makes for a scalable design. There are two ways to process items in the queue. After a calling thread puts an item into the queue, it can either return immediately and continue with something else, or it can wait until the item has been processed by a worker thread. The first way is asynchronous and the second is synchronous. I have coded a worker queue with a thread pool in C++ that supports both ways. I have seen some articles here that implement the asynchronous way[2][3], but I have never seen one that does both.

Many server applications, such as web servers, database servers, file servers, or mail servers, need to process a large number of events or requests that arrive from remote sources. I come from the financial trading industry, where trading servers not only need to listen to high-throughput price update events, order events, financial news events, trader decision events and so on, but also need to analyze all of that information together and make time-critical trading decisions, so marshalling everything into one worker thread is a good technique. Servers may also want to handle different kinds of events differently. For example, if a trader decides to cancel all trading orders because of an abrupt event like 9/11, this trader decision event needs to be handled synchronously: the caller waits until the item, canceling all orders, has been processed.

Worker Queue Thread Pool Implementation

Let us take a brief look at the data members of the worker queue thread pool class; the comments describe the purpose of each one. All data members are initialized before first use, including the allocation of all worker thread resources.

template <class _TyItem, class _TyStatus = WorkerQueueStatus< bool > >
class WorkerQueue : public SpinLock
{
    typedef typename std::list< EnqueuedData > QUEUE;
    bool m_bInTerm;              // flag: whether the worker queue is terminating
    bool m_bInitialized;         // flag: whether the worker queue is initialized
    QUEUE m_queue;               // the STL list that stores all items in the worker queue
    HANDLE m_hHasItem;           // auto-reset event handle, signaled when a new item is added
    volatile long m_lSetEvent;   // tracks whether dequeuing has started;
                                 // if the value is 0, enqueuing a new item signals the event object
    void *m_pUserData;           // lets the user of the worker queue pass in their own data
    unsigned int m_uiTimeout;    // maximum wait time before timing out
    THREADVEC m_threadVec;       // vector holding the handle and thread id of every worker thread
    HANDLE m_hShutdown;          // manual-reset event handle used for shutdown
    unsigned int m_uiNumThreads; // the number of worker threads
    // the callback function invoked when an item is processed
    typename _TyStatus::status_type ( __stdcall *m_pProcessItemCallback )( void *, _TyItem &item );
};

The key data member is m_queue. It is an STL list, so you can enqueue a data item at either the front (head) or the back. The front item is dequeued first, so a user of this class can put more important items at the front. Besides the actual data, each entry has a WorkerQueueStatus associated with it. The status is used for synchronous enqueue. Let us take a look at the data members of the WorkerQueueStatus class.

template < class _TyStatus >
class WorkerQueueStatus
{
    _TyStatus status;      // the status returned by the callback that processes a data item, usually a boolean
    HANDLE hEvent;         // Handle for a manually reset event object
    bool bManuallyRemoved; // flag to tell whether the data item is removed or not
    bool bStatusSet;       // flag to tell whether the event object has been reset to unsignalled or not
}; 
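To make the role of the status object concrete, here is a minimal portable sketch of the same idea. It is not the article's class: `ItemStatus`, `Set`, and `WaitFor` are hypothetical names, and a `std::condition_variable` plus a flag stands in for the manual-reset Win32 event, on the assumption that the event is signaled once when the item has been processed.

```cpp
#include <chrono>
#include <condition_variable>
#include <mutex>

// Hypothetical stand-in for a per-item status object (not the article's API).
class ItemStatus {
public:
    // Called by the worker thread after the processing callback returns.
    void Set(bool result) {
        {
            std::lock_guard<std::mutex> lock(m_mutex);
            m_status = result;
            m_set = true;
        }
        // Like a manual-reset event, the flag stays set and every waiter wakes.
        m_cv.notify_all();
    }

    // Called by the enqueuing thread. Returns false if the timeout expires
    // before the worker has set the status; otherwise stores the callback
    // result in *pResult and returns true.
    bool WaitFor(std::chrono::milliseconds timeout, bool *pResult) {
        std::unique_lock<std::mutex> lock(m_mutex);
        if (!m_cv.wait_for(lock, timeout, [this] { return m_set; }))
            return false;
        *pResult = m_status;
        return true;
    }

private:
    std::mutex m_mutex;
    std::condition_variable m_cv;
    bool m_status = false;  // result of the processing callback
    bool m_set = false;     // whether the worker has reported a result yet
};
```

A caller that times out can then decide whether to leave the item in the queue or remove it, which is presumably what the bManuallyRemoved flag records in the original class.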

As mentioned earlier, I coded two ways to enqueue a data item, asynchronous and synchronous, with two different functions: EnqueueItem and EnqueueItemAndWait.

inline wq_status EnqueueItem( const _TyItem &item, wq_push pushType = WQ_PUSH_BACK );

wq_status EnqueueItemAndWait( const _TyItem &item, STATUSTYPE *pStatus,
                              wq_push pushType = WQ_PUSH_BACK,
                              unsigned int uiMaxWaitTime = INFINITE,
                              bool bRemoveItemOnTimeout = false );

The asynchronous function EnqueueItem is relatively simple: it just does a push_back or push_front of the data item into the list. The status pointer is not needed and is set to NULL. After inserting the data item into the queue, if the flag m_lSetEvent has not been incremented to one, we need to set the event object to signaled for the dequeue (m_hHasItem is the handle of this auto-reset event object). One trick in the code is that it is better to set the event outside of the lock. Setting an event can cause an instant transition to the worker thread; if the lock were still held, the worker would block on it, so a context switch back to the original thread would be needed to unlock, followed by another context switch to the worker thread to process the item.
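The "signal outside the lock" trick can be sketched in portable C++ as follows. This is an illustration under assumptions, not the article's code: `AsyncQueue`, `WaitAndTakeAll`, and `m_signalPending` are hypothetical names, the item type is fixed to `int`, and a `std::condition_variable` with a pending flag replaces the auto-reset event and the m_lSetEvent counter.

```cpp
#include <condition_variable>
#include <list>
#include <mutex>

// Hypothetical sketch of an asynchronous enqueue (not the article's API).
class AsyncQueue {
public:
    // Adds an item at the front or back and wakes a worker if needed.
    void EnqueueItem(int item, bool pushFront = false) {
        bool needSignal = false;
        {
            std::lock_guard<std::mutex> lock(m_mutex);
            if (pushFront)
                m_items.push_front(item);  // urgent item is dequeued first
            else
                m_items.push_back(item);   // normal FIFO order
            // Signal only if no wake-up is already pending.
            if (!m_signalPending) {
                m_signalPending = true;
                needSignal = true;
            }
        }  // the lock is released here, before signaling
        if (needSignal)
            m_hasItem.notify_one();  // the woken worker can take the lock at once
    }

    // Worker side: blocks until a wake-up is pending, then takes every queued item.
    std::list<int> WaitAndTakeAll() {
        std::unique_lock<std::mutex> lock(m_mutex);
        m_hasItem.wait(lock, [this] { return m_signalPending; });
        m_signalPending = false;
        std::list<int> batch;
        batch.swap(m_items);
        return batch;
    }

private:
    std::mutex m_mutex;
    std::condition_variable m_hasItem;
    std::list<int> m_items;
    bool m_signalPending = false;
};
```

Because the flag is checked and set under the lock, several enqueues in a row produce a single wake-up, and the worker then drains all of them in one batch.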

The synchronous function EnqueueItemAndWait is more complicated, since after pushing the data item it needs to wait for the inserted item to be processed. This function can be called from inside a worker thread or from any other thread. Inside a worker thread we cannot wait, because that would result in a deadlock. The only thing we can do is pump out and process all data items in the queue, or stop when the timeout occurs. The actual code is in the function WaitInSideWorkerThread. If we are not inside a worker thread, we can wait for the item to be processed by a worker thread. The event object in the WorkerQueueStatus associated with this data item is the one used to synchronize with the dequeue action in the worker thread. The actual code is in the function WaitOutSideWorkerThread.
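The two waiting strategies can be sketched in one small helper. This is a hedged illustration, not the article's WaitInSide/WaitOutSide code: `WaitForItem`, `WaitResult`, and `pumpOne` are hypothetical, a `std::future<bool>` stands in for the per-item event, and the worker is identified by comparing `std::thread::id` values.

```cpp
#include <chrono>
#include <functional>
#include <future>
#include <thread>

enum class WaitResult { Done, TimedOut };

// Hypothetical helper (not the article's API).
//   done     - becomes ready when the enqueued item has been processed
//   workerId - id of the worker thread that drains the queue
//   pumpOne  - processes one queued item; returns false if the queue is empty
//   maxWait  - maximum time to wait for the item
WaitResult WaitForItem(std::future<bool> &done,
                       std::thread::id workerId,
                       const std::function<bool()> &pumpOne,
                       std::chrono::milliseconds maxWait)
{
    const auto deadline = std::chrono::steady_clock::now() + maxWait;
    const auto ready = [&done] {
        return done.wait_for(std::chrono::seconds(0)) == std::future_status::ready;
    };

    if (std::this_thread::get_id() == workerId) {
        // Inside the worker thread: blocking here would deadlock, because this
        // is the thread that must process the item. Pump the queue instead.
        while (!ready() && std::chrono::steady_clock::now() < deadline && pumpOne()) {
        }
    }

    // Outside the worker thread (or after pumping): wait for completion,
    // but never past the deadline.
    return done.wait_until(deadline) == std::future_status::ready
               ? WaitResult::Done
               : WaitResult::TimedOut;
}
```

With a pool of several workers, the queue can be empty while another worker is still processing the item, which is why the sketch falls through to a bounded wait after pumping.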

The dequeue happens in the worker thread (thread pool) function. Whenever the event is signaled, one thread in the pool is woken up and starts processing all items in the queue. If an item was enqueued asynchronously, its WorkerQueueStatus is NULL and nothing more needs to be done. If an item was enqueued synchronously, we need to set the event in its WorkerQueueStatus object, so that the synchronous enqueue call knows the item has already been processed.
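The dequeue step described above might look like the following sketch. The names `Entry` and `ProcessBatch` are hypothetical, the item type is fixed to `int`, and a `std::promise<bool>` plays the role of the status object with its event; a null pointer marks an asynchronously enqueued item.

```cpp
#include <cstddef>
#include <functional>
#include <future>
#include <list>
#include <memory>
#include <utility>

// Hypothetical queue entry: an item plus an optional completion slot.
struct Entry {
    int item;
    std::shared_ptr<std::promise<bool>> status;  // null for asynchronous items
};

// Processes every entry in order and returns how many were processed.
std::size_t ProcessBatch(std::list<Entry> &batch,
                         const std::function<bool(int &)> &callback)
{
    std::size_t processed = 0;
    while (!batch.empty()) {
        Entry entry = std::move(batch.front());
        batch.pop_front();
        const bool result = callback(entry.item);
        // Only synchronous items carry a status; report the result to the waiter.
        if (entry.status)
            entry.status->set_value(result);
        ++processed;
    }
    return processed;
}
```

The waiting side holds the matching `std::future<bool>` and reads the callback result from it once the worker has called `set_value`.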

Usage Example and Testing

You can use this worker queue in any server or application where you need to handle multiple events or requests coming from different threads, using the thread pool inside the worker queue. One usage is a single worker queue shared by multiple consumers, each holding a pointer to it. In a previous article[4], I created a publisher and consumers running on multiple threads; please read that article for details. Here I just extend it by adding a pointer to a shared worker queue to every consumer object. Let us take a look at the data members of the Consumer class.

class Consumer : public DataEvent
{
    workerqueue * m_pQueueData; // the pointer to the worker queue
    HANDLE m_hThread;           // handle of the separate thread in which this consumer subscribes to the publisher
    Publisher * m_pPublisher;   // the pointer to the publisher to which this consumer object is subscribing
};  

In the previous article, one publisher publishes event data from multiple threads. The user can adjust the number of publishing threads, and I tested up to 1500 threads in that article. Any number of consumer objects can listen to the continuous event updates from all publishing threads. Now that each consumer object is linked to one common worker queue, all the event updates from the different publishing threads go into the worker queue and are then processed by threads in the thread pool. Let us take a look at the callback in the consumer that runs whenever new event data is received from the publisher, and the callback function in the worker queue that runs when the thread pool processes a new event data item.

void Consumer::OnNewData(EventData * pEd )
{
    std::cout << "C, Type:" << pEd->EventType << " Data:" << pEd->Data << std::endl;
    // Copy the incoming event data, then enqueue the copy.
    EventData ed;
    ed.Data = pEd->Data;
    ed.EventType = pEd->EventType;
    // Enqueue the data asynchronously into the worker queue whenever new event data comes in.
    this->GetQueueData()->EnqueueItem( ed );
}

static bool __stdcall ProcessQueueItemCallback( void *pData, EventData &dta )
{
    char * pUserData = static_cast<char*> ( pData );
    DWORD ThreadId = GetCurrentThreadId();

    // By adjusting sleep time, you can adjust processing item rate in worker queue.    
    Sleep(100);
    std::cout << "In marshalled thread id "<< ThreadId << " " << dta.EventType << ":" << dta.Data << std::endl;    
    return true;
} 

Here is the code for testing the worker queue. I tested many cases: where the publishing rate is higher than the processing rate, so that the queue length is greater than zero, and where the number of publisher threads is equal to, less than, or greater than the number of worker threads.

const int NUM_PUBLISHER_THREADS = 2;
const int NUM_CONSUMERS = 2;
const int NUM_WORKER_THREADS = 2;
workerqueue myWorkerQueue;

int _tmain(int argc, _TCHAR* argv[])
{
    EventData myEventData[NUM_PUBLISHER_THREADS];
    std::vector< EventData *> pEd;
    pEd.resize(NUM_PUBLISHER_THREADS);

    for ( int i = 0; i < NUM_PUBLISHER_THREADS; i++ )
    {
        myEventData[i].EventType = i + 1;
        pEd[i] = &myEventData[i];
    }

    Publisher myPublisher(NUM_PUBLISHER_THREADS);
    myPublisher.SetEventData( pEd );
    workerqueue::WorkerQueueInit queueinit;
    queueinit.pUserData = "User Data";
    queueinit.pProcessItemCallback = &ProcessQueueItemCallback;
    queueinit.pszThreadName = "Worker Queue";
    queueinit.uiNumThreads = NUM_WORKER_THREADS;

    if ( WQ_STATUS_SUCCESS != myWorkerQueue.Init( queueinit ))
    {
        std::cout << "Unable to start worker queue" << std::endl;
        return 0;
    }

    Consumer myConsumers[NUM_CONSUMERS];
    for ( int i = 0; i < NUM_CONSUMERS; i++ )
    {
        myConsumers[i].SetAPublisher( &myPublisher );    
        myConsumers[i].SubScribeToPublisher();
        myConsumers[i].SetQueue( &myWorkerQueue );
    }

    myPublisher.initialize();
    Sleep(10000 );
    myConsumers[1].UnSubScribe();
    Sleep(1000);
    EventData ed;
    ed.Data = -1;
    ed.EventType = -1;
    workerqueue::STATUSTYPE status;

    // Do a synchronous enqueue
    myWorkerQueue.EnqueueItemAndWait( ed, &status );
    for ( int i = 0; i < NUM_PUBLISHER_THREADS; i++ )
        joinThread( myPublisher.GetThreadHandle( i ) );

    return 0;
}
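
The test program above depends on Win32 and on classes from the article's source download. For readers who want to experiment with the same asynchronous and synchronous pattern elsewhere, here is a minimal portable sketch. It is not the WorkerQueue class: `MiniWorkerQueue` and its members are hypothetical, the item type is fixed to `int`, C++11 `std::thread`, `std::condition_variable`, and `std::promise` replace the Win32 primitives, and there is no timeout or front insertion.

```cpp
#include <condition_variable>
#include <cstddef>
#include <functional>
#include <future>
#include <list>
#include <memory>
#include <mutex>
#include <thread>
#include <utility>
#include <vector>

// Hypothetical portable mini worker queue (not the article's WorkerQueue).
class MiniWorkerQueue {
public:
    using Callback = std::function<bool(int)>;

    MiniWorkerQueue(std::size_t numThreads, Callback cb) : m_callback(std::move(cb)) {
        for (std::size_t i = 0; i < numThreads; ++i)
            m_threads.emplace_back([this] { Run(); });
    }

    // Requests shutdown; workers drain the remaining items and are joined.
    ~MiniWorkerQueue() {
        {
            std::lock_guard<std::mutex> lock(m_mutex);
            m_shutdown = true;
        }
        m_hasItem.notify_all();
        for (std::thread &t : m_threads) t.join();
    }

    // Asynchronous: returns as soon as the item is queued.
    void Enqueue(int item) { Push(item, nullptr); }

    // Synchronous: blocks until a worker has processed the item and returns
    // the callback result. In this sketch it must not be called from a worker.
    bool EnqueueAndWait(int item) {
        auto status = std::make_shared<std::promise<bool>>();
        std::future<bool> done = status->get_future();
        Push(item, status);
        return done.get();
    }

private:
    struct Entry {
        int item;
        std::shared_ptr<std::promise<bool>> status;  // null for asynchronous items
    };

    void Push(int item, std::shared_ptr<std::promise<bool>> status) {
        {
            std::lock_guard<std::mutex> lock(m_mutex);
            m_queue.push_back(Entry{item, std::move(status)});
        }
        m_hasItem.notify_one();  // signal outside the lock
    }

    void Run() {
        for (;;) {
            Entry entry{};
            {
                std::unique_lock<std::mutex> lock(m_mutex);
                m_hasItem.wait(lock, [this] { return m_shutdown || !m_queue.empty(); });
                if (m_queue.empty()) return;  // shutdown requested and queue drained
                entry = std::move(m_queue.front());
                m_queue.pop_front();
            }
            const bool result = m_callback(entry.item);  // run outside the lock
            if (entry.status) entry.status->set_value(result);
        }
    }

    Callback m_callback;
    std::mutex m_mutex;
    std::condition_variable m_hasItem;
    std::list<Entry> m_queue;
    bool m_shutdown = false;
    std::vector<std::thread> m_threads;  // declared last so other members exist first
};
```

A typical use would construct the queue with two workers and a callback, call `Enqueue` from the publishing threads, and call `EnqueueAndWait` for an item such as "cancel all orders" that must be finished before the caller continues.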

Literature

  1. http://en.wikipedia.org/wiki/Thread_pool_pattern
  2. http://www.codeproject.com/Articles/283034/Work-queue-with-Boost-Threads
  3. http://www.codeproject.com/Articles/27703/Producer-Consumer-Using-Double-Queues
  4. http://www.codeproject.com/Articles/704717/Concurrent-Event-Sink-Template-Under-MultiThreadin

License

This article, along with any associated source code and files, is licensed under The Code Project Open License (CPOL)

About the Author

PengHeProfessor
Software Developer (Senior)
United States
I have been developing low-latency, high-throughput applications, services and platforms in the financial trading industry since 2004, mostly in C++, with some in C#/.NET and Java.

Article Copyright 2014 by PengHeProfessor