From 41e822e5eed25dd9cf70a967324ace2fbf13bbe5 Mon Sep 17 00:00:00 2001 From: Vito G Castellana Date: Fri, 24 Mar 2023 17:35:25 -0700 Subject: [PATCH] [#204] Implemented Blocking Apply functions in Multimap --- include/shad/data_structures/local_multimap.h | 131 ++++++++++++++++++ include/shad/data_structures/multimap.h | 88 ++++++++++++ 2 files changed, 219 insertions(+) diff --git a/include/shad/data_structures/local_multimap.h b/include/shad/data_structures/local_multimap.h index ad176387..6a372e8a 100644 --- a/include/shad/data_structures/local_multimap.h +++ b/include/shad/data_structures/local_multimap.h @@ -229,6 +229,38 @@ class LocalMultimap { template void AsyncApply(rt::Handle &handle, const KTYPE &key, ApplyFunT &&function, Args &...args); + + /// @brief Apply a user-defined function to every element of an entry's value + /// array. Thread safe wrt other operations. + /// @tparam ApplyFunT User-defined function type. The function prototype + /// should be: + /// @code + /// void(const KTYPE&, std::vector &, Args&); + /// @endcode + /// @tparam ...Args Types of the function arguments. + /// + /// @param[in] key The key. + /// @param function The function to apply. + /// @param args The function arguments. + template + void BlockingApply(const KTYPE &key, ApplyFunT &&function, Args &...args); + + /// @brief Asynchronously apply a user-defined function to a key-value pair. + /// Thread safe wrt other operations. + /// @tparam ApplyFunT User-defined function type. The function prototype + /// should be: + /// @code + /// void(rt::Handle &handle, std::vector &, VTYPE&, Args&); + /// @endcode + /// @tparam ...Args Types of the function arguments. + /// + /// @param[in,out] handle Reference to the handle. + /// @param[in] key The key. + /// @param function The function to apply. + /// @param args The function arguments. + template + void AsyncBlockingApply(rt::Handle &handle, const KTYPE &key, ApplyFunT &&function, + Args &...args); /// @brief Asynchronously apply a user-defined function to a key-value pair. /// @tparam ApplyFunT User-defined function type. The function prototype @@ -693,6 +725,14 @@ class LocalMultimap { return; } + template + static void CallBlockingApplyFun(LocalMultimap *mapPtr, + const KTYPE &key, ApplyFunT function, + std::tuple &args, + std::index_sequence) { + mapPtr->BlockingApply(key, function, std::get(args)...); + } + template static void AsyncApplyFunWrapper(rt::Handle &handle, const Tuple &args) { constexpr auto Size = std::tuple_size< @@ -704,6 +744,16 @@ class LocalMultimap { std::make_index_sequence{}); } + + template + static void CallAsyncBlockingApplyFun(rt::Handle &h, + LocalMultimap *mapPtr, + const KTYPE &key, ApplyFunT function, + std::tuple &args, + std::index_sequence) { + mapPtr->AsyncBlockingApply(h, key, function, std::get(args)...); + } + template static void AsyncApplyWRBFunWrapper(rt::Handle &handle, const Tuple &args, uint8_t* result, uint32_t* resultSize) { @@ -1213,6 +1263,87 @@ void LocalMultimap::AsyncInsert( rt::asyncExecuteAt(handle, rt::thisLocality(), insertLambda, args); } +template +template +void LocalMultimap::BlockingApply(const KTYPE &key, + ApplyFunT &&function, + Args &...args) { + size_t bucketIdx = shad::hash{}(key) % numBuckets_; + Bucket *bucket = &(buckets_array_[bucketIdx]); + allow_inserter(bucketIdx); + // loop over linked buckets + while (bucket != nullptr) { + // loop over entries in this bucket + for (size_t i = 0; i < bucket->BucketSize(); ++i) { + Entry *entry = &bucket->getEntry(i); + + // Stop at the first empty or pending insert entry. + if ((entry->state == EMPTY) or (entry->state == PENDING_INSERT)) { + break; + } + + // if key matches this entry's key, apply function; else continue inner for + // loop + if (KeyComp_(&entry->key, &key) == 0) { + // tagging as pending insert + while (!__sync_bool_compare_and_swap(&entry->state, USED, + PENDING_INSERT)) { + rt::impl::yield(); + } + function(key, entry->value, args...); + entry->state = USED; + release_inserter(bucketIdx); + return; + } + } // Inner for loop + + // move to next bucket + bucket = bucket->next.get(); + } // Outer for loop + release_inserter(bucketIdx); +} + +template +template +void LocalMultimap::AsyncBlockingApply(rt::Handle &h, + const KTYPE &key, + ApplyFunT &&function, + Args &...args) { + size_t bucketIdx = shad::hash{}(key) % numBuckets_; + Bucket *bucket = &(buckets_array_[bucketIdx]); + allow_inserter(bucketIdx); + // loop over linked buckets + while (bucket != nullptr) { + // loop over entries in this bucket + for (size_t i = 0; i < bucket->BucketSize(); ++i) { + Entry *entry = &bucket->getEntry(i); + + // Stop at the first empty or pending insert entry. + if ((entry->state == EMPTY) or (entry->state == PENDING_INSERT)) { + break; + } + + // if key matches this entry's key, apply function; else continue inner for + // loop + if (KeyComp_(&entry->key, &key) == 0) { + // tagging as pending insert + while (!__sync_bool_compare_and_swap(&entry->state, USED, + PENDING_INSERT)) { + rt::impl::yield(); + } + function(h, key, entry->value, args...); + entry->state = USED; + release_inserter(bucketIdx); + return; + } + } // Inner for loop + + // move to next bucket + bucket = bucket->next.get(); + } // Outer for loop + release_inserter(bucketIdx); +} + template class lmultimap_iterator : public std::iterator { template diff --git a/include/shad/data_structures/multimap.h b/include/shad/data_structures/multimap.h index cca6d714..fa998592 100644 --- a/include/shad/data_structures/multimap.h +++ b/include/shad/data_structures/multimap.h @@ -245,6 +245,37 @@ class Multimap : public AbstractDataStructure< Multimap void AsyncApply(rt::Handle &handle, const KTYPE &key, ApplyFunT &&function, Args &... args); + /// @brief Apply a user-defined function to a key-value pair. + /// Thread safe wrt other operations. + /// @tparam ApplyFunT User-defined function type. The function prototype should be: + /// @code + /// void(const KTYPE&, std::vector &, Args&); + /// @endcode + /// @tparam ...Args Types of the function arguments. + /// + /// @param key The key. + /// @param function The function to apply. + /// @param args The function arguments. + template + void BlockingApply(const KTYPE &key, ApplyFunT &&function, Args &... args); + + + /// @brief Asynchronously apply a user-defined function to a key-value pair. + /// Thread safe wrt other operations. + /// @tparam ApplyFunT User-defined function type. The function prototype should be: + /// @code + /// void(rt::Handle &h, const KTYPE&, std::vector&, Args&); + /// @endcode + /// @tparam ...Args Types of the function arguments. + /// + /// @param[in,out] handle Reference to the handle. + /// @param key The key. + /// @param function The function to apply. + /// @param args The function arguments. + template + void AsyncBlockingApply(rt::Handle &handle, const KTYPE &key, + ApplyFunT &&function, Args &... args); + /// @brief Asynchronously apply a user-defined function to a key-value pair. /// @tparam ApplyFunT User-defined function type. The function prototype /// should be: @@ -797,6 +828,63 @@ void Multimap::AsyncApply( } } +template +template +void Multimap::BlockingApply(const KTYPE &key, + ApplyFunT &&function, Args &... args) { + size_t targetId = shad::hash{}(key) % rt::numLocalities(); + rt::Locality targetLocality(targetId); + + if (targetLocality == rt::thisLocality()) { + localMultimap_.BlockingApply(key, function, args...); + + } else { + using FunctionTy = void (*)(const KTYPE &, std::vector &, Args &...); + FunctionTy fn = std::forward(function); + + using ArgsTuple = std::tuple>; + ArgsTuple arguments(oid_, key, fn, std::tuple(args...)); + + auto feLambda = [](const ArgsTuple &args) { + constexpr auto Size = std::tuple_size(args))>::type>::value; + ArgsTuple &tuple = const_cast(args); + LMapT *mapPtr = &(HmapT::GetPtr(std::get<0>(tuple))->localMultimap_); + //map_ptr->BlockingApply(key, function, std::get(args)...); + LMapT::CallBlockingApplyFun(mapPtr, std::get<1>(tuple), std::get<2>(tuple), + std::get<3>(tuple), std::make_index_sequence{}); + }; + rt::executeAt(targetLocality, feLambda, arguments); + } +} + +template +template +void Multimap::AsyncBlockingApply( + rt::Handle &handle, const KTYPE &key, ApplyFunT &&function, Args &... args) { + size_t targetId = shad::hash{}(key) % rt::numLocalities(); + rt::Locality targetLocality(targetId); + + if (targetLocality == rt::thisLocality()) { + localMultimap_.AsyncBlockingApply(handle, key, function, args...); + + } else { + using FunctionTy = void (*)(rt::Handle &, const KTYPE &, std::vector &, Args &...); + FunctionTy fn = std::forward(function); + using ArgsTuple = std::tuple>; + + ArgsTuple arguments(oid_, key, fn, std::tuple(args...)); + auto feLambda = [](rt::Handle &handle, const ArgsTuple &args) { + constexpr auto Size = std::tuple_size(args))>::type>::value; + ArgsTuple &tuple(const_cast(args)); + LMapT *mapPtr = &(HmapT::GetPtr(std::get<0>(tuple))->localMultimap_); + LMapT::CallAsyncBlockingApplyFun(handle, mapPtr, std::get<1>(tuple), + std::get<2>(tuple), std::get<3>(tuple), + std::make_index_sequence{}); + }; + rt::asyncExecuteAt(handle, targetLocality, feLambda, arguments); + } +} + template template void Multimap::AsyncApplyWithRetBuff(