Click here to Skip to main content
11,637,002 members (67,655 online)
Click here to Skip to main content
Add your own
alternative version

Push Framework - A C++ toolkit for high performance server development

, 23 May 2012 Apache 143.1K 22.3K 293
Write asynchronous, multithreaded servers in a few lines of code. Monitor realtime activity with a deploy-only dashboard.
ChatApplication.zip
ChatAPI
ChatClient
res
.svn
entries
prop-base
ChatClient.ico.svn-base
props
text-base
ChatClient.ico.svn-base
ChatClient.rc2.svn-base
tmp
prop-base
props
text-base
ChatClient.ico
ChatPackets
ChatProtocol
ChatServer
ChatServer.vcproj.INTERNAL.Ahmed.Charfeddine.user
output
ChatServer.ini
TCPSocket
TCPSocket.zip
XMLProtocol
XMLProtocol.zip
ChatRobots.zip
ChatRobots
ChatRobots.ini
ProtoBufExample.zip
ProtoBufExampleClient
ProtoBufExampleProtocol
requests.pb.cc
responses.pb.cc
ProtoBufExampleServer
PushFramework_Essentials.zip
include
PushFramework.dll
PushFramework.lib
PushFramework_Sources.zip
private
QoS.zip
QoSExampleClient
QoSExampleProtocol
QoSExampleServer
#include "StdAfx.h"
#include "TCPSocketImpl.h"

#include <process.h>


#include "TCPSocket.h"
#include "TCPSocketEvents.h"
#include "ResponseHandler.h"

#include "ScopedLock.h"


#define nReceiveBufferSize 1024 * 8/*8192*/
#define nSendBufferSize 1024 * 8/*8192*/


#include <stdio.h>
#include <tchar.h>

#include <conio.h>
#include <ctype.h>
#include <mmsystem.h>




TCPSocketImpl::TCPSocketImpl(TCPSocket* pFacade)
:oBuffer(nSendBufferSize), inBuffer(nReceiveBufferSize)
{

	this->pFacade = pFacade;

	status = TCPSocket::Disconnected;

	hSocket = NULL;
	hSocketEvent = NULL;

	hThread = NULL;

	bSendInProgress = false;

	::InitializeCriticalSection(&cs);
}

TCPSocketImpl::~TCPSocketImpl()
{
	//Stop processing thread :
	if (status != TCPSocket::Disconnected)
	{
		disconnect(false);
	}	
	::DeleteCriticalSection(&cs);	
}


bool TCPSocketImpl::connect( const char* hostAddress, unsigned int uPort )
{
	ScopedLock lock(cs);

	if(status != TCPSocket::Disconnected)
		return true;

	this->strIP = hostAddress;
	this->uPort = uPort;


	hSocket = socket(AF_INET, SOCK_STREAM, IPPROTO_TCP);

	if (hSocket == INVALID_SOCKET)
	{
		/*wcout << L"Could not create  socket " << WSAGetLastError();*/
		return false;
	}

	SOCKADDR_IN		saServer;		
	memset(&saServer,0,sizeof(saServer));

	saServer.sin_family = AF_INET;
	saServer.sin_addr.s_addr = inet_addr(strIP.c_str());
	saServer.sin_port = htons(uPort);

	int nRet = ::connect(hSocket,(sockaddr*)&saServer, sizeof(saServer));

	if (nRet == SOCKET_ERROR &&
		WSAGetLastError() != WSAEWOULDBLOCK)
	{
		int iError = WSAGetLastError();

		closesocket(hSocket);
		return false;
	}

	bool m_bInProgress = true;

	hSocketEvent = WSACreateEvent();
	if (hSocketEvent == WSA_INVALID_EVENT)
	{
		closesocket(hSocket);
		return FALSE;
	}

	nRet = WSAEventSelect(hSocket,
		hSocketEvent,
		FD_CONNECT|FD_CLOSE|FD_READ|FD_WRITE);

	if (nRet == SOCKET_ERROR)
	{
		closesocket(hSocket);
		return false;
	}

	status = TCPSocket::Connecting;

	hKillEvent = CreateEvent(NULL, TRUE, FALSE, NULL);
	UINT m_dwThreadId;
	hThread = (HANDLE)_beginthreadex(NULL, 0, threadProc, (void*) this, 0, &m_dwThreadId);

	return true;
}



unsigned __stdcall TCPSocketImpl::threadProc( LPVOID lParam )
{
	TCPSocketImpl* pThis = reinterpret_cast<TCPSocketImpl*>(lParam);

	pThis->doClientLoop();

	return 0;
}


bool TCPSocketImpl::OnConnect()
{
	ScopedLock lock(cs);

	status = TCPSocket::Connected;

	if (pFacade->isRelayTCPEvents())
	{
		ConnectionStatusEvent* pEvent = new ConnectionStatusEvent;
		pEvent->type = ConnectionStatusEvent::ConnectionProgress;
		pEvent->bStatus = true;
		pFacade->PostQueuedConnectionStatusEvent(pEvent);
	}
	else
		pFacade->onConnected(true);


	return true;
}

bool TCPSocketImpl::OnClose()
{
	status = TCPSocket::Disconnected;

	if (pFacade->isRelayTCPEvents())
	{
		ConnectionStatusEvent* pEvent = new ConnectionStatusEvent;
		pEvent->type = ConnectionStatusEvent::ConnectionProgress;
		pEvent->bStatus = false;
		pFacade->PostQueuedConnectionStatusEvent(pEvent);
	}
	else
		pFacade->onConnected(false);
	return false;
}

bool TCPSocketImpl::OnRead()
{
	DWORD dwSize = 0;
	int nRet = ioctlsocket(hSocket, FIONREAD, &dwSize);

	if (nRet == -1)
		return true;

	if (dwSize == 0){

		if (pFacade->isRelayTCPEvents())
		{
			ConnectionStatusEvent* pEvent = new ConnectionStatusEvent;
			pEvent->type = ConnectionStatusEvent::ConnectionStatus;
			pEvent->bStatus = true;
			pFacade->PostQueuedConnectionStatusEvent(pEvent);
		}
		else
			pFacade->onConnectionClosed(true);

		return false;
	}


	int nBytesToBeRead = min(dwSize, inBuffer.getRemainingSize());

	BYTE* pData = new BYTE[nBytesToBeRead];

	int nRead = recv(hSocket, (char*) pData, nBytesToBeRead, 0);
	if (nRead == -1)
		return true;


	inBuffer.Append((char*)pData, nRead);

	delete [] pData;


	//Now Try depacketize :
	int uCommmandID;
	PushFramework::IncomingPacket* pPacket = NULL;
	int iResult;
	unsigned int uExtractedBytes;

	while (true)
	{
		iResult = pProtocol->tryDeserializeIncomingPacket(inBuffer, pPacket, uCommmandID, uExtractedBytes);
		//iResult = pProtocol->tryDeframeIncomingPacket(inBuffer.GetBuffer(), inBuffer.GetDataSize(), uCommmandID, pPacket, uExtractedBytes);
		if (iResult == PushFramework::Protocol::Success)
		{
			inBuffer.Pop(uExtractedBytes);
			//
			if (pFacade->isRelayTCPEvents())
			{
				ReceivedDataEvent* pEvent = new ReceivedDataEvent;
				pEvent->commandId = uCommmandID;
				pEvent->pPacket = pPacket;
				pFacade->PostQueuedDataReceivedEvent(pEvent);
			}
			else{
				dispatchResponse(uCommmandID, *pPacket);
				pProtocol->disposeIncomingPacket(pPacket);
			}
		}
		else if (iResult == PushFramework::Protocol::eDecodingFailure)
		{
			inBuffer.Pop(uExtractedBytes);
		}
		else
			break;
	}
	if (iResult != PushFramework::Protocol::eIncompletePacket)
	{
		closesocket(hSocket);
		return false;
	}
	return true;
}

void TCPSocketImpl::registerHandler( unsigned int requestId, ResponseHandler* pHandler )
{
	handlerMap[requestId] =  pHandler;
}

void TCPSocketImpl::dispatchResponse( unsigned int requestId, PushFramework::IncomingPacket& packet )
{
	handlerMapT::iterator it = handlerMap.find(requestId);
	if (it==handlerMap.end())
		return;

	ResponseHandler* pHandler = it->second;
	pHandler->handleResponse(packet);
}

void TCPSocketImpl::setProtocol( PushFramework::Protocol* pProtocol )
{
	this->pProtocol = pProtocol;
}

bool TCPSocketImpl::sendRequest( PushFramework::OutgoingPacket* pPacket )
{
	unsigned int uWrittenBytes = 0;
	int iResult = pProtocol->serializeOutgoingPacket(*pPacket, oBuffer, uWrittenBytes);
	//int iResult = pProtocol->serializeOutgoingPacket(pPacket, oBuffer.GetBuffer()+ oBuffer.GetDataSize(), oBuffer.GetMaxDataSize() - oBuffer.GetDataSize(), uWrittenBytes);
	if (iResult!=PushFramework::Protocol::Success)
	{
		return false;
	}
	//oBuffer.GrowSize(uWrittenBytes);
	return WriteBytes();
}

bool TCPSocketImpl::WriteBytes()
{
	if (oBuffer.GetDataSize()==0){
		return true;
	}

	unsigned int uSize = min(4096, oBuffer.GetDataSize());

	int dwBytes = send(hSocket, oBuffer.GetBuffer(), uSize,0);

	if (dwBytes == SOCKET_ERROR)
	{
		if (GetLastError() == WSAEWOULDBLOCK)
		{
			bSendInProgress = true;
			return true;
		}
		else
		{
			closesocket(hSocket);
			return false;
		}
	}
	else
	{
		oBuffer.Pop(uSize);
		return WriteBytes();
	}
}

bool TCPSocketImpl::OnWrite()
{
	bSendInProgress = false;

	if (oBuffer.GetDataSize()==0){

		pFacade->OnReadyToSend();

		return true;
	}


	return WriteBytes();
}

int TCPSocketImpl::getStatus()
{
	return status;
}

void TCPSocketImpl::disconnect( bool waitForSend )
{
	if (waitForSend && oBuffer.GetDataSize()!=0)
	{
		status = TCPSocket::WaitingToClose;
		return;
	}

	closesocket(hSocket);	
	//Stop the processing thread.

	::SetEvent(hKillEvent);


	/*
		if (hRealHandle != hThread)
			{
				cout << "i am waiting for myself" << endl;
				WaitForSingleObject(hThread, INFINITE);
			}
			else
				cout << "i am not waiting for myself" << endl;*/
		
	
	status = TCPSocket::Disconnected;
}


void TCPSocketImpl::doClientLoop(bool bServerMode/* = false*/)
{

	//Periodic Timer
	HANDLE m_hEventTimer = CreateEvent(NULL, TRUE, FALSE, NULL);


	HANDLE			hWaits[3];
	hWaits[0]		= hKillEvent;
	hWaits[1]		= hSocketEvent;
	hWaits[2]		= m_hEventTimer;


	DWORD dwRet = WAIT_OBJECT_0 + 2;
	ResetEvent(m_hEventTimer);

	WSANETWORKEVENTS events;

	while (true)
	{

		if (!bServerMode)
		{
			if(dwRet == (WAIT_OBJECT_0 + 2)){
				timeSetEvent(100, 1, (LPTIMECALLBACK) m_hEventTimer, 0, TIME_ONESHOT|TIME_CALLBACK_EVENT_SET);
			}
		}			

		dwRet = WSAWaitForMultipleEvents(bServerMode ? 2 : 3, hWaits,	FALSE, INFINITE, FALSE);

		if (dwRet == WAIT_OBJECT_0)	{
			break;
		}

		if (dwRet == (WAIT_OBJECT_0 + 2))
		{
			ResetEvent(m_hEventTimer);
			pFacade->onPerformAutomatedJob();
			continue;
		}

		// Figure out what happened
		int nRet = WSAEnumNetworkEvents(hSocket, hSocketEvent, &events);

		if (nRet == SOCKET_ERROR)
		{
			//str.Format("WSAEnumNetworkEvents error %ld\n",WSAGetLastError());
			break;
		}

		//Dispatch event :
		if(events.lNetworkEvents & FD_CONNECT)
		{
			if(!OnConnect())
				break;
		}

		if(events.lNetworkEvents & FD_READ)
		{
			if(!OnRead())
				break;
		}

		if(events.lNetworkEvents & FD_CLOSE)
		{
			if(!OnClose())
				break;
		}


		if(events.lNetworkEvents & FD_WRITE)
		{
			if(!OnWrite())
				break;
		}		
	}


	CloseHandle(m_hEventTimer);
	CloseHandle(hKillEvent);
	
	WSACloseEvent(hSocketEvent);

	status = TCPSocket::Disconnected;
}


//////////////////////////////////////////////////////////////////////////


bool TCPSocketImpl::SetupListeningSocket( const char* interfaceAddress, int uPort )
{
	//Create socket : 
	hServerSocket = WSASocket(AF_INET, SOCK_STREAM, 0, NULL, 0, WSA_FLAG_OVERLAPPED);
	if (hServerSocket == INVALID_SOCKET)
	{
		return false;
	}


	//Bind socket to interface address  and port: 
	SOCKADDR_IN		saAddress;		
	saAddress.sin_port = htons(uPort);
	saAddress.sin_family = AF_INET;


	//Check with interface address : 
	if(interfaceAddress)
	{
		u_long nInterfaceAddr = inet_addr(interfaceAddress);
		if (nInterfaceAddr != INADDR_NONE){
			closesocket(hServerSocket);
			return false;
		}
		saAddress.sin_addr.s_addr = nInterfaceAddr;
	}
	else
		saAddress.sin_addr.s_addr = INADDR_ANY;


	int nRet = bind(hServerSocket, (LPSOCKADDR)&saAddress, sizeof(struct sockaddr));
	if (nRet == SOCKET_ERROR)
	{
		closesocket(hServerSocket);
		return false;
	}



	//Create socket events : 
	hServerSocketEvent = WSACreateEvent();
	if (hServerSocketEvent == WSA_INVALID_EVENT)
	{
		closesocket(hServerSocket);
		return false;
	}

	//Interested in accept event only :
	nRet = WSAEventSelect(hServerSocket, hServerSocketEvent,	FD_ACCEPT);
	if (nRet == SOCKET_ERROR)
	{
		WSACloseEvent(hServerSocketEvent);
		closesocket(hServerSocket);
		return false;
	}

	//Start Listen : 	
	nRet = ::listen(hServerSocket, 1 /*listen backlog*/);
	if (nRet == SOCKET_ERROR)
	{
		WSACloseEvent(hServerSocketEvent);
		closesocket(hServerSocket);
		return false;
	}
	return true;
}

bool TCPSocketImpl::listen( int uListenPort, const char* interfaceAddress /*= NULL*/ )
{
	if(!SetupListeningSocket(interfaceAddress, uListenPort))
		return false;

	//this->strIP = interfaceAddress;
	this->uPort = uListenPort;

	hKillEvent	= CreateEvent(NULL, TRUE, FALSE, NULL);

	hThread = (HANDLE)_beginthreadex(NULL, 0, threadProcServer, (void*) this, 0, &m_dwThreadId);

	return true;
}

void TCPSocketImpl::doServerLoop()
{
	HANDLE			hWaits[2];
	hWaits[0]		= hKillEvent;
	hWaits[1]		= hServerSocketEvent;


	WSANETWORKEVENTS events;
	while (true)
	{
		DWORD dwRet = WSAWaitForMultipleEvents(2, hWaits, FALSE, INFINITE, FALSE);

		if (dwRet == WAIT_OBJECT_0)
		{
			break; //Kill signal
		}

		int nRet = WSAEnumNetworkEvents(hServerSocket, hServerSocketEvent, &events);
		if (nRet == SOCKET_ERROR)
		{
			//Error
			break;
		}

		if (events.lNetworkEvents & FD_ACCEPT   && events.iErrorCode[FD_ACCEPT_BIT] == 0)
		{
			SOCKADDR_IN	SockAddr;
			int			nLen = sizeof(SOCKADDR_IN);
			SOCKET		clientSocket;

			clientSocket = accept(hServerSocket, (LPSOCKADDR)&SockAddr, &nLen); 
			if (clientSocket == SOCKET_ERROR &&  (WSAGetLastError()!= WSAEWOULDBLOCK) )
			{
				cout << "error with accept : " << WSAGetLastError() << std::endl;
				break;
			}
			const char chOpt = 1;
			int nErr = setsockopt(clientSocket, IPPROTO_TCP, TCP_NODELAY, &chOpt, sizeof(char));
			if (nErr == -1)
			{
				closesocket(clientSocket);
				break;
			}

			handleNewClient(clientSocket);
		}
		else
			break;
	}
}

unsigned __stdcall TCPSocketImpl::threadProcServer( LPVOID lParam )
{
	TCPSocketImpl* pThis = reinterpret_cast<TCPSocketImpl*>(lParam);

	pThis->doServerLoop();

	return 0;
}

void TCPSocketImpl::handleNewClient( SOCKET clientSocket )
{
	cout << "handling new client connection" << std::endl;
	this->hSocket = clientSocket;
	//Prepare socket events :

	hSocketEvent = WSACreateEvent();
	if (hSocketEvent == WSA_INVALID_EVENT)
	{
		closesocket(hSocket);
		return;
	}

	int nRet = WSAEventSelect(hSocket, hSocketEvent, FD_CLOSE|FD_READ|FD_WRITE);

	if (nRet == SOCKET_ERROR)
	{
		closesocket(hSocket);
		return;
	}

	status = TCPSocket::Connected;
	//Handle the events
	doClientLoop(true);

	cout << "finished handling new client connection" << std::endl;
}

void TCPSocketImpl::stopServer()
{
	disconnect(false);

	closesocket(hServerSocket);	
	WSACloseEvent(hServerSocketEvent);
}

By viewing downloads associated with this article you agree to the Terms of Service and the article's licence.

If a file you wish to view isn't highlighted, and is a text file (not binary), please let us know and we'll add colourisation support for it.

License

This article, along with any associated source code and files, is licensed under The Apache License, Version 2.0

Share

About the Author

Ahmed Charfeddine
Technical Lead
Tunisia Tunisia
Services:
http://www.pushframework.com/?page_id=890

You may also be interested in...

| Advertise | Privacy | Terms of Use | Mobile
Web01 | 2.8.150728.1 | Last Updated 23 May 2012
Article Copyright 2011 by Ahmed Charfeddine
Everything else Copyright © CodeProject, 1999-2015
Layout: fixed | fluid