5,696,576 members and growing! (17,930 online)
Email Password   helpLost your password?
General Programming » Threads, Processes & IPC » Multi-threading     Advanced License: The Microsoft Public License (Ms-PL)

Pipes and Filters concurrent design pattern using the Concurrency and Coordination Runtime

By itaifrenkel

A modular approach for concurrent message passing
C#, .NET (.NET 3.5, .NET), Architect, Dev

Posted: 8 Oct 2008
Updated: 8 Oct 2008
Views: 2,048
Bookmarked: 16 times
Announcements
Loading...



Search    
Advanced Search
Sitemap
1 vote for this Article.
Popularity: 0.00 Rating: 4.00 out of 5
0 votes, 0.0%
1
0 votes, 0.0%
2
0 votes, 0.0%
3
1 vote, 100.0%
4
0 votes, 0.0%
5
Note: This is an unedited contribution. If this article is inappropriate, needs attention or copies someone else's work without reference then please Report This Article

Introduction

This article shows the usage of the Concurrency and Coordination Runtime (CCR) to implement the Pipes and Filters design pattern. The idea came from the interview Bob Familiar had with Stephen Tarmey.

Disclaimer: Please note that I'm not a concurrency expert and this is my first attempt with the CCR.

Background

The pipe and filters design pattern is used to describe a pipeline where messages are created by source objects, transformed by filter objects and consumed by sink objects. Classic multi-threaded programming is imperative programming, where the developer assigns threads (thread per object, thread per message or an asynchronous thread-pool scheme). For example, an imperative implementation would use Queue<> for pipes, lock access by a ReaderWriter lock and perform the object processing as a standard asynchronous operation on the thread pool.

On the other hand, declerative programming defines "what" to do, but not how to do it. For example, Linq allows to filter an array using a condition, without worrying about the specific order the array is traveresed. Simmiliarly the Concurrency and Coordination Runtime (CCR) allows the developer to declare "what" should happen in each object, without worrying about threads,locks and that kinda stuff (well, almost).

Let's dive into some basic CCR declerations:

using (Dispatcher dispatcher = new Dispatcher(0, "CCR Threads"))
{
   DispatcherQueue dq = new DispatcherQueue("Pipeline DispatcherQueue", dispatcher);
}

The dispatcher is the object that holds a thread pool. The first parameter is the number of threads to be used, where 0 means sets the number of threads to Max(2,NumberOfProcessors) * ThreadPerCPU. The dispatcher queue holds the list of pending delegates that can be executed immidiatly and are waiting for a thread to become available. For more information see the article Concurrency and Coordination Runtime

Here is a snippet that registers three methods listening on three ports. A port holds messages and holds methods interested in processing these messages.

public void Initialize(
               DispatcherQueue dispatcherQueue,
               PortSet<correctpinmessage> inputPort,
               PortSet<openalarm> outputPort)
        {
        ...
            Arbiter.Activate(
                dispatcherQueue,
                Arbiter.Interleave(
                    new TeardownReceiverGroup(),
                    new ExclusiveReceiverGroup(
                        Arbiter.Receive(true, _timeoutPort, HandleTimeout)),
                    new ConcurrentReceiverGroup(
                        Arbiter.Receive(true, InputPort.P0, HandleSuccessMessage),
                        Arbiter.Receive(true, InputPort.P1, HandleFailureMessage))));
         

}

read the code like this:

  • Each time the inputPort.Port0<correctpinmessage> receives a CorrectPinMessage object, call the HandleSuccessMessage.
  • Each time the inputPort.Port1<wrongpinmessage> receives a WrongPinMessage object, call the HandleFailureMessage.
  • Each time the _timeoutPort<datetime> receives a DateTime object, call the HandleTimeout method.

The Arbiter.Receive first parameter (persist = true) indicates that the method continues receiving messages after handling the first message.

The Arbiter.Interleave decleration defines the locking mechanism that the CCR uses. HandleSuccessMessage and HandleFailureMessage are defined in the concurrent group (in essence simmiliar to the ReaderWriterLock.AcquireReaderLock()).

The concurrent handlers implementation has to be thread safe (achieved by using interlocked increment):

void HandleFailureMessage(WrongPinMessage wrongPinMessage)
{
    if (Interlocked.Increment(ref _wrongPinCount) == 1)
    {
       _firstWrongPinMessage = wrongPinMessage;
       _dispatcherQueue.EnqueueTimer(_pollingPeriod, _timeoutPort);
    }
}

void HandleSuccessMessage(CorrectPinMessage correctPinMessage)
{
    Interlocked.Increment(ref _correctPinCount);
}

Notice that when the first WrongPinMessage is received the timer is started. A few milliseconds later the _timeoutPort receivea a DateTime object which triggers the HandleTimeout method call.

Since the HandleTimeout is defined in the exclusive group (in essence simmiliar to the ReaderWriterLock.AcquireWriterLock()) the handler implementation is not required to be thread safe. It access the _wrongPinCount member without using any locks or interlocked calls:

void HandleTimeout(DateTime time)
{
   CardStatus newStatus = _currentCardStatus;

   if (_correctPinCount > 0)
   {
       newStatus = CardStatus.Ok;
   }
   else if (_wrongPinCount > 3)
   {
        newStatus = CardStatus.Stolen;
   }
   else if (_wrongPinCount > 0)
   {
        newStatus = CardStatus.Warning;
   }
   ...
}

The HandleTimeout method "assumes" that X milliseconds have passed since the first WrongPinMessage was called. It uses the total number of messages received in that period of time to change the internal state of the state machine.

For more information on CCR basic concepts see the video CCR Programming - Jeffrey Richter and George Chrysanthakopoulos and the OOPSLA/SCOOL paper on the CCR.
The Ccr.Core.dll is available for download as part of Microsoft Robotics Studio.

Using the code

The code in this article defines a pipeline abstraction of a source (that has an output port - in green) of a filter (that has both an input port and output port - in blue) and a sink (that has only an input port - in yellow).

The demo project describes a worker that has a card with a secret PIN (personal identification number). The CorrectPinMessage describes the event of a worker that typed the correct PIN, and simmiliarly WrongPinMessage represents an incorrect PIN number. the "worker mock source" object creates new messages, the state machine consolidates simmiliar events by counting them. Each state change results in an OpenAlert message or a CloseAlert message which are displayed on the console window by the alarm sink.

Each pipeline object implement either IMessageSource or IMessageFilter or IMessageSink. The interface Initialize method is used to call the CCR Arbiter.Activate() method (which register the HandleXXX methods with Ports) . The Start method is used to start generating messages.

interface IMessageSink where TInputPort : IPort
{
   TInputPort InputPortSet { get; }
   void Initialize(DispatcherQueue dispatcherQueue, TInputPort inputPortSet);
}

interface IMessageSource where TOutputPort: IPort
{
    TOutputPort OutputPortSet { get; }
    void Initialize(DispatcherQueue dispatcherQueue, TOutputPort outputPortSet);
    void Start();
}

interface IMessageFilter
{
    TInputPort InputPort { get; }
    TOutputPort OutputPort { get; }
    void Initialize(DispatcherQueue dispatcherQueue, TInputPort inputPort, TOutputPort outputPort);
}

Extension methods are used to provide serial and parallel connection of these objects. A serial connection is the natural source ==> filter ==> sink message flow connection.
A parallel connection allows N sources to post messages to the same output port.

static public IMessageSource ConnectTo(
            this IMessageSource source,
            IMessageFilter filter)
            where TInputPort : IPort,new()
            where TOutputPort : IPort
{ ... }

public static IMessageSource ConnectInParallel(
            this IMessageSource source1, 
            IMessageSource source2)
            where TInputPort : IPort
{ ... }

When a source is connected to a sink a complete pipeline is created.

interface IMessagePipeLine
{
     void Start(DispatcherQueue dispatcherQueue);
}
    
public static IMessagePipeLine ConnectTo(
            this IMessageSource source, 
            IMessageSink sink)
            where TPort : IPort,new()
{ ... }

State Machine per Worker

In order to save a state machine per worker, we need to demultiplex the messages into new state machines.

Disclaimer: The following implementation is not optimal. I'd be happy to hear your comments about it.

The first filter converts messages of type T into KeyValuePair messages. The key is the name of the worker that typed the PIN.

class WorkerKeyValueFilter : 
   IMessageFilter<PortSet<CorrectPinMessage>, Port<KeyValuePair<string,CorrectPinMessage>>>
{
   void HandleMessage<T>(T message) where T : WorkCardSwipeMessageBase
   {
      OutputPort.Post(new KeyValuePair<string,T>(message.Name, message));
   }
}

The second filter uses the key to post the message to the dedicated port.
class DemuxMessageFilter<TMessageFilter, TInputPort,TOutputPort,TKey> 
        : IMessageFilter<TInputPort,TOutputPort>
        where TMessageFilter : IMessageFilter<TInputPort,TOutputPort>, new() 
        where TInputPort : IPortSet,new() 
        where TOutputPort : IPort
{

   void HandleMessage(KeyValuePair<TKey,object> message) 
   { 
      var messageFilter = GetMessageFilter(message.Key); 
      messageFilter.InputPort.PostUnknownType(message.Value); 
   } 

   TMessageFilter GetMessageFilter(TKey key) 
   { 
      TMessageFilter filter; 
      if (!_messageFilters.TryGetValue(key, out filter)) 
      { 
         filter = new TMessageFilter(); 
         filter.Initialize(_dispatcherQueue, new TInputPort(), OutputPort); 
         _messageFilters.Add(key, filter); 
      } 
      return filter; 
   }
}

History

10-oct-2008 initial version

License

This article, along with any associated source code and files, is licensed under The Microsoft Public License (Ms-PL)

About the Author

itaifrenkel



Occupation: Software Developer
Location: Israel Israel

Other popular Threads, Processes & IPC articles:

Article Top
Sign Up to vote for this article
You must Sign In to use this message board.
FAQ FAQ Noise ToleranceSearch Search Messages 
 Layout  Per page   
 Msgs 1 to 1 of 1 (Total in Forum: 1) (Refresh)FirstPrevNext
GeneralRelated Post: Parallel Recursion with CCRmemberitaifrenkel10:14 11 Oct '08  

General General    News News    Question Question    Answer Answer    Joke Joke    Rant Rant    Admin Admin   

PermaLink | Privacy | Terms of Use
Last Updated: 8 Oct 2008
Editor:
Copyright 2008 by itaifrenkel
Everything else Copyright © CodeProject, 1999-2008
Web15 | Advertise on the Code Project