Skip to content

Commit

Permalink
ESQL: Rework sequence for analyzing queries
Browse files Browse the repository at this point in the history
In service of the incoming INLINESTATS this flips the ordering of
analysis. Previously we made the entire sequence of analyze, optimize,
convert to physical plan, and optimize in a single async sequence in
`EsqlSession`. This flips it so `analyze` comes first in its own async
sequence and then runs the remaining stuff in a separate sequence.
That's nice for INLINESTATS where we want to analyze one time, and then
run many runs of the extra sequence.

While we're here, we also take that sequence and call it directly from the
CsvTests. That works well because that final sequence is exactly what
CsvTests have to do. They "analyze" totally differently, but they run
the final sequence in the same way.
  • Loading branch information
nik9000 committed Jul 7, 2024
1 parent 27b1779 commit 50371eb
Show file tree
Hide file tree
Showing 3 changed files with 174 additions and 164 deletions.

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,6 @@
import java.util.stream.Collectors;

import static org.elasticsearch.index.query.QueryBuilders.boolQuery;
import static org.elasticsearch.xpack.esql.core.util.ActionListeners.map;
import static org.elasticsearch.xpack.esql.core.util.StringUtils.WILDCARD;

public class EsqlSession {
Expand Down Expand Up @@ -111,33 +110,29 @@ public String sessionId() {
return sessionId;
}

/**
 * Execute an ESQL request: parse the query, analyze the parsed plan in its
 * own async step, then hand the analyzed plan to {@code executeAnalyzedPlan}
 * for the remainder of the pipeline.
 */
public void execute(
    EsqlQueryRequest request,
    BiConsumer<PhysicalPlan, ActionListener<Result>> runPhase,
    ActionListener<Result> listener
) {
    LOGGER.debug("ESQL query:\n{}", request.query());
    // NOTE(review): the next two statements duplicate the analyzedPlan(...)
    // path below and look like deleted-side diff residue from the pre-refactor
    // version of this method — confirm against the committed file; the real
    // method almost certainly contains only one of the two paths.
    LogicalPlan logicalPlan = parse(request.query(), request.params());
    logicalPlanToPhysicalPlan(logicalPlan, request, listener.delegateFailureAndWrap((l, r) -> runPhase.accept(r, l)));
    // New sequencing: analyze first, asynchronously, then run everything else
    // in a separate sequence via executeAnalyzedPlan.
    analyzedPlan(
        parse(request.query(), request.params()),
        listener.delegateFailureAndWrap((next, analyzedPlan) -> executeAnalyzedPlan(request, runPhase, analyzedPlan, next))
    );
}

private void logicalPlanToPhysicalPlan(LogicalPlan logicalPlan, EsqlQueryRequest request, ActionListener<PhysicalPlan> listener) {
optimizedPhysicalPlan(
logicalPlan,
listener.map(plan -> EstimatesRowSize.estimateRowSize(0, plan.transformUp(FragmentExec.class, f -> {
QueryBuilder filter = request.filter();
if (filter != null) {
var fragmentFilter = f.esFilter();
// TODO: have an ESFilter and push down to EsQueryExec / EsSource
// This is an ugly hack to push the filter parameter to Lucene
// TODO: filter integration testing
filter = fragmentFilter != null ? boolQuery().filter(fragmentFilter).must(filter) : filter;
LOGGER.debug("Fold filter {} to EsQueryExec", filter);
f = f.withFilter(filter);
}
return f;
})))
);
/**
 * Run the post-analysis half of query execution: turn an already-analyzed
 * logical plan into an optimized physical plan and pass it to
 * {@code runPhase}, which reports its result (or failure) to {@code listener}.
 */
public void executeAnalyzedPlan(
    EsqlQueryRequest request,
    BiConsumer<PhysicalPlan, ActionListener<Result>> runPhase,
    LogicalPlan analyzedPlan,
    ActionListener<Result> listener
) {
    // TODO phased execution lands here.
    PhysicalPlan physicalPlan = logicalPlanToPhysicalPlan(analyzedPlan, request);
    runPhase.accept(physicalPlan, listener);
}

private LogicalPlan parse(String query, QueryParams params) {
Expand All @@ -155,6 +150,7 @@ public void analyzedPlan(LogicalPlan parsed, ActionListener<LogicalPlan> listene
preAnalyze(parsed, (indices, policies) -> {
Analyzer analyzer = new Analyzer(new AnalyzerContext(configuration, functionRegistry, indices, policies), verifier);
var plan = analyzer.analyze(parsed);
plan.setAnalyzed();
LOGGER.debug("Analyzed plan:\n{}", plan);
return plan;
}, listener);
Expand Down Expand Up @@ -315,28 +311,41 @@ private static Set<String> subfields(Set<String> names) {
return names.stream().filter(name -> name.endsWith(WILDCARD) == false).map(name -> name + ".*").collect(Collectors.toSet());
}

public void optimizedPlan(LogicalPlan logicalPlan, ActionListener<LogicalPlan> listener) {
analyzedPlan(logicalPlan, map(listener, p -> {
var plan = logicalPlanOptimizer.optimize(p);
LOGGER.debug("Optimized logicalPlan plan:\n{}", plan);
return plan;
}));
/**
 * Convert an analyzed logical plan into an optimized physical plan, folding
 * the request-level filter (if any) into each {@code FragmentExec} and then
 * estimating row sizes on the result.
 */
private PhysicalPlan logicalPlanToPhysicalPlan(LogicalPlan logicalPlan, EsqlQueryRequest request) {
    PhysicalPlan plan = optimizedPhysicalPlan(logicalPlan).transformUp(FragmentExec.class, fragment -> {
        QueryBuilder filter = request.filter();
        if (filter == null) {
            return fragment;
        }
        var fragmentFilter = fragment.esFilter();
        // TODO: have an ESFilter and push down to EsQueryExec / EsSource
        // This is an ugly hack to push the filter parameter to Lucene
        // TODO: filter integration testing
        filter = fragmentFilter != null ? boolQuery().filter(fragmentFilter).must(filter) : filter;
        LOGGER.debug("Fold filter {} to EsQueryExec", filter);
        return fragment.withFilter(filter);
    });
    return EstimatesRowSize.estimateRowSize(0, plan);
}

public void physicalPlan(LogicalPlan optimized, ActionListener<PhysicalPlan> listener) {
optimizedPlan(optimized, map(listener, p -> {
var plan = mapper.map(p);
LOGGER.debug("Physical plan:\n{}", plan);
return plan;
}));
/**
 * Optimize an analyzed logical plan.
 *
 * @param logicalPlan the logical plan; must already be analyzed (asserted)
 * @return the optimized logical plan
 */
public LogicalPlan optimizedPlan(LogicalPlan logicalPlan) {
    assert logicalPlan.analyzed();
    LogicalPlan plan = logicalPlanOptimizer.optimize(logicalPlan);
    // Fixed duplicated word in the log message ("logicalPlan plan").
    LOGGER.debug("Optimized logical plan:\n{}", plan);
    return plan;
}

public void optimizedPhysicalPlan(LogicalPlan logicalPlan, ActionListener<PhysicalPlan> listener) {
physicalPlan(logicalPlan, map(listener, p -> {
var plan = physicalPlanOptimizer.optimize(p);
LOGGER.debug("Optimized physical plan:\n{}", plan);
return plan;
}));
/**
 * Map an analyzed logical plan to a physical plan, optimizing the logical
 * plan first via {@code optimizedPlan}.
 */
public PhysicalPlan physicalPlan(LogicalPlan logicalPlan) {
    PhysicalPlan mapped = mapper.map(optimizedPlan(logicalPlan));
    LOGGER.debug("Physical plan:\n{}", mapped);
    return mapped;
}

/**
 * Produce the fully optimized physical plan for an analyzed logical plan:
 * logical optimization, physical mapping, then physical optimization.
 */
public PhysicalPlan optimizedPhysicalPlan(LogicalPlan logicalPlan) {
    PhysicalPlan optimized = physicalPlanOptimizer.optimize(physicalPlan(logicalPlan));
    LOGGER.debug("Optimized physical plan:\n{}", optimized);
    return optimized;
}

public static InvalidMappedField specificValidity(String fieldName, Map<String, FieldCapabilities> types) {
Expand Down
Loading

0 comments on commit 50371eb

Please sign in to comment.