- Codeproject.EventBroker.zip
- Codeproject.EventBroker
- Codeproject.EventBroker.Common
- Codeproject.EventBroker.Contracts
- Codeproject.EventBroker.Host
- Codeproject.EventBroker.Service
- Codeproject.EventBroker.sln
- Codeproject.EventBroker.suo
- Codeproject.EventBroker.TestMessagePublisher
- Codeproject.EventBroker.WebUI
- Lib
- Apache
- log4net
- 1.2.10.0
- Castle
- 1.2.0.6623
- Castle.Components.Binder.dll
- Castle.Components.Validator.dll
- Castle.Core.dll
- Castle.DynamicProxy2.dll
- Castle.MicroKernel.dll
- Castle.Windsor.dll
- Microsoft
- SignalR
- SignalR.dll
|
using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.ServiceModel;
using Codeproject.EventBroker.Contracts.Service;
using Codeproject.EventBroker.Service.Data;
using System.Threading.Tasks;
using System.Configuration;
using Codeproject.EventBroker.Contracts.Faults;
using System.Messaging;
using Codeproject.EventBroker.Common;
using Codeproject.EventBroker.Service.Utils;
using Codeproject.EventBroker.Contracts.Data;
using Codeproject.EventBroker.Service.Extensions;
using Codeproject.EventBroker.Service.Services.Contracts;
using System.Threading;
namespace Codeproject.EventBroker.Service
{
[ServiceBehavior(InstanceContextMode = InstanceContextMode.Single, ConcurrencyMode = ConcurrencyMode.Multiple)]
public class EventBroker : IEventBroker
{
Dictionary<string, List<UniqueCallbackHandle>> eventNameToCallbackLookups = new Dictionary<string, List<UniqueCallbackHandle>>();
private static Object syncObj = new Object();
private static string inputQueueName ="";
private bool shouldRun = true;
private static readonly TimeSpan queueReadTimeOut = TimeSpan.FromSeconds(500);
private static readonly TimeSpan queuePeekTimeOut = TimeSpan.FromSeconds(30);
private IXmlParser xmlParser;
public EventBroker()
{
inputQueueName = ConfigurationManager.AppSettings["eventBrokerQueueName"].ToString();
StartCollectingMessage();
xmlParser = IOCManager.Instance.Container.Resolve<IXmlParser>();
}
public void StartCollectingMessage()
{
try
{
GetMessageFromQueue();
}
catch (Exception ex)
{
throw new FaultException<EventBrokerException>(new EventBrokerException(ex.Message), new FaultReason(ex.Message));
}
}
public void Subscribe(Guid subscriptionId, string[] eventNames)
{
try
{
CreateSubscription(subscriptionId, eventNames);
}
catch(Exception ex)
{
throw new FaultException<EventBrokerException>(new EventBrokerException(ex.Message), new FaultReason(ex.Message));
}
}
public void EndSubscription(Guid subscriptionId)
{
lock (syncObj)
{
//create new dictionary that will be populated by those remaining
Dictionary<string, List<UniqueCallbackHandle>> remainingEventNameToCallbackLookups =
new Dictionary<string, List<UniqueCallbackHandle>>();
foreach (KeyValuePair<string,List<UniqueCallbackHandle>> kvp in eventNameToCallbackLookups)
{
//get all the remaining subscribers whos session id is not the same as the one we wish to remove
List<UniqueCallbackHandle> remainingMessageSubscriptions =
kvp.Value.Where(x => x.CallbackSessionId != subscriptionId).ToList();
if (remainingMessageSubscriptions.Any())
{
remainingEventNameToCallbackLookups.Add(kvp.Key, remainingMessageSubscriptions);
}
}
//now left with only the subscribers that are subscribed
eventNameToCallbackLookups = remainingEventNameToCallbackLookups;
}
}
#region Private Methods
private void GetMessageFromQueue()
{
try
{
Task messageQueueReaderTask = Task.Factory.StartNew(() =>
{
using (MessageQueue queue = new MessageQueue(inputQueueName, QueueAccessMode.Receive))
{
queue.Formatter = new XmlMessageFormatter(new[] { typeof(string) });
while (shouldRun)
{
Message message = null;
try
{
if (!queue.IsEmpty())
{
LogManager.Log.Debug("Receiving queue message");
message = queue.Receive(queueReadTimeOut);
ProcessMessage(message);
}
}
catch (MessageQueueException e)
{
if (e.MessageQueueErrorCode != MessageQueueErrorCode.IOTimeout)
{
LogManager.Log.Warn("Message queue exception occured", e);
}
}
catch (Exception e)
{
// Write the message details to the Error queue
LogManager.Log.Warn("Exception occured", e);
}
}
}
}, TaskCreationOptions.LongRunning);
}
catch (AggregateException ex)
{
throw;
}
}
private void ProcessMessage(Message msmqMessage)
{
string messageBody = (string)msmqMessage.Body;
LogManager.Log.DebugFormat("ProcessMessage : {0}", messageBody);
RealTimeEventMessage messageToSendToSubscribers = xmlParser.ParseRawMsmqXml(messageBody);
if (messageToSendToSubscribers != null)
{
lock (syncObj)
{
List<Guid> deadSubscribers = new List<Guid>();
if (eventNameToCallbackLookups.ContainsKey(messageToSendToSubscribers.EventName))
{
List<UniqueCallbackHandle> uniqueCallbackHandles =
eventNameToCallbackLookups[messageToSendToSubscribers.EventName];
foreach (UniqueCallbackHandle uniqueCallbackHandle in uniqueCallbackHandles)
{
try
{
uniqueCallbackHandle.Callback.ReceiveStreamingResult(messageToSendToSubscribers);
}
catch(CommunicationObjectAbortedException coaex)
{
deadSubscribers.Add(uniqueCallbackHandle.CallbackSessionId);
}
}
}
//end all subcriptions for dead subscribers
foreach (Guid deadSubscriberId in deadSubscribers)
{
EndSubscription(deadSubscriberId);
}
}
}
}
private void CreateSubscription(Guid subscriptionId, string[] eventNames)
{
//Ensure that a subscription is created for each message type the subscriber wants to receive
lock (syncObj)
{
foreach (string eventName in eventNames)
{
if (!eventNameToCallbackLookups.ContainsKey(eventName))
{
List<UniqueCallbackHandle> currentCallbacks = new List<UniqueCallbackHandle>();
eventNameToCallbackLookups[eventName] = currentCallbacks;
}
eventNameToCallbackLookups[eventName].Add(
new UniqueCallbackHandle(subscriptionId, OperationContext.Current.GetCallbackChannel<IEventBrokerCallback>()));
}
}
}
#endregion
}
}
|
By viewing downloads associated with this article you agree to the Terms of Service and the article's licence.
If a file you wish to view isn't highlighted, and is a text file (not binary), please
let us know and we'll add colourisation support for it.
I currently hold the following qualifications (amongst others, I also studied Music Technology and Electronics, for my sins)
- MSc (Passed with distinctions), in Information Technology for E-Commerce
- BSc Hons (1st class) in Computer Science & Artificial Intelligence
Both of these at Sussex University UK.
Award(s)
I am lucky enough to have won a few awards for Zany Crazy code articles over the years
- Microsoft C# MVP 2016
- Codeproject MVP 2016
- Microsoft C# MVP 2015
- Codeproject MVP 2015
- Microsoft C# MVP 2014
- Codeproject MVP 2014
- Microsoft C# MVP 2013
- Codeproject MVP 2013
- Microsoft C# MVP 2012
- Codeproject MVP 2012
- Microsoft C# MVP 2011
- Codeproject MVP 2011
- Microsoft C# MVP 2010
- Codeproject MVP 2010
- Microsoft C# MVP 2009
- Codeproject MVP 2009
- Microsoft C# MVP 2008
- Codeproject MVP 2008
- And numerous codeproject awards which you can see over at my blog
This member has not yet provided a Biography. Assume it's interesting and varied, and probably something to do with programming.