Skip to content

Commit

Permalink
fix: fix apply wal frames bug
Browse files Browse the repository at this point in the history
  • Loading branch information
lichuang committed Oct 6, 2024
1 parent 8a19eb1 commit 0d14a9b
Show file tree
Hide file tree
Showing 5 changed files with 18 additions and 26 deletions.
15 changes: 0 additions & 15 deletions src/database/database.rs
Original file line number Diff line number Diff line change
Expand Up @@ -708,19 +708,9 @@ impl Database {
let (mut ck1, mut ck2) = read_last_checksum(shadow_wal, self.page_size)?;
let mut offset = orig_shadow_wal_size;
let mut last_commit_size = orig_shadow_wal_size;

debug!(
"wal header salt: ({}, {})",
wal_header.salt1, wal_header.salt2
);
// Read through WAL from last position to find the page of the last
// committed transaction.
loop {
debug!(
"db {} copy frame at {}, checksum: ({}, {})",
self.config.db, offset, ck1, ck2
);

let wal_frame = WALFrame::read(&mut wal_file, self.page_size);
let wal_frame = match wal_frame {
Ok(wal_frame) => wal_frame,
Expand All @@ -735,11 +725,6 @@ impl Database {
}
};

debug!(
"db {} copy frame at {}, salt: ({}, {})",
self.config.db, offset, wal_frame.salt1, wal_frame.salt2
);

// compare wal frame salts with wal header salts, break if mismatch
if wal_frame.salt1 != wal_header.salt1 || wal_frame.salt2 != wal_header.salt2 {
debug!(
Expand Down
12 changes: 6 additions & 6 deletions src/sqlite/common.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,17 +8,17 @@ use crate::error::Result;
pub const WAL_FRAME_HEADER_SIZE: u64 = 24;
pub const WAL_HEADER_SIZE: u64 = 32;

const WAL_HEADER_CHECKSUM_OFFSET: u64 = 24;
const WAL_FRAME_HEADER_CHECKSUM_OFFSET: u64 = 16;
static WAL_HEADER_CHECKSUM_OFFSET: u64 = 24;
static WAL_FRAME_HEADER_CHECKSUM_OFFSET: u64 = 16;

pub const WAL_HEADER_BIG_ENDIAN_MAGIC: [u8; 4] = [0x37, 0x7f, 0x06, 0x83];
pub const WAL_HEADER_LITTLE_ENDIAN_MAGIC: [u8; 4] = [0x37, 0x7f, 0x06, 0x82];

// SQLite checkpoint modes.
const CHECKPOINT_MODE_PASSIVE: &str = "PASSIVE";
const CHECKPOINT_MODE_FULL: &str = "FULL";
const CHECKPOINT_MODE_RESTART: &str = "RESTART";
const CHECKPOINT_MODE_TRUNCATE: &str = "TRUNCATE";
static CHECKPOINT_MODE_PASSIVE: &str = "PASSIVE";
static CHECKPOINT_MODE_FULL: &str = "FULL";
static CHECKPOINT_MODE_RESTART: &str = "RESTART";
static CHECKPOINT_MODE_TRUNCATE: &str = "TRUNCATE";

#[derive(Clone)]
pub enum CheckpointMode {
Expand Down
2 changes: 1 addition & 1 deletion src/sync/replicate.rs
Original file line number Diff line number Diff line change
Expand Up @@ -169,7 +169,7 @@ impl Replicate {
let init_pos = reader.position();
let mut data = Vec::new();

// debug!("db {} write wal segment position {:?}", self.db, init_pos,);
debug!("db {} write wal segment position {:?}", self.db, init_pos,);

// Copy header if at offset zero.
let mut salt1 = 0;
Expand Down
13 changes: 10 additions & 3 deletions src/sync/restore.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,8 @@ use crate::storage::SnapshotInfo;
use crate::storage::StorageClient;
use crate::storage::WalSegmentInfo;

static WAL_CHECKPOINT_TRUNCATE: &str = "PRAGMA wal_checkpoint(TRUNCATE);";

struct Restore {
db: String,
config: Vec<StorageConfig>,
Expand Down Expand Up @@ -94,9 +96,7 @@ impl Restore {
"restore db {} apply wal segments: {:?}",
self.db, wal_segments
);
let connection = Connection::open(db_path)?;
let wal_file_name = format!("{}-wal", db_path);
let sql = "PRAGMA wal_checkpoint(TRUNCATE)".to_string();

for (index, offsets) in wal_segments {
let mut wal_decompressed_data = Vec::new();
Expand All @@ -113,20 +113,27 @@ impl Restore {
wal_decompressed_data.extend_from_slice(&data);
}

// prepare db wal before open db connection
let mut wal_file = OpenOptions::new()
.write(true)
.create(true)
.truncate(true)
.open(&wal_file_name)?;

wal_file.write_all(&wal_decompressed_data)?;
if let Err(e) = connection.execute_batch(&sql) {
wal_file.flush()?;

let connection = Connection::open(db_path)?;

if let Err(e) = connection.query_row(WAL_CHECKPOINT_TRUNCATE, [], |_row| Ok(())) {
error!(
"truncation checkpoint failed during restore {}:{:?}",
index, offsets
);
return Err(e.into());
}

// connection.close().unwrap();
}

Ok(())
Expand Down
2 changes: 1 addition & 1 deletion tests/integration_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -139,7 +139,7 @@ def test_restore(p, config_file, root, exp_data):
config = FsConfigGenerator()
config.generate()

test = Test(config.root, 2000)
test = Test(config.root, 12000)
test.create_table()

bin = "/Users/codedump/source/replited/target/debug/replited"
Expand Down

0 comments on commit 0d14a9b

Please sign in to comment.