Click here to Skip to main content
Click here to Skip to main content
Add your own
alternative version

RCF - Interprocess Communication for C++

, 25 Oct 2011 CPOL
A server/client IPC framework, using the C++ preprocessor as an IDL compiler.
rcf-04.zip
RCF-0.4
demo
vs2003
RCF
Client
Server
doc
html
class_r_c_f_1_1_exception.png
class_r_c_f_1_1_filter.png
class_r_c_f_1_1_filter_factory.png
class_r_c_f_1_1_filter_service.png
class_r_c_f_1_1_identity_filter.png
class_r_c_f_1_1_identity_filter_factory.png
class_r_c_f_1_1_i___client_transport.png
class_r_c_f_1_1_i___endpoint.png
class_r_c_f_1_1_i___service.png
class_r_c_f_1_1_i___session_manager.png
class_r_c_f_1_1_multicast_client_transport.png
class_r_c_f_1_1_object_factory_service.png
class_r_c_f_1_1_open_ssl_encryption_filter.png
class_r_c_f_1_1_open_ssl_encryption_filter_factory.png
class_r_c_f_1_1_publishing_service.png
class_r_c_f_1_1_rcf_server.png
class_r_c_f_1_1_remote_exception.png
class_r_c_f_1_1_subscription_service.png
class_r_c_f_1_1_tcp_endpoint.png
class_r_c_f_1_1_udp_endpoint.png
class_r_c_f_1_1_zlib_stateful_compression_filter_factory.png
class_r_c_f_1_1_zlib_stateless_compression_filter_factory.png
doxygen.png
ftv2blank.png
ftv2doc.png
ftv2folderclosed.png
ftv2folderopen.png
ftv2lastnode.png
ftv2link.png
ftv2mlastnode.png
ftv2mnode.png
ftv2node.png
ftv2plastnode.png
ftv2pnode.png
ftv2vertline.png
tab_b.gif
tab_l.gif
tab_r.gif
latex
annotated.tex
class_r_c_f_1_1_client_stub.tex
class_r_c_f_1_1_exception.eps
class_r_c_f_1_1_exception.tex
class_r_c_f_1_1_filter.eps
class_r_c_f_1_1_filter.tex
class_r_c_f_1_1_filter_description.tex
class_r_c_f_1_1_filter_factory.eps
class_r_c_f_1_1_filter_factory.tex
class_r_c_f_1_1_filter_service.eps
class_r_c_f_1_1_filter_service.tex
class_r_c_f_1_1_identity_filter.eps
class_r_c_f_1_1_identity_filter.tex
class_r_c_f_1_1_identity_filter_factory.eps
class_r_c_f_1_1_identity_filter_factory.tex
class_r_c_f_1_1_i___client_transport.eps
class_r_c_f_1_1_i___client_transport.tex
class_r_c_f_1_1_i___endpoint.eps
class_r_c_f_1_1_i___endpoint.tex
class_r_c_f_1_1_i___proactor.tex
class_r_c_f_1_1_i___rcf_client.tex
class_r_c_f_1_1_i___server_transport.tex
class_r_c_f_1_1_i___server_transport_ex.tex
class_r_c_f_1_1_i___service.eps
class_r_c_f_1_1_i___service.tex
class_r_c_f_1_1_i___session.tex
class_r_c_f_1_1_i___session_manager.eps
class_r_c_f_1_1_i___session_manager.tex
class_r_c_f_1_1_multicast_client_transport.eps
class_r_c_f_1_1_multicast_client_transport.tex
class_r_c_f_1_1_object_factory_service.eps
class_r_c_f_1_1_object_factory_service.tex
class_r_c_f_1_1_open_ssl_encryption_filter.eps
class_r_c_f_1_1_open_ssl_encryption_filter.tex
class_r_c_f_1_1_open_ssl_encryption_filter_factory.eps
class_r_c_f_1_1_open_ssl_encryption_filter_factory.tex
class_r_c_f_1_1_publishing_service.eps
class_r_c_f_1_1_publishing_service.tex
class_r_c_f_1_1_rcf_server.eps
class_r_c_f_1_1_rcf_server.tex
class_r_c_f_1_1_remote_exception.eps
class_r_c_f_1_1_remote_exception.tex
class_r_c_f_1_1_subscription_service.eps
class_r_c_f_1_1_subscription_service.tex
class_r_c_f_1_1_tcp_endpoint.eps
class_r_c_f_1_1_tcp_endpoint.tex
class_r_c_f_1_1_udp_endpoint.eps
class_r_c_f_1_1_udp_endpoint.tex
class_r_c_f_1_1_zlib_stateful_compression_filter.tex
class_r_c_f_1_1_zlib_stateful_compression_filter_factory.eps
class_r_c_f_1_1_zlib_stateful_compression_filter_factory.tex
class_r_c_f_1_1_zlib_stateless_compression_filter.tex
class_r_c_f_1_1_zlib_stateless_compression_filter_factory.eps
class_r_c_f_1_1_zlib_stateless_compression_filter_factory.tex
dirs.tex
dir_G_3A_2FDevelopment_2Fbuild_2Fscripts_2Fwin_2FBuildRcf2_5FOutput_2FRCF_2D0_2E4_2Finclude_2F.tex
dir_G_3A_2FDevelopment_2Fbuild_2Fscripts_2Fwin_2FBuildRcf2_5FOutput_2FRCF_2D0_2E4_2Finclude_2FRCF_2F.tex
dir_G_3A_2FDevelopment_2Fbuild_2Fscripts_2Fwin_2FBuildRcf2_5FOutput_2FRCF_2D0_2E4_2Finclude_2FRCF_2FProtocol_2F.tex
doxygen.sty
Helvetica.ttf
hierarchy.tex
Makefile
refman.tex
struct_r_c_f_1_1_filter_id_comparison.tex
include
RCF
Marshal.inl
Protocol
RcfServer.inl
ServerStub.inl
test
util
Platform
Machine
SPARC
x86
OS
Unix
Windows
Threads
SF
src
RCF
Protocol
SF
test
borland
Jamfile
Jamrules
Jamfile
Jamrules
vs2003
RCF
RCF
RCFTest
client.pem
server.pem
rcf-09c.zip
RCF-0.9c
demo
vs2003
RCF
Client
Server
include
RCF
Marshal.inl
Protocol
RcfServer.inl
ServerStub.inl
test
util
Platform
Machine
SPARC
x86
OS
Unix
Windows
Threads
SF
src
RCF
Protocol
util
SF
test
bcc
Jamfile
Jamrules
data
caCertA.pem
caCertB.pem
certA.pem
certB.pem
ssCert1.pem
ssCert2.pem
Jamfile
Jamrules
vc6
Jamfile
Jamrules
//*****************************************************************************
// RCF - Remote Call Framework
// Copyright (c) 2005. All rights reserved.
// Developed by Jarl Lindrud.
// Contact: jlindrud@hotmail.com .
//*****************************************************************************

#ifndef INCLUDE_RCF_TCPIOCPSERVERTRANSPORT_HPP
#define INCLUDE_RCF_TCPIOCPSERVERTRANSPORT_HPP

#include <map>
#include <memory>
#include <vector>

#include <boost/noncopyable.hpp>
#include <boost/shared_ptr.hpp>
#include <boost/weak_ptr.hpp>

#include <RCF/AsyncFilter.hpp>
#include <RCF/IpAddress.hpp>
#include <RCF/IpServerTransport.hpp>
#include <RCF/ServerTask.hpp>
#include <RCF/ServerTransport.hpp>
#include <RCF/Service.hpp>
#include <RCF/ThreadLibrary.hpp>

namespace RCF {

    class RcfServer;

    class Iocp
    {
    public:
        Iocp(int nMaxConcurrency = -1);
        ~Iocp();
        BOOL Create(int nMaxConcurrency = 0);
        BOOL AssociateDevice(HANDLE hDevice, ULONG_PTR CompKey);
        BOOL AssociateSocket(SOCKET hSocket, ULONG_PTR CompKey);
        BOOL PostStatus(ULONG_PTR CompKey, DWORD dwNumBytes = 0, OVERLAPPED* po = NULL) ;
        BOOL GetStatus(ULONG_PTR* pCompKey, PDWORD pdwNumBytes, OVERLAPPED** ppo, DWORD dwMilliseconds = INFINITE);

    private:
        HANDLE m_hIOCP;
    };

    class WsaRecvFunctor
    {
    public:
        WsaRecvFunctor(WSAOVERLAPPED *pOverlapped, const int &fd, const bool &zombie,  ReadWriteMutex &rwm);
        void operator()(char *buffer, std::size_t bufferLen);
        Filter::ReadFunction getReadFunction();
        int getError();

    private:
        const int &fd;
        const bool &zombie;
        ReadWriteMutex &rwm;
        WSAOVERLAPPED *mpOverlapped;
        int mError;
    };

    class WsaSendFunctor
    {
    public:
        WsaSendFunctor(WSAOVERLAPPED *pOverlapped, const int &fd, const bool &zombie, ReadWriteMutex &rwm);
        void operator()(const char *buffer, std::size_t bufferLen);
        Filter::WriteFunction getWriteFunction();
        int getError();

    private:
        const int &fd;
        const bool &zombie;
        ReadWriteMutex &rwm;
        WSAOVERLAPPED *mpOverlapped;
        int mError;
    };

    // Synchronization -
    // SessionState::fd is immutable, and is only physically closed (closesocket()) from the SessionState destructor, which is thread-safe
    // since we only refer to SessionState object's through shared_ptr's. Any thread wishing to close a session must set the 
    // SessionState::zombie flag. Eventually the SessionState will be ejected from the server by a server thread, and the destructor will close the connection.
    // Ownership of the fd can be taken from the SessionState, by setting ownFd to false. The server-wide read_write_mutex rwm is used to 
    // synchronize reads and writes of ownFd and zombie for all SessionState objects.

    // SessionState's with set zombie flags cannot be summarily removed from the servers session map, since that might delete the SessionState object
    // while the iocp is polling it, => core dump. They can only be safely removed when it is known that the iocp is not polling them.

    class TcpIocpServerTransport;
    
    typedef boost::shared_ptr<TcpIocpServerTransport> TcpIocpServerTransportPtr;

    class TcpIocpServerTransport : 
        public I_ServerTransport, 
        public I_ServerTransportEx, 
        public I_IpServerTransport,
        public I_Service,
        boost::noncopyable
    {
    private:
        typedef int Fd;
        typedef boost::shared_ptr<I_Session> SessionPtr; //NB: not the same as RCF::SessionPtr!
        class SessionState;
        typedef boost::shared_ptr<SessionState> SessionStatePtr;
        typedef boost::weak_ptr<SessionState> SessionStateWeakPtr;

        class SessionState : public OVERLAPPED
        {
        public:

            enum State {
                Accepting,
                ReadingDataCount,
                ReadingData,
                WritingData,
                Ready
            };

            SessionState(TcpIocpServerTransport &transport, Fd fd);
            ~SessionState();
            void setTransportFilters(const std::vector<FilterPtr> &filters);
            void clearOverlapped();
            bool isReflecting();
            const I_RemoteAddress &getRemoteAddress();
            sockaddr_in getRemoteSockAddr() { return remoteAddress.getSockAddr(); }

            Fd getFd() const;
            void setLocalAddress(const IpAddress &ipAddress) { localAddress = ipAddress; }
            IpAddress getLocalAddress() const { return localAddress; }
            void setRemoteAddress(const IpAddress &ipAddress) { remoteAddress = ipAddress; }
            std::vector<char> &getReadBuffer() { return readBuffer; }
            std::size_t getReadBufferRemaining() const { return readBufferRemaining; }
            void setReadBufferRemaining(std::size_t readBufferRemaining) { this->readBufferRemaining = readBufferRemaining; }
            std::vector<char> &getWriteBuffer() { return writeBuffer; }
            std::size_t getWriteBufferRemaining() const { return writeBufferRemaining; }
            void setWriteBufferRemaining(std::size_t writeBufferRemaining) { this->writeBufferRemaining = writeBufferRemaining; }
            State getState() const { return state; }
            void setState(State state) { this->state = state; }
            SessionPtr getSessionPtr() const { return sessionPtr; }
            void setSessionPtr(SessionPtr sessionPtr) { this->sessionPtr = sessionPtr; }
            void setWeakThisPtr(SessionStateWeakPtr sessionStateWeakPtr) { this->weakThisPtr = sessionStateWeakPtr; }
            SessionStateWeakPtr getWeakThisPtr() const { return weakThisPtr; }
            Fd getReflectionFd() const { return reflectionFd; }
            void setReflectionFd(Fd reflectionFd) { this->reflectionFd = reflectionFd; }
            SessionStateWeakPtr getReflectionSessionStateWeakPtr() const { return reflectionSessionStateWeakPtr; }
            void setReflectionSessionStateWeakPtr(SessionStateWeakPtr sessionStateWeakPtr) { this->reflectionSessionStateWeakPtr = sessionStateWeakPtr; }

            int read(char *buffer, std::size_t bufferLen);
            int write(char *buffer, std::size_t bufferLen);
            void onReadWriteCompleted(std::size_t bytesTransferred, int error);

            bool ownFd; // synchronized through rwm
            bool zombie; // synchronized through rwm

        private:

            // data structures for reflecting. NB: not synchronized!
            int reflectionFd;
            SessionStateWeakPtr reflectionSessionStateWeakPtr;

            State state;
            SessionPtr sessionPtr;
            std::vector<char> readBuffer;
            std::size_t readBufferRemaining;
            std::vector<char> writeBuffer;
            std::size_t writeBufferRemaining;
            const Fd fd;
            SessionStateWeakPtr weakThisPtr;
            std::vector<FilterPtr> mTransportFilters;
            WsaRecvFunctor wsaRecvFunctor;
            WsaSendFunctor wsaSendFunctor;
            TcpIocpServerTransport &transport;
            IpAddress localAddress;
            IpAddress remoteAddress;
        };

        friend class TcpIocpServerTransport::SessionState;

        class TcpIocpProactor : public I_Proactor
        {
        public:
            TcpIocpProactor(TcpIocpServerTransport &transport, boost::shared_ptr<SessionState> sessionStatePtr);
            void postRead();
            void postWrite();
            void postClose();
            std::vector<char> &getWriteBuffer();
            std::size_t getWriteOffset();
            std::vector<char> &getReadBuffer();
            std::size_t getReadOffset();
            I_ServerTransport &getServerTransport();
            SessionState &getSessionState();
            SessionStatePtr getSessionStatePtr();
            const I_RemoteAddress &getRemoteAddress();
            void setTransportFilters(const std::vector<FilterPtr> &filters);
        private:
            TcpIocpServerTransport &transport;
            boost::weak_ptr<SessionState> sessionStatePtr; // using a shared_ptr here would make a cycle SessionState->Session->Proactor->SessionState
        };

        friend class TcpIocpServerTransport::TcpIocpProactor;
    
    public:
        TcpIocpServerTransport(int port = 0);
        ServerTransportPtr clone() { return ServerTransportPtr( new TcpIocpServerTransport(port) ); }

    public:
        void open();
        void close();
        void cycle(int timeoutMs, const volatile bool &stopFlag);
        bool cycleTransportAndServer(RcfServer &server, int timeoutMs, const volatile bool &stopFlag);

        void setPort(int port);
        int getPort();
        
        void setMaxPendingConnectionCount(unsigned int maxPendingConnectionCount);
        unsigned int getMaxPendingConnectionCount();

        void setSessionManager(I_SessionManager &sessionManager);
        I_SessionManager &getSessionManager();
 
    private:
        Fd hash(Fd fd);
        SessionStatePtr createSession(int fd);
        bool monitorSession(SessionStatePtr sessionStatePtr);
        bool unmonitorSession(SessionStatePtr sessionStatePtr);
        void closeSession(SessionStatePtr sessionStatePtr);
        void transition(SessionStatePtr sessionStatePtr);
        void reflectSession(SessionStatePtr sessionStatePtr, DWORD bytesRead, ULONG_PTR completionKey);
        bool isReflecting(SessionStatePtr sessionStatePtr);
        void stopAccepts();
        bool cycleAccepts(int timeoutMs, const volatile bool &stopFlag);
        void generateAccepts();
        
        void onReadWriteCompleted(SessionStatePtr sessionStatePtr, std::size_t bytesTransferred, int error);
        void onAcceptCompleted(SessionStatePtr sessionStatePtr);
        //Filter::ReadWriteCompletionCallback getReadWriteCompletionCallback();
        void postWrite(SessionStatePtr sessionStatePtr);
        void postRead(SessionStatePtr sessionStatePtr);
        SessionStatePtr getSessionStatePtr(Fd fd);

        // I_ServerTransportEx implementation
    private:
        void externalCloseSession(SessionStatePtr sessionStatePtr);
        void externalCloseSession(SessionStatePtr sessionStatePtr, int &fd);
        void externalCloseSession0(SessionStatePtr sessionStatePtr, int &fd);
        std::auto_ptr<I_ClientTransport> createClientTransport(const I_Endpoint &endpoint);
        ClientTransportAutoPtr createClientTransport(boost::shared_ptr<I_Session> sessionPtr);
        boost::shared_ptr<I_Session> createServerSession(ClientTransportAutoPtr clientTransportAutoPtr);
        bool reflect(boost::shared_ptr<I_Session> sessionPtr1, boost::shared_ptr<I_Session> sessionPtr2);
        bool reflect(SessionStatePtr sessionStatePtr1, SessionStatePtr sessionStatePtr2);
        bool isConnected(boost::shared_ptr<I_Session> sessionPtr);

        // I_IpServerTransport implementation
    private:
        //void setNetworkInterface(const std::string &networkInterface);
        //std::string  getNetworkInterface();
        //void setAllowedClientIps(const std::vector<std::string> &allowedClientIps);
        //std::vector<std::string> getAllowedClientIps();
        //bool isClientIpAllowed(const sockaddr_in &addr);
        
        // I_Service implementation
    private:
        void onServiceAdded(RcfServer &server);
        void onServiceRemoved(RcfServer &server);
        void onServerStart(RcfServer &server);
        void onServerStop(RcfServer &server);
        void onServerOpen(RcfServer &server);
        void onServerClose(RcfServer &server);
        bool mOpen;

        // member variables
    private:
        typedef boost::shared_ptr<Mutex> MutexPtr;
        typedef std::map<Fd, SessionStatePtr> SessionStateMap;
        typedef std::vector< std::pair<MutexPtr, SessionStateMap > > SynchronizedSessionMaps;

        SynchronizedSessionMaps sessionMaps;
        I_SessionManager *pSessionManager;
        std::string acceptorInterface;
        int acceptorPort;
        Fd acceptorFd;
        int port;
        unsigned int maxPendingConnectionCount;
        unsigned int fdPartitionCount;
        std::auto_ptr<Iocp> iocp;
        ReadWriteMutex rwm;

        Mutex mQueuedAcceptsMutex;
        Condition mQueuedAcceptsCondition;
        unsigned int mQueuedAccepts; // to be accessed only through InterlockedIncrement()/InterlockedDecrement()
        unsigned int mQueuedAcceptsThreshold;
        unsigned int mQueuedAcceptsAugment;

        LPFN_ACCEPTEX mlpfnAcceptEx;
        LPFN_GETACCEPTEXSOCKADDRS mlpfnGetAcceptExSockAddrs;

        volatile bool mStopFlag;
    };
    
} // namespace RCF

#endif // ! INCLUDE_RCF_TCPIOCPSERVERTRANSPORT_HPP

By viewing downloads associated with this article you agree to the Terms of Service and the article's licence.

If a file you wish to view isn't highlighted, and is a text file (not binary), please let us know and we'll add colourisation support for it.

License

This article, along with any associated source code and files, is licensed under The Code Project Open License (CPOL)

Share

About the Author

Jarl Lindrud

Australia Australia
Software developer, ex-resident of Sweden and now living in Canberra, Australia, working on distributed C++ applications. Jarl enjoys programming, but prefers skiing and playing table tennis. He derives immense satisfaction from referring to himself in third person.

| Advertise | Privacy | Terms of Use | Mobile
Web01 | 2.8.150327.1 | Last Updated 25 Oct 2011
Article Copyright 2005 by Jarl Lindrud
Everything else Copyright © CodeProject, 1999-2015
Layout: fixed | fluid