Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

ct: Add write_pipeline and throttler components #23919

Merged
merged 4 commits into from
Dec 2, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions src/v/cloud_topics/BUILD
Original file line number Diff line number Diff line change
Expand Up @@ -3,11 +3,15 @@ load("//bazel:build.bzl", "redpanda_cc_library")
package(default_visibility = [
"//src/v/cloud_topics/batcher:__pkg__",
"//src/v/cloud_topics/batcher/tests:__pkg__",
"//src/v/cloud_topics/core:__pkg__",
"//src/v/cloud_topics/core/tests:__pkg__",
"//src/v/cloud_topics/dl_stm:__pkg__",
"//src/v/cloud_topics/dl_stm/tests:__pkg__",
"//src/v/cloud_topics/reader:__pkg__",
"//src/v/cloud_topics/reader/tests:__pkg__",
"//src/v/cloud_topics/tests:__pkg__",
"//src/v/cloud_topics/throttler:__pkg__",
"//src/v/cloud_topics/throttler/tests:__pkg__",
])

redpanda_cc_library(
Expand Down
52 changes: 7 additions & 45 deletions src/v/cloud_topics/batcher/BUILD
Original file line number Diff line number Diff line change
Expand Up @@ -2,47 +2,6 @@ load("//bazel:build.bzl", "redpanda_cc_library")

package(default_visibility = ["//src/v/cloud_topics/batcher/tests:__pkg__"])

redpanda_cc_library(
name = "serializer",
srcs = [
"serializer.cc",
],
hdrs = [
"serializer.h",
],
implementation_deps = [
"//src/v/storage:record_batch_utils",
],
include_prefix = "cloud_topics/batcher",
deps = [
"//src/v/base",
"//src/v/bytes:iobuf",
"//src/v/container:fragmented_vector",
"//src/v/model",
],
)

redpanda_cc_library(
name = "write_request",
srcs = [
"write_request.cc",
],
hdrs = [
"write_request.h",
],
implementation_deps = [
"//src/v/cloud_topics:logger",
],
include_prefix = "cloud_topics/batcher",
deps = [
":serializer",
"//src/v/base",
"//src/v/cloud_topics:types",
"//src/v/model",
"@seastar",
],
)

redpanda_cc_library(
name = "aggregator",
srcs = [
Expand All @@ -52,16 +11,16 @@ redpanda_cc_library(
"aggregator.h",
],
implementation_deps = [
":serializer",
"//src/v/cloud_topics:logger",
"//src/v/cloud_topics:placeholder",
"//src/v/cloud_topics/core:serializer",
"//src/v/storage:record_batch_builder",
],
include_prefix = "cloud_topics/batcher",
deps = [
":write_request",
"//src/v/base",
"//src/v/cloud_topics:types",
"//src/v/cloud_topics/core:write_request",
"//src/v/container:fragmented_vector",
"//src/v/model",
"@abseil-cpp//absl/container:btree",
Expand All @@ -81,7 +40,7 @@ redpanda_cc_library(
"//src/v/cloud_io:remote",
"//src/v/cloud_topics:logger",
"//src/v/cloud_topics/batcher:aggregator",
"//src/v/cloud_topics/batcher:serializer",
"//src/v/cloud_topics/core:serializer",
"//src/v/ssx:sformat",
"//src/v/utils:human",
],
Expand All @@ -91,7 +50,10 @@ redpanda_cc_library(
"//src/v/bytes",
"//src/v/bytes:iobuf",
"//src/v/cloud_topics:types",
"//src/v/cloud_topics/batcher:write_request",
"//src/v/cloud_topics/core:event_filter",
"//src/v/cloud_topics/core:pipeline_stage",
"//src/v/cloud_topics/core:write_pipeline",
"//src/v/cloud_topics/core:write_request",
"//src/v/config",
"//src/v/model",
"//src/v/utils:retry_chain_node",
Expand Down
17 changes: 9 additions & 8 deletions src/v/cloud_topics/batcher/aggregator.cc
Original file line number Diff line number Diff line change
Expand Up @@ -10,15 +10,15 @@

#include "cloud_topics/batcher/aggregator.h"

#include "cloud_topics/batcher/serializer.h"
#include "cloud_topics/batcher/write_request.h"
#include "cloud_topics/core/serializer.h"
#include "cloud_topics/core/write_request.h"
#include "cloud_topics/dl_placeholder.h"
#include "storage/record_batch_builder.h"

#include <seastar/core/future.hh>
#include <seastar/util/defer.hh>

namespace experimental::cloud_topics::details {
namespace experimental::cloud_topics {

template<class Clock>
aggregator<Clock>::aggregator(object_id id)
Expand Down Expand Up @@ -53,8 +53,8 @@ namespace {
template<class Clock>
void make_dl_placeholder_batches(
prepared_placeholder_batches<Clock>& ctx,
write_request<Clock>& req,
const serialized_chunk& chunk) {
core::write_request<Clock>& req,
const core::serialized_chunk& chunk) {
auto result = std::make_unique<batches_for_req<Clock>>();
for (const auto& b : chunk.batches) {
dl_placeholder placeholder{
Expand Down Expand Up @@ -164,10 +164,11 @@ void aggregator<Clock>::ack_error(errc e) {
}

template<class Clock>
void aggregator<Clock>::add(write_request<Clock>& req) {
void aggregator<Clock>::add(core::write_request<Clock>& req) {
auto it = _staging.find(req.ntp);
if (it == _staging.end()) {
it = _staging.emplace_hint(it, req.ntp, write_request_list<Clock>());
it = _staging.emplace_hint(
it, req.ntp, core::write_request_list<Clock>());
}
req._hook.unlink();
it->second.push_back(req);
Expand All @@ -181,4 +182,4 @@ size_t aggregator<Clock>::size_bytes() const noexcept {

template class aggregator<ss::lowres_clock>;
template class aggregator<ss::manual_clock>;
} // namespace experimental::cloud_topics::details
} // namespace experimental::cloud_topics
13 changes: 7 additions & 6 deletions src/v/cloud_topics/batcher/aggregator.h
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@
#pragma once

#include "base/seastarx.h"
#include "cloud_topics/batcher/write_request.h"
#include "cloud_topics/core/write_request.h"
#include "cloud_topics/errc.h"
#include "cloud_topics/types.h"
#include "container/fragmented_vector.h"
Expand All @@ -22,7 +22,7 @@

#include <absl/container/btree_map.h>

namespace experimental::cloud_topics::details {
namespace experimental::cloud_topics {

/// List of placeholder batches that has to be propagated
/// to the particular write request.
Expand All @@ -31,7 +31,7 @@ struct batches_for_req {
/// Generated placeholder batches
ss::circular_buffer<model::record_batch> placeholders;
/// Source write request
ss::weak_ptr<write_request<Clock>> ref;
ss::weak_ptr<core::write_request<Clock>> ref;
};

// This component aggregates a bunch of write
Expand All @@ -53,7 +53,7 @@ class aggregator {
/// included into L0 object. The size value returned by
/// the 'size_bytes' call will not match the actual size
/// of the object.
void add(write_request<Clock>& req);
void add(core::write_request<Clock>& req);

/// Estimate L0 object size
size_t size_bytes() const noexcept;
Expand All @@ -77,11 +77,12 @@ class aggregator {
iobuf get_stream();

object_id _id;

/// Source data for the aggregator
absl::btree_map<model::ntp, write_request_list<Clock>> _staging;
absl::btree_map<model::ntp, core::write_request_list<Clock>> _staging;
/// Prepared placeholders
chunked_vector<std::unique_ptr<batches_for_req<Clock>>> _aggregated;
size_t _size_bytes{0};
};

} // namespace experimental::cloud_topics::details
} // namespace experimental::cloud_topics
Loading