From b200cf939179aefcbf39e121db1efced55f354e0 Mon Sep 17 00:00:00 2001 From: mihailoale-db Date: Thu, 6 Feb 2025 21:36:01 +0100 Subject: [PATCH] initial commmit --- ...NondeterministicExpressionCollection.scala | 42 +++++++++++++++++++ .../analysis/PullOutNondeterministic.scala | 22 ++-------- 2 files changed, 46 insertions(+), 18 deletions(-) create mode 100644 sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/NondeterministicExpressionCollection.scala diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/NondeterministicExpressionCollection.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/NondeterministicExpressionCollection.scala new file mode 100644 index 0000000000000..206bba77d1e16 --- /dev/null +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/NondeterministicExpressionCollection.scala @@ -0,0 +1,42 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.catalyst.analysis + +import org.apache.spark.sql.catalyst.expressions._ + +object NondeterministicExpressionCollection { + def getNondeterministicToAttributes( + expressions: Seq[Expression]): Map[Expression, NamedExpression] = { + expressions + .filterNot(_.deterministic) + .flatMap { expr => + val leafNondeterministic = expr.collect { + case n: Nondeterministic => n + case udf: UserDefinedExpression if !udf.deterministic => udf + } + leafNondeterministic.distinct.map { e => + val ne = e match { + case n: NamedExpression => n + case _ => Alias(e, "_nondeterministic")() + } + e -> ne + } + } + .toMap + } +} diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/PullOutNondeterministic.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/PullOutNondeterministic.scala index 3955142166831..c10cc2faf35d1 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/PullOutNondeterministic.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/PullOutNondeterministic.scala @@ -34,7 +34,8 @@ object PullOutNondeterministic extends Rule[LogicalPlan] { case f: Filter => f case a: Aggregate if a.groupingExpressions.exists(!_.deterministic) => - val nondeterToAttr = getNondeterToAttr(a.groupingExpressions) + val nondeterToAttr = + NondeterministicExpressionCollection.getNondeterministicToAttributes(a.groupingExpressions) val newChild = Project(a.child.output ++ nondeterToAttr.values, a.child) a.transformExpressions { case e => nondeterToAttr.get(e).map(_.toAttribute).getOrElse(e) @@ -51,27 +52,12 @@ object PullOutNondeterministic extends Rule[LogicalPlan] { // from LogicalPlan, currently we only do it for UnaryNode which has same output // schema with its child. case p: UnaryNode if p.output == p.child.output && p.expressions.exists(!_.deterministic) => - val nondeterToAttr = getNondeterToAttr(p.expressions) + val nondeterToAttr = + NondeterministicExpressionCollection.getNondeterministicToAttributes(p.expressions) val newPlan = p.transformExpressions { case e => nondeterToAttr.get(e).map(_.toAttribute).getOrElse(e) } val newChild = Project(p.child.output ++ nondeterToAttr.values, p.child) Project(p.output, newPlan.withNewChildren(newChild :: Nil)) } - - private def getNondeterToAttr(exprs: Seq[Expression]): Map[Expression, NamedExpression] = { - exprs.filterNot(_.deterministic).flatMap { expr => - val leafNondeterministic = expr.collect { - case n: Nondeterministic => n - case udf: UserDefinedExpression if !udf.deterministic => udf - } - leafNondeterministic.distinct.map { e => - val ne = e match { - case n: NamedExpression => n - case _ => Alias(e, "_nondeterministic")() - } - e -> ne - } - }.toMap - } }