Click here to Skip to main content
Click here to Skip to main content

Search Keywords - Relationship between Pipes-and-Filters and Decorator design patterns

, 20 Feb 2006 CPOL
Rate this:
Please Sign up or sign in to vote.
An article on the implementation of both Pipes-and-Filters architectural design pattern and Decorator pattern.

Introduction

The article provides an implementation of the simple problem using both Pipes-and-Filters architectural pattern and Decorator pattern, and explores a relationship between them, if any.

Pipes-and-Filters Pattern - An architectural design pattern

Intent

The Pipes and Filters architectural pattern provides a structure for systems, having components that process a stream of data (filters) and connections that transmit data between adjacent components (pipes). This architecture provides reusability, maintainability, and decoupling for the system processes having distinct, easily identifiable, and independent but somehow compatible tasks.

The usage of Pipes and Filters pattern is limited to systems where the order in which filters are processed is strongly determined and sequential in nature. The pattern applies to problems where it is natural to decompose the computation into a collection of semi-independent tasks. In the Pipeline pattern, the semi-independent tasks represent the stages of the pipeline, the structure of the pipeline is static, and the interaction between successive stages is regular and loosely synchronous. A pipeline is a definition of steps/tasks that are executed to perform a business function. Each step may involve reading or writing to data confirming the “pipeline state,” and may or may not access an external service. When invoking an asynchronous service as part of a step, a pipeline can wait until a response is returned (if a response is expected), or proceed to the next step in the pipeline if the response is not required in order to continue processing.

Use the pipeline pattern when:

  • You can specify the sequence of a known/determined set of steps.
  • You do not need to wait for an asynchronous response from each step.
  • You want all downstream components to be able to inspect and act on data that comes from upstream (but not vice versa).

Advantages of the pipeline pattern include:

  • It enforces sequential processing.
  • It is easy to wrap it in an atomic transaction.

Disadvantages of the pipeline pattern include:

  • The pattern may be too simplistic to cover all cases in business logic, especially for service orchestration in which you need to branch the execution of the business logic in complex ways.
  • It does not handle conditional constructs, loops, and other flow control logic as it's mostly sequential in nature.

Background

Filters

The filters are the processing units of the pipeline. A filter may enrich, refine, process, or transform its input data.

  • It may refine the data by concentrating or extracting information from the input data stream and passing only that information to the output stream.
  • It may transform the input data to a new form before passing it to the output stream.
  • It may, of course, do some combination of enrichment, refinement, and transformation.

A filter may be active (the more common case) or passive.

  • An active filter runs as a separate process or thread; it actively pulls data from the input data stream and pushes the transformed data onto the output data stream.
  • A passive filter is activated by either being called:
    • as a function, a pull of the output from the filter.
    • as a procedure, a push of output data into the filter.

Pipes

The pipes are the connectors having the following links between a data source and the first filter, between filters, between the last filter and a data sink. As needed, a pipe synchronizes the active elements that it connects together.

Data source

A data source is an entity (e.g., a file or input device) that provides the input data to the system. It may either actively push data down the pipeline or passively supply data when requested, depending upon the situation.

Data sink

A data sink is an entity that gathers data at the end of a pipeline. It may either actively pull data from the last filter element or it may passively respond when requested by the last filter element.

Implementation Details

We may consider the generic filter as a component that processes data. The IComponent basic interface could be defined as follows:

public interface IComponent<T>
{
    bool Process(T t);
}

Throughout the sample, I used ‘T’ as a generic type parameter. InputStream and OutputStream are interfaces used for the input and the output streams.

public interface InputStream<T> : IEnumerable<T>
{
    bool Available();
    T Read();
    ulong Read(out T[] Data, ulong offset, ulong length);
    void Reset();
    void Skip(ulong offset);
}

public interface OutputStream<T>
{
    void Flush();
    void Write(T data);
    void Write(T[] Data);
    void Write(T[] Data, ulong offset, ulong length);
}

public interface IStreamingControl<T>
{
    InputStream<T> Input
    {
        get; set;
    }
    OutputStream<T> Output
    {
        get; set;
    }

    InputStream<T> FactoryInputStream();
    OutputStream<T> FactoryOutputStream();
}

Now, define the concrete implementation class StreamingControlImpl which implements the IStreamingControl as:

public class StreamingControlImpl<T> : 
       IStreamingControl<T> where T : new()

public interface IComponentStreaming<T> : 
       IComponent<StreamingControlImpl<T>> where T : new()
{
    bool Process(InputStream<T> input, OutputStream<T> output);
}

public abstract class InputSource<T> : 
       StreamingComponentBase<T> where T : new()
{
    public const bool isOutputSink = false;

    public abstract bool Process(OutputStream<T> output);

    public override bool Process(InputStream<T> input, 
                                 OutputStream<T> output)
    {
        foreach (T t in input)
            output.Write(t);

        return Process(output);
    }
}

public abstract class OutputSink<T> : 
       StreamingComponentBase<T> where T : new()
{
    public const bool isOutputSink = true;

    public abstract bool Process(InputStream<T> input);

    public override bool Process(InputStream<T> input, 
                                 OutputStream<T> output)
    {
        return Process(input);
    }
}

We can have two processing types: sequential processing and pipeline processing. Connecting components with asynchronous pipes allows each unit in the chain to operate in its own thread or its own process. When a unit has completed processing one filter, it can send the data to the output stream and immediately start processing another data. It does not have to wait for the subsequent components to read and process the data. This allows multiple data messages to be processed concurrently as they pass through the individual stages. For example, after the first message has been decrypted, it can be passed on to the authentication component. At the same time, the next message can already be decrypted. We call such a configuration a processing pipeline because messages flow through the filters like liquid flows through a pipe. When compared to strictly sequential processing, a processing pipeline can significantly increase system throughput. Here, I only discuss sequential processing.

The pipeline interface is defined as:

public interface IPipeline<T> 
{
    void AddLink(IComponent<T> link);
    bool Process(T t);
}

LinearPipeline is the IPipeline implementation that can process filters sequentially. There are two ways to process the filters in the order defined by the following enumerator:

public enum OPTYPE { AND, OR };

If processing of any filter in the pipeline fails and/or returns false, the processing stops without executing the preceding filters, and the pipeline returns false to the caller program. So the pipeline processing with the AND operator is as follows:

if (_optype == OPTYPE.AND)
{
    result = true;
    foreach (IComponent<T> link in _links)
    {
        if (! link.Process(t))
        {
            result = false;
            break;
        }
    }
}

If the processing of any filter in the pipeline returns true, the pipeline returns true to the caller program. The pipeline continues to process each filter regardless of the result obtained in processing the filters. The pipeline processing with the OR operator is as follows.

if (_optype == OPTYPE.OR)
{
    result = false;
    foreach (IComponent<T> link in _links)
    {
        if (link.Process(t))
        {
            result = true;
            //break;
        }
    }
}

Different filters do different jobs, hence are implemented as different classes. Here is the list of five filters in the pipeline:

  1. LoadFileFilter reads the program text (i.e., source code) from a file (or perhaps a sequence of files) as a stream of characters, and recognizes a sequence of tokens. It is not very efficient to read a character at a time, since file IO operations are slow. Therefore, we can read a file at a time using a StreamReader. The class Tokens is an iterator tokenizer that uses either commas or semicolons and spaces to divide into tokens.
  2. StartWithSearchFilter gets data stream from LoadFileFilter and sinks the words which start with the keyword provided as the parameter.
  3. WordLengthSearchFilter gets data stream from StartWithSearchFilter and sinks the words having length less than or equal to the maximum value provided as the parameter.
  4. PlaindromeSearchFilter gets a data stream from WordLengthSearchFilter and sinks the words that are palindrome in nature.
  5. Output is used to display the results to the user.

The following class describes the implementation of the LoadFileFilter task:

internal class LoadFileFliter : InputSource<Item>
{
    private string _path;

    public LoadFileFliter(string path)
    {
        _path = path;
    }
    public override bool Process(OutputStream<Item> output)
    {
        try
        {
            if (File.Exists(_path))
            {
                string buffer;
                // Create an instance of StreamReader to read from a file.
                // The using statement also closes the StreamReader.
                using (StreamReader sr = new StreamReader(_path))
                {
                    buffer = sr.ReadToEnd();
                }
                
               buffer = buffer.Replace("\r\n", " ");
               Tokens Tokenizer = new Tokens(buffer, new char[] { ' ', ';', ',' });
                foreach (string Token in Tokenizer)
                {
                    if (Token == string.Empty) continue;

                    System.Diagnostics.Debug.Write(Token);
                    output.Write(new Item(Token));
                }
                return true;
            }
            else
                throw new Exception("File could not be read");
        }
        catch (Exception ex)
        {
            System.Diagnostics.Debug.Write(ex.Message);
        }
   
        return false;
        
    }
}

As every filter exposes a very simple interface, it receives messages on the inbound pipe, processes the message, and publishes the results to the outbound pipe. The pipe connects one filter to the next, sending output messages from one filter to the next. Because, all components use the same external interface.

They can be composed into different solutions by connecting the components to different pipes. We can add new filters, omit existing ones, or rearrange them into a new sequence—all without having to change the filters themselves. Here is the major Invoke class that prepares an order of filters into a pipeline and processes it:

public static void Invoke()
{
    LinearPipeline<StreamingControlImpl<Item>> pipeline = 
      new LinearPipeline<StreamingControlImpl<Item>>(/*OPTYPE.OR*/);

    pipeline.AddLink(Factory<Item>.CreateLoadFile(@"c:\a.log"));
    pipeline.AddLink(Factory<Item>.CreateStartWithSearch("wa"));
    pipeline.AddLink(Factory<Item>.CreateWordLengthSearch(9));
    pipeline.AddLink(Factory<Item>.CreatePalindromeSearch());
    pipeline.AddLink(Factory<Item>.CreateOutput());

    StreamingControlImpl<Item> controldata = 
              new StreamingControlImpl<Item>();
    bool done = pipeline.Process(controldata);
    if (done)
        System.Diagnostics.Debug.Write("All filters " + 
                             "processed successfully");

}

While reading a file and tokenizing it to prepare stream data, I want to use the “string” native data type to be passed as the type parameter but using StreamingControlImpl<string> generates a compiler error as the constraint where T : new() on every level of upward hierarchy requires a parameter-less constructor for type “string”. Since string does not have a parameter-less constructor and it is a sealed class, we cannot inherit a class from string. To solve this problem instead of having a is-a relationship, we can implement a new class Item having a has-a relationship and can get the value using the Item.Text property! No big deal:

internal class Item
{
    private string _text = null;
    public string Text
    {
        get { return _text; }
        set { _text = value; }
    }
    public Item()
    {
        _text = string.Empty;
    }
    public Item(string t)
    {
        _text = t;
    }
}

The following class diagram explains the whole concept of the Pipes and Filters architecture:

Liabilities of Pipes and Filters

The pipes-and-filters architectural pattern has the following liabilities [Buschmann]:

  • Sharing state information is expensive or inflexible. The information must be encoded, transmitted, and then decoded.
  • Efficiency gain by parallel processing is often an illusion. The costs of data transfer, synchronization, and context switching may be high. Non-incremental filters, such as the Unix sort, can become the bottleneck of a system.
  • Data transformation overhead. The use of a single data channel between filters often means that much transformation of data must occur, for example, translation of numbers between binary and character formats.
  • Error handling. It is often difficult to detect errors in pipes-and-filters systems. Recovering from errors is even more difficult.

Decorator Pattern - An alternative or related to Pipes-and-Filters

Intent

Attach additional responsibilities to an object dynamically, without using subclassing. The key to a decorator is that a decorator "wraps" the object decorated and looks to a client exactly the same as the object wrapped. This means that the decorator implements the same interface as the object it decorates.

Any message that a client sends to the object goes to the decorator. The decorator may apply some processing and then pass the message it received on to the decorated object. That object probably returns a value (to the decorator) which may again apply some processing to that result, finally sending the (perhaps modified) result to the original client. This decorator story is transparent to the calling client. It just sent a message and got a result.

Implementation Details

For example, suppose that we would like to solve all of the above problems from the Pipes and Filters section by using Decorator instead. We could change the above code arrangement accordingly. So we would create a SearchDecorator as an abstract class that takes an IComponent (filling the purpose of ISearch actually) interface as a parameter in its constructor and will itself implement the IComponent interface. The following diagram describes the concept:

Like Pipes and Filters, we may consider the basic interface as IComponent defining the Process method, that is:

public interface IComponent<T>
{
    IEnumerable<T> Words { get; }
    bool Process();
}

As we don’t have the concept of streams here to carry and transfer data across filters, we may define an iterator Words in IComponent to keep the searched words. SearchDecorator is defined as:

public abstract class SearchDecorator<T> : 
                      IComponent<T> where T: new()
{
    protected IComponent<T> _ancestor;

    public SearchDecorator(IComponent<T> ancestor)
    {
        _ancestor = ancestor;
    }

    public virtual bool Process()
    {
        return _ancestor.Process();
    }
    public virtual IEnumerable<T> Words
    {
        get
        {
            return _ancestor.Words;
        }
    }
}

public class StartWithSearch : SearchDecorator<Item>
{
    private string _keyword;
    private IEnumerable<Item> _words;


    public StartWithSearch(IComponent<Item> ancestor, string keyword)
        : base(ancestor)
    {
        _keyword = keyword;
        _ancestor = ancestor;
    }
    public override bool Process()
    {
        //Call process of ancestor first
        _ancestor.Process();

        System.Diagnostics.Debug.Write(_keyword);
        _words = CreateEnumerator();
        return true;
    }

    IEnumerable<Item> CreateEnumerator()
    {
        foreach (Item Token in _ancestor.Words)
        {
            if (Token.Text == string.Empty) continue;
            if (Token.Text.StartsWith(_keyword))
            {
                System.Diagnostics.Debug.Write(Token.Text);
                yield return Token;
            }
        }
    }
    
    public override IEnumerable<Item> Words
    {
        get
        {
            return _words;
        }
    }
}

The calling program uses the decoration of search in the following ways:

IComponent<Item> S = new FileLoader(".log");
S = new StartWithSearch(S, "ma");
S = new WordLengthSearch(S, 5);
S = new PalindromeSearch(S);
S = new Output(S);
S.Process();

Comparisons

Benefits of Pipes-and-Filters

The pipes-and-filters architectural pattern has the following benefits [Buschmann]:

  • Flexibility by filter exchange. It is easy to exchange one filter element for another with the same interfaces and functionality.
  • Flexibility by recombination. It is not difficult to reconfigure a pipeline to include new filters or perhaps to use the same filters in a different sequence. As we can see here:
    pipeline.AddLink(Factory<Item>.CreateLoadFile(@"c:\a.log"));
    pipeline.AddLink(Factory<Item>.CreateStartWithSearch("wa"));
    pipeline.AddLink(Factory<Item>.CreateWordLengthSearch(9));
    pipeline.AddLink(Factory<Item>.CreatePalindromeSearch());
    pipeline.AddLink(Factory<Item>.CreateOutput());
    pipeline.Process();
  • Reuse of filter elements. The ease of filter recombination encourages filter reuse. Small, active filter elements are normally easy to reuse if the environment makes them easy to connect.
  • Rapid prototyping of pipelines. Flexibility of exchange and recombination, and ease of reuse enables the rapid creation of prototype systems.
  • Efficiency by parallel processing. Since active filters run in separate processes or threads, pipes-and-filters systems can take advantage of a multiprocessor. By implementing a pipeline with AND and OR operators, it may also process SPLIT and JOIN operators.
  • A filter in a pipeline may be a sub-pipeline. The pipeline structure may consist of two pipelines: the main pipeline and the sub-pipeline. For example, the main pipeline contains step 1 to step 5, and the sub-pipeline contains step 3-1, step 3-2, and step 3-3. Particularly, step 3 in the main pipeline has two roles: it is a normal step in the main pipeline, and is also a sub-pipeline which contains sub-steps as:

Benefits of Decorator

The Decorator pattern has the following benefits:

  • Flexibility by concrete decorator exchange. It is easy to exchange a concrete decorator for another.
  • Flexibility by recombination. It is not big deal to reconfigure a calling client to assign a new set of responsibilities or perhaps to use the same responsibilities in a different sequence. As we can see here:
    IComponent<Item> S = new FileLoader(".log");
    S = new StartWithSearch(S, "wa");
    S = new WordLengthSearch(S, 5);
    S = new PalindromeSearch(S);
    S = new Output(S);
    S.Process();
  • Reuse of concrete decorator. The ease of responsibility recombination encourages concrete decorator reuse.
  • Rapid prototyping of pipelines. Flexibility of exchange and recombination and ease of reuse enables the rapid creation of prototype systems.
  • Efficiency by parallel processing. I am not clear here whether different responsibilities can be practically processed into separate processes by using multithreading. I think parallel processing is an extra advantage of Pipes-and-Filters.
  • Responsibilities Implementation is linear. Although the responsibilities implementation is linear and simple, taking into account the nested or sub-pipeline like functionality will cause complications and difficulties in keeping references, and extensibility will also become a headache.

Conclusions

I implemented the same problem using both Pipes-and-Filters architectural pattern and Decorator pattern. The basic idea that encourages me is the dynamic arrangement of filters on one side and dynamic pluggablility of responsibilities (wrappers) on the other side. If I don’t talk about parallel pipeline processing in pipes-and-filters, you can see how pipes-and-filters with sequential processing maps to the decorator pattern exactly. So they may be related patterns in a sense that in Pipes-and-Filters, pipes are stateless and serve as conduits for moving streams of data across multiple filters, while filters are stream modifiers which process the input streams and send the resulting output stream to the pipe of another filter, and in the Decorator pattern we can see decorators as filters.

History

  • First version: February 13, 2006.

References

For more information on Pipes-and-Filters architectural pattern implementation, I recommend the following book:

  • Foundations of Object-Oriented Programming Using .NET 2.0 Patterns. Christian Gross. Apress, 2005, ISBN 1-59059-540-8.

License

This article, along with any associated source code and files, is licensed under The Code Project Open License (CPOL)

Share

About the Author

Bashiiui
Architect
Pakistan Pakistan
Basharat Hussain currently works as Solution Architect for the company TEO (http://www.teo-intl.com) in Islamabad, Pakistan.
 
Blog: My Blog Link

Comments and Discussions

 
GeneralLinking Pipelines PinmemberLeslie Sanford7-Aug-06 9:14 
GeneralExcellent PinmemberLeslie Sanford7-Aug-06 6:29 
GeneralGreat article ! Pinmembercautroi26-Jul-06 23:25 
Pipes-and-Filters is pattern commonly used in bussiness layer in enterprise application ! This article supply some good informations about this.

General General    News News    Suggestion Suggestion    Question Question    Bug Bug    Answer Answer    Joke Joke    Rant Rant    Admin Admin   

Use Ctrl+Left/Right to switch messages, Ctrl+Up/Down to switch threads, Ctrl+Shift+Left/Right to switch pages.

| Advertise | Privacy | Terms of Use | Mobile
Web04 | 2.8.141220.1 | Last Updated 20 Feb 2006
Article Copyright 2006 by Bashiiui
Everything else Copyright © CodeProject, 1999-2014
Layout: fixed | fluid