[Bug Report]: RetryForever stuck after partitions rebalance #151

Open
SonicGD opened this issue Apr 23, 2024 · 3 comments
Labels: bug (Something isn't working), high priority (High priority issue)

Comments


SonicGD commented Apr 23, 2024

Prerequisites

  • I have searched issues to ensure it has not already been reported

Description

Hello. We are using RetryForeverMiddleware in our project and have run into some strange behavior. If a partition rebalance happens (another worker joins or leaves the group) while a message is in the retry loop, the loop stops and processing of that partition does not continue.

I'm not sure this is a retry middleware problem; it may be caused by KafkaFlow, Confluent.Kafka, or librdkafka. But let's start here =)
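
For context, here is a minimal sketch of how a consumer with RetryForever is typically wired up in KafkaFlow. It is not copied from the linked repo: the broker address, buffer/worker counts, deserializer choice, retry delays, and exact package namespaces are assumptions to verify against your versions; only the topic name and group id match the logs later in this thread.

using System;
using KafkaFlow;
using KafkaFlow.Retry;
using KafkaFlow.Serializer;
using Microsoft.Extensions.DependencyInjection;

var services = new ServiceCollection();

services.AddKafka(kafka => kafka
    .AddCluster(cluster => cluster
        .WithBrokers(new[] { "localhost:9092" })          // assumed broker address
        .AddConsumer(consumer => consumer
            .Topic("test-topic")
            .WithGroupId("consumers")
            .WithBufferSize(100)                          // assumed value
            .WithWorkersCount(8)                          // assumed value
            .AddMiddlewares(middlewares => middlewares
                .AddDeserializer<JsonCoreDeserializer>()  // assumed serializer choice
                // RetryForever keeps retrying the failing message, pausing the
                // consumer between attempts.
                .RetryForever(config => config
                    .Handle<InvalidOperationException>()  // the "bad" message throws this
                    .WithTimeBetweenTriesPlan(
                        TimeSpan.FromSeconds(30),
                        TimeSpan.FromMinutes(2)))         // assumed delays
                .AddTypedHandlers(handlers => handlers
                    .AddHandler<MessagesHandler>())))));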

Steps to reproduce

  1. Clone the example project: https://github.com/SonicGD/KafkaRetryStuck.
  2. You can use the docker-compose.yml from the sample repo to start Kafka and ZooKeeper.
  3. Build the project and go to the bin/Debug/net7.0 directory.
  4. Run the app without any parameters; this will create the topic and fill it with messages. One "bad" message will be produced.


  5. Now we can start our consumers, for example:
./KafkaRetryStuck.exe 1
./KafkaRetryStuck.exe 2
./KafkaRetryStuck.exe 3


  6. Now we wait until one of the consumers goes into the retry loop.


  7. After that, we restart any other consumer to trigger a rebalance.


  8. After the rebalance, partitions are assigned again.


  9. And then nothing happens. We can see that there is lag for the partition, but no attempts to continue processing.

  10. If we restart this stuck consumer, processing begins again.


Expected behavior

After the rebalance is complete, the consumer should start processing the "bad" message again.

Actual behavior

The consumer is stuck and processing has stopped.

KafkaFlow Retry Extensions version

3.0.1


massada commented Jul 20, 2024

Hello @SonicGD
I've pinpointed the problem in KafkaFlow.

The issue lies in the partition-revocation handling code. The service responsible for pausing/resuming the consumer is stopped before the retry loop has a chance to resume the consumer, so the resume call effectively does nothing.
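
To make that concrete, here is a deliberately simplified, hypothetical illustration of the race; these are invented names, not KafkaFlow's real internal types. The revoke handling clears the pause-tracking state while the retry loop is still waiting, so the later resume call has nothing to act on.

using System;
using System.Collections.Generic;

// Hypothetical sketch, not KafkaFlow internals.
class PauseTracker
{
    private readonly List<int> pausedPartitions = new();

    // The retry middleware pauses a partition and records it here.
    public void Pause(int partition) => this.pausedPartitions.Add(partition);

    // The partitions-revoked handling clears the tracker during the rebalance,
    // while the retry loop is still sleeping between attempts.
    public void Clear() => this.pausedPartitions.Clear();

    // When the retry loop finally resumes, the list is already empty, so the
    // underlying consumer is never resumed and the partition stays paused.
    public void ResumeAll(Action<int> resumeUnderlyingConsumer)
    {
        foreach (var partition in this.pausedPartitions)
        {
            resumeUnderlyingConsumer(partition);
        }
    }
}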

I've opened an issue there where you can read about it in a bit more detail, and I've also opened a PR with a fix.


massada commented Jul 20, 2024

Also, if you'd like to see the problem manifesting in your repo, change your handler to the following code:

using KafkaFlow;
using KafkaFlow.Consumers;
using Microsoft.Extensions.Logging;

namespace KafkaRetryStuck;

class MessagesHandler : IMessageHandler<TestMessage>
{
    private readonly IConsumerAccessor consumerAccessor;

    private readonly ILogger<MessageHeaders> logger;

    public MessagesHandler(IConsumerAccessor consumerAccessor, ILogger<MessageHeaders> logger)
    {
        this.consumerAccessor = consumerAccessor;
        this.logger = logger;
    }

    public async Task Handle(IMessageContext context, TestMessage message)
    {
        // Log how many partitions the current consumer has paused.
        var pausedPartitions = this.consumerAccessor.GetConsumer(context.ConsumerContext.ConsumerName)
            .PausedPartitions;

        this.logger.LogInformation($"Paused partitions count {pausedPartitions.Count}");

        await Task.Delay(TimeSpan.FromSeconds(1));

        // The "bad" message keeps throwing, so the retry middleware never succeeds.
        if (message.Id == TestMessage.BadId)
        {
            throw new InvalidOperationException("BAD MESSAGE");
        }
    }
}

It prints the number of paused partitions. You will see it print 0 right before the last retry of the loop; this is when the internal list of topic+partitions was cleared, and that is why the resume doesn't work.

22:33:46 info: KafkaFlow.MessageHeaders[0] Paused partitions count 8
22:33:58 warn: KafkaFlow[0] Partitions revoked | Data: {"GroupId":"consumers","ConsumerName":"consumer-2","Topics":[{"Topic":"test-topic","PartitionsCount":8,"Partitions":[1,4,7,10,13,16,19,22]}]}
22:34:12 info: KafkaFlow.MessageHeaders[0] Paused partitions count 0
22:34:13 fail: KafkaFlow[0] Exception captured by RetryForeverMiddleware. Retry in process. | Data: {"AttemptNumber":3,"WaitMilliseconds":120000,"PartitionNumber":16,"Worker":5,"ExceptionType":"System.InvalidOperationException"}
System.InvalidOperationException: BAD MESSAGE
   at KafkaRetryStuck.MessagesHandler.Handle(IMessageContext context, TestMessage message) in C:\source\KafkaRetryStuck\KafkaRetryStuck\MessagesHandler.cs:line 30
   at KafkaFlow.Middlewares.TypedHandler.TypedHandlerMiddleware.Invoke(IMessageContext context, MiddlewareDelegate next)
   at KafkaFlow.Middlewares.Serializer.DeserializerConsumerMiddleware.Invoke(IMessageContext context, MiddlewareDelegate next)
   at Polly.AsyncPolicy.<>c__DisplayClass40_0.<<ImplementationAsync>b__0>d.MoveNext()
--- End of stack trace from previous location ---
   at Polly.Retry.AsyncRetryEngine.ImplementationAsync[TResult](Func`3 action, Context context, CancellationToken cancellationToken, ExceptionPredicates shouldRetryExceptionPredicates, ResultPredicates`1 shouldRetryResultPredicates, Func`5 onRetryAsync, Int32 permittedRetryCount, IEnumerable`1 sleepDurationsEnumerable, Func`4 sleepDurationProvider, Boolean continueOnCapturedContext)
22:34:28 info: KafkaFlow[0] Consumer resumed by retry process | Data: {"ConsumerGroup":"consumers","ConsumerName":"consumer-2","Worker":5}


SonicGD commented Jul 21, 2024

Thank you so much! This issue caused us many problems. Our current fix is to use a custom retry middleware that stores the list of paused partitions, plus an assignment handler that restarts the consumer if those paused partitions are assigned again. We will wait for your fix to be merged :)
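
A rough sketch of that kind of workaround follows. It rests on several assumptions: the WithPartitionsAssignedHandler hook, the generic IDependencyResolver.Resolve<T>() extension, and IMessageConsumer.Restart() are assumed to exist in the KafkaFlow version in use, and every type and consumer name below is hypothetical rather than the reporter's actual code.

using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Linq;
using Confluent.Kafka;
using KafkaFlow;
using KafkaFlow.Consumers;

namespace KafkaRetryStuck;

// Hypothetical helper: remembers which partitions the custom retry middleware paused,
// so the assignment handler can detect when those partitions come back after a rebalance.
public class PausedPartitionsTracker
{
    private readonly ConcurrentDictionary<string, ConcurrentDictionary<int, byte>> paused = new();

    // Called by the custom retry middleware right after it pauses the consumer.
    public void MarkPaused(string consumerName, int partition) =>
        this.paused.GetOrAdd(consumerName, _ => new ConcurrentDictionary<int, byte>())[partition] = 0;

    public void Clear(string consumerName) => this.paused.TryRemove(consumerName, out _);

    // True if any newly assigned partition was paused by the retry loop before the rebalance.
    public bool AnyPausedReassigned(string consumerName, IEnumerable<TopicPartition> assigned) =>
        this.paused.TryGetValue(consumerName, out var partitions) &&
        assigned.Any(tp => partitions.ContainsKey(tp.Partition.Value));
}

public static class RebalanceWorkaround
{
    // Intended to be wired in via the consumer configuration, e.g.
    // .WithPartitionsAssignedHandler((resolver, partitions) =>
    //     OnPartitionsAssigned(resolver, "consumer-1", partitions))
    // where both the hook and the consumer name are assumptions to verify.
    public static void OnPartitionsAssigned(
        IDependencyResolver resolver,
        string consumerName,
        IReadOnlyCollection<TopicPartition> assignedPartitions)
    {
        var tracker = resolver.Resolve<PausedPartitionsTracker>();
        var consumer = resolver.Resolve<IConsumerAccessor>().GetConsumer(consumerName);

        if (tracker.AnyPausedReassigned(consumerName, assignedPartitions))
        {
            tracker.Clear(consumerName);

            // Restart the consumer so it drops the stale paused state and
            // starts consuming the reassigned partitions again.
            _ = consumer.Restart();
        }
    }
}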
