diff --git a/velox/CMakeLists.txt b/velox/CMakeLists.txt index 064917ef60ed..00c969ccce7b 100644 --- a/velox/CMakeLists.txt +++ b/velox/CMakeLists.txt @@ -72,3 +72,7 @@ endif() if(${VELOX_ENABLE_SUBSTRAIT}) add_subdirectory(substrait) endif() + +if(${VELOX_BUILD_TESTING}) + add_subdirectory(tool) +endif() diff --git a/velox/core/QueryConfig.h b/velox/core/QueryConfig.h index 304e1fd370bb..ee71b1e86bc7 100644 --- a/velox/core/QueryConfig.h +++ b/velox/core/QueryConfig.h @@ -357,6 +357,14 @@ class QueryConfig { /// Empty string if only want to trace the query metadata. static constexpr const char* kQueryTraceNodeIds = "query_trace_node_ids"; + /// The max trace bytes limit. Tracing is disabled if zero. + static constexpr const char* kQueryTraceMaxBytes = "query_trace_max_bytes"; + + /// The regexp of traced task id. We only enable trace on a task if its id + /// matches. + static constexpr const char* kQueryTraceTaskRegExp = + "query_trace_task_reg_exp"; + /// Disable optimization in expression evaluation to peel common dictionary /// layer from inputs. static constexpr const char* kDebugDisableExpressionWithPeeling = @@ -689,6 +697,15 @@ class QueryConfig { return get(kQueryTraceNodeIds, ""); } + uint64_t queryTraceMaxBytes() const { + return get(kQueryTraceMaxBytes, 0); + } + + std::string queryTraceTaskRegExp() const { + // The default query trace task regexp, empty by default. 
+ return get(kQueryTraceTaskRegExp, ""); + } + bool prestoArrayAggIgnoreNulls() const { return get(kPrestoArrayAggIgnoreNulls, false); } diff --git a/velox/core/QueryCtx.cpp b/velox/core/QueryCtx.cpp index e0392252ee74..ee238c081800 100644 --- a/velox/core/QueryCtx.cpp +++ b/velox/core/QueryCtx.cpp @@ -87,6 +87,18 @@ void QueryCtx::updateSpilledBytesAndCheckLimit(uint64_t bytes) { } } +bool QueryCtx::updateTracedBytesAndCheckLimit(uint64_t bytes) { + if (numTracedBytes_.fetch_add(bytes) + bytes < + queryConfig_.queryTraceMaxBytes()) { + return false; + } + + numTracedBytes_.fetch_sub(bytes); + LOG(WARNING) << "Query exceeded trace limit of " + << succinctBytes(queryConfig_.queryTraceMaxBytes()); + return true; +} + std::unique_ptr QueryCtx::MemoryReclaimer::create( QueryCtx* queryCtx, memory::MemoryPool* pool) { diff --git a/velox/core/QueryCtx.h b/velox/core/QueryCtx.h index 89ff229e3a48..c6cc96a3caf2 100644 --- a/velox/core/QueryCtx.h +++ b/velox/core/QueryCtx.h @@ -118,10 +118,14 @@ class QueryCtx : public std::enable_shared_from_this { /// the memory arbiration finishes. bool checkUnderArbitration(ContinueFuture* future); - /// Updates the aggregated spill bytes of this query, and and throws if - /// exceeds the max spill bytes limit. + /// Updates the aggregated spill bytes of this query, and throws if exceeds + /// the max spill bytes limit. void updateSpilledBytesAndCheckLimit(uint64_t bytes); + /// Updates the aggregated trace bytes of this query, and return true if + /// exceeds the max query trace bytes limit. + bool updateTracedBytesAndCheckLimit(uint64_t bytes); + void testingOverrideMemoryPool(std::shared_ptr pool) { pool_ = std::move(pool); } @@ -216,6 +220,7 @@ class QueryCtx : public std::enable_shared_from_this { std::shared_ptr pool_; QueryConfig queryConfig_; std::atomic numSpilledBytes_{0}; + std::atomic numTracedBytes_{0}; mutable std::mutex mutex_; // Indicates if this query is under memory arbitration or not. 
diff --git a/velox/docs/configs.rst b/velox/docs/configs.rst index bdcd28d054a8..799a6cfe1823 100644 --- a/velox/docs/configs.rst +++ b/velox/docs/configs.rst @@ -737,3 +737,11 @@ Tracing - - A comma-separated list of plan node ids whose input data will be trace. If it is empty, then we only trace the query metadata which includes the query plan and configs etc. + * - query_trace_task_reg_exp + - string + - + - The regexp of traced task id. We only enable trace on a task if its id matches. + * - query_trace_max_bytes + - integer + - 0 + - The max trace bytes limit. Tracing is disabled if zero. diff --git a/velox/docs/develop/debugging.rst b/velox/docs/develop/debugging.rst index 4b91e019b6b3..aa48a2ae6f48 100644 --- a/velox/docs/develop/debugging.rst +++ b/velox/docs/develop/debugging.rst @@ -9,3 +9,4 @@ Debugging Tools debugging/print-expr-with-stats debugging/vector-saver debugging/metrics + debugging/tracing diff --git a/velox/docs/develop/debugging/tracing.rst b/velox/docs/develop/debugging/tracing.rst new file mode 100644 index 000000000000..1750296503e1 --- /dev/null +++ b/velox/docs/develop/debugging/tracing.rst @@ -0,0 +1,144 @@ +======= +Tracing +======= + + +Background +---------- + +The query trace tool helps analyze and debug query performance and correctness +issues. It helps prevent interference from the noise in a production +environment (such as storage, network, etc.) by allowing replay of a part of the +query plan and data set in an isolated environment such as a local machine. +This is useful for debugging query performance issues as we don't have to +replay the whole query in a production environment. + +How Tracing Tool Works +---------------------- + +The tracing process consists of two phases: tracing and replaying. + +**Tracing Phase** + +- When the query starts, the task records the metadata including query plan fragment, + query configuration, and connector properties. 
+- During the query execution, each traced operator records the input vectors and saves + them in the specified storage location. +- The metadata are serialized using JSON format and operator data inputs are serialized + using `Presto serializer <https://prestodb.io/docs/current/develop/serialized-page.html>`_. + +**Replaying Phase** + +1. Read and deserialize the query plan recorded in the tracing phase, extract the target plan + node, and assemble a plan fragment with customized source and sink nodes. The source + node reads the input from the serialized operator inputs on storage and the sink operator + prints or logs out the execution stats. +2. Build a task with the assembled plan fragment in step 1. +3. Apply the recorded query configuration and connector properties to replay the task with + the same input and configuration as in production. + +**NOTE**: The Presto serialization doesn't always preserve vector encoding (lazy vectors are +loaded, nested dictionaries are flattened). Hence, replay may differ from the original run. + +.. image:: ../images/trace-arch.png + :width: 600 + :align: center + +Tracing Framework +----------------- + +The tracing framework consists of three components: + +1. **Metadata and Data Writer**: metadata writer and the data writer. +2. **Metadata and Data Reader**: metadata reader and the data reader. +3. **Query Trace Replayer**: display query summaries or replay the + execution of the target operator. + +Query Trace Writer +^^^^^^^^^^^^^^^^^^ + +**QueryMetadataWriter** records the query metadata during task creation, +serializes, and writes them into a file in JSON format. There are two kinds +of metadata: + +- Query configurations and connector properties are specified by the user per query. + They can be serialized as JSON map objects (key-value pairs). +- Plan fragment of the task (also known as a plan node tree). It can be serialized + as a JSON object, which is already supported in Velox. 
+ +**QueryDataWriter** records the input vectors from the target operator, which are +serialized and written as a binary file. + +Query Trace Reader +^^^^^^^^^^^^^^^^^^ + +**QueryMetadataReader** can load the query metadata JSON file, and extract the query +configurations, connector properties, and the plan fragment. + +**QueryDataReader** can read and deserialize the input vectors of the target operator. +It is used as the utility to replay the input data as a source operator in the target +operator replay. + +**NOTE**: `QueryDataWriter` serializes and flushes the input vectors in batches, +allowing the input to be replayed using the same sequence of batches. + +Query Trace Util +^^^^^^^^^^^^^^^^ + +- Create tracing directories. +- Get query summaries. +- Provide utilities to extract the target plan node, and assemble a plan fragment with + customized trace source and sink nodes. + +Query Trace Tool +---------------- + +The query trace tool leverages the trace reader to display query summaries and replay the +execution of specific operators (TBD). + +Tracing Tool Usage +^^^^^^^^^^^^^^^^^^^ + +.. code-block:: c++ + + query_replayer --root $root_dir --summary + + +It shows something like the following: + +.. code-block:: c++ + + ++++++Query trace summary++++++ + Number of tasks: 1 + Task ids: task-1 + ++++++Query configs and plan:++++++ + { + "planNode":{ + "filter":{}, + "outputType":{...}, + "nullAware":false, + "sources":[{...}, {...}], + "leftKeys":[], + "joinType":"INNER", + "id":"5", + "name":"HashJoinNode" + }, + "connectorProperties":{...}, + "queryConfig":{...} + } + + +Here is a full list of supported command line arguments. + +* ``--usage``: Show the usage. +* ``--root``: Root dir of the query tracing. +* ``--summary``: Show the summary of the tracing including number of tasks and task ids. + It also prints the query metadata including query configs, connector properties, and query plan in JSON format. 
+* ``--short_summary``: Only show number of tasks and task ids. +* ``--task_id``: Specify the target task id, if empty, show the summary of all the traced query tasks. + + +Future Work +----------- + +https://github.com/facebookincubator/velox/issues/9668 diff --git a/velox/docs/develop/images/trace-arch.png b/velox/docs/develop/images/trace-arch.png new file mode 100644 index 000000000000..3a82224b37af Binary files /dev/null and b/velox/docs/develop/images/trace-arch.png differ diff --git a/velox/exec/Task.cpp b/velox/exec/Task.cpp index 6c108dafd196..66083ceb5d3e 100644 --- a/velox/exec/Task.cpp +++ b/velox/exec/Task.cpp @@ -2848,6 +2848,14 @@ std::optional Task::maybeMakeTraceConfig() const { !queryConfig.queryTraceDir().empty(), "Query trace enabled but the trace dir is not set"); + VELOX_USER_CHECK( + !queryConfig.queryTraceTaskRegExp().empty(), + "Query trace enabled but the trace task regexp is not set"); + + if (!RE2::FullMatch(taskId_, queryConfig.queryTraceTaskRegExp())) { + return std::nullopt; + } + const auto queryTraceNodes = queryConfig.queryTraceNodeIds(); if (queryTraceNodes.empty()) { return trace::QueryTraceConfig(queryConfig.queryTraceDir()); @@ -2858,8 +2866,16 @@ std::optional Task::maybeMakeTraceConfig() const { std::unordered_set nodeSet(nodes.begin(), nodes.end()); VELOX_CHECK_EQ(nodeSet.size(), nodes.size()); LOG(INFO) << "Query trace plan node ids: " << queryTraceNodes; + + trace::UpdateAndCheckTraceLimitCB updateAndCheckTraceLimitCB = + [this](uint64_t bytes) { + return queryCtx_->updateTracedBytesAndCheckLimit(bytes); + }; return trace::QueryTraceConfig( - std::move(nodeSet), queryConfig.queryTraceDir()); + std::move(nodeSet), + queryConfig.queryTraceDir(), + std::move(updateAndCheckTraceLimitCB), + queryConfig.queryTraceTaskRegExp()); } void Task::maybeInitQueryTrace() { diff --git a/velox/exec/trace/CMakeLists.txt b/velox/exec/trace/CMakeLists.txt index f8cc53e08ae4..555c0e270dd1 100644 --- a/velox/exec/trace/CMakeLists.txt +++ 
b/velox/exec/trace/CMakeLists.txt @@ -31,7 +31,7 @@ velox_link_libraries( velox_presto_serializer) velox_add_library(velox_query_trace_retrieve QueryDataReader.cpp - QueryMetadataReader.cpp) + QueryMetadataReader.cpp QueryTraceUtil.cpp) velox_link_libraries( velox_query_trace_retrieve diff --git a/velox/exec/trace/QueryDataWriter.cpp b/velox/exec/trace/QueryDataWriter.cpp index 57dc4284cd10..7db47fb31795 100644 --- a/velox/exec/trace/QueryDataWriter.cpp +++ b/velox/exec/trace/QueryDataWriter.cpp @@ -15,6 +15,8 @@ */ #include "velox/exec/trace/QueryDataWriter.h" + +#include #include "velox/common/base/SpillStats.h" #include "velox/common/file/File.h" #include "velox/common/file/FileSystems.h" @@ -24,17 +26,23 @@ namespace facebook::velox::exec::trace { QueryDataWriter::QueryDataWriter( - const std::string& path, - memory::MemoryPool* pool) - : dirPath_(path), + std::string path, + memory::MemoryPool* pool, + UpdateAndCheckTraceLimitCB updateAndCheckTraceLimitCB) + : dirPath_(std::move(path)), fs_(filesystems::getFileSystem(dirPath_, nullptr)), - pool_(pool) { + pool_(pool), + updateAndCheckTraceLimitCB_(std::move(updateAndCheckTraceLimitCB)) { dataFile_ = fs_->openFileForWrite( fmt::format("{}/{}", dirPath_, QueryTraceTraits::kDataFileName)); VELOX_CHECK_NOT_NULL(dataFile_); } void QueryDataWriter::write(const RowVectorPtr& rows) { + if (FOLLY_UNLIKELY(finished_)) { + return; + } + if (batch_ == nullptr) { batch_ = std::make_unique(pool_); batch_->createStreamTree( @@ -51,24 +59,35 @@ void QueryDataWriter::write(const RowVectorPtr& rows) { batch_->flush(&out); batch_->clear(); auto iobuf = out.getIOBuf(); + if (FOLLY_UNLIKELY( + updateAndCheckTraceLimitCB_(iobuf->computeChainDataLength()))) { + finish(true); + return; + } dataFile_->append(std::move(iobuf)); } -void QueryDataWriter::finish() { +void QueryDataWriter::finish(bool limitExceeded) { + if (finished_) { + return; + } + VELOX_CHECK_NOT_NULL( dataFile_, "The query data writer has already been finished"); 
dataFile_->close(); dataFile_.reset(); batch_.reset(); - writeSummary(); + writeSummary(limitExceeded); + finished_ = true; } -void QueryDataWriter::writeSummary() const { +void QueryDataWriter::writeSummary(bool limitExceeded) const { const auto summaryFilePath = fmt::format("{}/{}", dirPath_, QueryTraceTraits::kDataSummaryFileName); const auto file = fs_->openFileForWrite(summaryFilePath); folly::dynamic obj = folly::dynamic::object; obj[QueryTraceTraits::kDataTypeKey] = dataType_->serialize(); + obj[QueryTraceTraits::kTraceLimitExceededKey] = limitExceeded; file->append(folly::toJson(obj)); file->close(); } diff --git a/velox/exec/trace/QueryDataWriter.h b/velox/exec/trace/QueryDataWriter.h index 435a0bccc00f..d0e6018f5837 100644 --- a/velox/exec/trace/QueryDataWriter.h +++ b/velox/exec/trace/QueryDataWriter.h @@ -16,6 +16,7 @@ #pragma once +#include "QueryTraceConfig.h" #include "velox/common/file/File.h" #include "velox/common/file/FileSystems.h" #include "velox/exec/trace/QueryTraceTraits.h" @@ -28,21 +29,25 @@ namespace facebook::velox::exec::trace { /// file. class QueryDataWriter { public: - explicit QueryDataWriter(const std::string& path, memory::MemoryPool* pool); + explicit QueryDataWriter( + std::string path, + memory::MemoryPool* pool, + UpdateAndCheckTraceLimitCB updateAndCheckTraceLimitCB); /// Serializes rows and writes out each batch. void write(const RowVectorPtr& rows); /// Closes the data file and writes out the data summary. /// - /// NOTE: This method should be only called once. - void finish(); + /// @param limitExceeded A flag indicates the written data bytes exceed the + /// limit causing the 'QueryDataWriter' to finish early. + void finish(bool limitExceeded = false); private: // Flushes the trace data summaries to the disk. // // TODO: add more summaries such as number of rows etc. 
- void writeSummary() const; + void writeSummary(bool limitExceeded = false) const; const std::string dirPath_; // TODO: make 'useLosslessTimestamp' configuerable. @@ -52,9 +57,11 @@ class QueryDataWriter { /*nullsFirst=*/true}; const std::shared_ptr fs_; memory::MemoryPool* const pool_; + const UpdateAndCheckTraceLimitCB updateAndCheckTraceLimitCB_; std::unique_ptr dataFile_; TypePtr dataType_; std::unique_ptr batch_; + bool finished_{false}; }; } // namespace facebook::velox::exec::trace diff --git a/velox/exec/trace/QueryMetadataReader.cpp b/velox/exec/trace/QueryMetadataReader.cpp index 99fe7cc8c435..0eb6270acacf 100644 --- a/velox/exec/trace/QueryMetadataReader.cpp +++ b/velox/exec/trace/QueryMetadataReader.cpp @@ -20,6 +20,7 @@ #include "velox/common/file/FileSystems.h" #include "velox/core/PlanNode.h" #include "velox/exec/trace/QueryTraceTraits.h" +#include "velox/exec/trace/QueryTraceUtil.h" namespace facebook::velox::exec::trace { @@ -43,19 +44,14 @@ void QueryMetadataReader::read( std::string, std::unordered_map>& connectorProperties, core::PlanNodePtr& queryPlan) const { - const auto file = fs_->openFileForRead(metaFilePath_); - VELOX_CHECK_NOT_NULL(file); - const auto metadata = file->pread(0, file->size()); - VELOX_USER_CHECK(!metadata.empty()); - folly::dynamic obj = folly::parseJson(metadata); - - const auto& queryConfigObj = obj[QueryTraceTraits::kQueryConfigKey]; + folly::dynamic metaObj = getMetadata(metaFilePath_, fs_); + const auto& queryConfigObj = metaObj[QueryTraceTraits::kQueryConfigKey]; for (const auto& [key, value] : queryConfigObj.items()) { queryConfigs[key.asString()] = value.asString(); } const auto& connectorPropertiesObj = - obj[QueryTraceTraits::kConnectorPropertiesKey]; + metaObj[QueryTraceTraits::kConnectorPropertiesKey]; for (const auto& [connectorId, configs] : connectorPropertiesObj.items()) { const auto connectorIdStr = connectorId.asString(); connectorProperties[connectorIdStr] = {}; @@ -65,6 +61,6 @@ void 
QueryMetadataReader::read( } queryPlan = ISerializable::deserialize( - obj[QueryTraceTraits::kPlanNodeKey], pool_); + metaObj[QueryTraceTraits::kPlanNodeKey], pool_); } } // namespace facebook::velox::exec::trace diff --git a/velox/exec/trace/QueryTraceConfig.cpp b/velox/exec/trace/QueryTraceConfig.cpp index 233226ebccf5..d177bcdf3470 100644 --- a/velox/exec/trace/QueryTraceConfig.cpp +++ b/velox/exec/trace/QueryTraceConfig.cpp @@ -16,17 +16,25 @@ #include "velox/exec/trace/QueryTraceConfig.h" +#include + namespace facebook::velox::exec::trace { QueryTraceConfig::QueryTraceConfig( std::unordered_set _queryNodeIds, - std::string _queryTraceDir) + std::string _queryTraceDir, + UpdateAndCheckTraceLimitCB _updateAndCheckTraceLimitCB, + std::string _taskRegExp) : queryNodes(std::move(_queryNodeIds)), - queryTraceDir(std::move(_queryTraceDir)) {} + queryTraceDir(std::move(_queryTraceDir)), + updateAndCheckTraceLimitCB(std::move(_updateAndCheckTraceLimitCB)), + taskRegExp(std::move(_taskRegExp)) {} QueryTraceConfig::QueryTraceConfig(std::string _queryTraceDir) : QueryTraceConfig( std::unordered_set{}, - std::move(_queryTraceDir)) {} + std::move(_queryTraceDir), + [](uint64_t) { return false; }, + ".*") {} } // namespace facebook::velox::exec::trace diff --git a/velox/exec/trace/QueryTraceConfig.h b/velox/exec/trace/QueryTraceConfig.h index 0e9c22818c34..200e5cb52ce4 100644 --- a/velox/exec/trace/QueryTraceConfig.h +++ b/velox/exec/trace/QueryTraceConfig.h @@ -16,19 +16,31 @@ #pragma once +#include #include #include namespace facebook::velox::exec::trace { + +/// The callback used to update and aggregate the trace bytes of a query. If the +/// query trace limit is set, the callback return true if the aggregate traced +/// bytes exceed the set limit otherwise return false. +using UpdateAndCheckTraceLimitCB = std::function; + struct QueryTraceConfig { /// Target query trace nodes. std::unordered_set queryNodes; /// Base dir of query trace. 
std::string queryTraceDir; + UpdateAndCheckTraceLimitCB updateAndCheckTraceLimitCB; + /// The trace task regexp. + std::string taskRegExp; QueryTraceConfig( std::unordered_set _queryNodeIds, - std::string _queryTraceDir); + std::string _queryTraceDir, + UpdateAndCheckTraceLimitCB _updateAndCheckTraceLimitCB, + std::string _taskRegExp); QueryTraceConfig(std::string _queryTraceDir); diff --git a/velox/exec/trace/QueryTraceTraits.h b/velox/exec/trace/QueryTraceTraits.h index 5e3a91f783d3..ad817115b490 100644 --- a/velox/exec/trace/QueryTraceTraits.h +++ b/velox/exec/trace/QueryTraceTraits.h @@ -26,6 +26,7 @@ struct QueryTraceTraits { static inline const std::string kDataTypeKey = "rowType"; static inline const std::string kConnectorPropertiesKey = "connectorProperties"; + static inline const std::string kTraceLimitExceededKey = "traceLimitExceeded"; static inline const std::string kQueryMetaFileName = "query_meta.json"; static inline const std::string kDataSummaryFileName = "data_summary.json"; diff --git a/velox/exec/trace/QueryTraceUtil.cpp b/velox/exec/trace/QueryTraceUtil.cpp index 437d3eee224e..a0ed04fe03c9 100644 --- a/velox/exec/trace/QueryTraceUtil.cpp +++ b/velox/exec/trace/QueryTraceUtil.cpp @@ -15,7 +15,11 @@ */ #include "velox/exec/trace/QueryTraceUtil.h" + +#include + #include "velox/common/base/Exceptions.h" +#include "velox/common/file/File.h" #include "velox/common/file/FileSystems.h" namespace facebook::velox::exec::trace { @@ -35,4 +39,40 @@ void createTraceDirectory(const std::string& traceDir) { } } +std::vector getTaskIds( + const std::string& traceDir, + const std::shared_ptr& fs) { + VELOX_USER_CHECK(fs->exists(traceDir), "{} does not exist", traceDir); + try { + const auto taskDirs = fs->list(traceDir); + std::vector taskIds; + for (const auto& taskDir : taskDirs) { + std::vector pathNodes; + folly::split("/", taskDir, pathNodes); + taskIds.emplace_back(std::move(pathNodes.back())); + } + return taskIds; + } catch (const std::exception& e) { + 
VELOX_FAIL( + "Failed to list the directory '{}' with error: {}", traceDir, e.what()); + } +} + +folly::dynamic getMetadata( + const std::string& metadataFile, + const std::shared_ptr& fs) { + try { + const auto file = fs->openFileForRead(metadataFile); + VELOX_CHECK_NOT_NULL(file); + const auto metadata = file->pread(0, file->size()); + VELOX_USER_CHECK(!metadata.empty()); + return folly::parseJson(metadata); + } catch (const std::exception& e) { + VELOX_FAIL( + "Failed to get the query metadata from '{}' with error: {}", + metadataFile, + e.what()); + } +} + } // namespace facebook::velox::exec::trace diff --git a/velox/exec/trace/QueryTraceUtil.h b/velox/exec/trace/QueryTraceUtil.h index 826aa35283a2..e006b2fdb18f 100644 --- a/velox/exec/trace/QueryTraceUtil.h +++ b/velox/exec/trace/QueryTraceUtil.h @@ -17,10 +17,24 @@ #pragma once #include +#include +#include "velox/common/file/FileSystems.h" + +#include namespace facebook::velox::exec::trace { /// Creates a directory to store the query trace metdata and data. void createTraceDirectory(const std::string& traceDir); +/// Extracts task ids of the query tracing by listing the trace directory. +std::vector getTaskIds( + const std::string& traceDir, + const std::shared_ptr& fs); + +/// Gets the query metadata from the given metadata file, which includes the +/// query plan, configs and connector properties. 
+folly::dynamic getMetadata( + const std::string& metadataFile, + const std::shared_ptr& fs); } // namespace facebook::velox::exec::trace diff --git a/velox/exec/trace/test/QueryTraceTest.cpp b/velox/exec/trace/test/QueryTraceTest.cpp index 171c7cea138a..8e8a2a6405b8 100644 --- a/velox/exec/trace/test/QueryTraceTest.cpp +++ b/velox/exec/trace/test/QueryTraceTest.cpp @@ -32,7 +32,9 @@ #include "velox/serializers/PrestoSerializer.h" #include "velox/vector/tests/utils/VectorTestBase.h" -namespace facebook::velox::exec::test { +using namespace facebook::velox::exec::test; + +namespace facebook::velox::exec::trace::test { class QueryTracerTest : public HiveConnectorTestBase { protected: static void SetUpTestCase() { @@ -101,34 +103,75 @@ class QueryTracerTest : public HiveConnectorTestBase { }; TEST_F(QueryTracerTest, traceData) { - const auto rowType = generateTypes(5); + const auto rowType = ROW({"a", "b", "c"}, {BIGINT(), BIGINT(), BIGINT()}); std::vector inputVectors; - constexpr auto numBatch = 3; + constexpr auto numBatch = 5; inputVectors.reserve(numBatch); for (auto i = 0; i < numBatch; ++i) { - inputVectors.push_back(vectorFuzzer_.fuzzInputRow(rowType)); + inputVectors.push_back(vectorFuzzer_.fuzzInputFlatRow(rowType)); } - const auto outputDir = TempDirectoryPath::create(); - auto writer = trace::QueryDataWriter(outputDir->getPath(), pool()); - for (auto i = 0; i < numBatch; ++i) { - writer.write(inputVectors[i]); - } - writer.finish(); - - const auto reader = trace::QueryDataReader(outputDir->getPath(), pool()); - RowVectorPtr actual; - size_t numOutputVectors{0}; - while (reader.read(actual)) { - const auto expected = inputVectors[numOutputVectors]; - const auto size = actual->size(); - ASSERT_EQ(size, expected->size()); - for (auto i = 0; i < size; ++i) { - actual->compare(expected.get(), i, i, {.nullsFirst = true}); + struct { + uint64_t maxTracedBytes; + uint8_t numTracedBatches; + bool limitExceeded; + + std::string debugString() const { + return 
fmt::format( + "maxTracedBytes: {}, numTracedBatches: {}, limitExceeded {}", + maxTracedBytes, + numTracedBatches, + limitExceeded); + } + } testSettings[]{ + {0, 0, true}, {800, 2, true}, {100UL << 30, numBatch, false}}; + + for (const auto& testData : testSettings) { + SCOPED_TRACE(testData.debugString()); + const auto outputDir = TempDirectoryPath::create(); + // Stop tracing once the accumulated traced bytes reach 'maxTracedBytes'. + uint64_t numTracedBytes{0}; + auto writer = trace::QueryDataWriter( + outputDir->getPath(), pool(), [&](uint64_t bytes) { + numTracedBytes += bytes; + return numTracedBytes >= testData.maxTracedBytes; + }); + for (auto i = 0; i < numBatch; ++i) { + writer.write(inputVectors[i]); } - ++numOutputVectors; + writer.finish(); + + const auto fs = filesystems::getFileSystem(outputDir->getPath(), nullptr); + const auto summaryFile = fs->openFileForRead(fmt::format( + "{}/{}", outputDir->getPath(), QueryTraceTraits::kDataSummaryFileName)); + const auto summary = summaryFile->pread(0, summaryFile->size()); + ASSERT_FALSE(summary.empty()); + folly::dynamic obj = folly::parseJson(summary); + ASSERT_EQ( + obj[QueryTraceTraits::kTraceLimitExceededKey].asBool(), + testData.limitExceeded); + + if (testData.maxTracedBytes == 0) { + const auto dataFile = fs->openFileForRead(fmt::format( + "{}/{}", outputDir->getPath(), QueryTraceTraits::kDataFileName)); + ASSERT_EQ(dataFile->size(), 0); + continue; + } + + const auto reader = QueryDataReader(outputDir->getPath(), pool()); + RowVectorPtr actual; + size_t numOutputVectors{0}; + while (reader.read(actual)) { + const auto expected = inputVectors[numOutputVectors]; + const auto size = actual->size(); + ASSERT_EQ(size, expected->size()); + for (auto i = 0; i < size; ++i) { + ASSERT_EQ(actual->compare(expected.get(), i, i, {.nullsFirst = true}), 0); + } + ++numOutputVectors; + } + ASSERT_EQ(numOutputVectors, testData.numTracedBatches); } - ASSERT_EQ(numOutputVectors, inputVectors.size()); } 
TEST_F(QueryTracerTest, traceMetadata) { @@ -235,7 +278,8 @@ 
TEST_F(QueryTracerTest, task) { const auto expectedResult = AssertQueryBuilder(planNode).maxDrivers(1).copyResults(pool()); - for (const auto& queryTraceNodeIds : {"1,2", ""}) { + for (const auto* taceTaskRegExp : + {".*", "test_cursor [12345]", "xxx_yyy \\d+"}) { const auto outputDir = TempDirectoryPath::create(); const auto expectedQueryConfigs = std::unordered_map{ @@ -243,9 +287,11 @@ TEST_F(QueryTracerTest, task) { {core::QueryConfig::kSpillNumPartitionBits, "17"}, {core::QueryConfig::kQueryTraceEnabled, "true"}, {core::QueryConfig::kQueryTraceDir, outputDir->getPath()}, - {core::QueryConfig::kQueryTraceEnabled, queryTraceNodeIds}, + {core::QueryConfig::kQueryTraceTaskRegExp, taceTaskRegExp}, + {core::QueryConfig::kQueryTraceNodeIds, "1,2"}, {"key1", "value1"}, }; + const auto expectedConnectorProperties = std::unordered_map>{ {"test_trace", @@ -268,8 +314,16 @@ TEST_F(QueryTracerTest, task) { fmt::format("{}/{}", outputDir->getPath(), task->taskId()); const auto fs = filesystems::getFileSystem(expectedDir, nullptr); const auto actaulDirs = fs->list(outputDir->getPath()); + + if (std::strcmp(taceTaskRegExp, "xxx_yyy \\d+") == 0) { + ASSERT_EQ(actaulDirs.size(), 0); + continue; + } ASSERT_EQ(actaulDirs.size(), 1); ASSERT_EQ(actaulDirs.at(0), expectedDir); + const auto taskIds = getTaskIds(outputDir->getPath(), fs); + ASSERT_EQ(taskIds.size(), 1); + ASSERT_EQ(taskIds.at(0), task->taskId()); std::unordered_map acutalQueryConfigs; std:: @@ -361,7 +415,7 @@ TEST_F(QueryTracerTest, traceDir) { ASSERT_EQ(expectedDirs.count(dir), 1); } } -} // namespace facebook::velox::exec::test +} // namespace facebook::velox::exec::trace::test // This main is needed for some tests on linux. int main(int argc, char** argv) { diff --git a/velox/tool/CMakeLists.txt b/velox/tool/CMakeLists.txt new file mode 100644 index 000000000000..cfcec2fbb716 --- /dev/null +++ b/velox/tool/CMakeLists.txt @@ -0,0 +1,15 @@ +# Copyright (c) Facebook, Inc. and its affiliates. 
+# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +add_subdirectory(trace) diff --git a/velox/tool/trace/CMakeLists.txt b/velox/tool/trace/CMakeLists.txt new file mode 100644 index 000000000000..643376c7c942 --- /dev/null +++ b/velox/tool/trace/CMakeLists.txt @@ -0,0 +1,34 @@ +# Copyright (c) Facebook, Inc. and its affiliates. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. 
+ +velox_add_library(query_trace_replayer_base QueryTraceReplayer.cpp) +velox_link_libraries( + query_trace_replayer_base + velox_query_trace_retrieve + velox_aggregates + velox_type + velox_vector + velox_vector_test_lib + velox_exec + velox_exec_test_lib + velox_tpch_connector + velox_memory + Folly::folly + glog::glog + gflags::gflags) + +add_executable(query_replayer QueryReplayer.cpp) + +target_link_libraries( + query_replayer query_trace_replayer_base) diff --git a/velox/tool/trace/QueryReplayer.cpp b/velox/tool/trace/QueryReplayer.cpp new file mode 100644 index 000000000000..053ddd417ab5 --- /dev/null +++ b/velox/tool/trace/QueryReplayer.cpp @@ -0,0 +1,78 @@ +/* + * Copyright (c) Facebook, Inc. and its affiliates. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +#include +#include "velox/common/memory/Memory.h" +#include "velox/core/PlanNode.h" +#include "velox/exec/PartitionFunction.h" +#include "velox/type/Type.h" + +#include "velox/common/file/FileSystems.h" +#include "velox/connectors/hive/storage_adapters/abfs/RegisterAbfsFileSystem.h" +#include "velox/connectors/hive/storage_adapters/gcs/RegisterGCSFileSystem.h" +#include "velox/connectors/hive/storage_adapters/hdfs/RegisterHdfsFileSystem.h" +#include "velox/connectors/hive/storage_adapters/s3fs/RegisterS3FileSystem.h" +#include "velox/tool/trace/QueryTraceReplayer.h" + +using namespace facebook::velox; + +namespace { +void init() { + memory::initializeMemoryManager({}); + filesystems::registerLocalFileSystem(); + filesystems::registerS3FileSystem(); + filesystems::registerHdfsFileSystem(); + filesystems::registerGCSFileSystem(); + filesystems::abfs::registerAbfsFileSystem(); + Type::registerSerDe(); + core::PlanNode::registerSerDe(); + core::ITypedExpr::registerSerDe(); + exec::registerPartitionFunctionSerDe(); +} +} // namespace + +int main(int argc, char** argv) { + if (argc == 1) { + LOG(ERROR) << "\n" << tool::trace::QueryTraceReplayer::usage(); + return 1; + } + + gflags::ParseCommandLineFlags(&argc, &argv, true); + if (FLAGS_usage) { + LOG(INFO) << "\n" << tool::trace::QueryTraceReplayer::usage(); + return 0; + } + + if (FLAGS_root.empty()) { + LOG(ERROR) << "Root dir is not provided!\n" + << tool::trace::QueryTraceReplayer::usage(); + return 1; + } + + if (!FLAGS_summary && !FLAGS_short_summary) { + LOG(ERROR) << "Only support to print traced query metadata for now"; + return 1; + } + + init(); + const auto tool = std::make_unique(); + if (FLAGS_summary || FLAGS_short_summary) { + tool->printSummary(); + return 0; + } + + VELOX_UNREACHABLE(tool::trace::QueryTraceReplayer::usage()); +} diff --git a/velox/tool/trace/QueryTraceReplayer.cpp b/velox/tool/trace/QueryTraceReplayer.cpp new file mode 100644 index 000000000000..862f9a3505d0 --- /dev/null +++ 
b/velox/tool/trace/QueryTraceReplayer.cpp @@ -0,0 +1,101 @@ +/* + * Copyright (c) Facebook, Inc. and its affiliates. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include + +#include "velox/exec/trace/QueryTraceTraits.h" +#include "velox/exec/trace/QueryTraceUtil.h" +#include "velox/tool/trace/QueryTraceReplayer.h" + +#include "velox/common/serialization/Serializable.h" +#include "velox/core/PlanNode.h" + +DEFINE_bool(usage, false, "Show the usage"); +DEFINE_string(root, "", "Root dir of the query tracing"); +DEFINE_bool(summary, false, "Show the summary of the tracing"); +DEFINE_bool(short_summary, false, "Only show number of tasks and task ids"); +DEFINE_string( + task_id, + "", + "Specify the target task id, if empty, show the summary of all the traced query task."); + +using namespace facebook::velox; + +namespace facebook::velox::tool::trace { + +QueryTraceReplayer::QueryTraceReplayer() + : rootDir_(FLAGS_root), taskId_(FLAGS_task_id) {} + +void QueryTraceReplayer::printSummary() const { + const auto fs = filesystems::getFileSystem(rootDir_, nullptr); + const auto taskIds = exec::trace::getTaskIds(rootDir_, fs); + if (taskIds.empty()) { + LOG(ERROR) << "No traced query task under " << rootDir_; + return; + } + + std::ostringstream summary; + summary << "\n++++++Query trace summary++++++\n"; + summary << "Number of tasks: " << taskIds.size() << "\n"; + summary << "Task ids: " << folly::join(",", taskIds); + + if (FLAGS_short_summary) { + 
LOG(INFO) << summary.str(); + return; + } + + const auto summaryTaskIds = + taskId_.empty() ? taskIds : std::vector{taskId_}; + for (const auto& taskId : summaryTaskIds) { + summary << "\n++++++Query configs and plan of task " << taskId + << ":++++++\n"; + const auto traceTaskDir = fmt::format("{}/{}", rootDir_, taskId); + const auto queryMetaFile = fmt::format( + "{}/{}", + traceTaskDir, + exec::trace::QueryTraceTraits::kQueryMetaFileName); + const auto metaObj = exec::trace::getMetadata(queryMetaFile, fs); + const auto& configObj = + metaObj[exec::trace::QueryTraceTraits::kQueryConfigKey]; + summary << "++++++Query configs++++++\n"; + summary << folly::toJson(configObj) << "\n"; + summary << "++++++Query plan++++++\n"; + const auto queryPlan = ISerializable::deserialize( + metaObj[exec::trace::QueryTraceTraits::kPlanNodeKey], + memory::MemoryManager::getInstance()->tracePool()); + summary << queryPlan->toString(true, true); + } + LOG(INFO) << summary.str(); +} + +std::string QueryTraceReplayer::usage() { + std::ostringstream usage; + usage + << "++++++Query Trace Tool Usage++++++\n" + << "The following options are available:\n" + << "--usage: Show the usage\n" + << "--root: Root dir of the query tracing, it must be set\n" + << "--summary: Show the summary of the tracing including number of tasks" + << "and task ids. 
It also print the query metadata including" + << "query configs, connectors properties, and query plan in JSON format.\n" + << "--short_summary: Only show number of tasks and task ids.\n" + << "--pretty: Show the summary of the tracing in pretty JSON.\n" + << "--task_id: Specify the target task id, if empty, show the summary of " + << "all the traced query task.\n"; + return usage.str(); +} + +} // namespace facebook::velox::tool::trace diff --git a/velox/tool/trace/QueryTraceReplayer.h b/velox/tool/trace/QueryTraceReplayer.h new file mode 100644 index 000000000000..29f99f91594f --- /dev/null +++ b/velox/tool/trace/QueryTraceReplayer.h @@ -0,0 +1,42 @@ +/* + * Copyright (c) Facebook, Inc. and its affiliates. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#pragma once + +#include + +DECLARE_bool(usage); +DECLARE_string(root); +DECLARE_bool(summary); +DECLARE_bool(short_summary); +DECLARE_bool(pretty); +DECLARE_string(task_id); + +namespace facebook::velox::tool::trace { +/// The tool used to print or replay the traced query metadata and operations. +class QueryTraceReplayer { + public: + QueryTraceReplayer(); + + void printSummary() const; + static std::string usage(); + + private: + const std::string rootDir_; + const std::string taskId_; +}; + +} // namespace facebook::velox::tool::trace