Skip to content

Commit

Permalink
fix: Public reconciliation of a concurrent write and remove (of indep…
Browse files Browse the repository at this point in the history
…endent paths) (#432)

* chore: Write failing test case

Also: Add better debug impls for `PublicFile`, `PublicFile`, `PublicLink`, etc.

* fix: Concurrent write and remove should reconcile correctly
  • Loading branch information
matheus23 authored Apr 25, 2024
1 parent abfefef commit e265c94
Show file tree
Hide file tree
Showing 4 changed files with 119 additions and 44 deletions.
8 changes: 6 additions & 2 deletions wnfs-common/src/link.rs
Original file line number Diff line number Diff line change
Expand Up @@ -199,8 +199,12 @@ where
{
fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result {
match self {
Self::Encoded { cid, .. } => f.debug_tuple("Link::Encoded").field(cid).finish(),
Self::Decoded { value, .. } => f.debug_tuple("Link::Decoded").field(value).finish(),
Self::Encoded { cid, value_cache } => f
.debug_struct("Link::Encoded")
.field("cid", &format!("{cid}"))
.field("value_cache", &value_cache.get())
.finish(),
Self::Decoded { value } => f.debug_tuple("Link::Decoded").field(value).finish(),
}
}
}
Expand Down
4 changes: 2 additions & 2 deletions wnfs/src/private/directory.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1453,13 +1453,13 @@ impl PrivateDirectory {
}
(PrivateNode::File(_), PrivateNode::Dir(_)) => {
// a directory wins over a file
*our_link = other_link.clone();
our_link.clone_from(other_link);
}
// file vs. file and dir vs. dir cases
_ => {
// We tie-break as usual
if ord == Ordering::Greater {
*our_link = other_link.clone();
our_link.clone_from(other_link);
}
}
}
Expand Down
125 changes: 87 additions & 38 deletions wnfs/src/public/directory.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,11 @@ use super::{
PublicDirectorySerializable, PublicFile, PublicLink, PublicNode, PublicNodeSerializable,
};
use crate::{
error::FsError, is_readable_wnfs_version, traits::Id, utils, SearchResult, WNFS_VERSION,
error::FsError,
is_readable_wnfs_version,
traits::Id,
utils::{self, OnceCellDebug},
SearchResult, WNFS_VERSION,
};
use anyhow::{bail, ensure, Result};
use async_once_cell::OnceCell;
Expand Down Expand Up @@ -36,7 +40,6 @@ use wnfs_common::{
///
/// println!("Directory: {:?}", dir);
/// ```
#[derive(Debug)]
pub struct PublicDirectory {
persisted_as: OnceCell<Cid>,
pub(crate) metadata: Metadata,
Expand Down Expand Up @@ -868,7 +871,7 @@ impl PublicDirectory {
/// See the documentation for the `Reconciliation` enum for more information.
pub async fn reconcile(
self: &mut Arc<Self>,
other: Arc<Self>,
other: &Arc<Self>,
store: &impl BlockStore,
) -> Result<Reconciliation> {
let causal_order = self.clone().causal_compare(other.clone(), store).await?;
Expand All @@ -877,42 +880,21 @@ impl PublicDirectory {
Some(Ordering::Equal) => Reconciliation::AlreadyAhead,
Some(Ordering::Greater) => Reconciliation::AlreadyAhead,
Some(Ordering::Less) => {
*self = other;
self.clone_from(other);
Reconciliation::FastForward
}
None => {
let file_tie_breaks = self.merge(&other, store).await?;
let mut file_tie_breaks = BTreeSet::new();
self.reconcile_helper(other, store, &[], &mut file_tie_breaks)
.await?;
Reconciliation::Merged { file_tie_breaks }
}
})
}

/// Merge this node with given other node, ignoring whether the
/// other node is actually ahead in history or not.
///
/// Prefer using `reconcile`, if you don't know what the difference is!
///
/// Returns the set of file paths where tie breaks were used to resolve
/// conflicts. This means that for each path there exists a file that has been
/// overwritten with another version.
///
/// It's possible to walk the history backwards to find which version of each
/// file has been overwritten & merge the two file versions of each file together
/// in an application-specific way and create another history entry.
pub async fn merge<'a>(
self: &'a mut Arc<Self>,
other: &'a Arc<Self>,
store: &'a impl BlockStore,
) -> Result<BTreeSet<Vec<String>>> {
let mut file_tie_breaks = BTreeSet::new();
self.merge_helper(other, store, &[], &mut file_tie_breaks)
.await?;
Ok(file_tie_breaks)
}

#[cfg_attr(not(target_arch = "wasm32"), async_recursion)]
#[cfg_attr(target_arch = "wasm32", async_recursion(?Send))]
async fn merge_helper<'a>(
async fn reconcile_helper<'a>(
self: &'a mut Arc<Self>,
other: &'a Arc<Self>,
store: &'a impl BlockStore,
Expand Down Expand Up @@ -944,6 +926,21 @@ impl PublicDirectory {
}
Entry::Occupied(mut occupied) => {
let our_node = occupied.get_mut().resolve_value_mut(store).await?;

match our_node.causal_compare(other_node, store).await? {
Some(Ordering::Equal) => {
continue;
}
Some(Ordering::Less) => {
our_node.clone_from(other_node);
continue;
}
Some(Ordering::Greater) => {
continue;
}
None => {}
};

match (our_node, other_node) {
(PublicNode::File(our_file), PublicNode::File(other_file)) => {
if our_file.merge(other_file, store).await? {
Expand All @@ -963,7 +960,7 @@ impl PublicDirectory {
(PublicNode::Dir(dir), PublicNode::Dir(other_dir)) => {
let mut path = current_path.to_vec();
path.push(name.clone());
dir.merge_helper(other_dir, store, &path, file_tie_breaks)
dir.reconcile_helper(other_dir, store, &path, file_tie_breaks)
.await?;
}
}
Expand All @@ -975,6 +972,27 @@ impl PublicDirectory {
}
}

impl std::fmt::Debug for PublicDirectory {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
f.debug_struct("PublicDirectory")
.field(
"persisted_as",
&OnceCellDebug(self.persisted_as.get().map(|cid| format!("{cid}"))),
)
.field("metadata", &self.metadata)
.field("userland", &self.userland)
.field(
"previous",
&self
.previous
.iter()
.map(|cid| format!("{cid}"))
.collect::<Vec<_>>(),
)
.finish()
}
}

impl Id for PublicDirectory {
fn get_id(&self) -> String {
format!("{:p}", &self.metadata)
Expand Down Expand Up @@ -1455,6 +1473,37 @@ mod tests {
vec![previous_cid]
);
}

#[async_std::test]
async fn reconciliation_of_concurrent_write_and_remove() -> TestResult {
// This path we first write, then both replicas have it, then one replica removes it
let path1 = &["a".into(), "b.txt".into()];
// This path the second replica writes after forking
let path2 = &["file.txt".into()];
// we should be left with `b.txt` removed, while `file.txt` is there.

let time = Utc::now();
let store = &MemoryBlockStore::new();
let root_dir = &mut PublicDirectory::new_rc(time);
root_dir.store(store).await?;
root_dir.write(path1, vec![0], time, store).await?;
root_dir.store(store).await?;

let fork = &mut Arc::clone(root_dir);
fork.rm(path1, store).await?;
fork.store(store).await?;

root_dir.write(path2, vec![0], time, store).await?;
root_dir.store(store).await?;

root_dir.reconcile(fork, store).await?;

assert!(root_dir.get_node(path1, store).await?.is_none());

assert_eq!(root_dir.read(path2, store).await?, vec![0]);

Ok(())
}
}

#[cfg(test)]
Expand Down Expand Up @@ -1571,7 +1620,7 @@ mod proptests {

root1.mkdir(&path, time, store).await.unwrap();

root0.merge(root1, store).await.unwrap();
root0.reconcile(root1, store).await.unwrap();

let node = root0
.get_node(&path, store)
Expand All @@ -1598,9 +1647,9 @@ mod proptests {
let root1 = convert_fs(fs1, time, store).await.unwrap();

let mut merge_one_way = Arc::clone(&root0);
merge_one_way.merge(&root1, store).await.unwrap();
merge_one_way.reconcile(&root1, store).await.unwrap();
let mut merge_other_way = Arc::clone(&root1);
merge_other_way.merge(&root0, store).await.unwrap();
merge_other_way.reconcile(&root0, store).await.unwrap();

let cid_one_way = merge_one_way.store(store).await.unwrap();
let cid_other_way = merge_other_way.store(store).await.unwrap();
Expand All @@ -1625,13 +1674,13 @@ mod proptests {
let root2 = convert_fs(fs2, time, store).await.unwrap();

let mut merge_0_1_then_2 = Arc::clone(&root0);
merge_0_1_then_2.merge(&root1, store).await.unwrap();
merge_0_1_then_2.merge(&root2, store).await.unwrap();
merge_0_1_then_2.reconcile(&root1, store).await.unwrap();
merge_0_1_then_2.reconcile(&root2, store).await.unwrap();

let mut merge_1_2 = Arc::clone(&root1);
merge_1_2.merge(&root2, store).await.unwrap();
merge_1_2.reconcile(&root2, store).await.unwrap();
let mut merge_0_with_1_2 = Arc::clone(&root0);
merge_0_with_1_2.merge(&merge_1_2, store).await.unwrap();
merge_0_with_1_2.reconcile(&merge_1_2, store).await.unwrap();

let cid_one_way = merge_0_1_then_2.store(store).await.unwrap();
let cid_other_way = merge_0_with_1_2.store(store).await.unwrap();
Expand All @@ -1657,7 +1706,7 @@ mod proptests {
let mut root = convert_fs(fs0, time, store).await.unwrap();
let root1 = convert_fs(fs1, time, store).await.unwrap();

root.merge(&root1, store).await.unwrap();
root.reconcile(&root1, store).await.unwrap();

for dir in all_dirs {
let exists = root.get_node(&dir, store).await.unwrap().is_some();
Expand Down
26 changes: 24 additions & 2 deletions wnfs/src/public/file.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,9 @@
//! Public fs file node.
use super::{PublicFileSerializable, PublicNodeSerializable};
use crate::{error::FsError, is_readable_wnfs_version, traits::Id, WNFS_VERSION};
use crate::{
error::FsError, is_readable_wnfs_version, traits::Id, utils::OnceCellDebug, WNFS_VERSION,
};
use anyhow::{anyhow, bail, Result};
use async_once_cell::OnceCell;
use chrono::{DateTime, Utc};
Expand All @@ -28,7 +30,6 @@ use wnfs_unixfs_file::{builder::FileBuilder, unixfs::UnixFsFile};
///
/// println!("File: {:?}", file);
/// ```
#[derive(Debug)]
pub struct PublicFile {
persisted_as: OnceCell<Cid>,
pub(crate) metadata: Metadata,
Expand Down Expand Up @@ -545,6 +546,27 @@ impl PublicFile {
}
}

impl std::fmt::Debug for PublicFile {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
f.debug_struct("PublicFile")
.field(
"persisted_as",
&OnceCellDebug(self.persisted_as.get().map(|cid| format!("{cid}"))),
)
.field("metadata", &self.metadata)
.field("userland", &self.userland)
.field(
"previous",
&self
.previous
.iter()
.map(|cid| format!("{cid}"))
.collect::<Vec<_>>(),
)
.finish()
}
}

impl Storable for PublicFile {
type Serializable = PublicNodeSerializable;

Expand Down

0 comments on commit e265c94

Please sign in to comment.