
Thread Synchronization Queue with Boost

22 Aug 2012, CPOL
Implement a thread synchronization queue with STL and boost.

Introduction

Currently I am working on a networking communication project that I am developing with C++, the STL, and Boost. When developing a multi-threaded program, synchronization is an important issue. If your program needs to process streaming packets, maintaining a queue is a good idea.

Background

This is my first time using Boost, and it is not easy to pick up because good examples are scarce. You can find the Boost library and documentation at http://www.boost.org/. Here is the advantage of using Boost, taken from its website:

In a word, Productivity. Use of high-quality libraries like Boost speeds initial development, results in fewer bugs, reduces reinvention-of-the-wheel, and cuts long-term maintenance costs. And since Boost libraries tend to become de facto or de jure standards, many programmers are already familiar with them. 

I am only using the Boost synchronization classes in this example, but the Win32 thread functions could also be rewritten with Boost threads for cross-platform development. The Boost synchronization classes look straightforward, but I still made some mistakes as a beginner, so I developed a test project to verify their behavior. Once you understand how to use them, they help simplify the code and reduce bugs.

Using the Code 

In this example, I implemented the thread synchronization model as a producer-consumer pair. The producer thread creates data and inserts it into the queue, and the consumer thread uses the data and removes it from the queue. I use a mutex object to keep the two threads synchronized.

I try several approaches to the same problem and then compare their advantages and disadvantages.

First I designed an interface to abstract the sync queue model. The ISynchronizedQueue abstract class has only two methods: add() and get(). add() is used in the producer thread to insert data into the queue, and get() is used in the consumer thread to acquire and remove data from the queue. There are three different implementations of this interface:

  1. SynchronizedDequeue: a double-ended queue, implemented with STL deque.
  2. SynchronizedVector: a ring (circular) queue, implemented with STL vector.
  3. SynchronizedVectorNB: the non-blocking version of SynchronizedVector.

Here is the header and interface definition: 

#include <iostream>
#include <cstring>   // std::memset, std::memcpy
#include <deque>
#include <vector>
#include <boost/thread.hpp>
#include <boost/thread/mutex.hpp>
#include <boost/thread/condition.hpp>

using namespace std;
 
#define default_packet_size 1280 
 
class TPacket
{
 int my_size;
 unsigned char my_databuf[default_packet_size];
 unsigned int ID;
public:
 TPacket() : my_size(0), ID(0) {std::memset(my_databuf,0,sizeof(my_databuf));}
 ~TPacket() {}
 int GetSize() {return my_size;}
 void SetSize(int size) {my_size = size;}
 unsigned int GetID() {return ID;}
 void SetID(int id) {ID = id;}
 void GetData(char* pbuf) {memcpy(pbuf,my_databuf,my_size);}
 void SetData(char* pbuf,int size) {memcpy(my_databuf,pbuf,size); my_size = size;}
public: 
 virtual bool IsValid() {return false;}
 virtual bool Encode() {return false;}
 virtual bool Decode() {return false;}
};
 

//queue interface
class ISynchronizedQueue
{
public:
 virtual ~ISynchronizedQueue() {}
 virtual bool add(TPacket pkt) = 0;
 virtual bool get(TPacket& pkt) = 0;
};

Let's see the implementations:

class SynchronizedDequeue: public ISynchronizedQueue
{
 boost::mutex m_mutex;
 deque<TPacket> m_queue;
 boost::condition_variable m_cond; // not used in this polling version
 
public:
 bool add(TPacket pkt)
 {
  boost::lock_guard<boost::mutex> lock(m_mutex);
 
  m_queue.push_back(pkt);
  return true;
 }
 bool get(TPacket& pkt)
 {
  boost::lock_guard<boost::mutex> lock(m_mutex);
  if (m_queue.empty())
  {
   return false;
  }
  pkt = m_queue.front();
  m_queue.pop_front();
  return true;
 }
};

SynchronizedDequeue has a dynamic queue size. The advantage is that if the producer is faster than the consumer, no data is lost: everything produced will eventually be processed by the consumer. The disadvantage is the memory management overhead. Memory is allocated every time a packet is inserted into the queue and released when the data is handed back to the consumer thread. This constant allocation and deallocation can fragment the heap and slow down memory reclamation for larger objects in the same process.
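As an aside, SynchronizedDequeue declares a boost::condition_variable but never waits on it; a blocking variant could use it so the consumer sleeps until data arrives instead of polling and returning false. A minimal sketch, written here with the equivalent C++11 standard-library primitives so it stands alone (the class name BlockingQueue is made up for illustration):

```cpp
#include <condition_variable>
#include <deque>
#include <mutex>

// Hypothetical blocking variant: get() waits until data is available
// instead of returning false on an empty queue.
template <typename T>
class BlockingQueue {
    std::mutex m_mutex;
    std::condition_variable m_cond;
    std::deque<T> m_queue;
public:
    void add(T item) {
        {
            std::lock_guard<std::mutex> lock(m_mutex);
            m_queue.push_back(item);
        }                        // release the lock before waking the consumer
        m_cond.notify_one();
    }
    T get() {
        std::unique_lock<std::mutex> lock(m_mutex);
        // wait() releases the lock while sleeping; the predicate guards
        // against spurious wakeups
        m_cond.wait(lock, [this] { return !m_queue.empty(); });
        T item = m_queue.front();
        m_queue.pop_front();
        return item;
    }
};
```

Boost offers the same calls (boost::condition_variable::wait with a boost::unique_lock), so the sketch translates directly.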

class SynchronizedVector :public ISynchronizedQueue
{
 int queue_size;
 boost::mutex m_mutex;
 std::vector<TPacket> my_vector;
 int start,end;
 
public:
 SynchronizedVector(int q_size=100) 
       {queue_size = q_size; start=end=0; my_vector.assign(queue_size,TPacket());}
 bool add(TPacket pkt)
 {
  boost::lock_guard<boost::mutex> lock(m_mutex);
  my_vector[end++] = pkt;
  if(end>=queue_size)
   end = 0;
  if(end == start)
   start = end+1;
  if(start>=queue_size)
   start = 0;
  return true;
 }
 bool get(TPacket& pkt)
 {
  boost::lock_guard<boost::mutex> lock(m_mutex);
  if(start==end)
   return false;
  pkt = my_vector[start++];
  if(start>=queue_size)
   start = 0;
  return true;
 }
};

SynchronizedVector uses a fixed-size queue to avoid memory management overhead, but when the consumer falls behind, newly arriving data overwrites the oldest entries that have not yet been processed.
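The overwrite behavior is easy to demonstrate single-threaded. Below is a sketch of the same ring-index logic with plain ints and the locking stripped out (the class name IntRing is made up for illustration):

```cpp
#include <vector>

// Single-threaded sketch of the same ring-index logic (locking omitted)
// to show which values survive when the producer outruns the consumer.
class IntRing {
    int queue_size;
    std::vector<int> buf;
    int start, end;
public:
    explicit IntRing(int q_size) : queue_size(q_size), buf(q_size, 0), start(0), end(0) {}
    void add(int v) {
        buf[end++] = v;
        if (end >= queue_size) end = 0;
        if (end == start) start = end + 1;   // full: drop the oldest slot
        if (start >= queue_size) start = 0;
    }
    bool get(int& v) {
        if (start == end) return false;      // start == end means empty
        v = buf[start++];
        if (start >= queue_size) start = 0;
        return true;
    }
};
```

With 4 slots and five adds (1 through 5), only 3, 4, and 5 can be read back: since start == end denotes "empty", a ring with queue_size slots holds at most queue_size - 1 items, and the oldest entries are overwritten.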

class SynchronizedVectorNB :public ISynchronizedQueue
{
 int queue_size;
 boost::mutex m_mutex;
 std::vector<TPacket> my_vector;
 int start,end;
 
public:
 SynchronizedVectorNB(int q_size=100) {queue_size = q_size; start=end=0; my_vector.assign(queue_size,TPacket());}
 bool add(TPacket pkt)
 {
  boost::unique_lock<boost::mutex> lock(m_mutex,boost::try_to_lock);
  if(!lock.owns_lock())
   return false;
  my_vector[end++] = pkt;
  if(end>=queue_size)
   end = 0;
  if(end == start)
   start = end+1;
  if(start>=queue_size)
   start = 0;
  return true;
 }
 bool get(TPacket& pkt)
 {
  boost::unique_lock<boost::mutex> lock(m_mutex,boost::try_to_lock);
  if(!lock.owns_lock())
   return false;
 
  if(start==end)
   return false;
  pkt = my_vector[start++];
  if(start>=queue_size)
   start = 0;
  return true;
 }
};

SynchronizedVectorNB never blocks either the producer or the consumer thread. The advantage is that if other work must be done in the same loop as the queue access, the non-blocking version keeps the response time bounded.

The two queues above may block a thread while it waits to acquire the mutex; if the thread holding the mutex hangs or dies in an exception, the other thread is blocked as well. The disadvantage of the non-blocking version is that add() can fail when the lock is unavailable, so the caller must retry with the same data.
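One common way to handle a failed add is a bounded retry with a short back-off. The helper below is a sketch (the name add_with_retry, the retry count, and the sleep duration are all arbitrary choices, not part of the article's code); it works with any queue exposing a fallible bool add(T):

```cpp
#include <chrono>
#include <thread>

// Hypothetical helper: retry a try-lock-style add() a bounded number of
// times before giving up, so a transiently contended lock does not
// silently drop the packet.
template <typename Queue, typename T>
bool add_with_retry(Queue& q, const T& item, int max_tries = 3) {
    for (int i = 0; i < max_tries; ++i) {
        if (q.add(item))
            return true;
        // brief back-off before retrying (duration is an arbitrary choice)
        std::this_thread::sleep_for(std::chrono::milliseconds(1));
    }
    return false;   // caller still owns the packet and may queue it elsewhere
}
```

If add_with_retry still returns false, the caller can drop the packet, log it, or hand it to a fallback queue, depending on the application.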

Here is the sample code for the producer thread:

DWORD WINAPI ProducerServerThread(LPVOID lpParam)
{
 int count=0;
 
 ISynchronizedQueue* pQ = (ISynchronizedQueue*)lpParam;
 TPacket pkt; 
 LOG("\n-------------------------Producer thread begin-----------------------");
 while(1)
 {
  DWORD t1 = GetTickCount();
  Sleep(50);
 
  if(count++>=1000)
   break;
 
  pkt = TPacket(); // reset the packet; memset on a class with virtual functions would wipe the vtable pointer
  pkt.SetID(count);
  if(pQ->add(pkt))
   LOG("Add PACKET ID = %d ",pkt.GetID());
  else
   LOG("Add Packet Failed");
  DWORD t2 = GetTickCount();
 
  LOG("ONE-LOOP DURATION = %d",t2-t1);
 }
 LOG("\n-------------------------Producer thread end-----------------------");
 return 0;
}

Here is the sample code for the consumer thread:

DWORD WINAPI ConsumerServerThread(LPVOID lpParam)
{
 int count=0;
 ISynchronizedQueue* pQ = (ISynchronizedQueue*)lpParam;
 TPacket pkt;
 LOG("\n-------------------------Consumer thread begin-----------------------");
 while(1)
 {
  Sleep(10);
 
  if(count++>=1200)
   break;
 
  if(pQ->get(pkt))
   LOG("Get Packet ID = %d",pkt.GetID());
  else
   LOG("Get Packet Failed");
 }
 LOG("\n-------------------------Consumer thread end-----------------------");
 return 0;
}

Here is the sample code for the main thread:

SynchronizedDequeue m_q[5];
//SynchronizedVector m_q[5];
//SynchronizedVectorNB m_q[5];

int _tmain(int argc, _TCHAR* argv[])
{
   int thread_count =5;
   HANDLE server_threads[10]; 
 
   for (int i=0; i < thread_count ;i++)
    {
        server_threads[i] = CreateThread(
                                        NULL,
                                        0,
                                        ProducerServerThread,
                                        &m_q[i],
                                        0,
                                        NULL
                                        );
        if (server_threads[i] == NULL)
        {
            LOG( "Create Thread failed: %d\n", GetLastError());
            return 0;
        }
    }
 
    for (int i= 0; i < thread_count ;i++)
    {
        server_threads[i+thread_count] = CreateThread(
                                        NULL,
                                        0,
                                        ConsumerServerThread,
                                        &m_q[i],
                                        0,
                                        NULL
                                        );
        if (server_threads[i+thread_count] == NULL)
        {
            LOG( "Create Thread failed: %d\n", GetLastError());
            return 0;
        }
    }
 
  // Wait until the threads exit, then cleanup
    DWORD retval = WaitForMultipleObjects(
                                   2*thread_count,
                                   server_threads,
                                   TRUE,
                                   INFINITE
                                   );
    if ((retval == WAIT_FAILED) || (retval == WAIT_TIMEOUT))
    {
        LOG( "WaitForMultipleObjects failed: %d\n", GetLastError());
        return 0;
    }
    return 0;
}

In the test code, I create five producers, five consumers, and five queues. Each producer is paired with a consumer through a shared queue. Through the packet IDs, you can verify that each packet produced is processed in order by the consumer thread. You can define the LOG macro yourself; I use a thread-safe LOG macro that prints a timestamp. With the timestamps, you can see the thread performance more clearly.

19:33:50:106  5972 info: ConsumerServerThread: Get Packet Failed
19:33:50:106  4244 info: ConsumerServerThread: Get Packet Failed
19:33:50:122  5808 info: ConsumerServerThread: Get Packet Failed
19:33:50:122  8464 info: ConsumerServerThread: Get Packet Failed
19:33:50:122  7760 info: ProducerServerThread: Add PACKET ID = 1 
19:33:50:122  7416 info: ConsumerServerThread: Get Packet ID = 1
19:33:50:122  7760 info: ProducerServerThread: ONE-LOOP DURATION = 63
19:33:50:138  5808 info: ConsumerServerThread: Get Packet Failed
19:33:50:138  5972 info: ConsumerServerThread: Get Packet Failed
19:33:50:138  8464 info: ConsumerServerThread: Get Packet Failed
19:33:50:138  4244 info: ConsumerServerThread: Get Packet Failed
19:33:50:138  8268 info: ProducerServerThread: Add PACKET ID = 1 
19:33:50:138  7416 info: ConsumerServerThread: Get Packet Failed
19:33:50:138  8268 info: ProducerServerThread: ONE-LOOP DURATION = 62
19:33:50:153  4244 info: ConsumerServerThread: Get Packet ID = 1
19:33:50:153  5808 info: ConsumerServerThread: Get Packet Failed
19:33:50:153  8836 info: ProducerServerThread: Add PACKET ID = 1 
19:33:50:153  7352 info: ProducerServerThread: Add PACKET ID = 1 
19:33:50:153  8464 info: ConsumerServerThread: Get Packet Failed
19:33:50:153  5972 info: ConsumerServerThread: Get Packet ID = 1
19:33:50:153  8836 info: ProducerServerThread: ONE-LOOP DURATION = 63
19:33:50:153  7352 info: ProducerServerThread: ONE-LOOP DURATION = 63

After testing the three sync queues with 5 producer-consumer thread pairs, each adding and getting 1000 packets, their performance is basically the same. The logging itself costs around 10 ms. You can modify the test to see how the three queue types perform with a larger data set, a longer running time, or with big-object memory allocation.

Points of Interest

It was kind of fun to use a little bit of the C++ boost library for the first time.

History

  • First version, 08/17/2012.
  • Second version, 08/19/2012: Update the article with more explanation.

License

This article, along with any associated source code and files, is licensed under The Code Project Open License (CPOL)

About the Author

yux_315
Software Developer
China China
Has been working on OCR and handwriting recognition applications; currently working on 3D measuring and reconstruction software and devices.
 
Rich experience in developing with different languages: C/C++, C#, SQL, Java, Python, MATLAB.
 
Personal interests include C++, pattern recognition, image processing, computer vision, point cloud processing, and 3D reconstruction.
