A very useful, working component to use while waiting for the .NET 4 Framework parallel library...
Thanks!
|
|
|
|
 |
|
 |
The parallel extensions library in .NET 4 introduces concurrent collection objects which enable transferring data between threads. [ParallelExtensionsExtras] includes the TransferStream class, which obviates this PipeStream. If you're using .NET 4, check it out!
|
|
|
|
 |
|
 |
James,
I would like to propose an additional flag. There are situations where third-party code calls Flush while writing to the stream is not yet finished. It would be beneficial to have a flag that controls whether a Flush call is honored or ignored until the flag is set to true.
Regards,
Ivan
|
|
|
|
 |
|
 |
Is there any particular reason you use the Queue(of Byte) class to hold the data? Since the data is all bytes I would think an array sized to the maximum buffer limit would be just as effective, and would avoid the need to process bytes one at a time. When attempting to write data, the maximum number of bytes allowed in an attempt is the minimum of: (1) the number of bytes between the queue write pointer and the end of the queue, (2) the number of bytes between the queue write pointer and the queue read pointer (if the latter is after the former), or (3) the number of bytes requested by the calling program. Similar requirements exist on the read. If a write cannot be satisfied in one attempt but at least one byte was written, repeat the attempt with the remaining data. If nothing was written (but data remains) wait until there's room to write something else.
|
|
|
|
 |
|
 |
As I understand it, the idea is to allocate an array of the maximum buffer size at instantiation and keep write and read pointers to the current "fresh" data locations inside the array? This has the benefit of faster read/write operations, at the cost of greater average-case memory consumption and greater implementation complexity. I'd say the implementation is trickier since it must be able to wrap around when the reads/writes reach the end of the array.
As for why I chose a Queue<>: a pipe is conceptually a queue, with data moving in one end and coming out the other. For the problem I was trying to solve, the speed of the read/write operations wasn't critical; keeping the memory in check was more important. It was also beneficial to keep the implementation simple to demonstrate the idea.
As far as speed goes, you might also consider storing the data in blocks, as is done in a filesystem, perhaps with a Queue<byte[]>. This would reduce the number of calls to Queue.Enqueue/Dequeue (expensive relative to Array.Copy) by a factor equal to the block size. The implementation gets a little tricky here too, since you'll get non-full blocks when the length of the data given to Read/Write is not a multiple of the block length...
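The block idea can be sketched roughly as follows. This is only an illustration, not the article's code: BlockByteQueue and its members are hypothetical names, it is not thread-safe, and it simplifies the scheme by storing one block per Write call instead of fixed-size blocks. A partially consumed head block is tracked with an offset.

```csharp
using System;
using System.Collections.Generic;

// Hypothetical sketch: a byte FIFO backed by Queue<byte[]> (one block per Write).
// Not thread-safe; synchronization is left out to keep the idea visible.
public sealed class BlockByteQueue
{
    private readonly Queue<byte[]> mBlocks = new Queue<byte[]>();
    private int mHeadOffset;            // bytes already consumed from the head block

    public int Length { get; private set; }

    public void Write(byte[] buffer, int offset, int count)
    {
        if (count <= 0) return;
        var block = new byte[count];
        Buffer.BlockCopy(buffer, offset, block, 0, count);
        mBlocks.Enqueue(block);         // one Enqueue per block, not per byte
        Length += count;
    }

    // Copies up to 'count' bytes out and returns how many were copied.
    public int Read(byte[] buffer, int offset, int count)
    {
        int total = 0;
        while (total < count && mBlocks.Count > 0)
        {
            byte[] head = mBlocks.Peek();
            int n = Math.Min(count - total, head.Length - mHeadOffset);
            Buffer.BlockCopy(head, mHeadOffset, buffer, offset + total, n);
            mHeadOffset += n;
            total += n;
            if (mHeadOffset == head.Length)
            {
                mBlocks.Dequeue();      // head block fully consumed
                mHeadOffset = 0;
            }
        }
        Length -= total;
        return total;
    }
}
```

A reader asking for fewer bytes than the head block holds simply leaves the remainder in place for the next Read.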
|
|
|
|
 |
|
 |
Every Object in .NET has a 16-byte overhead associated with it. I don't know whether a Queue(of Byte) is smart enough to avoid boxing each byte in an Object, but even if it doesn't box them, I don't see how a Queue saves on average-case storage requirements unless the demand never grows to within 50% of the maximum size.
The code required to handle an array is really not complicated if one assumes a single reader thread and a single writer thread (or uses a lock for readers and a lock for writers). To process an operation of up to 'count' bytes starting at byte 'offset' in the supplied array, compute as 'n' the minimum of:
-1- The number of bytes of data or free space remaining in the queue
-2- The number of bytes between the buffer pointer and the end of the queue array
-3- The number of bytes requested.
If that number is zero, return the number of bytes processed so far. Otherwise, copy 'n' bytes to/from the queue array to the passed-in array, add 'n' to the buffer pointer and the passed-in offset, subtract 'n' from the passed-in size, and add or subtract 'n' to/from the number of bytes of data in the queue. If the buffer pointer points to the end of the array, then point it to the start and repeat the whole procedure.
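The procedure above could be sketched like this. RingBuffer and its field names are hypothetical, and the sketch leaves out all locking and blocking: a caller would wait and retry when Write or Read returns fewer bytes than requested.

```csharp
using System;

// Hypothetical sketch of the array-based queue: fixed capacity, wrap-around pointers.
// No synchronization; assumes the caller serializes access.
public sealed class RingBuffer
{
    private readonly byte[] mData;
    private int mReadPos, mWritePos, mCount;

    public RingBuffer(int capacity) { mData = new byte[capacity]; }

    public int Count { get { return mCount; } }

    // Copies in as many bytes as fit and returns how many were written.
    public int Write(byte[] buffer, int offset, int count)
    {
        int done = 0;
        while (true)
        {
            // n = min(free space, bytes to the end of the array, bytes requested)
            int n = Math.Min(mData.Length - mCount,
                             Math.Min(mData.Length - mWritePos, count));
            if (n == 0) return done;
            Buffer.BlockCopy(buffer, offset, mData, mWritePos, n);
            mWritePos += n; offset += n; count -= n; mCount += n; done += n;
            if (mWritePos == mData.Length) mWritePos = 0;   // wrap around
        }
    }

    // Copies out as many bytes as are available and returns how many were read.
    public int Read(byte[] buffer, int offset, int count)
    {
        int done = 0;
        while (true)
        {
            // n = min(bytes of data, bytes to the end of the array, bytes requested)
            int n = Math.Min(mCount,
                             Math.Min(mData.Length - mReadPos, count));
            if (n == 0) return done;
            Buffer.BlockCopy(mData, mReadPos, buffer, offset, n);
            mReadPos += n; offset += n; count -= n; mCount -= n; done += n;
            if (mReadPos == mData.Length) mReadPos = 0;     // wrap around
        }
    }
}
```

Each pass copies one contiguous run, so a request that straddles the end of the array takes two passes.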
Because every object has a 16-byte overhead, keeping the data as incoming blocks using Queue(of Byte()) would be space-efficient only if the blocks are generally somewhat large (30+ bytes), and would be time-efficient only if the reader expects to receive blocks of the same size as were sent. For UDP packets, both conditions may often be met. For a TCP stream, neither condition may be met.
|
|
|
|
 |
|
|
 |
|
 |
The Queue class isn't quite as terrible as you might think. I'm doing Winsock API work, and I was concerned that Queue wasn't good enough, so I wrote my own thread-safe circular buffer in optimized unsafe C# using unmanaged memory and pointers. My own implementation is faster than Queue<byte>, but when I compared it to Queue<byte[]> it lost horribly. (My own class will enqueue byte arrays or byte* + len.) Queue<byte[]> is around 10 times faster than my class (I benchmarked Enqueue + Dequeue), so doing it in blocks may be a good idea.
|
|
|
|
 |
|
 |
In your code, the Read() method throws an exception when offset is not 0.
The Write() method accepts a non-zero offset, but the offset is not applied correctly when the queue is filled from the array.
Solution (either of the following):
1. Add offset to the loop bound:
    for (int i = offset; i < offset + count; i++)
2. Use offset when indexing the buffer:
    for (int i = 0; i < count; i++)
    {
        mBuffer.Enqueue(buffer[offset + i]);
    }
|
|
|
|
 |
|
 |
I have encountered the same limitation and I agree it is fine to remove it.
Regards,
Ivan
|
|
|
|
 |
|
 |
I believe I've located a possible threading issue with your class.
In Read(), you have the following logic (I've added line numbers):
1.  while ("more bytes needed")
    {
2.      mWriteEvent.Reset();   // turn off an existing write signal
3.      mReadEvent.Set();      // signal any waiting reads, preventing deadlock
4.      mWriteEvent.WaitOne(); // wait until a write occurs
    }
In Write() you have the corresponding logic:
    // queue up the buffer data
5.  for (int i = offset; i < count; i++)
    {
6.      mBuffer.Enqueue(buffer[i]);
    }
7.  mWriteEvent.Set();         // signal that write has occurred
The problem occurs if thread #1 executes line 7 while thread #2 is about to execute line 2. mWriteEvent is set by line 7, then immediately reset by line 2, and then line 4 blocks forever waiting for mWriteEvent to be set again, which never happens.
This problem reared its ugly head in a set of 170 unit tests I was running against my code, appearing only once out of maybe 1,000 tests ... but it did happen. I'll study the code and try to come up with a solution.
|
|
|
|
 |
|
 |
Good find - I'm submitting an update to the article/source code which will use Monitor.Wait() and Monitor.Pulse() instead of ManualResetEvents.
For now, I believe it is safe to remove the mWriteEvent.Reset() and mReadEvent.Reset() lines - they seem superfluous after reviewing how they're used.
|
|
|
|
 |
|
 |
James,
Where is the update?
Regards,
Ivan
|
|
|
|
 |
|
 |
Hi Ivan,
I've just submitted it for update with CodeProject - it will now use Monitor.Pulse for synchronization. I've also fixed the issue with Write() that KoD666 reported...
Thanks!
James
|
|
|
|
 |
|
 |
What happens if the logic were changed to something like:
    while (!ready_to_read())
    {
        read_wait.WaitOne();
        read_wait.Reset();
    }
    // (the read itself happens here)
    write_wait.Set();
    if (!ready_to_read())
        read_wait.Reset();
The write routine would have similar logic, with 'read' and 'write' swapped. The above is not thread-safe if there are multiple readers or multiple writers, but should work fine for the single-reader, single-writer case. The final read_wait.Reset() allows the WaitHandle to be used by a WaitAll() within external code, if desired.
The only good approach I've found to avoid trouble in the multi-reader/multi-writer case is to create a new WaitHandle whenever it's necessary to wait (set the old one and never use it again). If the wait code makes a copy of the current WaitHandle, checks ready_to_read, and then, if not ready, waits on the copy, the code will get to proceed once data has been written, even if there have been other intervening reads.
|
|
|
|
 |
|
 |
Sometime after I wrote this article I read the Threading in C# articles and reworked the synchronization to use Monitor.Pulse() and Monitor.Wait(). The resulting code is more understandable:
    // in Read()
    lock (mBuffer)
    {
        while (!ReadAvailable(count))
            Monitor.Wait(mBuffer);
        Monitor.Pulse(mBuffer);
    }
    // in Write()
    lock (mBuffer)
    {
        while (Length >= mMaxBufferLength)
            Monitor.Wait(mBuffer);
        Monitor.Pulse(mBuffer);
    }
But yes, this project isn't meant for any multi-reader/writer use, unless your desire is to have randomly interleaved data in your stream.
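To make the pattern concrete, here is a minimal self-contained sketch of a bounded byte pipe built on Monitor.Wait() and Monitor.Pulse(). It is not the updated PipeStream source: MonitorPipe is a hypothetical name, it assumes exactly one reader thread and one writer thread, and it waits per byte so that a Read larger than the buffer limit cannot deadlock against a blocked writer.

```csharp
using System;
using System.Collections.Generic;
using System.Threading;

// Hypothetical sketch: single-reader/single-writer bounded pipe using Monitor.
public sealed class MonitorPipe
{
    private readonly Queue<byte> mBuffer = new Queue<byte>();
    private readonly int mMaxBufferLength;

    public MonitorPipe(int maxBufferLength) { mMaxBufferLength = maxBufferLength; }

    // Blocks while the buffer is full; returns once all bytes are queued.
    public void Write(byte[] buffer, int offset, int count)
    {
        lock (mBuffer)
        {
            for (int i = 0; i < count; i++)
            {
                while (mBuffer.Count >= mMaxBufferLength)
                    Monitor.Wait(mBuffer);      // releases the lock while waiting
                mBuffer.Enqueue(buffer[offset + i]);
                Monitor.Pulse(mBuffer);         // wake the reader if it is waiting
            }
        }
    }

    // Blocks until exactly 'count' bytes have been read.
    public int Read(byte[] buffer, int offset, int count)
    {
        lock (mBuffer)
        {
            for (int i = 0; i < count; i++)
            {
                while (mBuffer.Count == 0)
                    Monitor.Wait(mBuffer);      // releases the lock while waiting
                buffer[offset + i] = mBuffer.Dequeue();
                Monitor.Pulse(mBuffer);         // wake the writer if it is waiting
            }
            return count;
        }
    }
}
```

Because the condition is rechecked in a while loop under the lock, a pulse that arrives before the other thread starts waiting is harmless: that thread sees the updated queue state and never waits in the first place.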
|
|
|
|
 |
|
 |
If there are multiple data streams from which a thread will need to receive data, wait handles can provide a more scalable approach than monitor objects, since it's possible for each stream to supply a wait handle that will be set as soon as there is data. The application can then use WaitAny() to pause the thread until at least one queue has data. As for interleaving of data, it's not useful to have byte-level data get arbitrarily interleaved, but when dealing with larger chunks of data such arbitrary interleaving is fine. For example, many of my applications have a scrolling message window; those go in a lock-free thread-safe queue (that particular queue implementation tosses out excessive data when writing and performs updates in thread-safe sequence, so queue writes never block).
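As a rough illustration of the wait-handle approach, the sketch below gives each queue a ManualResetEvent that stays set while the queue holds data, so a consumer can block on several queues at once with WaitHandle.WaitAny(). SignaledQueue and MultiQueueReader are hypothetical names, and this version uses a plain lock, so it is not the lock-free queue described above.

```csharp
using System;
using System.Collections.Generic;
using System.Threading;

// Hypothetical sketch: a queue that exposes a wait handle set while it has data.
public sealed class SignaledQueue<T>
{
    private readonly Queue<T> mItems = new Queue<T>();
    private readonly ManualResetEvent mHasData = new ManualResetEvent(false);

    public WaitHandle HasData { get { return mHasData; } }

    public void Enqueue(T item)
    {
        lock (mItems)
        {
            mItems.Enqueue(item);
            mHasData.Set();                     // set under the lock so it tracks the queue
        }
    }

    public bool TryDequeue(out T item)
    {
        lock (mItems)
        {
            if (mItems.Count == 0) { item = default(T); return false; }
            item = mItems.Dequeue();
            if (mItems.Count == 0) mHasData.Reset();
            return true;
        }
    }
}

public static class MultiQueueReader
{
    // Blocks until any queue has data; returns that queue's index and one item.
    public static int TakeFromAny<T>(SignaledQueue<T>[] queues, out T item)
    {
        var handles = new WaitHandle[queues.Length];
        for (int i = 0; i < queues.Length; i++) handles[i] = queues[i].HasData;
        while (true)
        {
            int index = WaitHandle.WaitAny(handles);
            if (queues[index].TryDequeue(out item)) return index;
            // Another consumer drained the queue first; wait again.
        }
    }
}
```

WaitAny accepts at most 64 handles, so this scales to a moderate number of streams per waiting thread.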
|
|
|
|
 |
|
 |
I've also got some forms applications that could use a good scrolling text box. I like the idea of adding messages line-by-line and removing them as the buffer is filled. I was thinking that adding this functionality on top of TextBox or RichTextBox might be the way to go, but now, perhaps it would be better to use ListBox. Do you have any suggestions?
|
|
|
|
 |
|
 |
This has been very useful to me. Thank you very much for sharing.
Regards,
Omar AL Zabir
Visual C# MVP
|
|
|
|
 |
|
 |
But the title of this article is kinda misleading. When I opened it, I thought it would be something about real pipes (named and anonymous) for interprocess communication.
Still, cool article.
|
|
|
|
 |
|
|
 |
|
 |
I wonder how I ended up with exactly the same idea as you about the need for something like PipeStream. I was even thinking of using a Queue as the storage, too.
So I was reinventing your wheel.
|
|
|
|
 |
|
 |
Do I need to use a lock, a Monitor, or something else to synchronize access, or are Read and Write already handled separately?
|
|
|
|
 |
|
 |
Sorry for the delay! No, you shouldn't need to synchronize when using read and write on different threads - it uses reset events internally to guarantee safety. By all means - if you find fault in the threading logic, please reply!
|
|
|
|
 |
|
 |
Great article! Loved the humor.
Cheers.