Simple Pipeline Implementation in C#






A single-threaded, sequential implementation of the pipes and filters pattern in C#
Introduction
Pipes and filters is a well-known design and architectural pattern. Wikipedia has a detailed explanation of what a software pipeline is. I was looking for a simple implementation to use in a training session, but most of the implementations available on the internet were advanced, with multithreading and complex input/output. I wrote this implementation during a lunch session and decided to publish it here.
Background
If you want to understand this pattern through a real-world example, imagine the assembly line of a car. At one end, the auto parts are fed into the assembly line, and at each stage a human or robot performs some operation. The output of each stage is the input to the next stage. After all the operations are completed, a car is delivered at the other end. Each processing element is responsible for a particular task and makes assumptions only about its input and output.
In software, you have probably already used this pattern if you have written a command like:
c:\>dir | more
Here, you are passing the output of the "dir" command as the input to the "more" command. You may wonder why the "dir" command can't provide pagination on its own. Why have a separate "more" command? Because then you could not do something like:
c:\>type file.txt | more
Here, the entire file's content is written to the "more" command's input, and you can view the file page by page. If "more" were not available, every command would have to implement the pagination logic again. This wastes effort and introduces a lot of redundancy. The pipes and filters pattern solves this problem by breaking the solution into independent steps. Each solution step makes assumptions only about its input and output. As long as these assumptions are met, a user can connect the solution steps in different ways to solve complex problems.
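To make the "more" argument concrete, here is a minimal C# sketch of a paging filter that assumes only that its input is a sequence of lines, and knows nothing about where those lines come from. PagingFilter and Paginate are hypothetical names used for illustration; they are not part of the article's code.

```csharp
using System;
using System.Collections.Generic;

// Hypothetical helper, not part of the article's code: a "more"-like filter.
// It only assumes that its input is a sequence of lines.
static class PagingFilter
{
    // Splits the lines into pages of at most pageSize lines each.
    // Note: as with any iterator method, the argument check runs on first enumeration.
    public static IEnumerable<List<string>> Paginate(IEnumerable<string> lines, int pageSize)
    {
        if (pageSize <= 0) throw new ArgumentOutOfRangeException(nameof(pageSize));

        var page = new List<string>(pageSize);
        foreach (string line in lines)
        {
            page.Add(line);
            if (page.Count == pageSize)
            {
                yield return page;
                page = new List<string>(pageSize);
            }
        }

        // The last page may be only partially filled.
        if (page.Count > 0) yield return page;
    }
}
```

The same filter can page any source, for example PagingFilter.Paginate(File.ReadLines("file.txt"), 20) or an in-memory list of directory entries, in the same way that "more" pages the output of both "dir" and "type".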
Before jumping into the code, let us make the terminology clear. A pipeline is a solution constructed by connecting a set of pipeline elements. A pipeline element is a solution step that takes a specific input, processes the data and produces a specific output.
As always, when learning a concept, start with a simple example. I am going to construct a pipeline of passive pipeline elements, each with a single input and output.
Using the Code
IPipelineElement
All the pipeline elements implement this interface. Through it, a pipeline element can be connected to another pipeline element, checked for completion, and asked to process its input.
Each pipeline element reads its input from, and writes its output to, a shared data structure. I used the PipeStream implementation by James Kolpack, but there are alternatives, such as BlockingCollection<T>, which is available as part of .NET 4.
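As a rough sketch of the BlockingCollection<T> alternative, assume that elements exchange chunks of bytes instead of a stream. The producer adds chunks and calls CompleteAdding when it is done; the consumer drains the buffer with TryTake, which returns immediately when nothing is queued, so this also fits the single-threaded, passive setting of this article. ChunkChannel and its members are hypothetical names, not part of the article's code.

```csharp
using System;
using System.Collections.Concurrent;

// Hypothetical wrapper, not part of the article's code: a FIFO buffer of
// byte chunks between two pipeline elements, backed by BlockingCollection<T>.
class ChunkChannel
{
    // The default backing store is a ConcurrentQueue<T>, so chunks stay in order.
    private readonly BlockingCollection<byte[]> chunks = new BlockingCollection<byte[]>();

    // Producer side: copies the data so the caller can reuse its buffer.
    public void Write(byte[] buffer, int offset, int count)
    {
        byte[] chunk = new byte[count];
        Array.Copy(buffer, offset, chunk, 0, count);
        chunks.Add(chunk);
    }

    // Producer side: signals that no more data will arrive.
    public void Close()
    {
        chunks.CompleteAdding();
    }

    // Consumer side: non-blocking; returns false when no chunk is queued.
    public bool TryRead(out byte[] chunk)
    {
        return chunks.TryTake(out chunk);
    }

    // True once Close() was called and every queued chunk has been taken.
    public bool IsCompleted
    {
        get { return chunks.IsCompleted; }
    }
}
```

One design difference from checking PipeStream.Length: IsCompleted gives the downstream element an explicit "upstream is finished" signal instead of inferring it from an empty buffer.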
using PipeStreamPkg; // namespace of the PipeStream class by James Kolpack

/// <summary>
/// Interface for a pipeline element.
/// </summary>
interface IPipelineElement
{
    /// <summary>
    /// Sets the input for this pipeline element.
    /// </summary>
    /// <param name="inputStream">The stream to read the input from</param>
    void SetInput(PipeStream inputStream);

    /// <summary>
    /// Connects the output of this element to the input of the next element.
    /// </summary>
    /// <param name="next">The next element to be connected</param>
    void Connect(IPipelineElement next);

    /// <summary>
    /// The pipeline element's processing function. Implement your processing here.
    /// </summary>
    void Process();

    /// <summary>
    /// Whether this particular pipeline element has completed its processing.
    /// </summary>
    bool IsComplete
    {
        get;
    }
}
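The element code in this article relies on PipeStream behaving as a FIFO buffer: Write appends bytes, Read removes them from the front, and Length is the number of bytes still waiting to be read (otherwise loops such as while (input.Length > 0) would never end). If you want to experiment without the PipeStream source, a minimal stand-in with those semantics could look like this. QueueBuffer is a hypothetical name; it is not a Stream subclass and not James Kolpack's implementation.

```csharp
using System;
using System.Collections.Generic;

// Hypothetical stand-in, not James Kolpack's PipeStream: bytes are written
// at the end and read from the front; Length counts bytes not yet read.
class QueueBuffer
{
    private readonly Queue<byte> bytes = new Queue<byte>();

    // Number of bytes still waiting to be read.
    public long Length
    {
        get { return bytes.Count; }
    }

    // Appends count bytes from buffer, starting at offset.
    public void Write(byte[] buffer, int offset, int count)
    {
        for (int i = 0; i < count; i++)
        {
            bytes.Enqueue(buffer[offset + i]);
        }
    }

    // Removes up to count bytes into buffer and returns how many were copied.
    public int Read(byte[] buffer, int offset, int count)
    {
        int n = Math.Min(count, bytes.Count);
        for (int i = 0; i < n; i++)
        {
            buffer[offset + i] = bytes.Dequeue();
        }
        return n;
    }
}
```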
Pipeline
This Pipeline class implements a sequential pipeline. All the pipeline elements are passive, with a single input and output.
using System.Collections.Generic;

/// <summary>
/// This pipeline class implements a sequential pipeline. All the pipeline elements are
/// passive. The pipeline elements don't have a processing thread of their own.
/// </summary>
class Pipeline
{
    /// <summary>
    /// List of pipeline elements
    /// </summary>
    List<IPipelineElement> pipeline = new List<IPipelineElement>();

    /// <summary>
    /// Adds the element to the pipeline and connects the previous element's output to it.
    /// </summary>
    /// <param name="anElement">The element to be added.</param>
    public void Add(IPipelineElement anElement)
    {
        pipeline.Add(anElement);
        if (pipeline.Count > 1)
            pipeline[pipeline.Count - 2].Connect(pipeline[pipeline.Count - 1]);
    }

    /// <summary>
    /// This is the main processing method. It runs the pipeline until all the
    /// elements declare completion.
    /// </summary>
    public void Run()
    {
        bool jobCompleted = false;
        // Keep sweeping over all elements until every one of them reports completion
        while (!jobCompleted)
        {
            jobCompleted = true;
            for (int i = 0; i < pipeline.Count; i++)
            {
                pipeline[i].Process();
                jobCompleted = jobCompleted && pipeline[i].IsComplete;
            }
        }
    }
}
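To see how the passive Run loop drives its elements, here is a self-contained sketch that keeps the same loop structure but replaces IPipelineElement with a simplified, hypothetical IStage interface without the stream plumbing. CountdownStage and SequentialRunner are also hypothetical; each stage needs a fixed number of Process calls, and the runner returns how many sweeps it made.

```csharp
using System;
using System.Collections.Generic;

// Hypothetical, simplified version of IPipelineElement without the streams.
interface IStage
{
    void Process();
    bool IsComplete { get; }
}

// Hypothetical stage that becomes complete after a fixed amount of work.
class CountdownStage : IStage
{
    private int remaining;

    // How many times Process was called, including calls after completion.
    public int Calls { get; private set; }

    public CountdownStage(int work)
    {
        remaining = work;
    }

    public void Process()
    {
        Calls++;
        if (remaining > 0) remaining--;
    }

    public bool IsComplete
    {
        get { return remaining == 0; }
    }
}

static class SequentialRunner
{
    // Same loop as Pipeline.Run, but returns the number of sweeps.
    public static int Run(IList<IStage> stages)
    {
        int sweeps = 0;
        bool jobCompleted = false;
        while (!jobCompleted)
        {
            sweeps++;
            jobCompleted = true;
            foreach (IStage stage in stages)
            {
                stage.Process();
                jobCompleted = jobCompleted && stage.IsComplete;
            }
        }
        return sweeps;
    }
}
```

With one stage needing three calls and another needing one, the runner makes three sweeps, and the stage that finished first is still called on every sweep. This is why an element's Process method must be safe to call again after it has reported completion.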
I have implemented three pipeline elements. The code is straightforward, so I am just listing what each one does:
- FileReader: reads the content of the file and writes it to its output. The file path is provided as a command-line argument.
- LinesCounter: reads its input, counts the number of lines and writes the count to its output at the end.
- ConsoleWriter: reads its input, converts it to a string using UTF-8 and writes it to the console.
using System;
using System.IO;
using System.Text;
using PipeStreamPkg;

/// <summary>
/// This element does not take any input. It reads the file named by the command-line
/// argument and dumps its content to the output stream.
/// </summary>
class FileReader : IPipelineElement
{
    string filePath;
    FileStream fileStream;
    byte[] buffer = new byte[1024];
    PipeStream outputStream = new PipeStream();

    public FileReader()
    {
        IsComplete = false;
        // Element 0 is the program name, so the file path is element 1.
        string[] args = Environment.GetCommandLineArgs();
        if (args.Length < 2)
            throw new ArgumentException("Usage: <program> <file path>");
        filePath = args[1];
        // Open read-only so that read-only files can also be processed.
        fileStream = new FileStream(filePath, FileMode.Open, FileAccess.Read);
    }

    public void SetInput(PipeStream inputStream)
    {
        throw new InvalidOperationException("No input for this element");
    }

    public void Connect(IPipelineElement next)
    {
        next.SetInput(outputStream);
    }

    /// <summary>
    /// Reads a chunk of the file and dumps that to the output
    /// </summary>
    public void Process()
    {
        if (fileStream == null)
        {
            return; // the whole file has already been read
        }
        int bytesRead = fileStream.Read(buffer, 0, buffer.Length);
        IsComplete = bytesRead <= 0;
        if (bytesRead > 0)
        {
            outputStream.Write(buffer, 0, bytesRead);
        }
        else
        {
            fileStream.Dispose();
            fileStream = null;
        }
    }

    public bool IsComplete
    {
        get;
        private set;
    }
}
/// <summary>
/// This element takes the content of the input, counts the number of lines and
/// dumps that to the output.
/// </summary>
class LinesCounter : IPipelineElement
{
    PipeStreamPkg.PipeStream input;
    PipeStreamPkg.PipeStream output = new PipeStreamPkg.PipeStream();
    int numberOfLines = 0;

    public LinesCounter()
    {
        IsComplete = false;
    }

    public void SetInput(PipeStreamPkg.PipeStream inputStream)
    {
        input = inputStream;
    }

    public void Connect(IPipelineElement next)
    {
        next.SetInput(output);
    }

    public void Process()
    {
        // Run() keeps calling Process() after completion; write the result only once.
        if (IsComplete)
        {
            return;
        }
        // In the sequential pipeline the upstream element runs first in every sweep,
        // so empty input is taken to mean the upstream element has no more data.
        // That holds for FileReader, which writes on every sweep until end of file.
        if (input.Length <= 0)
        {
            IsComplete = true;
            byte[] strbuf = new UTF8Encoding().GetBytes(
                string.Format("Number of lines = {0}", numberOfLines));
            output.Write(strbuf, 0, strbuf.Length);
            return;
        }
        byte[] buffer = new byte[1024];
        while (input.Length > 0)
        {
            int bytesRead = input.Read(buffer, 0, (int)Math.Min(buffer.Length, input.Length));
            // Count '\n' bytes; this also works for "\r\n" line endings.
            for (int i = 0; i < bytesRead; i++)
            {
                if (buffer[i] == '\n')
                {
                    numberOfLines++;
                }
            }
        }
    }

    public bool IsComplete
    {
        get;
        private set;
    }
}
/// <summary>
/// This element takes the content of the input, decodes it as UTF-8 and
/// writes it to the console.
/// </summary>
class ConsoleWriter : IPipelineElement
{
    PipeStream inputStream;
    // A Decoder keeps state between calls, so a multi-byte UTF-8 character
    // split across two chunks is still decoded correctly.
    Decoder decoder = new UTF8Encoding().GetDecoder();

    public ConsoleWriter()
    {
        IsComplete = false;
    }

    public void SetInput(PipeStream input)
    {
        inputStream = input;
    }

    public void Connect(IPipelineElement next)
    {
        throw new InvalidOperationException("No output from this element");
    }

    public void Process()
    {
        if (inputStream.Length <= 0)
        {
            IsComplete = true;
            return;
        }
        byte[] buffer = new byte[1024];
        while (inputStream.Length > 0)
        {
            int bytesRead = inputStream.Read(buffer, 0,
                (int)Math.Min(buffer.Length, inputStream.Length));
            char[] chars = new char[decoder.GetCharCount(buffer, 0, bytesRead)];
            decoder.GetChars(buffer, 0, bytesRead, chars, 0);
            // Write, not WriteLine: chunk boundaries are not line boundaries.
            Console.Write(chars);
        }
    }

    public bool IsComplete
    {
        get;
        private set;
    }
}
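LinesCounter counts '\n' bytes, which handles both "\n" and "\r\n" line endings, but a file whose last line has no trailing newline is reported as having one line fewer than most editors would show. If that matters, one option is to remember the last byte seen across chunks, as in this sketch. ChunkedLineCounter is a hypothetical name; it is fed one chunk per Read call, the way Process consumes its input.

```csharp
using System;

// Hypothetical helper, not part of the article's code: counts lines over a
// sequence of byte chunks, including a final line without a trailing '\n'.
class ChunkedLineCounter
{
    private long newlines;
    private bool sawAnyByte;
    private byte lastByte;

    // Feeds one chunk, e.g. the bytes returned by a single Read call.
    public void Add(byte[] buffer, int count)
    {
        for (int i = 0; i < count; i++)
        {
            if (buffer[i] == (byte)'\n') newlines++;
        }
        if (count > 0)
        {
            sawAnyByte = true;
            lastByte = buffer[count - 1];
        }
    }

    public long Lines
    {
        get
        {
            // A non-empty input that does not end in '\n' has one more, unterminated line.
            bool unterminatedLastLine = sawAnyByte && lastByte != (byte)'\n';
            return newlines + (unterminatedLastLine ? 1 : 0);
        }
    }
}
```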
From these three elements, I constructed two pipelines:
- Dump the file content to the console.
- Count the lines and dump that to console.
/// <summary>
/// This pipeline dumps the file to the console.
/// </summary>
static void DumpFile()
{
    // Just read and dump the file to console
    Pipeline p = new Pipeline();
    p.Add(new FileReader());
    p.Add(new ConsoleWriter());
    p.Run();
}

/// <summary>
/// This pipeline counts the number of lines.
/// </summary>
static void CountLines()
{
    // Count the number of lines
    Pipeline p = new Pipeline();
    p.Add(new FileReader());
    p.Add(new LinesCounter());
    p.Add(new ConsoleWriter());
    p.Run();
}
Points of Interest
Many variations of this implementation are possible, depending on the need. For example:
- A pipeline element can be an active element with its own processing thread. This way, each element processes data on its own, and no external stimulus is needed.
- A pipeline element can take multiple inputs and produce multiple outputs. This is useful in multimedia applications, where audio and video have to be processed in parallel and played in sync.
- The pipeline can be provided to the program as configuration, or constructed on the fly based on the need.
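The first variation, active elements, can be sketched with BlockingCollection<T> and one task per stage. Each stage blocks on its input with GetConsumingEnumerable, transforms each item, and calls CompleteAdding on its output when its input is exhausted, so completion flows down the pipeline without a central Run loop. ActivePipeline, StartStage and the two string transforms are hypothetical and only illustrate the idea.

```csharp
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Threading.Tasks;

// Hypothetical active pipeline, not part of the article's code: every stage
// runs on its own task and stages communicate through BlockingCollection<string>.
static class ActivePipeline
{
    // Starts a stage that turns each input item into one output item.
    public static Task StartStage(BlockingCollection<string> input,
                                  BlockingCollection<string> output,
                                  Func<string, string> transform)
    {
        return Task.Factory.StartNew(() =>
        {
            try
            {
                // Blocks until an item arrives; ends after input.CompleteAdding().
                foreach (string item in input.GetConsumingEnumerable())
                {
                    output.Add(transform(item));
                }
            }
            finally
            {
                // Tell the next stage that no more data is coming.
                output.CompleteAdding();
            }
        }, TaskCreationOptions.LongRunning);
    }

    // Feeds the lines through two stages and collects the results in order.
    public static List<string> Run(IEnumerable<string> lines)
    {
        using (var source = new BlockingCollection<string>())
        using (var middle = new BlockingCollection<string>())
        using (var sink = new BlockingCollection<string>())
        {
            Task upper = StartStage(source, middle, s => s.ToUpperInvariant());
            Task numbered = StartStage(middle, sink, s => s.Length + ": " + s);

            foreach (string line in lines) source.Add(line);
            source.CompleteAdding();

            var results = new List<string>();
            foreach (string item in sink.GetConsumingEnumerable()) results.Add(item);

            // Rethrows (as AggregateException) if any stage failed.
            Task.WaitAll(upper, numbered);
            return results;
        }
    }
}
```

Because each collection has a single consumer and is FIFO by default, the output keeps the input order.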
For this article, I chose the simplest option: a static pipeline of passive elements, each with a single input and output.
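The third variation, building the pipeline from configuration, could be sketched with a table of factories keyed by element name. PipelineFactory, Register, Build and the "|"-separated configuration format are hypothetical; with the article's classes, TElement would be IPipelineElement and each built element would be passed to Pipeline.Add in order.

```csharp
using System;
using System.Collections.Generic;

// Hypothetical, not part of the article's code: builds a list of pipeline
// elements from a configuration string such as "reader|counter|writer".
class PipelineFactory<TElement>
{
    private readonly Dictionary<string, Func<TElement>> factories =
        new Dictionary<string, Func<TElement>>(StringComparer.OrdinalIgnoreCase);

    // Associates an element name (case-insensitive) with a factory.
    public void Register(string name, Func<TElement> factory)
    {
        factories[name] = factory;
    }

    // Creates one fresh element per name, in configuration order.
    public List<TElement> Build(string configuration)
    {
        var elements = new List<TElement>();
        foreach (string rawName in configuration.Split('|'))
        {
            string name = rawName.Trim();
            Func<TElement> factory;
            if (!factories.TryGetValue(name, out factory))
            {
                throw new ArgumentException("Unknown pipeline element: " + name);
            }
            elements.Add(factory());
        }
        return elements;
    }
}
```

Registering factories such as () => new FileReader() rather than ready-made instances matters here, because FileReader opens its file in the constructor; a factory delays that until the pipeline is actually built.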
History
- 17th November, 2014 - Initial publication
- 18th November, 2014 - Somehow, my source code zip was not visible in the article. So, I added the remaining source code.
- 20th November, 2014 - Included the PipeStream.cs in the project. In the original version, it was only linked from another project.