Click here to Skip to main content
15,069,075 members
Articles / Programming Languages / C#
Posted 17 Oct 2006


80 bookmarked

PipeStream, a Memory Efficient and Thread-Safe Stream

Rate me:
Please Sign up or sign in to vote.
4.72/5 (23 votes)
10 Nov 2008CPOL4 min read
PipeStream is a thread-safe read/write data stream for use between two threads in a single-producer/single-consumer type problem.

Image 1


.NET provides a fairly user friendly threading mechanism, but lacks a built-in way to easily stream large amounts of data between threads. In .NET 2.0, the Stream.Synchronized method provides a partial solution to this problem by creating a thread-safe stream wrapper. This does not address the memory issues inherent in processing large amounts of data, since a MemoryStream does not dynamically resize as data is read. A classic way of solving this problem is to create a shared data structure, such as a byte array, and implement a locking mechanism to ensure thread safety. This takes time to implement, and can lead to illusive bugs.

The PipeStream solves these problems by abstracting a shared data structure into a Stream interface, making it easy to pipe data between threads.


I built this while developing an audio file transcoder, specifically to convert audio books encoded in *.mp3 to *.aac (or *.m4b) for use on an iPod. Since *.m4b was designed with the intention of retaining its location between sessions, I decided that I would like to merge my existing multi-*.mp3 audio books into single *.m4b files. My first attempt involved a long shell command using several pipes, but this fell through when some of the MP3s were encoded with incompatible parameters. So, I began a second attempt by wrapping the FAAC encoder in a System.Diagnostics.Process, writing to it via the StandardInput stream and reading from the StandardOutput. While FAAC is able to encode straight from MP3, it doesn't have the ability to merge multiple input files into one output. This meant that I would need to create a concatenated stream of individual MP3s as input into FAAC. I decided an easy way to do this would be to decode MP3 into a raw Wave stream, using the wrapped LAME Process, and feed that stream into FAAC's input. This strategy was successful but involved trying to stream data between the two processes, which would optimally be run in separate threads. The MemoryStream could be wrapped in a thread safe shell, but I found that I would run out of memory for larger audio books. After experimenting with other solutions, I tried building this PipeStream, which I hope will be useful for folks with similar scenarios.

For those unfamiliar with pipes, it is simply a means of redirecting the output of one process to the input of another in the command line without using any intermediate data storage. I like to conceptualize it using a familiar idiom - videogames!


Waka waka waka... ahem. So, the light-bike produces a series of power-cells and writes them to its standard output. These are captured and buffered inside the pipe for a waka-man to read via his standard input. The same idea applies to the PipeStream, it only replaces the processes with threads, power-cells with bytes of data, and waka-man with... just kidding, waka-man is the same in both. You get the idea.


Using the Code

In general, use the PipeStream as you would any other stream in situations where large memory transfers between threads is needed. For (a trivial) example:

  • First, create the PipeStream in the spawning class:

    PipeStream mPipeStream; // the shared stream
    public void ReadWriteMultiThreadTests()
        mPipeStream = new PipeStream();
        // create some threads to read and write data using PipeStream
        Thread readThread = new Thread(new ThreadStart(ReadThread));
        Thread writeThread = new Thread(new ThreadStart(WriteThread));
  • Then, write to it in the producer thread...

    private void WriterThread()
        string inputFile = File.ReadToEnd("myFile.txt");
        int writeSize = 1024;
        for (int i = 0; i < str.Length; i += writeSize)
            // select a substring of characters from the input string
            string substring = str.Substring(i, 
                (i + writeSize < str.Length) ? writeSize : str.Length - i);
            sw.Write(substring.ToCharArray(), 0, substring.Length);
  • ... and finally, read the data from the PipeStream in the consumer thread:

    private void ReaderThread()
        char[] buffer = new char[80];
        while (!sr.EndOfStream)
            int readLength = sr.Read(buffer, 0, buffer.Length);
            // do something productive with buffer

I've extended the Stream interface with a few extra properties:

  • MaxBufferLength: Gets or sets the maximum number of bytes to store in the buffer.
  • BlockLastReadBuffer: Gets or sets a value indicating whether to block the last read method before the buffer is empty.

The second property is valuable in scenarios when writing multiple streams to one reader - the final read will not occur until the writers are finished and this property is set to false.

  • When true, Read() will block until it can fill the passed in buffer and count.
  • When false, Read() will not block, returning all the available buffer data.

Points of Interest

Note that the underlying data structure is a Queue<byte>, making it around an order of magnitude less efficient than a MemoryStream. The PipeStream is, therefore, most useful for CPU-bound processes, such as media encoding, but also for write-read-forget situations where transferring a substantial amount of data is required.

Related Articles


  • 2006-10-17 - Version 1.0
  • 2008-10-9 - Version 1.1 - Uses Monitor instead of Manual Reset events for more elegant synchronicity


This article, along with any associated source code and files, is licensed under The Code Project Open License (CPOL)


About the Author

James Kolpack
Software Developer (Senior)
United States United States
No Biography provided

Comments and Discussions

QuestionHow does it know there's no more data? Pin
rmacfadyen10-Dec-16 9:28
Memberrmacfadyen10-Dec-16 9:28 
QuestionDispose Issue Pin
gmav26-May-15 22:07
Membergmav26-May-15 22:07 
QuestionBugfix Pin
Jörgen Sigvardsson9-Dec-14 1:22
MemberJörgen Sigvardsson9-Dec-14 1:22 
AnswerRe: Bugfix Pin
James Kolpack15-Dec-14 7:34
MemberJames Kolpack15-Dec-14 7:34 
GeneralMy vote of 5 Pin
Member 86120914-Mar-13 17:49
MemberMember 86120914-Mar-13 17:49 
GeneralVery useful component! Pin
zumamiki13-Apr-11 3:32
Memberzumamiki13-Apr-11 3:32 
GeneralNow (almost) in the framework! Pin
James Kolpack23-Jun-10 3:55
MemberJames Kolpack23-Jun-10 3:55 
GeneralControl Flush / End Of Writing Pin
CozyRoc14-Jan-09 3:53
MemberCozyRoc14-Jan-09 3:53 
QuestionWhy the Queue class? Pin
supercat910-Nov-08 15:35
Membersupercat910-Nov-08 15:35 
AnswerRe: Why the Queue class? Pin
James Kolpack11-Nov-08 9:21
MemberJames Kolpack11-Nov-08 9:21 
GeneralRe: Why the Queue class? Pin
supercat912-Nov-08 6:57
Membersupercat912-Nov-08 6:57 
GeneralRe: Why the Queue class? Pin
James Kolpack18-Nov-08 13:30
MemberJames Kolpack18-Nov-08 13:30 
GeneralRe: Why the Queue class? Pin
_xenoglyph_30-Nov-08 22:14
Member_xenoglyph_30-Nov-08 22:14 
GeneralError in Write() Pin
KoD6664-Aug-08 5:01
MemberKoD6664-Aug-08 5:01 
GeneralRe: Error in Write() Pin
CozyRoc8-Nov-08 11:36
MemberCozyRoc8-Nov-08 11:36 
GeneralThreading issue Pin
korggy11-Jun-08 7:24
Memberkorggy11-Jun-08 7:24 
GeneralRe: Threading issue Pin
James Kolpack18-Jun-08 4:31
MemberJames Kolpack18-Jun-08 4:31 
GeneralRe: Threading issue Pin
CozyRoc8-Nov-08 11:41
MemberCozyRoc8-Nov-08 11:41 
GeneralRe: Threading issue Pin
James Kolpack9-Nov-08 5:36
MemberJames Kolpack9-Nov-08 5:36 
GeneralRe: Threading issue Pin
supercat910-Nov-08 12:52
Membersupercat910-Nov-08 12:52 
GeneralRe: Threading issue Pin
James Kolpack11-Nov-08 8:48
MemberJames Kolpack11-Nov-08 8:48 
GeneralRe: Threading issue Pin
supercat912-Nov-08 7:04
Membersupercat912-Nov-08 7:04 
GeneralRe: Threading issue Pin
James Kolpack18-Nov-08 13:40
MemberJames Kolpack18-Nov-08 13:40 
GeneralVery useful component Pin
Omar Al Zabir10-Apr-08 20:45
MemberOmar Al Zabir10-Apr-08 20:45 
Generali hate to say it Pin
Ian MacLean6-May-07 15:03
MemberIan MacLean6-May-07 15:03 

General General    News News    Suggestion Suggestion    Question Question    Bug Bug    Answer Answer    Joke Joke    Praise Praise    Rant Rant    Admin Admin   

Use Ctrl+Left/Right to switch messages, Ctrl+Up/Down to switch threads, Ctrl+Shift+Left/Right to switch pages.