Click here to Skip to main content
15,886,085 members
Articles / Programming Languages / C++

Push Framework - A C++ toolkit for high performance server development

Rate me:
Please Sign up or sign in to vote.
4.96/5 (86 votes)
23 May 2012Apache15 min read 260K   26.9K   316  
Write asynchronous, multithreaded servers in a few lines of code. Monitor realtime activity with a deploy-only dashboard.
/********************************************************************
	File :			Dispatcher.cpp
	Creation date :	2010/6/27
		
	License :			Copyright 2010 Ahmed Charfeddine, http://www.pushframework.com

				   Licensed under the Apache License, Version 2.0 (the "License");
				   you may not use this file except in compliance with the License.
				   You may obtain a copy of the License at
				
					   http://www.apache.org/licenses/LICENSE-2.0
				
				   Unless required by applicable law or agreed to in writing, software
				   distributed under the License is distributed on an "AS IS" BASIS,
				   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
				   See the License for the specific language governing permissions and
				   limitations under the License.
	
	
*********************************************************************/
#include "StdAfx.h"
#include "Dispatcher.h"

#include "ScopedLock.h"
#include "Channel.h"
#include "ServerImpl.h"
#include "ServerStats.h"
#include "..\include\Protocol.h"
#include "..\include\OutgoingPacket.h"
#include "..\include\IncomingPacket.h"
#include "..\include\ClientFactory.h"
#include "..\include\Client.h"
#include "..\include\Service.h"
#include "..\include\BroadcastManager.h"
#include "..\include\Server.h"
#include "ChannelFactory.h"
#include "ClientImpl.h"
#include "ClientFactoryImpl.h"

#include "BroadcastManagerImpl.h"

#include "MonitorsBroadcastManager.h"

#include "BroadcastChannel.h"
#include "MonitorProtocol.h"
#include "MonitorRequestPacket.h"
#include "MonitorResponsePacket.h"

#include "StopWatch.h"
#include "Utilities.h"

namespace PushFramework{


Dispatcher::Dispatcher(ServerImpl* pServerImpl)
{
	pMonitorsBroadcastManager = new MonitorsBroadcastManager(pServerImpl->getChannelFactory());

	this->pServerImpl = pServerImpl;
	::InitializeCriticalSection(&csSrvMap);

	ZeroMemory(&m_QPFrequency, sizeof(m_QPFrequency));
	QueryPerformanceFrequency(&m_QPFrequency);

}

Dispatcher::~Dispatcher(void)
{
	::DeleteCriticalSection(&csSrvMap);

	delete pMonitorsBroadcastManager;
}

std::string Dispatcher::getServiceNames()
{
	std::stringstream ss;
	ss << std::noskipws;

	for (serviceMapT::iterator it = serviceMap.begin();
		it!=serviceMap.end();
		it++)
	{
		ss << "<request value=\"" << it->second->serviceName << "\"/>";
	}

	return ss.str();
}

void Dispatcher::setCurrentService( std::string serviceName )
{
	ScopedLock lock(csSrvMap);
	DWORD dwThread = ::GetCurrentThreadId();

	workerServiceMap[dwThread] = serviceName;
}

void Dispatcher::UnsetCurrentService()
{
	ScopedLock lock(csSrvMap);
	DWORD dwThread = ::GetCurrentThreadId();

	workerServiceMapT::iterator it = workerServiceMap.find(dwThread);
	if (it!=workerServiceMap.end())
		workerServiceMap.erase(it);
}

bool Dispatcher::getCurrentService( std::string& serviceName )
{
	ScopedLock lock(csSrvMap);
	DWORD dwThread = ::GetCurrentThreadId();

	workerServiceMapT::iterator it = workerServiceMap.find(dwThread);
	if (it==workerServiceMap.end())
		return false;

	serviceName = it->second;
	return true;
}


void Dispatcher::NotifyObserversClientIN( const char* key, std::string peerIP, unsigned int peerPort )
{
	std::string timestamp  = Utilities::getCurrentTime();

	// = "22:00::23";

	std::stringstream ss;

	ss << std::noskipws;

	//Write header :
	ss << "<root type=\"cin\">";
	ss << "<time value=\"" << timestamp << "\"/>";
	ss << "<name value=\"" << key << "\"/>";
	ss << "<ip value=\"" << peerIP << "\"/>";
	ss << "<port value=\"" << peerPort << "\"/>";
	ss << "</root>";


	std::string data = ss.str();

	OutgoingPacket* pPacket =  new MonitorResponsePacket(data);

	pMonitorsBroadcastManager->pushPacket(pPacket, "clientsIn", key, 0);


	
}
void Dispatcher::NotifyObserversClientOut( const char* key )
{
	// = "22:00::23";

	std::stringstream ss;

	ss << std::noskipws;

	//Write header :
	ss << "<root type=\"cout\">";
	ss << "<name value=\"" << key << "\"/>";
	ss << "</root>";

	std::string data = ss.str();
	OutgoingPacket* pPacket =  new MonitorResponsePacket(data);

	//Signal that client is out :
	pMonitorsBroadcastManager->pushPacket(pPacket, "clientsOut");

	//Remove client from the other broadcast group :
	pMonitorsBroadcastManager->removePacket(key, 0, "clientsIn");
}



void Dispatcher::OnWriteComplete( CChannel* pChannel, DWORD dwIoSize )
{
	pServerImpl->getChannelFactory()->getObject(pChannel->getKey());

	bool reportSentData  = !pChannel->isObserverChannel();

	if (reportSentData)
	{
		pServerImpl->getStats()->addToCumul(ServerStats::BandwidthOutbound, dwIoSize);
		pServerImpl->getStats()->addToKeyedDuration(ServerStats::BandwidthOutboundPerConnection, pChannel->getKey(), dwIoSize);
	}

	bool bIsBufferIdle;
	int status = pChannel->OnSendCompleted(dwIoSize, bIsBufferIdle);

	if (status == CChannel::Attached && bIsBufferIdle){
		ProcessClientPendingPackets(pChannel);
	}

	pServerImpl->getChannelFactory()->disposeObject(pChannel->getKey());
}

void Dispatcher::OnReceiveComplete( CChannel* pChannel,DWORD dwIoSize )
{
	//wcout << L"Cycle begin." << std::endl;
	//
	StopWatch watch(m_QPFrequency);
	
	pServerImpl->getChannelFactory()->getObject(pChannel->getKey());

	bool isClient = !pChannel->isObserverChannel();

	if (isClient)
	{
		pServerImpl->getStats()->addToCumul(ServerStats::BandwidthInbound, dwIoSize);
		pServerImpl->getStats()->addToKeyedDuration(ServerStats::BandwidthInboundPerConnection, pChannel->getKey(), dwIoSize);
	}
	
	//wcout << L"Retrieved channel : " << watch.GetElapsedTime(false) << std::endl;


	int status = pChannel->ReadReceivedBytes(dwIoSize);

	//wcout << L"Received bytes : " << watch.GetElapsedTime(false) << std::endl;


	if(status < CChannel::Connected)
	{
		pServerImpl->getChannelFactory()->disposeObject(pChannel->getKey());
		//PostReceive is not called.

		//wcout << L"Cycle aborted" << std::endl;
		return;
	}

	//Channel is either connected or attached.

	if (dwIoSize == 0)//Peer wants to close the connection.
	{
		pChannel->Close(false);
		if (status == CChannel::Attached)
		{
			if (isClient)
			{
				std::string clientKey = pChannel->getClient();
				pServerImpl->getClientFactory()->onClientDisconnected(clientKey.c_str());
			}
			else
			{
				//TODO notify observer is out.
			}			
		}
		pServerImpl->getChannelFactory()->disposeObject(pChannel->getKey());

		//wcout << L"Cycle aborted : " << std::endl;

		return;
	}


	//The Processing Loop.
	int uCommandID;
	IncomingPacket* pPacket;
	int iResult;
	unsigned int uExtractedBytes;
	Protocol* pProtocol = pChannel->getProtocol();
	DataBuffer& sourceBuffer = pChannel->GetReceiveBuffer();

	bool bProcessDataInQueue = true;
	while (bProcessDataInQueue)
	{
		//watch.GetElapsedTime(false);
		iResult = pProtocol->tryDeserializeIncomingPacket(sourceBuffer, pPacket, uCommandID, uExtractedBytes);
		//iResult = pProtocol->tryDeframeIncomingPacket(pChannel->GetReceiveBuffer().GetBuffer(), pChannel->GetReceiveBuffer().GetDataSize(),uCommandID, pPacket, uExtractedBytes);
		//wcout << L"Packet deframed : " << watch.GetElapsedTime(false) << std::endl;
		if (iResult == Protocol::Success)
		{
			pChannel->GetReceiveBuffer().Pop(uExtractedBytes);
			if (status == CChannel::Attached)
			{
				if(isClient)
					dispatchRequest(uCommandID, pChannel->getClient().c_str(), *pPacket, uExtractedBytes);
				else
					HandleMonitorRequest(pChannel, *pPacket);
			}
			else if(status == CChannel::Connected)
			{
				if(isClient)
					ProcessFirstPacket(pChannel, uCommandID, *pPacket,uExtractedBytes);
				else
					ProcessMonitorFirstPacket(pChannel, *pPacket);
			}
			else{
				//Status changed by another thread eg ::disconnect.
				bProcessDataInQueue = false;
			}
			pProtocol->disposeIncomingPacket(pPacket);
		}
		else if (iResult == Protocol::eDecodingFailure)
		{
			pChannel->GetReceiveBuffer().Pop(uExtractedBytes);
		}
		else
			break;
	}
	//
	if (iResult == Protocol::eIncompletePacket)
	{
		pChannel->PostReceive();
	}
	pServerImpl->getChannelFactory()->disposeObject(pChannel->getKey());


		/*
		double d = watch.GetElapsedTime();
				wcout << L"Cycle duration : " << d << std::endl << std::endl << std::endl;*/
		
		
}

void Dispatcher::OnInitializeReady( CChannel* pChannel )
{
	//hold a reference..
	pServerImpl->getChannelFactory()->getObject(pChannel->getKey());

	bool isClient = !pChannel->isObserverChannel();

	if (isClient)
	{
		//
		void* lpContext = NULL;
		OutgoingPacket* pPacket = NULL;
		pPacket = pServerImpl->getClientFactory()->onNewConnection(lpContext);
		if (pPacket)
		{
			pChannel->saveContext(lpContext);
			pChannel->PushPacket(pPacket);
			pServerImpl->getFacade()->disposeOutgoingPacket(pPacket);
		}
	}

	
	//
	pChannel->PostReceive();
	//
	pServerImpl->getChannelFactory()->disposeObject(pChannel->getKey());
}





void Dispatcher::OnStartGC()
{
	pServerImpl->getChannelFactory()->closeNonLogged();
}

void Dispatcher::OnStartProfiling()
{
	OutgoingPacket* pPacket = pServerImpl->getStats()->getPerformancePacket();

	//push to broadcast group
	pMonitorsBroadcastManager->pushPacket(pPacket, "stats");

	/*
	pServerImpl->getChannelFactory()->BroadcastToMonitors(pPacket);
		pServerImpl->getFacade()->disposeOutgoingPacket(pPacket);*/
	
}

void Dispatcher::registerService( unsigned int uCommmand, Service* pService, std::string serviceName )
{
	ServiceInfo* pServiceInfo = new ServiceInfo;

	pServiceInfo->pService = pService;
	pServiceInfo->serviceName = serviceName;

	serviceMap[uCommmand] = pServiceInfo;
}

void Dispatcher::dispatchRequest( unsigned int uCommand,const char* uClient,IncomingPacket& packet,unsigned int serviceBytes )
{
	//StopWatch dispatchWatch(m_QPFrequency);

	serviceMapT::iterator it = serviceMap.find(uCommand);
	if(it == serviceMap.end())
		return;
	//		
	Service* pHandler = it->second->pService;

	//wcout << L"Locating Service : " << dispatchWatch.GetElapsedTime(false) << std::endl;

	//Mark dispatched service :

	setCurrentService(it->second->serviceName);

	StopWatch watch(m_QPFrequency);
	pHandler->handle(uClient, &packet);

	
	double duration = watch.GetElapsedTime();
/*	wcout << L"Service Time : " << watch.GetElapsedTime() << std::endl;
*/


	//StopWatch statsClock(m_QPFrequency);
	pServerImpl->getStats()->addToDistribution(ServerStats::PerformanceProcessingTimePerService, it->second->serviceName, duration);
	//wcout << L"Stat 1 : " << statsClock.GetElapsedTime(false) << std::endl;

	pServerImpl->getStats()->addToDuration(ServerStats::PerformanceProcessingTime, duration);
	//wcout << L"Stat 2 : " << statsClock.GetElapsedTime(false) << std::endl;

	UnsetCurrentService();

	//Stats. :
	
	pServerImpl->getStats()->addToDistribution(ServerStats::BandwidthInboundVolPerRequest, it->second->serviceName, serviceBytes);
	//wcout << L"Stat 3 : " << statsClock.GetElapsedTime(false) << std::endl;
	

	pServerImpl->getStats()->addToDistribution(ServerStats::PerformanceRequestVolPerRequest, it->second->serviceName, 1);
	//wcout << L"Stat 4 : " << statsClock.GetElapsedTime(false) << std::endl;


	//wcout << L"Dispatch Time : " << dispatchWatch.GetElapsedTime() << std::endl;
}

void Dispatcher::ProcessFirstPacket( CChannel* pChannel,unsigned int uCommand, IncomingPacket& packet,unsigned int serviceBytes )
{
	OutgoingPacket* pOutPacket = NULL;
	//
	Client* pClient;
	int iResult = pServerImpl->getClientFactory()->onFirstRequest(packet, pChannel->getContext(), pClient, pOutPacket);

	//packet and lpContext are not good.
	if (iResult==ClientFactory::RefuseAndClose)
	{
		pChannel->Close(false);
		return;
	}
	//
	if (iResult == ClientFactory::RefuseRequest)
	{
		if (pOutPacket)
		{
			pChannel->PushPacket(pOutPacket);
			pServerImpl->getFacade()->disposeOutgoingPacket(pOutPacket);
		}
		return;
	}

	//Init the Impl :
	pClient->pImpl->init();

	//iResult >= CClientFactory::CreateClient

	pClient->pImpl->setServerImpl(pServerImpl);

	std::string clientKey = pClient->getKey();
	unsigned int channelKey = pChannel->getKey();

	pChannel->attachToClient(clientKey.c_str()); //status is attached.
	pClient->pImpl->setChannel(channelKey);

	pServerImpl->getStats()->addToCumul(ServerStats::VisitorsHitsIn, 1);


	Client* pExistingClient = pServerImpl->getClientFactory()->pImpl->tryAddClient(pClient);
	if (pExistingClient)
	{
		//Delete created instance.
		pServerImpl->getClientFactory()->disposeClient(pClient);

		//Close past channel.
		unsigned int existingChannel = pExistingClient->pImpl->getChannel();
		CChannel* pPastChannel = pServerImpl->getChannelFactory()->getObject(existingChannel);
		if (pPastChannel)
		{
			pPastChannel->Close(false);
			pServerImpl->getChannelFactory()->disposeObject(existingChannel);
		}

		//Attach client to new channel :
		pExistingClient->pImpl->setChannel(channelKey);

		//Release reference :
		pServerImpl->getClientFactory()->returnClient(clientKey.c_str());

		//Fire Reconnect event.
		pServerImpl->getClientFactory()->onClientReconnected(clientKey.c_str());
		return;
	}

	//Fire connect event.
	pServerImpl->getClientFactory()->onClientConnected(clientKey.c_str());

	//Statistics :
	NotifyObserversClientIN(clientKey.c_str(), pChannel->getPeerIP(), pChannel->getPeerPort());
	pServerImpl->getStats()->addToCumul(ServerStats::VisitorsOnline, 1);


	if (iResult==ClientFactory::CreateAndRouteRequest)
	{
		dispatchRequest(uCommand, pClient->getKey(), packet, serviceBytes);
	}
}

void Dispatcher::HandleMonitorRequest( CChannel* pChannel, IncomingPacket& packet )
{
	MonitorRequestPacket& requestPacket = (MonitorRequestPacket&) packet;

	std::string command = requestPacket.getArgumentAsText("command");

	if (command == "disconnect")
	{
		pChannel->Close(false);
		return;
	}
	if (command=="about")
	{
		std::stringstream ss;
		ss << std::noskipws;

		std::string str = pServerImpl->getServerInfos() + "\nBased on PushFramework version 1.0";

		ss << "<root type=\"console\" value=\""<< str <<"\"/>";
		MonitorResponsePacket response(ss.str());
		pChannel->PushPacket(&response);
		return;
	}
	if (command == "profiling enable")
	{
		if(pServerImpl->getProfilingStatus()==true)
		{
			std::stringstream ss;
			ss << std::noskipws;
			ss << "<root type=\"console\" value=\"Profiling is already enabled.\"/>";
			MonitorResponsePacket response(ss.str());
			pChannel->PushPacket(&response);
		}
		else
		{
			pServerImpl->enableProfiling(-1);
			//
			std::stringstream ss;
			ss << std::noskipws;
			ss << "<root type=\"console\" value=\"Profiling was enabled.\"/>";
			MonitorResponsePacket response(ss.str());
			pChannel->PushPacket(&response);
		}
		return;
	}
	if (command == "profiling disable")
	{
		if(pServerImpl->getProfilingStatus()==false)
		{
			std::stringstream ss;
			ss << std::noskipws;
			ss << "<root type=\"console\" value=\"Profiling is already disabled.\"/>";
			MonitorResponsePacket response(ss.str());
			pChannel->PushPacket(&response);
		}
		else
		{
			pServerImpl->disableProfiling();
			//
			std::stringstream ss;
			ss << std::noskipws;
			ss << "<root type=\"console\" value=\"Profiling was disabled.\"/>";
			MonitorResponsePacket response(ss.str());
			pChannel->PushPacket(&response);

		}
		return;
	}
	if (command == "profiling status")
	{
		std::stringstream ss;
		ss << std::noskipws;
		if(pServerImpl->getProfilingStatus()==false)
			ss << "<root type=\"console\" value=\"Profiling was found to be disabled.\"/>";
		else
			ss << "<root type=\"console\" value=\"Profiling was found to be enabled.\"/>";
		MonitorResponsePacket response(ss.str());
		pChannel->PushPacket(&response);
	}

	char pOut[256];

	bool bRet = pServerImpl->getFacade()->handleMonitorRequest(command.c_str(), pOut);
	
	if (bRet)
	{
		//
		std::stringstream ss;
		ss << std::noskipws;
		ss << "<root type=\"console\" value=\"" << pOut << L"\"/>";
		MonitorResponsePacket response(ss.str());
		pChannel->PushPacket(&response);
	}

}

void Dispatcher::ProcessMonitorFirstPacket( CChannel* pChannel, IncomingPacket& packet )
{
	MonitorRequestPacket& requestPacket = (MonitorRequestPacket&) packet;

	//
	std::string accessKey = requestPacket.getArgumentAsText("accessKey");
	

	Version ver;
	pServerImpl->getVersion(ver);

	unsigned int monitorProtocol = ver.monitorProtocolVer;//requestPacket.getArgumentAsInt("protocol");

	if (accessKey == pServerImpl->getMonitorPassword())
	{
		if (ver.monitorProtocolVer != monitorProtocol)
		{
			MonitorResponsePacket* pPacket = new MonitorResponsePacket("<root type=\"pref\"/>");
			pChannel->PushPacket(pPacket);
			pServerImpl->getFacade()->disposeOutgoingPacket(pPacket);
			pChannel->Close(true);
		}
		else
		{
			pChannel->attachToClient("");

			OutgoingPacket* pInitPacket = pServerImpl->getStats()->getInitializationPacket();
			pChannel->PushPacket(pInitPacket);
			pServerImpl->getFacade()->disposeOutgoingPacket(pInitPacket);
		}

	}
	else{
		//Reply 
		MonitorResponsePacket* pRefusePwdResponse = new MonitorResponsePacket("<root type=\"cref\"/>");
		pChannel->PushPacket(pRefusePwdResponse);
		pServerImpl->getFacade()->disposeOutgoingPacket(pRefusePwdResponse);
	}
}


void Dispatcher::ProcessClientPendingPackets( CChannel* pChannel )
{
	/*
OutgoingPacket* _pPacket = pServerImpl->getFacade()->getTestPacket();
	pChannel->SendData(_pPacket);
	delete _pPacket;


	return;
*/
	
	//Get subscriber key :
	std::string subscriberKey;
	if(pChannel->isObserverChannel())
		subscriberKey = Utilities::stringBuilderA("%d", pChannel->getKey());
	else
		subscriberKey = pChannel->getClient();


	// Get broadcast manager :
	BroadcastManagerImplBase* pBroadcastMgr = NULL;
	if (pChannel->isObserverChannel())
		pBroadcastMgr  = pMonitorsBroadcastManager;
	else
		pBroadcastMgr  = pServerImpl->getBroadcastManager()->pImpl;


	std::string broadcastGroup;
	BroadcastChannel::PacketHANDLE hPacket;

	OutgoingPacket* pPacket = pBroadcastMgr->getNextPacket(subscriberKey.c_str(), hPacket, broadcastGroup);
	if(!pPacket)
		return;

	bool bSuccess = pChannel->PushPacket(pPacket);
	pBroadcastMgr->disposePacket(hPacket, broadcastGroup, subscriberKey, bSuccess);	
}





}

By viewing downloads associated with this article you agree to the Terms of Service and the article's licence.

If a file you wish to view isn't highlighted, and is a text file (not binary), please let us know and we'll add colourisation support for it.

License

This article, along with any associated source code and files, is licensed under The Apache License, Version 2.0


Written By
Technical Lead
Tunisia Tunisia
Services:
http://www.pushframework.com/?page_id=890

Comments and Discussions