Click here to Skip to main content
15,896,207 members
Articles / Programming Languages / C#

A beginner's guide to queuing with WCF and MSMQ showing bi-directional client correspondence

Rate me:
Please Sign up or sign in to vote.
4.92/5 (9 votes)
2 Jan 2013CPOL24 min read 95.7K   3.3K   38  
This is a working example of clients in queued correspondence with a service: clients send messages to it and receive unsolicited messages from it, and both client and service can queue messages to the other while the other is offline.
using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.ServiceModel;
using System.Transactions;  // For Transaction Scope
using System.Messaging;     //Access Messaging
using System.Runtime.Serialization; //DataContract
using GPH_QueuedMessageContract.ServiceReference1;

namespace NGPH_QueuedMessageContract
{
    // Publishing data contract - the purpose of this DC is to build WCF comms-specific parameters into the
    // outbound message rather than include them as function parameters with the business logic
    /// <summary>
    /// Carries WCF communication details alongside a message so that they do not have to be
    /// passed as operation parameters next to the business data.
    /// </summary>
    /// <remarks>
    /// All three members are read/write auto-properties. <c>FaultAddress</c> and
    /// <c>MethodId</c> used to be <c>readonly</c> fields with no constructor, which meant no
    /// code could ever give them a value. The data-member names are unchanged.
    /// </remarks>
    [DataContract(Name = "PublishToDataContract")]
    public class PublishToDataContract
    {
        /// <summary>
        /// Address that replies should be published to; the service's RegisterUser builds an
        /// EndpointAddress from this string.
        /// </summary>
        [DataMember]
        public string PublishToAddress { get; set; }

        /// <summary>Reserved for future use; nothing in this file reads it.</summary>
        [DataMember]
        public string FaultAddress { get; set; }

        /// <summary>Reserved for future use; nothing in this file reads it.</summary>
        [DataMember]
        public string MethodId { get; set; }
    }
    /// <summary>
    /// Service contract, published under the name "GPH_QueuedService". Every operation is
    /// declared one-way, so callers receive no reply through the operation itself.
    /// </summary>
    [ServiceContract(
    Name = "GPH_QueuedService"
    )]

    public interface IGPH_QueuedService
    {
        /// <summary>Registers a user with the service.</summary>
        /// <param name="arg_Username">Name the user is registered under.</param>
        [OperationContract(IsOneWay = true)]
        void RegisterUser(string arg_Username);
        /// <summary>Submits a message from a user for delivery to a list of recipients.</summary>
        /// <param name="userName">Name of the sending user.</param>
        /// <param name="addressList">Recipients; the implementation in this file treats each entry as a registered user name.</param>
        /// <param name="userMessage">Text of the message.</param>
        [OperationContract(IsOneWay = true)]
        void ReceiveMessage(string userName, List<string> addressList, string userMessage);
        /// <summary>Removes a previously registered user.</summary>
        /// <param name="arg_Username">Name the user was registered under.</param>
        [OperationContract(IsOneWay = true)]
        void RemoveUser(string arg_Username);
        /// <summary>
        /// Carries no behaviour of its own; it exists so that PublishToDataContract appears
        /// in the contract and is therefore exposed to clients.
        /// </summary>
        /// <param name="arg_publish_details">Unused by the implementation in this file.</param>
        [OperationContract(IsOneWay = true)]
        void ExposeContract(PublishToDataContract arg_publish_details);
    }
  


    /// <summary>
    /// Implementation of IGPH_QueuedService. Keeps a table of registered users, each with a
    /// NetMsmqBinding client proxy pointing at the address the user supplied, and relays
    /// messages and membership changes to those proxies.
    /// </summary>
    /// <remarks>
    /// Design note carried over from the original: one proxy is created per user at
    /// registration and reused for every message to that user. The alternative would be to
    /// store only the endpoint and create/destroy a proxy around each message.
    ///
    /// NOTE(review): both tables below are static, so they are shared by every instance of
    /// this class, and nothing here synchronises access to them. Confirm that the host's
    /// instancing and concurrency settings make that safe.
    /// </remarks>
    public class CGPH_QueuedMessageContract : IGPH_QueuedService
    {
        // Values of arg_ind understood by MessageDistributor.
        private const int DistributeUserMessage = 0;       // message from a user only
        private const int DistributeMembershipChange = 1;  // also send the current subscriber list

        // Names of all currently registered users, in registration order.
        private static readonly List<string> m_SubscriberList = new List<string>();

        // Registered user name -> proxy used to reach that user.
        private static readonly Dictionary<string, GPH_QueuedMessageContract.ServiceReference1.GPH_InboundMessageHandlerClient> m_NotifyList =
            new Dictionary<string, GPH_QueuedMessageContract.ServiceReference1.GPH_InboundMessageHandlerClient>();

        /// <summary>Default constructor.</summary>
        public CGPH_QueuedMessageContract()
        {
        }

        /// <summary>
        /// Registers a user. The reply address is taken from the "PublishToDataContract"
        /// message header, a proxy is created for it, and every subscriber is told that the
        /// user has joined and is sent the current subscriber list.
        /// </summary>
        /// <remarks>
        /// Registering a name that is already known replaces the stored proxy (closing the
        /// old one) rather than failing on a duplicate dictionary key. All failures are
        /// logged to the console and swallowed; the method does not throw.
        /// </remarks>
        /// <param name="arg_Username">Name to register; a null name is logged and ignored.</param>
        [OperationBehavior(TransactionScopeRequired = true, TransactionAutoComplete = true)]
        public void RegisterUser(string arg_Username)
        {
            Console.WriteLine("Got Message");

            if (arg_Username == null)
            {
                // A null key cannot be stored in the dictionary; bail out before creating a proxy.
                Console.WriteLine("Join request without a user name ignored");
                return;
            }

            PublishToDataContract publishDetails;
            if (!TryReadPublishDetails(out publishDetails))
            {
                return;
            }

            Console.WriteLine("Join Request from Queue: {0} via hdr passing addr: {1}", arg_Username, publishDetails.PublishToAddress);

            var proxy = CreateReplyProxy(publishDetails.PublishToAddress);
            if (proxy == null)
            {
                return;
            }

            try
            {
                GPH_QueuedMessageContract.ServiceReference1.GPH_InboundMessageHandlerClient previous;
                if (m_NotifyList.TryGetValue(arg_Username, out previous))
                {
                    // Re-registration: the name is already in the subscriber list, so only
                    // the proxy needs replacing.
                    CloseProxy(previous);
                }
                else
                {
                    m_SubscriberList.Add(arg_Username);
                }
                m_NotifyList[arg_Username] = proxy;

                MessageDistributor(arg_Username, m_SubscriberList, " has joined the converstation", DistributeMembershipChange);
            }
            catch (Exception e)
            {
                Console.WriteLine("Exception [{0}] passing message to proxy", e.Message);
                return;
            }
        }

        /// <summary>
        /// Removes a user from both tables, closes the user's proxy and tells the remaining
        /// subscribers that the user has left.
        /// </summary>
        /// <remarks>
        /// An unknown (or null) user name is logged and ignored. All failures are logged to
        /// the console and swallowed; the method does not throw.
        /// </remarks>
        /// <param name="arg_Username">Name the user was registered under.</param>
        [OperationBehavior(TransactionScopeRequired = true, TransactionAutoComplete = true)]
        public void RemoveUser(string arg_Username)
        {
            Console.WriteLine("Got Removal Message from [{0}]", arg_Username);
            try
            {
                GPH_QueuedMessageContract.ServiceReference1.GPH_InboundMessageHandlerClient proxy = null;
                if (arg_Username != null)
                {
                    m_NotifyList.TryGetValue(arg_Username, out proxy);
                }
                if (proxy == null)
                {
                    Console.WriteLine("User [{0}] is not registered; nothing to remove", arg_Username);
                    return;
                }

                // Remove the username and proxy from the lookup table and subscriber list.
                m_NotifyList.Remove(arg_Username);
                m_SubscriberList.Remove(arg_Username);

                // A failed Close must not stop the others from being told the user has left.
                CloseProxy(proxy);

                MessageDistributor(arg_Username, m_SubscriberList, " has left the conversation", DistributeMembershipChange);
            }
            catch (Exception e)
            {
                Console.WriteLine("Exception [{0}] passing message to proxy", e.Message);
                return;
            }
        }

        /// <summary>Logs an incoming user message and relays it to the listed recipients.</summary>
        /// <param name="arg_userName">Name of the sending user.</param>
        /// <param name="arg_addressList">Registered user names to deliver to.</param>
        /// <param name="arg_userMessage">Text of the message.</param>
        [OperationBehavior(TransactionScopeRequired = true, TransactionAutoComplete = true)]
        public void ReceiveMessage(string arg_userName, List<string> arg_addressList, string arg_userMessage)
        {
            Console.WriteLine("Message [{0}] from [{1}]", arg_userMessage, arg_userName);
            MessageDistributor(arg_userName, arg_addressList, arg_userMessage, DistributeUserMessage);
        }

        /// <summary>
        /// Sends a message to each recipient's proxy, one TransactionScope per recipient.
        /// </summary>
        /// <param name="arg_userName">Name of the user the message is attributed to.</param>
        /// <param name="arg_addressList">
        /// Registered user names to deliver to. A null list is logged and ignored; entries
        /// with no registered proxy are logged and skipped.
        /// </param>
        /// <param name="arg_userMessage">Text of the message.</param>
        /// <param name="arg_ind">
        /// 0 - message from user only;
        /// 1 - additionally send each recipient the current subscriber list first.
        /// </param>
        public void MessageDistributor(string arg_userName, List<string> arg_addressList, string arg_userMessage, int arg_ind)
        {
            Console.WriteLine("Message [{0}] from [{1}]", arg_userMessage, arg_userName);

            if (arg_addressList == null)
            {
                Console.WriteLine("No address list supplied; nothing to distribute");
                return;
            }

            // The subscriber list does not change inside the loop, so build the array once.
            string[] subscribers = null;
            if (arg_ind == DistributeMembershipChange)
            {
                subscribers = m_SubscriberList.ToArray();
            }

            foreach (string tmpAddr in arg_addressList)
            {
                // Look the proxy up before opening a scope, so that an unknown recipient is
                // simply skipped instead of leaving a scope uncompleted and ending delivery
                // for everyone after it.
                GPH_QueuedMessageContract.ServiceReference1.GPH_InboundMessageHandlerClient proxy = null;
                if (tmpAddr != null)
                {
                    m_NotifyList.TryGetValue(tmpAddr, out proxy);
                }
                if (proxy == null)
                {
                    Console.WriteLine("No proxy registered for [{0}]; skipping", tmpAddr);
                    continue;
                }

                using (TransactionScope scope = new TransactionScope())
                {
                    try
                    {
                        if (arg_ind == DistributeMembershipChange)
                        {
                            proxy.OnRegistration(subscribers); //Send back a list of current subscribers
                        }
                        Console.WriteLine("Dispatching message to: [{0}]", proxy.Endpoint.Address);
                        proxy.OnInboundMessage(arg_userName, arg_userMessage);
                        scope.Complete();
                    }
                    catch (Exception e)
                    {
                        Console.WriteLine("Exception [{0}] passing message to proxy", e.Message);
                        // Unchanged from the original: a failed send ends the whole distribution.
                        // NOTE(review): the scope is left uncompleted here; when this runs
                        // inside an operation's ambient transaction that presumably dooms it,
                        // which would make carrying on pointless - confirm.
                        return;
                    }
                } //Messages committed to queue here
            }
        }

        /// <summary>
        /// This apparently pointless operation is required to ensure that the
        /// PublishToDataContract class is exposed to the client. It does nothing.
        /// </summary>
        /// <param name="arg_publish_details">Ignored.</param>
        public void ExposeContract(PublishToDataContract arg_publish_details)
        {
        }

        // Reads the PublishToDataContract header from the current operation's incoming
        // message. Logs and returns false when there is no operation context, the header
        // cannot be read, or it carries no address.
        private static bool TryReadPublishDetails(out PublishToDataContract details)
        {
            details = null;

            OperationContext context = OperationContext.Current;
            if (context == null)
            {
                Console.WriteLine("No operation context available; cannot read the header");
                return false;
            }

            var headers = context.IncomingMessageHeaders;
            try
            {
                details = headers.GetHeader<PublishToDataContract>(
                    "PublishToDataContract", "NGPH_QueuedMessageContract");
            }
            catch (Exception e)
            {
                Console.WriteLine("Exception [{0}] reading the header", e.Message);
                // Only dump the first header if there is one; indexing an empty collection
                // would throw from inside this handler.
                if (headers.Count > 0)
                {
                    Console.WriteLine("Header 0: {0}", headers[0].ToString());
                }
                return false;
            }

            if (details == null || string.IsNullOrEmpty(details.PublishToAddress))
            {
                Console.WriteLine("Header carries no publish-to address");
                return false;
            }
            return true;
        }

        // Builds a proxy for the given reply address using a NetMsmqBinding with MSMQ
        // authentication and protection switched off. Logs and returns null if any stage fails.
        private static GPH_QueuedMessageContract.ServiceReference1.GPH_InboundMessageHandlerClient CreateReplyProxy(string publishToAddress)
        {
            NetMsmqBinding binding;
            try
            {
                binding = new NetMsmqBinding();
                binding.Security.Transport.MsmqAuthenticationMode = MsmqAuthenticationMode.None;
                binding.Security.Transport.MsmqProtectionLevel = System.Net.Security.ProtectionLevel.None;
            }
            catch (Exception e)
            {
                Console.WriteLine("Exception [{0}] creating the binding", e.Message);
                return null;
            }

            EndpointAddress address;
            try
            {
                address = new EndpointAddress(publishToAddress);
            }
            catch (Exception e)
            {
                Console.WriteLine("Exception [{0}] creating the endpoint", e.Message);
                return null;
            }

            try
            {
                //Use an overload to supply the new response address.
                return new GPH_QueuedMessageContract.ServiceReference1.GPH_InboundMessageHandlerClient(binding, address);
            }
            catch (Exception e)
            {
                Console.WriteLine("Exception [{0}] creating the proxy", e.Message);
                return null;
            }
        }

        // Closes a proxy, falling back to Abort when Close throws. Never throws itself
        // unless Abort does.
        private static void CloseProxy(GPH_QueuedMessageContract.ServiceReference1.GPH_InboundMessageHandlerClient proxy)
        {
            try
            {
                proxy.Close();
            }
            catch (Exception e)
            {
                Console.WriteLine("Exception [{0}] closing proxy", e.Message);
                // NOTE(review): assumes the generated client derives from ClientBase<T>,
                // which provides Abort() - confirm against the service reference.
                proxy.Abort();
            }
        }
    }

}

By viewing downloads associated with this article you agree to the Terms of Service and the article's licence.

If a file you wish to view isn't highlighted, and is a text file (not binary), please let us know and we'll add colourisation support for it.

License

This article, along with any associated source code and files, is licensed under The Code Project Open License (CPOL)


Written By
Software Developer
Ireland Ireland
My first program was written in Basic on a Sinclair Spectrum ZX 16K in the summer of '85. Having studied Computer Systems I attempted to break into the world of C but took a wrong turn and got immersed in COBOL!

I looked at C again in 1994 but didn't follow up on it. In 2001 I introduced myself to Visual C++ 6.0 courtesy of Ivor Horton's book, but found the going difficult. I dipped my toe in the .NET water in '05, but the first example I tried in VC++ 2005 Express didn't work and, allied with the absence of MFC in the Express package, I parked that up.

Along the way my career got shunted into software testing

A personal machine change forced me to migrate to VS2008 in 2008. The new edition of Ivor Horton's book for VC++ in VS2008 reintroduced me to .NET and I got curious, whereupon I went out and acquired Stephen Fraser's "Pro Visual C++/CLI and
the .NET 3.5 Platform". I was hooked!

After 20 years I think I finally found my destination.

But it would take a further 8 years of exile before I was reappointed to a developer role. In that time I migrated to C# and used Selenium WebDriver (courtesy of Arun Motoori's Selenium By Arun) as the catalyst to finally grab the opportunity.

Comments and Discussions