//*****************************************************************************
// RCF - Remote Call Framework
// Copyright (c) 2005. All rights reserved.
// Developed by Jarl Lindrud.
// Contact: jlindrud@hotmail.com .
//*****************************************************************************
#include <RCF/TcpAsioClientTransport.hpp>
#include <boost/bind.hpp>
#include <RCF/ByteOrdering.hpp>
#include <RCF/InitDeinit.hpp>
#include <RCF/TcpAsioSynchronizedSocket.hpp>
#include <RCF/TcpEndpoint.hpp>
#include <RCF/TimedBsdSockets.hpp> // timedConnect()
namespace RCF {
// TcpAsioClientTransport
// Constructs a transport that targets the given ip/port.
// A new SynchronizedSocket is created from the demuxer and the shared (static)
// read/write mutex returned by getSharedReadWriteMutexPtr(). No connection is
// made here; send() calls connect() when the socket is not connected.
// mTimer, mBytesTransferred, mTimeoutFlag, mError, mEventMutex and mEvent are
// each initialized from the same-named member of the newly allocated State.
// NOTE(review): *mDemuxerPtr is dereferenced below while the fallback to
// getSharedDemuxerPtr() is commented out, so an empty demuxerPtr looks like a
// null dereference - confirm that callers always pass a valid demuxer.
// NOTE(review): later initializers read mDemuxerPtr and mStatePtr; this relies
// on the member declaration order in the header, which is not visible here.
TcpAsioClientTransport::TcpAsioClientTransport(const std::string &ip, int port, DemuxerPtr demuxerPtr/* = DemuxerPtr()*/) :
mDemuxerPtr(demuxerPtr),// ? demuxerPtr : getSharedDemuxerPtr() ),
mSynchronizedSocketPtr( new SynchronizedSocket(mDemuxerPtr, getSharedReadWriteMutexPtr())),
mIp(ip),
mPort(port),
mEndpoint(),
mEndTimeMs(),
mStatePtr(new State(*mDemuxerPtr)),
mTimer(mStatePtr->mTimer),
mBytesTransferred(mStatePtr->mBytesTransferred),
mTimeoutFlag(mStatePtr->mTimeoutFlag),
mError(mStatePtr->mError),
mEventMutex(mStatePtr->mEventMutex),
mEvent(mStatePtr->mEvent)
{
}
// Constructs a transport around an already existing socket.
// mIp, mPort and mEndpoint are left value-initialized; clone() and
// getEndpointPtr() fill them in from the socket's remote endpoint the first
// time they are needed.
// mTimer, mBytesTransferred, mTimeoutFlag, mError, mEventMutex and mEvent are
// each initialized from the same-named member of the newly allocated State.
// NOTE(review): *mDemuxerPtr is dereferenced below while the fallback to
// getSharedDemuxerPtr() is commented out, so an empty demuxerPtr looks like a
// null dereference - confirm that callers always pass a valid demuxer.
// NOTE(review): later initializers read mDemuxerPtr and mStatePtr; this relies
// on the member declaration order in the header, which is not visible here.
TcpAsioClientTransport::TcpAsioClientTransport(SynchronizedSocketPtr synchronizedSocketPtr, DemuxerPtr demuxerPtr/* = DemuxerPtr()*/) :
mDemuxerPtr(demuxerPtr),// ? demuxerPtr : getSharedDemuxerPtr() ),
mSynchronizedSocketPtr(synchronizedSocketPtr),
mIp(),
mPort(),
mEndpoint(),
mEndTimeMs(),
mStatePtr(new State(*mDemuxerPtr)),
mTimer(mStatePtr->mTimer),
mBytesTransferred(mStatePtr->mBytesTransferred),
mTimeoutFlag(mStatePtr->mTimeoutFlag),
mError(mStatePtr->mError),
mEventMutex(mStatePtr->mEventMutex),
mEvent(mStatePtr->mEvent)
{
}
// Copy constructor: copies the addressing information (demuxer pointer, ip,
// port, endpoint) from rhs but does NOT share its connection. The copy gets a
// brand new SynchronizedSocket and a brand new State, and mEndTimeMs is
// value-initialized rather than copied.
// NOTE(review): mTransportFilters is not mentioned in the initializer list, so
// the copy presumably starts without rhs's transport filters - confirm intent.
// NOTE(review): later initializers read mDemuxerPtr and mStatePtr; this relies
// on the member declaration order in the header, which is not visible here.
TcpAsioClientTransport::TcpAsioClientTransport(const TcpAsioClientTransport &rhs) :
mDemuxerPtr(rhs.mDemuxerPtr),
mSynchronizedSocketPtr( new SynchronizedSocket(mDemuxerPtr, getSharedReadWriteMutexPtr())),
mIp(rhs.mIp),
mPort(rhs.mPort),
mEndpoint(rhs.mEndpoint),
mEndTimeMs(),
mStatePtr(new State(*mDemuxerPtr)),
mTimer(mStatePtr->mTimer),
mBytesTransferred(mStatePtr->mBytesTransferred),
mTimeoutFlag(mStatePtr->mTimeoutFlag),
mError(mStatePtr->mError),
mEventMutex(mStatePtr->mEventMutex),
mEvent(mStatePtr->mEvent)
{
}
// Destructor: delegates to close(), which closes the socket if this transport
// still holds one.
TcpAsioClientTransport::~TcpAsioClientTransport()
{
    this->close();
}
// Returns true when the endpoint still equals a default-constructed endpoint,
// i.e. both its address and its port match those of a fresh
// asio::ip::tcp::endpoint.
// The members are compared one by one because Borland C++ does not seem to
// find the comparison operator for asio::ip::tcp::endpoint.
bool isClear(const asio::ip::tcp::endpoint &endpoint)
{
    const asio::ip::tcp::endpoint blank;
    if (!(endpoint.address() == blank.address()))
    {
        return false;
    }
    return endpoint.port() == blank.port();
}
// Returns a newly allocated copy of this transport (built with the copy
// constructor) wrapped in a ClientTransportAutoPtr.
// If this transport holds a socket but has neither an ip string nor an
// endpoint yet, the socket's remote endpoint is queried first and cached in
// mEndpoint / mIp / mPort so that the copy receives the address.
ClientTransportAutoPtr TcpAsioClientTransport::clone() const
{
    if (mSynchronizedSocketPtr.get())
    {
        if (mIp.empty() && isClear(mEndpoint))
        {
            mSynchronizedSocketPtr->getRemoteEndpoint(mEndpoint);
            mIp = mEndpoint.address().to_string();
            mPort = mEndpoint.port();
        }
    }

    ClientTransportAutoPtr copyPtr( new TcpAsioClientTransport(*this) );
    return copyPtr;
}
// Returns a newly allocated TcpEndpoint describing the ip/port of this
// transport.
// If this transport holds a socket but has neither an ip string nor an
// endpoint yet, the socket's remote endpoint is queried first and cached in
// mEndpoint / mIp / mPort.
EndpointPtr TcpAsioClientTransport::getEndpointPtr() const
{
    if (mSynchronizedSocketPtr.get())
    {
        if (mIp.empty() && isClear(mEndpoint))
        {
            mSynchronizedSocketPtr->getRemoteEndpoint(mEndpoint);
            mIp = mEndpoint.address().to_string();
            mPort = mEndpoint.port();
        }
    }

    EndpointPtr endpointPtr( new TcpEndpoint(mIp, mPort) );
    return endpointPtr;
}
int TcpAsioClientTransport::send(const std::string &data, unsigned int timeoutMs)
{
mEndTimeMs = getCurrentTimeMs() + timeoutMs;
// connect
if (!isConnected())
{
int ret = connect();
if (ret != 1)
{
return ret;
}
}
// encode message length
BOOST_STATIC_ASSERT(sizeof(unsigned int) == 4);
mWriteBuffer.resize(4+data.length());
*(unsigned int *) &mWriteBuffer[0] = static_cast<unsigned int>(data.length());
RCF::machineToNetworkOrder(&mWriteBuffer[0], 4, 1);
memcpy(&mWriteBuffer[4], data.c_str(), data.length());
// send message
write(&mWriteBuffer[0], mWriteBuffer.size());
return errFromAsioErr();
}
// Receives one message framed as a 4-byte length prefix in network byte order
// followed by that many payload bytes.
//
// data      - receives the payload on success.
// timeoutMs - not used by this function; reads are bounded by the deadline in
//             mEndTimeMs, which send() sets.
//             NOTE(review): confirm that sharing send()'s deadline is intended.
//
// Returns 1 when a complete message was read, otherwise the code produced by
// errFromAsioErr(). Throws ClientTransportException (via RCF_THROW) when the
// announced length is zero or exceeds getMaxMessageLength().
int TcpAsioClientTransport::receive(std::string &data, unsigned int timeoutMs)
{
    // read message length
    BOOST_STATIC_ASSERT(sizeof(unsigned int) == 4);
    mReadBuffer.resize(4);
    std::size_t bytesTransferred = read(&mReadBuffer[0], mReadBuffer.size());
    if (bytesTransferred != 4)
    {
        return errFromAsioErr();
    }

    // Decode the prefix with memcpy rather than reading through an
    // unsigned int* cast: the buffer is addressed as bytes and carries no
    // alignment guarantee for unsigned int.
    networkToMachineOrder(&mReadBuffer[0], 4, 1);
    unsigned int length = 0;
    memcpy(&length, &mReadBuffer[0], 4);
    if (length == 0 || length > getMaxMessageLength())
    {
        RCF_THROW(ClientTransportException, "bad message length")(length)(getMaxMessageLength());
    }

    // read message
    mReadBuffer.resize(length);
    bytesTransferred = read(&mReadBuffer[0], mReadBuffer.size());
    if (bytesTransferred != length)
    {
        return errFromAsioErr();
    }

    data.assign(&mReadBuffer[0], mReadBuffer.size());
    return 1;
}
// Reports whether the underlying socket is connected.
// Returns false when this transport no longer holds a socket (for example
// after releaseSynchronizedSocketPtr()) instead of dereferencing a null
// pointer; close() applies the same guard.
bool TcpAsioClientTransport::isConnected()
{
    if (!mSynchronizedSocketPtr.get())
    {
        return false;
    }
    return mSynchronizedSocketPtr->isConnected();
}
// Closes the underlying socket. Does nothing when the transport holds no
// socket.
void TcpAsioClientTransport::close()
{
    if (!mSynchronizedSocketPtr)
    {
        return;
    }
    mSynchronizedSocketPtr->close();
}
// Completion callback for a single read or write: records the outcome of the
// operation in mError and mBytesTransferred, where read()/write() pick it up.
void TcpAsioClientTransport::onFilteredReadWriteCompletion(std::size_t bytesTransferred, int error)
{
    mError = error;
    mBytesTransferred = bytesTransferred;
}
// Replaces the transport filter chain with a copy of `filters` and wires it up
// through RCF::connectFilters(), passing this object's readSingle, writeSingle
// and onFilteredReadWriteCompletion as bound callbacks.
// The callbacks are bound to `this`, so they refer to this particular
// transport object.
void TcpAsioClientTransport::setTransportFilters(const std::vector<FilterPtr> &filters)
{
mTransportFilters.assign(filters.begin(), filters.end());
RCF::connectFilters(
mTransportFilters,
// callback that performs one raw read on the socket
boost::bind(&TcpAsioClientTransport::readSingle, this, _1, _2),
// callback that performs one raw write on the socket
boost::bind(&TcpAsioClientTransport::writeSingle, this, _1, _2),
// callback that records the (bytes, error) result of an operation
boost::bind(&TcpAsioClientTransport::onFilteredReadWriteCompletion, this, _1, _2));
}
// Hands the socket over to the caller: returns the socket pointer held by this
// transport and leaves the transport's own pointer empty.
TcpAsioClientTransport::SynchronizedSocketPtr TcpAsioClientTransport::releaseSynchronizedSocketPtr()
{
    SynchronizedSocketPtr released;
    released.swap(mSynchronizedSocketPtr);
    return released;
}
// Returns a copy of the socket pointer held by this transport; the transport
// keeps its own pointer.
TcpAsioClientTransport::SynchronizedSocketPtr TcpAsioClientTransport::getSynchronizedSocketPtr()
{
    SynchronizedSocketPtr socketPtr(mSynchronizedSocketPtr);
    return socketPtr;
}
int TcpAsioClientTransport::connect()
{
mSynchronizedSocketPtr->close();
mSynchronizedSocketPtr->open();
int fd = static_cast<int>(mSynchronizedSocketPtr->impl());
unsigned long ul_addr = 0; // in network order
int port = 0; // in host order
if (isClear(mEndpoint))
{
hostent *hostDesc = ::gethostbyname( mIp.c_str() );
if (hostDesc)
{
char *szIp = ::inet_ntoa( * (in_addr*) hostDesc->h_addr_list[0]);
ul_addr = ::inet_addr(szIp);
port = mPort;
}
}
else
{
asio::ip::address address = mEndpoint.address();
ul_addr = htonl(address.to_v4().to_ulong());
port = mEndpoint.port();
}
sockaddr_in remoteAddr;
memset(&remoteAddr, 0, sizeof(remoteAddr));
remoteAddr.sin_family = AF_INET;
remoteAddr.sin_addr.s_addr = ul_addr;
//remoteAddr.sin_port = ::htons(port); // the :: seems to screw up gcc (!?!)
remoteAddr.sin_port = htons(port); // the :: seems to screw up gcc (!?!)
unsigned int timeoutMs = generateTimeoutMs(mEndTimeMs);
int ret = timedConnect(timeoutMs, fd, (sockaddr*) &remoteAddr, sizeof(remoteAddr)); // TODO: error checking and reporting
return ret == 0 ? 1 : -1;
}
// Blocks in select() until fd is reported readable or timeoutMs milliseconds
// have elapsed.
//
// Returns false only when select() returned 0, i.e. the wait timed out.
// Returns true for every other result of select(): the descriptor became
// readable, or select() itself reported an error (in which case the caller's
// subsequent read is what surfaces the failure).
bool waitUntilFdReadable(int fd, unsigned int timeoutMs)
{
    fd_set readSet;
    FD_ZERO(&readSet);
    FD_SET(fd, &readSet);

    // Split the millisecond budget into whole seconds plus microseconds.
    timeval waitTime;
    waitTime.tv_sec = timeoutMs/1000;
    waitTime.tv_usec = 1000*(timeoutMs%1000);

    const int selectResult = ::select(fd+1, &readSet, NULL, NULL, &waitTime);
    return selectResult != 0;
}
// Performs a single read attempt on the socket, bounded by the time remaining
// until mEndTimeMs.
//
// The outcome is reported through the completion callback - the last transport
// filter if any filters are installed, otherwise
// onFilteredReadWriteCompletion() directly:
//   (bytesRead, 0)  when at least one byte was read,
//   (0, -1)         when the wait timed out or the read produced no bytes.
// mTimeoutFlag is additionally set when the wait timed out.
void TcpAsioClientTransport::readSingle(char *buffer, std::size_t bufferLen)
{
    // Clear the status left over from any earlier operation.
    mError = asio::error();
    mBytesTransferred = 0;
    mTimeoutFlag = false;

    const int socketFd = static_cast<int>(mSynchronizedSocketPtr->impl());
    const unsigned int remainingMs = generateTimeoutMs(mEndTimeMs);

    if (!waitUntilFdReadable(socketFd, remainingMs))
    {
        mTimeoutFlag = true;
    }
    else
    {
        mError = asio::error();
        const std::size_t bytesRead = mSynchronizedSocketPtr->read(buffer, bufferLen, mError);
        if (bytesRead > 0)
        {
            if (mTransportFilters.empty())
            {
                onFilteredReadWriteCompletion(bytesRead, 0);
            }
            else
            {
                mTransportFilters.back()->onReadWriteCompleted(bytesRead, 0);
            }
            return;
        }
    }

    // Timed out, or the read yielded nothing: report failure.
    if (mTransportFilters.empty())
    {
        onFilteredReadWriteCompletion(0, -1);
    }
    else
    {
        mTransportFilters.back()->onReadWriteCompleted(0, -1);
    }
}
// Performs a single write on the socket.
//
// If the write reports an error, the socket's close flag is set. The outcome
// is then reported through the completion callback - the last transport filter
// if any filters are installed, otherwise onFilteredReadWriteCompletion()
// directly - as (bytesWritten, 0) on success or (bytesWritten, -1) on error.
void TcpAsioClientTransport::writeSingle(const char *buffer, std::size_t bufferLen)
{
    mError = asio::error();
    const std::size_t written = mSynchronizedSocketPtr->write(buffer, bufferLen, mError);

    int status = 0;
    if (mError)
    {
        mSynchronizedSocketPtr->setCloseFlag();
        status = -1;
    }

    if (mTransportFilters.empty())
    {
        onFilteredReadWriteCompletion(written, status);
    }
    else
    {
        mTransportFilters.back()->onReadWriteCompleted(written, status);
    }
}
// Reads until bufferLen bytes have been stored in buffer, or until an error or
// timeout is flagged.
//
// Each iteration issues one read - through the first transport filter if any
// filters are installed, otherwise through readSingle() - and advances by
// mBytesTransferred when no error is set afterwards.
// Nothing is read if mError or mTimeoutFlag is already set on entry.
//
// Returns the number of bytes accumulated.
std::size_t TcpAsioClientTransport::read(char *buffer, std::size_t bufferLen)
{
    std::size_t received = 0;
    for (;;)
    {
        if (mError || mTimeoutFlag || received >= bufferLen)
        {
            break;
        }

        if (mTransportFilters.empty())
        {
            readSingle(buffer+received, bufferLen-received);
        }
        else
        {
            mTransportFilters.front()->read(buffer+received, bufferLen-received);
        }

        if (!mError)
        {
            received += mBytesTransferred;
        }
    }
    RCF_ASSERT(received <= bufferLen);
    return received;
}
// Writes until bufferLen bytes from buffer have been sent, or until an error
// or timeout is flagged.
//
// Each iteration issues one write - through the first transport filter if any
// filters are installed, otherwise through writeSingle() - and advances by
// mBytesTransferred when no error is set afterwards.
// Nothing is written if mError or mTimeoutFlag is already set on entry.
//
// Returns the number of bytes accounted for.
std::size_t TcpAsioClientTransport::write(const char *buffer, std::size_t bufferLen)
{
    std::size_t sent = 0;
    for (;;)
    {
        if (mError || mTimeoutFlag || sent >= bufferLen)
        {
            break;
        }

        if (mTransportFilters.empty())
        {
            writeSingle(buffer+sent, bufferLen-sent);
        }
        else
        {
            mTransportFilters.front()->write(buffer+sent, bufferLen-sent);
        }

        if (!mError)
        {
            sent += mBytesTransferred;
        }
    }
    RCF_ASSERT(sent <= bufferLen);
    return sent;
}
int TcpAsioClientTransport::errFromAsioErr()
{
if (mTimeoutFlag)
{
return -2;
}
else if (mError && mError.code() == asio::error::connection_reset)
{
return 0;
}
else if (mError)
{
RCF_TRACE("asio error: ")(mError.code());
return -1;
}
else
{
return 1;
}
}
void TcpAsioClientTransport::init()
{
deinit();
spReadWriteMutexPtr = new ReadWriteMutexPtr( new ReadWriteMutex(WriterPriority));
spDemuxerPtr = new DemuxerPtr( new Demuxer );
}
void TcpAsioClientTransport::deinit()
{
delete spReadWriteMutexPtr;
spReadWriteMutexPtr = NULL;
delete spDemuxerPtr;
spDemuxerPtr = NULL;
}
// Returns a copy of the shared read/write mutex pointer created by init().
// The static holder is dereferenced unconditionally, so init() must have run.
ReadWriteMutexPtr TcpAsioClientTransport::getSharedReadWriteMutexPtr()
{
    const ReadWriteMutexPtr &shared = *spReadWriteMutexPtr;
    return shared;
}
// Returns a copy of the shared demuxer pointer created by init().
// The static holder is dereferenced unconditionally, so init() must have run.
DemuxerPtr TcpAsioClientTransport::getSharedDemuxerPtr()
{
    const DemuxerPtr &shared = *spDemuxerPtr;
    return shared;
}
// Definitions of the static holders for the shared demuxer and the shared
// read/write mutex. They have static storage duration and no initializer, so
// both start out null; init() allocates them and deinit() frees them.
DemuxerPtr * TcpAsioClientTransport::spDemuxerPtr;
ReadWriteMutexPtr * TcpAsioClientTransport::spReadWriteMutexPtr;
// Hooks init()/deinit() into the framework's initialization mechanism.
// NOTE(review): exact timing is defined by RCF_ON_INIT_DEINIT in
// RCF/InitDeinit.hpp, which is not visible here.
RCF_ON_INIT_DEINIT(TcpAsioClientTransport::init(), TcpAsioClientTransport::deinit())
} // namespace RCF