-
Notifications
You must be signed in to change notification settings - Fork 13.8k
[FLINK-38615][python] Optimize AsyncFunction to remove ResultFuture #27184
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Conversation
3fbf8f5 to
e7cbc94
Compare
|
|
||
|
|
||
| class ResultFuture(Generic[OUT]): | ||
| """ |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think we should deprecate the old API and introduce this, in this way exiting applications will not break and the older API version can be managed out. This is less a consideration for master - but is a consideration for the backports.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@davidradl Good point! This is introduced in Flink 2.2 and so there is no backward compatibility issues (I'd like to make sure this is merged into Flink 2.2).
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The change is great as it is more pythonis. My only concern is that the exception propagation is somehow different.
| # complete with empty result, so that we remove timer and move ahead processing | ||
| self._process_results([]) | ||
|
|
||
| raise RuntimeError("The 'result_future' of AsyncFunction should be completed with " |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
As we get rid of 'result_future' in the AsyncFunction, we may change the error description here as well when users return non-iterable result
| async def async_invoke(self, value: Row): | ||
| await asyncio.sleep(2) | ||
| result_future.complete([value[0] + value[1]]) | ||
| return [value[0] + value[1]] |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I modify the test a little to return None and I find that the test can run without throwing The 'result_future' of AsyncFunction should be completed with ... exception.
Is that expected?
13de53d to
fdedefc
Compare
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thanks for the great work, LGTM!
What is the purpose of the change
Brief change log
Verifying this change
This change is already covered by existing tests in test_async_function.py*.
Does this pull request potentially affect one of the following parts:
@Public(Evolving): (yes / no)Documentation