@@ -37,7 +37,6 @@
import org.apache.doris.common.ErrorCode;
import org.apache.doris.common.ErrorReport;
import org.apache.doris.common.UserException;
import org.apache.doris.common.util.FileFormatConstants;
import org.apache.doris.info.PartitionNamesInfo;
import org.apache.doris.nereids.CascadesContext;
import org.apache.doris.nereids.StatementContext;
@@ -63,6 +62,7 @@
import org.apache.doris.nereids.trees.plans.logical.LogicalOlapTableSink;
import org.apache.doris.nereids.trees.plans.logical.LogicalOneRowRelation;
import org.apache.doris.nereids.trees.plans.logical.LogicalPlan;
import org.apache.doris.nereids.trees.plans.logical.LogicalPostProject;
import org.apache.doris.nereids.trees.plans.logical.LogicalPreFilter;
import org.apache.doris.nereids.trees.plans.logical.LogicalProject;
import org.apache.doris.nereids.trees.plans.visitor.DefaultPlanVisitor;
@@ -220,16 +220,6 @@ public TFileScanRangeParams toFileScanRangeParams(TUniqueId loadId, NereidsFileG

return params;
}

private String getHeaderType(String formatType) {
if (formatType != null) {
if (formatType.equalsIgnoreCase(FileFormatConstants.FORMAT_CSV_WITH_NAMES)
|| formatType.equalsIgnoreCase(FileFormatConstants.FORMAT_CSV_WITH_NAMES_AND_TYPES)) {
return formatType;
}
}
return "";
}
}

private LoadPlanInfo loadPlanInfo;
@@ -347,38 +337,6 @@ public Void visitLogicalProject(LogicalProject&lt;? extends Plan&gt; logicalProject, P
}
}

// For Broker load with multiple file groups, all file groups share the same destTuple.
// Create slots for destTuple only when processing the first file group (when slots are empty).
// Subsequent file groups will reuse the slots created by the first file group.
if (loadPlanInfo.destTuple.getSlots().isEmpty()) {
List<Slot> slotList = outputs.stream().map(NamedExpression::toSlot).collect(Collectors.toList());

// ignore projectList's nullability and set the expr's nullable info same as
// dest table column
// why do this? looks like BE works in this way...
// and we have to do some extra work in visitLogicalFilter because of this odd
// behavior
int size = slotList.size();
List<Slot> newSlotList = new ArrayList<>(size);
for (int i = 0; i < size; ++i) {
SlotReference slot = (SlotReference) slotList.get(i);
Column col = destTable.getColumn(slot.getName());
if (col != null) {
slot = slot.withColumn(col);
if (col.isAutoInc()) {
newSlotList.add(slot.withNullable(true));
} else {
newSlotList.add(slot.withNullable(col.isAllowNull()));
}
} else {
newSlotList.add(slot);
}
}

for (Slot slot : newSlotList) {
context.createSlotDesc(loadPlanInfo.destTuple, (SlotReference) slot, destTable);
}
}
List<SlotDescriptor> slotDescriptorList = loadPlanInfo.destTuple.getSlots();
loadPlanInfo.destSlotIdToExprMap = Maps.newHashMap();
for (int i = 0; i < slotDescriptorList.size(); ++i) {
@@ -400,16 +358,35 @@ public Void visitLogicalProject(LogicalProject&lt;? extends Plan&gt; logicalProject, P
return null;
}

@Override
public Void visitLogicalPostProject(LogicalPostProject<? extends Plan> logicalPostProject,
PlanTranslatorContext context) {
List<NamedExpression> outputs = logicalPostProject.getOutputs();
for (NamedExpression expr : outputs) {
if (expr.containsType(AggregateFunction.class)) {
throw new AnalysisException("Don't support aggregation function in load expression");
}
}

// For Broker load with multiple file groups, all file groups share the same destTuple.
// Create slots for destTuple only when processing the first file group (when slots are empty).
// Subsequent file groups will reuse the slots created by the first file group.
if (loadPlanInfo.destTuple.getSlots().isEmpty()) {
List<Slot> slotList = outputs.stream().map(NamedExpression::toSlot).collect(Collectors.toList());
for (Slot slot : slotList) {
context.createSlotDesc(loadPlanInfo.destTuple, (SlotReference) slot, destTable);
}
}
logicalPostProject.child().accept(this, context);
return null;
}

@Override
public Void visitLogicalFilter(LogicalFilter<? extends Plan> logicalFilter, PlanTranslatorContext context) {
logicalFilter.child().accept(this, context);
loadPlanInfo.postFilterExprList = new ArrayList<>(logicalFilter.getConjuncts().size());
for (Expression conjunct : logicalFilter.getConjuncts()) {
Expr expr = ExpressionTranslator.translate(conjunct, context);
// in visitLogicalProject, we set the project exprs' nullability same as the dest table columns
// the conjunct's nullability is based on the project exprs, so we need to clear the nullable info
// and let the conjunct calculate its nullability by itself to get the correct nullable info
clearNullableFromNereidsRecursively(expr);
loadPlanInfo.postFilterExprList.add(expr);
}
filterPredicate = logicalFilter.getPredicate();
@@ -428,19 +405,6 @@ public Void visitLogicalFilter(LogicalFilter&lt;? extends Plan&gt; logicalFilter, Plan
return null;
}

/**
* Recursively clear nullable info from expression and all its children
*/
private void clearNullableFromNereidsRecursively(Expr expr) {
if (expr == null) {
return;
}
expr.clearNullableFromNereids();
for (Expr child : expr.getChildren()) {
clearNullableFromNereidsRecursively(child);
}
}

@Override
public Void visitLogicalPreFilter(LogicalPreFilter<? extends Plan> logicalPreFilter,
PlanTranslatorContext context) {
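The comment in visitLogicalPostProject above describes how, for Broker load with multiple file groups, all file groups share one destTuple and only the first group creates its slots. The sketch below is a rough, self-contained illustration of that create-once/reuse behavior; TupleSketch and DestTupleSharingSketch are hypothetical placeholders, not Doris' TupleDescriptor or the real plan translator.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-in for Doris' TupleDescriptor, for illustration only.
class TupleSketch {
    private final List<String> slots = new ArrayList<>();

    List<String> getSlots() {
        return slots;
    }

    void addSlot(String slotName) {
        slots.add(slotName);
    }
}

class DestTupleSharingSketch {
    // Called once per file group; only the first call populates the shared tuple,
    // later file groups see a non-empty slot list and simply reuse it.
    static void processFileGroup(TupleSketch destTuple, List<String> outputSlotNames) {
        if (destTuple.getSlots().isEmpty()) {
            for (String name : outputSlotNames) {
                destTuple.addSlot(name);
            }
        }
    }

    public static void main(String[] args) {
        TupleSketch destTuple = new TupleSketch();
        processFileGroup(destTuple, List.of("k1", "v1")); // first file group creates slots
        processFileGroup(destTuple, List.of("k1", "v1")); // second file group reuses them
        System.out.println(destTuple.getSlots());         // prints [k1, v1]
    }
}
```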
@@ -19,6 +19,7 @@

import org.apache.doris.catalog.AggregateType;
import org.apache.doris.catalog.Column;
import org.apache.doris.catalog.OlapTable;
import org.apache.doris.catalog.Table;
import org.apache.doris.common.UserException;
import org.apache.doris.info.PartitionNamesInfo;
@@ -42,6 +43,7 @@
import org.apache.doris.nereids.trees.expressions.Alias;
import org.apache.doris.nereids.trees.expressions.Expression;
import org.apache.doris.nereids.trees.expressions.NamedExpression;
import org.apache.doris.nereids.trees.expressions.Slot;
import org.apache.doris.nereids.trees.expressions.SlotReference;
import org.apache.doris.nereids.trees.expressions.StatementScopeIdGenerator;
import org.apache.doris.nereids.trees.expressions.functions.scalar.JsonbParseErrorToNull;
@@ -54,6 +56,7 @@
import org.apache.doris.nereids.trees.plans.logical.LogicalOlapTableSink;
import org.apache.doris.nereids.trees.plans.logical.LogicalOneRowRelation;
import org.apache.doris.nereids.trees.plans.logical.LogicalPlan;
import org.apache.doris.nereids.trees.plans.logical.LogicalPostProject;
import org.apache.doris.nereids.trees.plans.logical.LogicalPreFilter;
import org.apache.doris.nereids.trees.plans.logical.LogicalProject;
import org.apache.doris.nereids.trees.plans.visitor.DefaultPlanVisitor;
@@ -199,13 +202,18 @@ public static LogicalPlan createLoadPlan(NereidsFileGroupInfo fileGroupInfo, Par
CascadesContext cascadesContext = CascadesContext.initContext(new StatementContext(), currentRootPlan,
PhysicalProperties.ANY);
ConnectContext ctx = cascadesContext.getConnectContext();
// we force-convert nullable columns to non-nullable columns for load,
// so set feDebug to false to avoid AdjustNullableRule reporting an error
boolean oldFeDebugValue = ctx.getSessionVariable().feDebug;
try {
ctx.getSessionVariable().setDebugSkipFoldConstant(true);
ctx.getSessionVariable().feDebug = false;

Analyzer.buildCustomAnalyzer(cascadesContext, ImmutableList.of(Analyzer.bottomUp(
new BindExpression(),
new LoadProjectRewrite(fileGroupInfo.getTargetTable()),
new BindSink(false),
new AddPostProject(),
new AddPostFilter(
context.fileGroup.getWhereExpr()
),
@@ -236,6 +244,7 @@ public static LogicalPlan createLoadPlan(NereidsFileGroupInfo fileGroupInfo, Par
throw new UserException(exception.getMessage());
} finally {
ctx.getSessionVariable().setDebugSkipFoldConstant(false);
ctx.getSessionVariable().feDebug = oldFeDebugValue;
}

return (LogicalPlan) cascadesContext.getRewritePlan();
@@ -324,4 +333,43 @@ public Rule build() {
}).toRule(RuleType.ADD_POST_FILTER_FOR_LOAD);
}
}

/** AddPostProject
 * The BindSink rule produces the final project list for load; we need to cast the outputs according to
 * the dest table's schema.
 */
private static class AddPostProject extends OneRewriteRuleFactory {
public AddPostProject() {
}

@Override
public Rule build() {
return logicalOlapTableSink().whenNot(plan -> plan.child() instanceof LogicalPostProject
|| plan.child() instanceof LogicalFilter).thenApply(ctx -> {
LogicalOlapTableSink logicalOlapTableSink = ctx.root;
LogicalPlan childPlan = (LogicalPlan) logicalOlapTableSink.child();
List<Slot> childOutputs = childPlan.getOutput();
OlapTable destTable = logicalOlapTableSink.getTargetTable();
int size = childOutputs.size();
List<SlotReference> projectList = new ArrayList<>(size);
for (int i = 0; i < size; ++i) {
SlotReference slot = (SlotReference) childOutputs.get(i);
Column col = destTable.getColumn(slot.getName());
if (col != null) {
slot = slot.withColumn(col);
if (col.isAutoInc()) {
projectList.add(slot.withNullable(true));
} else {
projectList.add(slot.withNullable(col.isAllowNull()));
}
} else {
projectList.add(slot);
}
}
return logicalOlapTableSink.withChildren(
Lists.newArrayList(
new LogicalPostProject(projectList, (Plan) logicalOlapTableSink.child(0))));
}).toRule(RuleType.ADD_POST_PROJECT_FOR_LOAD);
}
}
}
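Below is a minimal, self-contained sketch of the nullability alignment that the AddPostProject rule above performs when building its project list; DestColumn, SlotInfo, and PostProjectNullabilitySketch are simplified, hypothetical stand-ins for Doris' Column and SlotReference, not the real classes. The idea: each output slot adopts the nullability of the matching destination column, except auto-increment columns, which stay nullable.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;

// Simplified, hypothetical stand-ins for Doris' Column / SlotReference.
record DestColumn(String name, boolean allowNull, boolean autoInc) {}
record SlotInfo(String name, boolean nullable) {}

class PostProjectNullabilitySketch {
    // Mirrors the loop in AddPostProject: align each slot's nullability with the dest table column.
    static List<SlotInfo> alignNullability(List<SlotInfo> outputs, Map<String, DestColumn> destSchema) {
        List<SlotInfo> projectList = new ArrayList<>(outputs.size());
        for (SlotInfo slot : outputs) {
            DestColumn col = destSchema.get(slot.name());
            if (col == null) {
                projectList.add(slot);                            // no matching column: keep as-is
            } else if (col.autoInc()) {
                projectList.add(new SlotInfo(slot.name(), true)); // auto-increment stays nullable
            } else {
                projectList.add(new SlotInfo(slot.name(), col.allowNull()));
            }
        }
        return projectList;
    }

    public static void main(String[] args) {
        Map<String, DestColumn> schema = Map.of(
                "id", new DestColumn("id", false, true),
                "v1", new DestColumn("v1", false, false));
        List<SlotInfo> outputs = List.of(new SlotInfo("id", true), new SlotInfo("v1", true));
        System.out.println(alignNullability(outputs, schema));
        // [SlotInfo[name=id, nullable=true], SlotInfo[name=v1, nullable=false]]
    }
}
```

Keeping this adjustment in a dedicated LogicalPostProject node, rather than mutating slots inside the plan translator as the removed visitLogicalProject code did, appears to be what lets the filter conjuncts compute their own nullability, and presumably why clearNullableFromNereidsRecursively and the related comments in visitLogicalFilter could be dropped.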
@@ -262,6 +262,8 @@ public enum RuleType {
REWRITE_LOAD_PROJECT_FOR_STREAM_LOAD(RuleTypeClass.REWRITE),
// add post filter node for load
ADD_POST_FILTER_FOR_LOAD(RuleTypeClass.REWRITE),
// add post project node for load
ADD_POST_PROJECT_FOR_LOAD(RuleTypeClass.REWRITE),

// Merge Consecutive plan
MERGE_PROJECTS(RuleTypeClass.REWRITE),