Skip to content

Commit

Permalink
Add query replayer (facebookincubator#10897)
Browse files Browse the repository at this point in the history
Summary:
Velox can record the query metadata (query plan and configs)
during task creation and input vectors of the traced operator,
see facebookincubator#10774 and facebookincubator#10815.

This PR adds a query replayer, it can be used to replay a query locally
using the metadata and input vectors from the production environment.
It supports showing the summary of a query at present, and more traced
operators' replaying supports will be added in the future.

Also, this PR adds two query configs `query_trace_max_bytes` and
`query_trace_task_reg_exp` to constraint the record input data size
and trace tasks respectively to ensure the stability of the cluster
in the prod.

Part of facebookincubator#9668

Pull Request resolved: facebookincubator#10897

Reviewed By: tanjialiang

Differential Revision: D62336733

Pulled By: xiaoxmeng

fbshipit-source-id: d196738dfa92c29fe5de67a944f652a328903814
  • Loading branch information
duanmeng authored and facebook-github-bot committed Sep 24, 2024
1 parent a95acb7 commit cc46d81
Show file tree
Hide file tree
Showing 24 changed files with 681 additions and 53 deletions.
4 changes: 4 additions & 0 deletions velox/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -72,3 +72,7 @@ endif()
if(${VELOX_ENABLE_SUBSTRAIT})
add_subdirectory(substrait)
endif()

if(${VELOX_BUILD_TESTING})
add_subdirectory(tool)
endif()
17 changes: 17 additions & 0 deletions velox/core/QueryConfig.h
Original file line number Diff line number Diff line change
Expand Up @@ -357,6 +357,14 @@ class QueryConfig {
/// Empty string if only want to trace the query metadata.
static constexpr const char* kQueryTraceNodeIds = "query_trace_node_ids";

/// The max trace bytes limit. Tracing is disabled if zero.
static constexpr const char* kQueryTraceMaxBytes = "query_trace_max_bytes";

/// The regexp of traced task id. We only enable trace on a task if its id
/// matches.
static constexpr const char* kQueryTraceTaskRegExp =
"query_trace_task_reg_exp";

/// Disable optimization in expression evaluation to peel common dictionary
/// layer from inputs.
static constexpr const char* kDebugDisableExpressionWithPeeling =
Expand Down Expand Up @@ -689,6 +697,15 @@ class QueryConfig {
return get<std::string>(kQueryTraceNodeIds, "");
}

uint64_t queryTraceMaxBytes() const {
return get<uint64_t>(kQueryTraceMaxBytes, 0);
}

std::string queryTraceTaskRegExp() const {
// The default query trace task regexp, empty by default.
return get<std::string>(kQueryTraceTaskRegExp, "");
}

bool prestoArrayAggIgnoreNulls() const {
return get<bool>(kPrestoArrayAggIgnoreNulls, false);
}
Expand Down
12 changes: 12 additions & 0 deletions velox/core/QueryCtx.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -87,6 +87,18 @@ void QueryCtx::updateSpilledBytesAndCheckLimit(uint64_t bytes) {
}
}

bool QueryCtx::updateTracedBytesAndCheckLimit(uint64_t bytes) {
if (numTracedBytes_.fetch_add(bytes) + bytes <
queryConfig_.queryTraceMaxBytes()) {
return false;
}

numTracedBytes_.fetch_sub(bytes);
LOG(WARNING) << "Query exceeded trace limit of "
<< succinctBytes(queryConfig_.queryTraceMaxBytes());
return true;
}

std::unique_ptr<memory::MemoryReclaimer> QueryCtx::MemoryReclaimer::create(
QueryCtx* queryCtx,
memory::MemoryPool* pool) {
Expand Down
9 changes: 7 additions & 2 deletions velox/core/QueryCtx.h
Original file line number Diff line number Diff line change
Expand Up @@ -118,10 +118,14 @@ class QueryCtx : public std::enable_shared_from_this<QueryCtx> {
/// the memory arbiration finishes.
bool checkUnderArbitration(ContinueFuture* future);

/// Updates the aggregated spill bytes of this query, and and throws if
/// exceeds the max spill bytes limit.
/// Updates the aggregated spill bytes of this query, and throws if exceeds
/// the max spill bytes limit.
void updateSpilledBytesAndCheckLimit(uint64_t bytes);

/// Updates the aggregated trace bytes of this query, and return true if
/// exceeds the max query trace bytes limit.
bool updateTracedBytesAndCheckLimit(uint64_t bytes);

void testingOverrideMemoryPool(std::shared_ptr<memory::MemoryPool> pool) {
pool_ = std::move(pool);
}
Expand Down Expand Up @@ -216,6 +220,7 @@ class QueryCtx : public std::enable_shared_from_this<QueryCtx> {
std::shared_ptr<memory::MemoryPool> pool_;
QueryConfig queryConfig_;
std::atomic<uint64_t> numSpilledBytes_{0};
std::atomic<uint64_t> numTracedBytes_{0};

mutable std::mutex mutex_;
// Indicates if this query is under memory arbitration or not.
Expand Down
8 changes: 8 additions & 0 deletions velox/docs/configs.rst
Original file line number Diff line number Diff line change
Expand Up @@ -737,3 +737,11 @@ Tracing
-
- A comma-separated list of plan node ids whose input data will be trace. If it is empty, then we only trace the
query metadata which includes the query plan and configs etc.
* - query_trace_task_reg_exp
- string
-
- The regexp of traced task id. We only enable trace on a task if its id matches.
* - query_trace_max_bytes
- integer
- 0
- The max trace bytes limit. Tracing is disabled if zero.
1 change: 1 addition & 0 deletions velox/docs/develop/debugging.rst
Original file line number Diff line number Diff line change
Expand Up @@ -9,3 +9,4 @@ Debugging Tools
debugging/print-expr-with-stats
debugging/vector-saver
debugging/metrics
debugging/tracing.rst
144 changes: 144 additions & 0 deletions velox/docs/develop/debugging/tracing.rst
Original file line number Diff line number Diff line change
@@ -0,0 +1,144 @@
=======
Tracing
=======


Background
----------

The query trace tool helps analyze and debug query performance and correctness
issues. It helps prevent interference from the test noises in a production
environment (such as storage, network etc) by allowing replay of a part of the
query plan and data set in an isolated environment such as a local machine.
This is useful for debugging query performance debugging as we don't have to
replay the whole query in a production environment.

How Tracing Tool Works
----------------------

The tracing process consists of two phases: tracing and replaying.

**Tracing Phase**

- When the query starts, the task records the metadata including query plan fragment,
query configuration, and connector properties.
- During the query execution, each traced operator records the input vectors and saves
in the specified storage location.
- The metadata are serialized using json format and operator data inputs are serialized
using `Presto serializer <https://prestodb.io/docs/current/develop/serialized-page.html>`_.

**Replaying Phase**

1. Read and deserialize the recorded query plan in tracing phase, extract the target plan
node, and assemble a plan fragment with a customized source and sink nodes. The source
node reads the input from the serialized operator inputs on storage and sink operator
prints or logs out the execution stats.
2. Build a task with the assembled plan fragment in step 1.
3. Apply the recorded query configuration and connector properties to replay the task with
the same input and configuration as in production.

**NOTE**: The Presto serialization doesn't always preserve vector encoding (lazy vectors are
loaded, nested dictionaries are flattened). Hence, replay may differ from the original run.

.. image:: ../images/trace-arch.png
:width: 600
:align: center

Tracing Framework
-----------------

The tracing framework consists of three components:

1. **Metadata and Data Writer**: metadata writer and the data writer.
2. **Metadata and Data Reader**: metadata reader and the data reader.
3. **Query Trace Replayer**: display query summaries or replay the
execution of the target operator.

Query Trace Writer
^^^^^^^^^^^^^^^^^^

**QueryMetadataWriter** records the query metadata during task creation,
serializes, and writes them into a file in JSON format. There are two kinds
of metadata:

- Query configurations and connector properties are specified by the user per query.
They can be serialized as JSON map objects (key-value pairs).
- Plan fragment of the task (also known as a plan node tree). It can be serialized
as a JSON object, which is already supported in Velox.

**QueryDataWriter** records the input vectors from the target operator, which are
serialized and written as a binary file.

Query Trace Reader
^^^^^^^^^^^^^^^^^^

**QueryMetadataReader** can load the query metadata JSON file, and extract the query
configurations, connector properties, and the plan fragment.

**QueryDataReader** can read and deserialize the input vectors of the target operator.
It is used as the utility to replay the input data as a source operator in the target
operator replay.

**NOTE**: `QueryDataWriter` serializes and flushes the input vectors in batches,
allowing it to replay the input process using the same sequence of batches.

Query Trace Util
^^^^^^^^^^^^^^^^

- Create tracing directories.
- Get query summaries.
- Provide utilities to extract the target plan node, and assemble a plan fragment with
customized trace source and sink node with it.

Query Trace Tool
----------------

The query trace tool leverages the trace reader to display query summaries and replay the
execution of specific operators (TBD).

Tracing tools Usage
^^^^^^^^^^^^^^^^^^^

.. code-block:: c++

query_replayer --root $root_dir --summary --pretty


It would show something as the follows

.. code-block:: c++

++++++Query trace summary++++++
Number of tasks: 1
Task ids: task-1
++++++Query configs and plan:++++++
{
"planNode":{
"filter":{},
"outputType":{...},
"nullAware":false,
"sources":[{...}, {...}],
"leftKeys":[],
"joinType":"INNER",
"id":"5",
"name":"HashJoinNode"
},
"connectorProperties":{...},
"queryConfig":{...}
}


Here is a full list of supported command line arguments.

* ``--usage``: Show the usage.
* ``--root``: Root dir of the query tracing.
* ``--summary``: Show the summary of the tracing including number of tasks and task ids.
It also print the query metadata including query configs, connectors properties, and query plan in JSON format.
* ``--short_summary``: Only show number of tasks and task ids.
* ``--task_id``: Specify the target task id, if empty, show the summary of all the traced query tasks.


Future Work
-----------

https://github.com/facebookincubator/velox/issues/9668
Binary file added velox/docs/develop/images/trace-arch.png
Loading
Sorry, something went wrong. Reload?
Sorry, we cannot display this file.
Sorry, this file is invalid so it cannot be displayed.
18 changes: 17 additions & 1 deletion velox/exec/Task.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -2848,6 +2848,14 @@ std::optional<trace::QueryTraceConfig> Task::maybeMakeTraceConfig() const {
!queryConfig.queryTraceDir().empty(),
"Query trace enabled but the trace dir is not set");

VELOX_USER_CHECK(
!queryConfig.queryTraceTaskRegExp().empty(),
"Query trace enabled but the trace task regexp is not set");

if (!RE2::FullMatch(taskId_, queryConfig.queryTraceTaskRegExp())) {
return std::nullopt;
}

const auto queryTraceNodes = queryConfig.queryTraceNodeIds();
if (queryTraceNodes.empty()) {
return trace::QueryTraceConfig(queryConfig.queryTraceDir());
Expand All @@ -2858,8 +2866,16 @@ std::optional<trace::QueryTraceConfig> Task::maybeMakeTraceConfig() const {
std::unordered_set<std::string> nodeSet(nodes.begin(), nodes.end());
VELOX_CHECK_EQ(nodeSet.size(), nodes.size());
LOG(INFO) << "Query trace plan node ids: " << queryTraceNodes;

trace::UpdateAndCheckTraceLimitCB updateAndCheckTraceLimitCB =
[this](uint64_t bytes) {
return queryCtx_->updateTracedBytesAndCheckLimit(bytes);
};
return trace::QueryTraceConfig(
std::move(nodeSet), queryConfig.queryTraceDir());
std::move(nodeSet),
queryConfig.queryTraceDir(),
std::move(updateAndCheckTraceLimitCB),
queryConfig.queryTraceTaskRegExp());
}

void Task::maybeInitQueryTrace() {
Expand Down
2 changes: 1 addition & 1 deletion velox/exec/trace/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ velox_link_libraries(
velox_presto_serializer)

velox_add_library(velox_query_trace_retrieve QueryDataReader.cpp
QueryMetadataReader.cpp)
QueryMetadataReader.cpp QueryTraceUtil.cpp)

velox_link_libraries(
velox_query_trace_retrieve
Expand Down
33 changes: 26 additions & 7 deletions velox/exec/trace/QueryDataWriter.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,8 @@
*/

#include "velox/exec/trace/QueryDataWriter.h"

#include <utility>
#include "velox/common/base/SpillStats.h"
#include "velox/common/file/File.h"
#include "velox/common/file/FileSystems.h"
Expand All @@ -24,17 +26,23 @@
namespace facebook::velox::exec::trace {

QueryDataWriter::QueryDataWriter(
const std::string& path,
memory::MemoryPool* pool)
: dirPath_(path),
std::string path,
memory::MemoryPool* pool,
UpdateAndCheckTraceLimitCB updateAndCheckTraceLimitCB)
: dirPath_(std::move(path)),
fs_(filesystems::getFileSystem(dirPath_, nullptr)),
pool_(pool) {
pool_(pool),
updateAndCheckTraceLimitCB_(std::move(updateAndCheckTraceLimitCB)) {
dataFile_ = fs_->openFileForWrite(
fmt::format("{}/{}", dirPath_, QueryTraceTraits::kDataFileName));
VELOX_CHECK_NOT_NULL(dataFile_);
}

void QueryDataWriter::write(const RowVectorPtr& rows) {
if (FOLLY_UNLIKELY(finished_)) {
return;
}

if (batch_ == nullptr) {
batch_ = std::make_unique<VectorStreamGroup>(pool_);
batch_->createStreamTree(
Expand All @@ -51,24 +59,35 @@ void QueryDataWriter::write(const RowVectorPtr& rows) {
batch_->flush(&out);
batch_->clear();
auto iobuf = out.getIOBuf();
if (FOLLY_UNLIKELY(
updateAndCheckTraceLimitCB_(iobuf->computeChainDataLength()))) {
finish(true);
return;
}
dataFile_->append(std::move(iobuf));
}

void QueryDataWriter::finish() {
void QueryDataWriter::finish(bool limitExceeded) {
if (finished_) {
return;
}

VELOX_CHECK_NOT_NULL(
dataFile_, "The query data writer has already been finished");
dataFile_->close();
dataFile_.reset();
batch_.reset();
writeSummary();
writeSummary(limitExceeded);
finished_ = true;
}

void QueryDataWriter::writeSummary() const {
void QueryDataWriter::writeSummary(bool limitExceeded) const {
const auto summaryFilePath =
fmt::format("{}/{}", dirPath_, QueryTraceTraits::kDataSummaryFileName);
const auto file = fs_->openFileForWrite(summaryFilePath);
folly::dynamic obj = folly::dynamic::object;
obj[QueryTraceTraits::kDataTypeKey] = dataType_->serialize();
obj[QueryTraceTraits::kTraceLimitExceededKey] = limitExceeded;
file->append(folly::toJson(obj));
file->close();
}
Expand Down
Loading

0 comments on commit cc46d81

Please sign in to comment.