Posted 25 Apr 2018
Licenced CPOL

High Speed Message Bus using C# and MongoDB


Introduction

Modern enterprises run many systems, applications and services in their data centers, and many of these must communicate with each other to meet business requirements. To a large extent, web services have standardized integration between diverse systems, applications and services. Because web services operate over HTTP, they can communicate transparently with end points or with other services. In other words, a single interface can serve both service-to-service and service-to-end-point communication. Direct service-to-service integration, however, results in a tightly coupled architecture, where any change in one service can have ripple effects across other services.

Figure 1 - Direct web service to web service integration in a hypothetical banking enterprise architecture

For N services, N(N-1)/2 connections are needed to make a fully interconnected system. Because each service is aware of the interfaces of the other services, a change in one service can affect many others. If there are only a handful of services, this design can work. As the number of services increases (which it usually does in enterprises), the number of interconnections grows quadratically, making maintenance of such systems a developer's nightmare.
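
To make the growth concrete, the short sketch below compares the number of links in a fully interconnected system with the number needed when every service connects only to a shared bus. The class and method names are illustrative and are not part of the MessageBus project.

```csharp
using System;

public static class ConnectionCount
{
    // Links needed when every service talks directly to every other service: N(N-1)/2.
    public static long PointToPoint(long services) => services * (services - 1) / 2;

    // Links needed when every service connects only to a shared message bus: N.
    public static long ViaBus(long services) => services;

    public static void PrintTable()
    {
        foreach (long n in new long[] { 5, 10, 50, 100 })
        {
            Console.WriteLine($"{n} services: {PointToPoint(n)} direct links, {ViaBus(n)} bus links");
        }
    }
}
```

For 10 services this gives 45 direct links against 10 bus links; for 100 services, 4,950 against 100.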

Message-oriented middleware (MOM) has been around for many years. The global adoption of microservices has renewed the focus on high-speed messaging systems as a preferred inter-service communication mechanism. Messaging systems are components of distributed systems that allow application-to-application messaging. Messaging middleware operates on the store-and-forward design principle to provide guaranteed message delivery and resilience against runtime disruptions. Messaging systems enable enterprises to achieve service-to-service integration without a large number of direct interconnections between services.

In today's world, the ability to handle "big data" and to make decisions based on data analytics is a key strategic advantage for enterprises. Data-intensive systems require high-speed, high-throughput databases to handle large volumes of data at lightning speed. MongoDB's ability to handle large volumes of structured and unstructured data, its near-zero object-to-table impedance mismatch, and its in-memory storage capabilities are significant advantages over an RDBMS and make it a popular choice for big data solutions.

Many traditional messaging systems use proprietary technologies and do not provide real-time messaging capabilities. RabbitMQ is a popular choice for high-speed messaging; however, setting up, configuring and operating RabbitMQ can be a challenge.

Using MongoDB's capped collections and C# generics, a lightweight, scalable, production-ready messaging system can be created. In this article, I explain the design, architecture and usage of a message bus developed using MongoDB and C#. All source code has been published on GitHub.

Background

A message bus is a logical perspective that provides a conceptual framework for the operation of messaging systems. The concept is similar to the architecture of a TCP/IP network. In an IP network, packets travel over shared media, which act like a data bus that carries IP packets from source to destination. The "shared media" in an IP network are a logical concept realized with the help of many network components, physical media and connections. Similarly, a message bus provides logical shared media over which "application messages" travel from one system to another at very high speed. Different components, such as databases and message buffers, work together to realize the concept of shared media between the systems connected to the message bus.

Unlike TCP/IP, the destination address is not provided in the message. Messaging on a message bus works on the principle of fire and forget. Every system connected to the message bus declares its interest in receiving messages of a certain type or from a certain source. Whenever a system wants to publish a message, it uses the standard interface provided by the message bus. The message bus stores the message and then delivers it to all parties subscribed to that message type and source. All message publishing is asynchronous, i.e., the publisher is not blocked after sending the message. The message bus provides guaranteed message delivery and the capability to handle runtime errors gracefully. This architecture makes message publishing completely independent of the message consumers, and enables both publishers and consumers to evolve independently of each other.
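
The decoupling described above can be illustrated with a deliberately small in-memory sketch. It is not the MessageBus implementation: the InMemoryBus class is hypothetical, it delivers synchronously, and it does not persist anything. It only shows that a publisher addresses a message type, never a consumer.

```csharp
using System;
using System.Collections.Generic;

// Hypothetical in-memory bus: subscribers register by message type,
// and publishers never learn who (if anyone) is listening.
public class InMemoryBus
{
    private readonly Dictionary<Type, List<Action<object>>> handlers =
        new Dictionary<Type, List<Action<object>>>();

    public void Subscribe<T>(Action<T> handler)
    {
        if (!handlers.TryGetValue(typeof(T), out var list))
        {
            list = new List<Action<object>>();
            handlers[typeof(T)] = list;
        }
        list.Add(message => handler((T)message));
    }

    // Returns the number of subscribers the message was handed to.
    public int Publish<T>(T message)
    {
        if (!handlers.TryGetValue(typeof(T), out var list)) return 0;
        foreach (var handler in list) handler(message);
        return list.Count;
    }
}
```

Adding or removing a subscriber changes nothing in the publishing code, which is the property the real message bus provides across processes.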

Messages in the context of a message bus are application-level events, or changes in the state of domain objects, that need to be shared with other systems or services. For example, in a banking system, some types of messages can be:

  • Opening of a new account
  • Transfer of funds from one account to another
  • Activation of a credit card
  • Authorization of a transaction

Figure 2 shows a conceptual design of a message bus implementation in a hypothetical banking system, with a myriad of applications and services interconnected via the message bus. When a customer opens a new bank account, the core banking system can send an account opening message. This message can be picked up by the CRM system to send email and SMS notifications to the customer, by the anti-money laundering (AML) system to apply governance policies, by the central logging system to log the transaction details, and by the analytics system to update dashboards and reports. The core banking system is not required to know any details of the consumers. Consumers can be added, removed or modified without the knowledge of the message publishing system.

Fig 2 - Example of a banking system interconnected with message bus

Message Bus Architecture

The message bus is built on MongoDB's capped collections and cursors, together with the reflection APIs of C#. Each message type supported by the message bus is configured in the "app.config" file, or in "web.config" for web applications. Upon initialization, the message bus creates a new capped collection for each message type if one does not already exist. A separate collection for each message type allows each collection to be customized according to the size and number of messages it needs to store. It also isolates access to one message type from another, so that high-frequency messages cannot starve low-frequency messages. MongoDB's capped collections behave like circular queues, i.e., once a collection is full, the next insertion automatically removes the oldest message from the collection. Reads from a capped collection follow the natural order of insertion.

public void initQueues(SupportedMessages mappings)
    {
        foreach(var mapping in mappings)
        {
            // One capped collection per supported message type.
            string collectionName = DBUtil.GetCollectionName(mapping);

            // Create the collection only if it does not exist yet.
            if (!IsCollectionExists(collectionName))
            {
                // Capped = true makes MongoDB evict the oldest documents
                // once the document-count or byte-size limit is reached.
                MongoDB.CreateCollection(collectionName, new CreateCollectionOptions
                    { Capped = true,
                    MaxDocuments = ConfigManager.GetIntParameter
                    (mapping.GetType().FullName, Constants.Max_NUMBER_OF_DOCUMENTS, 1000),
                    MaxSize = ConfigManager.GetIntParameter
                    (mapping.GetType().FullName,
                    Constants.MAX_COLLECTION_SIZE_INBYTES, 1024 * 1024) });
            }

            // Keep a handle to the collection for later publish and monitor calls.
            IMongoCollection<BsonDocument> collection =
                                  MongoDB.GetCollection<BsonDocument>(collectionName);
            collections.Add(collectionName, collection);
        }
    }

The message bus wraps each message in a generic "BusMessage" type. It keeps track of the messages delivered to a particular application, identified by its ApplicationInfo. If a subscriber is disconnected from the message bus, upon reconnection it receives messages from the point of the last message read. A new subscriber starts receiving messages from the point of subscription; historical messages are not delivered to new subscribers. This arrangement optimizes the performance and memory usage of MongoDB. The SendMessage method of BusManager converts each message into a BusMessage and stores it in the respective collection. SendMessage returns a Response object that contains information about the completion of the request.

// internal structure of BusMessage

public class BusMessage<t>
{
    public ObjectId _id { get; set; }
    public t Message { get; set; }
    public string SenderApplicationName { get; set; }
    public bool DeleteOnRead { get; set; }
    public DateTime CreationTime { get; set; }
    public DateTime? TTL { get; set; }

    public BusMessage(string sender, t message, TimeSpan messageTTL)
    {
        this.Message = message;
        this.CreationTime = DateTime.Now;
        this.SenderApplicationName = sender;
        this.DeleteOnRead = false;

        if(messageTTL != default(TimeSpan))
        {
            TTL = DateTime.Now.Add(messageTTL);
        }
    }

    public BusMessage()
    {
    }
}

public class Response
{
    public bool Success { get; set; }
    public int ErrorCode { get; set; }
    public string ErrorMessage { get; set; }
    public object ResponseObject { get; set; }
}

//Send message implementation
public Response SendMessage
(t message, ApplicationInfo senderApplication, TimeSpan messageTTL = default(TimeSpan))
    {
        Logger.Debug(string.Format("{0} published a message on {1}. Message = {2}",
            senderApplication.ApplicationName, DateTime.Now, message.ToJSON()));
        Response defaultResponse = DefaultResponse();

        // Message types that are not configured are rejected with the default response.
        if (!SupportedMessages.GetInstance().IsTypeSupported(message.GetType()))
        {
            Logger.Debug("default response returned. response = "+defaultResponse.ToJSON());
            return defaultResponse;
        }

        // Wrap the payload in a BusMessage and store it in the collection for its type.
        if (storeManager.SaveMessage<t>
        (new BusMessage<t>(senderApplication.ApplicationName, message,messageTTL)))
        {
            defaultResponse.Success = true;
            defaultResponse.ErrorMessage = "Message published successfully";
            defaultResponse.ErrorCode = 0;
        }
        else
        {
            defaultResponse.ErrorMessage = "Failed to publish message";
        }

        Logger.Debug(defaultResponse.ToJSON());
        return defaultResponse;
    }

Having a separate collection for each message type also decouples and isolates access to different message types. To monitor insertions into a capped collection, the monitoring component opens a non-expiring tailable cursor on the monitored collection. Monitoring one collection from multiple sources does not cause performance issues. This design makes it very easy to add new collections without any impact on existing ones.
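
As a rough sketch of what opening such a cursor looks like with the official MongoDB C# driver, the code below tails a capped collection for documents inserted after a known _id. The TailSketch class, its method names and the choice to filter on _id are assumptions made for illustration; the project's own monitor may be structured differently.

```csharp
using System;
using System.Threading;
using MongoDB.Bson;
using MongoDB.Driver;

public static class TailSketch
{
    // Options for a cursor that stays open and waits for new documents.
    public static FindOptions<BsonDocument> TailableOptions()
    {
        return new FindOptions<BsonDocument>
        {
            CursorType = CursorType.TailableAwait,
            NoCursorTimeout = true
        };
    }

    // Blocks the calling thread and invokes onMessage for each document
    // whose _id is greater than lastSeenId.
    public static void Tail(IMongoCollection<BsonDocument> collection, ObjectId lastSeenId,
                            Action<BsonDocument> onMessage, CancellationToken token)
    {
        var filter = Builders<BsonDocument>.Filter.Gt("_id", lastSeenId);
        using (var cursor = collection.FindSync(filter, TailableOptions(), token))
        {
            while (cursor.MoveNext(token))
            {
                foreach (var document in cursor.Current)
                {
                    onMessage(document);
                }
            }
        }
    }
}
```

If MoveNext returns false the server has closed the cursor, so a long-running monitor would normally reopen it from the last _id it processed; that retry loop is left out here to keep the sketch short.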

Each capped collection acts like a circular queue, which automatically replaces the oldest item to make room for a new one. Capped collections also preserve the natural order in which items were added to the queue.
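
The eviction behaviour can be modelled in a few lines of plain C#. The CappedQueue class below is a hypothetical in-memory stand-in limited by document count only; a real capped collection is also limited by its size in bytes.

```csharp
using System;
using System.Collections.Generic;

// Hypothetical in-memory model of a capped collection limited by document count.
public class CappedQueue<T>
{
    private readonly int maxDocuments;
    private readonly Queue<T> items = new Queue<T>();

    public CappedQueue(int maxDocuments)
    {
        if (maxDocuments < 1) throw new ArgumentOutOfRangeException(nameof(maxDocuments));
        this.maxDocuments = maxDocuments;
    }

    public void Insert(T item)
    {
        // When the queue is full, the oldest entry is dropped to make room.
        if (items.Count == maxDocuments)
        {
            items.Dequeue();
        }
        items.Enqueue(item);
    }

    // Items come back in insertion (natural) order.
    public IReadOnlyList<T> ReadAll() => items.ToArray();
}
```

Inserting 1, 2, 3 and 4 into a queue with a capacity of three leaves 2, 3 and 4, in that order.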

Fig 3 - Architecture of message bus

The StorageManager class is responsible for managing all MongoDB interactions. To monitor message queues, BusManager creates a MessageMonitor object. Each MessageMonitor creates a child thread that opens a cursor on the respective MongoDB collection. Any insertion into the capped collection is automatically picked up by the cursor, and the message monitor calls the OnEvent or OnEvents method, depending on the number of messages received.

public void Subscribe(Subscriber<t> subscriber, ApplicationInfo listenerApplication)
{
    if(subscriber.IsNull() || listenerApplication.IsNull())
    {
        throw new ArgumentNullException();
    }

    if(!SupportedMessages.GetInstance().IsTypeSupported(typeof(t)))
    {
        throw new ArgumentException("Type not supported by MessageBus");
    }

    /** The assumption here is that only one subscriber exists per application
        context for each message type. If a second subscription is attempted
        for the same type, it is silently ignored. It is the responsibility of
        the caller to ensure that double subscription doesn't happen.
     */

    if (!this.subscribers.ContainsKey(typeof(t).FullName))
    {
        MessageMonitor<t> messageMonitors =
                            new MessageMonitor<t>(subscriber, listenerApplication);
        this.subscribers.Add(typeof(t).FullName, messageMonitors);
    }
}

Usage

Usage of message bus requires three discrete steps:

  • Configuration
  • Message publishing
  • Subscription to a message type for message delivery

Configuration of Message Bus

Step 1

The first step is to configure MongoDB's connection string. The "MONGO_CONNECTION" parameter holds the name of the entry under <connectionStrings> that contains MongoDB's connection URL, without the database name. The "MONGODB_NAME" parameter contains the name of the MongoDB database.

  <appSettings>
      <add key="MONGO_CONNECTION" value="DEV"/>
      <add key="MONGODB_NAME" value="MessageBus"/>
  </appSettings>
  .
  .
  .
<connectionStrings>
  <add name="DEV" connectionString="mongodb://127.0.0.1:27017"/>
</connectionStrings>

Step 2

Step 2 is to register the configuration handler of MessageBus by adding a section under <configSections>, as shown in the listing below. The next step is to configure the message types and their storage details.

Under <messageBus>, you can configure default parameters that apply to all message collections; if collection-specific parameters are not configured, the default parameters are used when the collection is created. The two default parameters are:

  1. MaxDocuments = Maximum number of documents that can be stored in the collection
  2. MaxSize = Size of the collection (in bytes)

These parameters can also be specified for each message type, to fine-tune the storage of each collection. All message types must be configured under <supportedMessageTypes>. Any message type not configured there will not be available for publishing or subscription. A message type can be any serializable type. In its basic form, domain objects can be used as message types; alternatively, a context-based messaging standard can be developed that wraps domain objects and all relevant data in a message type. Metadata configured as part of such standardized messaging can be used by subscribers to build flexible message handling mechanisms.

<configSections>
  <section name="messageBus" type="MessageBus.Config.MessageBusSection,MessageBus"/>
</configSections>

<messageBus>
  <defaultParams>
    <param name="MaxDocuments" value="1000000"/>
    <param name="MaxSize" value="1070596096"/>
  </defaultParams>
  <supportedMessageTypes>
    <supportedType value="MessageBus.Message.Revenue">
      <param name="MaxDocuments" value="200000"/>
    </supportedType>
    <supportedType value="MessageBus.Message.Booking"/>
    <supportedType value="MessagePublishingTest.Events.FundsTransfer"/>
  </supportedMessageTypes>
</messageBus>
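
The fallback from type-specific parameters to the defaults can be pictured as a simple lookup chain. The ParamResolver helper below is hypothetical and works on plain dictionaries rather than on the project's configuration classes; it only illustrates the order in which values would be resolved.

```csharp
using System.Collections.Generic;

public static class ParamResolver
{
    // Returns the type-specific value when present, otherwise the value from
    // defaultParams, otherwise the supplied built-in fallback.
    public static long Resolve(string name,
                               IDictionary<string, long> typeParams,
                               IDictionary<string, long> defaultParams,
                               long fallback)
    {
        if (typeParams != null && typeParams.TryGetValue(name, out var specific)) return specific;
        if (defaultParams != null && defaultParams.TryGetValue(name, out var general)) return general;
        return fallback;
    }
}
```

With the configuration shown above, MaxDocuments for MessageBus.Message.Revenue would resolve to 200000, while MessageBus.Message.Booking would fall back to the default of 1000000.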

Publishing a Message

To demonstrate message publishing, a sample web application has been developed using Microsoft's standard web application template. In the web application, I have added a new menu "Funds Transfer" to simulate a hypothetical funds transfer scenario.

Sample web application's main screen

Using the Funds Transfer menu, a user can request a transfer of funds from one account to another.

Example to show publishing of an event using message bus

To capture funds transfer details, a simple domain object "FundsTransfer" has been created.

public class FundsTransfer
{
    [Display(Name = "From Customer Name")]
    public string FromCustomerName { get; set; }

    [Display(Name ="Transfer to Customer Name")]
    public string ToCustomerName { get; set; }

    [Display(Name ="Amount")]
    public double Amount { get; set; }

    public DateTime TransactionTime { get; set; }

    public FundsTransfer()
    {
        FromCustomerName = ToCustomerName = string.Empty;
        TransactionTime = DateTime.Now;
    }
}

To keep this example simple, I have used the domain object as the message type. Messages are published through the BusManager interface, and a reference to a BusManager can be obtained through BusManagerFactory. BusManager is the single most important object that a user works with. It allows both publishing a message and subscribing to a message type.

namespace MessageBus.Interfaces
{
    public interface BusManager<t>
    {
        Response SendMessage(t message, ApplicationInfo senderApplication,
                                         TimeSpan messageTTL = default(TimeSpan));
        void Subscribe(Subscriber<t> subscriber, ApplicationInfo listenerApplication);
    }
}

The SendMessage method is used to publish a message. The first parameter in the method signature is the message to be published. The second parameter is an "ApplicationInfo" object, which represents the identity of an application or service. Each application or service connected to the MessageBus must provide a unique ApplicationInfo. The message bus uses this information in two ways.

First, it tracks the messages delivered to an application, so that delivery can resume from the last successfully delivered message. Second, it filters message subscriptions by message source: a subscriber can subscribe to a particular message type from a particular source, as identified by the source's ApplicationInfo object.

The third parameter is optional. If the message being published is time sensitive and loses its meaning after a certain time, a duration can be provided to mark the validity of the message. The message automatically expires once the duration provided in messageTTL has passed.

namespace MessageBus
{
    public class ApplicationInfo
    {
        public string ApplicationName { get; set; }
        public string ApplicationIP { get; set; }
        public int ApplicationPort { get; set; }
    }
}
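
The effect of the optional messageTTL parameter can be sketched as two small helpers. The MessageExpiry class is hypothetical, and it uses UTC timestamps, which is my assumption; the BusMessage constructor shown earlier uses DateTime.Now.

```csharp
using System;

public static class MessageExpiry
{
    // Absolute expiry instant, or null when no TTL was requested
    // (mirrors the default(TimeSpan) check in the BusMessage constructor).
    public static DateTime? ExpiresAt(DateTime creationTimeUtc, TimeSpan messageTtl)
    {
        return messageTtl == default(TimeSpan)
            ? (DateTime?)null
            : creationTimeUtc.Add(messageTtl);
    }

    // A message without an expiry instant never expires.
    public static bool IsExpired(DateTime? expiresAtUtc, DateTime nowUtc)
    {
        return expiresAtUtc.HasValue && nowUtc >= expiresAtUtc.Value;
    }
}
```

Comparing UTC instants avoids surprises when publisher and subscriber run in different time zones or across a daylight-saving change.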

The following code demonstrates publishing a "FundsTransfer" message through the BusManager. Using BusManager, sending a message is a trivial task. As stated earlier, each component that connects to the message bus must have a unique application name. The uniqueness of application names is not enforced by the message bus; using a duplicate application name will result in a race condition between the two applications. ApplicationInfo also lets a subscriber subscribe to a particular type of message from a particular application.

[HttpPost]
public ActionResult PublishMessage(FundsTransfer fundsTransfer)
{
    var factory = BusManagerFactory.GetBusManager<FundsTransfer>();
    factory.SendMessage(fundsTransfer, new ApplicationInfo{ApplicationName="WebApp" });
    return View(Index_VIEW, fundsTransfer);
}

Subscription to a Message

To monitor a message, the "Subscriber" interface must be implemented. Subscriber is a generic interface that allows a single implementation to handle a wide variety of message types.

namespace MessageBus.Interfaces
{
    public interface Subscriber<t>
    {
        List<ApplicationInfo> InterestedInSources { get; set; }
        void OnEvent(t message);
        void OnEvents(List<t> messages);
        void OnError(Exception exception);
    }
}

InterestedInSources is a list of "ApplicationInfo" objects that this subscriber is interested in monitoring. If no value is provided, the subscriber listens to all messages of the given type t.
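
A minimal sketch of such a source filter is shown below. The SourceFilter helper is hypothetical, and it assumes that sources are matched by application name with an exact, case-sensitive comparison; the project may compare ApplicationInfo objects differently.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;

public static class SourceFilter
{
    // True when the subscriber should receive a message from the given sender.
    // A null or empty interest list means "accept every source".
    public static bool Accepts(IEnumerable<string> interestedInSources, string senderApplicationName)
    {
        if (interestedInSources == null) return true;
        var sources = interestedInSources.ToList();
        return sources.Count == 0
            || sources.Contains(senderApplicationName, StringComparer.Ordinal);
    }
}
```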

Whenever a message of a particular type "t" is published by an application, the OnEvent method of the subscriber is called by the MessageBus. If more than one event has been fired simultaneously, the OnEvents method is called with a list of all messages published by all publishers. All messages are delivered in their natural order of insertion. For any runtime error condition, the OnError method is called with the underlying exception as the argument. OnError enables the subscriber to handle runtime errors and can be used for notification or logging purposes.
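
The choice between the two callbacks can be sketched as follows. The BatchDispatcher helper is hypothetical and takes delegates instead of a Subscriber instance so that it compiles on its own; routing handler exceptions to the error callback is also an assumption about how the monitor behaves.

```csharp
using System;
using System.Collections.Generic;

public static class BatchDispatcher
{
    // Routes a batch read from the queue to the single-message or
    // multi-message callback, depending on how many messages arrived.
    public static void Dispatch<T>(List<T> batch,
                                   Action<T> onEvent,
                                   Action<List<T>> onEvents,
                                   Action<Exception> onError)
    {
        if (batch == null || batch.Count == 0) return;
        try
        {
            if (batch.Count == 1) onEvent(batch[0]);
            else onEvents(batch);
        }
        catch (Exception exception)
        {
            // Failures are reported to the error callback instead of propagating.
            onError(exception);
        }
    }
}
```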

Once a subscriber instance has been created, it must be registered with the message bus through the BusManager interface. To register a subscriber, the "Subscribe" method is called on the BusManager of type "t". Subscription fails with an exception if the type is not supported by MessageBus, i.e., not configured in the configuration file. Calling Subscribe automatically creates a new child thread to monitor messages of that particular type.

As with publishing, the Subscribe method requires an ApplicationInfo object to uniquely identify the message listener. As stated earlier, MessageBus keeps track of the messages delivered to a particular subscriber, as identified by its ApplicationInfo. If the subscriber is disconnected from the MessageBus for any reason, upon reconnection the whole backlog of messages is delivered to the subscriber by calling the OnEvents method. Messages that were published with a time to live (TTL) that has already passed are not delivered.
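
One way to picture this bookkeeping is a checkpoint per application and message type. The DeliveryCheckpoints class below is a hypothetical in-memory sketch that uses simple sequence numbers as positions; the project itself persists its tracking data and may use a different position marker, such as the document _id.

```csharp
using System.Collections.Generic;

// Hypothetical tracker: remembers, per application and message type,
// the position of the last message that was delivered.
public class DeliveryCheckpoints
{
    private readonly Dictionary<string, long> lastDelivered = new Dictionary<string, long>();

    private static string Key(string applicationName, string messageType)
        => applicationName + "|" + messageType;

    // Position to resume from: the stored checkpoint for a known subscriber,
    // or the current end of the queue for a new one (no historical messages).
    public long ResumeFrom(string applicationName, string messageType, long currentEnd)
    {
        return lastDelivered.TryGetValue(Key(applicationName, messageType), out var position)
            ? position
            : currentEnd;
    }

    public void MarkDelivered(string applicationName, string messageType, long position)
    {
        lastDelivered[Key(applicationName, messageType)] = position;
    }
}
```

Because the checkpoint is keyed by application name, two applications that share a name would overwrite each other's position, which is one way the race condition mentioned earlier can show up.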

For demonstration purposes, a dumb subscriber has been created that does nothing but shows the message received on the console. In the "MessageMonitorTest" project, a small application has been created to demonstrate working of a subscriber.

namespace MessageBus.Message
{
    public class DummySubscriber<t> : Subscriber<t>
    {
        public List<ApplicationInfo> InterestedInSources { get; set; }

        public void OnError(Exception exception)
        {
            Logger.Error(this,exception);
        }

        public void OnEvent(t message)
        {
            Console.WriteLine(message.ToJSON());
        }

        public void OnEvents(List<t> messages)
        {
            foreach(var message in messages)
            {
                OnEvent(message);
            }
        }
    }
}

namespace MessageMonitorTest
{
    public class MessageMonitor
    {
        private static ApplicationInfo AppInfo =
        new ApplicationInfo { ApplicationName = "ConsoleMoniter" };
        public static void Main(string[] args)
        {
            BusManager<FundsTransfer> FundsTransferBusManager =
                               ObjectFactory.GetBusManager<FundsTransfer>();
            FundsTransferBusManager.Subscribe
                                               (new DummySubscriber<FundsTransfer>(), AppInfo);
            WaitForExit();
        }

        private static void WaitForExit()
        {
            while(true)
            {
                Console.WriteLine("Do you want to exit?");
                string input = Console.ReadLine();
                if (input.IsNull()) continue;
                if ("exit".Equals(input.ToLower()))
                {
                    return;
                }else
                {
                    Console.WriteLine("Invalid command");
                }
            }
        }
    }
}

Conclusion

This article explains the steps needed to use the MessageBus API. The project was created as part of a larger initiative to build loosely coupled enterprise applications. Since MongoDB was already part of our application architecture, we decided to leverage its capabilities by molding it into a messaging system. Considering its simplicity and the value it brings to our application design, we decided to publish it as an open source project. We hope that others will contribute to further enhance its capabilities.

All future changes and bug fixes will be published in the project's repository on GitHub: https://github.com/wahmed36/MessageBus

License

This article, along with any associated source code and files, is licensed under The Code Project Open License (CPOL)

About the Author

Waseem Ahmed (Qatar)
Mowasalat
Qatar
Working for Mowasalat (Karwa) as Solutions Manager. My prime interest is in helping local (Qatar) companies to come up with innovative solutions for public use.

Comments and Discussions

 
Question: Why not use Blocking Collections? (Dewey, 25-Apr-18 20:31)
Minor Correction: You said "Using MongoDB's capabilities of capped connections", that should have been collections.

Main Point: C# has Blocking Collections that can be used in a producer/consumer fashion and may be superior to the MongoDB version.

However the biggest advantage of using Blocking Collections is that you would then have a 100% C# solution that could be used in any NoSql database, not just Mongo.

Article Copyright 2018 by Waseem Ahmed (Qatar)