-
Notifications
You must be signed in to change notification settings - Fork 25.6k
PoC - Avoid retrieving unnecessary fields on node-reduce phase #137920
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: main
Are you sure you want to change the base?
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -9,9 +9,11 @@ | |
|
|
||
| import org.elasticsearch.index.IndexMode; | ||
| import org.elasticsearch.xpack.esql.core.expression.Attribute; | ||
| import org.elasticsearch.xpack.esql.core.expression.AttributeSet; | ||
| import org.elasticsearch.xpack.esql.core.expression.FieldAttribute; | ||
| import org.elasticsearch.xpack.esql.core.tree.Source; | ||
| import org.elasticsearch.xpack.esql.core.util.CollectionUtils; | ||
| import org.elasticsearch.xpack.esql.expression.Order; | ||
| import org.elasticsearch.xpack.esql.optimizer.LocalPhysicalOptimizerContext; | ||
| import org.elasticsearch.xpack.esql.optimizer.rules.physical.local.InsertFieldExtraction; | ||
| import org.elasticsearch.xpack.esql.optimizer.rules.physical.local.PushTopNToSource; | ||
|
|
@@ -30,8 +32,10 @@ | |
| import org.elasticsearch.xpack.esql.planner.mapper.LocalMapper; | ||
| import org.elasticsearch.xpack.esql.stats.SearchStats; | ||
|
|
||
| import java.util.HashSet; | ||
| import java.util.List; | ||
| import java.util.Optional; | ||
| import java.util.Set; | ||
| import java.util.function.Function; | ||
|
|
||
| /** | ||
|
|
@@ -74,7 +78,7 @@ | |
| * The above actually reads the {@code x} field "unnecessarily", since it's only needed to conform to the output schema of the original | ||
| * plan. See #134363 for a way to optimize this little problem. | ||
| */ | ||
| class LateMaterializationPlanner { | ||
| public class LateMaterializationPlanner { | ||
| public static Optional<ReductionPlan> planReduceDriverTopN( | ||
| Function<SearchStats, LocalPhysicalOptimizerContext> contextFactory, | ||
| ExchangeSinkExec originalPlan | ||
|
|
@@ -95,8 +99,15 @@ public static Optional<ReductionPlan> planReduceDriverTopN( | |
| } | ||
|
|
||
| LocalPhysicalOptimizerContext context = contextFactory.apply(SEARCH_STATS_TOP_N_REPLACEMENT); | ||
| List<Attribute> expectedDataOutput = toPhysical(topN, context).output(); | ||
| Attribute doc = expectedDataOutput.stream().filter(EsQueryExec::isDocAttribute).findFirst().orElse(null); | ||
|
|
||
| AttributeSet expectedDataOutputAttrSet = AttributeSet.builder().addAll(topLevelProject.output()).build(); | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Can be simplified: AttributeSet expectedDataOutputAttrSet = AttributeSet.of(topLevelProject.output()); |
||
| for (Order order : topN.order()) { | ||
| expectedDataOutputAttrSet = expectedDataOutputAttrSet.combine(order.references()); | ||
| } | ||
|
|
||
| Set<Attribute> topLevelProjectAttrs = new HashSet<>(expectedDataOutputAttrSet); | ||
| List<Attribute> physicalDataOutput = toPhysical(topN, context).output(); | ||
| Attribute doc = physicalDataOutput.stream().filter(EsQueryExec::isDocAttribute).findFirst().orElse(null); | ||
| if (doc == null) { | ||
| return Optional.empty(); | ||
| } | ||
|
|
@@ -114,8 +125,23 @@ public static Optional<ReductionPlan> planReduceDriverTopN( | |
| return Optional.empty(); | ||
| } | ||
|
|
||
| // Calculate the expected output attributes for the data driver plan. | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Nit: redundant comment (You can rename the variable to |
||
| AttributeSet.Builder expectedDataOutputAttrs = AttributeSet.builder(); | ||
| // We need to add the doc attribute to the project since otherwise when the fragment is converted to a physical plan for the data | ||
| // driver, the resulting ProjectExec won't have the doc attribute in its output, which is needed by the reduce driver. | ||
| expectedDataOutputAttrs.add(doc); | ||
| // Add all references used in the ordering | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. FYI: this can be shortened to one line: AttributeSet orderRefsSet = AttributeSet.of(topN.order().stream().flatMap(o -> o.references().stream()).toList()); |
||
| AttributeSet.Builder orderRefs = AttributeSet.builder(); | ||
| for (Order order : topN.order()) { | ||
| orderRefs.addAll(order.references()); | ||
| } | ||
| AttributeSet orderRefsSet = orderRefs.build(); | ||
| // Get the output from the physical plan below the TopN, and filter it to only the attributes needed for the final output (either | ||
| // because they are in the top-level Project's output, or because they are needed for ordering) | ||
| expectedDataOutputAttrs.addAll( | ||
| physicalDataOutput.stream().filter(a -> topLevelProject.outputSet().contains(a) || orderRefsSet.contains(a)).toList() | ||
| ); | ||
| List<Attribute> expectedDataOutput = expectedDataOutputAttrs.build().stream().toList(); | ||
| var updatedFragment = new Project(Source.EMPTY, withAddedDocToRelation, expectedDataOutput); | ||
| FragmentExec updatedFragmentExec = fragmentExec.withFragment(updatedFragment); | ||
| ExchangeSinkExec updatedDataPlan = originalPlan.replaceChild(updatedFragmentExec); | ||
|
|
||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Why change the visibility?