Click here to Skip to main content
Click here to Skip to main content
Add your own
alternative version
Go to top

Declarative multithreading

gossd, 13 Mar 2012
An introduction to, and proof-of-concept code for, the idea of declarative multithreading in C#.
using System;
using System.Collections.Generic;
using System.Threading;
using System.ComponentModel;

namespace ThreadBound
{
    /// <summary>
    /// Worker-thread implementation that is compatible with the WorkerSynchronizationContext.
    /// Each instance owns one background thread that takes work items out of a queue and
    /// invokes them one after another.
    /// </summary>
    internal class Worker
    {
        /// <summary>
        /// Reference to the thread represented by this Worker instance.
        /// </summary>
        private readonly Thread WorkerThread;

        /// <summary>
        /// This event is signaled as soon as the new worker thread is ready to process work items.
        /// </summary>
        private readonly ManualResetEvent InitReady = new ManualResetEvent(false);

        /// <summary>
        /// This event will be signaled if the worker thread should be shut down.
        /// </summary>
        private readonly ManualResetEvent StopNow = new ManualResetEvent(false);

        /// <summary>
        /// A semaphore to synchronize the processing of the queue items. One resource is added
        /// to the semaphore for every item that is enqueued, and the worker thread consumes one
        /// resource for every item it dequeues.
        /// </summary>
        private readonly Semaphore QueueSemaphore = new Semaphore(0, int.MaxValue);

        /// <summary>
        /// Reference to the synchronization context instance that is the context for this
        /// worker thread. Assigned by the worker thread before InitReady is signaled.
        /// </summary>
        private SynchronizationContext WorkerThreadSyncContext;

        /// <summary>
        /// Queue that holds the pending work items. Every access is guarded by a lock on the
        /// queue instance itself.
        /// </summary>
        private readonly Queue<WorkQueueItem> WorkQueue = new Queue<WorkQueueItem>();

        /// <summary>
        /// Getter for the synchronization context that can be used to enqueue work items
        /// for this thread.
        /// </summary>
        public SynchronizationContext SyncContext
        {
            get
            {
                return WorkerThreadSyncContext;
            }
        }

        /// <summary>
        /// C'tor: Create a new, unnamed worker thread. Returns as soon as the new worker thread
        /// is ready to process requests. This may take a little while.
        /// </summary>
        public Worker() :
            this(null)
        {
        }

        /// <summary>
        /// C'tor: Create a new worker thread. Returns as soon as the new worker thread is ready
        /// to process requests. This may take a little while.
        /// </summary>
        /// <param name="name">
        /// Name for this worker thread. Useful for debugging purposes. If null, the thread
        /// stays unnamed.
        /// </param>
        public Worker(String name)
        {
            //-- Create the worker thread. The name is assigned before the thread is started so
            //   that it is already visible while the thread initializes itself.
            WorkerThread = new Thread(WorkerMain);
            WorkerThread.IsBackground = true;
            if (name != null)
            {
                WorkerThread.Name = name;
            }
            WorkerThread.Start();

            //-- Wait for the worker thread's initialization to finish
            InitReady.WaitOne();
        }

        /// <summary>
        /// The worker thread's main method. Installs the synchronization context, signals
        /// InitReady and then processes queue items until the stop event is signaled.
        /// </summary>
        private void WorkerMain()
        {
            //-- StopNow must stay at index 0: the loop below treats index 0 as "shut down".
            WaitHandle[] waitHandles = { StopNow, QueueSemaphore };

            //-- Create the synchronization context and make it the context of this thread
            SynchronizationContext context = new WorkerSynchronizationContext(this);
            AsyncOperationManager.SynchronizationContext = context;
            WorkerThreadSyncContext = context;

            //-- Tell the waiting c'tor that the initialization is finished
            InitReady.Set();

            while (true)
            {
                //-- Wait for the stop event or the queue semaphore. If both are signaled,
                //   WaitAny reports the smallest index, so a stop request wins over pending
                //   items; those items are left in the queue and never executed.
                if (WaitHandle.WaitAny(waitHandles) == 0)
                {
                    break;
                }

                //-- One semaphore resource was consumed, so one item is waiting for us.
                WorkQueueItem currentItem;
                lock (WorkQueue)
                {
                    currentItem = WorkQueue.Dequeue();
                }

                //-- The queue item knows how to execute its content. Just call
                //   invoke. It doesn't matter if the item is synchronous or asynchronous.
                //   If an exception occurs within an asynchronous item, the worker
                //   is terminated. This is consistent with the other worker implementations.
                currentItem.Invoke();
            }
        }

        /// <summary>
        /// Adds an asynchronous item to the queue and returns without waiting for it.
        /// The item will be wrapped into an AsyncWorkQueueItem.
        /// </summary>
        /// <param name="newWorkItem">Callback to execute.</param>
        /// <param name="state">Parameter for the callback.</param>
        /// <exception cref="ArgumentNullException">newWorkItem is null.</exception>
        internal void EnqueAsync(SendOrPostCallback newWorkItem, Object state)
        {
            //-- Reject a missing callback here, in the caller's thread, instead of letting it
            //   fail later on the worker thread.
            if (newWorkItem == null)
            {
                throw new ArgumentNullException("newWorkItem");
            }

            lock (WorkQueue)
            {
                WorkQueue.Enqueue(new AsyncWorkQueueItem(newWorkItem, state));
                QueueSemaphore.Release();   //Add a new resource to the semaphore to kick off the worker thread
            }
        }

        /// <summary>
        /// Adds a synchronous item to the queue.
        /// This method will wait for the new item to execute. If it is called from the worker
        /// thread itself, the item is executed immediately instead of being queued, because
        /// the worker thread cannot wait for its own queue.
        /// </summary>
        /// <remarks>
        /// NOTE(review): an item that is still queued when Stop() is signaled is never invoked,
        /// so a caller waiting for it here presumably never returns - confirm against
        /// SyncWorkQueueItem.Join.
        /// </remarks>
        /// <param name="newWorkItem">Callback to execute.</param>
        /// <param name="state">Parameter for the callback.</param>
        /// <exception cref="ArgumentNullException">newWorkItem is null.</exception>
        internal void EnqueSync(SendOrPostCallback newWorkItem, Object state)
        {
            if (newWorkItem == null)
            {
                throw new ArgumentNullException("newWorkItem");
            }

            //-- Create a new SyncWorkQueueItem. It carries the callback and reports an
            //   exception back to this thread.
            SyncWorkQueueItem queueItem = new SyncWorkQueueItem(newWorkItem, state);

            //-- Called from within a work item on the worker thread: queueing the item and
            //   waiting for it would block the only thread that can process it. Run it
            //   right here instead.
            if (Thread.CurrentThread == WorkerThread)
            {
                queueItem.Invoke();
                queueItem.RethrowException();
                return;
            }

            lock (WorkQueue)
            {
                WorkQueue.Enqueue(queueItem);
                QueueSemaphore.Release();   //Add a new resource to the semaphore to kick off the worker thread
            }

            //-- Wait for the queued item to complete.
            queueItem.Join();

            //-- If an exception occurred, rethrow it within this thread.
            queueItem.RethrowException();
        }

        /// <summary>
        /// Signals the stop-event (StopNow) to terminate the worker thread. The call does not
        /// wait for the thread to end; use Join() for that.
        /// </summary>
        public void Stop()
        {
            StopNow.Set();
        }

        /// <summary>
        /// Waits for the worker thread to terminate.
        /// </summary>
        public void Join()
        {
            WorkerThread.Join();
        }
    }
}

By viewing downloads associated with this article you agree to the Terms of Service and the article's licence.

If a file you wish to view isn't highlighted, and is a text file (not binary), please let us know and we'll add colourisation support for it.

License

This article, along with any associated source code and files, is licensed under The Common Development and Distribution License (CDDL)

Share

About the Author

gossd
Systems Engineer
Germany Germany
No Biography provided

| Advertise | Privacy | Mobile
Web01 | 2.8.140916.1 | Last Updated 13 Mar 2012
Article Copyright 2011 by gossd
Everything else Copyright © CodeProject, 1999-2014
Terms of Service
Layout: fixed | fluid