Click here to Skip to main content
15,879,535 members
Articles / Programming Languages / C#

Concurrent Programming - Investigating Task Messaging To Achieve Synchronization Free Inter-Task Communication

Rate me:
Please Sign up or sign in to vote.
4.80/5 (15 votes)
7 Jan 2008 | CPOL | 17 min read | 47.1K | 208 | 51
Further studies of Parallel FX.
#define DualTask

using System;
using System.Drawing;
using System.Threading;
using System.Threading.Tasks;
using System.Windows.Forms;

using Clifton.Concurrent;

using Sci.Math;

namespace ms
{
	// Here we are using classes instead of structs, since structs get boxed
	// anyways.

	/// <summary>
	/// Seed message of the pipeline: carries a single x coordinate (one
	/// bitmap column).  SendX posts one of these per column to "CoordQueue",
	/// where PostCoordinates consumes them.
	/// </summary>
	public class XCoordMessage : IMessage, IClonableMessage
	{
		/// <summary>The column index carried by this message.</summary>
		public int x;

		/// <summary>
		/// Creates a message for the given column.
		/// </summary>
		/// <param name="x">The x coordinate to carry.</param>
		public XCoordMessage(int x)
		{
			this.x = x;
		}

		/// <summary>
		/// Produces an independent copy of this message.
		/// </summary>
		/// <returns>A new XCoordMessage holding the same x value.</returns>
		public IMessage DeepClone()
		{
			XCoordMessage copy = new XCoordMessage(this.x);
			return copy;
		}
	}

	/// <summary>
	/// Carries one complete pixel coordinate.  PostCoordinates posts these to
	/// "IterationQueue", where ComputeIterations turns each one into a
	/// colour index.
	/// </summary>
	public class CoordMessage : IMessage, IClonableMessage
	{
		/// <summary>Pixel column.</summary>
		public int x;

		/// <summary>Pixel row.</summary>
		public int y;

		/// <summary>
		/// Creates a message for the pixel at (x, y).
		/// </summary>
		/// <param name="x">Pixel column.</param>
		/// <param name="y">Pixel row.</param>
		public CoordMessage(int x, int y)
		{
			this.x = x;
			this.y = y;
		}

		/// <summary>
		/// Produces an independent copy of this message.
		/// </summary>
		/// <returns>A new CoordMessage holding the same x and y values.</returns>
		public IMessage DeepClone()
		{
			CoordMessage copy = new CoordMessage(this.x, this.y);
			return copy;
		}
	}

	/// <summary>
	/// Carries a pixel coordinate together with its computed colour index.
	/// ComputeIterations posts these to "BitmapQueue", where UpdateBitmap
	/// writes the pixel into the bitmap buffer.
	/// </summary>
	public class CoordColorMessage : IMessage, IClonableMessage
	{
		/// <summary>Pixel column.</summary>
		public int x;

		/// <summary>Pixel row.</summary>
		public int y;

		/// <summary>Index into the colour palette for this pixel.</summary>
		public int colorIndex;

		/// <summary>
		/// Creates a message for the pixel at (x, y) with the given palette index.
		/// </summary>
		/// <param name="x">Pixel column.</param>
		/// <param name="y">Pixel row.</param>
		/// <param name="colorIndex">Palette index computed for the pixel.</param>
		public CoordColorMessage(int x, int y, int colorIndex)
		{
			this.x = x;
			this.y = y;
			this.colorIndex = colorIndex;
		}

		/// <summary>
		/// Produces an independent copy of this message.
		/// </summary>
		/// <returns>A new CoordColorMessage holding the same three values.</returns>
		public IMessage DeepClone()
		{
			CoordColorMessage copy = new CoordColorMessage(this.x, this.y, this.colorIndex);
			return copy;
		}
	}

	/// <summary>
	/// Parameterless "a pixel was finished" notification consumed by
	/// UpdateProgress.  Because the message carries no data, a single shared
	/// instance is reused instead of allocating one per pixel.
	/// </summary>
	public class UpdateProgressMessage : IMessage, IClonableMessage
	{
		/// <summary>
		/// The shared instance.  Declared readonly so that no other code can
		/// replace (or null out) the instance that every sender and
		/// DeepClone hand out.
		/// </summary>
		public static readonly UpdateProgressMessage msg = new UpdateProgressMessage();

		/// <summary>
		/// Returns the shared instance; there is no state to copy.
		/// </summary>
		/// <returns>The shared <see cref="msg"/> instance.</returns>
		public IMessage DeepClone()
		{
			return msg;
		}
	}

	public class Tasks
	{
		// TODO: Ideally, these should be moved into the local thread method, simulating the thread's local heap.

		// Message manager used by every task of this instance to register,
		// receive and post messages.  Never reassigned, hence readonly.
		private readonly TaskMessageManager tmm = TaskMessageManager.Default;

		// Real-axis increment per pixel column (multiplied by x in ComputeIterations).
		private readonly double xstep;

		// Imaginary-axis decrement per pixel row (multiplied by y in ComputeIterations).
		private readonly double ystep;

		// The iteration loop stops once the modulus of z reaches this value.
		private readonly double escapeRadius;

		// Divisor of the log-log term in the smooth colouring formula.
		// Presumably the logarithm of escapeRadius; supplied by the caller.
		private readonly double logEscapeRadius;

		// Bitmap width in pixels; also the row stride (in pixels) of argb.
		private readonly int width;

		// Bitmap height in pixels.
		private readonly int height;

		// Output pixel buffer, 4 bytes per pixel, written in B, G, R, A order.
		private readonly byte[] argb;

		// Colour palette indexed by the computed colour index.
		private readonly Color[] colors;

		// p1.Im is the imaginary coordinate of pixel row 0.
		private readonly ComplexNumber p1;

		// p2.Re is the real coordinate of pixel column 0.
		private readonly ComplexNumber p2;

		// Upper bound on iterations per pixel.
		private readonly int maxIteration;

		// Progress bar updated by UpdateProgress.
		private readonly ProgressBar progress;

		// EEK!  Shared, static pixel counter: reset to 0 by the constructor and
		// incremented by UpdateBitmap via Interlocked.Increment.  Must stay a
		// plain (non-readonly) field because it is passed by ref.
		public static int progressValue;

		/// <summary>
		/// Captures everything the worker tasks need and resets the shared
		/// progress counter to zero.
		/// </summary>
		/// <param name="xstep">Real-axis increment per pixel column.</param>
		/// <param name="ystep">Imaginary-axis decrement per pixel row.</param>
		/// <param name="escapeRadius">Modulus at which the iteration loop stops.</param>
		/// <param name="logEscapeRadius">Divisor used by the smooth colouring formula.</param>
		/// <param name="width">Bitmap width in pixels.</param>
		/// <param name="height">Bitmap height in pixels.</param>
		/// <param name="maxIteration">Upper bound on iterations per pixel.</param>
		/// <param name="p1">Its Im component is the imaginary coordinate of row 0.</param>
		/// <param name="p2">Its Re component is the real coordinate of column 0.</param>
		/// <param name="argb">Output pixel buffer, 4 bytes per pixel.</param>
		/// <param name="colors">Colour palette.</param>
		/// <param name="progress">Progress bar used by UpdateProgress.</param>
		public Tasks(double xstep, double ystep, double escapeRadius, double logEscapeRadius, int width, int height, int maxIteration, ComplexNumber p1, ComplexNumber p2, byte[] argb, Color[] colors, ProgressBar progress)
		{
			// Region of the complex plane and its mapping onto pixels.
			this.p1 = p1;
			this.p2 = p2;
			this.xstep = xstep;
			this.ystep = ystep;
			this.width = width;
			this.height = height;

			// Iteration parameters.
			this.maxIteration = maxIteration;
			this.escapeRadius = escapeRadius;
			this.logEscapeRadius = logEscapeRadius;

			// Output targets.
			this.argb = argb;
			this.colors = colors;
			this.progress = progress;

			// The counter is static, so constructing a new instance restarts it.
			progressValue = 0;
		}

		/// <summary>
		/// Starts the worker tasks.  With DualTask defined, one set of
		/// Coordinates / Iterations / Bitmap tasks is created per processor;
		/// otherwise a single set is created.  After each task is created this
		/// method polls until the message manager reports it as registered.
		/// </summary>
		public void Initialize()
		{
			// Create tasks for each core, since these tasks should max out each core.
			// We're simulating task replication here, but poorly, because we're overloading the cores with more tasks than just 2 at a time.
			// This degrades performance because each core is now running multiple active threads, forcing the core to do thread context switching.
			// Additional performance hits occur because we are using the same TMM, so unnecessary locking is occurring when accessing the queues.
#if DualTask
			int replicaCount = System.Environment.ProcessorCount;
#else
			int replicaCount = 1;
#endif

			for (int i = 0; i < replicaCount; i++)
			{
				WaitForRegistration(Task.Create(PostCoordinates, null, TaskManager.Default, TaskCreationOptions.None, "Coordinates" + i));
				WaitForRegistration(Task.Create(ComputeIterations, null, TaskManager.Default, TaskCreationOptions.None, "Iterations" + i));
				WaitForRegistration(Task.Create(UpdateBitmap, null, TaskManager.Default, TaskCreationOptions.None, "Bitmap" + i));
			}

			// Create a single task for updating the progress bar.
			//task = Task.Create(UpdateProgress, null, TaskManager.Default, TaskCreationOptions.None, "Progress");
			//while (!tmm.IsRegistered(task)) { Thread.Sleep(10); }
		}

		/// <summary>
		/// Blocks the caller, sleeping 10 ms between checks, until the message
		/// manager reports the task as registered.  There is no timeout.
		/// </summary>
		/// <param name="task">The task that was just created.</param>
		private void WaitForRegistration(Task task)
		{
			while (!tmm.IsRegistered(task))
			{
				Thread.Sleep(10);
			}
		}

		/// <summary>
		/// Primes the pipeline: posts one XCoordMessage for every column
		/// (x from 0 up to, but not including, width) to "CoordQueue", the
		/// queue the PostCoordinates tasks register on.
		/// </summary>
		public void SendX()
		{
			// Yes, this is silly, but I want to parallelize as much of this code as possible.

			// This loop itself can be run concurrently, though because it blocks when adding
			// messages to the queue, there's little point in doing so.
			Parallel.For(0, width, x =>
			{
				XCoordMessage column = new XCoordMessage(x);
				tmm.PostMessage("CoordQueue", column);
			});

			// TODO: Research, can a task still be running when the for loop exits?
		}

		/// <summary>
		/// Task body for the "CoordQueue" stage.  For every XCoordMessage
		/// received it posts one CoordMessage per row (y from 0 up to, but not
		/// including, height) to "IterationQueue".  Returns when a StopMessage
		/// arrives; any other message type is ignored.
		/// </summary>
		/// <param name="obj">Task state object; not used.</param>
		public void PostCoordinates(object obj)
		{
			tmm.RegisterTask(Task.Current, "CoordQueue");

			while (true)
			{
				object payload = tmm.GetMessage().Message;

				if (payload is StopMessage)
				{
					break;
				}

				XCoordMessage column = payload as XCoordMessage;

				if (column == null)
				{
					// Not a message this stage handles.
					continue;
				}

				// Fan the column out into one coordinate message per row.
				for (int row = 0; row < height; row++)
				{
					tmm.PostMessage("IterationQueue", new CoordMessage(column.x, row));
				}
			}
		}

		/// <summary>
		/// Task body for the "IterationQueue" stage.  For every CoordMessage
		/// received it computes the colour index of that pixel and posts a
		/// CoordColorMessage to "BitmapQueue".  Returns when a StopMessage
		/// arrives; any other message type is ignored.
		/// </summary>
		/// <param name="obj">Task state object; not used.</param>
		public void ComputeIterations(object obj)
		{
			tmm.RegisterTask(Task.Current, "IterationQueue");

			while (true)
			{
				object payload = tmm.GetMessage().Message;

				if (payload is StopMessage)
				{
					break;
				}

				CoordMessage pixel = payload as CoordMessage;

				if (pixel == null)
				{
					// Not a message this stage handles.
					continue;
				}

				int colorIndex = ComputeColorIndex(pixel.x, pixel.y);
				tmm.PostMessage("BitmapQueue", new CoordColorMessage(pixel.x, pixel.y, colorIndex));
			}
		}

		/// <summary>
		/// Iterates z = z * z + c for the point that pixel (x, y) maps to and
		/// converts the result into a colour index.
		/// </summary>
		/// <param name="x">Pixel column.</param>
		/// <param name="y">Pixel row.</param>
		/// <returns>
		/// 0 when the iteration limit is reached without escaping; otherwise
		/// the smoothed iteration count scaled by 768 / maxIteration and
		/// truncated to an int.  The value is not range-checked here.
		/// </returns>
		private int ComputeColorIndex(int x, int y)
		{
			// Start from a copy of p2 and overwrite both components.
			// NOTE(review): assumes ComplexNumber is a value type, so this does
			// not modify the shared p2 field - confirm against Sci.Math.
			ComplexNumber z = p2;
			z.Re = p2.Re + (x * xstep);
			z.Im = p1.Im - y * ystep;
			ComplexNumber c = z;

			int iteration;

			for (iteration = 0; (z.Modulus < escapeRadius) && (iteration < maxIteration); iteration++)
			{
				z = z * z + c;
			}

			if (iteration >= maxIteration)
			{
				// Never escaped within the iteration budget.
				return 0;
			}

			// Two further iterations before evaluating the smoothing term.
			z = z * z + c;
			z = z * z + c;
			iteration += 2;

			double mu = iteration - (Math.Log(Math.Log(z.Modulus))) / logEscapeRadius;
			return (int)(mu / maxIteration * 768);
		}

		/// <summary>
		/// Task body for the "BitmapQueue" stage.  For every CoordColorMessage
		/// received it writes the pixel's palette colour into the argb buffer
		/// (4 bytes per pixel, in B, G, R, A order with A fixed at 255) and
		/// atomically increments Tasks.progressValue.  Returns when a
		/// StopMessage arrives; any other message type is ignored.
		/// </summary>
		/// <remarks>
		/// A colour index that is negative, at or above 768, or beyond the end
		/// of the palette actually supplied is replaced by 0.  The palette
		/// length check keeps a palette shorter than 768 entries from raising
		/// IndexOutOfRangeException inside the task.  An empty palette still
		/// throws, as before.
		/// </remarks>
		/// <param name="obj">Task state object; not used.</param>
		public void UpdateBitmap(object obj)
		{
			// Scale factor used by ComputeIterations when producing colour indices.
			const int PaletteEntries = 768;

			// When we get the computed iterations for the coordinate, put it in the bitmap.
			tmm.RegisterTask(Task.Current, "BitmapQueue");
			bool stopped = false;

			while (!stopped)
			{
				TaskMessage tm = tmm.GetMessage();

				if (tm.Message is StopMessage)
				{
					stopped = true;
				}
				else if (tm.Message is CoordColorMessage)
				{
					CoordColorMessage cm = (CoordColorMessage)tm.Message;
					int colorIndex = cm.colorIndex;

					// Out-of-range indices fall back to palette entry 0.
					if ((colorIndex < 0) || (colorIndex >= PaletteEntries) || (colorIndex >= colors.Length))
					{
						colorIndex = 0;
					}

					// Look the colour up once instead of once per channel.
					Color pixel = colors[colorIndex];

					int index = (cm.y * width + cm.x) * 4;
					argb[index] = pixel.B;
					argb[index + 1] = pixel.G;
					argb[index + 2] = pixel.R;
					argb[index + 3] = 255;

					// See comments in UpdateProgress for why we do this rather than
					// another task that updates the progress bar.
					Interlocked.Increment(ref Tasks.progressValue);
					// tmm.PostMessage("UpdateQueue", UpdateProgressMessage.msg);
				}
			}
		}

		/// <summary>
		/// Task body for the "UpdateQueue" stage.  For every
		/// UpdateProgressMessage received it advances the progress bar by one,
		/// marshalling the update onto the control's thread with a synchronous
		/// Invoke.  Returns when a StopMessage arrives; any other message type
		/// is ignored.
		/// </summary>
		/// <remarks>
		/// Author's observations, kept for context: assigning progress.Value
		/// directly from this thread locks up the app, and the synchronous
		/// Invoke cripples the tasks.  If the TMM forced DoEvents when Wait is
		/// called by the application thread, BeginInvoke could be used instead.
		/// This is why Initialize does not start this task and UpdateBitmap
		/// increments Tasks.progressValue instead.
		/// </remarks>
		/// <param name="obj">Task state object; not used.</param>
		public void UpdateProgress(object obj)
		{
			tmm.RegisterTask(Task.Current, "UpdateQueue");
			bool stopped = false;
			int count = 0;

			while (!stopped)
			{
				TaskMessage tm = tmm.GetMessage();

				if (tm.Message is StopMessage)
				{
					stopped = true;
				}
				else if (tm.Message is UpdateProgressMessage)
				{
					// This locks up the app.  So much for cross-thread UI updates.
					// progress.Value = count++;

					// And when we do this, it cripples the tasks.
					// Invoke through the progress bar itself: it marshals to the
					// same UI thread as its form and avoids dereferencing a null
					// FindForm() result when the bar has no parent form.
					progress.Invoke((MethodInvoker)delegate
					{
						// ProgressBar.Value throws when assigned a value outside
						// [Minimum, Maximum] (the exception previously seen with
						// ++count), so clamp the running count before assigning.
						// TODO: Why isn't this exception percolated up to the application?  Is the TMM eating them?
						int next = count++;
						progress.Value = Math.Max(progress.Minimum, Math.Min(next, progress.Maximum));
					});
				}
			}
		}
	}
}

By viewing downloads associated with this article you agree to the Terms of Service and the article's licence.

If a file you wish to view isn't highlighted, and is a text file (not binary), please let us know and we'll add colourisation support for it.

License

This article, along with any associated source code and files, is licensed under The Code Project Open License (CPOL)


Written By
Architect Interacx
United States
Blog: https://marcclifton.wordpress.com/
Home Page: http://www.marcclifton.com
Research: http://www.higherorderprogramming.com/
GitHub: https://github.com/cliftonm

All my life I have been passionate about architecture / software design, as this is the cornerstone to a maintainable and extensible application. As such, I have enjoyed exploring some crazy ideas and discovering that they are not so crazy after all. I also love writing about my ideas and seeing the community response. As a consultant, I've enjoyed working in a wide range of industries such as aerospace, boatyard management, remote sensing, emergency services / data management, and casino operations. I've done a variety of pro-bono work for non-profit organizations related to nature conservancy, drug recovery and women's health.

Comments and Discussions