PipeStream, a Memory-Efficient and Thread-Safe Stream

By James Kolpack

PipeStream is a thread-safe read/write data stream for use between two threads in a single-producer/single-consumer type problem.
C# 2.0, C#, Windows, .NET, .NET 2.0, VS2005, Visual Studio, Dev

Posted: 17 Oct 2006
Updated: 17 Oct 2006

Introduction

.NET provides a fairly user-friendly threading mechanism, but lacks a built-in way to easily stream large amounts of data between threads. In .NET 2.0, the Stream.Synchronized method provides a partial solution to this problem by creating a thread-safe stream wrapper. This does not address the memory issues inherent in processing large amounts of data, since a MemoryStream does not shrink as data is read: everything ever written to it stays in memory. A classic way of solving this problem is to create a shared data structure, such as a byte array, and implement a locking mechanism to ensure thread safety. This takes time to implement, and can lead to elusive bugs.
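
To make the memory issue concrete, here is a minimal sketch using only the standard MemoryStream class. The class and method names (MemoryStreamGrowthDemo, WriteThenRead) are made up for this illustration. It writes a block, reads it straight back, and repeats; even though every byte has been consumed, the stream still holds all of them at the end.

```csharp
using System;
using System.IO;

public static class MemoryStreamGrowthDemo
{
    // Writes `chunks` blocks of `chunkSize` bytes, reading each block back
    // right after it is written, and returns the stream's final Length.
    public static long WriteThenRead(int chunks, int chunkSize)
    {
        MemoryStream stream = new MemoryStream();
        byte[] chunk = new byte[chunkSize];
        long writePos = 0;
        long readPos = 0;

        for (int i = 0; i < chunks; i++)
        {
            // append a block at the end of the stream
            stream.Position = writePos;
            stream.Write(chunk, 0, chunk.Length);
            writePos = stream.Position;

            // consume the block that was just written
            stream.Position = readPos;
            readPos += stream.Read(chunk, 0, chunk.Length);
        }

        // Reading never discards data, so Length covers every byte written.
        return stream.Length;
    }

    public static void Main()
    {
        Console.WriteLine(WriteThenRead(100, 1024)); // prints 102400
    }
}
```

For a multi-hour audio stream, the same pattern means the whole decoded stream ends up resident in memory, whether or not the MemoryStream is wrapped with Stream.Synchronized.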

The PipeStream solves these problems by abstracting a shared data structure into a Stream interface, making it easy to pipe data between threads.
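
As a rough sketch of that idea (and explicitly not the PipeStream source), the following hypothetical MiniPipe class hides a Queue<byte> and a lock behind the Stream interface. The CompleteWriting method is an assumed convention for telling the reader that no more data is coming; it is not something taken from PipeStream.

```csharp
using System;
using System.Collections.Generic;
using System.IO;
using System.Threading;

// Hypothetical stand-in for illustration only; not the PipeStream source.
public class MiniPipe : Stream
{
    private readonly Queue<byte> queue = new Queue<byte>();
    private bool writerDone;

    public override void Write(byte[] buffer, int offset, int count)
    {
        lock (queue)
        {
            for (int i = 0; i < count; i++)
                queue.Enqueue(buffer[offset + i]);
            Monitor.PulseAll(queue); // wake a reader waiting for data
        }
    }

    public override int Read(byte[] buffer, int offset, int count)
    {
        lock (queue)
        {
            // Block while the pipe is empty and the writer may still add data.
            while (queue.Count == 0 && !writerDone)
                Monitor.Wait(queue);

            int read = 0;
            while (read < count && queue.Count > 0)
                buffer[offset + read++] = queue.Dequeue(); // consumed bytes leave the queue
            return read; // 0 means the writer finished and the pipe is drained
        }
    }

    // Assumed convention: the writer calls this when it has no more data.
    public void CompleteWriting()
    {
        lock (queue)
        {
            writerDone = true;
            Monitor.PulseAll(queue);
        }
    }

    public override bool CanRead { get { return true; } }
    public override bool CanWrite { get { return true; } }
    public override bool CanSeek { get { return false; } }
    public override void Flush() { }
    public override long Length { get { throw new NotSupportedException(); } }
    public override long Position
    {
        get { throw new NotSupportedException(); }
        set { throw new NotSupportedException(); }
    }
    public override long Seek(long offset, SeekOrigin origin) { throw new NotSupportedException(); }
    public override void SetLength(long value) { throw new NotSupportedException(); }
}

public static class MiniPipeDemo
{
    // Sends `total` bytes through the pipe from a writer thread and returns
    // how many bytes the calling (reader) thread received.
    public static long Transfer(int total)
    {
        MiniPipe pipe = new MiniPipe();
        Thread writer = new Thread(new ThreadStart(delegate()
        {
            byte[] chunk = new byte[1024];
            for (int sent = 0; sent < total; sent += chunk.Length)
                pipe.Write(chunk, 0, Math.Min(chunk.Length, total - sent));
            pipe.CompleteWriting();
        }));
        writer.Start();

        long received = 0;
        byte[] buffer = new byte[80];
        int n;
        while ((n = pipe.Read(buffer, 0, buffer.Length)) > 0)
            received += n;
        writer.Join();
        return received;
    }

    public static void Main()
    {
        Console.WriteLine(Transfer(5000)); // prints 5000
    }
}
```

Because the class derives from Stream, the reader and writer can also wrap it in a StreamReader or StreamWriter, which is the main convenience of exposing the shared buffer this way.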

Background

I built this while developing an audio file transcoder, specifically to convert audio books encoded in .mp3 to .aac (or .m4b) for use on an iPod. Since the .m4b format is designed to remember the playback position between sessions, I decided that I would like to merge my existing multi-.mp3 audio books into single .m4b files. My first attempt involved a long shell command using several pipes, but this fell through when some of the MP3s turned out to be encoded with incompatible parameters. So, I began a second attempt by wrapping the FAAC encoder in a System.Diagnostics.Process, writing to it via the StandardInput stream and reading from the StandardOutput stream. While FAAC is able to encode straight from MP3, it doesn't have the ability to merge multiple input files into one output. This meant that I would need to create a concatenated stream of individual MP3s as input into FAAC. I decided an easy way to do this would be to decode each MP3 into a raw Wave stream, using a wrapped LAME Process, and feed that stream into FAAC's input. This strategy was successful, but it involved streaming data between the two processes, which would optimally be run in separate threads. The MemoryStream could be wrapped in a thread-safe wrapper, but I found that I would run out of memory for larger audio books. After experimenting with other solutions, I tried building this PipeStream, which I hope is useful for folks with similar scenarios.

For those unfamiliar with pipes, a pipe is simply a means of redirecting the output of one process to the input of another on the command line without using any intermediate data storage. I like to conceptualize it using a familiar idiom - videogames!

PacStream

Waka waka waka... ahem. So, the light-bike produces a series of power-cells and writes them to its standard output. These are captured and buffered inside the pipe for a waka-man to read via his standard input. The same idea applies to the PipeStream; it only replaces the processes with threads, power-cells with bytes of data, and waka-man with... just kidding, waka-man is the same in both. You get the idea.

Using the code

In general, use the PipeStream as you would any other stream in situations where large memory transfers between threads are needed. For (a trivial) example:

  • First, create the PipeStream in the spawning class.

    // requires: using System.IO; using System.Threading;
    PipeStream mPipeStream; // the shared stream

    public void ReadWriteMultiThreadTests()
    {
        mPipeStream = new PipeStream();

        // create one thread to read from the PipeStream and one to write to it
        Thread readThread = new Thread(new ThreadStart(ReaderThread));
        Thread writeThread = new Thread(new ThreadStart(WriterThread));
        readThread.Start();
        writeThread.Start();

        // wait for both threads to finish
        writeThread.Join();
        readThread.Join();
    }
  • Then, write to it in the producer thread...

    private void WriterThread()
    {
        // wrap the shared stream so that text can be written to it
        StreamWriter sw = new StreamWriter(mPipeStream);
        string str = File.ReadAllText("myFile.txt");
        int writeSize = 1024;
        for (int i = 0; i < str.Length; i += writeSize)
        {
            // select a substring of up to writeSize characters from the input string
            string substring = str.Substring(i,
                (i + writeSize < str.Length) ? writeSize : str.Length - i);
            sw.Write(substring.ToCharArray(), 0, substring.Length);
        }
        // push any characters still buffered in the StreamWriter into the pipe
        sw.Flush();
    }
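  • ... and finally, read the data from the PipeStream in the consumer thread.

    private void ReaderThread()
    {
        // wrap the shared stream so that text can be read from it
        StreamReader sr = new StreamReader(mPipeStream);
        char[] buffer = new char[80];
        while (!sr.EndOfStream)
        {
            int readLength = sr.Read(buffer, 0, buffer.Length);
            // do something productive with the first readLength characters of buffer
        }
    }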

I've extended the Stream interface with a few extra properties:

  • MaxBufferLength: Gets or sets the maximum number of bytes to store in the buffer.
  • BlockLastReadBuffer: Gets or sets a value indicating whether to block the last read method before the buffer is empty.

The second property is valuable when writing multiple streams to one reader - the final read will not occur until the writers are finished and this property is set to false.

  • When true, Read() will block until it can fill the passed-in buffer with the requested count.
  • When false, Read() will not block, returning all the available buffer data.
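
The sketch below illustrates one way a size limit and the two read modes described above could behave. It is an assumption-laden illustration, not the PipeStream implementation: the class BoundedByteQueue, its MaxLength field, and its FillReads property are hypothetical stand-ins for MaxBufferLength and BlockLastReadBuffer, and the rule that a writer waits while the buffer is full is my own assumption.

```csharp
using System;
using System.Collections.Generic;
using System.Threading;

// Hypothetical illustration only; names and blocking rules are assumptions.
public class BoundedByteQueue
{
    private readonly Queue<byte> queue = new Queue<byte>();
    private bool fillReads = true;

    // Stand-in for MaxBufferLength: writers wait while this many bytes are queued.
    public int MaxLength = 4096;

    // Stand-in for BlockLastReadBuffer; changing it wakes any waiting reader.
    public bool FillReads
    {
        get { lock (queue) { return fillReads; } }
        set { lock (queue) { fillReads = value; Monitor.PulseAll(queue); } }
    }

    public void Write(byte[] data, int offset, int count)
    {
        lock (queue)
        {
            for (int i = 0; i < count; i++)
            {
                while (queue.Count >= MaxLength)
                    Monitor.Wait(queue);     // buffer full: wait for a reader to drain it
                queue.Enqueue(data[offset + i]);
                Monitor.PulseAll(queue);     // let a waiting reader re-check its condition
            }
        }
    }

    public int Read(byte[] buffer, int offset, int count)
    {
        lock (queue)
        {
            // Blocking mode: wait until the whole request can be satisfied.
            // (A request larger than MaxLength would wait forever in this sketch.)
            while (fillReads && queue.Count < count)
                Monitor.Wait(queue);

            int read = 0;
            while (read < count && queue.Count > 0)
                buffer[offset + read++] = queue.Dequeue();
            Monitor.PulseAll(queue);         // space was freed: wake a waiting writer
            return read;
        }
    }
}

public static class BoundedByteQueueDemo
{
    // Single-threaded walk through the two read modes.
    public static int[] ReadModes()
    {
        BoundedByteQueue q = new BoundedByteQueue();
        q.Write(new byte[100], 0, 100);
        byte[] buffer = new byte[80];

        int first = q.Read(buffer, 0, 80);   // blocking mode, 100 bytes queued: fills all 80
        q.FillReads = false;
        int second = q.Read(buffer, 0, 80);  // non-blocking mode: returns the 20 bytes left
        int third = q.Read(buffer, 0, 80);   // nothing queued: returns 0 immediately
        return new int[] { first, second, third };
    }

    // Pushes `total` bytes through a 256-byte buffer using two threads.
    public static int Pump(int total)
    {
        BoundedByteQueue q = new BoundedByteQueue();
        q.MaxLength = 256;
        Thread writer = new Thread(new ThreadStart(delegate()
        {
            q.Write(new byte[total], 0, total); // waits whenever 256 bytes are queued
        }));
        writer.Start();

        int received = 0;
        byte[] buffer = new byte[100];
        while (received < total)
            received += q.Read(buffer, 0, Math.Min(buffer.Length, total - received));
        writer.Join();
        return received;
    }

    public static void Main()
    {
        int[] r = ReadModes();
        Console.WriteLine("{0} {1} {2}", r[0], r[1], r[2]); // prints 80 20 0
        Console.WriteLine(Pump(10000));                      // prints 10000
    }
}
```

In this sketch the buffer never holds more than MaxLength bytes, so memory use stays flat no matter how much data passes through, which is the point of capping the buffer in the first place.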

Points of interest

Note that the underlying data structure is a Queue<byte>, making it around an order of magnitude less efficient than a MemoryStream. The PipeStream is, therefore, most useful for CPU-bound processes, such as media encoding, but also for write-read-forget situations where transferring a substantial amount of data is required.
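
For readers who want to measure that overhead on their own machine, here is a small timing sketch using only standard classes. The names BufferCostDemo, QueueRoundTrip and MemoryStreamRoundTrip are made up for this example, and the numbers it prints will vary with the runtime and hardware, so treat it as a way to get a feel for the gap rather than as a benchmark.

```csharp
using System;
using System.Collections.Generic;
using System.Diagnostics;
using System.IO;

public static class BufferCostDemo
{
    // Moves every byte through a Queue<byte>: one Enqueue and one Dequeue call per byte.
    public static int QueueRoundTrip(byte[] data)
    {
        Queue<byte> queue = new Queue<byte>();
        for (int i = 0; i < data.Length; i++)
            queue.Enqueue(data[i]);

        int moved = 0;
        while (queue.Count > 0)
        {
            queue.Dequeue();
            moved++;
        }
        return moved;
    }

    // Moves the same bytes through a MemoryStream using block copies.
    public static int MemoryStreamRoundTrip(byte[] data)
    {
        MemoryStream stream = new MemoryStream();
        stream.Write(data, 0, data.Length);
        stream.Position = 0;

        byte[] sink = new byte[4096];
        int moved = 0;
        int n;
        while ((n = stream.Read(sink, 0, sink.Length)) > 0)
            moved += n;
        return moved;
    }

    public static void Main()
    {
        byte[] data = new byte[8 * 1024 * 1024];

        Stopwatch timer = Stopwatch.StartNew();
        QueueRoundTrip(data);
        Console.WriteLine("Queue<byte>:  {0} ms", timer.ElapsedMilliseconds);

        timer = Stopwatch.StartNew();
        MemoryStreamRoundTrip(data);
        Console.WriteLine("MemoryStream: {0} ms", timer.ElapsedMilliseconds);
    }
}
```

The per-byte method calls are where the queue loses time; the trade-off is that dequeued bytes are gone, whereas the MemoryStream keeps everything it was given.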

History

  • 2006-10-17 - Version 1.0.

License

This article, along with any associated source code and files, is licensed under The Code Project Open License (CPOL)

About the Author

James Kolpack



Occupation: Web Developer
Location: United States

Editor: Smitha Vijayan
Copyright 2006 by James Kolpack