fix: Public conflict reconciliation edge case on files with equal content but different metadata (#430)

* chore: Improve public dir proptests to cover metadata

* fix: generate better file system cases in public dir proptests

* fix: public directory merge commutativity test case

The failing case had two files with identical content but different metadata
merging non-commutatively (a minimal sketch of the required tie-break follows
the changed-files summary below).

* refactor: Extract out `PublicFile::merge` from `PublicDirectory::merge_helper`

* chore: Avoid deprecated chrono functions

* fix: `File::set_content` parameter order
matheus23 authored Apr 23, 2024
1 parent dc0785a commit a0bf23c
Showing 8 changed files with 128 additions and 64 deletions.
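To make the fixed edge case concrete: when two replicas hold files with byte-identical content but different metadata, the merge has to pick one side deterministically, otherwise `merge(a, b)` and `merge(b, a)` disagree. Below is a minimal, self-contained sketch of that requirement (mock types only, not the wnfs `PublicFile`/`Metadata` API).

```rust
use std::cmp::Ordering;

#[derive(Clone, Debug, PartialEq)]
struct MockFile {
    content_hash: [u8; 32], // stand-in for the content CID digest
    metadata: String,       // stand-in for the wnfs metadata map
}

fn merge(ours: &MockFile, theirs: &MockFile) -> MockFile {
    match ours.content_hash.cmp(&theirs.content_hash) {
        // Different content: the smaller digest wins outright.
        Ordering::Less => ours.clone(),
        Ordering::Greater => theirs.clone(),
        // Equal content: without a metadata tie-break the merge would just
        // keep `ours`, and the result would depend on argument order.
        Ordering::Equal => MockFile {
            content_hash: ours.content_hash,
            metadata: ours.metadata.clone().min(theirs.metadata.clone()),
        },
    }
}

fn main() {
    let a = MockFile { content_hash: [7; 32], metadata: "a".into() };
    let b = MockFile { content_hash: [7; 32], metadata: "b".into() };
    // Same result regardless of order: the property the merge proptests check.
    assert_eq!(merge(&a, &b), merge(&b, &a));
}
```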
3 changes: 2 additions & 1 deletion wnfs-wasm/src/fs/public/file.rs
@@ -166,7 +166,8 @@ impl PublicFile {
let time = DateTime::<Utc>::from(time);

Ok(future_to_promise(async move {
file.set_content(time, content, &store)
file.prepare_next_revision()
.set_content(content, time, &store)
.await
.map_err(error("Cannot set file content"))?;

2 changes: 1 addition & 1 deletion wnfs/examples/tiered_blockstores.rs
@@ -49,7 +49,7 @@ async fn main() -> Result<()> {

// `set_content` actually writes the data blocks to the blockstore in chunks,
// so for this we provide the `cold_store`.
file.set_content(Utc::now(), &video[..], forest, &cold_store, rng)
file.set_content(&video[..], Utc::now(), forest, &cold_store, rng)
.await?;

// When storing the hierarchy data blocks, we use the `hot_store`:
1 change: 1 addition & 0 deletions wnfs/proptest-regressions/public/directory.txt
@@ -9,3 +9,4 @@ cc 52d317f93f0d815bfd054e6147b32492abc79b100274458f3fc266d1d9f40083 # shrinks to
cc 5d512e34a6b76473ff418d6cc7730003875ae30727a3155b2abc13d5f8313b58 # shrinks to input = _TestMergeCommutativityArgs { fs0: FileSystem { files: {}, dirs: {} }, fs1: FileSystem { files: {["a"]: "a"}, dirs: {["a"]} } }
cc d4c4529fd972a2a6af4dcecd28a289d11451203600ae18e001dbdd42fe19e245 # shrinks to input = _TestMergeCommutativityArgs { fs0: FileSystem { files: {["b"]: "a", ["b", "a"]: "a"}, dirs: {} }, fs1: FileSystem { files: {}, dirs: {} } }
cc e5c61f6ac3dec61974eedf0a7042fd1f801efa9f020e4b37473d5a11a7a7a7a4 # shrinks to input = _TestMergeAssociativityArgs { fs0: FileSystem { files: {}, dirs: {["e", "b"]} }, fs1: FileSystem { files: {}, dirs: {["e", "b"]} }, fs2: FileSystem { files: {}, dirs: {["e", "b"]} } }
cc 1384a02358a0d16afcc3908434eb01fd04cb681d3acc0795fdf0254243d365ec # shrinks to input = _TestMergeCommutativityArgs { fs0: FileSystem { files: {["f", "a"]: ("a", "e")}, dirs: {} }, fs1: FileSystem { files: {["f", "a"]: ("b", "e")}, dirs: {} } }
2 changes: 1 addition & 1 deletion wnfs/src/private/directory.rs
@@ -616,7 +616,7 @@ impl PrivateDirectory {
/// // Define the content that will replace what is already in the file
/// let new_file_content = b"print('hello world 2')";
/// // Set the contents of the file, waiting for result and expecting no errors
/// file.set_content(Utc::now(), &new_file_content[..], forest, store, rng)
/// file.set_content(&new_file_content[..], Utc::now(), forest, store, rng)
/// .await?;
/// // Read the file again
/// let result = root_dir.read(hello_py, true, forest, store).await?;
4 changes: 2 additions & 2 deletions wnfs/src/private/file.rs
@@ -680,8 +680,8 @@ impl PrivateFile {
/// Sets the content of a file.
pub async fn set_content(
&mut self,
time: DateTime<Utc>,
content: impl AsyncRead + Unpin,
time: DateTime<Utc>,
forest: &mut impl PrivateForest,
store: &impl BlockStore,
rng: &mut impl CryptoRngCore,
@@ -1371,8 +1371,8 @@ mod proptests {
let mut file = PrivateFile::new(&forest.empty_name(), Utc::now(), rng);

file.set_content(
Utc::now(),
&mut Cursor::new(vec![5u8; length]),
Utc::now(),
forest,
&MemoryBlockStore::default(),
rng,
97 changes: 54 additions & 43 deletions wnfs/src/public/directory.rs
@@ -510,7 +510,11 @@ impl PublicDirectory {
let dir = self.get_or_create_leaf_dir_mut(path, time, store).await?;

match dir.lookup_node_mut(filename, store).await? {
Some(PublicNode::File(file)) => file.set_content(time, content, store).await?,
Some(PublicNode::File(file)) => {
file.prepare_next_revision()
.set_content(content, time, store)
.await?
}
Some(PublicNode::Dir(_)) => bail!(FsError::DirectoryAlreadyExists),
None => {
dir.userland.insert(
@@ -932,41 +936,20 @@ impl PublicDirectory {
}
dir.metadata.tie_break_with(&other.metadata)?;

for (name, link) in other.userland.iter() {
let other_node = link.resolve_value(store).await?;
for (name, other_link) in other.userland.iter() {
let other_node = other_link.resolve_value(store).await?;
match dir.userland.entry(name.clone()) {
Entry::Vacant(vacant) => {
vacant.insert(PublicLink::new(other_node.clone()));
vacant.insert(other_link.clone());
}
Entry::Occupied(mut occupied) => {
let our_node = occupied.get_mut().resolve_value_mut(store).await?;
match (our_node, other_node) {
(PublicNode::File(our_file), PublicNode::File(other_file)) => {
let our_cid = our_file.store(store).await?;
let other_cid = other_file.store(store).await?;
if our_cid == other_cid {
continue; // No need to merge, the files are equal
}

let mut path = current_path.to_vec();
path.push(name.clone());
file_tie_breaks.insert(path);

let our_content_cid = our_file.userland.resolve_cid(store).await?;
let other_content_cid = other_file.userland.resolve_cid(store).await?;

let file = our_file.prepare_next_merge(store).await?;
if other_file.previous.len() > 1 {
// The other node is a merge node, we should merge the merge nodes directly:
file.previous.extend(other_file.previous.iter().cloned());
} else {
// The other node is a 'normal' node - we need to merge it normally
file.previous.insert(other_file.store(store).await?);
}

if our_content_cid.hash().digest() > other_content_cid.hash().digest() {
file.userland = other_file.userland.clone();
file.metadata = other_file.metadata.clone();
if our_file.merge(other_file, store).await? {
let mut path = current_path.to_vec();
path.push(name.clone());
file_tie_breaks.insert(path);
}
}
(node @ PublicNode::File(_), PublicNode::Dir(other_dir)) => {
@@ -1477,31 +1460,41 @@ mod tests {
#[cfg(test)]
mod proptests {
use super::*;
use libipld_core::ipld::Ipld;
use proptest::{
collection::{btree_map, btree_set, vec},
collection::{btree_map, vec},
prelude::*,
};
use test_strategy::proptest;
use wnfs_common::MemoryBlockStore;

type MockMetadata = String;

#[derive(Debug, Clone)]
struct FileSystem {
files: BTreeMap<Vec<String>, String>,
dirs: BTreeSet<Vec<String>>,
files: BTreeMap<Vec<String>, (MockMetadata, String)>,
dirs: BTreeMap<Vec<String>, MockMetadata>,
}

fn file_system() -> impl Strategy<Value = FileSystem> {
(
btree_map(
vec(simple_string(), 1..10),
(simple_string(), simple_string()),
// we generate a lot more file paths than directory paths
// since file paths get filtered out and lose over directory paths
0..100,
),
btree_map(vec(simple_string(), 1..10), simple_string(), 0..40),
btree_set(vec(simple_string(), 1..10), 0..40),
)
.prop_map(|(mut files, dirs)| {
files = files
.into_iter()
.filter(|(file_path, _)| {
// We filter out file paths that are prefixes of directory paths in advance
!dirs
.iter()
.any(|dir_path| !dir_path.starts_with(&file_path))
.any(|(dir_path, _)| dir_path.starts_with(&file_path))
})
.collect();
FileSystem { files, dirs }
@@ -1515,13 +1508,15 @@

fn valid_fs(fs: &FileSystem) -> bool {
fs.files.iter().all(|(file_path, _)| {
// File paths must not be prefixes of directory paths
!fs.dirs
.iter()
.any(|dir_path| dir_path.starts_with(&file_path))
.any(|(dir_path, _)| dir_path.starts_with(&file_path))
// file paths must not be prefixes of other file paths
&& !fs
.files
.iter()
.any(|(other_path, _)| other_path.starts_with(&file_path))
.any(|(other_path, _)| file_path != other_path && other_path.starts_with(&file_path))
})
}

@@ -1532,13 +1527,29 @@
) -> Result<Arc<PublicDirectory>> {
let mut dir = PublicDirectory::new_rc(time);
let FileSystem { files, dirs } = fs;
for (path, content) in files.iter() {
dir.write(&path, content.clone().into_bytes(), time, store)
.await?;
for (path, (metadata, content)) in files.into_iter() {
let file = dir.open_file_mut(&path, time, store).await?;
file.set_content(content.into_bytes(), time, store).await?;
file.get_metadata_mut()
.put("test-meta", Ipld::String(metadata));
}

for path in dirs.iter() {
dir.mkdir(&path, time, store).await?;
for (path, metadata) in dirs.into_iter() {
let (path, filename) = utils::split_last(&path)?;
// There's currently no API for setting metadata on a directory :S
dir.get_or_create_leaf_dir_mut(path, time, store)
.await?
.userland
.entry(filename.clone())
// Create a directory, if it doesn't exist yet
.or_insert_with(|| PublicLink::with_dir(PublicDirectory::new(time)))
// Get a mutable ref out of the directory entry
.resolve_value_mut(store)
.await?
.as_dir_mut()?
.prepare_next_revision()
.get_metadata_mut()
.put("test-meta", Ipld::String(metadata));
}

Ok(dir)
@@ -1640,8 +1651,8 @@
let store = &MemoryBlockStore::new();
let time = Utc::now();

let mut all_dirs = fs0.dirs.clone();
all_dirs.extend(fs1.dirs.iter().cloned());
let mut all_dirs = fs0.dirs.keys().cloned().collect::<BTreeSet<_>>();
all_dirs.extend(fs1.dirs.keys().cloned());

let mut root = convert_fs(fs0, time, store).await.unwrap();
let root1 = convert_fs(fs1, time, store).await.unwrap();
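The strategy and `valid_fs` above both hinge on one path rule: a file path must not be a prefix of any directory path (or of another file path), since such a file would shadow everything beneath it. This is the condition whose negation was corrected in the hunk above. A tiny standalone illustration follows, with a hypothetical `conflicts` helper that is not part of wnfs.

```rust
/// Hypothetical helper mirroring the prefix check used by the proptest
/// strategy: a file at `file_path` conflicts with a directory at `dir_path`
/// whenever the file path is a prefix of the directory path.
fn conflicts(file_path: &[&str], dir_path: &[&str]) -> bool {
    dir_path.starts_with(file_path)
}

fn main() {
    // A file at ["f"] would shadow a directory at ["f", "a"], so it is filtered out.
    assert!(conflicts(&["f"], &["f", "a"]));
    // Disjoint paths are fine and survive the filter.
    assert!(!conflicts(&["f"], &["g", "a"]));
}
```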
76 changes: 66 additions & 10 deletions wnfs/src/public/file.rs
@@ -7,7 +7,7 @@ use async_once_cell::OnceCell;
use chrono::{DateTime, Utc};
use futures::{AsyncRead, AsyncReadExt};
use libipld_core::cid::Cid;
use std::{collections::BTreeSet, io::SeekFrom};
use std::{cmp::Ordering, collections::BTreeSet, io::SeekFrom};
use tokio::io::AsyncSeekExt;
use tokio_util::compat::{FuturesAsyncReadCompatExt, TokioAsyncReadCompatExt};
use wnfs_common::{
@@ -390,10 +390,10 @@ impl PublicFile {
}

/// Takes care of creating previous links, in case the current
/// directory was previously `.store()`ed.
/// In any case it'll try to give you ownership of the directory if possible,
/// file was previously `.store()`ed.
/// In any case it'll try to give you ownership of the file if possible,
/// otherwise it clones.
pub(crate) fn prepare_next_revision<'a>(self: &'a mut Arc<Self>) -> &'a mut Self {
pub fn prepare_next_revision<'a>(self: &'a mut Arc<Self>) -> &'a mut Self {
let Some(previous_cid) = self.persisted_as.get().cloned() else {
return Arc::make_mut(self);
};
@@ -434,9 +434,9 @@ impl PublicFile {
/// Writes a new content cid to the file.
/// This will create a new revision of the file.
pub async fn set_content(
self: &mut Arc<Self>,
time: DateTime<Utc>,
&mut self,
content: Vec<u8>,
time: DateTime<Utc>,
store: &impl BlockStore,
) -> Result<()> {
let content_cid = FileBuilder::new()
Expand All @@ -445,9 +445,8 @@ impl PublicFile {
.store(store)
.await?;

let file = self.prepare_next_revision();
file.metadata.upsert_mtime(time);
file.userland = Link::from_cid(content_cid);
self.metadata.upsert_mtime(time);
self.userland = Link::from_cid(content_cid);

Ok(())
}
@@ -488,6 +487,62 @@ impl PublicFile {
pub fn get_metadata_mut_rc<'a>(self: &'a mut Arc<Self>) -> &'a mut Metadata {
self.prepare_next_revision().get_metadata_mut()
}

/// Runs the merge part of the conflict reconciliation algorithm on this
/// file together with the other file.
///
/// Don't call this function unless you know what you're doing. Prefer
/// calling `PublicDirectory::reconcile` instead.
///
/// This function is commutative and associative.
///
/// Both `self` and `other` will be serialized to the given blockstore when
/// calling this function.
///
/// The return value indicates whether tie-breaking was necessary or not.
pub async fn merge(
self: &mut Arc<Self>,
other: &Arc<Self>,
store: &impl BlockStore,
) -> Result<bool> {
let our_cid = self.store(store).await?;
let other_cid = other.store(store).await?;
if our_cid == other_cid {
return Ok(false); // No need to merge, the files are equal
}

let our_content_cid = self.userland.resolve_cid(store).await?;
let other_content_cid = other.userland.resolve_cid(store).await?;

let file = self.prepare_next_merge(store).await?;
if other.previous.len() > 1 {
// The other node is a merge node, we should merge the merge nodes directly:
file.previous.extend(other.previous.iter().cloned());
} else {
// The other node is a 'normal' node - we need to merge it normally
file.previous.insert(other.store(store).await?);
}

match our_content_cid
.hash()
.digest()
.cmp(other_content_cid.hash().digest())
{
Ordering::Greater => {
file.userland.clone_from(&other.userland);
file.metadata.clone_from(&other.metadata);
}
Ordering::Equal => {
file.metadata.tie_break_with(&other.metadata)?;
}
Ordering::Less => {
// We take ours
}
}

// Returning true to indicate that we needed to tie-break
Ok(true)
}
}

impl Storable for PublicFile {
@@ -629,7 +684,8 @@ mod snapshot_tests {
let file = &mut PublicFile::new_rc(time);
let _ = file.store(store).await?;

file.set_content(time, b"Hello, World!".to_vec(), store)
let file = file.prepare_next_revision();
file.set_content(b"Hello, World!".to_vec(), time, store)
.await?;
let cid = file.store(store).await?;

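Pieced together from calls visible in this diff (`new_rc`, the now-public `prepare_next_revision`, the reordered `set_content`, `get_metadata_mut_rc`, and the new `merge`), here is a hedged usage sketch of the equal-content, different-metadata case. The import paths and the tokio runtime are assumptions rather than something taken from this commit, and in practice `merge` is normally reached through directory-level reconciliation rather than called directly.

```rust
use anyhow::Result;
use chrono::Utc;
use libipld_core::ipld::Ipld;
use wnfs::public::PublicFile;
use wnfs_common::MemoryBlockStore;

#[tokio::main]
async fn main() -> Result<()> {
    let store = &MemoryBlockStore::new();
    let time = Utc::now();

    // Two files with byte-identical content...
    let mut ours = PublicFile::new_rc(time);
    ours.prepare_next_revision()
        .set_content(b"same bytes".to_vec(), time, store)
        .await?;
    let mut theirs = PublicFile::new_rc(time);
    theirs
        .prepare_next_revision()
        .set_content(b"same bytes".to_vec(), time, store)
        .await?;

    // ...but diverging metadata, which is the case this commit fixes.
    ours.get_metadata_mut_rc()
        .put("test-meta", Ipld::String("a".into()));
    theirs
        .get_metadata_mut_rc()
        .put("test-meta", Ipld::String("b".into()));

    // `merge` stores both sides and returns true when it had to tie-break.
    let tie_broken = ours.merge(&theirs, store).await?;
    assert!(tie_broken);
    Ok(())
}
```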
7 changes: 1 addition & 6 deletions wnfs/src/public/node/node.rs
@@ -413,7 +413,6 @@ mod tests {
#[cfg(test)]
mod proptests {
use super::*;
use chrono::NaiveDateTime;
use futures::{stream, StreamExt, TryStreamExt};
use proptest::{collection::vec, prelude::*};
use test_strategy::proptest;
@@ -441,11 +440,7 @@
}

fn time(n: i64) -> DateTime<Utc> {
DateTime::from_naive_utc_and_offset(
// convert into seconds, otherwise 0 and 1 would both be mapped to "0 seconds"
NaiveDateTime::from_timestamp_millis(n * 1000).unwrap(),
Utc,
)
DateTime::<Utc>::from_timestamp(n, 0).unwrap()
}

pub fn get_head(&self, n: usize) -> &Arc<PublicDirectory> {
