Click here to Skip to main content
15,892,927 members
Articles / Programming Languages / C#

Pipes and Filters concurrent Design Pattern using the Concurrency and Coordination Runtime

Rate me:
Please Sign up or sign in to vote.
3.67/5 (3 votes)
8 Oct 2008Ms-PL4 min read 37.6K   373   21  
A modular approach for concurrent message passing.
using System;
using System.Collections.Generic;
using System.Threading;
using Microsoft.Ccr.Core;

namespace WorkCardDemo
{
   
    class StateMachineFilter : IMessageFilter<PortSet<CorrectPinMessage, WrongPinMessage>, PortSet<OpenAlarm, CloseAlarm>>
    {
        enum CardStatus { Ok, Warning , Stolen };

        public StateMachineFilter()
        {
            _timeoutPort = new Port<DateTime>();
        }
        #region IMessageFilter<SuccessMessage,FailureMessage> Members

        public PortSet<CorrectPinMessage, WrongPinMessage> InputPort { get; private set; }

        public PortSet<OpenAlarm, CloseAlarm> OutputPort { get; private set; }

        public void Initialize(
               DispatcherQueue dispatcherQueue,
               PortSet<CorrectPinMessage, WrongPinMessage> inputPortSet,
               PortSet<OpenAlarm, CloseAlarm> outputPortSet)
        {
            _dispatcherQueue = dispatcherQueue;
            _currentCardStatus = CardStatus.Ok;
            InputPort = inputPortSet;
            OutputPort = outputPortSet;

            Arbiter.Activate(
                _dispatcherQueue,
                Arbiter.Interleave(
                    new TeardownReceiverGroup(),
                    new ExclusiveReceiverGroup(
                        Arbiter.Receive(true, _timeoutPort, HandleTimeout)),
                    new ConcurrentReceiverGroup(
                        Arbiter.Receive(true, InputPort.P0, HandleSuccessMessage),
                        Arbiter.Receive(true, InputPort.P1, HandleFailureMessage))));

            ResetCounters();
        }


        #endregion

        CardStatus _currentCardStatus;

        private int _correctPinCount;
        private int _wrongPinCount;
        private WrongPinMessage _firstWrongPinMessage;
        private int _alarmId;
        
        private readonly Port<DateTime> _timeoutPort;
        private DispatcherQueue _dispatcherQueue;

        #region configuration
        private static readonly TimeSpan _pollingPeriod=TimeSpan.FromSeconds(5);
        #endregion

        
        void HandleTimeout(DateTime time)
        {
            var newCardStatus = CalculateHealthState();
            if (_currentCardStatus != CardStatus.Ok && newCardStatus == CardStatus.Ok)
            {
                OutputPort.P1.Post(new CloseAlarm(_alarmId));
            }
            else if (_currentCardStatus == CardStatus.Ok && newCardStatus == CardStatus.Stolen)
            {
                var alarm = new OpenAlarm(_firstWrongPinMessage.Name, AlarmSeverity.Critical);
                OutputPort.P0.Post(alarm);
                _alarmId = alarm.Id;
            }
            else if (_currentCardStatus == CardStatus.Warning && newCardStatus == CardStatus.Stolen)
            {
                OutputPort.P0.Post(new OpenAlarm(_firstWrongPinMessage.Name, AlarmSeverity.Critical, _alarmId));
            }
            else if (_currentCardStatus == CardStatus.Ok && newCardStatus == CardStatus.Warning)
            {
                var alarm = new OpenAlarm(_firstWrongPinMessage.Name, AlarmSeverity.Warning);
                OutputPort.P0.Post(alarm);
                _alarmId = alarm.Id;
            }
            _currentCardStatus = newCardStatus;

            if (_currentCardStatus == CardStatus.Ok)
            {
                ResetCounters();
            }
            else
            {
                ResetTimer();
            }            
        }

        private void ResetTimer()
        {
            _dispatcherQueue.EnqueueTimer(_pollingPeriod, _timeoutPort);
        }

        private CardStatus CalculateHealthState()
        {
            CardStatus newStatus = _currentCardStatus;

            if (_correctPinCount > 0)
            {
                newStatus = CardStatus.Ok;
            }
            else if (_wrongPinCount > 3)
            {
                newStatus = CardStatus.Stolen;
            }
            else if (_wrongPinCount > 0)
            {
                newStatus = CardStatus.Warning;
            }
            return newStatus;
        }

        void ResetCounters()
        {
            _firstWrongPinMessage = null;
            _correctPinCount = 0;
            _wrongPinCount = 0;
        }

        void HandleSuccessMessage(CorrectPinMessage correctPinMessage)
        {
            Interlocked.Increment(ref _correctPinCount);
        }

        void HandleFailureMessage(WrongPinMessage wrongPinMessage)
        {
            if (Interlocked.Increment(ref _wrongPinCount) == 1)
            {
                _firstWrongPinMessage = wrongPinMessage;
                ResetTimer();
            }
        }
    }

  
}

By viewing downloads associated with this article you agree to the Terms of Service and the article's licence.

If a file you wish to view isn't highlighted, and is a text file (not binary), please let us know and we'll add colourisation support for it.

License

This article, along with any associated source code and files, is licensed under The Microsoft Public License (Ms-PL)


Written By
Software Developer
Israel Israel
This member has not yet provided a Biography. Assume it's interesting and varied, and probably something to do with programming.

Comments and Discussions