// SyncSocketWorker.h: interface for the CSyncSocketWorker class.
// Author: Kevin Hua
// Create: 2002-2-25 14:00
// Revision: 1
// Contact: Kevin-Hua@Woncore.com
// Notes: not use inline function for CSyncSocketWorker
//////////////////////////////////////////////////////////////////////
#if !defined(AFX_SYNCSOCKETWORKER_H__7D3D2BB8_8007_42F0_A800_3B6CC1A9C0ED__INCLUDED_)
#define AFX_SYNCSOCKETWORKER_H__7D3D2BB8_8007_42F0_A800_3B6CC1A9C0ED__INCLUDED_
#if _MSC_VER > 1000
#pragma once
#endif // _MSC_VER > 1000
#include <winsock2.h>
#include <process.h>
#include <list>
#include <string>
using namespace std;
#ifdef _DEBUG
#define new DEBUG_NEW
#endif
#pragma warning(disable : 4786)
template<class TDerivedWorker>
class CSyncSocketWorker
{
protected:
class CDataBuffer;
typedef list<CDataBuffer* > BUFFER_LIST;
public:
CSyncSocketWorker()
{
m_socket = INVALID_SOCKET;
m_strPeerAddr = "";
m_nPeerPort = 0;
m_bCloseWait = TRUE;
m_nThreadCount = 0;
m_bStarting = FALSE;
m_nTimeout = -1;
m_hEventSend = CreateEvent(NULL, FALSE, FALSE, NULL);
m_hEventRecv = CreateEvent(NULL, FALSE, FALSE, NULL);
m_hEventGoOn = CreateEvent(NULL, TRUE, FALSE, NULL);
SetEvent(m_hEventGoOn);
m_hThreadRecv = NULL;
m_hThreadSend = NULL;
m_hThreadWork = NULL;
InitializeCriticalSection(&m_csSend);
InitializeCriticalSection(&m_csRecv);
InitializeCriticalSection(&m_cs);
}
CSyncSocketWorker(const SOCKET s, const char* pszClientAddr, unsigned short nClientPort, BOOL bCloseWait = TRUE)
{
m_socket = s;
m_strPeerAddr = "";
m_nPeerPort = 0;
m_bCloseWait = bCloseWait;
m_nThreadCount = 0;
m_bStarting = FALSE;
m_nTimeout = -1;
m_hEventSend = CreateEvent(NULL, FALSE, FALSE, NULL);
m_hEventRecv = CreateEvent(NULL, FALSE, FALSE, NULL);
m_hEventGoOn = CreateEvent(NULL, TRUE, FALSE, NULL);
SetEvent(m_hEventGoOn);
m_hThreadRecv = NULL;
m_hThreadSend = NULL;
m_hThreadWork = NULL;
InitializeCriticalSection(&m_csSend);
InitializeCriticalSection(&m_csRecv);
InitializeCriticalSection(&m_cs);
}
~CSyncSocketWorker()
{
if(IsRunning())
Stop();
CleanupLists();
CloseHandle(m_hEventSend);
CloseHandle(m_hEventRecv);
CloseHandle(m_hEventGoOn);
DeleteCriticalSection(&m_csSend);
DeleteCriticalSection(&m_csRecv);
DeleteCriticalSection(&m_cs);
}
inline BOOL IsConnected()
{
return m_socket != INVALID_SOCKET;
}
inline BOOL IsShutingdown()
{
return m_bShutingdown ;
}
BOOL IsRecvBufferEmpty()
{
EnterCriticalSection(&m_csRecv);
int nSize = m_listRecv.size();
LeaveCriticalSection(&m_csRecv);
return 0 == nSize;
}
BOOL IsSendBufferEmpty()
{
EnterCriticalSection(&m_csSend);
int nSize = m_listSend.size();
LeaveCriticalSection(&m_csSend);
return 0 == nSize;
}
BOOL Start(const char* pStartErrMsg = NULL, unsigned int nSize = 0)
{
if(IsRunning())
return TRUE;
if(m_bStarting)
return TRUE;
m_bStarting = TRUE;
CleanupLists();
m_bShutingdown = FALSE;
m_nLastActionTime = GetTickCount();
m_hThreadRecv = NULL;
m_hThreadSend = NULL;
m_hThreadWork = NULL;
m_nThreadCount = 0;
unsigned dwThread_ID;
ResetEvent(m_hEventGoOn);
if(!(m_hThreadRecv = (HANDLE)_beginthreadex(NULL, 0, RecvThread, this, 0, &dwThread_ID)))
{
if(pStartErrMsg && nSize > 0 && INVALID_SOCKET != m_socket)
send(m_socket, pStartErrMsg,nSize, 0);
if(INVALID_SOCKET != m_socket)
{
shutdown(m_socket, SD_SEND);
closesocket(m_socket);
m_socket = INVALID_SOCKET;
}
m_hThreadRecv = NULL;
m_nThreadCount = 0;
SetEvent(m_hEventGoOn);
return m_bStarting = FALSE;
}
if(WAIT_TIMEOUT == WaitForSingleObject(m_hEventGoOn, 30000))
{
goto outSend;
}
ResetEvent(m_hEventGoOn);
if(!(m_hThreadSend = (HANDLE)_beginthreadex(NULL, 0, SendThread, this, 0, &dwThread_ID)))
{
outSend:
if(pStartErrMsg && nSize > 0 && INVALID_SOCKET != m_socket)
send(m_socket, pStartErrMsg,nSize, 0);
if(INVALID_SOCKET != m_socket)
{
shutdown(m_socket, SD_SEND);
closesocket(m_socket);
m_socket = INVALID_SOCKET;
}
if(WAIT_OBJECT_0 != WaitForSingleObject(m_hThreadRecv, 30000))
{
TerminateThread(m_hThreadRecv, 1);
CloseHandle(m_hThreadRecv);
}
m_hThreadRecv = NULL;
m_hThreadSend = NULL;
m_nThreadCount = 0;
SetEvent(m_hEventGoOn);
return m_bStarting = FALSE;
}
if(WAIT_TIMEOUT == WaitForSingleObject(m_hEventGoOn, 30000))
{
goto outWork;
}
ResetEvent(m_hEventGoOn);
if(!(m_hThreadWork = (HANDLE)_beginthreadex(NULL, 0, WorkThread, this, 0, &dwThread_ID)))
{
outWork:
if(pStartErrMsg && nSize > 0 && INVALID_SOCKET != m_socket)
send(m_socket, pStartErrMsg,nSize, 0);
if(INVALID_SOCKET != m_socket)
{
shutdown(m_socket, SD_SEND);
closesocket(m_socket);
m_socket = INVALID_SOCKET;
}
if(WAIT_OBJECT_0 != WaitForSingleObject(m_hThreadRecv, 30000))
{
TerminateThread(m_hThreadRecv, 1);
CloseHandle(m_hThreadRecv);
}
if(WAIT_OBJECT_0 != WaitForSingleObject(m_hThreadSend, 30000))
{
TerminateThread(m_hThreadSend, 1);
CloseHandle(m_hThreadSend);
}
m_hThreadRecv = NULL;
m_hThreadSend = NULL;
m_hThreadWork = NULL;
m_nThreadCount = 0;
SetEvent(m_hEventGoOn);
return m_bStarting = FALSE;
}
if(WAIT_TIMEOUT == WaitForSingleObject(m_hEventGoOn, 30000))
{
if(pStartErrMsg && nSize > 0 && INVALID_SOCKET != m_socket)
send(m_socket, pStartErrMsg,nSize, 0);
if(INVALID_SOCKET != m_socket)
{
shutdown(m_socket, SD_SEND);
closesocket(m_socket);
m_socket = INVALID_SOCKET;
}
if(WAIT_OBJECT_0 != WaitForSingleObject(m_hThreadWork, 3000))
{
TerminateThread(m_hThreadWork, 1);
CloseHandle(m_hThreadWork);
}
goto outWork;
}
m_bStarting = FALSE;
return TRUE;
}
void Stop()
{
while(m_bStarting)
Sleep(100);
while(m_nThreadCount == 1 || m_nThreadCount == 2)
Sleep(100);
if(m_bShutingdown)
return;
m_bShutingdown = TRUE;
SetEvent(m_hEventRecv); // cause Read return
if(m_bCloseWait)
WaitForSingleObject(m_hEventGoOn, -1);
if(INVALID_SOCKET != m_socket)
{
shutdown(m_socket, SD_SEND);
closesocket(m_socket);
m_socket = INVALID_SOCKET;
}
SetEvent(m_hEventRecv);
SetEvent(m_hEventSend);
if(WAIT_TIMEOUT == WaitForSingleObject(m_hThreadRecv, 30000))
{
TerminateThread(m_hThreadRecv, 21);
CloseHandle(m_hThreadRecv);
}
SetEvent(m_hEventRecv);
SetEvent(m_hEventSend);
if(WAIT_TIMEOUT == WaitForSingleObject(m_hThreadSend, 30000))
{
TerminateThread(m_hThreadSend, 22);
CloseHandle(m_hThreadSend);
}
SetEvent(m_hEventRecv);
SetEvent(m_hEventSend);
if(WAIT_TIMEOUT == WaitForSingleObject(m_hThreadWork, 30000))
{
TerminateThread(m_hThreadWork, 23);
CloseHandle(m_hThreadWork);
}
m_hThreadRecv = NULL;
m_hThreadSend = NULL;
m_hThreadWork = NULL;
m_nThreadCount = 0;
CleanupLists();
m_bShutingdown = FALSE;
}
void Disconnect()
{
shutdown(m_socket, SD_SEND);
closesocket(m_socket);
m_socket = INVALID_SOCKET;
}
private:
void CleanupLists()
{
EnterCriticalSection(&m_csSend);
BUFFER_LIST::iterator it;
for(it = m_listSend.begin(); it != m_listSend.end(); ++it)
delete *it;
m_listSend.clear();
LeaveCriticalSection(&m_csSend);
EnterCriticalSection(&m_csRecv);
for(it = m_listRecv.begin(); it != m_listRecv.end(); ++it)
delete *it;
m_listRecv.clear();
LeaveCriticalSection(&m_csRecv);
}
inline BOOL IsRunning()
{
return m_nThreadCount != 0;
}
static unsigned __stdcall RecvThread(void* pParam)
{
CSyncSocketWorker* pWorker = (CSyncSocketWorker*)pParam;
InterlockedIncrement(&pWorker->m_nThreadCount);
int nBufferSize = 32*1024;
char* pRecvBuffer = new char [nBufferSize];
SetEvent(pWorker->m_hEventGoOn);
if(pRecvBuffer)
{
while(!pWorker->m_bShutingdown)
{
int nRecv = recv(pWorker->m_socket, pRecvBuffer, nBufferSize, 0);
if(SOCKET_ERROR == nRecv || 0 == nRecv)
{
int nWSAErr = WSAGetLastError();
TRACE(TEXT("Recv Exit WSAErr: %d\n"), nWSAErr);
if(nWSAErr == WSAEMSGSIZE)
{
delete [] pRecvBuffer;
nBufferSize *= 2;
pRecvBuffer = new char [nBufferSize];
if(pRecvBuffer)
continue;
}
else if(nWSAErr == WSAEINTR)
continue;
break;
}
else
{
pWorker->m_nLastActionTime = GetTickCount();
EnterCriticalSection(&pWorker->m_csRecv);
CDataBuffer* pBuffer = new CDataBuffer(pRecvBuffer, nRecv);
if(pBuffer)
pWorker->m_listRecv.push_back(pBuffer);
else
{
Sleep(500);
CDataBuffer* pBuffer = new CDataBuffer(pRecvBuffer, nRecv);
if(pBuffer)
pWorker->m_listRecv.push_back(pBuffer);
else
{
break;
}
}
LeaveCriticalSection(&pWorker->m_csRecv);
SetEvent(pWorker->m_hEventRecv);
}
}
}
SetEvent(pWorker->m_hEventRecv); // for block read
delete [] pRecvBuffer;
if(INVALID_SOCKET != pWorker->m_socket)
{
shutdown(pWorker->m_socket, SD_SEND);
closesocket(pWorker->m_socket);
pWorker->m_socket = INVALID_SOCKET;
}
InterlockedDecrement(&pWorker->m_nThreadCount);
return 0;
}
static unsigned __stdcall SendThread(void* pParam)
{
CDataBuffer* pDataBuffer;
const int nSendBufferSize = 16 * 1024;
char* pSendBuffer = new char [nSendBufferSize];
int nBufferBytes;
CSyncSocketWorker* pWorker = (CSyncSocketWorker*)pParam;
InterlockedIncrement(&pWorker->m_nThreadCount);
SetEvent(pWorker->m_hEventGoOn);
while(INVALID_SOCKET != pWorker->m_socket)
{
if(WAIT_TIMEOUT == WaitForSingleObject(pWorker->m_hEventSend, 5000))
{
if(GetTickCount() - pWorker->m_nLastActionTime > pWorker->m_nTimeout)
break;
else
continue;
}
do
{
pDataBuffer = NULL;
nBufferBytes = 0;
BOOL bConti = TRUE;
do
{
EnterCriticalSection(&pWorker->m_csSend);
BUFFER_LIST::iterator it = pWorker->m_listSend.begin();
if(it != pWorker->m_listSend.end())
{
pDataBuffer = *it;
if(pDataBuffer->GetSize() <= nSendBufferSize - nBufferBytes)
{
memcpy(pSendBuffer + nBufferBytes, pDataBuffer->GetBuffer(), pDataBuffer->GetSize());
nBufferBytes += pDataBuffer->GetSize();
delete pDataBuffer;
pDataBuffer = NULL;
pWorker->m_listSend.pop_front();
}
else
bConti = FALSE;
}
else
bConti = FALSE;
LeaveCriticalSection(&pWorker->m_csSend);
}while(bConti);
if(nBufferBytes != 0)
{
int nSent = send(pWorker->m_socket, pSendBuffer, nBufferBytes, 0);
pWorker->m_nLastActionTime = GetTickCount();
if(SOCKET_ERROR == nSent || 0 == nSent)
{
TRACE(TEXT("Send Exit WSAErr: %d\n"), WSAGetLastError());
if(pDataBuffer = new CDataBuffer(pSendBuffer, nBufferBytes))
{
EnterCriticalSection(&pWorker->m_csSend);
pWorker->m_listSend.push_front(pDataBuffer);
LeaveCriticalSection(&pWorker->m_csSend);
}
break;
}
continue;
}
else if(NULL != pDataBuffer)// just use the large dataBuffer
{
int nSent = 0;
if(pDataBuffer->GetSize())
{
nSent = send(pWorker->m_socket, pDataBuffer->GetBuffer(), pDataBuffer->GetSize(), 0);
pWorker->m_nLastActionTime = GetTickCount();
if(SOCKET_ERROR == nSent || 0 == nSent)
{
TRACE(TEXT("Send Exit WSAErr: %d\n"), WSAGetLastError());
break;
}
}
EnterCriticalSection(&pWorker->m_csSend);
pWorker->m_listSend.pop_front();
LeaveCriticalSection(&pWorker->m_csSend);
delete pDataBuffer;
}
else
break;
}while(true);
if(pWorker->m_bShutingdown)
break;
}
if(INVALID_SOCKET != pWorker->m_socket)
{
shutdown(pWorker->m_socket, SD_SEND);
closesocket(pWorker->m_socket);
pWorker->m_socket = INVALID_SOCKET;
}
InterlockedDecrement(&pWorker->m_nThreadCount);
delete [] pSendBuffer;
return 0;
}
static unsigned __stdcall WorkThread(void* pParam)
{
CSyncSocketWorker* pWorker = (CSyncSocketWorker*)pParam;
TDerivedWorker* pDerivedWorker = (TDerivedWorker*)pWorker;
InterlockedIncrement(&pWorker->m_nThreadCount);
SetEvent(pWorker->m_hEventGoOn);
ResetEvent(pWorker->m_hEventGoOn);
pDerivedWorker->ProcessData();
SetEvent(pWorker->m_hEventGoOn);
TRACE(TEXT("Work Exit WSAErr: %d\n"), WSAGetLastError());
if(INVALID_SOCKET != pWorker->m_socket)
{
shutdown(pWorker->m_socket, SD_SEND);
closesocket(pWorker->m_socket);
pWorker->m_socket = INVALID_SOCKET;
}
InterlockedDecrement(&pWorker->m_nThreadCount);
return 0;
}
public:
// Since this function will close the socket,
// you must provide your own overrided one
// NEVER call CSyncSocketWorker::Stop() in its overrides
virtual void ProcessData() // NEVER call Stop() here
{
if(INVALID_SOCKET != m_socket)
{
shutdown(m_socket, SD_SEND);
closesocket(m_socket);
m_socket = INVALID_SOCKET;
}
/***********************************************
const int nBufferSize = 1024;
char* pBuffer = new char[nBufferSize+2];
if(!pBuffer)
return;
memset(pBuffer, 0, nBufferSize + 2);
int nRead;
while(nRead = Read(pBuffer, nBufferSize))
{
if(IsShutingdown())
break;
else
Write(pBuffer, nRead);
}
delete pBuffer;
*********************************************/
}
unsigned int Read(char * pBuffer, const unsigned int nBufferSize, BOOL bBlock = TRUE)
{
if(NULL == pBuffer || nBufferSize == 0)
return 0;
unsigned int nFreeSize = nBufferSize;
char* pFree = pBuffer;
while(true)
{
CDataBuffer* p;
EnterCriticalSection(&m_csRecv);
BUFFER_LIST::iterator it = m_listRecv.begin();
if(it != m_listRecv.end())
p = *it;
else
p = NULL;
LeaveCriticalSection(&m_csRecv);
if(p)
{
if(p->GetSize() <= nFreeSize)
{
memcpy(pFree, p->GetBuffer(), p->GetSize());
pFree += p->GetSize();
nFreeSize -= p->GetSize();
delete p;
m_listRecv.pop_front();
if(nFreeSize == 0)
break;
}
else
{ // the data size of the p object is larger than our free buffer
memcpy(pFree, p->GetBuffer(), nFreeSize);
p->ShrinkBuffer(nFreeSize);
nFreeSize = 0;
break;
}
}
else
{// p == null
if(!bBlock)
break;
else if(IsConnected())
WaitForSingleObject(m_hEventRecv, INFINITE);
else
break;
}
}
return nBufferSize - nFreeSize;
}
BOOL Write(const char * pBuffer, const unsigned int nBufferSize, BOOL bSendRightNow = TRUE)
{
if(NULL == pBuffer || nBufferSize == 0)
return TRUE;
EnterCriticalSection(&m_csSend);
CDataBuffer* p = new CDataBuffer(pBuffer, nBufferSize);
if(p)
m_listSend.push_back(p);
LeaveCriticalSection(&m_csSend);
if(bSendRightNow)
SetEvent(m_hEventSend);
return p != NULL;
}
void SendRightNow()
{
SetEvent(m_hEventSend);
}
void SetCloseWaitFlag(BOOL bWait = TRUE)
{
m_bCloseWait = bWait;
}
unsigned long GetTimeout()
{
return m_nTimeout;
}
unsigned long SetTimeout(unsigned long nNewVal)
{
if(nNewVal == 0)
nNewVal = -1;
unsigned long nOldVal = m_nTimeout;
m_nTimeout = nNewVal;
return nOldVal;
}
// Attributes
string m_strPeerAddr;
unsigned short m_nPeerPort;
protected:
BUFFER_LIST m_listSend;
BUFFER_LIST m_listRecv;
CRITICAL_SECTION m_csSend;
CRITICAL_SECTION m_csRecv;
HANDLE m_hEventSend;
private:
CSyncSocketWorker(const CSyncSocketWorker& rhs)
{
}
CSyncSocketWorker& operator=(const CSyncSocketWorker& rhs)
{
return *this;
}
SOCKET m_socket;
HANDLE m_hThreadRecv;
HANDLE m_hThreadSend;
HANDLE m_hThreadWork;
HANDLE m_hEventGoOn;
HANDLE m_hEventRecv;
BOOL m_bCloseWait;
CRITICAL_SECTION m_cs;
unsigned long m_nTimeout; // in ms
unsigned long m_nLastActionTime;
BOOL m_bStarting;
BOOL m_bShutingdown;
long m_nThreadCount;
protected:
class CDataBuffer
{
public:
void ShrinkBuffer(unsigned int nFrom)
{
if(!m_pData || m_nSize == 0 || nFrom >= m_nSize)
return;
memmove(m_pData, m_pData + nFrom, m_nSize - nFrom);
m_nSize -= nFrom;
}
CDataBuffer()
{
m_pData = NULL;
m_nSize = 0;
}
CDataBuffer(const char* pData, const unsigned int nSize)
{
m_pData = NULL;
m_nSize = 0;
if(NULL != pData && nSize > 0)
{
m_pData = new char[nSize];
memcpy(m_pData, pData, nSize);
m_nSize = nSize;
}
}
CDataBuffer(const CDataBuffer& rhs)
{
*this = rhs;
}
CDataBuffer& operator=(const CDataBuffer& rhs)
{
if(m_pData)
delete [] m_pData;
m_pData = NULL;
m_nSize = 0;
if(NULL != rhs.m_pData && rhs.m_nSize > 0)
{
m_pData = new char[rhs.m_nSize];
memcpy(m_pData, rhs.m_pData, rhs.m_nSize);
m_nSize = rhs.m_nSize;
}
return *this;
}
virtual ~CDataBuffer()
{
if(m_pData)
{
delete [] m_pData;
m_pData = NULL;
}
}
const char* GetBuffer()
{
return (const char*)m_pData;
}
unsigned int GetSize()
{
return m_nSize;
}
private:
char* m_pData;
unsigned int m_nSize;
};
};
#endif // !defined(AFX_SYNCSOCKETWORKER_H__7D3D2BB8_8007_42F0_A800_3B6CC1A9C0ED__INCLUDED_)