From 7994bda2fef4ccaece55369261974437369f4d8f Mon Sep 17 00:00:00 2001 From: Xiaoxuan Meng Date: Tue, 29 Oct 2024 22:58:53 -0700 Subject: [PATCH] Improve operator tracing and make it E2E work (#11360) Summary: Pull Request resolved: https://github.com/facebookincubator/velox/pull/11360 This PR improves the operator trace framework and replay tool implementation: (1) Let driver execution framework to handle the trace input collection and summary file generation. The finish trace is either called when trace limit exceeds or on operator close instead of no more input. So it can capture more information in summary like peak memory usage. By removing the trace input collection into the driver framework it eases the trace input collection for a spilling operator. We add to capture the peak memory and input rows in summary so it helps identify the hot operator or task on replay debugging. (2) Change the trace storage layout to have root with query id for traces from different tasks (3) Abstract the replay tool function into TraceReplayRunner class and derive in Meta internal code repo to handle the Meta internal env setup and keep most common part in TraceReplayRunner for OSS (4) A couple of fixes in trace replay tool to make it E2E function in Meta for table writer use case (5) Simplify the trace control logic by throwing if hit trace limit instead of logging in trace summary. (6) Strict the check if trace plan not is not specified or has specified the wrong node as we are only supposed to use trace for debugging purpose instead of production query running. (7) A couple of file/class renaming to make the file/class name to be more specific as currently we only support operator level trace collection and replay Reviewed By: tanjialiang Differential Revision: D64946367 --- velox/common/base/VeloxException.h | 3 + velox/common/file/File.cpp | 5 +- velox/core/PlanNode.cpp | 6 +- velox/core/PlanNode.h | 17 +- velox/core/QueryCtx.cpp | 13 +- velox/core/QueryCtx.h | 6 +- velox/docs/develop/debugging/tracing.rst | 8 +- velox/exec/CMakeLists.txt | 15 +- velox/exec/Driver.cpp | 5 +- velox/exec/Driver.h | 4 +- velox/exec/HashAggregation.cpp | 1 - velox/exec/LocalPlanner.cpp | 11 +- velox/exec/Operator.cpp | 39 +- velox/exec/Operator.h | 13 +- velox/exec/OperatorTraceReader.cpp | 78 ++++ ...ueryDataReader.h => OperatorTraceReader.h} | 33 +- ...eryTraceScan.cpp => OperatorTraceScan.cpp} | 31 +- .../{QueryTraceScan.h => OperatorTraceScan.h} | 11 +- ...DataWriter.cpp => OperatorTraceWriter.cpp} | 56 +-- ...ueryDataWriter.h => OperatorTraceWriter.h} | 30 +- velox/exec/PartitionedOutput.cpp | 2 - velox/exec/QueryDataReader.cpp | 59 --- velox/exec/QueryTraceUtil.cpp | 111 ------ velox/exec/SortWindowBuild.h | 2 +- velox/exec/TableWriter.cpp | 2 +- velox/exec/Task.cpp | 65 ++- velox/exec/Task.h | 12 +- ...MetadataReader.cpp => TaskTraceReader.cpp} | 26 +- ...ueryMetadataReader.h => TaskTraceReader.h} | 8 +- ...MetadataWriter.cpp => TaskTraceWriter.cpp} | 25 +- ...ueryMetadataWriter.h => TaskTraceWriter.h} | 7 +- velox/exec/{QueryTraceTraits.h => Trace.cpp} | 25 +- velox/exec/Trace.h | 60 +++ .../{QueryTraceConfig.cpp => TraceConfig.cpp} | 18 +- .../{QueryTraceConfig.h => TraceConfig.h} | 10 +- velox/exec/TraceUtil.cpp | 182 +++++++++ velox/exec/{QueryTraceUtil.h => TraceUtil.h} | 79 +++- velox/exec/tests/CMakeLists.txt | 3 +- ...eryTraceTest.cpp => OperatorTraceTest.cpp} | 377 +++++++++++------- velox/exec/tests/TraceUtilTest.cpp | 186 +++++++++ velox/exec/tests/utils/OperatorTestBase.h | 2 - velox/exec/tests/utils/PlanBuilder.cpp | 5 +- velox/exec/tests/utils/PlanBuilder.h | 3 + velox/tool/trace/AggregationReplayer.cpp | 2 +- velox/tool/trace/AggregationReplayer.h | 6 +- velox/tool/trace/CMakeLists.txt | 2 +- velox/tool/trace/OperatorReplayerBase.cpp | 36 +- velox/tool/trace/OperatorReplayerBase.h | 13 +- .../tool/trace/PartitionedOutputReplayer.cpp | 13 +- velox/tool/trace/PartitionedOutputReplayer.h | 3 +- velox/tool/trace/TableWriterReplayer.cpp | 2 +- velox/tool/trace/TableWriterReplayer.h | 11 +- ...ueryReplayer.cpp => TraceReplayRunner.cpp} | 258 +++++++----- velox/tool/trace/TraceReplayRunner.h | 54 +++ velox/tool/trace/TraceReplayerMain.cpp | 28 ++ .../trace/tests/AggregationReplayerTest.cpp | 16 +- .../tests/PartitionedOutputReplayerTest.cpp | 16 +- .../trace/tests/TableWriterReplayerTest.cpp | 6 +- 58 files changed, 1393 insertions(+), 727 deletions(-) create mode 100644 velox/exec/OperatorTraceReader.cpp rename velox/exec/{QueryDataReader.h => OperatorTraceReader.h} (62%) rename velox/exec/{QueryTraceScan.cpp => OperatorTraceScan.cpp} (62%) rename velox/exec/{QueryTraceScan.h => OperatorTraceScan.h} (88%) rename velox/exec/{QueryDataWriter.cpp => OperatorTraceWriter.cpp} (59%) rename velox/exec/{QueryDataWriter.h => OperatorTraceWriter.h} (79%) delete mode 100644 velox/exec/QueryDataReader.cpp delete mode 100644 velox/exec/QueryTraceUtil.cpp rename velox/exec/{QueryMetadataReader.cpp => TaskTraceReader.cpp} (73%) rename velox/exec/{QueryMetadataReader.h => TaskTraceReader.h} (84%) rename velox/exec/{QueryMetadataWriter.cpp => TaskTraceWriter.cpp} (75%) rename velox/exec/{QueryMetadataWriter.h => TaskTraceWriter.h} (85%) rename velox/exec/{QueryTraceTraits.h => Trace.cpp} (50%) create mode 100644 velox/exec/Trace.h rename velox/exec/{QueryTraceConfig.cpp => TraceConfig.cpp} (75%) rename velox/exec/{QueryTraceConfig.h => TraceConfig.h} (88%) create mode 100644 velox/exec/TraceUtil.cpp rename velox/exec/{QueryTraceUtil.h => TraceUtil.h} (50%) rename velox/exec/tests/{QueryTraceTest.cpp => OperatorTraceTest.cpp} (58%) create mode 100644 velox/exec/tests/TraceUtilTest.cpp rename velox/tool/trace/{QueryReplayer.cpp => TraceReplayRunner.cpp} (57%) create mode 100644 velox/tool/trace/TraceReplayRunner.h create mode 100644 velox/tool/trace/TraceReplayerMain.cpp diff --git a/velox/common/base/VeloxException.h b/velox/common/base/VeloxException.h index fa8d18e15d09..148ae38f06a2 100644 --- a/velox/common/base/VeloxException.h +++ b/velox/common/base/VeloxException.h @@ -105,6 +105,9 @@ inline constexpr auto kNoCacheSpace = "NO_CACHE_SPACE"_fs; /// An error raised when spill bytes exceeds limits. inline constexpr auto kSpillLimitExceeded = "SPILL_LIMIT_EXCEEDED"_fs; +/// An error raised when trace bytes exceeds limits. +inline constexpr auto kTraceLimitExceeded = "TRACE_LIMIT_EXCEEDED"_fs; + /// Errors indicating file read corruptions. inline constexpr auto kFileCorruption = "FILE_CORRUPTION"_fs; diff --git a/velox/common/file/File.cpp b/velox/common/file/File.cpp index 46156c2f1aa7..6a30f0a26159 100644 --- a/velox/common/file/File.cpp +++ b/velox/common/file/File.cpp @@ -170,9 +170,10 @@ void LocalReadFile::preadInternal(uint64_t offset, uint64_t length, char* pos) VELOX_CHECK_EQ( bytesRead, length, - "fread failure in LocalReadFile::PReadInternal, {} vs {}.", + "fread failure in LocalReadFile::PReadInternal, {} vs {}: {}", bytesRead, - length); + length, + folly::errnoStr(errno)); } std::string_view diff --git a/velox/core/PlanNode.cpp b/velox/core/PlanNode.cpp index 9e3c1fc0fad3..609ad7183513 100644 --- a/velox/core/PlanNode.cpp +++ b/velox/core/PlanNode.cpp @@ -2355,15 +2355,15 @@ folly::dynamic PlanNode::serialize() const { return obj; } -const std::vector& QueryTraceScanNode::sources() const { +const std::vector& TraceScanNode::sources() const { return kEmptySources; } -std::string QueryTraceScanNode::traceDir() const { +std::string TraceScanNode::traceDir() const { return traceDir_; } -void QueryTraceScanNode::addDetails(std::stringstream& stream) const { +void TraceScanNode::addDetails(std::stringstream& stream) const { stream << "Trace dir: " << traceDir_; } diff --git a/velox/core/PlanNode.h b/velox/core/PlanNode.h index 331f3618c286..c893a834fda8 100644 --- a/velox/core/PlanNode.h +++ b/velox/core/PlanNode.h @@ -314,13 +314,17 @@ class ArrowStreamNode : public PlanNode { std::shared_ptr arrowStream_; }; -class QueryTraceScanNode final : public PlanNode { +class TraceScanNode final : public PlanNode { public: - QueryTraceScanNode( + TraceScanNode( const PlanNodeId& id, const std::string& traceDir, + uint32_t pipelineId, const RowTypePtr& outputType) - : PlanNode(id), traceDir_(traceDir), outputType_(outputType) {} + : PlanNode(id), + traceDir_(traceDir), + pipelineId_(pipelineId), + outputType_(outputType) {} const RowTypePtr& outputType() const override { return outputType_; @@ -333,17 +337,22 @@ class QueryTraceScanNode final : public PlanNode { } folly::dynamic serialize() const override { - VELOX_UNSUPPORTED("QueryReplayScanNode is not serializable"); + VELOX_UNSUPPORTED("TraceScanNode is not serializable"); return nullptr; } std::string traceDir() const; + uint32_t pipelineId() const { + return pipelineId_; + } + private: void addDetails(std::stringstream& stream) const override; // Directory of traced data, which is $traceRoot/$taskId/$nodeId. const std::string traceDir_; + const uint32_t pipelineId_; const RowTypePtr outputType_; }; diff --git a/velox/core/QueryCtx.cpp b/velox/core/QueryCtx.cpp index ee238c081800..19428281d5b6 100644 --- a/velox/core/QueryCtx.cpp +++ b/velox/core/QueryCtx.cpp @@ -87,16 +87,13 @@ void QueryCtx::updateSpilledBytesAndCheckLimit(uint64_t bytes) { } } -bool QueryCtx::updateTracedBytesAndCheckLimit(uint64_t bytes) { - if (numTracedBytes_.fetch_add(bytes) + bytes < +void QueryCtx::updateTracedBytesAndCheckLimit(uint64_t bytes) { + if (numTracedBytes_.fetch_add(bytes) + bytes >= queryConfig_.queryTraceMaxBytes()) { - return false; + VELOX_SPILL_LIMIT_EXCEEDED(fmt::format( + "Query exceeded per-query local trace limit of {}", + succinctBytes(queryConfig_.queryTraceMaxBytes()))); } - - numTracedBytes_.fetch_sub(bytes); - LOG(WARNING) << "Query exceeded trace limit of " - << succinctBytes(queryConfig_.queryTraceMaxBytes()); - return true; } std::unique_ptr QueryCtx::MemoryReclaimer::create( diff --git a/velox/core/QueryCtx.h b/velox/core/QueryCtx.h index c6cc96a3caf2..5e89ccc4c907 100644 --- a/velox/core/QueryCtx.h +++ b/velox/core/QueryCtx.h @@ -122,9 +122,9 @@ class QueryCtx : public std::enable_shared_from_this { /// the max spill bytes limit. void updateSpilledBytesAndCheckLimit(uint64_t bytes); - /// Updates the aggregated trace bytes of this query, and return true if - /// exceeds the max query trace bytes limit. - bool updateTracedBytesAndCheckLimit(uint64_t bytes); + /// Updates the aggregated trace bytes of this query, and throws if exceeds + /// the max query trace bytes limit. + void updateTracedBytesAndCheckLimit(uint64_t bytes); void testingOverrideMemoryPool(std::shared_ptr pool) { pool_ = std::move(pool); diff --git a/velox/docs/develop/debugging/tracing.rst b/velox/docs/develop/debugging/tracing.rst index 1750296503e1..0536546d2a6e 100644 --- a/velox/docs/develop/debugging/tracing.rst +++ b/velox/docs/develop/debugging/tracing.rst @@ -57,7 +57,7 @@ The tracing framework consists of three components: Query Trace Writer ^^^^^^^^^^^^^^^^^^ -**QueryMetadataWriter** records the query metadata during task creation, +**TaskTraceMetadataWriter** records the query metadata during task creation, serializes, and writes them into a file in JSON format. There are two kinds of metadata: @@ -66,20 +66,20 @@ of metadata: - Plan fragment of the task (also known as a plan node tree). It can be serialized as a JSON object, which is already supported in Velox. -**QueryDataWriter** records the input vectors from the target operator, which are +**OperatorTraceWriter** records the input vectors from the target operator, which are serialized and written as a binary file. Query Trace Reader ^^^^^^^^^^^^^^^^^^ -**QueryMetadataReader** can load the query metadata JSON file, and extract the query +**TaskTraceMetadataReader** can load the query metadata JSON file, and extract the query configurations, connector properties, and the plan fragment. **QueryDataReader** can read and deserialize the input vectors of the target operator. It is used as the utility to replay the input data as a source operator in the target operator replay. -**NOTE**: `QueryDataWriter` serializes and flushes the input vectors in batches, +**NOTE**: `OperatorTraceWriter` serializes and flushes the input vectors in batches, allowing it to replay the input process using the same sequence of batches. Query Trace Util diff --git a/velox/exec/CMakeLists.txt b/velox/exec/CMakeLists.txt index 9972f46b3ecb..73ca66635895 100644 --- a/velox/exec/CMakeLists.txt +++ b/velox/exec/CMakeLists.txt @@ -57,13 +57,14 @@ velox_add_library( OrderBy.cpp OutputBuffer.cpp OutputBufferManager.cpp - QueryDataReader.cpp - QueryDataWriter.cpp - QueryMetadataReader.cpp - QueryMetadataWriter.cpp - QueryTraceConfig.cpp - QueryTraceScan.cpp - QueryTraceUtil.cpp + OperatorTraceReader.cpp + OperatorTraceScan.cpp + OperatorTraceWriter.cpp + TaskTraceReader.cpp + TaskTraceWriter.cpp + Trace.cpp + TraceConfig.cpp + TraceUtil.cpp PartitionedOutput.cpp PartitionFunction.cpp PartitionStreamingWindowBuild.cpp diff --git a/velox/exec/Driver.cpp b/velox/exec/Driver.cpp index 56614cf823fb..27fe4275c572 100644 --- a/velox/exec/Driver.cpp +++ b/velox/exec/Driver.cpp @@ -95,8 +95,8 @@ const core::QueryConfig& DriverCtx::queryConfig() const { return task->queryCtx()->queryConfig(); } -const std::optional& DriverCtx::traceConfig() const { - return task->queryTraceConfig(); +const std::optional& DriverCtx::traceConfig() const { + return task->traceConfig(); } velox::memory::MemoryPool* DriverCtx::addOperatorPool( @@ -618,6 +618,7 @@ StopReason Driver::runInternal( lockedStats->addInputVector( resultBytes, intermediateResult->size()); } + nextOp->traceInput(intermediateResult); TestValue::adjust( "facebook::velox::exec::Driver::runInternal::addInput", nextOp); diff --git a/velox/exec/Driver.h b/velox/exec/Driver.h index 2af297dfef11..1605d09e4316 100644 --- a/velox/exec/Driver.h +++ b/velox/exec/Driver.h @@ -27,7 +27,7 @@ #include "velox/common/time/CpuWallTimer.h" #include "velox/core/PlanFragment.h" #include "velox/core/QueryCtx.h" -#include "velox/exec/QueryTraceConfig.h" +#include "velox/exec/TraceConfig.h" namespace facebook::velox::exec { @@ -291,7 +291,7 @@ struct DriverCtx { const core::QueryConfig& queryConfig() const; - const std::optional& traceConfig() const; + const std::optional& traceConfig() const; velox::memory::MemoryPool* addOperatorPool( const core::PlanNodeId& planNodeId, diff --git a/velox/exec/HashAggregation.cpp b/velox/exec/HashAggregation.cpp index b15eab7bf441..ef072d059d2c 100644 --- a/velox/exec/HashAggregation.cpp +++ b/velox/exec/HashAggregation.cpp @@ -117,7 +117,6 @@ bool HashAggregation::abandonPartialAggregationEarly(int64_t numOutput) const { } void HashAggregation::addInput(RowVectorPtr input) { - traceInput(input); if (!pushdownChecked_) { mayPushdown_ = operatorCtx_->driver()->mayPushdownAggregation(this); pushdownChecked_ = true; diff --git a/velox/exec/LocalPlanner.cpp b/velox/exec/LocalPlanner.cpp index e75e489168a4..aec3104be3a2 100644 --- a/velox/exec/LocalPlanner.cpp +++ b/velox/exec/LocalPlanner.cpp @@ -32,9 +32,9 @@ #include "velox/exec/MergeJoin.h" #include "velox/exec/NestedLoopJoinBuild.h" #include "velox/exec/NestedLoopJoinProbe.h" +#include "velox/exec/OperatorTraceScan.h" #include "velox/exec/OrderBy.h" #include "velox/exec/PartitionedOutput.h" -#include "velox/exec/QueryTraceScan.h" #include "velox/exec/RowNumber.h" #include "velox/exec/StreamingAggregation.h" #include "velox/exec/TableScan.h" @@ -595,11 +595,10 @@ std::shared_ptr DriverFactory::createDriver( assignUniqueIdNode->taskUniqueId(), assignUniqueIdNode->uniqueIdCounter())); } else if ( - const auto queryReplayScanNode = - std::dynamic_pointer_cast( - planNode)) { - operators.push_back(std::make_unique( - id, ctx.get(), queryReplayScanNode)); + const auto traceScanNode = + std::dynamic_pointer_cast(planNode)) { + operators.push_back(std::make_unique( + id, ctx.get(), traceScanNode)); } else { std::unique_ptr extended; if (planNode->requiresExchangeClient()) { diff --git a/velox/exec/Operator.cpp b/velox/exec/Operator.cpp index 910e4cec76b2..bb55b790426b 100644 --- a/velox/exec/Operator.cpp +++ b/velox/exec/Operator.cpp @@ -19,10 +19,8 @@ #include "velox/common/base/SuccinctPrinter.h" #include "velox/common/testutil/TestValue.h" #include "velox/exec/Driver.h" -#include "velox/exec/HashJoinBridge.h" #include "velox/exec/OperatorUtils.h" -#include "velox/exec/QueryTraceUtil.h" -#include "velox/exec/Task.h" +#include "velox/exec/TraceUtil.h" #include "velox/expression/Expr.h" using facebook::velox::common::testutil::TestValue; @@ -107,17 +105,13 @@ void Operator::maybeSetReclaimer() { } void Operator::maybeSetTracer() { - const auto& queryTraceConfig = operatorCtx_->driverCtx()->traceConfig(); - if (!queryTraceConfig.has_value()) { - return; - } - - if (operatorCtx_->driverCtx()->queryConfig().queryTraceMaxBytes() == 0) { + const auto& traceConfig = operatorCtx_->driverCtx()->traceConfig(); + if (!traceConfig.has_value()) { return; } const auto nodeId = planNodeId(); - if (queryTraceConfig->queryNodes.count(nodeId) == 0) { + if (traceConfig->queryNodes.count(nodeId) == 0) { return; } @@ -134,20 +128,17 @@ void Operator::maybeSetTracer() { const auto pipelineId = operatorCtx_->driverCtx()->pipelineId; const auto driverId = operatorCtx_->driverCtx()->driverId; - LOG(INFO) << "Trace data for operator type: " << operatorType() + LOG(INFO) << "Trace input for operator type: " << operatorType() << ", operator id: " << operatorId() << ", pipeline: " << pipelineId << ", driver: " << driverId << ", task: " << taskId(); - const auto opTraceDirPath = fmt::format( - "{}/{}/{}/{}/data", - queryTraceConfig->queryTraceDir, - planNodeId(), - pipelineId, - driverId); + const auto opTraceDirPath = trace::getOpTraceDirectory( + traceConfig->queryTraceDir, planNodeId(), pipelineId, driverId); trace::createTraceDirectory(opTraceDirPath); - inputTracer_ = std::make_unique( + inputTracer_ = std::make_unique( + this, opTraceDirPath, memory::traceMemoryPool(), - queryTraceConfig->updateAndCheckTraceLimitCB); + traceConfig->updateAndCheckTraceLimitCB); } void Operator::traceInput(const RowVectorPtr& input) { @@ -319,6 +310,16 @@ OperatorStats Operator::stats(bool clear) { return stats; } +void Operator::close() { + input_ = nullptr; + results_.clear(); + recordSpillStats(); + finishTrace(); + + // Release the unused memory reservation on close. + operatorCtx_->pool()->release(); +} + vector_size_t Operator::outputBatchRows( std::optional averageRowSize) const { const auto& queryConfig = operatorCtx_->task()->queryCtx()->queryConfig(); diff --git a/velox/exec/Operator.h b/velox/exec/Operator.h index 20561b3e788d..2138fa47e3cc 100644 --- a/velox/exec/Operator.h +++ b/velox/exec/Operator.h @@ -21,7 +21,7 @@ #include "velox/core/PlanNode.h" #include "velox/exec/Driver.h" #include "velox/exec/JoinBridge.h" -#include "velox/exec/QueryDataWriter.h" +#include "velox/exec/OperatorTraceWriter.h" #include "velox/exec/Spiller.h" #include "velox/type/Filter.h" @@ -408,7 +408,6 @@ class Operator : public BaseRuntimeStatWriter { /// e.g. the first operator in the pipeline. virtual void noMoreInput() { noMoreInput_ = true; - finishTrace(); } /// Returns a RowVector with the result columns. Returns nullptr if @@ -483,13 +482,7 @@ class Operator : public BaseRuntimeStatWriter { /// Frees all resources associated with 'this'. No other methods /// should be called after this. - virtual void close() { - input_ = nullptr; - results_.clear(); - recordSpillStats(); - // Release the unused memory reservation on close. - operatorCtx_->pool()->release(); - } + virtual void close(); // Returns true if 'this' never has more output rows than input rows. virtual bool isFilter() const { @@ -781,7 +774,7 @@ class Operator : public BaseRuntimeStatWriter { folly::Synchronized stats_; folly::Synchronized spillStats_; - std::unique_ptr inputTracer_; + std::unique_ptr inputTracer_; /// Indicates if an operator is under a non-reclaimable execution section. /// This prevents the memory arbitrator from reclaiming memory from this diff --git a/velox/exec/OperatorTraceReader.cpp b/velox/exec/OperatorTraceReader.cpp new file mode 100644 index 000000000000..4477717b4042 --- /dev/null +++ b/velox/exec/OperatorTraceReader.cpp @@ -0,0 +1,78 @@ +/* + * Copyright (c) Facebook, Inc. and its affiliates. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include + +#include "velox/exec/OperatorTraceReader.h" + +#include "velox/exec/TraceUtil.h" + +namespace facebook::velox::exec::trace { + +OperatorTraceInputReader::OperatorTraceInputReader( + std::string traceDir, + RowTypePtr dataType, + memory::MemoryPool* pool) + : traceDir_(std::move(traceDir)), + fs_(filesystems::getFileSystem(traceDir_, nullptr)), + dataType_(std::move(dataType)), + pool_(pool), + inputStream_(getInputStream()) { + VELOX_CHECK_NOT_NULL(dataType_); + VELOX_CHECK_NOT_NULL(inputStream_); +} + +bool OperatorTraceInputReader::read(RowVectorPtr& batch) const { + if (inputStream_->atEnd()) { + batch = nullptr; + return false; + } + + VectorStreamGroup::read( + inputStream_.get(), pool_, dataType_, &batch, &readOptions_); + return true; +} + +std::unique_ptr +OperatorTraceInputReader::getInputStream() const { + auto traceFile = fs_->openFileForRead(getOpTraceInputFilePath(traceDir_)); + // TODO: Make the buffer size configurable. + return std::make_unique( + std::move(traceFile), 1 << 20, pool_); +} + +OperatorTraceSummaryReader::OperatorTraceSummaryReader( + std::string traceDir, + memory::MemoryPool* pool) + : traceDir_(std::move(traceDir)), + fs_(filesystems::getFileSystem(traceDir_, nullptr)), + pool_(pool), + summaryFile_(fs_->openFileForRead(getOpTraceSummaryFilePath(traceDir_))) { +} + +OperatorTraceSummary OperatorTraceSummaryReader::read() const { + VELOX_CHECK_NOT_NULL(summaryFile_); + const auto summaryStr = summaryFile_->pread(0, summaryFile_->size()); + VELOX_CHECK(!summaryStr.empty()); + + folly::dynamic summaryObj = folly::parseJson(summaryStr); + OperatorTraceSummary summary; + summary.opType = summaryObj[OperatorTraceTraits::kOpTypeKey].asString(); + summary.peakMemory = summaryObj[OperatorTraceTraits::kPeakMemoryKey].asInt(); + summary.inputRows = summaryObj[OperatorTraceTraits::kInputRowsKey].asInt(); + return summary; +} +} // namespace facebook::velox::exec::trace diff --git a/velox/exec/QueryDataReader.h b/velox/exec/OperatorTraceReader.h similarity index 62% rename from velox/exec/QueryDataReader.h rename to velox/exec/OperatorTraceReader.h index b5e6d24e011a..2236d73fe2f7 100644 --- a/velox/exec/QueryDataReader.h +++ b/velox/exec/OperatorTraceReader.h @@ -18,16 +18,16 @@ #include "velox/common/file/FileInputStream.h" #include "velox/common/file/FileSystems.h" -#include "velox/core/PlanNode.h" -#include "velox/core/QueryCtx.h" +#include "velox/exec/Trace.h" #include "velox/serializers/PrestoSerializer.h" -#include "velox/vector/VectorStream.h" namespace facebook::velox::exec::trace { -class QueryDataReader { +/// Used to read an operator trace input. +class OperatorTraceInputReader { public: - explicit QueryDataReader( + /// 'traceDir' specifies the operator trace directory. + OperatorTraceInputReader( std::string traceDir, RowTypePtr dataType, memory::MemoryPool* pool); @@ -37,16 +37,33 @@ class QueryDataReader { bool read(RowVectorPtr& batch) const; private: - std::unique_ptr getDataInputStream() const; + std::unique_ptr getInputStream() const; const std::string traceDir_; const serializer::presto::PrestoVectorSerde::PrestoOptions readOptions_{ true, common::CompressionKind_ZSTD, // TODO: Use trace config. - /*nullsFirst=*/true}; + /*_nullsFirst=*/true}; const std::shared_ptr fs_; const RowTypePtr dataType_; memory::MemoryPool* const pool_; - const std::unique_ptr dataStream_; + const std::unique_ptr inputStream_; +}; + +/// Used to read an operator trace summary. +class OperatorTraceSummaryReader { + public: + /// 'traceDir' specifies the operator trace directory. + OperatorTraceSummaryReader(std::string traceDir, memory::MemoryPool* pool); + + /// Read and return the operator trace 'summary'. The function throws if it + /// fails. + OperatorTraceSummary read() const; + + private: + const std::string traceDir_; + const std::shared_ptr fs_; + memory::MemoryPool* const pool_; + const std::unique_ptr summaryFile_; }; } // namespace facebook::velox::exec::trace diff --git a/velox/exec/QueryTraceScan.cpp b/velox/exec/OperatorTraceScan.cpp similarity index 62% rename from velox/exec/QueryTraceScan.cpp rename to velox/exec/OperatorTraceScan.cpp index 5cc4f0b5c223..a976367afa74 100644 --- a/velox/exec/QueryTraceScan.cpp +++ b/velox/exec/OperatorTraceScan.cpp @@ -14,33 +14,32 @@ * limitations under the License. */ -#include "velox/exec/QueryTraceScan.h" +#include "velox/exec/OperatorTraceScan.h" -#include "QueryTraceUtil.h" +#include "velox/exec/TraceUtil.h" namespace facebook::velox::exec::trace { -QueryTraceScan::QueryTraceScan( +OperatorTraceScan::OperatorTraceScan( int32_t operatorId, DriverCtx* driverCtx, - const std::shared_ptr& queryTraceScanNode) + const std::shared_ptr& traceScanNode) : SourceOperator( driverCtx, - queryTraceScanNode->outputType(), + traceScanNode->outputType(), operatorId, - queryTraceScanNode->id(), - "QueryReplayScan") { - const auto dataDir = getDataDir( - queryTraceScanNode->traceDir(), - driverCtx->pipelineId, - driverCtx->driverId); - traceReader_ = std::make_unique( - dataDir, - queryTraceScanNode->outputType(), + traceScanNode->id(), + "OperatorTraceScan") { + traceReader_ = std::make_unique( + getOpTraceDirectory( + traceScanNode->traceDir(), + traceScanNode->pipelineId(), + driverCtx->driverId), + traceScanNode->outputType(), memory::MemoryManager::getInstance()->tracePool()); } -RowVectorPtr QueryTraceScan::getOutput() { +RowVectorPtr OperatorTraceScan::getOutput() { RowVectorPtr batch; if (traceReader_->read(batch)) { return batch; @@ -49,7 +48,7 @@ RowVectorPtr QueryTraceScan::getOutput() { return nullptr; } -bool QueryTraceScan::isFinished() { +bool OperatorTraceScan::isFinished() { return finished_; } diff --git a/velox/exec/QueryTraceScan.h b/velox/exec/OperatorTraceScan.h similarity index 88% rename from velox/exec/QueryTraceScan.h rename to velox/exec/OperatorTraceScan.h index 8bff5bca486d..1542e43f2c41 100644 --- a/velox/exec/QueryTraceScan.h +++ b/velox/exec/OperatorTraceScan.h @@ -18,7 +18,7 @@ #include "velox/core/PlanNode.h" #include "velox/exec/Operator.h" -#include "velox/exec/QueryDataReader.h" +#include "velox/exec/OperatorTraceReader.h" namespace facebook::velox::exec::trace { /// This is a scan operator for query replay. It uses traced data from a @@ -38,13 +38,12 @@ namespace facebook::velox::exec::trace { /// It can be found from the QueryReplayScanNode. However the pipeline ID and /// driver ID are only known during operator creation, so we need to figure out /// the input traced data file and the output type dynamically. -class QueryTraceScan final : public SourceOperator { +class OperatorTraceScan final : public SourceOperator { public: - QueryTraceScan( + OperatorTraceScan( int32_t operatorId, DriverCtx* driverCtx, - const std::shared_ptr& - queryTraceScanNode); + const std::shared_ptr& traceScanNode); RowVectorPtr getOutput() override; @@ -55,7 +54,7 @@ class QueryTraceScan final : public SourceOperator { bool isFinished() override; private: - std::unique_ptr traceReader_; + std::unique_ptr traceReader_; bool finished_{false}; }; diff --git a/velox/exec/QueryDataWriter.cpp b/velox/exec/OperatorTraceWriter.cpp similarity index 59% rename from velox/exec/QueryDataWriter.cpp rename to velox/exec/OperatorTraceWriter.cpp index 7ab4e6d02ec0..75f2a5609c01 100644 --- a/velox/exec/QueryDataWriter.cpp +++ b/velox/exec/OperatorTraceWriter.cpp @@ -14,31 +14,32 @@ * limitations under the License. */ -#include "velox/exec/QueryDataWriter.h" +#include "velox/exec/OperatorTraceWriter.h" #include -#include "velox/common/base/SpillStats.h" #include "velox/common/file/File.h" #include "velox/common/file/FileSystems.h" -#include "velox/exec/QueryTraceTraits.h" -#include "velox/serializers/PrestoSerializer.h" +#include "velox/exec/Operator.h" +#include "velox/exec/Trace.h" +#include "velox/exec/TraceUtil.h" namespace facebook::velox::exec::trace { -QueryDataWriter::QueryDataWriter( - std::string path, +OperatorTraceWriter::OperatorTraceWriter( + Operator* traceOp, + std::string traceDir, memory::MemoryPool* pool, UpdateAndCheckTraceLimitCB updateAndCheckTraceLimitCB) - : dirPath_(std::move(path)), - fs_(filesystems::getFileSystem(dirPath_, nullptr)), + : traceOp_(traceOp), + traceDir_(std::move(traceDir)), + fs_(filesystems::getFileSystem(traceDir_, nullptr)), pool_(pool), updateAndCheckTraceLimitCB_(std::move(updateAndCheckTraceLimitCB)) { - dataFile_ = fs_->openFileForWrite( - fmt::format("{}/{}", dirPath_, QueryTraceTraits::kDataFileName)); - VELOX_CHECK_NOT_NULL(dataFile_); + traceFile_ = fs_->openFileForWrite(getOpTraceInputFilePath(traceDir_)); + VELOX_CHECK_NOT_NULL(traceFile_); } -void QueryDataWriter::write(const RowVectorPtr& rows) { +void OperatorTraceWriter::write(const RowVectorPtr& rows) { if (FOLLY_UNLIKELY(finished_)) { return; } @@ -61,37 +62,36 @@ void QueryDataWriter::write(const RowVectorPtr& rows) { batch_->flush(&out); batch_->clear(); auto iobuf = out.getIOBuf(); - if (FOLLY_UNLIKELY( - updateAndCheckTraceLimitCB_(iobuf->computeChainDataLength()))) { - finish(true); - return; - } - dataFile_->append(std::move(iobuf)); + updateAndCheckTraceLimitCB_(iobuf->computeChainDataLength()); + traceFile_->append(std::move(iobuf)); } -void QueryDataWriter::finish(bool limitExceeded) { +void OperatorTraceWriter::finish() { if (finished_) { return; } VELOX_CHECK_NOT_NULL( - dataFile_, "The query data writer has already been finished"); - dataFile_->close(); - dataFile_.reset(); + traceFile_, "The query data writer has already been finished"); + traceFile_->close(); + traceFile_.reset(); batch_.reset(); - writeSummary(limitExceeded); + writeSummary(); finished_ = true; } -void QueryDataWriter::writeSummary(bool limitExceeded) const { - const auto summaryFilePath = - fmt::format("{}/{}", dirPath_, QueryTraceTraits::kDataSummaryFileName); +void OperatorTraceWriter::writeSummary() const { + const auto summaryFilePath = getOpTraceSummaryFilePath(traceDir_); const auto file = fs_->openFileForWrite(summaryFilePath); folly::dynamic obj = folly::dynamic::object; if (dataType_ != nullptr) { - obj[QueryTraceTraits::kDataTypeKey] = dataType_->serialize(); + obj[TraceTraits::kDataTypeKey] = dataType_->serialize(); } - obj[QueryTraceTraits::kTraceLimitExceededKey] = limitExceeded; + obj[OperatorTraceTraits::kOpTypeKey] = traceOp_->operatorType(); + const auto stats = traceOp_->stats(/*clear=*/false); + obj[OperatorTraceTraits::kPeakMemoryKey] = + stats.memoryStats.peakTotalMemoryReservation; + obj[OperatorTraceTraits::kInputRowsKey] = stats.inputPositions; file->append(folly::toJson(obj)); file->close(); } diff --git a/velox/exec/QueryDataWriter.h b/velox/exec/OperatorTraceWriter.h similarity index 79% rename from velox/exec/QueryDataWriter.h rename to velox/exec/OperatorTraceWriter.h index 8e6073dcd3b7..9999f2c31dbd 100644 --- a/velox/exec/QueryDataWriter.h +++ b/velox/exec/OperatorTraceWriter.h @@ -16,21 +16,27 @@ #pragma once -#include "QueryTraceConfig.h" +#include "TraceConfig.h" #include "velox/common/file/File.h" #include "velox/common/file/FileSystems.h" -#include "velox/exec/QueryTraceTraits.h" #include "velox/serializers/PrestoSerializer.h" #include "velox/vector/VectorStream.h" +namespace facebook::velox::exec { +class Operator; +} + namespace facebook::velox::exec::trace { /// Used to serialize and write the input vectors from a given operator into a /// file. -class QueryDataWriter { +class OperatorTraceWriter { public: - explicit QueryDataWriter( - std::string path, + /// 'traceOp' is the operator to trace. 'traceDir' specifies the trace + /// directory for the operator. + explicit OperatorTraceWriter( + Operator* traceOp, + std::string traceDir, memory::MemoryPool* pool, UpdateAndCheckTraceLimitCB updateAndCheckTraceLimitCB); @@ -38,18 +44,16 @@ class QueryDataWriter { void write(const RowVectorPtr& rows); /// Closes the data file and writes out the data summary. - /// - /// @param limitExceeded A flag indicates the written data bytes exceed the - /// limit causing the 'QueryDataWriter' to finish early. - void finish(bool limitExceeded = false); + void finish(); private: // Flushes the trace data summaries to the disk. // // TODO: add more summaries such as number of rows etc. - void writeSummary(bool limitExceeded = false) const; + void writeSummary() const; - const std::string dirPath_; + Operator* const traceOp_; + const std::string traceDir_; // TODO: make 'useLosslessTimestamp' configuerable. const serializer::presto::PrestoVectorSerde::PrestoOptions options_ = { true, @@ -58,9 +62,11 @@ class QueryDataWriter { const std::shared_ptr fs_; memory::MemoryPool* const pool_; const UpdateAndCheckTraceLimitCB updateAndCheckTraceLimitCB_; - std::unique_ptr dataFile_; + + std::unique_ptr traceFile_; TypePtr dataType_; std::unique_ptr batch_; + bool limitExceeded_{false}; bool finished_{false}; }; diff --git a/velox/exec/PartitionedOutput.cpp b/velox/exec/PartitionedOutput.cpp index cabff0661146..6b49fe22efa8 100644 --- a/velox/exec/PartitionedOutput.cpp +++ b/velox/exec/PartitionedOutput.cpp @@ -233,8 +233,6 @@ void PartitionedOutput::estimateRowSizes() { } void PartitionedOutput::addInput(RowVectorPtr input) { - traceInput(input); - initializeInput(std::move(input)); initializeDestinations(); initializeSizeBuffers(); diff --git a/velox/exec/QueryDataReader.cpp b/velox/exec/QueryDataReader.cpp deleted file mode 100644 index ba08205dd8f2..000000000000 --- a/velox/exec/QueryDataReader.cpp +++ /dev/null @@ -1,59 +0,0 @@ -/* - * Copyright (c) Facebook, Inc. and its affiliates. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -#include - -#include "velox/exec/QueryDataReader.h" - -#include "velox/common/file/File.h" -#include "velox/exec/QueryTraceTraits.h" - -namespace facebook::velox::exec::trace { - -QueryDataReader::QueryDataReader( - std::string traceDir, - RowTypePtr dataType, - memory::MemoryPool* pool) - : traceDir_(std::move(traceDir)), - fs_(filesystems::getFileSystem(traceDir_, nullptr)), - dataType_(std::move(dataType)), - pool_(pool), - dataStream_(getDataInputStream()) { - VELOX_CHECK_NOT_NULL(dataType_); - VELOX_CHECK_NOT_NULL(dataStream_); -} - -bool QueryDataReader::read(RowVectorPtr& batch) const { - if (dataStream_->atEnd()) { - batch = nullptr; - return false; - } - - VectorStreamGroup::read( - dataStream_.get(), pool_, dataType_, &batch, &readOptions_); - return true; -} - -std::unique_ptr QueryDataReader::getDataInputStream() - const { - auto dataFile = fs_->openFileForRead( - fmt::format("{}/{}", traceDir_, QueryTraceTraits::kDataFileName)); - // TODO: Make the buffer size configurable. - return std::make_unique( - std::move(dataFile), 1 << 20, pool_); -} - -} // namespace facebook::velox::exec::trace diff --git a/velox/exec/QueryTraceUtil.cpp b/velox/exec/QueryTraceUtil.cpp deleted file mode 100644 index f3a339eec080..000000000000 --- a/velox/exec/QueryTraceUtil.cpp +++ /dev/null @@ -1,111 +0,0 @@ -/* - * Copyright (c) Facebook, Inc. and its affiliates. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -#include "velox/exec/QueryTraceUtil.h" - -#include - -#include "velox/common/base/Exceptions.h" -#include "velox/common/file/File.h" -#include "velox/common/file/FileSystems.h" -#include "velox/exec/QueryTraceTraits.h" - -namespace facebook::velox::exec::trace { - -void createTraceDirectory(const std::string& traceDir) { - try { - const auto fs = filesystems::getFileSystem(traceDir, nullptr); - if (fs->exists(traceDir)) { - fs->rmdir(traceDir); - } - fs->mkdir(traceDir); - } catch (const std::exception& e) { - VELOX_FAIL( - "Failed to create trace directory '{}' with error: {}", - traceDir, - e.what()); - } -} - -std::vector getTaskIds( - const std::string& traceDir, - const std::shared_ptr& fs) { - VELOX_USER_CHECK(fs->exists(traceDir), "{} dose not exist", traceDir); - try { - const auto taskDirs = fs->list(traceDir); - std::vector taskIds; - for (const auto& taskDir : taskDirs) { - std::vector pathNodes; - folly::split("/", taskDir, pathNodes); - taskIds.emplace_back(std::move(pathNodes.back())); - } - return taskIds; - } catch (const std::exception& e) { - VELOX_FAIL( - "Failed to list the directory '{}' with error: {}", traceDir, e.what()); - } -} - -folly::dynamic getMetadata( - const std::string& metadataFile, - const std::shared_ptr& fs) { - try { - const auto file = fs->openFileForRead(metadataFile); - VELOX_CHECK_NOT_NULL(file); - const auto metadata = file->pread(0, file->size()); - VELOX_USER_CHECK(!metadata.empty()); - return folly::parseJson(metadata); - } catch (const std::exception& e) { - VELOX_FAIL( - "Failed to get the query metadata from '{}' with error: {}", - metadataFile, - e.what()); - } -} - -RowTypePtr getDataType( - const core::PlanNodePtr& tracedPlan, - const std::string& tracedNodeId, - size_t sourceIndex) { - const auto* traceNode = core::PlanNode::findFirstNode( - tracedPlan.get(), [&tracedNodeId](const core::PlanNode* node) { - return node->id() == tracedNodeId; - }); - VELOX_CHECK_NOT_NULL( - traceNode, - "traced node id {} not found in the traced plan", - tracedNodeId); - return traceNode->sources().at(sourceIndex)->outputType(); -} - -uint8_t getNumDrivers( - const std::string& rootDir, - const std::string& taskId, - const std::string& nodeId, - int32_t pipelineId, - const std::shared_ptr& fs) { - const auto traceDir = - fmt::format("{}/{}/{}/{}", rootDir, taskId, nodeId, pipelineId); - const auto driverDirs = fs->list(traceDir); - return driverDirs.size(); -} - -std::string -getDataDir(const std::string& traceDir, int pipelineId, int driverId) { - return fmt::format("{}/{}/{}/data", traceDir, pipelineId, driverId); -} - -} // namespace facebook::velox::exec::trace diff --git a/velox/exec/SortWindowBuild.h b/velox/exec/SortWindowBuild.h index aa364846ba51..45e7b3390cb1 100644 --- a/velox/exec/SortWindowBuild.h +++ b/velox/exec/SortWindowBuild.h @@ -35,7 +35,7 @@ class SortWindowBuild : public WindowBuild { tsan_atomic* nonReclaimableSection, folly::Synchronized* spillStats); - ~SortWindowBuild() { + ~SortWindowBuild() override { pool_->release(); } diff --git a/velox/exec/TableWriter.cpp b/velox/exec/TableWriter.cpp index 3d289621f38a..4899e4b4ebf8 100644 --- a/velox/exec/TableWriter.cpp +++ b/velox/exec/TableWriter.cpp @@ -126,7 +126,7 @@ void TableWriter::addInput(RowVectorPtr input) { if (input->size() == 0) { return; } - traceInput(input); + std::vector mappedChildren; mappedChildren.reserve(inputMapping_.size()); for (const auto i : inputMapping_) { diff --git a/velox/exec/Task.cpp b/velox/exec/Task.cpp index 22e68d6c475d..0bce65f91d14 100644 --- a/velox/exec/Task.cpp +++ b/velox/exec/Task.cpp @@ -24,14 +24,14 @@ #include "velox/common/testutil/TestValue.h" #include "velox/common/time/Timer.h" #include "velox/exec/Exchange.h" -#include "velox/exec/HashBuild.h" +#include "velox/exec/HashJoinBridge.h" #include "velox/exec/LocalPlanner.h" #include "velox/exec/MemoryReclaimer.h" #include "velox/exec/NestedLoopJoinBuild.h" #include "velox/exec/OperatorUtils.h" #include "velox/exec/OutputBufferManager.h" -#include "velox/exec/QueryTraceUtil.h" #include "velox/exec/Task.h" +#include "velox/exec/TraceUtil.h" using facebook::velox::common::testutil::TestValue; @@ -304,7 +304,7 @@ Task::Task( dynamic_cast(queryCtx_->executor())); } - maybeInitQueryTrace(); + maybeInitTrace(); } Task::~Task() { @@ -2888,7 +2888,7 @@ std::shared_ptr Task::getExchangeClientLocked( return exchangeClients_[pipelineId]; } -std::optional Task::maybeMakeTraceConfig() const { +std::optional Task::maybeMakeTraceConfig() const { const auto& queryConfig = queryCtx_->queryConfig(); if (!queryConfig.queryTraceEnabled()) { return std::nullopt; @@ -2906,41 +2906,62 @@ std::optional Task::maybeMakeTraceConfig() const { return std::nullopt; } - const auto traceDir = - fmt::format("{}/{}", queryConfig.queryTraceDir(), taskId_); - const auto queryTraceNodes = queryConfig.queryTraceNodeIds(); - if (queryTraceNodes.empty()) { - LOG(INFO) << "Trace metadata for task: " << taskId_; - return trace::QueryTraceConfig(traceDir); + const auto traceNodes = queryConfig.queryTraceNodeIds(); + VELOX_USER_CHECK(!traceNodes.empty(), "Query trace nodes are not set"); + + const auto traceDir = trace::getTaskTraceDirectory( + queryConfig.queryTraceDir(), queryCtx_->queryId(), taskId_); + + std::vector traceNodeIds; + folly::split(',', traceNodes, traceNodeIds); + std::unordered_set traceNodeIdSet( + traceNodeIds.begin(), traceNodeIds.end()); + VELOX_USER_CHECK_EQ( + traceNodeIdSet.size(), + traceNodeIds.size(), + "Duplicate trace nodes found: {}", + folly::join(", ", traceNodeIds)); + + bool foundTraceNode{false}; + for (const auto& traceNodeId : traceNodeIds) { + if (core::PlanNode::findFirstNode( + planFragment_.planNode.get(), + [traceNodeId](const core::PlanNode* node) -> bool { + return node->id() == traceNodeId; + })) { + foundTraceNode = true; + break; + } } + VELOX_USER_CHECK( + foundTraceNode, + "Trace plan nodes not found from task {}: {}", + taskId_, + folly::join(",", traceNodeIdSet)); - std::vector nodes; - folly::split(',', queryTraceNodes, nodes); - std::unordered_set nodeSet(nodes.begin(), nodes.end()); - VELOX_CHECK_EQ(nodeSet.size(), nodes.size()); - LOG(INFO) << "Trace data for task " << taskId_ << " with plan nodes " - << queryTraceNodes; + LOG(INFO) << "Trace input for plan nodes " << traceNodes << " from task " + << taskId_; trace::UpdateAndCheckTraceLimitCB updateAndCheckTraceLimitCB = [this](uint64_t bytes) { - return queryCtx_->updateTracedBytesAndCheckLimit(bytes); + queryCtx_->updateTracedBytesAndCheckLimit(bytes); }; - return trace::QueryTraceConfig( - std::move(nodeSet), + return trace::TraceConfig( + std::move(traceNodeIdSet), traceDir, std::move(updateAndCheckTraceLimitCB), queryConfig.queryTraceTaskRegExp()); } -void Task::maybeInitQueryTrace() { +void Task::maybeInitTrace() { if (!traceConfig_) { return; } trace::createTraceDirectory(traceConfig_->queryTraceDir); - const auto queryMetadatWriter = std::make_unique( + const auto metadataWriter = std::make_unique( traceConfig_->queryTraceDir, memory::traceMemoryPool()); - queryMetadatWriter->write(queryCtx_, planFragment_.planNode); + metadataWriter->write(queryCtx_, planFragment_.planNode); } void Task::testingVisitDrivers(const std::function& callback) { diff --git a/velox/exec/Task.h b/velox/exec/Task.h index 526ab710400c..7f421952ce51 100644 --- a/velox/exec/Task.h +++ b/velox/exec/Task.h @@ -21,11 +21,11 @@ #include "velox/exec/LocalPartition.h" #include "velox/exec/MemoryReclaimer.h" #include "velox/exec/MergeSource.h" -#include "velox/exec/QueryMetadataWriter.h" -#include "velox/exec/QueryTraceConfig.h" #include "velox/exec/Split.h" #include "velox/exec/TaskStats.h" #include "velox/exec/TaskStructs.h" +#include "velox/exec/TaskTraceWriter.h" +#include "velox/exec/TraceConfig.h" #include "velox/vector/ComplexVector.h" namespace facebook::velox::exec { @@ -139,7 +139,7 @@ class Task : public std::enable_shared_from_this { } /// Returns query trace config if specified. - const std::optional& queryTraceConfig() const { + const std::optional& traceConfig() const { return traceConfig_; } @@ -980,11 +980,11 @@ class Task : public std::enable_shared_from_this { int32_t pipelineId) const; // Builds the query trace config. - std::optional maybeMakeTraceConfig() const; + std::optional maybeMakeTraceConfig() const; // Create a 'QueryMetadtaWriter' to trace the query metadata if the query // trace enabled. - void maybeInitQueryTrace(); + void maybeInitTrace(); // Universally unique identifier of the task. Used to identify the task when // calling TaskListener. @@ -1004,7 +1004,7 @@ class Task : public std::enable_shared_from_this { core::PlanFragment planFragment_; - const std::optional traceConfig_; + const std::optional traceConfig_; // Hook in the system wide task list. TaskListEntry taskListEntry_; diff --git a/velox/exec/QueryMetadataReader.cpp b/velox/exec/TaskTraceReader.cpp similarity index 73% rename from velox/exec/QueryMetadataReader.cpp rename to velox/exec/TaskTraceReader.cpp index f9fdc9ec2e80..6ed295190f06 100644 --- a/velox/exec/QueryMetadataReader.cpp +++ b/velox/exec/TaskTraceReader.cpp @@ -14,44 +14,40 @@ * limitations under the License. */ -#include "velox/exec/QueryMetadataReader.h" +#include "velox/exec/TaskTraceReader.h" -#include "velox/common/file/File.h" #include "velox/common/file/FileSystems.h" #include "velox/core/PlanNode.h" -#include "velox/exec/QueryTraceTraits.h" -#include "velox/exec/QueryTraceUtil.h" +#include "velox/exec/Trace.h" +#include "velox/exec/TraceUtil.h" namespace facebook::velox::exec::trace { -QueryMetadataReader::QueryMetadataReader( +TaskTraceMetadataReader::TaskTraceMetadataReader( std::string traceDir, memory::MemoryPool* pool) : traceDir_(std::move(traceDir)), fs_(filesystems::getFileSystem(traceDir_, nullptr)), - metaFilePath_(fmt::format( - "{}/{}", - traceDir_, - QueryTraceTraits::kQueryMetaFileName)), + traceFilePath_(getTaskTraceMetaFilePath(traceDir_)), pool_(pool) { VELOX_CHECK_NOT_NULL(fs_); - VELOX_CHECK(fs_->exists(metaFilePath_)); + VELOX_CHECK(fs_->exists(traceFilePath_)); } -void QueryMetadataReader::read( +void TaskTraceMetadataReader::read( std::unordered_map& queryConfigs, std::unordered_map< std::string, std::unordered_map>& connectorProperties, core::PlanNodePtr& queryPlan) const { - folly::dynamic metaObj = getMetadata(metaFilePath_, fs_); - const auto& queryConfigObj = metaObj[QueryTraceTraits::kQueryConfigKey]; + folly::dynamic metaObj = getTaskMetadata(traceFilePath_, fs_); + const auto& queryConfigObj = metaObj[TraceTraits::kQueryConfigKey]; for (const auto& [key, value] : queryConfigObj.items()) { queryConfigs[key.asString()] = value.asString(); } const auto& connectorPropertiesObj = - metaObj[QueryTraceTraits::kConnectorPropertiesKey]; + metaObj[TraceTraits::kConnectorPropertiesKey]; for (const auto& [connectorId, configs] : connectorPropertiesObj.items()) { const auto connectorIdStr = connectorId.asString(); connectorProperties[connectorIdStr] = {}; @@ -61,6 +57,6 @@ void QueryMetadataReader::read( } queryPlan = ISerializable::deserialize( - metaObj[QueryTraceTraits::kPlanNodeKey], pool_); + metaObj[TraceTraits::kPlanNodeKey], pool_); } } // namespace facebook::velox::exec::trace diff --git a/velox/exec/QueryMetadataReader.h b/velox/exec/TaskTraceReader.h similarity index 84% rename from velox/exec/QueryMetadataReader.h rename to velox/exec/TaskTraceReader.h index 71217653d4dc..df3abcffc3b6 100644 --- a/velox/exec/QueryMetadataReader.h +++ b/velox/exec/TaskTraceReader.h @@ -18,13 +18,11 @@ #include "velox/common/file/FileSystems.h" #include "velox/core/PlanNode.h" -#include "velox/core/QueryCtx.h" -#include "velox/vector/VectorStream.h" namespace facebook::velox::exec::trace { -class QueryMetadataReader { +class TaskTraceMetadataReader { public: - explicit QueryMetadataReader(std::string traceDir, memory::MemoryPool* pool); + TaskTraceMetadataReader(std::string traceDir, memory::MemoryPool* pool); void read( std::unordered_map& queryConfigs, @@ -36,7 +34,7 @@ class QueryMetadataReader { private: const std::string traceDir_; const std::shared_ptr fs_; - const std::string metaFilePath_; + const std::string traceFilePath_; memory::MemoryPool* const pool_; }; } // namespace facebook::velox::exec::trace diff --git a/velox/exec/QueryMetadataWriter.cpp b/velox/exec/TaskTraceWriter.cpp similarity index 75% rename from velox/exec/QueryMetadataWriter.cpp rename to velox/exec/TaskTraceWriter.cpp index a8bbe6633c66..cbf0413fd0b4 100644 --- a/velox/exec/QueryMetadataWriter.cpp +++ b/velox/exec/TaskTraceWriter.cpp @@ -14,30 +14,27 @@ * limitations under the License. */ -#include "velox/exec/QueryMetadataWriter.h" -#include "velox/common/config/Config.h" +#include "velox/exec/TaskTraceWriter.h" #include "velox/common/file/File.h" #include "velox/core/PlanNode.h" #include "velox/core/QueryCtx.h" -#include "velox/exec/QueryTraceTraits.h" +#include "velox/exec/Trace.h" +#include "velox/exec/TraceUtil.h" namespace facebook::velox::exec::trace { -QueryMetadataWriter::QueryMetadataWriter( +TaskTraceMetadataWriter::TaskTraceMetadataWriter( std::string traceDir, memory::MemoryPool* pool) : traceDir_(std::move(traceDir)), fs_(filesystems::getFileSystem(traceDir_, nullptr)), - metaFilePath_(fmt::format( - "{}/{}", - traceDir_, - QueryTraceTraits::kQueryMetaFileName)), + traceFilePath_(getTaskTraceMetaFilePath(traceDir_)), pool_(pool) { VELOX_CHECK_NOT_NULL(fs_); - VELOX_CHECK(!fs_->exists(metaFilePath_)); + VELOX_CHECK(!fs_->exists(traceFilePath_)); } -void QueryMetadataWriter::write( +void TaskTraceMetadataWriter::write( const std::shared_ptr& queryCtx, const core::PlanNodePtr& planNode) { VELOX_CHECK(!finished_, "Query metadata can only be written once"); @@ -59,12 +56,12 @@ void QueryMetadataWriter::write( } folly::dynamic metaObj = folly::dynamic::object; - metaObj[QueryTraceTraits::kQueryConfigKey] = queryConfigObj; - metaObj[QueryTraceTraits::kConnectorPropertiesKey] = connectorPropertiesObj; - metaObj[QueryTraceTraits::kPlanNodeKey] = planNode->serialize(); + metaObj[TraceTraits::kQueryConfigKey] = queryConfigObj; + metaObj[TraceTraits::kConnectorPropertiesKey] = connectorPropertiesObj; + metaObj[TraceTraits::kPlanNodeKey] = planNode->serialize(); const auto metaStr = folly::toJson(metaObj); - const auto file = fs_->openFileForWrite(metaFilePath_); + const auto file = fs_->openFileForWrite(traceFilePath_); file->append(metaStr); file->close(); } diff --git a/velox/exec/QueryMetadataWriter.h b/velox/exec/TaskTraceWriter.h similarity index 85% rename from velox/exec/QueryMetadataWriter.h rename to velox/exec/TaskTraceWriter.h index f2cc66123851..5b3105104dd6 100644 --- a/velox/exec/QueryMetadataWriter.h +++ b/velox/exec/TaskTraceWriter.h @@ -19,12 +19,11 @@ #include "velox/common/file/FileSystems.h" #include "velox/core/PlanNode.h" #include "velox/core/QueryCtx.h" -#include "velox/vector/VectorStream.h" namespace facebook::velox::exec::trace { -class QueryMetadataWriter { +class TaskTraceMetadataWriter { public: - explicit QueryMetadataWriter(std::string traceDir, memory::MemoryPool* pool); + TaskTraceMetadataWriter(std::string traceDir, memory::MemoryPool* pool); void write( const std::shared_ptr& queryCtx, @@ -33,7 +32,7 @@ class QueryMetadataWriter { private: const std::string traceDir_; const std::shared_ptr fs_; - const std::string metaFilePath_; + const std::string traceFilePath_; memory::MemoryPool* const pool_; bool finished_{false}; }; diff --git a/velox/exec/QueryTraceTraits.h b/velox/exec/Trace.cpp similarity index 50% rename from velox/exec/QueryTraceTraits.h rename to velox/exec/Trace.cpp index ad817115b490..b4d224fe4daf 100644 --- a/velox/exec/QueryTraceTraits.h +++ b/velox/exec/Trace.cpp @@ -14,22 +14,19 @@ * limitations under the License. */ -#pragma once +#include "velox/exec/Trace.h" -#include +#include + +#include "velox/common/base/SuccinctPrinter.h" namespace facebook::velox::exec::trace { -/// Defines the shared constants used by query trace implementation. -struct QueryTraceTraits { - static inline const std::string kPlanNodeKey = "planNode"; - static inline const std::string kQueryConfigKey = "queryConfig"; - static inline const std::string kDataTypeKey = "rowType"; - static inline const std::string kConnectorPropertiesKey = - "connectorProperties"; - static inline const std::string kTraceLimitExceededKey = "traceLimitExceeded"; - static inline const std::string kQueryMetaFileName = "query_meta.json"; - static inline const std::string kDataSummaryFileName = "data_summary.json"; - static inline const std::string kDataFileName = "trace.data"; -}; +std::string OperatorTraceSummary::toString() const { + return fmt::format( + "opType {}, inputRows {}, peakMemory {}", + opType, + inputRows, + succinctBytes(peakMemory)); +} } // namespace facebook::velox::exec::trace diff --git a/velox/exec/Trace.h b/velox/exec/Trace.h new file mode 100644 index 000000000000..6bf7c6103b84 --- /dev/null +++ b/velox/exec/Trace.h @@ -0,0 +1,60 @@ +/* + * Copyright (c) Facebook, Inc. and its affiliates. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#pragma once + +#include + +namespace facebook::velox::exec::trace { +/// Defines the shared constants used by query trace implementation. +struct TraceTraits { + static inline const std::string kPlanNodeKey = "planNode"; + static inline const std::string kQueryConfigKey = "queryConfig"; + static inline const std::string kDataTypeKey = "rowType"; + static inline const std::string kConnectorPropertiesKey = + "connectorProperties"; + + static inline const std::string kTaskMetaFileName = "task_trace_meta.json"; +}; + +struct OperatorTraceTraits { + static inline const std::string kSummaryFileName = "op_trace_summary.json"; + static inline const std::string kInputFileName = "op_input_trace.data"; + + /// Keys for operator trace summary file. + static inline const std::string kOpTypeKey = "opType"; + static inline const std::string kPeakMemoryKey = "peakMemory"; + static inline const std::string kInputRowsKey = "inputhRows"; +}; + +/// Contains the summary of an operator trace. +struct OperatorTraceSummary { + std::string opType; + uint64_t inputRows{0}; + uint64_t peakMemory{0}; + + std::string toString() const; +}; + +#define VELOX_TRACE_LIMIT_EXCEEDED(errorMessage) \ + _VELOX_THROW( \ + ::facebook::velox::VeloxRuntimeError, \ + ::facebook::velox::error_source::kErrorSourceRuntime.c_str(), \ + ::facebook::velox::error_code::kTraceLimitExceeded.c_str(), \ + /* isRetriable */ true, \ + "{}", \ + errorMessage); +} // namespace facebook::velox::exec::trace diff --git a/velox/exec/QueryTraceConfig.cpp b/velox/exec/TraceConfig.cpp similarity index 75% rename from velox/exec/QueryTraceConfig.cpp rename to velox/exec/TraceConfig.cpp index d43479d64608..ecb64aec3b74 100644 --- a/velox/exec/QueryTraceConfig.cpp +++ b/velox/exec/TraceConfig.cpp @@ -14,13 +14,15 @@ * limitations under the License. */ -#include "velox/exec/QueryTraceConfig.h" +#include "velox/exec/TraceConfig.h" #include +#include "velox/common/base/Exceptions.h" + namespace facebook::velox::exec::trace { -QueryTraceConfig::QueryTraceConfig( +TraceConfig::TraceConfig( std::unordered_set _queryNodeIds, std::string _queryTraceDir, UpdateAndCheckTraceLimitCB _updateAndCheckTraceLimitCB, @@ -28,13 +30,7 @@ QueryTraceConfig::QueryTraceConfig( : queryNodes(std::move(_queryNodeIds)), queryTraceDir(std::move(_queryTraceDir)), updateAndCheckTraceLimitCB(std::move(_updateAndCheckTraceLimitCB)), - taskRegExp(std::move(_taskRegExp)) {} - -QueryTraceConfig::QueryTraceConfig(std::string _queryTraceDir) - : QueryTraceConfig( - std::unordered_set{}, - std::move(_queryTraceDir), - [](uint64_t) { return false; }, - ".*") {} - + taskRegExp(std::move(_taskRegExp)) { + VELOX_CHECK(!queryNodes.empty(), "Query trace nodes cannot be empty"); +} } // namespace facebook::velox::exec::trace diff --git a/velox/exec/QueryTraceConfig.h b/velox/exec/TraceConfig.h similarity index 88% rename from velox/exec/QueryTraceConfig.h rename to velox/exec/TraceConfig.h index 200e5cb52ce4..084df3173e3c 100644 --- a/velox/exec/QueryTraceConfig.h +++ b/velox/exec/TraceConfig.h @@ -25,9 +25,9 @@ namespace facebook::velox::exec::trace { /// The callback used to update and aggregate the trace bytes of a query. If the /// query trace limit is set, the callback return true if the aggregate traced /// bytes exceed the set limit otherwise return false. -using UpdateAndCheckTraceLimitCB = std::function; +using UpdateAndCheckTraceLimitCB = std::function; -struct QueryTraceConfig { +struct TraceConfig { /// Target query trace nodes. std::unordered_set queryNodes; /// Base dir of query trace. @@ -36,14 +36,10 @@ struct QueryTraceConfig { /// The trace task regexp. std::string taskRegExp; - QueryTraceConfig( + TraceConfig( std::unordered_set _queryNodeIds, std::string _queryTraceDir, UpdateAndCheckTraceLimitCB _updateAndCheckTraceLimitCB, std::string _taskRegExp); - - QueryTraceConfig(std::string _queryTraceDir); - - QueryTraceConfig() = default; }; } // namespace facebook::velox::exec::trace diff --git a/velox/exec/TraceUtil.cpp b/velox/exec/TraceUtil.cpp new file mode 100644 index 000000000000..4df3b3f12e10 --- /dev/null +++ b/velox/exec/TraceUtil.cpp @@ -0,0 +1,182 @@ +/* + * Copyright (c) Facebook, Inc. and its affiliates. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include "velox/exec/TraceUtil.h" + +#include + +#include "velox/common/base/Exceptions.h" +#include "velox/common/file/File.h" +#include "velox/common/file/FileSystems.h" +#include "velox/exec/Trace.h" + +namespace facebook::velox::exec::trace { +namespace { +std::string findLastPathNode(const std::string& path) { + std::vector pathNodes; + folly::split("/", path, pathNodes); + while (!pathNodes.empty() && pathNodes.back().empty()) { + pathNodes.pop_back(); + } + VELOX_CHECK(!pathNodes.empty(), "No valid path nodes found from {}", path); + return pathNodes.back(); +} +} // namespace + +void createTraceDirectory(const std::string& traceDir) { + try { + const auto fs = filesystems::getFileSystem(traceDir, nullptr); + if (fs->exists(traceDir)) { + fs->rmdir(traceDir); + } + fs->mkdir(traceDir); + } catch (const std::exception& e) { + VELOX_FAIL( + "Failed to create trace directory '{}' with error: {}", + traceDir, + e.what()); + } +} + +std::string getQueryTraceDirectory( + const std::string& traceDir, + const std::string& queryId) { + return fmt::format("{}/{}", traceDir, queryId); +} + +std::string getTaskTraceDirectory( + const std::string& traceDir, + const Task& task) { + return getTaskTraceDirectory( + traceDir, task.queryCtx()->queryId(), task.taskId()); +} + +std::string getTaskTraceDirectory( + const std::string& traceDir, + const std::string& queryId, + const std::string& taskId) { + return fmt::format( + "{}/{}", getQueryTraceDirectory(traceDir, queryId), taskId); +} + +std::string getTaskTraceMetaFilePath(const std::string& taskTraceDir) { + return fmt::format("{}/{}", taskTraceDir, TraceTraits::kTaskMetaFileName); +} + +std::string getNodeTraceDirectory( + const std::string& taskTraceDir, + const std::string& nodeId) { + return fmt::format("{}/{}", taskTraceDir, nodeId); +} + +std::string getPipelineTraceDirectory( + const std::string& nodeTraceDir, + uint32_t pipelineId) { + return fmt::format("{}/{}", nodeTraceDir, pipelineId); +} + +std::string getOpTraceDirectory( + const std::string& taskTraceDir, + const std::string& nodeId, + uint32_t pipelineId, + uint32_t driverId) { + return getOpTraceDirectory( + getNodeTraceDirectory(taskTraceDir, nodeId), pipelineId, driverId); +} + +std::string getOpTraceDirectory( + const std::string& nodeTraceDir, + int pipelineId, + int driverId) { + return fmt::format("{}/{}/{}", nodeTraceDir, pipelineId, driverId); +} + +std::string getOpTraceInputFilePath(const std::string& opTraceDir) { + return fmt::format("{}/{}", opTraceDir, OperatorTraceTraits::kInputFileName); +} + +std::string getOpTraceSummaryFilePath(const std::string& opTraceDir) { + return fmt::format( + "{}/{}", opTraceDir, OperatorTraceTraits::kSummaryFileName); +} + +std::vector getTaskIds( + const std::string& traceDir, + const std::string& queryId, + const std::shared_ptr& fs) { + const auto queryTraceDir = getQueryTraceDirectory(traceDir, queryId); + VELOX_USER_CHECK( + fs->exists(queryTraceDir), "{} dose not exist", queryTraceDir); + const auto taskDirs = fs->list(queryTraceDir); + std::vector taskIds; + for (const auto& taskDir : taskDirs) { + taskIds.emplace_back(findLastPathNode(taskDir)); + } + return taskIds; +} + +folly::dynamic getTaskMetadata( + const std::string& taskMetaFilePath, + const std::shared_ptr& fs) { + try { + const auto file = fs->openFileForRead(taskMetaFilePath); + VELOX_CHECK_NOT_NULL(file); + const auto taskMeta = file->pread(0, file->size()); + VELOX_USER_CHECK(!taskMeta.empty()); + return folly::parseJson(taskMeta); + } catch (const std::exception& e) { + VELOX_FAIL( + "Failed to get the query metadata from '{}' with error: {}", + taskMetaFilePath, + e.what()); + } +} + +RowTypePtr getDataType( + const core::PlanNodePtr& tracedPlan, + const std::string& tracedNodeId, + size_t sourceIndex) { + const auto* traceNode = core::PlanNode::findFirstNode( + tracedPlan.get(), [&tracedNodeId](const core::PlanNode* node) { + return node->id() == tracedNodeId; + }); + VELOX_CHECK_NOT_NULL( + traceNode, + "traced node id {} not found in the traced plan", + tracedNodeId); + return traceNode->sources().at(sourceIndex)->outputType(); +} + +std::vector listDriverIds( + const std::string& nodeTraceDir, + uint32_t pipelineId, + const std::shared_ptr& fs) { + const auto pipelineDir = getPipelineTraceDirectory(nodeTraceDir, pipelineId); + const auto driverDirs = fs->list(pipelineDir); + std::vector driverIds; + for (const auto& driverDir : driverDirs) { + driverIds.emplace_back(folly::to(findLastPathNode(driverDir))); + } + return driverIds; +} + +size_t getNumDrivers( + const std::string& nodeTraceDir, + uint32_t pipelineId, + const std::shared_ptr& fs) { + return listDriverIds(nodeTraceDir, pipelineId, fs).size(); +} +} // namespace facebook::velox::exec::trace diff --git a/velox/exec/QueryTraceUtil.h b/velox/exec/TraceUtil.h similarity index 50% rename from velox/exec/QueryTraceUtil.h rename to velox/exec/TraceUtil.h index 633c0bc27b2a..7b226844b759 100644 --- a/velox/exec/QueryTraceUtil.h +++ b/velox/exec/TraceUtil.h @@ -20,6 +20,7 @@ #include #include "velox/common/file/FileSystems.h" #include "velox/core/PlanNode.h" +#include "velox/exec/Task.h" #include "velox/type/Type.h" #include @@ -29,6 +30,52 @@ namespace facebook::velox::exec::trace { /// Creates a directory to store the query trace metdata and data. void createTraceDirectory(const std::string& traceDir); +/// Returns the trace directory for a given query. +std::string getQueryTraceDirectory( + const std::string& traceDir, + const std::string& queryId); + +/// Returns the trace directory for a given query task. +std::string getTaskTraceDirectory( + const std::string& traceDir, + const Task& task); + +std::string getTaskTraceDirectory( + const std::string& traceDir, + const std::string& queryId, + const std::string& taskId); + +/// Returns the file path for a given task's metadata trace file. +std::string getTaskTraceMetaFilePath(const std::string& taskTraceDir); + +/// Returns the trace directory for a given traced plan node. +std::string getNodeTraceDirectory( + const std::string& taskTraceDir, + const std::string& nodeId); + +/// Returns the trace directory for a given traced pipeline. +std::string getPipelineTraceDirectory( + const std::string& nodeTraceDir, + uint32_t pipelineId); + +/// Returns the trace directory for a given traced operator. +std::string getOpTraceDirectory( + const std::string& taskTraceDir, + const std::string& nodeId, + uint32_t pipelineId, + uint32_t driverId); + +std::string getOpTraceDirectory( + const std::string& nodeTraceDir, + int pipelineId, + int driverId); + +/// Returns the file path for a given operator's traced input file. +std::string getOpTraceInputFilePath(const std::string& opTraceDir); + +/// Returns the file path for a given operator's traced input file. +std::string getOpTraceSummaryFilePath(const std::string& opTraceDir); + /// Extracts the input data type for the trace scan operator. The function first /// uses the traced node id to find traced operator's plan node from the traced /// plan fragment. Then it uses the specified source node index to find the @@ -48,28 +95,32 @@ RowTypePtr getDataType( const std::string& tracedNodeId, size_t sourceIndex = 0); +/// Extracts the driver ids by listing the sub-directors under the trace +/// directory for a given pipeline and decode the sub-directory names to get +/// driver id. 'nodeTraceDir' is the trace directory of the plan node. +std::vector listDriverIds( + const std::string& nodeTraceDir, + uint32_t pipelineId, + const std::shared_ptr& fs); + /// Extracts the number of drivers by listing the number of sub-directors under -/// the trace directory for a given pipeline. -uint8_t getNumDrivers( - const std::string& rootDir, - const std::string& taskId, - const std::string& nodeId, - int32_t pipelineId, +/// the trace directory for a given pipeline. 'nodeTraceDir' is the trace +/// directory of the plan node. +size_t getNumDrivers( + const std::string& nodeTraceDir, + uint32_t pipelineId, const std::shared_ptr& fs); -/// Extracts task ids of the query tracing by listing the trace directory. +/// Extracts task ids of the query tracing by listing the query trace directory. +/// 'traceDir' is the root trace directory. 'queryId' is the query id. std::vector getTaskIds( const std::string& traceDir, + const std::string& queryId, const std::shared_ptr& fs); /// Gets the metadata from a given task metadata file which includes query plan, /// configs and connector properties. -folly::dynamic getMetadata( - const std::string& metadataFile, +folly::dynamic getTaskMetadata( + const std::string& taskMetaFilePath, const std::shared_ptr& fs); - -/// Gets the traced data directory. 'traceaDir' is the trace directory for a -/// given plan node, which is $traceRoot/$taskId/$nodeId. -std::string -getDataDir(const std::string& traceDir, int pipelineId, int driverId); } // namespace facebook::velox::exec::trace diff --git a/velox/exec/tests/CMakeLists.txt b/velox/exec/tests/CMakeLists.txt index 8904fdc20648..9b228e302463 100644 --- a/velox/exec/tests/CMakeLists.txt +++ b/velox/exec/tests/CMakeLists.txt @@ -60,14 +60,15 @@ add_executable( MultiFragmentTest.cpp NestedLoopJoinTest.cpp OrderByTest.cpp + OperatorTraceTest.cpp OutputBufferManagerTest.cpp PartitionedOutputTest.cpp PlanNodeSerdeTest.cpp PlanNodeToStringTest.cpp PrefixSortTest.cpp PrintPlanWithStatsTest.cpp - QueryTraceTest.cpp ProbeOperatorStateTest.cpp + TraceUtilTest.cpp RoundRobinPartitionFunctionTest.cpp RowContainerTest.cpp RowNumberTest.cpp diff --git a/velox/exec/tests/QueryTraceTest.cpp b/velox/exec/tests/OperatorTraceTest.cpp similarity index 58% rename from velox/exec/tests/QueryTraceTest.cpp rename to velox/exec/tests/OperatorTraceTest.cpp index 71b49da483a0..36d9eb467d5e 100644 --- a/velox/exec/tests/QueryTraceTest.cpp +++ b/velox/exec/tests/OperatorTraceTest.cpp @@ -16,26 +16,25 @@ #include #include -#include #include +#include "velox/common/base/tests/GTestUtils.h" #include "velox/common/file/FileSystems.h" +#include "velox/exec/OperatorTraceReader.h" #include "velox/exec/PartitionFunction.h" -#include "velox/exec/QueryDataReader.h" -#include "velox/exec/QueryDataWriter.h" -#include "velox/exec/QueryMetadataReader.h" -#include "velox/exec/QueryMetadataWriter.h" -#include "velox/exec/QueryTraceUtil.h" -#include "velox/exec/tests/utils/ArbitratorTestUtil.h" +#include "velox/exec/TaskTraceReader.h" +#include "velox/exec/Trace.h" +#include "velox/exec/TraceUtil.h" +#include "velox/exec/tests/utils/AssertQueryBuilder.h" #include "velox/exec/tests/utils/HiveConnectorTestBase.h" +#include "velox/exec/tests/utils/PlanBuilder.h" #include "velox/exec/tests/utils/TempDirectoryPath.h" #include "velox/serializers/PrestoSerializer.h" -#include "velox/vector/tests/utils/VectorTestBase.h" using namespace facebook::velox::exec::test; namespace facebook::velox::exec::trace::test { -class QueryTracerTest : public HiveConnectorTestBase { +class OperatorTraceTest : public HiveConnectorTestBase { protected: static void SetUpTestCase() { memory::MemoryManager::testingSetInstance({}); @@ -55,6 +54,11 @@ class QueryTracerTest : public HiveConnectorTestBase { registerPartitionFunctionSerDe(); } + void SetUp() override { + HiveConnectorTestBase::SetUp(); + dataType_ = ROW({"a", "b", "c"}, {BIGINT(), BIGINT(), BIGINT()}); + } + static VectorFuzzer::Options getFuzzerOptions() { return VectorFuzzer::Options{ .vectorSize = 16, @@ -65,7 +69,7 @@ class QueryTracerTest : public HiveConnectorTestBase { }; } - QueryTracerTest() : vectorFuzzer_{getFuzzerOptions(), pool_.get()} { + OperatorTraceTest() : vectorFuzzer_{getFuzzerOptions(), pool_.get()} { filesystems::registerLocalFileSystem(); } @@ -99,32 +103,63 @@ class QueryTracerTest : public HiveConnectorTestBase { return true; } + std::unique_ptr driverCtx() { + return std::make_unique(nullptr, 0, 0, 0, 0); + } + + RowTypePtr dataType_; VectorFuzzer vectorFuzzer_; }; -TEST_F(QueryTracerTest, emptyTrace) { - const auto outputDir = TempDirectoryPath::create(); - auto writer = trace::QueryDataWriter( - outputDir->getPath(), pool(), [&](uint64_t bytes) { return false; }); - writer.finish(); - - const auto fs = filesystems::getFileSystem(outputDir->getPath(), nullptr); - const auto summaryFile = fs->openFileForRead(fmt::format( - "{}/{}", outputDir->getPath(), QueryTraceTraits::kDataSummaryFileName)); - const auto summary = summaryFile->pread(0, summaryFile->size()); - ASSERT_FALSE(summary.empty()); - folly::dynamic obj = folly::parseJson(summary); - ASSERT_EQ(obj[QueryTraceTraits::kTraceLimitExceededKey].asBool(), false); +TEST_F(OperatorTraceTest, emptyTrace) { + auto input = vectorFuzzer_.fuzzInputRow(dataType_); + input->childAt(0) = + makeFlatVector(input->size(), [](auto /*unused*/) { return 0; }); + createDuckDbTable({input}); + + std::string planNodeId; + auto traceDirPath = TempDirectoryPath::create(); + auto plan = PlanBuilder() + .values({input}) + .filter("a > 0") + .singleAggregation({"a"}, {"count(1)"}) + .capturePlanNodeId(planNodeId) + .planNode(); + + const auto task = + AssertQueryBuilder(duckDbQueryRunner_) + .plan(plan) + .config(core::QueryConfig::kQueryTraceEnabled, true) + .config(core::QueryConfig::kQueryTraceDir, traceDirPath->getPath()) + .config(core::QueryConfig::kQueryTraceMaxBytes, 100UL << 30) + .config(core::QueryConfig::kQueryTraceTaskRegExp, ".*") + .config(core::QueryConfig::kQueryTraceNodeIds, planNodeId) + .assertResults("SELECT a, count(1) FROM tmp WHERE a > 0 GROUP BY 1"); + + const auto taskTraceDir = + getTaskTraceDirectory(traceDirPath->getPath(), *task); + const auto opTraceDir = getOpTraceDirectory( + taskTraceDir, + planNodeId, + /*pipelineId=*/0, + /*driverId=*/0); + const auto summary = OperatorTraceSummaryReader(opTraceDir, pool()).read(); + ASSERT_EQ(summary.inputRows, 0); + ASSERT_EQ(summary.opType, "Aggregation"); + // The hash aggregation operator might allocate memory when prepare output + // buffer even though there is no output. We could optimize out this later if + // needs. + ASSERT_GT(summary.peakMemory, 0); } -TEST_F(QueryTracerTest, traceData) { - const auto rowType = ROW({"a", "b", "c"}, {BIGINT(), BIGINT(), BIGINT()}); +TEST_F(OperatorTraceTest, traceData) { std::vector inputVectors; constexpr auto numBatch = 5; inputVectors.reserve(numBatch); for (auto i = 0; i < numBatch; ++i) { - inputVectors.push_back(vectorFuzzer_.fuzzInputFlatRow(rowType)); + inputVectors.push_back(vectorFuzzer_.fuzzInputFlatRow(dataType_)); } + createDuckDbTable(inputVectors); struct { uint64_t maxTracedBytes; @@ -143,37 +178,62 @@ TEST_F(QueryTracerTest, traceData) { for (const auto& testData : testSettings) { SCOPED_TRACE(testData.debugString()); - const auto outputDir = TempDirectoryPath::create(); - // Ensure the writer only write one batch. - uint64_t numTracedBytes{0}; - auto writer = trace::QueryDataWriter( - outputDir->getPath(), pool(), [&](uint64_t bytes) { - numTracedBytes += bytes; - return numTracedBytes >= testData.maxTracedBytes; - }); - for (auto i = 0; i < numBatch; ++i) { - writer.write(inputVectors[i]); - } - writer.finish(); - - const auto fs = filesystems::getFileSystem(outputDir->getPath(), nullptr); - const auto summaryFile = fs->openFileForRead(fmt::format( - "{}/{}", outputDir->getPath(), QueryTraceTraits::kDataSummaryFileName)); - const auto summary = summaryFile->pread(0, summaryFile->size()); - ASSERT_FALSE(summary.empty()); - folly::dynamic obj = folly::parseJson(summary); - ASSERT_EQ( - obj[QueryTraceTraits::kTraceLimitExceededKey].asBool(), - testData.limitExceeded); - if (testData.maxTracedBytes == 0) { - const auto dataFile = fs->openFileForRead(fmt::format( - "{}/{}", outputDir->getPath(), QueryTraceTraits::kDataFileName)); - ASSERT_EQ(dataFile->size(), 0); + std::string planNodeId; + auto traceDirPath = TempDirectoryPath::create(); + auto plan = PlanBuilder() + .values(inputVectors) + .singleAggregation({"a"}, {"count(1)"}) + .capturePlanNodeId(planNodeId) + .planNode(); + + if (testData.limitExceeded) { + VELOX_ASSERT_THROW( + AssertQueryBuilder(duckDbQueryRunner_) + .plan(plan) + .config(core::QueryConfig::kQueryTraceEnabled, true) + .config( + core::QueryConfig::kQueryTraceDir, traceDirPath->getPath()) + .config( + core::QueryConfig::kQueryTraceMaxBytes, + testData.maxTracedBytes) + .config(core::QueryConfig::kQueryTraceTaskRegExp, ".*") + .config(core::QueryConfig::kQueryTraceNodeIds, planNodeId) + .assertResults("SELECT a, count(1) FROM tmp GROUP BY 1"), + "Query exceeded per-query local trace limit of"); continue; } - - const auto reader = QueryDataReader(outputDir->getPath(), rowType, pool()); + const auto task = + AssertQueryBuilder(duckDbQueryRunner_) + .plan(plan) + .config(core::QueryConfig::kQueryTraceEnabled, true) + .config(core::QueryConfig::kQueryTraceDir, traceDirPath->getPath()) + .config( + core::QueryConfig::kQueryTraceMaxBytes, testData.maxTracedBytes) + .config(core::QueryConfig::kQueryTraceTaskRegExp, ".*") + .config(core::QueryConfig::kQueryTraceNodeIds, planNodeId) + .assertResults("SELECT a, count(1) FROM tmp GROUP BY 1"); + + const auto fs = + filesystems::getFileSystem(traceDirPath->getPath(), nullptr); + const auto taskTraceDir = + getTaskTraceDirectory(traceDirPath->getPath(), *task); + const auto opTraceDir = getOpTraceDirectory( + taskTraceDir, + planNodeId, + /*pipelineId=*/0, + /*driverId=*/0); + const auto summaryFilePath = getOpTraceSummaryFilePath(opTraceDir); + const auto dataFilePath = getOpTraceInputFilePath(opTraceDir); + ASSERT_TRUE(fs->exists(summaryFilePath)); + ASSERT_TRUE(fs->exists(dataFilePath)); + + const auto summary = OperatorTraceSummaryReader(opTraceDir, pool()).read(); + ASSERT_EQ(summary.opType, "Aggregation"); + ASSERT_GT(summary.peakMemory, 0); + ASSERT_EQ(summary.inputRows, testData.numTracedBatches * 16); + + const auto reader = OperatorTraceInputReader(opTraceDir, dataType_, pool()); RowVectorPtr actual; size_t numOutputVectors{0}; while (reader.read(actual)) { @@ -189,7 +249,7 @@ TEST_F(QueryTracerTest, traceData) { } } -TEST_F(QueryTracerTest, traceMetadata) { +TEST_F(OperatorTraceTest, traceMetadata) { const auto rowType = ROW({"c0", "c1", "c2", "c3", "c4", "c5"}, {BIGINT(), SMALLINT(), TINYINT(), VARCHAR(), VARCHAR(), VARCHAR()}); @@ -234,14 +294,14 @@ TEST_F(QueryTracerTest, traceMetadata) { executor_.get(), core::QueryConfig(expectedQueryConfigs), expectedConnectorProperties); - auto writer = trace::QueryMetadataWriter(outputDir->getPath(), pool()); + auto writer = trace::TaskTraceMetadataWriter(outputDir->getPath(), pool()); writer.write(queryCtx, planNode); std::unordered_map acutalQueryConfigs; std::unordered_map> actualConnectorProperties; core::PlanNodePtr actualQueryPlan; - auto reader = trace::QueryMetadataReader(outputDir->getPath(), pool()); + auto reader = trace::TaskTraceMetadataReader(outputDir->getPath(), pool()); reader.read(acutalQueryConfigs, actualConnectorProperties, actualQueryPlan); ASSERT_TRUE(isSamePlan(actualQueryPlan, planNode)); @@ -262,7 +322,7 @@ TEST_F(QueryTracerTest, traceMetadata) { } } -TEST_F(QueryTracerTest, task) { +TEST_F(OperatorTraceTest, task) { const auto rowType = ROW({"c0", "c1", "c2", "c3", "c4", "c5"}, {BIGINT(), SMALLINT(), TINYINT(), VARCHAR(), VARCHAR(), VARCHAR()}); @@ -310,6 +370,8 @@ TEST_F(QueryTracerTest, task) { {core::QueryConfig::kSpillEnabled, "true"}, {core::QueryConfig::kSpillNumPartitionBits, "17"}, {core::QueryConfig::kQueryTraceEnabled, "true"}, + {core::QueryConfig::kQueryTraceMaxBytes, + std::to_string(100UL << 30)}, {core::QueryConfig::kQueryTraceDir, outputDir->getPath()}, {core::QueryConfig::kQueryTraceTaskRegExp, testData.taskRegExpr}, {core::QueryConfig::kQueryTraceNodeIds, "1,2"}, @@ -345,7 +407,8 @@ TEST_F(QueryTracerTest, task) { } ASSERT_EQ(actaulDirs.size(), testData.expectedNumDirs); ASSERT_EQ(actaulDirs.at(0), expectedDir); - const auto taskIds = getTaskIds(outputDir->getPath(), fs); + const auto taskIds = + getTaskIds(outputDir->getPath(), task->queryCtx()->queryId(), fs); ASSERT_EQ(taskIds.size(), testData.expectedNumDirs); ASSERT_EQ(taskIds.at(0), task->taskId()); @@ -354,7 +417,7 @@ TEST_F(QueryTracerTest, task) { unordered_map> actualConnectorProperties; core::PlanNodePtr actualQueryPlan; - auto reader = trace::QueryMetadataReader(expectedDir, pool()); + auto reader = trace::TaskTraceMetadataReader(expectedDir, pool()); reader.read(acutalQueryConfigs, actualConnectorProperties, actualQueryPlan); ASSERT_TRUE(isSamePlan(actualQueryPlan, planNode)); @@ -377,76 +440,93 @@ TEST_F(QueryTracerTest, task) { } } -TEST_F(QueryTracerTest, error) { +TEST_F(OperatorTraceTest, error) { const auto planNode = PlanBuilder().values({}).planNode(); - const auto expectedQueryConfigs = - std::unordered_map{ - {core::QueryConfig::kSpillEnabled, "true"}, - {core::QueryConfig::kSpillNumPartitionBits, "17"}, - {core::QueryConfig::kQueryTraceEnabled, "true"}, - }; - const auto queryCtx = core::QueryCtx::create( - executor_.get(), core::QueryConfig(expectedQueryConfigs)); - VELOX_ASSERT_USER_THROW( - AssertQueryBuilder(planNode).queryCtx(queryCtx).maxDrivers(1).copyResults( - pool()), - "Query trace enabled but the trace dir is not set"); -} - -TEST_F(QueryTracerTest, traceDir) { - const auto outputDir = TempDirectoryPath::create(); - const auto rootDir = outputDir->getPath(); - const auto fs = filesystems::getFileSystem(rootDir, nullptr); - auto dir1 = fmt::format("{}/{}", outputDir->getPath(), "t1"); - trace::createTraceDirectory(dir1); - ASSERT_TRUE(fs->exists(dir1)); - - auto dir2 = fmt::format("{}/{}", dir1, "t1_1"); - trace::createTraceDirectory(dir2); - ASSERT_TRUE(fs->exists(dir2)); - - // It will remove the old dir1 along with its subdir when created the dir1 - // again. - trace::createTraceDirectory(dir1); - ASSERT_TRUE(fs->exists(dir1)); - ASSERT_FALSE(fs->exists(dir2)); - - const auto parentDir = fmt::format("{}/{}", outputDir->getPath(), "p"); - fs->mkdir(parentDir); - - constexpr auto numThreads = 5; - std::vector traceThreads; - traceThreads.reserve(numThreads); - std::mutex mutex; - std::set expectedDirs; - for (int i = 0; i < numThreads; ++i) { - traceThreads.emplace_back([&, i]() { - const auto dir = fmt::format("{}/s{}", parentDir, i); - trace::createTraceDirectory(dir); - std::lock_guard l(mutex); - expectedDirs.insert(dir); - }); + // No trace dir. + { + const auto queryConfigs = std::unordered_map{ + {core::QueryConfig::kQueryTraceEnabled, "true"}}; + const auto queryCtx = core::QueryCtx::create( + executor_.get(), core::QueryConfig(queryConfigs)); + VELOX_ASSERT_USER_THROW( + AssertQueryBuilder(planNode) + .queryCtx(queryCtx) + .maxDrivers(1) + .copyResults(pool()), + "Query trace enabled but the trace dir is not set"); } - - for (auto& traceThread : traceThreads) { - traceThread.join(); + // Duplicate trace plan node ids. + { + const auto queryConfigs = std::unordered_map{ + {core::QueryConfig::kQueryTraceEnabled, "true"}, + {core::QueryConfig::kQueryTraceDir, "traceDir"}, + }; + const auto queryCtx = core::QueryCtx::create( + executor_.get(), core::QueryConfig(queryConfigs)); + VELOX_ASSERT_USER_THROW( + AssertQueryBuilder(planNode) + .queryCtx(queryCtx) + .maxDrivers(1) + .copyResults(pool()), + "Query trace enabled but the trace task regexp is not set"); } - - const auto actualDirs = fs->list(parentDir); - ASSERT_EQ(actualDirs.size(), numThreads); - ASSERT_EQ(actualDirs.size(), expectedDirs.size()); - for (const auto& dir : actualDirs) { - ASSERT_EQ(expectedDirs.count(dir), 1); + // No trace plan node ids. + { + const auto queryConfigs = std::unordered_map{ + {core::QueryConfig::kQueryTraceEnabled, "true"}, + {core::QueryConfig::kQueryTraceDir, "traceDir"}, + {core::QueryConfig::kQueryTraceTaskRegExp, ".*"}}; + const auto queryCtx = core::QueryCtx::create( + executor_.get(), core::QueryConfig(queryConfigs)); + VELOX_ASSERT_USER_THROW( + AssertQueryBuilder(planNode) + .queryCtx(queryCtx) + .maxDrivers(1) + .copyResults(pool()), + "Query trace nodes are not set"); + } + // Duplicate trace plan node ids. + { + const auto queryConfigs = std::unordered_map{ + {core::QueryConfig::kQueryTraceEnabled, "true"}, + {core::QueryConfig::kQueryTraceDir, "traceDir"}, + {core::QueryConfig::kQueryTraceTaskRegExp, ".*"}, + {core::QueryConfig::kQueryTraceNodeIds, "1,1"}, + }; + const auto queryCtx = core::QueryCtx::create( + executor_.get(), core::QueryConfig(queryConfigs)); + VELOX_ASSERT_USER_THROW( + AssertQueryBuilder(planNode) + .queryCtx(queryCtx) + .maxDrivers(1) + .copyResults(pool()), + "Duplicate trace nodes found: 1, 1"); + } + // Nonexist trace plan node id. + { + const auto queryConfigs = std::unordered_map{ + {core::QueryConfig::kQueryTraceEnabled, "true"}, + {core::QueryConfig::kQueryTraceDir, "traceDir"}, + {core::QueryConfig::kQueryTraceTaskRegExp, ".*"}, + {core::QueryConfig::kQueryTraceNodeIds, "nonexist"}, + }; + const auto queryCtx = core::QueryCtx::create( + executor_.get(), core::QueryConfig(queryConfigs)); + VELOX_ASSERT_USER_THROW( + AssertQueryBuilder(planNode) + .queryCtx(queryCtx) + .maxDrivers(1) + .copyResults(pool()), + "Trace plan nodes not found from task"); } } -TEST_F(QueryTracerTest, traceTableWriter) { - const auto rowType = ROW({"a", "b", "c"}, {BIGINT(), BIGINT(), BIGINT()}); +TEST_F(OperatorTraceTest, traceTableWriter) { std::vector inputVectors; constexpr auto numBatch = 5; inputVectors.reserve(numBatch); for (auto i = 0; i < numBatch; ++i) { - inputVectors.push_back(vectorFuzzer_.fuzzInputFlatRow(rowType)); + inputVectors.push_back(vectorFuzzer_.fuzzInputFlatRow(dataType_)); } struct { @@ -465,7 +545,7 @@ TEST_F(QueryTracerTest, traceTableWriter) { } } testSettings[]{ {".*", 10UL << 30, numBatch, false}, - {".*", 0, numBatch, false}, + {".*", 0, numBatch, true}, {"wrong id", 10UL << 30, 0, false}, {"test_cursor \\d+", 10UL << 30, numBatch, false}, {"test_cursor \\d+", 800, 2, true}}; @@ -481,6 +561,23 @@ TEST_F(QueryTracerTest, traceTableWriter) { const auto traceRoot = fmt::format("{}/{}", testDir->getPath(), "traceRoot"); std::shared_ptr task; + if (testData.limitExceeded) { + VELOX_ASSERT_THROW( + AssertQueryBuilder(planNode) + .maxDrivers(1) + .config(core::QueryConfig::kQueryTraceEnabled, true) + .config(core::QueryConfig::kQueryTraceDir, traceRoot) + .config( + core::QueryConfig::kQueryTraceMaxBytes, + testData.maxTracedBytes) + .config( + core::QueryConfig::kQueryTraceTaskRegExp, + testData.taskRegExpr) + .config(core::QueryConfig::kQueryTraceNodeIds, "1") + .copyResults(pool(), task), + "Query exceeded per-query local trace limit of"); + continue; + } AssertQueryBuilder(planNode) .maxDrivers(1) .config(core::QueryConfig::kQueryTraceEnabled, true) @@ -490,8 +587,8 @@ TEST_F(QueryTracerTest, traceTableWriter) { .config(core::QueryConfig::kQueryTraceNodeIds, "1") .copyResults(pool(), task); - const auto metadataDir = fmt::format("{}/{}", traceRoot, task->taskId()); - const auto fs = filesystems::getFileSystem(metadataDir, nullptr); + const auto taskTraceDir = getTaskTraceDirectory(traceRoot, *task); + const auto fs = filesystems::getFileSystem(taskTraceDir, nullptr); if (testData.taskRegExpr == "wrong id") { ASSERT_FALSE(fs->exists(traceRoot)); @@ -499,34 +596,16 @@ TEST_F(QueryTracerTest, traceTableWriter) { } // Query metadta file should exist. - const auto traceMetaFile = fmt::format( - "{}/{}/{}", - traceRoot, - task->taskId(), - trace::QueryTraceTraits::kQueryMetaFileName); - ASSERT_TRUE(fs->exists(traceMetaFile)); - - const auto dataDir = - fmt::format("{}/{}/{}", traceRoot, task->taskId(), "1/0/0/data"); - - // Query data tracing disabled. - if (testData.maxTracedBytes == 0) { - ASSERT_FALSE(fs->exists(dataDir)); - continue; - } + const auto traceMetaFilePath = getTaskTraceMetaFilePath(taskTraceDir); + ASSERT_TRUE(fs->exists(traceMetaFilePath)); - ASSERT_EQ(fs->list(dataDir).size(), 2); - // Check data summaries. - const auto summaryFile = fs->openFileForRead( - fmt::format("{}/{}", dataDir, QueryTraceTraits::kDataSummaryFileName)); - const auto summary = summaryFile->pread(0, summaryFile->size()); - ASSERT_FALSE(summary.empty()); - folly::dynamic obj = folly::parseJson(summary); - ASSERT_EQ( - obj[QueryTraceTraits::kTraceLimitExceededKey].asBool(), - testData.limitExceeded); + const auto opTraceDir = getOpTraceDirectory(taskTraceDir, "1", 0, 0); + + ASSERT_EQ(fs->list(opTraceDir).size(), 2); - const auto reader = trace::QueryDataReader(dataDir, rowType, pool()); + const auto summary = OperatorTraceSummaryReader(opTraceDir, pool()).read(); + const auto reader = + trace::OperatorTraceInputReader(opTraceDir, dataType_, pool()); RowVectorPtr actual; size_t numOutputVectors{0}; while (reader.read(actual)) { diff --git a/velox/exec/tests/TraceUtilTest.cpp b/velox/exec/tests/TraceUtilTest.cpp new file mode 100644 index 000000000000..5dbbdae535cb --- /dev/null +++ b/velox/exec/tests/TraceUtilTest.cpp @@ -0,0 +1,186 @@ +/* + * Copyright (c) Facebook, Inc. and its affiliates. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include +#include +#include + +#include "velox/common/base/tests/GTestUtils.h" +#include "velox/common/file/FileSystems.h" +#include "velox/exec/Trace.h" +#include "velox/exec/TraceUtil.h" +#include "velox/exec/tests/utils/TempDirectoryPath.h" + +using namespace facebook::velox::exec::test; + +namespace facebook::velox::exec::trace::test { +class TraceUtilTest : public testing::Test { + protected: + static void SetUpTestCase() { + filesystems::registerLocalFileSystem(); + } +}; + +TEST_F(TraceUtilTest, traceDir) { + const auto outputDir = TempDirectoryPath::create(); + const auto rootDir = outputDir->getPath(); + const auto fs = filesystems::getFileSystem(rootDir, nullptr); + auto dir1 = fmt::format("{}/{}", outputDir->getPath(), "t1"); + trace::createTraceDirectory(dir1); + ASSERT_TRUE(fs->exists(dir1)); + + auto dir2 = fmt::format("{}/{}", dir1, "t1_1"); + trace::createTraceDirectory(dir2); + ASSERT_TRUE(fs->exists(dir2)); + + // It will remove the old dir1 along with its subdir when created the dir1 + // again. + trace::createTraceDirectory(dir1); + ASSERT_TRUE(fs->exists(dir1)); + ASSERT_FALSE(fs->exists(dir2)); + + const auto parentDir = fmt::format("{}/{}", outputDir->getPath(), "p"); + fs->mkdir(parentDir); + + constexpr auto numThreads = 5; + std::vector traceThreads; + traceThreads.reserve(numThreads); + std::mutex mutex; + std::set expectedDirs; + for (int i = 0; i < numThreads; ++i) { + traceThreads.emplace_back([&, i]() { + const auto dir = fmt::format("{}/s{}", parentDir, i); + trace::createTraceDirectory(dir); + std::lock_guard l(mutex); + expectedDirs.insert(dir); + }); + } + + for (auto& traceThread : traceThreads) { + traceThread.join(); + } + + const auto actualDirs = fs->list(parentDir); + ASSERT_EQ(actualDirs.size(), numThreads); + ASSERT_EQ(actualDirs.size(), expectedDirs.size()); + for (const auto& dir : actualDirs) { + ASSERT_EQ(expectedDirs.count(dir), 1); + } +} + +TEST_F(TraceUtilTest, OperatorTraceSummary) { + exec::trace::OperatorTraceSummary summary; + summary.opType = "summary"; + summary.inputRows = 100; + summary.peakMemory = 200; + ASSERT_EQ( + summary.toString(), "opType summary, inputRows 100, peakMemory 200B"); +} + +TEST_F(TraceUtilTest, traceDirectoryLayoutUtilities) { + const std::string traceRoot = "/traceRoot"; + const std::string queryId = "queryId"; + ASSERT_EQ( + getQueryTraceDirectory(traceRoot, queryId), + fmt::format("{}/{}", traceRoot, queryId)); + const std::string taskId = "taskId"; + const std::string taskTraceDir = + getTaskTraceDirectory(traceRoot, queryId, taskId); + ASSERT_EQ(taskTraceDir, fmt::format("{}/{}/{}", traceRoot, queryId, taskId)); + ASSERT_EQ( + getTaskTraceMetaFilePath( + getTaskTraceDirectory(traceRoot, queryId, taskId)), + "/traceRoot/queryId/taskId/task_trace_meta.json"); + const std::string nodeId = "1"; + const std::string nodeTraceDir = getNodeTraceDirectory(taskTraceDir, nodeId); + ASSERT_EQ(nodeTraceDir, "/traceRoot/queryId/taskId/1"); + const uint32_t pipelineId = 1; + ASSERT_EQ( + getPipelineTraceDirectory(nodeTraceDir, pipelineId), + "/traceRoot/queryId/taskId/1/1"); + const uint32_t driverId = 1; + const std::string opTraceDir = + getOpTraceDirectory(taskTraceDir, nodeId, pipelineId, driverId); + ASSERT_EQ(opTraceDir, "/traceRoot/queryId/taskId/1/1/1"); + ASSERT_EQ( + getOpTraceDirectory(nodeTraceDir, pipelineId, driverId), + "/traceRoot/queryId/taskId/1/1/1"); + ASSERT_EQ( + getOpTraceInputFilePath(opTraceDir), + "/traceRoot/queryId/taskId/1/1/1/op_input_trace.data"); + ASSERT_EQ( + getOpTraceSummaryFilePath(opTraceDir), + "/traceRoot/queryId/taskId/1/1/1/op_trace_summary.json"); +} + +TEST_F(TraceUtilTest, getTaskIds) { + const auto rootDir = TempDirectoryPath::create(); + const auto rootPath = rootDir->getPath(); + const auto fs = filesystems::getFileSystem(rootPath, nullptr); + const std::string queryId = "queryId"; + fs->mkdir(trace::getQueryTraceDirectory(rootPath, queryId)); + ASSERT_TRUE(getTaskIds(rootPath, queryId, fs).empty()); + const std::string taskId1 = "task1"; + fs->mkdir(trace::getTaskTraceDirectory(rootPath, queryId, taskId1)); + const std::string taskId2 = "task2"; + fs->mkdir(trace::getTaskTraceDirectory(rootPath, queryId, taskId2)); + auto taskIds = getTaskIds(rootPath, queryId, fs); + ASSERT_EQ(taskIds.size(), 2); + std::set taskIdSet({taskId1, taskId2}); + ASSERT_EQ(*taskIds.begin(), taskId1); + ASSERT_EQ(*taskIds.rbegin(), taskId2); +} + +TEST_F(TraceUtilTest, getDriverIds) { + const auto rootDir = TempDirectoryPath::create(); + const auto rootPath = rootDir->getPath(); + const auto fs = filesystems::getFileSystem(rootPath, nullptr); + const std::string queryId = "queryId"; + fs->mkdir(trace::getQueryTraceDirectory(rootPath, queryId)); + ASSERT_TRUE(getTaskIds(rootPath, queryId, fs).empty()); + const std::string taskId = "task"; + const std::string taskTraceDir = + trace::getTaskTraceDirectory(rootPath, queryId, taskId); + fs->mkdir(taskTraceDir); + const std::string nodeId = "node"; + const std::string nodeTraceDir = + trace::getNodeTraceDirectory(taskTraceDir, nodeId); + fs->mkdir(nodeTraceDir); + const uint32_t pipelineId = 1; + fs->mkdir(trace::getPipelineTraceDirectory(nodeTraceDir, pipelineId)); + ASSERT_EQ(getNumDrivers(nodeTraceDir, pipelineId, fs), 0); + ASSERT_TRUE(listDriverIds(nodeTraceDir, pipelineId, fs).empty()); + // create 3 drivers. + const uint32_t driverId1 = 1; + fs->mkdir(trace::getOpTraceDirectory(nodeTraceDir, pipelineId, driverId1)); + const uint32_t driverId2 = 2; + fs->mkdir(trace::getOpTraceDirectory(nodeTraceDir, pipelineId, driverId2)); + const uint32_t driverId3 = 3; + fs->mkdir(trace::getOpTraceDirectory(nodeTraceDir, pipelineId, driverId3)); + ASSERT_EQ(getNumDrivers(nodeTraceDir, pipelineId, fs), 3); + auto driverIds = listDriverIds(nodeTraceDir, pipelineId, fs); + ASSERT_EQ(driverIds.size(), 3); + std::sort(driverIds.begin(), driverIds.end()); + ASSERT_EQ(driverIds[0], driverId1); + ASSERT_EQ(driverIds[1], driverId2); + ASSERT_EQ(driverIds[2], driverId3); + // Bad driver id. + const std::string BadDriverId = "badDriverId"; + fs->mkdir(fmt::format("{}/{}/{}", nodeTraceDir, pipelineId, BadDriverId)); + ASSERT_ANY_THROW(getNumDrivers(nodeTraceDir, pipelineId, fs)); + ASSERT_ANY_THROW(listDriverIds(nodeTraceDir, pipelineId, fs)); +} +} // namespace facebook::velox::exec::trace::test diff --git a/velox/exec/tests/utils/OperatorTestBase.h b/velox/exec/tests/utils/OperatorTestBase.h index 4e5def101cfa..079bfc2ff6b0 100644 --- a/velox/exec/tests/utils/OperatorTestBase.h +++ b/velox/exec/tests/utils/OperatorTestBase.h @@ -24,8 +24,6 @@ #include "velox/exec/HashProbe.h" #include "velox/exec/tests/utils/QueryAssertions.h" #include "velox/parse/ExpressionsParser.h" -#include "velox/type/Variant.h" -#include "velox/vector/FlatVector.h" #include "velox/vector/tests/utils/VectorMaker.h" #include "velox/vector/tests/utils/VectorTestBase.h" diff --git a/velox/exec/tests/utils/PlanBuilder.cpp b/velox/exec/tests/utils/PlanBuilder.cpp index 57d12d57bd13..5deed1140f4a 100644 --- a/velox/exec/tests/utils/PlanBuilder.cpp +++ b/velox/exec/tests/utils/PlanBuilder.cpp @@ -222,9 +222,10 @@ PlanBuilder& PlanBuilder::values( PlanBuilder& PlanBuilder::traceScan( const std::string& traceNodeDir, + uint32_t pipelineId, const RowTypePtr& outputType) { - planNode_ = std::make_shared( - nextPlanNodeId(), traceNodeDir, outputType); + planNode_ = std::make_shared( + nextPlanNodeId(), traceNodeDir, pipelineId, outputType); return *this; } diff --git a/velox/exec/tests/utils/PlanBuilder.h b/velox/exec/tests/utils/PlanBuilder.h index 341c25b06767..5af2e9f91ff7 100644 --- a/velox/exec/tests/utils/PlanBuilder.h +++ b/velox/exec/tests/utils/PlanBuilder.h @@ -322,9 +322,12 @@ class PlanBuilder { /// Adds a QueryReplayNode for query tracing. /// /// @param traceNodeDir The trace directory for a given plan node. + /// @param pipelineId The pipeline id for the traced operator instantiated + /// from the given plan node. /// @param outputType The type of the tracing data. PlanBuilder& traceScan( const std::string& traceNodeDir, + uint32_t pipelineId, const RowTypePtr& outputType); /// Add an ExchangeNode. diff --git a/velox/tool/trace/AggregationReplayer.cpp b/velox/tool/trace/AggregationReplayer.cpp index 54041bf8bbcb..1c397461a8a5 100644 --- a/velox/tool/trace/AggregationReplayer.cpp +++ b/velox/tool/trace/AggregationReplayer.cpp @@ -15,7 +15,7 @@ */ #include "velox/tool/trace/AggregationReplayer.h" -#include "velox/exec/QueryDataReader.h" +#include "velox/exec/OperatorTraceReader.h" #include "velox/exec/tests/utils/PlanBuilder.h" using namespace facebook::velox; diff --git a/velox/tool/trace/AggregationReplayer.h b/velox/tool/trace/AggregationReplayer.h index 21ce2c409acf..9a14688a694a 100644 --- a/velox/tool/trace/AggregationReplayer.h +++ b/velox/tool/trace/AggregationReplayer.h @@ -24,13 +24,15 @@ namespace facebook::velox::tool::trace { class AggregationReplayer : public OperatorReplayerBase { public: AggregationReplayer( - const std::string& rootDir, + const std::string& traceDir, + const std::string& queryId, const std::string& taskId, const std::string& nodeId, const int32_t pipelineId, const std::string& operatorType) : OperatorReplayerBase( - rootDir, + traceDir, + queryId, taskId, nodeId, pipelineId, diff --git a/velox/tool/trace/CMakeLists.txt b/velox/tool/trace/CMakeLists.txt index 3218b4ee30b0..8c68c9967db3 100644 --- a/velox/tool/trace/CMakeLists.txt +++ b/velox/tool/trace/CMakeLists.txt @@ -33,7 +33,7 @@ velox_link_libraries( glog::glog gflags::gflags) -add_executable(velox_query_replayer QueryReplayer.cpp) +add_executable(velox_query_replayer TraceReplayerMain.cpp) target_link_libraries( velox_query_replayer diff --git a/velox/tool/trace/OperatorReplayerBase.cpp b/velox/tool/trace/OperatorReplayerBase.cpp index b54ad323a447..2083e676f678 100644 --- a/velox/tool/trace/OperatorReplayerBase.cpp +++ b/velox/tool/trace/OperatorReplayerBase.cpp @@ -16,11 +16,9 @@ #include -#include "velox/common/serialization/Serializable.h" #include "velox/core/PlanNode.h" -#include "velox/exec/QueryMetadataReader.h" -#include "velox/exec/QueryTraceTraits.h" -#include "velox/exec/QueryTraceUtil.h" +#include "velox/exec/TaskTraceReader.h" +#include "velox/exec/TraceUtil.h" #include "velox/exec/tests/utils/AssertQueryBuilder.h" #include "velox/exec/tests/utils/PlanBuilder.h" #include "velox/tool/trace/OperatorReplayerBase.h" @@ -29,30 +27,31 @@ using namespace facebook::velox; namespace facebook::velox::tool::trace { OperatorReplayerBase::OperatorReplayerBase( - std::string rootDir, + std::string traceDir, + std::string queryId, std::string taskId, std::string nodeId, int32_t pipelineId, std::string operatorType) - : taskId_(std::move(taskId)), + : queryId_(std::string(queryId)), + taskId_(std::move(taskId)), nodeId_(std::move(nodeId)), pipelineId_(pipelineId), operatorType_(std::move(operatorType)), - rootDir_(std::move(rootDir)), - taskDir_(fmt::format("{}/{}", rootDir_, taskId_)), - nodeDir_(fmt::format("{}/{}", taskDir_, nodeId_)) { - VELOX_USER_CHECK(!rootDir_.empty()); + taskTraceDir_( + exec::trace::getTaskTraceDirectory(traceDir, queryId_, taskId_)), + nodeTraceDir_(exec::trace::getNodeTraceDirectory(taskTraceDir_, nodeId_)), + fs_(filesystems::getFileSystem(taskTraceDir_, nullptr)), + maxDrivers_(exec::trace::getNumDrivers(nodeTraceDir_, pipelineId_, fs_)) { + VELOX_USER_CHECK(!taskTraceDir_.empty()); VELOX_USER_CHECK(!taskId_.empty()); VELOX_USER_CHECK(!nodeId_.empty()); VELOX_USER_CHECK_GE(pipelineId_, 0); VELOX_USER_CHECK(!operatorType_.empty()); - const auto metadataReader = exec::trace::QueryMetadataReader( - taskDir_, memory::MemoryManager::getInstance()->tracePool()); - metadataReader.read(queryConfigs_, connectorConfigs_, planFragment_); + const auto taskMetaReader = exec::trace::TaskTraceMetadataReader( + taskTraceDir_, memory::MemoryManager::getInstance()->tracePool()); + taskMetaReader.read(queryConfigs_, connectorConfigs_, planFragment_); queryConfigs_[core::QueryConfig::kQueryTraceEnabled] = "false"; - fs_ = filesystems::getFileSystem(rootDir_, nullptr); - maxDrivers_ = - exec::trace::getNumDrivers(rootDir_, taskId_, nodeId_, pipelineId_, fs_); } RowVectorPtr OperatorReplayerBase::run() { @@ -69,7 +68,10 @@ core::PlanNodePtr OperatorReplayerBase::createPlan() const { planFragment_.get(), [this](const core::PlanNode* node) { return node->id() == nodeId_; }); return exec::test::PlanBuilder() - .traceScan(nodeDir_, exec::trace::getDataType(planFragment_, nodeId_)) + .traceScan( + nodeTraceDir_, + pipelineId_, + exec::trace::getDataType(planFragment_, nodeId_)) .addNode(replayNodeFactory(replayNode)) .planNode(); } diff --git a/velox/tool/trace/OperatorReplayerBase.h b/velox/tool/trace/OperatorReplayerBase.h index 863a8f5f80f6..50e31fdaf02f 100644 --- a/velox/tool/trace/OperatorReplayerBase.h +++ b/velox/tool/trace/OperatorReplayerBase.h @@ -27,7 +27,8 @@ namespace facebook::velox::tool::trace { class OperatorReplayerBase { public: OperatorReplayerBase( - std::string rootDir, + std::string traceDir, + std::string queryId, std::string taskId, std::string nodeId, int32_t pipelineId, @@ -50,21 +51,21 @@ class OperatorReplayerBase { core::PlanNodePtr createPlan() const; + const std::string queryId_; const std::string taskId_; const std::string nodeId_; const int32_t pipelineId_; const std::string operatorType_; - const std::string rootDir_; - const std::string taskDir_; - const std::string nodeDir_; + const std::string taskTraceDir_; + const std::string nodeTraceDir_; + const std::shared_ptr fs_; + const int32_t maxDrivers_; std::unordered_map queryConfigs_; std::unordered_map> connectorConfigs_; core::PlanNodePtr planFragment_; - std::shared_ptr fs_; - int32_t maxDrivers_{1}; private: std::function diff --git a/velox/tool/trace/PartitionedOutputReplayer.cpp b/velox/tool/trace/PartitionedOutputReplayer.cpp index 79874caefc93..56b4c0a145eb 100644 --- a/velox/tool/trace/PartitionedOutputReplayer.cpp +++ b/velox/tool/trace/PartitionedOutputReplayer.cpp @@ -18,7 +18,7 @@ #include "velox/common/memory/Memory.h" #include "velox/exec/PartitionedOutput.h" -#include "velox/exec/QueryTraceUtil.h" +#include "velox/exec/TraceUtil.h" #include "velox/exec/tests/utils/AssertQueryBuilder.h" #include "velox/exec/tests/utils/PlanBuilder.h" #include "velox/tool/trace/PartitionedOutputReplayer.h" @@ -105,13 +105,20 @@ void consumeAllData( } PartitionedOutputReplayer::PartitionedOutputReplayer( - const std::string& rootDir, + const std::string& traceDir, + const std::string& queryId, const std::string& taskId, const std::string& nodeId, const int32_t pipelineId, const std::string& operatorType, const ConsumerCallBack& consumerCb) - : OperatorReplayerBase(rootDir, taskId, nodeId, pipelineId, operatorType), + : OperatorReplayerBase( + traceDir, + queryId, + taskId, + nodeId, + pipelineId, + operatorType), originalNode_(dynamic_cast( core::PlanNode::findFirstNode( planFragment_.get(), diff --git a/velox/tool/trace/PartitionedOutputReplayer.h b/velox/tool/trace/PartitionedOutputReplayer.h index 61610aa031cc..a90c222dbf65 100644 --- a/velox/tool/trace/PartitionedOutputReplayer.h +++ b/velox/tool/trace/PartitionedOutputReplayer.h @@ -41,7 +41,8 @@ class PartitionedOutputReplayer final : public OperatorReplayerBase { std::function)>; PartitionedOutputReplayer( - const std::string& rootDir, + const std::string& traceDir, + const std::string& queryId, const std::string& taskId, const std::string& nodeId, const int32_t pipelineId, diff --git a/velox/tool/trace/TableWriterReplayer.cpp b/velox/tool/trace/TableWriterReplayer.cpp index 3543011aed9f..405071da433e 100644 --- a/velox/tool/trace/TableWriterReplayer.cpp +++ b/velox/tool/trace/TableWriterReplayer.cpp @@ -16,8 +16,8 @@ #include -#include "velox/exec/QueryTraceUtil.h" #include "velox/exec/TableWriter.h" +#include "velox/exec/TraceUtil.h" #include "velox/exec/tests/utils/PlanBuilder.h" #include "velox/tool/trace/TableWriterReplayer.h" diff --git a/velox/tool/trace/TableWriterReplayer.h b/velox/tool/trace/TableWriterReplayer.h index c0c675ba6333..8684be91478d 100644 --- a/velox/tool/trace/TableWriterReplayer.h +++ b/velox/tool/trace/TableWriterReplayer.h @@ -27,13 +27,20 @@ namespace facebook::velox::tool::trace { class TableWriterReplayer final : public OperatorReplayerBase { public: TableWriterReplayer( - const std::string& rootDir, + const std::string& traceDir, + const std::string& queryId, const std::string& taskId, const std::string& nodeId, const int32_t pipelineId, const std::string& operatorType, const std::string& replayOutputDir) - : OperatorReplayerBase(rootDir, taskId, nodeId, pipelineId, operatorType), + : OperatorReplayerBase( + traceDir, + queryId, + taskId, + nodeId, + pipelineId, + operatorType), replayOutputDir_(replayOutputDir) { VELOX_CHECK(!replayOutputDir_.empty()); } diff --git a/velox/tool/trace/QueryReplayer.cpp b/velox/tool/trace/TraceReplayRunner.cpp similarity index 57% rename from velox/tool/trace/QueryReplayer.cpp rename to velox/tool/trace/TraceReplayRunner.cpp index a0680d30a387..b2cd0d4a0be3 100644 --- a/velox/tool/trace/QueryReplayer.cpp +++ b/velox/tool/trace/TraceReplayRunner.cpp @@ -14,7 +14,8 @@ * limitations under the License. */ -#include +#include "velox/tool/trace/TraceReplayRunner.h" + #include #include "velox/common/file/FileSystems.h" @@ -32,12 +33,12 @@ #include "velox/dwio/dwrf/RegisterDwrfWriter.h" #include "velox/dwio/parquet/RegisterParquetReader.h" #include "velox/dwio/parquet/RegisterParquetWriter.h" +#include "velox/exec/OperatorTraceReader.h" #include "velox/exec/PartitionFunction.h" -#include "velox/exec/QueryTraceUtil.h" -#include "velox/expression/Expr.h" +#include "velox/exec/TaskTraceReader.h" +#include "velox/exec/TraceUtil.h" #include "velox/functions/prestosql/aggregates/RegisterAggregateFunctions.h" #include "velox/functions/prestosql/registration/RegistrationFunctions.h" -#include "velox/parse/ExpressionsParser.h" #include "velox/parse/TypeResolver.h" #include "velox/tool/trace/AggregationReplayer.h" #include "velox/tool/trace/OperatorReplayerBase.h" @@ -57,6 +58,7 @@ DEFINE_bool( "It also print the query metadata including query configs, connectors " "properties, and query plan in JSON format."); DEFINE_bool(short_summary, false, "Only show number of tasks and task ids"); +DEFINE_string(query_id, "", "Specify the target query id which must be set"); DEFINE_string( task_id, "", @@ -74,61 +76,18 @@ DEFINE_double( 2.0, "Hardware multipler for hive connector."); -using namespace facebook::velox; - +namespace facebook::velox::tool::trace { namespace { -void init() { - memory::initializeMemoryManager({}); - filesystems::registerLocalFileSystem(); - filesystems::registerS3FileSystem(); - filesystems::registerHdfsFileSystem(); - filesystems::registerGCSFileSystem(); - filesystems::abfs::registerAbfsFileSystem(); - - dwio::common::registerFileSinks(); - dwrf::registerDwrfReaderFactory(); - dwrf::registerDwrfWriterFactory(); - parquet::registerParquetReaderFactory(); - parquet::registerParquetWriterFactory(); - - core::PlanNode::registerSerDe(); - core::ITypedExpr::registerSerDe(); - common::Filter::registerSerDe(); - Type::registerSerDe(); - exec::registerPartitionFunctionSerDe(); - if (!isRegisteredVectorSerde()) { - serializer::presto::PrestoVectorSerde::registerVectorSerde(); - } - connector::hive::HiveTableHandle::registerSerDe(); - connector::hive::LocationHandle::registerSerDe(); - connector::hive::HiveColumnHandle::registerSerDe(); - connector::hive::HiveInsertTableHandle::registerSerDe(); - connector::hive::HiveConnectorSplit::registerSerDe(); - - functions::prestosql::registerAllScalarFunctions(); - aggregate::prestosql::registerAllAggregateFunctions(); - parse::registerTypeResolver(); - - // TODO: make it configurable. - const auto ioExecutor = std::make_unique( - std::thread::hardware_concurrency() * - FLAGS_hiveConnectorExecutorHwMultiplier); - connector::registerConnectorFactory( - std::make_shared()); - const auto hiveConnector = - connector::getConnectorFactory("hive")->newConnector( - "test-hive", - std::make_shared( - std::unordered_map()), - ioExecutor.get()); - connector::registerConnector(hiveConnector); -} std::unique_ptr createReplayer() { std::unique_ptr replayer; if (FLAGS_operator_type == "TableWriter") { + VELOX_USER_CHECK( + !FLAGS_table_writer_output_dir.empty(), + "--table_writer_output_dir is required"); replayer = std::make_unique( FLAGS_root_dir, + FLAGS_query_id, FLAGS_task_id, FLAGS_node_id, FLAGS_pipeline_id, @@ -137,6 +96,7 @@ std::unique_ptr createReplayer() { } else if (FLAGS_operator_type == "Aggregation") { replayer = std::make_unique( FLAGS_root_dir, + FLAGS_query_id, FLAGS_task_id, FLAGS_node_id, FLAGS_pipeline_id, @@ -144,6 +104,7 @@ std::unique_ptr createReplayer() { } else if (FLAGS_operator_type == "PartitionedOutput") { replayer = std::make_unique( FLAGS_root_dir, + FLAGS_query_id, FLAGS_task_id, FLAGS_node_id, FLAGS_pipeline_id, @@ -155,76 +116,175 @@ std::unique_ptr createReplayer() { return replayer; } +void printTaskMetadata( + const std::string& taskTraceDir, + memory::MemoryPool* pool, + std::ostringstream& oss) { + auto taskMetaReader = std::make_unique( + taskTraceDir, pool); + std::unordered_map queryConfigs; + std::unordered_map> + connectorProperties; + core::PlanNodePtr queryPlan; + taskMetaReader->read(queryConfigs, connectorProperties, queryPlan); + + oss << "\n++++++Query configs++++++\n"; + for (const auto& queryConfigEntry : queryConfigs) { + oss << "\t" << queryConfigEntry.first << ": " << queryConfigEntry.second + << "\n"; + } + oss << "\n++++++Connector configs++++++\n"; + for (const auto& connectorPropertyEntry : connectorProperties) { + oss << connectorPropertyEntry.first << "\n"; + for (const auto& propertyEntry : connectorPropertyEntry.second) { + oss << "\t" << propertyEntry.first << ": " << propertyEntry.second + << "\n"; + } + } + oss << "\n++++++Task query plan++++++\n"; + oss << queryPlan->toString(true, true); +} + +void printTaskTraceSummary( + const std::string& traceDir, + const std::string& queryId, + const std::string& taskId, + const std::string& nodeId, + uint32_t pipelineId, + memory::MemoryPool* pool, + std::ostringstream& oss) { + auto fs = filesystems::getFileSystem(traceDir, nullptr); + const auto taskTraceDir = + exec::trace::getTaskTraceDirectory(traceDir, queryId, taskId); + + const std::vector driverIds = exec::trace::listDriverIds( + exec::trace::getNodeTraceDirectory(taskTraceDir, nodeId), pipelineId, fs); + oss << "\n++++++Task " << taskId << "++++++\n"; + for (const auto& driverId : driverIds) { + const auto opTraceDir = exec::trace::getOpTraceDirectory( + taskTraceDir, nodeId, pipelineId, driverId); + const auto opTraceSummary = + exec::trace::OperatorTraceSummaryReader( + exec::trace::getOpTraceDirectory( + taskTraceDir, nodeId, pipelineId, driverId), + pool) + .read(); + oss << driverId << " driver, " << opTraceSummary.toString() << "\n"; + } +} + void printSummary( const std::string& rootDir, + const std::string& queryId, const std::string& taskId, - bool shortSummary) { - const auto fs = filesystems::getFileSystem(rootDir, nullptr); - const auto taskIds = exec::trace::getTaskIds(rootDir, fs); - if (taskIds.empty()) { - LOG(ERROR) << "No traced query task under " << rootDir; - return; - } + bool shortSummary, + memory::MemoryPool* pool) { + const std::string queryDir = + exec::trace::getQueryTraceDirectory(rootDir, queryId); + const auto fs = filesystems::getFileSystem(queryDir, nullptr); + const auto taskIds = exec::trace::getTaskIds(rootDir, queryId, fs); + VELOX_USER_CHECK(!taskIds.empty(), "No task found under {}", rootDir); std::ostringstream summary; summary << "\n++++++Query trace summary++++++\n"; summary << "Number of tasks: " << taskIds.size() << "\n"; - summary << "Task ids: " << folly::join(",", taskIds); - if (shortSummary) { + summary << "Task ids: " << folly::join("\n", taskIds); LOG(INFO) << summary.str(); return; } const auto summaryTaskIds = taskId.empty() ? taskIds : std::vector{taskId}; + printTaskMetadata( + exec::trace::getTaskTraceDirectory(rootDir, queryId, summaryTaskIds[0]), + pool, + summary); + summary << "\n++++++Task Summaries++++++\n"; for (const auto& taskId : summaryTaskIds) { - summary << "\n++++++Query configs and plan of task " << taskId - << ":++++++\n"; - const auto traceTaskDir = fmt::format("{}/{}", rootDir, taskId); - const auto queryMetaFile = fmt::format( - "{}/{}", - traceTaskDir, - exec::trace::QueryTraceTraits::kQueryMetaFileName); - const auto metaObj = exec::trace::getMetadata(queryMetaFile, fs); - const auto& configObj = - metaObj[exec::trace::QueryTraceTraits::kQueryConfigKey]; - summary << "++++++Query configs++++++\n"; - summary << folly::toJson(configObj) << "\n"; - summary << "++++++Query plan++++++\n"; - const auto queryPlan = ISerializable::deserialize( - metaObj[exec::trace::QueryTraceTraits::kPlanNodeKey], - memory::MemoryManager::getInstance()->tracePool()); - summary << queryPlan->toString(true, true); + printTaskTraceSummary( + rootDir, + queryId, + taskId, + FLAGS_node_id, + FLAGS_pipeline_id, + pool, + summary); } LOG(INFO) << summary.str(); } } // namespace -int main(int argc, char** argv) { - if (argc == 1) { - gflags::ShowUsageWithFlags(argv[0]); - return -1; - } +TraceReplayRunner::TraceReplayRunner() + : ioExecutor_(std::make_unique( + std::thread::hardware_concurrency() * + FLAGS_hiveConnectorExecutorHwMultiplier, + std::make_shared( + "TraceReplayIoConnector"))) {} + +void TraceReplayRunner::init() { + memory::initializeMemoryManager({}); + filesystems::registerLocalFileSystem(); + filesystems::registerS3FileSystem(); + filesystems::registerHdfsFileSystem(); + filesystems::registerGCSFileSystem(); + filesystems::abfs::registerAbfsFileSystem(); + + dwio::common::registerFileSinks(); + dwrf::registerDwrfReaderFactory(); + dwrf::registerDwrfWriterFactory(); + parquet::registerParquetReaderFactory(); + parquet::registerParquetWriterFactory(); - gflags::ParseCommandLineFlags(&argc, &argv, true); - if (FLAGS_root_dir.empty()) { - gflags::SetUsageMessage("--root_dir must be provided."); - gflags::ShowUsageWithFlags(argv[0]); - return -1; + core::PlanNode::registerSerDe(); + core::ITypedExpr::registerSerDe(); + common::Filter::registerSerDe(); + Type::registerSerDe(); + exec::registerPartitionFunctionSerDe(); + if (!isRegisteredVectorSerde()) { + serializer::presto::PrestoVectorSerde::registerVectorSerde(); } + connector::hive::HiveTableHandle::registerSerDe(); + connector::hive::LocationHandle::registerSerDe(); + connector::hive::HiveColumnHandle::registerSerDe(); + connector::hive::HiveInsertTableHandle::registerSerDe(); + connector::hive::HiveConnectorSplit::registerSerDe(); + connector::hive::registerHivePartitionFunctionSerDe(); + connector::hive::HiveBucketProperty::registerSerDe(); - try { - init(); - if (FLAGS_summary || FLAGS_short_summary) { - printSummary(FLAGS_root_dir, FLAGS_task_id, FLAGS_short_summary); - return 0; - } - createReplayer()->run(); - } catch (const VeloxException& e) { - LOG(ERROR) << e.what(); - return -1; + functions::prestosql::registerAllScalarFunctions(); + aggregate::prestosql::registerAllAggregateFunctions(); + parse::registerTypeResolver(); + + connector::registerConnectorFactory( + std::make_shared()); + const auto hiveConnector = + connector::getConnectorFactory("hive")->newConnector( + "test-hive", + std::make_shared( + std::unordered_map()), + ioExecutor_.get()); + connector::registerConnector(hiveConnector); +} + +void TraceReplayRunner::run() { + VELOX_USER_CHECK(!FLAGS_root_dir.empty(), "--root_dir must be provided"); + VELOX_USER_CHECK(!FLAGS_query_id.empty(), "--query_id must be provided"); + VELOX_USER_CHECK(!FLAGS_node_id.empty(), "--node_id must be provided"); + + if (FLAGS_summary || FLAGS_short_summary) { + auto pool = memory::memoryManager()->addLeafPool("replayer"); + printSummary( + FLAGS_root_dir, + FLAGS_query_id, + FLAGS_task_id, + FLAGS_short_summary, + pool.get()); + return; } - return 0; + VELOX_USER_CHECK( + !FLAGS_operator_type.empty(), "--operator_type must be provided"); + createReplayer()->run(); } +} // namespace facebook::velox::tool::trace diff --git a/velox/tool/trace/TraceReplayRunner.h b/velox/tool/trace/TraceReplayRunner.h new file mode 100644 index 000000000000..f1f1d7484f62 --- /dev/null +++ b/velox/tool/trace/TraceReplayRunner.h @@ -0,0 +1,54 @@ +/* + * Copyright (c) Facebook, Inc. and its affiliates. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#pragma once + +#include + +#include + +DECLARE_string(root_dir); +DECLARE_bool(summary); +DECLARE_bool(short_summary); +DECLARE_string(query_id); +DECLARE_string(task_id); +DECLARE_string(node_id); +DECLARE_int32(pipeline_id); +DECLARE_string(operator_type); +DECLARE_string(table_writer_output_dir); +DECLARE_double(hiveConnectorExecutorHwMultiplier); + +namespace facebook::velox::tool::trace { + +/// The trace replay runner. It is configured through a set of gflags passed +/// from replayer tool command line. +class TraceReplayRunner { + public: + TraceReplayRunner(); + virtual ~TraceReplayRunner() = default; + + /// Initializes the trace replay runner by setting the velox runtime + /// environment for the trace replay. It is invoked before run(). + virtual void init(); + + /// Runs the trace replay with a set of gflags passed from replayer tool. + virtual void run(); + + private: + const std::unique_ptr ioExecutor_; +}; + +} // namespace facebook::velox::tool::trace diff --git a/velox/tool/trace/TraceReplayerMain.cpp b/velox/tool/trace/TraceReplayerMain.cpp new file mode 100644 index 000000000000..cfca543a04fa --- /dev/null +++ b/velox/tool/trace/TraceReplayerMain.cpp @@ -0,0 +1,28 @@ +/* + * Copyright (c) Facebook, Inc. and its affiliates. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include "velox/tool/trace/TraceReplayRunner.h" + +#include + +int main(int argc, char** argv) { + gflags::ParseCommandLineFlags(&argc, &argv, true); + + facebook::velox::tool::trace::TraceReplayRunner runner; + runner.init(); + runner.run(); + return 0; +} diff --git a/velox/tool/trace/tests/AggregationReplayerTest.cpp b/velox/tool/trace/tests/AggregationReplayerTest.cpp index 8d3b15abde62..55a71f739eec 100644 --- a/velox/tool/trace/tests/AggregationReplayerTest.cpp +++ b/velox/tool/trace/tests/AggregationReplayerTest.cpp @@ -26,10 +26,10 @@ #include "velox/common/hyperloglog/SparseHll.h" #include "velox/common/testutil/TestValue.h" #include "velox/dwio/dwrf/writer/Writer.h" +#include "velox/exec/OperatorTraceReader.h" #include "velox/exec/PartitionFunction.h" -#include "velox/exec/QueryDataReader.h" -#include "velox/exec/QueryTraceUtil.h" #include "velox/exec/TableWriter.h" +#include "velox/exec/TraceUtil.h" #include "velox/exec/tests/utils/ArbitratorTestUtil.h" #include "velox/exec/tests/utils/AssertQueryBuilder.h" #include "velox/exec/tests/utils/HiveConnectorTestBase.h" @@ -198,10 +198,14 @@ TEST_F(AggregationReplayerTest, test) { .split(makeHiveConnectorSplit(sourceFilePath->getPath())) .copyResults(pool(), task); - const auto replayingResult = - AggregationReplayer( - traceRoot, task->taskId(), traceNodeId_, 0, "Aggregation") - .run(); + const auto replayingResult = AggregationReplayer( + traceRoot, + task->queryCtx()->queryId(), + task->taskId(), + traceNodeId_, + 0, + "Aggregation") + .run(); assertEqualResults({results}, {replayingResult}); } } diff --git a/velox/tool/trace/tests/PartitionedOutputReplayerTest.cpp b/velox/tool/trace/tests/PartitionedOutputReplayerTest.cpp index b34ead3fb366..d2d9c2c4e49f 100644 --- a/velox/tool/trace/tests/PartitionedOutputReplayerTest.cpp +++ b/velox/tool/trace/tests/PartitionedOutputReplayerTest.cpp @@ -24,8 +24,8 @@ #include "velox/common/hyperloglog/SparseHll.h" #include "velox/exec/PartitionFunction.h" #include "velox/exec/PartitionedOutput.h" -#include "velox/exec/QueryTraceUtil.h" #include "velox/exec/TableWriter.h" +#include "velox/exec/TraceUtil.h" #include "velox/exec/tests/utils/AssertQueryBuilder.h" #include "velox/exec/tests/utils/HiveConnectorTestBase.h" #include "velox/exec/tests/utils/PlanBuilder.h" @@ -143,10 +143,15 @@ TEST_F(PartitionedOutputReplayerTest, defaultConsumer) { executor_.get(), consumerExecutor.get(), [&](auto /* unused */, auto /* unused */) {}); - ASSERT_NO_THROW( - PartitionedOutputReplayer( - traceRoot, originalTask->taskId(), planNodeId, 0, "PartitionedOutput") - .run()); + + ASSERT_NO_THROW(PartitionedOutputReplayer( + traceRoot, + originalTask->queryCtx()->queryId(), + originalTask->taskId(), + planNodeId, + 0, + "PartitionedOutput") + .run()); } TEST_F(PartitionedOutputReplayerTest, basic) { @@ -223,6 +228,7 @@ TEST_F(PartitionedOutputReplayerTest, basic) { replayedPartitionedResults.resize(testParam.numPartitions); PartitionedOutputReplayer( traceRoot, + originalTask->queryCtx()->queryId(), originalTask->taskId(), planNodeId, 0, diff --git a/velox/tool/trace/tests/TableWriterReplayerTest.cpp b/velox/tool/trace/tests/TableWriterReplayerTest.cpp index ef2c4c54d682..5e4ec3eddfa9 100644 --- a/velox/tool/trace/tests/TableWriterReplayerTest.cpp +++ b/velox/tool/trace/tests/TableWriterReplayerTest.cpp @@ -24,10 +24,10 @@ #include "velox/common/base/Fs.h" #include "velox/common/file/FileSystems.h" #include "velox/common/hyperloglog/SparseHll.h" +#include "velox/exec/OperatorTraceReader.h" #include "velox/exec/PartitionFunction.h" -#include "velox/exec/QueryDataReader.h" -#include "velox/exec/QueryTraceUtil.h" #include "velox/exec/TableWriter.h" +#include "velox/exec/TraceUtil.h" #include "velox/exec/tests/utils/AssertQueryBuilder.h" #include "velox/exec/tests/utils/HiveConnectorTestBase.h" #include "velox/exec/tests/utils/PlanBuilder.h" @@ -297,6 +297,7 @@ TEST_F(TableWriterReplayerTest, basic) { const auto traceOutputDir = TempDirectoryPath::create(); const auto result = TableWriterReplayer( traceRoot, + task->queryCtx()->queryId(), task->taskId(), "1", 0, @@ -422,6 +423,7 @@ TEST_F(TableWriterReplayerTest, partitionWrite) { const auto traceOutputDir = TempDirectoryPath::create(); TableWriterReplayer( traceRoot, + task->queryCtx()->queryId(), task->taskId(), tableWriteNodeId, 0,