Download MemoryStreamMultiplexer.zip
Introduction
Here’s an implementation of a MemoryStream-like buffer manager where one thread can write and many threads can read simultaneously. Each reading thread gets its own reader and can read from the shared stream on its own, without blocking the write operation or other parallel read operations. It supports a blocking Read call, so reader threads can call Read(…) and wait until some data is available, exactly the way you would expect a Stream to behave. You can use this to read content from the network or a file in one thread and have it read by one or more threads simultaneously. Readers do not block writing, so reading and writing happen concurrently.
This is handy for building an HTTP proxy where you are downloading a certain file and multiple clients ask for the same file at the same time: you can download it in one thread and let one or more client threads read from the same buffer at the same time. You can also use it to serve the same file on disk to multiple clients at once, or to implement a server-side cache where the same buffer is read by multiple clients concurrently.
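Because MemoryStreamReader is a Stream, code that consumes it should rely only on the usual Stream.Read contract: a call may return fewer bytes than requested, and a return value of 0 means there is nothing more to read. Here is a minimal sketch of the standard read loop. It uses an ordinary MemoryStream as a stand-in for the shared buffer; ReadLoopDemo and ReadAll are illustrative names, not part of the download.

```csharp
using System;
using System.Collections.Generic;
using System.IO;

public static class ReadLoopDemo
{
    // Reads `stream` to the end, copying everything into `sink`,
    // and records how many bytes each Read call returned.
    public static List<int> ReadAll(Stream stream, int chunkSize, MemoryStream sink)
    {
        var sizes = new List<int>();
        byte[] chunk = new byte[chunkSize];
        int n;
        // Read may return fewer than chunkSize bytes; only 0 means "no more data".
        while ((n = stream.Read(chunk, 0, chunk.Length)) > 0)
        {
            sizes.Add(n);
            sink.Write(chunk, 0, n);
        }
        return sizes;
    }
}
```

Called on a 10-byte MemoryStream with a chunk size of 4, the loop sees reads of 4, 4 and 2 bytes before Read returns 0. The same loop works unchanged against a reader whose Read blocks until the writer supplies more data.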
How does it work
First you create a MemoryStreamMultiplexer object that holds the shared buffer. It has a Write(…) method that appends a byte[] to the shared buffer. Then you call GetReader() to create a MemoryStreamReader that reads content from the shared buffer. You can call GetReader() from a different thread, so reading and writing happen simultaneously. Whenever you call Write(…), it signals all the MemoryStreamReader instances that new content is available to read. Readers that were waiting in a Read(…) call receive the signal and read from the shared buffer.
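The signaling idea can be reduced to a few lines. Here is a minimal sketch that assumes one ManualResetEvent per reader and a lock around a shared List<byte[]>; BroadcastDemo and Run are hypothetical names, not the library's API. The writer appends a chunk and then sets every reader's event. Each reader waits only on its own event. A ManualResetEvent stays signaled until it is reset, so a reader that starts waiting late still sees the data.

```csharp
using System;
using System.Collections.Generic;
using System.Threading;

public static class BroadcastDemo
{
    // Starts `readerCount` threads that each wait on their own event, writes one
    // 3-byte chunk, signals every event, and returns how many bytes each reader saw.
    public static int[] Run(int readerCount)
    {
        var chunks = new List<byte[]>();
        var events = new ManualResetEvent[readerCount];
        var seen = new int[readerCount];
        var threads = new Thread[readerCount];

        for (int i = 0; i < readerCount; i++)
        {
            events[i] = new ManualResetEvent(false);
            int id = i; // copy the loop variable for the closure
            threads[i] = new Thread(() =>
            {
                events[id].WaitOne();      // block until the writer signals this reader
                lock (chunks)              // same lock the writer uses
                {
                    foreach (byte[] c in chunks)
                        seen[id] += c.Length;
                }
            });
            threads[i].Start();
        }

        lock (chunks)
        {
            chunks.Add(new byte[] { 1, 2, 3 }); // the "Write"
        }
        foreach (ManualResetEvent e in events)
            e.Set();                            // one Set per reader wakes them all

        foreach (Thread t in threads)
            t.Join();
        foreach (ManualResetEvent e in events)
            e.Dispose();
        return seen;
    }
}
```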
Let’s first walk through the MemoryStreamMultiplexer code.
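using System;
using System.Collections.Generic;
using System.Threading;

public class MemoryStreamMultiplexer : IDisposable
{
    // One "data ready" and one "finished" event per reader, added by GetReader().
    private readonly List<ManualResetEvent> _dataReadyEvents = new List<ManualResetEvent>();
    private readonly List<ManualResetEvent> _finishedEvents = new List<ManualResetEvent>();
    private bool _finished;
    private int _Length;
    // The shared chunks. The writer and every reader lock on this list before touching it.
    private readonly List<byte[]> _Buffer = new List<byte[]>();

    public int Length { get { lock (_Buffer) { return _Length; } } }

    // Appends a copy of data[pos .. pos + length) to the shared buffer and wakes the readers.
    public void Write(byte[] data, int pos, int length)
    {
        if (length == 0)
            return; // nothing to add, so no reason to wake anyone
        // Copy first, so the caller can reuse its buffer as soon as Write returns.
        byte[] newBuf = new byte[length];
        Buffer.BlockCopy(data, pos, newBuf, 0, length);
        lock (_Buffer)
        {
            _Buffer.Add(newBuf);
            _Length += length;
        }
        Set();
    }

    // Signals every reader that new content is available.
    private void Set()
    {
        lock (_dataReadyEvents)
        {
            foreach (ManualResetEvent e in _dataReadyEvents)
                e.Set();
        }
    }

    // Signals every reader that no more content will be written.
    public void Finish()
    {
        lock (_dataReadyEvents)
        {
            _finished = true;
            foreach (ManualResetEvent e in _finishedEvents)
                e.Set();
        }
    }

    // Creates a reader that starts at the beginning of the shared buffer.
    public MemoryStreamReader GetReader()
    {
        lock (_dataReadyEvents)
        {
            // A reader created after Finish() starts with both events already signaled.
            ManualResetEvent dataReady = new ManualResetEvent(_finished);
            ManualResetEvent finished = new ManualResetEvent(_finished);
            _dataReadyEvents.Add(dataReady);
            _finishedEvents.Add(finished);
            return new MemoryStreamReader(_Buffer, dataReady, finished);
        }
    }

    private bool disposed = false;

    public void Dispose()
    {
        if (disposed)
            return;
        Finish();
        lock (_dataReadyEvents)
        {
            for (int i = 0; i < _dataReadyEvents.Count; i++)
            {
                _dataReadyEvents[i].Dispose();
                _finishedEvents[i].Dispose();
            }
            _dataReadyEvents.Clear();
            _finishedEvents.Clear();
        }
        disposed = true;
    }
}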
It maintains a list of ManualResetEvent objects that are used to signal the readers. Each reader gets two ManualResetEvents. The first is signaled whenever a Write() happens; it unblocks the Read call made by the reader thread and lets it process the newly available content. The second signals the reader that writing has finished, so it can stop expecting more content from the buffer.
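A reader therefore waits on both events at once with WaitHandle.WaitAny. Its return value is the index of the handle that was signaled, or WaitHandle.WaitTimeout if neither was signaled in time. Here is a minimal sketch of such a wait. It assumes the caller supplies a hasData check, and WaitDemo.WaitForData is a hypothetical helper. It resets dataReady *before* re-checking for data, so a Set() that lands between the check and the Reset cannot be lost.

```csharp
using System;
using System.Threading;

public static class WaitDemo
{
    // Returns true when data is available; false when writing has finished with
    // nothing left to read, or when the timeout expires with no data.
    public static bool WaitForData(Func<bool> hasData,
                                   ManualResetEvent dataReady,
                                   ManualResetEvent finished,
                                   TimeSpan timeout)
    {
        WaitHandle[] handles = { dataReady, finished };
        while (true)
        {
            // Look at "finished" before the data, so data written just before Finish() is not skipped.
            bool done = finished.WaitOne(0);
            if (hasData()) return true;
            if (done) return false;

            dataReady.Reset();            // clear the old signal first...
            if (hasData()) return true;   // ...then re-check, so a Set() in between is not lost

            // 0 = dataReady fired, 1 = finished fired, WaitTimeout = neither in time.
            int index = WaitHandle.WaitAny(handles, timeout);
            if (index == WaitHandle.WaitTimeout) return hasData();
        }
    }
}
```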
Next is the MemoryStreamReader, where most of the complicated code lies.
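using System;
using System.Collections.Generic;
using System.IO;
using System.Threading;

public class MemoryStreamReader : Stream
{
    private int _position;                     // total bytes returned so far
    private int _bufferIndex;                  // chunk currently being read
    private int _bufferPos;                    // offset inside that chunk
    private readonly List<byte[]> _bufferList; // shared with the multiplexer
    private readonly WaitHandle[] _waitHandles;
    private readonly ManualResetEvent _dataReady;
    private readonly ManualResetEvent _finished;

    public MemoryStreamReader(List<byte[]> bufferList,
        ManualResetEvent dataReady,
        ManualResetEvent finished)
    {
        _waitHandles = new WaitHandle[] { dataReady, finished };
        _bufferList = bufferList;
        _dataReady = dataReady;
        _finished = finished;
        _bufferPos = 0;
        _bufferIndex = 0;
        _position = 0;
    }

    // Returns the bytes already written (up to count), blocking only while none are
    // available. Returns 0 once writing has finished and everything has been read,
    // or after 30 seconds without new data.
    public override int Read(byte[] buffer, int offset, int count)
    {
        if (count == 0)
            return 0;
        while (true)
        {
            // Check "finished" before the data: every Write() happens before Finish().
            bool isFinished = _finished.WaitOne(0);
            int read = ReadInternal(buffer, offset, count);
            if (read > 0 || isFinished)
                return read;

            // Clear the old signal, then look again, so a Write() in between is not lost.
            _dataReady.Reset();
            read = ReadInternal(buffer, offset, count);
            if (read > 0)
                return read;

            // Wait for new data or Finish(); after 30 seconds of silence, report end of stream.
            int index = WaitHandle.WaitAny(_waitHandles, TimeSpan.FromSeconds(30), false);
            if (index == WaitHandle.WaitTimeout)
                return ReadInternal(buffer, offset, count);
        }
    }

    // Copies from as many available chunks as needed, crossing chunk boundaries; never blocks.
    private int ReadInternal(byte[] buffer, int offset, int count)
    {
        int copied = 0;
        while (copied < count)
        {
            byte[] currentBuffer;
            lock (_bufferList) // List<T> is not safe to read while Write() adds to it
            {
                if (_bufferIndex >= _bufferList.Count)
                    break;
                currentBuffer = _bufferList[_bufferIndex];
            }
            // Chunks never change after Write(), so the copy can happen outside the lock.
            int n = Math.Min(currentBuffer.Length - _bufferPos, count - copied);
            Buffer.BlockCopy(currentBuffer, _bufferPos, buffer, offset + copied, n);
            _bufferPos += n;
            copied += n;
            if (_bufferPos == currentBuffer.Length)
            {
                _bufferIndex++; // chunk consumed, move to the next one
                _bufferPos = 0;
            }
        }
        _position += copied;
        return copied;
    }

    // Remaining abstract Stream members: assumed minimal versions for a forward-only, read-only stream.
    public override bool CanRead { get { return true; } }
    public override bool CanSeek { get { return false; } }
    public override bool CanWrite { get { return false; } }
    public override long Length { get { throw new NotSupportedException(); } }
    public override long Position
    {
        get { return _position; }
        set { throw new NotSupportedException(); }
    }
    public override void Flush() { }
    public override long Seek(long offset, SeekOrigin origin) { throw new NotSupportedException(); }
    public override void SetLength(long value) { throw new NotSupportedException(); }
    public override void Write(byte[] buffer, int offset, int count) { throw new NotSupportedException(); }
}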
The source in the download is heavily documented, so I won't repeat the description here.
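The tricky part of ReadInternal is that a single Read may start in the middle of one chunk and end in the middle of another. Here is a minimal, single-threaded sketch of that bookkeeping. It assumes the chunks are stored in a List<byte[]> as above, and ChunkCursor is a hypothetical name. It copies from as many chunks as are already available and returns a short count instead of blocking, which Stream.Read permits.

```csharp
using System;
using System.Collections.Generic;

// Tracks a read position inside a list of byte[] chunks.
public class ChunkCursor
{
    private readonly List<byte[]> _chunks;
    private int _index;   // chunk currently being read
    private int _offset;  // position inside that chunk

    public ChunkCursor(List<byte[]> chunks) { _chunks = chunks; }

    // Copies up to `count` bytes into `buffer`, crossing chunk boundaries as needed.
    // Returns the number of bytes copied; 0 means no unread bytes are available yet.
    public int Read(byte[] buffer, int offset, int count)
    {
        int copied = 0;
        while (copied < count && _index < _chunks.Count)
        {
            byte[] current = _chunks[_index];
            int n = Math.Min(current.Length - _offset, count - copied);
            Buffer.BlockCopy(current, _offset, buffer, offset + copied, n);
            _offset += n;
            copied += n;
            if (_offset == current.Length)
            {
                _index++;     // chunk fully consumed: move to the next one
                _offset = 0;
            }
        }
        return copied;
    }
}
```

With chunks {1, 2, 3}, {4, 5} and {6}, two 4-byte reads return 4 bytes (1 to 4) and then 2 bytes (5 and 6). A third read returns 0 until another chunk is appended.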
How to use the MemoryStreamMultiplexer
Here’s test code that shows how to write to the shared buffer from one thread and read it from multiple threads at the same time.
using System;
using System.Diagnostics;
using System.Text;
using System.Threading;

class Program
{
    static void Main(string[] args)
    {
        // StringBuilder is not thread-safe, so every thread logs through this lock.
        StringBuilder builder = new StringBuilder();
        object logLock = new object();
        Action<string> log = line => { lock (logLock) builder.AppendLine(line); };

        // 100 KB of known test data: 0, 1, ..., 255, 0, 1, ...
        byte[] buffer = new byte[100 * 1024];
        for (int i = 0; i < buffer.Length; i++)
            buffer[i] = (byte)(i % 256);

        using (MemoryStreamMultiplexer m = new MemoryStreamMultiplexer())
        {
            // Writer: push the data in random-sized chunks, pausing briefly between writes.
            Thread writer = new Thread(() =>
            {
                log("Start writer...");
                Random r = new Random((int)DateTime.Now.Ticks);
                int bytesRemaining = buffer.Length;
                int pos = 0;
                while (bytesRemaining > 0)
                {
                    int bytesToWrite = r.Next(0xFF, 0x1000);
                    int count = Math.Min(bytesRemaining, bytesToWrite);
                    m.Write(buffer, pos, count);
                    pos += count;
                    bytesRemaining -= count;
                    log("Write: " + count);
                    Thread.Sleep(2);
                }
                m.Finish();
                log("Finished writing: " + buffer.Length);
            });
            writer.Start();

            // Ten readers, each with its own MemoryStreamReader, checking every byte.
            Thread[] readers = new Thread[10];
            for (int j = 0; j < readers.Length; j++)
            {
                readers[j] = new Thread(() =>
                {
                    int threadId = Thread.CurrentThread.ManagedThreadId;
                    log("Start reader: " + threadId);
                    int count;
                    int bytesRead = 0;
                    int bufferPos = 0;
                    byte[] chunk = new byte[0x1000];
                    using (MemoryStreamReader reader = m.GetReader())
                    {
                        while ((count = reader.Read(chunk, 0, chunk.Length)) > 0)
                        {
                            log("Read " + threadId + ": " + count);
                            bytesRead += count;
                            for (int c = 0; c < count; c++)
                                Debug.Assert(buffer[bufferPos++] == chunk[c]);
                        }
                    }
                    log("End reader: " + threadId + ": " + bytesRead);
                });
                readers[j].Start();
            }

            writer.Join();
            for (int j = 0; j < readers.Length; j++)
                readers[j].Join();

            // A reader created after Finish() starts signaled and reads everything without waiting.
            using (MemoryStreamReader reader = m.GetReader())
            {
                int readCount;
                int totalReadCount = 0;
                byte[] readBuffer = new byte[0x1000];
                while ((readCount = reader.Read(readBuffer, 0, readBuffer.Length)) > 0)
                {
                    log("Sync Read: " + readCount);
                    totalReadCount += readCount;
                }
                log("Total Sync Read: " + totalReadCount);
            }
        }
        builder.AppendLine("========= Finished =========");
        Console.WriteLine(builder.ToString());
        Console.ReadLine();
    }
}
I have tested it thoroughly on a quad-core PC to make sure that parallel reads really happen and that threads do not step on each other. I also kept the number of locks held to a minimum. You can see parallel writes and reads happening in the console output:
Start writer...
Start reader: 10
Start reader: 11
Write: 2704
Start reader: 12
Write: 937
Start reader: 13
Start reader: 14
Read 10: 4096
Read 11: 4096
Read 14: 4096
Read 13: 4096
Read 12: 4096
Write: 2534
Start reader: 15
Read 15: 4096
Write: 1206
Start reader: 16
Read 16: 4096
Read 11: 4096
Read 13: 4096
Read 16: 4096
Read 14: 4096
Read 12: 4096
Read 10: 4096
Start reader: 17
Read 17: 4096
Read 17: 4096
Read 15: 4096
Start reader: 19
Read 19: 4096
Read 19: 4096
Start reader: 18
Read 18: 4096
Read 18: 4096
Write: 1474
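The console output above shows reads and writes happening concurrently. Each thread logs its line after its Write or Read call returns, so the order of the lines only approximates the actual interleaving.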
Performance testing the library
Here’s the Visual Studio 2010 Profiler report. It shows that GetReader is the most expensive function, and no other function comes anywhere close to it. Since GetReader runs only once per reader, this suggests that the Write and Read paths themselves are cheap. Even within GetReader, the most expensive line of code is the one that creates the MemoryStreamReader.
When you run the Concurrency analysis to see which thread is doing what, it shows that the reader threads read available content as soon as the writer thread writes to the shared buffer. The delay between a reader getting the signal and reading the newly added content is negligible.
The green bars on the threads show that as soon as the writer thread (Thread 7784) signals (the yellow bars), the reader threads execute and pick up the data. Only thread 8524 seems to struggle to pick up the signal, for some reason. The rest of the threads pick up the signal and read the newly added bytes within 0.02 ms on average.
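If you want a rough figure for signal-to-wake-up latency without a profiler, you can time it with Stopwatch. Here is a minimal sketch that assumes a single waiting thread and a single ManualResetEvent; LatencyProbe is a hypothetical name. The Thread.Sleep(1) only gives the waiter time to block before each Set(). Results depend heavily on the machine and the OS scheduler, so treat them as indicative only.

```csharp
using System;
using System.Diagnostics;
using System.Threading;

public static class LatencyProbe
{
    // Average time, in milliseconds, between Set() on this thread and the
    // waiting thread returning from WaitOne(), over `rounds` signals.
    public static double AverageWakeUpMs(int rounds)
    {
        using (var signal = new ManualResetEvent(false))
        using (var ready = new AutoResetEvent(false))
        {
            long setTimestamp = 0;
            long totalTicks = 0; // written only by the waiter, read after Join()

            var waiter = new Thread(() =>
            {
                for (int i = 0; i < rounds; i++)
                {
                    ready.Set();                      // "I am about to wait"
                    signal.WaitOne();                 // blocked like a reader in Read(...)
                    totalTicks += Stopwatch.GetTimestamp() - Volatile.Read(ref setTimestamp);
                    signal.Reset();                   // re-arm before announcing readiness again
                }
            });
            waiter.Start();

            for (int i = 0; i < rounds; i++)
            {
                ready.WaitOne();
                Thread.Sleep(1);                      // let the waiter actually block
                Volatile.Write(ref setTimestamp, Stopwatch.GetTimestamp());
                signal.Set();                         // the "Write" signal
            }
            waiter.Join();

            return totalTicks * 1000.0 / Stopwatch.Frequency / rounds;
        }
    }
}
```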
Enjoy the code.