
CQRS : A Cross Examination Of How It Works

21 May 2015 · CPOL · 38 min read
A look at how a CQRS app may work, with the added bonus of a demo app

Introduction

About 3 years ago I went to a talk given by some dude on CQRS (Command Query Responsibility Segregation). At the time I thought, mmm, that's a neat idea, but I could not really see how it applied to my everyday job, so I kind of forgot about it. Two years or so passed, and the place where I work hired a new dude, who turned out to be the guy that did the talk on CQRS. That person is Ashic Mahtab, whom I have now had the pleasure to work with. Working with Ashic on a day to day basis piqued my interest in CQRS again. On more than one occasion I would have an issue with my code, and Ashic would overhear me cursing at the screen, casually walk over and go through the issue with me, and then, with an all knowing (AKA smug) smile, explain how that could never happen using CQRS & Event Sourcing. Ashic is a good CQRS knowledge base: he was asked to review the Microsoft Patterns & Practices book "A Journey Into CQRS & Event Sourcing", and lists Greg Young amongst his friends/associates. As you can imagine Ashic has been very useful to bounce ideas/questions off while I have been trying to learn CQRS. As such I have vetted a great many questions via him, so I hope the material I present here is accurate.

Ashic, if you are reading this, thanks so much for putting up with my (sometimes) silly questions, and thanks for ALWAYS taking the time to explain things to me in an easy to understand manner that nearly always made more sense of things in the end. Sir, I salute you.

Anyway I decided that I needed to understand this pattern/architecture a bit better. So I set about reading a lot of material on it, and watching some videos and reading some books on it, and examining frameworks out there on it. The fruit of that is this article and the attached demo app.

 

In this article I will be covering both CQRS and event sourcing. This article is about me trying to understand CQRS, and trying to explain it to others.

 

So What Exactly Is CQRS?

As stated in the introduction CQRS stands for "Command Query Responsibility Segregation", but what does that mean? Let's look at a few quotes from well known technologists.

In his book "Object Oriented Software Construction," Bertrand Meyer introduced the term "Command Query Separation" to describe the principle that an object's methods should be either commands or queries. A query returns data and does not alter the state of the object; a command changes the state of an object but does not return any data. The benefit is that you have a better understanding what does, and what does not, change the state in your system.

CQRS takes this principle a step further to define a simple pattern.


"CQRS is simply the creation of two objects where there was previously only one. The separation occurs based upon whether the methods are a command or a query (the same definition that is used by Meyer in Command and Query Separation: a command is any method that mutates state and a query is any method that returns a value)."


- Greg Young, CQRS, Task Based UIs, Event Sourcing agh!


What is important and interesting about this simple pattern is how, where, and why you use it when you build enterprise systems. Using this simple pattern enables you to meet a wide range of architectural challenges, such as achieving scalability, managing complexity, and managing changing business rules in some portions of your system.
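As a minimal sketch of that split, here is one object divided into a command object and a query object. All of the type names below are hypothetical and are not taken from the demo app, and both sides share a single in-memory store purely to keep the example short:

```csharp
using System;
using System.Collections.Generic;

// Hypothetical types for illustration only; not part of the demo app.
public class OrderStore
{
    public readonly Dictionary<Guid, string> Addresses =
        new Dictionary<Guid, string>();
}

// Command side: every method changes state and returns nothing.
public class OrderCommands
{
    private readonly OrderStore _store;
    public OrderCommands(OrderStore store) { _store = store; }

    public void CreateOrder(Guid id, string address)
    {
        _store.Addresses.Add(id, address);
    }

    public void ChangeAddress(Guid id, string newAddress)
    {
        _store.Addresses[id] = newAddress;
    }
}

// Query side: every method returns data and changes nothing.
public class OrderQueries
{
    private readonly OrderStore _store;
    public OrderQueries(OrderStore store) { _store = store; }

    public string GetAddress(Guid id) { return _store.Addresses[id]; }
    public int CountOrders() { return _store.Addresses.Count; }
}
```

A caller would create the order through OrderCommands and read it back through OrderQueries. In a fuller CQRS system the two sides would typically sit over separate write and read models rather than one shared store.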


"CQRS is a simple pattern that strictly segregates the responsibility of handling command input into an autonomous system from the responsibility of handling side-effect-free query/read access on the same system. Consequently, the decoupling allows for any number of homogeneous or heterogeneous query/read modules to be paired with a command processor. This principle presents a very suitable foundation for event sourcing, eventual-consistency state replication/fan-out and, thus, high-scale read access. In simple terms, you don't service queries via the same module of a service that you process commands through. In REST terminology, GET requests wire up to a different thing from what PUT, POST, and DELETE requests wire up to."

- Clemens Vasters (CQRS Advisors Mail List)

 

Some Good CQRS Resources

 

 

 

So What Exactly Is Event Sourcing?

I quite like the way Martin Fowler describes this one:

We can query an application's state to find out the current state of the world, and this answers many questions. However there are times when we don't just want to see where we are, we also want to know how we got there.

Event Sourcing ensures that all changes to application state are stored as a sequence of events. Not just can we query these events, we can also use the event log to reconstruct past states, and as a foundation to automatically adjust the state to cope with retroactive changes.

 

http://martinfowler.com/eaaDev/EventSourcing.html

 

If that doesn't make sense to you, think about a typical CRUD operation where we would store the current state of an object in the database. With event sourcing we would instead store the events associated with a particular object, keyed on some generated ID (most often a Guid), which is the ID of the aggregate. If we wanted to know the current state of the object, we would retrieve all its past events and play them in sequence to arrive at the current state. In a nutshell that is how event sourcing works.
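To make the "play them in sequence" idea concrete, here is a small sketch. The event and state types are made up for this illustration (the demo app's real events and aggregates look different); the point is only that the current state is derived from the event history rather than stored directly:

```csharp
using System;
using System.Collections.Generic;

// Hypothetical events for illustration; the demo app's real events differ.
public abstract class OrderEvent { public Guid AggregateId; }
public class OrderCreated : OrderEvent { public string Address; }
public class AddressChanged : OrderEvent { public string NewAddress; }
public class OrderDeleted : OrderEvent { }

public class OrderState
{
    public string Address { get; private set; }
    public bool IsDeleted { get; private set; }
    public int Version { get; private set; }

    // Rebuild the current state by applying every past event in order.
    public static OrderState Replay(IEnumerable<OrderEvent> history)
    {
        var state = new OrderState();
        foreach (var e in history)
        {
            state.Apply(e);
            state.Version++; // one version step per applied event
        }
        return state;
    }

    private void Apply(OrderEvent e)
    {
        var created = e as OrderCreated;
        if (created != null) { Address = created.Address; return; }

        var changed = e as AddressChanged;
        if (changed != null) { Address = changed.NewAddress; return; }

        if (e is OrderDeleted) IsDeleted = true;
    }
}
```

Replaying an OrderCreated followed by an AddressChanged yields a state holding the changed address at version 2, and the full history remains available as an audit trail.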

 

NOTE : If you want to use EventSourcing you should probably use CQRS, but if you want to create a CQRS system you don't necessarily need Event Sourcing.

 

 

So Why Might We Use CQRS?

In most applications I have seen I think it is fair to say that you may have something like this

  • UI code
  • Service to facilitate remote call from clients (REST / SOAP)
  • Transformation from DTOs
  • Validation layer
  • Business logic
  • Repositories / unit of work
  • Data access layer
  • Data access objects

 

That is quite a few layers.  When you consider that most apps read a hell of a lot more than they write, does it really make sense for the reads to go through all this layering? Probably not. Perhaps a more sane model would be one where we use these layers when we want to write something, and when we want to read something we could expect the data to be valid, and as such, could we not bypass a lot of this and just grab the data directly from the database? That is certainly one compelling reason to use CQRS.

Event sourcing may also be used with CQRS, and when it is, it adds the ability to get a full audit of the changes performed for a given aggregate. This is accomplished by storing the events in some form of event store and replaying them on the aggregate.

 

These are 2 compelling reasons to use CQRS.

 

 

Is It A Silver Bullet?

I personally do not think it is suitable for every part of your application. For example imagine this quite common web based scenario: 

"The user must type their username and password into some input fields, after which the system must try and identify the user, by some lookup mechanism. If the user is obtained, the system should retrieve some profile information which is held in session for 4 hours or until the user logs out"

 

I personally do NOT think this is a good fit for CQRS (other people may disagree), as it implies a more request/response type of operation, where an immediate response is needed based on the user login request. This is kind of at loggerheads with a typical CQRS arrangement. With CQRS we should be able to clearly separate the command to do something from a query to retrieve something. In this scenario we need a pretty much immediate response to a command, and we do not want to perform any additional query; we need the response right now for the current request. So it is not really a suitable CQRS candidate (at least in my opinion).

 

Contrast that with this sort of scenario: 

"The user may alter an open order, that has not yet been dispatched, and where the difference between today and the dispatch date is greater than 24 hours. The user may alter certain attributes of the order such as

  • Delivery Address
  • Quantity

The system shall allow the user requested changes to be stored against the order, and shall inform the user that the changes have been made and shall be dealt with shortly"

 

This (to my mind) is a much better fit for being modelled using CQRS, as it is not really a request/response type of operation, as the command to change the details can clearly be separated from the query to obtain the modified order details.

There may, however, be other hairy issues raised by doing the CQRS operation fully asynchronously. I will talk about that later.

 

Where Is The Code

The demo code for this article can be obtained from my github account :

https://github.com/sachabarber/SachaBarber.CQRS.Demo

 

Prerequisites

I have tried my hardest to keep the prerequisites of the demo code down to the bare minimum. I have been good to you by using RavenDB embedded for the read model, and by creating an in memory event store. There is still, however, a need for the following bits to be installed:

 

 

I would try and make sure the RabbitMQ server is set to start as a windows service automatically using the system account.

 

 

The Building Blocks Of A CQRS Architected Application

The diagram below shows the typical building blocks/process flows of a CQRS application.

Image 1


 

There may be some subtle changes made here and there to the above building blocks, but this seems to be the most commonly accepted architecture for building CQRS apps. I will describe each of the blocks above in a bit of detail below, but we will revisit these in much more detail when we discuss the demo app as a whole.

 

Client

The client could be any form of client really, a win forms app, WPF app, web site etc. As long as it can send a command it could be a client.

The demo app I use here is a WPF client that utilises WCF to send commands to the domain model.

 

Command Bus

The command bus is really just used to obtain a handler for the incoming command. There are some cool libraries around that do things like this, such as Jimmy Bogard's MediatR. The thing with that library is that it is a bit overkill for the requirements here. As such I have used a simple bit of reflection.

 

Command Handlers

These are the handlers that will deal with executing the command logic. There should be a 1:1 mapping between an incoming command and command handler.
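One way to picture the command bus and its 1:1 command-to-handler mapping is a dictionary keyed on command type. The sketch below is an assumption for illustration only (the class and command names are invented); the demo app itself locates handlers via reflection, as shown later in the article:

```csharp
using System;
using System.Collections.Generic;

// Hypothetical, simplified command bus; the demo app uses reflection instead.
public class SimpleCommandBus
{
    private readonly Dictionary<Type, Action<object>> _handlers =
        new Dictionary<Type, Action<object>>();

    // Register exactly one handler per command type (the 1:1 mapping).
    public void Register<TCommand>(Action<TCommand> handler)
    {
        if (_handlers.ContainsKey(typeof(TCommand)))
            throw new InvalidOperationException(
                "A handler is already registered for " + typeof(TCommand).Name);
        _handlers.Add(typeof(TCommand), c => handler((TCommand)c));
    }

    // Look up the handler by the command's runtime type and invoke it.
    public void Send(object command)
    {
        Action<object> handler;
        if (!_handlers.TryGetValue(command.GetType(), out handler))
            throw new InvalidOperationException(
                "No handler registered for " + command.GetType().Name);
        handler(command);
    }
}

// Hypothetical command used only to exercise the bus.
public class RenameOrderCommand { public string NewName; }
```

Registering a second handler for the same command type throws, which is what enforces the 1:1 rule here. Events, by contrast, may legitimately have many handlers.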

 

Domain

The domain model is the central part of the system. It is the domain model aggregates that have any historical events from the event store played against them, or have any new events (via incoming commands) applied to them, before those new events are persisted to the event store.

 

Services

These are any domain services that do not fit with aggregate roots or value types that are part of the domain.

 

Repository

The repository pattern is used for 2 main purposes:

  • To store any uncommitted events to the event store.
  • To load an aggregate from the event store based on its ID (which, as I have already stated, is usually a Guid generated before the aggregate's first event is even stored in the event store)

 

NOTE :  This is writing to the "Write" model, where we store the events in the event store. This is due to the fact that this article uses CQRS with Event Sourcing.

 

Event Bus

This is used to publish out events that have been applied to the domain model.

 

Event Handlers

These are handlers that will deal with executing the event logic. There may be more than 1 handler associated with an event. For example suppose you have multiple read models, one for the UI to read from, another to drive some reporting requirements, and so on. So there may be a requirement for more event handlers than just 1, but that will depend on your specific needs.

For this demo app there is a 1:1 mapping from event to event handler, as there is only 1 read model, and no other event listeners.

As such for the demo app it is this bit of the process where the read model is updated. The read model may be a relational database or may even be NoSQL, but it should be a denormalized representation of the client requirements. Or at the very least in some sort of representation that makes it very easy for the client to deal with the data within the read model. There may or may not be a good parity between handled events and the representation that the read model requires, it may even be required to transform the data in some way. This may also be known as "Projections", we will see more on that later.
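A projection can be sketched as an event handler whose only job is to keep a denormalized row up to date. The event, row and projection types below are hypothetical, and a dictionary stands in for the read database (the demo app uses RavenDB embedded for that):

```csharp
using System;
using System.Collections.Generic;

// Hypothetical event and read-model row, for illustration only.
public class OrderAddressChanged
{
    public Guid OrderId;
    public string NewAddress;
    public int Version;
}

public class OrderSummaryRow
{
    public Guid OrderId;
    public string Address;
    public int Version;
}

// A projection: an event handler that maintains a query-friendly view.
public class OrderSummaryProjection
{
    // Stand-in for the read database.
    public readonly Dictionary<Guid, OrderSummaryRow> Rows =
        new Dictionary<Guid, OrderSummaryRow>();

    public void Handle(OrderAddressChanged e)
    {
        OrderSummaryRow row;
        if (!Rows.TryGetValue(e.OrderId, out row))
        {
            // First event seen for this order: create its row.
            row = new OrderSummaryRow { OrderId = e.OrderId };
            Rows.Add(e.OrderId, row);
        }
        // Copy across only what the client needs to display.
        row.Address = e.NewAddress;
        row.Version = e.Version;
    }
}
```

Because the row already has the shape the client wants, the query side can return it as is, with no joins, validation or business logic.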

 

NOTE :  This is writing to the "Read" model (but only internally in the domain, the client may NOT write to the read model only read from it)

 

Thin Data Layer

As previously stated we want the read model to be as easy to deal with as possible, so a simple data access layer is all that is needed. Perhaps some uber simple repository is needed, but that would be all: no validation, no business logic, just a way of getting stuff out of the read model.

 

Query Facade

We should allow queries to be executed almost directly over the read model. Remember, from the client's point of view, it's a read ONLY model. As such the client may even read directly from the read model.

 

 

Synchronous vs Asynchronous (Eventual Consistency)

One of the things that I immediately realised when I started this demo app, was that there is a tremendous difference in the complexities if you choose to go with everything asynchronous versus synchronous. What I mean by that is that if you go down the synchronous route you may have something like this (pseudo code)

 

Synchronous Code (Pseudo Code)

Let's assume that all parts of the system are synchronous. That is, the internals of the CQRS architecture that we are using are fully synchronous components. Then we might have something like this:

 

FireCommand(new CommitOrderCommand(Guid.NewGuid(), new[] { "bag of peas" }));

Orders = readModelService.GetAllOrders();

 

We have no issues here, as we simply fire a command (that would talk to the write model of the CQRS domain model), let that run to completion, and then read from the read model, at which point we would expect our new order to be there. All good.

 

Asynchronous Code (Pseudo Code)

This approach uses the idea of eventual consistency, where we make a write to the domain model (initiated via a command), but we do this with the knowledge that all parts of the system are asynchronous. That is, the internals of the CQRS architecture that we are using are fully asynchronous components. This could mean using interprocess buses such as

  • NetMQ
  • RabbitMQ
  • NServiceBus
  • MassTransit
  • MSMQ
  • Azure ServiceBus

 

FireCommand(new CommitOrderCommand(Guid.NewGuid(), new[] { "bag of peas" }));

......

......

 

Essentially we can no longer predict the correct time to update from the read model. When would we do that? How do we know when to do that? The answer lies in a message being passed from the event handlers in the CQRS system, which tells those interested (primarily a user interface) that something has happened in the domain model. The message could contain the actual data, or it could simply act as a trigger stating that a certain event has occurred in the domain model, and let the consumers of this new message decide how to react.

This latter approach is the one I have taken in the app (where RabbitMQ is used for this purpose). The read model is updated first in response to a published event from the event store, and then the CQRS framework pushes out a message in response to the read model being updated. Those parties that subscribe to this message will be notified, at which point they may choose to do something about it, such as update themselves (a ViewModel/View if the subscriber is a UI) by reading from the read model again.

This does raise an interesting question. Say you fire a command (assume from a UI) to change something about a domain aggregate root object. Say you have an order and you fire a "Change Address Command" at the CQRS system. You still have the original order with the current address in the UI (which we assumed above), and at some point down the line the UI will be notified about some event that occurred in the domain model, at which point the UI would read from the read model. The read model WILL ONLY contain the new address once the entire workflow has been completed (well, up to the update to the read model that is). So up until this time the UI will be in a different (some would call it inconsistent) state than the read model.

 

This is known as "eventual consistency", and is something that you will have to get used to when doing fully asynchronous systems. The data will eventually be consistent, or it may end up in an error state that may even require some manual intervention to make it consistent again (that is the worst error case though).
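The fire-a-command-then-wait-to-be-told flow can be sketched as follows. Everything here is a stand-in: a background task with a short delay plays the part of the asynchronous write side, and a plain .NET event plays the part of the RabbitMQ notification that the demo app uses. The class and member names are hypothetical:

```csharp
using System;
using System.Collections.Concurrent;
using System.Threading.Tasks;

// Hypothetical stand-ins: an in-memory read model plus an in-process
// notification in place of the RabbitMQ message used by the demo app.
public class AsyncOrderSystem
{
    private readonly ConcurrentDictionary<Guid, string> _readModel =
        new ConcurrentDictionary<Guid, string>();

    // Raised only after the read model has been updated.
    public event Action<Guid> OrderAddressChanged;

    // The caller does not get the result back; the work happens later.
    public Task SendChangeAddress(Guid orderId, string newAddress)
    {
        return Task.Run(async () =>
        {
            await Task.Delay(50);                  // simulated processing latency
            _readModel[orderId] = newAddress;      // update the read model first
            var handler = OrderAddressChanged;
            if (handler != null) handler(orderId); // then notify subscribers
        });
    }

    // Query side: returns null until the read model has caught up.
    public string ReadAddress(Guid orderId)
    {
        string address;
        return _readModel.TryGetValue(orderId, out address) ? address : null;
    }
}
```

A UI would subscribe to OrderAddressChanged and call ReadAddress inside the handler. Between sending the command and receiving the notification, the UI and the read model may disagree, which is exactly the window that eventual consistency describes.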

 

 

To Framework Or NOT?

Now Ashic, whom I work with, has always stated that he thinks using a framework for CQRS is not a great way to go, as you may sometimes need more control than a framework offers. This is certainly true, and is a common problem. That said, this advice to not use a framework was coming from someone who has a lot of experience of working with CQRS and has written these systems from scratch. I have not, and I still wanted to get a CQRS application to work.

 

So I decided that I would use a framework. I hunted around and found quite a few, but the one I liked the most was called CQRSlite, which is an extended version of an older framework by Greg Young. It offers the following features:

  • Command sending and event publishing
  • Unit of work through session with aggregate tracking
  • Repository for getting and saving aggregates
  • Optimistic concurrency checking
  • In process bus with auto registration of handlers
  • Snapshotting
  • Caching with concurrency checks and updating to latest version

 

I have found it to be quite intuitive to work with, and have not regretted my decision to use it.

 

 

 

The Demo App

The demo app is a small WPF app. We will not be dissecting any code in this section, that will be discussed in a while. In this section I just wanted to cover the main scenarios that we will cover, and how the UI allows for them.

 

WARNINGS

 

#1

The demo app uses WCF for its comms between the UI and server side (write model). To give things a chance to spin up and be ready, there is a hardcoded delay of 10 seconds before the UI attempts to do anything. You should NOT change this. In a normal environment the WCF service would be up and running all the time, so this fake delay would not be needed.

 

#2

Since I did not want to burden you lot with having to install too much extra software, I have used as much in memory stuff as I can. As such there is an in memory event store. This means that events are not persisted if the application is killed/dies. Because of this, we also need to ensure the read model is cleansed on every run too. This is handled for you, don't worry. I just did not want anyone being too surprised when they stopped the demo app, started it again, and saw none of the stuff they saved before. This is by design, and it is for your own good.

 

How To Run The Demo App

Once you have the codebase and have installed the prerequisites you should open Visual Studio (as an Administrator) and ensure that the following 2 projects are set to run:

  1. SachaBarber.CQRS.Demo.Domain
  2. SachaBarber.CQRS.Demo.WPFClient

 

 

 

What's Included In The Demo App?

The demo app that accompanies this article offers the following scenarios:

 

Create A New Order

The demo app starts up with a bunch of comics. You may select the comics you want and then click the add button, which will allow you to enter some minimal information. This will eventually create a new order (via the write model), which will cause an OrderCreatedEvent notification to come back to the UI (via a RabbitMQ message). At that point the UI will show a toast style message, and will update itself by reading from the read model. The current order(s) view will not be shown initially, but the user may choose to show it by using the side bar button (right hand side of screen).

The following screen shots show you how you would carry out this scenario

 

Image 2


 

You can select the items you want in your order, and click the button at the top to create a new order.

 

Image 3

Once you click the new order button you can fill in some minimal data about the order.

 

Image 4


 

Once the order is created in the domain model and has also been added to the read model, the UI will receive a RabbitMQ notification, after which the UI will read from the read model and show the list of all orders. The orders view is available from the right hand side button.

 

 

Image 5


This is the orders view, that shows the orders from the read model.

 

 

Delete An Order

From the current order(s) view the user may choose to delete an order. This will eventually delete the requested order (via the write model), which will cause an OrderDeletedEvent notification to come back to the UI (via a RabbitMQ message). At that point the UI will show a toast style message, and will update itself by reading from the read model.

Image 6

 

 

Change Address Of An Order

From the current order(s) view the user may choose to go into edit mode (the user may also cancel edit mode) for the selected order.  

Image 7

In edit mode the user may edit the order address. Once the user has edited the order address they will have the ability to update it by clicking a button.

Image 8

 

This will eventually update the order address (via the write model), which will cause an OrderAddressChangedEvent notification to come back to the UI (via a RabbitMQ message). At that point the UI will show a toast style message, and will update itself by reading from the read model.

 

 

Ok, so we have now covered quite a lot of ground, we should hopefully now know :

  • What CQRS is
  • What event sourcing is
  • That we are using the CQRSlite framework
  • What the demo app does

 

So it's about time to start examining some code then.

 

 

The Write Model

This section discusses the write model, which comes into play when a client of the domain model issues a command. For the demo app this means that a UI will be issuing the commands.

 

How The Demo UI Issues Commands

The demo app calls an async/await enabled WCF service. There is the standard WCF stuff, a proxy/ClientChannel and so on. I will not bore you with that, but this is what typical usage of the WCF service looks like from the UI to send commands:

C#
await orderServiceInvoker.CallService(service =>
    service.SendCommandAsync(new ChangeOrderAddressCommand()
    {
        ExpectedVersion = this.Order.Version,
        Id = this.Order.OrderId,
        NewAddress = this.Order.Address
    }));

 

This is the WCF service that the UI calls. It is even using async/await (neat huh).

 

C#
[ServiceBehavior(InstanceContextMode = InstanceContextMode.PerCall)]
//Useful when debugging, see App.Config too
//[ErrorHandlerBehavior]
public class OrderService : IOrderService
{
    private readonly OrderCommandHandlers commandHandlers;
    private readonly IReadModelRepository readModelRepository;

    public OrderService(
    	OrderCommandHandlers commandHandlers, 
    	IReadModelRepository readModelRepository)
    {
        this.commandHandlers = commandHandlers;
        this.readModelRepository = readModelRepository;
    }


    public async Task<bool> SendCommandAsync(Command command)
    {
        await Task.Run(() =>
        {
            var meth = (from m in typeof(OrderCommandHandlers)
                .GetMethods(BindingFlags.Public | BindingFlags.Instance)
                        let prms = m.GetParameters()
                        where prms.Count() == 1 && 
                        	prms[0].ParameterType == command.GetType()
                        select m).FirstOrDefault();

            if (meth == null)
                throw new BusinessLogicException(
                    string.Format("Handler for {0} could not be found", 
                    	command.GetType().Name));

            meth.Invoke(commandHandlers, new[] { command });
        });
        return true;
    }
}

 

Separate Commands

Note that there is a single SendCommandAsync(Command command) method. So how does that work? That is standard WCF stuff actually. If we examine the Command class that will become clear. We simply use the [KnownType] attribute.

C#
[DataContract]
[KnownType(typeof(CreateOrderCommand))]
[KnownType(typeof(ChangeOrderAddressCommand))]
[KnownType(typeof(DeleteOrderCommand))]
public abstract class Command : ICommand
{
    [DataMember]
    public Guid Id { get; set; }

    [DataMember]
    public int ExpectedVersion { get; set; }
}

 

How Are Command Handlers Called?

Within the demo app OrderService, there is an IOC satisfied constructor dependency for OrderCommandHandlers. This object contains all the command handlers for the demo app. This is just how I have done it for the demo app, but you could do it another way, entirely up to you. Anyway, there is an OrderCommandHandlers class which is called via reflection within the OrderService, where the correct Handle(..) method is located (based on the incoming Command type) and called.

Here is the code for the OrderCommandHandlers class:

 

C#
public class OrderCommandHandlers : ICommandHandler<CreateOrderCommand>,
                                    ICommandHandler<ChangeOrderAddressCommand>,
                                    ICommandHandler<DeleteOrderCommand>
{
    private readonly ISession _session;

    public OrderCommandHandlers(ISession session)
    {
        _session = session;
    }

    public void Handle(CreateOrderCommand command)
    {
        var item = new Order(
            command.Id, 
            command.ExpectedVersion, 
            command.Description, 
            command.Address,
            command.OrderItems.Select(x => new OrderItem()
            {
                OrderId = x.OrderId,
                StoreItemDescription = x.StoreItemDescription,
                StoreItemId = x.StoreItemId,
                StoreItemUrl = x.StoreItemUrl
            }).ToList());
        _session.Add(item);
        _session.Commit();
    }


    public void Handle(ChangeOrderAddressCommand command)
    {
        Order item = _session.Get<Order>(
            command.Id, command.ExpectedVersion);
        item.ChangeAddress(command.NewAddress);
        _session.Commit();
    }

    public void Handle(DeleteOrderCommand command)
    {
        Order item = _session.Get<Order>(
            command.Id, command.ExpectedVersion);
        item.Delete();
        _session.Commit();
    }
}

 

What Does A Command Actually Do?

Well that depends, but before we get into the possible scenarios, let's talk about something that happens in all the command Handle methods, which is the use of the ISession object's Add()/Get() and Commit() methods. Shown below is the ISession implementation (this is part of the CQRSlite framework).

 

ISession Implementation

C#
public class Session : ISession
{
    private readonly IRepository _repository;
    private readonly Dictionary<Guid, AggregateDescriptor> _trackedAggregates;

    public Session(IRepository repository)
    {
        if(repository == null)
            throw new ArgumentNullException("repository");

        _repository = repository;
        _trackedAggregates = new Dictionary<Guid, AggregateDescriptor>();
    }

    public void Add<T>(T aggregate) where T : AggregateRoot
    {
        if (!IsTracked(aggregate.Id))
            _trackedAggregates.Add(aggregate.Id,
                new AggregateDescriptor
                {
                    Aggregate = aggregate, 
                    Version = aggregate.Version
                });
        else if (_trackedAggregates[aggregate.Id].Aggregate != aggregate)
            throw new ConcurrencyException(aggregate.Id);
    }

    public T Get<T>(Guid id, int? expectedVersion = null) where T : AggregateRoot
    {
        if(IsTracked(id))
        {
            var trackedAggregate = (T)_trackedAggregates[id].Aggregate;
            if (expectedVersion != null && trackedAggregate.Version != expectedVersion)
                throw new ConcurrencyException(trackedAggregate.Id);
            return trackedAggregate;
        }

        var aggregate = _repository.Get<T>(id);
        if (expectedVersion != null && aggregate.Version != expectedVersion)
            throw new ConcurrencyException(id);
        Add(aggregate);

        return aggregate;
    }

    private bool IsTracked(Guid id)
    {
        return _trackedAggregates.ContainsKey(id);
    }

    public void Commit()
    {
        foreach (var descriptor in _trackedAggregates.Values)
        {
            _repository.Save(descriptor.Aggregate, descriptor.Version);
        }
        _trackedAggregates.Clear();
    }

    private class AggregateDescriptor
    {
        public AggregateRoot Aggregate { get; set; }
        public int Version { get; set; }
    }
}

 

The main things that this class provides are:

  • The ability to add a new aggregate, this simply adds the aggregate to a list of tracked aggregates
  • The ability to track aggregates. Those are ones that have been seen for a particular session
  • The ability to get aggregates. This delegates the work to an IRepository implementation, which we look at next
  • The ability to commit changes made during the current session
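The expectedVersion checks in Get(..) above are a form of optimistic concurrency: a change is only accepted if the caller was working from the latest version of the aggregate. Here is a stripped down sketch of just that idea. The types are hypothetical and far simpler than CQRSlite's real Session and ConcurrencyException:

```csharp
using System;
using System.Collections.Generic;

// Hypothetical, simplified version check; CQRSlite's real Session does more.
public class VersionConflictException : Exception
{
    public VersionConflictException(Guid id)
        : base("Aggregate " + id + " was modified by someone else") { }
}

public class VersionedStore
{
    private readonly Dictionary<Guid, int> _versions =
        new Dictionary<Guid, int>();

    // Unknown aggregates are treated as being at version 0.
    public int CurrentVersion(Guid id)
    {
        int version;
        return _versions.TryGetValue(id, out version) ? version : 0;
    }

    // Accept a change only if the caller saw the latest version.
    public void Save(Guid id, int expectedVersion)
    {
        if (CurrentVersion(id) != expectedVersion)
            throw new VersionConflictException(id);
        _versions[id] = expectedVersion + 1;
    }
}
```

If two clients both read version 0 and both try to save, the first save succeeds and bumps the version to 1, while the second is rejected because its expected version is now stale. This is why the demo UI sends ExpectedVersion along with each command.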

 

CacheRepository

The exact IRepository that the ISession implementation uses in the demo app is a CacheRepository, which decorates a regular IRepository implementation. Anyway here is the CacheRepository implementation (this is part of the CQRSlite framework)

C#
public class CacheRepository : IRepository
{
    private readonly IRepository _repository;
    private readonly IEventStore _eventStore;
    private readonly MemoryCache _cache;
    private readonly Func<CacheItemPolicy> _policyFactory;
    private static readonly ConcurrentDictionary<string, object> _locks = 
        new ConcurrentDictionary<string, object>();

    public CacheRepository(IRepository repository, IEventStore eventStore)
    {
        if(repository == null)
            throw new ArgumentNullException("repository");
        if(eventStore == null)
            throw new ArgumentNullException("eventStore");

        _repository = repository;
        _eventStore = eventStore;
        _cache = MemoryCache.Default;
        _policyFactory = () => new CacheItemPolicy
            {
                SlidingExpiration = new TimeSpan(0,0,15,0),
                RemovedCallback = x =>
                {
                    object o;
                    _locks.TryRemove(x.CacheItem.Key, out o);
                }
            };
    }

    public void Save<T>(T aggregate, int? expectedVersion = null) 
        where T : AggregateRoot
    {
        var idstring = aggregate.Id.ToString();
        try
        {
            lock (_locks.GetOrAdd(idstring, _ => new object()))
            {
                if (aggregate.Id != Guid.Empty && !IsTracked(aggregate.Id))
                    _cache.Add(idstring, aggregate, _policyFactory.Invoke());
                _repository.Save(aggregate, expectedVersion);
            }
        }
        catch (Exception)
        {
            _cache.Remove(idstring);
            throw;
        }
    }

    public T Get<T>(Guid aggregateId) where T : AggregateRoot
    {
        var idstring = aggregateId.ToString();
        try
        {
            lock (_locks.GetOrAdd(idstring, _ => new object()))
            {
                T aggregate;
                if (IsTracked(aggregateId))
                {
                    aggregate = (T)_cache.Get(idstring);
                    var events = _eventStore.Get(aggregateId, aggregate.Version);
                    if (events.Any() && events.First().Version != aggregate.Version + 1)
                    {
                        _cache.Remove(idstring);
                    }
                    else
                    {
                        aggregate.LoadFromHistory(events);
                        return aggregate;
                    }
                }

                aggregate = _repository.Get<T>(aggregateId);
                _cache.Add(
                    aggregateId.ToString(), 
                    aggregate, 
                    _policyFactory.Invoke());
                return aggregate;
            }
        }
        catch (Exception)
        {
            _cache.Remove(idstring);
            throw;
        }
    }

    private bool IsTracked(Guid id)
    {
        return _cache.Contains(id.ToString());
    }
}

 

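Before we look at the regular IRepository implementation, there is one uber important point in the above code. See how the Get(..) method queries the event store for historical events, which are then replayed to bring the aggregate up to its current state. For an aggregate that is not yet cached, the call is passed on to the regular IRepository, which loads ALL of the aggregate's historical events; for a cached aggregate, only the events newer than the cached Version are fetched and applied. This is how the event sourcing part works. We basically load the aggregate state based on past events.

The other thing that the above code does is to cache any aggregates it sees for some time period (a 15 minute sliding expiration). After all it is a CacheRepository.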
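
To make the cached path above a bit more concrete, here is a minimal, self-contained sketch of the same idea. It is not CQRSlite code: Counter, Incremented and EventsAfter are names made up purely for illustration, with EventsAfter standing in for IEventStore.Get(aggregateId, fromVersion).

```csharp
using System;
using System.Collections.Generic;
using System.Linq;

// Hypothetical event: one "incremented" fact with a sequential version.
public class Incremented
{
    public int Version;
    public int Amount;
}

// Hypothetical aggregate whose state is derived purely from its events.
public class Counter
{
    public int Version { get; private set; }
    public int Total { get; private set; }

    public void LoadFromHistory(IEnumerable<Incremented> history)
    {
        foreach (var e in history)
        {
            // Same guard as AggregateRoot.LoadFromHistory: no gaps allowed.
            if (e.Version != Version + 1)
                throw new InvalidOperationException("Events out of order");
            Total += e.Amount;
            Version = e.Version;
        }
    }
}

public static class ReplayDemo
{
    // Stand-in for IEventStore.Get: only events newer than fromVersion.
    public static IEnumerable<Incremented> EventsAfter(
        IEnumerable<Incremented> stream, int fromVersion)
    {
        return stream.Where(e => e.Version > fromVersion);
    }

    public static void Main()
    {
        var stream = new List<Incremented>
        {
            new Incremented { Version = 1, Amount = 5 },
            new Incremented { Version = 2, Amount = 3 }
        };

        // Not cached yet: replay everything (fromVersion = -1).
        var counter = new Counter();
        counter.LoadFromHistory(EventsAfter(stream, -1));

        // A new event is stored while the aggregate sits in the cache.
        stream.Add(new Incremented { Version = 3, Amount = 2 });

        // Cached: replay only what the cached copy has not seen yet.
        counter.LoadFromHistory(EventsAfter(stream, counter.Version));
        Console.WriteLine(counter.Version + " " + counter.Total);
    }
}
```

The point of the second LoadFromHistory call is that the cached object only needs the tail of the stream, which is what CacheRepository.Get(..) asks the event store for when it passes aggregate.Version.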

 

Repository Class

The regular Repository implementation (i.e. non-caching) deals with storing uncommitted aggregate events to the event store, and notifying the read model via the IEventPublisher implementation (internal application bus) that is IOCd in (more on this functionality later). It also deals with loading all historical events for a particular aggregate by using the IEventStore implementation, which again is IOCd in. In the demo app the methods in the Repository instance are called by the CacheRepository.

 

Here is the code for the Repository class (this is part of the CQRSlite framework)

 

C#
public class Repository : IRepository
{
    private readonly IEventStore _eventStore;
    private readonly IEventPublisher _publisher;

    public Repository(IEventStore eventStore, IEventPublisher publisher)
    {
        if(eventStore == null)
            throw new ArgumentNullException("eventStore");
        if(publisher == null)
            throw new ArgumentNullException("publisher");
        _eventStore = eventStore;
        _publisher = publisher;
    }

    public void Save<T>(T aggregate, int? expectedVersion = null) 
	where T : AggregateRoot
    {
        if (expectedVersion != null && _eventStore.Get(
                aggregate.Id, expectedVersion.Value).Any())
            throw new ConcurrencyException(aggregate.Id);
        var i = 0;
        foreach (var @event in aggregate.GetUncommittedChanges())
        {
            if (@event.Id == Guid.Empty) 
                @event.Id = aggregate.Id;
            if (@event.Id == Guid.Empty)
                throw new AggregateOrEventMissingIdException(
                    aggregate.GetType(), @event.GetType());
            i++;
            @event.Version = aggregate.Version + i;
            @event.TimeStamp = DateTimeOffset.UtcNow;
            _eventStore.Save(@event);
            _publisher.Publish(@event);
        }
        aggregate.MarkChangesAsCommitted();
    }

    public T Get<T>(Guid aggregateId) where T : AggregateRoot
    {
        return LoadAggregate<T>(aggregateId);
    }

    private T LoadAggregate<T>(Guid id) where T : AggregateRoot
    {
        var aggregate = AggregateFactory.CreateAggregate<T>();

        var events = _eventStore.Get(id, -1);
        if (!events.Any())
            throw new AggregateNotFoundException(id);

        aggregate.LoadFromHistory(events);
        return aggregate;
    }
}
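
The first statement in Save(..) above is an optimistic concurrency check: if the event store already holds events newer than the version the caller expected, somebody else got there first. Here is a small sketch of just that rule, assuming a plain list of version numbers in place of the real event store (CanAppend is a hypothetical helper, not part of CQRSlite).

```csharp
using System;
using System.Collections.Generic;
using System.Linq;

public static class ConcurrencyCheckDemo
{
    // storedVersions stands in for the events held for one aggregate.
    // Returns true when no event is newer than the version the caller
    // based its decision on.
    public static bool CanAppend(
        IEnumerable<int> storedVersions, int? expectedVersion)
    {
        if (expectedVersion == null)
            return true; // caller opted out of the check
        return !storedVersions.Any(v => v > expectedVersion.Value);
    }

    public static void Main()
    {
        var stored = new List<int> { 1, 2 };

        // User A read the order at version 2 and saves: allowed.
        Console.WriteLine(CanAppend(stored, 2));

        // A's save appended version 3.
        stored.Add(3);

        // User B also read at version 2, so B's view is stale. This is
        // where the real Repository throws ConcurrencyException.
        Console.WriteLine(CanAppend(stored, 2));
    }
}
```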

 

 

WOWSERS. That Was Quite The Diversion, Getting Back On Track

Ok sorry about that, as I say that was a bit of a diversion. We were talking about how commands are handled, before we started looking into how the ISession workflow works. Let's remind ourselves of what the command handler code looks like. Here is the code for the OrderCommandHandlers class again:

C#
public class OrderCommandHandlers : ICommandHandler<CreateOrderCommand>,
                                    ICommandHandler<ChangeOrderAddressCommand>,
                                    ICommandHandler<DeleteOrderCommand>
{
    private readonly ISession _session;

    public OrderCommandHandlers(ISession session)
    {
        _session = session;
    }

    public void Handle(CreateOrderCommand command)
    {
        var item = new Order(
            command.Id, 
            command.ExpectedVersion, 
            command.Description, 
            command.Address,
            command.OrderItems.Select(x => new OrderItem()
            {
                OrderId = x.OrderId,
                StoreItemDescription = x.StoreItemDescription,
                StoreItemId = x.StoreItemId,
                StoreItemUrl = x.StoreItemUrl
            }).ToList());
        _session.Add(item);
        _session.Commit();
    }


    public void Handle(ChangeOrderAddressCommand command)
    {
        Order item = _session.Get<Order>(
            command.Id, command.ExpectedVersion);
        item.ChangeAddress(command.NewAddress);
        _session.Commit();
    }

    public void Handle(DeleteOrderCommand command)
    {
        Order item = _session.Get<Order>(
            command.Id, command.ExpectedVersion);
        item.Delete();
        _session.Commit();
    }
}

Ok cool, refreshed and ready to rock.......so back to what happens when the commands are handled.

  1. If the aggregate is a new one, the IOC'd ISession.Add() method will be called, followed by ISession.Commit()
  2. If the aggregate is NOT a new one, the IOC'd ISession.Get(..) method will be called to load it, a method is called on the aggregate, and then ISession.Commit() is called

You should examine the code above a bit to see how that works in detail. What I want to show you now is what happens when we handle a command for an existing aggregate. Let's take the DeleteOrderCommand for example.

 

Aggregate Apply

So let's follow the flow for DeleteOrderCommand, which so far has given us an aggregate, on which we call the Order aggregate's Delete() method. Let's see that method, shall we.

 

Here is the Order aggregate

C#
public class Order : AggregateRoot
{
    private string description;
    private string address;
    private bool isDeleted;
    private List<OrderItem> orderItems;

    private void Apply(OrderCreatedEvent e)
    {
        Version = e.Version;
        description = e.Description;
        address = e.Address;
        isDeleted = false;
        orderItems = e.OrderItems;
    }


    private void Apply(OrderDeletedEvent e)
    {
        Version = e.Version;
        isDeleted = true;
    }

    public void Delete()
    {
        ApplyChange(new OrderDeletedEvent(Id, Version));
    }

    private void Apply(OrderAddressChangedEvent e)
    {
        Version = e.Version;
        address = e.NewOrderAddress;
    }

    public void ChangeAddress(string newAddress)
    {
        if (string.IsNullOrEmpty(newAddress))
            throw new ArgumentException("newAddress");
        ApplyChange(new OrderAddressChangedEvent(Id, 
		newAddress,Version));
    }


    private Order() { }

    public Order(
        Guid id,
        int version,
        string description,
        string address,
        List<OrderItem> orderItems
        )
    {
        Id = id;
        ApplyChange(new OrderCreatedEvent(id, description, 
		address, orderItems, version));
    }
}

Here it can be seen that there are several overloads of the Apply(..) method, essentially one per event type that the aggregate can raise. We will continue to follow the Delete() method, which simply calls ApplyChange(new OrderDeletedEvent(Id, Version));

To understand what this ApplyChange(..) method is doing we need to go into the CQRSlite framework AggregateRoot code. So let's see that.

Here it is:

C#
public abstract class AggregateRoot
{
    private readonly List<IEvent> _changes = new List<IEvent>();

    public Guid Id { get; protected set; }
    public int Version { get; protected set; }

    public IEnumerable<IEvent> GetUncommittedChanges()
    {
        lock (_changes)
        {
            return _changes.ToArray();
        }
    }

    public void MarkChangesAsCommitted()
    {
        lock(_changes)
        {
            Version = Version + _changes.Count;
            _changes.Clear();
        }
    }

    public void LoadFromHistory(IEnumerable<IEvent> history)
    {
        foreach (var e in history)
        {
            if (e.Version != Version + 1)
                throw new EventsOutOfOrderException(e.Id);
            ApplyChange(e, false);
        }
    }

    protected void ApplyChange(IEvent @event)
    {
        ApplyChange(@event, true);
    }

    private void ApplyChange(IEvent @event, bool isNew)
    {
        lock (_changes)
        {
            this.AsDynamic().Apply(@event);
            if (isNew)
            {
                _changes.Add(@event);
            }
            else
            {
                Id = @event.Id;
                Version++;
            }
        }
    }
}

It can be seen that this base class is responsible for managing events for a particular aggregate. This includes loading them from history and also tracking which applied events are new (for the session) and still need to be saved to the event store.

When we call the ApplyChange(..) method this will result in the actual aggregate's Apply(..) overload being called for the correct event type. Once the event is played on the aggregate, the aggregate can change its own internal state based on the applied event.

For example here is the Order aggregate's Apply(OrderDeletedEvent e), where it can be seen that the Order aggregate's internal state is mutated.

C#
private void Apply(OrderDeletedEvent e)
{
    Version = e.Version;
    isDeleted = true;
}

This workflow is pretty hairy, so you may need to read all of the above again (and again) to get a correct understanding.
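
If it helps, here is a stripped down sketch of the dispatch step on its own. CQRSlite uses its AsDynamic() helper to reach the private Apply(..) overloads; this sketch swaps in plain reflection to get a similar effect, and MiniAggregate, Customer, NameChanged and Deleted are all hypothetical names used only for illustration.

```csharp
using System;
using System.Collections.Generic;
using System.Reflection;

public class NameChanged { public string NewName; }
public class Deleted { }

public abstract class MiniAggregate
{
    private readonly List<object> _changes = new List<object>();

    public IList<object> UncommittedChanges { get { return _changes; } }

    // New event: mutate state AND remember the event so it can be saved.
    protected void ApplyChange(object @event)
    {
        Dispatch(@event);
        _changes.Add(@event);
    }

    // Historical event: mutate state only, nothing new to save.
    public void Replay(object @event)
    {
        Dispatch(@event);
    }

    private void Dispatch(object @event)
    {
        // Pick the private Apply(TEvent) overload for the runtime event type.
        MethodInfo apply = GetType().GetMethod(
            "Apply",
            BindingFlags.Instance | BindingFlags.NonPublic,
            null,
            new[] { @event.GetType() },
            null);
        if (apply == null)
            throw new InvalidOperationException(
                "No Apply overload for " + @event.GetType().Name);
        apply.Invoke(this, new[] { @event });
    }
}

public class Customer : MiniAggregate
{
    public string Name { get; private set; }
    public bool IsDeleted { get; private set; }

    public void Rename(string name)
    {
        ApplyChange(new NameChanged { NewName = name });
    }

    public void Delete()
    {
        ApplyChange(new Deleted());
    }

    private void Apply(NameChanged e) { Name = e.NewName; }
    private void Apply(Deleted e) { IsDeleted = true; }
}

public static class DispatchDemo
{
    public static void Main()
    {
        var customer = new Customer();
        customer.Rename("Ada");
        customer.Delete();

        // Rebuild a second instance purely from the recorded events.
        var rebuilt = new Customer();
        foreach (var e in customer.UncommittedChanges)
            rebuilt.Replay(e);

        Console.WriteLine(rebuilt.Name + " " + rebuilt.IsDeleted);
        Console.WriteLine(rebuilt.UncommittedChanges.Count);
    }
}
```

Note how the public methods (Rename, Delete) never touch state directly. They only raise events, and the Apply(..) overloads are the single place where state changes, which is what makes replaying history give the same result.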

 

 

The Event Store

The CQRSlite framework contains the following interface for the event store.

C#
public interface IEventStore 
{
    void Save(IEvent @event);
    IEnumerable<IEvent> Get(Guid aggregateId, int fromVersion);
}

 

As such we need to create an implementation of this to deal with the saving/fetching of events that the domain model raises/needs (to apply to the aggregate(s)).

As I wanted to keep the dependencies as small as possible for this demo app, I have gone for an in memory event store, but it should not be too hard to plug in another one. There are some good ones available such as

  • NEventStore which has adaptors for quite a few databases (NoSQL and relational)
  • GetEventStore (Greg Young is a partner in this one)

 

Anyway like I say this demo app contains an in memory event store, which is as follows:

C#
public class InMemoryEventStore : IEventStore
{
    private readonly Dictionary<Guid, List<IEvent>> _inMemoryDB = 
        new Dictionary<Guid, List<IEvent>>();

    public IEnumerable<IEvent> Get(Guid aggregateId, int fromVersion)
    {
        List<IEvent> events;
        _inMemoryDB.TryGetValue(aggregateId, out events);
        return events != null 
            ? events.Where(x => x.Version > fromVersion) 
            : new List<IEvent>();
    }

    public void Save(IEvent @event)
    {
        List<IEvent> list;
        _inMemoryDB.TryGetValue(@event.Id, out list);
        if (list == null)
        {
            list = new List<IEvent>();
            _inMemoryDB.Add(@event.Id, list);
        }
        list.Add(@event);
    }
}

 

Hopefully nothing too daunting there, it's essentially a wrapper around a Dictionary<TKey,TValue>, so I think it's fine.
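
One thing to be aware of is that a plain Dictionary<TKey,TValue> is not safe for concurrent writers. As an illustration of how easily the store can be swapped, here is a sketch of a locking variant. The IEvent and IEventStore declarations below are cut down stand-ins so that the sketch compiles on its own (the real CQRSlite IEvent has more members), and LockingInMemoryEventStore and SampleEvent are hypothetical names.

```csharp
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Linq;

// Cut down stand-ins for the CQRSlite interfaces.
public interface IEvent
{
    Guid Id { get; set; }
    int Version { get; set; }
}

public interface IEventStore
{
    void Save(IEvent @event);
    IEnumerable<IEvent> Get(Guid aggregateId, int fromVersion);
}

public class SampleEvent : IEvent
{
    public Guid Id { get; set; }
    public int Version { get; set; }
}

public class LockingInMemoryEventStore : IEventStore
{
    private readonly ConcurrentDictionary<Guid, List<IEvent>> _streams =
        new ConcurrentDictionary<Guid, List<IEvent>>();

    public void Save(IEvent @event)
    {
        // One list per aggregate; the list itself is guarded by a lock.
        var stream = _streams.GetOrAdd(@event.Id, _ => new List<IEvent>());
        lock (stream)
        {
            stream.Add(@event);
        }
    }

    public IEnumerable<IEvent> Get(Guid aggregateId, int fromVersion)
    {
        List<IEvent> stream;
        if (!_streams.TryGetValue(aggregateId, out stream))
            return new List<IEvent>();

        // Copy under the lock so callers never enumerate a list
        // that another thread is appending to.
        lock (stream)
        {
            return stream.Where(e => e.Version > fromVersion).ToList();
        }
    }
}

public static class EventStoreDemo
{
    public static void Main()
    {
        var store = new LockingInMemoryEventStore();
        var id = Guid.NewGuid();
        for (var v = 1; v <= 3; v++)
            store.Save(new SampleEvent { Id = id, Version = v });

        Console.WriteLine(store.Get(id, -1).Count()); // all events
        Console.WriteLine(store.Get(id, 1).Count());  // only versions 2 and 3
    }
}
```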

 

 

The Read Model

This section discusses the read model, which comes into play after the new events have been stored in the event store and published internally within the domain model by the Repository class, which is part of the CQRSlite framework.

 

Event Handlers

Any events that are stored in the current session and that end up being persisted to the event store are also published internally within the domain model using a simple in-process publisher. There are then event handlers that act on those published events. The event handlers have the job of updating the read model.

If you were using a synchronous call (that is, call a command and then be able to read from the read model directly afterwards) we would be done at this point. But since I opted to make a fully asynchronous system (just to see how far down the rabbit hole goes), things are not finished at this point.

So proceeding then.....once the read model is updated we also need to tell any consumers of the read model that something has changed. For the demo app this is the job of the event handler(s).

Let's see the pieces that make this happen.

 

Source Of Published Events

Within the CQRSlite framework there is the following code inside the regular IRepository implementation, where it can be seen that there is a line that publishes any events that are saved to the event store (see the line _publisher.Publish(@event)). We saw this earlier, when we were discussing the write model/commanding side of things. This is the crossover between the incoming writes, and updating the read model based on persisted events (remember the demo app uses CQRS + event sourcing).

 

C#
public void Save<T>(T aggregate, int? expectedVersion = null) 
	where T : AggregateRoot
{
    if (expectedVersion != null && _eventStore.Get(
		aggregate.Id, expectedVersion.Value).Any())
        throw new ConcurrencyException(aggregate.Id);
    var i = 0;
    foreach (var @event in aggregate.GetUncommittedChanges())
    {
        if (@event.Id == Guid.Empty) 
            @event.Id = aggregate.Id;
        if (@event.Id == Guid.Empty)
            throw new AggregateOrEventMissingIdException(
		aggregate.GetType(), @event.GetType());
        i++;
        @event.Version = aggregate.Version + i;
        @event.TimeStamp = DateTimeOffset.UtcNow;
        _eventStore.Save(@event);
        _publisher.Publish(@event);
    }
    aggregate.MarkChangesAsCommitted();
}

 

IEventPublisher

The call to IEventPublisher.Publish(..) made within the CQRSlite framework's regular IRepository should result in an event handler being called to handle the published event. The CQRSlite framework has an implementation of IEventPublisher, but I chose to roll my own (as I wanted to use async/await), which is shown below. One of the really nice things about the CQRSlite framework is that it is very pluggable, i.e. if you don't like the default implementation, just swap it out.

The code below is a pretty simple implementation that attempts to find the correct IBusEventHandler for an incoming event, and calls the event handler's Handle method. The array of IBusEventHandlers is supplied by an IOC container (Castle).

C#
public class BusEventPublisher : IEventPublisher
{
    private readonly IBusEventHandler[] _handlers;
    private Dictionary<Type,MethodInfo> methodLookups = 
        new Dictionary<Type, MethodInfo>(); 

    public BusEventPublisher(IBusEventHandler[] handlers)
    {
        _handlers = handlers;

        foreach (var handler in _handlers)
        {
            var meth = (from m in handler.GetType()
                    .GetMethods(BindingFlags.Public | BindingFlags.Instance)
                        let prms = m.GetParameters()
                        where prms.Count() == 1 && m.Name.Contains("Handle")
                        select new
                        {
                            EventType = prms.First().ParameterType,
                            Method = m
                        }).FirstOrDefault();
            if (meth != null)
            {
                methodLookups.Add(meth.EventType, meth.Method);
            }

        }

    }

    public void Publish<T>(T @event) where T : IEvent
    {

        var theHandler = _handlers.SingleOrDefault(
            x => x.HandlerType == @event.GetType());

        if (theHandler == null)
            throw new BusinessLogicException(
                string.Format("Handler for {0} could not be found", 
                @event.GetType().Name));

        Task.Run(() =>
        {
            methodLookups[@event.GetType()].Invoke(
                theHandler, new[] {(object) @event});
        }).Wait();

    }
}
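
The publisher above boils down to "look up a handler by the event's runtime type, then call it". As an alternative to the reflection lookup, here is a sketch of the same idea using a dictionary of delegates. SimplePublisher, OrderCreated and OrderDeleted are hypothetical names for this example only, and this is not how the demo app or CQRSlite does it.

```csharp
using System;
using System.Collections.Generic;

public class OrderCreated { public string Description; }
public class OrderDeleted { }

public class SimplePublisher
{
    private readonly Dictionary<Type, Action<object>> _handlers =
        new Dictionary<Type, Action<object>>();

    // Wrap the strongly typed handler once, at registration time,
    // so Publish needs no reflection.
    public void Register<TEvent>(Action<TEvent> handler)
    {
        _handlers[typeof(TEvent)] = e => handler((TEvent)e);
    }

    public void Publish(object @event)
    {
        Action<object> handler;
        if (!_handlers.TryGetValue(@event.GetType(), out handler))
            throw new InvalidOperationException(
                "Handler for " + @event.GetType().Name + " could not be found");
        handler(@event);
    }
}

public static class PublisherDemo
{
    public static void Main()
    {
        var publisher = new SimplePublisher();
        publisher.Register<OrderCreated>(
            e => Console.WriteLine("Created: " + e.Description));

        publisher.Publish(new OrderCreated { Description = "Comics" });

        try
        {
            // Nothing registered for this type, so Publish throws.
            publisher.Publish(new OrderDeleted());
        }
        catch (InvalidOperationException ex)
        {
            Console.WriteLine(ex.Message);
        }
    }
}
```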

 

Typical Event Handler Implementation

Let's take a moment here.....where are we?

 

We have seen the source of the new domain events that need to be acted upon, which is the CQRSlite regular IRepository implementation.

We have also seen how these events are published within the domain model, which is done by the IEventPublisher implementation (the custom BusEventPublisher in the demo app).

 

So what does an event handler look like and what does it do with an incoming event?

Well, a typical event handler looks like the code below, where it can be seen that a typical event handler takes the following dependencies:

  1. IReadModelRepository :  To allow the event handler to make changes to the read model
  2. IInterProcessBus : To allow the domain model to broadcast read model change notifications to any read model consumer process (RabbitMQ is used for this, obviously the consumer will need to have some RabbitMQ subscriber code to make this happen)

 

C#
public class OrderCreatedEventHandler : IBusEventHandler<OrderCreatedEvent>
{
    private readonly IReadModelRepository readModelRepository;
    private readonly IInterProcessBus interProcessBus;

    public OrderCreatedEventHandler(
        IReadModelRepository readModelRepository,
        IInterProcessBus interProcessBus)
    {
        this.readModelRepository = readModelRepository;
        this.interProcessBus = interProcessBus;
    }

    public Type HandlerType
    {
        get { return typeof (OrderCreatedEvent); }
    }

    public async void Handle(OrderCreatedEvent orderCreatedEvent)
    {
        await readModelRepository.AddOrder(new ReadModel.Models.Order()
        {
            OrderId = orderCreatedEvent.Id,
            Address = orderCreatedEvent.Address,
            Description = orderCreatedEvent.Description,
            Version = orderCreatedEvent.Version,
            OrderItems = orderCreatedEvent.OrderItems.Select(x =>
                new ReadModel.Models.OrderItem()
                {
                    OrderId = x.OrderId,
                    StoreItemId = x.StoreItemId,
                    StoreItemDescription = x.StoreItemDescription,
                    StoreItemUrl = x.StoreItemUrl
                }).ToList()
        });

        interProcessBus.SendMessage("OrderCreatedEvent");
    }
}

 

Just in case it is not clear enough, the event handler is used to update the read model with any changes that need to happen as the result of the incoming domain event. As I have also stated, I went for a fully asynchronous example, so the demo app code needs to notify read model consumers of the changes to the read model. This is discussed in a bit more detail below.
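
The ordering matters here: update the read model first, then notify. The handler shown above is async void, which a caller cannot await. The sketch below is a possible variation (not what the demo app does) where Handle returns a Task instead, so the caller can wait for both steps and see any exception. FakeReadModel and FakeBus are hypothetical in-memory stand-ins for IReadModelRepository and IInterProcessBus.

```csharp
using System;
using System.Collections.Generic;
using System.Threading.Tasks;

// Hypothetical stand-in for IReadModelRepository.
public class FakeReadModel
{
    public readonly List<string> Orders = new List<string>();

    public Task AddOrder(string description)
    {
        return Task.Run(() =>
        {
            lock (Orders) { Orders.Add(description); }
        });
    }
}

// Hypothetical stand-in for IInterProcessBus.
public class FakeBus
{
    public readonly List<string> Sent = new List<string>();

    public void SendMessage(string message)
    {
        lock (Sent) { Sent.Add(message); }
    }
}

public class AwaitableOrderCreatedHandler
{
    private readonly FakeReadModel _readModel;
    private readonly FakeBus _bus;

    public AwaitableOrderCreatedHandler(FakeReadModel readModel, FakeBus bus)
    {
        _readModel = readModel;
        _bus = bus;
    }

    // Returning Task (rather than async void) lets the caller wait for
    // the read model update and observe any exception it throws.
    public async Task Handle(string orderDescription)
    {
        await _readModel.AddOrder(orderDescription); // 1. update the read model
        _bus.SendMessage("OrderCreatedEvent");        // 2. then notify consumers
    }
}

public static class HandlerDemo
{
    public static void Main()
    {
        var readModel = new FakeReadModel();
        var bus = new FakeBus();
        var handler = new AwaitableOrderCreatedHandler(readModel, bus);

        handler.Handle("Comics order").Wait();

        Console.WriteLine(readModel.Orders.Count + " " + bus.Sent[0]);
    }
}
```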

 

 

 

The Database

For the read model I chose to use a NoSQL database. This however could easily be a relational database. Though if you do choose to go down the traditional relational database route, you should think about how to denormalize the data contained in the events, such that the UI (or other consumers of the read model) will have everything they need by querying as few tables as possible. Ideally the read model should have everything needed for a particular view in the UI (or a report if some sort of reporting software is the read model consumer) in a single table. There may be duplicated data in this table, but that is ok, speed is what we care about here: everything we need in a read only format as quickly as possible.

One thing: if you do go for a traditional relational database, you may find that some sort of projection library is helpful. There is a good one on GitHub called Projac, which allows you to take in some type T and project that out to some SQL code.
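
To give a feel for what denormalizing an event might look like, here is a small sketch that flattens an "order created" event into a single row shaped for one view. All of the types here (OrderCreatedData, OrderItemData, OrderSummaryRow, Projection) are hypothetical, and this does not use Projac.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;

public class OrderItemData
{
    public string StoreItemDescription;
}

public class OrderCreatedData
{
    public Guid Id;
    public string Description;
    public string Address;
    public List<OrderItemData> Items;
}

// Hypothetical row: everything an "orders list" view needs, in one place.
public class OrderSummaryRow
{
    public Guid OrderId;
    public string Description;
    public string Address;
    public int ItemCount;
    public string ItemDescriptions; // duplicated, pre-joined data
}

public static class Projection
{
    public static OrderSummaryRow ToRow(OrderCreatedData e)
    {
        return new OrderSummaryRow
        {
            OrderId = e.Id,
            Description = e.Description,
            Address = e.Address,
            ItemCount = e.Items.Count,
            ItemDescriptions = string.Join(
                ", ", e.Items.Select(i => i.StoreItemDescription))
        };
    }
}

public static class ProjectionDemo
{
    public static void Main()
    {
        var row = Projection.ToRow(new OrderCreatedData
        {
            Id = Guid.NewGuid(),
            Description = "Comics order",
            Address = "1 Some Street",
            Items = new List<OrderItemData>
            {
                new OrderItemData { StoreItemDescription = "Witcher" },
                new OrderItemData { StoreItemDescription = "Eight" }
            }
        });

        Console.WriteLine(row.ItemCount + ": " + row.ItemDescriptions);
    }
}
```

The view can then be served from that one row without any joins, at the cost of storing the item descriptions a second time.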

 

Though as I say, I am using a NoSQL database for the read model, namely RavenDB embedded, which I chose for 2 reasons:

  1. The read model I needed was quite suited to a document database
  2. By using RavenDB embedded I could just install it via NuGet, and it was no extra burden on you guys

 

The read model for the demo app is realised using a very simple repository. I could have gone crazy with this and created a unit of work implementation pretty easily (since RavenDB is based on the NHibernate ISession concept; in fact I have that code, if anyone is interested let me know), but for the sake of brevity decided to keep things as simple as possible.

Anyway here is the read model repository:

C#
public interface IReadModelRepository
{
    Task<bool> CreateFreshDb();
    Task<bool> SeedProducts();
    Task<List<T>> GetAll<T>();
    Task<bool> AddOrder(Order order);
    Task<bool> DeleteOrder(Guid orderId);
    Task<bool> UpdateOrderAddress(Guid orderId, string newAddress, int version);
    Task<Order> GetOrder(Guid orderId);
}


public class ReadModelRepository : IReadModelRepository
{

    private IDocumentStore documentStore = null;
    private string dataDir = @"C:\temp\RavenDb";

    public ReadModelRepository()
    {
       
    }

    public async Task<bool> CreateFreshDb()
    {

        documentStore = new EmbeddableDocumentStore
        {
            DataDirectory = dataDir,
            UseEmbeddedHttpServer = true
        };

        documentStore.Initialize();

        //Add order Index
        if (documentStore.DatabaseCommands.GetIndex("Order/ById") == null)
        {
            documentStore.DatabaseCommands.PutIndex(
                "Order/ById",
                new IndexDefinitionBuilder<Order>
                {
                    Map = ords => from order in ords 
                                    select new { Id = order.Id }
                });
        }

        var storeItems = await this.GetAll<StoreItem>();
        if (!storeItems.Any())
        {
            await SeedProducts();
        }
        await DeleteAllOrders();
        return true;
    }

    public Task<bool> SeedProducts()
    {
        return Task.Run(() =>
            {
                using (IDocumentSession session = documentStore.OpenSession())
                {
                        CreateStoreItem(session,"RatGood.jpg","Rat God");
                        CreateStoreItem(session, "NeverBoy.jpg", "Never Boy");
                        CreateStoreItem(session, "Witcher.jpg", "Witcher");
                        CreateStoreItem(session, "Eight.jpg", "Eight");
                        CreateStoreItem(session, "MisterX.jpg", "Mister X");
                        CreateStoreItem(session, "CaptainMidnight.jpg", "Captain Midnight");
                    session.SaveChanges();
                }
                return true;
            });
    }

    public Task<List<T>> GetAll<T>()
    {
        List<T> items = new List<T>();

        return Task.Run(() =>
            {
                using (IDocumentSession session = documentStore.OpenSession())
                {
                    int start = 0;
                    while (true)
                    {
                        // Skip before Take, so each pass reads the next page
                        var current = session.Query<T>()
                            .Customize(x => x.WaitForNonStaleResults())
                            .Skip(start).Take(1024).ToList();
                        if (current.Count == 0) break;

                        start += current.Count;
                        items.AddRange(current);

                    }
                }
                return items;
            });
    }

    public Task<bool> AddOrder(Order order)
    {
        return Task.Run(() =>
            {
                using (IDocumentSession session = documentStore.OpenSession())
                {
                    session.Store(order);
                    session.SaveChanges();
                }
                return true;
            });
    }

    public Task<bool> DeleteOrder(Guid orderId)
    {
        return Task.Run(() =>
        {
            using (IDocumentSession session = documentStore.OpenSession())
            {
                var order = session.Query<Order>()
                    .SingleOrDefault(x => x.OrderId == orderId);
                // SingleOrDefault returns null when no such order exists
                if (order == null)
                    return false;
                session.Delete(order);
                session.SaveChanges();
            }
            return true;
        });
    }

    public Task<bool> UpdateOrderAddress(Guid orderId, string newAddress, int version)
    {
        return Task.Run(() =>
        {
            using (IDocumentSession session = documentStore.OpenSession())
            {
                var order = session.Query<Order>()
                    .SingleOrDefault(x => x.OrderId == orderId);
                // SingleOrDefault returns null when no such order exists
                if (order == null)
                    return false;
                order.Address = newAddress;
                order.Version = version;
                session.SaveChanges();
            }
            return true;
        });
    }


    public Task<Order> GetOrder(Guid orderId)
    {
        return Task.Run(() =>
        {
            using (IDocumentSession session = documentStore.OpenSession())
            {
                return session.Query<Order>()
                    .SingleOrDefault(x => x.OrderId == orderId);
            }
        });
    }




    private void CreateStoreItem(IDocumentSession session, string imageUrl, 
        string description)
    {
        StoreItem newStoreItem = new StoreItem
        {
            StoreItemId = Guid.NewGuid(),
            ImageUrl = imageUrl,
            Description = description
        };
        session.Store(newStoreItem);
    }

    private async Task<bool> DeleteAllOrders()
    {
        await Task.Run(() =>
        {
            var staleIndexesWaitAction = new Action(() =>
                {
                    while (documentStore.DatabaseCommands.GetStatistics()
                        .StaleIndexes.Length != 0)
                    {
                        Thread.Sleep(10);
                    }
                });
            staleIndexesWaitAction.Invoke();
            documentStore.DatabaseCommands
                .DeleteByIndex("Order/ById", new IndexQuery());
            staleIndexesWaitAction.Invoke();
        });
        return true;
    }
}

 

 

Notifying The UI Of Changes

As I have stated on numerous occasions in this text, the decision I took with this article was to go for a fully asynchronous system, just so I could see what difficulties one would come up against in real life (I know I am a masochist). As such we need a way of notifying the read model consumers (the UI in the demo app's case) that something has happened to the read model, and that the read model consumer should refresh its data based on this incoming notification (or it could choose not to, that is the consumer's call).

I decided to use RabbitMQ for this job. You could use other technologies, but I chose RabbitMQ.

I chose to send a simple notification message (event name as a string) rather than some DTO. This way the consumer can decide what they need to read from the read model.

There are obviously 2 parts to this communication of events from the read model to the read model consumer: the interprocess notification producer and the consumer. We will look at each of these parts here.

 

InterProcessBus (Producer)

This would typically be called at the end of an event handler in the domain model, right after the read model has been updated. As I say, the message for me is a simple string, which states what type of event just changed the read model. There is not too much to say here, we simply publish a string (the name of the event that just changed the read model) to a fanout exchange using RabbitMQ.

C#
public class InterProcessBus : IInterProcessBus
{

    private readonly string busName;
    private readonly string connectionString;

    public InterProcessBus()
    {
        this.busName = "InterProcessBus";
        this.connectionString = ConfigurationManager.AppSettings["RabbitMqHost"];
    }

    public void SendMessage(string message)
    {
        var factory = new ConnectionFactory() { HostName = connectionString };
        using (var connection = factory.CreateConnection())
        {
            using (var channel = connection.CreateModel())
            {
                var bytes = Encoding.ASCII.GetBytes(message);
                channel.ExchangeDeclare(busName, "fanout");
                channel.BasicPublish(busName, string.Empty, null, bytes);
            }
        }
    }
}

 

So that is the read model (domain model) side, but what about the read model consumer? As I say, in the demo app's case this is a UI. So let's see what that looks like next.

 

How The UI Reads The ReadModel (Consumer)

The first part of this is to use a RabbitMQ subscriber that is listening for messages from the RabbitMQ publisher. This is shown below in the InterProcessBusSubscriber code

 

C#
public class InterProcessBusSubscriber : IInterProcessBusSubscriber, IDisposable
{

    private readonly string busName;
    private readonly string connectionString;
    private CancellationTokenSource cancellationToken;
    private Task workerTask;
    private Subject<string> eventsSubject = new Subject<string>();

    public InterProcessBusSubscriber()
    {
        this.busName = "InterProcessBus";
        this.connectionString = 
            ConfigurationManager.AppSettings["RabbitMqHost"];
        StartMessageListener();
    }

    private void StartMessageListener()
    {
        cancellationToken = new CancellationTokenSource();
        workerTask = Task.Factory.StartNew(
            () => ListenForMessage(), cancellationToken.Token);
    }

    public void Dispose()
    {
        CancelWorkerTask();
    }

    private void CancelWorkerTask()
    {
        if (workerTask == null) return;
        cancellationToken.Cancel();
        workerTask.Wait();
        workerTask.Dispose();
    }

    private void ListenForMessage()
    {
        var factory = new ConnectionFactory() { HostName = connectionString };
        using (var connection = factory.CreateConnection())
        {
            using (var channel = connection.CreateModel())
            {
                channel.ExchangeDeclare(busName, "fanout");

                bool durable = true;
                bool exclusive = false;
                bool autoDelete = false;

                var queue = channel.QueueDeclare(
                    Assembly.GetEntryAssembly().GetName().Name,
                    durable, exclusive, autoDelete, null);
                channel.QueueBind(queue.QueueName, busName, string.Empty);
                var consumer = new QueueingBasicConsumer(channel);

                channel.BasicConsume(queue.QueueName, false, string.Empty, consumer);

                while (true)
                {
                    if (cancellationToken.IsCancellationRequested)
                        break;
                    BasicDeliverEventArgs ea;
                    consumer.Queue.Dequeue(10, out ea);

                    if (ea == null)
                        continue;

                    var message = Encoding.ASCII.GetString(ea.Body);
                    // Push the notification to subscribers on a thread pool thread
                    Task.Run(() => eventsSubject.OnNext(message));
                    channel.BasicAck(ea.DeliveryTag, false);
                }

            }

        }
    }


    public IObservable<string> GetEventStream()
    {
        return eventsSubject.AsObservable();
    }
}

 

I decided to expose an Rx IObservable<string> from this RabbitMQ subscriber. This means that any ViewModel that makes use of the subscriber can also subscribe (using Rx) to the IObservable<string> and apply any of the standard Rx/LINQ operators, which is pretty useful. Here is an example of how this process works.

 

C#
    public OrdersViewModel(
        IInterProcessBusSubscriber interProcessBusSubscriber,
        OrderServiceInvoker orderServiceInvoker,
        IMessageBoxService messageBoxService)
    {
        orderEvents = new List<string>()
        {
            "OrderCreatedEvent","OrderAddressChangedEvent","OrderDeletedEvent"
        };

        growlNotifications.Top = SystemParameters.WorkArea.Top + topOffset;
        growlNotifications.Left = SystemParameters.WorkArea.Left + 
            SystemParameters.WorkArea.Width - leftOffset;

        var stream = interProcessBusSubscriber.GetEventStream();


        disposables.Add(stream.Where(x => orderEvents.Contains(x))
            .Subscribe(async x =>
            {
                var newOrders = await orderServiceInvoker.CallService(service =>
                            service.GetAllOrdersAsync());

                this.Orders = new List<OrderViewModel>(
                    newOrders.Select(ord => new OrderViewModel(
                        ord, messageBoxService, orderServiceInvoker)));
                this.HasOrders = Orders.Any();

                if (this.HasOrders)
                {
                    growlNotifications.AddNotification(new Notification
                    {
                        Title = "Orders changed",
                        ImageUrl = "pack://application:,,,/Images/metroInfo.png",
                        Message =
                            "New/modified orders have been obtained from the ReadModel. " +
                            "Click on the right hand side panel to see them"
                    });
                }
            })
        );
           
    }

}

 

There are a couple of takeaway points in this ViewModel code.

  1. By using Rx, we are able to react only to the events that are of interest to the ViewModel we are in.
  2. We can use the event name to determine what action to perform in the ViewModel. In this case ALL of the events simply cause the ViewModel to refresh the list of orders by reading from the read model again (via a simple WCF call, orderServiceInvoker.CallService(service => service.GetAllOrdersAsync())), which simply uses the ReadModelRepository we saw earlier. Due to the way I wanted to clean out RavenDB when the app is run, I needed one place where this would happen, so the access to the read model had to sit behind a service (WCF) layer call. If we were using a relational database, we could simply have the client call into some ADO / micro ORM code directly.
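
If different events ever needed different reactions, the second point could be sketched roughly as follows. This is only an illustration and not code from the demo app: EventNameDispatcher and its On method are names I have made up here. The only thing it relies on is the standard System.IObserver<string> interface, so it could be handed straight to the Subscribe method of the IObservable<string> shown above.

```csharp
using System;
using System.Collections.Generic;

// Hypothetical helper (not part of the demo app): routes each event name
// received from the bus to the action registered for that name.
public sealed class EventNameDispatcher : IObserver<string>
{
    private readonly Dictionary<string, Action> handlers =
        new Dictionary<string, Action>(StringComparer.Ordinal);

    // Registers the action to run when the named event arrives.
    // Returns this instance so registrations can be chained.
    public EventNameDispatcher On(string eventName, Action handler)
    {
        handlers[eventName] = handler;
        return this;
    }

    // Called for every event name pushed by the observable;
    // names without a registered handler are ignored.
    public void OnNext(string eventName)
    {
        Action handler;
        if (handlers.TryGetValue(eventName, out handler))
            handler();
    }

    // Error and completion handling are left empty in this sketch.
    public void OnError(Exception error) { }
    public void OnCompleted() { }
}
```

In a ViewModel this might be wired up as disposables.Add(stream.Subscribe(new EventNameDispatcher().On("OrderCreatedEvent", RefreshOrders))), where RefreshOrders is an assumed method that re-reads the orders from the read model.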

 

And that is pretty much how the demo app updates after a change to the read model.

 

 

 

Further Considerations

The following 2 points are things that I was aware of, but did not implement in the demo:

  • Event Versioning
  • Snapshotting

That said, I am not a complete douchebag, and will certainly talk you through these items, and point you at some good resources that cover them in a bit more detail.

 

Event Versioning

When using Event Sourcing you store your events in an Event Store. This Event Store can only insert new events and read historical events, nothing more nothing less. So when you change your domain logic and also the events belonging to this behavior, then you cannot go back into the Event Store and do a one time convert of all the historical events belonging to the same behavior. The Event Store needs to stay intact, that is one of its powers.

So you make a new version of the original event, and this new version carries more or less information than the original one.

http://cre8ivethought.com/blog/2010/02/09/cqrs-event-versioning

 

In the post linked above, Mark shows one approach that works when you need multiple versions of an event. That is a great place to start.
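
To give a feel for what living with multiple event versions can look like, here is a minimal sketch of one commonly used technique, "upconverting" an old event to the newest shape when it is read back from the Event Store. It is not necessarily the approach described in the linked post, and it is not part of the demo app: the event classes, the PostCode property and the upconverter are all hypothetical names chosen for illustration.

```csharp
using System;

// Hypothetical events, for illustration only (not taken from the demo app).
// V1 is the shape that older entries in the Event Store contain.
public sealed class OrderAddressChangedV1
{
    public Guid OrderId { get; set; }
    public string Address { get; set; }
}

// V2 carries one extra piece of information.
public sealed class OrderAddressChangedV2
{
    public Guid OrderId { get; set; }
    public string Address { get; set; }
    public string PostCode { get; set; }
}

public static class OrderAddressChangedUpconverter
{
    // Builds a V2 event from a stored V1 event at read time.
    // The stored V1 event itself is never modified.
    public static OrderAddressChangedV2 Upconvert(OrderAddressChangedV1 old)
    {
        if (old == null) throw new ArgumentNullException("old");

        return new OrderAddressChangedV2
        {
            OrderId = old.OrderId,
            Address = old.Address,
            // Assumed default: V1 never recorded a post code.
            PostCode = string.Empty
        };
    }
}
```

With something like this in place, the domain code only ever has to handle the latest version of the event, while the Event Store stays append only.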

 

Snapshotting

One part of Event Sourcing that could become problematic is those objects with long, complex lifetimes. In virtually all cases, an object's lifetime is relatively short--perhaps a dozen events or so. But there are cases in which an object may live for a very, very long time and be used frequently. Greg gives an example in one of his talks of an object that gets thousands of new events per day. Loading up this object can be expensive because you have to load up all of the state transitions since the object's inception.

One shortcut around this is the concept of a snapshot. You send the aggregate a snapshot command message of some kind and it produces a snapshot message which contains all of its state--along the lines of converting a domain object to a DTO, except this comes from inside of the domain object rather than from the outside.

Once we have this snapshot message, we persist it. Then, when loading up the object from storage, we only need to load the most recent snapshot and the events recorded after it. This allows us to restore the object to a certain state and then "replay" all events since the last snapshot.

http://blog.jonathanoliver.com/event-sourcing-and-snapshots/
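
The idea can be sketched with a deliberately tiny aggregate. Everything below is hypothetical and only illustrates the mechanics: it is not the CQRSlite API and not code from the demo app. Each "event" is just an amount to add, and the snapshot records the state together with the version it was taken at.

```csharp
using System;
using System.Collections.Generic;

// Hypothetical snapshot type: the aggregate's state at a given version.
public sealed class CounterSnapshot
{
    public int Version { get; set; }   // number of events already folded in
    public int Total { get; set; }     // the state at that version
}

// Hypothetical aggregate used only to illustrate snapshotting.
public sealed class CounterAggregate
{
    public int Version { get; private set; }
    public int Total { get; private set; }

    // Applies a single event (an amount to add in this sketch).
    public void Apply(int amount)
    {
        Total += amount;
        Version++;
    }

    // The snapshot is produced from inside the aggregate.
    public CounterSnapshot TakeSnapshot()
    {
        return new CounterSnapshot { Version = Version, Total = Total };
    }

    // Restores the state from the snapshot (pass null for a full replay)
    // and then replays only the events recorded after that snapshot.
    public static CounterAggregate Load(
        CounterSnapshot snapshot, IEnumerable<int> eventsSinceSnapshot)
    {
        var aggregate = new CounterAggregate();
        if (snapshot != null)
        {
            aggregate.Version = snapshot.Version;
            aggregate.Total = snapshot.Total;
        }

        foreach (var amount in eventsSinceSnapshot)
            aggregate.Apply(amount);

        return aggregate;
    }
}
```

Loading from a snapshot plus the later events should give exactly the same state as replaying the whole history. The only difference is how many events have to be read.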

 

As I have stated, this is not something that I have implemented in the demo code base, but it is something that is catered for in the CQRSlite framework, and would not take too much effort to add. You would simply need to do the following:

  • Swap to using a SnapshotAggregateRoot instead of AggregateRoot type, and implement some extra methods
  • Swap to using a SnapshotRepository instead of a regular Repository

 

There are some examples of snapshots working in one of the test cases, which would be a good place to start if you wanted to include snapshotting as well.

 

 

That's It

That is all I wanted to say this time. I do hope this is useful for some of you out there. I have certainly learned a hell of a lot by wanting to write an article about this. I honestly believe that if you can learn enough to explain something to someone else, you have gained a good insight into it. Obviously I could be talking absolute rubbish, but I certainly hope I am not, and I would hope that any practising DDD/CQRS experts who stumble across this article and read it would correct anything I have got completely wrong.

 

That said I have spent quite a bit of time on this one, so I am optimistic (perhaps falsely) that I have done a semi-reasonable job on it.

 

Anyway, if you like what you see, feel free to leave a comment, or even better a vote; that would be freaking awesome.

Credits

A big shout goes out to the team of highly talented devs, that told me that there is no such thing as a golden bullet. It should in fact have been a silver bullet. Next they will be telling me there are no bunyips or sky panthers or flying otters. Jeez

 

 

License

This article, along with any associated source code and files, is licensed under The Code Project Open License (CPOL)


Written By
Software Developer (Senior)
United Kingdom
I currently hold the following qualifications (amongst others, I also studied Music Technology and Electronics, for my sins)

- MSc (Passed with distinctions), in Information Technology for E-Commerce
- BSc Hons (1st class) in Computer Science & Artificial Intelligence

Both of these at Sussex University UK.

Award(s)

I am lucky enough to have won a few awards for Zany Crazy code articles over the years

  • Microsoft C# MVP 2016
  • Codeproject MVP 2016
  • Microsoft C# MVP 2015
  • Codeproject MVP 2015
  • Microsoft C# MVP 2014
  • Codeproject MVP 2014
  • Microsoft C# MVP 2013
  • Codeproject MVP 2013
  • Microsoft C# MVP 2012
  • Codeproject MVP 2012
  • Microsoft C# MVP 2011
  • Codeproject MVP 2011
  • Microsoft C# MVP 2010
  • Codeproject MVP 2010
  • Microsoft C# MVP 2009
  • Codeproject MVP 2009
  • Microsoft C# MVP 2008
  • Codeproject MVP 2008
  • And numerous codeproject awards which you can see over at my blog
