Although event sourcing is not a mandatory part of CQRS - and indeed event sourcing is used outside of CQRS - the two are often used together. In this article I will look at an implementation that uses Windows Azure Storage (Tables) as the persistence mechanism.
Getting your head around event sourcing
For developers schooled in the relational database model, event sourcing can seem to be a very confusing way of doing things.
Hopefully the following hints can help:
- Events only get added to the end of the event list (you can conceptualise this as a stack that doesn't have a pop operation)
- Events are stored by the thing they occur to rather than by the type of event that they are. For example, we don't have separate tables for "payments" and "standing orders" in a bank account type of system - these are just different events that occur in the life of the bank account.
- Events cannot be deleted - if you need to "undo" something that has happened, a reversal or undo event needs to be added to the event store
- Events are named in the past tense.
However, if you get your head out of the CRUD you will see some benefits
- You automatically get a complete audit trail of everything that occurred. (If you have fields like "Last Modified" and "Updated By" in your database tables, and a set of triggers that write to an audit table whenever a field is updated, you are already doing this - only backwards.)
- You can query the event history in ways that weren't anticipated when the system was first created
- Your event payload schema can be very flexible - if a particular attribute doesn't apply to a particular type of event you don't need to store a "null" there
Fig 1: Example of an event store for vehicles, using the vehicle registration number as the aggregation identifier
Aside - a quick glossary of terms
There are a couple of words that can cause confusion when dealing with event sources: Aggregation and Sequence. My (perhaps over-simplistic) definitions are as follows:
The Aggregation is the thing to which events occur. In a bank example this could be something like a bank account; in a vehicle leasing company it could be the vehicle, and so on.
Each aggregation must be uniquely identified. Often a business domain already has unique identifiers you can use, but if that is not the case you can create them yourself - a GUID, for example.
The Sequence is the order in which the events occurred - this is almost always implemented as an incremental number.
Using Windows Azure Storage Tables to persist events
An event can be represented by a CLR class. It is usual to have an empty IEvent interface to indicate the developer's intent that a given class is an event.
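As a sketch, the marker interface itself is empty and an event class simply implements it (the VehicleRepairedEvent class here is an invented example, not part of any library):

```vbnet
Public Interface IEvent
End Interface

' A hypothetical event - note the past-tense name
Public Class VehicleRepairedEvent
    Implements IEvent

    Public Property Cost As Decimal
    Public Property Description As String
End Class
```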
When storing the event we need to add the aggregation identifier and sequence number, so these two are specified in an interface for storing events:
Public Interface IEventIdentity
    Function GetAggregateIdentifier() As String
    ReadOnly Property Version As UInteger
    ReadOnly Property EventInstance As IEvent
End Interface
In this case the aggregate identifier is implemented as a string so that the business can dictate what actual unique identifier to use for it. For clarity I also added an interface that can be used to expose these aggregate identifiers, but this is entirely optional.
Public Interface IAggregateIdentity
    Function GetAggregateIdentifier() As String
End Interface
Turning a version into a row identifier
Since the version is an incremental number and the Azure table takes a string for its row key, you need to pad the version number with zeros so that it is stored in a manner that sorts correctly.
Private Const VERSION_FORMAT As String = "0000000000000000000"

Public Shared Function VersionToRowkey(ByVal version As Long) As String
    If (version <= 0) Then
        ' Treat an unset version as zero so a valid key is still produced
        version = 0
    End If
    ' Subtracting from Long.MaxValue makes the most recent event sort first
    Return (Long.MaxValue - version).ToString(VERSION_FORMAT)
End Function
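Assuming the subtract-from-Long.MaxValue scheme above, a quick check of the generated keys shows the fixed width and the resulting ordering:

```vbnet
' Long.MaxValue is 9223372036854775807, so:
Console.WriteLine(VersionToRowkey(1)) ' "9223372036854775806"
Console.WriteLine(VersionToRowkey(2)) ' "9223372036854775805"
' As strings, the key for version 2 sorts before the key for version 1,
' so a forward scan over the row keys returns the newest events first
```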
Saving an event record
The event record itself (everything except the partition key and row key) can be any kind of .NET class that implements IEventIdentity. Because the fields this record can have are dynamic (depending on the event type - recall from above that different event types are stored in the same event store) we have to use a DynamicTableEntity class and fill it with the properties of the event class passed in:
Public Shared Function MakeDynamicTableEntity(ByVal eventToSave As IEventContext) As DynamicTableEntity

    If (eventToSave.SequenceNumber <= 0) Then
        Throw New ArgumentException("An event must have a positive sequence number")
    End If

    Dim ret As New DynamicTableEntity
    ret.PartitionKey = eventToSave.GetAggregateIdentifier()
    ret.RowKey = VersionToRowkey(eventToSave.Version)

    ' Standard context properties, added only when they are populated
    If (Not String.IsNullOrWhiteSpace(eventToSave.Commentary)) Then
        ret.Properties.Add("Commentary", New EntityProperty(eventToSave.Commentary))
    End If
    If (Not String.IsNullOrWhiteSpace(eventToSave.Who)) Then
        ret.Properties.Add("Who", New EntityProperty(eventToSave.Who))
    End If
    If (Not String.IsNullOrWhiteSpace(eventToSave.Source)) Then
        ret.Properties.Add("Source", New EntityProperty(eventToSave.Source))
    End If

    ' Copy every readable property of the event itself by reflection
    For Each pi As System.Reflection.PropertyInfo In eventToSave.EventInstance.GetType().GetProperties()
        If (pi.CanRead) Then
            ret.Properties.Add(pi.Name, MakeEntityProperty(pi, eventToSave.EventInstance))
        End If
    Next

    Return ret

End Function
Turning the DynamicTableEntity back into an appropriate event class is then a matter of reading the event type, creating an instance of that type and populating its properties from the DynamicTableEntity.Properties collection by reflection.
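A minimal sketch of that rehydration, assuming the CLR type name of the event was stored alongside the entity in a property called "EventType" (that property name is my invention, not a fixed convention):

```vbnet
Public Shared Function MakeEvent(ByVal entity As DynamicTableEntity) As IEvent

    ' Recover the CLR type of the stored event
    Dim eventType As Type = Type.GetType(entity.Properties("EventType").StringValue)
    Dim ret As IEvent = CType(Activator.CreateInstance(eventType), IEvent)

    ' Copy each stored property back onto the new event instance
    For Each pi As System.Reflection.PropertyInfo In eventType.GetProperties()
        If (pi.CanWrite AndAlso entity.Properties.ContainsKey(pi.Name)) Then
            pi.SetValue(ret, entity.Properties(pi.Name).PropertyAsObject)
        End If
    Next

    Return ret

End Function
```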
Consuming events and projections
In order to turn your event stream into something interesting (at least, interesting to a user who wants to query the data) you need to create a projection. A projection is a view of the effect of a set of events. For example, a financial projection over the vehicle events above would be interested in any event that impacted the cost of, or profit from, a given vehicle.
To consume events you need to create a class that "knows" what kind of events it deals with and what to do with them. This requires an interface to mark a class as "I do something with this kind of event":
Public Interface IEventConsumer(Of In TAggregate As IAggregateIdentity, In TEvent As IEvent(Of TAggregate))
    Sub ConsumeEvent(ByVal eventToConsume As TEvent)
End Interface
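For example, a class that keeps a running repair-cost total might consume repair events like this (the Vehicle and VehicleRepairedEvent types are invented for illustration):

```vbnet
' Hypothetical consumer: accumulates the total repair cost for a vehicle
Public Class VehicleRepairCostProjection
    Implements IEventConsumer(Of Vehicle, VehicleRepairedEvent)

    Public Property TotalRepairCost As Decimal

    Public Sub ConsumeEvent(ByVal eventToConsume As VehicleRepairedEvent) _
        Implements IEventConsumer(Of Vehicle, VehicleRepairedEvent).ConsumeEvent
        TotalRepairCost += eventToConsume.Cost
    End Sub

End Class
```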
Since we have an interface that defines a type of event that occurs, and another interface that defines a type of thing that can consume these events, we can wire the two together at application start-up time (using your IoC container of choice, or even just hard-coding the connections).
To allow the current state of a projection to be saved - both for use by any readers and to give us a starting point if we have to rebuild a projection after a service interruption - an interface defining a snapshot of a projection is also needed. This is a way of saying "it was like this at a given known point":
Public Interface ISnapshot(Of In TAggregate As IAggregateIdentity)
    ReadOnly Property AsOfVersion As Long
End Interface
In practice I save these snapshots as a blob (file) in JSON format - this makes (some of) the query side of the CQRS architecture as simple as finding the snapshot and reading it.
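As an illustration, a snapshot blob for a repair-cost projection might look something like this (the property names other than AsOfVersion are invented for the example):

```json
{
  "AsOfVersion": 42,
  "AggregateIdentifier": "ABC123",
  "TotalRepairCost": 1250.00
}
```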
Sequencing and synchronising events
It is often useful to be able to sequence events that occurred to different aggregations together - for example you might have one aggregation for a stock price and another for an account and need to combine the two to give an account valuation at a given point in time.
To facilitate this, a master synchronisation field is needed - this could be an incremental number, or you can use the date/time of the event occurrence.
To do this, an abstract class is used as the base class of all the event types, and it handles this synchronisation key:
Public MustInherit Class EventBase

    Public Property SynchronisationStamp As Long

    Public Sub New()
        SynchronisationStamp = DateTime.UtcNow.Ticks
    End Sub

End Class
Points of Interest
When using Windows Azure Table Storage, queries that use the partition key and row key are very fast; those that don't are much slower. Mapping these two fields to the aggregate identifier and sequence number is therefore a very sensible way to start.
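For instance, a point lookup for a single event - assuming the key mapping above and the 2014-era Microsoft.WindowsAzure.Storage client library - uses both keys and avoids any table scan:

```vbnet
' Hypothetical point query: partition key = aggregate id, row key = padded version
Dim retrieve As TableOperation =
    TableOperation.Retrieve(Of DynamicTableEntity)("ABC123", VersionToRowkey(1))
Dim result As TableResult = eventTable.Execute(retrieve)
```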
- 2014-01-23 Initial draft
- 2014-02-06 Added projection explanation
- 2014-07-16 Detail on how event streams can be synchronised