//*****************************************************************************
// RCF - Remote Call Framework
// Copyright (c) 2005. All rights reserved.
// Developed by Jarl Lindrud.
// Contact: jlindrud@hotmail.com .
//*****************************************************************************
#include <RCF/ObjectFactoryService.hpp>
#include <boost/bind.hpp>
#include <boost/shared_ptr.hpp>
#include <RCF/RcfServer.hpp>
#include <RCF/ServerInterfaces.hpp>
#include <RCF/StubEntry.hpp>
#include <RCF/StubFactory.hpp>
namespace RCF {
ObjectFactoryService::TokenFactory::TokenFactory(int tokenCount) :
mMutex(WriterPriority)
{
for (int i=0; i<tokenCount; i++)
{
mTokenSpace.push_back( Token(i+1) );
}
mAvailableTokens.assign( mTokenSpace.rbegin(), mTokenSpace.rend() );
// VC6 library deficiencies - above code is equivalent to following:
//for (int i=static_cast<int>(tokenSpace_.size())-1; i>=0; i--)
// tokens_.push_back( tokenSpace_[i] );
}
bool ObjectFactoryService::TokenFactory::requestToken(Token &token)
{
WriteLock writeLock(mMutex); RCF_UNUSED_VARIABLE(writeLock);
if (mAvailableTokens.empty())
{
RCF_TRACE("no more tokens available")(mAvailableTokens.size())(mTokenSpace.size());
return false;
}
else
{
Token myToken = mAvailableTokens.back();
mAvailableTokens.pop_back();
token = myToken;
return true;
}
}
void ObjectFactoryService::TokenFactory::returnToken(const Token &token)
{
WriteLock writeLock(mMutex); RCF_UNUSED_VARIABLE(writeLock);
mAvailableTokens.push_back(token);
}
const std::vector<Token> &ObjectFactoryService::TokenFactory::getTokenSpace()
{
return mTokenSpace;
}
unsigned int ObjectFactoryService::TokenFactory::getAvailableTokenCount()
{
ReadLock readLock( mMutex ); RCF_UNUSED_VARIABLE(readLock);
return static_cast<unsigned int>(mAvailableTokens.size());
}
ObjectFactoryService::ObjectFactoryService(unsigned int numberOfTokens, unsigned int clientStubTimeoutS) :
mClientStubTimeoutS(clientStubTimeoutS),
mTokenFactory(numberOfTokens),
mStubFactoryMapMutex(WriterPriority),
mStopFlag()
{
allocateStubMap();
}
// remotely accessible
bool ObjectFactoryService::createObject(const std::string &objectName, Token &token)
{
RCF_TRACE("")(objectName);
// if we have used up 2/3 of the token space, trigger a cleanup sweep
if (3*mTokenFactory.getAvailableTokenCount() < mTokenFactory.getTokenSpace().size())
{
mCleanupThresholdCondition.notify_one();
}
boost::shared_ptr<I_StubFactory> stubFactoryPtr( getStubFactory( objectName ) );
if (stubFactoryPtr.get())
{
// TODO: exception safety
RcfClientPtr rcfClientPtr( stubFactoryPtr->makeServerStub() );
Token myToken;
bool ok = mTokenFactory.requestToken(myToken);
if (ok)
{
StubEntryPtr stubEntryPtr = getStubEntryPtr(myToken);
{
Lock lock(stubEntryPtr->getMutex());
RCF_ASSERT( NULL == stubEntryPtr->getRcfClientPtr().get() );
stubEntryPtr->setRcfClientPtr(rcfClientPtr);
stubEntryPtr->touch();
}
token = myToken;
return true;
}
}
return false;
}
StubEntryPtr ObjectFactoryService::getStubEntryPtr(const Token &token)
{
return mStubMap.find(token) != mStubMap.end() ?
mStubMap[token] :
StubEntryPtr();
}
void ObjectFactoryService::onServiceAdded(RcfServer &server)
{
server.bind<I_ObjectFactory>(*this);
{
WriteLock writeLock(getTaskEntriesMutex());
getTaskEntries().clear();
getTaskEntries().push_back(
TaskEntry(
boost::bind(&ObjectFactoryService::cycleCleanup, this, _1, _2),
boost::bind(&ObjectFactoryService::stopCleanup, this) ));
}
mStopFlag = false;
}
void ObjectFactoryService::onServiceRemoved(RcfServer &server)
{
server.unbind<I_ObjectFactory>();
}
void ObjectFactoryService::stopCleanup()
{
mStopFlag = true;
Lock lock(mCleanupThresholdMutex);
mCleanupThresholdCondition.notify_one();
}
bool ObjectFactoryService::cycleCleanup(int timeoutMs, const volatile bool &stopFlag)
{
if (timeoutMs == 0)
{
cleanupStubMap(mClientStubTimeoutS);
}
else
{
Lock lock(mCleanupThresholdMutex);
if (!stopFlag && !mStopFlag)
{
//mCleanupThresholdCondition.wait(lock);
// wakeup after 2000 ms if no-one signals us
// TODO: configure
mCleanupThresholdCondition.timed_wait(lock, 2000);
if (!stopFlag && !mStopFlag)
{
cleanupStubMap(mClientStubTimeoutS);
}
else
{
return true;
}
}
}
return stopFlag || mStopFlag;
}
bool ObjectFactoryService::insertStubFactory(const std::string &objectName, const std::string &desc, boost::shared_ptr<I_StubFactory> factory)
{
RCF_UNUSED_VARIABLE(desc);
WriteLock writeLock(mStubFactoryMapMutex);
if (mStubFactoryMap.find(objectName) == mStubFactoryMap.end())
{
mStubFactoryMap[ objectName ] = factory;
return true;
}
return false;
}
bool ObjectFactoryService::removeStubFactory(const std::string &objectName)
{
WriteLock writeLock(mStubFactoryMapMutex);
mStubFactoryMap.erase(mStubFactoryMap.find(objectName));
return true;
}
boost::shared_ptr<I_StubFactory> ObjectFactoryService::getStubFactory(const std::string &objectName)
{
ReadLock readLock(mStubFactoryMapMutex);
return mStubFactoryMap.find(objectName) != mStubFactoryMap.end() ?
mStubFactoryMap[objectName] :
boost::shared_ptr<I_StubFactory>();
}
void ObjectFactoryService::cleanupStubMap(unsigned int timeoutS)
{
// Clean up the stub map
RCF_TRACE("");
if (3*mTokenFactory.getAvailableTokenCount() < mTokenFactory.getTokenSpace().size())
{
for (std::vector<Token>::const_iterator iter = mTokenFactory.getTokenSpace().begin(); iter != mTokenFactory.getTokenSpace().end(); iter++)
{
Token token = *iter;
StubEntryPtr stubEntryPtr = getStubEntryPtr(token);
if (stubEntryPtr->getConnectionCount() == 0)
{
if (stubEntryPtr->getElapsedTimeS() > timeoutS)
{
bool bReturnToken = false;
{
// Server stubs are removed when a minimum of timeout seconds have passed since their connection count went to zero
Lock lock(stubEntryPtr->getMutex());
if (stubEntryPtr->getConnectionCount() == 0 &&
stubEntryPtr->getElapsedTimeS() > timeoutS)
{
stubEntryPtr->clear();
bReturnToken = true;
RCF_TRACE("Stub entry cleared")(token);
}
}
if (bReturnToken)
{
mTokenFactory.returnToken(token);
RCF_TRACE("Token returned")(token);
}
}
}
}
}
}
void ObjectFactoryService::allocateStubMap()
{
RCF_TRACE("");
mStubMap.clear();
for (std::vector<Token>::const_iterator iter = mTokenFactory.getTokenSpace().begin(); iter != mTokenFactory.getTokenSpace().end(); iter++)
{
mStubMap[*iter] = StubEntryPtr( new StubEntry ) ;
}
}
void reinstateClientTransport(ClientStub &clientStub, I_RcfClient &factory)
{
clientStub.setTransport(factory.getClientStub().releaseTransport());
}
bool createRemoteObjectNamed(I_RcfClient &rcfClient, std::string objectName)
{
try
{
ClientStub &clientStub = rcfClient.getClientStub();
clientStub.connectTransport();
RcfClient<I_ObjectFactory> factory(clientStub.releaseTransport());
//ScopeGuard guard( boost::bind(&reinstateClientTransport, boost::ref(clientStub), boost::ref(factory)) );
ScopeGuard guard = MakeGuard(reinstateClientTransport, boost::ref(clientStub), boost::ref(factory));
RCF::Token token;
bool ok = factory.createObject(RCF::Twoway, objectName, token);
if (ok)
{
clientStub.setToken(token);
return true;
}
else
{
clientStub.setToken(Token());
return false;
}
}
catch(const RCF::Exception &e)
{
RCF_TRACE("failed to create remote object")(e);
return false;
}
}
} // namespace RCF