Click here to Skip to main content
Click here to Skip to main content
Add your own
alternative version

Concurrent Programming - Investigating Task Messaging To Achieve Synchronization Free Inter-Task Communication

, 7 Jan 2008 CPOL
Further studies of Parallel FX.
mandelbrot2.zip
Mandelbrot2
demo
ms
Ms
bin
Release
ms.vshost.exe
ms.vshost.exe.manifest
Clifton.Concurrent
Clifton.Tools.Data
Ms.csproj.user
Properties
Settings.settings
sci.math
doc
obj
Debug
ResolveAssemblyReference.cache
Release
ResolveAssemblyReference.cache
Properties
sci
doc
Properties
using System;
using System.Collections.Generic;
using System.Diagnostics;
using System.Threading;
using System.Threading.Tasks;

using Clifton.Tools.Data;

// Known issues with the PFX: http://blogs.msdn.com/pfxteam/archive/2007/11/29/6558570.aspx
// eek!  Some of these bugs are rather surprising.  I guess Microsoft doesn't practice "fix the bug before adding a new feature".
// How to cancel a task in PFX: http://blogs.msdn.com/pedram/archive/2008/01/02/how-to-cancel-a-task-in-parallel-fx.aspx
// First look at Parallel FX and self-replicating tasks: http://blogs.msdn.com/pedram/archive/2007/12/04/first-look-at-parallel-fx-and-self-replicating-tasks.aspx

namespace Clifton.Concurrent
{
	/// <summary>
	/// Parameterless callback that the TaskMessageManager Wait overloads invoke
	/// repeatedly while they are waiting.  Note that it shares its simple name with
	/// System.Threading.WaitCallback, which takes a state argument.
	/// </summary>
	public delegate void WaitCallback();

	public class TaskMessageManager
	{
		// Shared instance returned by the Default property.
		internal static TaskMessageManager defaultTMM=new TaskMessageManager();

		// Message queues keyed by queue name.
		protected Dictionary<string, TaskMessageQueue> messageQueueMap;
		// The queue each registered task listens on.  This dictionary instance is also
		// the object used in this class's lock statements.
		protected Dictionary<Task, TaskMessageQueue> taskQueueMap;

		/// <summary>
		/// Gets the shared TaskMessageManager instance.
		/// </summary>
		public static TaskMessageManager Default
		{
			get
			{
				return TaskMessageManager.defaultTMM;
			}
		}

		/// <summary>
		/// Gets whether at least one task is currently registered with this manager.
		/// </summary>
		public bool HasTasks
		{
			get
			{
				// Registration and completion modify the map on other threads while
				// holding this same lock; Dictionary is not safe to read concurrently
				// with a write, so the count is read under the lock as well.
				lock (taskQueueMap)
				{
					return taskQueueMap.Count > 0;
				}
			}
		}

		/// <summary>
		/// Returns a Task instance given the task name, or null if not found.
		/// </summary>
		/// <param name="taskName">Name to look for; must not be null or empty.</param>
		/// <returns>The first registered task whose Name equals taskName, or null.</returns>
		/// <exception cref="ArgumentNullException">taskName is null or empty.</exception>
		public Task GetTaskByName(string taskName)
		{
			if (String.IsNullOrEmpty(taskName))
			{
				// The single-string constructor treats its argument as the parameter
				// name, so the two-argument form is needed to carry a real message.
				throw new ArgumentNullException("taskName", "The task name cannot be null or empty.");
			}

			// Enumerate under the lock so the key collection cannot change mid-scan.
			lock (taskQueueMap)
			{
				foreach (Task candidate in taskQueueMap.Keys)
				{
					if (candidate.Name == taskName)
					{
						return candidate;
					}
				}
			}

			return null;
		}

		/// <summary>
		/// Blocks the caller, polling every 10 ms, until no tasks remain registered
		/// with this manager.  There is no timeout.
		/// </summary>
		public void Wait()
		{
			while (true)
			{
				if (!HasTasks)
				{
					return;
				}

				Thread.Sleep(10);
			}
		}

		/// <summary>
		/// Invokes the callback repeatedly until no tasks remain registered with
		/// this manager.  No delay is inserted between invocations.
		/// </summary>
		/// <param name="callback">Invoked once per loop iteration; must not be null.</param>
		/// <exception cref="ArgumentNullException">callback is null.</exception>
		public void Wait(WaitCallback callback)
		{
			if (callback == null)
			{
				// Two-argument form: the first argument is the parameter name,
				// the second is the message.
				throw new ArgumentNullException("callback", "Callback cannot be null");
			}

			while (HasTasks)
			{
				callback();
			}
		}

		/// <summary>
		/// Wait indefinitely, polling every 10 ms, until the named queue is empty.
		/// </summary>
		/// <param name="queueName">Name of a queue known to this manager.</param>
		/// <exception cref="ArgumentNullException">queueName is null or empty.</exception>
		/// <exception cref="ArgumentException">No queue with that name exists.</exception>
		public void Wait(string queueName)
		{
			if (String.IsNullOrEmpty(queueName))
			{
				// Two-argument form: parameter name first, then the message.
				throw new ArgumentNullException("queueName", "Queue name cannot be null or empty.");
			}

			TaskMessageQueue tqm;

			lock (taskQueueMap)
			{
				// Single lookup instead of ContainsKey followed by the indexer.
				if (!messageQueueMap.TryGetValue(queueName, out tqm))
				{
					throw new ArgumentException("The queue " + queueName + " is not part of the queue collection.");
				}
			}

			// Poll outside the lock so posting and consuming tasks are not blocked.
			while (tqm.NumMessages > 0)
			{
				Thread.Sleep(10);
			}
		}

		/// <summary>
		/// Invokes the callback repeatedly until the named queue is empty.  No delay
		/// is inserted between invocations.
		/// </summary>
		/// <param name="queueName">Name of a queue known to this manager.</param>
		/// <param name="callback">Invoked once per loop iteration; must not be null.</param>
		/// <exception cref="ArgumentNullException">queueName is null or empty, or callback is null.</exception>
		/// <exception cref="ArgumentException">No queue with that name exists.</exception>
		public void Wait(string queueName, WaitCallback callback)
		{
			if (String.IsNullOrEmpty(queueName))
			{
				// Two-argument form: parameter name first, then the message.
				throw new ArgumentNullException("queueName", "Queue name cannot be null or empty.");
			}

			if (callback == null)
			{
				throw new ArgumentNullException("callback", "Callback cannot be null");
			}

			TaskMessageQueue tqm;

			lock (taskQueueMap)
			{
				// Single lookup instead of ContainsKey followed by the indexer.
				if (!messageQueueMap.TryGetValue(queueName, out tqm))
				{
					throw new ArgumentException("The queue " + queueName + " is not part of the queue collection.");
				}
			}

			// Run the callback outside the lock so it cannot stall other tasks.
			while (tqm.NumMessages > 0)
			{
				callback();
			}
		}

		/// <summary>
		/// Registers a task with the message queue.  The task cannot have already been
		/// registered.  The queue name is required to inform the TMM as to which
		/// queue the task is listening; the queue is created if it does not exist yet.
		/// </summary>
		/// <param name="task">The named task to register.</param>
		/// <param name="queueName">Name of the queue the task listens on.</param>
		/// <exception cref="ArgumentNullException">queueName is null or empty, or task is null.</exception>
		/// <exception cref="TaskMessageManagerException">
		/// The task has no name, or it is already registered.
		/// </exception>
		public void RegisterTask(Task task, string queueName)
		{
			if (String.IsNullOrEmpty(queueName))
			{
				// Two-argument form: parameter name first, then the message.
				throw new ArgumentNullException("queueName", "The queue name cannot be null or empty.");
			}

			if (task == null)
			{
				throw new ArgumentNullException("task");
			}

			if (String.IsNullOrEmpty(task.Name))
			{
				throw new TaskMessageManagerException("The task must have a name.");
			}

			// The duplicate check and the insertion happen under one lock so that two
			// threads registering the same task cannot both pass the check.
			lock (taskQueueMap)
			{
				if (taskQueueMap.ContainsKey(task))
				{
					// Original author's observation: the TaskManager may re-use task
					// instances; this was hit while single stepping through Clone
					// with self replication enabled.
					throw new TaskMessageManagerException("The task " + task.Name + " is already registered.");
				}

				Debug.WriteLine("!tmm:Registering task: " + task.Name);

				TaskMessageQueue queue;

				if (!messageQueueMap.TryGetValue(queueName, out queue))
				{
					// First listener on this queue name: create the queue.
					queue = new TaskMessageQueue(queueName);
					messageQueueMap[queueName] = queue;
				}

				// Add the task to the parties interested in the queue, and remember
				// which queue the task is connected to.
				queue.AddTask(task);
				taskQueueMap[task] = queue;
			}

			// Wire up the completed event so we can remove the task from our list when it's completed.
			task.Completed += new EventHandler(OnTaskCompleted);
		}

		/// <summary>
		/// Tells whether the given task is currently present in the task-to-queue map.
		/// </summary>
		/// <param name="task">The task to look up; it must have a name.</param>
		/// <returns>True when the task is registered, false otherwise.</returns>
		/// <exception cref="TaskMessageManagerException">The task has no name.</exception>
		public bool IsRegistered(Task task)
		{
			bool unnamed = String.IsNullOrEmpty(task.Name);

			if (unnamed)
			{
				throw new TaskMessageManagerException("The task must have a name.");
			}

			lock (taskQueueMap)
			{
				return taskQueueMap.ContainsKey(task);
			}
		}

		/// <summary>
		/// Posts a message, with no sending task recorded, to the named queue.
		/// Forwards to the overload that takes a sender.
		/// </summary>
		/// <param name="queueName">Name of the destination queue.</param>
		/// <param name="taskMessage">The message to post.</param>
		public void PostMessage(string queueName, IMessage taskMessage)
		{
			Task noSender = null;

			PostMessage(noSender, queueName, taskMessage);
		}

		/// <summary>
		/// Clones the message and posts the copy, tagged with the sending task, to
		/// the named queue.
		/// </summary>
		/// <param name="fromTask">The sending task, or null when there is none.</param>
		/// <param name="queueName">Name of the destination queue.</param>
		/// <param name="taskMessage">The message to clone and post.</param>
		/// <exception cref="ArgumentException">No queue with that name exists.</exception>
		public void PostMessage(Task fromTask, string queueName, IMessage taskMessage)
		{
			lock (taskQueueMap)
			{
				TaskMessageQueue destination;

				if (!messageQueueMap.TryGetValue(queueName, out destination))
				{
					throw new ArgumentException("The queue name " + queueName + " doesn't exist.");
				}

				// The copy is made only after the queue is known to exist.
				TaskMessage envelope = new TaskMessage(fromTask, CloneMessage(taskMessage));
				PostMessage(destination, envelope);
			}
		}

		/// <summary>
		/// Posts a clone of the message, with no sender recorded, to the queue that
		/// the given task is registered on.
		/// </summary>
		/// TODO: We may want this to post only to this task, even if the message queue is
		/// shared by multiple tasks.
		/// <param name="toTask">A registered task whose queue receives the message.</param>
		/// <param name="taskMessage">The message to clone and post.</param>
		/// <exception cref="KeyNotFoundException">toTask is not registered.</exception>
		public void PostMessage(Task toTask, IMessage taskMessage)
		{
			// Clone before taking the lock; the copy does not touch shared state.
			IMessage msg = CloneMessage(taskMessage);

			// The map is modified under this lock by registration and completion, so
			// the lookup must hold it too.  The lock is kept while posting so the
			// queue cannot be retired between the lookup and the enqueue.
			lock (taskQueueMap)
			{
				PostMessage(taskQueueMap[toTask], new TaskMessage(null, msg));
			}
		}

		/// <summary>
		/// Posts a clone of the message, tagged with the sending task, to the queue
		/// that the destination task is registered on.
		/// </summary>
		/// TODO: We may want this to post only to this task, even if the message queue is
		/// shared by multiple tasks.
		/// <param name="fromTask">The sending task, or null when there is none.</param>
		/// <param name="toTask">A registered task whose queue receives the message.</param>
		/// <param name="taskMessage">The message to clone and post.</param>
		/// <exception cref="KeyNotFoundException">toTask is not registered.</exception>
		public void PostMessage(Task fromTask, Task toTask, IMessage taskMessage)
		{
			// Clone before taking the lock; the copy does not touch shared state.
			IMessage msg = CloneMessage(taskMessage);

			// The map is modified under this lock by registration and completion, so
			// the lookup must hold it too.  The lock is kept while posting so the
			// queue cannot be retired between the lookup and the enqueue.
			lock (taskQueueMap)
			{
				PostMessage(taskQueueMap[toTask], new TaskMessage(fromTask, msg));
			}
		}

		/// <summary>
		/// Fetches the next message for the currently executing task, using the
		/// infinite-timeout value (-1).
		/// </summary>
		/// <returns>The next message from the current task's queue.</returns>
		public TaskMessage GetMessage()
		{
			const int waitForever = -1;

			return GetMessage(Task.Current, waitForever);
		}

		/// <summary>
		/// Fetches the next message for the currently executing task, giving up after
		/// the specified timeout, in which case a NoMessage message is returned.
		/// </summary>
		/// <param name="timeout">Time to wait in milliseconds; -1 waits indefinitely.</param>
		/// <returns>The next message, or a NoMessage message on timeout.</returns>
		public TaskMessage GetMessage(int timeout)
		{
			Task caller = Task.Current;

			return GetMessage(caller, timeout);
		}

		/// <summary>
		/// Produces a copy of the message.  Messages implementing IClonableMessage
		/// copy themselves through DeepClone; everything else goes through
		/// Cloner.CloneObject.
		/// </summary>
		/// TODO (original author): this clones rather than truly serializing and
		/// deserializing the message, which is adequate for an initial test of the
		/// architecture but does not support sending messages across application
		/// boundaries.
		/// <param name="msg">The message to copy.</param>
		/// <returns>The copy of the message.</returns>
		protected IMessage CloneMessage(IMessage msg)
		{
			IClonableMessage selfCloning = msg as IClonableMessage;

			if (selfCloning != null)
			{
				return selfCloning.DeepClone();
			}

			return Cloner.CloneObject<IMessage>(msg);
		}

		/// <summary>
		/// Places an already-cloned message on the given queue.  Callers are expected
		/// to have cloned the IMessage beforehand so that the receiver cannot reference
		/// objects owned by the sender.
		/// </summary>
		/// <param name="tqm">The queue that receives the message.</param>
		/// <param name="message">The wrapped message to enqueue.</param>
		protected void PostMessage(TaskMessageQueue tqm, TaskMessage message)
		{
			Debug.WriteLine("!tmm:PostMessage " + tqm.Name + " -> " + message.Message.ToString());

			lock (taskQueueMap)
			{
				// Ordinary messages are queued exactly once.
				if (!(message.Message is StopMessage))
				{
					tqm.Enqueue(message);
					return;
				}

				// A stop message is queued once per task currently listening on the
				// queue, so that every listener receives one (unless a task takes
				// more than its share).
				int queued = 0;

				while (queued < tqm.NumTasks)
				{
					tqm.Enqueue(message);
					queued++;
				}
			}
		}

		/// <summary>
		/// Waits for the next message on the queue the task is registered with.
		/// The wait is performed in slices so that task cancellation is checked
		/// after every attempt.
		/// </summary>
		/// Original author's note: the lookup and setup are repeated on every call;
		/// it may be possible to do the setup once per task instead.
		/// <param name="task">The registered task requesting a message.</param>
		/// <param name="timeout">Time to wait in milliseconds; -1 waits indefinitely.</param>
		/// <returns>The dequeued message, or a NoMessage message when the timeout elapses.</returns>
		/// <exception cref="TaskMessageManagerException">The task is not registered.</exception>
		protected TaskMessage GetMessage(Task task, int timeout)
		{
			Debug.WriteLine("!tmm:GetMessage " + task.Name);

			TaskMessageQueue queue;

			// Resolve the task's queue while holding the map lock.
			lock (taskQueueMap)
			{
				if (!taskQueueMap.TryGetValue(task, out queue))
				{
					throw new TaskMessageManagerException("The target task is not registered with the message manager.");
				}
			}

			Stopwatch waited = Stopwatch.StartNew();
			TaskMessage received;

			do
			{
				long slice = GetRemainingTimeToWait(waited, timeout);

				if (slice > 0)
				{
					// Yields a message, or null when this slice expired, in which
					// case the loop goes around again.
					received = queue.Dequeue(slice);

					// Disabled idea from the original author: when a slice expires
					// with nothing to do and other tasks listen on the same queue,
					// hand this task a StopMessage so the number of listeners shrinks
					// dynamically.  It was removed because of problems with the
					// interaction between multiple threads and this "smart"
					// termination of unnecessary tasks.
				}
				else
				{
					// The requested timeout has been used up with no message.
					Debug.WriteLine("!tmm:Returning NoMessage");
					received = new TaskMessage(null, NoMessage.Default);
				}

				// Cancellation is honoured after every attempt, whatever its outcome.
				ThrowIfCanceled(task);
			}
			while (received == null);

			return received;
		}

		/// <summary>
		/// Completed-event handler: detaches the finished task from its queue, retires
		/// the queue when it has no listeners left, and removes the task from the
		/// task-to-queue map.  Failures are reported through Debug output and are not
		/// propagated to the event source.
		/// </summary>
		/// <param name="sender">The Task that completed.</param>
		/// <param name="e">Unused.</param>
		protected void OnTaskCompleted(object sender, EventArgs e)
		{
			// Obviously, the task will not be in the GetMessage loop.
			Task task = (Task)sender;

			try
			{
				// Remove the task in a thread safe manner.
				lock (taskQueueMap)
				{
					TaskMessageQueue queue;

					// Original author's note: with SelfReplicating tasks, single
					// stepping through Clone could lead to a completed task that is
					// no longer in the map.  That case is reported, not thrown.
					if (!taskQueueMap.TryGetValue(task, out queue))
					{
						Debug.WriteLine("!tmm error:Completed task " + task.Name + " is not registered.");
						return;
					}

					try
					{
						// Remove the task from the queue's list of listeners.
						queue.RemoveTask(task);

						// If there are no more listening tasks, retire the queue.
						// This has the potential side effect of throwing an exception
						// later if the application thinks the queue is still around
						// after all of its tasks have terminated.
						if (!queue.HasListeners)
						{
							messageQueueMap.Remove(queue.Name);
						}
					}
					finally
					{
						// Always forget the task, even if the queue bookkeeping
						// failed; a stale entry would keep HasTasks true and make
						// Wait() spin forever.
						taskQueueMap.Remove(task);
					}
				}
			}
			catch (Exception ex)
			{
				// Deliberate best effort: an event handler must not take down the
				// thread that raised the event.
				Debug.WriteLine("!tmm error:" + ex.Message);
			}
		}

		/// <summary>
		/// Computes how long the next blocking dequeue attempt may last.
		/// </summary>
		/// <param name="startWaiting">Stopwatch started when the caller began waiting.</param>
		/// <param name="timeout">
		/// Total time the task asked to wait, in milliseconds; -1 means wait indefinitely.
		/// </param>
		/// <returns>
		/// The number of milliseconds for the next wait slice, never more than 250.
		/// A value of zero or less means the requested timeout has elapsed.
		/// </returns>
		protected long GetRemainingTimeToWait(Stopwatch startWaiting, int timeout)
		{
			// Every wait is sliced to at most 1/4 second so that the caller gets a
			// chance to check for task cancellation between slices.
			const long cancelPollIntervalMs = 250;

			if (timeout == -1)
			{
				return cancelPollIntervalMs;
			}

			// This is the remaining time to wait that the task has requested.
			long remaining = timeout - startWaiting.ElapsedMilliseconds;

			// Cap finite waits as well; otherwise a long timeout would block for its
			// whole duration without any cancellation check.
			return Math.Min(cancelPollIntervalMs, remaining);
		}

		/// <summary>
		/// Raises TaskCanceledException for the task when its IsCanceled flag is set;
		/// otherwise does nothing.
		/// </summary>
		/// <param name="task">The task whose cancellation state is checked.</param>
		/// <exception cref="TaskCanceledException">The task has been cancelled.</exception>
		protected void ThrowIfCanceled(Task task)
		{
			if (!task.IsCanceled)
			{
				return;
			}

			Debug.WriteLine("!tmm:Cancelling: " + task.Name);
			throw new TaskCanceledException(task);
		}

		/// <summary>
		/// Creates a manager with empty queue and task maps.  The constructor is
		/// protected; instances are obtained through the Default property.
		/// </summary>
		protected TaskMessageManager()
		{
			this.taskQueueMap = new Dictionary<Task, TaskMessageQueue>();
			this.messageQueueMap = new Dictionary<string, TaskMessageQueue>();
		}
	}
}



// what we need:
// message queues
// message queues specific to each thread, by task name
// ability to add a message to a particular task
// ability for a task to get its next "job" (a message)
// the message manager is responsible for queue synchronization
// the message manager is responsible for task blocking
// a task can request to block infinitely or for a specific timeout
// when timing out, if there are no messages, the task gets a "no message" message returned.
// another built in message is "stop"
// messages are cloned, so that what's put in the queue is dereferenced from the sender.
// how would we simulate a thread-specific heap?

// This "known correctness bug" with PFX: "Tasks blocked for significant periods of time may cause runaway thread 
// injection and out of memory conditions when the default policy is used. "
// is probably a show stopper because we will potentially be blocking for a long period of time if there are no messages
// to process.

// However, in the face of these obstacles, let us continue, with the faith that we won't encounter these issues and
// that shortly the TPL will be fixed.

// Another problem -- the task starts executing right away, which causes a synchronization problem when managing
// the task in a separate collection from that of the TaskManager.  To work around this problem, we are going to
// have the task register itself.  This means we are going, for the moment, to work with only a single instance
// of the task manager.

// The features of the Thread class ought to be exposed in the Task class, namely the ability to create the thread
// but not immediately run it, and also the ability to suspend and resume a thread.  I suppose this would get into
// conflict with the TaskManager and perhaps goes beyond the design goals of the TaskManager.

By viewing downloads associated with this article you agree to the Terms of Service and the article's licence.

If a file you wish to view isn't highlighted, and is a text file (not binary), please let us know and we'll add colourisation support for it.

License

This article, along with any associated source code and files, is licensed under The Code Project Open License (CPOL)

Share

About the Author

Marc Clifton

United States United States
Marc is the creator of two open source projects, MyXaml, a declarative (XML) instantiation engine and the Advanced Unit Testing framework, and Interacx, a commercial n-tier RAD application suite.  Visit his website, www.marcclifton.com, where you will find many of his articles and his blog.
 
Marc lives in Philmont, NY.

| Advertise | Privacy | Mobile
Web03 | 2.8.141022.2 | Last Updated 7 Jan 2008
Article Copyright 2008 by Marc Clifton
Everything else Copyright © CodeProject, 1999-2014
Terms of Service
Layout: fixed | fluid