Click here to Skip to main content
15,895,606 members
Articles / Programming Languages / C++

A Simple and Easy to Use IOCP Server Framework

Rate me:
Please Sign up or sign in to vote.
3.43/5 (8 votes)
30 Jun 2008LGPL33 min read 44.2K   1.7K   35  
An introduction to SPServer Framework
/*
 * Copyright 2007 Stephen Liu
 * For license terms, see the file COPYING along with this library.
 */

#include <stdlib.h>
#include <stdio.h>
#include <string.h>
#include <assert.h>
#include <errno.h>
#include <signal.h>

#include "spporting.hpp"
#include "spthread.hpp"

#include "spdispatcher.hpp"

#include "speventcb.hpp"
#include "sphandler.hpp"
#include "spsession.hpp"
#include "spexecutor.hpp"
#include "sputils.hpp"
#include "spiochannel.hpp"
#include "spioutils.hpp"

#include "event_msgqueue.h"

SP_Dispatcher :: SP_Dispatcher( SP_CompletionHandler * completionHandler, int maxThreads )
{
#ifdef SIGPIPE
	/* Don't die with SIGPIPE on remote read shutdown. That's dumb. */
	signal( SIGPIPE, SIG_IGN );
#endif

	mIsShutdown = 0;
	mIsRunning = 0;

	mEventArg = new SP_EventArg( 600 );

	mMaxThreads = maxThreads > 0 ? maxThreads : 4;

	mCompletionHandler = completionHandler;

	mPushQueue = msgqueue_new( mEventArg->getEventBase(), 0, onPush, mEventArg );
}

SP_Dispatcher :: ~SP_Dispatcher()
{
	if( 0 == mIsRunning ) sleep( 1 );

	shutdown();

	for( ; mIsRunning; ) sleep( 1 );

	//msgqueue_destroy( (struct event_msgqueue*)mPushQueue );

	delete mEventArg;
	mEventArg = NULL;
}

void SP_Dispatcher :: setTimeout( int timeout )
{
	mEventArg->setTimeout( timeout );
}

void SP_Dispatcher :: shutdown()
{
	mIsShutdown = 1;
}

int SP_Dispatcher :: isRunning()
{
	return mIsRunning;
}

int SP_Dispatcher :: getSessionCount()
{
	return mEventArg->getSessionManager()->getCount();
}

int SP_Dispatcher :: getReqQueueLength()
{
	return mEventArg->getInputResultQueue()->getLength();
}

int SP_Dispatcher :: dispatch()
{
	int ret = -1;

	sp_thread_attr_t attr;
	sp_thread_attr_init( &attr );
	assert( sp_thread_attr_setstacksize( &attr, 1024 * 1024 ) == 0 );
	sp_thread_attr_setdetachstate( &attr, SP_THREAD_CREATE_DETACHED );

	sp_thread_t thread;
	ret = sp_thread_create( &thread, &attr, eventLoop, this );
	sp_thread_attr_destroy( &attr );
	if( 0 == ret ) {
		sp_syslog( LOG_NOTICE, "Thread #%ld has been created for dispatcher", thread );
	} else {
		mIsRunning = 0;
		sp_syslog( LOG_WARNING, "Unable to create a thread for dispatcher, %s",
			strerror( errno ) ) ;
	}

	return ret;
}

sp_thread_result_t SP_THREAD_CALL SP_Dispatcher :: eventLoop( void * arg )
{
	SP_Dispatcher * dispatcher = (SP_Dispatcher*)arg;

	dispatcher->mIsRunning = 1;

	dispatcher->start();

	dispatcher->mIsRunning = 0;

	return 0;
}

void SP_Dispatcher :: outputCompleted( void * arg )
{
	SP_CompletionHandler * handler = ( SP_CompletionHandler * ) ((void**)arg)[0];
	SP_Message * msg = ( SP_Message * ) ((void**)arg)[ 1 ];

	handler->completionMessage( msg );

	free( arg );
}

int SP_Dispatcher :: start()
{
	SP_Executor workerExecutor( mMaxThreads, "work" );
	SP_Executor actExecutor( 1, "act" );

	/* Start the event loop. */
	while( 0 == mIsShutdown ) {
		event_base_loop( mEventArg->getEventBase(), EVLOOP_ONCE );

		for( ; NULL != mEventArg->getInputResultQueue()->top(); ) {
			SP_Task * task = (SP_Task*)mEventArg->getInputResultQueue()->pop();
			workerExecutor.execute( task );
		}

		for( ; NULL != mEventArg->getOutputResultQueue()->top(); ) {
			SP_Message * msg = (SP_Message*)mEventArg->getOutputResultQueue()->pop();

			void ** arg = ( void** )malloc( sizeof( void * ) * 2 );
			arg[ 0 ] = (void*)mCompletionHandler;
			arg[ 1 ] = (void*)msg;

			actExecutor.execute( outputCompleted, arg );
		}
	}

	sp_syslog( LOG_NOTICE, "Dispatcher is shutdown." );

	return 0;
}

typedef struct tagSP_PushArg {
	int mType;      // 0 : fd, 1 : timer

	// for push fd
	int mFd;
	SP_Handler * mHandler;
	SP_IOChannel * mIOChannel;
	int mNeedStart;

	// for push timer
	struct timeval mTimeout;
	struct event mTimerEvent;
	SP_TimerHandler * mTimerHandler;
	SP_EventArg * mEventArg;
	void * mPushQueue;
} SP_PushArg_t;

void SP_Dispatcher :: onPush( void * queueData, void * arg )
{
	SP_PushArg_t * pushArg = (SP_PushArg_t*)queueData;
	SP_EventArg * eventArg = (SP_EventArg*)arg;

	if( 0 == pushArg->mType ) {
		SP_Sid_t sid;
		sid.mKey = eventArg->getSessionManager()->allocKey( &sid.mSeq );
		assert( sid.mKey > 0 );

		SP_Session * session = new SP_Session( sid );

		eventArg->getSessionManager()->put( sid.mKey, sid.mSeq, session );

		session->setHandler( pushArg->mHandler );
		session->setIOChannel( pushArg->mIOChannel );
		session->setArg( eventArg );

		event_set( session->getReadEvent(), pushArg->mFd, EV_READ,
				SP_EventCallback::onRead, session );
		event_set( session->getWriteEvent(), pushArg->mFd, EV_WRITE,
				SP_EventCallback::onWrite, session );

		if( pushArg->mNeedStart ) {
			SP_EventHelper::doStart( session );
		} else {
			SP_EventCallback::addEvent( session, EV_WRITE, pushArg->mFd );
			SP_EventCallback::addEvent( session, EV_READ, pushArg->mFd );
		}

		free( pushArg );
	} else {
		event_set( &( pushArg->mTimerEvent ), -1, 0, onTimer, pushArg );
		event_base_set( eventArg->getEventBase(), &( pushArg->mTimerEvent ) );
		event_add( &( pushArg->mTimerEvent ), &( pushArg->mTimeout ) );
	}
}

int SP_Dispatcher :: push( int fd, SP_Handler * handler, int needStart )
{
	SP_IOChannel * ioChannel = new SP_DefaultIOChannel();
	return push( fd, handler, ioChannel, needStart );
}

int SP_Dispatcher :: push( int fd, SP_Handler * handler,
		SP_IOChannel * ioChannel, int needStart )
{
	SP_PushArg_t * arg = (SP_PushArg_t*)malloc( sizeof( SP_PushArg_t ) );
	arg->mType = 0;
	arg->mFd = fd;
	arg->mHandler = handler;
	arg->mIOChannel = ioChannel;
	arg->mNeedStart = needStart;

	SP_IOUtils::setNonblock( fd );

	return msgqueue_push( (struct event_msgqueue*)mPushQueue, arg );
}

void SP_Dispatcher :: onTimer( int, short, void * arg )
{
	SP_PushArg_t * pushArg = (SP_PushArg_t*)arg;

	pushArg->mEventArg->getInputResultQueue()->push(
		new SP_SimpleTask( timer, pushArg, 1 ) );
}

void SP_Dispatcher :: timer( void * arg )
{
	SP_PushArg_t * pushArg = (SP_PushArg_t*)arg;
	SP_TimerHandler * handler = pushArg->mTimerHandler;
	SP_EventArg * eventArg = pushArg->mEventArg;

	SP_Sid_t sid;
	sid.mKey = SP_Sid_t::eTimerKey;
	sid.mSeq = SP_Sid_t::eTimerSeq;
	SP_Response * response = new SP_Response( sid );
	if( 0 == handler->handle( response, &( pushArg->mTimeout ) ) ) {
		msgqueue_push( (struct event_msgqueue*)pushArg->mPushQueue, arg );
	} else {
		delete pushArg->mTimerHandler;
		free( pushArg );
	}

	msgqueue_push( (struct event_msgqueue*)eventArg->getResponseQueue(), response );
}

int SP_Dispatcher :: push( const struct timeval * timeout, SP_TimerHandler * handler )
{
	SP_PushArg_t * arg = (SP_PushArg_t*)malloc( sizeof( SP_PushArg_t ) );

	arg->mType = 1;
	arg->mTimeout = *timeout;
	arg->mTimerHandler = handler;
	arg->mEventArg = mEventArg;
	arg->mPushQueue = mPushQueue;

	return msgqueue_push( (struct event_msgqueue*)mPushQueue, arg );
}

int SP_Dispatcher :: push( SP_Response * response )
{
	return msgqueue_push( (struct event_msgqueue*)mEventArg->getResponseQueue(), response );
}

By viewing downloads associated with this article you agree to the Terms of Service and the article's licence.

If a file you wish to view isn't highlighted, and is a text file (not binary), please let us know and we'll add colourisation support for it.

License

This article, along with any associated source code and files, is licensed under The GNU Lesser General Public License (LGPLv3)


Written By
China China
This member has not yet provided a Biography. Assume it's interesting and varied, and probably something to do with programming.

Comments and Discussions