Click here to Skip to main content
15,883,623 members
Articles / Desktop Programming / Win32

Cross-Platform IPC Event Manager for Interaction with Service Providers

Rate me:
Please Sign up or sign in to vote.
4.40/5 (5 votes)
6 May 2009 | LGPL3 | 5 min read | 38.6K   713   38  
This article shows you how to send or post events among processes using shared memory queues for the Windows and Linux platforms.
/* ==============================================================================================================================
 * This notice must be untouched at all times.
 *
 * Copyright  IntelliWizard Inc. 
 * All rights reserved.
 * LICENSE: LGPL. 
 * Redistributions of source code modifications must send back to the Intelliwizard Project and republish them. 
 * Web: http://www.intelliwizard.com
 * eMail: info@intelliwizard.com
 * We provide technical supports for UML StateWizard users.
 * ==============================================================================================================================*/

#include "ext_ipc_event.h"

#include <stdio.h>
#include <stdlib.h>

#define SME_SENDCMD_INTERVAL 10
#define SME_SENDCMD_TIMEOUT (8*1000/SME_SENDCMD_INTERVAL)

#ifdef WIN32
#define SME_SHAREMEMORY_CMD_NAME "SME_SHAREMEMORY_CMD_NAME"
#define SME_SHAREMEMORY_SEMAPHORE_CMD_NAME "SME_SHAREMEMORY_SEMAPHORE_CMD_NAME"

#else
#include <sys/types.h>
#include <sys/stat.h>
#include <fcntl.h>
#include <sys/ipc.h>
#include <sys/sem.h>
#include <unistd.h>
#include <errno.h>
#include <pwd.h>
#define SME_SHAREMEMORY_CMD_NAME "../SME_SHAREMEMORY_CMD"
#define SME_SHAREMEMORY_SEMAPHORE_CMD_NAME "../SME_SHAREMEMORY_SEMAPHORE_CMD"

#endif

/////////////////////////////////////////////////////////////////////////////////////////////////////////////
/**
 * Creates or opens a named, cross-process semaphore.
 *
 * WIN32 build: the semaphore is backed by an auto-reset event that is created
 * initially signaled (CreateEvent(NULL, FALSE, TRUE, name)) or opened with
 * OpenEvent(); initial_count is not used on this path.
 *
 * Other builds: a System V semaphore set holding one semaphore is created
 * (IPC_CREAT | IPC_EXCL, owner read/write) or opened, using the key returned
 * by GenerateKeyFromName(name, 113). When created, its value is set to
 * initial_count and, if the process runs as root, ownership is handed to the
 * SMESTR_APPLICATION_USER account when that account can be looked up.
 *
 * A constructor cannot report failure, so on any failure sem_ is reset to 0
 * and `owned` is cleared. Clearing `owned` matters: the destructor removes
 * the semaphore and unlinks name_ when `owned` is set, and it must not do
 * that for a semaphore this object never created (id 0 is a valid SysV id).
 *
 * @param initial_count  Initial semaphore value (SysV path only).
 * @param max_count      Not used by either implementation in this function.
 * @param name           Name of the event (WIN32) / input for key generation.
 * @param create_new     true to create the object, false to open an existing one.
 */
XSemaphore::XSemaphore(
		unsigned int initial_count, 
		unsigned int max_count, 
		const char *name, 
		bool create_new)
: sem_(0), owned(create_new), name_(name)
{
#ifdef WIN32
	if(create_new)
		sem_=CreateEvent(NULL, FALSE, TRUE, name);
	else
		sem_=OpenEvent(EVENT_ALL_ACCESS, FALSE, name);
#else
	// 1. Creating a file called 'name' used to happen here; that step is disabled.

	// 2. Uses GenerateKeyFromName to get a key_t
	key_t key=(key_t)GenerateKeyFromName(name, 113);
	if(key==-1)
	{
		// Nothing was created, so there is nothing for the destructor to remove.
		owned=false;
		return;
	}

	// 3. Uses semget to create a semaphore, if create_new is true, set IPC_CREAT and IPC_EXCL flags.
	if(create_new)
	{
		sem_=semget(key, 1, IPC_CREAT | IPC_EXCL | S_IRUSR | S_IWUSR);
	}
	else
	{
		// Open exist
		sem_=semget(key, 1, 0);
	}
	if(sem_==-1)
	{
		sem_=0;
		// Creation/open failed: do not let the destructor remove semaphore id 0.
		owned=false;
		return;
	}

	// 4. Set initial count, SysV Semaphore hasn't max count
	if(create_new)
	{
		/* according to X/OPEN we have to define it ourselves */
		union semun {
			int val;                  /* value for SETVAL */
			struct semid_ds *buf;     /* buffer for IPC_STAT, IPC_SET */
			unsigned short *array;    /* array for GETALL, SETALL */
			/* Linux specific part: */
			struct seminfo *__buf;    /* buffer for IPC_INFO */
		} s;
		s.val=initial_count;
		if(semctl(sem_, 0, SETVAL, s)==-1)
		{
			// A semaphore whose value could not be set is unusable; remove it
			// again and report the object as failed instead of keeping it.
			printf("ipc semctl err: %d\n", errno);
			semctl(sem_, 0, IPC_RMID);
			sem_=0;
			owned=false;
			return;
		}

		//assign the gid and uid
		if(getuid()==0)
		{
			struct passwd pwd ;
			char pwd_str_buf[513]; 
			memset(&pwd_str_buf, 0, sizeof(pwd_str_buf));
			struct passwd *p_passwd=NULL;
			getpwnam_r(SMESTR_APPLICATION_USER, &pwd, pwd_str_buf, sizeof(pwd_str_buf)-1, &p_passwd);

			// p_passwd stays NULL when the account is unknown or the lookup failed;
			// in that case the semaphore keeps its current owner.
			if(p_passwd)
			{
				struct semid_ds sd;
				memset(&sd, 0, sizeof(sd));
				semctl(sem_, 0, IPC_STAT, &sd);
				sd.sem_perm.uid = p_passwd->pw_uid;
				sd.sem_perm.gid = p_passwd->pw_gid;
				semctl(sem_, 0, IPC_SET, &sd);
			}
		}

	}

	// Ok, I know it is not a good solution, as I can merge 3 semaphores into 1 SVR4 semaphore
	//	but I think it is simple enough..
#endif
}

// Releases the underlying OS object.
// WIN32: closes the event handle unless it is 0 or INVALID_HANDLE_VALUE.
// Otherwise: only the owning instance removes the SysV semaphore and unlinks name_.
XSemaphore::~XSemaphore()
{
#ifdef WIN32
	const bool bUsableHandle = (sem_!=0) && (sem_!=INVALID_HANDLE_VALUE);
	if(bUsableHandle)
	{
		CloseHandle(sem_);
	}
#else
	if(!owned)
		return;

	// Delete semaphore, then the file named after it
	semctl(sem_, 0, IPC_RMID);
	unlink(name_);
#endif
}

int XSemaphore::P()
{
#ifdef WIN32
	DWORD ret=WaitForSingleObject(sem_, 30*1000);
	switch(ret)
	{
	case WAIT_OBJECT_0:
		return 0;
	default:
		//dirty code, set the event if timeout
		SetEvent(sem_);
		return -1;
	}
#else
	sembuf sb;
	memset(&sb, 0, sizeof(sembuf));
	sb.sem_num=0;
	sb.sem_op=-1;
	sb.sem_flg=SEM_UNDO;
	int ret=0;
	// semop maybe interrupted by signals, restart waiting
	do{
		ret=semop(sem_, &sb, 1);
		if(ret < 0) printf("ipc semop err: %d\n", errno);
	} while((ret<0) && (errno==EINTR));
	return ret;
#endif
}

int XSemaphore::V()
{
#ifdef WIN32
	return SetEvent(sem_)?0:-1;
#else
	sembuf sb;
	memset(&sb, 0, sizeof(sembuf));
	sb.sem_num=0;
	sb.sem_op=1;
	sb.sem_flg=SEM_UNDO;
	int ret=0;
	// semop maybe interrupted by signals, restart waiting
	do{
		ret=semop(sem_, &sb, 1);
		if(ret < 0) printf("ipc semop err: %d\n", errno);
	} while((ret<0) && (errno==EINTR));
	return ret;
#endif
}

// Suspends the calling thread for roughly `msec` milliseconds.
// A negative value is treated as 0 instead of being converted to a huge
// unsigned delay.
// On non-WIN32 builds the delay is split into whole seconds (sleep) and the
// remaining microseconds (usleep): POSIX allows usleep() to fail with EINVAL
// for arguments of 1,000,000 or more, and msec*1000 could overflow an int.
void SleepInMilliseconds(int msec)
{
	if(msec<0)
		msec=0;
#ifdef WIN32
	Sleep(msec);
#else
	const unsigned int wholeSeconds = (unsigned int)(msec / 1000);
	const unsigned int restMicroseconds = (unsigned int)(msec % 1000) * 1000;	// always < 1,000,000
	if(wholeSeconds>0)
		sleep(wholeSeconds);
	if(restMicroseconds>0)
		usleep(restMicroseconds);	// usleep wait in microseconds, not milliseconds
#endif
}

// File-level XExtIPCEvent object; XExtIPCEvent::GetInstance() returns its address.
XExtIPCEvent ipc;

// Holds a single unsigned long parameter.
// NOTE(review): nothing in the visible part of this file refers to this struct —
// confirm whether it is still needed.
struct UUIDInfo 
{
	unsigned long uiParameter;
};

////////////////////////////////////////////////////////////////////////////////////////////////////////
// Class XExtIPCEvent 

// Builds an unconnected manager: no shared-memory queue, no semaphore, and
// process id -1 (the value the send/post paths treat as "not set").
XExtIPCEvent::XExtIPCEvent()
	: m_pSMQueue(0), m_pSem(0), m_idProcess(-1)
{
	// Resources are acquired later, in Create().
}


// The destructor does nothing; in particular it does not call Release(), so the
// queue and semaphore are only freed when Release() is called explicitly.
XExtIPCEvent::~XExtIPCEvent()
{
}



// Acquires the cross-process semaphore guarding the shared-memory queues.
// Returns the result of XSemaphore::P(), or -1 when no semaphore exists
// (Create() has not succeeded yet, or Release() has already run) instead of
// dereferencing a null pointer.
int XExtIPCEvent::Lock ()
{
	if(m_pSem==0)
		return -1;
	return m_pSem->P();
}

// Releases the cross-process semaphore guarding the shared-memory queues.
// Returns the result of XSemaphore::V(), or -1 when no semaphore exists
// (Create() has not succeeded yet, or Release() has already run) instead of
// dereferencing a null pointer.
int XExtIPCEvent::UnLock()
{
	if(m_pSem==0)
		return -1;
	return m_pSem->V();
}

// Returns the address of the file-level `ipc` object.
XExtIPCEvent* XExtIPCEvent::GetInstance ()
{
	XExtIPCEvent* const pInstance = &ipc;
	return pInstance;
}

// Sets up the shared-memory queue and its guarding semaphore for this process.
//   ProcessID        - id stored in m_idProcess once setup has succeeded.
//   bIsMasterProcess - true: create the shared queue and the semaphore;
//                      false: open the ones that already exist.
// Returns 0 on success. On failure everything allocated so far is released
// through Release() and -1 is returned.
int XExtIPCEvent::Create(int ProcessID, bool bIsMasterProcess)
{
	m_pSMQueue = new XSharedMemoryQueue();
	if(0==m_pSMQueue)
		return -1;

	if(!m_pSMQueue->Initialize(SME_SHAREMEMORY_CMD_NAME, 
		sizeof(IPCEventInfo), IPC_EVENT_BUFFER_SIZE, IPC_QUEUE_NUMBER))
	{
		Release();
		return -1;
	}

	// The master creates the shared queue, every other process attaches to it.
	void* pQueueMemory = 0;
	if(!bIsMasterProcess)
		pQueueMemory = m_pSMQueue->OpenSharedMemoryQueue();
	else
		pQueueMemory = m_pSMQueue->CreateSharedMemoryQueue();

	if(0==pQueueMemory)
	{
		Release();
		return -1;
	}

	m_pSem=new XSemaphore(1, SME_SHAREMEMORY_SEMAPHORE_NUM, SME_SHAREMEMORY_SEMAPHORE_CMD_NAME, bIsMasterProcess);
	if(0==m_pSem)
	{
		Release();
		return -1;
	}

	m_idProcess = ProcessID;
	return 0;
}

// Frees the semaphore and the shared-memory queue object.
//   bIsMasterProcess - true: the shared queue is deleted;
//                      false: it is only closed.
// Both members are reset to null, so calling this again is harmless.
void XExtIPCEvent::Release (bool bIsMasterProcess)
{
	// Deleting a null pointer is a no-op, so no explicit check is needed.
	delete m_pSem;
	m_pSem = NULL;

	if(m_pSMQueue==0)
		return;

	m_pSMQueue->Release();
	if(!bIsMasterProcess)
		m_pSMQueue->CloseSharedMemoryQueue();
	else
		m_pSMQueue->DeleteSharedMemoryQueue();

	delete m_pSMQueue;
	m_pSMQueue = 0;
}

/**************************************************************************************************************
Sends an event to another process and waits for it to be consumed.

A block is taken from the head of the resource queue, filled with a copy of *pInEventInfo and moved to the
tail of the sending queue. The block's status is then polled every SME_SENDCMD_INTERVAL milliseconds until it
becomes EVENT_STATUS_CONSUMED or the timeout expires.

pInEventInfo  : event to send; its IPCEventStatus (and SourceProcessID, when this object has a process id)
                are overwritten before it is copied.
pOutEventInfo : zeroed on entry; receives a copy of the consumed block on success.
nTimeout      : timeout in milliseconds; a negative value selects the default of SME_SENDCMD_TIMEOUT polls.

Return
 0: the event was consumed and *pOutEventInfo holds the result.
-1: invalid argument, no queue, lock/unlock failure, or timeout.
-2: no block could be taken from the resource queue.
**************************************************************************************************************/

int XExtIPCEvent::SendIPCEvent(IPCEventInfo* pInEventInfo, IPCEventInfo* pOutEventInfo, int nTimeout /* = -1 */)
{
	if(pInEventInfo==0||pOutEventInfo==0||m_pSMQueue==0)
		return -1;

	memset(pOutEventInfo, 0, sizeof(IPCEventInfo));

	pInEventInfo->IPCEventStatus = EVENT_STATUS_SENDING;
	if (m_idProcess != -1)
		pInEventInfo->SourceProcessID = m_idProcess;

	// Take a free block, fill it and queue it for sending.
	if(Lock()<0) return -1;
	IPCEventInfo* const pBlock = (IPCEventInfo*)m_pSMQueue->GetQueueHead(IPC_QUEUE_RESOURCE);
	const bool bQueued = (pBlock!=0 && pBlock!=pInEventInfo);
	if(bQueued)
	{
		memcpy(pBlock, pInEventInfo, sizeof(IPCEventInfo));
		m_pSMQueue->MoveToDestQueueTail(pBlock, IPC_QUEUE_SENDING);
	}
	if(UnLock()<0) return -1;

	if(!bQueued)
		return -2;

	const int nMaxPolls = (nTimeout<0 ? SME_SENDCMD_TIMEOUT : nTimeout/SME_SENDCMD_INTERVAL);

	// Poll the block until the receiver marks it as consumed.
	bool bConsumed = false;
	for(int nPoll=0; nPoll<nMaxPolls && !bConsumed; nPoll++)
	{
		if(Lock()<0) return -1;
		const int nStatus = pBlock->IPCEventStatus;
		if(UnLock()<0) return -1;

		if(EVENT_STATUS_CONSUMED == nStatus)
			bConsumed = true;
		else
			SleepInMilliseconds(SME_SENDCMD_INTERVAL);
	}

	if(!bConsumed)
	{
		// Timed out: hand back an empty result.
		memset(pOutEventInfo, 0, sizeof(IPCEventInfo));
		return -1;
	}

	// Copy the result out and give the block back to the resource queue.
	if(Lock()<0) return -1;
	memcpy(pOutEventInfo, pBlock, sizeof(IPCEventInfo));
	m_pSMQueue->MoveToDestQueueTail(pBlock, IPC_QUEUE_RESOURCE);
	if(UnLock()<0) return -1;

	return 0;
}

// Posts an event without waiting for the receiver.
// A block is taken from the head of the resource queue, filled with a copy of
// *pInEventInfo and moved to the tail of the posting queue.
// pInEventInfo's IPCEventStatus (and SourceProcessID, when this object has a
// process id) are overwritten before the copy.
// Return
//  0: the event was queued.
// -1: invalid argument, no queue, or lock/unlock failure.
// -2: no block could be taken from the resource queue.


int XExtIPCEvent::PostIPCEvent(IPCEventInfo* pInEventInfo)
{
	if(m_pSMQueue==0||pInEventInfo==0)
		return -1;

	pInEventInfo->IPCEventStatus = EVENT_STATUS_POSTING;
	if (m_idProcess != -1)
		pInEventInfo->SourceProcessID = m_idProcess;

	// Take a free block, fill it and queue it for posting.
	if(Lock()<0) return -1;
	IPCEventInfo* const pBlock = (IPCEventInfo*)m_pSMQueue->GetQueueHead(IPC_QUEUE_RESOURCE);
	const int nResult = (pBlock!=0) ? 0 : -2;
	if(pBlock!=0)
	{
		memcpy(pBlock, pInEventInfo, sizeof(IPCEventInfo));
		m_pSMQueue->MoveToDestQueueTail(pBlock, IPC_QUEUE_POSTING);
	}
	if(UnLock()<0) return -1;

	return nResult;
}

/************************************************************************************************************
// Fetches the next IPC event addressed to this process (DestProcessID == m_idProcess).
//
// The sending queue is scanned first. A matching block is copied to *pOutEventInfo and moved to the
// resource queue; after the callback ran, ulParameter1/ulParameter2 of *pOutEventInfo are written back
// to the block and its status is set to EVENT_STATUS_CONSUMED.
// If the sending queue holds nothing for this process, the posting queue is scanned the same way
// (posted events get no result written back).
//
// pOutEventInfo : zeroed on entry; receives a copy of the event when one is found.
// pfnCallbak    : optional callback invoked with pOutEventInfo when an event is found.
//
// Return 
// 1: IPC event is available.
// 0: No IPC event (both queues were scanned and hold nothing for this process).
// -1: Invalid argument, no queue, or lock/unlock failure.
// -3: An event was found but the callback returned false.
************************************************************************************************************/
int XExtIPCEvent::QueryIPCEvent(IPCEventInfo* pOutEventInfo, SME_IPC_EVENT_CALLBACK_T pfnCallbak)
{
	int err = 0;
	if(pOutEventInfo==0||m_pSMQueue==0)
		return -1;
	else
		memset(pOutEventInfo, 0, sizeof(IPCEventInfo));

	// Get event from sending queue.
	if(Lock()<0) return -1;
	IPCEventInfo* pTmpFirstInfo = (IPCEventInfo*)m_pSMQueue->GetQueueHead(IPC_QUEUE_SENDING);
	IPCEventInfo* pTmpInfo = pTmpFirstInfo;
	while(pTmpInfo)
	{
		if(pTmpInfo->DestProcessID==m_idProcess)
		{
			memcpy(pOutEventInfo, pTmpInfo, sizeof(IPCEventInfo));
			m_pSMQueue->MoveToDestQueueTail(pTmpInfo, IPC_QUEUE_RESOURCE);
			err=1; // available
			break;
		}
		pTmpInfo = (IPCEventInfo*)m_pSMQueue->GetNext(pTmpInfo);
		if(pTmpInfo==pTmpFirstInfo)
		{
			// Back at the first block: nothing in this queue is addressed to us.
			// That is "no event", not an error, so fall through to the posting
			// queue instead of returning -1.
			pTmpInfo = 0;
		}
	}
	if(UnLock()<0) return -1;

	if(pTmpInfo!=0)
	{
		if(pfnCallbak&&!pfnCallbak(pOutEventInfo))
			err = -3;
		// NOTE(review): the block was already moved to the resource queue above and the
		// lock was dropped while the callback ran — confirm no other sender can reuse it
		// before the result is written back here.
		if(Lock()<0) return -1;
		pTmpInfo->ulParameter1=pOutEventInfo->ulParameter1;
		pTmpInfo->ulParameter2=pOutEventInfo->ulParameter2;
		pTmpInfo->IPCEventStatus=EVENT_STATUS_CONSUMED;
		if(UnLock()<0) return -1;
		return err;
	}

	///////////////////////////////////////////////////////////////////////////////////////////////
	// Get event from posting queue.
	err = 0;
	if(Lock()<0) return -1;
	pTmpFirstInfo = (IPCEventInfo*)m_pSMQueue->GetQueueHead(IPC_QUEUE_POSTING);
	pTmpInfo = pTmpFirstInfo;
	while(pTmpInfo)
	{
		if(pTmpInfo->DestProcessID==m_idProcess)
		{
			memcpy(pOutEventInfo, pTmpInfo, sizeof(IPCEventInfo));
			m_pSMQueue->MoveToDestQueueTail(pTmpInfo, IPC_QUEUE_RESOURCE);
			err=1; // available
			break;
		}
		pTmpInfo = (IPCEventInfo*)m_pSMQueue->GetNext(pTmpInfo);
		if(pTmpInfo==pTmpFirstInfo)
		{
			// Wrapped around without a match: report "no event" (0).
			pTmpInfo = 0;
		}
	}
	if(UnLock()<0) return -1;

	if(pTmpInfo!=0&&pfnCallbak&&!pfnCallbak(pOutEventInfo))
		err = -3;

	return err;
}


// Returns every queued event that was sent by or addressed to `idProcess`
// to the resource queue. Does nothing when no queue exists or the lock
// cannot be taken.
void XExtIPCEvent::RecycleEvent(int idProcess)
{
	// Without a queue there is nothing to recycle (Create() not called or Release() already run).
	if(m_pSMQueue==0) return;

	if(Lock()<0) return;
	// Queue 0 is skipped — presumably it is the resource queue itself; confirm against IPC_QUEUE_RESOURCE.
	for(int i=1;i<IPC_QUEUE_NUMBER;i++)
	{
		IPCEventInfo* pTmpInfo = (IPCEventInfo*)m_pSMQueue->GetQueueHead(i);
		while(pTmpInfo)
		{
			// Fetch the successor first: moving the block changes its links.
			IPCEventInfo* pTmpNextInfo = (IPCEventInfo*)m_pSMQueue->GetNext(pTmpInfo);
			// Move the block exactly once, even when both ids match.
			if(pTmpInfo->DestProcessID==idProcess||pTmpInfo->SourceProcessID==idProcess)
				m_pSMQueue->MoveToDestQueueTail(pTmpInfo, IPC_QUEUE_RESOURCE);

			pTmpInfo = pTmpNextInfo;
			// NOTE(review): if the block just moved was the queue head, the successor may now be
			// the head and this check would stop the scan early — verify against the queue implementation.
			if(m_pSMQueue->GetQueueHead(i)==0||pTmpInfo==(IPCEventInfo*)m_pSMQueue->GetQueueHead(i)) break;
		}
	}
	UnLock();
}


// Prints every queue through XSharedMemoryQueue::DumpQueue while holding the lock.
// Queue 0 is dumped with the second argument false, all other queues with true.
// Returns false when the lock cannot be taken or released, true otherwise.
bool XExtIPCEvent::DumpInfo ()
{
	printf("Commands Information:\n");

	if(Lock()<0)
		return false;

	if(m_pSMQueue!=0)
	{
		for(int nQueue=0; nQueue<IPC_QUEUE_NUMBER; ++nQueue)
			m_pSMQueue->DumpQueue(nQueue, nQueue!=0);
	}

	if(UnLock()<0)
		return false;

	printf("\n");
	return true;
}

//////////////////////////////////////////////////////////////////////////////

int XExtIPCEventCreate(int ProcessID, int bIsMasterProcess)
{
	return XExtIPCEvent::GetInstance()->Create(ProcessID,bIsMasterProcess?true:false);
}

void XExtIPCEventRelease (int bIsMasterProcess)
{
	XExtIPCEvent::GetInstance()->Release(bIsMasterProcess?true:false);
}

// Free-function wrapper: forwards to XExtIPCEvent::SendIPCEvent on the shared instance
// and returns its result unchanged.
int XExtIPCEventSend(IPCEventInfo* pInEventInfo, IPCEventInfo* pOutEventInfo, int nTimeout /* = -1 */)
{
	XExtIPCEvent* const pIpc = XExtIPCEvent::GetInstance();
	const int nResult = pIpc->SendIPCEvent(pInEventInfo, pOutEventInfo, nTimeout);
	return nResult;
}

// Free-function wrapper: forwards to XExtIPCEvent::PostIPCEvent on the shared instance
// and returns its result unchanged.
int XExtIPCEventPost(IPCEventInfo* pInEventInfo)
{
	XExtIPCEvent* const pIpc = XExtIPCEvent::GetInstance();
	const int nResult = pIpc->PostIPCEvent(pInEventInfo);
	return nResult;
}

// Free-function wrapper: forwards to XExtIPCEvent::QueryIPCEvent on the shared instance
// and returns its result unchanged.
int XExtIPCEventQueryIPCEvent(IPCEventInfo* pOutEventInfo, SME_IPC_EVENT_CALLBACK_T pfnCallbak)
{
	XExtIPCEvent* const pIpc = XExtIPCEvent::GetInstance();
	const int nResult = pIpc->QueryIPCEvent(pOutEventInfo, pfnCallbak);
	return nResult;
}

void XExtIPCEventRecycleEvent(int idProcess)
{
	XExtIPCEvent::GetInstance()->RecycleEvent(idProcess);
}

bool XExtIPCEventDumpInfo ()
{
	return XExtIPCEvent::GetInstance()->DumpInfo();
}


By viewing downloads associated with this article you agree to the Terms of Service and the article's licence.

If a file you wish to view isn't highlighted, and is a text file (not binary), please let us know and we'll add colourisation support for it.

License

This article, along with any associated source code and files, is licensed under The GNU Lesser General Public License (LGPLv3)


Written By
Software Developer (Senior)
United States United States
Alex "Question is more important than the answer."

Comments and Discussions