Flink: Emit watermarks from the IcebergSource #8553
nit: "based on the file statistics" -> "based on the min value of column statistics from file metadata"
Surfacing an earlier comment again about avoiding multiple small files in one split: it can increase out-of-orderness, because multiple files with different time ranges are merged into one split.
The focus of the feature is correct watermark generation. We need to make sure that the watermarks are emitted in order, but this does not automatically mean that the records need to be emitted in order too. These are two different aspects of a data stream.
With combined splits, we do not advance the watermark, so they don't cause issues wrt watermark generation. Users can decide whether record out-of-orderness is a problem for them. If it is, they can set the configuration; if they have enough memory to keep the state, they can decide that reading speed (combining files into splits) is more important than reading files in order.
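To make the point about combined splits concrete, here is a minimal sketch, assuming the split's watermark is taken as the smallest lower bound of the event-time column across the files it contains. `SplitWatermarkSketch`, `FileStats`, and `watermarkFor` are hypothetical names for illustration only, not the Iceberg or Flink API.

```java
import java.util.Arrays;
import java.util.List;

// Hypothetical types for illustration; this is not the Iceberg API.
final class SplitWatermarkSketch {

  // Lower bound of the event-time column for one data file,
  // as it might be read from the column statistics in file metadata.
  static final class FileStats {
    final String path;
    final long minEventTimeMs;

    FileStats(String path, long minEventTimeMs) {
      this.path = path;
      this.minEventTimeMs = minEventTimeMs;
    }
  }

  // One watermark per split: the smallest lower bound among its files.
  // Rows inside the split may arrive out of order, but none is older than this value.
  static long watermarkFor(List<FileStats> filesInSplit) {
    if (filesInSplit.isEmpty()) {
      throw new IllegalArgumentException("split has no files");
    }
    long min = Long.MAX_VALUE;
    for (FileStats file : filesInSplit) {
      min = Math.min(min, file.minEventTimeMs);
    }
    return min;
  }

  public static void main(String[] args) {
    List<FileStats> combined = Arrays.asList(
        new FileStats("a.parquet", 3000L),
        new FileStats("b.parquet", 1000L),
        new FileStats("c.parquet", 2000L));
    System.out.println(watermarkFor(combined)); // prints 1000
  }
}
```

Under this assumption the watermark stays at the oldest file's lower bound for the whole split, so combining files makes the watermark coarser but does not make it incorrect.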
Those are reasonable points. I would be interested in other people's take.
To me, it is more important to limit the out-of-orderness by default; that is the whole point of watermark alignment. I would be OK sacrificing some read throughput with smaller files. Also, with stateful applications, source read throughput is rarely the bottleneck. The typical bottleneck is the stateful operator.
At least when we use event-time alignment, the only bound we expect to be respected is the main "max out-of-orderness" constraint; the ordering of rows before the watermark is advanced isn't something we rely on. Our custom operators that use event-time won't emit results till the watermark is advanced beyond our timers anyway. So, even if the rows were received in perfect order, they'd still be buffered into state, which suggests to me there is little speed or memory benefit. Am I missing anything here?
We do use some custom process-time operators, where, in theory, less out-of-orderness would give more accurate results. But we discard the results emitted during the Iceberg backfill phase anyway, since our data are partitioned by day and the out-of-orderness allowed by the ~25-hour constraint (we set it slightly above 24 hours as a precaution to avoid the aligner getting stuck with daily partitioned files) is too high for accurate-enough results.
I'm a bit confused about how `read.split.open-file-cost` relates to the code line this discussion is tagged at, and maybe I'm not fully understanding. Does the current code try to respect `read.split.open-file-cost` when selecting files to include in a split?
The only other case I can think of where ordering of row reads is key is in minimizing "straggler" files that would hold up reads, i.e. if all files within a perfectly sorted daily partition have been read except for the one with the earliest timestamp, a max out-of-orderness of ~24 hours would mean most SplitReaders are idle, since they cannot read more than 24 hours ahead of the min timestamp. But AFAIK, the current PR's enumerator sorts the files by min timestamp & assigns them in order for this exact reason, right?
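The straggler scenario can be sketched as follows. This is a simplified illustration, not the PR's enumerator: `OrderedAssignmentSketch`, `Split`, `assignmentOrder`, and `mayRead` are hypothetical names, and the alignment check is reduced to a single comparison against the slowest watermark plus an allowed drift.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Comparator;
import java.util.List;

// Hypothetical names throughout; this is not the Iceberg or Flink API.
final class OrderedAssignmentSketch {

  // A split described only by a name and the minimum event time (epoch millis) of its rows.
  static final class Split {
    final String name;
    final long minTs;

    Split(String name, long minTs) {
      this.name = name;
      this.minTs = minTs;
    }
  }

  // Hand out the oldest data first, so no old split is left behind as a straggler.
  static List<Split> assignmentOrder(List<Split> splits) {
    List<Split> sorted = new ArrayList<>(splits);
    sorted.sort(Comparator.comparingLong((Split s) -> s.minTs));
    return sorted;
  }

  // Simplified alignment rule (an assumption): a reader may start a split only if
  // the split does not run ahead of the slowest watermark by more than maxDriftMs.
  static boolean mayRead(Split candidate, long slowestWatermarkMs, long maxDriftMs) {
    return candidate.minTs <= slowestWatermarkMs + maxDriftMs;
  }

  public static void main(String[] args) {
    List<Split> ordered = assignmentOrder(Arrays.asList(
        new Split("late", 300L), new Split("early", 100L), new Split("mid", 200L)));
    for (Split split : ordered) {
      System.out.println(split.name + " readable=" + mayRead(split, 100L, 150L));
    }
  }
}
```

With the slowest watermark at 100 and a drift of 150, the split starting at 300 has to wait, which is the idle-reader situation described above when an old file is assigned late.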
@dchristle: The code line is mostly irrelevant for the conversation 😄
The Iceberg planner collects the files that should be scanned by the execution engine and creates `ScanTask`s from them. If a file is big, it creates multiple splits for that file, so multiple readers can read it in parallel. OTOH, if there are multiple small files, it combines them into a `CombinedScanTask`, which is read by a single reader. This decreases the number of splits, split assignments, etc.
We generate a single watermark for every `ScanTask`. One `CombinedScanTask` could group multiple data files with a wide range of timestamps, so generating a single watermark for it could be suboptimal. Setting `read.split.open-file-cost` could prevent the creation of such `CombinedScanTask`s, and could result in better-ordered input and finer-grained watermarks.
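As a rough illustration of why raising `read.split.open-file-cost` stops small files from being combined, here is a minimal sketch. It assumes each file is weighed as the larger of its length and the open-file cost, and that files are packed greedily up to the target split size. The real planner is more involved, and `OpenFileCostSketch` and `pack` are hypothetical names, not Iceberg code.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Simplified, hypothetical model of split packing; not the Iceberg planner.
final class OpenFileCostSketch {

  // Greedily packs file lengths (in bytes) into splits, in input order.
  // Assumption: a file's weight is max(length, openFileCost).
  static List<List<Long>> pack(List<Long> fileLengths, long targetSplitSize, long openFileCost) {
    List<List<Long>> splits = new ArrayList<>();
    List<Long> current = new ArrayList<>();
    long currentWeight = 0L;
    for (long length : fileLengths) {
      long weight = Math.max(length, openFileCost);
      // Close the current split if adding this file would exceed the target size.
      if (!current.isEmpty() && currentWeight + weight > targetSplitSize) {
        splits.add(current);
        current = new ArrayList<>();
        currentWeight = 0L;
      }
      current.add(length);
      currentWeight += weight;
    }
    if (!current.isEmpty()) {
      splits.add(current);
    }
    return splits;
  }

  public static void main(String[] args) {
    long mb = 1024L * 1024L;
    List<Long> smallFiles = Arrays.asList(16 * mb, 16 * mb, 16 * mb, 16 * mb);

    // Low open-file cost: the four small files share one split.
    System.out.println(pack(smallFiles, 128 * mb, 4 * mb).size()); // prints 1

    // Open-file cost equal to the target size: every file gets its own split.
    System.out.println(pack(smallFiles, 128 * mb, 128 * mb).size()); // prints 4
  }
}
```

In this model, one file per split means one watermark per file, which is the finer-grained behaviour described above, at the price of more splits to assign.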
@stevenzwu @pvary , given that the combining logic was already present before this PR and is independent, I suggest we keep the current default behaviour. (Like it's currently done in the PR)
@stevenzwu if you feel that the default combining logic should change for the Flink source, please open a separate discussion, but I personally feel that the current default is reasonable. This way Flink aligns with the default iceberg planner behaviour, but this should not be discussed further here I think.
I am fine with tabling this as a follow-up.
@pvary not sure every user understands the internal details. We can at least document this option of disabling combining for better ordering in the doc. Then users can make an informed choice. @dchristle can probably also chime in here and help review the doc change in a follow-up PR.
Watermark alignment is one of the use-cases of watermarks. Watermarks could be used for handling late data, and windowing too. So for now, I added this to the Javadoc of the `watermarkColumn` method:
I hope this will help users.
@pvary let's create an issue to follow up on the time unit when PR #9008 is merged
Thanks for the pointer!
Created #9137 to tackle this