Retrying consumer architecture in the Apache Kafka

Łukasz Kyć

In this article I would like to explain what kind of problems can we encounter in Kafka messages processing and how can we deal with them. Before you go further basic Kafka knowledge is recommended.

Basics of Apache Kafka

Apache Kafka is a popular distributed streaming platform. It is used for building real-time data pipelines, but because of persistence of topics it can be also used as the messages stream storage for processing historical data. In order to improve the scalability Kafka topic consists of one or more partitions. Under the hood one partition is represented as one append-only file. Using such a simple data structure results in a very high throughput. The internal topic format has very important implication – we can consume messages from one topic partition one by one in sequential order only.
The sequential anatomy of a topic partition is presented in the picture below:

Message processing problems

Implementation of a consumer that processes messages immediately just after receiving them from the Kafka topic is very straightforward. Unfortunately, the reality is much more complicated and the message processing might fail because of various reasons. Some of those reasons are permanent problems, like failure on the database constraint or invalid message format. Other, like temporary unavailability of dependent system that is involved in message handling, can be resolved in the future. In those cases retrying of the message processing might be a valid solution.

Simple retry logic

In the simplest form we can just retry message processing indefinitely with some fixed delay between subsequent retries. Sample pseudo-code of the consumer might look like this:


void consumeMainTopicWithSimpleRetry() {
    while (true) {
        boolean processedSuccessfully = true;
        Message message = takeNextMessage("main_topic");
        do {
            try {
                process(message);
            } catch (Exception ex) {
                waitSomeTime();
                processedSuccessfully = false;
                LOGGER.warn("Message processing failure. Will try once again.", ex);
            }
        } while (!processedSuccessfully);
    }
}

Non-blocking retry logic

In streaming systems, like Kafka, we cannot skip messages and come back to them later. Once we move the pointer, called offset in Kafka, of current message we cannot go back. Just for simplicity let’s assume that the consumer offset is remembered just after successful message processing. In such situation we cannot take the next message unless we process the current successfully. If processing single message fails constantly it stops system from handling next messages. It is obvious we would like to avoid such scenario because very often failure of one message handling does not imply failure of next messages handling. Moreover, after longer time, for example one hour, the processing of failed messages may succeed for various reason. On of them can be, that the system we are depending on, is up once again. What can we do then to improve this naive implementation?

On message processing failure we can publish a copy of the message to another topic and wait for the next message. Let’s call the new topic the ‘retry_topic’. The consumer of the ‘retry_topic’ will receive the message from the Kafka and then will wait some predefined time, for example one hour, before starting the message processing. This way we can postpone next attempts of the message processing without any impact on the ‘main_topic’ consumer. If processing in the ‘retry_topic’ consumer fails we just have to give up and store the message in the ‘failed_topic’ for further manual handling of this problem. The ‘main_topic’ consumer code may look like this:


void consumeMainTopicWithPostponedRetry() {
    while (true) {
        Message message = takeNextMessage("main_topic");
        try {
            process(message);
        } catch (Exception ex) {
            publishTo("retry_topic");
            LOGGER.warn("Message processing failure. Will try once again in the future.", ex);
        }
    }
}

And the code of the ‘retry_topic’ consumer:


void consumeRetryTopic() {
    while (true) {
        Message message = takeNextMessage("retry_topic");
        try {
            process(message);
            waitSomeLongerTime();
        } catch (Exception ex) {
            publishTo("failed_topic");
            LOGGER.warn("Message processing failure. Will skip it.", ex);
        }
    }
}

Flexible non-blocking retry logic

The aforementioned approach looks good, but there are still some elements to improve. The depending system may be down for longer time than we expected. To solve the problem we should retry many times before we finally give up. In order to avoid flooding the external system or overusing the CPU because of the retry logic, we can increase the interval for subsequent attempts. Let’s improve the logic!

Assuming we want to have the following retrying strategy:
– Every 5 minutes – 2 times
– Then after 30 minutes – 3 times
– Then after 1 hour only one time
– Then we skip the message

We can represent it as a sequence of values: 5m, 5m, 30m, 30m, 30m, 1h. It also means that we have maximum 6 retries, because the sequence has 6 elements.

Now we can create 3 separate topics for retry logic handling, each for only one delay value:
– ‘retry_5m_topic’ – for retry in 5 minutes
– ‘retry_30m_topic’ – for retry in 30 minutes
– ‘retry_1h_topic’ – for retry in 1 hour

The message routing algorithm is very similar like in the previous approach. It only extends it from 1 to 3 available delay values and allows to retry predefined number of times.

Now let’s consider the following scenario. One new message was written to the topic ‘main_topic’. If the processing of this message fails, then we should try once again in 5 minutes, since 5m is the first value in the Retries Sequence. How can we do it? We should write a new message to the ‘retry_5m_topic’ that wraps the failed message and adds 2 fields:
– ‘retry_number’ with value 1
– ‘retry_timestamp’ with value calculated as now + 5 minutes

It means that that the ‘main_topic’ consumer delegates the responsibility of the failed message processing to another component. The ‘main_topic’ consumer is not blocked and can take the next message. The ‘retry_5m_topic’ consumer will receive the message published by the ‘main_topic’ consumer immediately. It has to read the ‘retry_timestamp’ value from the message and wait until that moment, blocking the thread. After the thread wakes up, it will try to process the message once again. If successfully then we can take the next available message. Otherwise we have to try once again because the Retries Sequence has 6 elements and current retry was the first. What we have to do is to clone the message, increment the ‘attempt_number’ value (it will be 2) and set the ‘retry_timestamp’ value as now + 5 minutes (because the second value in the Retries Sequence is 5m). The message clone will be published to the ‘retry_5m_topicv once again. You can notice that on each message processing failure, the copy of the message will be routed to one of ‘retry_5m_topic’, ‘retry_30m_topic’ or ‘retry_1h_topic’ topics. The very important thing is not to mix messages in one topic with ‘retry_timestamp’ property calculated from different delay values.
If we reach the last element in the Retries Sequence it means that it was the last attempt. Now it’s time to say “stop”. We will write the message to the ‘failed_topic’ and treat this message as not processed. Someone has to handle it manually or we just forget about it.
The picture below may help you to understand the message flow:

Summary

As you could notice, implementation of postponing message processing in case of some failures, is not a trivial thing to do. Have in mind that:
– Messages can be consumed from topic partitions in sequential order only
– You cannot skip messages and come back to them later
– If you want to postpone processing of some messages you can republish them to separate topics, one for each delay value
– Processing failed messages can be achieved by cloning the message and republishing it to one of retry topics with updated information about attempt number and next retry timestamp
– Consumers of retry topics should block the thread unless it is time to process the message
– Messages in retry topics are naturally organized in the chronological order, sorted by the ‘retry_timestamp’ field


Find Pragmatists on Facebook and Twitter

comments powered by Disqus