Contents
- Introduction
- Usage
- Configuring the MSMQ Channel
  - Server config file
  - Client config file
  - Queues
- Concept and Design
  - Synchronously Call
  - Asynchronously Call
- Implementation
  - MSMQChannelLib
  - MSMQSender
    - Configuration
    - CreateMessageSink
    - SyncProcessMessage
    - AsyncProcessMessage
  - MSMQReceiver
- Test
- Conclusion
Remoting channels are objects in the System.Runtime.Remoting
namespace that transport messages between applications across remoting
boundaries such as application domains, processes and computers in a
peer-to-peer manner. The channel is a plug-in between the server and client sides.
From an abstract point of view, the channel represents logical connectivity
between two points. The point that listens for inbound messages is called the
endpoint; it is the logical address of the remote object, known as a URI
(uniform resource identifier). This connectivity is fully transparent to the CLR
and "understandable" to an open protocol design pattern.
The Remoting namespace includes implementations of two standard channels: http
and tcp. In this article I will
show you how to build and use a custom channel that uses the System.Messaging
namespace classes to implement an MSMQ channel for Remoting. Before
we go into the implementation details, let's start with usage and
configuration. I assume that you have a working knowledge of .NET
Remoting.
Using the MSMQ custom remoting channel requires installing
MSMQChannelLib into the GAC. There are two
classes in this library:
- MSMQReceiver listens for and receives inbound messages at
the endpoint (server side)
- MSMQSender sends outbound messages from the
consumer (client side)
The Remoting object has no special
requirements to handle the custom remoting channel. The channel is fully transparent
to it: the object has no knowledge of who transfers the messages or how. So we can
administratively switch the channel (endpoint) at any time, based on the
application environment.
The situation on the client side is slightly different. The sender needs to
know the URI address (endpoint) of the Remoting object to activate it, as in
the following code snippet:
Type interfaceType = typeof(IRemInterface);
string objectUrl = @"msmq://./reqchannel/endpoint";
IRemInterface robj = (IRemInterface)Activator.GetObject(interfaceType, objectUrl);
This address can be either hard-coded or retrieved from the config file.
Using the config file is more flexible. That is the only difference on the client
side. Note that the same applies when the application uses one of the two
standard channels (http, tcp).
Each side of the connection has its own custom properties to configure its end of the
channel properly.
Configuring the MSMQ Channel:
- id specifies a unique name of the channel that can be used
when referring to it (e.g. id="msmq")
- type is the full name of the channel class to be loaded (e.g. type="RKiss.MSMQChannelLib.MSMQReceiver, MSMQChannelLib"
means the MSMQReceiver class in the assembly MSMQChannelLib); this is why the
assembly has to be installed into the GAC
- ref is the channel template being referenced (e.g. ref="msmq")
- name specifies the name of the channel. Each channel has a
unique name, which is used for proper message routing (e.g. name="MyChannel").
The default name is msmq.
- priority specifies a preference for the channel. The default
value is 1. Higher numbers mean higher priority, and messages are queued
based on this number.
- listener specifies the path of an existing queue for inbound messages
(MethodCall messages). This is a custom channel property. Its format
follows the MSMQ specification. Note that the listener and the remote
object should be located on the same machine (local queue). The default path
is ".\ReqChannel".
- respond specifies the path of an existing queue (respond
queue) for a ReturnMessage from the Receiver Channel. This is a
custom channel property. The default path is ".\RspChannel".
- admin specifies the path of an existing queue that collects
timed-out messages from the listener and respond queues. This is an
optional custom channel property. If this property is not used, the timed-out
messages are not collected. The default value is null.
- timeout specifies the time in seconds within which MethodCall
and ReturnMessage messages have to be received. The default value is
60 seconds. An infinite timeout can be set using the value -1.
Based on the above, the server and client channels can be configured as
shown below:
Server config file:
<configuration>
  <system.runtime.remoting>
    <application>
      <service>
        <wellknown mode="SingleCall" type="RemoteObject.RemObject, RemoteObject" objectUri="endpoint" />
      </service>
      <channels>
        <channel type="RKiss.MSMQChannelLib.MSMQReceiver, MSMQChannelLib" listener=".\ReqChannel"/>
      </channels>
    </application>
  </system.runtime.remoting>
</configuration>
The following example shows another server configuration with two
msmq channels listening on the local queue named ReqChannel.
Both have a priority of 20. Each channel has its own unique name:
msmq_1 and msmq_2 respectively.
<configuration>
  <system.runtime.remoting>
    <application>
      <service>
        <wellknown mode="SingleCall" type="RemoteObject.RemObject, RemoteObject" objectUri="endpoint" />
      </service>
      <channels>
        <channel ref="msmq" name="msmq_1" listener=".\ReqChannel" priority="20"/>
        <channel ref="msmq" name="msmq_2" listener=".\ReqChannel" priority="20"/>
      </channels>
    </application>
    <channels>
      <channel id="msmq" type="RKiss.MSMQChannelLib.MSMQReceiver, MSMQChannelLib"/>
    </channels>
  </system.runtime.remoting>
</configuration>
Client config file:
The client channel is represented by the MSMQSender class located in the
MSMQChannelLib assembly. In this example the respond queue is ".\RspChannel",
the administration queue is ".\AdminChannel", the outbound
message and its return message must be received within 30 seconds, and
the channel priority is 10.
<configuration>
  <system.runtime.remoting>
    <application>
      <client>
        <wellknown type="RemoteObject.RemObject, RemoteObject" url="msmq://./ReqChannel/endpoint" />
      </client>
      <channels>
        <channel type="RKiss.MSMQChannelLib.MSMQSender, MSMQChannelLib"
                 respond=".\RspChannel" admin=".\AdminChannel" timeout="30" priority="10"/>
      </channels>
    </application>
  </system.runtime.remoting>
</configuration>
Queues:
The MSMQ Remoting channel requires an existing queue on both the sender and receiver
sides. The administration queue on the sender side is optional.
These two queues have to be transactional when the channel is used on the
Enterprise Network, to guarantee that a message is delivered exactly once to the
destination queue. There is one special case: calling a remoting method marked
with the OneWay attribute. In this case there is no return message and the call follows a
fire&forget design pattern.
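The queues can be created with the MSMQ administration tools, or from code. The following is only a minimal sketch using the System.Messaging classes; the `QueueSetup` class and `EnsureQueue` helper are hypothetical names, the paths are the defaults used in this article, and creating the administration queue as non-transactional is my assumption about what MSMQ expects for acknowledgment messages.

```csharp
using System;
using System.Messaging;   // requires a reference to System.Messaging.dll

public static class QueueSetup
{
    // Creates the queue only if it does not exist yet (hypothetical helper).
    public static void EnsureQueue(string path, bool transactional)
    {
        if (MessageQueue.Exists(path))
        {
            Console.WriteLine("Queue already exists: {0}", path);
            return;
        }
        using (MessageQueue q = MessageQueue.Create(path, transactional))
        {
            Console.WriteLine("Created queue: {0} (transactional={1})", q.Path, transactional);
        }
    }

    public static void Main()
    {
        EnsureQueue(@".\ReqChannel", true);     // request queue, server machine
        EnsureQueue(@".\RspChannel", true);     // respond queue, client machine
        EnsureQueue(@".\AdminChannel", false);  // optional administration queue (assumed non-transactional)
    }
}
```

On a two-machine setup each queue would be created on the machine that reads from it, so the program would be run with only the relevant lines on each side.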
The Custom Remoting MSMQ channel concept is based on the MSMQ technology,
where queues are accessible on the Enterprise Network from anywhere. The
remoting channel uses a message passing design pattern between a remote object
and its consumer. It is built from two parts, the Sender and the
Receiver, with the interface contracts IChannelSender and
IChannelReceiver respectively, both from the System.Runtime.Remoting.Channels namespace.
Implementing these two interfaces allows a particular channel to be plugged in and
an IMessage object to be passed through it. Note that the IMessage object contains the
communication data sent between cooperating message sinks controlled by the
System.Runtime.Remoting.Messaging namespace classes.
The IChannelSender interface allows a channel to be called synchronously or
asynchronously using the delegate design pattern. Let's look in more detail at
both options:
Synchronously Call:
The following picture shows how an IMessage object flows through the MSMQ
remoting channel in the sync manner.
- The IMessage object is passed to the message sink by calling its
SyncProcessMessage function, where it is serialized
by a BinaryMessageFormatter into the body of the queue message.
Before the message is sent to the queue, its properties are set up for a callback
(respond) message. Each outgoing MSMQ message has its own unique Id, which this
design uses as a key to pick up the callback message from the Respond queue.
The Sender then waits for this message within the specified timeout limit.
Note that the caller is blocked in the meantime; that is what makes it a sync call. When the return message
has been received, its body is de-serialized into an IMessage object
and sent back to the caller as the result of the method call. In the case of
a timeout, the return value is null.
- The Receiver side is an event-driven class that implements the
IChannelReceiver interface. It listens on the Request queue for inbound
messages. When a message has been received, the listener thread hands it over to
a worker thread through a delegate. The worker's responsibility is to de-serialize the incoming message
into an IMessage object and then synchronously dispatch this
object to the server-side sink chain based on the endpoint (URI) address, using
the ChannelServices.SyncDispatchMessage function. This function returns
an IMessage object, which is serialized into the callback message body stream. As I
mentioned earlier, the incoming message has been prepared by its sender for
callback purposes (respond queue, acknowledge type, etc.), so the
receiver just sets the message correlation Id (a message cookie) to the
message Id and sends it to the Respond queue. This whole process is implemented
implicitly and is fully transparent to the receiver.
Note that the Receiver can also handle one special case: calling a remoting method
marked with the OneWay attribute. In this case the returned IMessage
reference is null; the dispatch call returns immediately, without waiting for the
remote method body to be processed, and no respond message is sent to the sender.
The client, however, knows (from the assembly metadata) that this message has to be
handled asynchronously without any response. It is very important to understand
the behaviour of this attribute, which follows a fire&forget design pattern: the
client has no information about how the remote method was processed, including
any exceptions it threw.
Asynchronously Call:
The following picture shows how an IMessage object flows through the MSMQ
remoting channel in the async manner.
Whether the IMessage object is processed through the channel asynchronously
is decided by the client. The sender's message sink supports an
AsyncProcessMessage function which requires two arguments: IMessage
and IMessageSink objects. As you can see, the Sender is divided into two
isolated parts, and the Receiver is identical to the Sync Call design above.
- The Request part flows based on the IMessageSink object
value. In the null case (which means a OneWay method call), the
IMessage object is serialized into the message body stream and sent to
the Request queue. There is no waiting for a respond message; the
call returns immediately to the caller with an IMessageCtrl value
of null. On the other hand, when the IMessageSink object is
valid, both arguments are delegated to a worker thread using the
BeginInvoke/EndInvoke design pattern, and the call again returns immediately to
the caller with an IMessageCtrl value of null. Note that this
solution offers no way to abort an active async call using the
IMessageCtrl object.
- The Respond part runs in the background and is very simple.
First, it calls the SyncProcessMessage function to obtain the
IMessage object from the remote method synchronously,
and then it updates the AsyncResult object state with this value. Note
that this is a very cool design pattern between the caller (client) and
the background process; it is why an async call passes an IMessageSink
object (replySink). Calling the replySink.SyncProcessMessage function
performs two things:
  - updating the shared AsyncResult object state with
  the IMessage object state (Return Message)
  - setting the AsyncResult's ManualResetEvent object
  to notify the client that the result of the async call can be retrieved using the
  EndInvoke function.
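The hand-off between the background worker and the waiting caller can be illustrated without any remoting or MSMQ types. The following is only a simplified sketch of the idea, not the channel's code: `ReplySlot` is a hypothetical stand-in for the AsyncResult/replySink pair, and `SlowCall` stands in for the blocking SyncProcessMessage call.

```csharp
using System;
using System.Threading;

// Hypothetical stand-in for the AsyncResult/replySink pair:
// it holds the result and an event that signals completion.
public class ReplySlot
{
    private readonly ManualResetEvent m_done = new ManualResetEvent(false);
    private string m_result;

    // Called by the worker thread (plays the role of replySink.SyncProcessMessage).
    public void Complete(string result)
    {
        m_result = result;   // 1. update the shared state
        m_done.Set();        // 2. notify the waiting caller
    }

    // Called by the client (plays the role of EndInvoke): blocks until the result is set.
    public string Wait()
    {
        m_done.WaitOne();
        return m_result;
    }
}

public static class AsyncSketch
{
    // Stand-in for the blocking synchronous call through the channel.
    public static string SlowCall(string request)
    {
        Thread.Sleep(50);
        return "Hello " + request;
    }

    // Returns immediately; the blocking call runs on a thread-pool thread.
    public static ReplySlot BeginCall(string request)
    {
        ReplySlot slot = new ReplySlot();
        ThreadPool.QueueUserWorkItem(delegate { slot.Complete(SlowCall(request)); });
        return slot;
    }

    public static void Main()
    {
        ReplySlot slot = BeginCall("MSMQ");
        Console.WriteLine("Caller is not blocked here");
        Console.WriteLine(slot.Wait());
    }
}
```

The caller only blocks when it asks for the result, which mirrors how EndInvoke waits on the event that the reply sink sets.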
The MSMQ Custom Remoting Channel implementation uses the System.Messaging
namespace classes to communicate with MSMQ, the System.Runtime.Remoting
namespace classes to integrate the channel into the remoting mechanism, and the
System.Runtime.Serialization namespace classes to serialize/de-serialize
messages through the channel. The whole implementation is managed code,
packaged into one assembly named MSMQChannelLib.
The MSMQChannelLib contains the functionality of the MSMQ remoting
channel for both the server and client sides. The assembly must be installed
into the GAC on both the server and the client. Before the MSMQ
remoting channel can be used, it has to be registered either programmatically or
administratively. Note that the server channel does not need to be registered
before the client channel; that is a feature of the loosely
coupled connectivity of MSMQ. However, the MSMQ remoting channel requires
that the Request or Respond queue, respectively, already exists before registration.
As I mentioned earlier, this assembly contains two major classes. The
following are some details of how they have been implemented.
MSMQSender implements the IChannelSender interface from the
System.Runtime.Remoting.Channels namespace. Its implementation is divided
into the following parts:
Configuration:
The class has several constructors to configure the sender channel
programmatically and one constructor for administrative configuration using
the config file. Here is the code of the latter:
public MSMQSender(IDictionary properties, IServerChannelSinkProvider serverSinkProvider)
{
    // Start from the defaults; each one can be overridden from the config file.
    string pathRQ = DEFAULT_RESPONDCHANNEL;
    string pathAQ = null;
    int timeout = DEFAULT_TIMEOUTINSEC;
    string channelName = DEFAULT_CHANNELNAME;
    int channelPriority = DEFAULT_CHANNELPRIORITY;

    // Attributes of the <channel> element arrive as dictionary entries.
    if(properties.Contains("name"))
        channelName = properties["name"].ToString();
    if(properties.Contains("priority"))
        channelPriority = Convert.ToInt32(properties["priority"]);
    if(properties.Contains("respond"))
        pathRQ = properties["respond"].ToString();
    if(properties.Contains("admin"))
        pathAQ = properties["admin"].ToString();
    if(properties.Contains("timeout"))
        timeout = Convert.ToInt32(properties["timeout"]);

    Init(channelName, pathRQ, pathAQ, timeout, channelPriority);
    Trace.WriteLine(string.Format("Client-config: name={0}, respQ={1}, adminQ={2}, timeout={3}, priority={4}",
        m_ChannelName, pathRQ, pathAQ, timeout, m_ChannelPriority));
}
This constructor is invoked by the following function, which passes all custom
channel properties specified in the config file via the IDictionary
interface:
RemotingConfiguration.Configure(@"..\..\Server.exe.config");
That is the whole configuration process for the remoting channel. It is a very
straightforward and easy implementation.
CreateMessageSink:
This is an implementation of the IChannelSender interface contract.
When the consumer of the remote object calls the Activator class, as
shown in the following code snippet,
Type interfaceType = typeof(IRemInterface);
string objectUrl = @"msmq://./reqchannel/endpoint";
IRemInterface robj = (IRemInterface)Activator.GetObject(interfaceType, objectUrl);
the call is forwarded to the following function of the
MSMQSender class
public virtual IMessageSink CreateMessageSink(String url, Object data, out string objectURI)
{
    objectURI = null;
    // "msmq://./reqchannel/endpoint" splits into "msmq:", "", ".", "reqchannel", "endpoint"
    string[] s = url.Split(new Char[]{'/'});
    // Not our url: too few fields or a different channel name in the scheme field.
    if(s.Length < PATH_MINNUMOFFIELDS || m_ChannelName + ":" != s[0])
        return null;

    // The fields between the scheme and the last one form the MSMQ queue path.
    string outpath = "";
    for(int ii = 2; ii < s.Length-1; ii++)
    {
        outpath += s[ii];
        outpath += "\\";
    }
    outpath = outpath.TrimEnd(new Char[]{'\\'});
    // The last field is the endpoint (object URI).
    objectURI = s[s.Length-1];

    MSMQMessageSink msgsink = new MSMQMessageSink(this, objectURI, outpath);
    Trace.WriteLine(string.Format("Client-CreateMessageSink: url = {0}, sink = {1}", url, msgsink.GetHashCode()));
    return msgsink;
}
to check whether the specified url can be handled by this channel. If the channel
name does not match the scheme field of the url, null is returned. The
remoting infrastructure walks through all registered channels until a successful
match is found; otherwise an exception is thrown. In the successful case,
the MSMQMessageSink class is instantiated and its reference is returned to
the caller.
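The url handling can be tried in isolation. The sketch below extracts the same parsing rule into a standalone helper; `MsmqUrl.TryParse` is a hypothetical name, and the minimum of five fields is my assumption, since the value of PATH_MINNUMOFFIELDS is not shown in the article.

```csharp
using System;

public static class MsmqUrl
{
    // Splits "msmq://./reqchannel/endpoint" into a queue path and an object URI.
    // Returns false when the url is too short or its scheme is not the channel name.
    public static bool TryParse(string url, string channelName, out string queuePath, out string objectUri)
    {
        queuePath = null;
        objectUri = null;
        string[] s = url.Split('/');
        // Expected fields: "msmq:", "", machine, queue name(s), object URI (assumed minimum: 5)
        if (s.Length < 5 || s[0] != channelName + ":")
            return false;
        // Fields 2 .. n-2 form the MSMQ path, joined with backslashes.
        queuePath = string.Join("\\", s, 2, s.Length - 3);
        // The last field is the endpoint.
        objectUri = s[s.Length - 1];
        return true;
    }

    public static void Main()
    {
        string path, uri;
        if (TryParse("msmq://./reqchannel/endpoint", "msmq", out path, out uri))
            Console.WriteLine("queue = {0}, objectUri = {1}", path, uri);
    }
}
```

For the url used throughout this article, the queue path comes out as .\reqchannel and the object URI as endpoint; a url with another scheme, such as tcp, is rejected so that the next registered channel can be tried.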
MSMQMessageSink:
MSMQMessageSink is a class that implements the IMessageSink and
IDictionary interfaces. When a method call is made on the proxy, the
remoting infrastructure passes an IMessage object to this class in one
of the following ways:
1. SyncProcessMessage
public virtual IMessage SyncProcessMessage(IMessage msgReq)
{
    IMessage msgRsp = null;
    Message outMsg = null;
    // Message.Id is only readable after a successful send, so keep a safe copy for tracing.
    string msgId = "(not sent)";
    MessageQueueTransaction mqtx = new MessageQueueTransaction();
    try
    {
        msgReq.Properties[OBJECTURI] = m_ObjectUri;
        mqtx.Begin();
        // Serialize the IMessage into the MSMQ message body and set up the callback properties.
        outMsg = new Message(msgReq, new BinaryMessageFormatter());
        outMsg.ResponseQueue = m_parent.ResponseQueue;
        outMsg.TimeToBeReceived = m_parent.ResponseTimeOut;
        outMsg.AcknowledgeType = AcknowledgeTypes.NegativeReceive;
        outMsg.AdministrationQueue = m_parent.AdminQueue;
        string label = msgReq.Properties["__TypeName"] + "." + msgReq.Properties["__MethodName"];
        m_OutQueue.Send(outMsg, label, mqtx);
        mqtx.Commit();
        msgId = outMsg.Id;
        Trace.WriteLine(string.Format("Client-Sync:PRE-CALL, msgId={0}", msgId));
        Thread.Sleep(0);
        // Block until the reply correlated with this message Id arrives or the timeout expires.
        msgRsp = ReceiveReturnMessage(msgId);
        Trace.WriteLine(string.Format("Client-Sync:POST-CALL, msgId={0}", msgId));
    }
    catch (MessageQueueException ex)
    {
        Trace.WriteLine(string.Format("Client:SyncProcessMessage error = {0}, msgId = {1}", ex.Message, msgId));
    }
    catch(Exception ex)
    {
        Trace.WriteLine(string.Format("Client:SyncProcessMessage error = {0}, msgId = {1}", ex.Message, msgId));
    }
    finally
    {
        // A transaction that is still pending here was never committed: roll it back.
        if(mqtx.Status == MessageQueueTransactionStatus.Pending)
        {
            mqtx.Abort();
            Trace.WriteLine(string.Format("Client:SyncProcessMessage Aborted, msgId = {0}", msgId));
        }
        m_OutQueue.Close();
    }
    return msgRsp;
}
This is a synchronous invocation of a remote method that waits for its
return message within a specified time. As you can see in the above code snippet,
the outbound message is created, configured for its response from the endpoint,
and then sent to the transactional queue. After this, the helper
function ReceiveReturnMessage is called to wait on the Respond queue for a message with the
specified correlation id. This function returns a
reference to an IMessage object, which is handed back to the remoting
infrastructure as the ReturnMessage. Note that the caller is blocked while
waiting for the respond message.
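The ReceiveReturnMessage helper itself is not listed in this article. A minimal sketch of how such a helper could look is given below; it is an assumption on my part, not the library's code. It relies on MessageQueue.ReceiveByCorrelationId from System.Messaging, takes the respond queue and timeout as explicit parameters for illustration, and maps a timeout to a null return value as described above.

```csharp
using System;
using System.Messaging;                    // requires a reference to System.Messaging.dll
using System.Runtime.Remoting.Messaging;

public static class ReturnMessageSketch
{
    // Hypothetical helper: waits on the respond queue for the message whose CorrelationId
    // equals the Id of the request and returns its body as an IMessage.
    // Returns null when no reply arrives within the timeout.
    public static IMessage ReceiveReturnMessage(MessageQueue respondQueue, string requestId, TimeSpan timeout)
    {
        // The reply body was written with a BinaryMessageFormatter, so read it the same way.
        respondQueue.Formatter = new BinaryMessageFormatter();
        try
        {
            using (Message reply = respondQueue.ReceiveByCorrelationId(requestId, timeout))
            {
                return reply.Body as IMessage;
            }
        }
        catch (MessageQueueException ex)
        {
            if (ex.MessageQueueErrorCode == MessageQueueErrorCode.IOTimeout)
                return null;   // no reply in time: the caller sees a null return message
            throw;             // any other queue error is left to the caller
        }
    }
}
```

Because the lookup is keyed by correlation id, several clients can share one respond queue and each still picks up only its own reply.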
If the client does not want to be blocked by the remote call, the
remote method has to be invoked using the BeginInvoke/EndInvoke
design technique. In this case the following method is called:
2. AsyncProcessMessage
This function follows a fire&forget design pattern implemented in two ways:
either sending an outbound message without waiting for its response (OneWay
option) or delegating the call to a worker thread. In both situations, the
call returns immediately to the remoting infrastructure. The following
code snippet shows this:
public virtual IMessageCtrl AsyncProcessMessage(IMessage msgReq, IMessageSink replySink)
{
    // No control object is provided, so an active async call cannot be aborted.
    IMessageCtrl imc = null;
    if(replySink == null)
    {
        // OneWay call: send the request and return without waiting for a reply.
        MessageQueueTransaction mqtx = new MessageQueueTransaction();
        try
        {
            mqtx.Begin();
            msgReq.Properties[OBJECTURI] = m_ObjectUri;
            string label = msgReq.Properties["__TypeName"] + "." + msgReq.Properties["__MethodName"];
            m_OutQueue.Send(msgReq, label, mqtx);
            mqtx.Commit();
            Trace.WriteLine("Client-[OneWay]Async:CALL");
        }
        catch(Exception ex)
        {
            Trace.WriteLine(string.Format("Client:AsyncProcessMessage error = {0}", ex.Message));
        }
        finally
        {
            if(mqtx.Status == MessageQueueTransactionStatus.Pending)
            {
                mqtx.Abort();
            }
            m_OutQueue.Close();
        }
    }
    else
    {
        // Regular async call: hand both arguments to a worker thread and return at once.
        delegateAsyncWorker daw = new delegateAsyncWorker(handlerAsyncWorker);
        daw.BeginInvoke(msgReq, replySink, null, null);
    }
    return imc;
}
Here is the background process of the asynchronous call. This thread
actually calls the remote method synchronously and then uses the
replySink to update the AsyncResult object state, including setting its
ManualResetEvent object. After this work the worker thread
terminates.
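private void handlerAsyncWorker(IMessage msgReq, IMessageSink replySink)
{
    // The reply sink chain leads to the AsyncResult of the caller (not used further here).
    AsyncResult ar = replySink.NextSink as AsyncResult;
    // Blocking call through the channel, executed on the worker thread.
    IMessage msgRsp = SyncProcessMessage(msgReq);
    // Hands the return message to the caller side and signals completion.
    replySink.SyncProcessMessage(msgRsp);
}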
MSMQReceiver implements the IChannelReceiver interface from the
System.Runtime.Remoting.Channels namespace. The class has several
constructors to configure the receiver channel programmatically and one constructor
for administrative configuration using the config file. Their design and
implementation are similar to those of the MSMQSender.
The implementation of the MSMQReceiver class is simpler than that of
MSMQSender. The listener runs an endless loop in a background thread, which
blocks in the m_InQueue.Receive() function. When a message arrives in the
queue, this function retrieves it and delegates it to a worker
thread in a fire&forget fashion using the BeginInvoke design pattern. This
is shown in the following code snippet:
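private void Run()
{
    try
    {
        Thread.CurrentThread.IsBackground = true;
        while(true)
        {
            // Blocks until a message arrives in the Request queue.
            Message msg = m_InQueue.Receive();
            // Hand the message to a worker thread and go straight back to listening.
            delegateDispatchMessage ddm = new delegateDispatchMessage(DispatchMessage);
            ddm.BeginInvoke(msg, null, null);
        }
    }
    catch(Exception ex)
    {
        // Any exception ends the listener loop.
        Trace.WriteLine(string.Format("RemoteObject-MSMQListener error = {0}", ex.Message));
    }
}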
The actual work is done in the following background worker thread:
public override void DispatchMessage(Message msg)
{
    MessageQueueTransaction mqtx = new MessageQueueTransaction();
    try
    {
        // De-serialize the request and restore the endpoint address for the dispatcher.
        IMessage msgReq = msg.Body as IMessage;
        msgReq.Properties["__Uri"] = msgReq.Properties[OBJECTURI];
        Trace.WriteLine(string.Format("RemoteObject:PRE-CALL, msgId={0}", msg.Id));
        IMessage msgRsp = ChannelServices.SyncDispatchMessage(msgReq);
        if(msgRsp != null)
        {
            // Reuse the incoming message as the reply: new body, CorrelationId = request Id.
            mqtx.Begin();
            msg.BodyStream.Close();
            msg.Body = msgRsp;
            msg.CorrelationId = msg.Id;
            msg.ResponseQueue.Send(msg, "RET: " + msg.Label, mqtx);
            mqtx.Commit();
            msg.ResponseQueue.Close();
            Trace.WriteLine(string.Format("RemoteObject:POST-CALL, msgId={0}", msg.CorrelationId));
        }
        else
            // A null return message means a OneWay method: nothing is sent back.
            Trace.WriteLine(string.Format("RemoteObject-[OneWay]: msgId = {0}", msg.Id));
    }
    catch(MessageQueueException ex)
    {
        Trace.WriteLine(string.Format("RemoteObject error = {0}, msgId = {1}", ex.Message, msg.Id));
    }
    catch(Exception ex)
    {
        Trace.WriteLine(string.Format("RemoteObject error = {0}, msgId = {1}", ex.Message, msg.Id));
    }
    finally
    {
        if(mqtx.Status == MessageQueueTransactionStatus.Pending)
        {
            mqtx.Abort();
        }
    }
}
The primary goal of the above function is to dispatch an arrived IMessage
object to the remoting infrastructure synchronously. The
"dispatcher"
returns a return message represented by an IMessage object. If the
return value is null, there is nothing more to do and the thread exits; that is
the case for a remote method marked with the OneWay attribute. Otherwise, the
IMessage object is serialized into the message body and sent back to the
Respond queue. Note that setting the message CorrelationId to its Id
is very important for the Sender side: this is the callback cookie that the
Sender is waiting for.
Testing the MSMQ Remoting Channel (MSMQChannelLib) can be done by
replacing your current channel with the MSMQ channel, or by using the included package
of the following assemblies:
- RemoteInterface.dll
- RemoteObject.dll
- Server.exe
- Client.exe
The Remoting object is a very simple class that uses an interface contract to make
a loosely coupled client/server design.
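using System;
using System.Runtime.Remoting.Messaging;   // OneWay attribute
using RemoteInterface;

namespace RemoteObject
{
    public class RemObject : MarshalByRefObject, IRemInterface
    {
        public RemObject()
        {
        }

        // Regular method: the caller receives the return value.
        public string SayHello(string msg)
        {
            Console.WriteLine("RemObject.SayHello({0})", msg);
            return "Hello" + " " + msg;
        }

        // Fire&forget method: no return message is sent to the caller.
        [OneWay]
        public void OneWay(string msg)
        {
            Console.WriteLine("RemObject.OneWay({0})", msg);
        }
    }
}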
The server and client are also very straightforward and simple.
Before starting the test, the following assemblies have to be installed in the
GAC:
- MSMQChannelLib on both machines
- RemoteInterface on both machines
- RemoteObject on the server machine
You must also manually create the following MSMQ queues:
- ReqChannel queue on the server machine (transactional)
- RspChannel queue on the client machine (transactional)
- AdminChannel queue (optional)
Note that this test can also be run on a single machine acting as both server and
client.
After all the above checks, start the server host program and then the
client program. The client program will ask you for the number of
test loops; just type 10 and press Enter. You will see how fast the
remoting calls run in the server and client console windows.
Note that the client runs the test for each remote method both synchronously and
asynchronously. After all these test loops, press Enter and the client
console will display the test result. It should be 100 for both of them.
You can experiment further, for example by opening more server host programs or more client
programs, and you can also change the channel configuration in their config
files.
This article has described how to design, implement and plug an
MSMQ channel into the remoting infrastructure. Besides the standard remoting channels
(http and tcp), the MSMQ remoting channel allows a remoting object to be called in
a loosely coupled design pattern, which is very important in mobile
applications. Designing and implementing a Custom Remoting channel using
.NET Framework classes is simple and straightforward for any kind of physical
channel, such as serial RS232, USB, X25, modem, etc.