Pipeline and Filters Pattern using C#

Apr 20, 2016

CPOL

Implementing the Pipeline and Filters pattern using C#

Introduction

Pipeline and filters is a useful, tidy pattern for scenarios in which an object must pass through a series of filtering (processing) steps that transform it into a useful state, as shown in the picture below.

The code in this article is a complete, generic implementation of the Pipeline and Filters pattern.
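Before the class-based version, the core idea can be sketched with plain delegates: each filter is a function from T to T, and the pipeline feeds the output of one filter into the next. This is only an illustrative sketch; PipelineSketch, Run, and the string example are assumptions made here and are not part of the article's code.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;

// Hypothetical helper, not part of the article's code.
public static class PipelineSketch
{
    // Runs each filter in order, feeding the output of one step into the next.
    public static T Run<T>(T input, IEnumerable<Func<T, T>> filters)
    {
        return filters.Aggregate(input, (current, filter) => filter(current));
    }

    // Toy example: tidy up a string in three steps.
    public static string Example()
    {
        var filters = new List<Func<string, string>>
        {
            s => s.Trim(),              // step 1: drop surrounding whitespace
            s => s.ToUpperInvariant(),  // step 2: upper-case the text
            s => s.Replace(' ', '_')    // step 3: replace spaces with underscores
        };

        return Run("  pipeline and filters ", filters);
    }
}
```

Calling PipelineSketch.Example() returns "PIPELINE_AND_FILTERS". The interface-based design used in the rest of the article gives each step a named, separately testable class instead of an anonymous lambda.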

Background

An understanding of messaging patterns is critical to understanding Enterprise Integration Patterns.

Using the Code

The code assumes an object repository, AgentStatusRepository, with a static property Agents that returns a List<Agent>. This list of Agent objects is the input to the pipeline, on which the various filters are applied.

Agent class (Step 1)

    public class Agent
    {
        public string AgentId { get; set; }
        // One of the PresenceStatus values listed below
        public string PresenceStatus { get; set; }
        public int CurrentWorkload { get; set; }
        public int MaxWorkload { get; set; }
        // UTC time of the agent's last status update
        public DateTime StatusUpdateDatetime { get; set; }
    }

Possible Values of PresenceStatus:

  • Available
  • Busy
  • Research
  • Offline

Below is the list of filters applied to the List<Agent> object (Object1 in the picture above) retrieved from the AgentStatusRepository:

  • AgentAvailabilityFilter (Filter1 in the picture above) - Keeps the agents where Agent.PresenceStatus == "Available"
  • AgentWorkloadFilter (Filter2 in the picture above) - Keeps the agents that passed Filter1 where Agent.CurrentWorkload < Agent.MaxWorkload
  • AgentPresenceUpdateDatetimeFilter (Filter3 in the picture above) - Keeps the agents that passed Filter2 where (DateTime.UtcNow - Agent.StatusUpdateDatetime).TotalMinutes < 3
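The three conditions above can be sketched as plain predicates before they are wrapped in filter classes. AgentRules and its method names are hypothetical, and passing the current time as a parameter is an assumption made here so the freshness check can be tested deterministically; the article's filter reads DateTime.UtcNow directly.

```csharp
using System;

// Mirrors the article's Agent class so the sketch compiles on its own.
public class Agent
{
    public string AgentId { get; set; }
    public string PresenceStatus { get; set; }
    public int CurrentWorkload { get; set; }
    public int MaxWorkload { get; set; }
    public DateTime StatusUpdateDatetime { get; set; }
}

// Hypothetical helper, not part of the article's code.
public static class AgentRules
{
    // Filter1: the agent reports itself as available.
    public static bool IsAvailable(Agent agent)
    {
        return agent.PresenceStatus == "Available";
    }

    // Filter2: the agent still has spare capacity.
    public static bool HasCapacity(Agent agent)
    {
        return agent.CurrentWorkload < agent.MaxWorkload;
    }

    // Filter3: the status was updated less than three minutes before "now".
    public static bool IsFresh(Agent agent, DateTime now)
    {
        return (now - agent.StatusUpdateDatetime).TotalMinutes < 3;
    }

    // An agent is selected only when it passes all three checks.
    public static bool IsSelectable(Agent agent, DateTime now)
    {
        return IsAvailable(agent) && HasCapacity(agent) && IsFresh(agent, now);
    }
}
```

Because all three checks are combined with a logical AND, the order in which they run does not change which agents are selected; it only affects how much work the later checks have to do.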

AgentStatusRepository code:

    public class AgentStatusRepository
    {
        // Sample data: builds and returns a fresh list of agents on every call
        public static List<Agent> Agents
        {
            get
            {
                return new List<Agent>
                {
                    new Agent { AgentId = "agent_id_1", PresenceStatus = "Available",
                        CurrentWorkload = 1, MaxWorkload = 2, StatusUpdateDatetime = DateTime.UtcNow },
                    new Agent { AgentId = "agent_id_2", PresenceStatus = "Busy",
                        CurrentWorkload = 2, MaxWorkload = 2, StatusUpdateDatetime = DateTime.UtcNow },
                    new Agent { AgentId = "agent_id_3", PresenceStatus = "Available",
                        CurrentWorkload = 2, MaxWorkload = 3, StatusUpdateDatetime = DateTime.UtcNow.AddMinutes(-5) },
                    new Agent { AgentId = "agent_id_4", PresenceStatus = "Research",
                        CurrentWorkload = 2, MaxWorkload = 3, StatusUpdateDatetime = DateTime.UtcNow },
                    new Agent { AgentId = "agent_id_5", PresenceStatus = "Available",
                        CurrentWorkload = 2, MaxWorkload = 3, StatusUpdateDatetime = DateTime.UtcNow },
                    new Agent { AgentId = "agent_id_6", PresenceStatus = "Available",
                        CurrentWorkload = 5, MaxWorkload = 5, StatusUpdateDatetime = DateTime.UtcNow }
                };
            }
        }
    }

Pipeline and Filters Class Diagram:

IFilter code:

    /// <summary>
    /// A filter to be registered in the message processing pipeline
    /// </summary>
    /// <typeparam name="T">The type of object the filter receives and returns</typeparam>
    public interface IFilter<T>
    {
        /// <summary>
        /// Performs this filter's processing on the input of type T
        /// </summary>
        /// <param name="input">The input to be processed by the filter</param>
        /// <returns>The processed output, which is passed to the next filter</returns>
        T Execute(T input);
    }
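Since IFilter<T> has a single method, a small adapter can turn any predicate into a filter over a sequence. The sketch below is one possible way to do that; PredicateFilter and PredicateFilterExample are hypothetical names introduced here, not part of the article's code, and the interface is repeated only so the block compiles on its own.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;

// Same shape as the article's interface, repeated so the sketch is self-contained.
public interface IFilter<T>
{
    T Execute(T input);
}

// Hypothetical adapter: wraps a predicate as a filter over a sequence of items.
public class PredicateFilter<TItem> : IFilter<IEnumerable<TItem>>
{
    private readonly Func<TItem, bool> predicate;

    public PredicateFilter(Func<TItem, bool> predicate)
    {
        if (predicate == null) throw new ArgumentNullException(nameof(predicate));
        this.predicate = predicate;
    }

    public IEnumerable<TItem> Execute(IEnumerable<TItem> input)
    {
        // Keep only the items the predicate accepts; a null input is passed through unchanged.
        return input == null ? null : input.Where(predicate);
    }
}

public static class PredicateFilterExample
{
    // Filters a fixed array of integers down to the even ones.
    public static List<int> EvenNumbers()
    {
        IFilter<IEnumerable<int>> evens = new PredicateFilter<int>(n => n % 2 == 0);
        return evens.Execute(new[] { 1, 2, 3, 4, 5, 6 }).ToList();
    }
}
```

PredicateFilterExample.EvenNumbers() returns the list 2, 4, 6. Dedicated filter classes, as used in this article, remain the better fit when a filter needs its own dependencies or documentation.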

Pipeline code:

    /// <summary>
    /// An abstract pipeline with a list of filters and an abstract Process method
    /// </summary>
    /// <typeparam name="T">The type of object that flows through the pipeline</typeparam>
    public abstract class Pipeline<T>
    {
        /// <summary>
        /// List of filters in the pipeline, in registration order
        /// </summary>
        protected readonly List<IFilter<T>> filters = new List<IFilter<T>>();

        /// <summary>
        /// Registers a filter in the pipeline
        /// </summary>
        /// <param name="filter">A filter object implementing the IFilter interface</param>
        /// <returns>This pipeline, so that Register calls can be chained</returns>
        public Pipeline<T> Register(IFilter<T> filter)
        {
            filters.Add(filter);
            return this;
        }

        /// <summary>
        /// Starts processing on the pipeline
        /// </summary>
        /// <param name="input">
        /// The input object on which the registered filters are executed</param>
        /// <returns>The output of the last filter</returns>
        public abstract T Process(T input);
    }

AgentSelectionPipeline (ConcretePipeline) code:

    /// <summary>
    /// Pipeline that selects the final list of applicable agents
    /// </summary>
    public class AgentSelectionPipeline : Pipeline<IEnumerable<Agent>>
    {
        /// <summary>
        /// Executes the registered filters, in order, on the given input
        /// </summary>
        /// <param name="input">Input on which the filtering
        /// implemented in the individual filters is performed</param>
        /// <returns>The agents that passed every filter</returns>
        public override IEnumerable<Agent> Process(IEnumerable<Agent> input)
        {
            // The output of each filter becomes the input of the next one
            foreach (var filter in filters)
            {
                input = filter.Execute(input);
            }

            return input;
        }
    }
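The loop in Process does not depend on Agent, so the same logic could live in a generic concrete pipeline. The following is a minimal sketch under that assumption; SequentialPipeline, AddOne, TimesTwo, and SequentialPipelineExample are hypothetical names introduced here, and IFilter and Pipeline are repeated in compact form only so the block compiles on its own.

```csharp
using System.Collections.Generic;

// Compact copies of the article's abstractions, repeated for self-containment.
public interface IFilter<T>
{
    T Execute(T input);
}

public abstract class Pipeline<T>
{
    protected readonly List<IFilter<T>> filters = new List<IFilter<T>>();

    public Pipeline<T> Register(IFilter<T> filter)
    {
        filters.Add(filter);
        return this;
    }

    public abstract T Process(T input);
}

// Hypothetical generic pipeline: runs the filters in registration order for any T.
public class SequentialPipeline<T> : Pipeline<T>
{
    public override T Process(T input)
    {
        foreach (var filter in filters)
        {
            input = filter.Execute(input);
        }
        return input;
    }
}

// Two toy filters over int, used only to show that registration order matters.
public class AddOne : IFilter<int>
{
    public int Execute(int input) { return input + 1; }
}

public class TimesTwo : IFilter<int>
{
    public int Execute(int input) { return input * 2; }
}

public static class SequentialPipelineExample
{
    public static int AddThenDouble(int start)
    {
        return new SequentialPipeline<int>()
            .Register(new AddOne())
            .Register(new TimesTwo())
            .Process(start);
    }

    public static int DoubleThenAdd(int start)
    {
        return new SequentialPipeline<int>()
            .Register(new TimesTwo())
            .Register(new AddOne())
            .Process(start);
    }
}
```

With a starting value of 3, AddThenDouble returns 8 and DoubleThenAdd returns 7. Keeping Process abstract, as the article does, leaves room for concrete pipelines that add logging, short-circuiting, or error handling around each step.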

AgentAvailabilityFilter code (ConcreteFilter 1):

    /// <summary>
    /// The output of this filter is the list of all available agents
    /// </summary>
    public class AgentAvailabilityFilter : IFilter<IEnumerable<Agent>>
    {
        public IEnumerable<Agent> Execute(IEnumerable<Agent> input)
        {
            // Where (System.Linq) already handles an empty sequence, so only null
            // needs a guard; this also avoids enumerating the input just to count it
            if (input == null)
            {
                return input;
            }

            return input.Where(agent => agent.PresenceStatus == "Available");
        }
    }

AgentWorkloadFilter code (ConcreteFilter 2):

    /// <summary>
    /// The output of this filter is the list of agents whose CurrentWorkload is less than MaxWorkload
    /// </summary>
    public class AgentWorkloadFilter : IFilter<IEnumerable<Agent>>
    {
        public IEnumerable<Agent> Execute(IEnumerable<Agent> input)
        {
            // Where (System.Linq) already handles an empty sequence, so only null needs a guard
            if (input == null)
            {
                return input;
            }

            return input.Where(agent => agent.CurrentWorkload < agent.MaxWorkload);
        }
    }

AgentPresenceUpdateDatetimeFilter code (ConcreteFilter 3):

    /// <summary>
    /// The output of this filter is the list of agents
    /// whose StatusUpdateDatetime is less than 3 minutes old
    /// </summary>
    public class AgentPresenceUpdateDatetimeFilter : IFilter<IEnumerable<Agent>>
    {
        public IEnumerable<Agent> Execute(IEnumerable<Agent> input)
        {
            // Where (System.Linq) already handles an empty sequence, so only null needs a guard
            if (input == null)
            {
                return input;
            }

            return input.Where(agent =>
                (DateTime.UtcNow - agent.StatusUpdateDatetime).TotalMinutes < 3);
        }
    }

Using the pipeline and filters:

    public class Program
    {
        static void Main(string[] args)
        {
            // Get the agents from the repository
            var agents = AgentStatusRepository.Agents;

            // Construct the pipeline object
            var agentSelectionPipeline = new AgentSelectionPipeline();

            // Register the filters in the order they should run
            agentSelectionPipeline.Register(new AgentAvailabilityFilter())
                .Register(new AgentWorkloadFilter())
                .Register(new AgentPresenceUpdateDatetimeFilter());

            // Start pipeline processing
            var selectedAgents = agentSelectionPipeline.Process(agents);

            // With the sample repository data, this prints agent_id_1 and agent_id_5
            foreach (var agent in selectedAgents)
            {
                System.Console.WriteLine(agent.AgentId);
            }
        }
    }
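One detail worth keeping in mind when consuming the result: the filters return LINQ Where queries, which are evaluated lazily, so the sequence returned by Process is not computed until it is enumerated. The sketch below illustrates that behavior with integers instead of agents; DeferredExecutionSketch and CountPredicateCalls are hypothetical names used only for this illustration.

```csharp
using System.Collections.Generic;
using System.Linq;

// Hypothetical illustration, not part of the article's code.
public static class DeferredExecutionSketch
{
    // Returns { predicate calls before enumeration, calls after enumeration, result count }.
    public static int[] CountPredicateCalls()
    {
        int calls = 0;

        // Building the query does not run the predicate.
        IEnumerable<int> query = new[] { 1, 2, 3, 4 }.Where(n =>
        {
            calls++;
            return n % 2 == 0;
        });

        int beforeEnumeration = calls;      // still 0: nothing has been evaluated yet
        List<int> result = query.ToList();  // enumeration runs the predicate once per item
        int afterEnumeration = calls;       // 4: one call for each input element

        return new[] { beforeEnumeration, afterEnumeration, result.Count };
    }
}
```

CountPredicateCalls() returns 0, 4, 2. Applied to the agent pipeline, this means each enumeration of the result re-runs every filter, and the time-based filter reads DateTime.UtcNow at enumeration time; calling ToList() once on the result is a simple way to take a fixed snapshot.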