Click here to Skip to main content
15,881,852 members
Articles / General Programming / Threads

Declarative multithreading

Rate me:
Please Sign up or sign in to vote.
4.94/5 (39 votes)
13 Mar 2012CDDL19 min read 57.5K   862   139  
An introduction and proof of concept code for the idea of declarative multi threading in C#.
using System;
using System.Collections.Generic;
using System.Threading;
using System.ComponentModel;

namespace ThreadBound
{
    /// <summary>
    /// Worker-thread implementation that is compatible with the WorkerSynchronizationContext.
    /// </summary>
    internal class Worker
    {
        /// <summary>
        /// Reference to the thread represented by this Worker instance.
        /// </summary>
        private Thread WorkerThread;

        /// <summary>
        /// This event is signaled as soon as the new worker thread is ready to process work items.
        /// </summary>
        private ManualResetEvent InitReady=new ManualResetEvent(false);

        /// <summary>
        /// This event will be signaled if the worker thread should be shut down.
        /// </summary>
        private ManualResetEvent StopNow = new ManualResetEvent(false);

        /// <summary>
        /// A sempahore to synchronize the processing of the queue items. A resource is added to
        /// the semaphore if a new item is enqueued. As soon as the queue is empty the semaphore
        /// is depleted.
        /// </summary>
        private Semaphore QueueSemaphore = new Semaphore(0, int.MaxValue);

        /// <summary>
        /// Reference to the WorkerThreadSyncContext instance that is the context for this
        /// worker thread.
        /// </summary>
        private SynchronizationContext WorkerThreadSyncContext;

        /// <summary>
        /// Reference to a Queue-Instance that handles the work items.
        /// </summary>
        private Queue<WorkQueueItem> WorkQueue = new Queue<WorkQueueItem>();

        /// <summary>
        /// Getter for the synchrnization context that can be used to enqueue work items
        /// for this thread.
        /// </summary>
        public SynchronizationContext SyncContext
        {
            get
            {
                return WorkerThreadSyncContext;
            }
        }

        /// <summary>
        /// C'tor: Create a new worker thread. Returns as soon as the new worker thread is ready
        /// to process requests. This may take a little while.
        /// </summary>
        public Worker()
        {
            //-- Start the worker thread
            WorkerThread = new Thread(WorkerMain);
            WorkerThread.IsBackground = true;
            WorkerThread.Start();

            //-- Wait for the worker thread's initialization to finish
            InitReady.WaitOne();
        }

        /// <summary>
        /// C'tor
        /// </summary>
        /// <param name="name">Name for this worker thread. Usefull for debuging purposes.</param>
        public Worker(String name) :
            this()
        {
            WorkerThread.Name = name;
        }
        
        /// <summary>
        /// The worker threads main method.
        /// </summary>
        private void WorkerMain()
        {
            WorkQueueItem currentItem = null;
            WaitHandle[] waitHandles={StopNow, QueueSemaphore};

            //-- Create the synchronization context
            AsyncOperationManager.SynchronizationContext = new WorkerSynchronizationContext(this);
            WorkerThreadSyncContext = AsyncOperationManager.SynchronizationContext;

            //-- Tell the waiting c'tor that the initialization is finished
            InitReady.Set();

            while (true)
            {
                //-- Wait for the cancel event and the queue semaphore
                if (WaitHandle.WaitAny(waitHandles) == 0)
                    break;

                lock (WorkQueue)
                {
                    currentItem = WorkQueue.Dequeue();
                }

                //-- The queue item knows how to execute its content. Just call
                //   invoke. It doesn't matter if the item is synchrnous or asynchronous.
                //   If an exception occures within an asynchronous item, the worker
                //   is terminated. This is consistent with the other worker implementations.
                currentItem.Invoke();
            }
        }

        /// <summary>
        /// Adds an asynchronous item to the queue.
        /// The item will be wraped into an AsyncWorkQueueItem.
        /// </summary>
        /// <param name="newWorkItem">Callback to execute.</param>
        /// <param name="state">Parameter for the callback.</param>
        internal void EnqueAsync(SendOrPostCallback newWorkItem, Object state)
        {
            lock (WorkQueue)
            {
                WorkQueue.Enqueue(new AsyncWorkQueueItem(newWorkItem, state));
                QueueSemaphore.Release();   //Eine neue Ressource zur Semaphore hinzufügen
            }
        }

        /// <summary>
        /// Adds a synchronous item to the queue.
        /// This method will wait for the new item to execute.
        /// </summary>
        /// <param name="newWorkItem">Callback to execute.</param>
        /// <param name="state">Parameter for the callback.</param>
        internal void EnqueSync(SendOrPostCallback newWorkItem, Object state)
        {
            //-- Create a new SyncWorkQueueItem and insert it into the queue.
            //   This time we will wait for the item to finish.
            SyncWorkQueueItem queueItem = new SyncWorkQueueItem(newWorkItem, state);

            lock (WorkQueue)
            {
                WorkQueue.Enqueue(queueItem);
                QueueSemaphore.Release();   //Add a new ressource to the semaphore to kick of the worker thread
            }

            //-- Wait for the queued item to complete.
            queueItem.Join();

            //-- If an exception occured, rethrow it within this thread.
            queueItem.RethrowException();
        }

        /// <summary>
        /// Signals the stop-event (StopNow) to terminate the worker thread.
        /// </summary>
        public void Stop()
        {
            StopNow.Set();
        }

        /// <summary>
        /// Waits for the worker thread to terminate.
        /// </summary>
        public void Join()
        {
            WorkerThread.Join();
        }
    }
}

By viewing downloads associated with this article you agree to the Terms of Service and the article's licence.

If a file you wish to view isn't highlighted, and is a text file (not binary), please let us know and we'll add colourisation support for it.

License

This article, along with any associated source code and files, is licensed under The Common Development and Distribution License (CDDL)


Written By
Systems Engineer
Germany Germany
This member has not yet provided a Biography. Assume it's interesting and varied, and probably something to do with programming.

Comments and Discussions