Skip to content

Commit f2d5abb

Browse files
committed
[SPARK-52243] Add NERF support for schema-related InvalidPlanInput errors
1 parent e2f4f5b commit f2d5abb

File tree

5 files changed

+177
-26
lines changed

5 files changed

+177
-26
lines changed

common/utils/src/main/resources/error/error-conditions.json

Lines changed: 24 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3198,6 +3198,12 @@
31983198
},
31993199
"sqlState" : "42K06"
32003200
},
3201+
"INVALID_OUTPUT_SCHEMA_DATATYPE_FOR_TRANSFORM_WITH_STATE_IN_PANDAS_NON_STRUCT" : {
3202+
"message" : [
3203+
"Invalid user-defined output schema type for TransformWithStateInPandas. Expect a struct type, but got <dataType>."
3204+
],
3205+
"sqlState" : "42K09"
3206+
},
32013207
"INVALID_PANDAS_UDF_PLACEMENT" : {
32023208
"message" : [
32033209
"The group aggregate pandas UDF <functionList> cannot be invoked together with as other, non-pandas aggregate functions."
@@ -3354,6 +3360,12 @@
33543360
],
33553361
"sqlState" : "42602"
33563362
},
3363+
"INVALID_PYTHON_UDTF_RETURN_DATATYPE_NON_STRUCT" : {
3364+
"message" : [
3365+
"Invalid Python user-defined table function return data type. Expect a struct type, but got <dataType>."
3366+
],
3367+
"sqlState" : "42K09"
3368+
},
33573369
"INVALID_QUERY_MIXED_QUERY_PARAMETERS" : {
33583370
"message" : [
33593371
"Parameterized query must either use positional, or named parameters, but not both."
@@ -3426,6 +3438,12 @@
34263438
},
34273439
"sqlState" : "42K07"
34283440
},
3441+
"INVALID_SCHEMA_DATATYPE_NON_STRUCT" : {
3442+
"message" : [
3443+
"Invalid schema data type. Expect a struct type, but got <dataType>."
3444+
],
3445+
"sqlState" : "42K09"
3446+
},
34293447
"INVALID_SCHEMA_OR_RELATION_NAME" : {
34303448
"message" : [
34313449
"<name> is not a valid name for tables/schemas. Valid names only contain alphabet characters, numbers and _."
@@ -3642,6 +3660,12 @@
36423660
],
36433661
"sqlState" : "42601"
36443662
},
3663+
"INVALID_STATE_SCHEMA_DATATYPE_FOR_FLAT_MAP_GROUPS_WITH_STATE_NON_STRUCT" : {
3664+
"message" : [
3665+
"Invalid state schema data type for flatMapGroupsWithState. Expect a struct type, but got <dataType>."
3666+
],
3667+
"sqlState" : "42K09"
3668+
},
36453669
"INVALID_SUBQUERY_EXPRESSION" : {
36463670
"message" : [
36473671
"Invalid subquery:"

sql/api/src/main/scala/org/apache/spark/sql/errors/DataTypeErrorsBase.scala

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -85,7 +85,7 @@ private[sql] trait DataTypeErrorsBase {
8585
else value.toString
8686
}
8787

88-
protected def quoteByDefault(elem: String): String = {
88+
protected[sql] def quoteByDefault(elem: String): String = {
8989
"\"" + elem + "\""
9090
}
9191

sql/connect/server/src/main/scala/org/apache/spark/sql/connect/planner/InvalidInputErrors.scala

Lines changed: 28 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -19,12 +19,22 @@ package org.apache.spark.sql.connect.planner
1919

2020
import scala.collection.mutable
2121

22+
import org.apache.spark.SparkThrowableHelper
2223
import org.apache.spark.connect.proto
2324
import org.apache.spark.sql.connect.common.{InvalidCommandInput, InvalidPlanInput}
25+
import org.apache.spark.sql.errors.DataTypeErrors.{quoteByDefault, toSQLType}
2426
import org.apache.spark.sql.types.DataType
2527

2628
object InvalidInputErrors {
2729

30+
// invalidPlanInput is a helper function to facilitate the migration of InvalidInputErrors
31+
// to support NERF.
32+
private def invalidPlanInput(
33+
errorCondition: String,
34+
messageParameters: Map[String, String] = Map.empty): InvalidPlanInput = {
35+
InvalidPlanInput(SparkThrowableHelper.getMessage(errorCondition, messageParameters))
36+
}
37+
2838
def unknownRelationNotSupported(rel: proto.Relation): InvalidPlanInput =
2939
InvalidPlanInput(s"${rel.getUnknown} not supported.")
3040

@@ -72,11 +82,6 @@ object InvalidInputErrors {
7282
def rowNotSupportedForUdf(errorType: String): InvalidPlanInput =
7383
InvalidPlanInput(s"Row is not a supported $errorType type for this UDF.")
7484

75-
def invalidUserDefinedOutputSchemaType(actualType: String): InvalidPlanInput =
76-
InvalidPlanInput(
77-
s"Invalid user-defined output schema type for TransformWithStateInPandas. " +
78-
s"Expect a struct type, but got $actualType.")
79-
8085
def notFoundCachedLocalRelation(hash: String, sessionUUID: String): InvalidPlanInput =
8186
InvalidPlanInput(
8287
s"Not found any cached local relation with the hash: " +
@@ -91,8 +96,10 @@ object InvalidInputErrors {
9196
def schemaRequiredForLocalRelation(): InvalidPlanInput =
9297
InvalidPlanInput("Schema for LocalRelation is required when the input data is not provided.")
9398

94-
def invalidSchema(schema: DataType): InvalidPlanInput =
95-
InvalidPlanInput(s"Invalid schema $schema")
99+
def invalidSchemaNonStructType(schema: String, dataType: DataType): InvalidPlanInput =
100+
invalidPlanInput(
101+
"INVALID_SCHEMA.NON_STRUCT_TYPE",
102+
Map("inputSchema" -> quoteByDefault(schema), "dataType" -> toSQLType(dataType)))
96103

97104
def invalidJdbcParams(): InvalidPlanInput =
98105
InvalidPlanInput("Invalid jdbc params, please specify jdbc url and table.")
@@ -107,7 +114,7 @@ object InvalidInputErrors {
107114
InvalidPlanInput(s"Does not support $what")
108115

109116
def invalidSchemaDataType(dataType: DataType): InvalidPlanInput =
110-
InvalidPlanInput(s"Invalid schema dataType $dataType")
117+
invalidPlanInput("INVALID_SCHEMA_DATATYPE_NON_STRUCT", Map("dataType" -> toSQLType(dataType)))
111118

112119
def expressionIdNotSupported(exprId: Int): InvalidPlanInput =
113120
InvalidPlanInput(s"Expression with ID: $exprId is not supported")
@@ -189,8 +196,10 @@ object InvalidInputErrors {
189196
def usingColumnsOrJoinConditionSetInJoin(): InvalidPlanInput =
190197
InvalidPlanInput("Using columns or join conditions cannot be set at the same time in Join")
191198

192-
def invalidStateSchemaDataType(dataType: DataType): InvalidPlanInput =
193-
InvalidPlanInput(s"Invalid state schema dataType $dataType for flatMapGroupsWithState")
199+
def invalidStateSchemaDataTypeForFlatMapGroupsWithState(dataType: DataType): InvalidPlanInput =
200+
invalidPlanInput(
201+
"INVALID_STATE_SCHEMA_DATATYPE_FOR_FLAT_MAP_GROUPS_WITH_STATE_NON_STRUCT",
202+
Map("dataType" -> toSQLType(dataType)))
194203

195204
def sqlCommandExpectsSqlOrWithRelations(other: proto.Relation.RelTypeCase): InvalidPlanInput =
196205
InvalidPlanInput(s"SQL command expects either a SQL or a WithRelations, but got $other")
@@ -213,16 +222,16 @@ object InvalidInputErrors {
213222
def invalidBucketCount(numBuckets: Int): InvalidCommandInput =
214223
InvalidCommandInput("INVALID_BUCKET_COUNT", Map("numBuckets" -> numBuckets.toString))
215224

216-
def invalidPythonUdtfReturnType(actualType: String): InvalidPlanInput =
217-
InvalidPlanInput(
218-
s"Invalid Python user-defined table function return type. " +
219-
s"Expect a struct type, but got $actualType.")
225+
def invalidPythonUdtfReturnDataType(dataType: DataType): InvalidPlanInput =
226+
invalidPlanInput(
227+
"INVALID_PYTHON_UDTF_RETURN_DATATYPE_NON_STRUCT",
228+
Map("dataType" -> toSQLType(dataType)))
220229

221-
def invalidUserDefinedOutputSchemaTypeForTransformWithState(
222-
actualType: String): InvalidPlanInput =
223-
InvalidPlanInput(
224-
s"Invalid user-defined output schema type for TransformWithStateInPandas. " +
225-
s"Expect a struct type, but got $actualType.")
230+
def invalidOutputSchemaDataTypeForTransformWithStateInPandas(
231+
dataType: DataType): InvalidPlanInput =
232+
invalidPlanInput(
233+
"INVALID_OUTPUT_SCHEMA_DATATYPE_FOR_TRANSFORM_WITH_STATE_IN_PANDAS_NON_STRUCT",
234+
Map("dataType" -> toSQLType(dataType)))
226235

227236
def unsupportedUserDefinedFunctionImplementation(clazz: Class[_]): InvalidPlanInput =
228237
InvalidPlanInput(s"Unsupported UserDefinedFunction implementation: ${clazz}")

sql/connect/server/src/main/scala/org/apache/spark/sql/connect/planner/SparkConnectPlanner.scala

Lines changed: 4 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -773,7 +773,7 @@ class SparkConnectPlanner(
773773
val stateSchema = DataTypeProtoConverter.toCatalystType(rel.getStateSchema) match {
774774
case s: StructType => s
775775
case other =>
776-
throw InvalidInputErrors.invalidStateSchemaDataType(other)
776+
throw InvalidInputErrors.invalidStateSchemaDataTypeForFlatMapGroupsWithState(other)
777777
}
778778
val stateEncoder = TypedScalaUdf.encoderFor(
779779
// the state agnostic encoder is the second element in the input encoders.
@@ -1105,8 +1105,7 @@ class SparkConnectPlanner(
11051105
transformDataType(twsInfo.getOutputSchema) match {
11061106
case s: StructType => s
11071107
case dt =>
1108-
throw InvalidInputErrors.invalidUserDefinedOutputSchemaTypeForTransformWithState(
1109-
dt.typeName)
1108+
throw InvalidInputErrors.invalidOutputSchemaDataTypeForTransformWithStateInPandas(dt)
11101109
}
11111110
}
11121111

@@ -1502,7 +1501,7 @@ class SparkConnectPlanner(
15021501
StructType.fromDDL,
15031502
fallbackParser = DataType.fromJson) match {
15041503
case s: StructType => s
1505-
case other => throw InvalidInputErrors.invalidSchema(other)
1504+
case other => throw InvalidInputErrors.invalidSchemaNonStructType(schema, other)
15061505
}
15071506
}
15081507

@@ -2967,8 +2966,7 @@ class SparkConnectPlanner(
29672966
val returnType = if (udtf.hasReturnType) {
29682967
transformDataType(udtf.getReturnType) match {
29692968
case s: StructType => Some(s)
2970-
case dt =>
2971-
throw InvalidInputErrors.invalidPythonUdtfReturnType(dt.typeName)
2969+
case dt => throw InvalidInputErrors.invalidPythonUdtfReturnDataType(dt)
29722970
}
29732971
} else {
29742972
None
Lines changed: 120 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,120 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
18+
package org.apache.spark.sql.connect.planner
19+
20+
import org.apache.spark.SparkThrowableHelper
21+
import org.apache.spark.connect.proto
22+
import org.apache.spark.sql.catalyst.expressions.AttributeReference
23+
import org.apache.spark.sql.catalyst.plans.PlanTest
24+
import org.apache.spark.sql.connect.common.{DataTypeProtoConverter, InvalidPlanInput}
25+
import org.apache.spark.sql.connect.planner.SparkConnectPlanTest
26+
import org.apache.spark.sql.types._
27+
28+
class InvalidInputErrorsSuite extends PlanTest with SparkConnectPlanTest {
29+
30+
lazy val testLocalRelation =
31+
createLocalRelationProto(
32+
Seq(AttributeReference("id", IntegerType)(), AttributeReference("name", StringType)()),
33+
Seq.empty)
34+
35+
val testCases = Seq(
36+
TestCase(
37+
name = "Invalid schema data type",
38+
expectedErrorCondition = "INVALID_SCHEMA_DATATYPE_NON_STRUCT",
39+
expectedParameters = Map("dataType" -> "\"ARRAY<INT>\""),
40+
invalidInput = {
41+
val parse = proto.Parse
42+
.newBuilder()
43+
.setSchema(DataTypeProtoConverter.toConnectProtoType(ArrayType(IntegerType)))
44+
.setFormat(proto.Parse.ParseFormat.PARSE_FORMAT_CSV)
45+
.build()
46+
47+
proto.Relation.newBuilder().setParse(parse).build()
48+
}),
49+
TestCase(
50+
name = "Invalid schema string non-struct type",
51+
expectedErrorCondition = "INVALID_SCHEMA.NON_STRUCT_TYPE",
52+
expectedParameters = Map(
53+
"inputSchema" -> """"{"type":"array","elementType":"integer","containsNull":false}"""",
54+
"dataType" -> "\"ARRAY<INT>\""),
55+
invalidInput = {
56+
val invalidSchema = """{"type":"array","elementType":"integer","containsNull":false}"""
57+
58+
val dataSource = proto.Read.DataSource
59+
.newBuilder()
60+
.setFormat("csv")
61+
.setSchema(invalidSchema)
62+
.build()
63+
64+
val read = proto.Read
65+
.newBuilder()
66+
.setDataSource(dataSource)
67+
.build()
68+
69+
proto.Relation.newBuilder().setRead(read).build()
70+
}),
71+
TestCase(
72+
name = "Invalid output schema type for TransformWithStateInPandas",
73+
expectedErrorCondition =
74+
"INVALID_OUTPUT_SCHEMA_DATATYPE_FOR_TRANSFORM_WITH_STATE_IN_PANDAS_NON_STRUCT",
75+
expectedParameters = Map("dataType" -> "\"ARRAY<INT>\""),
76+
invalidInput = {
77+
val pythonUdf = proto.CommonInlineUserDefinedFunction
78+
.newBuilder()
79+
.setPythonUdf(
80+
proto.PythonUDF
81+
.newBuilder()
82+
.setEvalType(211)
83+
.setOutputType(DataTypeProtoConverter.toConnectProtoType(ArrayType(IntegerType)))
84+
.build())
85+
.build()
86+
87+
val groupMap = proto.GroupMap
88+
.newBuilder()
89+
.setInput(testLocalRelation)
90+
.setFunc(pythonUdf)
91+
.setTransformWithStateInfo(
92+
proto.TransformWithStateInfo
93+
.newBuilder()
94+
.setOutputSchema(DataTypeProtoConverter.toConnectProtoType(ArrayType(IntegerType)))
95+
.build())
96+
.build()
97+
98+
proto.Relation.newBuilder().setGroupMap(groupMap).build()
99+
}))
100+
101+
// Run all test cases
102+
testCases.foreach { testCase =>
103+
test(s"${testCase.name}") {
104+
val exception = intercept[InvalidPlanInput] {
105+
transform(testCase.invalidInput)
106+
}
107+
val expectedMessage = SparkThrowableHelper.getMessage(
108+
testCase.expectedErrorCondition,
109+
testCase.expectedParameters)
110+
assert(exception.getMessage == expectedMessage)
111+
}
112+
}
113+
114+
// Helper case class to define test cases
115+
case class TestCase(
116+
name: String,
117+
expectedErrorCondition: String,
118+
expectedParameters: Map[String, String],
119+
invalidInput: proto.Relation)
120+
}

0 commit comments

Comments
 (0)