Skip to content

Commit

Permalink
misc: Add toString() to OutputBuffer stats (facebookincubator#11562)
Browse files Browse the repository at this point in the history
Summary:
Sample output:
```
[bufferedBytes: 0B, bufferedPages: 0, totalBytesSent: 6.69KB, totalRowsSent: 400, totalPagesSent: 4, averageBufferTimeMs: 0, numTopBuffers: 4]
  D0: [finished: false, bytesBuffered: 0B, rowsBuffered: 0, pagesBuffered: 0, bytesSent: 1.67KB, rowsSent: 100, pagesSent:1]
  D1: [finished: false, bytesBuffered: 0B, rowsBuffered: 0, pagesBuffered: 0, bytesSent: 1.67KB, rowsSent: 100, pagesSent:1]
  D2: [finished: false, bytesBuffered: 0B, rowsBuffered: 0, pagesBuffered: 0, bytesSent: 1.67KB, rowsSent: 100, pagesSent:1]
  D3: [finished: false, bytesBuffered: 0B, rowsBuffered: 0, pagesBuffered: 0, bytesSent: 1.67KB, rowsSent: 100, pagesSent:1]
```

Pull Request resolved: facebookincubator#11562

Reviewed By: DanielHunte

Differential Revision: D67304118

Pulled By: kgpai

fbshipit-source-id: a930b75527627393acd205377756c010b527ab03
  • Loading branch information
yingsu00 authored and facebook-github-bot committed Dec 17, 2024
1 parent f1622ab commit e46cb76
Show file tree
Hide file tree
Showing 7 changed files with 201 additions and 45 deletions.
4 changes: 4 additions & 0 deletions velox/common/testutil/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -16,5 +16,9 @@ velox_add_library(velox_test_util ScopedTestTime.cpp TestValue.cpp)
velox_link_libraries(velox_test_util PUBLIC velox_exception)

if(${VELOX_BUILD_TESTING})
velox_add_library(velox_test_output_matcher OutputMatcher.cpp)
velox_link_libraries(velox_test_output_matcher PUBLIC Folly::folly
GTest::gtest re2::re2)

add_subdirectory(tests)
endif()
67 changes: 67 additions & 0 deletions velox/common/testutil/OutputMatcher.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,67 @@
/*
* Copyright (c) Facebook, Inc. and its affiliates.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
/*
* Copyright (c) Facebook, Inc. and its affiliates.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

#include "velox/common/testutil/OutputMatcher.h"

#include <vector>

#include <folly/String.h>
#include <gtest/gtest.h>
#include <re2/re2.h>

void OutputMatcher::compareOutputs(
const std::string& testName,
const std::string& result,
const std::vector<ExpectedLine>& expectedRegex) {
std::string line;
std::string eline;
std::istringstream iss(result);
int lineCount = 0;
int expectedLineIndex = 0;
for (; std::getline(iss, line);) {
lineCount++;
std::vector<std::string> potentialLines;
auto expectedLine = expectedRegex.at(expectedLineIndex++);
while (!RE2::FullMatch(line, expectedLine.line)) {
potentialLines.push_back(expectedLine.line);
if (!expectedLine.optional) {
ASSERT_FALSE(true) << "Output did not match " << "Source:" << testName
<< ", Line number:" << lineCount
<< ", Line: " << line << ", Expected Line one of: "
<< folly::join(",", potentialLines);
}
expectedLine = expectedRegex.at(expectedLineIndex++);
}
}
for (int i = expectedLineIndex; i < expectedRegex.size(); i++) {
ASSERT_TRUE(expectedRegex[expectedLineIndex].optional);
}
}
47 changes: 47 additions & 0 deletions velox/common/testutil/OutputMatcher.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,47 @@
/*
* Copyright (c) Facebook, Inc. and its affiliates.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
/*
* Copyright (c) Facebook, Inc. and its affiliates.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#pragma once

#include <iostream>
#include <vector>

struct ExpectedLine {
std::string line;
bool optional = false;
};

class OutputMatcher {
public:
static void compareOutputs(
const std::string& testName,
const std::string& result,
const std::vector<ExpectedLine>& expectedRegex);
};
36 changes: 35 additions & 1 deletion velox/exec/OutputBuffer.h
Original file line number Diff line number Diff line change
Expand Up @@ -115,6 +115,18 @@ class DestinationBuffer {
int64_t bytesSent{0};
int64_t rowsSent{0};
int64_t pagesSent{0};

std::string toString() const {
return fmt::format(
"[finished: {}, bytesBuffered: {}, rowsBuffered: {}, pagesBuffered: {}, bytesSent: {}, rowsSent: {}, pagesSent:{}]",
finished,
succinctBytes(bytesBuffered),
rowsBuffered,
pagesBuffered,
succinctBytes(bytesSent),
rowsSent,
pagesSent);
}
};

void enqueue(std::shared_ptr<SerializedPage> data);
Expand Down Expand Up @@ -254,7 +266,29 @@ class OutputBuffer {
/// Stats of the OutputBuffer's destinations.
std::vector<DestinationBuffer::Stats> buffersStats;

std::string toString() const;
std::string toString() const {
std::string destinationBufferStats;
if (!buffersStats.empty()) {
for (int i = 0; i < buffersStats.size(); i++) {
auto& destinationBufferStat = buffersStats[i];
destinationBufferStats +=
fmt::format(" D{}: {}\n", i, destinationBufferStat.toString());
}
}

return fmt::format(
"[bufferedBytes: {}, bufferedPages: {}, "
"totalBytesSent: {}, totalRowsSent: {}, totalPagesSent: {}, "
"averageBufferTimeMs: {}, numTopBuffers: {}]\n{}",
succinctBytes(bufferedBytes),
bufferedPages,
succinctBytes(totalBytesSent),
totalRowsSent,
totalPagesSent,
averageBufferTimeMs,
numTopBuffers,
destinationBufferStats);
}
};

OutputBuffer(
Expand Down
1 change: 1 addition & 0 deletions velox/exec/tests/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -139,6 +139,7 @@ target_link_libraries(
velox_hive_connector
velox_memory
velox_serialization
velox_test_output_matcher
velox_test_util
velox_type
velox_type_test_lib
Expand Down
41 changes: 40 additions & 1 deletion velox/exec/tests/OutputBufferManagerTest.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -17,12 +17,12 @@
#include <gtest/gtest.h>
#include "folly/experimental/EventCount.h"
#include "velox/common/base/tests/GTestUtils.h"
#include "velox/common/testutil/OutputMatcher.h"
#include "velox/dwio/common/tests/utils/BatchMaker.h"
#include "velox/exec/Task.h"
#include "velox/exec/tests/utils/PlanBuilder.h"
#include "velox/exec/tests/utils/SerializedPageUtil.h"
#include "velox/serializers/CompactRowSerializer.h"
#include "velox/serializers/PrestoSerializer.h"
#include "velox/serializers/UnsafeRowSerializer.h"

using namespace facebook::velox;
Expand Down Expand Up @@ -1323,6 +1323,44 @@ TEST_P(AllOutputBufferManagerTest, outputBufferUtilization) {
verifyOutputBuffer(task, OutputBufferStatus::kFinished);
}

TEST_P(AllOutputBufferManagerTest, printOutputBufferStats) {
const vector_size_t vectorSize = 100;
const std::string taskId = std::to_string(folly::Random::rand32());
const int numDestinations = 4;
initializeTask(
taskId,
rowType_,
core::PartitionedOutputNode::Kind::kPartitioned,
numDestinations,
1);

const int numPages = numDestinations;
int totalNumRows = 0;
int totalBytes = 0;
for (int pageId = 0; pageId < numPages; ++pageId) {
const auto pageBytes = enqueue(taskId, pageId, rowType_, vectorSize);
fetchOneAndAck(taskId, pageId, 0);
}

const auto statsEnqueue = getStats(taskId);
OutputMatcher::compareOutputs(
::testing::UnitTest::GetInstance()->current_test_info()->name(),
statsEnqueue.toString(),
{{"\\[bufferedBytes: ([\\d.]+[KMGT]?[B]?), bufferedPages: (\\d+), totalBytesSent: ([\\d.]+[KMGT]?[B]?), totalRowsSent: (\\d+), totalPagesSent: (\\d+), averageBufferTimeMs: (\\d+), numTopBuffers: (\\d+)\\]"},
{"\\s*D0: \\[finished: (true|false), bytesBuffered: ([\\d.]+[KMGT]?[B]?), rowsBuffered: (\\d+), pagesBuffered: (\\d+), bytesSent: ([\\d.]+[KMGT]?[B]?), rowsSent: (\\d+), pagesSent:(\\d+)\\]"},
{"\\s*D1: \\[finished: (true|false), bytesBuffered: ([\\d.]+[KMGT]?[B]?), rowsBuffered: (\\d+), pagesBuffered: (\\d+), bytesSent: ([\\d.]+[KMGT]?[B]?), rowsSent: (\\d+), pagesSent:(\\d+)\\]"},
{"\\s*D2: \\[finished: (true|false), bytesBuffered: ([\\d.]+[KMGT]?[B]?), rowsBuffered: (\\d+), pagesBuffered: (\\d+), bytesSent: ([\\d.]+[KMGT]?[B]?), rowsSent: (\\d+), pagesSent:(\\d+)\\]"},
{"\\s*D3: \\[finished: (true|false), bytesBuffered: ([\\d.]+[KMGT]?[B]?), rowsBuffered: (\\d+), pagesBuffered: (\\d+), bytesSent: ([\\d.]+[KMGT]?[B]?), rowsSent: (\\d+), pagesSent:(\\d+)\\]"}});

bufferManager_->updateOutputBuffers(taskId, numDestinations, true);
noMoreData(taskId);
for (int pageId = 0; pageId < numPages; ++pageId) {
fetchEndMarker(taskId, pageId, 1);
deleteResults(taskId, pageId);
}
bufferManager_->removeTask(taskId);
}

TEST_P(AllOutputBufferManagerTest, outputBufferStats) {
const vector_size_t vectorSize = 100;
const std::string taskId = std::to_string(folly::Random::rand32());
Expand Down Expand Up @@ -1354,6 +1392,7 @@ TEST_P(AllOutputBufferManagerTest, outputBufferStats) {
fetchOne(taskId, 0, pageId);
}
const auto statsEnqueue = getStats(taskId);
std::cout << statsEnqueue.toString();
ASSERT_EQ(statsEnqueue.buffersStats[0].pagesBuffered, 1);
ASSERT_EQ(statsEnqueue.buffersStats[0].rowsBuffered, vectorSize);
if (outputKind_ == core::PartitionedOutputNode::Kind::kBroadcast) {
Expand Down
50 changes: 7 additions & 43 deletions velox/exec/tests/PrintPlanWithStatsTest.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -14,56 +14,20 @@
* limitations under the License.
*/

#include "velox/common/testutil/OutputMatcher.h"
#include "velox/exec/PlanNodeStats.h"
#include "velox/exec/tests/utils/AssertQueryBuilder.h"
#include "velox/exec/tests/utils/HiveConnectorTestBase.h"
#include "velox/exec/tests/utils/PlanBuilder.h"
#include "velox/exec/tests/utils/TempDirectoryPath.h"

#include <gtest/gtest.h>
#include <re2/re2.h>

using namespace facebook::velox;
using namespace facebook::velox::exec::test;

using facebook::velox::exec::test::PlanBuilder;

class PrintPlanWithStatsTest : public HiveConnectorTestBase {};

struct ExpectedLine {
std::string line;
bool optional = false;
};

void compareOutputs(
const std::string& testName,
const std::string& result,
const std::vector<ExpectedLine>& expectedRegex) {
std::string line;
std::string eline;
std::istringstream iss(result);
int lineCount = 0;
int expectedLineIndex = 0;
for (; std::getline(iss, line);) {
lineCount++;
std::vector<std::string> potentialLines;
auto expectedLine = expectedRegex.at(expectedLineIndex++);
while (!RE2::FullMatch(line, expectedLine.line)) {
potentialLines.push_back(expectedLine.line);
if (!expectedLine.optional) {
ASSERT_FALSE(true) << "Output did not match " << "Source:" << testName
<< ", Line number:" << lineCount
<< ", Line: " << line << ", Expected Line one of: "
<< folly::join(",", potentialLines);
}
expectedLine = expectedRegex.at(expectedLineIndex++);
}
}
for (int i = expectedLineIndex; i < expectedRegex.size(); i++) {
ASSERT_TRUE(expectedRegex[expectedLineIndex].optional);
}
}

void ensureTaskCompletion(exec::Task* task) {
// ASSERT_TRUE requires a function with return type void.
ASSERT_TRUE(waitForTaskCompletion(task));
Expand Down Expand Up @@ -129,7 +93,7 @@ TEST_F(PrintPlanWithStatsTest, innerJoinWithTableScan) {
"SELECT t.c0, t.c1 + 1, t.c1 + u.c1 FROM t, u WHERE t.c0 = u.c0");

ensureTaskCompletion(task.get());
compareOutputs(
OutputMatcher::compareOutputs(
::testing::UnitTest::GetInstance()->current_test_info()->name(),
printPlanWithStats(*op, task->taskStats()),
{{"-- Project\\[4\\]\\[expressions: \\(c0:INTEGER, ROW\\[\"c0\"\\]\\), \\(p1:BIGINT, plus\\(ROW\\[\"c1\"\\],1\\)\\), \\(p2:BIGINT, plus\\(ROW\\[\"c1\"\\],ROW\\[\"u_c1\"\\]\\)\\)\\] -> c0:INTEGER, p1:BIGINT, p2:BIGINT"},
Expand All @@ -146,7 +110,7 @@ TEST_F(PrintPlanWithStatsTest, innerJoinWithTableScan) {
{" Input: 0 rows \\(.+\\), Output: 100 rows \\(.+\\), Cpu time: .+, Blocked wall time: .+, Peak memory: 0B, Memory allocations: .+, Threads: 1, CPU breakdown: B/I/O/F (.+/.+/.+/.+)"}});

// with custom stats
compareOutputs(
OutputMatcher::compareOutputs(
::testing::UnitTest::GetInstance()->current_test_info()->name(),
printPlanWithStats(*op, task->taskStats(), true),
{{"-- Project\\[4\\]\\[expressions: \\(c0:INTEGER, ROW\\[\"c0\"\\]\\), \\(p1:BIGINT, plus\\(ROW\\[\"c1\"\\],1\\)\\), \\(p2:BIGINT, plus\\(ROW\\[\"c1\"\\],ROW\\[\"u_c1\"\\]\\)\\)\\] -> c0:INTEGER, p1:BIGINT, p2:BIGINT"},
Expand Down Expand Up @@ -252,15 +216,15 @@ TEST_F(PrintPlanWithStatsTest, partialAggregateWithTableScan) {
.assertResults(
"SELECT c5, max(c0), sum(c1), sum(c2), sum(c3), sum(c4) FROM tmp group by c5");
ensureTaskCompletion(task.get());
compareOutputs(
OutputMatcher::compareOutputs(
::testing::UnitTest::GetInstance()->current_test_info()->name(),
printPlanWithStats(*op, task->taskStats()),
{{"-- Aggregation\\[1\\]\\[PARTIAL \\[c5\\] a0 := max\\(ROW\\[\"c0\"\\]\\), a1 := sum\\(ROW\\[\"c1\"\\]\\), a2 := sum\\(ROW\\[\"c2\"\\]\\), a3 := sum\\(ROW\\[\"c3\"\\]\\), a4 := sum\\(ROW\\[\"c4\"\\]\\)\\] -> c5:VARCHAR, a0:BIGINT, a1:BIGINT, a2:BIGINT, a3:DOUBLE, a4:DOUBLE"},
{" Output: .+, Cpu time: .+, Blocked wall time: .+, Peak memory: .+, Memory allocations: .+, Threads: 1, CPU breakdown: B/I/O/F (.+/.+/.+/.+)"},
{" -- TableScan\\[0\\]\\[table: hive_table\\] -> c0:BIGINT, c1:INTEGER, c2:SMALLINT, c3:REAL, c4:DOUBLE, c5:VARCHAR"},
{" Input: 10000 rows \\(.+\\), Output: 10000 rows \\(.+\\), Cpu time: .+, Blocked wall time: .+, Peak memory: .+, Memory allocations: .+, Threads: 1, Splits: 1, CPU breakdown: B/I/O/F (.+/.+/.+/.+)"}});

compareOutputs(
OutputMatcher::compareOutputs(
::testing::UnitTest::GetInstance()->current_test_info()->name(),
printPlanWithStats(*op, task->taskStats(), true),
{{"-- Aggregation\\[1\\]\\[PARTIAL \\[c5\\] a0 := max\\(ROW\\[\"c0\"\\]\\), a1 := sum\\(ROW\\[\"c1\"\\]\\), a2 := sum\\(ROW\\[\"c2\"\\]\\), a3 := sum\\(ROW\\[\"c3\"\\]\\), a4 := sum\\(ROW\\[\"c4\"\\]\\)\\] -> c5:VARCHAR, a0:BIGINT, a1:BIGINT, a2:BIGINT, a3:DOUBLE, a4:DOUBLE"},
Expand Down Expand Up @@ -325,15 +289,15 @@ TEST_F(PrintPlanWithStatsTest, tableWriterWithTableScan) {
.splits(makeHiveConnectorSplits({filePath}))
.copyResults(pool(), task);
ensureTaskCompletion(task.get());
compareOutputs(
OutputMatcher::compareOutputs(
::testing::UnitTest::GetInstance()->current_test_info()->name(),
printPlanWithStats(*writePlan, task->taskStats()),
{{R"(-- TableWrite\[1\]\[.+InsertTableHandle .+)"},
{" Output: .+, Physical written output: .+, Cpu time: .+, Blocked wall time: .+, Peak memory: .+, Memory allocations: .+, Threads: 1, CPU breakdown: B/I/O/F (.+/.+/.+/.+)"},
{R"( -- TableScan\[0\]\[table: hive_table\] -> c0:BIGINT, c1:INTEGER, c2:SMALLINT, c3:REAL, c4:DOUBLE, c5:VARCHAR)"},
{R"( Input: 100 rows \(.+\), Output: 100 rows \(.+\), Cpu time: .+, Blocked wall time: .+, Peak memory: .+, Memory allocations: .+, Threads: 1, Splits: 1, CPU breakdown: B/I/O/F (.+/.+/.+/.+))"}});

compareOutputs(
OutputMatcher::compareOutputs(
::testing::UnitTest::GetInstance()->current_test_info()->name(),
printPlanWithStats(*writePlan, task->taskStats(), true),
{{R"(-- TableWrite\[1\]\[.+InsertTableHandle .+)"},
Expand Down

0 comments on commit e46cb76

Please sign in to comment.