Skip to content

Commit

Permalink
Revert "misc: Add toString() to OutputBuffer stats (facebookincubator…
Browse files Browse the repository at this point in the history
…#11562)"

This reverts commit e46cb76.
  • Loading branch information
majetideepak committed Dec 18, 2024
1 parent 01e755c commit d6c7a2f
Show file tree
Hide file tree
Showing 7 changed files with 45 additions and 199 deletions.
4 changes: 0 additions & 4 deletions velox/common/testutil/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -16,9 +16,5 @@ velox_add_library(velox_test_util ScopedTestTime.cpp TestValue.cpp)
velox_link_libraries(velox_test_util PUBLIC velox_exception)

if(${VELOX_BUILD_TESTING})
velox_add_library(velox_test_output_matcher OutputMatcher.cpp)
velox_link_libraries(velox_test_output_matcher PUBLIC Folly::folly
GTest::gtest re2::re2)

add_subdirectory(tests)
endif()
67 changes: 0 additions & 67 deletions velox/common/testutil/OutputMatcher.cpp

This file was deleted.

47 changes: 0 additions & 47 deletions velox/common/testutil/OutputMatcher.h

This file was deleted.

36 changes: 1 addition & 35 deletions velox/exec/OutputBuffer.h
Original file line number Diff line number Diff line change
Expand Up @@ -115,18 +115,6 @@ class DestinationBuffer {
int64_t bytesSent{0};
int64_t rowsSent{0};
int64_t pagesSent{0};

std::string toString() const {
return fmt::format(
"[finished: {}, bytesBuffered: {}, rowsBuffered: {}, pagesBuffered: {}, bytesSent: {}, rowsSent: {}, pagesSent:{}]",
finished,
succinctBytes(bytesBuffered),
rowsBuffered,
pagesBuffered,
succinctBytes(bytesSent),
rowsSent,
pagesSent);
}
};

void enqueue(std::shared_ptr<SerializedPage> data);
Expand Down Expand Up @@ -266,29 +254,7 @@ class OutputBuffer {
/// Stats of the OutputBuffer's destinations.
std::vector<DestinationBuffer::Stats> buffersStats;

std::string toString() const {
std::string destinationBufferStats;
if (!buffersStats.empty()) {
for (int i = 0; i < buffersStats.size(); i++) {
auto& destinationBufferStat = buffersStats[i];
destinationBufferStats +=
fmt::format(" D{}: {}\n", i, destinationBufferStat.toString());
}
}

return fmt::format(
"[bufferedBytes: {}, bufferedPages: {}, "
"totalBytesSent: {}, totalRowsSent: {}, totalPagesSent: {}, "
"averageBufferTimeMs: {}, numTopBuffers: {}]\n{}",
succinctBytes(bufferedBytes),
bufferedPages,
succinctBytes(totalBytesSent),
totalRowsSent,
totalPagesSent,
averageBufferTimeMs,
numTopBuffers,
destinationBufferStats);
}
std::string toString() const;
};

OutputBuffer(
Expand Down
1 change: 0 additions & 1 deletion velox/exec/tests/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -139,7 +139,6 @@ target_link_libraries(
velox_hive_connector
velox_memory
velox_serialization
velox_test_output_matcher
velox_test_util
velox_type
velox_type_test_lib
Expand Down
39 changes: 1 addition & 38 deletions velox/exec/tests/OutputBufferManagerTest.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -17,12 +17,12 @@
#include <gtest/gtest.h>
#include "folly/experimental/EventCount.h"
#include "velox/common/base/tests/GTestUtils.h"
#include "velox/common/testutil/OutputMatcher.h"
#include "velox/dwio/common/tests/utils/BatchMaker.h"
#include "velox/exec/Task.h"
#include "velox/exec/tests/utils/PlanBuilder.h"
#include "velox/exec/tests/utils/SerializedPageUtil.h"
#include "velox/serializers/CompactRowSerializer.h"
#include "velox/serializers/PrestoSerializer.h"
#include "velox/serializers/UnsafeRowSerializer.h"

using namespace facebook::velox;
Expand Down Expand Up @@ -1323,42 +1323,6 @@ TEST_P(AllOutputBufferManagerTest, outputBufferUtilization) {
verifyOutputBuffer(task, OutputBufferStatus::kFinished);
}

TEST_P(AllOutputBufferManagerTest, printOutputBufferStats) {
const vector_size_t vectorSize = 100;
const std::string taskId = std::to_string(folly::Random::rand32());
const int numDestinations = 4;
initializeTask(
taskId,
rowType_,
core::PartitionedOutputNode::Kind::kPartitioned,
numDestinations,
1);

const int numPages = numDestinations;
for (int pageId = 0; pageId < numPages; ++pageId) {
enqueue(taskId, pageId, rowType_, vectorSize);
fetchOneAndAck(taskId, pageId, 0);
}

const auto statsEnqueue = getStats(taskId);
OutputMatcher::compareOutputs(
::testing::UnitTest::GetInstance()->current_test_info()->name(),
statsEnqueue.toString(),
{{"\\[bufferedBytes: ([\\d.]+[KMGT]?[B]?), bufferedPages: (\\d+), totalBytesSent: ([\\d.]+[KMGT]?[B]?), totalRowsSent: (\\d+), totalPagesSent: (\\d+), averageBufferTimeMs: (\\d+), numTopBuffers: (\\d+)\\]"},
{"\\s*D0: \\[finished: (true|false), bytesBuffered: ([\\d.]+[KMGT]?[B]?), rowsBuffered: (\\d+), pagesBuffered: (\\d+), bytesSent: ([\\d.]+[KMGT]?[B]?), rowsSent: (\\d+), pagesSent:(\\d+)\\]"},
{"\\s*D1: \\[finished: (true|false), bytesBuffered: ([\\d.]+[KMGT]?[B]?), rowsBuffered: (\\d+), pagesBuffered: (\\d+), bytesSent: ([\\d.]+[KMGT]?[B]?), rowsSent: (\\d+), pagesSent:(\\d+)\\]"},
{"\\s*D2: \\[finished: (true|false), bytesBuffered: ([\\d.]+[KMGT]?[B]?), rowsBuffered: (\\d+), pagesBuffered: (\\d+), bytesSent: ([\\d.]+[KMGT]?[B]?), rowsSent: (\\d+), pagesSent:(\\d+)\\]"},
{"\\s*D3: \\[finished: (true|false), bytesBuffered: ([\\d.]+[KMGT]?[B]?), rowsBuffered: (\\d+), pagesBuffered: (\\d+), bytesSent: ([\\d.]+[KMGT]?[B]?), rowsSent: (\\d+), pagesSent:(\\d+)\\]"}});

bufferManager_->updateOutputBuffers(taskId, numDestinations, true);
noMoreData(taskId);
for (int pageId = 0; pageId < numPages; ++pageId) {
fetchEndMarker(taskId, pageId, 1);
deleteResults(taskId, pageId);
}
bufferManager_->removeTask(taskId);
}

TEST_P(AllOutputBufferManagerTest, outputBufferStats) {
const vector_size_t vectorSize = 100;
const std::string taskId = std::to_string(folly::Random::rand32());
Expand Down Expand Up @@ -1390,7 +1354,6 @@ TEST_P(AllOutputBufferManagerTest, outputBufferStats) {
fetchOne(taskId, 0, pageId);
}
const auto statsEnqueue = getStats(taskId);
std::cout << statsEnqueue.toString();
ASSERT_EQ(statsEnqueue.buffersStats[0].pagesBuffered, 1);
ASSERT_EQ(statsEnqueue.buffersStats[0].rowsBuffered, vectorSize);
if (outputKind_ == core::PartitionedOutputNode::Kind::kBroadcast) {
Expand Down
50 changes: 43 additions & 7 deletions velox/exec/tests/PrintPlanWithStatsTest.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -14,20 +14,56 @@
* limitations under the License.
*/

#include "velox/common/testutil/OutputMatcher.h"
#include "velox/exec/PlanNodeStats.h"
#include "velox/exec/tests/utils/AssertQueryBuilder.h"
#include "velox/exec/tests/utils/HiveConnectorTestBase.h"
#include "velox/exec/tests/utils/PlanBuilder.h"
#include "velox/exec/tests/utils/TempDirectoryPath.h"

#include <gtest/gtest.h>
#include <re2/re2.h>

using namespace facebook::velox;
using namespace facebook::velox::exec::test;

using facebook::velox::exec::test::PlanBuilder;

class PrintPlanWithStatsTest : public HiveConnectorTestBase {};

struct ExpectedLine {
std::string line;
bool optional = false;
};

void compareOutputs(
const std::string& testName,
const std::string& result,
const std::vector<ExpectedLine>& expectedRegex) {
std::string line;
std::string eline;
std::istringstream iss(result);
int lineCount = 0;
int expectedLineIndex = 0;
for (; std::getline(iss, line);) {
lineCount++;
std::vector<std::string> potentialLines;
auto expectedLine = expectedRegex.at(expectedLineIndex++);
while (!RE2::FullMatch(line, expectedLine.line)) {
potentialLines.push_back(expectedLine.line);
if (!expectedLine.optional) {
ASSERT_FALSE(true) << "Output did not match " << "Source:" << testName
<< ", Line number:" << lineCount
<< ", Line: " << line << ", Expected Line one of: "
<< folly::join(",", potentialLines);
}
expectedLine = expectedRegex.at(expectedLineIndex++);
}
}
for (int i = expectedLineIndex; i < expectedRegex.size(); i++) {
ASSERT_TRUE(expectedRegex[expectedLineIndex].optional);
}
}

void ensureTaskCompletion(exec::Task* task) {
// ASSERT_TRUE requires a function with return type void.
ASSERT_TRUE(waitForTaskCompletion(task));
Expand Down Expand Up @@ -93,7 +129,7 @@ TEST_F(PrintPlanWithStatsTest, innerJoinWithTableScan) {
"SELECT t.c0, t.c1 + 1, t.c1 + u.c1 FROM t, u WHERE t.c0 = u.c0");

ensureTaskCompletion(task.get());
OutputMatcher::compareOutputs(
compareOutputs(
::testing::UnitTest::GetInstance()->current_test_info()->name(),
printPlanWithStats(*op, task->taskStats()),
{{"-- Project\\[4\\]\\[expressions: \\(c0:INTEGER, ROW\\[\"c0\"\\]\\), \\(p1:BIGINT, plus\\(ROW\\[\"c1\"\\],1\\)\\), \\(p2:BIGINT, plus\\(ROW\\[\"c1\"\\],ROW\\[\"u_c1\"\\]\\)\\)\\] -> c0:INTEGER, p1:BIGINT, p2:BIGINT"},
Expand All @@ -110,7 +146,7 @@ TEST_F(PrintPlanWithStatsTest, innerJoinWithTableScan) {
{" Input: 0 rows \\(.+\\), Output: 100 rows \\(.+\\), Cpu time: .+, Blocked wall time: .+, Peak memory: 0B, Memory allocations: .+, Threads: 1, CPU breakdown: B/I/O/F (.+/.+/.+/.+)"}});

// with custom stats
OutputMatcher::compareOutputs(
compareOutputs(
::testing::UnitTest::GetInstance()->current_test_info()->name(),
printPlanWithStats(*op, task->taskStats(), true),
{{"-- Project\\[4\\]\\[expressions: \\(c0:INTEGER, ROW\\[\"c0\"\\]\\), \\(p1:BIGINT, plus\\(ROW\\[\"c1\"\\],1\\)\\), \\(p2:BIGINT, plus\\(ROW\\[\"c1\"\\],ROW\\[\"u_c1\"\\]\\)\\)\\] -> c0:INTEGER, p1:BIGINT, p2:BIGINT"},
Expand Down Expand Up @@ -216,15 +252,15 @@ TEST_F(PrintPlanWithStatsTest, partialAggregateWithTableScan) {
.assertResults(
"SELECT c5, max(c0), sum(c1), sum(c2), sum(c3), sum(c4) FROM tmp group by c5");
ensureTaskCompletion(task.get());
OutputMatcher::compareOutputs(
compareOutputs(
::testing::UnitTest::GetInstance()->current_test_info()->name(),
printPlanWithStats(*op, task->taskStats()),
{{"-- Aggregation\\[1\\]\\[PARTIAL \\[c5\\] a0 := max\\(ROW\\[\"c0\"\\]\\), a1 := sum\\(ROW\\[\"c1\"\\]\\), a2 := sum\\(ROW\\[\"c2\"\\]\\), a3 := sum\\(ROW\\[\"c3\"\\]\\), a4 := sum\\(ROW\\[\"c4\"\\]\\)\\] -> c5:VARCHAR, a0:BIGINT, a1:BIGINT, a2:BIGINT, a3:DOUBLE, a4:DOUBLE"},
{" Output: .+, Cpu time: .+, Blocked wall time: .+, Peak memory: .+, Memory allocations: .+, Threads: 1, CPU breakdown: B/I/O/F (.+/.+/.+/.+)"},
{" -- TableScan\\[0\\]\\[table: hive_table\\] -> c0:BIGINT, c1:INTEGER, c2:SMALLINT, c3:REAL, c4:DOUBLE, c5:VARCHAR"},
{" Input: 10000 rows \\(.+\\), Output: 10000 rows \\(.+\\), Cpu time: .+, Blocked wall time: .+, Peak memory: .+, Memory allocations: .+, Threads: 1, Splits: 1, CPU breakdown: B/I/O/F (.+/.+/.+/.+)"}});

OutputMatcher::compareOutputs(
compareOutputs(
::testing::UnitTest::GetInstance()->current_test_info()->name(),
printPlanWithStats(*op, task->taskStats(), true),
{{"-- Aggregation\\[1\\]\\[PARTIAL \\[c5\\] a0 := max\\(ROW\\[\"c0\"\\]\\), a1 := sum\\(ROW\\[\"c1\"\\]\\), a2 := sum\\(ROW\\[\"c2\"\\]\\), a3 := sum\\(ROW\\[\"c3\"\\]\\), a4 := sum\\(ROW\\[\"c4\"\\]\\)\\] -> c5:VARCHAR, a0:BIGINT, a1:BIGINT, a2:BIGINT, a3:DOUBLE, a4:DOUBLE"},
Expand Down Expand Up @@ -289,15 +325,15 @@ TEST_F(PrintPlanWithStatsTest, tableWriterWithTableScan) {
.splits(makeHiveConnectorSplits({filePath}))
.copyResults(pool(), task);
ensureTaskCompletion(task.get());
OutputMatcher::compareOutputs(
compareOutputs(
::testing::UnitTest::GetInstance()->current_test_info()->name(),
printPlanWithStats(*writePlan, task->taskStats()),
{{R"(-- TableWrite\[1\]\[.+InsertTableHandle .+)"},
{" Output: .+, Physical written output: .+, Cpu time: .+, Blocked wall time: .+, Peak memory: .+, Memory allocations: .+, Threads: 1, CPU breakdown: B/I/O/F (.+/.+/.+/.+)"},
{R"( -- TableScan\[0\]\[table: hive_table\] -> c0:BIGINT, c1:INTEGER, c2:SMALLINT, c3:REAL, c4:DOUBLE, c5:VARCHAR)"},
{R"( Input: 100 rows \(.+\), Output: 100 rows \(.+\), Cpu time: .+, Blocked wall time: .+, Peak memory: .+, Memory allocations: .+, Threads: 1, Splits: 1, CPU breakdown: B/I/O/F (.+/.+/.+/.+))"}});

OutputMatcher::compareOutputs(
compareOutputs(
::testing::UnitTest::GetInstance()->current_test_info()->name(),
printPlanWithStats(*writePlan, task->taskStats(), true),
{{R"(-- TableWrite\[1\]\[.+InsertTableHandle .+)"},
Expand Down

0 comments on commit d6c7a2f

Please sign in to comment.