-
Notifications
You must be signed in to change notification settings - Fork 4.3k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
[flink-runner] Improve Datastream for batch performances #32440
Conversation
fea7323
to
893c19f
Compare
Assigning reviewers. If you would like to opt out of this review, comment R: @damccorm for label website. Available commands:
The PR bot will only process comments in the main thread (not review comments). |
@kennknowles would you mind taking a look at this one? |
Reminder, please take a look at this pr: @damccorm |
Assigning new set of reviewers because Pr has gone too long without review. If you would like to opt out of this review, comment R: @melap for label website. Available commands:
|
R: @kennknowles |
Stopping reviewer notifications for this pull request: review requested by someone other than the bot, ceding control. If you'd like to restart, comment |
To test this thoroughly, let us add some of the postcommits by touching trigger files. In #32648 you can see how I edited the JSON files (including some new ones) and I think these are all the Flink-specific postcommit jobs. |
d88deed
to
e2fc26e
Compare
e060213
to
ae0f3b4
Compare
@kennknowles done. I also rebased master but some of the tests seem to be quite flaky now. There are test failing on things I did not touch (direct runner) and the Flink tests that are failing here are not failing on my machine... Any idea how I could make them work ? |
I opened jto#236 with some more trigger files. The "PVR" trigger files stands for "Portable Validates Runner" that isn't as directly impacted. I think the non-portable ValidatesRunner tests should test that the runner still complies with the model and passes the basic tests. |
Thanks! I just merged it. |
Hey @kennknowles! The Python PostCommits are failing but the error is:
which I think is unrelated to those changes and I could not find anything in the logs suggesting otherwise. |
Hey @kennknowles @jto does this PR have any next steps? |
I agree with your change to sickbay that Lifecycle test. It sounds like it is incorrectly specified, not taking into account allowable runner variation. Sorry for the delay which has led to conflicts - if you resolve them I think this is good to merge. |
cfdfbfd
to
7f83bbe
Compare
7f83bbe
to
24a5a51
Compare
Hey @kennknowles ! Thanks for following up. I merged the latest changes from master and the PR can now be merged. I had to add another test to the sickbay. Some of the Python tests are failing with the following exception:
which does not look like to be caused by this PR.
It'd be ideal if fixes were available and could be rebased into my branch to get all the tests green before merging but otherwise I think the changes in the Flink runner are in a good shape. |
Also history is a bit messy because I tested a bunch of options to get the test to pass. Cleaning it up right now :) |
…stSingleton to sickbay restore gradle parallel mode
e147b02
to
aaaa023
Compare
Ready for merge :) |
@@ -251,7 +257,7 @@ def createValidatesRunnerTask(Map m) { | |||
def pipelineOptionsArray = ["--runner=TestFlinkRunner", | |||
"--streaming=${config.streaming}", | |||
"--useDataStreamForBatch=${config.useDataStreamForBatch}", | |||
"--parallelism=2", | |||
"--parallelism=1", |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This seems like it could be a problem, actually. Why did you have to do this? Memory pressure on test workers?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yes I think validate runner was getting OOMKilled
#parallelism.default: 23 | ||
taskmanager.memory.network.fraction: 0.2 | ||
taskmanager.memory.network.max: 1gb | ||
pipeline.operator-chaining.enabled: false |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Is this OK? I guess it forces more serialization so everything will get slower during testing, but coder errors will be caught. Could it also mask errors?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Hum tbh I don't really remember why I changed that. I'll check maybe this is not required.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Hey! So at least locally it does not break the test. I'll make another PR to revert this change.
Thanks for reviewing and merging :) |
@kennknowles made a PR here: #33838 |
* [Flink] Set return type of bounded sources * [Flink] Use a lazy split enumerator for bounded sources * [Flink] fix lazy enumerator package * [Flink] Default to maxParallelism = parallelism in batch * [Flink] Avoid re-serializing trigger on every element * [Flink] Avoid re-evaluating options every time a new state is stored * [Flink] Only serialize states namespace keys if necessary * [Flink] Make ToKeyedWorkItem part of the DoFnOperator * [Flink] Remove ToBinaryKV * [Flink] Refactor CombinePerKeyTranslator * [Flink] Combine before Reduce (no side-input only) * [Flink] Implement partial reduce * [Flink] dead code cleanup * [Flink] persistent PartialReduceBundleOperator operator state * [Flink] Combine before GBK * [Flink] Combine before reduce (with side input) * [Flink] Force slot sharing group in batch mode * [Flink] Disable bundling in batch mode * [Flink] Lower default max bundle size in batch mode * [Flink] Code cleanup * [Flink] fix WindowDoFnOperatorTest * [Flink] Remove 1.14 compat code * [Flink] Fix flaky test * [Flink] Use a custom key type to better distribute load * [Flink] fix GBK streaming with side input * [Flink] fix error management in lazy source * [Flink] disable operator chaining in validatesRunner * [Flink] fix lazy source enumerator behaviour on error * [Flink] set validates runner parallelism to 1 * [Flink] add org.apache.beam.sdk.transforms.ParDoTest to sickbay * [Flink] fix lazy source enumerator behaviour on error (again) * [Flink] Add org.apache.beam.sdk.transforms.ViewTest.testTriggeredLatestSingleton to sickbay
Context
Flink will drop support for the dataset API in 2.0 which should be released by EOY so it quite important for Beam to support Datastream well.
The PR
This PR improves the performances of Batch jobs executed with
--useDatastreamForBatch
by porting the following performance optimizations already present inFlinkBatchTransformTranslators
but lacking inFlinkStreamingTransformTranslators
.It also implements the following optimizations:
maxParallelism
toparallelism
as the total number of key groups is equal tomaxParallelism
. Again this reduces skew.ToKeyedWorkItem
part ofDoFnOperator
which reduces the size of the job graph and avoid unnecessary inter-task communication.GBK -> map -> CombinePerKey
). Add a flag to control this feature (defaults to active).Benchmarks
The patched version was tested against a few of Spotify's production batch workflows. All settings were left unchanged except for the followings:
--useDatastreamForBatch=true
jobmanager.scheduler: default
(otherwise datastream default to adaptive scheduler).Note
Job 3 fails with a stackoverflow exception because of a bug in versions of Kryo < 3.0. I believe this is because the job uses
taskmanager.runtime.large-record-handler: true
and it should be fixed in Flink 2.0 since Kryo is upgraded to a more recent version.Thank you for your contribution! Follow this checklist to help us incorporate your contribution quickly and easily:
addresses #123
), if applicable. This will automatically add a link to the pull request in the issue. If you would like the issue to automatically close on merging the pull request, commentfixes #<ISSUE NUMBER>
instead.CHANGES.md
with noteworthy changes.See the Contributor Guide for more tips on how to make review process smoother.
To check the build health, please visit https://github.com/apache/beam/blob/master/.test-infra/BUILD_STATUS.md
GitHub Actions Tests Status (on master branch)
See CI.md for more information about GitHub Actions CI or the workflows README to see a list of phrases to trigger workflows.