Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Monotonic buffer resource #572

Open
wants to merge 4 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions eventuals/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@ cc_library(
"lock.h",
"loop.h",
"map.h",
"memory.h",
"notification.h",
"on-begin.h",
"on-ended.h",
Expand Down
17 changes: 17 additions & 0 deletions eventuals/catch.h
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
#include "eventuals/terminal.h"
#include "eventuals/then.h"
#include "eventuals/type-traits.h"
#include "stout/bytes.h"

////////////////////////////////////////////////////////////////////////

Expand Down Expand Up @@ -114,6 +115,8 @@ struct _Catch final {
k_.Register(*interrupt_);
}

k_.Register(std::move(resource_));

k_.Start(std::forward<Args>(args)...);
}

Expand Down Expand Up @@ -153,6 +156,8 @@ struct _Catch final {
k_.Register(*interrupt_);
}

k_.Register(std::move(resource_));

k_.Fail(std::forward<Error>(error));
}
}
Expand All @@ -162,6 +167,8 @@ struct _Catch final {
k_.Register(*interrupt_);
}

k_.Register(std::move(resource_));

k_.Stop();
}

Expand All @@ -174,10 +181,20 @@ struct _Catch final {
// the handler.
}

void Register(stout::borrowed_ptr<std::pmr::memory_resource>&& resource) {
resource_ = std::move(resource);
}

Bytes StaticHeapSize() {
return Bytes(0) + k_.StaticHeapSize();
}

std::tuple<CatchHandlers_...> catch_handlers_;

Interrupt* interrupt_ = nullptr;

stout::borrowed_ptr<std::pmr::memory_resource> resource_;

// NOTE: we store 'k_' as the _last_ member so it will be
// destructed _first_ and thus we won't have any use-after-delete
// issues during destruction of 'k_' if it holds any references or
Expand Down
13 changes: 13 additions & 0 deletions eventuals/closure.h
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,9 @@

#include "eventuals/compose.h"
#include "eventuals/interrupt.h"
#include "eventuals/memory.h"
#include "eventuals/type-erased-stream.h"
#include "stout/bytes.h"

////////////////////////////////////////////////////////////////////////

Expand Down Expand Up @@ -51,18 +53,27 @@ struct _Closure final {
interrupt_ = &interrupt;
}

void Register(stout::borrowed_ptr<std::pmr::memory_resource>&& resource) {
resource_ = std::move(resource);
}

[[nodiscard]] auto& continuation() {
if (!continuation_) {
continuation_.emplace(f_().template k<Arg_>(std::move(k_)));

if (interrupt_ != nullptr) {
continuation_->Register(*interrupt_);
}
continuation_->Register(std::move(resource_));
}

return *continuation_;
}

Bytes StaticHeapSize() {
return Bytes(0) + k_.StaticHeapSize();
}

F_ f_;

Interrupt* interrupt_ = nullptr;
Expand All @@ -71,6 +82,8 @@ struct _Closure final {

std::optional<Continuation_> continuation_;

stout::borrowed_ptr<std::pmr::memory_resource> resource_;

// NOTE: we store 'k_' as the _last_ member so it will be
// destructed _first_ and thus we won't have any use-after-delete
// issues during destruction of 'k_' if it holds any references or
Expand Down
9 changes: 9 additions & 0 deletions eventuals/concurrent-ordered.h
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
#include "eventuals/map.h"
#include "eventuals/stream.h"
#include "eventuals/terminal.h"
#include "stout/bytes.h"

/////////////////////////////////////////////////////////////////////

Expand Down Expand Up @@ -94,6 +95,10 @@ struct _ReorderAdaptor final {
upstream_->Done();
}

Bytes StaticHeapSize() {
return Bytes(0) + k_.StaticHeapSize();
}

TypeErasedStream* upstream_ = nullptr;

std::map<int, std::deque<Value_>> buffer_;
Expand Down Expand Up @@ -208,6 +213,10 @@ struct _ConcurrentOrderedAdaptor final {
upstream_->Done();
}

Bytes StaticHeapSize() {
return Bytes(0) + k_.StaticHeapSize();
}

bool ended_ = false;

std::optional<int> index_;
Expand Down
9 changes: 9 additions & 0 deletions eventuals/concurrent.h
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
#include "eventuals/terminal.h"
#include "eventuals/then.h"
#include "eventuals/until.h"
#include "stout/bytes.h"

////////////////////////////////////////////////////////////////////////

Expand Down Expand Up @@ -705,6 +706,14 @@ struct _Concurrent final {
handler_->Install();
}

void Register(stout::borrowed_ptr<std::pmr::memory_resource>&& resource) {
k_.Register(std::move(resource));
}

Bytes StaticHeapSize() {
return Bytes(0) + k_.StaticHeapSize();
}

Adaptor<F_, Arg_> adaptor_;

TypeErasedStream* stream_ = nullptr;
Expand Down
15 changes: 15 additions & 0 deletions eventuals/conditional.h
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@

#include "eventuals/then.h" // For '_Then::Adaptor'.
#include "eventuals/type-traits.h" // For 'type_identity'.
#include "stout/bytes.h"

////////////////////////////////////////////////////////////////////////

Expand Down Expand Up @@ -49,6 +50,8 @@ struct _Conditional {
then_adapted_->Register(*interrupt_);
}

then_adapted_->Register(std::move(resource_));

then_adapted_->Start();
} else {
else_adapted_.emplace(
Expand All @@ -59,6 +62,8 @@ struct _Conditional {
else_adapted_->Register(*interrupt_);
}

else_adapted_->Register(std::move(resource_));

else_adapted_->Start();
}
}
Expand All @@ -78,6 +83,14 @@ struct _Conditional {
k_.Register(interrupt);
}

void Register(stout::borrowed_ptr<std::pmr::memory_resource>&& resource) {
resource_ = std::move(resource);
}

Bytes StaticHeapSize() {
return Bytes(0) + k_.StaticHeapSize();
}

Condition_ condition_;
Then_ then_;
Else_ else_;
Expand Down Expand Up @@ -114,6 +127,8 @@ struct _Conditional {
std::optional<ThenAdapted_> then_adapted_;
std::optional<ElseAdapted_> else_adapted_;

stout::borrowed_ptr<std::pmr::memory_resource> resource_;

// NOTE: we store 'k_' as the _last_ member so it will be
// destructed _first_ and thus we won't have any use-after-delete
// issues during destruction of 'k_' if it holds any references or
Expand Down
17 changes: 15 additions & 2 deletions eventuals/do-all.h
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
#include "eventuals/compose.h"
#include "eventuals/scheduler.h"
#include "eventuals/terminal.h"
#include "stout/bytes.h"

////////////////////////////////////////////////////////////////////////

Expand Down Expand Up @@ -242,7 +243,7 @@ struct _DoAll final {
std::make_index_sequence<sizeof...(Eventuals_)>{}));

std::apply(
[](auto&... fiber) {
[this](auto&... fiber) {
static std::atomic<int> i = 0;

// Clone the current scheduler context for running the eventual.
Expand All @@ -252,11 +253,12 @@ struct _DoAll final {
...);

(fiber.context->scheduler()->Submit(
[&]() {
[&, this]() {
CHECK_EQ(
&fiber.context.value(),
Scheduler::Context::Get().get());
fiber.k.Register(fiber.interrupt);
fiber.k.Register(resource_.reborrow());
fiber.k.Start();
},
fiber.context.value()),
Expand Down Expand Up @@ -285,6 +287,15 @@ struct _DoAll final {
handler_->Install();
}

void Register(stout::borrowed_ptr<std::pmr::memory_resource>&& resource) {
resource_ = std::move(resource);
}


Bytes StaticHeapSize() {
return Bytes(0) + k_.StaticHeapSize();
}

// NOTE: need to destruct the fibers LAST since they have a
// Scheduler::Context which may get borrowed in 'adaptor_' and
// it's continuations so those need to be destructed first.
Expand All @@ -309,6 +320,8 @@ struct _DoAll final {

std::optional<Interrupt::Handler> handler_;

stout::borrowed_ptr<std::pmr::memory_resource> resource_;

// NOTE: we store 'k_' as the _last_ member so it will be
// destructed _first_ and thus we won't have any use-after-delete
// issues during destruction of 'k_' if it holds any references or
Expand Down
63 changes: 49 additions & 14 deletions eventuals/event-loop.h
Original file line number Diff line number Diff line change
Expand Up @@ -14,10 +14,12 @@
#include "eventuals/callback.h"
#include "eventuals/closure.h"
#include "eventuals/lazy.h"
#include "eventuals/memory.h"
#include "eventuals/stream.h"
#include "eventuals/then.h"
#include "eventuals/type-traits.h"
#include "stout/borrowed_ptr.h"
#include "stout/bytes.h"
#include "uv.h"

////////////////////////////////////////////////////////////////////////
Expand Down Expand Up @@ -398,6 +400,15 @@ class EventLoop final : public Scheduler {
handler_->Install();
}

void Register(
stout::borrowed_ptr<std::pmr::memory_resource>&& resource) {
k_.Register(std::move(resource));
}

Bytes StaticHeapSize() {
return Bytes(0) + k_.StaticHeapSize();
}

private:
EventLoop& loop() {
return clock_->loop();
Expand Down Expand Up @@ -738,6 +749,14 @@ class EventLoop final : public Scheduler {
handler_->Install();
}

void Register(stout::borrowed_ptr<std::pmr::memory_resource>&& resource) {
k_.Register(std::move(resource));
}

Bytes StaticHeapSize() {
return Bytes(0) + k_.StaticHeapSize();
}

private:
// Adaptors to libuv functions.
uv_signal_t* signal() {
Expand Down Expand Up @@ -1039,6 +1058,14 @@ class EventLoop final : public Scheduler {
handler_->Install();
}

void Register(stout::borrowed_ptr<std::pmr::memory_resource>&& resource) {
k_.Register(std::move(resource));
}

Bytes StaticHeapSize() {
return Bytes(0) + k_.StaticHeapSize();
}

private:
// Adaptors to libuv functions.
uv_poll_t* poll() {
Expand Down Expand Up @@ -1131,6 +1158,7 @@ struct _EventLoopSchedule final {
Continuation(Continuation&& that)
: e_(std::move(that.e_)),
context_(std::move(that.context_)),
resource_(std::move(that.resource_)),
k_(std::move(that.k_)) {}

~Continuation() override {
Expand Down Expand Up @@ -1233,32 +1261,34 @@ struct _EventLoopSchedule final {
k_.Register(interrupt);
}

void Register(stout::borrowed_ptr<std::pmr::memory_resource>&& resource) {
resource_ = std::move(resource);
}

void Adapt() {
if (!adapted_) {
// Save previous context (even if it's us).
stout::borrowed_ref<Scheduler::Context> previous =
Scheduler::Context::Get().reborrow();

adapted_.reset(
// NOTE: for now we're assuming usage of something like
// 'jemalloc' so 'new' should use lock-free and thread-local
// arenas. Ideally allocating memory during runtime should
// actually be *faster* because the memory should have
// better locality for the execution resource being used
// (i.e., a local NUMA node). However, we should reconsider
// this design decision if in practice this performance
// tradeoff is not emperically a benefit.
new Adapted_(
std::move(e_).template k<Arg_>(
Reschedule(std::move(previous))
.template k<Value_>(_Then::Adaptor<K_>{k_}))));
adapted_ = MakeUniqueUsingMemoryResourceOrNew<Adapted_>(
resource_,
std::move(e_).template k<Arg_>(
Reschedule(std::move(previous))
.template k<Value_>(_Then::Adaptor<K_>{k_})));

if (interrupt_ != nullptr) {
adapted_->Register(*interrupt_);
}

adapted_->Register(std::move(resource_));
}
}

Bytes StaticHeapSize() {
return Bytes(sizeof(Adapted_)) + k_.StaticHeapSize();
}

E_ e_;

std::optional<
Expand All @@ -1280,7 +1310,12 @@ struct _EventLoopSchedule final {
std::declval<_Reschedule::Composable>()
.template k<Value_>(std::declval<_Then::Adaptor<K_>>())));

std::unique_ptr<Adapted_> adapted_;
std::unique_ptr<
Adapted_,
Callback<void(void*)>>
adapted_{nullptr, [](void*) {}};

stout::borrowed_ptr<std::pmr::memory_resource> resource_;

// NOTE: we store 'k_' as the _last_ member so it will be
// destructed _first_ and thus we won't have any use-after-delete
Expand Down
Loading