Commit

Merge branch 'apache:master' into PrivateMethod-cleanup

LuciferYang authored Feb 20, 2025
2 parents b3cf9c3 + fb17856 commit f3c3c13
Showing 33 changed files with 1,970 additions and 380 deletions.
21 changes: 21 additions & 0 deletions common/utils/src/main/resources/error/error-conditions.json
@@ -3587,6 +3587,16 @@
"message" : [
"Variable <varName> can only be declared at the beginning of the compound."
]
},
"QUALIFIED_LOCAL_VARIABLE" : {
"message" : [
"The variable <varName> must be declared without a qualifier, as qualifiers are not allowed for local variable declarations."
]
},
"REPLACE_LOCAL_VARIABLE" : {
"message" : [
"The variable <varName> does not support DECLARE OR REPLACE, as local variables cannot be replaced."
]
}
},
"sqlState" : "42K0M"
@@ -3733,6 +3743,12 @@
    ],
    "sqlState" : "42K0L"
  },
  "LABEL_NAME_FORBIDDEN" : {
    "message" : [
      "The label name <label> is forbidden."
    ],
    "sqlState" : "42K0L"
  },
  "LOAD_DATA_PATH_NOT_EXISTS" : {
    "message" : [
      "LOAD DATA input path does not exist: <path>."
@@ -5807,6 +5823,11 @@
"SQL Scripting is under development and not all features are supported. SQL Scripting enables users to write procedural SQL including control flow and error handling. To enable existing features set <sqlScriptingEnabled> to `true`."
]
},
"SQL_SCRIPTING_DROP_TEMPORARY_VARIABLE" : {
"message" : [
"DROP TEMPORARY VARIABLE is not supported within SQL scripts. To bypass this, use `EXECUTE IMMEDIATE 'DROP TEMPORARY VARIABLE ...'` ."
]
},
"SQL_SCRIPTING_WITH_POSITIONAL_PARAMETERS" : {
"message" : [
"Positional parameters are not supported with SQL Scripting."
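
For illustration only: a sketch of the bypass named in SQL_SCRIPTING_DROP_TEMPORARY_VARIABLE, assuming a SparkSession `spark` with SQL scripting enabled and a hypothetical session variable `v`.

spark.sql("DECLARE VARIABLE v INT DEFAULT 0")
spark.sql("""
  BEGIN
    -- DROP TEMPORARY VARIABLE v;                   -- would fail inside a script
    EXECUTE IMMEDIATE 'DROP TEMPORARY VARIABLE v';  -- supported bypass
  END
""")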
66 changes: 66 additions & 0 deletions common/utils/src/main/scala/org/apache/spark/util/LexicalThreadLocal.scala
@@ -0,0 +1,66 @@
/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements. See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License. You may obtain a copy of the License at
 *
 *    http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.spark.util

/**
 * Helper trait for defining thread locals with lexical scoping. With this helper, the thread
 * local is private and can only be set through a [[Handle]]. The [[Handle]] only exposes the
 * thread local value to functions passed into its `runWith` method. This pattern allows the
 * lifetime of the thread local value to be strictly controlled.
 *
 * Rather than calling `tl.set(...)` and `tl.remove()`, you obtain a handle and execute your
 * code in `handle.runWith { ... }`.
 *
 * Example:
 * {{{
 *   object Credentials extends LexicalThreadLocal[Map[String, String]] {
 *     def create(creds: Map[String, String]) = createHandle(Some(creds))
 *   }
 *   ...
 *   val handle = Credentials.create(Map("key" -> "value"))
 *   assert(Credentials.get() == None)
 *   handle.runWith {
 *     assert(Credentials.get() == Some(Map("key" -> "value")))
 *   }
 * }}}
 */
trait LexicalThreadLocal[T] {
  private val tl = new ThreadLocal[T]

  private def set(opt: Option[T]): Unit = {
    opt match {
      case Some(x) => tl.set(x)
      case None => tl.remove()
    }
  }

  protected def createHandle(opt: Option[T]): Handle = new Handle(opt)

  def get(): Option[T] = Option(tl.get)

  /** Final class representing a handle to a thread local value. */
  final class Handle private[LexicalThreadLocal] (private val opt: Option[T]) {
    def runWith[R](f: => R): R = {
      val old = get()
      set(opt)
      try f finally {
        set(old)
      }
    }
  }
}
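
A minimal usage sketch (the `RequestId` object and demo are hypothetical, not part of this commit) showing what the trait guarantees: the value is visible only inside `runWith`, and the previous value is restored on exit, even if the body throws.

import org.apache.spark.util.LexicalThreadLocal

object RequestId extends LexicalThreadLocal[String] {
  def create(id: String): Handle = createHandle(Some(id))
}

object RequestIdDemo extends App {
  val outer = RequestId.create("outer")
  val inner = RequestId.create("inner")
  outer.runWith {
    assert(RequestId.get().contains("outer"))
    inner.runWith {
      assert(RequestId.get().contains("inner"))  // inner value visible only here
    }
    assert(RequestId.get().contains("outer"))    // previous value restored by the finally block
  }
  assert(RequestId.get().isEmpty)                // nothing leaks outside runWith
}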
25 changes: 25 additions & 0 deletions sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/SqlScriptingLocalVariableManager.scala
@@ -0,0 +1,25 @@
/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements. See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License. You may obtain a copy of the License at
 *
 *    http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.spark.sql.catalyst

import org.apache.spark.sql.catalyst.catalog.VariableManager
import org.apache.spark.util.LexicalThreadLocal

object SqlScriptingLocalVariableManager extends LexicalThreadLocal[VariableManager] {
  def create(variableManager: VariableManager): Handle = createHandle(Option(variableManager))
}
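
A hedged sketch of the intended pattern (the interpreter-side code is not part of this file, and `withScriptScope` is a hypothetical helper): the script execution engine exposes its VariableManager only for the duration of one script run, so analysis code such as lookupVariable below can see local variables without global mutable state.

import org.apache.spark.sql.catalyst.SqlScriptingLocalVariableManager
import org.apache.spark.sql.catalyst.catalog.VariableManager

object ScriptScopeSketch {
  // The local variable manager is visible to analysis code only while `body` runs;
  // the handle restores the previous thread-local state afterwards.
  def withScriptScope[R](variableManager: VariableManager)(body: => R): R =
    SqlScriptingLocalVariableManager.create(variableManager).runWith(body)
}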
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
@@ -57,7 +57,7 @@ import org.apache.spark.sql.connector.catalog.TableChange.{After, ColumnPosition
import org.apache.spark.sql.connector.catalog.functions.UnboundFunction
import org.apache.spark.sql.connector.catalog.procedures.{BoundProcedure, ProcedureParameter, UnboundProcedure}
import org.apache.spark.sql.connector.expressions.{FieldReference, IdentityTransform}
-import org.apache.spark.sql.errors.{QueryCompilationErrors, QueryExecutionErrors}
+import org.apache.spark.sql.errors.QueryCompilationErrors
import org.apache.spark.sql.execution.datasources.v2.DataSourceV2Relation
import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.internal.SQLConf.{PartitionOverwriteMode, StoreAssignmentPolicy}
@@ -3564,55 +3564,17 @@ class Analyzer(override val catalogManager: CatalogManager) extends RuleExecutor
      hint: JoinHint): LogicalPlan = {
    import org.apache.spark.sql.catalyst.util._

-    val leftKeys = joinNames.map { keyName =>
-      left.output.find(attr => resolver(attr.name, keyName)).getOrElse {
-        throw QueryCompilationErrors.unresolvedUsingColForJoinError(
-          keyName, left.schema.fieldNames.sorted.map(toSQLId).mkString(", "), "left")
-      }
-    }
-    val rightKeys = joinNames.map { keyName =>
-      right.output.find(attr => resolver(attr.name, keyName)).getOrElse {
-        throw QueryCompilationErrors.unresolvedUsingColForJoinError(
-          keyName, right.schema.fieldNames.sorted.map(toSQLId).mkString(", "), "right")
-      }
-    }
-    val joinPairs = leftKeys.zip(rightKeys)
-
-    val newCondition = (condition ++ joinPairs.map(EqualTo.tupled)).reduceOption(And)
-
-    // columns not in joinPairs
-    val lUniqueOutput = left.output.filterNot(att => leftKeys.contains(att))
-    val rUniqueOutput = right.output.filterNot(att => rightKeys.contains(att))
-
-    // the output list looks like: join keys, columns from left, columns from right
-    val (projectList, hiddenList) = joinType match {
-      case LeftOuter =>
-        (leftKeys ++ lUniqueOutput ++ rUniqueOutput.map(_.withNullability(true)),
-          rightKeys.map(_.withNullability(true)))
-      case LeftExistence(_) =>
-        (leftKeys ++ lUniqueOutput, Seq.empty)
-      case RightOuter =>
-        (rightKeys ++ lUniqueOutput.map(_.withNullability(true)) ++ rUniqueOutput,
-          leftKeys.map(_.withNullability(true)))
-      case FullOuter =>
-        // In full outer join, we should return non-null values for the join columns
-        // if either side has non-null values for those columns. Therefore, for each
-        // join column pair, add a coalesce to return the non-null value, if it exists.
-        val joinedCols = joinPairs.map { case (l, r) =>
-          // Since this is a full outer join, either side could be null, so we explicitly
-          // set the nullability to true for both sides.
-          Alias(Coalesce(Seq(l.withNullability(true), r.withNullability(true))), l.name)()
-        }
-        (joinedCols ++
-          lUniqueOutput.map(_.withNullability(true)) ++
-          rUniqueOutput.map(_.withNullability(true)),
-          leftKeys.map(_.withNullability(true)) ++
-          rightKeys.map(_.withNullability(true)))
-      case _ : InnerLike =>
-        (leftKeys ++ lUniqueOutput ++ rUniqueOutput, rightKeys)
-      case _ =>
-        throw QueryExecutionErrors.unsupportedNaturalJoinTypeError(joinType)
-    }
+    val (projectList, hiddenList, newCondition) =
+      NaturalAndUsingJoinResolution.computeJoinOutputsAndNewCondition(
+        left,
+        left.output,
+        right,
+        right.output,
+        joinType,
+        joinNames,
+        condition,
+        (attributeName, keyName) => resolver(attributeName, keyName)
+      )

    // use Project to hide duplicated common keys
    // propagate hidden columns from nested USING/NATURAL JOINs
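
The refactor appears to move this logic verbatim into NaturalAndUsingJoinResolution.computeJoinOutputsAndNewCondition. The FULL OUTER coalesce behavior the old inline comments describe can be seen from the SQL surface; for illustration (assuming a SparkSession `spark`; the view names are hypothetical):

spark.sql("SELECT * FROM VALUES (1, 'a') AS t(id, x)").createOrReplaceTempView("l")
spark.sql("SELECT * FROM VALUES (2, 'b') AS t(id, y)").createOrReplaceTempView("r")
// Output columns are (id, x, y): `id` is COALESCE(l.id, r.id), so both rows get a
// non-null id, and the per-side key columns are hidden rather than duplicated.
spark.sql("SELECT * FROM l FULL OUTER JOIN r USING (id)").show()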
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/ColumnResolutionHelper.scala
@@ -23,8 +23,10 @@ import scala.collection.mutable

import org.apache.spark.internal.Logging
import org.apache.spark.sql.AnalysisException
import org.apache.spark.sql.catalyst.SqlScriptingLocalVariableManager
import org.apache.spark.sql.catalyst.expressions._
import org.apache.spark.sql.catalyst.expressions.SubExprUtils.wrapOuterReference
import org.apache.spark.sql.catalyst.parser.SqlScriptingLabelContext.isForbiddenLabelName
import org.apache.spark.sql.catalyst.plans.logical._
import org.apache.spark.sql.catalyst.trees.CurrentOrigin.withOrigin
import org.apache.spark.sql.catalyst.trees.TreePattern._
@@ -229,6 +231,14 @@ trait ColumnResolutionHelper extends Logging with DataTypeErrorsBase {
}
}

  /**
   * Looks up a variable by nameParts.
   * If we are in a SQL script, local variables are checked first, unless we are in
   * EXECUTE IMMEDIATE (a query generated by EXECUTE IMMEDIATE cannot access local variables);
   * if no local variable is found, we fall back to session variables.
   * @param nameParts name parts of the variable.
   * @return a reference to the variable, if found.
   */
def lookupVariable(nameParts: Seq[String]): Option[VariableReference] = {
// The temp variables live in `SYSTEM.SESSION`, and the name can be qualified or not.
def maybeTempVariableName(nameParts: Seq[String]): Boolean = {
@@ -244,22 +254,41 @@
}
}

-    if (maybeTempVariableName(nameParts)) {
-      val variableName = if (conf.caseSensitiveAnalysis) {
-        nameParts.last
-      } else {
-        nameParts.last.toLowerCase(Locale.ROOT)
-      }
-      catalogManager.tempVariableManager.get(variableName).map { varDef =>
+    val namePartsCaseAdjusted = if (conf.caseSensitiveAnalysis) {
+      nameParts
+    } else {
+      nameParts.map(_.toLowerCase(Locale.ROOT))
+    }
+
+    SqlScriptingLocalVariableManager.get()
+      // If we are in EXECUTE IMMEDIATE lookup only session variables.
+      .filterNot(_ => AnalysisContext.get.isExecuteImmediate)
+      // If variable name is qualified with session.<varName> treat it as a session variable.
+      .filterNot(_ =>
+        nameParts.length > 2 || (nameParts.length == 2 && isForbiddenLabelName(nameParts.head)))
+      .flatMap(_.get(namePartsCaseAdjusted))
+      .map { varDef =>
         VariableReference(
           nameParts,
-          FakeSystemCatalog,
-          Identifier.of(Array(CatalogManager.SESSION_NAMESPACE), variableName),
+          FakeLocalCatalog,
+          Identifier.of(Array(varDef.identifier.namespace().last), namePartsCaseAdjusted.last),
           varDef)
       }
-    } else {
-      None
-    }
+      .orElse(
+        if (maybeTempVariableName(nameParts)) {
+          catalogManager.tempVariableManager
+            .get(namePartsCaseAdjusted)
+            .map { varDef =>
+              VariableReference(
+                nameParts,
+                FakeSystemCatalog,
+                Identifier.of(Array(CatalogManager.SESSION_NAMESPACE), namePartsCaseAdjusted.last),
+                varDef
+              )}
+        } else {
+          None
+        }
+      )
}

// Resolves `UnresolvedAttribute` to its value.
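
For illustration of the lookup order this change introduces (not part of the diff; assumes a SparkSession `spark` with SQL scripting enabled):

spark.sql("DECLARE VARIABLE x INT DEFAULT 1")   // session variable
spark.sql("""
  BEGIN
    DECLARE x INT DEFAULT 2;   -- local variable shadows the session one
    SELECT x;                  -- resolves to the local variable: 2
    SELECT session.x;          -- qualified name: resolves to the session variable: 1
  END
""")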