---
slug: velox-query-tracing
title: "Velox Query Tracing"
authors: [duanmeng, xiaoxmeng]
tags: [tech-blog, tracing]
---

## Background

The query trace tool helps analyze and debug query performance and correctness issues. It avoids
interference from external noise in a production environment (such as storage and network) by allowing
part of the query plan and dataset to be replayed in an isolated environment, such as a local machine.
This is much more efficient for query performance analysis and issue debugging, as it eliminates the need
to replay the whole query in a production environment.

## How Tracing Works

The tracing process consists of two distinct phases: the tracing phase and the replaying phase. The
tracing phase is executed within a production environment, while the replaying phase is conducted in
a local development environment.

**Tracing Phase**
1. The metadata required for replay, including the query plan fragment, query configuration,
and connector properties, is recorded during query task initiation.
2. Throughout query processing, each traced operator logs its input vectors or splits,
storing them in a designated storage location.
3. The metadata and splits are serialized in JSON format, and the operator data inputs are
serialized using a Presto serializer.

**Replaying Phase**
1. Read and deserialize the recorded query plan, extract the traced plan node, and assemble a plan
fragment with customized source and sink nodes.
2. The source node reads the inputs from the serialized operator data on storage, and the sink operator
prints or logs the execution stats.
3. Build a task with the plan fragment assembled in step 1, applying the recorded query configuration and
connector properties so that the task replays with the same input and configuration setup as in production.

**NOTE**: Presto serialization might lose the input vectors' encodings, such as lazy vectors and nested dictionary
encoding, which affects the operator's execution. Hence, replay behavior might not always match production exactly.

<figure>
    <img src="/img/tracing.png" height="100%" width="100%"/>
</figure>

## Tracing Framework

The tracing framework consists of three components:
1. **Query Trace Writers**: record the metadata, input data, and scan splits.
2. **Query Trace Readers**: load the metadata, input data, and scan splits.
3. **Query Trace Replayers**: display query summaries or replay the execution of the target operator.

### Trace Writers

There are three types of writers: `TaskTraceMetadataWriter`, `OperatorTraceInputWriter`,
and `OperatorTraceSplitWriter`. They are used in a production or shadow environment to record
the real execution data.

**TaskTraceMetadataWriter**

The `TaskTraceMetadataWriter` records the query metadata during task creation, serializes it,
and saves it into a file in JSON format. There are two types of metadata:
1. Query configurations and connector properties: these are user-specified per query and can
be serialized as JSON map objects (key-value pairs).
2. Task plan fragment (aka plan node tree): this can be serialized as a JSON object, a feature
already supported in Velox (see #4614, #4301, and #4398).

The metadata is saved as a single JSON object string in the metadata file. It looks similar
to the following simplified, pretty-printed JSON string (with some content removed for brevity):

```JSON
{
  "planNode":{
    "nullAware": false,
    "outputType":{...},
    "leftKeys":[...],
    "rightKeys":[...],
    "joinType":"INNER",
    "sources":[
      {
        "outputType":{...},
        "tableHandle":{...},
        "assignments":[...],
        "id":"0",
        "name":"TableScanNode"
      },
      {
        "outputType":{...},
        "tableHandle":{...},
        "assignments":[...],
        "id":"1",
        "name":"TableScanNode"
      }
    ],
    "id":"2",
    "name":"HashJoinNode"
  },
  "connectorProperties":{...},
  "queryConfig":{"query_trace_node_ids":"2", ...}
}
```

**OperatorTraceInputWriter**

The `OperatorTraceInputWriter` records the input vectors of the target operator. It uses a Presto
serializer to serialize each vector batch and flushes immediately to ensure that replay is possible
even if a crash occurs during execution.

It is created during the target operator's initialization and writes data in the `Operator::addInput`
method during execution. It finishes when the target operator is closed. However, it can finish early
if the recorded data size exceeds the limit specified by the user.

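As a rough illustration of this write-and-flush behavior, the sketch below uses Velox's `VectorStreamGroup` with a Presto serde. Exact constructor and method signatures vary across Velox versions, so treat this as a sketch of the idea rather than the actual writer code.

```cpp
#include "velox/vector/VectorStream.h"

using namespace facebook::velox;

// Sketch: serialize one input batch and flush it right away so the traced
// data survives even if the process crashes mid-query. 'streamGroup' is
// assumed to have been created with a Presto serde, and 'out' is an
// OutputStream backed by the trace data file.
void traceInput(
    VectorStreamGroup& streamGroup,
    const RowVectorPtr& input,
    OutputStream* out) {
  streamGroup.append(input); // serialize the batch
  streamGroup.flush(out);    // flush immediately to storage
}
```
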
**OperatorTraceSplitWriter**

The `OperatorTraceSplitWriter` captures the input splits from the target `TableScan` operator. It
serializes each split and immediately flushes it to ensure that replay is possible even if a crash
occurs during execution.

Each split is serialized as follows:

`| length : uint32_t | split : JSON string | crc32 : uint32_t |`

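To make this framing concrete, here is a minimal, self-contained C++ sketch of the layout. The CRC-32 implementation below is a generic one included for illustration; Velox's own checksum utilities may differ.

```cpp
#include <cstdint>
#include <cstring>
#include <string>
#include <vector>

// Generic CRC-32 (reflected, polynomial 0xEDB88320), for illustration only.
uint32_t crc32(const char* data, size_t size) {
  uint32_t crc = 0xFFFFFFFFu;
  for (size_t i = 0; i < size; ++i) {
    crc ^= static_cast<unsigned char>(data[i]);
    for (int k = 0; k < 8; ++k) {
      crc = (crc >> 1) ^ (0xEDB88320u & (0u - (crc & 1u)));
    }
  }
  return ~crc;
}

// Frame one split as | length : uint32_t | split : JSON | crc32 : uint32_t |.
std::vector<char> frameSplit(const std::string& splitJson) {
  const uint32_t length = static_cast<uint32_t>(splitJson.size());
  const uint32_t checksum = crc32(splitJson.data(), splitJson.size());
  std::vector<char> buf(sizeof(length) + splitJson.size() + sizeof(checksum));
  char* out = buf.data();
  std::memcpy(out, &length, sizeof(length));
  std::memcpy(out + sizeof(length), splitJson.data(), splitJson.size());
  std::memcpy(out + sizeof(length) + splitJson.size(), &checksum, sizeof(checksum));
  return buf;
}
```
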
### Storage Location

It is recommended to store traced data in a remote storage system to ensure its preservation and
accessibility even if the computation clusters are reconfigured or encounter issues. This also
helps prevent nodes in the cluster from failing due to local disk exhaustion.

Users should start by creating a root directory. Writers will then create subdirectories within
this root directory to organize the traced data. A well-designed directory structure will keep
the data organized and accessible for replay and analysis.

**Metadata Location**

The `TaskTraceMetadataWriter` is set up during task creation, so it creates a trace directory
named `$rootDir/$queryId/$taskId`.

**Input Data and Split Location**

The task generates drivers and operators, each identified by a set of IDs. Each driver
is assigned a pipeline ID and a driver ID. Pipeline IDs are sequential numbers starting from zero,
and driver IDs are also sequential numbers starting from zero but are scoped to a specific pipeline,
ensuring uniqueness within that pipeline. Additionally, each operator within a driver is assigned a
sequential operator ID, starting from zero and unique within the driver.

The node ID groups the tracing data for the same traced plan node. The pipeline ID isolates the
tracing data between operators created from the same plan node (e.g., HashProbe and HashBuild from
the HashJoinNode). The driver ID isolates the tracing data of peer operators in the same pipeline
running in different drivers.

Correspondingly, to keep the tracing data storage organized and isolated, the `OperatorTraceInputWriter`
and `OperatorTraceSplitWriter` are set up during operator initialization and create a data or split
tracing directory at `$rootDir/$queryId/$taskId/$nodeId/$pipelineId/$driverId`.

The following is a typical traced metadata and data storage directory structure for a `HashJoinNode`:

```SHELL
trace ----------------------------------------------> rootDir
└── query-1 --------------------------------------> query ID
    └── task-1 -----------------------------------> task ID
        ├── 2 ------------------------------------> node ID
        │   ├── 0 ------------------> pipeline ID (probe)
        │   │   ├── 0 -----------------> driver ID (0)
        │   │   │   ├── op_input_trace.data
        │   │   │   └── op_trace_summary.json
        │   │   └── 1 -----------------> driver ID (1)
        │   │       ├── op_input_trace.data
        │   │       └── op_trace_summary.json
        │   └── 1 ------------------> pipeline ID (build)
        │       ├── 0 -----------------> driver ID (0)
        │       │   ├── op_input_trace.data
        │       │   └── op_trace_summary.json
        │       └── 1 -----------------> driver ID (1)
        │           ├── op_input_trace.data
        │           └── op_trace_summary.json
        └── task_trace_meta.json -----------------> query metadata
```

### Memory Management

A new leaf system memory pool named `tracePool` is added to track tracing memory usage. It is exposed
via `memory::MemoryManager::getInstance()->tracePool()`.

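A minimal usage sketch, assuming the standard `MemoryPool` allocate/free interface; this is illustrative rather than actual tracing code:

```cpp
#include "velox/common/memory/Memory.h"

using namespace facebook::velox;

// Sketch: charge a tracing buffer to the dedicated trace pool so that
// tracing memory usage is accounted for separately from query memory.
void withTraceBuffer(size_t bytes) {
  memory::MemoryPool* pool = memory::MemoryManager::getInstance()->tracePool();
  void* buffer = pool->allocate(bytes);
  // ... serialize trace data into 'buffer' ...
  pool->free(buffer, bytes);
}
```
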
### Trace Readers

Three types of readers correspond to the query trace writers: `TaskTraceMetadataReader`,
`OperatorTraceInputReader`, and `OperatorTraceSplitReader`. The replayers typically use
them in the local environment, as described in the **Query Trace Replayer** section.

**TaskTraceMetadataReader**

The `TaskTraceMetadataReader` loads the query metadata JSON file and extracts the query
configurations, connector properties, and plan fragment. The replayer uses these to build
a replay task.

**OperatorTraceInputReader**

The `OperatorTraceInputReader` reads and deserializes the input vectors from a tracing data file.
It is created and used by an `OperatorTraceScan` operator, which is described in detail in
the **Trace Scan** section.

**OperatorTraceSplitReader**

The `OperatorTraceSplitReader` reads and deserializes the input splits from tracing split info files,
and produces a list of `exec::Split` for the query replay.

### Trace Scan

As outlined in the **How Tracing Works** section, replaying a non-leaf operator requires a
specialized source operator. This operator reads the data recorded during the tracing phase and
integrates with Velox's `LocalPlanner` through a customized plan node and operator translator.

**TraceScanNode**

We introduce a customized `TraceScanNode` to replay a non-leaf operator. This node acts as
the source node and creates a specialized scan operator, `OperatorTraceScan`, one per driver
during the replay. The `TraceScanNode` contains the trace directory of the designated trace
node, its associated pipeline ID, and a driver ID list passed by the user during the replay,
so that each `OperatorTraceScan` can locate the right trace input data or split directory.

**OperatorTraceScan**

As described in the **Storage Location** section, a plan node may be split into multiple pipelines,
and each pipeline can be divided into multiple operators. Each operator corresponds to a driver, which
is a thread of execution, so there may be multiple tracing data files for a single plan node, one file
per driver.

An `OperatorTraceScan` operator identifies its input data files using the trace node directory,
pipeline ID, and driver ID list supplied by the `TraceScanNode`.

During the replay process, it uses its own driver ID as an index to extract the replay driver ID from
the driver ID list in the `TraceScanNode`. Combined with the trace node directory and pipeline ID from
the `TraceScanNode`, this locates its corresponding input data file.

Correspondingly, an `OperatorTraceScan` operator uses the trace data file in
`$rootDir/$queryId/$taskId/$nodeId/$pipelineId/$driverId` to create an
`OperatorTraceInputReader`, and its `OperatorTraceScan::getOutput` method returns the vectors read by
that reader in the same order as they were originally processed in the production execution. This
ensures that the replay maintains the same data flow as the original production execution.

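In code form, the lookup reduces to indexing the driver ID list with the operator's own driver ID and composing the path. Here is a minimal sketch under the directory convention above; the function and parameter names are illustrative, not the actual Velox implementation:

```cpp
#include <cstdint>
#include <string>
#include <vector>

// Sketch: resolve the trace input directory for one OperatorTraceScan.
// 'nodeTraceDir' is $rootDir/$queryId/$taskId/$nodeId from the TraceScanNode.
std::string traceInputDir(
    const std::string& nodeTraceDir,
    uint32_t pipelineId,
    const std::vector<uint32_t>& replayDriverIds, // supplied by the user
    uint32_t myDriverId) {                        // this operator's driver ID
  // Use this operator's own driver ID as an index into the replay driver list.
  const uint32_t tracedDriverId = replayDriverIds.at(myDriverId);
  return nodeTraceDir + "/" + std::to_string(pipelineId) + "/" +
      std::to_string(tracedDriverId);
}
```
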
### Query Trace Replayer

The query trace replayer is typically used in the local environment and works as follows (a condensed
code sketch follows the list):
1. Use `TaskTraceMetadataReader` to load the traced query configurations, connector properties,
and plan fragment.
2. Extract the target plan node from the plan fragment using the specified plan node ID.
3. Use the target plan node from step 2 to create a replay plan node, and create a replay plan
using `exec::test::PlanBuilder`.
4. If the target plan node is a `TableScanNode`:
   - Add the replay plan node to the replay plan as the source node.
   - Get all the traced splits using `OperatorTraceSplitReader`.
   - Use the splits as inputs for the task replay.
5. For a non-leaf operator, add a `TraceScanNode` as the source node to the replay plan and
then add the replay plan node.
6. Use `exec::test::AssertQueryBuilder` to add the sink node, apply the query
configurations (with tracing disabled) and connector properties, and execute the replay plan.

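The sketch below condenses steps 3 through 6 for the non-leaf case. `PlanBuilder::addNode` and `AssertQueryBuilder` are Velox test utilities, though exact signatures may differ across versions; the helpers `makeTraceScanNode` and `recreateTracedNode` are hypothetical placeholders standing in for the replayer's internals.

```cpp
#include "velox/exec/tests/utils/AssertQueryBuilder.h"
#include "velox/exec/tests/utils/PlanBuilder.h"

using namespace facebook::velox;

// Hypothetical helpers, declared for illustration only.
core::PlanNodePtr makeTraceScanNode(
    const std::string& id,
    const std::string& traceNodeDir);
core::PlanNodePtr recreateTracedNode(
    const std::string& id,
    const core::PlanNodePtr& tracedNode,
    const core::PlanNodePtr& source);

RowVectorPtr replay(
    const core::PlanNodePtr& tracedNode, // extracted via node ID (step 2)
    const std::string& traceNodeDir,
    memory::MemoryPool* pool) {
  // Steps 3 and 5: a trace scan source feeding the re-created traced node.
  auto plan = exec::test::PlanBuilder()
                  .addNode([&](std::string id, core::PlanNodePtr /*source*/) {
                    return makeTraceScanNode(id, traceNodeDir);
                  })
                  .addNode([&](std::string id, core::PlanNodePtr source) {
                    return recreateTracedNode(id, tracedNode, source);
                  })
                  .planNode();
  // Step 6: execute with the recorded configuration, tracing disabled.
  return exec::test::AssertQueryBuilder(plan)
      .config("query_trace_enabled", "false")
      .copyResults(pool);
}
```
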
The `OperatorReplayBase` provides the core functionality required for replaying an operator.
It handles the retrieval of metadata, creation of the replay plan, and execution of the plan.
Concrete operator replayers, such as `HashJoinReplayer` and `AggregationReplayer`, extend this
base class and override the `createPlanNode` method to create the specific plan node.

## Query Trace Tool Usage

### How To Enable Tracing

Here is the list of query configs used to enable tracing in the first (tracing) phase:

|Property Name|Default Value|Description|
|:-|:-:|:-|
|query_trace_enabled|false|Enables the query tracing flag.|
|query_trace_dir|""|The root directory where a query stores its tracing data.|
|query_trace_task_reg_exp|""|A regexp matched against task IDs; tracing is enabled only on tasks whose ID matches.|
|query_trace_node_ids|""|A comma-separated list of plan node IDs whose input data will be traced. Leave empty to trace only the query metadata.|
|query_trace_max_bytes|0|The max trace bytes limit. Tracing is disabled if zero.|

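For example, a session that traces plan node 2 on every task of a query might set the values below; the keys are the properties listed above, and the values are illustrative:

```cpp
#include <string>
#include <unordered_map>

// Illustrative tracing configuration: trace plan node 2 on all tasks,
// with a 100 GB trace size budget.
const std::unordered_map<std::string, std::string> kTraceConfig = {
    {"query_trace_enabled", "true"},
    {"query_trace_dir", "/trace_root"},
    {"query_trace_task_reg_exp", ".*"},
    {"query_trace_node_ids", "2"},
    {"query_trace_max_bytes", "107374182400"},
};
```
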
### How To Replay

After the traced query finishes, its metadata and the input data for the target tasks and operators
are all saved in the directory specified by `query_trace_dir`.

To get a quick overview of the traced task, we can execute the following command:

```SHELL
velox_query_replayer --root_dir /trace_root --task_id task-4 --summary
```

It shows output like the following:

```SHELL
++++++Query trace summary++++++
Number of tasks: 1

++++++Query configs++++++
query_trace_task_reg_exp: .*
query_trace_node_ids: 2
query_trace_max_bytes: 107374182400
query_trace_dir: /tmp/velox_test_aJqeFd/basic/traceRoot/
query_trace_enabled: 1

++++++Connector configs++++++

++++++Task query plan++++++
-- HashJoin[2][INNER t0=u0] -> t0:BIGINT, t1:VARCHAR, t2:SMALLINT, t3:REAL, u0:BIGINT, u1:INTEGER, u2:SMALLINT, u3:VARCHAR
  -- TableScan[0][table: hive_table] -> t0:BIGINT, t1:VARCHAR, t2:SMALLINT, t3:REAL
  -- TableScan[1][table: hive_table] -> u0:BIGINT, u1:INTEGER, u2:SMALLINT, u3:VARCHAR

++++++Task Summaries++++++

++++++Task task-1++++++

++++++Pipeline 2++++++
driver 0: opType HashProbe, inputRows 70720, peakMemory 108.00KB
driver 1: opType HashProbe, inputRows 70720, peakMemory 108.00KB

++++++Pipeline 3++++++
driver 0: opType HashBuild, inputRows 48000, peakMemory 4.51MB
driver 1: opType HashBuild, inputRows 48000, peakMemory 2.25MB
```

Then we can execute an assembled query plan with the traced plan node using the following command in
the terminal, or use the same gflags in an IDE to debug:

```SHELL
velox_query_replayer --root_dir /Users/bytedance/work/native/trace --query_id query-1 --task_id task-1 --node_id 2
```

```SHELL
Stats of replaying operator HashBuild : Output: 0 rows (0B, 0 batches), Cpu time: 48.63us, Wall time: 65.22us, Blocked wall time: 24.08ms, Peak memory: 4.51MB, Memory allocations: 16, Threads: 2, CPU breakdown: B/I/O/F (23.79us/0ns/14.46us/10.38us)

Stats of replaying operator HashProbe : Output: 13578240 rows (1.17GB, 13280 batches), Cpu time: 3.99s, Wall time: 4.01s, Blocked wall time: 98.58ms, Peak memory: 108.00KB, Memory allocations: 12534, Threads: 2, CPU breakdown: B/I/O/F (8.52ms/1.59s/2.39s/20.29us)

Memory usage: TaskCursorQuery_0 usage 0B reserved 0B peak 10.00MB
    task.test_cursor 1 usage 0B reserved 0B peak 10.00MB
        node.2 usage 0B reserved 0B peak 0B
            op.2.1.1.OperatorTraceScan usage 0B reserved 0B peak 0B
            op.2.1.0.OperatorTraceScan usage 0B reserved 0B peak 0B
        node.N/A usage 0B reserved 0B peak 0B
            op.N/A.0.1.CallbackSink usage 0B reserved 0B peak 0B
            op.N/A.0.0.CallbackSink usage 0B reserved 0B peak 0B
        node.1 usage 0B reserved 0B peak 10.00MB
            op.1.1.0.HashBuild usage 0B reserved 0B peak 4.51MB
            op.1.0.1.HashProbe usage 0B reserved 0B peak 108.00KB
            op.1.1.1.HashBuild usage 0B reserved 0B peak 2.25MB
            op.1.0.0.HashProbe usage 0B reserved 0B peak 108.00KB
        node.0 usage 0B reserved 0B peak 0B
            op.0.0.1.OperatorTraceScan usage 0B reserved 0B peak 0B
            op.0.0.0.OperatorTraceScan usage 0B reserved 0B peak 0B
```

We can replay a subset of drivers from the traced operator. For instance, if the traced operator
had 16 drivers during the tracing phase, we can selectively replay specific drivers by passing
`--driver_ids`, for example, `--driver_ids 1,3,5`.

Here is the full list of supported command line arguments:

|Flag|Default Value|Description|
|:-|:-:|:-|
|root_dir|""|The root directory from which the replayer reads the traced data; must be set.|
|summary|false|Shows a summary of the tracing, including the number of tasks and the task IDs. It also prints the query metadata, including query configs, connector properties, and the query plan in JSON format.|
|query_id|""|Specifies the target query ID; must be set.|
|task_id|""|Specifies the target task ID; must be set.|
|node_id|""|Specifies the target node ID; must be set.|
|driver_ids|""|Specifies the target driver IDs to replay.|
|shuffle_serialization_format|0|Specifies the shuffle serialization format.|
|table_writer_output_dir|""|Specifies the output directory of TableWriter.|
|hiveConnectorExecutorHwMultiplier|2.0|Hardware multiplier for the hive connector.|

## Future Work

- Add support for more operators
- Customize the replay task execution instead of using `AssertQueryBuilder`
- Support `IcebergHiveConnector`
- Add trace replay for an entire pipeline