//*****************************************************************************
// RCF - Remote Call Framework
// Copyright (c) 2005. All rights reserved.
// Developed by Jarl Lindrud.
// Contact: jlindrud@hotmail.com .
//*****************************************************************************
#include <RCF/PublishingService.hpp>
#include <RCF/CurrentSession.hpp>
#include <RCF/MulticastClientTransport.hpp>
#include <RCF/RcfServer.hpp>
#include <RCF/ServerInterfaces.hpp>
#include <RCF/ServerTransport.hpp>
#include <RCF/Session.hpp>
namespace RCF {
// Constructs the service. The mutex that guards mPublishers is created with
// the WriterPriority policy.
PublishingService::PublishingService() : mPublishersMutex(WriterPriority)
{
}
bool PublishingService::beginPublishNamed(const std::string &publisherName, RcfClientPtr rcfClientPtr)
{
WriteLock lock(mPublishersMutex);
mPublishers[ publisherName ].reset( new Publisher );
mPublishers[ publisherName ]->name = publisherName;
mPublishers[ publisherName ]->multicastClient = rcfClientPtr;
mPublishers[ publisherName ]->multicastClient->getClientStub().setTransport(std::auto_ptr<I_ClientTransport>(new MulticastClientTransport));
mPublishers[ publisherName ]->multicastClient->getClientStub().setDefaultCallingSemantics(Oneway);
return true;
}
// Returns the multicast client of the publisher registered under
// publisherName.
//
// Throws ServiceException (via RCF_THROW) when no publisher is stored under
// that name, or when the stored entry is empty. An entry with an empty
// pointer is treated as "not found" so that it is never dereferenced.
//
// NOTE(review): the read lock is released when this function returns, while
// the caller keeps using the returned reference; confirm callers do not race
// with endPublishNamed().
I_RcfClient &PublishingService::publishNamed(const std::string &publisherName)
{
    ReadLock lock(mPublishersMutex);
    // operator[] is only evaluated after find() has confirmed the key exists,
    // so it cannot insert a new element while only the read lock is held.
    if (mPublishers.find(publisherName) != mPublishers.end()
        && mPublishers[ publisherName ].get() != NULL)
    {
        return *mPublishers[ publisherName ]->multicastClient;
    }
    RCF_THROW(ServiceException, "publishing service not found")(publisherName);
}
// Stops publishing under publisherName by removing its entry from
// mPublishers. Ending a name that was never published is a no-op.
// Always returns true.
bool PublishingService::endPublishNamed(const std::string &publisherName)
{
    WriteLock lock(mPublishersMutex);
    // Erase the entry instead of resetting the pointer through operator[].
    // A reset leaves the key in the map with an empty pointer (and creates
    // such an entry for unknown names), which the find()-based lookups in
    // this class would then treat as a live publisher and dereference.
    mPublishers.erase(publisherName);
    return true;
}
// Remotely accessible (bound to I_RequestSubscription in onServiceAdded).
//
// Handles a subscription request for the publisher named subscriptionName.
// Returns true when a live publisher with that name exists, false otherwise.
//
// On success, a client transport is created from the current session's server
// transport, and a write-completed callback is installed on the current
// session; that callback (addSubscriberTransport) attaches the new transport
// to the publisher.
bool PublishingService::requestSubscription(const std::string &subscriptionName)
{
    std::string publisherName = subscriptionName;
    bool found = false;
    ReadLock lock(mPublishersMutex);
    // Require a non-empty entry: a key that maps to an empty pointer is not a
    // publisher that can accept subscribers. operator[] is only evaluated
    // after find() has confirmed the key exists, so nothing is inserted here.
    if (mPublishers.find(publisherName) != mPublishers.end()
        && mPublishers[ publisherName ].get() != NULL)
    {
        found = true;
    }
    // Release the lock before the transport and session calls below.
    lock.unlock();
    if (found)
    {
        // The reference dynamic_cast throws std::bad_cast if the server
        // transport does not implement I_ServerTransportEx.
        I_ServerTransportEx &serverTransport =
            dynamic_cast<I_ServerTransportEx &>(getCurrentSessionPtr()->getProactorPtr()->getServerTransport());
        ClientTransportAutoPtr clientTransportAutoPtr( serverTransport.createClientTransport(getCurrentISessionPtr()) );
        ClientTransportPtr clientTransportPtr( clientTransportAutoPtr.release() );
        getCurrentSessionPtr()->setOnWriteCompletedCallback(
            boost::bind( &PublishingService::addSubscriberTransport, this, _1, publisherName, clientTransportPtr) );
    }
    return found;
}
// Called when this service is added to a server: binds this object to the
// I_RequestSubscription interface on that server.
void PublishingService::onServiceAdded(RcfServer &server)
{
    PublishingService &self = *this;
    server.bind<I_RequestSubscription>(self);
}
// Called when this service is removed from a server; performs no action.
void PublishingService::onServiceRemoved(RcfServer & /*server*/)
{
}
// Write-completed callback installed by requestSubscription(): adds
// clientTransportPtr to the multicast transport of the publisher registered
// under publisherName.
//
// session            - session passed by the callback; not used here.
// publisherName      - name of the publisher to attach the subscriber to.
// clientTransportPtr - transport through which the subscriber is reached.
//
// Does nothing when the publisher is missing or its entry is empty, which can
// happen if publishing ended between the request and this callback.
void PublishingService::addSubscriberTransport(Session &session, const std::string &publisherName, ClientTransportPtr clientTransportPtr)
{
    WriteLock lock(mPublishersMutex);
    // Check the stored pointer as well as the key, so that an empty entry is
    // never dereferenced.
    if (mPublishers.find(publisherName) != mPublishers.end()
        && mPublishers[ publisherName ].get() != NULL)
    {
        I_ClientTransport &clientTransport = mPublishers[ publisherName ]->multicastClient->getClientStub().getTransport();
        // The reference dynamic_cast throws std::bad_cast if the client's
        // transport is not a MulticastClientTransport.
        MulticastClientTransport &multiCastClientTransport = dynamic_cast<MulticastClientTransport &>(clientTransport);
        multiCastClientTransport.addTransport(clientTransportPtr);
    }
}
} // namespace RCF