Pipeline and Filters Pattern using C#

19 Apr 2016 · CPOL
Implementing Pipeline and Filters pattern using C#

Introduction

Pipeline and Filters is a useful pattern when an object must pass through a sequence of filtering (processing) steps that transform it into a usable state, as shown in the picture below.

The code in this article is a complete, generic implementation of the Pipeline and Filters pattern.

Image 1
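
Before the class-based version, the core idea can be sketched with plain delegates: each stage takes a value and returns a transformed value, and the pipeline feeds the output of one stage into the next. The names PipelineSketch and Run are illustrative assumptions and are not part of the article's code.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;

// Hypothetical helper, for illustration only.
public static class PipelineSketch
{
    // Runs the input through each stage in order; the output of one
    // stage becomes the input of the next.
    public static T Run<T>(T input, IEnumerable<Func<T, T>> stages)
    {
        return stages.Aggregate(input, (current, stage) => stage(current));
    }
}
```

For example, PipelineSketch.Run(" Hello ", new Func<string, string>[] { s => s.Trim(), s => s.ToUpperInvariant() }) returns "HELLO". The classes in the rest of the article express the same chaining with an interface per stage, which makes each stage easier to name and register.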

Background

An understanding of messaging patterns is essential to understanding Enterprise Integration Patterns.

Using the Code

The code assumes an object repository, AgentStatusRepository, with a static property Agents that returns a List<Agent>. This list of Agent objects is the input to the pipeline, on which the filtering is performed.

Agent class (Step 1)

C#
using System;

public class Agent
{
    public string AgentId { get; set; }
    public string PresenceStatus { get; set; }
    public int CurrentWorkload { get; set; }
    public int MaxWorkload { get; set; }
    public DateTime StatusUpdateDatetime { get; set; }
}

Possible Values of PresenceStatus:

  • Available
  • Busy
  • Research
  • Offline

Below is the list of filters applied to the List<Agent> object (Object1 in the picture above) retrieved from AgentStatusRepository:

  • AgentAvailabilityFilter (Filter1 in the picture above) - keeps the agents where Agent.PresenceStatus == "Available"
  • AgentWorkloadFilter (Filter2 in the picture above) - keeps, from the output of Filter1, the agents where Agent.CurrentWorkload < Agent.MaxWorkload
  • AgentPresenceUpdateDatetimeFilter (Filter3 in the picture above) - keeps, from the output of Filter2, the agents where (DateTime.UtcNow - Agent.StatusUpdateDatetime).TotalMinutes < 3
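
The three conditions above can be sketched as plain predicates, which makes it easy to check a single agent by hand. The AgentRules class and its method names are hypothetical, and the current time is passed in as a parameter (an assumption made here so the result is repeatable) instead of being read from DateTime.UtcNow.

```csharp
using System;

// Stand-in for the article's agent type, repeated so the sketch compiles on its own.
public class Agent
{
    public string AgentId { get; set; }
    public string PresenceStatus { get; set; }
    public int CurrentWorkload { get; set; }
    public int MaxWorkload { get; set; }
    public DateTime StatusUpdateDatetime { get; set; }
}

// Hypothetical helper: one predicate per filter condition.
public static class AgentRules
{
    // Filter1: presence status must be "Available".
    public static bool IsAvailable(Agent a) => a.PresenceStatus == "Available";

    // Filter2: the agent must have spare capacity.
    public static bool HasCapacity(Agent a) => a.CurrentWorkload < a.MaxWorkload;

    // Filter3: the status must have been updated less than 3 minutes before 'now'.
    public static bool IsFresh(Agent a, DateTime now) =>
        (now - a.StatusUpdateDatetime).TotalMinutes < 3;

    // An agent is selected only if it satisfies all three conditions.
    public static bool IsSelectable(Agent a, DateTime now) =>
        IsAvailable(a) && HasCapacity(a) && IsFresh(a, now);
}
```

Because every condition only narrows the list, the order of the three filters does not change the final result here; it only changes how many agents each later filter has to examine.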

AgentStatusRepository code:

C#
using System;
using System.Collections.Generic;

public class AgentStatusRepository
{
    // Sample data; a new list is built on every access
    public static List<Agent> Agents
    {
        get
        {
            return new List<Agent>
            {
                new Agent { AgentId = "agent_id_1",
                    PresenceStatus = "Available", CurrentWorkload = 1,
                    MaxWorkload = 2, StatusUpdateDatetime = DateTime.UtcNow },
                new Agent { AgentId = "agent_id_2",
                    PresenceStatus = "Busy", CurrentWorkload = 2,
                    MaxWorkload = 2, StatusUpdateDatetime = DateTime.UtcNow },
                new Agent { AgentId = "agent_id_3",
                    PresenceStatus = "Available", CurrentWorkload = 2,
                    MaxWorkload = 3, StatusUpdateDatetime = DateTime.UtcNow.AddMinutes(-5) },
                new Agent { AgentId = "agent_id_4",
                    PresenceStatus = "Research", CurrentWorkload = 2,
                    MaxWorkload = 3, StatusUpdateDatetime = DateTime.UtcNow },
                new Agent { AgentId = "agent_id_5",
                    PresenceStatus = "Available", CurrentWorkload = 2,
                    MaxWorkload = 3, StatusUpdateDatetime = DateTime.UtcNow },
                new Agent { AgentId = "agent_id_6",
                    PresenceStatus = "Available", CurrentWorkload = 5,
                    MaxWorkload = 5, StatusUpdateDatetime = DateTime.UtcNow }
            };
        }
    }
}

Pipeline and Filters Class Diagram:

Image 2

IFilter code:

C#
/// <summary>
/// A filter to be registered in the message processing pipeline
/// </summary>
/// <typeparam name="T">The type of object the filter processes</typeparam>
public interface IFilter<T>
{
    /// <summary>
    /// A filter implementing this method performs its processing on the input of type T
    /// </summary>
    /// <param name="input">The input to be processed by the filter</param>
    /// <returns>The processed output, which becomes the input of the next filter</returns>
    T Execute(T input);
}
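
For small, one-off steps it can be convenient to wrap a lambda instead of writing a class per filter. The following is a minimal sketch of such an adapter; DelegateFilter is a hypothetical name and is not part of the article's code. The interface is repeated so the sketch compiles on its own.

```csharp
using System;

public interface IFilter<T>
{
    T Execute(T input);
}

// Hypothetical adapter: exposes a delegate as an IFilter<T>.
public class DelegateFilter<T> : IFilter<T>
{
    private readonly Func<T, T> transform;

    public DelegateFilter(Func<T, T> transform)
    {
        if (transform == null)
        {
            throw new ArgumentNullException(nameof(transform));
        }
        this.transform = transform;
    }

    // Delegates the work to the wrapped function.
    public T Execute(T input)
    {
        return transform(input);
    }
}
```

For example, new DelegateFilter<int>(x => x + 1).Execute(41) returns 42. Named filter classes remain the better choice when a step has its own configuration or deserves its own unit tests.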

Pipeline code:

C#
using System.Collections.Generic;

/// <summary>
/// An abstract Pipeline with a list of filters and an abstract Process method
/// </summary>
/// <typeparam name="T">The type of object that flows through the pipeline</typeparam>
public abstract class Pipeline<T>
{
    /// <summary>
    /// List of filters in the pipeline, in registration order
    /// </summary>
    protected readonly List<IFilter<T>> filters = new List<IFilter<T>>();

    /// <summary>
    /// Registers a filter in the pipeline
    /// </summary>
    /// <param name="filter">A filter object implementing the IFilter interface</param>
    /// <returns>This pipeline, so that Register calls can be chained</returns>
    public Pipeline<T> Register(IFilter<T> filter)
    {
        filters.Add(filter);
        return this;
    }

    /// <summary>
    /// Starts processing on the pipeline
    /// </summary>
    /// <param name="input">
    /// The input object on which the filters execute</param>
    /// <returns>The output of the last filter</returns>
    public abstract T Process(T input);
}

AgentSelectionPipeline (ConcretePipeline) code:

C#
using System.Collections.Generic;

/// <summary>
/// Pipeline that selects the final list of applicable agents
/// </summary>
public class AgentSelectionPipeline : Pipeline<IEnumerable<Agent>>
{
    /// <summary>
    /// Executes the registered filters, in order, on the given input
    /// </summary>
    /// <param name="input">Input on which the filtering
    /// is performed, as implemented in the individual filters</param>
    /// <returns>The agents that passed every filter</returns>
    public override IEnumerable<Agent> Process(IEnumerable<Agent> input)
    {
        foreach (var filter in filters)
        {
            input = filter.Execute(input);
        }

        return input;
    }
}
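
Nothing in the Process loop above is specific to agents, so the same loop could also live in a generic concrete pipeline. The sketch below assumes that; SequentialPipeline and AppendFilter are hypothetical names, and IFilter and Pipeline are repeated in compact form so the block compiles on its own.

```csharp
using System.Collections.Generic;

public interface IFilter<T>
{
    T Execute(T input);
}

public abstract class Pipeline<T>
{
    protected readonly List<IFilter<T>> filters = new List<IFilter<T>>();

    public Pipeline<T> Register(IFilter<T> filter)
    {
        filters.Add(filter);
        return this;
    }

    public abstract T Process(T input);
}

// Hypothetical generic pipeline: runs the filters in registration order.
public class SequentialPipeline<T> : Pipeline<T>
{
    public override T Process(T input)
    {
        foreach (var filter in filters)
        {
            input = filter.Execute(input);
        }
        return input;
    }
}

// Tiny demonstration filter: appends a fixed suffix to a string.
public class AppendFilter : IFilter<string>
{
    private readonly string suffix;

    public AppendFilter(string suffix)
    {
        this.suffix = suffix;
    }

    public string Execute(string input)
    {
        return input + suffix;
    }
}
```

For example, new SequentialPipeline<string>().Register(new AppendFilter("b")).Register(new AppendFilter("c")).Process("a") returns "abc". Keeping Process abstract, as the article does, leaves room for pipelines that run their filters differently, for instance with logging or early exit.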

AgentAvailabilityFilter code (ConcreteFilter 1):

C#
using System.Collections.Generic;
using System.Linq;

/// <summary>
/// The output of this filter is the list of all available agents
/// </summary>
public class AgentAvailabilityFilter : IFilter<IEnumerable<Agent>>
{
    public IEnumerable<Agent> Execute(IEnumerable<Agent> input)
    {
        // Nothing to filter: pass a null or empty input through unchanged
        if (input == null || !input.Any())
        {
            return input;
        }
        return input.Where(agent => agent.PresenceStatus == "Available");
    }
}

AgentWorkloadFilter code (ConcreteFilter 2):

C#
using System.Collections.Generic;
using System.Linq;

/// <summary>
/// The output of this filter is the list of agents whose CurrentWorkload is less than MaxWorkload
/// </summary>
public class AgentWorkloadFilter : IFilter<IEnumerable<Agent>>
{
    public IEnumerable<Agent> Execute(IEnumerable<Agent> input)
    {
        // Nothing to filter: pass a null or empty input through unchanged
        if (input == null || !input.Any())
        {
            return input;
        }

        return input.Where(agent => agent.CurrentWorkload < agent.MaxWorkload);
    }
}

AgentPresenceUpdateDatetimeFilter code (ConcreteFilter 3):

C#
using System;
using System.Collections.Generic;
using System.Linq;

/// <summary>
/// The output of this filter is the list of
/// all agents whose status was updated less than 3 minutes ago
/// </summary>
public class AgentPresenceUpdateDatetimeFilter : IFilter<IEnumerable<Agent>>
{
    public IEnumerable<Agent> Execute(IEnumerable<Agent> input)
    {
        // Nothing to filter: pass a null or empty input through unchanged
        if (input == null || !input.Any())
        {
            return input;
        }

        return input.Where(agent =>
            (DateTime.UtcNow - agent.StatusUpdateDatetime).TotalMinutes < 3);
    }
}
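
Because this filter reads DateTime.UtcNow directly, its result depends on when it runs, which makes it awkward to unit test. One possible variation, sketched here under that assumption, takes the clock and the maximum age as constructor arguments. RecentStatusFilter is a hypothetical name; Agent is trimmed to the two properties the sketch needs, and IFilter is repeated so the block compiles on its own.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;

// Trimmed stand-in for the article's agent type.
public class Agent
{
    public string AgentId { get; set; }
    public DateTime StatusUpdateDatetime { get; set; }
}

public interface IFilter<T>
{
    T Execute(T input);
}

// Hypothetical variant of the datetime filter with an injectable clock.
public class RecentStatusFilter : IFilter<IEnumerable<Agent>>
{
    private readonly Func<DateTime> utcNow;
    private readonly TimeSpan maxAge;

    public RecentStatusFilter(Func<DateTime> utcNow, TimeSpan maxAge)
    {
        this.utcNow = utcNow;
        this.maxAge = maxAge;
    }

    public IEnumerable<Agent> Execute(IEnumerable<Agent> input)
    {
        if (input == null)
        {
            return input;
        }

        // Read the clock once so every agent is compared against the same instant.
        // This differs from the article's filter, which reads DateTime.UtcNow
        // for each agent while the result is being enumerated.
        var now = utcNow();
        return input.Where(agent => now - agent.StatusUpdateDatetime < maxAge);
    }
}
```

In production code the filter could be constructed as new RecentStatusFilter(() => DateTime.UtcNow, TimeSpan.FromMinutes(3)), while a test can pass a lambda that returns a fixed instant.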

Using the pipeline and filters:

C#
using System;

public class Program
{
    static void Main(string[] args)
    {
        //Get the Agents from repository
        var agents = AgentStatusRepository.Agents;

        //Construct the Pipeline object
        AgentSelectionPipeline agentSelectionPipeline = new AgentSelectionPipeline();

        //Register the filters to be executed
        agentSelectionPipeline.Register(new AgentAvailabilityFilter())
            .Register(new AgentWorkloadFilter())
            .Register(new AgentPresenceUpdateDatetimeFilter());

        //Start pipeline processing
        var selectedAgents = agentSelectionPipeline.Process(agents);

        //Print the agents that passed every filter
        foreach (var agent in selectedAgents)
        {
            Console.WriteLine(agent.AgentId);
        }
    }
}
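
To make the expected outcome concrete, the sketch below runs the three filters over the same six sample agents and returns the surviving ids. The types are compact copies of the article's classes (without the null and empty checks) so the block compiles on its own; SelectionDemo and Make are hypothetical helpers added for this walkthrough.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;

public class Agent
{
    public string AgentId { get; set; }
    public string PresenceStatus { get; set; }
    public int CurrentWorkload { get; set; }
    public int MaxWorkload { get; set; }
    public DateTime StatusUpdateDatetime { get; set; }
}

public interface IFilter<T> { T Execute(T input); }

public abstract class Pipeline<T>
{
    protected readonly List<IFilter<T>> filters = new List<IFilter<T>>();
    public Pipeline<T> Register(IFilter<T> filter) { filters.Add(filter); return this; }
    public abstract T Process(T input);
}

public class AgentSelectionPipeline : Pipeline<IEnumerable<Agent>>
{
    public override IEnumerable<Agent> Process(IEnumerable<Agent> input)
    {
        foreach (var filter in filters) { input = filter.Execute(input); }
        return input;
    }
}

public class AgentAvailabilityFilter : IFilter<IEnumerable<Agent>>
{
    public IEnumerable<Agent> Execute(IEnumerable<Agent> input) =>
        input.Where(a => a.PresenceStatus == "Available");
}

public class AgentWorkloadFilter : IFilter<IEnumerable<Agent>>
{
    public IEnumerable<Agent> Execute(IEnumerable<Agent> input) =>
        input.Where(a => a.CurrentWorkload < a.MaxWorkload);
}

public class AgentPresenceUpdateDatetimeFilter : IFilter<IEnumerable<Agent>>
{
    public IEnumerable<Agent> Execute(IEnumerable<Agent> input) =>
        input.Where(a => (DateTime.UtcNow - a.StatusUpdateDatetime).TotalMinutes < 3);
}

// Hypothetical helper for this walkthrough.
public static class SelectionDemo
{
    public static List<string> SelectedIds()
    {
        var now = DateTime.UtcNow;
        var agents = new List<Agent>
        {
            Make("agent_id_1", "Available", 1, 2, now),
            Make("agent_id_2", "Busy", 2, 2, now),
            Make("agent_id_3", "Available", 2, 3, now.AddMinutes(-5)),
            Make("agent_id_4", "Research", 2, 3, now),
            Make("agent_id_5", "Available", 2, 3, now),
            Make("agent_id_6", "Available", 5, 5, now)
        };

        var pipeline = new AgentSelectionPipeline();
        pipeline.Register(new AgentAvailabilityFilter())
                .Register(new AgentWorkloadFilter())
                .Register(new AgentPresenceUpdateDatetimeFilter());

        // ToList forces the lazy LINQ chain to run.
        return pipeline.Process(agents).Select(a => a.AgentId).ToList();
    }

    private static Agent Make(string id, string status, int current, int max, DateTime updated) =>
        new Agent
        {
            AgentId = id, PresenceStatus = status, CurrentWorkload = current,
            MaxWorkload = max, StatusUpdateDatetime = updated
        };
}
```

SelectionDemo.SelectedIds() returns agent_id_1 and agent_id_5. The availability filter drops agent_id_2 (Busy) and agent_id_4 (Research), the workload filter drops agent_id_6 (5 of 5), and the datetime filter drops agent_id_3, whose status is five minutes old.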

License

This article, along with any associated source code and files, is licensed under The Code Project Open License (CPOL)


Written By
Software Developer Microsoft
United States
I'm a Software Engineer working for Microsoft, Redmond, USA
