// SimpleIPC.h
// This header is part of SimpleWin library, SimpleCore namespace
// Author - Kosta Cherry
//
// License: use any way you want, except the following:
// 1. Don't claim that you are the author of this code.
// 2. Don't try to prevent others from freely using this code. It's in public domain now. It is free and always will be free.
// 3. Don't hold me responsible for any bugs or malfunction, or alike. Use at you own risk. Absolutely no guarantee here whatsoever.
//
// Description: defines and implements class CSimpleIPC (and few helper structs). CSimpleIPC is used for inter- or intra-process communication.
// This is fast "multiple writers - multiple readers" implementation, using shared memory.
//
// History:
// Version 1.0 - initial implementation
// Version 1.1 - 2 bugfixes, both were prominent when INFINITE timeout was specified:
// 1) optional handle stopOn was ignored when INFINITE timeout was specified
// 2) due to race conditions writer was not always informing reader that write operation was completed, thus causing reader to hang forever when INFINITE timeout was specified
#pragma once
#include <string>
#include <vector>
#include <stdint.h>
#include "SimpleThreadDefines.h"
#include "SimpleCriticalSection.h"
#pragma warning(push)
#pragma warning(disable: 28159)
namespace SimpleCore
{
#define SCD_STOP_REQUESTED 0x0001
#define SCD_PREALLOCATE 0x0002
// Control block stored in the main shared-memory mapping ("<name>_mem").
// Every CSimpleIPC instance attached to the same connection point reads and writes this
// structure through m_connectionPointBuffer, so the order and size of the fields are part
// of the shared layout and must not be rearranged.
struct SimpleIPCMemoryBuffer
{
    // --- live state, changes while communication is running ---
    VUINT32 m_totalBlocksCount;       // number of blocks currently in the ring; grows by m_blocksCount on every expansion
    VUINT32 m_WriteStart;             // index of the block the next writer will try to take
    VUINT32 m_ReadStart;              // index of the block the next reader will try to take
    VUINT32 m_writersWatingCount;     // writers currently blocked; a non-zero value makes readers signal "space available"
    VUINT32 m_readersWatingCount;     // readers currently blocked; a non-zero value makes writers signal "data available"

    // --- configuration, written by init() ---
    uint32_t m_blocksCount;           // blocks per data file
    uint32_t m_blockSize;             // size of one block in bytes
    uint32_t m_blocksLength;          // m_blockSize * m_blocksCount, computed in init()
    uint32_t m_maxExpansions;         // max number of data files; may later be changed by CSimpleIPC::setMaxExpansionsCount()
    uint32_t m_minExpantionDistance;  // writers expand the ring only while m_ReadStart is not greater than this value

    // --- shared flags ---
    VUINT32 m_flags;                  // combination of SCD_STOP_REQUESTED / SCD_PREALLOCATE
    uint32_t m_reserved;              // reserved for future use

    // Resets the control block for a freshly created connection point:
    // cursors and waiter counters go to zero, the ring starts with one data file of
    // initialBlocksCount blocks, and all flags are cleared.
    void init(uint32_t blockSize, uint32_t initialBlocksCount, uint32_t maxExpansions, uint32_t minExpantionDistance) throw()
    {
        // live state
        m_ReadStart = 0;
        m_WriteStart = 0;
        m_totalBlocksCount = initialBlocksCount;
        m_readersWatingCount = 0;
        m_writersWatingCount = 0;

        // configuration
        m_blocksCount = initialBlocksCount;
        m_blockSize = blockSize;
        m_blocksLength = blockSize * initialBlocksCount;
        m_maxExpansions = maxExpansions;
        m_minExpantionDistance = minExpantionDistance;

        // flags
        m_flags = 0;
    }

    // True when at least one bit of 'flag' is set in m_flags.
    bool isFlagSet(uint32_t flag)
    {
        return (m_flags & flag) != 0;
    }

    // Turns the bits of 'flag' on in m_flags (plain read-modify-write, not an interlocked operation).
    void setFlag(uint32_t flag)
    {
        m_flags |= flag;
    }

    // Turns the bits of 'flag' off in m_flags (plain read-modify-write, not an interlocked operation).
    void clearFlag(uint32_t flag)
    {
        m_flags &= ~flag;
    }
};
#define SCD_IPC_MAX_SPINLOCKS 12
#define SCD_IPC_LOCK_MAX_SPINLOCKS 40
// Token filled in by the BlockWriteDescriptor overload of CSimpleIPC::acquireBlock() and
// handed back to the matching releaseBlock() once the caller has finished writing the block.
// The struct has no constructor, so 'status' is indeterminate until acquireBlock() succeeds.
struct BlockWriteDescriptor
{
VLONG* status; // status word of the acquired block in shared memory; releaseBlock() stores 2 through it
};
// Token filled in by the BlockReadDescriptor overload of CSimpleIPC::acquireBlock() and
// handed back to the matching releaseBlock() once the caller has finished reading the block.
// The struct has no constructor, so 'status' is indeterminate until acquireBlock() succeeds.
struct BlockReadDescriptor
{
VLONG* status; // status word of the acquired block in shared memory; releaseBlock() stores 0 through it
};
// CSimpleIPC - "multiple writers / multiple readers" queue of fixed-size blocks kept in named
// shared memory.
//
// Roles:
//   * the manager calls createConnectionPoint() - it creates the control block, the events and
//     the first data file, and finally the "signal" mapping that tells clients the setup is done;
//   * clients call attachToConnectionPoint() with the same name.
// After that both sides use read()/write(), or acquireBlock()/releaseBlock() for in-place access.
//
// Block status words (one long per block, stored after the payload area of each data file):
//   0 - free, 1 - taken by a writer, 2 - filled with data, 3 - taken by a reader.
//
// All member functions are defined in this header, outside of the class body. They are
// therefore declared 'inline' here: without it, including the header from more than one
// translation unit would produce duplicate definitions at link time.
//
// NOTE(review): no copy constructor / assignment operator is declared. If the implicit ones
// are usable, a copy would share the raw handles and mapped views and both destructors would
// release them - do not copy instances.
class CSimpleIPC
{
public:
    // One mapped data file: the handle of the named mapping plus the two pointers into its view.
    struct IPCDataFileHandle
    {
        HANDLE m_hDataMapFile; // handle of the "<name>_data_<index>" file mapping
        void* m_pData;         // start of the mapped view; block payloads begin here
        void* m_pStatus;       // start of the status area that follows the payloads: one long (4 bytes) per block
        IPCDataFileHandle() : m_hDataMapFile(NULL), m_pData(NULL), m_pStatus(NULL) {}
    };
    typedef std::vector<IPCDataFileHandle> IPCDataVector;

    // constructor
    CSimpleIPC() : m_connectionPointBuffer(NULL), m_hMapFile(NULL), m_hMapSignalFile(NULL), m_hDataAvailable(NULL), m_hSpaceAvailable(NULL), m_hStopAllWaits(NULL),
        m_dataVectorTotal(0), m_maxSpinLocks(SCD_IPC_MAX_SPINLOCKS), m_stopWaits(false), m_isValid(false), m_isSingleThread(false), m_preAllocateArray(false) { }
    // Releases every view and handle owned by this instance.
    ~CSimpleIPC() { close(); }

    // manager functionality:
    // When supplying your own connectionPoint name, you HAVE to guarantee this name is UNIQUE in the system!
    // The overload without a name generates one from the current thread id and the object address.
    inline bool createConnectionPoint(uint32_t blockSize, uint32_t initialBlocksCount, uint32_t maxExpansions, bool preAllocate = false, uint32_t minExpansionDistance = 0);
    inline bool createConnectionPoint(const std::wstring& connectionPoint, uint32_t blockSize, uint32_t initialBlocksCount, uint32_t maxExpansions, bool preAllocate = false, uint32_t minExpantionDistance = 0);
    // Set / clear SCD_STOP_REQUESTED in shared memory and set / reset the shared "stop all waits" event.
    inline void requestStopCommunications() throw();
    inline void requestResumeCommunications() throw();

    // client functionality:
    inline bool attachToConnectionPoint(const std::wstring& connectionPoint, DWORD dwMillisecondsTimeout = INFINITE);
    inline bool detachFromConnectionPoint();
    // Copy one block out of / into the queue. Return the number of bytes copied
    // (at most one block), or 0 when no block could be acquired.
    inline DWORD read (void *pBuff, DWORD buffSize, DWORD dwMillisecondsTimeout = INFINITE, HANDLE stopOn = NULL) throw();
    inline DWORD write(void *pBuff, DWORD bytesCount, DWORD dwMillisecondsTimeout = INFINITE, HANDLE stopOn = NULL) throw();
    // In-place access: acquireBlock() returns a pointer to the block payload (NULL on failure);
    // the same descriptor must be passed to releaseBlock() when the caller is done.
    inline void* acquireBlock(BlockReadDescriptor& blockDescriptor, DWORD dwMillisecondsTimeout = INFINITE, HANDLE stopOn = NULL) throw();
    inline void* acquireBlock(BlockWriteDescriptor& blockDescriptor, DWORD dwMillisecondsTimeout = INFINITE, HANDLE stopOn = NULL) throw();
    inline void releaseBlock(BlockReadDescriptor& blockDescriptor) throw();
    inline void releaseBlock(BlockWriteDescriptor& blockDescriptor) throw();

    // common functionality:
    const std::wstring& getConnectionPoint() const throw() { return m_connectionPointName; }
    // Number of data files the ring currently spans (0 when not connected).
    uint32_t getExpansionsCount() const throw() { return m_connectionPointBuffer ? m_connectionPointBuffer->m_totalBlocksCount / m_connectionPointBuffer->m_blocksCount : 0; }
    inline bool setMaxExpansionsCount(uint32_t maxExpansions) throw();
    inline void setMaxSpinLocksCount(uint32_t blockAccessSpinLock, uint32_t expansionAccessSpinLock);
    // Promise that this instance is used by one thread only; lets getBlockByIndex() skip the shared lock.
    void doSingleThread(bool isSingleThread) throw() { m_isSingleThread = isSingleThread; }
    inline bool isCommunicationStopped() const throw();
    inline bool isValid() const throw() { return m_isValid; }

private:
    // manager's functions:
    inline bool allocateBlocksFileMap(uint32_t index) throw();
    inline bool close();
    inline bool createEvents();
    inline bool createMainFileMapping(bool doCreate, DWORD dwMillisecondsTimeout);
    inline bool createSignalFileMapping(bool doCreate, DWORD dwMillisecondsTimeout);

    // common functions:
    inline bool getBlockByIndex(const uint32_t blockIndex, void*& pData, VLONG*& pStatus) throw();
    inline bool getNextAvailableBlock(uint32_t& blockIndex, void*& pData, VLONG*& pStatus, VUINT32& cursorStart, VUINT32& waitersCount, long valueCheck, HANDLE waitOn, DWORD& dwMillisecondsTimeout, HANDLE stopOn) throw();
    inline uint32_t getNextIndex(uint32_t index) const throw();
    inline bool lockReaderBlock(uint32_t& blockIndex, void*& pData, VLONG*& pFlags, DWORD& dwMillisecondsTimeout, HANDLE stopOn) throw();
    inline bool lockWriterBlock(uint32_t& blockIndex, void*& pData, VLONG*& pFlags, DWORD& dwMillisecondsTimeout, HANDLE stopOn) throw();

    // data members:
    SimpleIPCMemoryBuffer* m_connectionPointBuffer; // never allocates: it's just a pointer to shared memory
    CSimpleCriticalSectionSE m_locker;              // used to lock access to m_dataVector when no other way can help
    std::wstring m_connectionPointName;             // "Address" (name) of connection point
    IPCDataVector m_dataVector;                     // list of "data" files (shared memory "files")
    HANDLE m_hMapFile;                              // Handle to the main mapped memory file
    HANDLE m_hMapSignalFile;                        // Handle to the signal mapped memory file
    HANDLE m_hDataAvailable;                        // Shared event used to signal when data exists
    HANDLE m_hSpaceAvailable;                       // Shared event used to signal when some blocks become available
    HANDLE m_hStopAllWaits;                         // Shared event used to signal to stop all waits, no matter what's happening
    VUINT32 m_dataVectorTotal;                      // blocks per file * m_dataVector.size(); cached because it changes VERY rarely
    uint32_t m_maxSpinLocks;                        // how many times getNextAvailableBlock() yields before it blocks on an event
    volatile bool m_stopWaits;                      // local flag checked by the wait loops
    bool m_isValid;                                 // true after a successful create/attach, false after close()
    bool m_isSingleThread;                          // see doSingleThread()
    bool m_preAllocateArray;                        // m_dataVector capacity is reserved up front (SCD_PREALLOCATE)
};
}
// implementation
namespace SimpleCore
{
//--CSimpleIPC------------------------------------------------------------------------------------------------------------------------------------------------
bool CSimpleIPC::createConnectionPoint(uint32_t blockSize, uint32_t initialBlocksCount, uint32_t maxExpansions, bool preAllocate, uint32_t minExpantionDistance)
{
std::wstring connectionPoint = L"CSimpleIPC_slot_"; // we might not neet this, but just in case if someone else will come up with "clever" idea of such an id...
connectionPoint.append(std::to_wstring((unsigned long long)GetCurrentThreadId()));
connectionPoint.append(L"_");
connectionPoint.append(std::to_wstring((unsigned long long)this));
return createConnectionPoint(connectionPoint, blockSize, initialBlocksCount, maxExpansions, preAllocate, minExpantionDistance);
}
bool CSimpleIPC::createConnectionPoint(const std::wstring& connectionPoint, uint32_t blockSize, uint32_t initialBlocksCount, uint32_t maxExpansions, bool preAllocate, uint32_t minExpantionDistance)
{
if (!close()) // clean up; if there were previous errors, clean them too
return false;
m_connectionPointName = connectionPoint;
// create our main mapping file:
if(!createMainFileMapping(true, 0))
return false;
if (!createEvents())
return false;
// initialize shared memory
m_connectionPointBuffer->init(blockSize, initialBlocksCount, maxExpansions, minExpantionDistance);
if (preAllocate)
m_connectionPointBuffer->setFlag(SCD_PREALLOCATE);
m_preAllocateArray = preAllocate;
m_locker.setMaxSpinCount(SCD_IPC_LOCK_MAX_SPINLOCKS);
// add first data file
if (!allocateBlocksFileMap(0))
return false;
// signal to those wating to join that they can do it now (memory was properly set up at this point):
if(!createSignalFileMapping(true, 0))
return false;
m_isValid = true;
return true;
}
// Marks the connection point as stopped: sets SCD_STOP_REQUESTED in the shared control
// block and sets the shared "stop all waits" event. Does nothing on an invalid instance.
void CSimpleIPC::requestStopCommunications() throw()
{
    if (isValid())
    {
        m_connectionPointBuffer->setFlag(SCD_STOP_REQUESTED);
        ::SetEvent(m_hStopAllWaits);
    }
}
// Reverts requestStopCommunications(): clears SCD_STOP_REQUESTED in the shared control
// block and resets the shared "stop all waits" event. Does nothing on an invalid instance.
void CSimpleIPC::requestResumeCommunications() throw()
{
    if (isValid())
    {
        m_connectionPointBuffer->clearFlag(SCD_STOP_REQUESTED);
        ::ResetEvent(m_hStopAllWaits);
    }
}
// Client side: attaches to the existing connection point 'connectionPoint'.
// Waits (up to dwMillisecondsTimeout for each of the two mappings) for the manager's
// "signal" mapping and then for the main mapping, opens the shared events and maps every
// data file the ring currently spans.
// Returns true on success; on any failure returns false and leaves isValid() == false.
bool CSimpleIPC::attachToConnectionPoint(const std::wstring& connectionPoint, DWORD dwMillisecondsTimeout)
{
    // drop whatever a previous create/attach (successful or not) left behind
    if (!close())
        return false;
    m_connectionPointName = connectionPoint;

    // the signal mapping appears only after the manager finished initializing shared memory,
    // so it is opened first; the remaining steps are skipped as soon as one of them fails
    if (!createSignalFileMapping(false, dwMillisecondsTimeout) ||
        !createMainFileMapping(false, dwMillisecondsTimeout) ||
        !createEvents())
    {
        return false;
    }

    m_locker.setMaxSpinCount(SCD_IPC_LOCK_MAX_SPINLOCKS);
    m_preAllocateArray = m_connectionPointBuffer->isFlagSet(SCD_PREALLOCATE);

    // how many files are currently allocated? Let's get up to speed:
    const uint32_t lastFileIndex = m_connectionPointBuffer->m_totalBlocksCount / m_connectionPointBuffer->m_blocksCount - 1;
    if (!allocateBlocksFileMap(lastFileIndex))
        return false;

    m_isValid = true;
    return true;
}
bool CSimpleIPC::detachFromConnectionPoint()
{
if (!close())
return false;
m_connectionPointName.clear();
return true;
}
// Takes the next filled block, copies its contents into pBuff and gives the block back.
//   pBuff / buffSize       - destination buffer and its size in bytes
//   dwMillisecondsTimeout  - how long to wait for a filled block (INFINITE allowed)
//   stopOn                 - optional handle; passed on to acquireBlock()
// Returns the number of bytes copied, which is the smaller of buffSize and the block size,
// or 0 when no block could be acquired.
DWORD CSimpleIPC::read(void *pBuff, DWORD buffSize, DWORD dwMillisecondsTimeout, HANDLE stopOn) throw()
{
    BlockReadDescriptor descriptor;
    void* const source = acquireBlock(descriptor, dwMillisecondsTimeout, stopOn);
    if (!source)
        return 0;

    // never copy more than one block, never more than the caller's buffer can take
    const uint32_t blockSize = m_connectionPointBuffer->m_blockSize;
    const uint32_t copied = (buffSize < blockSize) ? buffSize : blockSize;
    memcpy(pBuff, source, copied);
    releaseBlock(descriptor);
    return copied;
}
// Takes the next free block, copies bytesCount bytes from pBuff into it and publishes it.
//   pBuff / bytesCount     - source buffer and the number of bytes to send
//   dwMillisecondsTimeout  - how long to wait for a free block (INFINITE allowed)
//   stopOn                 - optional handle; passed on to acquireBlock()
// Returns the number of bytes copied, which is the smaller of bytesCount and the block size
// (longer input is truncated to one block), or 0 when no block could be acquired.
DWORD CSimpleIPC::write(void *pBuff, DWORD bytesCount, DWORD dwMillisecondsTimeout, HANDLE stopOn) throw()
{
    BlockWriteDescriptor descriptor;
    void* const target = acquireBlock(descriptor, dwMillisecondsTimeout, stopOn);
    if (!target)
        return 0;

    // a single call fills at most one block
    const uint32_t blockSize = m_connectionPointBuffer->m_blockSize;
    const uint32_t copied = (bytesCount < blockSize) ? bytesCount : blockSize;
    memcpy(target, pBuff, copied);
    releaseBlock(descriptor);
    return copied;
}
// Reader side of in-place access: locks the next filled block for reading.
//   blockDescriptor        - receives the pointer to the block's status word; pass it to
//                            releaseBlock() when done. Only meaningful when the call succeeds.
//   dwMillisecondsTimeout  - how long to keep trying (INFINITE allowed)
//   stopOn                 - optional handle; once it is signaled the call gives up
// Returns a pointer to the block payload, or NULL when the instance is not connected,
// the time ran out, or a stop was requested (m_stopWaits, the shared stop event, stopOn).
void* CSimpleIPC::acquireBlock(BlockReadDescriptor& blockDescriptor, DWORD dwMillisecondsTimeout, HANDLE stopOn) throw()
{
    // Without a successful create/attach there is no shared memory behind
    // m_connectionPointBuffer, so the lock helpers below must not be called.
    if (!isValid())
        return NULL;

    uint32_t blockIndex = 0;
    void* pData = NULL;

    // Fast path: a single attempt. lockReaderBlock() takes the timeout by reference and
    // subtracts the time it spent waiting, so dwMillisecondsTimeout holds what is left afterwards.
    if (lockReaderBlock(blockIndex, pData, blockDescriptor.status, dwMillisecondsTimeout, stopOn))
        return pData;

    // didn't work first time. Oh, now we can afford some slowdown
    DWORD lockWaitTimeLeft = dwMillisecondsTimeout;
    DWORD startTicks = ::GetTickCount();
    do
    {
        if (lockReaderBlock(blockIndex, pData, blockDescriptor.status, lockWaitTimeLeft, stopOn))
            return pData;
    }
    while (!m_stopWaits && (((::GetTickCount() - startTicks) < dwMillisecondsTimeout) || (dwMillisecondsTimeout == INFINITE)) && !WAITSIGNALED(m_hStopAllWaits) && (stopOn ? !WAITSIGNALED(stopOn) : true));

    return NULL; // Failed out by timeout or stop request - no block to read from could be locked
}
// Writer side of in-place access: locks the next free block for writing.
//   blockDescriptor        - receives the pointer to the block's status word; pass it to
//                            releaseBlock() when done. Only meaningful when the call succeeds.
//   dwMillisecondsTimeout  - how long to keep trying (INFINITE allowed)
//   stopOn                 - optional handle; once it is signaled the call gives up
// Returns a pointer to the block payload, or NULL when the instance is not connected,
// the time ran out, or a stop was requested (m_stopWaits, the shared stop event, stopOn).
void* CSimpleIPC::acquireBlock(BlockWriteDescriptor& blockDescriptor, DWORD dwMillisecondsTimeout, HANDLE stopOn) throw()
{
    // Without a successful create/attach there is no shared memory behind
    // m_connectionPointBuffer, so the lock helpers below must not be called.
    if (!isValid())
        return NULL;

    uint32_t blockIndex = 0;
    void* pData = NULL;

    // Fast path: a single attempt. lockWriterBlock() takes the timeout by reference and
    // subtracts the time it spent waiting, so dwMillisecondsTimeout holds what is left afterwards.
    if (lockWriterBlock(blockIndex, pData, blockDescriptor.status, dwMillisecondsTimeout, stopOn))
        return pData;

    // didn't work first time - keep retrying until time runs out or a stop is requested
    DWORD lockWaitTimeLeft = dwMillisecondsTimeout;
    DWORD startTicks = ::GetTickCount();
    do
    {
        if (lockWriterBlock(blockIndex, pData, blockDescriptor.status, lockWaitTimeLeft, stopOn))
            return pData;
    }
    while (!m_stopWaits && (((::GetTickCount() - startTicks) < dwMillisecondsTimeout) || (dwMillisecondsTimeout == INFINITE)) && !WAITSIGNALED(m_hStopAllWaits) && (stopOn ? !WAITSIGNALED(stopOn) : true));

    return NULL; // Failed out by timeout or stop request - no block to write to could be locked
}
// Gives a block back after reading: its status word becomes 0 (free), and if any writer
// is registered as waiting, the "space available" event is set to wake one up.
void CSimpleIPC::releaseBlock(BlockReadDescriptor& blockDescriptor) throw()
{
    VLONG* const blockStatus = blockDescriptor.status;
    *blockStatus = 0;

    // signal that we moved - only when somebody is actually waiting for space
    const bool writerIsWaiting = (m_connectionPointBuffer->m_writersWatingCount != 0);
    if (writerIsWaiting)
        ::SetEvent(m_hSpaceAvailable);
}
// Publishes a block after writing: its status word becomes 2 (filled), and if any reader
// is registered as waiting, the "data available" event is set to wake one up.
void CSimpleIPC::releaseBlock(BlockWriteDescriptor& blockDescriptor) throw()
{
    VLONG* const blockStatus = blockDescriptor.status;
    *blockStatus = 2;

    // signal new data - only when somebody is actually waiting for it
    const bool readerIsWaiting = (m_connectionPointBuffer->m_readersWatingCount != 0);
    if (readerIsWaiting)
        ::SetEvent(m_hDataAvailable);
}
// Changes the maximum number of data files stored in the shared control block.
// Refused (returns false) when the instance is not valid or when the data-file array
// was pre-allocated, because its reserved size is tied to the old maximum.
bool CSimpleIPC::setMaxExpansionsCount(uint32_t maxExpansions) throw()
{
    const bool changeAllowed = isValid() && !m_preAllocateArray;
    if (changeAllowed)
        m_connectionPointBuffer->m_maxExpansions = maxExpansions;
    return changeAllowed;
}
void CSimpleIPC::setMaxSpinLocksCount(uint32_t blockAccessSpinLock, uint32_t expansionAccessSpinLock)
{
m_maxSpinLocks = blockAccessSpinLock;
m_locker.setMaxSpinCount((WORD)expansionAccessSpinLock);
}
// True when this instance is not valid, or when SCD_STOP_REQUESTED is set in the
// shared control block.
bool CSimpleIPC::isCommunicationStopped() const throw()
{
    return !isValid() || m_connectionPointBuffer->isFlagSet(SCD_STOP_REQUESTED);
}
// Makes sure the data files with indexes 0..index are opened (or created) and mapped into
// this process, appending the missing ones to m_dataVector under the exclusive lock.
//
// Layout of one data file: m_blocksCount payloads of m_blockSize bytes (padded up to a
// multiple of sizeof(long)), followed by m_blocksCount status words of type long.
//
// Returns true when every file up to 'index' is mapped. Returns false when
//   * the exclusive lock could not be taken (nothing was mapped, so success must not be reported),
//   * memory for the vector or for a mapping name could not be allocated,
//   * CreateFileMappingW or MapViewOfFile failed.
// Files mapped before a failure stay in m_dataVector and are released by close().
bool CSimpleIPC::allocateBlocksFileMap(uint32_t index) throw()
{
    if (!m_locker.enterExclusiveLock())
        return false;

    bool allMapped = true;
    try // the function is declared throw(): no std::bad_alloc may escape from the vector/string work below
    {
        const uint32_t oldSize = (uint32_t)m_dataVector.size();
        if (oldSize <= index)
        {
            // Reserve first, so that the push_back calls below can neither throw nor reallocate.
            // With pre-allocation the whole array is reserved at once, but never less than needed now.
            uint32_t wantedCapacity = index + 1;
            if (m_preAllocateArray && m_connectionPointBuffer->m_maxExpansions > wantedCapacity)
                wantedCapacity = m_connectionPointBuffer->m_maxExpansions;
            if (m_dataVector.capacity() < wantedCapacity)
                m_dataVector.reserve(wantedCapacity);

            uint32_t dataMemSize = m_connectionPointBuffer->m_blocksCount * m_connectionPointBuffer->m_blockSize; // where we transfer memory
            if (uint32_t rem = dataMemSize % sizeof(long))
                dataMemSize += (uint32_t)(sizeof(long) - rem); // align on "long"
            const uint32_t totalMemSize = dataMemSize + (uint32_t)(m_connectionPointBuffer->m_blocksCount * sizeof(long));

            // For each missing index open/create the file mapping
            for (uint32_t i = oldSize; i <= index; i++)
            {
                IPCDataFileHandle dfh;
                std::wstring cpName(m_connectionPointName);
                cpName.append(L"_data_");
                cpName.append(std::to_wstring((unsigned long long)i));

                // If the mapping was already created by someone else we just hook to it. GetLastError is
                // ignored on purpose - it would only say "file already exists", and we are good with that.
                dfh.m_hDataMapFile = ::CreateFileMappingW(INVALID_HANDLE_VALUE, NULL, PAGE_READWRITE, 0, totalMemSize, cpName.c_str());
                if (NULL_OR_INVALID(dfh.m_hDataMapFile))
                {
                    allMapped = false;
                    break;
                }

                dfh.m_pData = ::MapViewOfFile(dfh.m_hDataMapFile, FILE_MAP_ALL_ACCESS, 0, 0, totalMemSize);
                if (dfh.m_pData == NULL)
                {
                    ::CloseHandle(dfh.m_hDataMapFile); // not stored anywhere yet, so release it here
                    allMapped = false; //TODO!! Signal an error!
                    break;
                }

                dfh.m_pStatus = (void*)(((UINT_PTR)dfh.m_pData) + dataMemSize);
                m_dataVector.push_back(dfh); // capacity was reserved above
                m_dataVectorTotal = ((uint32_t)m_dataVector.size()) * m_connectionPointBuffer->m_blocksCount;
            }
        }
    }
    catch (...)
    {
        allMapped = false; // out of memory while growing the vector or building a name
    }

    m_locker.leaveLock();
    return allMapped;
}
bool CSimpleIPC::close()
{
for (IPCDataVector::iterator it = m_dataVector.begin(); it != m_dataVector.end(); it++)
{
IPCDataFileHandle& dfh = *it;
if (dfh.m_pData)
::UnmapViewOfFile(dfh.m_pData);
dfh.m_pStatus = NULL;
CLOSE_HANDLE_AND_NULL(dfh.m_hDataMapFile);
}
m_dataVector.clear();
m_dataVectorTotal = 0;
CLOSE_HANDLE_AND_NULL(m_hDataAvailable);
CLOSE_HANDLE_AND_NULL(m_hSpaceAvailable);
CLOSE_HANDLE_AND_NULL(m_hStopAllWaits);
CLOSE_HANDLE_AND_NULL(m_hMapSignalFile);
// upmap and close main one:
if (m_connectionPointBuffer)
{
// indicate our leave:
::UnmapViewOfFile(m_connectionPointBuffer);
m_connectionPointBuffer = NULL;
}
CLOSE_HANDLE_AND_NULL(m_hMapFile);
m_isValid = false;
return true;
}
// Creates (or opens, when they already exist) the three named events of the connection point:
//   "<name>_evt_filled"  - auto-reset, "data available"
//   "<name>_evt_avail"   - auto-reset, "space available"
//   "<name>_evt_stopall" - manual-reset, "stop all waits"
// Stops at the first failure and returns false; returns true when all three exist.
bool CSimpleIPC::createEvents()
{
    const std::wstring filledName = m_connectionPointName + L"_evt_filled";
    m_hDataAvailable = ::CreateEventW(NULL, FALSE, FALSE, filledName.c_str());
    if (NULL_OR_INVALID(m_hDataAvailable))
        return false; //TODO!! Signal an error!

    const std::wstring availName = m_connectionPointName + L"_evt_avail";
    m_hSpaceAvailable = ::CreateEventW(NULL, FALSE, FALSE, availName.c_str());
    if (NULL_OR_INVALID(m_hSpaceAvailable))
        return false; //TODO!! Signal an error!

    const std::wstring stopAllName = m_connectionPointName + L"_evt_stopall";
    m_hStopAllWaits = ::CreateEventW(NULL, TRUE, FALSE, stopAllName.c_str());
    if (NULL_OR_INVALID(m_hStopAllWaits))
        return false; //TODO!! Signal an error!

    return true;
}
bool CSimpleIPC::createMainFileMapping(bool doCreate, DWORD dwMillisecondsTimeout)
{
std::wstring cpName(m_connectionPointName);
cpName.append(L"_mem");
if (doCreate)
m_hMapFile = ::CreateFileMappingW(INVALID_HANDLE_VALUE, NULL, PAGE_READWRITE, 0, sizeof(SimpleIPCMemoryBuffer), cpName.c_str());
else
{
DWORD startTicks = ::GetTickCount();
do
{
m_hMapFile = ::OpenFileMappingW(FILE_MAP_ALL_ACCESS, FALSE, cpName.c_str());
if (!(m_hMapFile == NULL || m_hMapFile == INVALID_HANDLE_VALUE))
break;
::Sleep(1);
}
while ((((::GetTickCount() - startTicks) < dwMillisecondsTimeout) || (dwMillisecondsTimeout == INFINITE)) && !m_stopWaits);
}
if (m_hMapFile == NULL || m_hMapFile == INVALID_HANDLE_VALUE)
return false;
m_connectionPointBuffer = (SimpleIPCMemoryBuffer*) ::MapViewOfFile(m_hMapFile, FILE_MAP_ALL_ACCESS, 0, 0, sizeof(SimpleIPCMemoryBuffer));
if (m_connectionPointBuffer == NULL)
return false; //TODO!! Signal an error!
return true;
}
bool CSimpleIPC::createSignalFileMapping(bool doCreate, DWORD dwMillisecondsTimeout)
{
std::wstring cpName(m_connectionPointName);
cpName.append(L"_sig");
if (doCreate)
m_hMapSignalFile = ::CreateFileMappingW(INVALID_HANDLE_VALUE, NULL, PAGE_READWRITE, 0, sizeof(DWORD), cpName.c_str());
else
{
DWORD startTicks = ::GetTickCount();
do
{
m_hMapSignalFile = ::OpenFileMappingW(FILE_MAP_ALL_ACCESS, FALSE, cpName.c_str());
if (!NULL_OR_INVALID(m_hMapSignalFile))
break;
::Sleep(1); // cannot open file yet - probably it doesn't exist yet
}
while ((((::GetTickCount() - startTicks) < dwMillisecondsTimeout) || (dwMillisecondsTimeout == INFINITE)) && !m_stopWaits);
}
if (NULL_OR_INVALID(m_hMapSignalFile))
return false;
return true;
}
// Translates a ring index into pointers to the block's payload and to its status word.
//   blockIndex - index of the block inside the ring
//   pData      - receives the address of the block payload
//   pStatus    - receives the address of the block's status word (one 4-byte word per block)
// Data files that are not mapped into this process yet are mapped on demand.
// Returns false, leaving pData/pStatus untouched, when a missing file could not be mapped or
// the shared lock on the file list could not be taken; returns true once both outputs are set.
inline bool CSimpleIPC::getBlockByIndex(const uint32_t blockIndex, void*& pData, VLONG*& pStatus) throw()
{
    const uint32_t blocksPerFile = m_connectionPointBuffer->m_blocksCount;

    if (blockIndex >= m_dataVectorTotal)
    {
        // Map everything up to the file that holds the LAST block of the ring. The last block has
        // index (total - 1); dividing 'total' itself would ask for one file more than the ring
        // uses and, with pre-allocation, push m_dataVector past its reserved capacity.
        const uint32_t totalBlocks = m_connectionPointBuffer->m_totalBlocksCount;
        const uint32_t lastBlock = totalBlocks ? totalBlocks - 1 : 0;
        if (!allocateBlocksFileMap(max(blockIndex, lastBlock) / blocksPerFile))
            return false; // aha, can't map, so return false!
    }

    const uint32_t fileIndex = blockIndex / blocksPerFile;
    const uint32_t idx = blockIndex % blocksPerFile;

    // Either the caller guarantees a single thread, or the array was pre-allocated so that growing
    // it does not move existing entries: in both cases the lock can be skipped.
    // Otherwise we are SO unlucky - performance is 3-5 times less with the lock!
    const bool needLock = !(m_preAllocateArray || m_isSingleThread);
    if (needLock && !m_locker.enterSharedLock())
        return false; // outputs were not produced, so success must not be reported

    const IPCDataFileHandle& dfh = m_dataVector[fileIndex];
    pData = (void*)(((UINT_PTR)dfh.m_pData) + (UINT_PTR)(idx * m_connectionPointBuffer->m_blockSize));
    pStatus = (VLONG*)(((UINT_PTR)dfh.m_pStatus) + (UINT_PTR)(idx << 2));

    if (needLock)
        m_locker.leaveLock();
    return true;
}
// Looks at the block the cursor currently points to and waits until its status word equals valueCheck.
// Shared by readers and writers; the caller passes the matching cursor, waiter counter, status value and event:
//   blockIndex            - receives the index read from the cursor
//   pData / pStatus       - receive the payload and status-word addresses of that block
//   cursorStart           - m_ReadStart (readers) or m_WriteStart (writers) in shared memory
//   waitersCount          - m_readersWatingCount / m_writersWatingCount in shared memory
//   valueCheck            - status the block must have (callers pass 2 for readers, 0 for writers)
//   waitOn                - event to block on once spinning did not help
//   dwMillisecondsTimeout - wait limit; reduced by the time spent in the blocking wait (INFINITE is left as is)
//   stopOn                - optional extra handle that also ends the blocking wait
// Returns true when the status word equals valueCheck at the moment of the final check.
// Returns false when the block could not be resolved (getBlockByIndex failed) or the status still differs
// after one blocking wait. The block is NOT locked here - the caller does that with an interlocked exchange.
inline bool CSimpleIPC::getNextAvailableBlock(uint32_t& blockIndex, void*& pData, VLONG*& pStatus, VUINT32& cursorStart, VUINT32& waitersCount, long valueCheck, HANDLE waitOn, DWORD& dwMillisecondsTimeout, HANDLE stopOn) throw()
{
// get the current cursor position:
blockIndex = cursorStart; // m_ReadStart/m_WriteStart
if (!getBlockByIndex(blockIndex, pData, pStatus))
return false;
// number of consecutive checks made while the cursor stayed on the same block
uint32_t count = 0;
while (*pStatus != valueCheck)
{
if (blockIndex == cursorStart)
{
// cursor did not move: yield the time slice up to m_maxSpinLocks times, then block on the event
++count;
if (count <= m_maxSpinLocks)
::SwitchToThread();
else
{
// register as a waiter first, so that releaseBlock() on the other side knows it has to set the event
_InterlockedIncrement((VLONG*)&waitersCount); // m_readersWatingCount/m_writersWatingCount
if (*pStatus != valueCheck && blockIndex == cursorStart) // Fix 07/15/2013 - account for reader/writer releasing block and advancing while we were incrementing
{
DWORD startTicks = ::GetTickCount();
// stopOn occupies the third slot and is only included in the wait when it is not NULL
HANDLE handles[3] = {waitOn, m_hStopAllWaits, stopOn};
// the result of the wait is not inspected; the status word is simply re-checked below
::WaitForMultipleObjects((DWORD)(stopOn ? 3 : 2), handles, FALSE, dwMillisecondsTimeout); // special care: dwMillisecondsTimeout can be INFINITE
DWORD passedTicks = ::GetTickCount() - startTicks;
if (dwMillisecondsTimeout != INFINITE)
dwMillisecondsTimeout = dwMillisecondsTimeout < passedTicks ? 0 : dwMillisecondsTimeout - passedTicks; // if wait was not forever, subtract the time we spent waiting.
}
_InterlockedDecrement((VLONG*)&waitersCount); // m_readersWatingCount/m_writersWatingCount
// only one blocking wait per call: report the current state and let the caller decide whether to retry
return (*pStatus == valueCheck);
}
}
else
{
// somebody else advanced the cursor: follow it and start over with the new block
blockIndex = cursorStart; // m_ReadStart/m_WriteStart
if (!getBlockByIndex(blockIndex, pData, pStatus)) // cursor moved, reread
return false;
count = 0; // reset counter
}
}
return (*pStatus == valueCheck);
}
// Returns the ring index that follows 'index': index + 1, wrapping around to 0 when
// that value equals the current total number of blocks.
inline uint32_t CSimpleIPC::getNextIndex(uint32_t index) const throw()
{
    const uint32_t following = index + 1;
    if (m_connectionPointBuffer->m_totalBlocksCount == following)
        return 0;
    return following;
}
// One attempt to lock the block at m_ReadStart for reading.
//   blockIndex / pData / pFlags - receive the index, payload address and status-word address of the block
//   dwMillisecondsTimeout       - wait limit, reduced by getNextAvailableBlock() by the time it waited
//   stopOn                      - optional handle that ends the wait
// Steps: wait for the block to carry status 2 (filled), switch its status 2 -> 3 with an interlocked
// compare-exchange, then advance m_ReadStart from blockIndex to the next index the same way.
// Returns true when both exchanges succeeded (the caller now owns the block, status 3).
// Returns false when the block never became available, another reader took it first, or the cursor
// had already moved - in the last case the status is put back to 2.
inline bool CSimpleIPC::lockReaderBlock(uint32_t& blockIndex, void*& pData, VLONG*& pFlags, DWORD& dwMillisecondsTimeout, HANDLE stopOn) throw()
{
if (getNextAvailableBlock(blockIndex, pData, pFlags, m_connectionPointBuffer->m_ReadStart, m_connectionPointBuffer->m_readersWatingCount, 2, m_hDataAvailable, dwMillisecondsTimeout, stopOn))
// claim the block: only one reader can win the 2 -> 3 exchange
if (_InterlockedCompareExchange(pFlags, 3, 2) == 2)
{
uint32_t nextIndex = getNextIndex(blockIndex);
// move the read cursor forward, but only if it still points at the block we claimed
if (_InterlockedCompareExchange((VLONG*)&m_connectionPointBuffer->m_ReadStart, (LONG)nextIndex, (LONG)blockIndex) != (LONG)blockIndex)
{
// full write/read cycle happened between us checking if block is filled, and actually marking it - WriteStart&ReadStart moved ahead. Give up block:
*pFlags = 2; // we are the only ones who hold it, so yes, we can do just an assignment
return false;
}
return true;
}
// block not available, or another reader claimed it first
return false;
}
// One attempt to lock the block at m_WriteStart for writing.
//   blockIndex / pData / pFlags - receive the index, payload address and status-word address of the block
//   dwMillisecondsTimeout       - wait limit, reduced by getNextAvailableBlock() by the time it waited
//   stopOn                      - optional handle that ends the wait
// Steps: wait for the block to carry status 0 (free), switch its status 0 -> 1 with an interlocked
// compare-exchange, grow the ring if the expansion conditions hold, then advance m_WriteStart from
// blockIndex to the next index with another compare-exchange.
// Returns true when the block is owned by the caller (status 1) and the cursor was advanced.
// Returns false when the block never became available, another writer took it first, or the cursor
// had already moved - in the last case the status is put back to 0.
inline bool CSimpleIPC::lockWriterBlock(uint32_t& blockIndex, void*& pData, VLONG*& pFlags, DWORD& dwMillisecondsTimeout, HANDLE stopOn) throw()
{
if (getNextAvailableBlock(blockIndex, pData, pFlags, m_connectionPointBuffer->m_WriteStart, m_connectionPointBuffer->m_writersWatingCount, 0, m_hSpaceAvailable, dwMillisecondsTimeout, stopOn))
// claim the block: only one writer can win the 0 -> 1 exchange
if (_InterlockedCompareExchange(pFlags, 1, 0) == 0)
{
uint32_t tbc = m_connectionPointBuffer->m_totalBlocksCount; // now it's time to check for expansion:
// expansion is considered only when we took the very last block of the ring ...
if (tbc - 1 == blockIndex)
// ... the maximum number of data files is not reached yet, and the read cursor is not further than m_minExpantionDistance from the start
if ((m_connectionPointBuffer->m_maxExpansions*m_connectionPointBuffer->m_blocksCount > tbc) &&
(m_connectionPointBuffer->m_ReadStart <= m_connectionPointBuffer->m_minExpantionDistance)) // <-- we need to check if we need to expand
// add one data file worth of blocks to the ring; the new file itself is mapped later, on demand, by getBlockByIndex()
_InterlockedCompareExchange((VLONG*)&m_connectionPointBuffer->m_totalBlocksCount, (LONG)(tbc + m_connectionPointBuffer->m_blocksCount), (LONG)tbc); // try to increase - we might not be the only one thread who tried.
// computed after the possible expansion, so the cursor continues into the new blocks instead of wrapping to 0
uint32_t nextIndex = getNextIndex(blockIndex);
// move the write cursor forward, but only if it still points at the block we claimed
if (_InterlockedCompareExchange((VLONG*)&m_connectionPointBuffer->m_WriteStart, (LONG)nextIndex, (LONG)blockIndex) != (LONG)blockIndex)
{
// full write/read cycle happened between us checking if block is free, and actually marking it - WriteStart&ReadStart moved ahead. Give up block:
*pFlags = 0; // we are the only ones who hold it, so yes, we can do just an assignment
return false;
}
return true;
}
// block not available, or another writer claimed it first
return false;
}
}
#pragma warning(pop)