From 4df744677abee8d79de2bb9e423c86f0ee62ada2 Mon Sep 17 00:00:00 2001 From: Vito G Castellana Date: Tue, 16 May 2023 18:08:35 -0700 Subject: [PATCH] [#204] Implemented the TryBlockingApply function in Multimap --- include/shad/data_structures/local_multimap.h | 72 +++++++++++++++++++ include/shad/data_structures/multimap.h | 53 ++++++++++++++ 2 files changed, 125 insertions(+) diff --git a/include/shad/data_structures/local_multimap.h b/include/shad/data_structures/local_multimap.h index 566057f1..362eccff 100644 --- a/include/shad/data_structures/local_multimap.h +++ b/include/shad/data_structures/local_multimap.h @@ -250,6 +250,26 @@ class LocalMultimap { template void BlockingApply(const KTYPE &key, ApplyFunT &&function, Args &...args); + enum ApplyResult { FAILED, SUCCESS, NOT_FOUND}; + + /// @brief Tries to apply a user-defined function to every element of an entry's value + /// array. Thread safe wrt other operations. + /// @tparam ApplyFunT User-defined function type. The function prototype + /// should be: + /// @code + /// void(const KTYPE&, std::vector &, Args&); + /// @endcode + /// @tparam ...Args Types of the function arguments. + /// + /// @param[in] key The key. + /// @param function The function to apply. + /// @param args The function arguments. + /// @return SUCCESS if the function has been successfully applied, + /// NOT_FOUND if the key is not found, FAILED otherwise. + template + LocalMultimap::ApplyResult + TryBlockingApply(const KTYPE &key, ApplyFunT &&function, Args &...args); + /// @brief Asynchronously apply a user-defined function to a key-value pair. /// Thread safe wrt other operations. /// @tparam ApplyFunT User-defined function type. The function prototype @@ -737,6 +757,15 @@ class LocalMultimap { mapPtr->BlockingApply(key, function, std::get(args)...); } + template + static LocalMultimap::ApplyResult + CallTryBlockingApplyFun(LocalMultimap *mapPtr, + const KTYPE &key, ApplyFunT function, + std::tuple &args, + std::index_sequence) { + return mapPtr->TryBlockingApply(key, function, std::get(args)...); + } + template static void AsyncApplyFunWrapper(rt::Handle &handle, const Tuple &args) { constexpr auto Size = std::tuple_size< @@ -1304,6 +1333,49 @@ void LocalMultimap::BlockingApply(const KTYPE &key, release_inserter(bucketIdx); } +template +template +typename LocalMultimap::ApplyResult +LocalMultimap::TryBlockingApply( + const KTYPE &key, + ApplyFunT &&function, + Args &...args) { + size_t bucketIdx = shad::hash{}(key) % numBuckets_; + Bucket *bucket = &(buckets_array_[bucketIdx]); + allow_inserter(bucketIdx); + // loop over linked buckets + while (bucket != nullptr) { + // loop over entries in this bucket + for (size_t i = 0; i < bucket->BucketSize(); ++i) { + Entry *entry = &bucket->getEntry(i); + + // Stop at the first empty or pending insert entry. + if ((entry->state == EMPTY) or (entry->state == PENDING_INSERT)) { + break; + } + + // if key matches this entry's key, apply function; else continue inner for + // loop + if (KeyComp_(&entry->key, &key) == 0) { + // tagging as pending insert + if (!__sync_bool_compare_and_swap(&entry->state, USED, + PENDING_UPDATE)) { + return ApplyResult::FAILED; + } + function(key, entry->value, args...); + entry->state = USED; + release_inserter(bucketIdx); + return ApplyResult::SUCCESS; + } + } // Inner for loop + + // move to next bucket + bucket = bucket->next.get(); + } // Outer for loop + release_inserter(bucketIdx); + return ApplyResult::NOT_FOUND; +} + template template void LocalMultimap::AsyncBlockingApply(rt::Handle &h, diff --git a/include/shad/data_structures/multimap.h b/include/shad/data_structures/multimap.h index 57f1b455..cc7ef2a8 100644 --- a/include/shad/data_structures/multimap.h +++ b/include/shad/data_structures/multimap.h @@ -123,6 +123,15 @@ class Multimap : public AbstractDataStructure< Multimap{}(key) % rt::numLocalities(); + rt::Locality tgtLocality(targetId); + return tgtLocality; + } + /// @brief Number of keys in the multimap. /// @warning Calling this method may result in one-to-all /// communication among localities to retrieve consistent information. @@ -258,6 +267,21 @@ class Multimap : public AbstractDataStructure< Multimap void BlockingApply(const KTYPE &key, ApplyFunT &&function, Args &... args); + /// @brief Tries to apply a user-defined function to a key-value pair. + /// Thread safe wrt other operations. + /// @tparam ApplyFunT User-defined function type. The function prototype should be: + /// @code + /// void(const KTYPE&, std::vector &, Args&); + /// @endcode + /// @tparam ...Args Types of the function arguments. + /// + /// @param key The key. + /// @param function The function to apply. + /// @param args The function arguments. + /// @return SUCCESS if the function has been successfully applied, + /// NOT_FOUND if the key is not found, FAILED otherwise. + template + typename LMapT::ApplyResult TryBlockingApply(const KTYPE &key, ApplyFunT &&function, Args &... args); /// @brief Asynchronously apply a user-defined function to a key-value pair. /// Thread safe wrt other operations. @@ -846,6 +870,35 @@ void Multimap::BlockingApply(const KTYPE &key, } } +template +template +typename Multimap::LMapT::ApplyResult +Multimap::TryBlockingApply(const KTYPE &key, + ApplyFunT &&function, Args &... args) { + size_t targetId = shad::hash{}(key) % rt::numLocalities(); + rt::Locality targetLocality(targetId); + if (targetLocality == rt::thisLocality()) { + return localMultimap_.TryBlockingApply(key, function, args...); + + } else { + using FunctionTy = void (*)(const KTYPE &, std::vector &, Args &...); + FunctionTy fn = std::forward(function); + using ArgsTuple = std::tuple>; + ArgsTuple arguments(oid_, key, fn, std::tuple(args...)); + + auto feLambda = [](const ArgsTuple &args, bool* res) { + constexpr auto Size = std::tuple_size(args))>::type>::value; + ArgsTuple &tuple = const_cast(args); + LMapT *mapPtr = &(HmapT::GetPtr(std::get<0>(tuple))->localMultimap_); + *res = LMapT::CallTryBlockingApplyFun(mapPtr, std::get<1>(tuple), std::get<2>(tuple), + std::get<3>(tuple), std::make_index_sequence{}); + }; + typename LMapT::ApplyResult res; + rt::executeAtWithRet(targetLocality, feLambda, arguments, &res); + return res; + } +} + template template void Multimap::AsyncBlockingApply(