|
// High performance queue...
#ifndef CTHREADQUEUE_H
#define CTHREADQUEUE_H
#if _MSC_VER > 1000
#pragma warning (disable: 4786)
#pragma warning (disable: 4748)
#pragma warning (disable: 4103)
#endif /* _MSC_VER > 1000 */
#if ! defined( QUEUE_DEFAULT_NUMBER_OF_ITEMS )
#define QUEUE_DEFAULT_NUMBER_OF_ITEMS (2048)
#endif
// CThreadQueue
//
// A growable circular queue of pointer-sized values (void * or SIZE_T).
// Storage is an array of void * allocated from a private Win32 heap.
// Two critical sections are used: one is taken by callers that add to
// the queue, the other by callers that remove from it. Operations that
// must see a consistent picture of both ends (Empty(), GetLength())
// take both, always in the order "add" then "get".
class CThreadQueue
{
   private:

      // Copying a queue is not supported. These are private so that
      // outside code cannot copy or assign a CThreadQueue.
      inline CThreadQueue( const CThreadQueue& ) {}
      inline CThreadQueue& operator = ( const CThreadQueue& ) { return( *this ); }

   protected:

      // Locks
      CRITICAL_SECTION m_AddCriticalSection; // taken while adding
      CRITICAL_SECTION m_GetCriticalSection; // taken while getting

      // Circular buffer state
      void ** m_Items;     // the slots themselves
      SIZE_T  m_AddIndex;  // slot the next Add() will write to
      SIZE_T  m_GetIndex;  // slot the next Get() will read from
      SIZE_T  m_Size;      // number of slots in m_Items
      HANDLE  m_Heap;      // private heap that m_Items is allocated from

      // Enlarges m_Items by number_of_new_items slots.
      inline void m_GrowBy( SIZE_T number_of_new_items );

   public:

      // Creates a queue with room for initial_size slots.
      inline CThreadQueue( SIZE_T initial_size = QUEUE_DEFAULT_NUMBER_OF_ITEMS );
      inline ~CThreadQueue();

      // Queues an integer value by storing it as if it were a pointer.
      inline BOOL Add( SIZE_T new_item )
      {
         return( Add( reinterpret_cast< void * >( new_item ) ) );
      }

      // Queues a pointer.
      inline BOOL Add( void * new_item );

      // Discards everything in the queue by resetting both indexes.
      // The buffer itself is neither cleared nor resized.
      inline void Empty( void )
      {
         ::EnterCriticalSection( &m_AddCriticalSection );
         ::EnterCriticalSection( &m_GetCriticalSection );

         m_AddIndex = 0;
         m_GetIndex = 0;

         ::LeaveCriticalSection( &m_GetCriticalSection );
         ::LeaveCriticalSection( &m_AddCriticalSection );
      }

      // Integer flavour of Get(). The value is passed through the
      // pointer overload and converted back, so item ends up holding
      // whatever the pointer overload left in it.
      inline BOOL Get( SIZE_T& item )
      {
         void * slot_value = reinterpret_cast< void * >( item );
         const BOOL got_one = Get( slot_value );
         item = reinterpret_cast< SIZE_T >( slot_value );
         return( got_one );
      }

      // Removes the oldest entry; returns FALSE when the queue is empty.
      inline BOOL Get( void * & item );

      // Number of entries currently queued (takes both locks).
      inline SIZE_T GetLength( void ) const;

      // Current number of slots in the buffer. Read without locking.
      inline SIZE_T GetMaximumLength( void ) const { return( m_Size ); }
};
// Constructs an empty queue.
//
// initial_size - number of slots to allocate up front. Zero is treated
//                as one.
//
// A private, non-serialized heap is created for the slot array. Its
// initial size is twice the bytes needed for the slots, but never less
// than 64 KB, and it has no maximum size. If the heap cannot be created,
// the slot array cannot be allocated, or the requested size is so large
// that the byte counts would overflow, the object is still fully
// constructed but has no storage: m_Items is NULL and m_Size is 0.
inline CThreadQueue::CThreadQueue( SIZE_T initial_size )
{
   // Put every member into a known state before anything can fail.
   m_AddIndex = 0;
   m_GetIndex = 0;
   m_Items    = NULL;
   m_Size     = 0;
   m_Heap     = NULL;

   // The locks are needed regardless of whether the allocations succeed.
   ::InitializeCriticalSection( &m_AddCriticalSection );
   ::InitializeCriticalSection( &m_GetCriticalSection );

   if ( initial_size == 0 )
   {
      initial_size = 1;
   }

   // 2 * initial_size * sizeof( void * ) must not wrap around.
   const SIZE_T largest_item_count = ( (SIZE_T) -1 ) / ( 2 * sizeof( void * ) );

   if ( initial_size > largest_item_count )
   {
      return;
   }

   SIZE_T initial_heap_bytes = 2 * initial_size * sizeof( void * );

   if ( initial_heap_bytes < 65536 )
   {
      initial_heap_bytes = 65536;
   }

   m_Heap = ::HeapCreate( HEAP_NO_SERIALIZE, initial_heap_bytes, 0 );

   if ( m_Heap == NULL )
   {
      // No heap, so there is nothing to allocate the slots from.
      return;
   }

   m_Items = (void **) ::HeapAlloc( m_Heap, HEAP_NO_SERIALIZE, initial_size * sizeof( void * ) );

   if ( m_Items != NULL )
   {
      m_Size = initial_size;
   }
}
// Releases the slot array, the private heap and both critical sections.
// The heap calls are skipped when m_Heap is NULL (heap creation failed),
// so that no invalid handle is ever passed to HeapDestroy().
inline CThreadQueue::~CThreadQueue()
{
   m_AddIndex = 0;
   m_GetIndex = 0;
   m_Size     = 0;

   if ( m_Heap != NULL )
   {
      if ( m_Items != NULL )
      {
         ::HeapFree( m_Heap, HEAP_NO_SERIALIZE, m_Items );
      }

      ::HeapDestroy( m_Heap );
      m_Heap = NULL;
   }

   m_Items = NULL;

   ::DeleteCriticalSection( &m_AddCriticalSection );
   ::DeleteCriticalSection( &m_GetCriticalSection );
}
// Appends item to the queue.
//
// Returns TRUE when the item was queued. Returns FALSE when the queue
// has no storage (m_Items is NULL or m_Size is 0) or when the buffer
// became full and m_GrowBy() returned without enlarging it; in both
// cases the item is not in the queue.
//
// Locking: the add lock is held for the whole call. The get lock is
// taken only while m_AddIndex is being published and, if needed, while
// the buffer is being grown.
inline BOOL CThreadQueue::Add( void * item )
{
   // Block other threads from entering Add()
   ::EnterCriticalSection( &m_AddCriticalSection );

   // Without a buffer there is nowhere to put the item.
   if ( m_Items == NULL || m_Size == 0 )
   {
      ::LeaveCriticalSection( &m_AddCriticalSection );
      return( FALSE );
   }

   // The slot at m_AddIndex is always free: the buffer is grown as soon
   // as the add index catches up with the get index.
   const SIZE_T previous_add_index = m_AddIndex;

   m_Items[ previous_add_index ] = item;

   // Advance with wrap-around so the index is never out of range
   const SIZE_T new_add_index = ( ( previous_add_index + 1 ) >= m_Size ) ? 0 : previous_add_index + 1;

   BOOL item_was_added = TRUE;

   // Stop anyone from getting while the index moves and the buffer may be replaced
   ::EnterCriticalSection( &m_GetCriticalSection );

   m_AddIndex = new_add_index;

   if ( m_AddIndex == m_GetIndex )
   {
      // The add index has caught up with the get index: every slot is
      // in use and the buffer has to grow.
      const SIZE_T size_before_growth = m_Size;

      m_GrowBy( m_Size );

      if ( m_Size == size_before_growth )
      {
         // The buffer was not enlarged. Leaving the indexes equal would
         // make the full queue look empty, so take the item back out.
         m_AddIndex     = previous_add_index;
         item_was_added = FALSE;
      }
   }

   ::LeaveCriticalSection( &m_GetCriticalSection );

   // Let other threads call Add() now.
   ::LeaveCriticalSection( &m_AddCriticalSection );

   return( item_was_added );
}
// Removes the oldest entry from the queue.
//
// item - receives the entry. Left untouched when the queue is empty.
//
// Returns TRUE when an entry was removed, FALSE when the queue is empty.
//
// When the queue is found empty and its buffer is larger than
// QUEUE_DEFAULT_NUMBER_OF_ITEMS slots, an attempt is made to shrink the
// buffer back to that size. The attempt is skipped if the add lock
// cannot be taken immediately. If the reallocation fails the existing
// buffer is kept as it is; HeapReAlloc() leaves the original block valid
// on failure, so the queue never loses its storage here.
inline BOOL CThreadQueue::Get( void * & item )
{
   // Prevent other threads from entering Get()
   ::EnterCriticalSection( &m_GetCriticalSection );

   if ( m_GetIndex != m_AddIndex )
   {
      item = m_Items[ m_GetIndex ];

      // Advance with wrap-around so the index is never out of range
      m_GetIndex = ( ( m_GetIndex + 1 ) >= m_Size ) ? 0 : m_GetIndex + 1;

      ::LeaveCriticalSection( &m_GetCriticalSection );
      return( TRUE );
   }

   // The queue is empty. Use the opportunity to give memory back if the
   // buffer has grown beyond the default size. The add lock is normally
   // taken before the get lock, so only *try* to take it here; waiting
   // for it while holding the get lock could deadlock with Add().
   if ( m_Size > QUEUE_DEFAULT_NUMBER_OF_ITEMS &&
        ::TryEnterCriticalSection( &m_AddCriticalSection ) != 0 )
   {
      // Now no one can add to the queue. Make sure it is still empty.
      if ( m_GetIndex == m_AddIndex )
      {
         // Nothing in the buffer needs to survive, so it can simply be
         // resized in place (or moved) by the heap.
         void * smaller_block = ::HeapReAlloc( m_Heap,
                                               HEAP_NO_SERIALIZE,
                                               m_Items,
                                               QUEUE_DEFAULT_NUMBER_OF_ITEMS * sizeof( void * ) );

         if ( smaller_block != NULL )
         {
            m_Items    = (void **) smaller_block;
            m_Size     = QUEUE_DEFAULT_NUMBER_OF_ITEMS;
            m_AddIndex = 0;
            m_GetIndex = 0;
         }

         // On failure m_Items, m_Size and the (equal) indexes are left
         // alone. The larger buffer stays in use and the shrink will be
         // tried again the next time the queue is found empty.
      }

      // Let people add to the queue once again
      ::LeaveCriticalSection( &m_AddCriticalSection );
   }

   // Let other threads call Get() now, we are empty
   ::LeaveCriticalSection( &m_GetCriticalSection );
   return( FALSE );
}
// Returns the number of entries currently in the queue.
//
// Both critical sections are held while the two indexes are compared,
// so neither index can move during the calculation. That makes this
// call comparatively expensive.
inline SIZE_T CThreadQueue::GetLength( void ) const
{
   // The Win32 calls need non-const pointers even though this method is const.
   CRITICAL_SECTION * const add_lock = const_cast< CRITICAL_SECTION * >( &m_AddCriticalSection );
   CRITICAL_SECTION * const get_lock = const_cast< CRITICAL_SECTION * >( &m_GetCriticalSection );

   ::EnterCriticalSection( add_lock );
   ::EnterCriticalSection( get_lock );

   SIZE_T length = 0;

   if ( m_AddIndex >= m_GetIndex )
   {
      // The used region is contiguous
      length = m_AddIndex - m_GetIndex;
   }
   else
   {
      // The used region wraps around the end of the buffer
      length = ( m_AddIndex + m_Size ) - m_GetIndex;
   }

   ::LeaveCriticalSection( get_lock );
   ::LeaveCriticalSection( add_lock );

   return( length );
}
// Enlarges the buffer by number_of_new_items slots.
//
// Precondition: the buffer is completely full, i.e. the caller has just
// advanced m_AddIndex onto m_GetIndex. The copy below relies on this:
// it moves exactly m_Size entries. The add critical section is not
// entered here because the caller is expected to hold it already.
//
// On success the entries are laid out from slot 0 in queue order,
// m_GetIndex is 0, m_AddIndex is the old m_Size and m_Size has grown.
//
// On failure (allocation failed, or the new size would overflow) the
// buffer and m_Size are left unchanged and m_AddIndex is moved back by
// one slot, which drops the entry that filled the buffer but keeps all
// older entries retrievable. Callers can detect failure by comparing
// m_Size before and after the call.
inline void CThreadQueue::m_GrowBy( SIZE_T number_of_new_items )
{
   // Prevent other threads from getting while the buffer is replaced
   ::EnterCriticalSection( &m_GetCriticalSection );

   const SIZE_T new_size           = m_Size + number_of_new_items;
   const SIZE_T largest_item_count = ( (SIZE_T) -1 ) / sizeof( void * );

   void * * new_array = NULL;

   // Only allocate when neither the slot count nor the byte count wrapped.
   if ( new_size >= m_Size && new_size <= largest_item_count )
   {
      new_array = (void **) ::HeapAlloc( m_Heap, HEAP_NO_SERIALIZE, new_size * sizeof( void * ) );
   }

   if ( new_array == NULL )
   {
      // Equal indexes would be read as "empty", so step the add index
      // back to the slot it occupied before the buffer filled up.
      if ( m_Size != 0 )
      {
         m_AddIndex = ( m_AddIndex == 0 ) ? m_Size - 1 : m_AddIndex - 1;
      }

      ::LeaveCriticalSection( &m_GetCriticalSection );
      return;
   }

   // Copy the old entries into the new buffer in queue order.
   // First the entries from the get-index to the end of the old array...
   const SIZE_T entries_up_to_end = m_Size - m_GetIndex;

   ::CopyMemory( new_array, &m_Items[ m_GetIndex ], entries_up_to_end * sizeof( void * ) );

   // ...then the entries from the beginning of the old array to the add-index
   ::CopyMemory( &new_array[ entries_up_to_end ], m_Items, m_AddIndex * sizeof( void * ) );

   void * * pointer_to_free = m_Items;

   m_AddIndex = m_Size;
   m_GetIndex = 0;
   m_Size     = new_size;
   m_Items    = new_array;

   ::LeaveCriticalSection( &m_GetCriticalSection );

   // Nothing refers to the old buffer any more.
   ::HeapFree( m_Heap, HEAP_NO_SERIALIZE, pointer_to_free );
}
#endif /* ! defined( CTHREADQUEUE_H ) */
|
By viewing downloads associated with this article you agree to the Terms of Service and the article's licence.
If a file you wish to view isn't highlighted, and is a text file (not binary), please
let us know and we'll add colourisation support for it.
Senior Software Developer in C/C++ and Oracle.
Ex-physicist holding a Ph.D. in x-ray lasers.