#include <string>
#include <boost/lexical_cast.hpp>
#include <RCF/test/TestMinimal.hpp>
#include <RCF/FilterService.hpp>
#include <RCF/Idl.hpp>
#include <RCF/RcfServer.hpp>
#include <RCF/TcpEndpoint.hpp>
#include <RCF/test/EndpointFactories.hpp>
#include <RCF/test/TransportFactories.hpp>
#include "Test_Performance.hpp"
#include <RCF/util/AutoBuild.hpp>
#include <RCF/util/CommandLine.hpp>
#include <RCF/util/PortNumbers.hpp>
#include <RCF/util/Profile.hpp>
#ifdef RCF_USE_OPENSSL
#include <RCF/OpenSslEncryptionFilter.hpp>
#endif
#ifdef RCF_USE_ZLIB
#include <RCF/ZlibCompressionFilter.hpp>
#endif
#ifdef BOOST_WINDOWS
#include <RCF/SspiFilter.hpp>
#endif
namespace Test_Performance {
void testUdp(const std::string &title, const std::string &ip, int port, bool server, bool client, int calls, int buflen)
{
RCF_ASSERT(!(server && client));
int ret = 0;
sockaddr_in clientAddr = {0};
clientAddr.sin_family = AF_INET;
clientAddr.sin_port = 0;
clientAddr.sin_addr.s_addr = inet_addr(ip.c_str());//INADDR_ANY;
sockaddr_in serverAddr = {0};
serverAddr.sin_family = AF_INET;
serverAddr.sin_port = htons(port);
serverAddr.sin_addr.s_addr = inet_addr(ip.c_str());//INADDR_ANY;
sockaddr_in toAddr = {0};
toAddr.sin_family = AF_INET;
toAddr.sin_port = htons(port);
toAddr.sin_addr.s_addr = inet_addr(ip.c_str());
sockaddr_in fromAddr = {0};
int fromAddrLen = sizeof(fromAddr);
int socket1 = socket( AF_INET, SOCK_DGRAM, 0 );
int packetsSent = 0;
int packetsReceived = 0;
int bytesSent = 0;
int bytesReceived = 0;
if (client)
{
ret = bind(socket1, (sockaddr *)&clientAddr, sizeof(clientAddr));
RCF_VERIFY(ret == 0, std::runtime_error("bind()"));
std::string msg(buflen, 'A');
std::vector<char> buffer(buflen);
{
util::Profile profile( title + "raw udp round trips, " + boost::lexical_cast<std::string>(calls) + " calls");
for (int i=0; i<calls; ++i)
{
ret = Platform::OS::BsdSockets::sendto(socket1, msg.c_str(), msg.size(), 0, (sockaddr *)&toAddr, sizeof(toAddr));
RCF_VERIFY(ret > 0, std::runtime_error("sendto()"));
bytesSent += ret;
packetsSent += 1;
ret = Platform::OS::BsdSockets::recvfrom(socket1, &buffer[0], buffer.size(), 0, (sockaddr *)&fromAddr, &fromAddrLen);
RCF_VERIFY(ret > 0, std::runtime_error("recvfrom()"));
bytesReceived += ret;
packetsReceived += 1;
}
}
std::cout << "Packets sent: " << packetsSent << std::endl;
std::cout << "Bytes sent: " << bytesSent << std::endl;
std::cout << "Packets received: " << packetsReceived << std::endl;
std::cout << "Bytes received: " << bytesReceived << std::endl;
}
else if (server)
{
ret = bind(socket1, (sockaddr *)&serverAddr, sizeof(serverAddr));
RCF_VERIFY(ret == 0, std::runtime_error("bind()"));
std::vector<char> buffer(buflen);
//while (true)
for (int i=0; i<calls; ++i)
{
ret = Platform::OS::BsdSockets::recvfrom(socket1, &buffer[0], buffer.size(), 0, (sockaddr *)&fromAddr, &fromAddrLen);
RCF_VERIFY(ret > 0, std::runtime_error("recvfrom()"));
bytesSent += ret;
packetsSent += 1;
ret = Platform::OS::BsdSockets::sendto(socket1, &buffer[0], ret, 0, (sockaddr *)&fromAddr, fromAddrLen);
RCF_VERIFY(ret > 0, std::runtime_error("sendto()"));
bytesReceived += ret;
packetsReceived += 1;
}
}
Platform::OS::BsdSockets::closesocket(socket1);
}
void testTcp(const std::string &title, const std::string &ip, int port, bool server, bool client, int calls, int buflen)
{
RCF_ASSERT(!(server && client));
int ret = 0;
/*
sockaddr_in clientAddr = {0};
clientAddr.sin_family = AF_INET;
clientAddr.sin_port = 0;
clientAddr.sin_addr.s_addr = INADDR_ANY;
*/
sockaddr_in serverAddr = {0};
serverAddr.sin_family = AF_INET;
serverAddr.sin_port = htons(port);
serverAddr.sin_addr.s_addr = inet_addr(ip.c_str());//INADDR_ANY;
sockaddr_in toAddr = {0};
toAddr.sin_family = AF_INET;
toAddr.sin_port = htons(port);
toAddr.sin_addr.s_addr = inet_addr(ip.c_str());
sockaddr_in fromAddr = {0};
int fromAddrLen = sizeof(fromAddr);
int socket1 = ::socket(PF_INET, SOCK_STREAM, IPPROTO_TCP);
int packetsSent = 0;
int packetsReceived = 0;
int bytesSent = 0;
int bytesReceived = 0;
if (client)
{
ret = Platform::OS::BsdSockets::connect(socket1, (sockaddr *)&toAddr, sizeof(toAddr));
RCF_VERIFY(ret == 0, std::runtime_error("bind()"));
std::string msg(buflen, 'A');
std::vector<char> buffer(buflen);
{
util::Profile profile( title + "raw tcp round trips, " + boost::lexical_cast<std::string>(calls) + " calls");
for (int i=0; i<calls; ++i)
{
ret = Platform::OS::BsdSockets::send(socket1, msg.c_str(), msg.size(), 0);
RCF_VERIFY(ret == buflen, std::runtime_error("send()"))(ret);
bytesSent += ret;
packetsSent += 1;
ret = Platform::OS::BsdSockets::recv(socket1, &buffer[0], buffer.size(), 0);
RCF_VERIFY(ret == buflen, std::runtime_error("recv()"))(ret);
bytesReceived += ret;
packetsReceived += 1;
}
}
std::cout << "Packets sent: " << packetsSent << std::endl;
std::cout << "Bytes sent: " << bytesSent << std::endl;
std::cout << "Packets received: " << packetsReceived << std::endl;
std::cout << "Bytes received: " << bytesReceived << std::endl;
}
else if (server)
{
ret = bind(socket1, (struct sockaddr*) &serverAddr, sizeof(serverAddr));
RCF_VERIFY(ret == 0, std::runtime_error("bind()"));
ret = listen(socket1, 10);
RCF_VERIFY(ret == 0, std::runtime_error("listen()"));
std::vector<char> buffer(buflen);
//while (true)
{
int socket2 = Platform::OS::BsdSockets::accept(socket1, (sockaddr *)&fromAddr, &fromAddrLen);
RCF_VERIFY(socket2 > 0, std::runtime_error("accept()"))(socket2);
//while (true)
for (int i=0; i<calls; ++i)
{
ret = Platform::OS::BsdSockets::recv(socket2, &buffer[0], buffer.size(), 0);
RCF_VERIFY(ret == 0 || ret == buflen, std::runtime_error("recv()"))(ret);
if (ret == 0)
{
Platform::OS::BsdSockets::closesocket(socket2);
break;
}
bytesSent += ret;
packetsSent += 1;
ret = Platform::OS::BsdSockets::send(socket2, &buffer[0], ret, 0);
RCF_VERIFY(ret == buflen, std::runtime_error("send()"))(ret);
bytesReceived += ret;
packetsReceived += 1;
}
}
}
Platform::OS::BsdSockets::closesocket(socket1);
}
} // namespace Test_Performance
// Entry point of the performance test.
//
// Parses the command line options declared below, then runs in this order:
//   - (only when "test" is 0) every supported serialization protocol over TCP,
//   - test 1: raw TCP round trips, test 2: raw UDP round trips,
//   - one test per registered transport factory (serialization protocol 1),
//   - one test per payload filter chain, then one per transport filter chain,
//   - one large-message test, one oneway TCP test, one oneway UDP test.
// currentTest is incremented for every numbered test whether or not it runs,
// so "test=N" selects the same test on every invocation.
//
// NOTE(review): in most sections the endpoints/transports are created (and
// getNext() is called) even when the test itself is skipped. That keeps the
// port sequence identical across invocations - presumably so that separately
// started client and server processes agree on ports; confirm before
// reordering any of these statements.
int RCF_TEST_MAIN(int argc, char **argv)
{
    printTestHeader(__FILE__);

    using namespace Test_Performance;

    util::CommandLineOption<bool> clServer("server", false, "act as server");
    util::CommandLineOption<bool> clClient("client", false, "act as client");
    util::CommandLineOption<std::string> clIp("ip", util::PortNumbers::getSingleton().getIp(), "ip");
    util::CommandLineOption<int> clPort("port", util::PortNumbers::getSingleton().getCurrent(), "port");
    util::CommandLineOption<std::string> clScert("scert", TEMP_DIR "ssCert2.pem", "OpenSSL server certificate");
    util::CommandLineOption<std::string> clSpwd("spwd", "mt2316", "OpenSSL server certificate password");
    util::CommandLineOption<std::string> clCcert("ccert", TEMP_DIR "ssCert1.pem", "OpenSSL client certificate");
    util::CommandLineOption<std::string> clCpwd("cpwd", "mt2316", "OpenSSL client certificate password");
    util::CommandLineOption<int> clCalls( "calls", 1000, "number of calls");
    util::CommandLineOption<int> clCallsLarge( "Calls", 1, "number of large message-size calls");
    util::CommandLineOption<int> clProtocol("protocol", 1, "serialization protocol (1-4)");
    util::CommandLineOption<int> clTest( "test", 0, "which test to run, 0 to run them all");
    util::CommandLineOption<int> clLargeMessageSize("msgsize", 25, "large message size, in Mb");
    util::CommandLineOption<int> clRawMessageSize("rawmsgsize", 50, "raw message size, in bytes, for non-RCF tcp and udp tests");
    util::CommandLine::getSingleton().parse(argc, argv);

    // The certificate settings are only consumed when RCF_USE_OPENSSL is defined.
    std::string clientCertificateFile = clCcert;
    std::string clientCertificateFilePassword = clCpwd;
    std::string serverCertificateFile = clScert;
    std::string serverCertificateFilePassword = clSpwd;

    int calls = clCalls;
    int callsLarge = clCallsLarge;
    int requestedTest = clTest;
    int currentTest = 0;

    util::PortNumbers::getSingleton().setCurrent(clPort);
    util::PortNumbers::getSingleton().setIp(clIp);

    // gServer/gClient are left at their existing values when neither
    // option was given on the command line.
    if (clServer && clClient)
    {
        gServer = true;
        gClient = true;
    }
    else if (clServer)
    {
        gServer = true;
        gClient = false;
    }
    else if (clClient)
    {
        gServer = false;
        gClient = true;
    }

    if (requestedTest == 0)
    {
        // test all serialization protocols over a TCP transport
        // (these runs carry no test number and cannot be selected individually)
        for (int protocol=0; protocol<10; ++protocol)
        {
            if (RCF::isSerializationProtocolSupported(protocol))
            {
                RCF::EndpointPair endpointPair = RCF::TcpEndpointFactory().createEndpointPair();
                RCF::ClientTransportAutoPtr clientTransportPtr(endpointPair.first->createClientTransport());
                RCF::ServerTransportPtr serverTransportPtr(endpointPair.second->createServerTransport().release());
                std::string title = "TcpEndpoint transport: ";
                runPerformanceTest(
                    title,
                    clientTransportPtr,
                    serverTransportPtr,
                    RCF::Twoway,
                    "",
                    protocol,
                    std::vector<RCF::FilterFactoryPtr>(),
                    std::vector<RCF::FilterPtr>(),
                    std::vector<RCF::FilterPtr>(),
                    calls);
            }
        }
    }

    // test tcp network performance, without RCF involvement
    ++currentTest;
    if (requestedTest == 0 || requestedTest == currentTest)
    {
        std::string ip = util::PortNumbers::getSingleton().getIp();
        int port = util::PortNumbers::getSingleton().getNext();
        int buflen = clRawMessageSize;
        std::string title = "Test " + boost::lexical_cast<std::string>(currentTest) + ": ";
        if (gServer && gClient)
        {
            // Run the server on its own thread and pause for a second before
            // starting the client in this thread.
            RCF::ThreadPtr threadPtr( new RCF::Thread( boost::bind(&testTcp, title, ip, port, true, false, calls, buflen)));
            Platform::OS::SleepMs(1000);
            testTcp(title, ip, port, false, true, calls, buflen);
            threadPtr->join();
        }
        else if (gServer)
        {
            testTcp(title, ip, port, true, false, calls, buflen);
        }
        else if (gClient)
        {
            testTcp(title, ip, port, false, true, calls, buflen);
        }
    }

    // test udp network performance, without RCF involvement
    ++currentTest;
    if (requestedTest == 0 || requestedTest == currentTest)
    {
        std::string ip = util::PortNumbers::getSingleton().getIp();
        int port = util::PortNumbers::getSingleton().getNext();
        int buflen = clRawMessageSize;
        std::string title = "Test " + boost::lexical_cast<std::string>(currentTest) + ": ";
        if (gServer && gClient)
        {
            RCF::ThreadPtr threadPtr( new RCF::Thread( boost::bind(&testUdp, title, ip, port, true, false, calls, buflen)));
            Platform::OS::SleepMs(1000);
            testUdp(title, ip, port, false, true, calls, buflen);
            threadPtr->join();
        }
        else if (gServer)
        {
            testUdp(title, ip, port, true, false, calls, buflen);
        }
        else if (gClient)
        {
            testUdp(title, ip, port, false, true, calls, buflen);
        }
    }

    // Standard payload for the RCF tests: the alphabet doubled three times
    // (26 * 8 = 208 characters). Built once and shared by all tests below.
    std::string payload = "ABCDEFGHIJKLMNOPQRSTUVWXYZ";
    payload = payload + payload;
    payload = payload + payload;
    payload = payload + payload;

    // test all transports, against the standard serialization protocol
    for (unsigned int i=0; i<RCF::getTransportFactories().size(); ++i)
    {
        RCF::TransportFactoryPtr transportFactoryPtr = RCF::getTransportFactories()[i];
        std::pair<RCF::ServerTransportPtr, RCF::ClientTransportAutoPtrPtr> transports = transportFactoryPtr->createTransports();
        RCF::ServerTransportPtr serverTransportPtr( transports.first );
        RCF::ClientTransportAutoPtr clientTransportAutoPtr( *transports.second );

        // Fixed protocol here; the "protocol" option only applies to the
        // tests that follow this loop.
        int standardProtocol = 1;

        ++currentTest;
        if (requestedTest == 0 || requestedTest == currentTest)
        {
            std::string title = "Test " + boost::lexical_cast<std::string>(currentTest) + ": ";
            runPerformanceTest(
                title,
                clientTransportAutoPtr,
                serverTransportPtr,
                RCF::Twoway,
                payload,
                standardProtocol,
                std::vector<RCF::FilterFactoryPtr>(),//filterFactories,
                std::vector<RCF::FilterPtr>(),
                std::vector<RCF::FilterPtr>(),
                calls);
        }
    }

    int serializationProtocol = clProtocol;

    // test compression and encryption filters, over TCP
    {
        // Client-side payload filter chains, one test per chain.
        std::vector<std::vector<RCF::FilterPtr> > payloadFilterChains;
        std::vector<RCF::FilterPtr> payloadFilterChain;
#ifdef RCF_USE_ZLIB
        payloadFilterChain.clear();
        payloadFilterChain.push_back( RCF::FilterPtr(new RCF::ZlibStatefulCompressionFilter()) );
        payloadFilterChains.push_back(payloadFilterChain);
        payloadFilterChain.clear();
        payloadFilterChain.push_back( RCF::FilterPtr(new RCF::ZlibStatelessCompressionFilter()) );
        payloadFilterChains.push_back(payloadFilterChain);
#endif

        // Client-side transport filter chains, one test per chain.
        std::vector<std::vector<RCF::FilterPtr> > transportFilterChains;
        std::vector<RCF::FilterPtr> transportFilterChain;
#ifdef RCF_USE_ZLIB
        transportFilterChain.clear();
        transportFilterChain.push_back( RCF::FilterPtr(new RCF::ZlibStatefulCompressionFilter()) );
        transportFilterChains.push_back(transportFilterChain);
        transportFilterChain.clear();
        transportFilterChain.push_back( RCF::FilterPtr(new RCF::ZlibStatelessCompressionFilter()) );
        transportFilterChains.push_back(transportFilterChain);
#endif
#ifdef RCF_USE_OPENSSL
        transportFilterChain.clear();
        transportFilterChain.push_back( RCF::FilterPtr(new RCF::OpenSslEncryptionFilter(clientCertificateFile, clientCertificateFilePassword)) );
        transportFilterChains.push_back(transportFilterChain);
#endif
#if defined(RCF_USE_ZLIB) && defined(RCF_USE_OPENSSL)
        transportFilterChain.clear();
        transportFilterChain.push_back( RCF::FilterPtr(new RCF::ZlibStatelessCompressionFilter()) );
        transportFilterChain.push_back( RCF::FilterPtr(new RCF::OpenSslEncryptionFilter(clientCertificateFile, clientCertificateFilePassword)) );
        transportFilterChains.push_back(transportFilterChain);
        transportFilterChain.clear();
        transportFilterChain.push_back( RCF::FilterPtr(new RCF::ZlibStatefulCompressionFilter()) );
        transportFilterChain.push_back( RCF::FilterPtr(new RCF::OpenSslEncryptionFilter(clientCertificateFile, clientCertificateFilePassword)) );
        transportFilterChains.push_back(transportFilterChain);
#endif
#ifdef BOOST_WINDOWS
        transportFilterChain.clear();
        transportFilterChain.push_back( RCF::FilterPtr(new RCF::SspiNtlmFilter()));
        transportFilterChains.push_back(transportFilterChain);
#endif

        // Filter factories handed to every filter test below.
        std::vector<RCF::FilterFactoryPtr> filterFactories;
#ifdef RCF_USE_ZLIB
        filterFactories.push_back(RCF::FilterFactoryPtr( new RCF::ZlibStatelessCompressionFilterFactory() ));
        filterFactories.push_back(RCF::FilterFactoryPtr( new RCF::ZlibStatefulCompressionFilterFactory() ));
#endif
#ifdef RCF_USE_OPENSSL
        filterFactories.push_back(RCF::FilterFactoryPtr( new RCF::OpenSslEncryptionFilterFactory(serverCertificateFile, serverCertificateFilePassword) ));
#endif
#ifdef BOOST_WINDOWS
        filterFactories.push_back(RCF::FilterFactoryPtr( new RCF::SspiNtlmFilterFactory()));
#endif

        // test payload filters
        for (unsigned int i=0; i<payloadFilterChains.size(); ++i)
        {
            RCF::TcpEndpoint tcpEndpoint("localhost", util::PortNumbers::getSingleton().getNext());
            RCF::ServerTransportPtr serverTransportPtr( tcpEndpoint.createServerTransport().release() );
            RCF::ClientTransportAutoPtr clientTransportAutoPtr( tcpEndpoint.createClientTransport() );
            ++currentTest;
            if (requestedTest == 0 || requestedTest == currentTest)
            {
                std::string title = "Test " + boost::lexical_cast<std::string>(currentTest) + ": ";
                runPerformanceTest(
                    title,
                    clientTransportAutoPtr,
                    serverTransportPtr,
                    RCF::Twoway,
                    payload,
                    serializationProtocol,
                    filterFactories,
                    payloadFilterChains[i],
                    std::vector<RCF::FilterPtr>(),
                    calls);
            }
        }

        // test transport filters
        for (unsigned int i=0; i<transportFilterChains.size(); ++i)
        {
            RCF::TcpEndpoint tcpEndpoint("localhost", util::PortNumbers::getSingleton().getNext());
            RCF::ServerTransportPtr serverTransportPtr( tcpEndpoint.createServerTransport().release() );
            RCF::ClientTransportAutoPtr clientTransportAutoPtr( tcpEndpoint.createClientTransport() );
            ++currentTest;
            if (requestedTest == 0 || requestedTest == currentTest)
            {
                std::string title = "Test " + boost::lexical_cast<std::string>(currentTest) + ": ";
                runPerformanceTest(
                    title,
                    clientTransportAutoPtr,
                    serverTransportPtr,
                    RCF::Twoway,
                    payload,
                    serializationProtocol,
                    filterFactories,
                    std::vector<RCF::FilterPtr>(),
                    transportFilterChains[i],
                    calls);
            }
        }
    }

    // test transmission of large messages
    {
        ++currentTest;
        if (requestedTest == 0 || requestedTest == currentTest)
        {
            // Build the payload only when this test actually runs, directly
            // as a string (a single allocation, and valid for a size of 0).
            // The size is computed in std::size_t so that large "msgsize"
            // values do not overflow int arithmetic.
            const int largeMessageMb = clLargeMessageSize;
            const std::size_t largeMessageBytes = static_cast<std::size_t>(largeMessageMb) * 1024 * 1024;
            const std::string largePayload(largeMessageBytes, static_cast<char>(1));

            RCF::TcpEndpoint tcpEndpoint("localhost", util::PortNumbers::getSingleton().getNext());
            RCF::ServerTransportPtr serverTransportPtr( tcpEndpoint.createServerTransport().release() );
            RCF::ClientTransportAutoPtr clientTransportAutoPtr( tcpEndpoint.createClientTransport() );
            // NOTE(review): -1 presumably lifts the message length limit - confirm.
            serverTransportPtr->setMaxMessageLength(-1);
            clientTransportAutoPtr->setMaxMessageLength(-1);
            unsigned int timeoutMs = 1000*3600; // one hour
            std::string title = "Test " + boost::lexical_cast<std::string>(currentTest) + ": ";
            runPerformanceTest(
                title,
                clientTransportAutoPtr,
                serverTransportPtr,
                RCF::Twoway,
                largePayload,
                serializationProtocol,
                std::vector<RCF::FilterFactoryPtr>(),
                std::vector<RCF::FilterPtr>(),
                std::vector<RCF::FilterPtr>(),
                callsLarge,
                timeoutMs,
                false);
        }
    }

    // test oneway invocations - tcp
    {
        RCF::TcpEndpoint tcpEndpoint("localhost", util::PortNumbers::getSingleton().getNext());
        RCF::ServerTransportPtr serverTransportPtr( tcpEndpoint.createServerTransport().release() );
        RCF::ClientTransportAutoPtr clientTransportAutoPtr( tcpEndpoint.createClientTransport() );
        ++currentTest;
        if (requestedTest == 0 || requestedTest == currentTest)
        {
            std::string title = "Test " + boost::lexical_cast<std::string>(currentTest) + ": ";
            runPerformanceTest(
                title,
                clientTransportAutoPtr,
                serverTransportPtr,
                RCF::Oneway,
                payload,
                serializationProtocol,
                std::vector<RCF::FilterFactoryPtr>(),
                std::vector<RCF::FilterPtr>(),
                std::vector<RCF::FilterPtr>(),
                calls);
        }
    }

    // test oneway invocations - udp
    {
        RCF::UdpEndpoint udpEndpoint("127.0.0.1", util::PortNumbers::getSingleton().getNext());
        RCF::ServerTransportPtr serverTransportPtr( udpEndpoint.createServerTransport().release() );
        RCF::ClientTransportAutoPtr clientTransportAutoPtr( udpEndpoint.createClientTransport() );
        ++currentTest;
        if (requestedTest == 0 || requestedTest == currentTest)
        {
            std::string title = "Test " + boost::lexical_cast<std::string>(currentTest) + ": ";
            runPerformanceTest(
                title,
                clientTransportAutoPtr,
                serverTransportPtr,
                RCF::Oneway,
                payload,
                serializationProtocol,
                std::vector<RCF::FilterFactoryPtr>(),
                std::vector<RCF::FilterPtr>(),
                std::vector<RCF::FilterPtr>(),
                calls);
        }
    }

    return boost::exit_success;
}