-
Notifications
You must be signed in to change notification settings - Fork 1.1k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Error handling for streaming reactive controllers #10058
base: 4.5.x
Are you sure you want to change the base?
Conversation
Error handling for streaming reactive controller methods, such as those that use a `Flux` as the return type, is improved such that immediate error signals in the stream that occur before any data has been written will be routed appropriately to any user-supplied `@Error` handler methods. A new test specification is added to cover this scenario, and the TCK's `ErrorHandlerFluxTest` is improved to cover the enhanced error handling.
} | ||
bufferWritten = true; | ||
bufferedFirstValue = null; | ||
subscriber.request(Long.MAX_VALUE); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
we can't do that, backpressure must be maintained
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yes, this was a mistake. I've replace this line with an onRequest callback on the sink that will still respect demand.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
it can still lose requests when the subscription happens after the downstream request
} | ||
|
||
@SuppressWarnings("unchecked") | ||
private CorePublisher<MutableHttpResponse<?>> chunkedResponsePublisher(PropagatedContext propagatedContext, |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
this needs to be thread safe
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@yawkat Maybe I'm missing something...how is this method not thread-safe?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
for example the second accept can be called concurrently with any of the methods of StreamSubscriber
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Oh, you mean the publisher itself. That is a good point. I don't believe that will currently happen just because of the way the publisher is being consumed by the rest of the pipeline, but that is definitely too unsafe an assumption.
The general idea is correct but implementing a custom reactive publisher is complicated wrt concurrency, backpressure, nesting etc.. We regularly have bugs like #10017 even with simpler code than this. This PR needs a better reactive processor (without FluxSink etc bc of backpressure). Even better would be if you could come up with a solution using the built-in reactor operators. I'm not sure if it's possible, but if this PR used only normal operators, it'd be much easier to be sure it works for all the edge cases. |
@yawkat FluxSink supports backpressure and I have made a quick fix to take advantage of that. I agree in that I would prefer a simpler solution. I attempted several other approaches purely using the built-in operators before settling on this...they all failed to support this use case in a way that did not result in problems such as the first item in the stream needing to generated twice, etc. Now that I have a working solution and understand the necessary flow completely, I can revisit it one more time to see if there is a way to reproduce this flow purely using the built-in operators, but I too am skeptical that it is possible. |
Thinking through this some more, I agree that I need to try harder to simplify this implementation further. I've got some further ideas to try...I'm going to convert this to a draft until I have a chance to explore them. |
Error handling for streaming reactive controller methods, such as those that use a
Flux
as the return type, isimproved such that immediate error signals in the stream that occur before any data has been written will be routed
appropriately to any user-supplied
@Error
handler methods.A new test specification is added to cover this scenario, and the TCK's
ErrorHandlerFluxTest
is improved to coverthe enhanced error handling.
This resolves micronaut-projects/micronaut-reactor#238