From cccf3a3214b206afd68277482d2575b40e54381c Mon Sep 17 00:00:00 2001 From: mihailoale-db Date: Thu, 6 Feb 2025 21:36:01 +0100 Subject: [PATCH] initial commit --- .../analysis/PullOutNondeterministic.scala | 48 ++++++++++++------- 1 file changed, 32 insertions(+), 16 deletions(-) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/PullOutNondeterministic.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/PullOutNondeterministic.scala index 3955142166831..baf5a135be2b8 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/PullOutNondeterministic.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/PullOutNondeterministic.scala @@ -17,6 +17,10 @@ package org.apache.spark.sql.catalyst.analysis +import java.util.HashMap + +import scala.jdk.CollectionConverters._ + import org.apache.spark.sql.catalyst.expressions._ import org.apache.spark.sql.catalyst.plans.logical._ import org.apache.spark.sql.catalyst.rules.Rule @@ -34,10 +38,10 @@ object PullOutNondeterministic extends Rule[LogicalPlan] { case f: Filter => f case a: Aggregate if a.groupingExpressions.exists(!_.deterministic) => - val nondeterToAttr = getNondeterToAttr(a.groupingExpressions) - val newChild = Project(a.child.output ++ nondeterToAttr.values, a.child) + val nondeterToAttr = getNondeterministicToAttributes(a.groupingExpressions) + val newChild = Project(a.child.output ++ nondeterToAttr.values.asScala.toSeq, a.child) a.transformExpressions { case e => - nondeterToAttr.get(e).map(_.toAttribute).getOrElse(e) + tryConvertNondeterministicToAttribute(e, nondeterToAttr) }.copy(child = newChild) // Don't touch collect metrics. Top-level metrics are not supported (check analysis will fail) @@ -51,27 +55,39 @@ object PullOutNondeterministic extends Rule[LogicalPlan] { // from LogicalPlan, currently we only do it for UnaryNode which has same output // schema with its child. 
case p: UnaryNode if p.output == p.child.output && p.expressions.exists(!_.deterministic) => - val nondeterToAttr = getNondeterToAttr(p.expressions) + val nondeterToAttr = getNondeterministicToAttributes(p.expressions) val newPlan = p.transformExpressions { case e => - nondeterToAttr.get(e).map(_.toAttribute).getOrElse(e) + tryConvertNondeterministicToAttribute(e, nondeterToAttr) } - val newChild = Project(p.child.output ++ nondeterToAttr.values, p.child) + val newChild = Project(p.child.output ++ nondeterToAttr.values.asScala.toSeq, p.child) Project(p.output, newPlan.withNewChildren(newChild :: Nil)) } - private def getNondeterToAttr(exprs: Seq[Expression]): Map[Expression, NamedExpression] = { - exprs.filterNot(_.deterministic).flatMap { expr => - val leafNondeterministic = expr.collect { + def getNondeterministicToAttributes( + expressions: Seq[Expression]): HashMap[Expression, NamedExpression] = { + val nondeterministicToAttributes: HashMap[Expression, NamedExpression] = + new java.util.LinkedHashMap[Expression, NamedExpression] + expressions.filterNot(_.deterministic).foreach { expression => + val leafNondeterministic = expression.collect { case n: Nondeterministic => n case udf: UserDefinedExpression if !udf.deterministic => udf } - leafNondeterministic.distinct.map { e => - val ne = e match { - case n: NamedExpression => n - case _ => Alias(e, "_nondeterministic")() - } - e -> ne + leafNondeterministic.distinct.foreach { + case n: NamedExpression => nondeterministicToAttributes.put(n, n) + case other => + nondeterministicToAttributes.put(other, Alias(other, "_nondeterministic")()) } - }.toMap + } + nondeterministicToAttributes + } + + private def tryConvertNondeterministicToAttribute( + expression: Expression, + nondeterministicToAttributes: HashMap[Expression, NamedExpression]): Expression = { + nondeterministicToAttributes.get(expression) match { + case null => expression + case other => + other.toAttribute + } } }