Posted 4 Mar 2016

CQRS with Decoupled Messaging - Part IV

Rishabh S Ajmera, 4 Mar 2016
Fourth post in a series of articles that show a practical application of CQRS architecture, with emphasis on decoupling messaging as an infrastructure component.

Introduction

In this article, we look at the Inventory Manager application, with a focus on the implementation of aggregates and event sourcing. The use cases for Inventory Manager are based on the “Super simple cqrs” example by Greg Young. In Inventory Manager, only the first use case, creating an inventory item, has been implemented.

The code for the Inventory Manager application is available in its GitHub repository.

Note that this post is part of a series of articles. All the articles in the series are listed below:

  1. Introduction
  2. Need for Enterprise Servicebus Frameworks and Decoupled messaging with their samples
  3. Inventory Manager - CQRS application (with decoupled messaging) - Commands
  4. Inventory Manager - CQRS application (with decoupled messaging) - Aggregate and Event Sourcing (this article)
  5. Inventory Manager - CQRS application (with decoupled messaging) - ReadSide

InventoryManager – Aggregate and Event Sourcing

  • As mentioned in videos by Greg Young, aggregates and read model DTOs are structural representations that capture only the final state of the system. The source of truth, however, is the events.
  • An aggregate exposes behavior for use cases to consume. This behavior fires off events, which are the source of truth. Thus, we persist these events (the behavioral model) and derive state (the structural model) from them by providing handlers for those events.
  • Thus, the aggregate has handlers for its own events. In the class below, the comments A, B and C mark the steps in the order in which they occur.
    public class InventoryItem : EventSourced
    {
        private string _name = string.Empty;//Rename functionality will use this field
    
        protected InventoryItem(Guid id)
            : base(id)
        {
            Handles<InventoryItemCreated>(OnInventoryItemCreated); // A. Register handler for event
        }
    
        private void OnInventoryItemCreated(InventoryItemCreated e)
        {
            this._name = e.Name; // C. State is updated in the handler for the event.
            // State is not persisted for the aggregate, 
            // it is the events (source of truth) that get persisted.
        }
    
        public InventoryItem(Guid id, string name):this(id)
        {
            Update(new InventoryItemCreated(id, name)); // B. Some behavior fires off the event
        }
    }
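
The article does not show the EventSourced base class, so the following is only a minimal sketch of how Handles and Update could work, under stated assumptions. The names EventSourcedSketch, IVersionedEvent, ItemCreated, Item and LoadFrom are hypothetical and are not taken from the repository. The sketch illustrates the idea from the bullets above: state changes only inside handlers, so the same handlers serve both new events and replayed history.

```csharp
using System;
using System.Collections.Generic;

// Hypothetical event contract; the real project has its own definition.
public interface IVersionedEvent
{
    Guid SourceId { get; }
    int Version { get; set; }
}

// Hypothetical stand-in for the EventSourced base class.
public abstract class EventSourcedSketch
{
    private readonly Dictionary<Type, Action<IVersionedEvent>> _handlers =
        new Dictionary<Type, Action<IVersionedEvent>>();
    private readonly List<IVersionedEvent> _pendingEvents = new List<IVersionedEvent>();

    protected EventSourcedSketch(Guid id) { Id = id; }

    public Guid Id { get; private set; }
    public int Version { get; private set; }

    // Events raised since the aggregate was created or loaded; a repository would persist these.
    public IReadOnlyList<IVersionedEvent> Events { get { return _pendingEvents; } }

    // Step A: map an event type to the method that applies it to state.
    protected void Handles<TEvent>(Action<TEvent> handler) where TEvent : IVersionedEvent
    {
        _handlers[typeof(TEvent)] = e => handler((TEvent)e);
    }

    // Step B: behavior raises a new event, which is versioned, applied and queued for saving.
    protected void Update(IVersionedEvent e)
    {
        e.Version = Version + 1;
        Apply(e);
        _pendingEvents.Add(e);
    }

    // Rehydration: replay stored history through the same handlers without re-queuing it.
    public void LoadFrom(IEnumerable<IVersionedEvent> history)
    {
        foreach (var e in history) Apply(e);
    }

    private void Apply(IVersionedEvent e)
    {
        _handlers[e.GetType()](e); // Step C happens inside the registered handler
        Version = e.Version;
    }
}

// Hypothetical event and aggregate used only to exercise the sketch.
public sealed class ItemCreated : IVersionedEvent
{
    public ItemCreated(Guid sourceId, string name) { SourceId = sourceId; Name = name; }
    public Guid SourceId { get; private set; }
    public int Version { get; set; }
    public string Name { get; private set; }
}

public sealed class Item : EventSourcedSketch
{
    public Item(Guid id) : base(id) { Handles<ItemCreated>(e => Name = e.Name); }
    public Item(Guid id, string name) : this(id) { Update(new ItemCreated(id, name)); }
    public string Name { get; private set; }
}
```

Creating new Item(id, "Widget") leaves one event in Events. Constructing a second Item with the same id and passing those events to LoadFrom rebuilds the same Name without queuing anything new, which is the sense in which state is derived from events rather than stored.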

Azure Event Sourced Repository

Ensuring Publishing of Events Upon Save

  • EventStore stores events in Azure Table Storage.
  • In AzureEventSourcedRepository, we need to ensure that upon saving of the events, they are published also.
    public class AzureEventSourcedRepository<T> : IEventSourcedRepository<T> 
                     where T : class, IEventSourced
    {
        // .. Code
        
        public void Save(T eventSourced, string correlationId)
        {
            // TODO: guarantee that only incremental versions of the event are stored
            var events = eventSourced.Events.ToArray();
            var serialized = events.Select(
                e => _versionedEventSerializer.Serialize(e, typeof (T), correlationId));
    
            _eventStore.Save(eventSourced.Id.ToString(), serialized);
    
            _publisher.Send(eventSourced.Id.ToString(), events.Length);
        }
    }
  • However, it is not possible to have a distributed transaction that spans Azure Table Storage and Azure Service Bus. Azure Table Storage supports transactions only across entities that share the same partition key. We store the aggregate’s identity in the partition key column. Thus, to ensure that events are published, we save 2 copies of each event in the event store.
  • One copy is the permanent record of that event, and the other copy becomes part of a virtual queue of events that must be published on the Windows Azure Service Bus. The following code sample shows the Save method in the EventStore class. The prefix “Unpublished” identifies the copy of the event that is part of the virtual queue of unpublished events.
    public class EventStore<T> : IEventStore<T>, IPendingEventsQueue<T>
    {
        // .. Code
        public void Save(string sourceId, IEnumerable<EventData> events)
        {
            var table = _tableClient.GetTableReference(_tableName);
            var tableBatchOperation = new TableBatchOperation();
            foreach (var eventData in events)
            {
                if (eventData.SourceId != sourceId)
                    throw new Exception("Events from different aggregate instances found during EventStore save. " +
                        "Events from only a single aggregate instance can be saved.");
    
                var creationDate = DateTime.UtcNow;
    
                tableBatchOperation.Insert(eventData.ToAzureTableEntry(creationDate));
    
                // Add a duplicate of this event to the Unpublished "queue"
                tableBatchOperation.Insert(eventData.ToUnpublishedAzureTableEntry(creationDate));
            }
    
            try
            {
                table.ExecuteBatch(tableBatchOperation);
            }
            catch (DataServiceRequestException ex)
            {
                var inner = ex.InnerException as DataServiceClientException;
                if (inner != null && inner.StatusCode == (int)HttpStatusCode.Conflict)
                {
                    throw new ConcurrencyException();
                }
    
                throw;
            }
        }
    }
  • These 2 records belong to the same aggregate and thus have the same partition key. Hence, they are saved in a single transaction.
  • This is why the EventStore class implements both interfaces, IEventStore and IPendingEventsQueue.
  • The AzureEventSourcedRepository calls Save on the EventStore and then asks the publisher to publish the events for the aggregate instance.
  • The publisher reads the unpublished event records for the specified aggregate and publishes them via IServiceBus. After publishing, the publisher deletes the records for the unpublished events.
    public class EventStoreBusPublisher<T> : IEventStoreBusPublisher<T>, IDisposable
    {
        // .. Code
        private void SendAndDeletePending(EventData eventData)
        {
            var ev = new VersionedEventSerializer().Deserialize(eventData);
            _sender.Publish(ev);
            _queue.DeletePending(eventData);
        }
    }
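
To make the two-copy scheme easier to follow, here is a minimal in-memory sketch of it, assuming a dictionary stands in for one Azure table. InMemoryEventTable, PendingPublisher, the "Unpublished_" row key prefix and the ten-digit version format are all hypothetical and are not the project's actual storage layout. The sketch shows that both copies are written in one all-or-nothing step for a partition, and that the publisher sends first and deletes second.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;

// In-memory stand-in for one table: partition key -> (row key -> payload).
public sealed class InMemoryEventTable
{
    private const string UnpublishedPrefix = "Unpublished_";
    private readonly Dictionary<string, SortedDictionary<string, string>> _partitions =
        new Dictionary<string, SortedDictionary<string, string>>();

    // Writes both copies of every event, or nothing at all if any version already exists.
    public void Save(string partitionKey, IEnumerable<KeyValuePair<int, string>> versionedPayloads)
    {
        SortedDictionary<string, string> existing;
        if (!_partitions.TryGetValue(partitionKey, out existing))
            existing = new SortedDictionary<string, string>(StringComparer.Ordinal);

        // Stage changes on a copy so that a conflict leaves the stored partition untouched.
        var staged = new SortedDictionary<string, string>(existing, StringComparer.Ordinal);
        foreach (var e in versionedPayloads)
        {
            var rowKey = e.Key.ToString("D10");
            if (staged.ContainsKey(rowKey))
                throw new InvalidOperationException("Concurrency conflict on version " + e.Key);
            staged.Add(rowKey, e.Value);                     // permanent record
            staged.Add(UnpublishedPrefix + rowKey, e.Value); // entry in the virtual queue
        }
        _partitions[partitionKey] = staged;
    }

    // Returns the queued copies for one aggregate, ordered by version.
    public List<KeyValuePair<string, string>> GetPending(string partitionKey)
    {
        SortedDictionary<string, string> rows;
        if (!_partitions.TryGetValue(partitionKey, out rows))
            return new List<KeyValuePair<string, string>>();
        return rows.Where(r => r.Key.StartsWith(UnpublishedPrefix, StringComparison.Ordinal)).ToList();
    }

    public void DeletePending(string partitionKey, string rowKey)
    {
        _partitions[partitionKey].Remove(rowKey);
    }
}

public static class PendingPublisher
{
    // Publish first, delete second: a crash between the two steps leaves the
    // pending row in place, so the event is sent again later (at-least-once delivery).
    public static int PublishPending(InMemoryEventTable table, string partitionKey, Action<string> publish)
    {
        var pending = table.GetPending(partitionKey);
        foreach (var row in pending)
        {
            publish(row.Value);
            table.DeletePending(partitionKey, row.Key);
        }
        return pending.Count;
    }
}
```

One consequence of this ordering is that an event can be published more than once if the process stops after the publish but before the delete, so subscribers are better off tolerating duplicate deliveries.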

EventPublishProcessor

  • A crash can happen after the 2 copies of the events have been saved but before the events are published in the Save method of the AzureEventSourcedRepository. Since those 2 statements are not wrapped in any (distributed) transaction, such a crash would leave the system in an inconsistent state.
    public class AzureEventSourcedRepository<T> : IEventSourcedRepository<T> where T : class, IEventSourced
    {
        public void Save(T eventSourced, string correlationId)
        {
            // TODO: guarantee that only incremental versions of the event are stored
            var events = eventSourced.Events.ToArray();
            var serialized = events.Select(
                e => _versionedEventSerializer.Serialize(e, typeof (T), correlationId));
    
            _eventStore.Save(eventSourced.Id.ToString(), serialized);
            // ...IF CRASH HAPPENS HERE, 
            // SYSTEM WILL END UP WITH EVENTS THAT ARE SAVED BUT NEVER PUBLISHED
            _publisher.Send(eventSourced.Id.ToString(), events.Length);
        }
    }
  • Thus, in the case of a failure, the system must include a mechanism for scanning all of the partitions in table storage for aggregates with unpublished events and then publishing those events. This process will take some time to run, but will only need to run when the application restarts.
  • The EventPublishProcessor component is registered so that it runs at startup in the worker role for this purpose. It takes a dependency on IEventStoreBusPublisher.
  • EventPublishProcessor is registered as an IProcessor in the Bootstrapper.
    namespace InventoryManager.Worker.Main
    {
        public class Bootstrapper
        {
            //.. Code
    
            private static void RegisterRepository<T>(CloudStorageAccount storageAccount, 
    			string eventStoreTableName) where T: class, IEventSourced  
            {
                // .. Code
                IoC.RegisterAsSingleton<IProcessor, EventPublishProcessor<T>>(name: typeof(T).Name + 
    				"_EventPublisherProcessor");
            }
        }
    }
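
The body of EventPublishProcessor is not shown in the article. The following is a rough sketch of the startup scan described above, assuming the processor is handed two delegates: one that lists the partitions that still hold unpublished events, and one that publishes the pending events for a single partition. The class name StartupEventRepublisher and both delegates are hypothetical.

```csharp
using System;
using System.Collections.Generic;

// Hypothetical startup processor: republishes whatever a previous run left unpublished.
public sealed class StartupEventRepublisher
{
    private readonly Func<IEnumerable<string>> _findPartitionsWithPending;
    private readonly Action<string> _publishPendingFor;

    public StartupEventRepublisher(
        Func<IEnumerable<string>> findPartitionsWithPending,
        Action<string> publishPendingFor)
    {
        _findPartitionsWithPending = findPartitionsWithPending;
        _publishPendingFor = publishPendingFor;
    }

    // Number of partitions handled by the last call to Start; handy for logging.
    public int RepublishedPartitions { get; private set; }

    // Runs once when the worker starts: scan, then publish per aggregate.
    public void Start()
    {
        RepublishedPartitions = 0;
        foreach (var partitionKey in _findPartitionsWithPending())
        {
            _publishPendingFor(partitionKey);
            RepublishedPartitions++;
        }
    }

    // Nothing to tear down in this sketch; the scan is a one-off pass.
    public void Stop() { }
}
```

In a real worker, the first delegate would query the event store for rows in the unpublished queue and the second would forward to the publisher. Both are left abstract here because the article does not show those members.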

InventoryManagerProcessor

  • In the worker role, InventoryManagerProcessor resolves all the IProcessor instances registered via IoC and then starts them.
    class InventoryManagerProcessor : IDisposable
    {
        private readonly CancellationTokenSource _cancellationTokenSource;
        private readonly List<IProcessor> _processors;
    
        public InventoryManagerProcessor()
        {
            _cancellationTokenSource = new CancellationTokenSource();
            _processors = IoC.ResolveAll<IProcessor>().ToList();
        }
    
        public void Start()
        {
            _processors.ForEach(p => p.Start());
        }
    
        public void Stop()
        {
            _cancellationTokenSource.Cancel();
            _processors.ForEach(p => p.Stop());
        }
    
        // .. Code
    }
  • The InventoryManagerProcessor is started when the worker role starts.
    public class WorkerRole : RoleEntryPoint
    {
        private async Task RunAsync(CancellationToken cancellationToken)
        {
            // .. Code
            using (var processor = new InventoryManagerProcessor())
            {
                processor.Start();
    
                while (!cancellationToken.IsCancellationRequested)
                {   
                    await Task.Delay(10000); // non-blocking wait; Thread.Sleep would block the thread inside an async method
                }
    
                processor.Stop();
    
                // cause the process to recycle
                return;
            }
        }
    }

Registering EventSourcedRepository

  • The event sourced repository has several dependencies.
  • The method that registers a repository has therefore been made generic, as shown below:
    namespace InventoryManager.Worker.Main
    {
        public class Bootstrapper
        {
            //.. Code
    
            private static void RegisterRepository<T>(CloudStorageAccount storageAccount, 
    			string eventStoreTableName) where T: class, IEventSourced  
            {
                // TODO: See if we can work with registerType (non-singletons) for all of the following
                var eventStore = new EventStore<T>(storageAccount, eventStoreTableName);
                IoC.RegisterInstance<IEventStore<T>>(eventStore);
                IoC.RegisterInstance<IPendingEventsQueue<T>>(eventStore);
                IoC.RegisterAsSingleton<IEventStoreBusPublisher<T>, EventStoreBusPublisher<T>>();
                IoC.RegisterAsSingleton<IEventSourcedRepository<T>, AzureEventSourcedRepository<T>>();
                IoC.RegisterAsSingleton<IProcessor, EventPublishProcessor<T>>
    				(name: typeof(T).Name + "_EventPublisherProcessor");
            }
        }
    }
  • This simplifies creating more repositories for future aggregates.

Registering Handlers for Commands and Events

  • This is similar to the sample application and is done while bootstrapping the worker role.
    namespace InventoryManager.Worker.Main
    {
        public class Bootstrapper
        {
            //.. Code
    
            private static void RegisterServiceBus()
            {
                var bus = new MassTransitServiceBus(
                    x => new MassTransitWithAzureServiceBusConfigurator(
                            ConfigurationManager.AppSettings.Get("azure-namespace"),
                            "InventoryManager.WriteSide",
                            ConfigurationManager.AppSettings.Get("azure-key"), x)
                        .WithHandler<CreateInventoryItem, InventoryItemAppService>()
                        .WithHandler<InventoryItemCreated, InventoryViewModelGenerator>());
                IoC.RegisterInstance<IServiceBus>(bus);
            }
        }
    }
  • Note how the InventoryViewModelGenerator subscribes to the “InventoryItemCreated” event. The view model generator is part of the worker role and is responsible for updating the read model based on the events that were generated.
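
The read side is covered in the next article, so the following is only a small sketch of what an event handler that updates a read model can look like. It assumes an in-memory dictionary as the read model. ItemCreatedMessage and InMemoryInventoryListView are hypothetical names, not the project's InventoryItemCreated or InventoryViewModelGenerator.

```csharp
using System;
using System.Collections.Generic;

// Hypothetical event shape; the real InventoryItemCreated lives in the project.
public sealed class ItemCreatedMessage
{
    public ItemCreatedMessage(Guid id, string name) { Id = id; Name = name; }
    public Guid Id { get; private set; }
    public string Name { get; private set; }
}

// Hypothetical read model: a flat list of item names keyed by aggregate id.
public sealed class InMemoryInventoryListView
{
    private readonly Dictionary<Guid, string> _namesById = new Dictionary<Guid, string>();

    // The indexer assignment is an upsert, so handling the same event twice
    // leaves exactly one entry for the item.
    public void Handle(ItemCreatedMessage e)
    {
        _namesById[e.Id] = e.Name;
    }

    public int Count { get { return _namesById.Count; } }

    // Returns null when the item is unknown to the read model.
    public string NameOf(Guid id)
    {
        string name;
        return _namesById.TryGetValue(id, out name) ? name : null;
    }
}
```

Writing the handler as an upsert is a deliberate choice in this sketch: because unpublished events may be sent again after a restart, a handler that can safely see the same event twice keeps the read model correct.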

Next Article in the Series

The next article in the series will focus on the read side of the Inventory Manager application.

For a complete list of articles in this series, please go to the Introduction section of this article.
Thanks for reading these articles; I hope they are proving insightful.

References

  1. “Super simple cqrs” example by Greg Young
  2. Greg Young video on CQRS (about 6 hours)

License

This article, along with any associated source code and files, is licensed under The Code Project Open License (CPOL)

About the Author

Rishabh S Ajmera
United States

Article Copyright 2016 by Rishabh S Ajmera