Click here to Skip to main content
15,886,568 members
Articles / Programming Languages / C++

Fast IPC - New implementation

Rate me:
Please Sign up or sign in to vote.
4.83/5 (18 votes)
15 Jul 2013CPOL11 min read 45.6K   1.7K   73  
New (faster) implementation of shared memory IPC
//	 SimpleIPC.h
//  This header is part of SimpleWin library, SimpleCore namespace
//  Author - Kosta Cherry
//  
//  License: use any way you want, except the following:
//  1. Don't claim that you are the author of this code.
//  2. Don't try to prevent others from freely using this code. It's in public domain now. It is free and always will be free.
//  3. Don't hold me responsible for any bugs, malfunctions, or the like. Use at your own risk. Absolutely no guarantee here whatsoever.
// 
//  Description: defines and implements class CSimpleIPC (and a few helper structs). CSimpleIPC is used for inter- or intra-process communication.
//  This is a fast "multiple writers - multiple readers" implementation that uses shared memory.
//
//  History:
//  Version 1.0 - initial implementation
//  Version 1.1 - 2 bugfixes, both were prominent when INFINITE timeout was specified:
//                1) optional handle stopOn was ignored when INFINITE timeout was specified
//                2) due to race conditions writer was not always informing reader that write operation was completed, thus causing reader to hang forever when INFINITE timeout was specified

#pragma once

#include <string>
#include <vector>
#include <stdint.h> 
#include "SimpleThreadDefines.h"
#include "SimpleCriticalSection.h"

#pragma warning(push)
#pragma warning(disable: 28159)

namespace SimpleCore
{	
	// Bit flags kept in SimpleIPCMemoryBuffer::m_flags.
	// SCD_STOP_REQUESTED: set by requestStopCommunications(), cleared by requestResumeCommunications().
	#define SCD_STOP_REQUESTED		0x0001
   // SCD_PREALLOCATE: set by createConnectionPoint() when preAllocate is true; read back by attachToConnectionPoint().
   #define SCD_PREALLOCATE		   0x0002

   // Control block that CSimpleIPC maps from the "<name>_mem" file mapping.
   // The member order is the in-memory layout seen by every attached party, so it must not be rearranged.
   struct SimpleIPCMemoryBuffer
   {
      // --- values that change while the connection point is in use ---
      VUINT32   m_totalBlocksCount;      // blocks currently in the ring (grows by m_blocksCount per expansion)
      VUINT32   m_WriteStart;            // index of the block the next writer should take
      VUINT32   m_ReadStart;             // index of the block the next reader should take
      VUINT32   m_writersWatingCount;    // writers currently blocked; tells a releasing reader whether to signal
      VUINT32   m_readersWatingCount;    // readers currently blocked; tells a releasing writer whether to signal

      // --- values written by init() ---
      uint32_t  m_blocksCount;           // blocks per data file
      uint32_t  m_blockSize;             // bytes per block
      uint32_t  m_blocksLength;          // m_blockSize * m_blocksCount as computed by init()
      uint32_t  m_maxExpansions;         // upper bound on the number of data files
      uint32_t  m_minExpantionDistance;  // read-cursor threshold used by the writer when deciding to expand

      // --- shared information ---
      VUINT32   m_flags;                 // combination of SCD_* bits
      uint32_t  m_reserved;              // reserved for future use

      // Puts the control block into its initial state: both cursors at 0, no waiters, no flags,
      // and the ring sized to a single data file of initialBlocksCount blocks.
      void init(uint32_t blockSize, uint32_t initialBlocksCount, uint32_t maxExpansions, uint32_t minExpantionDistance) throw()
      {
         // live state
         m_ReadStart = 0;
         m_WriteStart = 0;
         m_totalBlocksCount = initialBlocksCount;
         m_writersWatingCount = m_readersWatingCount = 0;

         // geometry
         m_blocksCount = initialBlocksCount;
         m_blockSize = blockSize;
         m_blocksLength = blockSize * initialBlocksCount;
         m_maxExpansions = maxExpansions;
         m_minExpantionDistance = minExpantionDistance;

         m_flags = 0;
      }

      // True when at least one of the bits in 'flag' is set in m_flags.
      bool isFlagSet(uint32_t flag)
      {
         return (m_flags & flag) != 0;
      }

      // Turns on the bits given in 'flag'.
      void setFlag(uint32_t flag)
      {
         m_flags |= flag;
      }

      // Turns off the bits given in 'flag'.
      void clearFlag(uint32_t flag)
      {
         m_flags &= ~flag;
      }
   };

   // Default for CSimpleIPC::m_maxSpinLocks: how many times getNextAvailableBlock() yields
   // (SwitchToThread) on an unchanged cursor before it starts waiting on an event.
   #define SCD_IPC_MAX_SPINLOCKS 12
   // Spin count handed to m_locker.setMaxSpinCount() when a connection point is created or attached to.
   #define SCD_IPC_LOCK_MAX_SPINLOCKS 40

   // Token filled in by CSimpleIPC::acquireBlock() for a block taken for writing;
   // hand it back to CSimpleIPC::releaseBlock() once the data has been written.
   struct BlockWriteDescriptor
   {
      // Points at the status word of the acquired block inside shared memory.
      VLONG* status;
   };

   // Token filled in by CSimpleIPC::acquireBlock() for a block taken for reading;
   // hand it back to CSimpleIPC::releaseBlock() once the data has been consumed.
   struct BlockReadDescriptor
   {
      // Points at the status word of the acquired block inside shared memory.
      VLONG* status;
   };

   // "Multiple writers - multiple readers" IPC endpoint built on named file mappings and named events.
   // One party creates a connection point (createConnectionPoint); others attach to it by name
   // (attachToConnectionPoint). Data travels through fixed-size blocks arranged as a ring that may
   // grow by whole "data files" up to m_maxExpansions.
   //
   // All member functions are declared inline because their definitions live in this header;
   // without that, including the header from more than one translation unit produces
   // multiple-definition link errors.
   //
   // NOTE(review): the class owns handles and mapped views but does not disable copying; copying an
   // attached instance would presumably close the same handles twice - confirm whether
   // CSimpleCriticalSectionSE already makes the class non-copyable.
   class CSimpleIPC
   {
   public:
      // One data file: the mapping handle, the start of the block data and the start of the
      // per-block status words that follow the data inside the same view.
      struct IPCDataFileHandle
      {
         HANDLE      m_hDataMapFile;
         void*       m_pData;
         void*       m_pStatus; // one 'long' status word per block
         IPCDataFileHandle() : m_hDataMapFile(NULL), m_pData(NULL), m_pStatus(NULL) {}
      };
      typedef std::vector<IPCDataFileHandle> IPCDataVector;

      // Creates an unattached object; isValid() is false until create/attach succeeds.
      CSimpleIPC() : m_connectionPointBuffer(NULL), m_hMapFile(NULL), m_hMapSignalFile(NULL), m_hDataAvailable(NULL), m_hSpaceAvailable(NULL), m_hStopAllWaits(NULL),
                     m_dataVectorTotal(0), m_maxSpinLocks(SCD_IPC_MAX_SPINLOCKS), m_stopWaits(false), m_isValid(false), m_isSingleThread(false), m_preAllocateArray(false) { }
      ~CSimpleIPC() { close(); }

      // --- manager functionality ---
      // When supplying your own connectionPoint name, you HAVE to guarantee this name is UNIQUE in the system!
      // The overload without a name generates one from the current thread id and the object address.
      inline bool             createConnectionPoint(uint32_t blockSize, uint32_t initialBlocksCount, uint32_t maxExpansions, bool preAllocate = false, uint32_t minExpansionDistance = 0);
      inline bool             createConnectionPoint(const std::wstring& connectionPoint, uint32_t blockSize, uint32_t initialBlocksCount, uint32_t maxExpansions, bool preAllocate = false, uint32_t minExpantionDistance = 0);
      inline void             requestStopCommunications() throw();
      inline void             requestResumeCommunications() throw();

      // --- client functionality ---
      inline bool             attachToConnectionPoint(const std::wstring& connectionPoint, DWORD dwMillisecondsTimeout = INFINITE);
      inline bool             detachFromConnectionPoint();
      // read/write copy at most one block; they return the number of bytes copied, or 0 when no block was obtained.
      inline DWORD            read (void *pBuff, DWORD buffSize, DWORD dwMillisecondsTimeout = INFINITE, HANDLE stopOn = NULL) throw();
      inline DWORD            write(void *pBuff, DWORD bytesCount, DWORD dwMillisecondsTimeout = INFINITE, HANDLE stopOn = NULL) throw();
      // acquireBlock returns a pointer to the block's data (NULL on failure); the block must be
      // returned with the matching releaseBlock overload.
      inline void*            acquireBlock(BlockReadDescriptor&  blockDescriptor, DWORD dwMillisecondsTimeout = INFINITE, HANDLE stopOn = NULL) throw();
      inline void*            acquireBlock(BlockWriteDescriptor& blockDescriptor, DWORD dwMillisecondsTimeout = INFINITE, HANDLE stopOn = NULL) throw();
      inline void             releaseBlock(BlockReadDescriptor&  blockDescriptor) throw();
      inline void             releaseBlock(BlockWriteDescriptor& blockDescriptor) throw();

      // --- common functionality ---
      const std::wstring&     getConnectionPoint() const throw() { return m_connectionPointName; }

      // Number of data files the ring currently spans (total blocks / blocks per file).
      // Returns 0 when nothing is mapped, and also when the mapped control block has not been
      // initialised yet (m_blocksCount == 0) - that state is reachable after a failed create and
      // would otherwise be an integer division by zero.
      uint32_t                getExpansionsCount() const throw()
      {
         if (m_connectionPointBuffer == NULL || m_connectionPointBuffer->m_blocksCount == 0)
            return 0;
         return m_connectionPointBuffer->m_totalBlocksCount / m_connectionPointBuffer->m_blocksCount;
      }
      inline bool             setMaxExpansionsCount(uint32_t maxExpansions) throw();
      inline void             setMaxSpinLocksCount(uint32_t blockAccessSpinLock, uint32_t expansionAccessSpinLock);
      // Promise that only one thread uses this object, which lets getBlockByIndex skip m_locker.
      void                    doSingleThread(bool isSingleThread) throw() { m_isSingleThread = isSingleThread; }
      inline bool             isCommunicationStopped() const throw();
      inline bool             isValid() const throw() { return m_isValid; }

   private:
      // manager's functions:
      inline bool             allocateBlocksFileMap(uint32_t index) throw();
      inline bool             close();
      inline bool             createEvents();
      inline bool             createMainFileMapping(bool doCreate, DWORD dwMillisecondsTimeout);
      inline bool             createSignalFileMapping(bool doCreate, DWORD dwMillisecondsTimeout);
      // common functions:
      inline bool             getBlockByIndex(const uint32_t blockIndex, void*& pData, VLONG*& pStatus) throw();
      inline bool             getNextAvailableBlock(uint32_t& blockIndex, void*& pData, VLONG*& pStatus, VUINT32& cursorStart, VUINT32& waitersCount, long valueCheck, HANDLE waitOn, DWORD& dwMillisecondsTimeout, HANDLE stopOn) throw();

      inline uint32_t         getNextIndex(uint32_t index) const throw();
      inline bool             lockReaderBlock(uint32_t& blockIndex, void*& pData, VLONG*& pFlags, DWORD& dwMillisecondsTimeout, HANDLE stopOn) throw();
      inline bool             lockWriterBlock(uint32_t& blockIndex, void*& pData, VLONG*& pFlags, DWORD& dwMillisecondsTimeout, HANDLE stopOn) throw();

      // data members (declaration order matches the constructor's initialiser list):
      SimpleIPCMemoryBuffer*     m_connectionPointBuffer; // never allocates: it's just a pointer into the mapped control block
      CSimpleCriticalSectionSE   m_locker;                // guards m_dataVector when no cheaper scheme applies
      std::wstring               m_connectionPointName;   // "address" (name) of the connection point
      IPCDataVector              m_dataVector;            // list of data files (shared-memory "files")
      HANDLE                     m_hMapFile;              // handle of the main (control block) mapping
      HANDLE                     m_hMapSignalFile;        // handle of the "ready" signal mapping
      HANDLE                     m_hDataAvailable;        // shared event: a block was filled
      HANDLE                     m_hSpaceAvailable;       // shared event: a block was freed
      HANDLE                     m_hStopAllWaits;         // shared event: abandon all waits
      VUINT32                    m_dataVectorTotal;       // m_blocksCount * m_dataVector.size(), cached because it changes very rarely
      uint32_t                   m_maxSpinLocks;          // yields before a blocked reader/writer starts waiting on an event
      volatile bool              m_stopWaits;
      bool                       m_isValid;
      bool                       m_isSingleThread;
      bool                       m_preAllocateArray;
   };
}
// implementation
namespace SimpleCore
{
//--CSimpleIPC------------------------------------------------------------------------------------------------------------------------------------------------
	// Creates a connection point under a generated name of the form
	// "CSimpleIPC_slot_<current thread id>_<address of this object>" and forwards to the named overload.
	bool	CSimpleIPC::createConnectionPoint(uint32_t blockSize, uint32_t initialBlocksCount, uint32_t maxExpansions, bool preAllocate, uint32_t minExpantionDistance)
	{
		// The fixed prefix is there in case somebody else comes up with the same "thread id + address" idea.
		std::wstring generatedName(L"CSimpleIPC_slot_");
		generatedName += std::to_wstring((unsigned long long)GetCurrentThreadId());
		generatedName += L"_";
		generatedName += std::to_wstring((unsigned long long)this);

		return createConnectionPoint(generatedName, blockSize, initialBlocksCount, maxExpansions, preAllocate, minExpantionDistance);
	}

	// Creates a connection point under the given (system-wide unique) name.
	//   connectionPoint       - base name; every kernel object is named from it with a suffix
	//   blockSize             - size of one block in bytes
	//   initialBlocksCount    - blocks per data file (also the initial ring size)
	//   maxExpansions         - upper bound on the number of data files
	//   preAllocate           - stored as SCD_PREALLOCATE so that attaching parties use the same mode
	//   minExpantionDistance  - threshold used by writers when deciding whether to expand
	// Returns true on success. On any failure everything created so far is released again through
	// close(), so the object is never left half-built (previously the mapping and events stayed open,
	// and the control block stayed mapped while isValid() was false).
	inline bool	CSimpleIPC::createConnectionPoint(const std::wstring& connectionPoint, uint32_t blockSize, uint32_t initialBlocksCount, uint32_t maxExpansions, bool preAllocate, uint32_t minExpantionDistance)
	{
		if (!close()) // clean up; if there were previous errors, clean them too
			return false;
		m_connectionPointName = connectionPoint;

		// Main mapping first (it holds the control block), then the shared events.
		bool created = createMainFileMapping(true, 0) && createEvents();
		if (created)
		{
			// initialize shared memory
			m_connectionPointBuffer->init(blockSize, initialBlocksCount, maxExpansions, minExpantionDistance);
			if (preAllocate)
				m_connectionPointBuffer->setFlag(SCD_PREALLOCATE);
			m_preAllocateArray = preAllocate;

			m_locker.setMaxSpinCount(SCD_IPC_LOCK_MAX_SPINLOCKS);

			// Add the first data file, and only then publish the signal mapping: attachToConnectionPoint()
			// waits for that mapping, so it must appear after the shared memory is fully set up.
			created = allocateBlocksFileMap(0) && createSignalFileMapping(true, 0);
		}

		if (!created)
		{
			close(); // roll back whatever was created before the failing step
			return false;
		}

		m_isValid = true;
		return true;
	}

	// Raises SCD_STOP_REQUESTED in the shared flags and signals the stop-all-waits event.
	// Does nothing on an object that is not valid.
	void	CSimpleIPC::requestStopCommunications() throw() 
	{
		if (isValid())
		{
			m_connectionPointBuffer->setFlag(SCD_STOP_REQUESTED);
			::SetEvent(m_hStopAllWaits);
		}
	}

   // Clears SCD_STOP_REQUESTED in the shared flags and resets the stop-all-waits event.
   // Does nothing on an object that is not valid.
   void CSimpleIPC::requestResumeCommunications() throw() 
   {
      if (isValid())
      {
         m_connectionPointBuffer->clearFlag(SCD_STOP_REQUESTED);
         ::ResetEvent(m_hStopAllWaits);
      }
   }

	
	// Attaches to an existing connection point.
	//   connectionPoint       - name that was used by the creating party
	//   dwMillisecondsTimeout - how long to keep retrying each of the two open steps (INFINITE allowed)
	// Returns true on success. On any failure everything opened so far is released through close(),
	// so no stale handle to the named objects is kept by this object.
	inline bool	CSimpleIPC::attachToConnectionPoint(const std::wstring& connectionPoint, DWORD dwMillisecondsTimeout)
	{
		if (!close()) // clean up; if there were previous errors, clean them too
			return false;
		m_connectionPointName = connectionPoint;

		// The signal mapping is created last by the manager, so once it can be opened the control
		// block has been initialised. After that open the control block and the shared events.
		bool attached = createSignalFileMapping(false, dwMillisecondsTimeout)
		             && createMainFileMapping(false, dwMillisecondsTimeout)
		             && createEvents();
		if (attached)
		{
			m_locker.setMaxSpinCount(SCD_IPC_LOCK_MAX_SPINLOCKS);
			m_preAllocateArray = m_connectionPointBuffer->isFlagSet(SCD_PREALLOCATE);

			// How many data files are currently allocated? Map all of them to get up to speed.
			const uint32_t blocksPerFile = m_connectionPointBuffer->m_blocksCount;
			const uint32_t totalBlocks = m_connectionPointBuffer->m_totalBlocksCount;
			// Reject a control block whose geometry would make "total / perFile - 1" divide by zero
			// or wrap around to a huge file index.
			attached = (blocksPerFile != 0) && (totalBlocks >= blocksPerFile)
			        && allocateBlocksFileMap(totalBlocks / blocksPerFile - 1);
		}

		if (!attached)
		{
			close(); // roll back whatever was opened before the failing step
			return false;
		}

		m_isValid = true;
		return true;
	}
		
	bool	CSimpleIPC::detachFromConnectionPoint()
	{
		if (!close())
			return false;
		m_connectionPointName.clear();
		return true;
	}

	// Takes the next filled block, copies min(buffSize, block size) bytes of it into pBuff and
	// releases the block. Returns the number of bytes copied, or 0 when no block could be acquired.
	DWORD	CSimpleIPC::read(void *pBuff, DWORD buffSize, DWORD dwMillisecondsTimeout, HANDLE stopOn) throw() 
	{
		BlockReadDescriptor descriptor;
		void* const source = acquireBlock(descriptor, dwMillisecondsTimeout, stopOn);
		if (source == NULL)
			return 0;

		const uint32_t copied = min(buffSize, m_connectionPointBuffer->m_blockSize);
		memcpy(pBuff, source, copied);
		releaseBlock(descriptor);
		return copied;
	}
	
	// Takes the next free block, copies min(bytesCount, block size) bytes from pBuff into it and
	// releases the block as filled. Returns the number of bytes copied, or 0 when no block could be acquired.
	DWORD	CSimpleIPC::write(void *pBuff, DWORD bytesCount, DWORD dwMillisecondsTimeout, HANDLE stopOn) throw() 
	{
		BlockWriteDescriptor descriptor;
		void* const destination = acquireBlock(descriptor, dwMillisecondsTimeout, stopOn);
		if (destination == NULL)
			return 0;

		const uint32_t copied = min(bytesCount, m_connectionPointBuffer->m_blockSize);
		memcpy(destination, pBuff, copied);
		releaseBlock(descriptor);
		return copied;
	}

   // Acquires the next filled block for reading.
   //   blockDescriptor       - receives the block's status pointer; pass it to releaseBlock() afterwards
   //   dwMillisecondsTimeout - total time to keep trying (INFINITE allowed)
   //   stopOn                - optional extra handle; when signaled, the attempt is abandoned
   // Returns a pointer to the block's data, or NULL when the object is not valid, the timeout
   // elapsed, or a stop was requested. On failure blockDescriptor.status is set to NULL so that the
   // descriptor never refers to a block this caller does not own.
   inline void* CSimpleIPC::acquireBlock(BlockReadDescriptor&  blockDescriptor, DWORD dwMillisecondsTimeout, HANDLE stopOn) throw()
   {
      // Same guard the other public entry points use: without it an unattached (or closed) object
      // would dereference the NULL control-block pointer inside lockReaderBlock.
      if (!isValid())
      {
         blockDescriptor.status = NULL;
         return NULL;
      }

      uint32_t blockIndex;
      void*    pData;
      // Fast path. lockReaderBlock takes the timeout by reference and subtracts the time it waited,
      // so after this call dwMillisecondsTimeout holds what is left of the caller's budget.
      if (lockReaderBlock(blockIndex, pData, blockDescriptor.status, dwMillisecondsTimeout, stopOn))
         return pData;

      // Didn't work the first time; from here on some slowdown is affordable. Keep retrying until
      // the remaining budget is used up or one of the stop conditions fires.
      DWORD lockWaitTimeLeft = dwMillisecondsTimeout;
      DWORD startTicks = ::GetTickCount();
      do
      {
         if (lockReaderBlock(blockIndex, pData, blockDescriptor.status, lockWaitTimeLeft, stopOn))
            return pData;
      }
      while (!m_stopWaits && (((::GetTickCount() - startTicks) < dwMillisecondsTimeout) || (dwMillisecondsTimeout == INFINITE)) && !WAITSIGNALED(m_hStopAllWaits) && (stopOn ? !WAITSIGNALED(stopOn) : true));

      // Failed: nothing to read in time, or other threads were faster and left this one behind.
      blockDescriptor.status = NULL;
      return NULL;
   }

   // Acquires the next free block for writing.
   //   blockDescriptor       - receives the block's status pointer; pass it to releaseBlock() afterwards
   //   dwMillisecondsTimeout - total time to keep trying (INFINITE allowed)
   //   stopOn                - optional extra handle; when signaled, the attempt is abandoned
   // Returns a pointer to the block's data, or NULL when the object is not valid, the timeout
   // elapsed, or a stop was requested. On failure blockDescriptor.status is set to NULL so that the
   // descriptor never refers to a block this caller does not own.
   inline void* CSimpleIPC::acquireBlock(BlockWriteDescriptor& blockDescriptor, DWORD dwMillisecondsTimeout, HANDLE stopOn) throw()
   {
      // Same guard the other public entry points use: without it an unattached (or closed) object
      // would dereference the NULL control-block pointer inside lockWriterBlock.
      if (!isValid())
      {
         blockDescriptor.status = NULL;
         return NULL;
      }

      uint32_t blockIndex;
      void*    pData;
      // Fast path. lockWriterBlock takes the timeout by reference and subtracts the time it waited,
      // so after this call dwMillisecondsTimeout holds what is left of the caller's budget.
      if (lockWriterBlock(blockIndex, pData, blockDescriptor.status, dwMillisecondsTimeout, stopOn))
         return pData;

      // Keep retrying until the remaining budget is used up or one of the stop conditions fires.
      DWORD lockWaitTimeLeft = dwMillisecondsTimeout;
      DWORD startTicks = ::GetTickCount();
      do
      {
         if (lockWriterBlock(blockIndex, pData, blockDescriptor.status, lockWaitTimeLeft, stopOn))
            return pData;
      }
      while (!m_stopWaits  && (((::GetTickCount() - startTicks) < dwMillisecondsTimeout) || (dwMillisecondsTimeout == INFINITE)) && !WAITSIGNALED(m_hStopAllWaits) && (stopOn ? !WAITSIGNALED(stopOn) : true));

      // Failed: no space became free in time, or other threads were faster and left this one behind.
      blockDescriptor.status = NULL;
      return NULL;
   }

   // Returns a block obtained with acquireBlock(BlockReadDescriptor&): marks it free (status 0)
   // and wakes a waiting writer if there is one.
   // Does nothing when the descriptor holds no block (status == NULL) or when the object is no
   // longer valid (after close() the status pointer refers to an unmapped view).
   // The descriptor is cleared so that a second release of the same descriptor is harmless.
   inline void  CSimpleIPC::releaseBlock(BlockReadDescriptor&  blockDescriptor) throw()
   {
      VLONG* const status = blockDescriptor.status;
      if (status == NULL || !isValid())
         return;
      blockDescriptor.status = NULL;

      // The status store must be globally visible before the waiter count is read. With a plain
      // store the CPU may perform the following load first; a writer that has just incremented
      // m_writersWatingCount and still sees the old status would then go to sleep without ever
      // being signaled. The interlocked exchange acts as a full barrier between the two.
      _InterlockedExchange(status, 0);
      if (m_connectionPointBuffer->m_writersWatingCount) // somebody is waiting for space - signal that we moved
         ::SetEvent(m_hSpaceAvailable);
   }

   // Returns a block obtained with acquireBlock(BlockWriteDescriptor&): marks it filled (status 2)
   // and wakes a waiting reader if there is one.
   // Does nothing when the descriptor holds no block (status == NULL) or when the object is no
   // longer valid (after close() the status pointer refers to an unmapped view).
   // The descriptor is cleared so that a second release of the same descriptor is harmless.
   inline void  CSimpleIPC::releaseBlock(BlockWriteDescriptor& blockDescriptor) throw()
   {
      VLONG* const status = blockDescriptor.status;
      if (status == NULL || !isValid())
         return;
      blockDescriptor.status = NULL;

      // The status store must be globally visible before the waiter count is read. With a plain
      // store the CPU may perform the following load first; a reader that has just incremented
      // m_readersWatingCount and still sees the old status would then go to sleep without ever
      // being signaled. The interlocked exchange acts as a full barrier between the two.
      _InterlockedExchange(status, 2);
      if (m_connectionPointBuffer->m_readersWatingCount) // somebody is waiting for data - signal that it is there
         ::SetEvent(m_hDataAvailable);
   }
   
   // Stores a new maximum number of data files in the shared control block.
   // Refused (returns false) when the object is not valid or when the array was pre-allocated.
   bool CSimpleIPC::setMaxExpansionsCount(uint32_t maxExpansions) throw() 
   {
      const bool changeAllowed = isValid() && !m_preAllocateArray;
      if (changeAllowed)
         m_connectionPointBuffer->m_maxExpansions = maxExpansions;
      return changeAllowed;
   }

   void CSimpleIPC::setMaxSpinLocksCount(uint32_t blockAccessSpinLock, uint32_t expansionAccessSpinLock)
   {
      m_maxSpinLocks = blockAccessSpinLock;
      m_locker.setMaxSpinCount((WORD)expansionAccessSpinLock);
   }
   
	// True when the object is not valid, or when SCD_STOP_REQUESTED is set in the shared flags.
	bool CSimpleIPC::isCommunicationStopped() const throw() 
	{
		return !isValid() || m_connectionPointBuffer->isFlagSet(SCD_STOP_REQUESTED);
	}

	bool	CSimpleIPC::allocateBlocksFileMap(uint32_t index) throw() 
	{
      if (m_locker.enterExclusiveLock())
      {
		   uint32_t oldSize = (uint32_t)m_dataVector.size(); 
		   if (oldSize <= index)
		   {
			   try
            {
				   uint32_t newSize = m_preAllocateArray ? m_connectionPointBuffer->m_maxExpansions : index + 1;
               if (m_dataVector.capacity() < newSize)
				      m_dataVector.reserve(newSize); // if we are thrown exception here
			   }
			   catch(...)	{
               m_locker.leaveLock();
				   return false; // nope, cannot resize, so get out
			   }
			   uint32_t dataMemSize = m_connectionPointBuffer->m_blocksCount * m_connectionPointBuffer->m_blockSize; // where we transfer memory
            if (uint32_t rem = dataMemSize % sizeof(long))
               dataMemSize+= sizeof(long) - rem; // align on "long"
            
            uint32_t totalMemSize = dataMemSize + (m_connectionPointBuffer->m_blocksCount * sizeof(long));

			   // 1. For each missing index open/create file mapping
			   for (uint32_t i = oldSize; i <= index; i++)
			   {
				   IPCDataFileHandle dfh;
				   std::wstring cpName(m_connectionPointName);
				   cpName.append(L"_data_");
				   cpName.append(std::to_wstring((unsigned long long)i));
				
				   dfh.m_hDataMapFile = ::CreateFileMappingW(INVALID_HANDLE_VALUE, NULL, PAGE_READWRITE, 0, totalMemSize, cpName.c_str());
				   // in a case if this was already created by someone else, we'll just hook to it. Ignore GetLastError - it'll just say "file already exists", and we are good with that

               if (NULL_OR_INVALID(dfh.m_hDataMapFile))
               {
                  m_locker.leaveLock();
					   return false;
               }

				   dfh.m_pData = ::MapViewOfFile(dfh.m_hDataMapFile, FILE_MAP_ALL_ACCESS, 0, 0, totalMemSize);
				   if (dfh.m_pData == NULL) 
				   {
					   // close the handle:
					   ::CloseHandle(dfh.m_hDataMapFile);
                  m_locker.leaveLock();
					   return false; //TODO!! Signal an error!
				   }
               dfh.m_pStatus = (void*)(((UINT_PTR)dfh.m_pData) + dataMemSize);

				   m_dataVector.push_back(dfh); // nope, this will not throw - we already allocated memory!
               m_dataVectorTotal = ((uint32_t)m_dataVector.size())*m_connectionPointBuffer->m_blocksCount;
			   }
		   }
         m_locker.leaveLock();
      }
		return true;
	}

	bool	CSimpleIPC::close()
	{
      for (IPCDataVector::iterator it = m_dataVector.begin(); it != m_dataVector.end(); it++)
		{
			IPCDataFileHandle& dfh = *it;
			if (dfh.m_pData)
				::UnmapViewOfFile(dfh.m_pData);
         dfh.m_pStatus = NULL;
         CLOSE_HANDLE_AND_NULL(dfh.m_hDataMapFile);
		}
		m_dataVector.clear();
      m_dataVectorTotal = 0;

      CLOSE_HANDLE_AND_NULL(m_hDataAvailable);
      CLOSE_HANDLE_AND_NULL(m_hSpaceAvailable);
      CLOSE_HANDLE_AND_NULL(m_hStopAllWaits);
      CLOSE_HANDLE_AND_NULL(m_hMapSignalFile);

		// upmap and close main one:
		if (m_connectionPointBuffer)
		{
         // indicate our leave:
			::UnmapViewOfFile(m_connectionPointBuffer);
			m_connectionPointBuffer = NULL;
		}		
      CLOSE_HANDLE_AND_NULL(m_hMapFile);
      m_isValid = false;
      return true;
	}

	bool	CSimpleIPC::createEvents()
	{
		std::wstring eventName(m_connectionPointName);
		eventName.append(L"_evt_filled");
      m_hDataAvailable = ::CreateEventW(NULL, FALSE, FALSE, eventName.c_str());
      if (NULL_OR_INVALID(m_hDataAvailable))
		   return false; //TODO!! Signal an error!

		eventName = m_connectionPointName;
		eventName.append(L"_evt_avail");
      m_hSpaceAvailable = ::CreateEventW(NULL, FALSE, FALSE, eventName.c_str());
      if (NULL_OR_INVALID(m_hSpaceAvailable)) 
         return false; //TODO!! Signal an error!

		eventName = m_connectionPointName;
		eventName.append(L"_evt_stopall");
      m_hStopAllWaits = ::CreateEventW(NULL, TRUE, FALSE, eventName.c_str());
      if (NULL_OR_INVALID(m_hStopAllWaits)) 
         return false; //TODO!! Signal an error!

		return true;
	}

	bool	CSimpleIPC::createMainFileMapping(bool doCreate, DWORD dwMillisecondsTimeout)
	{
		std::wstring cpName(m_connectionPointName);
		cpName.append(L"_mem");
		if (doCreate)
			m_hMapFile = ::CreateFileMappingW(INVALID_HANDLE_VALUE, NULL, PAGE_READWRITE, 0, sizeof(SimpleIPCMemoryBuffer), cpName.c_str());
		else
		{
			DWORD startTicks = ::GetTickCount();
			do
			{
				m_hMapFile = ::OpenFileMappingW(FILE_MAP_ALL_ACCESS, FALSE, cpName.c_str());
				if (!(m_hMapFile == NULL || m_hMapFile == INVALID_HANDLE_VALUE))
					break;
            ::Sleep(1);
			}
			while ((((::GetTickCount() - startTicks) < dwMillisecondsTimeout) || (dwMillisecondsTimeout == INFINITE)) && !m_stopWaits);
			
		}
		if (m_hMapFile == NULL || m_hMapFile == INVALID_HANDLE_VALUE)
			return false;

        m_connectionPointBuffer = (SimpleIPCMemoryBuffer*) ::MapViewOfFile(m_hMapFile, FILE_MAP_ALL_ACCESS,	0, 0, sizeof(SimpleIPCMemoryBuffer)); 
        if (m_connectionPointBuffer == NULL) 
			return false; //TODO!! Signal an error!

		return true;
	}

	bool	CSimpleIPC::createSignalFileMapping(bool doCreate, DWORD dwMillisecondsTimeout)
   {
		std::wstring cpName(m_connectionPointName);
		cpName.append(L"_sig");
		if (doCreate)
			m_hMapSignalFile = ::CreateFileMappingW(INVALID_HANDLE_VALUE, NULL, PAGE_READWRITE, 0, sizeof(DWORD), cpName.c_str());
		else
		{
			DWORD startTicks = ::GetTickCount();
			do
			{
				m_hMapSignalFile = ::OpenFileMappingW(FILE_MAP_ALL_ACCESS, FALSE, cpName.c_str());
				if (!NULL_OR_INVALID(m_hMapSignalFile))
					break;
            ::Sleep(1); // cannot open file yet - probably it doesn't exist yet
			}
			while ((((::GetTickCount() - startTicks) < dwMillisecondsTimeout) || (dwMillisecondsTimeout == INFINITE)) && !m_stopWaits);			
		}
		if (NULL_OR_INVALID(m_hMapSignalFile))
			return false;
      
      return true;
   }

	inline bool	CSimpleIPC::getBlockByIndex(const uint32_t blockIndex, void*& pData, VLONG*& pStatus) throw() 
	{
      if (blockIndex >= m_dataVectorTotal)
         if (!allocateBlocksFileMap(max(blockIndex, m_connectionPointBuffer->m_totalBlocksCount) / m_connectionPointBuffer->m_blocksCount))
            return false; // aha, can't map, so return false!

      const uint32_t idx = blockIndex % m_connectionPointBuffer->m_blocksCount;
      if (m_preAllocateArray || m_isSingleThread) // either we guarantee that this is single thread, or we preallocated an array,
      {                                            // and above code guaranteed to expand it without affecting those who currently hold valid pointers
         const IPCDataFileHandle& dfh = m_dataVector[blockIndex / m_connectionPointBuffer->m_blocksCount];         
         pData = (void*)(((UINT_PTR)dfh.m_pData) + (UINT_PTR)(idx * m_connectionPointBuffer->m_blockSize));
         pStatus = (VLONG*)(((UINT_PTR)dfh.m_pStatus) + (UINT_PTR)(idx << 2));
      }
      else if (m_locker.enterSharedLock()) // we are SO unlucky to get here. Try to avoid at all costs - performance is 3-5 times less!!!!!
      {
         const IPCDataFileHandle& dfh = m_dataVector[blockIndex / m_connectionPointBuffer->m_blocksCount];
         pData = (void*)(((UINT_PTR)dfh.m_pData) + (UINT_PTR)(idx * m_connectionPointBuffer->m_blockSize));
         pStatus = (VLONG*)(((UINT_PTR)dfh.m_pStatus) + (UINT_PTR)(idx << 2));
         m_locker.leaveLock();
      }
      return true;
	}

   // Finds the block at the current cursor and waits until its status equals valueCheck.
   //   blockIndex            - receives the cursor value that was examined last
   //   pData / pStatus       - receive the data and status addresses of that block (via getBlockByIndex)
   //   cursorStart           - the shared cursor to follow (callers pass m_ReadStart or m_WriteStart)
   //   waitersCount          - the shared waiter counter to maintain (m_readersWatingCount or m_writersWatingCount)
   //   valueCheck            - status value that makes the block usable (callers pass 2 for readers, 0 for writers)
   //   waitOn                - event to wait on once spinning is exhausted
   //   dwMillisecondsTimeout - wait budget; reduced by the time spent waiting unless it is INFINITE
   //   stopOn                - optional extra handle that ends the wait (may be NULL)
   // Returns true when the examined block's status equals valueCheck at the time of return; false
   // otherwise, including when getBlockByIndex fails. The block is NOT claimed here - the caller does
   // that with a compare-exchange on *pStatus.
   inline bool CSimpleIPC::getNextAvailableBlock(uint32_t& blockIndex, void*& pData, VLONG*& pStatus, VUINT32& cursorStart, VUINT32& waitersCount, long valueCheck, HANDLE waitOn, DWORD& dwMillisecondsTimeout, HANDLE stopOn) throw() 
   {
      // get current cursor position:
      blockIndex = cursorStart; // m_ReadStart/m_WriteStart
      if (!getBlockByIndex(blockIndex, pData, pStatus))
         return false;
      uint32_t count = 0; // consecutive checks made while the cursor stayed on the same block
      while (*pStatus != valueCheck)
      {
         if (blockIndex == cursorStart)
         {            
            // Cursor has not moved: first yield up to m_maxSpinLocks times, then fall back to an event wait.
            ++count;
            if (count <= m_maxSpinLocks)
               ::SwitchToThread();
            else
            {
               // Announce ourselves as a waiter BEFORE the final re-check, so that a releaser that
               // changes the status afterwards sees a non-zero count and signals the event.
               _InterlockedIncrement((VLONG*)&waitersCount); // m_readersWatingCount/m_writersWatingCount
               if (*pStatus != valueCheck && blockIndex == cursorStart) // Fix 07/15/2013 - account for reader/writer releasing block and advancing while we were incrementing
               {
                  DWORD startTicks = ::GetTickCount();  
                  // stopOn is only included in the wait when it is non-NULL (handle count 3 instead of 2).
                  HANDLE handles[3] = {waitOn, m_hStopAllWaits, stopOn};
                  ::WaitForMultipleObjects((DWORD)(stopOn ? 3 : 2), handles, FALSE, dwMillisecondsTimeout); // special care: dwMillisecondsTimeout can be INFINITE
                  DWORD passedTicks = ::GetTickCount() - startTicks;
                  if (dwMillisecondsTimeout != INFINITE)
                     dwMillisecondsTimeout = dwMillisecondsTimeout < passedTicks ? 0 : dwMillisecondsTimeout - passedTicks;  // if wait was not forever, subtract the time we spent waiting.
               }
               _InterlockedDecrement((VLONG*)&waitersCount); // m_readersWatingCount/m_writersWatingCount
               // The wait result itself is not inspected; only the block's status decides the outcome.
               return (*pStatus == valueCheck);
            }
         }
         else
         {
            // Cursor moved on while we were looking: follow it and start over with the new block.
            blockIndex = cursorStart;   // m_ReadStart/m_WriteStart
            if (!getBlockByIndex(blockIndex, pData, pStatus)) // cursor moved, reread
               return false;
            count = 0; // reset counter
         }
      }
      return (*pStatus == valueCheck);
   }

	// Returns the ring position that follows 'index': index + 1, or 0 when index + 1 equals the
	// current total number of blocks.
	inline uint32_t CSimpleIPC::getNextIndex(uint32_t index) const throw() 
	{
		const uint32_t following = index + 1;
		if (m_connectionPointBuffer->m_totalBlocksCount == following)
			return 0; // wrapped around the end of the ring
		return following;
	}

   // Tries once to claim the block at the read cursor for reading.
   // Block status values as used in this file: 0 = free, 1 = held by a writer, 2 = filled,
   // 3 = held by a reader.
   //   blockIndex / pData / pFlags - receive the claimed block's index, data address and status address
   //   dwMillisecondsTimeout       - wait budget, reduced by getNextAvailableBlock (unless INFINITE)
   //   stopOn                      - optional extra handle that ends the wait
   // Returns true when the block was claimed (status 2 -> 3) AND the read cursor was advanced past it.
   // Returns false when no filled block was found, another reader won the claim, or the cursor had
   // already moved; in the last case the block is handed back by restoring status 2.
   inline bool	CSimpleIPC::lockReaderBlock(uint32_t& blockIndex, void*& pData, VLONG*& pFlags, DWORD& dwMillisecondsTimeout, HANDLE stopOn) throw() 
   {
      if (getNextAvailableBlock(blockIndex, pData, pFlags, m_connectionPointBuffer->m_ReadStart, m_connectionPointBuffer->m_readersWatingCount, 2, m_hDataAvailable, dwMillisecondsTimeout, stopOn))
         // Claim the block: only one reader can switch it from "filled" (2) to "being read" (3).
         if (_InterlockedCompareExchange(pFlags, 3, 2) == 2)
         {
            uint32_t nextIndex = getNextIndex(blockIndex);
            // Advance the read cursor, but only if it still points at the block we claimed.
            if (_InterlockedCompareExchange((VLONG*)&m_connectionPointBuffer->m_ReadStart, (LONG)nextIndex, (LONG)blockIndex) != (LONG)blockIndex)
            {
               // A full write/read cycle happened between us checking that the block is available and actually marking it - WriteStart & ReadStart moved ahead. Give up the block:
               *pFlags = 2; // we are the only ones who hold it, so yes, we can do just an assignment
               return false;
            }            
            return true;
         }
      return false;
   }

   // Tries once to claim the block at the write cursor for writing, growing the ring if appropriate.
   // Block status values as used in this file: 0 = free, 1 = held by a writer, 2 = filled,
   // 3 = held by a reader.
   //   blockIndex / pData / pFlags - receive the claimed block's index, data address and status address
   //   dwMillisecondsTimeout       - wait budget, reduced by getNextAvailableBlock (unless INFINITE)
   //   stopOn                      - optional extra handle that ends the wait
   // Returns true when the block was claimed (status 0 -> 1) AND the write cursor was advanced past it.
   // Returns false when no free block was found, another writer won the claim, or the cursor had
   // already moved; in the last case the block is handed back by restoring status 0.
   inline bool	CSimpleIPC::lockWriterBlock(uint32_t& blockIndex, void*& pData, VLONG*& pFlags, DWORD& dwMillisecondsTimeout, HANDLE stopOn) throw() 
   {
      if (getNextAvailableBlock(blockIndex, pData, pFlags, m_connectionPointBuffer->m_WriteStart, m_connectionPointBuffer->m_writersWatingCount, 0, m_hSpaceAvailable, dwMillisecondsTimeout, stopOn))
         // Claim the block: only one writer can switch it from "free" (0) to "being written" (1).
         if (_InterlockedCompareExchange(pFlags, 1, 0) == 0)
         {            
            uint32_t tbc = m_connectionPointBuffer->m_totalBlocksCount; // now it's time to check for expansion:
            // Expansion is considered only when we hold the LAST block of the ring, the ring is still
            // below m_maxExpansions data files, and the read cursor is within m_minExpantionDistance
            // of the start of the ring.
            if (tbc - 1 == blockIndex)
               if ((m_connectionPointBuffer->m_maxExpansions*m_connectionPointBuffer->m_blocksCount > tbc) &&
                     (m_connectionPointBuffer->m_ReadStart <= m_connectionPointBuffer->m_minExpantionDistance)) // <-- we need to check if we need to expand
                     // Grow the ring by one data file's worth of blocks; the new file itself is mapped
                     // when getBlockByIndex first meets an index beyond the files mapped so far.
                     _InterlockedCompareExchange((VLONG*)&m_connectionPointBuffer->m_totalBlocksCount, (LONG)(tbc + m_connectionPointBuffer->m_blocksCount), (LONG)tbc); // try to increase - we might not be the only thread that tried.

            // Computed after the possible expansion, so the cursor continues into the new blocks instead of wrapping to 0.
            uint32_t nextIndex = getNextIndex(blockIndex);
            // Advance the write cursor, but only if it still points at the block we claimed.
            if (_InterlockedCompareExchange((VLONG*)&m_connectionPointBuffer->m_WriteStart, (LONG)nextIndex, (LONG)blockIndex) != (LONG)blockIndex)
            {
               // A full write/read cycle happened between us checking that the block is free and actually marking it - WriteStart & ReadStart moved ahead. Give up the block:
               *pFlags = 0; // we are the only ones who hold it, so yes, we can do just an assignment
               return false;
            }            
            return true;
         }
      return false;
   }
}
#pragma warning(pop)

By viewing downloads associated with this article you agree to the Terms of Service and the article's licence.

If a file you wish to view isn't highlighted, and is a text file (not binary), please let us know and we'll add colourisation support for it.

License

This article, along with any associated source code and files, is licensed under The Code Project Open License (CPOL)


Written By
United States United States
This member has not yet provided a Biography. Assume it's interesting and varied, and probably something to do with programming.

Comments and Discussions