Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix: Fix and improve table scan tracing and make it work e2e #11867

Closed
wants to merge 1 commit into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 8 additions & 0 deletions velox/exec/OperatorTraceReader.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -80,8 +80,16 @@ OperatorTraceSummary OperatorTraceSummaryReader::read() const {
folly::dynamic summaryObj = folly::parseJson(summaryStr);
OperatorTraceSummary summary;
summary.opType = summaryObj[OperatorTraceTraits::kOpTypeKey].asString();
if (summary.opType == "TableScan") {
summary.numSplits = summaryObj[OperatorTraceTraits::kNumSplitsKey].asInt();
}
summary.peakMemory = summaryObj[OperatorTraceTraits::kPeakMemoryKey].asInt();
summary.inputRows = summaryObj[OperatorTraceTraits::kInputRowsKey].asInt();
summary.inputBytes = summaryObj[OperatorTraceTraits::kInputBytesKey].asInt();
summary.rawInputRows =
summaryObj[OperatorTraceTraits::kRawInputRowsKey].asInt();
summary.rawInputBytes =
summaryObj[OperatorTraceTraits::kRawInputBytesKey].asInt();
return summary;
}

Expand Down
33 changes: 17 additions & 16 deletions velox/exec/OperatorTraceWriter.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,21 @@
#include "velox/exec/TraceUtil.h"

namespace facebook::velox::exec::trace {
namespace {
/// Populates 'obj' with the trace summary fields of 'op': the operator type,
/// peak total memory reservation, and the (raw) input row and byte counts
/// taken from the operator's current stats. The split count is recorded only
/// when the operator type is "TableScan".
void recordOperatorSummary(Operator* op, folly::dynamic& obj) {
  const auto& operatorType = op->operatorType();
  obj[OperatorTraceTraits::kOpTypeKey] = operatorType;

  // Snapshot the stats without clearing them.
  const auto opStats = op->stats(/*clear=*/false);

  const bool isTableScan = (operatorType == "TableScan");
  if (isTableScan) {
    obj[OperatorTraceTraits::kNumSplitsKey] = opStats.numSplits;
  }

  obj[OperatorTraceTraits::kPeakMemoryKey] =
      opStats.memoryStats.peakTotalMemoryReservation;
  obj[OperatorTraceTraits::kInputRowsKey] = opStats.inputPositions;
  obj[OperatorTraceTraits::kInputBytesKey] = opStats.inputBytes;
  obj[OperatorTraceTraits::kRawInputRowsKey] = opStats.rawInputPositions;
  obj[OperatorTraceTraits::kRawInputBytesKey] = opStats.rawInputBytes;
}
} // namespace

OperatorTraceInputWriter::OperatorTraceInputWriter(
Operator* traceOp,
Expand All @@ -47,9 +62,6 @@ void OperatorTraceInputWriter::write(const RowVectorPtr& rows) {
if (FOLLY_UNLIKELY(finished_)) {
return;
}
if (FOLLY_UNLIKELY(dataType_ == nullptr)) {
dataType_ = rows->type();
}

if (batch_ == nullptr) {
batch_ = std::make_unique<VectorStreamGroup>(pool_, serde_);
Expand Down Expand Up @@ -88,14 +100,7 @@ void OperatorTraceInputWriter::writeSummary() const {
const auto summaryFilePath = getOpTraceSummaryFilePath(traceDir_);
const auto file = fs_->openFileForWrite(summaryFilePath);
folly::dynamic obj = folly::dynamic::object;
if (dataType_ != nullptr) {
obj[TraceTraits::kDataTypeKey] = dataType_->serialize();
}
obj[OperatorTraceTraits::kOpTypeKey] = traceOp_->operatorType();
const auto stats = traceOp_->stats(/*clear=*/false);
obj[OperatorTraceTraits::kPeakMemoryKey] =
stats.memoryStats.peakTotalMemoryReservation;
obj[OperatorTraceTraits::kInputRowsKey] = stats.inputPositions;
recordOperatorSummary(traceOp_, obj);
file->append(folly::toJson(obj));
file->close();
}
Expand Down Expand Up @@ -151,11 +156,7 @@ void OperatorTraceSplitWriter::writeSummary() const {
const auto summaryFilePath = getOpTraceSummaryFilePath(traceDir_);
const auto file = fs_->openFileForWrite(summaryFilePath);
folly::dynamic obj = folly::dynamic::object;
obj[OperatorTraceTraits::kOpTypeKey] = traceOp_->operatorType();
const auto stats = traceOp_->stats(/*clear=*/false);
obj[OperatorTraceTraits::kPeakMemoryKey] =
stats.memoryStats.peakTotalMemoryReservation;
obj[OperatorTraceTraits::kNumSplits] = stats.numSplits;
recordOperatorSummary(traceOp_, obj);
file->append(folly::toJson(obj));
file->close();
}
Expand Down
1 change: 0 additions & 1 deletion velox/exec/OperatorTraceWriter.h
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,6 @@ class OperatorTraceInputWriter {
const UpdateAndCheckTraceLimitCB updateAndCheckTraceLimitCB_;

std::unique_ptr<WriteFile> traceFile_;
TypePtr dataType_;
std::unique_ptr<VectorStreamGroup> batch_;
bool limitExceeded_{false};
bool finished_{false};
Expand Down
28 changes: 23 additions & 5 deletions velox/exec/Trace.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -18,15 +18,33 @@

#include <fmt/core.h>

#include "velox/common/base/Exceptions.h"
#include "velox/common/base/SuccinctPrinter.h"

namespace facebook::velox::exec::trace {

/// Renders the summary as a single comma-separated line of "name value"
/// pairs. Row counts are printed as plain integers and byte counts are
/// formatted with succinctBytes().
///
/// 'numSplits' is included (right after 'opType') only when it is set. The
/// presence of 'numSplits' must agree with the operator type: it must be set
/// if and only if 'opType' is "TableScan"; otherwise the corresponding
/// VELOX_CHECK fails.
std::string OperatorTraceSummary::toString() const {
  if (numSplits.has_value()) {
    VELOX_CHECK_EQ(opType, "TableScan");
    return fmt::format(
        "opType {}, numSplits {}, inputRows {}, inputBytes {}, rawInputRows {}, rawInputBytes {}, peakMemory {}",
        opType,
        numSplits.value(),
        inputRows,
        succinctBytes(inputBytes),
        rawInputRows,
        succinctBytes(rawInputBytes),
        succinctBytes(peakMemory));
  }

  // Only a table scan summary is allowed to carry a split count.
  VELOX_CHECK_NE(opType, "TableScan");
  return fmt::format(
      "opType {}, inputRows {}, inputBytes {}, rawInputRows {}, rawInputBytes {}, peakMemory {}",
      opType,
      inputRows,
      succinctBytes(inputBytes),
      rawInputRows,
      succinctBytes(rawInputBytes),
      succinctBytes(peakMemory));
}
} // namespace facebook::velox::exec::trace
14 changes: 12 additions & 2 deletions velox/exec/Trace.h
Original file line number Diff line number Diff line change
Expand Up @@ -17,14 +17,14 @@
#pragma once

#include <cstdint>
#include <optional>
#include <string>

namespace facebook::velox::exec::trace {
/// Defines the shared constants used by query trace implementation.
struct TraceTraits {
static inline const std::string kPlanNodeKey = "planNode";
static inline const std::string kQueryConfigKey = "queryConfig";
static inline const std::string kDataTypeKey = "rowType";
static inline const std::string kConnectorPropertiesKey =
"connectorProperties";

Expand All @@ -40,13 +40,23 @@ struct OperatorTraceTraits {
static inline const std::string kOpTypeKey = "opType";
static inline const std::string kPeakMemoryKey = "peakMemory";
static inline const std::string kInputRowsKey = "inputRows";
static inline const std::string kNumSplits = "numSplits";
static inline const std::string kInputBytesKey = "inputBytes";
static inline const std::string kRawInputRowsKey = "rawInputRows";
static inline const std::string kRawInputBytesKey = "rawInputBytes";
static inline const std::string kNumSplitsKey = "numSplits";
};

/// Contains the summary of an operator trace.
struct OperatorTraceSummary {
std::string opType;
/// The number of splits processed by a table scan operator, nullopt for the
/// other operator types.
std::optional<uint32_t> numSplits{std::nullopt};

uint64_t inputRows{0};
uint64_t inputBytes{0};
uint64_t rawInputRows{0};
uint64_t rawInputBytes{0};
uint64_t peakMemory{0};

std::string toString() const;
Expand Down
4 changes: 4 additions & 0 deletions velox/exec/tests/OperatorTraceTest.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -233,6 +233,10 @@ TEST_F(OperatorTraceTest, traceData) {
ASSERT_EQ(summary.opType, "Aggregation");
ASSERT_GT(summary.peakMemory, 0);
ASSERT_EQ(summary.inputRows, testData.numTracedBatches * 16);
ASSERT_GT(summary.inputBytes, 0);
ASSERT_EQ(summary.rawInputRows, 0);
ASSERT_EQ(summary.rawInputBytes, 0);
ASSERT_FALSE(summary.numSplits.has_value());

const auto reader = OperatorTraceInputReader(opTraceDir, dataType_, pool());
RowVectorPtr actual;
Expand Down
10 changes: 9 additions & 1 deletion velox/exec/tests/TraceUtilTest.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -89,7 +89,15 @@ TEST_F(TraceUtilTest, OperatorTraceSummary) {
summary.inputRows = 100;
summary.peakMemory = 200;
ASSERT_EQ(
summary.toString(), "opType summary, inputRows 100, peakMemory 200B");
summary.toString(),
"opType summary, inputRows 100, inputBytes 0B, rawInputRows 0, rawInputBytes 0B, peakMemory 200B");
summary.numSplits = 10;
summary.rawInputBytes = 222;
VELOX_ASSERT_THROW(summary.toString(), "summary vs. TableScan");
summary.opType = "TableScan";
ASSERT_EQ(
summary.toString(),
"opType TableScan, numSplits 10, inputRows 100, inputBytes 0B, rawInputRows 0, rawInputBytes 222B, peakMemory 200B");
}

TEST_F(TraceUtilTest, traceDirectoryLayoutUtilities) {
Expand Down
24 changes: 14 additions & 10 deletions velox/tool/trace/TraceReplayRunner.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -223,7 +223,9 @@ void TraceReplayRunner::init() {
VELOX_USER_CHECK(!FLAGS_query_id.empty(), "--query_id must be provided");
VELOX_USER_CHECK(!FLAGS_node_id.empty(), "--node_id must be provided");

memory::initializeMemoryManager({});
if (memory::memoryManager() == nullptr) {
memory::initializeMemoryManager({});
}
filesystems::registerLocalFileSystem();
filesystems::registerS3FileSystem();
filesystems::registerHdfsFileSystem();
Expand Down Expand Up @@ -265,15 +267,17 @@ void TraceReplayRunner::init() {
aggregate::prestosql::registerAllAggregateFunctions();
parse::registerTypeResolver();

connector::registerConnectorFactory(
std::make_shared<connector::hive::HiveConnectorFactory>());
const auto hiveConnector =
connector::getConnectorFactory("hive")->newConnector(
"test-hive",
std::make_shared<config::ConfigBase>(
std::unordered_map<std::string, std::string>()),
ioExecutor_.get());
connector::registerConnector(hiveConnector);
if (!facebook::velox::connector::hasConnectorFactory("hive")) {
connector::registerConnectorFactory(
std::make_shared<connector::hive::HiveConnectorFactory>());
const auto hiveConnector =
connector::getConnectorFactory("hive")->newConnector(
"test-hive",
std::make_shared<config::ConfigBase>(
std::unordered_map<std::string, std::string>()),
ioExecutor_.get());
connector::registerConnector(hiveConnector);
}

fs_ = filesystems::getFileSystem(FLAGS_root_dir, nullptr);
}
Expand Down
2 changes: 1 addition & 1 deletion velox/tool/trace/TraceReplayRunner.h
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@ class TraceReplayRunner {
/// Runs the trace replay with a set of gflags passed from replayer tool.
virtual void run();

private:
protected:
std::unique_ptr<tool::trace::OperatorReplayerBase> createReplayer() const;

const std::unique_ptr<folly::IOThreadPoolExecutor> ioExecutor_;
Expand Down
67 changes: 66 additions & 1 deletion velox/tool/trace/tests/TableScanReplayerTest.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -25,15 +25,16 @@
#include "velox/common/base/Fs.h"
#include "velox/common/file/FileSystems.h"
#include "velox/common/hyperloglog/SparseHll.h"
#include "velox/exec/OperatorTraceReader.h"
#include "velox/exec/PartitionFunction.h"
#include "velox/exec/TableWriter.h"
#include "velox/exec/TraceUtil.h"
#include "velox/exec/tests/utils/AssertQueryBuilder.h"
#include "velox/exec/tests/utils/HiveConnectorTestBase.h"
#include "velox/exec/tests/utils/PlanBuilder.h"
#include "velox/exec/tests/utils/TempDirectoryPath.h"
#include "velox/serializers/PrestoSerializer.h"
#include "velox/tool/trace/TableScanReplayer.h"
#include "velox/tool/trace/TraceReplayRunner.h"
#include "velox/vector/tests/utils/VectorTestBase.h"

using namespace facebook::velox;
Expand Down Expand Up @@ -100,6 +101,70 @@ class TableScanReplayerTest : public HiveConnectorTestBase {
core::PlanNodeId traceNodeId_;
};

// Traces a single-driver table scan over several identical split files,
// verifies the recorded operator trace summary, and then drives
// TraceReplayRunner twice over the produced trace: once with --summary set and
// once with it cleared and an explicit task id.
TEST_F(TableScanReplayerTest, runner) {
  const auto vectors = makeVectors(10, 100);
  const auto testDir = TempDirectoryPath::create();
  const auto traceRoot = fmt::format("{}/{}", testDir->getPath(), "traceRoot");
  const int numSplits{5};
  // Every split file holds the same set of vectors.
  std::vector<std::shared_ptr<TempFilePath>> splitFiles;
  for (int i = 0; i < numSplits; ++i) {
    auto filePath = TempFilePath::create();
    writeToFile(filePath->getPath(), vectors);
    splitFiles.push_back(std::move(filePath));
  }

  const auto plan = tableScanNode();
  std::shared_ptr<Task> task;
  auto traceResult =
      AssertQueryBuilder(plan)
          .maxDrivers(1)
          .config(core::QueryConfig::kQueryTraceEnabled, true)
          .config(core::QueryConfig::kQueryTraceDir, traceRoot)
          .config(core::QueryConfig::kQueryTraceMaxBytes, 100UL << 30)
          .config(core::QueryConfig::kQueryTraceTaskRegExp, ".*")
          .config(core::QueryConfig::kQueryTraceNodeIds, traceNodeId_)
          .splits(makeHiveConnectorSplits(splitFiles))
          .copyResults(pool(), task);

  const auto taskTraceDir =
      exec::trace::getTaskTraceDirectory(traceRoot, *task);
  const auto opTraceDir = exec::trace::getOpTraceDirectory(
      taskTraceDir,
      traceNodeId_,
      /*pipelineId=*/0,
      /*driverId=*/0);
  const auto summary =
      exec::trace::OperatorTraceSummaryReader(opTraceDir, pool()).read();
  ASSERT_EQ(summary.opType, "TableScan");
  ASSERT_GT(summary.peakMemory, 0);
  const int expectedTotalRows = numSplits * 10 * 100;
  ASSERT_EQ(summary.inputRows, expectedTotalRows);
  ASSERT_GT(summary.inputBytes, 0);
  ASSERT_EQ(summary.rawInputRows, expectedTotalRows);
  ASSERT_GT(summary.rawInputBytes, 0);
  // Check presence first so that a missing split count is reported as a test
  // failure rather than as a std::bad_optional_access thrown by value().
  ASSERT_TRUE(summary.numSplits.has_value());
  ASSERT_EQ(summary.numSplits.value(), numSplits);

  // NOTE(review): the flags set below are not restored when the test ends, so
  // they stay visible to tests that run afterwards in the same process.
  FLAGS_root_dir = traceRoot;
  FLAGS_query_id = task->queryCtx()->queryId();
  FLAGS_node_id = traceNodeId_;
  FLAGS_summary = true;
  {
    TraceReplayRunner runner;
    runner.init();
    runner.run();
  }

  FLAGS_task_id = task->taskId();
  FLAGS_driver_ids = "";
  FLAGS_summary = false;
  {
    TraceReplayRunner runner;
    runner.init();
    runner.run();
  }
}

TEST_F(TableScanReplayerTest, basic) {
const auto vectors = makeVectors(10, 100);
const auto testDir = TempDirectoryPath::create();
Expand Down
Loading