From 2f575f836bfb75876f0b69bc09adc5cdd1ac07fd Mon Sep 17 00:00:00 2001 From: archi Date: Tue, 6 Sep 2022 11:28:37 +0300 Subject: [PATCH] Monotonic buffer resource --- eventuals/BUILD.bazel | 1 + eventuals/catch.h | 12 ++++ eventuals/closure.h | 8 +++ eventuals/concurrent.h | 4 ++ eventuals/conditional.h | 10 +++ eventuals/do-all.h | 12 +++- eventuals/event-loop.h | 46 +++++++++----- eventuals/eventual.h | 4 ++ eventuals/filter.h | 4 ++ eventuals/finally.h | 6 ++ eventuals/flat-map.h | 11 ++++ eventuals/generator.h | 22 ++++--- eventuals/head.h | 4 ++ eventuals/http.h | 4 ++ eventuals/if.h | 10 +++ eventuals/lock.h | 12 ++++ eventuals/loop.h | 4 ++ eventuals/map.h | 11 ++++ eventuals/memory.h | 48 +++++++++++++++ eventuals/on-begin.h | 8 +++ eventuals/on-ended.h | 8 +++ eventuals/raise.h | 4 ++ eventuals/range.h | 4 ++ eventuals/reduce.h | 11 ++++ eventuals/repeat.h | 4 ++ eventuals/scheduler.h | 22 +++++++ eventuals/static-thread-pool.h | 35 ++++++----- eventuals/stream.h | 4 ++ eventuals/take.h | 8 +++ eventuals/task.h | 108 ++++++++++++++++++--------------- eventuals/terminal.h | 5 ++ eventuals/then.h | 15 +++++ eventuals/transformer.h | 20 +++--- eventuals/until.h | 15 +++++ test/BUILD.bazel | 1 - test/generator.cc | 37 +++++++++++ test/static-thread-pool.cc | 49 ++++++++++++++- test/take.cc | 4 -- test/task.cc | 32 ++++++++++ test/transformer.cc | 44 ++++++++++++++ 40 files changed, 571 insertions(+), 100 deletions(-) create mode 100644 eventuals/memory.h diff --git a/eventuals/BUILD.bazel b/eventuals/BUILD.bazel index 98afbe0b4..875761823 100644 --- a/eventuals/BUILD.bazel +++ b/eventuals/BUILD.bazel @@ -37,6 +37,7 @@ cc_library( "lock.h", "loop.h", "map.h", + "memory.h", "notification.h", "on-begin.h", "on-ended.h", diff --git a/eventuals/catch.h b/eventuals/catch.h index d78cc5f53..4ca02dba3 100644 --- a/eventuals/catch.h +++ b/eventuals/catch.h @@ -115,6 +115,8 @@ struct _Catch final { k_.Register(*interrupt_); } + k_.Register(std::move(resource_)); + k_.Start(std::forward(args)...); } @@ -154,6 +156,8 @@ struct _Catch final { k_.Register(*interrupt_); } + k_.Register(std::move(resource_)); + k_.Fail(std::forward(error)); } } @@ -163,6 +167,8 @@ struct _Catch final { k_.Register(*interrupt_); } + k_.Register(std::move(resource_)); + k_.Stop(); } @@ -175,6 +181,10 @@ struct _Catch final { // the handler. } + void Register(stout::borrowed_ptr&& resource) { + resource_ = std::move(resource); + } + Bytes StaticHeapSize() { return Bytes(0) + k_.StaticHeapSize(); } @@ -183,6 +193,8 @@ struct _Catch final { Interrupt* interrupt_ = nullptr; + stout::borrowed_ptr resource_; + // NOTE: we store 'k_' as the _last_ member so it will be // destructed _first_ and thus we won't have any use-after-delete // issues during destruction of 'k_' if it holds any references or diff --git a/eventuals/closure.h b/eventuals/closure.h index 4dc4d705d..21a31d56f 100644 --- a/eventuals/closure.h +++ b/eventuals/closure.h @@ -4,6 +4,7 @@ #include "eventuals/compose.h" #include "eventuals/interrupt.h" +#include "eventuals/memory.h" #include "eventuals/type-erased-stream.h" #include "stout/bytes.h" @@ -52,6 +53,10 @@ struct _Closure final { interrupt_ = &interrupt; } + void Register(stout::borrowed_ptr&& resource) { + resource_ = std::move(resource); + } + [[nodiscard]] auto& continuation() { if (!continuation_) { continuation_.emplace(f_().template k(std::move(k_))); @@ -59,6 +64,7 @@ struct _Closure final { if (interrupt_ != nullptr) { continuation_->Register(*interrupt_); } + continuation_->Register(std::move(resource_)); } return *continuation_; @@ -76,6 +82,8 @@ struct _Closure final { std::optional continuation_; + stout::borrowed_ptr resource_; + // NOTE: we store 'k_' as the _last_ member so it will be // destructed _first_ and thus we won't have any use-after-delete // issues during destruction of 'k_' if it holds any references or diff --git a/eventuals/concurrent.h b/eventuals/concurrent.h index 9e4c64b8f..d7f830b25 100644 --- a/eventuals/concurrent.h +++ b/eventuals/concurrent.h @@ -706,6 +706,10 @@ struct _Concurrent final { handler_->Install(); } + void Register(stout::borrowed_ptr&& resource) { + k_.Register(std::move(resource)); + } + Bytes StaticHeapSize() { return Bytes(0) + k_.StaticHeapSize(); } diff --git a/eventuals/conditional.h b/eventuals/conditional.h index 16203a596..6cead3660 100644 --- a/eventuals/conditional.h +++ b/eventuals/conditional.h @@ -50,6 +50,8 @@ struct _Conditional { then_adapted_->Register(*interrupt_); } + then_adapted_->Register(std::move(resource_)); + then_adapted_->Start(); } else { else_adapted_.emplace( @@ -60,6 +62,8 @@ struct _Conditional { else_adapted_->Register(*interrupt_); } + else_adapted_->Register(std::move(resource_)); + else_adapted_->Start(); } } @@ -79,6 +83,10 @@ struct _Conditional { k_.Register(interrupt); } + void Register(stout::borrowed_ptr&& resource) { + resource_ = std::move(resource); + } + Bytes StaticHeapSize() { return Bytes(0) + k_.StaticHeapSize(); } @@ -119,6 +127,8 @@ struct _Conditional { std::optional then_adapted_; std::optional else_adapted_; + stout::borrowed_ptr resource_; + // NOTE: we store 'k_' as the _last_ member so it will be // destructed _first_ and thus we won't have any use-after-delete // issues during destruction of 'k_' if it holds any references or diff --git a/eventuals/do-all.h b/eventuals/do-all.h index 3e24b09fb..e8935c5a1 100644 --- a/eventuals/do-all.h +++ b/eventuals/do-all.h @@ -243,7 +243,7 @@ struct _DoAll final { std::make_index_sequence{})); std::apply( - [](auto&... fiber) { + [this](auto&... fiber) { static std::atomic i = 0; // Clone the current scheduler context for running the eventual. @@ -253,11 +253,12 @@ struct _DoAll final { ...); (fiber.context->scheduler()->Submit( - [&]() { + [&, this]() { CHECK_EQ( &fiber.context.value(), Scheduler::Context::Get().get()); fiber.k.Register(fiber.interrupt); + fiber.k.Register(resource_.reborrow()); fiber.k.Start(); }, fiber.context.value()), @@ -286,6 +287,11 @@ struct _DoAll final { handler_->Install(); } + void Register(stout::borrowed_ptr&& resource) { + resource_ = std::move(resource); + } + + Bytes StaticHeapSize() { return Bytes(0) + k_.StaticHeapSize(); } @@ -314,6 +320,8 @@ struct _DoAll final { std::optional handler_; + stout::borrowed_ptr resource_; + // NOTE: we store 'k_' as the _last_ member so it will be // destructed _first_ and thus we won't have any use-after-delete // issues during destruction of 'k_' if it holds any references or diff --git a/eventuals/event-loop.h b/eventuals/event-loop.h index 5dfc4113b..f86f279da 100644 --- a/eventuals/event-loop.h +++ b/eventuals/event-loop.h @@ -14,6 +14,7 @@ #include "eventuals/callback.h" #include "eventuals/closure.h" #include "eventuals/lazy.h" +#include "eventuals/memory.h" #include "eventuals/stream.h" #include "eventuals/then.h" #include "eventuals/type-traits.h" @@ -399,6 +400,11 @@ class EventLoop final : public Scheduler { handler_->Install(); } + void Register( + stout::borrowed_ptr&& resource) { + k_.Register(std::move(resource)); + } + Bytes StaticHeapSize() { return Bytes(0) + k_.StaticHeapSize(); } @@ -743,6 +749,10 @@ class EventLoop final : public Scheduler { handler_->Install(); } + void Register(stout::borrowed_ptr&& resource) { + k_.Register(std::move(resource)); + } + Bytes StaticHeapSize() { return Bytes(0) + k_.StaticHeapSize(); } @@ -1048,6 +1058,10 @@ class EventLoop final : public Scheduler { handler_->Install(); } + void Register(stout::borrowed_ptr&& resource) { + k_.Register(std::move(resource)); + } + Bytes StaticHeapSize() { return Bytes(0) + k_.StaticHeapSize(); } @@ -1144,6 +1158,7 @@ struct _EventLoopSchedule final { Continuation(Continuation&& that) : e_(std::move(that.e_)), context_(std::move(that.context_)), + resource_(std::move(that.resource_)), k_(std::move(that.k_)) {} ~Continuation() override { @@ -1246,29 +1261,27 @@ struct _EventLoopSchedule final { k_.Register(interrupt); } + void Register(stout::borrowed_ptr&& resource) { + resource_ = std::move(resource); + } + void Adapt() { if (!adapted_) { // Save previous context (even if it's us). stout::borrowed_ref previous = Scheduler::Context::Get().reborrow(); - adapted_.reset( - // NOTE: for now we're assuming usage of something like - // 'jemalloc' so 'new' should use lock-free and thread-local - // arenas. Ideally allocating memory during runtime should - // actually be *faster* because the memory should have - // better locality for the execution resource being used - // (i.e., a local NUMA node). However, we should reconsider - // this design decision if in practice this performance - // tradeoff is not emperically a benefit. - new Adapted_( - std::move(e_).template k( - Reschedule(std::move(previous)) - .template k(_Then::Adaptor{k_})))); + adapted_ = MakeUniqueUsingMemoryResourceOrNew( + resource_, + std::move(e_).template k( + Reschedule(std::move(previous)) + .template k(_Then::Adaptor{k_}))); if (interrupt_ != nullptr) { adapted_->Register(*interrupt_); } + + adapted_->Register(std::move(resource_)); } } @@ -1297,7 +1310,12 @@ struct _EventLoopSchedule final { std::declval<_Reschedule::Composable>() .template k(std::declval<_Then::Adaptor>()))); - std::unique_ptr adapted_; + std::unique_ptr< + Adapted_, + Callback> + adapted_{nullptr, [](void*) {}}; + + stout::borrowed_ptr resource_; // NOTE: we store 'k_' as the _last_ member so it will be // destructed _first_ and thus we won't have any use-after-delete diff --git a/eventuals/eventual.h b/eventuals/eventual.h index 0e85351ea..faa4dde87 100644 --- a/eventuals/eventual.h +++ b/eventuals/eventual.h @@ -160,6 +160,10 @@ struct _Eventual { } } + void Register(stout::borrowed_ptr&& resource) { + k_.Register(std::move(resource)); + } + Adaptor& adaptor() { // Note: needed to delay doing this until now because this // eventual might have been moved before being started. diff --git a/eventuals/filter.h b/eventuals/filter.h index 1ebf85da0..8cfd0319e 100644 --- a/eventuals/filter.h +++ b/eventuals/filter.h @@ -47,6 +47,10 @@ struct _Filter final { k_.Register(interrupt); } + void Register(stout::borrowed_ptr&& resource) { + k_.Register(std::move(resource)); + } + Bytes StaticHeapSize() { return Bytes(0) + k_.StaticHeapSize(); } diff --git a/eventuals/finally.h b/eventuals/finally.h index 81202d3d0..001b6b674 100644 --- a/eventuals/finally.h +++ b/eventuals/finally.h @@ -1,5 +1,7 @@ #pragma once +#include + #include "eventuals/expected.h" #include "eventuals/terminal.h" // For 'StoppedException'. #include "eventuals/then.h" @@ -42,6 +44,10 @@ struct _Finally final { k_.Register(interrupt); } + void Register(stout::borrowed_ptr&& resource) { + k_.Register(std::move(resource)); + } + Bytes StaticHeapSize() { return Bytes(0) + k_.StaticHeapSize(); } diff --git a/eventuals/flat-map.h b/eventuals/flat-map.h index 366285c07..76de99ae5 100644 --- a/eventuals/flat-map.h +++ b/eventuals/flat-map.h @@ -52,6 +52,9 @@ struct _FlatMap final { // Already registered K once in 'FlatMap::Body()'. } + void Register(stout::borrowed_ptr&& resource) { + } + FlatMap_* streamforeach_; }; @@ -89,6 +92,10 @@ struct _FlatMap final { k_.Register(interrupt); } + void Register(stout::borrowed_ptr&& resource) { + resource_ = std::move(resource); + } + template void Body(Args&&... args) { CHECK(!adapted_.has_value()); @@ -101,6 +108,8 @@ struct _FlatMap final { adapted_->Register(*interrupt_); } + adapted_->Register(std::move(resource_)); + adapted_->Start(); } @@ -152,6 +161,8 @@ struct _FlatMap final { stout::borrowed_ptr previous_; + stout::borrowed_ptr resource_; + // NOTE: we store 'k_' as the _last_ member so it will be // destructed _first_ and thus we won't have any use-after-delete // issues during destruction of 'k_' if it holds any references or diff --git a/eventuals/generator.h b/eventuals/generator.h index 52264e0c3..23ab6c79a 100644 --- a/eventuals/generator.h +++ b/eventuals/generator.h @@ -5,6 +5,7 @@ #include #include +#include "eventuals/memory.h" #include "eventuals/stream.h" #include "eventuals/terminal.h" #include "eventuals/then.h" @@ -65,6 +66,9 @@ struct HeapGenerator final { // Already registered in 'adapted_' void Register(Interrupt&) {} + void Register(stout::borrowed_ptr&& resource) { + } + Callback* begin_; Callback* fail_; Callback* stop_; @@ -175,6 +179,7 @@ struct _Generator final { template using DispatchCallback = Callback&&, Action, std::optional&&, Args&..., @@ -258,6 +263,10 @@ struct _Generator final { k_.Register(interrupt); } + void Register(stout::borrowed_ptr&& resource) { + resource_ = std::move(resource); + } + void Dispatch( Action action, std::optional< @@ -269,6 +278,7 @@ struct _Generator final { std::apply( [&](auto&... args) { dispatch_( + std::move(resource_), action, std::move(exception), args..., @@ -307,13 +317,13 @@ struct _Generator final { Bytes static_heap_size_ = 0; + stout::borrowed_ptr resource_; + // NOTE: we store 'k_' as the _last_ member so it will be // destructed _first_ and thus we won't have any use-after-delete // issues during destruction of 'k_' if it holds any references or // pointers to any (or within any) of the above members. K_ k_; - - Bytes heap_size_ = 0; }; template @@ -413,6 +423,7 @@ struct _Generator final { static_heap_size_ = Bytes(sizeof(HeapGenerator)); dispatch_ = [f = std::move(f)]( + stout::borrowed_ptr&& resource, Action action, std::optional&& exception, Args_&... args, @@ -429,11 +440,8 @@ struct _Generator final { Callback>&& body, Callback&& ended) mutable { if (!e_) { - e_ = std::unique_ptr>( - new HeapGenerator(f(args...)), - [](void* e) { - delete static_cast*>(e); - }); + e_ = MakeUniqueUsingMemoryResourceOrNew< + HeapGenerator>(resource, f(args...)); } auto* e = static_cast*>(e_.get()); diff --git a/eventuals/head.h b/eventuals/head.h index 67dc9e68d..b27f1f68c 100644 --- a/eventuals/head.h +++ b/eventuals/head.h @@ -46,6 +46,10 @@ struct _Head final { k_.Register(interrupt); } + void Register(stout::borrowed_ptr&& resource) { + k_.Register(std::move(resource)); + } + Bytes StaticHeapSize() { return Bytes(0) + k_.StaticHeapSize(); } diff --git a/eventuals/http.h b/eventuals/http.h index 856060e20..b52e5fed3 100644 --- a/eventuals/http.h +++ b/eventuals/http.h @@ -1142,6 +1142,10 @@ struct _HTTP final { handler_->Install(); } + void Register(stout::borrowed_ptr&& resource) { + k_.Register(std::move(resource)); + } + Bytes StaticHeapSize() { return Bytes(0) + k_.HeapSize(); } diff --git a/eventuals/if.h b/eventuals/if.h index 71cbab158..d555e8ac0 100644 --- a/eventuals/if.h +++ b/eventuals/if.h @@ -30,6 +30,8 @@ struct _If final { yes_adapted_->Register(*interrupt_); } + yes_adapted_->Register(std::move(resource_)); + yes_adapted_->Start(); } else { no_adapted_.emplace( @@ -39,6 +41,8 @@ struct _If final { no_adapted_->Register(*interrupt_); } + no_adapted_->Register(std::move(resource_)); + no_adapted_->Start(); } } @@ -58,6 +62,10 @@ struct _If final { k_.Register(interrupt); } + void Register(stout::borrowed_ptr&& resource) { + resource_ = std::move(resource); + } + Bytes StaticHeapSize() { return Bytes(0) + k_.StaticHeapSize(); } @@ -90,6 +98,8 @@ struct _If final { std::optional yes_adapted_; std::optional no_adapted_; + stout::borrowed_ptr resource_; + // NOTE: we store 'k_' as the _last_ member so it will be // destructed _first_ and thus we won't have any use-after-delete // issues during destruction of 'k_' if it holds any references or diff --git a/eventuals/lock.h b/eventuals/lock.h index b88ff4534..b82953aa5 100644 --- a/eventuals/lock.h +++ b/eventuals/lock.h @@ -457,6 +457,10 @@ struct _Acquire final { k_.Register(interrupt); } + void Register(stout::borrowed_ptr&& resource) { + k_.Register(std::move(resource)); + } + Bytes StaticHeapSize() { return Bytes(0) + k_.StaticHeapSize(); } @@ -553,6 +557,10 @@ struct _Release final { k_.Register(interrupt); } + void Register(stout::borrowed_ptr&& resource) { + k_.Register(std::move(resource)); + } + Bytes StaticHeapSize() { return Bytes(0) + k_.StaticHeapSize(); } @@ -785,6 +793,10 @@ struct _Wait final { k_.Register(interrupt); } + void Register(stout::borrowed_ptr&& resource) { + k_.Register(std::move(resource)); + } + Bytes StaticHeapSize() { return Bytes(0) + k_.StaticHeapSize(); } diff --git a/eventuals/loop.h b/eventuals/loop.h index aa0e7b3e9..3b3ce77f9 100644 --- a/eventuals/loop.h +++ b/eventuals/loop.h @@ -165,6 +165,10 @@ struct _Loop final { } } + void Register(stout::borrowed_ptr&& resource) { + k_.Register(std::move(resource)); + } + template void Body(Args&&... args) { if constexpr (IsUndefined::value) { diff --git a/eventuals/map.h b/eventuals/map.h index 3613053a6..3d807e54b 100644 --- a/eventuals/map.h +++ b/eventuals/map.h @@ -32,6 +32,9 @@ struct _Map final { // Already registered K once in 'Map::Register()'. } + void Register(stout::borrowed_ptr&&) { + } + K_& k_; }; @@ -64,6 +67,8 @@ struct _Map final { if (interrupt_ != nullptr) { adapted_->Register(*interrupt_); } + + adapted_->Register(std::move(resource_)); } adapted_->Start(std::forward(args)...); @@ -79,6 +84,10 @@ struct _Map final { k_.Register(interrupt); } + void Register(stout::borrowed_ptr&& resource) { + resource_ = std::move(resource); + } + Bytes StaticHeapSize() { return Bytes(0) + k_.StaticHeapSize(); } @@ -92,6 +101,8 @@ struct _Map final { Interrupt* interrupt_ = nullptr; + stout::borrowed_ptr resource_; + // NOTE: we store 'k_' as the _last_ member so it will be // destructed _first_ and thus we won't have any use-after-delete // issues during destruction of 'k_' if it holds any references or diff --git a/eventuals/memory.h b/eventuals/memory.h new file mode 100644 index 000000000..ab0d4b821 --- /dev/null +++ b/eventuals/memory.h @@ -0,0 +1,48 @@ +#pragma once + +#include +#include +#ifdef __MACH__ +#include +#else +#include +#endif + +#include "eventuals/callback.h" +#include "stout/borrowable.h" + +//////////////////////////////////////////////////////////////////////// + +namespace eventuals { + +//////////////////////////////////////////////////////////////////////// + +template +std::unique_ptr> MakeUniqueUsingMemoryResourceOrNew( + stout::borrowed_ptr& resource, + Args&&... args) { + if (resource) { + size_t size = sizeof(T); + void* pointer = resource->allocate(size); + CHECK_NOTNULL(pointer); + new (pointer) T(std::forward(args)...); + return std::unique_ptr>( + static_cast(pointer), + [resource = resource.reborrow()](void* t) { + static_cast(t)->~T(); + resource->deallocate(t, sizeof(T)); + }); + } else { + return std::unique_ptr>( + new T(std::forward(args)...), + [](void* t) { + delete static_cast(t); + }); + } +} + +//////////////////////////////////////////////////////////////////////// + +} // namespace eventuals + +//////////////////////////////////////////////////////////////////////// diff --git a/eventuals/on-begin.h b/eventuals/on-begin.h index 955ca7351..3afcfaab8 100644 --- a/eventuals/on-begin.h +++ b/eventuals/on-begin.h @@ -63,6 +63,8 @@ struct _OnBegin final { adapted_->Register(*interrupt_); } + adapted_->Register(std::move(resource_)); + adapted_->Start(); } @@ -90,6 +92,10 @@ struct _OnBegin final { k_.Register(interrupt); } + void Register(stout::borrowed_ptr&& resource) { + resource_ = std::move(resource); + } + Bytes StaticHeapSize() { return Bytes(0) + k_.StaticHeapSize(); } @@ -103,6 +109,8 @@ struct _OnBegin final { std::optional adapted_; + stout::borrowed_ptr resource_; + // NOTE: we store 'k_' as the _last_ member so it will be // destructed _first_ and thus we won't have any use-after-delete // issues during destruction of 'k_' if it holds any references or diff --git a/eventuals/on-ended.h b/eventuals/on-ended.h index 0f0ebf27d..26163682f 100644 --- a/eventuals/on-ended.h +++ b/eventuals/on-ended.h @@ -83,6 +83,8 @@ struct _OnEnded final { adapted_->Register(*interrupt_); } + adapted_->Register(std::move(resource_)); + adapted_->Start(); }); } @@ -93,6 +95,10 @@ struct _OnEnded final { k_.Register(interrupt); } + void Register(stout::borrowed_ptr&& resource) { + resource_ = std::move(resource); + } + Bytes StaticHeapSize() { return Bytes(0) + k_.StaticHeapSize(); } @@ -108,6 +114,8 @@ struct _OnEnded final { std::optional adapted_; + stout::borrowed_ptr resource_; + // NOTE: we store 'k_' as the _last_ member so it will be // destructed _first_ and thus we won't have any use-after-delete // issues during destruction of 'k_' if it holds any references or diff --git a/eventuals/raise.h b/eventuals/raise.h index 244edfe00..3000e3237 100644 --- a/eventuals/raise.h +++ b/eventuals/raise.h @@ -37,6 +37,10 @@ struct _Raise final { k_.Register(interrupt); } + void Register(stout::borrowed_ptr&& resource) { + k_.Register(std::move(resource)); + } + Bytes StaticHeapSize() { return Bytes(0) + k_.StaticHeapSize(); } diff --git a/eventuals/range.h b/eventuals/range.h index 196176e91..689f63c04 100644 --- a/eventuals/range.h +++ b/eventuals/range.h @@ -46,6 +46,10 @@ struct _Range final { k_.Register(interrupt); } + void Register(stout::borrowed_ptr&& resource) { + k_.Register(std::move(resource)); + } + void Next() override { if (from_ == to_ || step_ == 0 diff --git a/eventuals/reduce.h b/eventuals/reduce.h index fe40f479d..43132f812 100644 --- a/eventuals/reduce.h +++ b/eventuals/reduce.h @@ -33,6 +33,9 @@ struct _Reduce final { // Already registered K once in 'Reduce::Register()'. } + void Register(stout::borrowed_ptr&& resource) { + } + K_& k_; TypeErasedStream* stream_; }; @@ -71,6 +74,8 @@ struct _Reduce final { if (interrupt_ != nullptr) { adapted_->Register(*interrupt_); } + + adapted_->Register(std::move(resource_)); } adapted_->Start(std::forward(args)...); @@ -85,6 +90,10 @@ struct _Reduce final { k_.Register(interrupt); } + void Register(stout::borrowed_ptr&& resource) { + resource_ = std::move(resource); + } + Bytes StaticHeapSize() { return Bytes(0) + k_.StaticHeapSize(); } @@ -107,6 +116,8 @@ struct _Reduce final { std::optional adapted_; + stout::borrowed_ptr resource_; + // NOTE: we store 'k_' as the _last_ member so it will be // destructed _first_ and thus we won't have any use-after-delete // issues during destruction of 'k_' if it holds any references or diff --git a/eventuals/repeat.h b/eventuals/repeat.h index 69046cc2a..6c6038e61 100644 --- a/eventuals/repeat.h +++ b/eventuals/repeat.h @@ -42,6 +42,10 @@ struct _Repeat final { k_.Register(interrupt); } + void Register(stout::borrowed_ptr&& resource) { + k_.Register(std::move(resource)); + } + void Next() override { previous_->Continue([this]() { k_.Body(); diff --git a/eventuals/scheduler.h b/eventuals/scheduler.h index 4f10ade0b..1e3fd95ed 100644 --- a/eventuals/scheduler.h +++ b/eventuals/scheduler.h @@ -289,6 +289,10 @@ struct _Reschedule final { k_.Register(interrupt); } + void Register(stout::borrowed_ptr&& resource) { + k_.Register(std::move(resource)); + } + Bytes StaticHeapSize() { return Bytes(0) + k_.StaticHeapSize(); } @@ -307,6 +311,8 @@ struct _Reschedule final { TypeErasedStream* stream_ = nullptr; + stout::borrowed_ptr resource_; + // NOTE: we store 'k_' as the _last_ member so it will be // destructed _first_ and thus we won't have any use-after-delete // issues during destruction of 'k_' if it holds any references or @@ -382,6 +388,8 @@ struct Reschedulable final { if (interrupt_ != nullptr) { continuation_->Register(*interrupt_); } + + continuation_->Register(std::move(resource_)); } // NOTE: there is no invariant that 'previous' equals the current @@ -397,6 +405,10 @@ struct Reschedulable final { interrupt_ = &interrupt; } + void Register(stout::borrowed_ptr&& resource) { + resource_ = std::move(resource); + } + Bytes StaticHeapSize() { return Bytes(0) + k_.StaticHeapSize(); } @@ -409,6 +421,8 @@ struct Reschedulable final { std::optional continuation_; + stout::borrowed_ptr resource_; + // NOTE: we store 'k_' as the _last_ member so it will be // destructed _first_ and thus we won't have any use-after-delete // issues during destruction of 'k_' if it holds any references or @@ -463,6 +477,10 @@ struct _Preempt final { interrupt_ = &interrupt; } + void Register(stout::borrowed_ptr&& resource) { + resource_ = std::move(resource); + } + void Adapt() { CHECK(!adapted_); @@ -478,6 +496,8 @@ struct _Preempt final { if (interrupt_ != nullptr) { adapted_->Register(*interrupt_); } + + adapted_->Register(std::move(resource_)); } Bytes StaticHeapSize() { @@ -500,6 +520,8 @@ struct _Preempt final { std::optional adapted_; + stout::borrowed_ptr resource_; + // NOTE: we store 'k_' as the _last_ member so it will be // destructed _first_ and thus we won't have any use-after-delete // issues during destruction of 'k_' if it holds any references or diff --git a/eventuals/static-thread-pool.h b/eventuals/static-thread-pool.h index 05a1f154c..534fbfef3 100644 --- a/eventuals/static-thread-pool.h +++ b/eventuals/static-thread-pool.h @@ -12,6 +12,7 @@ #include "eventuals/compose.h" #include "eventuals/lazy.h" +#include "eventuals/memory.h" #include "eventuals/scheduler.h" #include "eventuals/semaphore.h" #include "stout/borrowed_ptr.h" @@ -164,6 +165,7 @@ struct _StaticThreadPoolSchedule final { : e_(std::move(that.e_)), context_(std::move(that.context_)), engine_(device_()), + resource_(std::move(that.resource_)), k_(std::move(that.k_)) {} ~Continuation() override { @@ -446,30 +448,28 @@ struct _StaticThreadPoolSchedule final { void Register(Interrupt& interrupt) { interrupt_ = &interrupt; - // NOTE: we propagate interrupt registration when we adapt or // when we call 'Fail()' in the cases when we aren't adapting. } + void Register(stout::borrowed_ptr&& resource) { + resource_ = std::move(resource); + } + void Adapt() { if (!adapted_) { // Save previous context (even if it's us). stout::borrowed_ref previous = Scheduler::Context::Get().reborrow(); - adapted_.reset( - // NOTE: for now we're assuming usage of something like - // 'jemalloc' so 'new' should use lock-free and thread-local - // arenas. Ideally allocating memory during runtime should - // actually be *faster* because the memory should have - // better locality for the execution resource being used - // (i.e., a local NUMA node). However, we should reconsider - // this design decision if in practice this performance - // tradeoff is not emperically a benefit. - new Adapted_( - std::move(e_).template k( - Reschedule(std::move(previous)) - .template k(std::move(k_))))); + adapted_ = MakeUniqueUsingMemoryResourceOrNew( + resource_, + std::move(e_).template k( + Reschedule( + std::move(previous)) + .template k(std::move(k_)))); + + adapted_->Register(std::move(resource_)); if (interrupt_ != nullptr) { adapted_->Register(*interrupt_); @@ -516,7 +516,12 @@ struct _StaticThreadPoolSchedule final { std::declval<_Reschedule::Composable>() .template k(std::declval()))); - std::unique_ptr adapted_; + std::unique_ptr< + Adapted_, + Callback> + adapted_{nullptr, [](void*) {}}; + + stout::borrowed_ptr resource_; // NOTE: we store 'k_' as the _last_ member so it will be // destructed _first_ and thus we won't have any use-after-delete diff --git a/eventuals/stream.h b/eventuals/stream.h index 4ec4dd632..7d32886c8 100644 --- a/eventuals/stream.h +++ b/eventuals/stream.h @@ -249,6 +249,10 @@ struct _Stream final { } } + void Register(stout::borrowed_ptr&& resource) { + k_.Register(std::move(resource)); + } + void Next() override { static_assert( !IsUndefined::value, diff --git a/eventuals/take.h b/eventuals/take.h index 5ed928153..a3607a219 100644 --- a/eventuals/take.h +++ b/eventuals/take.h @@ -68,6 +68,10 @@ struct _TakeLast final { k_.Register(interrupt); } + void Register(stout::borrowed_ptr&& resource) { + k_.Register(std::move(resource)); + } + void Next() override { // When Next is called from the next eventual, // the element should be taken from the stored stream. @@ -217,6 +221,10 @@ struct _TakeRange final { k_.Register(interrupt); } + void Register(stout::borrowed_ptr&& resource) { + k_.Register(std::move(resource)); + } + Bytes StaticHeapSize() { return Bytes(0) + k_.StaticHeapSize(); } diff --git a/eventuals/task.h b/eventuals/task.h index bec1631d4..436f87db2 100644 --- a/eventuals/task.h +++ b/eventuals/task.h @@ -8,6 +8,7 @@ #include "eventuals/eventual.h" #include "eventuals/just.h" +#include "eventuals/memory.h" #include "eventuals/raise.h" #include "eventuals/terminal.h" #include "eventuals/type-traits.h" @@ -54,6 +55,9 @@ struct HeapTask final { void Register(Interrupt&) {} + void Register(stout::borrowed_ptr&& resource) { + } + Callback>* start_; Callback* fail_; Callback* stop_; @@ -176,6 +180,7 @@ struct _TaskFromToWith final { template using DispatchCallback = Callback&&, Action, std::optional&&, Args&..., @@ -261,6 +266,10 @@ struct _TaskFromToWith final { k_.Register(interrupt); } + void Register(stout::borrowed_ptr&& resource) { + resource_ = std::move(resource); + } + Bytes StaticHeapSize() { return static_heap_size_ + k_.StaticHeapSize(); } @@ -274,6 +283,7 @@ struct _TaskFromToWith final { std::apply( [&](auto&... args) { std::get<1>(value_or_dispatch_)( + std::move(resource_), action, std::move(exception), args..., @@ -308,12 +318,13 @@ struct _TaskFromToWith final { Bytes static_heap_size_ = 0; + stout::borrowed_ptr resource_; + // NOTE: we store 'k_' as the _last_ member so it will be // destructed _first_ and thus we won't have any use-after-delete // issues during destruction of 'k_' if it holds any references or // pointers to any (or within any) of the above members. K_ k_; - Bytes heap_size_ = 0; }; template < @@ -395,54 +406,53 @@ struct _TaskFromToWith final { static_heap_size_ = Bytes(sizeof(HeapTask)); - value_or_dispatch_ = [f = std::move(f)]( - Action action, - std::optional&& exception, - Args_&... args, - std::optional>&& arg, - std::unique_ptr>& e_, - Interrupt& interrupt, - Callback>&& start, - Callback&& fail, - Callback&& stop) mutable { - if (!e_) { - e_ = std::unique_ptr>( - new HeapTask(f(args...)), - [](void* e) { - delete static_cast*>(e); - }); - } - - auto* e = static_cast*>(e_.get()); - - switch (action) { - case Action::Start: - e->Start( - std::move(arg.value()), - interrupt, - std::move(start), - std::move(fail), - std::move(stop)); - break; - case Action::Fail: - e->Fail( - interrupt, - std::move(exception.value()), - std::move(start), - std::move(fail), - std::move(stop)); - break; - case Action::Stop: - e->Stop( - interrupt, - std::move(start), - std::move(fail), - std::move(stop)); - break; - default: - LOG(FATAL) << "unreachable"; - } - }; + value_or_dispatch_ = + [f = std::move(f)]( + stout::borrowed_ptr&& resource, + Action action, + std::optional&& exception, + Args_&... args, + std::optional>&& arg, + std::unique_ptr>& e_, + Interrupt& interrupt, + Callback>&& start, + Callback&& fail, + Callback&& stop) mutable { + if (!e_) { + e_ = MakeUniqueUsingMemoryResourceOrNew< + HeapTask>(resource, f(args...)); + } + + auto* e = static_cast*>(e_.get()); + + switch (action) { + case Action::Start: + e->Start( + std::move(arg.value()), + interrupt, + std::move(start), + std::move(fail), + std::move(stop)); + break; + case Action::Fail: + e->Fail( + interrupt, + std::move(exception.value()), + std::move(start), + std::move(fail), + std::move(stop)); + break; + case Action::Stop: + e->Stop( + interrupt, + std::move(start), + std::move(fail), + std::move(stop)); + break; + default: + LOG(FATAL) << "unreachable"; + } + }; } Composable( diff --git a/eventuals/terminal.h b/eventuals/terminal.h index c5ab27370..e0e36af98 100644 --- a/eventuals/terminal.h +++ b/eventuals/terminal.h @@ -1,5 +1,7 @@ #pragma once +#include + #include "eventuals/compose.h" #include "eventuals/interrupt.h" #include "eventuals/undefined.h" @@ -58,6 +60,9 @@ struct _Terminal final { void Register(Interrupt&) {} + void Register(stout::borrowed_ptr&& resource) { + } + Bytes StaticHeapSize() { return Bytes(0); } diff --git a/eventuals/then.h b/eventuals/then.h index 65854e045..4745a679c 100644 --- a/eventuals/then.h +++ b/eventuals/then.h @@ -63,6 +63,9 @@ struct _Then final { // Already registered K once in 'Then::Register()'. } + void Register(stout::borrowed_ptr&& resource) { + } + K_& k_; }; @@ -152,6 +155,10 @@ struct _Then::Continuation final { k_.Register(interrupt); } + void Register(stout::borrowed_ptr&& resource) { + k_.Register(std::move(resource)); + } + Bytes StaticHeapSize() { return Bytes(0) + k_.StaticHeapSize(); } @@ -182,6 +189,8 @@ struct _Then::Continuation final { adapted_->Register(*interrupt_); } + adapted_->Register(std::move(resource_)); + adapted_->Start(); } @@ -200,6 +209,10 @@ struct _Then::Continuation final { k_.Register(interrupt); } + void Register(stout::borrowed_ptr&& resource) { + resource_ = std::move(resource); + } + Bytes StaticHeapSize() { return Bytes(0) + k_.StaticHeapSize(); } @@ -218,6 +231,8 @@ struct _Then::Continuation final { std::optional adapted_; + stout::borrowed_ptr resource_; + // NOTE: we store 'k_' as the _last_ member so it will be // destructed _first_ and thus we won't have any use-after-delete // issues during destruction of 'k_' if it holds any references or diff --git a/eventuals/transformer.h b/eventuals/transformer.h index 82e36590a..e73811974 100644 --- a/eventuals/transformer.h +++ b/eventuals/transformer.h @@ -4,6 +4,7 @@ #include #include "eventuals/callback.h" +#include "eventuals/memory.h" #include "eventuals/stream.h" #include "eventuals/terminal.h" #include "stout/bytes.h" @@ -184,11 +185,16 @@ struct _Transformer final { k_.Register(interrupt); } + void Register(stout::borrowed_ptr&& resource) { + resource_ = std::move(resource); + } + void Dispatch( Action action, std::optional&& from = std::nullopt, std::optional&& exception = std::nullopt) { dispatch_( + std::move(resource_), action, std::move(exception), std::move(from), @@ -216,6 +222,7 @@ struct _Transformer final { } Callback&&, Action, std::optional&&, std::optional&&, @@ -233,13 +240,13 @@ struct _Transformer final { Bytes static_heap_size_ = 0; + stout::borrowed_ptr resource_; + // NOTE: we store 'k_' as the _last_ member so it will be // destructed _first_ and thus we won't have any use-after-delete // issues during destruction of 'k_' if it holds any references or // pointers to any (or within any) of the above members. K_ k_; - - Bytes heap_size_ = 0; }; template @@ -310,6 +317,7 @@ struct _Transformer final { static_heap_size_ = Bytes(sizeof(HeapTransformer)); dispatch_ = [f = std::move(f)]( + stout::borrowed_ptr&& resource, Action action, std::optional&& exception, std::optional&& from, @@ -321,11 +329,8 @@ struct _Transformer final { Callback&& stop, Callback&& ended) { if (!e_) { - e_ = std::unique_ptr>( - new HeapTransformer(f()), - [](void* e) { - delete static_cast*>(e); - }); + e_ = MakeUniqueUsingMemoryResourceOrNew< + HeapTransformer>(resource, f()); } auto* e = static_cast*>(e_.get()); @@ -377,6 +382,7 @@ struct _Transformer final { } Callback&&, Action, std::optional&&, std::optional&&, diff --git a/eventuals/until.h b/eventuals/until.h index 618bf29a6..d1e432a65 100644 --- a/eventuals/until.h +++ b/eventuals/until.h @@ -62,6 +62,9 @@ struct _Until final { // Already registered K once in 'Until::Register()'. } + void Register(stout::borrowed_ptr&& resource) { + } + K_& k_; TypeErasedStream& stream_; }; @@ -141,6 +144,10 @@ struct _Until::Continuation final { k_.Register(interrupt); } + void Register(stout::borrowed_ptr&& resource) { + k_.Register(std::move(resource)); + } + template void Body(Args&&... args) { bool done = f_(args...); // NOTE: explicitly not forwarding. @@ -199,6 +206,10 @@ struct _Until::Continuation final { k_.Register(interrupt); } + void Register(stout::borrowed_ptr&& resource) { + resource_ = std::move(resource); + } + template void Body(Args&&... args) { static_assert( @@ -222,6 +233,8 @@ struct _Until::Continuation final { adapted_->Register(*interrupt_); } + adapted_->Register(std::move(resource_)); + adapted_->Start(); } @@ -260,6 +273,8 @@ struct _Until::Continuation final { std::optional adapted_; + stout::borrowed_ptr resource_; + // NOTE: we store 'k_' as the _last_ member so it will be // destructed _first_ and thus we won't have any use-after-delete // issues during destruction of 'k_' if it holds any references or diff --git a/test/BUILD.bazel b/test/BUILD.bazel index 65807a405..0d4c1a987 100644 --- a/test/BUILD.bazel +++ b/test/BUILD.bazel @@ -71,7 +71,6 @@ cc_test( "notification.cc", "on-begin.cc", "on-ended.cc", - "os.cc", "pipe.cc", "poll.cc", "range.cc", diff --git a/test/generator.cc b/test/generator.cc index 0867570b4..c9cf4cf66 100644 --- a/test/generator.cc +++ b/test/generator.cc @@ -578,5 +578,42 @@ TEST(Generator, StaticHeapSize) { EXPECT_GT(k.StaticHeapSize().bytes(), 0); } +TEST(Generator, MonotonicBuffer) { + auto stream = []() -> Generator::Of { + return []() { + return Iterate({1, 2, 3}); + }; + }; + + auto e = [&]() { + return stream() + >> Collect(); + }; + + std::optional> + resource; + + auto [future, k] = PromisifyForTest(e()); + + Bytes static_heap_size = k.StaticHeapSize(); + + EXPECT_GT(static_heap_size, 0); + + char* buffer = new char[static_heap_size.bytes()]; + + resource.emplace( + buffer, + static_heap_size.bytes()); + + k.Register(resource->Borrow()); + + k.Start(); + + EXPECT_EQ(future.get(), (std::vector{1, 2, 3})); + + delete[] buffer; +} + } // namespace } // namespace eventuals::test diff --git a/test/static-thread-pool.cc b/test/static-thread-pool.cc index ae0022bfc..b47290e82 100644 --- a/test/static-thread-pool.cc +++ b/test/static-thread-pool.cc @@ -16,6 +16,8 @@ #include "eventuals/until.h" #include "gmock/gmock.h" #include "gtest/gtest.h" +#include "stout/borrowable.h" +#include "stout/bytes.h" #include "test/promisify-for-test.h" namespace eventuals::test { @@ -48,9 +50,54 @@ TEST(StaticThreadPoolTest, Schedulable) { k.Start(); - EXPECT_TRUE(k.StaticHeapSize().bytes() > 0); + EXPECT_EQ(42, future.get()); +} + + +TEST(StaticThreadPoolTest, MonotonicBuffer) { + struct Foo : public StaticThreadPool::Schedulable { + Foo() + : StaticThreadPool::Schedulable(Pinned::ExactCPU( + std::thread::hardware_concurrency() - 1)) {} + + auto Operation() { + return Schedule( + Then([this]() { + return i; + })) + >> Then([](auto i) { + return i + 1; + }); + } + + int i = 41; + }; + + Foo foo; + + std::optional> + resource; + + auto [future, k] = PromisifyForTest(foo.Operation()); + + Bytes static_heap_size = k.StaticHeapSize(); + + EXPECT_TRUE(static_heap_size > 0); + + char* buffer = new char[static_heap_size.bytes()]; + + resource.emplace( + buffer, + static_heap_size.bytes()); + + k.Register(resource->Borrow()); + + k.Start(); EXPECT_EQ(42, future.get()); + + delete[] buffer; } diff --git a/test/take.cc b/test/take.cc index 8b0193563..cce4f25ee 100644 --- a/test/take.cc +++ b/test/take.cc @@ -174,11 +174,7 @@ TEST(Take, TakeRangeInfiniteStream) { EXPECT_THAT(*s(), ElementsAre(0, 1)); } -<<<<<<< HEAD TEST(Take, StaticHeapSize1) { -======= -TEST(Take, TakeLastStaticHeapSize) { ->>>>>>> Introduce `HeapSize()` std::vector v = {5, 12, 17, 3}; auto s = [&]() { diff --git a/test/task.cc b/test/task.cc index 6d2a5d268..ae66ac5a9 100644 --- a/test/task.cc +++ b/test/task.cc @@ -683,5 +683,37 @@ TEST(Task, StaticHeapSize) { EXPECT_GT(k.StaticHeapSize().bytes(), 0); } +TEST(Task, MonotonicBuffer) { + auto e = []() -> Task::Of { + return [x = 42]() { + return Just(x); + }; + }; + + std::optional> + resource; + + auto [future, k] = PromisifyForTest(e()); + + Bytes static_heap_size = k.StaticHeapSize(); + + EXPECT_GT(static_heap_size, 0); + + char* buffer = new char[static_heap_size.bytes()]; + + resource.emplace( + buffer, + static_heap_size.bytes()); + + k.Register(resource->Borrow()); + + k.Start(); + + EXPECT_EQ(future.get(), 42); + + delete[] buffer; +} + } // namespace } // namespace eventuals::test diff --git a/test/transformer.cc b/test/transformer.cc index 63d94a6db..7d52e7429 100644 --- a/test/transformer.cc +++ b/test/transformer.cc @@ -266,5 +266,49 @@ TEST(Transformer, StaticHeapSize) { EXPECT_GT(k.StaticHeapSize().bytes(), 0); } +TEST(Transformer, MonotonicBuffer) { + auto transformer = []() { + return Transformer::From::To( + []() { + return Map([](int x) { + return std::to_string(x); + }); + }); + }; + + auto e = [&]() { + return Iterate({100}) + >> transformer() + >> Map([](std::string s) { + return s; + }) + >> Collect(); + }; + + std::optional> + resource; + + auto [future, k] = PromisifyForTest(e()); + + Bytes static_heap_size = k.StaticHeapSize(); + + EXPECT_GT(static_heap_size, 0); + + char* buffer = new char[static_heap_size.bytes()]; + + resource.emplace( + buffer, + static_heap_size.bytes()); + + k.Register(resource->Borrow()); + + k.Start(); + + EXPECT_EQ(future.get(), (std::vector{"100"})); + + delete[] buffer; +} + } // namespace } // namespace eventuals::test