Skip to content

Commit

Permalink
fix(storage): Remove ambiguous configuration max_sub_compaction (#16960)
Browse files Browse the repository at this point in the history
  • Loading branch information
Li0k authored May 30, 2024
1 parent 4e64389 commit 8dfae83
Show file tree
Hide file tree
Showing 7 changed files with 23 additions and 25 deletions.
2 changes: 2 additions & 0 deletions proto/hummock.proto
Original file line number Diff line number Diff line change
Expand Up @@ -382,6 +382,8 @@ message CompactTask {
map<uint32, TableWatermarks> table_watermarks = 24;
// The table schemas that are at least as new as the one used to create `input_ssts`.
map<uint32, TableSchema> table_schemas = 25;
// Max sub compaction task numbers
uint32 max_sub_compaction = 26;
}

message LevelHandler {
Expand Down
8 changes: 0 additions & 8 deletions src/common/src/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -721,10 +721,6 @@ pub struct StorageConfig {
#[serde(default = "default::storage::min_sst_size_for_streaming_upload")]
pub min_sst_size_for_streaming_upload: u64,

/// Max sub compaction task numbers
#[serde(default = "default::storage::max_sub_compaction")]
pub max_sub_compaction: u32,

#[serde(default = "default::storage::max_concurrent_compaction_task_number")]
pub max_concurrent_compaction_task_number: u64,

Expand Down Expand Up @@ -1461,10 +1457,6 @@ pub mod default {
32 * 1024 * 1024
}

pub fn max_sub_compaction() -> u32 {
4
}

pub fn max_concurrent_compaction_task_number() -> u64 {
16
}
Expand Down
1 change: 0 additions & 1 deletion src/config/docs.md
Original file line number Diff line number Diff line change
Expand Up @@ -121,7 +121,6 @@ This page is automatically generated by `./risedev generate-example-config`
| max_prefetch_block_number | max prefetch block number | 16 |
| max_preload_io_retry_times | | 3 |
| max_preload_wait_time_mill | | 0 |
| max_sub_compaction | Max sub compaction task numbers | 4 |
| max_version_pinning_duration_sec | | 10800 |
| mem_table_spill_threshold | The spill threshold for mem table. | 4194304 |
| meta_cache_capacity_mb | DEPRECATED: This config will be deprecated in the future version, use `storage.cache.meta_cache_capacity_mb` instead. | |
Expand Down
1 change: 0 additions & 1 deletion src/config/example.toml
Original file line number Diff line number Diff line change
Expand Up @@ -136,7 +136,6 @@ compactor_memory_available_proportion = 0.8
sstable_id_remote_fetch_number = 10
min_sstable_size_mb = 32
min_sst_size_for_streaming_upload = 33554432
max_sub_compaction = 4
max_concurrent_compaction_task_number = 16
max_preload_wait_time_mill = 0
max_version_pinning_duration_sec = 10800
Expand Down
1 change: 1 addition & 0 deletions src/meta/src/hummock/manager/compaction.rs
Original file line number Diff line number Diff line change
Expand Up @@ -774,6 +774,7 @@ impl HummockManager {
target_sub_level_id: compact_task.input.target_sub_level_id,
task_type: compact_task.compaction_task_type as i32,
split_weight_by_vnode: vnode_partition_count,
max_sub_compaction: group_config.compaction_config.max_sub_compaction,
..Default::default()
};

Expand Down
32 changes: 20 additions & 12 deletions src/storage/src/hummock/compactor/compaction_utils.rs
Original file line number Diff line number Diff line change
Expand Up @@ -28,8 +28,7 @@ use risingwave_hummock_sdk::table_stats::TableStatsMap;
use risingwave_hummock_sdk::{can_concat, EpochWithGap, KeyComparator};
use risingwave_pb::hummock::compact_task::TaskType;
use risingwave_pb::hummock::{
compact_task, BloomFilterType, CompactTask, KeyRange as KeyRange_vec, LevelType, SstableInfo,
TableSchema,
compact_task, BloomFilterType, CompactTask, LevelType, PbKeyRange, SstableInfo, TableSchema,
};
use tokio::time::Instant;

Expand Down Expand Up @@ -178,15 +177,16 @@ fn generate_splits_fast(
sstable_infos: &Vec<SstableInfo>,
compaction_size: u64,
context: &CompactorContext,
) -> Vec<KeyRange_vec> {
max_sub_compaction: u32,
) -> Vec<PbKeyRange> {
let worker_num = context.compaction_executor.worker_num();
let parallel_compact_size = (context.storage_opts.parallel_compact_size_mb as u64) << 20;

let parallelism = calculate_task_parallelism_impl(
worker_num,
parallel_compact_size,
compaction_size,
context.storage_opts.max_sub_compaction,
max_sub_compaction,
);
let mut indexes = vec![];
for sst in sstable_infos {
Expand All @@ -213,13 +213,13 @@ fn generate_splits_fast(
}

let mut splits = vec![];
splits.push(KeyRange_vec::new(vec![], vec![]));
splits.push(PbKeyRange::new(vec![], vec![]));
let parallel_key_count = indexes.len() / parallelism;
let mut last_split_key_count = 0;
for key in indexes {
if last_split_key_count >= parallel_key_count {
splits.last_mut().unwrap().right.clone_from(&key);
splits.push(KeyRange_vec::new(key.clone(), vec![]));
splits.push(PbKeyRange::new(key.clone(), vec![]));
last_split_key_count = 0;
}
last_split_key_count += 1;
Expand All @@ -232,7 +232,8 @@ pub async fn generate_splits(
sstable_infos: &Vec<SstableInfo>,
compaction_size: u64,
context: &CompactorContext,
) -> HummockResult<Vec<KeyRange_vec>> {
max_sub_compaction: u32,
) -> HummockResult<Vec<PbKeyRange>> {
const MAX_FILE_COUNT: usize = 32;
let parallel_compact_size = (context.storage_opts.parallel_compact_size_mb as u64) << 20;
if compaction_size > parallel_compact_size {
Expand All @@ -241,6 +242,7 @@ pub async fn generate_splits(
sstable_infos,
compaction_size,
context,
max_sub_compaction,
));
}
let mut indexes = vec![];
Expand Down Expand Up @@ -269,13 +271,13 @@ pub async fn generate_splits(
// sort by key, as for every data block has the same size;
indexes.sort_by(|a, b| KeyComparator::compare_encoded_full_key(a.1.as_ref(), b.1.as_ref()));
let mut splits = vec![];
splits.push(KeyRange_vec::new(vec![], vec![]));
splits.push(PbKeyRange::new(vec![], vec![]));

let parallelism = calculate_task_parallelism_impl(
context.compaction_executor.worker_num(),
parallel_compact_size,
compaction_size,
context.storage_opts.max_sub_compaction,
max_sub_compaction,
);

let sub_compaction_data_size =
Expand All @@ -291,7 +293,7 @@ pub async fn generate_splits(
&& remaining_size > parallel_compact_size
{
splits.last_mut().unwrap().right.clone_from(&key);
splits.push(KeyRange_vec::new(key.clone(), vec![]));
splits.push(PbKeyRange::new(key.clone(), vec![]));
last_buffer_size = data_size;
} else {
last_buffer_size += data_size;
Expand Down Expand Up @@ -577,7 +579,13 @@ pub async fn generate_splits_for_task(
.sum::<u64>();

if !optimize_by_copy_block {
let splits = generate_splits(&sstable_infos, compaction_size, context).await?;
let splits = generate_splits(
&sstable_infos,
compaction_size,
context,
compact_task.get_max_sub_compaction(),
)
.await?;
if !splits.is_empty() {
compact_task.splits = splits;
}
Expand Down Expand Up @@ -659,7 +667,7 @@ pub fn calculate_task_parallelism(compact_task: &CompactTask, context: &Compacto
context.compaction_executor.worker_num(),
parallel_compact_size,
compaction_size,
context.storage_opts.max_sub_compaction,
compact_task.get_max_sub_compaction(),
)
}

Expand Down
3 changes: 0 additions & 3 deletions src/storage/src/opts.rs
Original file line number Diff line number Diff line change
Expand Up @@ -74,8 +74,6 @@ pub struct StorageOpts {
pub sstable_id_remote_fetch_number: u32,
/// Whether to enable streaming upload for sstable.
pub min_sst_size_for_streaming_upload: u64,
/// Max sub compaction task numbers
pub max_sub_compaction: u32,
pub max_concurrent_compaction_task_number: u64,
pub max_version_pinning_duration_sec: u64,
pub compactor_iter_max_io_retry_times: usize,
Expand Down Expand Up @@ -176,7 +174,6 @@ impl From<(&RwConfig, &SystemParamsReader, &StorageMemoryConfig)> for StorageOpt
compactor_memory_limit_mb: s.compactor_memory_limit_mb,
sstable_id_remote_fetch_number: c.storage.sstable_id_remote_fetch_number,
min_sst_size_for_streaming_upload: c.storage.min_sst_size_for_streaming_upload,
max_sub_compaction: c.storage.max_sub_compaction,
max_concurrent_compaction_task_number: c.storage.max_concurrent_compaction_task_number,
max_version_pinning_duration_sec: c.storage.max_version_pinning_duration_sec,
data_file_cache_dir: c.storage.data_file_cache.dir.clone(),
Expand Down

0 comments on commit 8dfae83

Please sign in to comment.