|
/** @file queueing.cpp
* @brief Organize simple message queueing based on Lwsync.
* @author Vladimir Frolov
*
* Copyright (c) Vladimir Frolov, 2006-2008
* Distributed under the Boost Software License, Version 1.0. (See
* accompanying file LICENSE_1_0.txt or copy at
* http://www.boost.org/LICENSE_1_0.txt)
*
*/
#include <lwsync/critical_resource.hpp>
#include <lwsync/monitor.hpp>
#include <boost/thread/thread.hpp>
#include <iostream>
#include <queue>
#include <vector>
#include <string>
using namespace boost;
using namespace lwsync;
// How many threads will process messages.
const size_t workers_count = 5;
// Thread will receive ID and increase thread_identificator counter.
critical_resource<int> thread_identificator(0);
critical_resource<std::ostream&> sync_cout(std::cout);
typedef monitor<std::queue<std::string> > messaging_queue_t;
// Global message queue. Each thread will take one message from queue
// and process the message in parallel.
messaging_queue_t messages;
bool not_empty(const std::queue<std::string>& message_queue)
{
return !message_queue.empty();
}
void test_working_thread()
{
int my_id = (*thread_identificator.access())++;
// Note that threads can log its ID in any order.
sync_cout.access().resource() << "Thread started. ID = " << my_id << std::endl;
for(;;)
{
std::string message_received;
{
messaging_queue_t::accessor messsage_access = messages.wait_for(not_empty);
message_received = messsage_access->front();
if (message_received == "exit")
{
sync_cout.access().resource() << "Thread stops working and exit. ID = " << my_id << std::endl;
// Do not remove "exit" message from queue so this message will be received by all threads.
return;
}
messsage_access->pop();
}
// Process received message in parallel.
// Note that all messages can be processed in any order.
sync_cout.access().resource() << "Thread " << my_id << " receives message : " << message_received << std::endl;
}
}
void add_work();
void queueing()
{
size_t thread_index = 0;
std::vector<thread*> work_pool(workers_count);
for(thread_index = 0; thread_index < workers_count; ++thread_index )
{
work_pool[thread_index] = new thread(test_working_thread);
}
add_work();
for(thread_index = 0; thread_index < workers_count; ++thread_index )
{
thread* current_thread = work_pool[thread_index];
current_thread->join();
delete current_thread;
work_pool[thread_index] = 0;
}
}
void add_work()
{
std::vector<std::string> all_messages;
all_messages.push_back("one");
all_messages.push_back("two");
all_messages.push_back("three");
all_messages.push_back("four");
all_messages.push_back("five");
all_messages.push_back("six");
all_messages.push_back("seven");
all_messages.push_back("eight");
all_messages.push_back("nine");
all_messages.push_back("ten");
all_messages.push_back("eleven");
all_messages.push_back("twelve");
all_messages.push_back("exit");
// Add all messages to queue in one access so all workers can start working
// in one time.
{
messaging_queue_t::accessor messaging_access = messages.access();
for(std::vector<std::string>::const_iterator message = all_messages.begin();
message != all_messages.end();
++message
)
{
messaging_access->push(*message);
}
}
}
|
By viewing downloads associated with this article you agree to the Terms of Service and the article's licence.
If a file you wish to view isn't highlighted, and is a text file (not binary), please
let us know and we'll add colourisation support for it.
This member has not yet provided a Biography. Assume it's interesting and varied, and probably something to do with programming.