Skip to content

Commit

Permalink
misc: Add support for TimestampWithTimeZone in AggregationFuzzer (#11897
Browse files Browse the repository at this point in the history
)

Summary:
Pull Request resolved: #11897

This change adds support for TimestampWithTimeZone in AggregationFuzzer as partition keys (it cannot be a
sorting key as it does not support +/-) with PrestoQueryRunner.

The primary challenge is that PrestoQueryRunner does not support TimestampWithTimeZone as an input (it's
not supported in DWRF).  To address this, the general strategy is: when we see a TimestampWithTimeZone in
the input types, we convert it to a BIGINT millisUtc and a VARCHAR time_zone, and then call from_unixtime as
part of an initial projection in the query to produce TimestampWithTimeZone values.

More concretely I added a general purpose utility library PrestoQueryRunnerIntermediateTypeTransforms to
handle converting Vectors of intermediate only types (custom types not supported in the input) to Vectors of
types supported in the input, and to generate expressions to do the conversion back to the intermediate only
type as part of the query.  This is supported when the types are nested in complex types as well.

I added a function inputProjections to ReferenceQueryRunner which takes in the input batch and does this
conversion and generates the expressions.

The rest of this change is just plumbing these projections into all plans throughout AggregationFuzzer, as well
as the plans generated in the ResultVerifiers.

If this approach is acceptable, it's straightforward to then support custom types like
TimestampWithTimeZone as arguments to aggregates in the fuzzer, as well as to add support in other fuzzers
like JoinFuzzer.

Differential Revision: D67346942
  • Loading branch information
Kevin Wilfong authored and facebook-github-bot committed Dec 17, 2024
1 parent ebcb021 commit 61e7dbc
Show file tree
Hide file tree
Showing 19 changed files with 695 additions and 47 deletions.
98 changes: 80 additions & 18 deletions velox/exec/fuzzer/AggregationFuzzer.cpp

Large diffs are not rendered by default.

16 changes: 2 additions & 14 deletions velox/exec/fuzzer/AggregationFuzzerBase.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -210,28 +210,16 @@ std::vector<std::string> AggregationFuzzerBase::generateKeys(
const std::string& prefix,
std::vector<std::string>& names,
std::vector<TypePtr>& types) {
static const std::vector<TypePtr> kNonFloatingPointTypes{
BOOLEAN(),
TINYINT(),
SMALLINT(),
INTEGER(),
BIGINT(),
VARCHAR(),
VARBINARY(),
TIMESTAMP(),
};

auto numKeys = boost::random::uniform_int_distribution<uint32_t>(1, 5)(rng_);
std::vector<std::string> keys;
for (auto i = 0; i < numKeys; ++i) {
keys.push_back(fmt::format("{}{}", prefix, i));

// Pick random, possibly complex, type.
if (orderableGroupKeys_) {
types.push_back(
vectorFuzzer_.randOrderableType(kNonFloatingPointTypes, 2));
types.push_back(vectorFuzzer_.randOrderableType(supportedKeyTypes_, 2));
} else {
types.push_back(vectorFuzzer_.randType(kNonFloatingPointTypes, 2));
types.push_back(vectorFuzzer_.randType(supportedKeyTypes_, 2));
}
names.push_back(keys.back());
}
Expand Down
8 changes: 8 additions & 0 deletions velox/exec/fuzzer/AggregationFuzzerBase.h
Original file line number Diff line number Diff line change
Expand Up @@ -83,6 +83,13 @@ class AggregationFuzzerBase {
dwrf::registerDwrfReaderFactory();
dwrf::registerDwrfWriterFactory();
referenceQueryRunner_->registerCustomVectorFuzzers(vectorFuzzer_);

for (const auto& type : referenceQueryRunner_->supportedScalarTypes()) {
if (!type->isReal() && !type->isDouble()) {
supportedKeyTypes_.push_back(type);
}
}

seed(initialSeed);
}

Expand Down Expand Up @@ -276,6 +283,7 @@ class AggregationFuzzerBase {
std::shared_ptr<memory::MemoryPool> writerPool_{
rootPool_->addAggregateChild("aggregationFuzzerWriter")};
VectorFuzzer vectorFuzzer_;
std::vector<TypePtr> supportedKeyTypes_;
};

// Returns true if the elapsed time is greater than or equal to
Expand Down
3 changes: 2 additions & 1 deletion velox/exec/fuzzer/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,8 @@
# limitations under the License.

add_library(velox_fuzzer_util DuckQueryRunner.cpp PrestoQueryRunner.cpp
FuzzerUtil.cpp ToSQLUtil.cpp)
FuzzerUtil.cpp ToSQLUtil.cpp
PrestoQueryRunnerIntermediateTypeTransforms.cpp)

target_link_libraries(
velox_fuzzer_util
Expand Down
78 changes: 77 additions & 1 deletion velox/exec/fuzzer/PrestoQueryRunner.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@
#include "velox/dwio/common/WriterFactory.h"
#include "velox/dwio/dwrf/writer/Writer.h"
#include "velox/exec/fuzzer/FuzzerUtil.h"
#include "velox/exec/fuzzer/PrestoQueryRunnerIntermediateTypeTransforms.h"
#include "velox/exec/fuzzer/ToSQLUtil.h"
#include "velox/exec/tests/utils/QueryAssertions.h"
#include "velox/functions/prestosql/types/IPAddressType.h"
Expand Down Expand Up @@ -205,6 +206,11 @@ std::optional<std::string> PrestoQueryRunner::toSql(
return toSql(valuesNode);
}

if (const auto tableScanNode =
std::dynamic_pointer_cast<const core::TableScanNode>(plan)) {
return toSql(tableScanNode);
}

VELOX_NYI();
}

Expand Down Expand Up @@ -251,6 +257,7 @@ const std::vector<TypePtr>& PrestoQueryRunner::supportedScalarTypes() const {
VARCHAR(),
VARBINARY(),
TIMESTAMP(),
TIMESTAMP_WITH_TIME_ZONE(),
};
return kScalarTypes;
}
Expand All @@ -262,6 +269,62 @@ void PrestoQueryRunner::registerCustomVectorFuzzers(
std::make_unique<TimestampWithTimeZoneVectorFuzzer>());
}

// Rewrites `input` so that top-level columns of intermediate-only types are
// replaced by their transformed representation, and builds one projection
// expression per top-level column to be applied on top of the rewritten input.
//
// For each top-level column of input[0]:
//  - If isIntermediateOnlyType() is true for the column's type, the column in
//    every batch is replaced by transformIntermediateOnlyType() of it, and the
//    projection is the expression produced by
//    getIntermediateOnlyTypeProjectionExpr() for that column.
//  - Otherwise the column is passed through unchanged and the projection is a
//    plain field access on the column name.
//
// Column names are preserved; the output row type is derived from the
// (possibly transformed) children of the first batch.
//
// NOTE(review): the decision to transform a column and the output row type are
// both taken from input[0] only; this assumes every batch shares input[0]'s
// row type - confirm against callers.
//
// @param input Batches to rewrite.
// @return A pair of (rewritten batches, projections in column order). If
// `input` is empty, returns `input` unchanged and no projections.
std::pair<std::vector<RowVectorPtr>, std::vector<core::ExprPtr>>
PrestoQueryRunner::inputProjections(
    const std::vector<RowVectorPtr>& input) const {
  if (input.empty()) {
    return {input, {}};
  }

  const size_t numBatches = input.size();
  const size_t numColumns = input[0]->childrenSize();

  // Copied (not referenced) because the names are moved into the new row type
  // below.
  std::vector<std::string> names = input[0]->type()->asRow().names();

  std::vector<core::ExprPtr> projections;
  projections.reserve(numColumns);

  // children[batchIndex] collects the output columns of that batch.
  std::vector<std::vector<VectorPtr>> children(numBatches);
  for (auto& batchChildren : children) {
    batchChildren.reserve(numColumns);
  }

  for (size_t childIndex = 0; childIndex < numColumns; ++childIndex) {
    const auto& childType = input[0]->childAt(childIndex)->type();
    // If it's an intermediate only type, transform the input and add
    // expressions to reverse the transformation. Otherwise the input is
    // unchanged and the projection is just an identity mapping.
    if (isIntermediateOnlyType(childType)) {
      for (size_t batchIndex = 0; batchIndex < numBatches; ++batchIndex) {
        children[batchIndex].push_back(transformIntermediateOnlyType(
            input[batchIndex]->childAt(childIndex)));
      }
      projections.push_back(getIntermediateOnlyTypeProjectionExpr(
          childType,
          std::make_shared<core::FieldAccessExpr>(
              names[childIndex], names[childIndex]),
          names[childIndex]));
    } else {
      for (size_t batchIndex = 0; batchIndex < numBatches; ++batchIndex) {
        children[batchIndex].push_back(input[batchIndex]->childAt(childIndex));
      }

      projections.push_back(std::make_shared<core::FieldAccessExpr>(
          names[childIndex], names[childIndex]));
    }
  }

  // The output schema keeps the original names but uses the types of the
  // (possibly transformed) columns of the first batch.
  std::vector<TypePtr> types;
  types.reserve(children[0].size());
  for (const auto& child : children[0]) {
    types.push_back(child->type());
  }

  auto rowType = ROW(std::move(names), std::move(types));

  std::vector<RowVectorPtr> output;
  output.reserve(numBatches);
  for (size_t batchIndex = 0; batchIndex < numBatches; ++batchIndex) {
    output.push_back(std::make_shared<RowVector>(
        input[batchIndex]->pool(),
        rowType,
        nullptr,
        input[batchIndex]->size(),
        std::move(children[batchIndex])));
  }

  // Move the results into the pair; std::make_pair(output, projections) would
  // copy both vectors (one shared_ptr refcount bump per element).
  return {std::move(output), std::move(projections)};
}

const std::unordered_map<std::string, DataSpec>&
PrestoQueryRunner::aggregationFunctionDataSpecs() const {
// For some functions, velox supports NaN, Infinity better than presto query
Expand Down Expand Up @@ -324,7 +387,12 @@ std::optional<std::string> PrestoQueryRunner::toSql(
}
}

sql << " FROM tmp";
auto sourceSql = toSql(aggregationNode->sources()[0]);
if (!sourceSql.has_value()) {
return std::nullopt;
}

sql << " FROM (" << sourceSql.value() << ")";

if (!groupingKeys.empty()) {
sql << " GROUP BY " << folly::join(", ", groupingKeys);
Expand Down Expand Up @@ -711,6 +779,14 @@ std::optional<std::string> PrestoQueryRunner::toSql(
return "tmp";
}

// Produces the SQL source for a table scan. Returns the table name "tmp" when
// isSupportedDwrfType() accepts the scan's output type, and std::nullopt
// otherwise.
std::optional<std::string> PrestoQueryRunner::toSql(
    const std::shared_ptr<const core::TableScanNode>& tableScanNode) {
  if (isSupportedDwrfType(tableScanNode->outputType())) {
    return "tmp";
  }
  return std::nullopt;
}

std::multiset<std::vector<variant>> PrestoQueryRunner::execute(
const std::string& sql,
const std::vector<RowVectorPtr>& input,
Expand Down
6 changes: 6 additions & 0 deletions velox/exec/fuzzer/PrestoQueryRunner.h
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,9 @@ class PrestoQueryRunner : public velox::exec::test::ReferenceQueryRunner {

void registerCustomVectorFuzzers(VectorFuzzer& vectorFuzzer) const override;

std::pair<std::vector<RowVectorPtr>, std::vector<core::ExprPtr>>
inputProjections(const std::vector<RowVectorPtr>& input) const override;

const std::unordered_map<std::string, DataSpec>&
aggregationFunctionDataSpecs() const override;

Expand Down Expand Up @@ -147,6 +150,9 @@ class PrestoQueryRunner : public velox::exec::test::ReferenceQueryRunner {
std::optional<std::string> toSql(
const std::shared_ptr<const core::ValuesNode>& valuesNode);

std::optional<std::string> toSql(
const std::shared_ptr<const core::TableScanNode>& tableScanNode);

std::string startQuery(
const std::string& sql,
const std::string& sessionProperty = "");
Expand Down
Loading

0 comments on commit 61e7dbc

Please sign in to comment.