Click here to Skip to main content
Click here to Skip to main content

WS-Eventing for WCF (Indigo)

By , 4 Jun 2007
Rate this:
Please Sign up or sign in to vote.

Contents

Note: This article was written by .NET Framework 3.0 (RTM)

Introduction

Event-driven async distributed architecture requires a common interaction pattern for transmitting a workflow state across a service boundary. The Web Service Eventing (WS-Eventing) represents a SOAP-based specification for pub/sub communications patterns. These enable decoupling of an event source from its consumer, based on the event interest known as a subscription. The WS-Eventing specification was created by IBM, BEA Systems, TIBCO Software, Sun Microsystems, Computer Associates and Microsoft. On March 15, 2006 it was submitted to W3C (HP, IBM, Intel and Microsoft) for Web Service Standards. More details about that can be found here.

The Web Service Standards introduced a few new specifications on the top of the WS-Eventing such as WS-EventingNotification and WS-ResourceTransfer. These new WS-* specs can standardize an interaction of the WS-Eventing in the Service Oriented Architecture (SOA), e.g. using a WS-ResourceTransfer for subscription storage and WS-EventingNotification for delivering a notification message to the event sink. Note that these specs are new and therefore they are not publicly available yet. This article describes a design, implementation and usage of the WS-Eventing spec in the foundation level with a capability for future extensions using the above mentioned new specs.

Concept and design implementation

In order to better understand the concept of the Eventing notifications system and message exchange pattern, let's start with a simple notification case that is shown in the following figure:

In the above picture, I am assuming that some endpoints would like to be notified from the place where some situation (event) occurred. The place where the situation occurred is called in the WS-Eventing spec and is referred to as as Event Source, the initiator (producer) of the message. The message is called the Event Message because the message is related to some event situation. The Event Message is sent to the service that is waiting for this event situation, the Event Sink. The Event Sink is described by Address, Binding and Contract (ABC), known as ServiceEndpoint, and of course by a behavior. However, that's the local subjective of each Event Sink.

Now, the upcoming question would be, "How does the Event Source knows about the Event Sink and delivery mechanism?" The minimum requirement is to know a base address of the Event Sink. In order to obtain binding information of the Event Sink endpoint, we can use a WS-MEX (Web Service MetadataExchange) message pattern. In addition, the Event Source needs to know how to deliver an Event Message to the destination sink service. This delivery mechanism is called in the WS-Eventing as Delivery Mode.

Based on the above analysis, the Event Source needs some metadata for the message producer to properly generate an Event Message for each situation. This metadata related to the Event Sink interest is known as Subscriptions. Subscriptions, based on the event situation, can be organized by the event topic related to the application-specific Event Source, for example: weather, stock market, magazine, etc.

One more thing: each event situation represents some event value -- i.e. DataContract between the Event Sink and Event Source -- that can be used as a condition by Event Sink interest for delivering a notification message. This pattern can be defined in the Subscription using the Filter element. Note that the WS-Eventing specification does not constrain notifications and it depends on the Contract only. Basically, any SOAP-based message can be an Event Message. However, if the Event Sink requires some additional application-specific header blocks, the subscription will be held. This topic will be described later on this article.

Event source

The Event Source hides many WS-Eventing features. Let's look at them in detail. We know that the logical connectivity (interest) between the Event Sink and its Source is defined (abstracted) in the resource called Subscription. For a message exchange pattern with an Event Sink, the Event Source contains a message producer (service) that relays on one or more Event Topics. This producer (Notification Manager) encapsulated the Event Sink from the Event Source. The following picture shows its position in the Event Source:

Let's assume that somehow we stored our subscriptions in the Event Storage. This could be done administratively (manually) or programmatically using DAL layer or WS-Transfer service. In addition, the Notification Manager is able to query the Event Storage for Subscription based on the Event Topic. The Subscription is stored in the Storage as a resource with a resource descriptor. The following code snippet shows an example of the Subscription and its descriptor:

<ResourceDescriptor>
  <Name>Subscription for weather report</Name>
  <Key>uuid:770b3a90-27c8-419e-94ae-301313d52793</Key>
  <Topic>weather:report.storm</Topic>
  <Expires>2007-05-22T05:58:14.4838Z</Expires>
  <Created>2006-05-22T06:35:57.7556Z</Created>
  <Updated/>
</ResourceDescriptor>


<wse:Subscription>  
  <wse:Delivery wse:Mode=
    "http://schemas.xmlsoap.org/ws/2004/08/eventing/DeliveryModes/Push">
    <wse:NotifyTo>
      <a:Address>net.tcp://localhost:33333/OnStormWarning</Address>
      <a:ReferenceParameters xmlns="http://www.w3.org/2005/08/addressing">
        <MySubscription xmlns="urn:MyNamespace">1234567890</MySubscription>
      </a:ReferenceParameters>
    </wse:NotifyTo>
  </wse:Delivery>
  <wse:EndTo>
    <a:Address>net.tcp://localhost:44444/Admin</Address>
    <a:ReferenceParameters>
      <MySubscription xmlns="urn:MyNamespace">1234567890</MySubscription>
    </a:ReferenceParameters>
  </wse:EndTo>
  <wse:Expires>2007-05-22T05:58:14.4838368Z</wse:Expires>
  <wse:Filter xmlns:ow="http://www.example.org/oceanwatch">
     //ow:WindReport/ow:Speed <= 65
  </wse:Filter>
</wse:Subscription>

The above Subscription example contains major metadata, the delivery pattern knowledge base for the Notification Manager Service. It represents an Event Sink interest. When a situation in the Event Source has occurred, the specific event topic is delivered to the Notification Manager service based on the internal connectivity knowledge base. The process in the service starts to query all ResourceDescriptors in the Event Storage related to this event topic, for example: weather:report.storm. Walking through this Subscription collection, the service looks for the Filter. It will execute its expression on the situation value (a message payload). When the filtering is passed, the Delivery element starts processing based on the delivery mode.

In our example, the delivery mode is this, where the Event Message is forced to push it to the sink in the fire & forget (async) manner. The service dispatcher will initiate an output channel destination for NotifyTo EndpointAddress. Note that the NotifyTo's ReferenceParameters will be added in the message header and delivered to the sink without any modification (loosely coupled correlation info between the sink and subscription). The above Subscription contains additional information such as EndTo EndpointAddress, which is useful for delivering a notification message about the Event Source status (disable, shutdown, etc.). Also, it is possible to add additional information to Subscription, such as Subscription Policy, binding key, etc.

Note:

The Event Message is delivered to the Event Sink based on the ABC description. In the WS-Eventing for Web Service, the binding uses BasicHttpBinding (Soap 1.1). What about the other bindings, for instance tcp, msmq, namepipe, etc?. How does the Event Source knows about the binding of the specific Event Sink? Or how about the scalability, when the Event Source needs to deliver a notification message to a thousand sinks?

The WS-Eventing specification didn't specify any of these questions. Some of them will require additional information to be added to the subscription or the use of another WS-* spec to obtain it. For example, WS-MetadataExchange enables one to get the Event Sink's metadata for creating a proxy. Certainly, this part of the Notification Manager must necessarily be customized based on the application requirements. This article describes how to implement a basic Event Message push delivering.

Based on the above description, you could have a fundamental question. Where is the WS-Eventing spec pattern and can the Event Source send messages to the Event Sink based on Data and Operation Contracts? Well, that is the right question. WS-Eventing didn't specify anything about the Event Notification message, just a Push (fire&forget) delivery mode. It appears that the Notification Manager is doing router work based on the Subscription without the knowledge of the event situation. That's correct.

For example: The Event Sink can send a Subscription by e-mail (or FedEx), and then administrator will manually add (subscribe) it into the Event Storage, which will activate the Event Sink for its notification interest. This off-line subscribing process is used by many applications in private pub/sub notification systems. It is the first step for incremental development of WS-Eventing based on the 3 actors such as Event Source, Subscription/Notification Manager and Event Sink.

On-line subscribing (WS-Eventing)

Adding an actor into the Event Source for subscribing and managing a Subscription in Event Storage enables control of a Publish/Subscribe Notification on the fly. This service (Subscription Manager) can have private and/or standardized (by W3C organization) operation contracts knows as WS-Eventing. The following picture shows the actors in the WS-Eventing pattern:

The above picture added a Subscription Manager service into the Event Source. This service allows a service requestor called Subscriber to manage Subscription resources by WS-Eventing operations such as Subscribe, Unsubscribe, Renew and GetStatus. It looks like a WS-Transfer resource factory and operation with Create, Delete, Put and Get. The Subscription and the Notification managers are sharing the Event Storage with Subscription resources. They are a separate entity and they can be hosted on the same appDomain or across the network and using the WS-Transfer accessing the Event Storage.

Note that the WS-Eventing didn't specify an event topic for subscribing an interest by Subscriber. One way how to organize Pub/Sub Notification Systems is to initiate the Subscription/Notification managers based on the event topics and assign their ABC EndpointAddress. The other way is to use a custom private header (e.g. Topic) in the Subscriber interest. Of course, this header must be recognized by the Subscription manager.

Delivery modes

WS-Eventing outlines only the Push model for notifications. This delivery is based on the async pushing (routing) the notification message to the Event Sink in the fire&forget manner. The other customizable modes are:

  • batched, where the event source allows batching of multiple notifications into a single notification message. It will reduce the notification traffic between the event source and sink.
  • pull, where the event sink is responsible for polling the event source periodically and pulling notifications from the endpoint given by Event Source in the subscribe operation.
  • trap, where the event source is pushing the notification message to the event sink(s) using a UDP multicasting address.
  • wrapped, where the event source sends a unique wrapped notification message to its consumer - event sink.
  • pushWithAck, where the event source sends a notification message and waits for its acknowledgement.

As mentioned earlier, the Notification Manager pushes the event situation message known as Event Message to its consumer, based on the Subscriber interest (Subscription) in the transparent manner. This allows chaining the Notification Managers and grouping them based on the Event Sources, and of course by Subscriptions located in the different Event Storages. The other scenario is intercepting the Event Message in the special sink, as it is shown in the following figure:

In the above picture, the Storage service represents 3 actors such as Event Sink, Storage and Event Source for processing Batched and Pull Delivery modes. The Fire #1 message is accumulated in the Storage and then triggers an event for situation Fire #2 and delivers it as a single Event Message to the destination Event Sink. The number of the accumulated messages in the Storage can be configured based on the quantity, timestamps or duration.

The above picture also shows a pattern of the Pull Delivery mode. The Event Sink can periodically poll the Storage for pulling stored event messages Fire#1. Based on the Storage implementation, both types of Event Sinks can be worked concurrently. Note that this model is driven by distributed Subscriptions and "special" sink service(s) where a Local Event Source knows its own neighborhood only.

Finally, we come to the point where all major actors of the Publish/Subscribe Notification Systems have been explained. Now we can focus on the WS-Eventing spec and its implementation by MS Windows Communication Foundation (aka Indigo) Technology.

WS-Eventing

WS-Eventing is a WS-Addressing based specification for message exchange between the Event Sink and Event Source to register interest in certain events. The following points are the major highlights:

  • There are 3 actors: the Event Sink, Event Source and Subscription manager.
  • Event Source is responsible for controlling the information that Event Sinks are interested in.
  • The Event Sink subscribes an interest with the Event Source by operation contract, such as Subscribe.
  • Subscription is a resource that represents an interest of Event Sink.
  • Event Source is capable of sending a notification message -- called as SubscriptionEnd -- to Event Sink when Subscription is terminated, e.g. lifetime expires, out of service, etc.
  • Subscription manager is responsible for managing a Subscription's lifetime using operation contracts such as Unsubscribe, Renew and GetStatus.
  • Delivery mode is a transport pattern how Event Source sends a notification message to the Event Sinks. The default mode is push.

Some additional points based on this article:

  • Subscriber is a representative of the Event Sink that handles an interest in the Event Source.
  • Notification manager is a representative of the Event Source responsible for delivering a notification message to the Event Sink based on the Subscription (interest).
  • Event Storage is a resource driven service for storing Subscriptions.

Putting all of the above bullets together around the WS-* Service Bus and orchestrated based on the WS-Eventing pattern, we can get the logical model of the Publish/Subscribe Notification Systems shown in the following picture:

Screenshot - DrawingEventing5.jpg

This model can provide a distributed notification where the Event Storages, Sources, Sinks and Managers are logically connected via WS-Transfer and WS-Eventing patterns. In other words, some companies can only support storage systems (e.g. amazon S3 - Simple Storage Service) while some others can support the Subscription manager services and Notification manager services, etc. and when your application wants to be notified by some situation, these WS-Eventing services can be leased for your interest.

Let's describe how an application can subscribe an interest into the Event Source. What is the workflow process in the Event Source to generate the Notification to the Event Sink? I will use the following sequence diagram to demonstrate the case:

Let's assume that the Subscriber (Event Sink representative) knows an Endpoint Address (ABC) of the Event Source. Then:

  1. In order to receive the event situation from the Event Source, the Subscriber initiates a call and sends an interest (Subscription) request to the Event Source representative service using a WS-Eventing Subscribe operation. This action is known as Subscribe. The following is an example of the Subscribe request:
<s:Envelope 
    xmlns:s="http://www.w3.org/2003/05/soap-envelope" 
    xmlns:a="http://www.w3.org/2005/08/addressing"
    xmlns:wse="http://schemas.xmlsoap.org/ws/2004/08/eventing">
  <s:Header>
    <a:Action s:mustUnderstand="1">
      http://schemas.xmlsoap.org/ws/2004/08/eventing/Subscribe
    </a:Action>
    <h:SubscriptionTopic xmlns:h=
    "http://www.rkiss.net/schemas/sb/2004/08/servicebus">
      weather:report.storm
    </h:SubscriptionTopic>
    <a:MessageID>urn:uuid:b373f5d9-d6e9-471d-af73-28544290f146<
      /a:MessageID>
    <a:ReplyTo>
      <a:Address>http://www.w3.org/2005/08/addressing/anonymous<
      /a:Address>
    </a:ReplyTo>
    <a:To s:mustUnderstand="1"
      >net.tcp://localhost:11111/SubscriptionManager</a:To>
  </s:Header>
  <s:Body>
    <wse:Subscribe>
    <wse:EndTo>
      <a:Address>net.tcp://localhost:44444/Admin</a:Address>
      <a:ReferenceParameters>
        <MySubscription xmlns="urn:MyNamespace">1234567890<
          /MySubscription>
      </a:ReferenceParameters>
    </wse:EndTo>
    <wse:Delivery>
      <wse:NotifyTo>
        <a:Address>net.tcp://localhost:33333/OnStormWarning</a:Address>
        <a:ReferenceParameters>
          <MySubscription xmlns="urn:MyNamespace">1234567890<
            /MySubscription>
        </a:ReferenceParameters>
      </wse:NotifyTo>
    </wse:Delivery>
    <wse:Expires>PT5M</wse:Expires>
    <wse:Filter xmlns:ow="http://www.example.org/oceanwatch">
      //ow:WindReport/ow:Speed &gt;= 65
    </wse:Filter>
    </wse:Subscribe>
  </s:Body>
</s:Envelope>

The header block uses a WS-Addressing spec to describe a Subscribe request destination and where its response will reply to. This MEP fashion allows for delivering of additional application specific information, such as applicationId, etc. to the replyTo service. In our example, the response is back to the Subscriber.

The message payload describes an interest of the application for certain events. There are metadata for delivering an Event Message with an option to include some application specific data, leasing time of the notification service, interest of the situation value represented by the Filter element, and also where the fault message that terminates the Subscription will be delivered. As I mentioned earlier, WS-Eventing is designed on the top of WS-Addressing. Therefore any additional information to the destination address can be added in the ReferenceProperties. There are no limitations. It could be a small string or complex type that can be recognized by the destination service.

  1. The Subscription manager validates the message body, such as Delivery mode, Filter mode and Expires time. If there is no match (support), the Subscription manager will send an application-specific fault message (e.g. FilteringNotSupported). When the interest is accepted, the manager will create a subscription resource and ask the Storage for its persistence. The Storage can be accessed via WS-Transfer interface or simple use of the internal adapter like it is implemented in this article. Note that every Subscription has a unique identifier for its service operation purpose. Basically, the Subscription can be stored like a resource with its own resource descriptor for additional info such as key, topic, timestamp created, etc. This pattern encapsulates a resource body from its identifications.
  1. Subscription manager returns a response message to the specified address that can be the initiator of the request or any destination defined by the ReplyTo header. The response message payload is very simple. It contains a Subscription manager service endpoint address that includes a Subscription Identifier. Basically, anyone who has this endpoint address will be allowed to manage specific Subscription in the Event Source Storage. The following code snippet shows an example of the SubscribeResponse message:
<s:Envelope 
    xmlns:a="http://www.w3.org/2005/08/addressing" 
    xmlns:s="http://www.w3.org/2003/05/soap-envelope"
    xmlns:wse="http://schemas.xmlsoap.org/ws/2004/08/eventing">
  <s:Header>
    <a:Action s:mustUnderstand="1">
      http://schemas.xmlsoap.org/ws/2004/08/eventing/SubscribeResponse
    </a:Action>
    <a:RelatesTo>urn:uuid:b373f5d9-d6e9-471d-af73-28544290f146<
      /a:RelatesTo>
  </s:Header>
  <s:Body>
    <wse:SubscribeResponse>
      <wse:SubscriptionManager>
        <a:Address>net.tcp://localhost:11111/SubscriptionManager<
          /a:Address>
        <a:ReferenceParameters>
          <wse:Identifier>uuid:2c125232-93cd-4ea7-a9a1-ee2b163266fd<
            /wse:Identifier>
        </a:ReferenceParameters>
      </wse:SubscriptionManager>
      <wse:Expires>2006-06-04T04:24:46.5194432Z</wse:Expires>
    </wse:SubscribeResponse>
  </s:Body>
</s:Envelope>
  1. The subscribing phase is done. The interest (Subscription) was stored in the Event Storage and the Event Sink is waiting for the notification message (default delivery mode = push). Of course, we can also start managing a Subscription such as Unsubscribe, Renew or ask for GetStatus. So, let's make the same situation in the Event Source that will trigger the "send an internal message" event to the Notification manager.
  1. Notification manager receives an event from a specific topic source. This event must be delivered to all Event Sinks based on their registered interest that persisted as Subscription. Therefore the manager asks the Event Storage for all Subscriptions for this topic. If the manager is not configured for a specific event topic, the Storage will return all available Subscriptions. Walking through all Subscriptions, the manager will check a delivery for Filter match and delegate the work to the ThreadPool for async fire&forget message delivery to the NotifTo destination.

Note that WS-Eventing spec supports only one operation for specific update of subscribed interest in the Event Storage. This Renew operation allows changing of the Expires time of the Subscription only. For other changes such as Filter, delivery mode, etc., the Event Sink needs to perform two steps. The first one is to perform the Unsubscribe Subscription operation and then create new one.

Implementation

The concept of the WS-Eventing implementation for the WCF model is based on decoupling an Eventing Storage from the service layers into the configurable adapter. This concept has been described in detail in my previous article WS-Transfer for WCF. The configurable adapter enables use of different storage such as Memory, SQL, WS-Transfer, etc. The default adapter that is implemented in this article is a Memory adapter. Note that the behaviorExtensionSection that plugs into the custom service adapter is used from the WS-Transfer for WCF solution. See the ServiceAdapterBehaviorExtension.cs source file. The WS-Eventing implementation requires building of the following:

  • SOAP stack for WS-Eventing messages (set of the Xml/CLR classes)
  • Subscription manager service for subscribing and managing Event Sink interest in the Event Storage
  • Notification manager for dealing with events
  • Memory adapter for Event Storage

ServiceContract

Let's start with an interface contract. There are 4 web service operations based on the WS-Eventing spec. The first one, Subscribe, is for creating a Susbscription and the others are for its management. Therefore I have created 2 service contracts. The following code snippet shows these contracts:

[ServiceContract(Namespace = WSEventing.NamespaceUri)]
[XmlSerializerFormat(Style = OperationFormatStyle.Document)] 
public interface IWSEventing : IWSEventingFactory, IWSEventingOperation
{
}

[ServiceContract(Namespace = WSEventing.NamespaceUri)]
[XmlSerializerFormat(Style = OperationFormatStyle.Document)]   
public interface IWSEventingFactory
{
  [OperationContract(Action = WSEventing.SubscribeAction, 
    ReplyAction = WSEventing.SubscribeResponseAction)]
  [FaultContract(typeof(SupportedDeliveryMode))]
  [FaultContract(typeof(SupportedDialect))]
  [TransactionFlow(TransactionFlowOption.Allowed)]
  SubscribeResponse Subscribe(SubscribeRequest request);
}

[ServiceContract(Namespace = WSEventing.NamespaceUri)]
[XmlSerializerFormat(Style = OperationFormatStyle.Document)]
public interface IWSEventingOperation
{
  [OperationContract(Action = WSEventing.GetStatusAction, 
    ReplyAction = WSEventing.GetStatusResponseAction)]
  [TransactionFlow(TransactionFlowOption.Allowed)]
  GetStatusResponse GetStatus(GetStatusRequest request);

  [OperationContract(Action = WSEventing.RenewAction, 
    ReplyAction = WSEventing.RenewResponseAction)]
  [TransactionFlow(TransactionFlowOption.Allowed)]
  RenewResponse Renew(RenewRequest request);

  [OperationContract(Action = WSEventing.UnsubscribeAction, 
    ReplyAction = WSEventing.UnsubscribeResponseAction)]
  [TransactionFlow(TransactionFlowOption.Allowed)]
  UnsubscribeResponse Unsubscribe(UnsubscribeRequest request);

}

Subscribe request/response message contract

The pattern of the operation contract is created quite easily by using the request/response message contract. The request resp. response class represents a message envelope boilerplate with the header(s) and body properties. Actually, XML serialization/deserialization logic is encapsulated into the base class. The following code snippet shows the SubscribeRequest and SubscribeResponse classes for message contracts. Note that the request has an option to add a SubscriptionTopic by Subscriber for categorized subscriptions in the Event Storage.

/// WS-Eventing Subscribe request message class
[MessageContract]
public class SubscribeRequest : Subscribe
{
  #region DataMembers
  private string _subscriptionTopic;
  #endregion

  // option: 
  [MessageHeader(Name = WSEventing.Extension.SubscriptionTopic, 
    Namespace = WSEventing.Extension.NamespaceUri)]
  public string SubscriptionTopic
  {
    get { return _subscriptionTopic; }
    set { _subscriptionTopic = value; }
  }

  #region Properties
  [MessageBodyMember(Name = WSEventing.ElementNames.Subscribe, 
    Namespace = WSEventing.NamespaceUri, Order = 0)]
  public Subscribe Body   
  {
    get { return this; }
    set { this.Copy(value); }
  }
  #endregion

  #region Constructors
  public SubscribeRequest()
  {
  }
  public SubscribeRequest(Subscribe subscribe)
  {
    Body = subscribe;
  }
  #endregion
}

/// WS-Eventing Subscribe response message class
[MessageContract]
public class SubscribeResponse : SubscribeResult
{
  #region Properties
  [MessageBodyMember(Name = WSEventing.ElementNames.SubscribeResponse, 
    Namespace = WSEventing.NamespaceUri, Order = 0)]
  public SubscribeResult Body
  {
    get { return this; }
    set { this.Copy(value); }
  }
  #endregion

  #region Constructors
  public SubscribeResponse()
  {
  }
  public SubscribeResponse(SubscribeResult result)
  {
     Body = result;
  }
  #endregion
}

All base classes use the IXmlSerializable interface for writing/reading an element/attribute based on the WS-Eventing spec in order to have a better control over XML serialization. Implementation of the IXmlSerializable interface is straightforward and lightweight, using public EndpoitAddress methods for XML reader and writer. Some other classes such as Delivery and Expires have been created with the same technique style. The following code snippet shows an implementation of the Subscribe base class:

[MessageContract]
[XmlSchemaProvider(null, IsAny = true)]
public class Subscribe : IXmlSerializable
{
  #region DataMembers
  EndpointAddress _endTo;
  Delivery _delivery;
  Expires _expires;
  XPathMessageFilter _filter;
  string _filterDialect;
  #endregion

  #region Properties
  //...
  #endregion

  #region Constructors
  //...
  #endregion

  #region IXmlSerializable Members
  public System.Xml.Schema.XmlSchema GetSchema()
  {
      return null;
  }
  public void ReadXml(XmlReader reader)
  {
    reader.ReadStartElement(WSEventing.ElementNames.Subscribe, 
      WSEventing.NamespaceUri);
    while (reader.NodeType != XmlNodeType.EndElement)
    {
      if(reader.IsStartElement(WSEventing.ElementNames.EndTo,
        WSEventing.NamespaceUri))
      {
          EndTo = EndpointAddress.ReadFrom(AddressingVersion.WSAddressing10, 
            reader);
      }
      else if(reader.IsStartElement(WSEventing.ElementNames.Delivery,
        WSEventing.NamespaceUri))
      {
          Delivery = new Delivery(reader);
      }
      else if(reader.IsStartElement(WSEventing.ElementNames.Expires,
        WSEventing.NamespaceUri))
      {
          Expires = new Expires(reader);
      }
      else if(reader.IsStartElement(WSEventing.ElementNames.Filter,
        WSEventing.NamespaceUri))
      {
        string dialect = reader.GetAttribute("Dialect");
        if (dialect == null || dialect == WSEventing.XPathDialect)
        {
           Filter = new XPathMessageFilter(reader);
        }
        else
        {
          _filterDialect = dialect;
          reader.Skip();
        }                      
      }
      reader.MoveToContent();
    }
    reader.ReadEndElement();
  }
  public void WriteXml(XmlWriter writer)
  {
    writer.WriteStartElement(WSEventing.NamespacePrefix, 
      WSEventing.ElementNames.Subscribe, WSEventing.NamespaceUri);
    if (EndTo != null)
    {
      EndTo.WriteTo(AddressingVersion.WSAddressing10, writer, 
        WSEventing.ElementNames.EndTo, WSEventing.NamespaceUri);
    }
    if (Delivery != null)
    {
      Delivery.WriteXml(writer);
    }
    if (Expires != null)
    {
      Expires.WriteXml(writer);
    }
    if (Filter != null) 
    {
      Filter.WriteXPathTo(writer, WSEventing.NamespacePrefix, 
        WSEventing.ElementNames.Filter, WSEventing.NamespaceUri, true);
    }
    writer.WriteEndElement();
  }
  #endregion
}

I will skip all other message contracts, as they are using the same request/response style as shown in the above Subscribe operation. I will now step into the WS-Eventing services.

Subscription manager

Subscription manager service has been decoupled into 2 layers. The first layer, closed to the "wire," implements an IWSEventing service contract. The other one implements a configurable adapter of the Event Storage. Using the request/response classes and Storage adapter, the interface implementation is lightweight and readable:

public class SubscriptionManagerService : 
  SubscriptionManagerService<ConfiguredAddapter> { }

[ServiceBehavior(ReturnUnknownExceptionsAsFaults = true)]
public class SubscriptionManagerService<T> : 
  ServiceAdapterBase<T, IStorageAdapter>, IWSEventing where T: class
{
  List<string> _delivering = new List<string>();
  List<string> _filtering = new List<string>();
  
  public SubscriptionManagerService()
  {
    // supported features
    // filtering
    _filtering.Add(WSEventing.XPathDialect);

    // delivery modes (Note that that the push mode is a mandatory mode)
    _delivering.Add(WSEventing.PushDeliveryMode);
    MessageProperties mp = OperationContext.Current.IncomingMessageProperties;
    if (mp.ContainsKey("EventingPublishers"))
    {
      Publishers publishers = mp["EventingPublishers"] as Publishers;
      if (publishers != null && publishers.Count > 0)
      {
        string[] modes = new string[publishers.Count];
        publishers.Keys.CopyTo(modes, 0);
        foreach (string mode in modes)
        {
          if (mode != WSEventing.PushDeliveryMode)
              _delivering.Add(mode);
        }
      }
    }         
  }
  
  public SubscribeResponse Subscribe(SubscribeRequest request)
  {
    // validate expiration type and value
    if (request.Expires != null)
    {
      request.Expires.IsValidTime(true);
    }
    
    // validate delivery mode
    SupportedDeliveryMode sdm = new SupportedDeliveryMode(_delivering);
    sdm.IsSupported(request.Delivery.DeliveryMode, true);
    
    // validate filtering 
    SupportedDialect sd = new SupportedDialect(_filtering);
    sd.IsSupported(request.FilterDialect, true);
    
    // response values
    Identifier identifier = new Identifier();
    Expires expires = request.Expires;
    Uri address = OperationContext.Current.IncomingMessageHeaders.To;
    SubscriptionManager manager = 
      new SubscriptionManager(address, identifier);
    
    // storage resource descriptor
    ResourceDescriptor rd = 
      new ResourceDescriptor(identifier.Value, expires.Value);
    
    // option: subscription topic
    rd.Topic = request.SubscriptionTopic;
    
    // Subscription - storage resource
    Subscription subscription = new Subscription(request, manager);
    
    // storage action 
    Adapter.Create<Subscription>(rd, subscription);
    
    // response
    SubscribeResponse response = new SubscribeResponse();
    response.SubscriptionManager = manager;
    response.Expires = expires; 
    return response;
  }
  
  public UnsubscribeResponse Unsubscribe(UnsubscribeRequest request)
  {
    // storage action
    Adapter.Delete<Subscription>(new ResourceDescriptor(
      request.Identifier.Value));
    
    UnsubscribeResponse response = new UnsubscribeResponse();
    return response;
  }
  
  public RenewResponse Renew(RenewRequest request)
  {
    // ...
  }
  
  public GetStatusResponse GetStatus(GetStatusRequest request)
  {
    // ...
  } 
}

The above class shows an implementation of the Subscribe operation contract, where the request starts processing a validation such as delivery mode, expiry time and filtering. After that, the Subscription resource and its descriptor are created and passed to the adapter factory for its persistence. Note that each resource (Subscription) can be categorized by subscription topic based on the non WS-Eventing header, or it can be commonly used for all subscribed resources from the config file.

Service Adapter

We encapsulated a service behavior related to the Event Storage into a generic virtual resource target using the Adapter concept in the service extension. This concept has been described in more detail in my article WS-Transfer for WCF. The service adapter is plugged into the service behavior layer using a WCF extension model.

The following code snippet shows a part of the service config file:

<behaviors>
  <serviceBehaviors>
    <behavior name="ManagerExtention" >
      <storage name="LocalStorage" 
        type="RKiss.WSEventing.Adapters.MemoryStorageAdapter" 
        maxsubscriptions="10" 
        topic="weather:report.storm"/>
    </behavior>
  </serviceBehaviors>
</behaviors>

<extensions>
  <behaviorExtensions>
    <add name="storage" 
      type="RKiss.WSLib.ServiceAdapterBehaviorElement, WSEventing
      Version=1.0.0.0, Culture=neutral, PublicKeyToken=null"/>
  </behaviorExtensions>
</extensions>

The above config section shows an Event Storage configuration as a MemoryStorageAdapter. The maximum subscriptions in the storage is limited to 10 for the same weather:report.storm topic. Note that the topic can be overwritten by a Subscribe request. This article includes only an implementation of the Memory Storage using the following interface. Based on this implementation, it is possible to build other ones such as SQL, WS-Transfer, etc. and plug them into the service in a loosely coupled manner.

public interface IStorageAdapter
{
  // basic
  ResourceDescriptor Create<T>(ResourceDescriptor rd, 
    T resource) where T : class;
  T Get<T>(ResourceDescriptor rd) where T : class;
  void Put<T>(ResourceDescriptor rd, T resource) where T : class;
  T Delete<T>(ResourceDescriptor rd) where T : class;
  
  // advanced methods
  void CreateOrUpdate<T>(ResourceDescriptor rd, T resource) where T : class;
  IList<T> Get<T>(string topic) where T : class;
  IList<T> Delete<T>(string topic) where T : class;
  T Delete<T>(UniqueId id);
  ResourceDescriptor GetResourceDescriptor(ResourceDescriptor rd);
  ResourceDescriptor[] GetResourceDescriptors(ResourceDescriptor rd);
  void Expires(ResourceDescriptor rd); 
}

That is all for the WS-Eventing spec implementation. Next, I am going to focus on the implementation of the Event Message delivery. As I mentioned earlier, the Event Message is a contract between the Event Sink and Event Source. The Event Source generates an Event Message payload for unknown sink without the knowledge of the delivery and application specific data. The events send an internal message to the Notification manager that is responsible for the message delivery, based on the Event Sink's interest (Subscription). This is a fully transparent event to the Event source. There is no WS-Eventing specific operation and therefore I designed the following service contracts (for this article only):

Notification manager

The Notification manager has the same design implementation as the Subscription manager based on the pluggable Event Storage adapter. We are using the MemoryStorage adapter; therefore both managers must be hosted on the same appDomain. In other cases, the SqlStorage or WS-Transfer adapters are required to plug-in.

The Notification manager has a built-in a Push Publisher for default delivery mode. See the next code snippet for details. For other delivery modes (custom) such as Pull, Wrapp, Batched, etc., the Notification manager service has a capability to plug them in a loosely coupled manner based on the configuration. Using the RKiss.WSEventing.PublishersBehaviorSection class, the service behavior can be configured as it is shown in the following picture:

The above picture shows the Notification manager service decoupled into the Event Storage adapter and Publishers. Notice that the built-in default Push Publisher can be disabled by a configurable Push Publisher. The configurable (custom) Publisher is plugged into the Publisher ThreadPool via a callback DeliveryMessage method in the abstract PublisherBase class:

public abstract class PublisherBase
{
  #region Properties
  HybridDictionary _properties;
  public HybridDictionary Properties
  {
      get { return _properties; }
  }
  #endregion

  #region Constructors
  protected PublisherBase()
    : this(null)
  {
  }
  protected PublisherBase(HybridDictionary properties)
  {
     _properties = properties;
  }
  #endregion

  public void DeliveryMessage(object state)
  {
    FireState firestate = state as FireState;
    if (firestate != null)
    {
        OnDeliveryMessage(firestate.Message, firestate.Subscription);
    }
  }

  public virtual void OnDeliveryMessage(Message message, 
    Subscription subscription)
  {
  }
}

You can implement a custom delivery mode for a specific subscription by overriding the OnDeliveryMessage method in the derived class. Note that the Publishers are running in the ThreadPool in the fire&forget manner. The following code snippet shows a dummy custom Publisher (tester):

public class PublisherTester : PublisherBase
{
  #region Constructors
  public PublisherTester() : base() { }
  public PublisherTester(HybridDictionary properties) : base(properties) { }
  #endregion

  #region PublisherBase override
  public override void OnDeliveryMessage(Message message, 
    Subscription subscription)
  {
    Console.WriteLine("\n=============== tester =========================");
    Console.WriteLine(message);
    Util.ShowMe(subscription);
    Console.WriteLine("\n================================================");
  }
  #endregion
}

Notification manager - service contract

Notification manager supports simple service contracts. The first operation is for firing the event that will dispatch an Event Message and the other one is for sending a SubscriptionEnd message to inform the Event Sinks required by WS-Eventing spec. The following code snippet shows this service contract.

[ServiceContract]
public interface INotificationManager
{
  [OperationContract(Action = "*", IsOneWay = true)]
  void FireSubscription(Message message);

  [OperationContract(Action = "CancelSubscription", IsOneWay = true)]
  void CancelSubscription(Uri code, string reason, string culturename);
}

Note that that the FireSubscription operation has been attributed for any Action (*) to receive a message for its rerouting based on the Subscription delivery. Let's look at the implementation of the FireSubscription operation contract. Based on the event topic, we can get the list of all Subscriptions for this situation. Walking trough the list, we delegated a copy of the message to the worker thread for message delivery in the async manner.

public void FireSubscription(Message message)
{
  // configuration
  string topic = Properties["topic"] as string;
  MessageProperties mp = OperationContext.Current.IncomingMessageProperties;
  Publishers publishers = mp.ContainsKey("EventingPublishers") ?
      mp["EventingPublishers"] as Publishers : null;
 
  using (TransactionScope ts = new TransactionScope(
    TransactionScopeOption.Required))
  {
    // Get subscriptions from Event Storage
    IList<Subscription> subscriptions = Adapter.Get<Subscription>(topic);
    
    // Create buffered message
    using (MessageBuffer buffer = message.CreateBufferedCopy(0xffffff))
    {
      // walk through all subscriptions
      foreach (Subscription subscription in subscriptions)
      {
        //Util.ShowMe(subscription);

        // filter message
        if (subscription.Filter == null || subscription.Filter.Match(buffer))
        {
          FireState state = new FireState(subscription,buffer.CreateMessage(),
            Properties);
          Publisher publisher = null;

          // delivery mode
          if (publishers != null && 
            publishers.TryGetValue(subscription.Delivery.DeliveryMode, 
              out publisher))
          {
            // loosely coupled Publisher
            Properties["Publisher"] = publisher;
            object[] args = new object[] { Properties };
            PublisherBase driver = 
              (PublisherBase)Activator.CreateInstance(publisher.Type, args );
            ThreadPool.QueueUserWorkItem(new WaitCallback(
              driver.DeliveryMessage), state);
          }
          else if (subscription.Delivery.DeliveryMode == null || 
            subscription.Delivery.DeliveryMode == WSEventing.PushDeliveryMode)
          {
            // default built-in Push Publisher
            ThreadPool.QueueUserWorkItem(new WaitCallback(
              FireSubscriptionWorker), state);
          }
        }
      }
    }
  }
}

And here is the worker for the message delivery:

public void FireSubscriptionWorker(object state)
{
  // state
  FireState firestate = state as FireState;

  try
  {
    // destination
    EndpointAddress epa = firestate.Subscription.Delivery.NotifyTo;
    
    // create config name for binding 
    string epname = (string)firestate.Properties["epname"] ?? string.Empty;
    string configname = string.Concat(epname, epa.Uri.Scheme);
    
    // action
    using (ChannelFactory<IOneWay> cf = new ChannelFactory<IOneWay>(
      configname,epa))
    {
      IOutputChannel channel = (IOutputChannel)cf.CreateChannel();
      channel.Send(firestate.Message);
      channel.Close();
    }  
  }
  catch (Exception ex)
  {
    try
    {
      Adapter.Delete<Subscription>(
        firestate.Subscription.Manager.Identifier.Value);
      SubscriptionEnd subscriptionEnd = new SubscriptionEnd(
          firestate.Subscription.Manager,
          SubscriptionEndCode.DeliveryFailure,
          ex.Message);
      CancelState cancelstate = 
        new CancelState(firestate.Subscription.EndTo,subscriptionEnd,
        firestate.Properties);
      CancelSubscriptionWorker(cancelstate);

      Trace.WriteLine(ex.Message);
    }
    catch (Exception ex2)
    {
        Trace.WriteLine(ex2.Message);
    }
  }
}

The FireSubscription worker has all information (Subscription) about how to deliver a message, except for one. The WS-Eventing for Web Services expected a basic binding for delivery notification to the Event Sink (ASMX service). In other binding cases -- for example, TCP binding -- we don't have a sink's binding description in the Subscription. If the sink does not support WS-MetadaExchange, we have to find some configuration mechanism. One of the solutions is to have a client config section in the Notification manager service. Based on the NotifyTo EndpointAddress schema, we can map the correct binding for notification delivery. Notice that I am planning to change this mechanism in the next version. The following code snippet shows an example of the client section for delivery binding:

<client>
  <!--<span class="code-comment"> Notification binding --></span>
  <endpoint name ="http"
    address="http://tbd" binding="basicHttpBinding" 
    contract="RKiss.WSEventing.IOneWay" />
  <endpoint name ="net.tcp" 
    address="net.tcp://tbd" binding="customBinding" 
    contract="RKiss.WSEventing.IOneWay" 
    bindingConfiguration="binding1"/>
  <endpoint name ="net.msmq" 
    address="net.msmq://tbd" binding="netMsmqBinding" 
    contract="RKiss.WSEventing.IOneWay" />
</client>

Notice that the application can have more Notification managers configured for a specific event topic.

Test

The WS-Eventing solution is divided into the WSEventing core library and Test projects:

The WSEventing assembly must be included with your event driven application in order to use Subscription and Notification managers. The Test folder contains a project for hosting managers that generate and consume Events and Subscribers (client application). All processes are a console program. Launch the ServiceHost (Server) program and the ClientApplication (Subscriber). Press the Enter key on the ClientApplication for subscribing application interests. The following screen snippet shows this situation:

Screenshot - DrawingEventing9.jpg

The next steps are launching the EventSink and EventSource console programs. You should do this within the 5 minutes; that's the expiry time setup in this example. Otherwise, create another subscription by the ClientApplication. Now press any key on the EventSource console to generate a weather report for EventSink. You should see the following screens:

Screenshot - DrawingEventing10.jpg

Usage

Finally, we are here. I am going to show steps that move any (no response) web service messaging to the WS-Eventing driven messaging using the WSEventing assembly.

Step 1. EventSource - EventSink

Let's assume that we have a standard OneWay Service contract, as is shown in the following example. This example can be found in my Tester. It's a very simple example with one operation for generating a WindReport.

[ServiceContract]
[XmlSerializerFormat]
public interface IWeather
{
  [OperationContract(IsOneWay=true,
    Action="http://www.example.org/oceanwatch/WindReport")] 
  void WindReport(WindReportRequest request);
}

[MessageContract]
public class WindReportRequest
{
  [MessageBodyMember]
  public WindReport WindReport;
  public WindReportRequest() { }
  public WindReportRequest(WindReport report) { WindReport = report; }
}

The following picture shows a connected system containing the Event Sink and Event Source ends. The Event Sink is a consumer of the OneWay message based on the IWeather service contract.

Screenshot - DrawingEventing11.jpg

Note that the Event Source and Event Sink have been logically connected based on the ABC descriptions. See the aforementioned part of the config file for the Event Source. Notice also that the address is the same: net.tcp://localhost:33333/OnStormWarning

There is nothing special involved in the step 1. The Event Source sends a single message to the known Event Sink in the fire&forget manner. One message is delivered to one destination endpoint. We don't need to know anything about the Pub/Sub Notifications if the application will keep this configured pattern (One - to - One). The situation will start when the Event Source wants to send a message to the unknown Event Sinks based on their interest. For this delivery pattern (One - to - Many) we need a step 2 and redirect an address of the Event Source to the Notification manager. In our example, it is address="net.tcp://localhost:22222/NotificationManager"

Step 2. Event Sink/Subscriber

We need a hosting server for WS-Eventing managers loaded from the WSEventing assembly for this step. We can use a ServerHost console host program from the Test package. So, let's launch this console program. The screen will show two listeners: the Subscription and Notification Managers. Since the Event Source has been addressed to the Notification Manager, we can generate an event by Event Source and see the message arrive in the manager, but that's all. The Event Sink will still wait for the events.

Why? What happened? Why didn't the Notification manager deliver a message to the Event Sink? Well, the answer is simple. The Notification manager's knowledge does not have an instruction to do so. It is the Subscription managers' process, based on the Event Sink interest, to create an instruction (Subscription). The following code snippet is an example of how the client application can subscribe an interest to receive an event. The interest -- such as Delivery, Filter and etc. -- needs to be populated in the Subscribe Request and sent to the Subscription Manager. More details can be found in the Test package ClientApplication (Subscriber) project.

using(SubscriptionManagerClient sm = 
  new SubscriptionManagerClient("SubscriptionManager"))
{

  #region Operation Subscribe
  // Prepare request for subscribing interest (subscription)
  SubscribeRequest subscribeRequest = new SubscribeRequest();
 
  // option: Organize subscriptions in the Storage based on the Topic
  subscribeRequest.SubscriptionTopic = "weather:report.storm";
 
  // Push Delivery
  Uri uri = new Uri("net.tcp://localhost:33333/OnStormWarning");
  subscribeRequest.Delivery.NotifyTo = new EndpointAddress(uri);

  // option: EndTo
  uri = new Uri("net.tcp://localhost:44444/Admin");
  subscribeRequest.EndTo = new EndpointAddress(uri);

  // option: Filter
  XmlNamespaceManager nsmgr = new XmlNamespaceManager(
    new XmlDocument().NameTable);
  nsmgr.AddNamespace("ow", "http://www.example.org/oceanwatch");
  string expression = "//ow:WindReport/ow:Speed >= 65";
  subscribeRequest.Filter = new XPathMessageFilter(expression, nsmgr);

  // option: Lifetime of the Subscription
  subscribeRequest.Expires = new Expires(TimeSpan.FromSeconds(300));
    
  // Action - Subscribe this interest
  SubscribeResponse subscribeResponse = sm.Subscribe(subscribeRequest);
  #endregion

}

Note that the above highlighted address in the delivery section is the address of the Event Sink. Once the Subscriber subscribes an interest, the Notification manager will deliver the Event Source message to the NotifyTo address. Of course, if the watchdog of the Event Store will not terminate this subscription, see the above setup for 300 seconds. The following figure shows connected systems in the Pub/Sub Notification scenario. We can subscribe more Event Sinks for different event topics, filters, etc. and examine how the WS-Eventing handles it.

Conclusion

In this article, I have described the design and implementation of the WS-Eventing specification. It is a basic (lightweight) spec comparable to the WS-Notification family (a topic-based publish/subscribe pattern). The Pub/Sub notification plays a significant role in the event driven Service Oriented Architecture. For example, the long running business workflow or updating business state requires the use of the Eventing pattern for efficient usage of resources. This scenario is usually divided into the pre-processor, processor and post-processor (sync - async - notify) workflow. The post-processor can use the WS-Eventing to notify clients that the workflow has been completed. The other very common scenario is data driven Storage, where the clients are waiting for a situation in the Storage. For example, in the real estate business where the buyers want to be notified when their real estate interests are found on the market. There are many other scenarios, but the common pattern for them is the event driven model. WS-Eventing and WS-Transfer enable this model in the WS-* Service Bus connected systems.

References

History

  • 4 June, 2007 -- Article edited and posted to the main CodeProject.com article base
  • 23 November, 2006 -- Article updated
  • 4 June, 2006 -- Original version posted

License

This article, along with any associated source code and files, is licensed under The Code Project Open License (CPOL)

About the Author

Roman Kiss
Software Developer (Senior)
United States United States
No Biography provided

Comments and Discussions

 
QuestionBehaviorExtensionElement Pinmemberandeyv18-Aug-06 10:14 
AnswerRe: BehaviorExtensionElement PinmemberRoman Kiss18-Aug-06 10:24 
GeneralRe: BehaviorExtensionElement Pinmemberandeyv18-Aug-06 12:46 
GeneralRe: BehaviorExtensionElement PinmemberRoman Kiss18-Aug-06 12:49 

General General    News News    Suggestion Suggestion    Question Question    Bug Bug    Answer Answer    Joke Joke    Rant Rant    Admin Admin   

Use Ctrl+Left/Right to switch messages, Ctrl+Up/Down to switch threads, Ctrl+Shift+Left/Right to switch pages.

| Advertise | Privacy | Mobile
Web03 | 2.8.140415.2 | Last Updated 5 Jun 2007
Article Copyright 2006 by Roman Kiss
Everything else Copyright © CodeProject, 1999-2014
Terms of Use
Layout: fixed | fluid