Click here to Skip to main content
Click here to Skip to main content

Azure Service Bus Pusher

, 3 Apr 2012 CPOL
Rate this:
Please Sign up or sign in to vote.
This article describes the design and implementation of the Push Service for Windows Azure Service Bus Messaging.

Contents

Features

  • Pub/Sub-Push Model (PushAndPush model)
  • Virtual Hosting for Subscribers and Receivers in the WorkerRole
  • Table Storage for runtime and configuration Repository
  • Pub/Sub Controller for managing WorkerRole
  • No external endpoints
  • Target endpoint for Brokered Message
  • Target endpoint for BasicHttpBinding (http/https)
  • Pushing messages across the namespaces, topics, etc.
  • Option to send a BrokeredMessageProperty Header
  • Option for Message Version (default is Soap12)
  • Direct or Indirect Push addressing
  • Windows Azure Service Bus Messaging (November 2011)
  • WCF Technology

Introduction

The Windows Azure Service Bus represents a logical connectivity between the WCF and other service endpoints, including REST endpoints. Basically, there are two messaging capabilities such as "relayed" and "brokered" messaging. This article is focuses on the brokered messaging.

The Brokered Messaging only provides one-way durable asynchronous communication with messaging components such as Queue, Topic and Subscription with Publish-Subscribe (PubSub) features. Its infrastructure is based on the PushAndPull loosely decouple distributed architecture.

The brokered message is pushed to the specific component entity (Queue or Topic) and stored temporally until the receiving party is ready to pull it out. That's why it is called a PushAndPull model. The consumer of the event interest is logically connected to the event source via a service bus component such as queue or topic/subscription. In this scenario, the consumer of the event interest represents a service to the service bus for "pulling" message from its component.

From the architecture point of the view, the consumer of the service bus is a receiver of the message, must be hosted and available via endpoint which can be consumed by service bus delivery part. For more details about this technique see my article Azure Service Bus Tester, where this little tool can help you understand the usage of the Azure Service Bus Brokered Messaging.

The other architecture style for consuming service bus components is their full encapsulation. In other words, the consumer of the event interest doesn't need to know any details about the connectivity. Its receiver is a generic service for message mediation without a knowledge where and how the message was created. It can be created from the service bus, from 3rd party adapter, directly from the client, etc. In this scenario, the receiver is a regular message service, not a subscriber neither a receiver to the Service Bus.

For this kind of architecture we need to extend the Service Bus infrastructure with a service to provide a PushAndPush connectivity pattern, where the event interest is pushed to the Service Bus and then pushed again to the service (sink) of the event interest.

This article describes in detail how to design and implement this Pusher Service for Azure Service Bus.

The following picture shows its position to the Windows Azure:

Basically, the above picture shows a Pusher Service hosted by WorkerRole as a consumer of the Service Bus components from where the message is pulled and push to the target service based on the metadata stored in the Table Storage. This is a standard scenario of the Service Bus Brokered Messaging, only the difference is that there is a centralized consumer and mapper of the event interests published by publishers/senders on the Service Bus.

The Table Storage is a Repository of the metadata for Pusher Service. There are start-up data for creating subscribers/receivers and runtime data for pushing a message to the specific target. Besides that, there is one subscriber for controlling a Pusher. Note that the Pusher is running in the background process as a WorkerRole with a multiple instances and sharing the same Table Storage. There is no public endpoint for communication with the Pusher, all control is done via the Service Bus, therefore any control message send to the Topic Pusher is multicast across the all WorkerRole Instances. On the other side, the Pusher has a built-in a publisher of the Info/Warning/Error notification messages.

So, for any registered Service Bus components such Queue, Topic/Subscription we can assign (subscribe) a Pusher in the Table Storage. When the published message found the match in those components, the copy of the message is delivered to the Pusher and based on the runtime metadata from the Table Storage is pushed to the specific target endpoint.

Subscribing interest on the Service Bus is different between the queue and subscription components. The following screen snippet shows these differences:

As you can see, the Pusher is assigned to the Queue and/or to the specific Rule of the Subscription. Each row in the Table (Repository) must be unique and defined by address of the Service Bus components. For Subscription, the RuleName is added for unique definition (RowKey in the Table Storage). Based on this, you can see in the above picture that the queue has only one Pusher, but the Subscription can have multiple ones, based on the needs.

Note, that the Pusher will always create only one subscriber for each subscription. For example, if the Subscriber has one or multiple rules, it will always host only one subscriber, because they have the same listener Uri on the Service Bus.

One more thing, the Pusher has a responsibility to push (forward) a received message by subscriber to the target endpoint. It is one-to-one mapping pattern. Therefore only the Service Bus (registered components such as Subscription and Rule) can handle a message multicasting. For example, if our scenario needed a message multicasting we can register a subscription with multiple Rules with the same SqlExpression filter (max limit is 64) or using the same Rule for multiple Subscriptions (max limit is 2000). From the Pusher point the view, I do recommend to use the first case such as one subscription (one subscriber) with multiple Rules.

Well, now based on the above description, we can abstract our pusher like it is shown in the following picture:

The above picture shows, that the Publisher message is pushed via Mapping Table to the Target Service. Administrating the Table Storage we can manage this process on the runtime such as adding a new push, enable/disable push, editing a target endpoint, etc.

The Pusher allows flexible options for selecting a target endpoint address. Basically, there are two address options such as direct by brokered message, which has a priority and the other one is indirect via Table Storage. Based on the value of the brokered message property To, the Pusher will know how to handle this push delivery. This is a very powerful concept which allows to address a target from the client side (Publisher), Subscription using a Rule Action and finally via Repository.

The following screen snippet shows the $Default Rule Action for setting up a sys.To property using the name of the Rule. Based on this alias name, the target endpoint address is selected from the Repository during the runtime.

Note, that the Subscription Rule has a Filter with a sql expression. Only a true match will allow to execute a SqlAction and delivering a message copy to the subscriber. We can also evaluate a value of sys.To property as part of the SqlExpression. This feature is giving us to dynamically change the target endpoint address based on the needs.

That's great. Can we also setup a target endpoint address to the Service Bus components such as Queue or Topic? Yes, for example, the Action like SET sys.To = 'sb://rk2011.servicebus.windows.net/image' will push the message to the queue at the namespace rk2011.

The following picture shows a scenario when Publisher published a message to the Queue, then Pusher to the Subscription, where multiple Rules can multicast message to the Services. This example basically shows a multicasting a queue message:

The other scenario demonstrating usefulness of Pusher is a "bridge" between the different Service Bus Namespaces. In this scenario the Pusher is practically a Publisher for next Service Bus:

OK, let's continue with the concept and design. I am assuming you have a working knowledge of the Windows Azure Platform.

Concept and Design

The Azure Service Bus Pusher concept is based on the pulling a brokered message by WCF Subscriber and forwarding (pushing) this message to the target endpoint. The target endpoint is dynamically selected based on the brokered message property To.

Target Addressing

Basically, the target endpoint can be either the Service Bus Component (such as Queue or Topic) or BasicHttpBinding endpoint. This address can be declared either directly in the message or in the Repository. The following picture shows the target addressing:

In the first two cases, the BrokeredMessage.To property has a direct value of the endpoint address. In the case of the sb schema, the brokered message is forwarded to the Service Bus components. In the last two scenarios, the BrokeredMessage.To property has a name of the Rule (for example, $Default) which can be used to query a target endpoint address from the Repository.

The following code snippet shows a private method for delivering a push message to the Service Bus Components (Queue or Topic):

private void PushMessageToServiceBus(string address, Message message)
{
    ChannelFactory<IGenericOneWayContract> factory = null;

    try
    {
        var ns = address.Substring("sb://".Length).Split(new char[] { '.' }, 2)[0];

        var query = Repository.CreateQuery<AccountEntity>()
                        .AddQueryOption("$filter", string.Format("(PartitionKey eq '{0}' and RowKey eq '$Namespace')", ns))
                        .AddQueryOption("$top", "1");

        AccountEntity[] accounts = query.Execute().ToArray();

        if (accounts.Count() == 0)
            throw new EntryPointNotFoundException(string.Format("Account {0} doesn't exist in the Repository", ns));

        var securityBehavior = new Microsoft.ServiceBus.TransportClientEndpointBehavior()
        {
            TokenProvider = Microsoft.ServiceBus.TokenProvider.CreateSharedSecretTokenProvider(accounts[0].AccountName, accounts[0].AccountKey),
        };

        var binding = new NetMessagingBinding();
        var se = new ServiceEndpoint(ContractDescription.GetContract(typeof(IGenericOneWayContract)), binding, new EndpointAddress(address));
        se.Behaviors.Add(securityBehavior);

        factory = new ChannelFactory<IGenericOneWayContract>(se);
        var channel = factory.CreateChannel();
        channel.ProcessMessage(message);

        factory.Close();
    }
    catch (Exception ex)
    {
        if (factory != null)
        {
            if (factory.State == CommunicationState.Faulted)
                factory.Abort();
            else
                factory.Close();
            factory = null;
        }

        throw ex;
    }
}

HTTPS and Basic Authentication

The Push message to the Target can be secured using a https transport and basic authentication. The client credential such as UserName and Password are stored in the Repository Table for each pusher entry or individually using an entry $Credential row.

The following code snippet shows a security part of the http/https client proxy:

if (binding.Security.Mode == BasicHttpSecurityMode.TransportWithMessageCredential)
{
    if (string.IsNullOrEmpty(entity.UserName) || string.IsNullOrEmpty(entity.Password))
    {
        #region Get Credential entity from the Repository
        var url = new Uri(entity.Url);
        string key = (url.Host + url.AbsolutePath).Replace("/", "%");

        var query = Repository.CreateQuery<CredentialEntity>()
                        .AddQueryOption("$filter", string.Format("(PartitionKey eq '{0}' and RowKey eq '$Credential')", key))
                        .AddQueryOption("$top", "1");

        CredentialEntity[] credential = query.Execute().ToArray();
        if (credential.Count() == 0)
            throw new EntryPointNotFoundException(string.Format("Credential Entity {0} doesn't exist in the Repository", key));

        entity.UserName = credential[0].UserName;
        entity.Password = credential[0].Password;
        #endregion
    }

    factory.Credentials.UserName.UserName = entity.UserName;
    factory.Credentials.UserName.Password = entity.Password;
    binding.Security.Transport.ClientCredentialType = HttpClientCredentialType.Basic;

    ServicePointManager.ServerCertificateValidationCallback = new RemoteCertificateValidationCallback(delegate { return true; });
}              

As you can see, when the credential is empty in the PusherEntity row, the Table will query all $Credential rows for PartitionKey = key. The PartitionKey for $Credential row is based on this format: (url.Host + url.AbsolutePath).Replace("/", "%").

Note, that the target side (service) must be enabled for https binding and accept the client's credential. In this scenario, you need to install a SSL certificate on IIS and configure the virtual directory to require SSL.

<basicHttpBinding>
  <binding name="secure">
    <security mode="TransportWithMessageCredential">
      <transport clientCredentialType="Basic" />
    </security>
  </binding>
</basicHttpBinding> 

Subscriber/Receiver

Subscriber or Receiver is a hosted service for pulling an event interest from the Service Bus such as Topic or Queue based on the Subscription rsp. Queue name. In the case of Pusher, the Subscriber/Receiver is not an end (business) message consumer; its design can be virtualized with one common service. Thanks for NetMessagingBinding for simplifying this pulling process and abstracted by WCF model.

The following code snippet shows an implementation of creating a subscriber host using a WCF hosting technique:

public static ServiceHost CreateSubscriberHost(AccountEntity account, string addressTopic, string addressSubcription, bool bRequiresSession = false)
{
    if (account == null)
        throw new ArgumentNullException("account");

    var securityBehavior = new TransportClientEndpointBehavior()
    {
        TokenProvider = TokenProvider.CreateSharedSecretTokenProvider(account.AccountName, account.AccountKey),
    };

    var binding = new NetMessagingBinding();
    var cd = ContractDescription.GetContract(typeof(IGenericOneWayContract));
    cd.SessionMode = bRequiresSession ? SessionMode.Required : SessionMode.Allowed;
    
    EndpointAddress topicEndpointAddress = new EndpointAddress(addressTopic);
    var se = new ServiceEndpoint(cd, binding, topicEndpointAddress);
    se.ListenUri = new Uri(addressSubcription);
    se.Behaviors.Add(securityBehavior);

    var host = new ServiceHost(typeof(PusherService));
    host.AddServiceEndpoint(se);

    return host;
}

As you can see, this is a straightforward implementation for creating a ServiceHost instance, which will be listening for addressTopic. Don't be a confused with the ListenerUri property. This property is for service bus broker, which is needed for listening to a specific subscription. This "trick" is cover by NetMessagingBinding logic.

The common service for Subscriber/Receiver is the PusherService with a generic (untyped) one way contract.

[ServiceContract(Namespace = "urn:rkiss.sb/pusher/2012/03")]
public interface IGenericOneWayContract
{
    [OperationContract(IsOneWay = true, Action="*")]
    void ProcessMessage(System.ServiceModel.Channels.Message msg);
}

Note, that this service is also used for an internal controller such as Update, Restart, etc., see more details later.

PusherService

The PusherService is an implementation of the IGenericOneWayContract service contract. This is a place, where a message from the Service Bus is pulled (received).

The following code snippet shows its boilerplate logic, where are basically two branches such as Controller and Push message. Based on the Target addressing, the Push Message has its own implementation and processing. The pusherKey to query a Repository for additional metadata is obtained from the specific host using an OperationContext.Current. In the direct target addressing, the endpoint address can be obtained from bmp.To property. More implementation details such as http/https and sb proxies can be found in the included source code.

public class PusherService : IGenericOneWayContract
{
  public void ProcessMessage(System.ServiceModel.Channels.Message msg)
  {
    Guid? hostId = ((ServiceHost)OperationContext.Current.Host).Id();
    Uri listener = OperationContext.Current.EndpointDispatcher.ChannelDispatcher.Listener.Uri;
    string pusherKey = (listener.Host + listener.AbsolutePath).Replace('/', '%');
    bool IsSubscription = pusherKey.IndexOf("%subscriptions%") > 0;

    if (msg.Properties.ContainsKey(Microsoft.ServiceBus.Messaging.BrokeredMessageProperty.Name) == false)
    {
       Trace.TraceWarning("Only brokered message is supported. " + pusherKey);
       return;
    }

    if (msg.Headers.Action.StartsWith("urn:rkiss.sb/pusher/2012/03/IPusher/", StringComparison.CurrentCultureIgnoreCase))
    {
       // Controller IPusher 
    }
    else
    {
       var bmp = msg.Properties[Microsoft.ServiceBus.Messaging.BrokeredMessageProperty.Name] as Microsoft.ServiceBus.Messaging.BrokeredMessageProperty;

       try
       {
          #region Push this message           
          if (string.IsNullOrEmpty(bmp.To) == false && bmp.To.IndexOf("://") > 0)
          {
             // delivery message based on the message's url                   
          }
          else
          {
             // delivery message based on the Repository
          }
          #endregion
        }
        catch (Exception ex)
        {
            // Notification
        }
    }
  }
}

Hosting

Each Subscriber/Receiver has its own hosting instance. Basically, there are two kinds of hosting such as hosting for Controller and for application Push Messages. The Controller is a mandatory subscriber built in for control purposes of the Pusher (Update, Restart, NotifyStatus, etc.) and it is configured by the settings in the config file. The other hosts are for Subscribers/Receivers that want to Push their message to the specific Target. For the hosting concept is used a standard WCF technique within the WorkerRole instance process.

The following picture shows this scenario:

Controller for Pusher

The Pusher itself is also using a capability of the Service Bus for its controlling and notifications. In other words, the Pusher is using a PubSub features of the Azure Service Bus itself. There are built-in publishers for notifications of the Pusher behavior and also a subscriber for receiving a request command, for instance, Restart WorkerRole.

The Pusher can run in the multiple WorkerRole Instances in the parallel manner, sharing the same Repository data. For this design pattern, the Controller can use a feature of the Service Bus such as multicasting Topic/Subscription.

Each Controller in the WorkerRole will create (if doesn't exist) own Subscription on the Pusher Topic with one $Default Rule for pairing with the admin tool. The following picture shows a screen snippet from the Service Bus Explorer which is showing that the pusher topic has two subscriptions (instance 0 and 1).

Any Publisher on the Topic with a pairing key/value can fire a message to the Pusher Controller. You will see more details about this scenario in the Usage section when the Service Bus Tester is used.

WorkerRole

The ServiceHosts for Subscribers, Receiver and Controller must run non-stop 24/7 for pulling and pushing messages from the Service Bus. Therefore the best candidate for keeping all components in the runable process is a WorkerRole like on-promises is a Windows NT Service. Basically, the WorkerRole role is a singleton instance for holding ServiceHosts references and for keeping them in the active state.

The following code snippet shows an override method of the WorkerRole.OnStart. This is a place for creating a hosting for each subscriber, receiver and pusher controller.

public override bool OnStart()
{
    // Set the maximum number of concurrent connections 
    ServicePointManager.DefaultConnectionLimit = 12;

    CloudStorageAccount.SetConfigurationSettingPublisher((configName, configSetter) => configSetter(configName));

    // mark this start
    Guid? sessionId = Guid.NewGuid();

    // subscriber for Controller
    string subscriptionName = RoleEnvironment.CurrentRoleInstance.Id.Split('.').Reverse().FirstOrDefault();
    SubscriptionDescription  sd = ServiceBusExtension.CreateSubscriptionIfNotExist(subscriptionName);
    if (sd != null)
    {
        // create subscriber for this subscription
        hostWorkerRoleSubscriber = Hosting.CreateDefaultSubscriberHost(subscriptionName);
        hostWorkerRoleSubscriber.Extensions.Add(new HostExtension { SessionId = sessionId });
        hostWorkerRoleSubscriber.Open();
        hostWorkerRoleSubscriber.Faulted += delegate(object source, EventArgs e)
        {
            ServiceHost sh = source as ServiceHost;
            Trace.TraceError("Host_{0} for Controller {1} failed", sh.Id(), sh.PusherName());
            sh.Abort();
            hostWorkerRoleSubscriber = null;
        };

        Trace.TraceInformation("WorkerRole Host_{0} for Controller {1} has been opened.", sessionId, hostWorkerRoleSubscriber.PusherName());
    }
    else
    {
        Publisher.PublishWarning("Creating Subscriber for Pusher Control failed. The Pusher is not ready to received any control message");
    }

    // Load all Application Subscribers from Repository
    Hosting.Load(sessionId);

    RoleEnvironment.Changing += RoleEnvironmentChanging;
    return base.OnStart();
}

The first subscriber is created for Controller and opened synchronously based on the setting configuration data. The following picture shows its configuration setting for Storage, Service Bus, Subscription and Publisher:

The others subscribers/receivers are created asynchronously based on the Repository metadata. For this reason, each WorkerRole start has its own id to distinguish a post-processing for closing a ServiceHost.

Repository (Table Storage)

The Repository represents a start-up and run-time knowledge base of the Pusher. This is a shareable place between the Pusher and its consumers. The Repository can be updated via the Admin tool or by the client programmatically via the REST API.

During the WorkerRole start up, all enabled pushers described in the Repository must be subscribed to the Service Bus. This step will allow us to pull-up messages from the Topic and/or Queue. When the message is received by the PusherService, based on the runtime Repository, the message can be delivered to the Target endpoint. Note, that this runtime Repository data can be changed any time without Updating the WorkerRole. In the case of adding a new subscriber/receiver, the WorkerRole will update its state (closing or opening ServiceHost for specific subscriber/receiver)

The following picture shows three entities supported by the Repository. The first one is for holding a Service Bus Account needed for accessing to the Service Bus. The second one is for Subscriber/Receiver push message and the last one is for HTTPS Pusher credential.

The above properties of these entities are for runtime decision, where and how the received message needs to be forwarded/pushed. The start-up data are stored in the Repository in the TableServiceEntity using the PartitionKey and RowKey. These two properties are represented as a unique logical Pusher entry in the Repository.

For Pusher Repository a Table Storage is used for its administration I am using a popular free Azure Storage Explorer. For the production version I do recommend to build a Pusher Admin Tool for administrating this Repository and hiding its setup properties.

The following picture shows a screen snippet of my Push Storage made by Azure Storage Explorer:

As I mentioned earlier, the Pusher entry in the Repository can be for two entities such as AccountEntity and PusherEntity. In addition the PusherEntity must be done for Service Bus component Queue and Topic. To optimize query on the Table Storage, the RowKey is used for identifying type of the row (Pusher Entry).

There are the following predefined values in the RowKey:

  • $Default, this is the default name of the Subscription first Rule declared by Service Bus
  • $Namespace, this is the key for Account Entry
  • $Credential, this is the key for client credential (UserName/Password)
  • $Queue, this is the key for Queue entry
  • any valid name of the Subscription Rule.

The following screen snippet shows an example of the Account entry for specified Namespace in the PartitionKey:

The PartitionKey for other entry than $Namespace will represent either the Subscription address or Queue address. Since the format of the url address cannot be inserted into PartitionKey, it is necessary to make some changes such as: deleting a schema with delimiter and replacing character '/' by '%'.

The following screen snippet shows these changes for Queue entry:

Based on the above entry, we can register a Pusher for Queue with address sb://rk2011.servicebus.windows.net/search. The received message is pushed to the Target defined by Url address, message version and with an option to pass a BrokeredMessageProperty as a custom Soap Header. If the message cannot be delivered to the Target, the Notification message is published to the Service Bus Topic. The Pusher entry can be disabled by property Enabled.

The following two pictures show examples of the Subscription entries in the Repository. The first one is for $Default Rule and the second one is for bridgeByPass Rule:

BrokeredMessageProperty Header (BMP Header)

In the case of the message pushing to the Soap Target, we can enable IncludeBMP option to insert the BrokeredMessageProperty into the Soap Headers section. This feature allows filtering these properties without deseralizing a message payload, for example, using the WCF RoutingService.

The following screen snippet shows an example of the BMP Header in the Soap11 Message:

OK, it's show time. Let's describe what this Service Bus Pusher can do for you.

Usage and Test

First of all, the following are prerequisites:

The Pusher requires knowledge and experience from the building and deploying application on the Azure, Table Storage and usage of the Azure Service Bus Messaging. Therefore, I am assuming you have a working knowledge of the Windows Azure Platform.

The Pusher Solution

The Pusher Solution contains only one WorkerRole where all features based on the metadata stored in the Table Storage are implemented. There are test resources for Push table and Virtual Publisher Tester in the folder called Test.

The following screen snippet shows this Solution:

Note, that the following steps are based on the configuration settings such as Table Name, Topic Name, etc. I do recommend keeping these names for now. Of course, they can be changed based on the needs. In the following steps I will use 3rd party tools for simulate and explore the usage of the Pusher for Azure Service Bus.

The first steps (A) will prepare our Pusher for administration such as sending the control messages, receiving notifications, etc.

Step A. Update Settings for your Storage Account and Service Bus Namespace

Step A1. Create pusher Topic and Notifications Subscription on the Service Bus

Use the Service Bus Explorer for creating the Topic and Subscription on the Azure Service Bus.

The following screen snippet shows the pusher Topic and Notifications Subscription on the Service Bus:

Note, that the pusher Topic will have additional Subscriptions created by each instance of the WorkerRole.

Step A2. Create Virtual Admin Tool

Use the Azure Service Bus Tester to create a virtual Subscriber for receiving notifications from the Pusher.

Use the Azure Service Bus Tester to create virtual Publishers for firing a control messages to the Pusher. The templates of the messages can be loaded from the solution folder Test.

Step A3. Compile AzurePushService Solution from this article

In this step we will run the package with Windows Azure Compute Emulator and watch all trace notifications on the Service Bus Tester.

The following screen snippet shows result on the Emulator:

And on the Tester we should see two notifications one from each instance. The messages notified that the Pusher has been started.

Step A4. Fire NotifyStatus from the Tester

Firing a virtual publisher pusher.NotifyStatus, the control message is sent to the pusher Topic and received by both WorkerRole instances. If the message is recognized by PusherService, the Pusher will publish a list of the hosted subscribers. Since, the Push Table is empty at this moment; the following status is also empty:

The same way, we can fire other Publishers to see their notifications.

That's great. We have a full control over the Pusher. Now, it is time to add a few push entries into the Repository (Push Table Storage).

Let's make some examples.

Step B1. Example of the Queue Multicasting

This is an example when the event interest is pushed to the Queue and multicast on the Topic across the namespaces. The following picture shows this scenario:

To demonstrate a Sender and Subscriber I will use Azure Service Bus Tester, for editing a Table storage Azure Storage Explorer and for creating components on the Service Bus it will be Service Bus Explorer. This example requires creating two namespaces on the Service Bus.

Please, create the queue search on your first namespace (in my example is rk2011). On the second one, create the Topic topic/worker and below this topic two subscriptions such as image1 and imageN.

The following picture is a screen snippet of the Service Bus Explorer for each namespace:

As the next step is to add a Push entry to the Repository. This is a Pusher of the BrokeredMessage from the Queue to the Topic, therefore we need to setup only the following properties for this entity entry:

The PartitionKey and RowKey is a unique identifier for entry row. Basically, there is a mapping (routing) info for delivery message from the queue to the topic based on the PartitionKey and Url. Note that the Pusher needs for this connectivity such as pull and push process across the namespaces two accounts. These accounts must be stored into the Push Table. as well.

The Azure Storage Explorer allows uploading the Table from the file system. The Test folder includes this file, the following picture shows its contents:

That's all for Pusher setup and metadata plumbing. Now we can use the Tester with virtual Sender and Subscribers. Please, add the Sender for search Queue and subscribers for Image1 and imageN subscriptions.

Once we have these components on the Tester, we are ready to make a test. Right click on the Sender and select Send for sending a message to the queue. The message will be pulled from the queue by Pusher and pushed to the topic/worker, where are two subscribers.

You should see a result of the multicasting receiving two messages on the Tester, like it is shown in the following screen snippet:

That's great. But wait a moment, let's make some additional experiment. Add virtual receiver for queue search and repeat the test again. You should see message balancing between the Pusher and virtual Receiver.

One more example, but this time I would like to demonstrate pushing a message to the legacy http web service, see the next step.

Step B2. Example of the HTTP Pusher.

The use-case for this test is very simple. We need to add one more multicast Target to the previously example. This Target is a legacy BasicHttpBinding web service with an address, for instance, http://localhost:8982/Service.

The first step is to add a new subscription, let's say imageNplusOne with an Action set sys.To='$Default'. Note, that this RuleAction is very important for query in the Repository, see earlier description.

The following picture shows a Service Bus Explorer with a new subscription imageNplusOne:

Next step is to add the following Push entry in the Repository. Note, that the RowKey is a RuleName, MessageVersion is Soap11 and Url is endpoint address of our target service:

Now, back to the Tester. Sending a message from the virtual Sender, we can see like from previous example messages in the image1 and imageN subscribers, but the error message in the pusher subscriber. This error message shows that the Pusher couldn't deliver the message to this Target endpoint.

In the case of having this service on your machine and self-hosted in the Console program, the following message should show up on the screen:

The reason why I am showing this picture is the BMP Header in the Soap11 message. Of course, this is an option property in the Repository.

Step B3. Example of the HTTPS Pusher

In this scenario, the push message is sent to the target via secure channel and client must be authorized by the service using its basic credential such as UserName and Password. This example shows you what must be done for this scenario:

  1. editing PusherEntity row using the UserName and Password for the Target service
  2. changing transport to the https, for instance: https://myMachine/Service
  3. installing a SSL certificate on IIS and configure the virtual directory to require SSL.
  4. IIS publicly available
  5. your service need to configure your custom UserNamePasswordValidator

The following picture shows all changes on the Pusher side described in the above a) and b) bullets:

Moving Pusher to the secure transport is very straightforward, the setup of HTTPS and Basic Authentication on the service side will take majority of the time. In the case of multiple Push messages to the same Target (endpoint), we can use $Credential entity row for this endpoint. The following pictures show these entries. Note, that the PusherEntity row must be empty (null).

Note: I do recommend to test this scenario on the local machine, including the Emulator and then move to the cloud both, the Pusher and your Service. This link How to Configure an SSL Certificate on an HTTPS Endpoint can help you as well.

That's all for examples, you can create your Push entries based on the needs, but what about if we need to Push a message to the Target with a different binding protocol, etc.? That's a good point. The solution is called Virtual Bridge and is described in the Appendix A of this article and I am going to publish it soon on the codeproject.

Step C1. Deploying on the Azure.

The previous steps were done with Emulator where all trace messages can be seen on instance screens. Now, it is the time to move it to the Azure. Exit the Emulator, create the package and deploy it to the cloud. You can also use manual process, such as creating a hosted service and uploading package and configuration files.

Once the service on the cloud is Ready, the above test steps can be repeated including the last one with the error of the endpoint address (http://localhost:8982/Service).

Conclusion

This article described a Push Service for Service Bus Components. The Pusher enables to use a Push Model for delivery an event interest from the Service Bus Components such as Queue and Topic. The Table Storage (Repository) allows us to create a runtime mapping of the Subscribers or Receivers to the Target endpoints in the Push model driven architecture. I hope you enjoyed it.

Appendix A

The Pusher has built-in a simple proxy for BasicHttpBinding endpoint where we can push a brokered message. In other cases, such as message mediation and targeting we need a bridge. Basically, the Bridge is a special service for processing a message in the predefined pattern Validate-Enrich-Transform-Enrich-Route (VETER). The following picture shows this scenario:

The above picture shows a Publisher for pushing an event interest to the Service Bus (queue or topic) and the Pusher will push the brokered message to the Bridge, where the message can be mediated (transform) and dispatched to the specific Target.

How this process is accomplished and abstracted (virtualized) for Windows Azure infrastructure and hosted in the WebRole, you can see in my upcoming article, soon. It is based on my previous article RoutingService on Azure. Also, I recommend to look at Windows Azure Service Bus EAI & EDI Labs Release.

References:

[0] Service Bus

[1] Service Bus Explorer

[2] Azure Service Bus Tester

[3] Azure Storage Explorer

[4] The Developers Guide to AppFabric

[5] Using Windows Azure Service Bus Messaging

[6] Windows Azure Service Bus EAI & EDI

[7] How to: Use a Custom User Name and Password Validator

[8] How to Configure an SSL Certificate on an HTTPS Endpoint

License

This article, along with any associated source code and files, is licensed under The Code Project Open License (CPOL)

Share

About the Author

Roman Kiss
Software Developer (Senior)
United States United States
No Biography provided

Comments and Discussions

 
GeneralMy vote of 5 PinmvpKanasz Robert28-Sep-12 6:37 
Great article
GeneralMy vote of 5 Pinmembercrystalmac4-Apr-12 23:48 
GeneralMy vote of 5 PinmemberBigTimber@home3-Apr-12 16:53 
GeneralMy vote of 5 Pinmembermw12601-Apr-12 10:25 
GeneralRe: My vote of 5 PinmemberRoman Kiss2-Apr-12 8:09 

General General    News News    Suggestion Suggestion    Question Question    Bug Bug    Answer Answer    Joke Joke    Rant Rant    Admin Admin   

Use Ctrl+Left/Right to switch messages, Ctrl+Up/Down to switch threads, Ctrl+Shift+Left/Right to switch pages.

| Advertise | Privacy | Terms of Use | Mobile
Web02 | 2.8.1411022.1 | Last Updated 3 Apr 2012
Article Copyright 2012 by Roman Kiss
Everything else Copyright © CodeProject, 1999-2014
Layout: fixed | fluid