Click here to Skip to main content
12,447,625 members (58,947 online)
Click here to Skip to main content

Stats

29.3K views
166 downloads
51 bookmarked
Posted

Concurrent Programming - Investigating Task Messaging To Achieve Synchronization Free Inter-Task Communication

, 7 Jan 2008 CPOL
Further studies of Parallel FX.
using System;
using System.Collections.Generic;
using System.Threading;
using System.Threading.Tasks;

namespace Clifton.Concurrent
{
	/// <summary>
	/// A named queue of <see cref="TaskMessage"/> instances together with the list of
	/// tasks registered as listeners on it.  A counting semaphore tracks the number of
	/// queued messages: <see cref="Enqueue"/> releases one count per message and
	/// <see cref="Dequeue"/> waits for (consumes) one count before removing a message.
	/// </summary>
	/// <remarks>
	/// Access to the message queue in Enqueue/Dequeue is serialized with a lock on the
	/// queue object.  The task list is NOT synchronized by this class; callers that add
	/// or remove tasks from several threads must provide their own synchronization.
	/// </remarks>
	public class TaskMessageQueue
	{
		// Pending messages.  Enqueue/Dequeue lock on this instance.
		protected Queue<TaskMessage> msgQueue;
		// Tasks registered as listeners on this queue (not synchronized).
		protected List<Task> taskList;
		// Counts queued messages; created with an initial count of 0.
		protected Semaphore sem;
		// Name given to the queue at construction.
		protected string name;

		/// <summary>
		/// Returns true if the queue has listening tasks.
		/// </summary>
		public bool HasListeners
		{
			get { return taskList.Count > 0; }
		}

		/// <summary>
		/// Returns the number of listening tasks.
		/// </summary>
		public int NumTasks
		{
			get { return taskList.Count; }
		}

		/// <summary>
		/// Returns the number of messages currently in the queue.  The value is read
		/// without taking the queue lock, so it is only a snapshot.
		/// </summary>
		public int NumMessages
		{
			get { return msgQueue.Count; }
		}

		/// <summary>
		/// Returns the queue name.
		/// </summary>
		public string Name
		{
			get { return name; }
		}

		/// <summary>
		/// Parameterless constructor for derived classes.  It does not initialize any
		/// of the fields; a derived class using it must assign them itself before the
		/// members of this class are used.
		/// </summary>
		protected TaskMessageQueue()
		{
		}

		/// <summary>
		/// Constructor for the message queue.
		/// </summary>
		/// <param name="name">The queue name.  Must not be null or empty.</param>
		/// <exception cref="ArgumentNullException">
		/// Thrown when <paramref name="name"/> is null or empty.
		/// </exception>
		public TaskMessageQueue(string name)
		{
			if (String.IsNullOrEmpty(name))
			{
				// Use the (paramName, message) overload: the single-string overload
				// treats its argument as the parameter name, not as the message.
				throw new ArgumentNullException("name", "Name cannot be null or empty.");
			}

			msgQueue = new Queue<TaskMessage>();
			taskList = new List<Task>();
			// Start with no available counts: nothing is queued yet.
			sem = new Semaphore(0, Int32.MaxValue);
			this.name = name;
		}

		/// <summary>
		/// Adds a task to the task list associated with this queue.
		/// </summary>
		/// <param name="task">The task to register as a listener.</param>
		/// <exception cref="TaskMessageManagerException">
		/// Thrown when the task is already in the list.
		/// </exception>
		public void AddTask(Task task)
		{
			if (taskList.Contains(task))
			{
				throw new TaskMessageManagerException("Task already associated with this queue.");
			}

			taskList.Add(task);
		}

		/// <summary>
		/// Removes a task associated with this queue.
		/// </summary>
		/// <param name="task">The task to remove from the listener list.</param>
		/// <exception cref="TaskMessageManagerException">
		/// Thrown when the task is not in the list.
		/// </exception>
		public void RemoveTask(Task task)
		{
			// List<T>.Remove reports whether the item was found, so a separate
			// Contains scan is unnecessary.
			if (!taskList.Remove(task))
			{
				throw new TaskMessageManagerException("The task is not associated with this queue.");
			}
		}

		/// <summary>
		/// Enqueues the message and releases the semaphore for one count, releasing one waiting task.
		/// </summary>
		/// <param name="msg">
		/// The message to queue.  Note that a null message is stored as-is and will be
		/// indistinguishable from a timeout when returned by <see cref="Dequeue"/>.
		/// </param>
		public void Enqueue(TaskMessage msg)
		{
			// Add the message in a thread safe manner.
			lock (msgQueue)
			{
				msgQueue.Enqueue(msg);
			}

			// Release any task waiting for a message.  This is done after the message
			// is stored so a released waiter always finds something to dequeue.
			sem.Release();
		}

		/// <summary>
		/// Waits on the semaphore (consuming one count) and dequeues one message.  This
		/// routine will wait the specified timeout amount and return null if the wait
		/// expired before a message was dequeued.
		/// </summary>
		/// <param name="timeout">
		/// The time to wait in milliseconds, or -1 (<see cref="Timeout.Infinite"/>) to
		/// wait indefinitely.  Must be in the range -1 to Int32.MaxValue.
		/// </param>
		/// <returns>The dequeued message, or null if the wait timed out.</returns>
		/// <exception cref="ArgumentException">
		/// Thrown when <paramref name="timeout"/> exceeds Int32.MaxValue.
		/// </exception>
		/// <exception cref="ArgumentOutOfRangeException">
		/// Thrown when <paramref name="timeout"/> is less than -1.
		/// </exception>
		public TaskMessage Dequeue(long timeout)
		{
			if (timeout > Int32.MaxValue)
			{
				throw new ArgumentException("Timeout cannot exceed Int32.MaxValue.");
			}

			// Reject values below -1 explicitly.  Without this check a value below
			// Int32.MinValue would be silently truncated by the (int) cast below and
			// could turn into an arbitrary positive wait.
			if (timeout < Timeout.Infinite)
			{
				throw new ArgumentOutOfRangeException("timeout", "Timeout must be -1 (infinite) or a non-negative number of milliseconds.");
			}

			TaskMessage ret = null;
			bool released = sem.WaitOne((int)timeout, false);

			// If there are messages queued...
			if (released)
			{
				// Get the message in a thread safe manner.
				lock (msgQueue)
				{
					ret = msgQueue.Dequeue();
				}
			}
			// Otherwise return null, indicating timeout.

			return ret;
		}
	}
}

By viewing downloads associated with this article you agree to the Terms of Service and the article's licence.

If a file you wish to view isn't highlighted, and is a text file (not binary), please let us know and we'll add colourisation support for it.

License

This article, along with any associated source code and files, is licensed under The Code Project Open License (CPOL)

Share

About the Author

Marc Clifton
United States United States
Marc is the creator of two open source projects, MyXaml, a declarative (XML) instantiation engine and the Advanced Unit Testing framework, and Interacx, a commercial n-tier RAD application suite.  Visit his website, www.marcclifton.com, where you will find many of his articles and his blog.

Marc lives in Philmont, NY.

You may also be interested in...

| Advertise | Privacy | Terms of Use | Mobile
Web01 | 2.8.160811.3 | Last Updated 7 Jan 2008
Article Copyright 2008 by Marc Clifton
Everything else Copyright © CodeProject, 1999-2016
Layout: fixed | fluid