Web Services Enhancements for Microsoft .NET version 2.0 (WSE 2.0) includes a new feature: SOAP messaging. This feature enables a message-oriented architecture based on the WS-* specifications and eases migration to the next connectivity model, Indigo. WSE 2.0 introduces a lightweight messaging infrastructure for sending and receiving a SoapEnvelope between an endpoint and its consumer using built-in transports such as TCP, HTTP and inproc. The business-to-business message exchange is driven by the SOAP headers, which define delivery and behavior such as security, policy, endpoints, etc.
A qualitative change in the SoapEnvelope infrastructure workflow is that, in addition to the HTTP transport (Web Server), a transport can be hosted by a Windows service, console application, dllhost, etc. The SoapEnvelope can be sent or received by a custom transport and then dispatched to or from the WSE Messaging infrastructure in a fully transparent manner.
The SOAP Messaging design pattern isolates the business layer from the connectivity model using a loosely coupled plug-in transport mechanism configured in the host process CONFIG file. In practice, the SoapEnvelope can be pushed to (or pulled from) various resources such as MSMQ, SQL, etc.
This article describes the implementation and usage of a WSE Messaging custom transport over MSMQ for transporting the SoapEnvelope. Plug the soap.msmq transport into the connectivity infrastructure and your application model can handle a business workflow in a transactional, asynchronous and disconnectable manner.
Let's start by describing the WSE 2.0 messaging boilerplate from the custom transport point of view.
The WSE 2.0 messaging infrastructure can be divided into three levels, as shown in the following picture:

This simple infrastructure is based on SoapEnvelope messages flowing between the Transport and Business layers. Each higher level encapsulates the message processing in a generic connectivity model that depends only on the Formatter and Transport components. This abstraction makes WSE Messaging fully transparent between the RemoteEndpoint and its consumer, and suitable for a Logical Connectivity Model.
Using the above connectivity model, the SoapEnvelope message can be transferred (or routed) to or from generic resources such as a TCP socket, shared memory, MSMQ, a SQL database, USB, a serial port, etc.
Level 0
Level 0 encapsulates the message workflow to and from the physical transport using the following interface contracts:
- ISoapFormatter to serialize or deserialize the SoapEnvelope object into the stream.
- ISoapTransport to manage transferring the SoapEnvelope stream in a specific UriScheme such as TCP, HTTP, inproc and custom (e.g. MSMQ).
- ISoapChannel to manage the base functions of the channel such as Close and Capabilities.
- ISoapInputChannel to control receiving a SoapEnvelope in an asynch or synch manner for a specific LocalEndpoint.
- ISoapOutputChannel to control sending a SoapEnvelope in an asynch or synch manner for a specific RemoteEndpoint.
The generic message workflow in this level is implemented in the Microsoft.Web.Services2.Messaging namespace base classes using the above interface contracts. This level represents the "glue" pattern and encapsulation of the message handling between the transport and channel.
SoapDimeFormatter - this base class is responsible for serializing and deserializing the SoapEnvelope into DIME (binary) records. The SoapEnvelope body is located in the first DIME record and the attachment(s) occupy the following record(s). Note that this is the default formatter for the TCP, inproc and soap.msmq transports. The formatter can be plugged into the transport programmatically or administratively using the host process CONFIG file.
SoapPlainFormatter - serializes and deserializes the SoapEnvelope object into an XML text formatted string. This is the default formatter for the HTTP transport.
SoapTransport - this is the major base class for dispatching a SOAP message between a specific channel and its transport in a generic manner. The outgoing message workflow is simple and straightforward. The SoapTransport class has a public method to get the specific SoapOutputChannel registered in the collection based on the EndpointReference address. Once the sender obtains the specific channel, the message can flow through its registered transport (TCP, HTTP, etc.) to the RemoteEndpoint address. In the other direction, the incoming stream from the resource (socket, MSMQ, etc.) is deserialized by the SoapFormatter into a SoapEnvelope object and then dispatched to the specific SoapInputChannel. The dispatcher is responsible for enqueuing the SoapEnvelope message reference into the in-memory queue located in the specific channel. In case of failure, the dispatcher raises the DispatchFailed error to delegate a fault message to the application handler.
SoapInputChannel - the default purpose of this base class is to pass the message workflow between the transport and the receiver using an in-memory queue (a System.Collections.Queue object). The virtual Enqueue method (called by the transport) pushes the message reference into the queue and sets the synchronization object to wake up its receiver. The message can be pulled from the queue synchronously or asynchronously using the Receive, BeginReceive and EndReceive virtual methods. Note that each channel is unique, identified by its LocalEndpoint reference property, which the transport and receiver use for addressing.
By default, incoming messages are delivered to the channel's private in-memory queue. To pass a message synchronously to the receiver handler, it is necessary to override the SoapInputChannel.Enqueue method, as is done in the soap.msmq transport.
SoapOutputChannel - represents the base class for sending a message to the registered transport based on the UriScheme and the RemoteEndpoint address reference. From the client side, the workflow can be performed synchronously or asynchronously.
The following flowchart shows the dispatch algorithm of a SoapEnvelope to the proper SoapInputChannel:

The core of the DispatchMessage algorithm is to find the proper input channel based on the message addressing, that is, on the Address.To or AnyRoles value. The following flowchart shows this:

The LocalEndpoint.Matches(message) block is the final condition for picking the correct input channel. It returns true if all of the following rules pass:
if (LocalEndpoint.Address.Value != WSAddressing.AnyRoles &&
    (LocalEndpoint.Address.Value != null &&
     LocalEndpoint.Address.Value != message.Context.Addressing.To.Value))
    return False
if (LocalEndpoint.Via != null &&
    LocalEndpoint.Via != message.Context.Addressing.Via)
    return False
if (Channel.Properties != null &&
    Channel.Properties != message.Properties)
    return False
return True
As you can see in the above DispatchMessage flowchart, the message passes through many addressing conditions. No detailed exception information is provided about why a dispatch failed, so it is very important to understand the WS-Addressing specification.
Level 1
Level 1 represents a one-way communication layer for sending or receiving a SoapEnvelope to or from the business layer in either a synch or asynch manner. This message processing is handled by the SoapSender and SoapReceiver classes, respectively, both derived from the SoapPort base class. The SoapPort class is a logical bi-directional communication port that holds the pipeline of filters managing the message delivery behavior based on the SOAP headers. The pipeline can be extended with custom output/input filters behind the default filters such as the Security, Referral and Policy filters.
Once an instance of the SoapSender has been created for a specific destination EndpointReference address, the sender is ready to send a SoapEnvelope object synchronously using the Send method or asynchronously via the BeginSend/EndSend design pattern.
The following figure shows the workflow of a SoapEnvelope sent by the business layer to the RemoteEndpoint address through the soap.msmq transport using the Send method:

The message sending workflow is simple and straightforward: after validating the SoapEnvelope.Context.Addressing and populating its destination endpoint and channel, the SoapEnvelope is sent to the pipeline for filtering the SOAP headers (FilterMessage). The next step is just invoking the Send method on the registered output channel. The rest of the task depends on the transport type and its formatter. Note that the soap.msmq transport always sends the message to the RemoteEndpoint address in a fire&forget fashion (OneWay), without waiting for a response or return value. The sending process through all levels is very fast, because the call always ends in a local queue such as a private or outgoing MSMQ queue.
Level 2
Level 2 is fully isolated from the message transport. It represents the highest level of the WSE 2.0 messaging infrastructure and enables a full duplex communication model using either the synch or asynch features. The application layer needs to create business objects derived from the SoapClient and SoapService classes to handle sending and receiving the SoapEnvelope messages, respectively. By overriding the base virtual method(s), the messages can flow through the application specific handler to the destination service.
The following code snippet shows a simple example of a client business object that invokes a service method without handling the SoapEnvelope directly:
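// Client proxy: hides the SOAP plumbing behind a typed method.
public class MyClient : SoapClient
{
    public MyClient(EndpointReference destination) : base(destination) {}

    [SoapMethod("Bar")]
    public void Bar(string text)
    {
        // One-way call: no response is expected.
        base.SendOneWay("Bar", text);
    }
}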
and here is the server side:
public class MyService : SoapService
{
    [SoapMethod("Bar")]
    public void Bar(string text)
    {
    }
}
SoapMsmqTransport
Based on the above basic description, the following picture shows the message workflow through the input soap.msmq custom transport to the destination SoapInputChannel:

The SoapEnvelope and its attachment(s) are transported over MSMQ in DIME format. When a message arrives in the queue, the PeekCompleted event is raised and its handler is responsible for the whole message processing workflow up to the LocalEndpoint handler. As the first step, the DIME message is deserialized by the registered formatter (the SoapDimeFormatter class) and then dispatched to the proper channel based on the dispatch algorithm shown in the above flowcharts. To avoid using the in-memory queue in the message workflow, the following override has to be in place when the channel is registered:
public override void Enqueue(SoapEnvelope message)
{
    // Bypass the in-memory queue: hand the message straight to the receiver.
    SoapReceiver receiver =
        SoapReceivers.Receiver(_endpoint) as SoapReceiver;
    if(receiver != null)
    {
        receiver.ProcessMessage(message);
    }
    else
        throw new Exception(string.Format("Missing a receiver " +
            "for endpoint={0}", _endpoint.Address.Value));
}
Note that by invoking the ProcessMessage method, the SoapEnvelope is passed into the SoapPort's pipeline, which filters the requested SOAP headers. After that, the message ends in the Receive handler (overridden by the application layer).
One more thought about sending attachments through the soap.msmq transport. The size of an MSMQ message is limited to 4 MB, so attachments over this limit can be transferred as multiple transactional messages, which guarantees their order and delivery to the receiver endpoint. This works like DIME chunking driven by the application layer, without consuming a large amount of memory.
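As a rough illustration of the application-driven chunking mentioned above, the sketch below splits a stream into byte arrays of a bounded size, so that each piece can travel as the attachment of one message. The class name AttachmentChunkerSketch and the 3 MB default are assumptions made for this example (the default simply leaves headroom under the 4 MB limit for the envelope and DIME headers); they are not part of the soap.msmq transport.

```csharp
using System;
using System.Collections.Generic;
using System.IO;

// Hypothetical helper, not part of the soap.msmq transport.
public static class AttachmentChunkerSketch
{
    // Assumed default: stays below the 4 MB MSMQ message limit.
    public const int DefaultChunkSize = 3 * 1024 * 1024;

    // Lazily yields consecutive chunks of at most chunkSize bytes.
    // Only one buffer plus the current chunk are held in memory at a time.
    public static IEnumerable<byte[]> Split(Stream source, int chunkSize)
    {
        if (source == null) throw new ArgumentNullException("source");
        if (chunkSize <= 0) throw new ArgumentOutOfRangeException("chunkSize");

        byte[] buffer = new byte[chunkSize];
        while (true)
        {
            // Stream.Read may return fewer bytes than requested,
            // so keep reading until the buffer is full or the stream ends.
            int filled = 0;
            while (filled < chunkSize)
            {
                int read = source.Read(buffer, filled, chunkSize - filled);
                if (read == 0) break;
                filled += read;
            }
            if (filled == 0) yield break;

            byte[] chunk = new byte[filled];
            Buffer.BlockCopy(buffer, 0, chunk, 0, filled);
            yield return chunk;

            // A short chunk means the end of the stream was reached.
            if (filled < chunkSize) yield break;
        }
    }
}
```

A sender would enumerate Split(...) inside one MSMQ transaction and send one SoapEnvelope per chunk; because the method is an iterator, its argument checks run when enumeration starts, not when Split is called.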
The soap.msmq transport properties are cached in the SoapMsmqTransportOptions object. The default value of each property in the Options object can be overwritten programmatically or by using the host process CONFIG file, as shown in the following snippet:
<webServices>
  <messaging>
    <transports>
      <add scheme="soap.msmq"
           type="RKiss.WseTransports.SoapMsmqTransport, SoapMSMQ, Version=1.0.0.0, Culture=neutral, PublicKeyToken=a94298b6c0d04e59">
        <transactional enabled="true" />
        <sender>
          <adminQueue path=".\private$\adminchannel" />
          <responseQueue path=".\private$\rspchannel" />
          <toBeReceived timeInSec="30" />
          <toBeReachQueue timeInSec="5" />
          <useDeadQueue enabled="true" />
        </sender>
        <receiver>
          <workerThreads number="1" />
          <errorQueue path=".\private$\reqchannel_error" />
        </receiver>
      </add>
    </transports>
  </messaging>
</webServices>
The configuration section for the soap.msmq transport in the CONFIG file is divided into common, sender and receiver sections. Note that the attribute names are case sensitive and the type of the transport has to be located on one line. The SoapMsmqTransport constructor obtains the XmlNodeList reference to this section, walks through the CONFIG section nodes and updates the corresponding properties in the Options object.
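A minimal sketch of such a walk over the configuration nodes might look like the following. It is an illustration only, not the transport's actual constructor code: the MsmqOptionsSketch class is hypothetical, it covers just a few of the settings, and it reuses the element and attribute names from the CONFIG snippet.

```csharp
using System;
using System.Globalization;
using System.Xml;

// Hypothetical options holder covering a subset of the CONFIG settings.
public class MsmqOptionsSketch
{
    public bool Transactional = true;
    public int TimeToBeReceived = 0;
    public int TimeToReachQueue = 0;
    public int NumberOfThreads = 1;
    public string AdminQueuePath = "";
    public string ErrorQueuePath = "";

    // Builds the options from the child nodes of the <add> element.
    public static MsmqOptionsSketch FromNodes(XmlNodeList nodes)
    {
        if (nodes == null) throw new ArgumentNullException("nodes");
        MsmqOptionsSketch options = new MsmqOptionsSketch();
        Apply(nodes, options);
        return options;
    }

    private static void Apply(XmlNodeList nodes, MsmqOptionsSketch options)
    {
        foreach (XmlNode node in nodes)
        {
            XmlElement element = node as XmlElement;
            if (element == null) continue;   // skip comments and whitespace

            // Element names are matched case sensitively, as in the article.
            switch (element.LocalName)
            {
                case "sender":
                case "receiver":
                    Apply(element.ChildNodes, options);   // descend into the section
                    break;
                case "transactional":
                    options.Transactional = bool.Parse(element.GetAttribute("enabled"));
                    break;
                case "toBeReceived":
                    options.TimeToBeReceived = ParseInt(element, "timeInSec");
                    break;
                case "toBeReachQueue":
                    options.TimeToReachQueue = ParseInt(element, "timeInSec");
                    break;
                case "workerThreads":
                    options.NumberOfThreads = ParseInt(element, "number");
                    break;
                case "adminQueue":
                    options.AdminQueuePath = element.GetAttribute("path");
                    break;
                case "errorQueue":
                    options.ErrorQueuePath = element.GetAttribute("path");
                    break;
                // Unknown elements are ignored in this sketch.
            }
        }
    }

    private static int ParseInt(XmlElement element, string attribute)
    {
        // A missing attribute yields "" and makes int.Parse throw FormatException.
        return int.Parse(element.GetAttribute(attribute), CultureInfo.InvariantCulture);
    }
}
```

Settings that do not appear in the section keep their defaults, which matches the idea that the CONFIG file only overwrites selected values.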
The other way to update the transport options is programmatically - see the following code snippet:
SoapMsmqTransport transport = new SoapMsmqTransport();
WebServicesConfiguration.MessagingConfiguration.AddTransport("soap.msmq", transport);
transport.Options.TimeToBeReceived = 40;
The soap.msmq transport can be configured by the following properties:
Name and Default value | Transport | Note |
<transactional enabled="true" /> | sender and receiver | sender and receiver will handle the message in the transactional manner. |
<adminQueue path="" /> | sender | administration queue for timeout messages. |
<responseQueue path="" /> | sender | response queue (not implemented). |
<toBeReceived timeInSec="0" /> | sender | timeout limit to retrieve message by receiver. |
<toBeReachQueue timeInSec="0" /> | sender | timeout limit to send message into the destination queue. |
<useDeadQueue enabled="true" /> | sender | option to use a dead queue. |
<workerThreads number="1" /> | receiver | number of worker threads to retrieve messages, max. number is 25. |
<errorQueue path="" /> | receiver | receiver exception queue (non SoapMessage, etc.). |
Mapping MSMQ FormatName to EndpointReference address
The soap.msmq transport URI address must have the correct syntax for constructing the MSMQ format name. The following formats can be used by the soap.msmq transport:
URI address (examples) | Comment |
soap.msmq://./private$/MyQueue | private queue on the local machine. |
soap.msmq://MyServer/private$/MyQueue | private queue on the machine MyServer. |
soap.msmq://MyServer/MyQueue | public queue on the machine MyServer. |
soap.msmq://127.0.0.1/private$/MyQueue | private queue on the machine defined by IP address. |
soap.msmq://234.1.1.1:4455 | broadcasting (multicast) message to the queues specified by this IP address and port. |
soap.msmq://http://MyServer/msmq/private$/MyQueue | private queue over Internet. |
soap.msmq://127.0.0.1:1234/private$/MyQueue | private queue on the machine specified by IP address and port. |
soap.msmq://./MyQueue | public queue on the local machine. |
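To make the mapping in the table more concrete, here is one possible way to turn such addresses into MSMQ format names (DIRECT=OS, DIRECT=TCP, DIRECT=http and MULTICAST). This is a sketch under stated assumptions and not the transport's own QueuePathFromUri helper: the class SoapMsmqAddressSketch is hypothetical, the address is parsed as a plain string, and the "IP address and port with a queue path" row is deliberately rejected because its format name is not covered here.

```csharp
using System;
using System.Net;

// Hypothetical mapper from soap.msmq addresses to MSMQ format names.
public static class SoapMsmqAddressSketch
{
    private const string Prefix = "soap.msmq://";

    public static string ToFormatName(string uri)
    {
        if (uri == null) throw new ArgumentNullException("uri");
        if (!uri.StartsWith(Prefix, StringComparison.OrdinalIgnoreCase))
            throw new ArgumentException("Expected a soap.msmq:// address.", "uri");

        string rest = uri.Substring(Prefix.Length);

        // Queue reached over the Internet: the remainder is already an HTTP(S) URL.
        if (rest.StartsWith("http://", StringComparison.OrdinalIgnoreCase) ||
            rest.StartsWith("https://", StringComparison.OrdinalIgnoreCase))
            return "FormatName:DIRECT=" + rest;

        int slash = rest.IndexOf('/');
        if (slash < 0)
        {
            // No queue path: treat "address:port" as a multicast address.
            if (rest.IndexOf(':') < 0)
                throw new ArgumentException("Missing queue path.", "uri");
            return "FormatName:MULTICAST=" + rest;
        }

        string host = rest.Substring(0, slash);
        string queue = rest.Substring(slash + 1).Replace('/', '\\');
        if (host.Length == 0 || queue.Length == 0)
            throw new ArgumentException("Missing host or queue name.", "uri");
        if (host.IndexOf(':') >= 0)
            throw new NotSupportedException("host:port with a queue path is not covered by this sketch.");

        // IP addresses use the TCP protocol, machine names (and ".") use OS.
        IPAddress ignored;
        string protocol = IPAddress.TryParse(host, out ignored) ? "TCP" : "OS";
        return "FormatName:DIRECT=" + protocol + ":" + host + "\\" + queue;
    }
}
```

For example, soap.msmq://MyServer/private$/MyQueue maps to FormatName:DIRECT=OS:MyServer\private$\MyQueue in this sketch; whether the real helper produces direct format names or plain queue paths for local queues is not shown in this part of the article.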
The following examples show some usage of the soap.msmq addressing:
- Send a message to the RemoteEndpoint urn:myReceiver via a local private 'MyQueue' queue:
EndpointReference epr = new EndpointReference(new Uri("urn:myReceiver"));
epr.Via = new Uri(@"soap.msmq://./private$/MyQueue");
SoapSender Sender = new SoapSender(epr);
Sender.Send(message);
- Send a message to all RemoteEndpoints with urn:myReceiver via the MSMQ broadcasting (multicast) address 234.1.1.1 and port 4455:
EndpointReference epr = new EndpointReference(new Uri("urn:myReceiver"));
epr.Via = new Uri(@"soap.msmq://234.1.1.1:4455");
SoapSender Sender = new SoapSender(epr);
Sender.Send(message);
- Send a message to the RemoteEndpoint urn:myReceiver via a private 'MyQueue' queue located on MyServer over the Internet:
EndpointReference epr = new EndpointReference(new Uri("urn:myReceiver"));
epr.Via = new Uri(@"soap.msmq://http://MyServer/msmq/private$/MyQueue");
SoapSender Sender = new SoapSender(epr);
Sender.Send(message);
SoapEnvelope Context for soap.msmq transport
The soap.msmq transport can be controlled through the client SoapEnvelope context based on the contract properties. The following table shows the properties available at the sender/receiver side:
Name | Type | Used by | Note |
TransactionId | string | receiver | MSMQ transaction ID. |
IsFirstInTransaction | boolean | receiver | true if the message is first in the single/multiple transaction. |
IsLastInTransaction | boolean | receiver | true if the message is last in the multiple transaction or in the single transaction. |
Acknowledgment | object | receiver | classification of acknowledgment that the received message represents. |
MessageQueueTransaction | object | sender receiver | Object to provide a Message Queuing internal transaction (MSMQ). |
MessageLabel | string | sender receiver | message label |
Example of identifying a message position in a multiple-message transaction:
bool first = Convert.ToBoolean(message.Context[SoapMSMQ.IsFirstInTransaction]);
Example of setting the label of the MSMQ message (useful for troubleshooting):
envelope.Context.Add(SoapMSMQ.MessageLabel, "This is a test label");
Using the soap.msmq transport in the communication model enables a loosely coupled design pattern between endpoints, or peer-to-peer connectivity. In a Service Oriented Architecture (SOA), asynchronously sending messages in a fire&forget fashion is a basic pattern for the push model (forwarding a message to the endpoint).
In the following design examples, I will show some uses of the soap.msmq transport to isolate the business layer from the connectivity service.
Example 1. Yielding a business workflow

When a business workflow is driven by slow resources or third party services, the above design can help. Its concept is based on splitting the business workflow into synch and asynch sub-workflows. The synch part represents a pre-processing workflow with a fast response status. Using the soap.msmq transport in the SoapClient, the pre-processing can be done in a transactional manner using MSDTC, supported by the EnterpriseServices SWC (Services Without Components) feature. The actual business work is processed asynchronously in the second part of the workflow (SoapService), and when it is done a notification message is sent to the notification/post-processing RemoteEndpoint address via soap.msmq in the WS-E fashion.
Example 2. Parallel business workflow processing
Business workflow concurrency means processing the common (repeatable) parts of the workflow in parallel. The advantage of this solution is a significant increase in business throughput, supported by a clustering environment. The core of the parallel, event driven processing model is based on MSMQ technology. The soap.msmq transport isolates the business layer from this asynch service.
For example, a batch request (or multiple request orders) can be processed in parallel instead of serially, one by one. The synchronous pre-processing workflow generates a set of transactional single requests with the business data, and after they are committed the call returns with the business status. The actual business process is done in the SoapService receivers. Each business processor updates the business state located in the enterprise database, and the last one sends a notification message for the next step, post-processing.

Example 3. Broadcasting notification
MSMQ 3.0 (Win2K3/XP) supports message broadcasting (the MULTICAST feature). By configuring the SoapService for a multicast non-transactional queue using the EndpointReference address, for instance Uri("soap.msmq://./private$/notification"), we can receive a loosely coupled notification (SoapEnvelope) from the publisher.

Example 4. Upload/Download file
WSE 2.0 messaging supports the DIME feature. Its programming is very straightforward at both ends. However, attaching large file(s) to the SoapEnvelope can lead to resource problems such as memory size, recycling, etc. The soap.msmq transport offers a solution based on an MSMQ feature: sending multiple messages in one transaction context. In this scenario, the SoapClient represents the root of the MSMQ transaction, and its application Send method splits the file into small chunks of attachments (each must be less than 4 MB, the MSMQ limit!) that are delivered to the SoapService after the transaction is committed. Note that the attachments are kept in sequence and move as an ATOMIC package across the network. MSMQ guarantees the delivery of this package based on the sender requirements.
The SoapService should be configured with the soap.msmq transport using only one worker thread, so that the attachments are merged into the destination file one by one. Based on the message context, the SoapService knows the position of each attachment in the transactional package.
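On the receiving side, the merge step could be sketched as below. The AttachmentMergerSketch class is hypothetical; it assumes a single worker thread delivers the chunks in order and that the caller has already read the first/last flags (for example the IsFirstInTransaction and IsLastInTransaction context values) and extracted the attachment bytes.

```csharp
using System;
using System.IO;

// Hypothetical receiver-side helper; assumes chunks arrive in order
// on a single thread, as with one soap.msmq worker thread.
public sealed class AttachmentMergerSketch
{
    private readonly string _path;
    private Stream _output;

    public AttachmentMergerSketch(string destinationPath)
    {
        if (destinationPath == null) throw new ArgumentNullException("destinationPath");
        _path = destinationPath;
    }

    // Appends one chunk to the destination file.
    // Returns true when the last chunk was written and the file is complete.
    public bool Append(byte[] chunk, bool isFirst, bool isLast)
    {
        if (chunk == null) throw new ArgumentNullException("chunk");

        if (isFirst)
        {
            // Start a new file; drop any half-written previous package.
            if (_output != null) _output.Dispose();
            _output = new FileStream(_path, FileMode.Create, FileAccess.Write);
        }
        if (_output == null)
            throw new InvalidOperationException("Received a chunk before the first one.");

        _output.Write(chunk, 0, chunk.Length);

        if (isLast)
        {
            _output.Dispose();
            _output = null;
            return true;
        }
        return false;
    }
}
```

A package that consists of a single message has both flags set, so the file is created, written and closed in one call.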

Examples of the transactional programming
The following code snippet shows a distributed transaction between two resources: MSMQ (soap.msmq) and a database. Using SWC (the Services Without Components feature of .NET 1.1 on the Win2K3/XP SP2 platforms) in the root object, the transactional workflow implementation is very straightforward, and it enables hosting the root transaction outside of the COM+ catalog or IIS. Note that the soap.msmq transport enlists in the transaction automatically:
class MyTxClient : SoapClient
{
    public MyTxClient(EndpointReference destination) :
        base(destination) {}

    [SoapMethod("Bar")]
    public void Bar(string text)
    {
        try
        {
            ServiceConfig sc = new ServiceConfig();
            sc.Transaction = TransactionOption.RequiresNew;
            sc.TransactionTimeout = 20;
            sc.TrackingAppName = "SoapMSMQ Test";
            sc.TrackingEnabled = true;
            ServiceDomain.Enter(sc);
            base.SendOneWay("Bar", text);
            UpdateDB(text);
            ContextUtil.SetComplete();
        }
        catch(Exception ex)
        {
            ContextUtil.SetAbort();
            Trace.WriteLine(ex.Message);
        }
        finally
        {
            ServiceDomain.Leave();
        }
    }
}
The following example shows how two SoapEnvelopes can be sent to the endpoints in a transactional manner as multiple messages. The soap.msmq transport supports a manual transaction (MessageQueueTransaction) created in the client's root object. The transaction context is passed to the transport layer through the envelope's context. This manual transaction is controlled by the client using the Commit and Abort methods. The messages are inserted into the destination queues after the transaction is committed. Note that non-delivered or non-received messages can be collected by the system transactional dead letter queue or by the application specific non-transactional admin queue.
{
    MessageQueueTransaction mqtx = null;
    try
    {
        mqtx = new MessageQueueTransaction();
        mqtx.Begin();
        MyClient client1 = new MyClient(new Uri("urn:myReceiver"),
            new Uri(@"soap.msmq://./private$/reqchannel"));
        MyClient client2 = new MyClient(new Uri("urn:myReceiver"),
            new Uri(@"soap.msmq://./private$/reqchannel2"));
        SoapEnvelope env1 = new SoapEnvelope();
        env1.SetBodyObject("<Say>First message</Say>");
        SoapEnvelope env2 = new SoapEnvelope();
        env2.SetBodyObject("<Say>Second message</Say>");
        // Both envelopes share the same manual MSMQ transaction.
        env1.Context.Add(SoapMSMQ.MessageQueueTransaction, mqtx);
        env2.Context.Add(SoapMSMQ.MessageQueueTransaction, mqtx);
        client1.Foo(env1);
        client2.Foo(env2);
        if(mqtx != null && mqtx.Status == MessageQueueTransactionStatus.Pending)
            mqtx.Commit();
    }
    finally
    {
        if(mqtx != null)
        {
            // Still pending here means something failed before Commit.
            if(mqtx.Status == MessageQueueTransactionStatus.Pending)
                mqtx.Abort();
            mqtx.Dispose();
        }
    }
}
The implementation of the soap.msmq transport is divided into four files based on the layout of the WSE 2.0 standard transports for TCP, HTTP and inproc resources. Thanks to Reflector for .NET for helping me dig into and understand more about the WSE 2.0 messaging implementation. My thanks also go to the authors of the custom soap.smtp (Steve Maine), soap.udp (Hervey Wilson) and soap.sql (Mike Taulty) transports for additional valuable info.
TransportOptions.cs
The purpose of this class is to cache the transport's properties. As you can see, it's a straightforward implementation of get/set properties. The transport constructor initializes this class with default values at the beginning of the process.
#region References
using System;
#endregion
namespace RKiss.WseTransports
{
public class SoapMsmqTransportOptions
{
#region Constants
const int MaxNumberOfThreads = 20;
const int MinNumberOfThreads = 1;
#endregion
#region Private members
private int _TimeToBeReceived = 0;
private int _TimeToReachQueue = 0;
private string _AdminQueuePath = "";
private string _ErrorQueuePath = "";
private string _ResponseQueuePath = "";
private bool _UseDeadLetterQueue = false;
private bool _Transactional = true;
private int _NumberOfThreads = MinNumberOfThreads;
#endregion
#region Constructors
public SoapMsmqTransportOptions()
{
}
#endregion
#region Properties
public bool Transactional
{
get { return _Transactional; }
set { _Transactional = value; }
}
public int NumberOfThreads
{
get { return _NumberOfThreads; }
set
{
_NumberOfThreads = (value > 0 && value <= MaxNumberOfThreads)
? value : MinNumberOfThreads;
}
}
public bool UseDeadLetterQueue
{
get { return _UseDeadLetterQueue; }
set { _UseDeadLetterQueue = value; }
}
public int TimeToBeReceived
{
get { return _TimeToBeReceived; }
set { _TimeToBeReceived = value; }
}
public int TimeToReachQueue
{
get { return _TimeToReachQueue; }
set { _TimeToReachQueue = value; }
}
public string AdminQueuePath
{
get { return _AdminQueuePath; }
set { _AdminQueuePath = value; }
}
public string ErrorQueuePath
{
get { return _ErrorQueuePath; }
set { _ErrorQueuePath = value; }
}
public string ResponseQueuePath
{
get { return _ResponseQueuePath; }
set { _ResponseQueuePath = value; }
}
#endregion
#region Public Methods
public SoapMsmqTransportOptions Clone()
{
SoapMsmqTransportOptions options = new SoapMsmqTransportOptions();
options.AdminQueuePath = AdminQueuePath;
options.ResponseQueuePath = ResponseQueuePath;
options.ErrorQueuePath = ErrorQueuePath;
options.TimeToReachQueue = TimeToReachQueue;
options.TimeToBeReceived = TimeToBeReceived;
options.UseDeadLetterQueue = UseDeadLetterQueue;
options.NumberOfThreads = NumberOfThreads;
options.Transactional = Transactional;
return options;
}
#endregion
}
}
OutputChannel.cs
This is a lightweight class derived from the SoapOutputChannel that holds the specific properties of the custom transport needed for sending the SoapEnvelope to the underlying layer, the registered transport. Note that the soap.msmq transport always sends a fire&forget message to the RemoteEndpoint address, so there is no implementation of the public BeginSend/EndSend method pair.
The SoapMsmqOutputChannel is created when the SoapSender asks the registered transport for a channel based on the UriScheme. Each sender creates an instance of the SoapMsmqOutputChannel object and initializes its destination address.
#region References
using System;
using Microsoft.Web.Services2;
using Microsoft.Web.Services2.Addressing;
using Microsoft.Web.Services2.Messaging;
#endregion
namespace RKiss.WseTransports
{
public sealed class SoapMsmqOutputChannel : SoapOutputChannel
{
#region Private Members
SoapMsmqTransport _transport;
#endregion
#region Constructors
internal SoapMsmqOutputChannel(EndpointReference endpoint,
SoapMsmqTransport transport) : base(endpoint)
{
_transport = transport;
}
#endregion
#region Overrides
public override SoapChannelCapabilities Capabilities
{
get { return SoapChannelCapabilities.None; }
}
#endregion
#region ISoapOutputChannel Overrides
public override IAsyncResult BeginSend(SoapEnvelope message,
AsyncCallback callback, object state)
{
throw new NotImplementedException();
}
public override void EndSend(IAsyncResult result)
{
throw new NotImplementedException();
}
public override void Send(SoapEnvelope message)
{
if(message == null)
throw new ArgumentNullException("message");
_transport.SendTo(message, RemoteEndpoint);
}
#endregion
}
}
InputChannel.cs
This file implements the SoapMsmqInputChannel class (derived from SoapInputChannel) for the soap.msmq transport. Its major purpose is to hold the properties of the registered channel and to activate the channel for incoming messages from the queue. The class is created when the SoapReceiver for the specified EndpointReference address is added to the collection of receivers. Note that the SoapMsmqInputChannel has to be unique for this address. The number of worker threads is configurable.
The SoapMsmqInputChannel overrides the base Enqueue method to allow synchronous forwarding of a SoapEnvelope message to the receiver's handler without queuing it in the in-memory queue.
#region References
using System;
using System.Diagnostics;
using System.Globalization;
using System.IO;
using System.Net;
using System.Net.Sockets;
using System.Runtime.InteropServices;
using System.Security.Permissions;
using System.Threading;
using System.Messaging;
using Microsoft.Web.Services2;
using Microsoft.Web.Services2.Addressing;
using Microsoft.Web.Services2.Messaging;
#endregion
namespace RKiss.WseTransports
{
public sealed class SoapMsmqInputChannel : SoapInputChannel
{
#region Private members
SoapMsmqTransport _transport = null;
EndpointReference _endpoint = null;
MessageQueue _InpQueue = new MessageQueue();
#endregion
#region Constructors
internal SoapMsmqInputChannel(EndpointReference endpoint,
SoapMsmqTransport transport, string strQueuePath) : base(endpoint)
{
try
{
_transport = transport;
_endpoint = endpoint;
_InpQueue.Path = strQueuePath;
_InpQueue.MessageReadPropertyFilter.Label = true;
_InpQueue.MessageReadPropertyFilter.ResponseQueue = true;
_InpQueue.MessageReadPropertyFilter.TransactionStatusQueue = true;
_InpQueue.MessageReadPropertyFilter.TransactionId = true;
_InpQueue.MessageReadPropertyFilter.IsFirstInTransaction = true;
_InpQueue.MessageReadPropertyFilter.IsLastInTransaction = true;
_InpQueue.MessageReadPropertyFilter.UseDeadLetterQueue = true;
_InpQueue.PeekCompleted +=
new PeekCompletedEventHandler(_transport.PeekCompleted);
for(int ii = 0; ii < transport.Options.NumberOfThreads; ii++)
{
_InpQueue.BeginPeek();
}
}
catch(Exception ex)
{
throw new Exception(string.Format("Creating InputChannel " +
"for endpoint {0} failed.", endpoint.Via.Value), ex);
}
}
#endregion
#region ISoapChannel Overrides
public override SoapChannelCapabilities Capabilities
{
get { return SoapChannelCapabilities.ActivelyListening; }
}
public override void Close()
{
if(!Closed)
{
base.Close();
_InpQueue.Close();
_transport.CloseInputChannel(this);
Trace.WriteLine(string.Format("[{0}].Close channel done. Endpoint={1}",
_transport.UriScheme, _endpoint.Address.Value));
}
}
public override void Enqueue(SoapEnvelope message)
{
SoapReceiver receiver =
SoapReceivers.Receiver(_endpoint) as SoapReceiver;
if(receiver != null)
{
receiver.ProcessMessage(message);
}
else
throw new Exception(string.Format("Missing a " +
"receiver for endpoint={0}", _endpoint.Address.Value));
}
#endregion
}
}
Transport.cs
This file represents the core of the soap.msmq transport. All the custom plumbing depends on this SoapMsmqTransport class. There are two constructors: the default one is used for programmatic setup and the other one for the CONFIG file. The class implements the ISoapTransport interface contract to obtain an input or output channel based on the requested EndpointReference address and capability.
The output channel is simple and straightforward, a generic solution for any custom transport, but the input channel needs to set up a unique LocalEndpoint address. Based on this address, the dispatcher finds the proper channel to which a received SoapEnvelope is forwarded. I decided to set its Via address to null, because an MSMQ queue can be addressed in many different ways (local, remote, direct, multicast, over HTTP(S), private, etc.), so there is no unique format to identify its address by the Via value. Secondly, the SoapEnvelope.Context.Addressing.Via address is always null after passing through the SoapDimeFormatter serializer.
The actual message workflow through the transport is handled by two methods, SendTo and PeekCompleted; this is where the interaction with MSMQ takes place. The first method sends the SoapEnvelope to the message queue based on the transport options. The implementation is simple: first, the helper method QueuePathFromUri maps the EndpointReference.Via address to the physical queue path according to the MSMQ specification; then the SoapEnvelope is serialized into the queue message and sent to the queue.
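The address mapping can be sketched in isolation as a pure function. This is a simplified illustration, not the transport's QueuePathFromUri: the helper name ToQueuePath is hypothetical, it takes the authority and path as plain strings instead of an EndpointReference, it leaves out the HTTP case, and it compares the first octet numerically (an assumption about the intended multicast range 224 to 239).

```csharp
using System;

public static class QueuePathSketch
{
    // Maps the authority and absolute path of a soap.msmq address
    // to an MSMQ path or format name (hypothetical helper).
    public static string ToQueuePath(string authority, string absolutePath)
    {
        string queue = absolutePath.Replace("/", "\\");
        if (authority == ".")
            return "." + queue;                          // local queue

        bool isPrivate = absolutePath.StartsWith("/private$/", StringComparison.Ordinal);
        string[] parts = authority.Split('.');
        int firstOctet = 0;
        bool looksLikeIp = parts.Length == 4 && int.TryParse(parts[0], out firstOctet);

        if (looksLikeIp && firstOctet >= 224 && firstOctet <= 239)
            return "FormatName:MULTICAST=" + authority;  // multicast group (not transactional)
        if (isPrivate)                                   // private queue, direct format name
            return "FormatName:DIRECT=" + (looksLikeIp ? "TCP:" : "OS:") + authority + queue;
        return authority + queue;                        // public queue
    }

    public static void Main()
    {
        Console.WriteLine(ToQueuePath(".", "/private$/reqchannel"));
        Console.WriteLine(ToQueuePath("192.168.0.10", "/private$/reqchannel"));
        Console.WriteLine(ToQueuePath("myserver", "/private$/reqchannel"));
        Console.WriteLine(ToQueuePath("234.1.1.1:8001", "/reqchannel"));
    }
}
```

The numeric comparison is a deliberate choice: a string comparison against "223" and "240" would also accept a two-digit first octet such as "23".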
The incoming MSMQ messages are event driven. An I/O completion port waits for a message in the queue. When a message arrives, the PeekCompleted handler performs all message processing, including the application receiver's handler, synchronously. The message is deserialized into a SoapEnvelope object, its context is populated with additional information, and then the DispatchMessage method is called to deliver the SoapEnvelope to the endpoint through the WSE messaging infrastructure. Note again that the Enqueue method of the input channel has been overridden to bypass the in-memory queuing and deliver the message directly to the handler.
In this version, an exception thrown by the receiver's handler is not propagated to the worker thread. As a result, the MSMQ message transaction is always committed and the message is removed from the queue.
The PeekCompleted handler skips non-SoapEnvelope messages; depending on the configuration, they can be collected in the error queue.
As I mentioned earlier, this class represents the transport core, so I am planning more tests and improvements here, such as additional options, a DTC receiver and so on.
#region References
using System;
using System.Diagnostics;
using System.Collections;
using System.Collections.Specialized;
using System.Globalization;
using System.Reflection;
using System.Messaging;
using System.EnterpriseServices;
using System.Xml;
using System.IO;
using System.Threading;
using Microsoft.Web.Services2;
using Microsoft.Web.Services2.Diagnostics;
using Microsoft.Web.Services2.Addressing;
using Microsoft.Web.Services2.Messaging;
using Microsoft.Web.Services2.Referral;
#endregion
namespace RKiss.WseTransports
{
public sealed class SoapMSMQ
{
#region Public Constants
public const string TransactionId = "TransactionId";
public const string IsFirstInTransaction =
"IsFirstInTransaction";
public const string IsLastInTransaction =
"IsLastInTransaction";
public const string MessageQueueTransaction =
"MessageQueueTransaction";
public const string MessageLabel = "MessageLabel";
public const string Acknowledgment = "Acknowledgment";
#endregion
}
public sealed class SoapMsmqTransport : SoapTransport, ISoapTransport
{
#region Private members
readonly string _UriScheme = "soap.msmq";
ISoapFormatter _formatter = null;
SoapMsmqTransportOptions _options = null;
MessageQueue _OutQueue = new MessageQueue();
MessageQueue _AdminQueue = new MessageQueue();
MessageQueue _ResponseQueue = new MessageQueue();
MessageQueue _ErrorQueue = new MessageQueue();
#endregion
#region Properties
public string UriScheme
{
get { return _UriScheme; }
}
public SoapMsmqTransportOptions Options
{
get { return _options; }
set { _options = value; }
}
public ISoapFormatter Formatter
{
get { return _formatter; }
set { _formatter = value; }
}
#endregion
#region constructors
public SoapMsmqTransport() : this(null)
{
}
public SoapMsmqTransport(XmlNodeList configData)
{
Formatter = new SoapDimeFormatter();
Options = new SoapMsmqTransportOptions();
_OutQueue.Formatter = new BinaryMessageFormatter();
if(configData != null)
{
string val = null;
foreach(XmlNode node in configData)
{
XmlElement child = node as XmlElement;
if(child != null)
{
switch (child.LocalName)
{
case "sender":
{
#region sender
foreach(XmlNode sender in child)
{
XmlElement elem = sender as XmlElement;
if(elem != null)
{
if(elem.LocalName == "toBeReceived")
{
val = elem.GetAttribute("timeInSec");
if(val != "") Options.TimeToBeReceived =
Convert.ToInt32(val);
}
else if(elem.LocalName == "toBeReachQueue")
{
val = elem.GetAttribute("timeInSec");
if(val != "") Options.TimeToReachQueue =
Convert.ToInt32(val);
}
else if(elem.LocalName == "adminQueue")
{
val = elem.GetAttribute("path");
if(val != "") Options.AdminQueuePath = val;
}
else if(elem.LocalName == "responseQueue")
{
val = elem.GetAttribute("path");
if(val != "") Options.ResponseQueuePath = val;
}
else if(elem.LocalName == "useDeadQueue")
{
val = elem.GetAttribute("enabled");
if(val != "") Options.UseDeadLetterQueue =
Convert.ToBoolean(val);
}
}
}
break;
#endregion
}
case "receiver":
{
#region receiver
foreach(XmlNode receiver in child)
{
XmlElement elem = receiver as XmlElement;
if(elem != null)
{
if(elem.LocalName == "retry")
{
val = elem.GetAttribute("timeInSec");
if(val != "") Options.TimeToBeReceived =
Convert.ToInt32(val);
}
else if(elem.LocalName == "errorQueue")
{
val = elem.GetAttribute("path");
if(val != "") Options.ErrorQueuePath = val;
}
else if(elem.LocalName == "workerThreads")
{
val = elem.GetAttribute("number");
if(val != "") Options.NumberOfThreads =
Convert.ToInt32(val);
}
}
}
break;
#endregion
}
case "formatter":
{
val = child.GetAttribute("type");
break;
}
case "transactional":
{
val = child.GetAttribute("enabled");
if(val != "") Options.Transactional =
Convert.ToBoolean(val);
break;
}
default:
break;
}
}
}
}
AppDomain.CurrentDomain.ProcessExit += new EventHandler(OnProcessExit);
}
#endregion
#region ISoapTransport
public ISoapOutputChannel GetOutputChannel(EndpointReference endpoint,
SoapChannelCapabilities capabilities)
{
#region validate arguments
if(endpoint.TransportAddress.Scheme != UriScheme)
throw new ArgumentException("The transport scheme for the " +
"specified endpoint does not match this transport.",
"endpoint");
if(capabilities != SoapChannelCapabilities.None)
throw new NotSupportedException("Unsupported " +
"SoapChannelCapabilities flags. Use SoapChannelCapabilities.None.");
#endregion
return new SoapMsmqOutputChannel(endpoint, this);
}
public ISoapInputChannel GetInputChannel(EndpointReference endpoint,
SoapChannelCapabilities capabilities)
{
#region validate arguments
if(endpoint == null)
throw new ArgumentNullException("endpoint");
if(capabilities == SoapChannelCapabilities.ActivelyListening &&
endpoint.TransportAddress.Scheme != UriScheme)
throw new ArgumentException("Invalid Transport " +
"Scheme specified");
if(capabilities != SoapChannelCapabilities.None &&
capabilities != SoapChannelCapabilities.ActivelyListening)
throw new NotSupportedException("Unsupported " +
"SoapChannelCapabilities Flags");
#endregion
string strQueuePath = QueuePathFromUri(endpoint);
lock(InputChannels.SyncRoot )
{
SoapMsmqInputChannel channel =
InputChannels[endpoint] as SoapMsmqInputChannel;
if(channel == null)
{
channel = new SoapMsmqInputChannel(endpoint, this,
strQueuePath);
channel.LocalEndpoint.Via = null;
InputChannels.Add(channel);
}
return channel;
}
}
#endregion
#region SendTo
public void SendTo(SoapEnvelope message, EndpointReference endpoint)
{
Message outMsg = new Message();
string strQueuePath = "";
string label = "";
try
{
#region write soap message into the queue
strQueuePath = QueuePathFromUri(endpoint);
_OutQueue.Path = strQueuePath;
Formatter.Serialize(message, outMsg.BodyStream);
if(Options.TimeToBeReceived > 0)
outMsg.TimeToBeReceived =
TimeSpan.FromSeconds(Options.TimeToBeReceived);
if(Options.TimeToReachQueue > 0)
outMsg.TimeToReachQueue =
TimeSpan.FromSeconds(Options.TimeToReachQueue);
outMsg.UseDeadLetterQueue = Options.UseDeadLetterQueue;
if(Options.AdminQueuePath != "")
{
_AdminQueue.Path = Options.AdminQueuePath;
outMsg.AcknowledgeType = AcknowledgeTypes.NegativeReceive |
AcknowledgeTypes.NotAcknowledgeReachQueue;
outMsg.AdministrationQueue = _AdminQueue;
}
if(Options.ResponseQueuePath != "")
{
_ResponseQueue.Path = Options.ResponseQueuePath;
outMsg.ResponseQueue = _ResponseQueue;
}
if(message.Context[SoapMSMQ.MessageLabel] != null)
label =
Convert.ToString(message.Context[SoapMSMQ.MessageLabel]);
else
label = string.Concat(endpoint.TransportAddress.AbsoluteUri,
"/", endpoint.Address.Value.AbsolutePath);
if(ContextUtil.IsInTransaction == true)
{
_OutQueue.Send(outMsg, label,
MessageQueueTransactionType.Automatic);
}
else if(strQueuePath.ToLower().StartsWith("formatname:multicast"))
{
_OutQueue.Send(outMsg, label);
}
else if(message.Context[SoapMSMQ.MessageQueueTransaction] != null)
{
_OutQueue.Send(outMsg, label,
message.Context[SoapMSMQ.MessageQueueTransaction] as
MessageQueueTransaction);
}
else if(Options.Transactional)
{
using(MessageQueueTransaction mqtx =
new MessageQueueTransaction())
{
mqtx.Begin();
_OutQueue.Send(outMsg, label, mqtx);
mqtx.Commit();
}
}
else
{
_OutQueue.Send(outMsg, label);
}
#endregion
Trace.WriteLine(string.Format("[{0}].SendTo done: {1}",
UriScheme, message.OuterXml));
}
catch(Exception ex)
{
string strError = string.Format("[{0}].SendTo error = {1}, " +
"epr={2}, qpath={3}",
UriScheme, ex.Message,
endpoint.Address.Value.AbsoluteUri,
strQueuePath);
// keep the original exception as the inner exception
throw new Exception(strError, ex);
}
finally
{
#region clean-up
if(outMsg != null)
outMsg.Dispose();
#endregion
}
}
#endregion
#region PeekCompleted
internal void PeekCompleted(object sender,
PeekCompletedEventArgs asyncResult)
{
// method state
bool done = false;
Exception exception = null;
Message message = null;
MessageQueue mq = sender as MessageQueue;
MessageQueueTransaction mqtx =
new MessageQueueTransaction();
try
{
// Ending the asynchronous peek operation.
mq.EndPeek(asyncResult.AsyncResult);
// check if this queue is transactional
if(mq.Transactional)
mqtx.Begin();
// receive message from the queue; note that new TimeSpan(1000)
// means 1000 ticks (0.1 ms), not 1000 ms
message = mq.Receive(new TimeSpan(1000), mqtx);
// deserialize message into the soapmessage
SoapEnvelope envelope =
Formatter.Deserialize(message.BodyStream);
// insert a transaction info into the Context
// for receiver purposes
envelope.Context.Add(SoapMSMQ.MessageLabel,
message.Label);
envelope.Context.Add(SoapMSMQ.MessageQueueTransaction,
mqtx);
envelope.Context.Add(SoapMSMQ.TransactionId,
message.TransactionId);
envelope.Context.Add(SoapMSMQ.IsFirstInTransaction,
message.IsFirstInTransaction);
envelope.Context.Add(SoapMSMQ.IsLastInTransaction,
message.IsLastInTransaction);
envelope.Context.Add(SoapMSMQ.Acknowledgment,
message.Acknowledgment);
// dispatch soapmessage to the local
// endpoint (no exception!)
done = DispatchMessage(envelope);
// echo
Trace.WriteLine(string.Format("[{0}].PeekCompleted {1}: {2}",
UriScheme, done ? "Delivered" : "NonDelivered",
envelope.OuterXml));
// dispatch a fault message to the specified FaultTo address
if(done == false && envelope.Fault == null &&
envelope.Context.Addressing.FaultTo != null &&
((envelope.Context.Addressing.RelatesTo == null ) ||
(envelope.Context.Addressing.RelatesTo.RelationshipType !=
WSAddressing.AttributeValues.Reply )))
{
// deliver fault message
AddressingFault faultAddr =
new AddressingFault(AddressingFault.DestinationUnreachableMessage,
AddressingFault.DestinationUnreachableCode);
SoapEnvelope faultMessage = GetFaultMessage(envelope, faultAddr);
ISoapOutputChannel outChannel =
SoapTransport.StaticGetOutputChannel(
envelope.Context.Addressing.FaultTo);
outChannel.Send(faultMessage);
Trace.WriteLine(string.Format("[{0}].PeekCompleted - " +
"Dispatched fault messages: {1}",
UriScheme, faultMessage.OuterXml));
}
}
catch(MessageQueueException ex)
{
if(ex.MessageQueueErrorCode == MessageQueueErrorCode.IOTimeout)
{
// skip this error, it can happen with multiple receivers
Trace.WriteLine(string.Format("[{0}].PeekCompleted - " +
"IOTimeout info (no message in queue)", UriScheme));
}
else
{
exception = ex;
throw;
}
}
catch(Exception ex)
{
exception = ex;
#region option: send queued message (any) into the error queue
if(Options.ErrorQueuePath != "" && message != null)
{
try
{
string label = string.Concat(message.Label,
", Error=", ex.Message);
_ErrorQueue.Path = Options.ErrorQueuePath;
_ErrorQueue.Send(message, label, mqtx);
}
catch(Exception ex2)
{
string strErrMsg =
string.Format("[{0}]PeekCompleted Failure" +
" at ErrorQueue.Send: {1}", UriScheme,
ex2.ToString());
Trace.WriteLine(strErrMsg);
Microsoft.Web.Services2.Diagnostics.EventLog.WriteError(
strErrMsg);
}
}
#endregion
}
finally
{
// close a pending transaction (there is no abort!)
if(mqtx.Status == MessageQueueTransactionStatus.Pending)
mqtx.Commit();
// exception handling
if(exception != null)
{
// publish error
string strErrMsg =
string.Format("[{0}]PeekCompleted Failure: {1}",
UriScheme, exception.ToString());
Trace.WriteLine(strErrMsg);
Microsoft.Web.Services2.Diagnostics.EventLog.WriteError(
strErrMsg);
}
// clean-up
if(message != null)
message.Dispose();
// peek a next message
mq.BeginPeek();
}
}
#endregion
#region QueuePathFromUri
internal string QueuePathFromUri(EndpointReference epr)
{
string qpath = null;
if(epr.TransportAddress.Authority == ".")
{
// local queue (the most frequent case)
qpath = string.Concat(".",
epr.TransportAddress.AbsolutePath.Replace("/", "\\"));
}
else
{
string[] ipform = epr.TransportAddress.Authority.Split('.');
if(ipform.Length == 4)
{
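// NOTE: this is a string comparison, so a two-digit first
// octet such as "23" also falls into this range
if(ipform[0].CompareTo("223") > 0 &&
ipform[0].CompareTo("240") < 0)
{
// multicast (not transactional!)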
qpath = string.Concat("FormatName:MULTICAST=",
epr.TransportAddress.Authority);
}
else if(epr.TransportAddress.AbsolutePath.StartsWith("/private$/"))
{
// private queue addressed by direct
// format using the ip address
qpath = string.Concat("FormatName:DIRECT=tcp:",
epr.TransportAddress.Authority,
epr.TransportAddress.AbsolutePath.Replace("/", "\\"));
}
else
{
// public queue
qpath = string.Concat(epr.TransportAddress.Authority,
epr.TransportAddress.AbsolutePath.Replace("/", "\\"));
}
}
else
{
if(epr.TransportAddress.Authority.StartsWith("http"))
{
// over Internet
qpath = string.Concat("FormatName:DIRECT=",
epr.TransportAddress.Authority, ":",
epr.TransportAddress.AbsolutePath);
}
else if(epr.TransportAddress.AbsolutePath.StartsWith("/private$/"))
{
// private queue addressed by direct format
// using the machine name
qpath = string.Concat("FormatName:DIRECT=OS:",
epr.TransportAddress.Authority,
epr.TransportAddress.AbsolutePath.Replace("/", "\\"));
}
else
{
// public queue
qpath = string.Concat(epr.TransportAddress.Authority,
epr.TransportAddress.AbsolutePath.Replace("/", "\\"));
}
}
}
return qpath;
}
#endregion
#region Onxxxx
void OnProcessExit(object sender, EventArgs args)
{
Trace.WriteLine(string.Format("[{0}].OnProcessExit", UriScheme));
// close all queue resources
_OutQueue.Close();
_AdminQueue.Close();
_ResponseQueue.Close();
_ErrorQueue.Close();
// close all msmq input channels
lock(InputChannels.SyncRoot)
{
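HybridDictionary hd = InputChannels.SyncRoot as HybridDictionary;
if(hd != null)
{
// take a snapshot first: Close() removes the channel from
// InputChannels, which may invalidate a running enumeration
ArrayList channels = new ArrayList();
foreach(IList item in hd.Values)
{
foreach(SoapInputChannel ch in item)
{
if(ch is SoapMsmqInputChannel)
channels.Add(ch);
}
}
// now close all registered msmq input channels
foreach(SoapInputChannel ch in channels)
ch.Close();
}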
}
}
#endregion
#region CloseInputChannel
internal void CloseInputChannel(SoapMsmqInputChannel channel)
{
lock(InputChannels.SyncRoot)
{
if(InputChannels.Contains(channel))
InputChannels.Remove(channel);
}
}
#endregion
}
}
The soap.msmq transport is ready to use after its assembly has been installed into the GAC. For test purposes, I have included a simple client and server to evaluate the transport with different addresses and behaviors. There are many combinations that can be tested. The test requires the following queues to be created in advance: ReqChannel (Tx), ReqChannel2 (Tx), AdminChannel (nonTx).
Run the server and then the client console program (or in the opposite order) to see the action on the server console. Then modify the client or the server and run them again.
Appendix A - SendRequestResponse
This feature was added in version 1.1. The soap.msmq transport is mostly used for a fire-and-forget asynchronous communication model. Each request represents a OneWay communication (no ReplyTo endpoint) between the two ends (client and service). The WSE2 infrastructure has a built-in implementation of the Request/Response workflow based on one-way communication and correlation between the request and response messages. This feature can be useful for certain applications.
The following code snippet shows how to use the soap.msmq transport for downloading a file from the service.
The client side:
class MyClient : SoapClient
{
[SoapMethod("GetFile")]
public void GetFile(string name)
{
EndpointReference eprReply = new EndpointReference(new Uri("urn:myClient"));
eprReply.Via = new Uri(@"soap.msmq://./private$/reqchannel_ack");
SoapEnvelope request = new SoapEnvelope();
request.SetBodyObject(name);
request.Context.Addressing.ReplyTo = new ReplyTo(eprReply);
SoapEnvelope response = base.SendRequestResponse("GetFile", request);
}
}
The service side:
class MyService : SoapService
{
[SoapMethod("GetFile")]
public SoapEnvelope GetFile(SoapEnvelope request)
{
SoapEnvelope response = new SoapEnvelope();
Attachment item = new Attachment("TestImage", "image/gif", @"server.exe");
response.Context.Attachments.Add(item);
return response;
}
}
What's behind the SendRequestResponse function?
The SendRequestResponse function creates an asynchronous client object, SoapClientAsyncResult, to process the request and the response using the Begin/End pattern in the output and input channels. The following code snippets show their implementation in the SoapMsmqOutputChannel:
public override IAsyncResult BeginSend(SoapEnvelope message,
AsyncCallback callback, object state)
{
SendDelegate delegator = new SendDelegate(this._transport.SendTo);
return delegator.BeginInvoke(message, RemoteEndpoint, callback, state);
}
public override void EndSend(IAsyncResult result)
{
SoapClient client = result.AsyncState as SoapClient;
System.Runtime.Remoting.Messaging.AsyncResult aResult =
(System.Runtime.Remoting.Messaging.AsyncResult)result;
SendDelegate delegator = (SendDelegate)aResult.AsyncDelegate;
string cid = delegator.EndInvoke(result);
}
and in the SoapMsmqInputChannel:
public override IAsyncResult BeginReceive(AsyncCallback callback,
object state)
{
if(_capabilities == SoapChannelCapabilities.None)
{
TimeSpan timeout = (state != null && state is SoapClient) ?
TimeSpan.FromMilliseconds((state as SoapClient).Timeout) :
TimeSpan.FromSeconds(_transport.Options.TimeToBeReplied);
ReceiveByIdDelegate delegator =
new ReceiveByIdDelegate(this._transport.ReceiveById);
string cid =
_endpoint.ReferenceProperties.RelatesTo.Value.AbsolutePath +
"\\1234567";
return delegator.BeginInvoke(_InpQueue, cid, timeout,
callback, state);
}
return null;
}
public override SoapEnvelope EndReceive(IAsyncResult result)
{
System.Runtime.Remoting.Messaging.AsyncResult aResult =
(System.Runtime.Remoting.Messaging.AsyncResult)result;
ReceiveByIdDelegate delegator =
(ReceiveByIdDelegate)aResult.AsyncDelegate;
SoapEnvelope envelope = delegator.EndInvoke(result);
return envelope;
}
As you can see, the above BeginReceive method delegates the task to the _transport.ReceiveById method to retrieve a specific message from the queue. The message query is based on the message correlation ID, which has to be set up in the _transport.SendTo method:
outMsg.CorrelationId =
message.Context.Addressing.RelatesTo.Value.AbsolutePath + "\\1234567";
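The shape of this correlation ID can be illustrated with a small standalone sketch. MSMQ message identifiers consist of a GUID and a number separated by a backslash, which is why the path of the RelatesTo URI is combined with a fixed numeric part. The sketch assumes that the RelatesTo value has the form uuid:<guid>; the class and method names are hypothetical and the GUID is only a sample value.

```csharp
using System;

public static class CorrelationSketch
{
    // Fixed numeric part, as used by the snippet above.
    const string Suffix = "\\1234567";

    // Builds the correlation ID from a RelatesTo-style URI.
    // Assumption: the URI has the form uuid:<guid>, so AbsolutePath is the bare GUID.
    public static string FromRelatesTo(Uri relatesTo)
    {
        string guidPart = relatesTo.AbsolutePath;
        new Guid(guidPart);   // throws FormatException when the path is not a GUID
        return guidPart + Suffix;
    }

    public static void Main()
    {
        Uri relatesTo = new Uri("uuid:6b29fc40-ca47-1067-b31d-00dd010662da");
        Console.WriteLine(FromRelatesTo(relatesTo));
    }
}
```

The sender and the receiver have to derive the ID in exactly the same way, otherwise the response will never be matched to the request.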
Retrieving a message from the queue is a straightforward process supported by the messaging method ReceiveByCorrelationId. The following code snippet shows this process:
public SoapEnvelope ReceiveById(MessageQueue mq, string cid, TimeSpan timeout)
{
MessageQueueTransaction mqtx = new MessageQueueTransaction();
SoapEnvelope envelope = null;
Message message = null;
try
{
mq.MessageReadPropertyFilter.CorrelationId = true;
if(mq.Transactional)
mqtx.Begin();
message = mq.ReceiveByCorrelationId(cid, timeout, mqtx);
envelope = Formatter.Deserialize(message.BodyStream);
envelope.Context.Add(SoapMSMQ.MessageLabel, message.Label);
envelope.Context.Add(SoapMSMQ.Acknowledgment, message.Acknowledgment);
}
catch(Exception ex)
{
Trace.WriteLine(string.Format("[{0}].ReceiveById failed: {1}",
UriScheme, ex.Message));
}
finally
{
if(mqtx.Status == MessageQueueTransactionStatus.Pending)
mqtx.Commit();
if(message != null)
message.Dispose();
}
return envelope;
}
Note that the SendRequestResponse feature in the soap.msmq transport is controlled by the WSE2 infrastructure, so the following issues should be taken care of when using it:
- The ReplyTo transport scheme has to be the same as the transport scheme of the client's destination.
- The service method should use the signature with the SoapEnvelope object, even where it would otherwise return void; otherwise the service will handle the request as a OneWay attributed method.
- The SoapClient.Timeout value should correspond to the transport.Options.TimeToBeReplied value (in the BeginReceive code above, the former is treated as milliseconds and the latter as seconds). The WSE2 infrastructure breaks the state in the OnSendComplete handler (passing a null value to the state of the OnReceiveComplete handler).
Also, in this version I made a workaround in the _transport.PeekCompleted function to allow dispatching messages without blocking the input channels in the DispatchMessage method.
In this article I have described a WSE Messaging Custom Transport over MSMQ. Using this UriScheme in your connectivity model opens up a new dimension of communication features, such as loosely coupled, disconnectable services and clients. I hope you will enjoy it.
- 06/06/04 - Version 1.0.0.0 - Initial revision.
- 07/07/04 - Version 1.1.0.0 - New feature - SendRequestResponse support.