//*****************************************************************************
// RCF - Remote Call Framework
// Copyright (c) 2005. All rights reserved.
// Developed by Jarl Lindrud.
// Contact: jlindrud@hotmail.com .
//*****************************************************************************
#ifndef INCLUDE_RCF_TCPIOCPSERVERTRANSPORT_HPP
#define INCLUDE_RCF_TCPIOCPSERVERTRANSPORT_HPP
#include <map>
#include <memory>
#include <vector>
#include <boost/noncopyable.hpp>
#include <boost/shared_ptr.hpp>
#include <boost/weak_ptr.hpp>
#include <RCF/AsyncFilter.hpp>
#include <RCF/IpAddress.hpp>
#include <RCF/IpServerTransport.hpp>
#include <RCF/ServerTask.hpp>
#include <RCF/ServerTransport.hpp>
#include <RCF/Service.hpp>
#include <RCF/ThreadLibrary.hpp>
namespace RCF {
class RcfServer;
class Iocp
{
public:
Iocp(int nMaxConcurrency = -1);
~Iocp();
BOOL Create(int nMaxConcurrency = 0);
BOOL AssociateDevice(HANDLE hDevice, ULONG_PTR CompKey);
BOOL AssociateSocket(SOCKET hSocket, ULONG_PTR CompKey);
BOOL PostStatus(ULONG_PTR CompKey, DWORD dwNumBytes = 0, OVERLAPPED* po = NULL) ;
BOOL GetStatus(ULONG_PTR* pCompKey, PDWORD pdwNumBytes, OVERLAPPED** ppo, DWORD dwMilliseconds = INFINITE);
private:
HANDLE m_hIOCP;
};
class WsaRecvFunctor
{
public:
WsaRecvFunctor(WSAOVERLAPPED *pOverlapped, const int &fd, const bool &zombie, ReadWriteMutex &rwm);
void operator()(char *buffer, std::size_t bufferLen);
Filter::ReadFunction getReadFunction();
int getError();
private:
const int &fd;
const bool &zombie;
ReadWriteMutex &rwm;
WSAOVERLAPPED *mpOverlapped;
int mError;
};
class WsaSendFunctor
{
public:
WsaSendFunctor(WSAOVERLAPPED *pOverlapped, const int &fd, const bool &zombie, ReadWriteMutex &rwm);
void operator()(const char *buffer, std::size_t bufferLen);
Filter::WriteFunction getWriteFunction();
int getError();
private:
const int &fd;
const bool &zombie;
ReadWriteMutex &rwm;
WSAOVERLAPPED *mpOverlapped;
int mError;
};
// Synchronization -
// SessionState::fd is immutable, and is only physically closed (closesocket()) from the SessionState destructor, which is thread-safe
// since we only refer to SessionState object's through shared_ptr's. Any thread wishing to close a session must set the
// SessionState::zombie flag. Eventually the SessionState will be ejected from the server by a server thread, and the destructor will close the connection.
// Ownership of the fd can be taken from the SessionState, by setting ownFd to false. The server-wide read_write_mutex rwm is used to
// synchronize reads and writes of ownFd and zombie for all SessionState objects.
// SessionState's with set zombie flags cannot be summarily removed from the servers session map, since that might delete the SessionState object
// while the iocp is polling it, => core dump. They can only be safely removed when it is known that the iocp is not polling them.
class TcpIocpServerTransport;
typedef boost::shared_ptr<TcpIocpServerTransport> TcpIocpServerTransportPtr;
class TcpIocpServerTransport :
public I_ServerTransport,
public I_ServerTransportEx,
public I_IpServerTransport,
public I_Service,
boost::noncopyable
{
private:
typedef int Fd;
typedef boost::shared_ptr<I_Session> SessionPtr; //NB: not the same as RCF::SessionPtr!
class SessionState;
typedef boost::shared_ptr<SessionState> SessionStatePtr;
typedef boost::weak_ptr<SessionState> SessionStateWeakPtr;
class SessionState : public OVERLAPPED
{
public:
enum State {
Accepting,
ReadingDataCount,
ReadingData,
WritingData,
Ready
};
SessionState(TcpIocpServerTransport &transport, Fd fd);
~SessionState();
void setTransportFilters(const std::vector<FilterPtr> &filters);
void clearOverlapped();
bool isReflecting();
const I_RemoteAddress &getRemoteAddress();
sockaddr_in getRemoteSockAddr() { return remoteAddress.getSockAddr(); }
Fd getFd() const;
void setLocalAddress(const IpAddress &ipAddress) { localAddress = ipAddress; }
IpAddress getLocalAddress() const { return localAddress; }
void setRemoteAddress(const IpAddress &ipAddress) { remoteAddress = ipAddress; }
std::vector<char> &getReadBuffer() { return readBuffer; }
std::size_t getReadBufferRemaining() const { return readBufferRemaining; }
void setReadBufferRemaining(std::size_t readBufferRemaining) { this->readBufferRemaining = readBufferRemaining; }
std::vector<char> &getWriteBuffer() { return writeBuffer; }
std::size_t getWriteBufferRemaining() const { return writeBufferRemaining; }
void setWriteBufferRemaining(std::size_t writeBufferRemaining) { this->writeBufferRemaining = writeBufferRemaining; }
State getState() const { return state; }
void setState(State state) { this->state = state; }
SessionPtr getSessionPtr() const { return sessionPtr; }
void setSessionPtr(SessionPtr sessionPtr) { this->sessionPtr = sessionPtr; }
void setWeakThisPtr(SessionStateWeakPtr sessionStateWeakPtr) { this->weakThisPtr = sessionStateWeakPtr; }
SessionStateWeakPtr getWeakThisPtr() const { return weakThisPtr; }
Fd getReflectionFd() const { return reflectionFd; }
void setReflectionFd(Fd reflectionFd) { this->reflectionFd = reflectionFd; }
SessionStateWeakPtr getReflectionSessionStateWeakPtr() const { return reflectionSessionStateWeakPtr; }
void setReflectionSessionStateWeakPtr(SessionStateWeakPtr sessionStateWeakPtr) { this->reflectionSessionStateWeakPtr = sessionStateWeakPtr; }
int read(char *buffer, std::size_t bufferLen);
int write(char *buffer, std::size_t bufferLen);
void onReadWriteCompleted(std::size_t bytesTransferred, int error);
bool ownFd; // synchronized through rwm
bool zombie; // synchronized through rwm
private:
// data structures for reflecting. NB: not synchronized!
int reflectionFd;
SessionStateWeakPtr reflectionSessionStateWeakPtr;
State state;
SessionPtr sessionPtr;
std::vector<char> readBuffer;
std::size_t readBufferRemaining;
std::vector<char> writeBuffer;
std::size_t writeBufferRemaining;
const Fd fd;
SessionStateWeakPtr weakThisPtr;
std::vector<FilterPtr> mTransportFilters;
WsaRecvFunctor wsaRecvFunctor;
WsaSendFunctor wsaSendFunctor;
TcpIocpServerTransport &transport;
IpAddress localAddress;
IpAddress remoteAddress;
};
friend class TcpIocpServerTransport::SessionState;
class TcpIocpProactor : public I_Proactor
{
public:
TcpIocpProactor(TcpIocpServerTransport &transport, boost::shared_ptr<SessionState> sessionStatePtr);
void postRead();
void postWrite();
void postClose();
std::vector<char> &getWriteBuffer();
std::size_t getWriteOffset();
std::vector<char> &getReadBuffer();
std::size_t getReadOffset();
I_ServerTransport &getServerTransport();
SessionState &getSessionState();
SessionStatePtr getSessionStatePtr();
const I_RemoteAddress &getRemoteAddress();
void setTransportFilters(const std::vector<FilterPtr> &filters);
private:
TcpIocpServerTransport &transport;
boost::weak_ptr<SessionState> sessionStatePtr; // using a shared_ptr here would make a cycle SessionState->Session->Proactor->SessionState
};
friend class TcpIocpServerTransport::TcpIocpProactor;
public:
TcpIocpServerTransport(int port = 0);
ServerTransportPtr clone() { return ServerTransportPtr( new TcpIocpServerTransport(port) ); }
public:
void open();
void close();
void cycle(int timeoutMs, const volatile bool &stopFlag);
bool cycleTransportAndServer(RcfServer &server, int timeoutMs, const volatile bool &stopFlag);
void setPort(int port);
int getPort();
void setMaxPendingConnectionCount(unsigned int maxPendingConnectionCount);
unsigned int getMaxPendingConnectionCount();
void setSessionManager(I_SessionManager &sessionManager);
I_SessionManager &getSessionManager();
private:
Fd hash(Fd fd);
SessionStatePtr createSession(int fd);
bool monitorSession(SessionStatePtr sessionStatePtr);
bool unmonitorSession(SessionStatePtr sessionStatePtr);
void closeSession(SessionStatePtr sessionStatePtr);
void transition(SessionStatePtr sessionStatePtr);
void reflectSession(SessionStatePtr sessionStatePtr, DWORD bytesRead, ULONG_PTR completionKey);
bool isReflecting(SessionStatePtr sessionStatePtr);
void stopAccepts();
bool cycleAccepts(int timeoutMs, const volatile bool &stopFlag);
void generateAccepts();
void onReadWriteCompleted(SessionStatePtr sessionStatePtr, std::size_t bytesTransferred, int error);
void onAcceptCompleted(SessionStatePtr sessionStatePtr);
//Filter::ReadWriteCompletionCallback getReadWriteCompletionCallback();
void postWrite(SessionStatePtr sessionStatePtr);
void postRead(SessionStatePtr sessionStatePtr);
SessionStatePtr getSessionStatePtr(Fd fd);
// I_ServerTransportEx implementation
private:
void externalCloseSession(SessionStatePtr sessionStatePtr);
void externalCloseSession(SessionStatePtr sessionStatePtr, int &fd);
void externalCloseSession0(SessionStatePtr sessionStatePtr, int &fd);
std::auto_ptr<I_ClientTransport> createClientTransport(const I_Endpoint &endpoint);
ClientTransportAutoPtr createClientTransport(boost::shared_ptr<I_Session> sessionPtr);
boost::shared_ptr<I_Session> createServerSession(ClientTransportAutoPtr clientTransportAutoPtr);
bool reflect(boost::shared_ptr<I_Session> sessionPtr1, boost::shared_ptr<I_Session> sessionPtr2);
bool reflect(SessionStatePtr sessionStatePtr1, SessionStatePtr sessionStatePtr2);
bool isConnected(boost::shared_ptr<I_Session> sessionPtr);
// I_IpServerTransport implementation
private:
//void setNetworkInterface(const std::string &networkInterface);
//std::string getNetworkInterface();
//void setAllowedClientIps(const std::vector<std::string> &allowedClientIps);
//std::vector<std::string> getAllowedClientIps();
//bool isClientIpAllowed(const sockaddr_in &addr);
// I_Service implementation
private:
void onServiceAdded(RcfServer &server);
void onServiceRemoved(RcfServer &server);
void onServerStart(RcfServer &server);
void onServerStop(RcfServer &server);
void onServerOpen(RcfServer &server);
void onServerClose(RcfServer &server);
bool mOpen;
// member variables
private:
typedef boost::shared_ptr<Mutex> MutexPtr;
typedef std::map<Fd, SessionStatePtr> SessionStateMap;
typedef std::vector< std::pair<MutexPtr, SessionStateMap > > SynchronizedSessionMaps;
SynchronizedSessionMaps sessionMaps;
I_SessionManager *pSessionManager;
std::string acceptorInterface;
int acceptorPort;
Fd acceptorFd;
int port;
unsigned int maxPendingConnectionCount;
unsigned int fdPartitionCount;
std::auto_ptr<Iocp> iocp;
ReadWriteMutex rwm;
Mutex mQueuedAcceptsMutex;
Condition mQueuedAcceptsCondition;
unsigned int mQueuedAccepts; // to be accessed only through InterlockedIncrement()/InterlockedDecrement()
unsigned int mQueuedAcceptsThreshold;
unsigned int mQueuedAcceptsAugment;
LPFN_ACCEPTEX mlpfnAcceptEx;
LPFN_GETACCEPTEXSOCKADDRS mlpfnGetAcceptExSockAddrs;
volatile bool mStopFlag;
};
} // namespace RCF
#endif // ! INCLUDE_RCF_TCPIOCPSERVERTRANSPORT_HPP