From 4cc1cec0b10fd383877c87d1edf3e658e5d10479 Mon Sep 17 00:00:00 2001 From: Vito G Castellana Date: Thu, 3 Feb 2022 20:55:54 -0800 Subject: [PATCH 1/2] [#206] Implemented AsyncGetElements with DMA --- include/shad/data_structures/array.h | 52 ++++++++++++++++++- test/unit_tests/data_structures/array_test.cc | 33 +++++++++++- 2 files changed, 82 insertions(+), 3 deletions(-) diff --git a/include/shad/data_structures/array.h b/include/shad/data_structures/array.h index f17fab50..169ecd93 100644 --- a/include/shad/data_structures/array.h +++ b/include/shad/data_structures/array.h @@ -458,6 +458,21 @@ class Array : public AbstractDataStructure> { data_[std::get<0>(entry)] = std::get<1>(entry); } + constexpr void FillPtrs() { + rt::executeOnAll([](const ObjectID &oid) { + auto This = Array::GetPtr(oid); + rt::executeOnAll([](const std::tuple &args) { + auto This = Array::GetPtr(std::get<0>(args)); + + This->ptrs_[(uint32_t)std::get<1>(args)] = std::get<2>(args); + }, + std::make_tuple(This->GetGlobalID(), rt::thisLocality(), This->data_.data())); + }, GetGlobalID()); + } + + void AsyncGetElements(rt::Handle& h, T* local_data, + const uint64_t idx, const uint64_t num_el); + protected: Array(ObjectID oid, size_t size, const T &initValue) : oid_(oid), @@ -467,7 +482,8 @@ class Array : public AbstractDataStructure> { : rt::numLocalities() - (size % rt::numLocalities())), data_(), dataDistribution_(), - buffers_(oid) { + buffers_(oid), + ptrs_(rt::numLocalities()) { rt::Locality pivot(pivot_); size_t start = 0; size_t chunkSize = size / rt::numLocalities(); @@ -498,6 +514,7 @@ class Array : public AbstractDataStructure> { std::vector data_; std::vector> dataDistribution_; BuffersVector buffers_; + std::vector ptrs_; struct InsertAtArgs { ObjectID oid; @@ -1073,6 +1090,39 @@ void Array::ForEach(ApplyFunT &&function, Args &... args) { rt::executeOnAll(feLambda, arguments); } +template +void Array::AsyncGetElements(rt::Handle& h, T* local_data, + const uint64_t idx, const uint64_t num_el) { + + size_t tgtPos = 0, firstPos = idx; + rt::Locality tgtLoc; + size_t remainingValues = num_el; + size_t chunkSize = 0; + T* tgtAddress; + + while (remainingValues > 0) { + if (firstPos < pivot_ * (size_ / rt::numLocalities())) { + tgtLoc = rt::Locality(firstPos / (size_ / rt::numLocalities())); + tgtPos = firstPos % (size_ / rt::numLocalities()); + chunkSize = + std::min((size_ / rt::numLocalities() - tgtPos), remainingValues); + } else { + size_t newPos = firstPos - (pivot_ * (size_ / rt::numLocalities())); + tgtLoc = + rt::Locality(pivot_ + newPos / ((size_ / rt::numLocalities() + 1))); + tgtPos = newPos % ((size_ / rt::numLocalities() + 1)); + chunkSize = + std::min((size_ / rt::numLocalities() + 1 - tgtPos), remainingValues); + } + + tgtAddress = ptrs_[(uint32_t)tgtLoc] + tgtPos; + rt::asyncDma(h, local_data, tgtLoc, tgtAddress, chunkSize); + local_data += chunkSize; + firstPos += chunkSize; + remainingValues -= chunkSize; + } +} + } // namespace shad #endif // INCLUDE_SHAD_DATA_STRUCTURES_ARRAY_H_ diff --git a/test/unit_tests/data_structures/array_test.cc b/test/unit_tests/data_structures/array_test.cc index 3fe491e6..d5fa6efb 100644 --- a/test/unit_tests/data_structures/array_test.cc +++ b/test/unit_tests/data_structures/array_test.cc @@ -122,7 +122,6 @@ TEST_F(ArrayTest, RangedAsyncInsertAndAsyncGet) { shad::rt::Handle handle; edsPtr->AsyncInsertAt(handle, 0, inputData_.data(), kArraySize); shad::rt::waitForCompletion(handle); - ; shad::rt::Handle handle2; for (size_t i = 0; i < kArraySize; i++) { @@ -136,6 +135,36 @@ TEST_F(ArrayTest, RangedAsyncInsertAndAsyncGet) { shad::Array::Destroy(edsPtr->GetGlobalID()); } +TEST_F(ArrayTest, RangedAsyncInsertAndAsyncGetElements) { + std::vector values(kArraySize); + + auto edsPtr = shad::Array::Create(kArraySize, kInitValue); + edsPtr->FillPtrs(); + shad::rt::Handle handle; + edsPtr->AsyncInsertAt(handle, 0, inputData_.data(), kArraySize); + shad::rt::waitForCompletion(handle); + + shad::rt::Handle handle2; + edsPtr->AsyncGetElements(handle2, values.data(), 0, kArraySize); + shad::rt::waitForCompletion(handle2); + + for (size_t i = 0; i < kArraySize; i++) { + ASSERT_EQ(values[i], i + 1); + } + + uint64_t to_insert2 = kArraySize/2; + uint64_t idx2 = kArraySize/6; + std::vector values2(to_insert2); + + edsPtr->AsyncGetElements(handle2, values2.data(), idx2, to_insert2); + shad::rt::waitForCompletion(handle2); + + for (size_t i = 0; i < to_insert2; i++) { + ASSERT_EQ(values2[i], i + idx2 + 1); + } + shad::Array::Destroy(edsPtr->GetGlobalID()); +} + TEST_F(ArrayTest, BufferedSyncInsertAndSyncGet) { auto edsPtr = shad::Array::Create(kArraySize, kInitValue); for (size_t i = 0; i < kArraySize; i++) { @@ -367,4 +396,4 @@ TEST_F(ArrayTest, AsyncInsertAsyncForEachAndAsyncGet) { ASSERT_EQ(values[i], i + 1 + (2 * kInitValue)); } shad::Array::Destroy(edsPtr->GetGlobalID()); -} +} \ No newline at end of file From c77ac933c485ce70605065617bd327c8adc39165 Mon Sep 17 00:00:00 2001 From: Vito G Castellana Date: Fri, 4 Feb 2022 19:06:19 -0800 Subject: [PATCH 2/2] [#206] Implemented AsyncApplyWithRetBuff in Array --- include/shad/data_structures/array.h | 48 +++++++++++++++++++ test/unit_tests/data_structures/array_test.cc | 41 ++++++++++++++++ 2 files changed, 89 insertions(+) diff --git a/include/shad/data_structures/array.h b/include/shad/data_structures/array.h index 169ecd93..a7c8082f 100644 --- a/include/shad/data_structures/array.h +++ b/include/shad/data_structures/array.h @@ -319,6 +319,11 @@ class Array : public AbstractDataStructure> { template void AsyncApply(rt::Handle &handle, const size_t pos, ApplyFunT &&function, Args &... args); + + template + void AsyncApplyWithRetBuff(rt::Handle &handle, const size_t pos, + ApplyFunT &&function, uint8_t* result, + uint32_t* resultSize, Args &... args); /// @brief Applies a user-defined function to every element /// in the specified range. @@ -617,6 +622,30 @@ class Array : public AbstractDataStructure> { std::get<4>(tuple), std::make_index_sequence{}); } + template + static void AsyncCallApplyWRBFun(rt::Handle &handle, ObjectID &oid, size_t pos, + size_t loffset, ApplyFunT function, + std::tuple &args, std::index_sequence, + uint8_t* result, uint32_t* resultSize) { + // Get a local instance on the remote node. + auto arrayPtr = Array::GetPtr(oid); + T &element = arrayPtr->data_[loffset]; + function(handle, pos, element, std::get(args)..., result, resultSize); + } + + template + static void AsyncApplyWRBFunWrapper(rt::Handle &handle, const Tuple &args, + uint8_t* result, uint32_t* resultSize) { + constexpr auto Size = std::tuple_size< + typename std::decay(args))>::type>::value; + + Tuple &tuple = const_cast(args); + + AsyncCallApplyWRBFun(handle, std::get<0>(tuple), std::get<1>(tuple), + std::get<2>(tuple), std::get<3>(tuple), + std::get<4>(tuple), std::make_index_sequence{}, result, resultSize); + } + template static void CallForEachInRangeFun(size_t i, ObjectID &oid, size_t pos, size_t lpos, ApplyFunT function, @@ -954,6 +983,25 @@ void Array::AsyncApply(rt::Handle &handle, const size_t pos, AsyncApplyFunWrapper, argsTuple); } +template +template +void Array::AsyncApplyWithRetBuff(rt::Handle &handle, const size_t pos, + ApplyFunT &&function, uint8_t* result, + uint32_t* resultSize, Args &... args) { + auto target = getTargetLocalityFromTargePosition(dataDistribution_, pos); + + using FunctionTy = void (*)(rt::Handle &, size_t, T &, Args & ..., uint8_t*, uint32_t*); + FunctionTy fn = std::forward(function); + using ArgsTuple = + std::tuple>; + ArgsTuple argsTuple{oid_, pos, target.second, fn, + std::tuple(args...)}; + + rt::asyncExecuteAtWithRetBuff(handle, target.first, + AsyncApplyWRBFunWrapper, argsTuple, + result, resultSize); +} + template template void Array::ForEachInRange(const size_t first, const size_t last, diff --git a/test/unit_tests/data_structures/array_test.cc b/test/unit_tests/data_structures/array_test.cc index d5fa6efb..3627c4bb 100644 --- a/test/unit_tests/data_structures/array_test.cc +++ b/test/unit_tests/data_structures/array_test.cc @@ -247,6 +247,15 @@ static void asyncApplyFun(shad::rt::Handle & /*unused*/, size_t i, size_t &elem, elem += kInitValue; } +static void asyncApplyWRBFun(shad::rt::Handle & /*unused*/, size_t i, size_t &elem, size_t &incr, + uint8_t* result, uint32_t* resultSize) { + ASSERT_EQ(incr, kInitValue); + ASSERT_EQ(elem, i + 1); + elem += kInitValue; + *resultSize = sizeof(elem); + memcpy(result, &elem, sizeof(elem)); +} + static void asyncApplyFunNoArgs(shad::rt::Handle & /*unused*/, size_t i, size_t &elem) { ASSERT_EQ(elem, i + kInitValue + 1); @@ -298,6 +307,38 @@ TEST_F(ArrayTest, AsyncInsertAsyncApplyAndAsyncGet) { shad::Array::Destroy(edsPtr->GetGlobalID()); } +TEST_F(ArrayTest, AsyncInsertAsyncApplyWRBAndAsyncGet) { + std::vector values(kArraySize); + auto edsPtr = shad::Array::Create(kArraySize, kInitValue); + + shad::rt::Handle handle; + for (size_t i = 0; i < kArraySize; i++) { + edsPtr->AsyncInsertAt(handle, i, i + 1); + } + shad::rt::waitForCompletion(handle); + + shad::rt::Handle handle2; + std::vector ret_values(kArraySize); + std::vector ret_sizes(kArraySize); + for (size_t i = 0; i < kArraySize; i++) { + edsPtr->AsyncApplyWithRetBuff(handle2, i, asyncApplyWRBFun, + (uint8_t*)(&ret_values[i]), + &ret_sizes[i], kInitValue); + } + shad::rt::waitForCompletion(handle2); + + shad::rt::Handle handle3; + for (size_t i = 0; i < kArraySize; i++) { + edsPtr->AsyncAt(handle3, i, &values[i]); + } + shad::rt::waitForCompletion(handle3); + for (size_t i = 0; i < kArraySize; i++) { + ASSERT_EQ(values[i], i + 1 + kInitValue); + ASSERT_EQ(ret_values[i], i + 1 + kInitValue); + } + shad::Array::Destroy(edsPtr->GetGlobalID()); +} + TEST_F(ArrayTest, AsyncInsertSyncForEachInRangeAndAsyncGet) { std::vector values(kArraySize); auto edsPtr = shad::Array::Create(kArraySize, kInitValue);