Skip to content

Commit

Permalink
Monotonic buffer resource
Browse files Browse the repository at this point in the history
  • Loading branch information
ArthurBandaryk committed Sep 7, 2022
1 parent f2d9825 commit cf4401f
Show file tree
Hide file tree
Showing 17 changed files with 258 additions and 63 deletions.
1 change: 1 addition & 0 deletions eventuals/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@ cc_library(
"lock.h",
"loop.h",
"map.h",
"memory.h",
"notification.h",
"on-begin.h",
"on-ended.h",
Expand Down
8 changes: 8 additions & 0 deletions eventuals/closure.h
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@

#include "eventuals/compose.h"
#include "eventuals/interrupt.h"
#include "eventuals/memory.h"
#include "eventuals/type-erased-stream.h"
#include "stout/bytes.h"

Expand Down Expand Up @@ -52,13 +53,18 @@ struct _Closure final {
interrupt_ = &interrupt;
}

void Register(stout::borrowed_ptr<std::pmr::memory_resource>&& resource) {
resource_ = std::move(resource);
}

[[nodiscard]] auto& continuation() {
if (!continuation_) {
continuation_.emplace(f_().template k<Arg_>(std::move(k_)));

if (interrupt_ != nullptr) {
continuation_->Register(*interrupt_);
}
continuation_->Register(std::move(resource_));
}

return *continuation_;
Expand All @@ -76,6 +82,8 @@ struct _Closure final {

std::optional<Continuation_> continuation_;

stout::borrowed_ptr<std::pmr::memory_resource> resource_;

// NOTE: we store 'k_' as the _last_ member so it will be
// destructed _first_ and thus we won't have any use-after-delete
// issues during destruction of 'k_' if it holds any references or
Expand Down
4 changes: 4 additions & 0 deletions eventuals/concurrent.h
Original file line number Diff line number Diff line change
Expand Up @@ -706,6 +706,10 @@ struct _Concurrent final {
handler_->Install();
}

void Register(stout::borrowed_ptr<std::pmr::memory_resource>&& resource) {
k_.Register(std::move(resource));
}

Bytes StaticHeapSize() {
return Bytes(0) + k_.StaticHeapSize();
}
Expand Down
4 changes: 4 additions & 0 deletions eventuals/eventual.h
Original file line number Diff line number Diff line change
Expand Up @@ -160,6 +160,10 @@ struct _Eventual {
}
}

void Register(stout::borrowed_ptr<std::pmr::memory_resource>&& resource) {
k_.Register(std::move(resource));
}

Adaptor<K_, Value_, Errors_>& adaptor() {
// Note: needed to delay doing this until now because this
// eventual might have been moved before being started.
Expand Down
8 changes: 8 additions & 0 deletions eventuals/lock.h
Original file line number Diff line number Diff line change
Expand Up @@ -457,6 +457,10 @@ struct _Acquire final {
k_.Register(interrupt);
}

void Register(stout::borrowed_ptr<std::pmr::memory_resource>&& resource) {
k_.Register(std::move(resource));
}

Bytes StaticHeapSize() {
return Bytes(0) + k_.StaticHeapSize();
}
Expand Down Expand Up @@ -553,6 +557,10 @@ struct _Release final {
k_.Register(interrupt);
}

void Register(stout::borrowed_ptr<std::pmr::memory_resource>&& resource) {
k_.Register(std::move(resource));
}

Bytes StaticHeapSize() {
return Bytes(0) + k_.StaticHeapSize();
}
Expand Down
4 changes: 4 additions & 0 deletions eventuals/loop.h
Original file line number Diff line number Diff line change
Expand Up @@ -165,6 +165,10 @@ struct _Loop final {
}
}

void Register(stout::borrowed_ptr<std::pmr::memory_resource>&& resource) {
k_.Register(std::move(resource));
}

template <typename... Args>
void Body(Args&&... args) {
if constexpr (IsUndefined<Body_>::value) {
Expand Down
11 changes: 11 additions & 0 deletions eventuals/map.h
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,9 @@ struct _Map final {
// Already registered K once in 'Map::Register()'.
}

void Register(stout::borrowed_ptr<std::pmr::memory_resource>&&) {
}

K_& k_;
};

Expand Down Expand Up @@ -64,6 +67,8 @@ struct _Map final {
if (interrupt_ != nullptr) {
adapted_->Register(*interrupt_);
}

adapted_->Register(std::move(resource_));
}

adapted_->Start(std::forward<Args>(args)...);
Expand All @@ -79,6 +84,10 @@ struct _Map final {
k_.Register(interrupt);
}

void Register(stout::borrowed_ptr<std::pmr::memory_resource>&& resource) {
resource_ = std::move(resource);
}

Bytes StaticHeapSize() {
return Bytes(0) + k_.StaticHeapSize();
}
Expand All @@ -92,6 +101,8 @@ struct _Map final {

Interrupt* interrupt_ = nullptr;

stout::borrowed_ptr<std::pmr::memory_resource> resource_;

// NOTE: we store 'k_' as the _last_ member so it will be
// destructed _first_ and thus we won't have any use-after-delete
// issues during destruction of 'k_' if it holds any references or
Expand Down
44 changes: 44 additions & 0 deletions eventuals/memory.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,44 @@
#pragma once

#include <iostream>
#include <memory>
#include <memory_resource>

#include "eventuals/callback.h"
#include "stout/borrowable.h"

////////////////////////////////////////////////////////////////////////

namespace eventuals {

////////////////////////////////////////////////////////////////////////

template <typename T, typename... Args>
std::unique_ptr<T, Callback<void(T*)>> MakeUniqueUsingMemoryResourceOrNew(
stout::borrowed_ptr<std::pmr::memory_resource>& resource,
Args&&... args) {
if (resource) {
size_t size = sizeof(T);
void* pointer = resource->allocate(size);
CHECK_NOTNULL(pointer);
new (pointer) T(std::forward<Args>(args)...);
return std::unique_ptr<T, Callback<void(T*)>>(
static_cast<T*>(pointer),
[resource = resource.reborrow()](T* t) {
t->~T();
resource->deallocate(t, sizeof(T));
});
} else {
return std::unique_ptr<T, Callback<void(T*)>>(
new T(std::forward<Args>(args)...),
[](T* t) {
delete t;
});
}
}

////////////////////////////////////////////////////////////////////////

} // namespace eventuals

////////////////////////////////////////////////////////////////////////
4 changes: 4 additions & 0 deletions eventuals/repeat.h
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,10 @@ struct _Repeat final {
k_.Register(interrupt);
}

void Register(stout::borrowed_ptr<std::pmr::memory_resource>&& resource) {
k_.Register(std::move(resource));
}

void Next() override {
previous_->Continue([this]() {
k_.Body();
Expand Down
22 changes: 22 additions & 0 deletions eventuals/scheduler.h
Original file line number Diff line number Diff line change
Expand Up @@ -289,6 +289,10 @@ struct _Reschedule final {
k_.Register(interrupt);
}

void Register(stout::borrowed_ptr<std::pmr::memory_resource>&& resource) {
k_.Register(std::move(resource));
}

Bytes StaticHeapSize() {
return Bytes(0) + k_.StaticHeapSize();
}
Expand All @@ -307,6 +311,8 @@ struct _Reschedule final {

TypeErasedStream* stream_ = nullptr;

stout::borrowed_ptr<std::pmr::memory_resource> resource_;

// NOTE: we store 'k_' as the _last_ member so it will be
// destructed _first_ and thus we won't have any use-after-delete
// issues during destruction of 'k_' if it holds any references or
Expand Down Expand Up @@ -382,6 +388,8 @@ struct Reschedulable final {
if (interrupt_ != nullptr) {
continuation_->Register(*interrupt_);
}

continuation_->Register(std::move(resource_));
}

// NOTE: there is no invariant that 'previous' equals the current
Expand All @@ -397,6 +405,10 @@ struct Reschedulable final {
interrupt_ = &interrupt;
}

void Register(stout::borrowed_ptr<std::pmr::memory_resource>&& resource) {
resource_ = std::move(resource);
}

Bytes StaticHeapSize() {
return Bytes(0) + k_.StaticHeapSize();
}
Expand All @@ -409,6 +421,8 @@ struct Reschedulable final {

std::optional<Continuation_> continuation_;

stout::borrowed_ptr<std::pmr::memory_resource> resource_;

// NOTE: we store 'k_' as the _last_ member so it will be
// destructed _first_ and thus we won't have any use-after-delete
// issues during destruction of 'k_' if it holds any references or
Expand Down Expand Up @@ -463,6 +477,10 @@ struct _Preempt final {
interrupt_ = &interrupt;
}

void Register(stout::borrowed_ptr<std::pmr::memory_resource>&& resource) {
resource_ = std::move(resource);
}

void Adapt() {
CHECK(!adapted_);

Expand All @@ -478,6 +496,8 @@ struct _Preempt final {
if (interrupt_ != nullptr) {
adapted_->Register(*interrupt_);
}

adapted_->Register(std::move(resource_));
}

Bytes StaticHeapSize() {
Expand All @@ -500,6 +520,8 @@ struct _Preempt final {

std::optional<Adapted_> adapted_;

stout::borrowed_ptr<std::pmr::memory_resource> resource_;

// NOTE: we store 'k_' as the _last_ member so it will be
// destructed _first_ and thus we won't have any use-after-delete
// issues during destruction of 'k_' if it holds any references or
Expand Down
32 changes: 17 additions & 15 deletions eventuals/static-thread-pool.h
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@

#include "eventuals/compose.h"
#include "eventuals/lazy.h"
#include "eventuals/memory.h"
#include "eventuals/scheduler.h"
#include "eventuals/semaphore.h"
#include "stout/borrowed_ptr.h"
Expand Down Expand Up @@ -164,6 +165,7 @@ struct _StaticThreadPoolSchedule final {
: e_(std::move(that.e_)),
context_(std::move(that.context_)),
engine_(device_()),
resource_(std::move(that.resource_)),
k_(std::move(that.k_)) {}

~Continuation() override {
Expand Down Expand Up @@ -446,30 +448,28 @@ struct _StaticThreadPoolSchedule final {

void Register(Interrupt& interrupt) {
interrupt_ = &interrupt;

// NOTE: we propagate interrupt registration when we adapt or
// when we call 'Fail()' in the cases when we aren't adapting.
}

void Register(stout::borrowed_ptr<std::pmr::memory_resource>&& resource) {
resource_ = std::move(resource);
}

void Adapt() {
if (!adapted_) {
// Save previous context (even if it's us).
stout::borrowed_ref<Scheduler::Context> previous =
Scheduler::Context::Get().reborrow();

adapted_.reset(
// NOTE: for now we're assuming usage of something like
// 'jemalloc' so 'new' should use lock-free and thread-local
// arenas. Ideally allocating memory during runtime should
// actually be *faster* because the memory should have
// better locality for the execution resource being used
// (i.e., a local NUMA node). However, we should reconsider
// this design decision if in practice this performance
// tradeoff is not emperically a benefit.
new Adapted_(
std::move(e_).template k<Arg_>(
Reschedule(std::move(previous))
.template k<Value_>(std::move(k_)))));
adapted_ = MakeUniqueUsingMemoryResourceOrNew<Adapted_>(
resource_,
std::move(e_).template k<Arg_>(
Reschedule(
std::move(previous))
.template k<Value_>(std::move(k_))));

adapted_->Register(std::move(resource_));

if (interrupt_ != nullptr) {
adapted_->Register(*interrupt_);
Expand Down Expand Up @@ -516,7 +516,9 @@ struct _StaticThreadPoolSchedule final {
std::declval<_Reschedule::Composable>()
.template k<Value_>(std::declval<K_>())));

std::unique_ptr<Adapted_> adapted_;
std::unique_ptr<Adapted_, Callback<void(Adapted_*)>> adapted_{nullptr, [](Adapted_*) {}};

stout::borrowed_ptr<std::pmr::memory_resource> resource_;

// NOTE: we store 'k_' as the _last_ member so it will be
// destructed _first_ and thus we won't have any use-after-delete
Expand Down
4 changes: 4 additions & 0 deletions eventuals/stream.h
Original file line number Diff line number Diff line change
Expand Up @@ -249,6 +249,10 @@ struct _Stream final {
}
}

void Register(stout::borrowed_ptr<std::pmr::memory_resource>&& resource) {
k_.Register(std::move(resource));
}

void Next() override {
static_assert(
!IsUndefined<Next_>::value,
Expand Down
3 changes: 3 additions & 0 deletions eventuals/terminal.h
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,9 @@ struct _Terminal final {

void Register(Interrupt&) {}

void Register(stout::borrowed_ptr<std::pmr::memory_resource>&& resource) {
}

Bytes StaticHeapSize() {
return Bytes(0);
}
Expand Down
Loading

0 comments on commit cf4401f

Please sign in to comment.