Introduction
How about a simple parallel ForEach iterator that works with .NET 3.0, until the Task Parallel Library becomes available with .NET 4.0?
Background
The Task Parallel Library (TPL) will certainly help developers squeeze every bit of processing power out of their hardware. However, Microsoft does not offer TPL for .NET 3.0 or .NET 3.5; TPL and Parallel LINQ will be offered as part of .NET 4.0. Until Microsoft releases .NET 4.0 or developers upgrade their development environments, here is a “Poor Man’s Parallel.ForEach” for the rest of us.
There is a vast amount of literature on parallel processing, but in a nutshell, here is the motivation for this code along with my warnings. Reasonably priced laptops come with two CPU cores, and the average Windows server has eight. Even if you have ten thousand elements to iterate over, a serial loop such as foreach, while or for can use only a single core while all the other cores sit idle. If a procedure does not need to access objects that share state, spreading the work over multiple threads can yield substantial performance gains. Being careful about shared state is critically important: if a coder blindly converts traditional serial iterations to parallel ones, the results will be no different from giving a chainsaw to an 8-year-old.
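To make the shared-state warning concrete, here is a minimal sketch (not part of the article's library; SharedStateDemo and CountSafely are hypothetical names used only for illustration). Several threads update one counter. A plain counter++ would be a read-modify-write race and could lose updates, so the sketch uses Interlocked.Increment instead.

```csharp
using System;
using System.Threading;

// Hypothetical illustration; not part of the article's Parallel class.
public static class SharedStateDemo
{
    // Increments one shared counter `perThread` times on each of `threads` threads.
    public static int CountSafely(int threads, int perThread)
    {
        int counter = 0;
        var workers = new Thread[threads];
        for (int t = 0; t < threads; t++)
        {
            workers[t] = new Thread(() =>
            {
                for (int n = 0; n < perThread; n++)
                {
                    // counter++ here would race with the other threads;
                    // Interlocked makes the increment atomic.
                    Interlocked.Increment(ref counter);
                }
            });
            workers[t].Start();
        }
        // Wait for all workers before reading the result.
        foreach (Thread w in workers) w.Join();
        return counter;
    }
}
```

With the atomic increment, the result is always threads * perThread; with an unsynchronized counter++ it may come out lower.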
Using the Code
Usage is exactly as in TPL. Very simple and concise:
var orders = GetOrders();
Parallel.ForEach(orders, order =>
{
    if (order.IsValid)
    {
        order.Process();
    }
});
Parallel.ForEach
I used asynchronous delegate invocation. The Begin/End pattern is relatively simple to read and maintain. Delegates may carry some overhead compared with using System.Threading threads directly. Let me know how you can improve this code; I am looking forward to hearing your comments.
using System;
using System.Collections.Generic;
using System.Threading;

public class Parallel
{
    // Number of workers to start; defaults to the number of processors.
    public static int NumberOfParallelTasks;

    static Parallel()
    {
        NumberOfParallelTasks = Environment.ProcessorCount;
    }

    public static void ForEach<T>(IEnumerable<T> enumerable, Action<T> action)
    {
        var syncRoot = new object();
        if (enumerable == null) return;
        var enumerator = enumerable.GetEnumerator();
        InvokeAsync<T> del = InvokeAction;
        var seedItemArray = new T[NumberOfParallelTasks];
        var resultList = new List<IAsyncResult>(NumberOfParallelTasks);
        for (int i = 0; i < NumberOfParallelTasks; i++)
        {
            lock (syncRoot)
            {
                // Read Current only after a successful MoveNext;
                // stop seeding workers once the source is exhausted.
                if (!enumerator.MoveNext()) break;
                seedItemArray[i] = enumerator.Current;
            }
            var iAsyncResult = del.BeginInvoke(
                enumerator, action, seedItemArray[i], syncRoot, i, null, null);
            resultList.Add(iAsyncResult);
        }
        // Wait for every worker to finish and release its wait handle.
        foreach (var iAsyncResult in resultList)
        {
            del.EndInvoke(iAsyncResult);
            iAsyncResult.AsyncWaitHandle.Close();
        }
    }

    delegate void InvokeAsync<T>(IEnumerator<T> enumerator,
        Action<T> action, T item, object syncRoot, int i);

    static void InvokeAction<T>(IEnumerator<T> enumerator, Action<T> action,
        T item, object syncRoot, int i)
    {
        if (String.IsNullOrEmpty(Thread.CurrentThread.Name))
            Thread.CurrentThread.Name =
                String.Format("Parallel.ForEach Worker Thread No:{0}", i);
        while (true)
        {
            action.Invoke(item);
            // The enumerator is shared by all workers, so advance it under the lock.
            lock (syncRoot)
            {
                if (!enumerator.MoveNext()) return;
                item = enumerator.Current;
            }
        }
    }
}
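On the question of delegates versus System.Threading, one possible variant is sketched below. It is not the article's implementation: ThreadedForEach, Run and workerCount are hypothetical names. It keeps the same idea of workers pulling items from a shared, locked enumerator, but it uses dedicated Thread objects and takes the worker count as a parameter.

```csharp
using System;
using System.Collections.Generic;
using System.Threading;

// Hypothetical variant for illustration; not the article's Parallel class.
public static class ThreadedForEach
{
    public static void Run<T>(IEnumerable<T> source, Action<T> action, int workerCount)
    {
        if (source == null) return;
        if (action == null) throw new ArgumentNullException("action");
        if (workerCount < 1) throw new ArgumentOutOfRangeException("workerCount");

        var syncRoot = new object();
        using (IEnumerator<T> enumerator = source.GetEnumerator())
        {
            // Each worker repeatedly takes the next item under the lock,
            // then runs the action outside the lock.
            ThreadStart worker = delegate
            {
                while (true)
                {
                    T item;
                    lock (syncRoot)
                    {
                        if (!enumerator.MoveNext()) return;
                        item = enumerator.Current;
                    }
                    // Note: an exception thrown here is unhandled on the worker thread.
                    action(item);
                }
            };

            var threads = new Thread[workerCount];
            for (int i = 0; i < workerCount; i++)
            {
                threads[i] = new Thread(worker);
                threads[i].Name = "ThreadedForEach worker " + i;
                threads[i].Start();
            }
            // Block until every worker has drained the enumerator.
            foreach (Thread t in threads) t.Join();
        }
    }
}
```

Passing the worker count explicitly makes it easy to experiment with more threads than cores for I/O-bound work. Whether dedicated threads actually beat asynchronous delegates would need to be measured for the workload in question.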
Changes
There is always room for improvement. After publishing this article, I dug up other articles about BeginInvoke. They suggest calling EndInvoke and AsyncWaitHandle.Close() rather than leaving cleanup to the garbage collector. I tested this; it works fine and I saw no performance overhead. The original wait loop was:
foreach (var iAsyncResult in resultList)
    iAsyncResult.AsyncWaitHandle.WaitOne();
It was changed to the following code:
foreach (var iAsyncResult in resultList)
{
    del.EndInvoke(iAsyncResult);
    iAsyncResult.AsyncWaitHandle.Close();
}
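Beyond cleanup, EndInvoke also rethrows any exception raised by the asynchronous call, which waiting on the handle alone does not do. The sketch below illustrates this with hypothetical names (EndInvokeDemo, Work). It assumes the .NET Framework, since delegate BeginInvoke is not supported on .NET Core and later runtimes.

```csharp
using System;

// Hypothetical illustration; assumes .NET Framework (delegate BeginInvoke).
public static class EndInvokeDemo
{
    delegate int Work(int x);

    // Returns the message of the exception surfaced by EndInvoke,
    // or null if the call completed without error.
    public static string Run()
    {
        Work del = x =>
        {
            if (x < 0) throw new ArgumentException("negative input");
            return x * 2;
        };

        IAsyncResult ar = del.BeginInvoke(-1, null, null);
        // WaitOne only blocks until the call has finished; the failure stays hidden.
        ar.AsyncWaitHandle.WaitOne();
        try
        {
            del.EndInvoke(ar); // the worker's exception is rethrown here
            return null;
        }
        catch (ArgumentException ex)
        {
            return ex.Message;
        }
        finally
        {
            ar.AsyncWaitHandle.Close();
        }
    }
}
```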
History
- 4th February, 2009: Initial post
Nice article, but if you are doing any form of multi-threaded processing you have to check out the Power Threading library.
http://www.wintellect.com/PowerThreading.aspx
It's free and written by Jeffrey Richter, basically the definitive guru on multi-threaded programming (amongst other things), and there is a great video in which Jeffrey shows how it works and how to use it.
The very interesting thing about it is that it allows you to write asynchronous code in a "synchronous" manner. It also means you don't need to use any nasty 'lock' statements.
Hallo Emre,
I like your article. I have some numerical algorithms that I needed to optimize. I found the best way was to first determine the number of cores available, then create one thread per core and chop up the iterations. When I used Parallel.For I got slower results; I think this is because of Amdahl's law and because there is an optimal work-packet size for a given task. Here is the code to show you what I mean.
public void ThreadedDemo2()
{
    double Total = 0;
    int Itterations = 200000000;
    int NumberOfThreads = 3;
    runningWorkers = NumberOfThreads;

    int[] ItterateFrom;
    int[] ItterateTo;

    DateTime Start = DateTime.Now;

    ItterateFrom = new int[NumberOfThreads];
    ItterateTo = new int[NumberOfThreads];
    Totals = new double[NumberOfThreads];
    int Step = Itterations / NumberOfThreads;
    int StartingAt = 0;
    for (int i = 0; i < NumberOfThreads; i++)
    {
        ItterateFrom[i] = StartingAt;
        ItterateTo[i] = StartingAt + Step;
        StartingAt = ItterateTo[i];
        Totals[i] = 0;
    }
    ItterateTo[NumberOfThreads - 1] = Itterations;

    Thread[] threads = new Thread[NumberOfThreads];

    for (int i = 0; i < NumberOfThreads; i++)
    {
        DataForThread2 z = new DataForThread2();
        z.i = i;
        z.ItterateFrom = ItterateFrom[i];
        z.ItterateTo = ItterateTo[i];
        threads[i] = new Thread(DoWork);
        threads[i].Start(z);
    }

    for (int i = 0; i < NumberOfThreads; i++)
    {
        threads[i].Join();
        Total += Totals[i];
    }

    DateTime Finish = DateTime.Now;
    TimeSpan delta = Finish.Subtract(Start);
    Console.WriteLine("ThreadedDemo2 Total = " + Total + " In " + delta.TotalMilliseconds + " ms");
}

public static void DoWork(object z)
{
    Thread.BeginThreadAffinity();
    DataForThread2 zz = (DataForThread2)z;
    double a = aa;
    double b = bb;
    double c = cc;
    for (int x = zz.ItterateFrom; x < zz.ItterateTo; x++)
    {
        Totals[zz.i] += a * x * x + b * x + c;
    }
    lock (workerlocker)
    {
        runningWorkers--;
        Monitor.Pulse(workerlocker);
    }
    Thread.EndThreadAffinity();
}
I was also looking into doing this in F#. I intended to use asynchronous workflows but could not find a way to make them work from C#. I have the feeling this might not run as fast as the threaded example, but I thought it was worth looking at because, like Parallel.For, it automatically uses all available CPUs.
file.fs
====
#light
module UserScript
open System
let Demo() =
    let a = 10
    let b = 20
    let c = 30
    let total = seq { 0 .. 200000000 } |> Seq.fold (fun acc x -> acc + a * x * x + b * x + c) 0;
    total
Then in C# I call the above as follows:
public void test()
{
    DateTime Start = DateTime.Now;
    double Total = UserScript.Demo();
    DateTime Finish = DateTime.Now;
    TimeSpan delta = Finish.Subtract(Start);
    Console.WriteLine("Total = " + Total + " In " + delta.TotalMilliseconds + " ms");
}
Let me know what you think
have a nice day...
Nigel...
Nigel,
I believe you are right. You have a specialized algorithm, and it is very likely that when you put time into optimizing the threading you will get better results than with my Parallel.ForEach.
In my tests I also sometimes got better results when I used more threads than the number of cores. My explanation is that the OS thread scheduler helps by using any idle cycles within the iteration work. Even if we have only 2 real cores, when we ask the OS for 16 threads it will provide them and schedule them across the two cores. If a single iteration is idling (waiting for disk, memory or network I/O, or graphics), the OS will do a great job of giving those idle cycles to other threads.
Working out the optimum number of threads is an obvious weakness of my Parallel.ForEach; I chose a conservative approach and used the number of CPUs. A related weakness is that the code is totally agnostic of the environment's state and ambient tasks. For example, if the machine has 4 cores but 2 of them are busy, or if two or more Parallel.ForEach statements are nested, this code will overburden the OS with too many unnecessary threads. That is why I call this the "Poor Man's Parallel.ForEach".
I am open to any suggestions on how to make this code better.
Emre
When I tried the code as presented, the program always got an error in InvokeAction at the statement: item = enumerator.Current. The error was "Enumeration already finished". I put a try {} catch {} around the statement and ignored the error on the last item, and it seems to work fine. I'll know more after I take a serious look at what is going on; for now I have just copied and tested.
Thanks for the code and the idea. I'm looking forward to .NET 4.0.
modified on Friday, February 6, 2009 2:19 PM
You were right. It is embarrassing, but I have fixed the code. I really do have a good reason why the code was broken, but it would take too long to explain. Anyway, I have submitted the corrected code, but until the published page refreshes, here is the fixed code:
public class Parallel
{
    public static int NumberOfParallelTasks;

    static Parallel()
    {
        NumberOfParallelTasks = Environment.ProcessorCount;
    }

    public static void ForEach<T>(IEnumerable<T> enumerable, Action<T> action)
    {
        var syncRoot = new object();

        if (enumerable == null) return;

        var enumerator = enumerable.GetEnumerator();
        InvokeAsync<T> del = InvokeAction;
        var seedItemArray = new T[NumberOfParallelTasks];
        var resultList = new List<IAsyncResult>(NumberOfParallelTasks);

        for (int i = 0; i < NumberOfParallelTasks; i++)
        {
            lock (syncRoot)
            {
                if (enumerator.MoveNext() == false) break;
                seedItemArray[i] = enumerator.Current;
            }

            var iAsyncResult = del.BeginInvoke(enumerator, action, seedItemArray[i], syncRoot, i, null, null);
            resultList.Add(iAsyncResult);
        }
        foreach (var iAsyncResult in resultList)
        {
            del.EndInvoke(iAsyncResult);
            iAsyncResult.AsyncWaitHandle.Close();
        }
    }

    delegate void InvokeAsync<T>(IEnumerator<T> enumerator, Action<T> action, T item, object syncRoot, int i);

    static void InvokeAction<T>(IEnumerator<T> enumerator, Action<T> action, T item, object syncRoot, int i)
    {
        if (String.IsNullOrEmpty(Thread.CurrentThread.Name))
            Thread.CurrentThread.Name = String.Format("Parallel.ForEach Worker Thread No:{0}", i);
    loopStart:
        action.Invoke(item);
        lock (syncRoot)
        {
            if (enumerator.MoveNext() == false) return;
            item = enumerator.Current;
        }
        goto loopStart;
    }
}
Can you please extend the article to explain how this solution takes advantage of multi-core processors? Thanks
Josh Fischer