Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix: Public conflict reconciliation edge case on files with equal content but different metadata #430

Merged
merged 6 commits into from
Apr 23, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion wnfs-wasm/src/fs/public/file.rs
Original file line number Diff line number Diff line change
Expand Up @@ -166,7 +166,8 @@ impl PublicFile {
let time = DateTime::<Utc>::from(time);

Ok(future_to_promise(async move {
file.set_content(time, content, &store)
file.prepare_next_revision()
.set_content(content, time, &store)
.await
.map_err(error("Cannot set file content"))?;

Expand Down
2 changes: 1 addition & 1 deletion wnfs/examples/tiered_blockstores.rs
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,7 @@ async fn main() -> Result<()> {

// `set_content` actually writes the data blocks to the blockstore in chunks,
// so for this we provide the `cold_store`.
file.set_content(Utc::now(), &video[..], forest, &cold_store, rng)
file.set_content(&video[..], Utc::now(), forest, &cold_store, rng)
.await?;

// When storing the hierarchy data blocks, we use the `hot_store`:
Expand Down
1 change: 1 addition & 0 deletions wnfs/proptest-regressions/public/directory.txt
Original file line number Diff line number Diff line change
Expand Up @@ -9,3 +9,4 @@ cc 52d317f93f0d815bfd054e6147b32492abc79b100274458f3fc266d1d9f40083 # shrinks to
cc 5d512e34a6b76473ff418d6cc7730003875ae30727a3155b2abc13d5f8313b58 # shrinks to input = _TestMergeCommutativityArgs { fs0: FileSystem { files: {}, dirs: {} }, fs1: FileSystem { files: {["a"]: "a"}, dirs: {["a"]} } }
cc d4c4529fd972a2a6af4dcecd28a289d11451203600ae18e001dbdd42fe19e245 # shrinks to input = _TestMergeCommutativityArgs { fs0: FileSystem { files: {["b"]: "a", ["b", "a"]: "a"}, dirs: {} }, fs1: FileSystem { files: {}, dirs: {} } }
cc e5c61f6ac3dec61974eedf0a7042fd1f801efa9f020e4b37473d5a11a7a7a7a4 # shrinks to input = _TestMergeAssociativityArgs { fs0: FileSystem { files: {}, dirs: {["e", "b"]} }, fs1: FileSystem { files: {}, dirs: {["e", "b"]} }, fs2: FileSystem { files: {}, dirs: {["e", "b"]} } }
cc 1384a02358a0d16afcc3908434eb01fd04cb681d3acc0795fdf0254243d365ec # shrinks to input = _TestMergeCommutativityArgs { fs0: FileSystem { files: {["f", "a"]: ("a", "e")}, dirs: {} }, fs1: FileSystem { files: {["f", "a"]: ("b", "e")}, dirs: {} } }
2 changes: 1 addition & 1 deletion wnfs/src/private/directory.rs
Original file line number Diff line number Diff line change
Expand Up @@ -616,7 +616,7 @@ impl PrivateDirectory {
/// // Define the content that will replace what is already in the file
/// let new_file_content = b"print('hello world 2')";
/// // Set the contents of the file, waiting for result and expecting no errors
/// file.set_content(Utc::now(), &new_file_content[..], forest, store, rng)
/// file.set_content(&new_file_content[..], Utc::now(), forest, store, rng)
/// .await?;
/// // Read the file again
/// let result = root_dir.read(hello_py, true, forest, store).await?;
Expand Down
4 changes: 2 additions & 2 deletions wnfs/src/private/file.rs
Original file line number Diff line number Diff line change
Expand Up @@ -680,8 +680,8 @@ impl PrivateFile {
/// Sets the content of a file.
pub async fn set_content(
&mut self,
time: DateTime<Utc>,
content: impl AsyncRead + Unpin,
time: DateTime<Utc>,
forest: &mut impl PrivateForest,
store: &impl BlockStore,
rng: &mut impl CryptoRngCore,
Expand Down Expand Up @@ -1371,8 +1371,8 @@ mod proptests {
let mut file = PrivateFile::new(&forest.empty_name(), Utc::now(), rng);

file.set_content(
Utc::now(),
&mut Cursor::new(vec![5u8; length]),
Utc::now(),
forest,
&MemoryBlockStore::default(),
rng,
Expand Down
97 changes: 54 additions & 43 deletions wnfs/src/public/directory.rs
Original file line number Diff line number Diff line change
Expand Up @@ -510,7 +510,11 @@ impl PublicDirectory {
let dir = self.get_or_create_leaf_dir_mut(path, time, store).await?;

match dir.lookup_node_mut(filename, store).await? {
Some(PublicNode::File(file)) => file.set_content(time, content, store).await?,
Some(PublicNode::File(file)) => {
file.prepare_next_revision()
.set_content(content, time, store)
.await?
}
Some(PublicNode::Dir(_)) => bail!(FsError::DirectoryAlreadyExists),
None => {
dir.userland.insert(
Expand Down Expand Up @@ -932,41 +936,20 @@ impl PublicDirectory {
}
dir.metadata.tie_break_with(&other.metadata)?;

for (name, link) in other.userland.iter() {
let other_node = link.resolve_value(store).await?;
for (name, other_link) in other.userland.iter() {
let other_node = other_link.resolve_value(store).await?;
match dir.userland.entry(name.clone()) {
Entry::Vacant(vacant) => {
vacant.insert(PublicLink::new(other_node.clone()));
vacant.insert(other_link.clone());
}
Entry::Occupied(mut occupied) => {
let our_node = occupied.get_mut().resolve_value_mut(store).await?;
match (our_node, other_node) {
(PublicNode::File(our_file), PublicNode::File(other_file)) => {
let our_cid = our_file.store(store).await?;
let other_cid = other_file.store(store).await?;
if our_cid == other_cid {
continue; // No need to merge, the files are equal
}

let mut path = current_path.to_vec();
path.push(name.clone());
file_tie_breaks.insert(path);

let our_content_cid = our_file.userland.resolve_cid(store).await?;
let other_content_cid = other_file.userland.resolve_cid(store).await?;

let file = our_file.prepare_next_merge(store).await?;
if other_file.previous.len() > 1 {
// The other node is a merge node, we should merge the merge nodes directly:
file.previous.extend(other_file.previous.iter().cloned());
} else {
// The other node is a 'normal' node - we need to merge it normally
file.previous.insert(other_file.store(store).await?);
}

if our_content_cid.hash().digest() > other_content_cid.hash().digest() {
file.userland = other_file.userland.clone();
file.metadata = other_file.metadata.clone();
if our_file.merge(other_file, store).await? {
let mut path = current_path.to_vec();
path.push(name.clone());
file_tie_breaks.insert(path);
}
}
(node @ PublicNode::File(_), PublicNode::Dir(other_dir)) => {
Expand Down Expand Up @@ -1477,31 +1460,41 @@ mod tests {
#[cfg(test)]
mod proptests {
use super::*;
use libipld_core::ipld::Ipld;
use proptest::{
collection::{btree_map, btree_set, vec},
collection::{btree_map, vec},
prelude::*,
};
use test_strategy::proptest;
use wnfs_common::MemoryBlockStore;

type MockMetadata = String;

#[derive(Debug, Clone)]
struct FileSystem {
files: BTreeMap<Vec<String>, String>,
dirs: BTreeSet<Vec<String>>,
files: BTreeMap<Vec<String>, (MockMetadata, String)>,
dirs: BTreeMap<Vec<String>, MockMetadata>,
}

fn file_system() -> impl Strategy<Value = FileSystem> {
(
btree_map(
vec(simple_string(), 1..10),
(simple_string(), simple_string()),
// We generate a lot more file paths than directory paths, since file paths
// that are prefixes of directory paths lose out to them and get filtered out.
0..100,
),
btree_map(vec(simple_string(), 1..10), simple_string(), 0..40),
btree_set(vec(simple_string(), 1..10), 0..40),
)
.prop_map(|(mut files, dirs)| {
files = files
.into_iter()
.filter(|(file_path, _)| {
// We filter out file paths that are prefixes of directory paths in advance
!dirs
.iter()
.any(|dir_path| !dir_path.starts_with(&file_path))
.any(|(dir_path, _)| dir_path.starts_with(&file_path))
})
.collect();
FileSystem { files, dirs }
Expand All @@ -1515,13 +1508,15 @@ mod proptests {

fn valid_fs(fs: &FileSystem) -> bool {
fs.files.iter().all(|(file_path, _)| {
// File paths must not be prefixes of directory paths
!fs.dirs
.iter()
.any(|dir_path| dir_path.starts_with(&file_path))
.any(|(dir_path, _)| dir_path.starts_with(&file_path))
// file paths must not be prefixes of other file paths
&& !fs
.files
.iter()
.any(|(other_path, _)| other_path.starts_with(&file_path))
.any(|(other_path, _)| file_path != other_path && other_path.starts_with(&file_path))
})
}

Expand All @@ -1532,13 +1527,29 @@ mod proptests {
) -> Result<Arc<PublicDirectory>> {
let mut dir = PublicDirectory::new_rc(time);
let FileSystem { files, dirs } = fs;
for (path, content) in files.iter() {
dir.write(&path, content.clone().into_bytes(), time, store)
.await?;
for (path, (metadata, content)) in files.into_iter() {
let file = dir.open_file_mut(&path, time, store).await?;
file.set_content(content.into_bytes(), time, store).await?;
file.get_metadata_mut()
.put("test-meta", Ipld::String(metadata));
}

for path in dirs.iter() {
dir.mkdir(&path, time, store).await?;
for (path, metadata) in dirs.into_iter() {
let (path, filename) = utils::split_last(&path)?;
// There's currently no API for setting metadata on a directory :S
dir.get_or_create_leaf_dir_mut(path, time, store)
.await?
.userland
.entry(filename.clone())
// Create a directory, if it doesn't exist yet
.or_insert_with(|| PublicLink::with_dir(PublicDirectory::new(time)))
// Get a mutable ref out of the directory entry
.resolve_value_mut(store)
.await?
.as_dir_mut()?
.prepare_next_revision()
.get_metadata_mut()
.put("test-meta", Ipld::String(metadata));
}

Ok(dir)
Expand Down Expand Up @@ -1640,8 +1651,8 @@ mod proptests {
let store = &MemoryBlockStore::new();
let time = Utc::now();

let mut all_dirs = fs0.dirs.clone();
all_dirs.extend(fs1.dirs.iter().cloned());
let mut all_dirs = fs0.dirs.keys().cloned().collect::<BTreeSet<_>>();
all_dirs.extend(fs1.dirs.keys().cloned());

let mut root = convert_fs(fs0, time, store).await.unwrap();
let root1 = convert_fs(fs1, time, store).await.unwrap();
Expand Down
76 changes: 66 additions & 10 deletions wnfs/src/public/file.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ use async_once_cell::OnceCell;
use chrono::{DateTime, Utc};
use futures::{AsyncRead, AsyncReadExt};
use libipld_core::cid::Cid;
use std::{collections::BTreeSet, io::SeekFrom};
use std::{cmp::Ordering, collections::BTreeSet, io::SeekFrom};
use tokio::io::AsyncSeekExt;
use tokio_util::compat::{FuturesAsyncReadCompatExt, TokioAsyncReadCompatExt};
use wnfs_common::{
Expand Down Expand Up @@ -390,10 +390,10 @@ impl PublicFile {
}

/// Takes care of creating previous links, in case the current
/// directory was previously `.store()`ed.
/// In any case it'll try to give you ownership of the directory if possible,
/// file was previously `.store()`ed.
/// In any case it'll try to give you ownership of the file if possible,
/// otherwise it clones.
pub(crate) fn prepare_next_revision<'a>(self: &'a mut Arc<Self>) -> &'a mut Self {
pub fn prepare_next_revision<'a>(self: &'a mut Arc<Self>) -> &'a mut Self {
let Some(previous_cid) = self.persisted_as.get().cloned() else {
return Arc::make_mut(self);
};
Expand Down Expand Up @@ -434,9 +434,9 @@ impl PublicFile {
/// Writes a new content cid to the file.
/// This will create a new revision of the file.
pub async fn set_content(
self: &mut Arc<Self>,
time: DateTime<Utc>,
&mut self,
content: Vec<u8>,
time: DateTime<Utc>,
store: &impl BlockStore,
) -> Result<()> {
let content_cid = FileBuilder::new()
Expand All @@ -445,9 +445,8 @@ impl PublicFile {
.store(store)
.await?;

let file = self.prepare_next_revision();
file.metadata.upsert_mtime(time);
file.userland = Link::from_cid(content_cid);
self.metadata.upsert_mtime(time);
self.userland = Link::from_cid(content_cid);

Ok(())
}
Expand Down Expand Up @@ -488,6 +487,62 @@ impl PublicFile {
pub fn get_metadata_mut_rc<'a>(self: &'a mut Arc<Self>) -> &'a mut Metadata {
self.prepare_next_revision().get_metadata_mut()
}

/// Runs the merge part of the conflict reconciliation algorithm on this
/// file together with the `other` file.
///
/// Don't call this function unless you know what you're doing. Prefer
/// calling `PublicDirectory::reconcile` instead.
///
/// This function is commutative and associative.
///
/// Both `self` and `other` will be serialized to the given blockstore when
/// calling this function.
///
/// # Returns
///
/// `Ok(false)` if both files serialize to the same CID, in which case
/// nothing is changed. `Ok(true)` if the files differed and tie-breaking
/// was necessary.
///
/// # Errors
///
/// Propagates any error from storing either file, resolving the content
/// CIDs, preparing the merge revision, or tie-breaking the metadata.
pub async fn merge(
    self: &mut Arc<Self>,
    other: &Arc<Self>,
    store: &impl BlockStore,
) -> Result<bool> {
    let our_cid = self.store(store).await?;
    let other_cid = other.store(store).await?;
    if our_cid == other_cid {
        return Ok(false); // No need to merge, the files are equal
    }

    // Resolve both content CIDs up front, before `self` is mutably borrowed
    // by `prepare_next_merge` below.
    let our_content_cid = self.userland.resolve_cid(store).await?;
    let other_content_cid = other.userland.resolve_cid(store).await?;

    let file = self.prepare_next_merge(store).await?;
    if other.previous.len() > 1 {
        // The other node is a merge node, we should merge the merge nodes directly:
        file.previous.extend(other.previous.iter().cloned());
    } else {
        // The other node is a 'normal' node - we need to merge it normally.
        // `other` was already stored above, so reuse its CID instead of storing it again.
        file.previous.insert(other_cid);
    }

    // Tie-break on the digest of the content CIDs: the smaller digest wins.
    match our_content_cid
        .hash()
        .digest()
        .cmp(other_content_cid.hash().digest())
    {
        Ordering::Greater => {
            // The other file wins: take over both its content and metadata.
            file.userland.clone_from(&other.userland);
            file.metadata.clone_from(&other.metadata);
        }
        Ordering::Equal => {
            // Same content, but the files still differ: tie-break on the metadata.
            file.metadata.tie_break_with(&other.metadata)?;
        }
        Ordering::Less => {
            // We take ours
        }
    }

    // Returning true to indicate that we needed to tie-break
    Ok(true)
}
}

impl Storable for PublicFile {
Expand Down Expand Up @@ -629,7 +684,8 @@ mod snapshot_tests {
let file = &mut PublicFile::new_rc(time);
let _ = file.store(store).await?;

file.set_content(time, b"Hello, World!".to_vec(), store)
let file = file.prepare_next_revision();
file.set_content(b"Hello, World!".to_vec(), time, store)
.await?;
let cid = file.store(store).await?;

Expand Down
7 changes: 1 addition & 6 deletions wnfs/src/public/node/node.rs
Original file line number Diff line number Diff line change
Expand Up @@ -413,7 +413,6 @@ mod tests {
#[cfg(test)]
mod proptests {
use super::*;
use chrono::NaiveDateTime;
use futures::{stream, StreamExt, TryStreamExt};
use proptest::{collection::vec, prelude::*};
use test_strategy::proptest;
Expand Down Expand Up @@ -441,11 +440,7 @@ mod proptests {
}

fn time(n: i64) -> DateTime<Utc> {
DateTime::from_naive_utc_and_offset(
// convert into seconds, otherwise 0 and 1 would both be mapped to "0 seconds"
NaiveDateTime::from_timestamp_millis(n * 1000).unwrap(),
Utc,
)
DateTime::<Utc>::from_timestamp(n, 0).unwrap()
}

pub fn get_head(&self, n: usize) -> &Arc<PublicDirectory> {
Expand Down
Loading