Click here to Skip to main content
15,885,948 members
Articles / Programming Languages / C

BasicAdmin - Personal Organizer

Rate me:
Please Sign up or sign in to vote.
4.94/5 (14 votes)
1 Aug 2009 | CPOL | 6 min read | 50.7K | 5.6K | 60
Finance, contacts, notes organizer
// network.cpp - written and placed in the public domain by Wei Dai

#include "pch.h"
#include "network.h"
#include "wait.h"

#define CRYPTOPP_TRACE_NETWORK 0

NAMESPACE_BEGIN(CryptoPP)

#ifdef HIGHRES_TIMER_AVAILABLE

// Returns how many more bytes may be transferred right now: the configured
// m_maxBytesPerSecond minus the bytes recorded in m_ops (saturating at zero).
// Returns ULONG_MAX when no limit is configured.
lword LimitedBandwidth::ComputeCurrentTransceiveLimit()
{
	if (m_maxBytesPerSecond == 0)
		return ULONG_MAX;

	// called only for its side effect: it drops expired entries from m_ops
	GetCurTimeAndCleanUp();

	lword bytesInWindow = 0;
	for (OpQueue::const_iterator op = m_ops.begin(); op != m_ops.end(); ++op)
		bytesInWindow += op->second;

	return SaturatingSubtract(m_maxBytesPerSecond, bytesInWindow);
}

// Returns the time remaining until the next transfer is allowed, or 0 when
// no bandwidth limit is configured. The cached m_nextTransceiveTime is
// recomputed when it is zero.
double LimitedBandwidth::TimeToNextTransceive()
{
	if (m_maxBytesPerSecond == 0)
		return 0;

	if (m_nextTransceiveTime == 0)
	{
		ComputeNextTransceiveTime();
	}

	return SaturatingSubtract(m_nextTransceiveTime, m_timer.ElapsedTimeAsDouble());
}

// Records that `size` bytes were just transferred. Does nothing when no
// bandwidth limit is configured. Zeroing m_nextTransceiveTime makes
// TimeToNextTransceive() recompute it on its next call.
void LimitedBandwidth::NoteTransceive(lword size)
{
	if (!m_maxBytesPerSecond)
		return;

	const double timestamp = GetCurTimeAndCleanUp();
	m_ops.push_back(std::make_pair(timestamp, size));
	m_nextTransceiveTime = 0;
}

void LimitedBandwidth::ComputeNextTransceiveTime()
{
	double curTime = GetCurTimeAndCleanUp();
	lword total = 0;
	for (unsigned int i=0; i!=m_ops.size(); ++i)
		total += m_ops[i].second;
	m_nextTransceiveTime =
		(total < m_maxBytesPerSecond) ? curTime : m_ops.front().first + 1000;
}

// Returns the current reading of m_timer after discarding every entry at the
// front of m_ops whose timestamp is more than 1000 timer units in the past.
// Returns 0 without touching m_ops when no bandwidth limit is configured.
double LimitedBandwidth::GetCurTimeAndCleanUp()
{
	if (m_maxBytesPerSecond == 0)
		return 0;

	const double now = m_timer.ElapsedTimeAsDouble();
	for (;;)
	{
		if (!m_ops.size())
			break;
		if (!(m_ops.front().first + 1000 < now))
			break;
		m_ops.pop_front();
	}
	return now;
}

// Schedules a wake-up event on `container` for the time at which the next
// transfer is allowed. Nothing is scheduled when that time is zero.
void LimitedBandwidth::GetWaitObjects(WaitObjectContainer &container, const CallStack &callStack)
{
	const double delay = TimeToNextTransceive();
	if (delay == 0)
		return;

	container.ScheduleEvent(delay, CallStack("LimitedBandwidth::GetWaitObjects()", &callStack));
}

// *************************************************************

// Pumps data through DoPump(), throttled by the bandwidth limit when
// GetMaxBytesPerSecond() is nonzero.
//   byteCount      - in: maximum amount to pump; out: amount accounted as pumped
//   blockingOutput - forwarded to DoPump()
//   maxTime        - time budget (timer is in milliseconds); INFINITE_TIME means no limit
//   checkDelimiter, delimiter - forwarded to DoPump()
// Returns the first nonzero DoPump() result, otherwise 0 once the loop ends
// (requested size reached, time budget used up, or blocked by the speed limit).
size_t NonblockingSource::GeneralPump2(
	lword& byteCount, bool blockingOutput,
	unsigned long maxTime, bool checkDelimiter, byte delimiter)
{
	m_blockedBySpeedLimit = false;

	// no bandwidth limit: a single pass straight through DoPump()
	if (!GetMaxBytesPerSecond())
	{
		size_t ret = DoPump(byteCount, blockingOutput, maxTime, checkDelimiter, delimiter);
		m_doPumpBlocked = (ret != 0);
		return ret;
	}

	bool forever = (maxTime == INFINITE_TIME);
	unsigned long timeToGo = maxTime;
	Timer timer(Timer::MILLISECONDS, forever);
	lword maxSize = byteCount;
	byteCount = 0;

	timer.StartTimer();

	while (true)
	{
		// pump no more than both the speed limit and the caller's remaining request allow
		lword curMaxSize = UnsignedMin(ComputeCurrentTransceiveLimit(), maxSize - byteCount);

		// call DoPump() even with a zero allowance if the previous call reported
		// blocked output (m_doPumpBlocked)
		if (curMaxSize || m_doPumpBlocked)
		{
			if (!forever) timeToGo = SaturatingSubtract(maxTime, timer.ElapsedTime());
			// curMaxSize is passed by reference; NOTE(review): DoPump() is expected to
			// overwrite it with the amount actually pumped - confirm for all overrides
			size_t ret = DoPump(curMaxSize, blockingOutput, timeToGo, checkDelimiter, delimiter);
			m_doPumpBlocked = (ret != 0);
			if (curMaxSize)
			{
				// record the transfer for bandwidth accounting
				NoteTransceive(curMaxSize);
				byteCount += curMaxSize;
			}
			if (ret)
				return ret;
		}

		// requested amount reached
		if (maxSize != ULONG_MAX && byteCount >= maxSize)
			break;

		// time budget used up
		if (!forever)
		{
			timeToGo = SaturatingSubtract(maxTime, timer.ElapsedTime());
			if (!timeToGo)
				break;
		}

		// if the speed limit would make us wait longer than the remaining budget,
		// give up and flag the reason for the caller
		double waitTime = TimeToNextTransceive();
		if (!forever && waitTime > timeToGo)
		{
			m_blockedBySpeedLimit = true;
			break;
		}

		// wait until the speed limit allows the next transfer
		WaitObjectContainer container;
		LimitedBandwidth::GetWaitObjects(container, CallStack("NonblockingSource::GeneralPump2() - speed limit", 0));
		container.Wait((unsigned long)waitTime);
	}

	return 0;
}

size_t NonblockingSource::PumpMessages2(unsigned int &messageCount, bool blocking)
{
	if (messageCount == 0)
		return 0;

	messageCount = 0;

	lword byteCount;
	do {
		byteCount = LWORD_MAX;
		RETURN_IF_NONZERO(Pump2(byteCount, blocking));
	} while(byteCount == LWORD_MAX);

	if (!m_messageEndSent && SourceExhausted())
	{
		RETURN_IF_NONZERO(AttachedTransformation()->Put2(NULL, 0, GetAutoSignalPropagation(), true));
		m_messageEndSent = true;
		messageCount = 1;
	}
	return 0;
}

// Flushes buffered data through DoFlush() until the buffer holds at most
// targetSize bytes, throttled by the bandwidth limit when
// GetMaxBytesPerSecond() is nonzero.
//   maxTime    - time budget (timer is in milliseconds); INFINITE_TIME means no limit
//   targetSize - buffer size to flush down to
// Returns the number of bytes reported flushed by DoFlush(); 0 if there was
// nothing to do.
lword NonblockingSink::TimedFlush(unsigned long maxTime, size_t targetSize)
{
	m_blockedBySpeedLimit = false;

	// nothing to do when the buffer is already small enough; a pending EOF
	// still forces a flush when targetSize is 0
	size_t curBufSize = GetCurrentBufferSize();
	if (curBufSize <= targetSize && (targetSize || !EofPending()))
		return 0;

	// no bandwidth limit: a single pass straight through DoFlush()
	if (!GetMaxBytesPerSecond())
		return DoFlush(maxTime, targetSize);

	bool forever = (maxTime == INFINITE_TIME);
	unsigned long timeToGo = maxTime;
	Timer timer(Timer::MILLISECONDS, forever);
	lword totalFlushed = 0;

	timer.StartTimer();

	while (true)
	{	
		// flush no more than both the excess over targetSize and the speed limit allow
		size_t flushSize = UnsignedMin(curBufSize - targetSize, ComputeCurrentTransceiveLimit());
		if (flushSize || EofPending())
		{
			if (!forever) timeToGo = SaturatingSubtract(maxTime, timer.ElapsedTime());
			size_t ret = (size_t)DoFlush(timeToGo, curBufSize - flushSize);
			if (ret)
			{
				// record the transfer for bandwidth accounting
				NoteTransceive(ret);
				curBufSize -= ret;
				totalFlushed += ret;
			}
		}

		// same completion test as the early-out above
		if (curBufSize <= targetSize && (targetSize || !EofPending()))
			break;

		// time budget used up
		if (!forever)
		{
			timeToGo = SaturatingSubtract(maxTime, timer.ElapsedTime());
			if (!timeToGo)
				break;
		}

		// if the speed limit would make us wait longer than the remaining budget,
		// give up and flag the reason for the caller
		double waitTime = TimeToNextTransceive();
		if (!forever && waitTime > timeToGo)
		{
			m_blockedBySpeedLimit = true;
			break;
		}

		// wait until the speed limit allows the next transfer
		WaitObjectContainer container;
		LimitedBandwidth::GetWaitObjects(container, CallStack("NonblockingSink::TimedFlush() - speed limit", 0));
		container.Wait((unsigned long)waitTime);
	}

	return totalFlushed;
}

// Attempts a flush (with no time limit when `blocking`, otherwise with a
// zero time budget). Returns true only for a hard flush that left data in
// the buffer or an EOF still pending.
bool NonblockingSink::IsolatedFlush(bool hardFlush, bool blocking)
{
	if (blocking)
		TimedFlush(INFINITE_TIME);
	else
		TimedFlush(0);

	if (!hardFlush)
		return false;

	return GetCurrentBufferSize() != 0 || EofPending();
}

// *************************************************************

// Constructs a source attached to `attachment`, with a receive buffer of
// 16*1024 elements, no receive in progress, output not blocked, and an
// empty data window (m_dataBegin == m_dataEnd == 0).
NetworkSource::NetworkSource(BufferedTransformation *attachment)
	: NonblockingSource(attachment),
	  m_buf(16*1024),
	  m_waitingForResult(false),
	  m_outputBlocked(false),
	  m_dataBegin(0),
	  m_dataEnd(0)
{
}

// Sum of the maximum wait-object counts of the bandwidth limiter, the
// receiver, and the attached transformation.
unsigned int NetworkSource::GetMaxWaitObjectCount() const
{
	unsigned int count = LimitedBandwidth::GetMaxWaitObjectCount();
	count += GetReceiver().GetMaxWaitObjectCount();
	count += AttachedTransformation()->GetMaxWaitObjectCount();
	return count;
}

// Adds to `container` whatever this source is currently waiting on:
//  - the speed-limit timer, when blocked by the speed limit;
//  - otherwise, unless output is blocked, either the receiver (buffer holds
//    no data) or a no-wait marker (buffer still holds data);
// and always the wait objects of the attached transformation.
void NetworkSource::GetWaitObjects(WaitObjectContainer &container, CallStack const& callStack)
{
	if (!BlockedBySpeedLimit())
	{
		if (!m_outputBlocked)
		{
			const bool haveData = (m_dataBegin != m_dataEnd);
			if (haveData)
				container.SetNoWait(CallStack("NetworkSource::GetWaitObjects() - have data", &callStack));
			else
				AccessReceiver().GetWaitObjects(container, CallStack("NetworkSource::GetWaitObjects() - no data", &callStack));
		}
	}
	else
	{
		LimitedBandwidth::GetWaitObjects(container, CallStack("NetworkSource::GetWaitObjects() - speed limit", &callStack));
	}

	AttachedTransformation()->GetWaitObjects(container, CallStack("NetworkSource::GetWaitObjects() - attachment", &callStack));
}

// Receives data from the receiver into m_buf and forwards it to the attached
// transformation.
//   byteCount      - in: maximum number of bytes to forward; out: number of
//                    bytes forwarded by this call
//   blockingOutput - passed (or-ed with "no time limit") as the blocking flag
//                    to PutModifiable2()
//   maxTime        - time budget (timer is in milliseconds); INFINITE_TIME means no limit
//   checkDelimiter - when true, stop forwarding just before `delimiter`
// Returns 0, or the nonzero PutModifiable2() result when the attachment could
// not take the data and waiting on it failed; m_outputBlocked is then set so
// the next call resumes directly at DoOutput.
size_t NetworkSource::DoPump(lword &byteCount, bool blockingOutput, unsigned long maxTime, bool checkDelimiter, byte delimiter)
{
	NetworkReceiver &receiver = AccessReceiver();

	lword maxSize = byteCount;
	byteCount = 0;
	bool forever = maxTime == INFINITE_TIME;
	Timer timer(Timer::MILLISECONDS, forever);
	BufferedTransformation *t = AttachedTransformation();

	// a previous call left output pending: retry it (m_putSize is kept in a member)
	if (m_outputBlocked)
		goto DoOutput;

	while (true)
	{
		// buffer holds no unforwarded data: try to receive more
		if (m_dataBegin == m_dataEnd)
		{
			if (receiver.EofReceived())
				break;

			if (m_waitingForResult)
			{
				// a Receive() was started earlier; wait for its result if required
				if (receiver.MustWaitForResult() &&
					!receiver.Wait(SaturatingSubtract(maxTime, timer.ElapsedTime()),
						CallStack("NetworkSource::DoPump() - wait receive result", 0)))
					break;

				unsigned int recvResult = receiver.GetReceiveResult();
#if CRYPTOPP_TRACE_NETWORK
				OutputDebugString((IntToString((unsigned int)this) + ": Received " + IntToString(recvResult) + " bytes\n").c_str());
#endif
				m_dataEnd += recvResult;
				m_waitingForResult = false;

				// more can be received without waiting and the buffer is not full: keep receiving
				if (!receiver.MustWaitToReceive() && !receiver.EofReceived() && m_dataEnd != m_buf.size())
					goto ReceiveNoWait;
			}
			else
			{
				// buffer fully consumed: rewind to the start
				m_dataEnd = m_dataBegin = 0;

				if (receiver.MustWaitToReceive())
				{
					if (!receiver.Wait(SaturatingSubtract(maxTime, timer.ElapsedTime()),
							CallStack("NetworkSource::DoPump() - wait receive", 0)))
						break;

					// start a receive; its result is collected on the next loop iteration
					receiver.Receive(m_buf+m_dataEnd, m_buf.size()-m_dataEnd);
					m_waitingForResult = true;
				}
				else
				{
ReceiveNoWait:
					m_waitingForResult = true;
					// call Receive repeatedly as long as data is immediately available,
					// because some receivers tend to return data in small pieces
#if CRYPTOPP_TRACE_NETWORK
					OutputDebugString((IntToString((unsigned int)this) + ": Receiving " + IntToString(m_buf.size()-m_dataEnd) + " bytes\n").c_str());
#endif
					while (receiver.Receive(m_buf+m_dataEnd, m_buf.size()-m_dataEnd))
					{
						unsigned int recvResult = receiver.GetReceiveResult();
#if CRYPTOPP_TRACE_NETWORK
						OutputDebugString((IntToString((unsigned int)this) + ": Received " + IntToString(recvResult) + " bytes\n").c_str());
#endif
						m_dataEnd += recvResult;
						// stop at EOF or once the buffer is more than half full
						if (receiver.EofReceived() || m_dataEnd > m_buf.size() /2)
						{
							m_waitingForResult = false;
							break;
						}
					}
				}
			}
		}
		else
		{
			// forward as much buffered data as the caller's remaining request allows
			m_putSize = UnsignedMin(m_dataEnd - m_dataBegin, maxSize - byteCount);

			// when looking for a delimiter, forward only the bytes before its first occurrence
			if (checkDelimiter)
				m_putSize = std::find(m_buf+m_dataBegin, m_buf+m_dataBegin+m_putSize, delimiter) - (m_buf+m_dataBegin);

DoOutput:
			size_t result = t->PutModifiable2(m_buf+m_dataBegin, m_putSize, 0, forever || blockingOutput);
			if (result)
			{
				// attachment blocked: wait on it and retry, or report the block to the caller
				if (t->Wait(SaturatingSubtract(maxTime, timer.ElapsedTime()),
						CallStack("NetworkSource::DoPump() - wait attachment", 0)))
					goto DoOutput;
				else
				{
					m_outputBlocked = true;
					return result;
				}
			}
			m_outputBlocked = false;

			byteCount += m_putSize;
			m_dataBegin += m_putSize;
			// stop with the delimiter left as the next unread byte
			if (checkDelimiter && m_dataBegin < m_dataEnd && m_buf[m_dataBegin] == delimiter)
				break;
			if (maxSize != ULONG_MAX && byteCount == maxSize)
				break;
			// once time limit is reached, return even if there is more data waiting
			// but make 0 a special case so caller can request a large amount of data to be
			// pumped as long as it is immediately available
			if (maxTime > 0 && timer.ElapsedTime() > maxTime)
				break;
		}
	}

	return 0;
}

// *************************************************************

// Constructs a sink with the given buffer limits. The internal buffer is
// created with the smaller of 16U*1024U+256 and maxBufferSize; the speed
// timer runs in milliseconds and all counters and state flags start cleared.
NetworkSink::NetworkSink(unsigned int maxBufferSize, unsigned int autoFlushBound)
	: m_maxBufferSize(maxBufferSize),
	  m_autoFlushBound(autoFlushBound),
	  m_needSendResult(false),
	  m_wasBlocked(false),
	  m_eofState(EOF_NONE),
	  m_buffer(STDMIN(16U*1024U+256, maxBufferSize)),
	  m_skipBytes(0),
	  m_speedTimer(Timer::MILLISECONDS),
	  m_byteCountSinceLastTimerReset(0),
	  m_currentSpeed(0),
	  m_maxObservedSpeed(0)
{
}

// Returns the current send speed. Once more than 1000 ms have elapsed on
// m_speedTimer, the speed is recomputed from the bytes counted since the
// last reset, the observed maximum is updated (the old maximum decays by a
// factor of 0.98), and the counter and timer are restarted.
float NetworkSink::ComputeCurrentSpeed()
{
	if (!(m_speedTimer.ElapsedTime() > 1000))
		return m_currentSpeed;

	m_currentSpeed = m_byteCountSinceLastTimerReset * 1000 / m_speedTimer.ElapsedTime();
	m_maxObservedSpeed = STDMAX(m_currentSpeed, m_maxObservedSpeed * 0.98f);

	// begin a new measurement window
	m_byteCountSinceLastTimerReset = 0;
	m_speedTimer.StartTimer();
//	OutputDebugString(("max speed: " + IntToString((int)m_maxObservedSpeed) + " current speed: " + IntToString((int)m_currentSpeed) + "\n").c_str());

	return m_currentSpeed;
}

float NetworkSink::GetMaxObservedSpeed() const
{
	lword m = GetMaxBytesPerSecond();
	return m ? STDMIN(m_maxObservedSpeed, float(CRYPTOPP_VC6_INT64 m)) : m_maxObservedSpeed;
}

// Sum of the maximum wait-object counts of the bandwidth limiter and the sender.
unsigned int NetworkSink::GetMaxWaitObjectCount() const
{
	unsigned int count = LimitedBandwidth::GetMaxWaitObjectCount();
	count += GetSender().GetMaxWaitObjectCount();
	return count;
}

// Adds to `container` whatever this sink is currently waiting on: the
// speed-limit timer when blocked by the speed limit; otherwise the sender,
// if the sink was blocked, still has buffered data, or has an EOF pending.
// Nothing is added when none of these apply.
void NetworkSink::GetWaitObjects(WaitObjectContainer &container, CallStack const& callStack)
{
	if (BlockedBySpeedLimit())
	{
		LimitedBandwidth::GetWaitObjects(container, CallStack("NetworkSink::GetWaitObjects() - speed limit", &callStack));
		return;
	}

	// choose the reason for waiting on the sender; checks run in the same order as before
	const char *reason = NULL;
	if (m_wasBlocked)
		reason = "NetworkSink::GetWaitObjects() - was blocked";
	else if (!m_buffer.IsEmpty())
		reason = "NetworkSink::GetWaitObjects() - buffer not empty";
	else if (EofPending())
		reason = "NetworkSink::GetWaitObjects() - EOF pending";

	if (reason != NULL)
		AccessSender().GetWaitObjects(container, CallStack(reason, &callStack));
}

// Buffers `length` bytes from inString and flushes them towards the sender.
//   messageEnd - nonzero requests that EOF be sent after the data
//   blocking   - when true, flush with no time limit; otherwise only a
//                zero-time flush is attempted
// Returns 0 when the data was accepted (and, if requested, EOF completed);
// a nonzero value when the call was blocked.
// Throws Exception(OTHER_ERROR) if data or a message end is supplied after
// EOF has already been sent.
size_t NetworkSink::Put2(const byte *inString, size_t length, int messageEnd, bool blocking)
{
	if (m_eofState == EOF_DONE)
	{
		if (length || messageEnd)
			throw Exception(Exception::OTHER_ERROR, "NetworkSink::Put2() being called after EOF had been sent");

		return 0;
	}

	// EOF already in progress from an earlier call: skip buffering and continue the EOF flush
	if (m_eofState > EOF_NONE)
		goto EofSite;

	{
		// m_skipBytes was set by a previous blocked call (see below);
		// NOTE(review): presumably the caller re-submits the same data and those
		// leading bytes are already buffered - confirm against callers
		if (m_skipBytes)
		{
			assert(length >= m_skipBytes);
			inString += m_skipBytes;
			length -= m_skipBytes;
		}

		m_buffer.Put(inString, length);

		// opportunistic zero-time flush for non-blocking calls or once the
		// buffer exceeds the auto-flush bound
		if (!blocking || m_buffer.CurrentSize() > m_autoFlushBound)
			TimedFlush(0, 0);

		// a message end requires the buffer to drain completely
		size_t targetSize = messageEnd ? 0 : m_maxBufferSize;
		if (blocking)
			TimedFlush(INFINITE_TIME, targetSize);

		// still over the target: report the call as blocked
		if (m_buffer.CurrentSize() > targetSize)
		{
			assert(!blocking);
			m_wasBlocked = true;
			m_skipBytes += length;
			size_t blockedBytes = UnsignedMin(length, m_buffer.CurrentSize() - targetSize);
			// always return nonzero so the caller sees the block
			return STDMAX<size_t>(blockedBytes, 1);
		}

		m_wasBlocked = false;
		m_skipBytes = 0;
	}

	if (messageEnd)
	{
		m_eofState = EOF_PENDING_SEND;

	EofSite:
		TimedFlush(blocking ? INFINITE_TIME : 0, 0);
		// EOF not yet completed: report blocked
		if (m_eofState != EOF_DONE)
			return 1;
	}

	return 0;
}

// Sends buffered data through the sender until the buffer holds at most
// targetSize bytes, a wait fails, or the time budget is reached; then
// handles a pending EOF once the buffer is empty.
//   maxTime    - time budget (timer is in milliseconds); INFINITE_TIME means
//                no limit, 0 means do not wait
//   targetSize - buffer size to flush down to
// Returns the number of bytes the sender reported as sent during this call.
lword NetworkSink::DoFlush(unsigned long maxTime, size_t targetSize)
{
	NetworkSender &sender = AccessSender();

	bool forever = maxTime == INFINITE_TIME;
	Timer timer(Timer::MILLISECONDS, forever);
	unsigned int totalFlushSize = 0;

	while (true)
	{
		if (m_buffer.CurrentSize() <= targetSize)
			break;
		
		// a Send() was started earlier: collect its result before sending more
		if (m_needSendResult)
		{
			if (sender.MustWaitForResult() &&
				!sender.Wait(SaturatingSubtract(maxTime, timer.ElapsedTime()),
					CallStack("NetworkSink::DoFlush() - wait send result", 0)))
				break;

			unsigned int sendResult = sender.GetSendResult();
#if CRYPTOPP_TRACE_NETWORK
			OutputDebugString((IntToString((unsigned int)this) + ": Sent " + IntToString(sendResult) + " bytes\n").c_str());
#endif
			// drop the bytes that were sent from the buffer
			m_buffer.Skip(sendResult);
			totalFlushSize += sendResult;
			m_needSendResult = false;

			if (!m_buffer.AnyRetrievable())
				break;
		}

		// remaining time budget; 0 when the caller asked for no waiting
		unsigned long timeOut = maxTime ? SaturatingSubtract(maxTime, timer.ElapsedTime()) : 0;
		if (sender.MustWaitToSend() && !sender.Wait(timeOut, CallStack("NetworkSink::DoFlush() - wait send", 0)))
			break;

		// send the first contiguous block of the buffer; it stays in the buffer
		// until the send result is collected above
		size_t contiguousSize = 0;
		const byte *block = m_buffer.Spy(contiguousSize);

#if CRYPTOPP_TRACE_NETWORK
		OutputDebugString((IntToString((unsigned int)this) + ": Sending " + IntToString(contiguousSize) + " bytes\n").c_str());
#endif
		sender.Send(block, contiguousSize);
		m_needSendResult = true;

		if (maxTime > 0 && timeOut == 0)
			break;	// once time limit is reached, return even if there is more data waiting
	}

	// feed the speed statistics
	m_byteCountSinceLastTimerReset += totalFlushSize;
	ComputeCurrentSpeed();
	
	// everything sent and acknowledged: handle a pending EOF
	if (m_buffer.IsEmpty() && !m_needSendResult)
	{
		if (m_eofState == EOF_PENDING_SEND)
		{
			sender.SendEof();
			m_eofState = sender.MustWaitForEof() ? EOF_PENDING_DELIVERY : EOF_DONE;
		}

		// wait for the sender to confirm EOF, within the remaining time budget
		while (m_eofState == EOF_PENDING_DELIVERY)
		{
			unsigned long timeOut = maxTime ? SaturatingSubtract(maxTime, timer.ElapsedTime()) : 0;
			if (!sender.Wait(timeOut, CallStack("NetworkSink::DoFlush() - wait EOF", 0)))
				break;

			if (sender.EofSent())
				m_eofState = EOF_DONE;
		}
	}

	return totalFlushSize;
}

#endif	// #ifdef HIGHRES_TIMER_AVAILABLE

NAMESPACE_END

By viewing downloads associated with this article you agree to the Terms of Service and the article's licence.

If a file you wish to view isn't highlighted, and is a text file (not binary), please let us know and we'll add colourisation support for it.

License

This article, along with any associated source code and files, is licensed under The Code Project Open License (CPOL)


Written By
Software Developer
Argentina Argentina
System developer from Argentina.

Programmed in VB 5, 6, .NET, C#, Java, PL-SQL, Transact-SQL, C, C++ and even some "calculator" language.

Love to build small, useful applications.
Usually building big and complicated apps based on solid, reliable components.

Hobbies: reading, photography, chess, paddle, running.

Comments and Discussions