
Thread Synchronization Queue with Boost

28 Oct 2014 · CPOL · 4 min read
Implement a thread synchronization queue with the STL and Boost.

Introduction

Currently I am working on a networking communication project, which I am developing with the C++ STL and Boost. When developing a multithreaded program, synchronization is an important issue. If your program needs to process streaming packets, maintaining a queue is a good idea.

Background

This is my first time using Boost, and it is not entirely easy to pick up because good examples are scarce. You can find the Boost libraries and documentation at http://www.boost.org/. Here is the advantage of using Boost, taken from its website:

In a word, Productivity. Use of high-quality libraries like Boost speeds initial development, results in fewer bugs, reduces reinvention-of-the-wheel, and cuts long-term maintenance costs. And since Boost libraries tend to become de facto or de jure standards, many programmers are already familiar with them. 

I am only using the Boost synchronization classes in this example, but the remaining platform-specific functions (threads, timing) could also be rewritten with Boost for cross-platform development. The Boost synchronization classes look straightforward, but I still made some mistakes as a beginner, so I developed a test project to verify their behavior. Once you understand how to use them, they will help you simplify your code and reduce bugs.

Using the Code 

In this example, I implemented the thread synchronization model as producer-consumer: the producer thread creates data and inserts it into the queue, and the consumer thread takes the data out of the queue and uses it. I use a mutex object to keep the two threads synchronized.

I try several different approaches to the same problem and then compare their advantages and disadvantages.

First I designed an interface to abstract the sync-queue model. The ISynchronizedQueue abstract class centers on two methods: add() and get(). add() is used by the producer thread to insert data into the queue, and get() is used by the consumer thread to acquire and remove data from the queue (read(), del(), and clear() round out the interface). There are three different implementations of this interface:

  1. SynchronizedDequeue: a double-ended queue, implemented with the STL deque.
  2. SynchronizedVector: a ring (circular) queue, implemented with the STL vector.
  3. SynchronizedVectorNB: the non-blocking version of SynchronizedVector.

Here is the header and interface definition: 

C++
#include <iostream>
#include <cstring>   // memset, memcpy
#include <deque>
#include <vector>    // used by the ring-buffer implementations below
#include <boost/thread.hpp>
#include <boost/thread/mutex.hpp>
#include <boost/thread/condition.hpp>

using namespace std;
 
#define default_packet_size 1280

class TPacket
{
    int my_size;
    unsigned char my_databuf[default_packet_size];
    unsigned int ID;
public:
    TPacket() { std::memset(my_databuf, 0, sizeof(my_databuf)); my_size = 0; ID = 0; }
    virtual ~TPacket() {}   // virtual: the class is meant to be derived from
    int GetSize() { return my_size; }
    void SetSize(int size) { my_size = size; }
    unsigned int GetID() { return ID; }
    void SetID(int id) { ID = id; }
    bool GetData(char* pbuf, int& size)
    {
        if (my_size > size)   // caller's buffer is too small
            return false;
        size = my_size;
        std::memcpy(pbuf, my_databuf, my_size);
        return true;
    }
    bool SetData(const char* pbuf, int size)
    {
        if (size > default_packet_size)
            return false;
        std::memcpy(my_databuf, pbuf, size);
        my_size = size;
        return true;
    }
public:
    virtual bool IsValid() { return false; }
    virtual bool Encode() { return false; }
    virtual bool Decode() { return false; }
};

//queue interface
template <class T>
class ISynchronizedQueue
{
public:
    virtual ~ISynchronizedQueue() {}   // virtual dtor: deleted through base pointers
    virtual bool add(T pkt) = 0;
    virtual bool get(T& pkt) = 0;
    virtual bool read(T& pkt) = 0;
    virtual bool del(T& pkt) = 0;
    virtual bool clear() = 0;
};

Let's see the implementations:

C++
class SynchronizedDequeue: public ISynchronizedQueue<TPacket>
{
    boost::mutex m_mutex;
    deque<TPacket> m_queue;
    boost::condition_variable m_cond;  // reserved for a blocking variant; unused here

public:
    bool add(TPacket pkt)
    {
        boost::lock_guard<boost::mutex> lock(m_mutex);
        if(m_queue.size() > 100)   // cap the backlog: drop everything if the consumer stalls
            m_queue.clear();
        m_queue.push_back(pkt);
        return true;
    }
    bool get(TPacket& pkt)
    {
        boost::lock_guard<boost::mutex> lock(m_mutex);
        if (m_queue.empty())
        {
            return false;
        }
        pkt = m_queue.front();
        m_queue.pop_front();
        return true;
    }

    bool read(TPacket& pkt)   // peek at the front without removing it
    {
        boost::lock_guard<boost::mutex> lock(m_mutex);
        if (m_queue.empty())
        {
            return false;
        }
        pkt = m_queue.front();
        return true;
    }

    bool del(TPacket& pkt)
    {
        return get(pkt);
    }

    bool clear()
    {
        boost::lock_guard<boost::mutex> lock(m_mutex);
        m_queue.clear();
        return true;
    }
};

SynchronizedDequeue has a dynamic queue size. The advantage is that if the producer is faster than the consumer, no data is lost: everything produced will eventually be processed by the consumer. The disadvantage is greater pressure on the memory allocator: memory is allocated when a packet is inserted into the queue and released when the data is returned to the consumer thread. With many allocations and deallocations, this may slow down memory reclamation for larger objects in the same process.
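One consequence of this design is that the consumer must poll: get() returns false when the queue is empty. If the consumer should instead sleep until data arrives, a condition variable can be paired with the mutex. Here is a minimal sketch of that idea, using std::mutex and std::condition_variable (the Boost equivalents have essentially the same interface); BlockingDequeue is a hypothetical name, not a class from the article:

```cpp
#include <condition_variable>
#include <deque>
#include <mutex>

// Hypothetical blocking variant: get() waits until data arrives instead of
// returning false on an empty queue.
template <class T>
class BlockingDequeue
{
    std::mutex m_mutex;
    std::condition_variable m_cond;
    std::deque<T> m_queue;
public:
    void add(T pkt)
    {
        {
            std::lock_guard<std::mutex> lock(m_mutex);
            m_queue.push_back(pkt);
        }
        m_cond.notify_one();   // wake one waiting consumer
    }
    T get()
    {
        std::unique_lock<std::mutex> lock(m_mutex);
        // wait() releases the mutex while sleeping and re-acquires it on wake;
        // the predicate guards against spurious wakeups.
        m_cond.wait(lock, [this] { return !m_queue.empty(); });
        T pkt = m_queue.front();
        m_queue.pop_front();
        return pkt;
    }
};
```

The trade-off is that a blocking get() ties the consumer thread to the queue, which is exactly what the non-blocking variant below is designed to avoid.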

C++
class SynchronizedVector :public ISynchronizedQueue<TPacket>
{
    int queue_size;
    boost::mutex m_mutex;
    std::vector<TPacket> my_vector;
    int start,end;

public:
    SynchronizedVector(int q_size=100) {queue_size = q_size; start=end=0; my_vector.assign(queue_size,TPacket());}
    bool add(TPacket pkt)
    {
        boost::lock_guard<boost::mutex> lock(m_mutex);
        my_vector[end++] = pkt;
        if(end >= queue_size)        // wrap the write index
            end = 0;
        if(end == start)             // buffer full: drop the oldest entry
            start = end + 1;
        if(start >= queue_size)
            start = 0;
        return true;
    }
    bool get(TPacket& pkt)
    {
        boost::lock_guard<boost::mutex> lock(m_mutex);
        if(start==end)
            return false;
        pkt = my_vector[start++];
        if(start>=queue_size)
            start = 0;
        return true;
    }
    bool read(TPacket& pkt)  // not supported in this implementation
    {
        return false;
    }
    bool del(TPacket& pkt)   // not supported in this implementation
    {
        return false;
    }

    bool clear()
    {
        boost::lock_guard<boost::mutex> lock(m_mutex);
        start = end = 0;
        return true;
    }
};

SynchronizedVector uses a fixed-size queue to avoid memory-management overhead, but when the queue is full, newly arriving data overwrites the oldest entries that have not yet been processed.
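The overwrite behavior follows directly from the start/end index arithmetic. The single-threaded sketch below isolates that arithmetic (locking stripped out, int payloads instead of TPacket; RingIndexDemo is an illustrative name): with the write index one wrap ahead of the read index, a full buffer of N slots holds at most N-1 readable items, and each further add() pushes start forward, discarding the oldest entry.

```cpp
#include <vector>

// Single-threaded sketch of the ring-buffer index logic used by
// SynchronizedVector (locking removed to isolate the arithmetic).
class RingIndexDemo
{
    int queue_size;
    std::vector<int> buf;
    int start, end;
public:
    explicit RingIndexDemo(int n) : queue_size(n), buf(n), start(0), end(0) {}
    void add(int v)
    {
        buf[end++] = v;
        if (end >= queue_size) end = 0;   // wrap the write index
        if (end == start)                 // buffer full: drop the oldest entry
        {
            start = end + 1;
            if (start >= queue_size) start = 0;
        }
    }
    bool get(int& v)
    {
        if (start == end) return false;   // start == end means empty
        v = buf[start++];
        if (start >= queue_size) start = 0;  // wrap the read index
        return true;
    }
};
```

For example, with a 3-slot ring, adding 1, 2, 3, 4 leaves only 3 and 4 readable: each wrap-around collision pushes start past the oldest slot.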

C++
class SynchronizedVectorNB :public ISynchronizedQueue<TPacket>
{
    int queue_size;
    boost::mutex m_mutex;
    std::vector<TPacket> my_vector;
    int start,end;

public:
    SynchronizedVectorNB(int q_size=100) {queue_size = q_size; start=end=0; my_vector.assign(queue_size,TPacket());}
    bool add(TPacket pkt)
    {
        // Try to take the lock without blocking; give up immediately if busy.
        boost::unique_lock<boost::mutex> lock(m_mutex, boost::try_to_lock);
        if(!lock.owns_lock())
            return false;
        my_vector[end++] = pkt;
        if(end >= queue_size)        // wrap the write index
            end = 0;
        if(end == start)             // buffer full: drop the oldest entry
            start = end + 1;
        if(start >= queue_size)
            start = 0;
        return true;
    }
    bool get(TPacket& pkt)
    {
        boost::unique_lock<boost::mutex> lock(m_mutex, boost::try_to_lock);
        if(!lock.owns_lock())
            return false;

        if(start == end)
            return false;
        pkt = my_vector[start++];
        if(start >= queue_size)      // wrap the read index
            start = 0;
        return true;
    }
    bool read(TPacket& pkt)  // not supported in this implementation
    {
        return false;
    }
    bool del(TPacket& pkt)   // not supported in this implementation
    {
        return false;
    }

    bool clear()
    {
        // Note: unlike add()/get(), clear() still takes a blocking lock.
        boost::lock_guard<boost::mutex> lock(m_mutex);
        start = end = 0;
        return true;
    }
};

SynchronizedVectorNB never blocks either the producer or the consumer thread. The advantage is that if other work needs to be done in the same loop as the queue access, the non-blocking version bounds the response time.

The two blocking queues above may stall a thread while it waits to acquire the mutex; if the thread holding the mutex then hits an exception or hangs, the other thread is blocked as well. The disadvantage of the non-blocking version is that add() may fail when the lock cannot be acquired, and the caller then has to submit the same data again.
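That "submit the same data again" responsibility is easy to overlook at the call site. One way to handle it is a small retry wrapper around any queue whose add() reports try-lock failure by returning false; the sketch below is illustrative (add_with_retry and max_retries are names I have invented, not part of the article's code), and works with any queue type exposing a bool add(pkt):

```cpp
// Caller-side retry sketch for a non-blocking add(): if the try-lock inside
// add() fails, the packet was not enqueued, so retry a bounded number of
// times before giving up (or dropping the packet deliberately).
template <class QueueT, class PacketT>
bool add_with_retry(QueueT& q, const PacketT& pkt, int max_retries = 3)
{
    for (int attempt = 0; attempt < max_retries; ++attempt)
    {
        if (q.add(pkt))   // returns false when the lock was busy
            return true;
        // Optionally back off here (e.g. std::this_thread::yield())
        // so the lock holder gets a chance to finish.
    }
    return false;         // still failed; the caller decides what to do
}
```

Bounding the retries preserves the non-blocking property: the caller's worst-case delay stays small and predictable, unlike an unbounded spin.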

Here is the sample code for the producer thread:

C++
DWORD WINAPI ProducerServerThread(LPVOID lpParam)
{
    int count = 0;

    ISynchronizedQueue<TPacket>* pQ = (ISynchronizedQueue<TPacket>*)lpParam;
    TPacket pkt;
    LOG("\n-------------------------Producer thread begin-----------------------");
    while(1)
    {
        DWORD t1 = GetTickCount();
        Sleep(50);

        if(count++ >= 1000)
            break;

        // Reset the packet. Note: memset on this polymorphic class would
        // also wipe its vtable pointer, so reassign a fresh object instead.
        pkt = TPacket();

        // Add content to the packet; only the ID is set here, you can do more.
        pkt.SetID(count);

        if(pQ->add(pkt))
            LOG("Add PACKET ID = %d ", pkt.GetID());
        else
            LOG("Add Packet Failed");
        DWORD t2 = GetTickCount();

        LOG("ONE-LOOP DURATION = %d", t2 - t1);
    }
    LOG("\n-------------------------Producer thread end-----------------------");
    return 0;
}

Here is the sample code for the consumer thread:

C++
DWORD WINAPI ConsumerServerThread(LPVOID lpParam)
{
    int count = 0;
    ISynchronizedQueue<TPacket>* pQ = (ISynchronizedQueue<TPacket>*)lpParam;
    TPacket pkt;
    LOG("\n-------------------------Consumer thread begin-----------------------");
    while(1)
    {
        Sleep(10);

        if(count++ >= 1200)
            break;

        if(pQ->get(pkt))
            LOG("Get Packet ID = %d", pkt.GetID());
        else
            LOG("Get Packet Failed");
    }
    LOG("\n-------------------------Consumer thread end-----------------------");
    return 0;
}

Here is the sample code for the main thread:

C++
SynchronizedDequeue m_q[5];
//SynchronizedVector m_q[5];
//SynchronizedVectorNB m_q[5];

int _tmain(int argc, _TCHAR* argv[])
{
   int thread_count =5;
   HANDLE server_threads[10]; 
 
   for (int i=0; i < thread_count ;i++)
    {
        server_threads[i] = CreateThread(
                                        NULL,
                                        0,
                                        ProducerServerThread,
                                        &m_q[i],
                                        0,
                                        NULL
                                        );
        if (server_threads[i] == NULL)
        {
            LOG( "Create Thread failed: %d\n", GetLastError());
            return 0;
        }
    }
 
    for (int i = 0; i < thread_count; i++)
    {
        server_threads[i+thread_count] = CreateThread(
                                        NULL,
                                        0,
                                        ConsumerServerThread,
                                        &m_q[i],
                                        0,
                                        NULL
                                        );
        if (server_threads[i+thread_count] == NULL)   // check the consumer handle
        {
            LOG( "Create Thread failed: %d\n", GetLastError());
            return 0;
        }
    }
 
  // Wait until the threads exit, then clean up
    DWORD retval = WaitForMultipleObjects(
                                   2*thread_count,
                                   server_threads,
                                   TRUE,
                                   INFINITE
                                   );
    if ((retval == WAIT_FAILED) || (retval == WAIT_TIMEOUT))
    {
        LOG( "WaitForMultipleObjects failed: %d\n", GetLastError());
        return 0;
    }
    for (int i = 0; i < 2*thread_count; i++)
        CloseHandle(server_threads[i]);
    return 0;
}

In the test code, I create five producers, five consumers, and five queues; each producer is paired with one consumer through a shared queue. Using the packet IDs, you can verify that every packet produced is processed in order by its consumer thread. Define the LOG macro yourself; I use a thread-safe LOG macro that prints a timestamp, which makes the thread behavior easier to follow.
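The same pairing scheme can be expressed portably with std::thread, where join() on every thread plays the role of WaitForMultipleObjects with bWaitAll = TRUE. The sketch below is self-contained rather than a drop-in replacement: MiniQueue stands in for SynchronizedDequeue, int packets stand in for TPacket, and run_pairs is an illustrative name.

```cpp
#include <deque>
#include <mutex>
#include <thread>
#include <vector>

// Minimal mutex-guarded queue standing in for SynchronizedDequeue.
struct MiniQueue
{
    std::mutex m;
    std::deque<int> q;
    void add(int v) { std::lock_guard<std::mutex> l(m); q.push_back(v); }
    bool get(int& v)
    {
        std::lock_guard<std::mutex> l(m);
        if (q.empty()) return false;
        v = q.front(); q.pop_front();
        return true;
    }
};

// Runs `pairs` producer/consumer pairs, each pair sharing one queue, joins
// them all, and returns the total number of packets consumed.
int run_pairs(int pairs, int packets_per_producer)
{
    std::vector<MiniQueue> queues(pairs);
    std::vector<int> consumed(pairs, 0);
    std::vector<std::thread> threads;
    for (int i = 0; i < pairs; ++i)
    {
        threads.emplace_back([&, i] {            // producer for pair i
            for (int n = 1; n <= packets_per_producer; ++n)
                queues[i].add(n);
        });
        threads.emplace_back([&, i] {            // consumer for pair i
            int v, got = 0;
            while (got < packets_per_producer)
                if (queues[i].get(v)) ++got;     // busy-poll until all arrive
            consumed[i] = got;
        });
    }
    for (auto& t : threads) t.join();            // wait for every thread
    int total = 0;
    for (int c : consumed) total += c;
    return total;
}
```

With two pairs and 100 packets each, all 200 packets are consumed, mirroring the article's check that every produced packet reaches its partner consumer.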

19:33:50:106  5972 info: ConsumerServerThread: Get Packet Failed
19:33:50:106  4244 info: ConsumerServerThread: Get Packet Failed
19:33:50:122  5808 info: ConsumerServerThread: Get Packet Failed
19:33:50:122  8464 info: ConsumerServerThread: Get Packet Failed
19:33:50:122  7760 info: ProducerServerThread: Add PACKET ID = 1 
19:33:50:122  7416 info: ConsumerServerThread: Get Packet ID = 1
19:33:50:122  7760 info: ProducerServerThread: ONE-LOOP DURATION = 63
19:33:50:138  5808 info: ConsumerServerThread: Get Packet Failed
19:33:50:138  5972 info: ConsumerServerThread: Get Packet Failed
19:33:50:138  8464 info: ConsumerServerThread: Get Packet Failed
19:33:50:138  4244 info: ConsumerServerThread: Get Packet Failed
19:33:50:138  8268 info: ProducerServerThread: Add PACKET ID = 1 
19:33:50:138  7416 info: ConsumerServerThread: Get Packet Failed
19:33:50:138  8268 info: ProducerServerThread: ONE-LOOP DURATION = 62
19:33:50:153  4244 info: ConsumerServerThread: Get Packet ID = 1
19:33:50:153  5808 info: ConsumerServerThread: Get Packet Failed
19:33:50:153  8836 info: ProducerServerThread: Add PACKET ID = 1 
19:33:50:153  7352 info: ProducerServerThread: Add PACKET ID = 1 
19:33:50:153  8464 info: ConsumerServerThread: Get Packet Failed
19:33:50:153  5972 info: ConsumerServerThread: Get Packet ID = 1
19:33:50:153  8836 info: ProducerServerThread: ONE-LOOP DURATION = 63
19:33:50:153  7352 info: ProducerServerThread: ONE-LOOP DURATION = 63

After testing the three sync queues with five producer-consumer thread pairs, each adding and getting 1000 packets, their performance was basically the same. The logging itself costs around 10 ms. You can modify the test to see how the three queue types behave with a larger data set, a longer running time, or large-object memory allocation.
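For such comparisons, a monotonic clock with better resolution than GetTickCount helps. A portable sketch with std::chrono follows; time_n_ops is an illustrative helper that accepts any queue exposing add(int)/get(int&), not something from the article's code:

```cpp
#include <chrono>

// Times n add+get round trips on any queue with add(int) / get(int&),
// returning the elapsed wall time in microseconds. steady_clock is
// monotonic, so it is safe for interval measurement.
template <class Q>
long long time_n_ops(Q& q, int n)
{
    using clock = std::chrono::steady_clock;
    auto t0 = clock::now();
    int v;
    for (int i = 0; i < n; ++i)
    {
        q.add(i);
        q.get(v);
    }
    auto t1 = clock::now();
    return std::chrono::duration_cast<std::chrono::microseconds>(t1 - t0).count();
}
```

Running this against each of the three queue types with the same n would give a like-for-like comparison without the ~10 ms logging overhead skewing the numbers.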

Points of Interest

It was kind of fun to use a little bit of the C++ boost library for the first time.

History

  • First version, 08/17/2012.
  • Second version, 08/19/2012: Updated the article with more explanation.
  • Third version, 10/28/2014: Updated the code to use templates.

License

This article, along with any associated source code and files, is licensed under The Code Project Open License (CPOL)


Written By
Software Developer (Senior)
China
I started my career developing pattern-recognition applications on Windows and multiple mobile platforms. The handwriting-recognition software whose development I led still helps millions of senior Chinese users enter information into computers today. Now I am working on high-precision 3D measuring and reconstruction devices.

I have been developing with C/C++, C#, MS-SQL, Java (Android), Python, and Matlab, using frameworks such as WPF/WCF, Qt (Linux), C++ AMP/CUDA, OpenCV, PCL, VTK, and Hoops (along with older platforms like Palm OS, Symbian, and Windows Mobile).

My background includes machine learning, image processing, computer vision, point-cloud processing, and 3D reconstruction.
