Part I - Concurrent Programming - A Primer

Introduction

My purpose in writing the previous article, Concurrent Programming - A Primer, was to investigate Microsoft's Parallel FX library (PFX). As was made clear by various experts, PFX does not address the issue of application state and shared memory synchronization. One of the features of Erlang that I was enamored of was the concept of message passing between tasks. Specifically, the thread receiving the message gets a local copy of the message being sent by the sender thread. This completely eliminates synchronization issues. The purpose of this article is to investigate a simple TaskMessageManager (TMM) implementation that works with PFX to add serialized messaging between tasks.

About The Code

If you download the code, you will also need to install the Parallel FX library. Also, and this is very important, if you compile the code in Debug mode, uncheck "Define DEBUG constant" in the project properties. The debug messages that the TMM outputs will kill the performance of the application when testing the TMM mode. Visual Studio 2008 is required.

Basic Architecture

Tasks register themselves with the TMM, specifying the message queue on which they are listening. The task blocks until a message is placed into the message queue. Tasks can share the same message queue so that work can be distributed among tasks. Rather than use the SelfReplicating flag option (see PFX Problems below), the application must set up tasks to run concurrently against a specific message queue. Messages are retrieved from the queue as the task requests them, meaning that it has completed its current work. A couple of built-in messages (cancel and done) are used, respectively, to tell the task to terminate and to indicate to the task that all its work is complete. In both cases, the task terminates. If the queue is shared by several tasks, they all terminate (a sort of broadcast to all tasks listening to the message queue).
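The shared-queue arrangement described under Basic Architecture can be sketched with plain threads and Monitor. This is only an illustration, not the TMM's code: SharedMessageQueue, DoneMarker, and SharedQueueDemo are hypothetical names, and leaving the done marker at the head of the queue is just one assumed way to get the "broadcast" effect for every worker sharing the queue.

```csharp
using System;
using System.Collections.Generic;
using System.Threading;

// Hypothetical stand-in for the built-in "done" message.
public sealed class DoneMarker { }

public sealed class SharedMessageQueue
{
    private readonly Queue<object> messages = new Queue<object>();
    private readonly object gate = new object();

    // Enqueue a message and wake any workers blocked in Take().
    public void Post(object message)
    {
        lock (gate)
        {
            messages.Enqueue(message);
            Monitor.PulseAll(gate);
        }
    }

    // Block until a message is available. A DoneMarker is never dequeued,
    // so every worker sharing this queue eventually sees it.
    public object Take()
    {
        lock (gate)
        {
            while (messages.Count == 0)
            {
                Monitor.Wait(gate);
            }
            object head = messages.Peek();
            if (!(head is DoneMarker))
            {
                messages.Dequeue();
            }
            return head;
        }
    }
}

public static class SharedQueueDemo
{
    // Runs workerCount workers against one queue and returns how many
    // work items they processed in total.
    public static int Run(int workerCount, int itemCount)
    {
        SharedMessageQueue queue = new SharedMessageQueue();
        int processed = 0;
        Thread[] workers = new Thread[workerCount];
        for (int i = 0; i < workerCount; i++)
        {
            workers[i] = new Thread(delegate()
            {
                // Each worker asks for the next message only after it has
                // finished the previous one.
                while (!(queue.Take() is DoneMarker))
                {
                    Interlocked.Increment(ref processed);
                }
            });
            workers[i].Start();
        }
        for (int i = 0; i < itemCount; i++)
        {
            queue.Post(i);
        }
        queue.Post(new DoneMarker());
        foreach (Thread worker in workers)
        {
            worker.Join();
        }
        return processed;
    }
}
```

Because the queue is first-in, first-out and the marker is posted last, all work items are consumed before any worker sees the marker. Calling SharedQueueDemo.Run(3, 100) should therefore report 100 processed items regardless of how the work is split among the three workers.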
What The TaskMessageManager Does

The TMM sets up message queues through which the main application and its tasks can communicate with each other, initiating work based on the message type and contents. Furthermore, messages are copied, so that the receiver thread does not directly reference the sender thread's message. The TMM is a thin API which the developer can use to reduce the setup work necessary to achieve the same functionality.

What The TaskMessageManager Does Not Do

The TMM is an experiment, one that I plan to continue further. It is not intended to be used in production code. There are clearly performance issues that will probably never be solvable in C#. There are also core architectural issues that I imagine can only be solved with a VM, like Erlang's, that facilitates moving data between tasks as values and doesn't even include constructs for synchronization. On the other hand, careful use of the TMM may result in an application benefiting from synchronization-less task communication.

And of course, the TMM doesn't eliminate synchronization issues between tasks; it merely moves the problem from one the developer has to deal with to one the TMM deals with for the developer. There are lots of lock statements in the TMM to control access to the message queues, and of course, there's the serialization process as well, which is rife with performance and usability issues. The same is true of PFX: it makes it easier for the developer to add concurrency to an application by taking the thread management problem off the developer's hands and putting it into PFX.

Implementation

Interfaces

There are two key interfaces,
public interface IClonableMessage
{
IMessage DeepClone();
}
then the

Usage

A Simple Example

In this simple example, two tasks are created, and the application waits for the tasks to register (see PFX Problems below):
task1 = Task.Create(ATask, null, TaskManager.Default, TaskCreationOptions.None, "Task1");
task2 = Task.Create(ATask, null, TaskManager.Default, TaskCreationOptions.None, "Task2");
while (!tmm.IsRegistered(task1)) {Thread.Sleep(10);}
while (!tmm.IsRegistered(task2)) { Thread.Sleep(10); }
The task registers itself and starts listening to the same message queue, specifying a 100ms timeout:
static void ATask(object obj)
{
tmm.RegisterTask(Task.Current, "TaskQueue");
bool stopped = false;
while (!stopped)
{
TaskMessage tm = tmm.GetMessage(100);
...
If a task specifies a timeout, the
while (!stopped)
{
TaskMessage tm = tmm.GetMessage(100);
Debug.WriteLine("!tmm: " + Task.Current.Name + ": "+tm.Message.ToString());
if (tm.Message is SayHelloMessage)
{
// executes in either task 1 or task 2.
Console.WriteLine("Hello!");
stopped = true;
}
else if (tm.Message is RequestHelloMessage)
{
// executes in task 1.
// Post the message queue rather than a specific task.
tmm.PostMessage("TaskQueue", new SayHelloMessage());
stopped = true;
}
else if (tm.Message is StopMessage)
{
stopped = true;
}
else if (tm.Message is NoMessage)
{
}
}
The application posts a RequestHelloMessage to the queue and then waits:
tmm.PostMessage("TaskQueue", new RequestHelloMessage());
tmm.Wait();
The message queue monitors the

By looking at the trace of this application when it is run, you will see the two tasks working:
A different run shows that Task1 received the message first:
Concurrent Application Patterns

At this point, it's useful to start identifying concurrent programming patterns.

Process Work Then Terminate

One kind of concurrent application pattern is "Process Work Then Terminate": a task waits for a message, acts on that message, then terminates itself. The above example illustrates this pattern.

Process Messages Then Terminate

A slightly more complex version processes all the messages in its queue, then terminates. This assumes that the queue has been fully loaded before the task begins; otherwise the task is racing against the loader. This pattern is currently not possible (see PFX Problems below).

Process Work Until Stopped

A more advanced pattern than the previous requires that the last message added to the queue is

A More Complex Example

Going back to the Mandelbrot example [5] I used in the first article, I'm going to show how the code is changed yet again, this time illustrating tasks and task messaging. What you will immediately notice about this architecture is that the original code has been broken up into small, autonomous units. In this version, though, there is still a global set of variables that is referenced by the tasks:
readonly double xstep;
readonly double ystep;
readonly double escapeRadius;
readonly double logEscapeRadius;
readonly int width;
readonly int height;
readonly byte[] argb;
readonly Color[] colors;
readonly ComplexNumber p1;
readonly ComplexNumber p2;
readonly int maxIteration;
readonly ProgressBar progress;
On my todo list is moving these variables into their respective tasks and using the task parameter to initialize them when the task is created. For the moment, they are designated readonly.

The Messages

There are three messages.
/// <summary>
/// The initial task is primed with width messages, each an x coordinate.
/// </summary>
public class XCoordMessage : IMessage, IClonableMessage
{
public int x;
public XCoordMessage(int x)
{
this.x = x;
}
public IMessage DeepClone()
{
return new XCoordMessage(x);
}
}
/// <summary>
/// The coordinate message is sent to the task that computes
/// the iterations.
/// </summary>
public class CoordMessage : IMessage, IClonableMessage
{
public int x;
public int y;
public CoordMessage(int x, int y)
{
this.x = x;
this.y = y;
}
public IMessage DeepClone()
{
return new CoordMessage(x, y);
}
}
/// <summary>
/// This message is sent to the task responsible for updating
/// the bitmap.
/// </summary>
public class CoordColorMessage : IMessage, IClonableMessage
{
public int x;
public int y;
public int colorIndex;
public CoordColorMessage(int x, int y, int colorIndex)
{
this.x = x;
this.y = y;
this.colorIndex = colorIndex;
}
public IMessage DeepClone()
{
return new CoordColorMessage(x, y, colorIndex);
}
}
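The three messages above hold only int fields, so constructing a new instance from those fields already yields a fully independent copy. A message that carries a reference type needs more care. The sketch below is a hypothetical illustration: RowMessage is not part of the article's code, and IMessage is assumed to be an empty marker interface because its members are not shown in the text.

```csharp
using System;

// Assumed shape of the article's interfaces; IMessage's members are not
// shown in the text, so it is treated here as a marker interface.
public interface IMessage { }

public interface IClonableMessage
{
    IMessage DeepClone();
}

// Hypothetical message carrying a reference-type payload.
public class RowMessage : IMessage, IClonableMessage
{
    public int y;
    public int[] colorIndices;

    public RowMessage(int y, int[] colorIndices)
    {
        this.y = y;
        this.colorIndices = colorIndices;
    }

    public IMessage DeepClone()
    {
        // Copy the array itself. Copying only the reference would leave
        // the sender and the receiver sharing the same memory.
        return new RowMessage(y, (int[])colorIndices.Clone());
    }
}

public static class DeepCloneDemo
{
    // Returns true when mutating the sender's message after it has been
    // cloned leaves the receiver's copy unchanged.
    public static bool ReceiverCopyIsIndependent()
    {
        RowMessage sent = new RowMessage(7, new int[] { 1, 2, 3 });
        RowMessage received = (RowMessage)sent.DeepClone();
        sent.colorIndices[0] = 99;
        sent.y = 8;
        return received.colorIndices[0] == 1 && received.y == 7;
    }
}
```

Array.Clone makes a shallow copy of the array, which is sufficient here because the elements are value types. An array of mutable reference types would need each element cloned as well.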
The first message sets up the task to generate the full (x, y) coordinate that is then passed to the iteration task. The iteration task, when the iteration count is determined, sends a message to the task that is responsible for updating the bitmap.

The Tasks

There are three tasks. Each task checks for a StopMessage and exits its loop when it receives one.

PostCoordinates

This task posts the (x, y) coordinates of the pixel to compute to the IterationQueue:
public void PostCoordinates(object obj)
{
// When we get the x coord
// and vertical height, post the complete coordinate.
tmm.RegisterTask(Task.Current, "CoordQueue");
bool stopped = false;
while (!stopped)
{
TaskMessage tm = tmm.GetMessage();
if (tm.Message is StopMessage)
{
stopped = true;
}
else if (tm.Message is XCoordMessage)
{
XCoordMessage xch = (XCoordMessage)tm.Message;
for (int y = 0; y < height; y++)
{
tmm.PostMessage("IterationQueue", new CoordMessage(xch.x, y));
}
}
}
}
ComputeIterations

This task computes the number of iterations before z escapes. It operates on the (x, y) coordinate received in the message for its queue.
public void ComputeIterations(object obj)
{
// When we get the coordinate,
// compute the # of iterations before escape and
// post the result.
tmm.RegisterTask(Task.Current, "IterationQueue");
bool stopped = false;
while (!stopped)
{
TaskMessage tm = tmm.GetMessage();
if (tm.Message is StopMessage)
{
stopped = true;
}
else if (tm.Message is CoordMessage)
{
CoordMessage cm = (CoordMessage)tm.Message;
ComplexNumber z = p2;
z.Re = p2.Re + (cm.x * xstep);
z.Im = p1.Im-cm.y*ystep;
ComplexNumber C = z;
int iteration = 0;
while ( (z.Modulus < escapeRadius) && (iteration < maxIteration) )
{
z = z * z + C;
iteration++;
}
int colorIndex = 0;
if (iteration < maxIteration)
{
z = z * z + C; iteration++;
z = z * z + C; iteration++;
double mu = iteration - (Math.Log(Math.Log(z.Modulus))) / logEscapeRadius;
colorIndex = (int)(mu / maxIteration * 768);
}
tmm.PostMessage("BitmapQueue", new CoordColorMessage(cm.x, cm.y, colorIndex));
}
}
}
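The escape-time calculation inside ComputeIterations can be pulled out and exercised on its own, without the TMM or the shared fields. The following is a sketch under stated assumptions: it substitutes System.Numerics.Complex for the article's ComplexNumber type, the EscapeTime class and its method names are hypothetical, and Math.Log(escapeRadius) is assumed to be what logEscapeRadius holds. The escape radius is assumed to be greater than 1 so the nested logarithm is defined.

```csharp
using System;
using System.Numerics;

public static class EscapeTime
{
    // Iterates z = z*z + c starting from z = c, as the task above does.
    // Returns the number of iterations performed before |z| reached the
    // escape radius, or maxIteration if it never did.
    public static int Iterations(Complex c, double escapeRadius,
                                 int maxIteration, out Complex z)
    {
        z = c;
        int iteration = 0;
        while (z.Magnitude < escapeRadius && iteration < maxIteration)
        {
            z = z * z + c;
            iteration++;
        }
        return iteration;
    }

    // Maps a point to a palette index using the same smoothing steps as
    // the task: two extra iterations, then a fractional iteration count.
    public static int ColorIndex(Complex c, double escapeRadius,
                                 int maxIteration, int paletteSize)
    {
        Complex z;
        int iteration = Iterations(c, escapeRadius, maxIteration, out z);
        if (iteration >= maxIteration)
        {
            return 0; // the point did not escape
        }
        z = z * z + c; iteration++;
        z = z * z + c; iteration++;
        double mu = iteration
            - Math.Log(Math.Log(z.Magnitude)) / Math.Log(escapeRadius);
        int index = (int)(mu / maxIteration * paletteSize);
        // Same guard the bitmap task applies before indexing the palette.
        if (index < 0 || index >= paletteSize)
        {
            index = 0;
        }
        return index;
    }
}
```

The origin never escapes, so Iterations returns maxIteration for it. A point such as 2 + 2i already lies outside an escape radius of 2, so the loop body never runs and Iterations returns 0.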
UpdateBitmap

This task receives the (x, y) coordinate of the pixel as well as the color index computed by the task above. It is responsible for posting the color to the bitmap.
public void UpdateBitmap(object obj)
{
// When we get the computed iterations
// for the coordinate, put it in the bitmap.
tmm.RegisterTask(Task.Current, "BitmapQueue");
bool stopped = false;
while (!stopped)
{
TaskMessage tm = tmm.GetMessage();
if (tm.Message is StopMessage)
{
stopped = true;
}
else if (tm.Message is CoordColorMessage)
{
CoordColorMessage cm = (CoordColorMessage)tm.Message;
int colorIndex=cm.colorIndex;
if ((colorIndex < 0) || (colorIndex >= 768))
{
colorIndex = 0;
}
int index = (cm.y * width + cm.x) * 4;
argb[index] = colors[colorIndex].B;
argb[index + 1] = colors[colorIndex].G;
argb[index + 2] = colors[colorIndex].R;
argb[index + 3] = 255;
// See comments in UpdateProgress for why we do this rather than
// another task that updates the progress bar.
Interlocked.Increment(ref Tasks.progressValue);
// tmm.PostMessage("UpdateQueue", UpdateProgressMessage.msg);
}
}
}
The Progress Bar Update Process

The progress bar is updated in the main application thread. Because more than one
protected void UpdateUI()
{
// on 32 bit systems, this is atomic.
int val = Tasks.progressValue;
progress.Value = val;
Application.DoEvents();
Thread.Sleep(100);
}
The upshot, though, is that this violates the model of keeping data local to the task. I am leaving this issue for a future enhancement.

Task Initialization

Tasks are initialized via a compiler option, as either one
public void Initialize()
{
Task task;
#if DualTask
for (int i = 0; i < System.Environment.ProcessorCount; i++)
#endif
{
#if DualTask
#else
int i = 0;
#endif
task = Task.Create(PostCoordinates, null, TaskManager.Default,
TaskCreationOptions.None, "Coordinates"+i);
while (!tmm.IsRegistered(task)) { Thread.Sleep(10); }
task = Task.Create(ComputeIterations, null, TaskManager.Default,
TaskCreationOptions.None, "Iterations"+i);
while (!tmm.IsRegistered(task)) { Thread.Sleep(10); }
task = Task.Create(UpdateBitmap, null, TaskManager.Default,
TaskCreationOptions.None, "Bitmap" + i);
while (!tmm.IsRegistered(task)) { Thread.Sleep(10); }
}
}
The Main Application Thread

Rather than the two loops for calculating the iterations of each fractal, the main application primes the first task with x coordinate messages and then waits for all the tasks to complete:
Tasks tasks = new Tasks(xStep, yStep, escapeRadius, logEscapeRadius, p.Width, p.Height,
maxIteration, P1, P2, argb, Colors, progress);
tasks.Initialize();
tasks.SendX();
// Wait for the coord queue to flush.
TaskMessageManager.Default.Wait("CoordQueue", UpdateUI);
TaskMessageManager.Default.PostMessage("CoordQueue", StopMessage.Default);
// Wait next for the iteration queue to flush.
TaskMessageManager.Default.Wait("IterationQueue", UpdateUI);
TaskMessageManager.Default.PostMessage("IterationQueue", StopMessage.Default);
// Then wait for the bitmap queue to flush.
TaskMessageManager.Default.Wait("BitmapQueue", UpdateUI);
TaskMessageManager.Default.PostMessage("BitmapQueue", StopMessage.Default);
// Then wait for the update queue to flush.
//TaskMessageManager.Default.Wait("UpdateQueue", UpdateUI);
//TaskMessageManager.Default.PostMessage("UpdateQueue", StopMessage.Default);
TaskMessageManager.Default.Wait(UpdateUI);
The salient point here is that the application posts a

Performance Results

The resulting performance is illustrated quite dramatically by this chart [4]:
You will note, perhaps to your amazement, that parallelizing the outer loop (using

Now, the interesting thing is that, depending on what area of the fractal one is rendering, the performance is sometimes better with PFX. Perhaps, by luck, I found an area of the fractal where PFX performs worse. This is definitely something that deserves a lot more investigation.

PFX Problems

The most important result of this experiment was the set of problems I discovered with PFX. Hopefully, they will be addressed in future releases, but if not, replacing the PFX task manager and

Automatic Task Startup

The first problem is that tasks, once created, immediately start. This means that I cannot register the

Ideally though, I would think that the

Self-Replication Extra Events

When self-replicating, the
you can see that Task1 is registered twice (the

PFX Task Instance Re-use?

With self-replication enabled, I have seen this exception occur:
if (taskQueueMap.ContainsKey(task))
{
throw new TaskMessageManagerException("The task " + task.Name +
" is already registered.");
}
I cannot reliably replicate this problem, but it indicates that a

PFX Bugs Of Concern

Because a task may be waiting a long time for a message, this "known correctness bug" (I guess saying simply "bug" isn't in vogue anymore) is of concern: "Tasks blocked for significant periods of time may cause runaway thread injection and out of memory conditions when the default policy is used." [2] Hopefully, this will be fixed.

Core vs. Thread Utilization

Tasks are assigned to cores, and in self-replication, tasks are replicated when a core becomes available. [3] To me, this is not ideal. It assumes that a task will consume 100% of its core, which is not always the case. What if the task is waiting for an I/O completion event? With regard to the TMM, what if a task is waiting for a message? Can no other

Performance

I fully expected that the Mandelbrot rendering with PFX would be better than the single-threaded operation, and I'm actually dismayed that it's worse! It would be great if someone from the PFX team could explain this result.

TMM Problems

Task Message Manager Instances

Only one TMM is permitted. The task always interacts with the default TMM. The whole concept of task queues might be better managed by allowing for multiple TMMs, each managing a single queue.

PostMessage Overloads
Self-Replicating Tasks

The TMM does not work with self-replicating tasks. A whole section of the code has been commented out regarding this issue and the "smart" removal of tasks that are no longer needed because there's nothing in the queue to process. This may be mixing apples and oranges regarding task management, and it probably conflicts with PFX's

Stopping Tasks

In the "Process Work Until Stopped" pattern, the TMM sends a

Boxing

Using structs for messages is essentially pointless, I would think, because they are boxed when referenced via the interface (though I really need to check on this). This is another reason that the .NET environment is unsuitable in certain scenarios for inter-task messaging. Those scenarios can be identified as having tasks that are so short-lived that serialization of the message is a measurable percentage of the task itself.

Multiple Queues Per Task Message Manager

As alluded to above, multiple queues per TMM cause unnecessary locking when adding and removing messages on a particular queue, as the entire queue collection is locked rather than just the queue itself. This will block other tasks that are trying to retrieve a message from a different queue altogether. A future performance improvement is to use only one queue per TMM, which will necessitate creating TMM instances.

Other Observations

Debug Messages

Debug messages in the TMM really slow it down.

Too Many Active Threads Per Core

If a core is task switching between numerous active threads, the task switching itself starts to bog down the core. This requires further investigation.

Understand Your Application's Tasks

Working with the TMM, it's clear that a careful understanding of the parallel tasks is required in order to take advantage of concurrent programming.
Understand Your Concurrent Programming Toolset

It is also clear that, in addition to understanding your application requirements, one needs to clearly understand the workings of any library that facilitates the management of tasks and the coordination (such as messaging) between tasks. I would hope that the PFX team produces high quality documentation so that the developer can clearly understand how best to take advantage of PFX and concurrent programming. The pitfall would be to believe that PFX makes concurrent programming easy. It does not--the CTP is little more than syntactic sugar over managing threads yourself. The performance test above shows that parallelizing the outer loop does not improve performance; in fact, it degrades it. If anyone on the PFX team can explain this behavior and make a post on this article about it, I would greatly appreciate it.

Future Improvements
Conclusion

What I have attempted to do here is put together a use case for PFX that is probably outside the scope of what the designers of PFX had planned for. Also, I wanted to experiment with synchronization-free inter-task communication to study the advantages and disadvantages of such an approach. It may turn out that PFX is not suitable for this kind of work, regardless of other more fundamental issues such as serialization performance (which may make .NET languages, in general, unsuitable for this approach). However, I do hope that the creators of PFX will look at this work and consider some of my suggestions.

I plan on continuing this investigation in future articles. Performance measurement and optimization is one such topic that intrigues me: how to measure performance changes with PFX, or the TMM, and so forth, as well as performance optimization within a core, since quite frankly, I expect to be working in .NET languages for quite a while.

References and Notes

1 - Canceling a task
2 - Known Correctness Bugs with PFX
3 - First Look at Parallel FX and self-replicating tasks
4 - These tests were done on a Sony VAIO VGN-FE890 laptop, running 32-bit Vista Business with 2GB RAM and a T5500 processor at 1.66GHz. The Mandelbrot configuration was:
maxIteration = 300;
P1 = new ComplexNumber(-0.669912107426552, 0.451398357629374);
P2 = new ComplexNumber(-0.672973630864052, 0.449948162316874);
and the drawing area was 1254w x 594h.