diff --git a/velox/exec/OperatorTraceReader.cpp b/velox/exec/OperatorTraceReader.cpp index e08c6cf4d1a2..0e64111d6886 100644 --- a/velox/exec/OperatorTraceReader.cpp +++ b/velox/exec/OperatorTraceReader.cpp @@ -80,8 +80,16 @@ OperatorTraceSummary OperatorTraceSummaryReader::read() const { folly::dynamic summaryObj = folly::parseJson(summaryStr); OperatorTraceSummary summary; summary.opType = summaryObj[OperatorTraceTraits::kOpTypeKey].asString(); + if (summary.opType == "TableScan") { + summary.numSplits = summaryObj[OperatorTraceTraits::kNumSplitsKey].asInt(); + } summary.peakMemory = summaryObj[OperatorTraceTraits::kPeakMemoryKey].asInt(); summary.inputRows = summaryObj[OperatorTraceTraits::kInputRowsKey].asInt(); + summary.inputBytes = summaryObj[OperatorTraceTraits::kInputBytesKey].asInt(); + summary.rawInputRows = + summaryObj[OperatorTraceTraits::kRawInputRowsKey].asInt(); + summary.rawInputBytes = + summaryObj[OperatorTraceTraits::kRawInputBytesKey].asInt(); return summary; } diff --git a/velox/exec/OperatorTraceWriter.cpp b/velox/exec/OperatorTraceWriter.cpp index 409406ba2114..36719246f290 100644 --- a/velox/exec/OperatorTraceWriter.cpp +++ b/velox/exec/OperatorTraceWriter.cpp @@ -27,6 +27,21 @@ #include "velox/exec/TraceUtil.h" namespace facebook::velox::exec::trace { +namespace { +void recordOperatorSummary(Operator* op, folly::dynamic& obj) { + obj[OperatorTraceTraits::kOpTypeKey] = op->operatorType(); + const auto stats = op->stats(/*clear=*/false); + if (op->operatorType() == "TableScan") { + obj[OperatorTraceTraits::kNumSplitsKey] = stats.numSplits; + } + obj[OperatorTraceTraits::kPeakMemoryKey] = + stats.memoryStats.peakTotalMemoryReservation; + obj[OperatorTraceTraits::kInputRowsKey] = stats.inputPositions; + obj[OperatorTraceTraits::kInputBytesKey] = stats.inputBytes; + obj[OperatorTraceTraits::kRawInputRowsKey] = stats.rawInputPositions; + obj[OperatorTraceTraits::kRawInputBytesKey] = stats.rawInputBytes; +} +} // namespace OperatorTraceInputWriter::OperatorTraceInputWriter( Operator* traceOp, @@ -47,9 +62,6 @@ void OperatorTraceInputWriter::write(const RowVectorPtr& rows) { if (FOLLY_UNLIKELY(finished_)) { return; } - if (FOLLY_UNLIKELY(dataType_ == nullptr)) { - dataType_ = rows->type(); - } if (batch_ == nullptr) { batch_ = std::make_unique(pool_, serde_); @@ -88,14 +100,7 @@ void OperatorTraceInputWriter::writeSummary() const { const auto summaryFilePath = getOpTraceSummaryFilePath(traceDir_); const auto file = fs_->openFileForWrite(summaryFilePath); folly::dynamic obj = folly::dynamic::object; - if (dataType_ != nullptr) { - obj[TraceTraits::kDataTypeKey] = dataType_->serialize(); - } - obj[OperatorTraceTraits::kOpTypeKey] = traceOp_->operatorType(); - const auto stats = traceOp_->stats(/*clear=*/false); - obj[OperatorTraceTraits::kPeakMemoryKey] = - stats.memoryStats.peakTotalMemoryReservation; - obj[OperatorTraceTraits::kInputRowsKey] = stats.inputPositions; + recordOperatorSummary(traceOp_, obj); file->append(folly::toJson(obj)); file->close(); } @@ -151,11 +156,7 @@ void OperatorTraceSplitWriter::writeSummary() const { const auto summaryFilePath = getOpTraceSummaryFilePath(traceDir_); const auto file = fs_->openFileForWrite(summaryFilePath); folly::dynamic obj = folly::dynamic::object; - obj[OperatorTraceTraits::kOpTypeKey] = traceOp_->operatorType(); - const auto stats = traceOp_->stats(/*clear=*/false); - obj[OperatorTraceTraits::kPeakMemoryKey] = - stats.memoryStats.peakTotalMemoryReservation; - obj[OperatorTraceTraits::kNumSplits] = stats.numSplits; + recordOperatorSummary(traceOp_, obj); file->append(folly::toJson(obj)); file->close(); } diff --git a/velox/exec/OperatorTraceWriter.h b/velox/exec/OperatorTraceWriter.h index 451b87f792bb..94634eaa8ecb 100644 --- a/velox/exec/OperatorTraceWriter.h +++ b/velox/exec/OperatorTraceWriter.h @@ -65,7 +65,6 @@ class OperatorTraceInputWriter { const UpdateAndCheckTraceLimitCB updateAndCheckTraceLimitCB_; std::unique_ptr traceFile_; - TypePtr dataType_; std::unique_ptr batch_; bool limitExceeded_{false}; bool finished_{false}; diff --git a/velox/exec/Trace.cpp b/velox/exec/Trace.cpp index b4d224fe4daf..6ff55787fc50 100644 --- a/velox/exec/Trace.cpp +++ b/velox/exec/Trace.cpp @@ -18,15 +18,33 @@ #include +#include "velox/common/base/Exceptions.h" #include "velox/common/base/SuccinctPrinter.h" namespace facebook::velox::exec::trace { std::string OperatorTraceSummary::toString() const { - return fmt::format( - "opType {}, inputRows {}, peakMemory {}", - opType, - inputRows, - succinctBytes(peakMemory)); + if (numSplits.has_value()) { + VELOX_CHECK_EQ(opType, "TableScan"); + return fmt::format( + "opType {}, numSplits {}, inputRows {}, inputBytes {}, rawInputRows {}, rawInputBytes {}, peakMemory {}", + opType, + numSplits.value(), + inputRows, + succinctBytes(inputBytes), + rawInputRows, + succinctBytes(rawInputBytes), + succinctBytes(peakMemory)); + } else { + VELOX_CHECK_NE(opType, "TableScan"); + return fmt::format( + "opType {}, inputRows {}, inputBytes {}, rawInputRows {}, rawInputBytes {}, peakMemory {}", + opType, + inputRows, + succinctBytes(inputBytes), + rawInputRows, + succinctBytes(rawInputBytes), + succinctBytes(peakMemory)); + } } } // namespace facebook::velox::exec::trace diff --git a/velox/exec/Trace.h b/velox/exec/Trace.h index 9826596b9a9f..e9e5495c7648 100644 --- a/velox/exec/Trace.h +++ b/velox/exec/Trace.h @@ -17,6 +17,7 @@ #pragma once #include +#include #include namespace facebook::velox::exec::trace { @@ -24,7 +25,6 @@ namespace facebook::velox::exec::trace { struct TraceTraits { static inline const std::string kPlanNodeKey = "planNode"; static inline const std::string kQueryConfigKey = "queryConfig"; - static inline const std::string kDataTypeKey = "rowType"; static inline const std::string kConnectorPropertiesKey = "connectorProperties"; @@ -40,13 +40,23 @@ struct OperatorTraceTraits { static inline const std::string kOpTypeKey = "opType"; static inline const std::string kPeakMemoryKey = "peakMemory"; static inline const std::string kInputRowsKey = "inputRows"; - static inline const std::string kNumSplits = "numSplits"; + static inline const std::string kInputBytesKey = "inputBytes"; + static inline const std::string kRawInputRowsKey = "rawInputRows"; + static inline const std::string kRawInputBytesKey = "rawInputBytes"; + static inline const std::string kNumSplitsKey = "numSplits"; }; /// Contains the summary of an operator trace. struct OperatorTraceSummary { std::string opType; + /// The number of splits processed by a table scan operator, nullopt for the + /// other operator types. + std::optional numSplits{std::nullopt}; + uint64_t inputRows{0}; + uint64_t inputBytes{0}; + uint64_t rawInputRows{0}; + uint64_t rawInputBytes{0}; uint64_t peakMemory{0}; std::string toString() const; diff --git a/velox/exec/tests/OperatorTraceTest.cpp b/velox/exec/tests/OperatorTraceTest.cpp index bbacd171f234..a1c145ed7aa9 100644 --- a/velox/exec/tests/OperatorTraceTest.cpp +++ b/velox/exec/tests/OperatorTraceTest.cpp @@ -233,6 +233,10 @@ TEST_F(OperatorTraceTest, traceData) { ASSERT_EQ(summary.opType, "Aggregation"); ASSERT_GT(summary.peakMemory, 0); ASSERT_EQ(summary.inputRows, testData.numTracedBatches * 16); + ASSERT_GT(summary.inputBytes, 0); + ASSERT_EQ(summary.rawInputRows, 0); + ASSERT_EQ(summary.rawInputBytes, 0); + ASSERT_FALSE(summary.numSplits.has_value()); const auto reader = OperatorTraceInputReader(opTraceDir, dataType_, pool()); RowVectorPtr actual; diff --git a/velox/exec/tests/TraceUtilTest.cpp b/velox/exec/tests/TraceUtilTest.cpp index e53152c1a5ab..14a62f2159c4 100644 --- a/velox/exec/tests/TraceUtilTest.cpp +++ b/velox/exec/tests/TraceUtilTest.cpp @@ -89,7 +89,15 @@ TEST_F(TraceUtilTest, OperatorTraceSummary) { summary.inputRows = 100; summary.peakMemory = 200; ASSERT_EQ( - summary.toString(), "opType summary, inputRows 100, peakMemory 200B"); + summary.toString(), + "opType summary, inputRows 100, inputBytes 0B, rawInputRows 0, rawInputBytes 0B, peakMemory 200B"); + summary.numSplits = 10; + summary.rawInputBytes = 222; + VELOX_ASSERT_THROW(summary.toString(), "summary vs. TableScan"); + summary.opType = "TableScan"; + ASSERT_EQ( + summary.toString(), + "opType TableScan, numSplits 10, inputRows 100, inputBytes 0B, rawInputRows 0, rawInputBytes 222B, peakMemory 200B"); } TEST_F(TraceUtilTest, traceDirectoryLayoutUtilities) { diff --git a/velox/tool/trace/TraceReplayRunner.cpp b/velox/tool/trace/TraceReplayRunner.cpp index 83e48ef6a7ac..d2162d2da479 100644 --- a/velox/tool/trace/TraceReplayRunner.cpp +++ b/velox/tool/trace/TraceReplayRunner.cpp @@ -223,7 +223,9 @@ void TraceReplayRunner::init() { VELOX_USER_CHECK(!FLAGS_query_id.empty(), "--query_id must be provided"); VELOX_USER_CHECK(!FLAGS_node_id.empty(), "--node_id must be provided"); - memory::initializeMemoryManager({}); + if (memory::memoryManager() == nullptr) { + memory::initializeMemoryManager({}); + } filesystems::registerLocalFileSystem(); filesystems::registerS3FileSystem(); filesystems::registerHdfsFileSystem(); @@ -265,15 +267,17 @@ void TraceReplayRunner::init() { aggregate::prestosql::registerAllAggregateFunctions(); parse::registerTypeResolver(); - connector::registerConnectorFactory( - std::make_shared()); - const auto hiveConnector = - connector::getConnectorFactory("hive")->newConnector( - "test-hive", - std::make_shared( - std::unordered_map()), - ioExecutor_.get()); - connector::registerConnector(hiveConnector); + if (!facebook::velox::connector::hasConnectorFactory("hive")) { + connector::registerConnectorFactory( + std::make_shared()); + const auto hiveConnector = + connector::getConnectorFactory("hive")->newConnector( + "test-hive", + std::make_shared( + std::unordered_map()), + ioExecutor_.get()); + connector::registerConnector(hiveConnector); + } fs_ = filesystems::getFileSystem(FLAGS_root_dir, nullptr); } diff --git a/velox/tool/trace/TraceReplayRunner.h b/velox/tool/trace/TraceReplayRunner.h index fa2bef827874..65640357e533 100644 --- a/velox/tool/trace/TraceReplayRunner.h +++ b/velox/tool/trace/TraceReplayRunner.h @@ -50,7 +50,7 @@ class TraceReplayRunner { /// Runs the trace replay with a set of gflags passed from replayer tool. virtual void run(); - private: + protected: std::unique_ptr createReplayer() const; const std::unique_ptr ioExecutor_; diff --git a/velox/tool/trace/tests/TableScanReplayerTest.cpp b/velox/tool/trace/tests/TableScanReplayerTest.cpp index 22b8c66d221c..e30b7ffa9158 100644 --- a/velox/tool/trace/tests/TableScanReplayerTest.cpp +++ b/velox/tool/trace/tests/TableScanReplayerTest.cpp @@ -25,8 +25,8 @@ #include "velox/common/base/Fs.h" #include "velox/common/file/FileSystems.h" #include "velox/common/hyperloglog/SparseHll.h" +#include "velox/exec/OperatorTraceReader.h" #include "velox/exec/PartitionFunction.h" -#include "velox/exec/TableWriter.h" #include "velox/exec/TraceUtil.h" #include "velox/exec/tests/utils/AssertQueryBuilder.h" #include "velox/exec/tests/utils/HiveConnectorTestBase.h" @@ -34,6 +34,7 @@ #include "velox/exec/tests/utils/TempDirectoryPath.h" #include "velox/serializers/PrestoSerializer.h" #include "velox/tool/trace/TableScanReplayer.h" +#include "velox/tool/trace/TraceReplayRunner.h" #include "velox/vector/tests/utils/VectorTestBase.h" using namespace facebook::velox; @@ -100,6 +101,70 @@ class TableScanReplayerTest : public HiveConnectorTestBase { core::PlanNodeId traceNodeId_; }; +TEST_F(TableScanReplayerTest, runner) { + const auto vectors = makeVectors(10, 100); + const auto testDir = TempDirectoryPath::create(); + const auto traceRoot = fmt::format("{}/{}", testDir->getPath(), "traceRoot"); + const auto fs = filesystems::getFileSystem(testDir->getPath(), nullptr); + const int numSplits{5}; + std::vector> splitFiles; + for (int i = 0; i < numSplits; ++i) { + auto filePath = TempFilePath::create(); + writeToFile(filePath->getPath(), vectors); + splitFiles.push_back(std::move(filePath)); + } + + const auto plan = tableScanNode(); + std::shared_ptr task; + auto traceResult = + AssertQueryBuilder(plan) + .maxDrivers(1) + .config(core::QueryConfig::kQueryTraceEnabled, true) + .config(core::QueryConfig::kQueryTraceDir, traceRoot) + .config(core::QueryConfig::kQueryTraceMaxBytes, 100UL << 30) + .config(core::QueryConfig::kQueryTraceTaskRegExp, ".*") + .config(core::QueryConfig::kQueryTraceNodeIds, traceNodeId_) + .splits(makeHiveConnectorSplits(splitFiles)) + .copyResults(pool(), task); + + const auto taskTraceDir = + exec::trace::getTaskTraceDirectory(traceRoot, *task); + const auto opTraceDir = exec::trace::getOpTraceDirectory( + taskTraceDir, + traceNodeId_, + /*pipelineId=*/0, + /*driverId=*/0); + const auto summary = + exec::trace::OperatorTraceSummaryReader(opTraceDir, pool()).read(); + ASSERT_EQ(summary.opType, "TableScan"); + ASSERT_GT(summary.peakMemory, 0); + const int expectedTotalRows = numSplits * 10 * 100; + ASSERT_EQ(summary.inputRows, expectedTotalRows); + ASSERT_GT(summary.inputBytes, 0); + ASSERT_EQ(summary.rawInputRows, expectedTotalRows); + ASSERT_GT(summary.rawInputBytes, 0); + ASSERT_EQ(summary.numSplits.value(), numSplits); + + FLAGS_root_dir = traceRoot; + FLAGS_query_id = task->queryCtx()->queryId(); + FLAGS_node_id = traceNodeId_; + FLAGS_summary = true; + { + TraceReplayRunner runner; + runner.init(); + runner.run(); + } + + FLAGS_task_id = task->taskId(); + FLAGS_driver_ids = ""; + FLAGS_summary = false; + { + TraceReplayRunner runner; + runner.init(); + runner.run(); + } +} + TEST_F(TableScanReplayerTest, basic) { const auto vectors = makeVectors(10, 100); const auto testDir = TempDirectoryPath::create();