Skip to content

Commit

Permalink
Improve operator tracing and make it E2E work (#11360)
Browse files Browse the repository at this point in the history
Summary:
Pull Request resolved: #11360

This PR improves the operator trace framework and replay tool implementation:
(1) Let driver execution framework to handle the trace input collection and summary file generation.
The finish trace is either called when trace limit exceeds or on operator close instead of no
more input. So it can capture more information in summary like peak memory usage. By removing
the trace input collection into the driver framework it eases the trace input collection for a spilling
operator. We add to capture the peak memory and input rows in summary so it helps identify the
hot operator or task on replay debugging.
(2) Change the trace storage layout to have root with query id for traces from different tasks
(3) Abstract the replay tool function into TraceReplayRunner class and derive in Meta internal code repo
to handle the Meta internal env setup and keep most common part in  TraceReplayRunner for OSS
(4) A couple of fixes in trace replay tool to make it E2E function in Meta for table writer use case
(5) A couple of file/class renaming to make the file/class name to be more specific as currently we only
support operator level trace collection and replay

Reviewed By: tanjialiang

Differential Revision: D64946367
  • Loading branch information
xiaoxmeng authored and facebook-github-bot committed Oct 29, 2024
1 parent b1834bd commit 6dbf3dd
Show file tree
Hide file tree
Showing 54 changed files with 1,287 additions and 656 deletions.
5 changes: 3 additions & 2 deletions velox/common/file/File.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -170,9 +170,10 @@ void LocalReadFile::preadInternal(uint64_t offset, uint64_t length, char* pos)
VELOX_CHECK_EQ(
bytesRead,
length,
"fread failure in LocalReadFile::PReadInternal, {} vs {}.",
"fread failure in LocalReadFile::PReadInternal, {} vs {}: {}",
bytesRead,
length);
length,
folly::errnoStr(errno));
}

std::string_view
Expand Down
6 changes: 3 additions & 3 deletions velox/core/PlanNode.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -2355,15 +2355,15 @@ folly::dynamic PlanNode::serialize() const {
return obj;
}

const std::vector<PlanNodePtr>& QueryTraceScanNode::sources() const {
const std::vector<PlanNodePtr>& TraceScanNode::sources() const {
return kEmptySources;
}

std::string QueryTraceScanNode::traceDir() const {
std::string TraceScanNode::traceDir() const {
return traceDir_;
}

void QueryTraceScanNode::addDetails(std::stringstream& stream) const {
void TraceScanNode::addDetails(std::stringstream& stream) const {
stream << "Trace dir: " << traceDir_;
}

Expand Down
17 changes: 13 additions & 4 deletions velox/core/PlanNode.h
Original file line number Diff line number Diff line change
Expand Up @@ -314,13 +314,17 @@ class ArrowStreamNode : public PlanNode {
std::shared_ptr<ArrowArrayStream> arrowStream_;
};

class QueryTraceScanNode final : public PlanNode {
class TraceScanNode final : public PlanNode {
public:
QueryTraceScanNode(
TraceScanNode(
const PlanNodeId& id,
const std::string& traceDir,
uint32_t pipelineId,
const RowTypePtr& outputType)
: PlanNode(id), traceDir_(traceDir), outputType_(outputType) {}
: PlanNode(id),
traceDir_(traceDir),
pipelineId_(pipelineId),
outputType_(outputType) {}

const RowTypePtr& outputType() const override {
return outputType_;
Expand All @@ -333,17 +337,22 @@ class QueryTraceScanNode final : public PlanNode {
}

folly::dynamic serialize() const override {
VELOX_UNSUPPORTED("QueryReplayScanNode is not serializable");
VELOX_UNSUPPORTED("TraceScanNode is not serializable");
return nullptr;
}

std::string traceDir() const;

uint32_t pipelineId() const {
return pipelineId_;
}

private:
void addDetails(std::stringstream& stream) const override;

// Directory of traced data, which is $traceRoot/$taskId/$nodeId.
const std::string traceDir_;
const uint32_t pipelineId_;
const RowTypePtr outputType_;
};

Expand Down
8 changes: 4 additions & 4 deletions velox/docs/develop/debugging/tracing.rst
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,7 @@ The tracing framework consists of three components:
Query Trace Writer
^^^^^^^^^^^^^^^^^^

**QueryMetadataWriter** records the query metadata during task creation,
**TaskTraceMetadataWriter** records the query metadata during task creation,
serializes, and writes them into a file in JSON format. There are two kinds
of metadata:

Expand All @@ -66,20 +66,20 @@ of metadata:
- Plan fragment of the task (also known as a plan node tree). It can be serialized
as a JSON object, which is already supported in Velox.

**QueryDataWriter** records the input vectors from the target operator, which are
**OperatorTraceWriter** records the input vectors from the target operator, which are
serialized and written as a binary file.

Query Trace Reader
^^^^^^^^^^^^^^^^^^

**QueryMetadataReader** can load the query metadata JSON file, and extract the query
**TaskTraceMetadataReader** can load the query metadata JSON file, and extract the query
configurations, connector properties, and the plan fragment.

**QueryDataReader** can read and deserialize the input vectors of the target operator.
It is used as the utility to replay the input data as a source operator in the target
operator replay.

**NOTE**: `QueryDataWriter` serializes and flushes the input vectors in batches,
**NOTE**: `OperatorTraceWriter` serializes and flushes the input vectors in batches,
allowing it to replay the input process using the same sequence of batches.

Query Trace Util
Expand Down
13 changes: 7 additions & 6 deletions velox/exec/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -57,13 +57,14 @@ velox_add_library(
OrderBy.cpp
OutputBuffer.cpp
OutputBufferManager.cpp
QueryDataReader.cpp
QueryDataWriter.cpp
QueryMetadataReader.cpp
QueryMetadataWriter.cpp
QueryTraceConfig.cpp
OperatorTraceReader.cpp
OperatorTraceWriter.cpp
TaskTraceReader.cpp
TaskTraceWriter.cpp
Trace.h
TraceConfig.cpp
TraceUtil.cpp
QueryTraceScan.cpp
QueryTraceUtil.cpp
PartitionedOutput.cpp
PartitionFunction.cpp
PartitionStreamingWindowBuild.cpp
Expand Down
5 changes: 3 additions & 2 deletions velox/exec/Driver.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -95,8 +95,8 @@ const core::QueryConfig& DriverCtx::queryConfig() const {
return task->queryCtx()->queryConfig();
}

const std::optional<trace::QueryTraceConfig>& DriverCtx::traceConfig() const {
return task->queryTraceConfig();
const std::optional<trace::TraceConfig>& DriverCtx::traceConfig() const {
return task->traceConfig();
}

velox::memory::MemoryPool* DriverCtx::addOperatorPool(
Expand Down Expand Up @@ -618,6 +618,7 @@ StopReason Driver::runInternal(
lockedStats->addInputVector(
resultBytes, intermediateResult->size());
}
nextOp->traceInput(intermediateResult);
TestValue::adjust(
"facebook::velox::exec::Driver::runInternal::addInput",
nextOp);
Expand Down
4 changes: 2 additions & 2 deletions velox/exec/Driver.h
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@
#include "velox/common/time/CpuWallTimer.h"
#include "velox/core/PlanFragment.h"
#include "velox/core/QueryCtx.h"
#include "velox/exec/QueryTraceConfig.h"
#include "velox/exec/TraceConfig.h"

namespace facebook::velox::exec {

Expand Down Expand Up @@ -291,7 +291,7 @@ struct DriverCtx {

const core::QueryConfig& queryConfig() const;

const std::optional<trace::QueryTraceConfig>& traceConfig() const;
const std::optional<trace::TraceConfig>& traceConfig() const;

velox::memory::MemoryPool* addOperatorPool(
const core::PlanNodeId& planNodeId,
Expand Down
1 change: 0 additions & 1 deletion velox/exec/HashAggregation.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -117,7 +117,6 @@ bool HashAggregation::abandonPartialAggregationEarly(int64_t numOutput) const {
}

void HashAggregation::addInput(RowVectorPtr input) {
traceInput(input);
if (!pushdownChecked_) {
mayPushdown_ = operatorCtx_->driver()->mayPushdownAggregation(this);
pushdownChecked_ = true;
Expand Down
11 changes: 5 additions & 6 deletions velox/exec/LocalPlanner.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -32,9 +32,9 @@
#include "velox/exec/MergeJoin.h"
#include "velox/exec/NestedLoopJoinBuild.h"
#include "velox/exec/NestedLoopJoinProbe.h"
#include "velox/exec/OperatorTraceScan.h"
#include "velox/exec/OrderBy.h"
#include "velox/exec/PartitionedOutput.h"
#include "velox/exec/QueryTraceScan.h"
#include "velox/exec/RowNumber.h"
#include "velox/exec/StreamingAggregation.h"
#include "velox/exec/TableScan.h"
Expand Down Expand Up @@ -595,11 +595,10 @@ std::shared_ptr<Driver> DriverFactory::createDriver(
assignUniqueIdNode->taskUniqueId(),
assignUniqueIdNode->uniqueIdCounter()));
} else if (
const auto queryReplayScanNode =
std::dynamic_pointer_cast<const core::QueryTraceScanNode>(
planNode)) {
operators.push_back(std::make_unique<trace::QueryTraceScan>(
id, ctx.get(), queryReplayScanNode));
const auto traceScanNode =
std::dynamic_pointer_cast<const core::TraceScanNode>(planNode)) {
operators.push_back(std::make_unique<trace::OperatorTraceScan>(
id, ctx.get(), traceScanNode));
} else {
std::unique_ptr<Operator> extended;
if (planNode->requiresExchangeClient()) {
Expand Down
35 changes: 20 additions & 15 deletions velox/exec/Operator.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -19,10 +19,8 @@
#include "velox/common/base/SuccinctPrinter.h"
#include "velox/common/testutil/TestValue.h"
#include "velox/exec/Driver.h"
#include "velox/exec/HashJoinBridge.h"
#include "velox/exec/OperatorUtils.h"
#include "velox/exec/QueryTraceUtil.h"
#include "velox/exec/Task.h"
#include "velox/exec/TraceUtil.h"
#include "velox/expression/Expr.h"

using facebook::velox::common::testutil::TestValue;
Expand Down Expand Up @@ -107,8 +105,8 @@ void Operator::maybeSetReclaimer() {
}

void Operator::maybeSetTracer() {
const auto& queryTraceConfig = operatorCtx_->driverCtx()->traceConfig();
if (!queryTraceConfig.has_value()) {
const auto& traceConfig = operatorCtx_->driverCtx()->traceConfig();
if (!traceConfig.has_value()) {
return;
}

Expand All @@ -117,7 +115,7 @@ void Operator::maybeSetTracer() {
}

const auto nodeId = planNodeId();
if (queryTraceConfig->queryNodes.count(nodeId) == 0) {
if (traceConfig->queryNodes.count(nodeId) == 0) {
return;
}

Expand All @@ -134,20 +132,17 @@ void Operator::maybeSetTracer() {

const auto pipelineId = operatorCtx_->driverCtx()->pipelineId;
const auto driverId = operatorCtx_->driverCtx()->driverId;
LOG(INFO) << "Trace data for operator type: " << operatorType()
LOG(INFO) << "Trace input for operator type: " << operatorType()
<< ", operator id: " << operatorId() << ", pipeline: " << pipelineId
<< ", driver: " << driverId << ", task: " << taskId();
const auto opTraceDirPath = fmt::format(
"{}/{}/{}/{}/data",
queryTraceConfig->queryTraceDir,
planNodeId(),
pipelineId,
driverId);
const auto opTraceDirPath = trace::getOpTraceDirectory(
traceConfig->queryTraceDir, planNodeId(), pipelineId, driverId);
trace::createTraceDirectory(opTraceDirPath);
inputTracer_ = std::make_unique<trace::QueryDataWriter>(
inputTracer_ = std::make_unique<trace::OperatorTraceWriter>(
this,
opTraceDirPath,
memory::traceMemoryPool(),
queryTraceConfig->updateAndCheckTraceLimitCB);
traceConfig->updateAndCheckTraceLimitCB);
}

void Operator::traceInput(const RowVectorPtr& input) {
Expand Down Expand Up @@ -319,6 +314,16 @@ OperatorStats Operator::stats(bool clear) {
return stats;
}

void Operator::close() {
input_ = nullptr;
results_.clear();
recordSpillStats();
finishTrace();

// Release the unused memory reservation on close.
operatorCtx_->pool()->release();
}

vector_size_t Operator::outputBatchRows(
std::optional<uint64_t> averageRowSize) const {
const auto& queryConfig = operatorCtx_->task()->queryCtx()->queryConfig();
Expand Down
13 changes: 3 additions & 10 deletions velox/exec/Operator.h
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@
#include "velox/core/PlanNode.h"
#include "velox/exec/Driver.h"
#include "velox/exec/JoinBridge.h"
#include "velox/exec/QueryDataWriter.h"
#include "velox/exec/OperatorTraceWriter.h"
#include "velox/exec/Spiller.h"
#include "velox/type/Filter.h"

Expand Down Expand Up @@ -408,7 +408,6 @@ class Operator : public BaseRuntimeStatWriter {
/// e.g. the first operator in the pipeline.
virtual void noMoreInput() {
noMoreInput_ = true;
finishTrace();
}

/// Returns a RowVector with the result columns. Returns nullptr if
Expand Down Expand Up @@ -483,13 +482,7 @@ class Operator : public BaseRuntimeStatWriter {

/// Frees all resources associated with 'this'. No other methods
/// should be called after this.
virtual void close() {
input_ = nullptr;
results_.clear();
recordSpillStats();
// Release the unused memory reservation on close.
operatorCtx_->pool()->release();
}
virtual void close();

// Returns true if 'this' never has more output rows than input rows.
virtual bool isFilter() const {
Expand Down Expand Up @@ -781,7 +774,7 @@ class Operator : public BaseRuntimeStatWriter {

folly::Synchronized<OperatorStats> stats_;
folly::Synchronized<common::SpillStats> spillStats_;
std::unique_ptr<trace::QueryDataWriter> inputTracer_;
std::unique_ptr<trace::OperatorTraceWriter> inputTracer_;

/// Indicates if an operator is under a non-reclaimable execution section.
/// This prevents the memory arbitrator from reclaiming memory from this
Expand Down
79 changes: 79 additions & 0 deletions velox/exec/OperatorTraceReader.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,79 @@
/*
* Copyright (c) Facebook, Inc. and its affiliates.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

#include <utility>

#include "velox/exec/OperatorTraceReader.h"

#include "velox/exec/TraceUtil.h"

namespace facebook::velox::exec::trace {

OperatorTraceInputReader::OperatorTraceInputReader(
std::string traceDir,
RowTypePtr dataType,
memory::MemoryPool* pool)
: traceDir_(std::move(traceDir)),
fs_(filesystems::getFileSystem(traceDir_, nullptr)),
dataType_(std::move(dataType)),
pool_(pool),
inputStream_(getInputStream()) {
VELOX_CHECK_NOT_NULL(dataType_);
VELOX_CHECK_NOT_NULL(inputStream_);
}

bool OperatorTraceInputReader::read(RowVectorPtr& batch) const {
if (inputStream_->atEnd()) {
batch = nullptr;
return false;
}

VectorStreamGroup::read(
inputStream_.get(), pool_, dataType_, &batch, &readOptions_);
return true;
}

std::unique_ptr<common::FileInputStream>
OperatorTraceInputReader::getInputStream() const {
auto traceFile = fs_->openFileForRead(getOpTraceInputFilePath(traceDir_));
// TODO: Make the buffer size configurable.
return std::make_unique<common::FileInputStream>(
std::move(traceFile), 1 << 20, pool_);
}

OperatorTraceSummaryReader::OperatorTraceSummaryReader(
std::string traceDir,
memory::MemoryPool* pool)
: traceDir_(std::move(traceDir)),
fs_(filesystems::getFileSystem(traceDir_, nullptr)),
pool_(pool),
summaryFile_(fs_->openFileForRead(getOpTraceSummaryFilePath(traceDir_))) {
}

OperatorTraceSummary OperatorTraceSummaryReader::read() const {
VELOX_CHECK_NOT_NULL(summaryFile_);
const auto summaryStr = summaryFile_->pread(0, summaryFile_->size());
VELOX_CHECK(!summaryStr.empty());

folly::dynamic summaryObj = folly::parseJson(summaryStr);
OperatorTraceSummary summary;
summary.exceededTraceLimit =
summaryObj[OperatorTraceTraits::kTraceLimitExceededKey].asBool();
summary.peakMemory = summaryObj[OperatorTraceTraits::kPeakMemoryKey].asInt();
summary.inputRows = summaryObj[OperatorTraceTraits::kInputRowsKey].asInt();
return summary;
}
} // namespace facebook::velox::exec::trace
Loading

0 comments on commit 6dbf3dd

Please sign in to comment.