Click here to Skip to main content
15,896,111 members
Articles / Programming Languages / C#

Asynchronous Context Processor

Rate me:
Please Sign up or sign in to vote.
4.41/5 (22 votes)
25 Aug 200414 min read 94.8K   732   45  
Asynchronous message processing infrastructure for .NET applications.
#region Change Log
/**
 * <ChangeLog>
 *	<Author>Adityanand Pasumarthi</Author>
 *	<ContactInfo>
 *		<e-mail>adiccc@yahoo.com</e-mail>
 *	</ContactInfo>
 * </ChangeLog>
 * */
#endregion

using System;
using System.Collections;
using System.Threading;

namespace ACP
{
	[Serializable()]
	internal class QMessage
	{
		internal bool CloseMsg;
		internal MessageType MsgType;
		internal object Message;
		internal ManualResetEvent CloseEvent;
	}
	/// <summary>
	/// Logical Execution Context implementation. This Context implmentation
	/// stores all the messages queued to it in a in-memory synchronized queue.
	/// It would be transparently extended in coming versions to store messages in
	/// MSMQ based queue.
	/// </summary>
	public class Context : Resource, IContext
	{
		#region Contructor/Destructor (Public)

		internal Context(IContextManager ctxMgr)
		{
			if (ctxMgr != null)
			{
				_owningContextManager = new WeakReference(ctxMgr);
				_ctxMsgQType = ContextMsgQType.ContextLevel;
				_msgProcType = "";
				InitializeContext();
			}
			else
			{
				throw ACPException.NewACPException(
					new ArgumentNullException("ctxMgr","Context should always have a owning Context Manager."));
			}
		}

		internal Context(IContextManager ctxMgr,ContextMsgQType ctxMsgQType)
		{
			if (ctxMgr != null)
			{
				_owningContextManager = new WeakReference(ctxMgr);
				_ctxMsgQType = ctxMsgQType;
				_msgProcType = "";
				InitializeContext();
			}
			else
			{
				throw ACPException.NewACPException(
					new ArgumentNullException("ctxMgr","Context should always have a owning Context Manager."));
			}
		}

		internal Context(IContextManager ctxMgr,ContextMsgQType ctxMsgQType,string msgProcType)
		{
			if (ctxMgr != null)
			{
				_owningContextManager = new WeakReference(ctxMgr);
				_ctxMsgQType = ctxMsgQType;
				_msgProcType = msgProcType;
				InitializeContext();
			}
			else
			{
				throw ACPException.NewACPException(
					new ArgumentNullException("ctxMgr","Context should always have a owning Context Manager."));
			}
		}

		#endregion

		#region IContext Members

		public string ID
		{
			get
			{
				return _ctxID;
			}
		}

		public string TranasactionID
		{
			get
			{
				return _ctxTransactionID;
			}
		}

		public IMessageProcessor MessageProcessor
		{
			get
			{
				return _msgProcessor;
			}
		}

		public ContextMsgQType MessageQType
		{
			get
			{
				return _ctxMsgQType;
			}
		}

		public IContextProcessor ContextProcessor
		{
			get
			{
				IContextProcessor ctxProc = null;
				if (_ctxProcessor != null)
				{
					ctxProc = _ctxProcessor.Target as IContextProcessor;
				}
				return ctxProc;
			}
		}

		public IContextManager ContextManager
		{
			get
			{
				return _owningContextManager.Target as IContextManager;
			}
		}

		public void AttachToProcessor(IContextProcessor ctxProcessor)
		{
			_ctxProcessor = new WeakReference(ctxProcessor);
		}

		public void DetachFromProcessor(IContextProcessor ctxProcessor)
		{
			if (_ctxProcessor != null)
			{
				if (ctxProcessor == (_ctxProcessor.Target as IContextProcessor))
				{
					_ctxProcessor = null;
				}
			}
		}

		public void QueueMessage(IMessage msg,bool ensureCtxProcessor)
		{
			QueueMessage(msg,ensureCtxProcessor,false);
		}

		public bool ProcessNextMessage(ref long remMsgCount)
		{
			bool result = false;
			try
			{
				Queue ctxQueue = GetContextQueue();
				if (ctxQueue != null)
				{
					QMessage qMsg = (QMessage) ctxQueue.Dequeue();
					if (qMsg != null)
					{
						if (qMsg.CloseMsg == true)
						{
							qMsg.CloseEvent.Set();
							result = true;
						}
						else
						{
							IMessageProcessor msgProc = this.MessageProcessor;
							if (msgProc != null)
							{
								IMessage msg = null;

								// if the Message type is Serializable, then deserialise
								// the message before processing
								if (qMsg.MsgType == MessageType.Serializable)
								{
									msg = msgProc.DeserializeMessage((string)qMsg.Message);
								}
								else
									msg = (IMessage)qMsg.Message;

								msgProc.ProcessMessage(msg);
								result = true;
							}
							else
							{
								throw ACPException.NewACPException(
									new Exception("Message Processor is not available to process messages for this Context"));
							}
						}
					}
					remMsgCount = ctxQueue.Count;
				}
			}
			catch(Exception e)
			{
				throw ACPException.NewACPException(e);
			}
			return result;
		}

		public long MessageCount
		{
			get
			{
				long msgCount = 0;
				Queue ctxQueue = GetContextQueue();
				if (ctxQueue != null)
				{
					msgCount = ctxQueue.Count;
				}
				return msgCount;
			}
		}

		public bool WaitOnAsyncOperations(int msTimeOut)
		{
			// Check if we are already waiting on a Close event
			if (_closeEvent.WaitOne(0,false) == false)
				return _closeEvent.WaitOne(msTimeOut,false);
			else
			{
				// Post an internal close message to the context Queue
				// amd wait on the close event
				_closeEvent.Reset();
				QueueMessage(null,true,true);
				return _closeEvent.WaitOne(msTimeOut,false);
			}
		}

		#endregion

		#region Resource Class Overrides (Protected)

		protected override void OnDestroy()
		{
			// First Set our CloseEvent
			_closeEvent.Set();

			// Loop through the Q and get the Messages
			Queue ctxQ = GetContextQueue();
			foreach(QMessage qMsg in ctxQ)
			{
				if (qMsg.MsgType == MessageType.Reference)
				{
					IMessage msg = (IMessage)qMsg.Message;
					if (msg != null)
					{
						msg.Dispose();
					}
				}
			}
		}

		#endregion

		#region Instance Member Functions (Private)

		private Queue GetContextQueue()
		{
			Queue ctxQueue = null;
			
			switch(_ctxMsgQType)
			{
				case ContextMsgQType.ContextLevel:
					ctxQueue = _ctxLoaclMsgQueue;
					break;
				case ContextMsgQType.Global:
					ctxQueue = _ctxGlobalMsgQueue;
					break;
			}

			return ctxQueue;
		}

		private void QueueMessage(IMessage msg,bool ensureCtxProcessor,bool isCloseMsg)
		{
			try
			{
				Queue ctxQueue = GetContextQueue();
				IMessageProcessor msgProc = this.MessageProcessor;
				if (ctxQueue != null) 
				{
					if (msgProc != null)
					{
						// Prepare an internal message for queuing
						QMessage qMsg = new QMessage();
						if (isCloseMsg == true)
						{
							qMsg.CloseMsg = isCloseMsg;
							qMsg.MsgType = MessageType.Reference;
							qMsg.CloseEvent = _closeEvent;
						}
						else
						{
							qMsg.MsgType = msg.MsgType;

							

							// if the Message is serializable type, then we should serialize
							// it before sending to our Queue
							if (msg.MsgType == MessageType.Serializable)
							{
								qMsg.Message = msgProc.SerializeMessage(msg);
							}
							else
								qMsg.Message = msg;
						}

						ctxQueue.Enqueue(qMsg);
						if ((ensureCtxProcessor == true) && (_ctxProcessor != null))
						{
							IContextProcessor ctxProc = (IContextProcessor)_ctxProcessor.Target;
							if (ctxProc != null)
							{
								ctxProc.NotifyOnMessage(this as IContext);
							}
							else
							{
								throw ACPException.NewACPException(
									new Exception("Context Processor is not available to process messages for this Context"));
							}
						}
						else
						{
							throw ACPException.NewACPException(
								new Exception("Context Processor is not available to process messages for this Context"));
						}
					}
					else
					{
						throw ACPException.NewACPException(
							new Exception("Message Processor not available for the context"));
					}
				}
				else
				{
					throw ACPException.NewACPException(
						new Exception("Message Queue is not available to queue messages for this Context"));
				}
			}
			catch(Exception e)
			{
				throw ACPException.NewACPException(e);
			}
		}

		private void InitializeContext()
		{
			_msgProcessor = ProviderManager.GetMessageProcessor(_msgProcType);
			if (_msgProcessor == null)
			{
				throw ACPException.NewACPException(
					new Exception("Failed to get the Message Processor for the Context"));
			}
		}
		#endregion

		#region Instance Data Members (Private)
		
		private string _msgProcType = "";
		private string _ctxID = DateTime.Now.Ticks.ToString();
		private string _ctxTransactionID = null;
		private ContextMsgQType _ctxMsgQType = ContextMsgQType.ContextLevel;
			
		private WeakReference _ctxProcessor = null;		
		private WeakReference _owningContextManager = null;
		private IMessageProcessor _msgProcessor = null;

		private Queue _ctxLoaclMsgQueue = Queue.Synchronized(new Queue());

		private ManualResetEvent _closeEvent = new ManualResetEvent(true);

		#endregion

		#region Static Data Members (Private)

		private static Queue _ctxGlobalMsgQueue = Queue.Synchronized(new Queue());

		#endregion
	}
}

By viewing downloads associated with this article you agree to the Terms of Service and the article's licence.

If a file you wish to view isn't highlighted, and is a text file (not binary), please let us know and we'll add colourisation support for it.

License

This article has no explicit license attached to it but may contain usage terms in the article text or the download files themselves. If in doubt please contact the author via the discussion board below.

A list of licenses authors might use can be found here


Written By
Architect
India India
Software Professional with 14+ Years of experience in design & development of server products using Microsoft Technologies.

Woked/Working on server side product development using Managed C++ & C#, including Thread pools, Asynchronous Procedure Calls (APC), Inter Process Communication (IPC) using named pipes, Lock Free data structures in C++ & .Net, etc.

Comments and Discussions