Click here to Skip to main content
15,886,199 members
Articles / Programming Languages / C++

RCF - Interprocess Communication for C++

Rate me:
Please Sign up or sign in to vote.
4.94/5 (147 votes)
25 Oct 2011 | CPOL | 20 min read | 4.6M | 8.4K | 331
A server/client IPC framework, using the C++ preprocessor as an IDL compiler.
//*****************************************************************************
// RCF - Remote Call Framework
// Copyright (c) 2005. All rights reserved.
// Developed by Jarl Lindrud.
// Contact: jlindrud@hotmail.com .
//*****************************************************************************

#ifndef INCLUDE_RCF_TCPIOCPSERVERTRANSPORT_HPP
#define INCLUDE_RCF_TCPIOCPSERVERTRANSPORT_HPP

#include <cstddef>
#include <map>
#include <memory>
#include <string>
#include <utility>
#include <vector>

#include <boost/noncopyable.hpp>
#include <boost/shared_ptr.hpp>
#include <boost/weak_ptr.hpp>

#include <RCF/AsyncFilter.hpp>
#include <RCF/IpAddress.hpp>
#include <RCF/IpServerTransport.hpp>
#include <RCF/ServerTask.hpp>
#include <RCF/ServerTransport.hpp>
#include <RCF/Service.hpp>
#include <RCF/ThreadLibrary.hpp>

namespace RCF {

    class RcfServer;

    // Helper class holding a single HANDLE (m_hIOCP). TcpIocpServerTransport
    // keeps one instance through std::auto_ptr<Iocp>.
    //
    // Judging by the member names this presumably wraps a Win32 I/O completion
    // port; none of the member functions are defined in this header, so confirm
    // the details against the implementation file.
    //
    // Things worth knowing when reading call sites:
    //   - The constructor defaults nMaxConcurrency to -1, whereas Create()
    //     defaults it to 0. The two defaults are not the same value.
    //   - Every operation reports its outcome as a Win32 BOOL.
    //
    // NOTE(review): the class declares a destructor and stores a raw HANDLE but
    // does not disable copying. If the destructor releases the handle, a copy
    // would release it twice -- confirm no caller copies an Iocp.
    class Iocp
    {
        // Sole data member. Default (private) access applies here.
        HANDLE m_hIOCP;

    public:
        // Construction, teardown and (re)creation.
        Iocp(int nMaxConcurrency = -1);
        ~Iocp();

        BOOL Create(int nMaxConcurrency = 0);

        // Association of a socket or a generic device handle with a
        // caller-chosen completion key.
        BOOL AssociateSocket(SOCKET hSocket, ULONG_PTR CompKey);
        BOOL AssociateDevice(HANDLE hDevice, ULONG_PTR CompKey);

        // Retrieves a status record into the three out-parameters.
        // dwMilliseconds defaults to INFINITE.
        BOOL GetStatus(
            ULONG_PTR* pCompKey,
            PDWORD pdwNumBytes,
            OVERLAPPED** ppo,
            DWORD dwMilliseconds = INFINITE);

        // Posts a status record; the byte count defaults to 0 and the
        // OVERLAPPED pointer defaults to NULL.
        BOOL PostStatus(
            ULONG_PTR CompKey,
            DWORD dwNumBytes = 0,
            OVERLAPPED* po = NULL);
    };

    // Callable object for the receive direction: operator() takes a mutable
    // buffer and its length, getReadFunction() yields a Filter::ReadFunction,
    // and getError() reports an int error value.
    //
    // The functor stores *references* to a descriptor, a zombie flag and a
    // ReadWriteMutex (plus a WSAOVERLAPPED pointer), so all of those objects
    // must outlive it. SessionState embeds one instance as a data member.
    //
    // The implementation lives outside this header; presumably operator()
    // issues an overlapped receive on fd -- confirm in the .cpp.
    class WsaRecvFunctor
    {
        // Bound state. Declaration order is significant for constructor
        // initialisation and is kept as: fd, zombie, rwm, mpOverlapped, mError.
        const int &fd;
        const bool &zombie;
        ReadWriteMutex &rwm;
        WSAOVERLAPPED *mpOverlapped;
        int mError;

    public:
        WsaRecvFunctor(
            WSAOVERLAPPED *pOverlapped,
            const int &fd,
            const bool &zombie,
            ReadWriteMutex &rwm);

        // Adapter handed to the filter layer.
        Filter::ReadFunction getReadFunction();

        // Invocation with the destination buffer and its capacity.
        void operator()(char *buffer, std::size_t bufferLen);

        // Error value recorded by the functor (mError is the only int state).
        int getError();
    };

    // Callable object for the send direction: operator() takes a read-only
    // buffer and its length, getWriteFunction() yields a Filter::WriteFunction,
    // and getError() reports an int error value.
    //
    // The functor stores *references* to a descriptor, a zombie flag and a
    // ReadWriteMutex (plus a WSAOVERLAPPED pointer), so all of those objects
    // must outlive it. SessionState embeds one instance as a data member.
    //
    // The implementation lives outside this header; presumably operator()
    // issues an overlapped send on fd -- confirm in the .cpp.
    class WsaSendFunctor
    {
        // Bound state. Declaration order is significant for constructor
        // initialisation and is kept as: fd, zombie, rwm, mpOverlapped, mError.
        const int &fd;
        const bool &zombie;
        ReadWriteMutex &rwm;
        WSAOVERLAPPED *mpOverlapped;
        int mError;

    public:
        WsaSendFunctor(
            WSAOVERLAPPED *pOverlapped,
            const int &fd,
            const bool &zombie,
            ReadWriteMutex &rwm);

        // Adapter handed to the filter layer.
        Filter::WriteFunction getWriteFunction();

        // Invocation with the source buffer and the number of bytes in it.
        void operator()(const char *buffer, std::size_t bufferLen);

        // Error value recorded by the functor (mError is the only int state).
        int getError();
    };

    // Synchronization -
    // SessionState::fd is immutable, and is only physically closed (closesocket()) from the SessionState destructor, which is thread-safe
    // since we only refer to SessionState objects through shared_ptrs. Any thread wishing to close a session must set the
    // SessionState::zombie flag. Eventually the SessionState will be ejected from the server by a server thread, and the destructor will close the connection.
    // Ownership of the fd can be taken from the SessionState by setting ownFd to false. The server-wide ReadWriteMutex rwm is used to
    // synchronize reads and writes of ownFd and zombie for all SessionState objects.

    // SessionStates whose zombie flag is set cannot be summarily removed from the server's session map, since that might delete the SessionState object
    // while the iocp is polling it, which would crash the process (core dump). They can only be safely removed when it is known that the iocp is not polling them.

    class TcpIocpServerTransport;
    
    typedef boost::shared_ptr<TcpIocpServerTransport> TcpIocpServerTransportPtr;

    class TcpIocpServerTransport : 
        public I_ServerTransport, 
        public I_ServerTransportEx, 
        public I_IpServerTransport,
        public I_Service,
        boost::noncopyable
    {
    private:
        typedef int Fd;
        typedef boost::shared_ptr<I_Session> SessionPtr; //NB: not the same as RCF::SessionPtr!
        class SessionState;
        typedef boost::shared_ptr<SessionState> SessionStatePtr;
        typedef boost::weak_ptr<SessionState> SessionStateWeakPtr;

        class SessionState : public OVERLAPPED
        {
        public:

            enum State {
                Accepting,
                ReadingDataCount,
                ReadingData,
                WritingData,
                Ready
            };

            SessionState(TcpIocpServerTransport &transport, Fd fd);
            ~SessionState();
            void setTransportFilters(const std::vector<FilterPtr> &filters);
            void clearOverlapped();
            bool isReflecting();
            const I_RemoteAddress &getRemoteAddress();
            sockaddr_in getRemoteSockAddr() { return remoteAddress.getSockAddr(); }

            Fd getFd() const;
            void setLocalAddress(const IpAddress &ipAddress) { localAddress = ipAddress; }
            IpAddress getLocalAddress() const { return localAddress; }
            void setRemoteAddress(const IpAddress &ipAddress) { remoteAddress = ipAddress; }
            std::vector<char> &getReadBuffer() { return readBuffer; }
            std::size_t getReadBufferRemaining() const { return readBufferRemaining; }
            void setReadBufferRemaining(std::size_t readBufferRemaining) { this->readBufferRemaining = readBufferRemaining; }
            std::vector<char> &getWriteBuffer() { return writeBuffer; }
            std::size_t getWriteBufferRemaining() const { return writeBufferRemaining; }
            void setWriteBufferRemaining(std::size_t writeBufferRemaining) { this->writeBufferRemaining = writeBufferRemaining; }
            State getState() const { return state; }
            void setState(State state) { this->state = state; }
            SessionPtr getSessionPtr() const { return sessionPtr; }
            void setSessionPtr(SessionPtr sessionPtr) { this->sessionPtr = sessionPtr; }
            void setWeakThisPtr(SessionStateWeakPtr sessionStateWeakPtr) { this->weakThisPtr = sessionStateWeakPtr; }
            SessionStateWeakPtr getWeakThisPtr() const { return weakThisPtr; }
            Fd getReflectionFd() const { return reflectionFd; }
            void setReflectionFd(Fd reflectionFd) { this->reflectionFd = reflectionFd; }
            SessionStateWeakPtr getReflectionSessionStateWeakPtr() const { return reflectionSessionStateWeakPtr; }
            void setReflectionSessionStateWeakPtr(SessionStateWeakPtr sessionStateWeakPtr) { this->reflectionSessionStateWeakPtr = sessionStateWeakPtr; }

            int read(char *buffer, std::size_t bufferLen);
            int write(char *buffer, std::size_t bufferLen);
            void onReadWriteCompleted(std::size_t bytesTransferred, int error);

            bool ownFd; // synchronized through rwm
            bool zombie; // synchronized through rwm

        private:

            // data structures for reflecting. NB: not synchronized!
            int reflectionFd;
            SessionStateWeakPtr reflectionSessionStateWeakPtr;

            State state;
            SessionPtr sessionPtr;
            std::vector<char> readBuffer;
            std::size_t readBufferRemaining;
            std::vector<char> writeBuffer;
            std::size_t writeBufferRemaining;
            const Fd fd;
            SessionStateWeakPtr weakThisPtr;
            std::vector<FilterPtr> mTransportFilters;
            WsaRecvFunctor wsaRecvFunctor;
            WsaSendFunctor wsaSendFunctor;
            TcpIocpServerTransport &transport;
            IpAddress localAddress;
            IpAddress remoteAddress;
        };

        friend class TcpIocpServerTransport::SessionState;

        class TcpIocpProactor : public I_Proactor
        {
        public:
            TcpIocpProactor(TcpIocpServerTransport &transport, boost::shared_ptr<SessionState> sessionStatePtr);
            void postRead();
            void postWrite();
            void postClose();
            std::vector<char> &getWriteBuffer();
            std::size_t getWriteOffset();
            std::vector<char> &getReadBuffer();
            std::size_t getReadOffset();
            I_ServerTransport &getServerTransport();
            SessionState &getSessionState();
            SessionStatePtr getSessionStatePtr();
            const I_RemoteAddress &getRemoteAddress();
            void setTransportFilters(const std::vector<FilterPtr> &filters);
        private:
            TcpIocpServerTransport &transport;
            boost::weak_ptr<SessionState> sessionStatePtr; // using a shared_ptr here would make a cycle SessionState->Session->Proactor->SessionState
        };

        friend class TcpIocpServerTransport::TcpIocpProactor;
    
    public:
        TcpIocpServerTransport(int port = 0);
        ServerTransportPtr clone() { return ServerTransportPtr( new TcpIocpServerTransport(port) ); }

    public:
        void open();
        void close();
        void cycle(int timeoutMs, const volatile bool &stopFlag);
        bool cycleTransportAndServer(RcfServer &server, int timeoutMs, const volatile bool &stopFlag);

        void setPort(int port);
        int getPort();
        
        void setMaxPendingConnectionCount(unsigned int maxPendingConnectionCount);
        unsigned int getMaxPendingConnectionCount();

        void setSessionManager(I_SessionManager &sessionManager);
        I_SessionManager &getSessionManager();
 
    private:
        Fd hash(Fd fd);
        SessionStatePtr createSession(int fd);
        bool monitorSession(SessionStatePtr sessionStatePtr);
        bool unmonitorSession(SessionStatePtr sessionStatePtr);
        void closeSession(SessionStatePtr sessionStatePtr);
        void transition(SessionStatePtr sessionStatePtr);
        void reflectSession(SessionStatePtr sessionStatePtr, DWORD bytesRead, ULONG_PTR completionKey);
        bool isReflecting(SessionStatePtr sessionStatePtr);
        void stopAccepts();
        bool cycleAccepts(int timeoutMs, const volatile bool &stopFlag);
        void generateAccepts();
        
        void onReadWriteCompleted(SessionStatePtr sessionStatePtr, std::size_t bytesTransferred, int error);
        void onAcceptCompleted(SessionStatePtr sessionStatePtr);
        //Filter::ReadWriteCompletionCallback getReadWriteCompletionCallback();
        void postWrite(SessionStatePtr sessionStatePtr);
        void postRead(SessionStatePtr sessionStatePtr);
        SessionStatePtr getSessionStatePtr(Fd fd);

        // I_ServerTransportEx implementation
    private:
        void externalCloseSession(SessionStatePtr sessionStatePtr);
        void externalCloseSession(SessionStatePtr sessionStatePtr, int &fd);
        void externalCloseSession0(SessionStatePtr sessionStatePtr, int &fd);
        std::auto_ptr<I_ClientTransport> createClientTransport(const I_Endpoint &endpoint);
        ClientTransportAutoPtr createClientTransport(boost::shared_ptr<I_Session> sessionPtr);
        boost::shared_ptr<I_Session> createServerSession(ClientTransportAutoPtr clientTransportAutoPtr);
        bool reflect(boost::shared_ptr<I_Session> sessionPtr1, boost::shared_ptr<I_Session> sessionPtr2);
        bool reflect(SessionStatePtr sessionStatePtr1, SessionStatePtr sessionStatePtr2);
        bool isConnected(boost::shared_ptr<I_Session> sessionPtr);

        // I_IpServerTransport implementation
    private:
        //void setNetworkInterface(const std::string &networkInterface);
        //std::string  getNetworkInterface();
        //void setAllowedClientIps(const std::vector<std::string> &allowedClientIps);
        //std::vector<std::string> getAllowedClientIps();
        //bool isClientIpAllowed(const sockaddr_in &addr);
        
        // I_Service implementation
    private:
        void onServiceAdded(RcfServer &server);
        void onServiceRemoved(RcfServer &server);
        void onServerStart(RcfServer &server);
        void onServerStop(RcfServer &server);
        void onServerOpen(RcfServer &server);
        void onServerClose(RcfServer &server);
        bool mOpen;

        // member variables
    private:
        typedef boost::shared_ptr<Mutex> MutexPtr;
        typedef std::map<Fd, SessionStatePtr> SessionStateMap;
        typedef std::vector< std::pair<MutexPtr, SessionStateMap > > SynchronizedSessionMaps;

        SynchronizedSessionMaps sessionMaps;
        I_SessionManager *pSessionManager;
        std::string acceptorInterface;
        int acceptorPort;
        Fd acceptorFd;
        int port;
        unsigned int maxPendingConnectionCount;
        unsigned int fdPartitionCount;
        std::auto_ptr<Iocp> iocp;
        ReadWriteMutex rwm;

        Mutex mQueuedAcceptsMutex;
        Condition mQueuedAcceptsCondition;
        unsigned int mQueuedAccepts; // to be accessed only through InterlockedIncrement()/InterlockedDecrement()
        unsigned int mQueuedAcceptsThreshold;
        unsigned int mQueuedAcceptsAugment;

        LPFN_ACCEPTEX mlpfnAcceptEx;
        LPFN_GETACCEPTEXSOCKADDRS mlpfnGetAcceptExSockAddrs;

        volatile bool mStopFlag;
    };
    
} // namespace RCF

#endif // ! INCLUDE_RCF_TCPIOCPSERVERTRANSPORT_HPP

By viewing downloads associated with this article you agree to the Terms of Service and the article's licence.

If a file you wish to view isn't highlighted, and is a text file (not binary), please let us know and we'll add colourisation support for it.

License

This article, along with any associated source code and files, is licensed under The Code Project Open License (CPOL)


Written By
Australia
Software developer, from Sweden and now living in Canberra, Australia, working on distributed C++ applications. When he is not programming, Jarl enjoys skiing and playing table tennis. He derives immense satisfaction from referring to himself in third person.

Comments and Discussions