
Consistent message delivery with Transactional Outbox Pattern


After a long run with this fun series of articles walking through a pet eCommerce project, I tried to keep each piece as concise as possible. Still, some subjects deserve a more detailed article of their own. So here I'll talk about a pattern called Transactional Outbox, which helps solve a well-known challenge when using events to exchange data between microservices.


Sh** happens!


Everything works nicely until something prevents us from ensuring the operation's atomicity. The database write and the broker publish are two separate operations against two different systems, so they can't share a single transaction: one can succeed while the other fails. For example, although Kafka is robust and reliable while running, the service can be down for any reason, making it impossible to queue the message; consequently, it can't be delivered. This inconsistency is known as the dual-write problem, and it's fairly simple to overcome it and ensure the message will be delivered at least once by implementing a pattern known as Outbox.
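To make the failure mode concrete, here is a minimal sketch of the naive dual write. The types and names are illustrative, not the project's actual code:

// Naive dual write: two operations against two systems, no shared transaction.
public async Task CreateTicketAsync(AppDbContext dbContext, IProducer<string, string> producer)
{
    var ticket = new Ticket("Fix login bug");
    dbContext.Tickets.Add(ticket);
    await dbContext.SaveChangesAsync(); // 1) committed to the database

    // If the process crashes here, or the broker is down, the event is lost:
    // the ticket exists in the database, but no message is ever delivered.
    await producer.ProduceAsync("Tickets", new Message<string, string>
    {
        Key = ticket.Id.ToString(),
        Value = JsonSerializer.Serialize(new TicketCreated(ticket.Id))
    }); // 2) published to Kafka
}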

With a small step, you can go further.

Simply put, in a scenario where we raise events within a given transaction, and these events must be moved along for further processing, the pattern consists of persisting them into an outbox table rather than publishing directly to the message broker. The event is stored in this outbox table, but it's essential that everything happens within the same transaction that changes the aggregate and raises the event we want to fan out. That way, everything is committed simultaneously: if one of the counterparts fails, nothing is partially committed. It's all or nothing.

In another process, a background worker will fetch all unprocessed messages and only then publish them to the message broker. Everything else remains the same with the Kafka consumer worker, and so on.

The OutboxMessages table

To demonstrate a basic structure for the outbox table, I built it with the following fields:

Column        |          Type          | Modifiers
--------------+------------------------+-----------
Id            | GUID                   | not null
Payload       | jsonb                  | not null
Type          | character varying      | not null
OcurredAt     | datetime               | not null
ProcessedAt   | datetime               | null
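In code, that row maps to a simple entity. A possible shape (a sketch; the actual class in the repo may differ):

public class OutboxMessage
{
    public Guid Id { get; set; }
    public string Payload { get; set; }        // the serialized domain event, stored as jsonb
    public string Type { get; set; }           // the event type name, used later for deserialization
    public DateTime OcurredAt { get; set; }    // when the domain event was raised
    public DateTime? ProcessedAt { get; set; } // null until the worker publishes the message
}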


Producing messages


Alright, let’s play around with this naive implementation of a bug management system. The goal is to generate a Ticket to solve a bug, containing a title, description, and priority. Go to Domain/Outbox_101.Domain/Tickets and you will find the Ticket aggregate root, which has a few actions allowing it to change its status. Every time a ticket is created, its initial status is Open, and this raises a TicketCreated domain event.

The same idea goes for the other methods that move the ticket to InProgress and Closed, as sketched below.
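A rough sketch of what such an aggregate looks like (simplified; the method and event names here are illustrative, so check the repo for the real implementation):

public class Ticket : AggregateRoot // the base class collects uncommitted domain events
{
    public Guid Id { get; private set; }
    public string Title { get; private set; }
    public string Description { get; private set; }
    public TicketPriority Priority { get; private set; }
    public TicketStatus Status { get; private set; }

    public static Ticket Open(string title, string description, TicketPriority priority)
    {
        var ticket = new Ticket
        {
            Id = Guid.NewGuid(),
            Title = title,
            Description = description,
            Priority = priority,
            Status = TicketStatus.Open
        };
        ticket.RaiseEvent(new TicketCreated(ticket.Id)); // creation always raises a domain event
        return ticket;
    }

    public void StartProgress()
    {
        Status = TicketStatus.InProgress;
        RaiseEvent(new TicketInProgress(Id));
    }

    public void Close()
    {
        Status = TicketStatus.Closed;
        RaiseEvent(new TicketClosed(Id));
    }
}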

Also, there are two console apps that run simultaneously: Outbox_101.EventProducer and Outbox_101.EventConsumer. You can run everything in Docker using:

docker-compose up

Then, the producer will start by calling a static method along these lines:
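In simplified form (the argument values and the IUnitOfWork name are illustrative):

public static async Task ProduceAsync(IUnitOfWork unitOfWork)
{
    // Building the aggregate raises the TicketCreated domain event internally.
    var ticket = Ticket.Open("Fix login bug", "Users can't sign in", TicketPriority.High);

    // Queue the new ticket for insertion into the Tickets table.
    unitOfWork.Tickets.Add(ticket);

    // Commit the aggregate and its outboxed events in a single transaction.
    await unitOfWork.SaveChangesAsync();
}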

The code above does the following steps:

  • A Ticket is built and causes a domain event.
  • Ticket is added to the Tickets table through UnitOfWork.Tickets repository.
  • SaveChangesAsync is called to commit the whole transaction.
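The interesting part lives inside that SaveChangesAsync. Roughly, it does something like this (a sketch: OutboxUncommitedEvents() is the repo's extension method, while the surrounding shape is illustrative):

public async Task<int> SaveChangesAsync(CancellationToken cancellationToken = default)
{
    // Collect every uncommitted domain event from the aggregates EF Core is
    // currently tracking and turn each one into an OutboxMessage row.
    var outboxMessages = _dbContext.ChangeTracker
        .Entries<AggregateRoot>()
        .SelectMany(entry => entry.Entity.OutboxUncommitedEvents());

    _dbContext.Set<OutboxMessage>().AddRange(outboxMessages);

    // A single commit covers both the aggregates and their outbox rows:
    // all or nothing.
    return await _dbContext.SaveChangesAsync(cancellationToken);
}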

The code above does nothing more than get every uncommitted domain event from the currently tracked aggregates (in this case, Ticket). The OutboxUncommitedEvents() extension method builds an OutboxMessage object for each. Ultimately, _dbContext.SaveChangesAsync() commits the whole transaction, which includes the Tickets and their respective outboxed domain events. Having this logic inside SaveChangesAsync is not part of the pattern itself, but it certainly avoids repeating the operation every time.

Processing outbox messages

We can use a background service to process the messages saved in the database. Look at Infrastructure/Outbox_101.Infrastructure.Workers/Outbox/PollingOutboxMessageProcessingWorker.cs. With the help of the OutboxMessageProcessor service, it fetches a batch of unprocessed messages within a defined interval, publishes each message to Kafka, and finally marks the message as processed. You must ensure that last step happens, for the obvious reason of not propagating the same message again.
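The core loop boils down to something like this (a simplified sketch; the interval and the ProcessBatchAsync name are illustrative):

public class PollingOutboxMessageProcessingWorker : BackgroundService
{
    private readonly IOutboxMessageProcessor _processor;
    private readonly TimeSpan _interval = TimeSpan.FromSeconds(5); // illustrative default

    public PollingOutboxMessageProcessingWorker(IOutboxMessageProcessor processor)
        => _processor = processor;

    protected override async Task ExecuteAsync(CancellationToken stoppingToken)
    {
        while (!stoppingToken.IsCancellationRequested)
        {
            // Fetch a batch where ProcessedAt is null, publish each payload to
            // Kafka, then set ProcessedAt so the row is never picked up again.
            await _processor.ProcessBatchAsync(stoppingToken);

            await Task.Delay(_interval, stoppingToken);
        }
    }
}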


Consuming messages


The final component is the EventConsumer application, which also hosts a background service implementing a Kafka consumer. It subscribes to the Tickets topic, consumes its messages, deserializes the payload, and publishes the events using the IEventDispatcher service. This dispatcher was implemented using Mediator, which provides the INotification interface that the EventBase class inherits from; so when the notification is published, it is handled by the TicketHandler subscribed to it.
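In essence, the consumer loop looks something like this (a sketch built on Confluent.Kafka; the dispatcher method name and the direct deserialization to TicketCreated are simplifications, since the real code uses the stored Type to pick the event):

protected override async Task ExecuteAsync(CancellationToken stoppingToken)
{
    using var consumer = new ConsumerBuilder<string, string>(_consumerConfig).Build();
    consumer.Subscribe("Tickets");

    while (!stoppingToken.IsCancellationRequested)
    {
        var result = consumer.Consume(stoppingToken);

        // Deserialize the outboxed payload back into a domain event...
        var @event = JsonSerializer.Deserialize<TicketCreated>(result.Message.Value);

        // ...and publish it as a Mediator notification, so the subscribed
        // TicketHandler takes over from here.
        await _eventDispatcher.Publish(@event, stoppingToken);
    }
}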

The result of the whole flow is what you can see below:

At this point, we achieved the goal: messages are delivered at least once. You can experiment by shutting the Kafka container down, sending some more new messages, and turning it back on later. With this logic, once the broker is up and running again, the worker catches up with the unprocessed messages from the outbox table so they can be consumed further.


Change Data Capture (CDC) with Debezium


What if we could do the same thing without constantly polling the database? What if, once the event is inserted into the OutboxMessages table, the message could be published to Kafka immediately? Meet Debezium, an implementation of the Change Data Capture (CDC) pattern:


The Kafka Connect PostgreSQL Change Data Capture (CDC) Source connector (Debezium) for Confluent Cloud can obtain a snapshot of the existing data in a PostgreSQL database and then monitor and record all subsequent row-level changes to that data. The connector supports Avro, JSON Schema, Protobuf, or JSON (schemaless) output data formats. All of the events for each table are recorded in a separate Apache Kafka® topic. The events can then be easily consumed by applications and services.


The description above is self-explanatory. Debezium will free you from writing all that boilerplate for fetching, processing, and publishing messages. I’m still experimenting with this technology, but I wanted to show you an alternative to database polling. You can read and learn more from their documentation.

Like everything in this project, the environment for this is defined in the docker-compose.yml file.

NOTE: it’s important to mention that I had to configure Postgres to use wal_level=logical, as explained here.

Connector configuration

If you check Docker, you will notice the connect container running on port 8083, but it’s not configured yet. We need to configure the Outbox Event Router with Debezium; read more about each configuration parameter in their documentation. You just need to PUT a connector config to http://localhost:8083/connectors/{connector-name}/config
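For illustration, the registration can be done with a plain HTTP PUT. The config below is a minimal sketch adapted from Debezium's Postgres connector and outbox event router documentation; the connector name and property values are examples, not the exact contents of the repo's debezium-ticket-connector.json:

using var client = new HttpClient();

// Minimal Debezium Postgres connector config with the outbox EventRouter
// transform. All values are illustrative.
const string config = """
{
  "connector.class": "io.debezium.connector.postgresql.PostgresConnector",
  "plugin.name": "pgoutput",
  "topic.prefix": "outbox101",
  "database.hostname": "postgres",
  "database.port": "5432",
  "database.user": "postgres",
  "database.password": "postgres",
  "database.dbname": "outbox101",
  "table.include.list": "public.OutboxMessages",
  "transforms": "outbox",
  "transforms.outbox.type": "io.debezium.transforms.outbox.EventRouter"
}
""";

var response = await client.PutAsync(
    "http://localhost:8083/connectors/ticket-connector/config",
    new StringContent(config, Encoding.UTF8, "application/json"));

response.EnsureSuccessStatusCode();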

If you’re already running the project, let’s start over with a few changes; otherwise, you can skip this step.

  • 1: First, stop the containers with the following command:

    docker-compose down

  • 2: Delete all containers:

    docker rm -f $(docker ps -a -q)

I already put the JSON body with the configuration I used in Outbox_101.EventProducer/debezium-ticket-connector.json, so the only thing you need to do is go to the Outbox_101.EventProducer/Program.cs host configuration. There, the AddOutboxSetup extension method has an optional parameter to start another background service that will PUT the configuration for Debezium instead of the workers we have used so far. All you need to do is set it to true and rerun the containers.

services
    .AddHttpClient()
    .AddPersistence(configuration)
    .AddOutboxSetup(configuration, true);

Everything should work like before, except that if you check the OutboxMessages table, the ProcessedAt field will now stay null, because the messages are no longer managed by the OutboxMessageProcessor but by Debezium. This project is just a demo for the outbox pattern, so I used the same model interchangeably, although that’s different from what you’d do in a real-life scenario.

I’m not using a schema registry here, but you can check this documentation to learn more about Avro serialization.


Final thoughts


This article showed a basic implementation of the Outbox pattern and the problem it intends to solve: guaranteeing message delivery between applications that use events to exchange data. Bear in mind that you can use any of the many message brokers available to stream messages, since the pattern is not technology-specific; however, using CDC can have more limited support and may require extra configuration in the database and the message streamer.


Check the project on GitHub



This post is licensed under CC BY 4.0 by the author.