- acp.zip
- ACP
- ACP_Demo
- ACP_Source
- Assemblies
|
#region Change Log
/**
* <ChangeLog>
* <Author>Adityanand Pasumarthi</Author>
* <ContactInfo>
* <e-mail>adiccc@yahoo.com</e-mail>
* </ContactInfo>
* </ChangeLog>
* */
#endregion
using System;
using System.Collections;
using System.Threading;
namespace ACP
{
/// <summary>
/// Internal envelope that Context places on its message queue. It carries either
/// a user message or an internal "close" marker.
/// </summary>
[Serializable()]
internal class QMessage
{
    /// <summary>True when this entry is an internal close marker rather than a user message.</summary>
    internal bool CloseMsg;

    /// <summary>Type of the queued message; Context uses it to decide whether the payload must be deserialized.</summary>
    internal MessageType MsgType;

    /// <summary>
    /// Payload: the serialized form of the message when MsgType is Serializable,
    /// otherwise the IMessage reference itself. Not set for close markers.
    /// </summary>
    internal object Message;

    /// <summary>
    /// Event signaled when a close marker is dequeued. Excluded from serialization
    /// because ManualResetEvent is not serializable; it is therefore null on a
    /// deserialized instance.
    /// </summary>
    [NonSerialized()]
    internal ManualResetEvent CloseEvent;
}
/// <summary>
/// Logical Execution Context implementation. This Context implementation
/// stores all the messages queued to it in an in-memory synchronized queue.
/// It will be transparently extended in coming versions to store messages in
/// an MSMQ-based queue.
/// </summary>
public class Context : Resource, IContext
{
    #region Constructor/Destructor (Public)

    /// <summary>
    /// Creates a context that uses its own (context-level) message queue and the
    /// message processor registered for the empty processor type.
    /// </summary>
    /// <param name="ctxMgr">Owning context manager; must not be null.</param>
    internal Context(IContextManager ctxMgr)
        : this(ctxMgr, ContextMsgQType.ContextLevel, "")
    {
    }

    /// <summary>
    /// Creates a context with the given queue type and the message processor
    /// registered for the empty processor type.
    /// </summary>
    /// <param name="ctxMgr">Owning context manager; must not be null.</param>
    /// <param name="ctxMsgQType">Selects the per-context queue or the shared global queue.</param>
    internal Context(IContextManager ctxMgr, ContextMsgQType ctxMsgQType)
        : this(ctxMgr, ctxMsgQType, "")
    {
    }

    /// <summary>
    /// Creates a context with the given queue type and message processor type.
    /// </summary>
    /// <param name="ctxMgr">Owning context manager; must not be null. Held through a weak reference.</param>
    /// <param name="ctxMsgQType">Selects the per-context queue or the shared global queue.</param>
    /// <param name="msgProcType">Processor type name passed to ProviderManager.GetMessageProcessor.</param>
    /// <exception cref="ACPException">
    /// Wraps an ArgumentNullException when <paramref name="ctxMgr"/> is null; also thrown
    /// when no message processor can be obtained.
    /// </exception>
    internal Context(IContextManager ctxMgr, ContextMsgQType ctxMsgQType, string msgProcType)
    {
        if (ctxMgr == null)
        {
            throw ACPException.NewACPException(
                new ArgumentNullException("ctxMgr","Context should always have a owning Context Manager."));
        }

        _owningContextManager = new WeakReference(ctxMgr);
        _ctxMsgQType = ctxMsgQType;
        _msgProcType = msgProcType;
        InitializeContext();
    }

    #endregion

    #region IContext Members

    /// <summary>Identifier generated for this context when it is constructed.</summary>
    public string ID
    {
        get
        {
            return _ctxID;
        }
    }

    /// <summary>
    /// Transaction identifier of this context. Nothing in this class assigns it,
    /// so it is null here. The misspelled name is kept as-is because it is part
    /// of the public surface.
    /// </summary>
    public string TranasactionID
    {
        get
        {
            return _ctxTransactionID;
        }
    }

    /// <summary>Message processor obtained for this context during construction.</summary>
    public IMessageProcessor MessageProcessor
    {
        get
        {
            return _msgProcessor;
        }
    }

    /// <summary>Queue type (context-level or global) this context reads from and writes to.</summary>
    public ContextMsgQType MessageQType
    {
        get
        {
            return _ctxMsgQType;
        }
    }

    /// <summary>
    /// Context processor currently attached, or null when none is attached or the
    /// weakly referenced processor is no longer available.
    /// </summary>
    public IContextProcessor ContextProcessor
    {
        get
        {
            // Copy the field so a concurrent detach cannot null it between the check and the read.
            WeakReference procRef = _ctxProcessor;
            if (procRef == null)
            {
                return null;
            }
            return procRef.Target as IContextProcessor;
        }
    }

    /// <summary>Owning context manager, or null if the weakly referenced manager is no longer available.</summary>
    public IContextManager ContextManager
    {
        get
        {
            return _owningContextManager.Target as IContextManager;
        }
    }

    /// <summary>Attaches a context processor; only a weak reference to it is kept.</summary>
    /// <param name="ctxProcessor">Processor to notify when messages are queued.</param>
    public void AttachToProcessor(IContextProcessor ctxProcessor)
    {
        _ctxProcessor = new WeakReference(ctxProcessor);
    }

    /// <summary>Detaches the given processor if it is the one currently attached; otherwise does nothing.</summary>
    /// <param name="ctxProcessor">Processor to detach.</param>
    public void DetachFromProcessor(IContextProcessor ctxProcessor)
    {
        if ((_ctxProcessor != null) &&
            (ctxProcessor == (_ctxProcessor.Target as IContextProcessor)))
        {
            _ctxProcessor = null;
        }
    }

    /// <summary>Queues a message for this context.</summary>
    /// <param name="msg">Message to queue.</param>
    /// <param name="ensureCtxProcessor">
    /// When true, the attached context processor is notified after queuing and an
    /// exception is raised if no processor is available.
    /// </param>
    /// <exception cref="ACPException">Thrown when the message cannot be queued or the processor cannot be notified.</exception>
    public void QueueMessage(IMessage msg,bool ensureCtxProcessor)
    {
        QueueMessage(msg,ensureCtxProcessor,false);
    }

    /// <summary>
    /// Dequeues and handles the next entry of this context's queue, if any.
    /// </summary>
    /// <param name="remMsgCount">
    /// Receives the number of entries left in the queue after the attempt. It is
    /// left untouched when no queue is available or an exception is thrown.
    /// </param>
    /// <returns>
    /// True when a message was processed or a close marker was handled; false when
    /// the queue is unavailable or empty.
    /// </returns>
    /// <exception cref="ACPException">Wraps any failure raised while dequeuing or processing.</exception>
    public bool ProcessNextMessage(ref long remMsgCount)
    {
        bool result = false;
        try
        {
            Queue ctxQueue = GetContextQueue();
            if (ctxQueue != null)
            {
                QMessage qMsg = null;

                // Check-and-dequeue must be atomic: the global queue is shared between
                // contexts, and Dequeue() on an empty queue throws. The synchronized
                // wrapper locks on the same SyncRoot, so this lock composes with it.
                lock (ctxQueue.SyncRoot)
                {
                    if (ctxQueue.Count > 0)
                    {
                        qMsg = (QMessage) ctxQueue.Dequeue();
                    }
                }

                if (qMsg != null)
                {
                    if (qMsg.CloseMsg == true)
                    {
                        // Internal close marker: release whoever waits in WaitOnAsyncOperations.
                        if (qMsg.CloseEvent != null)
                        {
                            qMsg.CloseEvent.Set();
                        }
                        result = true;
                    }
                    else
                    {
                        IMessageProcessor msgProc = this.MessageProcessor;
                        if (msgProc == null)
                        {
                            throw ACPException.NewACPException(
                                new Exception("Message Processor is not available to process messages for this Context"));
                        }

                        IMessage msg = null;
                        // Serializable messages were stored in serialized form and
                        // must be rebuilt before processing.
                        if (qMsg.MsgType == MessageType.Serializable)
                        {
                            msg = msgProc.DeserializeMessage((string)qMsg.Message);
                        }
                        else
                        {
                            msg = (IMessage)qMsg.Message;
                        }
                        msgProc.ProcessMessage(msg);
                        result = true;
                    }
                }
                remMsgCount = ctxQueue.Count;
            }
        }
        catch(Exception e)
        {
            throw ACPException.NewACPException(e);
        }
        return result;
    }

    /// <summary>Number of entries currently in this context's queue; 0 when no queue is available.</summary>
    public long MessageCount
    {
        get
        {
            Queue ctxQueue = GetContextQueue();
            if (ctxQueue == null)
            {
                return 0;
            }
            return ctxQueue.Count;
        }
    }

    /// <summary>
    /// Waits until the entries queued so far have been consumed, by posting an
    /// internal close marker and waiting for it to be dequeued.
    /// </summary>
    /// <param name="msTimeOut">Maximum time to wait, in milliseconds.</param>
    /// <returns>True when the close event was signaled within the timeout; otherwise false.</returns>
    /// <exception cref="ACPException">Thrown when the close marker cannot be queued.</exception>
    public bool WaitOnAsyncOperations(int msTimeOut)
    {
        // The event starts signaled. A non-signaled event means a close marker is
        // already pending, so just wait for that one.
        if (_closeEvent.WaitOne(0,false) == false)
        {
            return _closeEvent.WaitOne(msTimeOut,false);
        }

        // Post an internal close message to the context queue
        // and wait on the close event.
        _closeEvent.Reset();
        QueueMessage(null,true,true);
        return _closeEvent.WaitOne(msTimeOut,false);
    }

    #endregion

    #region Resource Class Overrides (Protected)

    /// <summary>
    /// Releases any waiter on the close event and disposes the reference-type
    /// messages still sitting in the queue.
    /// </summary>
    protected override void OnDestroy()
    {
        // First set our close event so that no waiter stays blocked.
        _closeEvent.Set();

        Queue ctxQ = GetContextQueue();
        if (ctxQ == null)
        {
            return;
        }

        // Work on a snapshot: enumerating the live synchronized queue is not safe
        // against concurrent Enqueue/Dequeue, and no lock is held while Dispose runs.
        // NOTE(review): for the Global queue type the snapshot contains entries
        // queued by every context sharing the static queue - confirm that
        // disposing them here is intended.
        object[] pending = ctxQ.ToArray();
        foreach (QMessage qMsg in pending)
        {
            if (qMsg.MsgType == MessageType.Reference)
            {
                // Close markers are typed Reference but carry no message.
                IMessage msg = (IMessage)qMsg.Message;
                if (msg != null)
                {
                    msg.Dispose();
                }
            }
        }
    }

    #endregion

    #region Instance Member Functions (Private)

    /// <summary>Returns the queue selected by the context's queue type, or null for an unrecognized type.</summary>
    private Queue GetContextQueue()
    {
        Queue ctxQueue = null;
        switch(_ctxMsgQType)
        {
            case ContextMsgQType.ContextLevel:
                ctxQueue = _ctxLocalMsgQueue;
                break;
            case ContextMsgQType.Global:
                ctxQueue = _ctxGlobalMsgQueue;
                break;
        }
        return ctxQueue;
    }

    /// <summary>
    /// Wraps a message (or a close marker) in a QMessage, enqueues it and, when
    /// requested, notifies the attached context processor.
    /// </summary>
    /// <param name="msg">Message to queue; ignored when <paramref name="isCloseMsg"/> is true.</param>
    /// <param name="ensureCtxProcessor">
    /// When true, the attached processor is notified after the entry is enqueued and
    /// an exception is raised if none is available. When false, the entry is only enqueued.
    /// </param>
    /// <param name="isCloseMsg">True to enqueue an internal close marker bound to this context's close event.</param>
    /// <exception cref="ACPException">Wraps any failure, including a missing queue, message processor or context processor.</exception>
    private void QueueMessage(IMessage msg,bool ensureCtxProcessor,bool isCloseMsg)
    {
        try
        {
            Queue ctxQueue = GetContextQueue();
            if (ctxQueue == null)
            {
                throw ACPException.NewACPException(
                    new Exception("Message Queue is not available to queue messages for this Context"));
            }

            IMessageProcessor msgProc = this.MessageProcessor;
            if (msgProc == null)
            {
                throw ACPException.NewACPException(
                    new Exception("Message Processor not available for the context"));
            }

            // Prepare an internal message for queuing.
            QMessage qMsg = new QMessage();
            if (isCloseMsg == true)
            {
                qMsg.CloseMsg = isCloseMsg;
                qMsg.MsgType = MessageType.Reference;
                qMsg.CloseEvent = _closeEvent;
            }
            else
            {
                if (msg == null)
                {
                    // Fail with a descriptive error instead of a NullReferenceException.
                    throw new ArgumentNullException("msg","A message is required unless a close message is being queued.");
                }

                qMsg.MsgType = msg.MsgType;
                // Serializable messages are stored in serialized form.
                if (msg.MsgType == MessageType.Serializable)
                {
                    qMsg.Message = msgProc.SerializeMessage(msg);
                }
                else
                {
                    qMsg.Message = msg;
                }
            }
            ctxQueue.Enqueue(qMsg);

            // A processor is only required when the caller asked for it to be
            // notified; a plain enqueue must not fail for lack of one.
            if (ensureCtxProcessor == true)
            {
                WeakReference procRef = _ctxProcessor;
                IContextProcessor ctxProc = null;
                if (procRef != null)
                {
                    ctxProc = (IContextProcessor)procRef.Target;
                }
                if (ctxProc == null)
                {
                    throw ACPException.NewACPException(
                        new Exception("Context Processor is not available to process messages for this Context"));
                }
                ctxProc.NotifyOnMessage(this as IContext);
            }
        }
        catch(Exception e)
        {
            throw ACPException.NewACPException(e);
        }
    }

    /// <summary>Obtains the message processor for the configured processor type.</summary>
    /// <exception cref="ACPException">Thrown when ProviderManager returns no processor.</exception>
    private void InitializeContext()
    {
        _msgProcessor = ProviderManager.GetMessageProcessor(_msgProcType);
        if (_msgProcessor == null)
        {
            throw ACPException.NewACPException(
                new Exception("Failed to get the Message Processor for the Context"));
        }
    }

    #endregion

    #region Static Member Functions (Private)

    /// <summary>
    /// Produces a context ID from the current tick count, bumped past the last
    /// issued value so that two contexts created within the same clock tick
    /// never share an ID.
    /// </summary>
    private static string NewContextID()
    {
        lock (_ctxIDLock)
        {
            long ticks = DateTime.Now.Ticks;
            if (ticks <= _lastCtxIDTicks)
            {
                ticks = _lastCtxIDTicks + 1;
            }
            _lastCtxIDTicks = ticks;
            return ticks.ToString();
        }
    }

    #endregion

    #region Instance Data Members (Private)

    private string _msgProcType = "";
    private string _ctxID = NewContextID();
    private string _ctxTransactionID = null;
    private ContextMsgQType _ctxMsgQType = ContextMsgQType.ContextLevel;
    private WeakReference _ctxProcessor = null;
    private WeakReference _owningContextManager = null;
    private IMessageProcessor _msgProcessor = null;
    private Queue _ctxLocalMsgQueue = Queue.Synchronized(new Queue());
    // Starts signaled: no close marker is pending.
    private ManualResetEvent _closeEvent = new ManualResetEvent(true);

    #endregion

    #region Static Data Members (Private)

    // Queue shared by every context created with ContextMsgQType.Global.
    private static Queue _ctxGlobalMsgQueue = Queue.Synchronized(new Queue());
    // State for NewContextID().
    private static readonly object _ctxIDLock = new object();
    private static long _lastCtxIDTicks = 0;

    #endregion
}
}
|
By viewing downloads associated with this article you agree to the Terms of Service and the article's licence.
If a file you wish to view isn't highlighted, and is a text file (not binary), please
let us know and we'll add colourisation support for it.
Software Professional with 14+ Years of experience in design & development of server products using Microsoft Technologies.
Worked/Working on server-side product development using Managed C++ & C#, including Thread pools, Asynchronous Procedure Calls (APC), Inter Process Communication (IPC) using named pipes, Lock Free data structures in C++ & .Net, etc.