Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[CORE-8161] storage: add min.cleanable.dirty.ratio, schedule compaction by dirty_ratio #24991

Open
wants to merge 12 commits into
base: dev
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions src/v/cloud_storage/tests/topic_manifest_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -486,6 +486,7 @@ SEASTAR_THREAD_TEST_CASE(test_topic_manifest_serde_feature_table) {
std::nullopt,
std::nullopt,
std::nullopt,
tristate<double>{},
};

auto random_initial_revision_id
Expand Down
7 changes: 7 additions & 0 deletions src/v/cluster/metadata_cache.cc
Original file line number Diff line number Diff line change
Expand Up @@ -324,6 +324,11 @@ metadata_cache::get_default_delete_retention_ms() const {
return config::shard_local_cfg().tombstone_retention_ms();
}

// Returns the `min_cleanable_dirty_ratio` value held by the shard-local
// configuration; the result is empty when that property holds no value.
std::optional<double>
metadata_cache::get_default_min_cleanable_dirty_ratio() const {
    const auto& cfg = config::shard_local_cfg();
    return cfg.min_cleanable_dirty_ratio();
}
WillemKauf marked this conversation as resolved.
Show resolved Hide resolved

topic_properties metadata_cache::get_default_properties() const {
topic_properties tp;
tp.compression = {get_default_compression()};
Expand All @@ -343,6 +348,8 @@ topic_properties metadata_cache::get_default_properties() const {
get_default_retention_local_target_ms()};
tp.delete_retention_ms = tristate<std::chrono::milliseconds>{
get_default_delete_retention_ms()};
tp.min_cleanable_dirty_ratio = tristate<double>{
get_default_min_cleanable_dirty_ratio()};

return tp;
}
Expand Down
1 change: 1 addition & 0 deletions src/v/cluster/metadata_cache.h
Original file line number Diff line number Diff line change
Expand Up @@ -204,6 +204,7 @@ class metadata_cache {
get_default_record_value_subject_name_strategy() const;
std::optional<std::chrono::milliseconds>
get_default_delete_retention_ms() const;
std::optional<double> get_default_min_cleanable_dirty_ratio() const;

topic_properties get_default_properties() const;
std::optional<partition_assignment>
Expand Down
1 change: 1 addition & 0 deletions src/v/cluster/topic_configuration.cc
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,7 @@ storage::ntp_config topic_configuration::make_ntp_config(
.iceberg_mode = properties.iceberg_mode,
.cloud_topic_enabled = properties.cloud_topic_enabled,
.tombstone_retention_ms = properties.delete_retention_ms,
.min_cleanable_dirty_ratio = properties.min_cleanable_dirty_ratio,
});
}
return {
Expand Down
11 changes: 8 additions & 3 deletions src/v/cluster/topic_properties.cc
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,8 @@ std::ostream& operator<<(std::ostream& o, const topic_properties& properties) {
"delete_retention_ms: {}, "
"iceberg_delete: {}, "
"iceberg_partition_spec: {}, "
"iceberg_invalid_record_action: {}",
"iceberg_invalid_record_action: {}, "
"min_cleanable_dirty_ratio: {}",
properties.compression,
properties.cleanup_policy_bitflags,
properties.compaction_strategy,
Expand Down Expand Up @@ -83,7 +84,8 @@ std::ostream& operator<<(std::ostream& o, const topic_properties& properties) {
properties.delete_retention_ms,
properties.iceberg_delete,
properties.iceberg_partition_spec,
properties.iceberg_invalid_record_action);
properties.iceberg_invalid_record_action,
properties.min_cleanable_dirty_ratio);

if (config::shard_local_cfg().development_enable_cloud_topics()) {
fmt::print(
Expand Down Expand Up @@ -130,7 +132,8 @@ bool topic_properties::has_overrides() const {
|| (iceberg_mode != storage::ntp_config::default_iceberg_mode)
|| leaders_preference.has_value() || delete_retention_ms.is_engaged()
|| iceberg_delete.has_value() || iceberg_partition_spec.has_value()
|| iceberg_invalid_record_action.has_value();
|| iceberg_invalid_record_action.has_value()
|| min_cleanable_dirty_ratio.is_engaged();

if (config::shard_local_cfg().development_enable_cloud_topics()) {
return overrides
Expand Down Expand Up @@ -173,6 +176,7 @@ topic_properties::get_ntp_cfg_overrides() const {
ret.iceberg_mode = iceberg_mode;
ret.cloud_topic_enabled = cloud_topic_enabled;
ret.tombstone_retention_ms = delete_retention_ms;
ret.min_cleanable_dirty_ratio = min_cleanable_dirty_ratio;
return ret;
}

Expand Down Expand Up @@ -268,6 +272,7 @@ adl<cluster::topic_properties>::from(iobuf_parser& parser) {
std::nullopt,
std::nullopt,
std::nullopt,
tristate<double>{std::nullopt},
};
}

Expand Down
13 changes: 9 additions & 4 deletions src/v/cluster/topic_properties.h
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ namespace cluster {
*/
struct topic_properties
: serde::
envelope<topic_properties, serde::version<11>, serde::compat_version<0>> {
envelope<topic_properties, serde::version<12>, serde::compat_version<0>> {
topic_properties() noexcept = default;
topic_properties(
std::optional<model::compression> compression,
Expand Down Expand Up @@ -79,7 +79,8 @@ struct topic_properties
std::optional<bool> iceberg_delete,
std::optional<ss::sstring> iceberg_partition_spec,
std::optional<model::iceberg_invalid_record_action>
iceberg_invalid_record_action)
iceberg_invalid_record_action,
tristate<double> min_cleanable_dirty_ratio)
: compression(compression)
, cleanup_policy_bitflags(cleanup_policy_bitflags)
, compaction_strategy(compaction_strategy)
Expand Down Expand Up @@ -123,7 +124,8 @@ struct topic_properties
, delete_retention_ms(delete_retention_ms)
, iceberg_delete(iceberg_delete)
, iceberg_partition_spec(std::move(iceberg_partition_spec))
, iceberg_invalid_record_action(iceberg_invalid_record_action) {}
, iceberg_invalid_record_action(iceberg_invalid_record_action)
, min_cleanable_dirty_ratio(min_cleanable_dirty_ratio) {}

std::optional<model::compression> compression;
std::optional<model::cleanup_policy_bitflags> cleanup_policy_bitflags;
Expand Down Expand Up @@ -206,6 +208,8 @@ struct topic_properties
std::optional<model::iceberg_invalid_record_action>
iceberg_invalid_record_action;

tristate<double> min_cleanable_dirty_ratio{std::nullopt};

bool is_compacted() const;
bool has_overrides() const;
bool requires_remote_erase() const;
Expand Down Expand Up @@ -254,7 +258,8 @@ struct topic_properties
delete_retention_ms,
iceberg_delete,
iceberg_partition_spec,
iceberg_invalid_record_action);
iceberg_invalid_record_action,
min_cleanable_dirty_ratio);
}

friend bool operator==(const topic_properties&, const topic_properties&)
Expand Down
4 changes: 4 additions & 0 deletions src/v/cluster/topic_table.cc
Original file line number Diff line number Diff line change
Expand Up @@ -1107,6 +1107,10 @@ topic_properties topic_table::update_topic_properties(
incremental_update(
updated_properties.iceberg_invalid_record_action,
overrides.iceberg_invalid_record_action);
incremental_update(
updated_properties.min_cleanable_dirty_ratio,
overrides.min_cleanable_dirty_ratio);

return updated_properties;
}

Expand Down
6 changes: 4 additions & 2 deletions src/v/cluster/types.h
Original file line number Diff line number Diff line change
Expand Up @@ -568,7 +568,7 @@ struct property_update<tristate<T>>
struct incremental_topic_updates
: serde::envelope<
incremental_topic_updates,
serde::version<8>,
serde::version<9>,
serde::compat_version<0>> {
static constexpr int8_t version_with_data_policy = -1;
static constexpr int8_t version_with_shadow_indexing = -3;
Expand Down Expand Up @@ -644,6 +644,7 @@ struct incremental_topic_updates
property_update<std::optional<ss::sstring>> iceberg_partition_spec;
property_update<std::optional<model::iceberg_invalid_record_action>>
iceberg_invalid_record_action;
property_update<tristate<double>> min_cleanable_dirty_ratio;

// To allow us to better control use of the deprecated shadow_indexing
// field, use getters and setters instead.
Expand Down Expand Up @@ -685,7 +686,8 @@ struct incremental_topic_updates
delete_retention_ms,
iceberg_delete,
iceberg_partition_spec,
iceberg_invalid_record_action);
iceberg_invalid_record_action,
min_cleanable_dirty_ratio);
}

friend std::ostream&
Expand Down
5 changes: 5 additions & 0 deletions src/v/compat/cluster_compat.h
Original file line number Diff line number Diff line change
Expand Up @@ -362,6 +362,7 @@ struct compat_check<cluster::topic_properties> {
wr,
"iceberg_invalid_record_action",
obj.iceberg_invalid_record_action);
json_write(min_cleanable_dirty_ratio);
}

static cluster::topic_properties from_json(json::Value& rd) {
Expand Down Expand Up @@ -399,6 +400,7 @@ struct compat_check<cluster::topic_properties> {
json_read(flush_bytes);
json_read(remote_topic_namespace_override);
json_read(iceberg_invalid_record_action);
json_read(min_cleanable_dirty_ratio);
return obj;
}

Expand Down Expand Up @@ -431,6 +433,7 @@ struct compat_check<cluster::topic_properties> {
obj.flush_ms = std::nullopt;
obj.remote_topic_namespace_override = std::nullopt;
obj.iceberg_invalid_record_action = std::nullopt;
obj.min_cleanable_dirty_ratio = tristate<double>{std::nullopt};

if (reply != obj) {
throw compat_error(fmt::format(
Expand Down Expand Up @@ -517,6 +520,8 @@ struct compat_check<cluster::topic_configuration> {
obj.properties.mpx_virtual_cluster_id = std::nullopt;

obj.properties.iceberg_invalid_record_action = std::nullopt;
obj.properties.min_cleanable_dirty_ratio = tristate<double>{
std::nullopt};

// ADL will always squash is_migrated to false
obj.is_migrated = false;
Expand Down
4 changes: 3 additions & 1 deletion src/v/compat/cluster_generator.h
Original file line number Diff line number Diff line change
Expand Up @@ -662,7 +662,9 @@ struct instance_generator<cluster::topic_properties> {
return random_generators::random_choice(
{model::iceberg_invalid_record_action::drop,
model::iceberg_invalid_record_action::dlq_table});
})};
}),
tristate<double>{disable_tristate},
};
}

static std::vector<cluster::topic_properties> limits() { return {}; }
Expand Down
2 changes: 2 additions & 0 deletions src/v/compat/cluster_json.h
Original file line number Diff line number Diff line change
Expand Up @@ -630,6 +630,7 @@ inline void rjson_serialize(
write_member(w, "delete_retention_ms", tps.delete_retention_ms);
write_exceptional_member_type(
w, "iceberg_invalid_record_action", tps.iceberg_invalid_record_action);
write_member(w, "min_cleanable_dirty_ratio", tps.min_cleanable_dirty_ratio);
w.EndObject();
}

Expand Down Expand Up @@ -704,6 +705,7 @@ inline void read_value(const json::Value& rd, cluster::topic_properties& obj) {
read_member(rd, "delete_retention_ms", obj.delete_retention_ms);
read_member(
rd, "iceberg_invalid_record_action", obj.iceberg_invalid_record_action);
read_member(rd, "min_cleanable_dirty_ratio", obj.min_cleanable_dirty_ratio);
}

inline void rjson_serialize(
Expand Down
4 changes: 4 additions & 0 deletions src/v/compat/json.h
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,10 @@ inline const char* to_str(const rapidjson::Type t) {
return str[t];
}

// Reads the JSON value `v` as a double and stores it in `target`.
// NOTE(review): presumably `v` must already hold a numeric value for
// GetDouble() to be valid -- confirm against the JSON library's contract.
inline void read_value(const json::Value& v, double& target) {
    const double parsed = v.GetDouble();
    target = parsed;
}

// Reads the JSON value `v` as a 64-bit signed integer and stores it in
// `target`.
// NOTE(review): presumably `v` must already hold an integer representable as
// int64 for GetInt64() to be valid -- confirm against the JSON library's
// contract.
inline void read_value(const json::Value& v, int64_t& target) {
    const int64_t parsed = v.GetInt64();
    target = parsed;
}
Expand Down
13 changes: 13 additions & 0 deletions src/v/config/configuration.cc
Original file line number Diff line number Diff line change
Expand Up @@ -993,6 +993,19 @@ configuration::configuration()
{.needs_restart = needs_restart::no, .visibility = visibility::user},
std::nullopt,
validate_tombstone_retention_ms)
, min_cleanable_dirty_ratio(
*this,
"min_cleanable_dirty_ratio",
"The minimum ratio between the number of bytes in \"dirty\" segments and "
"the total number of bytes in closed segments that must be reached "
"before a partition's log is eligible for compaction in a compact topic. "
"The topic property `min.cleanable.dirty.ratio` overrides the value of "
"`min_cleanable_dirty_ratio` at the topic level.",
{.needs_restart = needs_restart::no,
.example = "0.5",
.visibility = visibility::user},
std::nullopt,
{.min = 0.0, .max = 1.0})
, log_disable_housekeeping_for_tests(
*this,
"log_disable_housekeeping_for_tests",
Expand Down
2 changes: 2 additions & 0 deletions src/v/config/configuration.h
Original file line number Diff line number Diff line change
Expand Up @@ -252,6 +252,8 @@ struct configuration final : public config_store {
property<std::chrono::milliseconds> log_compaction_interval_ms;
// same as delete.retention.ms in kafka
property<std::optional<std::chrono::milliseconds>> tombstone_retention_ms;
bounded_property<std::optional<double>, numeric_bounds>
min_cleanable_dirty_ratio;
property<bool> log_disable_housekeeping_for_tests;
property<bool> log_compaction_use_sliding_window;
// same as retention.size in kafka - TODO: size not implemented
Expand Down
10 changes: 9 additions & 1 deletion src/v/kafka/server/handlers/alter_configs.cc
Original file line number Diff line number Diff line change
Expand Up @@ -83,7 +83,7 @@ create_topic_properties_update(
std::apply(apply_op(op_t::none), update.custom_properties.serde_fields());

static_assert(
std::tuple_size_v<decltype(update.properties.serde_fields())> == 34,
std::tuple_size_v<decltype(update.properties.serde_fields())> == 35,
"If you added a property, please decide on it's default alter config "
"policy, and handle the update in the loop below");
static_assert(
Expand Down Expand Up @@ -385,6 +385,14 @@ create_topic_properties_update(
kafka::config_resource_operation::set);
continue;
}
if (cfg.name == topic_property_min_cleanable_dirty_ratio) {
parse_and_set_tristate(
update.properties.min_cleanable_dirty_ratio,
cfg.value,
kafka::config_resource_operation::set,
min_cleanable_dirty_ratio_validator{});
continue;
}
} catch (const validation_error& e) {
return make_error_alter_config_resource_response<
alter_configs_resource_response>(
Expand Down
12 changes: 12 additions & 0 deletions src/v/kafka/server/handlers/configs/config_response_utils.cc
Original file line number Diff line number Diff line change
Expand Up @@ -1023,6 +1023,18 @@ config_response_container_t make_topic_configs(
&describe_as_string<model::iceberg_invalid_record_action>);
}

add_topic_config_if_requested(
config_keys,
result,
topic_property_min_cleanable_dirty_ratio,
metadata_cache.get_default_min_cleanable_dirty_ratio(),
topic_property_min_cleanable_dirty_ratio,
topic_properties.min_cleanable_dirty_ratio,
include_synonyms,
maybe_make_documentation(
include_documentation,
config::shard_local_cfg().min_cleanable_dirty_ratio.desc()));

return result;
}

Expand Down
24 changes: 23 additions & 1 deletion src/v/kafka/server/handlers/configs/config_utils.h
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@

#include <algorithm>
#include <optional>
#include <type_traits>

namespace kafka {
template<typename T>
Expand Down Expand Up @@ -519,6 +520,25 @@ struct iceberg_partition_spec_validator {
}
};

/// Validator for the `min.cleanable.dirty.ratio` topic property.
///
/// Call operator: the first argument (the raw string form) is ignored; only
/// the parsed tristate is inspected.
///
/// @param value parsed property value.
/// @return an error message when `value` reports an optional value that is
///         not inside the closed range [0.0, 1.0] (NaN included);
///         std::nullopt otherwise.
struct min_cleanable_dirty_ratio_validator {
    std::optional<ss::sstring>
    operator()(const ss::sstring&, const tristate<double>& value) {
        constexpr double min = 0.0;
        constexpr double max = 1.0;

        // Nothing to range-check when the tristate carries no value.
        if (!value.has_optional_value()) {
            return std::nullopt;
        }

        const double ratio = value.value();
        // Expressed as a negated in-range test on purpose: every ordered
        // comparison involving NaN is false, so the form
        // `ratio < min || ratio > max` would let NaN through as "valid".
        if (!(ratio >= min && ratio <= max)) {
            return fmt::format(
              "min.cleanable.dirty.ratio {} is outside of allowed range "
              "[{}, {}]",
              ratio,
              min,
              max);
        }
        return std::nullopt;
    }
};

template<typename T, typename... ValidatorTypes>
requires requires(
model::topic_namespace_view tns,
Expand Down Expand Up @@ -775,7 +795,9 @@ void parse_and_set_tristate(
}
// set property value
if (op == config_resource_operation::set) {
auto parsed = boost::lexical_cast<int64_t>(*value);
using config_t
= std::conditional_t<std::is_floating_point_v<T>, T, int64_t>;
auto parsed = boost::lexical_cast<config_t>(*value);
if (parsed <= 0) {
property.value = tristate<T>{};
} else {
Expand Down
3 changes: 2 additions & 1 deletion src/v/kafka/server/handlers/create_topics.cc
Original file line number Diff line number Diff line change
Expand Up @@ -78,7 +78,8 @@ bool is_supported(std::string_view name) {
topic_property_delete_retention_ms,
topic_property_iceberg_delete,
topic_property_iceberg_partition_spec,
topic_property_iceberg_invalid_record_action});
topic_property_iceberg_invalid_record_action,
topic_property_min_cleanable_dirty_ratio});

if (std::any_of(
supported_configs.begin(),
Expand Down
9 changes: 8 additions & 1 deletion src/v/kafka/server/handlers/incremental_alter_configs.cc
Original file line number Diff line number Diff line change
Expand Up @@ -389,7 +389,14 @@ create_topic_properties_update(
op);
continue;
}

if (cfg.name == topic_property_min_cleanable_dirty_ratio) {
parse_and_set_tristate(
update.properties.min_cleanable_dirty_ratio,
cfg.value,
op,
min_cleanable_dirty_ratio_validator{});
continue;
}
} catch (const validation_error& e) {
vlog(
klog.debug,
Expand Down
Loading