
Memory Stream Multiplexer: write and read from many threads simultaneously

22 Jul 2012, CPOL, 3 min read
MemoryStreamMultiplexer is a MemoryStream-like buffer manager where one thread can write and many threads can read simultaneously. It supports blocking reads, so that reader threads can call Read() and wait for some data to be written.

Download MemoryStreamMultiplexer.zip 


Introduction

Here’s an implementation of a MemoryStream-like buffer manager where one thread can write and many threads can read simultaneously. Each reading thread gets its own reader and reads from the shared stream independently, without blocking the writer or any other reader, so reads and writes happen concurrently. It supports a blocking Read call: reader threads can call Read(…) and wait until some data is available, exactly the way you would expect a Stream to behave.

You can use this to read content from the network or a file in one thread and have it consumed by one or more threads at the same time. It is handy for building an HTTP proxy where you are downloading a file and multiple clients ask for the same file at once: you download it in one thread and let the client threads read from the same buffer while the data arrives. You can also use it to serve the same file on disk to multiple clients simultaneously, or to implement a server-side cache where the same buffer is read by multiple clients at the same time.

How does it work  

First you create a MemoryStreamMultiplexer object that holds the shared buffer. It has a Write(…) method that appends a byte[] to the shared buffer. Then you call GetReader() to get a MemoryStreamReader that reads the content from the shared buffer. You can call GetReader() from a different thread so that you can read and write simultaneously. Whenever you call Write(…), it signals all the MemoryStreamReader instances that content is now available to read. The readers that were waiting on a Read(…) call get the signal and read from the shared buffer.
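The flow described above can be sketched in a few lines. This is a minimal stand-in under stated assumptions, not the article's classes: BroadcastSketch and Run are hypothetical names, each reader gets a single ManualResetEvent, and a plain bool guarded by a lock stands in for the finished signal. It only counts bytes, but it shows the two ideas that matter: the writer appends and then signals everyone, and each reader keeps its own cursor into the shared list.

```csharp
using System;
using System.Collections.Generic;
using System.Threading;

// Hypothetical sketch, not the article's implementation.
public static class BroadcastSketch
{
    // Runs one writer and readerCount readers; returns the bytes each reader saw.
    public static int[] Run(int readerCount, int chunkCount, int chunkSize)
    {
        var chunks = new List<byte[]>();              // shared, append-only buffer
        var dataReady = new ManualResetEvent[readerCount];
        var totals = new int[readerCount];
        var readers = new Thread[readerCount];
        bool finished = false;                        // guarded by lock (chunks)

        for (int r = 0; r < readerCount; r++)
        {
            int id = r;                               // per-iteration copy for the closure
            dataReady[id] = new ManualResetEvent(false);
            readers[id] = new Thread(() =>
            {
                int next = 0;                         // this reader's private cursor
                while (true)
                {
                    byte[] chunk = null;
                    bool done;
                    lock (chunks)
                    {
                        if (next < chunks.Count) chunk = chunks[next++];
                        done = finished;
                        // Reset inside the lock so a later Set() cannot be lost.
                        if (chunk == null && !done) dataReady[id].Reset();
                    }
                    if (chunk != null) totals[id] += chunk.Length;
                    else if (done) break;
                    else dataReady[id].WaitOne();     // block until the writer signals
                }
            });
            readers[id].Start();
        }

        for (int i = 0; i < chunkCount; i++)
        {
            lock (chunks) chunks.Add(new byte[chunkSize]);
            foreach (var e in dataReady) e.Set();     // wake every reader
        }
        lock (chunks) finished = true;
        foreach (var e in dataReady) e.Set();         // final wake-up so readers can exit

        foreach (var t in readers) t.Join();
        foreach (var e in dataReady) e.Dispose();
        return totals;
    }

    public static void Main()
    {
        int[] totals = Run(readerCount: 4, chunkCount: 100, chunkSize: 1024);
        Console.WriteLine(string.Join(", ", totals)); // every reader sees 102400 bytes
    }
}
```

Every reader ends up with the same total because no reader consumes data on behalf of another; the shared list is only ever appended to.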

Let’s first walk through the MemoryStreamMultiplexer code.

C#
public class MemoryStreamMultiplexer : IDisposable
{
  private ManualResetEvent[] _dataReadyEvents = new ManualResetEvent[255];
  private ManualResetEvent[] _finishedEvents = new ManualResetEvent[255];
  private int _readerCount = 0;

  private bool _finished;
  private int _Length;
  private List<byte[]> _Buffer = new List<byte[]>();

  public int Length { get { return _Length; } }

  public void Write(byte[] data, int pos, int length)
  {
    byte[] newBuf = new byte[length];
    Buffer.BlockCopy(data, pos, newBuf, 0, length);
    lock (_Buffer)
    {
      _Buffer.Add(newBuf);
      _Length += length;
    }
    Set();
  }

  private void Set()
  {
    for (int i = 0; i < _readerCount; i++)
      _dataReadyEvents[i].Set();
  }

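  public void Finish()
  {
    // Take the same lock as GetReader() and set the flag before signalling,
    // so a reader registered concurrently is either signalled here or is
    // created with its events already set.
    lock (_dataReadyEvents)
    {
      _finished = true;
      for (int i = 0; i < _readerCount; i++)
        _finishedEvents[i].Set();
    }
  }

  public MemoryStreamReader GetReader()
  {
    ManualResetEvent dataReady, finished;
    lock (_dataReadyEvents)
    {
      // The event arrays are fixed-size, so at most 255 readers are supported.
      if (_readerCount >= _dataReadyEvents.Length)
        throw new InvalidOperationException("Too many readers.");

      dataReady = new ManualResetEvent(_finished);
      finished = new ManualResetEvent(_finished);
      _dataReadyEvents[_readerCount] = dataReady;
      _finishedEvents[_readerCount] = finished;
      _readerCount++;
    }
    return new MemoryStreamReader(_Buffer, dataReady, finished);
  }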

  private bool disposed = false;
  public void Dispose()
  {
    if (!disposed)
    {
      Finish();

      for (int i = 0; i < _readerCount; i++)
      {
        _dataReadyEvents[i].Dispose();
        _finishedEvents[i].Dispose();
      }
      _readerCount = 0;
      
      disposed = true;
    }
  }
} 

It maintains two arrays of ManualResetEvent that are used to signal the readers. Each reader gets two ManualResetEvent instances passed to it. One is signalled whenever a Write() happens, so that it unblocks any Read call pending on the reader threads and lets them process the newly available content. The other signals the readers that writing has finished, so that they stop expecting more content from the buffer.
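To make the two-event idea concrete, here is a small sketch of waiting on both handles at once with WaitHandle.WaitAny. WakeReason, WaitAnySketch and WaitForSignal are hypothetical names introduced for illustration; only the WaitAny call itself mirrors what the reader does.

```csharp
using System;
using System.Threading;

// Hypothetical helper types, introduced only for this sketch.
public enum WakeReason { DataReady, Finished, TimedOut }

public static class WaitAnySketch
{
    // Blocks until either handle is signalled or the timeout elapses.
    public static WakeReason WaitForSignal(
        ManualResetEvent dataReady, ManualResetEvent finished, TimeSpan timeout)
    {
        WaitHandle[] handles = { dataReady, finished };
        int index = WaitHandle.WaitAny(handles, timeout);

        if (index == WaitHandle.WaitTimeout) return WakeReason.TimedOut;
        // WaitAny returns the index of the signalled handle; when several are
        // signalled it returns the smallest index, so data is reported first.
        return index == 0 ? WakeReason.DataReady : WakeReason.Finished;
    }

    public static void Main()
    {
        using (var dataReady = new ManualResetEvent(false))
        using (var finished = new ManualResetEvent(false))
        {
            var shortWait = TimeSpan.FromMilliseconds(50);
            Console.WriteLine(WaitForSignal(dataReady, finished, shortWait)); // TimedOut
            finished.Set();
            Console.WriteLine(WaitForSignal(dataReady, finished, shortWait)); // Finished
            dataReady.Set();
            Console.WriteLine(WaitForSignal(dataReady, finished, shortWait)); // DataReady
        }
    }
}
```

Putting the data-ready handle first means that a reader woken with both events set still drains the buffer before it treats the stream as finished.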

Next is the MemoryStreamReader where most of the complicated code lies. 

C#
public class MemoryStreamReader : Stream, IDisposable
{
  private int _position;
  private int _bufferIndex;
  private int _bufferPos;
  private List<byte[]> _bufferList;

  private ManualResetEvent[] _waitHandles;
  private ManualResetEvent _dataReady;
  private ManualResetEvent _finished;

  public MemoryStreamReader(List<byte[]> bufferList,
    ManualResetEvent dataReady,
    ManualResetEvent finished)
  {
    _waitHandles = new ManualResetEvent[] { dataReady, finished };
    _bufferList = bufferList;
    _dataReady = dataReady;
    _finished = finished;
    _bufferPos = 0;
    _bufferIndex = 0;
    _position = 0;
  }
  
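  public override int Read(byte[] buffer, int offset, int count)
  {
    while (true)
    {
      // Reset before checking the list: a Write() that lands after the check
      // sets the event again, so the wake-up cannot be lost.
      _dataReady.Reset();
      if (_bufferIndex < _bufferList.Count)
        return ReadInternal(buffer, offset, count);

      // No data buffered. If the writer has finished, nothing more will arrive.
      if (_finished.WaitOne(0))
        return _bufferIndex < _bufferList.Count ? ReadInternal(buffer, offset, count) : 0;

      // Wait for either the data ready event or the finished event.
      int index = WaitHandle.WaitAny(_waitHandles, TimeSpan.FromSeconds(30), false);
      if (index == WaitHandle.WaitTimeout)
        return 0;   // Nothing arrived within 30 seconds. Give up.
      // An event fired. Loop and re-check, because a late signal for data
      // that was already consumed must not be mistaken for end of stream.
    }
  }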

  private int ReadInternal(byte[] buffer, int offset, int count)
  {
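    byte[] currentBuffer;
    // List<T> is not thread-safe. Take the same lock as Write() so the
    // element is read only after a concurrent Add has completed.
    lock (_bufferList)
    {
      currentBuffer = _bufferList[_bufferIndex];
    }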

    if (_bufferPos + count <= currentBuffer.Length)
    {
      // the current buffer holds the same or more bytes than what is asked for
      // So, give what was asked.
      Buffer.BlockCopy(currentBuffer, _bufferPos, buffer, offset, count);

      _bufferPos += count;
      _position += count;
      return count;
    }
    else
    {
      // current buffer does not have the necessary bytes. deliver whatever is available.
      if (_bufferPos < currentBuffer.Length)
      {
        int remainingBytes = currentBuffer.Length - _bufferPos;
        Buffer.BlockCopy(currentBuffer, _bufferPos, buffer, offset, remainingBytes);

        _position += remainingBytes;
        _bufferIndex++;
        _bufferPos = 0;

        // Try to read from the next buffer in the list and deliver
        // the undelivered bytes. The Read call might block and wait for 
        // remaining bytes to appear. 
        return remainingBytes + 
          this.Read(buffer, offset + remainingBytes, count - remainingBytes);
      }
      else
      {
        // All bytes from the current buffer have already been delivered. Try the next buffer.
        _bufferIndex++;
        _bufferPos = 0;

        // There may not be next buffer and thus we will have to wait.
        return this.Read(buffer, offset, count);          
      }
    }
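  }

  // The remaining abstract Stream members (CanRead, CanSeek, CanWrite, Length,
  // Position, Flush, Seek, SetLength, Write) are omitted here.
}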

The code is heavily documented, so I am not going to repeat the description here.
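For readers who want the chunk-walking logic in isolation, the following sketch keeps the same two cursors (which chunk, and the offset inside it) but uses a loop instead of recursion. It is a simplification under stated assumptions: ChunkCursor and ReadAvailable are hypothetical names, the code is single-threaded, and it never blocks; it returns only what is already buffered.

```csharp
using System;
using System.Collections.Generic;

// Hypothetical sketch of the cursor logic only: no locking, no waiting.
public sealed class ChunkCursor
{
    private readonly IList<byte[]> _chunks;
    private int _chunkIndex;   // which chunk we are in
    private int _chunkPos;     // offset inside that chunk

    public ChunkCursor(IList<byte[]> chunks) { _chunks = chunks; }

    // Copies up to count bytes that are already buffered; never blocks.
    public int ReadAvailable(byte[] dest, int offset, int count)
    {
        int copied = 0;
        while (copied < count && _chunkIndex < _chunks.Count)
        {
            byte[] current = _chunks[_chunkIndex];
            int n = Math.Min(count - copied, current.Length - _chunkPos);
            Buffer.BlockCopy(current, _chunkPos, dest, offset + copied, n);
            copied += n;
            _chunkPos += n;
            if (_chunkPos == current.Length)   // chunk exhausted: move to the next one
            {
                _chunkIndex++;
                _chunkPos = 0;
            }
        }
        return copied;
    }
}

public static class ChunkCursorDemo
{
    public static void Main()
    {
        var chunks = new List<byte[]> { new byte[] { 1, 2, 3 }, new byte[] { 4, 5 } };
        var cursor = new ChunkCursor(chunks);
        var dest = new byte[4];

        int first = cursor.ReadAvailable(dest, 0, 4);   // spans both chunks
        Console.WriteLine(first + ": " + string.Join(",", dest));   // 4: 1,2,3,4
        int second = cursor.ReadAvailable(dest, 0, 4);  // only one byte is left
        Console.WriteLine(second + ": " + dest[0]);     // 1: 5
    }
}
```

The loop form avoids growing the call stack when one Read spans many small chunks, at the cost of not showing where the blocking wait would sit.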

How to use the MemoryStreamMultiplexer 

Here’s some test code that shows how you can write to the buffer from one thread and read it from multiple threads.

C#
static void Main(string[] args)
{
  StringBuilder builder = new StringBuilder();
  
  byte[] buffer = new byte[100 * 1024];
  
  for (int i = 0; i < buffer.Length; i ++)
    buffer[i] = (byte)(i % 256);

  using (MemoryStreamMultiplexer m = new MemoryStreamMultiplexer())
  {
    Thread writer = new Thread(() =>
      {
        builder.AppendLine("Start writer...");
        Random r = new Random((int)DateTime.Now.Ticks);
        int bytesRemaining = buffer.Length;
        
        int pos = 0;
        while (bytesRemaining > 0)
        {
          // Randomly write bytes to simulate flaky network streaming
          int bytesToWrite = r.Next(0xFF, 0x1000);
          int count = bytesRemaining < bytesToWrite ? bytesRemaining : bytesToWrite;
          m.Write(buffer, pos, count);
          pos += count;
          bytesRemaining -= count;

          // Log the number of bytes actually written, which is smaller
          // than bytesToWrite on the final chunk.
          builder.AppendLine("Write: " + count);
          Thread.Sleep(2);
        }
        
        m.Finish();
        
        builder.AppendLine("Finished writing: " + buffer.Length);
      });
    writer.Start();

    Thread[] readers = new Thread[10];
    for (int j = 0; j < readers.Length; j++)
    {
      readers[j] = new Thread(() =>
        {
          int threadId = Thread.CurrentThread.ManagedThreadId;
          builder.AppendLine("Start reader:" + threadId);

          int count;
          int bytesRead = 0;
          int bufferPos = 0;
          byte[] chunk = new byte[0x1000];

          using (MemoryStreamReader reader = m.GetReader())
          {
            while ((count = reader.Read(chunk, 0, 0x1000)) > 0)
            {
              builder.AppendLine("Read " + threadId + ":" + count);
              bytesRead += count;

              // Verify we have got the right bytes in the chunk 
              for (int c = 0; c < count; c++)
                Debug.Assert(buffer[bufferPos++] == chunk[c]);
            }
          }
          builder.AppendLine("End reader: " + threadId + ":" + bytesRead);
        });

      readers[j].Start();
    }

    writer.Join();
    for (int j = 0; j < readers.Length; j++)
      readers[j].Join();

    // Test a synchronous read when the full buffer is fully populated        
    using (MemoryStreamReader reader = m.GetReader())
    {
      int readCount;
      int totalReadCount = 0;
      byte[] readBuffer = new byte[0x1000];
      while ((readCount = reader.Read(readBuffer, 0, 0x1000)) > 0)
      {
        builder.AppendLine("Sync Read: " + readCount);
        totalReadCount += readCount;
      }
      builder.AppendLine("Total Sync Read: " + totalReadCount);
    }
  }

  builder.AppendLine("========= Finished =========");

  Console.WriteLine(builder.ToString());
  Console.ReadLine();
} 

I have tested it thoroughly on a quad-core PC to make sure that parallel reads really happen and that no thread interferes with another. I also kept the number of locks held to a minimum. You can see parallel writes and reads happening in the console output:

Start writer...
Start reader: 10
Start reader: 11
Write: 2704
Start reader: 12
Write: 937
Start reader: 13
Start reader: 14
Read 10: 4096
Read 11: 4096
Read 14: 4096
Read 13: 4096
Read 12: 4096
Write: 2534
Start reader: 15
Read 15: 4096
Write: 1206
Start reader: 16
Read 16: 4096
Read 11: 4096
Read 13: 4096
Read 16: 4096
Read 14: 4096
Read 12: 4096
Read 10: 4096
Start reader: 17
Read 17: 4096
Read 17: 4096
Read 15: 4096
Start reader: 19
Read 19: 4096
Read 19: 4096
Start reader: 18
Read 18: 4096
Read 18: 4096
Write: 1474 

The console output above shows reads and writes happening concurrently.
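One caveat about the test program: it appends to a single StringBuilder from every thread, and StringBuilder instance members are not guaranteed to be thread-safe. A possible way to keep the log intact is a small locked wrapper like the sketch below. SynchronizedLog and SynchronizedLogDemo are hypothetical names, not part of the download.

```csharp
using System;
using System.Text;
using System.Threading;

// Hypothetical wrapper: serializes appends to one StringBuilder.
public sealed class SynchronizedLog
{
    private readonly StringBuilder _builder = new StringBuilder();
    private readonly object _gate = new object();
    private int _lineCount;

    public void AppendLine(string line)
    {
        lock (_gate)               // one appender at a time
        {
            _builder.AppendLine(line);
            _lineCount++;
        }
    }

    public int LineCount { get { lock (_gate) return _lineCount; } }

    public override string ToString() { lock (_gate) return _builder.ToString(); }
}

public static class SynchronizedLogDemo
{
    // Appends linesPerThread lines from each of threadCount threads.
    public static SynchronizedLog Run(int threadCount, int linesPerThread)
    {
        var log = new SynchronizedLog();
        var threads = new Thread[threadCount];
        for (int t = 0; t < threadCount; t++)
        {
            int id = t;            // per-iteration copy for the closure
            threads[t] = new Thread(() =>
            {
                for (int i = 0; i < linesPerThread; i++)
                    log.AppendLine("Read " + id + ":" + i);
            });
            threads[t].Start();
        }
        foreach (var thread in threads) thread.Join();
        return log;
    }

    public static void Main()
    {
        Console.WriteLine(Run(4, 1000).LineCount);   // 4000
    }
}
```

The lock is held only for the append itself, so it adds little contention compared with the Thread.Sleep and buffer copies in the test.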

Performance testing the library 

Here’s the Visual Studio 2010 Profiler report. It shows that GetReader is the most expensive call and that no other function comes anywhere close to it. This is a good indication that the implementation is efficient enough.

 Image 2

Even in the GetReader function, the most expensive line of code is creating the MemoryStreamReader:

Image 3

When you do the Concurrency analysis to see which thread is doing what, it shows that the reader threads read available content as soon as the writer thread writes to the shared buffer. There’s no delay in reader threads getting the signal and reading the recently added content.

Image 4

The green bars on the threads show that as soon as the writer thread (Thread 7784) signals (the yellow bars), the reader threads execute and pick up the data. There’s just one thread, 8524, which seems to struggle to pick up the signal for some reason. The rest of the threads pick up the signal and read the recently added bytes within 0.02 ms on average.

Enjoy the code.  


License

This article, along with any associated source code and files, is licensed under The Code Project Open License (CPOL)


Written By
Architect BT, UK (ex British Telecom)
United Kingdom
