Click here to Skip to main content
Click here to Skip to main content
Add your own
alternative version

Single-threaded concurrency model design

valdok, 12 Jul 2009 CPOL
A lightweight library to support single-threaded concurrency with multiple components.
#include <windows.h>
#include "ThreadCtx.h"

//////////////////////////////////////
// ThreadCtx
// Thread-local pointer to the ThreadCtx registered on the current thread.
// The ThreadCtx constructor stores `this` here (if the slot is empty) and the
// destructor resets it to NULL.
__declspec(thread) ThreadCtx* ThreadCtx::s_pCtx = NULL;

// Lateness window, in GetTickCount() ticks. ThreadCtx::Wait treats a timer
// whose due tick lies less than this far behind the current tick as already
// expired; a larger difference is interpreted as a due time that is still in
// the future (tick arithmetic is done modulo 2^32).
const ULONG MAX_LATE_TIMER = 0x10000000;

void _com_raise_LastError()
{
	throw GetLastError();
}

// Constructs a context with all counters zeroed and registers it in the
// thread-local slot s_pCtx. A second context on the same thread trips the
// assertion and is not registered (the existing registration is kept).
ThreadCtx::ThreadCtx()
	: m_nSignaled(0)
	, m_nApc(0)
	, m_nMsg(0)
{
	ASSERT(!s_pCtx);
	if (NULL == s_pCtx)
	{
		s_pCtx = this;
	}
}
// Unregisters this context from the thread-local slot s_pCtx.
// Asserts that the signaled counter is back to zero and that this object is
// the one currently registered; the slot is cleared only in the latter case.
ThreadCtx::~ThreadCtx()
{
	ASSERT(!m_nSignaled);
	ASSERT(this == s_pCtx);
	if (s_pCtx == this)
	{
		s_pCtx = NULL;
	}
}

// Performs one wait on up to dwCount handles and normalizes the result.
//
//   pWait     - array of handles to wait on (may be unused when dwCount is 0)
//   dwCount   - number of handles in pWait
//   dwTimeout - timeout in milliseconds, passed straight to the Win32 call
//
// The wait is alertable when m_nApc is non-zero (never under UNDER_CE) and
// also watches the message queue when m_nMsg is non-zero.
//
// Returns:
//   0 .. dwCount-1       index of the signaled (or abandoned) handle
//   WAIT_TIMEOUT         the timeout elapsed
//   WAIT_IO_COMPLETION   an alertable wait was interrupted (not under UNDER_CE)
//   WAIT_FAILED          by convention here: window messages are pending
// Any other outcome is reported by throwing via _com_raise_LastError().
DWORD ThreadCtx::PerfWait(HANDLE* pWait, DWORD dwCount, DWORD dwTimeout)
{
	DWORD dwWaitResult;

	if (m_nMsg)
	{
		// Message-aware wait. If the queue already holds something, report it
		// immediately without touching the handles.
		if (GetQueueStatus(QS_ALLEVENTS))
			dwWaitResult = WAIT_OBJECT_0 + dwCount;
#ifndef UNDER_CE
		else if (m_nApc)
			dwWaitResult = MsgWaitForMultipleObjectsEx(dwCount, pWait, dwTimeout, QS_ALLEVENTS, MWMO_ALERTABLE);
#endif // UNDER_CE
		else
			dwWaitResult = MsgWaitForMultipleObjects(dwCount, pWait, FALSE, dwTimeout, QS_ALLEVENTS);
	}
	else if (dwCount)
	{
		// Plain handle wait.
#ifndef UNDER_CE
		if (m_nApc)
			dwWaitResult = WaitForMultipleObjectsEx(dwCount, pWait, FALSE, dwTimeout, TRUE);
		else
#endif // UNDER_CE
			dwWaitResult = WaitForMultipleObjects(dwCount, pWait, FALSE, dwTimeout);
	}
	else
	{
		// Nothing to wait on: just sleep for the timeout.
#ifndef UNDER_CE
		if (m_nApc)
			dwWaitResult = SleepEx(dwTimeout, TRUE) ? WAIT_IO_COMPLETION : WAIT_TIMEOUT;
		else
#endif // UNDER_CE
		{
			Sleep(dwTimeout);
			dwWaitResult = WAIT_TIMEOUT;
		}
	}

	// Timeout and APC interruption are passed through unchanged.
	if (WAIT_TIMEOUT == dwWaitResult)
		return dwWaitResult;
#ifndef UNDER_CE
	if (WAIT_IO_COMPLETION == dwWaitResult)
		return dwWaitResult;
#endif // UNDER_CE

	// "One past the last handle" means the message queue woke us up; callers
	// of this function recognize that case by the WAIT_FAILED value.
	if (WAIT_OBJECT_0 + dwCount == dwWaitResult)
		return WAIT_FAILED;

	// Signaled handle -> zero-based index.
	if ((dwWaitResult >= WAIT_OBJECT_0) && (dwWaitResult < WAIT_OBJECT_0 + dwCount))
		return dwWaitResult - WAIT_OBJECT_0;

	// Abandoned mutex -> zero-based index as well.
	if ((dwWaitResult >= WAIT_ABANDONED_0) && (dwWaitResult < WAIT_ABANDONED_0 + dwCount))
		return dwWaitResult - WAIT_ABANDONED_0;

	// Anything else is a genuine failure.
	_com_raise_LastError();
	return dwWaitResult;
}
// Blocks until one registered schedule becomes ready, or until the wait is
// interrupted by an APC or by pending window messages.
//
//   bApc - [out] set to true when the wait ended with WAIT_IO_COMPLETION
//   bMsg - [out] set to true when PerfWait reported pending messages
//
// Returns the schedule that became ready: the ScheduleHandle whose handle was
// signaled/abandoned, or the earliest ScheduleTimer once its due tick has
// been reached (the timer is killed before being returned). Returns NULL when
// the wait ended for an APC or for messages instead.
// Throws (via _com_raise_LastError) when a Win32 wait call fails.
ThreadCtx::Schedule* ThreadCtx::Wait(bool& bApc, bool& bMsg)
{
	bApc = false;
	bMsg = false;

	ScheduleTimer* pFirstTimeout;
	DWORD dwTimeout;
	DWORD dwTicks = GetTickCount();

	if (m_treeTimers._Empty)
	{
		pFirstTimeout = NULL;
		dwTimeout = INFINITE;
	} else
	{
		// Pick the timer to wait for, starting the search MAX_LATE_TIMER
		// ticks behind "now" so that timers that are already overdue are
		// considered first; if the search finds nothing, fall back to the
		// minimum key in the tree.
		pFirstTimeout = m_treeTimers.FindBigger(dwTicks - MAX_LATE_TIMER);
		if (!pFirstTimeout)
			VERIFY(pFirstTimeout = m_treeTimers.FindMin());

		// Unsigned (modulo 2^32) distance from the due tick to now.
		// Less than MAX_LATE_TIMER => the timer is due or overdue: don't wait.
		// Otherwise negate to get the ticks that remain until it is due.
		dwTimeout = dwTicks - (DWORD) pFirstTimeout->m_Key;
		dwTimeout = (dwTimeout < MAX_LATE_TIMER) ? 0 : 0 - dwTimeout;
	}

	// One slot fewer than MAXIMUM_WAIT_OBJECTS: the message-aware wait calls
	// used by PerfWait need room for the message queue.
	HANDLE pWait[MAXIMUM_WAIT_OBJECTS - 1];
	Schedule* pKeyWait[MAXIMUM_WAIT_OBJECTS - 1];

	DWORD dwWait = 0;

	// Collect as many handles as fit. If the list is longer, pNode is left
	// pointing at the first node that did NOT fit.
	ScheduleHandle* pNode;
	for (pNode = m_lstHandle._Head; pNode; pNode = pNode->_Next)
		if (dwWait < _countof(pWait))
		{
			pKeyWait[dwWait] = pNode;
			pWait[dwWait++] = pNode->_Handle;
		}
		else
			break;

	DWORD dwResult = WAIT_TIMEOUT;
	if (pNode)
	{
		// there are more than MAXIMUM_WAIT_OBJECTS - 1 objects.
		// Fall back to polling: test the first batch with a zero timeout,
		// then probe every remaining handle individually, sleeping 10 ms
		// between rounds until the timer deadline passes.
		for ( ; GetTickCount() - dwTicks <= dwTimeout; Sleep(10))
		{
			dwResult = PerfWait(pWait, _countof(pWait), 0);
			if (dwResult < _countof(pWait))
				return pKeyWait[dwResult];

			if (WAIT_TIMEOUT != dwResult)
				break; // APC or messages: handled by the switch below

			// Start at pNode itself: it is the first handle that was left out
			// of pWait[], so it must be polled too. (Advancing in the loop
			// condition would skip it.)
			for (ScheduleHandle* pWaitNode = pNode; pWaitNode; pWaitNode = (ScheduleHandle*) pWaitNode->_Next)
			{
				switch (dwResult = WaitForSingleObject(pWaitNode->_Handle, 0))
				{
				case WAIT_OBJECT_0:
				case WAIT_ABANDONED:
					return pWaitNode;
					// no break;
				case WAIT_TIMEOUT:
					break;
				default:
					_com_raise_LastError();
				}
			}
		}

	} else
	{
		dwResult = PerfWait(pWait, dwWait, dwTimeout);
		if (dwResult < _countof(pWait))
			return pKeyWait[dwResult];
	}

	switch (dwResult)
	{
	case WAIT_TIMEOUT:
		// The deadline of the selected timer has been reached.
		ASSERT(pFirstTimeout && pFirstTimeout->m_Key);
		pFirstTimeout->KillTimer();
		return pFirstTimeout;

#ifndef UNDER_CE
	case WAIT_IO_COMPLETION:
		bApc = true;
		break;
#endif // UNDER_CE

	case WAIT_FAILED:
		// PerfWait uses WAIT_FAILED to report pending window messages.
		bMsg = true;
		break;
	}

	return NULL;
}
void ThreadCtx::WaitOnce()
{
	// run our multi-wait context.
	bool bApc, bMsg;

	Schedule* pSchedule = Wait(bApc, bMsg);
	if (pSchedule)
		pSchedule->OnSchedule();
	else
		if (bMsg)
			DispatchMessages();
}
void ThreadCtx::DispatchMessages()
{
	for (MSG stMsg; PeekMessage(&stMsg, NULL, 0, 0, PM_REMOVE); )
		DispatchMessage(&stMsg);
}

//////////////////////////////////////
// Schedules
void ThreadCtx::Schedule::OnSchedule()
{
	_Signaled = true;
}
// Sets or clears the signaled flag of this schedule and keeps the context's
// m_nSignaled counter equal to the number of currently signaled schedules.
//
// The counter is adjusted only on an actual state change: it goes up on a
// false -> true transition and down on a true -> false transition. Without
// the decrement the counter could never return to zero, so Schedule::Wait's
// `while (!m_nSignaled)` loop would stop blocking after the first signal and
// the ASSERT(!m_nSignaled) in ~ThreadCtx could not hold.
// NOTE(review): assumes no other code adjusts m_nSignaled for this schedule
// (e.g. in ThreadCtx.h) - confirm against the header.
void ThreadCtx::Schedule::put_Signaled(bool bVal)
{
	if (m_Signal == bVal)
		return; // no transition, counter stays as is

	m_Signal = bVal;
	if (bVal)
		CTX().m_nSignaled++;
	else
		CTX().m_nSignaled--;
}
// Replaces the handle this schedule waits on.
// The schedule is first taken out of the context's handle list (if it was in
// it), then the new handle is stored; a non-NULL handle puts the schedule
// back at the head of the list, a NULL handle leaves it unlisted.
void ThreadCtx::ScheduleHandle::put_Handle(HANDLE hVal)
{
	RemoveFromList();
	m_hHandle = hVal;

	if (!hVal)
		return;

	CTX().m_lstHandle.InsertHead(*this);
}
// Takes this schedule out of the context's handle list. A schedule with a
// NULL handle is not in the list, so nothing is done for it. The stored
// handle value itself is left untouched.
void ThreadCtx::ScheduleHandle::RemoveFromList()
{
	if (!m_hHandle)
		return;

	CTX().m_lstHandle.Remove(*this);
}
// (Re)arms or disarms this timer.
//
//   nTimeout - delay in milliseconds from now, or INFINITE to disarm.
//
// m_Key holds the absolute due tick (GetTickCount() + nTimeout); the value 0
// is reserved to mean "not armed / not in the timer tree".
void ThreadCtx::ScheduleTimer::put_Timeout(ULONG nTimeout)
{
	// An armed timer is always re-inserted with its new key.
	if (m_Key)
		CTX().m_treeTimers.Remove(*this);

	if (INFINITE == nTimeout)
	{
		m_Key = 0;
		return;
	}

	m_Key = GetTickCount() + nTimeout;
	if (!m_Key)
	{
		// The sum wrapped to exactly 0, which would read as "disarmed";
		// nudge it by one tick.
		m_Key = 1;
	}

	CTX().m_treeTimers.Insert(*this);
}
// Pumps the thread context until at least one schedule is signaled, then
// reports whether it was this one.
//
// Returns true (and clears this schedule's signaled flag) when this schedule
// is signaled; returns false when the loop ended because some other schedule
// was signaled.
bool ThreadCtx::Schedule::Wait()
{
	for (;;)
	{
		if (CTX().m_nSignaled)
			break;
		CTX().WaitOnce();
	}

	if (!_Signaled)
		return false;

	_Signaled = false;
	return true;
}

By viewing downloads associated with this article you agree to the Terms of Service and the article's licence.

If a file you wish to view isn't highlighted, and is a text file (not binary), please let us know and we'll add colourisation support for it.

License

This article, along with any associated source code and files, is licensed under The Code Project Open License (CPOL)

Share

About the Author

valdok
Software Developer (Senior)
Israel Israel
My name is Vladislav Gelfer. I was born in Kiev (former Soviet Union), and I have lived in Israel since 1993.
In programming, I'm interested mostly in low-level work, OOP design, DSP, and multimedia.
Besides programming, I like physics, math, and digital photography.

| Advertise | Privacy | Terms of Use | Mobile
Web04 | 2.8.150302.1 | Last Updated 12 Jul 2009
Article Copyright 2009 by valdok
Everything else Copyright © CodeProject, 1999-2015
Layout: fixed | fluid