Skip to content

Commit

Permalink
[#204] Implemented Multimap AsyncApplyWithRetBuff and other minor fixes
Browse files Browse the repository at this point in the history
  • Loading branch information
VitoCastellana committed Mar 7, 2022
1 parent a3615e9 commit 29014d4
Show file tree
Hide file tree
Showing 2 changed files with 161 additions and 11 deletions.
113 changes: 103 additions & 10 deletions include/shad/data_structures/local_multimap.h
Original file line number Diff line number Diff line change
Expand Up @@ -187,7 +187,7 @@ class LocalMultimap {
///
/// @param[in] key The key.
/// @param[out] local_result The result of the lookup operation.
void LookupFromRemote(KTYPE &key, LookupRemoteResult *remote_result);
void LookupFromRemote(const KTYPE &key, LookupRemoteResult *remote_result);

/// @brief Apply a user-defined function to every element of an entry's value
/// array.
Expand Down Expand Up @@ -223,6 +223,28 @@ class LocalMultimap {
template <typename ApplyFunT, typename... Args>
void AsyncApply(rt::Handle &handle, const KTYPE &key, ApplyFunT &&function,
Args &...args);

/// @brief Asynchronously apply a user-defined function to a key-value pair.
/// @tparam ApplyFunT User-defined function type. The function prototype
/// should be:
/// @code
/// void(rt::Handle &handle, const KTYPE&, std::vector<VTYPE>&, Args&,
/// uint8_t*, uint32_t*);
/// @endcode
/// @tparam ...Args Types of the function arguments.
///
/// @param[in,out] handle Reference to the handle.
/// @param[in] key The key.
/// @param function The function to apply.
/// @param[out] result Pointer to the region where the result is
/// written; result must point to a valid memory allocation.
/// @param[out] resultSize Pointer to the region where the result size is
/// written; resultSize must point to a valid memory allocation.
/// @param args The function arguments.
template <typename ApplyFunT, typename... Args>
void AsyncApplyWithRetBuff(rt::Handle &handle, const KTYPE &key,
ApplyFunT &&function, uint8_t* result,
uint32_t* resultSize, Args &...args);

/// @brief Apply a user-defined function to each key-value pair.
/// @tparam ApplyFunT User-defined function type. The function prototype
Expand Down Expand Up @@ -601,6 +623,38 @@ class LocalMultimap {
return;
}

template <typename ApplyFunT, typename... Args, std::size_t... is>
static void AsyncCallApplyWithRetBuffFun(
rt::Handle &handle, LocalMultimap<KTYPE, VTYPE, KEY_COMPARE> *mapPtr,
const KTYPE &key, ApplyFunT function, std::tuple<Args...> &args,
std::index_sequence<is...>, uint8_t* result, uint32_t* resultSize) {
size_t bucketIdx = shad::hash<KTYPE>{}(key) % mapPtr->numBuckets_;
Bucket *bucket = &(mapPtr->buckets_array_[bucketIdx]);

while (bucket != nullptr) {
for (size_t i = 0; i < bucket->BucketSize(); ++i) {
Entry *entry = &bucket->getEntry(i);

// Stop at the first empty entry.
if (entry->state == EMPTY) break;

// Yield on pending entries.
while (entry->state == PENDING_INSERT) rt::impl::yield();

// Entry is USED.
if (mapPtr->KeyComp_(&entry->key, &key) == 0) {
while (entry->state == PENDING_INSERT) rt::impl::yield();
function(handle, key, entry->value,
std::get<is>(args)..., result, resultSize);
return;
}
}

bucket = bucket->next.get();
}
return;
}

template <typename ApplyFunT, typename... Args, std::size_t... is>
static void CallApplyFun(LocalMultimap<KTYPE, VTYPE, KEY_COMPARE> *mapPtr,
const KTYPE &key, ApplyFunT function,
Expand Down Expand Up @@ -642,6 +696,20 @@ class LocalMultimap {
std::get<2>(tuple), std::get<3>(tuple),
std::make_index_sequence<Size>{});
}

template <typename Tuple, typename... Args>
static void AsyncApplyWRBFunWrapper(rt::Handle &handle, const Tuple &args,
uint8_t* result, uint32_t* resultSize) {
constexpr auto Size = std::tuple_size<
typename std::decay<decltype(std::get<3>(args))>::type>::value;
Tuple &tuple = const_cast<Tuple &>(args);

AsyncCallApplyWithRetBuffFun(handle, std::get<0>(tuple),
std::get<1>(tuple),
std::get<2>(tuple), std::get<3>(tuple),
std::make_index_sequence<Size>{},
result, resultSize);
}
};

template <typename KTYPE, typename VTYPE, typename KEY_COMPARE>
Expand Down Expand Up @@ -705,7 +773,7 @@ bool LocalMultimap<KTYPE, VTYPE, KEY_COMPARE>::Lookup(const KTYPE &key,

template <typename KTYPE, typename VTYPE, typename KEY_COMPARE>
void LocalMultimap<KTYPE, VTYPE, KEY_COMPARE>::LookupFromRemote(
KTYPE &key, LookupRemoteResult *result) {
const KTYPE &key, LookupRemoteResult *result) {
size_t bucketIdx = shad::hash<KTYPE>{}(key) % numBuckets_;
Bucket *bucket = &(buckets_array_[bucketIdx]);
// concurrent inserts okay; concurrent delete not okay
Expand Down Expand Up @@ -959,7 +1027,7 @@ template <typename KTYPE, typename VTYPE, typename KEY_COMPARE>
template <typename ApplyFunT, typename... Args>
void LocalMultimap<KTYPE, VTYPE, KEY_COMPARE>::ForEachEntry(
ApplyFunT &&function, Args &...args) {
using FunctionTy = void (*)(const KTYPE &, VTYPE &, Args &...);
using FunctionTy = void (*)(const KTYPE &, std::vector<VTYPE> &, Args &...);
FunctionTy fn = std::forward<decltype(function)>(function);
using LMapPtr = LocalMultimap<KTYPE, VTYPE, KEY_COMPARE> *;
using ArgsTuple = std::tuple<LMapPtr, FunctionTy, std::tuple<Args...>>;
Expand All @@ -973,7 +1041,8 @@ template <typename KTYPE, typename VTYPE, typename KEY_COMPARE>
template <typename ApplyFunT, typename... Args>
void LocalMultimap<KTYPE, VTYPE, KEY_COMPARE>::AsyncForEachEntry(
rt::Handle &handle, ApplyFunT &&function, Args &...args) {
using FunctionTy = void (*)(rt::Handle &, const KTYPE &, VTYPE &, Args &...);
using FunctionTy = void (*)(rt::Handle &, const KTYPE &,
std::vector<VTYPE> &, Args &...);
FunctionTy fn = std::forward<decltype(function)>(function);
using LMapPtr = LocalMultimap<KTYPE, VTYPE, KEY_COMPARE> *;
using ArgsTuple = std::tuple<LMapPtr, FunctionTy, std::tuple<Args...>>;
Expand Down Expand Up @@ -1002,7 +1071,8 @@ template <typename KTYPE, typename VTYPE, typename KEY_COMPARE>
template <typename ApplyFunT, typename... Args>
void LocalMultimap<KTYPE, VTYPE, KEY_COMPARE>::AsyncForEachKey(
rt::Handle &handle, ApplyFunT &&function, Args &...args) {
using FunctionTy = void (*)(rt::Handle &, const KTYPE &, Args &...);
using FunctionTy = void (*)(rt::Handle &, const KTYPE &,
std::vector<VTYPE> &, Args &...);
FunctionTy fn = std::forward<decltype(function)>(function);
using LMapPtr = LocalMultimap<KTYPE, VTYPE, KEY_COMPARE> *;
using ArgsTuple = std::tuple<LMapPtr, FunctionTy, std::tuple<Args...>>;
Expand All @@ -1019,7 +1089,8 @@ void LocalMultimap<KTYPE, VTYPE, KEY_COMPARE>::AsyncApply(rt::Handle &handle,
const KTYPE &key,
ApplyFunT &&function,
Args &...args) {
using FunctionTy = void (*)(rt::Handle &, const KTYPE &, VTYPE &, Args &...);
using FunctionTy = void (*)(rt::Handle &, const KTYPE &,
std::vector<VTYPE> &, Args &...);
FunctionTy fn = std::forward<decltype(function)>(function);
using LMapPtr = LocalMultimap<KTYPE, VTYPE, KEY_COMPARE> *;
using ArgsTuple =
Expand All @@ -1030,6 +1101,26 @@ void LocalMultimap<KTYPE, VTYPE, KEY_COMPARE>::AsyncApply(rt::Handle &handle,
AsyncApplyFunWrapper<ArgsTuple, Args...>, argsTuple);
}

template <typename KTYPE, typename VTYPE, typename KEY_COMPARE>
template <typename ApplyFunT, typename... Args>
void LocalMultimap<KTYPE, VTYPE, KEY_COMPARE>::
AsyncApplyWithRetBuff(rt::Handle &handle, const KTYPE &key,
ApplyFunT &&function, uint8_t* result,
uint32_t* resultSize, Args &...args) {
using FunctionTy = void (*)(rt::Handle &, const KTYPE &,
std::vector<VTYPE> &, Args &...,
uint8_t*, uint32_t*);
FunctionTy fn = std::forward<decltype(function)>(function);
using LMapPtr = LocalMultimap<KTYPE, VTYPE, KEY_COMPARE> *;
using ArgsTuple =
std::tuple<LMapPtr, const KTYPE, FunctionTy, std::tuple<Args...>>;

ArgsTuple argsTuple(this, key, fn, std::tuple<Args...>(args...));
rt::asyncExecuteAtWithRetBuff(handle, rt::thisLocality(),
AsyncApplyWRBFunWrapper<ArgsTuple, Args...>, argsTuple,
result, resultSize);
}

template <typename KTYPE, typename VTYPE, typename KEY_COMPARE>
template <typename ELTYPE>
std::pair<typename LocalMultimap<KTYPE, VTYPE, KEY_COMPARE>::iterator, bool>
Expand All @@ -1052,8 +1143,9 @@ LocalMultimap<KTYPE, VTYPE, KEY_COMPARE>::Insert(const KTYPE &key,
size_ += 1;
entry->state = USED;
release_inserter(bucketIdx);
return std::make_pair(iterator(this, bucketIdx, i, bucket, entry),
true);
iterator itr(this, bucketIdx, i, bucket, entry,
entry->value.begin());
return std::make_pair(itr, true);
}

// Wait for some other thread to finish with this entry
Expand All @@ -1070,8 +1162,9 @@ LocalMultimap<KTYPE, VTYPE, KEY_COMPARE>::Insert(const KTYPE &key,
size_ += 1;
entry->state = USED;
release_inserter(bucketIdx);
return std::make_pair(iterator(this, bucketIdx, i, bucket, entry),
true);
iterator itr(this, bucketIdx, i, bucket, entry,
entry->value.begin());
return std::make_pair(itr, true);
}
} // Inner for loop

Expand Down
59 changes: 58 additions & 1 deletion include/shad/data_structures/multimap.h
Original file line number Diff line number Diff line change
Expand Up @@ -234,7 +234,7 @@ class Multimap : public AbstractDataStructure< Multimap<KTYPE, VTYPE, KEY_COMPAR
///
/// @tparam ApplyFunT User-defined function type. The function prototype should be:
/// @code
/// void(rt::Handle &handle, const KTYPE&, std::vector<VTYPE> &, Args&);
/// void(const KTYPE&, std::vector<VTYPE>&, Args&);
/// @endcode
/// @tparam ...Args Types of the function arguments.
///
Expand All @@ -245,6 +245,28 @@ class Multimap : public AbstractDataStructure< Multimap<KTYPE, VTYPE, KEY_COMPAR
template <typename ApplyFunT, typename... Args>
void AsyncApply(rt::Handle &handle, const KTYPE &key, ApplyFunT &&function, Args &... args);

/// @brief Asynchronously apply a user-defined function to a key-value pair.
/// @tparam ApplyFunT User-defined function type. The function prototype
/// should be:
/// @code
/// void(rt::Handle &handle, const KTYPE&, std::vector<VTYPE>&, Args&,
/// uint8_t*, uint32_t*);
/// @endcode
/// @tparam ...Args Types of the function arguments.
///
/// @param[in,out] handle Reference to the handle.
/// @param[in] key The key.
/// @param function The function to apply.
/// @param[out] result Pointer to the region where the result is
/// written; result must point to a valid memory allocation.
/// @param[out] resultSize Pointer to the region where the result size is
/// written; resultSize must point to a valid memory allocation.
/// @param args The function arguments.
template <typename ApplyFunT, typename... Args>
void AsyncApplyWithRetBuff(rt::Handle &handle, const KTYPE &key,
ApplyFunT &&function, uint8_t* result,
uint32_t* resultSize, Args &... args);

/// @brief Apply a user-defined function to each key-value pair.
///
/// @tparam ApplyFunT User-defined function type. The function prototype should be:
Expand Down Expand Up @@ -742,6 +764,41 @@ void Multimap<KTYPE, VTYPE, KEY_COMPARE>::AsyncApply(
}
}

template <typename KTYPE, typename VTYPE, typename KEY_COMPARE>
template <typename ApplyFunT, typename... Args>
void Multimap<KTYPE, VTYPE, KEY_COMPARE>::AsyncApplyWithRetBuff(
rt::Handle &handle, const KTYPE &key,
ApplyFunT &&function, uint8_t* result,
uint32_t* resultSize, Args &... args) {
size_t targetId = shad::hash<KTYPE>{}(key) % rt::numLocalities();
rt::Locality targetLocality(targetId);

if (targetLocality == rt::thisLocality()) {
localMultimap_.AsyncApplyWithRetBuff(handle, key,
function, result,
resultSize, args...);
} else {
using FunctionTy = void (*)(rt::Handle &, const KTYPE &,
std::vector<VTYPE> &, Args &..., uint8_t*, uint32_t*);
FunctionTy fn = std::forward<decltype(function)>(function);
using ArgsTuple = std::tuple<ObjectID, const KTYPE, FunctionTy, std::tuple<Args...>>;

ArgsTuple arguments(oid_, key, fn, std::tuple<Args...>(args...));
auto feLambda = [](rt::Handle &handle, const ArgsTuple &args,
uint8_t* result, uint32_t* resultSize) {
constexpr auto Size = std::tuple_size<typename std::decay<decltype(std::get<3>(args))>::type>::value;
ArgsTuple &tuple(const_cast<ArgsTuple &>(args));
LMapT *mapPtr = &(HmapT::GetPtr(std::get<0>(tuple))->localMultimap_);
LMapT::AsyncCallApplyWithRetBuffFun(handle, mapPtr, std::get<1>(tuple),
std::get<2>(tuple), std::get<3>(tuple),
std::make_index_sequence<Size>{},
result, resultSize);
};
rt::asyncExecuteAtWithRetBuff(handle, targetLocality, feLambda,
arguments, result, resultSize);
}
}

template <typename MapT, typename T, typename NonConstT>
class multimap_iterator : public std::iterator<std::forward_iterator_tag, T> {
public:
Expand Down

0 comments on commit 29014d4

Please sign in to comment.