Click here to Skip to main content
15,886,100 members
Articles / Programming Languages / C++

RMI for C++

Rate me:
Please Sign up or sign in to vote.
4.87/5 (113 votes)
6 Aug 2009CPOL8 min read 816.8K   4.6K   153  
User-friendly remote method invocation in C++.
//*****************************************************************************
// RCF - Remote Call Framework
// Copyright (c) 2005, Jarl Lindrud.
// Contact: jlindrud@hotmail.com .
//
// Distributed under the so-called MIT license, see accompanying file license.txt.
//*****************************************************************************

#ifndef _RCF_RCFSERVER_HPP
#define _RCF_RCFSERVER_HPP

#include <deque>
#include <map>
#include <memory>
#include <vector>

#include <boost/bind.hpp>
#include <boost/function.hpp>
#include <boost/noncopyable.hpp>
#include <boost/shared_ptr.hpp>

#include <RCF/ClientInfo.hpp>
#include <RCF/Connection.hpp>
#include <RCF/ServerStub.hpp>
#include <RCF/ThreadLibrary.hpp>
#include <RCF/Token.hpp>

namespace RCF {

    class TokenHash 
    {
    public:
        size_t operator()(const Token &token) const;
    };

    class TokenFactory : boost::noncopyable
    {
    public:
        TokenFactory(int tokenCount);
        Token getNextToken();
        void returnToken(Token token);
        std::vector<Token> &getTokenSpace();

    private:
        Token generateToken();

        int nextTokenId_;
        std::vector<Token> tokenSpace_;
        std::vector<Token> tokens_;
        Platform::Threads::mutex mutex_;
    };

    class I_StubFactory
    {
    public:
        virtual ~I_StubFactory() {}
        virtual boost::shared_ptr<I_ServerStub> makeServerStub() = 0;
        virtual std::string getObjectName() = 0;
    };

    template<typename T, typename I_T>
    class StubFactory : public I_StubFactory
    {
    public:
        boost::shared_ptr<I_ServerStub> makeServerStub();
        std::string getObjectName();
    };

    template<typename T, typename I_T>
    class SharedStubFactory : public I_StubFactory
    {
    public:
        boost::shared_ptr<I_ServerStub> makeServerStub();
        boost::shared_ptr<I_ServerStub> makeServerStub(T &x);
        boost::shared_ptr<I_ServerStub> makeServerStub(std::auto_ptr<T> px);
        boost::shared_ptr<I_ServerStub> makeServerStub(boost::shared_ptr<T> px);
        std::string getObjectName();
    };

    template<typename ServerT>
    class ObjectFactory : boost::noncopyable
    {
    public:
        ObjectFactory(ServerT &server);
        Token make(const std::string &objectName);

    private:
        ServerT &server_;
    };
    
    // Stub entry
    class StubEntry : boost::noncopyable
    {
    public:
        StubEntry();
        void reset(boost::shared_ptr<I_ServerStub> stub = boost::shared_ptr<I_ServerStub>(), bool expire = true);
        bool hasStub();
        I_ServerStub &getStub();
        unsigned int &getConnectionCount();
        unsigned int &getTimeStamp();
        bool getExpire() const;
        Platform::Threads::recursive_mutex &getMutex();

    private:
        boost::shared_ptr<I_ServerStub> stub_;
        unsigned int connectionCount_;
        unsigned int timeStamp_;
        bool expire_;
        Platform::Threads::recursive_mutex mutex_;
    };

    // Synchronization - stub entry
    class StubEntryAccess : boost::noncopyable 
    { 
    public: 
        StubEntryAccess(StubEntry &stubEntry);
    private:
        Platform::Threads::recursive_mutex::scoped_lock lock_;
    };

    // Stub entry reference count handler
    class StubEntryAddRef : boost::noncopyable
    {
    public:
        StubEntryAddRef();
        ~StubEntryAddRef();
        void reset(boost::shared_ptr<StubEntry> stubEntry = boost::shared_ptr<StubEntry>());
        
    private:
        void addRef();
        void subRef();
        boost::shared_ptr<StubEntry> stubEntry_;
    };

    struct Session
    {
    public:
        Session(int fd, const ClientInfo &clientInfo, unsigned int recvTimeoutMs) :
            fd(fd),
            clientInfo(clientInfo),
            connection(fd, recvTimeoutMs)
        {}

        int receive();
        int send();

        Connection &getConnection() { return connection; }
        StubEntryAddRef &getStubEntryAddRef() { return stubEntryAddRef; }

    private:
        int fd;
        ClientInfo clientInfo;
        Connection connection;
        StubEntryAddRef stubEntryAddRef;
    };

    class EndpointInfo
    {
    public:
        EndpointInfo();
        EndpointInfo(const std::string &name, const std::string &password, const ClientInfo &clientInfo);
        void addFd(int fd);
        int popFd();
        int frontFd();
        int fdCount();
        std::string getPassword() const;
        bool verifyAccess(const std::string &password, const ClientInfo &clientInfo);
        friend std::ostream &operator<<(std::ostream &os, const EndpointInfo &endpointInfo);

    private:
        std::string name;
        std::string password;
        ClientInfo clientInfo;
        std::deque<int> fds;
    };

    static const int NumberOfWorkerThreads = 1;
    static const int NumberOfTokens = 100;

    template<typename Server> class Multiplexer;
    
    class RcfServer : boost::noncopyable
    {
    private:
        typedef RCF::Session Session;
        friend class SessionManager;
        
    public:
        RcfServer(const std::string &ip, int port, const std::string &endpointName);
        RcfServer(int port);
        ~RcfServer();

        void start(bool wait = false);
        void stop(bool wait = true);
        void reset();

        // preferred form
        //typedef boost::function<void(RcfServer&)> StartCallback;
        // compatible form, needed for Borland C++ 5.5.1
        typedef boost::function1<void, RcfServer&> StartCallback;

        //void setStartCallback(StartCallback startCallback);

        void setStartCallback(StartCallback startCallback) 
        { 
            startCallback_ = startCallback;
        }

        template<typename T> void setStartCallback(void (T::*pfn)(RcfServer &), T &t)
        {
            // this function is implemented here (rather than in RcfServer.inl), because of borland c++ idiosyncracies
            setStartCallback( boost::bind(pfn, &t, _1) );
        }
        void onStart();
        
        template<typename InterfaceT, typename ImplementationT> void bind(bool share = false, std::string name = "");
        template<typename InterfaceT, typename ImplementationT> void bind(ImplementationT &x, std::string name = "");
        template<typename InterfaceT, typename ImplementationT> void bind(std::auto_ptr<ImplementationT> px, std::string name = "");
        template<typename InterfaceT, typename ImplementationT> void bind(boost::shared_ptr<ImplementationT> px, std::string name = "");

        void setAllowedClientIps(const std::vector<std::string> &ips);
        std::vector<std::string> getAllowedClientIps();

        void setNetworkInterface(const std::string &networkInterface);
        std::string getNetworkInterface();

    private:
        template<typename ImplementationT, typename InterfaceT> 
        void bindShared(std::string name, boost::shared_ptr<I_ServerStub> serverStub);

    private:
        friend class Multiplexer<RcfServer>;
        bool handleSession(Session &session);
        Session *createSession(int fd, const ClientInfo &clientInfo);
        void destroySession(Session *session);

    private:

        template<typename ServerT> friend class ObjectFactory;

        void waitForStopEvent();
        
        void sendResponse(Connection &connection, bool oneway);
        void serializeExceptionResponse(Connection &connection);
        
        void createObjectFactory();
        Token createClientStub( const std::string &objectName );
        Token createSharedClientStub( const std::string &objectName );
        bool isShared(const std::string &objectName);
        Token getSharedStubToken(const std::string &objectName);
       
        void incrementStubCount();
        void decrementStubCount();
        void resetStubCount();
        int getStubCount();
        void cleanup( unsigned int timeout = 60 );
        void allocateStubTable();
        void printServerDiagnostics();

    private:

        // Connection parameters
        const int port_;
        int getPort() const { return port_; }

        std::string ip_; // TODO
        std::string getIp() const { return ip_; } 

        static const unsigned int recvTimeoutMs_ = 1000;
        unsigned int getRecvTimeoutMs() const { return recvTimeoutMs_; }

        // Token factory
        TokenFactory tokenFactory_;
        TokenFactory &getTokenFactory();
        
        // Stub table
        typedef boost::shared_ptr<StubEntry> StubEntryPtr;
        typedef Platform::Library::hash_map_t< Token, StubEntryPtr, TokenHash >::Val StubTableT;
        StubTableT &getStubTable();
        StubTableT stubTable_;
        
        int stubCount_;
        Platform::Threads::recursive_mutex stubCountMutex_;
        Platform::Threads::try_mutex cleanupMutex_;
        
        // Multiplexer
        typedef Multiplexer<RcfServer> MultiplexerT;
        std::auto_ptr<MultiplexerT> multiplexer_;
        MultiplexerT &getMultiplexer();

        // Mutex for stop event
        Platform::Threads::mutex stop_mutex_;
        Platform::Threads::mutex &getStopMutex();

        // Condition for stop event
        Platform::Threads::condition stop_event_;
        Platform::Threads::condition &getStopEvent();

        // Flag to stop worker threads
        bool stop_;

        // Flag to indicate if server is running, used to prevent users from starting the server twice
        bool running_;
        
        StubEntry &getStubEntry(Token token);
        boost::shared_ptr<StubEntry> getStubEntryPtr(Token token);

        std::map< std::string, Token > sharedStubTokens;
        std::vector<Token> &getTokenSpace();
    
        // Stub registration and creation
        void registerStubFactory(const std::string &objectName, const std::string &desc, boost::shared_ptr<I_StubFactory> factory);
        I_StubFactory &getStubFactory( const std::string &objectName );

        typedef std::map< std::string, boost::shared_ptr<I_StubFactory> > StubFactoryTableT;
        StubFactoryTableT &getStubFactoryTable();
        StubFactoryTableT stubFactoryTable;

        // start callback
        StartCallback startCallback_;

        //*************************************
        // Endpoints
        std::string endpointName;
        std::string endpointPassword;
        std::string endpointIp;
        int endpointPort;

        void openEndpoint(const std::string &endpointName, std::string &endpointPassword);
        void closeEndpoint(const std::string &endpointName, const std::string &password);
        void reverseThisConnection(const std::string &endpointName, const std::string &password);
        void bindToEndpoint(const std::string &endpointName);
        void spawnReverseConnection(int count, const std::string &ip, int port, const std::string &endpointName, const std::string &endpointPassword);
        void seedRandomNumberGenerator();

        template<typename ServerT> friend class EndpointBroker;
        template<typename ServerT> friend class EndpointServer;
        
        std::map<std::string, EndpointInfo> endpointMap;
        Platform::Threads::mutex endpointMapMutex;
    };

} // namespace RCF

#include <RCF/RcfServer.inl>

#endif // ! _RCF_RCFSERVER_HPP

By viewing downloads associated with this article you agree to the Terms of Service and the article's licence.

If a file you wish to view isn't highlighted, and is a text file (not binary), please let us know and we'll add colourisation support for it.

License

This article, along with any associated source code and files, is licensed under The Code Project Open License (CPOL)


Written By
Australia Australia
Software developer, from Sweden and now living in Canberra, Australia, working on distributed C++ applications. When he is not programming, Jarl enjoys skiing and playing table tennis. He derives immense satisfaction from referring to himself in third person.

Comments and Discussions