CQRS on Windows Azure - Identity Groups and Classifiers






One way to apply set-theory (relational) operations to event-stream-based data structures
Introduction
One valid criticism of using event sourcing (also known as event streams) as the data storage mechanism for a business system is that business queries are harder to run over it, because you do not know the properties of any given entity (or aggregate) until you run the relevant projection over its event stream.
Identity groups and classifiers are one way of addressing this.
Background
If you have not used event streams / event sourcing before, I recommend reading the section "Getting your head around event sourcing" in the article CQRS on Windows Azure - Event Sourcing.
Identity Groups
An identity group is a collection of entities of the same type that share some common attribute or property, by which they are gathered into a business-meaningful group.
For example, in a banking application with an entity type of "Bank Account", it would be meaningful to create a group "Accounts Overdrawn", being the subset of all accounts that currently have a balance below zero. In turn, you might have a group "Delinquent Accounts", being those members of "Accounts Overdrawn" that have not been in credit for over 30 days.
Therefore, an identity group can be identified in code by its name and, if it is not simply a subset of the group "All", by the name of its parent group:
Public Interface IIdentifierGroup
    ''' <summary>
    ''' The unique name of the identity group
    ''' </summary>
    ''' <remarks>
    ''' This name can be passed as a parameter for a query definition.
    ''' There are two predefined names:
    ''' "Identity", being the group of one specified aggregate identifier, and
    ''' "All", being the group of all instances of an aggregate identifier type.
    ''' </remarks>
    ReadOnly Property Name As String
    ''' <summary>
    ''' The name of the outer parent group: only members of that group
    ''' are checked for membership of this group
    ''' </summary>
    ''' <remarks>
    ''' This can be used to speed up evaluation of group membership
    ''' by starting from a smaller initial group than "All".
    ''' If not set, then "All" is assumed.
    ''' </remarks>
    ReadOnly Property ParentGroupName As String
End Interface
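As a rough illustration of how a parent group narrows the search, here is a minimal sketch in Python (used only for brevity; the article's own code is VB.NET). The `IdentityGroup` class, the `evaluate_group` function and the balance and days-out-of-credit data are hypothetical stand-ins, not the article's implementation. Membership of "Delinquent Accounts" is only tested for identifiers already in its parent group, "Accounts Overdrawn".

```python
from dataclasses import dataclass
from typing import Callable, Dict, Optional, Set

ALL = "All"  # predefined group containing every aggregate identifier


@dataclass(frozen=True)
class IdentityGroup:
    """Hypothetical Python analogue of IIdentifierGroup."""
    name: str
    is_member: Callable[[str], bool]         # hypothetical per-identifier membership test
    parent_group_name: Optional[str] = None  # None means "All" is assumed


def evaluate_group(group_name: str,
                   groups: Dict[str, IdentityGroup],
                   all_ids: Set[str]) -> Set[str]:
    """Return the members of group_name, testing only members of its parent group."""
    if group_name == ALL:
        return set(all_ids)
    group = groups[group_name]
    # Start from the (smaller) parent group rather than from "All"
    candidates = evaluate_group(group.parent_group_name or ALL, groups, all_ids)
    return {ident for ident in candidates if group.is_member(ident)}


# Hypothetical example data: current balance and days since last in credit
balance = {"A1": -50, "A2": 100, "A3": -10}
days_out_of_credit = {"A1": 45, "A2": 0, "A3": 5}

groups = {
    "Accounts Overdrawn": IdentityGroup("Accounts Overdrawn", lambda a: balance[a] < 0),
    "Delinquent Accounts": IdentityGroup("Delinquent Accounts",
                                         lambda a: days_out_of_credit[a] > 30,
                                         parent_group_name="Accounts Overdrawn"),
}
```

Because "Delinquent Accounts" names "Accounts Overdrawn" as its parent, the days-out-of-credit test is never run for account A2, which is in credit.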
For type safety, an identity group can also be typed so that it allows only one entity type and is specific to the unique identifier used to identify instances of that entity. In our bank accounts example, this would be a "Bank Account" class uniquely identified by its "Account Number" (a string).
''' <remarks>
''' The group is uniquely named per aggregate identifier type,
''' and is populated by its own projection, which decides
''' whether any given aggregate identifier is in or out of the group
''' </remarks>
Public Interface IIdentifierGroup(Of TAggregateIdentifier As IAggregationIdentifier, TAggregateKey)
    Inherits IIdentifierGroup
End Interface
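A typed group can be approximated in Python with `typing.Generic`, although Python only checks the key type statically (for example with a type checker), not at run time. In this sketch, `TypedIdentityGroup`, `AccountNumber` and the `decide` callback, which stands in for the group's own projection, are hypothetical names chosen for illustration.

```python
from typing import Callable, Generic, NewType, Set, TypeVar

TKey = TypeVar("TKey")

# Hypothetical key type: a "Bank Account" is identified by its account number
AccountNumber = NewType("AccountNumber", str)


class TypedIdentityGroup(Generic[TKey]):
    """Sketch of IIdentifierGroup(Of TAggregateIdentifier, TAggregateKey):
    one aggregate type, one key type, membership decided per key."""

    def __init__(self, name: str, aggregate_type: str,
                 decide: Callable[[TKey], bool]) -> None:
        self.name = name                      # unique per aggregate type
        self.aggregate_type = aggregate_type  # e.g. "Bank Account"
        self._decide = decide                 # stands in for the group's own projection
        self.members: Set[TKey] = set()

    def refresh(self, key: TKey) -> None:
        """Re-run the decision for one key, moving it into or out of the group."""
        if self._decide(key):
            self.members.add(key)
        else:
            self.members.discard(key)


balances = {AccountNumber("001"): -20, AccountNumber("002"): 75}
overdrawn: TypedIdentityGroup[AccountNumber] = TypedIdentityGroup(
    "Accounts Overdrawn", "Bank Account", lambda key: balances[key] < 0)
for account in balances:
    overdrawn.refresh(account)
```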
Classifiers
A classifier is a class that, when run over the event stream of a single entity, decides whether that entity is in the identity group or not, based on some function performed on some or all of the events in the stream.
A classifier for a bank account entity that decides whether the account is in the "Accounts Overdrawn" group would need to handle every event that affects the balance (deposit, withdrawal, charge, tax, interest, etc.) and keep a running total of the balance. If, when the classifier reaches the end of the stream, that running total is below zero, the classifier deems the account to be inside the identity group; otherwise it is outside.
A classifier is therefore a very specialized form of projection and can reuse much of the same underlying functionality.
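The "Accounts Overdrawn" classifier described above can be sketched in Python as a single fold over one account's events. The `BalanceEvent` type, the event kind names and the sign assigned to each kind are assumptions made for this example, not the article's event model.

```python
from dataclasses import dataclass
from typing import Iterable


@dataclass(frozen=True)
class BalanceEvent:
    """Hypothetical event: kind is e.g. "Deposit", "Withdrawal", "Charge"."""
    kind: str
    amount: int


# Assumed effect of each event kind on the balance (+1 credit, -1 debit)
BALANCE_EFFECT = {"Deposit": 1, "Interest": 1,
                  "Withdrawal": -1, "Charge": -1, "Tax": -1}


def is_overdrawn(events: Iterable[BalanceEvent]) -> bool:
    """Classify one account: inside "Accounts Overdrawn" if the final balance < 0."""
    balance = 0
    for evt in events:
        sign = BALANCE_EFFECT.get(evt.kind)
        if sign is None:
            continue  # this event does not affect the balance, so it is ignored
        balance += sign * evt.amount
    return balance < 0
```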
Public Interface IClassifier
    ''' <summary>
    ''' Does the classifier handle the data for the given event type?
    ''' </summary>
    ''' <param name="eventType">
    ''' The type of the event containing the data that may or may not be handled
    ''' </param>
    ''' <returns>
    ''' True if this event type should be processed
    ''' </returns>
    Function HandlesEventType(ByVal eventType As Type) As Boolean
End Interface
Public Interface IClassifier(Of TAggregate As IAggregationIdentifier, TAggregateKey)
    Inherits IClassifier
    ''' <summary>
    ''' Perform whatever evaluation is required to handle the specific event
    ''' </summary>
    ''' <param name="eventToHandle">
    ''' The specific event to handle, performing whatever processing is required to
    ''' evaluate the status of the aggregate instance in relation to the identity group
    ''' </param>
    Function Evaluate(Of TEvent As IEvent(Of TAggregate))(ByVal eventToHandle As TEvent) _
        As IClassifierEventHandler.EvaluationResult
End Interface
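The same classifier can be shaped after the `IClassifier` interfaces: `handles_event_type` filters events by type, and `evaluate` returns a decision after each handled event. This Python sketch is a loose translation under stated assumptions: the `EvaluationResult` values mirror the Include, Exclude and Unchanged members used in the article's code, while the event classes and `OverdrawnClassifier` are hypothetical.

```python
from abc import ABC, abstractmethod
from dataclasses import dataclass
from enum import Enum


class EvaluationResult(Enum):
    INCLUDE = "Include"
    EXCLUDE = "Exclude"
    UNCHANGED = "Unchanged"


class Classifier(ABC):
    """Rough analogue of IClassifier(Of TAggregate, TAggregateKey)."""

    @abstractmethod
    def handles_event_type(self, event_type: type) -> bool: ...

    @abstractmethod
    def evaluate(self, event: object) -> EvaluationResult: ...


# Hypothetical bank account events
@dataclass(frozen=True)
class Deposited:
    amount: int


@dataclass(frozen=True)
class Withdrawn:
    amount: int


@dataclass(frozen=True)
class AddressChanged:
    new_address: str


class OverdrawnClassifier(Classifier):
    def __init__(self) -> None:
        self.balance = 0  # running total kept across evaluate() calls

    def handles_event_type(self, event_type: type) -> bool:
        return event_type in (Deposited, Withdrawn)

    def evaluate(self, event: object) -> EvaluationResult:
        if isinstance(event, Deposited):
            self.balance += event.amount
        elif isinstance(event, Withdrawn):
            self.balance -= event.amount
        else:
            return EvaluationResult.UNCHANGED  # not a balance event
        return EvaluationResult.INCLUDE if self.balance < 0 else EvaluationResult.EXCLUDE
```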
Alternatively, you can perform a classification by first running a projection and then applying some logic to its outcome. For example, a classifier for "accounts in credit" would first run the "current balance" projection and then classify the account according to whether that balance is greater than zero:-
/// <summary>
/// Use the running balance of the account to decide if it is in credit
/// </summary>
public IClassifierDataSourceHandler.EvaluationResult EvaluateProjection(
    IRunning_Balance projection)
{
    if (projection.Balance > 0)
    {
        return IClassifierDataSourceHandler.EvaluationResult.Include;
    }
    return IClassifierDataSourceHandler.EvaluationResult.Exclude;
}
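The projection-then-classify approach can be sketched the same way in Python: run a running-balance projection over the stream, then apply the in-credit rule from `EvaluateProjection` to its result. `RunningBalance`, `project_running_balance` and the `(kind, amount)` event tuples are hypothetical stand-ins for `IRunning_Balance` and the real events.

```python
from dataclasses import dataclass
from enum import Enum
from typing import Iterable, Tuple


class EvaluationResult(Enum):
    INCLUDE = "Include"
    EXCLUDE = "Exclude"


@dataclass
class RunningBalance:
    """Hypothetical Python analogue of the IRunning_Balance projection."""
    balance: int = 0

    def handle(self, kind: str, amount: int) -> None:
        if kind == "Deposit":
            self.balance += amount
        elif kind == "Withdrawal":
            self.balance -= amount
        # any other event kind leaves the balance untouched


def project_running_balance(events: Iterable[Tuple[str, int]]) -> RunningBalance:
    """Run the projection over one account's event stream."""
    projection = RunningBalance()
    for kind, amount in events:
        projection.handle(kind, amount)
    return projection


def evaluate_projection(projection: RunningBalance) -> EvaluationResult:
    """Same rule as EvaluateProjection: in credit only if balance > 0."""
    if projection.balance > 0:
        return EvaluationResult.INCLUDE
    return EvaluationResult.EXCLUDE
```

One attraction of this split is that a single projection can be shared by several classifiers, each applying its own rule to the projected state.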
Processors
The identity group and the classifier are business-related classes with no concrete logic for running themselves over actual event streams. For that, we need dedicated processor classes that perform the classification and grouping:
''' <summary>
''' Class to run defined classifiers over an event stream to classify
''' an aggregate instance as being inside or outside
''' of the identity group the classifier pertains to
''' </summary>
''' <typeparam name="TAggregate">
''' The class of the aggregate of which this is an instance
''' </typeparam>
''' <typeparam name="TAggregateKey">
''' The data type of the key that uniquely identifies an instance of this aggregate
''' </typeparam>
''' <typeparam name="TClassifier">
''' The type of the classifier this processor runs
''' </typeparam>
Public NotInheritable Class ClassifierProcessor(Of TAggregate As IAggregationIdentifier, TAggregateKey, TClassifier As IClassifier)
    Implements IClassifierProcessor(Of TAggregate, TAggregateKey, TClassifier)
    ' The stream reader instance that will be used to read the events
    Private ReadOnly m_streamReader As IEventStreamReader(Of TAggregate, TAggregateKey)
    ' The default classifier, used when none is passed to Classify
    Private ReadOnly m_classifier As IClassifier(Of TAggregate, TAggregateKey)
    Public Function Classify(Optional ByVal classifierToProcess As IClassifier(Of TAggregate, TAggregateKey) = Nothing) _
        As IClassifierEventHandler.EvaluationResult _
        Implements IClassifierProcessor(Of TAggregate, TAggregateKey, TClassifier).Classify
        ' Fall back to the classifier supplied when the processor was created
        If classifierToProcess Is Nothing Then
            classifierToProcess = m_classifier
        End If
        If m_streamReader IsNot Nothing AndAlso classifierToProcess IsNot Nothing Then
            Dim startingSequence As UInteger = 0
            Dim retVal As IClassifierEventHandler.EvaluationResult = _
                IClassifierEventHandler.EvaluationResult.Unchanged
            For Each evt In m_streamReader.GetEvents(startingSequence)
                If classifierToProcess.HandlesEventType(evt.GetType()) Then
                    Dim result As IClassifierEventHandler.EvaluationResult = classifierToProcess.Evaluate(evt)
                    ' An "Unchanged" result must not overwrite an earlier Include/Exclude decision
                    If result <> IClassifierEventHandler.EvaluationResult.Unchanged Then
                        retVal = result
                    End If
                End If
            Next
            ' Return the evaluation status as at the end of the event stream
            Return retVal
        End If
        ' If no classification was performed, leave the result as unchanged
        Return IClassifierEventHandler.EvaluationResult.Unchanged
    End Function
    ''' <summary>
    ''' Create a new classifier processor that will use the given event stream reader
    ''' to do its processing
    ''' </summary>
    ''' <param name="readerToUse">
    ''' The event stream reader to use
    ''' </param>
    ''' <param name="classifier">
    ''' (Optional) The classifier class that does the actual evaluation
    ''' </param>
    Friend Sub New(ByVal readerToUse As IEventStreamReader(Of TAggregate, TAggregateKey),
                   Optional classifier As IClassifier(Of TAggregate, TAggregateKey) = Nothing)
        m_streamReader = readerToUse
        m_classifier = classifier
    End Sub
End Class
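The processing loop can be sketched in Python to make its behaviour easy to try out. Here a plain list stands in for `IEventStreamReader`, and `NegativeTotalClassifier` is a hypothetical example classifier whose events are signed integer amounts. The sketch skips events the classifier does not handle and treats an `Unchanged` result as leaving the previous decision in place, which is an assumption about what `Unchanged` means.

```python
from enum import Enum
from typing import Iterable, List, Optional, Protocol


class EvaluationResult(Enum):
    INCLUDE = "Include"
    EXCLUDE = "Exclude"
    UNCHANGED = "Unchanged"


class Classifier(Protocol):
    def handles_event_type(self, event_type: type) -> bool: ...
    def evaluate(self, event: object) -> EvaluationResult: ...


class ClassifierProcessor:
    """Sketch of ClassifierProcessor: runs one classifier over one aggregate's events."""

    def __init__(self, events: Iterable[object],
                 classifier: Optional[Classifier] = None) -> None:
        self._events: List[object] = list(events)  # stands in for the stream reader
        self._classifier = classifier              # default classifier

    def classify(self, classifier: Optional[Classifier] = None) -> EvaluationResult:
        if classifier is None:
            classifier = self._classifier
        if classifier is None:
            return EvaluationResult.UNCHANGED  # no classification performed
        result = EvaluationResult.UNCHANGED
        for evt in self._events:
            if classifier.handles_event_type(type(evt)):
                outcome = classifier.evaluate(evt)
                if outcome is not EvaluationResult.UNCHANGED:
                    result = outcome  # keep the latest definite decision
        return result  # status as at the end of the stream


class NegativeTotalClassifier:
    """Hypothetical example classifier: events are signed integer amounts."""

    def __init__(self) -> None:
        self.total = 0

    def handles_event_type(self, event_type: type) -> bool:
        return event_type is int

    def evaluate(self, event: object) -> EvaluationResult:
        if not isinstance(event, int):
            return EvaluationResult.UNCHANGED
        self.total += event
        return EvaluationResult.INCLUDE if self.total < 0 else EvaluationResult.EXCLUDE
```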
Snapshots
Because every event stream is immutable, if a classifier run over it is deterministic, it is possible to take a snapshot of the classifier's state as at a given point in the stream and use it as the starting point for running the classifier from that point onwards.
Equally, any identity group that is based on these deterministic classifiers can also be snapshotted. In practice, I have added a property to the classifier itself that the developer can use to indicate whether it can be written to a snapshot or whether the entire event stream must be read every time.
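A minimal Python sketch of snapshotting a deterministic classifier, assuming events are signed integer amounts and the stream is append-only. The classifier records how many events it has folded in, a `Snapshot` captures that state, and a new classifier instance resumes from it so that only the newer events are read. `Snapshot`, `supports_snapshots` and the method names are hypothetical.

```python
from dataclasses import dataclass
from typing import Optional, Sequence


@dataclass(frozen=True)
class Snapshot:
    """Classifier state as at a given position in the event stream."""
    sequence: int  # number of events already folded into balance
    balance: int


class OverdrawnClassifier:
    """Deterministic: its state depends only on the events it has seen, in order."""

    supports_snapshots = True  # hypothetical flag, like the property described above

    def __init__(self, snapshot: Optional[Snapshot] = None) -> None:
        self.sequence = snapshot.sequence if snapshot is not None else 0
        self.balance = snapshot.balance if snapshot is not None else 0

    def run(self, events: Sequence[int]) -> bool:
        """Fold in events not yet seen; return True if the account is overdrawn."""
        for amount in events[self.sequence:]:
            self.balance += amount
            self.sequence += 1
        return self.balance < 0

    def take_snapshot(self) -> Snapshot:
        return Snapshot(self.sequence, self.balance)
```

Resuming from the snapshot must give the same answer as replaying the whole stream; that equivalence is exactly what determinism buys.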
Parallelism
Because each classifier runs on a single event stream and cannot interact with any other, the classification process is inherently parallel. This allows a scale-out architecture: if the load on the identity groups exceeds what the current machines can handle, simply add more classifier processor machines.
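Because each stream is classified independently, a batch of streams can be fanned out to workers with no coordination beyond collecting the results. This Python sketch uses a thread pool from `concurrent.futures` as a stand-in for separate classifier processor machines; the signed-amount streams and the `classify_overdrawn` rule are assumptions for illustration.

```python
from concurrent.futures import ThreadPoolExecutor
from typing import Dict, List, Set


def classify_overdrawn(events: List[int]) -> bool:
    """Classifier for one stream; it reads no other stream, so tasks never interact."""
    return sum(events) < 0


def overdrawn_group(streams: Dict[str, List[int]], workers: int = 4) -> Set[str]:
    """Classify every account concurrently and collect the identity group members."""
    accounts = list(streams)
    with ThreadPoolExecutor(max_workers=workers) as pool:
        # map() yields results in the same order as its inputs
        results = list(pool.map(classify_overdrawn, (streams[a] for a in accounts)))
    return {a for a, inside in zip(accounts, results) if inside}
```

For CPU-bound classifiers in a single Python process, `ProcessPoolExecutor` would avoid contention on the global interpreter lock; across machines, the same independence lets each node take its own share of the streams.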
Points of Interest
An alternative (and far more common) way to deal with the issue of running business queries over event streams is to persist the projections to a read-only database over which business queries can be run.
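For comparison, a minimal sketch of that read-model approach using Python's built-in `sqlite3` module with an in-memory database: the balance projection is run once per account and persisted, and the business query becomes an ordinary SQL `WHERE` clause. The table name, column names and signed-amount events are hypothetical.

```python
import sqlite3
from typing import Dict, List


def build_read_model(streams: Dict[str, List[int]]) -> sqlite3.Connection:
    """Run the balance projection for each account and persist the results."""
    conn = sqlite3.connect(":memory:")
    conn.execute("CREATE TABLE account_balance ("
                 "account_number TEXT PRIMARY KEY, balance INTEGER NOT NULL)")
    conn.executemany(
        "INSERT INTO account_balance (account_number, balance) VALUES (?, ?)",
        ((account, sum(events)) for account, events in streams.items()),
    )
    conn.commit()
    return conn


def overdrawn_accounts(conn: sqlite3.Connection) -> List[str]:
    """The business query: no event stream is replayed at query time."""
    rows = conn.execute("SELECT account_number FROM account_balance "
                        "WHERE balance < 0 ORDER BY account_number")
    return [row[0] for row in rows]
```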
History
- 23rd August 2016: Initial design
- 28th December 2016: Added projection evaluation as an alternative classification method