The event-driven architecture model is based on the synchronous
pre-processing and asynchronous post-processing design patterns. Implementing
these patterns with the Remoting infrastructure and MSMQ technology makes it
possible to isolate a business model from its physical deployment. This article
shows the implementation and usage of the MSMQ Channel in the distributed
application model, including transactional calls, post-processing and
configuration. I also recommend reviewing my previous article, Using MSMQ for
Custom Remoting Channel, for the concept and design issues.
The MSMQ Channel supports the following features:
- Full Remoting custom model
- Channel Knowledge Base
- Run-time updating Channel Knowledge Base using the remoting design pattern
- Configuration file
- Public and Private queues (including the remote private queues)
- Direct format name (including the MSMQ 3.0 over Internet)
- Chaining channels mechanism
- Mapping a logical address of the connectivity to the full physical url address
- Fire&Forget transactional design pattern including the COM+/DTC transaction
- Multiple Fire&Forget calls in the transactional manner
- Passing the application info using the CallContext object (Ticket object)
- Multithreading transactional receiver with the thread-pool controller
- Transactional Retry mechanism
- Smart retry exception filter
- Send and Receive timeout administration
- Endpoint Notify mechanism to retrieve a remoting message from any queue
- Acknowledge Notification
- Exception Notification
- Supports only SAO remote objects such as SingleCall and Singleton
- EventLog
The MSMQ Channel (like any remoting channel) is a stateful object in the
appDomain process, identified by its unique name, for instance, msmq. Its state
can be initialized at construction time from properties located in the config
file. Together these properties represent the Knowledge Base (KB) of the
channel, and the channel changes its behavior based on their values. For
instance, the retry mechanism needs to know how many times to re-deliver a
message to the endpoint after an exception. Some of the properties are constant
and cannot be changed during the channel's lifetime; these form the static KB,
for instance the channel name. The dynamic properties, on the other hand, can
change the behavior of the channel at run time without restarting the process.
In short, the MSMQ Channel has a control mechanism to:
- initialize its Knowledge Base from the config file like a standard channel
- create and modify its Knowledge Base programmatically at run time like any
other remote object
This last feature makes it possible to build a smart distributed model with
transparent connectivity between the remote object and its consumer, where the
channel state can be changed dynamically based on the business behavior. I call
this a learning and tuning design pattern.
Note: The MSMQ Channel allows its state to be changed programmatically at run
time using the standard remoting design pattern.
The following picture shows a high level design of the MSMQ Channel between
the remote object and its consumer:

As you can see, each MSMQ custom channel has its own Knowledge Base to hold the
channel state. The KB is initialized during the registration of the remoting
channel and published as a remoting object using the channel name as its
endpoint. The KB is accessed via the IKnowledgeBaseControl interface contract.
Before the MSMQ Channel is registered, the MSMQ queues have to be available to
send and receive messages. The minimum requirement is a destination queue for
the message call (request channel); in the above picture this is the ReqChannel
queue. Depending on the application requirements, additional queues such as a
retry queue and an administration queue can be added. The name of the retry
queue is the request queue name with the postfix _Retry. Note that all queues
except the administration queue have to be transactional.
Based on the Knowledge Base, the receiver can distribute the target response to
the notification remote objects, namely the Notify Remote Object (NRO),
Exception Remote Object (ERO) and Acknowledge Remote Object (ARO), using the
standard remoting manner and the IRemotingResponse interface contract. This
feature makes it possible to chain channels, so the response messages can be
sent to additional queues, for instance ReqChannel_ACK, ReqChannel_ERROR, etc.,
using the MSMQ Sender Channel features.
The notifyurl property has a special role in delivering the remoting IMessage
to the endpoint. If its value is not empty, the IMessage is passed to the NRO
notification object as an argument. This design pattern makes it possible to
evaluate any IMessage received from the queue and make an application-specific
decision at the application layer, such as re-routing or modifying the message.
Notes:
- The Knowledge Base can be locked administratively using the updateKB property
to prevent its updating at run time. In this case, the KB holds the properties
from the config file.
- The MSMQChannel response can be distributed to the notification remote
objects for post-processing such as evaluation of the output arguments,
business tracing, rollback, monitoring, tuning the channel's Knowledge Base,
etc.
Message workflow
Consuming the remote object through the remoting infrastructure via the MSMQ
custom channel is fully transparent. However, it is necessary to know the
channel's behavior to properly set up the Knowledge Base of the Sender and
Receiver channels.
Sender
The remoting message begins at the client side, where the remote method is
invoked on the transparent proxy. To obtain the transparent proxy of the remote
object, the client calls Activator.GetObject with the specified interface
contract and the physical url address of the endpoint. To avoid an explicit
(hard-coded) url address, the MSMQ Channel sender can use a logical url address
related to the business connectivity model, for instance
msmq://BusinessEndpoint. This logical address is a constant and can be
hard-coded in the application layer. The MSMQ Channel looks up the logical
address in the Knowledge Base to find its physical interpretation. If it is not
found, the Sender assumes that it is a physical address. Note that a hard-coded
physical url address can also be mapped to another physical one. This feature
makes it possible to configure the proper deployment pattern administratively
or programmatically on the fly. When this process is completed, the message is
sent to the destination queue. The delivery can be placed under timeout control
with the sendtimeout property, which is especially useful when the message is
sent over the network or to a disconnectable server. The message lifetime
(waiting for the receiver) can also be limited by the receivetimeout property.
When the time has elapsed, the message is stored in the administration queue
(which must be specified).
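The lookup described above can be sketched in a few lines. The following is only an illustration, not the channel's actual code: the LogicalUrlMap class and its Parse and Resolve methods are hypothetical names, and the sketch assumes that each lurl pair is split at its first '=' sign, that names are compared case-insensitively, and that only a single lookup is made (the result is not resolved again).

```csharp
using System;
using System.Collections.Generic;

// Hypothetical helper for illustration; not part of the MSMQChannel assembly.
public static class LogicalUrlMap
{
    // Parses "name=value, name=value" into a lookup table.
    // Only the first '=' of a pair separates the name from the value, so values
    // such as "FormatName:DIRECT=http://..." keep their own '='.
    public static Dictionary<string, string> Parse(string lurl)
    {
        var map = new Dictionary<string, string>(StringComparer.OrdinalIgnoreCase);
        if (string.IsNullOrEmpty(lurl))
            return map;

        foreach (string pair in lurl.Split(','))
        {
            int eq = pair.IndexOf('=');
            if (eq <= 0)
                continue;                          // skip malformed entries
            string name = pair.Substring(0, eq).Trim();
            string value = pair.Substring(eq + 1).Trim();
            if (name.Length > 0)
                map[name] = value;
        }
        return map;
    }

    // Strips the channel scheme and looks the rest up in the map.
    // When nothing matches, the address is treated as a physical one.
    public static string Resolve(Dictionary<string, string> map, string url)
    {
        const string scheme = "msmq://";
        string address = url.StartsWith(scheme, StringComparison.OrdinalIgnoreCase)
            ? url.Substring(scheme.Length)
            : url;

        string physical;
        return map.TryGetValue(address, out physical) ? physical : address;
    }
}
```

With the lurl value from the Sender configuration example in this section, resolving msmq://BusinessEndpoint yields .\private$\ReqChannel/endpoint, while an address that is not in the map comes back unchanged and is treated as physical.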
Note: The remote method uses the fire&forget design pattern for any type of
call: synchronous, asynchronous or OneWay attributed. This means that the call
does not wait for any response from the target object. Its responsibility is to
send the IMessage to the transactional queue and return to the caller with a
null IMethodReturnMessage. In the case of a distributed transaction (COM+/DTC),
the IMessage is sent to the destination queue when the transaction has been
committed.
The Sender channel configuration part is shown in the following example:
<channel ref="msmq_sender" name="msmq" priority="1"
updatekb="true"
receivetimeout="20"
sendtimeout="10"
useheaders="false"
admin=".\private$\adminchannel"
lurl="Queue=.\private$\ReqChannel,
BusinessEndpoint=.\private$\ReqChannel/endpoint,
.\private$\ReqChannel/endpoint=.\private$\ReqChannel/endpointABC">
</channel>
The Knowledge Base of the Sender channel:
NAME | DEFAULT VALUE | KB | NOTE |
name | "msmq" | static | Unique name of the channel |
priority | "1" | static | Priority of the messaging. Not used in this version. |
updatekb | "true" | static | Lock the Knowledge Base to prevent its updating on the runtime |
useheaders | "false" | static | Pass the sink transport headers to the receiver. |
admin | "" | dynamic | The queue path to the administration non-transactional queue |
sendtimeout | "60" | dynamic | The time limit for the message to reach the destination queue. The value is in seconds and -1 represents the InfiniteTimeout |
receivetimeout | "60" | dynamic | The time limit for the message to be retrieved from the destination queue. The value is in seconds and -1 represents the InfiniteTimeout |
lurl | "" | dynamic | The logical connectivity knowledge to map logical address to the physical url address. The format uses a name/value pattern with a comma delimiter. For instance: lurl = "End = .\private$\ReqChannel/endpoint, FarEnd = .\private$\ReqChannel; tcp://localhost:9999/endpoint" |
Receiver
The MSMQ Channel Receiver is a more sophisticated part of the channel than its
Sender side. It runs in the post-processing manner, triggered by an incoming
message from the message queue specified by the listener property. Its
Knowledge Base has more properties to control the message workflow. The message
is retrieved from the queue in the transactional manner and dispatched to the
target endpoint. The result of the remote call is distributed to the
notification object (ARO/ERO). For the exception case there is a retry
mechanism. Based on the retry counter, retrytime and retryfilter properties,
the message can be sent to the retry queue for the specified time and later
re-delivered to the target object. This process is repeated retry times, and
then the IMessage is sent to the specified exception object as an undeliverable
message. To prevent retrying a message for an error that would always be
generated, the Knowledge Base contains the retryfilter property. The receiver
scans its KB for the error obtained from the target and, if it is found,
immediately invokes the specified exception object instead of passing the
message through the retry mechanism. Because the retry mechanism circulates the
original message between the listener queue and its retry queue, the message
lifetime can be turned off using the usetimeout property; otherwise the message
is sent to the administration queue when its time limit has elapsed. In other
words, delivering a message to the target object, including its retry
mechanism, can be placed under the timeout limit. Note that in this case all
timeout properties for the Sender and Receiver must be properly calculated.
Note: Retrieving and retrying queue messages are processed in the transactional
manner to prevent their loss during the shutdown process.
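The decision the receiver makes after a failed call can be summarized as a small function. This is a sketch under assumptions rather than the channel's implementation: the RetryPolicy class, the NextStep enum and the idea of passing the retryfilter content as an array of phrases matched by substring are all hypothetical, since the article does not describe how the filter text is delimited or compared.

```csharp
using System;

// Hypothetical names for illustration; not part of the MSMQChannel assembly.
public enum NextStep
{
    SendToRetryQueue,        // park the message in the <listener>_Retry queue
    NotifyExceptionObject    // give up and call the object behind exceptionurl
}

public static class RetryPolicy
{
    // errorMessage:  text of the exception thrown by the target call
    // retriesDone:   how many times this message has already been retried
    // maxRetry:      value of the 'retry' property
    // filterPhrases: phrases taken from 'retryfilter' (assumed already split)
    public static NextStep Decide(string errorMessage, int retriesDone,
                                  int maxRetry, string[] filterPhrases)
    {
        foreach (string phrase in filterPhrases)
        {
            // A filtered error is considered permanent, so retrying is pointless.
            if (phrase.Length > 0 &&
                errorMessage.IndexOf(phrase, StringComparison.OrdinalIgnoreCase) >= 0)
            {
                return NextStep.NotifyExceptionObject;
            }
        }

        // Otherwise retry until the counter is exhausted.
        return retriesDone < maxRetry
            ? NextStep.SendToRetryQueue
            : NextStep.NotifyExceptionObject;
    }
}
```

Keeping the filter check ahead of the counter check mirrors the text above: a filtered error goes to the exception object immediately, even on the first failure.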
Pulling messages from the queue is controlled by the receiver thread-pool
controller, which allows only a specified number of receiver workers to run.
The maxthreads property can be in the range of 0 to 20 threads, where zero
disconnects the listener from its incoming queue. The receiver worker
represents a root thread (virtual client) that executes the IMessage. Its
lifetime depends on the target application. When all workers have been busy for
the notifytime period, a warning message is written to the EventLog.
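One way to picture the thread-pool controller is a semaphore that admits at most maxthreads workers at a time. The sketch below uses SemaphoreSlim and Task from current .NET, which is an assumption made for illustration only (the channel itself predates these types); BoundedReceiver and ProcessAll are hypothetical names, and the transactional receive from the queue is left out.

```csharp
using System;
using System.Collections.Generic;
using System.Threading;
using System.Threading.Tasks;

// Hypothetical illustration of a bounded worker pool; not the channel's code.
public static class BoundedReceiver
{
    // Runs 'handler' for every message with at most maxThreads workers at once.
    // Returns the highest number of workers observed running concurrently.
    public static int ProcessAll(IEnumerable<string> messages, int maxThreads,
                                 Action<string> handler)
    {
        if (maxThreads <= 0)
            return 0;                              // listener is turned off

        var gate = new SemaphoreSlim(maxThreads, maxThreads);
        var tasks = new List<Task>();
        int running = 0, peak = 0;

        foreach (string msg in messages)
        {
            gate.Wait();                           // block while all workers are busy
            string current = msg;
            tasks.Add(Task.Run(() =>
            {
                try
                {
                    int now = Interlocked.Increment(ref running);
                    int seen;
                    // Record the new peak if this worker raised it.
                    while (now > (seen = Volatile.Read(ref peak)) &&
                           Interlocked.CompareExchange(ref peak, now, seen) != seen) { }
                    handler(current);
                }
                finally
                {
                    Interlocked.Decrement(ref running);
                    gate.Release();                // free the slot for the next message
                }
            }));
        }

        Task.WaitAll(tasks.ToArray());
        return peak;
    }
}
```

A real receiver would also start a timer while the gate is exhausted, which is where a notifytime-style warning would come from.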
The Receiver channel configuration part is shown in the following
example:
<channel ref="msmq_receiver" name="msmqRcv"
listener=".\private$\ReqChannel"
updatekb="true"
usetimeout="false"
maxthreads="10"
retry="2"
retrytime="20"
retryfilter="No receiver registered. No connection could be
made because the target machine actively refused it."
acknowledgeurl="msmq://.\private$\ReqChannel_Ack;
tcp://localhost:4444/endpoint"
exceptionurl="FormatName:DIRECT=
http://localhost/msmq/private$/reqchannel_Error/endpoint">
</channel>
The Knowledge Base of the Receiver channel:
NAME | DEFAULT VALUE | KB | NOTE |
name | "msmq" | static | Unique name of the channel |
priority | "1" | static | Priority of the messaging. Not used in this version. |
updatekb | "true" | static | Lock the Knowledge Base to prevent its updating on the runtime |
listener | "" | static | The queue path for the receiver. If the queue is non-transactional, the retry mechanism is blocked. |
retry | "0" | dynamic | The retry counter for re-delivering a message to the target object |
retrytime | "60" | dynamic | The time limit for the message to wait for its re-delivery. The value is in seconds and -1 represents the InfiniteTimeout |
retryfilter | "60" | dynamic | The filter of the exception messages. A positive result skips the retry mechanism and forwards the message to the exception notification object (see the exceptionurl property). |
maxthreads | "20" | dynamic | Maximum number of workers in the transactional thread pool. A zero or negative value disconnects the thread pool from the listener queue. |
usetimeout | "false" | dynamic | Use the sender's receivetimeout limit to complete the re-delivery of the message to the target object. When the timeout has elapsed, the message is sent to the administration queue. |
notifytime | "60" | dynamic | The time limit after which the EventLog is notified that the thread pool is busy. |
notifyurl | "" | dynamic | The Url address of the notify object. When its value is not empty, the IMessage is delivered to the specified endpoint instead of the target object. This feature makes it possible to retrieve the IMessage from other queues using the same receiver mechanism. |
acknowledgeurl | "" | dynamic | The Url address of the acknowledge notification object. Use this property to obtain the result of the target remote method. The acknowledge remote object can examine the IMethodReturnMessage for all requested values like a real client. |
exceptionurl | "" | dynamic | The Url address of the exception notification object. The endpoint object receives the thrown Exception object and the body of the remoting call (the IMethodCallMessage stream). |
Note that all Url properties can use a format for chaining channels, which
allows messages to be forwarded through more than one physical channel. The
following example shows this feature:
exceptionurl = "msmq://.\private$\ReqChannel_Error ; tcp://localhost:4444/endpoint"
where the exceptionurl property has been specified to forward an exception to
the ReqChannel_Error queue and then through the standard tcp channel to the
endpoint object.
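Splitting a chained url into the first hop and the remainder to be forwarded could look like the sketch below. ChainedUrl and Split are hypothetical names; the only thing taken from the article is the ';' delimiter between the hops.

```csharp
using System;

// Hypothetical helper for illustration; not part of the MSMQChannel assembly.
public static class ChainedUrl
{
    // Splits "hop1 ; hop2 ; ..." at the first ';'.
    // firstHop is the channel to use now, remainder is what the next channel
    // in the chain receives (empty when there is no further hop).
    public static void Split(string url, out string firstHop, out string remainder)
    {
        int sep = url.IndexOf(';');
        if (sep < 0)
        {
            firstHop = url.Trim();
            remainder = string.Empty;
            return;
        }
        firstHop = url.Substring(0, sep).Trim();
        remainder = url.Substring(sep + 1).Trim();
    }
}
```

For the exceptionurl value above, the first hop would be the ReqChannel_Error queue address and the remainder the tcp endpoint.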
IKnowledgeBaseControl
The Knowledge Base is published through the IKnowledgeBaseControl interface
contract and registered for remoting under the name of the channel. The
following code snippet shows the abstract definition of this contract:
public interface IKnowledgeBaseControl
{
    void Save(string filename);
    void Load(string filename);
    void Store(string strLURL, bool bOverwrite);
    void Store(string name, string val, bool bOverwrite);
    void Update(string strLURL);
    void Update(string name, string val);
    void RemoveAll();
    void Remove(string name);
    object GetAll();
    string Get(string name);
    bool Exists(string name);
    bool CanBeUpdated();
    string Mapping(string name);
}
Programmatically accessing the KB using the standard remoting design
pattern:
string strObjectUrl = "tcp://localhost:1332/msmq";
Type objObjectType = typeof(IKnowledgeBaseControl);
IKnowledgeBaseControl ro = (IKnowledgeBaseControl)
Activator.GetObject(objObjectType, strObjectUrl);
ro.Update("name", "value");
Note: The Knowledge Base of the MSMQ Channel is stateful, so a Load Balanced
Cluster has to be taken into account when updating it. The best solution is to
create a local KB controller for each node separately, driven by the usage of
the local resources such as MSMQ private queues.
IRemotingResponse
IRemotingResponse is an interface contract between the notification object and
the channel. Based on the channel state, the receiver can dispatch a response
to the specified notification object to perform a specific task such as
recovering, re-routing, updating a business state, tracing, logging, tuning the
channel's KB, etc. See the RemotingObject.cs file in the Test Sample solution
for its implementation. The following code snippet shows its abstract
definition:
#region IRemotingResponse
[Serializable]
public class MsgSourceInfo
{
    Acknowledgment m_Acknowledgment;
    DateTime m_MessageSentTime;
    string m_ResponseQueuePath;
    string m_ReportQueuePath;

    public Acknowledgment Acknowledgment
    {
        get { return m_Acknowledgment; }
        set { m_Acknowledgment = value; }
    }
    public DateTime MessageSentTime
    {
        get { return m_MessageSentTime; }
        set { m_MessageSentTime = value; }
    }
    public string ResponseQueuePath
    {
        get { return m_ResponseQueuePath; }
        set { m_ResponseQueuePath = value; }
    }
    public string ReportQueuePath
    {
        get { return m_ReportQueuePath; }
        set { m_ReportQueuePath = value; }
    }
}

public interface IRemotingResponse
{
    void ResponseNotify(MsgSourceInfo src, object msg);
    void ResponseAck(object msg);
    void ResponseError(object msg, Exception ex);
}
#endregion
CallContext Ticket
The CallContext Ticket object makes it possible to pass application-specific
info through the remoting infrastructure. It is an optional feature of the MSMQ
Channel that hides all the implementation logic and simplifies its usage. Any
point of the remoting hierarchy, including both endpoints in the application
layer (the client and the remoting object), can modify the Ticket. The Ticket
uses the String type name/value design pattern.
#region CallContext Ticket
[Serializable]
public class LogicalTicket : NameValueCollection,
    ILogicalThreadAffinative
{
    public LogicalTicket();
    public LogicalTicket(params string[] args);
    public LogicalTicket(IMessage imsg);
    protected LogicalTicket(SerializationInfo info,
        StreamingContext context);
    public void Set();
    public void Reset();
}
#endregion
Usage of the Ticket is very generic. It can be very useful, for instance, for
handling a compensation issue during a distributed transaction for a
disconnected queue.
The MSMQ Channel (Sender and Receiver) requires the following assemblies to be
installed into the GAC:
- ICustomChannel - interface contract
- MSMQChannel - Sender and Receiver channels
You can install them manually or using the .msi file included in this article.
Note that before the MSMQ Channel is used, all MSMQ resources specified by the
channel's KB have to be registered and ready to send/receive messages. This
task is not automated and has to be completed manually.
The MSMQ Sender/Receiver Channels can be declared globally in the
machine.config file or in the host process config file. The following
example shows their declaration in the channels tag:
<channels>
<channel id="msmq_sender" type="RKiss.MSMQChannel.Sender, MSMQChannel,
Version=2.0.0.0, Culture=neutral, PublicKeyToken=7cfeed5d7aef9679" />
<channel id="msmq_receiver" type="RKiss.MSMQChannel.Receiver, MSMQChannel,
Version=2.0.0.0, Culture=neutral, PublicKeyToken=7cfeed5d7aef9679" />
</channels>
Note: Use the .msi file for installation of the MSMQ Channel.
The previous chapters described the MSMQ Channel Sender and Receiver from the
installation and configuration point of view. Now I will show you simple and
advanced usage of the MSMQ Channel in the event-driven architecture. Before the
first example, here are some useful comments:
- Transaction context doesn't flow through the standard Remoting channels such as tcp and http
- MSMQ Channel Sender can handle a single transactional message by itself
- MSMQ Channel Sender can be part of a distributed transaction (DTC)
- MSMQ Channel supports only the Fire&Forget call
- Multiple Fire&Forget calls in the transactional manner can be completed using the ServiceConfig feature (SWC)
- MSMQ Channel Receiver can dispatch a message to the local endpoint
- MSMQ Channel Receiver can dispatch a message to the chained channels
- MSMQ Channel Receiver worker represents a virtual client
- MSMQ Channel can be hosted by any process such as IIS, Windows Service, Windows Form, dllhost, etc.
- Remoting Messages are stateful
- The event-driven architecture model is based on the "push model" design pattern
- Remoting infrastructure makes it possible to isolate a logical model from its physical deployment
- Remoting channel supports the deployment patterns
The following examples use two patterns:
- Business Logic model - where the object and its consumer are described by
the logical connectivity. Note that this pattern is driven by the application
and can be fully virtualized. For instance: mobile deployment, desktop
deployment, etc.
- Deployment - where the logical connectivity is mapped to the physical
environment using the deployment pattern model. This layer respects the
physical environment and the architecture design pattern, such as a synchronous
processor and an asynchronous post-processor. The behavior of this pattern can
be changed administratively using the config files. When the MSMQ Channel is
used, the Knowledge Base can also be updated at run time.
The host process config files are part of the deployment pattern. Each end of
the connectivity is hosted by its own AppDomain (process), and the ends are
connected to each other using the Sender and Receiver mechanism. A host process
can also host both ends, the Receiver and the Sender. The following snippet
shows an example of the config file:
<configuration>
<configSections></configSections>
<appSettings>
<add key="DataSource" value="connection string to your database" />
</appSettings>
<system.runtime.remoting>
<application>
<service>
<wellknown mode="SingleCall" type="RemoteObject,
RemoteObject" objectUri="endpoint" />
</service>
<channels>
<channel ref="msmq_sender" name="msmq"
receivetimeout="20"
sendtimeout="10"
admin=".\private$\adminchannel"
lurl="EventABC=.\private$\ReqChannel/endpoint,
RootObjectABC=FormatName:DIRECT=
http://localhost/msmq/private$/reqchannel,
BusinessLogicABC=.\private$\ReqChannel;
tcp://localhost:4444/endpoint">
</channel>
<channel ref="msmq_sender" name="msmqEvent"
receivetimeout="20"
sendtimeout="10"
admin=".\private$\adminchannel"
lurl="EventABC=.\private$\EventQueue/endpoint">
</channel>
<channel ref="msmq_receiver" name="msmqRcv"
listener=".\private$\ReqChannel"
usetimeout="false"
retry="2"
retrytime="20"
retryfilter="Requested Service not found.
No connection could be made because
the target machine actively refused it."
acknowledgeurl="tcp://localhost:4444/endpoint"
exceptionurl="tcp://localhost:4444/endpoint">
</channel>
<channel ref="tcp" port="4444" />
</channels>
</application>
<channelSinkProviders></channelSinkProviders>
<channels>
<channel id="msmq_sender" type="RKiss.MSMQChannel.Sender,
MSMQChannel, Version=2.0.0.0, Culture=neutral,
PublicKeyToken=7cfeed5d7aef9679" />
<channel id="msmq_receiver"
type="RKiss.MSMQChannel.Receiver, MSMQChannel,
Version=2.0.0.0, Culture=neutral,
PublicKeyToken=7cfeed5d7aef9679" />
</channels>
</system.runtime.remoting>
</configuration>
Example 1. - Generating Event
This is a simple example of how to yield the synchronous call and perform its
post-processing.

The Client calls the remoting method in the fire&forget manner. This process
is synchronous and transactional with a high performance profile. The client is
guaranteed that the request (inquiry) call will be accepted and performed at
any time, without worrying about the target connectivity. The request call is a
stateful message and it can be post-processed within the timeout limit.
Depending on the application, the remote object can also call itself (a
recursive call) without worrying about stack overflow. As I mentioned earlier,
the message is a stateful object and it can be modified while it travels. Based
on this state, the process can be finished and the event notification generated
using the scenario described above. That's the typical behavior of the "push
model".
In the case of multiple distributed events, the Client can use a new .NET 1.1
feature: SWC (Services Without Components). I like this feature, and here is
its implementation from the Test Sample code (Form1.cs):
private void buttonAll_Click(object sender, System.EventArgs e)
{
    LogicalTicket ticket = null;
    try
    {
        string[] info = comboBoxTicket.Text.Split(',');
        ticket = new LogicalTicket(info);
        ticket.Set();

        ServiceConfig sc = new ServiceConfig();
        sc.Transaction = TransactionOption.RequiresNew;
        sc.TransactionTimeout = 20;
        sc.TrackingAppName = "MSMQChannel Test";
        sc.TransactionDescription = "buttonAll_Click";
        sc.TrackingEnabled = true;
        ServiceDomain.Enter(sc);

        DateTime dt;
        long before = DateTime.Now.Ticks;
        string strObjectUrl = comboBoxUrl.Text;
        Type objObjectType = typeof(IPing);
        IPing ro = (IPing)Activator.GetObject(objObjectType,
            strObjectUrl);
        ro.OneWay(string.Format("#{0}: {1}", m_counter,
            "First message"));
        ro.Echo(string.Format("#{0}: {1}", m_counter,
            "Second message"), out dt);
        ro.Ping(string.Format("#{0}: {1}", m_counter,
            "Third message"));
        long after = DateTime.Now.Ticks;

        if(comboBoxMsg.Text.IndexOf("msgbox") >= 0)
        {
            if(MessageBox.Show("Are you sure to abort it?", "Sample",
                MessageBoxButtons.YesNo)==DialogResult.Yes)
                throw new Exception("This call has been aborted");
        }
        if(comboBoxMsg.Text.IndexOf("throw") >= 0)
            throw new Exception("This is an exception test");

        ContextUtil.SetComplete();
        ListBoxAdd(string.Format("#{0} done, time={1}[ms]",
            m_counter++, (after-before)/10000));
    }
    catch(Exception ex)
    {
        ContextUtil.SetAbort();
        Trace.WriteLine(ex.Message);
        ListBoxAddXml(ex.Message);
    }
    finally
    {
        ServiceDomain.Leave();
        if(ticket != null)
            ticket.Reset();
    }
}
The ServiceConfig, ServiceDomain and ContextUtil calls in the above code are
related to the SWC feature. Notice that the code snippet is part of a Windows
Form that is not derived from the ServicedComponent class. The concept is very
straightforward: create the transaction context using the ServiceConfig class
and pass its reference to the current appDomain. The rest is the same as in
Enterprise Services. You can test this feature by pressing the All button in
the ClientForm program - see Test.
For a large and more complex application model, the business control state is
persisted in the database. The following example shows how two different
resources, such as a database and MSMQ, can work together in the transactional
manner:
Example 2. - Updating Business State

The MSMQ Channel supports the transactional call using the DTC transaction
mechanism. The transactional Root Object can work with managed transactional
resources such as a Database and MSMQ in the atomic manner. This guarantees
that either all resources are committed or all of them are rolled back. In the
above example, it's not important which resource is handled first (that's the
first phase of the commit), because when the root object goes out of scope, all
resources are committed, or rolled back if at least one of them aborted. After
the commit we can expect a message in the MSMQ Channel. The ability to join the
transactional party in the root object is a great feature of the MSMQ Channel.
The above solution is suitable for a corporate network. But what if the
business state is located over the Internet?
Example 3. - Updating Business State over Internet

The current technology doesn't support the Distributed Transaction Coordinator
(DTC) over the Internet. The Internet is based on loose connectivity and it's
not a good idea to lock a resource for a long time. There is a "pseudo"
solution that helps with this issue. The transaction needs to be divided into a
synchronous processing transaction, an asynchronous post-processing transaction
and its compensator. The compensator is responsible for "rolling back" the
resources. As the above picture shows, there are two root objects for handling
the resources located behind the firewall. Using the MSMQ Channel, the request
call can be post-processed in the target root object. Basically, there are two
scenarios between the root objects:
- The Synch Call is processed locally and its message is stored in the local
queue as in the previous examples. The post-processing goes over the Internet
using the chained channel mechanism (see my article Remoting Over Internet).
This solution is suitable if the remote object is available.
- Using the MSMQ 3.0 feature - Direct format name over the Internet. In this
case, the Fire&Forget call is made over the Internet and the destination queue
is behind the firewall. Note that the client side needs to know the full direct
name of the destination queue in advance. This solution is suitable for
disconnectable applications.
Once again, the proper solution can be selected and configured in the
deployment pattern using the config files.
Example 4. - "Parallel processing"

This example shows a technique for creating a "parallel business processor"
using the MSMQ Channel features. Let's assume we have an application that
migrates thousands of pages of thousands of records to another database schema.
The task can be accomplished using the legacy synchronous design pattern, where
records are processed sequentially one by one in the main loop. Everything is
fine except that the processing time is too long. The event-driven architecture
supported by the MSMQ Channel can accomplish this task faster, and the solution
is more manageable and traceable, with re-usable components. The solution
requires dividing the process into the following parts:
- Pre-processor, represented by the Manager component, handles the pages of
records and the control process state in the database. Its logic invokes a
business remoting method with the specific state in a loop over the entry page.
The calls are synchronous using the fire&forget mechanism. After this loop the
Manager updates its state in the database and commits all resources. When the
Manager leaves the scope, the next phase starts.
- Processor represents the actual business layer, where the remoting method
processes its business logic based on the received state. The root object is a
transactional SingleCall (stateless) remoting object. The incoming messages
from the MSMQ Channel are processed by the thread-pool workers. Each root
object updates the control state in the database. Based on this state, the
final event can be generated to start the post-process.
- Post-Processor receives the event message (call) to finalize the application
process as aborted, done, etc.
When you look at the above picture again, you will see how the "push model"
forwards a request message from left to right. The implementation of the above
pattern is straightforward and simple, and much more testable than the first
one, the legacy sequential solution.
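The hand-off from the Processor to the Post-Processor can be illustrated with an in-process analogy. In the sketch below a shared counter stands in for the control state that the article keeps in the database, Parallel.For stands in for the receiver's thread-pool workers, and the worker that finishes the last record raises the final event. PushPipeline and Run are hypothetical names, and no MSMQ or remoting is involved.

```csharp
using System;
using System.Threading;
using System.Threading.Tasks;

// Hypothetical in-process analogy of the Manager / Processor / Post-Processor
// split; it only shows how the shared control state triggers the final event.
public static class PushPipeline
{
    // recordCount:   number of fire&forget calls the Manager would issue
    // workers:       upper bound on concurrent Processors (must be > 0)
    // processRecord: the business method executed for each record
    // Returns how many times the final event was raised (expected: once).
    public static int Run(int recordCount, int workers, Action<int> processRecord)
    {
        int remaining = recordCount;   // control state (a database row in the article)
        int finalEvents = 0;

        var options = new ParallelOptions { MaxDegreeOfParallelism = workers };
        Parallel.For(0, recordCount, options, i =>
        {
            processRecord(i);                                 // Processor
            if (Interlocked.Decrement(ref remaining) == 0)    // update control state
                Interlocked.Increment(ref finalEvents);       // last one starts the post-process
        });

        return finalEvents;
    }
}
```

Because the decrement is atomic, exactly one worker observes zero, so the Post-Processor is started once no matter in which order the records complete.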
Changing the deployment of the root objects is very simple using the chaining
channels, as shown in the following:
before: BusinessLogicABC=.\private$\ReqChannel/endpoint
after: BusinessLogicABC=.\private$\ReqChannel ; tcp://localhost:4444/endpoint
where the root objects are afterwards hosted in a separate process, for
instance a Windows Service.
Smart Deployment Tool - idea
As I mentioned earlier, the business model encapsulates services into the
deployment pattern design. This step is implemented by the remoting
infrastructure and its custom channel, such as the MSMQ Channel. It would be
nice to have a visual tool to administer the application deployment pattern
statically (creating the config files) and dynamically on the fly (see the
MSMQChannel Knowledge Base). The following picture shows this idea:

The concept of this idea is based on the remoting infrastructure. Based on the
physical environment and deployment pattern, the Tool will generate the host
processes, config files, etc. At run time, the tool can subscribe to the custom
sink publisher, Remoting Probe, to obtain the model behavior for its tuning
process (config files, Knowledge Base, etc.).
The implementation of the Sender and Receiver of the MSMQ Custom Channel is
straightforward using the standard remoting boilerplate. The design pattern of
the remoting boilerplate is very well documented and also published in advanced
books. Additionally, you can find more details in my previous article [1].
Here, I am going to describe only some interesting parts related to MSMQ.
Sender
The core of the MSMQChannel.Sender is implemented in the AsyncProcessRequest
method. Based on the Knowledge Base (properties), the message call is
initialized and sent to the destination queue using the specified transaction
option. The following code snippet shows that:
public void AsyncProcessRequest(IClientChannelSinkStack sinkStack,
    IMessage msgReq, ITransportHeaders headers, Stream stream)
{
    string strQueuePath = null;
    string strObjectUri = null;
    Message outMsg = null;
    MessageQueueTransaction mqtx = new MessageQueueTransaction();
    try
    {
        #region pre-processor (mapping url address)
        strObjectUri = ParseLogicalUrl(m_LogicalUri, out strQueuePath);
        msgReq.Properties[MSMQChannelProperties.ObjectUri] = strObjectUri;
        if(m_Sender.AllowHeaders == true)
        {
            headers["__RequestUri"] = strObjectUri;
            msgReq.Properties["__RequestHeaders"] = headers;
        }
        #endregion
        #region send a remoting message
        m_OutQueue.Path = strQueuePath;
        outMsg = new Message(msgReq, new BinaryMessageFormatter());
        // timeouts are applied only when configured (> 0)
        int intTimeToBeReceived = m_Sender.TimeToBeReceived;
        if(intTimeToBeReceived > 0)
            outMsg.TimeToBeReceived =
                TimeSpan.FromSeconds(intTimeToBeReceived);
        int intTimeToReachQueue = m_Sender.TimeToReachQueue;
        if(intTimeToReachQueue > 0)
            outMsg.TimeToReachQueue =
                TimeSpan.FromSeconds(intTimeToReachQueue);
        // request negative acknowledgements when an admin queue is configured
        if(m_Sender.AdminQueuePath != MSMQChannelDefaults.EmptyStr)
        {
            outMsg.AcknowledgeType = AcknowledgeTypes.NegativeReceive |
                AcknowledgeTypes.NotAcknowledgeReachQueue;
            outMsg.AdministrationQueue = m_Sender.AdminQueue;
        }
        string label = string.Format("{0}/{1}, url={2}",
            Convert.ToString(msgReq.Properties["__TypeName"]).Split(',')[0],
            Convert.ToString(msgReq.Properties["__MethodName"]), strObjectUri);
        if(ContextUtil.IsInTransaction == true)
        {
            // enlist in the ambient COM+/DTC transaction
            m_OutQueue.Send(outMsg, label,
                MessageQueueTransactionType.Automatic);
        }
        else
        {
            // no ambient transaction: use an internal MSMQ transaction
            mqtx.Begin();
            m_OutQueue.Send(outMsg, label, mqtx);
            mqtx.Commit();
        }
        #endregion
    }
    catch(Exception ex)
    {
        string strError = string.Format(
            "[{0}]MSMQClientTransportSink.AsyncProcessRequest " +
            "error = {1}, queuepath={2},",
            m_Sender.ChannelName, ex.Message, strQueuePath);
        m_Sender.WriteLogMsg(strError, EventLogEntryType.Error);
        throw new Exception(strError);
    }
    finally
    {
        #region clean-up
        if(mqtx.Status == MessageQueueTransactionStatus.Pending)
        {
            mqtx.Abort();
        }
        if(outMsg != null)
            outMsg.Dispose();
        #endregion
    }
}
The above method is called by ProcessMessage in the MSMQChannel Transport
Sink. Here is one interesting thing. As I mentioned earlier, the Sender
supports only remoting calls using the Fire&Forget design pattern, for any kind
of call. Its return value (IMethodReturnMessage) therefore always carries an
empty result: null, or zero for a value-type return. See the following code
snippet:
public void ProcessMessage(IMessage msgReq,
    ITransportHeaders requestHeaders, Stream requestStream,
    out ITransportHeaders responseHeaders, out Stream responseStream)
{
    IMessage msgRsp = null;
    try
    {
        #region send a remoting message
        AsyncProcessRequest(null, msgReq, requestHeaders, requestStream);
        #endregion
        #region this is a Fire&Forget call, so we have to simulate a null return message
        object retVal = null;
        IMethodCallMessage mcm = msgReq as IMethodCallMessage;
        MethodInfo mi = mcm.MethodBase as MethodInfo;
        if(mi.ReturnType != typeof(void))
            retVal = mi.ReturnType.IsValueType ?
                Convert.ChangeType(0, mi.ReturnType) : null;
        msgRsp = (IMessage)new ReturnMessage(retVal, null, 0, null, mcm);
        #endregion
    }
    catch(Exception ex)
    {
        // the exception travels back to the caller in the return message
        msgRsp = new ReturnMessage(ex, (IMethodCallMessage)msgReq);
    }
    finally
    {
        #region serialize IMessage response to return back
        BinaryFormatter bf = new BinaryFormatter();
        MemoryStream rspstream = new MemoryStream();
        bf.Serialize(rspstream, msgRsp);
        rspstream.Position = 0;
        responseStream = rspstream;
        responseHeaders = requestHeaders;
        #endregion
    }
}
That's all for the Sender side.
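One detail of the simulated return value is worth a note. Convert.ChangeType(0, type) works for the numeric types and bool, but it throws an InvalidCastException for value types that have no conversion from Int32, such as DateTime, Guid or an enum. A minimal sketch of a more general alternative, assuming it is acceptable to hand back the type's default instance, could use Activator.CreateInstance. The DefaultReturn class and its For method are hypothetical names for illustration and are not part of the channel.

```csharp
using System;

public static class DefaultReturn
{
    // Returns the value a Fire&Forget call could hand back for a given
    // return type: null for void and reference types, and the default
    // (zero-initialized) instance for value types. (Hypothetical helper.)
    public static object For(Type returnType)
    {
        if (returnType == typeof(void))
            return null;
        return returnType.IsValueType
            ? Activator.CreateInstance(returnType)
            : null;
    }
}
```

With this helper, a method returning int would still see 0, while a method returning DateTime would see default(DateTime) instead of failing.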
Receiver
The other side, the Receiver, is more complex because of its event-driven
design pattern. Its core implementation is based on the following methods:
Init - responsible for initializing the incoming and retry queues based on the
Receiver Knowledge Base. Registering a delegate for the PeekCompleted event
passes control to its handler whenever a message arrives. This handler is the
Manager method.
Manager - responsible for delegating a message to its background worker based
on the thread-pool availability and the Knowledge Base (maxthreads property).
The manager only does the fast delegation work and at the end calls BeginPeek
again for the next possible incoming message. Note that the actual work is done
in its worker, MessageWorker.
#region Manager
private void Manager(object source,
    PeekCompletedEventArgs asyncResult)
{
    MessageQueue mq = source as MessageQueue;
    try
    {
        mq.EndPeek(asyncResult.AsyncResult);
        // wait until a worker is available
        while(true)
        {
            if(m_EventMgr.WaitOne(NotifyTime * 1000, true) == false)
            {
                if(MaxNumberOfWorkers > 0)
                {
                    string strWarning = string.Format(
                        "[{0}]MSMQChannel.Receiver:" +
                        "All Message Workers are busy", ChannelName);
                    WriteLogMsg(strWarning, EventLogEntryType.Warning);
                    continue;
                }
                else
                {
                    m_Listening = false;
                    return;
                }
            }
            break;
        }
        // hand the message over to a background worker
        MessageWorkerDelegator mwd =
            new MessageWorkerDelegator(MessageWorker);
        mwd.BeginInvoke(source, null, null);
    }
    catch(Exception ex)
    {
        string strErr = string.Format(
            "[{0}]MSMQChannel.Receiver:Manager failed, error = {1}",
            ChannelName, ex.Message);
        WriteLogMsg(strErr, EventLogEntryType.Error);
    }
    finally
    {
        // re-arm the peek for the next incoming message
        if(m_Listening == true)
            mq.BeginPeek();
        else
            Trace.WriteLine(string.Format(
                "[{0}]MSMQChannel.Receiver.Manager has been disconnected",
                ChannelName));
    }
}
#endregion
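The listing does not show how m_EventMgr is signaled when a worker becomes free, so the following is only a sketch of the same throttling idea under my own assumptions: a gate with a fixed number of slots that a manager tries to enter with a timeout, and that each worker leaves when it finishes. It uses SemaphoreSlim, a newer .NET type than the wait handle used by the channel, and the WorkerGate class is a hypothetical name.

```csharp
using System;
using System.Threading;

// Bounds the number of concurrently running workers. (Hypothetical helper.)
public sealed class WorkerGate
{
    private readonly SemaphoreSlim _slots;

    public WorkerGate(int maxWorkers)
    {
        _slots = new SemaphoreSlim(maxWorkers, maxWorkers);
    }

    // Returns true when a worker slot was obtained within the timeout,
    // false when all workers stayed busy for the whole period.
    public bool TryEnter(TimeSpan timeout)
    {
        return _slots.Wait(timeout);
    }

    // Called by a worker when it has finished its message.
    public void Exit()
    {
        _slots.Release();
    }
}
```

A manager built on such a gate would log the "all workers are busy" warning whenever TryEnter returns false and then try again, which mirrors the loop in the listing above.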
MessageWorker - retrieves the message from the incoming queue in a
transactional manner. After de-serializing its stream, the remoting message is
passed to the MessageDispatcher. Depending on the Knowledge Base, its result
can be distributed to the acknowledge or exception remote object.
MessageDispatcher - responsible for delivering a remoting message to the local
endpoint or forwarding it to the next channel using the chaining channels
mechanism.
MessageDispatcher_NOTIFY - a special feature of the channel, where the
remoting message can be redirected to the specified remoting notify object. The
IMessage is serialized into a byte array and passed to the endpoint.
MessageDispatcher_ACK - performs a remoting call to the specified notification
endpoint with the IMethodReturnMessage as its input argument. This notification
makes it possible to retrieve a response from the target endpoint.
MessageDispatcher_ERROR - performs a remoting call to the specified
notification endpoint with the IMethodCallMessage and the exception thrown by
the target endpoint as its input arguments.
MessageDispatcher_RETRY - responsible for moving a message from the incoming
parent queue to the retry queue in a transactional manner, based on the
Knowledge Base. This feature allows a remoting message to be re-delivered to
the target endpoint after the specified timeout limit.
RetryManager - has only one responsibility: the transactional movement of a
timed message from the retry queue back to the parent incoming queue for
re-delivery. The moved message is the original one, delayed by its stay in the
waiting queue (retry queue). As the MSMQ spec states, the administration queue
cannot be transactional. This restriction led me to design this method around
queuing the retry messages and peeking the first one at the proper time. When
the message's arrival time plus the retrytimeout limit is later than the
current time, the RetryManager goes to sleep. Its wake-up triggers the next
BeginPeek and the scenario repeats. Otherwise, when the message's timeout limit
has elapsed, the message is retrieved from the retry queue and sent to the
parent queue (incoming queue) in a transactional manner. The messages are moved
between the queues sequentially, one by one, using only one thread from the
thread-pool. Here is its implementation:
#region RetryManager
private void RetryManager(object source,
    PeekCompletedEventArgs asyncResult)
{
    MessageQueueTransaction mqtx = null;
    MessageQueue mq = (MessageQueue)source;
    try
    {
        Message msg = mq.EndPeek(asyncResult.AsyncResult);
        DateTime elapsedTime = msg.ArrivedTime.AddSeconds(RetryTime);
        if(DateTime.Compare(DateTime.Now, elapsedTime) >= 0)
        {
            // the retry timeout has elapsed: move the message back
            MessageQueue InQueue = new MessageQueue(ListenerPath);
            InQueue.Formatter = new BinaryMessageFormatter();
            mqtx = new MessageQueueTransaction();
            mqtx.Begin();
            // receive inside the same transaction, so that an abort
            // returns the message to the retry queue
            Message msgToMove = mq.Receive(new TimeSpan(0,0,0,1), mqtx);
            InQueue.Send(msgToMove, mqtx);
            mqtx.Commit();
        }
        else
        {
            // not due yet: sleep for the remaining time
            TimeSpan deltaTime = elapsedTime.Subtract(DateTime.Now);
            int sleeptime = Convert.ToInt32(deltaTime.TotalMilliseconds);
            m_EventRetryMgr.WaitOne(sleeptime, true);
        }
    }
    catch(Exception ex)
    {
        if(mqtx != null && mqtx.Status ==
            MessageQueueTransactionStatus.Pending)
            mqtx.Abort();
        string strErr = string.Format(
            "[{0}]MSMQChannel.Receiver:RetryManager failed, error = {1}",
            ChannelName, ex.Message);
        WriteLogMsg(strErr, EventLogEntryType.Error);
    }
    finally
    {
        // re-arm the peek for the next message in the retry queue
        if(m_RetryListening == true)
            mq.BeginPeek();
        else
            Trace.WriteLine(string.Format(
                "[{0}]MSMQChannel.Receiver.RetryManager has been disconnected",
                ChannelName));
    }
}
#endregion
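The timing decision inside RetryManager can be isolated into a small pure function, which makes it easy to check without any queue. The following is a minimal sketch under that assumption; the RetrySchedule class and the TimeUntilDue method are hypothetical names, and the current time is passed in as a parameter instead of being read from DateTime.Now.

```csharp
using System;

public static class RetrySchedule
{
    // Returns TimeSpan.Zero when the message is due for re-delivery
    // (arrival time + retry timeout <= now); otherwise returns how long
    // the retry manager would still have to sleep. (Hypothetical helper.)
    public static TimeSpan TimeUntilDue(DateTime arrivedTime,
        int retrySeconds, DateTime now)
    {
        DateTime due = arrivedTime.AddSeconds(retrySeconds);
        return now >= due ? TimeSpan.Zero : due - now;
    }
}
```

For example, a message that arrived 5 seconds ago with a retrytimeout of 20 seconds would leave 15 seconds of sleep time, while one that arrived 20 or more seconds ago would be moved immediately.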
KnowledgeBase
The Knowledge Base in the custom channel is implemented as a remoting object
with a hash table that holds all dynamic properties. The object is published
under the channel name and its lifetime is initialized to be infinite. The
channel inherits from this object for fast access. Public access is available
through the IKnowledgeBaseControl interface contract. The following code
snippet shows the method that updates the Knowledge Base:
public virtual void Update(string name, string val)
{
    if(AllowToUpdate == false)
        throw new Exception("The Knowledge Base is locked");
    if(KB.Contains(name) == false)
        throw new Exception(string.Format(
            "The logical name '{0}' doesn't exist", name));
    if(OnBeforeChangeKnowledgeBase(name, val) == false)
        return;
    KB[name] = val;
    OnChangedKnowledgeBase(name, val);
}
As you can see, its implementation is simple and straightforward. Its public
methods are virtual, which allows them to be overridden in a derived class.
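As an illustration of that extension point, here is a minimal sketch of a derived class that vetoes invalid values before they reach the hash table. It is built on a simplified stand-in for the Knowledge Base, because the real base class is not shown in full here. The SimpleKnowledgeBase and ValidatingKnowledgeBase classes, the Add and Get helpers, the protected visibility of the two hooks and the rule for retrytimeout are all assumptions made for this example.

```csharp
using System;
using System.Collections;

// Simplified stand-in for the channel's Knowledge Base (assumption).
public class SimpleKnowledgeBase
{
    protected readonly Hashtable KB = new Hashtable();
    public bool AllowToUpdate = true;

    public void Add(string name, string val) { KB[name] = val; }
    public string Get(string name) { return (string)KB[name]; }

    // Same flow as the Update method shown above.
    public virtual void Update(string name, string val)
    {
        if (!AllowToUpdate)
            throw new Exception("The Knowledge Base is locked");
        if (!KB.Contains(name))
            throw new Exception(string.Format(
                "The logical name '{0}' doesn't exist", name));
        if (!OnBeforeChangeKnowledgeBase(name, val))
            return;
        KB[name] = val;
        OnChangedKnowledgeBase(name, val);
    }

    protected virtual bool OnBeforeChangeKnowledgeBase(string name, string val)
    {
        return true;
    }

    protected virtual void OnChangedKnowledgeBase(string name, string val)
    {
    }
}

// Rejects a retrytimeout that is not a positive number of seconds.
public class ValidatingKnowledgeBase : SimpleKnowledgeBase
{
    protected override bool OnBeforeChangeKnowledgeBase(string name, string val)
    {
        if (name == "retrytimeout")
        {
            int seconds;
            return int.TryParse(val, out seconds) && seconds > 0;
        }
        return true;
    }
}
```

With this derived class, an update of retrytimeout to a non-numeric text is silently ignored, while a valid value replaces the old one.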
I built a separate solution with the following projects to test the MSMQ
Custom Channel:
- Server - console host process
- Interface - interface contract for the Remote Object
- RemoteObject - remote object endpoint
- RootObject - Enterprise Service object
- Client - Windows Form
Before starting the test, be sure that MSMQChannel and Sample have been
installed - see the Installation process. I would also like to remind you that
the following MSMQ private queues have to be available:
- ReqChannel - transactional
- ReqChannel_Retry - transactional
- ReqChannel_Ack - transactional
- ReqChannel_Error - transactional
- AdminChannel - non-transactional
The .msi setup will create an MSMQChannel folder on your desktop. Launch the
host processes, Server and ClientForm, located in the Sample subfolder:


The Server console program uses its screen to show the responses from the
remoting object. The ClientForm is a little more complicated and requires some
knowledge of the details described in the previous chapters. Basically, there
are two sections:
- Channel Knowledge Base - to update the Knowledge Base of the selected
channel. Note that the tcp port is selected implicitly, which is why more than
one ClientForm program can be open at the same time. There is one hidden trick:
double-clicking a listbox item copies its contents into the data textBox, where
it can be modified and written back to the KB.
- Remote Object - to invoke the method with a specified input argument
and CallContext Ticket on the specified remote object using the url address.
There are two flat buttons: Echo+Ping and All. Pressing the first button
invokes a method on the Root Object to perform a distributed transaction of two
remoting calls, Echo and Ping. The second one performs three remoting calls in
a transactional manner directly from the ClientForm using the SWC feature. Note
that the Remote Object can throw an exception, sleep, or be stopped by a
Message Box, depending on the text sent in the message input argument.
Now, let's make the test:
- Press button GetAll
- Check the Knowledge Base in the response listBox
- Press button Echo
- Check the response on the console (see Server console picture)
- Select the text 'throw exception' on the Method comboBox
- Press button Echo
- After 40 seconds [2 (retry) x 20s (retrytimeout)], the ResponseError should
appear in the listBox
In the same way, try the other buttons: update the Knowledge Base, change the
CallContext, select different url addresses, and so on.
Using the MSMQ Technology in the Remoting infrastructure gives your
architecture model a "third dimension" - the event-driven pattern. In this
article, I described how to incorporate and configure the MSMQ Custom Channel
and how to encapsulate the business model from its deployment. Beyond that, you
have a free MSMQ Custom Channel solution, including its source code and
installation file. I hope you will enjoy it.