Pipeline and Filters Pattern using C#






Implementing Pipeline and Filters pattern using C#
Introduction
Pipeline and Filters is a useful, tidy pattern for scenarios in which a series of filtering (processing) steps must be applied to an object to transform it into a useful state, with the output of each filter becoming the input of the next, as illustrated in the picture below.
The code in this article is a complete, generic implementation of the Pipeline and Filters pattern.
Background
An understanding of messaging patterns is critical to understanding Enterprise Integration Patterns:
Using the Code
The code assumes an object repository, AgentStatusRepository, which exposes a static property Agents that returns a List<Agent>. This list of agents is the input to the pipeline, to which the various filters are applied.
Agent class (Step 1)
using System;

// Named Agent so that it matches the type used by the repository, filters, and pipeline below
public class Agent
{
    public string AgentId { get; set; }
    public string PresenceStatus { get; set; }
    public int CurrentWorkload { get; set; }
    public int MaxWorkload { get; set; }
    public DateTime StatusUpdateDatetime { get; set; }
}
Possible values of PresenceStatus: Available, Busy, Research, Offline
Below is the list of filters that need to be applied to the List<Agent> object (Object1 in the picture above) retrieved from AgentStatusRepository:
AgentAvailabilityFilter (Filter1 in the picture above) - Filter the List<Agent> where Agent.PresenceStatus == "Available"
AgentWorkloadFilter (Filter2 in the picture above) - Filter the list returned by Filter1 where Agent.CurrentWorkload < Agent.MaxWorkload
AgentPresenceUpdateDatetimeFilter (Filter3 in the picture above) - Filter the list returned by Filter2 where (DateTime.UtcNow - Agent.StatusUpdateDatetime).TotalMinutes < 3
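The three criteria above can be sketched as a single chain of LINQ `Where` calls, which is what the pipeline ultimately computes. This is only an illustrative sketch, not the article's implementation: the `CriteriaSketch` class, the `SelectAgentIds` helper, the explicit `now` parameter (used so the result is repeatable), and the sample agents "a" to "d" are all assumptions made for this example.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;

public class Agent
{
    public string AgentId { get; set; }
    public string PresenceStatus { get; set; }
    public int CurrentWorkload { get; set; }
    public int MaxWorkload { get; set; }
    public DateTime StatusUpdateDatetime { get; set; }
}

public static class CriteriaSketch
{
    // Hypothetical helper: applies the three criteria in order and returns the surviving ids.
    // "now" is passed in (an assumption of this sketch) so the outcome does not depend on the clock.
    public static List<string> SelectAgentIds(IEnumerable<Agent> agents, DateTime now)
    {
        return agents
            .Where(a => a.PresenceStatus == "Available")                  // Filter1
            .Where(a => a.CurrentWorkload < a.MaxWorkload)                // Filter2
            .Where(a => (now - a.StatusUpdateDatetime).TotalMinutes < 3)  // Filter3
            .Select(a => a.AgentId)
            .ToList();
    }

    public static void Main()
    {
        var now = DateTime.UtcNow;
        var agents = new List<Agent>
        {
            // Passes all three criteria
            new Agent { AgentId = "a", PresenceStatus = "Available", CurrentWorkload = 1, MaxWorkload = 2, StatusUpdateDatetime = now },
            // Rejected by Filter1 (not Available)
            new Agent { AgentId = "b", PresenceStatus = "Busy", CurrentWorkload = 1, MaxWorkload = 2, StatusUpdateDatetime = now },
            // Rejected by Filter2 (workload at maximum)
            new Agent { AgentId = "c", PresenceStatus = "Available", CurrentWorkload = 2, MaxWorkload = 2, StatusUpdateDatetime = now },
            // Rejected by Filter3 (status is 5 minutes old)
            new Agent { AgentId = "d", PresenceStatus = "Available", CurrentWorkload = 1, MaxWorkload = 2, StatusUpdateDatetime = now.AddMinutes(-5) },
        };

        Console.WriteLine(string.Join(", ", SelectAgentIds(agents, now))); // prints "a"
    }
}
```

Writing the chain inline like this is fine for a fixed set of rules. The pattern in this article instead places each rule in its own class, so that rules can be registered, reordered, or removed without touching the others.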
AgentStatusRepository code:
using System;
using System.Collections.Generic;

public class AgentStatusRepository
{
    // Returns a fresh list of sample agents on every access
    public static List<Agent> Agents
    {
        get
        {
            return new List<Agent>
            {
                new Agent { AgentId = "agent_id_1",
                    PresenceStatus = "Available", CurrentWorkload = 1,
                    MaxWorkload = 2, StatusUpdateDatetime = DateTime.UtcNow },
                new Agent { AgentId = "agent_id_2",
                    PresenceStatus = "Busy", CurrentWorkload = 2,
                    MaxWorkload = 2, StatusUpdateDatetime = DateTime.UtcNow },
                new Agent { AgentId = "agent_id_3",
                    PresenceStatus = "Available", CurrentWorkload = 2,
                    MaxWorkload = 3, StatusUpdateDatetime = DateTime.UtcNow.AddMinutes(-5) },
                new Agent { AgentId = "agent_id_4",
                    PresenceStatus = "Research", CurrentWorkload = 2,
                    MaxWorkload = 3, StatusUpdateDatetime = DateTime.UtcNow },
                new Agent { AgentId = "agent_id_5",
                    PresenceStatus = "Available", CurrentWorkload = 2,
                    MaxWorkload = 3, StatusUpdateDatetime = DateTime.UtcNow },
                new Agent { AgentId = "agent_id_6",
                    PresenceStatus = "Available", CurrentWorkload = 5,
                    MaxWorkload = 5, StatusUpdateDatetime = DateTime.UtcNow }
            };
        }
    }
}
Pipeline and Filters Class Diagram:
IFilter code:
/// <summary>
/// A filter to be registered in the message processing pipeline
/// </summary>
/// <typeparam name="T">The type of object the filter processes</typeparam>
public interface IFilter<T>
{
    /// <summary>
    /// A filter implementing this method performs its processing on the input of type T
    /// </summary>
    /// <param name="input">The input to be processed by the filter</param>
    /// <returns>The processed output, which is passed to the next filter</returns>
    T Execute(T input);
}
Pipeline code:
using System.Collections.Generic;

/// <summary>
/// An abstract pipeline with a list of filters and an abstract Process method
/// </summary>
/// <typeparam name="T">The type of object that flows through the pipeline</typeparam>
public abstract class Pipeline<T>
{
    /// <summary>
    /// List of filters in the pipeline, in registration order
    /// </summary>
    protected readonly List<IFilter<T>> filters = new List<IFilter<T>>();

    /// <summary>
    /// Registers a filter in the pipeline
    /// </summary>
    /// <param name="filter">A filter object implementing the IFilter interface</param>
    /// <returns>This pipeline, so that Register calls can be chained</returns>
    public Pipeline<T> Register(IFilter<T> filter)
    {
        filters.Add(filter);
        return this;
    }

    /// <summary>
    /// Starts processing on the pipeline
    /// </summary>
    /// <param name="input">
    /// The input object on which the filters execute</param>
    /// <returns>The output of the last filter</returns>
    public abstract T Process(T input);
}
AgentSelectionPipeline (ConcretePipeline) code:
using System.Collections.Generic;

/// <summary>
/// Pipeline that selects the final list of applicable agents
/// </summary>
public class AgentSelectionPipeline : Pipeline<IEnumerable<Agent>>
{
    /// <summary>
    /// Executes the registered filters, in order, on the given input
    /// </summary>
    /// <param name="input">Input on which filtering
    /// needs to happen, as implemented in the individual filters</param>
    /// <returns>The agents that passed every registered filter</returns>
    public override IEnumerable<Agent> Process(IEnumerable<Agent> input)
    {
        // Feed the output of each filter into the next one
        foreach (var filter in filters)
        {
            input = filter.Execute(input);
        }
        return input;
    }
}
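Because `IFilter<T>` and `Pipeline<T>` are generic, the same shape can in principle be reused for inputs other than a list of agents. The following is a minimal sketch under that assumption: `TrimFilter`, `UpperCaseFilter`, `TextPipeline`, and `TextPipelineDemo` are hypothetical names invented for this example, and the interface and base class are repeated in condensed form only so that the block compiles on its own.

```csharp
using System;
using System.Collections.Generic;

// Condensed copies of the article's abstractions, repeated so this sketch is self-contained
public interface IFilter<T>
{
    T Execute(T input);
}

public abstract class Pipeline<T>
{
    protected readonly List<IFilter<T>> filters = new List<IFilter<T>>();

    public Pipeline<T> Register(IFilter<T> filter)
    {
        filters.Add(filter);
        return this;
    }

    public abstract T Process(T input);
}

// Hypothetical filter: removes leading and trailing whitespace (null passes through)
public class TrimFilter : IFilter<string>
{
    public string Execute(string input) => input?.Trim();
}

// Hypothetical filter: converts the text to upper case (null passes through)
public class UpperCaseFilter : IFilter<string>
{
    public string Execute(string input) => input?.ToUpperInvariant();
}

// Hypothetical concrete pipeline over strings, mirroring AgentSelectionPipeline
public class TextPipeline : Pipeline<string>
{
    public override string Process(string input)
    {
        // Feed the output of each filter into the next one
        foreach (var filter in filters)
        {
            input = filter.Execute(input);
        }
        return input;
    }
}

public static class TextPipelineDemo
{
    public static void Main()
    {
        var pipeline = new TextPipeline();
        pipeline.Register(new TrimFilter())
                .Register(new UpperCaseFilter());

        Console.WriteLine(pipeline.Process("  agent_id_1  ")); // prints "AGENT_ID_1"
    }
}
```

Note that the `Process` body here is identical to the one in `AgentSelectionPipeline`. If every concrete pipeline runs its filters sequentially, that loop could arguably live in the base class, but keeping `Process` abstract leaves room for pipelines that execute filters differently.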
AgentAvailabilityFilter code (ConcreteFilter 1):
using System.Collections.Generic;
using System.Linq;

/// <summary>
/// The output of this filter is the list of all available agents
/// </summary>
public class AgentAvailabilityFilter : IFilter<IEnumerable<Agent>>
{
    public IEnumerable<Agent> Execute(IEnumerable<Agent> input)
    {
        // Nothing to filter; Any() avoids counting the whole sequence
        if (input == null || !input.Any())
        {
            return input;
        }
        return input.Where(agent => agent.PresenceStatus == "Available");
    }
}
AgentWorkloadFilter code (ConcreteFilter 2):
using System.Collections.Generic;
using System.Linq;

/// <summary>
/// The output of this filter is the list of agents whose CurrentWorkload is less than MaxWorkload
/// </summary>
public class AgentWorkloadFilter : IFilter<IEnumerable<Agent>>
{
    public IEnumerable<Agent> Execute(IEnumerable<Agent> input)
    {
        // Nothing to filter; Any() avoids counting the whole sequence
        if (input == null || !input.Any())
        {
            return input;
        }
        return input.Where(agent => agent.CurrentWorkload < agent.MaxWorkload);
    }
}
AgentPresenceUpdateDatetimeFilter code (ConcreteFilter 3):
using System;
using System.Collections.Generic;
using System.Linq;

/// <summary>
/// The output of this filter is the list of
/// all agents whose StatusUpdateDatetime is less than 3 minutes old
/// </summary>
public class AgentPresenceUpdateDatetimeFilter : IFilter<IEnumerable<Agent>>
{
    public IEnumerable<Agent> Execute(IEnumerable<Agent> input)
    {
        // Nothing to filter; Any() avoids counting the whole sequence
        if (input == null || !input.Any())
        {
            return input;
        }
        return input.Where(agent =>
            (DateTime.UtcNow - agent.StatusUpdateDatetime).TotalMinutes < 3);
    }
}
How to use the Pipeline and Filters:
using System;

public class Program
{
    static void Main(string[] args)
    {
        // Get the agents from the repository
        var agents = AgentStatusRepository.Agents;
        // Construct the pipeline object
        var agentSelectionPipeline = new AgentSelectionPipeline();
        // Register the filters in the order they should execute
        agentSelectionPipeline.Register(new AgentAvailabilityFilter())
            .Register(new AgentWorkloadFilter())
            .Register(new AgentPresenceUpdateDatetimeFilter());
        // Start pipeline processing
        var selectedAgents = agentSelectionPipeline.Process(agents);
        // With the sample repository data, this prints agent_id_1 and agent_id_5
        foreach (var agent in selectedAgents)
        {
            Console.WriteLine(agent.AgentId);
        }
    }
}
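One detail worth keeping in mind when consuming the result: each filter returns the result of LINQ's `Where`, which uses deferred execution. The value returned by `Process` is therefore a query rather than a materialized list, and the predicates (including the `DateTime.UtcNow` comparison in the third filter) run only when the result is enumerated. The sketch below demonstrates this behavior in isolation; `DeferredExecutionDemo`, `KeepEven`, and `PredicateCalls` are hypothetical names used purely for illustration and are not part of the article's code.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;

public static class DeferredExecutionDemo
{
    // Counts how many times the predicate has actually run
    public static int PredicateCalls = 0;

    // Hypothetical stand-in for a filter's Execute method: returns a lazy Where query
    public static IEnumerable<int> KeepEven(IEnumerable<int> input)
    {
        return input.Where(n =>
        {
            PredicateCalls++;
            return n % 2 == 0;
        });
    }

    public static void Main()
    {
        var lazy = KeepEven(new[] { 1, 2, 3, 4 });
        Console.WriteLine(PredicateCalls);            // prints 0: nothing has been evaluated yet

        var result = lazy.ToList();                   // enumeration runs the predicate once per element
        Console.WriteLine(PredicateCalls);            // prints 4
        Console.WriteLine(string.Join(", ", result)); // prints "2, 4"
    }
}
```

If the selected agents need to be read more than once, or the time-based check should be pinned to a single moment, calling `ToList()` on the value returned by `Process` is one straightforward option.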