//*****************************************************************************
// RCF - Remote Call Framework
// Copyright (c) 2005. All rights reserved.
// Developed by Jarl Lindrud.
// Contact: jlindrud@hotmail.com .
//*****************************************************************************
#include <RCF/ZlibCompressionFilter.hpp>
#include "zlib.h"
#include <RCF/Tools.hpp>
namespace RCF {
class ZlibCompressionReadFilter
{
public:
ZlibCompressionReadFilter(ZlibCompressionFilter &filter, int bufferSize);
~ZlibCompressionReadFilter();
void read(char *buffer, std::size_t bufferLen);
void onReadCompleted(std::size_t bytesTransferred, int error);
private:
void resetDecompressionState();
void decompress();
ZlibCompressionFilter & filter;
z_stream d_stream_;
std::vector<char> mBuffer;
char * preBuffer;
std::size_t preBufferLen;
char * preBufferOrig;
std::size_t preBufferLenOrig;
char * postBuffer;
std::size_t postBufferLen;
int zerr_;
int err;
bool decompressionStateInited;
};
class ZlibCompressionWriteFilter
{
public:
ZlibCompressionWriteFilter(ZlibCompressionFilter &filter, int bufferSize, bool stateful);
~ZlibCompressionWriteFilter();
void write(const char *buffer, std::size_t bufferLen);
void onWriteCompleted(std::size_t bytesTransferred, int error);
private:
void resetCompressionState();
void compress();
ZlibCompressionFilter & filter;
z_stream c_stream_;
std::vector<char> mBuffer;
char * preBuffer;
std::size_t preBufferLen;
char * preBufferOrig;
std::size_t preBufferLenOrig;
char * postBuffer;
std::size_t postBufferLen;
int zerr_;
int err;
bool compressionStateInited;
const bool stateful;
};
ZlibCompressionReadFilter::ZlibCompressionReadFilter(ZlibCompressionFilter &filter, int bufferSize) :
filter(filter),
preBuffer(),
preBufferLen(),
preBufferOrig(),
preBufferLenOrig(),
postBuffer(),
postBufferLen(),
mBuffer(bufferSize),
d_stream_(),
zerr_(Z_OK),
err(),
decompressionStateInited()
{
resetDecompressionState();
}
ZlibCompressionReadFilter::~ZlibCompressionReadFilter()
{
if (decompressionStateInited)
{
zerr_ = inflateEnd(&d_stream_);
RCF_VERIFY( Z_OK == zerr_, "zlib inflateEnd()" )(zerr_);
decompressionStateInited = false;
}
}
void ZlibCompressionReadFilter::resetDecompressionState()
{
if (decompressionStateInited)
{
zerr_ = inflateEnd(&d_stream_);
RCF_VERIFY( Z_OK == zerr_, "zlib inflateEnd()" );
decompressionStateInited = false;
}
d_stream_.zalloc = NULL;
d_stream_.zfree = NULL;
d_stream_.opaque = NULL;
zerr_ = inflateInit(&d_stream_);
RCF_VERIFY( Z_OK == zerr_, "zlib inflateInit()" )(zerr_);
decompressionStateInited = true;
}
void ZlibCompressionReadFilter::read(char *buffer, std::size_t bufferLen)
{
err = 0;
preBuffer = buffer;
preBufferOrig = buffer;
preBufferLen = bufferLen;
preBufferLenOrig = bufferLen;
onReadCompleted(0, 0);
}
void ZlibCompressionReadFilter::onReadCompleted(std::size_t bytesTransferred, int error)
{
if (error == -1)
{
filter.mReadWriteCompletionCallback(0, -1);
err = -1;
}
else
{
postBufferLen += bytesTransferred;
if (postBufferLen > 0)
{
decompress();
if (preBufferLen < preBufferLenOrig)
{
filter.mReadWriteCompletionCallback(preBufferLenOrig-preBufferLen, 0);
}
else
{
onReadCompleted(0, 0);
}
}
else
{
RCF_ASSERT(postBufferLen == 0);
postBuffer = &mBuffer[0];
filter.mReadFunction(&mBuffer[0], static_cast<int>(mBuffer.size()));
}
}
}
void ZlibCompressionReadFilter::decompress()
{
d_stream_.next_in = (Bytef*) postBuffer;
d_stream_.avail_in = static_cast<uInt>(postBufferLen);
d_stream_.next_out = (Bytef*) preBuffer;
d_stream_.avail_out = static_cast<uInt>(preBufferLen);
zerr_ = inflate(&d_stream_, Z_SYNC_FLUSH);
RCF_VERIFY(zerr_ == Z_OK || zerr_ == Z_STREAM_END, "zlib inflate()")(zerr_)(preBuffer)(preBufferLen)(postBuffer)(postBufferLen);
if (zerr_ == Z_STREAM_END)
{
resetDecompressionState();
}
preBuffer += preBufferLen - d_stream_.avail_out;
preBufferLen -= preBufferLen - d_stream_.avail_out;
postBuffer += postBufferLen - d_stream_.avail_in;
postBufferLen -= postBufferLen - d_stream_.avail_in;
}
ZlibCompressionWriteFilter::ZlibCompressionWriteFilter(ZlibCompressionFilter &filter, int bufferSize, bool stateful) :
filter(filter),
preBuffer(),
preBufferLen(),
preBufferOrig(),
preBufferLenOrig(),
postBuffer(),
postBufferLen(),
mBuffer(bufferSize),
c_stream_(),
zerr_(Z_OK),
err(),
compressionStateInited(),
stateful(stateful)
{}
ZlibCompressionWriteFilter::~ZlibCompressionWriteFilter()
{
if (compressionStateInited)
{
zerr_ = deflateEnd(&c_stream_);
RCF_VERIFY(zerr_ == Z_OK || zerr_ == Z_DATA_ERROR, "zlib deflateEnd()")(zerr_)(stateful);
compressionStateInited = false;
}
}
void ZlibCompressionWriteFilter::resetCompressionState()
{
if (compressionStateInited)
{
zerr_ = deflateEnd(&c_stream_);
RCF_VERIFY( Z_OK == zerr_, "zlib deflateEnd()")(zerr_);
compressionStateInited = false;
}
c_stream_.zalloc = NULL;
c_stream_.zfree = NULL;
c_stream_.opaque = NULL;
zerr_ = deflateInit(&c_stream_, Z_DEFAULT_COMPRESSION);
RCF_VERIFY( Z_OK == zerr_, "zlib deflateInit()")(zerr_);
compressionStateInited = true;
}
void ZlibCompressionWriteFilter::write(const char *buffer, std::size_t bufferLen)
{
err = 0;
preBuffer = (char*) buffer;
preBufferOrig = (char *) buffer;
preBufferLen = bufferLen;
preBufferLenOrig = bufferLen;
postBuffer = &mBuffer[0];
postBufferLen = 0;
if (stateful == false || compressionStateInited == false)
{
resetCompressionState();
}
onWriteCompleted(0, 0);
}
void ZlibCompressionWriteFilter::onWriteCompleted(std::size_t bytesTransferred, int error)
{
// 1. if partial buffer was written -> write remaining part of buffer
// 2. if whole buffer was written -> check if any more compression or writing is needed
// 3. if no more compression or writing needed, notify previous filter of completion
if (error == -1)
{
filter.mReadWriteCompletionCallback(0, -1);
}
else
{
postBuffer += bytesTransferred;
postBufferLen -= bytesTransferred;
if (postBufferLen > 0)
{
filter.mWriteFunction(postBuffer, postBufferLen);
}
else
{
RCF_ASSERT(postBufferLen == 0);
postBuffer = &mBuffer[0];
postBufferLen = static_cast<int>(mBuffer.size());
compress();
if (postBufferLen > 0)
{
filter.mWriteFunction(postBuffer, postBufferLen);
}
else
{
RCF_ASSERT(preBufferLen == 0);
filter.mReadWriteCompletionCallback(preBufferLenOrig, 0);
}
}
}
}
void ZlibCompressionWriteFilter::compress()
{
c_stream_.next_in = (Bytef*) preBuffer;
c_stream_.avail_in = static_cast<uInt>(preBufferLen);
c_stream_.next_out = (Bytef*) postBuffer;
c_stream_.avail_out = static_cast<uInt>(postBufferLen);
zerr_ = preBufferLen > 0 || stateful ? deflate(&c_stream_, Z_SYNC_FLUSH) : deflate(&c_stream_, Z_FINISH);
RCF_VERIFY(zerr_ == Z_OK || zerr_ == Z_BUF_ERROR || zerr_ == Z_STREAM_END, "zlib deflate()")(zerr_)(preBuffer)(preBufferLen)(postBuffer)(postBufferLen);
preBuffer += preBufferLen - c_stream_.avail_in;
preBufferLen -= preBufferLen - c_stream_.avail_in;
postBufferLen = postBufferLen - c_stream_.avail_out;
}
FilterDescription ZlibStatelessCompressionFilter::sGetFilterDescription()
{
return FilterDescription("Zlib stateless compression filter", RCF_FILTER_ZLIB_COMPRESSION_STATELESS);
}
FilterDescription ZlibStatefulCompressionFilter::sGetFilterDescription()
{
return FilterDescription("Zlib stateful compression filter", RCF_FILTER_ZLIB_COMPRESSION_STATEFUL);
}
#ifdef _MSC_VER
#pragma warning( push )
#pragma warning( disable : 4355 ) // warning C4355: 'this' : used in base member initializer list
#endif
ZlibCompressionFilter::ZlibCompressionFilter(int bufferSize, bool stateful) :
preState(Ready),
readFilter( new ZlibCompressionReadFilter(*this, bufferSize) ),
writeFilter( new ZlibCompressionWriteFilter(*this, bufferSize, stateful) )
{}
#ifdef _MSC_VER
#pragma warning( pop )
#endif
FilterDescription ZlibStatelessCompressionFilter::getFilterDescription() const
{
return sGetFilterDescription();
}
FilterDescription ZlibStatefulCompressionFilter::getFilterDescription() const
{
return sGetFilterDescription();
}
void ZlibCompressionFilter::read(char *buffer, std::size_t bufferLen)
{
preState = Reading;
readFilter->read(buffer, bufferLen);
}
void ZlibCompressionFilter::write(const char *buffer, std::size_t bufferLen)
{
preState = Writing;
writeFilter->write(buffer, bufferLen);
}
void ZlibCompressionFilter::onReadWriteCompleted(std::size_t bytesTransferred, int error)
{
switch (preState)
{
case Reading: readFilter->onReadCompleted(bytesTransferred, error); break;
case Writing: writeFilter->onWriteCompleted(bytesTransferred, error); break;
default: RCF_ASSERT(0)(preState);
}
}
ZlibStatelessCompressionFilterFactory::ZlibStatelessCompressionFilterFactory(int bufferSize) :
bufferSize(bufferSize)
{}
FilterPtr ZlibStatelessCompressionFilterFactory::createFilter()
{
return FilterPtr( new ZlibStatelessCompressionFilter(bufferSize) );
}
FilterDescription ZlibStatelessCompressionFilterFactory::getFilterDescription()
{
return ZlibStatelessCompressionFilter::sGetFilterDescription();
}
ZlibStatefulCompressionFilterFactory::ZlibStatefulCompressionFilterFactory(int bufferSize) :
bufferSize(bufferSize)
{}
FilterPtr ZlibStatefulCompressionFilterFactory::createFilter()
{
return FilterPtr( new ZlibStatefulCompressionFilter(bufferSize) );
}
FilterDescription ZlibStatefulCompressionFilterFactory::getFilterDescription()
{
return ZlibStatefulCompressionFilter::sGetFilterDescription();
}
} // namespace RCF