From 20fb1b474ec9f543ca10ebc67d0f03d7d6b31630 Mon Sep 17 00:00:00 2001
From: Lucio Franco
Date: Wed, 27 Nov 2024 10:45:49 -0500
Subject: [PATCH] libsql: restart sync on lower frame_no from remote

This commit adds a new error returned from `SyncContext` that indicates
the server returned a lower frame_no than our locally cached maximum
server frame number (durable_frame_num) expected. When the high-level
`Database::push` fn gets this error, it restarts the sync; the restart
picks up the frame hint the server returned and resumes pushing from
that frame_no.

Test output:

```
2024-11-27T15:45:27.284544Z DEBUG libsql::sync: no metadata info file found
2024-11-27T15:45:27.284603Z DEBUG push_one_frame{generation=1 frame_no=0}: libsql::sync: pushing frame
2024-11-27T15:45:27.285044Z DEBUG push_one_frame{generation=1 frame_no=0}: libsql::sync: frame successfully pushed durable_frame_num=0
2024-11-27T15:45:27.315258Z DEBUG libsql::sync: read sync metadata for db_path="/var/folders/xf/644ms45s1dj06pdbkn9ybs_80000gn/T/.tmpwvzlR0/test.db", metadata=MetadataJson { hash: 1306961641, version: 0, durable_frame_num: 3, generation: 1 }
2024-11-27T15:45:27.315287Z DEBUG push_one_frame{generation=1 frame_no=4}: libsql::sync: pushing frame
2024-11-27T15:45:27.315517Z DEBUG push_one_frame{generation=1 frame_no=4}: libsql::sync: server returned durable_frame_num lower than what we sent: sent=4, got=1
2024-11-27T15:45:27.315543Z DEBUG push_one_frame{generation=1 frame_no=2}: libsql::sync: pushing frame
2024-11-27T15:45:27.315649Z DEBUG push_one_frame{generation=1 frame_no=2}: libsql::sync: frame successfully pushed durable_frame_num=2
```

Closes #1838
---
 libsql/src/local/database.rs | 19 +++++++++
 libsql/src/sync.rs           | 39 +++++++++++++++++-
 libsql/src/sync/test.rs      | 79 +++++++++++++++++++++++++++++++-----
 3 files changed, 125 insertions(+), 12 deletions(-)

diff --git a/libsql/src/local/database.rs b/libsql/src/local/database.rs
index 4cfa5a4a63..b233196656 100644
--- a/libsql/src/local/database.rs
+++ b/libsql/src/local/database.rs
@@ -388,6 +388,25 @@ impl Database {
 
     #[cfg(feature = "sync")]
     /// Push WAL frames to remote.
     pub async fn push(&self) -> Result<crate::database::Replicated> {
+        use crate::sync::SyncError;
+        use crate::Error;
+
+        match self.try_push().await {
+            Ok(rep) => Ok(rep),
+            Err(Error::Sync(err)) => {
+                if let Some(SyncError::InvalidPushFrameNo(_, _)) = err.downcast_ref::<SyncError>() {
+                    tracing::debug!("got InvalidPushFrameNo, retrying push");
+                    self.try_push().await
+                } else {
+                    Err(Error::Sync(err))
+                }
+            }
+            Err(e) => Err(e),
+        }
+    }
+
+    #[cfg(feature = "sync")]
+    async fn try_push(&self) -> Result<crate::database::Replicated> {
         let mut sync_ctx = self.sync_ctx.as_ref().unwrap().lock().await;
         let conn = self.connect()?;
diff --git a/libsql/src/sync.rs b/libsql/src/sync.rs
index 7415a38882..e72ecaa723 100644
--- a/libsql/src/sync.rs
+++ b/libsql/src/sync.rs
@@ -43,6 +43,8 @@ pub enum SyncError {
     VerifyVersion(u32, u32),
     #[error("failed to verify metadata file hash: expected={0}, got={1}")]
     VerifyHash(u32, u32),
+    #[error("server returned a lower frame_no: sent={0}, got={1}")]
+    InvalidPushFrameNo(u32, u32),
 }
 
 impl SyncError {
@@ -91,7 +93,10 @@ impl SyncContext {
         };
 
         if let Err(e) = me.read_metadata().await {
-            tracing::error!("failed to read sync metadata file: {}", e);
+            tracing::error!(
+                "failed to read sync metadata file, resetting back to defaults: {}",
+                e
+            );
         }
 
         Ok(me)
@@ -115,6 +120,30 @@ impl SyncContext {
 
         let durable_frame_num = self.push_with_retry(uri, frame, self.max_retries).await?;
 
+        if durable_frame_num > frame_no {
+            tracing::error!(
+                "server returned durable_frame_num larger than what we sent: sent={}, got={}",
+                frame_no,
+                durable_frame_num
+            );
+
+            // TODO: do we want to return an error here?
+        }
+
+        if durable_frame_num < frame_no {
+            // Update our knowledge of which frame the server is at.
+            self.durable_frame_num = durable_frame_num;
+
+            tracing::debug!(
+                "server returned durable_frame_num lower than what we sent: sent={}, got={}",
+                frame_no,
+                durable_frame_num
+            );
+
+            // Return an error and expect the caller to re-call push with the updated state.
+            return Err(SyncError::InvalidPushFrameNo(frame_no, durable_frame_num).into());
+        }
+
         tracing::debug!(?durable_frame_num, "frame successfully pushed");
 
         // Update our last known max_frame_no from the server.
@@ -232,6 +261,12 @@ impl SyncContext {
             return Err(SyncError::VerifyVersion(metadata.version, METADATA_VERSION).into());
         }
 
+        tracing::debug!(
+            "read sync metadata for db_path={:?}, metadata={:?}",
+            self.db_path,
+            metadata
+        );
+
         self.durable_frame_num = metadata.durable_frame_num;
         self.generation = metadata.generation;
 
@@ -239,7 +274,7 @@ impl SyncContext {
     }
 }
 
-#[derive(serde::Serialize, serde::Deserialize)]
+#[derive(serde::Serialize, serde::Deserialize, Debug)]
 struct MetadataJson {
     hash: u32,
     version: u32,
diff --git a/libsql/src/sync/test.rs b/libsql/src/sync/test.rs
index dba8e5bbae..aec89ef3b4 100644
--- a/libsql/src/sync/test.rs
+++ b/libsql/src/sync/test.rs
@@ -4,10 +4,10 @@ use std::pin::Pin;
 use std::sync::atomic::{AtomicBool, AtomicU32, Ordering};
 use std::sync::Arc;
 use std::task::{Context, Poll};
+use std::time::Duration;
 use tempfile::tempdir;
 use tokio::io::{duplex, AsyncRead, AsyncWrite, DuplexStream};
 use tower::Service;
-use std::time::Duration;
 
 #[tokio::test]
 async fn test_sync_context_push_frame() {
@@ -30,10 +30,10 @@ async fn test_sync_context_push_frame() {
     // Push a frame and verify the response
     let durable_frame = sync_ctx.push_one_frame(frame, 1, 0).await.unwrap();
     sync_ctx.write_metadata().await.unwrap();
-    assert_eq!(durable_frame, 1); // First frame should return max_frame_no = 1
+    assert_eq!(durable_frame, 0); // First frame should return max_frame_no = 0
 
     // Verify internal state was updated
-    assert_eq!(sync_ctx.durable_frame_num(), 1);
+    assert_eq!(sync_ctx.durable_frame_num(), 0);
     assert_eq!(sync_ctx.generation(), 1);
     assert_eq!(server.frame_count(), 1);
 }
@@ -58,7 +58,7 @@ async fn test_sync_context_with_auth() {
 
     let durable_frame = sync_ctx.push_one_frame(frame, 1, 0).await.unwrap();
     sync_ctx.write_metadata().await.unwrap();
-    assert_eq!(durable_frame, 1);
+    assert_eq!(durable_frame, 0);
     assert_eq!(server.frame_count(), 1);
 }
@@ -84,8 +84,8 @@ async fn test_sync_context_multiple_frames() {
         let frame = Bytes::from(format!("frame data {}", i));
         let durable_frame = sync_ctx.push_one_frame(frame, 1, i).await.unwrap();
         sync_ctx.write_metadata().await.unwrap();
-        assert_eq!(durable_frame, i + 1);
-        assert_eq!(sync_ctx.durable_frame_num(), i + 1);
+        assert_eq!(durable_frame, i);
+        assert_eq!(sync_ctx.durable_frame_num(), i);
         assert_eq!(server.frame_count(), i + 1);
     }
 }
@@ -110,7 +110,7 @@ async fn test_sync_context_corrupted_metadata() {
     let frame = Bytes::from("test frame data");
     let durable_frame = sync_ctx.push_one_frame(frame, 1, 0).await.unwrap();
     sync_ctx.write_metadata().await.unwrap();
-    assert_eq!(durable_frame, 1);
+    assert_eq!(durable_frame, 0);
     assert_eq!(server.frame_count(), 1);
 
     // Update metadata path to use -info instead of .meta
@@ -132,11 +132,69 @@
     assert_eq!(sync_ctx.generation(), 1);
 }
 
+#[tokio::test]
+async fn test_sync_restarts_with_lower_max_frame_no() {
+    let _ = tracing_subscriber::fmt::try_init();
+
+    let server = MockServer::start();
+    let temp_dir = tempdir().unwrap();
+    let db_path = temp_dir.path().join("test.db");
+
+    // Create the initial sync context and push a frame.
+    let sync_ctx = SyncContext::new(
+        server.connector(),
+        db_path.to_str().unwrap().to_string(),
+        server.url(),
+        None,
+    )
+    .await
+    .unwrap();
+
+    let mut sync_ctx = sync_ctx;
+    let frame = Bytes::from("test frame data");
+    let durable_frame = sync_ctx.push_one_frame(frame.clone(), 1, 0).await.unwrap();
+    sync_ctx.write_metadata().await.unwrap();
+    assert_eq!(durable_frame, 0);
+    assert_eq!(server.frame_count(), 1);
+
+    // Bump the durable frame num so that on the next push we believe we
+    // are further ahead than the server actually is.
+    sync_ctx.durable_frame_num += 3;
+    sync_ctx.write_metadata().await.unwrap();
+
+    // Create a new sync context that picks up the bumped metadata.
+    let mut sync_ctx = SyncContext::new(
+        server.connector(),
+        db_path.to_str().unwrap().to_string(),
+        server.url(),
+        None,
+    )
+    .await
+    .unwrap();
+
+    // Verify that the context was set to the new fake values.
+    assert_eq!(sync_ctx.durable_frame_num(), 3);
+    assert_eq!(sync_ctx.generation(), 1);
+
+    let frame_no = sync_ctx.durable_frame_num() + 1;
+    // This push should fail because we are ahead of the server and thus
+    // should get an InvalidPushFrameNo error.
+    sync_ctx
+        .push_one_frame(frame.clone(), 1, frame_no)
+        .await
+        .unwrap_err();
+
+    let frame_no = sync_ctx.durable_frame_num() + 1;
+    // This push should now succeed: the failed push updated our view of the
+    // server's durable_frame_num, so we resume writing from there.
+    sync_ctx.push_one_frame(frame, 1, frame_no).await.unwrap();
+}
+
 #[tokio::test]
 async fn test_sync_context_retry_on_error() {
     // Pause time to control it manually
     tokio::time::pause();
-    
+
     let server = MockServer::start();
     let temp_dir = tempdir().unwrap();
     let db_path = temp_dir.path().join("test.db");
@@ -172,7 +230,7 @@ async fn test_sync_context_retry_on_error() {
     // Next attempt should succeed
     let durable_frame = sync_ctx.push_one_frame(frame, 1, 0).await.unwrap();
     sync_ctx.write_metadata().await.unwrap();
-    assert_eq!(durable_frame, 1);
+    assert_eq!(durable_frame, 0);
     assert_eq!(server.frame_count(), 1);
 }
@@ -316,8 +374,9 @@ impl MockServer {
             let current_count = frame_count.fetch_add(1, Ordering::SeqCst);
 
             if req.uri().path().contains("/sync/") {
+                // Return the max_frame_no that has been accepted.
                 let response = serde_json::json!({
-                    "max_frame_no": current_count + 1
+                    "max_frame_no": current_count
                 });
 
                 Ok::<_, hyper::Error>(
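
A note for reviewers: below is a minimal sketch (not part of the patch) of the restart contract this commit establishes. Once `push_one_frame` fails with `SyncError::InvalidPushFrameNo`, the `SyncContext` has already rewound its `durable_frame_num` to the server's value, so the caller only needs to push again. The patch's `Database::push` retries exactly once; this sketch generalizes that into a bounded loop, written as if it lived in `libsql/src/local/database.rs`. `MAX_PUSH_RESTARTS` and `push_with_restarts` are hypothetical names, not APIs added here.

```
// Illustrative sketch only, assuming the crate-internal items used in this
// patch: crate::Error, crate::Result, crate::sync::SyncError, and the new
// try_push helper on Database.
use crate::sync::SyncError;
use crate::{Error, Result};

// Hypothetical bound guarding against a server whose durable frame keeps
// moving backwards between our retries; not part of the patch.
const MAX_PUSH_RESTARTS: usize = 3;

impl Database {
    #[cfg(feature = "sync")]
    async fn push_with_restarts(&self) -> Result<crate::database::Replicated> {
        let mut restarts = 0;
        loop {
            match self.try_push().await {
                // InvalidPushFrameNo means try_push already rewound
                // durable_frame_num to the server's value; loop and resume
                // pushing from that position.
                Err(Error::Sync(err))
                    if restarts < MAX_PUSH_RESTARTS
                        && matches!(
                            err.downcast_ref::<SyncError>(),
                            Some(SyncError::InvalidPushFrameNo(_, _))
                        ) =>
                {
                    restarts += 1;
                }
                // Success, a non-sync error, or too many restarts: hand the
                // result back to the caller.
                other => return other,
            }
        }
    }
}
```

Retrying once, as the patch does, covers the single-rewind case exercised by the new test; a bounded loop like the above would also cover a server that reports successively lower frame numbers across consecutive pushes.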