Sorry, probably it is a silly question, but I've been stucked for a long time with the following problem and I cannot find the solution. I'm working with boost thread library version 1.40. I want to create a bounded buffer which uses a mutex and conditional variables to manage the reader and writer. I'm followng the library examples as benchmark, but I want to be able to create mutexes objects at runtime.
I want to create two FIFOs, bufA and bufB. bufA is filled by thrd1 (writer) and drained by thrd2 (reader). However, thrd2 has to write the data readed from bufA to bufB, hence it is a writer for bufB. Finally, bufB is drained from thrd3. Of course, thrd2 has to lock both bufA and bufB before his operations. But I don't want to create global mutexes for both bufA and bufB, since I want my code to work for whatever number of FIFOs.
My code is the following. Note that this manages only a single FIFO, but if it worked I would be able to accomodate it to manage more FIFOs. I cannot understand what is wrong with it.
/*
fifo_buffer.h
Header implementing a mutex FIFO accessible from 2 threads,
a producer (writer) and a consumer (reader)
*/
#ifndef _FIFO_BUFFER_H
#define _FIFO_BUFFER_H
#include <boost/thread/thread.hpp>
#include <boost/thread/mutex.hpp>
#include <boost/thread/condition.hpp>
#include <deque>
#define _ITERS 100
#ifndef _IO_MUTEX_
#define _IO_MUTEX_
boost::mutex io_mutex;
#endif
namespace fifo_buffer
{
template <class T>
class FifoBuffer
{
public:
typedef boost::mutex::scoped_lock scoped_lock;
FifoBuffer(unsigned int buf_size, int id_in, std::deque<T> & buf_in):
full(0), BUF_SIZE(buf_size), id(id_in), buf(buf_in)//,buf2(buf_in)
{ initialize(); }
T get()
{
T tmp = buf.front();
buf.pop_front();
return tmp;
}
void put(const T &m ){buf.push_back(m);}
void initialize()
{
buf.clear();
//buf2.clear();
}
unsigned int full, BUF_SIZE;//, full2, BUF2_SIZE;
int id;//, id2;
std::deque<T> &buf;//, &buf2;
};
template <class T>
class MutexFifo: FifoBuffer<T>
{
public:
MutexFifo(unsigned int buf_size, int id_in, std::deque<T> & buf_in): FifoBuffer(buf_size,id_in,buf_in) {}
int getId(){return id;}
void push(const T &m)
{
scoped_lock lock(mutex);
if (full == BUF_SIZE)
{
{
boost::mutex::scoped_lock lock(io_mutex);
std::cout << id << ": Buffer is full. Waiting..." << std::endl;
}
while (full == BUF_SIZE)
cond.wait(lock);
}
this->put(m);
++full;
cond.notify_one();
}
T pop()
{
scoped_lock lk(mutex);
if (full == 0)
{
{
scoped_lock lock(io_mutex);
std::cout << id << ": Buffer is empty. Waiting..." <<std::endl;
}
while (full == 0)
cond.wait(lk);
}
T i = this->get();
--full;
cond.notify_one();
return i;
}
private:
boost::mutex mutex;
boost::condition cond;
};
}
#endif
// main -- main.cpp
#include <iostream>
#include <boost/thread/thread.hpp>
#include <boost/thread/mutex.hpp>
#include <boost/bind.hpp>
#include "fifo_buffer.h"
#include <deque>
#ifndef _IO_MUTEX_
#define _IO_MUTEX_
boost::mutex io_mutex;
#endif
using fifo_buffer::FifoBuffer;
using fifo_buffer::MutexFifo;
typedef std::deque<int> FIFO;
void writer(MutexFifo<int> & buffer)
{
for (register int n = 0; n < _ITERS; ++n)
{
{
boost::mutex::scoped_lock lock(io_mutex);
std::cout << "sending " << buffer.getId() << ": " << n << std::endl;
}
buffer.push(n);
}
}
//template <class T>
void reader(MutexFifo<int> & buffer)
{
for (register int m = 0; m < _ITERS; ++m)
{
//T n = buffer_r.pop();
int n = buffer.pop();
{
boost::mutex::scoped_lock lock(io_mutex);
std::cout << "received " << buffer.getId() << ": " << n << std::endl;
}
}
}
int main()
{
FIFO fifo1;
MutexFifo<int> buffer(10,1,fifo1);
boost::thread thrd1(boost::bind(&reader,buffer));
boost::thread thrd2(boost::bind(&writer,buffer));
thrd1.join();
thrd2.join();
return 0;
}