From 6f189b052a6d9a3bde653a817bbcd2eb2919bb58 Mon Sep 17 00:00:00 2001 From: Daniel Munoz Date: Wed, 6 Mar 2024 20:50:26 -0800 Subject: [PATCH] Merge actions support (#8982) Summary: Pull Request resolved: https://github.com/facebookincubator/velox/pull/8982 Let's support different kind of mergings. Reviewed By: Yuhta Differential Revision: D54605507 fbshipit-source-id: 9dc95de37f1c645109688e97eb4d46cef90ad3ad --- velox/dwio/common/ResultOrActions.h | 96 ++++++++--- .../common/tests/ResultOrActionsTests.cpp | 151 +++++++++++++++++- 2 files changed, 225 insertions(+), 22 deletions(-) diff --git a/velox/dwio/common/ResultOrActions.h b/velox/dwio/common/ResultOrActions.h index 1b126353c421..539eb7e681ae 100644 --- a/velox/dwio/common/ResultOrActions.h +++ b/velox/dwio/common/ResultOrActions.h @@ -83,20 +83,85 @@ class ResultOrActions { return std::get(resultOrActions_); } - folly::Range*> actions() { + std::vector>& actions() { switch (resultOrActions_.index()) { case 0: // Actions - return { - std::get>>( - resultOrActions_) - .data(), - std::get>>( - resultOrActions_) - .size()}; + return std::get>>( + resultOrActions_); + case 1: // Result + VELOX_FAIL("Can't get actions of class that has a result"); + default: + VELOX_UNREACHABLE(); + } + } + + const std::vector>& actions() const { + switch (resultOrActions_.index()) { + case 0: // Actions + return std::get>>( + resultOrActions_); case 1: // Result - return {}; + VELOX_FAIL("Can't get actions of class that has a result"); default: - VELOX_FAIL("Unexpected variant index"); + VELOX_UNREACHABLE(); + } + } + + // Merge all contained actions into one, so that they can't be executed in + // parallel later. + void mergeActions() { + VELOX_CHECK( + resultOrActions_.index() == 0, + "Can't merge actions of an object that has a result"); + resultOrActions_ = std::vector>{ + [actions = std::move(actions())]() { + for (auto& action : actions) { + action(); + } + }}; + } + + void moveActionsBack( + std::vector>&& otherActions) { + VELOX_CHECK( + resultOrActions_.index() == 0, + "Can't move actions to an object that has a result"); + auto& thisActions = + std::get>>(resultOrActions_); + if (thisActions.empty()) { + thisActions = std::move(otherActions); + } else { + thisActions.reserve(thisActions.size() + otherActions.size()); + std::move( + otherActions.begin(), + otherActions.end(), + std::back_inserter(thisActions)); + } + } + + void moveActionsFront( + std::vector>&& otherActions) { + VELOX_CHECK( + resultOrActions_.index() == 0, + "Can't move actions to an object that has a result"); + auto& thisActions = + std::get>>(resultOrActions_); + if (thisActions.empty()) { + thisActions = std::move(otherActions); + } else { + // Extend thisActions by otherActions.size() elements and pad with nullptr + // functions. + thisActions.resize(thisActions.size() + otherActions.size()); + + // Move right by otherActions.size() elements. + std::move( + thisActions.rbegin() + otherActions.size(), + thisActions.rend(), + thisActions.rbegin()); + + // Move elements from otherActions to the beginning of thisActions. + // nullptr functions will be overwritten. + std::move(otherActions.begin(), otherActions.end(), thisActions.begin()); } } @@ -125,17 +190,6 @@ class ResultOrActions { template friend class ResultOrActions; - void mergeAction(std::function action) { - switch (resultOrActions_.index()) { - case 0: // Actions - std::get>>(resultOrActions_) - .push_back(std::move(action)); - break; - case 1: // Result - VELOX_FAIL("Can't merge actions if destination class has a result"); - } - } - std::variant>, ResultType> resultOrActions_; }; diff --git a/velox/dwio/common/tests/ResultOrActionsTests.cpp b/velox/dwio/common/tests/ResultOrActionsTests.cpp index 7f8808c10cc7..86e524e6505a 100644 --- a/velox/dwio/common/tests/ResultOrActionsTests.cpp +++ b/velox/dwio/common/tests/ResultOrActionsTests.cpp @@ -58,7 +58,11 @@ TEST(ResultOrActionsTest, HasResult) { ReaderResult readerResult(10); ASSERT_TRUE(readerResult.hasResult()); EXPECT_EQ(readerResult.result(), 10); - EXPECT_EQ(readerResult.actions().size(), 0); + EXPECT_THAT( + [&]() { readerResult.actions(); }, + Throws(Property( + &facebook::velox::VeloxRuntimeError::message, + HasSubstr("Can't get actions of class that has a result")))); } TEST(ResultOrActionsTest, Void) { @@ -81,6 +85,58 @@ TEST(ResultOrActionsTest, Void) { } } +TEST(ResultOrActionsTest, MoveBackToResult) { + SplitResult splitResult(1); + ActionsMock executedActions; + + ReaderResult readerResult(getAction(executedActions)); + + EXPECT_THAT( + [&]() { splitResult.moveActionsBack(std::move(readerResult.actions())); }, + Throws(Property( + &facebook::velox::VeloxRuntimeError::message, + HasSubstr("Can't move actions to an object that has a result")))); +} + +TEST(ResultOrActionsTest, MoveFrontToResult) { + SplitResult splitResult(1); + ActionsMock executedActions; + + ReaderResult readerResult(getAction(executedActions)); + + EXPECT_THAT( + [&]() { + splitResult.moveActionsFront(std::move(readerResult.actions())); + }, + Throws(Property( + &facebook::velox::VeloxRuntimeError::message, + HasSubstr("Can't move actions to an object that has a result")))); +} + +TEST(ResultOrActionsTest, MoveBackFromResult) { + SplitResult splitResult(1); + ReaderResult readerResult(1); + + EXPECT_THAT( + [&]() { splitResult.moveActionsBack(std::move(readerResult.actions())); }, + Throws(Property( + &facebook::velox::VeloxRuntimeError::message, + HasSubstr("Can't get actions of class that has a result")))); +} + +TEST(ResultOrActionsTest, MoveFrontFromResult) { + SplitResult splitResult(1); + ReaderResult readerResult(1); + + EXPECT_THAT( + [&]() { + splitResult.moveActionsFront(std::move(readerResult.actions())); + }, + Throws(Property( + &facebook::velox::VeloxRuntimeError::message, + HasSubstr("Can't get actions of class that has a result")))); +} + using ReaderResultTypes = ::testing::Types; TYPED_TEST_SUITE(ResultOrActionsTypedTest, ReaderResultTypes); @@ -138,6 +194,99 @@ TYPED_TEST(ResultOrActionsTypedTest, ActionsNeeded) { EXPECT_EQ(executedActions, ActionsMock({2, 2})); } +TYPED_TEST(ResultOrActionsTypedTest, MoveActionsBackIncremental) { + SplitResult splitResult; + ActionsMock executedActions; + { + ReaderResult readerResult(getAction(executedActions)); + splitResult.moveActionsBack(std::move(readerResult.actions())); + } + ASSERT_EQ(splitResult.actions().size(), 1); + EXPECT_EQ(executedActions, ActionsMock({0})); + splitResult.actions()[0](); + EXPECT_EQ(executedActions, ActionsMock({1})); + { + ReaderResult readerResult(getAction(executedActions)); + splitResult.moveActionsBack(std::move(readerResult.actions())); + } + ASSERT_EQ(splitResult.actions().size(), 2); + EXPECT_EQ(executedActions, ActionsMock({1, 0})); + splitResult.actions()[0](); + EXPECT_EQ(executedActions, ActionsMock({2, 0})); + splitResult.actions()[1](); + EXPECT_EQ(executedActions, ActionsMock({2, 1})); + { + ReaderResult readerResult(getActions(executedActions, 2)); + splitResult.moveActionsBack(std::move(readerResult.actions())); + } + ASSERT_EQ(splitResult.actions().size(), 4); + EXPECT_EQ(executedActions, ActionsMock({2, 1, 0, 0})); + splitResult.actions()[2](); + EXPECT_EQ(executedActions, ActionsMock({2, 1, 1, 0})); + splitResult.actions()[3](); + EXPECT_EQ(executedActions, ActionsMock({2, 1, 1, 1})); + EXPECT_EQ(splitResult.runAllActions(), 4); + EXPECT_EQ(executedActions, ActionsMock({3, 2, 2, 2})); +} + +TYPED_TEST(ResultOrActionsTypedTest, MoveActionsFrontIncremental) { + SplitResult splitResult; + ActionsMock executedActions; + { + ReaderResult readerResult(getAction(executedActions)); + splitResult.moveActionsFront(std::move(readerResult.actions())); + } + ASSERT_EQ(splitResult.actions().size(), 1); + EXPECT_EQ(executedActions, ActionsMock({0})); + splitResult.actions()[0](); + EXPECT_EQ(executedActions, ActionsMock({1})); + { + ReaderResult readerResult(getAction(executedActions)); + splitResult.moveActionsFront(std::move(readerResult.actions())); + } + ASSERT_EQ(splitResult.actions().size(), 2); + EXPECT_EQ(executedActions, ActionsMock({1, 0})); + splitResult.actions()[0](); + EXPECT_EQ(executedActions, ActionsMock({1, 1})); + splitResult.actions()[1](); + EXPECT_EQ(executedActions, ActionsMock({2, 1})); + { + ReaderResult readerResult(getActions(executedActions, 2)); + splitResult.moveActionsFront(std::move(readerResult.actions())); + } + ASSERT_EQ(splitResult.actions().size(), 4); + EXPECT_EQ(executedActions, ActionsMock({2, 1, 0, 0})); + splitResult.actions()[0](); + EXPECT_EQ(executedActions, ActionsMock({2, 1, 1, 0})); + splitResult.actions()[1](); + EXPECT_EQ(executedActions, ActionsMock({2, 1, 1, 1})); + // Actions 2 and 3 were already rotated in the second moveActionsFront, so + // they don't map to indices 0 and 1, but to 1 and 0. + splitResult.actions()[2](); + EXPECT_EQ(executedActions, ActionsMock({2, 2, 1, 1})); + splitResult.actions()[3](); + EXPECT_EQ(executedActions, ActionsMock({3, 2, 1, 1})); + EXPECT_EQ(splitResult.runAllActions(), 4); + EXPECT_EQ(executedActions, ActionsMock({4, 3, 2, 2})); +} + +TYPED_TEST(ResultOrActionsTypedTest, MergeActions) { + ActionsMock executedActions; + ReaderResult readerResult(getActions(executedActions, 2)); + EXPECT_EQ(readerResult.actions().size(), 2); + EXPECT_EQ(executedActions, ActionsMock({0, 0})); + readerResult.actions()[0](); + EXPECT_EQ(executedActions, ActionsMock({1, 0})); + readerResult.actions()[1](); + EXPECT_EQ(executedActions, ActionsMock({1, 1})); + readerResult.mergeActions(); + EXPECT_EQ(readerResult.actions().size(), 1); + readerResult.actions()[0](); + EXPECT_EQ(executedActions, ActionsMock({2, 2})); + EXPECT_EQ(readerResult.runAllActions(), 1); + EXPECT_EQ(executedActions, ActionsMock({3, 3})); +} + TYPED_TEST(ResultOrActionsTypedTest, MoveConstructor) { ActionsMock executedActions; ReaderResult readerResult(getActions(executedActions, 2));