Introduction
This article shows how to use the Concurrency and Coordination Runtime (CCR) to implement the Pipes and Filters design pattern. The idea came from the interview Bob Familiar had with Stephen Tarmey.

Disclaimer: Please note that I'm not a concurrency expert and this is my first attempt with the CCR.

Background
The Pipes and Filters design pattern describes a pipeline where messages are created by source objects, transformed by filter objects and consumed by sink objects.

Classic multi-threaded programming is imperative: the developer assigns threads explicitly (thread per object, thread per message or an asynchronous thread-pool scheme). For example, an imperative implementation would use Queue<> for the pipes, guard access with a reader-writer lock and run the object processing as a standard asynchronous operation on the thread pool.

Declarative programming, on the other hand, defines "what" to do but not "how" to do it. For example, LINQ lets you filter an array by a condition without worrying about the order in which the array is traversed. Similarly, the CCR lets the developer declare "what" should happen in each object without worrying about threads, locks and the like (well, almost).

Let's dive into some basic CCR declarations:

using (Dispatcher dispatcher = new Dispatcher(0, "CCR Threads"))
{
    DispatcherQueue dq = new DispatcherQueue("Pipeline DispatcherQueue", dispatcher);
}
The dispatcher is the object that holds a thread pool. The first parameter is the number of threads to use; a value of 0 sets the number of threads to Max(2,NumberOfProcessors) * ThreadPerCPU. The dispatcher queue holds the list of pending delegates that can be executed immediately and are waiting for a thread to become available. For more information see the article Concurrency and Coordination Runtime.

Here is a snippet that registers three methods listening on three ports. A port holds messages as well as the methods interested in processing those messages.

public void Initialize(
    DispatcherQueue dispatcherQueue,
    PortSet<CorrectPinMessage> inputPort,
    PortSet<OpenAlarm> outputPort)
{
    ...
    Arbiter.Activate(
        dispatcherQueue,
        Arbiter.Interleave(
            new TeardownReceiverGroup(),
            new ExclusiveReceiverGroup(
                Arbiter.Receive(true, _timeoutPort, HandleTimeout)),
            new ConcurrentReceiverGroup(
                Arbiter.Receive(true, InputPort.P0, HandleSuccessMessage),
                Arbiter.Receive(true, InputPort.P1, HandleFailureMessage))));
}
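Read the code like this:
The TeardownReceiverGroup is empty, so no handler is registered to shut the interleave down.
The ExclusiveReceiverGroup holds HandleTimeout, which never runs at the same time as any other handler in the interleave.
The ConcurrentReceiverGroup holds HandleSuccessMessage and HandleFailureMessage, which may run in parallel with each other.

The concurrent handlers therefore have to be thread safe (achieved here with Interlocked.Increment):

void HandleFailureMessage(WrongPinMessage wrongPinMessage)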
{
    if (Interlocked.Increment(ref _wrongPinCount) == 1)
    {
        _firstWrongPinMessage = wrongPinMessage;
        _dispatcherQueue.EnqueueTimer(_pollingPeriod, _timeoutPort);
    }
}

void HandleSuccessMessage(CorrectPinMessage correctPinMessage)
{
    Interlocked.Increment(ref _correctPinCount);
}
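Notice that when the first WrongPinMessage arrives, a timer is enqueued that posts to _timeoutPort once _pollingPeriod has elapsed. Since the HandleTimeout handler is registered in the exclusive group, it can read the counters without interlocked operations:

void HandleTimeout(DateTime time)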
{
    CardStatus newStatus = _currentCardStatus;
    if (_correctPinCount > 0)
    {
        newStatus = CardStatus.Ok;
    }
    else if (_wrongPinCount > 3)
    {
        newStatus = CardStatus.Stolen;
    }
    else if (_wrongPinCount > 0)
    {
        newStatus = CardStatus.Warning;
    }
    ...
}
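The `Interlocked.Increment(...) == 1` test in HandleFailureMessage is what ensures that only one of the concurrently running handlers starts the timer. The following is a minimal, CCR-free sketch of that idea. The class name FirstFailureDemo, the _timersStarted counter and the use of Parallel.For to simulate concurrent handlers are assumptions made for illustration and are not part of the article's code.

```csharp
using System;
using System.Threading;
using System.Threading.Tasks;

static class FirstFailureDemo
{
    static int _wrongPinCount;  // shared counter, only modified through Interlocked
    static int _timersStarted;  // how many callers believed they were the first one

    // Hypothetical stand-in for HandleFailureMessage: the increment and the
    // read of the new value happen as one atomic step.
    static void HandleFailure()
    {
        if (Interlocked.Increment(ref _wrongPinCount) == 1)
        {
            // In the article this is where the timer is enqueued.
            Interlocked.Increment(ref _timersStarted);
        }
    }

    static void Main()
    {
        // Simulate many handlers running in parallel on the thread pool.
        Parallel.For(0, 1000, _ => HandleFailure());

        Console.WriteLine("failures: " + _wrongPinCount +
                          ", timers started: " + _timersStarted);
    }
}
```

Because each call to Interlocked.Increment returns a distinct value, exactly one caller observes the value 1, so the program should report 1000 failures and a single started timer regardless of how the iterations are scheduled.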
For more information on basic CCR concepts see the video CCR Programming - Jeffrey Richter and George Chrysanthakopoulos and the OOPSLA/SCOOL paper on the CCR.

Using the code
The code in this article defines a pipeline abstraction made of a source (which has an output port, shown in green), a filter (which has both an input port and an output port, shown in blue) and a sink (which has only an input port, shown in yellow). Each pipeline object implements either interface IMessageSink

Extension methods are used to provide serial and parallel connection of these objects. A serial connection is the natural source ==> filter ==> sink message flow.

static public IMessageSource

When a source is connected to a sink, a complete pipeline is created.

interface IMessagePipeLine
{
    void Start(DispatcherQueue dispatcherQueue);
}
public static IMessagePipeLine ConnectTo
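To illustrate how extension methods can chain pipeline objects in series, here is a simplified sketch that deliberately avoids CCR ports. It is not the article's IMessageSource/IMessageSink API: the names PipelineSketch, Then and Into are hypothetical, a source is modelled as a function that yields messages, and everything runs synchronously on the calling thread.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;

// Hypothetical, simplified stand-ins for the pipeline objects:
// a source yields messages, a filter transforms one message,
// and a sink consumes one message. No CCR types are involved.
static class PipelineSketch
{
    // Serial connection source ==> filter: the result is again a source.
    public static Func<IEnumerable<TOut>> Then<TIn, TOut>(
        this Func<IEnumerable<TIn>> source, Func<TIn, TOut> filter)
    {
        return () => source().Select(filter);
    }

    // Serial connection source ==> sink: the result is a complete pipeline
    // that does nothing until it is started.
    public static Action Into<T>(
        this Func<IEnumerable<T>> source, Action<T> sink)
    {
        return () =>
        {
            foreach (T message in source())
            {
                sink(message);
            }
        };
    }
}

static class Program
{
    static void Main()
    {
        Func<IEnumerable<int>> source = () => new[] { 1, 2, 3 };
        var received = new List<string>();

        Action pipeline = source
            .Then(n => n * 10)              // filter: int -> int
            .Then(n => "msg " + n)          // filter: int -> string
            .Into(m => received.Add(m));    // sink: collects the messages

        pipeline();                         // start the pipeline
        Console.WriteLine(string.Join(", ", received)); // prints "msg 10, msg 20, msg 30"
    }
}
```

As with the article's pipeline, connecting the objects only describes the flow; no message moves until the completed pipeline is started.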
State Machine per Worker
In order to keep a separate state machine for each worker, we need to demultiplex the messages into new state machines. The first filter converts messages of type T into KeyValuePair<string,T>, using the message's Name as the key:

class WorkerKeyValueFilter :
    IMessageFilter<PortSet<CorrectPinMessage>, Port<KeyValuePair<string,CorrectPinMessage>>>
{
    void HandleMessage<T>(T message) where T : WorkCardSwipeMessageBase
    {
        OutputPort.Post(new KeyValuePair<string,T>(message.Name, message));
    }
}
The second filter uses the key to post the message to the dedicated port.

class DemuxMessageFilter<TMessageFilter, TInputPort,TOutputPort,TKey>
    : IMessageFilter<TInputPort,TOutputPort>
    where TMessageFilter : IMessageFilter<TInputPort,TOutputPort>, new()
    where TInputPort : IPortSet,new()
    where TOutputPort : IPort
{
    void HandleMessage(KeyValuePair<TKey,object> message)
    {
        var messageFilter = GetMessageFilter(message.Key);
        messageFilter.InputPort.PostUnknownType(message.Value);
    }

    TMessageFilter GetMessageFilter(TKey key)
    {
        TMessageFilter filter;
        if (!_messageFilters.TryGetValue(key, out filter))
        {
            filter = new TMessageFilter();
            filter.Initialize(_dispatcherQueue, new TInputPort(), OutputPort);
            _messageFilters.Add(key, filter);
        }
        return filter;
    }
}
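GetMessageFilter above creates a filter the first time a key is seen and reuses it afterwards. The sketch below isolates that get-or-create step without any CCR types. Demultiplexer, WorkerStateMachine and the lock are assumptions for illustration: Dictionary<,> is not safe for concurrent writers, so the sketch takes a lock in case the method is called from more than one thread, whereas the article's code may instead rely on how the handler is registered.

```csharp
using System;
using System.Collections.Generic;

// Hypothetical per-worker state machine: here it only counts messages.
class WorkerStateMachine
{
    public int MessageCount { get; private set; }

    public void Handle(string message)
    {
        MessageCount++;
    }
}

// Routes each key to a handler instance dedicated to that key,
// creating the instance the first time the key is seen.
class Demultiplexer<TKey, THandler> where THandler : new()
{
    readonly Dictionary<TKey, THandler> _handlers = new Dictionary<TKey, THandler>();
    readonly object _gate = new object();

    public THandler GetHandler(TKey key)
    {
        // The lookup and the insert happen under one lock so that two
        // threads cannot both create a handler for the same key.
        lock (_gate)
        {
            THandler handler;
            if (!_handlers.TryGetValue(key, out handler))
            {
                handler = new THandler();
                _handlers.Add(key, handler);
            }
            return handler;
        }
    }

    public int Count
    {
        get { lock (_gate) { return _handlers.Count; } }
    }
}

static class Program
{
    static void Main()
    {
        var demux = new Demultiplexer<string, WorkerStateMachine>();

        // Keyed messages, in the spirit of the output of the first filter.
        var messages = new[]
        {
            new KeyValuePair<string, string>("alice", "swipe"),
            new KeyValuePair<string, string>("bob", "swipe"),
            new KeyValuePair<string, string>("alice", "swipe"),
        };

        foreach (var message in messages)
        {
            demux.GetHandler(message.Key).Handle(message.Value);
        }

        Console.WriteLine("state machines: " + demux.Count);
        Console.WriteLine("alice: " + demux.GetHandler("alice").MessageCount);
        Console.WriteLine("bob: " + demux.GetHandler("bob").MessageCount);
    }
}
```

The demo itself is single-threaded; the lock only protects the dictionary, so a state machine that can receive messages concurrently would still need its own synchronization.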
History
10-Oct-2008: Initial version