
CQRS on Windows Azure - Event sourcing

Duncan Edwards Jones, 16 Jul 2014
How to use a windows azure storage table as an event store for event sourcing

Introduction

Although event sourcing is not a mandatory part of CQRS, and indeed event sourcing is used outside of CQRS, the two are often used together. In this article I will be looking at an implementation that uses Windows Azure Storage (Tables) as the persistence mechanism.

 

Getting your head around event sourcing 

For developers schooled in the relational database model, event sourcing can seem to be a very confusing way of doing things. 
Hopefully the following hints can help:

  1. Events only get added to the end of the event list (you can conceptualise this as a stack that doesn't have a pop option)
  2. Events are stored by the thing they occur to rather than the type of event that they are.  For example we don't have a separate table for "payments" and "standing orders" in a bank account type of system - these are just different events that occur in the life of the bank account.
  3. Events cannot be deleted - if you need to "undo" something that has happened, a reversal or undo event needs to be added to the event store
  4. Events are past-tense.

However, if you get your head out of the CRUD you will see some benefits:

 

  1. You automatically get a complete audit trail for everything that occurred (if you have fields like "Last Modified" and "Updated By" in your database tables, and a set of triggers that write to an audit table whenever a field is updated, you are already doing this, and doing it backwards)
  2. You can query the event history in ways that weren't anticipated when the system was first created
  3. Your event payload schema can be very flexible - if a particular attribute doesn't apply to a particular type of event you don't need to store a "null" there

Fig 1: Example of an event store for vehicles, using the vehicle registration number as the aggregation identifier

Aside - a quick glossary of terms 

There are a couple of words that can cause confusion when dealing with event sourcing: Aggregation and Sequence. My (perhaps over-simplistic) definition is as follows.

The Aggregation is the thing to which events occur. In a bank example, this could be something like a bank account, in a vehicle leasing company it could be the vehicle and so on.

Each aggregation must be uniquely identified. Often a business domain already has unique identifiers you can use but if that is not the case you can create them using

The Sequence is the order in which the events occurred - this is almost always implemented as an incremental number.
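To make the two terms concrete, here is a minimal in-memory sketch (not the Azure implementation described later). The class name InMemoryEventStore and its methods are hypothetical, and events are reduced to plain strings. Each aggregation gets its own append-only list, and the sequence is simply the position of the event in that list.

```vbnet
Imports System
Imports System.Collections.Generic

' Hypothetical in-memory store, for illustration only
Public Class InMemoryEventStore

    ' One append-only list of events per aggregate identifier
    Private ReadOnly _streams As New Dictionary(Of String, List(Of String))

    ' Add an event to the end of the aggregate's list and return
    ' the sequence number it was given (1, 2, 3, ...)
    Public Function Append(ByVal aggregateId As String, ByVal eventName As String) As Integer
        Dim stream As List(Of String) = Nothing
        If Not _streams.TryGetValue(aggregateId, stream) Then
            stream = New List(Of String)
            _streams.Add(aggregateId, stream)
        End If
        stream.Add(eventName)
        Return stream.Count
    End Function

    ' Return a copy of the full history of one aggregate, oldest first
    Public Function ReadStream(ByVal aggregateId As String) As String()
        Dim stream As List(Of String) = Nothing
        If _streams.TryGetValue(aggregateId, stream) Then
            Return stream.ToArray()
        End If
        Return New String() {}
    End Function

End Class

Public Module InMemoryEventStoreDemo
    Public Sub Main()
        Dim store As New InMemoryEventStore()
        store.Append("12-D-12345", "VehiclePurchased")
        store.Append("12-D-12345", "VehicleServiced")
        ' An "undo" is just another event added to the end
        store.Append("12-D-12345", "VehicleServiceReversed")
        Console.WriteLine(String.Join(", ", store.ReadStream("12-D-12345")))
    End Sub
End Module
```

Note that there is deliberately no update or delete method: the only way to change history is to append to it.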

Using Windows Azure Storage Tables to persist events 

An event can be represented by a CLR class. It is usual to have an empty IEvent interface to indicate the intent of the developer that a given class is an event.

When storing the event we need to add the aggregation identifier and sequence number - so these two are specified in an interface for storing events:

''' <summary>
''' Additional properties that uniquely identify an event
''' </summary>
Public Interface IEventIdentity
 
    ''' <summary>
    ''' Get the identifier by which this event's aggregate is uniquely known
    ''' </summary>
    ''' <remarks>
    ''' Most implementations use a GUID for this but if you have a known unique identifier 
    ''' then that can be used instead - e.g. ISBN, CUSIP, VIN etc.
    ''' </remarks>
    Function GetAggregateIdentifier() As String
 
    ''' <summary>
    ''' The event version 
    ''' </summary>
    ReadOnly Property Version As UInteger
 
    ''' <summary>
    ''' The event that is identified by this event identity
    ''' </summary>
    ReadOnly Property EventInstance As IEvent
 
End Interface

In this case the aggregate identifier is implemented as a string so that the business can dictate what actual unique identifier to use for it. For clarity I also added an interface that can be used to set these aggregate identifiers, but this is totally optional.

''' <summary>
''' Interface to be implemented by any class that provides an aggregate 
''' identity
''' </summary>
''' <remarks>
''' This allows for different objects to define their aggregate identity differently - 
''' for example books might aggregate by ISBN, Stocks by CUSIP, cars by vehicle registration number etc
''' </remarks>
Public Interface IAggregateIdentity
 
    ''' <summary>
    ''' Get the identifier by which this event's aggregate is uniquely known
    ''' </summary>
    Function GetAggregateIdentifier() As String
 
End Interface
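As an illustration of how these interfaces might be implemented, the sketch below defines a vehicle aggregate identified by its registration number and a small wrapper that pairs an event with its identity. The class names VehicleIdentity, VehicleServiced and EventIdentity are hypothetical, and the interfaces are repeated in trimmed form only so that the sketch compiles on its own.

```vbnet
Imports System

' Trimmed copies of the article's interfaces
Public Interface IEvent
End Interface

Public Interface IAggregateIdentity
    Function GetAggregateIdentifier() As String
End Interface

Public Interface IEventIdentity
    Function GetAggregateIdentifier() As String
    ReadOnly Property Version As UInteger
    ReadOnly Property EventInstance As IEvent
End Interface

' Hypothetical aggregate: a vehicle identified by its registration number
Public Class VehicleIdentity
    Implements IAggregateIdentity

    Private ReadOnly _registration As String

    Public Sub New(ByVal registration As String)
        _registration = registration
    End Sub

    Public Function GetAggregateIdentifier() As String _
        Implements IAggregateIdentity.GetAggregateIdentifier
        Return _registration
    End Function
End Class

' Hypothetical event payload, named in the past tense
Public Class VehicleServiced
    Implements IEvent
    Public Property Mileage As Integer
    Public Property Cost As Decimal
End Class

' Hypothetical wrapper that pairs an event with its aggregate and version
Public Class EventIdentity
    Implements IEventIdentity

    Private ReadOnly _owner As IAggregateIdentity
    Private ReadOnly _eventVersion As UInteger
    Private ReadOnly _payload As IEvent

    Public Sub New(ByVal owner As IAggregateIdentity, ByVal eventVersion As UInteger, ByVal payload As IEvent)
        _owner = owner
        _eventVersion = eventVersion
        _payload = payload
    End Sub

    Public Function GetAggregateIdentifier() As String _
        Implements IEventIdentity.GetAggregateIdentifier
        Return _owner.GetAggregateIdentifier()
    End Function

    Public ReadOnly Property Version As UInteger Implements IEventIdentity.Version
        Get
            Return _eventVersion
        End Get
    End Property

    Public ReadOnly Property EventInstance As IEvent Implements IEventIdentity.EventInstance
        Get
            Return _payload
        End Get
    End Property
End Class
```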

Turning a version into a row identifier

 

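Since the version is an incremental number and the Azure table takes a string for its row key, you need to pad the version number with leading zeros so that it sorts correctly as text. The function below also subtracts the version from Long.MaxValue before padding, so later versions get smaller row keys and the most recent event sorts first within its partition: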

    Private Const VERSION_FORMAT As String = "0000000000000000000"
    Public Shared Function VersionToRowkey(ByVal version As Long) As String

        If (version <= 0) Then
            Return Long.MaxValue.ToString(VERSION_FORMAT)
        Else
            Return (Long.MaxValue - version).ToString(VERSION_FORMAT)
        End If

    End Function
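A short usage sketch shows the effect on ordering. The function is repeated so that the example stands alone; RowkeyToVersion is a hypothetical inverse that is not part of the original code.

```vbnet
Imports System

Public Module RowKeyDemo

    Private Const VERSION_FORMAT As String = "0000000000000000000"

    ' Same logic as the function above, repeated so the sketch stands alone
    Public Function VersionToRowkey(ByVal version As Long) As String
        If (version <= 0) Then
            Return Long.MaxValue.ToString(VERSION_FORMAT)
        Else
            Return (Long.MaxValue - version).ToString(VERSION_FORMAT)
        End If
    End Function

    ' Hypothetical inverse, useful when reading rows back
    Public Function RowkeyToVersion(ByVal rowKey As String) As Long
        Return Long.MaxValue - Long.Parse(rowKey)
    End Function

    Public Sub Main()
        Dim keyV1 As String = VersionToRowkey(1)   ' "9223372036854775806"
        Dim keyV2 As String = VersionToRowkey(2)   ' "9223372036854775805"

        ' The later version has the smaller key, so the table returns it first
        Console.WriteLine(String.CompareOrdinal(keyV2, keyV1) < 0)   ' True
        Console.WriteLine(RowkeyToVersion(keyV2))                    ' 2
    End Sub

End Module
```

Because every key is exactly 19 characters long, comparing the keys as text gives the same result as comparing them as numbers.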

Saving an event record  

The event record itself (everything except the partition key and row key) can be any kind of .NET class that implements IEventIdentity.  The function below takes an IEventContext, which supplies the same identity members plus context fields such as SequenceNumber, Commentary, Who and Source.  Because the fields this record can have are dynamic (depending on the event type - recall from above that different event types are stored in the same event store) we have to use a DynamicTableEntity class and fill it with the properties of our event class passed in:

    ' Requires: Imports Microsoft.WindowsAzure.Storage.Table
    Public Shared Function MakeDynamicTableEntity(ByVal eventToSave As IEventContext) As DynamicTableEntity

        Dim ret As New DynamicTableEntity

        ' Partition by aggregate; order rows within the partition by version
        ret.PartitionKey = eventToSave.GetAggregateIdentifier()
        ret.RowKey = VersionToRowkey(eventToSave.Version)

        ' Add the event type - currently this is the event class name
        ret.Properties.Add("EventType",
              New EntityProperty(eventToSave.EventInstance.GetType().Name))

        ' Add the context
        If (eventToSave.SequenceNumber <= 0) Then
            ' Default sequence number is the current UTC time in ticks
            ret.Properties.Add("SequenceNumber",
                New EntityProperty(DateTime.UtcNow.Ticks))
        Else
            ret.Properties.Add("SequenceNumber",
                New EntityProperty(eventToSave.SequenceNumber))
        End If

        ' Optional context fields are only stored when they have a value
        If (Not String.IsNullOrWhiteSpace(eventToSave.Commentary)) Then
            ret.Properties.Add("Commentary",
                 New EntityProperty(eventToSave.Commentary))
        End If

        If (Not String.IsNullOrWhiteSpace(eventToSave.Who)) Then
            ret.Properties.Add("Who", New EntityProperty(eventToSave.Who))
        End If

        If (Not String.IsNullOrWhiteSpace(eventToSave.Source)) Then
            ret.Properties.Add("Source", New EntityProperty(eventToSave.Source))
        End If

        ' Now add in the different properties of the payload
        ' (MakeEntityProperty is a helper that is not listed in this article)
        For Each pi As System.Reflection.PropertyInfo In eventToSave.EventInstance.GetType().GetProperties()
            If (pi.CanRead) Then
                ret.Properties.Add(pi.Name, MakeEntityProperty(pi, eventToSave.EventInstance))
            End If
        Next pi

        ' The original listing omitted the return statement
        Return ret
    End Function 

 

Then turning the DynamicTableEntity back into an appropriate event class is a matter of reading the event type, creating an instance of that type and then populating its properties from the DynamicTableEntity.Properties collection by reflection.
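The read side might be sketched as follows. To keep the example runnable without the storage library, the stored row is represented by a plain dictionary of property names to values, which is an assumption; with a real DynamicTableEntity you would read each value from its Properties collection instead. The names EventRehydrator, Rehydrate and VehicleServiced are hypothetical.

```vbnet
Imports System
Imports System.Collections.Generic
Imports System.Reflection

' Hypothetical event class used for the demonstration
Public Class VehicleServiced
    Public Property Mileage As Integer
    Public Property Cost As Decimal
End Class

Public Module EventRehydrator

    ' Create an instance of the named event type and copy any matching
    ' stored values onto its writable properties
    Public Function Rehydrate(ByVal stored As IDictionary(Of String, Object)) As Object

        ' "EventType" holds the class name written when the event was saved
        Dim typeName As String = CStr(stored("EventType"))

        ' Assumption: event classes live in this assembly. A lookup table of
        ' known event types would be a safer choice in a real system.
        Dim eventType As Type = Nothing
        For Each candidate As Type In Assembly.GetExecutingAssembly().GetTypes()
            If candidate.Name = typeName Then
                eventType = candidate
                Exit For
            End If
        Next
        If eventType Is Nothing Then
            Throw New InvalidOperationException("Unknown event type: " & typeName)
        End If

        Dim instance As Object = Activator.CreateInstance(eventType)

        ' Context fields such as "Who" have no matching property and are skipped
        For Each pi As PropertyInfo In eventType.GetProperties()
            Dim value As Object = Nothing
            If pi.CanWrite AndAlso stored.TryGetValue(pi.Name, value) Then
                pi.SetValue(instance, Convert.ChangeType(value, pi.PropertyType), Nothing)
            End If
        Next

        Return instance
    End Function

End Module
```

Storing only the class name keeps the rows readable, but it does mean that renaming an event class breaks the mapping for rows already written.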

Consuming events and projections 

In order to turn your event stream into something interesting (at least, interesting to a user that wants to query the data) you need to create a projection.  A projection is a view of the effect of a set of events.  For example, a financial projection over the vehicle event store shown above would be interested in any event that affected the cost of, or profit from, a given car.

To consume events you need to create a class that "knows" what kind of events it deals with and what to do with them.  This requires an interface to mark a class as "I do something with this kind of event":

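''' <summary>
''' Interface to be implemented by any class that consumes a given type of event
''' </summary>
''' <typeparam name="TAggregate">
''' The type of aggregate that uniquely identifies the thing the events pertain to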
''' </typeparam>
''' <typeparam name="TEvent">
''' The type of the event that the consumer consumes
''' </typeparam>
''' <remarks>
''' In practice a projection will consume many different types of event to 
''' derive its view
''' </remarks>
Public Interface IEventConsumer(Of In Taggregate As IAggregateIdentity, TEvent As IEvent(Of Taggregate))

    ''' <summary>
    ''' Perform whatever function you need to process the event
    ''' </summary>
    ''' <param name="eventToConsume">
    ''' The event that has occurred, to be consumed
    ''' </param>
    ''' <remarks>
    ''' An event may be routed through many consumers and a consumer class may 
    ''' consume multiple events.  This can all be wired up at start-up (by IoC 
    ''' or hard coded)
    ''' </remarks>
    Sub ConsumeEvent(ByVal eventToConsume As TEvent)


End Interface
 

Since we have an interface that defines a type of event that occurs and another interface that defines a type of thing that can consume these events, we can wire these together at application start-up (by using your IoC container of choice or even just hard-coding the connections).
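As a rough sketch of the hard-coded option, the example below builds a running-cost projection for a vehicle. It uses a simplified consumer interface that leaves out the aggregate type parameter, and the event and projection classes are hypothetical.

```vbnet
Imports System
Imports System.Collections.Generic

Public Interface IEvent
End Interface

' Simplified consumer interface (the aggregate type parameter is left out)
Public Interface IEventConsumer(Of TEvent As IEvent)
    Sub ConsumeEvent(ByVal eventToConsume As TEvent)
End Interface

Public Class VehiclePurchased
    Implements IEvent
    Public Property Price As Decimal
End Class

Public Class VehicleServiced
    Implements IEvent
    Public Property Cost As Decimal
End Class

' Hypothetical projection: a running total of what a vehicle has cost
Public Class VehicleCostProjection
    Implements IEventConsumer(Of VehiclePurchased), IEventConsumer(Of VehicleServiced)

    Public Property TotalCost As Decimal

    Public Sub ConsumePurchase(ByVal eventToConsume As VehiclePurchased) _
        Implements IEventConsumer(Of VehiclePurchased).ConsumeEvent
        TotalCost += eventToConsume.Price
    End Sub

    Public Sub ConsumeService(ByVal eventToConsume As VehicleServiced) _
        Implements IEventConsumer(Of VehicleServiced).ConsumeEvent
        TotalCost += eventToConsume.Cost
    End Sub
End Class

Public Module ProjectionRunner

    ' Hard-coded wiring: route each event to the consumer method for its type.
    ' Events the projection does not care about are simply ignored.
    Public Function Run(ByVal history As IEnumerable(Of IEvent)) As VehicleCostProjection
        Dim projection As New VehicleCostProjection()
        For Each item As IEvent In history
            If TypeOf item Is VehiclePurchased Then
                projection.ConsumePurchase(DirectCast(item, VehiclePurchased))
            ElseIf TypeOf item Is VehicleServiced Then
                projection.ConsumeService(DirectCast(item, VehicleServiced))
            End If
        Next
        Return projection
    End Function

End Module
```

An IoC container would replace the If/ElseIf chain with a lookup from event type to registered consumers, but the flow of events is the same.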

Snapshots

To allow the current state of a projection to be saved - both for use by any readers and for allowing us to have a starting point if we have to rebuild a projection after a service interruption - an interface to define a snapshot of a projection is also defined. This is a way of saying "it was like this at a given known point"

''' <summary>
''' A snapshot of a projection as at a point in time
''' </summary>
''' <typeparam name="IAggregateIdentity">
''' The type of thing that we are snapshotting that can be uniquely identified
''' </typeparam>
''' <remarks>
''' For entities that have a busy or long history it may be performant to store
''' point-in-time snapshots and only project forward from the most recent 
''' snapshot
''' </remarks>
Public Interface ISnaphot(Of In IAggregateIdentity)

    ''' <summary>
    ''' The version number of the highest event that contributed to 
    ''' this snapshot
    ''' </summary>
    ''' <remarks>
    ''' All events of higher version should be applied to this projection
    ''' so as to get the current as-of-now view
    ''' </remarks>
    ReadOnly Property AsOfVersion As Long

End Interface

In practice I save these snapshots as a blob (file) in JSON format - this makes (some of) the query side of the CQRS architecture as simple as finding the snapshot and reading it.
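Bringing a saved snapshot up to date could look something like the sketch below: only events with a version higher than AsOfVersion are applied. CostSnapshot and CostEvent are hypothetical types reduced to the fields the example needs, and loading the snapshot from blob storage is left out.

```vbnet
Imports System
Imports System.Collections.Generic
Imports System.Linq

' Hypothetical snapshot of a running-cost projection
Public Class CostSnapshot
    Public Property AsOfVersion As Long
    Public Property TotalCost As Decimal
End Class

' Hypothetical stored event, reduced to the fields the sketch needs
Public Class CostEvent
    Public Property Version As Long
    Public Property Amount As Decimal
End Class

Public Module SnapshotCatchUp

    ' Return a new snapshot that includes every event newer than the old one
    Public Function BringUpToDate(ByVal snapshot As CostSnapshot,
                                  ByVal history As IEnumerable(Of CostEvent)) As CostSnapshot

        Dim result As New CostSnapshot With {.AsOfVersion = snapshot.AsOfVersion, .TotalCost = snapshot.TotalCost}

        ' Skip events already included in the snapshot, then apply the rest in order
        Dim newer As IEnumerable(Of CostEvent) = history.Where(Function(e) e.Version > snapshot.AsOfVersion)
        Dim ordered As IEnumerable(Of CostEvent) = newer.OrderBy(Function(e) e.Version)

        For Each item As CostEvent In ordered
            result.TotalCost += item.Amount
            result.AsOfVersion = item.Version
        Next

        Return result
    End Function

End Module
```

The original snapshot is left untouched, so a failed rebuild can always start again from the last known good point.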

Sequencing and synchronising events

It is often useful to be able to sequence events that occurred to different aggregations together - for example you might have one aggregation for a stock price and another for an account and need to combine the two to give an account valuation at a given point in time.

To facilitate this a master synchronisation field is needed - this could be an incremental number or you can use the date/time of the event occurrence.

To do this an abstract class is used as the base class of all the event types and it handles the synchronisation key.

    ''' <summary>
    ''' Base class for all events - allows for a common synchronising property
    ''' </summary>
    Public MustInherit Class EventBase

        ''' <summary>
        ''' The creation timestamp of this event 
        ''' </summary>
        ''' <remarks>
        ''' This allows event streams to be combined in a synchronised fashion for 
        ''' multi-aggregate snapshots
        ''' </remarks>
        Public Property SynchronisationStamp As Long

        Public Sub New()
            ' Default the synchronisation stamp to now
            SynchronisationStamp = DateTime.UtcNow.Ticks
        End Sub
    End Class
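Using the stock price and account example from above, two event streams might be combined along these lines. The event classes PriceChanged and SharesBought and the module StreamMerger are hypothetical, and EventBase is repeated so that the sketch compiles on its own.

```vbnet
Imports System
Imports System.Collections.Generic
Imports System.Linq

Public MustInherit Class EventBase
    Public Property SynchronisationStamp As Long

    Public Sub New()
        SynchronisationStamp = DateTime.UtcNow.Ticks
    End Sub
End Class

' Hypothetical event on the stock price aggregation
Public Class PriceChanged
    Inherits EventBase
    Public Property Price As Decimal
End Class

' Hypothetical event on the account aggregation
Public Class SharesBought
    Inherits EventBase
    Public Property Quantity As Integer
End Class

Public Module StreamMerger

    ' Interleave two streams into one list ordered by synchronisation stamp
    Public Function Merge(ByVal firstStream As IEnumerable(Of EventBase),
                          ByVal secondStream As IEnumerable(Of EventBase)) As List(Of EventBase)
        Return firstStream.Concat(secondStream).OrderBy(Function(e) e.SynchronisationStamp).ToList()
    End Function

    ' Value the holding as at a given stamp: quantity held times latest price
    Public Function ValueAt(ByVal merged As IEnumerable(Of EventBase), ByVal asOfStamp As Long) As Decimal
        Dim price As Decimal = 0D
        Dim quantity As Integer = 0
        For Each item As EventBase In merged
            If item.SynchronisationStamp > asOfStamp Then Exit For
            If TypeOf item Is PriceChanged Then
                price = DirectCast(item, PriceChanged).Price
            ElseIf TypeOf item Is SharesBought Then
                quantity += DirectCast(item, SharesBought).Quantity
            End If
        Next
        Return price * quantity
    End Function

End Module
```

Tick-based stamps taken on different machines are only as reliable as the clocks that produced them, which is one reason to prefer an incremental number where a single source for it is available.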

 

Points of Interest 

When using Windows Azure Table Storage, queries that use the partition key and row key are very fast. Those that don't are much slower - therefore mapping these two fields to the aggregate identifier and the event version is a very sensible way to start.
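As a sketch of what such a query looks like, the function below builds a filter string that selects the events of one aggregate that are newer than a given version. Because later versions have smaller row keys in the scheme used here, "newer than" becomes a less-than comparison on the row key. The module and function names are hypothetical; the classic storage client library also has helpers (TableQuery.GenerateFilterCondition and TableQuery.CombineFilters) for building such filters, which you may prefer to hand-built strings.

```vbnet
Imports System

Public Module EventQueryFilters

    Private Const VERSION_FORMAT As String = "0000000000000000000"

    ' Same mapping as VersionToRowkey in the article (later version = smaller key)
    Private Function VersionToRowkey(ByVal version As Long) As String
        If version <= 0 Then Return Long.MaxValue.ToString(VERSION_FORMAT)
        Return (Long.MaxValue - version).ToString(VERSION_FORMAT)
    End Function

    ' Build a filter on partition key and row key only, so that the
    ' query stays inside a single partition and uses the row key range
    Public Function EventsAfterVersion(ByVal aggregateId As String, ByVal version As Long) As String
        ' Single quotes inside a string literal are escaped by doubling them
        Dim safeId As String = aggregateId.Replace("'", "''")
        Return String.Format("(PartitionKey eq '{0}') and (RowKey lt '{1}')",
                             safeId, VersionToRowkey(version))
    End Function

End Module
```

For example, EventsAfterVersion("12-D-12345", 3) gives (PartitionKey eq '12-D-12345') and (RowKey lt '9223372036854775804'), which is the kind of query you would run to catch up from a snapshot taken at version 3.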

History

  • 2014-01-23 Initial draft
  • 2014-02-06 Added projection explanation
  • 2014-07-16 Detail on how event streams can be synchronised

License

This article, along with any associated source code and files, is licensed under The Code Project Open License (CPOL)


About the Author

Duncan Edwards Jones
Software Developer (Senior)
Ireland
C# / SQL Server developer
Microsoft MVP 2006, 2007
Visual Basic .NET

Article Copyright 2014 by Duncan Edwards Jones