From 2b87b3702c1b90d4997f1e0d2cf593c11fc668fa Mon Sep 17 00:00:00 2001 From: Willem Kaufmann Date: Thu, 30 Jan 2025 22:26:29 -0500 Subject: [PATCH 01/12] `config`: add `min_cleanable_dirty_ratio` to `configuration` --- src/v/config/configuration.cc | 13 +++++++++++++ src/v/config/configuration.h | 2 ++ 2 files changed, 15 insertions(+) diff --git a/src/v/config/configuration.cc b/src/v/config/configuration.cc index 176c901f8e2d..87507eaed732 100644 --- a/src/v/config/configuration.cc +++ b/src/v/config/configuration.cc @@ -993,6 +993,19 @@ configuration::configuration() {.needs_restart = needs_restart::no, .visibility = visibility::user}, std::nullopt, validate_tombstone_retention_ms) + , min_cleanable_dirty_ratio( + *this, + "min_cleanable_dirty_ratio", + "The minimum ratio between the number of bytes in \"dirty\" segments and " + "the total number of bytes in closed segments that must be reached " + "before a partition's log is eligible for compaction in a compact topic. " + "The topic property `min.cleanable.dirty.ratio` overrides the value of " + "`min_cleanable_dirty_ratio` at the topic level.", + {.needs_restart = needs_restart::no, + .example = "0.5", + .visibility = visibility::user}, + std::nullopt, + {.min = 0.0, .max = 1.0}) , log_disable_housekeeping_for_tests( *this, "log_disable_housekeeping_for_tests", diff --git a/src/v/config/configuration.h b/src/v/config/configuration.h index 69f96ea32e26..d9ec95a46fd9 100644 --- a/src/v/config/configuration.h +++ b/src/v/config/configuration.h @@ -252,6 +252,8 @@ struct configuration final : public config_store { property log_compaction_interval_ms; // same as delete.retention.ms in kafka property> tombstone_retention_ms; + bounded_property, numeric_bounds> + min_cleanable_dirty_ratio; property log_disable_housekeeping_for_tests; property log_compaction_use_sliding_window; // same as retention.size in kafka - TODO: size not implemented From 09d85fe87b359081dc9ae88747e15b8357b2c037 Mon Sep 17 00:00:00 2001 From: Willem Kaufmann Date: Thu, 30 Jan 2025 22:12:57 -0500 Subject: [PATCH 02/12] `cluster`: add `min_cleanable_dirty_ratio` to `topic_properties` --- src/v/cloud_storage/tests/topic_manifest_test.cc | 1 + src/v/cluster/topic_properties.cc | 10 +++++++--- src/v/cluster/topic_properties.h | 13 +++++++++---- src/v/compat/cluster_compat.h | 5 +++++ src/v/compat/cluster_generator.h | 4 +++- src/v/compat/cluster_json.h | 2 ++ src/v/compat/json.h | 4 ++++ tools/offline_log_viewer/controller.py | 4 ++++ tools/offline_log_viewer/reader.py | 3 +++ 9 files changed, 38 insertions(+), 8 deletions(-) diff --git a/src/v/cloud_storage/tests/topic_manifest_test.cc b/src/v/cloud_storage/tests/topic_manifest_test.cc index af951e095af9..bcf7ec9985bd 100644 --- a/src/v/cloud_storage/tests/topic_manifest_test.cc +++ b/src/v/cloud_storage/tests/topic_manifest_test.cc @@ -486,6 +486,7 @@ SEASTAR_THREAD_TEST_CASE(test_topic_manifest_serde_feature_table) { std::nullopt, std::nullopt, std::nullopt, + tristate{}, }; auto random_initial_revision_id diff --git a/src/v/cluster/topic_properties.cc b/src/v/cluster/topic_properties.cc index 3684baab2929..fad6f9ad4d7c 100644 --- a/src/v/cluster/topic_properties.cc +++ b/src/v/cluster/topic_properties.cc @@ -44,7 +44,8 @@ std::ostream& operator<<(std::ostream& o, const topic_properties& properties) { "delete_retention_ms: {}, " "iceberg_delete: {}, " "iceberg_partition_spec: {}, " - "iceberg_invalid_record_action: {}", + "iceberg_invalid_record_action: {}, " + "min_cleanable_dirty_ratio: {}", properties.compression, properties.cleanup_policy_bitflags, properties.compaction_strategy, @@ -83,7 +84,8 @@ std::ostream& operator<<(std::ostream& o, const topic_properties& properties) { properties.delete_retention_ms, properties.iceberg_delete, properties.iceberg_partition_spec, - properties.iceberg_invalid_record_action); + properties.iceberg_invalid_record_action, + properties.min_cleanable_dirty_ratio); if (config::shard_local_cfg().development_enable_cloud_topics()) { fmt::print( @@ -130,7 +132,8 @@ bool topic_properties::has_overrides() const { || (iceberg_mode != storage::ntp_config::default_iceberg_mode) || leaders_preference.has_value() || delete_retention_ms.is_engaged() || iceberg_delete.has_value() || iceberg_partition_spec.has_value() - || iceberg_invalid_record_action.has_value(); + || iceberg_invalid_record_action.has_value() + || min_cleanable_dirty_ratio.is_engaged(); if (config::shard_local_cfg().development_enable_cloud_topics()) { return overrides @@ -268,6 +271,7 @@ adl::from(iobuf_parser& parser) { std::nullopt, std::nullopt, std::nullopt, + tristate{std::nullopt}, }; } diff --git a/src/v/cluster/topic_properties.h b/src/v/cluster/topic_properties.h index 37102f3438d6..7a43b2a65a0b 100644 --- a/src/v/cluster/topic_properties.h +++ b/src/v/cluster/topic_properties.h @@ -33,7 +33,7 @@ namespace cluster { */ struct topic_properties : serde:: - envelope, serde::compat_version<0>> { + envelope, serde::compat_version<0>> { topic_properties() noexcept = default; topic_properties( std::optional compression, @@ -79,7 +79,8 @@ struct topic_properties std::optional iceberg_delete, std::optional iceberg_partition_spec, std::optional - iceberg_invalid_record_action) + iceberg_invalid_record_action, + tristate min_cleanable_dirty_ratio) : compression(compression) , cleanup_policy_bitflags(cleanup_policy_bitflags) , compaction_strategy(compaction_strategy) @@ -123,7 +124,8 @@ struct topic_properties , delete_retention_ms(delete_retention_ms) , iceberg_delete(iceberg_delete) , iceberg_partition_spec(std::move(iceberg_partition_spec)) - , iceberg_invalid_record_action(iceberg_invalid_record_action) {} + , iceberg_invalid_record_action(iceberg_invalid_record_action) + , min_cleanable_dirty_ratio(min_cleanable_dirty_ratio) {} std::optional compression; std::optional cleanup_policy_bitflags; @@ -206,6 +208,8 @@ struct topic_properties std::optional iceberg_invalid_record_action; + tristate min_cleanable_dirty_ratio{std::nullopt}; + bool is_compacted() const; bool has_overrides() const; bool requires_remote_erase() const; @@ -254,7 +258,8 @@ struct topic_properties delete_retention_ms, iceberg_delete, iceberg_partition_spec, - iceberg_invalid_record_action); + iceberg_invalid_record_action, + min_cleanable_dirty_ratio); } friend bool operator==(const topic_properties&, const topic_properties&) diff --git a/src/v/compat/cluster_compat.h b/src/v/compat/cluster_compat.h index af1732d57eb0..3aade398e442 100644 --- a/src/v/compat/cluster_compat.h +++ b/src/v/compat/cluster_compat.h @@ -362,6 +362,7 @@ struct compat_check { wr, "iceberg_invalid_record_action", obj.iceberg_invalid_record_action); + json_write(min_cleanable_dirty_ratio); } static cluster::topic_properties from_json(json::Value& rd) { @@ -399,6 +400,7 @@ struct compat_check { json_read(flush_bytes); json_read(remote_topic_namespace_override); json_read(iceberg_invalid_record_action); + json_read(min_cleanable_dirty_ratio); return obj; } @@ -431,6 +433,7 @@ struct compat_check { obj.flush_ms = std::nullopt; obj.remote_topic_namespace_override = std::nullopt; obj.iceberg_invalid_record_action = std::nullopt; + obj.min_cleanable_dirty_ratio = tristate{std::nullopt}; if (reply != obj) { throw compat_error(fmt::format( @@ -517,6 +520,8 @@ struct compat_check { obj.properties.mpx_virtual_cluster_id = std::nullopt; obj.properties.iceberg_invalid_record_action = std::nullopt; + obj.properties.min_cleanable_dirty_ratio = tristate{ + std::nullopt}; // ADL will always squash is_migrated to false obj.is_migrated = false; diff --git a/src/v/compat/cluster_generator.h b/src/v/compat/cluster_generator.h index f6d747cfb846..634eca5a56cf 100644 --- a/src/v/compat/cluster_generator.h +++ b/src/v/compat/cluster_generator.h @@ -662,7 +662,9 @@ struct instance_generator { return random_generators::random_choice( {model::iceberg_invalid_record_action::drop, model::iceberg_invalid_record_action::dlq_table}); - })}; + }), + tristate{disable_tristate}, + }; } static std::vector limits() { return {}; } diff --git a/src/v/compat/cluster_json.h b/src/v/compat/cluster_json.h index c00dcdf0c46a..9f0dd16aee8e 100644 --- a/src/v/compat/cluster_json.h +++ b/src/v/compat/cluster_json.h @@ -630,6 +630,7 @@ inline void rjson_serialize( write_member(w, "delete_retention_ms", tps.delete_retention_ms); write_exceptional_member_type( w, "iceberg_invalid_record_action", tps.iceberg_invalid_record_action); + write_member(w, "min_cleanable_dirty_ratio", tps.min_cleanable_dirty_ratio); w.EndObject(); } @@ -704,6 +705,7 @@ inline void read_value(const json::Value& rd, cluster::topic_properties& obj) { read_member(rd, "delete_retention_ms", obj.delete_retention_ms); read_member( rd, "iceberg_invalid_record_action", obj.iceberg_invalid_record_action); + read_member(rd, "min_cleanable_dirty_ratio", obj.min_cleanable_dirty_ratio); } inline void rjson_serialize( diff --git a/src/v/compat/json.h b/src/v/compat/json.h index dce9e0453461..6c505e644b8a 100644 --- a/src/v/compat/json.h +++ b/src/v/compat/json.h @@ -35,6 +35,10 @@ inline const char* to_str(const rapidjson::Type t) { return str[t]; } +inline void read_value(const json::Value& v, double& target) { + target = v.GetDouble(); +} + inline void read_value(const json::Value& v, int64_t& target) { target = v.GetInt64(); } diff --git a/tools/offline_log_viewer/controller.py b/tools/offline_log_viewer/controller.py index 4214d869d6a7..120a60281853 100644 --- a/tools/offline_log_viewer/controller.py +++ b/tools/offline_log_viewer/controller.py @@ -151,6 +151,10 @@ def read_topic_properties_serde(rdr: Reader, version): 'iceberg_invalid_record_action': rdr.read_optional(Reader.read_serde_enum), } + if version >= 12: + topic_properties |= { + 'min_cleanable_dirty_ratio': rdr.read_tristate(Reader.read_double), + } return topic_properties diff --git a/tools/offline_log_viewer/reader.py b/tools/offline_log_viewer/reader.py index f91141327c5c..e4138c426d7e 100644 --- a/tools/offline_log_viewer/reader.py +++ b/tools/offline_log_viewer/reader.py @@ -78,6 +78,9 @@ def read_int64(self): def read_uint64(self): return struct.unpack(self.with_endianness('Q'), self.stream.read(8))[0] + def read_double(self): + return struct.unpack(self.with_endianness('d'), self.stream.read(8))[0] + def read_serde_enum(self): return self.read_int32() From 04363badcc0fde5d69b81cd68b3071b20cba0134 Mon Sep 17 00:00:00 2001 From: Willem Kaufmann Date: Thu, 30 Jan 2025 22:24:59 -0500 Subject: [PATCH 03/12] `cluster`: add `min_cleanable_dirty_ratio` to `ntp_config` --- src/v/cluster/topic_configuration.cc | 1 + src/v/cluster/topic_properties.cc | 1 + src/v/storage/ntp_config.h | 14 ++++++++++++++ 3 files changed, 16 insertions(+) diff --git a/src/v/cluster/topic_configuration.cc b/src/v/cluster/topic_configuration.cc index 4e60e29328b1..18741fb0a5fa 100644 --- a/src/v/cluster/topic_configuration.cc +++ b/src/v/cluster/topic_configuration.cc @@ -59,6 +59,7 @@ storage::ntp_config topic_configuration::make_ntp_config( .iceberg_mode = properties.iceberg_mode, .cloud_topic_enabled = properties.cloud_topic_enabled, .tombstone_retention_ms = properties.delete_retention_ms, + .min_cleanable_dirty_ratio = properties.min_cleanable_dirty_ratio, }); } return { diff --git a/src/v/cluster/topic_properties.cc b/src/v/cluster/topic_properties.cc index fad6f9ad4d7c..b5b5cf12d546 100644 --- a/src/v/cluster/topic_properties.cc +++ b/src/v/cluster/topic_properties.cc @@ -176,6 +176,7 @@ topic_properties::get_ntp_cfg_overrides() const { ret.iceberg_mode = iceberg_mode; ret.cloud_topic_enabled = cloud_topic_enabled; ret.tombstone_retention_ms = delete_retention_ms; + ret.min_cleanable_dirty_ratio = min_cleanable_dirty_ratio; return ret; } diff --git a/src/v/storage/ntp_config.h b/src/v/storage/ntp_config.h index 47c67916f17e..6b8d18448e07 100644 --- a/src/v/storage/ntp_config.h +++ b/src/v/storage/ntp_config.h @@ -86,6 +86,8 @@ class ntp_config { // properties. tristate tombstone_retention_ms; + tristate min_cleanable_dirty_ratio; + friend std::ostream& operator<<(std::ostream&, const default_overrides&); }; @@ -365,6 +367,18 @@ class ntp_config { : default_cloud_topic_enabled; } + std::optional min_cleanable_dirty_ratio() const { + if (_overrides) { + if (_overrides->min_cleanable_dirty_ratio.is_disabled()) { + return std::nullopt; + } + if (_overrides->min_cleanable_dirty_ratio.has_optional_value()) { + return _overrides->min_cleanable_dirty_ratio.value(); + } + } + return config::shard_local_cfg().min_cleanable_dirty_ratio(); + } + ntp_config copy() const { return { _ntp, From 9f435f165b07ae55b9dc70ec81e9eddd2466833e Mon Sep 17 00:00:00 2001 From: Willem Kaufmann Date: Mon, 10 Feb 2025 22:09:13 -0500 Subject: [PATCH 04/12] `kafka`: use `conditional_t` for floating point tristate These call sites were previously hardcoded to `int64_t` for the type used in a `boost::lexical_cast`. With the advent of the first `tristate`, this is no longer a valid default. Use a conditional type in order to account for the possibility of a floating point tristate value conversion. --- src/v/kafka/server/handlers/configs/config_utils.h | 5 ++++- src/v/kafka/server/handlers/topics/types.h | 5 ++++- 2 files changed, 8 insertions(+), 2 deletions(-) diff --git a/src/v/kafka/server/handlers/configs/config_utils.h b/src/v/kafka/server/handlers/configs/config_utils.h index e620b57da35e..9b5d720a9783 100644 --- a/src/v/kafka/server/handlers/configs/config_utils.h +++ b/src/v/kafka/server/handlers/configs/config_utils.h @@ -37,6 +37,7 @@ #include #include +#include namespace kafka { template @@ -775,7 +776,9 @@ void parse_and_set_tristate( } // set property value if (op == config_resource_operation::set) { - auto parsed = boost::lexical_cast(*value); + using config_t + = std::conditional_t, T, int64_t>; + auto parsed = boost::lexical_cast(*value); if (parsed <= 0) { property.value = tristate{}; } else { diff --git a/src/v/kafka/server/handlers/topics/types.h b/src/v/kafka/server/handlers/topics/types.h index 12e9d6c3e2ac..694bb72de1fa 100644 --- a/src/v/kafka/server/handlers/topics/types.h +++ b/src/v/kafka/server/handlers/topics/types.h @@ -26,6 +26,7 @@ #include #include +#include namespace kafka { @@ -185,7 +186,9 @@ get_config_value(const config_map_t& config, std::string_view key) { template tristate get_tristate_value(const config_map_t& config, std::string_view key) { - auto v = get_config_value(config, key); + using config_t + = std::conditional_t, T, int64_t>; + auto v = get_config_value(config, key); // no value set if (!v) { return tristate(std::nullopt); From 97a6f69dc022166840bd15f3a3dfb4e01b61c3d0 Mon Sep 17 00:00:00 2001 From: Willem Kaufmann Date: Thu, 30 Jan 2025 22:40:04 -0500 Subject: [PATCH 05/12] `kafka`: add `min.cleanable.dirty.ratio` to topic config handlers --- src/v/cluster/metadata_cache.cc | 7 +++++++ src/v/cluster/metadata_cache.h | 1 + src/v/cluster/topic_table.cc | 4 ++++ src/v/cluster/types.h | 6 ++++-- src/v/kafka/server/handlers/alter_configs.cc | 10 +++++++++- .../handlers/configs/config_response_utils.cc | 12 ++++++++++++ .../server/handlers/configs/config_utils.h | 19 +++++++++++++++++++ src/v/kafka/server/handlers/create_topics.cc | 3 ++- .../handlers/incremental_alter_configs.cc | 9 ++++++++- src/v/kafka/server/handlers/topics/types.cc | 3 +++ src/v/kafka/server/handlers/topics/types.h | 4 +++- src/v/kafka/server/tests/alter_config_test.cc | 1 + tests/rptest/tests/describe_topics_test.py | 10 ++++++++++ 13 files changed, 83 insertions(+), 6 deletions(-) diff --git a/src/v/cluster/metadata_cache.cc b/src/v/cluster/metadata_cache.cc index c49ceb6d2742..2c69c7fc7537 100644 --- a/src/v/cluster/metadata_cache.cc +++ b/src/v/cluster/metadata_cache.cc @@ -324,6 +324,11 @@ metadata_cache::get_default_delete_retention_ms() const { return config::shard_local_cfg().tombstone_retention_ms(); } +std::optional +metadata_cache::get_default_min_cleanable_dirty_ratio() const { + return config::shard_local_cfg().min_cleanable_dirty_ratio(); +} + topic_properties metadata_cache::get_default_properties() const { topic_properties tp; tp.compression = {get_default_compression()}; @@ -343,6 +348,8 @@ topic_properties metadata_cache::get_default_properties() const { get_default_retention_local_target_ms()}; tp.delete_retention_ms = tristate{ get_default_delete_retention_ms()}; + tp.min_cleanable_dirty_ratio = tristate{ + get_default_min_cleanable_dirty_ratio()}; return tp; } diff --git a/src/v/cluster/metadata_cache.h b/src/v/cluster/metadata_cache.h index ed048c4c5b0b..a8a5da820705 100644 --- a/src/v/cluster/metadata_cache.h +++ b/src/v/cluster/metadata_cache.h @@ -204,6 +204,7 @@ class metadata_cache { get_default_record_value_subject_name_strategy() const; std::optional get_default_delete_retention_ms() const; + std::optional get_default_min_cleanable_dirty_ratio() const; topic_properties get_default_properties() const; std::optional diff --git a/src/v/cluster/topic_table.cc b/src/v/cluster/topic_table.cc index c82a0bfd875a..8f69d852681a 100644 --- a/src/v/cluster/topic_table.cc +++ b/src/v/cluster/topic_table.cc @@ -1107,6 +1107,10 @@ topic_properties topic_table::update_topic_properties( incremental_update( updated_properties.iceberg_invalid_record_action, overrides.iceberg_invalid_record_action); + incremental_update( + updated_properties.min_cleanable_dirty_ratio, + overrides.min_cleanable_dirty_ratio); + return updated_properties; } diff --git a/src/v/cluster/types.h b/src/v/cluster/types.h index dc0fba1f964b..bd48e9cfacb5 100644 --- a/src/v/cluster/types.h +++ b/src/v/cluster/types.h @@ -568,7 +568,7 @@ struct property_update> struct incremental_topic_updates : serde::envelope< incremental_topic_updates, - serde::version<8>, + serde::version<9>, serde::compat_version<0>> { static constexpr int8_t version_with_data_policy = -1; static constexpr int8_t version_with_shadow_indexing = -3; @@ -644,6 +644,7 @@ struct incremental_topic_updates property_update> iceberg_partition_spec; property_update> iceberg_invalid_record_action; + property_update> min_cleanable_dirty_ratio; // To allow us to better control use of the deprecated shadow_indexing // field, use getters and setters instead. @@ -685,7 +686,8 @@ struct incremental_topic_updates delete_retention_ms, iceberg_delete, iceberg_partition_spec, - iceberg_invalid_record_action); + iceberg_invalid_record_action, + min_cleanable_dirty_ratio); } friend std::ostream& diff --git a/src/v/kafka/server/handlers/alter_configs.cc b/src/v/kafka/server/handlers/alter_configs.cc index 17b8eb63ce1f..94bf01f28aa1 100644 --- a/src/v/kafka/server/handlers/alter_configs.cc +++ b/src/v/kafka/server/handlers/alter_configs.cc @@ -83,7 +83,7 @@ create_topic_properties_update( std::apply(apply_op(op_t::none), update.custom_properties.serde_fields()); static_assert( - std::tuple_size_v == 34, + std::tuple_size_v == 35, "If you added a property, please decide on it's default alter config " "policy, and handle the update in the loop below"); static_assert( @@ -385,6 +385,14 @@ create_topic_properties_update( kafka::config_resource_operation::set); continue; } + if (cfg.name == topic_property_min_cleanable_dirty_ratio) { + parse_and_set_tristate( + update.properties.min_cleanable_dirty_ratio, + cfg.value, + kafka::config_resource_operation::set, + min_cleanable_dirty_ratio_validator{}); + continue; + } } catch (const validation_error& e) { return make_error_alter_config_resource_response< alter_configs_resource_response>( diff --git a/src/v/kafka/server/handlers/configs/config_response_utils.cc b/src/v/kafka/server/handlers/configs/config_response_utils.cc index 8a463954e63c..922fe0cfa8b6 100644 --- a/src/v/kafka/server/handlers/configs/config_response_utils.cc +++ b/src/v/kafka/server/handlers/configs/config_response_utils.cc @@ -1023,6 +1023,18 @@ config_response_container_t make_topic_configs( &describe_as_string); } + add_topic_config_if_requested( + config_keys, + result, + topic_property_min_cleanable_dirty_ratio, + metadata_cache.get_default_min_cleanable_dirty_ratio(), + topic_property_min_cleanable_dirty_ratio, + topic_properties.min_cleanable_dirty_ratio, + include_synonyms, + maybe_make_documentation( + include_documentation, + config::shard_local_cfg().min_cleanable_dirty_ratio.desc())); + return result; } diff --git a/src/v/kafka/server/handlers/configs/config_utils.h b/src/v/kafka/server/handlers/configs/config_utils.h index 9b5d720a9783..2e555048d770 100644 --- a/src/v/kafka/server/handlers/configs/config_utils.h +++ b/src/v/kafka/server/handlers/configs/config_utils.h @@ -520,6 +520,25 @@ struct iceberg_partition_spec_validator { } }; +struct min_cleanable_dirty_ratio_validator { + std::optional + operator()(const ss::sstring&, const tristate& value) { + double min = 0.0; + double max = 1.0; + if (value.has_optional_value()) { + if (value.value() < min || value.value() > max) { + return fmt::format( + "min.cleanable.dirty.ratio {} is outside of allowed range " + "[{}, {}]", + value.value(), + min, + max); + } + } + return std::nullopt; + } +}; + template requires requires( model::topic_namespace_view tns, diff --git a/src/v/kafka/server/handlers/create_topics.cc b/src/v/kafka/server/handlers/create_topics.cc index 77c949ed619c..1d30e2519063 100644 --- a/src/v/kafka/server/handlers/create_topics.cc +++ b/src/v/kafka/server/handlers/create_topics.cc @@ -78,7 +78,8 @@ bool is_supported(std::string_view name) { topic_property_delete_retention_ms, topic_property_iceberg_delete, topic_property_iceberg_partition_spec, - topic_property_iceberg_invalid_record_action}); + topic_property_iceberg_invalid_record_action, + topic_property_min_cleanable_dirty_ratio}); if (std::any_of( supported_configs.begin(), diff --git a/src/v/kafka/server/handlers/incremental_alter_configs.cc b/src/v/kafka/server/handlers/incremental_alter_configs.cc index e24dc504539d..ca30ef6a4828 100644 --- a/src/v/kafka/server/handlers/incremental_alter_configs.cc +++ b/src/v/kafka/server/handlers/incremental_alter_configs.cc @@ -389,7 +389,14 @@ create_topic_properties_update( op); continue; } - + if (cfg.name == topic_property_min_cleanable_dirty_ratio) { + parse_and_set_tristate( + update.properties.min_cleanable_dirty_ratio, + cfg.value, + op, + min_cleanable_dirty_ratio_validator{}); + continue; + } } catch (const validation_error& e) { vlog( klog.debug, diff --git a/src/v/kafka/server/handlers/topics/types.cc b/src/v/kafka/server/handlers/topics/types.cc index 7ec538f566e6..fa2f06fed668 100644 --- a/src/v/kafka/server/handlers/topics/types.cc +++ b/src/v/kafka/server/handlers/topics/types.cc @@ -280,6 +280,9 @@ to_cluster_type(const creatable_topic& t) { = get_enum_value( config_entries, topic_property_iceberg_invalid_record_action); + cfg.properties.min_cleanable_dirty_ratio = get_tristate_value( + config_entries, topic_property_min_cleanable_dirty_ratio); + schema_id_validation_config_parser schema_id_validation_config_parser{ cfg.properties}; diff --git a/src/v/kafka/server/handlers/topics/types.h b/src/v/kafka/server/handlers/topics/types.h index 694bb72de1fa..494658b2916d 100644 --- a/src/v/kafka/server/handlers/topics/types.h +++ b/src/v/kafka/server/handlers/topics/types.h @@ -115,6 +115,9 @@ inline constexpr std::string_view topic_property_iceberg_partition_spec inline constexpr std::string_view topic_property_iceberg_invalid_record_action = "redpanda.iceberg.invalid.record.action"; +inline constexpr std::string_view topic_property_min_cleanable_dirty_ratio + = "min.cleanable.dirty.ratio"; + // Kafka topic properties that is not relevant for Redpanda // Or cannot be altered with kafka alter handler inline constexpr std::array allowlist_topic_noop_confs = { @@ -126,7 +129,6 @@ inline constexpr std::array allowlist_topic_noop_confs = { "segment.jitter.ms", "min.insync.replicas", "min.compaction.lag.ms", - "min.cleanable.dirty.ratio", "message.timestamp.difference.max.ms", "message.format.version", "max.compaction.lag.ms", diff --git a/src/v/kafka/server/tests/alter_config_test.cc b/src/v/kafka/server/tests/alter_config_test.cc index 88cc9a0ec3b0..743fcca78544 100644 --- a/src/v/kafka/server/tests/alter_config_test.cc +++ b/src/v/kafka/server/tests/alter_config_test.cc @@ -749,6 +749,7 @@ FIXTURE_TEST( "redpanda.iceberg.mode", "redpanda.leaders.preference", "delete.retention.ms", + "min.cleanable.dirty.ratio", }; // All properties_request diff --git a/tests/rptest/tests/describe_topics_test.py b/tests/rptest/tests/describe_topics_test.py index 7c47dd1ba8ff..cc7662b8cf20 100644 --- a/tests/rptest/tests/describe_topics_test.py +++ b/tests/rptest/tests/describe_topics_test.py @@ -313,6 +313,16 @@ def test_describe_topics_with_documentation_and_types(self): value="dlq_table", doc_string= "Action to take when an invalid record is encountered."), + "min.cleanable.dirty.ratio": + ConfigProperty( + config_type="DOUBLE", + value="-1", + doc_string= + "The minimum ratio between the number of bytes in \"dirty\" segments and " + "the total number of bytes in closed segments that must be reached " + "before a partition's log is eligible for compaction in a compact topic. " + "The topic property `min.cleanable.dirty.ratio` overrides the value of " + "`min_cleanable_dirty_ratio` at the topic level."), } tp_spec = TopicSpec() From fa6733c95e64da38e3be2045a8ad1e4a6413db9d Mon Sep 17 00:00:00 2001 From: Willem Kaufmann Date: Mon, 10 Feb 2025 22:34:00 -0500 Subject: [PATCH 06/12] `rptest`: add `min_cleanable_dirty_ratio` alter config test With some minor modifications in `kafka_cli_tools.py` and `types.py`. --- tests/rptest/clients/kafka_cli_tools.py | 12 +++-- tests/rptest/clients/types.py | 5 +- .../tests/alter_topic_configuration_test.py | 47 +++++++++++++++++++ 3 files changed, 60 insertions(+), 4 deletions(-) diff --git a/tests/rptest/clients/kafka_cli_tools.py b/tests/rptest/clients/kafka_cli_tools.py index 8b2828fd1f3b..460edd00fd50 100644 --- a/tests/rptest/clients/kafka_cli_tools.py +++ b/tests/rptest/clients/kafka_cli_tools.py @@ -247,13 +247,16 @@ def describe_topic(self, topic: str): assert configs is not None, "didn't find Configs: section" - def maybe_int(key: str, value: str): + def maybe_convert_value(key: str, value: str): if key in [ "retention_ms", "retention_bytes", 'segment_bytes', 'delete_retention_ms' ]: return int(value) - return value + elif key in ['min_cleanable_dirty_ratio']: + return float(value) + else: + return value def fix_key(key: str): return key.replace(".", "_") @@ -265,7 +268,10 @@ def fix_key(key: str): ] configs = {fix_key(kv[0].strip()): kv[1].strip() for kv in configs} - configs = {kv[0]: maybe_int(kv[0], kv[1]) for kv in configs.items()} + configs = { + kv[0]: maybe_convert_value(kv[0], kv[1]) + for kv in configs.items() + } configs["replication_factor"] = replication_factor configs["partition_count"] = partitions # The cast below is needed because a dict cannot be unpacked as keyword args diff --git a/tests/rptest/clients/types.py b/tests/rptest/clients/types.py index 28df6143314a..88d5a444e693 100644 --- a/tests/rptest/clients/types.py +++ b/tests/rptest/clients/types.py @@ -42,6 +42,7 @@ class TopicSpec: PROPERTY_ICEBERG_MODE = "redpanda.iceberg.mode" PROPERTY_DELETE_RETENTION_MS = "delete.retention.ms" PROPERTY_ICEBERG_INVALID_RECORD_ACTION = "redpanda.iceberg.invalid.record.action" + PROPERTY_MIN_CLEANABLE_DIRTY_RATIO = "min.cleanable.dirty.ratio" class CompressionTypes(str, Enum): """ @@ -124,7 +125,8 @@ def __init__( initial_retention_local_target_bytes: int | None = None, initial_retention_local_target_ms: int | None = None, virtual_cluster_id: str | None = None, - delete_retention_ms: int | None = None): + delete_retention_ms: int | None = None, + min_cleanable_dirty_ratio: float | None = None): self.name = name or f"topic-{self._random_topic_suffix()}" self.partition_count = partition_count self.replication_factor = replication_factor @@ -151,6 +153,7 @@ def __init__( self.initial_retention_local_target_ms = initial_retention_local_target_ms self.virtual_cluster_id = virtual_cluster_id self.delete_retention_ms = delete_retention_ms + self.min_cleanable_dirty_ratio = min_cleanable_dirty_ratio def __str__(self): return self.name diff --git a/tests/rptest/tests/alter_topic_configuration_test.py b/tests/rptest/tests/alter_topic_configuration_test.py index 32313aecdd6e..f819906b91bf 100644 --- a/tests/rptest/tests/alter_topic_configuration_test.py +++ b/tests/rptest/tests/alter_topic_configuration_test.py @@ -251,6 +251,53 @@ def test_shadow_indexing_config(self): assert altered_output["redpanda.remote.read"] == "false" assert altered_output["redpanda.remote.write"] == "false" + @cluster(num_nodes=3) + def test_min_cleanable_dirty_ratio_validation(self): + topic = self.topics[0].name + kafka_tools = KafkaCliTools(self.redpanda) + self.redpanda.set_cluster_config({"min_cleanable_dirty_ratio": 0.5}) + initial_spec = kafka_tools.describe_topic(topic) + + # Check that a value outside valid range is rejected + try: + self.client().alter_topic_configs( + topic, {TopicSpec.PROPERTY_MIN_CLEANABLE_DIRTY_RATIO: 1.01}) + except subprocess.CalledProcessError as e: + assert "is outside of allowed range" in e.output + + assert initial_spec.min_cleanable_dirty_ratio == kafka_tools.describe_topic( + topic + ).min_cleanable_dirty_ratio, "min.cleanable.dirty.ratio shouldn't be changed to invalid value" + + # Check that a valid value is accepted + self.client().alter_topic_configs( + topic, {TopicSpec.PROPERTY_MIN_CLEANABLE_DIRTY_RATIO: 0.3}) + + assert kafka_tools.describe_topic( + topic).min_cleanable_dirty_ratio == 0.3 + + # Check that we can disable the tristate with -1 + self.client().alter_topic_configs( + topic, {TopicSpec.PROPERTY_MIN_CLEANABLE_DIRTY_RATIO: -1}) + + assert kafka_tools.describe_topic( + topic).min_cleanable_dirty_ratio == -1 + + # Check that we can disable the tristate with any value < 0 + self.client().alter_topic_configs( + topic, {TopicSpec.PROPERTY_MIN_CLEANABLE_DIRTY_RATIO: -123.456}) + + assert kafka_tools.describe_topic( + topic).min_cleanable_dirty_ratio == -1 + + # Check that deleting the topic config resets it back to cluster default + self.client().delete_topic_config( + topic, TopicSpec.PROPERTY_MIN_CLEANABLE_DIRTY_RATIO) + + assert kafka_tools.describe_topic( + topic + ).min_cleanable_dirty_ratio == initial_spec.min_cleanable_dirty_ratio + class ShadowIndexingGlobalConfig(RedpandaTest): topics = (TopicSpec(partition_count=1, replication_factor=3), ) From e45283a9bdaa96a73f3454044ffda9fb35584d90 Mon Sep 17 00:00:00 2001 From: Willem Kaufmann Date: Fri, 31 Jan 2025 12:43:57 -0500 Subject: [PATCH 07/12] `storage`: make `dirty_ratio` a pure `virtual` function in `log` --- src/v/raft/tests/failure_injectable_log.cc | 4 ++++ src/v/raft/tests/failure_injectable_log.h | 2 ++ src/v/storage/disk_log_impl.h | 5 +---- src/v/storage/log.h | 5 +++++ 4 files changed, 12 insertions(+), 4 deletions(-) diff --git a/src/v/raft/tests/failure_injectable_log.cc b/src/v/raft/tests/failure_injectable_log.cc index d55e12afb9a2..72b9715dbe98 100644 --- a/src/v/raft/tests/failure_injectable_log.cc +++ b/src/v/raft/tests/failure_injectable_log.cc @@ -240,4 +240,8 @@ failure_injectable_log::retention_offset(storage::gc_config cfg) const { return _underlying_log->retention_offset(cfg); } +double failure_injectable_log::dirty_ratio() const { + return _underlying_log->dirty_ratio(); +} + } // namespace raft diff --git a/src/v/raft/tests/failure_injectable_log.h b/src/v/raft/tests/failure_injectable_log.h index b8d1b14fc573..b2ec00d43196 100644 --- a/src/v/raft/tests/failure_injectable_log.h +++ b/src/v/raft/tests/failure_injectable_log.h @@ -131,6 +131,8 @@ class failure_injectable_log final : public storage::log { std::optional retention_offset(storage::gc_config) const final; + double dirty_ratio() const final; + private: ss::shared_ptr _underlying_log; std::optional _append_delay_generator; diff --git a/src/v/storage/disk_log_impl.h b/src/v/storage/disk_log_impl.h index b056a5e55ee9..d9fea6f57fbf 100644 --- a/src/v/storage/disk_log_impl.h +++ b/src/v/storage/disk_log_impl.h @@ -262,10 +262,7 @@ class disk_log_impl final : public log { uint64_t closed_segment_bytes() const { return _closed_segment_bytes; } - // Returns the dirty ratio of the log. - // The dirty ratio is the ratio of bytes in closed, dirty segments to the - // total number of bytes in all closed segments in the log. - double dirty_ratio() const; + double dirty_ratio() const final; private: friend class disk_log_appender; // for multi-term appends diff --git a/src/v/storage/log.h b/src/v/storage/log.h index 1a4b7bfbd306..7dfd65a61d1c 100644 --- a/src/v/storage/log.h +++ b/src/v/storage/log.h @@ -222,6 +222,11 @@ class log { */ virtual std::optional retention_offset(gc_config) const = 0; + // Returns the dirty ratio of the log. The dirty ratio is the ratio of bytes + // in closed, dirty segments to the total number of bytes in all closed + // segments in the log. + virtual double dirty_ratio() const = 0; + private: ntp_config _config; From eda26f7dd04553404f4235c263f9004c653a6bd4 Mon Sep 17 00:00:00 2001 From: Willem Kaufmann Date: Mon, 10 Feb 2025 06:28:45 -0500 Subject: [PATCH 08/12] `storage`: add `compaction_checked` flag to `bitflags` The housekeeping loop will no longer compact every partition indiscriminately, but instead evaluate which partitions are worth compacting via a heuristic. Add a new `bitflag` to `log_housekeeping_meta::bitflags` that indicates the meta has been evaluated for compaction (but does not indicate that it has been compacted.) --- src/v/storage/log_housekeeping_meta.h | 1 + 1 file changed, 1 insertion(+) diff --git a/src/v/storage/log_housekeeping_meta.h b/src/v/storage/log_housekeeping_meta.h index df7375deefaf..9b852eaebc8c 100644 --- a/src/v/storage/log_housekeeping_meta.h +++ b/src/v/storage/log_housekeeping_meta.h @@ -20,6 +20,7 @@ struct log_housekeeping_meta { none = 0, compacted = 1U, lifetime_checked = 1U << 1U, + compaction_checked = 1U << 2U, }; explicit log_housekeeping_meta(ss::shared_ptr l) noexcept : handle(std::move(l)) {} From 427bbfa15bce338f6b48ce2fcf39fb2293053154 Mon Sep 17 00:00:00 2001 From: Willem Kaufmann Date: Mon, 10 Feb 2025 06:32:56 -0500 Subject: [PATCH 09/12] `storage`: use `compaction_checked` flag in compacting loop --- src/v/storage/log_manager.cc | 10 +++++++--- 1 file changed, 7 insertions(+), 3 deletions(-) diff --git a/src/v/storage/log_manager.cc b/src/v/storage/log_manager.cc index df7fa8888300..8c84dcca66f3 100644 --- a/src/v/storage/log_manager.cc +++ b/src/v/storage/log_manager.cc @@ -220,7 +220,9 @@ log_manager::housekeeping_scan(model::timestamp collection_threshold) { // algorithm is: mark the logs visited, rotate _logs_list, op, and loop // until empty or reaching a marked log for (auto& log_meta : _logs_list) { - log_meta.flags &= ~(bflags::compacted | bflags::lifetime_checked); + log_meta.flags &= ~( + bflags::compacted | bflags::lifetime_checked + | bflags::compaction_checked); } /* @@ -260,8 +262,9 @@ log_manager::housekeeping_scan(model::timestamp collection_threshold) { co_await compaction_map->initialize(compaction_mem_bytes); _compaction_hash_key_map = std::move(compaction_map); } - while (!_logs_list.empty() - && is_not_set(_logs_list.front().flags, bflags::compacted)) { + while ( + !_logs_list.empty() + && is_not_set(_logs_list.front().flags, bflags::compaction_checked)) { if (_abort_source.abort_requested()) { co_return; } @@ -270,6 +273,7 @@ log_manager::housekeeping_scan(model::timestamp collection_threshold) { _logs_list.shift_forward(); + current_log.flags |= bflags::compaction_checked; current_log.flags |= bflags::compacted; current_log.last_compaction = ss::lowres_clock::now(); From 43413b83a09ab91f94b07f88086f2d5bc1550523 Mon Sep 17 00:00:00 2001 From: Willem Kaufmann Date: Fri, 31 Jan 2025 12:45:07 -0500 Subject: [PATCH 10/12] `storage`: schedule compaction by `dirty_ratio` --- src/v/storage/log_manager.cc | 62 ++++++++++++++++++++++++++++++++++++ 1 file changed, 62 insertions(+) diff --git a/src/v/storage/log_manager.cc b/src/v/storage/log_manager.cc index 8c84dcca66f3..a5d1ad7058a4 100644 --- a/src/v/storage/log_manager.cc +++ b/src/v/storage/log_manager.cc @@ -236,6 +236,9 @@ log_manager::housekeeping_scan(model::timestamp collection_threshold) { * compaction is already sequential when this will be unified with * compaction, the whole task could be made concurrent */ + using compaction_heuristic_t = double; + absl::flat_hash_map + ntp_to_compaction_heuristic; while (!_logs_list.empty() && is_not_set(_logs_list.front().flags, bflags::lifetime_checked)) { if (_abort_source.abort_requested()) { @@ -250,8 +253,61 @@ log_manager::housekeeping_scan(model::timestamp collection_threshold) { // prevents the removal of the parent object. this makes awaiting // apply_segment_ms safe against removal of segments from _logs_list co_await current_log.handle->apply_segment_ms(); + + auto should_compact_log = [](ss::shared_ptr l) { + // Consider the dirty ratio. + const auto min_cleanable_dirty_ratio + = l->config().min_cleanable_dirty_ratio().value_or(0.0); + const auto dirty_ratio = l->dirty_ratio(); + if (dirty_ratio >= min_cleanable_dirty_ratio) { + return true; + } + + return false; + }; + + const auto compact_log = should_compact_log(current_log.handle); + + if (compact_log) { + // Order ntps by compaction heuristic. + // Currently, this is just the dirty ratio. + auto compute_compaction_heuristic = + [](ss::shared_ptr l) -> compaction_heuristic_t { + return l->dirty_ratio(); + }; + + auto compaction_heuristic_weight = compute_compaction_heuristic( + current_log.handle); + ntp_to_compaction_heuristic.emplace( + current_log.handle->config().ntp(), compaction_heuristic_weight); + } } + // If there are no partitions to compact, return early + if (ntp_to_compaction_heuristic.empty()) { + co_return; + } + + auto sort_by_heuristic = [&ntp_to_compaction_heuristic]( + const log_housekeeping_meta& a, + const log_housekeeping_meta& b) { + auto heuristic_it_a = ntp_to_compaction_heuristic.find( + a.handle->config().ntp()); + compaction_heuristic_t heuristic_a + = (heuristic_it_a != ntp_to_compaction_heuristic.end()) + ? heuristic_it_a->second + : 0.0; + auto heuristic_it_b = ntp_to_compaction_heuristic.find( + b.handle->config().ntp()); + compaction_heuristic_t heuristic_b + = (heuristic_it_b != ntp_to_compaction_heuristic.end()) + ? heuristic_it_b->second + : 0.0; + return heuristic_a > heuristic_b; + }; + + _logs_list.sort(sort_by_heuristic); + if ( config::shard_local_cfg().log_compaction_use_sliding_window.value() && !_compaction_hash_key_map && !_logs_list.empty() @@ -274,6 +330,12 @@ log_manager::housekeeping_scan(model::timestamp collection_threshold) { _logs_list.shift_forward(); current_log.flags |= bflags::compaction_checked; + + if (!ntp_to_compaction_heuristic.contains( + current_log.handle->config().ntp())) { + continue; + } + current_log.flags |= bflags::compacted; current_log.last_compaction = ss::lowres_clock::now(); From 31d6c06d0c870f4a7a5f8aeb6e3b5049a8646518 Mon Sep 17 00:00:00 2001 From: Willem Kaufmann Date: Sun, 9 Feb 2025 19:01:22 -0500 Subject: [PATCH 11/12] `storage`: add compaction functions to `log_manager_accessor` --- src/v/storage/tests/common.h | 9 +++++++++ 1 file changed, 9 insertions(+) diff --git a/src/v/storage/tests/common.h b/src/v/storage/tests/common.h index 6c84b1e799ba..fa15f47202e0 100644 --- a/src/v/storage/tests/common.h +++ b/src/v/storage/tests/common.h @@ -19,5 +19,14 @@ class log_manager_accessor { static batch_cache& batch_cache(storage::log_manager& m) { return m._batch_cache; } + + static ss::future<> housekeeping_scan(storage::log_manager& m) { + return m.housekeeping_scan(m.lowest_ts_to_retain()); + } + + static storage::log_manager::compaction_list_type& + logs_list(storage::log_manager& m) { + return m._logs_list; + } }; } // namespace storage::testing_details From 1af7ac0c1ab8417224a5954e74edb256e0a1d8d4 Mon Sep 17 00:00:00 2001 From: Willem Kaufmann Date: Sun, 9 Feb 2025 19:41:06 -0500 Subject: [PATCH 12/12] `storage`: add `compaction_scheduling` fixture test --- src/v/storage/tests/BUILD | 1 + src/v/storage/tests/storage_e2e_test.cc | 115 ++++++++++++++++++++++++ 2 files changed, 116 insertions(+) diff --git a/src/v/storage/tests/BUILD b/src/v/storage/tests/BUILD index 72f7cfd48168..3a630523172d 100644 --- a/src/v/storage/tests/BUILD +++ b/src/v/storage/tests/BUILD @@ -564,6 +564,7 @@ redpanda_cc_btest( "//src/v/test_utils:seastar_boost", "//src/v/utils:directory_walker", "//src/v/utils:to_string", + "//src/v/utils:tristate", "@boost//:test", "@fmt", "@seastar", diff --git a/src/v/storage/tests/storage_e2e_test.cc b/src/v/storage/tests/storage_e2e_test.cc index db1d78b0fc38..c868fb977de6 100644 --- a/src/v/storage/tests/storage_e2e_test.cc +++ b/src/v/storage/tests/storage_e2e_test.cc @@ -25,6 +25,7 @@ #include "reflection/adl.h" #include "storage/batch_cache.h" #include "storage/disk_log_impl.h" +#include "storage/log_housekeeping_meta.h" #include "storage/log_manager.h" #include "storage/log_reader.h" #include "storage/ntp_config.h" @@ -40,6 +41,7 @@ #include "test_utils/randoms.h" #include "test_utils/tmp_dir.h" #include "utils/directory_walker.h" +#include "utils/tristate.h" #include #include @@ -5378,3 +5380,116 @@ FIXTURE_TEST(dirty_ratio, storage_test_fixture) { BOOST_REQUIRE_EQUAL(disk_log->closed_segment_bytes(), 0); BOOST_REQUIRE_CLOSE(disk_log->dirty_ratio(), 0.0, tolerance); } + +FIXTURE_TEST(compaction_scheduling, storage_test_fixture) { + using log_manager_accessor = storage::testing_details::log_manager_accessor; + storage::log_manager mgr = make_log_manager(); + info("Configuration: {}", mgr.config()); + auto deferred = ss::defer([&mgr]() mutable { mgr.stop().get(); }); + std::vector> logs; + + using overrides_t = storage::ntp_config::default_overrides; + overrides_t ov; + ov.cleanup_policy_bitflags = model::cleanup_policy_bitflags::compaction; + ov.min_cleanable_dirty_ratio = tristate{0.2}; + + for (const auto& topic : {"tapioca", "cassava", "kudzu"}) { + auto ntp = model::ntp("kafka", topic, 0); + auto log + = mgr + .manage(storage::ntp_config( + ntp, mgr.config().base_dir, std::make_unique(ov))) + .get(); + logs.push_back(log); + } + + auto& meta_list = log_manager_accessor::logs_list(mgr); + + using bflags = storage::log_housekeeping_meta::bitflags; + + static constexpr auto is_set = [](bflags var, auto flag) { + return (var & flag) == flag; + }; + + // Floating point comparison tolerance + static constexpr auto tol = 1.0e-6; + + auto append_and_force_roll = [this](auto& log, int num_batches = 10) { + auto headers = append_random_batches( + log, num_batches); + log->force_roll(ss::default_priority_class()).get(); + }; + + // Attempt a housekeeping scan with no partitions to compact + log_manager_accessor::housekeeping_scan(mgr).get(); + + for (const auto& meta : meta_list) { + BOOST_REQUIRE(is_set(meta.flags, bflags::lifetime_checked)); + BOOST_REQUIRE(!is_set(meta.flags, bflags::compacted)); + } + + // Append batches and force roll with first log- it should be the only one + // compacted + append_and_force_roll(logs[0], 30); + BOOST_REQUIRE_CLOSE(logs[0]->dirty_ratio(), 1.0, tol); + + log_manager_accessor::housekeeping_scan(mgr).get(); + + for (const auto& meta : meta_list) { + bool expect_compacted = meta.handle->config().ntp() + == logs[0]->config().ntp(); + BOOST_REQUIRE(is_set(meta.flags, bflags::lifetime_checked)); + BOOST_REQUIRE(is_set(meta.flags, bflags::compaction_checked)); + BOOST_REQUIRE_EQUAL( + expect_compacted, is_set(meta.flags, bflags::compacted)); + auto batches = read_and_validate_all_batches(logs[0]); + linear_int_kv_batch_generator::validate_post_compaction( + std::move(batches)); + } + + // Append fewer batches and force roll with second log- it should be the + // only one compacted + append_and_force_roll(logs[1], 20); + BOOST_REQUIRE_CLOSE(logs[1]->dirty_ratio(), 1.0, tol); + + log_manager_accessor::housekeeping_scan(mgr).get(); + + for (const auto& meta : meta_list) { + bool expect_compacted = meta.handle->config().ntp() + == logs[1]->config().ntp(); + BOOST_REQUIRE(is_set(meta.flags, bflags::lifetime_checked)); + BOOST_REQUIRE(is_set(meta.flags, bflags::compaction_checked)); + BOOST_REQUIRE_EQUAL( + expect_compacted, is_set(meta.flags, bflags::compacted)); + auto batches = read_and_validate_all_batches(logs[1]); + linear_int_kv_batch_generator::validate_post_compaction( + std::move(batches)); + } + + // Append batches and force roll all logs- all of them will be compacted + for (auto& log : logs) { + append_and_force_roll(log, 10); + } + + BOOST_REQUIRE_GE(logs[0]->dirty_ratio(), 1.0 / 3.0); + BOOST_REQUIRE_GE(logs[1]->dirty_ratio(), 1.0 / 2.0); + BOOST_REQUIRE_CLOSE(logs[2]->dirty_ratio(), 1.0, tol); + + log_manager_accessor::housekeeping_scan(mgr).get(); + + // Logs in the meta list will be ordered w/r/t their dirty ratio + // (descending) post compaction + auto log_it = logs.rbegin(); + for (const auto& meta : meta_list) { + BOOST_REQUIRE(is_set(meta.flags, bflags::lifetime_checked)); + BOOST_REQUIRE(is_set(meta.flags, bflags::compaction_checked)); + BOOST_REQUIRE(is_set(meta.flags, bflags::compacted)); + BOOST_REQUIRE_EQUAL( + meta.handle->config().ntp(), (*log_it)->config().ntp()); + ++log_it; + } + + for (auto& log : logs) { + auto batches = read_and_validate_all_batches(log); + } +};