Skip to content

Commit

Permalink
several changes to improve locking
Browse files Browse the repository at this point in the history
  • Loading branch information
ct-clmsn committed Apr 30, 2024
1 parent 91477c2 commit 98f65d8
Show file tree
Hide file tree
Showing 5 changed files with 20 additions and 45 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -121,9 +121,9 @@ namespace hpx { namespace util {
static int is_initialized_;

static int init_val_;
//static std::vector<std::shared_ptr<hpx::spinlock>> segment_mutex;
static std::vector<openshmem_seginfo_t> segments;
static std::uint8_t* shmem_buffer;
static std::size_t this_rank;
};
}} // namespace hpx::util

Expand Down
24 changes: 7 additions & 17 deletions libs/core/openshmem_base/src/openshmem_environment.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -113,6 +113,7 @@ namespace hpx::util {
// Definitions of static data members of openshmem_environment.
int openshmem_environment::provided_threading_flag_ = 0;
int openshmem_environment::is_initialized_ = -1;
int openshmem_environment::init_val_ = 0;
// Sentinel meaning "rank not assigned yet". The member is unsigned, so the
// -1 is converted explicitly: the stored value is the largest std::size_t.
// NOTE(review): rank() hands this member back as an int, which is expected to
// read as -1 again after the narrowing conversion -- confirm on all targets.
std::size_t openshmem_environment::this_rank = static_cast<std::size_t>(-1);
//std::vector<std::shared_ptr<hpx::spinlock>> openshmem_environment::segment_mutex{};
std::vector<openshmem_seginfo_t> openshmem_environment::segments{};
std::uint8_t* hpx::util::openshmem_environment::shmem_buffer = nullptr;
Expand Down Expand Up @@ -163,14 +164,6 @@ namespace hpx::util {
//
const std::size_t page_count = size();

// allocate a mutex per-page
//
/*
openshmem_environment::segment_mutex.reserve(page_count);
for(std::size_t i = 0; i < page_count; ++i) {
openshmem_environment::segment_mutex.emplace_back(std::make_shared<hpx::spinlock>());
}
*/
// symmetric allocation for number of pages total + number of signals
//
// (allocate page_size * number of PEs * number of threads) + (number of PEs * number of threads * 2 [for signaling])
Expand Down Expand Up @@ -224,7 +217,11 @@ namespace hpx::util {
if (enabled_)
return; // don't call twice

int this_rank = -1;
if (enabled()) {
scoped_lock l;
this_rank = static_cast<int>(shmem_my_pe());
}

has_called_init_ = false;

// We assume the OpenSHMEM parcelport is used unless it is explicitly disabled
Expand Down Expand Up @@ -256,8 +253,6 @@ namespace hpx::util {
rtcfg.add_entry("hpx.parcel.openshmem.multithreaded", "0");
}

this_rank = rank();

#if defined(HPX_HAVE_NETWORKING)
if (this_rank == 0)
{
Expand Down Expand Up @@ -436,12 +431,7 @@ namespace hpx::util {

// Returns the rank of the calling process as an int.
//
// NOTE(review): this listing looks like a rendered diff without +/- markers,
// so two variants of the body are interleaved below. As written, only the
// first variant executes and the trailing `return this_rank;` is unreachable.
// Confirm against the repository which variant is the current one.
int openshmem_environment::rank()
{
// Variant 1: start from -1 and, only when enabled() is true, query
// shmem_my_pe() while a scoped_lock is held.
int res(-1);
if (enabled()) {
scoped_lock l;
res = static_cast<int>(shmem_my_pe());
}
return res;
// Variant 2: return the static member this_rank instead of querying.
// NOTE(review): this_rank is a std::size_t, so this return narrows
// implicitly to int -- confirm that is intended.
return this_rank;
}

openshmem_environment::scoped_lock::scoped_lock()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,6 @@ namespace hpx::parcelset::policies::openshmem {
receiver_connection(int src, header h, Parcelport& pp) noexcept
: state_(initialized)
, src_(src)
, self_(-1)
, header_(h)
, request_ptr_(false)
, num_bytes(0)
Expand All @@ -58,7 +57,6 @@ namespace hpx::parcelset::policies::openshmem {
{
header_.assert_valid();

self_ = hpx::util::openshmem_environment::rank();
num_bytes = header_.numbytes();

#if defined(HPX_HAVE_PARCELPORT_COUNTERS)
Expand Down Expand Up @@ -108,7 +106,7 @@ namespace hpx::parcelset::policies::openshmem {

bool receive_transmission_chunks()
{
const auto idx = self_;
const auto idx = hpx::util::openshmem_environment::rank();

// determine the size of the chunk buffer
std::size_t num_zero_copy_chunks = static_cast<std::size_t>(
Expand All @@ -122,15 +120,13 @@ namespace hpx::parcelset::policies::openshmem {
{
buffer_.chunks_.resize(num_zero_copy_chunks);
{
//std::lock_guard<hpx::spinlock> l(*(*hpx::util::openshmem_environment::segments[idx].mut));

hpx::util::openshmem_environment::wait_until(
1, hpx::util::openshmem_environment::segments[idx].rcv);

hpx::util::openshmem_environment::get(
reinterpret_cast<std::uint8_t*>(
buffer_.transmission_chunks_.data()),
self_,
idx,
hpx::util::openshmem_environment::segments[idx].beg_addr,
static_cast<int>(buffer_.transmission_chunks_.size() *
sizeof(buffer_type::transmission_chunk_type)));
Expand Down Expand Up @@ -161,16 +157,14 @@ namespace hpx::parcelset::policies::openshmem {
}
else
{
const auto idx = self_;

//std::lock_guard<hpx::spinlock> l(*(*(hpx::util::openshmem_environment::segments[idx].mut)));
const auto idx = hpx::util::openshmem_environment::rank();

hpx::util::openshmem_environment::wait_until(
1, hpx::util::openshmem_environment::segments[idx].rcv);

hpx::util::openshmem_environment::get(
reinterpret_cast<std::uint8_t*>(buffer_.data_.data()),
self_,
idx,
hpx::util::openshmem_environment::segments[idx].beg_addr,
buffer_.data_.size());

Expand All @@ -189,9 +183,7 @@ namespace hpx::parcelset::policies::openshmem {
std::size_t cidx = 0;
std::size_t chunk_size = 0;

const auto idx = self_;

//std::lock_guard<hpx::spinlock> l(*(*(hpx::util::openshmem_environment::segments[idx].mut)));
const auto idx = hpx::util::openshmem_environment::rank();

while (chunks_idx_ < buffer_.chunks_.size())
{
Expand All @@ -211,7 +203,7 @@ namespace hpx::parcelset::policies::openshmem {
1, hpx::util::openshmem_environment::segments[idx].rcv);

hpx::util::openshmem_environment::get(
reinterpret_cast<std::uint8_t*>(c.data()), self_,
reinterpret_cast<std::uint8_t*>(c.data()), idx,
hpx::util::openshmem_environment::segments[idx].beg_addr,
c.size());

Expand All @@ -236,9 +228,11 @@ namespace hpx::parcelset::policies::openshmem {

bool request_done() noexcept
{
//const auto idx = self_;
util::openshmem_environment::scoped_try_lock const l;
if(!l.locked) { return false; }

hpx::util::openshmem_environment::quiet();

//const bool l = (*hpx::util::openshmem_environment::segments[idx].mut)->try_lock();
return true;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ namespace hpx::parcelset::policies::openshmem {
// different versions of clang-format disagree
// clang-format off
sender() noexcept
: connections_mtx_(), connections_(), next_free_tag_mtx_()
: connections_mtx_(), connections_()
{
}
// clang-format on
Expand Down Expand Up @@ -105,7 +105,6 @@ namespace hpx::parcelset::policies::openshmem {
hpx::spinlock connections_mtx_;
connection_list connections_;

hpx::spinlock next_free_tag_mtx_;
};
} // namespace hpx::parcelset::policies::openshmem

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -153,7 +153,6 @@ namespace hpx::parcelset::policies::openshmem {
const auto idx = dst_;

util::openshmem_environment::scoped_lock l;
//std::lock_guard<hpx::spinlock> l(*(*(hpx::util::openshmem_environment::segments[idx].mut)));

// put from this locality's openshmem shared memory segment
// into the remote locality's (dst_) shared memory segment
Expand Down Expand Up @@ -186,7 +185,6 @@ namespace hpx::parcelset::policies::openshmem {
const auto idx = dst_;

util::openshmem_environment::scoped_lock l;
//std::lock_guard<hpx::spinlock> l(*(*(hpx::util::openshmem_environment::segments[idx].mut)));

hpx::util::openshmem_environment::put_signal(
reinterpret_cast<std::uint8_t*>(chunks.data()), dst_,
Expand Down Expand Up @@ -214,7 +212,6 @@ namespace hpx::parcelset::policies::openshmem {
const auto idx = dst_;

util::openshmem_environment::scoped_lock l;
//std::lock_guard<hpx::spinlock> l(*(*(hpx::util::openshmem_environment::segments[idx].mut)));

hpx::util::openshmem_environment::put_signal(
reinterpret_cast<std::uint8_t*>(buffer_.data_.data()), dst_,
Expand All @@ -234,11 +231,6 @@ namespace hpx::parcelset::policies::openshmem {

const auto idx = dst_;

/*
std::lock_guard<hpx::spinlock> l(
*(*(hpx::util::openshmem_environment::segments[idx].mut))
);
*/
while (chunks_idx_ < buffer_.chunks_.size())
{
serialization::serialization_chunk& c =
Expand Down Expand Up @@ -298,7 +290,7 @@ namespace hpx::parcelset::policies::openshmem {

bool request_done()
{
util::openshmem_environment::scoped_lock l;
util::openshmem_environment::scoped_try_lock const l;
if(!l.locked) { return false; }

hpx::util::openshmem_environment::quiet();
Expand Down

0 comments on commit 98f65d8

Please sign in to comment.