Click here to Skip to main content
15,886,199 members
Articles / Desktop Programming / MFC

Multithreaded Non-blocking Socket Server and Client, Based on Synchronized Socket

Rate me:
Please Sign up or sign in to vote.
3.75/5 (4 votes)
11 Mar 2002 97.9K   3.9K   33  
Non-blocking socket class using synchronized socket.
// SyncSocketWorker.h: interface for the CSyncSocketWorker class.
// Author: Kevin Hua
// Create: 2002-2-25 14:00
// Revision: 1
// Contact: Kevin-Hua@Woncore.com
// Notes: do not use inline functions for CSyncSocketWorker
//////////////////////////////////////////////////////////////////////

#if !defined(AFX_SYNCSOCKETWORKER_H__7D3D2BB8_8007_42F0_A800_3B6CC1A9C0ED__INCLUDED_)
#define AFX_SYNCSOCKETWORKER_H__7D3D2BB8_8007_42F0_A800_3B6CC1A9C0ED__INCLUDED_

#if _MSC_VER > 1000
#pragma once
#endif // _MSC_VER > 1000

#include <winsock2.h>
#include <process.h>
#include <list>
#include <string>

using namespace std;


#ifdef _DEBUG
#define new DEBUG_NEW
#endif


#pragma warning(disable : 4786)


template<class TDerivedWorker>
class CSyncSocketWorker  
{
protected:
	class CDataBuffer;
	typedef list<CDataBuffer* > BUFFER_LIST;  

public:
	CSyncSocketWorker()
	{
		m_socket = INVALID_SOCKET;
		m_strPeerAddr = "";
		m_nPeerPort = 0;
		m_bCloseWait = TRUE;
		m_nThreadCount = 0;
		m_bStarting = FALSE;
		
		m_nTimeout = -1;

		m_hEventSend = CreateEvent(NULL, FALSE, FALSE, NULL);
		m_hEventRecv = CreateEvent(NULL, FALSE, FALSE, NULL);
		
		m_hEventGoOn = CreateEvent(NULL, TRUE, FALSE, NULL);
		SetEvent(m_hEventGoOn);
		
		m_hThreadRecv = NULL;
		m_hThreadSend = NULL;
		m_hThreadWork = NULL;

		InitializeCriticalSection(&m_csSend);
		InitializeCriticalSection(&m_csRecv);
		InitializeCriticalSection(&m_cs);
	}

	CSyncSocketWorker(const SOCKET s, const char* pszClientAddr, unsigned short nClientPort, BOOL bCloseWait = TRUE)
	{
		m_socket = s;	
		m_strPeerAddr = "";
		m_nPeerPort = 0;
		m_bCloseWait = bCloseWait;
		m_nThreadCount = 0;
		m_bStarting = FALSE;
		
		m_nTimeout = -1;

		m_hEventSend = CreateEvent(NULL, FALSE, FALSE, NULL);
		m_hEventRecv = CreateEvent(NULL, FALSE, FALSE, NULL);

		m_hEventGoOn = CreateEvent(NULL, TRUE, FALSE, NULL);
		SetEvent(m_hEventGoOn);

		m_hThreadRecv = NULL;
		m_hThreadSend = NULL;
		m_hThreadWork = NULL;

		InitializeCriticalSection(&m_csSend);
		InitializeCriticalSection(&m_csRecv);
		InitializeCriticalSection(&m_cs);
	}


	// Stops the worker threads if any are still counted as running, frees
	// queued buffers and releases every kernel object owned by the worker.
	// Virtual because the class has a virtual method and is meant to be
	// derived from; deleting a derived object through a base pointer would
	// otherwise be undefined behaviour.
	virtual ~CSyncSocketWorker()
	{
		if(IsRunning())
			Stop();

		// When the threads ended on their own (IsRunning() is FALSE) Stop()
		// is not called, so their handles would never be closed. A non-NULL
		// handle member is always still open at this point.
		HANDLE* aphThreads[3] = { &m_hThreadRecv, &m_hThreadSend, &m_hThreadWork };
		for(int i = 0; i < 3; ++i)
		{
			if(*aphThreads[i])
			{
				CloseHandle(*aphThreads[i]);
				*aphThreads[i] = NULL;
			}
		}

		CleanupLists();

		CloseHandle(m_hEventSend);
		CloseHandle(m_hEventRecv);
		CloseHandle(m_hEventGoOn);

		DeleteCriticalSection(&m_csSend);
		DeleteCriticalSection(&m_csRecv);
		DeleteCriticalSection(&m_cs);
	}

	// Returns non-zero while the worker still holds a socket handle,
	// i.e. m_socket has not been reset to INVALID_SOCKET.
	inline BOOL IsConnected()
	{
		const BOOL bHasSocket = (INVALID_SOCKET != m_socket);
		return bHasSocket;
	}

	// Returns the shutdown flag: set by Stop() while it tears the worker
	// down, cleared again by Start() and at the end of Stop().
	inline BOOL IsShutingdown()
	{
		const BOOL bStopping = m_bShutingdown;
		return bStopping;
	}

	// Returns TRUE when no received data is queued for Read().
	// The check is made under m_csRecv.
	BOOL IsRecvBufferEmpty()
	{
		EnterCriticalSection(&m_csRecv);
		// empty() avoids narrowing size() to int and is constant time on
		// every list implementation.
		const BOOL bEmpty = m_listRecv.empty() ? TRUE : FALSE;
		LeaveCriticalSection(&m_csRecv);

		return bEmpty;
	}

	// Returns TRUE when no data is queued for the send thread.
	// The check is made under m_csSend.
	BOOL IsSendBufferEmpty()
	{
		EnterCriticalSection(&m_csSend);
		// empty() avoids narrowing size() to int and is constant time on
		// every list implementation.
		const BOOL bEmpty = m_listSend.empty() ? TRUE : FALSE;
		LeaveCriticalSection(&m_csSend);

		return bEmpty;
	}
	
	// Launches the receive, send and work threads, in that order.
	//
	// Each thread signals m_hEventGoOn once it has registered itself; Start()
	// waits up to 30 seconds for that signal before creating the next thread.
	//
	//   pStartErrMsg / nSize - optional message sent to the peer when
	//                          start-up fails (ignored if NULL or nSize == 0)
	//
	// Returns TRUE when all three threads are up, or when the worker is
	// already running or is in the middle of another Start() call.
	// Returns FALSE on failure; in that case the socket has been closed, any
	// thread already created has been waited for (and terminated if it did
	// not exit in time) and m_hEventGoOn is left signalled.
	//
	// NOTE(review): m_bStarting is tested and set without synchronisation, so
	// two simultaneous Start() calls are not fully protected against.
	BOOL Start(const char* pStartErrMsg = NULL, unsigned int nSize = 0)
	{
		if(IsRunning() || m_bStarting)
			return TRUE;

		m_bStarting = TRUE;

		CleanupLists();

		m_bShutingdown = FALSE;
		m_nLastActionTime = GetTickCount();
		m_nThreadCount = 0;

		typedef unsigned (__stdcall *THREAD_PROC)(void*);

		// Creation order: receive, send, work.
		THREAD_PROC apfnEntry[3] = { RecvThread, SendThread, WorkThread };
		HANDLE* aphCreate[3] = { &m_hThreadRecv, &m_hThreadSend, &m_hThreadWork };

		int i;

		// Handles left over from a previous run whose threads ended on their
		// own are still open here; close them instead of overwriting them.
		for(i = 0; i < 3; ++i)
		{
			if(*aphCreate[i])
				CloseHandle(*aphCreate[i]);
			*aphCreate[i] = NULL;
		}

		BOOL bStarted = TRUE;
		for(i = 0; i < 3 && bStarted; ++i)
		{
			unsigned nThreadId = 0;

			ResetEvent(m_hEventGoOn);
			*aphCreate[i] = (HANDLE)_beginthreadex(NULL, 0, apfnEntry[i], this, 0, &nThreadId);

			// Either the thread could not be created or it did not report
			// in within 30 seconds.
			if(!*aphCreate[i] || WAIT_TIMEOUT == WaitForSingleObject(m_hEventGoOn, 30000))
				bStarted = FALSE;
		}

		if(bStarted)
		{
			m_bStarting = FALSE;
			return TRUE;
		}

		// ---- start-up failed: undo everything done so far ----

		if(INVALID_SOCKET != m_socket)
		{
			if(pStartErrMsg && nSize > 0)
				send(m_socket, pStartErrMsg, nSize, 0);

			shutdown(m_socket, SD_SEND);
			closesocket(m_socket);
			m_socket = INVALID_SOCKET;
		}

		// Same order and grace periods as before: the work thread gets 3
		// seconds, the receive and send threads 30 seconds each. Threads
		// that were never created have a NULL handle and are skipped.
		HANDLE* aphReap[3] = { &m_hThreadWork, &m_hThreadRecv, &m_hThreadSend };
		const DWORD adwGrace[3] = { 3000, 30000, 30000 };
		for(i = 0; i < 3; ++i)
		{
			HANDLE& hThread = *aphReap[i];
			if(!hThread)
				continue;

			if(WAIT_OBJECT_0 != WaitForSingleObject(hThread, adwGrace[i]))
				TerminateThread(hThread, 1);

			// The handle must be closed whether the thread exited by itself
			// or was terminated; it used to leak in the former case.
			CloseHandle(hThread);
			hThread = NULL;
		}

		m_nThreadCount = 0;

		SetEvent(m_hEventGoOn);
		m_bStarting = FALSE;
		return FALSE;
	}

	void Stop()
	{		
		while(m_bStarting)
			Sleep(100);
		
		while(m_nThreadCount == 1 || m_nThreadCount == 2)
			Sleep(100);


		if(m_bShutingdown)
			return;
		
		m_bShutingdown = TRUE;
		SetEvent(m_hEventRecv);  // cause Read return
		

		if(m_bCloseWait)
			WaitForSingleObject(m_hEventGoOn, -1);
			
		if(INVALID_SOCKET != m_socket)
		{
			shutdown(m_socket, SD_SEND);
			closesocket(m_socket);
			m_socket = INVALID_SOCKET;
		}

		
		SetEvent(m_hEventRecv);
		SetEvent(m_hEventSend);

		if(WAIT_TIMEOUT == WaitForSingleObject(m_hThreadRecv, 30000))
		{
			TerminateThread(m_hThreadRecv, 21);
			CloseHandle(m_hThreadRecv);
		}
		
		SetEvent(m_hEventRecv);
		SetEvent(m_hEventSend);
		
		if(WAIT_TIMEOUT == WaitForSingleObject(m_hThreadSend, 30000))
		{
			TerminateThread(m_hThreadSend, 22);
			CloseHandle(m_hThreadSend);
		}
		
		SetEvent(m_hEventRecv);
		SetEvent(m_hEventSend);
		
		if(WAIT_TIMEOUT == WaitForSingleObject(m_hThreadWork, 30000))
		{
			TerminateThread(m_hThreadWork, 23);
			CloseHandle(m_hThreadWork);
		}
		
		m_hThreadRecv = NULL;
		m_hThreadSend = NULL;
		m_hThreadWork = NULL;

		m_nThreadCount = 0;
		
		CleanupLists();
		
		m_bShutingdown = FALSE;
	}

	// Closes the socket without waiting for the worker threads.
	// Does nothing when the socket is already closed, matching every other
	// place in this class that closes m_socket.
	void Disconnect()
	{
		if(INVALID_SOCKET == m_socket)
			return;

		shutdown(m_socket, SD_SEND);
		closesocket(m_socket);
		m_socket = INVALID_SOCKET;
	}
	
private:
	// Deletes every buffer queued for sending and for reading and empties
	// both lists, each under its own critical section.
	void CleanupLists()
	{
		// Draining from the front avoids declaring BUFFER_LIST::iterator,
		// which is a dependent type and needs "typename" on conforming
		// compilers.
		EnterCriticalSection(&m_csSend);
		while(!m_listSend.empty())
		{
			delete m_listSend.front();
			m_listSend.pop_front();
		}
		LeaveCriticalSection(&m_csSend);

		EnterCriticalSection(&m_csRecv);
		while(!m_listRecv.empty())
		{
			delete m_listRecv.front();
			m_listRecv.pop_front();
		}
		LeaveCriticalSection(&m_csRecv);
	}
	
	// Returns non-zero while at least one worker thread is registered in
	// m_nThreadCount.
	inline BOOL IsRunning()
	{
		const BOOL bAnyThread = (0 != m_nThreadCount);
		return bAnyThread;
	}


	// Receive thread entry point; pParam is the owning CSyncSocketWorker.
	//
	// Repeatedly calls recv() into a 32 KB buffer, copies each chunk into a
	// CDataBuffer appended to m_listRecv and signals m_hEventRecv so that a
	// blocked Read() wakes up. The loop ends on shutdown, on a receive error
	// other than WSAEINTR/WSAEMSGSIZE, when the peer closes the connection,
	// or when memory cannot be allocated. On exit the socket is closed and
	// m_hEventRecv is signalled one last time.
	static unsigned __stdcall RecvThread(void* pParam)
	{
		CSyncSocketWorker* pWorker = (CSyncSocketWorker*)pParam;
		InterlockedIncrement(&pWorker->m_nThreadCount);

		int nBufferSize = 32*1024;
		char* pRecvBuffer = new char [nBufferSize];

		// Tell Start() that this thread is up.
		SetEvent(pWorker->m_hEventGoOn);

		while(pRecvBuffer && !pWorker->m_bShutingdown)
		{
			int nRecv = recv(pWorker->m_socket, pRecvBuffer, nBufferSize, 0);
			if(SOCKET_ERROR == nRecv || 0 == nRecv)
			{
				int nWSAErr = WSAGetLastError();
				TRACE(TEXT("Recv Exit WSAErr: %d\n"), nWSAErr);

				// recv() == 0 means the peer closed the connection; the
				// last-error value is stale in that case and must not be
				// acted upon (a stale WSAEINTR used to loop forever).
				if(0 == nRecv)
					break;

				if(nWSAErr == WSAEINTR)
					continue;

				if(nWSAErr == WSAEMSGSIZE)
				{
					// Message did not fit: retry with a buffer twice as big.
					// If the allocation yields NULL the loop condition
					// ends the thread.
					delete [] pRecvBuffer;
					nBufferSize *= 2;
					pRecvBuffer = new char [nBufferSize];
					continue;
				}

				break;
			}

			pWorker->m_nLastActionTime = GetTickCount();

			// Allocate outside the critical section. The retry (and its
			// Sleep) used to run while holding m_csRecv, and the final
			// failure path left the critical section locked for good.
			CDataBuffer* pBuffer = new CDataBuffer(pRecvBuffer, nRecv);
			if(!pBuffer)
			{
				Sleep(500);
				pBuffer = new CDataBuffer(pRecvBuffer, nRecv);
				if(!pBuffer)
					break;
			}

			EnterCriticalSection(&pWorker->m_csRecv);
			pWorker->m_listRecv.push_back(pBuffer);
			LeaveCriticalSection(&pWorker->m_csRecv);

			SetEvent(pWorker->m_hEventRecv);
		}

		delete [] pRecvBuffer;

		if(INVALID_SOCKET != pWorker->m_socket)
		{
			shutdown(pWorker->m_socket, SD_SEND);
			closesocket(pWorker->m_socket);
			pWorker->m_socket = INVALID_SOCKET;
		}

		// Wake a blocked Read() only after the socket is marked closed, so
		// that it sees IsConnected() == FALSE instead of waiting again.
		SetEvent(pWorker->m_hEventRecv); // for block read

		InterlockedDecrement(&pWorker->m_nThreadCount);

		return 0;
	}

	// Send thread entry point; pParam is the owning CSyncSocketWorker.
	//
	// Waits on m_hEventSend (polling every 5 seconds so the idle timeout can
	// be checked) and then drains m_listSend: consecutive buffers that fit
	// are coalesced into one 16 KB staging buffer and sent together; a
	// buffer larger than the staging buffer is sent directly from its own
	// storage. The thread ends when the socket is closed, the idle timeout
	// expires, shutdown is flagged, or the staging buffer cannot be
	// allocated. On exit the socket is closed.
	static unsigned __stdcall SendThread(void* pParam)
	{
		const int nSendBufferSize = 16 * 1024;
		char* pSendBuffer = new char [nSendBufferSize];

		CSyncSocketWorker* pWorker = (CSyncSocketWorker*)pParam;
		InterlockedIncrement(&pWorker->m_nThreadCount);

		// Tell Start() that this thread is up.
		SetEvent(pWorker->m_hEventGoOn);

		// pSendBuffer is checked because the rest of this class assumes a
		// "new" that can return NULL; without the check memcpy would write
		// through a NULL pointer.
		while(pSendBuffer && INVALID_SOCKET != pWorker->m_socket)
		{
			if(WAIT_TIMEOUT == WaitForSingleObject(pWorker->m_hEventSend, 5000))
			{
				// Unsigned subtraction keeps this correct across tick-count
				// wrap-around; the default timeout of (unsigned long)-1
				// never triggers.
				if(GetTickCount() - pWorker->m_nLastActionTime > pWorker->m_nTimeout)
					break;

				continue;
			}

			for(;;)
			{
				CDataBuffer* pOversized = NULL;	// head buffer that did not fit
				int nBufferBytes = 0;			// bytes staged in pSendBuffer

				// Move as many queued buffers as fit into the staging buffer.
				EnterCriticalSection(&pWorker->m_csSend);
				while(!pWorker->m_listSend.empty())
				{
					CDataBuffer* pHead = pWorker->m_listSend.front();
					if(pHead->GetSize() > (unsigned int)(nSendBufferSize - nBufferBytes))
					{
						pOversized = pHead;
						break;
					}

					if(pHead->GetSize())
						memcpy(pSendBuffer + nBufferBytes, pHead->GetBuffer(), pHead->GetSize());
					nBufferBytes += (int)pHead->GetSize();

					pWorker->m_listSend.pop_front();
					delete pHead;
				}
				LeaveCriticalSection(&pWorker->m_csSend);

				if(nBufferBytes != 0)
				{
					// send() may accept fewer bytes than requested; keep
					// going until the staged data is out or an error occurs.
					int nDone = 0;
					while(nDone < nBufferBytes)
					{
						int nSent = send(pWorker->m_socket, pSendBuffer + nDone, nBufferBytes - nDone, 0);
						pWorker->m_nLastActionTime = GetTickCount();

						if(SOCKET_ERROR == nSent || 0 == nSent)
						{
							TRACE(TEXT("Send Exit WSAErr: %d\n"), WSAGetLastError());
							break;
						}

						nDone += nSent;
					}

					if(nDone < nBufferBytes)
					{
						// Put the unsent remainder back at the head of the
						// queue so that no data is lost or duplicated.
						CDataBuffer* pUnsent = new CDataBuffer(pSendBuffer + nDone, nBufferBytes - nDone);
						if(pUnsent)
						{
							EnterCriticalSection(&pWorker->m_csSend);
							pWorker->m_listSend.push_front(pUnsent);
							LeaveCriticalSection(&pWorker->m_csSend);
						}

						break;
					}

					continue;
				}

				if(NULL == pOversized)
					break;	// queue drained

				// The head buffer is larger than the staging buffer: send it
				// straight from its own storage. It stays at the head of the
				// list until it has been sent completely.
				const unsigned int nTotal = pOversized->GetSize();
				unsigned int nOffset = 0;
				while(nOffset < nTotal)
				{
					int nSent = send(pWorker->m_socket, pOversized->GetBuffer() + nOffset, (int)(nTotal - nOffset), 0);
					pWorker->m_nLastActionTime = GetTickCount();

					if(SOCKET_ERROR == nSent || 0 == nSent)
					{
						TRACE(TEXT("Send Exit WSAErr: %d\n"), WSAGetLastError());
						break;
					}

					nOffset += (unsigned int)nSent;
				}

				if(nOffset < nTotal)
				{
					// Drop the part that did go out so a later attempt does
					// not send it twice.
					if(nOffset > 0)
					{
						EnterCriticalSection(&pWorker->m_csSend);
						pOversized->ShrinkBuffer(nOffset);
						LeaveCriticalSection(&pWorker->m_csSend);
					}

					break;
				}

				EnterCriticalSection(&pWorker->m_csSend);
				pWorker->m_listSend.pop_front();
				LeaveCriticalSection(&pWorker->m_csSend);

				delete pOversized;
			}

			if(pWorker->m_bShutingdown)
				break;
		}

		if(INVALID_SOCKET != pWorker->m_socket)
		{
			shutdown(pWorker->m_socket, SD_SEND);
			closesocket(pWorker->m_socket);
			pWorker->m_socket = INVALID_SOCKET;
		}

		InterlockedDecrement(&pWorker->m_nThreadCount);

		delete [] pSendBuffer;

		return 0;
	}


	// Work thread entry point; pParam is the owning CSyncSocketWorker.
	//
	// Registers itself, signals m_hEventGoOn for Start(), then keeps the
	// event reset for the duration of TDerivedWorker::ProcessData() and
	// signals it again afterwards (Stop() waits on it when the close-wait
	// flag is set). Closes the socket once ProcessData() has returned.
	//
	// NOTE(review): the event is reset immediately after being set; whether
	// Start() always observes the signalled state looks timing dependent -
	// confirm.
	static unsigned __stdcall WorkThread(void* pParam)
	{
		CSyncSocketWorker* const pSelf = (CSyncSocketWorker*)pParam;
		TDerivedWorker* const pImpl = (TDerivedWorker*)pSelf;

		InterlockedIncrement(&pSelf->m_nThreadCount);

		// Start-up handshake.
		SetEvent(pSelf->m_hEventGoOn);

		// Event stays non-signalled while the derived class is working.
		ResetEvent(pSelf->m_hEventGoOn);
		pImpl->ProcessData();
		SetEvent(pSelf->m_hEventGoOn);

		TRACE(TEXT("Work Exit WSAErr: %d\n"), WSAGetLastError());

		if(INVALID_SOCKET != pSelf->m_socket)
		{
			shutdown(pSelf->m_socket, SD_SEND);
			closesocket(pSelf->m_socket);
			pSelf->m_socket = INVALID_SOCKET;
		}

		InterlockedDecrement(&pSelf->m_nThreadCount);

		return 0;
	}

public:	
	
	// Default work routine, run on the work thread.
	// This base version only closes the socket, so a derived class must
	// provide its own ProcessData().
	// NEVER call CSyncSocketWorker::Stop() from an override.
	virtual void ProcessData()  // NEVER call Stop() here
	{
		if(INVALID_SOCKET == m_socket)
			return;

		shutdown(m_socket, SD_SEND);
		closesocket(m_socket);
		m_socket = INVALID_SOCKET;

		/***********************************************
		Sample override (echo everything that is read):

		const int nBufferSize = 1024;

		char* pBuffer = new char[nBufferSize+2];
		if(!pBuffer)
			return;

		memset(pBuffer, 0, nBufferSize + 2);

		int nRead;
		while(nRead = Read(pBuffer, nBufferSize))
		{
			if(IsShutingdown())
				break;
			else
				Write(pBuffer, nRead);
		}

		delete [] pBuffer;
		*********************************************/
	}

	unsigned int Read(char * pBuffer, const unsigned int nBufferSize, BOOL bBlock = TRUE)
	{
		if(NULL == pBuffer || nBufferSize == 0)
			return 0;		

		unsigned int nFreeSize = nBufferSize;
		char* pFree = pBuffer;

		while(true)
		{
			CDataBuffer* p;

			EnterCriticalSection(&m_csRecv);
			BUFFER_LIST::iterator it = m_listRecv.begin();
			if(it != m_listRecv.end())
				p = *it;
			else
				p = NULL;
			LeaveCriticalSection(&m_csRecv);

			if(p)
			{
				if(p->GetSize() <= nFreeSize)
				{
					memcpy(pFree, p->GetBuffer(), p->GetSize());
					pFree += p->GetSize();
					nFreeSize -= p->GetSize();

					delete p;
					m_listRecv.pop_front();
					
					if(nFreeSize == 0)
						break;
				}
				else
				{ // the data size of the p object is larger than our free buffer
					memcpy(pFree, p->GetBuffer(), nFreeSize);

					p->ShrinkBuffer(nFreeSize);

					nFreeSize = 0;
					break;
				}
			}
			else
			{// p == null
				if(!bBlock)
					break;
				else if(IsConnected())
					WaitForSingleObject(m_hEventRecv, INFINITE);
				else
					break;
			}
		}


		return nBufferSize - nFreeSize;
	}

	// Queues a copy of the given bytes for the send thread.
	//
	//   pBuffer       - data to send; nothing is queued if NULL
	//   nBufferSize   - number of bytes; nothing is queued if 0
	//   bSendRightNow - TRUE wakes the send thread immediately, FALSE leaves
	//                   the data queued until the send event is next signalled
	//
	// Returns TRUE when there was nothing to queue or the data was queued,
	// FALSE when the buffer object could not be allocated.
	BOOL Write(const char * pBuffer, const unsigned int nBufferSize, BOOL bSendRightNow = TRUE)
	{
		const BOOL bNothingToSend = (NULL == pBuffer) || (0 == nBufferSize);
		if(bNothingToSend)
			return TRUE;

		EnterCriticalSection(&m_csSend);
		CDataBuffer* pQueued = new CDataBuffer(pBuffer, nBufferSize);
		const BOOL bQueued = (NULL != pQueued);
		if(bQueued)
			m_listSend.push_back(pQueued);
		LeaveCriticalSection(&m_csSend);

		// The send thread is woken even if the allocation failed.
		if(bSendRightNow)
			SetEvent(m_hEventSend);

		return bQueued;
	}



	void SendRightNow()
	{
		SetEvent(m_hEventSend);
	}

	// Sets the close-wait flag. When it is TRUE, Stop() waits for
	// m_hEventGoOn before it closes the socket.
	void SetCloseWaitFlag(BOOL bWait = TRUE)
	{
		this->m_bCloseWait = bWait;
	}

	// Returns the idle timeout in milliseconds; (unsigned long)-1 is the
	// value stored when no timeout is wanted.
	unsigned long GetTimeout()
	{
		const unsigned long nCurrent = m_nTimeout;
		return nCurrent;
	}

	// Sets the idle timeout in milliseconds and returns the previous value.
	// Passing 0 stores (unsigned long)-1, which the send thread's idle check
	// can never exceed.
	unsigned long SetTimeout(unsigned long nNewVal)
	{
		const unsigned long nPrevious = m_nTimeout;

		m_nTimeout = (0 == nNewVal) ? (unsigned long)-1 : nNewVal;

		return nPrevious;
	}

// Attributes (public; the worker's own code only ever assigns them in the constructors)
	// Textual address of the peer.
	string m_strPeerAddr;
	// Port number of the peer.
	unsigned short m_nPeerPort;
	
protected:		
	// Buffers queued by Write() and drained by the send thread; guarded by m_csSend.
	BUFFER_LIST m_listSend;
	// Buffers queued by the receive thread and drained by Read(); guarded by m_csRecv.
	BUFFER_LIST m_listRecv;


	// Protects m_listSend.
	CRITICAL_SECTION m_csSend;
	// Protects m_listRecv.
	CRITICAL_SECTION m_csRecv;
	
	// Auto-reset event that wakes the send thread (set by Write(), SendRightNow() and Stop()).
	HANDLE m_hEventSend;
	
private:	
	CSyncSocketWorker(const CSyncSocketWorker& rhs)
	{
	}

	// Assignment is not supported for the same reason as copying. Declared
	// private and intentionally left undefined, so an accidental assignment
	// from inside the class fails at link time instead of silently doing
	// nothing.
	CSyncSocketWorker& operator=(const CSyncSocketWorker& rhs);
	
	// The connected socket, or INVALID_SOCKET once it has been closed.
	SOCKET m_socket;

	// Handles of the three worker threads created by Start(); NULL when not created.
	HANDLE m_hThreadRecv;
	HANDLE m_hThreadSend;
	HANDLE m_hThreadWork;
	
	// Manual-reset event: signalled by each thread as its start-up handshake,
	// and kept reset by the work thread while ProcessData() is running.
	HANDLE m_hEventGoOn;
	// Auto-reset event signalled when received data has been queued, when the
	// receive thread exits, and by Stop(); Read() waits on it.
	HANDLE m_hEventRecv;
	
	// When TRUE, Stop() waits for m_hEventGoOn before closing the socket.
	BOOL m_bCloseWait;

	// Initialized and deleted with the object; not entered anywhere in this class.
	CRITICAL_SECTION m_cs;

	unsigned long m_nTimeout; // in ms
	// GetTickCount() value of the last successful receive or attempted send.
	unsigned long m_nLastActionTime;

	// TRUE while Start() is executing.
	BOOL m_bStarting;
	// TRUE while Stop() is tearing the worker down.
	BOOL m_bShutingdown;

	// Number of worker threads currently registered (changed with Interlocked* calls).
	long m_nThreadCount;

protected:
	// Owns a heap copy of a block of bytes. Used as the element type of the
	// send and receive queues.
	class CDataBuffer
	{
	public:
		// Discards the first nFrom bytes, moving the remainder to the front.
		// Does nothing when the buffer is empty or nFrom is not smaller than
		// the current size.
		void ShrinkBuffer(unsigned int nFrom)
		{
			if(!m_pData || m_nSize == 0 || nFrom >= m_nSize)
				return;

			memmove(m_pData, m_pData + nFrom, m_nSize - nFrom);
			m_nSize -= nFrom;
		}

		// Creates an empty buffer.
		CDataBuffer()
			: m_pData(NULL), m_nSize(0)
		{
		}

		// Creates a buffer holding a copy of nSize bytes from pData.
		// The buffer is empty when pData is NULL or nSize is 0.
		CDataBuffer(const char* pData, const unsigned int nSize)
			: m_pData(NULL), m_nSize(0)
		{
			Assign(pData, nSize);
		}

		// Deep copy. The members are initialized first; the previous version
		// called operator= on an uninitialized object, which deleted a
		// garbage pointer.
		CDataBuffer(const CDataBuffer& rhs)
			: m_pData(NULL), m_nSize(0)
		{
			Assign(rhs.m_pData, rhs.m_nSize);
		}

		// Deep-copy assignment. Self-assignment is a no-op; it previously
		// freed the data and then copied from the freed block.
		CDataBuffer& operator=(const CDataBuffer& rhs)
		{
			if(this != &rhs)
				Assign(rhs.m_pData, rhs.m_nSize);

			return *this;
		}

		virtual ~CDataBuffer()
		{
			delete [] m_pData;
			m_pData = NULL;
		}

		// Returns the stored bytes, or NULL when the buffer is empty.
		const char* GetBuffer() const
		{
			return (const char*)m_pData;
		}

		// Returns the number of stored bytes.
		unsigned int GetSize() const
		{
			return m_nSize;
		}

	private:
		// Replaces the contents with a copy of nSize bytes from pData.
		// The new block is allocated before the old one is released, so the
		// object stays valid if the allocation fails.
		void Assign(const char* pData, unsigned int nSize)
		{
			char* pCopy = NULL;
			if(NULL != pData && nSize > 0)
			{
				pCopy = new char[nSize];
				if(pCopy)
					memcpy(pCopy, pData, nSize);
			}

			delete [] m_pData;
			m_pData = pCopy;
			m_nSize = pCopy ? nSize : 0;
		}

		char*			m_pData;
		unsigned int	m_nSize;
	};	
};


#endif // !defined(AFX_SYNCSOCKETWORKER_H__7D3D2BB8_8007_42F0_A800_3B6CC1A9C0ED__INCLUDED_)

By viewing downloads associated with this article you agree to the Terms of Service and the article's licence.

If a file you wish to view isn't highlighted, and is a text file (not binary), please let us know and we'll add colourisation support for it.

License

This article has no explicit license attached to it but may contain usage terms in the article text or the download files themselves. If in doubt please contact the author via the discussion board below.

A list of licenses authors might use can be found here


Written By
Web Developer
China China
This member has not yet provided a Biography. Assume it's interesting and varied, and probably something to do with programming.

Comments and Discussions