Click here to Skip to main content
15,887,746 members
Articles / Programming Languages / C#

NParallel Chapter II: Loop and Task Parallelism

Rate me:
Please Sign up or sign in to vote.
4.84/5 (11 votes)
18 Dec 2007CPOL4 min read 42.6K   293   34  
NParallel 0.2 now supports loop and task parallelism. This article shows how to use it and what happens behind the scenes.
using System;
using System.Collections.Generic;
using System.Text;
using System.Reflection;
using System.Diagnostics;

namespace Leaf.Parallel
{
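    /// <summary>
    /// Static entry point of the library: asynchronous execution of routines (Execute),
    /// parallel loops (For, ForEach) and parallel task lists (Do). The work itself is
    /// delegated to the configurable <see cref="Strategy"/> and <see cref="LoopStrategy"/>.
    /// </summary>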
    public static class NParallel
    {
        /// <summary>
        /// Default worker count. It is not referenced anywhere else in this class.
        /// NOTE(review): presumably consumed by the strategy implementations; confirm.
        /// </summary>
        public const int DefaultWorkerCount = 2;

        #region Variables
        // Backing field of the CallbackMode property; starts out as CallbackMode.Queue.
        static CallbackMode callbackMode= CallbackMode.Queue;
        // Backing field of the Strategy property; the Execute overloads delegate to it.
        static IParallelStrategy parallelStrategy= new NAMPStrategyImpl();
        // Backing field of the LoopStrategy property; For and ForEach delegate to it.
        static IParallelLoopStrategy parallelLoopStrategy= new NAPMLoopStrategyImpl();
        #endregion

        #region Properties
        /// <summary>
        /// Gets or sets the strategy that the Execute overloads of this class delegate to.
        /// </summary>
        /// <exception cref="ArgumentNullException">The assigned value is null.</exception>
        public static IParallelStrategy Strategy
        {
            get
            {
                return parallelStrategy;
            }
            set
            {
                // Debug.Assert is compiled out of release builds, so a null strategy used
                // to be stored silently and only failed on the next Execute call.
                // Reject it here instead, in every build configuration.
                if (value == null)
                {
                    throw new ArgumentNullException("value");
                }

                parallelStrategy = value;
            }
        }

        /// <summary>
        /// Gets or sets the strategy that For and ForEach delegate to.
        /// </summary>
        /// <exception cref="ArgumentNullException">The assigned value is null.</exception>
        public static IParallelLoopStrategy LoopStrategy
        {
            get
            {
                return parallelLoopStrategy;
            }
            set
            {
                // Debug.Assert is compiled out of release builds, so a null strategy used
                // to be stored silently and only failed on the next For/ForEach call.
                // Reject it here instead, in every build configuration.
                if (value == null)
                {
                    throw new ArgumentNullException("value");
                }

                parallelLoopStrategy = value;
            }
        }

        /// <summary>
        /// Gets the dispatcher queue, i.e. the value of NQueueDispatcher.Instance.
        /// </summary>
        public static NQueueDispatcher Queue
        {
            get
            {
                NQueueDispatcher dispatcher = NQueueDispatcher.Instance;
                return dispatcher;
            }
        }

        /// <summary>
        /// Gets or sets the global exception handler. The value is not kept in this class:
        /// it is read from and written to the ExceptionHandler of the current strategy.
        /// According to the original documentation it is used when an asynchronous routine
        /// does not supply its own exception handler.
        /// </summary>
        public static NStateVoidFunc<Exception> ExceptionRoutine
        {
            get
            {
                IParallelStrategy currentStrategy = parallelStrategy;
                return currentStrategy.ExceptionHandler;
            }
            set
            {
                IParallelStrategy currentStrategy = parallelStrategy;
                currentStrategy.ExceptionHandler = value;
            }
        }

        /// <summary>
        /// Gets or sets the callback mode used by the Execute overloads that do not take
        /// an explicit callbackMode argument.
        /// </summary>
        public static CallbackMode CallbackMode
        {
            get { return NParallel.callbackMode; }
            set { NParallel.callbackMode = value; }
        }
        #endregion

        #region ParallelExecute of TReturnValue
        #region stateless, Execute without callerState, beware of the closure:
        /// <summary>
        /// Runs a value-returning routine through the strategy, without a state argument
        /// and without a callback.
        /// </summary>
        /// <typeparam name="T">Type of the value returned by the routine.</typeparam>
        /// <param name="parallelRoutine">The routine to run.</param>
        /// <returns>The result handle returned by the callback-taking overload.</returns>
        /// <remarks>
        /// The routine receives no state. Captured (closure) variables are read when the
        /// routine actually runs, not when Execute is called; use an overload that takes a
        /// state argument if a snapshot is needed. Synchronizing access to shared data is
        /// the caller's responsibility.
        /// </remarks>
        public static NResult<T> Execute<T>(Func<T> parallelRoutine)
        {
            NStateVoidFunc<T> noCallback = null;
            return Execute<T>(parallelRoutine, noCallback);
        }
        
        /// <summary>
        /// Runs a value-returning routine through the strategy, without a state argument,
        /// using the current value of the CallbackMode property.
        /// </summary>
        /// <typeparam name="T">Type of the value returned by the routine.</typeparam>
        /// <param name="parallelRoutine">The routine to run.</param>
        /// <param name="callback">Callback passed through to the strategy; may be null.</param>
        /// <returns>The result handle returned by the overload taking an explicit callback mode.</returns>
        public static NResult<T> Execute<T>(Func<T> parallelRoutine, NStateVoidFunc<T> callback)
        {
            CallbackMode defaultMode = NParallel.CallbackMode;
            return Execute<T>(parallelRoutine, callback, defaultMode);
        }
        
        /// <summary>
        /// Runs a value-returning routine through the strategy, without a state argument
        /// and without a callback, using an explicit callback mode.
        /// </summary>
        /// <typeparam name="T">Type of the value returned by the routine.</typeparam>
        /// <param name="parallelRoutine">The routine to run.</param>
        /// <param name="callbackMode">Callback mode passed through to the strategy.</param>
        /// <returns>The result handle returned by the callback-taking overload.</returns>
        public static NResult<T> Execute<T>(Func<T> parallelRoutine, CallbackMode callbackMode)
        {
            NStateVoidFunc<T> noCallback = null;
            return Execute<T>(parallelRoutine, noCallback, callbackMode);
        }

        /// <summary>
        /// Runs a value-returning routine through the strategy, without a state argument
        /// and without a per-call exception handler.
        /// </summary>
        /// <typeparam name="T">Type of the value returned by the routine.</typeparam>
        /// <param name="parallelRoutine">The routine to run.</param>
        /// <param name="callback">Callback passed through to the strategy; may be null.</param>
        /// <param name="callbackMode">Callback mode passed through to the strategy.</param>
        /// <returns>The result handle returned by the overload taking an exception routine.</returns>
        public static NResult<T> Execute<T>(Func<T> parallelRoutine, NStateVoidFunc<T> callback, CallbackMode callbackMode)
        {
            NStateVoidFunc<Exception> noExceptionRoutine = null;
            return Execute<T>(parallelRoutine, callback, noExceptionRoutine, callbackMode);
        }

        /// <summary>
        /// Runs a value-returning routine through the current strategy, without a state
        /// argument. This is the overload all other stateless Execute&lt;T&gt; overloads end in.
        /// </summary>
        /// <typeparam name="T">Type of the value returned by the routine.</typeparam>
        /// <param name="parallelRoutine">The routine to run.</param>
        /// <param name="callback">Callback passed through to the strategy; may be null.</param>
        /// <param name="exceptionRoutine">Exception handler passed through to the strategy; may be null.</param>
        /// <param name="callbackMode">Callback mode passed through to the strategy.</param>
        /// <returns>The result handle returned by the strategy.</returns>
        public static NResult<T> Execute<T>(Func<T> parallelRoutine, NStateVoidFunc<T> callback, NStateVoidFunc<Exception> exceptionRoutine, CallbackMode callbackMode)
        {
            // The strategy only accepts routines that take a state argument, so the
            // stateless routine is adapted with NCurrier and a null state is supplied.
            var curriedRoutine = NCurrier.Curry<T, object>(parallelRoutine);
            return parallelStrategy.Execute(null, curriedRoutine, callback, exceptionRoutine, callbackMode);
        }
        #endregion
        
        #region With callerState , Execute
        /// <summary>
        /// Runs a value-returning routine through the strategy with a state argument and
        /// without a callback.
        /// </summary>
        /// <typeparam name="TState">Type of the state argument.</typeparam>
        /// <typeparam name="TResult">Type of the value returned by the routine.</typeparam>
        /// <param name="state">State value handed to the routine.</param>
        /// <param name="parallelRoutine">The routine to run.</param>
        /// <returns>The result handle returned by the callback-taking overload.</returns>
        public static NResult<TResult> Execute<TState, TResult>(TState state, Func<TState, TResult> parallelRoutine)
        {
            NStateVoidFunc<TResult> noCallback = null;
            return Execute<TState, TResult>(state, parallelRoutine, noCallback);
        }

        /// <summary>
        /// Runs a value-returning routine through the strategy with a state argument,
        /// using the current value of the CallbackMode property.
        /// </summary>
        /// <typeparam name="TState">Type of the state argument.</typeparam>
        /// <typeparam name="TResult">Type of the value returned by the routine.</typeparam>
        /// <param name="state">State value handed to the routine.</param>
        /// <param name="parallelRoutine">The routine to run.</param>
        /// <param name="callback">Callback passed through to the strategy; may be null.</param>
        /// <returns>The result handle returned by the overload taking an explicit callback mode.</returns>
        public static NResult<TResult> Execute<TState, TResult>(TState state, Func<TState, TResult> parallelRoutine, NStateVoidFunc<TResult> callback)
        {
            CallbackMode defaultMode = NParallel.CallbackMode;
            return Execute<TState, TResult>(state, parallelRoutine, callback, defaultMode);
        }

        /// <summary>
        /// Runs a value-returning routine through the strategy with a state argument and
        /// without a callback, using an explicit callback mode.
        /// </summary>
        /// <typeparam name="TState">Type of the state argument.</typeparam>
        /// <typeparam name="TResult">Type of the value returned by the routine.</typeparam>
        /// <param name="state">State value handed to the routine.</param>
        /// <param name="parallelRoutine">The routine to run.</param>
        /// <param name="callbackMode">Callback mode passed through to the strategy.</param>
        /// <returns>The result handle returned by the callback-taking overload.</returns>
        public static NResult<TResult> Execute<TState, TResult>(TState state, Func<TState, TResult> parallelRoutine, CallbackMode callbackMode)
        {
            NStateVoidFunc<TResult> noCallback = null;
            return Execute<TState, TResult>(state, parallelRoutine, noCallback, callbackMode);
        }
        
        /// <summary>
        /// Runs a value-returning routine through the strategy with a state argument and
        /// without a per-call exception handler.
        /// </summary>
        /// <typeparam name="TState">Type of the state argument.</typeparam>
        /// <typeparam name="TResult">Type of the value returned by the routine.</typeparam>
        /// <param name="state">State value handed to the routine.</param>
        /// <param name="parallelRoutine">The routine to run.</param>
        /// <param name="callback">Callback passed through to the strategy; may be null.</param>
        /// <param name="callbackMode">Callback mode passed through to the strategy.</param>
        /// <returns>The result handle returned by the overload taking an exception routine.</returns>
        public static NResult<TResult> Execute<TState, TResult>(TState state, Func<TState, TResult> parallelRoutine, NStateVoidFunc<TResult> callback, CallbackMode callbackMode)
        {
            NStateVoidFunc<Exception> noExceptionRoutine = null;
            return Execute<TState, TResult>(state, parallelRoutine, callback, noExceptionRoutine, callbackMode);
        }

        /// <summary>
        /// Runs a value-returning routine through the current strategy with a state
        /// argument. This is the overload all other Execute&lt;TState, TResult&gt; overloads end in.
        /// </summary>
        /// <typeparam name="TState">Type of the state argument.</typeparam>
        /// <typeparam name="TResult">Type of the value returned by the routine.</typeparam>
        /// <param name="state">State value handed to the routine.</param>
        /// <param name="parallelRoutine">The routine to run.</param>
        /// <param name="callback">Callback passed through to the strategy; may be null.</param>
        /// <param name="exceptionRoutine">Exception handler passed through to the strategy; may be null.</param>
        /// <param name="callbackMode">Callback mode passed through to the strategy.</param>
        /// <returns>The result handle returned by the strategy.</returns>
        public static NResult<TResult> Execute<TState, TResult>(TState state, Func<TState, TResult> parallelRoutine, NStateVoidFunc<TResult> callback, NStateVoidFunc<Exception> exceptionRoutine, CallbackMode callbackMode)
        {
            IParallelStrategy strategy = parallelStrategy;

            // Note the order: the strategy takes its type arguments as <TResult, TState>.
            return strategy.Execute<TResult, TState>(state, parallelRoutine, callback, exceptionRoutine, callbackMode);
        }
        #endregion
        #endregion

        #region ParallelExecute of void
        #region stateless, Execute(obsolete)
        /// <summary>
        /// Runs a routine that takes no state and returns nothing, without a callback.
        /// </summary>
        /// <param name="parallelRoutine">The routine to run.</param>
        /// <returns>The result handle returned by the callback-taking overload.</returns>
        /// <remarks>
        /// The routine receives no state. Captured (closure) variables are read when the
        /// routine actually runs, not when Execute is called; use an overload that takes a
        /// state argument if a snapshot is needed. Synchronizing access to shared data is
        /// the caller's responsibility.
        /// </remarks>
        public static NResult Execute(NVoidFunc parallelRoutine)
        {
            NVoidFunc noCallback = null;
            return Execute(parallelRoutine, noCallback);
        }

        /// <summary>
        /// Runs a routine that takes no state and returns nothing, using the current value
        /// of the CallbackMode property.
        /// </summary>
        /// <param name="parallelRoutine">The routine to run.</param>
        /// <param name="callback">Callback passed through to the strategy; may be null.</param>
        /// <returns>The result handle returned by the overload taking an explicit callback mode.</returns>
        public static NResult Execute(NVoidFunc parallelRoutine, NVoidFunc callback)
        {
            CallbackMode defaultMode = NParallel.CallbackMode;
            return Execute(parallelRoutine, callback, defaultMode);
        }

        /// <summary>
        /// Runs a routine that takes no state and returns nothing, without a callback,
        /// using an explicit callback mode.
        /// </summary>
        /// <param name="parallelRoutine">The routine to run.</param>
        /// <param name="callbackMode">Callback mode passed through to the strategy.</param>
        /// <returns>The result handle returned by the callback-taking overload.</returns>
        public static NResult Execute(NVoidFunc parallelRoutine, CallbackMode callbackMode)
        {
            NVoidFunc noCallback = null;
            return Execute(parallelRoutine, noCallback, callbackMode);
        }

        /// <summary>
        /// Runs a routine that takes no state and returns nothing, without a per-call
        /// exception handler.
        /// </summary>
        /// <param name="parallelRoutine">The routine to run.</param>
        /// <param name="callback">Callback passed through to the strategy; may be null.</param>
        /// <param name="callbackMode">Callback mode passed through to the strategy.</param>
        /// <returns>The result handle returned by the overload taking an exception routine.</returns>
        public static NResult Execute(NVoidFunc parallelRoutine, NVoidFunc callback, CallbackMode callbackMode)
        {
            NStateVoidFunc<Exception> noExceptionRoutine = null;
            return Execute(parallelRoutine, callback, noExceptionRoutine, callbackMode);
        }

        /// <summary>
        /// Runs a routine that takes no state and returns nothing through the current
        /// strategy. This is the overload all other stateless void Execute overloads end in.
        /// </summary>
        /// <param name="parallelRoutine">The routine to run.</param>
        /// <param name="callback">Callback passed through to the strategy; may be null.</param>
        /// <param name="exceptionRoutine">Exception handler passed through to the strategy; may be null.</param>
        /// <param name="callbackMode">Callback mode passed through to the strategy.</param>
        /// <returns>The result handle returned by the strategy.</returns>
        public static NResult Execute(NVoidFunc parallelRoutine, NVoidFunc callback, NStateVoidFunc<Exception> exceptionRoutine, CallbackMode callbackMode)
        {
            // The strategy only accepts routines that take a state argument, so the
            // stateless routine is adapted with NCurrier and a null state is supplied.
            var curriedRoutine = NCurrier.Curry<Object>(parallelRoutine);
            return parallelStrategy.Execute(null, curriedRoutine, callback, exceptionRoutine, callbackMode);
        }
        #endregion
        #region with callerState, Execute
        /// <summary>
        /// Runs a routine that takes a state argument and returns nothing, without a callback.
        /// </summary>
        /// <typeparam name="T">Type of the state argument.</typeparam>
        /// <param name="state">State value handed to the routine.</param>
        /// <param name="parallelRoutine">The routine to run.</param>
        /// <returns>The result handle returned by the callback-taking overload.</returns>
        public static NResult Execute<T>(T state, NStateVoidFunc<T> parallelRoutine)
        {
            NVoidFunc noCallback = null;
            return Execute(state, parallelRoutine, noCallback);
        }

        /// <summary>
        /// Runs a routine that takes a state argument and returns nothing, using the
        /// current value of the CallbackMode property.
        /// </summary>
        /// <typeparam name="T">Type of the state argument.</typeparam>
        /// <param name="state">State value handed to the routine.</param>
        /// <param name="parallelRoutine">The routine to run.</param>
        /// <param name="callback">Callback passed through to the strategy; may be null.</param>
        /// <returns>The result handle returned by the overload taking an explicit callback mode.</returns>
        public static NResult Execute<T>(T state, NStateVoidFunc<T> parallelRoutine, NVoidFunc callback)
        {
            CallbackMode defaultMode = NParallel.CallbackMode;
            return Execute(state, parallelRoutine, callback, defaultMode);
        }

        /// <summary>
        /// Runs a routine that takes a state argument and returns nothing, without a
        /// callback, using an explicit callback mode.
        /// </summary>
        /// <typeparam name="T">Type of the state argument.</typeparam>
        /// <param name="state">State value handed to the routine.</param>
        /// <param name="parallelRoutine">The routine to run.</param>
        /// <param name="callbackMode">Callback mode passed through to the strategy.</param>
        /// <returns>The result handle returned by the callback-taking overload.</returns>
        public static NResult Execute<T>(T state, NStateVoidFunc<T> parallelRoutine, CallbackMode callbackMode)
        {
            NVoidFunc noCallback = null;
            return Execute(state, parallelRoutine, noCallback, callbackMode);
        }

        /// <summary>
        /// Runs a routine that takes a state argument and returns nothing, without a
        /// per-call exception handler.
        /// </summary>
        /// <typeparam name="T">Type of the state argument.</typeparam>
        /// <param name="state">State value handed to the routine.</param>
        /// <param name="parallelRoutine">The routine to run.</param>
        /// <param name="callback">Callback passed through to the strategy; may be null.</param>
        /// <param name="callbackMode">Callback mode passed through to the strategy.</param>
        /// <returns>The result handle returned by the overload taking an exception routine.</returns>
        public static NResult Execute<T>(T state, NStateVoidFunc<T> parallelRoutine, NVoidFunc callback, CallbackMode callbackMode)
        {
            NStateVoidFunc<Exception> noExceptionRoutine = null;
            return Execute(state, parallelRoutine, callback, noExceptionRoutine, callbackMode);
        }
        
        /// <summary>
        /// Runs a routine that takes a state argument and returns nothing through the
        /// current strategy. This is the overload all other stateful void Execute
        /// overloads end in.
        /// </summary>
        /// <typeparam name="T">Type of the state argument.</typeparam>
        /// <param name="state">State value handed to the routine.</param>
        /// <param name="parallelRoutine">The routine to run.</param>
        /// <param name="callback">Callback passed through to the strategy; may be null.</param>
        /// <param name="exceptionRoutine">Exception handler passed through to the strategy; may be null.</param>
        /// <param name="callbackMode">Callback mode passed through to the strategy.</param>
        /// <returns>The result handle returned by the strategy.</returns>
        public static NResult Execute<T>(T state, NStateVoidFunc<T> parallelRoutine, NVoidFunc callback, NStateVoidFunc<Exception> exceptionRoutine, CallbackMode callbackMode)
        {
            IParallelStrategy strategy = parallelStrategy;
            return strategy.Execute(state, parallelRoutine, callback, exceptionRoutine, callbackMode);
        }
        #endregion
        #endregion

        #region Loop Parallelism
        
        /// <summary>
        /// Parallel For loop with result merging that starts the merge from default(T).
        /// </summary>
        /// <typeparam name="T">Type produced by individualRoutine and by the merge.</typeparam>
        /// <param name="fromInclusive">First index of the loop.</param>
        /// <param name="toInclusive">Last index of the loop (included).</param>
        /// <param name="individualRoutine">Routine applied to each index of the range.</param>
        /// <param name="mergingRoutine">Routine used to merge results (reduction).</param>
        /// <returns>The merged result.</returns>
        public static T For<T>(int fromInclusive, int toInclusive, Func<int, T> individualRoutine, Func<T, T, T> mergingRoutine)
        {
            T seed = default(T);
            return For<T>(fromInclusive, toInclusive, individualRoutine, mergingRoutine, seed);
        }

        /// <summary>
        /// Parallel For loop with result merging. The two bounds may be given in either
        /// order; the smaller one is always passed to the loop strategy as the start.
        /// </summary>
        /// <typeparam name="T">Type produced by individualRoutine and by the merge.</typeparam>
        /// <param name="fromInclusive">First index of the loop.</param>
        /// <param name="toInclusive">Last index of the loop (included).</param>
        /// <param name="individualRoutine">Routine applied to each index of the range.</param>
        /// <param name="mergingRoutine">Routine used to merge results (reduction).</param>
        /// <param name="initialValue">Initial value of the merge result, e.g. 1 for a product and 0 for a sum.</param>
        /// <returns>The merged result returned by the loop strategy.</returns>
        public static T For<T>(int fromInclusive, int toInclusive, Func<int, T> individualRoutine, Func<T, T, T> mergingRoutine, T initialValue) 
        {
            // Order the bounds so that the strategy always sees lower <= upper.
            bool reversed = fromInclusive > toInclusive;
            int lower = reversed ? toInclusive : fromInclusive;
            int upper = reversed ? fromInclusive : toInclusive;

            IParallelLoopStrategy loopStrategy = parallelLoopStrategy;
            return loopStrategy.For<T>(lower, upper, individualRoutine, mergingRoutine, initialValue);
        }

        /// <summary>
        /// Parallel For loop without a result. The two bounds may be given in either
        /// order; the smaller one is always passed to the loop strategy as the start.
        /// </summary>
        /// <param name="fromInclusive">First index of the loop.</param>
        /// <param name="toInclusive">Last index of the loop (included).</param>
        /// <param name="individualRoutine">Routine applied to each index of the range.</param>
        public static void For(int fromInclusive, int toInclusive, NStateVoidFunc<int> individualRoutine)
        {
            // Normalize reversed bounds the same way the merging For<T> overload does,
            // so both overloads treat (10, 0) and (0, 10) identically.
            if (fromInclusive > toInclusive)
            {
                int temp = toInclusive;
                toInclusive = fromInclusive;
                fromInclusive = temp;
            }

            parallelLoopStrategy.For(fromInclusive, toInclusive, individualRoutine);
        }

        /// <summary>
        /// Parallel ForEach loop without a result; delegates to the current loop strategy.
        /// </summary>
        /// <typeparam name="T">Element type of the collection.</typeparam>
        /// <param name="enumerable">The collection whose elements are processed.</param>
        /// <param name="individualRoutine">Routine applied to each element.</param>
        public static void ForEach<T>(IEnumerable<T> enumerable, NStateVoidFunc<T> individualRoutine) 
        {
            IParallelLoopStrategy loopStrategy = parallelLoopStrategy;
            loopStrategy.ForEach<T>(enumerable, individualRoutine);
        }

        /// <summary>
        /// Parallel ForEach loop with result merging; delegates to the current loop strategy.
        /// </summary>
        /// <typeparam name="TElement">Element type of the collection.</typeparam>
        /// <typeparam name="TResult">Type produced by callerRoutine and by the merge.</typeparam>
        /// <param name="enumerable">The collection whose elements are processed.</param>
        /// <param name="callerRoutine">Routine applied to each element.</param>
        /// <param name="mergingRoutine">Routine used to merge results.</param>
        /// <param name="initialValue">Initial value of the merge result.</param>
        /// <returns>The merged result returned by the loop strategy.</returns>
        public static TResult ForEach<TElement, TResult>(IEnumerable<TElement> enumerable, Func<TElement, TResult> callerRoutine, Func<TResult, TResult, TResult> mergingRoutine, TResult initialValue) 
        {
            IParallelLoopStrategy loopStrategy = parallelLoopStrategy;
            TResult merged = loopStrategy.ForEach<TElement, TResult>(enumerable, callerRoutine, mergingRoutine, initialValue);
            return merged;
        }

        /// <summary>
        /// Runs a list of routines that take no state and return nothing, then waits for
        /// all of them. Every routine except the last is started through Execute; the last
        /// one runs directly on the calling thread.
        /// </summary>
        /// <param name="voidRoutines">Routines to run; an empty list is a no-op.</param>
        /// <exception cref="ArgumentNullException">voidRoutines is null.</exception>
        public static void Do(params NVoidFunc[] voidRoutines)
        {
            if (voidRoutines == null)
            {
                throw new ArgumentNullException("voidRoutines");
            }

            // Calling Do() with no arguments yields an empty array. The result array below
            // is sized Length - 1, which would be -1 and throw, so return early instead.
            if (voidRoutines.Length == 0)
            {
                return;
            }

            int lastIndex = voidRoutines.Length - 1;
            NResult[] results = new NResult[lastIndex];
            for (int i = 0; i < lastIndex; ++i)
            {
                results[i] = Execute(voidRoutines[i]);
            }

            // The calling thread takes the final routine itself instead of idling.
            voidRoutines[lastIndex]();

            foreach (var item in results)
            {
                item.Wait();
            }
        }

        /// <summary>
        /// Starts a list of tasks, each with its own state and return value, through
        /// Execute. Every task is dispatched; the calling thread runs none of them itself.
        /// Each task's Result is set to the handle returned by Execute.
        /// </summary>
        /// <typeparam name="TState">Type of each task's state.</typeparam>
        /// <typeparam name="TReturnValue">Type of each task's return value.</typeparam>
        /// <param name="waitForCompletion">If true, blocks until Wait has returned for every task's Result.</param>
        /// <param name="tasks">Tasks to be executed.</param>
        /// <exception cref="ArgumentNullException">tasks is null.</exception>
        public static void Do<TState, TReturnValue>(bool waitForCompletion, params NTask<TState, TReturnValue>[] tasks)
        {
            if (tasks == null)
            {
                throw new ArgumentNullException("tasks");
            }

            // Dispatch ALL tasks. The previous bound of Length - 1 skipped the last task,
            // so it never ran and its Result was never assigned, which made the wait
            // loop below fail on that element.
            for (int i = 0; i < tasks.Length; ++i)
            {
                tasks[i].Result = Execute<TState, TReturnValue>(tasks[i].State, tasks[i].Routine);
            }

            if (waitForCompletion)
            {
                foreach (var item in tasks)
                {
                    item.Result.Wait();
                }
            }
        }


        #endregion
    }
}

By viewing downloads associated with this article you agree to the Terms of Service and the article's licence.

If a file you wish to view isn't highlighted, and is a text file (not binary), please let us know and we'll add colourisation support for it.

License

This article, along with any associated source code and files, is licensed under The Code Project Open License (CPOL)


Written By
Software Developer (Senior)
China China
Leaf is a software developer based in Shanghai, China.
My major programming languages are C# and C++.
I am very interested in distributed system design and rich client development.
Currently I am working on NParallel.

Comments and Discussions