ct: Add write_pipeline and throttler components #23919

Merged
merged 4 commits into from
Dec 2, 2024

Conversation

Lazin
Contributor

@Lazin Lazin commented Oct 25, 2024

Currently, the batcher is an unnecessarily complicated component. It's responsible for storing write_request instances, performing housekeeping (expiring them), and uploading.

I wanted to add throttling to the batcher to avoid overloading Redpanda and to keep the memory used by in-flight write_request instances in check. With the current architecture, all of this functionality would have to be added to the batcher itself, because it maintains the list of write requests. Other aspects, such as caching and load balancing, could also be added to the batcher, complicating it even further.

To avoid this, I refactored the batcher and split it into two components: the batcher and the write_pipeline. The write_pipeline accepts and maintains write_request instances. It can expire old write requests and can return a size-limited set of in-flight write requests in either FIFO or LIFO order. It also has an interface that allows the caller to subscribe to certain events (for example, new requests being added to the pipeline). The event_filter component implements the subscription mechanism.
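
The bookkeeping described above can be illustrated with a minimal, synchronous sketch. It is not the code from this PR: `toy_write_request`, `toy_write_pipeline`, `order`, and all member names are hypothetical, and the real write_pipeline is asynchronous and built on Seastar. The sketch only shows expiration and size-limited retrieval in FIFO or LIFO order.

```cpp
#include <algorithm>
#include <cassert>
#include <chrono>
#include <cstddef>
#include <deque>
#include <utility>
#include <vector>

// Hypothetical, simplified stand-in for write_request: only the fields
// needed to illustrate expiration and size-limited retrieval.
struct toy_write_request {
    int id;
    std::size_t size_bytes;
    std::chrono::steady_clock::time_point deadline;
};

enum class order { fifo, lifo };

// Hypothetical, synchronous model of the pipeline's bookkeeping.
class toy_write_pipeline {
public:
    void add(toy_write_request r) { _pending.push_back(std::move(r)); }

    // Drop every request whose deadline is at or before `now`.
    // Returns the number of dropped requests.
    std::size_t expire(std::chrono::steady_clock::time_point now) {
        const std::size_t before = _pending.size();
        _pending.erase(
          std::remove_if(
            _pending.begin(),
            _pending.end(),
            [now](const toy_write_request& r) { return r.deadline <= now; }),
          _pending.end());
        return before - _pending.size();
    }

    // Remove and return requests whose total size fits into `max_bytes`,
    // starting from the oldest end (fifo) or the newest end (lifo).
    std::vector<toy_write_request> take(std::size_t max_bytes, order o) {
        std::vector<toy_write_request> out;
        std::size_t total = 0;
        while (!_pending.empty()) {
            const auto& next = (o == order::fifo) ? _pending.front()
                                                  : _pending.back();
            if (total + next.size_bytes > max_bytes) {
                break;
            }
            total += next.size_bytes;
            out.push_back(next); // copy before the element is popped
            if (o == order::fifo) {
                _pending.pop_front();
            } else {
                _pending.pop_back();
            }
        }
        return out;
    }

    std::size_t size() const { return _pending.size(); }

private:
    std::deque<toy_write_request> _pending;
};
```

In this model a consumer such as the batcher would typically call `take(..., order::fifo)` to upload the oldest data first, while a component that wants to push back on the newest requests would use `order::lifo`.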

The batcher component now uses the write_pipeline to upload the data. It subscribes to events and detects when there is enough data to start an L0 upload (or when enough time has passed). So the functionality of the batcher is now limited to two things: interacting with the write_pipeline and uploading the data.
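
The "enough data or enough time" decision can be sketched as a small predicate. This is only an illustration under assumed names: `upload_policy`, `min_bytes`, `max_delay`, and `should_start_upload` are hypothetical and do not come from the PR.

```cpp
#include <cassert>
#include <chrono>
#include <cstddef>

// Hypothetical thresholds; real values would come from configuration.
struct upload_policy {
    std::size_t min_bytes;               // start an upload once this much is pending
    std::chrono::milliseconds max_delay; // or once the oldest request waited this long
};

// Returns true when an upload should start: there is pending data and
// either enough bytes have accumulated or the oldest request is old enough.
inline bool should_start_upload(
  const upload_policy& policy,
  std::size_t pending_bytes,
  std::chrono::milliseconds oldest_age) {
    if (pending_bytes == 0) {
        return false; // nothing to upload, regardless of elapsed time
    }
    return pending_bytes >= policy.min_bytes || oldest_age >= policy.max_delay;
}
```

Keeping the decision in a pure function like this makes it easy to unit-test separately from the event subscription and the upload itself.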

Previously, we could just produce data to the batcher directly:

auto placeholders = co_await batcher.write_and_debounce(std::move(reader), timeout);

With this reorg, we have to connect the batcher to the pipeline and then produce data to the pipeline:

core::write_pipeline pipeline;
batcher batcher(pipeline, bucket, mock);
...
auto placeholders = co_await pipeline.write_and_debounce(std::move(reader), timeout);

This PR also adds a throttler component. The throttler is similar to the batcher: it subscribes to the write_pipeline and inspects new write requests. If the throughput limit is exceeded, it moves the newest write requests out of the pipeline for some period of time and then returns them (or drops them if they have been discarded in the meantime). If a new write request exceeds the memory limit, the throttler discards it and acknowledges the client with the slow_down error code.
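
A rough sketch of the throughput part, assuming a byte-based token bucket. All names here (`toy_token_bucket`, `toy_request`, `toy_throttler`, `admit`, `release`) are hypothetical, the model is synchronous, and elapsed time is passed in explicitly (assumed non-negative) instead of being read from a clock. It is not the throttler from this PR.

```cpp
#include <algorithm>
#include <cassert>
#include <cstddef>
#include <deque>
#include <vector>

// Hypothetical token bucket: tokens are bytes, refilled at a fixed rate.
class toy_token_bucket {
public:
    toy_token_bucket(std::size_t rate_per_sec, std::size_t capacity)
      : _rate(rate_per_sec)
      , _capacity(capacity)
      , _tokens(capacity) {}

    // Add tokens for `elapsed_sec` seconds (assumed >= 0), capped at capacity.
    void refill(double elapsed_sec) {
        const auto added = static_cast<std::size_t>(_rate * elapsed_sec);
        _tokens = std::min(_capacity, _tokens + added);
    }

    bool try_acquire(std::size_t n) {
        if (n > _tokens) {
            return false;
        }
        _tokens -= n;
        return true;
    }

private:
    std::size_t _rate;
    std::size_t _capacity;
    std::size_t _tokens;
};

// Hypothetical stand-in for a write request.
struct toy_request {
    int id;
    std::size_t size_bytes;
};

class toy_throttler {
public:
    explicit toy_throttler(toy_token_bucket bucket)
      : _bucket(bucket) {}

    // `incoming` is ordered oldest first. Requests that fit stay in the
    // pipeline (returned); once one does not fit, it and every newer
    // request are parked so that ordering is preserved.
    std::vector<toy_request> admit(const std::vector<toy_request>& incoming) {
        std::vector<toy_request> admitted;
        for (const auto& r : incoming) {
            if (_parked.empty() && _bucket.try_acquire(r.size_bytes)) {
                admitted.push_back(r);
            } else {
                _parked.push_back(r);
            }
        }
        return admitted;
    }

    // Refill the bucket and hand back parked requests that now fit,
    // oldest first.
    std::vector<toy_request> release(double elapsed_sec) {
        _bucket.refill(elapsed_sec);
        std::vector<toy_request> returned;
        while (!_parked.empty()
               && _bucket.try_acquire(_parked.front().size_bytes)) {
            returned.push_back(_parked.front());
            _parked.pop_front();
        }
        return returned;
    }

    std::size_t parked() const { return _parked.size(); }

private:
    toy_token_bucket _bucket;
    std::deque<toy_request> _parked;
};
```

Parking everything behind the first request that does not fit is a design choice of this sketch: it keeps the parked requests in arrival order at the cost of occasionally delaying a small request that would have fit.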

A similar modular approach could also be used for the read path. The main goal is to make the write path as modular as possible so that its parts can be developed and tested in parallel. Things like in-memory caching and load balancing should also be added as separate services on top of the write_pipeline.

This is a work in progress. Some unit tests for the throttler are missing.

Backports Required

  • none - not a bug fix
  • none - this is a backport
  • none - issue does not exist in previous branches
  • none - papercut/not impactful enough to backport
  • v24.2.x
  • v24.1.x
  • v23.3.x

Release Notes

  • none

@Lazin Lazin requested review from dotnwat and nvartolomei October 25, 2024 14:57
@Lazin Lazin force-pushed the ct/write-pipeline-and-throttler branch 2 times, most recently from 63b55b2 to c1dfc92 Compare November 4, 2024 13:45
@vbotbuildovich
Collaborator

vbotbuildovich commented Nov 4, 2024

the below tests from https://buildkite.com/redpanda/redpanda/builds/57534#0192f76d-ffbe-4623-9477-06fa42b73564 have failed and will be retried

catalog_schema_manager_rpunit

the below tests from https://buildkite.com/redpanda/redpanda/builds/57686#0193024f-650b-4cb2-b30f-2ab59bb2c8db have failed and will be retried

datalake_cloud_rpunit

the below tests from https://buildkite.com/redpanda/redpanda/builds/57686#0193024f-650d-4001-83e9-acb094714ff1 have failed and will be retried

catalog_schema_manager_rpunit

the below tests from https://buildkite.com/redpanda/redpanda/builds/57699#019302e1-9e10-4352-973c-266bf0dd7fa0 have failed and will be retried

catalog_schema_manager_rpunit

the below tests from https://buildkite.com/redpanda/redpanda/builds/57699#019302e1-9e10-4352-973c-266bf0dd7fa0 have failed and will be retried

translator_test_rpfixture

the below tests from https://buildkite.com/redpanda/redpanda/builds/57929#01931cd6-73c3-4f93-a187-c32b055aa756 have failed and will be retried

gtest_raft_rpunit

the below tests from https://buildkite.com/redpanda/redpanda/builds/58020#01932656-695f-49f9-a0f5-d974072856e3 have failed and will be retried

gtest_raft_rpunit

the below tests from https://buildkite.com/redpanda/redpanda/builds/58020#01932656-6960-42ec-9dec-d34bc2f76b53 have failed and will be retried

gtest_storage_e2e_rpfixture

@Lazin Lazin force-pushed the ct/write-pipeline-and-throttler branch 5 times, most recently from a284e77 to cb27f93 Compare November 6, 2024 19:08
@Lazin Lazin enabled auto-merge November 7, 2024 08:52
Contributor

@nvartolomei nvartolomei left a comment

👀

@@ -0,0 +1,8 @@
The `core` contains utilities shared by read path, write path and resource balancer.
Contributor

nit: if they are part of public api maybe they should be part of the top-level package? What meaning does core bear?

Name your packages after what they provide, not what they contain.
https://dave.cheney.net/2019/01/08/avoid-package-names-like-base-util-or-common

Contributor Author

It's not supposed to be a part of the public api. Also, with bazel it feels much nicer to have a lot of fine-grained libs inside the sub-project dir; bazel encourages this.

Contributor

also with bazel it feels much nicer to have a lot of fine grained libs inside the sub-project dir. It encourages to do this

then why not do this instead of slapping {write,read}_request, {write,read}_pipeline in a generically named package called "core"? :)

It's not supposed to be a part of the public api

by public api i mean whether these classes will be constructed from outside of cloud_topics package (i assumed batcher will be and hence pipelines too)

@Lazin Lazin force-pushed the ct/write-pipeline-and-throttler branch from cb27f93 to c7a0557 Compare November 11, 2024 20:06
@Lazin Lazin requested a review from nvartolomei November 11, 2024 20:07
@Lazin Lazin force-pushed the ct/write-pipeline-and-throttler branch 2 times, most recently from 28143a7 to bdfa11d Compare November 12, 2024 14:55
@Lazin Lazin mentioned this pull request Nov 12, 2024
7 tasks
This commit adds new nested directory ('core') which will contain all
code used by different subsystems in cloud topics (read path, write
path, resource management, etc).

The write_request and serializer are moved into the nested directory.
The write request will be generalized and used not only by the batcher
but also by the resource management subsystem and other subsystems. The
logic in the batcher that manages in-flight write_request instances will
be generalized and reused outside the batcher.

This is needed to avoid code duplication between the subsystems. For
instance, if the load balancing and throttling components are placed in
front of the batcher they will have to introduce their own
'write_request' analogs to manage in-flight requests.

Something similar has to be implemented for the read path but I don't
want to generalize write_request (and make it just request) to avoid
code bloat that will come with this. The goal of this change is to build
scaffolding for the architecture without introducing too much new code.

Signed-off-by: Evgeny Lazin <[email protected]>
@Lazin Lazin force-pushed the ct/write-pipeline-and-throttler branch from bdfa11d to 9a06509 Compare November 13, 2024 16:19
@vbotbuildovich
Collaborator

non flaky failures in https://buildkite.com/redpanda/redpanda/builds/58020#019326af-93ad-49d8-816e-ae80d6f1e04c:

"rptest.tests.archive_retention_test.CloudArchiveRetentionTest.test_delete.cloud_storage_type=CloudStorageType.ABS.retention_type=retention.ms"

@vbotbuildovich
Collaborator

Retry command for Build#58020

please wait until all jobs are finished before running the slash command

/ci-repeat 1
tests/rptest/tests/archive_retention_test.py::CloudArchiveRetentionTest.test_delete@{"cloud_storage_type":2,"retention_type":"retention.ms"}

@Lazin
Contributor Author

Lazin commented Nov 13, 2024

/ci-repeat 1
tests/rptest/tests/archive_retention_test.py::CloudArchiveRetentionTest.test_delete@{"cloud_storage_type":2,"retention_type":"retention.ms"}

@Lazin Lazin force-pushed the ct/write-pipeline-and-throttler branch from 9a06509 to 5d5437e Compare November 15, 2024 14:28
@Lazin
Contributor Author

Lazin commented Nov 15, 2024

The throttler component was split in two. The throttler now handles only throughput. The evictor removes write requests when the memory limit is reached.

@Lazin Lazin force-pushed the ct/write-pipeline-and-throttler branch 2 times, most recently from fe8bd13 to 98b39c7 Compare November 16, 2024 18:49
@Lazin Lazin force-pushed the ct/write-pipeline-and-throttler branch from 98b39c7 to 5cdf2e4 Compare November 27, 2024 00:13
@Lazin
Contributor Author

Lazin commented Nov 28, 2024

Changed the code a bit. Removed the evictor and replaced it with a semaphore inside the pipeline. Every write request has to acquire units from this semaphore first.
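
The idea of acquiring units before a write request enters the pipeline can be sketched as follows. This is a non-blocking, single-threaded model with hypothetical names (`toy_memory_budget`, `toy_units`, `try_acquire`); the actual pipeline presumably uses a Seastar semaphore whose waiters are resumed asynchronously, which this sketch does not model.

```cpp
#include <cassert>
#include <cstddef>
#include <optional>
#include <utility>

class toy_memory_budget;

// Hypothetical RAII handle: returns its units to the budget on destruction.
class toy_units {
public:
    toy_units(toy_memory_budget& budget, std::size_t n)
      : _budget(&budget)
      , _n(n) {}
    toy_units(toy_units&& other) noexcept
      : _budget(std::exchange(other._budget, nullptr))
      , _n(std::exchange(other._n, 0)) {}
    toy_units(const toy_units&) = delete;
    toy_units& operator=(const toy_units&) = delete;
    toy_units& operator=(toy_units&&) = delete;
    ~toy_units();

private:
    toy_memory_budget* _budget;
    std::size_t _n;
};

// Hypothetical byte budget shared by all in-flight write requests.
class toy_memory_budget {
public:
    explicit toy_memory_budget(std::size_t limit_bytes)
      : _available(limit_bytes) {}

    // Returns units if `n` bytes are available, std::nullopt otherwise.
    // A real implementation would wait (or fail with a timeout) instead.
    std::optional<toy_units> try_acquire(std::size_t n) {
        if (n > _available) {
            return std::nullopt;
        }
        _available -= n;
        return toy_units(*this, n);
    }

    std::size_t available() const { return _available; }

private:
    friend class toy_units;
    void give_back(std::size_t n) { _available += n; }

    std::size_t _available;
};

inline toy_units::~toy_units() {
    if (_budget != nullptr) {
        _budget->give_back(_n); // moved-from handles hold no units
    }
}
```

Tying the units to the lifetime of the request means the memory is accounted for until the request is uploaded, expired, or otherwise destroyed, without a separate component having to evict anything.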

struct write_pipeline_accessor;

template<class Clock = ss::lowres_clock>
class write_pipeline {
Contributor

missing stop() method and a vassert(_gate.is_closed(), ...) in destructor?

Contributor Author

it's in the next PR

Contributor

what is "next PR"?

Add 'event_filter' class
The filter can be used to subscribe to certain events in the pipeline.

Signed-off-by: Evgeny Lazin <[email protected]>
Previously, the batcher was responsible for both housekeeping and
uploading. The housekeeping includes write_request creation and
accounting (including timeout handling).

The housekeeping is now moved to the write_pipeline. The goal is to be
able to extend the write path without adding new things to the batcher
itself. For instance, the throttler can now be implemented
independently from the batcher. It can simply consume the write_pipeline
as a dependency.

Signed-off-by: Evgeny Lazin <[email protected]>
The component implements L0 upload throttling. It monitors the
write_pipeline and acquires units from its internal token bucket when
new write requests are added. If there are not enough units the
throttler removes some write requests from the pipeline and stores them
internally until the units are available. When this happens it returns
write requests back to the pipeline.

It also monitors memory consumed by all in-flight write requests. If too
much memory is consumed it removes write requests from the pipeline and
forcibly closes them (the Kafka layer receives an error ack).

Signed-off-by: Evgeny Lazin <[email protected]>
@Lazin Lazin force-pushed the ct/write-pipeline-and-throttler branch from 5cdf2e4 to afb40e8 Compare November 30, 2024 01:12
@Lazin Lazin merged commit 38801dd into redpanda-data:dev Dec 2, 2024
16 checks passed
@Lazin Lazin mentioned this pull request Dec 12, 2024
7 tasks