
Topic-based publish/subscribe design pattern implementation in C# - Part II (Using WCF)

Razan Paul (Raju), 23 Mar 2009
An implementation of the topic-based publish/subscribe design pattern using WCF callbacks.

Introduction

This is Part 2 in a series of articles on implementing the topic-based publish/subscribe design pattern in C#. The series explains the general idea behind the pattern and then implements the same sample program with two approaches: socket programming and WCF.

Topic-based publish/subscribe design pattern implementation in C# - Part I (Using Socket programming)

Topic-based publish/subscribe design pattern implementation in C# - Part II (Using WCF)

Whatever technology (sockets, Remoting, or WCF) we use to implement the publish/subscribe design pattern, the end result is almost the same. For this reason, each part in this series first discusses the general ideas behind implementing the pattern and then shows the implementation with one specific technology. Both approaches implement the same sample application, so you can compare them and pick the one that best fits your requirements. The advantages and disadvantages of each approach are also discussed in each part.

Can You Read Each Part Independently?

Yes. Each part can be read independently; to make that possible, the necessary information is repeated in each part.

Background

About a year ago I wrote an article on a non-topic-based publish/subscribe implementation in C#. After that I got a few requests for a topic-based version, and I promised to write one when I had some free time. A few days ago I had some days off and started to change the non-topic-based implementation into a topic-based one. Looking at my code and article after a year, however, the code seemed unnecessarily lengthy and the article not informative enough, so I started this two-part series to improve on that work. I have written the series as a learning exercise and I welcome comments on the write-up. Please let me know if you have any suggestions.

What is the Topic-based Publish/Subscribe Design Pattern?

In a topic-based publish/subscribe pattern, sender applications tag each message with the name of a topic instead of referencing specific receivers. The messaging system then sends the message to all applications that have asked to receive messages on that topic. Message senders need only concern themselves with creating the original message and can leave the task of servicing receivers to the messaging infrastructure.

In this pattern, the publisher and subscriber can communicate only via messages. The Publish-Subscribe Pattern solves the tight coupling problem. It is a very loosely coupled architecture, in which senders do not even know who their subscribers are. 

pubsub.png 

Fig: Topic-based Publish Subscribe paradigm.

There are three types of software applications in the above diagram: Message Publisher, Message Subscriber, and Pub/Sub Server. A message publisher sends messages to the server without any knowledge of the message subscribers, and a message subscriber receives only the messages of the topics for which it has registered with the server. For example, say we have three topics (message types): a, b, and c. Only topic a is of interest to Subscriber 1, topics b and c are of interest to Subscriber 2, and Subscriber 3 wants notifications for topics a and c. The server will therefore notify Subscriber 1 for topic a, Subscriber 2 for topics b and c, and Subscriber 3 for topics a and c. The message publisher does not know who receives its messages, and the message subscribers know nothing about the message publisher.

Basic Idea about Implementation  

Requirement 1: Subscriber applications subscribe to one or more topics and receive only the messages that are of interest to them.

To implement this, we have to keep a record of which subscriber is interested in which topic and then use these records to decide where to relay an event of a particular topic. So we maintain the subscriber lists topic-wise: for every topic there is a list containing the addresses of the subscribers interested in that topic. When a publisher application sends an event of a particular topic to the pub/sub server, the server takes the subscriber list of that topic and sends the event to every address in the list. A dictionary is a good choice for keeping these lists: the key is the topic name and the value is the list of subscriber addresses.
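The topic-wise dictionary can be sketched in a few lines. The following is only an illustration, using the three-topic example from the previous section; the class name TopicTable, its methods, and the use of plain strings as subscriber "addresses" are assumptions made for this sketch and are not part of the attached project.

```csharp
using System;
using System.Collections.Generic;

// Hypothetical illustration: subscriber "addresses" are plain strings here.
public static class TopicTable
{
    // Key: topic name. Value: addresses of the subscribers interested in that topic.
    public static Dictionary<string, List<string>> Build()
    {
        return new Dictionary<string, List<string>>
        {
            { "a", new List<string> { "Subscriber 1", "Subscriber 3" } },
            { "b", new List<string> { "Subscriber 2" } },
            { "c", new List<string> { "Subscriber 2", "Subscriber 3" } }
        };
    }

    // Returns the addresses to which an event of the given topic must be relayed.
    public static List<string> Route(Dictionary<string, List<string>> table, string topicName)
    {
        List<string> subscribers;
        return table.TryGetValue(topicName, out subscribers)
            ? subscribers
            : new List<string>(); // nobody has subscribed to this topic
    }
}
```

Calling TopicTable.Route(TopicTable.Build(), "c") yields Subscriber 2 and Subscriber 3, which matches the example: an event of topic c reaches exactly the subscribers that registered for it.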

Requirement 2: Loose coupling between the publisher and the subscriber. The publisher has no knowledge of the subscribers, including how many there are or where they live, and likewise the subscriber has no knowledge of the publishers, including how many there are or where they live.

To implement this, direct communication between publishers and subscribers cannot be allowed. So we place a separate entity between them that keeps the topic-wise subscriber address lists, receives events from publishers, relays them to subscribers, and exposes methods that let subscribers subscribe topic-wise. Subscribers and publishers know only this separate entity, which is called the Publish/Subscribe Service or Server. Its functionality is divided between a Publish Service and a Subscribe Service: the Publish Service receives an event from a publisher and relays it to the subscribers, while the Subscribe Service exposes Subscribe/Unsubscribe operations for the subscribers. How do we select the subscribers that should receive an event of a particular topic? For this purpose an entity named Filter is implemented, which is used by both the Publish Service and the Subscribe Service.

implementation2.png 
Fig: Implementation diagram.

Functionality of Filter Unit:

  • Keeps the lists of subscribers topic-wise.
  • Returns the list of subscribers for a topic.
  • Exposes a method to add a new subscriber.
  • Exposes a method to remove a subscriber.

Functionality of Publish Service:

  • Receives events from applications (publishers) in order to notify subscribers.
  • Sends an event of a particular topic to the applications (subscribers) that have subscribed to that topic.

Functionality of Subscribe Service:

  • Exposes a method that an application (subscriber) can invoke remotely through the communication channel to subscribe to a topic.
  • Exposes a method that an application (subscriber) can invoke remotely through the communication channel to unsubscribe from a topic.

Functionality of Subscriber:

  • Registers itself for the events of one or more topics.
  • Receives an event when a publisher sends one.

Functionality of Publisher:

  • Sends events to the Publish Service for publishing.
 

Implementation of pub/sub in WCF:  

Here a WCF callback is used to notify the subscribers about an event. Every subscriber implements a callback interface. When a subscriber sends a subscribe request to the Subscribe Service, its callback reference is stored topic-wise so that, when an event of that topic arrives later, the server can use the reference to notify the subscriber. When a publisher sends an event of a particular topic by invoking a method of the Publish Service, the Publish Service takes the list of callback references for that topic and notifies the interested subscribers by invoking the Publish method on each callback reference.
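Before bringing WCF into the picture, the same flow can be tried out inside a single process. The sketch below is an in-process analogue, not the WCF implementation: IEventCallback, RecordingSubscriber, and InProcessBroker are hypothetical names, and an ordinary object reference stands in for the WCF callback channel.

```csharp
using System;
using System.Collections.Generic;

// Stand-in for the callback contract that every subscriber implements (hypothetical name).
public interface IEventCallback
{
    void Publish(string eventData, string topicName);
}

// A subscriber that simply records what it receives.
public class RecordingSubscriber : IEventCallback
{
    public readonly List<string> Received = new List<string>();

    public void Publish(string eventData, string topicName)
    {
        Received.Add(topicName + ":" + eventData);
    }
}

// Plays the role of the pub/sub server: stores callbacks per topic and invokes them.
public class InProcessBroker
{
    private readonly object _sync = new object();
    private readonly Dictionary<string, List<IEventCallback>> _callbacks =
        new Dictionary<string, List<IEventCallback>>();

    public void Subscribe(string topicName, IEventCallback callback)
    {
        lock (_sync)
        {
            List<IEventCallback> list;
            if (!_callbacks.TryGetValue(topicName, out list))
            {
                list = new List<IEventCallback>();
                _callbacks.Add(topicName, list);
            }
            if (!list.Contains(callback)) list.Add(callback);
        }
    }

    public void Unsubscribe(string topicName, IEventCallback callback)
    {
        lock (_sync)
        {
            List<IEventCallback> list;
            if (_callbacks.TryGetValue(topicName, out list)) list.Remove(callback);
        }
    }

    public void Publish(string eventData, string topicName)
    {
        IEventCallback[] targets;
        lock (_sync)
        {
            List<IEventCallback> list;
            targets = _callbacks.TryGetValue(topicName, out list)
                ? list.ToArray()
                : new IEventCallback[0];
        }
        // Invoke outside the lock so a slow subscriber does not block Subscribe/Unsubscribe.
        foreach (IEventCallback target in targets)
        {
            target.Publish(eventData, topicName);
        }
    }
}
```

A subscriber registered for topic a receives an event published on a and nothing published on b; after Unsubscribe it receives nothing further. In the WCF version described next, the stored reference is a callback channel rather than a local object, but the bookkeeping is the same.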

Steps of Implementation: 

Step 1: Making the filter class. 

The Filter class is used by both the Publish Service and the Subscribe Service and has the following responsibilities:

  • Keeps the lists of subscribers topic-wise.
  • Exposes a method to add a new subscriber.
  • Exposes a method to remove a subscriber.
  • Returns the list of subscribers for a topic.

using System;
using System.Collections.Generic;

// Keeps the topic-wise lists of subscriber callback references.
class Filter
    {
        // Private lock object: locking on typeof(Filter) would let unrelated code take the same lock.
        static readonly object _sync = new object();

        static readonly Dictionary<string, List<IPublishing>> _subscribersList = new Dictionary<string, List<IPublishing>>();

        static public Dictionary<string, List<IPublishing>> SubscribersList
        {
            get {
                lock (_sync)
                {
                    return _subscribersList;
                }
            }

        }

        // Returns the subscribers of a topic, or null if the topic is unknown.
        static public List<IPublishing> GetSubscribers(String topicName)
        {
            lock (_sync)
            {
                if (SubscribersList.ContainsKey(topicName))
                {
                    // Hand out a copy so the caller can iterate over it
                    // while other calls add or remove subscribers.
                    return new List<IPublishing>(SubscribersList[topicName]);
                }
                else
                    return null;
            }
        }

        // Registers a callback reference for a topic; duplicates are ignored.
        static public void AddSubscriber(String topicName, IPublishing subscriberCallbackReference)
        {
            lock (_sync)
            {
                if (SubscribersList.ContainsKey(topicName))
                {
                    if (!SubscribersList[topicName].Contains(subscriberCallbackReference))
                    {
                        SubscribersList[topicName].Add(subscriberCallbackReference);
                    }
                }
                else
                {
                    List<IPublishing> newSubscribersList = new List<IPublishing>();
                    newSubscribersList.Add(subscriberCallbackReference);
                    SubscribersList.Add(topicName, newSubscribersList);
                }
            }

        }

        // Removes a callback reference from a topic; does nothing if it is not registered.
        static public void RemoveSubscriber(String topicName, IPublishing subscriberCallbackReference)
        {
            lock (_sync)
            {
                if (SubscribersList.ContainsKey(topicName))
                {
                    if (SubscribersList[topicName].Contains(subscriberCallbackReference))
                    {
                        SubscribersList[topicName].Remove(subscriberCallbackReference);
                    }
                }
            }
        }

    }

The dictionary _subscribersList keeps the subscribers topic-wise. The key is the topic name (a string) and the value is the list of the subscribers' callback references (List<IPublishing>).

The GetSubscribers(String topicName) method returns the list of callback references for a topic name. The callback references hold the mechanism needed to notify the subscribers. The method first checks whether the topic name exists in the dictionary. If it does, the method returns the list of callback references for that topic; if not, it returns null.

The AddSubscriber method adds the callback reference of a subscriber to the list of a topic. It has two parameters: the topic name and the callback reference. The method first checks whether a list exists for that topic. If one exists, it adds the callback reference to that list unless it is already there; if not, it creates a new list for the topic and adds the callback reference to the new list.

The RemoveSubscriber(String topicName, IPublishing subscriberCallbackReference) method removes the callback reference of a subscriber from the list of the corresponding topic. It has two parameters: the topic name and the callback reference. It first checks whether a list exists for the topic; if not, it does nothing. If a list exists, it checks whether the callback reference is in the list and removes it if so; otherwise it does nothing.
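One point deserves care when a subscriber list is handed to the code that publishes. If the list being enumerated is the same instance that AddSubscriber and RemoveSubscriber modify, a subscription arriving in the middle of a publish makes the enumeration fail. The sketch below shows the hazard and the usual remedy of enumerating a copy; SnapshotDemo and its methods are hypothetical names, and strings stand in for callback references.

```csharp
using System;
using System.Collections.Generic;

public static class SnapshotDemo
{
    // Enumerates the live list while adding to it; List<T> detects this and throws.
    public static bool LiveEnumerationThrows()
    {
        List<string> subscribers = new List<string> { "s1", "s2" };
        try
        {
            foreach (string s in subscribers)
            {
                subscribers.Add("late-" + s); // simulates a Subscribe call arriving mid-publish
            }
            return false;
        }
        catch (InvalidOperationException)
        {
            return true;
        }
    }

    // Enumerates a copy instead; the live list can change freely in the meantime.
    public static int SnapshotEnumerationCount()
    {
        List<string> subscribers = new List<string> { "s1", "s2" };
        int notified = 0;
        foreach (string s in new List<string>(subscribers))
        {
            subscribers.Add("late-" + s);
            notified++;
        }
        return notified;
    }
}
```

LiveEnumerationThrows returns true, while SnapshotEnumerationCount notifies the two subscribers that were present when the copy was taken. The cost of the copy is one allocation per publish, which is usually acceptable for short subscriber lists.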

Step 2: Implementation of Subscribe Service.

The Subscription service implements the ISubscription interface. A subscriber application calls the methods of this service to subscribe and unsubscribe. When a subscriber subscribes, the Subscribe Service stores the subscriber's callback reference so that the Publish Service can later notify the subscriber through it.

Now the question is: how can we get the callback reference of a subscriber?

When a subscriber invokes a method of the Subscription service, the service can obtain the callback reference inside that call with the following line:

IPublishing subscriber = OperationContext.Current.GetCallbackChannel<IPublishing>();

This call returns the callback reference because every subscriber implements the callback interface.

using System.ServiceModel;

[ServiceBehavior(InstanceContextMode = InstanceContextMode.Single)]
class Subscription : ISubscription
{
    #region ISubscription Members

    public void Subscribe(string topicName)
    {
        // The callback channel identifies the subscriber that made this call.
        IPublishing subscriber = OperationContext.Current.GetCallbackChannel<IPublishing>();
        Filter.AddSubscriber(topicName, subscriber);
    }

    public void UnSubscribe(string topicName)
    {
        IPublishing subscriber = OperationContext.Current.GetCallbackChannel<IPublishing>();
        Filter.RemoveSubscriber(topicName, subscriber);
    }

    #endregion
}

When the subscriber application calls Subscribe(string topicName), the method first gets the callback reference by calling GetCallbackChannel and then passes the topic name and the callback reference to the Filter class to be registered.

When the subscriber application calls UnSubscribe(string topicName), the method first gets the callback reference by calling GetCallbackChannel and then calls the RemoveSubscriber method of the Filter class with the topic name and the callback reference to be unsubscribed.

Step 3: Implementation of Publish Service.

The Publishing service implements the IPublishing interface. When a publisher invokes the Publish(Message e, string topicName) method of the Publish Service, the method calls Filter.GetSubscribers(topicName) to get the list of subscriber callback references for this topic. It then obtains the method information of "Publish" and, for each callback reference, fires the "Publish" method with the appropriate parameters using the following line of code:

publishMethodInfo.Invoke(subscriber, new object[] { e, topicName });

This call notifies the subscribers that are interested in the topic about the event.

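using System;
using System.Collections.Generic;
using System.Reflection;
using System.ServiceModel;

[ServiceBehavior(InstanceContextMode = InstanceContextMode.PerCall)]
class Publishing : IPublishing
{
    #region IPublishing Members
    public void Publish(Message e, string topicName)
    {
        List<IPublishing> subscribers = Filter.GetSubscribers(topicName);

        // GetSubscribers returns null when nobody has subscribed to the topic.
        if (subscribers == null)
            return;

        Type type = typeof(IPublishing);
        MethodInfo publishMethodInfo = type.GetMethod("Publish");
        foreach (IPublishing subscriber in subscribers)
        {
            try
            {
                publishMethodInfo.Invoke(subscriber, new object[] { e, topicName });
            }
            catch
            {
                // A failed callback (for example, a disconnected subscriber) is ignored
                // so that the remaining subscribers are still notified.
            }
        }
    }
    #endregion
}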
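The Publish Service invokes the callback through reflection. Because each callback reference is already typed as the callback interface, a direct call reaches the same method; the observable difference in plain C# is how an exception surfaces. The sketch below illustrates this outside WCF. INotify, FaultySubscriber, and InvokeDemo are hypothetical names used only for this illustration.

```csharp
using System;
using System.Reflection;

// Hypothetical stand-in for the callback contract.
public interface INotify
{
    void Publish(string eventData, string topicName);
}

// A subscriber whose callback always fails.
public class FaultySubscriber : INotify
{
    public void Publish(string eventData, string topicName)
    {
        throw new InvalidOperationException("subscriber is gone");
    }
}

public static class InvokeDemo
{
    // Direct interface call: the subscriber's own exception surfaces.
    public static string DirectCall(INotify subscriber)
    {
        try
        {
            subscriber.Publish("data", "a");
            return "ok";
        }
        catch (Exception ex)
        {
            return ex.GetType().Name;
        }
    }

    // Reflective call: the same exception arrives wrapped in TargetInvocationException.
    public static string ReflectiveCall(INotify subscriber)
    {
        MethodInfo publish = typeof(INotify).GetMethod("Publish");
        try
        {
            publish.Invoke(subscriber, new object[] { "data", "a" });
            return "ok";
        }
        catch (TargetInvocationException ex)
        {
            return "wrapped " + ex.InnerException.GetType().Name;
        }
    }
}
```

With a FaultySubscriber, DirectCall reports InvalidOperationException and ReflectiveCall reports the same exception wrapped. This matters only if the catch block is later changed to react to specific exception types, for example to remove a subscriber that can no longer be reached.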

Step 4: How the callback is implemented in the Subscriber application.

WCF has a mechanism to call back a client. To implement a callback, the WCF service exposes a callback contract that all client applications must implement, and the communication must go over a duplex channel. You also have to use DuplexChannelFactory to construct the communication channel and provide an InstanceContext. NetTcpBinding, WSDualHttpBinding, and NetNamedPipeBinding support callbacks because they are bidirectional.

1. First, expose a callback contract. The callback contract is the following:

[ServiceContract]
public interface IPublishing
{
    [OperationContract(IsOneWay = true)]
    void Publish(Message e, string topicName);
}

2. Then specify the callback contract in the service contract. A service contract can have at most one callback contract.

[ServiceContract(CallbackContract = typeof(IPublishing))]
public interface ISubscription
{
    [OperationContract]
    void Subscribe(string topicName);

    [OperationContract]
    void UnSubscribe(string topicName);
}

3. Then implement the callback contract in the Subscriber application, because clients of duplex services must provide a class that implements the callback contract. In this project it is done with the following code:

public partial class Subscriber : Form, IPublishing
    {
       .........(Code)

        #region IPublishing Members

        // Called by the server through the callback channel when an event arrives.
        public void Publish(Message e, String topicName)
        {
            if (e != null)
            {
                // Add a new row whose first column is the running item number.
                int itemNum = lstEvents.Items.Count;
                lstEvents.Items.Add(itemNum.ToString());
                lstEvents.Items[itemNum].SubItems.AddRange(new string[] { e.TopicName.ToString(), e.EventData });
                _eventCount += 1;
                txtAstaEventCount.Text = _eventCount.ToString();
            }
        }

        #endregion
    }

4. Create an instance of the Subscriber (the class that implements the callback contract) and use it to create the System.ServiceModel.InstanceContext object that you pass to the DuplexChannelFactory constructor.

public void MakeProxy(string endpointAddressInString, object callbackInstance)
        {
            NetTcpBinding netTcpBinding = new NetTcpBinding(SecurityMode.None);
            EndpointAddress endpointAddress = new EndpointAddress(endpointAddressInString);
            // Wrap the object that implements the callback contract.
            InstanceContext context = new InstanceContext(callbackInstance);

            DuplexChannelFactory<ISubscription> channelFactory = new DuplexChannelFactory<ISubscription>(
                context, netTcpBinding, endpointAddress);
            _proxy = channelFactory.CreateChannel();
        }

5. Then invoke the operations of the service contract (Subscription service) and handle the callback operations in the client code.
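The contracts in this step pass a Message object whose definition is not listed in the article (it is the project's own event type, not the System.ServiceModel.Channels.Message class of WCF). A possible shape is sketched below as an assumption: the member names TopicName and EventData are taken from the subscriber code above, while their types and the data-contract attributes are guesses. The helper MessageRoundTrip is hypothetical and only checks that such a type survives data-contract serialization.

```csharp
using System;
using System.IO;
using System.Runtime.Serialization;

// Assumed shape of the event payload; member names come from the subscriber
// code (e.TopicName, e.EventData), their types are guesses.
[DataContract]
public class Message
{
    [DataMember]
    public string TopicName { get; set; }

    [DataMember]
    public string EventData { get; set; }
}

public static class MessageRoundTrip
{
    // Serializes and deserializes a Message with the data-contract serializer.
    public static Message Copy(Message original)
    {
        DataContractSerializer serializer = new DataContractSerializer(typeof(Message));
        using (MemoryStream stream = new MemoryStream())
        {
            serializer.WriteObject(stream, original);
            stream.Position = 0;
            return (Message)serializer.ReadObject(stream);
        }
    }
}
```

Only members marked with DataMember travel with the message, so any field the subscriber needs to display has to carry that attribute.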

Step 5: How the publisher is implemented.

The publisher constructs the communication channel to the Publishing service using ChannelFactory. The following code is used for this purpose:

   private void CreateProxy()
        {
            string endpointAddressInString = ConfigurationManager.AppSettings["EndpointAddress"];
            EndpointAddress endpointAddress = new EndpointAddress(endpointAddressInString);
            NetTcpBinding netTcpBinding = new NetTcpBinding();
            _proxy = ChannelFactory<IPublishing>.CreateChannel(netTcpBinding, endpointAddress);
        } 

Using this proxy reference, the publisher can then invoke the Publish method of the Publishing service to send an event:

_proxy.Publish(alertData, topicName);

Hosting of Publish and Subscribe services

The following lines host the Publishing service. The endpoint address is net.tcp://localhost:7001/Pub, the binding is NetTcpBinding, and the contract is IPublishing. To support multiple protocols, add multiple endpoints using the AddServiceEndpoint method.

private void HostPublishService()
        {
            _publishServiceHost = new ServiceHost(typeof(Publishing));
            NetTcpBinding tcpBindingpublish = new NetTcpBinding();
            _publishServiceHost.AddServiceEndpoint(typeof(IPublishing), tcpBindingpublish,
                                    "net.tcp://localhost:7001/Pub");
            _publishServiceHost.Open();
        }

The following lines host the Subscription service. The endpoint address is net.tcp://localhost:7002/Sub, the binding is NetTcpBinding, and the contract is ISubscription. As with the Publishing service, additional protocols can be supported by adding more endpoints with AddServiceEndpoint.

private void HostSubscriptionService()
        {
            _subscribeServiceHost = new ServiceHost(typeof(Subscription));
            NetTcpBinding tcpBinding = new NetTcpBinding(SecurityMode.None);
            _subscribeServiceHost.AddServiceEndpoint(typeof(ISubscription), tcpBinding,
                                "net.tcp://localhost:7002/Sub");
            _subscribeServiceHost.Open();
        }

Why is the Publish method one-way?

The Publish method returns no value, and the publisher does not need an acknowledgement of the invocation.

Why is a channel factory used to construct the communication channel here?

If we used generated proxy classes for the publisher and subscriber, we would need to regenerate them whenever the data contract, service contract, or callback contract changes. If we construct the channel with a channel factory, the channel-factory code does not need to change when those contracts change. To make this possible, all the contracts are kept in a separate DLL that is shared by the server, the publisher, and the subscriber.

Socket Implementation vs. WCF Implementation

  • The socket-based implementation sends compact comma-separated plain-text messages, whereas the WCF implementation sends SOAP messages, which are more verbose.
  • The socket-based implementation is not based on any standard. A proprietary protocol is not a good choice for interoperability, whereas SOAP is a standard.
  • The socket-based implementation does not need the WCF runtime, so it can be used for embedded programming; the WCF implementation cannot.
  • The WCF implementation is object-oriented; the socket implementation is not.
  • The WCF implementation can support multiple protocols easily by adding extra endpoints. In the socket implementation, supporting multiple protocols takes considerable work, because you have to write a different implementation for each protocol.
  • Adding security to the socket-based implementation takes a lot of work and will not be based on a standard. In WCF you can add standards-based message security easily.
  • The socket-based implementation is faster and generates less traffic than the WCF implementation.

Sample Code 

A project is attached that shows the topic-based publish/subscribe design pattern implementation in C# (using WCF callbacks).

Conclusion

Thanks for reading this write-up. I hope this article will be helpful to some people. If you have any questions, I would love to answer them. I always appreciate comments.

History

Initial release – 22/03/09

 

License

This article, along with any associated source code and files, is licensed under The Code Project Open License (CPOL)


About the Author

Razan Paul (Raju)
Software Developer (Senior) CP
Australia
I am an Independent Contractor in Brisbane, Australia. For me, programming is a passion first, a hobby second, and a career third.
 
My Blog: http://weblogs.asp.net/razan/
 

 



Article Copyright 2009 by Razan Paul (Raju)