Skip to content

Commit

Permalink
initial commit
Browse files Browse the repository at this point in the history
  • Loading branch information
mihailoale-db committed Feb 6, 2025
1 parent e89b19f commit e08b60b
Showing 1 changed file with 32 additions and 16 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,10 @@

package org.apache.spark.sql.catalyst.analysis

import java.util.HashMap

import scala.jdk.CollectionConverters._

import org.apache.spark.sql.catalyst.expressions._
import org.apache.spark.sql.catalyst.plans.logical._
import org.apache.spark.sql.catalyst.rules.Rule
Expand All @@ -34,10 +38,10 @@ object PullOutNondeterministic extends Rule[LogicalPlan] {
case f: Filter => f

case a: Aggregate if a.groupingExpressions.exists(!_.deterministic) =>
val nondeterToAttr = getNondeterToAttr(a.groupingExpressions)
val newChild = Project(a.child.output ++ nondeterToAttr.values, a.child)
val nondeterToAttr = getNondeterministicToAttributes(a.groupingExpressions)
val newChild = Project(a.child.output ++ nondeterToAttr.values.asScala.toSeq, a.child)
a.transformExpressions { case e =>
nondeterToAttr.get(e).map(_.toAttribute).getOrElse(e)
tryConvertNondeterministicToAttribute(e, nondeterToAttr)
}.copy(child = newChild)

// Don't touch collect metrics. Top-level metrics are not supported (check analysis will fail)
Expand All @@ -51,27 +55,39 @@ object PullOutNondeterministic extends Rule[LogicalPlan] {
// from LogicalPlan, currently we only do it for UnaryNode which has same output
// schema with its child.
case p: UnaryNode if p.output == p.child.output && p.expressions.exists(!_.deterministic) =>
val nondeterToAttr = getNondeterToAttr(p.expressions)
val nondeterToAttr = getNondeterministicToAttributes(p.expressions)
val newPlan = p.transformExpressions { case e =>
nondeterToAttr.get(e).map(_.toAttribute).getOrElse(e)
tryConvertNondeterministicToAttribute(e, nondeterToAttr)
}
val newChild = Project(p.child.output ++ nondeterToAttr.values, p.child)
val newChild = Project(p.child.output ++ nondeterToAttr.values.asScala.toSeq, p.child)
Project(p.output, newPlan.withNewChildren(newChild :: Nil))
}

private def getNondeterToAttr(exprs: Seq[Expression]): Map[Expression, NamedExpression] = {
exprs.filterNot(_.deterministic).flatMap { expr =>
val leafNondeterministic = expr.collect {
def getNondeterministicToAttributes(
expressions: Seq[Expression]): HashMap[Expression, NamedExpression] = {
val nondeterministicToAttributes: HashMap[Expression, NamedExpression] =
new HashMap[Expression, NamedExpression]
expressions.filterNot(_.deterministic).foreach { expression =>
val leafNondeterministic = expression.collect {
case n: Nondeterministic => n
case udf: UserDefinedExpression if !udf.deterministic => udf
}
leafNondeterministic.distinct.map { e =>
val ne = e match {
case n: NamedExpression => n
case _ => Alias(e, "_nondeterministic")()
}
e -> ne
// Key every leaf by itself, never by the enclosing `expression`: the lookup performed while
// transforming expressions is done per node, so an entry keyed by the enclosing expression
// would replace that whole expression with the leaf's attribute instead of just the leaf.
leafNondeterministic.distinct.foreach {
  // Already named: reuse it as-is, no alias needed.
  case n: NamedExpression => nondeterministicToAttributes.put(n, n)
  // Unnamed: wrap it in an alias so it can be projected and referenced by attribute.
  case other =>
    nondeterministicToAttributes.put(other, Alias(other, "_nondeterministic")())
}
}.toMap
}
nondeterministicToAttributes
}

/**
 * Swaps `expression` for the attribute of the named expression recorded for it, if any.
 *
 * @param expression the expression to look up
 * @param nondeterministicToAttributes map from expressions to the named expressions
 *   recorded for them
 * @return the `toAttribute` of the mapped named expression, or `expression` itself when
 *   the map holds no entry for it
 */
private def tryConvertNondeterministicToAttribute(
    expression: Expression,
    nondeterministicToAttributes: HashMap[Expression, NamedExpression]): Expression = {
  // java.util.HashMap.get returns null on a miss; Option(...) turns that into None.
  Option(nondeterministicToAttributes.get(expression))
    .map(_.toAttribute)
    .getOrElse(expression)
}
}

0 comments on commit e08b60b

Please sign in to comment.