Click here to Skip to main content
15,881,413 members
Articles / Programming Languages / C++

Single-threaded concurrency model design

Rate me:
Please Sign up or sign in to vote.
4.71/5 (7 votes)
12 Jul 2009CPOL20 min read 32K   359   25  
A lightweight library to support single-threaded concurrency with multiple components.
#include <windows.h>
#include "ThreadCtx.h"

//////////////////////////////////////
// ThreadCtx
//
// Per-thread pointer to the ThreadCtx registered on the current thread.
// The constructor stores `this` here when the slot is still NULL and the
// destructor resets it to NULL when it holds `this`.
__declspec(thread) ThreadCtx* ThreadCtx::s_pCtx = NULL;

// Window, in GetTickCount() ticks, used by ThreadCtx::Wait when comparing a
// timer key with the current tick count: a timer whose key lies fewer than this
// many ticks behind "now" is treated as already due (zero timeout).
const ULONG MAX_LATE_TIMER = 0x10000000;

void _com_raise_LastError()
{
	throw GetLastError();
}

// Creates a context with all counters zeroed and registers it in the
// thread-local s_pCtx slot. Registration happens only if the slot is empty;
// an occupied slot trips the ASSERT and leaves the existing registration alone.
ThreadCtx::ThreadCtx()
	: m_nSignaled(0), m_nApc(0), m_nMsg(0)
{
	const bool bSlotFree = !s_pCtx;
	ASSERT(bSlotFree);
	if (bSlotFree)
		s_pCtx = this;
}
// Unregisters this context from the thread-local s_pCtx slot.
// Asserts that no signal is still counted in m_nSignaled and that this object
// is the one registered; the slot is cleared only when it points at this object.
ThreadCtx::~ThreadCtx()
{
	ASSERT(!m_nSignaled);

	const bool bRegistered = (s_pCtx == this);
	ASSERT(bRegistered);
	if (bRegistered)
		s_pCtx = NULL;
}

// Performs a single wait on the given handles and normalizes the outcome.
//
//   pWait, dwCount - handles to wait on (dwCount may be 0)
//   dwTimeout      - timeout in milliseconds, handed to the Win32 wait call
//
// The wait is alertable when m_nApc is non-zero (never when UNDER_CE is
// defined) and additionally watches the message queue (QS_ALLEVENTS) when
// m_nMsg is non-zero.
//
// Returns:
//   [0, dwCount)        index of the handle reported as signaled or abandoned
//   WAIT_TIMEOUT        the wait timed out
//   WAIT_IO_COMPLETION  reported by an alertable wait (not under UNDER_CE)
//   WAIT_FAILED         convention of this function: messages are pending
// Every other raw result is reported through _com_raise_LastError().
DWORD ThreadCtx::PerfWait(HANDLE* pWait, DWORD dwCount, DWORD dwTimeout)
{
	// Raw wait result that stands for "the message queue has input".
	const DWORD dwMsgSlot = WAIT_OBJECT_0 + dwCount;
	DWORD dwRaw;

#ifndef UNDER_CE
	if (m_nApc)
	{
		// Alertable wait.
		if (m_nMsg)
		{
			// Input already sitting in the queue short-circuits the wait.
			if (GetQueueStatus(QS_ALLEVENTS))
				dwRaw = dwMsgSlot;
			else
				dwRaw = MsgWaitForMultipleObjectsEx(dwCount, pWait, dwTimeout, QS_ALLEVENTS, MWMO_ALERTABLE);
		}
		else if (dwCount)
			dwRaw = WaitForMultipleObjectsEx(dwCount, pWait, FALSE, dwTimeout, TRUE);
		else
			dwRaw = SleepEx(dwTimeout, TRUE) ? WAIT_IO_COMPLETION : WAIT_TIMEOUT;
	}
	else
#endif // UNDER_CE
	{
		// Non-alertable wait.
		if (m_nMsg)
		{
			// Input already sitting in the queue short-circuits the wait.
			if (GetQueueStatus(QS_ALLEVENTS))
				dwRaw = dwMsgSlot;
			else
				dwRaw = MsgWaitForMultipleObjects(dwCount, pWait, FALSE, dwTimeout, QS_ALLEVENTS);
		}
		else if (dwCount)
			dwRaw = WaitForMultipleObjects(dwCount, pWait, FALSE, dwTimeout);
		else
		{
			// Nothing to wait on: just sleep for the timeout.
			Sleep(dwTimeout);
			dwRaw = WAIT_TIMEOUT;
		}
	}

	// Results that are passed through unchanged.
	if (WAIT_TIMEOUT == dwRaw)
		return dwRaw;
#ifndef UNDER_CE
	if (WAIT_IO_COMPLETION == dwRaw)
		return dwRaw;
#endif // UNDER_CE

	// Message-queue slot: this is our agreement for wait_msg.
	if (dwMsgSlot == dwRaw)
		return WAIT_FAILED;

	// Signaled handle -> zero-based index.
	if ((dwRaw >= WAIT_OBJECT_0) && (dwRaw < dwMsgSlot))
		return dwRaw - WAIT_OBJECT_0;

	// Abandoned handle -> zero-based index as well.
	if ((dwRaw >= WAIT_ABANDONED_0) && (dwRaw < WAIT_ABANDONED_0 + dwCount))
		return dwRaw - WAIT_ABANDONED_0;

	_com_raise_LastError();
	return dwRaw;
}
// Waits for the next schedule item of this context to become ready.
//
// Candidates are the earliest pending timer in m_treeTimers and every handle
// linked into m_lstHandle.
//
// Returns:
//   - the ScheduleHandle whose handle was reported signaled or abandoned, or
//   - the ScheduleTimer that timed out (KillTimer() is called on it first), or
//   - NULL when the wait ended for another reason. In that case bApc is set if
//     the wait reported WAIT_IO_COMPLETION, and bMsg is set if PerfWait reported
//     its "messages pending" convention (WAIT_FAILED).
// Both out-parameters are reset to false on entry.
// Wait failures propagate as the exception thrown by _com_raise_LastError().
ThreadCtx::Schedule* ThreadCtx::Wait(bool& bApc, bool& bMsg)
{
	bApc = false;
	bMsg = false;

	ScheduleTimer* pFirstTimeout;
	DWORD dwTimeout;
	DWORD dwTicks = GetTickCount();

	if (m_treeTimers._Empty)
	{
		pFirstTimeout = NULL;
		dwTimeout = INFINITE;
	} else
	{
		// Search relative to (now - MAX_LATE_TIMER) so that timers that are
		// already late are still considered; if nothing is found there, fall
		// back to the minimum key in the tree.
		pFirstTimeout = m_treeTimers.FindBigger(dwTicks - MAX_LATE_TIMER);
		if (!pFirstTimeout)
			VERIFY(pFirstTimeout = m_treeTimers.FindMin());

		// Unsigned tick difference: a value below MAX_LATE_TIMER means the key
		// is in the past (timer is due now); otherwise negate it to obtain the
		// remaining time until the key.
		dwTimeout = dwTicks - (DWORD) pFirstTimeout->m_Key;
		dwTimeout = (dwTimeout < MAX_LATE_TIMER) ? 0 : 0 - dwTimeout;
	}

	// One slot less than MAXIMUM_WAIT_OBJECTS is used for handles.
	HANDLE pWait[MAXIMUM_WAIT_OBJECTS - 1];
	Schedule* pKeyWait[MAXIMUM_WAIT_OBJECTS - 1];

	DWORD dwWait = 0;

	// Collect as many handles as fit; pNode is left pointing at the first
	// handle that did NOT fit (or NULL when all of them fit).
	ScheduleHandle* pNode;
	for (pNode = m_lstHandle._Head; pNode; pNode = pNode->_Next)
		if (dwWait < _countof(pWait))
		{
			pKeyWait[dwWait] = pNode;
			pWait[dwWait++] = pNode->_Handle;
		}
		else
			break;

	DWORD dwResult = WAIT_TIMEOUT;
	if (pNode)
	{
		// there are more than MAXIMUM_WAIT_OBJECTS - 1 objects.
		// Poll: zero-timeout wait on the collected handles, then probe every
		// remaining handle one by one, sleeping 10 ms between rounds until the
		// timer deadline passes.
		for ( ; GetTickCount() - dwTicks <= dwTimeout; Sleep(10))
		{
			dwResult = PerfWait(pWait, _countof(pWait), 0);
			if (dwResult < _countof(pWait))
				return pKeyWait[dwResult];

			if (WAIT_TIMEOUT != dwResult)
				break;

			// Start at pNode itself: it is the first handle that was left out
			// of pWait[], so it must be probed too. (Previously the loop
			// advanced to _Next before the first probe and that handle was
			// never checked.)
			for (ScheduleHandle* pWaitNode = pNode; pWaitNode; pWaitNode = (ScheduleHandle*) pWaitNode->_Next)
			{
				switch (dwResult = WaitForSingleObject(pWaitNode->_Handle, 0))
				{
				case WAIT_OBJECT_0:
				case WAIT_ABANDONED:
					return pWaitNode;
					// no break;
				case WAIT_TIMEOUT:
					break;
				default:
					_com_raise_LastError();
				}
			}
		}

	} else
	{
		// Everything fits into a single wait call.
		dwResult = PerfWait(pWait, dwWait, dwTimeout);
		if (dwResult < _countof(pWait))
			return pKeyWait[dwResult];
	}

	switch (dwResult)
	{
	case WAIT_TIMEOUT:
		// The earliest timer is due: disarm it and hand it to the caller.
		ASSERT(pFirstTimeout && pFirstTimeout->m_Key);
		pFirstTimeout->KillTimer();
		return pFirstTimeout;

#ifndef UNDER_CE
	case WAIT_IO_COMPLETION:
		bApc = true;
		break;
#endif // UNDER_CE

	case WAIT_FAILED:
		// PerfWait's convention for "messages are pending".
		bMsg = true;
		break;
	}

	return NULL;
}
void ThreadCtx::WaitOnce()
{
	// run our multi-wait context.
	bool bApc, bMsg;

	Schedule* pSchedule = Wait(bApc, bMsg);
	if (pSchedule)
		pSchedule->OnSchedule();
	else
		if (bMsg)
			DispatchMessages();
}
// Drains the calling thread's message queue: every message retrieved with
// PeekMessage(PM_REMOVE) is handed to DispatchMessage until none is left.
void ThreadCtx::DispatchMessages()
{
	MSG stMsg;
	while (PeekMessage(&stMsg, NULL, 0, 0, PM_REMOVE))
		DispatchMessage(&stMsg);
}

//////////////////////////////////////
// Schedules
void ThreadCtx::Schedule::OnSchedule()
{
	_Signaled = true;
}
// Stores the signaled flag and, when the flag is being set, bumps the
// context-wide m_nSignaled counter.
// NOTE(review): nothing in this file decrements m_nSignaled when the flag is
// cleared - confirm where the counter is brought back down.
void ThreadCtx::Schedule::put_Signaled(bool bVal)
{
	m_Signal = bVal;

	if (!bVal)
		return;

	CTX().m_nSignaled++;
}
// Replaces the handle tracked by this schedule item.
// The item is first unlinked from the context's handle list (if it had a
// handle), then the new value is stored; a non-NULL handle puts the item back
// at the head of the list, a NULL handle leaves it unlinked.
void ThreadCtx::ScheduleHandle::put_Handle(HANDLE hVal)
{
	RemoveFromList();
	m_hHandle = hVal;

	if (!hVal)
		return;

	CTX().m_lstHandle.InsertHead(*this);
}
// Unlinks this item from the context's handle list. A NULL m_hHandle means the
// item is not linked, so there is nothing to do.
void ThreadCtx::ScheduleHandle::RemoveFromList()
{
	if (!m_hHandle)
		return;

	CTX().m_lstHandle.Remove(*this);
}
// (Re)arms or disarms this timer.
//
//   nTimeout - delay in GetTickCount() ticks from now, or INFINITE to disarm.
//
// m_Key doubles as the "armed" marker: zero means the timer is not in the
// context's timer tree, so a computed expiry of exactly zero is nudged to 1.
void ThreadCtx::ScheduleTimer::put_Timeout(ULONG nTimeout)
{
	// Drop any previous registration before changing the key.
	if (m_Key)
		CTX().m_treeTimers.Remove(*this);

	if (INFINITE == nTimeout)
	{
		m_Key = 0;
		return;
	}

	m_Key = GetTickCount() + nTimeout;
	if (!m_Key)
		m_Key = 1;

	CTX().m_treeTimers.Insert(*this);
}
// Pumps the context's wait loop until its m_nSignaled counter is non-zero, then
// reports whether THIS item is the signaled one.
// Returns true (and clears this item's signaled flag) when it is; false when
// the loop ended without this item being signaled.
bool ThreadCtx::Schedule::Wait()
{
	for (;;)
	{
		if (CTX().m_nSignaled)
			break;
		CTX().WaitOnce();
	}

	if (!_Signaled)
		return false;

	_Signaled = false;
	return true;
}

By viewing downloads associated with this article you agree to the Terms of Service and the article's licence.

If a file you wish to view isn't highlighted, and is a text file (not binary), please let us know and we'll add colourisation support for it.

License

This article, along with any associated source code and files, is licensed under The Code Project Open License (CPOL)


Written By
Software Developer (Senior)
Israel Israel
My name is Vladislav Gelfer, I was born in Kiev (former Soviet Union), since 1993 I live in Israel.
In programming I'm interested mostly in low-level, OOP design, DSP and multimedia.
Besides programming, I like physics, math, and digital photography.

Comments and Discussions