diff --git a/wnfs-wasm/src/fs/public/file.rs b/wnfs-wasm/src/fs/public/file.rs index 0d4d28a1..ee84391a 100644 --- a/wnfs-wasm/src/fs/public/file.rs +++ b/wnfs-wasm/src/fs/public/file.rs @@ -166,7 +166,8 @@ impl PublicFile { let time = DateTime::::from(time); Ok(future_to_promise(async move { - file.set_content(time, content, &store) + file.prepare_next_revision() + .set_content(content, time, &store) .await .map_err(error("Cannot set file content"))?; diff --git a/wnfs/examples/tiered_blockstores.rs b/wnfs/examples/tiered_blockstores.rs index 4cfab8d4..4839e940 100644 --- a/wnfs/examples/tiered_blockstores.rs +++ b/wnfs/examples/tiered_blockstores.rs @@ -49,7 +49,7 @@ async fn main() -> Result<()> { // `set_content` actually writes the data blocks to the blockstore in chunks, // so for this we provide the `cold_store`. - file.set_content(Utc::now(), &video[..], forest, &cold_store, rng) + file.set_content(&video[..], Utc::now(), forest, &cold_store, rng) .await?; // When storing the hierarchy data blocks, we use the `hot_store`: diff --git a/wnfs/proptest-regressions/public/directory.txt b/wnfs/proptest-regressions/public/directory.txt index 5aa948a5..b0eafee9 100644 --- a/wnfs/proptest-regressions/public/directory.txt +++ b/wnfs/proptest-regressions/public/directory.txt @@ -9,3 +9,4 @@ cc 52d317f93f0d815bfd054e6147b32492abc79b100274458f3fc266d1d9f40083 # shrinks to cc 5d512e34a6b76473ff418d6cc7730003875ae30727a3155b2abc13d5f8313b58 # shrinks to input = _TestMergeCommutativityArgs { fs0: FileSystem { files: {}, dirs: {} }, fs1: FileSystem { files: {["a"]: "a"}, dirs: {["a"]} } } cc d4c4529fd972a2a6af4dcecd28a289d11451203600ae18e001dbdd42fe19e245 # shrinks to input = _TestMergeCommutativityArgs { fs0: FileSystem { files: {["b"]: "a", ["b", "a"]: "a"}, dirs: {} }, fs1: FileSystem { files: {}, dirs: {} } } cc e5c61f6ac3dec61974eedf0a7042fd1f801efa9f020e4b37473d5a11a7a7a7a4 # shrinks to input = _TestMergeAssociativityArgs { fs0: FileSystem { files: {}, dirs: {["e", "b"]} }, fs1: FileSystem { files: {}, dirs: {["e", "b"]} }, fs2: FileSystem { files: {}, dirs: {["e", "b"]} } } +cc 1384a02358a0d16afcc3908434eb01fd04cb681d3acc0795fdf0254243d365ec # shrinks to input = _TestMergeCommutativityArgs { fs0: FileSystem { files: {["f", "a"]: ("a", "e")}, dirs: {} }, fs1: FileSystem { files: {["f", "a"]: ("b", "e")}, dirs: {} } } diff --git a/wnfs/src/private/directory.rs b/wnfs/src/private/directory.rs index f15a6b5a..171d4246 100644 --- a/wnfs/src/private/directory.rs +++ b/wnfs/src/private/directory.rs @@ -616,7 +616,7 @@ impl PrivateDirectory { /// // Define the content that will replace what is already in the file /// let new_file_content = b"print('hello world 2')"; /// // Set the contents of the file, waiting for result and expecting no errors - /// file.set_content(Utc::now(), &new_file_content[..], forest, store, rng) + /// file.set_content(&new_file_content[..], Utc::now(), forest, store, rng) /// .await?; /// // Read the file again /// let result = root_dir.read(hello_py, true, forest, store).await?; diff --git a/wnfs/src/private/file.rs b/wnfs/src/private/file.rs index 1f707b54..af144696 100644 --- a/wnfs/src/private/file.rs +++ b/wnfs/src/private/file.rs @@ -680,8 +680,8 @@ impl PrivateFile { /// Sets the content of a file. pub async fn set_content( &mut self, - time: DateTime, content: impl AsyncRead + Unpin, + time: DateTime, forest: &mut impl PrivateForest, store: &impl BlockStore, rng: &mut impl CryptoRngCore, @@ -1371,8 +1371,8 @@ mod proptests { let mut file = PrivateFile::new(&forest.empty_name(), Utc::now(), rng); file.set_content( - Utc::now(), &mut Cursor::new(vec![5u8; length]), + Utc::now(), forest, &MemoryBlockStore::default(), rng, diff --git a/wnfs/src/public/directory.rs b/wnfs/src/public/directory.rs index 17e5a7fb..9bdbf68e 100644 --- a/wnfs/src/public/directory.rs +++ b/wnfs/src/public/directory.rs @@ -510,7 +510,11 @@ impl PublicDirectory { let dir = self.get_or_create_leaf_dir_mut(path, time, store).await?; match dir.lookup_node_mut(filename, store).await? { - Some(PublicNode::File(file)) => file.set_content(time, content, store).await?, + Some(PublicNode::File(file)) => { + file.prepare_next_revision() + .set_content(content, time, store) + .await? + } Some(PublicNode::Dir(_)) => bail!(FsError::DirectoryAlreadyExists), None => { dir.userland.insert( @@ -932,41 +936,20 @@ impl PublicDirectory { } dir.metadata.tie_break_with(&other.metadata)?; - for (name, link) in other.userland.iter() { - let other_node = link.resolve_value(store).await?; + for (name, other_link) in other.userland.iter() { + let other_node = other_link.resolve_value(store).await?; match dir.userland.entry(name.clone()) { Entry::Vacant(vacant) => { - vacant.insert(PublicLink::new(other_node.clone())); + vacant.insert(other_link.clone()); } Entry::Occupied(mut occupied) => { let our_node = occupied.get_mut().resolve_value_mut(store).await?; match (our_node, other_node) { (PublicNode::File(our_file), PublicNode::File(other_file)) => { - let our_cid = our_file.store(store).await?; - let other_cid = other_file.store(store).await?; - if our_cid == other_cid { - continue; // No need to merge, the files are equal - } - - let mut path = current_path.to_vec(); - path.push(name.clone()); - file_tie_breaks.insert(path); - - let our_content_cid = our_file.userland.resolve_cid(store).await?; - let other_content_cid = other_file.userland.resolve_cid(store).await?; - - let file = our_file.prepare_next_merge(store).await?; - if other_file.previous.len() > 1 { - // The other node is a merge node, we should merge the merge nodes directly: - file.previous.extend(other_file.previous.iter().cloned()); - } else { - // The other node is a 'normal' node - we need to merge it normally - file.previous.insert(other_file.store(store).await?); - } - - if our_content_cid.hash().digest() > other_content_cid.hash().digest() { - file.userland = other_file.userland.clone(); - file.metadata = other_file.metadata.clone(); + if our_file.merge(other_file, store).await? { + let mut path = current_path.to_vec(); + path.push(name.clone()); + file_tie_breaks.insert(path); } } (node @ PublicNode::File(_), PublicNode::Dir(other_dir)) => { @@ -1477,31 +1460,41 @@ mod tests { #[cfg(test)] mod proptests { use super::*; + use libipld_core::ipld::Ipld; use proptest::{ - collection::{btree_map, btree_set, vec}, + collection::{btree_map, vec}, prelude::*, }; use test_strategy::proptest; use wnfs_common::MemoryBlockStore; + type MockMetadata = String; + #[derive(Debug, Clone)] struct FileSystem { - files: BTreeMap, String>, - dirs: BTreeSet>, + files: BTreeMap, (MockMetadata, String)>, + dirs: BTreeMap, MockMetadata>, } fn file_system() -> impl Strategy { ( + btree_map( + vec(simple_string(), 1..10), + (simple_string(), simple_string()), + // we generate a lot more file paths than directory paths + // since file paths get filtered out and lose over directory paths + 0..100, + ), btree_map(vec(simple_string(), 1..10), simple_string(), 0..40), - btree_set(vec(simple_string(), 1..10), 0..40), ) .prop_map(|(mut files, dirs)| { files = files .into_iter() .filter(|(file_path, _)| { + // We filter out file paths that are prefixes of directory paths in advance !dirs .iter() - .any(|dir_path| !dir_path.starts_with(&file_path)) + .any(|(dir_path, _)| dir_path.starts_with(&file_path)) }) .collect(); FileSystem { files, dirs } @@ -1515,13 +1508,15 @@ mod proptests { fn valid_fs(fs: &FileSystem) -> bool { fs.files.iter().all(|(file_path, _)| { + // File paths must not be prefixes of directory paths !fs.dirs .iter() - .any(|dir_path| dir_path.starts_with(&file_path)) + .any(|(dir_path, _)| dir_path.starts_with(&file_path)) + // file paths must not be prefixes of other file paths && !fs .files .iter() - .any(|(other_path, _)| other_path.starts_with(&file_path)) + .any(|(other_path, _)| file_path != other_path && other_path.starts_with(&file_path)) }) } @@ -1532,13 +1527,29 @@ mod proptests { ) -> Result> { let mut dir = PublicDirectory::new_rc(time); let FileSystem { files, dirs } = fs; - for (path, content) in files.iter() { - dir.write(&path, content.clone().into_bytes(), time, store) - .await?; + for (path, (metadata, content)) in files.into_iter() { + let file = dir.open_file_mut(&path, time, store).await?; + file.set_content(content.into_bytes(), time, store).await?; + file.get_metadata_mut() + .put("test-meta", Ipld::String(metadata)); } - for path in dirs.iter() { - dir.mkdir(&path, time, store).await?; + for (path, metadata) in dirs.into_iter() { + let (path, filename) = utils::split_last(&path)?; + // There's currently no API for setting metadata on a directory :S + dir.get_or_create_leaf_dir_mut(path, time, store) + .await? + .userland + .entry(filename.clone()) + // Create a file, if it doesn't exist yet + .or_insert_with(|| PublicLink::with_dir(PublicDirectory::new(time))) + // Get a mutable ref out of the directory entry + .resolve_value_mut(store) + .await? + .as_dir_mut()? + .prepare_next_revision() + .get_metadata_mut() + .put("test-meta", Ipld::String(metadata)); } Ok(dir) @@ -1640,8 +1651,8 @@ mod proptests { let store = &MemoryBlockStore::new(); let time = Utc::now(); - let mut all_dirs = fs0.dirs.clone(); - all_dirs.extend(fs1.dirs.iter().cloned()); + let mut all_dirs = fs0.dirs.keys().cloned().collect::>(); + all_dirs.extend(fs1.dirs.keys().cloned()); let mut root = convert_fs(fs0, time, store).await.unwrap(); let root1 = convert_fs(fs1, time, store).await.unwrap(); diff --git a/wnfs/src/public/file.rs b/wnfs/src/public/file.rs index cdfcada4..2c37e612 100644 --- a/wnfs/src/public/file.rs +++ b/wnfs/src/public/file.rs @@ -7,7 +7,7 @@ use async_once_cell::OnceCell; use chrono::{DateTime, Utc}; use futures::{AsyncRead, AsyncReadExt}; use libipld_core::cid::Cid; -use std::{collections::BTreeSet, io::SeekFrom}; +use std::{cmp::Ordering, collections::BTreeSet, io::SeekFrom}; use tokio::io::AsyncSeekExt; use tokio_util::compat::{FuturesAsyncReadCompatExt, TokioAsyncReadCompatExt}; use wnfs_common::{ @@ -390,10 +390,10 @@ impl PublicFile { } /// Takes care of creating previous links, in case the current - /// directory was previously `.store()`ed. - /// In any case it'll try to give you ownership of the directory if possible, + /// file was previously `.store()`ed. + /// In any case it'll try to give you ownership of the file if possible, /// otherwise it clones. - pub(crate) fn prepare_next_revision<'a>(self: &'a mut Arc) -> &'a mut Self { + pub fn prepare_next_revision<'a>(self: &'a mut Arc) -> &'a mut Self { let Some(previous_cid) = self.persisted_as.get().cloned() else { return Arc::make_mut(self); }; @@ -434,9 +434,9 @@ impl PublicFile { /// Writes a new content cid to the file. /// This will create a new revision of the file. pub async fn set_content( - self: &mut Arc, - time: DateTime, + &mut self, content: Vec, + time: DateTime, store: &impl BlockStore, ) -> Result<()> { let content_cid = FileBuilder::new() @@ -445,9 +445,8 @@ impl PublicFile { .store(store) .await?; - let file = self.prepare_next_revision(); - file.metadata.upsert_mtime(time); - file.userland = Link::from_cid(content_cid); + self.metadata.upsert_mtime(time); + self.userland = Link::from_cid(content_cid); Ok(()) } @@ -488,6 +487,62 @@ impl PublicFile { pub fn get_metadata_mut_rc<'a>(self: &'a mut Arc) -> &'a mut Metadata { self.prepare_next_revision().get_metadata_mut() } + + /// Runs the merge part of the conflict reconciliation algorithm on this + /// file together with the other file. + /// + /// Don't call this function, unless you know what you're doing. Prefer + /// calling `PrivateDirectory::reconcile` instead. + /// + /// This function is commutative and associative. + /// + /// Both `self` and `other` will be serialized to given blockstore when calling + /// this function. + /// + /// The return value indicates whether tie-breaking was necessary or not. + pub async fn merge( + self: &mut Arc, + other: &Arc, + store: &impl BlockStore, + ) -> Result { + let our_cid = self.store(store).await?; + let other_cid = other.store(store).await?; + if our_cid == other_cid { + return Ok(false); // No need to merge, the files are equal + } + + let our_content_cid = self.userland.resolve_cid(store).await?; + let other_content_cid = other.userland.resolve_cid(store).await?; + + let file = self.prepare_next_merge(store).await?; + if other.previous.len() > 1 { + // The other node is a merge node, we should merge the merge nodes directly: + file.previous.extend(other.previous.iter().cloned()); + } else { + // The other node is a 'normal' node - we need to merge it normally + file.previous.insert(other.store(store).await?); + } + + match our_content_cid + .hash() + .digest() + .cmp(other_content_cid.hash().digest()) + { + Ordering::Greater => { + file.userland.clone_from(&other.userland); + file.metadata.clone_from(&other.metadata); + } + Ordering::Equal => { + file.metadata.tie_break_with(&other.metadata)?; + } + Ordering::Less => { + // We take ours + } + } + + // Returning true to indicate that we needed to tie-break + Ok(true) + } } impl Storable for PublicFile { @@ -629,7 +684,8 @@ mod snapshot_tests { let file = &mut PublicFile::new_rc(time); let _ = file.store(store).await?; - file.set_content(time, b"Hello, World!".to_vec(), store) + let file = file.prepare_next_revision(); + file.set_content(b"Hello, World!".to_vec(), time, store) .await?; let cid = file.store(store).await?; diff --git a/wnfs/src/public/node/node.rs b/wnfs/src/public/node/node.rs index 9e98ed42..79da130e 100644 --- a/wnfs/src/public/node/node.rs +++ b/wnfs/src/public/node/node.rs @@ -413,7 +413,6 @@ mod tests { #[cfg(test)] mod proptests { use super::*; - use chrono::NaiveDateTime; use futures::{stream, StreamExt, TryStreamExt}; use proptest::{collection::vec, prelude::*}; use test_strategy::proptest; @@ -441,11 +440,7 @@ mod proptests { } fn time(n: i64) -> DateTime { - DateTime::from_naive_utc_and_offset( - // convert into seconds, otherwise 0 and 1 would both be mapped to "0 seconds" - NaiveDateTime::from_timestamp_millis(n * 1000).unwrap(), - Utc, - ) + DateTime::::from_timestamp(n, 0).unwrap() } pub fn get_head(&self, n: usize) -> &Arc {