From 0d14a9b55d4cb6ff02fa2fadc684efcdc3dc6f95 Mon Sep 17 00:00:00 2001 From: codedump Date: Sun, 6 Oct 2024 15:16:23 +0800 Subject: [PATCH] fix: fix apply wal frames bug --- src/database/database.rs | 15 --------------- src/sqlite/common.rs | 12 ++++++------ src/sync/replicate.rs | 2 +- src/sync/restore.rs | 13 ++++++++++--- tests/integration_test.py | 2 +- 5 files changed, 18 insertions(+), 26 deletions(-) diff --git a/src/database/database.rs b/src/database/database.rs index ff7da1a..750847b 100644 --- a/src/database/database.rs +++ b/src/database/database.rs @@ -708,19 +708,9 @@ impl Database { let (mut ck1, mut ck2) = read_last_checksum(shadow_wal, self.page_size)?; let mut offset = orig_shadow_wal_size; let mut last_commit_size = orig_shadow_wal_size; - - debug!( - "wal header salt: ({}, {})", - wal_header.salt1, wal_header.salt2 - ); // Read through WAL from last position to find the page of the last // committed transaction. loop { - debug!( - "db {} copy frame at {}, checksum: ({}, {})", - self.config.db, offset, ck1, ck2 - ); - let wal_frame = WALFrame::read(&mut wal_file, self.page_size); let wal_frame = match wal_frame { Ok(wal_frame) => wal_frame, @@ -735,11 +725,6 @@ impl Database { } }; - debug!( - "db {} copy frame at {}, salt: ({}, {})", - self.config.db, offset, wal_frame.salt1, wal_frame.salt2 - ); - // compare wal frame salts with wal header salts, break if mismatch if wal_frame.salt1 != wal_header.salt1 || wal_frame.salt2 != wal_header.salt2 { debug!( diff --git a/src/sqlite/common.rs b/src/sqlite/common.rs index 410e7bf..00ca10a 100644 --- a/src/sqlite/common.rs +++ b/src/sqlite/common.rs @@ -8,17 +8,17 @@ use crate::error::Result; pub const WAL_FRAME_HEADER_SIZE: u64 = 24; pub const WAL_HEADER_SIZE: u64 = 32; -const WAL_HEADER_CHECKSUM_OFFSET: u64 = 24; -const WAL_FRAME_HEADER_CHECKSUM_OFFSET: u64 = 16; +static WAL_HEADER_CHECKSUM_OFFSET: u64 = 24; +static WAL_FRAME_HEADER_CHECKSUM_OFFSET: u64 = 16; pub const WAL_HEADER_BIG_ENDIAN_MAGIC: [u8; 4] = [0x37, 0x7f, 0x06, 0x83]; pub const WAL_HEADER_LITTLE_ENDIAN_MAGIC: [u8; 4] = [0x37, 0x7f, 0x06, 0x82]; // SQLite checkpoint modes. -const CHECKPOINT_MODE_PASSIVE: &str = "PASSIVE"; -const CHECKPOINT_MODE_FULL: &str = "FULL"; -const CHECKPOINT_MODE_RESTART: &str = "RESTART"; -const CHECKPOINT_MODE_TRUNCATE: &str = "TRUNCATE"; +static CHECKPOINT_MODE_PASSIVE: &str = "PASSIVE"; +static CHECKPOINT_MODE_FULL: &str = "FULL"; +static CHECKPOINT_MODE_RESTART: &str = "RESTART"; +static CHECKPOINT_MODE_TRUNCATE: &str = "TRUNCATE"; #[derive(Clone)] pub enum CheckpointMode { diff --git a/src/sync/replicate.rs b/src/sync/replicate.rs index da02d47..2d1a313 100644 --- a/src/sync/replicate.rs +++ b/src/sync/replicate.rs @@ -169,7 +169,7 @@ impl Replicate { let init_pos = reader.position(); let mut data = Vec::new(); - // debug!("db {} write wal segment position {:?}", self.db, init_pos,); + debug!("db {} write wal segment position {:?}", self.db, init_pos,); // Copy header if at offset zero. let mut salt1 = 0; diff --git a/src/sync/restore.rs b/src/sync/restore.rs index ebc81ec..00c5029 100644 --- a/src/sync/restore.rs +++ b/src/sync/restore.rs @@ -20,6 +20,8 @@ use crate::storage::SnapshotInfo; use crate::storage::StorageClient; use crate::storage::WalSegmentInfo; +static WAL_CHECKPOINT_TRUNCATE: &str = "PRAGMA wal_checkpoint(TRUNCATE);"; + struct Restore { db: String, config: Vec, @@ -94,9 +96,7 @@ impl Restore { "restore db {} apply wal segments: {:?}", self.db, wal_segments ); - let connection = Connection::open(db_path)?; let wal_file_name = format!("{}-wal", db_path); - let sql = "PRAGMA wal_checkpoint(TRUNCATE)".to_string(); for (index, offsets) in wal_segments { let mut wal_decompressed_data = Vec::new(); @@ -113,6 +113,7 @@ impl Restore { wal_decompressed_data.extend_from_slice(&data); } + // prepare db wal before open db connection let mut wal_file = OpenOptions::new() .write(true) .create(true) @@ -120,13 +121,19 @@ impl Restore { .open(&wal_file_name)?; wal_file.write_all(&wal_decompressed_data)?; - if let Err(e) = connection.execute_batch(&sql) { + wal_file.flush()?; + + let connection = Connection::open(db_path)?; + + if let Err(e) = connection.query_row(WAL_CHECKPOINT_TRUNCATE, [], |_row| Ok(())) { error!( "truncation checkpoint failed during restore {}:{:?}", index, offsets ); return Err(e.into()); } + + // connection.close().unwrap(); } Ok(()) diff --git a/tests/integration_test.py b/tests/integration_test.py index 2036499..9907779 100644 --- a/tests/integration_test.py +++ b/tests/integration_test.py @@ -139,7 +139,7 @@ def test_restore(p, config_file, root, exp_data): config = FsConfigGenerator() config.generate() - test = Test(config.root, 2000) + test = Test(config.root, 12000) test.create_table() bin = "/Users/codedump/source/replited/target/debug/replited"