-
Notifications
You must be signed in to change notification settings - Fork 1.2k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Add make_timestamp Spark function #8812
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,209 @@ | ||
/* | ||
* Copyright (c) Facebook, Inc. and its affiliates. | ||
* | ||
* Licensed under the Apache License, Version 2.0 (the "License"); | ||
* you may not use this file except in compliance with the License. | ||
* You may obtain a copy of the License at | ||
* | ||
* http://www.apache.org/licenses/LICENSE-2.0 | ||
* | ||
* Unless required by applicable law or agreed to in writing, software | ||
* distributed under the License is distributed on an "AS IS" BASIS, | ||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | ||
* See the License for the specific language governing permissions and | ||
* limitations under the License. | ||
*/ | ||
|
||
#include "velox/functions/sparksql/DateTimeFunctions.h" | ||
#include "velox/expression/DecodedArgs.h" | ||
#include "velox/expression/VectorFunction.h" | ||
|
||
namespace facebook::velox::functions::sparksql { | ||
namespace { | ||
|
||
std::optional<Timestamp> makeTimeStampFromDecodedArgs( | ||
vector_size_t row, | ||
DecodedVector* yearVector, | ||
DecodedVector* monthVector, | ||
DecodedVector* dayVector, | ||
DecodedVector* hourVector, | ||
DecodedVector* minuteVector, | ||
DecodedVector* microsVector) { | ||
// Check hour. | ||
auto hour = hourVector->valueAt<int32_t>(row); | ||
if (hour < 0 || hour > 24) { | ||
return std::nullopt; | ||
} | ||
// Check minute. | ||
auto minute = minuteVector->valueAt<int32_t>(row); | ||
if (minute < 0 || minute > 60) { | ||
return std::nullopt; | ||
} | ||
// Check microseconds. | ||
auto micros = microsVector->valueAt<int64_t>(row); | ||
if (micros < 0) { | ||
return std::nullopt; | ||
} | ||
auto seconds = micros / util::kMicrosPerSec; | ||
if (seconds > 60 || (seconds == 60 && micros % util::kMicrosPerSec != 0)) { | ||
return std::nullopt; | ||
} | ||
|
||
// year, month, day will be checked in utils::daysSinceEpochFromDate; | ||
int64_t daysSinceEpoch; | ||
auto status = util::daysSinceEpochFromDate( | ||
yearVector->valueAt<int32_t>(row), | ||
monthVector->valueAt<int32_t>(row), | ||
dayVector->valueAt<int32_t>(row), | ||
daysSinceEpoch); | ||
if (!status.ok()) { | ||
VELOX_DCHECK(status.isUserError()); | ||
return std::nullopt; | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. To use try-catch for each row may have poor performace. Maybe we can take use of Status to represent the computing outcome of There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Maybe we can optimize that in a separate PR. @mbasmanova Do you have any suggestion? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. @rui-mo Rui has a good point. We should not throw and catch exceptions per row. See https://velox-lib.io/blog/optimize-try_cast. |
||
} | ||
// micros has at most 8 digits (2 for seconds + 6 for microseconds), | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I don't see any checks for micros having at most 8 digits. Would you point to me where do we enforce that? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Seconds should between [0-60]. Invalid inputs will fail on this check https://github.com/marin-ma/velox-oap/blob/0da4ffae84ccacdd8257b803f61caf70da18d5e9/velox/functions/sparksql/DateTimeFunctions.cpp#L47-L50 |
||
// thus it's safe to cast micros from int64_t to int32_t. | ||
auto localMicros = util::fromTime(hour, minute, 0, (int32_t)micros); | ||
return util::fromDatetime(daysSinceEpoch, localMicros); | ||
} | ||
|
||
void setTimestampOrNull( | ||
int32_t row, | ||
std::optional<Timestamp> timestamp, | ||
DecodedVector* timeZoneVector, | ||
FlatVector<Timestamp>* result) { | ||
if (timestamp.has_value()) { | ||
auto timeZone = timeZoneVector->valueAt<StringView>(row); | ||
auto tzID = util::getTimeZoneID(std::string_view(timeZone)); | ||
(*timestamp).toGMT(tzID); | ||
result->set(row, *timestamp); | ||
} else { | ||
result->setNull(row, true); | ||
} | ||
} | ||
|
||
void setTimestampOrNull( | ||
int32_t row, | ||
std::optional<Timestamp> timestamp, | ||
int64_t tzID, | ||
FlatVector<Timestamp>* result) { | ||
if (timestamp.has_value()) { | ||
(*timestamp).toGMT(tzID); | ||
result->set(row, *timestamp); | ||
} else { | ||
result->setNull(row, true); | ||
} | ||
} | ||
|
||
class MakeTimestampFunction : public exec::VectorFunction { | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Can this function be implemented as a simple function? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. The 6th parameter is of decimal type. I'm not sure it it's possible to implement it as a simple function. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
Wow... I didn't realize that. Would you update the documentation to state that clearly? Are there restrictions on precision and scale? I assume scale must be <= 3. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. @mbasmanova Looks like the There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
@marin-ma Are you saying that Fuzzer does cover this function? Would you run the Fuzzer with --only make_timestamp to make sure there are no failures? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. @mbasmanova I get exception with this command
Does it mean this function is not covered by the fuzzer test? Or did I use the wrong command? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. @marin-ma I believe Fuzzer doesn't support DECIMAL types yet. It would be nice to add this support, otherwise, test coverage of VectorFunctions that use DECIMAL types is limited. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. @mbasmanova Yes, fuzzer test for decimal type is not supported. #5791 (comment) is about my previous finding , and I will remove the two limitations to see where the gap is, thanks. |
||
public: | ||
MakeTimestampFunction(int64_t sessionTzID) : sessionTzID_(sessionTzID) {} | ||
|
||
void apply( | ||
const SelectivityVector& rows, | ||
std::vector<VectorPtr>& args, | ||
const TypePtr& outputType, | ||
exec::EvalCtx& context, | ||
VectorPtr& result) const override { | ||
context.ensureWritable(rows, TIMESTAMP(), result); | ||
auto* resultFlatVector = result->as<FlatVector<Timestamp>>(); | ||
|
||
exec::DecodedArgs decodedArgs(rows, args, context); | ||
auto* year = decodedArgs.at(0); | ||
auto* month = decodedArgs.at(1); | ||
auto* day = decodedArgs.at(2); | ||
auto* hour = decodedArgs.at(3); | ||
auto* minute = decodedArgs.at(4); | ||
auto* micros = decodedArgs.at(5); | ||
|
||
if (args.size() == 7) { | ||
// If the timezone argument is specified, treat the input timestamp as the | ||
// time in that timezone. | ||
if (args[6]->isConstantEncoding()) { | ||
auto tz = | ||
args[6]->asUnchecked<ConstantVector<StringView>>()->valueAt(0); | ||
auto constantTzID = util::getTimeZoneID(std::string_view(tz)); | ||
rows.applyToSelected([&](vector_size_t row) { | ||
auto timestamp = makeTimeStampFromDecodedArgs( | ||
row, year, month, day, hour, minute, micros); | ||
setTimestampOrNull(row, timestamp, constantTzID, resultFlatVector); | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. In what cases the result is NULL while no input is NULL? Please, update documentation to describe these and add test cases. Please, double check that this behavior matches Spark. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. This behavior aligns with spark's output under non-ansi mode. Spark returns NULL if for invalid inputs, such as month > 12, seconds > 60, etc. Added examples with invalid inputs in the document. |
||
}); | ||
} else { | ||
auto* timeZone = decodedArgs.at(6); | ||
rows.applyToSelected([&](vector_size_t row) { | ||
auto timestamp = makeTimeStampFromDecodedArgs( | ||
row, year, month, day, hour, minute, micros); | ||
setTimestampOrNull(row, timestamp, timeZone, resultFlatVector); | ||
}); | ||
} | ||
} else { | ||
// Otherwise use session timezone. | ||
rows.applyToSelected([&](vector_size_t row) { | ||
auto timestamp = makeTimeStampFromDecodedArgs( | ||
row, year, month, day, hour, minute, micros); | ||
setTimestampOrNull(row, timestamp, sessionTzID_, resultFlatVector); | ||
}); | ||
} | ||
} | ||
|
||
static std::vector<std::shared_ptr<exec::FunctionSignature>> signatures() { | ||
return { | ||
exec::FunctionSignatureBuilder() | ||
.integerVariable("precision") | ||
.returnType("timestamp") | ||
.argumentType("integer") | ||
.argumentType("integer") | ||
.argumentType("integer") | ||
.argumentType("integer") | ||
.argumentType("integer") | ||
.argumentType("decimal(precision, 6)") | ||
.build(), | ||
exec::FunctionSignatureBuilder() | ||
.integerVariable("precision") | ||
.returnType("timestamp") | ||
.argumentType("integer") | ||
.argumentType("integer") | ||
.argumentType("integer") | ||
.argumentType("integer") | ||
.argumentType("integer") | ||
.argumentType("decimal(precision, 6)") | ||
.argumentType("varchar") | ||
.build(), | ||
}; | ||
} | ||
|
||
private: | ||
const int64_t sessionTzID_; | ||
}; | ||
|
||
std::shared_ptr<exec::VectorFunction> createMakeTimestampFunction( | ||
const std::string& /* name */, | ||
const std::vector<exec::VectorFunctionArg>& inputArgs, | ||
const core::QueryConfig& config) { | ||
const auto sessionTzName = config.sessionTimezone(); | ||
VELOX_USER_CHECK( | ||
!sessionTzName.empty(), | ||
"make_timestamp requires session time zone to be set.") | ||
const auto sessionTzID = util::getTimeZoneID(sessionTzName); | ||
|
||
const auto& secondsType = inputArgs[5].type; | ||
VELOX_USER_CHECK( | ||
secondsType->isShortDecimal(), | ||
"Seconds must be short decimal type but got {}", | ||
secondsType->toString()); | ||
auto secondsScale = secondsType->asShortDecimal().scale(); | ||
VELOX_USER_CHECK_EQ( | ||
secondsScale, | ||
6, | ||
"Seconds fraction must have 6 digits for microseconds but got {}", | ||
secondsScale); | ||
|
||
return std::make_shared<MakeTimestampFunction>(sessionTzID); | ||
} | ||
} // namespace | ||
|
||
VELOX_DECLARE_STATEFUL_VECTOR_FUNCTION( | ||
udf_make_timestamp, | ||
MakeTimestampFunction::signatures(), | ||
createMakeTimestampFunction); | ||
|
||
} // namespace facebook::velox::functions::sparksql |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Would you generate the docs and verify that they render nicely?