Skip to content

Commit

Permalink
Transfer and improve Transactions (#99)
Browse files Browse the repository at this point in the history
* Transfer and improve opencmw::TimingCtx

This is by large a port of the opencmw TimingCtx, but with some improvements: 
 * The most significant change is that the parse and match functions are not hardcoded anymore, but can instead be passed over from the outside via a predicate. This is necessary to ensure that TimingCtx stays generic enough so that
it can be used for arbitrary use-cases.
 *  Furthermore, we no longer hardcode any ids but instead use a `pmtv::map_t` to allow the user to store arbitrary identifiers for their usecase.
* Transfer opencmw::ReaderWriterLock
* Transfer and improve opencmw Transaction SettingBase
   This required some extensive changes and adaptions to work with the more general version of TimingCtx and to use the circular buffer instead of the opencmw::RingBuffer.

 * This also adds a new Utils function to find out the true cache line size. It uses `std::hardware_destructive_interference_size` and falls back to 64, which is the cache line size for most x86_64 targets.
 * The old implementation used redundant locking mechanisms and unnecessarily complicated circular buffers.
    Instead this now uses just a single recursive lock and a normal collection, which simplifies this a lot and hopefully makes emscripten more happy as well.
* This also integrates the multiplexed settings into the Node as a drop-in replacement for the basic settings.
* Call the matching predicate over multiple rounds
   This allows for hierarchical matching, where in the first round only very refined matching is performed and then the condition becomes increasingly looser.
  • Loading branch information
vimpostor authored Sep 29, 2023
1 parent b2d5c75 commit 8a2f060
Show file tree
Hide file tree
Showing 8 changed files with 798 additions and 0 deletions.
6 changes: 6 additions & 0 deletions include/node.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -529,6 +529,12 @@ struct node : protected std::tuple<Arguments...> {
return *_settings;
}

template<typename T>
void
setSettings(std::unique_ptr<T> &settings) {
_settings = std::move(settings);
}

template<std::size_t Index, typename Self>
friend constexpr auto &
input_port(Self *self) noexcept;
Expand Down
111 changes: 111 additions & 0 deletions include/reader_writer_lock.hpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,111 @@
#ifndef READER_WRITER_LOCK_HPP
#define READER_WRITER_LOCK_HPP

#include "utils.hpp"
#include <atomic>
#include <cstdint>

namespace fair::graph {

enum class ReaderWriterLockType { READ, WRITE };

/**
* @brief ReaderWriterLock is multi-reader-multi-writer atomic lock meant to protect a resource
* in situations where the thread is not allowed to block.
*
* The lock is implemented using atomic CAS-loops on a counter, which is
* incremented (/decremented) when a thread acquires a read (/write) lock, and
* decremented (/incremented) when the thread releases the read (/write) lock.
*
* N.B. The lock is unlocked when the counter reaches 0.
*/
class ReaderWriterLock {
alignas(fair::meta::kCacheLine) mutable std::atomic<std::int64_t> _activeReaderCount{ 0 };

public:
ReaderWriterLock() = default;

[[nodiscard]] std::int64_t
value() const noexcept {
return std::atomic_load_explicit(&_activeReaderCount, std::memory_order_acquire);
}

template<ReaderWriterLockType lockType>
std::int64_t
tryLock() const noexcept {
std::int64_t expected = _activeReaderCount.load(std::memory_order_relaxed);
if constexpr (lockType == ReaderWriterLockType::READ) {
if (expected < 0L) {
expected = 0L;
}
return std::atomic_compare_exchange_strong(&_activeReaderCount, &expected, expected + 1L);
} else {
if (expected > 0L) {
expected = 0L;
}
return std::atomic_compare_exchange_strong(&_activeReaderCount, &expected, expected - 1L);
}
}

template<ReaderWriterLockType lockType>
std::int64_t
lock() const noexcept {
if constexpr (lockType == ReaderWriterLockType::READ) {
std::int64_t expected = _activeReaderCount.load(std::memory_order_relaxed);
do {
if (expected < 0L) {
expected = 0L;
}
} while (!std::atomic_compare_exchange_strong(&_activeReaderCount, &expected, expected + 1L));
return expected + 1L;
} else {
std::int64_t expected = _activeReaderCount.load(std::memory_order_relaxed);
do {
if (expected > 0L) {
expected = 0L;
}
} while (!std::atomic_compare_exchange_strong(&_activeReaderCount, &expected, expected - 1L));
return expected - 1L;
}
}

template<ReaderWriterLockType lockType>
std::int64_t
unlock() const noexcept {
if constexpr (lockType == ReaderWriterLockType::READ) {
return std::atomic_fetch_sub(&_activeReaderCount, 1L) - 1L;
} else {
return std::atomic_fetch_add(&_activeReaderCount, 1L) + 1L;
}
}

template<ReaderWriterLockType lockType>
auto
scopedGuard() {
return ScopedLock<lockType>(*this);
}

template<ReaderWriterLockType lockType>
class ScopedLock { // NOSONAR - class destructor is needed for guard functionality
ReaderWriterLock *_readWriteLock;

public:
ScopedLock() = delete;
ScopedLock(const ScopedLock &) = delete;
ScopedLock(ScopedLock &&) = delete;
ScopedLock &
operator=(const ScopedLock &)
= delete;
ScopedLock &
operator=(ScopedLock &&)
= delete;

explicit constexpr ScopedLock(ReaderWriterLock &parent) noexcept : _readWriteLock(&parent) { _readWriteLock->lock<lockType>(); }

~ScopedLock() { _readWriteLock->unlock<lockType>(); }
};
};

} // namespace fair::graph

#endif // READER_WRITER_LOCK_HPP
50 changes: 50 additions & 0 deletions include/settings.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -14,11 +14,50 @@

namespace fair::graph {

namespace detail {
template<class T>
inline constexpr void
hash_combine(std::size_t &seed, const T &v) noexcept {
std::hash<T> hasher;
seed ^= hasher(v) + 0x9e3779b9 + (seed << 6) + (seed >> 2);
}
} // namespace detail

struct SettingsCtx {
// using TimePoint = std::chrono::time_point<std::chrono::utc_clock>; // TODO: change once the C++20 support is ubiquitous
using TimePoint = std::chrono::time_point<std::chrono::system_clock>;
std::optional<TimePoint> time = std::nullopt; /// UTC time-stamp from which the setting is valid
property_map context; /// user-defined multiplexing context for which the setting is valid

SettingsCtx() {}

explicit SettingsCtx(const TimePoint &t, const property_map &ctx = {}) {
time = t;
context = ctx;
}

bool
operator==(const SettingsCtx &) const
= default;

bool
operator<(const SettingsCtx &other) {
// order by time
return !time || (other.time && *time < *other.time);
}

[[nodiscard]] std::size_t
hash() const noexcept {
std::size_t seed = 0;
if (time) {
detail::hash_combine(seed, time.value().time_since_epoch().count());
}
for (const auto &[key, val] : context) {
detail::hash_combine(seed, key);
detail::hash_combine(seed, pmtv::to_base64(val));
}
return seed;
}
};

/**
Expand Down Expand Up @@ -603,4 +642,15 @@ class basic_settings : public settings_base {
static_assert(Settings<basic_settings<int>>);

} // namespace fair::graph

namespace std {
template<>
struct hash<fair::graph::SettingsCtx> {
[[nodiscard]] size_t
operator()(const fair::graph::SettingsCtx &ctx) const noexcept {
return ctx.hash();
}
};
} // namespace std

#endif // GRAPH_PROTOTYPE_SETTINGS_HPP
Loading

0 comments on commit 8a2f060

Please sign in to comment.