diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/load/NereidsLoadPlanInfoCollector.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/load/NereidsLoadPlanInfoCollector.java index b14f03af102391..359691ee827861 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/nereids/load/NereidsLoadPlanInfoCollector.java +++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/load/NereidsLoadPlanInfoCollector.java @@ -37,7 +37,6 @@ import org.apache.doris.common.ErrorCode; import org.apache.doris.common.ErrorReport; import org.apache.doris.common.UserException; -import org.apache.doris.common.util.FileFormatConstants; import org.apache.doris.info.PartitionNamesInfo; import org.apache.doris.nereids.CascadesContext; import org.apache.doris.nereids.StatementContext; @@ -63,6 +62,7 @@ import org.apache.doris.nereids.trees.plans.logical.LogicalOlapTableSink; import org.apache.doris.nereids.trees.plans.logical.LogicalOneRowRelation; import org.apache.doris.nereids.trees.plans.logical.LogicalPlan; +import org.apache.doris.nereids.trees.plans.logical.LogicalPostProject; import org.apache.doris.nereids.trees.plans.logical.LogicalPreFilter; import org.apache.doris.nereids.trees.plans.logical.LogicalProject; import org.apache.doris.nereids.trees.plans.visitor.DefaultPlanVisitor; @@ -220,16 +220,6 @@ public TFileScanRangeParams toFileScanRangeParams(TUniqueId loadId, NereidsFileG return params; } - - private String getHeaderType(String formatType) { - if (formatType != null) { - if (formatType.equalsIgnoreCase(FileFormatConstants.FORMAT_CSV_WITH_NAMES) - || formatType.equalsIgnoreCase(FileFormatConstants.FORMAT_CSV_WITH_NAMES_AND_TYPES)) { - return formatType; - } - } - return ""; - } } private LoadPlanInfo loadPlanInfo; @@ -347,38 +337,6 @@ public Void visitLogicalProject(LogicalProject logicalProject, P } } - // For Broker load with multiple file groups, all file groups share the same destTuple. - // Create slots for destTuple only when processing the first file group (when slots are empty). - // Subsequent file groups will reuse the slots created by the first file group. - if (loadPlanInfo.destTuple.getSlots().isEmpty()) { - List slotList = outputs.stream().map(NamedExpression::toSlot).collect(Collectors.toList()); - - // ignore projectList's nullability and set the expr's nullable info same as - // dest table column - // why do this? looks like be works in this way... - // and we have to do some extra work in visitLogicalFilter because this ood - // behavior - int size = slotList.size(); - List newSlotList = new ArrayList<>(size); - for (int i = 0; i < size; ++i) { - SlotReference slot = (SlotReference) slotList.get(i); - Column col = destTable.getColumn(slot.getName()); - if (col != null) { - slot = slot.withColumn(col); - if (col.isAutoInc()) { - newSlotList.add(slot.withNullable(true)); - } else { - newSlotList.add(slot.withNullable(col.isAllowNull())); - } - } else { - newSlotList.add(slot); - } - } - - for (Slot slot : newSlotList) { - context.createSlotDesc(loadPlanInfo.destTuple, (SlotReference) slot, destTable); - } - } List slotDescriptorList = loadPlanInfo.destTuple.getSlots(); loadPlanInfo.destSlotIdToExprMap = Maps.newHashMap(); for (int i = 0; i < slotDescriptorList.size(); ++i) { @@ -400,16 +358,35 @@ public Void visitLogicalProject(LogicalProject logicalProject, P return null; } + @Override + public Void visitLogicalPostProject(LogicalPostProject logicalPostProject, + PlanTranslatorContext context) { + List outputs = logicalPostProject.getOutputs(); + for (NamedExpression expr : outputs) { + if (expr.containsType(AggregateFunction.class)) { + throw new AnalysisException("Don't support aggregation function in load expression"); + } + } + + // For Broker load with multiple file groups, all file groups share the same destTuple. + // Create slots for destTuple only when processing the first file group (when slots are empty). + // Subsequent file groups will reuse the slots created by the first file group. + if (loadPlanInfo.destTuple.getSlots().isEmpty()) { + List slotList = outputs.stream().map(NamedExpression::toSlot).collect(Collectors.toList()); + for (Slot slot : slotList) { + context.createSlotDesc(loadPlanInfo.destTuple, (SlotReference) slot, destTable); + } + } + logicalPostProject.child().accept(this, context); + return null; + } + @Override public Void visitLogicalFilter(LogicalFilter logicalFilter, PlanTranslatorContext context) { logicalFilter.child().accept(this, context); loadPlanInfo.postFilterExprList = new ArrayList<>(logicalFilter.getConjuncts().size()); for (Expression conjunct : logicalFilter.getConjuncts()) { Expr expr = ExpressionTranslator.translate(conjunct, context); - // in visitLogicalProject, we set project exprs nullability same as dest table columns - // the conjunct's nullability is based on project exprs, so we need clear the nullable info - // and let conjunct calculate the nullability by itself to get the correct nullable info - clearNullableFromNereidsRecursively(expr); loadPlanInfo.postFilterExprList.add(expr); } filterPredicate = logicalFilter.getPredicate(); @@ -428,19 +405,6 @@ public Void visitLogicalFilter(LogicalFilter logicalFilter, Plan return null; } - /** - * Recursively clear nullable info from expression and all its children - */ - private void clearNullableFromNereidsRecursively(Expr expr) { - if (expr == null) { - return; - } - expr.clearNullableFromNereids(); - for (Expr child : expr.getChildren()) { - clearNullableFromNereidsRecursively(child); - } - } - @Override public Void visitLogicalPreFilter(LogicalPreFilter logicalPreFilter, PlanTranslatorContext context) { diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/load/NereidsLoadUtils.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/load/NereidsLoadUtils.java index 0afd0bf04f2dec..b52415d4f75bf6 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/nereids/load/NereidsLoadUtils.java +++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/load/NereidsLoadUtils.java @@ -19,6 +19,7 @@ import org.apache.doris.catalog.AggregateType; import org.apache.doris.catalog.Column; +import org.apache.doris.catalog.OlapTable; import org.apache.doris.catalog.Table; import org.apache.doris.common.UserException; import org.apache.doris.info.PartitionNamesInfo; @@ -42,6 +43,7 @@ import org.apache.doris.nereids.trees.expressions.Alias; import org.apache.doris.nereids.trees.expressions.Expression; import org.apache.doris.nereids.trees.expressions.NamedExpression; +import org.apache.doris.nereids.trees.expressions.Slot; import org.apache.doris.nereids.trees.expressions.SlotReference; import org.apache.doris.nereids.trees.expressions.StatementScopeIdGenerator; import org.apache.doris.nereids.trees.expressions.functions.scalar.JsonbParseErrorToNull; @@ -54,6 +56,7 @@ import org.apache.doris.nereids.trees.plans.logical.LogicalOlapTableSink; import org.apache.doris.nereids.trees.plans.logical.LogicalOneRowRelation; import org.apache.doris.nereids.trees.plans.logical.LogicalPlan; +import org.apache.doris.nereids.trees.plans.logical.LogicalPostProject; import org.apache.doris.nereids.trees.plans.logical.LogicalPreFilter; import org.apache.doris.nereids.trees.plans.logical.LogicalProject; import org.apache.doris.nereids.trees.plans.visitor.DefaultPlanVisitor; @@ -199,13 +202,18 @@ public static LogicalPlan createLoadPlan(NereidsFileGroupInfo fileGroupInfo, Par CascadesContext cascadesContext = CascadesContext.initContext(new StatementContext(), currentRootPlan, PhysicalProperties.ANY); ConnectContext ctx = cascadesContext.getConnectContext(); + // we force convert nullable column to non-nullable column for load + // so set feDebug to false to avoid AdjustNullableRule report error + boolean oldFeDebugValue = ctx.getSessionVariable().feDebug; try { ctx.getSessionVariable().setDebugSkipFoldConstant(true); + ctx.getSessionVariable().feDebug = false; Analyzer.buildCustomAnalyzer(cascadesContext, ImmutableList.of(Analyzer.bottomUp( new BindExpression(), new LoadProjectRewrite(fileGroupInfo.getTargetTable()), new BindSink(false), + new AddPostProject(), new AddPostFilter( context.fileGroup.getWhereExpr() ), @@ -236,6 +244,7 @@ public static LogicalPlan createLoadPlan(NereidsFileGroupInfo fileGroupInfo, Par throw new UserException(exception.getMessage()); } finally { ctx.getSessionVariable().setDebugSkipFoldConstant(false); + ctx.getSessionVariable().feDebug = oldFeDebugValue; } return (LogicalPlan) cascadesContext.getRewritePlan(); @@ -324,4 +333,43 @@ public Rule build() { }).toRule(RuleType.ADD_POST_FILTER_FOR_LOAD); } } + + /** AddPostProject + * The BindSink rule will produce the final project list for load, we need cast the outputs according to + * dest table's schema + * */ + private static class AddPostProject extends OneRewriteRuleFactory { + public AddPostProject() { + } + + @Override + public Rule build() { + return logicalOlapTableSink().whenNot(plan -> plan.child() instanceof LogicalPostProject + || plan.child() instanceof LogicalFilter).thenApply(ctx -> { + LogicalOlapTableSink logicalOlapTableSink = ctx.root; + LogicalPlan childPlan = (LogicalPlan) logicalOlapTableSink.child(); + List childOutputs = childPlan.getOutput(); + OlapTable destTable = logicalOlapTableSink.getTargetTable(); + int size = childOutputs.size(); + List projectList = new ArrayList<>(size); + for (int i = 0; i < size; ++i) { + SlotReference slot = (SlotReference) childOutputs.get(i); + Column col = destTable.getColumn(slot.getName()); + if (col != null) { + slot = slot.withColumn(col); + if (col.isAutoInc()) { + projectList.add(slot.withNullable(true)); + } else { + projectList.add(slot.withNullable(col.isAllowNull())); + } + } else { + projectList.add(slot); + } + } + return logicalOlapTableSink.withChildren( + Lists.newArrayList( + new LogicalPostProject(projectList, (Plan) logicalOlapTableSink.child(0)))); + }).toRule(RuleType.ADD_POST_PROJECT_FOR_LOAD); + } + } } diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/RuleType.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/RuleType.java index f1b4c3e91f2160..7b32bb710e0383 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/RuleType.java +++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/RuleType.java @@ -262,6 +262,8 @@ public enum RuleType { REWRITE_LOAD_PROJECT_FOR_STREAM_LOAD(RuleTypeClass.REWRITE), // add post filter node for load ADD_POST_FILTER_FOR_LOAD(RuleTypeClass.REWRITE), + // add post project node for load + ADD_POST_PROJECT_FOR_LOAD(RuleTypeClass.REWRITE), // Merge Consecutive plan MERGE_PROJECTS(RuleTypeClass.REWRITE), diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/logical/LogicalPostProject.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/logical/LogicalPostProject.java new file mode 100644 index 00000000000000..e3d1ad85296f91 --- /dev/null +++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/logical/LogicalPostProject.java @@ -0,0 +1,234 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package org.apache.doris.nereids.trees.plans.logical; + +import org.apache.doris.nereids.analyzer.Unbound; +import org.apache.doris.nereids.analyzer.UnboundStar; +import org.apache.doris.nereids.memo.GroupExpression; +import org.apache.doris.nereids.properties.DataTrait; +import org.apache.doris.nereids.properties.LogicalProperties; +import org.apache.doris.nereids.trees.expressions.Alias; +import org.apache.doris.nereids.trees.expressions.BoundStar; +import org.apache.doris.nereids.trees.expressions.Expression; +import org.apache.doris.nereids.trees.expressions.NamedExpression; +import org.apache.doris.nereids.trees.expressions.Slot; +import org.apache.doris.nereids.trees.expressions.functions.NoneMovableFunction; +import org.apache.doris.nereids.trees.expressions.literal.TinyIntLiteral; +import org.apache.doris.nereids.trees.plans.Plan; +import org.apache.doris.nereids.trees.plans.PlanType; +import org.apache.doris.nereids.trees.plans.algebra.Project; +import org.apache.doris.nereids.trees.plans.visitor.PlanVisitor; +import org.apache.doris.nereids.util.Utils; + +import com.google.common.base.Preconditions; +import com.google.common.base.Supplier; +import com.google.common.base.Suppliers; +import com.google.common.collect.ImmutableList; +import com.google.common.collect.ImmutableList.Builder; +import com.google.common.collect.ImmutableSet; +import org.json.JSONObject; + +import java.util.ArrayList; +import java.util.List; +import java.util.Objects; +import java.util.Optional; +import java.util.Set; + +/** + * Logical post project only use for load plan. + */ +public class LogicalPostProject extends LogicalUnary + implements Project, OutputPrunable { + + private final List projects; + private final Supplier> projectsSet; + private final boolean isDistinct; + + public LogicalPostProject(List projects, CHILD_TYPE child) { + this(projects, false, ImmutableList.of(child)); + } + + public LogicalPostProject(List projects, boolean isDistinct, List child) { + this(projects, isDistinct, Optional.empty(), Optional.empty(), child); + } + + public LogicalPostProject(List projects, boolean isDistinct, Plan child) { + this(projects, isDistinct, + Optional.empty(), Optional.empty(), ImmutableList.of(child)); + } + + private LogicalPostProject(List projects, boolean isDistinct, + Optional groupExpression, Optional logicalProperties, + List child) { + super(PlanType.LOGICAL_PROJECT, groupExpression, logicalProperties, child); + Preconditions.checkArgument(projects != null, "projects can not be null"); + // only ColumnPrune rule may produce empty projects, this happens in rewrite phase + // so if projects is empty, all plans have been bound already. + Preconditions.checkArgument(!projects.isEmpty() || !(child instanceof Unbound), + "projects can not be empty when child plan is unbound"); + this.projects = projects.isEmpty() + ? ImmutableList.of(new Alias(new TinyIntLiteral((byte) 1))) + : projects; + this.projectsSet = Suppliers.memoize(() -> ImmutableSet.copyOf(this.projects)); + this.isDistinct = isDistinct; + } + + /** + * Get project list. + * + * @return all project of this node. + */ + @Override + public List getProjects() { + return projects; + } + + @Override + public List computeOutput() { + Builder slots = ImmutableList.builderWithExpectedSize(projects.size()); + for (NamedExpression project : projects) { + slots.add(project.toSlot()); + } + return slots.build(); + } + + @Override + public String toString() { + return Utils.toSqlString("LogicalPostProject[" + id.asInt() + "]", + "distinct", isDistinct, + "projects", projects); + } + + @Override + public R accept(PlanVisitor visitor, C context) { + return visitor.visitLogicalPostProject(this, context); + } + + @Override + public List getExpressions() { + return projects; + } + + @Override + public boolean equals(Object o) { + if (this == o) { + return true; + } + if (o == null || getClass() != o.getClass()) { + return false; + } + LogicalPostProject that = (LogicalPostProject) o; + boolean equal = projectsSet.get().equals(that.projectsSet.get()) + && isDistinct == that.isDistinct; + // TODO: should add exprId for UnBoundStar and BoundStar for equality comparison + if (!projects.isEmpty() && (projects.get(0) instanceof UnboundStar || projects.get(0) instanceof BoundStar)) { + equal = child().getLogicalProperties().equals(that.child().getLogicalProperties()); + } + return equal; + } + + @Override + public int hashCode() { + return Objects.hash(projectsSet.get(), isDistinct); + } + + @Override + public LogicalPostProject withChildren(List children) { + Preconditions.checkArgument(children.size() == 1); + return new LogicalPostProject<>(projects, isDistinct, Utils.fastToImmutableList(children)); + } + + @Override + public LogicalPostProject withGroupExpression(Optional groupExpression) { + return new LogicalPostProject<>(projects, isDistinct, + groupExpression, Optional.of(getLogicalProperties()), children); + } + + @Override + public Plan withGroupExprLogicalPropChildren(Optional groupExpression, + Optional logicalProperties, List children) { + Preconditions.checkArgument(children.size() == 1); + return new LogicalPostProject<>(projects, isDistinct, + groupExpression, logicalProperties, children); + } + + public LogicalPostProject withProjects(List projects) { + return new LogicalPostProject<>(projects, isDistinct, children); + } + + public LogicalPostProject withProjectsAndChild(List projects, Plan child) { + return new LogicalPostProject<>(projects, isDistinct, ImmutableList.of(child)); + } + + public LogicalPostProject withDistinct(boolean isDistinct) { + return new LogicalPostProject<>(projects, isDistinct, children); + } + + public boolean isDistinct() { + return isDistinct; + } + + @Override + public List getOutputs() { + return projects; + } + + @Override + public Plan pruneOutputs(List prunedOutputs) { + List allProjects = new ArrayList<>(prunedOutputs); + for (NamedExpression expression : projects) { + if (expression.containsType(NoneMovableFunction.class)) { + if (!prunedOutputs.contains(expression)) { + allProjects.add(expression); + } + } + } + return withProjects(allProjects); + } + + @Override + public JSONObject toJson() { + JSONObject logicalProject = super.toJson(); + JSONObject properties = new JSONObject(); + properties.put("Projects", projects.toString()); + properties.put("IsDistinct", isDistinct); + logicalProject.put("Properties", properties); + return logicalProject; + } + + @Override + public void computeUnique(DataTrait.Builder builder) { + } + + @Override + public void computeUniform(DataTrait.Builder builder) { + } + + @Override + public void computeEqualSet(DataTrait.Builder builder) { + } + + @Override + public void computeFd(DataTrait.Builder builder) { + } + + @Override + public boolean canProcessProject(List parentProjects) { + return false; + } +} diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/visitor/PlanVisitor.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/visitor/PlanVisitor.java index 291f9ed5d80c0a..2a1c8c4dc59dd6 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/visitor/PlanVisitor.java +++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/visitor/PlanVisitor.java @@ -40,6 +40,7 @@ import org.apache.doris.nereids.trees.plans.logical.LogicalLimit; import org.apache.doris.nereids.trees.plans.logical.LogicalLoadProject; import org.apache.doris.nereids.trees.plans.logical.LogicalPartitionTopN; +import org.apache.doris.nereids.trees.plans.logical.LogicalPostProject; import org.apache.doris.nereids.trees.plans.logical.LogicalPreAggOnHint; import org.apache.doris.nereids.trees.plans.logical.LogicalPreFilter; import org.apache.doris.nereids.trees.plans.logical.LogicalProject; @@ -227,6 +228,10 @@ public R visitLogicalLoadProject(LogicalLoadProject project, C c return visit(project, context); } + public R visitLogicalPostProject(LogicalPostProject project, C context) { + return visit(project, context); + } + public R visitLogicalRepeat(LogicalRepeat repeat, C context) { return visit(repeat, context); }