#include <string>
#include <boost/lexical_cast.hpp>
#include <RCF/test/TestMinimal.hpp>
#include <RCF/Idl.hpp>
#include <RCF/RcfServer.hpp>
#include <RCF/test/EndpointFactories.hpp>
#include <RCF/test/ThreadGroup.hpp>
#include <RCF/test/TransportFactories.hpp>
#include <RCF/test/WithStopServer.hpp>
#include <RCF/util/CommandLine.hpp>
#include <RCF/util/Profile.hpp>
namespace Test_ServerTransportConcurrency {
// Server-side object whose echo() method backs the I_Echo interface.
// The stop-server behaviour comes from the RCF::WithStopServer base class
// (declared in RCF/test/WithStopServer.hpp, not visible in this file).
class Echo : public RCF::WithStopServer
{
public:

    // Hands the received string straight back to the caller.
    std::string echo(const std::string &s)
    {
        std::string reply(s);
        return reply;
    }
};
// RCF interface declaration for the echo service used by this test.
// The interface is named I_Echo, with "I_Echo" as its string identifier.
RCF_BEGIN(I_Echo, "I_Echo")
// echo: one const std::string& argument, returns std::string.
RCF_METHOD_R1(std::string, echo, const std::string &)
// stopServer: no arguments, no return value.
// NOTE(review): presumably implemented by RCF::WithStopServer, the base class of Echo - confirm.
RCF_METHOD_V0(void, stopServer)
RCF_END(I_Echo)
// Client type for the I_Echo interface.
typedef RcfClient<I_Echo> Client;
// Shared-ownership pointer to a Client, so that clients can be stored in a std::vector.
typedef boost::shared_ptr<Client> ClientPtr;
// Body of one client thread: performs (i1 - i0) echo calls against the
// clients in the index range [i0, i1) and checks every reply against msg.
//
//   clients - pool of clients; only indices in [i0, i1) are touched
//   msg     - payload sent on every call, and the expected reply
//   random  - false: call clients i0, i0+1, ..., i1-1 once each, in order
//             true:  for each call pick a client from [i0, i1) using rand()
//   i0, i1  - half-open index range into clients assigned to this thread
void clientThreadTask(const std::vector<ClientPtr> &clients, const std::string &msg, bool random, int i0, int i1)
{
    if (!random)
    {
        // Sequential mode: walk the assigned slice front to back.
        for (int j = i0; j < i1; ++j)
        {
            BOOST_CHECK( msg == clients[j]->echo(msg) );
        }
        return;
    }

    // Randomized mode: same number of calls, but the target of each call is
    // drawn from the slice, so a client may be hit repeatedly or not at all.
    for (int j = i0; j < i1; ++j)
    {
        int idx = rand() % (i1-i0);
        BOOST_CHECK( msg == clients[i0 + idx]->echo(msg) );
    }
}
// Starts `threads` threads that together make one pass over `clients`.
//
// The client index range [0, clients.size()) is cut into `threads`
// consecutive, non-overlapping slices of clients.size()/threads entries; the
// last slice additionally receives the remainder of that integer division.
// Each thread runs clientThreadTask() on its own slice.
//
//   threads - number of threads to start; values <= 0 start no threads
//   clients - pool of clients to distribute over the threads
//   msg     - payload passed on to clientThreadTask()
//   random  - passed on to clientThreadTask(): false for in-order calls,
//             true for randomly chosen clients within each slice
//
// Returns the group of started threads; the caller is expected to join it.
ThreadGroup createClientThreads(int threads, const std::vector<ClientPtr> &clients, const std::string &msg, bool random)
{
    ThreadGroup threadGroup;
    if (threads <= 0)
    {
        // Nothing to start, and this also keeps the division below well defined.
        return threadGroup;
    }

    const int calls = static_cast<int>(clients.size());
    const int threadCalls = calls/threads;
    for (int i=0; i<threads; ++i)
    {
        const int j0 = i*threadCalls;
        // The last thread picks up whatever the integer division left over.
        const int j1 = (i == threads-1) ? calls : (i+1)*threadCalls;

        // The caller's `random` flag is forwarded as-is. (It used to be
        // shadowed by a local that was always false, which silently disabled
        // the randomized mode.)
        // boost::bind stores copies of its arguments, so every thread holds
        // its own copy of the vector of client pointers and of the message.
        threadGroup.push_back( ThreadPtr( new Thread( boost::bind(clientThreadTask, clients, msg, random, j0, j1))));
    }
    return threadGroup;
}
} // namespace Test_ServerTransportConcurrency
int RCF_TEST_MAIN(int argc, char **argv)
{
printTestHeader(__FILE__);
using namespace Test_ServerTransportConcurrency;
util::CommandLineOption<bool> clServer("server", false, "act as server");
util::CommandLineOption<bool> clClient("client", false, "act as client");
util::CommandLineOption<std::string> clIp("ip", util::PortNumbers::getSingleton().getIp(), "ip");
util::CommandLineOption<int> clPort("port", util::PortNumbers::getSingleton().getCurrent(), "port");
util::CommandLineOption<int> clCalls("calls", 1000, "number of calls");
util::CommandLineOption<int> clThreads("threads", 1, "number of threads making client calls");
util::CommandLine::getSingleton().parse(argc, argv);
// TODO: make ip and port options part of util::PortNumbers
util::PortNumbers::getSingleton().setCurrent(clPort);
util::PortNumbers::getSingleton().setIp(clIp);
bool bServer = true;
bool bClient = true;
if (clServer && clClient)
{
bServer = true;
bClient = true;
}
else if (clServer)
{
bServer = true;
bClient = false;
}
else if (clClient)
{
bServer = false;
bClient = true;
}
for (unsigned int i=0; i<RCF::getTransportFactories().size(); ++i)
{
RCF::TransportFactoryPtr transportFactoryPtr = RCF::getTransportFactories()[i];
std::pair<RCF::ServerTransportPtr, RCF::ClientTransportAutoPtrPtr> transports = transportFactoryPtr->createTransports();
RCF::ServerTransportPtr serverTransportPtr( transports.first );
RCF::ClientTransportAutoPtr clientTransportAutoPtr( *transports.second );
std::ostringstream ostr;
RCF::writeTransportTypes(ostr, *serverTransportPtr, *clientTransportAutoPtr);
std::string transportDesc = ostr.str();
RCF::writeTransportTypes(std::cout, *serverTransportPtr, *clientTransportAutoPtr);
std::string msg = "payload";
int calls = clCalls;
int threads = clThreads;
std::string ip = "localhost";
int port = 50001;
RCF_ASSERT(threads <= calls)(threads)(calls);
std::cout << "Calls: " << calls << std::endl;
std::cout << "Threads: " << threads << std::endl;
Echo echo;
RCF::RcfServer server(serverTransportPtr);
server.bind( (I_Echo*) 0, echo);
if (bServer)
{
if (!bClient)
{
RCF::I_IpServerTransport &ipServerTransport = dynamic_cast<RCF::I_IpServerTransport &>(*serverTransportPtr);
ipServerTransport.setNetworkInterface("0.0.0.0");
}
server.start();
}
else
{
Platform::OS::SleepMs(1000);
}
// all requests on 1 client
if (bClient)
{
Client client(clientTransportAutoPtr->clone());
BOOST_CHECK( msg == client.echo(msg) );
{
std::string title = transportDesc + ": " +
boost::lexical_cast<std::string>(calls) + " call(s), 1 client(s)";
util::Profile profile(title);
for (int i=0; i<calls; ++i)
{
BOOST_CHECK( msg == client.echo(msg) );
}
}
// 1 request on all clients, a) in order, and b) in random order
std::vector<ClientPtr> clients;
for (int i=0; i<calls; ++i)
{
clients.push_back( ClientPtr(new Client(clientTransportAutoPtr->clone())));
}
RCF_ASSERT(clients.size() == calls);
for (int j=0; j<clients.size(); ++j)
{
BOOST_CHECK( msg == clients[j]->echo(msg) );
}
{
std::string title = transportDesc + ": 1 call(s), " +
boost::lexical_cast<std::string>(calls) + " client(s): " +
boost::lexical_cast<std::string>(threads) + " threads(s)";
util::Profile profile(title);
joinThreadGroup(createClientThreads(threads, clients, msg, false));
}
{
srand(RCF::getCurrentTimeMs());
std::string title = transportDesc + ": 1 call(s), " +
boost::lexical_cast<std::string>(calls) + " client(s): " +
boost::lexical_cast<std::string>(threads) + " threads(s): randomized";
util::Profile profile(title);
joinThreadGroup(createClientThreads(threads, clients, msg, true));
}
RcfClient<I_Echo>(clientTransportAutoPtr->clone()).stopServer();
}
if (bServer)
{
echo.wait();
server.stop();
}
}
return boost::exit_success;
}