#include "StdAfx.h"
#include "TCPSocketImpl.h"
#include <process.h>
#include "TCPSocket.h"
#include "TCPSocketEvents.h"
#include "ResponseHandler.h"
#include "ScopedLock.h"
#define nReceiveBufferSize 1024 * 8/*8192*/
#define nSendBufferSize 1024 * 8/*8192*/
#include <stdio.h>
#include <tchar.h>
#include <conio.h>
#include <ctype.h>
#include <mmsystem.h>
// Builds the implementation object that sits behind a TCPSocket facade.
//
// pFacade : facade that receives the connection / data callbacks. The pointer
//           is stored as-is; this class never deletes it.
//
// Every handle and pointer this file later reads is given a defined "not set"
// value here, so that disconnect()/stopServer()/the destructor never operate
// on indeterminate values when they run before connect()/listen().
TCPSocketImpl::TCPSocketImpl(TCPSocket* pFacade)
    : oBuffer(nSendBufferSize), inBuffer(nReceiveBufferSize)
{
    this->pFacade = pFacade;
    status = TCPSocket::Disconnected;
    pProtocol = NULL;
    uPort = 0;

    // INVALID_SOCKET (not 0/NULL) is the Winsock "no socket" sentinel.
    hSocket = INVALID_SOCKET;
    hSocketEvent = NULL;
    hServerSocket = INVALID_SOCKET;
    hServerSocketEvent = NULL;

    hKillEvent = NULL;
    hThread = NULL;
    bSendInProgress = false;

    ::InitializeCriticalSection(&cs);
}
// Tears the socket down and releases the critical section.
//
// disconnect(false) only signals the kill event; it does not wait. The worker
// thread was started with `this` as its argument, so it must have left its
// loop before the members it uses (and the critical section) are destroyed.
// The wait is skipped when the destructor itself runs on the worker thread
// (e.g. the object is deleted from inside a callback), because a thread
// cannot wait for its own termination.
TCPSocketImpl::~TCPSocketImpl()
{
    if (status != TCPSocket::Disconnected)
    {
        // Ask the processing thread to stop ...
        disconnect(false);

        // ... and wait until it really has.
        if (hThread != NULL && ::GetThreadId(hThread) != ::GetCurrentThreadId())
        {
            ::WaitForSingleObject(hThread, INFINITE);
        }
    }

    // The handle returned by _beginthreadex must be closed by its owner.
    if (hThread != NULL)
    {
        ::CloseHandle(hThread);
        hThread = NULL;
    }

    ::DeleteCriticalSection(&cs);
}
bool TCPSocketImpl::connect( const char* hostAddress, unsigned int uPort )
{
ScopedLock lock(cs);
if(status != TCPSocket::Disconnected)
return true;
this->strIP = hostAddress;
this->uPort = uPort;
hSocket = socket(AF_INET, SOCK_STREAM, IPPROTO_TCP);
if (hSocket == INVALID_SOCKET)
{
/*wcout << L"Could not create socket " << WSAGetLastError();*/
return false;
}
SOCKADDR_IN saServer;
memset(&saServer,0,sizeof(saServer));
saServer.sin_family = AF_INET;
saServer.sin_addr.s_addr = inet_addr(strIP.c_str());
saServer.sin_port = htons(uPort);
int nRet = ::connect(hSocket,(sockaddr*)&saServer, sizeof(saServer));
if (nRet == SOCKET_ERROR &&
WSAGetLastError() != WSAEWOULDBLOCK)
{
int iError = WSAGetLastError();
closesocket(hSocket);
return false;
}
bool m_bInProgress = true;
hSocketEvent = WSACreateEvent();
if (hSocketEvent == WSA_INVALID_EVENT)
{
closesocket(hSocket);
return FALSE;
}
nRet = WSAEventSelect(hSocket,
hSocketEvent,
FD_CONNECT|FD_CLOSE|FD_READ|FD_WRITE);
if (nRet == SOCKET_ERROR)
{
closesocket(hSocket);
return false;
}
status = TCPSocket::Connecting;
hKillEvent = CreateEvent(NULL, TRUE, FALSE, NULL);
UINT m_dwThreadId;
hThread = (HANDLE)_beginthreadex(NULL, 0, threadProc, (void*) this, 0, &m_dwThreadId);
return true;
}
// Entry point of the thread started by connect().
//
// lParam : the TCPSocketImpl instance that was passed to _beginthreadex.
// Runs that instance's client loop and returns 0 once the loop ends.
unsigned __stdcall TCPSocketImpl::threadProc( LPVOID lParam )
{
    TCPSocketImpl* const pSelf = static_cast<TCPSocketImpl*>(lParam);
    pSelf->doClientLoop();
    return 0;
}
bool TCPSocketImpl::OnConnect()
{
ScopedLock lock(cs);
status = TCPSocket::Connected;
if (pFacade->isRelayTCPEvents())
{
ConnectionStatusEvent* pEvent = new ConnectionStatusEvent;
pEvent->type = ConnectionStatusEvent::ConnectionProgress;
pEvent->bStatus = true;
pFacade->PostQueuedConnectionStatusEvent(pEvent);
}
else
pFacade->onConnected(true);
return true;
}
bool TCPSocketImpl::OnClose()
{
status = TCPSocket::Disconnected;
if (pFacade->isRelayTCPEvents())
{
ConnectionStatusEvent* pEvent = new ConnectionStatusEvent;
pEvent->type = ConnectionStatusEvent::ConnectionProgress;
pEvent->bStatus = false;
pFacade->PostQueuedConnectionStatusEvent(pEvent);
}
else
pFacade->onConnected(false);
return false;
}
// Handles FD_READ: pulls the pending bytes from the socket into inBuffer and
// then extracts as many complete packets as the protocol can deserialize.
//
// Each extracted packet is either posted to the facade as a ReceivedDataEvent
// (relay mode; the packet pointer travels with the event) or dispatched to
// the registered ResponseHandler and then disposed through the protocol.
//
// Returns true to keep the client loop running. Returns false when FIONREAD
// reports no pending data (the facade is told the connection closed) or when
// deserialization stops for a reason other than "incomplete packet", in which
// case the socket is closed.
bool TCPSocketImpl::OnRead()
{
    DWORD dwSize = 0;
    int nRet = ioctlsocket(hSocket, FIONREAD, &dwSize);
    if (nRet == -1)
        return true;

    if (dwSize == 0)
    {
        if (pFacade->isRelayTCPEvents())
        {
            ConnectionStatusEvent* pEvent = new ConnectionStatusEvent;
            pEvent->type = ConnectionStatusEvent::ConnectionStatus;
            pEvent->bStatus = true;
            pFacade->PostQueuedConnectionStatusEvent(pEvent);
        }
        else
            pFacade->onConnectionClosed(true);
        return false;
    }

    // Never read more than the receive buffer can still take.
    int nBytesToBeRead = min(dwSize, inBuffer.getRemainingSize());
    BYTE* pData = new BYTE[nBytesToBeRead];
    int nRead = recv(hSocket, (char*) pData, nBytesToBeRead, 0);
    if (nRead == -1)
    {
        // The staging buffer must be released on the error path as well.
        delete [] pData;
        return true;
    }
    inBuffer.Append((char*)pData, nRead);
    delete [] pData;

    // Now try to depacketize:
    int uCommmandID = 0;
    PushFramework::IncomingPacket* pPacket = NULL;
    int iResult;
    unsigned int uExtractedBytes = 0;
    while (true)
    {
        iResult = pProtocol->tryDeserializeIncomingPacket(inBuffer, pPacket, uCommmandID, uExtractedBytes);
        if (iResult == PushFramework::Protocol::Success)
        {
            inBuffer.Pop(uExtractedBytes);

            if (pFacade->isRelayTCPEvents())
            {
                ReceivedDataEvent* pEvent = new ReceivedDataEvent;
                pEvent->commandId = uCommmandID;
                pEvent->pPacket = pPacket;
                pFacade->PostQueuedDataReceivedEvent(pEvent);
            }
            else
            {
                dispatchResponse(uCommmandID, *pPacket);
                pProtocol->disposeIncomingPacket(pPacket);
            }
        }
        else if (iResult == PushFramework::Protocol::eDecodingFailure)
        {
            // Skip the bytes that could not be decoded and try again.
            inBuffer.Pop(uExtractedBytes);
        }
        else
            break;
    }

    // Anything other than "need more bytes" ends the connection.
    if (iResult != PushFramework::Protocol::eIncompletePacket)
    {
        closesocket(hSocket);
        return false;
    }
    return true;
}
void TCPSocketImpl::registerHandler( unsigned int requestId, ResponseHandler* pHandler )
{
handlerMap[requestId] = pHandler;
}
void TCPSocketImpl::dispatchResponse( unsigned int requestId, PushFramework::IncomingPacket& packet )
{
handlerMapT::iterator it = handlerMap.find(requestId);
if (it==handlerMap.end())
return;
ResponseHandler* pHandler = it->second;
pHandler->handleResponse(packet);
}
// Stores the protocol object used by OnRead() to deserialize incoming
// packets and by sendRequest() to serialize outgoing ones.
// The pointer is kept as given.
void TCPSocketImpl::setProtocol( PushFramework::Protocol* pProtocol )
{
    // The qualified name selects the member; the parameter shadows it.
    TCPSocketImpl::pProtocol = pProtocol;
}
// Serializes pPacket into the outgoing buffer and tries to send it.
//
// pPacket : packet to serialize; it is only read here.
//
// Returns false when there is no packet, when no protocol has been set with
// setProtocol(), when serialization does not report Success, or when
// WriteBytes() fails. Returns true when the bytes were sent or are left in
// the buffer waiting for the socket to become writable again.
bool TCPSocketImpl::sendRequest( PushFramework::OutgoingPacket* pPacket )
{
    // Both pointers are dereferenced below.
    if (pPacket == NULL || pProtocol == NULL)
    {
        return false;
    }

    unsigned int uWrittenBytes = 0;
    int iResult = pProtocol->serializeOutgoingPacket(*pPacket, oBuffer, uWrittenBytes);
    if (iResult != PushFramework::Protocol::Success)
    {
        return false;
    }

    return WriteBytes();
}
bool TCPSocketImpl::WriteBytes()
{
if (oBuffer.GetDataSize()==0){
return true;
}
unsigned int uSize = min(4096, oBuffer.GetDataSize());
int dwBytes = send(hSocket, oBuffer.GetBuffer(), uSize,0);
if (dwBytes == SOCKET_ERROR)
{
if (GetLastError() == WSAEWOULDBLOCK)
{
bSendInProgress = true;
return true;
}
else
{
closesocket(hSocket);
return false;
}
}
else
{
oBuffer.Pop(uSize);
return WriteBytes();
}
}
// Handles FD_WRITE: the socket accepts data again.
//
// Clears bSendInProgress, then either resumes sending what is still queued in
// oBuffer (returning WriteBytes()'s result) or, when nothing is queued, tells
// the facade it may send and returns true.
bool TCPSocketImpl::OnWrite()
{
    bSendInProgress = false;

    const bool bHasPendingData = (oBuffer.GetDataSize() != 0);
    if (bHasPendingData)
    {
        return WriteBytes();
    }

    pFacade->OnReadyToSend();
    return true;
}
// Returns the current value of the status member as an int.
int TCPSocketImpl::getStatus()
{
    const int nCurrentStatus = status;
    return nCurrentStatus;
}
void TCPSocketImpl::disconnect( bool waitForSend )
{
if (waitForSend && oBuffer.GetDataSize()!=0)
{
status = TCPSocket::WaitingToClose;
return;
}
closesocket(hSocket);
//Stop the processing thread.
::SetEvent(hKillEvent);
/*
if (hRealHandle != hThread)
{
cout << "i am waiting for myself" << endl;
WaitForSingleObject(hThread, INFINITE);
}
else
cout << "i am not waiting for myself" << endl;*/
status = TCPSocket::Disconnected;
}
void TCPSocketImpl::doClientLoop(bool bServerMode/* = false*/)
{
//Periodic Timer
HANDLE m_hEventTimer = CreateEvent(NULL, TRUE, FALSE, NULL);
HANDLE hWaits[3];
hWaits[0] = hKillEvent;
hWaits[1] = hSocketEvent;
hWaits[2] = m_hEventTimer;
DWORD dwRet = WAIT_OBJECT_0 + 2;
ResetEvent(m_hEventTimer);
WSANETWORKEVENTS events;
while (true)
{
if (!bServerMode)
{
if(dwRet == (WAIT_OBJECT_0 + 2)){
timeSetEvent(100, 1, (LPTIMECALLBACK) m_hEventTimer, 0, TIME_ONESHOT|TIME_CALLBACK_EVENT_SET);
}
}
dwRet = WSAWaitForMultipleEvents(bServerMode ? 2 : 3, hWaits, FALSE, INFINITE, FALSE);
if (dwRet == WAIT_OBJECT_0) {
break;
}
if (dwRet == (WAIT_OBJECT_0 + 2))
{
ResetEvent(m_hEventTimer);
pFacade->onPerformAutomatedJob();
continue;
}
// Figure out what happened
int nRet = WSAEnumNetworkEvents(hSocket, hSocketEvent, &events);
if (nRet == SOCKET_ERROR)
{
//str.Format("WSAEnumNetworkEvents error %ld\n",WSAGetLastError());
break;
}
//Dispatch event :
if(events.lNetworkEvents & FD_CONNECT)
{
if(!OnConnect())
break;
}
if(events.lNetworkEvents & FD_READ)
{
if(!OnRead())
break;
}
if(events.lNetworkEvents & FD_CLOSE)
{
if(!OnClose())
break;
}
if(events.lNetworkEvents & FD_WRITE)
{
if(!OnWrite())
break;
}
}
CloseHandle(m_hEventTimer);
CloseHandle(hKillEvent);
WSACloseEvent(hSocketEvent);
status = TCPSocket::Disconnected;
}
//////////////////////////////////////////////////////////////////////////
bool TCPSocketImpl::SetupListeningSocket( const char* interfaceAddress, int uPort )
{
//Create socket :
hServerSocket = WSASocket(AF_INET, SOCK_STREAM, 0, NULL, 0, WSA_FLAG_OVERLAPPED);
if (hServerSocket == INVALID_SOCKET)
{
return false;
}
//Bind socket to interface address and port:
SOCKADDR_IN saAddress;
saAddress.sin_port = htons(uPort);
saAddress.sin_family = AF_INET;
//Check with interface address :
if(interfaceAddress)
{
u_long nInterfaceAddr = inet_addr(interfaceAddress);
if (nInterfaceAddr != INADDR_NONE){
closesocket(hServerSocket);
return false;
}
saAddress.sin_addr.s_addr = nInterfaceAddr;
}
else
saAddress.sin_addr.s_addr = INADDR_ANY;
int nRet = bind(hServerSocket, (LPSOCKADDR)&saAddress, sizeof(struct sockaddr));
if (nRet == SOCKET_ERROR)
{
closesocket(hServerSocket);
return false;
}
//Create socket events :
hServerSocketEvent = WSACreateEvent();
if (hServerSocketEvent == WSA_INVALID_EVENT)
{
closesocket(hServerSocket);
return false;
}
//Interested in accept event only :
nRet = WSAEventSelect(hServerSocket, hServerSocketEvent, FD_ACCEPT);
if (nRet == SOCKET_ERROR)
{
WSACloseEvent(hServerSocketEvent);
closesocket(hServerSocket);
return false;
}
//Start Listen :
nRet = ::listen(hServerSocket, 1 /*listen backlog*/);
if (nRet == SOCKET_ERROR)
{
WSACloseEvent(hServerSocketEvent);
closesocket(hServerSocket);
return false;
}
return true;
}
bool TCPSocketImpl::listen( int uListenPort, const char* interfaceAddress /*= NULL*/ )
{
if(!SetupListeningSocket(interfaceAddress, uListenPort))
return false;
//this->strIP = interfaceAddress;
this->uPort = uListenPort;
hKillEvent = CreateEvent(NULL, TRUE, FALSE, NULL);
hThread = (HANDLE)_beginthreadex(NULL, 0, threadProcServer, (void*) this, 0, &m_dwThreadId);
return true;
}
void TCPSocketImpl::doServerLoop()
{
HANDLE hWaits[2];
hWaits[0] = hKillEvent;
hWaits[1] = hServerSocketEvent;
WSANETWORKEVENTS events;
while (true)
{
DWORD dwRet = WSAWaitForMultipleEvents(2, hWaits, FALSE, INFINITE, FALSE);
if (dwRet == WAIT_OBJECT_0)
{
break; //Kill signal
}
int nRet = WSAEnumNetworkEvents(hServerSocket, hServerSocketEvent, &events);
if (nRet == SOCKET_ERROR)
{
//Error
break;
}
if (events.lNetworkEvents & FD_ACCEPT && events.iErrorCode[FD_ACCEPT_BIT] == 0)
{
SOCKADDR_IN SockAddr;
int nLen = sizeof(SOCKADDR_IN);
SOCKET clientSocket;
clientSocket = accept(hServerSocket, (LPSOCKADDR)&SockAddr, &nLen);
if (clientSocket == SOCKET_ERROR && (WSAGetLastError()!= WSAEWOULDBLOCK) )
{
cout << "error with accept : " << WSAGetLastError() << std::endl;
break;
}
const char chOpt = 1;
int nErr = setsockopt(clientSocket, IPPROTO_TCP, TCP_NODELAY, &chOpt, sizeof(char));
if (nErr == -1)
{
closesocket(clientSocket);
break;
}
handleNewClient(clientSocket);
}
else
break;
}
}
// Entry point of the thread started by listen().
//
// lParam : the TCPSocketImpl instance that was passed to _beginthreadex.
// Runs that instance's accept loop and returns 0 once the loop ends.
unsigned __stdcall TCPSocketImpl::threadProcServer( LPVOID lParam )
{
    TCPSocketImpl* const pSelf = static_cast<TCPSocketImpl*>(lParam);
    pSelf->doServerLoop();
    return 0;
}
// Services one accepted connection on the calling thread.
//
// clientSocket : socket returned by accept(); it becomes hSocket.
//
// Creates the socket event, registers it for FD_CLOSE / FD_READ / FD_WRITE,
// sets the status to Connected and runs doClientLoop(true) until that loop
// ends. If the event cannot be created or registered, the socket (and the
// event, if any) is released and the function returns without running the
// loop.
void TCPSocketImpl::handleNewClient( SOCKET clientSocket )
{
    cout << "handling new client connection" << std::endl;
    this->hSocket = clientSocket;

    // Prepare socket events :
    hSocketEvent = WSACreateEvent();
    if (hSocketEvent == WSA_INVALID_EVENT)
    {
        closesocket(hSocket);
        return;
    }

    int nRet = WSAEventSelect(hSocket, hSocketEvent, FD_CLOSE|FD_READ|FD_WRITE);
    if (nRet == SOCKET_ERROR)
    {
        // The event created above is not used by anyone else: release it too.
        WSACloseEvent(hSocketEvent);
        hSocketEvent = NULL;
        closesocket(hSocket);
        return;
    }

    status = TCPSocket::Connected;

    // Handle the events
    doClientLoop(true);
    cout << "finished handling new client connection" << std::endl;
}
void TCPSocketImpl::stopServer()
{
disconnect(false);
closesocket(hServerSocket);
WSACloseEvent(hServerSocketEvent);
}