Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: Add query trace TaskRunner #11927

Closed
wants to merge 1 commit into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions velox/docs/develop/debugging/tracing.rst
Original file line number Diff line number Diff line change
Expand Up @@ -366,3 +366,7 @@ Here is a full list of supported command line arguments.
* ``--shuffle_serialization_format``: Specify the shuffle serialization format.
* ``--table_writer_output_dir``: Specify the output directory of TableWriter.
* ``--hive_connector_executor_hw_multiplier``: Hardware multiplier for hive connector.
* ``--driver_cpu_executor_hw_multiplier``: Hardware multiplier for driver cpu executor.
* ``--memory_arbitrator_type``: Specify the memory arbitrator type.
* ``--query_memory_capacity_gb``: Specify the query memory capacity limit in GB. If it is zero, then there is no limit.
* ``--copy_results``: If true, copy the replaying result.
3 changes: 2 additions & 1 deletion velox/tool/trace/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,8 @@ add_library(
PartitionedOutputReplayer.cpp
TableScanReplayer.cpp
TableWriterReplayer.cpp
TraceReplayRunner.cpp)
TraceReplayRunner.cpp
TraceReplayTaskRunner.cpp)

target_link_libraries(
velox_query_trace_replayer_base
Expand Down
49 changes: 25 additions & 24 deletions velox/tool/trace/OperatorReplayerBase.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
#include "velox/exec/tests/utils/PlanBuilder.h"
#include "velox/exec/tests/utils/TempDirectoryPath.h"
#include "velox/tool/trace/OperatorReplayerBase.h"
#include "velox/tool/trace/TraceReplayTaskRunner.h"

using namespace facebook::velox;

Expand Down Expand Up @@ -72,36 +73,18 @@ OperatorReplayerBase::OperatorReplayerBase(
queryConfigs_[core::QueryConfig::kQueryTraceEnabled] = "false";
}

RowVectorPtr OperatorReplayerBase::run() {
std::shared_ptr<exec::Task> task;
const auto restoredPlanNode = createPlan();
auto queryPool = memory::memoryManager()->addRootPool(
"OperatorReplayerBase", queryCapacity_);
std::unordered_map<std::string, std::shared_ptr<config::ConfigBase>>
connectorConfigs;
for (auto& [connectorId, configs] : connectorConfigs_) {
connectorConfigs.emplace(
connectorId, std::make_shared<config::ConfigBase>(std::move(configs)));
}
auto queryCtx = core::QueryCtx::create(
executor_,
core::QueryConfig{queryConfigs_},
std::move(connectorConfigs),
nullptr,
std::move(queryPool),
executor_);

RowVectorPtr OperatorReplayerBase::run(bool copyResults) {
auto queryCtx = createQueryCtx();
std::shared_ptr<exec::test::TempDirectoryPath> spillDirectory;
if (queryCtx->queryConfig().spillEnabled()) {
spillDirectory = exec::test::TempDirectoryPath::create();
}

const auto result =
exec::test::AssertQueryBuilder(restoredPlanNode)
.maxDrivers(driverIds_.size())
.queryCtx(std::move(queryCtx))
TraceReplayTaskRunner traceTaskRunner(createPlan(), std::move(queryCtx));
auto [task, result] =
traceTaskRunner.maxDrivers(driverIds_.size())
.spillDirectory(spillDirectory ? spillDirectory->getPath() : "")
.copyResults(memory::MemoryManager::getInstance()->tracePool(), task);
.run(copyResults);
printStats(task);
return result;
}
Expand Down Expand Up @@ -129,6 +112,24 @@ core::PlanNodePtr OperatorReplayerBase::createPlan() {
.planNode();
}

/// Creates the query context used by a replay run. A new root memory pool
/// named "<operatorType>_replayer" is added with 'queryCapacity_' as its
/// capacity, and each entry of 'connectorConfigs_' is wrapped into a
/// config::ConfigBase keyed by its connector id.
///
/// 'connectorConfigs_' is left untouched: every ConfigBase receives its own
/// copy of the config map, so this method can be called more than once on the
/// same replayer and still see the full connector configuration.
std::shared_ptr<core::QueryCtx> OperatorReplayerBase::createQueryCtx() {
  auto queryPool = memory::memoryManager()->addRootPool(
      fmt::format("{}_replayer", operatorType_), queryCapacity_);
  std::unordered_map<std::string, std::shared_ptr<config::ConfigBase>>
      connectorConfigs;
  for (const auto& [connectorId, configs] : connectorConfigs_) {
    // Moving straight out of the member would leave it empty for any later
    // call, so move from a local copy instead.
    auto configsCopy = configs;
    connectorConfigs.emplace(
        connectorId,
        std::make_shared<config::ConfigBase>(std::move(configsCopy)));
  }
  return core::QueryCtx::create(
      executor_,
      core::QueryConfig{queryConfigs_},
      std::move(connectorConfigs),
      nullptr,
      std::move(queryPool),
      executor_);
}

std::function<core::PlanNodePtr(std::string, core::PlanNodePtr)>
OperatorReplayerBase::replayNodeFactory(const core::PlanNode* node) const {
return [=](const core::PlanNodeId& nodeId,
Expand Down
5 changes: 4 additions & 1 deletion velox/tool/trace/OperatorReplayerBase.h
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@

#include "velox/common/file/FileSystems.h"
#include "velox/core/PlanNode.h"
#include "velox/core/QueryCtx.h"
#include "velox/parse/PlanNodeIdGenerator.h"

namespace facebook::velox::exec {
Expand All @@ -44,7 +45,7 @@ class OperatorReplayerBase {
OperatorReplayerBase& operator=(OperatorReplayerBase&& other) noexcept =
delete;

virtual RowVectorPtr run();
virtual RowVectorPtr run(bool copyResults = true);

protected:
virtual core::PlanNodePtr createPlanNode(
Expand All @@ -54,6 +55,8 @@ class OperatorReplayerBase {

core::PlanNodePtr createPlan();

std::shared_ptr<core::QueryCtx> createQueryCtx();

const std::string queryId_;
const std::string taskId_;
const std::string nodeId_;
Expand Down
2 changes: 1 addition & 1 deletion velox/tool/trace/PartitionedOutputReplayer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -138,7 +138,7 @@ PartitionedOutputReplayer::PartitionedOutputReplayer(
std::make_shared<folly::NamedThreadFactory>("Consumer"));
}

RowVectorPtr PartitionedOutputReplayer::run() {
RowVectorPtr PartitionedOutputReplayer::run(bool /*unused*/) {
const auto task = Task::create(
"local://partitioned-output-replayer",
core::PlanFragment{createPlan()},
Expand Down
2 changes: 1 addition & 1 deletion velox/tool/trace/PartitionedOutputReplayer.h
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,7 @@ class PartitionedOutputReplayer final : public OperatorReplayerBase {
folly::Executor* executor,
const ConsumerCallBack& consumerCb = [](auto partition, auto page) {});

RowVectorPtr run() override;
/// Replays the partitioned output operator. The flag is ignored by this
/// replayer; it carries the same default as OperatorReplayerBase::run so that
/// existing callers can keep invoking run() without an argument.
RowVectorPtr run(bool /*unused*/ = true) override;

private:
core::PlanNodePtr createPlanNode(
Expand Down
17 changes: 7 additions & 10 deletions velox/tool/trace/TableScanReplayer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -15,28 +15,25 @@
*/

#include "velox/tool/trace/TableScanReplayer.h"

#include "velox/connectors/hive/HiveConnectorSplit.h"
#include "velox/exec/OperatorTraceReader.h"
#include "velox/exec/TraceUtil.h"
#include "velox/exec/tests/utils/AssertQueryBuilder.h"
#include "velox/exec/tests/utils/PlanBuilder.h"
#include "velox/tool/trace/TraceReplayTaskRunner.h"

using namespace facebook::velox;
using namespace facebook::velox::exec;
using namespace facebook::velox::exec::test;

namespace facebook::velox::tool::trace {

RowVectorPtr TableScanReplayer::run() {
std::shared_ptr<exec::Task> task;
const auto plan = createPlan();
const auto result =
exec::test::AssertQueryBuilder(plan)
.maxDrivers(driverIds_.size())
.configs(queryConfigs_)
.connectorSessionProperties(connectorConfigs_)
.splits(getSplits())
.copyResults(memory::MemoryManager::getInstance()->tracePool(), task);
RowVectorPtr TableScanReplayer::run(bool copyResults) {
TraceReplayTaskRunner traceTaskRunner(createPlan(), createQueryCtx());
auto [task, result] = traceTaskRunner.maxDrivers(driverIds_.size())
.splits(replayPlanNodeId_, getSplits())
.run(copyResults);
printStats(task);
return result;
}
Expand Down
2 changes: 1 addition & 1 deletion velox/tool/trace/TableScanReplayer.h
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@ class TableScanReplayer final : public OperatorReplayerBase {
queryCapacity,
executor) {}

RowVectorPtr run() override;
RowVectorPtr run(bool copyResults = true) override;

private:
core::PlanNodePtr createPlanNode(
Expand Down
3 changes: 2 additions & 1 deletion velox/tool/trace/TraceReplayRunner.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -95,6 +95,7 @@ DEFINE_uint64(
query_memory_capacity_gb,
0,
"Specify the query memory capacity limit in GB. If it is zero, then there is no limit.");
DEFINE_bool(copy_results, false, "Copy the replaying results.");

namespace facebook::velox::tool::trace {
namespace {
Expand Down Expand Up @@ -395,6 +396,6 @@ void TraceReplayRunner::run() {
return;
}
VELOX_USER_CHECK(!FLAGS_task_id.empty(), "--task_id must be provided");
createReplayer()->run();
createReplayer()->run(FLAGS_copy_results);
}
} // namespace facebook::velox::tool::trace
2 changes: 2 additions & 0 deletions velox/tool/trace/TraceReplayRunner.h
Original file line number Diff line number Diff line change
Expand Up @@ -31,10 +31,12 @@ DECLARE_string(node_id);
DECLARE_int32(driver_id);
DECLARE_string(driver_ids);
DECLARE_string(table_writer_output_dir);
DECLARE_double(hive_connector_executor_hw_multiplier);
DECLARE_int32(shuffle_serialization_format);
DECLARE_uint64(query_memory_capacity_gb);
DECLARE_double(driver_cpu_executor_hw_multiplier);
DECLARE_string(memory_arbitrator_type);
DECLARE_bool(copy_results);

namespace facebook::velox::tool::trace {

Expand Down
84 changes: 84 additions & 0 deletions velox/tool/trace/TraceReplayTaskRunner.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,84 @@
/*
* Copyright (c) Facebook, Inc. and its affiliates.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

#include "velox/tool/trace/TraceReplayTaskRunner.h"

namespace facebook::velox::tool::trace {

/// Creates a task cursor from the configured parameters, feeds the registered
/// splits to its task and drains the cursor. Returns the task together with a
/// single vector holding a copy of all output rows if 'copyResults' is true,
/// or together with null otherwise.
std::pair<std::shared_ptr<exec::Task>, RowVectorPtr> TraceReplayTaskRunner::run(
    bool copyResults) {
  auto cursor = exec::test::TaskCursor::create(cursorParams_);
  auto* task = cursor->task().get();
  addSplits(task);

  // Output batches are retained only when the caller asked for a copy, so a
  // replay without result copy does not keep every batch alive until the end.
  std::vector<RowVectorPtr> results;
  while (cursor->moveNext()) {
    if (copyResults) {
      results.push_back(cursor->current());
    }
  }

  // Block until the task's completion future is fulfilled before handing the
  // task back to the caller.
  task->taskCompletionFuture().wait();

  if (!copyResults) {
    return {cursor->task(), nullptr};
  }
  return {cursor->task(), copy(results)};
}

std::shared_ptr<RowVector> TraceReplayTaskRunner::copy(
const std::vector<RowVectorPtr>& results) {
auto totalRows = 0;
for (const auto& result : results) {
totalRows += result->size();
}
auto copyResult = BaseVector::create<RowVector>(
results[0]->type(), totalRows, memory::traceMemoryPool());
auto resultRowOffset = 0;
for (const auto& result : results) {
copyResult->copy(result.get(), resultRowOffset, 0, result->size());
resultRowOffset += result->size();
}
return copyResult;
}

/// Stores 'maxDrivers' in the cursor parameters that run() creates the task
/// cursor from. Returns this runner so that setter calls can be chained.
TraceReplayTaskRunner& TraceReplayTaskRunner::maxDrivers(int32_t maxDrivers) {
cursorParams_.maxDrivers = maxDrivers;
return *this;
}

/// Stores 'dir' as the spill directory in the cursor parameters that run()
/// creates the task cursor from. Returns this runner so that setter calls can
/// be chained.
TraceReplayTaskRunner& TraceReplayTaskRunner::spillDirectory(
const std::string& dir) {
cursorParams_.spillDirectory = dir;
return *this;
}

/// Registers 'splits' for plan node 'planNodeId', replacing any splits that
/// were registered for the same node before. Returns this runner so that
/// setter calls can be chained.
TraceReplayTaskRunner& TraceReplayTaskRunner::splits(
    const core::PlanNodeId& planNodeId,
    std::vector<exec::Split> splits) {
  splits_.insert_or_assign(planNodeId, std::move(splits));
  return *this;
}

/// Hands every registered split over to 'task', one plan node at a time, and
/// calls noMoreSplits() for each node once its splits have been added.
void TraceReplayTaskRunner::addSplits(exec::Task* task) {
  for (auto& entry : splits_) {
    const auto& planNodeId = entry.first;
    auto& pendingSplits = entry.second;
    for (auto& pending : pendingSplits) {
      // Each split is moved into the task.
      task->addSplit(planNodeId, std::move(pending));
    }
    task->noMoreSplits(planNodeId);
  }
}

} // namespace facebook::velox::tool::trace
60 changes: 60 additions & 0 deletions velox/tool/trace/TraceReplayTaskRunner.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,60 @@
/*
* Copyright (c) Facebook, Inc. and its affiliates.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

#pragma once

#include <folly/executors/IOThreadPoolExecutor.h>
#include "velox/exec/Cursor.h"

namespace facebook::velox::tool::trace {

/// Builder-style helper that runs a replay plan through a task cursor.
/// Configure it with the chained setters (maxDrivers, spillDirectory, splits)
/// and then call run().
class TraceReplayTaskRunner {
 public:
  /// 'plan' is the plan to replay and 'queryCtx' the query context to run it
  /// with; both are stored in the cursor parameters used by run().
  explicit TraceReplayTaskRunner(
      core::PlanNodePtr plan,
      std::shared_ptr<core::QueryCtx> queryCtx) {
    cursorParams_.planNode = std::move(plan);
    cursorParams_.queryCtx = std::move(queryCtx);
  }

  /// Run the replaying task. Return the task together with the copied results
  /// if 'copyResults' is true, or together with null otherwise.
  std::pair<std::shared_ptr<exec::Task>, RowVectorPtr> run(
      bool copyResults = false);

  /// Max number of drivers. Default is 1.
  TraceReplayTaskRunner& maxDrivers(int32_t maxDrivers);

  /// Spilling directory, if not empty, then the task's spilling directory would
  /// be built from it.
  TraceReplayTaskRunner& spillDirectory(const std::string& dir);

  /// Splits of this task, registered per plan node. Splits set earlier for the
  /// same 'planNodeId' are replaced.
  TraceReplayTaskRunner& splits(
      const core::PlanNodeId& planNodeId,
      std::vector<exec::Split> splits);

 private:
  // Adds all registered splits to 'task' and signals no-more-splits for each
  // plan node that has splits registered.
  void addSplits(exec::Task* task);

  // Concatenates 'results' into a single vector.
  static std::shared_ptr<RowVector> copy(
      const std::vector<RowVectorPtr>& results);

  // Parameters the task cursor is created from in run().
  exec::test::CursorParameters cursorParams_;
  // Splits to add to the task, keyed by plan node id.
  std::unordered_map<core::PlanNodeId, std::vector<exec::Split>> splits_;
};
} // namespace facebook::velox::tool::trace
Loading
Loading