diff --git a/.github/workflows/ci.yaml b/.github/workflows/ci.yaml index 7f7ed5dc..cd8393ad 100644 --- a/.github/workflows/ci.yaml +++ b/.github/workflows/ci.yaml @@ -143,7 +143,7 @@ jobs: - uses: taiki-e/install-action@cross - name: test - run: cross test --all --target ${{ matrix.target }} -- --test-threads=4 + run: cross test --all --target ${{ matrix.target }} -- --test-threads=1 env: RUST_LOG: ${{ runner.debug && 'TRACE' || 'DEBUG' }} diff --git a/Cargo.lock b/Cargo.lock index ef286bbb..b966614a 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1658,9 +1658,8 @@ dependencies = [ [[package]] name = "iroh" -version = "0.91.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "ef05c956df0788a649d65c33fdbbb8fc4442d7716af3d67a1bd6d00a9ee56ead" +version = "0.91.1" +source = "git+https://github.com/n0-computer/iroh?branch=main#e30c788f968265bd9d181e5ca92d02eb61ef3d0d" dependencies = [ "aead", "backon", @@ -1720,9 +1719,8 @@ dependencies = [ [[package]] name = "iroh-base" -version = "0.91.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "f68b5c5e190d8965699b2fd583f301a7e6094a0b89bb4d6c5baa94761fd1b7a3" +version = "0.91.1" +source = "git+https://github.com/n0-computer/iroh?branch=main#e30c788f968265bd9d181e5ca92d02eb61ef3d0d" dependencies = [ "curve25519-dalek", "data-encoding", @@ -1883,9 +1881,8 @@ dependencies = [ [[package]] name = "iroh-relay" -version = "0.91.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "49596b5079817d0904fe4985307f532a4e23a33eb494bd680baaf2743f0c456b" +version = "0.91.1" +source = "git+https://github.com/n0-computer/iroh?branch=main#e30c788f968265bd9d181e5ca92d02eb61ef3d0d" dependencies = [ "blake3", "bytes", diff --git a/Cargo.toml b/Cargo.toml index 8b3c95cc..726aaf78 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -37,10 +37,10 @@ chrono = "0.4.39" nested_enum_utils = "0.2.1" ref-cast = "1.0.24" arrayvec = "0.7.6" -iroh = "0.91" +iroh = "0.91.1" self_cell = "1.1.0" genawaiter = { version = "0.99.1", features = ["futures03"] } -iroh-base = "0.91" +iroh-base = "0.91.1" reflink-copy = "0.1.24" irpc = { version = "0.7.0", features = ["rpc", "quinn_endpoint_setup", "spans", "stream", "derive"], default-features = false } iroh-metrics = { version = "0.35" } @@ -59,9 +59,13 @@ tracing-subscriber = { version = "0.3.19", features = ["fmt"] } tracing-test = "0.2.5" walkdir = "2.5.0" atomic_refcell = "0.1.13" -iroh = { version = "0.91", features = ["discovery-local-network"]} +iroh = { version = "0.91.1", features = ["discovery-local-network"] } [features] hide-proto-docs = [] metrics = [] default = ["hide-proto-docs"] + +[patch.crates-io] +iroh = { git = "https://github.com/n0-computer/iroh", branch = "main" } +iroh-base = { git = "https://github.com/n0-computer/iroh", branch = "main" } diff --git a/deny.toml b/deny.toml index bb2a4118..b1e890e5 100644 --- a/deny.toml +++ b/deny.toml @@ -39,3 +39,6 @@ name = "ring" [[licenses.clarify.license-files]] hash = 3171872035 path = "LICENSE" + +[sources] +allow-git = ["https://github.com/n0-computer/iroh"] diff --git a/src/api.rs b/src/api.rs index 29b25f58..a2a34a2d 100644 --- a/src/api.rs +++ b/src/api.rs @@ -30,6 +30,7 @@ pub mod downloader; pub mod proto; pub mod remote; pub mod tags; +use crate::api::proto::WaitIdleRequest; pub use crate::{store::util::Tag, util::temp_tag::TempTag}; pub(crate) type ApiClient = irpc::Client; @@ -298,6 +299,23 @@ impl Store { Ok(()) } + /// Waits for the store to become completely idle. 
+    ///
+    /// This is mostly useful for tests, where you want to check that e.g. the
+    /// store has written all data to disk.
+    ///
+    /// Note that a store is not guaranteed to ever become idle if it is being
+    /// interacted with concurrently, so this might wait forever.
+    ///
+    /// Also note that once you get the response, the store is not guaranteed to
+    /// still be idle. All this tells you is that there was a point in time
+    /// between the call and the response when the store was idle.
+    pub async fn wait_idle(&self) -> irpc::Result<()> {
+        let msg = WaitIdleRequest;
+        self.client.rpc(msg).await?;
+        Ok(())
+    }
+
     pub(crate) fn from_sender(client: ApiClient) -> Self {
         Self { client }
     }
diff --git a/src/api/proto.rs b/src/api/proto.rs
index e5d77cc3..8b3780bd 100644
--- a/src/api/proto.rs
+++ b/src/api/proto.rs
@@ -130,11 +130,16 @@ pub enum Request {
     #[rpc(tx = oneshot::Sender<api::Result<()>>)]
     SyncDb(SyncDbRequest),
     #[rpc(tx = oneshot::Sender<()>)]
+    WaitIdle(WaitIdleRequest),
+    #[rpc(tx = oneshot::Sender<()>)]
     Shutdown(ShutdownRequest),
     #[rpc(tx = oneshot::Sender<api::Result<()>>)]
     ClearProtected(ClearProtectedRequest),
 }

+#[derive(Debug, Serialize, Deserialize)]
+pub struct WaitIdleRequest;
+
 #[derive(Debug, Serialize, Deserialize)]
 pub struct SyncDbRequest;
diff --git a/src/api/remote.rs b/src/api/remote.rs
index 9b010f69..47c3eea2 100644
--- a/src/api/remote.rs
+++ b/src/api/remote.rs
@@ -1064,8 +1064,15 @@ mod tests {
     use testresult::TestResult;

     use crate::{
+        api::blobs::Blobs,
         protocol::{ChunkRangesSeq, GetRequest},
-        store::fs::{tests::INTERESTING_SIZES, FsStore},
+        store::{
+            fs::{
+                tests::{create_n0_bao, test_data, INTERESTING_SIZES},
+                FsStore,
+            },
+            mem::MemStore,
+        },
         tests::{add_test_hash_seq, add_test_hash_seq_incomplete},
         util::ChunkRangesExt,
     };
@@ -1117,6 +1124,38 @@ mod tests {
         Ok(())
     }

+    async fn test_observe_partial(blobs: &Blobs) -> TestResult<()> {
+        let sizes = INTERESTING_SIZES;
+        for size in sizes {
+            let data = test_data(size);
+            let ranges = ChunkRanges::chunk(0);
+            let (hash, bao) = create_n0_bao(&data, &ranges)?;
+            blobs.import_bao_bytes(hash, ranges.clone(), bao).await?;
+            let bitfield = blobs.observe(hash).await?;
+            if size > 1024 {
+                assert_eq!(bitfield.ranges, ranges);
+            } else {
+                assert_eq!(bitfield.ranges, ChunkRanges::all());
+            }
+        }
+        Ok(())
+    }
+
+    #[tokio::test]
+    async fn test_observe_partial_mem() -> TestResult<()> {
+        let store = MemStore::new();
+        test_observe_partial(store.blobs()).await?;
+        Ok(())
+    }
+
+    #[tokio::test]
+    async fn test_observe_partial_fs() -> TestResult<()> {
+        let td = tempfile::tempdir()?;
+        let store = FsStore::load(td.path()).await?;
+        test_observe_partial(store.blobs()).await?;
+        Ok(())
+    }
+
     #[tokio::test]
     async fn test_local_info_hash_seq() -> TestResult<()> {
         let sizes = INTERESTING_SIZES;
diff --git a/src/store/fs.rs b/src/store/fs.rs
index 159784f3..ad1cb5e0 100644
--- a/src/store/fs.rs
+++ b/src/store/fs.rs
@@ -64,22 +64,28 @@
 //! safely shut down as well. Any store refs you are holding will be inoperable
 //! after this.
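Grounded in the tests below (the tags test adds a `wait_idle` after `sync_db`, and the gc test replaces a 100 ms sleep with it), the intended test-side pattern looks roughly like this; the `settle` helper name and the `anyhow` error type are illustrative, not part of the patch:

    // Deterministically wait until already-submitted operations have hit disk.
    async fn settle(store: &crate::api::Store) -> anyhow::Result<()> {
        store.sync_db().await?;   // flush pending metadata updates
        store.wait_idle().await?; // wait until no internal task is running
        // There was a moment between call and response with zero active tasks,
        // so files written by prior operations now exist on disk.
        Ok(())
    }
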
 use std::{
-    fmt, fs,
+    fmt::{self, Debug},
+    fs,
     future::Future,
     io::Write,
     num::NonZeroU64,
     ops::Deref,
     path::{Path, PathBuf},
-    sync::Arc,
+    sync::{
+        atomic::{AtomicU64, Ordering},
+        Arc,
+    },
 };

 use bao_tree::{
+    blake3,
     io::{
         mixed::{traverse_ranges_validated, EncodedItem, ReadBytesAt},
-        sync::ReadAt,
+        outboard::PreOrderOutboard,
+        sync::{Outboard, ReadAt},
         BaoContentItem, Leaf,
     },
-    ChunkNum, ChunkRanges,
+    BaoTree, ChunkNum, ChunkRanges,
 };
 use bytes::Bytes;
 use delete_set::{BaoFilePart, ProtectHandle};
@@ -88,7 +94,6 @@ use entry_state::{DataLocation, OutboardLocation};
 use gc::run_gc;
 use import::{ImportEntry, ImportSource};
 use irpc::channel::mpsc;
-use meta::list_blobs;
 use n0_future::{future::yield_now, io};
 use nested_enum_utils::enum_conversions;
 use range_collections::range_set::RangeSetRange;
@@ -98,17 +103,24 @@ use tracing::{error, instrument, trace};
 use crate::{
     api::{
         proto::{
-            self, bitfield::is_validated, BatchMsg, BatchResponse, Bitfield, Command,
-            CreateTempTagMsg, ExportBaoMsg, ExportBaoRequest, ExportPathMsg, ExportPathRequest,
-            ExportRangesItem, ExportRangesMsg, ExportRangesRequest, HashSpecific, ImportBaoMsg,
-            ImportBaoRequest, ObserveMsg, Scope,
+            self, bitfield::is_validated, BatchMsg, BatchResponse, Bitfield, BlobStatusMsg,
+            Command, CreateTagMsg, CreateTempTagMsg, DeleteBlobsMsg, DeleteTagsMsg, ExportBaoMsg,
+            ExportBaoRequest, ExportPathMsg, ExportPathRequest, ExportRangesItem, ExportRangesMsg,
+            ExportRangesRequest, HashSpecific, ImportBaoMsg, ImportBaoRequest, ListBlobsMsg,
+            ListTagsMsg, ObserveMsg, RenameTagMsg, Scope, SetTagMsg, ShutdownMsg, SyncDbMsg,
         },
         ApiClient,
     },
     store::{
-        fs::util::entity_manager::{self, ActiveEntityState},
+        fs::{
+            bao_file::{
+                BaoFileStorage, BaoFileStorageSubscriber, CompleteStorage, DataReader,
+                OutboardReader,
+            },
+            util::entity_manager::{self, EntityState},
+        },
         util::{BaoTreeSender, FixedSize, MemOrFile, ValueOrPoisioned},
-        Hash,
+        Hash, IROH_BLOCK_SIZE,
     },
     util::{
         channel::oneshot,
@@ -151,14 +163,17 @@ fn temp_name() -> String {
     format!("{}.temp", hex::encode(new_uuid()))
 }

-#[derive(Debug)]
+#[derive(derive_more::Debug)]
 #[enum_conversions()]
 pub(crate) enum InternalCommand {
     Dump(meta::Dump),
     FinishImport(ImportEntryMsg),
     ClearScope(ClearScope),
+    Spawn(#[debug(skip)] Spawn),
 }

+type Spawn = Box<dyn FnOnce() -> n0_future::future::Boxed<()> + Send + Sync + 'static>;
+
 #[derive(Debug)]
 pub(crate) struct ClearScope {
     pub scope: Scope,
@@ -167,6 +182,7 @@ impl InternalCommand {
     pub fn parent_span(&self) -> tracing::Span {
         match self {
+            Self::Spawn(_) => tracing::Span::current(),
             Self::Dump(_) => tracing::Span::current(),
             Self::ClearScope(_) => tracing::Span::current(),
             Self::FinishImport(cmd) => cmd
@@ -186,8 +202,6 @@ struct TaskContext {
     pub db: meta::Db,
     // Handle to send internal commands
     pub internal_cmd_tx: tokio::sync::mpsc::Sender<InternalCommand>,
-    /// The file handle for the empty hash.
-    pub empty: BaoFileHandle,
     /// Handle to protect files from deletion.
     pub protect: ProtectHandle,
 }

@@ -201,31 +215,59 @@ impl TaskContext {
     }
 }

-impl entity_manager::Reset for Slot {
-    fn reset(&mut self) {
-        self.0 = Arc::new(tokio::sync::Mutex::new(None));
-    }
-}
+impl entity_manager::EntityState for HashContext {
+    type Id = Hash;

-#[derive(Debug)]
-struct EmParams;
+    type GlobalState = Arc<TaskContext>;

-impl entity_manager::Params for EmParams {
-    type EntityId = Hash;
+    fn id(&self) -> &Self::Id {
+        &self.id
+    }

-    type GlobalState = Arc<TaskContext>;
+    fn global(&self) -> &Self::GlobalState {
+        &self.global
+    }

-    type EntityState = Slot;
+    fn ref_count(&self) -> usize {
+        self.state.sender_count() + self.state.receiver_count()
+    }

-    async fn on_shutdown(
-        state: entity_manager::ActiveEntityState<Self>,
-        cause: entity_manager::ShutdownCause,
-    ) {
-        if let Some(mut handle) = state.state.0.lock().await.take() {
-            trace!("shutting down hash: {}, cause: {cause:?}", state.id);
-            handle.persist(&state);
+    fn new(id: &Self::Id, global: &Self::GlobalState) -> Self {
+        Self {
+            id: *id,
+            global: global.clone(),
+            state: BaoFileHandle::default(),
         }
     }
+
+    fn reset(&mut self, id: &Self::Id, global: &Self::GlobalState) {
+        self.id = *id;
+        self.global = global.clone();
+        // this is identical to self.state = BaoFileHandle::default(),
+        // but does not allocate a new handle.
+        self.state.send_replace(BaoFileStorage::Initial);
+    }
+
+    #[instrument(skip_all, fields(hash = %self.id.fmt_short()))]
+    async fn on_shutdown(&self, _cause: entity_manager::ShutdownCause) {
+        self.state.send_if_modified(|guard| {
+            let hash = &self.id;
+            let BaoFileStorage::Partial(fs) = guard.take() else {
+                return false;
+            };
+            let path = self.global.options.path.bitfield_path(hash);
+            trace!("writing bitfield for hash {} to {}", hash, path.display());
+            if let Err(cause) = fs.sync_all(&path) {
+                error!(
+                    "failed to write bitfield for {} at {}: {:?}",
+                    hash,
+                    path.display(),
+                    cause
+                );
+            }
+            false
+        });
+    }
 }

 #[derive(Debug)]
@@ -239,17 +281,171 @@ struct Actor {
     // Tasks for import and export operations.
     tasks: JoinSet<()>,
     // Entity manager that handles concurrency for entities.
-    handles: EntityManagerState<EmParams>,
+    handles: EntityManagerState<HashContext>,
     // temp tags
     temp_tags: TempTags,
+    // waiters for idle state.
+    idle_waiters: Vec<irpc::channel::oneshot::Sender<()>>,
     // our private tokio runtime. It has to live somewhere.
     _rt: RtWrapper,
 }

-type HashContext = ActiveEntityState<EmParams>;
+#[derive(Debug, Clone)]
+struct HashContext {
+    id: Hash,
+    global: Arc<TaskContext>,
+    state: BaoFileHandle,
+}
+
+impl SyncEntityApi for HashContext {
+    /// Load the state from the database.
+    ///
+    /// If the state is Initial, this will start the load.
+    /// If it is Loading, it will wait until loading is done.
+    /// If it is any other state, it will be a noop.
+    async fn load(&self) {
+        enum Action {
+            Load,
+            Wait,
+            None,
+        }
+        let mut action = Action::None;
+        self.state.send_if_modified(|guard| match guard.deref() {
+            BaoFileStorage::Initial => {
+                *guard = BaoFileStorage::Loading;
+                action = Action::Load;
+                true
+            }
+            BaoFileStorage::Loading => {
+                action = Action::Wait;
+                false
+            }
+            _ => false,
+        });
+        match action {
+            Action::Load => {
+                let state = if self.id == Hash::EMPTY {
+                    BaoFileStorage::Complete(CompleteStorage {
+                        data: MemOrFile::Mem(Bytes::new()),
+                        outboard: MemOrFile::empty(),
+                    })
+                } else {
+                    // we must assign a new state even in the error case, otherwise
+                    // tasks waiting for loading would stall!
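    // A note on the fallback below: `Loading` is the only state other tasks
    // block on, so the error path must still end in *some* terminal state.
    // `Poisoned` acts as that sink: replacing the watch value wakes all
    // waiters, and later reads surface the poison as an io::Error (see
    // `bitfield`/`current_size`) instead of hanging forever.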
+                    match self.global.db.get(self.id).await {
+                        Ok(state) => match BaoFileStorage::open(state, self).await {
+                            Ok(handle) => handle,
+                            Err(_) => BaoFileStorage::Poisoned,
+                        },
+                        Err(_) => BaoFileStorage::Poisoned,
+                    }
+                };
+                self.state.send_replace(state);
+            }
+            Action::Wait => {
+                // we are in state loading already, so we just need to wait for the
+                // other task to complete loading.
+                while matches!(self.state.borrow().deref(), BaoFileStorage::Loading) {
+                    self.state.0.subscribe().changed().await.ok();
+                }
+            }
+            Action::None => {}
+        }
+    }
+
+    /// Write a batch and notify the db
+    async fn write_batch(&self, batch: &[BaoContentItem], bitfield: &Bitfield) -> io::Result<()> {
+        trace!("write_batch bitfield={:?} batch={}", bitfield, batch.len());
+        let mut res = Ok(None);
+        self.state.send_if_modified(|state| {
+            let Ok((state1, update)) = state.take().write_batch(batch, bitfield, self) else {
+                res = Err(io::Error::other("write batch failed"));
+                return false;
+            };
+            res = Ok(update);
+            *state = state1;
+            true
+        });
+        if let Some(update) = res? {
+            self.global.db.update(self.id, update).await?;
+        }
+        Ok(())
+    }
+
+    /// An AsyncSliceReader for the data file.
+    ///
+    /// Caution: this is a reader for the unvalidated data file. Reading this
+    /// can produce data that does not match the hash.
+    #[allow(refining_impl_trait_internal)]
+    fn data(&self) -> DataReader {
+        DataReader(self.state.clone())
+    }
+
+    fn outboard(&self) -> io::Result<impl Outboard> {
+        let tree = BaoTree::new(self.current_size()?, IROH_BLOCK_SIZE);
+        let outboard = OutboardReader(self.state.clone());
+        Ok(PreOrderOutboard {
+            root: blake3::Hash::from(*self.id()),
+            tree,
+            data: outboard,
+        })
+    }
+
+    /// The most precise known bitfield of the entry.
+    fn bitfield(&self) -> io::Result<Bitfield> {
+        match self.state.borrow().deref() {
+            BaoFileStorage::Complete(mem) => Ok(mem.bitfield()),
+            BaoFileStorage::PartialMem(mem) => Ok(mem.bitfield().clone()),
+            BaoFileStorage::Partial(file) => Ok(file.bitfield().clone()),
+            BaoFileStorage::Poisoned => Err(io::Error::other("poisoned storage")),
+            BaoFileStorage::Initial => Err(io::Error::other("initial")),
+            BaoFileStorage::Loading => Err(io::Error::other("loading")),
+            BaoFileStorage::NonExisting => Err(io::ErrorKind::NotFound.into()),
+        }
+    }
+
+    #[instrument(skip_all, fields(hash = %cmd.hash_short()))]
+    async fn observe(&self, cmd: ObserveMsg) {
+        trace!("{cmd:?}");
+        self.load().await;
+        BaoFileStorageSubscriber::new(self.state.subscribe())
+            .forward(cmd.tx)
+            .await
+            .ok();
+    }
+
+    #[instrument(skip_all, fields(hash = %cmd.hash_short()))]
+    async fn finish_import(&self, cmd: ImportEntryMsg, mut tt: TempTag) {
+        trace!("{cmd:?}");
+        self.load().await;
+        let res = match finish_import_impl(self, cmd.inner).await {
+            Ok(()) => {
+                // for a remote call, we can't have the on_drop callback, so we have to leak the temp tag
+                // it will be cleaned up when either the process exits or scope ends
+                if cmd.tx.is_rpc() {
+                    trace!("leaking temp tag {}", tt.hash_and_format());
+                    tt.leak();
+                }
+                AddProgressItem::Done(tt)
+            }
+            Err(cause) => AddProgressItem::Error(cause),
+        };
+        cmd.tx.send(res).await.ok();
+    }
+
+    #[instrument(skip_all, fields(hash = %cmd.hash_short()))]
+    async fn export_path(&self, cmd: ExportPathMsg) {
+        trace!("{cmd:?}");
+        self.load().await;
+        let ExportPathMsg { inner, mut tx, .. } = cmd;
+        if let Err(cause) = export_path_impl(self, inner, &mut tx).await {
+            tx.send(cause.into()).await.ok();
+        }
+    }
+}

 impl HashContext {
-    pub fn db(&self) -> &meta::Db {
+    fn db(&self) -> &meta::Db {
         &self.global.db
     }

@@ -257,21 +453,18 @@ impl HashContext {
         &self.global.options
     }

-    pub async fn lock(&self) -> tokio::sync::MutexGuard<'_, Option<BaoFileHandle>> {
-        self.state.0.lock().await
-    }
-
-    pub fn protect(&self, hash: Hash, parts: impl IntoIterator<Item = BaoFilePart>) {
-        self.global.protect.protect(hash, parts);
+    pub fn protect(&self, parts: impl IntoIterator<Item = BaoFilePart>) {
+        self.global.protect.protect(self.id, parts);
     }

     /// Update the entry state in the database, and wait for completion.
-    pub async fn update_await(&self, hash: Hash, state: EntryState<Bytes>) -> io::Result<()> {
-        self.db().update_await(hash, state).await?;
+    pub async fn update_await(&self, state: EntryState<Bytes>) -> io::Result<()> {
+        self.db().update_await(self.id, state).await?;
         Ok(())
     }

-    pub async fn get_entry_state(&self, hash: Hash) -> io::Result<Option<EntryState<Bytes>>> {
+    pub async fn get_entry_state(&self) -> io::Result<Option<EntryState<Bytes>>> {
+        let hash = self.id;
         if hash == Hash::EMPTY {
             return Ok(Some(EntryState::Complete {
                 data_location: DataLocation::Inline(Bytes::new()),
@@ -282,115 +475,21 @@ impl HashContext {
     }

     /// Update the entry state in the database, and wait for completion.
-    pub async fn set(&self, hash: Hash, state: EntryState<Bytes>) -> io::Result<()> {
-        self.db().set(hash, state).await
-    }
-
-    pub async fn get(&self, hash: Hash) -> api::Result<BaoFileHandle> {
-        if hash == Hash::EMPTY {
-            return Ok(self.global.empty.clone());
-        }
-        let res = self
-            .state
-            .get_or_create(|| async {
-                let res = self.db().get(hash).await.map_err(io::Error::other)?;
-                let res = match res {
-                    Some(state) => open_bao_file(state, self).await,
-                    None => Err(io::Error::new(io::ErrorKind::NotFound, "hash not found")),
-                };
-                Ok((res?, ()))
-            })
-            .await
-            .map_err(api::Error::from);
-        let (res, _) = res?;
-        Ok(res)
-    }
-
-    pub async fn get_or_create(&self, hash: Hash) -> api::Result<BaoFileHandle> {
-        if hash == Hash::EMPTY {
-            return Ok(self.global.empty.clone());
-        }
-        let res = self
-            .state
-            .get_or_create(|| async {
-                let res = self.db().get(hash).await.map_err(io::Error::other)?;
-                let res = match res {
-                    Some(state) => open_bao_file(state, self).await,
-                    None => Ok(BaoFileHandle::new_partial_mem()),
-                };
-                Ok((res?, ()))
-            })
-            .await
-            .map_err(api::Error::from);
-        trace!("{res:?}");
-        let (res, _) = res?;
-        Ok(res)
+    pub async fn set(&self, state: EntryState<Bytes>) -> io::Result<()> {
+        self.db().set(self.id, state).await
     }
-}
-
-async fn open_bao_file(state: EntryState<Bytes>, ctx: &HashContext) -> io::Result<BaoFileHandle> {
-    let hash = &ctx.id;
-    let options = &ctx.global.options;
-    Ok(match state {
-        EntryState::Complete {
-            data_location,
-            outboard_location,
-        } => {
-            let data = match data_location {
-                DataLocation::Inline(data) => MemOrFile::Mem(data),
-                DataLocation::Owned(size) => {
-                    let path = options.path.data_path(hash);
-                    let file = fs::File::open(&path)?;
-                    MemOrFile::File(FixedSize::new(file, size))
-                }
-                DataLocation::External(paths, size) => {
-                    let Some(path) = paths.into_iter().next() else {
-                        return Err(io::Error::other("no external data path"));
-                    };
-                    let file = fs::File::open(&path)?;
-                    MemOrFile::File(FixedSize::new(file, size))
-                }
-            };
-            let outboard = match outboard_location {
-                OutboardLocation::NotNeeded => MemOrFile::empty(),
-                OutboardLocation::Inline(data) => MemOrFile::Mem(data),
-                OutboardLocation::Owned => {
-                    let path = options.path.outboard_path(hash);
-                    let file = fs::File::open(&path)?;
-                    MemOrFile::File(file)
- } - }; - BaoFileHandle::new_complete(data, outboard) - } - EntryState::Partial { .. } => BaoFileHandle::new_partial_file(ctx).await?, - }) -} - -/// An entry for each hash, containing a weak reference to a BaoFileHandle -/// wrapped in a tokio mutex so handle creation is sequential. -#[derive(Debug, Clone, Default)] -pub(crate) struct Slot(Arc>>); -impl Slot { - /// Get the handle if it exists and is still alive, otherwise load it from the database. - /// If there is nothing in the database, create a new in-memory handle. - /// - /// `make` will be called if the a live handle does not exist. - pub async fn get_or_create(&self, make: F) -> io::Result<(BaoFileHandle, T)> - where - F: FnOnce() -> Fut, - Fut: std::future::Future>, - T: Default, - { - let mut slot = self.0.lock().await; - if let Some(handle) = &*slot { - return Ok((handle.clone(), Default::default())); + /// The most precise known total size of the data file. + fn current_size(&self) -> io::Result { + match self.state.borrow().deref() { + BaoFileStorage::Complete(mem) => Ok(mem.size()), + BaoFileStorage::PartialMem(mem) => Ok(mem.current_size()), + BaoFileStorage::Partial(file) => file.current_size(), + BaoFileStorage::Poisoned => Err(io::Error::other("poisoned storage")), + BaoFileStorage::Initial => Err(io::Error::other("initial")), + BaoFileStorage::Loading => Err(io::Error::other("loading")), + BaoFileStorage::NonExisting => Err(io::ErrorKind::NotFound.into()), } - let handle = make().await; - if let Ok((handle, _)) = &handle { - *slot = Some(handle.clone()); - } - handle } } @@ -434,6 +533,16 @@ impl Actor { trace!("{cmd:?}"); self.db().send(cmd.into()).await.ok(); } + Command::WaitIdle(cmd) => { + trace!("{cmd:?}"); + if self.tasks.is_empty() { + // we are currently idle + cmd.tx.send(()).await.ok(); + } else { + // wait for idle state + self.idle_waiters.push(cmd.tx); + } + } Command::Shutdown(cmd) => { trace!("{cmd:?}"); self.db().send(cmd.into()).await.ok(); @@ -472,9 +581,7 @@ impl Actor { } Command::ListBlobs(cmd) => { trace!("{cmd:?}"); - if let Ok(snapshot) = self.db().snapshot(cmd.span.clone()).await { - self.spawn(list_blobs(snapshot, cmd)); - } + self.db().send(cmd.into()).await.ok(); } Command::Batch(cmd) => { trace!("{cmd:?}"); @@ -555,6 +662,9 @@ impl Actor { (tt, cmd).spawn(&mut self.handles, &mut self.tasks).await; } } + InternalCommand::Spawn(spawn) => { + self.spawn(spawn()); + } } } @@ -577,6 +687,11 @@ impl Actor { } Some(res) = self.tasks.join_next(), if !self.tasks.is_empty() => { Self::log_task_result(res); + if self.tasks.is_empty() { + for tx in self.idle_waiters.drain(..) { + tx.send(()).await.ok(); + } + } } } } @@ -616,7 +731,6 @@ impl Actor { options: options.clone(), db: meta::Db::new(db_send), internal_cmd_tx: fs_commands_tx, - empty: BaoFileHandle::new_complete(MemOrFile::empty(), MemOrFile::empty()), protect, }); rt.spawn(db_actor.run()); @@ -627,42 +741,45 @@ impl Actor { tasks: JoinSet::new(), handles: EntityManagerState::new(slot_context, 1024, 32, 32, 2), temp_tags: Default::default(), + idle_waiters: Vec::new(), _rt: rt, }) } } trait HashSpecificCommand: HashSpecific + Send + 'static { + /// Handle the command on success by spawning a task into the per-hash context. fn handle(self, ctx: HashContext) -> impl Future + Send + 'static; - fn on_error(self) -> impl Future + Send + 'static; + /// Opportunity to send an error if spawning fails due to the task being busy (inbox full) + /// or dead (e.g. panic in one of the running tasks). 
+ fn on_error(self, arg: SpawnArg) -> impl Future + Send + 'static; async fn spawn( self, - manager: &mut entity_manager::EntityManagerState, + manager: &mut entity_manager::EntityManagerState, tasks: &mut JoinSet<()>, ) where Self: Sized, { + let span = tracing::Span::current(); let task = manager - .spawn_boxed( - self.hash(), - Box::new(|x| { - Box::pin(async move { - match x { - SpawnArg::Active(state) => { - self.handle(state).await; - } - SpawnArg::Busy => { - self.on_error().await; - } - SpawnArg::Dead => { - self.on_error().await; - } + .spawn(self.hash(), |arg| { + async move { + match arg { + SpawnArg::Active(state) => { + self.handle(state).await; } - }) - }), - ) + SpawnArg::Busy => { + self.on_error(arg).await; + } + SpawnArg::Dead => { + self.on_error(arg).await; + } + } + } + .instrument(span) + }) .await; if let Some(task) = task { tasks.spawn(task); @@ -672,33 +789,70 @@ trait HashSpecificCommand: HashSpecific + Send + 'static { impl HashSpecificCommand for ObserveMsg { async fn handle(self, ctx: HashContext) { - observe(self, ctx).await + EntityApi::observe(&ctx, self).await } - async fn on_error(self) {} + async fn on_error(self, _arg: SpawnArg) {} } impl HashSpecificCommand for ExportPathMsg { async fn handle(self, ctx: HashContext) { - export_path(self, ctx).await + EntityApi::export_path(&ctx, self).await + } + async fn on_error(self, arg: SpawnArg) { + let err = match arg { + SpawnArg::Busy => io::ErrorKind::ResourceBusy.into(), + SpawnArg::Dead => io::Error::other("entity is dead"), + _ => unreachable!(), + }; + self.tx + .send(ExportProgressItem::Error(api::Error::Io(err))) + .await + .ok(); } - async fn on_error(self) {} } impl HashSpecificCommand for ExportBaoMsg { async fn handle(self, ctx: HashContext) { - export_bao(self, ctx).await + ctx.export_bao(self).await + } + async fn on_error(self, arg: SpawnArg) { + let err = match arg { + SpawnArg::Busy => io::ErrorKind::ResourceBusy.into(), + SpawnArg::Dead => io::Error::other("entity is dead"), + _ => unreachable!(), + }; + self.tx + .send(EncodedItem::Error(bao_tree::io::EncodeError::Io(err))) + .await + .ok(); } - async fn on_error(self) {} } impl HashSpecificCommand for ExportRangesMsg { async fn handle(self, ctx: HashContext) { - export_ranges(self, ctx).await + ctx.export_ranges(self).await + } + async fn on_error(self, arg: SpawnArg) { + let err = match arg { + SpawnArg::Busy => io::ErrorKind::ResourceBusy.into(), + SpawnArg::Dead => io::Error::other("entity is dead"), + _ => unreachable!(), + }; + self.tx + .send(ExportRangesItem::Error(api::Error::Io(err))) + .await + .ok(); } - async fn on_error(self) {} } impl HashSpecificCommand for ImportBaoMsg { async fn handle(self, ctx: HashContext) { - import_bao(self, ctx).await + ctx.import_bao(self).await + } + async fn on_error(self, arg: SpawnArg) { + let err = match arg { + SpawnArg::Busy => io::ErrorKind::ResourceBusy.into(), + SpawnArg::Dead => io::Error::other("entity is dead"), + _ => unreachable!(), + }; + self.tx.send(Err(api::Error::Io(err))).await.ok(); } - async fn on_error(self) {} } impl HashSpecific for (TempTag, ImportEntryMsg) { fn hash(&self) -> Hash { @@ -708,9 +862,16 @@ impl HashSpecific for (TempTag, ImportEntryMsg) { impl HashSpecificCommand for (TempTag, ImportEntryMsg) { async fn handle(self, ctx: HashContext) { let (tt, cmd) = self; - finish_import(cmd, tt, ctx).await + EntityApi::finish_import(&ctx, cmd, tt).await + } + async fn on_error(self, arg: SpawnArg) { + let err = match arg { + SpawnArg::Busy => 
            SpawnArg::Busy => io::ErrorKind::ResourceBusy.into(),
+            SpawnArg::Dead => io::Error::other("entity is dead"),
+            _ => unreachable!(),
+        };
+        self.1.tx.send(AddProgressItem::Error(err)).await.ok();
+    }
-    async fn on_error(self) {}
 }

 struct RtWrapper(Option<tokio::runtime::Runtime>);
@@ -767,24 +928,131 @@ async fn handle_batch_impl(cmd: BatchMsg, id: Scope, scope: &Arc<TaskContext>)
     Ok(())
 }

-#[instrument(skip_all, fields(hash = %cmd.hash_short()))]
-async fn finish_import(cmd: ImportEntryMsg, mut tt: TempTag, ctx: HashContext) {
-    let res = match finish_import_impl(cmd.inner, ctx).await {
-        Ok(()) => {
-            // for a remote call, we can't have the on_drop callback, so we have to leak the temp tag
-            // it will be cleaned up when either the process exits or scope ends
-            if cmd.tx.is_rpc() {
-                trace!("leaking temp tag {}", tt.hash_and_format());
-                tt.leak();
-            }
-            AddProgressItem::Done(tt)
+/// The minimal API you need to implement for an entity for a store to work.
+trait EntityApi {
+    /// Import from a stream of n0 bao encoded data.
+    async fn import_bao(&self, cmd: ImportBaoMsg);
+    /// Finish an import from a local file or memory.
+    async fn finish_import(&self, cmd: ImportEntryMsg, tt: TempTag);
+    /// Observe the bitfield of the entry.
+    async fn observe(&self, cmd: ObserveMsg);
+    /// Export byte ranges of the entry as data
+    async fn export_ranges(&self, cmd: ExportRangesMsg);
+    /// Export chunk ranges of the entry as a n0 bao encoded stream.
+    async fn export_bao(&self, cmd: ExportBaoMsg);
+    /// Export the entry to a local file.
+    async fn export_path(&self, cmd: ExportPathMsg);
+}
+
+#[derive(Debug)]
+#[enum_conversions()]
+pub enum GlobalCmd {
+    // tag related commands
+    ListTags(ListTagsMsg),
+    CreateTag(CreateTagMsg),
+    SetTag(SetTagMsg),
+    DeleteTags(DeleteTagsMsg),
+    RenameTag(RenameTagMsg),
+
+    // blob related commands
+    DeleteBlobs(DeleteBlobsMsg),
+    ListBlobs(ListBlobsMsg),
+    BlobStatus(BlobStatusMsg),
+
+    // global commands
+    SyncDb(SyncDbMsg),
+    Shutdown(ShutdownMsg),
+}
+
+/// A more opinionated API that can be used as a helper to save implementation
+/// effort when implementing the EntityApi trait.
+trait SyncEntityApi: EntityState {
+    /// Load the entry state from the database. This must make sure that it is
+    /// not run concurrently, so if load is called multiple times, all but one
+    /// must wait. You can use a tokio::sync::OnceCell or similar to achieve this.
+    async fn load(&self);
+
+    /// Get a synchronous reader for the data file.
+    fn data(&self) -> impl ReadBytesAt;
+
+    /// The outboard for the file.
+    fn outboard(&self) -> io::Result<impl Outboard>;
+
+    /// Get the bitfield of the entry.
+    fn bitfield(&self) -> io::Result<Bitfield>;
+
+    /// Write a batch of content items to the entry.
+    async fn write_batch(&self, batch: &[BaoContentItem], bitfield: &Bitfield) -> io::Result<()>;
+
+    /// Observe the entry for changes.
+    async fn observe(&self, cmd: ObserveMsg);
+
+    async fn export_path(&self, cmd: ExportPathMsg);
+    async fn finish_import(&self, cmd: ImportEntryMsg, tt: TempTag);
+}
+
+/// The high level entry point per entry.
+impl<T: SyncEntityApi> EntityApi for T {
+    #[instrument(skip_all, fields(hash = %cmd.hash_short()))]
+    fn import_bao(&self, cmd: ImportBaoMsg) -> impl Future<Output = ()> {
+        async move {
+            trace!("{cmd:?}");
+            self.load().await;
+            let ImportBaoMsg {
+                inner: ImportBaoRequest { size, .. },
+                rx,
+                tx,
+                ..
+ } = cmd; + let res = import_bao_impl(self, size, rx).await; + trace!("{res:?}"); + tx.send(res).await.ok(); + } + } + + #[instrument(skip_all, fields(hash = %cmd.hash_short()))] + async fn export_ranges(&self, mut cmd: ExportRangesMsg) { + trace!("{cmd:?}"); + self.load().await; + if let Err(cause) = export_ranges_impl(self, cmd.inner, &mut cmd.tx).await { + cmd.tx + .send(ExportRangesItem::Error(cause.into())) + .await + .ok(); } - Err(cause) => AddProgressItem::Error(cause), - }; - cmd.tx.send(res).await.ok(); + } + + #[instrument(skip_all, fields(hash = %cmd.hash_short()))] + async fn export_bao(&self, mut cmd: ExportBaoMsg) { + trace!("{cmd:?}"); + self.load().await; + if let Err(cause) = export_bao_impl(self, cmd.inner, &mut cmd.tx).await { + // if the entry is in state NonExisting, this will be an io error with + // kind NotFound. So we must not wrap this somehow but pass it on directly. + cmd.tx + .send(bao_tree::io::EncodeError::Io(cause).into()) + .await + .ok(); + } + } + + async fn observe(&self, cmd: ObserveMsg) { + SyncEntityApi::observe(self, cmd).await + } + + async fn finish_import(&self, cmd: ImportEntryMsg, tt: TempTag) { + SyncEntityApi::finish_import(self, cmd, tt).await; + } + + async fn export_path(&self, cmd: ExportPathMsg) { + SyncEntityApi::export_path(self, cmd).await + } } -async fn finish_import_impl(import_data: ImportEntry, ctx: HashContext) -> io::Result<()> { +async fn finish_import_impl(ctx: &HashContext, import_data: ImportEntry) -> io::Result<()> { + if ctx.id() == &Hash::EMPTY { + return Ok(()); // nothing to do for the empty hash + } let ImportEntry { source, hash, @@ -803,14 +1071,14 @@ async fn finish_import_impl(import_data: ImportEntry, ctx: HashContext) -> io::R debug_assert!(!options.is_inlined_data(*size)); } } - let guard = ctx.lock().await; - let handle = guard.as_ref().map(|x| x.clone()); + ctx.load().await; + let handle = &ctx.state; // if I do have an existing handle, I have to possibly deal with observers. // if I don't have an existing handle, there are 2 cases: // the entry exists in the db, but we don't have a handle // the entry does not exist at all. 
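    // (With this refactor the three cases collapse: `load()` above has already
    // resolved the watch cell to Complete/Partial/NonExisting, so the
    // unconditional `handle.complete(..)` further down notifies observers in
    // every case.)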
// convert the import source to a data location and drop the open files - ctx.protect(hash, [BaoFilePart::Data, BaoFilePart::Outboard]); + ctx.protect([BaoFilePart::Data, BaoFilePart::Outboard]); let data_location = match source { ImportSource::Memory(data) => DataLocation::Inline(data), ImportSource::External(path, _file, size) => DataLocation::External(vec![path], size), @@ -854,58 +1122,39 @@ async fn finish_import_impl(import_data: ImportEntry, ctx: HashContext) -> io::R OutboardLocation::Owned } }; - if let Some(handle) = handle { - let data = match &data_location { - DataLocation::Inline(data) => MemOrFile::Mem(data.clone()), - DataLocation::Owned(size) => { - let path = ctx.options().path.data_path(&hash); - let file = fs::File::open(&path)?; - MemOrFile::File(FixedSize::new(file, *size)) - } - DataLocation::External(paths, size) => { - let Some(path) = paths.iter().next() else { - return Err(io::Error::other("no external data path")); - }; - let file = fs::File::open(path)?; - MemOrFile::File(FixedSize::new(file, *size)) - } - }; - let outboard = match &outboard_location { - OutboardLocation::NotNeeded => MemOrFile::empty(), - OutboardLocation::Inline(data) => MemOrFile::Mem(data.clone()), - OutboardLocation::Owned => { - let path = ctx.options().path.outboard_path(&hash); - let file = fs::File::open(&path)?; - MemOrFile::File(file) - } - }; - handle.complete(data, outboard); - } + let data = match &data_location { + DataLocation::Inline(data) => MemOrFile::Mem(data.clone()), + DataLocation::Owned(size) => { + let path = ctx.options().path.data_path(&hash); + let file = fs::File::open(&path)?; + MemOrFile::File(FixedSize::new(file, *size)) + } + DataLocation::External(paths, size) => { + let Some(path) = paths.iter().next() else { + return Err(io::Error::other("no external data path")); + }; + let file = fs::File::open(path)?; + MemOrFile::File(FixedSize::new(file, *size)) + } + }; + let outboard = match &outboard_location { + OutboardLocation::NotNeeded => MemOrFile::empty(), + OutboardLocation::Inline(data) => MemOrFile::Mem(data.clone()), + OutboardLocation::Owned => { + let path = ctx.options().path.outboard_path(&hash); + let file = fs::File::open(&path)?; + MemOrFile::File(file) + } + }; + handle.complete(data, outboard); let state = EntryState::Complete { data_location, outboard_location, }; - ctx.update_await(hash, state).await?; + ctx.update_await(state).await?; Ok(()) } -#[instrument(skip_all, fields(hash = %cmd.hash_short()))] -async fn import_bao(cmd: ImportBaoMsg, ctx: HashContext) { - trace!("{cmd:?}"); - let ImportBaoMsg { - inner: ImportBaoRequest { size, hash }, - rx, - tx, - .. - } = cmd; - let res = match ctx.get_or_create(hash).await { - Ok(handle) => import_bao_impl(size, rx, handle, ctx).await, - Err(cause) => Err(cause), - }; - trace!("{res:?}"); - tx.send(res).await.ok(); -} - fn chunk_range(leaf: &Leaf) -> ChunkRanges { let start = ChunkNum::chunks(leaf.offset); let end = ChunkNum::chunks(leaf.offset + leaf.data.len() as u64); @@ -913,19 +1162,18 @@ fn chunk_range(leaf: &Leaf) -> ChunkRanges { } async fn import_bao_impl( + ctx: &impl SyncEntityApi, size: NonZeroU64, mut rx: mpsc::Receiver, - handle: BaoFileHandle, - ctx: HashContext, ) -> api::Result<()> { - trace!("importing bao: {} {} bytes", ctx.id.fmt_short(), size); + trace!("importing bao: {} {} bytes", ctx.id().fmt_short(), size); let mut batch = Vec::::new(); let mut ranges = ChunkRanges::empty(); while let Some(item) = rx.recv().await? 
{ // if the batch is not empty, the last item is a leaf and the current item is a parent, write the batch if !batch.is_empty() && batch[batch.len() - 1].is_leaf() && item.is_parent() { let bitfield = Bitfield::new_unchecked(ranges, size.into()); - handle.write_batch(&batch, &bitfield, &ctx).await?; + ctx.write_batch(&batch, &bitfield).await?; batch.clear(); ranges = ChunkRanges::empty(); } @@ -941,49 +1189,21 @@ async fn import_bao_impl( } if !batch.is_empty() { let bitfield = Bitfield::new_unchecked(ranges, size.into()); - handle.write_batch(&batch, &bitfield, &ctx).await?; + ctx.write_batch(&batch, &bitfield).await?; } Ok(()) } -#[instrument(skip_all, fields(hash = %cmd.hash_short()))] -async fn observe(cmd: ObserveMsg, ctx: HashContext) { - let Ok(handle) = ctx.get_or_create(cmd.hash).await else { - return; - }; - handle.subscribe().forward(cmd.tx).await.ok(); -} - -#[instrument(skip_all, fields(hash = %cmd.hash_short()))] -async fn export_ranges(mut cmd: ExportRangesMsg, ctx: HashContext) { - match ctx.get(cmd.hash).await { - Ok(handle) => { - if let Err(cause) = export_ranges_impl(cmd.inner, &mut cmd.tx, handle).await { - cmd.tx - .send(ExportRangesItem::Error(cause.into())) - .await - .ok(); - } - } - Err(cause) => { - cmd.tx.send(ExportRangesItem::Error(cause)).await.ok(); - } - } -} - async fn export_ranges_impl( + ctx: &impl SyncEntityApi, cmd: ExportRangesRequest, tx: &mut mpsc::Sender, - handle: BaoFileHandle, ) -> io::Result<()> { let ExportRangesRequest { ranges, hash } = cmd; - trace!( - "export_ranges: exporting ranges: {hash} {ranges:?} size={}", - handle.current_size()? - ); - let bitfield = handle.bitfield()?; - let data = handle.data_reader(); + let bitfield = ctx.bitfield()?; + let data = ctx.data(); let size = bitfield.size(); + trace!("exporting ranges: {hash} {ranges:?} size={size}",); for range in ranges.iter() { let range = match range { RangeSetRange::Range(range) => size.min(*range.start)..size.min(*range.end), @@ -1012,58 +1232,29 @@ async fn export_ranges_impl( Ok(()) } -#[instrument(skip_all, fields(hash = %cmd.hash_short()))] -async fn export_bao(mut cmd: ExportBaoMsg, ctx: HashContext) { - match ctx.get(cmd.hash).await { - Ok(handle) => { - if let Err(cause) = export_bao_impl(cmd.inner, &mut cmd.tx, handle).await { - cmd.tx - .send(bao_tree::io::EncodeError::Io(io::Error::other(cause)).into()) - .await - .ok(); - } - } - Err(cause) => { - let crate::api::Error::Io(cause) = cause; - cmd.tx - .send(bao_tree::io::EncodeError::Io(cause).into()) - .await - .ok(); - } - } -} - async fn export_bao_impl( + ctx: &impl SyncEntityApi, cmd: ExportBaoRequest, tx: &mut mpsc::Sender, - handle: BaoFileHandle, -) -> anyhow::Result<()> { +) -> io::Result<()> { let ExportBaoRequest { ranges, hash, .. } = cmd; - let outboard = handle.outboard(&hash)?; - let size = outboard.tree.size(); - if size == 0 && hash != Hash::EMPTY { + let outboard = ctx.outboard()?; + let size = outboard.tree().size(); + if size == 0 && cmd.hash != Hash::EMPTY { // we have no data whatsoever, so we stop here return Ok(()); } trace!("exporting bao: {hash} {ranges:?} size={size}",); - let data = handle.data_reader(); + let data = ctx.data(); let tx = BaoTreeSender::new(tx); traverse_ranges_validated(data, outboard, &ranges, tx).await?; Ok(()) } -#[instrument(skip_all, fields(hash = %cmd.hash_short()))] -async fn export_path(cmd: ExportPathMsg, ctx: HashContext) { - let ExportPathMsg { inner, mut tx, .. 
} = cmd; - if let Err(cause) = export_path_impl(inner, &mut tx, ctx).await { - tx.send(cause.into()).await.ok(); - } -} - async fn export_path_impl( + ctx: &HashContext, cmd: ExportPathRequest, tx: &mut mpsc::Sender, - ctx: HashContext, ) -> api::Result<()> { let ExportPathRequest { mode, target, .. } = cmd; if !target.is_absolute() { @@ -1075,8 +1266,7 @@ async fn export_path_impl( if let Some(parent) = target.parent() { fs::create_dir_all(parent)?; } - let _guard = ctx.lock().await; - let state = ctx.get_entry_state(cmd.hash).await?; + let state = ctx.get_entry_state().await?; let (data_location, outboard_location) = match state { Some(EntryState::Complete { data_location, @@ -1138,13 +1328,10 @@ async fn export_path_impl( } } } - ctx.set( - cmd.hash, - EntryState::Complete { - data_location: DataLocation::External(vec![target], size), - outboard_location, - }, - ) + ctx.set(EntryState::Complete { + data_location: DataLocation::External(vec![target], size), + outboard_location, + }) .await?; } }, @@ -1188,8 +1375,14 @@ impl FsStore { /// Load or create a new store with custom options, returning an additional sender for file store specific commands. pub async fn load_with_opts(db_path: PathBuf, options: Options) -> anyhow::Result { + static THREAD_NR: AtomicU64 = AtomicU64::new(0); let rt = tokio::runtime::Builder::new_multi_thread() - .thread_name("iroh-blob-store") + .thread_name_fn(|| { + format!( + "iroh-blob-store-{}", + THREAD_NR.fetch_add(1, Ordering::Relaxed) + ) + }) .enable_time() .build()?; let handle = rt.handle().clone(); @@ -1407,7 +1600,7 @@ pub mod tests { // import data via import_bytes, check that we can observe it and that it is complete #[tokio::test] - async fn test_import_bytes() -> TestResult<()> { + async fn test_import_bytes_simple() -> TestResult<()> { tracing_subscriber::fmt::try_init().ok(); let testdir = tempfile::tempdir()?; let db_dir = testdir.path().join("db"); @@ -1840,6 +2033,7 @@ pub mod tests { assert!(tts.contains(tt2.hash_and_format())); drop(batch); store.sync_db().await?; + store.wait_idle().await?; let tts = store .tags() .list_temp_tags() @@ -1938,7 +2132,6 @@ pub mod tests { if path.is_file() { if let Some(file_ext) = path.extension() { if file_ext.to_string_lossy().to_lowercase() == ext { - println!("Deleting: {}", path.display()); fs::remove_file(path)?; } } diff --git a/src/store/fs/bao_file.rs b/src/store/fs/bao_file.rs index e53fe8dc..ccdd9a1a 100644 --- a/src/store/fs/bao_file.rs +++ b/src/store/fs/bao_file.rs @@ -4,7 +4,6 @@ use std::{ io, ops::Deref, path::Path, - sync::Arc, }; use bao_tree::{ @@ -21,7 +20,7 @@ use bytes::{Bytes, BytesMut}; use derive_more::Debug; use irpc::channel::mpsc; use tokio::sync::watch; -use tracing::{debug, error, info, trace}; +use tracing::{debug, info, trace}; use super::{ entry_state::{DataLocation, EntryState, OutboardLocation}, @@ -143,7 +142,7 @@ impl PartialFileStorage { &self.bitfield } - fn sync_all(&self, bitfield_path: &Path) -> io::Result<()> { + pub(super) fn sync_all(&self, bitfield_path: &Path) -> io::Result<()> { self.data.sync_all()?; self.outboard.sync_all()?; self.sizes.sync_all()?; @@ -236,7 +235,7 @@ impl PartialFileStorage { )) } - fn current_size(&self) -> io::Result { + pub(super) fn current_size(&self) -> io::Result { read_size(&self.sizes) } @@ -286,8 +285,24 @@ fn read_size(size_file: &File) -> io::Result { } /// The storage for a bao file. This can be either in memory or on disk. 
-#[derive(derive_more::From)]
+///
+/// The two initial states `Initial` and `Loading` are used to coordinate the
+/// loading of the entry from the metadata database. Once that is complete,
+/// you should never see these states again.
+///
+/// From the remaining states you can get into `Poisoned` if there is an
+/// IO error during an operation.
+///
+/// `Poisoned` is also used once the handle is persisted and no longer usable.
+#[derive(derive_more::From, Default)]
 pub(crate) enum BaoFileStorage {
+    /// Initial state, we don't know anything yet.
+    #[default]
+    Initial,
+    /// Currently loading the entry from the metadata database.
+    Loading,
+    /// There is no info about this hash in the metadata db.
+    NonExisting,
     /// The entry is incomplete and in memory.
     ///
     /// Since it is incomplete, it must be writeable.
@@ -305,13 +320,8 @@ pub(crate) enum BaoFileStorage {
     ///
     /// Writing to this is a no-op, since it is already complete.
     Complete(CompleteStorage),
-    /// We will get into that state if there is an io error in the middle of an operation
-    ///
-    /// Also, when the handle is dropped we will poison the storage, so poisoned
-    /// can be seen when the handle is revived during the drop.
-    ///
-    /// BaoFileHandleWeak::upgrade() will return None if the storage is poisoned,
-    /// treat it as dead.
+    /// We will get into that state if there is an io error in the middle of an operation,
+    /// or after the handle is persisted and no longer usable.
     Poisoned,
 }

@@ -322,16 +332,13 @@ impl fmt::Debug for BaoFileStorage {
             BaoFileStorage::Partial(x) => x.fmt(f),
             BaoFileStorage::Complete(x) => x.fmt(f),
             BaoFileStorage::Poisoned => f.debug_struct("Poisoned").finish(),
+            BaoFileStorage::Initial => f.debug_struct("Initial").finish(),
+            BaoFileStorage::Loading => f.debug_struct("Loading").finish(),
+            BaoFileStorage::NonExisting => f.debug_struct("NonExisting").finish(),
         }
     }
 }

-impl Default for BaoFileStorage {
-    fn default() -> Self {
-        BaoFileStorage::Complete(Default::default())
-    }
-}
-
 impl PartialMemStorage {
     /// Converts this storage into a complete storage, using the given hash for
     /// path names and the given options for decisions about inlining.
@@ -387,22 +394,32 @@ impl PartialMemStorage {
 }

 impl BaoFileStorage {
     pub fn bitfield(&self) -> Bitfield {
         match self {
-            BaoFileStorage::Complete(x) => Bitfield::complete(x.data.size()),
+            BaoFileStorage::Initial => {
+                panic!("initial storage should not be used")
+            }
+            BaoFileStorage::Loading => {
+                panic!("loading storage should not be used")
+            }
+            BaoFileStorage::NonExisting => Bitfield::empty(),
             BaoFileStorage::PartialMem(x) => x.bitfield.clone(),
             BaoFileStorage::Partial(x) => x.bitfield.clone(),
+            BaoFileStorage::Complete(x) => Bitfield::complete(x.data.size()),
             BaoFileStorage::Poisoned => {
                 panic!("poisoned storage should not be used")
             }
         }
     }

-    fn write_batch(
+    pub(super) fn write_batch(
         self,
         batch: &[BaoContentItem],
         bitfield: &Bitfield,
         ctx: &HashContext,
     ) -> io::Result<(Self, Option<EntryState<Bytes>>)> {
         Ok(match self {
+            BaoFileStorage::NonExisting => {
+                Self::new_partial_mem().write_batch(batch, bitfield, ctx)?
+            }
             BaoFileStorage::PartialMem(mut ms) => {
                 // check if we need to switch to file mode, otherwise write to memory
                 if max_offset(batch) <= ctx.global.options.inline.max_data_inlined {
@@ -465,7 +482,7 @@ impl BaoFileStorage {
                 // unless there is a bug, this would just write the exact same data
                 (self, None)
             }
-            BaoFileStorage::Poisoned => {
+            _ => {
                 // we are poisoned, so just ignore the write
                 (self, None)
             }
@@ -473,7 +490,7 @@ impl BaoFileStorage {
     }

     /// Create a new mutable mem storage.
-    pub fn partial_mem() -> Self {
+    pub fn new_partial_mem() -> Self {
         Self::PartialMem(Default::default())
     }

@@ -483,13 +500,14 @@ impl BaoFileStorage {
         match self {
             Self::Complete(_) => Ok(()),
             Self::PartialMem(_) => Ok(()),
+            Self::NonExisting => Ok(()),
             Self::Partial(file) => {
                 file.data.sync_all()?;
                 file.outboard.sync_all()?;
                 file.sizes.sync_all()?;
                 Ok(())
             }
-            Self::Poisoned => {
+            Self::Poisoned | Self::Initial | Self::Loading => {
                 // we are poisoned, so just ignore the sync
                 Ok(())
             }
@@ -501,42 +519,17 @@ impl BaoFileStorage {
     }
 }

-/// A cheaply cloneable handle to a bao file, including the hash and the configuration.
+/// A cheaply cloneable handle to a bao file.
 ///
 /// You must call [Self::persist] to write the bitfield to disk, if you want to persist
 /// the file handle, otherwise the bitfield will not be written to disk and will have
 /// to be reconstructed on next use.
-#[derive(Debug, Clone, derive_more::Deref)]
-pub(crate) struct BaoFileHandle(Arc<watch::Sender<BaoFileStorage>>);
-
-impl BaoFileHandle {
-    pub(super) fn persist(&mut self, ctx: &HashContext) {
-        self.send_if_modified(|guard| {
-            let hash = &ctx.id;
-            if Arc::strong_count(&self.0) > 1 {
-                return false;
-            }
-            let BaoFileStorage::Partial(fs) = guard.take() else {
-                return false;
-            };
-            let path = ctx.global.options.path.bitfield_path(hash);
-            trace!("writing bitfield for hash {} to {}", hash, path.display());
-            if let Err(cause) = fs.sync_all(&path) {
-                error!(
-                    "failed to write bitfield for {} at {}: {:?}",
-                    hash,
-                    path.display(),
-                    cause
-                );
-            }
-            false
-        });
-    }
-}
+#[derive(Debug, Clone, Default, derive_more::Deref)]
+pub(crate) struct BaoFileHandle(pub(super) watch::Sender<BaoFileStorage>);

 /// A reader for a bao file, reading just the data.
 #[derive(Debug)]
-pub struct DataReader(BaoFileHandle);
+pub struct DataReader(pub(super) BaoFileHandle);

 impl ReadBytesAt for DataReader {
     fn read_bytes_at(&self, offset: u64, size: usize) -> std::io::Result<Bytes> {
@@ -546,13 +539,16 @@ impl ReadBytesAt for DataReader {
             BaoFileStorage::Partial(x) => x.data.read_bytes_at(offset, size),
             BaoFileStorage::Complete(x) => x.data.read_bytes_at(offset, size),
             BaoFileStorage::Poisoned => io::Result::Err(io::Error::other("poisoned storage")),
+            BaoFileStorage::Initial => io::Result::Err(io::Error::other("initial")),
+            BaoFileStorage::Loading => io::Result::Err(io::Error::other("loading")),
+            BaoFileStorage::NonExisting => io::Result::Err(io::ErrorKind::NotFound.into()),
         }
     }
 }

 /// A reader for the outboard part of a bao file.
#[derive(Debug)] -pub struct OutboardReader(BaoFileHandle); +pub struct OutboardReader(pub(super) BaoFileHandle); impl ReadAt for OutboardReader { fn read_at(&self, offset: u64, buf: &mut [u8]) -> io::Result { @@ -562,22 +558,51 @@ impl ReadAt for OutboardReader { BaoFileStorage::PartialMem(x) => x.outboard.read_at(offset, buf), BaoFileStorage::Partial(x) => x.outboard.read_at(offset, buf), BaoFileStorage::Poisoned => io::Result::Err(io::Error::other("poisoned storage")), + BaoFileStorage::Initial => io::Result::Err(io::Error::other("initial")), + BaoFileStorage::Loading => io::Result::Err(io::Error::other("loading")), + BaoFileStorage::NonExisting => io::Result::Err(io::ErrorKind::NotFound.into()), } } } -impl BaoFileHandle { - #[allow(dead_code)] - pub fn id(&self) -> usize { - Arc::as_ptr(&self.0) as usize - } - - /// Create a new bao file handle. - /// - /// This will create a new file handle with an empty memory storage. - pub fn new_partial_mem() -> Self { - let storage = BaoFileStorage::partial_mem(); - Self(Arc::new(watch::Sender::new(storage))) +impl BaoFileStorage { + pub async fn open(state: Option>, ctx: &HashContext) -> io::Result { + let hash = &ctx.id; + let options = &ctx.global.options; + Ok(match state { + Some(EntryState::Complete { + data_location, + outboard_location, + }) => { + let data = match data_location { + DataLocation::Inline(data) => MemOrFile::Mem(data), + DataLocation::Owned(size) => { + let path = options.path.data_path(hash); + let file = std::fs::File::open(&path)?; + MemOrFile::File(FixedSize::new(file, size)) + } + DataLocation::External(paths, size) => { + let Some(path) = paths.into_iter().next() else { + return Err(io::Error::other("no external data path")); + }; + let file = std::fs::File::open(&path)?; + MemOrFile::File(FixedSize::new(file, size)) + } + }; + let outboard = match outboard_location { + OutboardLocation::NotNeeded => MemOrFile::empty(), + OutboardLocation::Inline(data) => MemOrFile::Mem(data), + OutboardLocation::Owned => { + let path = options.path.outboard_path(hash); + let file = std::fs::File::open(&path)?; + MemOrFile::File(file) + } + }; + Self::new_complete(data, outboard) + } + Some(EntryState::Partial { .. }) => Self::new_partial_file(ctx).await?, + None => Self::NonExisting, + }) } /// Create a new bao file handle with a partial file. @@ -585,7 +610,7 @@ impl BaoFileHandle { let hash = &ctx.id; let options = ctx.global.options.clone(); let storage = PartialFileStorage::load(hash, &options.path)?; - let storage = if storage.bitfield.is_complete() { + Ok(if storage.bitfield.is_complete() { let size = storage.bitfield.size; let (storage, entry_state) = storage.into_complete(size, &options)?; debug!("File was reconstructed as complete"); @@ -593,8 +618,7 @@ impl BaoFileHandle { storage.into() } else { storage.into() - }; - Ok(Self(Arc::new(watch::Sender::new(storage)))) + }) } /// Create a new complete bao file handle. 
@@ -602,10 +626,11 @@ impl BaoFileHandle { data: MemOrFile>, outboard: MemOrFile, ) -> Self { - let storage = CompleteStorage { data, outboard }.into(); - Self(Arc::new(watch::Sender::new(storage))) + CompleteStorage { data, outboard }.into() } +} +impl BaoFileHandle { /// Complete the handle pub fn complete( &self, @@ -613,14 +638,14 @@ impl BaoFileHandle { outboard: MemOrFile, ) { self.send_if_modified(|guard| { - let res = match guard { - BaoFileStorage::Complete(_) => None, - BaoFileStorage::PartialMem(entry) => Some(&mut entry.bitfield), - BaoFileStorage::Partial(entry) => Some(&mut entry.bitfield), - BaoFileStorage::Poisoned => None, + let needs_complete = match guard { + BaoFileStorage::NonExisting => true, + BaoFileStorage::Complete(_) => false, + BaoFileStorage::PartialMem(_) => true, + BaoFileStorage::Partial(_) => true, + _ => false, }; - if let Some(bitfield) = res { - bitfield.update(&Bitfield::complete(data.size())); + if needs_complete { *guard = BaoFileStorage::Complete(CompleteStorage { data, outboard }); true } else { @@ -628,87 +653,6 @@ impl BaoFileHandle { } }); } - - pub fn subscribe(&self) -> BaoFileStorageSubscriber { - BaoFileStorageSubscriber::new(self.0.subscribe()) - } - - /// True if the file is complete. - #[allow(dead_code)] - pub fn is_complete(&self) -> bool { - matches!(self.borrow().deref(), BaoFileStorage::Complete(_)) - } - - /// An AsyncSliceReader for the data file. - /// - /// Caution: this is a reader for the unvalidated data file. Reading this - /// can produce data that does not match the hash. - pub fn data_reader(&self) -> DataReader { - DataReader(self.clone()) - } - - /// An AsyncSliceReader for the outboard file. - /// - /// The outboard file is used to validate the data file. It is not guaranteed - /// to be complete. - pub fn outboard_reader(&self) -> OutboardReader { - OutboardReader(self.clone()) - } - - /// The most precise known total size of the data file. - pub fn current_size(&self) -> io::Result { - match self.borrow().deref() { - BaoFileStorage::Complete(mem) => Ok(mem.size()), - BaoFileStorage::PartialMem(mem) => Ok(mem.current_size()), - BaoFileStorage::Partial(file) => file.current_size(), - BaoFileStorage::Poisoned => io::Result::Err(io::Error::other("poisoned storage")), - } - } - - /// The most precise known total size of the data file. - pub fn bitfield(&self) -> io::Result { - match self.borrow().deref() { - BaoFileStorage::Complete(mem) => Ok(mem.bitfield()), - BaoFileStorage::PartialMem(mem) => Ok(mem.bitfield().clone()), - BaoFileStorage::Partial(file) => Ok(file.bitfield().clone()), - BaoFileStorage::Poisoned => io::Result::Err(io::Error::other("poisoned storage")), - } - } - - /// The outboard for the file. - pub fn outboard(&self, hash: &Hash) -> io::Result> { - let tree = BaoTree::new(self.current_size()?, IROH_BLOCK_SIZE); - let outboard = self.outboard_reader(); - Ok(PreOrderOutboard { - root: blake3::Hash::from(*hash), - tree, - data: outboard, - }) - } - - /// Write a batch and notify the db - pub(super) async fn write_batch( - &self, - batch: &[BaoContentItem], - bitfield: &Bitfield, - ctx: &HashContext, - ) -> io::Result<()> { - trace!("write_batch bitfield={:?} batch={}", bitfield, batch.len()); - let mut res = Ok(None); - self.send_if_modified(|state| { - let Ok((state1, update)) = state.take().write_batch(batch, bitfield, ctx) else { - res = Err(io::Error::other("write batch failed")); - return false; - }; - res = Ok(update); - *state = state1; - true - }); - if let Some(update) = res? 
{ - ctx.global.db.update(ctx.id, update).await?; - } - Ok(()) - } } impl PartialMemStorage { diff --git a/src/store/fs/gc.rs b/src/store/fs/gc.rs index a496eee3..da7836e7 100644 --- a/src/store/fs/gc.rs +++ b/src/store/fs/gc.rs @@ -243,7 +243,6 @@ mod tests { use std::{ io::{self}, path::Path, - time::Duration, }; use bao_tree::{io::EncodeError, ChunkNum}; @@ -352,7 +351,7 @@ mod tests { let outboard_path = options.outboard_path(&bh); let sizes_path = options.sizes_path(&bh); let bitfield_path = options.bitfield_path(&bh); - tokio::time::sleep(Duration::from_millis(100)).await; // allow for some time for the file to be written + store.wait_idle().await?; assert!(data_path.exists()); assert!(outboard_path.exists()); assert!(sizes_path.exists()); diff --git a/src/store/fs/meta.rs b/src/store/fs/meta.rs index 21fbd9ed..f840c985 100644 --- a/src/store/fs/meta.rs +++ b/src/store/fs/meta.rs @@ -1,11 +1,7 @@ //! The metadata database #![allow(clippy::result_large_err)] use std::{ - collections::HashSet, - io, - ops::{Bound, Deref, DerefMut}, - path::PathBuf, - time::SystemTime, + collections::HashSet, io, ops::{Bound, Deref, DerefMut}, path::PathBuf, time::SystemTime }; use bao_tree::BaoTree; @@ -15,7 +11,7 @@ use n0_snafu::SpanTrace; use nested_enum_utils::common_fields; use redb::{Database, DatabaseError, ReadableTable}; use snafu::{Backtrace, ResultExt, Snafu}; -use tokio::pin; +use tokio::{pin, task::JoinSet}; use crate::{ api::{ @@ -96,15 +92,6 @@ impl Db { Self { sender } } - pub async fn snapshot(&self, span: tracing::Span) -> io::Result { - let (tx, rx) = tokio::sync::oneshot::channel(); - self.sender - .send(Snapshot { tx, span }.into()) - .await - .map_err(|_| io::Error::other("send snapshot"))?; - rx.await.map_err(|_| io::Error::other("receive snapshot")) - } - pub async fn update_await(&self, hash: Hash, state: EntryState) -> io::Result<()> { let (tx, rx) = oneshot::channel(); self.sender @@ -463,6 +450,7 @@ pub struct Actor { ds: DeleteHandle, options: BatchOptions, protected: HashSet, + tasks: JoinSet<()>, } impl Actor { @@ -492,6 +480,7 @@ impl Actor { ds, options, protected: Default::default(), + tasks: JoinSet::new(), }) } @@ -707,6 +696,7 @@ impl Actor { async fn handle_toplevel( db: &mut Database, + tasks: &mut JoinSet<()>, cmd: TopLevelCommand, op: TxnNum, ) -> ActorResult> { @@ -726,11 +716,11 @@ impl Actor { // nothing to do here, since the database will be dropped Some(cmd) } - TopLevelCommand::Snapshot(cmd) => { + TopLevelCommand::ListBlobs(cmd) => { trace!("{cmd:?}"); let txn = db.begin_read().context(TransactionSnafu)?; let snapshot = ReadOnlyTables::new(&txn).context(TableSnafu)?; - cmd.tx.send(snapshot).ok(); + tasks.spawn(list_blobs(snapshot, cmd)); None } }) @@ -741,14 +731,18 @@ impl Actor { let options = &self.options; let mut op = 0u64; let shutdown = loop { + let cmd = tokio::select! { + cmd = self.cmds.recv() => cmd, + _ = self.tasks.join_next(), if !self.tasks.is_empty() => continue, + }; op += 1; - let Some(cmd) = self.cmds.recv().await else { + let Some(cmd) = cmd else { break None; }; match cmd { Command::TopLevel(cmd) => { let op = TxnNum::TopLevel(op); - if let Some(shutdown) = Self::handle_toplevel(&mut db, cmd, op).await? { + if let Some(shutdown) = Self::handle_toplevel(&mut db, &mut self.tasks, cmd, op).await? 
diff --git a/src/store/fs/meta/proto.rs b/src/store/fs/meta/proto.rs
index 6f4aaa6c..5b9afec5 100644
--- a/src/store/fs/meta/proto.rs
+++ b/src/store/fs/meta/proto.rs
@@ -5,13 +5,15 @@
 use bytes::Bytes;
 use nested_enum_utils::enum_conversions;
 use tracing::Span;
 
-use super::{ActorResult, ReadOnlyTables};
+use super::ActorResult;
 use crate::{
     api::proto::{
-        BlobStatusMsg, ClearProtectedMsg, DeleteBlobsMsg, ProcessExitRequest, ShutdownMsg,
-        SyncDbMsg,
+        BlobStatusMsg, ClearProtectedMsg, DeleteBlobsMsg, ListBlobsMsg, ProcessExitRequest, ShutdownMsg, SyncDbMsg
+    },
+    store::{
+        fs::{entry_state::EntryState, GlobalCmd},
+        util::DD,
     },
-    store::{fs::entry_state::EntryState, util::DD},
     util::channel::oneshot,
     Hash,
 };
@@ -49,12 +51,6 @@ pub struct Dump {
     pub span: Span,
 }
 
-#[derive(Debug)]
-pub struct Snapshot {
-    pub(crate) tx: tokio::sync::oneshot::Sender<ReadOnlyTables>,
-    pub span: Span,
-}
-
 pub struct Update {
     pub hash: Hash,
     pub state: EntryState<Bytes>,
@@ -167,7 +163,7 @@ impl ReadWriteCommand {
 pub enum TopLevelCommand {
     SyncDb(SyncDbMsg),
     Shutdown(ShutdownMsg),
-    Snapshot(Snapshot),
+    ListBlobs(ListBlobsMsg),
 }
 
 impl TopLevelCommand {
@@ -181,7 +177,7 @@
         match self {
             Self::SyncDb(x) => x.parent_span_opt(),
             Self::Shutdown(x) => x.parent_span_opt(),
-            Self::Snapshot(x) => Some(&x.span),
+            Self::ListBlobs(x) => Some(&x.span),
         }
     }
 }
@@ -193,6 +189,25 @@
 pub enum Command {
     TopLevel(TopLevelCommand),
 }
 
+impl From<GlobalCmd> for Command {
+    fn from(cmd: GlobalCmd) -> Self {
+        match cmd {
+            GlobalCmd::SyncDb(cmd) => cmd.into(),
+            GlobalCmd::Shutdown(cmd) => cmd.into(),
+
+            GlobalCmd::ListTags(cmd) => cmd.into(),
+            GlobalCmd::SetTag(cmd) => cmd.into(),
+            GlobalCmd::DeleteTags(cmd) => cmd.into(),
+            GlobalCmd::RenameTag(cmd) => cmd.into(),
+            GlobalCmd::CreateTag(cmd) => cmd.into(),
+
+            GlobalCmd::BlobStatus(cmd) => cmd.into(),
+            GlobalCmd::DeleteBlobs(cmd) => cmd.into(),
+            GlobalCmd::ListBlobs(_cmd) => todo!(),
+        }
+    }
+}
+
 impl Command {
     pub fn non_top_level(self) -> std::result::Result<NonTopLevelCommand, Self> {
         match self {
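Design note on the two changes above: the removed `Snapshot` round trip handed a `ReadOnlyTables` read transaction to the caller, tying the transaction's lifetime to arbitrary code outside the actor. As a `TopLevelCommand`, `ListBlobs` keeps the snapshot inside the actor and streams results out of a spawned task instead. A compressed sketch of the two shapes, using illustrative stand-in types rather than the real redb/irpc ones:

    use tokio::{
        sync::{mpsc, oneshot},
        task::JoinSet,
    };

    struct Snapshot(Vec<u64>); // stand-in for ReadOnlyTables

    enum Cmd {
        // old: the caller receives the snapshot itself and iterates it
        TakeSnapshot(oneshot::Sender<Snapshot>),
        // new: the actor iterates and streams rows, the caller holds no txn
        ListBlobs(mpsc::Sender<u64>),
    }

    fn handle(cmd: Cmd, snapshot: Snapshot, tasks: &mut JoinSet<()>) {
        match cmd {
            Cmd::TakeSnapshot(tx) => {
                // the read txn now lives as long as the caller keeps it
                tx.send(snapshot).ok();
            }
            Cmd::ListBlobs(tx) => {
                // the read txn is dropped as soon as this task finishes
                tasks.spawn(async move {
                    for hash in snapshot.0 {
                        if tx.send(hash).await.is_err() {
                            break; // receiver gone, stop early
                        }
                    }
                });
            }
        }
    }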
diff --git a/src/store/fs/util/entity_manager.rs b/src/store/fs/util/entity_manager.rs
index 493a52aa..46bd1456 100644
--- a/src/store/fs/util/entity_manager.rs
+++ b/src/store/fs/util/entity_manager.rs
@@ -24,52 +24,54 @@ pub enum ShutdownCause {
     Drop,
 }
 
-/// Parameters for the entity manager system.
-pub trait Params: Send + Sync + 'static {
+/// State of a single entity for the entity manager.
+pub trait EntityState: Send + Sync + Clone + 'static {
     /// Entity id type.
     ///
     /// This does not require Copy to allow for more complex types, such as `String`,
     /// but you have to make sure that ids are small and cheap to clone, since they are
     /// used as keys in maps.
-    type EntityId: Debug + Hash + Eq + Clone + Send + Sync + 'static;
+    type Id: Debug + Hash + Eq + Clone + Send + Sync + 'static;
     /// Global state type.
     ///
     /// This is passed into all entity actors. It also needs to be cheap handle.
     /// If you don't need it, just set it to `()`.
     type GlobalState: Debug + Clone + Send + Sync + 'static;
-    /// Entity state type.
-    ///
-    /// This is the actual distinct per-entity state. This needs to implement
-    /// `Default` and a matching `Reset`. It also needs to implement `Clone`
-    /// since we unfortunately need to pass an owned copy of the state to the
-    /// callback - otherwise we run into some rust lifetime limitations
-    /// <...>.
+
+    fn global(&self) -> &Self::GlobalState;
+
+    fn id(&self) -> &Self::Id;
+
+    /// Reset the state to its default value.
+    fn reset(&mut self, id: &Self::Id, global: &Self::GlobalState);
+
+    /// Create a new state for the given id and global state.
+    fn new(id: &Self::Id, global: &Self::GlobalState) -> Self;
+
+    /// A ref count to ensure that the state is unique when shutting down.
     ///
-    /// Frequently this is an `Arc<Mutex<T>>` or similar. Note that per entity
-    /// access is concurrent but not parallel, so you can use a more efficient
-    /// synchronization primitive like [`AtomicRefCell`](https://crates.io/crates/atomic_refcell) if you want to.
-    type EntityState: Default + Debug + Reset + Clone + Send + Sync + 'static;
+    /// You are not allowed to clone the state out of a task, even though that
+    /// is possible.
+    fn ref_count(&self) -> usize;
+
     /// Function being called when an entity actor is shutting down.
-    fn on_shutdown(
-        state: entity_actor::State<Self>,
-        cause: ShutdownCause,
-    ) -> impl Future<Output = ()> + Send + 'static
+    fn on_shutdown(&self, cause: ShutdownCause) -> impl Future<Output = ()> + Send
     where
         Self: Sized;
 }
 
 /// Sent to the main actor and then delegated to the entity actor to spawn a new task.
-pub(crate) struct Spawn<P: Params> {
-    id: P::EntityId,
+pub(crate) struct Spawn<P: EntityState> {
+    id: P::Id,
     f: Box<dyn FnOnce(SpawnArg<P>) -> future::Boxed<()> + Send>,
 }
 
 pub(crate) struct EntityShutdown;
 
 /// Argument for the `EntityManager::spawn` function.
-pub enum SpawnArg<P: Params> {
+pub enum SpawnArg<P: EntityState> {
     /// The entity is active, and we were able to spawn a task.
-    Active(ActiveEntityState<P>),
+    Active(P),
     /// The entity is busy and cannot spawn a new task.
     Busy,
     /// The entity is dead.
@@ -81,8 +83,8 @@
 /// With this message the entity actor gives back the receiver for its command channel,
 /// so it can be reusd either immediately if commands come in during shutdown, or later
 /// if the entity actor is reused for a different entity.
-struct Shutdown<P: Params> {
-    id: P::EntityId,
+struct Shutdown<P: EntityState> {
+    id: P::Id,
     receiver: mpsc::Receiver<entity_actor::Command<P>>,
 }
 
@@ -94,8 +96,8 @@
 ///
 /// With this message the entity actor sends back the remaining state. The tasks set
 /// at this point must be empty, as the entity actor has already completed all tasks.
-struct ShutdownComplete<P: Params> {
-    state: ActiveEntityState<P>,
+struct ShutdownComplete<P: EntityState> {
+    state: P,
     tasks: FuturesUnordered<future::Boxed<()>>,
 }
 
@@ -105,50 +107,29 @@
 mod entity_actor {
     use tokio::sync::mpsc;
 
     use super::{
-        EntityShutdown, Params, Reset, Shutdown, ShutdownCause, ShutdownComplete, Spawn, SpawnArg,
+        EntityShutdown, EntityState, Shutdown, ShutdownCause, ShutdownComplete, Spawn, SpawnArg,
     };
 
-    /// State of an active entity.
-    #[derive(Debug)]
-    pub struct State<P: Params> {
-        /// The entity id.
-        pub id: P::EntityId,
-        /// A copy of the global state.
-        pub global: P::GlobalState,
-        /// The per-entity state which might have internal mutability.
-        pub state: P::EntityState,
-    }
-
-    impl<P: Params> Clone for State<P> {
-        fn clone(&self) -> Self {
-            Self {
-                id: self.id.clone(),
-                global: self.global.clone(),
-                state: self.state.clone(),
-            }
-        }
-    }
-
-    pub enum Command<P: Params> {
+    pub enum Command<P: EntityState> {
         Spawn(Spawn<P>),
         EntityShutdown(EntityShutdown),
     }
 
-    impl<P: Params> From<EntityShutdown> for Command<P> {
+    impl<P: EntityState> From<EntityShutdown> for Command<P> {
         fn from(_: EntityShutdown) -> Self {
             Self::EntityShutdown(EntityShutdown)
        }
     }
 
     #[derive(Debug)]
-    pub struct Actor<P: Params> {
+    pub struct Actor<P: EntityState> {
         pub recv: mpsc::Receiver<Command<P>>,
         pub main: mpsc::Sender<main_actor::InternalCommand<P>>,
-        pub state: State<P>,
+        pub state: P,
         pub tasks: FuturesUnordered<future::Boxed<()>>,
     }
 
-    impl<P: Params> Actor<P> {
+    impl<P: EntityState> Actor<P> {
         pub async fn run(mut self) {
             loop {
                 tokio::select! {
@@ -184,7 +165,7 @@ mod entity_actor {
         /// All senders for our receive channel were dropped, so we shut down without waiting for any tasks to complete.
         async fn drop_shutdown_state(self) {
             let Self { state, .. } = self;
-            P::on_shutdown(state, ShutdownCause::Drop).await;
+            state.on_shutdown(ShutdownCause::Drop).await;
         }
 
         /// Soft shutdown state.
@@ -192,7 +173,8 @@ mod entity_actor {
         /// We have received an explicit shutdown command, so we wait for all tasks to complete and then call the shutdown function.
         async fn soft_shutdown_state(mut self) {
             while (self.tasks.next().await).is_some() {}
-            P::on_shutdown(self.state.clone(), ShutdownCause::Soft).await;
+            let Self { state, .. } = self;
+            state.on_shutdown(ShutdownCause::Soft).await;
         }
 
         async fn recycle_state(self) {
@@ -206,14 +188,15 @@ mod entity_actor {
             self.main
                 .send(
                     Shutdown {
-                        id: self.state.id.clone(),
+                        id: self.state.id().clone(),
                         receiver: self.recv,
                     }
                     .into(),
                 )
                 .await
                 .ok();
-            P::on_shutdown(self.state.clone(), ShutdownCause::Idle).await;
+            assert_eq!(self.state.ref_count(), 1);
+            self.state.on_shutdown(ShutdownCause::Idle).await;
             // Notify the main actor that we have completed shutdown.
             // here we also give back the rest of ourselves so the main actor can recycle us.
             self.main
@@ -249,11 +232,11 @@ mod entity_actor {
                 self.tasks.is_empty(),
                 "Tasks must be empty before recycling"
             );
-            self.state.state.reset();
+            // todo: reset?
+            // we do reset on reuse, but not doing it here might cause a memory leak
         }
     }
 }
-pub use entity_actor::State as ActiveEntityState;
 pub use main_actor::ActorState as EntityManagerState;
 
 mod main_actor {
     use std::collections::HashMap;
 
     use tracing::{error, warn};
 
     use super::{
-        entity_actor, EntityShutdown, Params, Reset, Shutdown, ShutdownAll, ShutdownComplete,
-        Spawn, SpawnArg,
+        entity_actor, EntityShutdown, EntityState, Shutdown, ShutdownAll, ShutdownComplete, Spawn,
+        SpawnArg,
     };
 
-    pub(super) enum Command<P: Params> {
+    pub(super) enum Command<P: EntityState> {
         Spawn(Spawn<P>),
         ShutdownAll(ShutdownAll),
     }
 
-    impl<P: Params> From<ShutdownAll> for Command<P> {
+    impl<P: EntityState> From<ShutdownAll> for Command<P> {
         fn from(shutdown_all: ShutdownAll) -> Self {
             Self::ShutdownAll(shutdown_all)
         }
     }
 
-    pub(super) enum InternalCommand<P: Params> {
+    pub(super) enum InternalCommand<P: EntityState> {
         ShutdownComplete(ShutdownComplete<P>),
         Shutdown(Shutdown<P>),
     }
 
-    impl<P: Params> From<Shutdown<P>> for InternalCommand<P> {
+    impl<P: EntityState> From<Shutdown<P>> for InternalCommand<P> {
         fn from(shutdown: Shutdown<P>) -> Self {
             Self::Shutdown(shutdown)
         }
     }
 
-    impl<P: Params> From<ShutdownComplete<P>> for InternalCommand<P> {
+    impl<P: EntityState> From<ShutdownComplete<P>> for InternalCommand<P> {
         fn from(shutdown_complete: ShutdownComplete<P>) -> Self {
             Self::ShutdownComplete(shutdown_complete)
         }
     }
 
     #[derive(Debug)]
-    pub enum EntityHandle<P: Params> {
+    pub enum EntityHandle<P: EntityState> {
         /// A running entity actor.
         Live {
             send: mpsc::Sender<entity_actor::Command<P>>,
         },
@@ -309,7 +292,7 @@ mod main_actor {
         },
     }
 
-    impl<P: Params> EntityHandle<P> {
+    impl<P: EntityState> EntityHandle<P> {
         pub fn send(&self) -> &mpsc::Sender<entity_actor::Command<P>> {
             match self {
                 EntityHandle::Live { send } => send,
@@ -323,14 +306,14 @@ mod main_actor {
     /// This is if you don't want a separate manager actor, but want to inline the entity
     /// actor management into your main actor.
     #[derive(Debug)]
-    pub struct ActorState<P: Params> {
+    pub struct ActorState<P: EntityState> {
         /// Channel to receive internal commands from the entity actors.
         /// This channel will never be closed since we also hold a sender to it.
         internal_recv: mpsc::Receiver<InternalCommand<P>>,
         /// Channel to send internal commands to ourselves, to hand out to entity actors.
         internal_send: mpsc::Sender<InternalCommand<P>>,
         /// Map of live entity actors.
-        live: HashMap<P::EntityId, EntityHandle<P>>,
+        live: HashMap<P::Id, EntityHandle<P>>,
         /// Global state shared across all entity actors.
         state: P::GlobalState,
         /// Pool of inactive entity actors to reuse.
@@ -344,7 +327,7 @@ mod main_actor {
         entity_futures_initial_capacity: usize,
     }
 
-    impl<P: Params> ActorState<P> {
+    impl<P: EntityState> ActorState<P> {
         pub fn new(
             state: P::GlobalState,
             pool_capacity: usize,
@@ -368,7 +351,7 @@ mod main_actor {
         /// Friendly version of `spawn_boxed` that does the boxing
         pub async fn spawn<F, Fut>(
             &mut self,
-            id: P::EntityId,
+            id: P::Id,
             f: F,
         ) -> Option<impl Future<Output = ()> + Send + 'static>
         where
@@ -389,7 +372,7 @@ mod main_actor {
         #[must_use = "this function may return a future that must be spawned by the caller"]
         pub async fn spawn_boxed(
             &mut self,
-            id: P::EntityId,
+            id: P::Id,
             f: Box<dyn FnOnce(SpawnArg<P>) -> future::Boxed<()> + Send>,
         ) -> Option<impl Future<Output = ()> + Send + 'static> {
             let (entity_handle, task) = self.get_or_create(id.clone());
@@ -454,7 +437,7 @@ mod main_actor {
                     );
                 }
                 InternalCommand::ShutdownComplete(ShutdownComplete { state, tasks }) => {
-                    let id = state.id.clone();
+                    let id = state.id().clone();
                     let Some(entity_handle) = self.live.remove(&id) else {
                         error!(
                             "Received shutdown complete command for unknown entity actor {id:?}"
@@ -478,7 +461,7 @@ mod main_actor {
                         // No commands during shutdown, we can recycle the actor.
                         self.recycle(send, actor);
                     } else {
-                        actor.state.state.reset();
+                        actor.state.reset(&id, &self.state);
                         self.live.insert(id.clone(), EntityHandle::Live { send });
                         return Some(actor.run());
                     }
@@ -500,7 +483,7 @@ mod main_actor {
         /// If this function returns a future, it must be spawned by the caller.
         fn get_or_create(
             &mut self,
-            id: P::EntityId,
+            id: P::Id,
         ) -> (
             &mut EntityHandle<P>,
             Option<impl Future<Output = ()> + Send + 'static>,
@@ -509,20 +492,13 @@ mod main_actor {
             let handle = self.live.entry(id.clone()).or_insert_with(|| {
                 if let Some((send, mut actor)) = self.pool.pop() {
                     // Get an actor from the pool of inactive actors and initialize it.
-                    actor.state.id = id.clone();
-                    actor.state.global = self.state.clone();
-                    // strictly speaking this is not needed, since we reset the state when adding the actor to the pool.
-                    actor.state.state.reset();
+                    actor.state.reset(&id, &self.state);
                     task = Some(actor.run());
                     EntityHandle::Live { send }
                 } else {
                     // Create a new entity actor and inbox.
                     let (send, recv) = mpsc::channel(self.entity_inbox_size);
-                    let state: entity_actor::State<P> = entity_actor::State {
-                        id: id.clone(),
-                        global: self.state.clone(),
-                        state: Default::default(),
-                    };
+                    let state = P::new(&id, &self.state);
                     let actor = entity_actor::Actor {
                         main: self.internal_send.clone(),
                         recv,
@@ -554,7 +530,7 @@ mod main_actor {
         }
     }
 
-    pub struct Actor<P: Params> {
+    pub struct Actor<P: EntityState> {
         /// Channel to receive commands from the outside world.
         /// If this channel is closed, it means we need to shut down in a hurry.
         recv: mpsc::Receiver<Command<P>>,
@@ -564,7 +540,7 @@ mod main_actor {
         state: ActorState<P>,
     }
 
-    impl<P: Params> Actor<P> {
+    impl<P: EntityState> Actor<P> {
         pub fn new(
             state: P::GlobalState,
             recv: tokio::sync::mpsc::Receiver<Command<P>>,
@@ -682,7 +658,7 @@
 /// tasks to complete. For a more gentle shutdown, use the [`EntityManager::shutdown`] function
 /// that does wait for tasks to complete.
 #[derive(Debug, Clone)]
-pub struct EntityManager<P: Params>(mpsc::Sender<main_actor::Command<P>>);
+pub struct EntityManager<P: EntityState>(mpsc::Sender<main_actor::Command<P>>);
 
 #[derive(Debug, Clone, Copy)]
 pub struct Options {
@@ -712,7 +688,7 @@
     }
 }
 
-impl<P: Params> EntityManager<P> {
+impl<P: EntityState> EntityManager<P> {
     pub fn new(state: P::GlobalState, options: Options) -> Self {
         let (send, recv) = mpsc::channel(options.inbox_size);
         let actor = main_actor::Actor::new(
@@ -740,7 +716,7 @@ impl<P: EntityState> EntityManager<P> {
     ///
     /// The future returned by `f` will be executed concurrently with other tasks, but again
     /// there will be no real parallelism within a single entity actor.
-    pub async fn spawn<F, Fut>(&self, id: P::EntityId, f: F) -> Result<(), &'static str>
+    pub async fn spawn<F, Fut>(&self, id: P::Id, f: F) -> Result<(), &'static str>
     where
         F: FnOnce(SpawnArg<P>) -> Fut + Send + 'static,
         Fut: future::Future<Output = ()> + Send + 'static,
@@ -832,15 +808,6 @@ mod tests {
         tasks: HashSet<tokio::task::Id>,
     }
 
-    #[derive(Debug, Clone, Default)]
-    struct State(Arc<AtomicRefCell<Inner>>);
-
-    impl Reset for State {
-        fn reset(&mut self) {
-            *self.0.borrow_mut() = Default::default();
-        }
-    }
-
     #[derive(Debug, Default)]
     struct Global {
         // the "database" of entity values
@@ -849,36 +816,64 @@ mod tests {
         data: HashMap<u64, u128>,
         log: HashMap<u64, Vec<(Event, Instant)>>,
     }
 
-    struct Counters;
-    impl Params for Counters {
-        type EntityId = u64;
+    #[derive(Debug, Clone)]
+    struct CounterState {
+        id: u64,
+        global: Arc<Mutex<Global>>,
+        value: Arc<AtomicRefCell<Inner>>,
+    }
+    impl EntityState for CounterState {
+        type Id = u64;
         type GlobalState = Arc<Mutex<Global>>;
-        type EntityState = State;
-        async fn on_shutdown(entity: entity_actor::State<Self>, _cause: ShutdownCause) {
-            let state = entity.state.0.borrow();
-            let mut global = entity.global.lock().unwrap();
+        fn global(&self) -> &Self::GlobalState {
+            &self.global
+        }
+        fn id(&self) -> &Self::Id {
+            &self.id
+        }
+        async fn on_shutdown(&self, _cause: ShutdownCause) {
+            let state = self.value.borrow();
+            let mut global = self.global.lock().unwrap();
             assert_eq!(state.tasks.len(), 1);
             // persist the state
             if let Some(value) = state.value {
-                global.data.insert(entity.id, value);
+                global.data.insert(self.id, value);
             }
             // log the shutdown event
             global
                 .log
-                .entry(entity.id)
+                .entry(self.id)
                 .or_default()
                 .push((Event::Shutdown, Instant::now()));
         }
+
+        fn new(id: &Self::Id, global: &Self::GlobalState) -> Self {
+            Self {
+                id: *id,
+                global: global.clone(),
+                value: Default::default(),
+            }
+        }
+
+        fn reset(&mut self, id: &Self::Id, global: &Self::GlobalState) {
+            *self.value.borrow_mut() = Inner::default();
+            self.id = *id;
+            self.global = global.clone();
+        }
+
+        fn ref_count(&self) -> usize {
+            Arc::strong_count(&self.value)
+        }
     }
 
     pub struct MemDb {
-        m: EntityManager<Counters>,
+        m: EntityManager<CounterState>,
         global: Arc<Mutex<Global>>,
     }
 
-    impl entity_actor::State<Counters> {
+    impl CounterState {
         async fn with_value(&self, f: impl FnOnce(&mut u128)) -> Result<(), &'static str> {
-            let mut state = self.state.0.borrow_mut();
+            let mut state = self.value.borrow_mut();
             // lazily load the data from the database
             if state.value.is_none() {
                 let mut global = self.global.lock().unwrap();
@@ -886,7 +881,7 @@ mod tests {
                 // log the wakeup event
                 global
                     .log
-                    .entry(self.id)
+                    .entry(*self.id())
                     .or_default()
                     .push((Event::Wakeup, Instant::now()));
             }
@@ -905,7 +900,7 @@ mod tests {
             let global = Arc::new(Mutex::new(Global::default()));
             Self {
                 global: global.clone(),
-                m: EntityManager::<Counters>::new(global, Options::default()),
+                m: EntityManager::<CounterState>::new(global, Options::default()),
             }
         }
     }
@@ -959,10 +954,11 @@ mod tests {
         assert_eq!(global.data, values, "Data mismatch");
         for id in values.keys() {
             let log = global.log.get(id).unwrap();
-            assert!(
-                log.len() % 2 == 0,
-                "Log must contain alternating wakeup and shutdown events"
-            );
+            if log.len() % 2 != 0 {
+                panic!(
+                    "Log for entity {id} must contain an even number of events.\n{log:#?}"
+                );
+            }
             for (i, (event, _)) in log.iter().enumerate() {
                 assert_eq!(
                     *event,
@@ -986,7 +982,7 @@ mod tests {
     /// simulate it by just not spawning the task as we are supposed to.
     #[tokio::test]
     async fn test_busy() -> TestResult<()> {
-        let mut state = EntityManagerState::<Counters>::new(
+        let mut state = EntityManagerState::<CounterState>::new(
             Arc::new(Mutex::new(Global::default())),
             1024,
             8,
@@ -998,7 +994,7 @@ mod tests {
         let inc = || {
             let active = active.clone();
             let busy = busy.clone();
-            |arg: SpawnArg<Counters>| async move {
+            |arg: SpawnArg<CounterState>| async move {
                 match arg {
                     SpawnArg::Active(_) => {
                         active.fetch_add(1, Ordering::SeqCst);
@@ -1036,12 +1032,12 @@ mod tests {
     /// a SpawnArg::Dead.
     #[tokio::test]
     async fn test_dead() -> TestResult<()> {
-        let manager = EntityManager::<Counters>::new(
+        let manager = EntityManager::<CounterState>::new(
             Arc::new(Mutex::new(Global::default())),
             Options::default(),
         );
         let (tx, rx) = oneshot::channel();
-        let killer = |arg: SpawnArg<Counters>| async move {
+        let killer = |arg: SpawnArg<CounterState>| async move {
             if let SpawnArg::Active(_) = arg {
                 tx.send(()).ok();
                 panic!("Panic to kill the task");
@@ -1051,7 +1047,7 @@ mod tests {
         manager.spawn(1, killer).await?;
         rx.await.expect("Failed to receive kill confirmation");
         let (tx, rx) = oneshot::channel();
-        let counter = |arg: SpawnArg<Counters>| async move {
+        let counter = |arg: SpawnArg<CounterState>| async move {
             if let SpawnArg::Dead = arg {
                 tx.send(()).ok();
             }
@@ -1088,25 +1084,16 @@ mod tests {
         log: HashMap<u64, Vec<(Event, Instant)>>,
     }
 
-    #[derive(Debug, Clone, Default)]
-    struct EntityState(Arc<AtomicRefCell<Option<u128>>>);
-
-    impl Reset for EntityState {
-        fn reset(&mut self) {
-            *self.0.borrow_mut() = Default::default();
-        }
-    }
-
     fn get_path(root: impl AsRef<Path>, id: u64) -> PathBuf {
         root.as_ref().join(hex::encode(id.to_be_bytes()))
     }
 
-    impl entity_actor::State<Counters> {
+    impl CounterState {
         async fn with_value(&self, f: impl FnOnce(&mut u128)) -> Result<(), &'static str> {
-            let Ok(mut r) = self.state.0.try_borrow_mut() else {
+            let Ok(mut r) = self.value.try_borrow_mut() else {
                 panic!("failed to borrow state mutably");
             };
-            if r.value.is_none() {
+            if r.is_none() {
                 let mut global = self.global.lock().unwrap();
                 global
                     .log
@@ -1129,9 +1116,9 @@ mod tests {
                 let value = u128::from_be_bytes(
                     value.try_into().map_err(|_| "Invalid disk state format")?,
                 );
-                r.value = Some(value);
+                *r = Some(value);
             }
-            let Some(value) = r.value.as_mut() else {
+            let Some(value) = r.as_mut() else {
                 panic!("State must be Memory at this point");
             };
             f(value);
@@ -1139,22 +1126,46 @@ mod tests {
         }
     }
 
-    struct Counters;
-    impl Params for Counters {
-        type EntityId = u64;
+    #[derive(Debug, Clone)]
+    struct CounterState {
+        id: u64,
+        global: Arc<Mutex<Global>>,
+        value: Arc<AtomicRefCell<Option<u128>>>,
+    }
+    impl EntityState for CounterState {
+        type Id = u64;
         type GlobalState = Arc<Mutex<Global>>;
-        type EntityState = EntityState;
-        async fn on_shutdown(state: entity_actor::State<Self>, _cause: ShutdownCause) {
-            let r = state.state.0.borrow();
-            let mut global = state.global.lock().unwrap();
-            if let Some(value) = r.value {
-                let path = get_path(&global.path, state.id);
+        fn id(&self) -> &Self::Id {
+            &self.id
+        }
+        fn global(&self) -> &Self::GlobalState {
+            &self.global
+        }
+        fn ref_count(&self) -> usize {
+            Arc::strong_count(&self.value)
+        }
+        fn new(id: &Self::Id, global: &Self::GlobalState) -> Self {
+            Self {
+                id: *id,
+                global: global.clone(),
+                value: Arc::new(AtomicRefCell::new(None)),
+            }
+        }
+        fn reset(&mut self, id: &Self::Id, global: &Self::GlobalState) {
+            self.value.borrow_mut().take();
+            self.id = *id;
+            self.global = global.clone();
+        }
+        async fn on_shutdown(&self, _cause: ShutdownCause) {
+            let mut global = self.global.lock().unwrap();
+            if let Some(value) = self.value.borrow_mut().take() {
+                let path = get_path(&global.path, self.id);
                 let value_bytes = value.to_be_bytes();
                 std::fs::write(&path, value_bytes).expect("Failed to write disk state");
             }
             global
                 .log
-                .entry(state.id)
+                .entry(self.id)
                 .or_default()
                 .push((Event::Shutdown, Instant::now()));
         }
@@ -1162,7 +1173,7 @@ mod tests {
 
     pub struct FsDb {
         global: Arc<Mutex<Global>>,
-        m: EntityManager<Counters>,
+        m: EntityManager<CounterState>,
     }
 
     impl FsDb {
@@ -1174,7 +1185,7 @@ mod tests {
             let global = Arc::new(Mutex::new(global));
             Self {
                 global: global.clone(),
-                m: EntityManager::<Counters>::new(global, Options::default()),
+                m: EntityManager::<CounterState>::new(global, Options::default()),
             }
         }
     }
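With `Params` and `Reset` collapsed into the single `EntityState` trait, a spawned callback now receives the per-entity state `P` directly instead of an `ActiveEntityState<P>` wrapper. One way a caller might drive the refactored API, reusing `CounterState` and its `with_value` helper from the tests above (the `add` function itself is a hypothetical example, not part of the crate):

    async fn add(
        m: &EntityManager<CounterState>,
        id: u64,
        delta: u128,
    ) -> Result<(), &'static str> {
        let (tx, rx) = tokio::sync::oneshot::channel();
        m.spawn(id, move |arg| async move {
            match arg {
                // the state value itself, no wrapper struct anymore
                SpawnArg::Active(state) => {
                    let res = state.with_value(|v| *v += delta).await;
                    tx.send(res).ok();
                }
                // inbox full, the callback was never scheduled
                SpawnArg::Busy => {
                    tx.send(Err("busy")).ok();
                }
                // the manager has already shut down
                SpawnArg::Dead => {
                    tx.send(Err("dead")).ok();
                }
            }
        })
        .await?;
        rx.await.map_err(|_| "entity actor dropped the response")?
    }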
diff --git a/src/store/mem.rs b/src/store/mem.rs
index 083e95f2..6d022e0f 100644
--- a/src/store/mem.rs
+++ b/src/store/mem.rs
@@ -51,7 +51,7 @@ use crate::{
             ImportByteStreamMsg, ImportByteStreamUpdate, ImportBytesMsg, ImportBytesRequest,
             ImportPathMsg, ImportPathRequest, ListBlobsMsg, ListTagsMsg, ListTagsRequest,
             ObserveMsg, ObserveRequest, RenameTagMsg, RenameTagRequest, Scope, SetTagMsg,
-            SetTagRequest, ShutdownMsg, SyncDbMsg,
+            SetTagRequest, ShutdownMsg, SyncDbMsg, WaitIdleMsg,
         },
         tags::TagInfo,
         ApiClient,
@@ -122,6 +122,7 @@ impl MemStore {
                 options: Arc::new(Options::default()),
                 temp_tags: Default::default(),
                 protected: Default::default(),
+                idle_waiters: Default::default(),
             }
             .run(),
         );
@@ -137,6 +138,8 @@ struct Actor {
     options: Arc<Options>,
     // temp tags
     temp_tags: TempTags,
+    // idle waiters
+    idle_waiters: Vec<irpc::channel::oneshot::Sender<()>>,
     protected: HashSet<Hash>,
 }
 
@@ -162,6 +165,16 @@ impl Actor {
                 let entry = self.get_or_create_entry(hash);
                 self.spawn(import_bao(entry, size, data, tx));
             }
+            Command::WaitIdle(WaitIdleMsg { tx, .. }) => {
+                trace!("wait idle");
+                if self.tasks.is_empty() {
+                    // we are currently idle
+                    tx.send(()).await.ok();
+                } else {
+                    // wait for idle state
+                    self.idle_waiters.push(tx);
+                }
+            }
             Command::Observe(ObserveMsg {
                 inner: ObserveRequest { hash },
                 tx,
@@ -485,6 +498,12 @@ impl Actor {
                     }
                     TaskResult::Unit(_) => {}
                 }
+                if self.tasks.is_empty() {
+                    // we are idle now
+                    for tx in self.idle_waiters.drain(..) {
+                        tx.send(()).await.ok();
+                    }
+                }
             }
         }
     };
diff --git a/src/store/readonly_mem.rs b/src/store/readonly_mem.rs
index 55ef3693..42274b2e 100644
--- a/src/store/readonly_mem.rs
+++ b/src/store/readonly_mem.rs
@@ -37,7 +37,7 @@ use crate::{
             self, BlobStatus, Command, ExportBaoMsg, ExportBaoRequest, ExportPathMsg,
             ExportPathRequest, ExportRangesItem, ExportRangesMsg, ExportRangesRequest,
             ImportBaoMsg, ImportByteStreamMsg, ImportBytesMsg, ImportPathMsg, ObserveMsg,
-            ObserveRequest,
+            ObserveRequest, WaitIdleMsg,
         },
         ApiClient, TempTag,
     },
@@ -62,6 +62,7 @@ impl Deref for ReadonlyMemStore {
 struct Actor {
     commands: tokio::sync::mpsc::Receiver<Command>,
     tasks: JoinSet<()>,
+    idle_waiters: Vec<irpc::channel::oneshot::Sender<()>>,
     data: HashMap<Hash, Bytes>,
 }
 
@@ -74,6 +75,7 @@ impl Actor {
             data,
             commands,
             tasks: JoinSet::new(),
+            idle_waiters: Vec::new(),
         }
     }
 
@@ -86,6 +88,15 @@ impl Actor {
                     .await
                     .ok();
             }
+            Command::WaitIdle(WaitIdleMsg { tx, .. }) => {
+                if self.tasks.is_empty() {
+                    // we are currently idle
+                    tx.send(()).await.ok();
+                } else {
+                    // wait for idle state
+                    self.idle_waiters.push(tx);
+                }
+            }
             Command::ImportBytes(ImportBytesMsg { tx, .. }) => {
                 tx.send(io::Error::other("import not supported").into())
                     .await
@@ -226,6 +237,12 @@ impl Actor {
                 },
                 Some(res) = self.tasks.join_next(), if !self.tasks.is_empty() => {
                     self.log_unit_task(res);
+                    if self.tasks.is_empty() {
+                        // we are idle now
+                        for tx in self.idle_waiters.drain(..) {
+                            tx.send(()).await.ok();
+                        }
+                    }
                 },
                 else => break,
             }
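Both in-memory actors now answer `WaitIdle` either immediately (empty task set) or once the task set drains, which is what makes the `store.wait_idle().await?` in the gc test above deterministic where the old 100 ms sleep was a race. A sketch of the intended test-side usage, with the same caveat as the API docs that idleness is only a point-in-time fact; the test body here is hypothetical:

    #[tokio::test]
    async fn flush_then_inspect() -> testresult::TestResult<()> {
        let td = tempfile::tempdir()?;
        let store = crate::store::fs::FsStore::load(td.path()).await?;
        let tag = store.add_bytes(vec![0u8; 1024 * 16]).await?;
        // resolves once there was a moment with no in-flight store tasks;
        // a concurrent writer could keep this waiting indefinitely
        store.wait_idle().await?;
        // only now is it sound to assert on on-disk artifacts for `tag.hash`,
        // e.g. the data/outboard paths checked in the gc test
        let _ = tag.hash;
        Ok(())
    }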
diff --git a/src/util.rs b/src/util.rs
index e1c30921..7b9ad4e6 100644
--- a/src/util.rs
+++ b/src/util.rs
@@ -472,6 +472,7 @@ pub mod sink {
         }
     }
 
+    #[allow(dead_code)]
     pub struct IrpcSenderSink<T>(pub irpc::channel::mpsc::Sender<T>);
 
     impl<T> Sink<T> for IrpcSenderSink<T>