Click here to Skip to main content
15,884,237 members
Articles / Programming Languages / C++

RCF - Interprocess Communication for C++

Rate me:
Please Sign up or sign in to vote.
4.94/5 (147 votes)
25 Oct 2011 | CPOL | 20 min read | 4.6M | 8.4K | 331
A server/client IPC framework, using the C++ preprocessor as an IDL compiler.
// uncomment to enable VLD leak detection - will automatically link to required libs
//#include "vld.h"
//#include "vldapi.h"

#include <cstdlib>
#include <new>
#include <string>
#include <strstream>

#include <boost/any.hpp>
#include <boost/lexical_cast.hpp>

#include <RCF/test/TestMinimal.hpp>

#include <RCF/AsyncFilter.hpp>
#include <RCF/FilterService.hpp>
#include <RCF/Idl.hpp>
#include <RCF/OpenSslEncryptionFilter.hpp>
#include <RCF/RcfServer.hpp>
#include <RCF/ZlibCompressionFilter.hpp>

#include <RCF/test/EndpointFactories.hpp>
#include <RCF/test/TransportFactories.hpp>
#include <RCF/util/CommandLine.hpp>
#include <RCF/util/Profile.hpp>

#include <SF/AdlWorkaround.hpp>

#ifdef BOOST_WINDOWS
#include <RCF/SspiFilter.hpp>
#endif

namespace Test_ZeroAllocation {

    // User-defined type whose only state is an RCF::ByteBuffer. The test uses
    // it to push a ByteBuffer through serialize() as a member of another
    // object, rather than passing the buffer directly as a call argument.
    class ContainsByteBuffer
    {
    public:
        // The wrapped buffer; the only member handed to the archive.
        RCF::ByteBuffer mByteBuffer;

        // Serialization hook: applies the archive's operator& to mByteBuffer.
        // The second (unnamed) parameter is not used.
        template<typename ArchiveT>
        void serialize(ArchiveT &ar, const unsigned int)
        {
            ar & mByteBuffer;
        }
    };

    // Servant class bound to the I_Echo interface by the test's main routine.
    // Every overload simply hands one of its arguments back to the caller.
    class Echo
    {
    public:
        // Returns a copy of s.
        std::string echo(const std::string &s)
        {
            return s;
        }

        // Returns byteBuffer2. byteBuffer1 and s are accepted but otherwise
        // ignored.
        RCF::ByteBuffer echo(RCF::ByteBuffer byteBuffer1, const std::string &s, RCF::ByteBuffer byteBuffer2)
        {
            // Explicitly mark the ignored arguments as unused so that the
            // intent is clear and no unused-parameter warnings are emitted.
            static_cast<void>(byteBuffer1);
            static_cast<void>(s);

            return byteBuffer2;
        }

        // Returns the buffer it was given.
        RCF::ByteBuffer echo(RCF::ByteBuffer byteBuffer)
        {
            return byteBuffer;
        }

        // Returns the ContainsByteBuffer object it was given.
        ContainsByteBuffer echo(ContainsByteBuffer c)
        {
            return c;
        }

    };

    // RCF interface declaration for I_Echo (declared with the string
    // "I_Echo"). There is one RCF_METHOD_* line per Echo::echo overload; the
    // numeric suffix of each macro matches the number of argument types listed
    // after the method name.
    // NOTE(review): the 3-argument overload lists its second argument as
    // std::string by value, whereas Echo::echo takes const std::string & and
    // the 1-argument overload here uses const std::string & - presumably
    // equivalent for marshalling purposes; confirm against the RCF docs.
    RCF_BEGIN(I_Echo, "I_Echo")
        RCF_METHOD_R1(std::string, echo, const std::string &)
        RCF_METHOD_R3(RCF::ByteBuffer, echo, RCF::ByteBuffer, std::string, RCF::ByteBuffer)
        RCF_METHOD_R1(RCF::ByteBuffer, echo, RCF::ByteBuffer)
        RCF_METHOD_R1(ContainsByteBuffer, echo, ContainsByteBuffer)
    RCF_END(I_Echo)

    // Read by the replaced global operator new: while this is false, any
    // allocation is reported as a BOOST_CHECK failure. The test sets it to
    // false around the remote calls that are required not to allocate.
    // NOTE(review): this and gnAllocations are plain globals with no
    // synchronization; if the server allocates on its own threads they are
    // accessed concurrently - confirm whether that matters for this test.
    bool gExpectAllocations = true;
    // Running count of calls made to the replaced global operator new.
    std::size_t gnAllocations = 0;

} // namespace Test_ZeroAllocation

// User-defined replacement for the global operator new.
//
// Each call is counted in Test_ZeroAllocation::gnAllocations, and a check
// failure is recorded if the call happens while
// Test_ZeroAllocation::gExpectAllocations is false.
//
// Returns a non-null pointer obtained from malloc(); throws std::bad_alloc if
// malloc() cannot satisfy the request.
void *operator new(size_t bytes)
{
    BOOST_CHECK(Test_ZeroAllocation::gExpectAllocations);
    ++Test_ZeroAllocation::gnAllocations;

    // malloc(0) is permitted to return a null pointer, but operator new has to
    // return a valid, non-null pointer even for zero-sized requests.
    if (bytes == 0)
    {
        bytes = 1;
    }

    void *pv = malloc(bytes);
    if (!pv)
    {
        // The throwing form of operator new must never return null; callers
        // (including compiler-generated new-expressions) do not check for it.
        throw std::bad_alloc();
    }
    return pv;
}

// Replacement for the global operator delete. It is the counterpart of the
// malloc()-based operator new in this file, so it releases the block with
// free(). Never throws.
void operator delete(void *block) throw()
{
    free(block);
}

// NOTE(review): presumably supplies type-traits information for
// ContainsByteBuffer on compilers whose template support is incomplete -
// confirm against the macro's definition in the RCF headers.
RCF_BROKEN_COMPILER_TYPE_TRAITS_SPECIALIZATION(Test_ZeroAllocation::ContainsByteBuffer)

// NOTE(review): presumably an argument-dependent-lookup workaround that lets
// the SF serialization code find the overloads for RCF::ByteBuffer - confirm
// against SF/AdlWorkaround.hpp.
SF_ADL_WORKAROUND(RCF, ByteBuffer)

// Test entry point.
//
// For every transport factory returned by RCF::getTransportFactories(), this
// starts an RcfServer exposing an Echo object through I_Echo and then makes
// echo calls from a client under several transport/message filter
// combinations. Around the calls that must not allocate, gExpectAllocations
// is set to false so that the replaced global operator new records a check
// failure on any allocation.
//
// argc/argv are parsed for the ip, port and OpenSSL certificate options
// declared below. On builds without BOOST_WINDOWS (and on MinGW gcc 3.4) a
// check failure is recorded and the function returns straight away.
//
// Returns boost::exit_success; failures are reported through BOOST_CHECK.
int RCF_TEST_MAIN(int argc, char **argv)
{

    printTestHeader(__FILE__);

    using namespace Test_ZeroAllocation;

    util::CommandLineOption<std::string>    clIp("ip", util::PortNumbers::getSingleton().getIp(), "ip");
    util::CommandLineOption<int>            clPort("port", util::PortNumbers::getSingleton().getCurrent(), "port");
    util::CommandLineOption<std::string>    clScert("scert", TEMP_DIR "ssCert2.pem", "OpenSSL server certificate");
    util::CommandLineOption<std::string>    clSpwd("spwd", "mt2316", "OpenSSL server certificate password");
    util::CommandLineOption<std::string>    clCcert("ccert", TEMP_DIR "ssCert1.pem", "OpenSSL client certificate");
    util::CommandLineOption<std::string>    clCpwd("cpwd", "mt2316", "OpenSSL client certificate password");
    util::CommandLine::getSingleton().parse(argc, argv);

    // TODO: make ip and port options part of util::PortNumbers
    util::PortNumbers::getSingleton().setCurrent(clPort);
    util::PortNumbers::getSingleton().setIp(clIp);

#if !defined(BOOST_WINDOWS)
    BOOST_CHECK(1 == 0 && "Zero-allocation not yet supported with asio transports");
    return boost::exit_success;
#endif

#if defined(BOOST_WINDOWS) && defined(__MINGW32__) && __GNUC__ == 3 && __GNUC_MINOR__ == 4
    BOOST_CHECK(1 == 0 && "Zero-allocation not working in gcc 3.4 for unknown reasons");
    return boost::exit_success;
#endif

    // TODO: run this test over all transports
    for (unsigned int i=0; i<RCF::getTransportFactories().size(); ++i)
    {

        RCF::TransportFactoryPtr transportFactoryPtr( RCF::getTransportFactories()[i] );
        std::pair<RCF::ServerTransportPtr, RCF::ClientTransportAutoPtrPtr> transports = transportFactoryPtr->createTransports();
        RCF::ServerTransportPtr serverTransportPtr( transports.first );
        RCF::ClientTransportAutoPtr clientTransportAutoPtr( *transports.second );
        // Transport filters are only requested on connection-oriented transports.
        bool transportFiltersSupported = transportFactoryPtr->isConnectionOriented();

#ifdef RCF_USE_BOOST_ASIO
        // TODO
        if (typeid(*serverTransportPtr) == typeid(RCF::TcpAsioServerTransport))
        {
            // still got some work to do on zero allocation
            continue;
        }
#endif

        RCF::writeTransportTypes(std::cout, *serverTransportPtr, *clientTransportAutoPtr);
        std::string transportDesc = "Transport " + boost::lexical_cast<std::string>(i) + ": ";

        serverTransportPtr->setMaxMessageLength(-1);
        clientTransportAutoPtr->setMaxMessageLength(-1);

        Echo echo;
        RCF::RcfServer server(serverTransportPtr);
        server.bind( (I_Echo*) 0, echo);

        // Register a factory for every filter type the client requests below.
        RCF::FilterServicePtr filterServicePtr(new RCF::FilterService());
        filterServicePtr->addFilterFactory( RCF::FilterFactoryPtr( new RCF::IdentityFilterFactory()));
        filterServicePtr->addFilterFactory( RCF::FilterFactoryPtr( new RCF::XorFilterFactory()));
        filterServicePtr->addFilterFactory( RCF::FilterFactoryPtr( new RCF::ZlibStatefulCompressionFilterFactory()));
        filterServicePtr->addFilterFactory( RCF::FilterFactoryPtr( new RCF::ZlibStatelessCompressionFilterFactory()));
        filterServicePtr->addFilterFactory( RCF::FilterFactoryPtr( new RCF::OpenSslEncryptionFilterFactory(clScert, clSpwd)));
#ifdef BOOST_WINDOWS
        filterServicePtr->addFilterFactory( RCF::FilterFactoryPtr( new RCF::SspiNtlmFilterFactory()));
#endif
        server.addService(filterServicePtr);

        server.start();

        // make sure all allocations have taken place
        Platform::OS::SleepMs(1000);

        RcfClient<I_Echo> client(clientTransportAutoPtr->clone());

        //client.getClientStub().setRemoteCallTimeoutMs(1000*60*60);

        {
            // Sanity check: a plain new-expression must bump gnAllocations,
            // i.e. the replaced operator new is the one actually in use.
            std::size_t nAllocations = gnAllocations;
            std::auto_ptr<int> apn(new int(17));
            apn.reset();
            BOOST_CHECK(gnAllocations != nAllocations);
        }

        {
            std::string s = "asdfasdfasdfasdfasdfasdfasdfasdfasdfasdf";

            // Take a mutable pointer through &s[0] instead of casting away the
            // constness of s.c_str(): the buffer can be transformed in place
            // by the filters used below, and writing through the pointer
            // returned by c_str() is undefined behavior.
            RCF::ByteBuffer byteBuffer0( &s[0], s.length());

            std::vector<RCF::FilterPtr> filters;

            // with no transport or payload filters

            // prime the pump (first call is made while allocations are still allowed)
            client.echo(byteBuffer0);
            Platform::OS::SleepMs(1000);

            {
                std::string s0 = byteBuffer0.string();
                gExpectAllocations = false;
                RCF::ByteBuffer byteBuffer1 = client.echo(byteBuffer0);
                gExpectAllocations = true;
                std::string s1 = byteBuffer1.string();
                BOOST_CHECK(s0 == s1);
            }

            {
                util::Profile profile(transportDesc + "1000 calls, no dynamic allocations");
                gExpectAllocations = false;
                for(unsigned int call=0; call<1000; ++call)
                {
                    RCF::ByteBuffer byteBuffer1 = client.echo(byteBuffer0);
                }
                gExpectAllocations = true;
            }

            // with both transport and payload filters

            if (transportFiltersSupported)
            {
                filters.clear();
                filters.push_back( RCF::FilterPtr( new RCF::XorFilter()));
                filters.push_back( RCF::FilterPtr( new RCF::XorFilter()));
                filters.push_back( RCF::FilterPtr( new RCF::XorFilter()));
                client.getClientStub().requestTransportFilters(filters);
            }

            filters.clear();
            filters.push_back( RCF::FilterPtr( new RCF::XorFilter()));
            filters.push_back( RCF::FilterPtr( new RCF::XorFilter()));
            filters.push_back( RCF::FilterPtr( new RCF::XorFilter()));
            client.getClientStub().setMessageFilters(filters);

            // prime the pump
            client.echo(byteBuffer0);
            Platform::OS::SleepMs(1000);

            for (int pass=0; pass<3; ++pass)
            {
                // byteBuffer0 will be transformed in place, so we need to be a bit careful with the before/after comparison.
                // s0 and s1 will change on each pass.

                std::string s0 = byteBuffer0.string();
                gExpectAllocations = false;
                RCF::ByteBuffer byteBuffer1 = client.echo(byteBuffer0);
                gExpectAllocations = true;
                std::string s1 = byteBuffer1.string();
                BOOST_CHECK(s0 == s1);
            }

            {
                util::Profile profile(transportDesc + "1000 calls, no dynamic allocations, 3 transport filters + 3 payload filters");
                gExpectAllocations = false;
                for(unsigned int call=0; call<1000; ++call)
                {
                    RCF::ByteBuffer byteBuffer1 = client.echo(byteBuffer0);
                }
                gExpectAllocations = true;
            }

            // switch the payload filters to a single stateless zlib filter
            filters.clear();
            filters.push_back( RCF::FilterPtr( new RCF::ZlibStatelessCompressionFilter()));
            client.getClientStub().setMessageFilters(filters);
            client.echo(byteBuffer0);

            {
                util::Profile profile(transportDesc + "1000 calls, no dynamic allocations, <zlib stateless> payload filters");
                gExpectAllocations = false;
                for(unsigned int call=0; call<1000; ++call)
                {
                    RCF::ByteBuffer byteBuffer1 = client.echo(byteBuffer0);
                }
                gExpectAllocations = true;
            }


            if (transportFiltersSupported)
            {

                filters.clear();
                filters.push_back( RCF::FilterPtr( new RCF::ZlibStatefulCompressionFilter()));
                filters.push_back( RCF::FilterPtr( new RCF::OpenSslEncryptionFilter(clCcert, clCpwd)));
                client.getClientStub().requestTransportFilters(filters);

                client.echo(byteBuffer0);

                util::Profile profile(transportDesc + "1000 calls, <zlib stateful><OpenSSL> transport filter");
                gExpectAllocations = false;
                for(unsigned int call=0; call<1000; ++call)
                {
                    RCF::ByteBuffer byteBuffer1 = client.echo(byteBuffer0);
                }
                gExpectAllocations = true;
            }

#ifdef BOOST_WINDOWS

            if (transportFiltersSupported)
            {
                filters.clear();
                filters.push_back( RCF::FilterPtr( new RCF::ZlibStatefulCompressionFilter()));
                filters.push_back( RCF::FilterPtr( new RCF::SspiNtlmFilter()));
                client.getClientStub().requestTransportFilters(filters);
                client.echo(byteBuffer0);

                util::Profile profile(transportDesc + "1000 calls, <zlib stateful><sspi ntlm> transport filter");
                gExpectAllocations = false;
                for(unsigned int call=0; call<1000; ++call)
                {
                    RCF::ByteBuffer byteBuffer1 = client.echo(byteBuffer0);
                }
                gExpectAllocations = true;
            }

#endif

            {
                // try serialization (not marshalling) of ByteBuffer
                ContainsByteBuffer c1;
                c1.mByteBuffer = byteBuffer0;
                // first call is made with allocations allowed, the second one must not allocate
                ContainsByteBuffer c2 = client.echo(c1);

                gExpectAllocations = false;
                c2.mByteBuffer.clear();
                c2 = client.echo(c1);
                gExpectAllocations = true;
            }

            {
                // try serialization (not marshalling) of ByteBuffer with all serialization protocols
                for(int protocol=1; protocol<10; ++protocol)
                {
                    if (RCF::isSerializationProtocolSupported(protocol))
                    {
                        client.getClientStub().setSerializationProtocol(protocol);

                        ContainsByteBuffer c1;
                        c1.mByteBuffer = byteBuffer0;
                        ContainsByteBuffer c2 = client.echo(c1);
                        BOOST_CHECK(c2.mByteBuffer.getLength() == c1.mByteBuffer.getLength());

                        // will get memory allocations here when using boost serialization
                        c2.mByteBuffer.clear();
                        c2 = client.echo(c1);
                        BOOST_CHECK(c2.mByteBuffer.getLength() == c1.mByteBuffer.getLength());
                    }
                }
            }

        }
        server.stop();
    }

    return boost::exit_success;
}

By viewing downloads associated with this article you agree to the Terms of Service and the article's licence.

If a file you wish to view isn't highlighted, and is a text file (not binary), please let us know and we'll add colourisation support for it.

License

This article, along with any associated source code and files, is licensed under The Code Project Open License (CPOL)


Written By
Australia
Software developer, from Sweden and now living in Canberra, Australia, working on distributed C++ applications. When he is not programming, Jarl enjoys skiing and playing table tennis. He derives immense satisfaction from referring to himself in third person.

Comments and Discussions