#include <vector>
#include <boost/test/minimal.hpp>
#include <RCF/Idl.hpp>
#include <RCF/RcfServer.hpp>
#include <RCF/PublishingService.hpp>
#include <RCF/SubscriptionService.hpp>
#include <RCF/test/TransportFactories.hpp>
#include <RCF/util/CommandLine.hpp>
#include <RCF/util/Platform/OS/Sleep.hpp>
// some weird problems with linux, maybe emanating from within asio? Something is causing a SIGABRT signal.
#ifdef __linux__
#include <signal.h>
#include <RCF/util/InitDeinit.hpp>
UTIL_ON_INIT( sigignore(SIGABRT) )
#endif
RCF_BEGIN(I_Events, "I_Events")
RCF_METHOD_V0(void, onA)
RCF_METHOD_V1(void, onB, int)
RCF_METHOD_V2(void, onC, int, const std::string &)
RCF_END(I_Events)
class Events
{
public:
Events() : nA(), nB(), nC()
{}
void onA()
{
nA++;
}
void onB(int n)
{
nB++;
}
void onC(int n, std::string s)
{
nC++;
}
int nA;
int nB;
int nC;
};
int test_main(int argc, char **argv)
{
util::CommandLine::getSingleton().parse(argc, argv);
for (unsigned int i=0; i<RCF::getTransportFactories().size(); ++i)
{
RCF::TransportFactoryPtr transportFactoryPtr;
std::pair<RCF::ServerTransportPtr, RCF::ClientTransportAutoPtrPtr> transports;
transportFactoryPtr = RCF::getTransportFactories()[i];
transports = transportFactoryPtr->createTransports();
RCF::ServerTransportPtr publisherServerTransportPtr( transports.first );
RCF::ClientTransportAutoPtr publisherClientTransportAutoPtr( *transports.second );
transportFactoryPtr = RCF::getTransportFactories()[i];
transports = transportFactoryPtr->createTransports();
RCF::ServerTransportPtr subscriberServerTransportPtr( transports.first );
RCF::ClientTransportAutoPtr subscriberClientTransportAutoPtr( *transports.second );
RCF::RcfServer publisher(publisherServerTransportPtr);
RCF::RcfServer subscriber(subscriberServerTransportPtr);
// need both I_ServerTransportEx and I_ServerTransportSessionFull for publish/subscribe functionality
RCF::I_ServerTransportEx *pServerTransportEx = dynamic_cast<RCF::I_ServerTransportEx *>(&publisher.getServerTransport());
if (NULL == pServerTransportEx)
{
continue;
}
RCF::writeTransportTypes(std::cout, *publisherServerTransportPtr, *publisherClientTransportAutoPtr);
RCF::PublishingServicePtr publishingServicePtr(new RCF::PublishingService);
RCF::PublishingService &publishingService = *publishingServicePtr;
publisher.addService(publishingServicePtr);
RCF::SubscriptionServicePtr subscriptionServicePtr(new RCF::SubscriptionService);
RCF::SubscriptionService &subscriptionService = *subscriptionServicePtr;
subscriber.addService(subscriptionServicePtr);
publisher.start();
subscriber.start();
// NB: must call endSubscribe<>(object) before object goes out of scope! ie, object must outlive the subscription. Otherwise we get memory corruption from invoking methods on a dead object.
// TODO: only allow shared_ptr's for subscriptions?
std::vector<Events> events(3);
RCF::EndpointPtr publisherClientEndpointPtr(publisherClientTransportAutoPtr->getEndpointPtr());
for (int j=0; j<3;++j)
{
publishingService.beginPublish<I_Events>();
for (unsigned int k=0; k<events.size(); ++k)
{
events[k] = Events();
subscriptionService.beginSubscribe<I_Events>(events[k], *publisherClientEndpointPtr);
}
Platform::OS::Sleep(1); // give the server time to setup the subscriptions
for (unsigned int k=0; k<events.size(); ++k)
{
subscriptionService.endSubscribe<I_Events>(events[k]);
publishingService.publish<I_Events>().onA();
publishingService.publish<I_Events>().onB(1);
publishingService.publish<I_Events>().onC(1, "one");
Platform::OS::Sleep(1); // give the subscribers time to receive the notifications
}
publishingService.endPublish<I_Events>();
for (unsigned int k=0; k<events.size(); ++k)
{
BOOST_CHECK(events[k].nA == k);
BOOST_CHECK(events[k].nB == k);
BOOST_CHECK(events[k].nC == k);
}
}
events.resize(100);
for (int j=0; j<5; ++j)
{
publishingService.beginPublish<I_Events>();
for (unsigned int k=0; k<events.size(); ++k)
{
events[k] = Events();
subscriptionService.beginSubscribe<I_Events>(events[k], *publisherClientEndpointPtr);
}
for (unsigned int k=0; k<events.size()/2; ++k)
{
subscriptionService.endSubscribe<I_Events>(events[k]);
publishingService.publish<I_Events>().onA();
publishingService.publish<I_Events>().onB(1);
publishingService.publish<I_Events>().onC(1, "one");
}
for (unsigned int k=events.size()/2; k<events.size(); ++k)
{
subscriptionService.endSubscribe<I_Events>(events[k]);
}
publishingService.endPublish<I_Events>();
}
subscriber.stop();
publisher.stop();
}
return boost::exit_success;
}