Click here to Skip to main content
12,758,462 members (30,605 online)
Click here to Skip to main content
Add your own
alternative version

Stats

33.9K views
197 downloads
26 bookmarked
Posted 23 Jan 2014

CQRS on Windows Azure - Event Sourcing

, 19 Jan 2017 CPOL
Rate this:
Please Sign up or sign in to vote.
How to use a windows azure storage table, blob or file as an event store for event sourcing

Introduction

Although event sourcing is not a mandatory part of CQRS, and indeed event sourcing is used outside of CQRS the two are often used together.  In this article I will be looking at an implementation that uses Windows Azure Storage (either Tables, Blobs or Files) as the persistence mechanism.

Getting your head around event sourcing 

For developers schooled in the relational database model, event sourcing can seem to be a very confusing way of doing things. 
Hopefully the following hints can help:-

  1. Events only get added to the end of the event list (you can conceptualise this as a stack that doesn't have a pop option)
  2. Events are stored by the thing they occur to rather than the type of event that the are.  For example we don't have a separate table for "payments" and "standing orders" in a bank account type of system - these are just different events that occur in the life of the bank account.
  3. Events cannot be deleted - if you need to "undo" something that has happend a reversal or undo event needs to be added to the event store
  4. Events are past-tense.

However, if you get your head out of the CRUD you will see some benefits

  1. You automatically get a complete audit trail for everything that occured (If you have fields like "Last Modified", "Updated By" in your database tables and a set of triggers that write to an audit table whenever a field is updated in the database you are already doing this and doing it backwards)
  2. You can query the event history in ways that weren't anticipated when the system was first created
  3. Your event payload schema can be very flexible - if a particular attribute doesn't apply to a particular type of event you don't need to store a "null" there

Fig 1: Example of an event store for vehicles, using the vehicle registration number as the aggregation identifier

Aside - a quick glossary of terms 

There are a couple of words that can cause confusion when dealing with event sources: Aggregation and Sequence. My (perhaps over-simplicstic) definition is as follows:

The Aggregation is the thing to which events occur. In a bank example, this could be something like a bank account, in a vehicle leasing company it could be the vehicle and so on.

Each aggregation must be uniquely identified. Often a business domain already has unique identifiers you can use but if that is not the case you can create them using

The Sequence is the order in which the events occured - this is almost always implemented as an incremental number (although in the case of a file based stream I use a file pointer for the start of the event).

Architecture

The above whiteboard overview shows where event streams would fit in a CQRS architecture.  Within Azure storage there are a large number of different ways you could store the events underlying the event stream - in my case I have provided implementation for four of these:  SQL , Table, File or AppendBlob.  The implementation is different depending on which of these underlying technologies you choose to use.

1) Using Azure Storage "Tables"  to persist events 

An event can be represented by a CLR class. It is usual to have an empty IEvent interface to indicate the intent of the developer that a given class is an event.

When storing the event we need to add the agrgegation identifier and sequence number - so these two are specified in an interface for storing events:

''' <summary>
''' Additional properties that uniquely identify an event
''' </summary>
Public Interface IEventIdentity
 
    ''' <summary>
    ''' Get the identifier by which this events aggregate is uniquely known
    ''' </summary>
    ''' <remarks>
    ''' Most implementation suse a GUID for this but if you have a known unique identifier 
    ''' then that can be used instead - e.g. ISBN, CUSIP, VIN etc.
    ''' </remarks>
    Function GetAggregateIdentifier() As String
 
    ''' <summary>
    ''' The event version 
    ''' </summary>
    ReadOnly Property Version As UInteger
 
    ''' <summary>
    ''' The event that is identified by this event identity
    ''' </summary>
    ReadOnly Property EventInstance As IEvent
 
End Interface

In this case the aggregate identifier is implemented as a string so that the business can dictate what actual unique identifier to use to for it. For clarity I also added an
interface that can be used to set these aggregate identifiers but this is totally optional.

''' <summary>
''' Interface to be implemented by any class that provides an aggregate 
''' identity
''' </summary>
''' <remarks>
''' This allows for different objects to define their aggregate identity differently - 
''' for example books might aggregate by ISBN, Stocks by CUSIP, cars by vehicle registration number etc
''' </remarks>
Public Interface IAggregateIdentity
 
    ''' Get the identifier by which this events aggregate is uniquely known
    Function GetAggregateIdentifier() As String
 
End Interface

Turning a version into a row identifier

 

Since the version is an incremental number and the azure table takes a string for its row key you need to pad the version numbers with zero so as to store it in a manner that it
will sort correctly

Private Const VERSION_FORMAT As String = "0000000000000000000"
Public Shared Function VersionToRowkey(ByVal version As Long) As String

    If (version <= 0) Then
        Return Long.MaxValue.ToString(VERSION_FORMAT)
    Else
        Return (Long.MaxValue - version).ToString(VERSION_FORMAT)
    End If

End Function

Saving an event record  

The event record itself (everything except the partition key and row key) can be any kind of .NET class that inherits IEventIdentity.  Because the fields this record can have are dynamic (depending on the event type - recall from above that different event types are stored in the same event store) we have to use a DynamicTableEntity class and fill it with the properties of our event class passed in:

Public Shared Function MakeDynamicTableEntity(ByVal eventToSave As IEventContext) 
          As DynamicTableEntity

        Dim ret As New DynamicTableEntity

        ret.PartitionKey = eventToSave.GetAggregateIdentifier()
        ret.RowKey = VersionToRowkey(eventToSave.Version)

        'Add the event type - currently this is the event class name
        ret.Properties.Add("EventType", 
              New EntityProperty(eventToSave.EventInstance.GetType().Name))

        'Add the context
        If (eventToSave.SequenceNumber <= 0) Then
            'Default sequence number is the current UTC date
            ret.Properties.Add("SequenceNumber", 
                New EntityProperty(DateTime.UtcNow.Ticks))
        Else
            ret.Properties.Add("SequenceNumber", 
                New EntityProperty(eventToSave.SequenceNumber))
        End If

        If (Not String.IsNullOrWhiteSpace(eventToSave.Commentary)) Then
            ret.Properties.Add("Commentary", 
                 New EntityProperty(eventToSave.Commentary))
        End If



        If (Not String.IsNullOrWhiteSpace(eventToSave.Who)) Then
            ret.Properties.Add("Who", New EntityProperty(eventToSave.Who))
        End If


        If (Not String.IsNullOrWhiteSpace(eventToSave.Source)) Then
            ret.Properties.Add("Source", New EntityProperty(eventToSave.Source))
        End If


        'Now add in the different properties of the payload
        For Each pi As System.Reflection.PropertyInfo In eventToSave.EventInstance.GetType().GetProperties()
            If (pi.CanRead) Then
                ret.Properties.Add(pi.Name, MakeEntityProperty(pi, eventToSave.EventInstance))
            End If
        Next pi
    End Function 

 

Then turning the DynamicTableEntity back into an appropriate event class is a matter of reading the event type, creating an instance of that type and then populating its properties from the DynamicTableEntity.Properties collection by reflection.

Points of Interest 

When using Windows Azure Table Storage, queries that use the partition key and row identifier are very fast. Those that don't are much slower - therefore mapping these two fields
to the aggregate identifier and sequence number is a very sensible way to start.

2) Using Azure Storage "Append Blob" to persist events 

An append blob is a new, special type of binary large object store in Windows Azure storage which is optimised such that you can only add data onto the end of it. 

The append blob has a maximum size of 195Mb (or 50,000 events) so the usual setup is to create one blob per unique event stream.  This also allows for a very high degree of prallelism.

The additional metadata needed for each event stream (such as the record count) can then be stored using the file metadata for the Azure blob:-

Protected Const METATDATA_DOMAIN As String = "DOMAIN"
Protected Const METADATA_AGGREGATE_CLASS As String = "AGGREGATECLASS"
Protected Const METADATA_SEQUENCE As String = "SEQUENCE"
Protected Const METADATA_RECORD_COUNT As String = "RECORDCOUNT"
Protected Const METADATA_AGGREGATE_KEY As String = "AGGREGATEKEY"

The blob itself is created when the event stream reader or writer class is instantiated : 

''' <summary>
''' Create a new base for a reader or writer class in the given domain
''' </summary>
''' <param name="AggregateDomainName">
''' The name of the domain to store/retrieve the event streams under
''' </param>
Protected Sub New(ByVal AggregateDomainName As String, ByVal AggregateKey As TAggregationKey)
    MyBase.New(AggregateDomainName)
     'Get the aggregation instance key to use when creating a blob file name
    m_key = AggregateKey
     If (BlobContainer IsNot Nothing) Then
        m_blob = BlobContainer.GetAppendBlobReference(EventStreamBlobFilename)
        If Not m_blob.Exists() Then
            'Make the file to append to if it doesn't already exist
            m_blob.CreateOrReplace()
            'Set the initial metadata
            m_blob.Metadata(METATDATA_DOMAIN) = DomainName
            m_blob.Metadata(METADATA_AGGREGATE_CLASS) = GetType(TAggregate).Name
            m_blob.Metadata(METADATA_AGGREGATE_KEY) = m_key.ToString()
            m_blob.Metadata(METADATA_SEQUENCE) = "0" 'Sequence starts at zero
            m_blob.Metadata(METADATA_RECORD_COUNT) = "0" 'Record count starts at zero
            m_blob.SetMetadata()
        Else
            m_blob.FetchAttributes()
        End If
    End If
End Sub

This is then used to append an event as a two-step process:  first wrap the raw event in a class that indicates the event type, sequence number and other event level metadata and then write the whole to the end of the append blob.

Public Sub AppendEvent(EventInstance As IEvent) Implements IEventStreamWriter(Of TAggregate, TAggregationKey).AppendEvent
    If (AppendBlob IsNot Nothing) Then
        Dim nextSequence As Long = IncrementSequence()
        Dim evtToWrite As New BlobBlockWrappedEvent(nextSequence, 0, EventInstance)
        'Turn the event into a binary stream and append it to the blob
        Dim recordWritten As Boolean = False
        Try
            Using es As System.IO.Stream = evtToWrite.ToBinaryStream()
                 Dim offset As Long = AppendBlob.AppendBlock(es)
            End Using
            recordWritten = True
        Catch exBlob As Microsoft.WindowsAzure.Storage.StorageException
            Throw New EventStreamWriteException(DomainName, AggregateClassName, Key.ToString(), _ 
                nextSequence, "Unable to save a record to the event stream - " & evtToWrite.EventName, exBlob)
        End Try
         If (recordWritten) Then
            IncrementRecordCount()
        End If
    End If
End Sub    

To read events we simply open a snapshot of the append blob as a stream and deserialise wrapped events from that stream:

''' <summary>
''' Get a snapshot of the append blob to use when reading this event stream
''' </summary>
''' <returns></returns>
Private Function GetAppendBlobSnapshot() As CloudAppendBlob
    If (AppendBlob IsNot Nothing) Then
        Return AppendBlob.CreateSnapshot()
    Else
        Return Nothing
    End If
End Function

Private Function GetUnderlyingStream() As System.IO.Stream
    If (AppendBlob IsNot Nothing) Then
       Dim targetStream As New System.IO.MemoryStream()
       Try
           GetAppendBlobSnapshot().DownloadToStream(targetStream)
       Catch exBlob As Microsoft.WindowsAzure.Storage.StorageException
           Throw New EventStreamReadException(DomainName, AggregateClassName, m_key.ToString(), 
             0, "Unable to access underlying event stream", exBlob)
       End Try
       targetStream.Seek(0, IO.SeekOrigin.Begin)
       Return targetStream
   Else
       Return Nothing
   End If
End Function

Public Function GetEvents() As IEnumerable(Of IEvent) 
       Implements IEventStreamReader(Of TAggregate, TAggregationKey).GetEvents
    If (AppendBlob IsNot Nothing) Then
       Dim ret As New List(Of IEvent)
       Dim bf As New BinaryFormatter()
       Using rawStream As System.IO.Stream = GetUnderlyingStream()
           While Not (rawStream.Position >= rawStream.Length)
               Dim record As BlobBlockWrappedEvent = CTypeDynamic(Of BlobBlockWrappedEvent)(bf.Deserialize(rawStream))
               If (record IsNot Nothing) Then
                   ret.Add(record.EventInstance)
               End If
           End While
       End Using
       Return ret
   Else
       Throw New EventStreamReadException(DomainName, AggregateClassName, MyBase.m_key.ToString(),
              0, "Unable to read events - Azure blob not initialised")
   End If
End Function 

3) Using Azure Storage "Files" to persist events 

The implementation that uses Azure files is quite similar to that which uses append blobs except that it pre-allocates the full size of file the first time it is used and thereafter just fills this in as events are added.  In addition it co-opts the file pointer of the start of the event record to be the sequence number of that event.

The record count, aggregate type, aggregate key and current sequence number are also stored as attributes in each event stream file.

''' <summary>
''' An event instance wrapped up in a way that allows it to be stored in an Azure file
''' </summary>
''' <remarks>
''' The size of the event is stored in the outer wrapper to allow a file reader to skip over
''' any events it doesn't need to process
''' </remarks>
<Serializable()>
<DataContract()>
Public Class FileBlockWrappedEvent


    <DataMember(Name:="EventName", Order:=0)>
    Private ReadOnly m_eventName As String
    ''' <summary>
    ''' The class name of the event
    ''' </summary>
    Public ReadOnly Property EventName As String
        Get
            Return m_eventName
        End Get
    End Property

    <DataMember(Name:="Sequence", Order:=1)>
    Private ReadOnly m_sequence As Long
    ''' <summary>
    ''' The sequence number of this record
    ''' </summary>
    Public ReadOnly Property Sequence As Long
        Get
            Return m_sequence
        End Get
    End Property

    <DataMember(Name:="Version", Order:=2)>
    Private ReadOnly m_version As UInteger
    Public ReadOnly Property Version As UInteger
        Get
            Return m_version
        End Get
    End Property

    <DataMember(Name:="Timestamp", Order:=3)>
    Private ReadOnly m_timestamp As DateTime
    Public ReadOnly Property Timestamp As DateTime
        Get
            Return m_timestamp
        End Get
    End Property

    <DataMember(Name:="Size", Order:=4)>
    Private ReadOnly m_eventSize As UInteger
    Public ReadOnly Property EventSize As UInteger
        Get
            Return m_eventSize
        End Get
    End Property

    ''' <summary>
    ''' The .NET class used to serialise/deserialise the underlying event blob data
    ''' </summary>
    ''' <remarks>
    ''' It is possible to derive this by a lookup table from the event name and version if you
    ''' prefer not to save the class name in the event record.
    ''' Usually any storage space critical systems would do this so as to reduce redundant data
    ''' stored.
    ''' </remarks>
    <DataMember(Name:="Class", Order:=4)>
    Private ReadOnly m_eventClassName As String
    Public ReadOnly Property ClassName As String
        Get
            Return m_eventClassName
        End Get
    End Property

    <DataMember(Name:="Data", Order:=5)>
    Private ReadOnly m_eventData As Byte()

    Public ReadOnly Property EventInstance As IEvent
        Get
            If (String.IsNullOrWhiteSpace(m_eventClassName)) Then
                Throw New SerializationException("Unable to determine the event type that wrote this event instance")
            End If

            If (m_eventSize = 0) Then
                Throw New SerializationException("Unable to return the event data for this event instance - size is zero")
            End If

            Dim evtType As Type = Type.GetType(m_eventClassName, True, True)
            If (evtType IsNot Nothing) Then
                Dim bf As New BinaryFormatter()
                Using memStream As New System.IO.MemoryStream(m_eventData)
                    Return CTypeDynamic(bf.Deserialize(memStream), evtType)
                End Using
            End If

            Return Nothing
        End Get
    End Property

    Public Sub New(ByVal sequenceInit As String,
       ByVal versionInit As UInteger,
       ByVal timestampInit As DateTime,
       ByVal eventInstanceInit As IEvent)

        m_eventName = EventNameAttribute.GetEventName(eventInstanceInit)
        m_sequence = sequenceInit
        m_version = versionInit
        m_timestamp = timestampInit

        Dim bf As New BinaryFormatter()
        Using memStream As New System.IO.MemoryStream()
            bf.Serialize(memStream, eventInstanceInit)
            m_eventSize = memStream.Length
            m_eventData = memStream.ToArray()
        End Using
        m_eventClassName = eventInstanceInit.GetType().AssemblyQualifiedName

    End Sub

    Public Sub WriteToBinaryStream(ByVal stream As System.IO.Stream)

        Dim bf As New BinaryFormatter()
        bf.Serialize(stream, Me)

    End Sub

End Class

4) Using local files to store events

If you want to perform unit testing on an event sourcing system or want to set up a trial project to get your head around the concepts before committing to use the technique in a full scale project you can use files on your local machine to store the event streams. 

To do this it uses the aggregate identifier to create an unique filename and appends events to each file accordingly.  There is an "information record" at the start of the file that indicates what domain and full class name the event stream pertains to:

''' <summary>
''' Append an event to the file, without saving the record count
''' </summary>
''' <param name="EventInstance"></param>
Private Sub AppendEventInternal(EventInstance As IEvent(Of TAggregate))

    If (MyBase.m_file IsNot Nothing) Then
        Dim evtToWrite As New LocalFileWrappedEvent(m_eventStreamDetailBlock.SequenceNumber,
                                                    EventInstance.Version,
                                                    DateTime.UtcNow,
                                                    EventInstance, m_setings.UnderlyingSerialiser)
        If (evtToWrite IsNot Nothing) Then
            Using fs = m_file.OpenWrite()
                fs.Seek(0, IO.SeekOrigin.End)
                'write the event to the stream here..
                evtToWrite.WriteToBinaryStream(fs)
                m_eventStreamDetailBlock.SequenceNumber = fs.Position
            End Using
            m_eventStreamDetailBlock.RecordCount += 1
        End If
    End If

End Sub

5) Using local memory to store events

For unit testing or for scenarios where the whole event stream can be stored in the local memory of a machine (for example when an undo-redo buffer is implemented using this technology).

Choosing the storage type to use

In my experience the choice of storage technology depends on the specifics of the system you are building but I would recommend using Azure Tables (or even SQL on Azure) if you want to be able to look into the event streams but use Append Blobs or Files when you need the maximum performance and horizontal scaling. 

In particular if your event streams are likely to have a higher write rate (for example in any IoT instrumentation scenario, a multi player game or a trading platform) then the AppendBlob scales well and is very very fast.

In order to switch between these without major rewriting I have allowed for configuration by specific configuration settings (I think there is still some work to do on this however) to map the aggregate class to the backing technology used to store its event streams and projection:

<CQRSAzureEventSourcingConfiguration>
  <ImplementationMaps>
    <Map AggregateDomainQualifiedName="HospitalWard.Nurse" ImplementationName="InMemoryImplementationExample" SnapshotSettingsName="InMemorySnapshotExample" />
  </ImplementationMaps>

  <Implementations>
    <Implementation Name="InMemoryImplementationExample" ImplementationType="InMemory">
     <InMemorySettings />
    </Implementation>
    <Implementation Name="AzureBlobImplementationExample" ImplementationType="AzureBlob">
      <BlobSettings ConnectionStringName="UnitTestStorageConnectionString" />
    </Implementation>
    <Implementation Name="AzureBlobImplementationDomainExample" ImplementationType="AzureBlob">
      <BlobSettings ConnectionStringName="UnitTestStorageConnectionString" DomainName="Test" />
    </Implementation>
    <Implementation Name="AzureFileImplementationExample" ImplementationType="AzureFile">
      <FileSettings ConnectionStringName="UnitTestStorageConnectionString" InitialSize="20000" />
    </Implementation>
    <Implementation Name="AzureSQLImplementationExample" ImplementationType="AzureSQL">
      <SQLSettings ConnectionStringName="UnitTestStorageConnectionString" AggregateIdentifierField="AggregateKey" />
    </Implementation>
    <Implementation Name="AzureTableImplementationExample" ImplementationType="AzureTable">
      <TableSettings ConnectionStringName="UnitTestStorageConnectionString" SequenceNumberFormat="00000000" />
    </Implementation>
    <Implementation Name="LocalFileSettingsExample" ImplementationType="LocalFileSettings">
      <LocalFileSettings EventStreamRootFolder="C:\CQRS\Data\EventStreams" UnderlyingSerialiser="JSON"/>
    </Implementation>
  </Implementations>

  <SnapshotSettings>
    <SnapshotSetting Name="InMemorySnapshotExample" ImplementationType="InMemory">
      <InMemorySettings />
    </SnapshotSetting>
  </SnapshotSettings>

</CQRSAzureEventSourcingConfiguration>

 

Consuming events and projections 

In order to turn your event stream into something interesting (at least, interesting to a user that wants to query the data) you need to create a projection.  A projection is a view of the effect of a set of events.  For example a financial projection on the above cars event example would be interested in any event that impacted the cost or profit from any given car.   

To consume events you need to create a class that "knows" what kind of events it deals with and what to do with them.  These specific projections can be run over a single aggregate's event stream in order to perform some calculation or operation based on the underlying data of that event stream.

The underlying interface for any projection is:-

    ''' <summary>
''' Marker interface to denote anything as being a projection over the given aggregate identifier
''' </summary>
''' <remarks>
''' The type-safety is to ensure the projection only operates on events of one kind
''' </remarks>
Public Interface IProjection(Of TAggregate As IAggregationIdentifier, TAggregateKey)
    Inherits IProjection

    ''' <summary>
    ''' Perform whatever processing is required to handle the specific event
    ''' </summary>
    ''' <param name="eventToHandle">
    ''' The specific event to handle and perform whatever processing is required
    ''' </param>
    Sub HandleEvent(Of TEvent As IEvent(Of TAggregate))(ByVal eventToHandle As TEvent)

    ' --8<------------------------------------------

End Interface

Each specific projection that implements the class decides what action to perform with the event's data payload

    Public Overrides Sub HandleEvent(Of TEvent As IEvent(Of MockAggregate))(eventToHandle As TEvent) _
           Implements IProjection(Of MockAggregate, String).HandleEvent

    Select Case eventToHandle.GetType()
        Case GetType(MockEventTypeOne)
            HandleMockEventOne(CTypeDynamic(Of MockEventTypeOne)(eventToHandle))
        Case GetType(MockEventTypeTwo)
            HandleMockEventTwo(CTypeDynamic(Of MockEventTypeTwo)(eventToHandle))
        Case Else
            'Nothing to do with this event type
            Throw New ArgumentException("Unexpected event type - " & eventToHandle.GetType().Name)
    End Select

End Sub

' --8<-------------

    Private Sub HandleMockEventOne(ByVal eventToHandle As MockEventTypeOne)

    AddOrUpdateValue(Of Integer)(NameOf(Total), _
            ProjectionSnapshotProperty.NO_ROW_NUMBER, Total + eventToHandle.EventOneIntegerProperty)
    AddOrUpdateValue(Of String)(NameOf(LastString), _
            ProjectionSnapshotProperty.NO_ROW_NUMBER, eventToHandle.EventOneStringProperty)

End Sub

Snapshots

To allow the current state of a projection to be saved - both for use by any readers and for allowing us to have a starting point if we have to rebuild a projection after a service interruption - an interface to define a snapshot of a projection is also defined. This is a way of saying "it was like this at a given known point"

''' <summary>
''' A snapshot of a projection as at a point in time
''' </summary>
''' <typeparam name="IAggregateIdentity">
''' The type of thing that we are snapshotting that can be uniquely identified
''' </typeparam>
''' <remarks>
''' For entites that have a busy or long history it may be performant to store
''' point-in-time snapshots and only project forward from the most recent 
''' snapshot
''' </remarks>
Public Interface ISnaphot(Of In IAggregateIdentity)

    ''' <summary>
    ''' The version number of the highest event that contributed to 
    ''' this snapshot
    ''' </summary>
    ''' <remarks>
    ''' All events of higher version should be applied to this projection
    ''' so as to get the current as-of-now view
    ''' </remarks>
    ReadOnly Property AsOfVersion As Long

End Interface

In practice I save these snapshots as a blob (file) in JSON format - this makes (some of) the query side of the CQRS architecture as simple as finding the snapshot and reading it.

Sequencing and synchronising events

It is often useful to be able to sequence events that occurred to different aggregations together - for example you might have one aggregation for a stock price and another for an account and need to combine the two to give an account valuation at a given point in time.

To facilitate this a master synchronisation field is needed - this could be an incremental number or you can use the date/time of the event occurrence.

To do this an abstract class is used as the base class of all the event types and it handles the synchronisation key.

''' <summary>
''' Base class for all events - allows for a common synchronising property
''' </summary>
Public MustInherit Class EventBase

    ''' <summary>
    ''' The creation timestamp of this event
    ''' </summary>
    ''' <remarks>
    ''' This allows event streams to be combined in a synchronised fashion for
    ''' multi-aggregate snapshots
    ''' </remarks>
    Public Property SynchronisationStamp As Long

    Public Sub New()
        ' Default the synchronisation stamp to now
        SynchronisationStamp = DateTime.UtcNow.Ticks
    End Sub
End Class

Running a projection

Now we want to run a projection over the event stream in order to read the current state of the aggregate from the events that have occured to it.  

Because a projection is a business object rather than a framework object I want to separate it from the underlying implementation of the event stream itself. (This is also very handy to allow you to perform unit tests on a projection by using an in-memory event stream created by the unit test method itself).

In order to do this we have an interface IEventStreamReader which provides the event stream to the projection. Each implementation of a different backing store to an event stream must support this interface:

 

''' <summary>
''' Definition for any implementation that can read events from an event stream
''' </summary>
''' <typeparam name="TAggregate">
''' The data type of the aggregate that identifies the event stream to read
''' </typeparam>
''' <typeparam name="TAggregationKey">
''' The data type of the key that uniquely identifies the specific event  
''' stream instance to read
''' </typeparam>
Public Interface IEventStreamReader(Of TAggregate As IAggregationIdentifier, 
                                       TAggregationKey)

    ''' <summary>
    ''' Get the event stream for a given aggregate
    ''' </summary>
    Function GetEvents() As IEnumerable(Of IEvent(Of TAggregate))

    ''' <summary>
    ''' Gets the event stream for a given aggregate from a given starting version
    ''' </summary>
    ''' <param name="StartingVersion">
    ''' The starting version number for our snapshot
    ''' </param>
    ''' <remarks>
    ''' This is used in scenario where we are starting from  
    ''' a given snapshot version
    ''' </remarks>
    Function GetEvents(ByVal StartingVersion As UInteger) As IEnumerable(Of IEvent(Of TAggregate))

    ''' <summary>
    ''' Gets the event stream and the context information recorded for each event
    ''' </summary>
    ''' <remarks>
    ''' This is typically only used for audit trails as all business functionality 
    ''' should depend on the event data alone
    ''' </remarks>
    Function GetEventsWithContext() As IEnumerable(Of IEventContext)


End Interface

The particular event stream reader to use is passed to a "Projection Processor" which in turn can take a projection class and run that over the event stream:-

    Public Class ProjectionProcessor(Of TAggregate As IAggregationIdentifier,
                                       TAggregateKey)

    ''' <summary>
    ''' The stream reader instance that will be used to run the projections
    ''' </summary>
    Private ReadOnly m_streamReader As IEventStreamReader(Of TAggregate, 
                                        TAggregateKey)


    ''' <summary>
    ''' Process the given projection using the event stream reader 
    ''' we have set up
    ''' </summary>
    ''' <param name="projectionToProcess">
    ''' The class that defines the projection operation we are going to process
    ''' </param>
    Public Sub Process(ByVal projectionToProcess As IProjection(Of TAggregate, 
                                                 TAggregateKey))

        If (m_streamReader IsNot Nothing) Then
            If (projectionToProcess IsNot Nothing) Then
                'Does it support snapshots?
                Dim startingSequence As UInteger = 0
                If (projectionToProcess.SupportsSnapshots) Then
                    'load the most recent snapshot for it

                    'and update the starting sequence
                End If
                For Each evt In m_streamReader.GetEvents(startingSequence)
                    If (projectionToProcess.HandlesEventType(evt.GetType())) Then
                        projectionToProcess.HandleEvent(evt)
                    End If
                Next
            End If
        Else
            'Unable to use this projection as it has no stream reader associated
        End If

    End Sub

    ''' <summary>
    ''' Create a new projection processor that will use the given event 
    ''' stream reader to do its processing
    ''' </summary>
    ''' <param name="readerTouse">
    ''' The event stream processor to use
    ''' </param>
    Friend Sub New(ByVal readerTouse As IEventStreamReader(Of TAggregate, TAggregateKey))
        m_streamReader = readerTouse
    End Sub

End Class

The specific event stream reader implementation class has a factory method to create a projection processor for any given aggregate instance thus:-

    #Region "Factory methods"
 
        ''' <summary>
        ''' Creates an azure blob storage based event stream reader for the
        '''  given aggregate  
        ''' </summary>
        ''' <param name="instance">
        ''' The instance of the aggregate for which we want to read 
        ''' the event stream
        ''' </param>
        ''' <returns>
        ''' </returns>
        Public Shared Function Create(ByVal instance As IAggregationIdentifier(Of TAggregationKey)) 
              As IEventStreamReader(Of TAggregate, TAggregationKey)

            Return New BlobEventStreamReader(Of TAggregate, TAggregationKey)
                (DomainNameAttribute.GetDomainName(instance), instance.GetKey())

        End Function

        ''' <summary>
        ''' Create a projection processor that works off an azure
        ''' blob backed event stream
        ''' </summary>
        ''' <param name="instance">
        ''' The instance of the aggregate for which we want to run projections
        ''' </param>
        ''' <returns>
        ''' A projection processor that can run projections over this event stream
        ''' </returns>
        Public Shared Function CreateProjectionProces(
           ByVal instance As IAggregationIdentifier(Of TAggregationKey)) 
                  As ProjectionProcessor(Of TAggregate, TAggregationKey)

            Return New ProjectionProcessor(Of TAggregate, TAggregationKey )
                     (Create(instance))

        End Function

#End Region

Having created this framework code we can now create our aggregate, event and projection (business logic) classes totally independent of the underlying implementation of their event stream and allowing them to be portable between these different technologies.

Running a classifier

A classifier is a special kind of projection that is used to decide whether or not a given instance of an aggregate is in or out of some defined business meaningful group.  For example if you were implementing a banking system you might have a classifier which ran over the event stream of each account so as to decide which accounts were in the group "accounts in arrears".

Designing your business code

Once you have a framework to use as the basis for an event stream based system you have to create the classes that represent the business parts of that system: the aggregates, events and projections.  You can do this with a graphical designer or if you prefer you can just create the classes in code yourself.

Starting with the aggregate identifier (the "thing" to which events can occur and be recorded) you need to create a class that defines how that aggregate can be uniquely identified.  For this we need to identify the data type for its unique key and, because so many systems use strings for their data storage, a consistent way to turn that unique key into a string.

If we take the example of a bank account that is identified by an unique account number we would end up with an aggregate class like:-

''' <summary>
''' An aggregate representing bank account in the cloud bank demo project
''' </summary>
<DomainName("CloudBank")>
Public Class Account
    Implements IAggregationIdentifier(Of String)

    Private m_bankAccount As String

    Public Sub SetKey(key As String) _
           Implements IAggregationIdentifier(Of String).SetKey
        m_bankAccount = key
    End Sub

    Public Function GetAggregateIdentifier() As String _
              Implements IAggregationIdentifier.GetAggregateIdentifier
        Return m_bankAccount
    End Function

    Public Function GetBankAccountNumber() As String _
           Implements IAggregationIdentifier(Of String).GetKey
        Return m_bankAccount
    End Function


    Public Sub New(ByVal accountNumber As String)
        m_bankAccount = accountNumber
    End Sub

End Class

Then for each event that can occur against this bank account you would need to create an event class that has all of the properties that can be known about the event.  Note that there is no need to define any key event or the mandatory nature - basically any data that can be stored about an event should go into the event definition.

So the event that corresponds to money being deposited into a bank account could look something like:-

''' <summary>
''' Money has been deposited in an account
''' </summary>
<DomainName("CloudBank")>
<AggregateIdentifier(GetType(Account))>
<EventAsOfDate(NameOf(MoneyDeposited.DepositDate))>
Public Class MoneyDeposited
    Implements IEvent(Of Account)

    ''' <summary>
    ''' The effective date of the deposit
    ''' </summary>
    Public Property DepositDate As Nullable(Of DateTime)

    ''' <summary>
    ''' The amount deposited
    ''' </summary>
    Public Property Amount As Decimal

    Public ReadOnly Property Version As UInteger Implements IEvent(Of Account).Version
        Get
            Return 1
        End Get
    End Property


End Class

Notice that the event definition also includes a version number, which you should increment if you ever change or add properties. There is also an optional attribute EventAsOfDate which allows you to specify which property of the event contains the actual real world date and time that the event occured.

Having defined all of the event types you would then move to defining the projections that allow you to get the point-in-time state of the aggregate from its event stream. A projection class needs to know which event types it handles, and for each of these what to do when that event type is encountered. For example a projection that was to give you the running balance of an account would need to handle MoneyDeposited and MoneyWithdrawn events and to update the CurrentBalance property when it encounters them:-

''' <summary>
''' Gets the current balance for any given bank account
''' </summary>
<DomainName("CloudBank")>
<AggregateIdentifier(GetType(Account))>
Public Class CurrentBalanceProjection
    Inherits ProjectionBase(Of Account, String)
    Implements IHandleEvent(Of MoneyDeposited)
    Implements IHandleEvent(Of MoneyWithdrawn)

    Private m_currentBalance As Decimal

    Public ReadOnly Property CurrentBalance As Decimal
        Get
            Return m_currentBalance
        End Get
    End Property

    ''' <summary>
    ''' What events does this projection handle
    ''' </summary>
    ''' <param name="eventType">
    ''' The possible event type
    ''' </param>
    ''' <returns>
    ''' True if the event type is one of:
    ''' MoneyDeposited
    ''' MoneyWithdrawn
    ''' </returns>
    Public Overrides Function HandlesEventType(eventType As Type) As Boolean

        If eventType Is GetType(MoneyDeposited) Then
            Return True
        End If

        If eventType Is GetType(MoneyDeposited) Then
            Return True
        End If

        Return False
    End Function

    Public Overrides ReadOnly Property SupportsSnapshots As Boolean
        Get
            Return True
        End Get
    End Property


    Public Overrides Sub HandleEvent(Of TEvent As IEvent)(eventToHandle As TEvent)

        If GetType(TEvent) Is GetType(MoneyDeposited) Then
            HandleMoneyDepositedEvent(CTypeDynamic(Of MoneyDeposited)(eventToHandle))
        End If

        If GetType(TEvent) Is GetType(MoneyWithdrawn) Then
            HandleMoneyWithdrawnEvent(CTypeDynamic(Of MoneyWithdrawn)(eventToHandle))
        End If

    End Sub

    Public Sub HandleMoneyWithdrawnEvent(eventHandled As MoneyWithdrawn) _
                      Implements IHandleEvent(Of MoneyWithdrawn).HandleEvent

        m_currentBalance -= eventHandled.Amount

    End Sub

    Public Shadows Sub HandleMoneyDepositedEvent(eventHandled As MoneyDeposited) _
                        Implements IHandleEvent(Of MoneyDeposited).HandleEvent

        m_currentBalance += eventHandled.Amount

    End Sub


End Class

As you can see in this example there is a clear and complete separation between the business logic code and the franework code that supports it, allowing for mock-free testing of all the business rule classes.

Notes on the source code

The source code I have attached to this project contains the framework code (and unit test projects) for all the "CQRS on Azure" articles I have uploaded:-

Each project has a read-me file and I recommend reading this and also running through the unit tests to improve the use you can get from this code.

I have deleted the App.Config from the unit test as it contains a reference to my own Azure storage account.  You will need an Azure account in order to run the on-cloud unit tests, but the in memory tests and local file tests can be run without any such access.

<connectionStrings>
  <add name="UnitTestStorageConnectionString" connectionString="TODO - Add your connection string here" providerName="" />
  <!-- UnclassifiedStorageConnectionString -->
  <add name="StorageConnectionString" connectionString="TODO - Add your connection string here" providerName="" />
</connectionStrings>

License

This article, along with any associated source code and files, is licensed under The Code Project Open License (CPOL)

Share

About the Author

Duncan Edwards Jones
Software Developer (Senior)
Ireland Ireland
C# / SQL Server developer
Microsoft MVP 2006, 2007
Visual Basic .NET

You may also be interested in...

Comments and Discussions

 
GeneralMy vote of 5 Pin
ClaudeJean13-Dec-16 17:58
memberClaudeJean13-Dec-16 17:58 
SuggestionSerialising events Pin
Duncan Edwards Jones31-Oct-16 9:30
professionalDuncan Edwards Jones31-Oct-16 9:30 
QuestionPerformance results (not very scientific) Pin
Duncan Edwards Jones8-Aug-16 10:45
professionalDuncan Edwards Jones8-Aug-16 10:45 
AnswerRe: Performance results (not very scientific) Pin
Duncan Edwards Jones10-Nov-16 12:54
professionalDuncan Edwards Jones10-Nov-16 12:54 
QuestionBob. Pin
Member 1263531414-Jul-16 5:21
memberMember 1263531414-Jul-16 5:21 
PraiseAgree with YAY! Pin
John Willson13-Jul-16 9:55
professionalJohn Willson13-Jul-16 9:55 
GeneralRe: Agree with YAY! Pin
Duncan Edwards Jones13-Jul-16 11:48
professionalDuncan Edwards Jones13-Jul-16 11:48 
QuestionProjection snapshots as AppendBlob Pin
Duncan Edwards Jones12-Jul-16 21:44
professionalDuncan Edwards Jones12-Jul-16 21:44 
QuestionAppend Blob Metadata Pin
Tim Murphy25-Jun-16 11:43
memberTim Murphy25-Jun-16 11:43 
AnswerRe: Append Blob Metadata Pin
Duncan Edwards Jones25-Jun-16 12:51
professionalDuncan Edwards Jones25-Jun-16 12:51 
AnswerRe: Append Blob Metadata Pin
Duncan Edwards Jones12-Jul-16 6:01
professionalDuncan Edwards Jones12-Jul-16 6:01 
QuestionPersisting an event stream to a relational db - how would you do it? Pin
Duncan Edwards Jones14-Apr-16 12:12
professionalDuncan Edwards Jones14-Apr-16 12:12 
AnswerRe: Persisting an event stream to a relational db - how would you do it? Pin
nizachon9-Jun-16 13:16
membernizachon9-Jun-16 13:16 
GeneralRe: Persisting an event stream to a relational db - how would you do it? Pin
Duncan Edwards Jones10-Jun-16 1:46
professionalDuncan Edwards Jones10-Jun-16 1:46 
GeneralRe: Persisting an event stream to a relational db - how would you do it? Pin
nizachon10-Jun-16 6:20
membernizachon10-Jun-16 6:20 
QuestionUse of RowKey for sequencing event history Pin
Duncan Edwards Jones24-Jan-14 4:25
memberDuncan Edwards Jones24-Jan-14 4:25 
AnswerRe: Use of RowKey for sequencing event history Pin
Duncan Edwards Jones29-Jan-14 10:05
memberDuncan Edwards Jones29-Jan-14 10:05 
GeneralRe: Use of RowKey for sequencing event history Pin
Haccp Bob16-Feb-15 9:20
memberHaccp Bob16-Feb-15 9:20 
Questionyaay Pin
bryce23-Jan-14 18:16
memberbryce23-Jan-14 18:16 

General General    News News    Suggestion Suggestion    Question Question    Bug Bug    Answer Answer    Joke Joke    Praise Praise    Rant Rant    Admin Admin   

Use Ctrl+Left/Right to switch messages, Ctrl+Up/Down to switch threads, Ctrl+Shift+Left/Right to switch pages.

Permalink | Advertise | Privacy | Terms of Use | Mobile
Web01 | 2.8.170217.1 | Last Updated 19 Jan 2017
Article Copyright 2014 by Duncan Edwards Jones
Everything else Copyright © CodeProject, 1999-2017
Layout: fixed | fluid