NParallel Chapter II: Loop and Task Parallelism
NParallel 0.2 now supports loop and task parallelism. This article shows you how to use these features and what happens behind the scenes.
Introduction
Before reading this article, you might like to read my first article about NParallel, which introduces all the basic concepts behind the library. At first, NParallel was designed as a simple library with asynchronous programming support (v0.1 and v0.11). In this article, I will introduce the more advanced parallelism constructs added in NParallel 0.2. The purpose of this article is not only to present NParallel as another way of doing parallel programming, but, more importantly, to gather ideas and feedback from readers so that a better library can be designed and implemented.
The goal of this library is to "make concurrency accessible for 'ordinary programmers'" and to "try out ideas of how concurrency can be tackled". The implementation therefore focuses on interface design and user accessibility rather than putting efficiency first. Even so, you can see from the test cases that it delivers a significant performance boost on a Core Duo machine.
The interface is somewhat similar to ParallelFX and OpenMP, and I was inspired by some of their ideas.
Parallel Loops, How to Use Them
In OpenMP, you can write code like this:
// OpenMP Loop Parallelism
#pragma omp for
for (int i = 0; i < PROBLEM_SIZE; ++i)
{
// do time-consuming work
Function(i);
}
The compiler will generate the multi-threading code for you. In C# with NParallel, you can write similar code like this:
// NParallel loop parallel
NParallel.For(0, PROBLEM_SIZE - 1, i => { Function(i); });
Beware that the second parameter of NParallel.For is toInclusive, which means that value itself is included in the iteration. This might conflict with the usual habit of array traversal, but it reads better when, for example, I write a function to compute the sum of the numbers from 1 to 100. That looks like the following:
// NParallel loop parallelism with reduction
int result = NParallel.For(0, PROBLEM_SIZE - 1,
i => { return i; },
(priorSum, nextElem) => { return priorSum + nextElem; });
It's easy to write a result merger in NParallel (this maps to reduction in OpenMP). You can also set the initial value for the merged result through the fifth parameter of NParallel.For. Both versions of For shown above block until all work in the range is done. If you want the nowait behavior of OpenMP, you have to wrap the For invocation in NParallel.Execute, as sketched below.
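Two quick illustrations of that paragraph follow. The first line passes the initial value explicitly as the fifth argument, matching the IParallelLoopStrategy.For signature shown later in this article. The second is only a sketch of the nowait-style usage: it assumes NParallel.Execute has an overload that accepts a parameterless void delegate and returns immediately; check the v0.1 article for the exact Execute signatures.
// Reduction with an explicit initial value passed as the fifth argument.
int sum = NParallel.For(0, PROBLEM_SIZE - 1, i => i, (a, b) => a + b, 0);

// Nowait-style usage (sketch only): wrap the blocking For in NParallel.Execute.
// Assumption: Execute has an overload taking a parameterless void delegate
// and returning without blocking.
NParallel.Execute(() =>
{
    NParallel.For(0, PROBLEM_SIZE - 1, i => { Function(i); });
});
// ... the calling thread can continue with other work here ...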
Similar logic goes for ForEach, which replaces the first two parameters with an IEnumerable container.
NParallel.ForEach<int>(Enumerable.Range(1, testSize), i =>
{
int result = Function(i);
Console.WriteLine("{0}", result);
}
);
ForEach also supports result merging, but the current implementation internally uses For and needs to convert the IEnumerable to an Array first, so the performance is not very good.
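For reference, here is a small sketch of the merging overload. It assumes the static NParallel.ForEach mirrors the IParallelLoopStrategy.ForEach<TElement, TResult> signature shown later in this article; the sumOfSquares name is only for illustration.
// Sketch: ForEach with result merging (reduction) - sum of the squares of 1..100.
// Assumes the static NParallel.ForEach forwards to the
// IParallelLoopStrategy.ForEach<TElement, TResult> overload shown below.
int sumOfSquares = NParallel.ForEach<int, int>(
    Enumerable.Range(1, 100),
    i => i * i,                          // applied to each element
    (partial, next) => partial + next,   // merges per-thread partial results
    0);                                  // initial value for each thread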
Execute a Task
NParallel0.2 introduces the concept of a task. It is defined as shown below:
public class NTask<TState, TReturnValue>
{
public Func<TState, TReturnValue> Routine { get; set; }
public TState State { get; set; }
public NResult<TReturnValue> Result { get; set; }
public void Run();
}
A task is a simple wrapper around an NParallel.Execute invocation, but it is a useful construct if you want to manage the results yourself, together with the calling context. It is also used in task parallelism, described in the next section. A minimal usage sketch follows:
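The sketch below builds and runs a task by hand, following the NTask<TState, TReturnValue> definition above. How the computed value is read back from NResult<TReturnValue> (a Value property is assumed in the comment) and whether Run dispatches asynchronously are assumptions, not part of the published API.
// Sketch: build a task from a routine plus its state, then run it.
// The property names Routine/State/Result come from the NTask definition above;
// the Result.Value access mentioned below is an assumption about NResult.
var task = new NTask<int, int>
{
    State = 42,
    Routine = state => Function(state)   // Func<TState, TReturnValue>
};
task.Run();
// After completion, task.Result wraps the routine's return value,
// e.g. int value = task.Result.Value;   // assumed accessor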
Task Parallelism
NParallel 0.2 defines a new method called Do, which accepts an array of void routines or tasks as a parameter.
public static void Do(bool waitForCompletion, params NVoidFunc[] voidRoutines);
public static void Do<TState, TReturnValue>(bool waitForCompletion,
params NTask<TState, TReturnValue>[] tasks);
The function signatures speak for themselves: they execute a list of operations in parallel, either blocking or not depending on waitForCompletion. A minimal sketch follows:
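Here is a minimal sketch of the first overload. It assumes NVoidFunc is a parameterless void delegate that lambdas convert to; ProcessImage is a hypothetical method used only for illustration.
// Sketch: run two independent operations in parallel and block until both finish.
// Assumptions: NVoidFunc is a parameterless void delegate; ProcessImage is hypothetical.
NParallel.Do(true,                       // waitForCompletion: block until both finish
    () => ProcessImage("a.bmp"),
    () => ProcessImage("b.bmp"));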
Testcases
Three test projects are included in the code attached:
- Console Testcase - test cases with a console application, containing a test case for most of the new constructs.
- WindowsApplication1 - GUI test case with recursive invocation to NParallel, the same as in v0.1.1.
- ImageProcessing - test app with parallel image processing.
The ImageProcessing test case shows how easy it is to adopt NParallel in a real project. Compare the two versions of the code given below:
private Bitmap ProcessSingleThreaded(Bitmap bmp, PixelOP pixelOP)
{
...
unsafe
{
byte * ptrBmpData = (byte *)(void *)Scan0;
...
for (int y = 0; y < bmpout.Height; ++y)
{
for (int x = 0; x < nWidthPixel; ++x)
{
//...Processing the pixel
}
ptrBmpData += nOffset;
}//end for
}//end unsafe
...
return bmpout;
}
private Bitmap ProcessMultiThreaded(Bitmap bmp, PixelOP pixelOP)
{
...
unsafe
{
...
NParallel.For(0, bmpout.Height - 1, (index) =>
{
byte* ptrBmpData = (byte*)(void*)Scan0 + index * stride;
for (int x = 0; x < nWidthPixel; ++x)
{
//... Processing the Pixel
}
}
);
}//end unsafe
...
return bmpout;
}
In the Code
The structure of the project in the attached code is as follows:
NParallel
|== Implementation
|
|- NParallel - core lib logic
|   > -- Functional - some FP wrappers
|   > -- NParallel - core implementation
|       >> -- APM_Impl - default implementation of the NParallel core logic with APM
|
|- Console Testcase - Testcases with a console application
|
|- WindowsApplication1 - GUI testcase with recursive invocation to NParallel.
|
|- ImageProcessing - test app with parallel image processing.
Behind the Scenes
A new interface is defined to abstract the loop parallelism policy. Like the old IParallelStrategy, it can be overridden with your own implementation.
/// <summary>
/// Interface for loop parallelism
/// </summary>
public interface IParallelLoopStrategy
{
/// <summary>
/// Gets or sets the number of (logical) worker threads the loop parallelizer will spawn.
/// This count includes the master thread.
/// </summary>
int ParallelWorkerCount { get; set; }
/// <summary>
/// A parallel version of For with reduction support
/// </summary>
/// <typeparam name="T">Type of the result</typeparam>
/// <param name="fromInclusive">fromInclusive index</param>
/// <param name="toInclusive">toInclusive index</param>
/// <param name="individualRoutine">routine that will be applied to
/// each index in the range</param>
/// <param name="mergingRoutine">routine that will be used to aggregate
/// the results from all threads</param>
/// <param name="defaultValue">default value used to initialize the result
/// for each thread (like firstprivate in OpenMP; for addition it should be 0,
/// for multiplication it should be 1, ...)</param>
/// <returns>The result of the aggregation/reduction</returns>
/// <remarks>The method will block the current thread until all threads finish,
/// just like a waiting #pragma omp for.
/// If you need a non-blocking method, you can use it together with
/// NParallel.Execute</remarks>
T For<T>(int fromInclusive, int toInclusive, Func<int, T> individualRoutine,
Func<T, T, T> mergingRoutine, T defaultValue);
/// <summary>
/// Non-reduction version of the parallel For
/// </summary>
/// <param name="fromInclusive">fromInclusive index</param>
/// <param name="toInclusive">toInclusive index</param>
/// <param name="individualRoutine">routine that will be applied to
/// each integer in [fromInclusive, toInclusive]</param>
void For(int fromInclusive, int toInclusive, NStateVoidFunc<int> individualRoutine);
/// <summary>
/// Parallel version of ForEach without reduction
/// </summary>
/// <typeparam name="T">element type in the container</typeparam>
/// <param name="enumerable">Enumerable container that will be traversed</param>
/// <param name="individualRoutine">routine that will be applied to each element
/// in the container</param>
void ForEach<T>(IEnumerable<T> enumerable, NStateVoidFunc<T> individualRoutine);
/// <summary>
/// Parallel version of ForEach with reduction
/// </summary>
/// <typeparam name="TElement">element type in the container</typeparam>
/// <typeparam name="TResult">type of the returned result</typeparam>
/// <param name="enumerable">Enumerable container that will be traversed</param>
/// <param name="individualRoutine">routine that will be applied to each element
/// in the container</param>
/// <param name="mergingRoutine">routine that will be used to aggregate
/// the results from all threads</param>
/// <param name="defaultValue">default value used to initialize the result
/// for each thread (like firstprivate in OpenMP; for addition it should be 0,
/// for multiplication it should be 1, ...)</param>
/// <returns>The result of the aggregation/reduction</returns>
TResult ForEach<TElement, TResult>(IEnumerable<TElement> enumerable,
Func<TElement, TResult> individualRoutine,
Func<TResult, TResult, TResult> mergingRoutine, TResult defaultValue);
}
The default implementation is NAPMLoopStrategyImpl.
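To illustrate the contract, here is a minimal single-threaded sketch of IParallelLoopStrategy (no threading at all), useful mainly as a mental model or debugging fallback. It assumes NStateVoidFunc<T> is a void delegate taking a single T argument; how a custom strategy is plugged into NParallel is not covered here.
// A minimal, single-threaded sketch of IParallelLoopStrategy.
// Assumption: NStateVoidFunc<T> is a void delegate taking one T argument.
// Registering a custom strategy with NParallel is not shown here.
using System;
using System.Collections.Generic;

public class SequentialLoopStrategy : IParallelLoopStrategy
{
    public int ParallelWorkerCount { get; set; }

    public T For<T>(int fromInclusive, int toInclusive, Func<int, T> individualRoutine,
        Func<T, T, T> mergingRoutine, T defaultValue)
    {
        T result = defaultValue;
        for (int i = fromInclusive; i <= toInclusive; ++i)   // toInclusive is included
            result = mergingRoutine(result, individualRoutine(i));
        return result;
    }

    public void For(int fromInclusive, int toInclusive, NStateVoidFunc<int> individualRoutine)
    {
        for (int i = fromInclusive; i <= toInclusive; ++i)
            individualRoutine(i);
    }

    public void ForEach<T>(IEnumerable<T> enumerable, NStateVoidFunc<T> individualRoutine)
    {
        foreach (T item in enumerable)
            individualRoutine(item);
    }

    public TResult ForEach<TElement, TResult>(IEnumerable<TElement> enumerable,
        Func<TElement, TResult> individualRoutine,
        Func<TResult, TResult, TResult> mergingRoutine, TResult defaultValue)
    {
        TResult result = defaultValue;
        foreach (TElement item in enumerable)
            result = mergingRoutine(result, individualRoutine(item));
        return result;
    }
}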
Beware
When using NParallel with closure variables or ref/out parameters in the asyncRoutine, performance drops significantly. So keep the variables used in the asyncRoutine local; if state needs to be passed in, use Execute<TState, TReturnValue>.
ForEach is not good enough for heavy use yet, especially for large collections (ToArray is used internally).
History
- 2007-12-19: Release of NParallel 0.2.
Next Steps
Currently, NParallel offers constructs similar to ParallelFX, except for PLINQ; a PLINQ counterpart is not planned because that would be reinventing the wheel.
Inspired by MPI and Scala, I am planning to add an ACTORS/MESSAGES construct to NParallel.
Meanwhile, I will spend some time making NParallel 0.2 more stable.
Another possible feature is more choice in task scheduling: currently we only have static scheduling (as in OpenMP); dynamic scheduling and task stealing may be introduced.