using System;
using System.IO;
using System.Threading;
namespace Pfz.Remoting
{
/// <summary>
/// This class provides buffered write operations, but non-buffered read operations.
/// For TCP/IP operations with NoDelay set to true, it will be better than using a normal BufferedStream, because
/// you can do reads and writes at the same time (which causes errors in BufferedStream class, as the buffer is shared)
/// and will also be faster, as TCP/IP already has a read buffer. In fact, this class is only needed because
/// there is no way to control TCP/IP to only send its buffered bytes by a "flush" command.
/// </summary>
public class BufferedWriteStream:
    Stream
{
    // Accumulates small writes until the buffer fills or Flush is called.
    private byte[] _buffer;

    // Number of valid (not yet forwarded) bytes at the start of _buffer.
    private int _bufferPos;

    // The wrapped stream; set to null once this object has been disposed.
    private Stream _stream;

    /// <summary>
    /// Creates a new BufferedWriteStream over the given baseStream and with the specified bufferSize.
    /// </summary>
    /// <param name="baseStream">The stream that receives the buffered writes and serves the reads.</param>
    /// <param name="bufferSize">The size, in bytes, of the write buffer. Must be at least 256.</param>
    /// <exception cref="ArgumentNullException">baseStream is null.</exception>
    /// <exception cref="ArgumentException">bufferSize is less than 256.</exception>
    public BufferedWriteStream(Stream baseStream, int bufferSize)
    {
        if (baseStream == null)
            throw new ArgumentNullException("baseStream");

        if (bufferSize < 256)
            throw new ArgumentException("bufferSize must be at least 256 bytes.", "bufferSize");

        _stream = baseStream;
        _buffer = new byte[bufferSize];
    }

    /// <summary>
    /// Writes any bytes still held in the buffer to the baseStream (when it is still writable),
    /// then disposes the baseStream and nullifies the buffer.
    /// </summary>
    /// <remarks>
    /// The baseStream is disposed even if writing the pending bytes throws; in that case the
    /// exception of the failed write is propagated to the caller.
    /// </remarks>
    protected override void Dispose(bool disposing)
    {
        try
        {
            if (disposing)
            {
                Stream stream = _stream;
                if (stream != null)
                {
                    // Mark this object as disposed before touching the base stream, so a
                    // failure below can't lead to a second dispose attempt.
                    _stream = null;

                    try
                    {
                        byte[] pending = _buffer;
                        int pendingCount = _bufferPos;
                        _bufferPos = 0;

                        // Without this the buffered bytes would be silently lost. CanWrite is
                        // false for an already-closed base stream, which makes this a no-op then.
                        if (pending != null && pendingCount > 0 && stream.CanWrite)
                            stream.Write(pending, 0, pendingCount);
                    }
                    finally
                    {
                        stream.Dispose();
                    }
                }
            }
        }
        finally
        {
            if (disposing)
                _buffer = null;

            base.Dispose(disposing);
        }
    }

    /// <summary>
    /// Gets the BaseStream to which the data will be written, or null after this object was disposed.
    /// </summary>
    public Stream BaseStream
    {
        get
        {
            return _stream;
        }
    }

    /// <summary>
    /// Gets or sets the BufferSize used by this BufferedWriteStream.
    /// Setting it flushes this stream before the new buffer is allocated.
    /// </summary>
    /// <exception cref="ArgumentException">The new value is less than 256.</exception>
    /// <exception cref="ObjectDisposedException">This stream was already disposed.</exception>
    public int BufferSize
    {
        get
        {
            byte[] buffer = _buffer;
            if (buffer == null)
                throw new ObjectDisposedException(GetType().Name);

            return buffer.Length;
        }
        set
        {
            if (value < 256)
                throw new ArgumentException("BufferSize can't be less than 256.");

            // Flush leaves _bufferPos at zero, so the old buffer can be dropped safely.
            Flush();
            _buffer = new byte[value];
        }
    }

    /// <summary>
    /// Gets the BaseStream.CanRead, or false if this stream was disposed.
    /// </summary>
    public override bool CanRead
    {
        get
        {
            Stream stream = _stream;
            return stream != null && stream.CanRead;
        }
    }

    /// <summary>
    /// Always returns false.
    /// </summary>
    public override bool CanSeek
    {
        get { return false; }
    }

    /// <summary>
    /// Gets the BaseStream.CanWrite, or false if this stream was disposed.
    /// </summary>
    public override bool CanWrite
    {
        get
        {
            Stream stream = _stream;
            return stream != null && stream.CanWrite;
        }
    }

    /// <summary>
    /// Flushes any pending write and also calls the BaseStream.Flush.
    /// </summary>
    /// <exception cref="ObjectDisposedException">This stream was already disposed.</exception>
    public override void Flush()
    {
        Stream stream = GetStreamOrThrow();
        WritePending(stream);
        stream.Flush();
    }

    /// <summary>
    /// Gets the BaseStream.Length, but this is likely to throw an exception.
    /// </summary>
    /// <exception cref="ObjectDisposedException">This stream was already disposed.</exception>
    public override long Length
    {
        get { return GetStreamOrThrow().Length; }
    }

    /// <summary>
    /// Throws a NotSupportedException.
    /// </summary>
    public override long Position
    {
        get
        {
            throw new NotSupportedException();
        }
        set
        {
            throw new NotSupportedException();
        }
    }

    /// <summary>
    /// Redirects the call to BaseStream.Read. Reads never touch the write buffer.
    /// </summary>
    /// <exception cref="ObjectDisposedException">This stream was already disposed.</exception>
    public override int Read(byte[] buffer, int offset, int count)
    {
        return GetStreamOrThrow().Read(buffer, offset, count);
    }

    /// <summary>
    /// Throws a NotSupportedException.
    /// </summary>
    public override long Seek(long offset, SeekOrigin origin)
    {
        throw new NotSupportedException();
    }

    /// <summary>
    /// Calls the BaseStream.SetLength, but this is likely to throw an exception.
    /// </summary>
    /// <exception cref="ObjectDisposedException">This stream was already disposed.</exception>
    public override void SetLength(long value)
    {
        GetStreamOrThrow().SetLength(value);
    }

    /// <summary>
    /// Tries to buffer a write. If the data to be sent is at least as large as the buffer, the
    /// pending bytes are written first and the new data is written directly to the BaseStream.
    /// Otherwise the data is copied to the buffer, which is written to the BaseStream every time
    /// it becomes full.
    /// </summary>
    /// <param name="buffer">The array holding the bytes to write.</param>
    /// <param name="offset">The index in buffer of the first byte to write.</param>
    /// <param name="count">The number of bytes to write. Zero makes this call a no-op.</param>
    /// <exception cref="ArgumentNullException">buffer is null.</exception>
    /// <exception cref="ArgumentOutOfRangeException">offset or count is negative.</exception>
    /// <exception cref="ArgumentException">offset plus count exceeds the length of buffer.</exception>
    /// <exception cref="ObjectDisposedException">This stream was already disposed.</exception>
    public override void Write(byte[] buffer, int offset, int count)
    {
        if (buffer == null)
            throw new ArgumentNullException("buffer");

        // ArgumentOutOfRangeException derives from ArgumentException, so callers that
        // catch ArgumentException keep working.
        if (offset < 0)
            throw new ArgumentOutOfRangeException("offset", "offset can't be less than zero.");

        if (count == 0)
            return;

        if (count < 0)
            throw new ArgumentOutOfRangeException("count", "count can't be less than zero.");

        // Written as a subtraction because "offset + count" can overflow for large values.
        if (count > buffer.Length - offset)
            throw new ArgumentException("offset + count are greater than buffer length.");

        Stream stream = GetStreamOrThrow();
        int bufferSize = _buffer.Length;

        if (count >= bufferSize)
        {
            // Too big to be worth buffering: keep the byte order by sending what is
            // pending first, then hand the caller's data straight to the base stream.
            WritePending(stream);
            stream.Write(buffer, offset, count);
            return;
        }

        // Fill as much of the free space as possible.
        int copySize = Math.Min(count, bufferSize - _bufferPos);
        Buffer.BlockCopy(buffer, offset, _buffer, _bufferPos, copySize);
        _bufferPos += copySize;

        if (_bufferPos == bufferSize)
        {
            stream.Write(_buffer, 0, bufferSize);
            _bufferPos = 0;

            // count < bufferSize, so whatever did not fit always fits in the emptied buffer.
            int leftover = count - copySize;
            if (leftover > 0)
            {
                Buffer.BlockCopy(buffer, offset + copySize, _buffer, 0, leftover);
                _bufferPos = leftover;
            }
        }
    }

    /// <summary>
    /// Returns BaseStream.CanTimeout, or false if this stream was disposed.
    /// </summary>
    public override bool CanTimeout
    {
        get
        {
            Stream stream = _stream;
            return stream != null && stream.CanTimeout;
        }
    }

    /// <summary>
    /// Gets or sets BaseStream.ReadTimeout
    /// </summary>
    /// <exception cref="ObjectDisposedException">This stream was already disposed.</exception>
    public override int ReadTimeout
    {
        get
        {
            return GetStreamOrThrow().ReadTimeout;
        }
        set
        {
            GetStreamOrThrow().ReadTimeout = value;
        }
    }

    /// <summary>
    /// Gets or sets BaseStream.WriteTimeout
    /// </summary>
    /// <exception cref="ObjectDisposedException">This stream was already disposed.</exception>
    public override int WriteTimeout
    {
        get
        {
            return GetStreamOrThrow().WriteTimeout;
        }
        set
        {
            GetStreamOrThrow().WriteTimeout = value;
        }
    }

    // Returns the base stream, or throws ObjectDisposedException (instead of letting a
    // NullReferenceException escape) when this object was already disposed.
    private Stream GetStreamOrThrow()
    {
        Stream stream = _stream;
        if (stream == null)
            throw new ObjectDisposedException(GetType().Name);

        return stream;
    }

    // Sends the buffered bytes, if any, to the given stream and empties the buffer.
    private void WritePending(Stream stream)
    {
        if (_bufferPos > 0)
        {
            stream.Write(_buffer, 0, _bufferPos);
            _bufferPos = 0;
        }
    }
}
}