
ZeroMQ via C#: Multi-part messages, JSON and Synchronized PUB-SUB pattern

Manar Ezzadeen, 25 Dec 2012, CPOL

Download Source and Binaries from GitHub  

Introduction  

This is the second part of the ZeroMQ via C# series. The other parts are:

ZeroMQ via C#: Introduction 

In this article we will talk about the two types of messages that ZeroMQ can send or receive and how to format these messages using JSON. We will also learn the polling mechanism and how to use it. Finally, we will explore the synchronized PUB-SUB pattern with an example.   

ZeroMQ messages  

ZeroMQ sockets can send or receive either a single-part or a multi-part message.

Single-part Message 

A single-part message is a message that has one frame. A frame is an array of bytes, and its length can be anything from zero upwards. Frames are also called "message parts" in the ZeroMQ reference manual.

For example, the messages that we sent and received in the previous article were all single-part messages. We sent and received strings, which are encoded to and decoded from arrays of bytes.

We can use the following methods (found in the clrzmq.dll library) to send or receive a single-part (single-frame) message:

  • Send/Receive
  • SendFrame/ReceiveFrame

Multi-part Messages 

A multi-part message is a message that has more than one frame. ZeroMQ sends this message as a single on-the-wire message and guarantees to deliver all the message parts (one or more), or none of them. 

A frame has a Boolean property (flag) named HasMore which indicates whether another frame is following in the message. This property has a true value in all frames except the last frame where it has a false value. 

There are two ways to send or receive multi-part messages:

Sending or receiving each part (frame) separately 

The following snippet shows an example of sending a multi-part message composed of three frames: 

socket.SendFrame(new Frame(Encoding.UTF8.GetBytes("My Frame 01")) 
                     { HasMore = true });
socket.SendFrame(new Frame(Encoding.UTF8.GetBytes("My Frame 02")) 
                     { HasMore = true });
socket.SendFrame(new Frame(Encoding.UTF8.GetBytes("My Frame 03")));  

So, to send each frame separately: 

  1. Create the first frame and set its HasMore property to true. This property tells the SendFrame method that another frame follows.
  2. Call the SendFrame method with the frame created above as its parameter.
  3. Repeat the above steps for every other frame except the last one.
  4. Create the last frame, leaving HasMore at its default value of false, and pass it to SendFrame. This tells SendFrame that no other frame follows and that this is the last one to send.

To receive each frame separately:  

  1. Loop over ReceiveFrame method until the received Frame's HasMore property equals false. 

The following snippet shows an example of receiving a multi-part message:

var frames = new List<Frame>();
do
{
    frames.Add(socket.ReceiveFrame());
}
while (frames.Last().HasMore); 
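
The HasMore rule can also be illustrated without a socket. The following is a minimal in-memory sketch, not clrzmq code: SketchFrame and MultipartSketch are hypothetical names invented for this illustration, and a Queue stands in for the socket. It flags every frame except the last one with HasMore = true, and it reads frames back until it meets a frame whose HasMore is false.

```csharp
using System.Collections.Generic;
using System.Text;

// Hypothetical stand-in for a frame: a payload plus a HasMore flag.
// It is NOT the clrzmq Frame class.
public sealed class SketchFrame
{
    public byte[] Buffer { get; }
    public bool HasMore { get; }

    public SketchFrame(byte[] buffer, bool hasMore)
    {
        Buffer = buffer;
        HasMore = hasMore;
    }
}

public static class MultipartSketch
{
    // Turns text parts into frames: every frame except the last gets HasMore = true.
    public static List<SketchFrame> ToFrames(IReadOnlyList<string> parts)
    {
        var frames = new List<SketchFrame>(parts.Count);
        for (int i = 0; i < parts.Count; i++)
        {
            bool hasMore = i < parts.Count - 1;
            frames.Add(new SketchFrame(Encoding.UTF8.GetBytes(parts[i]), hasMore));
        }
        return frames;
    }

    // Reads frames from the queue until one arrives with HasMore == false.
    // Frames belonging to a later message stay in the queue.
    public static List<SketchFrame> ReadMessage(Queue<SketchFrame> incoming)
    {
        var message = new List<SketchFrame>();
        SketchFrame frame;
        do
        {
            frame = incoming.Dequeue();
            message.Add(frame);
        }
        while (frame.HasMore);
        return message;
    }
}
```

For example, MultipartSketch.ToFrames(new[] { "My Frame 01", "My Frame 02", "My Frame 03" }) yields three frames whose HasMore values are true, true and false.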

Using a wrapper to send or receive the entire multi-part (frames) message

The class ZmqMessage (found in the clrzmq.dll library) encapsulates one or more frames, so, it represents a single-part or a multi-part message. This class manages internally the HasMore property values for all frames.

The following snippet shows an example of sending and receiving multi-part message using ZmqMessage class:


So, to send a multi-part message using the ZmqMessage class:

  1. Create an instance of  ZmqMessage.
  2. Create frames and append them to the ZmqMessage instance.
  3. Send the message using SendMessage method.

To receive a message of type ZmqMessage, use the ReceiveMessage method.

Advantage of Multi-part Messages 

The multi-part message shines in the PUB-SUB pattern. In this pattern we can put the subscription key in a separate frame (we call it an "envelope") and the message data in another frame. This keeps the subscription key separate from the published data. PUB sockets filter messages by comparing the subscription prefixes (sent by the subscribers) with the starting bytes of the message to send. By putting the subscription key in a separate first frame, the PUB socket filters messages using this frame only, while the other frames remain outside the filtering process.
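
To make the prefix comparison concrete, here is a small sketch of the matching rule. It is an illustration only, not ZeroMQ's internal code, and PrefixFilterSketch is a hypothetical name. It assumes that a subscription matches when its bytes are a prefix of the first frame, and that an empty subscription matches everything.

```csharp
using System.Text;

public static class PrefixFilterSketch
{
    // True when the first frame starts with the subscription prefix (byte-wise).
    // An empty prefix matches every message.
    public static bool Matches(byte[] firstFrame, byte[] prefix)
    {
        if (prefix.Length > firstFrame.Length) return false;
        for (int i = 0; i < prefix.Length; i++)
        {
            if (firstFrame[i] != prefix[i]) return false;
        }
        return true;
    }

    // Convenience overload for UTF-8 text keys.
    public static bool Matches(string firstFrame, string prefix) =>
        Matches(Encoding.UTF8.GetBytes(firstFrame), Encoding.UTF8.GetBytes(prefix));
}
```

With a single-frame message, a subscription to "Data" would also match the unrelated text "Database offline", because the key and the payload share one frame. With an envelope frame that holds only the key, the payload never takes part in the comparison.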

 

Another advantage of multi-part messages is the ability to send an address (endpoint) to the destination in a frame. For example, a publisher can send a private address inside a frame, allowing the subscriber to reply to the publisher using this address.

Take into consideration this note from the ZeroMQ reference manual:

ZeroMQ does not send the message (single or multi-part) right away but at some indeterminate later time. A multi-part message must therefore fit in memory. If you want to send files of arbitrary sizes, you should break them into pieces and send each piece as separate single-part messages. 
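
As a rough sketch of the "break them into pieces" advice, the helper below reads any Stream in bounded chunks. ChunkingSketch and ReadChunks are hypothetical names chosen for this illustration; the chunk size is whatever the caller picks, and how the receiver reassembles the pieces is left out.

```csharp
using System;
using System.Collections.Generic;
using System.IO;

public static class ChunkingSketch
{
    // Lazily yields the stream's content in pieces of at most chunkSize bytes,
    // so the whole file never has to sit in memory at once.
    // Because this is an iterator, the argument check runs when enumeration starts.
    public static IEnumerable<byte[]> ReadChunks(Stream source, int chunkSize)
    {
        if (chunkSize <= 0) throw new ArgumentOutOfRangeException(nameof(chunkSize));

        var buffer = new byte[chunkSize];
        int read;
        while ((read = source.Read(buffer, 0, buffer.Length)) > 0)
        {
            // Copy only the bytes actually read; the last chunk is usually shorter.
            var chunk = new byte[read];
            Array.Copy(buffer, chunk, read);
            yield return chunk;
        }
    }
}
```

Each chunk could then be handed to a single-part send call, one message per chunk.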

JSON over ZeroMQ  

We have seen that ZeroMQ messages are composed of one or more frames (a frame represents an array of bytes). So, we can serialize an object into a frame using the JSON data format (or other formats like MessagePack or Protocol Buffers). With frame serialization we can send or receive a multi-part message that has one or more serialized frames.

In this article I used the ServiceStack.Text library for JSON serialization (of course, you can choose another library). The following snippet shows a wrapper to serialize and deserialize a frame:

public static class JsonFrame
{
    public static Frame Serialize<T>(T messageObject)
    {
        var message = JsonSerializer.SerializeToString<T>(messageObject);
        return new Frame(Encoding.UTF8.GetBytes(message));
    }

    public static T DeSerialize<T>(Frame frame)
    {
        var json = Encoding.UTF8.GetString(frame.Buffer);
        return JsonSerializer.DeserializeFromString<T>(json);
    }
}

And here is a snippet showing how to use the above class in serializing/deserializing a frame in a multi-part message:

In the above code we have created an instance of the ShoppingBasket class. Then, we serialized this instance and put it in the second frame of a multi-part message. The first frame contains a message title (text format, no serialization). After that, we sent (published) this message. At the receiver side, we received it, extracted the message title from the first frame, and deserialized the second frame's content. This deserialization gave us back the ShoppingBasket instance.
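
The round trip described here can be sketched without any ZeroMQ dependency. The code below is an assumption-laden illustration: it swaps in System.Text.Json for ServiceStack.Text, it works on raw byte arrays instead of clrzmq Frame objects, and the properties of ShoppingBasket (StoreName, Items) are invented because the article's class is not shown.

```csharp
using System.Collections.Generic;
using System.Text;
using System.Text.Json;

// Hypothetical shape: these properties are made up for the illustration.
public class ShoppingBasket
{
    public string StoreName { get; set; } = "";
    public List<string> Items { get; set; } = new List<string>();
}

public static class JsonFrameSketch
{
    // Object -> JSON text -> UTF-8 bytes (the content of one frame).
    public static byte[] Serialize<T>(T value) =>
        Encoding.UTF8.GetBytes(JsonSerializer.Serialize(value));

    // UTF-8 bytes of one frame -> JSON text -> object.
    public static T Deserialize<T>(byte[] frameBytes) =>
        JsonSerializer.Deserialize<T>(Encoding.UTF8.GetString(frameBytes));
}
```

In a two-frame message, the first frame would carry the plain-text title and the second frame the bytes returned by JsonFrameSketch.Serialize; the receiver would pass the second frame's bytes to JsonFrameSketch.Deserialize<ShoppingBasket>.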

Polling  

Until now, we have implemented receiving a message as follows:

  • Building a main loop:
    • Call Receive\ReceiveFrame or ReceiveMessage method in blocking mode (the call to the method will not return until a message arrives).  
    • Process the message. 

If we have another socket and also want to receive messages from it, then we can either:

  1. Put the other socket in a different thread and loop over its receive method.
  2. Build a main loop:
    • Build socket 1 loop:
      • Call the receive method in non-blocking mode
    • Build socket 2 loop:
      • Call the receive method in non-blocking mode

In these solutions, managing sockets becomes hard, especially when we have many of them. Here, polling comes to the rescue: it is a mechanism that enables us to send or receive messages through triggered events over multiple sockets. This mechanism is implemented in the Poller class. Here is a snippet showing how to use it:

var pubSocket = ctx.CreateSocket(SocketType.PUB);
pubSocket.Bind(options.pubEndpoint);
pubSocket.SendReady +=new                 
            EventHandler<SocketEventArgs>(pubSocket_SendReady);
var repSocket = ctx.CreateSocket(SocketType.REP);
repSocket.Bind(options.repEndpoint);
repSocket.SendReady +=new                  
            EventHandler<SocketEventArgs>(repSocket_SendReady);
repSocket.ReceiveReady +=new               
         EventHandler<SocketEventArgs>(repSocket_ReceiveReady);
Poller poller = new Poller(new ZmqSocket[] {pubSocket, 
                           repSocket});
while (true)
{
    poller.Poll();
} 

And the implementation of the event handlers:

void repSocket_ReceiveReady(object sender, SocketEventArgs e)
{
}
 
void repSocket_SendReady(object sender, SocketEventArgs e)
{
}  
 
static void pubSocket_SendReady(object sender, SocketEventArgs e)
{
} 

To use polling: 

  1. Create sockets
  2. Bind\connect sockets to their endpoints
  3. Subscribe to the following socket events (depending on socket type; for example, we cannot receive messages via PUB sockets):
    • SendReady
    • ReceiveReady
  4. Create an instance of the Poller class and pass the sockets to the constructor.
  5. Call the Poll method of the Poller instance in a loop.

Each time a socket is ready to send a message, the SendReady event is fired, and when it’s ready to receive a message, ReceiveReady is fired.
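
The event-driven idea behind polling can be shown with a toy in-memory model. This is only a sketch under loud assumptions: ToySocket and ToyPoller are hypothetical classes, not the clrzmq types, only the receive side is modelled, and this Poll returns immediately instead of waiting for activity.

```csharp
using System;
using System.Collections.Generic;

// Toy stand-in for a socket: an inbox plus a ReceiveReady event.
public sealed class ToySocket
{
    private readonly Queue<string> inbox = new Queue<string>();

    public string Name { get; }
    public event EventHandler ReceiveReady;

    public ToySocket(string name) { Name = name; }

    // Simulates a message arriving from the network.
    public void Deliver(string message) => inbox.Enqueue(message);

    public bool HasPending => inbox.Count > 0;

    public string Receive() => inbox.Dequeue();

    public void RaiseReceiveReady() => ReceiveReady?.Invoke(this, EventArgs.Empty);
}

// Toy poller: one pass over all sockets, raising ReceiveReady
// for each socket that has a pending message.
public sealed class ToyPoller
{
    private readonly IReadOnlyList<ToySocket> sockets;

    public ToyPoller(IReadOnlyList<ToySocket> sockets) { this.sockets = sockets; }

    // Returns the number of sockets that were ready in this pass.
    public int Poll()
    {
        int ready = 0;
        foreach (var socket in sockets)
        {
            if (!socket.HasPending) continue;
            socket.RaiseReceiveReady();
            ready++;
        }
        return ready;
    }
}
```

The main loop stays a single loop over Poll no matter how many sockets are registered; the per-socket logic lives in the event handlers.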

The synchronized PUB-SUB pattern example (explained later in this article) uses the polling mechanism.

Synchronized Pub-Sub pattern

In the basic Pub-Sub pattern the subscriber may lose some of the publisher's messages because it connects to the publisher late. What we need is a sort of synchronization between the publisher and its subscribers. A simple solution is to add a time delay (sleep) in the publisher before it starts sending messages, in the hope that all subscribers have connected by then. But this solution is fragile and not suitable for real applications. A better solution is to use REQ-REP sockets for synchronization between the publisher and its subscribers. The following diagram shows the synchronized Pub-Sub pattern:

 

The synchronization scenario is:
  1. The publisher must know in advance the number of subscribers it expects. 
  2. The publisher starts publishing messages through the PUB socket, asking subscribers to synchronize. Then, it waits for all subscribers to connect.
  3. The subscriber:
    • Receives the message asking for synchronization.
    • Connects the REQ socket and sends a synchronization message to the publisher.
  4. The publisher increments the synchronized subscribers counter.
  5. The publisher replies to the subscriber through the REP socket.
  6. When the synchronized subscribers counter reaches the expected number of subscribers, the publisher starts sending the real data. 
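
The counting logic in this scenario can be sketched on its own, with no sockets involved. SyncTracker is a hypothetical helper written for this illustration, not a class from the article's source; the message titles "Sync" and "Data" are the ones the publisher code in this article uses.

```csharp
public sealed class SyncTracker
{
    private readonly int expectedSubscribers;

    public int SynchronizedSubscribers { get; private set; }

    public SyncTracker(int expectedSubscribers)
    {
        this.expectedSubscribers = expectedSubscribers;
    }

    // Call once per acknowledged synchronization request (steps 4 and 5).
    public void OnSubscriberSynchronized() => SynchronizedSubscribers++;

    // True once the counter has reached the expected number of subscribers (step 6).
    public bool AllSynchronized => SynchronizedSubscribers >= expectedSubscribers;

    // Title of the next message to publish: "Sync" until every expected
    // subscriber has checked in, then "Data".
    public string NextMessageTitle() => AllSynchronized ? "Data" : "Sync";
}
```

With new SyncTracker(2), NextMessageTitle() keeps returning "Sync" until OnSubscriberSynchronized() has been called twice, and returns "Data" from then on.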

One Publisher – Two Subscribers 

In this synchronized PUB-SUB pattern example, we have one publisher and two subscribers. The publisher will not publish its data until all subscribers become ready to receive the data. 

The following diagram illustrates the example:


The code of the publisher is: 

var pubSocket = ctx.CreateSocket(SocketType.PUB);
pubSocket.Bind(options.pubEndpoint);
pubSocket.SendReady +=new 
                 EventHandler<SocketEventArgs>(pubSocket_SendReady);
var repSocket = ctx.CreateSocket(SocketType.REP);
repSocket.Bind(options.repEndpoint);
repSocket.SendReady +=new 
                 EventHandler<SocketEventArgs>(repSocket_SendReady);
repSocket.ReceiveReady +=new 
              EventHandler<SocketEventArgs>(repSocket_ReceiveReady);
Poller poller = new Poller(new ZmqSocket[] {pubSocket, repSocket});
while (true)
{
    poller.Poll();
    if (options.maxMessage >= 0)
        if (msgCptr > options.maxMessage)
            Environment.Exit(0);
} 

And the socket event handlers are:

 #region REP events
static void repSocket_ReceiveReady(object sender, SocketEventArgs e)
{
    var reqMsg = e.Socket.Receive(Encoding.UTF8);
    DisplayRepMsg("REP, received: " + reqMsg);
}
 
static void repSocket_SendReady(object sender, SocketEventArgs e)
{
    DisplayRepMsg("REP, sending: Sync OK");
    e.Socket.Send(Encoding.UTF8.GetBytes("Sync OK"));
    nbSubscribersConnected++;
} 
#endregion
 
#region PUB events       
static void pubSocket_SendReady(object sender, SocketEventArgs e)
{
    var zmqMessage = new ZmqMessage();
    if (nbSubscribersConnected < options.nbExpectedSubscribers)
    {
        zmqMessage.Append(Encoding.UTF8.GetBytes("Sync"));         
        zmqMessage.Append(Encoding.UTF8
                                  .GetBytes(options.repEndpoint));
        Thread.Sleep(options.delay);
        Console.WriteLine("Publishing: Sync");
    }
    else
    {
        zmqMessage.Append(Encoding.UTF8.GetBytes("Data"));
        var data = BuildDataToPublish();
        if (!string.IsNullOrEmpty(data))
        {
            zmqMessage.Append(Encoding.UTF8.GetBytes(data));
            Thread.Sleep(options.delay);
            Console.WriteLine("Publishing (Data): " + data);
        }
    }
    e.Socket.SendMessage(zmqMessage);
} 
#endregion 

The code of the subscriber is:

// Simulate late arrivals
Thread.Sleep(options.delay);
 
// Create and connect SUB socket
var subSocket = ctx.CreateSocket(SocketType.SUB);
subSocket.Connect(options.subEndpoint);
subSocket.SubscribeAll();
                    
// Receive Sync message
var pubMsg = subSocket.ReceiveMessage();
if (Encoding.UTF8.GetString(pubMsg[0]) == SYNC)
{
    Console.WriteLine("SUB; received: " + 
                      Encoding.UTF8.GetString(pubMsg[0]));
    using (var reqSocket = ctx.CreateSocket(SocketType.REQ))
    {
        reqSocket.Connect(Encoding.UTF8.GetString(pubMsg[1]));
        DisplayReqMsg("REQ; sending : Sync me");
        reqSocket.Send("Sync me", Encoding.UTF8);
        var repMsg = reqSocket.Receive(Encoding.UTF8);
        DisplayReqMsg("REQ; received: " + repMsg);
    }
}
                                     
// Receive published messages
while (true)
{                        
    pubMsg = subSocket.ReceiveMessage();
    if (Encoding.UTF8.GetString(pubMsg[0]) != SYNC)
    {
        Console.WriteLine(
                "SUB; received: " +
                Encoding.UTF8.GetString(pubMsg[1]));
    }
} 

To run the example, double-click the SyncPubSub_Pattern_1.bat file under the bin directory. This batch file contains the following commands:

start "Subscriber 1" cmd /T:4F /k SyncSub.exe -e tcp://127.0.0.1:5000 -d 0

start "Subscriber 2" cmd /T:4F /k SyncSub.exe -e tcp://127.0.0.1:5000 -d 4000

start "Publisher" cmd /T:0A /k SyncPub.exe -e tcp://127.0.0.1:5000 -p tcp://127.0.0.1:6000 -n 2 -m "Orange #nb#";"Apple  #nb#" -x 5 -d 1000 

 

The first two commands will run two instances of a synchronized subscriber application (SyncSub.exe). Each subscriber will connect to endpoint tcp://127.0.0.1:5000 and subscribe to all publisher messages. 

The third command will run the synchronized publisher (SyncPub.exe). The publisher will bind the PUB socket to endpoint tcp://127.0.0.1:5000 and the REP socket to endpoint tcp://127.0.0.1:6000. Then, it waits for connections from subscribers. It expects two subscribers to connect (-n switch). 

The publisher starts publishing synchronizing messages. These messages are multi-part messages composed of two frames. The first frame contains the title message (“Sync”) and the second frame contains the endpoint of the REP socket (tcp://127.0.0.1:6000). 

When the subscriber receives the synchronization message, it connects the REQ socket to the received endpoint (found in the second frame of the synchronization message) and sends a request. The publisher receives the request, increments the subscribers counter, and acknowledges the request.

When the counter of synchronized subscribers reaches the expected number of subscribers, the publisher starts publishing the data. It sends five messages, alternating between the words ‘Orange’ and ‘Apple’; each word is concatenated with the message number (#nb# macro). The delay between messages is 1000 milliseconds (-d switch).
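
One way such alternating, numbered messages could be produced is sketched below. This is a guess at the behaviour, not the author's BuildDataToPublish method, which is not shown in the article: MessageTemplateSketch is a hypothetical name, and the sketch assumes that numbering starts at 1 and that templates are used in round-robin order.

```csharp
using System.Collections.Generic;

public static class MessageTemplateSketch
{
    // Builds `count` messages by cycling through the templates and replacing
    // the #nb# macro with the 1-based message number (an assumption).
    public static List<string> Build(IReadOnlyList<string> templates, int count)
    {
        var messages = new List<string>(count);
        for (int n = 1; n <= count; n++)
        {
            string template = templates[(n - 1) % templates.Count];
            messages.Add(template.Replace("#nb#", n.ToString()));
        }
        return messages;
    }
}
```

Under these assumptions, Build(new[] { "Orange #nb#", "Apple #nb#" }, 5) gives "Orange 1", "Apple 2", "Orange 3", "Apple 4" and "Orange 5".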

Notice the delay of 4000 milliseconds in the second subscriber, which simulates a late arrival (the two subscribers do not connect at the same time).

After running the above commands we get the following result:

 

Conclusion 

ZeroMQ can send either single-part or multi-part messages. Each message part is a frame, which is an array of bytes. An object can be serialized into a frame using the JSON data format (or other formats).
Polling is a mechanism that enables us to send or receive messages through triggered events over multiple sockets.
The synchronized PUB-SUB pattern ensures that data will not be published until the expected number of subscribers are connected and ready to receive real data.

License

This article, along with any associated source code and files, is licensed under The Code Project Open License (CPOL)


About the Author

Manar Ezzadeen
Software Architect
France
 
Blog:
http://idevhawk.phonezad.com
 
Twitter:
@ManarEzzadeen

Last Updated 25 Dec 2012
Article Copyright 2012 by Manar Ezzadeen