Skip to content

Commit

Permalink
create sql udf
Browse files Browse the repository at this point in the history
  • Loading branch information
allisonwang-db committed Dec 10, 2024
1 parent 3985a76 commit 4f96e8c
Show file tree
Hide file tree
Showing 16 changed files with 960 additions and 32 deletions.
66 changes: 66 additions & 0 deletions common/utils/src/main/resources/error/error-conditions.json
Original file line number Diff line number Diff line change
Expand Up @@ -1216,6 +1216,18 @@
},
"sqlState" : "4274K"
},
"DUPLICATE_ROUTINE_PARAMETER_NAMES" : {
"message" : [
"Found duplicate name(s) in the parameter list of the user-defined routine <routineName>: <names>."
],
"sqlState" : "42734"
},
"DUPLICATE_ROUTINE_RETURNS_COLUMNS" : {
"message" : [
"Found duplicate column(s) in the RETURNS clause column list of the user-defined routine <routineName>: <columns>."
],
"sqlState" : "42711"
},
"EMITTING_ROWS_OLDER_THAN_WATERMARK_NOT_ALLOWED" : {
"message" : [
"Previous node emitted a row with eventTime=<emittedRowEventTime> which is older than current_watermark_value=<currentWatermark>",
Expand Down Expand Up @@ -4572,6 +4584,12 @@
],
"sqlState" : "42P01"
},
"TABLE_VALUED_ARGUMENTS_NOT_YET_IMPLEMENTED_FOR_SQL_FUNCTIONS" : {
"message" : [
"Cannot <action> SQL user-defined function <functionName> with TABLE arguments because this functionality is not yet implemented."
],
"sqlState" : "0A000"
},
"TABLE_VALUED_FUNCTION_FAILED_TO_ANALYZE_IN_PYTHON" : {
"message" : [
"Failed to analyze the Python user defined table function: <msg>"
Expand Down Expand Up @@ -5592,6 +5610,54 @@
],
"sqlState" : "42K0E"
},
"USER_DEFINED_FUNCTIONS" : {
"message" : [
"User-defined function is invalid:"
],
"subClass" : {
"CANNOT_CONTAIN_COMPLEX_FUNCTIONS" : {
"message" : [
"SQL scalar function cannot contain aggregate/window/generate functions: <queryText>"
]
},
"CANNOT_REPLACE_NON_SQL_UDF_WITH_SQL_UDF" : {
"message" : [
"Cannot replace the non-SQL function <name> with a SQL function."
]
},
"NOT_A_VALID_DEFAULT_EXPRESSION" : {
"message" : [
"The DEFAULT expression of `<functionName>`.`<parameterName>` is not supported because it contains a subquery."
]
},
"NOT_A_VALID_DEFAULT_PARAMETER_POSITION" : {
"message" : [
"In routine `<functionName>` parameter `<parameterName>` with DEFAULT must not be followed by parameter `<nextParameterName>` without DEFAULT."
]
},
"NOT_NULL_ON_FUNCTION_PARAMETERS" : {
"message" : [
"Cannot specify NOT NULL on <languageName> function parameters: <input>"
]
},
"RETURN_COLUMN_COUNT_MISMATCH" : {
"message" : [
"The number of columns produced by the RETURN clause (num: `<outputSize>`) does not match the number of column names specified by the RETURNS clause (num: `<returnParamSize>`) of <name>."
]
},
"SQL_TABLE_UDF_BODY_MUST_BE_A_QUERY" : {
"message" : [
"SQL table function <name> body must be a query."
]
},
"SQL_TABLE_UDF_MISSING_COLUMN_NAMES" : {
"message" : [
"The relation returned by the query in the CREATE FUNCTION statement for <functionName> with RETURNS TABLE clause lacks explicit names for one or more output columns; please rewrite the function body to provide explicit column names or add column names to the RETURNS TABLE clause, and re-run the command."
]
}
},
"sqlState" : "42601"
},
"USER_RAISED_EXCEPTION" : {
"message" : [
"<errorMessage>"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -147,6 +147,17 @@ case class StructField(
if (metadata.contains("comment")) Option(metadata.getString("comment")) else None
}

/**
 * Return the default value of this StructField, read from the "default"
 * key of the field metadata; `None` when no default is recorded.
 */
private[sql] def getDefault(): Option[String] = {
  if (metadata.contains("default")) Option(metadata.getString("default")) else None
}

/**
* Updates the StructField with a new current default value.
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -217,7 +217,8 @@ case class ResolvedInlineTable(rows: Seq[Seq[Expression]], output: Seq[Attribute
*/
case class UnresolvedTableValuedFunction(
name: Seq[String],
functionArgs: Seq[Expression])
functionArgs: Seq[Expression],
override val isStreaming: Boolean = false)
extends UnresolvedLeafNode {

final override val nodePatterns: Seq[TreePattern] = Seq(UNRESOLVED_TABLE_VALUED_FUNCTION)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,10 +18,12 @@
package org.apache.spark.sql.catalyst.catalog

import org.apache.spark.SparkException
import org.apache.spark.sql.AnalysisException
import org.apache.spark.sql.errors.QueryErrorsBase

/**
* Errors during registering and executing [[UserDefinedFunction]]s.
* Errors during registering and executing
* [[org.apache.spark.sql.expressions.UserDefinedFunction]]s.
*/
object UserDefinedFunctionErrors extends QueryErrorsBase {
def unsupportedUserDefinedFunction(language: RoutineLanguage): Throwable = {
Expand All @@ -31,4 +33,69 @@ object UserDefinedFunctionErrors extends QueryErrorsBase {
/** Internal error for an unrecognized user-defined function language/type string. */
def unsupportedUserDefinedFunction(language: String): Throwable =
  SparkException.internalError(s"Unsupported user defined function type: $language")

/** Error for duplicate parameter names in the signature of a user-defined routine. */
def duplicateParameterNames(routineName: String, names: String): Throwable = {
  val params = Map("routineName" -> routineName, "names" -> names)
  new AnalysisException(
    errorClass = "DUPLICATE_ROUTINE_PARAMETER_NAMES",
    messageParameters = params)
}

/** Error for duplicate column names in the RETURNS clause of a user-defined routine. */
def duplicateReturnsColumns(routineName: String, columns: String): Throwable = {
  val params = Map("routineName" -> routineName, "columns" -> columns)
  new AnalysisException(
    errorClass = "DUPLICATE_ROUTINE_RETURNS_COLUMNS",
    messageParameters = params)
}

/**
 * Error raised when a NOT NULL constraint is specified on a parameter of a
 * user-defined function, which is not supported for the given routine language.
 */
def cannotSpecifyNotNullOnFunctionParameters(
    language: RoutineLanguage, input: String): Throwable = {
  val params = Map("languageName" -> language.name, "input" -> input)
  new AnalysisException(
    errorClass = "USER_DEFINED_FUNCTIONS.NOT_NULL_ON_FUNCTION_PARAMETERS",
    messageParameters = params)
}

/** Error for a SQL table function whose body is not a query. */
def bodyIsNotAQueryForSqlTableUdf(functionName: String): Throwable =
  new AnalysisException(
    errorClass = "USER_DEFINED_FUNCTIONS.SQL_TABLE_UDF_BODY_MUST_BE_A_QUERY",
    messageParameters = Map("name" -> functionName))

/**
 * Error for a RETURNS TABLE SQL function whose body leaves one or more output
 * columns without explicit names.
 */
def missingColumnNamesForSqlTableUdf(functionName: String): Throwable =
  new AnalysisException(
    errorClass = "USER_DEFINED_FUNCTIONS.SQL_TABLE_UDF_MISSING_COLUMN_NAMES",
    messageParameters = Map("functionName" -> toSQLId(functionName)))

// Shared builder for INVALID_TEMP_OBJ_REFERENCE errors raised when a persistent
// routine references a temporary object. `tempObjName` must already be SQL-quoted
// by the caller (via toSQLId).
private def invalidTempObjReference(
    routineName: Seq[String], tempObj: String, tempObjName: String): Throwable = {
  new AnalysisException(
    errorClass = "INVALID_TEMP_OBJ_REFERENCE",
    messageParameters = Map(
      "obj" -> "FUNCTION",
      "objName" -> toSQLId(routineName),
      "tempObj" -> tempObj,
      "tempObjName" -> tempObjName))
}

/** Error for a persistent routine that references a temporary view. */
def invalidTempViewReference(routineName: Seq[String], tempViewName: Seq[String]): Throwable =
  invalidTempObjReference(routineName, "VIEW", toSQLId(tempViewName))

/** Error for a persistent routine that references a temporary function. */
def invalidTempFuncReference(routineName: Seq[String], tempFuncName: String): Throwable =
  invalidTempObjReference(routineName, "FUNCTION", toSQLId(tempFuncName))

/** Error for a persistent routine that references a temporary (session) variable. */
def invalidTempVarReference(routineName: Seq[String], varName: Seq[String]): Throwable =
  invalidTempObjReference(routineName, "VARIABLE", toSQLId(varName))
}
Original file line number Diff line number Diff line change
Expand Up @@ -21,8 +21,8 @@ import org.apache.spark.{SparkIllegalArgumentException, SparkUnsupportedOperatio
import org.apache.spark.sql.AnalysisException
import org.apache.spark.sql.catalyst.analysis.{AnalysisContext, AssignmentUtils, EliminateSubqueryAliases, FieldName, NamedRelation, PartitionSpec, ResolvedIdentifier, ResolvedProcedure, TypeCheckResult, UnresolvedException, UnresolvedProcedure, ViewSchemaMode}
import org.apache.spark.sql.catalyst.analysis.TypeCheckResult.{DataTypeMismatch, TypeCheckSuccess}
import org.apache.spark.sql.catalyst.catalog.{FunctionResource, RoutineLanguage}
import org.apache.spark.sql.catalyst.catalog.CatalogTypes.TablePartitionSpec
import org.apache.spark.sql.catalyst.catalog.FunctionResource
import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeReference, AttributeSet, Expression, MetadataAttribute, NamedExpression, UnaryExpression, Unevaluable, V2ExpressionUtils}
import org.apache.spark.sql.catalyst.plans.DescribeCommandSchema
import org.apache.spark.sql.catalyst.trees.BinaryLike
Expand Down Expand Up @@ -1066,6 +1066,26 @@ case class CreateFunction(
copy(child = newChild)
}

/**
 * The logical plan of the CREATE FUNCTION command for SQL Functions.
 *
 * Unresolved counterpart of the eventual CreateUserDefinedFunctionCommand; the
 * textual fields carry raw SQL fragments from the parser, to be analyzed later.
 *
 * @param child the plan holding the (to-be-resolved) function identifier
 * @param inputParamText raw SQL text of the parameter list, if one was given
 * @param returnTypeText raw SQL text of the RETURNS clause (type or column list)
 * @param exprText raw SQL text of the RETURN expression, if any
 *                 (presumably for scalar functions — confirm at use sites)
 * @param queryText raw SQL text of the function body query, if any
 *                  (presumably for table functions — confirm at use sites)
 * @param comment optional COMMENT clause text
 * @param isDeterministic optional DETERMINISTIC / NOT DETERMINISTIC flag;
 *                        None when the clause was omitted
 * @param containsSQL optional CONTAINS SQL-style clause flag; None when omitted
 * @param language the routine language (e.g. SQL)
 * @param isTableFunc true when the function is a table function
 * @param ignoreIfExists true for IF NOT EXISTS semantics
 * @param replace true for OR REPLACE semantics
 */
case class CreateUserDefinedFunction(
child: LogicalPlan,
inputParamText: Option[String],
returnTypeText: String,
exprText: Option[String],
queryText: Option[String],
comment: Option[String],
isDeterministic: Option[Boolean],
containsSQL: Option[Boolean],
language: RoutineLanguage,
isTableFunc: Boolean,
ignoreIfExists: Boolean,
replace: Boolean) extends UnaryCommand {
// Standard TreeNode plumbing: rebuild this node around a transformed child.
override protected def withNewChildInternal(newChild: LogicalPlan): CreateUserDefinedFunction =
copy(child = newChild)
}

/**
* The logical plan of the DROP FUNCTION command.
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2139,6 +2139,15 @@ private[sql] object QueryCompilationErrors extends QueryErrorsBase with Compilat
"ability" -> ability))
}

/**
 * Error raised when the given action on a SQL user-defined function with TABLE
 * arguments is attempted, since that functionality is not yet implemented.
 */
def tableValuedArgumentsNotYetImplementedForSqlFunctions(
    action: String, functionName: String): Throwable = {
  val params = Map("action" -> action, "functionName" -> functionName)
  new AnalysisException(
    errorClass = "TABLE_VALUED_ARGUMENTS_NOT_YET_IMPLEMENTED_FOR_SQL_FUNCTIONS",
    messageParameters = params)
}

def tableValuedFunctionTooManyTableArgumentsError(num: Int): Throwable = {
new AnalysisException(
errorClass = "TABLE_VALUED_FUNCTION_TOO_MANY_TABLE_ARGUMENTS",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -198,6 +198,21 @@ trait AnalysisTest extends PlanTest {
}
}

/**
 * Test helper: parses `sqlCommand` with `parser`, expecting a parse failure, and
 * checks that the raised error matches the given error condition, message
 * parameters, and (optionally) the expected query context.
 */
protected def assertParseErrorClass(
    parser: String => Any,
    sqlCommand: String,
    errorClass: String,
    parameters: Map[String, String],
    queryContext: Array[ExpectedContext] = Array.empty): Unit = {
  checkError(
    exception = parseException(parser)(sqlCommand),
    condition = errorClass,
    parameters = parameters,
    queryContext = queryContext)
}

protected def interceptParseException(parser: String => Any)(
sqlCommand: String, messages: String*)(condition: Option[String] = None): Unit = {
val e = parseException(parser)(sqlCommand)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -483,6 +483,27 @@ class ResolveSessionCatalog(val catalogManager: CatalogManager)

case CreateFunction(ResolvedIdentifier(catalog, _), _, _, _, _) =>
throw QueryCompilationErrors.missingCatalogAbilityError(catalog, "CREATE FUNCTION")

case c @ CreateUserDefinedFunction(
ResolvedIdentifierInSessionCatalog(ident), _, _, _, _, _, _, _, _, _, _, _) =>
CreateUserDefinedFunctionCommand(
FunctionIdentifier(ident.table, ident.database, ident.catalog),
c.inputParamText,
c.returnTypeText,
c.exprText,
c.queryText,
c.comment,
c.isDeterministic,
c.containsSQL,
c.language,
c.isTableFunc,
isTemp = false,
c.ignoreIfExists,
c.replace)

case CreateUserDefinedFunction(
ResolvedIdentifier(catalog, _), _, _, _, _, _, _, _, _, _, _, _) =>
throw QueryCompilationErrors.missingCatalogAbilityError(catalog, "CREATE FUNCTION")
}

private def constructV1TableCmd(
Expand Down
Loading

0 comments on commit 4f96e8c

Please sign in to comment.