-
Notifications
You must be signed in to change notification settings - Fork 2.4k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
[HUDI-6371] Indexing catchup tasks should handle failed commits based on heartbeat #12369
[HUDI-6371] Indexing catchup tasks should handle failed commits based on heartbeat #12369
Conversation
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Couple of confirmations. overall looks ok, if assumptions are right
try { | ||
if (table.getConfig().getFailedWritesCleanPolicy().isLazy() && heartbeatClient.isHeartbeatExpired(instant.requestedTime())) { | ||
LOG.info("Ignoring instant " + instant + " as heartbeat expired"); | ||
return null; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
please dont return null
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
btw, should we call heartbeatExists
, followed by isHeartbeatExpired
with the heart beat client, rather than directly calling isHeartbeatExpired.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
let me summarize my understanding.
I also went over the design in the RFC https://github.com/apache/hudi/blob/master/rfc/rfc-45/rfc-45.md as well.
6 Start indexing catchup in a separate thread (that can be interrupted upon timeout)
6.a For each instant to catchup
6.a.i if instant is completed and has corresponding deltacommit in metadata timeline then continue
6.a.ii if instant is inflight, then reload active timeline periodically until completed or timed out
6.a.iii update metadata table, if needed, within a lock
so, in this case, just that we missed to account for a failed write scenario.
a. for a single writer, we should ignore the instant to catch up and continue in the loop to next instant.
b. for multi-writer scenario, if there is active heart beat, we have to wait. so that it either goes to completion. or it elapses time out on which case we can again ignore from the list of instant to catchup.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
and yes. same comment as vc.
awaitInstantCaughtUp(HoodieInstant instant) does not change the instant anyways. So, why not return boolean.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I can't recall why it was this way but no use, so now i have changed it to boolean and heartbeat check is as follows:
- If no heart beat, then ignore and just move on.
- If heart beat exists, but expire, then also ignore and move on. If not expired then wait.
...ient-common/src/main/java/org/apache/hudi/table/action/index/IndexingCatchupTaskFactory.java
Outdated
Show resolved
Hide resolved
...ient-common/src/main/java/org/apache/hudi/table/action/index/IndexingCatchupTaskFactory.java
Outdated
Show resolved
Hide resolved
try { | ||
if (table.getConfig().getFailedWritesCleanPolicy().isLazy() && heartbeatClient.isHeartbeatExpired(instant.requestedTime())) { | ||
LOG.info("Ignoring instant " + instant + " as heartbeat expired"); | ||
return null; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
and yes. same comment as vc.
awaitInstantCaughtUp(HoodieInstant instant) does not change the instant anyways. So, why not return boolean.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
2 minor comments
Change Logs
Indexing catchup task iterates over all the commits to sync them to MDT. It will periodically check if the commit completed. But if the commit actually failed then it will forever be in inflight mode and hence the indexing run will not progress. This PR fixes the issue by using heartbeat to detect failed commits and ignore them. Without the change in
awaitInstantCaughtUp
, the new testtestHeartbeatExpired
will fail.Impact
Indexer can be used even with pending/failed commits.
Risk level (write none, low medium or high below)
low
Documentation Update
Describe any necessary documentation update if there is any new feature, config, or user-facing change. If not, put "none".
ticket number here and follow the instruction to make
changes to the website.
Contributor's checklist