
NParallel Chapter II: Loop and Task Parallelism

18 Dec 2007, CPOL
NParallel 0.2 now supports loop and task parallelism. This article shows you how to use them and what is behind the scenes.

Introduction

Before reading this article, you might like to read my first article about NParallel, which introduces all the basic concepts. NParallel was originally designed as a simple library for asynchronous programming (v0.1 and v0.1.1). In this article, I introduce the more advanced parallelism added in NParallel 0.2. The purpose of this article is not only to present NParallel as another way of doing parallel programming but, more importantly, to gather ideas and feedback from readers, so that a better library can be designed and implemented.

The goals of this library are to "make concurrency accessible to ordinary programmers" and to "try out ideas for how concurrency can be tackled". The implementation therefore focuses on interface design and user accessibility rather than raw efficiency. Even so, you can see from the test cases that it delivers a significant performance boost on a Core Duo machine.

The interface is somewhat similar to ParallelFX and OpenMP, both of which inspired some of its ideas.

Parallel Loops, How to Use Them

In OpenMP, you can write code like this:

// OpenMP Loop Parallelism
#pragma omp for
for (int i = 0 ; i < PROBLEM_SIZE; ++ i)
{
    // do time-consuming work
    Function(i);
}

The compiler generates the multi-threading code for you. In C# with NParallel, you can write similar code like this:

// NParallel loop parallel
NParallel.For(0, PROBLEM_SIZE - 1, i => Function(i));

Beware that the second parameter of NParallel.For is toInclusive, meaning that this value is included in the loop. This may conflict with the usual habit of array traversal, but it reads better when writing, say, a function that sums the numbers from 1 to 100. That looks like the following:

// NParallel loop parallelism with reduction
int result = NParallel.For(0, PROBLEM_SIZE - 1,
        i => i,
        (priorSum, nextElem) => priorSum + nextElem);

It's easy to write a result merger in NParallel (it maps to reduction in OpenMP). You can also set the initial value of the merged result via the fifth parameter of NParallel.For. Both versions of For shown above block until all work is done. If you want OpenMP's nowait behavior, you have to wrap the For invocation in NParallel.Execute.
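As a hedged sketch of that nowait pattern (assuming, as in the v0.1 API, that Execute returns an NResult&lt;T&gt; whose Value blocks until the routine completes):

```csharp
// Non-blocking ("nowait") reduction: run the whole parallel loop
// asynchronously and rendezvous with the result later.
NResult<int> pending = NParallel.Execute(() =>
    NParallel.For(0, PROBLEM_SIZE - 1,
        i => i,
        (priorSum, nextElem) => priorSum + nextElem));

// ... do other work here while the loop runs ...

int sum = pending.Value; // blocks here until the loop has finished
```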

Similar logic applies to ForEach, which replaces the first two parameters with an IEnumerable container.

NParallel.ForEach<int>(Enumerable.Range(1, testSize), i =>
    {
        int result = Function(i);
        Console.WriteLine("{0}", result);
    }
);

ForEach also supports result merging, but the current implementation is built on For internally and must first convert the IEnumerable to an array, so its performance is not very good.

Execute a Task

NParallel0.2 introduces the concept of a task. It is defined as shown below:

public class NTask<TState, TReturnValue>
{
    public Func<TState, TReturnValue> Routine {get;set;}
    public TState State {get;set;}
    public NResult<TReturnValue> Result {get;set;}
    public void Run();
}
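Based on the definition above, a task can be driven directly; a minimal hedged sketch (the object-initializer construction style and the blocking NResult&lt;T&gt;.Value semantics are assumptions):

```csharp
var task = new NTask<int, int>
{
    State   = 21,                  // input handed to the routine
    Routine = state => state * 2   // the work to perform
};
task.Run();                        // executes the routine asynchronously
int answer = task.Result.Value;    // assumed to block until completion
```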

A task is a simple wrapper around an NParallel.Execute invocation, but it is a useful construct if you want to manage the results yourself alongside the calling context. It is also used in task parallelism:

Task Parallelism

NParallel 0.2 defines a new method called Do, which accepts a task array as a parameter.

public static void Do(bool waitForCompletion, params NVoidFunc[] voidRoutines);
public static void Do<TState, TReturnValue>(bool waitForCompletion,
        params NTask<TState, TReturnValue>[] tasks);

The function signatures speak for themselves: they execute a list of operations in parallel, either blocking or not depending on waitForCompletion.
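For example, two independent routines can be kicked off together and joined before continuing; a usage sketch assuming lambdas convert to the NVoidFunc delegate:

```csharp
// waitForCompletion = true: Do blocks until both routines have finished.
NParallel.Do(true,
    () => Console.WriteLine("loading configuration..."),
    () => Console.WriteLine("warming up cache..."));
```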

Testcases

Three test projects are included in the code attached.

  1. Console Testcase - a console application with test cases for most of the new constructs.
  2. WindowsApplication1 - GUI test case with recursive invocation of NParallel, the same as in v0.1.1.
  3. ImageProcessing - test app with parallel image processing.
[Screenshot: showcase1219.0.2.np.PNG]

This test case shows how easy it is to adopt NParallel in a real project. Consider the code below:

private Bitmap ProcessSingleThreaded(Bitmap bmp, PixelOP pixelOP)
{
    ...
    unsafe
    {
        byte* ptrBmpData = (byte*)(void*)Scan0;
        ...
        for (int y = 0; y < bmpout.Height; ++y)
        {
            for (int x = 0; x < nWidthPixel; ++x)
            {
                // ... process the pixel
            }
            ptrBmpData += nOffset;
        } // end for
    } // end unsafe
    ...

    return bmpout;
}

private Bitmap ProcessMultiThreaded(Bitmap bmp, PixelOP pixelOP)
{
    ...
    unsafe
    {
        ...
        NParallel.For(0, bmpout.Height - 1, (index) =>
        {
            byte* ptrBmpData = (byte*)(void*)Scan0 + index * stride;
            for (int x = 0; x < nWidthPixel; ++x)
            {
                // ... process the pixel
            }
        }
        );
    } // end unsafe

    ...

    return bmpout;
}

In the Code

The structure of the project in the attached code is as follows:

NParallel
|- NParallel - core library logic
|   |- Functional - some FP wrappers
|   |- NParallel - core implementation
|       |- APM_Impl - default implementation of the NParallel core logic with APM
|
|- Console Testcase - test cases with a console application
|
|- WindowsApplication1 - GUI test case with recursive invocation of NParallel
|
|- ImageProcessing - test app with parallel image processing

Behind the Scenes

A new interface is defined to abstract the loop-parallelism policy; like the old IParallelStrategy, it can be overridden with your own implementation.

/// <summary>
/// Interface for loop parallelism
/// </summary>
public interface IParallelLoopStrategy
{
    /// <summary>
    /// Gets or sets the number of (logical) parallel threads the loop
    /// parallelizer will spawn. This count includes the master thread.
    /// </summary>
    int ParallelWorkerCount { get; set; }

    /// <summary>
    /// A parallel version of For with reduction support
    /// </summary>
    /// <typeparam name="T">Type of the result</typeparam>
    /// <param name="fromInclusive">Inclusive lower bound of the range</param>
    /// <param name="toInclusive">Inclusive upper bound of the range</param>
    /// <param name="individualRoutine">Routine applied to each index in the range</param>
    /// <param name="mergingRoutine">Routine used to aggregate the results
    /// from each thread</param>
    /// <param name="defaultValue">Value used to initialize the result on each thread
    /// (like firstprivate in OpenMP: for addition it should be 0,
    /// for multiplication it should be 1, ...)</param>
    /// <returns>The result of the aggregation/reduction</returns>
    /// <remarks>The method blocks the current thread until all threads finish,
    /// just like a waiting #pragma omp for.
    /// If you need a non-blocking version, use it together with
    /// NParallel.Execute</remarks>
    T For<T>(int fromInclusive, int toInclusive, Func<int, T> individualRoutine,
        Func<T, T, T> mergingRoutine, T defaultValue);

    /// <summary>
    /// Non-reduction version of the parallel For
    /// </summary>
    /// <param name="fromInclusive">Inclusive lower bound of the range</param>
    /// <param name="toInclusive">Inclusive upper bound of the range</param>
    /// <param name="individualRoutine">Routine applied to each index in
    /// {fromInclusive, ..., toInclusive}</param>
    void For(int fromInclusive, int toInclusive, NStateVoidFunc<int> individualRoutine);

    /// <summary>
    /// Parallel version of ForEach without reduction
    /// </summary>
    /// <typeparam name="T">Element type in the container</typeparam>
    /// <param name="enumerable">Enumerable container to be traversed</param>
    /// <param name="individualRoutine">Routine applied to each element
    /// in the container</param>
    void ForEach<T>(IEnumerable<T> enumerable, NStateVoidFunc<T> individualRoutine);

    /// <summary>
    /// Parallel version of ForEach with reduction
    /// </summary>
    /// <typeparam name="TElement">Element type in the container</typeparam>
    /// <typeparam name="TResult">Type of the result</typeparam>
    /// <param name="enumerable">Enumerable container to be traversed</param>
    /// <param name="individualRoutine">Routine applied to each element
    /// in the container</param>
    /// <param name="mergingRoutine">Routine used to aggregate the results
    /// from each thread</param>
    /// <param name="defaultValue">Value used to initialize the result on each thread
    /// (like firstprivate in OpenMP: for addition it should be 0,
    /// for multiplication it should be 1, ...)</param>
    /// <returns>The result of the aggregation/reduction</returns>
    TResult ForEach<TElement, TResult>(IEnumerable<TElement> enumerable,
        Func<TElement, TResult> individualRoutine,
        Func<TResult, TResult, TResult> mergingRoutine, TResult defaultValue);
}

The default implementation is NAPMLoopStrategyImpl.
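To illustrate the general idea (this is a minimal sketch, not NParallel's actual source, which uses APM internally): a static-scheduled parallel For with reduction splits the index range into contiguous chunks, reduces each chunk on its own thread, then merges the per-thread partial results on the calling thread.

```csharp
using System;
using System.Threading;

// Sketch of a static-scheduled parallel For with reduction.
static T ParallelFor<T>(int fromInclusive, int toInclusive,
    Func<int, T> individualRoutine,
    Func<T, T, T> mergingRoutine, T defaultValue, int workerCount)
{
    int count = toInclusive - fromInclusive + 1;
    var partials = new T[workerCount];
    var threads = new Thread[workerCount];
    for (int w = 0; w < workerCount; ++w)
    {
        int worker = w; // per-iteration copy for the closure
        threads[w] = new Thread(() =>
        {
            // each worker owns one contiguous slice (static scheduling)
            int begin = fromInclusive + count * worker / workerCount;
            int end = fromInclusive + count * (worker + 1) / workerCount - 1;
            T acc = defaultValue; // like firstprivate in OpenMP
            for (int i = begin; i <= end; ++i)
                acc = mergingRoutine(acc, individualRoutine(i));
            partials[worker] = acc;
        });
        threads[w].Start();
    }
    foreach (var t in threads) t.Join(); // block, like a waiting omp for

    T result = defaultValue;
    foreach (var p in partials) result = mergingRoutine(result, p);
    return result;
}

Console.WriteLine(ParallelFor(1, 100, i => i, (a, b) => a + b, 0, 4)); // prints 5050
```

A real strategy would also reuse pooled threads instead of spawning fresh ones per call; the fresh-thread version is kept here only for clarity.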

Beware

When using NParallel with closure variables or ref/out parameters in the async routine, performance drops significantly. Keep the variables the routine uses local to it; if state needs to be passed in, use Execute<TState, TReturnValue>.
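For instance, instead of letting the routine capture an outer variable, the state can be handed over explicitly. A hedged sketch, assuming an Execute&lt;TState, TReturnValue&gt;(routine, state) overload with that parameter order and a hypothetical Compute helper:

```csharp
int factor = 3;

// Slower: the lambda captures 'factor' from the enclosing scope.
var viaClosure = NParallel.Execute(() => Compute(factor));

// Preferred: pass the state in explicitly, so nothing is captured.
var viaState = NParallel.Execute<int, int>(f => Compute(f), factor);
```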

ForEach is not yet good enough for heavy use, especially on large collections (ToArray is used internally).

History

  • 2007-12-19: Release of NParallel0.2.

Next Steps

NParallel now has constructs similar to ParallelFX, except for PLINQ; a PLINQ counterpart is not planned, as that would be reinventing the wheel.

Inspired by MPI and Scala, I am planning to add an actors/messages construct to NParallel.
Meanwhile, I will spend some time making NParallel 0.2 more stable.

Another possible feature is more choice in task scheduling. Currently only static scheduling (as in OpenMP) is available; dynamic scheduling and task stealing may be introduced.

License

This article, along with any associated source code and files, is licensed under The Code Project Open License (CPOL).

About the Author

leafwiz
Software Developer (Senior)
Leaf is a software developer based in Shanghai, China.
My main programming languages are C# and C++.
I am very interested in distributed system design and rich client development.
Currently I am working on NParallel.

Article Copyright 2007 by leafwiz