Click here to Skip to main content
15,867,895 members
Articles / Programming Languages / C#

WCF: Duplex MSMQ

Rate me:
Please Sign up or sign in to vote.
4.96/5 (29 votes)
3 Sep 2009Ms-PL11 min read 146.9K   2K   97   58
How to use duplex communication over MSMQ bindings.

Image 1

Introduction

Today, I'll talk about a solution to enable duplex communication with netMsmqBinding in WCF.

The good news is that my solution can enable duplex communication with any binding which supports IOutputChannel/IInputChannel.

In plain English, it means that if a binding can send and receive a message in OneWay, it can also be used for duplex communication, i.e., send messages back to the client from the server with callbacks. Yes, you want and you will be able to use duplex communication over Twitter. I will talk about my project WCF over Twitter in another article to learn more about WCF!

If you just want to use duplex MSMQ, go here to download the project: http://duplexmsmq.codeplex.com/; an example is included.

SOAP Plumbing

First, some people will say: "you could use CompositeDuplexBindingElement", I won't re-explain why it won't work, this blog article explains the problem (Thanks to Mike Taulty).

He found a solution to make MSMQ with duplex communication to work, but it requires a reliable session. The problem with reliable sessions is that "WCF supports reliable sessions between endpoints that are active and alive at the same time". And when we use MSMQ, we want to be able to send messages to the service even if it is offline, and we want the service to be able to send messages to a client even if it isn't connected. So it's not a good solution.

But what you should find interesting is how I manage to make it work without a reliable session! And maybe, why it works only with a reliable session.

First of all, I will explain at the SOAP level how duplex communication works.

I'll use Visio with this Visio stencil "that contains 51 integration pattern icons as Visio shapes".

Quickly, in WCF, a Channel is something which can receive or send messages. There are several types of channels, but I will only talk about IInputChannel/IOutputChannel. These channels are the ones used by netMsmqBinding, and it means that a IOutputChannel only supports sending a message in OneWay to another IInputChannel. The server listens to a IInputChannel, and the client sends a message with the IOutputChannel. Obviously, the concrete implementations of these channels with netMsmqBinding uses MSMQ.

If we want to be able to send a message back to the client in a duplex scenario, the client needs to expose a IInputChannel and the service needs to use a IOutputChannel.

In WCF, IInputChannel + IOutputChannel = IDuplexChannel, and it is the responsibility of CompositeDuplexBindingElement shipped with WCF, to transform a pair of IInputChannel/IOutputChannel to a IDuplexChannel. We will come back here later.

First, the client needs to say to the server where to send a reply, like you can see in the figure below. This is done through some extra elements in the SOAP headers of every message.

Image 2

And, when the server responds, it need to specify the return address.

Image 3

So my goal was:

  • The client must create a IInputChannel from netMsmqBinding to receive messages from the server.
  • Attach the address of this IInputChannel to the return address of every message sent to the server.
  • The server must create a IOutputChannel from netMsmqBinding to send messages to the client.
  • Attach the address of the IInputChannel to the return address of every message sent to the client.

You can see that my solution is not coupled with MSMQ, and can be used with every binding which supports IInputChannel/IOutputChannel.

Well, first, I will explain why the solution of Mike Taulty worked with a reliable session, and why it's a bad solution. The trick is that the return address is sent to the server with the first message sent from the client to the server. The server creates a session for the client and stores the return address in memory. Then, the client sends, in every SOAP header, its session ID without the return address. Obviously, if the server crashes, the session is lost, and the messages of the client are dropped, or sent to a dead queue.

So, how can I extend WCF to solve this problem? I will create my own BindingElement, but we need to understand what a binding is.

WCF Plumbing

Image 4

When I think about a binding, I see hamburgers. And in reality, there are two types of burgers: those made by your favorite fast-food-chain, and those from your mama/wife/yourself made with love and care. The ingredients of a binding are BindingElements.

Let's talk about fastfood first. The benefit of the built-in bindings of WCF is that they are really easy to understand and create. And well... it's fast-food, so you don't know what really is inside, but they have a name and somehow you guess there are some BindingElements inside.

For example, with a WSHttpBinding, you guess that there is a HttpTransportBindingElement inside, and you somehow guess that it sends a SOAP message over HTTP. You guess that this code snippet activates a reliable session, but you don't know exactly which ingredient or "BindingElement" it impacts.

C#
wsHttpBinding.ReliableSession.Enabled = true;

The good thing is that in most cases, you don't have to know what is going on inside. Microsoft did a great job to satisfy most common use cases. When you pass the binding to your service host or channel factory, it will call:

C#
binding.CreateBindingElements()

to create all the BindingElements based on the properties you have set on the built-in Binding.

But sometimes, you need to specify yourself which ingredients or "BindingElements" you will add to your Binding. This is called home made binding, and there are to ways to create one:

And I'll use the latter one, because this way, it's most easy to explain how a Binding works.

A CustomBinding is just a binding where you directly specify all the BindingElements inside. For example:

C#
binding = new CustomBinding(
    new ListenUriBindingElement()
    {
        ListenUriBaseAddress = baseClient
    },
    new CompositeDuplexBindingElement(),
    new ReplyToBindingElement(),
    new TextMessageEncodingBindingElement(),
    CreateMsmqBinding());

This creates a Binding with a ListenUriBindingElement, CompositeDuplexBindingElement, ReplyToBindingElement, TextMessageEncodingBindingElement, and a MsmqTransportBindingElement (its the return type of CreateMsmqBinding()). This enumeration is called "BindingElement stack"; the last BindingElement is the lowest in the stack, and it's always a TransportBindingElement.

Most of the time, the BindingElement stack is the same in the server and client side. Each BindingElement can decide to modify, inspect, or validate a message, or they can pass parameters to BindingElement down in the BindingElement stack. For example, when you choose message security for your binding, internally, WCF creates binding elements which encode/decode a message with a certificate or something else. The server and the client having the same stack; the BindingElement will encode a message sent on the client side and decode it on the server side.

Another example, when you choose to encode your message in plain text TextMessageEncodingBindingElement, it doesn't intercept any message, but it passes itself as a parameter down in the stack.

C#
context.BindingParameters.Add(this);

This way, the TransportBindingElement (the lowest element in the stack) can retrieve the MessageEncodingBindingElement to know how to serialize and deserialize a Message to a stream of bytes. Each BindingElement says clearly if it can build a channel type with CanBuildChannelListener<TChannel> and CanBuildChannelFactory<TChannel>. And, each BindingElement has a BindingContext during the creation of their "channel managers" to pass parameters down to the stack, or to construct "channel managers" of the BindingElement directly below in the stack.

A channel manager is a IChannelFactory or IChannelListenner; BuildChannelListener is called on the server side and BuildChannelFactory is called on the client side.

C#
IChannelFactory<TChannel> BuildChannelFactory<TChannel>(BindingContext context)
IChannelListener<TChannel> BuildChannelListener<TChannel>(BindingContext context)

Most of the time, intermediate bindings are not interested to create a channel manager, so they just delegate the call down to the stack, as TextMessageEncodingBindingElement does.

C#
return context.BuildInnerChannelFactory<TChannel>();

In fact, this code just calls the BuildChannelFactory of the BindingElement directly below the current BindingElement.

But, if you want to intercept and modify messages, you need to wrap the inner channel manager inside a custom channel manager. What I call the "inner channel manager" is the channel manager created by the BindingElement below the current one.

This figure explains how channel managers are created. It assumes that BindingElement1 and BindingElement2 want to intercept messages, so they wrap the inner channel managers. TextMessageEncodingBindingElement doesn't need to intercept messages, so it doesn't wrap the channel manager returned by TransportBindingElement1.

Image 5

Here is an example of a binding element which wants to intercept messages; it's the Decorator pattern.

C#
public override IChannelFactory<TChannel> 
       BuildChannelFactory<TChannel>(BindingContext context)
{
    if(!CanBuildChannelFactory<TChannel>(context))
    {
        throw new ArgumentException("Impossible to build ReplyToBindingElement");
    }
    var innerChannelFactory = (IChannelFactory<IOutputChannel>)
             context.BuildInnerChannelFactory<TChannel>();
    return (IChannelFactory<TChannel>)new ReplyToChannelFactory(
                    context.ListenUriBaseAddress, innerChannelFactory);
}

My implementation of ReplyToChannelFactory will intercept calls destined to the innerChannel. Most calls are just delegated to the innerChannelFactory. We will see the implementation later. The class ServiceHost or ChannelFactory<ServiceType> will use the channel manager returned by the top most binding element to send or receive messages. Please do not confuse ChannelFactory<ServiceType> with IChannelFactory<TChannel>; ChannelFactory<ServiceType> is an implementation of this interface, but it is primarily used to create a client proxy, and is not returned by a BindingElement.

ChannelFactory<ServiceType> is just a class which serializes and deserializes the body of the SOAP message based on the method and parameter you call on a client proxy, then passes the message to a channel created by the top-most IChannelFactory (created by your top most BindingElement). ServiceHost does the same thing, just replace IChannelFactory by IChannelListener.

Here is the creation of the channel; note that IChannelFactories also uses the Decorator pattern to create channels.

Image 6

Then, when you call a method on the proxy, a message is created and the Send method of the top most channel is called:

Image 7

Duplex MSMQ Implementation

OK, now, if you have understood what I said above, you'll think that the implementation will be a piece of cake, and it really is! There is a deep learning curve to understand how to make it work, but once understood, you'll think that WCF is easily extensible and well designed. You'll see that it is not so difficult after all. The more I learn about WCF, the more I have project ideas (cool ones and very bad ones...)

First, I created helper classes called ProxyXXXX : XXXX, where XXXX is an interface on which I want to intercept calls. ProxyXXXX will take an inner XXXX and forward all calls to it.

Every method, property, and event of ProxyXXXX is virtual, so if I want to intercept a call, I inherit ProxyXXXX and override a class member.

For example, here, what I do inside the ProxyOutputChannel, my channel will override the Send method to intercept the message:

C#
public virtual void Send(System.ServiceModel.Channels.Message message, 
                         System.TimeSpan timeout)
{
    _InnerOutputChannel.Send(message, timeout);
}

ReplyToBindingElement is responsible to add headers in the output messages to enable duplex communication. So, it wraps every inner ChannelFactory.

C#
public override IChannelFactory<TChannel> 
       BuildChannelFactory<TChannel>(BindingContext context)
{
    if(!CanBuildChannelFactory<TChannel>(context))
    {
        throw new ArgumentException("Impossible to build ReplyToBindingElement");
    }
    var innerChannel = (IChannelFactory<IOutputChannel>)
                          context.BuildInnerChannelFactory<TChannel>();
    return (IChannelFactory<TChannel>)
       new ReplyToChannelFactory(context.ListenUriBaseAddress, innerChannel);
}

public override IChannelListener<TChannel> 
          BuildChannelListener<TChannel>(BindingContext context)
{
    return base.BuildChannelListener<TChannel>(context);
}

public override bool CanBuildChannelFactory<TChannel>(BindingContext context)
{
    return typeof(TChannel) == typeof(IOutputChannel);
}

Then ReplyToChannelFactory wraps every inner channel:

C#
public class ReplyToChannelFactory : ProxyChannelFactory<IOutputChannel>
{
    readonly Uri _ReplyAddress;
    public ReplyToChannelFactory(Uri replyAddress, IChannelFactory<IOutputChannel> inner)
        : base(inner)
    {
        _ReplyAddress = replyAddress;
    }
    public override IOutputChannel CreateChannel(System.ServiceModel.EndpointAddress to)
    {
        return new ReplyToChannel(_ReplyAddress, base.CreateChannel(to));
    }
    public override IOutputChannel 
           CreateChannel(System.ServiceModel.EndpointAddress to, Uri via)
    {
        return new ReplyToChannel(_ReplyAddress, base.CreateChannel(to, via));
    }
}

And, the channel adds all the necessary headers to the outgoing message:

C#
public class ReplyToChannel : ProxyOutputChannel
{
    readonly Uri _ReplyAddress;
    public ReplyToChannel(Uri replyAddress, IOutputChannel inner)
        : base(inner)
    {
        _ReplyAddress = replyAddress;
    }
    public override void Send(Message message)
    {
        ApplyReplyTo(message);
        base.Send(message);
    }
    public override void Send(Message message, TimeSpan timeout)
    {
        ApplyReplyTo(message);

        base.Send(message, timeout);
    }

    void ApplyReplyTo(Message message)
    {
        if(message.Headers.MessageId == null)
        {
            message.Headers.MessageId = new System.Xml.UniqueId();
        }
        if(message.Headers.From == null)
        {
            message.Headers.From = 
              new System.ServiceModel.EndpointAddress(_ReplyAddress);
        }
        if(message.Headers.ReplyTo == null)
        {
            message.Headers.ReplyTo = 
              new System.ServiceModel.EndpointAddress(_ReplyAddress);
        }
    }
}

Here is my service implementation and contract:

C#
[ServiceContract(CallbackContract = typeof(IConversation))]
public interface IConversation
{
    [OperationContract(IsOneWay = true)]
    void Say(String something);
}

public class Conversation : IConversation
{
    #region IConversation Members

    public void Say(string something)
    {
        Console.WriteLine("Someone says \"{0}\" to you, what is your response ?", 
                          something);
        String response = Console.ReadLine();
        OperationContext.Current.GetCallbackChannel<IConversation>().Say(response);
    }

    #endregion
}

But in the server side, an exception is thrown when attempting to send a message in the callback channel. I don't know why, but it seems that WCF doesn't automatically set the To header of the outgoing message to the address of the ReplyTo header of the ingoing message... I have no idea why I need to do that only on the server side, and not on the client side, and I'll be happy if someone can explain that to me... So, I needed to create a behavior with a message inspector which fixes the problem.

C#
public class ReplyToBehavior : IEndpointBehavior
{
    public class ReplyToInspector : IDispatchMessageInspector
    {
        #region IDispatchMessageInspector Members

        public object AfterReceiveRequest(ref Message request, 
                      IClientChannel channel, InstanceContext instanceContext)
        {
            var reply = request.Headers.ReplyTo;
            OperationContext.Current.OutgoingMessageHeaders.To = reply.Uri;
            OperationContext.Current.OutgoingMessageHeaders.RelatesTo = 
                                     request.Headers.MessageId;
            return null;
        }

        public void BeforeSendReply(ref Message reply, object correlationState)
        {
        }

        #endregion
    }


    #region IEndpointBehavior Members

    public void AddBindingParameters(ServiceEndpoint endpoint, 
                BindingParameterCollection bindingParameters)
    {
    }

    public void ApplyClientBehavior(ServiceEndpoint endpoint, 
           System.ServiceModel.Dispatcher.ClientRuntime clientRuntime)
    {
    }

    public void ApplyDispatchBehavior(ServiceEndpoint endpoint, 
                System.ServiceModel.Dispatcher.EndpointDispatcher endpointDispatcher)
    {
        endpointDispatcher.DispatchRuntime.MessageInspectors.Add(new ReplyToInspector());
    }

    public void Validate(ServiceEndpoint endpoint)
    {
    }

    #endregion
}

My implementation is done!

Two last things:

During the creation of the BindingElement stacks, I needed two things: first, a BindingElement for my client to specify the reply address of the messages, and the MSMQ queue to listen. That's is the responsibility of ListenUriBindingElement.

I've taken the one in this blog article. The implementation is not complicated, it just sets the context.ListenUriBaseAddress. BindingElements lower in the stack will need this value to know where to listen for input messages.

C#
public override IChannelListener<TChannel> 
       BuildChannelListener<TChannel>(BindingContext context)
{
    if(listenUriBaseAddress != null)
        context.ListenUriBaseAddress = listenUriBaseAddress;
    return base.BuildChannelListener<TChannel>(context);
}

I needed also to specify that my server can send messages, and that my client must listen to a MSMQ channel (this channel is specified with the context.ListenUriBaseAddress set by the previous BindingElement ListenUriBindingElement). ServiceHost will enable a service to send messages through a callback if the Binding can build an IDuplexChannel. Alas, that's not the case because one of the BindingElements in the stack doesn't support it: MsmqTransportBindingElement.CanBuildChannelListener<IDuplexChannel> returns false. So, I need a BindingElement which creates a IDuplexChannel on the top a channel which only accepts to create IInputChannel and IOuputChannel. And it's easy, you just need to use the CompositeDuplexBindingElement shipped with WCF.

This is how the ChannelFactory<ServiceType> creates the top-most IChannelFactory in my BindingElement stack; if you want to know how it works on the service side, just replace IChannelFactory by IChannelListener and ChannelFactory<ServiceType> by ServiceHost.

Image 8

We say that CompositeDuplexBindingElement change the shape of a channel. For more information, ReliableSessionBindingElement also changes the shape of a channel; it transforms a XXXChannel to a IXXXChannel to a IXXXSessionChannel to enable a sessionfull communication.

On the client side, I will use DuplexChannelFactory instead of ChannelFactory to specify the callback implementation. The same problem as the one with ServiceHost is solved with the same solution.

Client and Server creation:

Here is the creation of my BindingElement stack for the two sides:

C#
public class BindingFactory
{
    public static Binding Create(Uri baseClient)
    {
        CustomBinding binding = null;

        binding = new CustomBinding(
        new ListenUriBindingElement()
        {
            ListenUriBaseAddress = baseClient
        },
        new CompositeDuplexBindingElement(),
        new ReplyToBindingElement(),
        new TextMessageEncodingBindingElement(),
        CreateMsmqBinding());

        return binding;
    }

    private static MsmqTransportBindingElement CreateMsmqBinding()
    {
        var binding = new MsmqTransportBindingElement();
        binding.MsmqTransportSecurity.MsmqAuthenticationMode = 
                                      MsmqAuthenticationMode.None;
        binding.MsmqTransportSecurity.MsmqProtectionLevel = 
                         System.Net.Security.ProtectionLevel.None;
        binding.UseActiveDirectory = false;
        binding.ExactlyOnce = false;
        return binding;
    }
}

Obviously, the server side doesn't need ListenUriBindingElement, because ServiceHost will set context.ListenUriBaseAddress.

C#
public override IChannelListener<TChannel> 
       BuildChannelListener<TChannel>(BindingContext context)
{
    if(listenUriBaseAddress != null)
        context.ListenUriBaseAddress = listenUriBaseAddress;
    return base.BuildChannelListener<TChannel>(context);
}

To not override context.ListenUriBaseAddress, I will call:

C#
BindingFactory.Create(null)

on the server side.

Client creation:

C#
static void Main(string[] args)
{
    Console.WriteLine("Hit a key");
    Console.ReadLine();
    Binding bind = BindingFactory.Create(new Uri("net.msmq://SHIELDP/private/client"));
    DuplexChannelFactory<IConversation> channel = new DuplexChannelFactory<IConversation>
        (new Conversation(), 
        bind, 
        new EndpointAddress("net.msmq://SHIELDP/private/server"));
    channel.CreateChannel().Say("Hello I'm Nico");
    while(true)
    {
    }
}

Server creation:

C#
class Program
{
    static void Main(string[] args)
    {
        Binding bind = BindingFactory.Create(null);
        ServiceHost host = new ServiceHost(typeof(Conversation), 
                               new Uri("net.msmq://SHIELDP/private/server"));
        var endpoint = host.AddServiceEndpoint(typeof(IConversation), bind, "");
        endpoint.Behaviors.Add(new ReplyToBehavior());
        host.Open();
        Console.WriteLine("Open");
        while(true)
        {
        }
    }
}

Now, you can use callbacks from the client side and the server side through MSMQ.

Conclusion

I'm sure this project will be useful for people with high availability requirements. It's too bad that Microsoft didn't implement this; MSMQ is so well suited for duplex communication... I hope you've enjoyed this article. The next one will talk about WCF over Twitter, and will show how to create your own TransportBindingElement. If you liked this article, let me know! :)

License

This article, along with any associated source code and files, is licensed under The Microsoft Public License (Ms-PL)


Written By
Software Developer Freelance
France France
I am currently the CTO of Metaco, we are leveraging the Bitcoin Blockchain for delivering financial services.

I also developed a tool to make IaaS on Azure more easy to use IaaS Management Studio.

If you want to contact me, go this way Smile | :)

Comments and Discussions

 
QuestionCan it work over Internet? Pin
sunnydeng30-Aug-12 8:22
sunnydeng30-Aug-12 8:22 
AnswerRe: Can it work over Internet? Pin
Nicolas Dorier30-Aug-12 12:34
professionalNicolas Dorier30-Aug-12 12:34 

General General    News News    Suggestion Suggestion    Question Question    Bug Bug    Answer Answer    Joke Joke    Praise Praise    Rant Rant    Admin Admin   

Use Ctrl+Left/Right to switch messages, Ctrl+Up/Down to switch threads, Ctrl+Shift+Left/Right to switch pages.