From 7bf14e7e8a483ce1eae14d13c41e415ce7c39434 Mon Sep 17 00:00:00 2001 From: Xiaoxuan Meng Date: Mon, 16 Dec 2024 17:02:57 -0800 Subject: [PATCH] fix: Fix and improve table scan tracing and make to e2e work (#11867) Summary: Make table scan tracing e2e work in Meta and fix issues found in existing table scan tracing plus improvement such as adding more stats to help analysis. This PR also add e2e testing for trace replay runner. Reviewed By: gggrace14 Differential Revision: D67258161 --- velox/exec/OperatorTraceReader.cpp | 8 +++ velox/exec/OperatorTraceWriter.cpp | 33 ++++----- velox/exec/OperatorTraceWriter.h | 1 - velox/exec/Trace.cpp | 28 ++++++-- velox/exec/Trace.h | 14 +++- velox/exec/tests/OperatorTraceTest.cpp | 4 ++ velox/exec/tests/TraceUtilTest.cpp | 10 ++- velox/tool/trace/TraceReplayRunner.cpp | 24 ++++--- velox/tool/trace/TraceReplayRunner.h | 2 +- .../trace/tests/TableScanReplayerTest.cpp | 67 ++++++++++++++++++- 10 files changed, 154 insertions(+), 37 deletions(-) diff --git a/velox/exec/OperatorTraceReader.cpp b/velox/exec/OperatorTraceReader.cpp index e08c6cf4d1a2..0e64111d6886 100644 --- a/velox/exec/OperatorTraceReader.cpp +++ b/velox/exec/OperatorTraceReader.cpp @@ -80,8 +80,16 @@ OperatorTraceSummary OperatorTraceSummaryReader::read() const { folly::dynamic summaryObj = folly::parseJson(summaryStr); OperatorTraceSummary summary; summary.opType = summaryObj[OperatorTraceTraits::kOpTypeKey].asString(); + if (summary.opType == "TableScan") { + summary.numSplits = summaryObj[OperatorTraceTraits::kNumSplitsKey].asInt(); + } summary.peakMemory = summaryObj[OperatorTraceTraits::kPeakMemoryKey].asInt(); summary.inputRows = summaryObj[OperatorTraceTraits::kInputRowsKey].asInt(); + summary.inputBytes = summaryObj[OperatorTraceTraits::kInputBytesKey].asInt(); + summary.rawInputRows = + summaryObj[OperatorTraceTraits::kRawInputRowsKey].asInt(); + summary.rawInputBytes = + summaryObj[OperatorTraceTraits::kRawInputBytesKey].asInt(); return summary; } diff --git a/velox/exec/OperatorTraceWriter.cpp b/velox/exec/OperatorTraceWriter.cpp index 409406ba2114..36719246f290 100644 --- a/velox/exec/OperatorTraceWriter.cpp +++ b/velox/exec/OperatorTraceWriter.cpp @@ -27,6 +27,21 @@ #include "velox/exec/TraceUtil.h" namespace facebook::velox::exec::trace { +namespace { +void recordOperatorSummary(Operator* op, folly::dynamic& obj) { + obj[OperatorTraceTraits::kOpTypeKey] = op->operatorType(); + const auto stats = op->stats(/*clear=*/false); + if (op->operatorType() == "TableScan") { + obj[OperatorTraceTraits::kNumSplitsKey] = stats.numSplits; + } + obj[OperatorTraceTraits::kPeakMemoryKey] = + stats.memoryStats.peakTotalMemoryReservation; + obj[OperatorTraceTraits::kInputRowsKey] = stats.inputPositions; + obj[OperatorTraceTraits::kInputBytesKey] = stats.inputBytes; + obj[OperatorTraceTraits::kRawInputRowsKey] = stats.rawInputPositions; + obj[OperatorTraceTraits::kRawInputBytesKey] = stats.rawInputBytes; +} +} // namespace OperatorTraceInputWriter::OperatorTraceInputWriter( Operator* traceOp, @@ -47,9 +62,6 @@ void OperatorTraceInputWriter::write(const RowVectorPtr& rows) { if (FOLLY_UNLIKELY(finished_)) { return; } - if (FOLLY_UNLIKELY(dataType_ == nullptr)) { - dataType_ = rows->type(); - } if (batch_ == nullptr) { batch_ = std::make_unique(pool_, serde_); @@ -88,14 +100,7 @@ void OperatorTraceInputWriter::writeSummary() const { const auto summaryFilePath = getOpTraceSummaryFilePath(traceDir_); const auto file = fs_->openFileForWrite(summaryFilePath); folly::dynamic obj = folly::dynamic::object; - if (dataType_ != nullptr) { - obj[TraceTraits::kDataTypeKey] = dataType_->serialize(); - } - obj[OperatorTraceTraits::kOpTypeKey] = traceOp_->operatorType(); - const auto stats = traceOp_->stats(/*clear=*/false); - obj[OperatorTraceTraits::kPeakMemoryKey] = - stats.memoryStats.peakTotalMemoryReservation; - obj[OperatorTraceTraits::kInputRowsKey] = stats.inputPositions; + recordOperatorSummary(traceOp_, obj); file->append(folly::toJson(obj)); file->close(); } @@ -151,11 +156,7 @@ void OperatorTraceSplitWriter::writeSummary() const { const auto summaryFilePath = getOpTraceSummaryFilePath(traceDir_); const auto file = fs_->openFileForWrite(summaryFilePath); folly::dynamic obj = folly::dynamic::object; - obj[OperatorTraceTraits::kOpTypeKey] = traceOp_->operatorType(); - const auto stats = traceOp_->stats(/*clear=*/false); - obj[OperatorTraceTraits::kPeakMemoryKey] = - stats.memoryStats.peakTotalMemoryReservation; - obj[OperatorTraceTraits::kNumSplits] = stats.numSplits; + recordOperatorSummary(traceOp_, obj); file->append(folly::toJson(obj)); file->close(); } diff --git a/velox/exec/OperatorTraceWriter.h b/velox/exec/OperatorTraceWriter.h index 451b87f792bb..94634eaa8ecb 100644 --- a/velox/exec/OperatorTraceWriter.h +++ b/velox/exec/OperatorTraceWriter.h @@ -65,7 +65,6 @@ class OperatorTraceInputWriter { const UpdateAndCheckTraceLimitCB updateAndCheckTraceLimitCB_; std::unique_ptr traceFile_; - TypePtr dataType_; std::unique_ptr batch_; bool limitExceeded_{false}; bool finished_{false}; diff --git a/velox/exec/Trace.cpp b/velox/exec/Trace.cpp index b4d224fe4daf..6ff55787fc50 100644 --- a/velox/exec/Trace.cpp +++ b/velox/exec/Trace.cpp @@ -18,15 +18,33 @@ #include +#include "velox/common/base/Exceptions.h" #include "velox/common/base/SuccinctPrinter.h" namespace facebook::velox::exec::trace { std::string OperatorTraceSummary::toString() const { - return fmt::format( - "opType {}, inputRows {}, peakMemory {}", - opType, - inputRows, - succinctBytes(peakMemory)); + if (numSplits.has_value()) { + VELOX_CHECK_EQ(opType, "TableScan"); + return fmt::format( + "opType {}, numSplits {}, inputRows {}, inputBytes {}, rawInputRows {}, rawInputBytes {}, peakMemory {}", + opType, + numSplits.value(), + inputRows, + succinctBytes(inputBytes), + rawInputRows, + succinctBytes(rawInputBytes), + succinctBytes(peakMemory)); + } else { + VELOX_CHECK_NE(opType, "TableScan"); + return fmt::format( + "opType {}, inputRows {}, inputBytes {}, rawInputRows {}, rawInputBytes {}, peakMemory {}", + opType, + inputRows, + succinctBytes(inputBytes), + rawInputRows, + succinctBytes(rawInputBytes), + succinctBytes(peakMemory)); + } } } // namespace facebook::velox::exec::trace diff --git a/velox/exec/Trace.h b/velox/exec/Trace.h index 9826596b9a9f..e9e5495c7648 100644 --- a/velox/exec/Trace.h +++ b/velox/exec/Trace.h @@ -17,6 +17,7 @@ #pragma once #include +#include #include namespace facebook::velox::exec::trace { @@ -24,7 +25,6 @@ namespace facebook::velox::exec::trace { struct TraceTraits { static inline const std::string kPlanNodeKey = "planNode"; static inline const std::string kQueryConfigKey = "queryConfig"; - static inline const std::string kDataTypeKey = "rowType"; static inline const std::string kConnectorPropertiesKey = "connectorProperties"; @@ -40,13 +40,23 @@ struct OperatorTraceTraits { static inline const std::string kOpTypeKey = "opType"; static inline const std::string kPeakMemoryKey = "peakMemory"; static inline const std::string kInputRowsKey = "inputRows"; - static inline const std::string kNumSplits = "numSplits"; + static inline const std::string kInputBytesKey = "inputBytes"; + static inline const std::string kRawInputRowsKey = "rawInputRows"; + static inline const std::string kRawInputBytesKey = "rawInputBytes"; + static inline const std::string kNumSplitsKey = "numSplits"; }; /// Contains the summary of an operator trace. struct OperatorTraceSummary { std::string opType; + /// The number of splits processed by a table scan operator, nullopt for the + /// other operator types. + std::optional numSplits{std::nullopt}; + uint64_t inputRows{0}; + uint64_t inputBytes{0}; + uint64_t rawInputRows{0}; + uint64_t rawInputBytes{0}; uint64_t peakMemory{0}; std::string toString() const; diff --git a/velox/exec/tests/OperatorTraceTest.cpp b/velox/exec/tests/OperatorTraceTest.cpp index bbacd171f234..a1c145ed7aa9 100644 --- a/velox/exec/tests/OperatorTraceTest.cpp +++ b/velox/exec/tests/OperatorTraceTest.cpp @@ -233,6 +233,10 @@ TEST_F(OperatorTraceTest, traceData) { ASSERT_EQ(summary.opType, "Aggregation"); ASSERT_GT(summary.peakMemory, 0); ASSERT_EQ(summary.inputRows, testData.numTracedBatches * 16); + ASSERT_GT(summary.inputBytes, 0); + ASSERT_EQ(summary.rawInputRows, 0); + ASSERT_EQ(summary.rawInputBytes, 0); + ASSERT_FALSE(summary.numSplits.has_value()); const auto reader = OperatorTraceInputReader(opTraceDir, dataType_, pool()); RowVectorPtr actual; diff --git a/velox/exec/tests/TraceUtilTest.cpp b/velox/exec/tests/TraceUtilTest.cpp index e53152c1a5ab..14a62f2159c4 100644 --- a/velox/exec/tests/TraceUtilTest.cpp +++ b/velox/exec/tests/TraceUtilTest.cpp @@ -89,7 +89,15 @@ TEST_F(TraceUtilTest, OperatorTraceSummary) { summary.inputRows = 100; summary.peakMemory = 200; ASSERT_EQ( - summary.toString(), "opType summary, inputRows 100, peakMemory 200B"); + summary.toString(), + "opType summary, inputRows 100, inputBytes 0B, rawInputRows 0, rawInputBytes 0B, peakMemory 200B"); + summary.numSplits = 10; + summary.rawInputBytes = 222; + VELOX_ASSERT_THROW(summary.toString(), "summary vs. TableScan"); + summary.opType = "TableScan"; + ASSERT_EQ( + summary.toString(), + "opType TableScan, numSplits 10, inputRows 100, inputBytes 0B, rawInputRows 0, rawInputBytes 222B, peakMemory 200B"); } TEST_F(TraceUtilTest, traceDirectoryLayoutUtilities) { diff --git a/velox/tool/trace/TraceReplayRunner.cpp b/velox/tool/trace/TraceReplayRunner.cpp index 83e48ef6a7ac..d2162d2da479 100644 --- a/velox/tool/trace/TraceReplayRunner.cpp +++ b/velox/tool/trace/TraceReplayRunner.cpp @@ -223,7 +223,9 @@ void TraceReplayRunner::init() { VELOX_USER_CHECK(!FLAGS_query_id.empty(), "--query_id must be provided"); VELOX_USER_CHECK(!FLAGS_node_id.empty(), "--node_id must be provided"); - memory::initializeMemoryManager({}); + if (memory::memoryManager() == nullptr) { + memory::initializeMemoryManager({}); + } filesystems::registerLocalFileSystem(); filesystems::registerS3FileSystem(); filesystems::registerHdfsFileSystem(); @@ -265,15 +267,17 @@ void TraceReplayRunner::init() { aggregate::prestosql::registerAllAggregateFunctions(); parse::registerTypeResolver(); - connector::registerConnectorFactory( - std::make_shared()); - const auto hiveConnector = - connector::getConnectorFactory("hive")->newConnector( - "test-hive", - std::make_shared( - std::unordered_map()), - ioExecutor_.get()); - connector::registerConnector(hiveConnector); + if (!facebook::velox::connector::hasConnectorFactory("hive")) { + connector::registerConnectorFactory( + std::make_shared()); + const auto hiveConnector = + connector::getConnectorFactory("hive")->newConnector( + "test-hive", + std::make_shared( + std::unordered_map()), + ioExecutor_.get()); + connector::registerConnector(hiveConnector); + } fs_ = filesystems::getFileSystem(FLAGS_root_dir, nullptr); } diff --git a/velox/tool/trace/TraceReplayRunner.h b/velox/tool/trace/TraceReplayRunner.h index fa2bef827874..65640357e533 100644 --- a/velox/tool/trace/TraceReplayRunner.h +++ b/velox/tool/trace/TraceReplayRunner.h @@ -50,7 +50,7 @@ class TraceReplayRunner { /// Runs the trace replay with a set of gflags passed from replayer tool. virtual void run(); - private: + protected: std::unique_ptr createReplayer() const; const std::unique_ptr ioExecutor_; diff --git a/velox/tool/trace/tests/TableScanReplayerTest.cpp b/velox/tool/trace/tests/TableScanReplayerTest.cpp index 22b8c66d221c..e30b7ffa9158 100644 --- a/velox/tool/trace/tests/TableScanReplayerTest.cpp +++ b/velox/tool/trace/tests/TableScanReplayerTest.cpp @@ -25,8 +25,8 @@ #include "velox/common/base/Fs.h" #include "velox/common/file/FileSystems.h" #include "velox/common/hyperloglog/SparseHll.h" +#include "velox/exec/OperatorTraceReader.h" #include "velox/exec/PartitionFunction.h" -#include "velox/exec/TableWriter.h" #include "velox/exec/TraceUtil.h" #include "velox/exec/tests/utils/AssertQueryBuilder.h" #include "velox/exec/tests/utils/HiveConnectorTestBase.h" @@ -34,6 +34,7 @@ #include "velox/exec/tests/utils/TempDirectoryPath.h" #include "velox/serializers/PrestoSerializer.h" #include "velox/tool/trace/TableScanReplayer.h" +#include "velox/tool/trace/TraceReplayRunner.h" #include "velox/vector/tests/utils/VectorTestBase.h" using namespace facebook::velox; @@ -100,6 +101,70 @@ class TableScanReplayerTest : public HiveConnectorTestBase { core::PlanNodeId traceNodeId_; }; +TEST_F(TableScanReplayerTest, runner) { + const auto vectors = makeVectors(10, 100); + const auto testDir = TempDirectoryPath::create(); + const auto traceRoot = fmt::format("{}/{}", testDir->getPath(), "traceRoot"); + const auto fs = filesystems::getFileSystem(testDir->getPath(), nullptr); + const int numSplits{5}; + std::vector> splitFiles; + for (int i = 0; i < numSplits; ++i) { + auto filePath = TempFilePath::create(); + writeToFile(filePath->getPath(), vectors); + splitFiles.push_back(std::move(filePath)); + } + + const auto plan = tableScanNode(); + std::shared_ptr task; + auto traceResult = + AssertQueryBuilder(plan) + .maxDrivers(1) + .config(core::QueryConfig::kQueryTraceEnabled, true) + .config(core::QueryConfig::kQueryTraceDir, traceRoot) + .config(core::QueryConfig::kQueryTraceMaxBytes, 100UL << 30) + .config(core::QueryConfig::kQueryTraceTaskRegExp, ".*") + .config(core::QueryConfig::kQueryTraceNodeIds, traceNodeId_) + .splits(makeHiveConnectorSplits(splitFiles)) + .copyResults(pool(), task); + + const auto taskTraceDir = + exec::trace::getTaskTraceDirectory(traceRoot, *task); + const auto opTraceDir = exec::trace::getOpTraceDirectory( + taskTraceDir, + traceNodeId_, + /*pipelineId=*/0, + /*driverId=*/0); + const auto summary = + exec::trace::OperatorTraceSummaryReader(opTraceDir, pool()).read(); + ASSERT_EQ(summary.opType, "TableScan"); + ASSERT_GT(summary.peakMemory, 0); + const int expectedTotalRows = numSplits * 10 * 100; + ASSERT_EQ(summary.inputRows, expectedTotalRows); + ASSERT_GT(summary.inputBytes, 0); + ASSERT_EQ(summary.rawInputRows, expectedTotalRows); + ASSERT_GT(summary.rawInputBytes, 0); + ASSERT_EQ(summary.numSplits.value(), numSplits); + + FLAGS_root_dir = traceRoot; + FLAGS_query_id = task->queryCtx()->queryId(); + FLAGS_node_id = traceNodeId_; + FLAGS_summary = true; + { + TraceReplayRunner runner; + runner.init(); + runner.run(); + } + + FLAGS_task_id = task->taskId(); + FLAGS_driver_ids = ""; + FLAGS_summary = false; + { + TraceReplayRunner runner; + runner.init(); + runner.run(); + } +} + TEST_F(TableScanReplayerTest, basic) { const auto vectors = makeVectors(10, 100); const auto testDir = TempDirectoryPath::create();