Skip to content

Commit

Permalink
ct: Add 'evictor' component
Browse files Browse the repository at this point in the history
The goal of the component is to limit the memory usage.
Current implementation discards all write requests which are above the
memory threshold. Applying backpressure would be better but our goal
here is to provide the simplest possible implementation of this
component.

Signed-off-by: Evgeny Lazin <[email protected]>
  • Loading branch information
Lazin committed Nov 15, 2024
1 parent 7ab5046 commit 5d5437e
Show file tree
Hide file tree
Showing 7 changed files with 524 additions and 0 deletions.
2 changes: 2 additions & 0 deletions src/v/cloud_topics/BUILD
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,8 @@ package(default_visibility = [
"//src/v/cloud_topics/core/tests:__pkg__",
"//src/v/cloud_topics/dl_stm:__pkg__",
"//src/v/cloud_topics/dl_stm/tests:__pkg__",
"//src/v/cloud_topics/evictor:__pkg__",
"//src/v/cloud_topics/evictor/tests:__pkg__",
"//src/v/cloud_topics/reader:__pkg__",
"//src/v/cloud_topics/reader/tests:__pkg__",
"//src/v/cloud_topics/tests:__pkg__",
Expand Down
2 changes: 2 additions & 0 deletions src/v/cloud_topics/core/BUILD
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,8 @@ package(default_visibility = [
"//src/v/cloud_topics/batcher:__pkg__",
"//src/v/cloud_topics/batcher/tests:__pkg__",
"//src/v/cloud_topics/core/tests:__pkg__",
"//src/v/cloud_topics/evictor:__pkg__",
"//src/v/cloud_topics/evictor/tests:__pkg__",
"//src/v/cloud_topics/throttler:__pkg__",
"//src/v/cloud_topics/throttler/tests:__pkg__",
])
Expand Down
29 changes: 29 additions & 0 deletions src/v/cloud_topics/evictor/BUILD
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
load("//bazel:build.bzl", "redpanda_cc_library")

package(default_visibility = ["//src/v/cloud_topics/evictor/tests:__pkg__"])

redpanda_cc_library(
name = "evictor",
srcs = [
"evictor.cc",
],
hdrs = [
"evictor.h",
],
implementation_deps = [
"//src/v/cloud_io:remote",
"//src/v/cloud_topics:logger",
"//src/v/ssx:sformat",
],
include_prefix = "cloud_topics/evictor",
deps = [
"//src/v/base",
"//src/v/bytes",
"//src/v/bytes:iobuf",
"//src/v/cloud_topics:types",
"//src/v/cloud_topics/core:event_filter",
"//src/v/cloud_topics/core:write_pipeline",
"//src/v/cloud_topics/core:write_request",
"@seastar",
],
)
135 changes: 135 additions & 0 deletions src/v/cloud_topics/evictor/evictor.cc
Original file line number Diff line number Diff line change
@@ -0,0 +1,135 @@
/*
* Copyright 2024 Redpanda Data, Inc.
*
* Licensed as a Redpanda Enterprise file under the Redpanda Community
* License (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* https://github.com/redpanda-data/redpanda/blob/master/licenses/rcl.md
*/

#include "cloud_topics/evictor/evictor.h"

#include "base/unreachable.h"
#include "cloud_topics/core/event_filter.h"
#include "cloud_topics/core/write_pipeline.h"
#include "cloud_topics/core/write_request.h"
#include "cloud_topics/logger.h"

#include <seastar/core/loop.hh>
#include <seastar/coroutine/as_future.hh>

namespace experimental::cloud_topics {

evictor::evictor(size_t mem_limit, core::write_pipeline<>& pipeline)
: _pipeline(pipeline)
, _mem_limit(mem_limit)
, _my_stage(_pipeline.register_pipeline_stage()) {}

ss::future<> evictor::start() {
ssx::spawn_with_gate(_gate, [this] { return bg_evict_pipeline(); });
return ss::now();
}

ss::future<> evictor::stop() {
_as.request_abort();
co_await _gate.close();
}

size_t evictor::throttle_memory(size_t current_bytes_pending) {
if (current_bytes_pending <= _mem_limit) {
// Fast path for the case when we're limited by tput
// and not by memory usage.
return 0;
}
auto overshoot = current_bytes_pending - _mem_limit;
auto list = _pipeline.get_write_requests(overshoot, _my_stage);
std::for_each(
list.ready.begin(), list.ready.end(), [](core::write_request<>& req) {
req.set_value(errc::slow_down);
});
_throttle_by_mem++;
return list.size_bytes;
}

ss::future<result<size_t>> evictor::evict_once(size_t prev_total_size) {
size_t total_bytes = prev_total_size;
core::event_filter<> filter(core::event_type::new_write_request, _my_stage);
auto event = co_await _pipeline.subscribe(filter, _as);
switch (event.type) {
case core::event_type::shutting_down:
co_return errc::shutting_down;
case core::event_type::err_timedout:
case core::event_type::new_read_request:
case core::event_type::none:
unreachable();
case core::event_type::new_write_request:
break;
}

// We got the write_request notification
vassert(
total_bytes <= event.total_write_bytes,
"New total_bytes value {} is smaller than the previous one {}. The "
"value is a counter and shouldn't go back.",
event.total_write_bytes,
total_bytes);

co_return do_evict(prev_total_size, event);
}

size_t evictor::do_evict(size_t prev_total_size, const core::event& event) {
size_t total_bytes = prev_total_size;
auto new_bytes = event.total_write_bytes - total_bytes;

vlog(
cd_log.debug,
"Evictor event: total bytes: {}, pending bytes: {}, new bytes: {}",
event.total_write_bytes,
event.pending_write_bytes,
new_bytes);

total_bytes = event.total_write_bytes;
if (event.pending_write_bytes > _mem_limit) {
size_t removed_bytes = throttle_memory(event.pending_write_bytes);
vlog(
cd_log.info,
"Evictor have removed {} bytes from the write_queue due to memory "
"pressure",
removed_bytes);
if (new_bytes > removed_bytes) {
new_bytes -= removed_bytes;
} else {
new_bytes = 0;
}
}
// Advance all write requests which are not throttled to next stage
_pipeline.process_stage(
[](const core::write_request<>&) noexcept
-> checked<core::get_write_requests_result, errc> {
return outcome::success(core::get_write_requests_result{
.stop_iteration = ss::stop_iteration::no,
.advance_next_stage = true,
});
},
_my_stage);
_total_events++;
return total_bytes;
}

ss::future<> evictor::bg_evict_pipeline() {
auto h = _gate.hold();
size_t total_bytes{0};
while (!_as.abort_requested()) {
auto res = co_await ss::coroutine::as_future(evict_once(total_bytes));
if (res.failed()) {
vlog(
cd_log.error, "Pipeline eviction error: {}", res.get_exception());
} else if (res.get().has_error()) {
vlog(cd_log.error, "Pipeline eviction error: {}", res.get());
} else {
total_bytes = res.get().value();
}
}
}
} // namespace experimental::cloud_topics
63 changes: 63 additions & 0 deletions src/v/cloud_topics/evictor/evictor.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,63 @@
/*
* Copyright 2024 Redpanda Data, Inc.
*
* Licensed as a Redpanda Enterprise file under the Redpanda Community
* License (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* https://github.com/redpanda-data/redpanda/blob/master/licenses/rcl.md
*/

#pragma once

#include "base/seastarx.h"
#include "cloud_topics/core/write_pipeline.h"
#include "cloud_topics/core/write_request.h"

#include <seastar/core/abort_source.hh>
#include <seastar/core/weak_ptr.hh>

namespace experimental::cloud_topics {

struct evictor_accessor;

/// Throttler
///
class evictor {
friend struct evictor_accessor;

public:
// TODO: add config property
explicit evictor(size_t mem_limit, core::write_pipeline<>&);

ss::future<> start();
ss::future<> stop();

private:
ss::future<> bg_evict_pipeline();

ss::future<result<size_t>> evict_once(size_t prev_total_size_bytes);

size_t do_evict(size_t prev_total_size_bytes, const core::event& event);

/// Throttle write request by memory usage.
///
/// Dispose the latest write requests and
/// return number of bytes freed.
size_t throttle_memory(size_t current_bytes_pending);

core::write_pipeline<>& _pipeline;

size_t _mem_limit;

ss::abort_source _as;
ss::gate _gate;
core::pipeline_stage _my_stage;

// Total number of events handled
size_t _total_events{0};

// Number of times the pipeline was throttled by memory
size_t _throttle_by_mem{0};
};
} // namespace experimental::cloud_topics
21 changes: 21 additions & 0 deletions src/v/cloud_topics/evictor/tests/BUILD
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
load("//bazel:test.bzl", "redpanda_cc_gtest")

redpanda_cc_gtest(
name = "evictor_test",
timeout = "short",
srcs = [
"evictor_test.cc",
],
deps = [
"//src/v/base",
"//src/v/cloud_topics/core:pipeline_stage",
"//src/v/cloud_topics/core:write_pipeline",
"//src/v/cloud_topics/core:write_request",
"//src/v/cloud_topics/evictor",
"//src/v/model",
"//src/v/model/tests:random",
"//src/v/test_utils:gtest",
"@googletest//:gtest",
"@seastar",
],
)
Loading

0 comments on commit 5d5437e

Please sign in to comment.