Saturday, 19 November 2011

Event Store Methods - part 2 of 2 (guest post)

This is the second and concluding part of Colleague C's account of his experience using an Event Store within a system designed on the Command Query Responsibility Segregation (CQRS) pattern. All text references to actual projects and namespaces have been removed, leaving only the code snippets essential to the general discussion.

Previously: Aggregates, Entities and Events.

Applying and Replaying Events

Now I can finally get to the interesting implementation: how events are applied and replayed. It's important to understand how this happens for events applied far down the object graph, distant from the root. Most examples you’ll see online don’t go beyond the root level, and really lack infrastructure to propagate events down to the entities where they must be applied. Our implementation has been based around how the Ncqrs Framework handles these scenarios.

I’m going to detail how this is done, by going through the commands used for creating a new workflow, a stage for that workflow, and a tasklist for that stage. Let’s first take a look at the command handler for creating a new Workflow, which is pretty straightforward:

To take a look at how the events are being applied, we’ll need to have a look at the code inside that CreateWorkFlow factory method:

You can see here that some basic business logic is performed before the event is applied. Our little example application unfortunately isn’t very interesting, but it illustrates the point. When we make a call to that private constructor, before we reach that code, a call is made to the constructor of the AggregateRoot base class. Let’s have a look at that constructor:

The interesting bit is the mapping strategy. It alleviates the need to write code manually for registering event handlers, which are just methods on the aggregate root or entity. The mapping strategy implements an interface, so it would be possible to define other strategies, and you could have different aggregate root base classes using different strategies (rather that than inject the mapping strategy in; I prefer to keep the domain clear of constructor injection). You’ll notice that you pass a reference to the instance of the aggregate root into the method.

Let’s take a look at that method on the mapping strategy:

First, we get all the public and private instance methods on the target object. We then run a LINQ query to filter those methods to the ones starting with “On”, and having a single parameter implementing IEvent.

After that, we loop through each of those methods and create a delegate to call each, with a reference to the target object. This is important for the entity-based event handlers; if you have a set of children, you need to be able to handle an event on the correct child (we’ll come to this a bit later). The method is then wrapped inside an event handler, which has some logic to determine whether a particular handler can handle an event that it’s been passed. Finally, the list of handlers is returned to the aggregate root constructor, and the aggregate root adds each to its event handler collection.

In the case of the Workflow instance, a couple of event handler methods will be found:

We now come out of the base class constructor, and enter the constructor of the Workflow instance:

We can see that we’re now going to attempt to apply the WorkflowCreatedEvent to this instance, which takes us back to the AggregateRoot base class, since the Apply method is a protected method on the base. Let’s take a look at that:

Since we’re just creating the workflow at this stage, this call to GetNewEventVersion() is going to return 1. We then take a copy of the current list of event handlers - at this point in time, a couple of handlers. Why take a copy? During these event handler calls, other event handlers can be added to the aggregate root’s collection. Don’t be alarmed by this, it will make sense shortly! If you didn’t take a copy, you’d get an exception thrown, as you can't add items to a collection that you’re iterating over.

Anyway, we want to apply a WorkFlowCreatedEvent. We’re going to loop over all the handlers and figure out which one should handle this event. Remember that the event handlers are actually inside a wrapper, so the call to HandleEvent here will be a call into this wrapper method. It's a really simple method, and can be found by going to the EventHandler class:

Again, this is pretty simple. We figure out if this handler is to handle this event by checking that the type of event passed in is the same type as the first parameter on the handler (this type was passed in when we created the event handler in the mapping strategy). If it is, we’ll call the handler, passing in our event to be applied. Remember that the handler has a reference to a target, which in this case happens to be the Workflow instance.

As a result, we’ll end up calling the OnWorkflowCreated method on the Workflow instance, which will set its ID and title. After that, we’ll return back to the base Apply method. You’ll notice that we assign the AggregateId of the event in the base class. We need to do that after the event application, because in the case of an event that creates an aggregate, the ID would still be empty if we attempted to assign it before the event had been applied. It would also be possible to assign the AggregateId in the event handler, but it’s preferable to have that behaviour in the base class so it’s not forgotten. Finally, we add the applied event to the list of uncommitted events, get back to the private constructor that called the Apply method, then return to the static CreateWorkflow method, which will return our new instance back to our command handler.

The only thing that we need to do now to complete this command is to use the repository to save the Workflow that we’ve just created. Or more correctly, to save the WorkflowCreatedEvent we’ve just generated. The repository doesn’t do much. It has access to an IEventStore interface with a Store method to commit events. The aggregate root implements an IEventProvider interface, exposing a method to get the uncommitted changes, so this event provider is passed in through the Store method.

Here is a sample implementation of the Store method, using SQL Server based storage for the event store:

You can see it’s actually fairly straight forward. There is a concurrency check, that the current version of the aggregate root you’re about to save matches that of the stored one. Otherwise, someone has updated the same aggregate root during the time in which we’ve tried to apply other events. If everything is OK with respect to concurrency, each event is saved and published. The publish method will put the event on a queue without blocking, until it has been processed. Then, the version number is updated on the aggregate root, and updated in the Aggregates table. In the case of the exception there, I think that that exception should probably be wrapped up in some sort of EventStoreSaveException. That exception would then bubble up to the point where the command handler is being called, and you could deal with it in whatever way is appropriate.

With all that done, the command has now been handled successfully.

The next command will demonstrate how event handlers are registered on ‘child’ entities:

It’s worth taking a quick look at restoring the Workflow. Here’s the implementation of that repository method:

When the aggregate constructor is called there, we go through the same process as previously to register all the event handlers. After that, the previous events related to that aggregate are fetched from the event store, using a simple SELECT SerializedEvent FROM Events WHERE AggregateId = Id query. These events are then applied to restore the object to its current state. You call exactly the same Apply method already discussed. The only difference: when the object is restored, the applied events collection gets cleared at the end of the process, as you’re only interested in saving subsequently applied events.

So now that we have our workflow restored, we want to add a stage to it. If you take a look at the AddStage method, after performing a little bit of uninteresting business logic, we’ll apply a WorkflowStageAddedEvent. The event handler for this was registered when the aggregate root was being constructed, so we’ll find the event handler based on the type of the event. Eventually, this simple handler will be called:

This is obviously an extremely simple method. What's interesting is what goes on in the construction of the Stage class, which inherits from the base Entity class. You can see we’re passing a reference to the ‘parent’ aggregate root via the “this” keyword. The constructor for the Stage itself is pretty simple, the interesting work taking place in the base constructor:

It’s important for the entity to have a reference to its aggregate root. The event handlers for the entity get registered at the aggregate root level, because events are applied and saved at that level. We can see here that the exact same mapping strategy is used, as with the aggregate root. So, the same mechanism for finding event handler methods on the aggregate root is used for the Stage. Notice there that the reference passed into the mapping strategy is a reference to the current entity, rather than the aggregate root. This means any event handlers found are registered for this particular entity. In the case of the stage, there's one event handler method, OnWorkflowStageTaskListAdded. However, if the workflow had 6 different stages, you’d have 6 different event handlers - one for each stage instance.

This is why it’s key for the event handler to store a reference to the instance on which to call the method. If you remember back to the process of applying events, the aggregate root goes through its registered handlers, figuring out what handler to use by matching up the type of the received event with the type of the first parameter on the registered handler. The process is just slightly different for events on entities, which is why you put the event handler in a wrapper. Here’s the code that figures out whether the event gets handled in the case of the entity:

As you can see, there is actually a different type for entity based events. The difference is that the entity event exposes an EntityId property, which it uses to make a comparison. Remember, this method is being called from the context of an aggregate root. That’s why you have the first null check; non-entity-based events could be received by this method, in which case you'd return straight away, since you obviously can’t handle those.

The second check, to see whether we should use this handler for this event, is a comparison of the ID of the entity with the ID that got stored with the event. Again, if this test fails, we return; we need the correct instance upon which to apply the event. After passing all checks, we call into the same event handler code that the aggregate roots do, and this will just match up the parameter types.

So, for each handler found on the entity, a wrapper is created and registered with the aggregate root, rather than the entity. While these entities are being created, their handlers are registered. So when the item is being restored from the event store, and a WorkflowStageAdded event is encountered, this will create a new Stage and register its event handlers. This works no matter far down the object graph you go in terms of descendants: you will never encounter a WorkflowStageAdded event before a WorkflowCreated event, if everything has been versioned correctly.

When that Stage was created there, one event handler got registered - OnWorkflowStageTaskListAdded. This would be applied when you called the AddTaskList method on the Stage class.

I guess there's no necessity to walk through the essentially identical command logic to create a TaskList on a Stage. However, there is a difference in the event application on the entities, which is nicely handled in the base class:

The entity ID gets set. Again, it would be possible to do this in the event handler, but it’s easier to ensure it takes place on the base class. The important part again is that the actual application of the event takes place on the aggregate root, where the handler is registered. So, the event will be applied to the entity, then saved at the aggregate root level, and committed to the event store.

~ END ~

No comments:

Post a Comment