Pipes, River, Rails and Binder Patterns - Part III






A look at possible parallel-producer-consumer patterns. (Final Part)
<< Pipes, River, Rails and Binder Patterns - Part II
The Beginning
Finally, I am able to write the third and final part of this series. Frankly speaking, when I started working on this article, I wanted to publish only the Pipes design as a tip/trick; however, by the time I could finish it, I decided to include the other implementations (River & Binder). But, again, I failed to fit the other patterns into one article as I could never find the time to finish it. So I decided to break it into 3 parts and also add Rails. That was, so far, my boring story behind this series. Anyway, let's quickly review what we saw in Parts I & II.
- Pipes: A simple design based on "out-in" members. It also requires Action<TInput, TOutput> delegate(s) as the final outcome handler. (See: Pipes, River, Rails and Binder Patterns - Part I)
- River: An improvement over Pipes which obviates the need for the "out-in" relationship. The pipeline consists only of Action<TInput, TOutput> delegates, and the input/output pair flows through the whole pipeline; hence, there is no need for a final outcome handler. (See: Pipes, River, Rails and Binder Patterns - Part II)
- Rails: This pattern is similar to River; however, it requires an interface implementation. The input data is a class that implements the required interface, where the interface represents the pipeline logic. The advantage of this implementation is that multiple input types can be processed by the same pipeline. (See: Pipes, River, Rails and Binder Patterns - Part II)
- Binder: An extension over River which decouples the pipeline members and provides a means to allocate the desired number of worker threads to each pipeline member. And this part is all about it.
As we construct a pipeline, there are times we want to allocate some threads to a member function (due to structural/tactical/performance or some other weird reasons). Thus, I propose to you the design of Binder. If you have followed the series, you will find the usage of this implementation familiar, i.e.:
- Create the pipeline.
- Call StartProcessing.
- Add values to the pipeline using AddValue.
- Call StopProcessing.
Fourth Pattern: Binder
During the implementation of River, we saw how to construct a pipeline in which input and output can flow together. Now, we will extend this idea to have the liberty to independently assign worker threads to each pipeline member function. In order to gain this control, I decided to create a dedicated pipeline instance per member function and bind all those instances together in a chain, without burdening the user of the code with managing those instances, while providing a simple and familiar usage structure (the same as the other 3 implementations).
Let's first look at the code of the Binder class:
using System;
using System.Collections.Concurrent;
using System.Threading;
using System.Threading.Tasks;

namespace Piper
{
    public class Binder<TInput, TOutput>
        where TOutput : class, new()
    {
        private BlockingCollection<DataWrapper> _dataCollection = null;
        private ManualResetEvent _completionEvent;
        private CancellationTokenSource _cancelTokenSource;
        private Binder<TInput, TOutput> _nextBind = null;
        private Action<TInput, TOutput> _currentAction;
        private int _maxConcurrency;
        private string _opCode;

        //This Ctor will create the seed instance
        public Binder(Action<TInput, TOutput> currFunc,
                      int maxConcurrency = -1,
                      string opCode = "MyBinder")
            : this(currFunc, maxConcurrency, opCode, new CancellationTokenSource())
        {
        }

        //This Ctor is to create the remaining pipes
        private Binder(Action<TInput, TOutput> currFunc,
                       int maxConcurrency = -1,
                       string opCode = "MyBinder",
                       CancellationTokenSource tokenSource = null)
        {
            _currentAction = currFunc;
            _maxConcurrency = maxConcurrency;
            _completionEvent = new ManualResetEvent(false);
            _dataCollection = new BlockingCollection<DataWrapper>();
            _opCode = opCode;
            _cancelTokenSource = tokenSource;
        }

        public Binder<TInput, TOutput> Bind(Action<TInput, TOutput> nextFunc,
                                            int maxConcurrency = -1,
                                            string opCode = "MyBinder1")
        {
            if (_nextBind == null)
            {
                _nextBind = new Binder<TInput, TOutput>
                    (nextFunc, maxConcurrency, opCode, _cancelTokenSource);
            }
            else
            {
                _nextBind.Bind(nextFunc, maxConcurrency, opCode);
            }
            return this;
        }

        public bool AddValue(TInput inputValue)
        {
            return AddValue(new DataWrapper
            {
                InputVal = inputValue,
                OutputVal = new TOutput()
            });
        }

        public void StopProcessing(bool waitForProcessing = false)
        {
            _dataCollection.CompleteAdding();
            if (waitForProcessing)
                _completionEvent.WaitOne();
        }

        public void StartProcessing(Action<TInput, TOutput, string, Exception> errorHandler)
        {
            if (_nextBind != null)
                _nextBind.StartProcessing(errorHandler);
            var option = new ParallelOptions
            {
                MaxDegreeOfParallelism = Math.Max(-1, _maxConcurrency),
                CancellationToken = _cancelTokenSource.Token
            };
            Task.Factory.StartNew(() =>
            {
                try
                {
                    Parallel.ForEach(_dataCollection.GetConsumingEnumerable(),
                                     option,
                                     currData =>
                    {
                        try
                        {
                            option.CancellationToken
                                  .ThrowIfCancellationRequested();
                            _currentAction(currData.InputVal,
                                           currData.OutputVal);
                            if (_nextBind != null)
                                _nextBind.AddValue(currData);
                        }
                        catch (Exception e)
                        {
                            errorHandler(currData.InputVal,
                                         currData.OutputVal,
                                         "Error occurred inside " +
                                         _opCode + " pipeline.",
                                         e);
                        }
                    });
                }
                catch (OperationCanceledException)
                {
                }
                finally
                {
                    if (_nextBind != null)
                    {
                        _nextBind._dataCollection.CompleteAdding();
                        _nextBind._completionEvent.WaitOne();
                    }
                    _completionEvent.Set();
                }
            });
        }

        public void AbortProcessing()
        {
            _dataCollection.CompleteAdding();
            _cancelTokenSource.Cancel();
        }

        private bool AddValue(DataWrapper currData)
        {
            return _dataCollection.TryAdd(currData);
        }

        private class DataWrapper
        {
            internal TInput InputVal;
            internal TOutput OutputVal;
        }
    }
}
Binder Dissection
Now, to understand how it works, we first need to know that, in C#, privacy is class-level, i.e., the private members of a class instance are accessible anywhere inside that class. Let's take an example of this:
public class MyInput
{
    private int a = 1;

    public MyInput GetInstanceWithValue(int b)
    {
        var newInstance = new MyInput();
        //This is legal in C#:
        //accessing a private variable of a new instance inside the same class
        newInstance.a = b;
        return newInstance;
    }
}
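To make the rule concrete, here is another small, self-contained sketch of my own (the `Account` class and its members are purely illustrative, not part of Binder): one instance freely reads the private field of a *different* instance, because access control in C# is per-class, not per-instance.

```csharp
using System;

public class Account
{
    private int _balance;

    public Account(int balance) { _balance = balance; }

    // Legal: a member may read another INSTANCE's private field,
    // because C# access control is per-class, not per-instance.
    public int Difference(Account other) => _balance - other._balance;
}

public static class PrivacyDemo
{
    public static void Main()
    {
        var a = new Account(10);
        var b = new Account(4);
        Console.WriteLine(a.Difference(b)); // prints 6
    }
}
```

Binder relies on exactly this rule when a parent pipe touches `_nextBind._dataCollection` and `_nextBind._completionEvent`.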
Understanding Construction
Taking advantage of this class-level privacy feature, I am allowed to hold an instance of the Binder class privately and can access all of its properties/members. And this is exactly what I am doing, BUT in a chain, like this (please follow the code comments):
public class Binder<TInput, TOutput>
    where TOutput : class, new()
{
    private Binder<TInput, TOutput> _nextBind = null;

    public Binder<TInput, TOutput> Bind(Action<TInput, TOutput> nextFunc,
                                        int maxConcurrency = -1,
                                        string opCode = "MyBinder1")
    {
        //If the child pipe is not defined, then we define it on the current pipe
        if (_nextBind == null)
        {
            //Here we use the private Ctor and pass the same cancellation token source;
            //this way, cancelling the seed pipe's token will cancel all pipes in the chain
            _nextBind = new Binder<TInput, TOutput>
                (nextFunc, maxConcurrency, opCode, _cancelTokenSource);
        }
        else
        {
            //Otherwise, we pass the call down to the child pipe.
            //Then, the child pipe will check its own child pipe, and so on...
            //Thus, the given function will become the child of the last pipe in the chain,
            //and for the next Bind() call, that new pipe will be the last pipe in the chain.
            _nextBind.Bind(nextFunc, maxConcurrency, opCode);
        }
        //Mind it, I am returning the seed instance for every call; thus,
        //the consumer of this class can call this method ONLY on the SEED pipe.
        //This is also required because the user must call Start/Stop/Abort Processing on the seed,
        //as the seed is the first pipe to process the input/output pair, and
        //we do NOT want to overwhelm the user with a new instance per Bind().
        return this;
    }
}
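Stripped of all the pipeline machinery, the chaining trick can be sketched with a tiny hypothetical `Node` class of my own (the class and its members are illustrative only): every `Bind` call appends at the tail via recursion on the private child field, yet always returns the seed, so fluent calls keep targeting the seed.

```csharp
using System;

public class Node
{
    public string Name;
    private Node _next;        // same role as Binder's _nextBind

    public Node(string name) { Name = name; }

    public Node Bind(string name)
    {
        if (_next == null) _next = new Node(name);
        else _next.Bind(name); // walk down to the tail
        return this;           // always the SEED instance
    }

    // Renders the chain, e.g. "A->B->C"
    public string Chain() => _next == null ? Name : Name + "->" + _next.Chain();
}

public static class ChainDemo
{
    public static void Main()
    {
        var seed = new Node("A").Bind("B").Bind("C");
        Console.WriteLine(seed.Chain()); // prints A->B->C
    }
}
```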
Understanding Processing
Once, we have pipeline constructed (as discussed above), our immediate next goal is to implement the StartProcessing method which can consume such a chain of Binder
instances. To achieve our goal, few things we have to keep in mind which are:
- We must start the pipeline in reverse order (a bottom-up approach), i.e., each child pipe's GetConsumingEnumerable() loop must be initialized before the parent pipe can pass data to it. This ensures smooth data processing without any blockage. A top-down approach is also possible but, in that case, the BlockingCollections may start bloating.
- After processing each input/output pair, we need to ensure that this pair flows to the next pipe in the chain (as everything is async).
- Upon a CompleteAdding() call on a parent pipe, we must ensure that the child pipe has also finished processing all the remaining items.
- An AbortProcessing() call must abort every pipe in the chain. This one is easy to achieve as we are sharing the same CancellationTokenSource among the pipes.
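Independently of Binder, the first and third points can be sketched with two raw BlockingCollection stages (the `TwoStageDemo` class and its doubling/summing logic are my own illustration, not the article's code): the child consumer is started first, and the parent propagates CompleteAdding downstream when it finishes.

```csharp
using System;
using System.Collections.Concurrent;
using System.Threading.Tasks;

public static class TwoStageDemo
{
    // Stage 1 doubles each item; stage 2 sums the results.
    public static int Run(int[] items)
    {
        var stage1 = new BlockingCollection<int>();
        var stage2 = new BlockingCollection<int>();

        // Start the CHILD consumer first (bottom-up), so items
        // relayed by the parent are drained immediately.
        var child = Task.Run(() =>
        {
            var sum = 0;
            foreach (var x in stage2.GetConsumingEnumerable())
                sum += x;
            return sum;
        });

        // Parent consumer: processes each item, then relays it downstream.
        var parent = Task.Run(() =>
        {
            foreach (var x in stage1.GetConsumingEnumerable())
                stage2.Add(x * 2);
            // Parent is done => the child must not receive more data.
            stage2.CompleteAdding();
        });

        foreach (var i in items) stage1.Add(i);
        stage1.CompleteAdding(); // like StopProcessing on the seed pipe

        parent.Wait();
        return child.Result;
    }

    public static void Main()
    {
        Console.WriteLine(TwoStageDemo.Run(new[] { 1, 2, 3 })); // prints 12
    }
}
```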
Let me show you how the above-mentioned points have been integrated into the code (please follow the code comments):
public void StartProcessing(Action<TInput, TOutput, string, Exception> errorHandler)
{
    //If a child pipe exists, call its StartProcessing in chain for bottom-up initialization
    if (_nextBind != null)
        _nextBind.StartProcessing(errorHandler);

    //Each pipe will use its own maxConcurrency value
    //and share the same cancellation token
    var option = new ParallelOptions
    {
        MaxDegreeOfParallelism = Math.Max(-1, _maxConcurrency),
        CancellationToken = _cancelTokenSource.Token
    };
    Task.Factory.StartNew(() =>
    {
        try
        {
            Parallel.ForEach(_dataCollection.GetConsumingEnumerable(),
                             option,
                             currData =>
            {
                try
                {
                    option.CancellationToken
                          .ThrowIfCancellationRequested();
                    //Execute the current pipe's method with the data of its OWN BlockingCollection
                    //NOTE: if this is the SEED, then the data pair is what the USER supplied;
                    //      else, it is the processed pair from the previous pipe in the chain.
                    _currentAction(currData.InputVal,
                                   currData.OutputVal);
                    //Pass the pair to the child pipe
                    //NOTE: this call is NOT recursive => it just populates the
                    //      BlockingCollection of the immediate child.
                    //This pair will be available to the child's ForEach loop,
                    //and then from there to its own child, and so on...
                    if (_nextBind != null)
                        _nextBind.AddValue(currData);
                }
                catch (Exception e)
                {
                    errorHandler(currData.InputVal,
                                 currData.OutputVal,
                                 "Error occurred inside " +
                                 _opCode + " pipeline.",
                                 e);
                }
            });
        }
        catch (OperationCanceledException)
        {
        }
        finally
        {
            //If the child is not null
            if (_nextBind != null)
            {
                //Close the child's BlockingCollection for data-adding.
                //Reason: reaching here means the parent is done, hence the child
                //        shouldn't receive more data. All data-adding was done above
                //        in the try clause, or by the user (SEED case) who then
                //        called StopProcessing.
                _nextBind._dataCollection.CompleteAdding();
                //Wait until the child gives its completion signal
                _nextBind._completionEvent.WaitOne();
            }
            //Give our OWN completion signal.
            //Note: the last child in the chain will give this signal without waiting
            //      for any other signal; thus, its parent can give its OWN signal
            //      to its parent, and so on...
            _completionEvent.Set();
        }
    });
}
Other functions like AbortProcessing(), StopProcessing(), etc. are trivial, and I leave the readers to explore those. With that, I am done with the fourth and last pattern of this three-part series.
Binder By Example
Let me show you a quick (trivial and boring) usage of the above pattern:
using System;
using System.Threading.Tasks;

namespace Piper
{
    class Program
    {
        static void Main(string[] args)
        {
            SomeCode();
            GC.Collect();
            GC.WaitForPendingFinalizers();
            Console.WriteLine("###### DONE ######");
            Console.ReadLine();
        }

        static void SomeCode()
        {
            var myPipes = new Binder<MyInput, MyOutput>(One, 1, "One")
                .Bind(Two, 1, "Two")
                .Bind(Three, 2, "Three") // => 2 threads for Three
                .Bind(Four, 1, "Four");
            myPipes.StartProcessing(HandleError);
            Parallel.For(0, 4, input => myPipes.AddValue(new MyInput()));
            myPipes.StopProcessing();
        }

        static void HandleError<T1, T2>(T1 a, T2 b, string code, Exception e)
        {
            Console.WriteLine(code + Environment.NewLine + e.Message);
        }

        static void One(MyInput a, MyOutput b)
        {
            Console.WriteLine("One on T:" +
                System.Threading.Thread.CurrentThread.ManagedThreadId);
        }

        static void Two(MyInput a, MyOutput b)
        {
            System.Threading.Thread.Sleep(1000);
            Console.WriteLine("Two on T:" +
                System.Threading.Thread.CurrentThread.ManagedThreadId);
        }

        static void Three(MyInput a, MyOutput b)
        {
            System.Threading.Thread.Sleep(2000);
            Console.WriteLine("Three on T:" +
                System.Threading.Thread.CurrentThread.ManagedThreadId);
            //throw new Exception("Mine");
        }

        static void Four(MyInput a, MyOutput b)
        {
            Console.WriteLine("Four (or LAST)");
            Console.WriteLine("A=" + a + ",B=" + b +
                ",On T:" + System.Threading.Thread.CurrentThread.ManagedThreadId);
        }
    }

    public class MyInput
    {
        //All my input values
    }

    public class MyOutput
    {
        //All my output values
    }
}
In the above example, the following strategy has been adopted to simulate processing time, in order to show the benefit of dedicated threads per method:
- Functions One() and Four() have no wait and are single-threaded.
- Function Two() has a wait of 1 second and is single-threaded.
- Function Three() has a wait of 2 seconds and has 2 threads.
Using such an example, we expect the following:
- All calls to One() will execute immediately; thus, the BlockingCollection of Two() will be populated immediately.
- After the first 2 executions of Two(), we MUST see a single execution of Three() (because while Three() waits for 2 seconds, two calls of Two() can execute).
- Then, for each outcome of Two(), we MUST see an outcome of Three() (the double-duration, double-threads effect).
- We must see the outcome of Four() almost immediately after the corresponding outcome of Three(), as Four() has no wait.
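As a back-of-the-envelope check of that "double duration, double threads" effect: a stage's steady-state throughput is roughly its thread count divided by its per-item duration, so Two() and Three() are throughput-matched here. The `StageMath` helper below is my own sketch of this arithmetic, not part of the article's code.

```csharp
using System;

public static class StageMath
{
    // Steady-state throughput (items/second) of a pipeline stage:
    // roughly thread count divided by per-item duration.
    public static double Throughput(int threads, double secondsPerItem)
        => threads / secondsPerItem;

    public static void Main()
    {
        var two   = Throughput(1, 1.0); // Two():   1 thread,  1s per item
        var three = Throughput(2, 2.0); // Three(): 2 threads, 2s per item
        Console.WriteLine(two == three
            ? "Two and Three are throughput-matched"
            : "the slower stage will queue up");
    }
}
```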
On my personal laptop with 2 cores, I was able to see the outcome listed below. I let you prepare and run other examples/tests.
Bonne Année 2015
In this last part of the series, we have seen the fourth and final pattern. I hope you (the readers) liked the series; nonetheless, let me know if you still have any question/comment/suggestion/improvement. Lastly, I wish you all A VERY HAPPY NEW YEAR 2015. May GOD blah blah blah... ENJOY and keep sharing interesting code!!! Bonne Année 2015!!! Yu-Hu!!!
History
This is V1 of the suggested solution.
Logically, nothing was changed; however, some diagrams for visualization have been added.