Introduction

A fanout service is a service that processes events in a system and for each event it broadcasts messages to multiple destinations. For each event, it might need to first figure out who the recipients are, what needs to be broadcasted and then perform a large number of network operations to deliver messages to destinations. For example, a social media notification service might need to send notifications to millions of users.


Scope

A complex system that works is invariably found to have evolved from a simple system that worked. A complex system designed from scratch never works and cannot be patched up to make it work. You have to start over with a working simple system — Gall’s Law

We’ll define a clear scope for our service to keep our design grounded.

  • The service will provide a way to register events
  • The service will guarantee at least once delivery to the recipients
  • The service will allow defining functions to get_recipients, generate_broadcast_message, deliver_to_recipient for an event
  • For each event, the service will deliver messages to recipients
  • The service should be linearly scalable(more compute = more throughput) and should be able to support millions of events and millions of deliveries per event (this is a bit hand wavy but it’s fine as we’re dealing at an abstract level and the real design will take into account the actual use case)

Challenges

  • For each event, we need to do heavy/uneven processing for figuring out who the recipients are:
    • An event might need to fan-out to millions of recipients, whereas another event just needs to fan out to a couple of 100s of recipients
    • Figuring out who the recipients are can also take a long time, as we might want to employ a complex logic/query to decide who to fan-out to
  • Once we know the recipients, we might need to do some/large numbers of network calls to actually deliver the message

Key Observation

The delivery of messages(deliver_to_recipient) is mostly network I/O and uniform load(each message is equally expensive regardless of the event or the recipient), whereas the event processing(get_recipients and generate_broadcast_message) is event dependent. In most cases, many will be inexpensive compared to some expensive ones.

From this observation, we’re better off separating the event processing from the actual delivery. There are many benefits to this:

  • Different scaling needs: It’s easier to reason about how each workload has separate scaling needs(and even different hardware needs: better network chips for delivery and event processing will work with mediocre network cards).
    Apart from this, event processing will need scaling when we have more events or events taking more time to process, whereas delivery will need scaling when we have more messages to deliver.
  • Separation of responsibility: By separating the responsibility of event processing and delivery, we make the system easier to reason about and we can build each component to do one thing well.

Another important thing is to make event processing throughput scalable and non-blocking — An event which needs more processing time shouldn’t block other events from getting processed and we should be able to scale event processing throughput by adding more compute.

With this in mind, let’s design a super-cool fan-out system.


The Architecture

Kafka as an event bus

  • Kafka will allow the publishers to publish events to a Kafka topic
  • An event consumer group will consume those events and put the events inside a distributed queue(like SQS) for the actual processing
  • This makes consuming very fast. Consumers can process a large number of events. Additionally, we can create partitions for parallel processing

Q: Why use Kafka?
A: Using Kafka for event streaming has several advantages:

  1. High throughput event publishing due to append only log. This will allow us to create large numbers of events
  2. Durable events, Kafka stores events on disk, making it easier to replay, debug failed events
  3. Partition count as a parallelisation factor: By increasing partitions for a topic, we can scale event publishing and event consuming

Q: Why is processing event by event consumer group a bad idea?
A: Inside a Kafka topic, the parallelism is limited by the number of partitions a topic has. Along with that, if by random luck all consumers inside the group got assigned a heavy event with millions of recipients, all other events will starve until the heavy events are processed.

Event processing via Event queue and Fanout workers

The Event queue allowed us to separate event processing from event consuming and scale them independently.

When events are inside the Event queue, we can have Fanout workers poll events and do event processing(get_recipients, generate_broadcast_message).

The Fanout workers will put messages for each recipient inside the delivery queue. Once messages for each recipient are enqueued, the worker will acknowledge that the event is processed and the queue will delete the message.

If the worker fails mid-processing, the queue will retry the event again for reprocessing. More on this is covered in the failure handling section.

To increase event processing throughput, we can simply scale the Fanout workers.

Message Delivery via Delivery queues and Delivery workers

The Delivery queue allowed us to separate the event processing from the message delivery and scale them separately.

Delivery workers will dequeue the delivery queue and make a network call to deliver the message to recipients(deliver_to_recipient). Once delivery is done, they will acknowledge processing with the delivery queue.

Dequeuing in batches will be even more efficient as delivery workers can take advantage of asynchronous I/O.

It might be reasonable to have multiple delivery queues with different priorities, which will allow us to scale delivery workers for each delivery queue independently. For example, we might want to deliver some messages quickly (high priority) whereas we are okay if some messages are delayed (low priority).

Q: Why use queues?
A: Queues allow us to separate task creation/acceptation from task execution. Here, the event queue separated the event consumption(via consumer group) from event processing(via Fanout worker). Similarly, the delivery queue separated the message creation(via Fanout worker) from the actual delivery(via delivery worker).

This separation allows us to separate different types of work and scale them independently.


Failure handling

Everybody has a plan until they get punched in the mouth. — Mike Tyson

In this section, I use the term message to refer to an object inside a queue — For the event queue, message is actually an event, whereas for the delivery queue, message refers to the “actual message that needs to be delivered”.

Why are duplicate messages inevitable?

Kafka provides acknowledgement-based consumption, i.e., once the consumer processes an event, it must send an acknowledgement to Kafka. This allows Kafka to retry the event if a consumer has failed to acknowledge that event. This makes sure no event is lost in case of consumer failure.

Although now there is another issue: if a consumer dies after processing the event but before acknowledging, Kafka has no way of knowing that the event was processed, and it will retry the event again, resulting in duplicate messages inside the event queue.

It is not hard to imagine how the delivery queue can also end up with duplicate messages — The fanout worker fails during processing, leading to a partial failure. When the event is retried, it will result in duplicate messages inside the delivery queue.

Let’s imagine we store somewhere ‘if a message was added to the queue’, then before enqueuing, we check if a message was already added to a queue. If a message is already added, we can just skip it. Voilà, no duplicate events — Let’s not get ahead of ourselves. Suppose the enqueue operation succeeds but the worker dies before saving that the message was enqueued(notice this is the same as Kafka ack), and we’re back where we started.

Only when the enqueue and saving state are atomic (either both succeed or neither succeeds), can we guarantee no duplicates. Achieving such atomicity requires additional coordination between systems, which increases complexity and often reduces throughput while adding latency.

To make sure no events are lost, this duplication is acceptable for our use case and we can try to handle the duplicate events in downstream workers.

Handling duplicate messages and worker failures

As established earlier, both the event queue and the delivery queue can have duplicate messages.

To differentiate between duplicate events, during enqueue, we can pass along a unique attempt_id with each message. Now each message inside the queue can be uniquely identified using the attempt_id. As we shall see, this attempt_id will help us handle duplicate messages for both queues.

To avoid processing duplicate messages, workers can try inserting message_id:attempt_id in Redis using an atomic “get or create” operation after dequeuing from the queue, with a TTL long enough in which we expect the worker to have processed the message.

# For event queue
SET event_id attempt_id NX GET TTL

# For delivery queue
SET event_id:recipient_id attempt_id NX GET TTL

The worker that succeeds with the insert will proceed with message processing.

The worker that received an attempt_id from Redis will check it against the attempt_id of the message:

  • If attempt_ids match:

    • This means the worker who created the entry in Redis and was responsible for the processing has failed and the message is redelivered so it must be processed again.
    • We can also track how many times the message has been retried and if the retry count is exceeded, the message can be moved to a Dead letter queue. This helps us bound the max retries the system will perform.
  • If attempt_ids are different

    • This means the message is a duplicate and can be discarded.

We use Redis’s atomicity guarantee to avoid processing duplicate events.

At least once semantics

Notice how the system prevented duplicate message processing using attempt_id, but it is still possible that the same message might be processed more than once in case of partial failures (the case when attempt_ids match), though the system can guarantee that all the messages will be processed at least once.

The system cannot guarantee exactly once processing of messages without making workers idempotent or by adding atomicity. Even then, a message can be processed more than once, but it will have the same side-effects as exactly once processing, which is just as good.

Partial failures inside fanout workers can cause duplicate messages inside the delivery queue. Similarly, partial failures inside delivery workers can cause recipients to receive the same message multiple times — Eliminating this requires making recipients idempotent, though this is not always feasible.

For example, if the delivery worker is sending an email: Once the email is sent and the delivery worker dies before acknowledging, another worker will again try to send an email and since sending an email is not idempotent, the recipient will see multiple emails in their inbox.

No machine is immune to failure, and networks are inherently unreliable. Therefore partial failures are unavoidable.


Wrapping up

So we split the fanout problem into 3 simpler problems: event consumption, event processing and delivery, each independently scalable and connected with queues. We used Redis’s atomicity to handle duplicates and we also realised why any system cannot guarantee exactly once processing due to partial failures (though it can pretend to do so!).


If you found this useful, share it with someone who’d enjoy it. You can also subscribe to my RSS feed to get notified when I publish new posts.


References