NParallel Chapter II: Loop and Task Parallelism

18 Dec 2007 · CPOL · 4 min read
NParallel 0.2 now supports loop and task parallelism. This article shows you how to use them and what happens behind the scenes.

Introduction

Before reading this article, you might like to read my first article about NParallel, which introduces all the basic concepts. At first, NParallel was designed as a simple library with asynchronous programming support (v0.1 and v0.11). In this article, I will introduce the more advanced parallelism constructs added in NParallel 0.2. The purpose of this article is not only to present NParallel as another way of doing parallel programming, but more importantly to gather ideas and feedback from readers, so that a better library can be designed and implemented.

The goal of this library is to "Make concurrency accessible for 'ordinary programmers'" and to "Try out ideas of how concurrency can be tackled". The implementation therefore focuses on interface design and user accessibility rather than on raw efficiency. Even so, you can see from the test cases that it delivers a significant performance boost on a Core Duo machine.

The interface is somewhat similar to ParallelFX and OpenMP, and I was inspired by some ideas from them.

Parallel Loops, How to Use Them

In OpenMP, you can write code like this:

C#
// OpenMP Loop Parallelism
#pragma omp for
for (int i = 0 ; i < PROBLEM_SIZE; ++ i)
{
    // did time consuming work
    Function(i);
}

The compiler will generate the multi-threading code for you. In C# with NParallel, you can write similar code like this:

C#
// NParallel loop parallelism
NParallel.For(0, PROBLEM_SIZE - 1, i => { Function(i); });

Beware that the second parameter of NParallel.For is toInclusive, which means that value itself is included in the loop range. This may conflict with the habit of zero-based array traversal, but it reads better when writing a function such as summing the numbers from 1 to 100. The reduction version looks like the following:

C#
// NParallel loop parallelism with reduction
int result = NParallel.For(0, PROBLEM_SIZE - 1,
        i => { return i; },
        (priorSum, nextElem) => { return priorSum + nextElem; });

It's easy to write a result merger in NParallel (it maps to reduction in OpenMP). You can also set the initial value for the merged result via the fifth parameter of NParallel.For. Both versions of For shown above block until all work on the range is done. If you want the nowait behavior of OpenMP, wrap the For invocation in NParallel.Execute.
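For example, summing the integers 1 through 100 with an explicit initial value, plus an OpenMP-style nowait variant, could look like the sketch below. The fifth-parameter overload and the NResult.Value rendezvous are assumed from the descriptions above and from the first article, so treat the exact shapes as illustrative:

```csharp
// Sum 1..100; since the bound is toInclusive, 100 itself is processed.
int sum = NParallel.For(1, 100,
        i => i,                               // value produced for each index
        (priorSum, next) => priorSum + next,  // reduction across threads
        0);                                   // initial value per worker thread

// OpenMP-style "nowait": wrap the blocking For in NParallel.Execute.
NResult<int> pending = NParallel.Execute(
    () => NParallel.For(1, 100, i => i, (a, b) => a + b, 0));
// ... do other work here ...
int sumLater = pending.Value;                 // block only when the result is needed
```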

Similar logic applies to ForEach, which replaces the first two parameters with an IEnumerable container.

C#
NParallel.ForEach<int>(Enumerable.Range(1, testSize), i =>
    {
        int result = Function(i);
        Console.WriteLine("{0}", result);
    }
);

ForEach also supports result merging, but the current implementation internally uses For and has to convert the IEnumerable to an array first, so its performance is not very good.
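With that caveat in mind, a reduction over a container can still be written directly. The parameter order below follows the IParallelLoopStrategy.ForEach signature shown later in this article; the public wrapper on NParallel is assumed to match it:

```csharp
// Sum of the squares of 1..100 via the merging overload of ForEach (sketch).
int sumOfSquares = NParallel.ForEach<int, int>(
    Enumerable.Range(1, 100),
    i => i * i,                  // applied to each element
    (acc, next) => acc + next,   // merges per-thread partial results
    0);                          // initial value for each thread's accumulator
```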

Execute a Task

NParallel0.2 introduces the concept of a task. It is defined as shown below:

C#
public class NTask<TState, TReturnValue>
{
    public Func<TState, TReturnValue> Routine {get;set;}
    public TState State {get;set;}
    public NResult<TReturnValue> Result {get;set;}
    public void Run();
}

A task is a simple wrapper around an NParallel.Execute invocation, but it is a useful construct when you want to manage the results yourself together with the calling context. It is also used in task parallelism.
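Driving a task by hand might look like the sketch below. The property names come from the definition above, while the blocking Result.Value rendezvous is assumed from the NResult type introduced in the first article:

```csharp
// Build a task that squares its state, run it, then collect the result.
var square = new NTask<int, int>
{
    Routine = x => x * x,   // work to perform
    State = 21              // input carried along with the task
};
square.Run();               // dispatches the routine (wraps NParallel.Execute)
int answer = square.Result.Value;  // blocks until the routine finishes (assumed)
```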

Task Parallelism

NParallel 0.2 defines a new method called Do, which accepts a task array as a parameter.

C#
public static void Do(bool waitForCompletion, params NVoidFunc[] voidRoutines);
public static void Do<TState, TReturnValue>(bool waitForCompletion,
        params NTask<TState, TReturnValue>[] tasks);

The function signatures speak for themselves: they execute a list of operations in parallel, either blocking or non-blocking depending on waitForCompletion.
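As a sketch, here is Do used first in blocking form with two void routines, then in non-blocking form with tasks. DownloadIndex and BuildCache are hypothetical placeholders, and the Result.Value rendezvous is assumed:

```csharp
// Blocking: run two void routines in parallel and wait for both to finish.
NParallel.Do(true,
    () => DownloadIndex(),   // hypothetical void routine
    () => BuildCache());     // hypothetical void routine

// Non-blocking: fire off two tasks and rendezvous later through their results.
var t1 = new NTask<string, int> { Routine = s => s.Length, State = "hello" };
var t2 = new NTask<string, int> { Routine = s => s.Length, State = "world" };
NParallel.Do(false, t1, t2);                    // returns immediately
int total = t1.Result.Value + t2.Result.Value;  // blocks here if still running
```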

Testcases

Three test projects are included in the code attached.

  1. Console Testcase - A console application containing test cases for most of the new constructs.
  2. WindowsApplication1 - GUI test case with recursive invocation of NParallel, the same as in v0.1.1.
  3. ImageProcessing - Test app with parallel image processing.
Screenshot - showcase1219.0.2.np.PNG

This testcase shows how easy it is to adopt NParallel in a real project. Read the piece of code given below:

C#
private Bitmap ProcessSingleThreaded(Bitmap bmp, PixelOP pixelOP)
{
    ...
    unsafe
    {
        byte* ptrBmpData = (byte*)(void*)Scan0;
        ...
        for (int y = 0; y < bmpout.Height; ++y)
        {
            for (int x = 0; x < nWidthPixel; ++x)
            {
                //... Processing the pixel
            }
            ptrBmpData += nOffset;
        } //end for
    } //end unsafe
    ...
    return bmpout;
}

private Bitmap ProcessMultiThreaded(Bitmap bmp, PixelOP pixelOP)
{
    ...
    unsafe
    {
        ...
        NParallel.For(0, bmpout.Height - 1, (index) =>
        {
            byte* ptrBmpData = (byte*)(void*)Scan0 + index * stride;
            for (int x = 0; x < nWidthPixel; ++x)
            {
                //... Processing the pixel
            }
        });
    } //end unsafe
    ...
    return bmpout;
}

In the Code

The structure of the project in the attached code is as follows:

plain
NParallel
|== Implementation
|
|- NParallel - core lib logic
|   |-- Functional - some FP wrappers
|   |-- NParallel - core implementation
|       |-- APM_Impl - default implementation of NParallel core logic with APM
|
|- Console Testcase - Testcases with a console application
|
|- WindowsApplication1 - GUI testcase with recursive invocation to NParallel.
|
|- ImageProcessing - test app with parallel image processing.

Behind the Scenes

A new interface is defined to abstract the loop parallelism policy. Like the old IParallelStrategy, it can be overridden with your own implementation.

C#
/// <summary>
/// Interface for loop parallelism
/// </summary>
public interface IParallelLoopStrategy
{
    /// <summary>
    /// Gets or sets the (logical) parallel thread count the loop parallelizer will spawn.
    /// This count includes the master thread.
    /// </summary>
    int ParallelWorkerCount { get; set; }

    /// <summary>
    /// A parallel version of For with Reduction support
    /// </summary>
    /// <typeparam name="T">Type of the result</typeparam>
    /// <param name="fromInclusive">fromInclusive index</param>
    /// <param name="toInclusive">toInclusive index</param>
    /// <param name="individualRoutine">routine that will be applied to
    /// each element in the array</param>
    /// <param name="mergingRoutine">routine used to aggregate
    /// the results from each thread</param>
    /// <param name="defaultValue">default value used to initialize the result
    /// for each thread (like firstprivate in OpenMP; for addition it should be 0,
    /// for multiplication it should be 1, ...)</param>
    /// <returns>The result of the aggregation/reduction</returns>
    /// <remarks>The method blocks the current thread until all threads finish,
    /// just like a waiting #pragma omp for.
    /// If you need a non-blocking call, use it together with
    /// NParallel.Execute</remarks>
    T For<T>(int fromInclusive, int toInclusive, Func<int, T> individualRoutine,
        Func<T, T, T> mergingRoutine, T defaultValue);

    /// <summary>
    /// Non reduction version for Parallel For
    /// </summary>
    /// <param name="fromInclusive">fromInclusive element</param>
    /// <param name="toInclusive">toInclusive element</param>
    /// <param name="individualRoutine">routine that will be applied to
    /// each integer in [fromInclusive, toInclusive]</param>
    void For(int fromInclusive, int toInclusive, NStateVoidFunc<int> individualRoutine);

    /// <summary>
    /// Parallel version of ForEach without reduction
    /// </summary>
    /// <typeparam name="T">element type in the container</typeparam>
    /// <param name="enumerable">Enumerable container that will be traversed</param>
    /// <param name="individualRoutine">routine that will be applied to each element
    /// in the container</param>
    void ForEach<T>(IEnumerable<T> enumerable, NStateVoidFunc<T> individualRoutine);

    /// <summary>
    /// Parallel version of ForEach with reduction
    /// </summary>
    /// <typeparam name="TElement">element type in the container</typeparam>
    /// <typeparam name="TResult">Type of the reduction result</typeparam>
    /// <param name="enumerable">Enumerable container that will be traversed</param>
    /// <param name="individualRoutine">routine that will be applied to each element
    /// in the container</param>
    /// <param name="mergingRoutine">routine used to aggregate
    /// the results from each thread</param>
    /// <param name="defaultValue">default value used to initialize the result
    /// for each thread (like firstprivate in OpenMP; for addition it should be 0,
    /// for multiplication it should be 1, ...)</param>
    /// <returns>The result of the aggregation/reduction</returns>
    TResult ForEach<TElement, TResult>(IEnumerable<TElement> enumerable,
        Func<TElement, TResult> individualRoutine,
        Func<TResult, TResult, TResult> mergingRoutine, TResult defaultValue);
}

The default implementation is NAPMLoopStrategyImpl.

Beware

When NParallel is used with closure variables or ref/out parameters in the asyncRoutine, performance drops significantly. Keep the variables used inside the asyncRoutine local; if state needs to be passed in, use Execute<TState, TReturnValue>.
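A minimal sketch of the two patterns follows. The Execute<TState, TReturnValue> overload is assumed to take the routine followed by the state; the exact parameter order is not shown in this article, so treat it as an assumption:

```csharp
// Slow pattern: the lambda closes over 'factor', which (per the text above)
// forces extra marshalling and hurts performance.
int factor = 3;
NResult<int> slow = NParallel.Execute(() => factor * 14);

// Preferred pattern: pass the state explicitly instead of capturing it.
// Signature assumed from the Execute<TState, TReturnValue> mention above.
NResult<int> fast = NParallel.Execute<int, int>(s => s * 14, 3);
```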

ForEach is not yet efficient enough, especially for large collections (ToArray is used internally).

History

  • 2007-12-19: Release of NParallel0.2.

Next Steps

NParallel currently offers constructs similar to those of ParallelFX, except for PLINQ; a PLINQ counterpart is not planned, because that would be reinventing the wheel.

Inspired by MPI and Scala, I am planning to add an ACTORS/MESSAGES construct to NParallel.
Meanwhile, I will spend some time making NParallel 0.2 more stable.

Another possible feature is more choice in task scheduling; currently only static scheduling (as in OpenMP) is available, and dynamic scheduling and task stealing may be introduced.

License

This article, along with any associated source code and files, is licensed under The Code Project Open License (CPOL)


Written By
Software Developer (Senior)
China China
Leaf is a software developer based in Shanghai, China.
My main programming languages are C# and C++.
I am very interested in distributed system design and rich client development.
Currently I am working on NParallel.
