Some More Lessons Learned in Event Driven Architectures

This post has been sitting in my drafts for about a year; it’s about time I pushed it out! Grab a coffee (or three) if you intend to read it! ☕☕☕

About 2 years ago I wrote a post about how we architected an event-driven system at work, in the Supply Planning & Stock Management domain that I am a part of, to respond to purchase order changes, stock deliveries, stock mutations etc. in near-real time. I noted several lessons we learned at the time, not only from an implementation perspective but also from observability, testability and maintainability perspectives. In this post I present some finer-grained lessons we’ve learned (a.k.a. burn marks) since then, having solved some foundational problems that surfaced because we did not pay enough attention, upfront, to the failure modes inherent in EDAs.

I will continue where the previous post left off:

Lesson 8: Event ordering matters for events that carry full state/partial state

Message ordering in distributed systems is a very complex and deceptive subject in and of itself, and you can go as deep as you want; in fact, I recommend it in order (no pun intended) to appreciate and learn it better. I won’t be going too deep here, though!

One of the events we subscribed to was a PurchaseOrderChanged event that looked like this:

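public class PurchaseOrderChanges
{
    // other attributes not relevant for this
    public PurchaseOrderLineChange[] PurchaseOrderLineChanges { get; set; }
}
public class PurchaseOrderLineChange
{
    public int PurchaseOrderLineId { get; set; }
    public int? QuantityCancelled { get; set; }         // null when no quantity was cancelled
    public DateTime? ExpectedDeliveryDate { get; set; } // null when the date did not change
    public int ResultingVersion { get; set; }           // version stamped by the producer
}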

If the date on the purchase order line changed, the ExpectedDeliveryDate property would contain a non-null, non-default value; otherwise, it would be null. If some quantity on the order was cancelled, the QuantityCancelled property would contain a non-null, non-default value; otherwise, it would be null. Each change was also accompanied by an updated version number stamped by the producer service. For all intents and purposes these could well have been two separate events, but the problems would still be present, only slightly more observable.

We used standard SQS (not FIFO) as the message broker, which meant that messages could arrive out of order and more than once (standard queues only offer best-effort ordering and at-least-once delivery). Even if the messages are delivered (i.e. stored in the message broker and received by the consumers) in the right order, the consumers could still end up processing them in a different order due to a variety of environmental factors, e.g. GC pauses, thread interrupts & rescheduling, subtle timing issues etc. All of this gets problematic in a hurry! Consider the following scenario:

The starting state of the entity in our service database is E0. In the source system, a user then changes the entity twice: the first action results in event E1 and, shortly afterwards, a second action results in event E2. Due to the nature of the message broker and other factors such as queueing delays and queue depth, let’s say E2 gets delivered to processor instance P2 first. P2 starts processing the event by retrieving the current state E0 from the database and mutating the entity using information from the event, and then suffers a GC-induced pause. Meanwhile, processor P1 receives event E1 and does the same thing, but finishes before P2’s GC pause is over and updates the entity to E1. Now P2 resumes processing, unaware that the entity has been updated in the meantime, and overwrites the state with E2.

The events were seemingly processed in the right temporal order, i.e. E1 followed by E2, but the outcome was still incorrect: notice that the Qty and Version fields are correct but the Date field has the wrong, i.e. older, value.

Another scenario: this time P1 hits a GC pause and P2 races to completion, followed by P1, resulting in Date and Qty being correct but Version being wrong (notice that the events were actually delivered in the correct order by the broker):

The happy path we want is this instead:

No matter what order the events are delivered and processed in, the entity state should transition in the right logical order, i.e. we effectively want both events to be applied, in the right order. We can even skip/discard E1 and go straight to E2 if we can be sure that the event contains the current state of the entity as opposed to only the changes. To determine that, we can compare the version stamped on the event (V_event) with the one we pull out of the database (V_db): if V_db > V_event, we discard the event because we already have a newer version; otherwise, we apply the event by updating the entity and the version in the database.

So we asked the producing team to publish the entity state in the event payload instead of just the changes, and changed the code on the processing side to ignore out-of-order events (essential code below):

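public async Task Handle(PurchaseOrderLineChange poChangeEvent)
{
    var currentPol = await repository.GetBy(poChangeEvent.PurchaseOrderLineId);
    // Apply only if the stored version is not newer than the event's version (V_db <= V_event);
    // otherwise the event arrived out of order and is discarded.
    if (currentPol.Version <= poChangeEvent.ResultingVersion)
    {
        currentPol.ApplyChanges(poChangeEvent);
        await repository.Update(currentPol);
    }
}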
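One thing worth noting about the handler above: the version check and the update are still two separate steps, so two processors that read the same row concurrently can both pass the check, which is the same lost-update window described earlier. A common way to close it, sketched below under the assumption that the store supports an atomic conditional write (in SQL, an UPDATE whose WHERE clause also compares the stored version), is to make the check part of the write itself. The store and all type names here are hypothetical; the lock stands in for the database performing that conditional update atomically.

```csharp
using System;
using System.Collections.Generic;

// Hypothetical full-state snapshot of a purchase order line, as carried by the event.
public sealed record PurchaseOrderLineState(
    int PurchaseOrderLineId, int QuantityCancelled, DateTime ExpectedDeliveryDate, int Version);

// Hypothetical in-memory stand-in for the service database. The lock plays the role of the
// database executing a conditional update atomically, e.g.
// UPDATE ... SET ..., Version = @eventVersion WHERE Id = @id AND Version <= @eventVersion
public sealed class VersionedPurchaseOrderLineStore
{
    private readonly Dictionary<int, PurchaseOrderLineState> _rows = new();
    private readonly object _gate = new();

    public void Seed(PurchaseOrderLineState state)
    {
        lock (_gate) { _rows[state.PurchaseOrderLineId] = state; }
    }

    public PurchaseOrderLineState Get(int purchaseOrderLineId)
    {
        lock (_gate) { return _rows[purchaseOrderLineId]; }
    }

    // Applies the event unless the stored version is newer (V_db > V_event).
    // Returns true if applied, false if the event was stale and discarded.
    public bool TryApply(PurchaseOrderLineState incoming)
    {
        lock (_gate)
        {
            // Check and write happen under one lock, so no other writer can
            // update the row between the version comparison and the write.
            if (_rows.TryGetValue(incoming.PurchaseOrderLineId, out var stored)
                && stored.Version > incoming.Version)
            {
                return false;
            }

            _rows[incoming.PurchaseOrderLineId] = incoming;
            return true;
        }
    }
}
```

With a real database doing the comparison, the number of affected rows tells the handler whether the event was applied or discarded as stale.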

Alternative solutions could have been:

  • Use a FIFO message broker, but that would have necessitated architectural changes at both the producer and consumer levels, which would have meant more time and effort. Plus, there are edge cases to consider even with FIFO SQS. You can’t simply throw advanced infrastructure at a bad solution design and expect the problems to go away magically!
  • Instead of using the Event-Carried State Transfer pattern for purchase order changes, use the Event Notification pattern (see Martin Fowler), where the producer only publishes the identity of the entity that has changed. The consumers can then independently retrieve the current state of the entity by id from the producer’s API. This could actually be beneficial when the events are sparse and infrequent (as in this case); otherwise, it could lead to the source API getting bombarded with requests for the same entity, which can easily overwhelm it.

    This means that the producer API has to be designed from the beginning to be able to handle this onslaught of requests, which can add more architectural complexity and cost. We’d still need the version check to ensure that the state hasn’t changed in the meantime.

    This solution was a belated realisation, and by then our initial solution was already working, so we didn’t feel a strong reason to change it; however, it will certainly be a consideration the next time we look to handle change events of any kind.
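A minimal sketch of the Event Notification alternative, assuming a hypothetical notification that carries only the purchase order line id and a hypothetical fetchCurrentState delegate standing in for the producer’s "get by id" API call. It shows the two points made above: the consumer always fetches the latest state, and it still keeps the version check so that a slow fetch of an older snapshot cannot overwrite a newer one. A plain dictionary is used as the local store, so the sketch is single-threaded; concurrent handlers would need the check and the write to happen atomically.

```csharp
using System;
using System.Collections.Generic;

// Hypothetical notification: only the identity of the changed entity.
public sealed record PurchaseOrderLineChangedNotification(int PurchaseOrderLineId);

// Hypothetical current state as returned by the producer's API.
public sealed record PurchaseOrderLineSnapshot(
    int PurchaseOrderLineId, int QuantityCancelled, DateTime ExpectedDeliveryDate, int Version);

public sealed class PurchaseOrderLineNotificationHandler
{
    private readonly Func<int, PurchaseOrderLineSnapshot> _fetchCurrentState; // stands in for an HTTP GET by id
    private readonly Dictionary<int, PurchaseOrderLineSnapshot> _localStore;

    public PurchaseOrderLineNotificationHandler(
        Func<int, PurchaseOrderLineSnapshot> fetchCurrentState,
        Dictionary<int, PurchaseOrderLineSnapshot> localStore)
    {
        _fetchCurrentState = fetchCurrentState;
        _localStore = localStore;
    }

    public void Handle(PurchaseOrderLineChangedNotification notification)
    {
        // The notification's position in the stream doesn't matter: always fetch the latest state.
        var latest = _fetchCurrentState(notification.PurchaseOrderLineId);

        // Still check versions: a slower handler may have fetched an older snapshot
        // and must not overwrite a newer one that is already stored.
        if (_localStore.TryGetValue(latest.PurchaseOrderLineId, out var stored)
            && stored.Version > latest.Version)
        {
            return;
        }

        _localStore[latest.PurchaseOrderLineId] = latest;
    }
}
```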

Lesson 9: Idempotency is key with events that carry deltas

One of the other event types in our system was a “stock level change” event that looked something like this:

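public class ProductStockChanged
{
    public int ProductId { get; set; }
    public int Quantity { get; set; }
    public bool IsStockRaised { get; set; }      // true: add Quantity to stock; false: subtract it
    public int PurchaseOrderLineId { get; set; } // -1 when the change is not a delivery
}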

On any given day, the stock of products in our central warehouse can go up and down tens of thousands of times. New stock deliveries, customer purchases, stock damages, internal stock relocations etc. are just some of the reasons that trigger these changes. If stock was raised, the boolean flag in the event schema above will be true and the quantity will be added to the product’s current stock quantity; otherwise, it will be subtracted from it. If the stock increment happened as a result of a stock delivery, the PurchaseOrderLineId attribute will contain the id of the line against which the delivery happened, so we can update our database accordingly; otherwise, it defaults to -1 and the system ignores it.

The message broker we use for these events is Oracle Advanced Queuing (AQ), which is essentially an Oracle database table that’s been tarted up (by Oracle) to look and behave like a queue. A nice side benefit is transactional message processing (if your application can leverage it… ours didn’t fully):

public async Task ProcessMessages()
{
    AcknowledgeableMessage message = null;
    try
    {
        message = await GetMessage();
        // process the message here
        message.Acknowledge(); // a.k.a. transaction.Commit()
    }
    catch (Exception)
    {
        // roll back the dequeue so that the broker redelivers the message
        message?.Abandon();
    }
}
public class AcknowledgeableMessage
{
private IDbTransaction _transaction;
public string Id {get;}
//...other properties
public AcknowledgeableMessage(IDbTransaction transaction, string messageId,...)
{
_transaction = transaction;
Id = messageId;
// map other properties
}
public void Acknowledge()
{
_transaction?.Commit();
}
public void Abandon()
{
_transaction?.Rollback();
}
}

Our pending deliveries recommendation system listens for these changes and syncs the stock levels of products in its local database which helps keep the recommendations up to date. The code for this handler looks something like this:

public async Task Handle(ProductStockChanged stockChangedEvent)
{
    var product = await productRepository.GetBy(stockChangedEvent.ProductId);
    var purchaseOrderLine = await purchaseOrderLineRepository.GetBy(stockChangedEvent.PurchaseOrderLineId);
    // validation...
    if (stockChangedEvent.IsStockRaised)
    {
        // OOPSIES ⬇️ (increments are not idempotent: a redelivery applies them again)
        product.Stock += stockChangedEvent.Quantity;
        purchaseOrderLine.QuantityDelivered += stockChangedEvent.Quantity;
        await purchaseOrderLineRepository.Update(purchaseOrderLine);
    }
    else
    {
        product.Stock -= stockChangedEvent.Quantity; // stock decrements don't affect purchase deliveries
    }
    await productRepository.Save(product);
    // trigger a use case that depends on updated product and purchase order line
}

The problem here was that message processing was not idempotent at all. Consider a scenario where the handler successfully updates the product stock (and the delivered quantity) in the local database, but the use case executed afterwards throws an exception. The overall message processing code is wrapped in a try...catch (as in ProcessMessages above), so the message will be abandoned, i.e. the queue transaction will roll back and the message will be redelivered. The handler, none the wiser about any previous attempts to process this event, will gladly apply the same stock change to the entities again, resulting in incorrect stock values that make the recommendations go all out of whack! If this happens a bunch of times during the day (and it can, due to transient faults), you can basically drop any hope of having anywhere near consistent and correct stock figures for the product.
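To make this failure mode concrete, here is a small simulation, using hypothetical types, of a non-idempotent handler under at-least-once delivery: the stock increment is persisted immediately, the follow-up use case fails transiently, and the broker redelivers the message until processing succeeds.

```csharp
using System;

// Hypothetical, simplified stand-ins for the stock event and the local product row.
public sealed record StockChangeMessage(string MessageId, int ProductId, int Quantity, bool IsStockRaised);

public sealed class LocalProduct
{
    public int Stock { get; set; }
}

public static class RedeliverySimulation
{
    // Same shape as the handler above: persist the delta, then run a use case that may throw.
    private static void HandleNonIdempotent(LocalProduct product, StockChangeMessage message, Action useCase)
    {
        product.Stock += message.IsStockRaised ? message.Quantity : -message.Quantity; // already persisted
        useCase(); // if this throws, the message is abandoned and redelivered
    }

    // Simulates at-least-once delivery: redeliver until processing succeeds.
    // Returns the number of delivery attempts.
    public static int DeliverUntilSuccess(LocalProduct product, StockChangeMessage message, int transientFailures)
    {
        var attempts = 0;
        while (true)
        {
            attempts++;
            try
            {
                HandleNonIdempotent(product, message, () =>
                {
                    if (attempts <= transientFailures)
                        throw new InvalidOperationException("transient failure in the use case");
                });
                return attempts; // message.Acknowledge()
            }
            catch (InvalidOperationException)
            {
                // message.Abandon(): the queue transaction rolls back, the stock update does not.
            }
        }
    }
}
```

Starting from a stock of 100, a single +10 change that hits two transient failures is applied three times, leaving the stock at 130 instead of 110.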

The culprit? Lack of idempotency! Idempotency is the property of an operation that allows it to be executed multiple times while its side effect only takes place once. For example, assigning a value to a variable is by design an idempotent operation: once the value is set, no matter how many times that assignment is executed, the value remains the same. The same can’t be said about an increment operation, i.e. x += 1 🙂. In message-driven architectures, this can usually be achieved fairly straightforwardly by deduplication, i.e. the consumer keeps a list of previously processed messages and, every time it receives a message, checks this list to see if it has “seen” this message before: if yes, it ignores the message; otherwise, it processes the message and records it for future reference.
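A minimal sketch of the deduplication approach just described, assuming every message carries a unique id. The "seen" list is an in-memory HashSet purely for illustration; a real consumer would keep it in durable storage, ideally updated in the same transaction as the state change. DeduplicatingStockConsumer is a hypothetical name.

```csharp
using System.Collections.Generic;

// Hypothetical deduplicating consumer. The "seen" list is an in-memory HashSet here;
// a real consumer would keep it in durable storage.
public sealed class DeduplicatingStockConsumer
{
    private readonly HashSet<string> _seenMessageIds = new();

    public DeduplicatingStockConsumer(int initialStock) => Stock = initialStock;

    public int Stock { get; private set; }

    // Returns true if the message was processed, false if it was a duplicate and ignored.
    public bool Handle(string messageId, int quantity, bool isStockRaised)
    {
        if (_seenMessageIds.Contains(messageId))
            return false; // seen before: ignore the duplicate

        Stock += isStockRaised ? quantity : -quantity;
        _seenMessageIds.Add(messageId); // remember it for future deliveries
        return true;
    }
}
```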

Depending on what you call processing a message in your system, and whether or not logging the event in the history is transactional with the rest of the processing, there can be edge cases with de-dup as well. For example, if you update your event history in a different transaction to your actual processing, and the history update commits but the processing then fails, you effectively lose the message/data. Why? Because the message will not be acknowledged and will be redelivered, but the system will say, "I've already seen this message, so I am going to discard it!". My recommendation here is to keep the processing short and, if possible, within the same database transaction as the event history update, and to do the majority of the business processing of the message later. Better yet, design your domain operations to be truly idempotent instead; then you don't need to deduplicate. Simplifies life!
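One way to make the stock update itself idempotent, sketched here as an illustration rather than what we built, is to turn the increment into an assignment: record each stock movement under the id of the message that caused it and derive the current stock from the recorded movements. Re-applying the same message simply overwrites an entry with the same value. The StockLedger type is hypothetical.

```csharp
using System.Collections.Generic;
using System.Linq;

// Hypothetical ledger that makes stock updates idempotent by construction:
// each movement is stored under the id of the message that caused it.
public sealed class StockLedger
{
    private readonly int _openingStock;
    private readonly Dictionary<string, int> _movementsByMessageId = new();

    public StockLedger(int openingStock) => _openingStock = openingStock;

    // An assignment keyed by message id: recording the same message again
    // overwrites the entry with the same value instead of adding to it.
    public void Record(string messageId, int quantity, bool isStockRaised) =>
        _movementsByMessageId[messageId] = isStockRaised ? quantity : -quantity;

    public int CurrentStock => _openingStock + _movementsByMessageId.Values.Sum();
}
```

The trade-off is that the ledger grows with every movement, so in practice it would need periodic snapshotting or compaction.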

In our troubled system, this meant wrapping the product updates, the purchase order line updates and the storing of the message id in the “seen events” list in a nested transaction, and, before processing an incoming message, checking whether we had already “seen” it and skipping it if we had:

public async Task Process()
{
    AcknowledgeableMessage message = null;
    try
    {
        // Outer transaction A (the queue transaction)
        message = await GetMessage();
        if (!seenMessageList.Exists(message.Id))
        {
            // Inner transaction B. Because of the awaits inside, the elided arguments
            // must include TransactionScopeAsyncFlowOption.Enabled.
            using (var unitOfWork = new TransactionScope(...))
            {
                await UpdateProduct(...);
                await UpdatePurchaseOrderLine(...);
                await MarkEventAsSeen(...);
                unitOfWork.Complete();
            }
            // trigger domain use case for updating recommendations
        }
        message.Acknowledge(); // a.k.a. transaction.Commit()
    }
    catch (Exception)
    {
        message?.Abandon();
    }
}

Now, if transaction B fails (i.e. throws an exception), the message will be redelivered; it will hopefully be processed correctly the next time and the “seen events” list will be updated. If transaction B succeeds but transaction A fails (i.e. throws an exception), a retry will not result in the same message being processed multiple times, since we’ve already recorded it as seen. The message can then simply be acked and removed from the queue!

NB: for brevity I have shown both transactions in the same method, in reality transaction B is in the domain layer and that's why it uses the TransactionScope API in .NET so as to enlist operations against 3 different database tables defined in a different layer in the same ambient transaction i.e. a unit of work.
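The two failure cases described above can be walked through with a small in-memory simulation. The ServiceDatabase and NestedTransactionFlow types are hypothetical: CommitUnitOfWork stands in for transaction B (the stock update and the "seen" marker become visible together), and the boolean return value stands in for acknowledging (true) or abandoning (false) the queue message in transaction A. The beforeCommit and afterCommit hooks let a caller inject a failure inside B or after it.

```csharp
using System;
using System.Collections.Generic;

// Hypothetical in-memory stand-in for the service database.
public sealed class ServiceDatabase
{
    public ServiceDatabase(int productStock) => ProductStock = productStock;

    public int ProductStock { get; private set; }
    public HashSet<string> SeenMessageIds { get; } = new();

    // Stands in for transaction B: the stock update and the "seen" marker
    // become visible together (nothing is written unless this method runs).
    public void CommitUnitOfWork(int stockDelta, string messageId)
    {
        ProductStock += stockDelta;
        SeenMessageIds.Add(messageId);
    }
}

public static class NestedTransactionFlow
{
    // Returns true when the queue message would be acknowledged (transaction A commits)
    // and false when it would be abandoned and redelivered.
    public static bool Process(ServiceDatabase db, string messageId, int stockDelta,
                               Action beforeCommit, Action afterCommit)
    {
        try
        {
            if (!db.SeenMessageIds.Contains(messageId))
            {
                beforeCommit();                             // a throw here means B rolled back: nothing persisted
                db.CommitUnitOfWork(stockDelta, messageId); // transaction B commits
                afterCommit();                              // e.g. the recommendations use case throws
            }
            return true;  // message.Acknowledge()
        }
        catch (Exception)
        {
            return false; // message.Abandon()
        }
    }
}
```

As in the flow above, a redelivered message that is already marked as seen also skips the recommendations use case, so that step needs to tolerate being missed or be re-triggered some other way.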

After this change was put in place, the stock levels in the recommendations became more or less consistent, and whilst it’s hard to achieve full consistency in a distributed EDA system, we’ve not had another user report inconsistent stock levels for over 6 months (we do monitor this a bit more proactively than waiting for a user to shout!).

To make sure that this event history doesn’t grow unbounded, we added a recurring CRON job to the service using Hangfire that cleans up the list at the time of day when warehouse activity is at its lowest. Low activity means a shallower queue and short waiting times, and therefore a higher chance of messages being processed quickly. We also chose this time because any duplicate messages will already have arrived in the intervening hours and been handled accordingly, so a really late duplicate seeping into the system after the clean-up is very unlikely. Our current message processing throughput keeps the queue depth at an average of less than 5, and that’s worked out well so far.
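A sketch of the clean-up side, assuming each seen-message entry is stored with the time it was recorded and that the scheduled job calls a prune method once a day. Pruning by age, and the retention window itself, are assumptions for illustration; whichever policy is used, a duplicate that arrives after its entry has been pruned would be processed again. SeenMessageHistory is a hypothetical name.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;

// Hypothetical seen-message history that records when each message was seen,
// so that a scheduled job can prune it.
public sealed class SeenMessageHistory
{
    private readonly Dictionary<string, DateTime> _seenAtUtc = new();

    public int Count => _seenAtUtc.Count;

    public bool Contains(string messageId) => _seenAtUtc.ContainsKey(messageId);

    public void MarkSeen(string messageId, DateTime nowUtc) => _seenAtUtc[messageId] = nowUtc;

    // Intended to be called by a daily job during the quietest period.
    // Removes entries older than the retention window and returns how many were pruned.
    public int PruneOlderThan(TimeSpan retention, DateTime nowUtc)
    {
        var cutoff = nowUtc - retention;
        // Materialise the keys first so the dictionary is not modified while enumerating it.
        var expired = _seenAtUtc.Where(kv => kv.Value < cutoff).Select(kv => kv.Key).ToList();
        foreach (var messageId in expired)
            _seenAtUtc.Remove(messageId);
        return expired.Count;
    }
}
```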

Lesson 10: Shorten database transactions to avoid lock contentions and deadlocks

After fixing Lesson 9, we noticed our database transactions were deadlocking frequently. It turned out that the way the entity updates were applied to the product was causing lock escalation in the MySQL database, which contended with other lock-requiring transactions, leading to deadlocks (and occasionally lock wait timeouts). The full resolution of this problem is captured in this post. TL;DR: keep database transactions as short as possible and pay attention to the transaction isolation levels your SQL statements are actually using. If you are using MySQL, enable logging of all deadlocks to the MySQL error log (the innodb_print_all_deadlocks option); that can help pinpoint which transactions are contending for locks.

Lesson 11: Protect user actions from being overwritten & understand natural data partitions in the domain

In an event-driven system like ours, it’s likely that a user acts upon slightly stale information in the application, because an external event arriving a short while later would have updated the state. The problem is that we don’t know when that event will occur, so we cannot block user actions indefinitely; we have to accept the command and handle any “mismatches” in the backend. For example, we allowed the user action to proceed only if the recommendation the user was trying to handle had not already been removed by the system (maybe it was no longer needed because reality had changed) or already handled.

Another problem we had was a race condition where, shortly after a user action was applied to a perfectly up-to-date recommendation, an unrelated event reverted the user action for no good reason. As a consequence, this user’s change request was excluded from the e-mails we send to suppliers.

Given that we have no way of knowing beforehand when reality will change, we looked at how many times this kind of issue had happened in the past and spoke to our domain folks, from whom we learnt that purchase orders are usually partitioned by supply planner, meaning only one person at a time should work on a given order. In exceptional circumstances, an admin-type user might override something in the system simultaneously, but then they need to talk to each other (and the supplier) and resolve it manually. Our logging largely corroborated the domain feedback, which meant that we could prioritise user actions over change events and stop the events from updating recommendations that had already been handled by a user. So we did!
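The two rules described in this lesson (accept a user action only while the recommendation is still open, and stop change events from touching recommendations a user has already handled) can be sketched as guards on a hypothetical Recommendation type. In a real service these guards would also need optimistic concurrency on the stored row, since a user action and an event can still race.

```csharp
public enum RecommendationStatus { Open, HandledByUser, RemovedBySystem }

// Hypothetical recommendation with the two guards described in this lesson.
public sealed class Recommendation
{
    public Recommendation(int suggestedQuantity) => SuggestedQuantity = suggestedQuantity;

    public int SuggestedQuantity { get; private set; }
    public RecommendationStatus Status { get; private set; } = RecommendationStatus.Open;

    // User actions are accepted only while the recommendation is still open,
    // i.e. not already removed by the system and not already handled.
    public bool TryHandleByUser(int acceptedQuantity)
    {
        if (Status != RecommendationStatus.Open) return false;
        SuggestedQuantity = acceptedQuantity;
        Status = RecommendationStatus.HandledByUser;
        return true;
    }

    // Change events only touch open recommendations, so they can never
    // revert something a user has already handled: the user action wins.
    public bool TryApplyChangeEvent(int newSuggestedQuantity, bool stillNeeded)
    {
        if (Status != RecommendationStatus.Open) return false;
        if (stillNeeded) SuggestedQuantity = newSuggestedQuantity;
        else Status = RecommendationStatus.RemovedBySystem;
        return true;
    }
}
```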

Of course, in an e-commerce system that has been built over the span of 20 years, there are always going to be back-channel updates, people with write access to tables, hidden triggers added 10 years ago to just “shimmy” the record, weird stored procedures that update tables they really shouldn’t etc., so there are always going to be fringe cases that throw a spanner in the works of our domain model, but for maybe 90% of cases, what we put in place works. We are also working on shutting down some of these back channels; more on this in a future post!

Lesson 12: Do not be tempted by distributed transactions across disparate systems that cannot guarantee atomicity

When the application requirements dictated that, upon a button click in the frontend, the application should update the recommendation, update the e-commerce database and send an e-mail to suppliers, all within the same flow, we knew that a distributed “transaction” like that would have absolutely zero guarantee of atomicity and full consistency. Each of these operations could fail independently without a realistic chance of a rollback (what if the rollback itself fails?).

Yes, I know Sagas are a cool-kid pattern for solving issues like this, but:

a) you (i.e. the organisation) need to be at a certain level of maturity with event-driven communication to implement it, and to do so well

b) it adds a non-trivial amount of architectural complexity

c) depending on whether you use choreography or orchestration, you could still have a single point of failure, which you need to design for! Bottom line: sometimes you will have to draft in human help; there’s no getting away from it.

We spoke to the domain folks about sending e-mails asynchronously, i.e. not right when the user clicks the button but at a pre-determined time slot later in the day. The questions we asked were “why do we want to send e-mails right there and then?” and “what part of our business process is at risk of breaking if we don’t send the e-mail right away but later, and how?”. These questions were enough for them to reflect a bit more deeply on the matter, and they concluded that sending e-mails later in the day would be absolutely fine. This helped us reduce code and business process complexity quite a bit.

The next thing we tackled was dealing with failures from the e-commerce database during a user action. Since this database was fronted by a thin web API layer, we could handle HTTP 409 Conflict responses to deal with concurrency issues: essentially, keep retrying with the updated version if nothing critical had changed; otherwise, discard the user action and put the purchase order change back on the queue to be re-reviewed. This is a complex bit of code to maintain, understand and test, but it does solve almost all of our conflicts. We’re looking at ways to decouple ourselves from the main e-commerce database for this part altogether, such that this kind of interaction will no longer be necessary because the microservice responsible for it will maintain its own storage, but we are not there yet. Subject for a different post!
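A sketch of the 409 handling loop described above. The submit delegate is a hypothetical stand-in for the call to the e-commerce web API, and the response shape (current version plus a flag saying whether anything critical changed) is an assumption made for illustration; the real API may require the consumer to fetch and compare state itself. maxAttempts caps the loop so that a frequently updated record cannot keep the user action spinning forever.

```csharp
using System;
using System.Net;

// Hypothetical response from the e-commerce API's update endpoint.
public sealed record UpdateResponse(HttpStatusCode Status, int CurrentVersion, bool CriticalFieldsChanged);

public enum UserActionOutcome { Applied, RequeuedForReview, Failed }

public static class ConflictAwareSubmitter
{
    // submit(expectedVersion) stands in for the HTTP call; the API is assumed to
    // answer 409 Conflict with its current version when expectedVersion is stale.
    public static UserActionOutcome Submit(Func<int, UpdateResponse> submit, int expectedVersion, int maxAttempts = 3)
    {
        for (var attempt = 1; attempt <= maxAttempts; attempt++)
        {
            var response = submit(expectedVersion);

            if (response.Status == HttpStatusCode.OK)
                return UserActionOutcome.Applied;

            if (response.Status != HttpStatusCode.Conflict)
                return UserActionOutcome.Failed; // left to the retry/error handling around this call

            if (response.CriticalFieldsChanged)
                return UserActionOutcome.RequeuedForReview; // discard the user action, re-review the PO change

            // Nothing critical changed: retry against the latest version.
            expectedVersion = response.CurrentVersion;
        }

        // Still conflicting after maxAttempts: treat it like a critical change.
        return UserActionOutcome.RequeuedForReview;
    }
}
```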

We use Polly retries to minimise business process failures due to transient faults in the central database interaction path, but we still have a failure mode yet to address: what happens when this interaction fails despite the retries? Currently, we cannot submit those purchase order changes to suppliers. This could be handled by looking at the error code: for anything other than BadRequest, manually trigger a retry once the error condition has been resolved; for BadRequests, we’ll need to investigate our logs to see which business constraint was violated and why. The current design has the accidental benefit of not sending anything to suppliers in case of any failure, which is preferable to propagating that inconsistency to external systems we don’t control.
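The retry-then-classify idea can be sketched without Polly to keep the example self-contained (in the real service, Polly provides the retries). The call delegate, the set of status codes treated as transient and the backoff values are assumptions for illustration, and RetryingSubmitter is a hypothetical name. The wait delegate is injected so the delays can be observed in tests rather than actually slept.

```csharp
using System;
using System.Net;

public enum SubmissionOutcome { Succeeded, InvestigateBusinessRule, RetryManuallyLater }

public static class RetryingSubmitter
{
    // Assumed set of transient failures: timeouts, throttling and server errors.
    private static bool IsTransient(HttpStatusCode status) =>
        status == HttpStatusCode.RequestTimeout ||
        status == (HttpStatusCode)429 || // Too Many Requests
        (int)status >= 500;

    public static SubmissionOutcome Submit(Func<HttpStatusCode> call, int maxRetries, Action<TimeSpan> wait)
    {
        var status = call();
        for (var retry = 1; retry <= maxRetries && IsTransient(status); retry++)
        {
            wait(TimeSpan.FromMilliseconds(200 * Math.Pow(2, retry - 1))); // 200ms, 400ms, 800ms, ...
            status = call();
        }

        if ((int)status >= 200 && (int)status < 300)
            return SubmissionOutcome.Succeeded;

        // A business constraint was violated: look in the logs to find out which and why.
        if (status == HttpStatusCode.BadRequest)
            return SubmissionOutcome.InvestigateBusinessRule;

        // Anything else: retry manually once the error condition is resolved.
        // Nothing is sent to suppliers in either failure case.
        return SubmissionOutcome.RetryManuallyLater;
    }
}
```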

Conclusion

In software architecture (event-driven or not), everything is a trade-off; there are no free lunches. The problems, solutions and lessons I have highlighted here are very specific to our context and may not translate to a different one, but the general guidelines for architecting such a system still hold.

Discussions around idempotency, concurrency, partial transactional failures and event ordering have to be had from day one, and overly optimistic perspectives must be discouraged. You may not need to address all (or any) of them, but at least you will have taken the time to talk about them, which is a lot cheaper and less risky. Retro-fitting these solutions can be a P-A-I-N because it increases the risks and costs even more, as we build on top of code that was written always assuming the happy path. If you can think up a failure scenario, you can rest assured it will happen; at the very least, have a plan B ready. If you can’t think up a scenario or just don’t know, go with the simplest design possible, observe and evolve.

You must also involve the domain in these discussions, because the real solutions to these problems are rarely technical. If the domain is OK with not addressing these in-system but handling them out of band and accepting that bit of risk, then not doing much about them is also an acceptable solution. Document these decisions in your ADRs and move on. There are patterns that can help achieve better consistency and fault tolerance, but they all come with added architectural complexity, higher financial costs and, in some cases, reduced availability. Pick what’s important for your domain use case, make pragmatic architectural decisions, measure your systems and ignore dogma!
