Introduction

The event-driven architecture model is based on the synchronous pre-processing and asynchronous post-processing design patterns. Implementing these patterns with the Remoting infrastructure and MSMQ technology makes it possible to encapsulate a business model from its physical deployment. This article shows the implementation and usage of the MSMQ Channel in a distributed application model, including transactional calls, post-processing and configuration. I also recommend reviewing my previous article, Using MSMQ for Custom Remoting Channel, for the concept and design issues.

Features

The MSMQ Channel supports the following features:

Configuration

The MSMQ Channel (like any remoting channel) is a stateful object in the AppDomain (process), identified by its unique name, for instance msmq. Its state can be initialized at construction time from properties located in the config file. Together, these properties represent the Knowledge Base (KB) of the channel. Based on their values, the channel can change its state. For instance, the retry mechanism needs to know how many times to re-deliver a message to the endpoint while handling an exception. Some of the properties are constant and cannot be changed during the channel's lifetime; these form the static KB, for instance the channel name. The dynamic properties, on the other hand, can change the behavior of the channel at runtime without restarting the process. Coming to the point, the MSMQ Channel has a control mechanism to:

This last feature makes it possible to build a smart distributed model with transparent connectivity between the remote object and its consumer, where the channel state can be changed dynamically based on the business behavior. This is called a learning and tuning design pattern.

Note: The MSMQ Channel state can be changed programmatically at runtime using the standard remoting design pattern.

The following picture shows a high-level design of the MSMQ Channel between the remote object and its consumer:

As you can see, each MSMQ custom channel has its own Knowledge Base to hold the channel state. The KB is initialized during the registration of the remoting channel and published as a remoting object using the channel name as its endpoint. The KB is accessed via the IKnowledgeBaseControl interface contract.

Before the MSMQ Channel is registered, the MSMQ queues have to be available to send and receive messages. The minimum requirement is a destination queue for the message call (request channel); in the above picture this is the ReqChannel queue. Based on the application requirements, additional queues such as a retry queue and an administration queue can be added. The name of the retry queue is the request queue name with the postfix _Retry. Note that all queues except the administration queue have to be transactional.

Based on the Knowledge Base, the receiver can distribute the target response to the notification remote objects, such as the Notify Remote Object (NRO), Exception Remote Object (ERO) and Acknowledge Remote Object (ARO), using the standard remoting manner and the IRemotingResponse interface contract. This feature allows channels to be chained, so the response messages can be sent to additional queues, for instance ReqChannel_ACK, ReqChannel_ERROR, etc., using the MSMQ Sender Channel features.

The notifyurl property has a special role in delivering the remoting IMessage to the endpoint. If its value is not empty, the IMessage is passed to the NRO notification object as an argument. This design pattern makes it possible to evaluate any IMessage received from the queue and make an application-specific decision at the application layer, such as re-routing or modifying the message.

Notes:
  • The Knowledge Base can be locked administratively using the updatekb property to prevent runtime updates. In this case, the KB holds the properties from the config file.
  • The MSMQ Channel response can be distributed to the notification remote objects for post-processing, such as evaluation of the output arguments, business tracing, rollback, monitoring, tuning the channel's Knowledge Base, etc.

Message workflow

Consuming the remote object through the remoting infrastructure via the MSMQ custom channel is fully transparent. However, it is necessary to know the channel's behavior to properly set up the Knowledge Base of the Sender and Receiver channels.

Sender

The remoting message begins at the client side, where the remote method is invoked on the transparent proxy. To obtain the transparent proxy of the remote object, the client calls Activator.GetObject with the specified interface contract and the physical url address of the endpoint. To avoid an explicit (hard-coded) url address, the MSMQ Channel sender can use a logical url address related to the business connectivity model, for instance msmq://BusinessEndpoint. This logical address is a constant and can be hard-coded in the application layer. The MSMQ Channel looks up the logical address in the Knowledge Base to find its physical interpretation. If no entry is found, the Sender assumes that the address is already a physical one. Note that a hard-coded physical url address can also be mapped to another physical one. This feature makes it possible to configure the proper deployment pattern administratively or programmatically on the fly. When this process is completed, the message is sent to the destination queue. The delivery can be placed under timeout control with the sendtimeout property, which is especially useful when the message is sent over a network or to a server that may be disconnected. The message lifetime (waiting for the receiver) can also be limited by the receivetimeout property. When that time has elapsed, the message is moved to the administration queue (which must be specified).

Note: The remote method uses a fire&forget design pattern for any type of call: synchronous, asynchronous or OneWay attributed. This means that the call does not wait for any response from the target object. Its responsibility is to send the IMessage to the transactional queue and return to the caller with a null IMethodReturnMessage. In the case of a distributed transaction (COM+/DTC), the IMessage is sent to the destination queue when the transaction has been committed.

The Sender channel configuration part is shown in the following example:

<channel ref="msmq_sender" name="msmq" priority="1"
   updatekb="true" 
   receivetimeout="20" 
   sendtimeout="10"
   useheaders="false" 
   admin=".\private$\adminchannel" 
   lurl="Queue=.\private$\ReqChannel,
         BusinessEndpoint=.\private$\ReqChannel/endpoint,
         .\private$\ReqChannel/endpoint=.\private$\ReqChannel/endpointABC">
</channel>        

The Knowledge Base of the Sender channel:

NAME | DEFAULT VALUE | KB | NOTE
name | "msmq" | static | Unique name of the channel.
priority | "1" | static | Priority of the messaging. Not used in this version.
updatekb | "true" | static | Lock the Knowledge Base to prevent it from being updated at runtime.
useheaders | "false" | static | Pass the sink transport headers to the receiver.
admin | "" | dynamic | The queue path to the non-transactional administration queue.
sendtimeout | "60" | dynamic | The time limit for the message to reach the destination queue. The value is in seconds; -1 represents InfiniteTimeout.
receivetimeout | "60" | dynamic | The time limit for the message to be retrieved from the destination queue. The value is in seconds; -1 represents InfiniteTimeout.
lurl | "" | dynamic | The logical connectivity knowledge that maps a logical address to the physical url address. The format uses a name/value pattern with a comma delimiter. For instance:

lurl = "End = .\private$\ReqChannel/endpoint, FarEnd = .\private$\ReqChannel; tcp://localhost:9999/endpoint"
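The name/value format and the "fall back to the address itself" rule described above can be sketched as follows. This is only an illustration, not the channel's actual parser: the class and method names are hypothetical, the case-insensitive lookup is an assumption, and a single lookup is performed (whether the real channel re-maps the result again is not stated here). Each pair is split at the first '=' so that values such as FormatName:DIRECT=http://... survive intact.

```csharp
using System;
using System.Collections.Generic;

// Hypothetical helper, not part of the MSMQ Channel assembly.
public static class LogicalUrlSketch
{
    // Parse "name=value, name=value" into a lookup table.
    public static Dictionary<string, string> Parse(string lurl)
    {
        // Assumption: logical names are compared case-insensitively.
        var map = new Dictionary<string, string>(StringComparer.OrdinalIgnoreCase);
        foreach (string pair in lurl.Split(','))
        {
            int eq = pair.IndexOf('=');          // split at the FIRST '=' only
            if (eq <= 0) continue;               // skip empty or malformed entries
            string name = pair.Substring(0, eq).Trim();
            string value = pair.Substring(eq + 1).Trim();
            map[name] = value;
        }
        return map;
    }

    // Return the mapped address, or the input itself when no entry exists
    // (the input is then treated as a physical address).
    public static string Mapping(Dictionary<string, string> map, string name)
    {
        string physical;
        return map.TryGetValue(name, out physical) ? physical : name;
    }
}
```

With the table built from a value such as "End = .\private$\ReqChannel/endpoint", Mapping(map, "End") yields the queue path, while an unknown address is returned unchanged.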

Receiver

The MSMQ Channel Receiver is a more sophisticated part of the channel than its Sender side. It runs in the post-processing manner, triggered by an incoming message from the message queue specified by the listener property. Its Knowledge Base has more properties to control the message workflow. The message is retrieved from the queue in a transactional manner and dispatched to the target endpoint. The result of the remote call is distributed to the notification object (ARO/ERO). For the exception case there is a retry mechanism. Based on the retry, retrytime and retryfilter properties, the message can be sent to the retry queue for the specified time and later re-delivered to the target object. This process repeats up to retry times, and then the IMessage is sent to the specified exception object as an undeliverable message. To prevent retrying a message for an error that will always recur, the Knowledge Base contains the retryfilter property. The receiver scans it for the error obtained from the target and, on a match, immediately invokes the specified exception object instead of passing the message through the retry mechanism. Because the retry mechanism circulates the original message between the listener queue and its retry queue, the message lifetime can be turned off using the usetimeout property; otherwise the message is sent to the administration queue when its time limit has elapsed. In other words, delivering a message to the target object, including its retries, can be placed under the timeout limit. Note that in this case all timeout properties of the Sender and Receiver must be properly calculated.

Note: Retrieving and retrying queue messages are processed in a transactional manner to prevent their loss during the shutdown process.
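The retry decision described above can be summarized in a few lines. The following is a minimal sketch under stated assumptions, not the receiver's actual code: the type and method names are hypothetical, the filter is passed in as an already-split list of error fragments (the delimiter used inside the retryfilter property is not specified here), and matching is assumed to be a case-insensitive substring test.

```csharp
using System;
using System.Collections.Generic;

// Hypothetical outcome of a failed delivery.
public enum RetryAction { Retry, NotifyException }

// Hypothetical helper, not part of the MSMQ Channel assembly.
public static class RetryDecisionSketch
{
    // error       - message of the exception obtained from the target
    // attempts    - number of re-deliveries already made for this message
    // retry       - value of the 'retry' property (maximum re-deliveries)
    // retryFilter - error fragments that must never be retried (assumed pre-split)
    public static RetryAction Decide(string error, int attempts, int retry,
                                     IEnumerable<string> retryFilter)
    {
        foreach (string pattern in retryFilter)
        {
            // A filtered error goes straight to the exception object.
            if (pattern.Length > 0 &&
                error.IndexOf(pattern, StringComparison.OrdinalIgnoreCase) >= 0)
                return RetryAction.NotifyException;
        }
        // Otherwise retry until the counter is exhausted.
        return attempts < retry ? RetryAction.Retry : RetryAction.NotifyException;
    }
}
```

With retry="2", an unfiltered error is retried on the first and second failure and reported to the exception object on the third.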

Pulling messages from the queue is controlled by the receiver thread-pool controller, which allows only a specified number of receiver workers to run. The maxthreads property can be in the range of 0 to 20 threads, where zero turns the listener off from its incoming queue. The receiver worker represents a root thread (virtual client) that executes the IMessage. Its lifetime depends on the target application. When all workers have been busy for the notifytime period, a warning message is written to the EventLog.
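The idea of capping the number of concurrent receiver workers can be illustrated with a semaphore. This is a sketch only and not how the channel itself is implemented: it uses SemaphoreSlim and Task, which are newer than the .NET 1.1 runtime this article targets, the class name is hypothetical, and a short sleep stands in for dispatching the IMessage.

```csharp
using System;
using System.Threading;
using System.Threading.Tasks;

// Hypothetical illustration of a bounded worker pool.
public static class BoundedReceiverSketch
{
    // Processes 'messageCount' simulated messages with at most 'maxThreads'
    // concurrent workers and returns the peak concurrency observed.
    public static int Run(int messageCount, int maxThreads)
    {
        if (maxThreads <= 0) return 0;   // zero or negative: listener turned off

        int running = 0, peak = 0;
        object gate = new object();
        using (var slots = new SemaphoreSlim(maxThreads, maxThreads))
        {
            var tasks = new Task[messageCount];
            for (int i = 0; i < messageCount; i++)
            {
                slots.Wait();            // the pull loop blocks until a worker slot is free
                tasks[i] = Task.Run(() =>
                {
                    try
                    {
                        lock (gate) { running++; if (running > peak) peak = running; }
                        Thread.Sleep(10); // stand-in for executing the IMessage
                        lock (gate) { running--; }
                    }
                    finally { slots.Release(); }
                });
            }
            Task.WaitAll(tasks);         // all workers finish before the semaphore is disposed
        }
        return peak;
    }
}
```

The pull loop never starts more than maxThreads workers at once, so the returned peak stays within that limit.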

The Receiver channel configuration part is shown in the following example:

<channel ref="msmq_receiver" name="msmqRcv" 
   listener=".\private$\ReqChannel" 
   updatekb="true" 
   usetimeout="false"
   maxthreads="10" 
   retry="2" 
   retrytime="20" 
   retryfilter="No receiver registered. No connection could be 
      made because the target machine actively refused it."
   acknowledgeurl="msmq://.\private$\ReqChannel_Ack; 
      tcp://localhost:4444/endpoint" 
   exceptionurl="FormatName:DIRECT=
      http://localhost/msmq/private$/reqchannel_Error/endpoint">
</channel>

The Knowledge Base of the Receiver channel:

NAME | DEFAULT VALUE | KB | NOTE
name | "msmq" | static | Unique name of the channel.
priority | "1" | static | Priority of the messaging. Not used in this version.
updatekb | "true" | static | Lock the Knowledge Base to prevent it from being updated at runtime.
listener | "" | static | The queue path for the receiver. If the queue is non-transactional, the retry mechanism is blocked.
retry | "0" | dynamic | The retry counter for re-delivering a message to the target object.
retrytime | "60" | dynamic | The time the message waits for its re-delivery. The value is in seconds; -1 represents InfiniteTimeout.
retryfilter | "60" | dynamic | The filter of the exception messages. A match skips the retry mechanism and forwards the message to the exception notification object (see the exceptionurl property).
maxthreads | "20" | dynamic | Maximum number of workers in the transactional thread pool. A zero or negative value turns the thread pool off from the listener queue.
usetimeout | "false" | dynamic | Use the sender's receivetimeout limit to complete the re-delivery of the message to the target object. When the timeout has elapsed, the message is sent to the administration queue.
notifytime | "60" | dynamic | The time limit after which the EventLog is notified that the thread pool is busy.
notifyurl | "" | dynamic | The Url address of the notify object. When its value is not empty, the IMessage is delivered to the specified endpoint instead of the target object. This feature makes it possible to retrieve the IMessage from other queues using the same receiver mechanism.
acknowledgeurl | "" | dynamic | The Url address of the acknowledge notification object. Use this property to obtain the result of the target remote method. The acknowledge remote object can examine the IMethodReturnMessage for all requested values like a real client.
exceptionurl | "" | dynamic | The Url address of the exception notification object. The endpoint object receives the thrown Exception object and the body of the remoting call (the IMethodCallMessage stream).

Note that all Url properties can use a chaining format, which allows messages to be forwarded through more than one physical channel. The following example shows this feature:

exceptionurl = "msmq://.\private$\ReqChannel_Error ; 
    tcp://localhost:4444/endpoint"

where the exceptionurl property has been specified to forward an exception to the ReqChannel_Error and then through the standard tcp channel to the endpoint object.
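To make the chaining format concrete, the sketch below splits such a value into its individual hops at the ';' delimiter. It is an illustration only: the class and method names are hypothetical, and how the channel itself consumes the hops (for example, whether the first hop forwards the remainder) is not shown.

```csharp
using System;
using System.Collections.Generic;

// Hypothetical helper, not part of the MSMQ Channel assembly.
public static class ChainedUrlSketch
{
    // Split "hop1 ; hop2 ; ..." into trimmed, non-empty hops in forwarding order.
    public static List<string> SplitHops(string url)
    {
        var hops = new List<string>();
        foreach (string part in url.Split(';'))
        {
            string hop = part.Trim();    // also removes line breaks from wrapped config values
            if (hop.Length > 0) hops.Add(hop);
        }
        return hops;
    }
}
```

For the exceptionurl value above, the first hop is the msmq:// queue address and the second is the tcp:// endpoint.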

IKnowledgeBaseControl

The Knowledge Base is published through the IKnowledgeBaseControl interface contract and registered for remoting under the name of the channel. The following code snippet shows the abstract definition of this contract:

public interface IKnowledgeBaseControl
{
   void Save(string filename);                            // save it into the config file
   void Load(string filename);                            // refresh from the config file
   void Store(string strLURL, bool bOverwrite);           // store logical addresses name/value, ...
   void Store(string name, string val, bool bOverwrite);  // store single logical address
   void Update(string strLURL);                           // update logical addresses name/value, ...
   void Update(string name, string val);                  // update single logical address
   void RemoveAll();                                      // remove all logical addresses
   void Remove(string name);                              // remove specified logical name
   object GetAll();                                       // get all logical addresses
   string Get(string name);                               // get the specified logical name
   bool Exists(string name);                              // check if we have a specified logical name
   bool CanBeUpdated();                                   // check the status
   string Mapping(string name);                           // map logical name to physical;
                                                          // if it doesn't exist, just copy

}

Programmatically accessing the KB using the standard remoting design pattern:

 // channel endpoint
 string strObjectUrl = "tcp://localhost:1332/msmq";

 // interface contract
 Type objObjectType = typeof(IKnowledgeBaseControl);

 IKnowledgeBaseControl ro = (IKnowledgeBaseControl)
    Activator.GetObject(objObjectType, strObjectUrl);

 // update Knowledge Base - property 'name'
 ro.Update("name", "value");

Note: The Knowledge Base of the MSMQ Channel is stateful, so updating it in a Load Balanced Cluster has to be considered carefully. The best solution is to create a local KB controller for each node separately, driven by the usage of local resources such as MSMQ private queues.

IRemotingResponse

The IRemotingResponse is an interface contract between the notification object and the channel. Based on the channel state, the receiver can dispatch a response to the specified notification object to perform a specific task such as recovering, re-routing, updating a business state, tracing, logging, tuning the channel's KB, etc. See the RemotingObject.cs file in the Test Sample solution for its implementation. The following code snippet shows its abstract definition:

#region IRemotingResponse
[Serializable]
public class MsgSourceInfo 
{
   Acknowledgment m_Acknowledgment;
   DateTime       m_MessageSentTime;
   string         m_ResponseQueuePath;
   string         m_ReportQueuePath;

   public Acknowledgment Acknowledgment 
    { get { return m_Acknowledgment; } 
      set { m_Acknowledgment = value; } }
   public DateTime MessageSentTime 
    { get { return m_MessageSentTime; } 
      set { m_MessageSentTime = value; } }
   public string ResponseQueuePath 
    { get { return m_ResponseQueuePath; } 
      set { m_ResponseQueuePath = value; } }
   public string ReportQueuePath 
    { get { return m_ReportQueuePath; } 
      set { m_ReportQueuePath = value; } }
}
public interface IRemotingResponse
{
   void ResponseNotify(MsgSourceInfo src, object msg);   // remoting notification
   void ResponseAck(object msg);                         // remoting response notification
   void ResponseError(object msg, Exception ex);         // remoting exception notification
}
#endregion

CallContext Ticket

The CallContext Ticket object makes it possible to pass application-specific info through the remoting infrastructure. It is an optional feature of the MSMQ Channel that hides the implementation logic and simplifies its usage. Any point of the remoting hierarchy, including both endpoints in the application layer (the client and the remoting object), can modify the Ticket. The Ticket uses a String type name/value design pattern.

#region CallContext Ticket
[Serializable]
public class LogicalTicket : NameValueCollection, 
  ILogicalThreadAffinative
{
   public LogicalTicket();                        // retrieve the Ticket from the current CallContext
   public LogicalTicket(params string[] args);    // insert name/value property into this Ticket
   public LogicalTicket(IMessage imsg);           // retrieve the Ticket from the IMessage object
   protected LogicalTicket(SerializationInfo info, 
    StreamingContext context);                    // needed for remoting serialization/deserialization
   public void Set();                             // set this Ticket into the current CallContext
   public void Reset();                           // clear the Ticket in the current CallContext
}
#endregion

Usage of the Ticket is very generic, and it can be very useful, for instance, to handle a compensation issue during a distributed transaction for a disconnected queue.

Installation

The MSMQ Channel (Sender and Receiver) requires the following assemblies to be installed into the GAC:

You can install them manually or using the .msi file included with this article. Note that before using the MSMQ Channel, all MSMQ resources specified by the channel's KB must be registered and ready to send/receive messages. This task is not automated and has to be completed manually.

The MSMQ Sender/Receiver Channels can be declared globally in the machine.config file or in the host process config file. The following example shows their declaration in the channels tag:

<channels>
 <channel id="msmq_sender" type="RKiss.MSMQChannel.Sender, MSMQChannel, 
    Version=2.0.0.0, Culture=neutral, PublicKeyToken=7cfeed5d7aef9679" />
 <channel id="msmq_receiver" type="RKiss.MSMQChannel.Receiver, MSMQChannel, 
    Version=2.0.0.0, Culture=neutral, PublicKeyToken=7cfeed5d7aef9679" />
</channels>

Note: Use the .msi file to install the MSMQ Channel.

Usage

The previous chapters described the MSMQ Channel Sender and Receiver from the installation and configuration point of view. Now I will show simple and advanced usage of the MSMQ Channel in the event-driven architecture. Before the first example, here are some useful comments:

The following examples show patterns:

The host process config files are part of the deployment pattern. Each end of the connectivity is hosted by its own AppDomain (process), and the ends are connected to each other using the Sender and Receiver mechanism. A host process can also host both ends, Receiver and Sender. The following config snippet shows an example of the config file:

<configuration>
  <configSections></configSections>
  <appSettings>
   <add key="DataSource" value="connection string to your database" />
  </appSettings>
  <system.runtime.remoting>
    <application>
      <service>
        <wellknown mode="SingleCall" type="RemoteObject, 
             RemoteObject" objectUri="endpoint" />
      </service>
      <channels>
      <channel ref="msmq_sender" name="msmq" 
          receivetimeout="20"          
          sendtimeout="10"
          admin=".\private$\adminchannel" 
          lurl="EventABC=.\private$\ReqChannel/endpoint,
                RootObjectABC=FormatName:DIRECT=
                http://localhost/msmq/private$/reqchannel,
                BusinessLogicABC=.\private$\ReqChannel; 
                 tcp://localhost:4444/endpoint">
      </channel>
      <channel ref="msmq_sender" name="msmqEvent" 
          receivetimeout="20"          
          sendtimeout="10"
          admin=".\private$\adminchannel" 
          lurl="EventABC=.\private$\EventQueue/endpoint">
      </channel>
      <channel ref="msmq_receiver" name="msmqRcv" 
          listener=".\private$\ReqChannel" 
          usetimeout="false"
          retry="2" 
          retrytime="20" 
          retryfilter="Requested Service not found. 
                       No connection could be made because 
                       the target machine actively refused it."
          acknowledgeurl="tcp://localhost:4444/endpoint"
          exceptionurl="tcp://localhost:4444/endpoint">
      </channel>
      <channel ref="tcp" port="4444" />
    </channels>
  </application>
  <channelSinkProviders></channelSinkProviders>
  <channels>
    <channel id="msmq_sender" type="RKiss.MSMQChannel.Sender, 
             MSMQChannel, Version=2.0.0.0, Culture=neutral, 
                  PublicKeyToken=7cfeed5d7aef9679" />
    <channel id="msmq_receiver" 
             type="RKiss.MSMQChannel.Receiver, MSMQChannel, 
             Version=2.0.0.0, Culture=neutral, 
             PublicKeyToken=7cfeed5d7aef9679" />
  </channels>
  </system.runtime.remoting>
</configuration>

Example 1. - Generating Event

This is a simple example of how to yield a synchronous call and perform its post-processing.

The Client calls the remoting method in the fire&forget manner. This process is synchronous and transactional, with a high performance profile. The client is guaranteed that the request (inquiry) call will be accepted at any time and performed later, without worrying about the target connectivity. The request call is a stateful message and it can be post-processed within the timeout limit. Depending on the application, the remote object can also call itself (a recursive call) without worrying about a stack overflow. As I mentioned earlier, the message is a stateful object and it can be modified while it travels. Based on this state, the process can be finished and an event notification generated using the scenario described above. That's the typical behavior of the "push model".

In the case of distributed multiple events, the Client can use a new .NET 1.1 feature: SWC (Services Without Components). I like this feature, and here is its implementation from the Test Sample code (Form1.cs):

private void buttonAll_Click(object sender, System.EventArgs e)
{
   LogicalTicket ticket = null;

   try 
   {
      // CallContext

      string[] info = comboBoxTicket.Text.Split(',');
      ticket = new LogicalTicket(info);
      ticket.Set();

      // serviceconfig (.Net 1.1 feature only)

      ServiceConfig sc = new ServiceConfig();
      sc.Transaction = TransactionOption.RequiresNew;
      sc.TransactionTimeout = 20;
      sc.TrackingAppName = "MSMQChannel Test";
      sc.TransactionDescription = "buttonAll_Click"; 
      sc.TrackingEnabled = true;

      // create a transactional context

      ServiceDomain.Enter(sc);

      // all methods

      DateTime dt;
      long before = DateTime.Now.Ticks;
      string strObjectUrl = comboBoxUrl.Text;
      Type objObjectType = typeof(IPing);
      IPing ro = (IPing)Activator.GetObject(objObjectType, 
          strObjectUrl);
      ro.OneWay(string.Format("#{0}: {1}", m_counter, 
          "First message"));
      ro.Echo(string.Format("#{0}: {1}", m_counter, 
          "Second message"), out dt);
      ro.Ping(string.Format("#{0}: {1}", m_counter, 
         "Third message"));
      long after = DateTime.Now.Ticks;

      // for test purpose only

      if(comboBoxMsg.Text.IndexOf("msgbox") >= 0)
      {
         if(MessageBox.Show("Are you sure to abort it?", "Sample", 
             MessageBoxButtons.YesNo)==DialogResult.Yes)
            throw new Exception("This call has been aborted");
      }
      if(comboBoxMsg.Text.IndexOf("throw") >= 0)
         throw new Exception("This is an exception test");

      // commit transaction

      ContextUtil.SetComplete();

      // echo

      ListBoxAdd(string.Format("#{0} done, time={1}[ms]",
           m_counter++, (after-before)/10000)); 
   }
   catch(Exception ex) 
   {
      // abort transaction

      ContextUtil.SetAbort();

      Trace.WriteLine(ex.Message);
      ListBoxAddXml(ex.Message); 
   }
   finally
   {
      // clean-up 

      ServiceDomain.Leave();

      // clean-up

      if(ticket != null) 
         ticket.Reset();
   } 
}        

The ServiceConfig, ServiceDomain and ContextUtil calls in the above code are related to the SWC feature. Notice that the code snippet is part of a Windows Form that is not derived from the ServicedComponent class. The concept is very straightforward: create the transaction context using the ServiceConfig class and pass it to ServiceDomain.Enter. The rest is the same as in Enterprise Services. You can test this feature by pressing the All button in the ClientForm program - see Test.

For a large and more complex application model, the business control state is persisted in a database. The following example shows how two different resources, a database and MSMQ, can work together in a transactional manner:

Example 2. - Updating Business State

The MSMQ Channel supports transactional calls using the DTC transaction mechanism. The transactional Root Object can work with managed transactional resources such as a database and MSMQ in an atomic manner. It guarantees that either all resources are committed or all of them are rolled back. In the above example, it is not important which resource is handled first (that's the first phase of the commit), because when the root object goes out of scope all resources are either committed or, if at least one of them aborted, rolled back. After the commit we can expect a message in the MSMQ Channel. The ability to join the transactional party in the root object is a very useful feature of the MSMQ Channel.

The above solution is suitable for a corporate network. But what if the business state is located across the Internet?

Example 3. - Updating Business State over Internet

The current technology doesn't support a Distributed Transaction Coordinator (DTC) over the Internet. The Internet is based on loose connectivity and it's not a good idea to lock a resource for a long time. There is a "pseudo" solution to help with this issue. The transaction needs to be divided into a synchronous processing transaction, an asynchronous post-processing transaction and its compensator. The compensator is responsible for performing the "rollback" of the resources. As the above picture shows, there are two root objects for handling the resources located behind the firewall. Using the MSMQ Channel, the request call can be post-processed in the target root object. Basically there are two scenarios between the root objects:

Once again, the proper solution can be selected and configured in the deployment pattern using the config files.

Example 4. - "Parallel processing"

This example shows a technique for creating a "parallel business processor" using the MSMQ Channel features. Let me assume we have an application that migrates thousands of pages of thousands of records to another database schema. The task can be accomplished using the legacy synchronous design pattern, where records are processed sequentially, one by one, in the main loop. Everything is fine except that the processing time is too long. The event-driven architecture supported by the MSMQ Channel can accomplish this task faster, and the result is more manageable and traceable, with re-usable components. The solution requires dividing the process into the following parts:

When you look at the above picture again, you will see how the "push model" forwards a request message from left to right. The implementation of the above pattern is straightforward and simple, and much more testable than the first one, the legacy sequential solution.

Changing the deployment of the root objects is very simple using chained channels, as shown in the following:

before: BusinessLogicABC=.\private$\ReqChannel/endpoint

after: BusinessLogicABC=.\private$\ReqChannel ; tcp://localhost:4444/endpoint

where the root objects are afterwards hosted in a separate process, for instance a Windows Service.

Smart Deployment Tool - idea

As I mentioned earlier, the business model encapsulates services into the deployment pattern design. This step is implemented by the remoting infrastructure and its custom channels, such as the MSMQ Channel. It would be nice to have a visual tool to administer the application deployment pattern statically (creating the config files) and dynamically on the fly (see the MSMQ Channel Knowledge Base). The following picture shows this idea:

The concept of this idea is based on the remoting infrastructure. Based on the physical environment and deployment pattern, the Tool would generate the host processes, config files, etc. At runtime, the tool can subscribe to the custom sink publisher, the Remoting Probe, to obtain the model behavior for its tuning process (config files, Knowledge Base, etc.).

Implementation

Implementation of the Sender and Receiver of the MSMQ Custom Channel is straightforward using the standard remoting boilerplate. The design pattern of the remoting boilerplate is very well documented and also published in advanced books. Additionally, you can find more details in my previous article [1]. Here, I am going to describe only some interesting parts related to MSMQ.

Sender

The core of the MSMQChannel.Sender is implemented in the AsyncProcessRequest method. Based on the Knowledge Base (properties) the message call is initialized and sent to the destination queue using the specified transaction option. The following code snippet shows that:

public void AsyncProcessRequest(IClientChannelSinkStack sinkStack, 
        IMessage msgReq, ITransportHeaders headers, Stream stream) 
{
   // scope state

   string strQueuePath = null;
   string strObjectUri = null;
   Message outMsg = null;
   MessageQueueTransaction mqtx = new MessageQueueTransaction();

   try 
   {
      #region pre-processor (mapping url address)
      // split into the queuepath and endpoint

      strObjectUri = ParseLogicalUrl(m_LogicalUri, out strQueuePath);

      // update Uri property

      msgReq.Properties[MSMQChannelProperties.ObjectUri] = strObjectUri;

      // pass TransportHeaders to the receiver

      if(m_Sender.AllowHeaders == true) 
      {
         headers["__RequestUri"] = strObjectUri;
         msgReq.Properties["__RequestHeaders"] = headers;
      }
      #endregion

      #region send a remoting message 
      // set the destination queue

      m_OutQueue.Path = strQueuePath;

      // create a message

      outMsg = new Message(msgReq, new BinaryMessageFormatter()); 

      // option: timeout to pick-up a message (receive message)

      int intTimeToBeReceived = m_Sender.TimeToBeReceived;
      if(intTimeToBeReceived > 0)
         outMsg.TimeToBeReceived = 
            TimeSpan.FromSeconds(intTimeToBeReceived); 

      // option: timeout to reach destination queue (send message) 

      int intTimeToReachQueue = m_Sender.TimeToReachQueue;
      if(intTimeToReachQueue > 0)
         outMsg.TimeToReachQueue = 
            TimeSpan.FromSeconds(intTimeToReachQueue); 

      // option: notify a negative receive on the client/server side

      if(m_Sender.AdminQueuePath != MSMQChannelDefaults.EmptyStr) 
      {
         // acknowledge type (mandatory)

         outMsg.AcknowledgeType = AcknowledgeTypes.NegativeReceive | 
               AcknowledgeTypes.NotAcknowledgeReachQueue;

         // admin queue for a time-expired messages

         outMsg.AdministrationQueue = m_Sender.AdminQueue; 
      }

     // message label

     string label = string.Format("{0}/{1}, url={2}", 
        Convert.ToString(msgReq.Properties["__TypeName"]).Split(',')[0], 
        Convert.ToString(msgReq.Properties["__MethodName"]), strObjectUri);

     // Send message based on the transaction context

     if(ContextUtil.IsInTransaction == true) 
     {
        // we are already in a transaction:
        // send as an automatic (DTC) transactional message

        m_OutQueue.Send(outMsg, label, 
             MessageQueueTransactionType.Automatic); 
     }
     else
     {
        // this is a single transactional message 

        mqtx.Begin();
        m_OutQueue.Send(outMsg, label, mqtx); 
        mqtx.Commit();
     }
     #endregion
   }
   catch(Exception ex) 
   {
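      // build the error text (the literal is concatenated so that it
      // does not span a line break) and keep the original exception
      string strError = string.Format(
        "[{0}]MSMQClientTransportSink.AsyncProcessRequest " +
        "error = {1}, queuepath={2},", 
        m_Sender.ChannelName, ex.Message, strQueuePath);

      m_Sender.WriteLogMsg(strError, EventLogEntryType.Error); 
      throw new Exception(strError, ex);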
   }
   finally
   {
      #region clean-up
      if(mqtx.Status == MessageQueueTransactionStatus.Pending) 
      { 
         mqtx.Abort();
      }

      if(outMsg != null)
         outMsg.Dispose();
      #endregion
   }
}        
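The message label built in AsyncProcessRequest can be tried out in isolation. The following is a minimal sketch, assuming the "__TypeName" property holds an assembly-qualified type name; the class name LabelSketch and the sample values are hypothetical and not part of the channel source.

```csharp
using System;

static class LabelSketch
{
    // Builds a label in the same "{type}/{method}, url={uri}" layout as the Sender.
    // typeName is assumed to be assembly-qualified, e.g. "Ns.Type, Assembly, Version=...".
    public static string BuildLabel(string typeName, string methodName, string objectUri)
    {
        // keep only the part before the first comma (the full type name)
        string shortType = (typeName ?? string.Empty).Split(',')[0].Trim();
        return string.Format("{0}/{1}, url={2}", shortType, methodName, objectUri);
    }

    static void Main()
    {
        // hypothetical values used only for the demonstration
        Console.WriteLine(BuildLabel(
            "Sample.RemoteObject, Sample, Version=1.0.0.0", "Echo", "endpoint"));
    }
}
```

A label in this form makes it easy to identify the target type, method and endpoint of a queued call when browsing the queue in the MSMQ management console.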

The above method is called by ProcessMessage in the MSMQChannel Transport Sink. One detail is worth noting here. As I mentioned earlier, the Sender supports only Fire&Forget remoting calls, regardless of the kind of call. Its return message (IMethodReturnMessage) therefore always carries a null (or zero) result, as the following code snippet shows:

public void ProcessMessage(IMessage msgReq, 
   ITransportHeaders requestHeaders, Stream requestStream, 
    out ITransportHeaders responseHeaders, out Stream responseStream)
{ 
   IMessage msgRsp = null;

   try 
   {
      #region send a remoting message 
      AsyncProcessRequest(null, msgReq, requestHeaders, requestStream);
      #endregion

      #region this is a Fire&Forget call, so we have to simulate a null return message
      object retVal = null;

      // generating a null return message
      // (MethodBase is not a MethodInfo for constructor calls, so check for null;
      // Activator.CreateInstance gives the default value of any value type)

      IMethodCallMessage mcm = msgReq as IMethodCallMessage;
      MethodInfo mi = mcm.MethodBase as MethodInfo;
      if(mi != null && mi.ReturnType != typeof(void))
         retVal = mi.ReturnType.IsValueType ? 
              Activator.CreateInstance(mi.ReturnType) : null; 

      // return message

      msgRsp = (IMessage)new ReturnMessage(retVal, null, 0, null, mcm);
      #endregion
   }
   catch(Exception ex) 
   {
      msgRsp = new ReturnMessage(ex, (IMethodCallMessage)msgReq);
   }
   finally
   {
      #region serialize IMessage response to return back
      // Note that the BinaryFormatter is the mandatory formatter for MSMQChannel!

      BinaryFormatter bf = new BinaryFormatter();
      MemoryStream rspstream = new MemoryStream();
      bf.Serialize(rspstream, msgRsp);
      rspstream.Position = 0;

      // returns

      responseStream = rspstream;
      responseHeaders = requestHeaders;
      #endregion
   }
}    
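The simulated return value can be illustrated without any remoting or MSMQ infrastructure. The sketch below is one possible way to compute it, assuming the default (zero-initialized) value is acceptable for every value type; it uses Activator.CreateInstance, which also covers value types that cannot be converted from an integer, such as DateTime. The class NullReturnSketch and its sample methods are hypothetical.

```csharp
using System;
using System.Reflection;

static class NullReturnSketch
{
    // Returns what a Fire&Forget call would hand back to the caller:
    // null for void and reference types, the default value for value types.
    public static object DefaultReturnValue(MethodInfo mi)
    {
        if (mi == null || mi.ReturnType == typeof(void))
            return null;
        return mi.ReturnType.IsValueType
            ? Activator.CreateInstance(mi.ReturnType)
            : null;
    }

    // hypothetical sample methods used only for the demonstration
    public static int Count() { return 42; }
    public static string Name() { return "x"; }
    public static DateTime When() { return DateTime.Now; }
    public static void Fire() { }

    static void Main()
    {
        foreach (string name in new[] { "Count", "Name", "When", "Fire" })
        {
            MethodInfo mi = typeof(NullReturnSketch).GetMethod(name);
            object val = DefaultReturnValue(mi);
            Console.WriteLine("{0} -> {1}", name, val ?? "null");
        }
    }
}
```

The caller of a method routed through the MSMQ Channel should therefore never rely on the returned value; it only exists to satisfy the remoting proxy.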

That's all for the Sender side.

Receiver

On the other side, the Receiver is more complex because it follows the event-driven design pattern. Its core implementation is based on the following methods:

#region RetryManager
private void RetryManager(object source, 
  PeekCompletedEventArgs asyncResult)
{
   // the message is going to move using the transactional manner

   MessageQueueTransaction mqtx = null;

   // Connect to the queue.

   MessageQueue mq = (MessageQueue)source;

   try 
   { 
      // End the asynchronous peek operation.

      Message msg = mq.EndPeek(asyncResult.AsyncResult);

      // check the message lifetime

      DateTime elapsedTime = msg.ArrivedTime.AddSeconds(RetryTime);

      if(DateTime.Compare(DateTime.Now, elapsedTime) >= 0) 
      {
         // it's the time to move it

         // create destination queue

         MessageQueue InQueue = new MessageQueue(ListenerPath);
         InQueue.Formatter = new BinaryMessageFormatter(); 
         mqtx = new MessageQueueTransaction();

         // move it

         mqtx.Begin(); 
         Message msgToMove = mq.Receive(new TimeSpan(0,0,0,1));
         InQueue.Send(msgToMove, mqtx);
         mqtx.Commit();
     }
     else 
     {
        // it's the time to sleep

        TimeSpan deltaTime = elapsedTime.Subtract(DateTime.Now);

        // the sleep time

        int sleeptime = Convert.ToInt32(deltaTime.TotalMilliseconds);

        // wait for the timeout or a signal, such as a change of the
        // retrytime value or a process shutdown

        m_EventRetryMgr.WaitOne(sleeptime, true);
     } 
   }
   catch(Exception ex) 
   {
      if(mqtx != null && mqtx.Status == 
         MessageQueueTransactionStatus.Pending) 
         mqtx.Abort();

      string strErr = string.Format(
       "[{0}]MSMQChannel.Receiver:RetryManager failed, error = {1}",
       ChannelName, ex.Message);
      WriteLogMsg(strErr, EventLogEntryType.Error);
   }
   finally 
   {
      // Restart the asynchronous peek operation

      if(m_RetryListening == true)
         mq.BeginPeek(); 
      else
       Trace.WriteLine(string.Format(
         "[{0}]MSMQChannel.Receiver.RetryManager has been disconnected",
        ChannelName)); 
   }
}
#endregion
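The timing decision inside the RetryManager (move the message now, or sleep until it is due) can be separated from the queue handling. The following is a minimal sketch of that decision only, with the current time passed in as a parameter so it can be checked deterministically; the class and method names are hypothetical.

```csharp
using System;

static class RetryScheduleSketch
{
    // Returns TimeSpan.Zero when the message is due to be moved back to the
    // listener queue, otherwise how long the retry manager should wait.
    public static TimeSpan TimeUntilRetry(DateTime arrivedTime, int retryTimeSeconds, DateTime now)
    {
        DateTime dueTime = arrivedTime.AddSeconds(retryTimeSeconds);
        return now >= dueTime ? TimeSpan.Zero : dueTime - now;
    }

    static void Main()
    {
        DateTime arrived = new DateTime(2003, 6, 21, 12, 0, 0);

        // 5 seconds after arrival with a 20 second retry time: 15 seconds left
        Console.WriteLine(TimeUntilRetry(arrived, 20, arrived.AddSeconds(5)).TotalSeconds);

        // 25 seconds after arrival: the message is due
        Console.WriteLine(TimeUntilRetry(arrived, 20, arrived.AddSeconds(25)).TotalSeconds);
    }
}
```

In the channel itself the wait is done on an event handle rather than with a plain sleep, so a change of the retry time or a shutdown request can interrupt it.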
        

KnowledgeBase

The Knowledge Base in the custom channel is implemented as a remoting object with a hash table that holds all dynamic properties. The object is published under the channel name and its lifetime is initialized to infinite. The channel inherits from this object for fast access. Public access is available through the IKnowledgeBaseControl interface contract. The following code snippet shows the method that updates the Knowledge Base:

public virtual void Update(string name, string val)
{
   // validation

   if(AllowToUpdate == false)
      throw new Exception("The Knowledge Base is locked");
   if(KB.Contains(name) == false)
      throw new Exception(string.Format(
          "The logical name '{0}' doesn't exist", name));

   // fire event

   if(OnBeforeChangeKnowledgeBase(name, val) == false) 
      return;

   // update value in the KB

   KB[name] = val;

   // fire event

   OnChangedKnowledgeBase(name, val);
}                

As you can see, the implementation is simple and straightforward. The public methods are virtual, which allows them to be overridden in a derived class.
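The validate, veto, update and notify sequence of the Update method can be tried out in a small standalone program. This is a sketch under stated assumptions: the delegate types, the event names BeforeChange and Changed, the Get method and the exception types are hypothetical stand-ins, because the actual signatures of OnBeforeChangeKnowledgeBase and OnChangedKnowledgeBase are not shown in the article.

```csharp
using System;
using System.Collections;

// Hypothetical delegate types; the channel's real event signatures may differ.
delegate bool BeforeChangeHandler(string name, string val);
delegate void ChangedHandler(string name, string val);

class KnowledgeBaseSketch
{
    protected Hashtable KB = new Hashtable();
    public bool AllowToUpdate = true;
    public event BeforeChangeHandler BeforeChange;
    public event ChangedHandler Changed;

    public KnowledgeBaseSketch() { KB["RetryTime"] = "20"; }

    public string Get(string name) { return (string)KB[name]; }

    public virtual void Update(string name, string val)
    {
        // validation
        if (!AllowToUpdate)
            throw new InvalidOperationException("The Knowledge Base is locked");
        if (!KB.Contains(name))
            throw new ArgumentException(
                string.Format("The logical name '{0}' doesn't exist", name));

        // any subscriber may veto the change by returning false
        if (BeforeChange != null)
            foreach (BeforeChangeHandler h in BeforeChange.GetInvocationList())
                if (!h(name, val)) return;

        KB[name] = val;

        // notify subscribers about the new value
        if (Changed != null) Changed(name, val);
    }
}

static class Program
{
    static void Main()
    {
        KnowledgeBaseSketch kb = new KnowledgeBaseSketch();

        // reject values that are not integers
        kb.BeforeChange += delegate(string n, string v) { int tmp; return int.TryParse(v, out tmp); };
        kb.Changed += delegate(string n, string v) { Console.WriteLine("{0} changed to {1}", n, v); };

        kb.Update("RetryTime", "abc");   // vetoed, the value stays "20"
        kb.Update("RetryTime", "30");    // accepted
        Console.WriteLine(kb.Get("RetryTime"));
    }
}
```

Letting a subscriber veto a change before it is stored keeps invalid values out of the Knowledge Base while the channel is running.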

Test

I built a separate solution with the following projects to test the MSMQ Custom Channel:

Before starting the test, be sure that MSMQChannel and Sample have been installed (see the Installation process). Also, I would like to remind you that the following MSMQ private queues have to be available:

The .msi setup will create an MSMQChannel folder on your desktop. Launch the host processes, Server and ClientForm, located in the Sample subfolder:

The Server console program uses its screen to show the responses from the remoting object. The ClientForm is a little more complicated and requires the details described in the previous chapters. Basically, there are two sections:

Now, let's make the test:

  1. Press button GetAll
  2. Check the Knowledge Base in the response listBox
  3. Press button Echo
  4. Check the response on the console (see Server console picture)
  5. Select the text 'throw exception' on the Method comboBox
  6. Press button Echo
  7. After 40 seconds [2 (retry) x 20s (retrytimeout)] the ResponseError should appear in the listBox

Use the other buttons in the same way: update the Knowledge Base, change the CallContext, select different url addresses and so on.

Conclusion

Using the MSMQ Technology in the Remoting infrastructure gives your architecture model a "third dimension": the event driven pattern. In this article, I described how to incorporate and configure the MSMQ Custom Channel and how to encapsulate the business model from its deployment. Beyond that, you have a free MSMQ Custom Channel solution, including its source code and installation file. I hope you will enjoy it.

General: TnaX Giving
Alireza Soleimani
21:23 1 Aug '08  
This is useful to me, Thank you Mr.Roman Kiss
General: Response to the client
l_e_o
19:05 19 Aug '05  
How can the client get a response (out parameters, function results) after calling a server method, using the MSMQ Channel?
Thanks everyone.
General: how to debug?
afantio
23:02 25 Jul '05  
Confused how can i trace into MSMQChannel(Listener.cs,Sender.cs.... etc)
General: Re: how to debug?
Roman Kiss
12:17 28 Jul '05  
Keep the following in mind when debugging the MSMQChannel:
1. where the channel's sender is hosted
2. where the channel's receiver is hosted

The simplest case (one process) is when both parts of the channel (1 + 2) run in the same process. Otherwise (two processes), you have to open a separate debugger for each part of the channel, the sender and the receiver.



hth
Roman
General: Queue not registered in DS
bob10101
4:52 14 Jul '05  
Hi I'm new with MSMQ and I'm trying to get this application to run.
This is the exception that I'm thrown.

Remoting configuration failed with the exception System.Reflection.TargetInvocationException: Exception has been thrown by the target of an invocation. ---> System.Messaging.MessageQueueException: Queue is not registered in the DS.
at System.Messaging.MessageQueue.ResolveFormatNameFromQueuePath(String queuePath, Boolean throwException)
at System.Messaging.MessageQueue.get_FormatName()
at System.Messaging.MessageQueue.add_PeekCompleted(PeekCompletedEventHandler value)
at RKiss.MSMQChannel.Listener.Init()
at RKiss.MSMQChannel.Listener.Start()
at RKiss.MSMQChannel.Receiver.StartListening(Object data)
at RKiss.MSMQChannel.Receiver..ctor(IDictionary properties, IServerChannelSinkProvider serverSinkProvider)
--- End of inner exception stack trace ---
at System.Reflection.RuntimeConstructorInfo.InternalInvoke(BindingFlags invokeAttr, Binder binder, Object[] parameters, CultureInfo culture, Boolean isBinderDefault)
at System.Reflection.RuntimeConstructorInfo.Invoke(BindingFlags invokeAttr, Binder binder, Object[] parameters, CultureInfo culture)
at System.RuntimeType.CreateInstanceImpl(BindingFlags bindingAttr, Binder binder, Object[] args, CultureInfo culture, Object[] activationAttributes)
at System.Activator.CreateInstance(Type type, BindingFlags bindingAttr, Binder binder, Object[] args, CultureInfo culture, Object[] activationAttributes)
at System.Runtime.Remoting.RemotingConfigHandler.CreateChannelFromConfigEntry(ChannelEntry entry)
at System.Runtime.Remoting.RemotingConfigHandler.ConfigureChannels(RemotingXmlConfigFileData configData)
at System.Runtime.Remoting.RemotingConfigHandler.ConfigureRemoting(RemotingXmlConfigFileData configData).

I used the msi to install the application and found that it did install both the MSMQChannel and the ICustomChannel in the GAC. I also added :

channel id="msmq_sender" type="RKiss.MSMQChannel.Sender, MSMQChannel,
Version=2.0.0.0, Culture=neutral, PublicKeyToken=7cfeed5d7aef9679"
channel id="msmq_receiver" type="RKiss.MSMQChannel.Receiver, MSMQChannel,
Version=2.0.0.0, Culture=neutral, PublicKeyToken=7cfeed5d7aef9679"

to my machine.config and even tried adding "reqchannel" to my Private Queues manually.
I am on a domain and my Windows Components - Message Queuing - Active Directory Integration is installed.

Any suggestions?


General: Tracking SingleCall Object Creation
0siris
20:11 7 Sep '03  
Roman:

Would you know how I can track SingleCall Object Creation on the server side?

I am trying to generate statistics for my server which will keep a log of how many objects were created, etc.
General: Re: Tracking SingleCall Object Creation
Anonymous
19:16 16 Jan '05  
fsafdsafsafasf
General: Queued Component over HTTP?
Adrian Bacaianu
5:10 4 Aug '03  
Using direct access, with pathname like that:
Set obj = GetObject("queue:FormatName=DIRECT=OS:ServerName\PRIVATE$\articleqcom/new:qc.sm")
, the methods of QC component are called correct.

But what about a call using HTTP transport, a new enhancement of MSMQ 3.0 ?(windows xp and 2003)

Just changing the FormatPath with something like this :
Set obj = GetObject("queue:FormatName=DIRECT=HTTP://ServerName/msmq\PRIVATE$\articleqcom/new:QC.sm")
will give an error:
"Automation error.Invalid Syntax."

What is the problem ?

(Note, sending a simple message qeue using the HTTP is working very good)

Adrian Bacaianu
General: Re: Callbacks and Firewalls...
Roman Kiss
12:39 28 Jun '03  
Matthew,

The Remoting over MSMQ is not a synchronous process (like the tcp/http channels) where the callback and event are generated within the remoting call. The MSMQ Channel design pattern is the following:

- The MSMQ Channel splits the call into the synch process (Sender side) and async (Receiver side) post-process call. The MSMQ 3.0 has a feature to send a message over the Internet using the DIRECT format name using the http/https protocol (the destination queue can be located behind the firewall).

- The Receiver's worker thread represents the "virtual client" to handle the post-processing call such as forwarding the remoting message to the requested endpoint. The remote object can send a callback message over MSMQ channel using the same mechanism. The event such as the Acknowledge or Exception can be generated from the Receiver administratively using the notification url address, for instance: msmq://FormatName:DIRECT=http://machineABC/msmq/private$/EventQueue/endpointEvent

- The Callback/Event call from the remote object driven by MSMQ Channel is using the "push model" pattern - see my Examples.

HTH

Roman
General: Read this !!!
fkocak
1:29 23 Jun '03  
hi Roman,

In my mother language, turkish, your name means "novel". :)
Yes your articles and your knowledge exactly match your name :)
you are excellently writing articles and share them with us. You are like a "Novel", Roman :)

i can honestly say that there may be only one question that you can ask us, which is "how can i solve this .... problem ?" LOL

Keep going , please.

Doing something is better than doing nothing. So ... Move !
General: Roman Kiss RULEZZZZZZZZ Forever!!!
Kocha
21:52 22 Jun '03  
Hi, Roman!
Thanks for the personal newsletter!

Your speed of writing articles and content is amazing!!
I think you could write article about this!
It would be very usefull!



Kocha


Last Updated 21 Jun 2003 | Copyright © CodeProject, 1999-2010