Click here to Skip to main content
15,893,190 members
Articles / Programming Languages / C++

Lock & Wait Synchronization in C++

Rate me:
Please Sign up or sign in to vote.
4.70/5 (26 votes)
28 Jul 2008 · Apache · 8 min read · 132K · 1.5K · 68
Applying lock and wait synchronization in C++.
/** @file queueing.cpp
  * @brief Organize simple message queueing based on Lwsync.
  * @author Vladimir Frolov
  *
  * Copyright (c) Vladimir Frolov, 2006-2008
  * Distributed under the Boost Software License, Version 1.0. (See
  * accompanying file LICENSE_1_0.txt or copy at
  * http://www.boost.org/LICENSE_1_0.txt)
  *
  */

#include <lwsync/critical_resource.hpp>
#include <lwsync/monitor.hpp>

#include <boost/thread/thread.hpp>

#include <iostream>
#include <queue>
#include <vector>
#include <string>

using namespace boost;
using namespace lwsync;

// Number of worker threads that queueing() starts to process messages.
const size_t workers_count = 5;

// Shared counter used to hand out worker IDs: each worker reads the current
// value and post-increments it through a single access() call.
critical_resource<int> thread_identificator(0);

// std::cout wrapped in a critical_resource; every log line in this file is
// written through sync_cout.access().resource().
// NOTE(review): presumably access() serializes writers for the lifetime of
// the returned accessor so that log lines do not interleave -- confirm
// against the lwsync documentation.
critical_resource<std::ostream&> sync_cout(std::cout);

// A std::queue of string messages wrapped in an lwsync monitor. In this file
// the monitor is used through access() and wait_for(predicate).
typedef monitor<std::queue<std::string> > messaging_queue_t;

// Global message queue. Each worker thread takes one message at a time from
// the front of this queue; the messages are then processed in parallel.
messaging_queue_t messages;

// Predicate passed to messaging_queue_t::wait_for: returns true when the
// queue holds at least one message, false when it holds none.
bool not_empty(const std::queue<std::string>& message_queue)
{
   const bool has_pending_message = (message_queue.size() != 0);
   return has_pending_message;
}

void test_working_thread()
{
   int my_id = (*thread_identificator.access())++;

   // Note that threads can log its ID in any order.
   sync_cout.access().resource() << "Thread started. ID = " << my_id << std::endl;
   for(;;)
   {
      std::string message_received;
      {
         messaging_queue_t::accessor messsage_access = messages.wait_for(not_empty);
         message_received = messsage_access->front();
      
         if (message_received == "exit")
         {
            sync_cout.access().resource() << "Thread stops working and exit. ID = " << my_id << std::endl;
            // Do not remove "exit" message from queue so this message will be received by all threads.
            return;
         }
         messsage_access->pop();
      }
      
      // Process received message in parallel.
      // Note that all messages can be processed in any order.
      sync_cout.access().resource() << "Thread " << my_id << " receives message : " << message_received << std::endl;
   }
}

void add_work();

void queueing()
{
   size_t thread_index = 0;
   std::vector<thread*> work_pool(workers_count);
   for(thread_index = 0; thread_index < workers_count; ++thread_index )
   {
      work_pool[thread_index] = new thread(test_working_thread);
   }

   add_work();

   for(thread_index = 0; thread_index < workers_count; ++thread_index )
   {
      thread* current_thread = work_pool[thread_index];
      current_thread->join();
      delete current_thread;
      work_pool[thread_index] = 0;
   }
}

void add_work()
{
   std::vector<std::string> all_messages;
   all_messages.push_back("one");
   all_messages.push_back("two");
   all_messages.push_back("three");
   all_messages.push_back("four");
   all_messages.push_back("five");
   all_messages.push_back("six");
   all_messages.push_back("seven");
   all_messages.push_back("eight");
   all_messages.push_back("nine");
   all_messages.push_back("ten");
   all_messages.push_back("eleven");
   all_messages.push_back("twelve");
   all_messages.push_back("exit");

   // Add all messages to queue in one access so all workers can start working
   // in one time.
   {
      messaging_queue_t::accessor messaging_access = messages.access();
      for(std::vector<std::string>::const_iterator message = all_messages.begin();
          message != all_messages.end();
          ++message
         )
      {
         messaging_access->push(*message);
      }
   }
}

By viewing downloads associated with this article you agree to the Terms of Service and the article's licence.

If a file you wish to view isn't highlighted, and is a text file (not binary), please let us know and we'll add colourisation support for it.

License

This article, along with any associated source code and files, is licensed under The Apache License, Version 2.0


Written By
Architect
Ukraine
This member has not yet provided a Biography. Assume it's interesting and varied, and probably something to do with programming.

Comments and Discussions