Click here to Skip to main content
15,892,674 members
Articles / Programming Languages / C++

RCF - Interprocess Communication for C++

Rate me:
Please Sign up or sign in to vote.
4.94/5 (147 votes)
25 Oct 2011CPOL20 min read 4.6M   8.4K   331  
A server/client IPC framework, using the C++ preprocessor as an IDL compiler.
//******************************************************************************
// RCF - Remote Call Framework
// Copyright (c) 2005 - 2007. All rights reserved.
// Consult your license for conditions of use.
// Developed by Jarl Lindrud.
// Contact: jlindrud@hotmail.com .
//******************************************************************************

#include <RCF/ThreadManager.hpp>

//#include <boost/lambda/bind.hpp>

#include <RCF/InitDeinit.hpp>
#include <RCF/ThreadLocalData.hpp>

namespace RCF {

    // I_ThreadManager

    void I_ThreadManager::setThreadName(const std::string &threadName)
    {
        Lock lock(mInitDeinitMutex);
        mThreadName = threadName;
    }

    std::string I_ThreadManager::getThreadName()
    {
        Lock lock(mInitDeinitMutex);
        return mThreadName;
    }

#ifdef _MSC_VER

    typedef struct tagTHREADNAME_INFO
    {
        DWORD dwType; // must be 0x1000
        LPCSTR szName; // pointer to name (in user addr space)
        DWORD dwThreadID; // thread ID (-1=caller thread)
        DWORD dwFlags; // reserved for future use, must be zero
    } THREADNAME_INFO;

    void SetThreadName( DWORD dwThreadID, LPCSTR szThreadName)
    {
        THREADNAME_INFO info;
        info.dwType = 0x1000;
        info.szName = szThreadName;
        info.dwThreadID = dwThreadID;
        info.dwFlags = 0;

        __try
        {
            RaiseException( 0x406D1388, 0, sizeof(info)/sizeof(DWORD), (ULONG_PTR*)&info );
        }
        __except(EXCEPTION_CONTINUE_EXECUTION)
        {
        }
    }

    void I_ThreadManager::setMyThreadName()
    {
        std::string threadName = getThreadName();
        if (!threadName.empty())
        {
            SetThreadName( DWORD(-1), threadName.c_str());
        }
    }

#else

    void I_ThreadManager::setMyThreadName()
    {}

#endif

    void I_ThreadManager::onInit()
    {
        std::vector<ThreadInitFunctor> initFunctors;
        {
            Lock lock(mInitDeinitMutex);
            std::copy(
                mThreadInitFunctors.begin(), 
                mThreadInitFunctors.end(), 
                std::back_inserter(initFunctors));
        }

        std::for_each(
            initFunctors.begin(), 
            initFunctors.end(), 
            boost::bind(&ThreadInitFunctor::operator(), _1));
    }

    void I_ThreadManager::onDeinit()
    {
        std::vector<ThreadDeinitFunctor> deinitFunctors;
        {
            Lock lock(mInitDeinitMutex);
            std::copy(
                mThreadDeinitFunctors.begin(), 
                mThreadDeinitFunctors.end(), 
                std::back_inserter(deinitFunctors));
        }

        std::for_each(
            deinitFunctors.begin(), 
            deinitFunctors.end(), 
            boost::bind(&ThreadDeinitFunctor::operator(), _1));
    }

    void I_ThreadManager::addThreadInitFunctor(ThreadInitFunctor threadInitFunctor)
    {
        Lock lock(mInitDeinitMutex);
        mThreadInitFunctors.push_back(threadInitFunctor);
    }

    void I_ThreadManager::addThreadDeinitFunctor(ThreadDeinitFunctor threadDeinitFunctor)
    {
        Lock lock(mInitDeinitMutex);
        mThreadDeinitFunctors.push_back(threadDeinitFunctor);
    }

    // FixedThreadPool

    FixedThreadPool::FixedThreadPool(std::size_t threadCount) :
        mThreadCount(threadCount),
        mStarted(RCF_DEFAULT_INIT)
    {
        RCF1_TRACE("");
    }

    void FixedThreadPool::repeatTask(
        const ThreadInfoPtr &threadInfoPtr,
        const Task &task,
        int timeoutMs,
        const volatile bool &stopFlag)
    {

        RCF1_TRACE("");
        setThreadInfoPtr(threadInfoPtr);

        setMyThreadName();

        onInit();

        bool taskFlag = false;
        while (!stopFlag && !taskFlag)
        {
            try
            {
                while (!stopFlag && !taskFlag)
                {
                    taskFlag = task(timeoutMs, stopFlag, false);
                }
            }
            catch(const std::exception &e)
            {
                RCF1_TRACE("worker thread: exception")(e);
            }
            catch(...)
            {
                RCF1_TRACE("worker thread: non std::exception derived exception");
            }
        }

        onDeinit();

        RCF1_TRACE("")(stopFlag);
    }

    // not synchronized
    void FixedThreadPool::start(const volatile bool &stopFlag)
    {
        RCF1_TRACE("");
        if (!mStarted)
        {
            RCF_ASSERT(mThreads.empty())(mThreads.size());
            mThreads.clear();
            mThreads.resize(mThreadCount);
            for (std::size_t i=0; i<mThreads.size(); ++i)
            {
                ThreadInfoPtr threadInfoPtr( new ThreadInfo( shared_from_this()));

                ThreadPtr threadPtr(new Thread(
                    boost::bind(
                    &FixedThreadPool::repeatTask,
                    this,
                    threadInfoPtr,
                    mTask,
                    1000,
                    boost::ref(stopFlag))));

                mThreads[i] = threadPtr;
            }
            mStarted = true;
        }
    }

    // not synchronized
    void FixedThreadPool::stop(bool wait)
    {
        RCF1_TRACE("")(wait);
        if (mStarted)
        {
            for (std::size_t i=0; i<mThreads.size(); ++i)
            {
                if (mStopFunctor)
                {
                    mStopFunctor();
                }
                if (wait)
                {
                    mThreads[i]->join();
                    mThreads[i].reset();
                }
            }
            mThreads.clear();
            if (wait)
            {
                mStarted = false;
            }
        }
    }

    void FixedThreadPool::setTask(const Task &task)
    {
        RCF_ASSERT(!mStarted);
        mTask = task;
    }

    void FixedThreadPool::setStopFunctor(const StopFunctor &stopFunctor)
    {
        RCF_ASSERT(!mStarted);
        mStopFunctor = stopFunctor;
    }

    // DynamicThreadPool

    DynamicThreadPool::DynamicThreadPool(
        std::size_t threadTargetCount,
        std::size_t threadMaxCount) :
            mStarted(RCF_DEFAULT_INIT),
            mThreadTargetCount(threadTargetCount),
            mThreadMaxCount(threadMaxCount),
            mParkedCount(RCF_DEFAULT_INIT),
            mUnparkCount(RCF_DEFAULT_INIT),
            mStopFlag(RCF_DEFAULT_INIT),
            mpUserStopFlag(RCF_DEFAULT_INIT)
    {
        RCF1_TRACE("");
        RCF_ASSERT(
            mThreadTargetCount < mThreadMaxCount)
            (mThreadTargetCount)(mThreadMaxCount);
    }

    std::size_t DynamicThreadPool::getParkedCount()
    {
        Lock lock(mParkMutex);
        return mParkedCount;
    }

    bool DynamicThreadPool::launchThread(const volatile bool &userStopFlag)
    {
        RCF1_TRACE("")(mParkedCount)(mUnparkCount)(mStopFlag)(userStopFlag);

        RCF_ASSERT(
            static_cast<int>(mThreads.size()) <= mThreadMaxCount)
            (mThreads.size())(mThreadMaxCount);

        if (static_cast<int>(mThreads.size()) == mThreadMaxCount)
        {
            return false; // out of threads
        }
        else
        {
            ThreadInfoPtr threadInfoPtr( new ThreadInfo(shared_from_this()));

            ThreadPtr threadPtr( new Thread(
                boost::bind(
                    &DynamicThreadPool::repeatTask,
                    this,
                    threadInfoPtr,
                    mTask,
                    1000,
                    boost::ref(userStopFlag))));

            mThreads.push_back(threadPtr);

            return true;
        }
    }

    void DynamicThreadPool::notifyBusy()
    {
        RCF1_TRACE("")(mParkedCount)(mUnparkCount)(mStopFlag);

        if (!mStopFlag && !getThreadInfoPtr()->mBusy)
        {
            getThreadInfoPtr()->mBusy = true;

            Lock lock(mParkMutex);
            if (mParkedCount == 0)
            {
                !launchThread(*mpUserStopFlag) ? ++mUnparkCount :0;
            }
            else
            {
                mUnparkCondition.notify_one();
                --mParkedCount;
            }
        }
    }

    void DynamicThreadPool::notifyReady()
    {
        RCF1_TRACE("")(mParkedCount)(mUnparkCount)(mStopFlag);

        if (!mStopFlag && getThreadInfoPtr()->mBusy)
        {
            {
                Lock lock(mParkMutex);
                if (mUnparkCount > 0)
                {
                    --mUnparkCount;
                }
                else
                {
                    ++mParkedCount;
                    mUnparkCondition.wait(lock);
                }
            }
           
            getThreadInfoPtr()->mBusy = false;
        }
    }

    void DynamicThreadPool::repeatTask(
        const RCF::ThreadInfoPtr &threadInfoPtr,
        const Task &task,
        int timeoutMs,
        const volatile bool &stopFlag)
    {
        RCF1_TRACE("")(mParkedCount)(mUnparkCount)(mStopFlag);
        setThreadInfoPtr(threadInfoPtr);

        setMyThreadName();

        onInit();

        bool taskFlag = false;
        while (!stopFlag && !taskFlag)
        {
            try
            {
                while (!stopFlag && !taskFlag)
                {
                    taskFlag = task(timeoutMs, stopFlag, false);
                    notifyReady();
                }
            }
            catch(const std::exception &e)
            {
                RCF1_TRACE("worker thread: exception")(e);
            }
            catch(...)
            {
                RCF1_TRACE("worker thread: non std:: derived exception");
            }           
        }

        onDeinit();

        RCF_TRACE("")(stopFlag);
    }

    // not synchronized
    void DynamicThreadPool::start(const volatile bool &stopFlag)
    {
        RCF1_TRACE("")(mParkedCount)(mUnparkCount)(mStopFlag);

        if (!mStarted)
        {
            RCF_ASSERT(mThreads.empty())(mThreads.size());
            mThreads.clear();
            mParkedCount = 0;
            mUnparkCount = 0;
            mStopFlag = false;
            mpUserStopFlag = &stopFlag;

            for (std::size_t i=0; i<mThreadTargetCount; ++i)
            {
                bool ok = launchThread(stopFlag);
                RCF_ASSERT(ok);
            }

            mStarted = true;
        }
    }

    // not synchronized
    void DynamicThreadPool::stop(bool wait)
    {
        RCF1_TRACE("")(mParkedCount)(mUnparkCount)(mStopFlag)(wait);

        if (mStarted)
        {
            {
                Lock lock(mParkMutex);
                mStopFlag = true;
                mUnparkCondition.notify_all();
                mParkedCount = 0;
            }

            for (std::size_t i=0; i<mThreads.size(); ++i)
            {
                if (mStopFunctor)
                {
                    mStopFunctor();
                }
                if (wait)
                {
                    mThreads[i]->join();
                }
            }
            mThreads.clear();
            if (wait)
            {
                mStarted = false;
            }
        }
    }

    void DynamicThreadPool::setTask(const Task &task)
    {
        RCF_ASSERT(!mStarted);
        mTask = task;
    }

    void DynamicThreadPool::setStopFunctor(const StopFunctor &stopFunctor)
    {
        RCF_ASSERT(!mStarted);
        mStopFunctor = stopFunctor;
    }

} // namespace RCF




By viewing downloads associated with this article you agree to the Terms of Service and the article's licence.

If a file you wish to view isn't highlighted, and is a text file (not binary), please let us know and we'll add colourisation support for it.

License

This article, along with any associated source code and files, is licensed under The Code Project Open License (CPOL)


Written By
Australia Australia
Software developer, from Sweden and now living in Canberra, Australia, working on distributed C++ applications. When he is not programming, Jarl enjoys skiing and playing table tennis. He derives immense satisfaction from referring to himself in third person.

Comments and Discussions