Click here to Skip to main content
Click here to Skip to main content

SoapMSMQ Transport

, 14 Jul 2004
Rate this:
Please Sign up or sign in to vote.
This article describes a design, implementation and usage of the WSE Custom Transport over MSMQ.

Contents

Introduction

Web Services Enhancements for Microsoft .NET version 2.0 (WSE 2.0) includes a new feature - SOAP messaging. This feature enables implementation of the message driven (oriented) architecture based on the WS-* specification and minimizes its migration into the next connectivity model - Indigo. The WSE 2.0 introduces a lightweight messaging infrastructure allowing to send and receive the SoapEnvelope between the endpoint and its consumer using built-in transports such as TCP, HTTP and inproc. The business to business message exchanging flow is based on the context of the SOAP Headers where is defined its delivering and behavior such as security, policy, endpoints, etc.

Qualitative change of the SoapEnvelope infrastructure workflow enables to use in addition to the HTTP transport (Web Server) the transport hosted by Windows service, console, dllhost, etc. The SoapEnvelope can be sent/received by the custom transport and then dispatched to/from the WSE- Messaging infrastructure in a fully transparent manner.

The SOAP Messaging design pattern encapsulates the business layer from the connectivity model using the loosely coupled plug-in transport mechanism in the host process CONFIG file. Practically, the SoapEnvelope can be ended (or pull-up) at variable resources such as MSMQ, SQL, etc.

This article describes implementation and usage of the WSE Messaging Custom Transport over MSMQ resource for transporting the SoapEnvelope. Plug-in the soap.msmq transport into the connectivity infrastructure and your application model can handle a business workflow in a transactional, asynchronous and disconnect able manner.

Let's start it by describing the WSE 2.0 messaging boilerplate from the custom transport point of the view.

Concept and Design

The WSE 2.0 messaging infrastructure can be divided into the three levels like it is shown in the following picture:

This simple infrastructure is based on flowing the SoapEnvelope messages between the Transport and Business layer. Each level up of the infrastructure encapsulates a message processing into the generic model connectivity depending only on the Formatter and Transport components. This abstraction makes the WSE Messaging fully transparent between the RemoteEndpoint and its consumer and suitable for the Logical Model Connectivity.

Using the above connectivity model, the SoapEnvelope message can be transferred (or routed) to/from generic resources such as TCP socket, shared memory, MSMQ, SQL database, USB, Serial Port, etc..

Level 0

The Level 0 encapsulates the message workflow to/from physical transport using the following interface contracts:

  • ISoapFormatter to serialize or deserialize the SoapEnvelope object into the stream.
  • ISoapTransport to manage transferring the SoapEnvelope stream in the specific UriScheme such as TCP, HTTP, inproc and custom (e.g. MSMQ).
  • ISoapChannel to manage a base function of the channel such as Close and Capabilities.
  • ISoapInputChannel to control receiving a SoapEnvelope in the asynch and synch manner for a specific LocalEndpoint.
  • ISoapOutputChannel to control sending a SoapEnvelope in the asynch and synch manner for a specific RemoteEndpoint.

The generic message workflow in this level is implemented in the Microsoft.Web.Services2.Messaging namespace base classes using the above interface contracts. This level represents the "glue" pattern and encapsulation of the message handling between the transport and channel.

  • SoapDimeFormatter - this base class has the responsibility for serializing and deserializing of the SoapEnvelope into the Dime (binary) records. The SoapEnvelope body is located in the first dime record and the Attachment(s) occupied next record(s). Note that this formatter is a default formatter for TCP, inproc and soap.msmq transports. The formatter can be plugged-into the Transport programmatically or administratively using the host process CONFIG file.
  • SoapPlainFormatter - serializing and deserializing of the SoapEnvelope object into the XML text formatted string. This is a default formatter for HTTP transport.
  • SoapTransport - this is a major base class to dispatch a SOAP message between the specific channel and its transport in a generic manner. The outgoing message workflow is very simple and straightforward. The SoapTransport class has a public method to get the specific SoapOutputChannel registered in the collection based on the EndpointReference address. Once the sender obtains the specific channel, the message can flow through its registered transport (TCP, HTTP, etc.) to the RemoteEndpoint address. On the other hand, the incoming stream from the resource (socket, MSMQ, etc.) is deserialized by the SoapFormatter into the SoapEnvelope object and then dispatched to the specific SoapInputChannel. The dispatcher has the responsibility to enqueue the SoapEnvelope message reference into the in-memory queue located in the specific channel. In case of failure, the dispatcher will raise the DispatchFailed error for delegating a fault message to the application handler.
  • SoapInputChannel- the default purpose of this base class is to yield a message workflow between the transport and receiver using the in-memory queue (System.Collection.Queue object). The virtual Enqueue method (called by transport) is pushing the message reference into the queue and setting the synchronization object to wakeup its receiver. The message can be pulled from the queue synchronously or asynchronously using the Receive, BeginReceive and EndReceive virtual methods. Note that each channel is unique, identified by the LocalEndpoint reference property for its addressing purposes by transport and receiver.

    By default, the incoming messages are destined to the channel's private in-memory queue. To process a message workflow synchronously to the receiver handler, it is necessary to override a SoapInputChannel.Enqueue method - for instance like is done in the soap.msmq transport.

  • SoapOutputChannel- represents the base class of the sending message workflow to the registered transport based on the UriScheme and RemoteEndpoint address reference. From the client side, the workflow can be performed synchronously or asynchronously.

The following flowchart shows a dispatch algorithm of a SoapEnvelope to the proper SoapInputChannel:

The core of the DispatchMessage algorithm is to find the proper input channel based on the message addressing. It's based on the Address.To or AnyRoles value. The following flowchart shows that:

The LocalEndpoint.Matches(message) block is the final condition to pick-up the correct Input Channel. This block will return a true if the following rules are passed:

if (LocalPoint.Address.Value != WSAddressing.AnyRoles && 
    (LocalPoint.Address.Value != null
    && LocalPoint.Address.Value != message.Context.Addressing.To.Value)) 
    
    return False

if (LocalPoint.Via != null && 
          Channel.LocalPoint.Via != message.Context.Addressing.Via) 
    return False

if (Channel.Properties != null && 
     Channel.Properties != message.Properties) return False
return True

As you can see in the above DispatchMessage flowchart, the message flows through many addressing conditions. There is no detail exception info about why the Dispatch failed, so it is very important to understand a WS-Addressing spec.

Level 1

The Level 1 represents a one way communication layer to send or receive a SoapEnvelope from/to the business layer in either synch or asynch manner. This message processing is handled by SoapSender resp. SoapReceiver class is derived from the SoapPort base class. The SoapPort class is a logical bi-directional communication port to hold the pipeline of filters to manage a behavior of the message delivering based on the SOAP Headers. The pipeline can be customized by custom Output/InputFilters behind the DefaultFilters such as Security, Referral and Policy Filters.

Creating an instance of the SoapSender for the specific destination EndpointRefererence address, the sender is now ready to send a SoapEnvelope object synchronously using the Send method or asynchronously via a BeginSend/EndSend design pattern.

The following figure shows the workflow of the SoapEnvelope sent by the business layer to the RemoteEndpoint address through the soap.msmq transport using the Send method:

The message sending workflow is simple and straightforward, after validation of the SoapEnvelope.Context.Addressing and populating its destination endpoint and channel, the SoapEnvelope is sent to the pipeline for filtering the SOAP Headers (FilterMessage). The next step is just invoking the Send method on the registered output channel. The rest of the task is dependent on the transport type and its formatter. Note that the soap.msmq transport always sends the message to the RemoteEndpoint address in the Fire&Forget fashion (OneWay) without waiting for the response and return value. The sending process via all levels is very fast, because the call always ends in the local queue such as a private or outgoing queue of the MSMQ.

Level 2

The Level 2 is fully isolated from the message transport. It represents the highest level of the WSE 2.0 messaging infrastructure to enable a full duplex communication model using either the synch or asynch features. The application layer needs to create a business object derived from the SoapClient and SoapService classes to handle sending and receiving the SoapEnvelope messages respectively. Overriding the base virtual method(s), the messages can flow through the application specific handler to the destination service.

The following code snippet shows a simple example of the client business object to invoke a service method without handling the SoapEnvelope:

public class MyClient : SoapClient
{
    public MyClient(EndpointReference destination) : base(destination){}

    [SoapMethod("Bar")]
    public void Bar(string text)
    {
        base.SendOneWay("Bar", text);
    }
}

and here is the server side:

public class MyService : SoapService
{
    [SoapMethod("Bar")]
    public void Bar(string text)
    {
        // do some business

    }
}
SoapMsmqTransport

Based on the above basic description, the following picture shows a message workflow through the input soap.msmq custom transport to the destination SoapInputChannel:

The SoapEnvelope and its attachment(s) are transporting over the MSMQ using the DIME fashion. When the message is arrived into the queue, the PeekCompleted event is raised and its handler has the responsibility for all message process workflow to the LocalEndpoint handler. As the first step, the Dime message is deserialized by the registered formatter (SoapDimeFormatter class) and then dispatched to the proper channel based on the Dispatch algorithm shown in the above flowcharts. To avoid using the in-memory queue in the message workflow, the following override has to be performed during the channel registration:

public override void Enqueue(SoapEnvelope message)
{
   // get the receiver for this endpoint

   SoapReceiver receiver = 
        SoapReceivers.Receiver(_endpoint) as SoapReceiver;
   if(receiver != null) 
   {
      // process a message workflow through 

      // the pipeline to the endpoint handler

      receiver.ProcessMessage(message);
   }
   else 
      throw new Exception(string.Format("Missing a receiver " + 
                  "for endpoint={0}", _endpoint.Address.Value));
}

Note that by invoking the ProcessMessage method, the SoapEnvelope is passed into the SoapPort's pipeline to process a filtering on the requested SOAP Headers. After that, the message ends in the Receive handler (overridden by the application layer).

Once more think about the attachments through the soap.msmq transport. The size of the MSMQ message is limited to 4 MB, so the transferring of attachments over this limit can be accomplished by multiple transactional messages which will guarantee their order and delivering to the receiver endpoint. This feature looks like a dime chunking driven by application layer without consuming a large amount of memory.

Configuration

The soap.msmq transport properties are cached in the SoapMsmqTransportOptions object. Overwriting a default value for each property in the Options object can be done programmatically or by using the host process CONFIG file like is shown in the following snippets:

<webServices>
 <messaging>
  <transports>
   <add scheme="soap.msmq" 
          type="RKiss.WseTransports.SoapMsmqTransport, 
                            SoapMSMQ, Version=1.0.0.0, 
                                      Culture=neutral, 
                       PublicKeyToken=a94298b6c0d04e59">
    <transactional enabled="true" />
    <sender>
      <adminQueue path=".\private$\adminchannel" />
      <responseQueue path=".\private$\rspchannel" />
      <toBeReceived timeInSec="30" />
      <toBeReachQueue timeInSec="5" />
      <useDeadQueue enabled="true" />
    </sender>
    <receiver>
      <workerThreads number="1" />
      <errorQueue path=".\private$\reqchannel_error" />
     </receiver>
   </add>
  </transports>
 </messaging>
</webServices>

The configuration section for soap.msmq transport in the CONFIG file has been divided into common, sender and receiver sections. Note that the attribute names are case sensitive and the type of the transport has to be located on one line. The SoapMsmqTransport constructor obtains the XmlNodeList reference to this section to walk through the CONFIG section nodes and for properly updating properties in the Options object.

The other way to update the transport options is programmatically - see the following code snippet:

 SoapMsmqTransport transport = new SoapMsmqTransport();
 WebServicesConfiguration.MessagingConfiguration.AddTransport("soap.msmq", 
                                                                transport);
 transport.Options.TimeToBeReceived = 40;

The soap.msmq transport can be configured by the following properties:

Name and Default value Transport Note
<transactional enabled="true" /> sender and receiver sender and receiver will handle the message in the transactional manner.
<adminQueue path="" /> sender administration queue for timeout messages.
<responseQueue path="" /> sender response queue (not implemented).
<toBeReceived timeInSec="0" /> sender timeout limit to retrieve message by receiver.
<toBeReachQueue timeInSec="0" /> sender timeout limit to send message into the destination queue.
<useDeadQueue enabled="true" /> sender option to use a dead queue.
<workerThreads number="1" /> receiver number of worker threads to retrieve messages, max. number is 25.
<errorQueue path="" /> receiver receiver exception queue (non SoapMessage, etc.).
Mapping MSMQ FormatName to EndpointReference addrress

The soap.msmq transport URI address format must have a correct syntax to construct the MSMQ format name. The following formats can be used by soap.msmq transport:

URI address (examples) Comment
soap.msmq://./private$/MyQueue private queue on the local machine.
soap.msmq://MyServer/private$/MyQueue private queue on the machine MyServer.
soap.msmq://MyServer/MyQueue public queue on the machine MyServer.
soap.msmq://127.0.0.1/private$/MyQueue private queue on the machine defined by IP address.
soap.msmq://234.1.1.1:4455 broadcasting (multicast) message to the queues specified by this IP address and port.
soap.msmq://http://MyServer/msmq/private$/MyQueue private queue over Internet.
soap.msmq://127.0.0.1:1234/private$/MyQueue private queue on the machine specified by IP address and port.
soap.msmq://./MyQueue public queue on the local machine.

The following examples show some usage of the soap.msmq addressing:

  1. Send message to the RemoteEndpoint urn:myReceiver via a local private 'MyQueue' queue:
    EndpointReference epr = new EndpointReference(new Uri("urn:myReceiver")); 
    epr.Via = new Uri(@"soap.msmq://./private$/MyQueue");
    SoapSender Sender = new SoapSender(epr);
    Sender.Send(message);
  2. Send message to all RemoteEndpoints with urn:myReceiver via a MSMQ broadcasting address 234.1.1.1 and port 4455:
    EndpointReference epr = new EndpointReference(new Uri("urn:myReceiver"));
    epr.Via = new Uri(@"soap.msmq://234.1.1.1:4455");
    SoapSender Sender = new SoapSender(epr);
    Sender.Send(message);
  3. Send message to the RemoteEndpoint urn:myReceiver via a private 'MyQueue' queue located on MyServer over Internet:
    EndpointReference epr = new EndpointReference(new Uri("urn:myReceiver")); 
    epr.Via = new Uri(@"soap.msmq://http://MyServer/msmq/private$/MyQueue");
    SoapSender Sender = new SoapSender(epr);
    Sender.Send(message);
SoapEnvelope Context for soap.msmq transport

The soap.msmq transport has the capability to be controlled by the client SoapEnvelope context based on the contract properties. The following table shows the properties available at the sender/receiver side:

Name Type Used by Note
TransactionId string receiver MSMQ transaction ID.
IsFirstInTransaction boolean receiver true if the message is first in the single/multiple transaction.
IsLastInTransaction boolean receiver true if the message is last in the multiple transaction or in the single transaction.
Acknowledgment object receiver classification of acknowledgment that the received message represents.
MessageQueueTransaction object sender receiver Object to provide a Message Queuing internal transaction (MSMQ).
MessageLabel string sender receiver message label

Example to identify a message position in the multiple transactions:

 bool first = Convert.ToBoolean(message.Context[SoapMSMQ.IsFirstInTransaction]);

Example to setup a label of the MSMQ message (suitable for troubleshooting):

 envelope.Context.Add(SoapMSMQ.MessageLabel, "This is a test label");

Usage

Using the soap.msmq transport in the communication model enables a loosely coupled design pattern between the endpoints or peer-to-peer connectivity. In the Service Oriented Architecture (SOA), asynchronously sending messages in the fire&forget fashion is a basic pattern for the push model (forward message to the endpoint).

In the following design examples, I will show you some usage of the soap.msmq transport to encapsulate a business layer from the connectivity service.

Example 1. Yielding a business workflow

When business workflow process is driven by slow resources or third party services, the above design solution can help it. Its concept is based on splitting the business workflow into the synch and asynch sub-workflows. The synch part represents a pre-processing workflow with a fast response status. Using the soap.msmq transport in the SoapClient, the pre-processing can be done in a transactional manner using the MSDTC supported by EnterpriseServices (SWS) feature. The actual business work is processing asynchronously in the second part of the workflow (SoapService) and when it is done the notification message is sent to the notification/post-processing RemoteEndpoint address via soap.msmq in the WS-E fashion.

Example 2. Parallel business workflow processing

The business workflow concurrency means to process same common (repeatable) parts of the processing in a parallel manner. The advantage of this solution is in significantly increasing the performance of the business throughput supported by clustering environment. The core of the parallel - event driven processing model is based on the MSMQ technology. The soap.msmq transport enables to encapsulate the business layer from this asynch service.

For example, the batch request (or multiple request orders) can be processed in a parallel way instead of a serial one by one. The synchronous pre-processing workflow will generate a set of multiple transactional single requests with the business data and after they commit the call is returned back with the business status. The actual business process is done in the SoapService receivers. Each business processor updates the business state located in the Enterprise database and the last one will send a notification message for the next step - postprocessing.

Example 3. Broadcasting notification

The MSMQ 3.0 (Win2K3/XP) supports a broadcasting message (MULTICAST feature). Configuring the SoapService for multicast non-transactional queue by using the EndpointReference address, for instance: Uri("soap:msmq://.private$/notification") we can receive a loosely coupled notification (SoapEnvelope) from the publisher.

Example 4. Upload/Download file

The WSE 2 - messaging supports a DIME feature. Its programming is very straightforward at both ends. Attaching a large file(s) to the SoapEnvelope can lead to resource problems such as memory size, recycling, etc.. The soap.msmq transport offers a solution based on the MSMQ feature - sending the multiple messages in the transaction context. In this scenario, the SoapClient represents the root of the MSMQ transaction, and its application Send method will split the file into small chunks of attachments (must be a less 4MB - the MSMQ limit!) and send to SoapService after they are committed. Note that the attachments are in sequence order and they move as an ATOMIC package across the network. The MSMQ guarantees delivering this package based on the sender requirements.

The SoapService should be configured as a soap.msmq Transporter with only one worker thread to process the merging attachments into the destination file one by one. Based on the message context, the SoapService knows the position of the Attachments in the transactional package.

Examples of the transactional programming

The following code snippet shows the distributed transaction between two resources such as MSMQ (soap.msmq) and database. Using the SWC (Service Without Components feature of the .NET 1.1 on the platforms Win2K3/XPSP2) in the root object, the transactional workflow implementation is very straightforward and it enables hosting the root transaction outside of the COM+ catalog or IIS. Note that the soap.msmq transport enlists a transaction automatically:

class MyTxClient : SoapClient
{
   public MyTxClient(EndpointReference destination) : 
     base(destination) {}

   [SoapMethod("Bar")]
   public void Bar(string text)
   {
      try 
      {
         ServiceConfig sc = new ServiceConfig();
         sc.Transaction = TransactionOption.RequiresNew;
         sc.TransactionTimeout = 20;
         sc.TrackingAppName = "SoapMSMQ Test"; 
         sc.TrackingEnabled = true;

         // create a transactional context

         ServiceDomain.Enter(sc);

         // send tx message

         base.SendOneWay("Bar", text);

         // update database

         UpdateDB(text);

         // commit

         ContextUtil.SetComplete();
      }
      catch(Exception ex) 
      {
         // abort transaction

         ContextUtil.SetAbort();
         Trace.WriteLine(ex.Message);
      }
      finally
      {
         // clean-up 

         ServiceDomain.Leave();
      }
   }
}

The following example shows how two SoapEnvelopes can be sent to the Endpoints in a transactional manner as multiple messages. The soap.msmq transport supports a manual transaction (MessageQueueTransaction) created in the client's root object. The transaction context is passed by the envelope's context to the transport layer. This manual transaction is controlled by the client using the Commit and Abort methods. The messages are inserted into the destination queues after they are committed. Note that the non-delivered or non-received messages can be collected by the system transactional dead letter queue or by the application specific non-transactional admin queue.

{
   MessageQueueTransaction mqtx = null;
   try
   {
      mqtx = new MessageQueueTransaction();
      mqtx.Begin();

      // clients

      MyClient client1 = new MyClient(new Uri("urn:myReceiver"), 
                         new Uri(@"soap.msmq://./private$/reqchannel"));
      MyClient client2 = new MyClient(new Uri("urn:myReceiver"), 
                         new Uri(@"soap.msmq://./private$/reqchannel2"));

      // envelopes

      SoapEnvelope env1 = new SoapEnvelope();
      env1.SetBodyObject(string.Format("<Say>First message</Say>"));
      SoapEnvelope env2 = new SoapEnvelope();
      env2.SetBodyObject(string.Format("<Say>Second message</Say>"));

      // use the same transaction - multiple tx messages

      env1.Context.Add(SoapMSMQ.MessageQueueTransaction, mqtx);
      env2.Context.Add(SoapMSMQ.MessageQueueTransaction, mqtx);

      // action

      client1.Foo(env1);
      client2.Foo(env2);

      // commit

      if(mqtx != null && mqtx.Status == MessageQueueTransactionStatus.Pending) 
         mqtx.Commit();      
   }
   finally 
   {
                
      if(mqtx != null) 
      {
         if(mqtx.Status == MessageQueueTransactionStatus.Pending) 
         mqtx.Abort();    
         mqtx.Dispose();
      }
   }
}

Implementation

Implementation of the soap.msmq transport solution is divided into four files based on the layout of the WSE 2.0 standard transports for TCP, HTP and inproc resources. Thanks for Reflector for .NET for digging and helping me to understand more about the WSE 2.0 messaging implementation. Also my thanks go to the custom soap.smtp (Steve Maine), soap.udp ( Hervey Wilson) and soap.sql (Mike Taulty) transports for additional valuable info.

TransportOptions.cs

The purpose of this class is to cache the transport's properties. As you can see, it's a straightforward implementation of the get/set smart properties. The transport constructor initiates this class with default values on the beginning of the process.

//*****************************************************************************

//    Description.....MSMQ Transport for WSE Messaging - Transport Options

//                                

//    Author..........Roman Kiss, rkiss@pathcom.com

//    Copyright © 2004 ATZ Consulting Inc.     

//                        

//    Date Created:    06/06/04

//

//    Date        Modified By     Description

//-----------------------------------------------------------------------------

//    06/06/04        Roman Kiss     Initial Revision

//*****************************************************************************


#region References
using System;
#endregion

namespace RKiss.WseTransports
{
   /// <span class="code-SummaryComment"><SUMMARY>Summary description for TransportOptions.</SUMMARY></span>

   public class SoapMsmqTransportOptions
   {    
      #region Constants
      const int MaxNumberOfThreads = 20;
      const int MinNumberOfThreads = 1;
      #endregion

      #region Private members
      private int _TimeToBeReceived = 0;
      private int _TimeToReachQueue = 0;
      private string _AdminQueuePath = "";
      private string _ErrorQueuePath = "";
      private string _ResponseQueuePath = "";
      private bool _UseDeadLetterQueue = false;
      private bool _Transactional = true;
      private int _NumberOfThreads = MinNumberOfThreads;
      #endregion

      #region Constructors
      public SoapMsmqTransportOptions()
      {
      }
      #endregion

      #region Properties
      public bool Transactional 
      {
        get { return _Transactional; }
        set { _Transactional = value; }
      }

      public int NumberOfThreads 
      {
        get { return _NumberOfThreads; }
        set 
          { 
            _NumberOfThreads = (value > 0 && value <= MaxNumberOfThreads) 
                ? value : MinNumberOfThreads;
          }
      }

      public bool UseDeadLetterQueue 
      {
        get { return _UseDeadLetterQueue; }
        set { _UseDeadLetterQueue = value; }
      }

      public int TimeToBeReceived 
      {
        get { return _TimeToBeReceived; }
        set { _TimeToBeReceived = value; }
      }
        
      public int TimeToReachQueue 
      {
        get { return _TimeToReachQueue; }
        set { _TimeToReachQueue = value; }
      }

      public string AdminQueuePath 
      {
        get { return _AdminQueuePath; }
        set { _AdminQueuePath = value; }
      }

      public string ErrorQueuePath 
      {
        get { return _ErrorQueuePath; }
        set { _ErrorQueuePath = value; }
      }

      public string ResponseQueuePath 
      {
        get { return _ResponseQueuePath; }
        set { _ResponseQueuePath = value; }
      }
      #endregion

      #region Public Methods
      
      public SoapMsmqTransportOptions Clone()
      {
        SoapMsmqTransportOptions options = new SoapMsmqTransportOptions();
        options.ResponseQueuePath = ResponseQueuePath;
        options.ErrorQueuePath    = ErrorQueuePath;
        options.TimeToReachQueue  = TimeToReachQueue;
        options.TimeToBeReceived  = TimeToBeReceived;
        options.UseDeadLetterQueue = UseDeadLetterQueue;
        options.NumberOfThreads    = NumberOfThreads;
        options.Transactional      = Transactional;
        return options;
      }    
      #endregion
   }
}

OutputChannel.cs

This is a lightweight derived class from the SoapOutputChannel to hold the specific properties of the custom transport needed for sending the SoapEnvelope to the underlying layer - registered transport. Note that the soap.msmq transport is always a fire&forget message to the RemoteEndpoint address, so there are no implementation of the public BeginSend/EndSend pair methods.

The SoapMsmqOutputChannel is initiated by the SoapSender request to the registered transport to obtain a channel based on the UriScheme. Each sender will create an instance of the SoapMsmqOutputChannel object and properly initiate its Destination address.

//********************************************************************

//    Description.....MSMQ Transport for WSE Messaging - Output Channel

//                                

//    Author..........Roman Kiss, rkiss@pathcom.com

//    Copyright © 2004 ATZ Consulting Inc.     

//                        

//    Date Created:    06/06/04

//

//    Date        Modified By    Description

//--------------------------------------------------------------------

//    06/06/04        Roman Kiss    Initial Revision

//********************************************************************



#region References
using System;
//

using Microsoft.Web.Services2;
using Microsoft.Web.Services2.Addressing;
using Microsoft.Web.Services2.Messaging;
#endregion

namespace RKiss.WseTransports
{
    /// <span class="code-SummaryComment"><SUMMARY>Summary description for SoapMsmqOutputChannel.</span>

    /// <span class="code-SummaryComment"></SUMMARY></span>

    public sealed class SoapMsmqOutputChannel : SoapOutputChannel
    {
    #region Private Members
    SoapMsmqTransport _transport;
    #endregion

    #region Constructors
    /// <span class="code-SummaryComment"><SUMMARY>Constructor.</SUMMARY></span>

    /// <span class="code-SummaryComment"><PARAM name="endpoint">The remote endpoint for the channel.</span>
    /// <span class="code-SummaryComment"></PARAM></span>
    /// <span class="code-SummaryComment"><PARAM name="socket">The socket for the channel.</PARAM></span>
    /// <span class="code-SummaryComment"><PARAM name="transport">The transport for the channel.</span>
    /// <span class="code-SummaryComment"></PARAM></span>
    internal SoapMsmqOutputChannel(EndpointReference endpoint, 
                   SoapMsmqTransport transport) : base(endpoint)
    {
       _transport = transport;
    }
    #endregion

    #region Overrides
    /// <span class="code-SummaryComment"><SUMMARY>Returns the capabilities of the channel.</span>

    /// <span class="code-SummaryComment"></SUMMARY></span>

    public override SoapChannelCapabilities Capabilities
    {
       get {    return SoapChannelCapabilities.None; }
    }
    #endregion

    #region ISoapOutputChannel Overrides
    public override IAsyncResult BeginSend(SoapEnvelope message, 
                              AsyncCallback callback, object state)
    {
       throw new NotImplementedException();
    }
        
    public override void EndSend(IAsyncResult result)
    {
       throw new NotImplementedException();
    }

    /// <span class="code-SummaryComment"><SUMMARY>Sends the specified message </span>

    /// to the RemoteEndpoint.<span class="code-SummaryComment"></SUMMARY></span>

    /// <span class="code-SummaryComment"><PARAM name="message">The message to be sent.</PARAM></span>
    public override void Send(SoapEnvelope message)
    {
       if(message == null)
          throw new ArgumentNullException("message");

       _transport.SendTo(message, RemoteEndpoint);
    }
    #endregion
    }
}

InputChannel.cs

This file implements a SoapMsmqInputChannel class (derived from the SoapInputClass) for the soap.msmq transport. Its major purpose is to hold properties of the registered channel and activate the channel for incoming messages from the queue. The class is initiated when the SoapReceiver for the specified EndpointReference address has been added to the collection of the Receivers. Note that the SoapMsmqInputChannel has to be unique for this address. The number for worker threads is configurable.

The SoapMsmqInputChannel overrides a base Enqueue method to allow synchronous forwarding of a SoapEnvelope message to the Receiver's handler without its queuing in-memory queue.

//*****************************************************************************

//    Description.....MSMQ Transport for WSE Messaging - Input Channel

//                                

//    Author..........Roman Kiss, rkiss@pathcom.com

//    Copyright © 2004 ATZ Consulting Inc.     

//                        

//    Date Created:    06/06/04

//

//    Date        Modified By    Description

//-----------------------------------------------------------------------------

//    06/06/04        Roman Kiss    Initial Revision

//*****************************************************************************


#region References
using System;
using System.Diagnostics;
using System.Globalization;
using System.IO;
using System.Net;
using System.Net.Sockets;
using System.Runtime.InteropServices;
using System.Security.Permissions;
using System.Threading;
using System.Messaging;

//

using Microsoft.Web.Services2;
using Microsoft.Web.Services2.Addressing;
using Microsoft.Web.Services2.Messaging;
#endregion

namespace RKiss.WseTransports
{
    /// <span class="code-SummaryComment"><SUMMARY>Summary description for InputChannel.</SUMMARY></span>

    public sealed class SoapMsmqInputChannel : SoapInputChannel
    {
        #region Private members
        SoapMsmqTransport _transport = null;
        EndpointReference _endpoint = null;
        MessageQueue _InpQueue = new MessageQueue();
        #endregion
        
    #region Constructors
    /// <span class="code-SummaryComment"><SUMMARY>Constructor</SUMMARY></span>

    internal SoapMsmqInputChannel(EndpointReference endpoint, 
           SoapMsmqTransport transport, string strQueuePath) : base(endpoint)
    {
       try 
       {
          _transport = transport;
          _endpoint =  endpoint;
          _InpQueue.Path = strQueuePath;
          _InpQueue.MessageReadPropertyFilter.Label = true;
          _InpQueue.MessageReadPropertyFilter.ResponseQueue = true;
          _InpQueue.MessageReadPropertyFilter.TransactionStatusQueue = true;
          _InpQueue.MessageReadPropertyFilter.TransactionId = true;
          _InpQueue.MessageReadPropertyFilter.IsFirstInTransaction = true;
          _InpQueue.MessageReadPropertyFilter.IsLastInTransaction = true;
          _InpQueue.MessageReadPropertyFilter.UseDeadLetterQueue = true;
          _InpQueue.PeekCompleted += 
                new PeekCompletedEventHandler(_transport.PeekCompleted);
          
          // create a "pseudo" thread pool

          for(int ii = 0; ii < transport.Options.NumberOfThreads; ii++) 
          {
             _InpQueue.BeginPeek();
          }
       }
       catch(Exception ex) 
       {
          throw new Exception(string.Format("Creating InputChannel " + 
                    "for endpoint {0} failed.", endpoint.Via.Value), ex);
       }
    }
    #endregion

    #region ISoapChannel Overrides
    /// <span class="code-SummaryComment"><SUMMARY>Channel Capabilities</SUMMARY></span>

    public override SoapChannelCapabilities Capabilities
    {
       get { return SoapChannelCapabilities.ActivelyListening; }
    }
        
    /// <span class="code-SummaryComment"><SUMMARY>Close Channel</SUMMARY></span>

    public override void Close()
    {
       if(!Closed)
       {
          base.Close();
         _InpQueue.Close();
         _transport.CloseInputChannel(this);
         Trace.WriteLine(string.Format("[{0}].Close channel done. Endpoint={1}", 
                                _transport.UriScheme, _endpoint.Address.Value));
       }
    }

    /// <span class="code-SummaryComment"><SUMMARY>Force the message synchronously </span>

    /// to the receiver handler.<span class="code-SummaryComment"></SUMMARY></span>

    public override void Enqueue(SoapEnvelope message)
    {
       // get the receiver for this endpoint

       SoapReceiver receiver = 
          SoapReceivers.Receiver(_endpoint) as SoapReceiver;
       if(receiver != null) 
       {
          // synchronous process message workflow through 

          // the pipeline to the endpoint handler

          receiver.ProcessMessage(message);
       }
       else 
          throw new Exception(string.Format("Missing a " + 
                    "receiver for endpoint={0}", _endpoint.Address.Value));
    }        
    #endregion
    }
}

Transport.cs

This file represents a core of the soap.msmq transport. All the custom plumbing is dependent on this SoapMsmqTransport class. There are two constructors, the default one is used for programmatic setup and the other one for the CONFIG file. The class has implemented an ISoapTransport interface contract to obtain an Input resp. Output channel based on the requested EndpointReference address and capability.

The output channel is simple and straightforward - generic solution for any custom transport, but the other one - the input channel needs to setup the unique LocalEndpoint address for this channel. Based on this address the Dispatcher will find the proper channel to force a received SoapEnvelope. I decided to setup its Via address to null, because the MSMQ queue can be addressed by many different ways such as local, remote, direct, multicast, over HTTP(s), private, etc., so there is no unique format to identify its address described by Via value. Secondly, the SoapEnvelope.Context.Addressing.Via address is always null through the SoapDimeFormatter serializer.

The actual message workflow through its transport is processed in two methods - SendTo and PeekCompleted. Here is an interaction with a MSMQ messaging. The first method sends the SoapEnvelope to the message queue based on the transport options. The implementation is simple, in the first step, the helper method QueuePathFromUri is mapping the EndpointReference.Via address to the physical queue path based on the MSMQ specification, then the SoapEnvelope is serialized into the queue message and sent to the queue.

The incoming MSMQ messages are event driven. The I/O completion port waits for the message in the queue. When the message is arrived, the PeekCompleted handler will manage all message processing in the application receiver handler in a synch manner. The message is deserialized to the SoapEnvelope object, populates its context with additional info and then it will call the Dispatch method to deliver a SoapEnvelope to the endpoint through the WSE messaging infrastructure. Note again, that the Enquire method of the Input channel has been overridden to bypass an in-memory queuing and delivers direct message to the handler.

In this version, any exception thrown by the receiver's handler is not propagated to the worker thread. So, the transaction of the MSMQ message is always committed - the message is pulled-up from the queue.

The PeekCompleted handler skips non-SoapEnvelope messages and based on the configuration they can be collected in the error queue.

As I mentioned earlier, this class represents the transport core, so I am planning to make more test and improvement here such as additional options, DTC receiver and so on.

//***********************************************************************

//    Description.....MSMQ Transport for WSE Messaging - Transport (core)

//                                

//    Author..........Roman Kiss, rkiss@pathcom.com

//    Copyright © 2004 ATZ Consulting Inc.     

//                        

//    Date Created:    06/06/04

//

//    Date        Modified By    Description

//-----------------------------------------------------------------------

//    06/06/04        Roman Kiss    Initial Revision

//***********************************************************************


#region References
using System;
using System.Diagnostics;
using System.Collections;
using System.Collections.Specialized;
using System.Globalization;
using System.Reflection;
using System.Messaging;
using System.EnterpriseServices;
using System.Xml;
using System.IO;
using System.Threading;
//

using Microsoft.Web.Services2;
using Microsoft.Web.Services2.Diagnostics;
using Microsoft.Web.Services2.Addressing;
using Microsoft.Web.Services2.Messaging;
using Microsoft.Web.Services2.Referral;
#endregion

namespace RKiss.WseTransports
{
    /// <span class="code-SummaryComment"><SUMMARY>The Keys for context objects.</SUMMARY></span>

    public sealed class SoapMSMQ 
    {
        #region Public Constants
        public const string TransactionId = "TransactionId";
        public const string IsFirstInTransaction = 
                                     "IsFirstInTransaction";
        public const string IsLastInTransaction = 
                                     "IsLastInTransaction";
        public const string MessageQueueTransaction = 
                                   "MessageQueueTransaction";
        public const string MessageLabel = "MessageLabel";
        public const string Acknowledgment = "Acknowledgment"
        #endregion
    }

    /// <span class="code-SummaryComment"><SUMMARY>MSMQ Transport for WSE Messaging.</SUMMARY></span>

    public sealed class SoapMsmqTransport : SoapTransport, ISoapTransport
    {
        #region Private members
        readonly string _UriScheme = "soap.msmq";
        ISoapFormatter    _formatter = null;
        SoapMsmqTransportOptions _options = null;
        MessageQueue _OutQueue = new MessageQueue();
        MessageQueue _AdminQueue = new MessageQueue();
        MessageQueue _ResponseQueue = new MessageQueue();
        MessageQueue _ErrorQueue = new MessageQueue();
        #endregion

    #region Properties
    public string UriScheme 
    {
        get { return _UriScheme; }
    }

    public SoapMsmqTransportOptions Options 
    {
        get { return _options; }
        set { _options = value; }
    }

    public ISoapFormatter Formatter 
    {
        get { return _formatter; }
        set { _formatter = value; }
    }
    #endregion

    #region constructors
    /// <span class="code-SummaryComment"><SUMMARY>default</SUMMARY></span>

    public SoapMsmqTransport() : this(null)
    {
    }

    /// <span class="code-SummaryComment"><SUMMARY>Parameterized constructor</SUMMARY></span>

    public SoapMsmqTransport(XmlNodeList configData)
    {
        // defaults

        Formatter = new SoapDimeFormatter();
        Options   = new SoapMsmqTransportOptions();
        _OutQueue.Formatter = new BinaryMessageFormatter();
            
         // override options based on the config file

        if(configData != null)
        {
          string val = null;
          foreach(XmlNode node in configData)
          {
              XmlElement child = node as XmlElement;
              if(child != null) 
              {
                  switch (child.LocalName) 
                  {
                      case "sender":
                      {
                          #region sender
                         foreach(XmlNode sender in child) 
                          {
                              XmlElement elem = sender as XmlElement;
                              if(elem != null) 
                              {
                                  if(elem.LocalName == "toBeReceived") 
                                  {
                                      val = elem.GetAttribute("timeInSec");
                                    if(val != "") Options.TimeToBeReceived = 
                                                         Convert.ToInt32(val);
                                  }
                                  else if(elem.LocalName == "toBeReachQueue") 
                                  {
                                     val = elem.GetAttribute("timeInSec");
                                    if(val != "") Options.TimeToReachQueue = 
                                                         Convert.ToInt32(val);
                                  }
                                  else if(elem.LocalName == "adminQueue") 
                                  {
                                    val = elem.GetAttribute("path");
                                    if(val != "") Options.AdminQueuePath = val;
                                  }
                                  else if(elem.LocalName == "responseQueue") 
                                  {
                                    val = elem.GetAttribute("path");
                                    if(val != "") Options.ResponseQueuePath = val;
                                  }
                                  else if(elem.LocalName == "useDeadQueue") 
                                  {
                                    val = elem.GetAttribute("enabled");
                                    if(val != "") Options.UseDeadLetterQueue = 
                                                          Convert.ToBoolean(val);
                                  }
                              }
                          }
                          break;
                          #endregion
                      }
                      case "receiver":
                      {
                          #region receiver
                          foreach(XmlNode receiver in child) 
                          {
                              XmlElement elem = receiver as XmlElement;
                              if(elem != null) 
                              { 
                                  if(elem.LocalName == "retry") 
                                  { 
                                    val = elem.GetAttribute("timeInSec");
                                    if(val != "") Options.TimeToBeReceived = 
                                                           Convert.ToInt32(val); 
                                  } 
                                  else if(elem.LocalName == "errorQueue") 
                                  { 
                                    val = elem.GetAttribute("path"); 
                                    if(val != "") Options.ErrorQueuePath = val; 
                                  } 
                                  else if(elem.LocalName == "workerThreads") 
                                  { 
                                    val = elem.GetAttribute("number"); 
                                    if(val != "") Options.NumberOfThreads = 
                                                          Convert.ToInt32(val); 
                                  } 
                              } 
                          }
                          break;
                          #endregion
                      }
                      case "formatter": 
                      {
                          val = child.GetAttribute("type");
                          // tbd

                          break;
                      }
                      case "transactional": 
                      {
                          val = child.GetAttribute("enabled");
                          if(val != null) Options.Transactional = 
                                                 Convert.ToBoolean(val);
                          break;
                      }
                      default:
                      break;
                  }
              }
          }
      }
        
      // for cleanly shutdown

      AppDomain.CurrentDomain.ProcessExit += new EventHandler(OnProcessExit);
    }
    #endregion

    #region ISoapTransport
    /// <span class="code-SummaryComment"><SUMMARY>Build an ISoapOutputChannel for </span>

    /// the specified endpoint.<span class="code-SummaryComment"></SUMMARY></span>

    /// <span class="code-SummaryComment"><PARAM name="endpoint">The target endpoint</PARAM></span>
    /// <span class="code-SummaryComment"><PARAM name="capabilities">The channel capabilities</PARAM></span>
    /// <span class="code-SummaryComment"><RETURNS>ISoapOutputChannel</RETURNS></span>

    public ISoapOutputChannel GetOutputChannel(EndpointReference endpoint, 
                                      SoapChannelCapabilities capabilities)
    {
        #region validate arguments
        if(endpoint.TransportAddress.Scheme != UriScheme)
          throw new ArgumentException("The transport scheme for the " + 
                   "specified endpoint does not match this transport.", 
                                                           "endpoint");
        if(capabilities != SoapChannelCapabilities.None)
          throw new NotSupportedException("Unsupported " + 
            "SoapChannelCapabilities flags. Use SoapChannelCapabilities.None.");
        #endregion

        // create a new output channel

        return new SoapMsmqOutputChannel(endpoint, this);
    }

    /// <span class="code-SummaryComment"><SUMMARY>Build an ISoapInputChannel for </span>

    /// the specified endpoint.<span class="code-SummaryComment"></SUMMARY></span>

    public ISoapInputChannel GetInputChannel(EndpointReference endpoint, 
                                     SoapChannelCapabilities capabilities)
    {
        #region validate arguments
        if(endpoint == null)
            throw new ArgumentNullException("endpoint");
        if(capabilities == SoapChannelCapabilities.ActivelyListening && 
                          endpoint.TransportAddress.Scheme != UriScheme)
            throw new ArgumentException("Invalid Transport " + 
                                             "Scheme specified");
        if(capabilities != SoapChannelCapabilities.None && 
               capabilities != SoapChannelCapabilities.ActivelyListening)
            throw new NotSupportedException("Unsupported " + 
                                   "SoapChannelCapabilities Flags");
        #endregion

        // create queue path from the Uri endpoint

        string strQueuePath = QueuePathFromUri(endpoint);

        // lock the channel collecton

        lock(InputChannels.SyncRoot )
        {
            // check if the requested channel already exists.

            SoapMsmqInputChannel channel = 
                InputChannels[endpoint] as SoapMsmqInputChannel;

            if(channel == null) 
            {            
                // Create the channel

                channel = new SoapMsmqInputChannel(endpoint, this, 
                                                      strQueuePath);

                // set the local endpoint for dispatching a message

                channel.LocalEndpoint.Via = null;

                InputChannels.Add(channel);
            }

            return channel;
        }            
    }
    #endregion

    #region SendTo
    public void SendTo(SoapEnvelope message, EndpointReference endpoint)
    {    
        Message outMsg = new Message();
        string strQueuePath = "";
        string label = "";

        try 
        {
            #region write soap message into the queue 
            // set the destination queue

            strQueuePath = QueuePathFromUri(endpoint);
            _OutQueue.Path = strQueuePath;

            // serialize a soapmessage

            Formatter.Serialize(message, outMsg.BodyStream);
            
            // option: timeout to pick-up a message (receive message)

            if(Options.TimeToBeReceived > 0)
            outMsg.TimeToBeReceived = 
                     TimeSpan.FromSeconds(Options.TimeToBeReceived); 

            // option: timeout to reach destination queue (send message) 

            if(Options.TimeToReachQueue > 0)
                outMsg.TimeToReachQueue = 
                     TimeSpan.FromSeconds(Options.TimeToReachQueue);    

            // option: xact dead queue

            outMsg.UseDeadLetterQueue = Options.UseDeadLetterQueue;

            // option: notify a negative receive on the client/server side

            if(Options.AdminQueuePath != "") 
            {
                // queue path

                _AdminQueue.Path = Options.AdminQueuePath;

                // acknowledge type (mandatory)

                outMsg.AcknowledgeType = AcknowledgeTypes.NegativeReceive | 
                                   AcknowledgeTypes.NotAcknowledgeReachQueue;
                
                // admin queue for a time-expired messages

                outMsg.AdministrationQueue = _AdminQueue;    
            }

            // option: response queue

            if(Options.ResponseQueuePath != "") 
            {
                // queue path

                _ResponseQueue.Path = Options.ResponseQueuePath;
               outMsg.ResponseQueue = _ResponseQueue;
            }

            // message label

            if(message.Context[SoapMSMQ.MessageLabel] != null)
               label = 
                  Convert.ToString(message.Context[SoapMSMQ.MessageLabel]);
            else
               label = string.Concat(endpoint.TransportAddress.AbsoluteUri, 
                                 "/", endpoint.Address.Value.AbsolutePath);

            // Send message based on the transaction context

            if(ContextUtil.IsInTransaction == true) 
            {
                // we are already in the transaction - 

                // automatic (DTC) transactional message

                _OutQueue.Send(outMsg, label, 
                        MessageQueueTransactionType.Automatic);    
            }
            else if(strQueuePath.ToLower().StartsWith("formatname:multicast"))
            {
                // this is a bradcasting message to no-transactional queues

                _OutQueue.Send(outMsg, label);
            }
            else if(message.Context[SoapMSMQ.MessageQueueTransaction] != null) 
            {
                // this is a multiple message transactions

                _OutQueue.Send(outMsg, label, 
                        message.Context[SoapMSMQ.MessageQueueTransaction] as 
                                                     MessageQueueTransaction);
            }
            else if(Options.Transactional) 
            {
                // this is a single transactional message    

                using(MessageQueueTransaction mqtx = 
                                          new MessageQueueTransaction()) 
                {
                    mqtx.Begin();
                    _OutQueue.Send(outMsg, label, mqtx);
                    mqtx.Commit();
                }
            }
            else 
            {
                // this is a single non-transactional message    

                _OutQueue.Send(outMsg, label);
            }
            #endregion

            // echo

            Trace.WriteLine(string.Format("[{0}].SendTo done: {1}", 
                                         UriScheme, message.OuterXml));
        }
        catch(Exception ex) 
        {
            string strError = string.Format("[{0}].SendTo error = {1}, 
                                                   epr={2}, qpath={3}", 
                                                 UriScheme, ex.Message, 
                                    endpoint.Address.Value.AbsoluteUri, 
                                                         strQueuePath); 
            throw new Exception(strError);
        }
        finally
        {
          #region clean-up
//        if(mqtx.Status == MessageQueueTransactionStatus.Pending) 

//        {    

//            mqtx.Abort();

//            Trace.WriteLine(string.Format("[{0}].SendTo Aborted, 

                                   msgId = {1}", UriScheme, outMsg.Id));
//        }

          if(outMsg != null)
            outMsg.Dispose();
          #endregion
        }
    }
    #endregion

    #region PeekCompleted
    internal void PeekCompleted(object sender, 
                           PeekCompletedEventArgs asyncResult)
    {
        // method state
        bool done = false;
        Exception exception = null;
        Message message = null;
        MessageQueue mq = sender as MessageQueue;
        MessageQueueTransaction mqtx = 
                     new MessageQueueTransaction();

        try 
        {
            // Ending the asynchronous peek operation.
            mq.EndPeek(asyncResult.AsyncResult);

            // check if this queue is transactional 
            if(mq.Transactional)
               mqtx.Begin();

            // receive message from the queue
            message = mq.Receive(new TimeSpan(1000), mqtx);
        
            // deserialize message into the soapmessage
            SoapEnvelope envelope = 
                   Formatter.Deserialize(message.BodyStream);
                
            // insert a transaction info into the Context 
            // for receiver purposes
            envelope.Context.Add(SoapMSMQ.MessageLabel, 
                                                message.Label);
            envelope.Context.Add(SoapMSMQ.MessageQueueTransaction, 
                                                              mqtx);
            envelope.Context.Add(SoapMSMQ.TransactionId, 
                                            message.TransactionId);
            envelope.Context.Add(SoapMSMQ.IsFirstInTransaction, 
                                     message.IsFirstInTransaction);
            envelope.Context.Add(SoapMSMQ.IsLastInTransaction, 
                                      message.IsLastInTransaction);
            envelope.Context.Add(SoapMSMQ.Acknowledgment, 
                                           message.Acknowledgment);

            // dispatch soapmessage to the local 
            // endpoint (no exception!)
            done = DispatchMessage(envelope);

            // echo
            Trace.WriteLine(string.Format("[{0}].PeekCompleted {1}: {2}", 
                          UriScheme, done ? "Delivered" : "NonDelivered", 
                                                     envelope.OuterXml));
            
            // dispatch a fault message to the specified FaultTo address
            if(done == false && envelope.Fault == null && 
                    envelope.Context.Addressing.FaultTo != null &&
                    ((envelope.Context.Addressing.RelatesTo == null ) || 
                    (envelope.Context.Addressing.RelatesTo.RelationshipType != 
                                       WSAddressing.AttributeValues.Reply )))
            {
             // deliver fault message 
             AddressingFault faultAddr = 
              new AddressingFault(AddressingFault.DestinationUnreachableMessage, 
                                    AddressingFault.DestinationUnreachableCode);
             SoapEnvelope faultMessage = GetFaultMessage(envelope, faultAddr);
             ISoapOutputChannel outChannel = 
               SoapTransport.StaticGetOutputChannel(
                                   envelope.Context.Addressing.FaultTo);
             outChannel.Send(faultMessage);        
         
             Trace.WriteLine(string.Format("[{0}].PeekCompleted - " + 
                                    "Dispatched fault messages: {1}", 
                                  UriScheme, faultMessage.OuterXml));
            }    
        }
        catch(MessageQueueException ex) 
        {            
            if(ex.MessageQueueErrorCode == MessageQueueErrorCode.IOTimeout) 
            {
                // skip this error, it can be happen for multiple receivers
                Trace.WriteLine(string.Format("[{0}].PeekCompleted - " + 
                    "IOTimeout info (no message in queue)", UriScheme));
            }
            else 
            {
                exception = ex;
                throw;
            }
        }
        catch(Exception ex) 
        {    
            exception = ex;

            #region option: send queued message (any) into the error queue
            if(Options.ErrorQueuePath != "" && message != null) 
            {
                try 
                {
                    string label = string.Concat(message.Label, ", 
                                                  Error=", ex.Message);
                    _ErrorQueue.Path = Options.ErrorQueuePath;
                    _ErrorQueue.Send(message, label, mqtx);
                }
                catch(Exception ex2) 
                {
                  string strErrMsg = 
                      string.Format("[{0}]PeekCompleted Failure" + 
                            " at ErrorQueue.Send: {1}", UriScheme, 
                                                  ex2.ToString());
                  Trace.WriteLine(strErrMsg);
                  Microsoft.Web.Services2.Diagnostics.EventLog.WriteError(
                                                                  strErrMsg);
                }
            }
            #endregion
        }
        finally 
        {
            // close a pending transaction (there is no abort!)
            if(mqtx.Status == MessageQueueTransactionStatus.Pending)
                mqtx.Commit();

            // exception handling
            if(exception != null) 
            {
                // publish error
                string strErrMsg = 
                    string.Format("[{0}]PeekCompleted Failure: {1}", 
                                   UriScheme, exception.ToString());
                Trace.WriteLine(strErrMsg);
                Microsoft.Web.Services2.Diagnostics.EventLog.WriteError(
                                                               strErrMsg);
            }

            // clean-up
            if(message != null)
                message.Dispose();

            // peek a next message
            mq.BeginPeek();            
        }
    }
    #endregion

    #region QueuePathFromUri
    internal string QueuePathFromUri(EndpointReference epr)
    {    
        string qpath = null;
        if(epr.TransportAddress.Authority == ".") 
        {
           // local queue (the most frequently path)
           qpath = string.Concat(".", 
              epr.TransportAddress.AbsolutePath.Replace("/", "\\"));
        }
        else 
        {
           string[] ipform = epr.TransportAddress.Authority.Split('.');
           if(ipform.Length == 4) 
           { 
             if(ipform[0].CompareTo("223") > 0 && 
                                  ipform[0].CompareTo("240") < 0) 
             {
                 // multicasting (no transactional!)
                 qpath = string.Concat("FormatName:MULTICAST=", 
                                      epr.TransportAddress.Authority);
             }
             else if(epr.TransportAddress.AbsolutePath.StartsWith("/private$/"))
             {
                 // private queue addressed by direct 
                 // format using the ip address
                 qpath = string.Concat("FormatName:DIRECT=tcp:", 
                           epr.TransportAddress.Authority, 
                         epr.TransportAddress.AbsolutePath.Replace("/", "\\"));
             }
             else 
             {
                 // public queue
                 qpath = string.Concat(epr.TransportAddress.Authority, 
                     epr.TransportAddress.AbsolutePath.Replace("/", "\\"));
             }
            }
            else 
            {
                if(epr.TransportAddress.Authority.StartsWith("http"))
                {
                    // over Internet
                    qpath = string.Concat("FormatName:DIRECT=", 
                           epr.TransportAddress.Authority, ":", 
                            epr.TransportAddress.AbsolutePath);

                }
                else if(epr.TransportAddress.AbsolutePath.StartsWith("/private$/"))
                {
                    // private queue addressed by direct format 
                    // using the ip address
                    qpath = string.Concat("FormatName:DIRECT=OS:", 
                             epr.TransportAddress.Authority, 
                             epr.TransportAddress.AbsolutePath.Replace("/", "\\"));
                }
                else 
                {
                    // public queue
                    qpath = string.Concat(epr.TransportAddress.Authority, 
                            epr.TransportAddress.AbsolutePath.Replace("/", "\\"));
                }
            }
        }
        return qpath;
    }
    #endregion
    
    #region Onxxxx
    void OnProcessExit(object sender, EventArgs args)
    {
        // close all resources
        Trace.WriteLine(string.Format("[{0}].OnProcessExit", UriScheme));

        //Close all resources
        _OutQueue.Close();
        _AdminQueue.Close();
        _ResponseQueue.Close();
        _ErrorQueue.Close();

        // close all msmq input channels
        lock(InputChannels.SyncRoot) 
        {
            HybridDictionary hd = InputChannels.SyncRoot as HybridDictionary;
            if(hd != null) 
            {
                  //walk through all registered input channels    
                foreach(IList item in hd.Values) 
                {
                    foreach(SoapInputChannel ch in item) 
                    {
                        if(ch is SoapMsmqInputChannel)
                            ch.Close();                        
                    }
                }
            }
        }
    }
    #endregion

    #region CloseInputChannel
    internal void CloseInputChannel(SoapMsmqInputChannel channel)
    {
        lock(InputChannels.SyncRoot)
        {
            if(InputChannels.Contains(channel))
                InputChannels.Remove(channel);
        }
    }
    #endregion
    }
}

Test

The soap.msmq transport is ready to use after its assembly has been inserted into the GAC. For test purposes, I have included a simple client and server to evaluate the transport for different addresses and its behavior. There are many combinations to perform a specific test. The test requires creating the following queues in prior: ReqChannel (Tx), ReqChannel2 (Tx), AdminChannel (nonTx).

Run the server and then the client (or opposite) console programs to see action on the server console. Then modify the client resp. server and run again.

Appendix A - SendRequestResponse

This feature has been added in version 1.1. The soap.msmq transport is mostly used for a fire & forget asynchronous communication model. Each request represents a OneWay communication (no ReplyTo endpoint) between the ends (client and service). The WSE2 infrastructure has built-in an implementation of the Request/Response workflow based on the one way communication and message correlation between the request/response messages. This feature can be useful for certain applications.

The following code snippet shows a usage of the soap.msmq transport for downloading a file from the service.

The client side:

class MyClient : SoapClient
{ 
   [SoapMethod("GetFile")]
   public void GetFile(string name)
   {
      // request

      EndpointReference eprReplay = new EndpointReference(new Uri("urn:myClient"));
      eprReplay.Via = new Uri(@"soap.msmq://./private$/reqchannel_ack");

      SoapEnvelope request = new SoapEnvelope();
      request.SetBodyObject(name);
      request.Context.Addressing.ReplyTo = new ReplyTo(eprReplay);

      // action (wait for the response from the service)

      SoapEnvelope response = base.SendRequestResponse("GetFile", request);

      // response

      // todo:

   }
}

The service side:

class MyService : SoapService
{ 
   [SoapMethod("GetFile")]
   public SoapEnvelope GetFile(SoapEnvelope request)
   {
      // request

      // todo:

      
      // response

      SoapEnvelope response = new SoapEnvelope();

      // attachment

      Attachment item = new Attachment("TestImage", "image/gif", @"server.exe");
      response.Context.Attachments.Add(item);

      return response; 
   }
}

What's the behind the SendRequestResponse function?

The SendRequestResponse function will create an asynch client object - SoapClientAynchResult to start processing the request and response in the Begin/End pattern fashion in the output and input channels. The following code snippets show their implementation in the SoapMsmqOutputChannel:

public override IAsyncResult BeginSend(SoapEnvelope message, 
   AsyncCallback callback, object state)
{
   SendDelegate delegator = new SendDelegate(this._transport.SendTo);
   return delegator.BeginInvoke(message, RemoteEndpoint, callback, state);
}

public override void EndSend(IAsyncResult result)
{
   SoapClient client = result.AsyncState as SoapClient;
   System.Runtime.Remoting.Messaging.AsyncResult aResult = 
         (System.Runtime.Remoting.Messaging.AsyncResult)result; 
   SendDelegate delegator = (SendDelegate)aResult.AsyncDelegate;
   string cid = delegator.EndInvoke(result);
}

and the SoapMsmqInputChannel:

public override IAsyncResult BeginReceive(AsyncCallback callback, 
                                                    object state)
{
   if(_capabilities == SoapChannelCapabilities.None)
   { 
      TimeSpan timeout = (state != null && state is SoapClient) ? 
         TimeSpan.FromMilliseconds((state as SoapClient).Timeout) :
         TimeSpan.FromSeconds(_transport.Options.TimeToBeReplied);

      ReceiveByIdDelegate delegator = 
             new ReceiveByIdDelegate(this._transport.ReceiveById);
      string cid = 
            _endpoint.ReferenceProperties.RelatesTo.Value.AbsolutePath + 
                                                             "\\1234567";
      return delegator.BeginInvoke(_InpQueue, cid, timeout, 
                                                      callback, state); 
   }

   return null;
}

public override SoapEnvelope EndReceive(IAsyncResult result)
{
   System.Runtime.Remoting.Messaging.AsyncResult aResult = 
      (System.Runtime.Remoting.Messaging.AsyncResult)result; 
   ReceiveByIdDelegate delegator = 
                 (ReceiveByIdDelegate)aResult.AsyncDelegate;
   SoapEnvelope envelope = delegator.EndInvoke(result);

   return envelope;
}

As you can see, the above BeginReceive method delegates a task to the _transport.ReceiveById method to retrieve a specific message from the queue. The message query is based on the message correlation ID and it has to be setup in the _transport.SendTo method:

outMsg.CorrelationId = 
   message.Context.Addressing.RelatesTo.Value.AbsolutePath + "\\1234567";

To retrieve a message from the queue is a straightforward process supported by the messaging method - ReceiveByCorrelationId. The following code snippet shows this process:

public SoapEnvelope ReceiveById(MessageQueue mq, string cid, TimeSpan timeout) 
{
   MessageQueueTransaction mqtx = new MessageQueueTransaction();
   SoapEnvelope envelope = null;
   Message message = null;

   try 
   {
      mq.MessageReadPropertyFilter.CorrelationId = true;

      // check if this queue is transactional 

      if(mq.Transactional)
         mqtx.Begin();

      // get the message

      message = mq.ReceiveByCorrelationId(cid, timeout, mqtx);

      // deserialize message into the soapmessage

      envelope = Formatter.Deserialize(message.BodyStream);

      // insert an additional info into the context

      envelope.Context.Add(SoapMSMQ.MessageLabel, message.Label);
      envelope.Context.Add(SoapMSMQ.Acknowledgment, message.Acknowledgment);
   }
   catch(Exception ex) 
   {
      Trace.WriteLine(string.Format("[{0}].ReceiveById failed: {1}", 
         UriScheme, ex.Message));
   }
   finally 
   {
      // close a pending transaction (there is no abort!)

      if(mqtx.Status == MessageQueueTransactionStatus.Pending)
         mqtx.Commit();

      // clean-up

      if(message != null)
         message.Dispose();
   }

   return envelope;
}

Note that the SendRequestResponse feature in the soap.msmq transport is controlled by the WSE2 infrastructure, so the following issues should be taken care of in its usage:

  • The ReplyTo transport has to be the same as the client's destination transport scheme.
  • The service method should use the signature with the SoapEnvelope object in the case of void return value, otherwise the service will handle the request as a OneWay attributed method.
  • The SoapClient.Timeout value should be the same as the transport.Options.TimeToBeReplied value. The WSE2 infrastructure breaks the state in the OnSendComplete handler (passing a value null to the state of the OnReceiveComplete handler).

Also, I made in this version a workaround in the _transport.PeekCompleted function to allow dispatching messages without blocking the input channels in the DisptachMessage method.

Conclusion

In this article I have described a WSE Messaging Custom Transport over MSMQ. Using this UriScheme in your connectivity model enables a new dimension of communication features such as loosely coupled disconnect able services, clients, etc. I hope you will enjoy it.

History

  • 06/06/04
    • Version 1.0.0.0 - Initial revision.
  • 07/07/04
    • Version 1.1.0.0 - New feature - SendRequestResponse support.

License

This article, along with any associated source code and files, is licensed under The Code Project Open License (CPOL)

Share

About the Author

Roman Kiss
Software Developer (Senior)
United States United States
No Biography provided

Comments and Discussions

 
QuestionApplying this to EMS [modified] Pinmembernciti31-Oct-07 12:30 
AnswerRe: Applying this to EMS Pinmembernciti1-Nov-07 7:55 
QuestionWhy not use msmq over the internet to win 2003 server? Pinmembermoonminus28-Nov-06 14:07 
GeneralRegistering custom transport on WSE3 Pinmemberfgava14-Aug-06 9:06 
GeneralRe: Registering custom transport on WSE3 PinmemberRoman Kiss14-Aug-06 14:46 
AnswerRe: Registering custom transport on WSE3 Pinmemberfgava15-Aug-06 6:42 
QuestionQueue not registered in DS. PinmemberRamon Smits6-Mar-06 1:47 
AnswerRe: Queue not registered in DS. PinmemberRoman Kiss6-Mar-06 8:12 
GeneralRe: Queue not registered in DS. PinmemberRamon Smits6-Mar-06 8:45 
GeneralRe: Queue not registered in DS. PinmemberRoman Kiss6-Mar-06 10:06 
GeneralRe: Queue not registered in DS. PinmemberRamon Smits6-Mar-06 10:52 
GeneralRe: Queue not registered in DS. PinmemberRoman Kiss6-Mar-06 11:05 
GeneralRe: Queue not registered in DS. PinmemberRamon Smits6-Mar-06 11:22 
GeneralRe: Queue not registered in DS. PinmemberRamon Smits7-Mar-06 6:42 
GeneralRe: Queue not registered in DS. PinmemberRamon Smits7-Mar-06 6:37 
Question.Net 2.0 with WSE 3? Pinmemberaawaijane7-Feb-06 10:27 
AnswerRe: .Net 2.0 with WSE 3? PinmemberRoman Kiss7-Feb-06 11:21 
Generalinternet PinmemberAdrian Bacaianu1-Jan-06 5:55 
GeneralMissing receiver Pinmemberyc4king19-Sep-05 11:24 
GeneralRe: Missing receiver Pinmemberyc4king19-Sep-05 12:26 
GeneralRe: Missing receiver PinsussRoman Kiss, MVP19-Sep-05 12:40 
GeneralRe: Missing receiver Pinmemberyc4king19-Sep-05 12:49 
GeneralRe: Missing receiver PinsussRoman Kiss, MVP19-Sep-05 13:05 
GeneralRe: Missing receiver Pinmemberyc4king19-Sep-05 19:12 
GeneralMSMQ over HTTP PinmemberDaniel Danilin23-Aug-05 3:35 

General General    News News    Suggestion Suggestion    Question Question    Bug Bug    Answer Answer    Joke Joke    Rant Rant    Admin Admin   

Use Ctrl+Left/Right to switch messages, Ctrl+Up/Down to switch threads, Ctrl+Shift+Left/Right to switch pages.

| Advertise | Privacy | Mobile
Web04 | 2.8.140827.1 | Last Updated 14 Jul 2004
Article Copyright 2004 by Roman Kiss
Everything else Copyright © CodeProject, 1999-2014
Terms of Service
Layout: fixed | fluid