using System;
using System.Collections.Generic;
using System.Text;
using System.Reflection;
using System.Diagnostics;
namespace Leaf.Parallel
{
/// <summary>
///
/// </summary>
public static class NParallel
{
public const int DefaultWorkerCount = 2;
#region Variables
static CallbackMode callbackMode= CallbackMode.Queue;
static IParallelStrategy parallelStrategy= new NAMPStrategyImpl();
static IParallelLoopStrategy parallelLoopStrategy= new NAPMLoopStrategyImpl();
#endregion
#region Properties
/// <summary>
/// How async calls will be made, for Execute and Execute
/// </summary>
public static IParallelStrategy Strategy
{
get
{
return parallelStrategy;
}
set
{
Debug.Assert(value != null);
parallelStrategy = value;
}
}
/// <summary>
/// How loops will be handled, for For and ForEach
/// </summary>
public static IParallelLoopStrategy LoopStrategy
{
get
{
return parallelLoopStrategy;
}
set
{
Debug.Assert(value != null);
parallelLoopStrategy = value;
}
}
/// <summary>
/// Queue used to dispatch tasks
/// </summary>
public static NQueueDispatcher Queue
{
get
{
return NQueueDispatcher.Instance;
}
}
/// <summary>
/// Set a global Exception handler, will be used when an async originalRoutine does not provide an
/// exception handler
/// </summary>
public static NStateVoidFunc<Exception> ExceptionRoutine
{
get
{
return parallelStrategy.ExceptionHandler;
}
set
{
parallelStrategy.ExceptionHandler = value;
}
}
/// <summary>
/// Callback mode
/// </summary>
public static CallbackMode CallbackMode
{
get
{
return callbackMode;
}
set
{
callbackMode = value;
}
}
#endregion
#region ParallelExecute of TReturnValue
#region stateless, Execute without callerState, beware of the closure:
/// <summary>
/// Execute a method on asynchronously without sending callerState in.
/// </summary>
/// <typeparam name="TReturnValue">Return value type</typeparam>
/// <param name="parallelRoutine">The method to be called asynchronously</param>
/// <returns>Result handle which can be used later to get the return value or wait for completion</returns>
/// <remarks>Whenever you want to access global callerState, you have to manage the locks yourself.
/// If you want to use closure variables, beware the value might be different when asyncRoutine is called and when Execute is called, so if
/// you cant a callerState snapshot on the executing thread, use Execute instead</remarks>
public static NResult<T> Execute<T>(Func<T> parallelRoutine)
{
return Execute<T>(parallelRoutine, null);
}
/// <summary>
/// Execute a method on asynchronously without sending callerState in.
/// </summary>
/// <typeparam name="TReturnValue">Return value type</typeparam>
/// <param name="parallelRoutine">The method to be called asynchronously</param>
/// <param name="callback">The callback to be invoked when asyncRoutine ends</param>
/// <returns>Result handle which can be used later to get the return value or wait for completion</returns>
public static NResult<T> Execute<T>(Func<T> parallelRoutine, NStateVoidFunc<T> callback)
{
return Execute<T>(parallelRoutine, callback, CallbackMode);
}
/// <summary>
/// Execute a method on asynchronously without sending callerState in.
/// </summary>
/// <typeparam name="TReturnValue">Return value type</typeparam>
/// <param name="parallelRoutine">The method to be called asynchronously</param>
/// <param name="callbackMode">callback mode, auto or manual(by default auto,you can make queued or implement your own policy to finish the originalRoutine)</param>
/// <returns>Result handle which can be used later to get the return value or wait for completion</returns>
public static NResult<T> Execute<T>(Func<T> parallelRoutine, CallbackMode callbackMode)
{
return Execute<T>(parallelRoutine, null, callbackMode);
}
/// <summary>
/// Execute a method on asynchronously without sending callerState in.
/// </summary>
/// <typeparam name="TReturnValue">Return value type</typeparam>
/// <param name="parallelRoutine">The method to be called asynchronously</param>
/// <param name="callback">The callback to be invoked when asyncRoutine ends</param>
/// <param name="callbackMode">callback mode, auto or manual(by default auto,you can make queued or implement your own policy to finish the originalRoutine)</param>
/// <returns>Result handle which can be used later to get the return value or wait for completion</returns>
public static NResult<T> Execute<T>(Func<T> parallelRoutine, NStateVoidFunc<T> callback, CallbackMode callbackMode)
{
return Execute<T>(parallelRoutine, callback, null, callbackMode);
}
/// <summary>
/// Execute a method on asynchronously without sending callerState in.
/// </summary>
/// <typeparam name="TReturnValue">Return value type</typeparam>
/// <param name="parallelRoutine">The method to be called asynchronously</param>
/// <param name="callback">The callback to be invoked when asyncRoutine ends</param>
/// <param name="globalExceptionRoutine">routine used to handle exceptions</param>
/// <param name="callbackMode">callback mode, auto or manual(by default auto,you can make queued or implement your own policy to finish the originalRoutine)</param>
/// <returns>Result handle which can be used later to get the return value or wait for completion</returns>
public static NResult<T> Execute<T>(Func<T> parallelRoutine, NStateVoidFunc<T> callback, NStateVoidFunc<Exception> exceptionRoutine, CallbackMode callbackMode)
{
return parallelStrategy.Execute(null, NCurrier.Curry<T, object>(parallelRoutine), callback, exceptionRoutine, callbackMode);
}
#endregion
#region With callerState , Execute
/// <summary>
/// Execute a method on asynchronously with callerState.
/// </summary>
/// <typeparam name="TReturnValue">Return value type</typeparam>
/// <typeparam name="TState">CallerState value type</typeparam>
/// <param name="callerState">CallerState variable to send to the execution thread</param>
/// <param name="parallelRoutine">The method to be called asynchronously</param>
/// <returns>Result handle which can be used later to get the return value or wait for completion</returns>
public static NResult<TResult> Execute<TState, TResult>(TState state, Func<TState, TResult> parallelRoutine)
{
return Execute<TState, TResult>(state, parallelRoutine, null);
}
/// <summary>
/// Execute a method on asynchronously with callerState.
/// </summary>
/// <typeparam name="TReturnValue">Return value type</typeparam>
/// <typeparam name="TState">CallerState value type</typeparam>
/// <param name="callerState">CallerState variable to send to the execution thread</param>
/// <param name="parallelRoutine">The method to be called asynchronously</param>
/// <param name="callback">The callback to be invoked when asyncRoutine ends</param>
/// <returns>Result handle which can be used later to get the return value or wait for completion</returns>
public static NResult<TResult> Execute<TState, TResult>(TState state, Func<TState, TResult> parallelRoutine, NStateVoidFunc<TResult> callback)
{
return Execute<TState, TResult>(state, parallelRoutine, callback, CallbackMode);
}
/// <summary>
/// Execute a method on asynchronously with callerState.
/// </summary>
/// <typeparam name="TReturnValue">Return value type</typeparam>
/// <typeparam name="TState">CallerState value type</typeparam>
/// <param name="callerState">CallerState variable to send to the execution thread</param>
/// <param name="parallelRoutine">The method to be called asynchronously</param>
/// <param name="callbackMode">callback mode, auto or manual(by default auto,you can make queued or implement your own policy to finish the originalRoutine)</param>
/// <returns>Result handle which can be used later to get the return value or wait for completion</returns>
public static NResult<TResult> Execute<TState, TResult>(TState state, Func<TState, TResult> parallelRoutine, CallbackMode callbackMode)
{
return Execute<TState, TResult>(state, parallelRoutine, null, callbackMode);
}
/// <summary>
/// Execute a method on asynchronously with callerState.
/// </summary>
/// <typeparam name="TReturnValue">Return value type</typeparam>
/// <typeparam name="TState">CallerState value type</typeparam>
/// <param name="callerState">CallerState variable to send to the execution thread</param>
/// <param name="parallelRoutine">The method to be called asynchronously</param>
/// <param name="callback">The callback to be invoked when asyncRoutine ends</param>
/// <param name="callbackMode">callback mode, auto or manual(by default auto,you can make queued or implement your own policy to finish the originalRoutine)</param>
/// <returns>Result handle which can be used later to get the return value or wait for completion</returns>
public static NResult<TResult> Execute<TState, TResult>(TState state, Func<TState, TResult> parallelRoutine, NStateVoidFunc<TResult> callback, CallbackMode callbackMode)
{
return Execute<TState, TResult>(state, parallelRoutine, callback, null, callbackMode);
}
/// <summary>
/// Execute a method on asynchronously with callerState.
/// </summary>
/// <typeparam name="TReturnValue">Return value type</typeparam>
/// <typeparam name="TState">CallerState value type</typeparam>
/// <param name="callerState">CallerState variable to send to the execution thread</param>
/// <param name="parallelRoutine">The method to be called asynchronously</param>
/// <param name="callback">The callback to be invoked when asyncRoutine ends</param>
/// <param name="globalExceptionRoutine">routine used to handle exceptions</param>
/// <param name="callbackMode">callback mode, auto or manual(by default auto,you can make queued or implement your own policy to finish the originalRoutine)</param>
/// <returns>Result handle which can be used later to get the return value or wait for completion</returns>
public static NResult<TResult> Execute<TState, TResult>(TState state, Func<TState, TResult> parallelRoutine, NStateVoidFunc<TResult> callback, NStateVoidFunc<Exception> exceptionRoutine, CallbackMode callbackMode)
{
return parallelStrategy.Execute<TResult, TState>(state, parallelRoutine, callback, exceptionRoutine, callbackMode);
}
#endregion
#endregion
#region ParallelExecute of void
#region stateless, Execute(obsolete)
/// <summary>
/// Execute a method on asynchronously without sending callerState in. The method has no return value.
/// </summary>
/// <param name="parallelRoutine">The method to be called asynchronously</param>
/// <returns>Result handle which can be used later to wait for completion</returns>
/// <remarks>Whenever you want to access global callerState, you have to manage the locks yourself.
/// If you want to use closure variables, beware the value might be different when asyncRoutine is called and when Execute is called, so if
/// you cant a callerState snapshot on the executing thread, use Execute instead</remarks>
public static NResult Execute(NVoidFunc parallelRoutine)
{
return Execute(parallelRoutine, (NVoidFunc)null);
}
/// <summary>
/// Execute a method on asynchronously without sending callerState in. The method has no return value.
/// </summary>
/// <param name="parallelRoutine">The method to be called asynchronously</param>
/// <param name="callback">The callback to be invoked when asyncRoutine ends</param>
/// <returns>Result handle which can be used later to wait for completion</returns>
public static NResult Execute(NVoidFunc parallelRoutine, NVoidFunc callback)
{
return Execute(parallelRoutine, callback, CallbackMode);
}
/// <summary>
/// Execute a method on asynchronously without sending callerState in. The method has no return value.
/// </summary>
/// <param name="parallelRoutine">The method to be called asynchronously</param>
/// <param name="callbackMode">callback mode, auto or manual(by default auto,you can make queued or implement your own policy to finish the originalRoutine)</param>
/// <returns>Result handle which can be used later to wait for completion</returns>
public static NResult Execute(NVoidFunc parallelRoutine, CallbackMode callbackMode)
{
return Execute(parallelRoutine, (NVoidFunc)null, callbackMode);
}
/// <summary>
/// Execute a method on asynchronously without sending callerState in. The method has no return value.
/// </summary>
/// <param name="parallelRoutine">The method to be called asynchronously</param>
/// <param name="callback">The callback to be invoked when asyncRoutine ends</param>
/// <param name="callbackMode">callback mode, auto or manual(by default auto,you can make queued or implement your own policy to finish the originalRoutine)</param>
/// <returns>Result handle which can be used later to wait for completion</returns>
public static NResult Execute(NVoidFunc parallelRoutine, NVoidFunc callback, CallbackMode callbackMode)
{
return Execute(parallelRoutine, callback, null, callbackMode);
}
/// <summary>
/// Execute a method on asynchronously without sending callerState in. The method has no return value.
/// </summary>
/// <param name="parallelRoutine">The method to be called asynchronously</param>
/// <param name="callback">The callback to be invoked when asyncRoutine ends</param>
/// <param name="globalExceptionRoutine">routine used to handle exceptions</param>
/// <param name="callbackMode">callback mode, auto or manual(by default auto,you can make queued or implement your own policy to finish the originalRoutine)</param>
/// <returns>Result handle which can be used later to wait for completion</returns>
public static NResult Execute(NVoidFunc parallelRoutine, NVoidFunc callback, NStateVoidFunc<Exception> exceptionRoutine, CallbackMode callbackMode)
{
return parallelStrategy.Execute(null, NCurrier.Curry<Object>(parallelRoutine), callback, exceptionRoutine, callbackMode);
}
#endregion
#region with callerState, Execute
/// <summary>
/// Execute a method on asynchronously with callerState. The method has no return value.
/// </summary>
/// <param name="callerState">The callerState variable that will be sent to the execution thread</param>
/// <param name="parallelRoutine">The method to be called asynchronously</param>
/// <returns>Result handle which can be used later to wait for completion</returns>
public static NResult Execute<T>(T state, NStateVoidFunc<T> parallelRoutine)
{
return Execute(state, parallelRoutine, null);
}
/// <summary>
/// Execute a method on asynchronously with callerState. The method has no return value.
/// </summary>
/// <param name="callerState">The callerState variable that will be sent to the execution thread</param>
/// <param name="parallelRoutine">The method to be called asynchronously</param>
/// <param name="callback">The callback to be invoked when asyncRoutine ends</param>
/// <returns>Result handle which can be used later to wait for completion</returns>
public static NResult Execute<T>(T state, NStateVoidFunc<T> parallelRoutine, NVoidFunc callback)
{
return Execute(state, parallelRoutine, callback, CallbackMode);
}
/// <summary>
/// Execute a method on asynchronously with callerState. The method has no return value.
/// </summary>
/// <param name="callerState">The callerState variable that will be sent to the execution thread</param>
/// <param name="parallelRoutine">The method to be called asynchronously</param>
/// <param name="callbackMode">callback mode, auto or manual(by default auto,you can make queued or implement your own policy to finish the originalRoutine)</param>
/// <returns>Result handle which can be used later to wait for completion</returns>
public static NResult Execute<T>(T state, NStateVoidFunc<T> parallelRoutine, CallbackMode callbackMode)
{
return Execute(state, parallelRoutine, null, callbackMode);
}
/// <summary>
/// Execute a method on asynchronously without sending callerState in. The method has no return value.
/// </summary>
/// <param name="callerState">The callerState variable that will be sent to the execution thread</param>
/// <param name="parallelRoutine">The method to be called asynchronously</param>
/// <param name="callback">The callback to be invoked when asyncRoutine ends</param>
/// <param name="callbackMode">callback mode, auto or manual(by default auto,you can make queued or implement your own policy to finish the originalRoutine)</param>
/// <returns>Result handle which can be used later to wait for completion</returns>
public static NResult Execute<T>(T state, NStateVoidFunc<T> parallelRoutine, NVoidFunc callback, CallbackMode callbackMode)
{
return Execute(state, parallelRoutine, callback, null, callbackMode);
}
/// <summary>
/// Execute a method on asynchronously without sending callerState in. The method has no return value.
/// </summary>
/// <param name="callerState">The callerState variable that will be sent to the execution thread</param>
/// <param name="parallelRoutine">The method to be called asynchronously</param>
/// <param name="callback">The callback to be invoked when asyncRoutine ends</param>
/// <param name="globalExceptionRoutine">routine used to handle exceptions</param>
/// <param name="callbackMode">callback mode, auto or manual(by default auto,you can make queued or implement your own policy to finish the originalRoutine)</param>
/// <returns>Result handle which can be used later to wait for completion</returns>
public static NResult Execute<T>(T state, NStateVoidFunc<T> parallelRoutine, NVoidFunc callback, NStateVoidFunc<Exception> exceptionRoutine, CallbackMode callbackMode)
{
return parallelStrategy.Execute(state, parallelRoutine, callback, exceptionRoutine, callbackMode);
}
#endregion
#endregion
#region Loop Parallelism
/// <summary>
/// Parallel For loop with result merging
/// </summary>
/// <typeparam name="TReturnValue">Return value type for individualRoutine</typeparam>
/// <param name="fromInclusive">the first number of the loop</param>
/// <param name="toInclusive">the last number of the loop[Include itself]</param>
/// <param name="individualRoutine">routine that will be applied to each element between {fromInclusive , toInclusive}</param>
/// <param name="mergingRoutine">routine that will be used to merge results[reduction]</param>
/// <returns>the merge result</returns>
/// <remarks>will use default(TReturnValue) as the initial value for merge result</remarks>
public static T For<T>(int fromInclusive, int toInclusive, Func<int, T> individualRoutine, Func<T, T, T> mergingRoutine)
{
return For<T>(fromInclusive, toInclusive, individualRoutine, mergingRoutine, default(T));
}
/// <summary>
/// Parallel For loop with result merging
/// </summary>
/// <typeparam name="TReturnValue">Return value type for individualRoutine</typeparam>
/// <param name="fromInclusive">the first number of the loop</param>
/// <param name="toInclusive">the last number of the loop[Include itself]</param>
/// <param name="individualRoutine">routine that will be applied to each element between {fromInclusive , toInclusive}</param>
/// <param name="mergingRoutine">routine that will be used to merge results[reduction]</param>
/// <param name="initialValue">InitialValue for the merge result, should be 1 for multiply and 0 for addition/substraction</param>
/// <returns>the merge result</returns>
public static T For<T>(int fromInclusive, int toInclusive, Func<int, T> individualRoutine, Func<T, T, T> mergingRoutine, T initialValue)
{
// Make sure that fromInclusive is lessequal than toInclusive.
if (fromInclusive > toInclusive)
{
int temp = toInclusive;
toInclusive = fromInclusive;
fromInclusive = temp;
}
return parallelLoopStrategy.For<T>(fromInclusive, toInclusive, individualRoutine, mergingRoutine, initialValue);
}
/// <summary>
/// Parallel For loop without result
/// </summary>
/// <param name="fromInclusive">the first number of the loop</param>
/// <param name="toInclusive">the last number of the loop[Include itself]</param>
/// <param name="individualRoutine">routine that will be applied to each element between {fromInclusive , toInclusive}</param>
public static void For(int fromInclusive, int toInclusive, NStateVoidFunc<int> individualRoutine)
{
parallelLoopStrategy.For(fromInclusive , toInclusive , individualRoutine);
}
/// <summary>
/// Parallel ForEach Loop without result
/// </summary>
/// <typeparam name="TReturnValue">Element Type</typeparam>
/// <param name="enumerable">The container of the target collection</param>
/// <param name="individualRoutine">routine that will be applied to each element in enumerable</param>
public static void ForEach<T>(IEnumerable<T> enumerable, NStateVoidFunc<T> individualRoutine)
{
parallelLoopStrategy.ForEach<T>(enumerable, individualRoutine);
}
/// <summary>
/// Parallel ForEach Loop with result
/// </summary>
/// <typeparam name="TElement">Element Type</typeparam>
/// <typeparam name="TResult">Result Type</typeparam>
/// <param name="enumerable">The container of the target collection</param>
/// <param name="individualRoutine">routine that will be applied to each element in enumerable</param>
/// <param name="mergingRoutine">routine that will be used to merge result from each thread</param>
/// <param name="initialValue">Initial value of the merge result(will be copied to each thread)</param>
/// <returns></returns>
public static TResult ForEach<TElement, TResult>(IEnumerable<TElement> enumerable, Func<TElement, TResult> callerRoutine, Func<TResult, TResult, TResult> mergingRoutine, TResult initialValue)
{
return parallelLoopStrategy.ForEach<TElement , TResult>(enumerable, callerRoutine , mergingRoutine , initialValue);
}
/// <summary>
/// Do a list of operations asynchronously, non has return values/callerState input
/// </summary>
/// <param name="voidRoutines"></param>
public static void Do(params NVoidFunc[] voidRoutines)
{
NResult[] results = new NResult[voidRoutines.Length - 1];
for (int i = 0; i < voidRoutines.Length - 1; ++i)
{
results[i] = Execute(voidRoutines[i]);
}
voidRoutines[voidRoutines.Length - 1]();
foreach (var item in results)
{
item.Wait();
}
}
/// <summary>
/// Do a list of operations asynchronously, with callerState and return value.
/// Notice: the master thread will not be assigned any task
/// </summary>
/// <typeparam name="TState">CallerState Type</typeparam>
/// <typeparam name="TReturnValue">Return Value Type</typeparam>
/// <param name="waitForCompletion">if True, the thread will be blocked waiting for completion of all tasks</param>
/// <param name="tasks">Tasks to be executed</param>
public static void Do<TState, TReturnValue>(bool waitForCompletion ,params NTask<TState, TReturnValue>[] tasks)
{
for (int i = 0; i < tasks.Length - 1; ++i)
{
tasks[i].Result = Execute<TState, TReturnValue>(tasks[i].State, tasks[i].Routine);
}
if (waitForCompletion)
{
foreach (var item in tasks)
{
item.Result.Wait();
}
}
}
#endregion
}
}