Click here to Skip to main content
15,891,896 members
Articles / Programming Languages / C++

RCF - Interprocess Communication for C++

Rate me:
Please Sign up or sign in to vote.
4.94/5 (147 votes)
25 Oct 2011CPOL20 min read 4.6M   8.4K   331  
A server/client IPC framework, using the C++ preprocessor as an IDL compiler.
//*****************************************************************************
// RCF - Remote Call Framework
// Copyright (c) 2005. All rights reserved.
// Developed by Jarl Lindrud.
// Contact: jlindrud@hotmail.com .
//*****************************************************************************

#include <RCF/EndpointBrokerService.hpp>

#include <RCF/CurrentSession.hpp>
#include <RCF/RcfServer.hpp>

namespace RCF {

    EndpointBroker::EndpointBroker(
        ServerTransportPtr serverTransportPtr,
        const std::string &endpointName, 
        const std::string &endpointClientPassword, 
        const std::string &endpointServerPassword) :
            mServerTransportPtr(serverTransportPtr),
            mEndpointName(endpointName),
            mEndpointClientPassword(endpointClientPassword),
            mEndpointServerPassword(endpointServerPassword)
    {}

    int EndpointBroker::bindToEndpoint()
    {
        RCF_ASSERT( mMasterConnection.get() != NULL );
        if (mConnections.size() == 0)
        {
            if (mMasterConnection->getClientStub().isConnected())
            {
                try 
                { 
                    mMasterConnection->spawnConnections(Oneway, 3); 
                    return RCF_ERROR_ENDPOINT_RETRY;
                }
                catch(const RCF::Exception &e)
                {
                    RCF_TRACE("")(e);
                }
            }
            return RCF_ERROR_ENDPOINT_DOWN;
        }
        else
        {
            RCF_ASSERT(mConnections.size() > 0);
            while (!mConnections.empty())
            {
                SessionPtr sessionPtr = mConnections.back();
                boost::shared_ptr<I_Session> sessionPtr0 = boost::static_pointer_cast<I_Session>(sessionPtr);
                mConnections.pop_back();
                RCF_ASSERT(sessionPtr.get());
                I_ServerTransport &serverTransport = getCurrentSessionPtr()->getProactorPtr()->getServerTransport();
                I_ServerTransportEx &serverTransportEx = dynamic_cast<I_ServerTransportEx &>(serverTransport);
                if (serverTransportEx.isConnected( sessionPtr0))
                {
                    serverTransportEx.reflect(getCurrentISessionPtr(), sessionPtr0);
                    return RCF_ERROR_OK;
                }
            }
            RCF_ASSERT( mConnections.empty());
            return bindToEndpoint(); // try again, this time try the master connection
        }
    }


    EndpointBrokerService::EndpointBrokerService() :
        mEndpointBrokersMutex(WriterPriority)
    {}

    void EndpointBrokerService::onServiceAdded(RcfServer &server)
    {
        mServerTransportPtr = server.getServerTransportPtr(); // need to obtain a shared ptr to the transport, so it doesn't get deleted before we do
        server.bind<I_EndpointBroker>(*this);
    }

    void EndpointBrokerService::onServiceRemoved(RcfServer &)
    {}

    // remotely invoked
    bool EndpointBrokerService::openEndpoint(const std::string &endpointName, const std::string &endpointClientPassword, std::string &endpointServerPassword)
    {
        WriteLock writeLock(mEndpointBrokersMutex);

        if (mEndpointBrokers[endpointName].get() && !mEndpointBrokers[endpointName]->mMasterConnection->getClientStub().isConnected())
        {
            mEndpointBrokers[endpointName].reset();
        }

        if (mEndpointBrokers[endpointName].get() == NULL) // TODO: check for zombie broker (e.g. all connections defunct)
        {
            I_ServerTransport &serverTransport = getCurrentSessionPtr()->getProactorPtr()->getServerTransport();
            I_ServerTransportEx *pServerTransportEx = dynamic_cast<I_ServerTransportEx *>(&serverTransport);
            if (pServerTransportEx)
            {
                I_ServerTransportEx &serverTransportEx = *pServerTransportEx;
                mEndpointBrokers[endpointName].reset(new EndpointBroker(mServerTransportPtr, endpointName, endpointClientPassword, endpointServerPassword));
                ClientTransportAutoPtr cllientTransportAutoPtr( serverTransportEx.createClientTransport(getCurrentISessionPtr()) );
                boost::shared_ptr< RcfClient<I_EndpointServer> > clientPtr( new RcfClient<I_EndpointServer>(cllientTransportAutoPtr) );
                mEndpointBrokers[endpointName]->mMasterConnection = clientPtr;
                return true;
            }
        }

        return false;
    }

    // remotely invoked
    bool EndpointBrokerService::closeEndpoint(const std::string &endpointName, const std::string &endpointServerPassword)
    {
        RCF_UNUSED_VARIABLE(endpointServerPassword);
        WriteLock writeLock(mEndpointBrokersMutex);

        if (mEndpointBrokers[endpointName].get())
        {
            mEndpointBrokers[endpointName].reset(); // TODO: physically remove the entry, not just reset it
        }
        return true;
    }

    // remotely invoked
    bool EndpointBrokerService::establishEndpointConnection(const std::string &endpointName, const std::string &endpointServerPassword)
    {
        WriteLock writeLock(mEndpointBrokersMutex); // TODO: read lock here and an extra mutex for each endpoint broker instead

        if (mEndpointBrokers[endpointName].get() && mEndpointBrokers[endpointName]->mEndpointServerPassword == endpointServerPassword)
        {
            mEndpointBrokers[endpointName]->mConnections.push_back(getCurrentSessionPtr());
            return true;
        }
        return false;
    }

    // remotely invoked
    int EndpointBrokerService::bindToEndpoint(const std::string &endpointName, const std::string &endpointClientPassword)
    {
        WriteLock writeLock(mEndpointBrokersMutex); // TODO: read lock here and an extra mutex for each endpoint broker instead

        // TODO: don't use mEndpointBrokers[] until it is known that the endpoint broker exists

        if (NULL == mEndpointBrokers[endpointName].get()) 
        {
            return RCF_ERROR_ENDPOINT_ABSENT;
        }
        else if (mEndpointBrokers[endpointName]->mEndpointClientPassword != endpointClientPassword)
        {
            return RCF_ERROR_ENDPOINT_PASSWORD;
        }
        else
        {
            int err = mEndpointBrokers[endpointName]->bindToEndpoint(); // bind current session to endpoint
            if (err == RCF_ERROR_ENDPOINT_DOWN)
            {
                mEndpointBrokers[endpointName].reset();
            }
            return err;
        }
    }

} // namespace RCF

By viewing downloads associated with this article you agree to the Terms of Service and the article's licence.

If a file you wish to view isn't highlighted, and is a text file (not binary), please let us know and we'll add colourisation support for it.

License

This article, along with any associated source code and files, is licensed under The Code Project Open License (CPOL)


Written By
Australia Australia
Software developer, from Sweden and now living in Canberra, Australia, working on distributed C++ applications. When he is not programming, Jarl enjoys skiing and playing table tennis. He derives immense satisfaction from referring to himself in third person.

Comments and Discussions