Click here to Skip to main content
15,881,882 members
Articles / Programming Languages / C++

RCF - Interprocess Communication for C++

Rate me:
Please Sign up or sign in to vote.
4.94/5 (147 votes)
25 Oct 2011 | CPOL | 20 min read | 4.6M | 8.4K | 331
A server/client IPC framework, using the C++ preprocessor as an IDL compiler.
// uncomment to enable VLD leak detection - will automatically link to required libs
//#include "vld.h"
//#include "vldapi.h"

#include <string>

#include <boost/bind.hpp>

#include <RCF/test/TestMinimal.hpp>

#include <RCF/Idl.hpp>
#include <RCF/RcfServer.hpp>
#include <RCF/test/EndpointFactories.hpp>
#include <RCF/test/TransportFactories.hpp>
#include <RCF/util/CommandLine.hpp>

namespace Test_Minimal {

    class Echo
    {
    public:
        Echo()
        {
            RCF::Thread t( boost::bind(&Echo::setLock, this));
            t.join();
        }

        std::string echo(const std::string &s)
        {
            return s;
        }

        void stopServer()
        {
            mLockPtr->unlock();
        }

        void wait()
        {
            RCF::Lock lock(mMutex);
        }

    private:

        void setLock()
        {
            mLockPtr.reset( new RCF::Lock(mMutex));
        }

        RCF::Mutex mMutex;
        RCF::LockPtr mLockPtr;

    };

    // RCF interface definition for the test, written with RCF's IDL macros.
    // Declares the interface type I_Echo (registered under the name "I_Echo")
    // with two methods matching the Echo class above:
    //   - echo:       returns std::string, takes one const std::string &
    //   - stopServer: returns void, takes no arguments
    // The R1 / V0 suffixes presumably encode "returns a value, 1 argument" and
    // "void, 0 arguments" - confirm against RCF/Idl.hpp.
    RCF_BEGIN(I_Echo, "I_Echo")
        RCF_METHOD_R1(std::string, echo, const std::string &)
        RCF_METHOD_V0(void, stopServer)
    RCF_END(I_Echo)

} // namespace Test_Minimal

int RCF_TEST_MAIN(int argc, char **argv)
{

    printTestHeader(__FILE__);

    using namespace Test_Minimal;

    util::CommandLineOption<bool>           clServer("server", false, "act as server");
    util::CommandLineOption<bool>           clClient("client", false, "act as client");
    util::CommandLineOption<std::string>    clIp("ip", util::PortNumbers::getSingleton().getIp(), "ip");
    util::CommandLineOption<int>            clPort("port", util::PortNumbers::getSingleton().getCurrent(), "port");
    util::CommandLine::getSingleton().parse(argc, argv);

    // TODO: make ip and port options part of util::PortNumbers
    util::PortNumbers::getSingleton().setCurrent(clPort);
    util::PortNumbers::getSingleton().setIp(clIp);

    bool bServer = true;
    bool bClient = true;
    if (clServer && clClient)
    {
        bServer = true;
        bClient = true;
    }
    else if (clServer)
    {
        bServer = true;
        bClient = false;
    }
    else if (clClient)
    {
        bServer = false;
        bClient = true;
    }

    for (unsigned int i=0; i<RCF::getTransportFactories().size(); ++i)
    {

        RCF::TransportFactoryPtr transportFactoryPtr = RCF::getTransportFactories()[i];
        std::pair<RCF::ServerTransportPtr, RCF::ClientTransportAutoPtrPtr> transports = transportFactoryPtr->createTransports();
        RCF::ServerTransportPtr serverTransportPtr( transports.first );
        RCF::ClientTransportAutoPtr clientTransportAutoPtr( *transports.second );

        RCF::writeTransportTypes(std::cout, *serverTransportPtr, *clientTransportAutoPtr);

        serverTransportPtr->setMaxMessageLength(1000*10);
        clientTransportAutoPtr->setMaxMessageLength(1000*10);

        std::string s0 = "something special";

        Echo echo;
        RCF::RcfServer server( serverTransportPtr );
        server.bind( (I_Echo*) 0, echo);
        if (bServer)
        {
            if (!bClient)
            {
                RCF::I_IpServerTransport &ipServerTransport = dynamic_cast<RCF::I_IpServerTransport &>(*serverTransportPtr);
                ipServerTransport.setNetworkInterface("0.0.0.0");
            }
            server.start();
        }
        else
        {
            Platform::OS::SleepMs(1000);
        }

        if (bClient)
        {
            // 5 calls on each serialization protocol
            for(int protocol=1; protocol<10; ++protocol)
            {
                if (RCF::isSerializationProtocolSupported(protocol))
                {
                    std::cout
                        << "Serialization protocol: "
                        << RCF::getSerializationProtocolName(protocol)
                        << std::endl;

                    RcfClient<I_Echo> client( clientTransportAutoPtr->clone());
                    client.getClientStub().setSerializationProtocol(protocol);
                    for (int k=0; k<5; ++k)
                    {
                        std::string s = client.echo(s0);
                        BOOST_CHECK(s0 == s);
                    }
                }
            }

            // all calls on the same connection
            RcfClient<I_Echo> client( clientTransportAutoPtr->clone() );

            for (int j=0; j<100; ++j)
            {
                std::string s = client.echo(s0);
                BOOST_CHECK(s0 == s);
            }

            // new connection for each call
            for (int j=0; j<100; ++j)
            {
                std::string s = RcfClient<I_Echo>( clientTransportAutoPtr->clone() ).echo(s0);
                BOOST_CHECK(s0 == s);
            }

            RcfClient<I_Echo>(clientTransportAutoPtr->clone()).stopServer();
        }

        if (bServer)
        {
            echo.wait();
            server.stop();
        }
    }

    for (unsigned int i=0; i<RCF::getEndpointPairFactories().size(); ++i)
    {

        std::string s0 = "something special";

        RCF::EndpointPair endpointPair = RCF::getEndpointPairFactories()[i]->createEndpointPair();
        const RCF::I_Endpoint &serverEndpoint = *endpointPair.first;
        const RCF::I_Endpoint &clientEndpoint = *endpointPair.second;

        RCF::writeEndpointTypes(std::cout, serverEndpoint, clientEndpoint);

        Echo echo;
        RCF::RcfServer server(serverEndpoint);
        server.bind( (I_Echo*) 0, echo);
        if (bServer)
        {
            server.start();
        }
        else
        {
            Platform::OS::SleepMs(1000);
        }

        if (bClient)
        {
            // all calls on the same connection
            RcfClient<I_Echo> client( clientEndpoint );

            for (int j=0; j<100; ++j)
            {
                RCF_TRACE("")(j);
                std::string s = client.echo(s0);
                BOOST_CHECK(s0 == s);
            }

            // new connection for each call
            for (int j=0; j<100; ++j)
            {
                std::string s = RcfClient<I_Echo>( clientEndpoint ).echo(s0);
                BOOST_CHECK(s0 == s);
            }

            RcfClient<I_Echo>(clientEndpoint).stopServer();
        }

        if (bServer)
        {
            echo.wait();
            server.stop();
        }

    }

    for (unsigned int i=0; i<RCF::getEndpointPairFactories().size(); ++i)
    {
        RCF::EndpointPair endpointPair = RCF::getEndpointPairFactories()[i]->createNonListeningEndpointPair();

        RCF::EndpointPtr serverEndpointPtr(endpointPair.first);
        RCF::EndpointPtr clientEndpointPtr(endpointPair.second);

        Echo echo;
        RCF::RcfServer server( *serverEndpointPtr );
        server.bind( (I_Echo*) 0, echo);
        server.start();
        server.stop();
    }

    {
        RCF::RcfServer server( RCF::TcpEndpoint(0));
        Echo echo;
        server.bind( (I_Echo*) 0, echo);
        server.start();

        int port = dynamic_cast<RCF::I_IpServerTransport &>(
            server.getServerTransport()).getPort();
        std::string s = "asdf";
        BOOST_CHECK(s == RcfClient<I_Echo>( RCF::TcpEndpoint(port)).echo(s));

        server.stop();
    }

    return boost::exit_success;
}

/*
#include <iostream>
#include <sstream>
#include <string>
#include <vector>

#include <boost/archive/text_iarchive.hpp>
#include <boost/archive/text_oarchive.hpp>
#include <boost/serialization/vector.hpp>
#include <boost/test/minimal.hpp>

template<typename Archive>
struct X
{
    template<typename T>
        void operator<<(const T &t)
    {
        Archive &archive = (*(Archive *) NULL);
        archive << t;
    }
};

void dummy()
{
    typedef boost::archive::text_oarchive Archive;
    X<Archive> &x = * (X<Archive> *) NULL;
    std::vector<char> *pt = NULL;

    // uncomment this line to cause the test to fail
    //x << pt;
}

int test_main(int argc, char* argv[])
{
    std::vector<char> v0(25, '%');
    std::vector<char> v1;

    std::ostringstream ostr;
    boost::archive::text_oarchive(ostr) & v0;

    std::istringstream istr(ostr.str());
    boost::archive::text_iarchive(istr) & v1;

    std::cout << "v0 size: " << v0.size() << std::endl;
    std::cout << "v1 size: " << v1.size() << std::endl;
    BOOST_CHECK(v0 == v1);

    return boost::exit_success;
}
*/

By viewing downloads associated with this article you agree to the Terms of Service and the article's licence.

If a file you wish to view isn't highlighted, and is a text file (not binary), please let us know and we'll add colourisation support for it.

License

This article, along with any associated source code and files, is licensed under The Code Project Open License (CPOL)


Written By
Australia
Software developer, from Sweden and now living in Canberra, Australia, working on distributed C++ applications. When he is not programming, Jarl enjoys skiing and playing table tennis. He derives immense satisfaction from referring to himself in third person.

Comments and Discussions