Skip to content

Commit

Permalink
Merge actions support (#8982)
Browse files Browse the repository at this point in the history
Summary:
Pull Request resolved: #8982

Let's support different kind of mergings.

Reviewed By: Yuhta

Differential Revision: D54605507

fbshipit-source-id: 9dc95de37f1c645109688e97eb4d46cef90ad3ad
  • Loading branch information
Daniel Munoz authored and facebook-github-bot committed Mar 7, 2024
1 parent 09ba220 commit 6f189b0
Show file tree
Hide file tree
Showing 2 changed files with 225 additions and 22 deletions.
96 changes: 75 additions & 21 deletions velox/dwio/common/ResultOrActions.h
Original file line number Diff line number Diff line change
Expand Up @@ -83,20 +83,85 @@ class ResultOrActions {
return std::get<ResultType>(resultOrActions_);
}

folly::Range<std::function<ActionSignature>*> actions() {
std::vector<std::function<ActionSignature>>& actions() {
switch (resultOrActions_.index()) {
case 0: // Actions
return {
std::get<std::vector<std::function<ActionSignature>>>(
resultOrActions_)
.data(),
std::get<std::vector<std::function<ActionSignature>>>(
resultOrActions_)
.size()};
return std::get<std::vector<std::function<ActionSignature>>>(
resultOrActions_);
case 1: // Result
VELOX_FAIL("Can't get actions of class that has a result");
default:
VELOX_UNREACHABLE();
}
}

const std::vector<std::function<ActionSignature>>& actions() const {
switch (resultOrActions_.index()) {
case 0: // Actions
return std::get<const std::vector<std::function<ActionSignature>>>(
resultOrActions_);
case 1: // Result
return {};
VELOX_FAIL("Can't get actions of class that has a result");
default:
VELOX_FAIL("Unexpected variant index");
VELOX_UNREACHABLE();
}
}

// Merge all contained actions into one, so that they can't be executed in
// parallel later.
void mergeActions() {
VELOX_CHECK(
resultOrActions_.index() == 0,
"Can't merge actions of an object that has a result");
resultOrActions_ = std::vector<std::function<ActionSignature>>{
[actions = std::move(actions())]() {
for (auto& action : actions) {
action();
}
}};
}

void moveActionsBack(
std::vector<std::function<ActionSignature>>&& otherActions) {
VELOX_CHECK(
resultOrActions_.index() == 0,
"Can't move actions to an object that has a result");
auto& thisActions =
std::get<std::vector<std::function<ActionSignature>>>(resultOrActions_);
if (thisActions.empty()) {
thisActions = std::move(otherActions);
} else {
thisActions.reserve(thisActions.size() + otherActions.size());
std::move(
otherActions.begin(),
otherActions.end(),
std::back_inserter(thisActions));
}
}

void moveActionsFront(
std::vector<std::function<ActionSignature>>&& otherActions) {
VELOX_CHECK(
resultOrActions_.index() == 0,
"Can't move actions to an object that has a result");
auto& thisActions =
std::get<std::vector<std::function<ActionSignature>>>(resultOrActions_);
if (thisActions.empty()) {
thisActions = std::move(otherActions);
} else {
// Extend thisActions by otherActions.size() elements and pad with nullptr
// functions.
thisActions.resize(thisActions.size() + otherActions.size());

// Move right by otherActions.size() elements.
std::move(
thisActions.rbegin() + otherActions.size(),
thisActions.rend(),
thisActions.rbegin());

// Move elements from otherActions to the beginning of thisActions.
// nullptr functions will be overwritten.
std::move(otherActions.begin(), otherActions.end(), thisActions.begin());
}
}

Expand Down Expand Up @@ -125,17 +190,6 @@ class ResultOrActions {
template <typename OtherResultType, typename OtherActionSignature>
friend class ResultOrActions;

void mergeAction(std::function<ActionSignature> action) {
switch (resultOrActions_.index()) {
case 0: // Actions
std::get<std::vector<std::function<ActionSignature>>>(resultOrActions_)
.push_back(std::move(action));
break;
case 1: // Result
VELOX_FAIL("Can't merge actions if destination class has a result");
}
}

std::variant<std::vector<std::function<ActionSignature>>, ResultType>
resultOrActions_;
};
Expand Down
151 changes: 150 additions & 1 deletion velox/dwio/common/tests/ResultOrActionsTests.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,11 @@ TEST(ResultOrActionsTest, HasResult) {
ReaderResult readerResult(10);
ASSERT_TRUE(readerResult.hasResult());
EXPECT_EQ(readerResult.result(), 10);
EXPECT_EQ(readerResult.actions().size(), 0);
EXPECT_THAT(
[&]() { readerResult.actions(); },
Throws<facebook::velox::VeloxRuntimeError>(Property(
&facebook::velox::VeloxRuntimeError::message,
HasSubstr("Can't get actions of class that has a result"))));
}

TEST(ResultOrActionsTest, Void) {
Expand All @@ -81,6 +85,58 @@ TEST(ResultOrActionsTest, Void) {
}
}

TEST(ResultOrActionsTest, MoveBackToResult) {
SplitResult splitResult(1);
ActionsMock executedActions;

ReaderResult readerResult(getAction(executedActions));

EXPECT_THAT(
[&]() { splitResult.moveActionsBack(std::move(readerResult.actions())); },
Throws<facebook::velox::VeloxRuntimeError>(Property(
&facebook::velox::VeloxRuntimeError::message,
HasSubstr("Can't move actions to an object that has a result"))));
}

TEST(ResultOrActionsTest, MoveFrontToResult) {
SplitResult splitResult(1);
ActionsMock executedActions;

ReaderResult readerResult(getAction(executedActions));

EXPECT_THAT(
[&]() {
splitResult.moveActionsFront(std::move(readerResult.actions()));
},
Throws<facebook::velox::VeloxRuntimeError>(Property(
&facebook::velox::VeloxRuntimeError::message,
HasSubstr("Can't move actions to an object that has a result"))));
}

TEST(ResultOrActionsTest, MoveBackFromResult) {
SplitResult splitResult(1);
ReaderResult readerResult(1);

EXPECT_THAT(
[&]() { splitResult.moveActionsBack(std::move(readerResult.actions())); },
Throws<facebook::velox::VeloxRuntimeError>(Property(
&facebook::velox::VeloxRuntimeError::message,
HasSubstr("Can't get actions of class that has a result"))));
}

TEST(ResultOrActionsTest, MoveFrontFromResult) {
SplitResult splitResult(1);
ReaderResult readerResult(1);

EXPECT_THAT(
[&]() {
splitResult.moveActionsFront(std::move(readerResult.actions()));
},
Throws<facebook::velox::VeloxRuntimeError>(Property(
&facebook::velox::VeloxRuntimeError::message,
HasSubstr("Can't get actions of class that has a result"))));
}

using ReaderResultTypes = ::testing::Types<ReaderResult, VoidResult>;
TYPED_TEST_SUITE(ResultOrActionsTypedTest, ReaderResultTypes);

Expand Down Expand Up @@ -138,6 +194,99 @@ TYPED_TEST(ResultOrActionsTypedTest, ActionsNeeded) {
EXPECT_EQ(executedActions, ActionsMock({2, 2}));
}

TYPED_TEST(ResultOrActionsTypedTest, MoveActionsBackIncremental) {
SplitResult splitResult;
ActionsMock executedActions;
{
ReaderResult readerResult(getAction(executedActions));
splitResult.moveActionsBack(std::move(readerResult.actions()));
}
ASSERT_EQ(splitResult.actions().size(), 1);
EXPECT_EQ(executedActions, ActionsMock({0}));
splitResult.actions()[0]();
EXPECT_EQ(executedActions, ActionsMock({1}));
{
ReaderResult readerResult(getAction(executedActions));
splitResult.moveActionsBack(std::move(readerResult.actions()));
}
ASSERT_EQ(splitResult.actions().size(), 2);
EXPECT_EQ(executedActions, ActionsMock({1, 0}));
splitResult.actions()[0]();
EXPECT_EQ(executedActions, ActionsMock({2, 0}));
splitResult.actions()[1]();
EXPECT_EQ(executedActions, ActionsMock({2, 1}));
{
ReaderResult readerResult(getActions(executedActions, 2));
splitResult.moveActionsBack(std::move(readerResult.actions()));
}
ASSERT_EQ(splitResult.actions().size(), 4);
EXPECT_EQ(executedActions, ActionsMock({2, 1, 0, 0}));
splitResult.actions()[2]();
EXPECT_EQ(executedActions, ActionsMock({2, 1, 1, 0}));
splitResult.actions()[3]();
EXPECT_EQ(executedActions, ActionsMock({2, 1, 1, 1}));
EXPECT_EQ(splitResult.runAllActions(), 4);
EXPECT_EQ(executedActions, ActionsMock({3, 2, 2, 2}));
}

TYPED_TEST(ResultOrActionsTypedTest, MoveActionsFrontIncremental) {
SplitResult splitResult;
ActionsMock executedActions;
{
ReaderResult readerResult(getAction(executedActions));
splitResult.moveActionsFront(std::move(readerResult.actions()));
}
ASSERT_EQ(splitResult.actions().size(), 1);
EXPECT_EQ(executedActions, ActionsMock({0}));
splitResult.actions()[0]();
EXPECT_EQ(executedActions, ActionsMock({1}));
{
ReaderResult readerResult(getAction(executedActions));
splitResult.moveActionsFront(std::move(readerResult.actions()));
}
ASSERT_EQ(splitResult.actions().size(), 2);
EXPECT_EQ(executedActions, ActionsMock({1, 0}));
splitResult.actions()[0]();
EXPECT_EQ(executedActions, ActionsMock({1, 1}));
splitResult.actions()[1]();
EXPECT_EQ(executedActions, ActionsMock({2, 1}));
{
ReaderResult readerResult(getActions(executedActions, 2));
splitResult.moveActionsFront(std::move(readerResult.actions()));
}
ASSERT_EQ(splitResult.actions().size(), 4);
EXPECT_EQ(executedActions, ActionsMock({2, 1, 0, 0}));
splitResult.actions()[0]();
EXPECT_EQ(executedActions, ActionsMock({2, 1, 1, 0}));
splitResult.actions()[1]();
EXPECT_EQ(executedActions, ActionsMock({2, 1, 1, 1}));
// Actions 2 and 3 were already rotated in the second moveActionsFront, so
// they don't map to indices 0 and 1, but to 1 and 0.
splitResult.actions()[2]();
EXPECT_EQ(executedActions, ActionsMock({2, 2, 1, 1}));
splitResult.actions()[3]();
EXPECT_EQ(executedActions, ActionsMock({3, 2, 1, 1}));
EXPECT_EQ(splitResult.runAllActions(), 4);
EXPECT_EQ(executedActions, ActionsMock({4, 3, 2, 2}));
}

TYPED_TEST(ResultOrActionsTypedTest, MergeActions) {
ActionsMock executedActions;
ReaderResult readerResult(getActions(executedActions, 2));
EXPECT_EQ(readerResult.actions().size(), 2);
EXPECT_EQ(executedActions, ActionsMock({0, 0}));
readerResult.actions()[0]();
EXPECT_EQ(executedActions, ActionsMock({1, 0}));
readerResult.actions()[1]();
EXPECT_EQ(executedActions, ActionsMock({1, 1}));
readerResult.mergeActions();
EXPECT_EQ(readerResult.actions().size(), 1);
readerResult.actions()[0]();
EXPECT_EQ(executedActions, ActionsMock({2, 2}));
EXPECT_EQ(readerResult.runAllActions(), 1);
EXPECT_EQ(executedActions, ActionsMock({3, 3}));
}

TYPED_TEST(ResultOrActionsTypedTest, MoveConstructor) {
ActionsMock executedActions;
ReaderResult readerResult(getActions(executedActions, 2));
Expand Down

0 comments on commit 6f189b0

Please sign in to comment.