[pull] master from apache:master #441

Merged
merged 6 commits on Mar 12, 2025
16 changes: 8 additions & 8 deletions python/pyspark/errors/error-conditions.json
@@ -159,8 +159,8 @@
"Calling property or member '<member>' is not supported in PySpark Classic, please use Spark Connect instead."
]
},
"COLLATION_INVALID_PROVIDER" : {
"message" : [
"COLLATION_INVALID_PROVIDER": {
"message": [
"The value <provider> does not represent a correct collation provider. Supported providers are: [<supportedProviders>]."
]
},
@@ -372,8 +372,8 @@
"All items in `<arg_name>` should be in <allowed_types>, got <item_type>."
]
},
"INVALID_JSON_DATA_TYPE_FOR_COLLATIONS" : {
"message" : [
"INVALID_JSON_DATA_TYPE_FOR_COLLATIONS": {
"message": [
"Collations can only be applied to string types, but the JSON data type is <jsonType>."
]
},
@@ -502,8 +502,8 @@
"<arg1> and <arg2> should be of the same length, got <arg1_length> and <arg2_length>."
]
},
"MALFORMED_VARIANT" : {
"message" : [
"MALFORMED_VARIANT": {
"message": [
"Variant binary is malformed. Please check the data source is valid."
]
},
@@ -517,7 +517,7 @@
"A master URL must be set in your configuration."
]
},
"MEMORY_PROFILE_INVALID_SOURCE":{
"MEMORY_PROFILE_INVALID_SOURCE": {
"message": [
"Memory profiler can only be used on editors with line numbers."
]
@@ -812,7 +812,7 @@
"<package_name> >= <minimum_version> must be installed; however, it was not found."
]
},
"PANDAS_UDF_OUTPUT_EXCEEDS_INPUT_ROWS" : {
"PANDAS_UDF_OUTPUT_EXCEEDS_INPUT_ROWS": {
"message": [
"The Pandas SCALAR_ITER UDF outputs more rows than input rows."
]
1 change: 1 addition & 0 deletions python/pyspark/errors/exceptions/__init__.py
@@ -30,3 +30,4 @@ def _write_self() -> None:
sort_keys=True,
indent=2,
)
f.write("\n")
2 changes: 1 addition & 1 deletion python/pyspark/sql/pandas/functions.py
@@ -366,7 +366,7 @@ def calculate(iterator: Iterator[pd.Series]) -> Iterator[pd.Series]:
# Note: DDL formatted string is used for 'SQL Type' for simplicity. This string can be
# used in `returnType`.
# Note: The values inside of the table are generated by `repr`.
# Note: Python 3.9.5, Pandas 1.4.0 and PyArrow 6.0.1 are used.
# Note: Python 3.11.9, Pandas 2.2.3 and PyArrow 17.0.0 are used.
# Note: Timezone is KST.
# Note: 'X' means it throws an exception during the conversion.
require_minimum_pandas_version()
@@ -119,6 +119,7 @@ object Cast extends QueryErrorsBase {
case (_: YearMonthIntervalType, _: YearMonthIntervalType) => true

case (_: StringType, DateType) => true
case (_: StringType, _: TimeType) => true
case (TimestampType, DateType) => true
case (TimestampNTZType, DateType) => true

@@ -219,6 +220,7 @@ object Cast extends QueryErrorsBase {
case (TimestampType, TimestampNTZType) => true

case (_: StringType, DateType) => true
case (_: StringType, _: TimeType) => true
case (TimestampType, DateType) => true
case (TimestampNTZType, DateType) => true

@@ -727,6 +729,15 @@ case class Cast(
buildCast[Long](_, t => microsToDays(t, ZoneOffset.UTC))
}

private[this] def castToTime(from: DataType): Any => Any = from match {
case _: StringType =>
if (ansiEnabled) {
buildCast[UTF8String](_, s => DateTimeUtils.stringToTimeAnsi(s, getContextOrNull()))
} else {
buildCast[UTF8String](_, s => DateTimeUtils.stringToTime(s).orNull)
}
}

// IntervalConverter
private[this] def castToInterval(from: DataType): Any => Any = from match {
case _: StringType =>
@@ -1134,6 +1145,7 @@ case class Cast(
case s: StringType => castToString(from, s.constraint)
case BinaryType => castToBinary(from)
case DateType => castToDate(from)
case _: TimeType => castToTime(from)
case decimal: DecimalType => castToDecimal(from, decimal)
case TimestampType => castToTimestamp(from)
case TimestampNTZType => castToTimestampNTZ(from)
@@ -1241,6 +1253,7 @@ case class Cast(
(c, evPrim, _) => castToStringCode(from, ctx, s.constraint).apply(c, evPrim)
case BinaryType => castToBinaryCode(from)
case DateType => castToDateCode(from, ctx)
case _: TimeType => castToTimeCode(from, ctx)
case decimal: DecimalType => castToDecimalCode(from, decimal, ctx)
case TimestampType => castToTimestampCode(from, ctx)
case TimestampNTZType => castToTimestampNTZCode(from, ctx)
@@ -1313,8 +1326,7 @@ case class Cast(
"""
} else {
code"""
scala.Option<Integer> $intOpt =
org.apache.spark.sql.catalyst.util.DateTimeUtils.stringToDate($c);
scala.Option<Integer> $intOpt = $dateTimeUtilsCls.stringToDate($c);
if ($intOpt.isDefined()) {
$evPrim = ((Integer) $intOpt.get()).intValue();
} else {
@@ -1327,8 +1339,7 @@
val zidClass = classOf[ZoneId]
val zid = JavaCode.global(ctx.addReferenceObj("zoneId", zoneId, zidClass.getName), zidClass)
(c, evPrim, evNull) =>
code"""$evPrim =
org.apache.spark.sql.catalyst.util.DateTimeUtils.microsToDays($c, $zid);"""
code"""$evPrim = $dateTimeUtilsCls.microsToDays($c, $zid);"""
case TimestampNTZType =>
(c, evPrim, evNull) =>
code"$evPrim = $dateTimeUtilsCls.microsToDays($c, java.time.ZoneOffset.UTC);"
@@ -1337,6 +1348,34 @@
}
}

private[this] def castToTimeCode(
from: DataType,
ctx: CodegenContext): CastFunction = {
from match {
case _: StringType =>
val longOpt = ctx.freshVariable("longOpt", classOf[Option[Long]])
(c, evPrim, evNull) =>
if (ansiEnabled) {
val errorContext = getContextOrNullCode(ctx)
code"""
$evPrim = $dateTimeUtilsCls.stringToTimeAnsi($c, $errorContext);
"""
} else {
code"""
scala.Option<Long> $longOpt = $dateTimeUtilsCls.stringToTime($c);
if ($longOpt.isDefined()) {
$evPrim = ((Long) $longOpt.get()).longValue();
} else {
$evNull = true;
}
"""
}

case _ =>
(_, _, evNull) => code"$evNull = true;"
}
}

private[this] def changePrecision(
d: ExprValue,
decimalType: DecimalType,
@@ -1481,8 +1520,7 @@ case class Cast(
"""
} else {
code"""
scala.Option<Long> $longOpt =
org.apache.spark.sql.catalyst.util.DateTimeUtils.stringToTimestamp($c, $zid);
scala.Option<Long> $longOpt = $dateTimeUtilsCls.stringToTimestamp($c, $zid);
if ($longOpt.isDefined()) {
$evPrim = ((Long) $longOpt.get()).longValue();
} else {
@@ -1500,8 +1538,7 @@
ctx.addReferenceObj("zoneId", zoneId, zoneIdClass.getName),
zoneIdClass)
(c, evPrim, evNull) =>
code"""$evPrim =
org.apache.spark.sql.catalyst.util.DateTimeUtils.daysToMicros($c, $zid);"""
code"""$evPrim = $dateTimeUtilsCls.daysToMicros($c, $zid);"""
case TimestampNTZType =>
val zoneIdClass = classOf[ZoneId]
val zid = JavaCode.global(
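The Cast changes above wire string-to-TIME conversion into both the interpreted path (castToTime) and the codegen path (castToTimeCode). A minimal usage sketch of the resulting behavior follows, assuming a Spark build that includes this change, an active SparkSession named `spark`, and the TIME(n) SQL syntax exercised in the test suites below:

// ANSI off: an unparsable string casts to NULL (see CastWithAnsiOffSuite below).
spark.conf.set("spark.sql.ansi.enabled", "false")
spark.sql("SELECT CAST(' 12:13:14.999 ' AS TIME(3))").show()  // expected: 12:13:14.999
spark.sql("SELECT CAST('24:00:00' AS TIME)").show()           // expected: NULL

// ANSI on: the same invalid input raises a DateTimeException (see CastWithAnsiOnSuite below).
spark.conf.set("spark.sql.ansi.enabled", "true")
spark.sql("SELECT CAST('24:00:00' AS TIME)").show()           // expected: DateTimeException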
@@ -1469,4 +1469,16 @@ abstract class CastSuiteBase extends SparkFunSuite with ExpressionEvalHelper {
checkEvaluation(Cast(Literal(time), StringType), expectedStr)
}
}

test("cast string to time") {
checkEvaluation(cast(Literal.create("0:0:0"), TimeType()), 0L)
checkEvaluation(cast(Literal.create(" 01:2:3.01 "), TimeType(2)), localTime(1, 2, 3, 10000))
checkEvaluation(cast(Literal.create(" 12:13:14.999"),
TimeType(3)), localTime(12, 13, 14, 999 * 1000))
checkEvaluation(cast(Literal.create("23:0:59.0001 "), TimeType(4)), localTime(23, 0, 59, 100))
checkEvaluation(cast(Literal.create("23:59:0.99999"),
TimeType(5)), localTime(23, 59, 0, 999990))
checkEvaluation(cast(Literal.create("23:59:59.000001 "),
TimeType(6)), localTime(23, 59, 59, 1))
}
}
@@ -901,4 +901,10 @@ class CastWithAnsiOffSuite extends CastSuiteBase {
castOverflowErrMsg(toType))
}
}

test("cast invalid string input to time") {
Seq("a", "123", "00:00:00ABC", "24:00:00").foreach { invalidInput =>
checkEvaluation(cast(invalidInput, TimeType()), null)
}
}
}
@@ -737,4 +737,12 @@ class CastWithAnsiOnSuite extends CastSuiteBase with QueryErrorsBase {
val input = Literal.create(Decimal(0.000000123), DecimalType(9, 9))
checkEvaluation(cast(input, StringType), "0.000000123")
}

test("cast invalid string input to time") {
Seq("a", "123", "00:00:00ABC", "24:00:00").foreach { invalidInput =>
checkExceptionInExpression[DateTimeException](
cast(invalidInput, TimeType()),
castErrMsg(invalidInput, TimeType()))
}
}
}
@@ -533,6 +533,10 @@ case class ApplyColumnarRulesAndInsertTransitions(
case write: DataWritingCommandExec
if write.cmd.isInstanceOf[V1WriteCommand] && conf.plannedWriteEnabled =>
write.child.supportsColumnar
// If columnar output is not required here (`outputsColumnar` is false) but the plan
// supports both row-based and columnar execution, its child nodes do not need to
// produce row-based data, so we set `outputsColumnar` to true.
case _ if plan.supportsColumnar && plan.supportsRowBased => true
case _ =>
false
}
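The new case above short-circuits the transition insertion: when a node can consume both row-based and columnar input, the rule no longer forces a ColumnarToRowExec between it and a columnar child. A minimal sketch of the effect, reusing the illustrative operators defined in ColumnarRulesSuite below and assuming the rule accepts an empty list of custom columnar rules:

val rule = ApplyColumnarRulesAndInsertTransitions(Seq.empty, false)
val plan = CanDoColumnarAndRowOp(UnaryOp(LeafOp(true), true))
// Before SPARK-51474 the rule wrapped the columnar subtree in a ColumnarToRowExec;
// with this change the plan comes back unchanged.
assert(rule.apply(plan) == plan)

This mirrors the regression test added in ColumnarRulesSuite below.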
@@ -23,6 +23,7 @@ import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.expressions.Attribute
import org.apache.spark.sql.catalyst.plans.PlanTest
import org.apache.spark.sql.test.SharedSparkSession
import org.apache.spark.sql.vectorized.ColumnarBatch

class ColumnarRulesSuite extends PlanTest with SharedSparkSession {

@@ -51,6 +52,15 @@ class ColumnarRulesSuite extends PlanTest with SharedSparkSession {
val appliedTwice = rules.apply(appliedOnce)
assert(appliedTwice == expected)
}

test("SPARK-51474: Don't insert redundant ColumnarToRowExec") {
val rules = ApplyColumnarRulesAndInsertTransitions(
spark.sessionState.columnarRules, false)

val plan = CanDoColumnarAndRowOp(UnaryOp(LeafOp(true), true))
val appliedOnce = rules.apply(plan)
assert(appliedOnce == plan)
}
}

case class LeafOp(override val supportsColumnar: Boolean) extends LeafExecNode {
@@ -63,3 +73,15 @@ case class UnaryOp(child: SparkPlan, override val supportsColumnar: Boolean) ext
override def output: Seq[Attribute] = child.output
override protected def withNewChildInternal(newChild: SparkPlan): UnaryOp = copy(child = newChild)
}

case class CanDoColumnarAndRowOp(child: SparkPlan) extends UnaryExecNode {
override val supportsRowBased: Boolean = true
override val supportsColumnar: Boolean = true

override protected def doExecute(): RDD[InternalRow] = throw SparkUnsupportedOperationException()
override protected def doExecuteColumnar(): RDD[ColumnarBatch] =
throw SparkUnsupportedOperationException()
override def output: Seq[Attribute] = child.output
override protected def withNewChildInternal(newChild: SparkPlan): CanDoColumnarAndRowOp =
copy(child = newChild)
}