.NET provides a fairly user-friendly threading mechanism, but lacks a built-in way to easily stream large amounts of data between threads. In .NET 2.0, the Stream.Synchronized method provides a partial solution to this problem by creating a thread-safe stream wrapper. This does not address the memory issues inherent in processing large amounts of data, however, because a MemoryStream does not release memory as data is read: every byte written stays in its buffer until the stream itself is discarded. A classic way of solving this problem is to create a shared data structure, such as a byte array, and implement a locking mechanism to ensure thread safety. This takes time to implement, and can lead to elusive bugs.
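The following small sketch makes the memory problem concrete. It shows that a MemoryStream wrapped with Stream.Synchronized still holds every byte that was written, even after all of it has been read back, and that the reader and writer share a single Position. The class and method names are hypothetical and exist only for illustration.

```csharp
using System;
using System.IO;

// Hypothetical demo: a synchronized MemoryStream used as a "pipe"
// never gives memory back, no matter how much has been read.
public static class MemoryStreamRetentionDemo
{
    // Writes totalBytes in chunks, reads them all back, and returns
    // the stream's Length afterwards.
    public static long LengthAfterReadingEverything(int totalBytes)
    {
        // Stream.Synchronized makes each call take a lock, but it does
        // not change how the underlying MemoryStream buffer grows.
        using (Stream shared = Stream.Synchronized(new MemoryStream()))
        {
            byte[] chunk = new byte[4096];
            for (int written = 0; written < totalBytes; written += chunk.Length)
            {
                int n = Math.Min(chunk.Length, totalBytes - written);
                shared.Write(chunk, 0, n);
            }

            // Reader and writer share one Position, so the reader has to
            // rewind before it can see what the writer produced.
            shared.Position = 0;
            byte[] readBuffer = new byte[4096];
            while (shared.Read(readBuffer, 0, readBuffer.Length) > 0)
            {
                // Consume the data (discarded here).
            }

            // Everything has been read, yet the stream still holds it all.
            return shared.Length;
        }
    }
}
```

Calling `MemoryStreamRetentionDemo.LengthAfterReadingEverything(1000000)` returns the full million bytes, which is why a wrapped MemoryStream runs out of memory on large inputs.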
PipeStream solves these problems by abstracting a shared data structure into a Stream interface, making it easy to pipe data between threads.
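The following minimal sketch shows what "abstracting a shared data structure into a Stream" can look like. It is not the author's PipeStream. The class name SimplePipeStream and its CompleteWriting() method are hypothetical, and the sketch assumes a Queue<byte> guarded by Monitor as the shared structure. Writers block while the queue holds maxBufferLength bytes, and readers block while it is empty. Once CompleteWriting() has been called, Read() drains whatever is left and then returns 0, as ordinary .NET streams do at end of data.

```csharp
using System;
using System.Collections.Generic;
using System.IO;
using System.Threading;

// Hypothetical bounded pipe: a Queue<byte> shared by one writer thread
// and one reader thread, synchronized with Monitor.Wait/PulseAll.
public sealed class SimplePipeStream : Stream
{
    private readonly Queue<byte> _buffer = new Queue<byte>();
    private readonly object _sync = new object();
    private readonly int _maxBufferLength;
    private bool _writingCompleted;

    public SimplePipeStream(int maxBufferLength = 64 * 1024)
    {
        if (maxBufferLength <= 0) throw new ArgumentOutOfRangeException(nameof(maxBufferLength));
        _maxBufferLength = maxBufferLength;
    }

    // Hypothetical end-of-data signal: no more bytes will be written.
    public void CompleteWriting()
    {
        lock (_sync)
        {
            _writingCompleted = true;
            Monitor.PulseAll(_sync); // wake a reader waiting on an empty queue
        }
    }

    public override void Write(byte[] buffer, int offset, int count)
    {
        lock (_sync)
        {
            if (_writingCompleted) throw new InvalidOperationException("Writing has been completed.");
            int written = 0;
            while (written < count)
            {
                // Back-pressure: wait while the queue is full.
                while (_buffer.Count >= _maxBufferLength)
                    Monitor.Wait(_sync);
                while (written < count && _buffer.Count < _maxBufferLength)
                    _buffer.Enqueue(buffer[offset + written++]);
                Monitor.PulseAll(_sync); // wake a reader waiting for data
            }
        }
    }

    public override int Read(byte[] buffer, int offset, int count)
    {
        lock (_sync)
        {
            // Wait until there is data or the writer has finished.
            while (_buffer.Count == 0 && !_writingCompleted)
                Monitor.Wait(_sync);
            int read = 0;
            while (read < count && _buffer.Count > 0)
                buffer[offset + read++] = _buffer.Dequeue();
            Monitor.PulseAll(_sync); // wake a writer waiting for room
            return read; // 0 only after CompleteWriting() with an empty queue
        }
    }

    public override bool CanRead => true;
    public override bool CanWrite => true;
    public override bool CanSeek => false;
    public override void Flush() { } // bytes are visible to the reader as soon as Write returns
    public override long Length => throw new NotSupportedException();
    public override long Position
    {
        get => throw new NotSupportedException();
        set => throw new NotSupportedException();
    }
    public override long Seek(long offset, SeekOrigin origin) => throw new NotSupportedException();
    public override void SetLength(long value) => throw new NotSupportedException();
}
```

Because it derives from Stream, the sketch can be wrapped in a StreamWriter on the producer thread and a StreamReader on the consumer thread. The bounded queue keeps memory use flat. A fast producer is paused until the consumer catches up, so the buffer never grows without limit.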
I built this while developing an audio file transcoder, specifically to convert audio books encoded as *.mp3 to *.aac (or *.m4b) for use on an iPod. Since the *.m4b format is designed to remember the playback position between sessions, I decided that I would like to merge my existing multi-*.mp3 audio books into single *.m4b files. My first attempt involved a long shell command using several pipes, but this fell through when some of the MP3s turned out to be encoded with incompatible parameters.

So, I began a second attempt by wrapping the FAAC encoder in a System.Diagnostics.Process, writing to it via its StandardInput stream and reading from its StandardOutput. While FAAC is able to encode straight from MP3, it cannot merge multiple input files into one output. This meant that I needed to create a concatenated stream of the individual MP3s as input to FAAC. I decided an easy way to do this would be to decode each MP3 into a raw Wave stream using LAME, also wrapped in a Process, and feed that stream into FAAC's input. This strategy was successful, but it required streaming data between the two processes, which would ideally run in separate threads. A MemoryStream could be wrapped in a thread-safe shell, but I found that I would run out of memory for larger audio books. After experimenting with other solutions, I built this PipeStream, which I hope will be useful for folks with similar scenarios.
For those unfamiliar with pipes, a pipe is simply a means of redirecting the output of one process to the input of another on the command line, without writing the data to an intermediate file. I like to conceptualize it using a familiar idiom: video games!
Waka waka waka... ahem. So, the light-bike produces a series of power-cells and writes them to its standard output. These are captured and buffered inside the pipe for a waka-man to read via his standard input. The same idea applies to the PipeStream; it simply replaces the processes with threads, the power-cells with bytes of data, and the waka-man with... just kidding, waka-man is the same in both. You get the idea.
Using the Code
In general, use the PipeStream as you would any other stream in situations where large amounts of data need to be transferred between threads. For a (trivial) example:
First, create the PipeStream in the spawning class:
private PipeStream mPipeStream;
public void ReadWriteMultiThreadTests()
{
    mPipeStream = new PipeStream();
    Thread readThread = new Thread(new ThreadStart(ReaderThread));
    Thread writeThread = new Thread(new ThreadStart(WriterThread));
    readThread.Start(); writeThread.Start();
}
Then, write to it in the producer thread...
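private void WriterThread()
{
    string str = File.ReadAllText("myFile.txt");
    int writeSize = 1024;
    StreamWriter sw = new StreamWriter(mPipeStream);
    // Write the file to the pipe in chunks of writeSize characters.
    for (int i = 0; i < str.Length; i += writeSize)
    {
        string substring = str.Substring(i,
            (i + writeSize < str.Length) ? writeSize : str.Length - i);
        sw.Write(substring.ToCharArray(), 0, substring.Length);
    }
    sw.Flush(); // push any buffered characters into the pipe
}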
... and finally, read the data from the PipeStream in the consumer thread:
private void ReaderThread()
{
    StreamReader sr = new StreamReader(mPipeStream);
    char[] buffer = new char[1024];
    int readLength = sr.Read(buffer, 0, buffer.Length);
}
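The reader snippet performs a single Read. A consumer usually loops, because one Read may return fewer characters than requested. The following sketch shows that loop. It assumes the pipe reports end of data by returning 0, as ordinary .NET streams do. PipeConsumer.ReadAllText is a hypothetical helper, and the sketch is demonstrated against a MemoryStream as a stand-in for the pipe.

```csharp
using System.IO;
using System.Text;

public static class PipeConsumer
{
    // Reads text from source until Read returns 0 (end of data).
    // A single Read may return fewer characters than requested,
    // so it is called in a loop rather than once.
    public static string ReadAllText(Stream source, int chunkSize = 1024)
    {
        var result = new StringBuilder();
        // leaveOpen: true, so the caller still owns the underlying stream.
        using (var reader = new StreamReader(source, Encoding.UTF8, false, chunkSize, true))
        {
            char[] buffer = new char[chunkSize];
            int readLength;
            while ((readLength = reader.Read(buffer, 0, buffer.Length)) > 0)
                result.Append(buffer, 0, readLength);
        }
        return result.ToString();
    }
}
```

On the consumer thread this would be called as `PipeConsumer.ReadAllText(mPipeStream)`, assuming the pipe eventually lets the final Read return 0.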
I've extended the Stream class with a few extra properties:
MaxBufferLength: Gets or sets the maximum number of bytes to store in the buffer.
BlockLastReadBuffer: Gets or sets a value indicating whether to block the last read method before the buffer is empty.
The second property is valuable when multiple streams are written to one reader: the final read will not occur until the writers are finished and this property is set to false.
true: Read() blocks until it can fill the passed-in buffer with the requested count of bytes.
false: Read() does not block, returning all the available buffer data.
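The following sketch illustrates the two read modes described above. It is not the article's implementation. ReadModeQueue is a hypothetical, stripped-down class that assumes a Queue<byte> guarded by Monitor. Its BlockLastReadBuffer property only mimics the semantics stated above: when true, Read waits for a full count of bytes, and when false, it returns whatever is buffered.

```csharp
using System;
using System.Collections.Generic;
using System.Threading;

// Hypothetical illustration of the two BlockLastReadBuffer read modes.
public sealed class ReadModeQueue
{
    private readonly Queue<byte> _queue = new Queue<byte>();
    private readonly object _sync = new object();
    private bool _blockLastReadBuffer = true;

    // true: Read waits until count bytes are available.
    // false: Read returns whatever is buffered now (possibly 0 bytes).
    public bool BlockLastReadBuffer
    {
        get { lock (_sync) return _blockLastReadBuffer; }
        set
        {
            lock (_sync)
            {
                _blockLastReadBuffer = value;
                Monitor.PulseAll(_sync); // let a blocked reader re-check the mode
            }
        }
    }

    public void Write(byte[] data)
    {
        lock (_sync)
        {
            foreach (byte b in data) _queue.Enqueue(b);
            Monitor.PulseAll(_sync); // wake a reader waiting for more bytes
        }
    }

    public int Read(byte[] buffer, int offset, int count)
    {
        lock (_sync)
        {
            // Blocking mode: wait for a full buffer's worth of data.
            // Switching BlockLastReadBuffer to false releases the wait.
            while (_blockLastReadBuffer && _queue.Count < count)
                Monitor.Wait(_sync);

            int read = 0;
            while (read < count && _queue.Count > 0)
                buffer[offset + read++] = _queue.Dequeue();
            return read;
        }
    }
}
```

In the multiple-writer scenario, the reader stays blocked on its last, partially filled buffer. Once every writer is done, the coordinating thread sets BlockLastReadBuffer to false, and that final Read returns the remaining bytes.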
Points of Interest
Note that the underlying data structure is a Queue<byte>, which makes it around an order of magnitude less efficient than a …. PipeStream is therefore most useful for CPU-bound processes such as media encoding, where the per-byte overhead of the queue is small compared with the processing work, and for write-read-forget situations where a substantial amount of data must be transferred.
- 2006-10-17 - Version 1.0
- 2008-10-09 - Version 1.1 - Uses Monitor instead of ManualResetEvent objects for more elegant synchronization