/********************************************************************
File : Dispatcher.cpp
Creation date : 2010/6/27
License : Copyright 2010 Ahmed Charfeddine, http://www.pushframework.com
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*********************************************************************/
#include "StdAfx.h"
#include "Dispatcher.h"
#include "ScopedLock.h"
#include "Channel.h"
#include "ServerImpl.h"
#include "ServerStats.h"
#include "..\include\Protocol.h"
#include "..\include\OutgoingPacket.h"
#include "..\include\IncomingPacket.h"
#include "..\include\ClientFactory.h"
#include "..\include\Client.h"
#include "..\include\Service.h"
#include "..\include\BroadcastManager.h"
#include "..\include\Server.h"
#include "ChannelFactory.h"
#include "ClientImpl.h"
#include "ClientFactoryImpl.h"
#include "BroadcastManagerImpl.h"
#include "MonitorsBroadcastManager.h"
#include "BroadcastChannel.h"
#include "MonitorProtocol.h"
#include "MonitorRequestPacket.h"
#include "MonitorResponsePacket.h"
#include "StopWatch.h"
#include "Utilities.h"
namespace PushFramework{
Dispatcher::Dispatcher(ServerImpl* pServerImpl)
{
pMonitorsBroadcastManager = new MonitorsBroadcastManager(pServerImpl->getChannelFactory());
this->pServerImpl = pServerImpl;
::InitializeCriticalSection(&csSrvMap);
ZeroMemory(&m_QPFrequency, sizeof(m_QPFrequency));
QueryPerformanceFrequency(&m_QPFrequency);
}
Dispatcher::~Dispatcher(void)
{
::DeleteCriticalSection(&csSrvMap);
delete pMonitorsBroadcastManager;
}
std::string Dispatcher::getServiceNames()
{
std::stringstream ss;
ss << std::noskipws;
for (serviceMapT::iterator it = serviceMap.begin();
it!=serviceMap.end();
it++)
{
ss << "<request value=\"" << it->second->serviceName << "\"/>";
}
return ss.str();
}
void Dispatcher::setCurrentService( std::string serviceName )
{
ScopedLock lock(csSrvMap);
DWORD dwThread = ::GetCurrentThreadId();
workerServiceMap[dwThread] = serviceName;
}
void Dispatcher::UnsetCurrentService()
{
ScopedLock lock(csSrvMap);
DWORD dwThread = ::GetCurrentThreadId();
workerServiceMapT::iterator it = workerServiceMap.find(dwThread);
if (it!=workerServiceMap.end())
workerServiceMap.erase(it);
}
bool Dispatcher::getCurrentService( std::string& serviceName )
{
ScopedLock lock(csSrvMap);
DWORD dwThread = ::GetCurrentThreadId();
workerServiceMapT::iterator it = workerServiceMap.find(dwThread);
if (it==workerServiceMap.end())
return false;
serviceName = it->second;
return true;
}
void Dispatcher::NotifyObserversClientIN( const char* key, std::string peerIP, unsigned int peerPort )
{
std::string timestamp = Utilities::getCurrentTime();
// = "22:00::23";
std::stringstream ss;
ss << std::noskipws;
//Write header :
ss << "<root type=\"cin\">";
ss << "<time value=\"" << timestamp << "\"/>";
ss << "<name value=\"" << key << "\"/>";
ss << "<ip value=\"" << peerIP << "\"/>";
ss << "<port value=\"" << peerPort << "\"/>";
ss << "</root>";
std::string data = ss.str();
OutgoingPacket* pPacket = new MonitorResponsePacket(data);
pMonitorsBroadcastManager->pushPacket(pPacket, "clientsIn", key, 0);
}
void Dispatcher::NotifyObserversClientOut( const char* key )
{
// = "22:00::23";
std::stringstream ss;
ss << std::noskipws;
//Write header :
ss << "<root type=\"cout\">";
ss << "<name value=\"" << key << "\"/>";
ss << "</root>";
std::string data = ss.str();
OutgoingPacket* pPacket = new MonitorResponsePacket(data);
//Signal that client is out :
pMonitorsBroadcastManager->pushPacket(pPacket, "clientsOut");
//Remove client from the other broadcast group :
pMonitorsBroadcastManager->removePacket(key, 0, "clientsIn");
}
void Dispatcher::OnWriteComplete( CChannel* pChannel, DWORD dwIoSize )
{
pServerImpl->getChannelFactory()->getObject(pChannel->getKey());
bool reportSentData = !pChannel->isObserverChannel();
if (reportSentData)
{
pServerImpl->getStats()->addToCumul(ServerStats::BandwidthOutbound, dwIoSize);
pServerImpl->getStats()->addToKeyedDuration(ServerStats::BandwidthOutboundPerConnection, pChannel->getKey(), dwIoSize);
}
bool bIsBufferIdle;
int status = pChannel->OnSendCompleted(dwIoSize, bIsBufferIdle);
if (status == CChannel::Attached && bIsBufferIdle){
ProcessClientPendingPackets(pChannel);
}
pServerImpl->getChannelFactory()->disposeObject(pChannel->getKey());
}
void Dispatcher::OnReceiveComplete( CChannel* pChannel,DWORD dwIoSize )
{
//wcout << L"Cycle begin." << std::endl;
//
StopWatch watch(m_QPFrequency);
pServerImpl->getChannelFactory()->getObject(pChannel->getKey());
bool isClient = !pChannel->isObserverChannel();
if (isClient)
{
pServerImpl->getStats()->addToCumul(ServerStats::BandwidthInbound, dwIoSize);
pServerImpl->getStats()->addToKeyedDuration(ServerStats::BandwidthInboundPerConnection, pChannel->getKey(), dwIoSize);
}
//wcout << L"Retrieved channel : " << watch.GetElapsedTime(false) << std::endl;
int status = pChannel->ReadReceivedBytes(dwIoSize);
//wcout << L"Received bytes : " << watch.GetElapsedTime(false) << std::endl;
if(status < CChannel::Connected)
{
pServerImpl->getChannelFactory()->disposeObject(pChannel->getKey());
//PostReceive is not called.
//wcout << L"Cycle aborted" << std::endl;
return;
}
//Channel is either connected or attached.
if (dwIoSize == 0)//Peer wants to close the connection.
{
pChannel->Close(false);
if (status == CChannel::Attached)
{
if (isClient)
{
std::string clientKey = pChannel->getClient();
pServerImpl->getClientFactory()->onClientDisconnected(clientKey.c_str());
}
else
{
//TODO notify observer is out.
}
}
pServerImpl->getChannelFactory()->disposeObject(pChannel->getKey());
//wcout << L"Cycle aborted : " << std::endl;
return;
}
//The Processing Loop.
int uCommandID;
IncomingPacket* pPacket;
int iResult;
unsigned int uExtractedBytes;
Protocol* pProtocol = pChannel->getProtocol();
DataBuffer& sourceBuffer = pChannel->GetReceiveBuffer();
bool bProcessDataInQueue = true;
while (bProcessDataInQueue)
{
//watch.GetElapsedTime(false);
iResult = pProtocol->tryDeserializeIncomingPacket(sourceBuffer, pPacket, uCommandID, uExtractedBytes);
//iResult = pProtocol->tryDeframeIncomingPacket(pChannel->GetReceiveBuffer().GetBuffer(), pChannel->GetReceiveBuffer().GetDataSize(),uCommandID, pPacket, uExtractedBytes);
//wcout << L"Packet deframed : " << watch.GetElapsedTime(false) << std::endl;
if (iResult == Protocol::Success)
{
pChannel->GetReceiveBuffer().Pop(uExtractedBytes);
if (status == CChannel::Attached)
{
if(isClient)
dispatchRequest(uCommandID, pChannel->getClient().c_str(), *pPacket, uExtractedBytes);
else
HandleMonitorRequest(pChannel, *pPacket);
}
else if(status == CChannel::Connected)
{
if(isClient)
ProcessFirstPacket(pChannel, uCommandID, *pPacket,uExtractedBytes);
else
ProcessMonitorFirstPacket(pChannel, *pPacket);
}
else{
//Status changed by another thread eg ::disconnect.
bProcessDataInQueue = false;
}
pProtocol->disposeIncomingPacket(pPacket);
}
else if (iResult == Protocol::eDecodingFailure)
{
pChannel->GetReceiveBuffer().Pop(uExtractedBytes);
}
else
break;
}
//
if (iResult == Protocol::eIncompletePacket)
{
pChannel->PostReceive();
}
pServerImpl->getChannelFactory()->disposeObject(pChannel->getKey());
/*
double d = watch.GetElapsedTime();
wcout << L"Cycle duration : " << d << std::endl << std::endl << std::endl;*/
}
void Dispatcher::OnInitializeReady( CChannel* pChannel )
{
//hold a reference..
pServerImpl->getChannelFactory()->getObject(pChannel->getKey());
bool isClient = !pChannel->isObserverChannel();
if (isClient)
{
//
void* lpContext = NULL;
OutgoingPacket* pPacket = NULL;
pPacket = pServerImpl->getClientFactory()->onNewConnection(lpContext);
if (pPacket)
{
pChannel->saveContext(lpContext);
pChannel->PushPacket(pPacket);
pServerImpl->getFacade()->disposeOutgoingPacket(pPacket);
}
}
//
pChannel->PostReceive();
//
pServerImpl->getChannelFactory()->disposeObject(pChannel->getKey());
}
void Dispatcher::OnStartGC()
{
pServerImpl->getChannelFactory()->closeNonLogged();
}
void Dispatcher::OnStartProfiling()
{
OutgoingPacket* pPacket = pServerImpl->getStats()->getPerformancePacket();
//push to broadcast group
pMonitorsBroadcastManager->pushPacket(pPacket, "stats");
/*
pServerImpl->getChannelFactory()->BroadcastToMonitors(pPacket);
pServerImpl->getFacade()->disposeOutgoingPacket(pPacket);*/
}
void Dispatcher::registerService( unsigned int uCommmand, Service* pService, std::string serviceName )
{
ServiceInfo* pServiceInfo = new ServiceInfo;
pServiceInfo->pService = pService;
pServiceInfo->serviceName = serviceName;
serviceMap[uCommmand] = pServiceInfo;
}
void Dispatcher::dispatchRequest( unsigned int uCommand,const char* uClient,IncomingPacket& packet,unsigned int serviceBytes )
{
//StopWatch dispatchWatch(m_QPFrequency);
serviceMapT::iterator it = serviceMap.find(uCommand);
if(it == serviceMap.end())
return;
//
Service* pHandler = it->second->pService;
//wcout << L"Locating Service : " << dispatchWatch.GetElapsedTime(false) << std::endl;
//Mark dispatched service :
setCurrentService(it->second->serviceName);
StopWatch watch(m_QPFrequency);
pHandler->handle(uClient, &packet);
double duration = watch.GetElapsedTime();
/* wcout << L"Service Time : " << watch.GetElapsedTime() << std::endl;
*/
//StopWatch statsClock(m_QPFrequency);
pServerImpl->getStats()->addToDistribution(ServerStats::PerformanceProcessingTimePerService, it->second->serviceName, duration);
//wcout << L"Stat 1 : " << statsClock.GetElapsedTime(false) << std::endl;
pServerImpl->getStats()->addToDuration(ServerStats::PerformanceProcessingTime, duration);
//wcout << L"Stat 2 : " << statsClock.GetElapsedTime(false) << std::endl;
UnsetCurrentService();
//Stats. :
pServerImpl->getStats()->addToDistribution(ServerStats::BandwidthInboundVolPerRequest, it->second->serviceName, serviceBytes);
//wcout << L"Stat 3 : " << statsClock.GetElapsedTime(false) << std::endl;
pServerImpl->getStats()->addToDistribution(ServerStats::PerformanceRequestVolPerRequest, it->second->serviceName, 1);
//wcout << L"Stat 4 : " << statsClock.GetElapsedTime(false) << std::endl;
//wcout << L"Dispatch Time : " << dispatchWatch.GetElapsedTime() << std::endl;
}
void Dispatcher::ProcessFirstPacket( CChannel* pChannel,unsigned int uCommand, IncomingPacket& packet,unsigned int serviceBytes )
{
OutgoingPacket* pOutPacket = NULL;
//
Client* pClient;
int iResult = pServerImpl->getClientFactory()->onFirstRequest(packet, pChannel->getContext(), pClient, pOutPacket);
//packet and lpContext are not good.
if (iResult==ClientFactory::RefuseAndClose)
{
pChannel->Close(false);
return;
}
//
if (iResult == ClientFactory::RefuseRequest)
{
if (pOutPacket)
{
pChannel->PushPacket(pOutPacket);
pServerImpl->getFacade()->disposeOutgoingPacket(pOutPacket);
}
return;
}
//Init the Impl :
pClient->pImpl->init();
//iResult >= CClientFactory::CreateClient
pClient->pImpl->setServerImpl(pServerImpl);
std::string clientKey = pClient->getKey();
unsigned int channelKey = pChannel->getKey();
pChannel->attachToClient(clientKey.c_str()); //status is attached.
pClient->pImpl->setChannel(channelKey);
pServerImpl->getStats()->addToCumul(ServerStats::VisitorsHitsIn, 1);
Client* pExistingClient = pServerImpl->getClientFactory()->pImpl->tryAddClient(pClient);
if (pExistingClient)
{
//Delete created instance.
pServerImpl->getClientFactory()->disposeClient(pClient);
//Close past channel.
unsigned int existingChannel = pExistingClient->pImpl->getChannel();
CChannel* pPastChannel = pServerImpl->getChannelFactory()->getObject(existingChannel);
if (pPastChannel)
{
pPastChannel->Close(false);
pServerImpl->getChannelFactory()->disposeObject(existingChannel);
}
//Attach client to new channel :
pExistingClient->pImpl->setChannel(channelKey);
//Release reference :
pServerImpl->getClientFactory()->returnClient(clientKey.c_str());
//Fire Reconnect event.
pServerImpl->getClientFactory()->onClientReconnected(clientKey.c_str());
return;
}
//Fire connect event.
pServerImpl->getClientFactory()->onClientConnected(clientKey.c_str());
//Statistics :
NotifyObserversClientIN(clientKey.c_str(), pChannel->getPeerIP(), pChannel->getPeerPort());
pServerImpl->getStats()->addToCumul(ServerStats::VisitorsOnline, 1);
if (iResult==ClientFactory::CreateAndRouteRequest)
{
dispatchRequest(uCommand, pClient->getKey(), packet, serviceBytes);
}
}
void Dispatcher::HandleMonitorRequest( CChannel* pChannel, IncomingPacket& packet )
{
MonitorRequestPacket& requestPacket = (MonitorRequestPacket&) packet;
std::string command = requestPacket.getArgumentAsText("command");
if (command == "disconnect")
{
pChannel->Close(false);
return;
}
if (command=="about")
{
std::stringstream ss;
ss << std::noskipws;
std::string str = pServerImpl->getServerInfos() + "\nBased on PushFramework version 1.0";
ss << "<root type=\"console\" value=\""<< str <<"\"/>";
MonitorResponsePacket response(ss.str());
pChannel->PushPacket(&response);
return;
}
if (command == "profiling enable")
{
if(pServerImpl->getProfilingStatus()==true)
{
std::stringstream ss;
ss << std::noskipws;
ss << "<root type=\"console\" value=\"Profiling is already enabled.\"/>";
MonitorResponsePacket response(ss.str());
pChannel->PushPacket(&response);
}
else
{
pServerImpl->enableProfiling(-1);
//
std::stringstream ss;
ss << std::noskipws;
ss << "<root type=\"console\" value=\"Profiling was enabled.\"/>";
MonitorResponsePacket response(ss.str());
pChannel->PushPacket(&response);
}
return;
}
if (command == "profiling disable")
{
if(pServerImpl->getProfilingStatus()==false)
{
std::stringstream ss;
ss << std::noskipws;
ss << "<root type=\"console\" value=\"Profiling is already disabled.\"/>";
MonitorResponsePacket response(ss.str());
pChannel->PushPacket(&response);
}
else
{
pServerImpl->disableProfiling();
//
std::stringstream ss;
ss << std::noskipws;
ss << "<root type=\"console\" value=\"Profiling was disabled.\"/>";
MonitorResponsePacket response(ss.str());
pChannel->PushPacket(&response);
}
return;
}
if (command == "profiling status")
{
std::stringstream ss;
ss << std::noskipws;
if(pServerImpl->getProfilingStatus()==false)
ss << "<root type=\"console\" value=\"Profiling was found to be disabled.\"/>";
else
ss << "<root type=\"console\" value=\"Profiling was found to be enabled.\"/>";
MonitorResponsePacket response(ss.str());
pChannel->PushPacket(&response);
}
char pOut[256];
bool bRet = pServerImpl->getFacade()->handleMonitorRequest(command.c_str(), pOut);
if (bRet)
{
//
std::stringstream ss;
ss << std::noskipws;
ss << "<root type=\"console\" value=\"" << pOut << L"\"/>";
MonitorResponsePacket response(ss.str());
pChannel->PushPacket(&response);
}
}
void Dispatcher::ProcessMonitorFirstPacket( CChannel* pChannel, IncomingPacket& packet )
{
MonitorRequestPacket& requestPacket = (MonitorRequestPacket&) packet;
//
std::string accessKey = requestPacket.getArgumentAsText("accessKey");
Version ver;
pServerImpl->getVersion(ver);
unsigned int monitorProtocol = ver.monitorProtocolVer;//requestPacket.getArgumentAsInt("protocol");
if (accessKey == pServerImpl->getMonitorPassword())
{
if (ver.monitorProtocolVer != monitorProtocol)
{
MonitorResponsePacket* pPacket = new MonitorResponsePacket("<root type=\"pref\"/>");
pChannel->PushPacket(pPacket);
pServerImpl->getFacade()->disposeOutgoingPacket(pPacket);
pChannel->Close(true);
}
else
{
pChannel->attachToClient("");
OutgoingPacket* pInitPacket = pServerImpl->getStats()->getInitializationPacket();
pChannel->PushPacket(pInitPacket);
pServerImpl->getFacade()->disposeOutgoingPacket(pInitPacket);
}
}
else{
//Reply
MonitorResponsePacket* pRefusePwdResponse = new MonitorResponsePacket("<root type=\"cref\"/>");
pChannel->PushPacket(pRefusePwdResponse);
pServerImpl->getFacade()->disposeOutgoingPacket(pRefusePwdResponse);
}
}
void Dispatcher::ProcessClientPendingPackets( CChannel* pChannel )
{
/*
OutgoingPacket* _pPacket = pServerImpl->getFacade()->getTestPacket();
pChannel->SendData(_pPacket);
delete _pPacket;
return;
*/
//Get subscriber key :
std::string subscriberKey;
if(pChannel->isObserverChannel())
subscriberKey = Utilities::stringBuilderA("%d", pChannel->getKey());
else
subscriberKey = pChannel->getClient();
// Get broadcast manager :
BroadcastManagerImplBase* pBroadcastMgr = NULL;
if (pChannel->isObserverChannel())
pBroadcastMgr = pMonitorsBroadcastManager;
else
pBroadcastMgr = pServerImpl->getBroadcastManager()->pImpl;
std::string broadcastGroup;
BroadcastChannel::PacketHANDLE hPacket;
OutgoingPacket* pPacket = pBroadcastMgr->getNextPacket(subscriberKey.c_str(), hPacket, broadcastGroup);
if(!pPacket)
return;
bool bSuccess = pChannel->PushPacket(pPacket);
pBroadcastMgr->disposePacket(hPacket, broadcastGroup, subscriberKey, bSuccess);
}
}