Skip to content

Commit

Permalink
Merge remote-tracking branch 'upstream/master' into sleep-case
Browse files Browse the repository at this point in the history
# Conflicts:
#	regression-test/suites/job_p0/test_base_insert_job.groovy
  • Loading branch information
CalvinKirs committed Mar 22, 2024
2 parents 962b3ca + 7bb1d79 commit 374b76a
Show file tree
Hide file tree
Showing 17 changed files with 657 additions and 63 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -30,11 +30,11 @@ public class JdbcDataSourceConfig {
private int batchSize;
private TJdbcOperation op;
private TOdbcTableType tableType;
private int connectionPoolMinSize;
private int connectionPoolMaxSize;
private int connectionPoolMaxWaitTime;
private int connectionPoolMaxLifeTime;
private boolean connectionPoolKeepAlive;
private int connectionPoolMinSize = 1;
private int connectionPoolMaxSize = 10;
private int connectionPoolMaxWaitTime = 5000;
private int connectionPoolMaxLifeTime = 1800000;
private boolean connectionPoolKeepAlive = false;

public String createCacheKey() {
return catalogId + jdbcUrl + jdbcUser + jdbcPassword + jdbcDriverUrl + jdbcDriverClass
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -910,6 +910,9 @@ private void cancelInternal() {
}
}

// meta info and job info not need save in log when cancel, we need to clean them here
backupMeta = null;
jobInfo = null;
releaseSnapshots();
snapshotInfos.clear();

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -53,14 +53,14 @@ public class FunctionRegistry {
// to record the global alias function and other udf.
private static final String GLOBAL_FUNCTION = "__GLOBAL_FUNCTION__";

private final Map<String, List<FunctionBuilder>> name2InternalBuiltinBuilders;
private final Map<String, List<FunctionBuilder>> name2BuiltinBuilders;
private final Map<String, Map<String, List<FunctionBuilder>>> name2UdfBuilders;

public FunctionRegistry() {
name2InternalBuiltinBuilders = new ConcurrentHashMap<>();
name2BuiltinBuilders = new ConcurrentHashMap<>();
name2UdfBuilders = new ConcurrentHashMap<>();
registerBuiltinFunctions(name2InternalBuiltinBuilders);
afterRegisterBuiltinFunctions(name2InternalBuiltinBuilders);
registerBuiltinFunctions(name2BuiltinBuilders);
afterRegisterBuiltinFunctions(name2BuiltinBuilders);
}

// this function is used to test.
Expand All @@ -78,12 +78,33 @@ public FunctionBuilder findFunctionBuilder(String name, Object argument) {
}

public Optional<List<FunctionBuilder>> tryGetBuiltinBuilders(String name) {
List<FunctionBuilder> builders = name2InternalBuiltinBuilders.get(name);
return name2InternalBuiltinBuilders.get(name) == null
List<FunctionBuilder> builders = name2BuiltinBuilders.get(name);
return name2BuiltinBuilders.get(name) == null
? Optional.empty()
: Optional.of(ImmutableList.copyOf(builders));
}

public boolean isAggregateFunction(String dbName, String name) {
name = name.toLowerCase();
Class<?> aggClass = org.apache.doris.nereids.trees.expressions.functions.agg.AggregateFunction.class;
if (StringUtils.isEmpty(dbName)) {
List<FunctionBuilder> functionBuilders = name2BuiltinBuilders.get(name);
for (FunctionBuilder functionBuilder : functionBuilders) {
if (aggClass.isAssignableFrom(functionBuilder.functionClass())) {
return true;
}
}
}

List<FunctionBuilder> udfBuilders = findUdfBuilder(dbName, name);
for (FunctionBuilder udfBuilder : udfBuilders) {
if (aggClass.isAssignableFrom(udfBuilder.functionClass())) {
return true;
}
}
return false;
}

// currently we only find function by name and arity and args' types.
public FunctionBuilder findFunctionBuilder(String dbName, String name, List<?> arguments) {
List<FunctionBuilder> functionBuilders = null;
Expand All @@ -92,11 +113,11 @@ public FunctionBuilder findFunctionBuilder(String dbName, String name, List<?> a

if (StringUtils.isEmpty(dbName)) {
// search internal function only if dbName is empty
functionBuilders = name2InternalBuiltinBuilders.get(name.toLowerCase());
functionBuilders = name2BuiltinBuilders.get(name.toLowerCase());
if (CollectionUtils.isEmpty(functionBuilders) && AggCombinerFunctionBuilder.isAggStateCombinator(name)) {
String nestedName = AggCombinerFunctionBuilder.getNestedName(name);
String combinatorSuffix = AggCombinerFunctionBuilder.getCombinatorSuffix(name);
functionBuilders = name2InternalBuiltinBuilders.get(nestedName.toLowerCase());
functionBuilders = name2BuiltinBuilders.get(nestedName.toLowerCase());
if (functionBuilders != null) {
List<FunctionBuilder> candidateBuilders = Lists.newArrayListWithCapacity(functionBuilders.size());
for (FunctionBuilder functionBuilder : functionBuilders) {
Expand Down Expand Up @@ -199,8 +220,8 @@ public void dropUdf(String dbName, String name, List<DataType> argTypes) {
}
synchronized (name2UdfBuilders) {
Map<String, List<FunctionBuilder>> builders = name2UdfBuilders.getOrDefault(dbName, ImmutableMap.of());
builders.getOrDefault(name, Lists.newArrayList()).removeIf(builder -> ((UdfBuilder) builder).getArgTypes()
.equals(argTypes));
builders.getOrDefault(name, Lists.newArrayList())
.removeIf(builder -> ((UdfBuilder) builder).getArgTypes().equals(argTypes));
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -1166,7 +1166,8 @@ public void afterVisible(TransactionState txnState, boolean txnOperated) {
return;
}
RoutineLoadTaskInfo routineLoadTaskInfo = routineLoadTaskInfoOptional.get();
if (routineLoadTaskInfo.getTxnStatus() != TransactionStatus.COMMITTED) {
if (routineLoadTaskInfo.getTxnStatus() != TransactionStatus.COMMITTED
&& routineLoadTaskInfo.getTxnStatus() != TransactionStatus.VISIBLE) {
// TODO(cmy): Normally, this should not happen. But for safe reason, just pause the job
String msg = String.format(
"should not happen, we find that task %s is not COMMITTED when handling afterVisble."
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,11 +19,13 @@

import org.apache.doris.catalog.Env;
import org.apache.doris.catalog.FunctionRegistry;
import org.apache.doris.common.Pair;
import org.apache.doris.nereids.CascadesContext;
import org.apache.doris.nereids.NereidsPlanner;
import org.apache.doris.nereids.StatementContext;
import org.apache.doris.nereids.analyzer.MappingSlot;
import org.apache.doris.nereids.analyzer.Scope;
import org.apache.doris.nereids.analyzer.UnboundFunction;
import org.apache.doris.nereids.analyzer.UnboundOneRowRelation;
import org.apache.doris.nereids.analyzer.UnboundResultSink;
import org.apache.doris.nereids.analyzer.UnboundSlot;
Expand Down Expand Up @@ -351,12 +353,12 @@ private LogicalHaving<Plan> bindHaving(MatchingContext<LogicalHaving<Plan>> ctx)
CascadesContext cascadesContext = ctx.cascadesContext;

// bind slot by child.output first
Scope defaultScope = toScope(cascadesContext, childPlan.getOutput());
Scope childOutput = toScope(cascadesContext, childPlan.getOutput());
// then bind slot by child.children.output
Supplier<Scope> backupScope = Suppliers.memoize(() ->
Supplier<Scope> childChildrenOutput = Suppliers.memoize(() ->
toScope(cascadesContext, PlanUtils.fastGetChildrenOutputs(childPlan.children()))
);
return bindHavingByScopes(having, cascadesContext, defaultScope, backupScope);
return bindHavingByScopes(having, cascadesContext, childOutput, childChildrenOutput);
}

private LogicalHaving<Plan> bindHavingAggregate(
Expand All @@ -365,13 +367,115 @@ private LogicalHaving<Plan> bindHavingAggregate(
Aggregate<Plan> aggregate = having.child();
CascadesContext cascadesContext = ctx.cascadesContext;

// having(aggregate) should bind slot by aggregate.child.output first
Scope defaultScope = toScope(cascadesContext, PlanUtils.fastGetChildrenOutputs(aggregate.children()));
// then bind slot by aggregate.output
Supplier<Scope> backupScope = Suppliers.memoize(() ->
toScope(cascadesContext, aggregate.getOutput())
);
return bindHavingByScopes(ctx.root, ctx.cascadesContext, defaultScope, backupScope);
// keep same behavior as mysql
Supplier<CustomSlotBinderAnalyzer> bindByAggChild = Suppliers.memoize(() -> {
Scope aggChildOutputScope
= toScope(cascadesContext, PlanUtils.fastGetChildrenOutputs(aggregate.children()));
return (analyzer, unboundSlot) -> analyzer.bindSlotByScope(unboundSlot, aggChildOutputScope);
});

Scope aggOutputScope = toScope(cascadesContext, aggregate.getOutput());
Supplier<CustomSlotBinderAnalyzer> bindByGroupByThenAggOutputThenAggChild = Suppliers.memoize(() -> {
List<Expression> groupByExprs = aggregate.getGroupByExpressions();
ImmutableList.Builder<Slot> groupBySlots
= ImmutableList.builderWithExpectedSize(groupByExprs.size());
for (Expression groupBy : groupByExprs) {
if (groupBy instanceof Slot) {
groupBySlots.add((Slot) groupBy);
}
}
Scope groupBySlotsScope = toScope(cascadesContext, groupBySlots.build());

Supplier<Pair<Scope, Scope>> separateAggOutputScopes = Suppliers.memoize(() -> {
ImmutableList.Builder<Slot> groupByOutputs = ImmutableList.builderWithExpectedSize(
aggregate.getOutputExpressions().size());
ImmutableList.Builder<Slot> aggFunOutputs = ImmutableList.builderWithExpectedSize(
aggregate.getOutputExpressions().size());
for (NamedExpression outputExpression : aggregate.getOutputExpressions()) {
if (outputExpression.anyMatch(AggregateFunction.class::isInstance)) {
aggFunOutputs.add(outputExpression.toSlot());
} else {
groupByOutputs.add(outputExpression.toSlot());
}
}
Scope nonAggFunSlotsScope = toScope(cascadesContext, groupByOutputs.build());
Scope aggFuncSlotsScope = toScope(cascadesContext, aggFunOutputs.build());
return Pair.of(nonAggFunSlotsScope, aggFuncSlotsScope);
});

return (analyzer, unboundSlot) -> {
List<Slot> boundInGroupBy = analyzer.bindSlotByScope(unboundSlot, groupBySlotsScope);
if (boundInGroupBy.size() == 1) {
return boundInGroupBy;
}

Pair<Scope, Scope> separateAggOutputScope = separateAggOutputScopes.get();
List<Slot> boundInNonAggFuncs = analyzer.bindSlotByScope(unboundSlot, separateAggOutputScope.first);
if (boundInNonAggFuncs.size() == 1) {
return boundInNonAggFuncs;
}

List<Slot> boundInAggFuncs = analyzer.bindSlotByScope(unboundSlot, separateAggOutputScope.second);
if (boundInAggFuncs.size() == 1) {
return boundInAggFuncs;
}

return bindByAggChild.get().bindSlot(analyzer, unboundSlot);
};
});

FunctionRegistry functionRegistry = cascadesContext.getConnectContext().getEnv().getFunctionRegistry();
ExpressionAnalyzer havingAnalyzer = new ExpressionAnalyzer(having, aggOutputScope, cascadesContext,
false, true) {
private boolean currentIsInAggregateFunction;

@Override
public Expression visitAggregateFunction(AggregateFunction aggregateFunction,
ExpressionRewriteContext context) {
if (!currentIsInAggregateFunction) {
currentIsInAggregateFunction = true;
try {
return super.visitAggregateFunction(aggregateFunction, context);
} finally {
currentIsInAggregateFunction = false;
}
} else {
return super.visitAggregateFunction(aggregateFunction, context);
}
}

@Override
public Expression visitUnboundFunction(UnboundFunction unboundFunction, ExpressionRewriteContext context) {
if (!currentIsInAggregateFunction && isAggregateFunction(unboundFunction, functionRegistry)) {
currentIsInAggregateFunction = true;
try {
return super.visitUnboundFunction(unboundFunction, context);
} finally {
currentIsInAggregateFunction = false;
}
} else {
return super.visitUnboundFunction(unboundFunction, context);
}
}

@Override
protected List<? extends Expression> bindSlotByThisScope(UnboundSlot unboundSlot) {
if (currentIsInAggregateFunction) {
return bindByAggChild.get().bindSlot(this, unboundSlot);
} else {
return bindByGroupByThenAggOutputThenAggChild.get().bindSlot(this, unboundSlot);
}
}
};

Set<Expression> havingExprs = having.getConjuncts();
ImmutableSet.Builder<Expression> analyzedHaving = ImmutableSet.builderWithExpectedSize(havingExprs.size());
ExpressionRewriteContext rewriteContext = new ExpressionRewriteContext(cascadesContext);
for (Expression expression : havingExprs) {
analyzedHaving.add(havingAnalyzer.analyze(expression, rewriteContext));
}

return new LogicalHaving<>(analyzedHaving.build(), having.child());
}

private LogicalHaving<Plan> bindHavingByScopes(
Expand Down Expand Up @@ -764,6 +868,11 @@ private void checkIfOutputAliasNameDuplicatedForGroupBy(Collection<Expression> e
}
}

private boolean isAggregateFunction(UnboundFunction unboundFunction, FunctionRegistry functionRegistry) {
return functionRegistry.isAggregateFunction(
unboundFunction.getDbName(), unboundFunction.getName());
}

private <E extends Expression> E checkBoundExceptLambda(E expression, Plan plan) {
if (expression instanceof Lambda) {
return expression;
Expand Down Expand Up @@ -797,6 +906,12 @@ private SimpleExprAnalyzer buildSimpleExprAnalyzer(
boolean enableExactMatch, boolean bindSlotInOuterScope) {
List<Slot> childrenOutputs = PlanUtils.fastGetChildrenOutputs(children);
Scope scope = toScope(cascadesContext, childrenOutputs);
return buildSimpleExprAnalyzer(currentPlan, cascadesContext, scope, enableExactMatch, bindSlotInOuterScope);
}

private SimpleExprAnalyzer buildSimpleExprAnalyzer(
Plan currentPlan, CascadesContext cascadesContext, Scope scope,
boolean enableExactMatch, boolean bindSlotInOuterScope) {
ExpressionRewriteContext rewriteContext = new ExpressionRewriteContext(cascadesContext);
ExpressionAnalyzer expressionAnalyzer = new ExpressionAnalyzer(currentPlan,
scope, cascadesContext, enableExactMatch, bindSlotInOuterScope);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,14 +24,16 @@
import org.apache.doris.nereids.trees.expressions.Expression;
import org.apache.doris.nereids.trees.plans.Plan;
import org.apache.doris.nereids.trees.plans.logical.LogicalCheckPolicy;
import org.apache.doris.nereids.trees.plans.logical.LogicalCheckPolicy.RelatedPolicy;
import org.apache.doris.nereids.trees.plans.logical.LogicalFileScan;
import org.apache.doris.nereids.trees.plans.logical.LogicalFilter;
import org.apache.doris.nereids.trees.plans.logical.LogicalProject;
import org.apache.doris.nereids.trees.plans.logical.LogicalRelation;
import org.apache.doris.nereids.util.ExpressionUtils;

import com.google.common.collect.ImmutableList;

import java.util.HashSet;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Set;

Expand Down Expand Up @@ -60,7 +62,7 @@ public List<Rule> buildRules() {
return ctx.root.child();
}
LogicalRelation relation = (LogicalRelation) child;
Set<Expression> combineFilter = new HashSet<>();
Set<Expression> combineFilter = new LinkedHashSet<>();

// replace incremental params as AND expression
if (relation instanceof LogicalFileScan) {
Expand All @@ -72,18 +74,20 @@ public List<Rule> buildRules() {
}
}

// row policy
checkPolicy.getFilter(relation, ctx.connectContext)
.ifPresent(expression -> combineFilter.addAll(
RelatedPolicy relatedPolicy = checkPolicy.findPolicy(relation, ctx.cascadesContext);
relatedPolicy.rowPolicyFilter.ifPresent(expression -> combineFilter.addAll(
ExpressionUtils.extractConjunctionToSet(expression)));

if (combineFilter.isEmpty()) {
return ctx.root.child();
}
Plan result = relation;
if (upperFilter != null) {
combineFilter.addAll(upperFilter.getConjuncts());
}
return new LogicalFilter<>(combineFilter, relation);
if (!combineFilter.isEmpty()) {
result = new LogicalFilter<>(combineFilter, relation);
}
if (relatedPolicy.dataMaskProjects.isPresent()) {
result = new LogicalProject<>(relatedPolicy.dataMaskProjects.get(), result);
}
return result;
})
)
);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,11 @@ public AggCombinerFunctionBuilder(String combinatorSuffix, FunctionBuilder neste
this.nestedBuilder = Objects.requireNonNull(nestedBuilder, "nestedBuilder can not be null");
}

@Override
public Class<? extends BoundFunction> functionClass() {
return nestedBuilder.functionClass();
}

@Override
public boolean canApply(List<? extends Object> arguments) {
if (combinatorSuffix.equals(STATE) || combinatorSuffix.equals(FOREACH)) {
Expand Down
Loading

0 comments on commit 374b76a

Please sign in to comment.