//*****************************************************************************
// RCF - Remote Call Framework
// Copyright (c) 2005. All rights reserved.
// Developed by Jarl Lindrud.
// Contact: jlindrud@hotmail.com .
//*****************************************************************************
#include <RCF/TcpAsioSynchronizedSocket.hpp>
#include <boost/bind.hpp>
#include <RCF/UsingBsdSockets.hpp>
#include <RCF/Tools.hpp>
namespace RCF {
// Builds the wrapper around a new asio TCP socket.
//
// demuxerPtr        - dereferenced here to construct mSocket, so it must not be null.
// readWriteMutexPtr - stored as-is; may be null (the declaration's default is noted
//                     in the inline comment below). The dispatching members in this
//                     file only take locks when this pointer is non-null.
TcpAsioSynchronizedSocket::TcpAsioSynchronizedSocket(
    DemuxerPtr demuxerPtr,
    ReadWriteMutexPtr readWriteMutexPtr /* = ReadWriteMutexPtr() */) :
        mReadWriteMutexPtr(readWriteMutexPtr),
        mSocket(*demuxerPtr),
        mCloseFlag() // value-initialized
{}
void TcpAsioSynchronizedSocket::setSynchronized(bool synchronized)
{
WriteLock writeLock(*mReadWriteMutexPtr);
synchronized && !mMutexAutoPtr.get() ?
mMutexAutoPtr.reset(new Mutex) :
mMutexAutoPtr.reset();
}
void TcpAsioSynchronizedSocket::setCloseFlag()
{
mCloseFlag = true;
}
bool TcpAsioSynchronizedSocket::getCloseFlag()
{
return mCloseFlag;
}
// Reports whether the socket still looks connected.
//
// Returns false immediately once the close flag has been set. Otherwise the actual
// probe (isConnectedUnsynchronized()) is run, going through the mutex-holding variant
// when a read/write mutex exists and synchronization is currently enabled.
bool TcpAsioSynchronizedSocket::isConnected()
{
    if (mCloseFlag)
    {
        return false;
    }

    if (!mReadWriteMutexPtr.get())
    {
        return isConnectedUnsynchronized();
    }

    // Hold the read lock while mMutexAutoPtr is inspected and used.
    ReadLock sharedLock(*mReadWriteMutexPtr);
    if (mMutexAutoPtr.get())
    {
        return isConnectedSynchronized();
    }
    return isConnectedUnsynchronized();
}
// Runs the connection probe while holding the per-socket mutex.
// Dereferences mMutexAutoPtr, so it must only be called while that pointer is set.
bool TcpAsioSynchronizedSocket::isConnectedSynchronized()
{
    Lock guard(*mMutexAutoPtr);
    const bool connected = isConnectedUnsynchronized();
    return connected;
}
bool TcpAsioSynchronizedSocket::isConnectedUnsynchronized()
{
// apply some low level socket routines to figure out if there are errors pending on this socket
int fd = static_cast<int>(mSocket.native());
if (fd != -1)
{
timeval tv = {0,0};
fd_set readFds;
FD_ZERO(&readFds);
FD_SET(fd, &readFds);
int ret = Platform::OS::BsdSockets::select(fd+1, &readFds, NULL, NULL, &tv);
if (ret == 0)
{
return true;
}
else if (ret == 1)
{
const int length = 1;
char buffer[length];
int ret = Platform::OS::BsdSockets::recv(fd, buffer, length, MSG_PEEK);
int err = Platform::OS::BsdSockets::GetLastError();
if (ret > 0 || (
ret == -1 && (
err == Platform::OS::BsdSockets::ERR_EINPROGRESS ||
err == Platform::OS::BsdSockets::ERR_EWOULDBLOCK)))
{
return true;
}
}
}
return false;
}
// Starts an asynchronous connect to `endpoint`; `handler` is passed on to asio.
//
// With a read/write mutex present, a read lock is held for the duration of this call
// and the mutex-holding variant is used when synchronization is enabled; otherwise
// the call goes straight to connectUnsynchronized().
template<typename Handler>
void TcpAsioSynchronizedSocket::connect(const asio::ip::tcp::endpoint &endpoint, const Handler &handler)
{
    if (!mReadWriteMutexPtr.get())
    {
        connectUnsynchronized(endpoint, handler);
        return;
    }

    ReadLock sharedLock(*mReadWriteMutexPtr);
    if (mMutexAutoPtr.get())
    {
        connectSynchronized(endpoint, handler);
    }
    else
    {
        connectUnsynchronized(endpoint, handler);
    }
}
// Starts the asynchronous connect while holding the per-socket mutex.
// The mutex is held only for the scope of this function, i.e. while the operation
// is being started. Requires mMutexAutoPtr to be set.
template<typename Handler>
void TcpAsioSynchronizedSocket::connectSynchronized(const asio::ip::tcp::endpoint &endpoint, const Handler &handler)
{
    Lock guard(*mMutexAutoPtr);
    this->connectUnsynchronized(endpoint, handler);
}
// Forwards directly to mSocket.async_connect() without taking any lock.
template<typename Handler>
void TcpAsioSynchronizedSocket::connectUnsynchronized(
    const asio::ip::tcp::endpoint &endpoint,
    const Handler &handler)
{
    mSocket.async_connect(
        endpoint,
        handler);
}
// Starts an asynchronous accept on `acceptor` into this object's socket;
// `handler` is passed on to asio.
//
// With a read/write mutex present, a read lock is held for the duration of this call
// and the mutex-holding variant is used when synchronization is enabled; otherwise
// the call goes straight to acceptUnsynchronized().
template<typename Handler>
void TcpAsioSynchronizedSocket::accept(asio::ip::tcp::acceptor &acceptor, const Handler &handler)
{
    if (!mReadWriteMutexPtr.get())
    {
        acceptUnsynchronized(acceptor, handler);
        return;
    }

    ReadLock sharedLock(*mReadWriteMutexPtr);
    if (mMutexAutoPtr.get())
    {
        acceptSynchronized(acceptor, handler);
    }
    else
    {
        acceptUnsynchronized(acceptor, handler);
    }
}
// Starts the asynchronous accept while holding the per-socket mutex.
// The mutex is held only for the scope of this function, i.e. while the operation
// is being started. Requires mMutexAutoPtr to be set.
template<typename Handler>
void TcpAsioSynchronizedSocket::acceptSynchronized(asio::ip::tcp::acceptor &acceptor, const Handler &handler)
{
    Lock guard(*mMutexAutoPtr);
    this->acceptUnsynchronized(acceptor, handler);
}
// Forwards directly to acceptor.async_accept(), with mSocket as the socket that
// receives the accepted connection. No lock is taken.
template<typename Handler>
void TcpAsioSynchronizedSocket::acceptUnsynchronized(
    asio::ip::tcp::acceptor &acceptor,
    const Handler &handler)
{
    acceptor.async_accept(
        mSocket,
        handler);
}
// Starts an asynchronous receive into buffer[0, bufferLen); `handler` is passed on to asio.
//
// With a read/write mutex present, a read lock is held for the duration of this call
// and the mutex-holding variant is used when synchronization is enabled; otherwise
// the call goes straight to readUnsynchronized().
template<typename Handler>
void TcpAsioSynchronizedSocket::read(char *buffer, std::size_t bufferLen, const Handler &handler)
{
    if (!mReadWriteMutexPtr.get())
    {
        readUnsynchronized(buffer, bufferLen, handler);
        return;
    }

    ReadLock sharedLock(*mReadWriteMutexPtr);
    if (mMutexAutoPtr.get())
    {
        readSynchronized(buffer, bufferLen, handler);
    }
    else
    {
        readUnsynchronized(buffer, bufferLen, handler);
    }
}
template<typename Handler>
void TcpAsioSynchronizedSocket::readSynchronized(char *buffer, std::size_t bufferLen, const Handler &handler)
{
Lock lock(*mMutexAutoPtr);
readUnsynchronized(buffer, bufferLen, handler);
}
// Forwards directly to mSocket.async_receive() over buffer[0, bufferLen),
// passing 0 as the flags argument. No lock is taken.
template<typename Handler>
void TcpAsioSynchronizedSocket::readUnsynchronized(
    char *buffer,
    std::size_t bufferLen,
    const Handler &handler)
{
    mSocket.async_receive(
        asio::buffer(buffer, bufferLen),
        0,
        handler);
}
// Starts an asynchronous write of buffer[0, bufferLen); `handler` is passed on to asio.
//
// With a read/write mutex present, a read lock is held for the duration of this call
// and the mutex-holding variant is used when synchronization is enabled; otherwise
// the call goes straight to writeUnsynchronized().
template<typename Handler>
void TcpAsioSynchronizedSocket::write(const char *buffer, std::size_t bufferLen, const Handler &handler)
{
    if (!mReadWriteMutexPtr.get())
    {
        writeUnsynchronized(buffer, bufferLen, handler);
        return;
    }

    ReadLock sharedLock(*mReadWriteMutexPtr);
    if (mMutexAutoPtr.get())
    {
        writeSynchronized(buffer, bufferLen, handler);
    }
    else
    {
        writeUnsynchronized(buffer, bufferLen, handler);
    }
}
template<typename Handler>
void TcpAsioSynchronizedSocket::writeSynchronized(const char *buffer, std::size_t bufferLen, const Handler &handler)
{
Lock lock(*mMutexAutoPtr);
writeUnsynchronized(buffer, bufferLen, handler);
}
// Forwards to the free function asio::async_write() over buffer[0, bufferLen).
// No lock is taken.
template<typename Handler>
void TcpAsioSynchronizedSocket::writeUnsynchronized(
    const char *buffer,
    std::size_t bufferLen,
    const Handler &handler)
{
    // Previously: mSocket.async_send(asio::buffer(buffer, bufferLen), 0, handler);
    asio::async_write(
        mSocket,
        asio::buffer(buffer, bufferLen),
        handler);
}
std::size_t TcpAsioSynchronizedSocket::read(char *buffer, std::size_t bufferLen, asio::error &error)
{
if (mReadWriteMutexPtr.get())
{
ReadLock readLock(*mReadWriteMutexPtr);
return mMutexAutoPtr.get() ?
readSynchronized(buffer, bufferLen, error) :
readUnsynchronized(buffer, bufferLen, error);
}
else
{
return readUnsynchronized(buffer, bufferLen, error);
}
}
std::size_t TcpAsioSynchronizedSocket::readSynchronized(char *buffer, std::size_t bufferLen, asio::error &error)
{
Lock lock(*mMutexAutoPtr);
return readUnsynchronized(buffer, bufferLen, error);
}
std::size_t TcpAsioSynchronizedSocket::readUnsynchronized(char *buffer, std::size_t bufferLen, asio::error &error)
{
/*return mSocket.read_some(
asio::buffer(buffer, bufferLen),
boost::bind(&TcpAsioSynchronizedSocket::onError, this, boost::ref(error), asio::placeholders::error));*/
// for symmetry with writeUnsynchronized(), we use asio::read
return asio::read(
mSocket,
asio::buffer(buffer, bufferLen),
asio::transfer_at_least(1),
boost::bind(&TcpAsioSynchronizedSocket::onError, this, boost::ref(error), asio::placeholders::error));
}
std::size_t TcpAsioSynchronizedSocket::write(const char *buffer, std::size_t bufferLen, asio::error &error)
{
if (mReadWriteMutexPtr.get())
{
ReadLock readLock(*mReadWriteMutexPtr);
return mMutexAutoPtr.get() ?
writeSynchronized(buffer, bufferLen, error) :
writeUnsynchronized(buffer, bufferLen, error);
}
else
{
return writeUnsynchronized(buffer, bufferLen, error);
}
}
std::size_t TcpAsioSynchronizedSocket::writeSynchronized(const char *buffer, std::size_t bufferLen, asio::error &error)
{
Lock lock(*mMutexAutoPtr);
return writeUnsynchronized(buffer, bufferLen, error);
}
std::size_t TcpAsioSynchronizedSocket::writeUnsynchronized(const char *buffer, std::size_t bufferLen, asio::error &error)
{
return asio::write(
mSocket,
asio::buffer(buffer, bufferLen),
asio::transfer_all(),
boost::bind(&TcpAsioSynchronizedSocket::onError, this, boost::ref(error), asio::placeholders::error));
}
// Opens the underlying socket.
//
// With a read/write mutex present, a read lock is held for the duration of this call
// and the mutex-holding variant is used when synchronization is enabled; otherwise
// the call goes straight to openUnsynchronized().
void TcpAsioSynchronizedSocket::open()
{
    if (!mReadWriteMutexPtr.get())
    {
        openUnsynchronized();
        return;
    }

    ReadLock sharedLock(*mReadWriteMutexPtr);
    if (mMutexAutoPtr.get())
    {
        openSynchronized();
    }
    else
    {
        openUnsynchronized();
    }
}
void TcpAsioSynchronizedSocket::openSynchronized()
{
Lock lock(*mMutexAutoPtr);
openUnsynchronized();
}
// Opens mSocket with the asio::ip::tcp::v4() protocol. No lock is taken.
void TcpAsioSynchronizedSocket::openUnsynchronized()
{
    mSocket.open(
        asio::ip::tcp::v4());
}
// Closes the underlying socket. The close flag is not modified here.
//
// With a read/write mutex present, a read lock is held for the duration of this call
// and the mutex-holding variant is used when synchronization is enabled; otherwise
// the call goes straight to closeUnsynchronized().
void TcpAsioSynchronizedSocket::close()
{
    if (!mReadWriteMutexPtr.get())
    {
        closeUnsynchronized();
        return;
    }

    ReadLock sharedLock(*mReadWriteMutexPtr);
    if (mMutexAutoPtr.get())
    {
        closeSynchronized();
    }
    else
    {
        closeUnsynchronized();
    }
}
void TcpAsioSynchronizedSocket::closeSynchronized()
{
Lock lock(*mMutexAutoPtr);
closeUnsynchronized();
}
void TcpAsioSynchronizedSocket::closeUnsynchronized()
{
mSocket.close();
}
// Returns the native handle of the underlying socket.
//
// With a read/write mutex present, a read lock is held for the duration of this call
// and the mutex-holding variant is used when synchronization is enabled; otherwise
// the call goes straight to implUnsynchronized().
asio::ip::tcp::socket::native_type TcpAsioSynchronizedSocket::impl()
{
    if (!mReadWriteMutexPtr.get())
    {
        return implUnsynchronized();
    }

    ReadLock sharedLock(*mReadWriteMutexPtr);
    if (mMutexAutoPtr.get())
    {
        return implSynchronized();
    }
    return implUnsynchronized();
}
// Fetches the native handle while holding the per-socket mutex.
// Requires mMutexAutoPtr to be set.
asio::ip::tcp::socket::native_type TcpAsioSynchronizedSocket::implSynchronized()
{
    Lock guard(*mMutexAutoPtr);
    return this->implUnsynchronized();
}
// Returns mSocket.native() directly. No lock is taken.
asio::ip::tcp::socket::native_type TcpAsioSynchronizedSocket::implUnsynchronized()
{
    return this->mSocket.native();
}
// Writes the socket's remote endpoint into `endpoint`.
//
// With a read/write mutex present, a read lock is held for the duration of this call
// and the mutex-holding variant is used when synchronization is enabled; otherwise
// the call goes straight to getRemoteEndpointUnsynchronized().
void TcpAsioSynchronizedSocket::getRemoteEndpoint(asio::ip::tcp::endpoint &endpoint) const
{
    if (!mReadWriteMutexPtr.get())
    {
        getRemoteEndpointUnsynchronized(endpoint);
        return;
    }

    ReadLock sharedLock(*mReadWriteMutexPtr);
    if (mMutexAutoPtr.get())
    {
        getRemoteEndpointSynchronized(endpoint);
    }
    else
    {
        getRemoteEndpointUnsynchronized(endpoint);
    }
}
// Retrieves the remote endpoint while holding the per-socket mutex.
// Requires mMutexAutoPtr to be set.
void TcpAsioSynchronizedSocket::getRemoteEndpointSynchronized(asio::ip::tcp::endpoint &endpoint) const
{
    Lock guard(*mMutexAutoPtr);
    this->getRemoteEndpointUnsynchronized(endpoint);
}
// Assigns mSocket.remote_endpoint() to `endpoint`. No lock is taken.
void TcpAsioSynchronizedSocket::getRemoteEndpointUnsynchronized(asio::ip::tcp::endpoint &endpoint) const
{
    endpoint = this->mSocket.remote_endpoint();
}
void TcpAsioSynchronizedSocket::onError(asio::error &myError, const asio::error &error)
{
myError = error;
}
} // namespace RCF