License: The Code Project Open License (CPOL)
NParallel, A Small Parallel Execution Library
By leafwiz
A simple library that lets you write asynchronous code easily, almost in a synchronous pattern.
C# 2.0, C# 3.0, Windows, .NET 2.0, .NET 3.0, .NET 3.5, VS2005, VS2008, Dev
This article remains mainly an introduction to the main constructs of NParallel. For loop parallelism, I have posted a new article that discusses its design and usage in NParallel; you can also refer to the attached test cases.
There are many ways to write multithreaded code in .NET, such as asynchronous method invocation or creating a thread explicitly. But most of these methods change the way you write code and introduce new concepts, like IAsyncResult and callback delegates, that have little to do with the application logic. They usually make the code hard to read and maintain, and they require application developers to know how the underlying threading works.
My previous project relied heavily on the Begin/EndInvoke model. After defining many delegates and function pairs, I got bored with the Begin/End code and decided to write a wrapper library that hides the complexity, makes the code easy to read and write, and leaves room to change the underlying threading mechanism later on.
Then I came up with NParallel, which I am going to introduce to you in this article.
Let's first have a look at the old way of executing code in parallel. Suppose that we have a query that takes a long time to complete:
// Sample code 1: Time Consuming Operation
int LongRunOperation(int queryID)
{
Thread.Sleep(10000);
// Do some query
return queryID * 2;
}
// synchronous call to the method
int result = LongRunOperation(10);
We define the method LongRunOperation and call it with the given parameter. Everything looks straightforward, but the calling thread is blocked for 10 seconds! For applications that cannot afford this blocking, we have to make the call run in parallel.
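// asynchronous call to the method
void BeginLongRunOperation(int queryID)
{
Func<int, int> operation = LongRunOperation;
// Pass the completion callback defined below; it runs when the operation finishes
IAsyncResult ar = operation.BeginInvoke(queryID, EndLongRunOperation, null);
}
void EndLongRunOperation(IAsyncResult ar)
{
// AsyncDelegate is typed as object, so cast it back to the delegate type
// (AsyncResult lives in System.Runtime.Remoting.Messaging)
Func<int, int> operation2 = (Func<int, int>)((AsyncResult)ar).AsyncDelegate;
int result = operation2.EndInvoke(ar);
// process with the return value
}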
In the code above I used the predefined System.Func delegate instead of an old-fashioned user-defined delegate. Even so, the code still looks tedious and error-prone, for two reasons. First, the BeginInvoke call separates the parameters from the method itself. Second, an IAsyncResult is introduced, which is not related to the application logic.
When I started to define NParallel, the goals were the ones mentioned earlier: hide the complexity, make the code easy to read and write, and keep the flexibility to change the underlying threading mechanism later.
I came up with an idea like this:
// NParallel pseudocode
NResult pResult = NParallel.Execute(parallel_code_block);
if(pResult.IsDone())
{
var result = pResult.Value;
// work with the result
}
I was expecting parallel_code_block to be a method invocation, a delegate, a lambda expression or even a LINQ query. After trying several approaches, I found the anonymous delegate to be an ideal implementation for parallel_code_block. The actual code looks like this:
// Current NParallel syntax, simple asynchronous call with callback:
NResult<int> pResult = NParallel.Execute<int>(()=>
{return LongRunOperation(10); }, /*Asynchronous code block*/
EndBeginLongRunOperation /*Callback operation*/
);
void EndBeginLongRunOperation(int theResult)
{
// process with the return value of LongRunOperation;
}
NParallel works by wrapping the code to be executed asynchronously in an anonymous delegate. Because an anonymous delegate can use all the local variables in the enclosing scope, I don't need to define different versions of Execute for different parameter lists. I only defined a generic and a non-generic version of Execute, for code blocks that return a value and code blocks that return nothing. Below are the signatures of the two methods:
// Generic method signature for the invoker.
NResult<T> Execute<T>(Func<T> asyncRoutine, NResultDel<T> callbackRoutine, NExceptionDel exceptionRoutine, CallbackMode callbackMode);
// Method for executing non-result code blocks
NResult Execute <T>(T state, NStateVoidFunc<T> parallelRoutine, NVoidFunc callback, NStateVoidFunc<Exception> exceptionRoutine,
CallbackMode callbackMode)
Both versions of Execute have many overloads; refer to the code for more information. The first parameter defines the code block to be called asynchronously, the second defines a callback, the third defines how exceptions should be handled, and the last defines how the callback methods will be called. All parameters except the first one are optional. We are going to look at each parameter in turn.
Let's first have a look at the signatures of the two delegates for the first parameter:
// Generic code block delegate
public delegate TResult Func<TResult>();
// Usage in NParallel
delegate() // or just use ()=>
{
T result;
//your code block here
return result;
}
// Void call code block delegate
public delegate void DelegateVoid();
// Usage in NParallel
delegate()
{
//your code block here
}
You can put almost anything into the delegate code block. If the code block changes shared variables, you are responsible for managing the locking. Below are some more complex code blocks you can put into the delegate. Looks cool, eh?
// Current NParallel syntax, calling asynchronous code blocks:
NResult<int> pResult = NParallel.Execute<int>(delegate()
{
int op1 = PrepareParam(localVar1);
int op2 = PrepareParam(localVar2);
return LongRunOperation(op1 + op2);
} /*Asynchronous code block*/
);
// calling a linq query in NParallel
NParallel.Execute<IList<int>>
(
delegate()
{
return (from i in Enumerable.Range(10, 100)
where i % 5 == 0
select i).ToList();
}/*Asynchronous code block in LINQ*/
);
The best thing about NParallel is that it is easy to read and looks almost the same as a synchronous call. You don't have to break anything up; just mark the calls you want to run in parallel.
Important remarks:
When using a delegate in the above scenario, it is easy to misuse closure variables (local variables captured by the anonymous delegate; the delegate shares the variable itself rather than getting a copy of its value). For example, assume there is a variable called curCount that you use in the anonymous delegate. If this variable is changed after the call to Execute, the NParallel engine cannot tell, and the delegate might use the changed value instead. If you want to send a value into the executing thread, use the state version instead:
int localVariable = 6;
NParallel.Execute<int, IList<int>>
(
localVariable, // the state variable to send to the execution thread.
delegate(int criteria) // the state arrives as a typed parameter
{
return (from i in Enumerable.Range(10, 100)
where i % criteria == 0
select i).ToList();
}
);
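The difference between capturing a local variable and passing it as state can be reproduced without NParallel. The sketch below is only an illustration: ClosureDemo and Bind are hypothetical names, and Bind stands in for the "state version" of Execute by copying the value into a parameter before the delegate is created.

```csharp
using System;

public static class ClosureDemo
{
    // Hypothetical helper: 'state' is a method parameter, so it holds a copy
    // of the caller's value taken at the moment Bind is called.
    public static Func<int> Bind(int state, Func<int, int> routine)
    {
        return () => routine(state);
    }

    public static int[] Run()
    {
        int curCount = 1;

        // Captures the variable curCount itself, not its current value.
        Func<int> captured = () => curCount * 10;

        // Receives a copy of the current value (1) as state.
        Func<int> bound = Bind(curCount, c => c * 10);

        // The caller changes the variable before either delegate has run.
        curCount = 5;

        // captured() sees the changed variable (50); bound() still uses the copy (10).
        return new[] { captured(), bound() };
    }
}
```

Calling ClosureDemo.Run() returns { 50, 10 }: the captured delegate observes the later assignment, while the state-based one does not.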
In NParallel, unlike asynchronous delegate invocation, your callback does not need to deal with IAsyncResult; it only needs to process the result you expect from the method. The signatures for the callbacks are defined as follows:
// Generic callback delegate
public delegate void TResultDel<T>(T result);
// void callback delegate
public delegate void DelegateVoid();
// Sample: provide a callback when invoking the method:
void PrintResult(int value) { /* print the value */ }
NParallel.Execute (delegate(){return temp1 + temp2;},
PrintResult);
If no callback method is provided, a default callback is called, and EndInvoke() is called anyway, so you can just fire and forget.
If an exception is raised while an asynchronous code block is executing, NParallel catches it and calls the exception handling routine, which you specify with this parameter. If no routine is specified, the exception is thrown when Value is read.
public delegate void NExceptionDel(Exception exc);
You can provide a global exception handling routine by setting NParallel.ExceptionRoutine.
When using the BeginInvoke/EndInvoke APM model, the callback is called automatically when the method is done, and it is executed on the same thread pool thread that the method ran on. In some circumstances we may not want it to work this way: we may want the callbacks to be marshaled through a centralized queue, or we may want to decide at runtime when the callback happens. The last parameter of NParallel.Execute enables you to do this. CallbackMode is an enum defined as below:
public enum CallbackMode
{
Manual,
Queue,
Auto
}
When CallbackMode is set to Auto, EndInvoke and the callback methods are invoked automatically, just as with Begin/EndInvoke. When CallbackMode is set to Queue, all the asynchronous callbacks are marshaled through the queue (see Queue). When CallbackMode is set to Manual, you have to call NResult.Wait() yourself to trigger EndInvoke and the callback (see NResult). The default value for CallbackMode can be set via NParallel.DefaultMarshal; it defaults to CallbackMode.Auto. You can mix the three callback modes within one application.
NResult has two versions, NResult and NResult<T>. Below are the most important methods for them:
// For NResult <T> only
public T Value{get;};
//Block current thread, wait for the method to finish
public void Wait();
// Get the current status of the task.
public bool IsDone();
Wait blocks the current thread, then calls EndInvoke and the provided callback to obtain the result of the code block.
Reading Value results in a call to Wait().
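To make the relationship between Execute, the callback, the exception routine and Value more concrete, here is a minimal sketch of the same idea built on the thread pool. It is not NParallel's implementation: SketchResult<T> and SketchParallel are hypothetical names, the real library wraps Begin/EndInvoke instead of ThreadPool.QueueUserWorkItem, and the choice to return default(T) from Value after a handled exception is an assumption of this sketch.

```csharp
using System;
using System.Threading;

// Hypothetical stand-in for NResult<T>; the real class is richer than this.
public sealed class SketchResult<T>
{
    private readonly ManualResetEvent _done = new ManualResetEvent(false);
    private T _value;
    private Exception _error;

    // Non-blocking status check.
    public bool IsDone() { return _done.WaitOne(0); }

    // Blocks until the routine has finished, then returns its value
    // or rethrows the exception that nobody handled.
    public T Value
    {
        get
        {
            _done.WaitOne();
            if (_error != null) throw _error;
            return _value;
        }
    }

    internal void Complete(T value, Exception error)
    {
        _value = value;
        _error = error;
        _done.Set();
    }
}

public static class SketchParallel
{
    // callback and exceptionRoutine are optional and may be null.
    public static SketchResult<T> Execute<T>(
        Func<T> asyncRoutine, Action<T> callback, Action<Exception> exceptionRoutine)
    {
        var result = new SketchResult<T>();
        ThreadPool.QueueUserWorkItem(_ =>
        {
            T value;
            try
            {
                value = asyncRoutine();
            }
            catch (Exception ex)
            {
                if (exceptionRoutine != null)
                {
                    exceptionRoutine(ex);               // handled by the caller's routine
                    result.Complete(default(T), null);
                }
                else
                {
                    result.Complete(default(T), ex);    // surfaces later through Value
                }
                return;
            }
            result.Complete(value, null);
            if (callback != null) callback(value);      // runs on the pool thread
        });
        return result;
    }
}
```

A call such as SketchParallel.Execute<int>(() => 21 * 2, null, null).Value blocks until the work item has run and then yields 42. Exceptions thrown by the callback itself are not handled in this sketch.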
NQueue can be used to queue the callbacks of all tasks invoked with CallbackMode.Queue. It needs to be pumped from a regular loop of the application; you can use the application message loop or a timer and just call NParallel.Queue.Update();. The callbacks are invoked on the thread the timer runs on.
//Call code block with CallbackMode.Queue
NParallel.Execute (()=>{
return 0; // your method here
},
CallbackMode.Queue);
//Update NQueue within a timer proc
void TimerProc()
{
NParallel.Queue.Update();
}
The callback will be called in the same thread as the TimerProc.
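The queueing behaviour described above can be sketched as a lock-protected list of pending callbacks that is drained by whichever thread calls Update. This is only an assumption about how such a queue could work, not NQueue's actual code; SketchCallbackQueue and QueueDemo are hypothetical names.

```csharp
using System;
using System.Collections.Generic;
using System.Threading;

public sealed class SketchCallbackQueue
{
    private readonly Queue<Action> _pending = new Queue<Action>();
    private readonly object _gate = new object();

    // Called from worker threads when their work has finished.
    public void Enqueue(Action callback)
    {
        lock (_gate) { _pending.Enqueue(callback); }
    }

    // Called from the timer or message loop; runs every pending callback
    // on the calling thread and returns how many were run.
    public int Update()
    {
        Action[] batch;
        lock (_gate)
        {
            batch = _pending.ToArray();
            _pending.Clear();
        }
        foreach (Action callback in batch) callback();
        return batch.Length;
    }
}

public static class QueueDemo
{
    // Returns true if the callback ran on the thread that called Update
    // and received the value computed on the worker thread.
    public static bool CallbackRunsOnUpdatingThread()
    {
        var queue = new SketchCallbackQueue();
        var workDone = new ManualResetEvent(false);
        int callbackThread = -1;
        int received = 0;

        ThreadPool.QueueUserWorkItem(_ =>
        {
            int result = 6 * 7; // stand-in for the long-running work
            queue.Enqueue(() =>
            {
                received = result;
                callbackThread = Thread.CurrentThread.ManagedThreadId;
            });
            workDone.Set();
        });

        workDone.WaitOne(); // a real application would poll from its timer instead
        queue.Update();
        return received == 42
            && callbackThread == Thread.CurrentThread.ManagedThreadId;
    }
}
```

The callbacks are run outside the lock so that a callback which enqueues further work does not deadlock or mutate the queue while it is being drained.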
To cancel a task, call Cancel on its result object:
NResult<T> result;
result.Cancel();
Notice that if the task has already executed, Cancel returns false. Even if Cancel is called, EndInvoke will still be called for the asynchronous routine.
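One way to get the "Cancel returns false once the task has run" behaviour is a small state flag that the worker and the caller race to change atomically. The sketch below assumes that approach; SketchCancellable and its members are hypothetical and say nothing about how NParallel implements Cancel internally.

```csharp
using System.Threading;

public sealed class SketchCancellable
{
    private const int Pending = 0;
    private const int Started = 1;
    private const int Cancelled = 2;

    private int _state = Pending;

    // The worker calls this just before running the routine.
    // False means the task was cancelled first and must be skipped.
    public bool TryStart()
    {
        return Interlocked.CompareExchange(ref _state, Started, Pending) == Pending;
    }

    // The caller uses this to cancel. It returns false if the task
    // has already started (or was already cancelled).
    public bool Cancel()
    {
        return Interlocked.CompareExchange(ref _state, Cancelled, Pending) == Pending;
    }
}
```

Because both methods use the same compare-and-swap on one field, exactly one of TryStart and Cancel can succeed for a given task, whichever thread gets there first.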
The library currently just wraps the delegate Begin/EndInvoke calls in NDefaultStrategy/NDefaultResult. This is only one of the asynchronous models you can adopt; maybe you want to use explicit threads or your own thread pool instead. You can change the threading policy by replacing the policy layer, using the IParallelStrategy interface:
public interface IParallelStrategy
{
/// <summary>
/// Execute a code block asynchronously, with return value
/// </summary>
/// <typeparam name="TReturnValue">Result type of the code block</typeparam>
/// <typeparam name="TState">Type of the state variable</typeparam>
/// <param name="state">State variable passed to the code block</param>
/// <param name="asyncRoutine">The code block to execute</param>
/// <param name="callbackRoutine">The callback routine</param>
/// <param name="exceptionRoutine">The exception routine</param>
/// <param name="callbackMode">How the callback is invoked</param>
/// <returns>Holder of the result for the code block, can be used to wait</returns>
NResult<TReturnValue> Execute<TReturnValue, TState>(TState state, Func<TState, TReturnValue> asyncRoutine, NStateVoidFunc<TReturnValue> callbackRoutine,
NStateVoidFunc<Exception> exceptionRoutine, CallbackMode callbackMode);
/// <summary>
/// Execute a code block asynchronously, without return value
/// </summary>
/// <typeparam name="TState">Type of the state variable</typeparam>
/// <param name="state">State variable passed to the code block</param>
/// <param name="asyncRoutine">The code block to execute</param>
/// <param name="callbackRoutine">The callback routine</param>
/// <param name="exceptionRoutine">The exception routine</param>
/// <param name="callbackMode">How the callback is invoked</param>
/// <returns>Holder of the result for the code block, can be used to wait</returns>
NResult Execute<TState>(TState state, NStateVoidFunc<TState> asyncRoutine, NVoidFunc callbackRoutine, NStateVoidFunc<Exception> exceptionRoutine,
CallbackMode callbackMode);
/// <summary>
/// The overall exception handling routine,
/// will be used when no exception routine is provided for the function
/// </summary>
NStateVoidFunc<Exception> ExceptionHandler { get; set; }
}
// NResult abstract class
public abstract class NResult
{
public abstract bool IsDone();
internal abstract object CallerRoutine{get;}
public abstract void Wait(/* int milli-sec-to-wait */);
public virtual bool Cancel();
}
public abstract class NResult<T> : NResult
{
public abstract T Value { get; }
}
You have to implement the IParallelStrategy interface and then use NParallel.Strategy to replace the default NAMPStrategyImpl. You will also have to implement your own NResult.
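The idea of a replaceable policy layer can be illustrated with a much smaller interface than IParallelStrategy. The sketch below is a simplified, hypothetical analogue: ISketchStrategy, ThreadPoolStrategy, DedicatedThreadStrategy and SketchEngine are made-up names, and the real interface also carries state, callbacks, exception routines and callback modes.

```csharp
using System;
using System.Threading;

// Hypothetical, heavily reduced counterpart of a threading strategy.
public interface ISketchStrategy
{
    void Run(Action work);
}

// Runs the work on the shared thread pool.
public sealed class ThreadPoolStrategy : ISketchStrategy
{
    public void Run(Action work)
    {
        ThreadPool.QueueUserWorkItem(_ => work());
    }
}

// Runs the work on a thread created just for it.
public sealed class DedicatedThreadStrategy : ISketchStrategy
{
    public void Run(Action work)
    {
        var thread = new Thread(() => work());
        thread.IsBackground = true;
        thread.Start();
    }
}

public static class SketchEngine
{
    // The policy layer; callers swap it without touching call sites.
    public static ISketchStrategy Strategy = new ThreadPoolStrategy();

    public static void Execute(Action work)
    {
        Strategy.Run(work);
    }
}
```

Assigning SketchEngine.Strategy = new DedicatedThreadStrategy(); changes where every later Execute call runs, while the calling code stays the same.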
I have used the code in many small tests and in an application I am working on. It makes asynchronous programming quite easy and fun. I hope the code is useful for you too. You can modify the code and use it in your own projects at will; just please keep the credits. If you find defects or bugs in the code, please let me know.
The package for VS2005 contains three projects: NParallel, NParallelCore and Testcase. The latter two are the very first version of NParallel, which contains the early quick-and-dirty code and some incomplete functions. You can have a look at them if you are interested. If you only want NParallel, you can ignore the other two projects.
The VS2005 version of NParallel will no longer be supported in later versions (from 0.2 on).
The package for VS2008 contains only one console project; you can simply change its output type to library if you want a DLL.
V0.2 contains three test projects, including a GUI image processing library.
I have seen many articles on asynchronous programming, and they are all great; you can find them by searching MSDN.
Functional Programming For The Rest of Us
One of the articles on CodeProject that I found good for beginners can be found here.
You can now download the ParallelFX CTP, a parallel extension to .NET 3.5, from MSDN.
Last Updated: 19 Dec 2007
Copyright 2007 by leafwiz. Everything else Copyright © CodeProject, 1999-2009