Skip to content

Commit

Permalink
Monotonic buffer resource
Browse files Browse the repository at this point in the history
  • Loading branch information
ArthurBandaryk committed Sep 9, 2022
1 parent f2d9825 commit 5fd756f
Show file tree
Hide file tree
Showing 40 changed files with 571 additions and 100 deletions.
1 change: 1 addition & 0 deletions eventuals/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@ cc_library(
"lock.h",
"loop.h",
"map.h",
"memory.h",
"notification.h",
"on-begin.h",
"on-ended.h",
Expand Down
12 changes: 12 additions & 0 deletions eventuals/catch.h
Original file line number Diff line number Diff line change
Expand Up @@ -115,6 +115,8 @@ struct _Catch final {
k_.Register(*interrupt_);
}

k_.Register(std::move(resource_));

k_.Start(std::forward<Args>(args)...);
}

Expand Down Expand Up @@ -154,6 +156,8 @@ struct _Catch final {
k_.Register(*interrupt_);
}

k_.Register(std::move(resource_));

k_.Fail(std::forward<Error>(error));
}
}
Expand All @@ -163,6 +167,8 @@ struct _Catch final {
k_.Register(*interrupt_);
}

k_.Register(std::move(resource_));

k_.Stop();
}

Expand All @@ -175,6 +181,10 @@ struct _Catch final {
// the handler.
}

void Register(stout::borrowed_ptr<std::pmr::memory_resource>&& resource) {
resource_ = std::move(resource);
}

Bytes StaticHeapSize() {
return Bytes(0) + k_.StaticHeapSize();
}
Expand All @@ -183,6 +193,8 @@ struct _Catch final {

Interrupt* interrupt_ = nullptr;

stout::borrowed_ptr<std::pmr::memory_resource> resource_;

// NOTE: we store 'k_' as the _last_ member so it will be
// destructed _first_ and thus we won't have any use-after-delete
// issues during destruction of 'k_' if it holds any references or
Expand Down
8 changes: 8 additions & 0 deletions eventuals/closure.h
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@

#include "eventuals/compose.h"
#include "eventuals/interrupt.h"
#include "eventuals/memory.h"
#include "eventuals/type-erased-stream.h"
#include "stout/bytes.h"

Expand Down Expand Up @@ -52,13 +53,18 @@ struct _Closure final {
interrupt_ = &interrupt;
}

void Register(stout::borrowed_ptr<std::pmr::memory_resource>&& resource) {
resource_ = std::move(resource);
}

[[nodiscard]] auto& continuation() {
if (!continuation_) {
continuation_.emplace(f_().template k<Arg_>(std::move(k_)));

if (interrupt_ != nullptr) {
continuation_->Register(*interrupt_);
}
continuation_->Register(std::move(resource_));
}

return *continuation_;
Expand All @@ -76,6 +82,8 @@ struct _Closure final {

std::optional<Continuation_> continuation_;

stout::borrowed_ptr<std::pmr::memory_resource> resource_;

// NOTE: we store 'k_' as the _last_ member so it will be
// destructed _first_ and thus we won't have any use-after-delete
// issues during destruction of 'k_' if it holds any references or
Expand Down
4 changes: 4 additions & 0 deletions eventuals/concurrent.h
Original file line number Diff line number Diff line change
Expand Up @@ -706,6 +706,10 @@ struct _Concurrent final {
handler_->Install();
}

void Register(stout::borrowed_ptr<std::pmr::memory_resource>&& resource) {
k_.Register(std::move(resource));
}

Bytes StaticHeapSize() {
return Bytes(0) + k_.StaticHeapSize();
}
Expand Down
10 changes: 10 additions & 0 deletions eventuals/conditional.h
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,8 @@ struct _Conditional {
then_adapted_->Register(*interrupt_);
}

then_adapted_->Register(std::move(resource_));

then_adapted_->Start();
} else {
else_adapted_.emplace(
Expand All @@ -60,6 +62,8 @@ struct _Conditional {
else_adapted_->Register(*interrupt_);
}

else_adapted_->Register(std::move(resource_));

else_adapted_->Start();
}
}
Expand All @@ -79,6 +83,10 @@ struct _Conditional {
k_.Register(interrupt);
}

void Register(stout::borrowed_ptr<std::pmr::memory_resource>&& resource) {
resource_ = std::move(resource);
}

Bytes StaticHeapSize() {
return Bytes(0) + k_.StaticHeapSize();
}
Expand Down Expand Up @@ -119,6 +127,8 @@ struct _Conditional {
std::optional<ThenAdapted_> then_adapted_;
std::optional<ElseAdapted_> else_adapted_;

stout::borrowed_ptr<std::pmr::memory_resource> resource_;

// NOTE: we store 'k_' as the _last_ member so it will be
// destructed _first_ and thus we won't have any use-after-delete
// issues during destruction of 'k_' if it holds any references or
Expand Down
12 changes: 10 additions & 2 deletions eventuals/do-all.h
Original file line number Diff line number Diff line change
Expand Up @@ -243,7 +243,7 @@ struct _DoAll final {
std::make_index_sequence<sizeof...(Eventuals_)>{}));

std::apply(
[](auto&... fiber) {
[this](auto&... fiber) {
static std::atomic<int> i = 0;

// Clone the current scheduler context for running the eventual.
Expand All @@ -253,11 +253,12 @@ struct _DoAll final {
...);

(fiber.context->scheduler()->Submit(
[&]() {
[&, this]() {
CHECK_EQ(
&fiber.context.value(),
Scheduler::Context::Get().get());
fiber.k.Register(fiber.interrupt);
fiber.k.Register(resource_.reborrow());
fiber.k.Start();
},
fiber.context.value()),
Expand Down Expand Up @@ -286,6 +287,11 @@ struct _DoAll final {
handler_->Install();
}

void Register(stout::borrowed_ptr<std::pmr::memory_resource>&& resource) {
resource_ = std::move(resource);
}


Bytes StaticHeapSize() {
return Bytes(0) + k_.StaticHeapSize();
}
Expand Down Expand Up @@ -314,6 +320,8 @@ struct _DoAll final {

std::optional<Interrupt::Handler> handler_;

stout::borrowed_ptr<std::pmr::memory_resource> resource_;

// NOTE: we store 'k_' as the _last_ member so it will be
// destructed _first_ and thus we won't have any use-after-delete
// issues during destruction of 'k_' if it holds any references or
Expand Down
46 changes: 32 additions & 14 deletions eventuals/event-loop.h
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
#include "eventuals/callback.h"
#include "eventuals/closure.h"
#include "eventuals/lazy.h"
#include "eventuals/memory.h"
#include "eventuals/stream.h"
#include "eventuals/then.h"
#include "eventuals/type-traits.h"
Expand Down Expand Up @@ -399,6 +400,11 @@ class EventLoop final : public Scheduler {
handler_->Install();
}

void Register(
stout::borrowed_ptr<std::pmr::memory_resource>&& resource) {
k_.Register(std::move(resource));
}

Bytes StaticHeapSize() {
return Bytes(0) + k_.StaticHeapSize();
}
Expand Down Expand Up @@ -743,6 +749,10 @@ class EventLoop final : public Scheduler {
handler_->Install();
}

void Register(stout::borrowed_ptr<std::pmr::memory_resource>&& resource) {
k_.Register(std::move(resource));
}

Bytes StaticHeapSize() {
return Bytes(0) + k_.StaticHeapSize();
}
Expand Down Expand Up @@ -1048,6 +1058,10 @@ class EventLoop final : public Scheduler {
handler_->Install();
}

void Register(stout::borrowed_ptr<std::pmr::memory_resource>&& resource) {
k_.Register(std::move(resource));
}

Bytes StaticHeapSize() {
return Bytes(0) + k_.StaticHeapSize();
}
Expand Down Expand Up @@ -1144,6 +1158,7 @@ struct _EventLoopSchedule final {
Continuation(Continuation&& that)
: e_(std::move(that.e_)),
context_(std::move(that.context_)),
resource_(std::move(that.resource_)),
k_(std::move(that.k_)) {}

~Continuation() override {
Expand Down Expand Up @@ -1246,29 +1261,27 @@ struct _EventLoopSchedule final {
k_.Register(interrupt);
}

void Register(stout::borrowed_ptr<std::pmr::memory_resource>&& resource) {
resource_ = std::move(resource);
}

void Adapt() {
if (!adapted_) {
// Save previous context (even if it's us).
stout::borrowed_ref<Scheduler::Context> previous =
Scheduler::Context::Get().reborrow();

adapted_.reset(
// NOTE: for now we're assuming usage of something like
// 'jemalloc' so 'new' should use lock-free and thread-local
// arenas. Ideally allocating memory during runtime should
// actually be *faster* because the memory should have
// better locality for the execution resource being used
// (i.e., a local NUMA node). However, we should reconsider
// this design decision if in practice this performance
// tradeoff is not emperically a benefit.
new Adapted_(
std::move(e_).template k<Arg_>(
Reschedule(std::move(previous))
.template k<Value_>(_Then::Adaptor<K_>{k_}))));
adapted_ = MakeUniqueUsingMemoryResourceOrNew<Adapted_>(
resource_,
std::move(e_).template k<Arg_>(
Reschedule(std::move(previous))
.template k<Value_>(_Then::Adaptor<K_>{k_})));

if (interrupt_ != nullptr) {
adapted_->Register(*interrupt_);
}

adapted_->Register(std::move(resource_));
}
}

Expand Down Expand Up @@ -1297,7 +1310,12 @@ struct _EventLoopSchedule final {
std::declval<_Reschedule::Composable>()
.template k<Value_>(std::declval<_Then::Adaptor<K_>>())));

std::unique_ptr<Adapted_> adapted_;
std::unique_ptr<
Adapted_,
Callback<void(void*)>>
adapted_{nullptr, [](void*) {}};

stout::borrowed_ptr<std::pmr::memory_resource> resource_;

// NOTE: we store 'k_' as the _last_ member so it will be
// destructed _first_ and thus we won't have any use-after-delete
Expand Down
4 changes: 4 additions & 0 deletions eventuals/eventual.h
Original file line number Diff line number Diff line change
Expand Up @@ -160,6 +160,10 @@ struct _Eventual {
}
}

void Register(stout::borrowed_ptr<std::pmr::memory_resource>&& resource) {
k_.Register(std::move(resource));
}

Adaptor<K_, Value_, Errors_>& adaptor() {
// Note: needed to delay doing this until now because this
// eventual might have been moved before being started.
Expand Down
4 changes: 4 additions & 0 deletions eventuals/filter.h
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,10 @@ struct _Filter final {
k_.Register(interrupt);
}

void Register(stout::borrowed_ptr<std::pmr::memory_resource>&& resource) {
k_.Register(std::move(resource));
}

Bytes StaticHeapSize() {
return Bytes(0) + k_.StaticHeapSize();
}
Expand Down
6 changes: 6 additions & 0 deletions eventuals/finally.h
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
#pragma once

#include <memory_resource>

#include "eventuals/expected.h"
#include "eventuals/terminal.h" // For 'StoppedException'.
#include "eventuals/then.h"
Expand Down Expand Up @@ -42,6 +44,10 @@ struct _Finally final {
k_.Register(interrupt);
}

void Register(stout::borrowed_ptr<std::pmr::memory_resource>&& resource) {
k_.Register(std::move(resource));
}

Bytes StaticHeapSize() {
return Bytes(0) + k_.StaticHeapSize();
}
Expand Down
11 changes: 11 additions & 0 deletions eventuals/flat-map.h
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,9 @@ struct _FlatMap final {
// Already registered K once in 'FlatMap::Body()'.
}

void Register(stout::borrowed_ptr<std::pmr::memory_resource>&& resource) {
}

FlatMap_* streamforeach_;
};

Expand Down Expand Up @@ -89,6 +92,10 @@ struct _FlatMap final {
k_.Register(interrupt);
}

void Register(stout::borrowed_ptr<std::pmr::memory_resource>&& resource) {
resource_ = std::move(resource);
}

template <typename... Args>
void Body(Args&&... args) {
CHECK(!adapted_.has_value());
Expand All @@ -101,6 +108,8 @@ struct _FlatMap final {
adapted_->Register(*interrupt_);
}

adapted_->Register(std::move(resource_));

adapted_->Start();
}

Expand Down Expand Up @@ -152,6 +161,8 @@ struct _FlatMap final {

stout::borrowed_ptr<Scheduler::Context> previous_;

stout::borrowed_ptr<std::pmr::memory_resource> resource_;

// NOTE: we store 'k_' as the _last_ member so it will be
// destructed _first_ and thus we won't have any use-after-delete
// issues during destruction of 'k_' if it holds any references or
Expand Down
Loading

0 comments on commit 5fd756f

Please sign in to comment.