Click here to Skip to main content
15,895,864 members
Articles / Programming Languages / C++

RCF - Interprocess Communication for C++

Rate me:
Please Sign up or sign in to vote.
4.94/5 (147 votes)
25 Oct 2011 | CPOL | 20 min read | 4.6M | 8.4K | 331
A server/client IPC framework, using the C++ preprocessor as an IDL compiler.
#include <string>
#include <boost/lexical_cast.hpp>

#include <RCF/test/TestMinimal.hpp>

#include <RCF/FilterService.hpp>
#include <RCF/Idl.hpp>
#include <RCF/RcfServer.hpp>
#include <RCF/TcpEndpoint.hpp>

#include <RCF/test/EndpointFactories.hpp>
#include <RCF/test/TransportFactories.hpp>

#include "Test_Performance.hpp"

#include <RCF/util/AutoBuild.hpp>
#include <RCF/util/CommandLine.hpp>
#include <RCF/util/PortNumbers.hpp>
#include <RCF/util/Profile.hpp>

#ifdef RCF_USE_OPENSSL
#include <RCF/OpenSslEncryptionFilter.hpp>
#endif

#ifdef RCF_USE_ZLIB
#include <RCF/ZlibCompressionFilter.hpp>
#endif

#ifdef BOOST_WINDOWS
#include <RCF/SspiFilter.hpp>
#endif

namespace Test_Performance {

    void testUdp(const std::string &title, const std::string &ip, int port, bool server, bool client, int calls, int buflen)
    {
        RCF_ASSERT(!(server && client));

        int ret = 0;

        sockaddr_in clientAddr = {0};
        clientAddr.sin_family = AF_INET;
        clientAddr.sin_port = 0;
        clientAddr.sin_addr.s_addr = inet_addr(ip.c_str());//INADDR_ANY;

        sockaddr_in serverAddr = {0};
        serverAddr.sin_family = AF_INET;
        serverAddr.sin_port = htons(port);
        serverAddr.sin_addr.s_addr = inet_addr(ip.c_str());//INADDR_ANY;

        sockaddr_in toAddr = {0};
        toAddr.sin_family = AF_INET;
        toAddr.sin_port = htons(port);
        toAddr.sin_addr.s_addr = inet_addr(ip.c_str());

        sockaddr_in fromAddr = {0};
        int fromAddrLen = sizeof(fromAddr);

        int socket1 = socket( AF_INET, SOCK_DGRAM, 0 );

        int packetsSent = 0;
        int packetsReceived = 0;
        int bytesSent = 0;
        int bytesReceived = 0;

        if (client)
        {
            ret = bind(socket1, (sockaddr *)&clientAddr, sizeof(clientAddr));
            RCF_VERIFY(ret == 0, std::runtime_error("bind()"));

            std::string msg(buflen, 'A');
            std::vector<char> buffer(buflen);

            {
                util::Profile profile( title + "raw udp round trips, " + boost::lexical_cast<std::string>(calls) + " calls");
                for (int i=0; i<calls; ++i)
                {
                    ret = Platform::OS::BsdSockets::sendto(socket1, msg.c_str(), msg.size(), 0, (sockaddr *)&toAddr, sizeof(toAddr));
                    RCF_VERIFY(ret > 0, std::runtime_error("sendto()"));
                    bytesSent += ret;
                    packetsSent += 1;

                    ret = Platform::OS::BsdSockets::recvfrom(socket1, &buffer[0], buffer.size(), 0, (sockaddr *)&fromAddr, &fromAddrLen);
                    RCF_VERIFY(ret > 0, std::runtime_error("recvfrom()"));
                    bytesReceived += ret;
                    packetsReceived += 1;
                }
            }

            std::cout << "Packets sent: "       << packetsSent      << std::endl;
            std::cout << "Bytes sent: "         << bytesSent        << std::endl;
            std::cout << "Packets received: "   << packetsReceived  << std::endl;
            std::cout << "Bytes received: "     << bytesReceived    << std::endl;
        }
        else if (server)
        {
            ret = bind(socket1, (sockaddr *)&serverAddr, sizeof(serverAddr));
            RCF_VERIFY(ret == 0, std::runtime_error("bind()"));

            std::vector<char> buffer(buflen);

            //while (true)
            for (int i=0; i<calls; ++i)
            {
                ret = Platform::OS::BsdSockets::recvfrom(socket1, &buffer[0], buffer.size(), 0, (sockaddr *)&fromAddr, &fromAddrLen);
                RCF_VERIFY(ret > 0, std::runtime_error("recvfrom()"));
                bytesSent += ret;
                packetsSent += 1;

                ret = Platform::OS::BsdSockets::sendto(socket1, &buffer[0], ret, 0, (sockaddr *)&fromAddr, fromAddrLen);
                RCF_VERIFY(ret > 0, std::runtime_error("sendto()"));
                bytesReceived += ret;
                packetsReceived += 1;
            }
        }

        Platform::OS::BsdSockets::closesocket(socket1);
    }

    void testTcp(const std::string &title, const std::string &ip, int port, bool server, bool client, int calls, int buflen)
    {
        RCF_ASSERT(!(server && client));

        int ret = 0;
    /*
        sockaddr_in clientAddr = {0};
        clientAddr.sin_family = AF_INET;
        clientAddr.sin_port = 0;
        clientAddr.sin_addr.s_addr = INADDR_ANY;
    */
        sockaddr_in serverAddr = {0};
        serverAddr.sin_family = AF_INET;
        serverAddr.sin_port = htons(port);
        serverAddr.sin_addr.s_addr = inet_addr(ip.c_str());//INADDR_ANY;

        sockaddr_in toAddr = {0};
        toAddr.sin_family = AF_INET;
        toAddr.sin_port = htons(port);
        toAddr.sin_addr.s_addr = inet_addr(ip.c_str());

        sockaddr_in fromAddr = {0};
        int fromAddrLen = sizeof(fromAddr);

        int socket1 = ::socket(PF_INET, SOCK_STREAM, IPPROTO_TCP);

        int packetsSent = 0;
        int packetsReceived = 0;
        int bytesSent = 0;
        int bytesReceived = 0;

        if (client)
        {
            ret = Platform::OS::BsdSockets::connect(socket1, (sockaddr *)&toAddr, sizeof(toAddr));
            RCF_VERIFY(ret == 0, std::runtime_error("bind()"));

            std::string msg(buflen, 'A');
            std::vector<char> buffer(buflen);

            {
                util::Profile profile( title + "raw tcp round trips, " + boost::lexical_cast<std::string>(calls) + " calls");
                for (int i=0; i<calls; ++i)
                {
                    ret = Platform::OS::BsdSockets::send(socket1, msg.c_str(), msg.size(), 0);
                    RCF_VERIFY(ret == buflen, std::runtime_error("send()"))(ret);
                    bytesSent += ret;
                    packetsSent += 1;

                    ret = Platform::OS::BsdSockets::recv(socket1, &buffer[0], buffer.size(), 0);
                    RCF_VERIFY(ret == buflen, std::runtime_error("recv()"))(ret);
                    bytesReceived += ret;
                    packetsReceived += 1;
                }
            }

            std::cout << "Packets sent: "       << packetsSent      << std::endl;
            std::cout << "Bytes sent: "         << bytesSent        << std::endl;
            std::cout << "Packets received: "   << packetsReceived  << std::endl;
            std::cout << "Bytes received: "     << bytesReceived    << std::endl;
        }
        else if (server)
        {
            ret = bind(socket1, (struct sockaddr*) &serverAddr, sizeof(serverAddr));
            RCF_VERIFY(ret == 0, std::runtime_error("bind()"));

            ret = listen(socket1, 10);
            RCF_VERIFY(ret == 0, std::runtime_error("listen()"));

            std::vector<char> buffer(buflen);

            //while (true)
            {
                int socket2 = Platform::OS::BsdSockets::accept(socket1, (sockaddr *)&fromAddr, &fromAddrLen);
                RCF_VERIFY(socket2 > 0, std::runtime_error("accept()"))(socket2);

                //while (true)
                for (int i=0; i<calls; ++i)
                {
                    ret = Platform::OS::BsdSockets::recv(socket2, &buffer[0], buffer.size(), 0);
                    RCF_VERIFY(ret == 0 || ret == buflen, std::runtime_error("recv()"))(ret);

                    if (ret == 0)
                    {
                        Platform::OS::BsdSockets::closesocket(socket2);
                        break;
                    }

                    bytesSent += ret;
                    packetsSent += 1;

                    ret = Platform::OS::BsdSockets::send(socket2, &buffer[0], ret, 0);
                    RCF_VERIFY(ret == buflen, std::runtime_error("send()"))(ret);

                    bytesReceived += ret;
                    packetsReceived += 1;
                }
            }
        }

        Platform::OS::BsdSockets::closesocket(socket1);
    }

} // namespace Test_Performance

int RCF_TEST_MAIN(int argc, char **argv)
{

    printTestHeader(__FILE__);

    using namespace Test_Performance;

    util::CommandLineOption<bool>           clServer("server", false, "act as server");
    util::CommandLineOption<bool>           clClient("client", false, "act as client");
    util::CommandLineOption<std::string>    clIp("ip", util::PortNumbers::getSingleton().getIp(), "ip");
    util::CommandLineOption<int>            clPort("port", util::PortNumbers::getSingleton().getCurrent(), "port");
    util::CommandLineOption<std::string>    clScert("scert", TEMP_DIR "ssCert2.pem", "OpenSSL server certificate");
    util::CommandLineOption<std::string>    clSpwd("spwd", "mt2316", "OpenSSL server certificate password");
    util::CommandLineOption<std::string>    clCcert("ccert", TEMP_DIR "ssCert1.pem", "OpenSSL client certificate");
    util::CommandLineOption<std::string>    clCpwd("cpwd", "mt2316", "OpenSSL client certificate password");
    util::CommandLineOption<int>            clCalls( "calls", 1000, "number of calls");
    util::CommandLineOption<int>            clCallsLarge( "Calls", 1, "number of large message-size calls");
    util::CommandLineOption<int>            clProtocol("protocol", 1, "serialization protocol (1-4)");
    util::CommandLineOption<int>            clTest( "test", 0, "which test to run, 0 to run them all");
    util::CommandLineOption<int>            clLargeMessageSize("msgsize", 25, "large message size, in Mb");
    util::CommandLineOption<int>            clRawMessageSize("rawmsgsize", 50, "raw message size, in bytes, for non-RCF tcp and udp tests");
    util::CommandLine::getSingleton().parse(argc, argv);

    std::string clientCertificateFile           = clCcert;
    std::string clientCertificateFilePassword   = clCpwd;
    std::string serverCertificateFile           = clScert;
    std::string serverCertificateFilePassword   = clSpwd;

    int calls           = clCalls;
    int callsLarge      = clCallsLarge;
    int requestedTest   = clTest;
    int currentTest     = 0;

    util::PortNumbers::getSingleton().setCurrent(clPort);
    util::PortNumbers::getSingleton().setIp(clIp);

    if (clServer && clClient)
    {
        gServer = true;
        gClient = true;
    }
    else if (clServer)
    {
        gServer = true;
        gClient = false;
    }
    else if (clClient)
    {
        gServer = false;
        gClient = true;
    }

    if (requestedTest == 0)
    {
        // test all serialization protocols over a TCP transport
        for (int protocol=0; protocol<10; ++protocol)
        {
            if (RCF::isSerializationProtocolSupported(protocol))
            {
                RCF::EndpointPair endpointPair = RCF::TcpEndpointFactory().createEndpointPair();
                RCF::ClientTransportAutoPtr clientTransportPtr(endpointPair.first->createClientTransport());
                RCF::ServerTransportPtr serverTransportPtr(endpointPair.second->createServerTransport().release());
                std::string title = "TcpEndpoint transport: ";

                runPerformanceTest(
                    title,
                    clientTransportPtr,
                    serverTransportPtr,
                    RCF::Twoway,
                    "",
                    protocol,
                    std::vector<RCF::FilterFactoryPtr>(),
                    std::vector<RCF::FilterPtr>(),
                    std::vector<RCF::FilterPtr>(),
                    calls);
            }
        }
    }

    // test tcp network performance, without RCF involvement
    ++currentTest;
    if (requestedTest == 0 || requestedTest == currentTest)
    {
        std::string ip = util::PortNumbers::getSingleton().getIp();
        int port = util::PortNumbers::getSingleton().getNext();
        int buflen = clRawMessageSize;
        std::string title = "Test " + boost::lexical_cast<std::string>(currentTest) + ": ";
        if (gServer && gClient)
        {
            RCF::ThreadPtr threadPtr( new RCF::Thread( boost::bind(&testTcp, title, ip, port, true, false, calls, buflen)));
            Platform::OS::SleepMs(1000);
            testTcp(title, ip, port, false, true, calls, buflen);
            threadPtr->join();
        }
        else if (gServer)
        {
            testTcp(title, ip, port, true, false, calls, buflen);

        }
        else if (gClient)
        {
            testTcp(title, ip, port, false, true, calls, buflen);
        }
    }

    // test udp network performance, without RCF involvement
    ++currentTest;
    if (requestedTest == 0 || requestedTest == currentTest)
    {
        std::string ip = util::PortNumbers::getSingleton().getIp();
        int port = util::PortNumbers::getSingleton().getNext();
        int buflen = clRawMessageSize;
        std::string title = "Test " + boost::lexical_cast<std::string>(currentTest) + ": ";
        if (gServer && gClient)
        {
            RCF::ThreadPtr threadPtr( new RCF::Thread( boost::bind(&testUdp, title, ip, port, true, false, calls, buflen)));
            Platform::OS::SleepMs(1000);
            testUdp(title, ip, port, false, true, calls, buflen);
            threadPtr->join();
        }
        else if (gServer)
        {
            testUdp(title, ip, port, true, false, calls, buflen);

        }
        else if (gClient)
        {
            testUdp(title, ip, port, false, true, calls, buflen);
        }
    }

    // test all transports, against the standard serialization protocol
    for (unsigned int i=0; i<RCF::getTransportFactories().size(); ++i)
    {
        RCF::TransportFactoryPtr transportFactoryPtr = RCF::getTransportFactories()[i];
        std::pair<RCF::ServerTransportPtr, RCF::ClientTransportAutoPtrPtr> transports = transportFactoryPtr->createTransports();
        RCF::ServerTransportPtr serverTransportPtr( transports.first );
        RCF::ClientTransportAutoPtr clientTransportAutoPtr( *transports.second );

        std::string s0 = "ABCDEFGHIJKLMNOPQRSTUVWXYZ";
        s0 = s0 + s0;
        s0 = s0 + s0;
        s0 = s0 + s0;

        int serializationProtocol = 1;

        ++currentTest;
        if (requestedTest == 0 || requestedTest == currentTest)
        {
            std::string title = "Test " + boost::lexical_cast<std::string>(currentTest) + ": ";
            runPerformanceTest(
                title,
                clientTransportAutoPtr,
                serverTransportPtr,
                RCF::Twoway,
                s0,
                serializationProtocol,
                std::vector<RCF::FilterFactoryPtr>(),//filterFactories,
                std::vector<RCF::FilterPtr>(),
                std::vector<RCF::FilterPtr>(),
                calls);
        }

    }

    std::string s0 = "ABCDEFGHIJKLMNOPQRSTUVWXYZ";
    s0 = s0 + s0;
    s0 = s0 + s0;
    s0 = s0 + s0;

    int serializationProtocol = clProtocol;

    // test compression and encryption filters, over TCP
    {
        std::vector<std::vector<RCF::FilterPtr> > payloadFilterChains;
        std::vector<RCF::FilterPtr> payloadFilterChain;

#ifdef RCF_USE_ZLIB
        payloadFilterChain.clear();
        payloadFilterChain.push_back( RCF::FilterPtr(new RCF::ZlibStatefulCompressionFilter()) );
        payloadFilterChains.push_back(payloadFilterChain);

        payloadFilterChain.clear();
        payloadFilterChain.push_back( RCF::FilterPtr(new RCF::ZlibStatelessCompressionFilter()) );
        payloadFilterChains.push_back(payloadFilterChain);
#endif

        std::vector<std::vector<RCF::FilterPtr> > transportFilterChains;
        std::vector<RCF::FilterPtr> transportFilterChain;

#ifdef RCF_USE_ZLIB
        transportFilterChain.clear();
        transportFilterChain.push_back( RCF::FilterPtr(new RCF::ZlibStatefulCompressionFilter()) );
        transportFilterChains.push_back(transportFilterChain);

        transportFilterChain.clear();
        transportFilterChain.push_back( RCF::FilterPtr(new RCF::ZlibStatelessCompressionFilter()) );
        transportFilterChains.push_back(transportFilterChain);
#endif

#ifdef RCF_USE_OPENSSL
        transportFilterChain.clear();
        transportFilterChain.push_back( RCF::FilterPtr(new RCF::OpenSslEncryptionFilter(clientCertificateFile, clientCertificateFilePassword)) );
        transportFilterChains.push_back(transportFilterChain);
#endif

#if defined(RCF_USE_ZLIB) && defined(RCF_USE_OPENSSL)
        transportFilterChain.clear();
        transportFilterChain.push_back( RCF::FilterPtr(new RCF::ZlibStatelessCompressionFilter()) );
        transportFilterChain.push_back( RCF::FilterPtr(new RCF::OpenSslEncryptionFilter(clientCertificateFile, clientCertificateFilePassword)) );
        transportFilterChains.push_back(transportFilterChain);

        transportFilterChain.clear();
        transportFilterChain.push_back( RCF::FilterPtr(new RCF::ZlibStatefulCompressionFilter()) );
        transportFilterChain.push_back( RCF::FilterPtr(new RCF::OpenSslEncryptionFilter(clientCertificateFile, clientCertificateFilePassword)) );
        transportFilterChains.push_back(transportFilterChain);
#endif

#ifdef BOOST_WINDOWS
        transportFilterChain.clear();
        transportFilterChain.push_back( RCF::FilterPtr(new RCF::SspiNtlmFilter()));
        transportFilterChains.push_back(transportFilterChain);
#endif

        std::vector<RCF::FilterFactoryPtr> filterFactories;

#ifdef RCF_USE_ZLIB
        filterFactories.push_back(RCF::FilterFactoryPtr( new RCF::ZlibStatelessCompressionFilterFactory() ));
        filterFactories.push_back(RCF::FilterFactoryPtr( new RCF::ZlibStatefulCompressionFilterFactory() ));
#endif

#ifdef RCF_USE_OPENSSL
        filterFactories.push_back(RCF::FilterFactoryPtr( new RCF::OpenSslEncryptionFilterFactory(serverCertificateFile, serverCertificateFilePassword) ));
#endif

#ifdef BOOST_WINDOWS
        filterFactories.push_back(RCF::FilterFactoryPtr( new RCF::SspiNtlmFilterFactory()));
#endif

        // test payload filters
        for (unsigned int i=0; i<payloadFilterChains.size(); ++i)
        {
            RCF::TcpEndpoint tcpEndpoint("localhost", util::PortNumbers::getSingleton().getNext());
            RCF::ServerTransportPtr serverTransportPtr( tcpEndpoint.createServerTransport().release() );
            RCF::ClientTransportAutoPtr clientTransportAutoPtr( tcpEndpoint.createClientTransport() );

            ++currentTest;
            if (requestedTest == 0 || requestedTest == currentTest)
            {
                std::string title = "Test " + boost::lexical_cast<std::string>(currentTest) + ": ";
                runPerformanceTest(
                    title,
                    clientTransportAutoPtr,
                    serverTransportPtr,
                    RCF::Twoway,
                    s0,
                    serializationProtocol,
                    filterFactories,
                    payloadFilterChains[i],
                    std::vector<RCF::FilterPtr>(),
                    calls);
            }
        }

        // test transport filters
        for (unsigned int i=0; i<transportFilterChains.size(); ++i)
        {
            RCF::TcpEndpoint tcpEndpoint("localhost", util::PortNumbers::getSingleton().getNext());
            RCF::ServerTransportPtr serverTransportPtr( tcpEndpoint.createServerTransport().release() );
            RCF::ClientTransportAutoPtr clientTransportAutoPtr( tcpEndpoint.createClientTransport() );

            ++currentTest;
            if (requestedTest == 0 || requestedTest == currentTest)
            {
                std::string title = "Test " + boost::lexical_cast<std::string>(currentTest) + ": ";
                runPerformanceTest(
                    title,
                    clientTransportAutoPtr,
                    serverTransportPtr,
                    RCF::Twoway,
                    s0,
                    serializationProtocol,
                    filterFactories,
                    std::vector<RCF::FilterPtr>(),
                    transportFilterChains[i],
                    calls);
            }
        }
    }

    // test transmission of large messages
    {
        int len = 1024*1024*clLargeMessageSize;
        std::vector<char> buffer(len);
        memset(&buffer[0], 1, len);
        std::string s0(&buffer[0], len);

        ++currentTest;
        if (requestedTest == 0 || requestedTest == currentTest)
        {
            RCF::TcpEndpoint tcpEndpoint("localhost", util::PortNumbers::getSingleton().getNext());
            RCF::ServerTransportPtr serverTransportPtr( tcpEndpoint.createServerTransport().release() );
            RCF::ClientTransportAutoPtr clientTransportAutoPtr( tcpEndpoint.createClientTransport() );

            serverTransportPtr->setMaxMessageLength(-1);
            clientTransportAutoPtr->setMaxMessageLength(-1);

            unsigned int timeoutMs = 1000*3600;
            std::string title = "Test " + boost::lexical_cast<std::string>(currentTest) + ": ";
            runPerformanceTest(
                title,
                clientTransportAutoPtr,
                serverTransportPtr,
                RCF::Twoway,
                s0,
                serializationProtocol,
                std::vector<RCF::FilterFactoryPtr>(),
                std::vector<RCF::FilterPtr>(),
                std::vector<RCF::FilterPtr>(),
                callsLarge,
                timeoutMs,
                false);
        }
    }

    // test oneway invocations - tcp
    {
        RCF::TcpEndpoint tcpEndpoint("localhost", util::PortNumbers::getSingleton().getNext());
        RCF::ServerTransportPtr serverTransportPtr( tcpEndpoint.createServerTransport().release() );
        RCF::ClientTransportAutoPtr clientTransportAutoPtr( tcpEndpoint.createClientTransport() );

        ++currentTest;
        if (requestedTest == 0 || requestedTest == currentTest)
        {
            std::string title = "Test " + boost::lexical_cast<std::string>(currentTest) + ": ";
            runPerformanceTest(
                title,
                clientTransportAutoPtr,
                serverTransportPtr,
                RCF::Oneway,
                s0,
                serializationProtocol,
                std::vector<RCF::FilterFactoryPtr>(),
                std::vector<RCF::FilterPtr>(),
                std::vector<RCF::FilterPtr>(),
                calls);
        }
    }

    // test oneway invocations - udp
    {
        RCF::UdpEndpoint udpEndpoint("127.0.0.1", util::PortNumbers::getSingleton().getNext());
        RCF::ServerTransportPtr serverTransportPtr( udpEndpoint.createServerTransport().release() );
        RCF::ClientTransportAutoPtr clientTransportAutoPtr( udpEndpoint.createClientTransport() );

        ++currentTest;
        if (requestedTest == 0 || requestedTest == currentTest)
        {
            std::string title = "Test " + boost::lexical_cast<std::string>(currentTest) + ": ";
            runPerformanceTest(
                title,
                clientTransportAutoPtr,
                serverTransportPtr,
                RCF::Oneway,
                s0,
                serializationProtocol,
                std::vector<RCF::FilterFactoryPtr>(),
                std::vector<RCF::FilterPtr>(),
                std::vector<RCF::FilterPtr>(),
                calls);
        }
    }

    return boost::exit_success;
}



By viewing downloads associated with this article you agree to the Terms of Service and the article's licence.

If a file you wish to view isn't highlighted, and is a text file (not binary), please let us know and we'll add colourisation support for it.

License

This article, along with any associated source code and files, is licensed under The Code Project Open License (CPOL)


Written By
Australia
Software developer, from Sweden and now living in Canberra, Australia, working on distributed C++ applications. When he is not programming, Jarl enjoys skiing and playing table tennis. He derives immense satisfaction from referring to himself in third person.

Comments and Discussions