[MINOR] Fix skew in clustering operator #12765
base: master
  for (HoodieClusteringGroup clusteringGroup : clusteringPlan.getInputGroups()) {
    LOG.info("Execute clustering plan for instant {} as {} file slices", clusteringInstantTime, clusteringGroup.getSlices().size());
    output.collect(new StreamRecord<>(
-       new ClusteringPlanEvent(clusteringInstantTime, ClusteringGroupInfo.create(clusteringGroup), clusteringPlan.getStrategy().getStrategyParams())
+       new ClusteringPlanEvent(clusteringInstantTime, ClusteringGroupInfo.create(clusteringGroup), clusteringPlan.getStrategy().getStrategyParams(), operationIndex)
Do we need a hash map like the one in this PR: https://github.com/apache/hudi/pull/11757/files ?
The file IDs of the operations in a clustering plan are unique, so I think a hash map is not necessary. WDYT?
Is it possible that the operations come from two different plans?
No, a hash map here can only store operations of one plan.
Why? The plan generator can actually handle multiple plans.
// the first instant takes the highest priority.
Option<HoodieInstant> firstRequested = Option.fromJavaOptional(
pendingClusteringInstantTimes.stream()
.filter(instant -> instant.getState() == HoodieInstant.State.REQUESTED).findFirst());
The scheduleClustering method in ClusteringPlanOperator only handles the first plan each time.
Let's add a hash map, just like compaction does.
ok.
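To make the outcome of this thread concrete, here is a minimal sketch of one way to read the request: keep a hash map from each operation to a sequential index, then route each event by that index so operations spread evenly across subtasks. Everything in it is an assumption for illustration: the class `OperationIndexSketch`, the string keys standing in for clustering groups, and the modulo routing are hypothetical and are not taken from the Hudi code base or from #11757.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical illustration only; these names do not exist in Hudi.
final class OperationIndexSketch {

    // Assigns each operation key its position in the plan (0, 1, 2, ...).
    static Map<String, Integer> indexOperations(List<String> operationKeys) {
        Map<String, Integer> indexByKey = new HashMap<>();
        for (int i = 0; i < operationKeys.size(); i++) {
            indexByKey.put(operationKeys.get(i), i);
        }
        return indexByKey;
    }

    // Assumed routing rule: consecutive indexes go to consecutive subtasks.
    static int subtaskFor(int operationIndex, int parallelism) {
        return operationIndex % parallelism;
    }

    // Counts how many operations each subtask would receive under that rule.
    static int[] loadPerSubtask(List<String> operationKeys, int parallelism) {
        Map<String, Integer> indexByKey = indexOperations(operationKeys);
        int[] load = new int[parallelism];
        for (String key : operationKeys) {
            load[subtaskFor(indexByKey.get(key), parallelism)]++;
        }
        return load;
    }

    public static void main(String[] args) {
        List<String> keys = new ArrayList<>();
        for (int i = 0; i < 8; i++) {
            keys.add("file-" + i); // stand-in for a clustering group's file IDs
        }
        System.out.println(Arrays.toString(loadPerSubtask(keys, 4)));
    }
}
```

Because the index is sequential, the per-subtask load differs by at most one operation, whereas routing on a hash of the file ID gives no such guarantee. Whether the map should span one plan or several is the open question raised above; this sketch covers a single plan only.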
Change Logs
Related to #11757: fixes the same skew in the clustering operator.
Impact
none
Risk level (write none, low, medium or high below)
none
Documentation Update
none
Contributor's checklist