The emitter.hasRequests method does not take into account the buffer size #2465

Open
Sgitario opened this issue Feb 1, 2024 · 10 comments
@Sgitario

Sgitario commented Feb 1, 2024

We need to send a large number of events at the same time to a Kafka topic through an emitter:

public class EmitterService {
  private final Emitter<MyMessage> emitter;

  public EmitterService(@Channel("events-out") Emitter<MyMessage> emitter) {
    this.emitter = emitter;
  }

  public void send(Message<MyMessage> message) {
    emitter.send(message);
  }
}

Then, when sending a large number of events, we get the following exception:

java.lang.IllegalStateException: SRMSG00034: Insufficient downstream requests to emit item
        at io.smallrye.reactive.messaging.providers.extension.ThrowingEmitter.emit(ThrowingEmitter.java:65)
        at io.smallrye.reactive.messaging.providers.extension.AbstractEmitter.emit(AbstractEmitter.java:176)
        at io.smallrye.reactive.messaging.providers.extension.EmitterImpl.send(EmitterImpl.java:47)
        at com.redhat.swatch.kafka.EmitterService.send(EmitterService.java:57)

This is an expected situation: we are suffering from network slowness that we can't control, and the Kafka broker does not respond as quickly as we would like.

To address this issue, we're implementing the following back-pressure strategy:

public class EmitterService {
  private final Emitter<MyMessage> emitter;

  public EmitterService(@Channel("events-out") Emitter<MyMessage> emitter) {
    this.emitter = emitter;
  }

  public void send(Message<MyMessage> message) {
    while (!emitter.hasRequests()) {
      sleep();
    }

    emitter.send(message);
  }

  private void sleep() {
    try {
      Thread.sleep(50);
    } catch (InterruptedException ex) {
      log.warn("Thread was interrupted to send messages to the Kafka topic. ", ex);
      Thread.currentThread().interrupt();
    }
  }
}

However, we're hitting the same exception as above. The problem seems to be that the hasRequests method takes into account only the downstream capacity (configurable through the max-inflight-messages property, default 1024), whereas the exception is thrown when the buffer limit is exceeded (configurable through the annotation @OnOverflow(value = BUFFER, bufferSize = XX), default 128).

Our existing workaround is to configure the max-inflight-messages to 128 (which is the default value of the buffer size).

Implementation ideas

  • Either the ThrowingEmitter.requested method could return the minimum of the buffer capacity and the downstream emitter capacity:
    public long requested() {
      return Math.min(requested.get(), delegate.requested());
    }
  • Or somehow expose the current capacity of the buffer, so we can decide whether to wait before sending.
  • Or implement the back-pressure strategy in a different way (proposals are welcome).
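
One possible shape for the third idea, offered only as a sketch: cap the number of unacknowledged sends with a semaphore and release a permit when the send completes. BoundedSender and its sendFn parameter are hypothetical names, not part of SmallRye Reactive Messaging; the sketch assumes the send function returns a CompletionStage that completes on acknowledgement, as Emitter.send(payload) does.

```java
import java.util.concurrent.CompletionStage;
import java.util.concurrent.Semaphore;
import java.util.function.Function;

/** Hypothetical helper: caps the number of unacknowledged sends with a semaphore. */
class BoundedSender<T> {
  private final Semaphore permits;
  // Assumption: sendFn returns a stage that completes when the item is acknowledged.
  private final Function<T, CompletionStage<Void>> sendFn;

  BoundedSender(int maxInFlight, Function<T, CompletionStage<Void>> sendFn) {
    this.permits = new Semaphore(maxInFlight);
    this.sendFn = sendFn;
  }

  /** Blocks the caller while maxInFlight sends are still awaiting completion. */
  CompletionStage<Void> send(T payload) throws InterruptedException {
    permits.acquire();
    try {
      // Give the permit back on success or failure of the send.
      return sendFn.apply(payload).whenComplete((ok, err) -> permits.release());
    } catch (RuntimeException e) {
      // The send function failed synchronously, so no completion callback will run.
      permits.release();
      throw e;
    }
  }

  int availablePermits() {
    return permits.availablePermits();
  }
}
```

With a real emitter this might be wired as new BoundedSender<MyMessage>(128, payload -> emitter.send(payload)), choosing a limit no larger than the configured buffer size. Whether this interacts well with the overflow strategy has not been verified here.
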
@ozangunalp
Collaborator

I'll create a reproducer in the upstream repo (https://github.com/smallrye/smallrye-reactive-messaging), where we have tests to inject latency.

@ozangunalp
Collaborator

@Sgitario here are some tests simulating your case: #2466

Could you look at how we can improve the situation in this controlled scenario?

@ozangunalp ozangunalp self-assigned this Feb 1, 2024
@Sgitario
Author

Sgitario commented Feb 1, 2024

> @Sgitario here are some tests simulating your case: #2466
>
> Could you look at how we can improve the situation in this controlled scenario?

Many thanks @ozangunalp ! I will give my feedback asap (probably tomorrow)

@ozangunalp
Collaborator

No worries. I think that with the hasRequests check in place you should never get the error Insufficient downstream requests to emit item. It may be a race condition.

@Sgitario
Author

Sgitario commented Feb 1, 2024

> No worries. I think that with the hasRequests check in place you should never get the error Insufficient downstream requests to emit item. It may be a race condition.

I think the problem is that the hasRequests method depends only on the max-inflight-messages property, whose default value is 1024, but the exception is thrown depending on the buffer size, whose default value is 128.
Therefore, if you rely on the hasRequests method and keep sending more than 128 messages (in a poorly performing environment), you will eventually hit the exception.

@Sgitario
Author

Sgitario commented Feb 2, 2024

@ozangunalp I took a look at the tests you wrote and they exercise the use case perfectly, many thanks!
As expected, the tests are failing.
My proposal is to change the ThrowingEmitter.requested() method to:

public long requested() {
  return Math.min(requested.get(), delegate.requested());
}

So, it takes into account both capacities (the buffer size and the downstream capacity).
I'm not sure about the correctness of this solution, but we can't expose the buffer size capacity because this is an implementation detail that strongly depends on the emitter configuration and thus we can't add a method for this into the Emitter interface.

With the above change, the tests are now passing.

@ozangunalp
Collaborator

I'll push more cases once I finish. I am a bit puzzled about how the requested() change fixes the tests. (It doesn't change anything for me.)

For example, why not return requested.get() directly instead?

I still think that you are hitting that error case because of a race condition.

I reckon that if you wrap the requested check and the send in a synchronized block or a lock, you would not hit the error.
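
A minimal sketch of that suggestion, assuming a lock around the check and the send. RequestAwareEmitter is a hypothetical stand-in for the two Emitter methods discussed in this thread (hasRequests and send), and GuardedSender is an illustrative name; neither is part of the library.

```java
import java.util.concurrent.locks.ReentrantLock;

/** Hypothetical stand-in for the two Emitter methods used in this thread. */
interface RequestAwareEmitter<T> {
  boolean hasRequests();

  void send(T item);
}

/** Makes the hasRequests() check and the send() call a single-entry step. */
class GuardedSender<T> {
  private final RequestAwareEmitter<T> emitter;
  private final ReentrantLock lock = new ReentrantLock();

  GuardedSender(RequestAwareEmitter<T> emitter) {
    this.emitter = emitter;
  }

  /** Blocks until the emitter reports capacity, then sends while still holding the lock. */
  void send(T item) throws InterruptedException {
    lock.lockInterruptibly();
    try {
      while (!emitter.hasRequests()) {
        // Other senders queue behind the lock, so only one thread polls at a time.
        Thread.sleep(50);
      }
      // No other thread can consume the capacity between the check and this call.
      emitter.send(item);
    } finally {
      lock.unlock();
    }
  }
}
```

Because only one thread at a time can pass the check and send, concurrent callers cannot all observe the same spare capacity and then overshoot it. The cost is that all senders are serialized on the lock.
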

@Sgitario
Author

Sgitario commented Feb 2, 2024

> For example, why not return requested.get() directly instead?

Because if the max-inflight-messages value is greater than the bufferSize, we would hit the same issue (a different exception might be thrown because the downstream capacity is violated).

> I'll push more cases once I finish. I am a bit puzzled about how the requested() change fixes the tests. (It doesn't change anything for me.)

This is odd. I could reproduce the issue with the tests you wrote, and after I changed the ThrowingEmitter.requested() method, the same tests that were failing passed.

Note that the hasRequests() method internally uses the ThrowingEmitter.requested() method, so it will block the emission of messages because of the bufferSize limit.

Sgitario added a commit to RedHatInsights/rhsm-subscriptions that referenced this issue Feb 2, 2024
This solution has been agreed with Clement (the maintainer of the Quarkus messaging extensions). He suggested using the emitter.hasRequests method to be sure the broker can handle more messages. hasRequests works like a capacity barrier: once we have sent up to 1024 messages (the default value) without getting an ack from the broker, we wait before sending more messages.

Update: there seem to be two upper limits: (1) the one configured by max-inflight-messages, which is the one used by the hasRequests method and defaults to 1024; and (2) the buffer limit, which is used by the overflow strategy and defaults to 128. Therefore, for this solution to work, we need to set the max-inflight-messages limit to the same value as the buffer limit.

Relates: smallrye/smallrye-reactive-messaging#2465
@ozangunalp
Collaborator

I just updated the tests and added a couple more of them. In my runs, the only one that's failing is the disabled one with unbounded concurrency with virtual threads without synchronization.
Previously tests were removing the latency too soon.

Here is what I've learnt from setting up these tests:

I think you are misunderstanding how ThrowingEmitter works.
The max-inflight-messages property controls the number of concurrent items the downstream (Kafka sender) requests (defaults to 1024).
So the ThrowingEmitter starts with bufferSize (default 128) available requests and adds the downstream requests (1024) on top of that.
Therefore, it won't (normally) hit the Insufficient downstream requests error until there are 1152 unsatisfied requests, that is, 1152 records sent through the Emitter that have not yet been transmitted to the broker.
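
The arithmetic above can be checked with a toy model of the accounting. This is only a sketch of the behaviour as described in this comment, not the real ThrowingEmitter code; CreditModel and its method names are made up for illustration.

```java
import java.util.concurrent.atomic.AtomicLong;

/** Toy model of the credit accounting described above; not the real ThrowingEmitter. */
class CreditModel {
  private final AtomicLong credits;

  /** Starts with bufferSize credits, as the comment describes. */
  CreditModel(long bufferSize) {
    this.credits = new AtomicLong(bufferSize);
  }

  /** The downstream asks for n more items, which adds n credits. */
  void request(long n) {
    credits.addAndGet(n);
  }

  /** Consumes one credit, or fails with the same message as SRMSG00034 when none are left. */
  void emit() {
    // getAndUpdate returns the value before the update; decrement only if positive.
    long before = credits.getAndUpdate(c -> c > 0 ? c - 1 : c);
    if (before <= 0) {
      throw new IllegalStateException("Insufficient downstream requests to emit item");
    }
  }

  long remaining() {
    return credits.get();
  }
}
```

In this model, new CreditModel(128) followed by request(1024) accepts 1152 calls to emit() when nothing is acknowledged in between, and the next call throws.
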

However, if the hasRequests() check is used, as you've noticed, it effectively looks at the downstream requested count (and not at the emitter's own requested counter) and stops at 1024. With high levels of concurrency, if the hasRequests check is not single-entry, you can spill into the buffer, which is usually not a problem, thanks to the buffer.

Depending on the level of concurrency and the Kafka send latency, if the hasRequests check is not synchronized you'll eventually hit the race condition, fill the buffer and hit the insufficient downstream requests error.
This is shown by the disabled test using virtual threads (VTs).

Please tell me if you are having different results running these tests.

@Sgitario
Author

Sgitario commented Feb 5, 2024

> Depending on the level of concurrency and the Kafka send latency, if the hasRequests check is not synchronized you'll eventually hit the race condition, fill the buffer and hit the insufficient downstream requests error. This is shown by the disabled test using VTs.

This is indeed our case. We're hitting this issue in our production environment that uses Java 17.
