Click here to Skip to main content
12,761,780 members (42,188 online)
Click here to Skip to main content
Add your own
alternative version

Stats

3.4K views
17 bookmarked
Posted 5 Feb 2017

Message Broker Pattern using C#

, 5 Feb 2017 CPOL
Rate this:
Please Sign up or sign in to vote.
This article outlines the C# code implementation for the popular message broker pattern generally used in common problems which involves brokering of messages with arbitrary type.

Introduction

This article extensively covers the C# code implementation for the message broker pattern typically found as a solution to the message brokering / routing in enterprise software products. A detailed description about the pattern is available at the following URLs:

Background

In many enterprise software products, it is required to route the messages across the components of the product, with the conditions that:

  • the routing of messages should not be type aware, means the component actually routing the messages should not bother about the type of message
  • both the message publisher and message receiver in the route channel should be decoupled, means the publisher doesn't need to be aware who the message subscribers are. Similarly, the subscribers need not be aware of the originator of the message
  • publisher can also be a subscriber for the type of message it intends to receive.

The message broker / exchange is illustrated in the diagram above, wherein the direction of arrow from the component towards the message (A, B, etc.) represents publishing, whereas the arrow direction from message to the component represents subscribing. Further, the publishers are completely transparent from publishing / consuming mechanism as well as the actual consumers.

Please note that the message broker pattern described in this article is for the solution within the process context, and does not describe the brokering / routing of the messages across the distributed systems. For such scale of systems, we already have enterprise message brokers, such as Kafka, Azure Service Bus queue, etc.

Using the Code

Consider the following core level interface, which defines a contract for a message broker. As outlined therein, the method Publish<T>() is a generic publisher method of any payload of type T. Typically, the originator will call this method to publish a message of type T. The method Subscribe<T>() is called by the client to subscribe to a message of type T. Please note that the subscriber hooks the handler action method to receive the message payload and perform the action over it accordingly.

namespace MessageBroker
{
    using System;
    public interface IMessageBroker : IDisposable
    {
        void Publish<T>(object source, T message);
        void Subscribe<T>(Action<MessagePayload<T>> subscription);
        void Unsubscribe<T>(Action<MessagePayload<T>> subscription);
    }
}

The type MessagePayload is a generic type, carrying the original message T. The properties - Who, What and When are the properties describing the source, content and time of publishing respectively. The class is outlined below:

namespace MessageBroker
{
    using System;
    public class MessagePayload<T>
    {
        public object Who { get; private set; }
        public T What { get; private set; }
        public DateTime When { get; private set; }
        public MessagePayload(T payload, object source)
        {
            Who = source; What = payload; When = DateTime.UtcNow;
        }
    }
}

The implementation of the above interface is outlined in the code below, where the broker is implemented as a singleton instance. Please note that the broker needs to be a singleton to ensure that all the messages are routed through that instance only.

namespace MessageBroker
{
    using System;
    using System.Collections.Generic;
    using System.Linq;
    using System.Threading.Tasks;
    public class MessageBrokerImpl : IMessageBroker
    {
        private static MessageBrokerImpl _instance;
        private readonly Dictionary<Type, List<Delegate>> _subscribers;
        public static MessageBrokerImpl Instance
        {
            get
            {
                if (_instance == null)
                    _instance = new MessageBrokerImpl();
                return _instance;
            }
        }

        private MessageBrokerImpl()
        {
            _subscribers = new Dictionary<Type, List<Delegate>>();
        }

        public void Publish<T>(object source, T message)
        {
            if (message == null || source == null)
                return;
            if(!_subscribers.ContainsKey(typeof(T)))
            {
                return;
            }
            var delegates = _subscribers[typeof(T)];
            if (delegates == null || delegates.Count == 0) return;
            var payload = new MessagePayload<T>(message, source);
            foreach(var handler in delegates.Select
            (item => item as Action<MessagePayload<T>>))
            {
                Task.Factory.StartNew(() => handler?.Invoke(payload));
            }
        }

        public void Subscribe<T>(Action<MessagePayload<T>> subscription)
        {
            var delegates = _subscribers.ContainsKey(typeof(T)) ? 
                            _subscribers[typeof(T)] : new List<Delegate>();
            if(!delegates.Contains(subscription))
            {
                delegates.Add(subscription);
            }
            _subscribers[typeof(T)] = delegates;
        }

        public void Unsubscribe<T>(Action<MessagePayload<T>> subscription)
        {
            if (!_subscribers.ContainsKey(typeof(T))) return;
            var delegates = _subscribers[typeof(T)];
            if (delegates.Contains(subscription))
                delegates.Remove(subscription);
            if (delegates.Count == 0)
                _subscribers.Remove(typeof(T));
        }

        public void Dispose()
        {
            _subscribers?.Clear();
        }
    }
}

The implementation of the message broker interface maintains a centralized dictionary of message type against its list of subscribers. Each Subscribe<T>() call will populate this dictionary with type T as key. Whereas, the call to Unsubscribe<T>() will ensure either the key is removed from the dictionary or the subscribing action method is removed from the list representing the subscribers for the type T.

Points of Interest

The message payload class, as outlined above just represents three properties. But in the enterprise solution, the same payload can be used to carry any additional message attribute(s) as required. Further, the generic type T represents a class. But the message can represent a root class representing a facade of different concrete message instances.

History

  • Version 0.1 of this solution is published as above

License

This article, along with any associated source code and files, is licensed under The Code Project Open License (CPOL)

Share

About the Author

cjmakwana
Architect
India India
No Biography provided

You may also be interested in...

Pro
Pro

Comments and Discussions

 
-- There are no messages in this forum --
Permalink | Advertise | Privacy | Terms of Use | Mobile
Web02 | 2.8.170217.1 | Last Updated 5 Feb 2017
Article Copyright 2017 by cjmakwana
Everything else Copyright © CodeProject, 1999-2017
Layout: fixed | fluid