diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml index 555d412a..0878c40b 100644 --- a/.github/workflows/ci.yml +++ b/.github/workflows/ci.yml @@ -84,11 +84,3 @@ jobs: - name: Format run: cargo fmt --all -- --check - - semver: - name: semver - runs-on: ubuntu-latest - steps: - - uses: actions/checkout@v4 - - name: Check semver - uses: obi1kenobi/cargo-semver-checks-action@v2 diff --git a/.github/workflows/crates.yml b/.github/workflows/crates.yml index 89ad7e37..67a56501 100644 --- a/.github/workflows/crates.yml +++ b/.github/workflows/crates.yml @@ -10,6 +10,14 @@ defaults: shell: bash jobs: + semver: + name: semver + runs-on: ubuntu-latest + steps: + - uses: actions/checkout@v4 + - name: Check semver + uses: obi1kenobi/cargo-semver-checks-action@v2 + publish-crate: name: Publish crate runs-on: ubuntu-latest diff --git a/.github/workflows/release.yml b/.github/workflows/release.yml index 4fb4a51f..750e27fe 100644 --- a/.github/workflows/release.yml +++ b/.github/workflows/release.yml @@ -6,6 +6,14 @@ on: workflow_dispatch: jobs: + semver: + name: semver + runs-on: ubuntu-latest + steps: + - uses: actions/checkout@v4 + - name: Check semver + uses: obi1kenobi/cargo-semver-checks-action@v2 + release: name: Process Release runs-on: ubuntu-latest diff --git a/Cargo.lock b/Cargo.lock index 4bdf9f23..855e0159 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1212,7 +1212,7 @@ dependencies = [ [[package]] name = "surrealkv" -version = "0.5.4" +version = "0.6.0" dependencies = [ "ahash", "async-channel", @@ -1239,6 +1239,7 @@ dependencies = [ "tokio", "vart 0.7.0", "walkdir", + "wasm-bindgen-futures", ] [[package]] @@ -1367,6 +1368,18 @@ dependencies = [ "wasm-bindgen-shared", ] +[[package]] +name = "wasm-bindgen-futures" +version = "0.4.45" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "cc7ec4f8827a71586374db3e87abdb5a2bb3a15afed140221307c3ec06b1f63b" +dependencies = [ + "cfg-if", + "js-sys", + "wasm-bindgen", + "web-sys", +] + [[package]] name = "wasm-bindgen-macro" version = "0.2.95" diff --git a/Cargo.toml b/Cargo.toml index 33220237..4b31ebe0 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -1,7 +1,7 @@ [package] name = "surrealkv" publish = true -version = "0.5.4" +version = "0.6.0" edition = "2021" license = "Apache-2.0" readme = "README.md" @@ -30,6 +30,7 @@ vart = "0.7.0" revision = "0.10.0" [target.'cfg(target_arch = "wasm32")'.dependencies] getrandom = { version = "0.2.15", features = ["js"] } +wasm-bindgen-futures = "0.4.45" [dev-dependencies] tokio = { version = "1", features = ["full"] } @@ -49,7 +50,3 @@ walkdir = "2.5.0" [[bench]] name = "store_bench" harness = false - -[[bench]] -name = "load_bench" -harness = false diff --git a/src/storage/kv/store.rs b/src/storage/kv/store.rs index bbb77b1e..0721dbef 100644 --- a/src/storage/kv/store.rs +++ b/src/storage/kv/store.rs @@ -6,10 +6,6 @@ use std::vec; use async_channel::{bounded, Receiver, Sender}; use futures::{select, FutureExt}; -use tokio::{ - sync::Mutex as AsyncMutex, - task::{spawn, JoinHandle}, -}; use ahash::{HashMap, HashMapExt}; use bytes::{Bytes, BytesMut}; @@ -39,7 +35,7 @@ pub(crate) struct StoreInner { pub(crate) is_closed: AtomicBool, pub(crate) is_compacting: AtomicBool, stop_tx: Sender<()>, - task_runner_handle: Arc>>>, + done_rx: Receiver<()>, pub(crate) stats: Arc, } @@ -54,14 +50,15 @@ impl StoreInner { let (stop_tx, stop_rx) = bounded(1); let core = Arc::new(Core::new(opts, writes_tx)?); - let task_runner_handle = TaskRunner::new(core.clone(), writes_rx, stop_rx).spawn(); + let (task_runner, done_rx) = TaskRunner::new(core.clone(), writes_rx, stop_rx); + task_runner.spawn(); Ok(Self { core, stop_tx, + done_rx, is_closed: AtomicBool::new(false), is_compacting: AtomicBool::new(false), - task_runner_handle: Arc::new(AsyncMutex::new(Some(task_runner_handle))), stats: Arc::new(StorageStats::new()), }) } @@ -82,15 +79,10 @@ impl StoreInner { .await .map_err(|e| Error::SendError(format!("{}", e)))?; - // Wait for task to finish - if let Some(handle) = self.task_runner_handle.lock().await.take() { - handle.await.map_err(|e| { - Error::ReceiveError(format!( - "Error occurred while closing the kv store. JoinError: {}", - e - )) - })?; - } + // Wait for done signal + self.done_rx.recv().await.map_err(|e| { + Error::ReceiveError(format!("Error waiting for task runner to complete: {}", e)) + })?; self.core.close()?; @@ -201,36 +193,59 @@ pub(crate) struct TaskRunner { core: Arc, writes_rx: Receiver, stop_rx: Receiver<()>, + // Done channel to signal completion + done_tx: Arc>, } impl TaskRunner { - fn new(core: Arc, writes_rx: Receiver, stop_rx: Receiver<()>) -> Self { - Self { - core, - writes_rx, - stop_rx, - } - } - - fn spawn(self) -> JoinHandle<()> { - spawn(Box::pin(async move { - loop { - select! { - req = self.writes_rx.recv().fuse() => { - let task = req.unwrap(); - self.handle_task(task).await - }, - _ = self.stop_rx.recv().fuse() => { - // Consume all remaining items in writes_rx - while let Ok(task) = self.writes_rx.try_recv() { - self.handle_task(task).await; - } - drop(self); - return; - }, - } + fn new( + core: Arc, + writes_rx: Receiver, + stop_rx: Receiver<()>, + ) -> (Self, Receiver<()>) { + let (done_tx, done_rx) = bounded(1); + ( + Self { + core, + writes_rx, + stop_rx, + done_tx: Arc::new(done_tx), + }, + done_rx, + ) + } + + fn spawn(self) { + let done_tx = self.done_tx.clone(); + + #[cfg(not(target_arch = "wasm32"))] + tokio::spawn(self.run(done_tx)); + + #[cfg(target_arch = "wasm32")] + wasm_bindgen_futures::spawn_local(self.run(done_tx)); + } + + async fn run(self, done_tx: Arc>) { + loop { + select! { + req = self.writes_rx.recv().fuse() => { + match req { + Ok(task) => self.handle_task(task).await, + Err(_) => break, + } + }, + _ = self.stop_rx.recv().fuse() => { + // Consume all remaining items in writes_rx + while let Ok(task) = self.writes_rx.try_recv() { + self.handle_task(task).await; + } + break; + }, } - })) + } + + // Signal completion + let _ = done_tx.send(()).await; } async fn handle_task(&self, task: Task) { @@ -746,6 +761,7 @@ mod tests { use std::sync::Arc; use crate::storage::kv::option::Options; + use crate::storage::kv::store::Core; use crate::storage::kv::store::{Store, Task, TaskRunner}; use crate::storage::kv::transaction::Durability; use crate::storage::log::Error as LogError; @@ -930,59 +946,6 @@ mod tests { } } - #[tokio::test] - async fn stop_task_runner() { - // Create a temporary directory for testing - let temp_dir = create_temp_directory(); - - // Create store options with the test directory - let mut opts = Options::new(); - opts.dir = temp_dir.path().to_path_buf(); - - // Create a new store instance with VariableKey as the key type - let store = Store::new(opts).expect("should create store"); - - let (writes_tx, writes_rx) = bounded(100); - let (stop_tx, stop_rx) = bounded(1); - let core = &store.inner.as_ref().unwrap().core; - - let runner = TaskRunner::new(core.clone(), writes_rx, stop_rx); - let fut = runner.spawn(); - - // Send some tasks - let task_counter = Arc::new(AtomicU64::new(0)); - for i in 0..100 { - let (done_tx, done_rx) = bounded(1); - writes_tx - .send(Task { - entries: vec![], - done: Some(done_tx), - tx_id: i, - durability: Durability::default(), - }) - .await - .unwrap(); - - let task_counter = Arc::clone(&task_counter); - tokio::spawn(async move { - done_rx.recv().await.unwrap().unwrap(); - task_counter.fetch_add(1, Ordering::SeqCst); - }); - } - - // Send stop signal - stop_tx.send(()).await.unwrap(); - - // Wait for a while to let TaskRunner handle all tasks by waiting on done_rx - fut.await.expect("TaskRunner should finish"); - - // Wait for the spawned tokio thread to finish - tokio::time::sleep(tokio::time::Duration::from_millis(1)).await; - - // Check if all tasks were handled - assert_eq!(task_counter.load(Ordering::SeqCst), 100); - } - async fn concurrent_task(store: Arc) { let mut txn = store.begin().unwrap(); txn.set(b"dummy key", b"dummy value").unwrap(); @@ -1840,4 +1803,111 @@ mod tests { } } } + + #[tokio::test] + async fn stop_task_runner_with_pending_tasks() { + // Create a temporary directory for testing + let temp_dir = create_temp_directory(); + + // Create store options with the test directory + let mut opts = Options::new(); + opts.dir = temp_dir.path().to_path_buf(); + + let (writes_tx, writes_rx) = bounded(100); + let (stop_tx, stop_rx) = bounded(1); + let core = Arc::new(Core::new(opts, writes_tx.clone()).unwrap()); + + let (runner, done_rx) = TaskRunner::new(core.clone(), writes_rx, stop_rx); + runner.spawn(); + + // Create a task that will take some time to process + let (slow_done_tx, slow_done_rx) = bounded(1); + let slow_task = Task { + entries: vec![], + done: Some(slow_done_tx), + tx_id: 1, + durability: Durability::default(), + }; + + // Send the slow task + writes_tx.send(slow_task).await.unwrap(); + + // Send stop signal immediately + stop_tx.send(()).await.unwrap(); + + // Wait for TaskRunner to finish + done_rx + .recv() + .await + .expect("TaskRunner should signal completion"); + + // Verify the slow task was completed + assert!(slow_done_rx.recv().await.is_ok()); + } + + #[tokio::test] + async fn stop_task_runner_concurrent_tasks() { + // Create a temporary directory for testing + let temp_dir = create_temp_directory(); + + // Create store options with the test directory + let mut opts = Options::new(); + opts.dir = temp_dir.path().to_path_buf(); + + // Create a new store instance + let store = Store::new(opts).expect("should create store"); + + let (writes_tx, writes_rx) = bounded(1000); // Increased buffer size + let (stop_tx, stop_rx) = bounded(1); + let core = &store.inner.as_ref().unwrap().core; + + let (runner, finish_rx) = TaskRunner::new(core.clone(), writes_rx, stop_rx); + runner.spawn(); + + let task_counter = Arc::new(AtomicU64::new(0)); + let total_tasks = 1000; + + // First, send all tasks before stopping + for i in 0..total_tasks { + let (done_tx, done_rx) = bounded(1); + writes_tx + .send(Task { + entries: vec![], + done: Some(done_tx), + tx_id: i, + durability: Durability::default(), + }) + .await + .expect("should send task"); + + let task_counter = task_counter.clone(); + tokio::spawn(async move { + done_rx.recv().await.unwrap().unwrap(); + task_counter.fetch_add(1, Ordering::SeqCst); + }); + } + + // Give some time for tasks to be processed + tokio::time::sleep(tokio::time::Duration::from_millis(10)).await; + + // Send stop signal + stop_tx.send(()).await.expect("should send stop signal"); + + // Wait for TaskRunner to finish + finish_rx + .recv() + .await + .expect("TaskRunner should signal completion"); + + // Give some time for all task counters to be updated + tokio::time::sleep(tokio::time::Duration::from_millis(50)).await; + + // Check that all tasks that were sent were processed + let final_count = task_counter.load(Ordering::SeqCst); + assert_eq!( + final_count, total_tasks, + "Expected {} tasks to be processed, but got {}", + total_tasks, final_count + ); + } }