Click here to Skip to main content
13,189,324 members (66,504 online)
Click here to Skip to main content
Add your own
alternative version

Stats

6.8K views
18 bookmarked
Posted 5 Feb 2017

Message Broker Pattern using C#

, 5 Feb 2017
Rate this:
Please Sign up or sign in to vote.
This article outlines the C# code implementation for the popular message broker pattern generally used in common problems which involves brokering of messages with arbitrary type.

Introduction

This article extensively covers the C# code implementation for the message broker pattern typically found as a solution to the message brokering / routing in enterprise software products. A detailed description about the pattern is available at the following URLs:

Background

In many enterprise software products, it is required to route the messages across the components of the product, with the conditions that:

  • the routing of messages should not be type aware, means the component actually routing the messages should not bother about the type of message
  • both the message publisher and message receiver in the route channel should be decoupled, means the publisher doesn't need to be aware who the message subscribers are. Similarly, the subscribers need not be aware of the originator of the message
  • publisher can also be a subscriber for the type of message it intends to receive.

The message broker / exchange is illustrated in the diagram above, wherein the direction of arrow from the component towards the message (A, B, etc.) represents publishing, whereas the arrow direction from message to the component represents subscribing. Further, the publishers are completely transparent from publishing / consuming mechanism as well as the actual consumers.

Please note that the message broker pattern described in this article is for the solution within the process context, and does not describe the brokering / routing of the messages across the distributed systems. For such scale of systems, we already have enterprise message brokers, such as Kafka, Azure Service Bus queue, etc.

Using the Code

Consider the following core level interface, which defines a contract for a message broker. As outlined therein, the method Publish<T>() is a generic publisher method of any payload of type T. Typically, the originator will call this method to publish a message of type T. The method Subscribe<T>() is called by the client to subscribe to a message of type T. Please note that the subscriber hooks the handler action method to receive the message payload and perform the action over it accordingly.

namespace MessageBroker
{
    using System;
    public interface IMessageBroker : IDisposable
    {
        void Publish<T>(object source, T message);
        void Subscribe<T>(Action<MessagePayload<T>> subscription);
        void Unsubscribe<T>(Action<MessagePayload<T>> subscription);
    }
}

The type MessagePayload is a generic type, carrying the original message T. The properties - Who, What and When are the properties describing the source, content and time of publishing respectively. The class is outlined below:

namespace MessageBroker
{
    using System;
    public class MessagePayload<T>
    {
        public object Who { get; private set; }
        public T What { get; private set; }
        public DateTime When { get; private set; }
        public MessagePayload(T payload, object source)
        {
            Who = source; What = payload; When = DateTime.UtcNow;
        }
    }
}

The implementation of the above interface is outlined in the code below, where the broker is implemented as a singleton instance. Please note that the broker needs to be a singleton to ensure that all the messages are routed through that instance only.

namespace MessageBroker
{
    using System;
    using System.Collections.Generic;
    using System.Linq;
    using System.Threading.Tasks;
    public class MessageBrokerImpl : IMessageBroker
    {
        private static MessageBrokerImpl _instance;
        private readonly Dictionary<Type, List<Delegate>> _subscribers;
        public static MessageBrokerImpl Instance
        {
            get
            {
                if (_instance == null)
                    _instance = new MessageBrokerImpl();
                return _instance;
            }
        }

        private MessageBrokerImpl()
        {
            _subscribers = new Dictionary<Type, List<Delegate>>();
        }

        public void Publish<T>(object source, T message)
        {
            if (message == null || source == null)
                return;
            if(!_subscribers.ContainsKey(typeof(T)))
            {
                return;
            }
            var delegates = _subscribers[typeof(T)];
            if (delegates == null || delegates.Count == 0) return;
            var payload = new MessagePayload<T>(message, source);
            foreach(var handler in delegates.Select
            (item => item as Action<MessagePayload<T>>))
            {
                Task.Factory.StartNew(() => handler?.Invoke(payload));
            }
        }

        public void Subscribe<T>(Action<MessagePayload<T>> subscription)
        {
            var delegates = _subscribers.ContainsKey(typeof(T)) ? 
                            _subscribers[typeof(T)] : new List<Delegate>();
            if(!delegates.Contains(subscription))
            {
                delegates.Add(subscription);
            }
            _subscribers[typeof(T)] = delegates;
        }

        public void Unsubscribe<T>(Action<MessagePayload<T>> subscription)
        {
            if (!_subscribers.ContainsKey(typeof(T))) return;
            var delegates = _subscribers[typeof(T)];
            if (delegates.Contains(subscription))
                delegates.Remove(subscription);
            if (delegates.Count == 0)
                _subscribers.Remove(typeof(T));
        }

        public void Dispose()
        {
            _subscribers?.Clear();
        }
    }
}

The implementation of the message broker interface maintains a centralized dictionary of message type against its list of subscribers. Each Subscribe<T>() call will populate this dictionary with type T as key. Whereas, the call to Unsubscribe<T>() will ensure either the key is removed from the dictionary or the subscribing action method is removed from the list representing the subscribers for the type T.

Points of Interest

The message payload class, as outlined above just represents three properties. But in the enterprise solution, the same payload can be used to carry any additional message attribute(s) as required. Further, the generic type T represents a class. But the message can represent a root class representing a facade of different concrete message instances.

History

  • Version 0.1 of this solution is published as above

License

This article, along with any associated source code and files, is licensed under The Code Project Open License (CPOL)

Share

About the Author

cjmakwana
Architect
India India
Chandresh is a 1999 batch engineering graduate in E & C engineering, a certified UML practitioner and established system architect with a broad 14+ years of industry experience. With different capabilities and roles, he has worked on the enterprise systems development for various domains - GIS / Geo-spatial Engineering, Surveillance and Lawful Interception, Incident Commanding and Life Safety, Public Services, Disaster Management and so. He has been pivotal in several projects involving wearable device communication design with the enterprise cloud using Industrial IoT devices and protocols.
He is a person with an out-of the box thinker and down-to-earth attitude, having passion and interest in music and culture. He loves the sports, especially badminton and chess.

You may also be interested in...

Pro
Pro

Comments and Discussions

 
QuestionDealing with concurrency Pin
Member 360424415-May-17 3:02
memberMember 360424415-May-17 3:02 
AnswerRe: Dealing with concurrency Pin
cjmakwana9-Jun-17 4:11
membercjmakwana9-Jun-17 4:11 

General General    News News    Suggestion Suggestion    Question Question    Bug Bug    Answer Answer    Joke Joke    Praise Praise    Rant Rant    Admin Admin   

Use Ctrl+Left/Right to switch messages, Ctrl+Up/Down to switch threads, Ctrl+Shift+Left/Right to switch pages.

Permalink | Advertise | Privacy | Terms of Use | Mobile
Web02 | 2.8.171016.2 | Last Updated 5 Feb 2017
Article Copyright 2017 by cjmakwana
Everything else Copyright © CodeProject, 1999-2017
Layout: fixed | fluid