Skip to content

Commit

Permalink
Add partitioned output trace replayer (#11175)
Browse files Browse the repository at this point in the history
Summary:
Add a partitioned output trace replayer to facilitate debugging of the
partitioned output operator with complex inputs.

part of #9668

Pull Request resolved: #11175

Reviewed By: xiaoxmeng

Differential Revision: D63959956

Pulled By: tanjialiang

fbshipit-source-id: a1519cd1191222316ec03f7e5c219d03c5e6a5be
  • Loading branch information
tanjialiang authored and facebook-github-bot committed Oct 9, 2024
1 parent d5df0a7 commit 6ddb65f
Show file tree
Hide file tree
Showing 15 changed files with 587 additions and 167 deletions.
4 changes: 2 additions & 2 deletions velox/exec/OutputBuffer.h
Original file line number Diff line number Diff line change
Expand Up @@ -306,8 +306,8 @@ class OutputBuffer {
DataAvailableCallback notify,
DataConsumerActiveCheckCallback activeCheck);

// Continues any possibly waiting producers. Called when the
// producer task has an error or cancellation.
/// Continues any possibly waiting producers. Called when the producer task
/// has an error or cancellation.
void terminate();

std::string toString();
Expand Down
14 changes: 7 additions & 7 deletions velox/exec/OutputBufferManager.h
Original file line number Diff line number Diff line change
Expand Up @@ -51,9 +51,9 @@ class OutputBufferManager {
/// Returns true if the buffer exists for a given taskId, else returns false.
bool updateNumDrivers(const std::string& taskId, uint32_t newNumDrivers);

// Adds data to the outgoing queue for 'destination'. 'data' must not be
// nullptr. 'data' is always added but if the buffers are full the future is
// set to a ContinueFuture that will be realized when there is space.
/// Adds data to the outgoing queue for 'destination'. 'data' must not be
/// nullptr. 'data' is always added but if the buffers are full the future is
/// set to a ContinueFuture that will be realized when there is space.
bool enqueue(
const std::string& taskId,
int destination,
Expand All @@ -62,12 +62,12 @@ class OutputBufferManager {

void noMoreData(const std::string& taskId);

// Returns true if noMoreData has been called and all the accumulated data
// have been fetched and acknowledged.
/// Returns true if noMoreData has been called and all the accumulated data
/// have been fetched and acknowledged.
bool isFinished(const std::string& taskId);

// Removes data with sequence number < 'sequence' from the queue for
// 'destination_'.
/// Removes data with sequence number < 'sequence' from the queue for
/// 'destination'.
void
acknowledge(const std::string& taskId, int destination, int64_t sequence);

Expand Down
5 changes: 2 additions & 3 deletions velox/exec/PartitionedOutput.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -232,12 +232,11 @@ void PartitionedOutput::estimateRowSizes() {
}

void PartitionedOutput::addInput(RowVectorPtr input) {
initializeInput(std::move(input));
traceInput(input);

initializeInput(std::move(input));
initializeDestinations();

initializeSizeBuffers();

estimateRowSizes();

for (auto& destination : destinations_) {
Expand Down
9 changes: 7 additions & 2 deletions velox/tool/trace/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -12,8 +12,13 @@
# See the License for the specific language governing permissions and
# limitations under the License.

velox_add_library(velox_query_trace_replayer_base OperatorReplayerBase.cpp
TableWriterReplayer.cpp AggregationReplayer.cpp)
velox_add_library(
velox_query_trace_replayer_base
AggregationReplayer.cpp
OperatorReplayerBase.cpp
PartitionedOutputReplayer.cpp
TableWriterReplayer.cpp)

velox_link_libraries(
velox_query_trace_replayer_base
velox_aggregates
Expand Down
85 changes: 10 additions & 75 deletions velox/tool/trace/OperatorReplayerBase.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -34,27 +34,28 @@ OperatorReplayerBase::OperatorReplayerBase(
std::string nodeId,
int32_t pipelineId,
std::string operatorType)
: rootDir_(std::move(rootDir)),
taskId_(std::move(taskId)),
: taskId_(std::move(taskId)),
nodeId_(std::move(nodeId)),
pipelineId_(pipelineId),
operatorType_(std::move(operatorType)) {
operatorType_(std::move(operatorType)),
rootDir_(std::move(rootDir)),
taskDir_(fmt::format("{}/{}", rootDir_, taskId_)),
nodeDir_(fmt::format("{}/{}", taskDir_, nodeId_)) {
VELOX_USER_CHECK(!rootDir_.empty());
VELOX_USER_CHECK(!taskId_.empty());
VELOX_USER_CHECK(!nodeId_.empty());
VELOX_USER_CHECK_GE(pipelineId_, 0);
VELOX_USER_CHECK(!operatorType_.empty());
const auto traceTaskDir = fmt::format("{}/{}", rootDir_, taskId_);
const auto metadataReader = exec::trace::QueryMetadataReader(
traceTaskDir, memory::MemoryManager::getInstance()->tracePool());
taskDir_, memory::MemoryManager::getInstance()->tracePool());
metadataReader.read(queryConfigs_, connectorConfigs_, planFragment_);
queryConfigs_[core::QueryConfig::kQueryTraceEnabled] = "false";
fs_ = filesystems::getFileSystem(rootDir_, nullptr);
maxDrivers_ =
exec::trace::getNumDrivers(rootDir_, taskId_, nodeId_, pipelineId_, fs_);
}

RowVectorPtr OperatorReplayerBase::run() const {
RowVectorPtr OperatorReplayerBase::run() {
const auto restoredPlanNode = createPlan();
return exec::test::AssertQueryBuilder(restoredPlanNode)
.maxDrivers(maxDrivers_)
Expand All @@ -67,83 +68,17 @@ core::PlanNodePtr OperatorReplayerBase::createPlan() const {
const auto* replayNode = core::PlanNode::findFirstNode(
planFragment_.get(),
[this](const core::PlanNode* node) { return node->id() == nodeId_; });
const auto traceDir = fmt::format("{}/{}", rootDir_, taskId_);
return exec::test::PlanBuilder()
.traceScan(
fmt::format("{}/{}", traceDir, nodeId_),
exec::trace::getDataType(planFragment_, nodeId_))
.addNode(addReplayNode(replayNode))
.traceScan(nodeDir_, exec::trace::getDataType(planFragment_, nodeId_))
.addNode(replayNodeFactory(replayNode))
.planNode();
}

std::function<core::PlanNodePtr(std::string, core::PlanNodePtr)>
OperatorReplayerBase::addReplayNode(const core::PlanNode* node) const {
OperatorReplayerBase::replayNodeFactory(const core::PlanNode* node) const {
return [=](const core::PlanNodeId& nodeId,
const core::PlanNodePtr& source) -> core::PlanNodePtr {
return createPlanNode(node, nodeId, source);
};
}

/// Logs a summary of the query traces found under 'rootDir'.
///
/// The summary always contains the number of traced tasks and their ids.
/// Unless 'shortSummary' is set, it additionally appends the query configs (as
/// JSON) and the query plan read from the trace metadata file of 'taskId', or
/// of every traced task when 'taskId' is empty.
///
/// Logs an error and returns without a summary if no traced task is found
/// under 'rootDir'.
void OperatorReplayerBase::printSummary(
    const std::string& rootDir,
    const std::string& taskId,
    bool shortSummary) {
  const auto fs = filesystems::getFileSystem(rootDir, nullptr);
  const auto taskIds = exec::trace::getTaskIds(rootDir, fs);
  if (taskIds.empty()) {
    LOG(ERROR) << "No traced query task under " << rootDir;
    return;
  }

  std::ostringstream summary;
  summary << "\n++++++Query trace summary++++++\n";
  summary << "Number of tasks: " << taskIds.size() << "\n";
  summary << "Task ids: " << folly::join(",", taskIds);

  if (shortSummary) {
    LOG(INFO) << summary.str();
    return;
  }

  // An explicit 'taskId' narrows the detailed section to that single task.
  const auto summaryTaskIds =
      taskId.empty() ? taskIds : std::vector<std::string>{taskId};
  // The loop variable is deliberately not called 'taskId' so that it does not
  // shadow the function parameter of the same name.
  for (const auto& summaryTaskId : summaryTaskIds) {
    summary << "\n++++++Query configs and plan of task " << summaryTaskId
            << ":++++++\n";
    const auto traceTaskDir = fmt::format("{}/{}", rootDir, summaryTaskId);
    const auto queryMetaFile = fmt::format(
        "{}/{}",
        traceTaskDir,
        exec::trace::QueryTraceTraits::kQueryMetaFileName);
    const auto metaObj = exec::trace::getMetadata(queryMetaFile, fs);
    const auto& configObj =
        metaObj[exec::trace::QueryTraceTraits::kQueryConfigKey];
    summary << "++++++Query configs++++++\n";
    summary << folly::toJson(configObj) << "\n";
    summary << "++++++Query plan++++++\n";
    const auto queryPlan = ISerializable::deserialize<core::PlanNode>(
        metaObj[exec::trace::QueryTraceTraits::kPlanNodeKey],
        memory::MemoryManager::getInstance()->tracePool());
    summary << queryPlan->toString(true, true);
  }
  LOG(INFO) << summary.str();
}

/// Returns the usage text of the query trace tool, describing each
/// command-line option on its own line.
std::string OperatorReplayerBase::usage() {
  // Adjacent string literals are merged by the compiler with nothing inserted
  // between them, so every literal that continues a sentence on the next
  // source line must end with a space to keep the words separated.
  return "++++++Query Trace Tool Usage++++++\n"
         "The following options are available:\n"
         "--usage: Show the usage\n"
         "--root: Root dir of the query tracing, it must be set\n"
         "--summary: Show the summary of the tracing including number of tasks "
         "and task ids. It also print the query metadata including "
         "query configs, connectors properties, and query plan in JSON format.\n"
         "--short_summary: Only show number of tasks and task ids.\n"
         "--pretty: Show the summary of the tracing in pretty JSON.\n"
         "--task_id: Specify the target task id, if empty, show the summary of "
         "all the traced query task.\n";
}

} // namespace facebook::velox::tool::trace
30 changes: 15 additions & 15 deletions velox/tool/trace/OperatorReplayerBase.h
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,10 @@
#include "velox/common/file/FileSystems.h"
#include "velox/core/PlanNode.h"

namespace facebook::velox::exec {
class Task;
}

namespace facebook::velox::tool::trace {
class OperatorReplayerBase {
public:
Expand All @@ -36,38 +40,34 @@ class OperatorReplayerBase {
OperatorReplayerBase& operator=(OperatorReplayerBase&& other) noexcept =
delete;

RowVectorPtr run() const;

static void printSummary(
const std::string& rootDir,
const std::string& taskId,
bool shortSummary);

static std::string usage();
virtual RowVectorPtr run();

protected:
virtual core::PlanNodePtr createPlan() const;

virtual std::function<core::PlanNodePtr(std::string, core::PlanNodePtr)>
addReplayNode(const core::PlanNode* node) const;

virtual core::PlanNodePtr createPlanNode(
const core::PlanNode* node,
const core::PlanNodeId& nodeId,
const core::PlanNodePtr& source) const = 0;

const std::string rootDir_;
core::PlanNodePtr createPlan() const;

const std::string taskId_;
const std::string nodeId_;
const int32_t pipelineId_;
const std::string operatorType_;

const std::string rootDir_;
const std::string taskDir_;
const std::string nodeDir_;

std::unordered_map<std::string, std::string> queryConfigs_;
std::unordered_map<std::string, std::unordered_map<std::string, std::string>>
connectorConfigs_;
core::PlanNodePtr planFragment_;
std::shared_ptr<filesystems::FileSystem> fs_;
int32_t maxDrivers_{1};
};

private:
std::function<core::PlanNodePtr(std::string, core::PlanNodePtr)>
replayNodeFactory(const core::PlanNode* node) const;
};
} // namespace facebook::velox::tool::trace
Loading

0 comments on commit 6ddb65f

Please sign in to comment.