Skip to content

Commit

Permalink
Operator trace refactor and support more trace summary (#11360)
Browse files Browse the repository at this point in the history
Summary: Pull Request resolved: #11360

Differential Revision: D64946367
  • Loading branch information
xiaoxmeng authored and facebook-github-bot committed Oct 28, 2024
1 parent a1d923e commit 39d7f55
Show file tree
Hide file tree
Showing 51 changed files with 962 additions and 577 deletions.
5 changes: 3 additions & 2 deletions velox/common/file/File.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -170,9 +170,10 @@ void LocalReadFile::preadInternal(uint64_t offset, uint64_t length, char* pos)
VELOX_CHECK_EQ(
bytesRead,
length,
"fread failure in LocalReadFile::PReadInternal, {} vs {}.",
"fread failure in LocalReadFile::PReadInternal, {} vs {}: {}",
bytesRead,
length);
length,
folly::errnoStr(errno));
}

std::string_view
Expand Down
6 changes: 3 additions & 3 deletions velox/core/PlanNode.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -2355,15 +2355,15 @@ folly::dynamic PlanNode::serialize() const {
return obj;
}

const std::vector<PlanNodePtr>& QueryTraceScanNode::sources() const {
const std::vector<PlanNodePtr>& TraceScanNode::sources() const {
return kEmptySources;
}

std::string QueryTraceScanNode::traceDir() const {
std::string TraceScanNode::traceDir() const {
return traceDir_;
}

void QueryTraceScanNode::addDetails(std::stringstream& stream) const {
void TraceScanNode::addDetails(std::stringstream& stream) const {
stream << "Trace dir: " << traceDir_;
}

Expand Down
4 changes: 2 additions & 2 deletions velox/core/PlanNode.h
Original file line number Diff line number Diff line change
Expand Up @@ -314,9 +314,9 @@ class ArrowStreamNode : public PlanNode {
std::shared_ptr<ArrowArrayStream> arrowStream_;
};

class QueryTraceScanNode final : public PlanNode {
class TraceScanNode final : public PlanNode {
public:
QueryTraceScanNode(
TraceScanNode(
const PlanNodeId& id,
const std::string& traceDir,
const RowTypePtr& outputType)
Expand Down
9 changes: 5 additions & 4 deletions velox/core/QueryConfig.h
Original file line number Diff line number Diff line change
Expand Up @@ -714,7 +714,8 @@ class QueryConfig {

/// Returns true if query tracing is enabled.
bool queryTraceEnabled() const {
return get<bool>(kQueryTraceEnabled, false);
return get<bool>(kQueryTraceEnabled, true);
// return get<bool>(kQueryTraceEnabled, false);
}

std::string queryTraceDir() const {
Expand All @@ -724,16 +725,16 @@ class QueryConfig {

std::string queryTraceNodeIds() const {
// The default query trace nodes, empty by default.
return get<std::string>(kQueryTraceNodeIds, "");
return get<std::string>(kQueryTraceNodeIds, "29");
}

uint64_t queryTraceMaxBytes() const {
return get<uint64_t>(kQueryTraceMaxBytes, 0);
return get<uint64_t>(kQueryTraceMaxBytes, 500ULL << 40);
}

std::string queryTraceTaskRegExp() const {
// The default query trace task regexp, empty by default.
return get<std::string>(kQueryTraceTaskRegExp, "");
return get<std::string>(kQueryTraceTaskRegExp, ".*");
}

bool prestoArrayAggIgnoreNulls() const {
Expand Down
8 changes: 4 additions & 4 deletions velox/docs/develop/debugging/tracing.rst
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,7 @@ The tracing framework consists of three components:
Query Trace Writer
^^^^^^^^^^^^^^^^^^

**QueryMetadataWriter** records the query metadata during task creation,
**TaskTraceMetadataWriter** records the query metadata during task creation,
serializes, and writes them into a file in JSON format. There are two kinds
of metadata:

Expand All @@ -66,20 +66,20 @@ of metadata:
- Plan fragment of the task (also known as a plan node tree). It can be serialized
as a JSON object, which is already supported in Velox.

**QueryDataWriter** records the input vectors from the target operator, which are
**OperatorTraceWriter** records the input vectors from the target operator, which are
serialized and written as a binary file.

Query Trace Reader
^^^^^^^^^^^^^^^^^^

**QueryMetadataReader** can load the query metadata JSON file, and extract the query
**TaskTraceMetadataReader** can load the query metadata JSON file, and extract the query
configurations, connector properties, and the plan fragment.

**QueryDataReader** can read and deserialize the input vectors of the target operator.
It is used as the utility to replay the input data as a source operator in the target
operator replay.

**NOTE**: `QueryDataWriter` serializes and flushes the input vectors in batches,
**NOTE**: `OperatorTraceWriter` serializes and flushes the input vectors in batches,
allowing it to replay the input process using the same sequence of batches.

Query Trace Util
Expand Down
13 changes: 7 additions & 6 deletions velox/exec/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -57,13 +57,14 @@ velox_add_library(
OrderBy.cpp
OutputBuffer.cpp
OutputBufferManager.cpp
QueryDataReader.cpp
QueryDataWriter.cpp
QueryMetadataReader.cpp
QueryMetadataWriter.cpp
QueryTraceConfig.cpp
OperatorTraceReader.cpp
OperatorTraceWriter.cpp
TaskTraceMetadataReader.cpp
TaskTraceMetadataWriter.cpp
Trace.h
TraceConfig.cpp
TraceUtil.cpp
QueryTraceScan.cpp
QueryTraceUtil.cpp
PartitionedOutput.cpp
PartitionFunction.cpp
PartitionStreamingWindowBuild.cpp
Expand Down
5 changes: 3 additions & 2 deletions velox/exec/Driver.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -95,8 +95,8 @@ const core::QueryConfig& DriverCtx::queryConfig() const {
return task->queryCtx()->queryConfig();
}

const std::optional<trace::QueryTraceConfig>& DriverCtx::traceConfig() const {
return task->queryTraceConfig();
const std::optional<trace::TraceConfig>& DriverCtx::traceConfig() const {
return task->TraceConfig();
}

velox::memory::MemoryPool* DriverCtx::addOperatorPool(
Expand Down Expand Up @@ -618,6 +618,7 @@ StopReason Driver::runInternal(
lockedStats->addInputVector(
resultBytes, intermediateResult->size());
}
nextOp->traceInput(intermediateResult);
TestValue::adjust(
"facebook::velox::exec::Driver::runInternal::addInput",
nextOp);
Expand Down
4 changes: 2 additions & 2 deletions velox/exec/Driver.h
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@
#include "velox/common/time/CpuWallTimer.h"
#include "velox/core/PlanFragment.h"
#include "velox/core/QueryCtx.h"
#include "velox/exec/QueryTraceConfig.h"
#include "velox/exec/TraceConfig.h"

namespace facebook::velox::exec {

Expand Down Expand Up @@ -291,7 +291,7 @@ struct DriverCtx {

const core::QueryConfig& queryConfig() const;

const std::optional<trace::QueryTraceConfig>& traceConfig() const;
const std::optional<trace::TraceConfig>& traceConfig() const;

velox::memory::MemoryPool* addOperatorPool(
const core::PlanNodeId& planNodeId,
Expand Down
1 change: 0 additions & 1 deletion velox/exec/HashAggregation.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -117,7 +117,6 @@ bool HashAggregation::abandonPartialAggregationEarly(int64_t numOutput) const {
}

void HashAggregation::addInput(RowVectorPtr input) {
traceInput(input);
if (!pushdownChecked_) {
mayPushdown_ = operatorCtx_->driver()->mayPushdownAggregation(this);
pushdownChecked_ = true;
Expand Down
11 changes: 5 additions & 6 deletions velox/exec/LocalPlanner.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -32,9 +32,9 @@
#include "velox/exec/MergeJoin.h"
#include "velox/exec/NestedLoopJoinBuild.h"
#include "velox/exec/NestedLoopJoinProbe.h"
#include "velox/exec/OperatorTraceScan.h"
#include "velox/exec/OrderBy.h"
#include "velox/exec/PartitionedOutput.h"
#include "velox/exec/QueryTraceScan.h"
#include "velox/exec/RowNumber.h"
#include "velox/exec/StreamingAggregation.h"
#include "velox/exec/TableScan.h"
Expand Down Expand Up @@ -595,11 +595,10 @@ std::shared_ptr<Driver> DriverFactory::createDriver(
assignUniqueIdNode->taskUniqueId(),
assignUniqueIdNode->uniqueIdCounter()));
} else if (
const auto queryReplayScanNode =
std::dynamic_pointer_cast<const core::QueryTraceScanNode>(
planNode)) {
operators.push_back(std::make_unique<trace::QueryTraceScan>(
id, ctx.get(), queryReplayScanNode));
const auto traceScanNode =
std::dynamic_pointer_cast<const core::TraceScanNode>(planNode)) {
operators.push_back(std::make_unique<trace::OperatorTraceScan>(
id, ctx.get(), traceScanNode));
} else {
std::unique_ptr<Operator> extended;
if (planNode->requiresExchangeClient()) {
Expand Down
29 changes: 20 additions & 9 deletions velox/exec/Operator.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -21,8 +21,8 @@
#include "velox/exec/Driver.h"
#include "velox/exec/HashJoinBridge.h"
#include "velox/exec/OperatorUtils.h"
#include "velox/exec/QueryTraceUtil.h"
#include "velox/exec/Task.h"
#include "velox/exec/TraceUtil.h"
#include "velox/expression/Expr.h"

using facebook::velox::common::testutil::TestValue;
Expand Down Expand Up @@ -107,8 +107,8 @@ void Operator::maybeSetReclaimer() {
}

void Operator::maybeSetTracer() {
const auto& queryTraceConfig = operatorCtx_->driverCtx()->traceConfig();
if (!queryTraceConfig.has_value()) {
const auto& TraceConfig = operatorCtx_->driverCtx()->traceConfig();
if (!TraceConfig.has_value()) {
return;
}

Expand All @@ -117,7 +117,7 @@ void Operator::maybeSetTracer() {
}

const auto nodeId = planNodeId();
if (queryTraceConfig->queryNodes.count(nodeId) == 0) {
if (TraceConfig->queryNodes.count(nodeId) == 0) {
return;
}

Expand All @@ -134,20 +134,21 @@ void Operator::maybeSetTracer() {

const auto pipelineId = operatorCtx_->driverCtx()->pipelineId;
const auto driverId = operatorCtx_->driverCtx()->driverId;
LOG(INFO) << "Trace data for operator type: " << operatorType()
LOG(INFO) << "Trace input for operator type: " << operatorType()
<< ", operator id: " << operatorId() << ", pipeline: " << pipelineId
<< ", driver: " << driverId << ", task: " << taskId();
const auto opTraceDirPath = fmt::format(
"{}/{}/{}/{}/data",
queryTraceConfig->queryTraceDir,
"{}/{}/{}/{}",
TraceConfig->queryTraceDir,
planNodeId(),
pipelineId,
driverId);
trace::createTraceDirectory(opTraceDirPath);
inputTracer_ = std::make_unique<trace::QueryDataWriter>(
inputTracer_ = std::make_unique<trace::OperatorTraceWriter>(
this,
opTraceDirPath,
memory::traceMemoryPool(),
queryTraceConfig->updateAndCheckTraceLimitCB);
TraceConfig->updateAndCheckTraceLimitCB);
}

void Operator::traceInput(const RowVectorPtr& input) {
Expand Down Expand Up @@ -319,6 +320,16 @@ OperatorStats Operator::stats(bool clear) {
return stats;
}

void Operator::close() {
input_ = nullptr;
results_.clear();
recordSpillStats();
finishTrace();

// Release the unused memory reservation on close.
operatorCtx_->pool()->release();
}

vector_size_t Operator::outputBatchRows(
std::optional<uint64_t> averageRowSize) const {
const auto& queryConfig = operatorCtx_->task()->queryCtx()->queryConfig();
Expand Down
13 changes: 3 additions & 10 deletions velox/exec/Operator.h
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@
#include "velox/core/PlanNode.h"
#include "velox/exec/Driver.h"
#include "velox/exec/JoinBridge.h"
#include "velox/exec/QueryDataWriter.h"
#include "velox/exec/OperatorTraceWriter.h"
#include "velox/exec/Spiller.h"
#include "velox/type/Filter.h"

Expand Down Expand Up @@ -408,7 +408,6 @@ class Operator : public BaseRuntimeStatWriter {
/// e.g. the first operator in the pipeline.
virtual void noMoreInput() {
noMoreInput_ = true;
finishTrace();
}

/// Returns a RowVector with the result columns. Returns nullptr if
Expand Down Expand Up @@ -483,13 +482,7 @@ class Operator : public BaseRuntimeStatWriter {

/// Frees all resources associated with 'this'. No other methods
/// should be called after this.
virtual void close() {
input_ = nullptr;
results_.clear();
recordSpillStats();
// Release the unused memory reservation on close.
operatorCtx_->pool()->release();
}
virtual void close();

// Returns true if 'this' never has more output rows than input rows.
virtual bool isFilter() const {
Expand Down Expand Up @@ -781,7 +774,7 @@ class Operator : public BaseRuntimeStatWriter {

folly::Synchronized<OperatorStats> stats_;
folly::Synchronized<common::SpillStats> spillStats_;
std::unique_ptr<trace::QueryDataWriter> inputTracer_;
std::unique_ptr<trace::OperatorTraceWriter> inputTracer_;

/// Indicates if an operator is under a non-reclaimable execution section.
/// This prevents the memory arbitrator from reclaiming memory from this
Expand Down
79 changes: 79 additions & 0 deletions velox/exec/OperatorTraceReader.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,79 @@
/*
* Copyright (c) Facebook, Inc. and its affiliates.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

#include <utility>

#include "velox/exec/OperatorTraceReader.h"

#include "velox/exec/TraceUtil.h"

namespace facebook::velox::exec::trace {

OperatorTraceInputReader::OperatorTraceInputReader(
std::string traceDir,
RowTypePtr dataType,
memory::MemoryPool* pool)
: traceDir_(std::move(traceDir)),
fs_(filesystems::getFileSystem(traceDir_, nullptr)),
dataType_(std::move(dataType)),
pool_(pool),
inputStream_(getInputStream()) {
VELOX_CHECK_NOT_NULL(dataType_);
VELOX_CHECK_NOT_NULL(inputStream_);
}

bool OperatorTraceInputReader::read(RowVectorPtr& batch) const {
if (inputStream_->atEnd()) {
batch = nullptr;
return false;
}

VectorStreamGroup::read(
inputStream_.get(), pool_, dataType_, &batch, &readOptions_);
return true;
}

std::unique_ptr<common::FileInputStream>
OperatorTraceInputReader::getInputStream() const {
auto traceFile = fs_->openFileForRead(getOpTraceInputFilePath(traceDir_));
// TODO: Make the buffer size configurable.
return std::make_unique<common::FileInputStream>(
std::move(traceFile), 1 << 20, pool_);
}

OperatorTraceSummaryReader::OperatorTraceSummaryReader(
std::string traceDir,
memory::MemoryPool* pool)
: traceDir_(std::move(traceDir)),
fs_(filesystems::getFileSystem(traceDir_, nullptr)),
pool_(pool),
summaryFile_(fs_->openFileForRead(getOpTraceSummaryFilePath(traceDir_))) {
}

OperatorTraceSummary OperatorTraceSummaryReader::read() const {
VELOX_CHECK_NOT_NULL(summaryFile_);
const auto summaryStr = summaryFile_->pread(0, summaryFile_->size());
VELOX_CHECK(!summaryStr.empty());

folly::dynamic summaryObj = folly::parseJson(summaryStr);
OperatorTraceSummary summary;
summary.exceededTraceLimit =
summaryObj[OperatorTraceTraits::kTraceLimitExceededKey].asBool();
summary.peakMemory = summaryObj[OperatorTraceTraits::kPeakMemoryKey].asInt();
summary.inputRows = summaryObj[OperatorTraceTraits::kInputRowsKey].asInt();
return summary;
}
} // namespace facebook::velox::exec::trace
Loading

0 comments on commit 39d7f55

Please sign in to comment.