Add Spark CAST(timestamp as integral) #11468

Open: wants to merge 1 commit into base: main
14 changes: 14 additions & 0 deletions velox/expression/CastExpr-inl.h
@@ -286,6 +286,20 @@ void CastExpr::applyCastKernel(
return;
}

if constexpr (
(ToKind == TypeKind::TINYINT || ToKind == TypeKind::SMALLINT ||
ToKind == TypeKind::INTEGER || ToKind == TypeKind::BIGINT) &&
FromKind == TypeKind::TIMESTAMP) {
using To = typename TypeTraits<ToKind>::NativeType;
const auto castResult = hooks_->castTimestampToInt(inputRowValue);
if (castResult.hasError()) {
setError(castResult.error().message());
} else {
result->set(row, static_cast<To>(castResult.value()));
Collaborator:

static_cast<To>(castResult.value())

Is overflow handled correctly when the int64_t result is cast to a narrower integer type?
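
To make the question concrete, here is a minimal sketch of what a plain static_cast does with an out-of-range value, next to a range-checked alternative. The names narrowChecked and narrowingExamples are hypothetical and not part of Velox. Whether the cast in this PR should wrap or report an error depends on the Spark semantics being matched, which this thread does not settle.

```cpp
#include <cassert>
#include <cstdint>
#include <limits>
#include <optional>

// Hypothetical helper, not part of Velox: returns std::nullopt when `value`
// does not fit in the narrower integral type `To`.
template <typename To>
std::optional<To> narrowChecked(int64_t value) {
  if (value < std::numeric_limits<To>::min() ||
      value > std::numeric_limits<To>::max()) {
    return std::nullopt;
  }
  return static_cast<To>(value);
}

// Contrasts the plain cast with the checked helper.
inline void narrowingExamples() {
  // In range: the checked helper returns the value unchanged.
  assert(narrowChecked<int8_t>(127).value() == 127);
  // Out of range: the plain cast keeps only the low 8 bits (300 mod 256 = 44).
  // C++20 guarantees this; before C++20 the result is implementation-defined.
  assert(static_cast<int8_t>(int64_t{300}) == 44);
  // Out of range: the checked helper reports the problem instead.
  assert(!narrowChecked<int8_t>(300).has_value());
}
```

The tests in this PR only cast timestamps whose seconds value already fits in the target type, so they would not distinguish between the two behaviors.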

}
return;
}

// Optimize empty input strings casting by avoiding throwing exceptions.
if constexpr (
FromKind == TypeKind::VARCHAR || FromKind == TypeKind::VARBINARY) {
2 changes: 2 additions & 0 deletions velox/expression/CastHooks.h
@@ -37,6 +37,8 @@ class CastHooks {

virtual Expected<Timestamp> castIntToTimestamp(int64_t seconds) const = 0;

virtual Expected<int64_t> castTimestampToInt(Timestamp timestamp) const = 0;

virtual Expected<int32_t> castStringToDate(
const StringView& dateString) const = 0;

9 changes: 8 additions & 1 deletion velox/expression/PrestoCastHooks.cpp
@@ -67,11 +67,18 @@ Expected<Timestamp> PrestoCastHooks::castStringToTimestamp(
return result.first;
}

Expected<Timestamp> PrestoCastHooks::castIntToTimestamp(int64_t seconds) const {
Expected<Timestamp> PrestoCastHooks::castIntToTimestamp(
int64_t /*seconds*/) const {
return folly::makeUnexpected(
Status::UserError("Conversion to Timestamp is not supported"));
}

Expected<int64_t> PrestoCastHooks::castTimestampToInt(
Timestamp /*timestamp*/) const {
return folly::makeUnexpected(
Status::UserError("Conversion from Timestamp is not supported"));
}

Expected<int32_t> PrestoCastHooks::castStringToDate(
const StringView& dateString) const {
// Cast from string to date allows only complete ISO 8601 formatted strings:
2 changes: 2 additions & 0 deletions velox/expression/PrestoCastHooks.h
@@ -32,6 +32,8 @@ class PrestoCastHooks : public CastHooks {

Expected<Timestamp> castIntToTimestamp(int64_t seconds) const override;

Expected<int64_t> castTimestampToInt(Timestamp timestamp) const override;

// Uses standard cast mode to cast from string to date.
Expected<int32_t> castStringToDate(
const StringView& dateString) const override;
11 changes: 8 additions & 3 deletions velox/functions/sparksql/specialforms/SparkCastHooks.cpp
@@ -29,9 +29,8 @@ Expected<Timestamp> SparkCastHooks::castStringToTimestamp(
Expected<Timestamp> SparkCastHooks::castIntToTimestamp(int64_t seconds) const {
// Spark internally use microsecond precision for timestamp.
// To avoid overflow, we need to check the range of seconds.
static constexpr int64_t maxSeconds = std::numeric_limits<int64_t>::max() /
(Timestamp::kMicrosecondsInMillisecond *
Timestamp::kMillisecondsInSecond);
static constexpr int64_t maxSeconds =
std::numeric_limits<int64_t>::max() / Timestamp::kMicrosecondsInSecond;
if (seconds > maxSeconds) {
return Timestamp::fromMicrosNoError(std::numeric_limits<int64_t>::max());
}
@@ -41,6 +40,12 @@ Expected<Timestamp> SparkCastHooks::castIntToTimestamp(int64_t seconds) const {
return Timestamp(seconds, 0);
}

Expected<int64_t> SparkCastHooks::castTimestampToInt(
Timestamp timestamp) const {
return std::floor(
Contributor:

Does timestamp.toMicros() / Timestamp::kMicrosecondsInSecond work?

Contributor Author:

If timestamp.toMicros() is negative, we need to round toward negative infinity rather than toward zero, so std::floor is needed here. This matches Spark's implementation, which uses Math.floorDiv(ts, MICROS_PER_SECOND).
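
As an illustration of the rounding difference discussed here, the following sketch compares C++ truncating division with an integer floor division. The helper floorDiv and the constant kMicrosPerSecond are hypothetical names introduced for this example, not Velox APIs. An integer helper like this is one possible alternative to std::floor on a double; it avoids the int64_t-to-double conversion, which cannot represent every value above 2^53 exactly.

```cpp
#include <cassert>
#include <cstdint>

// Hypothetical helper, not part of Velox: integer division that rounds toward
// negative infinity, in the spirit of Java's Math.floorDiv.
// Assumes divisor != 0 and that the quotient is representable.
constexpr int64_t floorDiv(int64_t dividend, int64_t divisor) {
  int64_t quotient = dividend / divisor; // C++ truncates toward zero.
  // Step down by one when there is a remainder and the signs differ.
  if ((dividend % divisor != 0) && ((dividend < 0) != (divisor < 0))) {
    --quotient;
  }
  return quotient;
}

// Assumed constant for this sketch (one million microseconds per second).
constexpr int64_t kMicrosPerSecond = 1'000'000;

// One microsecond before the epoch: truncation gives second 0,
// floor division gives second -1.
static_assert(-1 / kMicrosPerSecond == 0, "truncating division");
static_assert(floorDiv(-1, kMicrosPerSecond) == -1, "floor division");
// Exact multiples and positive values are unaffected.
static_assert(floorDiv(-1'000'000, kMicrosPerSecond) == -1, "exact multiple");
static_assert(floorDiv(1'999'999, kMicrosPerSecond) == 1, "positive value");
```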

timestamp.toMicros() / (Timestamp::kMicrosecondsInSecond * 1.0));
}

Expected<int32_t> SparkCastHooks::castStringToDate(
const StringView& dateString) const {
// Allows all patterns supported by Spark:
2 changes: 2 additions & 0 deletions velox/functions/sparksql/specialforms/SparkCastHooks.h
@@ -31,6 +31,8 @@ class SparkCastHooks : public exec::CastHooks {
/// number of seconds since the epoch (1970-01-01 00:00:00 UTC).
Expected<Timestamp> castIntToTimestamp(int64_t seconds) const override;

Expected<int64_t> castTimestampToInt(Timestamp timestamp) const override;

/// 1) Removes all leading and trailing UTF8 white-spaces before cast. 2) Uses
/// non-standard cast mode to cast from string to date.
Expected<int32_t> castStringToDate(
46 changes: 46 additions & 0 deletions velox/functions/sparksql/tests/SparkCastExprTest.cpp
@@ -109,6 +109,24 @@ class SparkCastExprTest : public functions::test::CastBaseTest {
Timestamp(std::numeric_limits<T>::min(), 0),
std::nullopt}));
}

template <typename T>
void testTimestampToIntegralCast() {
testCast(
makeNullableFlatVector<Timestamp>(
{Timestamp(0, 0),
Timestamp(1, 0),
Timestamp(std::numeric_limits<T>::max(), 0),
Timestamp(std::numeric_limits<T>::min(), 0),
std::nullopt}),
makeNullableFlatVector<T>({
0,
1,
std::numeric_limits<T>::max(),
std::numeric_limits<T>::min(),
std::nullopt,
}));
}
};

TEST_F(SparkCastExprTest, date) {
@@ -291,6 +309,34 @@ TEST_F(SparkCastExprTest, intToTimestamp) {
testIntegralToTimestampCast<int32_t>();
}

TEST_F(SparkCastExprTest, timestampToInt) {
// Cast timestamp as bigint.
testCast(
makeNullableFlatVector<Timestamp>({
Timestamp(0, 0),
Timestamp(1727181032, 0),
Timestamp(-1727181032, 0),
Timestamp(9223372036854, 775'807'000),
Timestamp(-9223372036855, 224'192'000),
}),
makeNullableFlatVector<int64_t>({
0,
1727181032,
-1727181032,
9223372036854,
-9223372036855,
}));
testInvalidCast<Timestamp>(
"bigint",
{Timestamp(9223372036856, 0)},
"Could not convert Timestamp(9223372036856, 0) to microseconds");

// Cast timestamp as tinyint/smallint/integer.
testTimestampToIntegralCast<int8_t>();
testTimestampToIntegralCast<int16_t>();
testTimestampToIntegralCast<int32_t>();
}

TEST_F(SparkCastExprTest, primitiveInvalidCornerCases) {
// To integer.
{
24 changes: 14 additions & 10 deletions velox/type/Timestamp.h
@@ -81,6 +81,8 @@ struct Timestamp {
public:
static constexpr int64_t kMillisecondsInSecond = 1'000;
static constexpr int64_t kMicrosecondsInMillisecond = 1'000;
static constexpr int64_t kMicrosecondsInSecond =
kMicrosecondsInMillisecond * kMillisecondsInSecond;
static constexpr int64_t kNanosecondsInMicrosecond = 1'000;
static constexpr int64_t kNanosecondsInMillisecond = 1'000'000;
static constexpr int64_t kNanosInSecond =
@@ -183,19 +185,21 @@ struct Timestamp {

// Keep it in header for getting inlined.
int64_t toMicros() const {
// When an integer overflow occurs in the calculation,
// an exception will be thrown.
try {
return checkedPlus(
checkedMultiply(seconds_, (int64_t)1'000'000),
(int64_t)(nanos_ / 1'000));
} catch (const std::exception& e) {
// We use int128_t to make sure the computation does not overflows since
// there are cases such that seconds*1000000 does not fit in int64_t,
Collaborator:

Spark uses the int64 type to represent microseconds, so the maximum allowed seconds value should be INT64_MAX / 1000000. For a valid timestamp from Spark, why would seconds * 1000000 overflow?

Contributor Author:

The minimum second can overflow: the minimum second is -9223372036855.

Collaborator:

Would you like to extract this fix into a separate PR, like 671e126? We could add a test in 'velox/type/tests/TimestampTest.cpp'.

Contributor Author:

Sure, let me extract this into a new PR.

Contributor Author:

Added here: #11774
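
The overflow case raised in this thread can be checked with a short sketch. The function toMicrosChecked below is a hypothetical stand-in for the approach in the diff (a 128-bit intermediate followed by a range check), returning std::nullopt where the real code throws. It assumes a compiler that provides the __int128 extension, such as GCC or Clang.

```cpp
#include <cassert>
#include <cstdint>
#include <limits>
#include <optional>

// Hypothetical stand-in for the toMicros() logic under review: computes
// seconds * 1'000'000 + nanos / 1'000 in 128 bits, then checks the int64_t
// range. Returns std::nullopt instead of throwing.
inline std::optional<int64_t> toMicrosChecked(int64_t seconds, uint64_t nanos) {
  const __int128 wide = static_cast<__int128>(seconds) * 1'000'000 +
      static_cast<int64_t>(nanos / 1'000);
  if (wide < std::numeric_limits<int64_t>::min() ||
      wide > std::numeric_limits<int64_t>::max()) {
    return std::nullopt;
  }
  return static_cast<int64_t>(wide);
}

inline void toMicrosExamples() {
  // Minimum case from the thread: -9223372036855 * 1'000'000 is
  // -9223372036855000000, which is below INT64_MIN, but adding
  // 224'192 microseconds brings the sum back to exactly INT64_MIN.
  assert(
      toMicrosChecked(-9223372036855, 224'192'000) ==
      std::numeric_limits<int64_t>::min());
  // Maximum case from the test in this PR: the sum is exactly INT64_MAX.
  assert(
      toMicrosChecked(9223372036854, 775'807'000) ==
      std::numeric_limits<int64_t>::max());
  // Out of range: matches the invalid-cast test input in this PR.
  assert(!toMicrosChecked(9223372036856, 0).has_value());
}
```

A checked 64-bit multiply followed by a checked add rejects the minimum case because the intermediate product overflows even though the final sum fits; the wider intermediate avoids that.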

// but seconds*1000000 + nanos does, an example is TimeStamp::minMillis().
Contributor:

Typo: TimeStamp::minMillis() -> Timestamp::minMillis().


// If the final result does not fit in int64_tw we throw.
Contributor:

Typo: int64_tw -> int64_t.

__int128_t result =
(__int128_t)seconds_ * 1'000'000 + (int64_t)(nanos_ / 1'000);
if (result < std::numeric_limits<int64_t>::min() ||
Contributor:

INT64_MAX and INT64_MIN

result > std::numeric_limits<int64_t>::max()) {
VELOX_USER_FAIL(
"Could not convert Timestamp({}, {}) to microseconds, {}",
"Could not convert Timestamp({}, {}) to microseconds",
seconds_,
nanos_,
e.what());
nanos_);
}
return result;
}

/// Exports the current timestamp as a std::chrono::time_point of millisecond