Skip to content

Commit

Permalink
fix: do not pick compacting/expired files (#4955)
Browse files Browse the repository at this point in the history
  • Loading branch information
evenyag authored Nov 6, 2024
1 parent 305767e commit e373334
Show file tree
Hide file tree
Showing 3 changed files with 42 additions and 2 deletions.
18 changes: 18 additions & 0 deletions src/mito2/src/compaction/twcs.rs
Original file line number Diff line number Diff line change
Expand Up @@ -298,6 +298,9 @@ fn assign_to_windows<'a>(
let mut windows: HashMap<i64, Window> = HashMap::new();
// Iterates all files and assign to time windows according to max timestamp
for f in files {
if f.compacting() {
continue;
}
let (_, end) = f.time_range();
let time_window = end
.convert_to(TimeUnit::Second)
Expand Down Expand Up @@ -444,6 +447,21 @@ mod tests {
);
}

#[test]
fn test_assign_compacting_to_windows() {
let files = [
new_file_handle(FileId::random(), 0, 999, 0),
new_file_handle(FileId::random(), 0, 999, 0),
new_file_handle(FileId::random(), 0, 999, 0),
new_file_handle(FileId::random(), 0, 999, 0),
new_file_handle(FileId::random(), 0, 999, 0),
];
files[0].set_compacting(true);
files[2].set_compacting(true);
let windows = assign_to_windows(files.iter(), 3);
assert_eq!(3, windows.get(&0).unwrap().files.len());
}

/// (Window value, overlapping, files' time ranges in window)
type ExpectedWindowSpec = (i64, bool, Vec<(i64, i64)>);

Expand Down
22 changes: 22 additions & 0 deletions src/mito2/src/compaction/window.rs
Original file line number Diff line number Diff line change
Expand Up @@ -352,6 +352,28 @@ mod tests {
);
}

#[test]
fn test_assign_compacting_files_to_windows() {
let picker = WindowedCompactionPicker::new(Some(HOUR / 1000));
let files = vec![
(FileId::random(), 0, 2 * HOUR - 1, 0),
(FileId::random(), HOUR, HOUR * 3 - 1, 0),
];
let version = build_version(&files, Some(Duration::from_millis(3 * HOUR as u64)));
version.ssts.levels()[0]
.files()
.for_each(|f| f.set_compacting(true));
let (outputs, expired_ssts, window_seconds) = picker.pick_inner(
RegionId::new(0, 0),
&version,
Timestamp::new_millisecond(HOUR * 3),
);

assert!(expired_ssts.is_empty());
assert_eq!(HOUR / 1000, window_seconds);
assert!(outputs.is_empty());
}

#[test]
fn test_file_time_bucket_span() {
assert_eq!(
Expand Down
4 changes: 2 additions & 2 deletions src/mito2/src/test_util/memtable_util.rs
Original file line number Diff line number Diff line change
Expand Up @@ -34,8 +34,8 @@ use crate::error::Result;
use crate::memtable::key_values::KeyValue;
use crate::memtable::partition_tree::data::{timestamp_array_to_i64_slice, DataBatch, DataBuffer};
use crate::memtable::{
BoxedBatchIterator, BulkPart, IterBuilder, KeyValues, Memtable, MemtableBuilder, MemtableId,
MemtableRange, MemtableRef, MemtableStats,
BoxedBatchIterator, BulkPart, KeyValues, Memtable, MemtableBuilder, MemtableId, MemtableRange,
MemtableRef, MemtableStats,
};
use crate::row_converter::{McmpRowCodec, RowCodec, SortField};

Expand Down

0 comments on commit e373334

Please sign in to comment.