Memory Stream Multiplexer: write and read from many threads simultaneously






Introduction
Here’s an implementation of a MemoryStream-like buffer manager where one thread can write and many threads can read simultaneously. Each reading thread gets its own reader and can read from the shared stream on its own, without blocking the write operation or other parallel read operations. It supports a blocking Read call, so reader threads can call Read(…) and wait until some data is available, exactly the way you would expect a Stream to behave. You can use this to read content from the network or a file in one thread and have it read by one or more threads simultaneously. Readers do not block writing, so reads and writes happen concurrently.
This is handy for building an HTTP proxy where you are downloading a file and multiple clients ask for the same file at the same time. You can download it in one thread and let one or more client threads read from the same buffer at the same time. You can also use it to serve the same file on disk to multiple clients at once, or to implement a server-side cache where the same buffer is read by multiple clients at the same time.
How does it work
First you create a MemoryStreamMultiplexer object that holds the shared buffer. It has a Write(…) method that appends a byte[] to the shared buffer. Then you call GetReader() to get a MemoryStreamReader that can read the content of the shared buffer. You can call GetReader() from a different thread, so that you can read and write simultaneously. Whenever you call Write(…), it signals all the MemoryStreamReader instances that content is now available to read. The readers that were waiting on a Read(…) call get the signal and read from the shared buffer.
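The flow described above (one writer appends chunks, each reader keeps its own position and blocks until more data arrives) can be sketched in a small self-contained program. Note that this sketch deliberately swaps the per-reader ManualResetEvent signaling used in this article for Monitor.Wait and Monitor.PulseAll, only to stay short. The names MiniBroadcastBuffer, Append, Complete, TryGetChunk, and MiniBroadcastDemo are hypothetical and are not part of the library.

```csharp
using System;
using System.Collections.Generic;
using System.Threading;

// Hypothetical, simplified stand-in for the multiplexer: one writer appends
// chunks, and any number of readers walk the same list at their own pace.
public class MiniBroadcastBuffer
{
    private readonly object _sync = new object();
    private readonly List<byte[]> _chunks = new List<byte[]>();
    private bool _completed;

    // Writer side: copy the caller's bytes, publish the chunk, wake all waiting readers.
    public void Append(byte[] data, int offset, int length)
    {
        byte[] copy = new byte[length];
        Buffer.BlockCopy(data, offset, copy, 0, length);
        lock (_sync)
        {
            _chunks.Add(copy);
            Monitor.PulseAll(_sync);
        }
    }

    // Writer side: announce that no more chunks will arrive.
    public void Complete()
    {
        lock (_sync)
        {
            _completed = true;
            Monitor.PulseAll(_sync);
        }
    }

    // Reader side: blocks until chunk number 'index' exists or the writer has
    // completed. Returns false when there is nothing more to read.
    public bool TryGetChunk(int index, out byte[] chunk)
    {
        lock (_sync)
        {
            while (index >= _chunks.Count && !_completed)
                Monitor.Wait(_sync);
            if (index < _chunks.Count)
            {
                chunk = _chunks[index];
                return true;
            }
            chunk = null;
            return false;
        }
    }
}

public static class MiniBroadcastDemo
{
    // Starts 'readerCount' reader threads, writes 'chunkCount' chunks of
    // 'chunkSize' bytes, and returns the number of bytes each reader saw.
    public static int[] Run(int readerCount, int chunkCount, int chunkSize)
    {
        var shared = new MiniBroadcastBuffer();
        int[] totals = new int[readerCount];
        Thread[] readers = new Thread[readerCount];
        for (int r = 0; r < readerCount; r++)
        {
            int slot = r; // per-thread copy of the loop variable
            readers[r] = new Thread(() =>
            {
                byte[] chunk;
                for (int i = 0; shared.TryGetChunk(i, out chunk); i++)
                    totals[slot] += chunk.Length;
            });
            readers[r].Start();
        }
        byte[] payload = new byte[chunkSize];
        for (int c = 0; c < chunkCount; c++)
            shared.Append(payload, 0, payload.Length);
        shared.Complete();
        foreach (Thread t in readers) t.Join();
        return totals;
    }
}
```

Calling MiniBroadcastDemo.Run(3, 5, 10) starts three readers that each see all five 10-byte chunks, so every entry of the returned array is 50. Every reader starts from the first chunk regardless of when it joins, which is the same behavior you get from GetReader().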
Let’s first walk through the MemoryStreamMultiplexer code.
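using System;
using System.Collections.Generic;
using System.Threading;

public class MemoryStreamMultiplexer : IDisposable
{
    // One pair of events per reader; at most 255 readers are supported.
    private ManualResetEvent[] _dataReadyEvents = new ManualResetEvent[255];
    private ManualResetEvent[] _finishedEvents = new ManualResetEvent[255];
    private int _readerCount = 0;
    private bool _finished;
    private int _Length;
    private List<byte[]> _Buffer = new List<byte[]>();

    public int Length { get { lock (_Buffer) { return _Length; } } }

    public void Write(byte[] data, int pos, int length)
    {
        // Copy the caller's bytes so that the caller can reuse its array.
        byte[] newBuf = new byte[length];
        Buffer.BlockCopy(data, pos, newBuf, 0, length);
        lock (_Buffer)
        {
            _Buffer.Add(newBuf);
            _Length += length;
        }
        Set();
    }

    // Wake every reader that is blocked waiting for data.
    private void Set()
    {
        lock (_dataReadyEvents)
        {
            for (int i = 0; i < _readerCount; i++)
                _dataReadyEvents[i].Set();
        }
    }

    // Tell all readers that no more data will be written.
    public void Finish()
    {
        lock (_dataReadyEvents)
        {
            // Set the flag first, so that readers registered from now on
            // start with both events already signaled.
            _finished = true;
            for (int i = 0; i < _readerCount; i++)
                _finishedEvents[i].Set();
        }
    }

    public MemoryStreamReader GetReader()
    {
        ManualResetEvent dataReady;
        ManualResetEvent finished;
        lock (_dataReadyEvents)
        {
            if (_readerCount == _dataReadyEvents.Length)
                throw new InvalidOperationException("Too many readers.");
            // Read _finished inside the lock so that a concurrent Finish() is not missed.
            dataReady = new ManualResetEvent(_finished);
            finished = new ManualResetEvent(_finished);
            _dataReadyEvents[_readerCount] = dataReady;
            _finishedEvents[_readerCount] = finished;
            _readerCount++;
        }
        return new MemoryStreamReader(_Buffer, dataReady, finished);
    }

    private bool disposed = false;

    public void Dispose()
    {
        if (!disposed)
        {
            // Release any reader that is still waiting before disposing the events.
            Finish();
            lock (_dataReadyEvents)
            {
                for (int i = 0; i < _readerCount; i++)
                {
                    _dataReadyEvents[i].Dispose();
                    _finishedEvents[i].Dispose();
                }
                _readerCount = 0;
            }
            disposed = true;
        }
    }
}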
It maintains two arrays of ManualResetEvent objects that are used to signal the readers. Each reader gets two ManualResetEvent instances passed to it. One is signaled whenever a Write() happens, so that it can unblock the Read call made by a reader thread and let it process the newly available content. The other signals the reader that writing has finished, so that it can stop expecting more content from the buffer. Because the arrays are fixed at 255 entries, a multiplexer can serve at most 255 readers.
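The two-event wait can be shown in isolation. The following is a minimal sketch, not the code from this article: the names TwoEventWaitSketch, WakeReason, WaitForDataOrFinish, and Demo are hypothetical. It relies only on the documented behavior of WaitHandle.WaitAny, which returns the array index of the signaled handle, or WaitHandle.WaitTimeout if nothing was signaled within the timeout.

```csharp
using System;
using System.Threading;

public static class TwoEventWaitSketch
{
    public enum WakeReason { DataReady, Finished, TimedOut }

    // Blocks until one of the two events is set or the timeout elapses.
    // If both events are set, WaitAny reports the lower index (DataReady).
    public static WakeReason WaitForDataOrFinish(
        ManualResetEvent dataReady, ManualResetEvent finished, TimeSpan timeout)
    {
        WaitHandle[] handles = { dataReady, finished };
        int index = WaitHandle.WaitAny(handles, timeout);
        if (index == WaitHandle.WaitTimeout)
            return WakeReason.TimedOut;
        return index == 0 ? WakeReason.DataReady : WakeReason.Finished;
    }

    // A writer thread sets 'dataReady' after a short delay; the calling
    // thread stays blocked in WaitForDataOrFinish until that happens.
    public static WakeReason Demo()
    {
        using (var dataReady = new ManualResetEvent(false))
        using (var finished = new ManualResetEvent(false))
        {
            var writer = new Thread(() =>
            {
                Thread.Sleep(50);   // simulate the writer producing data
                dataReady.Set();    // wakes every thread waiting on this event
            });
            writer.Start();
            WakeReason reason = WaitForDataOrFinish(
                dataReady, finished, TimeSpan.FromSeconds(5));
            writer.Join();
            return reason;
        }
    }
}
```

A ManualResetEvent stays signaled until someone calls Reset(), which is why a reader has to reset its data-ready event before it waits again. The finished event is never reset, so once writing is over every later wait returns immediately.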
Next is the MemoryStreamReader, where most of the complicated code lies.
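using System;
using System.Collections.Generic;
using System.IO;
using System.Threading;

public class MemoryStreamReader : Stream, IDisposable
{
    private int _position;      // total bytes delivered so far
    private int _bufferIndex;   // index of the current chunk in the shared list
    private int _bufferPos;     // read offset inside the current chunk
    private List<byte[]> _bufferList;
    private ManualResetEvent[] _waitHandles;
    private ManualResetEvent _dataReady;
    private ManualResetEvent _finished;

    public MemoryStreamReader(List<byte[]> bufferList,
        ManualResetEvent dataReady,
        ManualResetEvent finished)
    {
        _waitHandles = new ManualResetEvent[] { dataReady, finished };
        _bufferList = bufferList;
        _dataReady = dataReady;
        _finished = finished;
        _bufferPos = 0;
        _bufferIndex = 0;
        _position = 0;
    }

    // The writer appends to the shared list under a lock on that list, so
    // take the same lock before looking at it.
    private bool HasUnreadBuffer()
    {
        lock (_bufferList)
        {
            return _bufferIndex < _bufferList.Count;
        }
    }

    public override int Read(byte[] buffer, int offset, int count)
    {
        if (HasUnreadBuffer())
            return ReadInternal(buffer, offset, count);

        // Reset the event before checking again, so that a Write() that lands
        // between the check and the wait is not missed.
        _dataReady.Reset();
        if (!HasUnreadBuffer())
        {
            // Wait for either the data ready event or the finished event,
            // for at most 30 seconds.
            WaitHandle.WaitAny(_waitHandles, TimeSpan.FromSeconds(30), false);
        }

        // One of the events fired or the wait timed out. See if there's more data to read.
        if (HasUnreadBuffer())
            return ReadInternal(buffer, offset, count);
        return 0; // Finished, or nothing arrived within the timeout.
    }

    private int ReadInternal(byte[] buffer, int offset, int count)
    {
        byte[] currentBuffer;
        lock (_bufferList)
        {
            currentBuffer = _bufferList[_bufferIndex];
        }

        if (_bufferPos + count <= currentBuffer.Length)
        {
            // The current buffer holds the same or more bytes than what is asked for.
            // So, give what was asked.
            Buffer.BlockCopy(currentBuffer, _bufferPos, buffer, offset, count);
            _bufferPos += count;
            _position += count;
            return count;
        }
        else
        {
            // The current buffer does not have the necessary bytes. Deliver whatever is available.
            if (_bufferPos < currentBuffer.Length)
            {
                int remainingBytes = currentBuffer.Length - _bufferPos;
                Buffer.BlockCopy(currentBuffer, _bufferPos, buffer, offset, remainingBytes);
                _position += remainingBytes;
                _bufferIndex++;
                _bufferPos = 0;
                // Try to read from the next buffer in the list and deliver
                // the undelivered bytes. The Read call might block and wait for
                // remaining bytes to appear.
                return remainingBytes +
                    this.Read(buffer, offset + remainingBytes, count - remainingBytes);
            }
            else
            {
                // All bytes from the current buffer have already been delivered. Try the next buffer.
                _bufferIndex++;
                _bufferPos = 0;
                // There may not be a next buffer, and thus we will have to wait.
                return this.Read(buffer, offset, count);
            }
        }
    }

    // Minimal read-only, forward-only implementations of the remaining
    // abstract Stream members (an assumption; the listing in this article
    // does not show them).
    public override bool CanRead { get { return true; } }
    public override bool CanSeek { get { return false; } }
    public override bool CanWrite { get { return false; } }
    public override long Length { get { throw new NotSupportedException(); } }
    public override long Position
    {
        get { return _position; }
        set { throw new NotSupportedException(); }
    }
    public override void Flush() { }
    public override long Seek(long offset, SeekOrigin origin) { throw new NotSupportedException(); }
    public override void SetLength(long value) { throw new NotSupportedException(); }
    public override void Write(byte[] buffer, int offset, int count) { throw new NotSupportedException(); }
}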
The code is heavily commented, so I am not going to repeat the description here.
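The position bookkeeping in ReadInternal (a chunk index plus an offset inside that chunk) may be easier to follow without the threading. Here is a minimal single-threaded sketch of the same idea, written as a loop instead of recursion. ChunkCursorSketch and CopyAcrossChunks are hypothetical names, and unlike the real reader this helper never blocks: it simply returns fewer bytes when the chunks run out.

```csharp
using System;
using System.Collections.Generic;

public static class ChunkCursorSketch
{
    // Copies up to 'count' bytes out of 'chunks', starting at the cursor
    // (chunkIndex, chunkPos), and advances the cursor. Returns the number of
    // bytes copied, which is less than 'count' only when the chunks run out.
    public static int CopyAcrossChunks(
        IList<byte[]> chunks, ref int chunkIndex, ref int chunkPos,
        byte[] destination, int offset, int count)
    {
        int copied = 0;
        while (copied < count && chunkIndex < chunks.Count)
        {
            byte[] current = chunks[chunkIndex];
            int available = current.Length - chunkPos;
            if (available == 0)
            {
                // Current chunk fully consumed: move to the start of the next one.
                chunkIndex++;
                chunkPos = 0;
                continue;
            }
            int toCopy = Math.Min(available, count - copied);
            Buffer.BlockCopy(current, chunkPos, destination, offset + copied, toCopy);
            chunkPos += toCopy;
            copied += toCopy;
        }
        return copied;
    }
}
```

With chunks of 3, 2, and 4 bytes, a first call asking for 5 bytes copies all of the first chunk and all of the second, and a second call asking for 5 more returns only the 4 bytes that are left. In MemoryStreamReader, the point where this loop gives up is where Read waits for the writer instead.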
How to use the MemoryStreamMultiplexer
Here’s some test code that shows how you can write to the buffer from one thread and read it from multiple threads.
using System;
using System.Diagnostics;
using System.Text;
using System.Threading;

class Program
{
    static void Main(string[] args)
    {
        StringBuilder builder = new StringBuilder();
        // StringBuilder is not thread-safe, so all threads log through this helper.
        Action<string> log = line => { lock (builder) builder.AppendLine(line); };

        byte[] buffer = new byte[100 * 1024];
        for (int i = 0; i < buffer.Length; i++)
            buffer[i] = (byte)(i % 256);

        using (MemoryStreamMultiplexer m = new MemoryStreamMultiplexer())
        {
            Thread writer = new Thread(() =>
            {
                log("Start writer...");
                Random r = new Random((int)DateTime.Now.Ticks);
                int bytesRemaining = buffer.Length;
                int pos = 0;
                while (bytesRemaining > 0)
                {
                    // Randomly write bytes to simulate flaky network streaming
                    int bytesToWrite = r.Next(0xFF, 0x1000);
                    int count = bytesRemaining < bytesToWrite ? bytesRemaining : bytesToWrite;
                    m.Write(buffer, pos, count);
                    pos += count;
                    bytesRemaining -= count;
                    log("Write: " + count);
                    Thread.Sleep(2);
                }
                m.Finish();
                log("Finished writing: " + buffer.Length);
            });
            writer.Start();

            Thread[] readers = new Thread[10];
            for (int j = 0; j < readers.Length; j++)
            {
                readers[j] = new Thread(() =>
                {
                    int threadId = Thread.CurrentThread.ManagedThreadId;
                    log("Start reader: " + threadId);
                    int count;
                    int bytesRead = 0;
                    int bufferPos = 0;
                    byte[] chunk = new byte[0x1000];
                    using (MemoryStreamReader reader = m.GetReader())
                    {
                        while ((count = reader.Read(chunk, 0, 0x1000)) > 0)
                        {
                            log("Read " + threadId + ": " + count);
                            bytesRead += count;
                            // Verify we have got the right bytes in the chunk
                            for (int c = 0; c < count; c++)
                                Debug.Assert(buffer[bufferPos++] == chunk[c]);
                        }
                    }
                    log("End reader: " + threadId + ": " + bytesRead);
                });
                readers[j].Start();
            }

            writer.Join();
            for (int j = 0; j < readers.Length; j++)
                readers[j].Join();

            // Test a synchronous read when the buffer is fully populated
            using (MemoryStreamReader reader = m.GetReader())
            {
                int readCount;
                int totalReadCount = 0;
                byte[] readBuffer = new byte[0x1000];
                while ((readCount = reader.Read(readBuffer, 0, 0x1000)) > 0)
                {
                    log("Sync Read: " + readCount);
                    totalReadCount += readCount;
                }
                log("Total Sync Read: " + totalReadCount);
            }
        }

        builder.AppendLine("========= Finished =========");
        Console.WriteLine(builder.ToString());
        Console.ReadLine();
    }
}
I have tested it thoroughly on a quad-core PC to make sure that parallel reads really happen and that no threads interfere with each other. I also made sure that the number of locks held is minimal. You can see parallel Write and Read calls happening in the console output:
Start writer...
Start reader: 10
Start reader: 11
Write: 2704
Start reader: 12
Write: 937
Start reader: 13
Start reader: 14
Read 10: 4096
Read 11: 4096
Read 14: 4096
Read 13: 4096
Read 12: 4096
Write: 2534
Start reader: 15
Read 15: 4096
Write: 1206
Start reader: 16
Read 16: 4096
Read 11: 4096
Read 13: 4096
Read 16: 4096
Read 14: 4096
Read 12: 4096
Read 10: 4096
Start reader: 17
Read 17: 4096
Read 17: 4096
Read 15: 4096
Start reader: 19
Read 19: 4096
Read 19: 4096
Start reader: 18
Read 18: 4096
Read 18: 4096
Write: 1474
The console output above shows that reads and writes happen concurrently.
Performance testing the library
Here’s the Visual Studio 2010 Profiler report. It shows that the most expensive function is GetReader, and no other function comes anywhere close to it. This is a good indication that the implementation is good enough. Even within GetReader, the most expensive line of code is the one that creates the MemoryStreamReader.
When you run the Concurrency analysis to see which thread is doing what, it shows that the reader threads read available content as soon as the writer thread writes to the shared buffer. There’s no noticeable delay in reader threads getting the signal and reading the recently added content.
The green bars on the threads show that as soon as the writer thread (Thread 7784) signals (the yellow bars), the reader threads execute and pick up the data. Only one thread, 8524, seems to struggle to pick up the signal for some reason. The rest of the threads pick up the signal and read the recently added bytes within 0.02 ms on average.
Enjoy the code.