diff --git a/velox/common/base/VeloxException.h b/velox/common/base/VeloxException.h index fa8d18e15d09..148ae38f06a2 100644 --- a/velox/common/base/VeloxException.h +++ b/velox/common/base/VeloxException.h @@ -105,6 +105,9 @@ inline constexpr auto kNoCacheSpace = "NO_CACHE_SPACE"_fs; /// An error raised when spill bytes exceeds limits. inline constexpr auto kSpillLimitExceeded = "SPILL_LIMIT_EXCEEDED"_fs; +/// An error raised when trace bytes exceeds limits. +inline constexpr auto kTraceLimitExceeded = "TRACE_LIMIT_EXCEEDED"_fs; + /// Errors indicating file read corruptions. inline constexpr auto kFileCorruption = "FILE_CORRUPTION"_fs; diff --git a/velox/common/file/File.cpp b/velox/common/file/File.cpp index 46156c2f1aa7..6a30f0a26159 100644 --- a/velox/common/file/File.cpp +++ b/velox/common/file/File.cpp @@ -170,9 +170,10 @@ void LocalReadFile::preadInternal(uint64_t offset, uint64_t length, char* pos) VELOX_CHECK_EQ( bytesRead, length, - "fread failure in LocalReadFile::PReadInternal, {} vs {}.", + "fread failure in LocalReadFile::PReadInternal, {} vs {}: {}", bytesRead, - length); + length, + folly::errnoStr(errno)); } std::string_view diff --git a/velox/core/PlanNode.cpp b/velox/core/PlanNode.cpp index 9e3c1fc0fad3..609ad7183513 100644 --- a/velox/core/PlanNode.cpp +++ b/velox/core/PlanNode.cpp @@ -2355,15 +2355,15 @@ folly::dynamic PlanNode::serialize() const { return obj; } -const std::vector& QueryTraceScanNode::sources() const { +const std::vector& TraceScanNode::sources() const { return kEmptySources; } -std::string QueryTraceScanNode::traceDir() const { +std::string TraceScanNode::traceDir() const { return traceDir_; } -void QueryTraceScanNode::addDetails(std::stringstream& stream) const { +void TraceScanNode::addDetails(std::stringstream& stream) const { stream << "Trace dir: " << traceDir_; } diff --git a/velox/core/PlanNode.h b/velox/core/PlanNode.h index 331f3618c286..c893a834fda8 100644 --- a/velox/core/PlanNode.h +++ b/velox/core/PlanNode.h @@ -314,13 +314,17 @@ class ArrowStreamNode : public PlanNode { std::shared_ptr arrowStream_; }; -class QueryTraceScanNode final : public PlanNode { +class TraceScanNode final : public PlanNode { public: - QueryTraceScanNode( + TraceScanNode( const PlanNodeId& id, const std::string& traceDir, + uint32_t pipelineId, const RowTypePtr& outputType) - : PlanNode(id), traceDir_(traceDir), outputType_(outputType) {} + : PlanNode(id), + traceDir_(traceDir), + pipelineId_(pipelineId), + outputType_(outputType) {} const RowTypePtr& outputType() const override { return outputType_; @@ -333,17 +337,22 @@ class QueryTraceScanNode final : public PlanNode { } folly::dynamic serialize() const override { - VELOX_UNSUPPORTED("QueryReplayScanNode is not serializable"); + VELOX_UNSUPPORTED("TraceScanNode is not serializable"); return nullptr; } std::string traceDir() const; + uint32_t pipelineId() const { + return pipelineId_; + } + private: void addDetails(std::stringstream& stream) const override; // Directory of traced data, which is $traceRoot/$taskId/$nodeId. const std::string traceDir_; + const uint32_t pipelineId_; const RowTypePtr outputType_; }; diff --git a/velox/core/QueryCtx.cpp b/velox/core/QueryCtx.cpp index ee238c081800..19428281d5b6 100644 --- a/velox/core/QueryCtx.cpp +++ b/velox/core/QueryCtx.cpp @@ -87,16 +87,13 @@ void QueryCtx::updateSpilledBytesAndCheckLimit(uint64_t bytes) { } } -bool QueryCtx::updateTracedBytesAndCheckLimit(uint64_t bytes) { - if (numTracedBytes_.fetch_add(bytes) + bytes < +void QueryCtx::updateTracedBytesAndCheckLimit(uint64_t bytes) { + if (numTracedBytes_.fetch_add(bytes) + bytes >= queryConfig_.queryTraceMaxBytes()) { - return false; + VELOX_SPILL_LIMIT_EXCEEDED(fmt::format( + "Query exceeded per-query local trace limit of {}", + succinctBytes(queryConfig_.queryTraceMaxBytes()))); } - - numTracedBytes_.fetch_sub(bytes); - LOG(WARNING) << "Query exceeded trace limit of " - << succinctBytes(queryConfig_.queryTraceMaxBytes()); - return true; } std::unique_ptr QueryCtx::MemoryReclaimer::create( diff --git a/velox/core/QueryCtx.h b/velox/core/QueryCtx.h index c6cc96a3caf2..5e89ccc4c907 100644 --- a/velox/core/QueryCtx.h +++ b/velox/core/QueryCtx.h @@ -122,9 +122,9 @@ class QueryCtx : public std::enable_shared_from_this { /// the max spill bytes limit. void updateSpilledBytesAndCheckLimit(uint64_t bytes); - /// Updates the aggregated trace bytes of this query, and return true if - /// exceeds the max query trace bytes limit. - bool updateTracedBytesAndCheckLimit(uint64_t bytes); + /// Updates the aggregated trace bytes of this query, and throws if exceeds + /// the max query trace bytes limit. + void updateTracedBytesAndCheckLimit(uint64_t bytes); void testingOverrideMemoryPool(std::shared_ptr pool) { pool_ = std::move(pool); diff --git a/velox/docs/develop/debugging/tracing.rst b/velox/docs/develop/debugging/tracing.rst index 1750296503e1..0536546d2a6e 100644 --- a/velox/docs/develop/debugging/tracing.rst +++ b/velox/docs/develop/debugging/tracing.rst @@ -57,7 +57,7 @@ The tracing framework consists of three components: Query Trace Writer ^^^^^^^^^^^^^^^^^^ -**QueryMetadataWriter** records the query metadata during task creation, +**TaskTraceMetadataWriter** records the query metadata during task creation, serializes, and writes them into a file in JSON format. There are two kinds of metadata: @@ -66,20 +66,20 @@ of metadata: - Plan fragment of the task (also known as a plan node tree). It can be serialized as a JSON object, which is already supported in Velox. -**QueryDataWriter** records the input vectors from the target operator, which are +**OperatorTraceWriter** records the input vectors from the target operator, which are serialized and written as a binary file. Query Trace Reader ^^^^^^^^^^^^^^^^^^ -**QueryMetadataReader** can load the query metadata JSON file, and extract the query +**TaskTraceMetadataReader** can load the query metadata JSON file, and extract the query configurations, connector properties, and the plan fragment. **QueryDataReader** can read and deserialize the input vectors of the target operator. It is used as the utility to replay the input data as a source operator in the target operator replay. -**NOTE**: `QueryDataWriter` serializes and flushes the input vectors in batches, +**NOTE**: `OperatorTraceWriter` serializes and flushes the input vectors in batches, allowing it to replay the input process using the same sequence of batches. Query Trace Util diff --git a/velox/exec/CMakeLists.txt b/velox/exec/CMakeLists.txt index 9972f46b3ecb..73ca66635895 100644 --- a/velox/exec/CMakeLists.txt +++ b/velox/exec/CMakeLists.txt @@ -57,13 +57,14 @@ velox_add_library( OrderBy.cpp OutputBuffer.cpp OutputBufferManager.cpp - QueryDataReader.cpp - QueryDataWriter.cpp - QueryMetadataReader.cpp - QueryMetadataWriter.cpp - QueryTraceConfig.cpp - QueryTraceScan.cpp - QueryTraceUtil.cpp + OperatorTraceReader.cpp + OperatorTraceScan.cpp + OperatorTraceWriter.cpp + TaskTraceReader.cpp + TaskTraceWriter.cpp + Trace.cpp + TraceConfig.cpp + TraceUtil.cpp PartitionedOutput.cpp PartitionFunction.cpp PartitionStreamingWindowBuild.cpp diff --git a/velox/exec/Driver.cpp b/velox/exec/Driver.cpp index 56614cf823fb..27fe4275c572 100644 --- a/velox/exec/Driver.cpp +++ b/velox/exec/Driver.cpp @@ -95,8 +95,8 @@ const core::QueryConfig& DriverCtx::queryConfig() const { return task->queryCtx()->queryConfig(); } -const std::optional& DriverCtx::traceConfig() const { - return task->queryTraceConfig(); +const std::optional& DriverCtx::traceConfig() const { + return task->traceConfig(); } velox::memory::MemoryPool* DriverCtx::addOperatorPool( @@ -618,6 +618,7 @@ StopReason Driver::runInternal( lockedStats->addInputVector( resultBytes, intermediateResult->size()); } + nextOp->traceInput(intermediateResult); TestValue::adjust( "facebook::velox::exec::Driver::runInternal::addInput", nextOp); diff --git a/velox/exec/Driver.h b/velox/exec/Driver.h index 2af297dfef11..1605d09e4316 100644 --- a/velox/exec/Driver.h +++ b/velox/exec/Driver.h @@ -27,7 +27,7 @@ #include "velox/common/time/CpuWallTimer.h" #include "velox/core/PlanFragment.h" #include "velox/core/QueryCtx.h" -#include "velox/exec/QueryTraceConfig.h" +#include "velox/exec/TraceConfig.h" namespace facebook::velox::exec { @@ -291,7 +291,7 @@ struct DriverCtx { const core::QueryConfig& queryConfig() const; - const std::optional& traceConfig() const; + const std::optional& traceConfig() const; velox::memory::MemoryPool* addOperatorPool( const core::PlanNodeId& planNodeId, diff --git a/velox/exec/HashAggregation.cpp b/velox/exec/HashAggregation.cpp index b15eab7bf441..ef072d059d2c 100644 --- a/velox/exec/HashAggregation.cpp +++ b/velox/exec/HashAggregation.cpp @@ -117,7 +117,6 @@ bool HashAggregation::abandonPartialAggregationEarly(int64_t numOutput) const { } void HashAggregation::addInput(RowVectorPtr input) { - traceInput(input); if (!pushdownChecked_) { mayPushdown_ = operatorCtx_->driver()->mayPushdownAggregation(this); pushdownChecked_ = true; diff --git a/velox/exec/LocalPlanner.cpp b/velox/exec/LocalPlanner.cpp index e75e489168a4..aec3104be3a2 100644 --- a/velox/exec/LocalPlanner.cpp +++ b/velox/exec/LocalPlanner.cpp @@ -32,9 +32,9 @@ #include "velox/exec/MergeJoin.h" #include "velox/exec/NestedLoopJoinBuild.h" #include "velox/exec/NestedLoopJoinProbe.h" +#include "velox/exec/OperatorTraceScan.h" #include "velox/exec/OrderBy.h" #include "velox/exec/PartitionedOutput.h" -#include "velox/exec/QueryTraceScan.h" #include "velox/exec/RowNumber.h" #include "velox/exec/StreamingAggregation.h" #include "velox/exec/TableScan.h" @@ -595,11 +595,10 @@ std::shared_ptr DriverFactory::createDriver( assignUniqueIdNode->taskUniqueId(), assignUniqueIdNode->uniqueIdCounter())); } else if ( - const auto queryReplayScanNode = - std::dynamic_pointer_cast( - planNode)) { - operators.push_back(std::make_unique( - id, ctx.get(), queryReplayScanNode)); + const auto traceScanNode = + std::dynamic_pointer_cast(planNode)) { + operators.push_back(std::make_unique( + id, ctx.get(), traceScanNode)); } else { std::unique_ptr extended; if (planNode->requiresExchangeClient()) { diff --git a/velox/exec/Operator.cpp b/velox/exec/Operator.cpp index 910e4cec76b2..bb55b790426b 100644 --- a/velox/exec/Operator.cpp +++ b/velox/exec/Operator.cpp @@ -19,10 +19,8 @@ #include "velox/common/base/SuccinctPrinter.h" #include "velox/common/testutil/TestValue.h" #include "velox/exec/Driver.h" -#include "velox/exec/HashJoinBridge.h" #include "velox/exec/OperatorUtils.h" -#include "velox/exec/QueryTraceUtil.h" -#include "velox/exec/Task.h" +#include "velox/exec/TraceUtil.h" #include "velox/expression/Expr.h" using facebook::velox::common::testutil::TestValue; @@ -107,17 +105,13 @@ void Operator::maybeSetReclaimer() { } void Operator::maybeSetTracer() { - const auto& queryTraceConfig = operatorCtx_->driverCtx()->traceConfig(); - if (!queryTraceConfig.has_value()) { - return; - } - - if (operatorCtx_->driverCtx()->queryConfig().queryTraceMaxBytes() == 0) { + const auto& traceConfig = operatorCtx_->driverCtx()->traceConfig(); + if (!traceConfig.has_value()) { return; } const auto nodeId = planNodeId(); - if (queryTraceConfig->queryNodes.count(nodeId) == 0) { + if (traceConfig->queryNodes.count(nodeId) == 0) { return; } @@ -134,20 +128,17 @@ void Operator::maybeSetTracer() { const auto pipelineId = operatorCtx_->driverCtx()->pipelineId; const auto driverId = operatorCtx_->driverCtx()->driverId; - LOG(INFO) << "Trace data for operator type: " << operatorType() + LOG(INFO) << "Trace input for operator type: " << operatorType() << ", operator id: " << operatorId() << ", pipeline: " << pipelineId << ", driver: " << driverId << ", task: " << taskId(); - const auto opTraceDirPath = fmt::format( - "{}/{}/{}/{}/data", - queryTraceConfig->queryTraceDir, - planNodeId(), - pipelineId, - driverId); + const auto opTraceDirPath = trace::getOpTraceDirectory( + traceConfig->queryTraceDir, planNodeId(), pipelineId, driverId); trace::createTraceDirectory(opTraceDirPath); - inputTracer_ = std::make_unique( + inputTracer_ = std::make_unique( + this, opTraceDirPath, memory::traceMemoryPool(), - queryTraceConfig->updateAndCheckTraceLimitCB); + traceConfig->updateAndCheckTraceLimitCB); } void Operator::traceInput(const RowVectorPtr& input) { @@ -319,6 +310,16 @@ OperatorStats Operator::stats(bool clear) { return stats; } +void Operator::close() { + input_ = nullptr; + results_.clear(); + recordSpillStats(); + finishTrace(); + + // Release the unused memory reservation on close. + operatorCtx_->pool()->release(); +} + vector_size_t Operator::outputBatchRows( std::optional averageRowSize) const { const auto& queryConfig = operatorCtx_->task()->queryCtx()->queryConfig(); diff --git a/velox/exec/Operator.h b/velox/exec/Operator.h index 20561b3e788d..2138fa47e3cc 100644 --- a/velox/exec/Operator.h +++ b/velox/exec/Operator.h @@ -21,7 +21,7 @@ #include "velox/core/PlanNode.h" #include "velox/exec/Driver.h" #include "velox/exec/JoinBridge.h" -#include "velox/exec/QueryDataWriter.h" +#include "velox/exec/OperatorTraceWriter.h" #include "velox/exec/Spiller.h" #include "velox/type/Filter.h" @@ -408,7 +408,6 @@ class Operator : public BaseRuntimeStatWriter { /// e.g. the first operator in the pipeline. virtual void noMoreInput() { noMoreInput_ = true; - finishTrace(); } /// Returns a RowVector with the result columns. Returns nullptr if @@ -483,13 +482,7 @@ class Operator : public BaseRuntimeStatWriter { /// Frees all resources associated with 'this'. No other methods /// should be called after this. - virtual void close() { - input_ = nullptr; - results_.clear(); - recordSpillStats(); - // Release the unused memory reservation on close. - operatorCtx_->pool()->release(); - } + virtual void close(); // Returns true if 'this' never has more output rows than input rows. virtual bool isFilter() const { @@ -781,7 +774,7 @@ class Operator : public BaseRuntimeStatWriter { folly::Synchronized stats_; folly::Synchronized spillStats_; - std::unique_ptr inputTracer_; + std::unique_ptr inputTracer_; /// Indicates if an operator is under a non-reclaimable execution section. /// This prevents the memory arbitrator from reclaiming memory from this diff --git a/velox/exec/OperatorTraceReader.cpp b/velox/exec/OperatorTraceReader.cpp new file mode 100644 index 000000000000..4477717b4042 --- /dev/null +++ b/velox/exec/OperatorTraceReader.cpp @@ -0,0 +1,78 @@ +/* + * Copyright (c) Facebook, Inc. and its affiliates. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include + +#include "velox/exec/OperatorTraceReader.h" + +#include "velox/exec/TraceUtil.h" + +namespace facebook::velox::exec::trace { + +OperatorTraceInputReader::OperatorTraceInputReader( + std::string traceDir, + RowTypePtr dataType, + memory::MemoryPool* pool) + : traceDir_(std::move(traceDir)), + fs_(filesystems::getFileSystem(traceDir_, nullptr)), + dataType_(std::move(dataType)), + pool_(pool), + inputStream_(getInputStream()) { + VELOX_CHECK_NOT_NULL(dataType_); + VELOX_CHECK_NOT_NULL(inputStream_); +} + +bool OperatorTraceInputReader::read(RowVectorPtr& batch) const { + if (inputStream_->atEnd()) { + batch = nullptr; + return false; + } + + VectorStreamGroup::read( + inputStream_.get(), pool_, dataType_, &batch, &readOptions_); + return true; +} + +std::unique_ptr +OperatorTraceInputReader::getInputStream() const { + auto traceFile = fs_->openFileForRead(getOpTraceInputFilePath(traceDir_)); + // TODO: Make the buffer size configurable. + return std::make_unique( + std::move(traceFile), 1 << 20, pool_); +} + +OperatorTraceSummaryReader::OperatorTraceSummaryReader( + std::string traceDir, + memory::MemoryPool* pool) + : traceDir_(std::move(traceDir)), + fs_(filesystems::getFileSystem(traceDir_, nullptr)), + pool_(pool), + summaryFile_(fs_->openFileForRead(getOpTraceSummaryFilePath(traceDir_))) { +} + +OperatorTraceSummary OperatorTraceSummaryReader::read() const { + VELOX_CHECK_NOT_NULL(summaryFile_); + const auto summaryStr = summaryFile_->pread(0, summaryFile_->size()); + VELOX_CHECK(!summaryStr.empty()); + + folly::dynamic summaryObj = folly::parseJson(summaryStr); + OperatorTraceSummary summary; + summary.opType = summaryObj[OperatorTraceTraits::kOpTypeKey].asString(); + summary.peakMemory = summaryObj[OperatorTraceTraits::kPeakMemoryKey].asInt(); + summary.inputRows = summaryObj[OperatorTraceTraits::kInputRowsKey].asInt(); + return summary; +} +} // namespace facebook::velox::exec::trace diff --git a/velox/exec/QueryDataReader.h b/velox/exec/OperatorTraceReader.h similarity index 62% rename from velox/exec/QueryDataReader.h rename to velox/exec/OperatorTraceReader.h index b5e6d24e011a..2236d73fe2f7 100644 --- a/velox/exec/QueryDataReader.h +++ b/velox/exec/OperatorTraceReader.h @@ -18,16 +18,16 @@ #include "velox/common/file/FileInputStream.h" #include "velox/common/file/FileSystems.h" -#include "velox/core/PlanNode.h" -#include "velox/core/QueryCtx.h" +#include "velox/exec/Trace.h" #include "velox/serializers/PrestoSerializer.h" -#include "velox/vector/VectorStream.h" namespace facebook::velox::exec::trace { -class QueryDataReader { +/// Used to read an operator trace input. +class OperatorTraceInputReader { public: - explicit QueryDataReader( + /// 'traceDir' specifies the operator trace directory. + OperatorTraceInputReader( std::string traceDir, RowTypePtr dataType, memory::MemoryPool* pool); @@ -37,16 +37,33 @@ class QueryDataReader { bool read(RowVectorPtr& batch) const; private: - std::unique_ptr getDataInputStream() const; + std::unique_ptr getInputStream() const; const std::string traceDir_; const serializer::presto::PrestoVectorSerde::PrestoOptions readOptions_{ true, common::CompressionKind_ZSTD, // TODO: Use trace config. - /*nullsFirst=*/true}; + /*_nullsFirst=*/true}; const std::shared_ptr fs_; const RowTypePtr dataType_; memory::MemoryPool* const pool_; - const std::unique_ptr dataStream_; + const std::unique_ptr inputStream_; +}; + +/// Used to read an operator trace summary. +class OperatorTraceSummaryReader { + public: + /// 'traceDir' specifies the operator trace directory. + OperatorTraceSummaryReader(std::string traceDir, memory::MemoryPool* pool); + + /// Read and return the operator trace 'summary'. The function throws if it + /// fails. + OperatorTraceSummary read() const; + + private: + const std::string traceDir_; + const std::shared_ptr fs_; + memory::MemoryPool* const pool_; + const std::unique_ptr summaryFile_; }; } // namespace facebook::velox::exec::trace diff --git a/velox/exec/QueryTraceScan.cpp b/velox/exec/OperatorTraceScan.cpp similarity index 62% rename from velox/exec/QueryTraceScan.cpp rename to velox/exec/OperatorTraceScan.cpp index 5cc4f0b5c223..a976367afa74 100644 --- a/velox/exec/QueryTraceScan.cpp +++ b/velox/exec/OperatorTraceScan.cpp @@ -14,33 +14,32 @@ * limitations under the License. */ -#include "velox/exec/QueryTraceScan.h" +#include "velox/exec/OperatorTraceScan.h" -#include "QueryTraceUtil.h" +#include "velox/exec/TraceUtil.h" namespace facebook::velox::exec::trace { -QueryTraceScan::QueryTraceScan( +OperatorTraceScan::OperatorTraceScan( int32_t operatorId, DriverCtx* driverCtx, - const std::shared_ptr& queryTraceScanNode) + const std::shared_ptr& traceScanNode) : SourceOperator( driverCtx, - queryTraceScanNode->outputType(), + traceScanNode->outputType(), operatorId, - queryTraceScanNode->id(), - "QueryReplayScan") { - const auto dataDir = getDataDir( - queryTraceScanNode->traceDir(), - driverCtx->pipelineId, - driverCtx->driverId); - traceReader_ = std::make_unique( - dataDir, - queryTraceScanNode->outputType(), + traceScanNode->id(), + "OperatorTraceScan") { + traceReader_ = std::make_unique( + getOpTraceDirectory( + traceScanNode->traceDir(), + traceScanNode->pipelineId(), + driverCtx->driverId), + traceScanNode->outputType(), memory::MemoryManager::getInstance()->tracePool()); } -RowVectorPtr QueryTraceScan::getOutput() { +RowVectorPtr OperatorTraceScan::getOutput() { RowVectorPtr batch; if (traceReader_->read(batch)) { return batch; @@ -49,7 +48,7 @@ RowVectorPtr QueryTraceScan::getOutput() { return nullptr; } -bool QueryTraceScan::isFinished() { +bool OperatorTraceScan::isFinished() { return finished_; } diff --git a/velox/exec/QueryTraceScan.h b/velox/exec/OperatorTraceScan.h similarity index 88% rename from velox/exec/QueryTraceScan.h rename to velox/exec/OperatorTraceScan.h index 8bff5bca486d..1542e43f2c41 100644 --- a/velox/exec/QueryTraceScan.h +++ b/velox/exec/OperatorTraceScan.h @@ -18,7 +18,7 @@ #include "velox/core/PlanNode.h" #include "velox/exec/Operator.h" -#include "velox/exec/QueryDataReader.h" +#include "velox/exec/OperatorTraceReader.h" namespace facebook::velox::exec::trace { /// This is a scan operator for query replay. It uses traced data from a @@ -38,13 +38,12 @@ namespace facebook::velox::exec::trace { /// It can be found from the QueryReplayScanNode. However the pipeline ID and /// driver ID are only known during operator creation, so we need to figure out /// the input traced data file and the output type dynamically. -class QueryTraceScan final : public SourceOperator { +class OperatorTraceScan final : public SourceOperator { public: - QueryTraceScan( + OperatorTraceScan( int32_t operatorId, DriverCtx* driverCtx, - const std::shared_ptr& - queryTraceScanNode); + const std::shared_ptr& traceScanNode); RowVectorPtr getOutput() override; @@ -55,7 +54,7 @@ class QueryTraceScan final : public SourceOperator { bool isFinished() override; private: - std::unique_ptr traceReader_; + std::unique_ptr traceReader_; bool finished_{false}; }; diff --git a/velox/exec/QueryDataWriter.cpp b/velox/exec/OperatorTraceWriter.cpp similarity index 59% rename from velox/exec/QueryDataWriter.cpp rename to velox/exec/OperatorTraceWriter.cpp index 7ab4e6d02ec0..75f2a5609c01 100644 --- a/velox/exec/QueryDataWriter.cpp +++ b/velox/exec/OperatorTraceWriter.cpp @@ -14,31 +14,32 @@ * limitations under the License. */ -#include "velox/exec/QueryDataWriter.h" +#include "velox/exec/OperatorTraceWriter.h" #include -#include "velox/common/base/SpillStats.h" #include "velox/common/file/File.h" #include "velox/common/file/FileSystems.h" -#include "velox/exec/QueryTraceTraits.h" -#include "velox/serializers/PrestoSerializer.h" +#include "velox/exec/Operator.h" +#include "velox/exec/Trace.h" +#include "velox/exec/TraceUtil.h" namespace facebook::velox::exec::trace { -QueryDataWriter::QueryDataWriter( - std::string path, +OperatorTraceWriter::OperatorTraceWriter( + Operator* traceOp, + std::string traceDir, memory::MemoryPool* pool, UpdateAndCheckTraceLimitCB updateAndCheckTraceLimitCB) - : dirPath_(std::move(path)), - fs_(filesystems::getFileSystem(dirPath_, nullptr)), + : traceOp_(traceOp), + traceDir_(std::move(traceDir)), + fs_(filesystems::getFileSystem(traceDir_, nullptr)), pool_(pool), updateAndCheckTraceLimitCB_(std::move(updateAndCheckTraceLimitCB)) { - dataFile_ = fs_->openFileForWrite( - fmt::format("{}/{}", dirPath_, QueryTraceTraits::kDataFileName)); - VELOX_CHECK_NOT_NULL(dataFile_); + traceFile_ = fs_->openFileForWrite(getOpTraceInputFilePath(traceDir_)); + VELOX_CHECK_NOT_NULL(traceFile_); } -void QueryDataWriter::write(const RowVectorPtr& rows) { +void OperatorTraceWriter::write(const RowVectorPtr& rows) { if (FOLLY_UNLIKELY(finished_)) { return; } @@ -61,37 +62,36 @@ void QueryDataWriter::write(const RowVectorPtr& rows) { batch_->flush(&out); batch_->clear(); auto iobuf = out.getIOBuf(); - if (FOLLY_UNLIKELY( - updateAndCheckTraceLimitCB_(iobuf->computeChainDataLength()))) { - finish(true); - return; - } - dataFile_->append(std::move(iobuf)); + updateAndCheckTraceLimitCB_(iobuf->computeChainDataLength()); + traceFile_->append(std::move(iobuf)); } -void QueryDataWriter::finish(bool limitExceeded) { +void OperatorTraceWriter::finish() { if (finished_) { return; } VELOX_CHECK_NOT_NULL( - dataFile_, "The query data writer has already been finished"); - dataFile_->close(); - dataFile_.reset(); + traceFile_, "The query data writer has already been finished"); + traceFile_->close(); + traceFile_.reset(); batch_.reset(); - writeSummary(limitExceeded); + writeSummary(); finished_ = true; } -void QueryDataWriter::writeSummary(bool limitExceeded) const { - const auto summaryFilePath = - fmt::format("{}/{}", dirPath_, QueryTraceTraits::kDataSummaryFileName); +void OperatorTraceWriter::writeSummary() const { + const auto summaryFilePath = getOpTraceSummaryFilePath(traceDir_); const auto file = fs_->openFileForWrite(summaryFilePath); folly::dynamic obj = folly::dynamic::object; if (dataType_ != nullptr) { - obj[QueryTraceTraits::kDataTypeKey] = dataType_->serialize(); + obj[TraceTraits::kDataTypeKey] = dataType_->serialize(); } - obj[QueryTraceTraits::kTraceLimitExceededKey] = limitExceeded; + obj[OperatorTraceTraits::kOpTypeKey] = traceOp_->operatorType(); + const auto stats = traceOp_->stats(/*clear=*/false); + obj[OperatorTraceTraits::kPeakMemoryKey] = + stats.memoryStats.peakTotalMemoryReservation; + obj[OperatorTraceTraits::kInputRowsKey] = stats.inputPositions; file->append(folly::toJson(obj)); file->close(); } diff --git a/velox/exec/QueryDataWriter.h b/velox/exec/OperatorTraceWriter.h similarity index 79% rename from velox/exec/QueryDataWriter.h rename to velox/exec/OperatorTraceWriter.h index 8e6073dcd3b7..9999f2c31dbd 100644 --- a/velox/exec/QueryDataWriter.h +++ b/velox/exec/OperatorTraceWriter.h @@ -16,21 +16,27 @@ #pragma once -#include "QueryTraceConfig.h" +#include "TraceConfig.h" #include "velox/common/file/File.h" #include "velox/common/file/FileSystems.h" -#include "velox/exec/QueryTraceTraits.h" #include "velox/serializers/PrestoSerializer.h" #include "velox/vector/VectorStream.h" +namespace facebook::velox::exec { +class Operator; +} + namespace facebook::velox::exec::trace { /// Used to serialize and write the input vectors from a given operator into a /// file. -class QueryDataWriter { +class OperatorTraceWriter { public: - explicit QueryDataWriter( - std::string path, + /// 'traceOp' is the operator to trace. 'traceDir' specifies the trace + /// directory for the operator. + explicit OperatorTraceWriter( + Operator* traceOp, + std::string traceDir, memory::MemoryPool* pool, UpdateAndCheckTraceLimitCB updateAndCheckTraceLimitCB); @@ -38,18 +44,16 @@ class QueryDataWriter { void write(const RowVectorPtr& rows); /// Closes the data file and writes out the data summary. - /// - /// @param limitExceeded A flag indicates the written data bytes exceed the - /// limit causing the 'QueryDataWriter' to finish early. - void finish(bool limitExceeded = false); + void finish(); private: // Flushes the trace data summaries to the disk. // // TODO: add more summaries such as number of rows etc. - void writeSummary(bool limitExceeded = false) const; + void writeSummary() const; - const std::string dirPath_; + Operator* const traceOp_; + const std::string traceDir_; // TODO: make 'useLosslessTimestamp' configuerable. const serializer::presto::PrestoVectorSerde::PrestoOptions options_ = { true, @@ -58,9 +62,11 @@ class QueryDataWriter { const std::shared_ptr fs_; memory::MemoryPool* const pool_; const UpdateAndCheckTraceLimitCB updateAndCheckTraceLimitCB_; - std::unique_ptr dataFile_; + + std::unique_ptr traceFile_; TypePtr dataType_; std::unique_ptr batch_; + bool limitExceeded_{false}; bool finished_{false}; }; diff --git a/velox/exec/PartitionedOutput.cpp b/velox/exec/PartitionedOutput.cpp index cabff0661146..6b49fe22efa8 100644 --- a/velox/exec/PartitionedOutput.cpp +++ b/velox/exec/PartitionedOutput.cpp @@ -233,8 +233,6 @@ void PartitionedOutput::estimateRowSizes() { } void PartitionedOutput::addInput(RowVectorPtr input) { - traceInput(input); - initializeInput(std::move(input)); initializeDestinations(); initializeSizeBuffers(); diff --git a/velox/exec/QueryDataReader.cpp b/velox/exec/QueryDataReader.cpp deleted file mode 100644 index ba08205dd8f2..000000000000 --- a/velox/exec/QueryDataReader.cpp +++ /dev/null @@ -1,59 +0,0 @@ -/* - * Copyright (c) Facebook, Inc. and its affiliates. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -#include - -#include "velox/exec/QueryDataReader.h" - -#include "velox/common/file/File.h" -#include "velox/exec/QueryTraceTraits.h" - -namespace facebook::velox::exec::trace { - -QueryDataReader::QueryDataReader( - std::string traceDir, - RowTypePtr dataType, - memory::MemoryPool* pool) - : traceDir_(std::move(traceDir)), - fs_(filesystems::getFileSystem(traceDir_, nullptr)), - dataType_(std::move(dataType)), - pool_(pool), - dataStream_(getDataInputStream()) { - VELOX_CHECK_NOT_NULL(dataType_); - VELOX_CHECK_NOT_NULL(dataStream_); -} - -bool QueryDataReader::read(RowVectorPtr& batch) const { - if (dataStream_->atEnd()) { - batch = nullptr; - return false; - } - - VectorStreamGroup::read( - dataStream_.get(), pool_, dataType_, &batch, &readOptions_); - return true; -} - -std::unique_ptr QueryDataReader::getDataInputStream() - const { - auto dataFile = fs_->openFileForRead( - fmt::format("{}/{}", traceDir_, QueryTraceTraits::kDataFileName)); - // TODO: Make the buffer size configurable. - return std::make_unique( - std::move(dataFile), 1 << 20, pool_); -} - -} // namespace facebook::velox::exec::trace diff --git a/velox/exec/QueryTraceUtil.cpp b/velox/exec/QueryTraceUtil.cpp deleted file mode 100644 index f3a339eec080..000000000000 --- a/velox/exec/QueryTraceUtil.cpp +++ /dev/null @@ -1,111 +0,0 @@ -/* - * Copyright (c) Facebook, Inc. and its affiliates. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -#include "velox/exec/QueryTraceUtil.h" - -#include - -#include "velox/common/base/Exceptions.h" -#include "velox/common/file/File.h" -#include "velox/common/file/FileSystems.h" -#include "velox/exec/QueryTraceTraits.h" - -namespace facebook::velox::exec::trace { - -void createTraceDirectory(const std::string& traceDir) { - try { - const auto fs = filesystems::getFileSystem(traceDir, nullptr); - if (fs->exists(traceDir)) { - fs->rmdir(traceDir); - } - fs->mkdir(traceDir); - } catch (const std::exception& e) { - VELOX_FAIL( - "Failed to create trace directory '{}' with error: {}", - traceDir, - e.what()); - } -} - -std::vector getTaskIds( - const std::string& traceDir, - const std::shared_ptr& fs) { - VELOX_USER_CHECK(fs->exists(traceDir), "{} dose not exist", traceDir); - try { - const auto taskDirs = fs->list(traceDir); - std::vector taskIds; - for (const auto& taskDir : taskDirs) { - std::vector pathNodes; - folly::split("/", taskDir, pathNodes); - taskIds.emplace_back(std::move(pathNodes.back())); - } - return taskIds; - } catch (const std::exception& e) { - VELOX_FAIL( - "Failed to list the directory '{}' with error: {}", traceDir, e.what()); - } -} - -folly::dynamic getMetadata( - const std::string& metadataFile, - const std::shared_ptr& fs) { - try { - const auto file = fs->openFileForRead(metadataFile); - VELOX_CHECK_NOT_NULL(file); - const auto metadata = file->pread(0, file->size()); - VELOX_USER_CHECK(!metadata.empty()); - return folly::parseJson(metadata); - } catch (const std::exception& e) { - VELOX_FAIL( - "Failed to get the query metadata from '{}' with error: {}", - metadataFile, - e.what()); - } -} - -RowTypePtr getDataType( - const core::PlanNodePtr& tracedPlan, - const std::string& tracedNodeId, - size_t sourceIndex) { - const auto* traceNode = core::PlanNode::findFirstNode( - tracedPlan.get(), [&tracedNodeId](const core::PlanNode* node) { - return node->id() == tracedNodeId; - }); - VELOX_CHECK_NOT_NULL( - traceNode, - "traced node id {} not found in the traced plan", - tracedNodeId); - return traceNode->sources().at(sourceIndex)->outputType(); -} - -uint8_t getNumDrivers( - const std::string& rootDir, - const std::string& taskId, - const std::string& nodeId, - int32_t pipelineId, - const std::shared_ptr& fs) { - const auto traceDir = - fmt::format("{}/{}/{}/{}", rootDir, taskId, nodeId, pipelineId); - const auto driverDirs = fs->list(traceDir); - return driverDirs.size(); -} - -std::string -getDataDir(const std::string& traceDir, int pipelineId, int driverId) { - return fmt::format("{}/{}/{}/data", traceDir, pipelineId, driverId); -} - -} // namespace facebook::velox::exec::trace diff --git a/velox/exec/SortWindowBuild.h b/velox/exec/SortWindowBuild.h index aa364846ba51..45e7b3390cb1 100644 --- a/velox/exec/SortWindowBuild.h +++ b/velox/exec/SortWindowBuild.h @@ -35,7 +35,7 @@ class SortWindowBuild : public WindowBuild { tsan_atomic* nonReclaimableSection, folly::Synchronized* spillStats); - ~SortWindowBuild() { + ~SortWindowBuild() override { pool_->release(); } diff --git a/velox/exec/TableWriter.cpp b/velox/exec/TableWriter.cpp index 3d289621f38a..4899e4b4ebf8 100644 --- a/velox/exec/TableWriter.cpp +++ b/velox/exec/TableWriter.cpp @@ -126,7 +126,7 @@ void TableWriter::addInput(RowVectorPtr input) { if (input->size() == 0) { return; } - traceInput(input); + std::vector mappedChildren; mappedChildren.reserve(inputMapping_.size()); for (const auto i : inputMapping_) { diff --git a/velox/exec/Task.cpp b/velox/exec/Task.cpp index 22e68d6c475d..0bce65f91d14 100644 --- a/velox/exec/Task.cpp +++ b/velox/exec/Task.cpp @@ -24,14 +24,14 @@ #include "velox/common/testutil/TestValue.h" #include "velox/common/time/Timer.h" #include "velox/exec/Exchange.h" -#include "velox/exec/HashBuild.h" +#include "velox/exec/HashJoinBridge.h" #include "velox/exec/LocalPlanner.h" #include "velox/exec/MemoryReclaimer.h" #include "velox/exec/NestedLoopJoinBuild.h" #include "velox/exec/OperatorUtils.h" #include "velox/exec/OutputBufferManager.h" -#include "velox/exec/QueryTraceUtil.h" #include "velox/exec/Task.h" +#include "velox/exec/TraceUtil.h" using facebook::velox::common::testutil::TestValue; @@ -304,7 +304,7 @@ Task::Task( dynamic_cast(queryCtx_->executor())); } - maybeInitQueryTrace(); + maybeInitTrace(); } Task::~Task() { @@ -2888,7 +2888,7 @@ std::shared_ptr Task::getExchangeClientLocked( return exchangeClients_[pipelineId]; } -std::optional Task::maybeMakeTraceConfig() const { +std::optional Task::maybeMakeTraceConfig() const { const auto& queryConfig = queryCtx_->queryConfig(); if (!queryConfig.queryTraceEnabled()) { return std::nullopt; @@ -2906,41 +2906,62 @@ std::optional Task::maybeMakeTraceConfig() const { return std::nullopt; } - const auto traceDir = - fmt::format("{}/{}", queryConfig.queryTraceDir(), taskId_); - const auto queryTraceNodes = queryConfig.queryTraceNodeIds(); - if (queryTraceNodes.empty()) { - LOG(INFO) << "Trace metadata for task: " << taskId_; - return trace::QueryTraceConfig(traceDir); + const auto traceNodes = queryConfig.queryTraceNodeIds(); + VELOX_USER_CHECK(!traceNodes.empty(), "Query trace nodes are not set"); + + const auto traceDir = trace::getTaskTraceDirectory( + queryConfig.queryTraceDir(), queryCtx_->queryId(), taskId_); + + std::vector traceNodeIds; + folly::split(',', traceNodes, traceNodeIds); + std::unordered_set traceNodeIdSet( + traceNodeIds.begin(), traceNodeIds.end()); + VELOX_USER_CHECK_EQ( + traceNodeIdSet.size(), + traceNodeIds.size(), + "Duplicate trace nodes found: {}", + folly::join(", ", traceNodeIds)); + + bool foundTraceNode{false}; + for (const auto& traceNodeId : traceNodeIds) { + if (core::PlanNode::findFirstNode( + planFragment_.planNode.get(), + [traceNodeId](const core::PlanNode* node) -> bool { + return node->id() == traceNodeId; + })) { + foundTraceNode = true; + break; + } } + VELOX_USER_CHECK( + foundTraceNode, + "Trace plan nodes not found from task {}: {}", + taskId_, + folly::join(",", traceNodeIdSet)); - std::vector nodes; - folly::split(',', queryTraceNodes, nodes); - std::unordered_set nodeSet(nodes.begin(), nodes.end()); - VELOX_CHECK_EQ(nodeSet.size(), nodes.size()); - LOG(INFO) << "Trace data for task " << taskId_ << " with plan nodes " - << queryTraceNodes; + LOG(INFO) << "Trace input for plan nodes " << traceNodes << " from task " + << taskId_; trace::UpdateAndCheckTraceLimitCB updateAndCheckTraceLimitCB = [this](uint64_t bytes) { - return queryCtx_->updateTracedBytesAndCheckLimit(bytes); + queryCtx_->updateTracedBytesAndCheckLimit(bytes); }; - return trace::QueryTraceConfig( - std::move(nodeSet), + return trace::TraceConfig( + std::move(traceNodeIdSet), traceDir, std::move(updateAndCheckTraceLimitCB), queryConfig.queryTraceTaskRegExp()); } -void Task::maybeInitQueryTrace() { +void Task::maybeInitTrace() { if (!traceConfig_) { return; } trace::createTraceDirectory(traceConfig_->queryTraceDir); - const auto queryMetadatWriter = std::make_unique( + const auto metadataWriter = std::make_unique( traceConfig_->queryTraceDir, memory::traceMemoryPool()); - queryMetadatWriter->write(queryCtx_, planFragment_.planNode); + metadataWriter->write(queryCtx_, planFragment_.planNode); } void Task::testingVisitDrivers(const std::function& callback) { diff --git a/velox/exec/Task.h b/velox/exec/Task.h index 526ab710400c..7f421952ce51 100644 --- a/velox/exec/Task.h +++ b/velox/exec/Task.h @@ -21,11 +21,11 @@ #include "velox/exec/LocalPartition.h" #include "velox/exec/MemoryReclaimer.h" #include "velox/exec/MergeSource.h" -#include "velox/exec/QueryMetadataWriter.h" -#include "velox/exec/QueryTraceConfig.h" #include "velox/exec/Split.h" #include "velox/exec/TaskStats.h" #include "velox/exec/TaskStructs.h" +#include "velox/exec/TaskTraceWriter.h" +#include "velox/exec/TraceConfig.h" #include "velox/vector/ComplexVector.h" namespace facebook::velox::exec { @@ -139,7 +139,7 @@ class Task : public std::enable_shared_from_this { } /// Returns query trace config if specified. - const std::optional& queryTraceConfig() const { + const std::optional& traceConfig() const { return traceConfig_; } @@ -980,11 +980,11 @@ class Task : public std::enable_shared_from_this { int32_t pipelineId) const; // Builds the query trace config. - std::optional maybeMakeTraceConfig() const; + std::optional maybeMakeTraceConfig() const; // Create a 'QueryMetadtaWriter' to trace the query metadata if the query // trace enabled. - void maybeInitQueryTrace(); + void maybeInitTrace(); // Universally unique identifier of the task. Used to identify the task when // calling TaskListener. @@ -1004,7 +1004,7 @@ class Task : public std::enable_shared_from_this { core::PlanFragment planFragment_; - const std::optional traceConfig_; + const std::optional traceConfig_; // Hook in the system wide task list. TaskListEntry taskListEntry_; diff --git a/velox/exec/QueryMetadataReader.cpp b/velox/exec/TaskTraceReader.cpp similarity index 73% rename from velox/exec/QueryMetadataReader.cpp rename to velox/exec/TaskTraceReader.cpp index f9fdc9ec2e80..6ed295190f06 100644 --- a/velox/exec/QueryMetadataReader.cpp +++ b/velox/exec/TaskTraceReader.cpp @@ -14,44 +14,40 @@ * limitations under the License. */ -#include "velox/exec/QueryMetadataReader.h" +#include "velox/exec/TaskTraceReader.h" -#include "velox/common/file/File.h" #include "velox/common/file/FileSystems.h" #include "velox/core/PlanNode.h" -#include "velox/exec/QueryTraceTraits.h" -#include "velox/exec/QueryTraceUtil.h" +#include "velox/exec/Trace.h" +#include "velox/exec/TraceUtil.h" namespace facebook::velox::exec::trace { -QueryMetadataReader::QueryMetadataReader( +TaskTraceMetadataReader::TaskTraceMetadataReader( std::string traceDir, memory::MemoryPool* pool) : traceDir_(std::move(traceDir)), fs_(filesystems::getFileSystem(traceDir_, nullptr)), - metaFilePath_(fmt::format( - "{}/{}", - traceDir_, - QueryTraceTraits::kQueryMetaFileName)), + traceFilePath_(getTaskTraceMetaFilePath(traceDir_)), pool_(pool) { VELOX_CHECK_NOT_NULL(fs_); - VELOX_CHECK(fs_->exists(metaFilePath_)); + VELOX_CHECK(fs_->exists(traceFilePath_)); } -void QueryMetadataReader::read( +void TaskTraceMetadataReader::read( std::unordered_map& queryConfigs, std::unordered_map< std::string, std::unordered_map>& connectorProperties, core::PlanNodePtr& queryPlan) const { - folly::dynamic metaObj = getMetadata(metaFilePath_, fs_); - const auto& queryConfigObj = metaObj[QueryTraceTraits::kQueryConfigKey]; + folly::dynamic metaObj = getTaskMetadata(traceFilePath_, fs_); + const auto& queryConfigObj = metaObj[TraceTraits::kQueryConfigKey]; for (const auto& [key, value] : queryConfigObj.items()) { queryConfigs[key.asString()] = value.asString(); } const auto& connectorPropertiesObj = - metaObj[QueryTraceTraits::kConnectorPropertiesKey]; + metaObj[TraceTraits::kConnectorPropertiesKey]; for (const auto& [connectorId, configs] : connectorPropertiesObj.items()) { const auto connectorIdStr = connectorId.asString(); connectorProperties[connectorIdStr] = {}; @@ -61,6 +57,6 @@ void QueryMetadataReader::read( } queryPlan = ISerializable::deserialize( - metaObj[QueryTraceTraits::kPlanNodeKey], pool_); + metaObj[TraceTraits::kPlanNodeKey], pool_); } } // namespace facebook::velox::exec::trace diff --git a/velox/exec/QueryMetadataReader.h b/velox/exec/TaskTraceReader.h similarity index 84% rename from velox/exec/QueryMetadataReader.h rename to velox/exec/TaskTraceReader.h index 71217653d4dc..df3abcffc3b6 100644 --- a/velox/exec/QueryMetadataReader.h +++ b/velox/exec/TaskTraceReader.h @@ -18,13 +18,11 @@ #include "velox/common/file/FileSystems.h" #include "velox/core/PlanNode.h" -#include "velox/core/QueryCtx.h" -#include "velox/vector/VectorStream.h" namespace facebook::velox::exec::trace { -class QueryMetadataReader { +class TaskTraceMetadataReader { public: - explicit QueryMetadataReader(std::string traceDir, memory::MemoryPool* pool); + TaskTraceMetadataReader(std::string traceDir, memory::MemoryPool* pool); void read( std::unordered_map& queryConfigs, @@ -36,7 +34,7 @@ class QueryMetadataReader { private: const std::string traceDir_; const std::shared_ptr fs_; - const std::string metaFilePath_; + const std::string traceFilePath_; memory::MemoryPool* const pool_; }; } // namespace facebook::velox::exec::trace diff --git a/velox/exec/QueryMetadataWriter.cpp b/velox/exec/TaskTraceWriter.cpp similarity index 75% rename from velox/exec/QueryMetadataWriter.cpp rename to velox/exec/TaskTraceWriter.cpp index a8bbe6633c66..cbf0413fd0b4 100644 --- a/velox/exec/QueryMetadataWriter.cpp +++ b/velox/exec/TaskTraceWriter.cpp @@ -14,30 +14,27 @@ * limitations under the License. */ -#include "velox/exec/QueryMetadataWriter.h" -#include "velox/common/config/Config.h" +#include "velox/exec/TaskTraceWriter.h" #include "velox/common/file/File.h" #include "velox/core/PlanNode.h" #include "velox/core/QueryCtx.h" -#include "velox/exec/QueryTraceTraits.h" +#include "velox/exec/Trace.h" +#include "velox/exec/TraceUtil.h" namespace facebook::velox::exec::trace { -QueryMetadataWriter::QueryMetadataWriter( +TaskTraceMetadataWriter::TaskTraceMetadataWriter( std::string traceDir, memory::MemoryPool* pool) : traceDir_(std::move(traceDir)), fs_(filesystems::getFileSystem(traceDir_, nullptr)), - metaFilePath_(fmt::format( - "{}/{}", - traceDir_, - QueryTraceTraits::kQueryMetaFileName)), + traceFilePath_(getTaskTraceMetaFilePath(traceDir_)), pool_(pool) { VELOX_CHECK_NOT_NULL(fs_); - VELOX_CHECK(!fs_->exists(metaFilePath_)); + VELOX_CHECK(!fs_->exists(traceFilePath_)); } -void QueryMetadataWriter::write( +void TaskTraceMetadataWriter::write( const std::shared_ptr& queryCtx, const core::PlanNodePtr& planNode) { VELOX_CHECK(!finished_, "Query metadata can only be written once"); @@ -59,12 +56,12 @@ void QueryMetadataWriter::write( } folly::dynamic metaObj = folly::dynamic::object; - metaObj[QueryTraceTraits::kQueryConfigKey] = queryConfigObj; - metaObj[QueryTraceTraits::kConnectorPropertiesKey] = connectorPropertiesObj; - metaObj[QueryTraceTraits::kPlanNodeKey] = planNode->serialize(); + metaObj[TraceTraits::kQueryConfigKey] = queryConfigObj; + metaObj[TraceTraits::kConnectorPropertiesKey] = connectorPropertiesObj; + metaObj[TraceTraits::kPlanNodeKey] = planNode->serialize(); const auto metaStr = folly::toJson(metaObj); - const auto file = fs_->openFileForWrite(metaFilePath_); + const auto file = fs_->openFileForWrite(traceFilePath_); file->append(metaStr); file->close(); } diff --git a/velox/exec/QueryMetadataWriter.h b/velox/exec/TaskTraceWriter.h similarity index 85% rename from velox/exec/QueryMetadataWriter.h rename to velox/exec/TaskTraceWriter.h index f2cc66123851..5b3105104dd6 100644 --- a/velox/exec/QueryMetadataWriter.h +++ b/velox/exec/TaskTraceWriter.h @@ -19,12 +19,11 @@ #include "velox/common/file/FileSystems.h" #include "velox/core/PlanNode.h" #include "velox/core/QueryCtx.h" -#include "velox/vector/VectorStream.h" namespace facebook::velox::exec::trace { -class QueryMetadataWriter { +class TaskTraceMetadataWriter { public: - explicit QueryMetadataWriter(std::string traceDir, memory::MemoryPool* pool); + TaskTraceMetadataWriter(std::string traceDir, memory::MemoryPool* pool); void write( const std::shared_ptr& queryCtx, @@ -33,7 +32,7 @@ class QueryMetadataWriter { private: const std::string traceDir_; const std::shared_ptr fs_; - const std::string metaFilePath_; + const std::string traceFilePath_; memory::MemoryPool* const pool_; bool finished_{false}; }; diff --git a/velox/exec/QueryTraceTraits.h b/velox/exec/Trace.cpp similarity index 50% rename from velox/exec/QueryTraceTraits.h rename to velox/exec/Trace.cpp index ad817115b490..b4d224fe4daf 100644 --- a/velox/exec/QueryTraceTraits.h +++ b/velox/exec/Trace.cpp @@ -14,22 +14,19 @@ * limitations under the License. */ -#pragma once +#include "velox/exec/Trace.h" -#include +#include + +#include "velox/common/base/SuccinctPrinter.h" namespace facebook::velox::exec::trace { -/// Defines the shared constants used by query trace implementation. -struct QueryTraceTraits { - static inline const std::string kPlanNodeKey = "planNode"; - static inline const std::string kQueryConfigKey = "queryConfig"; - static inline const std::string kDataTypeKey = "rowType"; - static inline const std::string kConnectorPropertiesKey = - "connectorProperties"; - static inline const std::string kTraceLimitExceededKey = "traceLimitExceeded"; - static inline const std::string kQueryMetaFileName = "query_meta.json"; - static inline const std::string kDataSummaryFileName = "data_summary.json"; - static inline const std::string kDataFileName = "trace.data"; -}; +std::string OperatorTraceSummary::toString() const { + return fmt::format( + "opType {}, inputRows {}, peakMemory {}", + opType, + inputRows, + succinctBytes(peakMemory)); +} } // namespace facebook::velox::exec::trace diff --git a/velox/exec/Trace.h b/velox/exec/Trace.h new file mode 100644 index 000000000000..6bf7c6103b84 --- /dev/null +++ b/velox/exec/Trace.h @@ -0,0 +1,60 @@ +/* + * Copyright (c) Facebook, Inc. and its affiliates. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#pragma once + +#include + +namespace facebook::velox::exec::trace { +/// Defines the shared constants used by query trace implementation. +struct TraceTraits { + static inline const std::string kPlanNodeKey = "planNode"; + static inline const std::string kQueryConfigKey = "queryConfig"; + static inline const std::string kDataTypeKey = "rowType"; + static inline const std::string kConnectorPropertiesKey = + "connectorProperties"; + + static inline const std::string kTaskMetaFileName = "task_trace_meta.json"; +}; + +struct OperatorTraceTraits { + static inline const std::string kSummaryFileName = "op_trace_summary.json"; + static inline const std::string kInputFileName = "op_input_trace.data"; + + /// Keys for operator trace summary file. + static inline const std::string kOpTypeKey = "opType"; + static inline const std::string kPeakMemoryKey = "peakMemory"; + static inline const std::string kInputRowsKey = "inputhRows"; +}; + +/// Contains the summary of an operator trace. +struct OperatorTraceSummary { + std::string opType; + uint64_t inputRows{0}; + uint64_t peakMemory{0}; + + std::string toString() const; +}; + +#define VELOX_TRACE_LIMIT_EXCEEDED(errorMessage) \ + _VELOX_THROW( \ + ::facebook::velox::VeloxRuntimeError, \ + ::facebook::velox::error_source::kErrorSourceRuntime.c_str(), \ + ::facebook::velox::error_code::kTraceLimitExceeded.c_str(), \ + /* isRetriable */ true, \ + "{}", \ + errorMessage); +} // namespace facebook::velox::exec::trace diff --git a/velox/exec/QueryTraceConfig.cpp b/velox/exec/TraceConfig.cpp similarity index 75% rename from velox/exec/QueryTraceConfig.cpp rename to velox/exec/TraceConfig.cpp index d43479d64608..ecb64aec3b74 100644 --- a/velox/exec/QueryTraceConfig.cpp +++ b/velox/exec/TraceConfig.cpp @@ -14,13 +14,15 @@ * limitations under the License. */ -#include "velox/exec/QueryTraceConfig.h" +#include "velox/exec/TraceConfig.h" #include +#include "velox/common/base/Exceptions.h" + namespace facebook::velox::exec::trace { -QueryTraceConfig::QueryTraceConfig( +TraceConfig::TraceConfig( std::unordered_set _queryNodeIds, std::string _queryTraceDir, UpdateAndCheckTraceLimitCB _updateAndCheckTraceLimitCB, @@ -28,13 +30,7 @@ QueryTraceConfig::QueryTraceConfig( : queryNodes(std::move(_queryNodeIds)), queryTraceDir(std::move(_queryTraceDir)), updateAndCheckTraceLimitCB(std::move(_updateAndCheckTraceLimitCB)), - taskRegExp(std::move(_taskRegExp)) {} - -QueryTraceConfig::QueryTraceConfig(std::string _queryTraceDir) - : QueryTraceConfig( - std::unordered_set{}, - std::move(_queryTraceDir), - [](uint64_t) { return false; }, - ".*") {} - + taskRegExp(std::move(_taskRegExp)) { + VELOX_CHECK(!queryNodes.empty(), "Query trace nodes cannot be empty"); +} } // namespace facebook::velox::exec::trace diff --git a/velox/exec/QueryTraceConfig.h b/velox/exec/TraceConfig.h similarity index 88% rename from velox/exec/QueryTraceConfig.h rename to velox/exec/TraceConfig.h index 200e5cb52ce4..084df3173e3c 100644 --- a/velox/exec/QueryTraceConfig.h +++ b/velox/exec/TraceConfig.h @@ -25,9 +25,9 @@ namespace facebook::velox::exec::trace { /// The callback used to update and aggregate the trace bytes of a query. If the /// query trace limit is set, the callback return true if the aggregate traced /// bytes exceed the set limit otherwise return false. -using UpdateAndCheckTraceLimitCB = std::function; +using UpdateAndCheckTraceLimitCB = std::function; -struct QueryTraceConfig { +struct TraceConfig { /// Target query trace nodes. std::unordered_set queryNodes; /// Base dir of query trace. @@ -36,14 +36,10 @@ struct QueryTraceConfig { /// The trace task regexp. std::string taskRegExp; - QueryTraceConfig( + TraceConfig( std::unordered_set _queryNodeIds, std::string _queryTraceDir, UpdateAndCheckTraceLimitCB _updateAndCheckTraceLimitCB, std::string _taskRegExp); - - QueryTraceConfig(std::string _queryTraceDir); - - QueryTraceConfig() = default; }; } // namespace facebook::velox::exec::trace diff --git a/velox/exec/TraceUtil.cpp b/velox/exec/TraceUtil.cpp new file mode 100644 index 000000000000..4df3b3f12e10 --- /dev/null +++ b/velox/exec/TraceUtil.cpp @@ -0,0 +1,182 @@ +/* + * Copyright (c) Facebook, Inc. and its affiliates. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include "velox/exec/TraceUtil.h" + +#include + +#include "velox/common/base/Exceptions.h" +#include "velox/common/file/File.h" +#include "velox/common/file/FileSystems.h" +#include "velox/exec/Trace.h" + +namespace facebook::velox::exec::trace { +namespace { +std::string findLastPathNode(const std::string& path) { + std::vector pathNodes; + folly::split("/", path, pathNodes); + while (!pathNodes.empty() && pathNodes.back().empty()) { + pathNodes.pop_back(); + } + VELOX_CHECK(!pathNodes.empty(), "No valid path nodes found from {}", path); + return pathNodes.back(); +} +} // namespace + +void createTraceDirectory(const std::string& traceDir) { + try { + const auto fs = filesystems::getFileSystem(traceDir, nullptr); + if (fs->exists(traceDir)) { + fs->rmdir(traceDir); + } + fs->mkdir(traceDir); + } catch (const std::exception& e) { + VELOX_FAIL( + "Failed to create trace directory '{}' with error: {}", + traceDir, + e.what()); + } +} + +std::string getQueryTraceDirectory( + const std::string& traceDir, + const std::string& queryId) { + return fmt::format("{}/{}", traceDir, queryId); +} + +std::string getTaskTraceDirectory( + const std::string& traceDir, + const Task& task) { + return getTaskTraceDirectory( + traceDir, task.queryCtx()->queryId(), task.taskId()); +} + +std::string getTaskTraceDirectory( + const std::string& traceDir, + const std::string& queryId, + const std::string& taskId) { + return fmt::format( + "{}/{}", getQueryTraceDirectory(traceDir, queryId), taskId); +} + +std::string getTaskTraceMetaFilePath(const std::string& taskTraceDir) { + return fmt::format("{}/{}", taskTraceDir, TraceTraits::kTaskMetaFileName); +} + +std::string getNodeTraceDirectory( + const std::string& taskTraceDir, + const std::string& nodeId) { + return fmt::format("{}/{}", taskTraceDir, nodeId); +} + +std::string getPipelineTraceDirectory( + const std::string& nodeTraceDir, + uint32_t pipelineId) { + return fmt::format("{}/{}", nodeTraceDir, pipelineId); +} + +std::string getOpTraceDirectory( + const std::string& taskTraceDir, + const std::string& nodeId, + uint32_t pipelineId, + uint32_t driverId) { + return getOpTraceDirectory( + getNodeTraceDirectory(taskTraceDir, nodeId), pipelineId, driverId); +} + +std::string getOpTraceDirectory( + const std::string& nodeTraceDir, + int pipelineId, + int driverId) { + return fmt::format("{}/{}/{}", nodeTraceDir, pipelineId, driverId); +} + +std::string getOpTraceInputFilePath(const std::string& opTraceDir) { + return fmt::format("{}/{}", opTraceDir, OperatorTraceTraits::kInputFileName); +} + +std::string getOpTraceSummaryFilePath(const std::string& opTraceDir) { + return fmt::format( + "{}/{}", opTraceDir, OperatorTraceTraits::kSummaryFileName); +} + +std::vector getTaskIds( + const std::string& traceDir, + const std::string& queryId, + const std::shared_ptr& fs) { + const auto queryTraceDir = getQueryTraceDirectory(traceDir, queryId); + VELOX_USER_CHECK( + fs->exists(queryTraceDir), "{} dose not exist", queryTraceDir); + const auto taskDirs = fs->list(queryTraceDir); + std::vector taskIds; + for (const auto& taskDir : taskDirs) { + taskIds.emplace_back(findLastPathNode(taskDir)); + } + return taskIds; +} + +folly::dynamic getTaskMetadata( + const std::string& taskMetaFilePath, + const std::shared_ptr& fs) { + try { + const auto file = fs->openFileForRead(taskMetaFilePath); + VELOX_CHECK_NOT_NULL(file); + const auto taskMeta = file->pread(0, file->size()); + VELOX_USER_CHECK(!taskMeta.empty()); + return folly::parseJson(taskMeta); + } catch (const std::exception& e) { + VELOX_FAIL( + "Failed to get the query metadata from '{}' with error: {}", + taskMetaFilePath, + e.what()); + } +} + +RowTypePtr getDataType( + const core::PlanNodePtr& tracedPlan, + const std::string& tracedNodeId, + size_t sourceIndex) { + const auto* traceNode = core::PlanNode::findFirstNode( + tracedPlan.get(), [&tracedNodeId](const core::PlanNode* node) { + return node->id() == tracedNodeId; + }); + VELOX_CHECK_NOT_NULL( + traceNode, + "traced node id {} not found in the traced plan", + tracedNodeId); + return traceNode->sources().at(sourceIndex)->outputType(); +} + +std::vector listDriverIds( + const std::string& nodeTraceDir, + uint32_t pipelineId, + const std::shared_ptr& fs) { + const auto pipelineDir = getPipelineTraceDirectory(nodeTraceDir, pipelineId); + const auto driverDirs = fs->list(pipelineDir); + std::vector driverIds; + for (const auto& driverDir : driverDirs) { + driverIds.emplace_back(folly::to(findLastPathNode(driverDir))); + } + return driverIds; +} + +size_t getNumDrivers( + const std::string& nodeTraceDir, + uint32_t pipelineId, + const std::shared_ptr& fs) { + return listDriverIds(nodeTraceDir, pipelineId, fs).size(); +} +} // namespace facebook::velox::exec::trace diff --git a/velox/exec/QueryTraceUtil.h b/velox/exec/TraceUtil.h similarity index 50% rename from velox/exec/QueryTraceUtil.h rename to velox/exec/TraceUtil.h index 633c0bc27b2a..7b226844b759 100644 --- a/velox/exec/QueryTraceUtil.h +++ b/velox/exec/TraceUtil.h @@ -20,6 +20,7 @@ #include #include "velox/common/file/FileSystems.h" #include "velox/core/PlanNode.h" +#include "velox/exec/Task.h" #include "velox/type/Type.h" #include @@ -29,6 +30,52 @@ namespace facebook::velox::exec::trace { /// Creates a directory to store the query trace metdata and data. void createTraceDirectory(const std::string& traceDir); +/// Returns the trace directory for a given query. +std::string getQueryTraceDirectory( + const std::string& traceDir, + const std::string& queryId); + +/// Returns the trace directory for a given query task. +std::string getTaskTraceDirectory( + const std::string& traceDir, + const Task& task); + +std::string getTaskTraceDirectory( + const std::string& traceDir, + const std::string& queryId, + const std::string& taskId); + +/// Returns the file path for a given task's metadata trace file. +std::string getTaskTraceMetaFilePath(const std::string& taskTraceDir); + +/// Returns the trace directory for a given traced plan node. +std::string getNodeTraceDirectory( + const std::string& taskTraceDir, + const std::string& nodeId); + +/// Returns the trace directory for a given traced pipeline. +std::string getPipelineTraceDirectory( + const std::string& nodeTraceDir, + uint32_t pipelineId); + +/// Returns the trace directory for a given traced operator. +std::string getOpTraceDirectory( + const std::string& taskTraceDir, + const std::string& nodeId, + uint32_t pipelineId, + uint32_t driverId); + +std::string getOpTraceDirectory( + const std::string& nodeTraceDir, + int pipelineId, + int driverId); + +/// Returns the file path for a given operator's traced input file. +std::string getOpTraceInputFilePath(const std::string& opTraceDir); + +/// Returns the file path for a given operator's traced input file. +std::string getOpTraceSummaryFilePath(const std::string& opTraceDir); + /// Extracts the input data type for the trace scan operator. The function first /// uses the traced node id to find traced operator's plan node from the traced /// plan fragment. Then it uses the specified source node index to find the @@ -48,28 +95,32 @@ RowTypePtr getDataType( const std::string& tracedNodeId, size_t sourceIndex = 0); +/// Extracts the driver ids by listing the sub-directors under the trace +/// directory for a given pipeline and decode the sub-directory names to get +/// driver id. 'nodeTraceDir' is the trace directory of the plan node. +std::vector listDriverIds( + const std::string& nodeTraceDir, + uint32_t pipelineId, + const std::shared_ptr& fs); + /// Extracts the number of drivers by listing the number of sub-directors under -/// the trace directory for a given pipeline. -uint8_t getNumDrivers( - const std::string& rootDir, - const std::string& taskId, - const std::string& nodeId, - int32_t pipelineId, +/// the trace directory for a given pipeline. 'nodeTraceDir' is the trace +/// directory of the plan node. +size_t getNumDrivers( + const std::string& nodeTraceDir, + uint32_t pipelineId, const std::shared_ptr& fs); -/// Extracts task ids of the query tracing by listing the trace directory. +/// Extracts task ids of the query tracing by listing the query trace directory. +/// 'traceDir' is the root trace directory. 'queryId' is the query id. std::vector getTaskIds( const std::string& traceDir, + const std::string& queryId, const std::shared_ptr& fs); /// Gets the metadata from a given task metadata file which includes query plan, /// configs and connector properties. -folly::dynamic getMetadata( - const std::string& metadataFile, +folly::dynamic getTaskMetadata( + const std::string& taskMetaFilePath, const std::shared_ptr& fs); - -/// Gets the traced data directory. 'traceaDir' is the trace directory for a -/// given plan node, which is $traceRoot/$taskId/$nodeId. -std::string -getDataDir(const std::string& traceDir, int pipelineId, int driverId); } // namespace facebook::velox::exec::trace diff --git a/velox/exec/tests/CMakeLists.txt b/velox/exec/tests/CMakeLists.txt index 8904fdc20648..9b228e302463 100644 --- a/velox/exec/tests/CMakeLists.txt +++ b/velox/exec/tests/CMakeLists.txt @@ -60,14 +60,15 @@ add_executable( MultiFragmentTest.cpp NestedLoopJoinTest.cpp OrderByTest.cpp + OperatorTraceTest.cpp OutputBufferManagerTest.cpp PartitionedOutputTest.cpp PlanNodeSerdeTest.cpp PlanNodeToStringTest.cpp PrefixSortTest.cpp PrintPlanWithStatsTest.cpp - QueryTraceTest.cpp ProbeOperatorStateTest.cpp + TraceUtilTest.cpp RoundRobinPartitionFunctionTest.cpp RowContainerTest.cpp RowNumberTest.cpp diff --git a/velox/exec/tests/QueryTraceTest.cpp b/velox/exec/tests/OperatorTraceTest.cpp similarity index 58% rename from velox/exec/tests/QueryTraceTest.cpp rename to velox/exec/tests/OperatorTraceTest.cpp index 71b49da483a0..36d9eb467d5e 100644 --- a/velox/exec/tests/QueryTraceTest.cpp +++ b/velox/exec/tests/OperatorTraceTest.cpp @@ -16,26 +16,25 @@ #include #include -#include #include +#include "velox/common/base/tests/GTestUtils.h" #include "velox/common/file/FileSystems.h" +#include "velox/exec/OperatorTraceReader.h" #include "velox/exec/PartitionFunction.h" -#include "velox/exec/QueryDataReader.h" -#include "velox/exec/QueryDataWriter.h" -#include "velox/exec/QueryMetadataReader.h" -#include "velox/exec/QueryMetadataWriter.h" -#include "velox/exec/QueryTraceUtil.h" -#include "velox/exec/tests/utils/ArbitratorTestUtil.h" +#include "velox/exec/TaskTraceReader.h" +#include "velox/exec/Trace.h" +#include "velox/exec/TraceUtil.h" +#include "velox/exec/tests/utils/AssertQueryBuilder.h" #include "velox/exec/tests/utils/HiveConnectorTestBase.h" +#include "velox/exec/tests/utils/PlanBuilder.h" #include "velox/exec/tests/utils/TempDirectoryPath.h" #include "velox/serializers/PrestoSerializer.h" -#include "velox/vector/tests/utils/VectorTestBase.h" using namespace facebook::velox::exec::test; namespace facebook::velox::exec::trace::test { -class QueryTracerTest : public HiveConnectorTestBase { +class OperatorTraceTest : public HiveConnectorTestBase { protected: static void SetUpTestCase() { memory::MemoryManager::testingSetInstance({}); @@ -55,6 +54,11 @@ class QueryTracerTest : public HiveConnectorTestBase { registerPartitionFunctionSerDe(); } + void SetUp() override { + HiveConnectorTestBase::SetUp(); + dataType_ = ROW({"a", "b", "c"}, {BIGINT(), BIGINT(), BIGINT()}); + } + static VectorFuzzer::Options getFuzzerOptions() { return VectorFuzzer::Options{ .vectorSize = 16, @@ -65,7 +69,7 @@ class QueryTracerTest : public HiveConnectorTestBase { }; } - QueryTracerTest() : vectorFuzzer_{getFuzzerOptions(), pool_.get()} { + OperatorTraceTest() : vectorFuzzer_{getFuzzerOptions(), pool_.get()} { filesystems::registerLocalFileSystem(); } @@ -99,32 +103,63 @@ class QueryTracerTest : public HiveConnectorTestBase { return true; } + std::unique_ptr driverCtx() { + return std::make_unique(nullptr, 0, 0, 0, 0); + } + + RowTypePtr dataType_; VectorFuzzer vectorFuzzer_; }; -TEST_F(QueryTracerTest, emptyTrace) { - const auto outputDir = TempDirectoryPath::create(); - auto writer = trace::QueryDataWriter( - outputDir->getPath(), pool(), [&](uint64_t bytes) { return false; }); - writer.finish(); - - const auto fs = filesystems::getFileSystem(outputDir->getPath(), nullptr); - const auto summaryFile = fs->openFileForRead(fmt::format( - "{}/{}", outputDir->getPath(), QueryTraceTraits::kDataSummaryFileName)); - const auto summary = summaryFile->pread(0, summaryFile->size()); - ASSERT_FALSE(summary.empty()); - folly::dynamic obj = folly::parseJson(summary); - ASSERT_EQ(obj[QueryTraceTraits::kTraceLimitExceededKey].asBool(), false); +TEST_F(OperatorTraceTest, emptyTrace) { + auto input = vectorFuzzer_.fuzzInputRow(dataType_); + input->childAt(0) = + makeFlatVector(input->size(), [](auto /*unused*/) { return 0; }); + createDuckDbTable({input}); + + std::string planNodeId; + auto traceDirPath = TempDirectoryPath::create(); + auto plan = PlanBuilder() + .values({input}) + .filter("a > 0") + .singleAggregation({"a"}, {"count(1)"}) + .capturePlanNodeId(planNodeId) + .planNode(); + + const auto task = + AssertQueryBuilder(duckDbQueryRunner_) + .plan(plan) + .config(core::QueryConfig::kQueryTraceEnabled, true) + .config(core::QueryConfig::kQueryTraceDir, traceDirPath->getPath()) + .config(core::QueryConfig::kQueryTraceMaxBytes, 100UL << 30) + .config(core::QueryConfig::kQueryTraceTaskRegExp, ".*") + .config(core::QueryConfig::kQueryTraceNodeIds, planNodeId) + .assertResults("SELECT a, count(1) FROM tmp WHERE a > 0 GROUP BY 1"); + + const auto taskTraceDir = + getTaskTraceDirectory(traceDirPath->getPath(), *task); + const auto opTraceDir = getOpTraceDirectory( + taskTraceDir, + planNodeId, + /*pipelineId=*/0, + /*driverId=*/0); + const auto summary = OperatorTraceSummaryReader(opTraceDir, pool()).read(); + ASSERT_EQ(summary.inputRows, 0); + ASSERT_EQ(summary.opType, "Aggregation"); + // The hash aggregation operator might allocate memory when prepare output + // buffer even though there is no output. We could optimize out this later if + // needs. + ASSERT_GT(summary.peakMemory, 0); } -TEST_F(QueryTracerTest, traceData) { - const auto rowType = ROW({"a", "b", "c"}, {BIGINT(), BIGINT(), BIGINT()}); +TEST_F(OperatorTraceTest, traceData) { std::vector inputVectors; constexpr auto numBatch = 5; inputVectors.reserve(numBatch); for (auto i = 0; i < numBatch; ++i) { - inputVectors.push_back(vectorFuzzer_.fuzzInputFlatRow(rowType)); + inputVectors.push_back(vectorFuzzer_.fuzzInputFlatRow(dataType_)); } + createDuckDbTable(inputVectors); struct { uint64_t maxTracedBytes; @@ -143,37 +178,62 @@ TEST_F(QueryTracerTest, traceData) { for (const auto& testData : testSettings) { SCOPED_TRACE(testData.debugString()); - const auto outputDir = TempDirectoryPath::create(); - // Ensure the writer only write one batch. - uint64_t numTracedBytes{0}; - auto writer = trace::QueryDataWriter( - outputDir->getPath(), pool(), [&](uint64_t bytes) { - numTracedBytes += bytes; - return numTracedBytes >= testData.maxTracedBytes; - }); - for (auto i = 0; i < numBatch; ++i) { - writer.write(inputVectors[i]); - } - writer.finish(); - - const auto fs = filesystems::getFileSystem(outputDir->getPath(), nullptr); - const auto summaryFile = fs->openFileForRead(fmt::format( - "{}/{}", outputDir->getPath(), QueryTraceTraits::kDataSummaryFileName)); - const auto summary = summaryFile->pread(0, summaryFile->size()); - ASSERT_FALSE(summary.empty()); - folly::dynamic obj = folly::parseJson(summary); - ASSERT_EQ( - obj[QueryTraceTraits::kTraceLimitExceededKey].asBool(), - testData.limitExceeded); - if (testData.maxTracedBytes == 0) { - const auto dataFile = fs->openFileForRead(fmt::format( - "{}/{}", outputDir->getPath(), QueryTraceTraits::kDataFileName)); - ASSERT_EQ(dataFile->size(), 0); + std::string planNodeId; + auto traceDirPath = TempDirectoryPath::create(); + auto plan = PlanBuilder() + .values(inputVectors) + .singleAggregation({"a"}, {"count(1)"}) + .capturePlanNodeId(planNodeId) + .planNode(); + + if (testData.limitExceeded) { + VELOX_ASSERT_THROW( + AssertQueryBuilder(duckDbQueryRunner_) + .plan(plan) + .config(core::QueryConfig::kQueryTraceEnabled, true) + .config( + core::QueryConfig::kQueryTraceDir, traceDirPath->getPath()) + .config( + core::QueryConfig::kQueryTraceMaxBytes, + testData.maxTracedBytes) + .config(core::QueryConfig::kQueryTraceTaskRegExp, ".*") + .config(core::QueryConfig::kQueryTraceNodeIds, planNodeId) + .assertResults("SELECT a, count(1) FROM tmp GROUP BY 1"), + "Query exceeded per-query local trace limit of"); continue; } - - const auto reader = QueryDataReader(outputDir->getPath(), rowType, pool()); + const auto task = + AssertQueryBuilder(duckDbQueryRunner_) + .plan(plan) + .config(core::QueryConfig::kQueryTraceEnabled, true) + .config(core::QueryConfig::kQueryTraceDir, traceDirPath->getPath()) + .config( + core::QueryConfig::kQueryTraceMaxBytes, testData.maxTracedBytes) + .config(core::QueryConfig::kQueryTraceTaskRegExp, ".*") + .config(core::QueryConfig::kQueryTraceNodeIds, planNodeId) + .assertResults("SELECT a, count(1) FROM tmp GROUP BY 1"); + + const auto fs = + filesystems::getFileSystem(traceDirPath->getPath(), nullptr); + const auto taskTraceDir = + getTaskTraceDirectory(traceDirPath->getPath(), *task); + const auto opTraceDir = getOpTraceDirectory( + taskTraceDir, + planNodeId, + /*pipelineId=*/0, + /*driverId=*/0); + const auto summaryFilePath = getOpTraceSummaryFilePath(opTraceDir); + const auto dataFilePath = getOpTraceInputFilePath(opTraceDir); + ASSERT_TRUE(fs->exists(summaryFilePath)); + ASSERT_TRUE(fs->exists(dataFilePath)); + + const auto summary = OperatorTraceSummaryReader(opTraceDir, pool()).read(); + ASSERT_EQ(summary.opType, "Aggregation"); + ASSERT_GT(summary.peakMemory, 0); + ASSERT_EQ(summary.inputRows, testData.numTracedBatches * 16); + + const auto reader = OperatorTraceInputReader(opTraceDir, dataType_, pool()); RowVectorPtr actual; size_t numOutputVectors{0}; while (reader.read(actual)) { @@ -189,7 +249,7 @@ TEST_F(QueryTracerTest, traceData) { } } -TEST_F(QueryTracerTest, traceMetadata) { +TEST_F(OperatorTraceTest, traceMetadata) { const auto rowType = ROW({"c0", "c1", "c2", "c3", "c4", "c5"}, {BIGINT(), SMALLINT(), TINYINT(), VARCHAR(), VARCHAR(), VARCHAR()}); @@ -234,14 +294,14 @@ TEST_F(QueryTracerTest, traceMetadata) { executor_.get(), core::QueryConfig(expectedQueryConfigs), expectedConnectorProperties); - auto writer = trace::QueryMetadataWriter(outputDir->getPath(), pool()); + auto writer = trace::TaskTraceMetadataWriter(outputDir->getPath(), pool()); writer.write(queryCtx, planNode); std::unordered_map acutalQueryConfigs; std::unordered_map> actualConnectorProperties; core::PlanNodePtr actualQueryPlan; - auto reader = trace::QueryMetadataReader(outputDir->getPath(), pool()); + auto reader = trace::TaskTraceMetadataReader(outputDir->getPath(), pool()); reader.read(acutalQueryConfigs, actualConnectorProperties, actualQueryPlan); ASSERT_TRUE(isSamePlan(actualQueryPlan, planNode)); @@ -262,7 +322,7 @@ TEST_F(QueryTracerTest, traceMetadata) { } } -TEST_F(QueryTracerTest, task) { +TEST_F(OperatorTraceTest, task) { const auto rowType = ROW({"c0", "c1", "c2", "c3", "c4", "c5"}, {BIGINT(), SMALLINT(), TINYINT(), VARCHAR(), VARCHAR(), VARCHAR()}); @@ -310,6 +370,8 @@ TEST_F(QueryTracerTest, task) { {core::QueryConfig::kSpillEnabled, "true"}, {core::QueryConfig::kSpillNumPartitionBits, "17"}, {core::QueryConfig::kQueryTraceEnabled, "true"}, + {core::QueryConfig::kQueryTraceMaxBytes, + std::to_string(100UL << 30)}, {core::QueryConfig::kQueryTraceDir, outputDir->getPath()}, {core::QueryConfig::kQueryTraceTaskRegExp, testData.taskRegExpr}, {core::QueryConfig::kQueryTraceNodeIds, "1,2"}, @@ -345,7 +407,8 @@ TEST_F(QueryTracerTest, task) { } ASSERT_EQ(actaulDirs.size(), testData.expectedNumDirs); ASSERT_EQ(actaulDirs.at(0), expectedDir); - const auto taskIds = getTaskIds(outputDir->getPath(), fs); + const auto taskIds = + getTaskIds(outputDir->getPath(), task->queryCtx()->queryId(), fs); ASSERT_EQ(taskIds.size(), testData.expectedNumDirs); ASSERT_EQ(taskIds.at(0), task->taskId()); @@ -354,7 +417,7 @@ TEST_F(QueryTracerTest, task) { unordered_map> actualConnectorProperties; core::PlanNodePtr actualQueryPlan; - auto reader = trace::QueryMetadataReader(expectedDir, pool()); + auto reader = trace::TaskTraceMetadataReader(expectedDir, pool()); reader.read(acutalQueryConfigs, actualConnectorProperties, actualQueryPlan); ASSERT_TRUE(isSamePlan(actualQueryPlan, planNode)); @@ -377,76 +440,93 @@ TEST_F(QueryTracerTest, task) { } } -TEST_F(QueryTracerTest, error) { +TEST_F(OperatorTraceTest, error) { const auto planNode = PlanBuilder().values({}).planNode(); - const auto expectedQueryConfigs = - std::unordered_map{ - {core::QueryConfig::kSpillEnabled, "true"}, - {core::QueryConfig::kSpillNumPartitionBits, "17"}, - {core::QueryConfig::kQueryTraceEnabled, "true"}, - }; - const auto queryCtx = core::QueryCtx::create( - executor_.get(), core::QueryConfig(expectedQueryConfigs)); - VELOX_ASSERT_USER_THROW( - AssertQueryBuilder(planNode).queryCtx(queryCtx).maxDrivers(1).copyResults( - pool()), - "Query trace enabled but the trace dir is not set"); -} - -TEST_F(QueryTracerTest, traceDir) { - const auto outputDir = TempDirectoryPath::create(); - const auto rootDir = outputDir->getPath(); - const auto fs = filesystems::getFileSystem(rootDir, nullptr); - auto dir1 = fmt::format("{}/{}", outputDir->getPath(), "t1"); - trace::createTraceDirectory(dir1); - ASSERT_TRUE(fs->exists(dir1)); - - auto dir2 = fmt::format("{}/{}", dir1, "t1_1"); - trace::createTraceDirectory(dir2); - ASSERT_TRUE(fs->exists(dir2)); - - // It will remove the old dir1 along with its subdir when created the dir1 - // again. - trace::createTraceDirectory(dir1); - ASSERT_TRUE(fs->exists(dir1)); - ASSERT_FALSE(fs->exists(dir2)); - - const auto parentDir = fmt::format("{}/{}", outputDir->getPath(), "p"); - fs->mkdir(parentDir); - - constexpr auto numThreads = 5; - std::vector traceThreads; - traceThreads.reserve(numThreads); - std::mutex mutex; - std::set expectedDirs; - for (int i = 0; i < numThreads; ++i) { - traceThreads.emplace_back([&, i]() { - const auto dir = fmt::format("{}/s{}", parentDir, i); - trace::createTraceDirectory(dir); - std::lock_guard l(mutex); - expectedDirs.insert(dir); - }); + // No trace dir. + { + const auto queryConfigs = std::unordered_map{ + {core::QueryConfig::kQueryTraceEnabled, "true"}}; + const auto queryCtx = core::QueryCtx::create( + executor_.get(), core::QueryConfig(queryConfigs)); + VELOX_ASSERT_USER_THROW( + AssertQueryBuilder(planNode) + .queryCtx(queryCtx) + .maxDrivers(1) + .copyResults(pool()), + "Query trace enabled but the trace dir is not set"); } - - for (auto& traceThread : traceThreads) { - traceThread.join(); + // Duplicate trace plan node ids. + { + const auto queryConfigs = std::unordered_map{ + {core::QueryConfig::kQueryTraceEnabled, "true"}, + {core::QueryConfig::kQueryTraceDir, "traceDir"}, + }; + const auto queryCtx = core::QueryCtx::create( + executor_.get(), core::QueryConfig(queryConfigs)); + VELOX_ASSERT_USER_THROW( + AssertQueryBuilder(planNode) + .queryCtx(queryCtx) + .maxDrivers(1) + .copyResults(pool()), + "Query trace enabled but the trace task regexp is not set"); } - - const auto actualDirs = fs->list(parentDir); - ASSERT_EQ(actualDirs.size(), numThreads); - ASSERT_EQ(actualDirs.size(), expectedDirs.size()); - for (const auto& dir : actualDirs) { - ASSERT_EQ(expectedDirs.count(dir), 1); + // No trace plan node ids. + { + const auto queryConfigs = std::unordered_map{ + {core::QueryConfig::kQueryTraceEnabled, "true"}, + {core::QueryConfig::kQueryTraceDir, "traceDir"}, + {core::QueryConfig::kQueryTraceTaskRegExp, ".*"}}; + const auto queryCtx = core::QueryCtx::create( + executor_.get(), core::QueryConfig(queryConfigs)); + VELOX_ASSERT_USER_THROW( + AssertQueryBuilder(planNode) + .queryCtx(queryCtx) + .maxDrivers(1) + .copyResults(pool()), + "Query trace nodes are not set"); + } + // Duplicate trace plan node ids. + { + const auto queryConfigs = std::unordered_map{ + {core::QueryConfig::kQueryTraceEnabled, "true"}, + {core::QueryConfig::kQueryTraceDir, "traceDir"}, + {core::QueryConfig::kQueryTraceTaskRegExp, ".*"}, + {core::QueryConfig::kQueryTraceNodeIds, "1,1"}, + }; + const auto queryCtx = core::QueryCtx::create( + executor_.get(), core::QueryConfig(queryConfigs)); + VELOX_ASSERT_USER_THROW( + AssertQueryBuilder(planNode) + .queryCtx(queryCtx) + .maxDrivers(1) + .copyResults(pool()), + "Duplicate trace nodes found: 1, 1"); + } + // Nonexist trace plan node id. + { + const auto queryConfigs = std::unordered_map{ + {core::QueryConfig::kQueryTraceEnabled, "true"}, + {core::QueryConfig::kQueryTraceDir, "traceDir"}, + {core::QueryConfig::kQueryTraceTaskRegExp, ".*"}, + {core::QueryConfig::kQueryTraceNodeIds, "nonexist"}, + }; + const auto queryCtx = core::QueryCtx::create( + executor_.get(), core::QueryConfig(queryConfigs)); + VELOX_ASSERT_USER_THROW( + AssertQueryBuilder(planNode) + .queryCtx(queryCtx) + .maxDrivers(1) + .copyResults(pool()), + "Trace plan nodes not found from task"); } } -TEST_F(QueryTracerTest, traceTableWriter) { - const auto rowType = ROW({"a", "b", "c"}, {BIGINT(), BIGINT(), BIGINT()}); +TEST_F(OperatorTraceTest, traceTableWriter) { std::vector inputVectors; constexpr auto numBatch = 5; inputVectors.reserve(numBatch); for (auto i = 0; i < numBatch; ++i) { - inputVectors.push_back(vectorFuzzer_.fuzzInputFlatRow(rowType)); + inputVectors.push_back(vectorFuzzer_.fuzzInputFlatRow(dataType_)); } struct { @@ -465,7 +545,7 @@ TEST_F(QueryTracerTest, traceTableWriter) { } } testSettings[]{ {".*", 10UL << 30, numBatch, false}, - {".*", 0, numBatch, false}, + {".*", 0, numBatch, true}, {"wrong id", 10UL << 30, 0, false}, {"test_cursor \\d+", 10UL << 30, numBatch, false}, {"test_cursor \\d+", 800, 2, true}}; @@ -481,6 +561,23 @@ TEST_F(QueryTracerTest, traceTableWriter) { const auto traceRoot = fmt::format("{}/{}", testDir->getPath(), "traceRoot"); std::shared_ptr task; + if (testData.limitExceeded) { + VELOX_ASSERT_THROW( + AssertQueryBuilder(planNode) + .maxDrivers(1) + .config(core::QueryConfig::kQueryTraceEnabled, true) + .config(core::QueryConfig::kQueryTraceDir, traceRoot) + .config( + core::QueryConfig::kQueryTraceMaxBytes, + testData.maxTracedBytes) + .config( + core::QueryConfig::kQueryTraceTaskRegExp, + testData.taskRegExpr) + .config(core::QueryConfig::kQueryTraceNodeIds, "1") + .copyResults(pool(), task), + "Query exceeded per-query local trace limit of"); + continue; + } AssertQueryBuilder(planNode) .maxDrivers(1) .config(core::QueryConfig::kQueryTraceEnabled, true) @@ -490,8 +587,8 @@ TEST_F(QueryTracerTest, traceTableWriter) { .config(core::QueryConfig::kQueryTraceNodeIds, "1") .copyResults(pool(), task); - const auto metadataDir = fmt::format("{}/{}", traceRoot, task->taskId()); - const auto fs = filesystems::getFileSystem(metadataDir, nullptr); + const auto taskTraceDir = getTaskTraceDirectory(traceRoot, *task); + const auto fs = filesystems::getFileSystem(taskTraceDir, nullptr); if (testData.taskRegExpr == "wrong id") { ASSERT_FALSE(fs->exists(traceRoot)); @@ -499,34 +596,16 @@ TEST_F(QueryTracerTest, traceTableWriter) { } // Query metadta file should exist. - const auto traceMetaFile = fmt::format( - "{}/{}/{}", - traceRoot, - task->taskId(), - trace::QueryTraceTraits::kQueryMetaFileName); - ASSERT_TRUE(fs->exists(traceMetaFile)); - - const auto dataDir = - fmt::format("{}/{}/{}", traceRoot, task->taskId(), "1/0/0/data"); - - // Query data tracing disabled. - if (testData.maxTracedBytes == 0) { - ASSERT_FALSE(fs->exists(dataDir)); - continue; - } + const auto traceMetaFilePath = getTaskTraceMetaFilePath(taskTraceDir); + ASSERT_TRUE(fs->exists(traceMetaFilePath)); - ASSERT_EQ(fs->list(dataDir).size(), 2); - // Check data summaries. - const auto summaryFile = fs->openFileForRead( - fmt::format("{}/{}", dataDir, QueryTraceTraits::kDataSummaryFileName)); - const auto summary = summaryFile->pread(0, summaryFile->size()); - ASSERT_FALSE(summary.empty()); - folly::dynamic obj = folly::parseJson(summary); - ASSERT_EQ( - obj[QueryTraceTraits::kTraceLimitExceededKey].asBool(), - testData.limitExceeded); + const auto opTraceDir = getOpTraceDirectory(taskTraceDir, "1", 0, 0); + + ASSERT_EQ(fs->list(opTraceDir).size(), 2); - const auto reader = trace::QueryDataReader(dataDir, rowType, pool()); + const auto summary = OperatorTraceSummaryReader(opTraceDir, pool()).read(); + const auto reader = + trace::OperatorTraceInputReader(opTraceDir, dataType_, pool()); RowVectorPtr actual; size_t numOutputVectors{0}; while (reader.read(actual)) { diff --git a/velox/exec/tests/TraceUtilTest.cpp b/velox/exec/tests/TraceUtilTest.cpp new file mode 100644 index 000000000000..5dbbdae535cb --- /dev/null +++ b/velox/exec/tests/TraceUtilTest.cpp @@ -0,0 +1,186 @@ +/* + * Copyright (c) Facebook, Inc. and its affiliates. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include +#include +#include + +#include "velox/common/base/tests/GTestUtils.h" +#include "velox/common/file/FileSystems.h" +#include "velox/exec/Trace.h" +#include "velox/exec/TraceUtil.h" +#include "velox/exec/tests/utils/TempDirectoryPath.h" + +using namespace facebook::velox::exec::test; + +namespace facebook::velox::exec::trace::test { +class TraceUtilTest : public testing::Test { + protected: + static void SetUpTestCase() { + filesystems::registerLocalFileSystem(); + } +}; + +TEST_F(TraceUtilTest, traceDir) { + const auto outputDir = TempDirectoryPath::create(); + const auto rootDir = outputDir->getPath(); + const auto fs = filesystems::getFileSystem(rootDir, nullptr); + auto dir1 = fmt::format("{}/{}", outputDir->getPath(), "t1"); + trace::createTraceDirectory(dir1); + ASSERT_TRUE(fs->exists(dir1)); + + auto dir2 = fmt::format("{}/{}", dir1, "t1_1"); + trace::createTraceDirectory(dir2); + ASSERT_TRUE(fs->exists(dir2)); + + // It will remove the old dir1 along with its subdir when created the dir1 + // again. + trace::createTraceDirectory(dir1); + ASSERT_TRUE(fs->exists(dir1)); + ASSERT_FALSE(fs->exists(dir2)); + + const auto parentDir = fmt::format("{}/{}", outputDir->getPath(), "p"); + fs->mkdir(parentDir); + + constexpr auto numThreads = 5; + std::vector traceThreads; + traceThreads.reserve(numThreads); + std::mutex mutex; + std::set expectedDirs; + for (int i = 0; i < numThreads; ++i) { + traceThreads.emplace_back([&, i]() { + const auto dir = fmt::format("{}/s{}", parentDir, i); + trace::createTraceDirectory(dir); + std::lock_guard l(mutex); + expectedDirs.insert(dir); + }); + } + + for (auto& traceThread : traceThreads) { + traceThread.join(); + } + + const auto actualDirs = fs->list(parentDir); + ASSERT_EQ(actualDirs.size(), numThreads); + ASSERT_EQ(actualDirs.size(), expectedDirs.size()); + for (const auto& dir : actualDirs) { + ASSERT_EQ(expectedDirs.count(dir), 1); + } +} + +TEST_F(TraceUtilTest, OperatorTraceSummary) { + exec::trace::OperatorTraceSummary summary; + summary.opType = "summary"; + summary.inputRows = 100; + summary.peakMemory = 200; + ASSERT_EQ( + summary.toString(), "opType summary, inputRows 100, peakMemory 200B"); +} + +TEST_F(TraceUtilTest, traceDirectoryLayoutUtilities) { + const std::string traceRoot = "/traceRoot"; + const std::string queryId = "queryId"; + ASSERT_EQ( + getQueryTraceDirectory(traceRoot, queryId), + fmt::format("{}/{}", traceRoot, queryId)); + const std::string taskId = "taskId"; + const std::string taskTraceDir = + getTaskTraceDirectory(traceRoot, queryId, taskId); + ASSERT_EQ(taskTraceDir, fmt::format("{}/{}/{}", traceRoot, queryId, taskId)); + ASSERT_EQ( + getTaskTraceMetaFilePath( + getTaskTraceDirectory(traceRoot, queryId, taskId)), + "/traceRoot/queryId/taskId/task_trace_meta.json"); + const std::string nodeId = "1"; + const std::string nodeTraceDir = getNodeTraceDirectory(taskTraceDir, nodeId); + ASSERT_EQ(nodeTraceDir, "/traceRoot/queryId/taskId/1"); + const uint32_t pipelineId = 1; + ASSERT_EQ( + getPipelineTraceDirectory(nodeTraceDir, pipelineId), + "/traceRoot/queryId/taskId/1/1"); + const uint32_t driverId = 1; + const std::string opTraceDir = + getOpTraceDirectory(taskTraceDir, nodeId, pipelineId, driverId); + ASSERT_EQ(opTraceDir, "/traceRoot/queryId/taskId/1/1/1"); + ASSERT_EQ( + getOpTraceDirectory(nodeTraceDir, pipelineId, driverId), + "/traceRoot/queryId/taskId/1/1/1"); + ASSERT_EQ( + getOpTraceInputFilePath(opTraceDir), + "/traceRoot/queryId/taskId/1/1/1/op_input_trace.data"); + ASSERT_EQ( + getOpTraceSummaryFilePath(opTraceDir), + "/traceRoot/queryId/taskId/1/1/1/op_trace_summary.json"); +} + +TEST_F(TraceUtilTest, getTaskIds) { + const auto rootDir = TempDirectoryPath::create(); + const auto rootPath = rootDir->getPath(); + const auto fs = filesystems::getFileSystem(rootPath, nullptr); + const std::string queryId = "queryId"; + fs->mkdir(trace::getQueryTraceDirectory(rootPath, queryId)); + ASSERT_TRUE(getTaskIds(rootPath, queryId, fs).empty()); + const std::string taskId1 = "task1"; + fs->mkdir(trace::getTaskTraceDirectory(rootPath, queryId, taskId1)); + const std::string taskId2 = "task2"; + fs->mkdir(trace::getTaskTraceDirectory(rootPath, queryId, taskId2)); + auto taskIds = getTaskIds(rootPath, queryId, fs); + ASSERT_EQ(taskIds.size(), 2); + std::set taskIdSet({taskId1, taskId2}); + ASSERT_EQ(*taskIds.begin(), taskId1); + ASSERT_EQ(*taskIds.rbegin(), taskId2); +} + +TEST_F(TraceUtilTest, getDriverIds) { + const auto rootDir = TempDirectoryPath::create(); + const auto rootPath = rootDir->getPath(); + const auto fs = filesystems::getFileSystem(rootPath, nullptr); + const std::string queryId = "queryId"; + fs->mkdir(trace::getQueryTraceDirectory(rootPath, queryId)); + ASSERT_TRUE(getTaskIds(rootPath, queryId, fs).empty()); + const std::string taskId = "task"; + const std::string taskTraceDir = + trace::getTaskTraceDirectory(rootPath, queryId, taskId); + fs->mkdir(taskTraceDir); + const std::string nodeId = "node"; + const std::string nodeTraceDir = + trace::getNodeTraceDirectory(taskTraceDir, nodeId); + fs->mkdir(nodeTraceDir); + const uint32_t pipelineId = 1; + fs->mkdir(trace::getPipelineTraceDirectory(nodeTraceDir, pipelineId)); + ASSERT_EQ(getNumDrivers(nodeTraceDir, pipelineId, fs), 0); + ASSERT_TRUE(listDriverIds(nodeTraceDir, pipelineId, fs).empty()); + // create 3 drivers. + const uint32_t driverId1 = 1; + fs->mkdir(trace::getOpTraceDirectory(nodeTraceDir, pipelineId, driverId1)); + const uint32_t driverId2 = 2; + fs->mkdir(trace::getOpTraceDirectory(nodeTraceDir, pipelineId, driverId2)); + const uint32_t driverId3 = 3; + fs->mkdir(trace::getOpTraceDirectory(nodeTraceDir, pipelineId, driverId3)); + ASSERT_EQ(getNumDrivers(nodeTraceDir, pipelineId, fs), 3); + auto driverIds = listDriverIds(nodeTraceDir, pipelineId, fs); + ASSERT_EQ(driverIds.size(), 3); + std::sort(driverIds.begin(), driverIds.end()); + ASSERT_EQ(driverIds[0], driverId1); + ASSERT_EQ(driverIds[1], driverId2); + ASSERT_EQ(driverIds[2], driverId3); + // Bad driver id. + const std::string BadDriverId = "badDriverId"; + fs->mkdir(fmt::format("{}/{}/{}", nodeTraceDir, pipelineId, BadDriverId)); + ASSERT_ANY_THROW(getNumDrivers(nodeTraceDir, pipelineId, fs)); + ASSERT_ANY_THROW(listDriverIds(nodeTraceDir, pipelineId, fs)); +} +} // namespace facebook::velox::exec::trace::test diff --git a/velox/exec/tests/utils/OperatorTestBase.h b/velox/exec/tests/utils/OperatorTestBase.h index 4e5def101cfa..079bfc2ff6b0 100644 --- a/velox/exec/tests/utils/OperatorTestBase.h +++ b/velox/exec/tests/utils/OperatorTestBase.h @@ -24,8 +24,6 @@ #include "velox/exec/HashProbe.h" #include "velox/exec/tests/utils/QueryAssertions.h" #include "velox/parse/ExpressionsParser.h" -#include "velox/type/Variant.h" -#include "velox/vector/FlatVector.h" #include "velox/vector/tests/utils/VectorMaker.h" #include "velox/vector/tests/utils/VectorTestBase.h" diff --git a/velox/exec/tests/utils/PlanBuilder.cpp b/velox/exec/tests/utils/PlanBuilder.cpp index 57d12d57bd13..5deed1140f4a 100644 --- a/velox/exec/tests/utils/PlanBuilder.cpp +++ b/velox/exec/tests/utils/PlanBuilder.cpp @@ -222,9 +222,10 @@ PlanBuilder& PlanBuilder::values( PlanBuilder& PlanBuilder::traceScan( const std::string& traceNodeDir, + uint32_t pipelineId, const RowTypePtr& outputType) { - planNode_ = std::make_shared( - nextPlanNodeId(), traceNodeDir, outputType); + planNode_ = std::make_shared( + nextPlanNodeId(), traceNodeDir, pipelineId, outputType); return *this; } diff --git a/velox/exec/tests/utils/PlanBuilder.h b/velox/exec/tests/utils/PlanBuilder.h index 341c25b06767..5af2e9f91ff7 100644 --- a/velox/exec/tests/utils/PlanBuilder.h +++ b/velox/exec/tests/utils/PlanBuilder.h @@ -322,9 +322,12 @@ class PlanBuilder { /// Adds a QueryReplayNode for query tracing. /// /// @param traceNodeDir The trace directory for a given plan node. + /// @param pipelineId The pipeline id for the traced operator instantiated + /// from the given plan node. /// @param outputType The type of the tracing data. PlanBuilder& traceScan( const std::string& traceNodeDir, + uint32_t pipelineId, const RowTypePtr& outputType); /// Add an ExchangeNode. diff --git a/velox/tool/trace/AggregationReplayer.cpp b/velox/tool/trace/AggregationReplayer.cpp index 54041bf8bbcb..1c397461a8a5 100644 --- a/velox/tool/trace/AggregationReplayer.cpp +++ b/velox/tool/trace/AggregationReplayer.cpp @@ -15,7 +15,7 @@ */ #include "velox/tool/trace/AggregationReplayer.h" -#include "velox/exec/QueryDataReader.h" +#include "velox/exec/OperatorTraceReader.h" #include "velox/exec/tests/utils/PlanBuilder.h" using namespace facebook::velox; diff --git a/velox/tool/trace/AggregationReplayer.h b/velox/tool/trace/AggregationReplayer.h index 21ce2c409acf..9a14688a694a 100644 --- a/velox/tool/trace/AggregationReplayer.h +++ b/velox/tool/trace/AggregationReplayer.h @@ -24,13 +24,15 @@ namespace facebook::velox::tool::trace { class AggregationReplayer : public OperatorReplayerBase { public: AggregationReplayer( - const std::string& rootDir, + const std::string& traceDir, + const std::string& queryId, const std::string& taskId, const std::string& nodeId, const int32_t pipelineId, const std::string& operatorType) : OperatorReplayerBase( - rootDir, + traceDir, + queryId, taskId, nodeId, pipelineId, diff --git a/velox/tool/trace/CMakeLists.txt b/velox/tool/trace/CMakeLists.txt index 3218b4ee30b0..8c68c9967db3 100644 --- a/velox/tool/trace/CMakeLists.txt +++ b/velox/tool/trace/CMakeLists.txt @@ -33,7 +33,7 @@ velox_link_libraries( glog::glog gflags::gflags) -add_executable(velox_query_replayer QueryReplayer.cpp) +add_executable(velox_query_replayer TraceReplayerMain.cpp) target_link_libraries( velox_query_replayer diff --git a/velox/tool/trace/OperatorReplayerBase.cpp b/velox/tool/trace/OperatorReplayerBase.cpp index b54ad323a447..2083e676f678 100644 --- a/velox/tool/trace/OperatorReplayerBase.cpp +++ b/velox/tool/trace/OperatorReplayerBase.cpp @@ -16,11 +16,9 @@ #include -#include "velox/common/serialization/Serializable.h" #include "velox/core/PlanNode.h" -#include "velox/exec/QueryMetadataReader.h" -#include "velox/exec/QueryTraceTraits.h" -#include "velox/exec/QueryTraceUtil.h" +#include "velox/exec/TaskTraceReader.h" +#include "velox/exec/TraceUtil.h" #include "velox/exec/tests/utils/AssertQueryBuilder.h" #include "velox/exec/tests/utils/PlanBuilder.h" #include "velox/tool/trace/OperatorReplayerBase.h" @@ -29,30 +27,31 @@ using namespace facebook::velox; namespace facebook::velox::tool::trace { OperatorReplayerBase::OperatorReplayerBase( - std::string rootDir, + std::string traceDir, + std::string queryId, std::string taskId, std::string nodeId, int32_t pipelineId, std::string operatorType) - : taskId_(std::move(taskId)), + : queryId_(std::string(queryId)), + taskId_(std::move(taskId)), nodeId_(std::move(nodeId)), pipelineId_(pipelineId), operatorType_(std::move(operatorType)), - rootDir_(std::move(rootDir)), - taskDir_(fmt::format("{}/{}", rootDir_, taskId_)), - nodeDir_(fmt::format("{}/{}", taskDir_, nodeId_)) { - VELOX_USER_CHECK(!rootDir_.empty()); + taskTraceDir_( + exec::trace::getTaskTraceDirectory(traceDir, queryId_, taskId_)), + nodeTraceDir_(exec::trace::getNodeTraceDirectory(taskTraceDir_, nodeId_)), + fs_(filesystems::getFileSystem(taskTraceDir_, nullptr)), + maxDrivers_(exec::trace::getNumDrivers(nodeTraceDir_, pipelineId_, fs_)) { + VELOX_USER_CHECK(!taskTraceDir_.empty()); VELOX_USER_CHECK(!taskId_.empty()); VELOX_USER_CHECK(!nodeId_.empty()); VELOX_USER_CHECK_GE(pipelineId_, 0); VELOX_USER_CHECK(!operatorType_.empty()); - const auto metadataReader = exec::trace::QueryMetadataReader( - taskDir_, memory::MemoryManager::getInstance()->tracePool()); - metadataReader.read(queryConfigs_, connectorConfigs_, planFragment_); + const auto taskMetaReader = exec::trace::TaskTraceMetadataReader( + taskTraceDir_, memory::MemoryManager::getInstance()->tracePool()); + taskMetaReader.read(queryConfigs_, connectorConfigs_, planFragment_); queryConfigs_[core::QueryConfig::kQueryTraceEnabled] = "false"; - fs_ = filesystems::getFileSystem(rootDir_, nullptr); - maxDrivers_ = - exec::trace::getNumDrivers(rootDir_, taskId_, nodeId_, pipelineId_, fs_); } RowVectorPtr OperatorReplayerBase::run() { @@ -69,7 +68,10 @@ core::PlanNodePtr OperatorReplayerBase::createPlan() const { planFragment_.get(), [this](const core::PlanNode* node) { return node->id() == nodeId_; }); return exec::test::PlanBuilder() - .traceScan(nodeDir_, exec::trace::getDataType(planFragment_, nodeId_)) + .traceScan( + nodeTraceDir_, + pipelineId_, + exec::trace::getDataType(planFragment_, nodeId_)) .addNode(replayNodeFactory(replayNode)) .planNode(); } diff --git a/velox/tool/trace/OperatorReplayerBase.h b/velox/tool/trace/OperatorReplayerBase.h index 863a8f5f80f6..50e31fdaf02f 100644 --- a/velox/tool/trace/OperatorReplayerBase.h +++ b/velox/tool/trace/OperatorReplayerBase.h @@ -27,7 +27,8 @@ namespace facebook::velox::tool::trace { class OperatorReplayerBase { public: OperatorReplayerBase( - std::string rootDir, + std::string traceDir, + std::string queryId, std::string taskId, std::string nodeId, int32_t pipelineId, @@ -50,21 +51,21 @@ class OperatorReplayerBase { core::PlanNodePtr createPlan() const; + const std::string queryId_; const std::string taskId_; const std::string nodeId_; const int32_t pipelineId_; const std::string operatorType_; - const std::string rootDir_; - const std::string taskDir_; - const std::string nodeDir_; + const std::string taskTraceDir_; + const std::string nodeTraceDir_; + const std::shared_ptr fs_; + const int32_t maxDrivers_; std::unordered_map queryConfigs_; std::unordered_map> connectorConfigs_; core::PlanNodePtr planFragment_; - std::shared_ptr fs_; - int32_t maxDrivers_{1}; private: std::function diff --git a/velox/tool/trace/PartitionedOutputReplayer.cpp b/velox/tool/trace/PartitionedOutputReplayer.cpp index 79874caefc93..56b4c0a145eb 100644 --- a/velox/tool/trace/PartitionedOutputReplayer.cpp +++ b/velox/tool/trace/PartitionedOutputReplayer.cpp @@ -18,7 +18,7 @@ #include "velox/common/memory/Memory.h" #include "velox/exec/PartitionedOutput.h" -#include "velox/exec/QueryTraceUtil.h" +#include "velox/exec/TraceUtil.h" #include "velox/exec/tests/utils/AssertQueryBuilder.h" #include "velox/exec/tests/utils/PlanBuilder.h" #include "velox/tool/trace/PartitionedOutputReplayer.h" @@ -105,13 +105,20 @@ void consumeAllData( } PartitionedOutputReplayer::PartitionedOutputReplayer( - const std::string& rootDir, + const std::string& traceDir, + const std::string& queryId, const std::string& taskId, const std::string& nodeId, const int32_t pipelineId, const std::string& operatorType, const ConsumerCallBack& consumerCb) - : OperatorReplayerBase(rootDir, taskId, nodeId, pipelineId, operatorType), + : OperatorReplayerBase( + traceDir, + queryId, + taskId, + nodeId, + pipelineId, + operatorType), originalNode_(dynamic_cast( core::PlanNode::findFirstNode( planFragment_.get(), diff --git a/velox/tool/trace/PartitionedOutputReplayer.h b/velox/tool/trace/PartitionedOutputReplayer.h index 61610aa031cc..a90c222dbf65 100644 --- a/velox/tool/trace/PartitionedOutputReplayer.h +++ b/velox/tool/trace/PartitionedOutputReplayer.h @@ -41,7 +41,8 @@ class PartitionedOutputReplayer final : public OperatorReplayerBase { std::function)>; PartitionedOutputReplayer( - const std::string& rootDir, + const std::string& traceDir, + const std::string& queryId, const std::string& taskId, const std::string& nodeId, const int32_t pipelineId, diff --git a/velox/tool/trace/TableWriterReplayer.cpp b/velox/tool/trace/TableWriterReplayer.cpp index 3543011aed9f..405071da433e 100644 --- a/velox/tool/trace/TableWriterReplayer.cpp +++ b/velox/tool/trace/TableWriterReplayer.cpp @@ -16,8 +16,8 @@ #include -#include "velox/exec/QueryTraceUtil.h" #include "velox/exec/TableWriter.h" +#include "velox/exec/TraceUtil.h" #include "velox/exec/tests/utils/PlanBuilder.h" #include "velox/tool/trace/TableWriterReplayer.h" diff --git a/velox/tool/trace/TableWriterReplayer.h b/velox/tool/trace/TableWriterReplayer.h index c0c675ba6333..8684be91478d 100644 --- a/velox/tool/trace/TableWriterReplayer.h +++ b/velox/tool/trace/TableWriterReplayer.h @@ -27,13 +27,20 @@ namespace facebook::velox::tool::trace { class TableWriterReplayer final : public OperatorReplayerBase { public: TableWriterReplayer( - const std::string& rootDir, + const std::string& traceDir, + const std::string& queryId, const std::string& taskId, const std::string& nodeId, const int32_t pipelineId, const std::string& operatorType, const std::string& replayOutputDir) - : OperatorReplayerBase(rootDir, taskId, nodeId, pipelineId, operatorType), + : OperatorReplayerBase( + traceDir, + queryId, + taskId, + nodeId, + pipelineId, + operatorType), replayOutputDir_(replayOutputDir) { VELOX_CHECK(!replayOutputDir_.empty()); } diff --git a/velox/tool/trace/QueryReplayer.cpp b/velox/tool/trace/TraceReplayRunner.cpp similarity index 57% rename from velox/tool/trace/QueryReplayer.cpp rename to velox/tool/trace/TraceReplayRunner.cpp index a0680d30a387..b2cd0d4a0be3 100644 --- a/velox/tool/trace/QueryReplayer.cpp +++ b/velox/tool/trace/TraceReplayRunner.cpp @@ -14,7 +14,8 @@ * limitations under the License. */ -#include +#include "velox/tool/trace/TraceReplayRunner.h" + #include #include "velox/common/file/FileSystems.h" @@ -32,12 +33,12 @@ #include "velox/dwio/dwrf/RegisterDwrfWriter.h" #include "velox/dwio/parquet/RegisterParquetReader.h" #include "velox/dwio/parquet/RegisterParquetWriter.h" +#include "velox/exec/OperatorTraceReader.h" #include "velox/exec/PartitionFunction.h" -#include "velox/exec/QueryTraceUtil.h" -#include "velox/expression/Expr.h" +#include "velox/exec/TaskTraceReader.h" +#include "velox/exec/TraceUtil.h" #include "velox/functions/prestosql/aggregates/RegisterAggregateFunctions.h" #include "velox/functions/prestosql/registration/RegistrationFunctions.h" -#include "velox/parse/ExpressionsParser.h" #include "velox/parse/TypeResolver.h" #include "velox/tool/trace/AggregationReplayer.h" #include "velox/tool/trace/OperatorReplayerBase.h" @@ -57,6 +58,7 @@ DEFINE_bool( "It also print the query metadata including query configs, connectors " "properties, and query plan in JSON format."); DEFINE_bool(short_summary, false, "Only show number of tasks and task ids"); +DEFINE_string(query_id, "", "Specify the target query id which must be set"); DEFINE_string( task_id, "", @@ -74,61 +76,18 @@ DEFINE_double( 2.0, "Hardware multipler for hive connector."); -using namespace facebook::velox; - +namespace facebook::velox::tool::trace { namespace { -void init() { - memory::initializeMemoryManager({}); - filesystems::registerLocalFileSystem(); - filesystems::registerS3FileSystem(); - filesystems::registerHdfsFileSystem(); - filesystems::registerGCSFileSystem(); - filesystems::abfs::registerAbfsFileSystem(); - - dwio::common::registerFileSinks(); - dwrf::registerDwrfReaderFactory(); - dwrf::registerDwrfWriterFactory(); - parquet::registerParquetReaderFactory(); - parquet::registerParquetWriterFactory(); - - core::PlanNode::registerSerDe(); - core::ITypedExpr::registerSerDe(); - common::Filter::registerSerDe(); - Type::registerSerDe(); - exec::registerPartitionFunctionSerDe(); - if (!isRegisteredVectorSerde()) { - serializer::presto::PrestoVectorSerde::registerVectorSerde(); - } - connector::hive::HiveTableHandle::registerSerDe(); - connector::hive::LocationHandle::registerSerDe(); - connector::hive::HiveColumnHandle::registerSerDe(); - connector::hive::HiveInsertTableHandle::registerSerDe(); - connector::hive::HiveConnectorSplit::registerSerDe(); - - functions::prestosql::registerAllScalarFunctions(); - aggregate::prestosql::registerAllAggregateFunctions(); - parse::registerTypeResolver(); - - // TODO: make it configurable. - const auto ioExecutor = std::make_unique( - std::thread::hardware_concurrency() * - FLAGS_hiveConnectorExecutorHwMultiplier); - connector::registerConnectorFactory( - std::make_shared()); - const auto hiveConnector = - connector::getConnectorFactory("hive")->newConnector( - "test-hive", - std::make_shared( - std::unordered_map()), - ioExecutor.get()); - connector::registerConnector(hiveConnector); -} std::unique_ptr createReplayer() { std::unique_ptr replayer; if (FLAGS_operator_type == "TableWriter") { + VELOX_USER_CHECK( + !FLAGS_table_writer_output_dir.empty(), + "--table_writer_output_dir is required"); replayer = std::make_unique( FLAGS_root_dir, + FLAGS_query_id, FLAGS_task_id, FLAGS_node_id, FLAGS_pipeline_id, @@ -137,6 +96,7 @@ std::unique_ptr createReplayer() { } else if (FLAGS_operator_type == "Aggregation") { replayer = std::make_unique( FLAGS_root_dir, + FLAGS_query_id, FLAGS_task_id, FLAGS_node_id, FLAGS_pipeline_id, @@ -144,6 +104,7 @@ std::unique_ptr createReplayer() { } else if (FLAGS_operator_type == "PartitionedOutput") { replayer = std::make_unique( FLAGS_root_dir, + FLAGS_query_id, FLAGS_task_id, FLAGS_node_id, FLAGS_pipeline_id, @@ -155,76 +116,175 @@ std::unique_ptr createReplayer() { return replayer; } +void printTaskMetadata( + const std::string& taskTraceDir, + memory::MemoryPool* pool, + std::ostringstream& oss) { + auto taskMetaReader = std::make_unique( + taskTraceDir, pool); + std::unordered_map queryConfigs; + std::unordered_map> + connectorProperties; + core::PlanNodePtr queryPlan; + taskMetaReader->read(queryConfigs, connectorProperties, queryPlan); + + oss << "\n++++++Query configs++++++\n"; + for (const auto& queryConfigEntry : queryConfigs) { + oss << "\t" << queryConfigEntry.first << ": " << queryConfigEntry.second + << "\n"; + } + oss << "\n++++++Connector configs++++++\n"; + for (const auto& connectorPropertyEntry : connectorProperties) { + oss << connectorPropertyEntry.first << "\n"; + for (const auto& propertyEntry : connectorPropertyEntry.second) { + oss << "\t" << propertyEntry.first << ": " << propertyEntry.second + << "\n"; + } + } + oss << "\n++++++Task query plan++++++\n"; + oss << queryPlan->toString(true, true); +} + +void printTaskTraceSummary( + const std::string& traceDir, + const std::string& queryId, + const std::string& taskId, + const std::string& nodeId, + uint32_t pipelineId, + memory::MemoryPool* pool, + std::ostringstream& oss) { + auto fs = filesystems::getFileSystem(traceDir, nullptr); + const auto taskTraceDir = + exec::trace::getTaskTraceDirectory(traceDir, queryId, taskId); + + const std::vector driverIds = exec::trace::listDriverIds( + exec::trace::getNodeTraceDirectory(taskTraceDir, nodeId), pipelineId, fs); + oss << "\n++++++Task " << taskId << "++++++\n"; + for (const auto& driverId : driverIds) { + const auto opTraceDir = exec::trace::getOpTraceDirectory( + taskTraceDir, nodeId, pipelineId, driverId); + const auto opTraceSummary = + exec::trace::OperatorTraceSummaryReader( + exec::trace::getOpTraceDirectory( + taskTraceDir, nodeId, pipelineId, driverId), + pool) + .read(); + oss << driverId << " driver, " << opTraceSummary.toString() << "\n"; + } +} + void printSummary( const std::string& rootDir, + const std::string& queryId, const std::string& taskId, - bool shortSummary) { - const auto fs = filesystems::getFileSystem(rootDir, nullptr); - const auto taskIds = exec::trace::getTaskIds(rootDir, fs); - if (taskIds.empty()) { - LOG(ERROR) << "No traced query task under " << rootDir; - return; - } + bool shortSummary, + memory::MemoryPool* pool) { + const std::string queryDir = + exec::trace::getQueryTraceDirectory(rootDir, queryId); + const auto fs = filesystems::getFileSystem(queryDir, nullptr); + const auto taskIds = exec::trace::getTaskIds(rootDir, queryId, fs); + VELOX_USER_CHECK(!taskIds.empty(), "No task found under {}", rootDir); std::ostringstream summary; summary << "\n++++++Query trace summary++++++\n"; summary << "Number of tasks: " << taskIds.size() << "\n"; - summary << "Task ids: " << folly::join(",", taskIds); - if (shortSummary) { + summary << "Task ids: " << folly::join("\n", taskIds); LOG(INFO) << summary.str(); return; } const auto summaryTaskIds = taskId.empty() ? taskIds : std::vector{taskId}; + printTaskMetadata( + exec::trace::getTaskTraceDirectory(rootDir, queryId, summaryTaskIds[0]), + pool, + summary); + summary << "\n++++++Task Summaries++++++\n"; for (const auto& taskId : summaryTaskIds) { - summary << "\n++++++Query configs and plan of task " << taskId - << ":++++++\n"; - const auto traceTaskDir = fmt::format("{}/{}", rootDir, taskId); - const auto queryMetaFile = fmt::format( - "{}/{}", - traceTaskDir, - exec::trace::QueryTraceTraits::kQueryMetaFileName); - const auto metaObj = exec::trace::getMetadata(queryMetaFile, fs); - const auto& configObj = - metaObj[exec::trace::QueryTraceTraits::kQueryConfigKey]; - summary << "++++++Query configs++++++\n"; - summary << folly::toJson(configObj) << "\n"; - summary << "++++++Query plan++++++\n"; - const auto queryPlan = ISerializable::deserialize( - metaObj[exec::trace::QueryTraceTraits::kPlanNodeKey], - memory::MemoryManager::getInstance()->tracePool()); - summary << queryPlan->toString(true, true); + printTaskTraceSummary( + rootDir, + queryId, + taskId, + FLAGS_node_id, + FLAGS_pipeline_id, + pool, + summary); } LOG(INFO) << summary.str(); } } // namespace -int main(int argc, char** argv) { - if (argc == 1) { - gflags::ShowUsageWithFlags(argv[0]); - return -1; - } +TraceReplayRunner::TraceReplayRunner() + : ioExecutor_(std::make_unique( + std::thread::hardware_concurrency() * + FLAGS_hiveConnectorExecutorHwMultiplier, + std::make_shared( + "TraceReplayIoConnector"))) {} + +void TraceReplayRunner::init() { + memory::initializeMemoryManager({}); + filesystems::registerLocalFileSystem(); + filesystems::registerS3FileSystem(); + filesystems::registerHdfsFileSystem(); + filesystems::registerGCSFileSystem(); + filesystems::abfs::registerAbfsFileSystem(); + + dwio::common::registerFileSinks(); + dwrf::registerDwrfReaderFactory(); + dwrf::registerDwrfWriterFactory(); + parquet::registerParquetReaderFactory(); + parquet::registerParquetWriterFactory(); - gflags::ParseCommandLineFlags(&argc, &argv, true); - if (FLAGS_root_dir.empty()) { - gflags::SetUsageMessage("--root_dir must be provided."); - gflags::ShowUsageWithFlags(argv[0]); - return -1; + core::PlanNode::registerSerDe(); + core::ITypedExpr::registerSerDe(); + common::Filter::registerSerDe(); + Type::registerSerDe(); + exec::registerPartitionFunctionSerDe(); + if (!isRegisteredVectorSerde()) { + serializer::presto::PrestoVectorSerde::registerVectorSerde(); } + connector::hive::HiveTableHandle::registerSerDe(); + connector::hive::LocationHandle::registerSerDe(); + connector::hive::HiveColumnHandle::registerSerDe(); + connector::hive::HiveInsertTableHandle::registerSerDe(); + connector::hive::HiveConnectorSplit::registerSerDe(); + connector::hive::registerHivePartitionFunctionSerDe(); + connector::hive::HiveBucketProperty::registerSerDe(); - try { - init(); - if (FLAGS_summary || FLAGS_short_summary) { - printSummary(FLAGS_root_dir, FLAGS_task_id, FLAGS_short_summary); - return 0; - } - createReplayer()->run(); - } catch (const VeloxException& e) { - LOG(ERROR) << e.what(); - return -1; + functions::prestosql::registerAllScalarFunctions(); + aggregate::prestosql::registerAllAggregateFunctions(); + parse::registerTypeResolver(); + + connector::registerConnectorFactory( + std::make_shared()); + const auto hiveConnector = + connector::getConnectorFactory("hive")->newConnector( + "test-hive", + std::make_shared( + std::unordered_map()), + ioExecutor_.get()); + connector::registerConnector(hiveConnector); +} + +void TraceReplayRunner::run() { + VELOX_USER_CHECK(!FLAGS_root_dir.empty(), "--root_dir must be provided"); + VELOX_USER_CHECK(!FLAGS_query_id.empty(), "--query_id must be provided"); + VELOX_USER_CHECK(!FLAGS_node_id.empty(), "--node_id must be provided"); + + if (FLAGS_summary || FLAGS_short_summary) { + auto pool = memory::memoryManager()->addLeafPool("replayer"); + printSummary( + FLAGS_root_dir, + FLAGS_query_id, + FLAGS_task_id, + FLAGS_short_summary, + pool.get()); + return; } - return 0; + VELOX_USER_CHECK( + !FLAGS_operator_type.empty(), "--operator_type must be provided"); + createReplayer()->run(); } +} // namespace facebook::velox::tool::trace diff --git a/velox/tool/trace/TraceReplayRunner.h b/velox/tool/trace/TraceReplayRunner.h new file mode 100644 index 000000000000..f1f1d7484f62 --- /dev/null +++ b/velox/tool/trace/TraceReplayRunner.h @@ -0,0 +1,54 @@ +/* + * Copyright (c) Facebook, Inc. and its affiliates. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#pragma once + +#include + +#include + +DECLARE_string(root_dir); +DECLARE_bool(summary); +DECLARE_bool(short_summary); +DECLARE_string(query_id); +DECLARE_string(task_id); +DECLARE_string(node_id); +DECLARE_int32(pipeline_id); +DECLARE_string(operator_type); +DECLARE_string(table_writer_output_dir); +DECLARE_double(hiveConnectorExecutorHwMultiplier); + +namespace facebook::velox::tool::trace { + +/// The trace replay runner. It is configured through a set of gflags passed +/// from replayer tool command line. +class TraceReplayRunner { + public: + TraceReplayRunner(); + virtual ~TraceReplayRunner() = default; + + /// Initializes the trace replay runner by setting the velox runtime + /// environment for the trace replay. It is invoked before run(). + virtual void init(); + + /// Runs the trace replay with a set of gflags passed from replayer tool. + virtual void run(); + + private: + const std::unique_ptr ioExecutor_; +}; + +} // namespace facebook::velox::tool::trace diff --git a/velox/tool/trace/TraceReplayerMain.cpp b/velox/tool/trace/TraceReplayerMain.cpp new file mode 100644 index 000000000000..cfca543a04fa --- /dev/null +++ b/velox/tool/trace/TraceReplayerMain.cpp @@ -0,0 +1,28 @@ +/* + * Copyright (c) Facebook, Inc. and its affiliates. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include "velox/tool/trace/TraceReplayRunner.h" + +#include + +int main(int argc, char** argv) { + gflags::ParseCommandLineFlags(&argc, &argv, true); + + facebook::velox::tool::trace::TraceReplayRunner runner; + runner.init(); + runner.run(); + return 0; +} diff --git a/velox/tool/trace/tests/AggregationReplayerTest.cpp b/velox/tool/trace/tests/AggregationReplayerTest.cpp index 8d3b15abde62..55a71f739eec 100644 --- a/velox/tool/trace/tests/AggregationReplayerTest.cpp +++ b/velox/tool/trace/tests/AggregationReplayerTest.cpp @@ -26,10 +26,10 @@ #include "velox/common/hyperloglog/SparseHll.h" #include "velox/common/testutil/TestValue.h" #include "velox/dwio/dwrf/writer/Writer.h" +#include "velox/exec/OperatorTraceReader.h" #include "velox/exec/PartitionFunction.h" -#include "velox/exec/QueryDataReader.h" -#include "velox/exec/QueryTraceUtil.h" #include "velox/exec/TableWriter.h" +#include "velox/exec/TraceUtil.h" #include "velox/exec/tests/utils/ArbitratorTestUtil.h" #include "velox/exec/tests/utils/AssertQueryBuilder.h" #include "velox/exec/tests/utils/HiveConnectorTestBase.h" @@ -198,10 +198,14 @@ TEST_F(AggregationReplayerTest, test) { .split(makeHiveConnectorSplit(sourceFilePath->getPath())) .copyResults(pool(), task); - const auto replayingResult = - AggregationReplayer( - traceRoot, task->taskId(), traceNodeId_, 0, "Aggregation") - .run(); + const auto replayingResult = AggregationReplayer( + traceRoot, + task->queryCtx()->queryId(), + task->taskId(), + traceNodeId_, + 0, + "Aggregation") + .run(); assertEqualResults({results}, {replayingResult}); } } diff --git a/velox/tool/trace/tests/PartitionedOutputReplayerTest.cpp b/velox/tool/trace/tests/PartitionedOutputReplayerTest.cpp index b34ead3fb366..d2d9c2c4e49f 100644 --- a/velox/tool/trace/tests/PartitionedOutputReplayerTest.cpp +++ b/velox/tool/trace/tests/PartitionedOutputReplayerTest.cpp @@ -24,8 +24,8 @@ #include "velox/common/hyperloglog/SparseHll.h" #include "velox/exec/PartitionFunction.h" #include "velox/exec/PartitionedOutput.h" -#include "velox/exec/QueryTraceUtil.h" #include "velox/exec/TableWriter.h" +#include "velox/exec/TraceUtil.h" #include "velox/exec/tests/utils/AssertQueryBuilder.h" #include "velox/exec/tests/utils/HiveConnectorTestBase.h" #include "velox/exec/tests/utils/PlanBuilder.h" @@ -143,10 +143,15 @@ TEST_F(PartitionedOutputReplayerTest, defaultConsumer) { executor_.get(), consumerExecutor.get(), [&](auto /* unused */, auto /* unused */) {}); - ASSERT_NO_THROW( - PartitionedOutputReplayer( - traceRoot, originalTask->taskId(), planNodeId, 0, "PartitionedOutput") - .run()); + + ASSERT_NO_THROW(PartitionedOutputReplayer( + traceRoot, + originalTask->queryCtx()->queryId(), + originalTask->taskId(), + planNodeId, + 0, + "PartitionedOutput") + .run()); } TEST_F(PartitionedOutputReplayerTest, basic) { @@ -223,6 +228,7 @@ TEST_F(PartitionedOutputReplayerTest, basic) { replayedPartitionedResults.resize(testParam.numPartitions); PartitionedOutputReplayer( traceRoot, + originalTask->queryCtx()->queryId(), originalTask->taskId(), planNodeId, 0, diff --git a/velox/tool/trace/tests/TableWriterReplayerTest.cpp b/velox/tool/trace/tests/TableWriterReplayerTest.cpp index ef2c4c54d682..5e4ec3eddfa9 100644 --- a/velox/tool/trace/tests/TableWriterReplayerTest.cpp +++ b/velox/tool/trace/tests/TableWriterReplayerTest.cpp @@ -24,10 +24,10 @@ #include "velox/common/base/Fs.h" #include "velox/common/file/FileSystems.h" #include "velox/common/hyperloglog/SparseHll.h" +#include "velox/exec/OperatorTraceReader.h" #include "velox/exec/PartitionFunction.h" -#include "velox/exec/QueryDataReader.h" -#include "velox/exec/QueryTraceUtil.h" #include "velox/exec/TableWriter.h" +#include "velox/exec/TraceUtil.h" #include "velox/exec/tests/utils/AssertQueryBuilder.h" #include "velox/exec/tests/utils/HiveConnectorTestBase.h" #include "velox/exec/tests/utils/PlanBuilder.h" @@ -297,6 +297,7 @@ TEST_F(TableWriterReplayerTest, basic) { const auto traceOutputDir = TempDirectoryPath::create(); const auto result = TableWriterReplayer( traceRoot, + task->queryCtx()->queryId(), task->taskId(), "1", 0, @@ -422,6 +423,7 @@ TEST_F(TableWriterReplayerTest, partitionWrite) { const auto traceOutputDir = TempDirectoryPath::create(); TableWriterReplayer( traceRoot, + task->queryCtx()->queryId(), task->taskId(), tableWriteNodeId, 0,