Skip to content

Commit

Permalink
Add FilterProjectReplayer (facebookincubator#11351)
Browse files Browse the repository at this point in the history
Summary:
Add FilterProjectReplayer. For the plan fragment involving
FilterNode->ProjectNode, users should use the ProjectNode
ID for tracing. This is because the planner will combine these
two operators into a single FilterProject operator. During replay,
the ProjectNode ID will locate the trace data directory.

Part of facebookincubator#9668

Pull Request resolved: facebookincubator#11351

Reviewed By: tanjialiang

Differential Revision: D65320080

Pulled By: xiaoxmeng

fbshipit-source-id: eefa5541f892f08b587c83f78110898e3f041d66
  • Loading branch information
duanmeng authored and facebook-github-bot committed Nov 1, 2024
1 parent 2090d78 commit ad27145
Show file tree
Hide file tree
Showing 10 changed files with 527 additions and 3 deletions.
104 changes: 104 additions & 0 deletions velox/exec/tests/OperatorTraceTest.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -620,4 +620,108 @@ TEST_F(OperatorTraceTest, traceTableWriter) {
ASSERT_EQ(numOutputVectors, testData.numTracedBatches);
}
}

TEST_F(OperatorTraceTest, filterProject) {
std::vector<RowVectorPtr> inputVectors;
constexpr auto numBatch = 5;
inputVectors.reserve(numBatch);
for (auto i = 0; i < numBatch; ++i) {
inputVectors.push_back(vectorFuzzer_.fuzzInputFlatRow(dataType_));
}

struct {
std::string taskRegExpr;
uint64_t maxTracedBytes;
uint8_t numTracedBatches;
bool limitExceeded;

std::string debugString() const {
return fmt::format(
"taskRegExpr: {}, maxTracedBytes: {}, numTracedBatches: {}, limitExceeded {}",
taskRegExpr,
maxTracedBytes,
numTracedBatches,
limitExceeded);
}
} testSettings[]{
{".*", 10UL << 30, numBatch, false},
{".*", 0, numBatch, true},
{"wrong id", 10UL << 30, 0, false},
{"test_cursor \\d+", 10UL << 30, numBatch, false},
{"test_cursor \\d+", 800, 2, true}};

for (const auto& testData : testSettings) {
SCOPED_TRACE(testData.debugString());
const auto outputDir = TempDirectoryPath::create();
core::PlanNodeId projectNodeId;
const auto planNode = PlanBuilder()
.values(inputVectors)
.filter("a % 10 < 9")
.project({"a", "b", "a % 100 + c % 50 AS d"})
.capturePlanNodeId(projectNodeId)
.planNode();
const auto testDir = TempDirectoryPath::create();
const auto traceRoot =
fmt::format("{}/{}", testDir->getPath(), "traceRoot");
std::shared_ptr<Task> task;
if (testData.limitExceeded) {
VELOX_ASSERT_THROW(
AssertQueryBuilder(planNode)
.maxDrivers(1)
.config(core::QueryConfig::kQueryTraceEnabled, true)
.config(core::QueryConfig::kQueryTraceDir, traceRoot)
.config(
core::QueryConfig::kQueryTraceMaxBytes,
testData.maxTracedBytes)
.config(
core::QueryConfig::kQueryTraceTaskRegExp,
testData.taskRegExpr)
.config(core::QueryConfig::kQueryTraceNodeIds, projectNodeId)
.copyResults(pool(), task),
"Query exceeded per-query local trace limit of");
continue;
}
AssertQueryBuilder(planNode)
.maxDrivers(1)
.config(core::QueryConfig::kQueryTraceEnabled, true)
.config(core::QueryConfig::kQueryTraceDir, traceRoot)
.config(core::QueryConfig::kQueryTraceMaxBytes, testData.maxTracedBytes)
.config(core::QueryConfig::kQueryTraceTaskRegExp, testData.taskRegExpr)
.config(core::QueryConfig::kQueryTraceNodeIds, projectNodeId)
.copyResults(pool(), task);

const auto taskTraceDir = getTaskTraceDirectory(traceRoot, *task);
const auto fs = filesystems::getFileSystem(taskTraceDir, nullptr);

if (testData.taskRegExpr == "wrong id") {
ASSERT_FALSE(fs->exists(traceRoot));
continue;
}

// Query metadata file should exist.
const auto traceMetaFilePath = getTaskTraceMetaFilePath(taskTraceDir);
ASSERT_TRUE(fs->exists(traceMetaFilePath));

const auto opTraceDir =
getOpTraceDirectory(taskTraceDir, projectNodeId, 0, 0);

ASSERT_EQ(fs->list(opTraceDir).size(), 2);

const auto summary = OperatorTraceSummaryReader(opTraceDir, pool()).read();
const auto reader =
trace::OperatorTraceInputReader(opTraceDir, dataType_, pool());
RowVectorPtr actual;
size_t numOutputVectors{0};
while (reader.read(actual)) {
const auto& expected = inputVectors[numOutputVectors];
const auto size = actual->size();
ASSERT_EQ(size, expected->size());
for (auto i = 0; i < size; ++i) {
actual->compare(expected.get(), i, i, {.nullsFirst = true});
}
++numOutputVectors;
}
ASSERT_EQ(numOutputVectors, testData.numTracedBatches);
}
}
} // namespace facebook::velox::exec::trace::test
12 changes: 12 additions & 0 deletions velox/exec/tests/utils/RowContainerTestBase.h
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,18 @@ class RowContainerTestBase : public testing::Test,
public velox::test::VectorTestBase {
protected:
void SetUp() override {
if (!isRegisteredVectorSerde()) {
facebook::velox::serializer::presto::PrestoVectorSerde::
registerVectorSerde();
}
if (!isRegisteredVectorSerde()) {
facebook::velox::serializer::presto::PrestoVectorSerde::
registerVectorSerde();
}
if (!isRegisteredVectorSerde()) {
facebook::velox::serializer::presto::PrestoVectorSerde::
registerVectorSerde();
}
if (!isRegisteredVectorSerde()) {
facebook::velox::serializer::presto::PrestoVectorSerde::
registerVectorSerde();
Expand Down
3 changes: 2 additions & 1 deletion velox/tool/trace/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ velox_add_library(
OperatorReplayerBase.cpp
PartitionedOutputReplayer.cpp
TableWriterReplayer.cpp
FilterProjectReplayer.cpp
TraceReplayRunner.cpp)

velox_link_libraries(
Expand All @@ -34,7 +35,7 @@ velox_link_libraries(
glog::glog
gflags::gflags)

add_executable(velox_query_replayer TraceReplayerMain.cpp)
add_executable(velox_query_replayer TraceReplayerMain.cpp TraceReplayRunner.cpp)

target_link_libraries(
velox_query_replayer
Expand Down
76 changes: 76 additions & 0 deletions velox/tool/trace/FilterProjectReplayer.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,76 @@
/*
* Copyright (c) Facebook, Inc. and its affiliates.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

#include "velox/tool/trace/FilterProjectReplayer.h"
#include "velox/exec/TraceUtil.h"
#include "velox/exec/tests/utils/PlanBuilder.h"

using namespace facebook::velox;
using namespace facebook::velox::exec;
using namespace facebook::velox::exec::test;

namespace facebook::velox::tool::trace {
core::PlanNodePtr FilterProjectReplayer::createPlanNode(
const core::PlanNode* node,
const core::PlanNodeId& nodeId,
const core::PlanNodePtr& source) const {
if (node->name() == "Filter") {
const auto* filterNode = dynamic_cast<const core::FilterNode*>(node);
VELOX_CHECK(
!isFilterProject(dynamic_cast<const core::FilterNode*>(node)),
"If the target node is a FilterNode, it must be a standalone FilterNode");

// A standalone FilterNode.
return std::make_shared<core::FilterNode>(
nodeId, filterNode->filter(), source);
}

const auto* projectNode = dynamic_cast<const core::ProjectNode*>(node);

// A standalone ProjectNode.
if (node->sources().empty() || node->sources().front()->name() != "Filter") {
return std::make_shared<core::ProjectNode>(
nodeId, projectNode->names(), projectNode->projections(), source);
}

// A ProjectNode with a FilterNode as its source.
// -- ProjectNode [nodeId]
// -- FilterNode [nodeId - 1]
const auto originalFilterNode =
std::dynamic_pointer_cast<const core::FilterNode>(
node->sources().front());
const auto filterNode = std::make_shared<core::FilterNode>(
nodeId, originalFilterNode->filter(), source);
const auto projectNodeId = planNodeIdGenerator_->next();
return std::make_shared<core::ProjectNode>(
projectNodeId,
projectNode->names(),
projectNode->projections(),
filterNode);
}

bool FilterProjectReplayer::isFilterProject(
const core::PlanNode* filterNode) const {
const auto* projectNode =
dynamic_cast<const core::ProjectNode*>(core::PlanNode::findFirstNode(
planFragment_.get(), [this](const core::PlanNode* node) {
return node->id() == std::to_string(std::stoull(nodeId_) + 1);
}));
return projectNode != nullptr && projectNode->name() == "Project" &&
projectNode->sources().size() == 1 &&
projectNode->sources().front()->id() == nodeId_;
}
} // namespace facebook::velox::tool::trace
60 changes: 60 additions & 0 deletions velox/tool/trace/FilterProjectReplayer.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,60 @@
/*
* Copyright (c) Facebook, Inc. and its affiliates.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

#pragma once

#include "velox/core/PlanNode.h"
#include "velox/tool/trace/OperatorReplayerBase.h"

namespace facebook::velox::tool::trace {
/// The replayer to replay the traced 'FilterProject' operator.
///
/// NOTE: For the plan fragment involving FilterNode->ProjectNode, users must
/// use the ProjectNode ID for tracing. This is because the planner will combine
/// these two operators into a single FilterProject operator. During replay,
/// the ProjectNode ID will be used to locate the trace data directory.
class FilterProjectReplayer : public OperatorReplayerBase {
public:
FilterProjectReplayer(
const std::string& rootDir,
const std::string& queryId,
const std::string& taskId,
const std::string& nodeId,
const int32_t pipelineId,
const std::string& operatorType)
: OperatorReplayerBase(
rootDir,
queryId,
taskId,
nodeId,
pipelineId,
operatorType) {}

private:
// Create either a standalone FilterNode, a standalone ProjectNode, or a
// ProjectNode with a FilterNode as its source.
//
// NOTE: If the target node is a FilterNode, it must be a standalone
// FilterNode, without a ProjectNode as its parent.
core::PlanNodePtr createPlanNode(
const core::PlanNode* node,
const core::PlanNodeId& nodeId,
const core::PlanNodePtr& source) const override;

// Checks whether the FilterNode is a source node of a ProjectNode.
bool isFilterProject(const core::PlanNode* filterNode) const;
};
} // namespace facebook::velox::tool::trace
2 changes: 1 addition & 1 deletion velox/tool/trace/OperatorReplayerBase.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -67,7 +67,7 @@ core::PlanNodePtr OperatorReplayerBase::createPlan() const {
const auto* replayNode = core::PlanNode::findFirstNode(
planFragment_.get(),
[this](const core::PlanNode* node) { return node->id() == nodeId_; });
return exec::test::PlanBuilder()
return exec::test::PlanBuilder(planNodeIdGenerator_)
.traceScan(
nodeTraceDir_,
pipelineId_,
Expand Down
4 changes: 4 additions & 0 deletions velox/tool/trace/OperatorReplayerBase.h
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@

#include "velox/common/file/FileSystems.h"
#include "velox/core/PlanNode.h"
#include "velox/exec/tests/utils/PlanBuilder.h"

namespace facebook::velox::exec {
class Task;
Expand Down Expand Up @@ -62,6 +63,9 @@ class OperatorReplayerBase {
const std::shared_ptr<filesystems::FileSystem> fs_;
const int32_t maxDrivers_;

const std::shared_ptr<core::PlanNodeIdGenerator> planNodeIdGenerator_{
std::make_shared<core::PlanNodeIdGenerator>()};

std::unordered_map<std::string, std::string> queryConfigs_;
std::unordered_map<std::string, std::unordered_map<std::string, std::string>>
connectorConfigs_;
Expand Down
9 changes: 9 additions & 0 deletions velox/tool/trace/TraceReplayRunner.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@
#include "velox/functions/prestosql/registration/RegistrationFunctions.h"
#include "velox/parse/TypeResolver.h"
#include "velox/tool/trace/AggregationReplayer.h"
#include "velox/tool/trace/FilterProjectReplayer.h"
#include "velox/tool/trace/OperatorReplayerBase.h"
#include "velox/tool/trace/PartitionedOutputReplayer.h"
#include "velox/tool/trace/TableWriterReplayer.h"
Expand Down Expand Up @@ -109,6 +110,14 @@ std::unique_ptr<tool::trace::OperatorReplayerBase> createReplayer() {
FLAGS_node_id,
FLAGS_pipeline_id,
FLAGS_operator_type);
} else if (FLAGS_operator_type == "FilterProject") {
replayer = std::make_unique<tool::trace::FilterProjectReplayer>(
FLAGS_root_dir,
FLAGS_query_id,
FLAGS_task_id,
FLAGS_node_id,
FLAGS_pipeline_id,
FLAGS_operator_type);
} else {
VELOX_UNSUPPORTED("Unsupported operator type: {}", FLAGS_operator_type);
}
Expand Down
2 changes: 1 addition & 1 deletion velox/tool/trace/tests/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@
add_executable(
velox_tool_trace_test
AggregationReplayerTest.cpp PartitionedOutputReplayerTest.cpp
TableWriterReplayerTest.cpp)
TableWriterReplayerTest.cpp FilterProjectReplayerTest.cpp)

add_test(
NAME velox_tool_trace_test
Expand Down
Loading

0 comments on commit ad27145

Please sign in to comment.