Creating an AMQP Sample App using RabbitMQ

9 Dec 2011, CPOL
This sample app is written using .NET 4.0 in C#.

RabbitMQ provides a .NET client library, appropriately called RabbitMQ.Client. Documentation for the client is available on the RabbitMQ website. As with many new technologies, the documentation is bare bones, and the API is somewhat raw and disorganized in that almost everything is accessible from one object: IModel. So one of the first things to do is to:

  1. Make it a little easier to use by organizing the functionality around our most common use cases.
    1. Publish something.
    2. Receive something.
  2. Separate the declarative from the functional.
    • The process of creating exchanges, queues and bindings is separate from
    • The process of publishing and receiving messages.
  3. Make it configurable using the app.config file.

Before designing the simplified APIs described above, let's take a brief tour of the functionality RabbitMQ.Client provides. The example code below highlights how the RabbitMQ.Client API lets us interact with the RabbitMQ server.
// Creating connections
using RabbitMQ.Client;

// server, userName and password are supplied by the caller.
ConnectionFactory factory = new ConnectionFactory();
factory.Endpoint = new AmqpTcpEndpoint(server);
factory.UserName = userName;
factory.Password = password;

// Opens the connection to the broker.
IConnection Connection = factory.CreateConnection();
To connect to the RabbitMQ server, a connection has to be established between the client and the server, as shown above. The final call, factory.CreateConnection(), creates the connection that we're going to use in the following example.

The connection itself is not what we use to communicate with the server. Instead, we use a dedicated communication channel, an IModel object, that the connection creates for us.

IModel is the communication channel between the client and the broker, and multiple channels can be opened on one connection. One of the RabbitMQ oddities is that an IModel instance can't be shared between threads, so we need to take that into account when designing our applications. The next section shows some of the most commonly used features that the channel object provides.
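One way to respect the one-channel-per-thread rule is to give every thread its own channel on a shared connection. The sketch below is only an illustration: PerThreadChannel is a hypothetical helper, not part of RabbitMQ.Client, and it assumes .NET 4.5 or later (for ThreadLocal value tracking) and a client version that still exposes IModel.

```csharp
using System;
using System.Threading;
using RabbitMQ.Client;

// Hypothetical helper: hands each thread its own channel on a shared connection.
public sealed class PerThreadChannel : IDisposable
{
    private readonly ThreadLocal<IModel> _channels;

    public PerThreadChannel(IConnection connection)
    {
        // trackAllValues: true lets Dispose reach every channel that was created.
        _channels = new ThreadLocal<IModel>(() => connection.CreateModel(), true);
    }

    // The channel that belongs to the calling thread (created on first use).
    public IModel Current
    {
        get { return _channels.Value; }
    }

    public void Dispose()
    {
        foreach (IModel channel in _channels.Values)
        {
            channel.Dispose();
        }
        _channels.Dispose();
    }
}
```

A worker thread would then call something like channels.Current.BasicPublish(...) and never touch a channel created by another thread.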

using (IModel channel = Connection.CreateModel())
{
    // Object declaration
    channel.ExchangeDeclare( /* parameters */ );
    channel.QueueDeclare( /* parameters */ );
    channel.QueueBind( /* parameters */ );

    // Publishing
    channel.BasicPublish( /* object parameters */ );

    // Synchronous receiving
    channel.BasicGet( /* parameters */ );

    // Acknowledging a message
    channel.BasicAck( /* parameters */ );

    // Reject a message
    channel.BasicReject( /* parameters */ );

    // Requeue a message
    channel.BasicReject( /* parameters - make sure to set requeue = true */ );
}
Oddly enough, requeueing a message also uses the BasicReject(...) method: BasicReject has a requeue parameter, and setting it to true puts the message back on the queue.
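To make the acknowledge, reject and requeue calls concrete, here is a minimal sketch of a synchronous receive that settles the message either way. SyncReceiveSketch and the handle callback are hypothetical names introduced for illustration; the sketch assumes a RabbitMQ.Client version that exposes IModel, and it passes arguments positionally because parameter names differ between client versions.

```csharp
using System;
using RabbitMQ.Client;

// Hypothetical helper, not part of RabbitMQ.Client.
public static class SyncReceiveSketch
{
    // Pulls at most one message from the queue and settles it.
    // Returns false when the queue was empty.
    public static bool TryProcessOne(IModel channel, string queue, Action<BasicGetResult> handle)
    {
        // Second argument false: we acknowledge manually below.
        BasicGetResult result = channel.BasicGet(queue, false);
        if (result == null)
        {
            return false;
        }

        try
        {
            handle(result);
            // Second argument false: acknowledge only this delivery.
            channel.BasicAck(result.DeliveryTag, false);
        }
        catch (Exception)
        {
            // Second argument true: put the message back on the queue.
            channel.BasicReject(result.DeliveryTag, true);
        }
        return true;
    }
}
```

Passing false as the second argument to BasicReject would discard the message instead of requeueing it.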

Creating a dedicated AMQP App Config Section

When we use RabbitMQ, we have to configure the server to receive and distribute messages to clients appropriately, and we have to configure our application to send messages to, or receive them from, the appropriate exchanges or queues.

The easiest way to do that is through the configuration file, which also lets us change these values easily as our application gets deployed to various environments.

I chose to create a dedicated configuration section for RabbitMQ because there's a lot to configure and I wanted to keep all of the RabbitMQ configuration elements together. The result is shown below (the code can be found in the CodePlex project: codeplex_link).

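<configuration>
  <configSections>
    <sectionGroup name="AMQPConnection">
      <section name="ConnectionSettings" type="Sample.Configuration.AMQP.Config.ConnectionSection, Sample.Configuration.AMQP" />
    </sectionGroup>
    <sectionGroup name="AMQPAdmin">
      <section name="AMQPObjectsDeclaration" type="Sample.Configuration.AMQP.Config.AMQPObjectsDeclarationSection, Sample.Configuration.AMQP" allowLocation="true" allowDefinition="Everywhere" />
    </sectionGroup>
  </configSections>
  <AMQPAdmin>
    <AMQPObjectsDeclaration>
      <!-- Wrapper element names assumed from the ExchangeList, QueueList and BindingList properties -->
      <ExchangeList>
        <add name="orders" type="topic" durable="true" autodelete="false" />
      </ExchangeList>
      <QueueList>
        <add name="uk_orders" durable="true" autodelete="false" />
      </QueueList>
      <BindingList>
        <add subscriptionkey="" queue="uk_orders" exchange="orders" />
      </BindingList>
    </AMQPObjectsDeclaration>
  </AMQPAdmin>
  <AMQPConnection>
    <ConnectionSettings>
      <connection name="connection" username="guest" server="devserver-rr1" password="guest" />
      <publisher exchange="orders" />
      <asyncreceiver queue="uk_orders" maxthreads="4" />
    </ConnectionSettings>
  </AMQPConnection>
</configuration>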

Description of the configuration file by element

  • The two section elements: map each configuration section to the class in your project that interprets it. Each XML element represents an object and has to be interpreted.
  • The add element named "orders": creates a topic exchange called "orders" that is durable (meaning it survives a server restart) and not autodelete (an autodelete exchange is removed by the server once its last binding is removed).
  • The add element named "uk_orders": creates a queue called "uk_orders" to represent all orders for UK customers. The queue is durable and not autodelete.
  • The add element with the subscriptionkey attribute: binds the "uk_orders" queue to the "orders" exchange using the subscription key "". Messages published to "orders" whose routing key matches that key end up in the queue.
  • connection: configures the server name and credentials used to connect.
  • publisher: configures the publisher to publish messages to the "orders" exchange.
  • asyncreceiver: configures the asynchronous receiver to use 4 threads to pick up orders from the "uk_orders" queue as they arrive.
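Because "orders" is a topic exchange, the broker compares each routing key with the subscription key word by word. The following sketch mimics that rule locally so the effect of a key can be tried out; TopicMatcher is a hypothetical helper written for this illustration (the real matching happens inside the RabbitMQ server), and it assumes the standard AMQP topic rules: words are separated by dots, * stands for exactly one word and # for zero or more words.

```csharp
using System;

// Hypothetical helper that imitates topic-exchange matching for illustration.
public static class TopicMatcher
{
    // True when routingKey matches bindingKey under the topic rules.
    public static bool Matches(string bindingKey, string routingKey)
    {
        return Match(bindingKey.Split('.'), 0, routingKey.Split('.'), 0);
    }

    private static bool Match(string[] pattern, int p, string[] words, int w)
    {
        // Pattern exhausted: it matches only if the words are exhausted too.
        if (p == pattern.Length)
        {
            return w == words.Length;
        }

        if (pattern[p] == "#")
        {
            // Let '#' absorb 0, 1, 2, ... of the remaining words.
            for (int next = w; next <= words.Length; next++)
            {
                if (Match(pattern, p + 1, words, next))
                {
                    return true;
                }
            }
            return false;
        }

        if (w == words.Length)
        {
            return false;
        }
        if (pattern[p] != "*" && pattern[p] != words[w])
        {
            return false;
        }
        return Match(pattern, p + 1, words, w + 1);
    }
}
```

Under these rules the empty key used above matches only messages published with an empty routing key, while a key such as "orders.#" would match "orders.uk" as well as "orders.uk.london".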

The configuration sections can accept lists of each type of AMQP object (exchange, queue and binding).
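For readers who have not written a custom configuration section before, the sketch below shows roughly how such a list of add elements can be mapped to classes with System.Configuration. It is not the article's actual implementation: the class names are hypothetical, the element name "ExchangeList" is an assumption, and the exchange type is kept as a plain string here for brevity.

```csharp
using System.Configuration;

// Hypothetical element for one <add name="..." type="..." durable="..." autodelete="..." /> entry.
public class ExchangeElement : ConfigurationElement
{
    [ConfigurationProperty("name", IsRequired = true, IsKey = true)]
    public string Name { get { return (string)this["name"]; } }

    [ConfigurationProperty("type", DefaultValue = "direct")]
    public string Type { get { return (string)this["type"]; } }

    [ConfigurationProperty("durable", DefaultValue = false)]
    public bool Durable { get { return (bool)this["durable"]; } }

    [ConfigurationProperty("autodelete", DefaultValue = false)]
    public bool AutoDelete { get { return (bool)this["autodelete"]; } }
}

// Collection behind the list of <add /> entries.
public class ExchangeCollection : ConfigurationElementCollection
{
    protected override ConfigurationElement CreateNewElement()
    {
        return new ExchangeElement();
    }

    protected override object GetElementKey(ConfigurationElement element)
    {
        return ((ExchangeElement)element).Name;
    }

    // Indexer so callers can write ExchangeList[i].
    public ExchangeElement this[int index]
    {
        get { return (ExchangeElement)BaseGet(index); }
    }
}

// Section exposing the exchange list; the element name "ExchangeList" is assumed.
public class ObjectsDeclarationSketchSection : ConfigurationSection
{
    [ConfigurationProperty("ExchangeList")]
    public ExchangeCollection ExchangeList
    {
        get { return (ExchangeCollection)this["ExchangeList"]; }
    }
}
```

Queues and bindings would follow the same pattern, each with its own element and collection class.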


So what does this look like when used in creating exchanges, queues and their bindings?
using System.Configuration;
using System.Threading.Tasks;
using RabbitMQ.Client;

namespace Sample.Configuration.AMQP.Admin
{
    public class RabbitAdmin
    {
        internal static void InitializeObjects(IConnection Connection)
        {
            var config = ConfigurationManager.OpenExeConfiguration(ConfigurationUserLevel.None);
            var objects = config.GetSection("AMQPAdmin/AMQPObjectsDeclaration") as AMQPObjectsDeclarationSection;
            if (objects != null)
            {
                // Exchanges first; every iteration uses its own channel.
                Parallel.For(0, objects.ExchangeList.Count, i =>
                {
                    using (IModel channel = Connection.CreateModel())
                    {
                        var exchange = objects.ExchangeList[i];
                        channel.ExchangeDeclare(exchange.Name, exchange.Type.ToString(), exchange.Durable, exchange.AutoDelete, null);
                    }
                });
                // Then queues.
                Parallel.For(0, objects.QueueList.Count, i =>
                {
                    using (IModel channel = Connection.CreateModel())
                    {
                        var queue = objects.QueueList[i];
                        channel.QueueDeclare(queue.Name, queue.Durable, queue.Exclusive, queue.AutoDelete, null);
                    }
                });
                // Bindings last: they need both the exchange and the queue.
                Parallel.For(0, objects.BindingList.Count, i =>
                {
                    using (IModel channel = Connection.CreateModel())
                    {
                        var binding = objects.BindingList[i];
                        channel.QueueBind(binding.Queue, binding.Exchange, binding.SuscriptionKey);
                    }
                });
            }
        }
    }
}
The RabbitAdmin class creates the exchanges, queues and their bindings. It takes a connection object and uses it to communicate with the RabbitMQ server. InitializeObjects uses a parallel for loop to create each type of object. Exchanges are created first, then queues; bindings are declared last, since both the queue and the exchange must exist before the two can be bound.


The GatewayFactory creates the connection to the server, calls RabbitAdmin to declare all the objects, and provides methods to create the publisher and asynchronous listener helper objects.

Messages are passed to the publisher and received from the queues. The message object is very simple: it contains a header and a body.
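As a rough picture of that message object, the sketch below shows a shape consistent with how the string converter later in this article fills it in. This is an assumption based on that usage (the Body, Properties and RoutingKey members), not the library's actual source, and the real class may carry more members.

```csharp
using RabbitMQ.Client;

// Assumed shape of the message type, inferred from how the converter populates it.
public class Message
{
    // Header: AMQP properties such as content type and content encoding.
    public IBasicProperties Properties { get; set; }

    // Body: the serialized payload.
    public byte[] Body { get; set; }

    // Routing key used when the message is published to the exchange.
    public string RoutingKey { get; set; }
}
```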
The body is a simple byte array that your internal data structure is serialized to. To handle conversions between data structures and messages generically, the following delegate and interface are included in the package:
namespace Sample.Configuration.AMQP.Gateway
{
    public delegate Message ConvertToMessage(IModel channel, object packetToSend);

    public interface IConvertToMessage
    {
        Message ConvertObjectToMessage(IModel channel, object packetToSend);
    }
}
The client of the system takes care of converting from their data structures to the message structure.


using (var gf = new GatewayFactory())
{
    var mc = new Sample.Configuration.AMQP.Gateway.Converter.StringToMessageConverter();
    var publisher = gf.GetPublisher(mc.ConvertObjectToMessage);
    publisher.Publish("Hello world");
}
With configuration in place, publishing is a four-line affair. Simply create a GatewayFactory, which internally reads the configuration file and sets everything up. Then request a publisher and pass it an object-to-message conversion handler. The library comes with a default string-to-message converter, StringToMessageConverter, used in the example above, which is useful for XML documents. For other payloads a custom converter has to be created. To give you an idea of how to create a converter, let's take a look at the string converter.
using System.Text;

public class StringToMessageConverter : IConvertToMessage
{
    public static readonly string PLAIN_TEXT = "text/plain";
    public const string _defaultCharSet = "utf-8";

    public string CharSet { get; set; }

    public StringToMessageConverter()
    {
        CharSet = _defaultCharSet;
    }

    public virtual Message ConvertObjectToMessage(RabbitMQ.Client.IModel channel, object packetToSend)
    {
        var properties = channel.CreateBasicProperties();
        // Encode the string with the configured character set.
        var bytes = Encoding.GetEncoding(CharSet).GetBytes((string)packetToSend);
        // Tell the receiver how to interpret the body.
        properties.ContentType = PLAIN_TEXT;
        properties.ContentEncoding = CharSet;
        return new Message() { Body = bytes, Properties = properties, RoutingKey = string.Empty };
    }
}

Conversion to a message is primarily about converting whatever needs to be sent to a byte array, letting the receiver know what the content is, and setting the RoutingKey.
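A custom converter follows the same three steps. The sketch below is a hypothetical example for a made-up Order type: the Order class, the semicolon-separated format and the "orders.<country>" routing key convention are all assumptions for illustration, and it presumes that Message lives in the same namespace as IConvertToMessage.

```csharp
using System.Globalization;
using System.Text;
using RabbitMQ.Client;
using Sample.Configuration.AMQP.Gateway;

// Hypothetical payload type, not part of the article's library.
public class Order
{
    public int Id { get; set; }
    public string Country { get; set; }
    public decimal Total { get; set; }
}

// Sketch of a converter that sends an Order as one semicolon-separated text line.
public class OrderToMessageConverter : IConvertToMessage
{
    public Message ConvertObjectToMessage(IModel channel, object packetToSend)
    {
        var order = (Order)packetToSend;

        // Step 1: turn the object into bytes (invariant culture keeps the decimal format stable).
        string line = string.Format(CultureInfo.InvariantCulture, "{0};{1};{2}", order.Id, order.Country, order.Total);
        byte[] body = Encoding.UTF8.GetBytes(line);

        // Step 2: describe the content for the receiver.
        var properties = channel.CreateBasicProperties();
        properties.ContentType = "text/plain";
        properties.ContentEncoding = "utf-8";

        // Step 3: set the routing key (assumed convention, e.g. "orders.uk").
        return new Message
        {
            Body = body,
            Properties = properties,
            RoutingKey = "orders." + order.Country.ToLowerInvariant()
        };
    }
}
```

Note that a non-empty routing key like this only reaches a queue whose binding uses a matching subscription key, so the binding in the configuration file would have to change along with it.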

The Asynchronous Receiver

Receiving messages asynchronously is as simple as publishing. The receiver has to communicate back to the server the status of the message - it can be acknowledged, rejected or requeued.
class Program
{
    static void Main(string[] args)
    {
        var mp = new MessageProcessor();
        using (var cf = new GatewayFactory())
        {
            // mp is handed to the asynchronous receiver obtained from cf here
            // (the call itself is not preserved in this listing).
        }
    }
}

class MessageProcessor : IMessageConsumer
{
    public void ConsumeMessage(Message message, RabbitMQ.Client.IModel channel, DeliveryHeader header)
    {
        try
        {
            var str = ConvertFromMessageToString(message);
            channel.BasicAck(header.DeliveryTag, false);
        }
        catch (Exception)
        {
            // Processing failed: reject and requeue the message.
            channel.BasicReject(header.DeliveryTag, true);
        }
    }

    public string ConvertFromMessageToString(Message message)
    {
        var content = string.Empty;
        if (message.Properties.ContentType == StringToMessageConverter.PLAIN_TEXT)
        {
            var encoding = Encoding.GetEncoding(message.Properties.ContentEncoding ?? "utf-8");
            var ms = new MemoryStream(message.Body);
            var reader = new StreamReader(ms, encoding, false);
            content = reader.ReadToEnd();
        }
        return content;
    }
}
Inside the using block in Main, the message handler is passed to the asynchronous receiver. The async receiver is multi-threaded, and the handler is expected to be stateless so that it can be used by multiple threads at the same time. Responding to the server is done through the IModel object. The delivery header contains a delivery tag and a flag that indicates whether this is the first time the message was delivered.
Something useful with the message can be done in ConsumeMessage, between converting the message to a string and acknowledging it.
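The publisher and the receiver have to agree on the character set, which is why the content encoding travels in the message properties. The self-contained sketch below isolates just that encode and decode step so it can be tried without a broker; TextPayload is a hypothetical helper, and for bodies without a byte order mark it behaves like the StreamReader-based code above.

```csharp
using System;
using System.Text;

// Hypothetical helper isolating the text encode/decode step.
public static class TextPayload
{
    // What the publisher side does: string to bytes in the given character set.
    public static byte[] Encode(string text, string charSet)
    {
        return Encoding.GetEncoding(charSet).GetBytes(text);
    }

    // What the receiver side does: bytes back to string.
    // A missing content encoding falls back to UTF-8.
    public static string Decode(byte[] body, string contentEncoding)
    {
        return Encoding.GetEncoding(contentEncoding ?? "utf-8").GetString(body);
    }
}
```

For example, TextPayload.Decode(TextPayload.Encode("Hello world", "utf-8"), null) gives back the original string, because the fallback encoding matches the one used to encode.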

Results in Action

To test this, I simulated the e-commerce flow I've used as an example previously.

This is the result:
It shows how orders flow into the order exchange and from there to the queues that are interested in orders.
Here we see how shipments are sent to an exchange and then transmitted to the queues that are bound to it.
You can find the source code for a working example of a distributed expert system using RabbitMQ at my source code repository.


This article, along with any associated source code and files, is licensed under The Code Project Open License (CPOL)


About the Author

Ruben Rotteveel
United States
Article Copyright 2011 by Ruben Rotteveel