My Articles about ZeroMQ
Introduction
In this article, we will talk about the two types of messages that ZeroMQ can send or receive and how to format these messages using JSON. We will also learn the polling mechanism and how to use it. Finally, we will explore the synchronized PUB-SUB pattern with an example.
ZeroMQ Messages
ZeroMQ
sockets can send or receive either a single-part or a multi-part message.
Single-part Message
A single-part message is a message that has one frame. A frame is an array of bytes. The length of a frame can be zero upwards. Frames also called "message parts" in ZeroMQ reference manual.
For example, the messages that we sent/received in the previous article were all single-part messages type. We have used string
s during sending receiving operations (a string
is an array of bytes).
We can use the following methods (found in clrzmq.dll library) to send or receive a single-part message (single-frame message):
- Send\Receive
- SendFrame\ReceiveFrame
Multi-part Messages
A multi-part message is a message that has more than one frame. ZeroMQ sends this message as a single on-the-wire message and guarantees to deliver all the message parts (one or more), or none of them.

A frame has a Boolean property (flag
) named HasMore
which indicates whether another frame is following in the message. This property has a true value in all frames except the last frame where it has a false
value.
There are two ways to send or receive multi-part messages:
Sending or Receiving Each Part (frame) Separately
The following snippet shows an example of sending a multi-part message composed of three frames:
socket.SendFrame(new Frame(Encoding.UTF8.GetBytes("My Frame 01"))
{ HasMore = true });
socket.SendFrame(new Frame(Encoding.UTF8.GetBytes("My Frame 02"))
{ HasMore = true });
socket.SendFrame(new Frame(Encoding.UTF8.GetBytes("My Frame 03")));
So, to send each frame separately:
- Create the first frame and set its
HasMore
property to true
. This property will indicate to the SendFrame
method that there is another frame to send. - Call
SendFrame
method using the above created frame as its parameter. - Repeat the above steps for other frames except the last frame.
- Create the last frame (default value of
HasMore
property is false
). This frame's HasMore
property will inform SendFrame
method that there is no other frame and it's the last one to send.
To receive each frame separately:
- Loop over
ReceiveFrame
method until the received Frame's HasMore
property equals false
.
The following snippet shows an example of receiving multi-part message:
var frames = new List<Frame>();
do
{
frames.Add(socket.ReceiveFrame());
}
while (frames.Last().HasMore);
Using a Wrapper to Send or Receive the Entire Multi-part (frames) Message
The class ZmqMessage
(found in the clrzmq.dll library) encapsulates one or more frames, so, it represents a single-part or a multi-part message. This class manages internally the HasMore
property values for all frames.
The following snippet shows an example of sending and receiving multi-part message using ZmqMessage
class:

So, to send a multi-part message using the ZmqMessage
class:
- Create an instance of
ZmqMessage
. - Create frames and append them to the
ZmqMessage
instance. - Send the message using
SendMessage
method.
And, use ReceiveMessage
method to receive messages of type ZmqMessage
.
Advantage of Multi-part Messages
The multi-part message shines in the PUB-SUB pattern. In this pattern, we can put the subscription key in a separate frame (we call it an "envelope"), and the message data in another frame. This ensures the separation between the subscription key and the published data. PUB sockets filter messages by comparing the subscription prefixes (sent by the subscribers) with the starting characters of the message to send. By putting the subscription key in a separate frame (first one), the PUB socket will filter messages using this frame only, while the other frames remain outside filtering process.

One of other advantages of multi-part messages is the ability to send an address (endpoint) in a frame to the destination. For example, a publisher can send a private address inside a frame allowing the subscriber to send a reply to the publisher using this address.
Take into consideration the note mentioned in the ZeroMQ reference manual:
ZeroMQ does not send the message (single or multi-part) right away but at some indeterminate later time. A multi-part message must therefore fit in memory. If you want to send files of arbitrary sizes, you should break them into pieces and send each piece as separate single-part messages.
JSON over ZeroMQ
We have seen that ZeroMQ messages are composed of one or more frames (a frame represents an array of bytes). So, we can serialize a frame using JSON data format (or other formats like Messagepack
, Protobufs
...). With frame serialization, we can send/receive a multi-part message having one or more serialized frames.
In this article, I used ServiceStack.Text
library for JSON serialization (of course, you can choose another library). The following snippet shows a wrapper to serialize and deserialize a frame:
public static class JsonFrame
{
public static Frame Serialize<T>(T messageObject)
{
var message = JsonSerializer.SerializeToString<T>(
messageObject);
return new Frame(Encoding.UTF8.GetBytes(message));
}
public static T DeSerialize<T>(Frame frame)
{
var messageObject =
JsonSerializer.DeserializeFromString<T>(
Encoding
.UTF8
.GetString(frame.Buffer));
return messageObject;
}
}
And here is a snippet showing how to use the above class in serializing/deserializing a frame in a multi-part message:

In the above code, we have created an instance of ShoppingBasket
class. Then, we serialized this instance and put it in the second frame in a multi-part message. The first frame contains a message title (text format, no serialization). After that, we sent this message (published). At receiver side, we received it and extracted the message title from the first frame and deserialized the second frame’s content. This deserialization gave us back the ShoppingBasket
instance.
Polling
When we want to receive a message, we used to implement it (until now) as follows:
- Building a main loop:
- Call
Receive
\ReceiveFrame
or ReceiveMessage
method in blocking mode (the call to the method will not return until a message arrives). - Process the message.
And, if we have another socket and we want also to receive messages from it, then we can either:
- Put the other socket in different thread and loop over the receive method.
- Build a main loop:
- Build socket 1 loop:
- Call receive method in non blocking mode
- Build socket 2 loop:
- Call receive method in non blocking mode
In these solutions, managing sockets becomes hard, especially when we have many of them. Here, Polling came to rescue, which is a mechanism that enables us to send\receive messages through triggered events over multiple sockets. This mechanism is implemented in the Poll
class. Here, a snippet showing how to use it:
var pubSocket = ctx.CreateSocket(SocketType.PUB);
pubSocket.Bind(options.pubEndpoint);
pubSocket.SendReady +=new
EventHandler<SocketEventArgs>(pubSocket_SendReady);
var repSocket = ctx.CreateSocket(SocketType.REP);
repSocket.Bind(options.repEndpoint);
repSocket.SendReady +=new
EventHandler<SocketEventArgs>(repSocket_SendReady);
repSocket.ReceiveReady +=new
EventHandler<SocketEventArgs>(repSocket_ReceiveReady);
Poller poller = new Poller(new ZmqSocket[] {pubSocket,
repSocket});
while (true)
{
poller.Poll();
}
And the implementation of events:
void repSocket_ReceiveReady(object sender, SocketEventArgs e)
{
}
void repSocket_SendReady(object sender, SocketEventArgs e)
{
}
static void pubSocket_SendReady(object sender, SocketEventArgs e)
{
}
To use polling:
- Create sockets.
- Bind\connect sockets to their endpoints.
- Subscribe to the following socket events (depending on socket type, for example, we cannot send messages via PUB sockets):
- Create an instance of
Poll
class and pass the sockets to the constructor. - Loop over
poll
method of the Poll
instance object.
Each time a socket is ready to send a message, the SendReady
event is fired, and when it’s ready to receive a message, ReceiveReady
is fired.
The synchronized PUB-SUB pattern example (explained later in this article) uses polling mechanism.
Synchronized Pub-Sub Pattern
In the basic Pub-Sub pattern, the subscriber may lose some publisher messages due to its late connection to publisher. What we need is a sort of synchronization between the publisher and its subscribers. A simple solution is to add - in the publisher - a time delay (sleeping) before starting sending messages, ensuring that all subscribers are connected. But this solution is a fragile solution and not suitable for real applications. The good solution is using REQ-REP sockets for synchronization between the publisher and its subscribers. The following diagram shows the synchronized Pub-Sub pattern:

The synchronization scenario is:
- The publisher must know in advance the number of subscribers it expects.
- The publisher start publishing messages through PUB socket asking subscribers to synchronize. Then, it waits for all subscribers to connect.
- The subscriber:
- Receives the message asking for synchronization
- Connects the REQ socket and sends a synchronization message to the publisher
- The publisher increments the synchronized subscribers counter.
- The publisher replies the subscriber through the REB socket.
- When the synchronized subscribers counter reaches the expected number of subscribers, the publisher starts sending the real data.
One Publisher – Two Subscribers
In this synchronized PUB-SUB pattern example, we have one publisher and two subscribers. The publisher will not publish its data until all subscribers become ready to receive the data.
The following diagram illustrates the example:

The code of the publisher is:
var pubSocket = ctx.CreateSocket(SocketType.PUB);
pubSocket.Bind(options.pubEndpoint);
pubSocket.SendReady +=new
EventHandler<SocketEventArgs>(pubSocket_SendReady);
var repSocket = ctx.CreateSocket(SocketType.REP);
repSocket.Bind(options.repEndpoint);
repSocket.SendReady +=new
EventHandler<SocketEventArgs>(repSocket_SendReady);
repSocket.ReceiveReady +=new
EventHandler<SocketEventArgs>(repSocket_ReceiveReady);
Poller poller = new Poller(new ZmqSocket[] {pubSocket, repSocket});
while (true)
{
poller.Poll();
if (options.maxMessage >= 0)
if (msgCptr > options.maxMessage)
Environment.Exit(0);
}
And the socket events are:
#region REP events
static void repSocket_ReceiveReady(object sender, SocketEventArgs e)
{
var reqMsg = e.Socket.Receive(Encoding.UTF8);
DisplayRepMsg("REP, received: " + reqMsg);
}
static void repSocket_SendReady(object sender, SocketEventArgs e)
{
DisplayRepMsg("REP, sending: Sync OK");
e.Socket.Send(Encoding.UTF8.GetBytes("Sync OK"));
nbSubscribersConnected++;
}
#endregion
#region PUB events
static void pubSocket_SendReady(object sender, SocketEventArgs e)
{
var zmqMessage = new ZmqMessage();
if (nbSubscribersConnected < options.nbExpectedSubscribers)
{
zmqMessage.Append(Encoding.UTF8.GetBytes("Sync"));
zmqMessage.Append(Encoding.UTF8
.GetBytes(options.repEndpoint));
Thread.Sleep(options.delay);
Console.WriteLine("Publishing: Sync");
}
else
{
zmqMessage.Append(Encoding.UTF8.GetBytes("Data"));
var data = BuildDataToPublish();
if (!string.IsNullOrEmpty(data))
{
zmqMessage.Append(Encoding.UTF8.GetBytes(data));
Thread.Sleep(options.delay);
Console.WriteLine("Publishing (Data): " + data);
}
}
e.Socket.SendMessage(zmqMessage);
}
#endregion
The code of the subscriber is:
Thread.Sleep(options.delay);
var subSocket = ctx.CreateSocket(SocketType.SUB);
subSocket.Connect(options.subEndpoint);
subSocket.SubscribeAll();
var pubMsg = subSocket.ReceiveMessage();
if (Encoding.UTF8.GetString(pubMsg[0]) == SYNC)
{
Console.WriteLine("SUB; received: " +
Encoding.UTF8.GetString(pubMsg[0]));
using (var reqSocket = ctx.CreateSocket(SocketType.REQ))
{
reqSocket.Connect(Encoding.UTF8.GetString(pubMsg[1]));
DisplayReqMsg("REQ; sending : Sync me");
reqSocket.Send("Sync me", Encoding.UTF8);
var repMsg = reqSocket.Receive(Encoding.UTF8);
DisplayReqMsg("REQ; received: " + repMsg);
}
}
while (true)
{
pubMsg = subSocket.ReceiveMessage();
if (Encoding.UTF8.GetString(pubMsg[0]) != SYNC)
{
Console.WriteLine(
"SUB; received: " +
Encoding.UTF8.GetString(pubMsg[1]));
}
}
Double click on SyncPubSub_Pattern_1.bat file under bin directory. This batch file contains the following commands:
start "Subscriber 1" cmd /T:4F /k SyncSub.exe -e tcp://127.0.0.1:5000 -d 0
start "Subscriber 2" cmd /T:4F /k SyncSub.exe -e tcp://127.0.0.1:5000 -d 4000
start "Publisher" cmd /T:0A /k SyncPub.exe -e tcp://127.0.0.1:5000
-p tcp://127.0.0.1:6000 -n 2 -m "Orange #nb#";"Apple #nb#" -x 5 -d 1000
The first two commands will run two instances of a synchronized subscriber application (SyncSub.exe). Each subscriber will connect to endpoint tcp://127.0.0.1:5000 and subscribe to all publisher messages.
The third command will run the synchronized publisher (SyncPub.exe). The publisher will bind the PUB socket to endpoint tcp://127.0.0.1:5000 and the REP socket to endpoint tcp://127.0.0.1:6000. Then, it waits for connections from subscribers. It expects two subscribers to connect (-n switch
).
The publisher starts publishing synchronizing messages. These messages are multi-part messages composed of two frames. The first frame contains the title message (“Sync
”) and the second frame contains the endpoint of the REP socket (tcp://127.0.0.1:6000).
When the subscriber receives the synchronization message, it connects the REQ socket to the received endpoint (found in the second frame of the synchronized message), and send a request. The publisher receives the request, increments the subscribers counter and acknowledges the request.
When the counter of the synchronized subscribers reaches the expected number of subscribers, the publisher starts publishing the data. It sends five messages (alternates between the word ‘Orange
’ and ‘Apple
’), each of these words is concatenated with the message number (#nb# macro
). The delay between messages is 1000 milliseconds (-d switch
).
Notice the delay of 4000 seconds in the second subscriber for simulating a late arrival (the two subscribers not connecting at the same time).
After running the above commands, we get the following result:

Conclusion
ZeroMQ can send either a single or multi-part messages. Each message part is a frame, which is an array of bytes. A frame can be serialized using JSON data format (or other formats).
Polling is a mechanism enabling us to send\receive messages through triggered events over multiple sockets.
The synchronized PUB-SUB pattern ensures that data will not be published until all expected number of subscribers are connected and ready to receive real data.