diff --git a/include/shad/data_structures/local_multimap.h b/include/shad/data_structures/local_multimap.h index 28e13c6e..4b03ae7f 100644 --- a/include/shad/data_structures/local_multimap.h +++ b/include/shad/data_structures/local_multimap.h @@ -187,7 +187,7 @@ class LocalMultimap { /// /// @param[in] key The key. /// @param[out] local_result The result of the lookup operation. - void LookupFromRemote(KTYPE &key, LookupRemoteResult *remote_result); + void LookupFromRemote(const KTYPE &key, LookupRemoteResult *remote_result); /// @brief Apply a user-defined function to every element of an entry's value /// array. @@ -223,6 +223,28 @@ class LocalMultimap { template void AsyncApply(rt::Handle &handle, const KTYPE &key, ApplyFunT &&function, Args &...args); + + /// @brief Asynchronously apply a user-defined function to a key-value pair. + /// @tparam ApplyFunT User-defined function type. The function prototype + /// should be: + /// @code + /// void(rt::Handle &handle, const KTYPE&, std::vector&, Args&, + /// uint8_t*, uint32_t*); + /// @endcode + /// @tparam ...Args Types of the function arguments. + /// + /// @param[in,out] handle Reference to the handle. + /// @param[in] key The key. + /// @param function The function to apply. + /// @param[out] result Pointer to the region where the result is + /// written; result must point to a valid memory allocation. + /// @param[out] resultSize Pointer to the region where the result size is + /// written; resultSize must point to a valid memory allocation. + /// @param args The function arguments. + template + void AsyncApplyWithRetBuff(rt::Handle &handle, const KTYPE &key, + ApplyFunT &&function, uint8_t* result, + uint32_t* resultSize, Args &...args); /// @brief Apply a user-defined function to each key-value pair. /// @tparam ApplyFunT User-defined function type. The function prototype @@ -601,6 +623,38 @@ class LocalMultimap { return; } + template + static void AsyncCallApplyWithRetBuffFun( + rt::Handle &handle, LocalMultimap *mapPtr, + const KTYPE &key, ApplyFunT function, std::tuple &args, + std::index_sequence, uint8_t* result, uint32_t* resultSize) { + size_t bucketIdx = shad::hash{}(key) % mapPtr->numBuckets_; + Bucket *bucket = &(mapPtr->buckets_array_[bucketIdx]); + + while (bucket != nullptr) { + for (size_t i = 0; i < bucket->BucketSize(); ++i) { + Entry *entry = &bucket->getEntry(i); + + // Stop at the first empty entry. + if (entry->state == EMPTY) break; + + // Yield on pending entries. + while (entry->state == PENDING_INSERT) rt::impl::yield(); + + // Entry is USED. + if (mapPtr->KeyComp_(&entry->key, &key) == 0) { + while (entry->state == PENDING_INSERT) rt::impl::yield(); + function(handle, key, entry->value, + std::get(args)..., result, resultSize); + return; + } + } + + bucket = bucket->next.get(); + } + return; + } + template static void CallApplyFun(LocalMultimap *mapPtr, const KTYPE &key, ApplyFunT function, @@ -642,6 +696,20 @@ class LocalMultimap { std::get<2>(tuple), std::get<3>(tuple), std::make_index_sequence{}); } + + template + static void AsyncApplyWRBFunWrapper(rt::Handle &handle, const Tuple &args, + uint8_t* result, uint32_t* resultSize) { + constexpr auto Size = std::tuple_size< + typename std::decay(args))>::type>::value; + Tuple &tuple = const_cast(args); + + AsyncCallApplyWithRetBuffFun(handle, std::get<0>(tuple), + std::get<1>(tuple), + std::get<2>(tuple), std::get<3>(tuple), + std::make_index_sequence{}, + result, resultSize); + } }; template @@ -705,7 +773,7 @@ bool LocalMultimap::Lookup(const KTYPE &key, template void LocalMultimap::LookupFromRemote( - KTYPE &key, LookupRemoteResult *result) { + const KTYPE &key, LookupRemoteResult *result) { size_t bucketIdx = shad::hash{}(key) % numBuckets_; Bucket *bucket = &(buckets_array_[bucketIdx]); // concurrent inserts okay; concurrent delete not okay @@ -959,7 +1027,7 @@ template template void LocalMultimap::ForEachEntry( ApplyFunT &&function, Args &...args) { - using FunctionTy = void (*)(const KTYPE &, VTYPE &, Args &...); + using FunctionTy = void (*)(const KTYPE &, std::vector &, Args &...); FunctionTy fn = std::forward(function); using LMapPtr = LocalMultimap *; using ArgsTuple = std::tuple>; @@ -973,7 +1041,8 @@ template template void LocalMultimap::AsyncForEachEntry( rt::Handle &handle, ApplyFunT &&function, Args &...args) { - using FunctionTy = void (*)(rt::Handle &, const KTYPE &, VTYPE &, Args &...); + using FunctionTy = void (*)(rt::Handle &, const KTYPE &, + std::vector &, Args &...); FunctionTy fn = std::forward(function); using LMapPtr = LocalMultimap *; using ArgsTuple = std::tuple>; @@ -1002,7 +1071,8 @@ template template void LocalMultimap::AsyncForEachKey( rt::Handle &handle, ApplyFunT &&function, Args &...args) { - using FunctionTy = void (*)(rt::Handle &, const KTYPE &, Args &...); + using FunctionTy = void (*)(rt::Handle &, const KTYPE &, + std::vector &, Args &...); FunctionTy fn = std::forward(function); using LMapPtr = LocalMultimap *; using ArgsTuple = std::tuple>; @@ -1019,7 +1089,8 @@ void LocalMultimap::AsyncApply(rt::Handle &handle, const KTYPE &key, ApplyFunT &&function, Args &...args) { - using FunctionTy = void (*)(rt::Handle &, const KTYPE &, VTYPE &, Args &...); + using FunctionTy = void (*)(rt::Handle &, const KTYPE &, + std::vector &, Args &...); FunctionTy fn = std::forward(function); using LMapPtr = LocalMultimap *; using ArgsTuple = @@ -1030,6 +1101,26 @@ void LocalMultimap::AsyncApply(rt::Handle &handle, AsyncApplyFunWrapper, argsTuple); } +template +template +void LocalMultimap:: +AsyncApplyWithRetBuff(rt::Handle &handle, const KTYPE &key, + ApplyFunT &&function, uint8_t* result, + uint32_t* resultSize, Args &...args) { + using FunctionTy = void (*)(rt::Handle &, const KTYPE &, + std::vector &, Args &..., + uint8_t*, uint32_t*); + FunctionTy fn = std::forward(function); + using LMapPtr = LocalMultimap *; + using ArgsTuple = + std::tuple>; + + ArgsTuple argsTuple(this, key, fn, std::tuple(args...)); + rt::asyncExecuteAtWithRetBuff(handle, rt::thisLocality(), + AsyncApplyWRBFunWrapper, argsTuple, + result, resultSize); +} + template template std::pair::iterator, bool> @@ -1052,8 +1143,9 @@ LocalMultimap::Insert(const KTYPE &key, size_ += 1; entry->state = USED; release_inserter(bucketIdx); - return std::make_pair(iterator(this, bucketIdx, i, bucket, entry), - true); + iterator itr(this, bucketIdx, i, bucket, entry, + entry->value.begin()); + return std::make_pair(itr, true); } // Wait for some other thread to finish with this entry @@ -1070,8 +1162,9 @@ LocalMultimap::Insert(const KTYPE &key, size_ += 1; entry->state = USED; release_inserter(bucketIdx); - return std::make_pair(iterator(this, bucketIdx, i, bucket, entry), - true); + iterator itr(this, bucketIdx, i, bucket, entry, + entry->value.begin()); + return std::make_pair(itr, true); } } // Inner for loop diff --git a/include/shad/data_structures/multimap.h b/include/shad/data_structures/multimap.h index 7527c780..e3f4293d 100644 --- a/include/shad/data_structures/multimap.h +++ b/include/shad/data_structures/multimap.h @@ -234,7 +234,7 @@ class Multimap : public AbstractDataStructure< Multimap &, Args&); + /// void(const KTYPE&, std::vector&, Args&); /// @endcode /// @tparam ...Args Types of the function arguments. /// @@ -245,6 +245,28 @@ class Multimap : public AbstractDataStructure< Multimap void AsyncApply(rt::Handle &handle, const KTYPE &key, ApplyFunT &&function, Args &... args); + /// @brief Asynchronously apply a user-defined function to a key-value pair. + /// @tparam ApplyFunT User-defined function type. The function prototype + /// should be: + /// @code + /// void(rt::Handle &handle, const KTYPE&, std::vector&, Args&, + /// uint8_t*, uint32_t*); + /// @endcode + /// @tparam ...Args Types of the function arguments. + /// + /// @param[in,out] handle Reference to the handle. + /// @param[in] key The key. + /// @param function The function to apply. + /// @param[out] result Pointer to the region where the result is + /// written; result must point to a valid memory allocation. + /// @param[out] resultSize Pointer to the region where the result size is + /// written; resultSize must point to a valid memory allocation. + /// @param args The function arguments. + template + void AsyncApplyWithRetBuff(rt::Handle &handle, const KTYPE &key, + ApplyFunT &&function, uint8_t* result, + uint32_t* resultSize, Args &... args); + /// @brief Apply a user-defined function to each key-value pair. /// /// @tparam ApplyFunT User-defined function type. The function prototype should be: @@ -742,6 +764,41 @@ void Multimap::AsyncApply( } } +template +template +void Multimap::AsyncApplyWithRetBuff( + rt::Handle &handle, const KTYPE &key, + ApplyFunT &&function, uint8_t* result, + uint32_t* resultSize, Args &... args) { + size_t targetId = shad::hash{}(key) % rt::numLocalities(); + rt::Locality targetLocality(targetId); + + if (targetLocality == rt::thisLocality()) { + localMultimap_.AsyncApplyWithRetBuff(handle, key, + function, result, + resultSize, args...); + } else { + using FunctionTy = void (*)(rt::Handle &, const KTYPE &, + std::vector &, Args &..., uint8_t*, uint32_t*); + FunctionTy fn = std::forward(function); + using ArgsTuple = std::tuple>; + + ArgsTuple arguments(oid_, key, fn, std::tuple(args...)); + auto feLambda = [](rt::Handle &handle, const ArgsTuple &args, + uint8_t* result, uint32_t* resultSize) { + constexpr auto Size = std::tuple_size(args))>::type>::value; + ArgsTuple &tuple(const_cast(args)); + LMapT *mapPtr = &(HmapT::GetPtr(std::get<0>(tuple))->localMultimap_); + LMapT::AsyncCallApplyWithRetBuffFun(handle, mapPtr, std::get<1>(tuple), + std::get<2>(tuple), std::get<3>(tuple), + std::make_index_sequence{}, + result, resultSize); + }; + rt::asyncExecuteAtWithRetBuff(handle, targetLocality, feLambda, + arguments, result, resultSize); + } +} + template class multimap_iterator : public std::iterator { public: