Click here to Skip to main content
15,884,629 members
Articles / Programming Languages / C#

Concurrent Programming - Investigating Task Messaging To Achieve Synchronization Free Inter-Task Communication

Rate me:
Please Sign up or sign in to vote.
4.80/5 (15 votes)
7 Jan 2008 | CPOL | 17 min read | 47.2K   208   51
Further studies of Parallel FX.
using System;
using System.Collections.Generic;
using System.Threading;
using System.Threading.Tasks;

namespace Clifton.Concurrent
{
	/// <summary>
	/// A named message queue with an associated list of listening tasks.
	/// Messages are stored in a FIFO queue guarded by a lock, and a counting
	/// semaphore tracks how many messages are available so that Dequeue can
	/// block (with a timeout) until a message arrives.
	/// </summary>
	/// <remarks>
	/// Only the message queue is synchronized by this class.  The task list is
	/// accessed without locking; NOTE(review): presumably callers serialize
	/// AddTask/RemoveTask themselves -- confirm against the owning manager.
	/// </remarks>
	public class TaskMessageQueue
	{
		protected Queue<TaskMessage> msgQueue;
		protected List<Task> taskList;
		protected Semaphore sem;
		protected string name;

		/// <summary>
		/// Returns true if the queue has listening tasks.
		/// </summary>
		public bool HasListeners
		{
			get { return taskList.Count > 0; }
		}

		/// <summary>
		/// Returns the number of listening tasks.
		/// </summary>
		public int NumTasks
		{
			get { return taskList.Count; }
		}

		/// <summary>
		/// Returns the number of messages currently held in the queue.
		/// </summary>
		public int NumMessages
		{
			get
			{
				// Read under the same lock Enqueue/Dequeue use so the count is
				// never observed while the queue is being modified.
				lock (msgQueue)
				{
					return msgQueue.Count;
				}
			}
		}

		/// <summary>
		/// Returns the queue name.
		/// </summary>
		public string Name
		{
			get { return name; }
		}

		/// <summary>
		/// Parameterless constructor for derived classes.  It does not
		/// initialize any of the protected fields.
		/// </summary>
		protected TaskMessageQueue()
		{
		}

		/// <summary>
		/// Constructor for the message queue.
		/// </summary>
		/// <param name="name">The queue name; must not be null or empty.</param>
		/// <exception cref="ArgumentNullException">name is null or empty.</exception>
		public TaskMessageQueue(string name)
		{
			if (String.IsNullOrEmpty(name))
			{
				// The (paramName, message) overload is required here: the
				// single-string overload treats its argument as the parameter name.
				throw new ArgumentNullException("name", "Name cannot be null or empty.");
			}

			msgQueue = new Queue<TaskMessage>();
			taskList = new List<Task>();
			// Starts at zero (no messages available); each Enqueue adds one count.
			sem = new Semaphore(0, Int32.MaxValue);
			this.name = name;
		}

		/// <summary>
		/// Adds a task to the task list associated with this queue.
		/// </summary>
		/// <param name="task">The task to associate with this queue.</param>
		/// <exception cref="TaskMessageManagerException">The task is already in the list.</exception>
		public void AddTask(Task task)
		{
			if (taskList.Contains(task))
			{
				throw new TaskMessageManagerException("Task already associated with this queue.");
			}

			taskList.Add(task);
		}

		/// <summary>
		/// Removes a task associated with this queue.
		/// </summary>
		/// <param name="task">The task to remove from this queue.</param>
		/// <exception cref="TaskMessageManagerException">The task is not in the list.</exception>
		public void RemoveTask(Task task)
		{
			// List<T>.Remove reports whether the item was found, so a separate
			// Contains scan is unnecessary.
			if (!taskList.Remove(task))
			{
				throw new TaskMessageManagerException("The task is not associated with this queue.");
			}
		}

		/// <summary>
		/// Enqueues the message and releases the semaphore for one count, releasing one waiting task.
		/// </summary>
		/// <remarks>
		/// A null message is stored as-is; Dequeue would then return null for it,
		/// which is indistinguishable from its timeout result.
		/// </remarks>
		/// <param name="msg">The message to add to the queue.</param>
		public void Enqueue(TaskMessage msg)
		{
			// Add the message in a thread safe manner.
			lock (msgQueue)
			{
				msgQueue.Enqueue(msg);
			}

			// Release any task waiting for a message.  This happens after the
			// message is stored, so a released waiter always finds a message.
			sem.Release();
		}

		/// <summary>
		/// Dequeues the message, decrementing the semaphore by one count.  This routine
		/// will wait the specified timeout amount and return null if the wait expired
		/// before a message was dequeued.
		/// </summary>
		/// <param name="timeout">
		/// The time to wait, in milliseconds, as passed to WaitHandle.WaitOne.
		/// Use -1 to wait indefinitely and 0 to poll without blocking.
		/// </param>
		/// <returns>The dequeued message, or null if the wait timed out.</returns>
		/// <exception cref="ArgumentException">timeout exceeds Int32.MaxValue.</exception>
		/// <exception cref="ArgumentOutOfRangeException">timeout is less than -1.</exception>
		public TaskMessage Dequeue(long timeout)
		{
			if (timeout > Int32.MaxValue)
			{
				throw new ArgumentException("Timeout cannot exceed Int32.MaxValue.");
			}

			// Reject negative values other than -1 before the narrowing cast:
			// a value below Int32.MinValue would otherwise wrap silently into an
			// unrelated (possibly valid) timeout.
			if (timeout < -1)
			{
				throw new ArgumentOutOfRangeException("timeout", "Timeout cannot be less than -1 (infinite).");
			}

			TaskMessage ret = null;
			bool released = sem.WaitOne((int)timeout, false);

			// If there are messages queued...
			if (released)
			{
				// Get the message in a thread safe manner.
				lock (msgQueue)
				{
					ret = msgQueue.Dequeue();
				}
			}
			// Otherwise return null, indicating timeout.

			return ret;
		}
	}
}

By viewing downloads associated with this article you agree to the Terms of Service and the article's licence.

If a file you wish to view isn't highlighted, and is a text file (not binary), please let us know and we'll add colourisation support for it.

License

This article, along with any associated source code and files, is licensed under The Code Project Open License (CPOL)


Written By
Architect Interacx
United States United States
Blog: https://marcclifton.wordpress.com/
Home Page: http://www.marcclifton.com
Research: http://www.higherorderprogramming.com/
GitHub: https://github.com/cliftonm

All my life I have been passionate about architecture / software design, as this is the cornerstone to a maintainable and extensible application. As such, I have enjoyed exploring some crazy ideas and discovering that they are not so crazy after all. I also love writing about my ideas and seeing the community response. As a consultant, I've enjoyed working in a wide range of industries such as aerospace, boatyard management, remote sensing, emergency services / data management, and casino operations. I've done a variety of pro-bono work for non-profit organizations related to nature conservancy, drug recovery and women's health.

Comments and Discussions