diff --git a/sql/api/src/main/antlr4/org/apache/spark/sql/catalyst/parser/SqlBaseParser.g4 b/sql/api/src/main/antlr4/org/apache/spark/sql/catalyst/parser/SqlBaseParser.g4
index ec23c7346216f..746c9d9386d0a 100644
--- a/sql/api/src/main/antlr4/org/apache/spark/sql/catalyst/parser/SqlBaseParser.g4
+++ b/sql/api/src/main/antlr4/org/apache/spark/sql/catalyst/parser/SqlBaseParser.g4
@@ -1155,7 +1155,7 @@ datetimeUnit
     ;

 primaryExpression
-    : name=(CURRENT_DATE | CURRENT_TIMESTAMP | CURRENT_USER | USER | SESSION_USER) #currentLike
+    : name=(CURRENT_DATE | CURRENT_TIMESTAMP | CURRENT_USER | USER | SESSION_USER | CURRENT_TIME) #currentLike
     | name=(TIMESTAMPADD | DATEADD | DATE_ADD) LEFT_PAREN (unit=datetimeUnit | invalidUnit=stringLit) COMMA unitsAmount=valueExpression COMMA timestamp=valueExpression RIGHT_PAREN #timestampadd
     | name=(TIMESTAMPDIFF | DATEDIFF | DATE_DIFF | TIMEDIFF) LEFT_PAREN (unit=datetimeUnit | invalidUnit=stringLit) COMMA startTimestamp=valueExpression COMMA endTimestamp=valueExpression RIGHT_PAREN #timestampdiff
     | CASE whenClause+ (ELSE elseExpression=expression)? END #searchedCase
diff --git a/sql/api/src/main/scala/org/apache/spark/sql/catalyst/util/SparkDateTimeUtils.scala b/sql/api/src/main/scala/org/apache/spark/sql/catalyst/util/SparkDateTimeUtils.scala
index 73fbebeb98c5a..b16ee9ad1929a 100644
--- a/sql/api/src/main/scala/org/apache/spark/sql/catalyst/util/SparkDateTimeUtils.scala
+++ b/sql/api/src/main/scala/org/apache/spark/sql/catalyst/util/SparkDateTimeUtils.scala
@@ -133,6 +133,45 @@ trait SparkDateTimeUtils {
     }
   }

+  /**
+   * Gets the number of microseconds since midnight in the given time zone.
+   */
+  def instantToMicrosOfDay(instant: Instant, timezone: String): Long = {
+    val zoneId = getZoneId(timezone)
+    val localDateTime = LocalDateTime.ofInstant(instant, zoneId)
+    localDateTime.toLocalTime.getLong(MICRO_OF_DAY)
+  }
+
+  /**
+   * Truncates a time value (in microseconds) to the specified fractional precision `p`.
+   *
+   * For example, if `p = 3`, we keep millisecond resolution and discard any finer digits:
+   * a time such as 12:34:56.123456 (fractional part `123456` microseconds) becomes
+   * 12:34:56.123 (fractional part `123000` microseconds).
+   *
+   * @param micros
+   *   The original time in microseconds.
+   * @param p
+   *   The fractional second precision (range 0 to 6).
+   * @return
+   *   The truncated microsecond value, preserving only `p` fractional digits.
+   */
+  def truncateTimeMicrosToPrecision(micros: Long, p: Int): Long = {
+    assert(
+      p >= TimeType.MIN_PRECISION && p <= TimeType.MICROS_PRECISION,
+      s"Fractional second precision $p out" +
+        s" of range [${TimeType.MIN_PRECISION}..${TimeType.MICROS_PRECISION}].")
+    val scale = TimeType.MICROS_PRECISION - p
+    val factor = math.pow(10, scale).toLong
+    (micros / factor) * factor
+  }
+
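+  // A worked illustration of the arithmetic above (hedged; the values are examples only).
+  // For p = 3, factor = 10^(6 - 3) = 1000, so dividing and re-multiplying drops the
+  // sub-millisecond digits:
+  //   truncateTimeMicrosToPrecision(45296123456L, 3) == 45296123000L // 12:34:56.123456 -> 12:34:56.123
+  //   truncateTimeMicrosToPrecision(45296123456L, 0) == 45296000000L // 12:34:56.123456 -> 12:34:56
+  //   truncateTimeMicrosToPrecision(45296123456L, 6) == 45296123456L // unchanged at full precision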
   /**
    * Converts the timestamp `micros` from one timezone to another.
    *
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/FunctionRegistry.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/FunctionRegistry.scala
index 537469b2af278..7ffeedcf94741 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/FunctionRegistry.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/FunctionRegistry.scala
@@ -627,6 +627,7 @@ object FunctionRegistry {
     expression[CurrentDate]("current_date"),
     expressionBuilder("curdate", CurDateExpressionBuilder, setAlias = true),
     expression[CurrentTimestamp]("current_timestamp"),
+    expression[CurrentTime]("current_time"),
     expression[CurrentTimeZone]("current_timezone"),
     expression[LocalTimestamp]("localtimestamp"),
     expression[DateDiff]("datediff"),
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/LiteralFunctionResolution.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/LiteralFunctionResolution.scala
index c7faf0536b77d..865a780b61dae 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/LiteralFunctionResolution.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/LiteralFunctionResolution.scala
@@ -17,16 +17,7 @@

 package org.apache.spark.sql.catalyst.analysis

-import org.apache.spark.sql.catalyst.expressions.{
-  Alias,
-  CurrentDate,
-  CurrentTimestamp,
-  CurrentUser,
-  Expression,
-  GroupingID,
-  NamedExpression,
-  VirtualColumn
-}
+import org.apache.spark.sql.catalyst.expressions.{Alias, CurrentDate, CurrentTime, CurrentTimestamp, CurrentUser, Expression, GroupingID, NamedExpression, VirtualColumn}
 import org.apache.spark.sql.catalyst.util.toPrettySQL

 /**
@@ -47,10 +38,12 @@ object LiteralFunctionResolution {
     }
   }

-  // support CURRENT_DATE, CURRENT_TIMESTAMP, CURRENT_USER, USER, SESSION_USER and grouping__id
+  // support CURRENT_DATE, CURRENT_TIMESTAMP, CURRENT_TIME,
+  // CURRENT_USER, USER, SESSION_USER and grouping__id
   private val literalFunctions: Seq[(String, () => Expression, Expression => String)] = Seq(
     (CurrentDate().prettyName, () => CurrentDate(), toPrettySQL(_)),
     (CurrentTimestamp().prettyName, () => CurrentTimestamp(), toPrettySQL(_)),
+    (CurrentTime().prettyName, () => CurrentTime(), toPrettySQL(_)),
     (CurrentUser().prettyName, () => CurrentUser(), toPrettySQL),
     ("user", () => CurrentUser(), toPrettySQL),
     ("session_user", () => CurrentUser(), toPrettySQL),
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/timeExpressions.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/timeExpressions.scala
index 8b4fa13fe2766..505b00231ecdd 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/timeExpressions.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/timeExpressions.scala
@@ -19,13 +19,17 @@ package org.apache.spark.sql.catalyst.expressions

 import java.time.DateTimeException

-import org.apache.spark.sql.catalyst.analysis.ExpressionBuilder
+import org.apache.spark.sql.catalyst.analysis.{ExpressionBuilder, TypeCheckResult}
+import org.apache.spark.sql.catalyst.analysis.TypeCheckResult.{DataTypeMismatch, TypeCheckSuccess}
+import org.apache.spark.sql.catalyst.expressions.Cast.{toSQLExpr, toSQLId, toSQLType, toSQLValue}
 import org.apache.spark.sql.catalyst.expressions.objects.{Invoke, StaticInvoke}
+import org.apache.spark.sql.catalyst.trees.TreePattern.{CURRENT_LIKE, TreePattern}
 import org.apache.spark.sql.catalyst.util.DateTimeUtils
 import org.apache.spark.sql.catalyst.util.TimeFormatter
+import org.apache.spark.sql.catalyst.util.TypeUtils.ordinalNumber
 import org.apache.spark.sql.errors.{QueryCompilationErrors, QueryExecutionErrors}
 import org.apache.spark.sql.internal.types.StringTypeWithCollation
-import org.apache.spark.sql.types.{AbstractDataType, IntegerType, ObjectType, TimeType, TypeCollection}
+import org.apache.spark.sql.types.{AbstractDataType, DataType, IntegerType, ObjectType, TimeType, TypeCollection}
 import org.apache.spark.unsafe.types.UTF8String

 /**
@@ -349,3 +353,117 @@ object SecondExpressionBuilder extends ExpressionBuilder {
   }
 }

+/**
+ * Returns the current time at the start of query evaluation.
+ * There is no code generation since this expression should get constant folded by the optimizer.
+ */
+// scalastyle:off line.size.limit
+@ExpressionDescription(
+  usage = """
+    _FUNC_([precision]) - Returns the current time at the start of query evaluation.
+      All calls of current_time within the same query return the same value.
+
+    _FUNC_ - Returns the current time at the start of query evaluation.
+  """,
+  arguments = """
+    Arguments:
+      * precision - An optional integer literal in the range [0..6], indicating how many
+          fractional digits of seconds to include. If omitted, the default is 6.
+  """,
+  examples = """
+    Examples:
+      > SELECT _FUNC_();
+       15:49:11.914120
+      > SELECT _FUNC_;
+       15:49:11.914120
+      > SELECT _FUNC_(0);
+       15:49:11
+      > SELECT _FUNC_(3);
+       15:49:11.914
+      > SELECT _FUNC_(1+1);
+       15:49:11.91
+  """,
+  group = "datetime_funcs",
+  since = "4.1.0")
+// scalastyle:on line.size.limit
+case class CurrentTime(child: Expression = Literal(TimeType.MICROS_PRECISION))
+  extends UnaryExpression with FoldableUnevaluable with ImplicitCastInputTypes {
+
+  def this() = {
+    this(Literal(TimeType.MICROS_PRECISION))
+  }
+
+  final override val nodePatterns: Seq[TreePattern] = Seq(CURRENT_LIKE)
+
+  override def nullable: Boolean = false
+
+  override def checkInputDataTypes(): TypeCheckResult = {
+    // Check that the precision argument is foldable.
+    if (!child.foldable) {
+      return DataTypeMismatch(
+        errorSubClass = "NON_FOLDABLE_INPUT",
+        messageParameters = Map(
+          "inputName" -> toSQLId("precision"),
+          "inputType" -> toSQLType(child.dataType),
+          "inputExpr" -> toSQLExpr(child)
+        )
+      )
+    }
+
+    // Evaluate the precision argument and reject NULL values.
+    val precisionValue = child.eval()
+    if (precisionValue == null) {
+      return DataTypeMismatch(
+        errorSubClass = "UNEXPECTED_NULL",
+        messageParameters = Map("exprName" -> "precision"))
+    }
+
+    // Check that the value is numeric and within the supported range.
+    precisionValue match {
+      case n: Number =>
+        val p = n.intValue()
+        if (p < TimeType.MIN_PRECISION || p > TimeType.MICROS_PRECISION) {
+          return DataTypeMismatch(
+            errorSubClass = "VALUE_OUT_OF_RANGE",
+            messageParameters = Map(
+              "exprName" -> toSQLId("precision"),
+              "valueRange" -> s"[${TimeType.MIN_PRECISION}, ${TimeType.MICROS_PRECISION}]",
+              "currentValue" -> toSQLValue(p, IntegerType)
+            )
+          )
+        }
+      case _ =>
+        return DataTypeMismatch(
+          errorSubClass = "UNEXPECTED_INPUT_TYPE",
+          messageParameters = Map(
+            "paramIndex" -> ordinalNumber(0),
+            "requiredType" -> toSQLType(IntegerType),
+            "inputSql" -> toSQLExpr(child),
+            "inputType" -> toSQLType(child.dataType))
+        )
+    }
+    TypeCheckSuccess
+  }
+
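+  // Illustrative outcomes of the checks above (hedged; assumes the expression is
+  // constructed directly, without the analyzer's implicit casts):
+  //   CurrentTime(Literal(3)).checkInputDataTypes()   // TypeCheckSuccess => TIME(3)
+  //   CurrentTime(Literal(10)).checkInputDataTypes()  // VALUE_OUT_OF_RANGE, valid range [0, 6]
+  //   CurrentTime(Literal(null, IntegerType))         // UNEXPECTED_NULL
+  //   CurrentTime(Literal("2"))                       // UNEXPECTED_INPUT_TYPE
+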
+  // Because checkInputDataTypes ensures that the argument is foldable and valid,
+  // we can evaluate it eagerly here.
+  lazy val precision: Int = child.eval().asInstanceOf[Number].intValue()
+
+  override def dataType: DataType = TimeType(precision)
+
+  override def prettyName: String = "current_time"
+
+  override protected def withNewChildInternal(newChild: Expression): Expression = {
+    copy(child = newChild)
+  }
+
+  override def inputTypes: Seq[AbstractDataType] = Seq(IntegerType)
+}
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/finishAnalysis.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/finishAnalysis.scala
index 0fbfce5962c73..21e09f2e56d19 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/finishAnalysis.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/finishAnalysis.scala
@@ -31,6 +31,7 @@ import org.apache.spark.sql.catalyst.trees.TreePattern._
 import org.apache.spark.sql.catalyst.trees.TreePatternBits
 import org.apache.spark.sql.catalyst.util.DateTimeUtils
 import org.apache.spark.sql.catalyst.util.DateTimeUtils.{convertSpecialDate, convertSpecialTimestamp, convertSpecialTimestampNTZ, instantToMicros, localDateTimeToMicros}
+import org.apache.spark.sql.catalyst.util.SparkDateTimeUtils.{instantToMicrosOfDay, truncateTimeMicrosToPrecision}
 import org.apache.spark.sql.catalyst.util.TypeUtils.toSQLExpr
 import org.apache.spark.sql.connector.catalog.CatalogManager
 import org.apache.spark.sql.types._
@@ -113,6 +114,7 @@ object ComputeCurrentTime extends Rule[LogicalPlan] {
     val instant = Instant.now()
     val currentTimestampMicros = instantToMicros(instant)
     val currentTime = Literal.create(currentTimestampMicros, TimestampType)
+    val currentTimeOfDayMicros = instantToMicrosOfDay(instant, conf.sessionLocalTimeZone)
     val timezone = Literal.create(conf.sessionLocalTimeZone, StringType)
     val currentDates = collection.mutable.HashMap.empty[ZoneId, Literal]
     val localTimestamps = collection.mutable.HashMap.empty[ZoneId, Literal]
@@ -129,6 +131,10 @@ object ComputeCurrentTime extends Rule[LogicalPlan] {
           Literal.create(
             DateTimeUtils.microsToDays(currentTimestampMicros, cd.zoneId), DateType)
         })
+      case currentTimeType: CurrentTime =>
+        val truncatedTime = truncateTimeMicrosToPrecision(currentTimeOfDayMicros,
+          currentTimeType.precision)
+        Literal.create(truncatedTime, TimeType(currentTimeType.precision))
       case CurrentTimestamp() | Now() => currentTime
       case CurrentTimeZone() => timezone
       case localTimestamp: LocalTimestamp =>
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/AstBuilder.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/AstBuilder.scala
index b40075fcccee0..d8ce90f40b899 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/AstBuilder.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/AstBuilder.scala
@@ -2888,12 +2888,14 @@ class AstBuilder extends DataTypeAstBuilder
         CurrentDate()
       case SqlBaseParser.CURRENT_TIMESTAMP =>
         CurrentTimestamp()
+      case SqlBaseParser.CURRENT_TIME =>
+        CurrentTime()
       case SqlBaseParser.CURRENT_USER | SqlBaseParser.USER | SqlBaseParser.SESSION_USER =>
         CurrentUser()
     }
   } else {
     // If the parser is not in ansi mode, we should return `UnresolvedAttribute`, in case there
-    // are columns named `CURRENT_DATE` or `CURRENT_TIMESTAMP`.
+    // are columns named `CURRENT_DATE`, `CURRENT_TIMESTAMP` or `CURRENT_TIME`.
     UnresolvedAttribute.quoted(ctx.name.getText)
   }
 }
diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/AnalysisSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/AnalysisSuite.scala
index eab4ddc666be4..f0dabfd976a7c 100644
--- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/AnalysisSuite.scala
+++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/AnalysisSuite.scala
@@ -819,6 +819,25 @@ class AnalysisSuite extends AnalysisTest with Matchers {
     }
   }

+  test("CURRENT_TIME should be case insensitive") {
+    withSQLConf(SQLConf.CASE_SENSITIVE.key -> "true") {
+      val input = Project(Seq(
+        // The user references "current_time" or "CURRENT_TIME" in the query.
+        UnresolvedAttribute("current_time"),
+        UnresolvedAttribute("CURRENT_TIME")
+      ), testRelation)
+
+      // The analyzer should resolve both to the same expression: CurrentTime().
+      val expected = Project(Seq(
+        Alias(CurrentTime(), toPrettySQL(CurrentTime()))(),
+        Alias(CurrentTime(), toPrettySQL(CurrentTime()))()
+      ), testRelation).analyze
+
+      checkAnalysis(input, expected)
+    }
+  }
+
+
   test("CTE with non-existing column alias") {
     assertAnalysisErrorCondition(parsePlan("WITH t(x) AS (SELECT 1) SELECT * FROM t WHERE y = 1"),
       "UNRESOLVED_COLUMN.WITH_SUGGESTION",
diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/ResolveInlineTablesSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/ResolveInlineTablesSuite.scala
index f231164d5c25a..662c740e2f96c 100644
--- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/ResolveInlineTablesSuite.scala
+++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/ResolveInlineTablesSuite.scala
@@ -21,11 +21,11 @@ import org.scalatest.BeforeAndAfter

 import org.apache.spark.sql.AnalysisException
 import org.apache.spark.sql.catalyst.EvaluateUnresolvedInlineTable
-import org.apache.spark.sql.catalyst.expressions.{Alias, Cast, CurrentTimestamp, Literal, Rand}
+import org.apache.spark.sql.catalyst.expressions.{Alias, Cast, CurrentTime, CurrentTimestamp, Literal, Rand}
 import org.apache.spark.sql.catalyst.expressions.aggregate.Count
 import org.apache.spark.sql.catalyst.optimizer.{ComputeCurrentTime, EvalInlineTables}
 import org.apache.spark.sql.catalyst.plans.logical.LocalRelation
-import org.apache.spark.sql.types.{LongType, NullType, TimestampType}
+import org.apache.spark.sql.types.{LongType, NullType, TimestampType, TimeType}

 /**
  * Unit tests for [[ResolveInlineTables]]. Note that there are also test cases defined in
@@ -113,6 +113,32 @@ class ResolveInlineTablesSuite extends AnalysisTest with BeforeAndAfter {
     }
   }

+  test("cast and execute CURRENT_TIME expressions") {
+    val table = UnresolvedInlineTable(
+      Seq("c1"),
+      Seq(
+        Seq(CurrentTime()),
+        Seq(CurrentTime())
+      )
+    )
+    val resolved = ResolveInlineTables(table)
+    assert(resolved.isInstanceOf[ResolvedInlineTable],
+      "Expected an inline table to be resolved into a ResolvedInlineTable")
+
+    val transformed = ComputeCurrentTime(resolved)
+    EvalInlineTables(transformed) match {
+      case LocalRelation(output, data, _, _) =>
+        // Expect the default precision = 6.
+        assert(output.map(_.dataType) == Seq(TimeType(6)))
+        // Should have 2 rows.
+        assert(data.size == 2)
+        // Both rows should have the *same* microsecond value for current_time.
+        assert(data(0).getLong(0) == data(1).getLong(0),
+          "Both CURRENT_TIME calls must yield the same value in the same query")
+    }
+  }
+
+
   test("convert TimeZoneAwareExpression") {
     val table = UnresolvedInlineTable(Seq("c1"),
       Seq(Seq(Cast(lit("1991-12-06 00:00:00.0"), TimestampType))))
diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/TimeExpressionsSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/TimeExpressionsSuite.scala
index e0aebb46cccec..add505321b8c0 100644
--- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/TimeExpressionsSuite.scala
+++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/TimeExpressionsSuite.scala
@@ -19,8 +19,10 @@ package org.apache.spark.sql.catalyst.expressions

 import org.apache.spark.{SPARK_DOC_ROOT, SparkDateTimeException, SparkFunSuite}
 import org.apache.spark.sql.AnalysisException
+import org.apache.spark.sql.catalyst.analysis.TypeCheckResult.{DataTypeMismatch, TypeCheckSuccess}
+import org.apache.spark.sql.catalyst.expressions.Cast.{toSQLId, toSQLValue}
 import org.apache.spark.sql.catalyst.util.DateTimeTestUtils._
-import org.apache.spark.sql.types.{StringType, TimeType}
+import org.apache.spark.sql.types.{IntegerType, StringType, TimeType}

 class TimeExpressionsSuite extends SparkFunSuite with ExpressionEvalHelper {
   test("ParseToTime") {
@@ -226,4 +228,43 @@ class TimeExpressionsSuite extends SparkFunSuite with ExpressionEvalHelper {
     checkConsistencyBetweenInterpretedAndCodegen(
       (child: Expression) => SecondsOfTime(child).replacement, TimeType())
   }
+
+  test("CurrentTime") {
+    // test valid precision
+    var expr = CurrentTime(Literal(3))
+    assert(expr.dataType == TimeType(3), "Should produce TIME(3) data type")
+    assert(expr.checkInputDataTypes() == TypeCheckSuccess)
+
+    // test default (no-arg) constructor => TIME(6)
+    expr = CurrentTime()
+    assert(expr.precision == 6, "Default precision should be 6")
+    assert(expr.dataType == TimeType(6))
+    assert(expr.checkInputDataTypes() == TypeCheckSuccess)
+
+    // test foldable value
+    expr = CurrentTime(Literal(1 + 1))
+    assert(expr.precision == 2, "Precision should be 2")
+    assert(expr.dataType == TimeType(2))
+    assert(expr.checkInputDataTypes() == TypeCheckSuccess)
+
+    // test out of range precision => checkInputDataTypes fails
+    expr = CurrentTime(Literal(2 + 8))
+    assert(expr.checkInputDataTypes() ==
+      DataTypeMismatch(
+        errorSubClass = "VALUE_OUT_OF_RANGE",
+        messageParameters = Map(
+          "exprName" -> toSQLId("precision"),
-> toSQLId("precision"), + "valueRange" -> s"[${TimeType.MIN_PRECISION}, ${TimeType.MICROS_PRECISION}]", + "currentValue" -> toSQLValue(10, IntegerType) + ) + ) + ) + + // test non number value should fail since we skip analyzer here + expr = CurrentTime(Literal("2")) + val failure = intercept[ClassCastException] { + expr.precision + } + assert(failure.getMessage.contains("cannot be cast to class java.lang.Number")) + } } diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/ComputeCurrentTimeSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/ComputeCurrentTimeSuite.scala index 6e1c7fc887d4e..3fa6459a93e24 100644 --- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/ComputeCurrentTimeSuite.scala +++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/ComputeCurrentTimeSuite.scala @@ -24,12 +24,13 @@ import scala.concurrent.duration._ import scala.jdk.CollectionConverters.MapHasAsScala import org.apache.spark.sql.catalyst.dsl.plans._ -import org.apache.spark.sql.catalyst.expressions.{Alias, CurrentDate, CurrentTimestamp, CurrentTimeZone, Expression, InSubquery, ListQuery, Literal, LocalTimestamp, Now} +import org.apache.spark.sql.catalyst.expressions.{Add, Alias, Cast, CurrentDate, CurrentTime, CurrentTimestamp, CurrentTimeZone, Expression, InSubquery, ListQuery, Literal, LocalTimestamp, Now} import org.apache.spark.sql.catalyst.plans.PlanTest import org.apache.spark.sql.catalyst.plans.logical.{Filter, LocalRelation, LogicalPlan, Project} import org.apache.spark.sql.catalyst.rules.RuleExecutor import org.apache.spark.sql.catalyst.util.DateTimeUtils import org.apache.spark.sql.internal.SQLConf +import org.apache.spark.sql.types.IntegerType import org.apache.spark.unsafe.types.UTF8String class ComputeCurrentTimeSuite extends PlanTest { @@ -52,6 +53,83 @@ class ComputeCurrentTimeSuite extends PlanTest { assert(lits(0) == lits(1)) } + test("analyzer should replace current_time with literals") { + // logical plan that calls current_time() twice in the Project + val planInput = Project( + Seq( + Alias(CurrentTime(Literal(3)), "a")(), + Alias(CurrentTime(Literal(3)), "b")() + ), + LocalRelation() + ) + + val analyzed = planInput.analyze + val optimized = Optimize.execute(analyzed).asInstanceOf[Project] + + // We expect 2 literals in the final Project. Each literal is a Long + // representing microseconds since midnight, truncated to precision=3. + val lits = literals[Long](optimized) // a helper that extracts all Literal values of type Long + assert(lits.size == 2, s"Expected two literal values, found ${lits.size}") + + // The rule should produce the same microsecond value for both columns "a" and "b". + assert(lits(0) == lits(1), + s"Expected both current_time(3) calls to yield the same literal, " + + s"but got ${lits(0)} vs ${lits(1)}") + } + + test("analyzer should replace current_time with foldable child expressions") { + // We build a plan that calls current_time(2 + 1) twice + val foldableExpr = Add(Literal(2), Literal(1)) // a foldable arithmetic expression => 3 + val planInput = Project( + Seq( + Alias(CurrentTime(foldableExpr), "a")(), + Alias(CurrentTime(foldableExpr), "b")() + ), + LocalRelation() + ) + + val analyzed = planInput.analyze + val optimized = Optimize.execute(analyzed).asInstanceOf[Project] + + // We expect the optimizer to replace current_time(2 + 1) with a literal time value, + // so let's extract those literal values. 
+    val lits = literals[Long](optimized)
+    assert(lits.size == 2, s"Expected two literal values, found ${lits.size}")
+
+    // Both references to current_time(2 + 1) should be replaced by the same microsecond-of-day.
+    assert(lits(0) == lits(1),
+      s"Expected both current_time(2 + 1) calls to yield the same literal, " +
+        s"but got ${lits(0)} vs. ${lits(1)}"
+    )
+  }
+
+  test("analyzer should replace current_time with foldable casted string-literal") {
+    // Build a foldable cast expression: CAST(' 0005 ' AS INT) => 5.
+    val castExpr = Cast(Literal(" 0005 "), IntegerType)
+
+    // Two references to current_time(castExpr), so we can check they are replaced consistently.
+    val planInput = Project(
+      Seq(
+        Alias(CurrentTime(castExpr), "a")(),
+        Alias(CurrentTime(castExpr), "b")()
+      ),
+      LocalRelation()
+    )
+
+    val analyzed = planInput.analyze
+    val optimized = Optimize.execute(analyzed).asInstanceOf[Project]
+
+    val lits = literals[Long](optimized)
+    assert(lits.size == 2, s"Expected two literal values, found ${lits.size}")
+
+    // Both references to current_time(CAST(' 0005 ' AS INT)) in the same query
+    // should produce the same microsecond-of-day literal.
+    assert(lits(0) == lits(1),
+      s"Expected both references to yield the same literal, but got ${lits(0)} vs. ${lits(1)}"
+    )
+  }
+
+
   test("analyzer should respect time flow in current timestamp calls") {
     val in = Project(Alias(CurrentTimestamp(), "t1")() :: Nil, LocalRelation())

@@ -65,6 +143,22 @@ class ComputeCurrentTimeSuite extends PlanTest {
     assert(t2 - t1 <= 1000 && t2 - t1 > 0)
   }

+  test("analyzer should respect time flow in current_time calls") {
+    val in = Project(Alias(CurrentTime(Literal(4)), "t1")() :: Nil, LocalRelation())
+
+    val planT1 = Optimize.execute(in.analyze).asInstanceOf[Project]
+    sleep(5)
+    val planT2 = Optimize.execute(in.analyze).asInstanceOf[Project]
+
+    val t1 = literals[Long](planT1)(0) // the microseconds-of-day for planT1
+    val t2 = literals[Long](planT2)(0) // the microseconds-of-day for planT2
+
+    // Note: microseconds-of-day reset at midnight, so this assertion assumes both
+    // plans are computed within the same calendar day in the session time zone.
+    assert(t2 > t1, s"Expected a newer time in the second analysis, but got t1=$t1, t2=$t2")
+  }
+
+
   test("analyzer should replace current_date with literals") {
     val in = Project(Seq(Alias(CurrentDate(), "a")(), Alias(CurrentDate(), "b")()), LocalRelation())
diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/parser/ExpressionParserSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/parser/ExpressionParserSuite.scala
index 2cae4e4fe95ff..17b03946251a2 100644
--- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/parser/ExpressionParserSuite.scala
+++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/parser/ExpressionParserSuite.scala
@@ -1194,16 +1194,18 @@
     }
   }

-  test("current date/timestamp braceless expressions") {
+  test("current date/timestamp/time braceless expressions") {
     withSQLConf(SQLConf.ANSI_ENABLED.key -> "true",
       SQLConf.ENFORCE_RESERVED_KEYWORDS.key -> "true") {
       assertEqual("current_date", CurrentDate())
       assertEqual("current_timestamp", CurrentTimestamp())
+      assertEqual("current_time", CurrentTime())
     }

     def testNonAnsiBehavior(): Unit = {
       assertEqual("current_date", UnresolvedAttribute.quoted("current_date"))
       assertEqual("current_timestamp", UnresolvedAttribute.quoted("current_timestamp"))
+      assertEqual("current_time", UnresolvedAttribute.quoted("current_time"))
     }
     withSQLConf(
       SQLConf.ANSI_ENABLED.key -> "false",
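To make the two parser behaviors above concrete, here is a hedged sketch in the suite's own style (reusing its `withSQLConf`/`assertEqual` helpers; illustrative only, not part of the patch):

    // With ANSI mode and enforced reserved keywords, the bare word parses as the function:
    withSQLConf(SQLConf.ANSI_ENABLED.key -> "true",
        SQLConf.ENFORCE_RESERVED_KEYWORDS.key -> "true") {
      assertEqual("current_time", CurrentTime()) // equivalent to CURRENT_TIME(6)
    }
    // Otherwise it stays a plain identifier, so an existing column that happens to be
    // named `current_time` keeps resolving as before:
    withSQLConf(SQLConf.ANSI_ENABLED.key -> "false") {
      assertEqual("current_time", UnresolvedAttribute.quoted("current_time"))
    }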
diff --git a/sql/core/src/test/resources/sql-functions/sql-expression-schema.md b/sql/core/src/test/resources/sql-functions/sql-expression-schema.md
index 2a3f28fe496cc..f2301c4526b83 100644
--- a/sql/core/src/test/resources/sql-functions/sql-expression-schema.md
+++ b/sql/core/src/test/resources/sql-functions/sql-expression-schema.md
@@ -104,6 +104,7 @@
 | org.apache.spark.sql.catalyst.expressions.CurrentDatabase | current_database | SELECT current_database() | struct |
 | org.apache.spark.sql.catalyst.expressions.CurrentDatabase | current_schema | SELECT current_schema() | struct |
 | org.apache.spark.sql.catalyst.expressions.CurrentDate | current_date | SELECT current_date() | struct |
+| org.apache.spark.sql.catalyst.expressions.CurrentTime | current_time | SELECT current_time() | struct |
 | org.apache.spark.sql.catalyst.expressions.CurrentTimeZone | current_timezone | SELECT current_timezone() | struct |
 | org.apache.spark.sql.catalyst.expressions.CurrentTimestamp | current_timestamp | SELECT current_timestamp() | struct |
 | org.apache.spark.sql.catalyst.expressions.CurrentUser | current_user | SELECT current_user() | struct |
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/SQLQueryTestHelper.scala b/sql/core/src/test/scala/org/apache/spark/sql/SQLQueryTestHelper.scala
index e807ae306ce76..cd54161f54acc 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/SQLQueryTestHelper.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/SQLQueryTestHelper.scala
@@ -27,7 +27,7 @@ import org.apache.spark.ErrorMessageFormat.MINIMAL
 import org.apache.spark.SparkThrowableHelper.getMessage
 import org.apache.spark.internal.Logging
 import org.apache.spark.sql.IntegratedUDFTestUtils.{TestUDF, TestUDTFSet}
-import org.apache.spark.sql.catalyst.expressions.{CurrentDate, CurrentTimestampLike, CurrentUser, Literal}
+import org.apache.spark.sql.catalyst.expressions.{CurrentDate, CurrentTime, CurrentTimestampLike, CurrentUser, Literal}
 import org.apache.spark.sql.catalyst.planning.PhysicalOperation
 import org.apache.spark.sql.catalyst.plans.logical._
 import org.apache.spark.sql.catalyst.util.fileToString
@@ -101,6 +101,9 @@ trait SQLQueryTestHelper extends Logging {
       case expr: CurrentTimestampLike =>
         deterministic = false
         expr
+      case expr: CurrentTime =>
+        deterministic = false
+        expr
       case expr: CurrentUser =>
         deterministic = false
         expr
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/expressions/ExpressionInfoSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/expressions/ExpressionInfoSuite.scala
index a7af22a0554e9..e90907b904bd5 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/expressions/ExpressionInfoSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/expressions/ExpressionInfoSuite.scala
@@ -209,6 +209,7 @@ class ExpressionInfoSuite extends SparkFunSuite with SharedSparkSession {
       "org.apache.spark.sql.catalyst.expressions.CurrentTimeZone",
       "org.apache.spark.sql.catalyst.expressions.Now",
       "org.apache.spark.sql.catalyst.expressions.LocalTimestamp",
+      "org.apache.spark.sql.catalyst.expressions.CurrentTime",
       // Random output without a seed
       "org.apache.spark.sql.catalyst.expressions.Rand",
       "org.apache.spark.sql.catalyst.expressions.Randn",