//*****************************************************************************
// RCF - Remote Call Framework
// Copyright (c) 2005. All rights reserved.
// Developed by Jarl Lindrud.
// Contact: jlindrud@hotmail.com .
//*****************************************************************************
#include <RCF/EndpointBrokerService.hpp>
#include <RCF/CurrentSession.hpp>
#include <RCF/RcfServer.hpp>
namespace RCF {
// Creates the broker record for a single named endpoint.
//
// serverTransportPtr      - shared pointer to a server transport, stored as-is.
// endpointName            - name of the endpoint this broker represents.
// endpointClientPassword  - password compared against what binding clients supply.
// endpointServerPassword  - password compared against what the endpoint server
//                           supplies when it contributes connections.
//
// Only the four arguments are stored; the master connection and the
// connection pool are not touched by the constructor.
EndpointBroker::EndpointBroker(
    ServerTransportPtr serverTransportPtr,
    const std::string &endpointName,
    const std::string &endpointClientPassword,
    const std::string &endpointServerPassword) :
        mServerTransportPtr(serverTransportPtr),
        mEndpointName(endpointName),
        mEndpointClientPassword(endpointClientPassword),
        mEndpointServerPassword(endpointServerPassword)
{
}
int EndpointBroker::bindToEndpoint()
{
RCF_ASSERT( mMasterConnection.get() != NULL );
if (mConnections.size() == 0)
{
if (mMasterConnection->getClientStub().isConnected())
{
try
{
mMasterConnection->spawnConnections(Oneway, 3);
return RCF_ERROR_ENDPOINT_RETRY;
}
catch(const RCF::Exception &e)
{
RCF_TRACE("")(e);
}
}
return RCF_ERROR_ENDPOINT_DOWN;
}
else
{
RCF_ASSERT(mConnections.size() > 0);
while (!mConnections.empty())
{
SessionPtr sessionPtr = mConnections.back();
boost::shared_ptr<I_Session> sessionPtr0 = boost::static_pointer_cast<I_Session>(sessionPtr);
mConnections.pop_back();
RCF_ASSERT(sessionPtr.get());
I_ServerTransport &serverTransport = getCurrentSessionPtr()->getProactorPtr()->getServerTransport();
I_ServerTransportEx &serverTransportEx = dynamic_cast<I_ServerTransportEx &>(serverTransport);
if (serverTransportEx.isConnected( sessionPtr0))
{
serverTransportEx.reflect(getCurrentISessionPtr(), sessionPtr0);
return RCF_ERROR_OK;
}
}
RCF_ASSERT( mConnections.empty());
return bindToEndpoint(); // try again, this time try the master connection
}
}
// Default constructor. The mutex guarding the broker table is constructed
// with the WriterPriority argument; nothing else is initialised explicitly.
EndpointBrokerService::EndpointBrokerService() :
    mEndpointBrokersMutex(WriterPriority)
{
}
// Called with the server this service is being added to. Captures the
// server's transport and binds this object as the I_EndpointBroker
// implementation on that server.
void EndpointBrokerService::onServiceAdded(RcfServer &server)
{
    // Hold a shared pointer to the transport so that it cannot be deleted
    // before this service is.
    mServerTransportPtr = server.getServerTransportPtr();

    server.bind<I_EndpointBroker>(*this);
}
// Counterpart of onServiceAdded(). Intentionally does nothing: neither the
// stored transport pointer nor the I_EndpointBroker binding is released here.
void EndpointBrokerService::onServiceRemoved(RcfServer &)
{
}
// remotely invoked
bool EndpointBrokerService::openEndpoint(const std::string &endpointName, const std::string &endpointClientPassword, std::string &endpointServerPassword)
{
WriteLock writeLock(mEndpointBrokersMutex);
if (mEndpointBrokers[endpointName].get() && !mEndpointBrokers[endpointName]->mMasterConnection->getClientStub().isConnected())
{
mEndpointBrokers[endpointName].reset();
}
if (mEndpointBrokers[endpointName].get() == NULL) // TODO: check for zombie broker (e.g. all connections defunct)
{
I_ServerTransport &serverTransport = getCurrentSessionPtr()->getProactorPtr()->getServerTransport();
I_ServerTransportEx *pServerTransportEx = dynamic_cast<I_ServerTransportEx *>(&serverTransport);
if (pServerTransportEx)
{
I_ServerTransportEx &serverTransportEx = *pServerTransportEx;
mEndpointBrokers[endpointName].reset(new EndpointBroker(mServerTransportPtr, endpointName, endpointClientPassword, endpointServerPassword));
ClientTransportAutoPtr cllientTransportAutoPtr( serverTransportEx.createClientTransport(getCurrentISessionPtr()) );
boost::shared_ptr< RcfClient<I_EndpointServer> > clientPtr( new RcfClient<I_EndpointServer>(cllientTransportAutoPtr) );
mEndpointBrokers[endpointName]->mMasterConnection = clientPtr;
return true;
}
}
return false;
}
// remotely invoked
// Removes the broker registered under endpointName, if there is one.
//
// endpointName            - name of the endpoint to close.
// endpointServerPassword  - currently ignored.
//
// Always returns true, whether or not an endpoint with that name existed.
//
// NOTE(review): because the password is not checked, any caller able to
// invoke this method can close any endpoint - confirm whether it should be
// compared against the broker's mEndpointServerPassword.
bool EndpointBrokerService::closeEndpoint(const std::string &endpointName, const std::string &endpointServerPassword)
{
    RCF_UNUSED_VARIABLE(endpointServerPassword);
    WriteLock writeLock(mEndpointBrokersMutex);

    // Erase by key: this physically removes the entry and, unlike operator[],
    // does not create an empty entry when the name is unknown.
    mEndpointBrokers.erase(endpointName);
    return true;
}
// remotely invoked
bool EndpointBrokerService::establishEndpointConnection(const std::string &endpointName, const std::string &endpointServerPassword)
{
WriteLock writeLock(mEndpointBrokersMutex); // TODO: read lock here and an extra mutex for each endpoint broker instead
if (mEndpointBrokers[endpointName].get() && mEndpointBrokers[endpointName]->mEndpointServerPassword == endpointServerPassword)
{
mEndpointBrokers[endpointName]->mConnections.push_back(getCurrentSessionPtr());
return true;
}
return false;
}
// remotely invoked
int EndpointBrokerService::bindToEndpoint(const std::string &endpointName, const std::string &endpointClientPassword)
{
WriteLock writeLock(mEndpointBrokersMutex); // TODO: read lock here and an extra mutex for each endpoint broker instead
// TODO: don't use mEndpointBrokers[] until it is known that the endpoint broker exists
if (NULL == mEndpointBrokers[endpointName].get())
{
return RCF_ERROR_ENDPOINT_ABSENT;
}
else if (mEndpointBrokers[endpointName]->mEndpointClientPassword != endpointClientPassword)
{
return RCF_ERROR_ENDPOINT_PASSWORD;
}
else
{
int err = mEndpointBrokers[endpointName]->bindToEndpoint(); // bind current session to endpoint
if (err == RCF_ERROR_ENDPOINT_DOWN)
{
mEndpointBrokers[endpointName].reset();
}
return err;
}
}
} // namespace RCF