Skip to content

Commit

Permalink
fix: Fix and improve table scan tracing and make to e2e work (faceboo…
Browse files Browse the repository at this point in the history
…kincubator#11867)

Summary:
Pull Request resolved: facebookincubator#11867

Make table scan tracing work end-to-end (e2e) in Meta and fix issues found in the existing table scan tracing,
plus improvements such as adding more stats to help analysis.
This PR also adds e2e testing for the trace replay runner.

Reviewed By: gggrace14

Differential Revision: D67258161

fbshipit-source-id: e8695d59d89d3a4dd0ed34950bb9c135865b2620
  • Loading branch information
xiaoxmeng authored and facebook-github-bot committed Dec 17, 2024
1 parent 9b77bd5 commit f1622ab
Show file tree
Hide file tree
Showing 10 changed files with 154 additions and 37 deletions.
8 changes: 8 additions & 0 deletions velox/exec/OperatorTraceReader.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -80,8 +80,16 @@ OperatorTraceSummary OperatorTraceSummaryReader::read() const {
folly::dynamic summaryObj = folly::parseJson(summaryStr);
OperatorTraceSummary summary;
summary.opType = summaryObj[OperatorTraceTraits::kOpTypeKey].asString();
if (summary.opType == "TableScan") {
summary.numSplits = summaryObj[OperatorTraceTraits::kNumSplitsKey].asInt();
}
summary.peakMemory = summaryObj[OperatorTraceTraits::kPeakMemoryKey].asInt();
summary.inputRows = summaryObj[OperatorTraceTraits::kInputRowsKey].asInt();
summary.inputBytes = summaryObj[OperatorTraceTraits::kInputBytesKey].asInt();
summary.rawInputRows =
summaryObj[OperatorTraceTraits::kRawInputRowsKey].asInt();
summary.rawInputBytes =
summaryObj[OperatorTraceTraits::kRawInputBytesKey].asInt();
return summary;
}

Expand Down
33 changes: 17 additions & 16 deletions velox/exec/OperatorTraceWriter.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,21 @@
#include "velox/exec/TraceUtil.h"

namespace facebook::velox::exec::trace {
namespace {
/// Populates 'obj' with the trace summary fields of 'op': the operator type,
/// peak memory reservation, and the (raw) input row and byte counts taken from
/// a non-clearing snapshot of the operator stats. The split count is recorded
/// only when the operator type is "TableScan".
void recordOperatorSummary(Operator* op, folly::dynamic& obj) {
  const auto& opType = op->operatorType();
  obj[OperatorTraceTraits::kOpTypeKey] = opType;

  // Take the snapshot without clearing so that tracing does not disturb the
  // stats seen by other consumers of the operator.
  const auto opStats = op->stats(/*clear=*/false);
  const bool isTableScan = (opType == "TableScan");
  if (isTableScan) {
    obj[OperatorTraceTraits::kNumSplitsKey] = opStats.numSplits;
  }

  obj[OperatorTraceTraits::kPeakMemoryKey] =
      opStats.memoryStats.peakTotalMemoryReservation;
  obj[OperatorTraceTraits::kInputRowsKey] = opStats.inputPositions;
  obj[OperatorTraceTraits::kInputBytesKey] = opStats.inputBytes;
  obj[OperatorTraceTraits::kRawInputRowsKey] = opStats.rawInputPositions;
  obj[OperatorTraceTraits::kRawInputBytesKey] = opStats.rawInputBytes;
}
} // namespace

OperatorTraceInputWriter::OperatorTraceInputWriter(
Operator* traceOp,
Expand All @@ -47,9 +62,6 @@ void OperatorTraceInputWriter::write(const RowVectorPtr& rows) {
if (FOLLY_UNLIKELY(finished_)) {
return;
}
if (FOLLY_UNLIKELY(dataType_ == nullptr)) {
dataType_ = rows->type();
}

if (batch_ == nullptr) {
batch_ = std::make_unique<VectorStreamGroup>(pool_, serde_);
Expand Down Expand Up @@ -88,14 +100,7 @@ void OperatorTraceInputWriter::writeSummary() const {
const auto summaryFilePath = getOpTraceSummaryFilePath(traceDir_);
const auto file = fs_->openFileForWrite(summaryFilePath);
folly::dynamic obj = folly::dynamic::object;
if (dataType_ != nullptr) {
obj[TraceTraits::kDataTypeKey] = dataType_->serialize();
}
obj[OperatorTraceTraits::kOpTypeKey] = traceOp_->operatorType();
const auto stats = traceOp_->stats(/*clear=*/false);
obj[OperatorTraceTraits::kPeakMemoryKey] =
stats.memoryStats.peakTotalMemoryReservation;
obj[OperatorTraceTraits::kInputRowsKey] = stats.inputPositions;
recordOperatorSummary(traceOp_, obj);
file->append(folly::toJson(obj));
file->close();
}
Expand Down Expand Up @@ -151,11 +156,7 @@ void OperatorTraceSplitWriter::writeSummary() const {
const auto summaryFilePath = getOpTraceSummaryFilePath(traceDir_);
const auto file = fs_->openFileForWrite(summaryFilePath);
folly::dynamic obj = folly::dynamic::object;
obj[OperatorTraceTraits::kOpTypeKey] = traceOp_->operatorType();
const auto stats = traceOp_->stats(/*clear=*/false);
obj[OperatorTraceTraits::kPeakMemoryKey] =
stats.memoryStats.peakTotalMemoryReservation;
obj[OperatorTraceTraits::kNumSplits] = stats.numSplits;
recordOperatorSummary(traceOp_, obj);
file->append(folly::toJson(obj));
file->close();
}
Expand Down
1 change: 0 additions & 1 deletion velox/exec/OperatorTraceWriter.h
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,6 @@ class OperatorTraceInputWriter {
const UpdateAndCheckTraceLimitCB updateAndCheckTraceLimitCB_;

std::unique_ptr<WriteFile> traceFile_;
TypePtr dataType_;
std::unique_ptr<VectorStreamGroup> batch_;
bool limitExceeded_{false};
bool finished_{false};
Expand Down
28 changes: 23 additions & 5 deletions velox/exec/Trace.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -18,15 +18,33 @@

#include <fmt/core.h>

#include "velox/common/base/Exceptions.h"
#include "velox/common/base/SuccinctPrinter.h"

namespace facebook::velox::exec::trace {

/// Renders the summary as a single human-readable line.
///
/// When 'numSplits' is set the operator must be a "TableScan" and the split
/// count is included in the output; otherwise the operator must not be a
/// "TableScan". A mismatch between 'numSplits' and 'opType' fails the
/// corresponding VELOX_CHECK. Byte counts are formatted with succinctBytes().
std::string OperatorTraceSummary::toString() const {
  // NOTE: no unconditional early return here; the legacy three-field format
  // would make the checks and the extended formats below unreachable.
  if (numSplits.has_value()) {
    VELOX_CHECK_EQ(opType, "TableScan");
    return fmt::format(
        "opType {}, numSplits {}, inputRows {}, inputBytes {}, rawInputRows {}, rawInputBytes {}, peakMemory {}",
        opType,
        numSplits.value(),
        inputRows,
        succinctBytes(inputBytes),
        rawInputRows,
        succinctBytes(rawInputBytes),
        succinctBytes(peakMemory));
  }

  // Only table scan summaries are expected to come without a split count.
  VELOX_CHECK_NE(opType, "TableScan");
  return fmt::format(
      "opType {}, inputRows {}, inputBytes {}, rawInputRows {}, rawInputBytes {}, peakMemory {}",
      opType,
      inputRows,
      succinctBytes(inputBytes),
      rawInputRows,
      succinctBytes(rawInputBytes),
      succinctBytes(peakMemory));
}
} // namespace facebook::velox::exec::trace
14 changes: 12 additions & 2 deletions velox/exec/Trace.h
Original file line number Diff line number Diff line change
Expand Up @@ -17,14 +17,14 @@
#pragma once

#include <cstdint>
#include <optional>
#include <string>

namespace facebook::velox::exec::trace {
/// Defines the shared constants used by query trace implementation.
struct TraceTraits {
static inline const std::string kPlanNodeKey = "planNode";
static inline const std::string kQueryConfigKey = "queryConfig";
static inline const std::string kDataTypeKey = "rowType";
static inline const std::string kConnectorPropertiesKey =
"connectorProperties";

Expand All @@ -40,13 +40,23 @@ struct OperatorTraceTraits {
static inline const std::string kOpTypeKey = "opType";
static inline const std::string kPeakMemoryKey = "peakMemory";
static inline const std::string kInputRowsKey = "inputRows";
static inline const std::string kNumSplits = "numSplits";
static inline const std::string kInputBytesKey = "inputBytes";
static inline const std::string kRawInputRowsKey = "rawInputRows";
static inline const std::string kRawInputBytesKey = "rawInputBytes";
static inline const std::string kNumSplitsKey = "numSplits";
};

/// Contains the summary of an operator trace.
struct OperatorTraceSummary {
std::string opType;
/// The number of splits processed by a table scan operator, nullopt for the
/// other operator types.
std::optional<uint32_t> numSplits{std::nullopt};

uint64_t inputRows{0};
uint64_t inputBytes{0};
uint64_t rawInputRows{0};
uint64_t rawInputBytes{0};
uint64_t peakMemory{0};

std::string toString() const;
Expand Down
4 changes: 4 additions & 0 deletions velox/exec/tests/OperatorTraceTest.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -233,6 +233,10 @@ TEST_F(OperatorTraceTest, traceData) {
ASSERT_EQ(summary.opType, "Aggregation");
ASSERT_GT(summary.peakMemory, 0);
ASSERT_EQ(summary.inputRows, testData.numTracedBatches * 16);
ASSERT_GT(summary.inputBytes, 0);
ASSERT_EQ(summary.rawInputRows, 0);
ASSERT_EQ(summary.rawInputBytes, 0);
ASSERT_FALSE(summary.numSplits.has_value());

const auto reader = OperatorTraceInputReader(opTraceDir, dataType_, pool());
RowVectorPtr actual;
Expand Down
10 changes: 9 additions & 1 deletion velox/exec/tests/TraceUtilTest.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -89,7 +89,15 @@ TEST_F(TraceUtilTest, OperatorTraceSummary) {
summary.inputRows = 100;
summary.peakMemory = 200;
ASSERT_EQ(
summary.toString(), "opType summary, inputRows 100, peakMemory 200B");
summary.toString(),
"opType summary, inputRows 100, inputBytes 0B, rawInputRows 0, rawInputBytes 0B, peakMemory 200B");
summary.numSplits = 10;
summary.rawInputBytes = 222;
VELOX_ASSERT_THROW(summary.toString(), "summary vs. TableScan");
summary.opType = "TableScan";
ASSERT_EQ(
summary.toString(),
"opType TableScan, numSplits 10, inputRows 100, inputBytes 0B, rawInputRows 0, rawInputBytes 222B, peakMemory 200B");
}

TEST_F(TraceUtilTest, traceDirectoryLayoutUtilities) {
Expand Down
24 changes: 14 additions & 10 deletions velox/tool/trace/TraceReplayRunner.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -223,7 +223,9 @@ void TraceReplayRunner::init() {
VELOX_USER_CHECK(!FLAGS_query_id.empty(), "--query_id must be provided");
VELOX_USER_CHECK(!FLAGS_node_id.empty(), "--node_id must be provided");

memory::initializeMemoryManager({});
if (memory::memoryManager() == nullptr) {
memory::initializeMemoryManager({});
}
filesystems::registerLocalFileSystem();
filesystems::registerS3FileSystem();
filesystems::registerHdfsFileSystem();
Expand Down Expand Up @@ -265,15 +267,17 @@ void TraceReplayRunner::init() {
aggregate::prestosql::registerAllAggregateFunctions();
parse::registerTypeResolver();

connector::registerConnectorFactory(
std::make_shared<connector::hive::HiveConnectorFactory>());
const auto hiveConnector =
connector::getConnectorFactory("hive")->newConnector(
"test-hive",
std::make_shared<config::ConfigBase>(
std::unordered_map<std::string, std::string>()),
ioExecutor_.get());
connector::registerConnector(hiveConnector);
if (!facebook::velox::connector::hasConnectorFactory("hive")) {
connector::registerConnectorFactory(
std::make_shared<connector::hive::HiveConnectorFactory>());
const auto hiveConnector =
connector::getConnectorFactory("hive")->newConnector(
"test-hive",
std::make_shared<config::ConfigBase>(
std::unordered_map<std::string, std::string>()),
ioExecutor_.get());
connector::registerConnector(hiveConnector);
}

fs_ = filesystems::getFileSystem(FLAGS_root_dir, nullptr);
}
Expand Down
2 changes: 1 addition & 1 deletion velox/tool/trace/TraceReplayRunner.h
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@ class TraceReplayRunner {
/// Runs the trace replay with a set of gflags passed from replayer tool.
virtual void run();

private:
protected:
std::unique_ptr<tool::trace::OperatorReplayerBase> createReplayer() const;

const std::unique_ptr<folly::IOThreadPoolExecutor> ioExecutor_;
Expand Down
67 changes: 66 additions & 1 deletion velox/tool/trace/tests/TableScanReplayerTest.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -25,15 +25,16 @@
#include "velox/common/base/Fs.h"
#include "velox/common/file/FileSystems.h"
#include "velox/common/hyperloglog/SparseHll.h"
#include "velox/exec/OperatorTraceReader.h"
#include "velox/exec/PartitionFunction.h"
#include "velox/exec/TableWriter.h"
#include "velox/exec/TraceUtil.h"
#include "velox/exec/tests/utils/AssertQueryBuilder.h"
#include "velox/exec/tests/utils/HiveConnectorTestBase.h"
#include "velox/exec/tests/utils/PlanBuilder.h"
#include "velox/exec/tests/utils/TempDirectoryPath.h"
#include "velox/serializers/PrestoSerializer.h"
#include "velox/tool/trace/TableScanReplayer.h"
#include "velox/tool/trace/TraceReplayRunner.h"
#include "velox/vector/tests/utils/VectorTestBase.h"

using namespace facebook::velox;
Expand Down Expand Up @@ -100,6 +101,70 @@ class TableScanReplayerTest : public HiveConnectorTestBase {
core::PlanNodeId traceNodeId_;
};

// End-to-end test of TraceReplayRunner on a traced table scan: runs a table
// scan with query tracing enabled, verifies the operator trace summary that
// was written, then drives the runner through the command line flags, first in
// summary mode and then in replay mode.
TEST_F(TableScanReplayerTest, runner) {
const auto vectors = makeVectors(10, 100);
const auto testDir = TempDirectoryPath::create();
const auto traceRoot = fmt::format("{}/{}", testDir->getPath(), "traceRoot");
const auto fs = filesystems::getFileSystem(testDir->getPath(), nullptr);
const int numSplits{5};
// Write the same set of vectors into one file per split.
std::vector<std::shared_ptr<TempFilePath>> splitFiles;
for (int i = 0; i < numSplits; ++i) {
auto filePath = TempFilePath::create();
writeToFile(filePath->getPath(), vectors);
splitFiles.push_back(std::move(filePath));
}

// Run the scan on a single driver with tracing enabled for 'traceNodeId_' and
// a task regexp that matches every task.
const auto plan = tableScanNode();
std::shared_ptr<Task> task;
auto traceResult =
AssertQueryBuilder(plan)
.maxDrivers(1)
.config(core::QueryConfig::kQueryTraceEnabled, true)
.config(core::QueryConfig::kQueryTraceDir, traceRoot)
.config(core::QueryConfig::kQueryTraceMaxBytes, 100UL << 30)
.config(core::QueryConfig::kQueryTraceTaskRegExp, ".*")
.config(core::QueryConfig::kQueryTraceNodeIds, traceNodeId_)
.splits(makeHiveConnectorSplits(splitFiles))
.copyResults(pool(), task);

// Locate the trace directory of the only traced operator (pipeline 0,
// driver 0) and check the summary recorded for it.
const auto taskTraceDir =
exec::trace::getTaskTraceDirectory(traceRoot, *task);
const auto opTraceDir = exec::trace::getOpTraceDirectory(
taskTraceDir,
traceNodeId_,
/*pipelineId=*/0,
/*driverId=*/0);
const auto summary =
exec::trace::OperatorTraceSummaryReader(opTraceDir, pool()).read();
ASSERT_EQ(summary.opType, "TableScan");
ASSERT_GT(summary.peakMemory, 0);
// Presumably makeVectors(10, 100) yields 10 vectors of 100 rows each, so
// every split file contributes 10 * 100 rows.
const int expectedTotalRows = numSplits * 10 * 100;
ASSERT_EQ(summary.inputRows, expectedTotalRows);
ASSERT_GT(summary.inputBytes, 0);
ASSERT_EQ(summary.rawInputRows, expectedTotalRows);
ASSERT_GT(summary.rawInputBytes, 0);
ASSERT_EQ(summary.numSplits.value(), numSplits);

// First runner pass: summary mode for the traced query and plan node.
FLAGS_root_dir = traceRoot;
FLAGS_query_id = task->queryCtx()->queryId();
FLAGS_node_id = traceNodeId_;
FLAGS_summary = true;
{
TraceReplayRunner runner;
runner.init();
runner.run();
}

// Second runner pass: summary mode off, with the task id set and an empty
// driver id list.
FLAGS_task_id = task->taskId();
FLAGS_driver_ids = "";
FLAGS_summary = false;
{
TraceReplayRunner runner;
runner.init();
runner.run();
}
}

TEST_F(TableScanReplayerTest, basic) {
const auto vectors = makeVectors(10, 100);
const auto testDir = TempDirectoryPath::create();
Expand Down

0 comments on commit f1622ab

Please sign in to comment.