Skip to content

Commit

Permalink
Share
Browse files Browse the repository at this point in the history
  • Loading branch information
nik9000 committed Jul 9, 2024
1 parent 141a63f commit d0dc736
Show file tree
Hide file tree
Showing 6 changed files with 61 additions and 66 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,6 @@
import org.elasticsearch.xpack.esql.expression.predicate.operator.arithmetic.DateTimeArithmeticOperation;
import org.elasticsearch.xpack.esql.expression.predicate.operator.arithmetic.EsqlArithmeticOperation;
import org.elasticsearch.xpack.esql.expression.predicate.operator.comparison.In;
import org.elasticsearch.xpack.esql.plan.logical.Aggregate;
import org.elasticsearch.xpack.esql.plan.logical.Drop;
import org.elasticsearch.xpack.esql.plan.logical.Enrich;
import org.elasticsearch.xpack.esql.plan.logical.EsRelation;
Expand All @@ -75,6 +74,7 @@
import org.elasticsearch.xpack.esql.plan.logical.MvExpand;
import org.elasticsearch.xpack.esql.plan.logical.Project;
import org.elasticsearch.xpack.esql.plan.logical.Rename;
import org.elasticsearch.xpack.esql.plan.logical.Stats;
import org.elasticsearch.xpack.esql.plan.logical.local.EsqlProject;
import org.elasticsearch.xpack.esql.plan.logical.local.LocalRelation;
import org.elasticsearch.xpack.esql.plan.logical.local.LocalSupplier;
Expand Down Expand Up @@ -397,12 +397,12 @@ protected LogicalPlan doRule(LogicalPlan plan) {
childrenOutput.addAll(output);
}

if (plan instanceof Aggregate agg) {
return resolveAggregate(agg, childrenOutput);
if (plan instanceof EsqlAggregate agg) {
return resolveStats(agg, childrenOutput);
}

if (plan instanceof InlineStats stats) {
return resolveInlineStats(stats, childrenOutput);
return resolveStats(stats, childrenOutput);
}

if (plan instanceof Drop d) {
Expand Down Expand Up @@ -436,61 +436,7 @@ protected LogicalPlan doRule(LogicalPlan plan) {
return plan.transformExpressionsOnly(UnresolvedAttribute.class, ua -> maybeResolveAttribute(ua, childrenOutput));
}

private LogicalPlan resolveAggregate(Aggregate a, List<Attribute> childrenOutput) {
// if the grouping is resolved but the aggs are not, use the former to resolve the latter
// e.g. STATS a ... GROUP BY a = x + 1
Holder<Boolean> changed = new Holder<>(false);
List<Expression> groupings = a.groupings();
// first resolve groupings since the aggs might refer to them
// trying to globally resolve unresolved attributes will lead to some being marked as unresolvable
if (Resolvables.resolved(groupings) == false) {
List<Expression> newGroupings = new ArrayList<>(groupings.size());
for (Expression g : groupings) {
Expression resolved = g.transformUp(UnresolvedAttribute.class, ua -> maybeResolveAttribute(ua, childrenOutput));
if (resolved != g) {
changed.set(true);
}
newGroupings.add(resolved);
}
groupings = newGroupings;
if (changed.get()) {
a = new EsqlAggregate(a.source(), a.child(), a.aggregateType(), newGroupings, a.aggregates());
changed.set(false);
}
}

if (a.expressionsResolved() == false) {
AttributeMap<Expression> resolved = new AttributeMap<>();
for (Expression e : groupings) {
Attribute attr = Expressions.attribute(e);
if (attr != null && attr.resolved()) {
resolved.put(attr, attr);
}
}
List<Attribute> resolvedList = NamedExpressions.mergeOutputAttributes(new ArrayList<>(resolved.keySet()), childrenOutput);
List<NamedExpression> newAggregates = new ArrayList<>();

for (NamedExpression aggregate : a.aggregates()) {
var agg = (NamedExpression) aggregate.transformUp(UnresolvedAttribute.class, ua -> {
Expression ne = ua;
Attribute maybeResolved = maybeResolveAttribute(ua, resolvedList);
if (maybeResolved != null) {
changed.set(true);
ne = maybeResolved;
}
return ne;
});
newAggregates.add(agg);
}

a = changed.get() ? new EsqlAggregate(a.source(), a.child(), a.aggregateType(), groupings, newAggregates) : a;
}

return a;
}

private LogicalPlan resolveInlineStats(InlineStats stats, List<Attribute> childrenOutput) {
// NOCOMMIT this is a carbon copy of above - we should be able to share code
private LogicalPlan resolveStats(Stats stats, List<Attribute> childrenOutput) {
// if the grouping is resolved but the aggs are not, use the former to resolve the latter
// e.g. STATS a ... GROUP BY a = x + 1
Holder<Boolean> changed = new Holder<>(false);
Expand All @@ -508,7 +454,7 @@ private LogicalPlan resolveInlineStats(InlineStats stats, List<Attribute> childr
}
groupings = newGroupings;
if (changed.get()) {
stats = new InlineStats(stats.source(), stats.child(), newGroupings, stats.aggregates());
stats = stats.resolve(newGroupings, stats.aggregates());
changed.set(false);
}
}
Expand Down Expand Up @@ -537,10 +483,10 @@ private LogicalPlan resolveInlineStats(InlineStats stats, List<Attribute> childr
newAggregates.add(agg);
}

stats = changed.get() ? new InlineStats(stats.source(), stats.child(), groupings, newAggregates) : stats;
stats = changed.get() ? stats.resolve(groupings, newAggregates) : stats;
}

return stats;
return (LogicalPlan) stats;
}

private LogicalPlan resolveMvExpand(MvExpand p, List<Attribute> childrenOutput) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@
* a proper output.
* To simplify things, the Aggregate class will be replaced with a vanilla one.
*/
public class EsqlAggregate extends Aggregate {
public class EsqlAggregate extends Aggregate implements Stats {

private List<Attribute> lazyOutput;

Expand Down Expand Up @@ -61,4 +61,9 @@ protected NodeInfo<Aggregate> info() {
public EsqlAggregate replaceChild(LogicalPlan newChild) {
return new EsqlAggregate(source(), newChild, aggregateType(), groupings(), aggregates());
}

@Override
public EsqlAggregate resolve(List<Expression> newGroupings, List<? extends NamedExpression> newAggregates) {
return new EsqlAggregate(source(), child(), aggregateType(), newGroupings, newAggregates);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@

import static org.elasticsearch.xpack.esql.expression.NamedExpressions.mergeOutputAttributes;

public class InlineStats extends UnaryPlan implements NamedWriteable, Phased {
public class InlineStats extends UnaryPlan implements NamedWriteable, Phased, Stats {
public static final NamedWriteableRegistry.Entry ENTRY = new NamedWriteableRegistry.Entry(
InlineStats.class,
"InlineStats",
Expand Down Expand Up @@ -87,10 +87,17 @@ public InlineStats replaceChild(LogicalPlan newChild) {
return new InlineStats(source(), newChild, groupings, aggregates);
}

@Override
public InlineStats resolve(List<Expression> newGroupings, List<? extends NamedExpression> newAggregates) {
return new InlineStats(source(), child(), newGroupings, newAggregates);
}

@Override
public List<Expression> groupings() {
return groupings;
}

@Override
public List<? extends NamedExpression> aggregates() {
return aggregates;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,6 @@ static LogicalPlan applyResultsFromFirstPhase(LogicalPlan plan, List<Attribute>
assert plan.analyzed();
Holder<Boolean> seen = new Holder<>(false);
LogicalPlan applied = plan.transformUp(logicalPlan -> {
// NOCOMMIT make sure this stops after the first one.
if (seen.get() == false && logicalPlan instanceof Phased phased) {
seen.set(true);
return phased.nextPhase(schema, result);
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,39 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0; you may not use this file except in compliance with the Elastic License
* 2.0.
*/

package org.elasticsearch.xpack.esql.plan.logical;

import org.elasticsearch.xpack.esql.core.expression.Expression;
import org.elasticsearch.xpack.esql.core.expression.NamedExpression;

import java.util.List;

/**
* STATS-like operations. List {@link Aggregate} and {@link InlineStats}.
*/
public interface Stats {
/**
* Rebuild this plan with new groupings and new aggregates.
*/
Stats resolve(List<Expression> newGroupings, List<? extends NamedExpression> newAggregates);

/**
* Have all the expressions in this plan been resolved?
*/
boolean expressionsResolved();

/**
* List containing both the aggregate expressions and grouping expressions.
*/
List<? extends NamedExpression> aggregates();

/**
* List containing just the grouping expressions.
*/
List<Expression> groupings();

}
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,6 @@
import org.elasticsearch.xpack.esql.core.tree.NodeInfo;
import org.elasticsearch.xpack.esql.core.tree.Source;
import org.elasticsearch.xpack.esql.core.type.DataType;
import org.elasticsearch.xpack.esql.expression.function.scalar.math.Log;

import java.util.List;
import java.util.Map;
Expand Down

0 comments on commit d0dc736

Please sign in to comment.