Here WCF callback is used
to notify the subscribers about an event. Every subscriber implements a
callback interface. When the subscriber sends request to subscribe to Subscriber Service ,its callback reference is kept
topic wise so that when a later time an event comes of that topic , server can
use that callback reference to notify the corresponding subscriber. When a
publisher sends an event of a particular topic by invoking a method of
publisher service, Publisher service takes the list of callback references
of that topic and notify the subscribers who have interest on that topic by
invoking a method named purplish of callback references.
Filter class has the
following responsibilities and is used by both Publish Service and Subscriber
Service.
class Filter
{
static Dictionary<string, List<IPublishing>> _subscribersList = new Dictionary<string, List<IPublishing>>();
static public Dictionary<string, List<IPublishing>> SubscribersList
{
get {
lock (typeof(Filter))
{
return _subscribersList;
}
}
}
static public List<IPublishing> GetSubscribers(String topicName)
{
lock (typeof(Filter))
{
if (SubscribersList.ContainsKey(topicName))
{
return SubscribersList[topicName];
}
else
return null;
}
}
static public void AddSubscriber(String topicName, IPublishing subscriberCallbackReference)
{
lock (typeof(Filter))
{
if (SubscribersList.ContainsKey(topicName))
{
if (!SubscribersList[topicName].Contains(subscriberCallbackReference))
{
SubscribersList[topicName].Add(subscriberCallbackReference);
}
}
else
{
List<IPublishing> newSubscribersList = new List<IPublishing>();
newSubscribersList.Add(subscriberCallbackReference);
SubscribersList.Add(topicName, newSubscribersList);
}
}
}
static public void RemoveSubscriber(String topicName, IPublishing subscriberCallbackReference)
{
lock (typeof(Filter))
{
if (SubscribersList.ContainsKey(topicName))
{
if (SubscribersList[topicName].Contains(subscriberCallbackReference))
{
SubscribersList[topicName].Remove(subscriberCallbackReference);
}
}
}
}
}<o:p> </o:p>
The dictionary _subscribersList
keeps the subscriber
topic wise. Here key is topic name (String type).Value is a List of
subscriber’s callback references (List<IPublishing>).
The GetSubscribers(String topicName) method returns a list of callback references based on the Topic Name. Callback references hold necessary mechanism
to notify subscribers. This method first checks whether the topic name exists
in the dictionary. If exist then it returns the corresponding list of callback references
of that topic. If not then returns null.
The method named AddSubscriber
adds the callback Reference of a subscriber to a list topic wise. It has two parameters:
First one is topic name and second one is callback reference. This method
first checks whether there is a list for that topic. If exist then adds this callback reference to that list. If not then makes a new list for this
topic and adds the callback reference to the new list.
The method named RemoveSubscriber(String topicName, IPublishing
subscriberCallbackReference) removes
the callback Reference of a subscriber from the corresponding topic wise list.
It has tow parameters: First one is topic name and second one is callback reference.
It first checks whether there is a list of this topic. If not does nothing. If
exist then checks whether this callback reference is exist in the list. If exist
removes it otherwise does nothing.
Step 2: Implementation of subscribe
Service.
Subscription service
implements the ISubscription interface. This service methods are called by
subscriber application to make it subscribed and unsubscribed. During the subscription
of a subscriber, Subscribe Service keeps the callback reference of the
subscriber so that later time publisher can notify the Subscriber using
the callback reference.
Now the question is how
we can get callback Reference of a subscriber?
When a subscriber invoke
a method call in the subscription service, in that call using the following
line Subscriber Service can
get callback reference:
<o:p>IPublishing subscriber = OperationContext.Current.GetCallbackChannel<IPublishing>(); </o:p>
We get the call back reference
using the above call as every subscriber implements the call back Interface.
<o:p> [ServiceBehavior(InstanceContextMode = InstanceContextMode.Single)]
class Subscription : ISubscription
{
#region ISubscription Members
public void Subscribe(string topicName)
{
IPublishing subscriber = OperationContext.Current.GetCallbackChannel<IPublishing>();
Filter.AddSubscriber(topicName, subscriber);
}
public void UnSubscribe(string topicName)
{
IPublishing subscriber = OperationContext.Current.GetCallbackChannel<IPublishing>();
Filter.RemoveSubscriber(topicName, subscriber);
}
#endregion
}</o:p>
When the method
Subscribe(string topicName) is called from the Subscriber application , it
first gets the callback reference by calling method GetCallbackChannel and
then passes the topic name and callback reference to filter class to be
registered.
When the method
UnSubscribe(string topicName) is called from the Subscriber application , it
first gets the callback reference by calling method GetCallbackChannel and
then calls the method RemoveSubscriber of filter class with the parameters:
topic name, callback reference to be Unsubscribed.
Step 3: Implementation of
Publisher Service.
Publisher Service
implements IPublishing interface. When a publisher invokes the method named Publish(Message e, string
topicName) of Publisher Service ,then
the publish method calls the method Filter.GetSubscribers(topicName) to get list
of callback references of subscribers for this topic. Then publish method gets the method information of "Publish”. Then for each call back references
fire the method "Publish" using the following line of code with
appropriate parameters.
<o:p> publishMethodInfo.Invoke(subscriber, new object[] { e, topicName });
</o:p>
This call notifies the
subscribers about the event who are interested in the topic.
[ServiceBehavior(InstanceContextMode = InstanceContextMode.PerCall)]
class Publishing : IPublishing
{
#region IPublishing Members
public void Publish(Message e, string topicName)
{
List<IPublishing> subscribers = Filter.GetSubscribers(topicName);
Type type = typeof(IPublishing);
MethodInfo publishMethodInfo = type.GetMethod("Publish");
foreach (IPublishing subscriber in subscribers)
{
try
{
publishMethodInfo.Invoke(subscriber, new object[] { e, topicName });
}
catch
{
}
}
}
#endregion
} Step 4: How callback is
implemented in subscriber Application.
WCF has the mechanism to
call back a client. To implement callback in WCF , WCF service exposes a
callback contract that all client applications must implement and requires
communication over duplex channel. You also have to use Duplex channelFactory
to construct the communication channel and also need to provide an Reference
Context. netTcpBinding,wsdualHttpBinding and namedPipeBinding support callback
as they are bidirectional.
1. To do so, at first you
have to expose a call back contract. The callback contract is in the following.
<o:p>[ServiceContract]
public interface IPublishing
{
[OperationContract(IsOneWay = true)]
void Publish(Message e, string topicName);
} </o:p>
2. Then specify the
callback contract in service contract. A service contract can have at most one callback contract.
[ServiceContract(CallbackContract = typeof(IPublishing))]
public interface ISubscription
{
[OperationContract]
void Subscribe(string topicName);
[OperationContract]
void UnSubscribe(string topicName);
}
3. Then implement the
callback contract in Subscriber application as Clients of duplex services must
implement a callback contract class. In this project it is done using the
following code:
public partial class Subscriber : Form, IPublishing
{
.........(Code)
#region IMyEvents Members
public void Publish(Message e, String topicName)
{
if (e != null)
{
int itemNum = (lstEvents.Items.Count < 1) ? 0 : lstEvents.Items.Count;
lstEvents.Items.Add(itemNum.ToString());
lstEvents.Items[itemNum].SubItems.AddRange(new string[] { e.TopicName.ToString(), e.EventData });
_eventCount += 1;
txtAstaEventCount.Text = _eventCount.ToString();
}
}
#endregion
}
4. Create an instance of
the Subscriber (callback contract implementation class) and use it to create
the System.ServiceModel.InstanceContext object that you will pass to DuplexChannelFactory class as
constructor parameter.
public void MakeProxy(string EndpoindAddress, object callbackinstance)
{
NetTcpBinding netTcpbinding = new NetTcpBinding(SecurityMode.None);
EndpointAddress endpointAddress = new EndpointAddress(EndpoindAddress);
InstanceContext context = new InstanceContext(callbackinstance);
DuplexChannelFactory<ISubscription> channelFactory = new DuplexChannelFactory<ISubscription>(
new InstanceContext(this),netTcpbinding, endpointAddress);
_proxy = channelFactory.CreateChannel();
}
5. Then Invoke
operations of Service contract (Subscription Service) and handle callback operations
in client code.
Step
5: How publisher is
implemented.
It constructs the communication
channel with publishing service using ChannelFactory. The following code is
used for this purpose:
private void CreateProxy()
{
string endpointAddressInString = ConfigurationManager.AppSettings["EndpointAddress"];
EndpointAddress endpointAddress = new EndpointAddress(endpointAddressInString);
NetTcpBinding netTcpBinding = new NetTcpBinding();
_proxy = ChannelFactory<IPublishing>.CreateChannel(netTcpBinding, endpointAddress);
}
Then using this proxy reference, it
can invoke the publish method of publishing service to send an event using the
following code:
_proxy.Publish(alertData, topicName);<o:p> </o:p>
Hosting of publisher and subscriber service
The following lines are used to host
Publishing Service. In this endpoint, address is net.tcp://localhost:7001/Pub .Binding is netTcpBinding and contract is IPublishing. If
you would like to add multiple protocols support then you have to add
multiple Endpoints using AddServiceEndpoint method.
private void HostPublishService()
{
_publishServiceHost = new ServiceHost(typeof(Publishing));
NetTcpBinding tcpBindingpublish = new NetTcpBinding();
_publishServiceHost.AddServiceEndpoint(typeof(IPublishing), tcpBindingpublish,
"net.tcp://localhost:7001/Pub");
_publishServiceHost.Open();
}<o:p>
</o:p>
The following three lines are used
to host Subscription Service. In this endpoint address is net.tcp://localhost:7002/Sub
.Binding is netTcpBinding and contract is IPublishing. If you would like to add
multiple protocols support then you have to add multiple Endpoints using
AddServiceEndpoint method.
private void HostSubscriptionService()
{
_subscribeServiceHost = new ServiceHost(typeof(Subscription));
NetTcpBinding tcpBinding = new NetTcpBinding(SecurityMode.None);
_subscribeServiceHost.AddServiceEndpoint(typeof(ISubscription), tcpBinding,
"net.tcp://localhost:7002/Sub");
_subscribeServiceHost.Open();
}
Why is Publish method
one way?
Publish method has no
returned values and publisher does not need the acknowledgement of this
method invocation.
Why is Channel
factory used to construct communication
channel here?
If we use proxy class
for publisher and subscriber, then when we will change data contract, Service contract,
callback contract we need to regenerate the proxy for publisher and subscriber.
If we use channel factory to construct the channel we do not need to
change channel factory code when there is a change in data contract, service contract and
callback contract. Here to use Channel Factory
to construct channel between server and publisher/subscriber, all the
contracts are kept in a separate DLL that is shared by server, publisher and
Subscribe.
Socket Implementation VS WCF Implementation. ·
- Socket based implementation sends comma separated plain text message which is not verbose whereas WCF implementation send SOAP message which is verbose.
- Socket Based implementation is not based on any standard. To achieve interoperability, proprietary protocol is not a good choice whereas SOAP is a standard.
- Socket based implementation does not need WCF runtime so it can be used for embedded programming but WCF implementation does not.
- WCF Implementation is in object oriented fashion but Socket implementation is not.
- WCF implementation can support multiple protocols easily just adding extra endpoint. But in socket implementation if you want to add multiple protocol support you have to do Considerable works like you have to write different implementation for each protocol.
- To give security feature in socket based implementation you have to do lot of works and it will not be based on standard. But in WCF you can give message security easily based on standard.
- Socket based implementation is fast and consume less traffic than WCF implementation.
Sample Code
Here, a
project has been attached which shows Topic based publish/subscribe design
pattern implementation in c# (Using WCF Callback).
Conclusion
Thanks
for reading this write up. I hope that this article will be helpful for some
people. If you guys have any question, I would love to answer. I always
appreciate comments.
History
Initial
release – 22/03/09
References
Publish/subscribe - Wikipedia, the free
encyclopedia
Publish/Subscribe (MSDN)
[Baldoni03]
Baldoni, R.; M. Contenti, and A. Virgillito. "The Evolution of Publish/Subscribe Communication Systems." Future Directions of Distributed Computing. Springer Verlag LNCS Vol. 2584,
2003.