Skip to content

Commit

Permalink
Advance Velox and adjust presto unit tests accordingly
Browse files Browse the repository at this point in the history
  • Loading branch information
zation99 committed Jan 6, 2025
1 parent 7913aa4 commit a88341e
Show file tree
Hide file tree
Showing 3 changed files with 22 additions and 22 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -83,9 +83,9 @@ class BroadcastTest : public exec::test::OperatorTestBase {
std::dynamic_pointer_cast<const BroadcastWriteNode>(writerPlan)
->serdeRowType();

exec::test::CursorParameters params;
exec::CursorParameters params;
params.planNode = writerPlan;
auto [taskCursor, results] = readCursor(params, [](auto /*task*/) {});
auto [taskCursor, results] = exec::test::readCursor(params, [](auto /*task*/) {});

std::vector<std::string> broadcastFilePaths;
for (auto result : results) {
Expand All @@ -97,7 +97,7 @@ class BroadcastTest : public exec::test::OperatorTestBase {
}

std::pair<
std::unique_ptr<velox::exec::test::TaskCursor>,
std::unique_ptr<velox::exec::TaskCursor>,
std::vector<RowVectorPtr>>
executeBroadcastRead(
RowTypePtr dataType,
Expand All @@ -107,7 +107,7 @@ class BroadcastTest : public exec::test::OperatorTestBase {
auto readerPlan = exec::test::PlanBuilder()
.exchange(dataType, velox::VectorSerde::Kind::kPresto)
.planNode();
exec::test::CursorParameters broadcastReadParams;
exec::CursorParameters broadcastReadParams;
broadcastReadParams.planNode = readerPlan;

std::vector<std::string> fileInfos;
Expand All @@ -118,7 +118,7 @@ class BroadcastTest : public exec::test::OperatorTestBase {

uint8_t splitIndex = 0;
// Read back result using BroadcastExchangeSource.
return readCursor(broadcastReadParams, [&](auto* task) {
return exec::test::readCursor(broadcastReadParams, [&](auto* task) {
if (splitIndex >= broadcastFilePaths.size()) {
task->noMoreSplits("0");
return;
Expand Down Expand Up @@ -361,11 +361,11 @@ TEST_F(BroadcastTest, malformedBroadcastInfoJson) {
auto readerPlan = exec::test::PlanBuilder()
.exchange(dataType, velox::VectorSerde::Kind::kPresto)
.planNode();
exec::test::CursorParameters broadcastReadParams;
exec::CursorParameters broadcastReadParams;
broadcastReadParams.planNode = readerPlan;

VELOX_ASSERT_THROW(
readCursor(
exec::test::readCursor(
broadcastReadParams,
[&](auto* task) {
auto fileInfos =
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -407,10 +407,10 @@ class UnsafeRowShuffleTest : public exec::test::OperatorTestBase {
void testPartitionAndSerialize(
const core::PlanNodePtr& plan,
const RowVectorPtr& expected,
const exec::test::CursorParameters params,
const exec::CursorParameters params,
const std::optional<uint32_t> expectedOutputCount = std::nullopt) {
auto [taskCursor, serializedResults] =
readCursor(params, [](auto /*task*/) {});
exec::test::readCursor(params, [](auto /*task*/) {});

RowVectorPtr result =
BaseVector::create<RowVector>(expected->type(), 0, pool());
Expand All @@ -432,18 +432,18 @@ class UnsafeRowShuffleTest : public exec::test::OperatorTestBase {
void testPartitionAndSerialize(
const core::PlanNodePtr& plan,
const RowVectorPtr& expected) {
exec::test::CursorParameters params;
exec::CursorParameters params;
params.planNode = plan;
params.maxDrivers = 2;
testPartitionAndSerialize(plan, expected, params);
}

std::pair<std::unique_ptr<exec::test::TaskCursor>, std::vector<RowVectorPtr>>
std::pair<std::unique_ptr<exec::TaskCursor>, std::vector<RowVectorPtr>>
runShuffleReadTask(
const exec::test::CursorParameters& params,
const exec::CursorParameters& params,
const std::string& shuffleInfo) {
bool noMoreSplits = false;
return readCursor(params, [&](auto* task) {
return exec::test::readCursor(params, [&](auto* task) {
if (noMoreSplits) {
return;
}
Expand Down Expand Up @@ -527,7 +527,7 @@ class UnsafeRowShuffleTest : public exec::test::OperatorTestBase {
.project(dataType->names())
.planNode();

exec::test::CursorParameters params;
exec::CursorParameters params;
params.planNode = plan;
params.destination = partition;

Expand Down Expand Up @@ -713,7 +713,7 @@ class UnsafeRowShuffleTest : public exec::test::OperatorTestBase {

auto queryCtx =
core::QueryCtx::create(executor_.get(), core::QueryConfig(properties));
auto params = exec::test::CursorParameters();
auto params = exec::CursorParameters();
params.planNode = plan;
params.queryCtx = queryCtx;

Expand Down Expand Up @@ -743,12 +743,12 @@ TEST_F(UnsafeRowShuffleTest, operators) {
4, std::string(TestShuffleFactory::kShuffleName), info))
.planNode();

exec::test::CursorParameters params;
exec::CursorParameters params;
params.planNode = plan;
params.maxDrivers = 2;

auto [taskCursor, serializedResults] =
readCursor(params, [](auto /*task*/) {});
exec::test::readCursor(params, [](auto /*task*/) {});
ASSERT_EQ(serializedResults.size(), 0);
TestShuffleWriter::reset();
}
Expand All @@ -769,7 +769,7 @@ DEBUG_ONLY_TEST_F(UnsafeRowShuffleTest, shuffleWriterExceptions) {
VELOX_CHECK(nullFunction());
}));

exec::test::CursorParameters params;
exec::CursorParameters params;
params.planNode =
exec::test::PlanBuilder()
.values({data})
Expand All @@ -779,7 +779,7 @@ DEBUG_ONLY_TEST_F(UnsafeRowShuffleTest, shuffleWriterExceptions) {
.planNode();

VELOX_ASSERT_THROW(
readCursor(params, [](auto /*task*/) {}),
exec::test::readCursor(params, [](auto /*task*/) {}),
"ShuffleWriter::collect failed");

TestShuffleWriter::reset();
Expand All @@ -795,7 +795,7 @@ DEBUG_ONLY_TEST_F(UnsafeRowShuffleTest, shuffleReaderExceptions) {
auto info = testShuffleInfo(4, 1 << 20 /* 1MB */);
TestShuffleWriter::createWriter(info, pool());

exec::test::CursorParameters params;
exec::CursorParameters params;
params.planNode =
exec::test::PlanBuilder()
.values({data})
Expand All @@ -804,7 +804,7 @@ DEBUG_ONLY_TEST_F(UnsafeRowShuffleTest, shuffleReaderExceptions) {
2, std::string(TestShuffleFactory::kShuffleName), info))
.planNode();

ASSERT_NO_THROW(readCursor(params, [](auto /*task*/) {}));
ASSERT_NO_THROW(exec::test::readCursor(params, [](auto /*task*/) {}));

std::function<void(TestShuffleReader*)> injectFailure =
[&](TestShuffleReader* /*reader*/) {
Expand Down

0 comments on commit a88341e

Please sign in to comment.