//*****************************************************************************
// RCF - Remote Call Framework
// Copyright (c) 2005. All rights reserved.
// Developed by Jarl Lindrud.
// Contact: jlindrud@hotmail.com .
//*****************************************************************************
#ifndef INCLUDE_RCF_TCPASIOSERVERTRANSPORT_HPP
#define INCLUDE_RCF_TCPASIOSERVERTRANSPORT_HPP
#include <set>
#include <vector>
#include <boost/enable_shared_from_this.hpp>
#include <boost/shared_ptr.hpp>
#include <boost/weak_ptr.hpp>
#include <boost/asio.hpp>
#include <RCF/IpAddress.hpp>
#include <RCF/IpServerTransport.hpp>
#include <RCF/ServerTransport.hpp>
#include <RCF/Service.hpp>
#include <RCF/ThreadLibrary.hpp>
namespace RCF {
namespace asio = boost::asio;
class RcfServer;
class TcpAsioSynchronizedSocket;
typedef asio::io_service Demuxer;
typedef boost::shared_ptr<Demuxer> DemuxerPtr;
typedef asio::ip::tcp::acceptor SocketAcceptor;
typedef boost::shared_ptr<SocketAcceptor> SocketAcceptorPtr;
typedef asio::deadline_timer DeadlineTimer;
typedef boost::shared_ptr<DeadlineTimer> DeadlineTimerPtr;
class TcpAsioServerTransport :
public I_ServerTransport,
public I_ServerTransportEx,
public I_IpServerTransport,
public I_Service
{
public:
TcpAsioServerTransport(int port);
ServerTransportPtr clone();
private:
typedef boost::shared_ptr<I_Session> SessionPtr;
typedef boost::weak_ptr<I_Session> SessionWeakPtr;
typedef TcpAsioSynchronizedSocket SynchronizedSocket;
typedef boost::shared_ptr<SynchronizedSocket> SynchronizedSocketPtr;
class TcpAsioProactor;
class SessionState : public boost::enable_shared_from_this<SessionState>, boost::noncopyable
{
public:
SessionState(TcpAsioServerTransport &transport, DemuxerPtr demuxerPtr, ReadWriteMutexPtr readWriteMutexPtr);
~SessionState();
void setSessionPtr(SessionPtr sessionPtr) { mSessionPtr = sessionPtr; }
SessionPtr getSessionPtr() { return mSessionPtr; }
void close();
void invokeAsyncAccept();
private:
void filteredRead(char *buffer, std::size_t bufferLen);
void filteredWrite(const char *buffer, std::size_t bufferLen);
void filteredReadWriteCompletion(const asio::error &error, size_t bytesTransferred);
void unfilteredReadWriteCompletion(std::size_t bytesTransferred, int error);
void setTransportFilters(const std::vector<FilterPtr> &filters);
void invokeAsyncRead();
void invokeAsyncWrite();
void onAccept(const asio::error& error);
void onReadWrite(const asio::error& error, size_t bytesTransferred);
void onReflectedReadWrite(const asio::error& error, size_t bytesTransferred);
// TODO: too many friends
friend class TcpAsioServerTransport;
friend class TcpAsioProactor;
enum State
{
Ready,
Accepting,
ReadingDataCount,
ReadingData,
WritingData
};
State mState;
std::vector<char> mReadBuffer;
std::size_t mReadBufferRemaining;
std::vector<char> mWriteBuffer;
std::size_t mWriteBufferRemaining;
//boost::weak_ptr<I_Session> mSessionWeakPtr;
boost::shared_ptr<I_Session> mSessionPtr;
std::vector<FilterPtr> mTransportFilters;
SynchronizedSocketPtr mSynchronizedSocketPtr;
SynchronizedSocketPtr mReflectorSocketPtr;
volatile bool mReflecting;
IpAddress mIpAddress;
TcpAsioServerTransport & mTransport;
};
typedef boost::shared_ptr<SessionState> SessionStatePtr;
typedef boost::weak_ptr<SessionState> SessionStateWeakPtr;
class TcpAsioProactor : public I_Proactor
{
private:
// TODO: too many frioends
friend class SessionState;
friend class TcpAsioServerTransport;
TcpAsioProactor(TcpAsioServerTransport &tcpAsioServerTransport);
void postRead();
void postWrite();
void postClose();
std::vector<char> & getWriteBuffer();
std::size_t getWriteOffset();
std::vector<char> & getReadBuffer();
std::size_t getReadOffset();
I_ServerTransport & getServerTransport();
const I_RemoteAddress & getRemoteAddress();
void setTransportFilters(const std::vector<FilterPtr> &filters);
SessionStateWeakPtr mSessionStateWeakPtr;
//SessionStatePtr mSessionStatePtr;
TcpAsioServerTransport & mTcpAsioServerTransport;
};
typedef boost::shared_ptr<TcpAsioProactor> TcpAsioProactorPtr;
SessionStatePtr createSessionState();
// I_ServerTransportEx implementation
ClientTransportAutoPtr createClientTransport(const I_Endpoint &endpoint);
boost::shared_ptr<I_Session> createServerSession(ClientTransportAutoPtr clientTransportAutoPtr);
ClientTransportAutoPtr createClientTransport(boost::shared_ptr<I_Session> sessionPtr);
bool reflect(boost::shared_ptr<I_Session> sessionPtr1, boost::shared_ptr<I_Session> sessionPtr2);
bool isConnected(boost::shared_ptr<I_Session> sessionPtr);
// I_Service implementation
void open();
void close();
bool cycle(int timeoutMs, const volatile bool &stopFlag);
void stop();
void stopCycle(const asio::error &error);
void onServiceAdded(RcfServer &server);
void onServiceRemoved(RcfServer &server);
void onServerOpen(RcfServer &server);
void onServerClose(RcfServer &server);
void onServerStart(RcfServer &server);
void onServerStop(RcfServer &server);
void setServer(RcfServer &server);
RcfServer & getServer();
I_SessionManager & getSessionManager();
private:
friend class SessionState;
DemuxerPtr mDemuxerPtr;
ReadWriteMutexPtr mReadWriteMutexPtr;
int mPort;
SocketAcceptorPtr mAcceptorPtr;
DeadlineTimerPtr mCycleTimerPtr;
bool mInterrupt;
volatile bool mStopFlag;
RcfServer * pServer;
std::set<SessionStatePtr> mSessionStates;
};
} // namespace RCF
#endif // ! INCLUDE_RCF_TCPASIOSERVERTRANSPORT_HPP