ReactorKafkaBinder - not respecting back pressure (identical vanilla ReactorKafka consumer does) #2957

Closed
adamsmith118 opened this issue Jun 5, 2024 · 11 comments

@adamsmith118

adamsmith118 commented Jun 5, 2024

Expected behaviour

Parity in how back pressure is handled when using the SCS ReactorKafkaBinder and ReactorKafka directly.

If this isn't expected, guidance on how to achieve parity would be appreciated.

Actual behaviour

Consider two consumer implementations performing identical tasks: a simulation of some work followed by a WebClient call.

Using Reactor Kafka Directly

max.poll.records=1 for both, to keep things simple.

  @EventListener(ApplicationStartedEvent.class)
  public Disposable start() {
    log.info("Starting receiver");

    var client = WebClient.create("http://localhost:9063/actuator/health");

    ReceiverOptions<String, String> ro = ReceiverOptions.<String, String>create...// Omitted.  See code.

    return KafkaReceiver.create(ro)
        .receive()
        .concatMap(event -> {
          log.info("Got an event: {}", event);
          pretendWork(1000);

          return client.get()
              .retrieve()
              .bodyToMono(String.class)
              .doOnNext(s -> log.info("Result = {}", s))
              .doOnSuccess(s -> event.receiverOffset().acknowledge());
        }, 1)
        .subscribe();
  }

  @SneakyThrows
  private void pretendWork(int ms) {
    if (ms > 0) {
      log.info("Sleeping for {}ms", ms);
      Thread.sleep(ms);
    }
  }

Using SCS Reactor Kafka Binder

  @Bean
  public Function<Flux<Message<String>>, Mono<Void>> test() {
    var client = WebClient.create("http://localhost:9063/actuator/health");

    return events -> events.concatMap(event -> {
      log.info("Got an event: {}", event);
      pretendWork(1000);

      return client.get()
          .retrieve()
          .bodyToMono(String.class)
          .doOnNext(s -> log.info("Result = {}", s))
          .doOnSuccess(s -> event.getHeaders().get(KafkaHeaders.ACKNOWLEDGMENT, ReceiverOffset.class).acknowledge());
    }, 1).then();
  }

  @SneakyThrows
  private void pretendWork(int ms) {
    if (ms > 0) {
      log.info("Sleeping for {}ms", ms);
      Thread.sleep(ms);
    }
  }

With Reactor Kafka (example 1), the behaviour is in line with back pressure requirements: one record is emitted at a time, and the consumer pauses as necessary.

r.k.r.internals.ConsumerEventLoop        : Emitting 1 records, requested now 0
o.a.k.clients.consumer.KafkaConsumer     : [Consumer clientId=consumer-test-group-1, groupId=test-group] Committing offsets: {test-1=OffsetAndMetadata{offset=78
r.k.r.internals.ConsumerEventLoop        : onRequest.toAdd 1, paused true
r.k.r.internals.ConsumerEventLoop        : Consumer woken
...etc

In Spring Cloud Stream, using the ReactorKafkaBinder, this isn't the case.

Here, hundreds of records are emitted to the sink:

r.k.r.internals.ConsumerEventLoop        : Emitting 1 records, requested now 0
r.k.r.internals.ConsumerEventLoop        : onRequest.toAdd 1, paused false
r.k.r.internals.ConsumerEventLoop        : Emitting 1 records, requested now 0
r.k.r.internals.ConsumerEventLoop        : onRequest.toAdd 1, paused false
r.k.r.internals.ConsumerEventLoop        : Emitting 1 records, requested now 0
r.k.r.internals.ConsumerEventLoop        : onRequest.toAdd 1, paused false
r.k.r.internals.ConsumerEventLoop        : Emitting 1 records, requested now 0
r.k.r.internals.ConsumerEventLoop        : onRequest.toAdd 1, paused false
r.k.r.internals.ConsumerEventLoop        : Emitting 1 records, requested now 0
<snip>

This causes problems if a rebalance occurs during a period of heavy load, as the pipeline can contain hundreds of pending records.
We'd need to set an intolerably high maxDelayRebalance to get through them all, or handle lots of duplicates.

Logs resembling the below are visible during a rebalance.

Rebalancing; waiting for 523 records in pipeline
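
As a rough, back-of-the-envelope illustration of why that delay would be intolerable: assuming each pending record costs about the 1000 ms of simulated work used in the examples above (and ignoring the WebClient call), draining the 523 records from the log line would take close to nine minutes. `DrainTimeEstimate` and `drainTime` are illustrative names, not part of any library.

```java
import java.time.Duration;

public class DrainTimeEstimate {

    /** Time needed to work through the pending records strictly one at a time. */
    static Duration drainTime(int pendingRecords, Duration perRecord) {
        return perRecord.multipliedBy(pendingRecords);
    }

    public static void main(String[] args) {
        // 523 comes from the rebalance log line; 1000 ms from pretendWork(1000).
        Duration estimate = drainTime(523, Duration.ofMillis(1000));
        System.out.println(estimate); // prints PT8M43S
    }
}
```

Any maxDelayRebalance shorter than this estimate would let the rebalance proceed with unacknowledged records still in the pipeline, which is where the duplicates would come from.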

Presumably something in the binder/channel implementation is causing this?

Repro

See here for complete repro.

Requires a Kafka broker on localhost:9092 and a topic called "test".

  • Producer class in test will send 100 messages.
  • demo.reactor.DemoReactorApp - Pure Reactor Kafka example
  • demo.streams.DemoStreamsApplication - Spring Cloud Stream example

DEBUG logging has been enabled for the ConsumerEventLoop for emit visibility.

Environment details

Java 21
Boot 3.2.2
SCS: 4.1.0
Reactor Kafka: 1.3.22

Loosely related issue here.

@olegz
Contributor

olegz commented Jun 5, 2024

KafkaBinder is not a reactive source or target, so it simply will not honor some of the reactive features (back pressure is one of them). That is by definition. It is an imperative binder adapted to work with reactive functions, nothing more.
If you want to rely on such features, consider using the Kafka Streams binder - a fully reactive binder.

@olegz
Contributor

olegz commented Jun 5, 2024

Tagging @sobychacko

@adamsmith118
Author

adamsmith118 commented Jun 5, 2024

@olegz This is using the Reactive Kafka Binder (i.e. the one in spring-cloud-stream-binder-kafka-reactive), not the old imperative one.

Are you saying this doesn't honour back pressure either?

To be clear - my issue is that the ReactorKafkaBinder in Spring Cloud Stream does not exhibit the same back pressure behaviour as a reactive KafkaReceiver used directly (even though the Reactive Kafka Binder uses it internally).

@olegz
Contributor

olegz commented Jun 5, 2024

No, I just wanted to clarify. With the reactive binder we need to take a look, hence tagging @sobychacko.

@adamsmith118
Author

Ok - thanks.

I'll have a stab at debugging it now.

@adamsmith118
Author

adamsmith118 commented Jun 5, 2024

If I update FluxMessageChannel to:

  • Use .publish(1).refCount() instead of share() (which specifies its own prefetch/queue defaults)
  • Remove the .publishOn(this.scheduler)

Then I get the behaviour I want (if that's in any way useful?).
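
To illustrate why the prefetch size matters here, below is a minimal sketch using only the JDK's java.util.concurrent.Flow interfaces. It is a simplified model, not Reactor's or FluxMessageChannel's actual implementation: `Source`, `PrefetchingStage` and `pending` are hypothetical names, and the stage simply requests `prefetch` records up front and then replenishes one record each time the slow consumer takes one. The value 256 is used on the assumption that it is the default of Queues.SMALL_BUFFER_SIZE.

```java
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.concurrent.Flow;

public class PrefetchDemo {

    /** Synchronous source that emits the next integer only while demand is outstanding. */
    static final class Source implements Flow.Publisher<Integer> {
        final int total;
        int emitted = 0;

        Source(int total) { this.total = total; }

        @Override
        public void subscribe(Flow.Subscriber<? super Integer> subscriber) {
            subscriber.onSubscribe(new Flow.Subscription() {
                long demand = 0;
                boolean draining = false;
                boolean done = false;

                @Override
                public void request(long n) {
                    demand += n;
                    if (draining) return; // re-entrant request: the outer loop picks it up
                    draining = true;
                    while (demand > 0 && emitted < total && !done) {
                        demand--;
                        subscriber.onNext(++emitted);
                    }
                    draining = false;
                    if (emitted == total && !done) {
                        done = true;
                        subscriber.onComplete();
                    }
                }

                @Override
                public void cancel() { done = true; }
            });
        }
    }

    /** Hypothetical buffering stage sitting between the source and a slow consumer. */
    static final class PrefetchingStage implements Flow.Subscriber<Integer> {
        final int prefetch;
        final Deque<Integer> buffer = new ArrayDeque<>();
        Flow.Subscription upstream;

        PrefetchingStage(int prefetch) { this.prefetch = prefetch; }

        @Override public void onSubscribe(Flow.Subscription s) { upstream = s; s.request(prefetch); }
        @Override public void onNext(Integer item) { buffer.add(item); }
        @Override public void onError(Throwable t) { throw new IllegalStateException(t); }
        @Override public void onComplete() { }

        /** The consumer takes one record; the stage asks upstream for one replacement. */
        Integer poll() {
            Integer next = buffer.poll();
            if (next != null) upstream.request(1);
            return next;
        }
    }

    /** Records already emitted by the source but not yet processed by the consumer. */
    static int pending(int total, int prefetch, int processed) {
        Source source = new Source(total);
        PrefetchingStage stage = new PrefetchingStage(prefetch);
        source.subscribe(stage);
        for (int i = 0; i < processed; i++) stage.poll();
        return source.emitted - processed;
    }

    public static void main(String[] args) {
        System.out.println(pending(1000, 256, 1));
        System.out.println(pending(1000, 1, 1));
    }
}
```

In this model, `pending(1000, 256, 1)` returns 256 while `pending(1000, 1, 1)` returns 1: the source is never paused until the intermediate buffer is full, so every buffering stage in the chain adds its own prefetch worth of in-flight records regardless of how slowly the final consumer requests them.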

@artembilan
Member

OK. I can see the value of .publish(1).refCount(), and FluxMessageChannel could be changed to use that instead of Queues.SMALL_BUFFER_SIZE.
But why remove .publishOn(this.scheduler)?

According to its Javadocs, publishOn is "typically used for fast publisher, slow consumer(s) scenarios".

@adamsmith118
Author

adamsmith118 commented Jun 6, 2024

Hi @artembilan,

Thanks for your response.

Whilst I see your point about publishOn(), I wonder if this choice should be left to the developer rather than being baked into the framework? For example, I can easily add this to my consumer code if needed.

Open to suggestions...

@artembilan
Member

Closed in favor of spring-projects/spring-integration#9215.
There is just nothing to do on the Spring Cloud Stream side.
The fix in Spring Integration is going to be available as a transitive dependency in the next release over here.

Thank you for your contribution!

@artembilan closed this as not planned on Jun 6, 2024
@artembilan
Member

Spring Integration 6.2.6-SNAPSHOT (or 6.3.1-SNAPSHOT) is available if you wish to give it a try in your project with Reactive Kafka Binder.

@sobychacko
Contributor

Keep in mind that you need to override the spring-integration version in the application, since the one that Spring Cloud Stream brings in is whatever is currently available via Boot (which is not the snapshot version right now).
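
For anyone trying this, one possible way to do the override is sketched below. It assumes a Maven build that inherits from spring-boot-starter-parent, where Boot's managed `spring-integration.version` property can be redefined, and it assumes the snapshot is served from the Spring snapshot repository; a Gradle build or a different parent would need the equivalent settings. The version is the one named in the comment above.

```xml
<!-- pom.xml fragment (assumption: the project inherits from spring-boot-starter-parent) -->
<properties>
  <!-- Overrides the Spring Integration version managed by Spring Boot -->
  <spring-integration.version>6.2.6-SNAPSHOT</spring-integration.version>
</properties>

<repositories>
  <!-- Snapshot builds are not published to Maven Central -->
  <repository>
    <id>spring-snapshots</id>
    <url>https://repo.spring.io/snapshot</url>
    <snapshots>
      <enabled>true</enabled>
    </snapshots>
  </repository>
</repositories>
```

Once a release containing the fix is picked up by Boot, the override should be removed so the managed version applies again.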
