Click here to Skip to main content
15,888,802 members
Articles / Programming Languages / C++

RCF - Interprocess Communication for C++

Rate me:
Please Sign up or sign in to vote.
4.94/5 (147 votes)
25 Oct 2011CPOL20 min read 4.6M   8.4K   331  
A server/client IPC framework, using the C++ preprocessor as an IDL compiler.
#include <vector>

#include <boost/test/minimal.hpp>

#include <RCF/Idl.hpp>
#include <RCF/RcfServer.hpp>
#include <RCF/PublishingService.hpp>
#include <RCF/SubscriptionService.hpp>
#include <RCF/test/TransportFactories.hpp>
#include <RCF/util/CommandLine.hpp>
#include <RCF/util/Platform/OS/Sleep.hpp>

// some weird problems with linux, maybe emanating from within asio? Something is causing a SIGABRT signal.
#ifdef __linux__
#include <signal.h>
#include <RCF/util/InitDeinit.hpp>
UTIL_ON_INIT( sigignore(SIGABRT) )
#endif

RCF_BEGIN(I_Events, "I_Events")
    RCF_METHOD_V0(void, onA)
    RCF_METHOD_V1(void, onB, int)
    RCF_METHOD_V2(void, onC, int, const std::string &)
RCF_END(I_Events)

class Events
{
public:

    Events() : nA(), nB(), nC()
    {}

    void onA()
    {
        nA++;
    }

    void onB(int n)
    {
        nB++;
    }

    void onC(int n, std::string s)
    {
        nC++;
    }

    int nA;
    int nB;
    int nC;

};

int test_main(int argc, char **argv)
{
    util::CommandLine::getSingleton().parse(argc, argv);

    for (unsigned int i=0; i<RCF::getTransportFactories().size(); ++i)
    {
        RCF::TransportFactoryPtr transportFactoryPtr;
        std::pair<RCF::ServerTransportPtr, RCF::ClientTransportAutoPtrPtr> transports;

        transportFactoryPtr = RCF::getTransportFactories()[i];
        transports = transportFactoryPtr->createTransports();
        RCF::ServerTransportPtr publisherServerTransportPtr( transports.first );
        RCF::ClientTransportAutoPtr publisherClientTransportAutoPtr( *transports.second );

        transportFactoryPtr = RCF::getTransportFactories()[i];
        transports = transportFactoryPtr->createTransports();
        RCF::ServerTransportPtr subscriberServerTransportPtr( transports.first );
        RCF::ClientTransportAutoPtr subscriberClientTransportAutoPtr( *transports.second );

        RCF::RcfServer publisher(publisherServerTransportPtr);
        RCF::RcfServer subscriber(subscriberServerTransportPtr);

        // need both I_ServerTransportEx and I_ServerTransportSessionFull for publish/subscribe functionality
        RCF::I_ServerTransportEx *pServerTransportEx = dynamic_cast<RCF::I_ServerTransportEx *>(&publisher.getServerTransport());
        if (NULL == pServerTransportEx)
        {
            continue; 
        }

        RCF::writeTransportTypes(std::cout, *publisherServerTransportPtr, *publisherClientTransportAutoPtr);

        RCF::PublishingServicePtr publishingServicePtr(new RCF::PublishingService);
        RCF::PublishingService &publishingService = *publishingServicePtr;
        publisher.addService(publishingServicePtr);

        RCF::SubscriptionServicePtr subscriptionServicePtr(new RCF::SubscriptionService);
        RCF::SubscriptionService &subscriptionService = *subscriptionServicePtr;
        subscriber.addService(subscriptionServicePtr);

        publisher.start();
        subscriber.start();

        // NB: must call endSubscribe<>(object) before object goes out of scope! ie, object must outlive the subscription. Otherwise we get memory corruption from invoking methods on a dead object.
        // TODO: only allow shared_ptr's for subscriptions?

        std::vector<Events> events(3);

        RCF::EndpointPtr publisherClientEndpointPtr(publisherClientTransportAutoPtr->getEndpointPtr());

        for (int j=0; j<3;++j)
        {
            publishingService.beginPublish<I_Events>();

            for (unsigned int k=0; k<events.size(); ++k)
            {
                events[k] = Events();
                subscriptionService.beginSubscribe<I_Events>(events[k], *publisherClientEndpointPtr);
            }

            Platform::OS::Sleep(1); // give the server time to setup the subscriptions

            for (unsigned int k=0; k<events.size(); ++k)
            {
                subscriptionService.endSubscribe<I_Events>(events[k]);
                publishingService.publish<I_Events>().onA();
                publishingService.publish<I_Events>().onB(1);
                publishingService.publish<I_Events>().onC(1, "one");

                Platform::OS::Sleep(1); // give the subscribers time to receive the notifications
            }

            publishingService.endPublish<I_Events>();

            for (unsigned int k=0; k<events.size(); ++k)
            {
                BOOST_CHECK(events[k].nA == k);
                BOOST_CHECK(events[k].nB == k);
                BOOST_CHECK(events[k].nC == k);
            }
        }

        events.resize(100);

        for (int j=0; j<5; ++j)
        {
            publishingService.beginPublish<I_Events>();
            
            for (unsigned int k=0; k<events.size(); ++k)
            {
                events[k] = Events();
                subscriptionService.beginSubscribe<I_Events>(events[k], *publisherClientEndpointPtr);
            }

            for (unsigned int k=0; k<events.size()/2; ++k)
            {
                subscriptionService.endSubscribe<I_Events>(events[k]);
                publishingService.publish<I_Events>().onA();
                publishingService.publish<I_Events>().onB(1);
                publishingService.publish<I_Events>().onC(1, "one");
            }

            for (unsigned int k=events.size()/2; k<events.size(); ++k)
            {
                subscriptionService.endSubscribe<I_Events>(events[k]);
            }

            publishingService.endPublish<I_Events>();
        }

        subscriber.stop();
        publisher.stop();
    
    }
    
    return boost::exit_success;
}

By viewing downloads associated with this article you agree to the Terms of Service and the article's licence.

If a file you wish to view isn't highlighted, and is a text file (not binary), please let us know and we'll add colourisation support for it.

License

This article, along with any associated source code and files, is licensed under The Code Project Open License (CPOL)


Written By
Australia Australia
Software developer, from Sweden and now living in Canberra, Australia, working on distributed C++ applications. When he is not programming, Jarl enjoys skiing and playing table tennis. He derives immense satisfaction from referring to himself in third person.

Comments and Discussions