//*****************************************************************************
// RCF - Remote Call Framework
// Copyright (c) 2005. All rights reserved.
// Developed by Jarl Lindrud.
// Contact: jlindrud@hotmail.com .
//*****************************************************************************
#ifndef INCLUDE_RCF_RCFSERVER_HPP
#define INCLUDE_RCF_RCFSERVER_HPP
#include <deque>
#include <map>
#include <memory>
#include <string>
#include <vector>
#include <boost/bind.hpp>
#include <boost/function.hpp>
#include <boost/noncopyable.hpp>
#include <boost/shared_ptr.hpp>
#include <boost/weak_ptr.hpp>
#include <RCF/CheckRtti.hpp>
#include <RCF/GetInterfaceName.hpp>
#include <RCF/ServerTransport.hpp>
#include <RCF/ThreadLibrary.hpp>
namespace RCF {
class I_ServerTransport;
class StubEntry;
class I_Service;
class I_Session;
class Session;
class I_Endpoint;
class I_StubEntryLookupProvider;
class I_FilterFactoryLookupProvider;
class I_RcfClient;
typedef boost::shared_ptr<I_ServerTransport> ServerTransportPtr;
typedef boost::shared_ptr<I_Proactor> ProactorPtr;
typedef boost::shared_ptr<StubEntry> StubEntryPtr;
typedef boost::shared_ptr<I_Service> ServicePtr;
typedef boost::shared_ptr<Session> SessionPtr;
typedef boost::shared_ptr<I_StubEntryLookupProvider> StubEntryLookupProviderPtr;
typedef boost::shared_ptr<I_FilterFactoryLookupProvider> FilterFactoryLookupProviderPtr;
typedef boost::shared_ptr<I_RcfClient> RcfClientPtr;
typedef std::deque<SessionPtr> SessionQueue;
typedef boost::shared_ptr<SessionQueue> SessionQueuePtr;
/// Server class, supporting pluggable transports and services.
class RcfServer : boost::noncopyable, public virtual I_SessionManager
{
public:
//*************************************
// public interface
/// Constructor
/// \param endpoint Represents endpoint location of the server. Determines which server transport will be created and used.
RcfServer(const I_Endpoint &endpoint);
/// Constructor
/// \param servicePtr Service to load, typically a server transport.
RcfServer(ServicePtr servicePtr);
/// Constructor
/// \param serverTransportPtr Server transport to load.
RcfServer(ServerTransportPtr serverTransportPtr);
/// Destructor
~RcfServer();
/// Starts the server, by starting all services, including transports.
/// Synchronized: yes, idempotent: yes.
/// \param spawnThreads Indicates whether any server threads should be spawned.
void start(bool spawnThreads = true);
typedef boost::function0<void> JoinFunctor;
/// Adds a join functor to the server, to be called when stopping the server.
/// \param joinFunctor Join functor.
void addJoinFunctor(JoinFunctor joinFunctor);
/// Starts the server, by taking over the current thread. All server tasks will be run sequentially in the calling thread.
/// Synchronized: no, idempotent: no.
void startInThisThread();
/// Starts the server, by taking over the current thread. All server tasks will be run sequentially in the calling thread.
/// Synchronized: no, idempotent: no.
/// \param joinFunctor Functor to call in order to join this thread.
void startInThisThread(JoinFunctor joinFunctor);
/// Stops the server. Optionally waits until all server threads have terminated.
/// Synchronized: yes, idempotent: yes
/// \param wait True to wait for server threads to terminate, false to return immediately.
void stop(bool wait = true);
/// Cycles all services, including transports. May or may not result in the dispatching of a client request.
/// \param timeoutMs Maximum waiting time, in milliseconds, for any blocking operations.
/// \return boolean value indicating if the server is being stopped.
bool cycle(int timeoutMs = 0);
/// Cycles the server's thread-specific session queue.
/// \param timeoutMs Maximum waiting time, in milliseconds, for any blocking operations.
/// \param stopFlag Reference to boolean value that, when set, indicates that all server threads should terminate as soon as possible.
void cycleSessions(int timeoutMs, const volatile bool &stopFlag);
/// Opens all services, including transports.
void open();
/// Closes all services, including transports.
void close();
/// Returns a reference to the primary server transport.
/// \return Reference to primary server transport.
I_ServerTransport &getServerTransport();
/// Returns a shared pointer to the primary server transport.
/// \return Shared pointer to primary server transport.
boost::shared_ptr<I_ServerTransport> getServerTransportPtr();
/// Binds a reference to an object, to an interface
/// \param x Object to be bound.
/// \param name Name to assign to the binding.
/// \return true if binding succeeded, false otherwise (probably because of a name collision).
template<typename InterfaceT, typename ImplementationT>
bool bind(ImplementationT &x, const std::string &name = "");
/// Binds a shared pointer to an object, to an interface
/// \param px Shared pointer to be bound.
/// \param name Name to assign to the binding.
/// \return true if binding succeeded, false otherwise (probably because of a name collision).
template<typename InterfaceT, typename ImplementationT>
bool bind(boost::shared_ptr<ImplementationT> px, const std::string &name = "");
/// Binds a weak pointer to an object, to an interface
/// \param px Weak pointer to be bound.
/// \param name Name to assign to the binding.
/// \return true if binding succeeded, false otherwise (probably because of a name collision).
template<typename InterfaceT, typename ImplementationT>
bool bind(boost::weak_ptr<ImplementationT> px, const std::string &name = "");
/// Binds a auto pointer to an object, to an interface
/// \param px Auto pointer to be bound.
/// \param name Name to assign to the binding.
/// \return true if binding succeeded, false otherwise (probably because of a name collision).
template<typename InterfaceT, typename ImplementationT>
bool bind(std::auto_ptr<ImplementationT> px, const std::string &name = "");
/// Removes a binding from the server.
/// \param name Name of the binding
/// \return true
template<typename InterfaceT>
bool unbind(const std::string &name = ""/*getInterfaceName<InterfaceT>()*/);
/// Adds a service to the server.
/// \param servicePtr Service to be added. E.g. ...
/// \return true if service added successfully, false otherwise (probably because the service has already been added).
bool addService(ServicePtr servicePtr);
/// Removes a service from the server.
/// \param servicePtr Service to be removed.
/// \return true.
bool removeService(ServicePtr servicePtr);
// preferred form
//typedef boost::function<void(RcfServer&)> StartCallback;
// compatible form, needed for Borland C++ 5.5.1
typedef boost::function1<void, RcfServer&> StartCallback;
/// Sets the start callback, which is invoked by each worker thread when it starts.
void setStartCallback(StartCallback startCallback);
//{
// startCallback_ = startCallback;
//}
/// Sets the start callback, which is invoked by each worker thread when it starts.
template<typename T> void setStartCallback(void (T::*pfn)(RcfServer &), T &t)
{
// this function is implemented here (rather than in RcfServer.inl), because of borland c++ idiosyncracies
setStartCallback( boost::bind(pfn, &t, _1) );
}
private:
void invokeStartCallback();
private:
bool bindShared(const std::string &name, RcfClientPtr rcfClientPtr);
//*************************************
// async io transport interface
private:
typedef RCF::Session Session;
boost::shared_ptr<I_Session> createSession();
void onReadCompleted(boost::shared_ptr<I_Session> sessionPtr);
void onWriteCompleted(boost::shared_ptr<I_Session> sessionPtr);
void handleSession(SessionPtr sessionPtr);
void serializeSessionExceptionResponse(SessionPtr sessionPtr);
void sendSessionResponse(SessionPtr sessionPtr);
void closeSession(SessionPtr sessionPtr);
//*************************************
// transports, queues and threads
private:
SessionQueue &getSessionQueue(Session &session);
typedef ThreadSpecificPtr<SessionQueue>::Val ThreadSpecificSessionQueuePtr;
ThreadSpecificSessionQueuePtr mThreadSpecificSessionQueuePtr;
// eventually other specialized session queues...
volatile bool mServerThreadsStopFlag;
Mutex mOpenedMutex;
bool mOpened;
Mutex mStartedMutex;
bool mStarted;
public:
/// Returns a value indicating whether or not all server threads should terminate their activities.
bool getStopFlag();
//*************************************
// stub management
private:
ReadWriteMutex mStubMapMutex;
typedef std::map<std::string, StubEntryPtr> StubMap;
StubMap mStubMap;
//*************************************
// service management
private:
ReadWriteMutex mServicesMutex;
std::vector<ServicePtr> mServices;
std::vector<StubEntryLookupProviderPtr> mStubEntryLookupProviders;
std::vector<FilterFactoryLookupProviderPtr> mFilterFactoryLookupProviders;
std::vector<ServerTransportPtr> mServerTransports;
std::vector<JoinFunctor> mJoinFunctors;
void startService(ServicePtr servicePtr);
void stopService(ServicePtr servicePtr, bool wait = true);
FilterPtr createFilter(int filterId);
private:
// start callback
StartCallback startCallback_;
// start functionality
//Mutex mStartMutex;
Platform::Threads::condition mStartEvent;
// stop functionality
//Mutex mStopMutex;
Platform::Threads::condition mStopEvent;
public:
/// Waits for the server to be stopped.
void waitForStopEvent();
/// Waits for the server to be started.
void waitForStartEvent();
};
} // namespace RCF
#include <RCF/RcfServer.inl>
#endif // ! INCLUDE_RCF_RCFSERVER_HPP