@@ -1155,7 +1155,7 @@ datetimeUnit
;

primaryExpression
-: name=(CURRENT_DATE | CURRENT_TIMESTAMP | CURRENT_USER | USER | SESSION_USER) #currentLike
+: name=(CURRENT_DATE | CURRENT_TIMESTAMP | CURRENT_USER | USER | SESSION_USER | CURRENT_TIME) #currentLike
| name=(TIMESTAMPADD | DATEADD | DATE_ADD) LEFT_PAREN (unit=datetimeUnit | invalidUnit=stringLit) COMMA unitsAmount=valueExpression COMMA timestamp=valueExpression RIGHT_PAREN #timestampadd
| name=(TIMESTAMPDIFF | DATEDIFF | DATE_DIFF | TIMEDIFF) LEFT_PAREN (unit=datetimeUnit | invalidUnit=stringLit) COMMA startTimestamp=valueExpression COMMA endTimestamp=valueExpression RIGHT_PAREN #timestampdiff
| CASE whenClause+ (ELSE elseExpression=expression)? END #searchedCase
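
With this change, the bare keyword `CURRENT_TIME` parses through the `currentLike` alternative, on a par with `CURRENT_DATE` and `CURRENT_TIMESTAMP`. The parenthesized forms `CURRENT_TIME()` and `CURRENT_TIME(n)` still go through the ordinary function-call path and resolve via the function registry entry added below.
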
@@ -133,6 +133,39 @@ trait SparkDateTimeUtils {
}
}

/**
* Returns the number of microseconds since midnight for `instant` in the given time zone.
*/
def instantToMicrosOfDay(instant: Instant, timezone: String): Long = {
val zoneId = getZoneId(timezone)
val localDateTime = LocalDateTime.ofInstant(instant, zoneId)
localDateTime.toLocalTime.getLong(MICRO_OF_DAY)
}
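// Worked example (illustrative, not part of the patch):
//   instantToMicrosOfDay(Instant.parse("2025-01-01T15:49:11.914120Z"), "UTC")
//   returns 56951914120L, i.e. 15:49:11.914120 expressed as micros since midnight.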

/**
* Truncates a time value (in microseconds) to the specified fractional precision `p`.
*
* For example, if `p = 3`, we keep millisecond resolution and discard anything finer.
* So a time with fractional part `123456` microseconds (12:34:56.123456) becomes
* `123000` microseconds (12:34:56.123).
*
* @param micros
* The original time in microseconds.
* @param p
* The fractional second precision (range 0 to 6).
* @return
* The truncated microsecond value, preserving only `p` fractional digits.
*/
def truncateTimeMicrosToPrecision(micros: Long, p: Int): Long = {
assert(
p >= TimeType.MIN_PRECISION && p <= TimeType.MICROS_PRECISION,
s"Fractional second precision $p out" +
s" of range [${TimeType.MIN_PRECISION}..${TimeType.MICROS_PRECISION}].")
val scale = TimeType.MICROS_PRECISION - p
val factor = math.pow(10, scale).toLong
(micros / factor) * factor
}
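// Worked example (illustrative, not part of the patch): 12:34:56.123456 is
// 45296123456L microseconds of day, so:
//   truncateTimeMicrosToPrecision(45296123456L, 3) == 45296123000L  // 12:34:56.123
//   truncateTimeMicrosToPrecision(45296123456L, 0) == 45296000000L  // 12:34:56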

/**
* Converts the timestamp `micros` from one timezone to another.
*
@@ -627,6 +627,7 @@ object FunctionRegistry {
expression[CurrentDate]("current_date"),
expressionBuilder("curdate", CurDateExpressionBuilder, setAlias = true),
expression[CurrentTimestamp]("current_timestamp"),
expression[CurrentTime]("current_time"),
expression[CurrentTimeZone]("current_timezone"),
expression[LocalTimestamp]("localtimestamp"),
expression[DateDiff]("datediff"),
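
Registering the expression under the name `current_time` makes the explicit call forms `current_time()` and `current_time(n)` resolvable; the parenthesis-free keyword form is handled by the grammar and literal-function changes elsewhere in this diff.
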
@@ -17,16 +17,7 @@

package org.apache.spark.sql.catalyst.analysis

-import org.apache.spark.sql.catalyst.expressions.{
-  Alias,
-  CurrentDate,
-  CurrentTimestamp,
-  CurrentUser,
-  Expression,
-  GroupingID,
-  NamedExpression,
-  VirtualColumn
-}
+import org.apache.spark.sql.catalyst.expressions.{Alias, CurrentDate, CurrentTime, CurrentTimestamp, CurrentUser, Expression, GroupingID, NamedExpression, VirtualColumn}
import org.apache.spark.sql.catalyst.util.toPrettySQL

/**
@@ -47,10 +38,12 @@ object LiteralFunctionResolution {
}
}

-// support CURRENT_DATE, CURRENT_TIMESTAMP, CURRENT_USER, USER, SESSION_USER and grouping__id
+// support CURRENT_DATE, CURRENT_TIMESTAMP, CURRENT_TIME,
+// CURRENT_USER, USER, SESSION_USER and grouping__id
private val literalFunctions: Seq[(String, () => Expression, Expression => String)] = Seq(
(CurrentDate().prettyName, () => CurrentDate(), toPrettySQL(_)),
(CurrentTimestamp().prettyName, () => CurrentTimestamp(), toPrettySQL(_)),
(CurrentTime().prettyName, () => CurrentTime(), toPrettySQL(_)),
(CurrentUser().prettyName, () => CurrentUser(), toPrettySQL),
("user", () => CurrentUser(), toPrettySQL),
("session_user", () => CurrentUser(), toPrettySQL),
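
For context, a minimal sketch of the lookup this table enables (the helper below is hypothetical; the real resolution logic lives in LiteralFunctionResolution itself):

// Hypothetical sketch, not part of the patch: how a bare identifier such as
// CURRENT_TIME could be matched against the literal-function table.
def lookupLiteralFunction(name: String): Option[Expression] =
  literalFunctions
    .find { case (fnName, _, _) => fnName.equalsIgnoreCase(name) }
    .map { case (_, builder, _) => builder() }
// lookupLiteralFunction("CURRENT_TIME") would yield Some(CurrentTime()).
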
@@ -19,13 +19,17 @@ package org.apache.spark.sql.catalyst.expressions

import java.time.DateTimeException

-import org.apache.spark.sql.catalyst.analysis.ExpressionBuilder
+import org.apache.spark.sql.catalyst.analysis.{ExpressionBuilder, TypeCheckResult}
import org.apache.spark.sql.catalyst.analysis.TypeCheckResult.{DataTypeMismatch, TypeCheckSuccess}
import org.apache.spark.sql.catalyst.expressions.Cast.{toSQLExpr, toSQLId, toSQLType, toSQLValue}
import org.apache.spark.sql.catalyst.expressions.objects.{Invoke, StaticInvoke}
import org.apache.spark.sql.catalyst.trees.TreePattern.{CURRENT_LIKE, TreePattern}
import org.apache.spark.sql.catalyst.util.DateTimeUtils
import org.apache.spark.sql.catalyst.util.TimeFormatter
import org.apache.spark.sql.catalyst.util.TypeUtils.ordinalNumber
import org.apache.spark.sql.errors.{QueryCompilationErrors, QueryExecutionErrors}
import org.apache.spark.sql.internal.types.StringTypeWithCollation
-import org.apache.spark.sql.types.{AbstractDataType, IntegerType, ObjectType, TimeType, TypeCollection}
+import org.apache.spark.sql.types.{AbstractDataType, DataType, IntegerType, ObjectType, TimeType, TypeCollection}
import org.apache.spark.unsafe.types.UTF8String

/**
@@ -349,3 +353,109 @@ object SecondExpressionBuilder extends ExpressionBuilder {
}
}

/**
* Returns the current time at the start of query evaluation.
* There is no code generation since this expression should get constant folded by the optimizer.
*/
// scalastyle:off line.size.limit
@ExpressionDescription(
usage = """
_FUNC_([precision]) - Returns the current time at the start of query evaluation.
All calls of current_time within the same query return the same value.

_FUNC_ - Returns the current time at the start of query evaluation.
""",
arguments = """
Arguments:
* precision - An optional integer literal in the range [0..6], indicating how many
fractional digits of seconds to include. If omitted, the default is 6.
""",
examples = """
Examples:
> SELECT _FUNC_();
15:49:11.914120
> SELECT _FUNC_;
15:49:11.914120
> SELECT _FUNC_(0);
15:49:11
> SELECT _FUNC_(3);
15:49:11.914
> SELECT _FUNC_(1+1);
15:49:11.91
""",
group = "datetime_funcs",
since = "4.1.0"
)
// scalastyle:on line.size.limit
case class CurrentTime(child: Expression = Literal(TimeType.MICROS_PRECISION))
extends UnaryExpression with FoldableUnevaluable with ImplicitCastInputTypes {

def this() = {
this(Literal(TimeType.MICROS_PRECISION))
}

final override val nodePatterns: Seq[TreePattern] = Seq(CURRENT_LIKE)

override def nullable: Boolean = false

override def checkInputDataTypes(): TypeCheckResult = {
// Check foldability
if (!child.foldable) {
return DataTypeMismatch(
errorSubClass = "NON_FOLDABLE_INPUT",
messageParameters = Map(
"inputName" -> toSQLId("precision"),
"inputType" -> toSQLType(child.dataType),
"inputExpr" -> toSQLExpr(child)
)
)
}

// Evaluate
val precisionValue = child.eval()
if (precisionValue == null) {
return DataTypeMismatch(
errorSubClass = "UNEXPECTED_NULL",
messageParameters = Map("exprName" -> "precision"))
}

// Check numeric range
precisionValue match {
case n: Number =>
val p = n.intValue()
if (p < TimeType.MIN_PRECISION || p > TimeType.MICROS_PRECISION) {
return DataTypeMismatch(
errorSubClass = "VALUE_OUT_OF_RANGE",
messageParameters = Map(
"exprName" -> toSQLId("precision"),
"valueRange" -> s"[${TimeType.MIN_PRECISION}, ${TimeType.MICROS_PRECISION}]",
"currentValue" -> toSQLValue(p, IntegerType)
)
)
}
case _ =>
return DataTypeMismatch(
errorSubClass = "UNEXPECTED_INPUT_TYPE",
messageParameters = Map(
"paramIndex" -> ordinalNumber(0),
"requiredType" -> toSQLType(IntegerType),
"inputSql" -> toSQLExpr(child),
"inputType" -> toSQLType(child.dataType))
)
}
TypeCheckSuccess
}

// Because checkInputDataTypes ensures the argument is foldable and valid,
// we can evaluate it directly here.
lazy val precision: Int = child.eval().asInstanceOf[Number].intValue()

override def dataType: DataType = TimeType(precision)

override def prettyName: String = "current_time"

override protected def withNewChildInternal(newChild: Expression): Expression = {
copy(child = newChild)
}

override def inputTypes: Seq[AbstractDataType] = Seq(IntegerType)
}
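
As a quick illustration of the contract above (a sketch assuming analysis has already validated the argument; not part of the patch):

// Sketch: the folded precision argument drives the result type.
val ct = CurrentTime(Literal(3))
assert(ct.precision == 3 && ct.dataType == TimeType(3))
// After the ComputeCurrentTime rule (further down in this diff) runs, the plan
// holds a single shared Literal of TimeType(3) in place of CurrentTime, so
// every CURRENT_TIME in one query observes the same instant.
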
@@ -31,6 +31,7 @@ import org.apache.spark.sql.catalyst.trees.TreePattern._
import org.apache.spark.sql.catalyst.trees.TreePatternBits
import org.apache.spark.sql.catalyst.util.DateTimeUtils
import org.apache.spark.sql.catalyst.util.DateTimeUtils.{convertSpecialDate, convertSpecialTimestamp, convertSpecialTimestampNTZ, instantToMicros, localDateTimeToMicros}
import org.apache.spark.sql.catalyst.util.SparkDateTimeUtils.{instantToMicrosOfDay, truncateTimeMicrosToPrecision}
import org.apache.spark.sql.catalyst.util.TypeUtils.toSQLExpr
import org.apache.spark.sql.connector.catalog.CatalogManager
import org.apache.spark.sql.types._
@@ -113,6 +114,7 @@ object ComputeCurrentTime extends Rule[LogicalPlan] {
val instant = Instant.now()
val currentTimestampMicros = instantToMicros(instant)
val currentTime = Literal.create(currentTimestampMicros, TimestampType)
val currentTimeOfDayMicros = instantToMicrosOfDay(instant, conf.sessionLocalTimeZone)
val timezone = Literal.create(conf.sessionLocalTimeZone, StringType)
val currentDates = collection.mutable.HashMap.empty[ZoneId, Literal]
val localTimestamps = collection.mutable.HashMap.empty[ZoneId, Literal]
@@ -129,6 +131,10 @@
Literal.create(
DateTimeUtils.microsToDays(currentTimestampMicros, cd.zoneId), DateType)
})
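// All CurrentTime occurrences in a query share one micros-of-day value,
// truncated below to each occurrence's own precision.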
case currentTimeExpr: CurrentTime =>
val truncatedTime = truncateTimeMicrosToPrecision(currentTimeOfDayMicros,
currentTimeExpr.precision)
Literal.create(truncatedTime, TimeType(currentTimeExpr.precision))
case CurrentTimestamp() | Now() => currentTime
case CurrentTimeZone() => timezone
case localTimestamp: LocalTimestamp =>
@@ -2888,12 +2888,14 @@ class AstBuilder extends DataTypeAstBuilder
CurrentDate()
case SqlBaseParser.CURRENT_TIMESTAMP =>
CurrentTimestamp()
case SqlBaseParser.CURRENT_TIME =>
CurrentTime()
case SqlBaseParser.CURRENT_USER | SqlBaseParser.USER | SqlBaseParser.SESSION_USER =>
CurrentUser()
}
} else {
// If the parser is not in ansi mode, we should return `UnresolvedAttribute`, in case there
-// are columns named `CURRENT_DATE` or `CURRENT_TIMESTAMP`.
+// are columns named `CURRENT_DATE`, `CURRENT_TIMESTAMP`, or `CURRENT_TIME`.
UnresolvedAttribute.quoted(ctx.name.getText)
}
}
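
In effect: with ANSI mode enabled, `SELECT CURRENT_TIME` parses directly to `CurrentTime()`; otherwise it becomes `UnresolvedAttribute("CURRENT_TIME")`, which the literal-function resolution shown earlier turns into `CurrentTime()` only when no column of that name exists.
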
@@ -819,6 +819,25 @@ class AnalysisSuite extends AnalysisTest with Matchers {
}
}

test("CURRENT_TIME should be case insensitive") {
withSQLConf(SQLConf.CASE_SENSITIVE.key -> "true") {
val input = Project(Seq(
// The user references "current_time" or "CURRENT_TIME" in the query
UnresolvedAttribute("current_time"),
UnresolvedAttribute("CURRENT_TIME")
), testRelation)

// The analyzer should resolve both to the same expression: CurrentTime()
val expected = Project(Seq(
Alias(CurrentTime(), toPrettySQL(CurrentTime()))(),
Alias(CurrentTime(), toPrettySQL(CurrentTime()))()
), testRelation).analyze

checkAnalysis(input, expected)
}
}

test("CTE with non-existing column alias") {
assertAnalysisErrorCondition(parsePlan("WITH t(x) AS (SELECT 1) SELECT * FROM t WHERE y = 1"),
"UNRESOLVED_COLUMN.WITH_SUGGESTION",
@@ -21,11 +21,11 @@ import org.scalatest.BeforeAndAfter

import org.apache.spark.sql.AnalysisException
import org.apache.spark.sql.catalyst.EvaluateUnresolvedInlineTable
-import org.apache.spark.sql.catalyst.expressions.{Alias, Cast, CurrentTimestamp, Literal, Rand}
+import org.apache.spark.sql.catalyst.expressions.{Alias, Cast, CurrentTime, CurrentTimestamp, Literal, Rand}
import org.apache.spark.sql.catalyst.expressions.aggregate.Count
import org.apache.spark.sql.catalyst.optimizer.{ComputeCurrentTime, EvalInlineTables}
import org.apache.spark.sql.catalyst.plans.logical.LocalRelation
-import org.apache.spark.sql.types.{LongType, NullType, TimestampType}
+import org.apache.spark.sql.types.{LongType, NullType, TimestampType, TimeType}

/**
* Unit tests for [[ResolveInlineTables]]. Note that there are also test cases defined in
@@ -113,6 +113,32 @@ class ResolveInlineTablesSuite extends AnalysisTest with BeforeAndAfter {
}
}

test("cast and execute CURRENT_TIME expressions") {
val table = UnresolvedInlineTable(
Seq("c1"),
Seq(
Seq(CurrentTime()),
Seq(CurrentTime())
)
)
val resolved = ResolveInlineTables(table)
assert(resolved.isInstanceOf[ResolvedInlineTable],
"Expected an inline table to be resolved into a ResolvedInlineTable")

val transformed = ComputeCurrentTime(resolved)
EvalInlineTables(transformed) match {
case LocalRelation(output, data, _, _) =>
// expect default precision = 6
assert(output.map(_.dataType) == Seq(TimeType(6)))
// Should have 2 rows
assert(data.size == 2)
// Both rows should have the *same* microsecond value for current_time
assert(data(0).getLong(0) == data(1).getLong(0),
"Both CURRENT_TIME calls must yield the same value in the same query")
}
}

test("convert TimeZoneAwareExpression") {
val table = UnresolvedInlineTable(Seq("c1"),
Seq(Seq(Cast(lit("1991-12-06 00:00:00.0"), TimestampType))))
@@ -19,8 +19,10 @@ package org.apache.spark.sql.catalyst.expressions

import org.apache.spark.{SPARK_DOC_ROOT, SparkDateTimeException, SparkFunSuite}
import org.apache.spark.sql.AnalysisException
import org.apache.spark.sql.catalyst.analysis.TypeCheckResult.{DataTypeMismatch, TypeCheckSuccess}
import org.apache.spark.sql.catalyst.expressions.Cast.{toSQLId, toSQLValue}
import org.apache.spark.sql.catalyst.util.DateTimeTestUtils._
-import org.apache.spark.sql.types.{StringType, TimeType}
+import org.apache.spark.sql.types.{IntegerType, StringType, TimeType}

class TimeExpressionsSuite extends SparkFunSuite with ExpressionEvalHelper {
test("ParseToTime") {
@@ -226,4 +228,49 @@ class TimeExpressionsSuite extends SparkFunSuite with ExpressionEvalHelper {
checkConsistencyBetweenInterpretedAndCodegen(
(child: Expression) => SecondsOfTime(child).replacement, TimeType())
}

test("CurrentTime") {
// test valid precision
var expr = CurrentTime(Literal(3))
assert(expr.dataType == TimeType(3), "Should produce TIME(3) data type")
assert(expr.checkInputDataTypes() == TypeCheckSuccess)

// test default constructor, i.e. no explicit precision argument => TIME(6)
expr = CurrentTime()
assert(expr.precision == 6, "Default precision should be 6")
assert(expr.dataType == TimeType(6))
assert(expr.checkInputDataTypes() == TypeCheckSuccess)

// test foldable value
expr = CurrentTime(Literal(1 + 1))
assert(expr.precision == 2, "Precision should be 2")
assert(expr.dataType == TimeType(2))
assert(expr.checkInputDataTypes() == TypeCheckSuccess)

// test out of range precision => checkInputDataTypes fails
expr = CurrentTime(Literal(2 + 8))
assert(expr.checkInputDataTypes() ==
DataTypeMismatch(
errorSubClass = "VALUE_OUT_OF_RANGE",
messageParameters = Map(
"exprName" -> toSQLId("precision"),
"valueRange" -> s"[${TimeType.MIN_PRECISION}, ${TimeType.MICROS_PRECISION}]",
"currentValue" -> toSQLValue(10, IntegerType)
)
)
)

// test non-numeric value should fail since we skip the analyzer here
expr = CurrentTime(Literal("2"))
val failure = intercept[ClassCastException] {
expr.precision
}
assert(failure.getMessage.contains("cannot be cast to class java.lang.Number"))
}
}