Click here to Skip to main content
15,895,808 members
Articles / Programming Languages / C++

Push Framework - A C++ toolkit for high performance server development

Rate me:
Please Sign up or sign in to vote.
4.96/5 (86 votes)
23 May 2012 | Apache | 15 min read | 262.5K | 26.9K | 316
Write asynchronous, multithreaded servers in a few lines of code. Monitor realtime activity with a deploy-only dashboard.
#include "StdAfx.h"
#include "TCPSocketImpl.h"

#include <process.h>


#include "TCPSocket.h"
#include "TCPSocketEvents.h"
#include "ResponseHandler.h"

#include "ScopedLock.h"


// Capacities (in bytes) handed to the receive and send buffers.
// The expressions are parenthesized so that the macros expand correctly
// inside larger expressions (e.g. `x / nReceiveBufferSize`).
#define nReceiveBufferSize (1024 * 8)/*8192*/
#define nSendBufferSize (1024 * 8)/*8192*/


#include <stdio.h>
#include <tchar.h>

#include <conio.h>
#include <ctype.h>
#include <mmsystem.h>




// Builds an implementation object bound to its facade, in the Disconnected
// state. The send and receive buffers are sized from nSendBufferSize and
// nReceiveBufferSize.
//
// Every handle/pointer member that other methods of this class read is given
// a defined value here, so that calling e.g. disconnect() or stopServer()
// before connect()/listen()/setProtocol() never operates on garbage.
TCPSocketImpl::TCPSocketImpl(TCPSocket* pFacade)
:oBuffer(nSendBufferSize), inBuffer(nReceiveBufferSize)
{

	this->pFacade = pFacade;

	// No protocol until setProtocol() is called.
	pProtocol = NULL;

	status = TCPSocket::Disconnected;
	uPort = 0;

	hSocket = NULL;
	hSocketEvent = NULL;

	// Listening-side resources are only created by SetupListeningSocket().
	hServerSocket = INVALID_SOCKET;
	hServerSocketEvent = NULL;

	// Created by connect()/listen() together with the worker thread.
	hKillEvent = NULL;
	hThread = NULL;

	bSendInProgress = false;

	::InitializeCriticalSection(&cs);
}

// Destructor: forces an immediate disconnect (without waiting for pending
// output) unless the status is already Disconnected, then releases the
// critical section created by the constructor.
TCPSocketImpl::~TCPSocketImpl()
{
	const bool bNeedsDisconnect = (status != TCPSocket::Disconnected);
	if (bNeedsDisconnect)
		disconnect(false);

	::DeleteCriticalSection(&cs);
}


bool TCPSocketImpl::connect( const char* hostAddress, unsigned int uPort )
{
	ScopedLock lock(cs);

	if(status != TCPSocket::Disconnected)
		return true;

	this->strIP = hostAddress;
	this->uPort = uPort;


	hSocket = socket(AF_INET, SOCK_STREAM, IPPROTO_TCP);

	if (hSocket == INVALID_SOCKET)
	{
		/*wcout << L"Could not create  socket " << WSAGetLastError();*/
		return false;
	}

	SOCKADDR_IN		saServer;		
	memset(&saServer,0,sizeof(saServer));

	saServer.sin_family = AF_INET;
	saServer.sin_addr.s_addr = inet_addr(strIP.c_str());
	saServer.sin_port = htons(uPort);

	int nRet = ::connect(hSocket,(sockaddr*)&saServer, sizeof(saServer));

	if (nRet == SOCKET_ERROR &&
		WSAGetLastError() != WSAEWOULDBLOCK)
	{
		int iError = WSAGetLastError();

		closesocket(hSocket);
		return false;
	}

	bool m_bInProgress = true;

	hSocketEvent = WSACreateEvent();
	if (hSocketEvent == WSA_INVALID_EVENT)
	{
		closesocket(hSocket);
		return FALSE;
	}

	nRet = WSAEventSelect(hSocket,
		hSocketEvent,
		FD_CONNECT|FD_CLOSE|FD_READ|FD_WRITE);

	if (nRet == SOCKET_ERROR)
	{
		closesocket(hSocket);
		return false;
	}

	status = TCPSocket::Connecting;

	hKillEvent = CreateEvent(NULL, TRUE, FALSE, NULL);
	UINT m_dwThreadId;
	hThread = (HANDLE)_beginthreadex(NULL, 0, threadProc, (void*) this, 0, &m_dwThreadId);

	return true;
}



// Thread entry point started by connect(). lParam carries the TCPSocketImpl
// instance; the thread runs that instance's client event loop and exits
// with code 0 once the loop returns.
unsigned __stdcall TCPSocketImpl::threadProc( LPVOID lParam )
{
	TCPSocketImpl* const pSelf = static_cast<TCPSocketImpl*>(lParam);
	pSelf->doClientLoop();
	return 0;
}


bool TCPSocketImpl::OnConnect()
{
	ScopedLock lock(cs);

	status = TCPSocket::Connected;

	if (pFacade->isRelayTCPEvents())
	{
		ConnectionStatusEvent* pEvent = new ConnectionStatusEvent;
		pEvent->type = ConnectionStatusEvent::ConnectionProgress;
		pEvent->bStatus = true;
		pFacade->PostQueuedConnectionStatusEvent(pEvent);
	}
	else
		pFacade->onConnected(true);


	return true;
}

bool TCPSocketImpl::OnClose()
{
	status = TCPSocket::Disconnected;

	if (pFacade->isRelayTCPEvents())
	{
		ConnectionStatusEvent* pEvent = new ConnectionStatusEvent;
		pEvent->type = ConnectionStatusEvent::ConnectionProgress;
		pEvent->bStatus = false;
		pFacade->PostQueuedConnectionStatusEvent(pEvent);
	}
	else
		pFacade->onConnected(false);
	return false;
}

// Handles FD_READ: pulls the available bytes from the socket into inBuffer
// and then extracts as many complete packets as the protocol can decode.
//
// Each decoded packet is either queued to the facade as a ReceivedDataEvent
// (when the facade relays TCP events) or dispatched to its registered
// handler and disposed of immediately.
//
// Returns true to keep the event loop running and false to stop it:
//  - false when no bytes are pending (reported to the facade as a closed
//    connection), or when decoding ends with anything other than
//    eIncompletePacket (the socket is closed in that case);
//  - true otherwise, including when ioctlsocket/recv fail.
bool TCPSocketImpl::OnRead()
{
	DWORD dwSize = 0;
	int nRet = ioctlsocket(hSocket, FIONREAD, &dwSize);

	if (nRet == -1)
		return true;

	if (dwSize == 0){

		if (pFacade->isRelayTCPEvents())
		{
			ConnectionStatusEvent* pEvent = new ConnectionStatusEvent;
			pEvent->type = ConnectionStatusEvent::ConnectionStatus;
			pEvent->bStatus = true;
			pFacade->PostQueuedConnectionStatusEvent(pEvent);
		}
		else
			pFacade->onConnectionClosed(true);

		return false;
	}


	// Never read more than the receive buffer can still hold.
	int nBytesToBeRead = min(dwSize, inBuffer.getRemainingSize());

	BYTE* pData = new BYTE[nBytesToBeRead];

	int nRead = recv(hSocket, (char*) pData, nBytesToBeRead, 0);
	if (nRead == -1)
	{
		// Release the staging buffer on the error path as well;
		// it used to be leaked here.
		delete [] pData;
		return true;
	}


	inBuffer.Append((char*)pData, nRead);

	delete [] pData;


	// Now try to depacketize:
	int uCommmandID;
	PushFramework::IncomingPacket* pPacket = NULL;
	int iResult;
	unsigned int uExtractedBytes;

	while (true)
	{
		iResult = pProtocol->tryDeserializeIncomingPacket(inBuffer, pPacket, uCommmandID, uExtractedBytes);
		if (iResult == PushFramework::Protocol::Success)
		{
			inBuffer.Pop(uExtractedBytes);

			if (pFacade->isRelayTCPEvents())
			{
				// The packet is handed over with the event; it is not disposed here.
				ReceivedDataEvent* pEvent = new ReceivedDataEvent;
				pEvent->commandId = uCommmandID;
				pEvent->pPacket = pPacket;
				pFacade->PostQueuedDataReceivedEvent(pEvent);
			}
			else{
				dispatchResponse(uCommmandID, *pPacket);
				pProtocol->disposeIncomingPacket(pPacket);
			}
		}
		else if (iResult == PushFramework::Protocol::eDecodingFailure)
		{
			// Skip the bytes of the undecodable packet and keep going.
			inBuffer.Pop(uExtractedBytes);
		}
		else
			break;
	}
	if (iResult != PushFramework::Protocol::eIncompletePacket)
	{
		closesocket(hSocket);
		return false;
	}
	return true;
}

void TCPSocketImpl::registerHandler( unsigned int requestId, ResponseHandler* pHandler )
{
	handlerMap[requestId] =  pHandler;
}

void TCPSocketImpl::dispatchResponse( unsigned int requestId, PushFramework::IncomingPacket& packet )
{
	handlerMapT::iterator it = handlerMap.find(requestId);
	if (it==handlerMap.end())
		return;

	ResponseHandler* pHandler = it->second;
	pHandler->handleResponse(packet);
}

// Stores the protocol object that OnRead() and sendRequest() use to
// deserialize incoming and serialize outgoing packets. Only the pointer is
// kept; no copy is made.
void TCPSocketImpl::setProtocol( PushFramework::Protocol* pProtocol )
{
	TCPSocketImpl::pProtocol = pProtocol;
}

// Serializes pPacket into the outgoing buffer through the protocol and then
// tries to push the buffered bytes onto the socket.
// Returns false when serialization does not report Success; otherwise
// returns whatever WriteBytes() returns.
bool TCPSocketImpl::sendRequest( PushFramework::OutgoingPacket* pPacket )
{
	unsigned int uWrittenBytes = 0;
	const int nSerializeResult = pProtocol->serializeOutgoingPacket(*pPacket, oBuffer, uWrittenBytes);

	if (nSerializeResult == PushFramework::Protocol::Success)
		return WriteBytes();

	return false;
}

bool TCPSocketImpl::WriteBytes()
{
	if (oBuffer.GetDataSize()==0){
		return true;
	}

	unsigned int uSize = min(4096, oBuffer.GetDataSize());

	int dwBytes = send(hSocket, oBuffer.GetBuffer(), uSize,0);

	if (dwBytes == SOCKET_ERROR)
	{
		if (GetLastError() == WSAEWOULDBLOCK)
		{
			bSendInProgress = true;
			return true;
		}
		else
		{
			closesocket(hSocket);
			return false;
		}
	}
	else
	{
		oBuffer.Pop(uSize);
		return WriteBytes();
	}
}

// Handles FD_WRITE. Clears the send-in-progress flag; if bytes are still
// buffered, resumes sending them and returns the result of WriteBytes().
// With an empty buffer the facade is told the socket is ready to send and
// true is returned.
bool TCPSocketImpl::OnWrite()
{
	bSendInProgress = false;

	if (oBuffer.GetDataSize() != 0)
		return WriteBytes();

	pFacade->OnReadyToSend();
	return true;
}

// Returns the current connection status value as an int.
int TCPSocketImpl::getStatus()
{
	const int nCurrentStatus = status;
	return nCurrentStatus;
}

// Closes the connection.
//
// waitForSend : when true and the outgoing buffer still holds data, the
//               status is only switched to WaitingToClose and the function
//               returns without closing anything.
//
// Otherwise the socket is closed, the kill event is signalled so the worker
// thread leaves its event loop, and the status becomes Disconnected. The
// worker thread is not waited for here.
void TCPSocketImpl::disconnect( bool waitForSend )
{
	if (waitForSend)
	{
		if (oBuffer.GetDataSize() != 0)
		{
			status = TCPSocket::WaitingToClose;
			return;
		}
	}

	closesocket(hSocket);

	// Ask the processing thread to stop.
	::SetEvent(hKillEvent);

	status = TCPSocket::Disconnected;
}


void TCPSocketImpl::doClientLoop(bool bServerMode/* = false*/)
{

	//Periodic Timer
	HANDLE m_hEventTimer = CreateEvent(NULL, TRUE, FALSE, NULL);


	HANDLE			hWaits[3];
	hWaits[0]		= hKillEvent;
	hWaits[1]		= hSocketEvent;
	hWaits[2]		= m_hEventTimer;


	DWORD dwRet = WAIT_OBJECT_0 + 2;
	ResetEvent(m_hEventTimer);

	WSANETWORKEVENTS events;

	while (true)
	{

		if (!bServerMode)
		{
			if(dwRet == (WAIT_OBJECT_0 + 2)){
				timeSetEvent(100, 1, (LPTIMECALLBACK) m_hEventTimer, 0, TIME_ONESHOT|TIME_CALLBACK_EVENT_SET);
			}
		}			

		dwRet = WSAWaitForMultipleEvents(bServerMode ? 2 : 3, hWaits,	FALSE, INFINITE, FALSE);

		if (dwRet == WAIT_OBJECT_0)	{
			break;
		}

		if (dwRet == (WAIT_OBJECT_0 + 2))
		{
			ResetEvent(m_hEventTimer);
			pFacade->onPerformAutomatedJob();
			continue;
		}

		// Figure out what happened
		int nRet = WSAEnumNetworkEvents(hSocket, hSocketEvent, &events);

		if (nRet == SOCKET_ERROR)
		{
			//str.Format("WSAEnumNetworkEvents error %ld\n",WSAGetLastError());
			break;
		}

		//Dispatch event :
		if(events.lNetworkEvents & FD_CONNECT)
		{
			if(!OnConnect())
				break;
		}

		if(events.lNetworkEvents & FD_READ)
		{
			if(!OnRead())
				break;
		}

		if(events.lNetworkEvents & FD_CLOSE)
		{
			if(!OnClose())
				break;
		}


		if(events.lNetworkEvents & FD_WRITE)
		{
			if(!OnWrite())
				break;
		}		
	}


	CloseHandle(m_hEventTimer);
	CloseHandle(hKillEvent);
	
	WSACloseEvent(hSocketEvent);

	status = TCPSocket::Disconnected;
}


//////////////////////////////////////////////////////////////////////////


bool TCPSocketImpl::SetupListeningSocket( const char* interfaceAddress, int uPort )
{
	//Create socket : 
	hServerSocket = WSASocket(AF_INET, SOCK_STREAM, 0, NULL, 0, WSA_FLAG_OVERLAPPED);
	if (hServerSocket == INVALID_SOCKET)
	{
		return false;
	}


	//Bind socket to interface address  and port: 
	SOCKADDR_IN		saAddress;		
	saAddress.sin_port = htons(uPort);
	saAddress.sin_family = AF_INET;


	//Check with interface address : 
	if(interfaceAddress)
	{
		u_long nInterfaceAddr = inet_addr(interfaceAddress);
		if (nInterfaceAddr != INADDR_NONE){
			closesocket(hServerSocket);
			return false;
		}
		saAddress.sin_addr.s_addr = nInterfaceAddr;
	}
	else
		saAddress.sin_addr.s_addr = INADDR_ANY;


	int nRet = bind(hServerSocket, (LPSOCKADDR)&saAddress, sizeof(struct sockaddr));
	if (nRet == SOCKET_ERROR)
	{
		closesocket(hServerSocket);
		return false;
	}



	//Create socket events : 
	hServerSocketEvent = WSACreateEvent();
	if (hServerSocketEvent == WSA_INVALID_EVENT)
	{
		closesocket(hServerSocket);
		return false;
	}

	//Interested in accept event only :
	nRet = WSAEventSelect(hServerSocket, hServerSocketEvent,	FD_ACCEPT);
	if (nRet == SOCKET_ERROR)
	{
		WSACloseEvent(hServerSocketEvent);
		closesocket(hServerSocket);
		return false;
	}

	//Start Listen : 	
	nRet = ::listen(hServerSocket, 1 /*listen backlog*/);
	if (nRet == SOCKET_ERROR)
	{
		WSACloseEvent(hServerSocketEvent);
		closesocket(hServerSocket);
		return false;
	}
	return true;
}

bool TCPSocketImpl::listen( int uListenPort, const char* interfaceAddress /*= NULL*/ )
{
	if(!SetupListeningSocket(interfaceAddress, uListenPort))
		return false;

	//this->strIP = interfaceAddress;
	this->uPort = uListenPort;

	hKillEvent	= CreateEvent(NULL, TRUE, FALSE, NULL);

	hThread = (HANDLE)_beginthreadex(NULL, 0, threadProcServer, (void*) this, 0, &m_dwThreadId);

	return true;
}

void TCPSocketImpl::doServerLoop()
{
	HANDLE			hWaits[2];
	hWaits[0]		= hKillEvent;
	hWaits[1]		= hServerSocketEvent;


	WSANETWORKEVENTS events;
	while (true)
	{
		DWORD dwRet = WSAWaitForMultipleEvents(2, hWaits, FALSE, INFINITE, FALSE);

		if (dwRet == WAIT_OBJECT_0)
		{
			break; //Kill signal
		}

		int nRet = WSAEnumNetworkEvents(hServerSocket, hServerSocketEvent, &events);
		if (nRet == SOCKET_ERROR)
		{
			//Error
			break;
		}

		if (events.lNetworkEvents & FD_ACCEPT   && events.iErrorCode[FD_ACCEPT_BIT] == 0)
		{
			SOCKADDR_IN	SockAddr;
			int			nLen = sizeof(SOCKADDR_IN);
			SOCKET		clientSocket;

			clientSocket = accept(hServerSocket, (LPSOCKADDR)&SockAddr, &nLen); 
			if (clientSocket == SOCKET_ERROR &&  (WSAGetLastError()!= WSAEWOULDBLOCK) )
			{
				cout << "error with accept : " << WSAGetLastError() << std::endl;
				break;
			}
			const char chOpt = 1;
			int nErr = setsockopt(clientSocket, IPPROTO_TCP, TCP_NODELAY, &chOpt, sizeof(char));
			if (nErr == -1)
			{
				closesocket(clientSocket);
				break;
			}

			handleNewClient(clientSocket);
		}
		else
			break;
	}
}

// Thread entry point started by listen(). lParam carries the TCPSocketImpl
// instance; the thread runs that instance's accept loop and exits with
// code 0 once the loop returns.
unsigned __stdcall TCPSocketImpl::threadProcServer( LPVOID lParam )
{
	TCPSocketImpl* const pSelf = static_cast<TCPSocketImpl*>(lParam);
	pSelf->doServerLoop();
	return 0;
}

// Services a freshly accepted client connection on the calling thread.
//
// Adopts clientSocket as the object's socket, registers it for
// FD_CLOSE/FD_READ/FD_WRITE notifications, marks the status Connected and
// runs the client event loop in server mode until that loop ends.
//
// If the event object cannot be created or registered, the socket is closed
// and the function returns without running the loop. The event object is now
// released as well when the registration fails; it used to be leaked.
void TCPSocketImpl::handleNewClient( SOCKET clientSocket )
{
	cout << "handling new client connection" << std::endl;
	this->hSocket = clientSocket;

	//Prepare socket events :
	hSocketEvent = WSACreateEvent();
	if (hSocketEvent == WSA_INVALID_EVENT)
	{
		closesocket(hSocket);
		return;
	}

	int nRet = WSAEventSelect(hSocket, hSocketEvent, FD_CLOSE|FD_READ|FD_WRITE);

	if (nRet == SOCKET_ERROR)
	{
		WSACloseEvent(hSocketEvent);
		closesocket(hSocket);
		return;
	}

	status = TCPSocket::Connected;

	//Handle the events
	doClientLoop(true);

	cout << "finished handling new client connection" << std::endl;
}

void TCPSocketImpl::stopServer()
{
	disconnect(false);

	closesocket(hServerSocket);	
	WSACloseEvent(hServerSocketEvent);
}

By viewing downloads associated with this article you agree to the Terms of Service and the article's licence.

If a file you wish to view isn't highlighted, and is a text file (not binary), please let us know and we'll add colourisation support for it.

License

This article, along with any associated source code and files, is licensed under The Apache License, Version 2.0


Written By
Technical Lead
Tunisia Tunisia
Services:
http://www.pushframework.com/?page_id=890

Comments and Discussions