Click here to Skip to main content
Click here to Skip to main content
Add your own
alternative version

RMI for C++

, 6 Aug 2009 CPOL
User-friendly remote method invocation in C++.
rcf.zip
RCF
demo
vs2003
RCF
Client
Server
doc
include
RCF
Connection.inl
Marshal.inl
Multiplexer.inl
RCF.inl
RcfServer.inl
ServerStub.inl
Protocol
util
Platform
Compiler
bcc55
cw80
gcc32
icl70
msvc71
Library
bcc55
cw80
gcc32
msvc60
msvc71
Machine
SPARC
x86
OS
Cygwin
Unix
Windows
System
NonWindows
Windows
Threads
SF
src
RCF
Protocol
util
SF
test
Jamfile
Jamrules
borland
Jamfile
Jamrules
vs2003
RCF
RCF
RCFTest
//*****************************************************************************
// RCF - Remote Call Framework
// Copyright (c) 2005. All rights reserved.
// Developed by Jarl Lindrud.
// Contact: jlindrud@hotmail.com .
//*****************************************************************************

#include <RCF/RcfServer.hpp>

#include <time.h>

#include <boost/lexical_cast.hpp>
#include <RCF/Idl.hpp>
#include <RCF/MethodInvocation.hpp>
#include <RCF/Multiplexer.hpp>
#include <RCF/Random.hpp>
#include <RCF/ServerInterfaces.hpp>
#include <RCF/Token.hpp>
#include <RCF/Tools.hpp>

namespace RCF {

    // time in s since ca 1970, may fail after year 2038
    unsigned int getCurrentTimeS() 
    { 
        return static_cast<unsigned int>(time(NULL)); 
    }

    
    //*****************************
    // TokenFactory

    size_t TokenHash::operator()(const Token &token) const 
    {
        return token.getId();
    } 

    TokenFactory::TokenFactory(int tokenCount) 
        : nextTokenId_(1) 
    {
        for (int i=0; i<tokenCount; i++) 
            tokenSpace_.push_back( generateToken() );
            
        tokens_.assign( tokenSpace_.rbegin(), tokenSpace_.rend() );
        
        // VC6 library deficiencies - above code is equivalent to following:
        //for (int i=static_cast<int>(tokenSpace_.size())-1; i>=0; i--)
        //    tokens_.push_back( tokenSpace_[i] );
        
    }
    
    Token TokenFactory::getNextToken() 
    {
        Platform::Threads::mutex::scoped_lock lock(mutex_); (void) RCF_UNUSED_VARIABLE(lock);

        if (tokens_.empty())
            RCF_THROW( RCF::ServerException, "no more tokens available" );

        Token token = tokens_.back();
        tokens_.pop_back();
        return token;
    }
    
    void TokenFactory::returnToken(Token token) 
    { 
        Platform::Threads::mutex::scoped_lock lock(mutex_); RCF_UNUSED_VARIABLE(lock);
        tokens_.push_back(token);
    }

    std::vector<Token> &TokenFactory::getTokenSpace() 
    { 
        return tokenSpace_; 
    }

    Token TokenFactory::generateToken() 
    {
        return Token(nextTokenId_++);
    }

    //********************************************
    // StubEntry

    StubEntry::StubEntry() : 
        stub_(), 
        connectionCount_(), 
        timeStamp_(), 
        expire_(),
        mutex_() 
    {}

    void StubEntry::reset(boost::shared_ptr<I_ServerStub> stub, bool expire)
    {
        stub_ = stub;
        connectionCount_ = 0;
        timeStamp_ = getCurrentTimeS();
        expire_ = expire;
    }

    bool StubEntry::hasStub()
    {
        return stub_.get() != NULL;
    }

    I_ServerStub &StubEntry::getStub() 
    { 
        return *stub_; 
    }
    
    unsigned int &StubEntry::getConnectionCount() 
    { 
        return connectionCount_; 
    }
    
    unsigned int &StubEntry::getTimeStamp() 
    { 
        return timeStamp_; 
    }

    bool StubEntry::getExpire() const
    {
        return expire_;
    }
    
    Platform::Threads::recursive_mutex &StubEntry::getMutex() 
    { 
        return mutex_; 
    }

    // StubEntryAccess
    StubEntryAccess::StubEntryAccess(StubEntry &stubEntry) 
        : lock_( stubEntry.getMutex() ) 
    {}

    // StubEntryAddRef
    StubEntryAddRef::StubEntryAddRef() :
      stubEntry_()
    {}
    
    StubEntryAddRef::~StubEntryAddRef() 
    { 
        reset();
    }

    void StubEntryAddRef::reset(boost::shared_ptr<StubEntry> stubEntry) 
    { 
        subRef(); 
        stubEntry_ = stubEntry; 
        addRef(); 
    }

    void StubEntryAddRef::addRef() 
    { 
        if (stubEntry_)
        {
            StubEntryAccess access(*stubEntry_);
            stubEntry_->getConnectionCount()++;
        }
    }
    
    void StubEntryAddRef::subRef() 
    { 
        if (stubEntry_)
        {
            StubEntryAccess access(*stubEntry_);
            stubEntry_->getConnectionCount()--;
            stubEntry_->getTimeStamp() = getCurrentTimeS(); 
        }
    }
    
    int Session::receive() 
    { 
        return connection.receive(); 
    }
    
    int Session::send() 
    { 
        return connection.send(); 
    }


    //*******************************
    // EndpointInfo

    EndpointInfo::EndpointInfo() 
    {}

    EndpointInfo::EndpointInfo(const std::string &name, const std::string &password, const ClientInfo &clientInfo) : 
        name(name), 
        password(password),
        clientInfo(clientInfo) 
    {}
    
    void EndpointInfo::addFd(int fd) 
    { 
        RCF_TRACE("")(fd);
        fds.push_back(fd); 
    }

    int EndpointInfo::popFd() 
    {
        RCF_TRACE("")(fds.front());
        int fd = fds.front(); 
        fds.pop_front(); 
        return fd; 
    }

    int EndpointInfo::frontFd() 
    { 
        return fds.front(); 
    }
    
    int EndpointInfo::fdCount() 
    { 
        return static_cast<int>(fds.size()); 
    }

    std::string EndpointInfo::getPassword() const 
    { 
        return password; 
    }

    bool EndpointInfo::verifyAccess(const std::string &password, const ClientInfo &clientInfo)
    {
        return 
            clientInfo.getAddress() == this->clientInfo.getAddress() &&
            password == this->password;
    }

    std::ostream &operator<<(std::ostream &os, const EndpointInfo &endpointInfo)
    {
        os << "(" 
            << "name = " << endpointInfo.name << ", "
            << "password = " << endpointInfo.password << ", "
            << "fds = " << endpointInfo.fds
            << ")";
        return os;
    }
   

    //*******************************
    // RcfServer

    template<typename Server>
    class EndpointBroker
    {
    public:
        EndpointBroker(Server &server) : server(server) {}
        void openEndpoint(const std::string &name, std::string &password) { server.openEndpoint(name, password); }
        void closeEndpoint(const std::string &name, const std::string &password) { server.closeEndpoint(name, password); }
        void reverseThisConnection(const std::string &name, const std::string &password) { server.reverseThisConnection(name, password); }
        void bindToEndpoint(const std::string &name) { server.bindToEndpoint(name); }
    private:
        Server &server;
    };

    template<typename Server>
    class EndpointServer
    {
    public:
        EndpointServer(Server &server) : server(server) {}
        void spawnReverseConnection(
            int count, 
            const std::string &ip, 
            int port, 
            const std::string &endpointName, 
            const std::string &endpointPassword) 
        { 
            server.spawnReverseConnection(count, ip, port, endpointName, endpointPassword); 
        }

    private:
        Server &server;
    };

    class ServerInfo
    {
    public:
        ServerInfo(const std::string &name) : name(name) {}
        std::string getName() { return name; }
    private:
        std::string name;
    };

    RcfServer::RcfServer(int port) : 
        port_(port), 
        tokenFactory_(NumberOfTokens),
        multiplexer_( new MultiplexerT(*this, port_) ),
        running_(false),
        startCallback_()
    {
        RCF_TRACE("")(port);

        allocateStubTable();
        createObjectFactory();
    }

    RcfServer::RcfServer(const std::string &endpointIp, int endpointPort, const std::string &endpointName) : 
        port_(), 
        tokenFactory_(NumberOfTokens),
        multiplexer_( new MultiplexerT(*this) ),
        running_(false),
        startCallback_(),
        endpointName(endpointName),
        endpointIp(endpointIp),
        endpointPort(endpointPort)
    {
        RCF_TRACE("")(endpointIp)(endpointPort)(endpointName);

        allocateStubTable();
        createObjectFactory();
    }
    
    RcfServer::~RcfServer() 
    {
        RCF_TRACE("");

        reset();
    }

    void RcfServer::stop(bool wait)
    {
        RCF_TRACE("");

        stop_ = true;
        getMultiplexer().stop(wait);
        getStopEvent().notify_all();
        stop_ = false;
        running_ = false;
    }

    void RcfServer::reset()
    {
        RCF_TRACE("");

        stop();
        getMultiplexer().reset();
        cleanup(0);
    }

    void RcfServer::onStart()
    {
        if (startCallback_)
        {
            startCallback_(*this);
        }
    }

    void RcfServer::waitForStopEvent()
    {
        RCF_TRACE("");

        Platform::Threads::mutex::scoped_lock lock( getStopMutex() ); RCF_UNUSED_VARIABLE(lock);
        getStopEvent().wait( lock );
    }

    void RcfServer::setAllowedClientIps(const std::vector<std::string> &ips)
    {
        getMultiplexer().setAllowedClientIps(ips);
    }
    
    std::vector<std::string> RcfServer::getAllowedClientIps()
    {
        return getMultiplexer().getAllowedClientIps();
    }

    void RcfServer::setNetworkInterface(const std::string &networkInterface)
    {
        getMultiplexer().setNetworkInterface(networkInterface);
    }
    std::string RcfServer::getNetworkInterface()
    {
        return getMultiplexer().getNetworkInterface();
    }

    void RcfServer::serializeExceptionResponse(Connection &connection)
    {
        RCF_TRACE("");

        connection.prepareSend();
        connection << RCF::MethodInvocationResponse(true);
        connection << std::string("std::runtime_error");
        connection << std::string("Non-std::exception derived exception was thrown upon invoking server object");
    }

    void RcfServer::sendResponse(Connection &connection, bool oneway)
    {
        RCF_TRACE("");

        if (!oneway)
        {
            connection.send();
        }
    }

    Session *RcfServer::createSession(int fd, const ClientInfo &clientInfo) 
    { 
        return new Session(fd, clientInfo, getRecvTimeoutMs());
    }

    void RcfServer::destroySession(Session *session) 
    { 
        delete session;
    }

    bool RcfServer::handleSession(Session &session) 
    {
        RCF_TRACE("");

        StubEntryAddRef &stubEntryAddRef = session.getStubEntryAddRef();
        Connection &connection = session.getConnection();
        MethodInvocationRequest request;

        if (connection.receive() <= 0)
        {
            return false;
        }
        
        connection >> request;
        
        if (request.getClose()) 
        {
            return false;
        }

        bool oneway = request.getOneway();
                        
        Token token = request.getToken();
        if (token == Token())
        {
            std::string service = request.getService();
            token = getSharedStubToken(service);
        }
        StubEntry &stubEntry = getStubEntry(token);
        stubEntryAddRef.reset( getStubEntryPtr(token) );
        
        {
            // TODO: the following two scopes are apparently not triggered by Borland C++, when throwing non
            // std::exception derived exceptions.
            ScopeGuard sendResponseGuard = 
                MakeObjGuard(*this, &RcfServer::sendResponse, ByRef(connection), oneway);

            {

                ScopeGuard serializeExceptionResponseGuard = 
                    MakeObjGuard(*this, &RcfServer::serializeExceptionResponse, ByRef(connection)) ;

                try 
                {
                    stubEntry.getStub().invoke(request.getFnId(), connection);
                    serializeExceptionResponseGuard.Dismiss();
                }
                catch(const SerializationException &e) 
                {
                    RCF_TRACE(": Serialization exception")(typeid(e))(e);
                    serializeExceptionResponseGuard.Dismiss();
                    sendResponseGuard.Dismiss();
                    return false;
                }
                catch(const MultiplexerException &e) 
                {
                    RCF_TRACE(": Multiplexer exception")(typeid(e))(e);
                    serializeExceptionResponseGuard.Dismiss();
                    sendResponseGuard.Dismiss();
                    return false;
                }
                catch(const std::exception &e) 
                {
                    RCF_TRACE(": User exception")(typeid(e))(e);
                    serializeExceptionResponseGuard.Dismiss();
                    connection.prepareSend();
                    connection << RCF::MethodInvocationResponse(true);
                    connection << std::string(typeid(e).name()); // TODO: something more predictable typeid.name()
                    connection << std::string(e.what());
                }
            }
        }

        return true;
    }

    void RcfServer::createObjectFactory()
    {
        RCF_TRACE("");

        bind< I_ObjectFactory,  ObjectFactory<RcfServer> >(
            std::auto_ptr< ObjectFactory<RcfServer> >( new ObjectFactory<RcfServer>(*this) ));

        bind< I_EndpointBroker, EndpointBroker<RcfServer> >(
            std::auto_ptr< EndpointBroker<RcfServer> >( new EndpointBroker<RcfServer>(*this) ));

        bind< I_EndpointServer, EndpointServer<RcfServer> >(
            std::auto_ptr< EndpointServer<RcfServer> >( new EndpointServer<RcfServer>(*this) ));

        bind< I_ServerInfo, ServerInfo >(
            std::auto_ptr<ServerInfo>( new ServerInfo(endpointName) ));
    }

    Token RcfServer::createClientStub( const std::string &objectName ) 
    {
        RCF_TRACE("")(objectName);

        if (isShared(objectName))
        {
            return createSharedClientStub(objectName);
        }
        else
        {
            I_StubFactory &factory = getStubFactory( objectName );
            boost::shared_ptr<I_ServerStub> serverStub = factory.makeServerStub();
            Token token = getTokenFactory().getNextToken();
            serverStub->setToken( token );
            RCF_ASSERT( getStubTable().find( token ) != getStubTable().end() );
            StubEntry &stubEntry = getStubEntry(token);
            StubEntryAccess access(stubEntry);
            RCF_ASSERT( ! stubEntry.hasStub() );
            stubEntry.reset(serverStub);
            incrementStubCount();
            return token;
        }
    }

    Token RcfServer::createSharedClientStub( const std::string &objectName )
    {
        RCF_TRACE("")(objectName);

        return getSharedStubToken(objectName);
    }

    bool RcfServer::isShared(const std::string &objectName)
    {
        RCF_TRACE("")(objectName);

        return sharedStubTokens.find(objectName) != sharedStubTokens.end();
    }

    Token RcfServer::getSharedStubToken(const std::string &objectName)
    {
        RCF_TRACE("")(objectName);

        RCF_ASSERT(isShared(objectName))(objectName);
        return sharedStubTokens[ objectName ];
    }

    std::vector<Token> &RcfServer::getTokenSpace() 
    { 
        return getTokenFactory().getTokenSpace(); 
    }

    void RcfServer::incrementStubCount()
    {
        Platform::Threads::recursive_mutex::scoped_lock lock( stubCountMutex_ ); RCF_UNUSED_VARIABLE(lock);
        stubCount_++;
    }

    void RcfServer::decrementStubCount()
    {
        Platform::Threads::recursive_mutex::scoped_lock lock( stubCountMutex_ ); RCF_UNUSED_VARIABLE(lock);
        stubCount_--;
    }

    void RcfServer::resetStubCount()
    {
        Platform::Threads::recursive_mutex::scoped_lock lock( stubCountMutex_ ); RCF_UNUSED_VARIABLE(lock);
        stubCount_ = 0;
    }

    int RcfServer::getStubCount()
    {
        Platform::Threads::recursive_mutex::scoped_lock lock( stubCountMutex_ ); RCF_UNUSED_VARIABLE(lock);
        return stubCount_;
    }

    void RcfServer::cleanup(unsigned int timeout) 
    {

        Platform::Threads::try_mutex::scoped_try_lock lock( cleanupMutex_ ); RCF_UNUSED_VARIABLE(lock);
        if (!lock.locked())
            return;

        // Clean up the stub table
        RCF_TRACE("");
        for (std::vector<Token>::iterator iter = getTokenSpace().begin(); iter != getTokenSpace().end(); iter++) {
            Token token = *iter;
            StubEntry &stubEntry = getStubEntry(token);
            if (stubEntry.getExpire()) {
                if (stubEntry.hasStub()) {
                    if (stubEntry.getConnectionCount() == 0) {
                        bool bReturnToken = false;
                        {
                            // Server stubs are removed when a minimum of timeout seconds have passed since their connection count went to zero
                            StubEntryAccess access(stubEntry);
                            if (stubEntry.getExpire() &&
                                stubEntry.getConnectionCount() == 0 && 
                                stubEntry.getTimeStamp() != 0 && 
                                (getCurrentTimeS() - stubEntry.getTimeStamp()) > timeout ) { 
                                     stubEntry.reset();
                                    bReturnToken = true;
                                    decrementStubCount();
                                    RCF_TRACE("Stub entry cleared")(token);
                            }
                        }
                        if (bReturnToken) 
                        {
                            getTokenFactory().returnToken(token);
                            RCF_TRACE("Token returned")(token);
                        }
                    }
                }
            }
        }
    }    

    void RcfServer::allocateStubTable()
    {
        RCF_TRACE("");

        if (! running_) {
            getStubTable().clear();
            for (std::vector<Token>::iterator iter = getTokenSpace().begin(); iter != getTokenSpace().end(); iter++) 
                getStubTable()[*iter] = boost::shared_ptr<StubEntry>( new StubEntry() );
            resetStubCount();
        }
        else
            RCF_TRACE( "Can't allocate stub table while server is running!" );
    }

    void RcfServer::printServerDiagnostics() 
    {
        RCF_TRACE("");

        int used = 0;
        int unused = 0;

        RCF_TRACE("Stub table contents");
        for (std::vector<Token>::iterator iter = getTokenSpace().begin(); iter != getTokenSpace().end(); iter++) {
            Token token = *iter;
            StubEntry &stubEntry = getStubEntry(token);
            if (stubEntry.hasStub()) {
                RCF_TRACE("Stub entry: ")(token)(stubEntry.getConnectionCount())(stubEntry.getTimeStamp());
                used++;
            }
            else
                unused++;
        }
        RCF_TRACE("Stub count: ")(used)(unused);
    }

    TokenFactory &RcfServer::getTokenFactory() 
    { 
        return tokenFactory_; 
    }
    
    RcfServer::StubTableT &RcfServer::getStubTable() 
    { 
        return stubTable_; 
    }
    
    // Multiplexer
    RcfServer::MultiplexerT &RcfServer::getMultiplexer() 
    { 
        return *multiplexer_; 
    }

    Platform::Threads::mutex &RcfServer::getStopMutex() 
    { 
        return stop_mutex_; 
    }

    Platform::Threads::condition &RcfServer::getStopEvent() 
    { 
        return stop_event_; 
    }

    StubEntry &RcfServer::getStubEntry(Token token)
    {
        return *getStubEntryPtr(token);
    }

    boost::shared_ptr<StubEntry> RcfServer::getStubEntryPtr(Token token)
    {
        if (getStubTable().find(token) != getStubTable().end())
            return getStubTable()[token];
        else
            RCF_THROW( RCF::ServerException, "token not found in stub table" )(token);
    }

    void RcfServer::registerStubFactory(const std::string &objectName, const std::string &/*desc*/, boost::shared_ptr<I_StubFactory> factory)
    {
        RCF_TRACE("")(objectName)((int)getStubFactoryTable().size());
        getStubFactoryTable()[ objectName ] = factory;
    }

    I_StubFactory &RcfServer::getStubFactory( const std::string &objectName )
    {
        if (getStubFactoryTable().find(objectName) == getStubFactoryTable().end())
            RCF_THROW(RCF::ServerException, "stub factory not registered")(objectName);
        return * getStubFactoryTable()[ objectName ];
    }

    RcfServer::StubFactoryTableT &RcfServer::getStubFactoryTable() 
    {
        return stubFactoryTable;
    }

    

    void RcfServer::start(bool wait)
    {
        RCF_TRACE("")(wait);

        if ( ! running_ ) {
            running_ = true;
            stop_ = false;
            //getMultiplexer().reset();
            
            if (endpointName != "")
            {
                // Passive server code for endpoints
                RcfClient<I_EndpointBroker> endpointBroker(endpointIp, endpointPort);
                endpointBroker.openEndpoint(endpointName, endpointPassword);
                getMultiplexer().addFd( endpointBroker.releaseConnection()->releaseFd() );
                getMultiplexer().start( NumberOfWorkerThreads );
            }
            else
            {
                getMultiplexer().start( NumberOfWorkerThreads );
                if (wait)
                {
                    waitForStopEvent();
                }
            }
        }
    }
    

    void RcfServer::openEndpoint(const std::string &endpointName, std::string &endpointPassword)
    {
        RCF_TRACE("")(endpointName)(endpointPassword);
        
        Platform::Threads::mutex::scoped_lock lock(endpointMapMutex); RCF_UNUSED_VARIABLE(lock);
        if (endpointMap.find(endpointName) == endpointMap.end())
        {
            {
                // TODO: better passwords
                int n = generateRandomNumber();
                endpointPassword = boost::lexical_cast<std::string>(n);
            }
            
            EndpointInfo endpointInfo(
                endpointName, 
                endpointPassword,
                getCurrentSessionInfo().getClientInfo());

            endpointMap[endpointName] = endpointInfo;

            int fd = getMultiplexer().reserveCurrentSessionForForwarding();
            endpointMap[endpointName].addFd(fd);
        }
    }

    void RcfServer::closeEndpoint(const std::string &endpointName, const std::string &endpointPassword)
    {
        RCF_TRACE("")(endpointName)(endpointPassword);

        Platform::Threads::mutex::scoped_lock lock(endpointMapMutex); RCF_UNUSED_VARIABLE(lock);
        if (endpointMap.find(endpointName) != endpointMap.end())
        {
            EndpointInfo &endpointInfo = endpointMap[endpointName];
            const ClientInfo &clientInfo = getCurrentSessionInfo().getClientInfo();
            if (endpointInfo.verifyAccess(endpointPassword, clientInfo))
            {
                endpointMap.erase( endpointMap.find(endpointName) );
            }
        }
    }

    void RcfServer::reverseThisConnection(const std::string &endpointName, const std::string &endpointPassword)
    {
        RCF_TRACE("")(endpointName)(endpointPassword);

        Platform::Threads::mutex::scoped_lock lock(endpointMapMutex); RCF_UNUSED_VARIABLE(lock);
        if (endpointMap.find(endpointName) != endpointMap.end())
        {
            EndpointInfo &endpointInfo = endpointMap[endpointName];
            const ClientInfo &clientInfo = getCurrentSessionInfo().getClientInfo();
            if (endpointInfo.verifyAccess(endpointPassword, clientInfo))
            {
                int fd = getMultiplexer().reserveCurrentSessionForForwarding();
                endpointInfo.addFd(fd);
            }
            else
            {
                throw RCF::Exception("Invalid password");
            }
        }
    }

    bool isFdWriteable(int fd)
    {
        fd_set fdSet;
        FD_ZERO(&fdSet);
        FD_SET(fd, &fdSet);
        int err = select(FD_SETSIZE, NULL, &fdSet, NULL, NULL);
        return err == 1;
    }

    bool sendOnewayRemoteSpawnRequest(int fd, int count, const std::string &ip, int port, const std::string &endpointName, const std::string &endpointPassword)
    {
        RCF_TRACE("")(fd)(count)(ip)(port)(endpointName)(endpointPassword);
        try
        {
            RcfClient<I_EndpointServer> endpointServer;
            endpointServer.setFd(fd);
            endpointServer.setOneway(true);
            endpointServer.spawnReverseConnection(count, ip, port, endpointName, endpointPassword);
            return true;
        }
        catch(const RCF::Exception &e)
        {
            RCF_TRACE("failed!")(e);
            return false;
        }
    }

    void RcfServer::bindToEndpoint(const std::string &endpointName)
    {
        RCF_TRACE("")(endpointName);

        Platform::Threads::mutex::scoped_lock lock(endpointMapMutex); RCF_UNUSED_VARIABLE(lock);
        if (endpointMap.find(endpointName) != endpointMap.end())
        {
            EndpointInfo &endpointInfo = endpointMap[endpointName];
            RCF_TRACE("")(endpointInfo)(endpointInfo.fdCount());
            while (endpointInfo.fdCount() > 1)
            {
                int fd = endpointInfo.popFd();
                if (isFdWriteable(fd))
                {
                    getMultiplexer().markCurrentSessionForForwarding(fd);
                    return;
                }
            }
            if (endpointInfo.fdCount() == 1 )
            {
                int fd = endpointInfo.frontFd();
                std::string endpointPassword = endpointInfo.getPassword();
                if (sendOnewayRemoteSpawnRequest(fd, 3, "localhost", port_, endpointName, endpointPassword))
                {
                    RCF_THROW(RCF::ServerException, "no available fds, issuing remote spawn request...");
                }
                else
                {
                    endpointMap.erase( endpointMap.find( endpointName ) );
                    RCF_THROW(RCF::ServerException, "no contact with endpoint, closing endpoint...");
                }
            }
            else
            {
                RCF_THROW(RCF::ServerException, "no available fds, cant even issue a remote spawn request...");
            }
        }
    }

    void RcfServer::spawnReverseConnection(int count, const std::string &ip, int port, const std::string &endpointName, const std::string &endpointPassword)
    {
        RCF_TRACE("")(count)(ip)(port)(endpointName)(endpointPassword);

        if (endpointName == this->endpointName && 
            endpointPassword == this->endpointPassword &&
            count > 0 && count < 20)
        {
            for (int i=0; i<count; i++)
            {
                RcfClient<I_EndpointBroker> endpointBroker(ip, port);
                endpointBroker.setOneway(true);
                endpointBroker.reverseThisConnection(endpointName, endpointPassword);
                getMultiplexer().addFd( endpointBroker.releaseConnection()->releaseFd() );
            }
        }
        else
        {
            RCF_TRACE("refusing to spawn connections")(endpointName)(this->endpointName)(endpointPassword)(this->endpointPassword)(count);
        }
    }
    
} // namespace RCF

By viewing downloads associated with this article you agree to the Terms of Service and the article's licence.

If a file you wish to view isn't highlighted, and is a text file (not binary), please let us know and we'll add colourisation support for it.

License

This article, along with any associated source code and files, is licensed under The Code Project Open License (CPOL)

Share

About the Author

Jarl Lindrud

Australia Australia
Software developer, ex-resident of Sweden and now living in Canberra, Australia, working on distributed C++ applications. Jarl enjoys programming, but prefers skiing and playing table tennis. He derives immense satisfaction from referring to himself in third person.

| Advertise | Privacy | Terms of Use | Mobile
Web01 | 2.8.141220.1 | Last Updated 6 Aug 2009
Article Copyright 2005 by Jarl Lindrud
Everything else Copyright © CodeProject, 1999-2014
Layout: fixed | fluid