Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Improve operator tracing and make it E2E work #11360

Closed
wants to merge 1 commit into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions velox/common/base/VeloxException.h
Original file line number Diff line number Diff line change
Expand Up @@ -105,6 +105,9 @@ inline constexpr auto kNoCacheSpace = "NO_CACHE_SPACE"_fs;
/// An error raised when spill bytes exceeds limits.
inline constexpr auto kSpillLimitExceeded = "SPILL_LIMIT_EXCEEDED"_fs;

/// An error raised when trace bytes exceeds limits.
inline constexpr auto kTraceLimitExceeded = "TRACE_LIMIT_EXCEEDED"_fs;

/// Errors indicating file read corruptions.
inline constexpr auto kFileCorruption = "FILE_CORRUPTION"_fs;

Expand Down
5 changes: 3 additions & 2 deletions velox/common/file/File.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -170,9 +170,10 @@ void LocalReadFile::preadInternal(uint64_t offset, uint64_t length, char* pos)
VELOX_CHECK_EQ(
bytesRead,
length,
"fread failure in LocalReadFile::PReadInternal, {} vs {}.",
"fread failure in LocalReadFile::PReadInternal, {} vs {}: {}",
bytesRead,
length);
length,
folly::errnoStr(errno));
}

std::string_view
Expand Down
6 changes: 3 additions & 3 deletions velox/core/PlanNode.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -2355,15 +2355,15 @@ folly::dynamic PlanNode::serialize() const {
return obj;
}

const std::vector<PlanNodePtr>& QueryTraceScanNode::sources() const {
const std::vector<PlanNodePtr>& TraceScanNode::sources() const {
return kEmptySources;
}

std::string QueryTraceScanNode::traceDir() const {
std::string TraceScanNode::traceDir() const {
return traceDir_;
}

void QueryTraceScanNode::addDetails(std::stringstream& stream) const {
void TraceScanNode::addDetails(std::stringstream& stream) const {
stream << "Trace dir: " << traceDir_;
}

Expand Down
17 changes: 13 additions & 4 deletions velox/core/PlanNode.h
Original file line number Diff line number Diff line change
Expand Up @@ -314,13 +314,17 @@ class ArrowStreamNode : public PlanNode {
std::shared_ptr<ArrowArrayStream> arrowStream_;
};

class QueryTraceScanNode final : public PlanNode {
class TraceScanNode final : public PlanNode {
public:
QueryTraceScanNode(
TraceScanNode(
const PlanNodeId& id,
const std::string& traceDir,
uint32_t pipelineId,
const RowTypePtr& outputType)
: PlanNode(id), traceDir_(traceDir), outputType_(outputType) {}
: PlanNode(id),
traceDir_(traceDir),
pipelineId_(pipelineId),
outputType_(outputType) {}

const RowTypePtr& outputType() const override {
return outputType_;
Expand All @@ -333,17 +337,22 @@ class QueryTraceScanNode final : public PlanNode {
}

folly::dynamic serialize() const override {
VELOX_UNSUPPORTED("QueryReplayScanNode is not serializable");
VELOX_UNSUPPORTED("TraceScanNode is not serializable");
return nullptr;
}

std::string traceDir() const;

uint32_t pipelineId() const {
return pipelineId_;
}

private:
void addDetails(std::stringstream& stream) const override;

// Directory of traced data, which is $traceRoot/$taskId/$nodeId.
const std::string traceDir_;
const uint32_t pipelineId_;
const RowTypePtr outputType_;
};

Expand Down
13 changes: 5 additions & 8 deletions velox/core/QueryCtx.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -87,16 +87,13 @@ void QueryCtx::updateSpilledBytesAndCheckLimit(uint64_t bytes) {
}
}

bool QueryCtx::updateTracedBytesAndCheckLimit(uint64_t bytes) {
if (numTracedBytes_.fetch_add(bytes) + bytes <
void QueryCtx::updateTracedBytesAndCheckLimit(uint64_t bytes) {
if (numTracedBytes_.fetch_add(bytes) + bytes >=
queryConfig_.queryTraceMaxBytes()) {
return false;
VELOX_SPILL_LIMIT_EXCEEDED(fmt::format(
"Query exceeded per-query local trace limit of {}",
succinctBytes(queryConfig_.queryTraceMaxBytes())));
}

numTracedBytes_.fetch_sub(bytes);
LOG(WARNING) << "Query exceeded trace limit of "
<< succinctBytes(queryConfig_.queryTraceMaxBytes());
return true;
}

std::unique_ptr<memory::MemoryReclaimer> QueryCtx::MemoryReclaimer::create(
Expand Down
6 changes: 3 additions & 3 deletions velox/core/QueryCtx.h
Original file line number Diff line number Diff line change
Expand Up @@ -122,9 +122,9 @@ class QueryCtx : public std::enable_shared_from_this<QueryCtx> {
/// the max spill bytes limit.
void updateSpilledBytesAndCheckLimit(uint64_t bytes);

/// Updates the aggregated trace bytes of this query, and return true if
/// exceeds the max query trace bytes limit.
bool updateTracedBytesAndCheckLimit(uint64_t bytes);
/// Updates the aggregated trace bytes of this query, and throws if exceeds
/// the max query trace bytes limit.
void updateTracedBytesAndCheckLimit(uint64_t bytes);

void testingOverrideMemoryPool(std::shared_ptr<memory::MemoryPool> pool) {
pool_ = std::move(pool);
Expand Down
8 changes: 4 additions & 4 deletions velox/docs/develop/debugging/tracing.rst
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,7 @@ The tracing framework consists of three components:
Query Trace Writer
^^^^^^^^^^^^^^^^^^

**QueryMetadataWriter** records the query metadata during task creation,
**TaskTraceMetadataWriter** records the query metadata during task creation,
serializes, and writes them into a file in JSON format. There are two kinds
of metadata:

Expand All @@ -66,20 +66,20 @@ of metadata:
- Plan fragment of the task (also known as a plan node tree). It can be serialized
as a JSON object, which is already supported in Velox.

**QueryDataWriter** records the input vectors from the target operator, which are
**OperatorTraceWriter** records the input vectors from the target operator, which are
serialized and written as a binary file.

Query Trace Reader
^^^^^^^^^^^^^^^^^^

**QueryMetadataReader** can load the query metadata JSON file, and extract the query
**TaskTraceMetadataReader** can load the query metadata JSON file, and extract the query
configurations, connector properties, and the plan fragment.

**QueryDataReader** can read and deserialize the input vectors of the target operator.
It is used as the utility to replay the input data as a source operator in the target
operator replay.

**NOTE**: `QueryDataWriter` serializes and flushes the input vectors in batches,
**NOTE**: `OperatorTraceWriter` serializes and flushes the input vectors in batches,
allowing it to replay the input process using the same sequence of batches.

Query Trace Util
Expand Down
15 changes: 8 additions & 7 deletions velox/exec/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -57,13 +57,14 @@ velox_add_library(
OrderBy.cpp
OutputBuffer.cpp
OutputBufferManager.cpp
QueryDataReader.cpp
QueryDataWriter.cpp
QueryMetadataReader.cpp
QueryMetadataWriter.cpp
QueryTraceConfig.cpp
QueryTraceScan.cpp
QueryTraceUtil.cpp
OperatorTraceReader.cpp
OperatorTraceScan.cpp
OperatorTraceWriter.cpp
TaskTraceReader.cpp
TaskTraceWriter.cpp
Trace.cpp
TraceConfig.cpp
TraceUtil.cpp
PartitionedOutput.cpp
PartitionFunction.cpp
PartitionStreamingWindowBuild.cpp
Expand Down
5 changes: 3 additions & 2 deletions velox/exec/Driver.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -95,8 +95,8 @@ const core::QueryConfig& DriverCtx::queryConfig() const {
return task->queryCtx()->queryConfig();
}

const std::optional<trace::QueryTraceConfig>& DriverCtx::traceConfig() const {
return task->queryTraceConfig();
const std::optional<trace::TraceConfig>& DriverCtx::traceConfig() const {
return task->traceConfig();
}

velox::memory::MemoryPool* DriverCtx::addOperatorPool(
Expand Down Expand Up @@ -618,6 +618,7 @@ StopReason Driver::runInternal(
lockedStats->addInputVector(
resultBytes, intermediateResult->size());
}
nextOp->traceInput(intermediateResult);
TestValue::adjust(
"facebook::velox::exec::Driver::runInternal::addInput",
nextOp);
Expand Down
4 changes: 2 additions & 2 deletions velox/exec/Driver.h
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@
#include "velox/common/time/CpuWallTimer.h"
#include "velox/core/PlanFragment.h"
#include "velox/core/QueryCtx.h"
#include "velox/exec/QueryTraceConfig.h"
#include "velox/exec/TraceConfig.h"

namespace facebook::velox::exec {

Expand Down Expand Up @@ -291,7 +291,7 @@ struct DriverCtx {

const core::QueryConfig& queryConfig() const;

const std::optional<trace::QueryTraceConfig>& traceConfig() const;
const std::optional<trace::TraceConfig>& traceConfig() const;

velox::memory::MemoryPool* addOperatorPool(
const core::PlanNodeId& planNodeId,
Expand Down
1 change: 0 additions & 1 deletion velox/exec/HashAggregation.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -117,7 +117,6 @@ bool HashAggregation::abandonPartialAggregationEarly(int64_t numOutput) const {
}

void HashAggregation::addInput(RowVectorPtr input) {
traceInput(input);
if (!pushdownChecked_) {
mayPushdown_ = operatorCtx_->driver()->mayPushdownAggregation(this);
pushdownChecked_ = true;
Expand Down
11 changes: 5 additions & 6 deletions velox/exec/LocalPlanner.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -32,9 +32,9 @@
#include "velox/exec/MergeJoin.h"
#include "velox/exec/NestedLoopJoinBuild.h"
#include "velox/exec/NestedLoopJoinProbe.h"
#include "velox/exec/OperatorTraceScan.h"
#include "velox/exec/OrderBy.h"
#include "velox/exec/PartitionedOutput.h"
#include "velox/exec/QueryTraceScan.h"
#include "velox/exec/RowNumber.h"
#include "velox/exec/StreamingAggregation.h"
#include "velox/exec/TableScan.h"
Expand Down Expand Up @@ -595,11 +595,10 @@ std::shared_ptr<Driver> DriverFactory::createDriver(
assignUniqueIdNode->taskUniqueId(),
assignUniqueIdNode->uniqueIdCounter()));
} else if (
const auto queryReplayScanNode =
std::dynamic_pointer_cast<const core::QueryTraceScanNode>(
planNode)) {
operators.push_back(std::make_unique<trace::QueryTraceScan>(
id, ctx.get(), queryReplayScanNode));
const auto traceScanNode =
std::dynamic_pointer_cast<const core::TraceScanNode>(planNode)) {
operators.push_back(std::make_unique<trace::OperatorTraceScan>(
id, ctx.get(), traceScanNode));
} else {
std::unique_ptr<Operator> extended;
if (planNode->requiresExchangeClient()) {
Expand Down
39 changes: 20 additions & 19 deletions velox/exec/Operator.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -19,10 +19,8 @@
#include "velox/common/base/SuccinctPrinter.h"
#include "velox/common/testutil/TestValue.h"
#include "velox/exec/Driver.h"
#include "velox/exec/HashJoinBridge.h"
#include "velox/exec/OperatorUtils.h"
#include "velox/exec/QueryTraceUtil.h"
#include "velox/exec/Task.h"
#include "velox/exec/TraceUtil.h"
#include "velox/expression/Expr.h"

using facebook::velox::common::testutil::TestValue;
Expand Down Expand Up @@ -107,17 +105,13 @@ void Operator::maybeSetReclaimer() {
}

void Operator::maybeSetTracer() {
const auto& queryTraceConfig = operatorCtx_->driverCtx()->traceConfig();
if (!queryTraceConfig.has_value()) {
return;
}

if (operatorCtx_->driverCtx()->queryConfig().queryTraceMaxBytes() == 0) {
const auto& traceConfig = operatorCtx_->driverCtx()->traceConfig();
if (!traceConfig.has_value()) {
return;
}

const auto nodeId = planNodeId();
if (queryTraceConfig->queryNodes.count(nodeId) == 0) {
if (traceConfig->queryNodes.count(nodeId) == 0) {
return;
}

Expand All @@ -134,20 +128,17 @@ void Operator::maybeSetTracer() {

const auto pipelineId = operatorCtx_->driverCtx()->pipelineId;
const auto driverId = operatorCtx_->driverCtx()->driverId;
LOG(INFO) << "Trace data for operator type: " << operatorType()
LOG(INFO) << "Trace input for operator type: " << operatorType()
<< ", operator id: " << operatorId() << ", pipeline: " << pipelineId
<< ", driver: " << driverId << ", task: " << taskId();
const auto opTraceDirPath = fmt::format(
"{}/{}/{}/{}/data",
queryTraceConfig->queryTraceDir,
planNodeId(),
pipelineId,
driverId);
const auto opTraceDirPath = trace::getOpTraceDirectory(
traceConfig->queryTraceDir, planNodeId(), pipelineId, driverId);
trace::createTraceDirectory(opTraceDirPath);
inputTracer_ = std::make_unique<trace::QueryDataWriter>(
inputTracer_ = std::make_unique<trace::OperatorTraceWriter>(
this,
opTraceDirPath,
memory::traceMemoryPool(),
queryTraceConfig->updateAndCheckTraceLimitCB);
traceConfig->updateAndCheckTraceLimitCB);
}

void Operator::traceInput(const RowVectorPtr& input) {
Expand Down Expand Up @@ -319,6 +310,16 @@ OperatorStats Operator::stats(bool clear) {
return stats;
}

void Operator::close() {
input_ = nullptr;
results_.clear();
recordSpillStats();
finishTrace();

// Release the unused memory reservation on close.
operatorCtx_->pool()->release();
}

vector_size_t Operator::outputBatchRows(
std::optional<uint64_t> averageRowSize) const {
const auto& queryConfig = operatorCtx_->task()->queryCtx()->queryConfig();
Expand Down
13 changes: 3 additions & 10 deletions velox/exec/Operator.h
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@
#include "velox/core/PlanNode.h"
#include "velox/exec/Driver.h"
#include "velox/exec/JoinBridge.h"
#include "velox/exec/QueryDataWriter.h"
#include "velox/exec/OperatorTraceWriter.h"
#include "velox/exec/Spiller.h"
#include "velox/type/Filter.h"

Expand Down Expand Up @@ -408,7 +408,6 @@ class Operator : public BaseRuntimeStatWriter {
/// e.g. the first operator in the pipeline.
virtual void noMoreInput() {
noMoreInput_ = true;
finishTrace();
}

/// Returns a RowVector with the result columns. Returns nullptr if
Expand Down Expand Up @@ -483,13 +482,7 @@ class Operator : public BaseRuntimeStatWriter {

/// Frees all resources associated with 'this'. No other methods
/// should be called after this.
virtual void close() {
input_ = nullptr;
results_.clear();
recordSpillStats();
// Release the unused memory reservation on close.
operatorCtx_->pool()->release();
}
virtual void close();

// Returns true if 'this' never has more output rows than input rows.
virtual bool isFilter() const {
Expand Down Expand Up @@ -781,7 +774,7 @@ class Operator : public BaseRuntimeStatWriter {

folly::Synchronized<OperatorStats> stats_;
folly::Synchronized<common::SpillStats> spillStats_;
std::unique_ptr<trace::QueryDataWriter> inputTracer_;
std::unique_ptr<trace::OperatorTraceWriter> inputTracer_;

/// Indicates if an operator is under a non-reclaimable execution section.
/// This prevents the memory arbitrator from reclaiming memory from this
Expand Down
Loading
Loading