Skip to content

Commit

Permalink
[#214] Implemented force fetch-store ops
Browse files Browse the repository at this point in the history
  • Loading branch information
VitoCastellana committed Mar 6, 2022
1 parent cebc6cc commit b96ab4c
Show file tree
Hide file tree
Showing 2 changed files with 145 additions and 0 deletions.
85 changes: 85 additions & 0 deletions include/shad/data_structures/atomic.h
Original file line number Diff line number Diff line change
Expand Up @@ -179,6 +179,48 @@ class Atomic : public AbstractDataStructure<Atomic<T>> {
rt::executeAt(ownerLoc_, StoreFun, args);
}


/// @brief Async Atomic Fetch-Store. Attempts at atomically storing
/// the results of binop unitil succesful.
///
/// @tparam ArgT Type of rhs for the BinaryOp operator.
/// @tparam BinaryOp User-defined binary operator T (const T& lhs, const ArgT& rhs).
/// @param[in,out] h The handle to be used to wait for completion.
/// @param[in] desired_arg Non atomic rhs value for binop.
/// @param[in] binop Binary operator. lhs is atomic's value, rhs is desired_arg.
/// @return The value fetched when Store was successful.
template <typename ArgT, typename BinaryOp>
T ForceFetchStore(ArgT desired_arg, BinaryOp binop) {
if (ownerLoc_ == rt::thisLocality()) {
auto old_value = localInstance_.load();
T desired = binop(old_value, desired_arg);
while(!atomic_compare_exchange_weak(&localInstance_,
&old_value, desired)) {
old_value = localInstance_.load();
desired = binop(old_value, desired_arg);
}
return old_value;
}
using StoreArgs = std::tuple<ObjectID, ArgT, BinaryOp>;
auto StoreFun = [](const StoreArgs &args, T*res) {
auto ptr = Atomic<T>::GetPtr(std::get<0>(args));
auto old_value = ptr->localInstance_.load();
auto desired_arg = std::get<1>(args);
auto binop = std::get<2>(args);
T desired = binop(old_value, desired_arg);
while(!atomic_compare_exchange_weak(&ptr->localInstance_,
&old_value, desired)) {
old_value = ptr->localInstance_.load();
desired = binop(old_value, desired_arg);
}
*res = old_value;
};
StoreArgs args(oid_, desired_arg, binop);
T res;
rt::executeAtWithRet(ownerLoc_, StoreFun, args, &res);
return res;
}

/// @brief Async Atomic Store.
///
/// @param[in,out] h The handle to be used to wait for completion.
Expand Down Expand Up @@ -266,6 +308,49 @@ class Atomic : public AbstractDataStructure<Atomic<T>> {
rt::asyncExecuteAt(h, ownerLoc_, StoreFun, args);
}

/// @brief Async Atomic Fetch-Store. Attempts at atomically storing
/// the results of binop unitil succesful.
///
/// @tparam ArgT Type of rhs for the BinaryOp operator.
/// @tparam BinaryOp User-defined binary operator T (const T& lhs, const ArgT& rhs).
/// @param[in,out] h The handle to be used to wait for completion.
/// @param[in] desired_arg Non atomic rhs value for binop.
/// @param[in] binop Binary operator. lhs is atomic's value, rhs is desired_arg.
/// @param[out] res Pointer to the region where the result is
/// written; res must point to a valid memory allocation.
/// Result is the value fetched when Store was successful
template <typename ArgT, typename BinaryOp>
void AsyncForceFetchStore(rt::Handle &h, ArgT desired_arg,
BinaryOp binop, T* res) {
if (ownerLoc_ == rt::thisLocality()) {
auto old_value = localInstance_.load();
T desired = binop(old_value, desired_arg);
while(!atomic_compare_exchange_weak(&localInstance_,
&old_value, desired)) {
old_value = localInstance_.load();
desired = binop(old_value, desired_arg);
}
*res = old_value;
return;
}
using StoreArgs = std::tuple<ObjectID, ArgT, BinaryOp>;
auto StoreFun = [](rt::Handle&, const StoreArgs &args, T* res) {
auto ptr = Atomic<T>::GetPtr(std::get<0>(args));
auto old_value = ptr->localInstance_.load();
auto desired_arg = std::get<1>(args);
auto binop = std::get<2>(args);
T desired = binop(old_value, desired_arg);
while(!atomic_compare_exchange_weak(&ptr->localInstance_,
&old_value, desired)) {
old_value = ptr->localInstance_.load();
desired = binop(old_value, desired_arg);
}
*res = old_value;
};
StoreArgs args(oid_, desired_arg, binop);
rt::asyncExecuteAtWithRet(h, ownerLoc_, StoreFun, args, res);
}

/// @brief Compare and exchange operation.
///
/// @param[in] expected Value expected to be found in the atomic object.
Expand Down
60 changes: 60 additions & 0 deletions test/unit_tests/data_structures/atomic_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -258,6 +258,66 @@ TEST_F(AtomicTest, AsyncForceStore) {
destroy(ptrs);
}


TEST_F(AtomicTest, ForceFetchStore) {
auto locs = shad::rt::allLocalities();
std::vector<atomicPtr> ptrs(locs.size());
std::vector<int64_t> results(locs.size());
int64_t cnt = 0;
for (auto loc : locs) {
ptrs[cnt] = shad::Atomic<int64_t>::Create(kInitValue+cnt, loc);
results[cnt] = ptrs[cnt]->ForceFetchStore(cnt,
[](auto& a, auto& b) {return std::min(a, b);});
++cnt;
}

shad::rt::Handle h;
cnt = 0;
std::vector<int64_t> values(locs.size());
for (auto ptr : ptrs) {
ptr->AsyncLoad(h, &values[cnt]);
++cnt;
}
shad::rt::waitForCompletion(h);
cnt = 0;
for (int64_t value : values) {
ASSERT_EQ(value, cnt);
ASSERT_EQ(results[cnt], kInitValue+cnt);
++cnt;
}
destroy(ptrs);
}

TEST_F(AtomicTest, AsyncForceFetchStore) {
auto locs = shad::rt::allLocalities();
std::vector<atomicPtr> ptrs(locs.size());
std::vector<int64_t> results(locs.size());
int64_t cnt = 0;
shad::rt::Handle h;
for (auto loc : locs) {
ptrs[cnt] = shad::Atomic<int64_t>::Create(cnt, loc);
ptrs[cnt]->AsyncForceFetchStore(h, kInitValue + cnt,
[](auto& a, auto& b) {return std::max(a, b);},
&results[cnt]);
++cnt;
}
shad::rt::waitForCompletion(h);
cnt = 0;
std::vector<int64_t> values(locs.size());
for (auto ptr : ptrs) {
ptr->AsyncLoad(h, &values[cnt]);
++cnt;
}
shad::rt::waitForCompletion(h);
cnt = 0;
for (int64_t value : values) {
ASSERT_EQ(value, kInitValue+cnt);
ASSERT_EQ(results[cnt], cnt);
++cnt;
}
destroy(ptrs);
}

TEST_F(AtomicTest, SyncFetchAdd) {
auto locs = shad::rt::allLocalities();
std::vector<atomicPtr> ptrs(locs.size());
Expand Down

0 comments on commit b96ab4c

Please sign in to comment.