#include <string>
#include <boost/config.hpp>
#include <boost/shared_ptr.hpp>
#include <RCF/test/TestMinimal.hpp>
#include <RCF/Idl.hpp>
#include <RCF/ObjectFactoryService.hpp>
#include <RCF/RcfServer.hpp>
#include <RCF/ThreadManager.hpp>
#include <RCF/test/EndpointFactories.hpp>
#include <RCF/test/TransportFactories.hpp>
#include <RCF/test/ThreadGroup.hpp>
#include <RCF/util/CommandLine.hpp>
#include <RCF/util/Profile.hpp>
#include <RCF/util/Platform/OS/ThreadId.hpp>
#include <RCF/util/Platform/OS/Sleep.hpp>
#ifdef RCF_USE_BOOST_ASIO
#include <RCF/TcpAsioServerTransport.hpp>
#endif
namespace Test_ThreadManager {
std::vector<RCF::MutexPtr> serverMutexes;
std::vector<bool> clientOk;
RCF_BEGIN(I_Wait, "I_Wait")
RCF_METHOD_V1(void, wait, int)
RCF_METHOD_R0(int, ping)
RCF_END(I_Wait)
class Wait
{
public:
Wait() : mNext(RCF_DEFAULT_INIT)
{}
void wait(int whichMutex)
{
RCF::getThreadInfoPtr()->mThreadManagerPtr->notifyBusy();
RCF::Lock lock(*serverMutexes[whichMutex]);
}
int ping()
{
return ++mNext;
}
private:
int mNext;
};
class WaitClient : public RcfClient<I_Wait>
{
public:
WaitClient(unsigned int timeoutMs = 10*1000) :
RcfClient<I_Wait>( RCF::ClientTransportAutoPtr(
new RCF::TcpClientTransport("localhost", 50001)))
{
getClientStub().setRemoteCallTimeoutMs(timeoutMs);
}
};
void clientWaitTask(int id)
{
try
{
WaitClient(1000*60*10).wait(id); // 10 minute timeout
clientOk[id] = true;
}
catch(...)
{
}
}
int ret;
void pingServer()
{
try
{
BOOST_CHECK( WaitClient(1000*2).ping() == ++ret );
}
catch(RCF::Exception &e)
{
BOOST_CHECK(1==0);
}
}
void runTest1(RCF::RcfServer &server, int concurrencyCount)
{
serverMutexes.resize(concurrencyCount);
for (unsigned int i=0; i<serverMutexes.size(); ++i)
{
serverMutexes[i].reset( new RCF::Mutex );
}
clientOk.resize(concurrencyCount);
for (unsigned int i=0; i<clientOk.size(); ++i)
{
clientOk[i] = false;
}
std::vector< RCF::LockPtr > serverLocks(concurrencyCount);
std::vector< RCF::ThreadPtr > clientThreads(concurrencyCount);
ret = 0;
Wait wait;
server.bind( (I_Wait*) 0, wait);
server.start();
for (unsigned int j=0; j<5; ++j)
{
// lock all mutexes and initialize ok table
for (int i=0; i<concurrencyCount; ++i)
{
serverLocks[i] = RCF::LockPtr( new RCF::Lock(*serverMutexes[i]));
clientOk[i] = false;
}
// start concurrencyCount client requests, all of which will block
for (unsigned int i=0; i<clientThreads.size(); ++i)
{
clientThreads[i].reset( new RCF::Thread( boost::bind(clientWaitTask, i) ) );
}
Platform::OS::Sleep(2);
pingServer();
// join the first half of the clients, one at a time
for (unsigned int i=0; i<concurrencyCount/2; ++i)
{
pingServer();
BOOST_CHECK(clientOk[i] == false);
serverLocks[i]->unlock();
while (clientOk[i] == false);
BOOST_CHECK(clientOk[i] == true);
pingServer();
}
// join the second half of the clients, all at once
pingServer();
for (unsigned int i=concurrencyCount/2; i<concurrencyCount; ++i)
{
BOOST_CHECK(clientOk[i] == false);
}
pingServer();
for (unsigned int i=concurrencyCount/2; i<concurrencyCount; ++i)
{
serverLocks[i]->unlock();
}
pingServer();
for (unsigned int i=concurrencyCount/2; i<concurrencyCount; ++i)
{
while (clientOk[i] == false);
BOOST_CHECK(clientOk[i] == true);
}
pingServer();
Platform::OS::Sleep(1);
}
server.stop();
server.unbind( (I_Wait*) 0);
}
RCF_BEGIN(I_X, "I_X")
RCF_METHOD_R0(int, ping)
RCF_METHOD_R1(int, wait, unsigned int /*waitMs*/)
RCF_METHOD_R0(int, pingBusy)
RCF_METHOD_R1(int, waitBusy, unsigned int /*waitMs*/)
RCF_END(I_X)
RCF_BEGIN(I_Y, "I_Y")
RCF_METHOD_R0(int, ping)
RCF_METHOD_R1(int, wait, unsigned int /*waitMs*/)
RCF_METHOD_R0(int, pingBusy)
RCF_METHOD_R1(int, waitBusy, unsigned int /*waitMs*/)
RCF_END(I_Y)
class X
{
public:
int ping()
{
return 17;
}
int wait(unsigned int waitMs)
{
// TODO: platform independent sleep
Platform::OS::SleepMs(waitMs);
return 17;
}
int pingBusy()
{
RCF::getThreadInfoPtr()->mThreadManagerPtr->notifyBusy();
return 17;
}
int waitBusy(unsigned int waitMs)
{
RCF::getThreadInfoPtr()->mThreadManagerPtr->notifyBusy();
// TODO: platform independent sleep
Platform::OS::SleepMs(waitMs);
return 17;
}
};
typedef X Y;
RCF::Mutex ioMutex;
void clientLoadTest(unsigned int clientReps, unsigned int maxDelayMs)
{
RcfClient<I_X> client( RCF::TcpEndpoint("127.0.0.1", 50001) );
client.getClientStub().setRemoteCallTimeoutMs(1000*20); // 20s timeout
for (unsigned int i=0; i<clientReps; ++i)
{
if ((i*100) % (20*clientReps) == 0)
{
int threadId = Platform::OS::GetCurrentThreadId();
RCF::Lock lock(ioMutex);
std::cout << "Client thread #" << threadId << ": progress: " << i*100/clientReps << "%\n";
}
unsigned int whichTask = rand()%4;
unsigned int delayMs = maxDelayMs > 0 ? rand()%maxDelayMs : 0; // random millisecond delays of up to 1 seconds
//BOOST_CHECK(client.waitBusy(delayMs) == 17);
switch (whichTask)
{
case 0:
BOOST_CHECK(client.ping() == 17);
break;
case 1:
BOOST_CHECK(client.pingBusy() == 17);
break;
case 2:
BOOST_CHECK(client.wait(delayMs) == 17);
break;
case 3:
BOOST_CHECK(client.waitBusy(delayMs) == 17);
break;
// TODO: more tasks, e.g. disconnect/connect, etc
default:
RCF_ASSERT(0)(whichTask);
}
}
}
void clientObjectLoadTest(unsigned int clientReps, unsigned int maxDelayMs)
{
RcfClient<I_Y> client( RCF::TcpEndpoint("127.0.0.1", 50001) );
client.getClientStub().setRemoteCallTimeoutMs(1000*20); // 20s timeout
//bool ok = RCF::createRemoteObject<I_Y>(client);
bool ok = tryCreateRemoteObject<I_Y>(client);
BOOST_CHECK(ok);
for (unsigned int i=0; i<clientReps; ++i)
{
if ((i*100) % (20*clientReps) == 0)
{
int threadId = Platform::OS::GetCurrentThreadId();
RCF::Lock lock(ioMutex);
std::cout << "Client thread #" << threadId << ": progress: " << i*100/clientReps << "%\n";
}
unsigned int whichTask = rand()%4;
unsigned int delayMs = maxDelayMs > 0 ? rand()%maxDelayMs : 0; // random millisecond delays of up to 1 seconds
BOOST_CHECK(client.waitBusy(delayMs) == 17);
switch (whichTask)
{
case 0:
BOOST_CHECK(client.ping() == 17);
break;
case 1:
BOOST_CHECK(client.pingBusy() == 17);
break;
case 2:
BOOST_CHECK(client.wait(delayMs) == 17);
break;
case 3:
BOOST_CHECK(client.waitBusy(delayMs) == 17);
break;
// TODO: more tasks, e.g. disconnect/connect, etc
default:
RCF_ASSERT(0)(whichTask);
}
}
}
// TODO
//void connectionLoadTest()
//{
//
//}
void runTest2(RCF::RcfServer &server, int concurrencyCount, unsigned int clientReps, unsigned int maxDelayMs)
{
// now try a more realistic load test
boost::shared_ptr<util::ImmediateProfile> immmediateProfilePtr;
// bind I_X
X x;
server.bind( (I_X*) 0, x);
// bind I_Y to object factory
RCF::ObjectFactoryServicePtr objectFactoryServicePtr( new RCF::ObjectFactoryService(10, 60));
objectFactoryServicePtr->bind( (I_Y*) 0, (Y**) 0);
server.addService( RCF::ServicePtr(objectFactoryServicePtr));
server.start();
ThreadGroup threadGroup;
// calls to I_X
{
util::ImmediateProfile profile("load test: all clients making static object calls");
for (int i=0; i<concurrencyCount; ++i)
{
threadGroup.push_back( ThreadPtr( new Thread( boost::bind(clientLoadTest, clientReps, maxDelayMs))));
}
joinThreadGroup(threadGroup);
}
// calls to I_Y
threadGroup.clear();
{
util::ImmediateProfile profile("load test: all clients making dynamic object calls");
for (int i=0;i<concurrencyCount; ++i)
{
threadGroup.push_back( ThreadPtr( new Thread( boost::bind(clientObjectLoadTest, clientReps, maxDelayMs))));
}
joinThreadGroup(threadGroup);
}
// mixed
threadGroup.clear();
{
util::ImmediateProfile profile("load test: half/half clients making static/dynamic object calls");
for (int i=0; i<concurrencyCount; ++i)
{
switch(i%2)
{
case 0: threadGroup.push_back( ThreadPtr( new Thread( boost::bind(clientLoadTest, clientReps, maxDelayMs)))); break;
case 1: threadGroup.push_back( ThreadPtr( new Thread( boost::bind(clientObjectLoadTest, clientReps, maxDelayMs)))); break;
default: RCF_ASSERT(0);
}
}
joinThreadGroup(threadGroup);
}
// TODO: more threads doing server side things (adding and removing bindings, publishing, subscribing, etc)
// TODO: more threads exercising object creation and timeout on the server
server.stop();
}
void runTests(RCF::RcfServer &server, unsigned int concurrencyCount, unsigned int clientReps)
{
//runTest1(server, concurrencyCount);
//unsigned int clientReps = 500; // add some zeros to do an endurance test
unsigned int maxDelayMs = 0;
runTest2(server, concurrencyCount, clientReps, maxDelayMs);
}
std::size_t sThreadInitCount = 0;
std::size_t sThreadDeinitCount = 0;
void onThreadInit()
{
++sThreadInitCount;
}
void onThreadDeinit()
{
++sThreadDeinitCount;
}
} // namespace Test_ThreadManager
int RCF_TEST_MAIN(int argc, char **argv)
{
printTestHeader(__FILE__);
using namespace Test_ThreadManager;
util::CommandLineOption<unsigned int> concurrencyCount("concurrencyCount", 5, "how many concurrent client threads to spawn");
util::CommandLineOption<unsigned int> clientReps("clientReps", 500, "how many iterations an individual client goes through");
util::CommandLine::getSingleton().parse(argc, argv);
#ifdef RCF_USE_BOOST_ASIO
// Asio server transport w/ dynamic thread pool
{
RCF::RcfServer server( RCF::ServerTransportPtr( new RCF::TcpAsioServerTransport(50001)));
RCF::DynamicThreadPoolPtr dynamicThreadPoolPtr( new RCF::DynamicThreadPool(1, 200) );
server.getServerTransportService().getTaskEntries()[0].setThreadManagerPtr( dynamicThreadPoolPtr );
runTests(server, concurrencyCount, clientReps);
}
#endif
#ifdef BOOST_WINDOWS
// Iocp server transport w/ dynamic thread pool
{
util::ProfilingResults::getSingleton().getTraceChannel().trace("Iocp server transport w/ dynamic thread pool\n");
RCF::RcfServer server( RCF::ServerTransportPtr( new RCF::TcpIocpServerTransport(50001)));
RCF::DynamicThreadPoolPtr dynamicThreadPoolPtr( new RCF::DynamicThreadPool(1, 200) );
server.getServerTransportService().getTaskEntries()[0].setThreadManagerPtr( dynamicThreadPoolPtr );
runTests(server, concurrencyCount, clientReps);
}
#endif
#ifdef RCF_USE_BOOST_ASIO
// Asio server transport w/ fixed thread pool
{
RCF::RcfServer server( RCF::ServerTransportPtr( new RCF::TcpAsioServerTransport(50001)));
RCF::FixedThreadPoolPtr fixedThreadPoolPtr( new RCF::FixedThreadPool(concurrencyCount+1) );
fixedThreadPoolPtr->addThreadInitFunctor(onThreadInit);
fixedThreadPoolPtr->addThreadDeinitFunctor(onThreadDeinit);
sThreadInitCount = sThreadDeinitCount = 0;
server.getServerTransportService().getTaskEntries()[0].setThreadManagerPtr( fixedThreadPoolPtr );
runTests(server, concurrencyCount, clientReps);
BOOST_CHECK(sThreadInitCount == sThreadDeinitCount);
BOOST_CHECK(sThreadInitCount == concurrencyCount+1);
}
#endif
#ifdef BOOST_WINDOWS
// Iocp server transport w/ fixed thread pool
{
util::ProfilingResults::getSingleton().getTraceChannel().trace("Iocp server transport w/ fixed thread pool\n");
RCF::RcfServer server( RCF::ServerTransportPtr( new RCF::TcpIocpServerTransport(50001)));
RCF::FixedThreadPoolPtr fixedThreadPoolPtr( new RCF::FixedThreadPool(concurrencyCount+1) );
fixedThreadPoolPtr->addThreadInitFunctor( &onThreadInit);
fixedThreadPoolPtr->addThreadDeinitFunctor( &onThreadDeinit);
sThreadInitCount = sThreadDeinitCount = 0;
server.getServerTransportService().getTaskEntries()[0].setThreadManagerPtr( fixedThreadPoolPtr );
runTests(server, concurrencyCount, clientReps);
BOOST_CHECK(sThreadInitCount == sThreadDeinitCount);
BOOST_CHECK(sThreadInitCount == concurrencyCount+1);
}
#endif
return boost::exit_success;
}