diff --git a/crates/commitlog/src/commitlog.rs b/crates/commitlog/src/commitlog.rs index 18db39aeeec..b3ea4c1b907 100644 --- a/crates/commitlog/src/commitlog.rs +++ b/crates/commitlog/src/commitlog.rs @@ -267,7 +267,10 @@ impl Generic { let byte_offset = segment::Header::LEN as u64 + bytes_read; debug!("truncating segment {segment} to {offset} at {byte_offset}"); let mut file = self.repo.open_segment(segment)?; - file.ftruncate(offset, byte_offset)?; + // Note: The offset index truncates equal or greater, + // inclusive. We'd like to retain `offset` in the index, as + // the commit is also retained in the log. + file.ftruncate(offset + 1, byte_offset)?; // Some filesystems require fsync after ftruncate. file.fsync()?; break; diff --git a/crates/commitlog/src/segment.rs b/crates/commitlog/src/segment.rs index 6da062ab764..94c22334db1 100644 --- a/crates/commitlog/src/segment.rs +++ b/crates/commitlog/src/segment.rs @@ -317,7 +317,7 @@ impl FileLike for OffsetIndexWriter { Ok(()) } - fn ftruncate(&mut self, _tx_offset: u64, tx_offset: u64) -> io::Result<()> { + fn ftruncate(&mut self, tx_offset: u64, _size: u64) -> io::Result<()> { self.reset(); let _ = self.head.truncate(tx_offset); Ok(()) @@ -565,6 +565,9 @@ mod tests { use crate::{payload::ArrayDecoder, repo, Options}; use itertools::Itertools; use proptest::prelude::*; + use rand::thread_rng; + use spacetimedb_paths::server::CommitLogDir; + use tempfile::tempdir; #[test] fn header_roundtrip() { @@ -760,4 +763,46 @@ mod tests { writer.commit().unwrap(); assert_eq!(3, writer.next_tx_offset()); } + + #[test] + fn offset_index_writer_truncates_to_offset() { + use spacetimedb_paths::FromPathUnchecked as _; + + let tmp = tempdir().unwrap(); + let commitlog_dir = CommitLogDir::from_path_unchecked(tmp.path()); + let index_path = commitlog_dir.index(0); + let mut writer = OffsetIndexWriter::new( + TxOffsetIndexMut::create_index_file(&index_path, 100).unwrap(), + Options { + // Ensure we're writing every index entry. + offset_index_interval_bytes: 127.try_into().unwrap(), + offset_index_require_segment_fsync: false, + ..Default::default() + }, + ); + + for i in 1..=10 { + writer.append_after_commit(i, i * 128, 128).unwrap(); + } + // Ensure all entries have been written. + for i in 1..=10 { + assert_eq!(writer.head.key_lookup(i).unwrap(), (i, i * 128)); + } + + let mut rng = thread_rng(); + + // Truncating to any offset in the written range or larger + // retains that offset - 1, or the max offset written. + let truncate_to: TxOffset = rng.gen_range(1..=32); + let retained_key = truncate_to.saturating_sub(1).min(10); + let retained_val = retained_key * 128; + let retained = (retained_key, retained_val); + + writer.ftruncate(truncate_to, rng.gen()).unwrap(); + assert_eq!(writer.head.key_lookup(truncate_to).unwrap(), retained); + // Make sure this also holds after reopen. + drop(writer); + let index = TxOffsetIndex::open_index_file(&index_path).unwrap(); + assert_eq!(index.key_lookup(truncate_to).unwrap(), retained); + } }