Click here to Skip to main content
15,887,356 members
Please Sign up or sign in to vote.
2.00/5 (1 vote)
See more:
Sorry, probably it is a silly question, but I've been stucked for a long time with the following problem and I cannot find the solution. I'm working with boost thread library version 1.40. I want to create a bounded buffer which uses a mutex and conditional variables to manage the reader and writer. I'm followng the library examples as benchmark, but I want to be able to create mutexes objects at runtime.

I want to create two FIFOs, bufA and bufB. bufA is filled by thrd1 (writer) and drained by thrd2 (reader). However, thrd2 has to write the data readed from bufA to bufB, hence it is a writer for bufB. Finally, bufB is drained from thrd3. Of course, thrd2 has to lock both bufA and bufB before his operations. But I don't want to create global mutexes for both bufA and bufB, since I want my code to work for whatever number of FIFOs.

My code is the following. Note that this manages only a single FIFO, but if it worked I would be able to accomodate it to manage more FIFOs. I cannot understand what is wrong with it.

XML
/*
fifo_buffer.h

    Header implementing a mutex FIFO accessible from 2 threads,
    a producer (writer) and a consumer (reader)
*/

#ifndef _FIFO_BUFFER_H
#define _FIFO_BUFFER_H

#include <boost/thread/thread.hpp>
#include <boost/thread/mutex.hpp>
#include <boost/thread/condition.hpp>
#include <deque>

#define _ITERS 100

#ifndef _IO_MUTEX_
#define _IO_MUTEX_
boost::mutex io_mutex;
#endif

namespace fifo_buffer
{

    template <class T>
    class FifoBuffer
    {
    public:
        typedef boost::mutex::scoped_lock scoped_lock;

        FifoBuffer(unsigned int buf_size, int id_in, std::deque<T> & buf_in):
            full(0), BUF_SIZE(buf_size), id(id_in), buf(buf_in)//,buf2(buf_in)
        { initialize(); }

        T get()
        {
            T tmp = buf.front();
            buf.pop_front();
            return tmp;
        }
        void put(const T &m ){buf.push_back(m);}

        void initialize()
        {
            buf.clear();
            //buf2.clear();
        }
        unsigned int full, BUF_SIZE;//, full2, BUF2_SIZE;
        int id;//, id2;
        std::deque<T> &buf;//, &buf2;
    };

    template <class T>
    class MutexFifo: FifoBuffer<T>
    {
    public:

        MutexFifo(unsigned int buf_size, int id_in, std::deque<T> & buf_in): FifoBuffer(buf_size,id_in,buf_in) {}

        int getId(){return id;}

        void push(const T &m)
        {
            scoped_lock lock(mutex);
            if (full == BUF_SIZE)
            {
                {
                    boost::mutex::scoped_lock lock(io_mutex);
                    std::cout << id << ": Buffer is full. Waiting..." << std::endl;
                }
                while (full == BUF_SIZE)
                    cond.wait(lock);
            }
            this->put(m);
            ++full;
            cond.notify_one();
        }

        T pop()
        {
            scoped_lock lk(mutex);
            if (full == 0)
            {
                {
                    scoped_lock lock(io_mutex);
                    std::cout << id << ": Buffer is empty. Waiting..." <<std::endl;
                }
                while (full == 0)
                    cond.wait(lk);
            }
            T i = this->get();
            --full;
            cond.notify_one();

            return i;
        }

    private:
        boost::mutex mutex;
        boost::condition cond;
    };

}
#endif


XML
// main -- main.cpp

#include <iostream>
#include <boost/thread/thread.hpp>
#include <boost/thread/mutex.hpp>
#include <boost/bind.hpp>
#include "fifo_buffer.h"
#include <deque>

#ifndef _IO_MUTEX_
#define _IO_MUTEX_
boost::mutex io_mutex;
#endif

using fifo_buffer::FifoBuffer;
using fifo_buffer::MutexFifo;


typedef std::deque<int> FIFO;

void writer(MutexFifo<int> & buffer)
{
        for (register int n = 0; n < _ITERS; ++n)
        {
            {
                boost::mutex::scoped_lock lock(io_mutex);
                std::cout << "sending " << buffer.getId() << ": " << n << std::endl;
            }
            buffer.push(n);
        }
    }

//template <class T>
void reader(MutexFifo<int> & buffer)
    {
        for (register int m = 0; m < _ITERS; ++m)
        {
            //T n = buffer_r.pop();
            int n = buffer.pop();
            {
                boost::mutex::scoped_lock lock(io_mutex);
                std::cout << "received " << buffer.getId() << ": " << n << std::endl;
            }
        }
    }


int main()
{

    FIFO fifo1;
    MutexFifo<int> buffer(10,1,fifo1);

    boost::thread thrd1(boost::bind(&reader,buffer));
    boost::thread thrd2(boost::bind(&writer,buffer));

    thrd1.join();
    thrd2.join();

    return 0;
}
Posted
Updated 21-Apr-10 4:06am
v3

1 solution

Dear, pls do not post so lengthy code, unless that asked for. I guess most of the ppl got less interest after seeing long post. Try to describe your technical requirements & queries in quite understandable format, like:

1. I want to create two FIFOs, bufA and bufB.
2. bufA is filled by thrd1 (writer) and drained by thrd2 (reader).
3. thrd2 has to write the data readed from bufA to bufB, hence it is a
writer for bufB.
4. Finally, bufB is drained from thrd3.
5. But thrd2 has to lock both bufA and bufB before his operations.

But I dont want to create global mutexes for both bufA and bufB, since I want my code to work for whatever number of FIFOs.

NOTE: I'm working with boost thread library version 1.40. I want to create a bounded buffer which uses a mutex and conditional variables to manage the reader and writer.

Pls Help.
 
Share this answer
 

This content, along with any associated source code and files, is licensed under The Code Project Open License (CPOL)



CodeProject, 20 Bay Street, 11th Floor Toronto, Ontario, Canada M5J 2N8 +1 (416) 849-8900