This article shows a way of using Windows Azure storage, specifically Queue and Table storage, to implement the command side of a Command Query Responsibility Segregation (CQRS) architecture. It is implemented in VB.Net, but nothing in it prevents a quick conversion to C# if that is your language of choice.
It does not cover the query side of the architecture, nor the related concepts of Event Sourcing, which I hope to get to as the application this code comes from is built out.
At its simplest, CQRS is an architecture pattern that separates commands (doing) from queries (asking). What this means in practice is that the command and query sides do not need to share a common model. This vertical shearing layer is often matched with a second shearing layer between the definition (of a command or query) and the implementation (in a command handler or a query handler).
In my experience, CQRS is very useful when you are developing an application with a distributed team to a very short timescale as it reduces the risk of "model contention" which can occur if everyone is updating the model at the same time. It is also useful for financial scenarios where having an audit trail of all the changes that have occurred in an application is required.
(If you are not familiar with CQRS, I recommend you start with this excellent article for an overview of the how and why.)
Defining the command
Every type of action you can perform is considered its own command, and each command has a definition. You need a distinct class for each type of command your system uses, and that class defines both what the command is and what payload (any additional parameters) is needed in order to execute it.
For example, if you have a bank account based system you might have a command OpenAccount whose parameters cover the data needed to open a bank account, such as the account owner, holding branch, denomination currency and so on, whereas a command DepositFunds would require an account number and an amount. You could also have a command that does not require any parameters at all.
In order that a given class can be semantically identified as being a command, it implements a standard interface, ICommandDefinition. This also adds two mandatory properties: a unique identifier by which every instance of a command can be identified, and a human-readable name by which the command is known.
(Many implementations do not use a human-readable command name. You can use it or not.)
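Public Interface ICommandDefinition
    ' Unique identifier of this instance of the command
    ReadOnly Property InstanceIdentifier As Guid
    ' Human-readable name by which the command is known
    ReadOnly Property CommandName As String
End Interface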
At this point we can go ahead and define concrete classes for each command - for example our OpenAccount command definition could look like:
Public Class OpenAccountCommandDefinition
    Implements ICommandDefinition
    Private m_instanceid As Guid = Guid.NewGuid
    Public ReadOnly Property InstanceIdentifier As Guid Implements ICommandDefinition.InstanceIdentifier
        Get
            Return m_instanceid
        End Get
    End Property
    Public ReadOnly Property CommandName As String Implements ICommandDefinition.CommandName
        Get
            Return "Open a new account"
        End Get
    End Property
    Public Property BranchIdentifier As String
    Public Property AccountOwner As String
    Public Property DenominationCurrency As String
End Class
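The DepositFunds command mentioned earlier could be defined in the same way. The following is only a sketch: the class name, the property names AccountNumber and Amount, and the command name text are my assumptions for illustration, and the interface is repeated so that the block compiles on its own.

```vb
Imports System

' Repeated here only so that the sketch compiles on its own
Public Interface ICommandDefinition
    ReadOnly Property InstanceIdentifier As Guid
    ReadOnly Property CommandName As String
End Interface

' Hypothetical definition for the DepositFunds command
Public Class DepositFundsCommandDefinition
    Implements ICommandDefinition

    Private ReadOnly m_instanceid As Guid = Guid.NewGuid()

    Public ReadOnly Property InstanceIdentifier As Guid Implements ICommandDefinition.InstanceIdentifier
        Get
            Return m_instanceid
        End Get
    End Property

    Public ReadOnly Property CommandName As String Implements ICommandDefinition.CommandName
        Get
            Return "Deposit funds"
        End Get
    End Property

    ' Payload (assumed names): which account to credit and by how much
    Public Property AccountNumber As String
    Public Property Amount As Decimal
End Class

Module DepositFundsSketch
    Sub Main()
        Dim cmd As New DepositFundsCommandDefinition() With {.AccountNumber = "12345678", .Amount = 250D}
        ' Every instance gets its own identifier when it is created
        Console.WriteLine(cmd.CommandName & " " & cmd.InstanceIdentifier.ToString())
    End Sub
End Module
```

Note that the definition holds only data. Nothing in it says how a deposit is carried out, which is left to the command handler.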
However, in order to pass the command definition across to the command handler I am using Windows Azure storage, specifically a queue and a matching table. The command type and unique identifier are added to the queue as a message, and the matching table is used to hold any additional payload that the command requires.
The maximum size of a message on an Azure queue is 64 KB, but a command could require more payload than that, so this dual approach is used.
Dispatching the command
The first step in putting the payload in an Azure table is to identify the payload properties of the command so that they can be saved off. This can be done either by using reflection or by having a class that represents a command parameter and using a dictionary of them to back the payload properties.
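The reflection option could look something like the sketch below. The helper GetPayloadProperties and the sample class are hypothetical names of mine, not part of the application code. It assumes that the payload is every public readable instance property except the two that come from ICommandDefinition.

```vb
Imports System
Imports System.Collections.Generic
Imports System.Reflection

Module PayloadReflectionSketch
    ' Hypothetical command class used only to exercise the helper
    Public Class SampleOpenAccountCommand
        Public ReadOnly Property InstanceIdentifier As Guid
            Get
                Return Guid.Empty
            End Get
        End Property
        Public Property BranchIdentifier As String
        Public Property AccountOwner As String
    End Class

    ' Returns property name -> value for the payload properties of a command
    Public Function GetPayloadProperties(command As Object) As Dictionary(Of String, Object)
        Dim payload As New Dictionary(Of String, Object)
        Dim flags As BindingFlags = BindingFlags.Public Or BindingFlags.Instance
        For Each prop As PropertyInfo In command.GetType().GetProperties(flags)
            ' Skip write-only properties, indexers and the two interface properties
            If prop.CanRead AndAlso prop.GetIndexParameters().Length = 0 AndAlso _
               prop.Name <> "InstanceIdentifier" AndAlso prop.Name <> "CommandName" Then
                payload.Add(prop.Name, prop.GetValue(command, Nothing))
            End If
        Next
        Return payload
    End Function

    Sub Main()
        Dim cmd As New SampleOpenAccountCommand() With {.BranchIdentifier = "LDN01", .AccountOwner = "A. Customer"}
        For Each entry In GetPayloadProperties(cmd)
            Console.WriteLine(entry.Key & " = " & Convert.ToString(entry.Value))
        Next
    End Sub
End Module
```

Reflection keeps the command classes plain, at the cost of a little run-time overhead. The dictionary-backed approach avoids that overhead but makes every command class slightly more verbose.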
Once a command definition has been created, it needs to be dispatched. The command dispatcher is responsible for the transfer to the command handlers. In the case of the Azure dispatcher this is a two-step process: first save the payload to a commands table, then add a message to the queue. The (partial) code snippet to do this is:
' At the top of the file:
' Imports Microsoft.WindowsAzure.Storage
' Imports Microsoft.WindowsAzure.Storage.Queue
' Imports Microsoft.WindowsAzure.Storage.Table
Dim applicationStorageAccount As CloudStorageAccount
Dim applicationQueueClient As CloudQueueClient
Dim applicationTableClient As CloudTableClient
Public Sub Send(command As ICommandDefinition) Implements ICommandDispatcher.Send
    ' Step 1: save the command payload to the commands table
    If (applicationTableClient IsNot Nothing) Then
        Dim commandTable As CloudTable = applicationTableClient.GetTableReference("commandsparameterstable")
        Dim cmdRecord As New CommandTableEntity(command)
        Dim insertOperation As TableOperation = TableOperation.InsertOrReplace(cmdRecord)
        Dim insertResult = commandTable.Execute(insertOperation)
    End If
    ' Step 2: add a message holding the command type and instance identifier to the queue
    If (applicationQueueClient IsNot Nothing) Then
        Dim queue As CloudQueue = applicationQueueClient.GetQueueReference("commandsqueue")
        Dim msg As CloudQueueMessage = New CloudQueueMessage(command.GetType.Name & "::" & command.InstanceIdentifier.ToString())
        If (msg IsNot Nothing) Then
            queue.AddMessage(msg)
        End If
    End If
End Sub
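The queue message is just the command type name and the instance identifier joined by "::". As a sketch of how that format can be built and then split apart again on the receiving side, the helpers below (BuildMessage and TryParseMessage, both hypothetical names) work on plain strings and need no Azure references.

```vb
Imports System

Module QueueMessageFormatSketch
    Private Const Separator As String = "::"

    ' Builds the message text in the same form as the dispatcher: TypeName::Guid
    Public Function BuildMessage(commandTypeName As String, instanceIdentifier As Guid) As String
        Return commandTypeName & Separator & instanceIdentifier.ToString()
    End Function

    ' Splits a message back into its two parts; returns False if it is malformed
    Public Function TryParseMessage(message As String, ByRef commandTypeName As String, ByRef instanceIdentifier As Guid) As Boolean
        If String.IsNullOrEmpty(message) Then Return False
        Dim parts() As String = message.Split(New String() {Separator}, StringSplitOptions.None)
        If parts.Length <> 2 Then Return False
        commandTypeName = parts(0)
        Return Guid.TryParse(parts(1), instanceIdentifier)
    End Function

    Sub Main()
        Dim body As String = BuildMessage("OpenAccountCommandDefinition", Guid.NewGuid())
        Dim typeName As String = Nothing
        Dim parsedId As Guid
        If TryParseMessage(body, typeName, parsedId) Then
            Console.WriteLine(typeName & " / " & parsedId.ToString())
        End If
    End Sub
End Module
```

Because the message carries only these two short values, it stays far below the queue message size limit however large the payload in the table becomes.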
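It is important to follow the naming rules for queues and tables. Queue names allow only lower-case letters, digits and hyphens, and table names allow only letters and digits, so keeping to all lower case and no punctuation satisfies both.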
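A small guard can catch a bad name before it reaches the storage service. The sketch below is deliberately stricter than either service requires: it accepts only names of 3 to 63 characters that start with a lower-case letter and contain only lower-case letters and digits, which is valid for both queues and tables. The function name IsSafeStorageName is hypothetical.

```vb
Imports System
Imports System.Text.RegularExpressions

Module StorageNameSketch
    ' Lower-case letter first, then 2 to 62 lower-case letters or digits
    Private ReadOnly SafeName As New Regex("^[a-z][a-z0-9]{2,62}\z")

    ' True if the name is safe to use for both a queue and a table
    Public Function IsSafeStorageName(storageName As String) As Boolean
        Return storageName IsNot Nothing AndAlso SafeName.IsMatch(storageName)
    End Function

    Sub Main()
        Console.WriteLine(IsSafeStorageName("commandsqueue"))            ' True
        Console.WriteLine(IsSafeStorageName("commandsparameterstable"))  ' True
        Console.WriteLine(IsSafeStorageName("Commands_Queue"))           ' False
    End Sub
End Module
```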
In order to save a command record to an Azure table it must either be put in a class that inherits from TableEntity or, if you are doing manipulations behind the scenes, in a class that implements ITableEntity.
I wasn't able to find many examples of implementing ITableEntity, so mine is as follows:
Public Class CommandTableEntity
    Implements ITableEntity

    ' (The constructor that takes an ICommandDefinition is not shown in this snippet)
    Private m_parameters As Dictionary(Of String, CommandParameter) = New Dictionary(Of String, CommandParameter)

    Public Property ETag As String Implements ITableEntity.ETag
    Public Property PartitionKey As String Implements ITableEntity.PartitionKey
    Public Property RowKey As String Implements ITableEntity.RowKey
    Public Property Timestamp As DateTimeOffset Implements ITableEntity.Timestamp

    ' Processing status of the command
    Public Property Processed As Boolean
    Public Property InError As Boolean
    Public Property ProcessedBy As String

    Public Sub ReadEntity(properties As IDictionary(Of String, EntityProperty), operationContext As OperationContext) Implements ITableEntity.ReadEntity
        For Each propertyPair In properties
            ' Parameter properties are stored with "__" in place of the square brackets
            If (propertyPair.Key.Contains("__")) Then
                Dim fixedPropertyName As String = propertyPair.Key.Replace("__", "[").Trim() & "]"
                Dim parameterName As String = CommandParameter.GetParameterName(fixedPropertyName)
                Dim parameterIndex As Integer = CommandParameter.GetParameterIndex(fixedPropertyName)
                Dim parameterPayload As String = propertyPair.Value.StringValue
                If (Not String.IsNullOrWhiteSpace(parameterPayload)) Then
                    m_parameters.Add(propertyPair.Key, CommandParameter.Create(parameterName, parameterIndex))
                End If
            End If
            If propertyPair.Key.Equals("Processed", StringComparison.OrdinalIgnoreCase) Then
                If (propertyPair.Value.BooleanValue.HasValue) Then
                    Me.Processed = propertyPair.Value.BooleanValue.Value
                End If
            End If
            If propertyPair.Key.Equals("InError", StringComparison.OrdinalIgnoreCase) Then
                If (propertyPair.Value.BooleanValue.HasValue) Then
                    Me.InError = propertyPair.Value.BooleanValue.Value
                End If
            End If
            If propertyPair.Key.Equals("ProcessedBy", StringComparison.OrdinalIgnoreCase) Then
                Me.ProcessedBy = propertyPair.Value.StringValue
            End If
        Next
    End Sub

    Public Function WriteEntity(operationContext As OperationContext) As IDictionary(Of String, EntityProperty) Implements ITableEntity.WriteEntity
        Dim properties As New Dictionary(Of String, EntityProperty)
        ' One table property per command parameter, with the square brackets replaced
        For Each param As CommandParameter In m_parameters.Values
            Dim fixedPropertyName As String = CommandParameter.GetParameterKey(param).Replace("[", "__").Replace("]", "")
            properties.Add(fixedPropertyName, New EntityProperty(param.ToString))
        Next
        properties.Add("Processed", New EntityProperty(Me.Processed))
        properties.Add("InError", New EntityProperty(Me.InError))
        properties.Add("ProcessedBy", New EntityProperty(Me.ProcessedBy))
        Return properties
    End Function
End Class
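The string replacements in ReadEntity and WriteEntity exist because square brackets cannot be used in a table property name. The sketch below isolates that round trip. It assumes the parameter key has the form Name[Index], which is what the Replace calls imply, and the two function names are hypothetical.

```vb
Imports System

Module PropertyNameMappingSketch
    ' "AccountOwner[0]" -> "AccountOwner__0" (as done in WriteEntity)
    Public Function ToTablePropertyName(parameterKey As String) As String
        Return parameterKey.Replace("[", "__").Replace("]", "")
    End Function

    ' "AccountOwner__0" -> "AccountOwner[0]" (as done in ReadEntity)
    Public Function FromTablePropertyName(propertyName As String) As String
        Return propertyName.Replace("__", "[").Trim() & "]"
    End Function

    Sub Main()
        Dim stored As String = ToTablePropertyName("AccountOwner[0]")
        Console.WriteLine(stored)                         ' AccountOwner__0
        Console.WriteLine(FromTablePropertyName(stored))  ' AccountOwner[0]
    End Sub
End Module
```

One consequence of this scheme is that a parameter name must not itself contain a double underscore, as it would be turned into a bracket on the way back.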
The next step, getting the command off the queue and processing it, is left for the next article.
Querying the command
As we have given each command instance its own unique identifier in InstanceIdentifier, we can create a user interface component to query the state of the command queue and table so as to perform a "How is my command getting on?" type of query.
This solves one of the biggest areas of confusion in CQRS: how does the user get feedback that their command has executed if it is totally asynchronous? The solution is for them to ask, either by explicit user interaction or by some polling mechanism.
This also means that any systems that don't require status feedback (such as automated sensor feeds or data pipelines) can use exactly the same command handling functions as are used by an interactive front end.
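A status query of this kind could be sketched as below. To keep the example self-contained, an in-memory dictionary keyed by InstanceIdentifier stands in for the commands table, and the CommandStatusRecord class, the DescribeStatus function and the status wording are all hypothetical. Only the Processed, InError and ProcessedBy fields come from the table entity shown earlier.

```vb
Imports System
Imports System.Collections.Generic

Module CommandStatusSketch
    ' Hypothetical stand-in for one row of the commands table
    Public Class CommandStatusRecord
        Public Property Processed As Boolean
        Public Property InError As Boolean
        Public Property ProcessedBy As String
    End Class

    ' In-memory stand-in for the commands table, keyed by InstanceIdentifier
    Public ReadOnly Records As New Dictionary(Of Guid, CommandStatusRecord)

    ' Answers the "how is my command getting on?" question for one command
    Public Function DescribeStatus(instanceIdentifier As Guid) As String
        Dim record As CommandStatusRecord = Nothing
        If Not Records.TryGetValue(instanceIdentifier, record) Then
            Return "Unknown command"
        End If
        If record.InError Then Return "Failed"
        If record.Processed Then Return "Completed by " & record.ProcessedBy
        Return "Waiting to be processed"
    End Function

    Sub Main()
        Dim id As Guid = Guid.NewGuid()
        Records.Add(id, New CommandStatusRecord())
        Console.WriteLine(DescribeStatus(id))   ' Waiting to be processed
        Records(id).Processed = True
        Records(id).ProcessedBy = "handler-01"
        Console.WriteLine(DescribeStatus(id))   ' Completed by handler-01
    End Sub
End Module
```

A front end would call something like DescribeStatus on a timer or when the user asks, while a sensor feed would simply never call it.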
Points of Interest
If you are doing any work with Windows Azure Storage I recommend downloading the Azure Storage Explorer project from CodePlex.
Tips I have learnt so far:-
1) When storing data in Windows Azure table storage, put extra care into deciding what the partition key property is. I started out with too high level an object (client), which meant my tables only had a small number of partitions, and that makes data access slower.
2) It costs a (very small) amount of money to poll a Windows Azure storage queue even if it is empty, so a way of throttling the polling rate up or down depending on demand is a very good idea.
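One simple way to throttle the polling rate is to double the wait after every empty poll, up to a ceiling, and to drop back to the shortest wait as soon as a message is found. This is a sketch of that idea only. The function name NextDelay and the one second and thirty second limits are assumptions, not values taken from the application.

```vb
Imports System

Module PollingBackoffSketch
    ' Works out how long to wait before the next poll of the queue
    Public Function NextDelay(current As TimeSpan, messageFound As Boolean, minDelay As TimeSpan, maxDelay As TimeSpan) As TimeSpan
        ' Demand is present: poll again as quickly as allowed
        If messageFound Then Return minDelay
        ' Queue was empty: back off, but never beyond the ceiling
        Dim doubled As TimeSpan = TimeSpan.FromTicks(current.Ticks * 2)
        If doubled > maxDelay Then Return maxDelay
        Return doubled
    End Function

    Sub Main()
        Dim minDelay As TimeSpan = TimeSpan.FromSeconds(1)
        Dim maxDelay As TimeSpan = TimeSpan.FromSeconds(30)
        Dim delay As TimeSpan = minDelay
        ' Six empty polls in a row: waits of 2, 4, 8, 16, 30 and 30 seconds
        For i As Integer = 1 To 6
            delay = NextDelay(delay, False, minDelay, maxDelay)
            Console.WriteLine(delay.TotalSeconds)
        Next
        ' A message arrives, so the wait drops back to 1 second
        delay = NextDelay(delay, True, minDelay, maxDelay)
        Console.WriteLine(delay.TotalSeconds)
    End Sub
End Module
```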
v1 - 2014-01-22 Rewritten to emphasise why over how - removed the code dump aspect
v2 - 2014-01-19 Outline of the classes involved
v3 - 2016-03-05 Added thoughts on querying the command