diff --git a/wnfs-bench/hamt.rs b/wnfs-bench/hamt.rs index fa469d22..f34fea05 100644 --- a/wnfs-bench/hamt.rs +++ b/wnfs-bench/hamt.rs @@ -6,10 +6,8 @@ use criterion::{ use proptest::{arbitrary::any, collection::vec, test_runner::TestRunner}; use std::cmp; use wnfs_common::{ - async_encode, decode, - libipld::cbor::DagCborCodec, utils::{Arc, Sampleable}, - BlockStore, Link, MemoryBlockStore, + BlockStore, Link, MemoryBlockStore, Storable, StoreIpld, }; use wnfs_hamt::{ diff, merge, @@ -82,17 +80,12 @@ fn node_load_get(c: &mut Criterion) { node.set(i.to_string(), i, &store).await.unwrap(); } - let encoded_hamt = async_encode(&Hamt::with_root(node), &store, DagCborCodec) - .await - .unwrap(); - - store.put_serializable(&encoded_hamt).await.unwrap() + Hamt::with_root(node).store(&store).await.unwrap() }); c.bench_function("node load and get", |b| { b.to_async(AsyncStdExecutor).iter(|| async { - let encoded_hamt = store.get_deserializable::>(&cid).await.unwrap(); - let hamt: Hamt = decode(encoded_hamt.as_ref(), DagCborCodec).unwrap(); + let hamt = Hamt::::load(&cid, &store).await.unwrap(); for i in 0..50 { assert!(hamt @@ -114,18 +107,12 @@ fn node_load_remove(c: &mut Criterion) { node.set(i.to_string(), i, &store).await.unwrap(); } - let encoded_hamt = async_encode(&Hamt::with_root(node), &store, DagCborCodec) - .await - .unwrap(); - - store.put_serializable(&encoded_hamt).await.unwrap() + Hamt::with_root(node).store(&store).await.unwrap() }); c.bench_function("node load and remove", |b| { b.to_async(AsyncStdExecutor).iter(|| async { - let encoded_hamt = store.get_deserializable::>(&cid).await.unwrap(); - let mut hamt: Hamt = - black_box(decode(encoded_hamt.as_ref(), DagCborCodec).unwrap()); + let mut hamt = black_box(Hamt::::load(&cid, &store).await.unwrap()); for i in 0..50 { let value = hamt.root.remove(&i.to_string(), &store).await.unwrap(); @@ -143,11 +130,14 @@ fn hamt_load_decode(c: &mut Criterion) { node.set(i.to_string(), i, &store).await.unwrap(); } - let encoded_hamt = async_encode(&Hamt::with_root(node), &store, DagCborCodec) + let (encoded_hamt, codec) = Hamt::with_root(node) + .to_serializable(&store) .await + .unwrap() + .encode_ipld() .unwrap(); - let cid = store.put_serializable(&encoded_hamt).await.unwrap(); + let cid = store.put_block(encoded_hamt.clone(), codec).await.unwrap(); (cid, encoded_hamt) }); @@ -156,9 +146,7 @@ fn hamt_load_decode(c: &mut Criterion) { group.throughput(Throughput::Bytes(bytes.len() as u64)); group.bench_function("0", |b| { b.to_async(AsyncStdExecutor).iter(|| async { - let encoded_hamt = store.get_deserializable::>(&cid).await.unwrap(); - let _: Hamt = - black_box(decode(encoded_hamt.as_ref(), DagCborCodec).unwrap()); + black_box(Hamt::::load(&cid, &store).await.unwrap()); }) }); group.finish(); @@ -180,7 +168,13 @@ fn hamt_set_encode(c: &mut Criterion) { let hamt = Hamt::with_root(node); - let _ = black_box(async_encode(&hamt, &store, DagCborCodec).await.unwrap()); + black_box( + hamt.to_serializable(&store) + .await + .unwrap() + .encode_ipld() + .unwrap(), + ); }, BatchSize::SmallInput, ) diff --git a/wnfs-common/Cargo.toml b/wnfs-common/Cargo.toml index 61bb2c5e..d1254438 100644 --- a/wnfs-common/Cargo.toml +++ b/wnfs-common/Cargo.toml @@ -33,6 +33,7 @@ parking_lot = "0.12" proptest = { version = "1.1", optional = true } rand_core = "0.6" serde = { version = "1.0", features = ["rc"] } +serde_ipld_dagcbor = "0.4.2" serde_json = { version = "1.0", optional = true } thiserror = "1.0" diff --git a/wnfs-common/src/async_serialize.rs b/wnfs-common/src/async_serialize.rs deleted file mode 100644 index 27a94742..00000000 --- a/wnfs-common/src/async_serialize.rs +++ /dev/null @@ -1,94 +0,0 @@ -use crate::{ - utils::{Arc, CondSend, CondSync}, - BlockStore, -}; -use async_trait::async_trait; -use libipld::{error::SerdeError, serde as ipld_serde, Ipld}; -use serde::{Serialize, Serializer}; - -//-------------------------------------------------------------------------------------------------- -// Macros -//-------------------------------------------------------------------------------------------------- - -macro_rules! impl_async_serialize { - ( $( $ty:ty $( : < $( $generics:ident ),+ > )? ),+ ) => { - $( - #[cfg_attr(not(target_arch = "wasm32"), async_trait)] - #[cfg_attr(target_arch = "wasm32", async_trait(?Send))] - impl $( < $( $generics ),+ > )? AsyncSerialize for $ty $( where $( $generics: Serialize + CondSync ),+ )? { - async fn async_serialize( - &self, - serializer: S, - _: &BS, - ) -> Result { - self.serialize(serializer) - } - } - )+ - }; -} - -//-------------------------------------------------------------------------------------------------- -// Type Definitions -//-------------------------------------------------------------------------------------------------- - -/// A **data structure** that can be serialized into any data format supported -/// by Serde. -/// -/// This trait is slightly different from Serde's Serialize trait because it allows for asynchronous -/// serialisation and it is designed for the IPLD ecosystem where a `Store` is sometimes needed to -/// properly resolve the internal state of certain data structures to Cids. -/// -/// An example of this is the PublicDirectory which can contain links to other IPLD nodes. -/// These links need to be resolved to Cids during serialization if they aren't already. -#[cfg_attr(not(target_arch = "wasm32"), async_trait)] -#[cfg_attr(target_arch = "wasm32", async_trait(?Send))] -pub trait AsyncSerialize { - /// Serializes the type. - async fn async_serialize(&self, serializer: S, store: &B) -> Result - where - S: Serializer + CondSend, - S::Error: CondSend, - B: BlockStore + ?Sized; - - /// Serialize with an IPLD serializer. - async fn async_serialize_ipld(&self, store: &B) -> Result - where - B: BlockStore + ?Sized, - { - self.async_serialize(ipld_serde::Serializer, store).await - } -} - -//-------------------------------------------------------------------------------------------------- -// Implementations -//-------------------------------------------------------------------------------------------------- - -#[cfg_attr(not(target_arch = "wasm32"), async_trait)] -#[cfg_attr(target_arch = "wasm32", async_trait(?Send))] -impl AsyncSerialize for Arc { - async fn async_serialize(&self, serializer: S, store: &B) -> Result - where - S: Serializer + CondSend, - S::Error: CondSend, - B: BlockStore + ?Sized, - { - self.as_ref().async_serialize(serializer, store).await - } -} - -impl_async_serialize! { usize, u128, u64, u32, u16, u8, isize, i128, i64, i32, i16, i8 } -impl_async_serialize! { String, &str } -impl_async_serialize! { - (A,): , - (A, B): , - (A, B, C): , - (A, B, C, D): , - (A, B, C, D, E): , - (A, B, C, D, E, F): , - (A, B, C, D, E, F, G): , - (A, B, C, D, E, F, G, H): , - (A, B, C, D, E, F, G, H, I): , - (A, B, C, D, E, F, G, H, I, J): , - (A, B, C, D, E, F, G, H, I, J, K): -} diff --git a/wnfs-common/src/blockstore.rs b/wnfs-common/src/blockstore.rs index ad4ec201..8f139d28 100644 --- a/wnfs-common/src/blockstore.rs +++ b/wnfs-common/src/blockstore.rs @@ -1,7 +1,7 @@ use crate::{ decode, encode, utils::{Arc, CondSend, CondSync}, - AsyncSerialize, BlockStoreError, MAX_BLOCK_SIZE, + BlockStoreError, MAX_BLOCK_SIZE, }; use anyhow::{bail, Result}; use async_trait::async_trait; @@ -72,15 +72,6 @@ pub trait BlockStore: Sized + CondSync { self.put_block(bytes, CODEC_DAG_CBOR).await } - async fn put_async_serializable(&self, value: &V) -> Result - where - V: AsyncSerialize + CondSync, - { - let ipld = value.async_serialize_ipld(self).await?; - let bytes = encode(&ipld, DagCborCodec)?; - self.put_block(bytes, CODEC_DAG_CBOR).await - } - // This should be the same in all implementations of BlockStore fn create_cid(&self, bytes: &[u8], codec: u64) -> Result { // If there are too many bytes, abandon this task diff --git a/wnfs-common/src/encoding.rs b/wnfs-common/src/encoding.rs index a6ef20fc..1cf1d8ba 100644 --- a/wnfs-common/src/encoding.rs +++ b/wnfs-common/src/encoding.rs @@ -1,4 +1,3 @@ -use crate::{utils::CondSync, AsyncSerialize, BlockStore}; use anyhow::Result; use libipld::{ codec::{Decode, Encode}, @@ -21,19 +20,6 @@ where Ok(bytes) } -/// Encodes an async serializable value into DagCbor bytes. -pub async fn async_encode(value: &V, store: &impl BlockStore, codec: C) -> Result> -where - V: AsyncSerialize + CondSync, - C: Codec, - Ipld: Encode, -{ - let ipld = value.async_serialize_ipld(store).await?; - let mut bytes = Vec::new(); - >::encode(&ipld, codec, &mut bytes)?; - Ok(bytes) -} - /// Decodes recieved DagCbor bytes into a deserializable value. pub fn decode(bytes: &[u8], codec: C) -> Result where diff --git a/wnfs-common/src/lib.rs b/wnfs-common/src/lib.rs index 892b6d7d..b3ac7b9a 100644 --- a/wnfs-common/src/lib.rs +++ b/wnfs-common/src/lib.rs @@ -1,21 +1,21 @@ //! This crate contains the common types and functions used by the WNFS crates. -mod async_serialize; pub mod blockstore; mod encoding; mod error; mod link; mod metadata; mod pathnodes; +mod storable; mod traits; pub mod utils; -pub use async_serialize::*; pub use blockstore::*; pub use encoding::*; pub use error::*; pub use link::*; pub use metadata::*; pub use pathnodes::*; +pub use storable::*; //-------------------------------------------------------------------------------------------------- // Constants diff --git a/wnfs-common/src/link.rs b/wnfs-common/src/link.rs index 937fb2d4..d6fb8cec 100644 --- a/wnfs-common/src/link.rs +++ b/wnfs-common/src/link.rs @@ -1,13 +1,8 @@ -use crate::{ - traits::IpldEq, - utils::{Arc, CondSync}, - AsyncSerialize, BlockStore, -}; +use crate::{traits::IpldEq, utils::CondSync, BlockStore, Storable}; use anyhow::Result; use async_once_cell::OnceCell; use async_trait::async_trait; use libipld::Cid; -use serde::de::DeserializeOwned; use std::fmt::{self, Debug, Formatter}; //-------------------------------------------------------------------------------------------------- @@ -26,7 +21,7 @@ pub enum Link { Encoded { cid: Cid, value_cache: OnceCell }, /// A variant of `Link` that started out as a deserialized value `T`. /// If the cid is resolved using `resolve_cid`, then `T`'s `.persisted_as` from the - /// `RemembersCid` trait is called and that `OnceCell` is populated, preventing + /// `Storable` trait is called and that `OnceCell` is populated, preventing /// further calls to `resolve_cid` from duplicating work. Decoded { value: T }, } @@ -35,7 +30,7 @@ pub enum Link { // Implementations //-------------------------------------------------------------------------------------------------- -impl Link { +impl Link { /// Creates a new `Link` that starts out as a Cid. pub fn from_cid(cid: Cid) -> Self { Self::Encoded { @@ -45,54 +40,30 @@ impl Link { } /// Gets the Cid stored in type. It attempts to get it from the store if it is not present in type. - pub async fn resolve_cid(&self, store: &impl BlockStore) -> Result<&Cid> - where - T: AsyncSerialize, - { + pub async fn resolve_cid(&self, store: &impl BlockStore) -> Result { match self { - Self::Encoded { cid, .. } => Ok(cid), - Self::Decoded { value } => { - let cid_cache = value.persisted_as(); - cid_cache - .get_or_try_init(store.put_async_serializable(value)) - .await - } + Self::Encoded { cid, .. } => Ok(*cid), + Self::Decoded { value } => value.store(store).await, } } /// Gets the value stored in link. It attempts to get it from the store if it is not present in link. - pub async fn resolve_value(&self, store: &impl BlockStore) -> Result<&T> - where - T: DeserializeOwned, - { + pub async fn resolve_value(&self, store: &impl BlockStore) -> Result<&T> { match self { Self::Encoded { cid, value_cache } => { - value_cache - .get_or_try_init(async { - let value: T = store.get_deserializable(cid).await?; - value.persisted_as().get_or_init(async { *cid }).await; - Ok(value) - }) - .await + value_cache.get_or_try_init(T::load(cid, store)).await } Self::Decoded { value, .. } => Ok(value), } } /// Gets mut value stored in link. It attempts to get it from the store if it is not present in link. - pub async fn resolve_value_mut(&mut self, store: &impl BlockStore) -> Result<&mut T> - where - T: DeserializeOwned, - { + pub async fn resolve_value_mut(&mut self, store: &impl BlockStore) -> Result<&mut T> { match self { Self::Encoded { cid, value_cache } => { let value = match value_cache.take() { Some(v) => v, - None => { - let value: T = store.get_deserializable(cid).await?; - value.persisted_as().get_or_init(async { *cid }).await; - value - } + None => T::load(cid, store).await?, }; *self = Self::Decoded { value }; @@ -112,7 +83,7 @@ impl Link { pub fn get_cid(&self) -> Option<&Cid> { match self { Self::Encoded { cid, .. } => Some(cid), - Self::Decoded { value } => value.persisted_as().get(), + Self::Decoded { value } => value.persisted_as().and_then(OnceCell::get), } } @@ -129,16 +100,12 @@ impl Link { /// Gets an owned value from type. It attempts to it get from the store if it is not present in type. pub async fn resolve_owned_value(self, store: &impl BlockStore) -> Result where - T: DeserializeOwned, + T: Storable, { match self { Self::Encoded { cid, value_cache } => match value_cache.into_inner() { Some(cached) => Ok(cached), - None => { - let value: T = store.get_deserializable(&cid).await?; - value.persisted_as().get_or_init(async { cid }).await; - Ok(value) - } + None => Ok(T::load(&cid, store).await?), }, Self::Decoded { value, .. } => Ok(value), } @@ -146,10 +113,7 @@ impl Link { /// Checks if there is a Cid cached in link. pub fn has_cid(&self) -> bool { - match self { - Self::Decoded { value } => value.persisted_as().get().is_some(), - _ => true, - } + self.get_cid().is_some() } /// Checks if there is a value stored in link. @@ -163,7 +127,7 @@ impl Link { /// Compares two links for equality. Attempts to get them from store if they are not already cached. pub async fn deep_eq(&self, other: &Link, store: &impl BlockStore) -> Result where - T: PartialEq + AsyncSerialize, + T: PartialEq + Storable, { if self == other { return Ok(true); @@ -175,7 +139,7 @@ impl Link { #[cfg_attr(not(target_arch = "wasm32"), async_trait)] #[cfg_attr(target_arch = "wasm32", async_trait(?Send))] -impl IpldEq for Link { +impl IpldEq for Link { async fn eq(&self, other: &Link, store: &impl BlockStore) -> Result { if self == other { return Ok(true); @@ -185,7 +149,7 @@ impl IpldEq for Link } } -impl From for Link { +impl From for Link { fn from(value: T) -> Self { Self::Decoded { value } } @@ -212,7 +176,7 @@ where } } -impl PartialEq for Link +impl PartialEq for Link where T: PartialEq, { @@ -254,30 +218,18 @@ where } } -pub trait RemembersCid { - fn persisted_as(&self) -> &OnceCell; -} - -impl RemembersCid for Arc { - fn persisted_as(&self) -> &OnceCell { - self.as_ref().persisted_as() - } -} - //-------------------------------------------------------------------------------------------------- // Tests //-------------------------------------------------------------------------------------------------- #[cfg(test)] mod tests { - use crate::{ - utils::CondSend, AsyncSerialize, BlockStore, Link, MemoryBlockStore, RemembersCid, - }; - use ::serde::{Deserialize, Serialize}; + use crate::{BlockStore, Link, MemoryBlockStore, Storable}; + use anyhow::Result; use async_once_cell::OnceCell; use async_trait::async_trait; use libipld::Cid; - use serde::Serializer; + use serde::{Deserialize, Serialize}; #[derive(Debug, Serialize, Deserialize)] struct Example { @@ -286,15 +238,24 @@ mod tests { persisted_as: OnceCell, } - #[cfg_attr(not(target_arch = "wasm32"), async_trait)] - #[cfg_attr(target_arch = "wasm32", async_trait(?Send))] - impl AsyncSerialize for Example { - async fn async_serialize(&self, serializer: S, _store: &B) -> Result - where - S: Serializer + CondSend, - B: BlockStore + ?Sized, - { - self.serialize(serializer) + #[async_trait] + impl Storable for Example { + type Serializable = Example; + + async fn to_serializable(&self, _store: &impl BlockStore) -> Result { + Ok(self.clone()) + } + + async fn from_serializable( + cid: Option<&Cid>, + mut serializable: Self::Serializable, + ) -> Result { + serializable.persisted_as = cid.cloned().map(OnceCell::new_with).unwrap_or_default(); + Ok(serializable) + } + + fn persisted_as(&self) -> Option<&OnceCell> { + Some(&self.persisted_as) } } @@ -327,12 +288,6 @@ mod tests { } } - impl RemembersCid for Example { - fn persisted_as(&self) -> &OnceCell { - &self.persisted_as - } - } - #[async_std::test] async fn link_value_can_be_resolved() { let store = &MemoryBlockStore::default(); @@ -352,7 +307,7 @@ mod tests { let link = Link::::from(example.clone()); let cid = link.resolve_cid(store).await.unwrap(); - let value = store.get_deserializable::(cid).await.unwrap(); + let value = Example::load(&cid, store).await.unwrap(); assert_eq!(value, example); } diff --git a/wnfs-common/src/storable.rs b/wnfs-common/src/storable.rs new file mode 100644 index 00000000..79854cae --- /dev/null +++ b/wnfs-common/src/storable.rs @@ -0,0 +1,197 @@ +//! Defines the [`Storable`] trait, which defines the `.load` and `.store` functions +//! that are implemented for most WNFS structures, such as `PublicFile`, `PublicDirectory`, +//! `PublicNode`, `HamtForest` etc. +use crate::{ + utils::{Arc, CondSync}, + BlockStore, +}; +use anyhow::{bail, Result}; +use async_once_cell::OnceCell; +use async_trait::async_trait; +use bytes::Bytes; +use libipld::{cbor::DagCborCodec, Cid}; +use serde::{de::DeserializeOwned, Serialize}; + +//-------------------------------------------------------------------------------------------------- +// Macros +//-------------------------------------------------------------------------------------------------- + +#[macro_export] +macro_rules! impl_storable_from_serde { + ( $( $ty:ty $( : < $( $generics:ident ),+ > )? ),+ ) => { + $( + #[cfg_attr(not(target_arch = "wasm32"), ::async_trait::async_trait)] + #[cfg_attr(target_arch = "wasm32", ::async_trait::async_trait(?Send))] + impl $( < $( $generics ),+ > )? $crate::Storable for $ty $( where $( $generics: ::serde::Serialize + ::serde::de::DeserializeOwned + Clone + $crate::utils::CondSync ),+ )?{ + type Serializable = $ty; + + async fn to_serializable(&self, _store: &impl $crate::BlockStore) -> ::anyhow::Result { + Ok(self.clone()) + } + + async fn from_serializable(_cid: Option<&$crate::libipld::Cid>, serializable: Self::Serializable) -> ::anyhow::Result { + Ok(serializable) + } + } + )+ + }; +} + +pub use impl_storable_from_serde; + +//-------------------------------------------------------------------------------------------------- +// Type Definitions +//-------------------------------------------------------------------------------------------------- + +/// The trait that defines how to store something in a blockstore. +/// +/// This works via a two-tiered system, where the actual in-memory representation +/// (the struct that implements this trait) is not the same as the at-rest +/// representation of itself. +/// The at-rest representation is given by the `Serializable` associated type. +/// +/// Commonly, the `Serializable` type implements serde's `Serialize` and `Deserialize` +/// traits and thus can automatically be used without having to implement `StoreIpld` +/// and `LoadIpld` yourself. In that case, the default implementation will use +/// `serde_ipld_dagcbor`. +/// +/// This trait also optionally supports memoizing serialization via the `persisted_as` function. +/// You can add a field `persisted_as: OnceCell` to your in-memory representation and +/// return it in the `persisted_as` function and any `store` calls will automatically populate +/// that cache. +/// If you do so, remember to initialize the `OnceCell` if a `Cid` is passed in the +/// `from_serializable` call, such that a `store` call right after a `load` call is practically +/// free. +#[cfg_attr(not(target_arch = "wasm32"), async_trait)] +#[cfg_attr(target_arch = "wasm32", async_trait(?Send))] +pub trait Storable: Sized { + /// The at-rest representation of this storable type. + type Serializable: StoreIpld + LoadIpld + CondSync; + + /// Turn the current type into the at-rest representation of this type. + async fn to_serializable(&self, store: &impl BlockStore) -> Result; + + /// Take an at-rest representation of this type and turn it into the in-memory representation. + /// You can use the `cid` parameter to populate a cache. + async fn from_serializable(cid: Option<&Cid>, serializable: Self::Serializable) + -> Result; + + /// Return a serialization cache, if it exists. + /// By default, this always returns `None`. + fn persisted_as(&self) -> Option<&OnceCell> { + None + } + + /// Store this data type in a given `BlockStore`. + /// + /// This will short-circuit by using the `persisted_as` once-cell, if available. + async fn store(&self, store: &impl BlockStore) -> Result { + let store_future = async { + let (bytes, codec) = self.to_serializable(store).await?.encode_ipld()?; + store.put_block(bytes, codec).await + }; + + if let Some(persisted_as) = self.persisted_as() { + persisted_as.get_or_try_init(store_future).await.cloned() + } else { + store_future.await + } + } + + /// Try to load a value of this type from a CID. + /// + /// This will pass on the CID to the `from_serializable` function so it can + /// populate a cache in some cases. + async fn load(cid: &Cid, store: &impl BlockStore) -> Result { + let bytes = store.get_block(cid).await?; + let serializable = Self::Serializable::decode_ipld(cid, bytes)?; + Self::from_serializable(Some(cid), serializable).await + } +} + +pub trait StoreIpld { + fn encode_ipld(&self) -> Result<(Bytes, u64)>; +} + +pub trait LoadIpld: Sized { + fn decode_ipld(cid: &Cid, bytes: Bytes) -> Result; +} + +impl StoreIpld for T { + fn encode_ipld(&self) -> Result<(Bytes, u64)> { + let bytes = serde_ipld_dagcbor::to_vec(self)?; + Ok((bytes.into(), DagCborCodec.into())) + } +} + +impl LoadIpld for T { + fn decode_ipld(cid: &Cid, bytes: Bytes) -> Result { + let codec = cid.codec(); + let dag_cbor: u64 = DagCborCodec.into(); + if codec != dag_cbor { + bail!("Expected dag-cbor codec, but got {codec:X} in CID {cid}"); + } + Ok(serde_ipld_dagcbor::from_slice(bytes.as_ref())?) + } +} + +//-------------------------------------------------------------------------------------------------- +// Implementations +//-------------------------------------------------------------------------------------------------- + +// We need to choose *one* blanket implementation, and unfortunately +// you can't `impl Storable for Arc` outside of this module, +// because that'd be an orphan instance. So instead we're providing a +// macro and implement the `Arc` instance generically here. + +// #[cfg_attr(not(target_arch = "wasm32"), async_trait)] +// #[cfg_attr(target_arch = "wasm32", async_trait(?Send))] +// impl Storable for T { +// type Serializable = T; + +// async fn to_serializable(&self, _store: &impl BlockStore) -> Result { +// Ok(self.clone()) +// } + +// async fn from_serializable(serializable: Self::Serializable) -> Result { +// Ok(serializable) +// } +// } + +#[cfg_attr(not(target_arch = "wasm32"), async_trait)] +#[cfg_attr(target_arch = "wasm32", async_trait(?Send))] +impl Storable for Arc { + type Serializable = T::Serializable; + + async fn to_serializable(&self, store: &impl BlockStore) -> Result { + self.as_ref().to_serializable(store).await + } + + async fn from_serializable( + cid: Option<&Cid>, + serializable: Self::Serializable, + ) -> Result { + Ok(Arc::new(T::from_serializable(cid, serializable).await?)) + } + + fn persisted_as(&self) -> Option<&OnceCell> { + self.as_ref().persisted_as() + } +} + +impl_storable_from_serde! { [u8; 0], [u8; 1], [u8; 2], [u8; 4], [u8; 8], [u8; 16], [u8; 32] } +impl_storable_from_serde! { usize, u128, u64, u32, u16, u8, isize, i128, i64, i32, i16, i8 } +impl_storable_from_serde! { String } +impl_storable_from_serde! { + (A,): , + (A, B): , + (A, B, C): , + (A, B, C, D): , + (A, B, C, D, E): , + (A, B, C, D, E, F): , + (A, B, C, D, E, F, G): , + (A, B, C, D, E, F, G, H): , + (A, B, C, D, E, F, G, H, I): , + (A, B, C, D, E, F, G, H, I, J): , + (A, B, C, D, E, F, G, H, I, J, K): +} diff --git a/wnfs-hamt/CHANGELOG.md b/wnfs-hamt/CHANGELOG.md index 074bb1e7..8b9ed3c2 100644 --- a/wnfs-hamt/CHANGELOG.md +++ b/wnfs-hamt/CHANGELOG.md @@ -1,5 +1,9 @@ # Changelog +## Unreleased + +* Remove `TryFrom` instances for `Hamt`, `Node` and `Pointer` and instead refactor handling of serialization internally. + ## 0.1.25 (2023-09-04) * Fixed a bug causing dropped updates when doing serialization, then continuing writes, then serializing again and loading from that serialized state [#348](https://github.com/wnfs-wg/rs-wnfs/pull/348) diff --git a/wnfs-hamt/Cargo.toml b/wnfs-hamt/Cargo.toml index b8f68f2f..94458c41 100644 --- a/wnfs-hamt/Cargo.toml +++ b/wnfs-hamt/Cargo.toml @@ -36,6 +36,9 @@ rand_core = "0.6" semver = { version = "1.0", features = ["serde"] } serde = { version = "1.0", features = ["rc"] } serde-byte-array = "0.1.2" +serde_bytes = "0.11.12" +serde_ipld_dagcbor = "0.4.2" +testresult = "0.3.0" thiserror = "1.0" wnfs-common = { path = "../wnfs-common", version = "=0.1.25" } diff --git a/wnfs-hamt/src/diff.rs b/wnfs-hamt/src/diff.rs index fa39f41a..42fa77f4 100644 --- a/wnfs-hamt/src/diff.rs +++ b/wnfs-hamt/src/diff.rs @@ -2,11 +2,11 @@ use super::HashNibbles; use crate::{Hasher, Node, Pair, Pointer, HAMT_BITMASK_BIT_SIZE}; use anyhow::{Ok, Result}; use async_recursion::async_recursion; -use serde::de::DeserializeOwned; +use serde::{de::DeserializeOwned, Serialize}; use std::{collections::HashMap, hash::Hash, mem}; use wnfs_common::{ utils::{Arc, CondSync}, - BlockStore, Link, + BlockStore, Link, Storable, }; //-------------------------------------------------------------------------------------------------- @@ -30,6 +30,27 @@ pub struct KeyValueChange { pub value2: Option, } +//-------------------------------------------------------------------------------------------------- +// Implementations +//-------------------------------------------------------------------------------------------------- + +impl KeyValueChange { + pub fn map(self, f: &impl Fn(V) -> W) -> KeyValueChange { + let Self { + r#type, + key, + value1, + value2, + } = self; + KeyValueChange { + r#type, + key, + value1: value1.map(f), + value2: value2.map(f), + } + } +} + //-------------------------------------------------------------------------------------------------- // Functions //-------------------------------------------------------------------------------------------------- @@ -77,31 +98,35 @@ pub struct KeyValueChange { /// println!("Changes {:#?}", changes); /// } /// ``` -pub async fn diff( +pub async fn diff( main_link: Link>>, other_link: Link>>, store: &impl BlockStore, ) -> Result>> where - K: DeserializeOwned + Clone + Eq + Hash + AsRef<[u8]>, - V: DeserializeOwned + Clone + Eq, - H: Hasher + Clone + 'static, + K: Storable + Clone + Eq + Hash + AsRef<[u8]> + CondSync, + V: Storable + Clone + Eq + CondSync, + K::Serializable: Serialize + DeserializeOwned, + V::Serializable: Serialize + DeserializeOwned, + H: Hasher + CondSync, { diff_helper(main_link, other_link, 1, store).await } #[cfg_attr(not(target_arch = "wasm32"), async_recursion)] #[cfg_attr(target_arch = "wasm32", async_recursion(?Send))] -pub async fn diff_helper( +pub async fn diff_helper( main_link: Link>>, other_link: Link>>, depth: usize, store: &impl BlockStore, ) -> Result>> where - K: DeserializeOwned + Clone + Eq + Hash + AsRef<[u8]>, - V: DeserializeOwned + Clone + Eq, - H: Hasher + Clone + 'static, + K: Storable + Clone + Eq + Hash + AsRef<[u8]> + CondSync, + V: Storable + Clone + Eq + CondSync, + K::Serializable: Serialize + DeserializeOwned, + V::Serializable: Serialize + DeserializeOwned, + H: Hasher + CondSync, { // If Cids are available, check to see if they are equal so we can skip further comparisons. if let (Some(cid), Some(cid2)) = (main_link.get_cid(), other_link.get_cid()) { @@ -167,15 +192,17 @@ where Ok(changes) } -async fn generate_add_or_remove_changes( +async fn generate_add_or_remove_changes( node_pointer: &Pointer, r#type: ChangeType, store: &impl BlockStore, ) -> Result>> where - K: DeserializeOwned + Clone + Eq + Hash + AsRef<[u8]>, - V: DeserializeOwned + Clone + Eq, - H: Hasher + Clone + 'static, + K: Storable + Clone + Eq + Hash + AsRef<[u8]> + CondSync, + V: Storable + Clone + Eq + CondSync, + K::Serializable: Serialize + DeserializeOwned, + V::Serializable: Serialize + DeserializeOwned, + H: Hasher + CondSync, { match node_pointer { Pointer::Values(values) => Ok(values @@ -206,16 +233,18 @@ where } } -async fn pointers_diff( +async fn pointers_diff( main_pointer: Pointer, other_pointer: Pointer, depth: usize, store: &impl BlockStore, ) -> Result>> where - K: DeserializeOwned + Clone + Eq + Hash + AsRef<[u8]>, - V: DeserializeOwned + Clone + Eq, - H: Hasher + Clone + 'static, + K: Storable + Clone + Eq + Hash + AsRef<[u8]> + CondSync, + V: Storable + Clone + Eq + CondSync, + K::Serializable: Serialize + DeserializeOwned, + V::Serializable: Serialize + DeserializeOwned, + H: Hasher + CondSync, { match (main_pointer, other_pointer) { (Pointer::Link(main_link), Pointer::Link(other_link)) => { @@ -283,9 +312,11 @@ async fn create_node_from_pairs( store: &impl BlockStore, ) -> Result>> where - K: DeserializeOwned + Clone + AsRef<[u8]> + CondSync, - V: DeserializeOwned + Clone + CondSync, - H: Hasher + Clone + 'static + CondSync, + K: Storable + Clone + Eq + Hash + AsRef<[u8]> + CondSync, + V: Storable + Clone + Eq + CondSync, + K::Serializable: Serialize + DeserializeOwned, + V::Serializable: Serialize + DeserializeOwned, + H: Hasher + CondSync, { let mut node = Arc::new(Node::<_, _, H>::default()); for Pair { key, value } in values { diff --git a/wnfs-hamt/src/hamt.rs b/wnfs-hamt/src/hamt.rs index 668e2f94..19980295 100644 --- a/wnfs-hamt/src/hamt.rs +++ b/wnfs-hamt/src/hamt.rs @@ -1,18 +1,14 @@ use super::{KeyValueChange, Node, HAMT_VERSION}; -use crate::Hasher; +use crate::{serializable::HamtSerializable, Hasher}; use anyhow::Result; use async_trait::async_trait; -use libipld::{serde as ipld_serde, Ipld}; +use libipld::Cid; use semver::Version; -use serde::{ - de::{DeserializeOwned, Error as DeError}, - ser::Error as SerError, - Deserialize, Deserializer, Serialize, Serializer, -}; -use std::{collections::BTreeMap, hash::Hash, str::FromStr}; +use serde::{de::DeserializeOwned, Serialize}; +use std::hash::Hash; use wnfs_common::{ - utils::{Arc, CondSend, CondSync}, - AsyncSerialize, BlockStore, Link, + utils::{Arc, CondSync}, + BlockStore, Link, Storable, }; //-------------------------------------------------------------------------------------------------- @@ -119,9 +115,10 @@ impl Hamt { store: &impl BlockStore, ) -> Result>> where - K: DeserializeOwned + Clone + Eq + Hash + AsRef<[u8]>, - V: DeserializeOwned + Clone + Eq, - H: Clone + 'static, + K: Storable + Clone + Eq + Hash + AsRef<[u8]>, + V: Storable + Clone + Eq, + K::Serializable: Serialize + DeserializeOwned, + V::Serializable: Serialize + DeserializeOwned, { super::diff( Link::from(Arc::clone(&self.root)), @@ -130,76 +127,36 @@ impl Hamt { ) .await } - - async fn to_ipld(&self, store: &B) -> Result - where - K: Serialize, - V: Serialize, - { - Ok(Ipld::Map(BTreeMap::from([ - ("root".into(), self.root.to_ipld(store).await?), - ("version".into(), ipld_serde::to_ipld(&self.version)?), - ("structure".into(), ipld_serde::to_ipld("hamt")?), - ]))) - } } #[cfg_attr(not(target_arch = "wasm32"), async_trait)] #[cfg_attr(target_arch = "wasm32", async_trait(?Send))] -impl AsyncSerialize for Hamt -where - K: Serialize + CondSync, - V: Serialize + CondSync, -{ - async fn async_serialize(&self, serializer: S, store: &B) -> Result - where - S: Serializer + CondSend, - B: BlockStore + ?Sized, - { - self.to_ipld(store) - .await - .map_err(SerError::custom)? - .serialize(serializer) - } -} - -impl<'de, K, V> Deserialize<'de> for Hamt +impl Storable for Hamt where - K: DeserializeOwned + CondSync, - V: DeserializeOwned + CondSync, + K: Storable + CondSync, + V: Storable + CondSync, + K::Serializable: Serialize + DeserializeOwned, + V::Serializable: Serialize + DeserializeOwned, + H: Hasher + CondSync, { - fn deserialize(deserializer: D) -> Result - where - D: Deserializer<'de>, - { - Ipld::deserialize(deserializer).and_then(|ipld| ipld.try_into().map_err(DeError::custom)) + type Serializable = HamtSerializable; + + async fn to_serializable(&self, store: &impl BlockStore) -> Result { + Ok(HamtSerializable { + root: self.root.to_serializable(store).await?, + version: self.version.clone(), + structure: "hamt".to_string(), + }) } -} -impl TryFrom for Hamt -where - K: DeserializeOwned + CondSync, - V: DeserializeOwned + CondSync, -{ - type Error = String; - - fn try_from(ipld: Ipld) -> Result { - match ipld { - Ipld::Map(mut map) => { - let root = Arc::new( - Node::::deserialize(map.remove("root").ok_or("Missing root")?) - .map_err(|e| e.to_string())?, - ); - - let version = match map.get("version").ok_or("Missing version")? { - Ipld::String(v) => Version::from_str(v).map_err(|e| e.to_string())?, - _ => return Err("`version` is not a string".into()), - }; - - Ok(Self { root, version }) - } - other => Err(format!("Expected `Ipld::Map`, got {other:#?}")), - } + async fn from_serializable( + _cid: Option<&Cid>, + serializable: Self::Serializable, + ) -> Result { + Ok(Self { + root: Arc::new(Node::from_serializable(None, serializable.root).await?), + version: serializable.version, + }) } } @@ -211,8 +168,10 @@ impl Default for Hamt { impl PartialEq for Hamt where - K: PartialEq, - V: PartialEq, + K: Storable + PartialEq + CondSync, + V: Storable + PartialEq + CondSync, + K::Serializable: Serialize + DeserializeOwned, + V::Serializable: Serialize + DeserializeOwned, H: Hasher + CondSync, { fn eq(&self, other: &Self) -> bool { @@ -227,8 +186,7 @@ where #[cfg(test)] mod tests { use super::*; - use libipld::cbor::DagCborCodec; - use wnfs_common::{async_encode, decode, MemoryBlockStore}; + use wnfs_common::MemoryBlockStore; #[async_std::test] async fn hamt_can_encode_decode_as_cbor() { @@ -236,8 +194,8 @@ mod tests { let root = Arc::new(Node::default()); let hamt: Hamt = Hamt::with_root(root); - let encoded_hamt = async_encode(&hamt, store, DagCborCodec).await.unwrap(); - let decoded_hamt: Hamt = decode(encoded_hamt.as_ref(), DagCborCodec).unwrap(); + let hamt_cid = hamt.store(store).await.unwrap(); + let decoded_hamt = Hamt::load(&hamt_cid, store).await.unwrap(); assert_eq!(hamt, decoded_hamt); } @@ -259,7 +217,7 @@ mod snapshot_tests { } let hamt = Hamt::with_root(Arc::clone(node)); - let cid = store.put_async_serializable(&hamt).await.unwrap(); + let cid = hamt.store(store).await.unwrap(); let hamt = store.get_block_snapshot(&cid).await.unwrap(); insta::assert_json_snapshot!(hamt); diff --git a/wnfs-hamt/src/lib.rs b/wnfs-hamt/src/lib.rs index 05f86e4b..49657b0a 100644 --- a/wnfs-hamt/src/lib.rs +++ b/wnfs-hamt/src/lib.rs @@ -11,7 +11,7 @@ //! //! The implementation is based on [fvm_ipld_hamt](https://github.com/filecoin-project/ref-fvm/tree/master/ipld/hamt) with some modifications for async blockstore access and immutability-by-default. -mod constants; +pub mod constants; mod diff; mod error; mod hamt; @@ -19,6 +19,7 @@ mod hash; mod merge; mod node; mod pointer; +pub mod serializable; pub(crate) use constants::*; pub use diff::*; diff --git a/wnfs-hamt/src/merge.rs b/wnfs-hamt/src/merge.rs index 318e0f41..748a0af9 100644 --- a/wnfs-hamt/src/merge.rs +++ b/wnfs-hamt/src/merge.rs @@ -1,11 +1,11 @@ use super::{ChangeType, Node}; use crate::{error::HamtError, Hasher}; use anyhow::Result; -use serde::de::DeserializeOwned; +use serde::{de::DeserializeOwned, Serialize}; use std::hash::Hash; use wnfs_common::{ utils::{Arc, CondSync}, - BlockStore, Link, + BlockStore, Link, Storable, }; //-------------------------------------------------------------------------------------------------- @@ -21,9 +21,11 @@ pub async fn merge( ) -> Result>> where F: Fn(&V, &V) -> Result, - K: DeserializeOwned + Eq + Clone + Hash + AsRef<[u8]>, - V: DeserializeOwned + Eq + Clone, - H: Hasher + CondSync + Clone + 'static, + K: Storable + Eq + Clone + Hash + AsRef<[u8]>, + V: Storable + Eq + Clone, + K::Serializable: Serialize + DeserializeOwned, + V::Serializable: Serialize + DeserializeOwned, + H: Hasher + CondSync, { let kv_changes = super::diff(main_link.clone(), other_link.clone(), store).await?; diff --git a/wnfs-hamt/src/node.rs b/wnfs-hamt/src/node.rs index 6accac30..7624be6e 100644 --- a/wnfs-hamt/src/node.rs +++ b/wnfs-hamt/src/node.rs @@ -3,21 +3,17 @@ use super::{ hash::{HashNibbles, Hasher}, HashPrefix, Pair, Pointer, HAMT_BITMASK_BIT_SIZE, HAMT_BITMASK_BYTE_SIZE, }; -use crate::HAMT_VALUES_BUCKET_SIZE; +use crate::{serializable::NodeSerializable, HAMT_VALUES_BUCKET_SIZE}; use anyhow::{bail, Result}; use async_once_cell::OnceCell; use async_recursion::async_recursion; use async_trait::async_trait; use bitvec::array::BitArray; use either::{Either, Either::*}; -use libipld::{serde as ipld_serde, Cid, Ipld}; +use libipld::Cid; #[cfg(feature = "log")] use log::debug; -use serde::{ - de::{Deserialize, DeserializeOwned}, - ser::Error as SerError, - Deserializer, Serialize, Serializer, -}; +use serde::{de::DeserializeOwned, Serialize}; use serde_byte_array::ByteArray; use std::{ collections::HashMap, @@ -27,7 +23,7 @@ use std::{ }; use wnfs_common::{ utils::{Arc, BoxFuture, CondSend, CondSync}, - AsyncSerialize, BlockStore, HashOutput, Link, RemembersCid, + BlockStore, HashOutput, Link, Storable, }; //-------------------------------------------------------------------------------------------------- @@ -69,7 +65,7 @@ where impl Node where - H: Hasher + 'static + CondSync, + H: Hasher + CondSync, K: CondSync, V: CondSync, { @@ -93,8 +89,10 @@ where /// ``` pub async fn set(self: &mut Arc, key: K, value: V, store: &impl BlockStore) -> Result<()> where - K: DeserializeOwned + Clone + AsRef<[u8]>, - V: DeserializeOwned + Clone, + K: Storable + AsRef<[u8]> + Clone, + V: Storable + Clone, + K::Serializable: Serialize + DeserializeOwned, + V::Serializable: Serialize + DeserializeOwned, { let hash = &H::hash(&key); @@ -125,8 +123,10 @@ where /// ``` pub async fn get<'a>(&'a self, key: &K, store: &impl BlockStore) -> Result> where - K: DeserializeOwned + AsRef<[u8]>, - V: DeserializeOwned, + K: Storable + AsRef<[u8]>, + V: Storable, + K::Serializable: Serialize + DeserializeOwned, + V::Serializable: Serialize + DeserializeOwned, { let hash = &H::hash(key); @@ -169,8 +169,10 @@ where store: &'a impl BlockStore, ) -> Result> where - K: DeserializeOwned + AsRef<[u8]> + Clone, - V: DeserializeOwned + Clone, + K: Storable + AsRef<[u8]> + Clone, + V: Storable + Clone, + K::Serializable: Serialize + DeserializeOwned, + V::Serializable: Serialize + DeserializeOwned, { let hash = &H::hash(key); @@ -211,8 +213,10 @@ where store: &impl BlockStore, ) -> Result>> where - K: DeserializeOwned + Clone + AsRef<[u8]>, - V: DeserializeOwned + Clone, + K: Storable + AsRef<[u8]> + Clone, + V: Storable + Clone, + K::Serializable: Serialize + DeserializeOwned, + V::Serializable: Serialize + DeserializeOwned, { let hash = &H::hash(key); @@ -248,8 +252,10 @@ where store: &impl BlockStore, ) -> Result> where - K: DeserializeOwned + AsRef<[u8]>, - V: DeserializeOwned, + K: Storable + AsRef<[u8]>, + V: Storable, + K::Serializable: Serialize + DeserializeOwned, + V::Serializable: Serialize + DeserializeOwned, { #[cfg(feature = "log")] debug!("get_by_hash: hash = {:02x?}", hash); @@ -290,8 +296,10 @@ where store: &impl BlockStore, ) -> Result>> where - K: DeserializeOwned + Clone + AsRef<[u8]>, - V: DeserializeOwned + Clone, + K: Storable + AsRef<[u8]> + Clone, + V: Storable + Clone, + K::Serializable: Serialize + DeserializeOwned, + V::Serializable: Serialize + DeserializeOwned, { self.remove_value(&mut HashNibbles::new(hash), store).await } @@ -342,9 +350,10 @@ where store: &'a impl BlockStore, ) -> BoxFuture<'a, Result<()>> where - K: DeserializeOwned + Clone + AsRef<[u8]> + 'a, - V: DeserializeOwned + Clone + 'a, - H: 'a, + K: Storable + Clone + AsRef<[u8]> + 'a, + V: Storable + Clone + 'a, + K::Serializable: Serialize + DeserializeOwned, + V::Serializable: Serialize + DeserializeOwned, { Box::pin(async move { let bit_index = hashnibbles.try_next()?; @@ -424,8 +433,10 @@ where store: &impl BlockStore, ) -> Result>> where - K: DeserializeOwned + AsRef<[u8]>, - V: DeserializeOwned, + K: Storable + AsRef<[u8]>, + V: Storable, + K::Serializable: Serialize + DeserializeOwned, + V::Serializable: Serialize + DeserializeOwned, { let bit_index = hashnibbles.try_next()?; @@ -456,8 +467,10 @@ where store: &'a impl BlockStore, ) -> Result>> where - K: DeserializeOwned + AsRef<[u8]> + Clone, - V: DeserializeOwned + Clone, + K: Storable + AsRef<[u8]> + Clone, + V: Storable + Clone, + K::Serializable: Serialize + DeserializeOwned, + V::Serializable: Serialize + DeserializeOwned, { let bit_index = hashnibbles.try_next()?; @@ -483,18 +496,16 @@ where } } - // It's internal and is only more complex because async_recursion doesn't work here - #[allow(clippy::type_complexity)] - pub fn remove_value<'k, 'v, 'a>( + pub fn remove_value<'a>( self: &'a mut Arc, hashnibbles: &'a mut HashNibbles, store: &'a impl BlockStore, ) -> BoxFuture<'a, Result>>> where - K: DeserializeOwned + Clone + AsRef<[u8]> + 'k, - V: DeserializeOwned + Clone + 'v, - 'k: 'a, - 'v: 'a, + K: Storable + AsRef<[u8]> + Clone + 'a, + V: Storable + Clone + 'a, + K::Serializable: Serialize + DeserializeOwned, + V::Serializable: Serialize + DeserializeOwned, { Box::pin(async move { let bit_index = hashnibbles.try_next()?; @@ -593,12 +604,13 @@ where /// ``` #[cfg_attr(not(target_arch = "wasm32"), async_recursion)] #[cfg_attr(target_arch = "wasm32", async_recursion(?Send))] - pub async fn flat_map(&self, f: &F, store: &B) -> Result> + pub async fn flat_map(&self, f: &F, store: &impl BlockStore) -> Result> where - B: BlockStore, F: Fn(&Pair) -> Result + CondSync, - K: DeserializeOwned, - V: DeserializeOwned, + K: Storable + AsRef<[u8]>, + V: Storable, + K::Serializable: Serialize + DeserializeOwned, + V::Serializable: Serialize + DeserializeOwned, T: CondSend, { let mut items = >::new(); @@ -649,31 +661,33 @@ where /// ``` #[cfg_attr(not(target_arch = "wasm32"), async_recursion)] #[cfg_attr(target_arch = "wasm32", async_recursion(?Send))] - pub async fn get_node_at<'a, B>( + pub async fn get_node_at<'a>( &'a self, hashprefix: &HashPrefix, - store: &B, + store: &impl BlockStore, ) -> Result, &'a Arc>>> where - K: DeserializeOwned + AsRef<[u8]>, - V: DeserializeOwned, - B: BlockStore, + K: Storable + AsRef<[u8]>, + V: Storable, + K::Serializable: Serialize + DeserializeOwned, + V::Serializable: Serialize + DeserializeOwned, { self.get_node_at_helper(hashprefix, 0, store).await } #[cfg_attr(not(target_arch = "wasm32"), async_recursion)] #[cfg_attr(target_arch = "wasm32", async_recursion(?Send))] - async fn get_node_at_helper<'a, B>( + async fn get_node_at_helper<'a>( &'a self, hashprefix: &HashPrefix, index: u8, - store: &B, + store: &impl BlockStore, ) -> Result, &'a Arc>>> where - K: DeserializeOwned + AsRef<[u8]>, - V: DeserializeOwned, - B: BlockStore, + K: Storable + AsRef<[u8]>, + V: Storable, + K::Serializable: Serialize + DeserializeOwned, + V::Serializable: Serialize + DeserializeOwned, { let bit_index = hashprefix .get(index) @@ -730,8 +744,10 @@ where /// ``` pub async fn to_hashmap(&self, store: &B) -> Result> where - K: DeserializeOwned + Clone + Eq + Hash, - V: DeserializeOwned + Clone, + K: Storable + AsRef<[u8]> + Clone + Eq + Hash, + V: Storable + Clone, + K::Serializable: Serialize + DeserializeOwned, + V::Serializable: Serialize + DeserializeOwned, { let mut map = HashMap::new(); let key_values = self @@ -747,9 +763,7 @@ where Ok(map) } -} -impl Node { /// Returns the count of the values in all the values pointer of a node. pub fn count_values(self: &Arc) -> Result { let mut len = 0; @@ -763,25 +777,6 @@ impl Node { Ok(len) } - - // TODO(appcypher): Do we really need this? Why not use PublicDirectorySerializable style instead. - /// Converts a Node to an IPLD object. - pub async fn to_ipld(&self, store: &B) -> Result - where - K: Serialize + CondSync, - V: Serialize + CondSync, - { - let bitmask_ipld = ipld_serde::to_ipld(ByteArray::from(self.bitmask.into_inner()))?; - let pointers_ipld = { - let mut tmp = Vec::with_capacity(self.pointers.len()); - for pointer in self.pointers.iter() { - tmp.push(pointer.to_ipld(store).await?); - } - Ipld::List(tmp) - }; - - Ok(Ipld::List(vec![bitmask_ipld, pointers_ipld])) - } } impl Clone for Node { @@ -800,12 +795,6 @@ impl Clone for N } } -impl RemembersCid for Node { - fn persisted_as(&self) -> &OnceCell { - &self.persisted_as - } -} - impl Default for Node { fn default() -> Self { Node { @@ -817,67 +806,12 @@ impl Default for Node { } } -#[cfg_attr(not(target_arch = "wasm32"), async_trait)] -#[cfg_attr(target_arch = "wasm32", async_trait(?Send))] -impl AsyncSerialize for Node -where - K: Serialize + CondSync, - V: Serialize + CondSync, - H: Hasher + CondSync, -{ - async fn async_serialize(&self, serializer: S, store: &B) -> Result - where - S: Serializer + CondSend, - B: BlockStore + ?Sized, - { - self.to_ipld(store) - .await - .map_err(SerError::custom)? - .serialize(serializer) - } -} - -impl<'de, K, V, H> Deserialize<'de> for Node -where - K: DeserializeOwned + CondSync, - V: DeserializeOwned + CondSync, - H: Hasher + CondSync, -{ - fn deserialize(deserializer: D) -> Result - where - D: Deserializer<'de>, - { - let (bytes, pointers): (ByteArray<2>, Vec>) = - Deserialize::deserialize(deserializer)?; - - let bitmask = BitArray::::from(bytes.into_array()); - if bitmask.len() != HAMT_BITMASK_BIT_SIZE { - return Err(serde::de::Error::custom(format!( - "invalid bitmask length, expected {HAMT_BITMASK_BIT_SIZE}, but got {}", - bitmask.len() - ))); - } - let bitmask_bits_set = bitmask.count_ones(); - if pointers.len() != bitmask_bits_set { - return Err(serde::de::Error::custom(format!( - "pointers length does not match bitmask, bitmask bits set: {}, pointers length: {}", - bitmask_bits_set, - pointers.len() - ))); - } - Ok(Node { - persisted_as: OnceCell::new(), - bitmask, - pointers, - hasher: PhantomData, - }) - } -} - impl PartialEq for Node where - K: PartialEq + CondSync, - V: PartialEq + CondSync, + K: Storable + PartialEq + CondSync, + V: Storable + PartialEq + CondSync, + K::Serializable: Serialize + DeserializeOwned, + V::Serializable: Serialize + DeserializeOwned, H: Hasher + CondSync, { fn eq(&self, other: &Self) -> bool { @@ -904,6 +838,64 @@ where } } +#[cfg_attr(not(target_arch = "wasm32"), async_trait)] +#[cfg_attr(target_arch = "wasm32", async_trait(?Send))] +impl Storable for Node +where + K: Storable + CondSync, + V: Storable + CondSync, + K::Serializable: Serialize + DeserializeOwned, + V::Serializable: Serialize + DeserializeOwned, + H: Hasher + CondSync, +{ + type Serializable = NodeSerializable; + + async fn to_serializable(&self, store: &impl BlockStore) -> Result { + let bitmask = ByteArray::from(self.bitmask.into_inner()); + + let mut pointers = Vec::with_capacity(self.pointers.len()); + for pointer in self.pointers.iter() { + pointers.push(pointer.to_serializable(store).await?); + } + + Ok(NodeSerializable(bitmask, pointers)) + } + + async fn from_serializable( + cid: Option<&Cid>, + serializable: Self::Serializable, + ) -> Result { + let NodeSerializable(bitmask, ser_pointers) = serializable; + + let bitmask = BitArray::::new(bitmask.into()); + let bitmask_bits_set = bitmask.count_ones(); + + if ser_pointers.len() != bitmask_bits_set { + bail!( + "pointers length does not match bitmask, bitmask bits set: {}, pointers length: {}", + bitmask_bits_set, + ser_pointers.len() + ); + } + + let mut pointers = Vec::with_capacity(ser_pointers.len()); + for ser_pointer in ser_pointers { + pointers.push(Pointer::from_serializable(cid, ser_pointer).await?); + } + + Ok(Self { + persisted_as: cid.cloned().map(OnceCell::new_with).unwrap_or_default(), + bitmask, + pointers, + hasher: PhantomData, + }) + } + + fn persisted_as(&self) -> Option<&OnceCell> { + Some(&self.persisted_as) + } +} + //-------------------------------------------------------------------------------------------------- // Tests //-------------------------------------------------------------------------------------------------- @@ -1143,8 +1135,8 @@ mod tests { node2.set("key 68".into(), 870, store).await.unwrap(); node2.remove(&"key 17".into(), store).await.unwrap(); - let cid1 = store.put_async_serializable(node1).await.unwrap(); - let cid2 = store.put_async_serializable(node2).await.unwrap(); + let cid1 = node1.store(store).await.unwrap(); + let cid2 = node2.store(store).await.unwrap(); assert_eq!(cid1, cid2); } @@ -1219,10 +1211,9 @@ mod proptests { use crate::strategies::{ node_from_operations, operations, operations_and_shuffled, Operations, }; - use libipld::cbor::DagCborCodec; use proptest::prelude::*; use test_strategy::proptest; - use wnfs_common::{async_encode, decode, MemoryBlockStore}; + use wnfs_common::MemoryBlockStore; fn small_key() -> impl Strategy { (0..1000).prop_map(|i| format!("key {i}")) @@ -1242,10 +1233,10 @@ mod proptests { let node = &mut node_from_operations(&operations, store).await.unwrap(); node.set(key.clone(), value, store).await.unwrap(); - let cid1 = store.put_async_serializable(node).await.unwrap(); + let cid1 = node.store(store).await.unwrap(); node.set(key, value, store).await.unwrap(); - let cid2 = store.put_async_serializable(node).await.unwrap(); + let cid2 = node.store(store).await.unwrap(); assert_eq!(cid1, cid2); }) @@ -1264,10 +1255,10 @@ mod proptests { let node = &mut node_from_operations(&operations, store).await.unwrap(); node.remove(&key, store).await.unwrap(); - let cid1 = store.put_async_serializable(node).await.unwrap(); + let cid1 = node.store(store).await.unwrap(); node.remove(&key, store).await.unwrap(); - let cid2 = store.put_async_serializable(node).await.unwrap(); + let cid2 = node.store(store).await.unwrap(); assert_eq!(cid1, cid2); }) @@ -1284,9 +1275,8 @@ mod proptests { let store = &MemoryBlockStore::default(); let node = node_from_operations(&operations, store).await.unwrap(); - let encoded_node = async_encode(&node, store, DagCborCodec).await.unwrap(); - let decoded_node: Node = - decode(encoded_node.as_ref(), DagCborCodec).unwrap(); + let node_cid = node.store(store).await.unwrap(); + let decoded_node = Node::::load(&node_cid, store).await.unwrap(); assert_eq!(*node, decoded_node); }) @@ -1307,8 +1297,8 @@ mod proptests { let node1 = node_from_operations(&original, store).await.unwrap(); let node2 = node_from_operations(&shuffled, store).await.unwrap(); - let cid1 = store.put_async_serializable(&node1).await.unwrap(); - let cid2 = store.put_async_serializable(&node2).await.unwrap(); + let cid1 = node1.store(store).await.unwrap(); + let cid2 = node2.store(store).await.unwrap(); assert_eq!(cid1, cid2); }) @@ -1364,7 +1354,7 @@ mod snapshot_tests { .unwrap(); } - let cid = store.put_async_serializable(node).await.unwrap(); + let cid = node.store(store).await.unwrap(); let node = store.get_block_snapshot(&cid).await.unwrap(); insta::assert_json_snapshot!(node); diff --git a/wnfs-hamt/src/pointer.rs b/wnfs-hamt/src/pointer.rs index 2fff8c10..e115b155 100644 --- a/wnfs-hamt/src/pointer.rs +++ b/wnfs-hamt/src/pointer.rs @@ -1,16 +1,13 @@ use super::{error::HamtError, hash::Hasher, Node, HAMT_VALUES_BUCKET_SIZE}; +use crate::serializable::PointerSerializable; use anyhow::Result; use async_trait::async_trait; -use libipld::{serde as ipld_serde, Ipld}; -use serde::{ - de::{DeserializeOwned, Error as DeError}, - ser::Error as SerError, - Deserialize, Deserializer, Serialize, Serializer, -}; +use libipld::Cid; +use serde::{de::DeserializeOwned, Serialize}; use std::fmt::Debug; use wnfs_common::{ - utils::{error, Arc, CondSend, CondSync}, - AsyncSerialize, BlockStore, Link, + utils::{error, Arc, CondSync}, + BlockStore, Link, Storable, }; //-------------------------------------------------------------------------------------------------- @@ -68,8 +65,10 @@ impl Pointer { /// Converts a Link pointer to a canonical form to ensure consistent tree representation after deletes. pub async fn canonicalize(self, store: &impl BlockStore) -> Result> where - K: DeserializeOwned + Clone + AsRef<[u8]>, - V: DeserializeOwned + Clone, + K: Storable + Clone + AsRef<[u8]>, + V: Storable + Clone, + K::Serializable: Serialize + DeserializeOwned, + V::Serializable: Serialize + DeserializeOwned, H: CondSync, { match self { @@ -109,76 +108,77 @@ impl Pointer { _ => error(HamtError::NonCanonicalizablePointer), } } - - /// Converts a Pointer to an IPLD object. - pub async fn to_ipld(&self, store: &B) -> Result - where - K: Serialize, - V: Serialize, - { - Ok(match self { - Pointer::Values(values) => ipld_serde::to_ipld(values)?, - Pointer::Link(link) => ipld_serde::to_ipld(link.resolve_cid(store).await?)?, - }) - } } #[cfg_attr(not(target_arch = "wasm32"), async_trait)] #[cfg_attr(target_arch = "wasm32", async_trait(?Send))] -impl AsyncSerialize for Pointer +impl Storable for Pointer where - K: Serialize + CondSync, - V: Serialize + CondSync, - H: CondSync, + K: Storable + CondSync, + V: Storable + CondSync, + K::Serializable: Serialize + DeserializeOwned, + V::Serializable: Serialize + DeserializeOwned, + H: Hasher + CondSync, { - async fn async_serialize(&self, serializer: S, store: &B) -> Result - where - S: Serializer + CondSend, - B: BlockStore + ?Sized, - { - match self { - Pointer::Values(vals) => vals.serialize(serializer), - Pointer::Link(link) => link - .resolve_cid(store) - .await - .map_err(SerError::custom)? - .serialize(serializer), - } + type Serializable = PointerSerializable; + + async fn to_serializable(&self, store: &impl BlockStore) -> Result { + Ok(match self { + Pointer::Values(values) => { + let mut serializables = Vec::with_capacity(values.len()); + for pair in values.iter() { + serializables.push(pair.to_serializable(store).await?); + } + PointerSerializable::Values(serializables) + } + Pointer::Link(link) => { + let cid = link.resolve_cid(store).await?; + PointerSerializable::Link(cid) + } + }) } -} -impl<'de, K, V, H: Hasher + CondSync> Deserialize<'de> for Pointer -where - K: DeserializeOwned + CondSync, - V: DeserializeOwned + CondSync, -{ - fn deserialize(deserializer: D) -> Result - where - D: Deserializer<'de>, - { - Ipld::deserialize(deserializer).and_then(|ipld| ipld.try_into().map_err(DeError::custom)) + async fn from_serializable( + _cid: Option<&Cid>, + serializable: Self::Serializable, + ) -> Result { + Ok(match serializable { + PointerSerializable::Values(serializables) => { + let mut values = Vec::with_capacity(serializables.len()); + for serializable in serializables { + values.push(Pair::from_serializable(None, serializable).await?); + } + Self::Values(values) + } + PointerSerializable::Link(cid) => Self::Link(Link::from_cid(cid)), + }) } } -impl TryFrom for Pointer +#[cfg_attr(not(target_arch = "wasm32"), async_trait)] +#[cfg_attr(target_arch = "wasm32", async_trait(?Send))] +impl Storable for Pair where - K: DeserializeOwned + CondSync, - V: DeserializeOwned + CondSync, + K: Storable + CondSync, + V: Storable + CondSync, + K::Serializable: Serialize + DeserializeOwned, + V::Serializable: Serialize + DeserializeOwned, { - type Error = String; + type Serializable = (K::Serializable, V::Serializable); - fn try_from(ipld: Ipld) -> Result { - match ipld { - ipld_list @ Ipld::List(_) => { - let values: Vec> = - Deserialize::deserialize(ipld_list).map_err(|error| error.to_string())?; - Ok(Self::Values(values)) - } - Ipld::Link(cid) => Ok(Self::Link(Link::from_cid(cid))), - other => Err(format!( - "Expected `Ipld::List` or `Ipld::Link`, got {other:?}", - )), - } + async fn to_serializable(&self, store: &impl BlockStore) -> Result { + let key = self.key.to_serializable(store).await?; + let value = self.value.to_serializable(store).await?; + Ok((key, value)) + } + + async fn from_serializable( + _cid: Option<&Cid>, + (key, value): Self::Serializable, + ) -> Result { + let key = K::from_serializable(None, key).await?; + let value = V::from_serializable(None, value).await?; + Ok(Pair { key, value }) } } @@ -210,8 +210,10 @@ impl Default for Pointer PartialEq for Pointer where - K: PartialEq + CondSync, - V: PartialEq + CondSync, + K: Storable + PartialEq + CondSync, + V: Storable + PartialEq + CondSync, + K::Serializable: Serialize + DeserializeOwned, + V::Serializable: Serialize + DeserializeOwned, { fn eq(&self, other: &Self) -> bool { match (self, other) { @@ -222,33 +224,6 @@ where } } -impl<'de, K, V> Deserialize<'de> for Pair -where - K: DeserializeOwned, - V: DeserializeOwned, -{ - fn deserialize(deserializer: D) -> Result - where - D: Deserializer<'de>, - { - let (key, value) = <(K, V)>::deserialize(deserializer)?; - Ok(Pair { key, value }) - } -} - -impl Serialize for Pair -where - K: Serialize, - V: Serialize, -{ - fn serialize(&self, serializer: S) -> Result - where - S: Serializer, - { - (&self.key, &self.value).serialize(serializer) - } -} - //-------------------------------------------------------------------------------------------------- // Tests //-------------------------------------------------------------------------------------------------- @@ -256,11 +231,11 @@ where #[cfg(test)] mod tests { use super::*; - use libipld::cbor::DagCborCodec; - use wnfs_common::{async_encode, decode, MemoryBlockStore}; + use testresult::TestResult; + use wnfs_common::{MemoryBlockStore, Storable}; #[async_std::test] - async fn pointer_can_encode_decode_as_cbor() { + async fn pointer_can_encode_decode_as_cbor() -> TestResult { let store = &MemoryBlockStore::default(); let pointer: Pointer = Pointer::Values(vec![ Pair { @@ -273,21 +248,24 @@ mod tests { }, ]); - let encoded_pointer = async_encode(&pointer, store, DagCborCodec).await.unwrap(); - let decoded_pointer: Pointer = - decode(encoded_pointer.as_ref(), DagCborCodec).unwrap(); + let pointer_cid = pointer.store(store).await?; + let decoded_pointer = + Pointer::::load(&pointer_cid, store).await?; assert_eq!(pointer, decoded_pointer); + + Ok(()) } } #[cfg(test)] mod snapshot_tests { use super::*; + use testresult::TestResult; use wnfs_common::utils::SnapshotBlockStore; #[async_std::test] - async fn test_pointer() { + async fn test_pointer() -> TestResult { let store = &SnapshotBlockStore::default(); let pointer: Pointer = Pointer::Values(vec![ Pair { @@ -300,9 +278,11 @@ mod snapshot_tests { }, ]); - let cid = store.put_async_serializable(&pointer).await.unwrap(); - let ptr = store.get_block_snapshot(&cid).await.unwrap(); + let cid = pointer.store(store).await?; + let ptr = store.get_block_snapshot(&cid).await?; insta::assert_json_snapshot!(ptr); + + Ok(()) } } diff --git a/wnfs-hamt/src/serializable.rs b/wnfs-hamt/src/serializable.rs new file mode 100644 index 00000000..919b318d --- /dev/null +++ b/wnfs-hamt/src/serializable.rs @@ -0,0 +1,119 @@ +use crate::constants::HAMT_BITMASK_BYTE_SIZE; +use libipld::{ + cid::serde::{BytesToCidVisitor, CID_SERDE_PRIVATE_IDENTIFIER}, + Cid, +}; +use semver::Version; +use serde::{ + de::{SeqAccess, Visitor}, + Deserialize, Deserializer, Serialize, Serializer, +}; +use serde_byte_array::ByteArray; +use serde_bytes::ByteBuf; +use std::marker::PhantomData; + +//-------------------------------------------------------------------------------------------------- +// Type Definitions +//-------------------------------------------------------------------------------------------------- + +#[derive(Serialize, Deserialize, Debug, Clone)] +pub struct HamtSerializable { + pub(crate) root: NodeSerializable, + pub(crate) version: Version, + pub(crate) structure: String, +} + +#[derive(Serialize, Deserialize, Debug, Clone, PartialEq)] +pub struct NodeSerializable( + pub(crate) ByteArray, + pub(crate) Vec>, +); + +#[derive(Debug, Clone, PartialEq)] +pub(crate) enum PointerSerializable { + Values(Vec<(K, V)>), + Link(Cid), +} + +//-------------------------------------------------------------------------------------------------- +// Implementations +//-------------------------------------------------------------------------------------------------- + +impl Serialize for PointerSerializable { + fn serialize(&self, serializer: S) -> Result { + match *self { + Self::Values(ref vec) => vec.serialize(serializer), + Self::Link(ref cid) => { + let value = ByteBuf::from(cid.to_bytes()); + serializer.serialize_newtype_struct(CID_SERDE_PRIVATE_IDENTIFIER, &value) + } + } + } +} + +impl<'de, K: Deserialize<'de>, V: Deserialize<'de>> Deserialize<'de> for PointerSerializable { + fn deserialize>(deserializer: D) -> Result { + struct PointerVisitor(PhantomData<(K, V)>); + + impl<'de, K: Deserialize<'de>, V: Deserialize<'de>> Visitor<'de> for PointerVisitor { + type Value = PointerSerializable; + + fn expecting(&self, fmt: &mut std::fmt::Formatter) -> std::fmt::Result { + write!( + fmt, + "a valid PointerSerializable represented as CID bytes or as a sequence of tuples of keys and values" + ) + } + + fn visit_newtype_struct>( + self, + deserializer: D, + ) -> Result { + let cid = deserializer.deserialize_bytes(BytesToCidVisitor)?; + Ok(PointerSerializable::Link(cid)) + } + + fn visit_seq>(self, mut seq: A) -> Result { + let mut values = Vec::new(); + while let Some(elem) = seq.next_element::<(K, V)>()? { + values.push(elem); + } + Ok(PointerSerializable::Values(values)) + } + } + + let visitor = PointerVisitor(PhantomData); + deserializer.deserialize_any(visitor) + } +} + +#[cfg(test)] +mod tests { + use super::*; + use testresult::TestResult; + + #[test] + fn test_pointer_link_roundtrip() -> TestResult { + let pointers = PointerSerializable::::Link(Cid::default()); + let bytes = serde_ipld_dagcbor::to_vec(&pointers)?; + + let pointers_back: PointerSerializable = + serde_ipld_dagcbor::from_slice(&bytes)?; + + assert_eq!(pointers, pointers_back); + + Ok(()) + } + + #[test] + fn test_pointer_values_roundtrip() -> TestResult { + let pointers = PointerSerializable::Values(vec![(1, 10), (2, 20), (3, 30)]); + let bytes = serde_ipld_dagcbor::to_vec(&pointers)?; + + let pointers_back: PointerSerializable = serde_ipld_dagcbor::from_slice(&bytes)?; + + assert_eq!(pointers, pointers_back); + + Ok(()) + } +} diff --git a/wnfs-hamt/src/strategies/changes.rs b/wnfs-hamt/src/strategies/changes.rs index 066191e9..cb77c1b5 100644 --- a/wnfs-hamt/src/strategies/changes.rs +++ b/wnfs-hamt/src/strategies/changes.rs @@ -3,11 +3,11 @@ use super::{operations, Operations}; use crate::Node; use anyhow::Result; use proptest::{collection::vec, strategy::Strategy}; -use serde::de::DeserializeOwned; +use serde::{de::DeserializeOwned, Serialize}; use std::{collections::HashMap, fmt::Debug}; use wnfs_common::{ utils::{Arc, CondSync}, - BlockStore, + BlockStore, Storable, }; //-------------------------------------------------------------------------------------------------- @@ -61,8 +61,10 @@ pub(crate) async fn apply_changes( store: &impl BlockStore, ) -> Result<()> where - K: Debug + Clone + AsRef<[u8]> + DeserializeOwned, - V: Debug + Clone + DeserializeOwned, + K: Storable + Debug + Clone + AsRef<[u8]>, + V: Storable + Debug + Clone, + K::Serializable: Serialize + DeserializeOwned, + V::Serializable: Serialize + DeserializeOwned, { for change in changes { match change { @@ -81,15 +83,16 @@ where Ok(()) } -pub(crate) async fn prepare_node( +pub(crate) async fn prepare_node( node: &mut Arc>, changes: &Vec>, - store: &B, + store: &impl BlockStore, ) -> Result<()> where - K: Debug + Clone + AsRef<[u8]> + DeserializeOwned, - V: Debug + Clone + DeserializeOwned, - B: BlockStore, + K: Storable + Debug + Clone + AsRef<[u8]>, + V: Storable + Debug + Clone, + K::Serializable: Serialize + DeserializeOwned, + V::Serializable: Serialize + DeserializeOwned, { for change in changes { if let Change::Add(k, _) = change { diff --git a/wnfs-hamt/src/strategies/kv.rs b/wnfs-hamt/src/strategies/kv.rs index c5220a80..fa97449b 100644 --- a/wnfs-hamt/src/strategies/kv.rs +++ b/wnfs-hamt/src/strategies/kv.rs @@ -5,7 +5,7 @@ use serde::{de::DeserializeOwned, Serialize}; use std::{collections::HashMap, fmt::Debug, hash::Hash}; use wnfs_common::{ utils::{Arc, CondSync}, - BlockStore, + BlockStore, Storable, }; //-------------------------------------------------------------------------------------------------- @@ -33,8 +33,10 @@ pub async fn node_from_kvs( store: &impl BlockStore, ) -> Result>> where - K: DeserializeOwned + Serialize + Clone + Debug + AsRef<[u8]> + CondSync, - V: DeserializeOwned + Serialize + Clone + Debug + CondSync, + K: Storable + Clone + Debug + AsRef<[u8]> + CondSync, + V: Storable + Clone + Debug + CondSync, + K::Serializable: Serialize + DeserializeOwned, + V::Serializable: Serialize + DeserializeOwned, { let mut node: Arc> = Arc::new(Node::default()); for (k, v) in pairs { diff --git a/wnfs-hamt/src/strategies/operations.rs b/wnfs-hamt/src/strategies/operations.rs index 257db850..cf6bf422 100644 --- a/wnfs-hamt/src/strategies/operations.rs +++ b/wnfs-hamt/src/strategies/operations.rs @@ -5,7 +5,7 @@ use serde::{de::DeserializeOwned, Serialize}; use std::{collections::HashMap, fmt::Debug, hash::Hash}; use wnfs_common::{ utils::{Arc, CondSync}, - BlockStore, + BlockStore, Storable, }; //-------------------------------------------------------------------------------------------------- @@ -188,8 +188,10 @@ pub async fn node_from_operations( store: &impl BlockStore, ) -> Result>> where - K: DeserializeOwned + Serialize + Clone + Debug + AsRef<[u8]>, - V: DeserializeOwned + Serialize + Clone + Debug, + K: Storable + Clone + Debug + AsRef<[u8]>, + V: Storable + Clone + Debug, + K::Serializable: Serialize + DeserializeOwned, + V::Serializable: Serialize + DeserializeOwned, { let mut node: Arc> = Arc::new(Node::default()); for op in &operations.0 { @@ -201,8 +203,8 @@ where node.remove(key, store).await?; } Operation::Reserialize => { - let cid = store.put_async_serializable(node.as_ref()).await?; - node = Arc::new(store.get_deserializable(&cid).await?); + let cid = node.store(store).await?; + node = Arc::new(Node::::load(&cid, store).await?); } }; } diff --git a/wnfs-nameaccumulator/Cargo.toml b/wnfs-nameaccumulator/Cargo.toml index ea37cb6e..00a6ff33 100644 --- a/wnfs-nameaccumulator/Cargo.toml +++ b/wnfs-nameaccumulator/Cargo.toml @@ -18,6 +18,7 @@ authors = ["The Fission Authors"] [dependencies] anyhow = "1.0" +async-trait = "0.1" blake3 = { version = "1.4", features = ["traits-preview"] } libipld = { version = "0.16", features = ["dag-cbor", "derive", "serde-codec"] } num-bigint-dig = { version = "0.8.2", features = ["prime", "zeroize"] } @@ -28,6 +29,7 @@ rand_core = "0.6" serde = { version = "1.0", features = ["rc"] } serde_bytes = "0.11.9" thiserror = "1.0" +wnfs-common = { path = "../wnfs-common", version = "=0.1.25" } zeroize = "1.6" [dev-dependencies] diff --git a/wnfs-nameaccumulator/src/name.rs b/wnfs-nameaccumulator/src/name.rs index 7fec0d53..425d9a22 100644 --- a/wnfs-nameaccumulator/src/name.rs +++ b/wnfs-nameaccumulator/src/name.rs @@ -11,6 +11,7 @@ use once_cell::sync::OnceCell; use rand_core::CryptoRngCore; use serde::{Deserialize, Deserializer, Serialize, Serializer}; use std::{hash::Hash, str::FromStr}; +use wnfs_common::impl_storable_from_serde; use zeroize::Zeroize; /// The domain separation string for deriving the l hash in the PoKE* protocol. @@ -433,6 +434,8 @@ impl<'a> BatchedProofVerification<'a> { } } +impl_storable_from_serde! { AccumulatorSetup, NameSegment, NameAccumulator, UnbatchableProofPart, BatchedProofPart } + impl Serialize for NameSegment { fn serialize(&self, serializer: S) -> Result where diff --git a/wnfs-unixfs-file/Cargo.toml b/wnfs-unixfs-file/Cargo.toml index 462a8bae..b1890a0f 100644 --- a/wnfs-unixfs-file/Cargo.toml +++ b/wnfs-unixfs-file/Cargo.toml @@ -19,6 +19,7 @@ authors = ["The Fission Authors"] [dependencies] anyhow = "1.0" async-stream = "0.3" +async-trait = "0.1" bytes = "1.5" futures = "0.3" libipld = { version = "0.16", features = [] } diff --git a/wnfs-unixfs-file/src/unixfs.rs b/wnfs-unixfs-file/src/unixfs.rs index 76106ed1..74b02658 100644 --- a/wnfs-unixfs-file/src/unixfs.rs +++ b/wnfs-unixfs-file/src/unixfs.rs @@ -5,6 +5,7 @@ use crate::{ types::{Block, Link, LinkRef, Links, PbLinks}, }; use anyhow::{anyhow, bail, ensure, Result}; +use async_trait::async_trait; use bytes::Bytes; use futures::FutureExt; use libipld::Cid; @@ -18,7 +19,7 @@ use std::{ use tokio::io::{AsyncRead, AsyncSeek}; use wnfs_common::{ utils::{boxed_fut, BoxFuture}, - BlockStore, + BlockStore, LoadIpld, Storable, StoreIpld, }; #[derive( @@ -222,6 +223,36 @@ impl UnixFsFile { } } +#[cfg_attr(not(target_arch = "wasm32"), async_trait)] +#[cfg_attr(target_arch = "wasm32", async_trait(?Send))] +impl Storable for UnixFsFile { + type Serializable = UnixFsFile; + + async fn to_serializable(&self, _store: &impl BlockStore) -> Result { + Ok(self.clone()) + } + + async fn from_serializable( + _cid: Option<&Cid>, + serializable: Self::Serializable, + ) -> Result { + Ok(serializable) + } +} + +impl StoreIpld for UnixFsFile { + fn encode_ipld(&self) -> Result<(Bytes, u64)> { + let (codec, bytes, _) = self.encode()?.into_parts(); + Ok((bytes, codec.into())) + } +} + +impl LoadIpld for UnixFsFile { + fn decode_ipld(cid: &Cid, bytes: Bytes) -> Result { + UnixFsFile::decode(cid, bytes) + } +} + #[derive(Debug)] pub struct UnixFsFileReader<'a, B: BlockStore> { root_node: UnixFsFile, diff --git a/wnfs-wasm/src/fs/private/forest.rs b/wnfs-wasm/src/fs/private/forest.rs index 2fbb8925..2925ab21 100644 --- a/wnfs-wasm/src/fs/private/forest.rs +++ b/wnfs-wasm/src/fs/private/forest.rs @@ -8,8 +8,11 @@ use libipld_core::cid::Cid; use std::rc::Rc; use wasm_bindgen::prelude::wasm_bindgen; use wasm_bindgen_futures::future_to_promise; -use wnfs::private::forest::{ - hamt::HamtForest as WnfsHamtForest, traits::PrivateForest as WnfsPrivateForest, +use wnfs::{ + common::Storable, + private::forest::{ + hamt::HamtForest as WnfsHamtForest, traits::PrivateForest as WnfsPrivateForest, + }, }; use wnfs_nameaccumulator::AccumulatorSetup; @@ -47,7 +50,7 @@ impl PrivateForest { let cid = Cid::read_bytes(&cid[..]).map_err(error("Cannot parse cid"))?; Ok(future_to_promise(async move { - let forest: WnfsHamtForest = WnfsHamtForest::load(&cid, &store) + let forest = WnfsHamtForest::load(&cid, &store) .await .map_err(error("Couldn't deserialize forest"))?; @@ -103,7 +106,7 @@ impl PrivateForest { Ok(value!(diff .into_iter() - .map(|c| value!(ForestChange(c))) + .map(|c| value!(ForestChange(c.map(&|c| c.0)))) .collect::())) })) } diff --git a/wnfs-wasm/src/fs/public/directory.rs b/wnfs-wasm/src/fs/public/directory.rs index 4bb78b8d..9f2961c1 100644 --- a/wnfs-wasm/src/fs/public/directory.rs +++ b/wnfs-wasm/src/fs/public/directory.rs @@ -15,7 +15,7 @@ use std::rc::Rc; use wasm_bindgen::{prelude::wasm_bindgen, JsValue}; use wasm_bindgen_futures::future_to_promise; use wnfs::{ - common::BlockStore as WnfsBlockStore, + common::Storable, public::{PublicDirectory as WnfsPublicDirectory, PublicNode as WnfsPublicNode}, traits::Id, }; @@ -97,8 +97,7 @@ impl PublicDirectory { let cid = Cid::read_bytes(&cid[..]).map_err(error("Cannot parse cid"))?; Ok(future_to_promise(async move { - let directory: WnfsPublicDirectory = store - .get_deserializable(&cid) + let directory = WnfsPublicDirectory::load(&cid, &store) .await .map_err(error("Couldn't deserialize directory"))?; diff --git a/wnfs-wasm/src/fs/public/file.rs b/wnfs-wasm/src/fs/public/file.rs index a4af17ae..58c95643 100644 --- a/wnfs-wasm/src/fs/public/file.rs +++ b/wnfs-wasm/src/fs/public/file.rs @@ -11,7 +11,7 @@ use std::rc::Rc; use wasm_bindgen::{prelude::wasm_bindgen, JsValue}; use wasm_bindgen_futures::future_to_promise; use wnfs::{ - common::BlockStore as WnfsBlockStore, + common::Storable, public::{PublicFile as WnfsPublicFile, PublicNode as WnfsPublicNode}, traits::Id, }; @@ -66,8 +66,7 @@ impl PublicFile { let cid = Cid::try_from(cid).map_err(|e| Error::new(&format!("Cannot parse cid: {e}")))?; Ok(future_to_promise(async move { - let file: WnfsPublicFile = store - .get_deserializable(&cid) + let file = WnfsPublicFile::load(&cid, &store) .await .map_err(|e| Error::new(&format!("Couldn't deserialize directory: {e}")))?; diff --git a/wnfs/examples/private.rs b/wnfs/examples/private.rs index e525bf03..ac6c294d 100644 --- a/wnfs/examples/private.rs +++ b/wnfs/examples/private.rs @@ -14,7 +14,7 @@ use wnfs::{ AccessKey, PrivateDirectory, PrivateNode, }, }; -use wnfs_common::utils::CondSend; +use wnfs_common::{utils::CondSend, Storable}; #[async_std::main] async fn main() -> Result<()> { diff --git a/wnfs/examples/tiered_blockstores.rs b/wnfs/examples/tiered_blockstores.rs index 473b5617..3e36bb0a 100644 --- a/wnfs/examples/tiered_blockstores.rs +++ b/wnfs/examples/tiered_blockstores.rs @@ -17,7 +17,7 @@ use wnfs::{ PrivateDirectory, PrivateNode, }, }; -use wnfs_common::utils::CondSend; +use wnfs_common::{utils::CondSend, Storable}; #[async_std::main] async fn main() -> Result<()> { diff --git a/wnfs/examples/write_proofs.rs b/wnfs/examples/write_proofs.rs index cf5955ea..492c4e82 100644 --- a/wnfs/examples/write_proofs.rs +++ b/wnfs/examples/write_proofs.rs @@ -16,6 +16,7 @@ use wnfs::{ AccessKey, PrivateDirectory, PrivateNode, }, }; +use wnfs_common::Storable; #[async_std::main] async fn main() -> Result<()> { diff --git a/wnfs/src/lib.rs b/wnfs/src/lib.rs index c5cd4dda..bbb7c8c8 100644 --- a/wnfs/src/lib.rs +++ b/wnfs/src/lib.rs @@ -12,8 +12,8 @@ //! use anyhow::Result; //! use chrono::Utc; //! use wnfs::{ -//! common::MemoryBlockStore, -//! public::PublicDirectory +//! common::{MemoryBlockStore, Storable}, +//! public::PublicDirectory, //! }; //! //! #[async_std::main] @@ -22,7 +22,7 @@ //! let dir = &mut PublicDirectory::new_rc(Utc::now()); //! //! // Create an in-memory block store. -//! let store = &MemoryBlockStore::default(); +//! let store = &MemoryBlockStore::new(); //! //! // Add a /pictures/cats subdirectory. //! dir.mkdir(&["pictures".into(), "cats".into()], Utc::now(), store) diff --git a/wnfs/src/private/directory.rs b/wnfs/src/private/directory.rs index 82eed11c..89313bba 100644 --- a/wnfs/src/private/directory.rs +++ b/wnfs/src/private/directory.rs @@ -2248,7 +2248,7 @@ mod snapshot_tests { use chrono::TimeZone; use rand_chacha::ChaCha12Rng; use rand_core::SeedableRng; - use wnfs_common::utils::SnapshotBlockStore; + use wnfs_common::{utils::SnapshotBlockStore, Storable}; #[async_std::test] async fn test_private_fs() -> Result<()> { diff --git a/wnfs/src/private/forest/hamt.rs b/wnfs/src/private/forest/hamt.rs index 0951d191..a93796fb 100644 --- a/wnfs/src/private/forest/hamt.rs +++ b/wnfs/src/private/forest/hamt.rs @@ -2,18 +2,21 @@ use super::traits::PrivateForest; use crate::error::FsError; use anyhow::Result; use async_trait::async_trait; -use libipld_core::{cid::Cid, ipld::Ipld}; +use libipld_core::cid::Cid; use quick_cache::sync::Cache; use rand_core::CryptoRngCore; -use serde::{ - de::Error as DeError, ser::Error as SerError, Deserialize, Deserializer, Serialize, Serializer, -}; +use semver::Version; +use serde::{Deserialize, Serialize}; use std::collections::BTreeSet; use wnfs_common::{ + impl_storable_from_serde, utils::{Arc, CondSend}, - AsyncSerialize, BlockStore, HashOutput, Link, + BlockStore, HashOutput, Link, Storable, +}; +use wnfs_hamt::{ + constants::HAMT_VERSION, merge, serializable::NodeSerializable, Hamt, Hasher, KeyValueChange, + Node, Pair, }; -use wnfs_hamt::{merge, Hamt, Hasher, KeyValueChange, Pair}; use wnfs_nameaccumulator::{AccumulatorSetup, ElementsProof, Name, NameAccumulator}; const APPROX_CACHE_ENTRY_SIZE: usize = @@ -45,11 +48,25 @@ const NAME_CACHE_CAPACITY: usize = 2_000_000 / APPROX_CACHE_ENTRY_SIZE; /// ``` #[derive(Debug, Clone)] pub struct HamtForest { - hamt: Hamt, blake3::Hasher>, + hamt: Hamt, accumulator: AccumulatorSetup, name_cache: Arc>, } +#[derive(Serialize, Deserialize, Debug, Clone)] +pub struct HamtForestSerializable { + pub(crate) root: NodeSerializable, + pub(crate) version: Version, + pub(crate) structure: String, + pub(crate) accumulator: AccumulatorSetup, +} + +/// Links to ciphertexts +#[derive(Serialize, Deserialize, Debug, Clone, Hash, Eq, PartialEq)] +pub struct Ciphertexts(pub BTreeSet); + +impl_storable_from_serde! { Ciphertexts } + //-------------------------------------------------------------------------------------------------- // Implementations //-------------------------------------------------------------------------------------------------- @@ -114,7 +131,7 @@ impl HamtForest { &self, other: &Self, store: &impl BlockStore, - ) -> Result>>> { + ) -> Result>> { if self.accumulator != other.accumulator { return Err(FsError::IncompatibleAccumulatorSetups.into()); } @@ -122,14 +139,95 @@ impl HamtForest { self.hamt.diff(&other.hamt, store).await } - /// Serializes the forest and stores it in the given block store. - pub async fn store(&self, store: &impl BlockStore) -> Result { - store.put_async_serializable(self).await - } + /// Merges a private forest with another. If there is a conflict with the values,they are union + /// combined into a single value in the final merge node + /// + /// # Examples + /// + /// ``` + /// use std::sync::Arc; + /// use anyhow::Result; + /// use chrono::Utc; + /// use rand_chacha::ChaCha12Rng; + /// use rand_core::SeedableRng; + /// use futures::StreamExt; + /// use wnfs::{ + /// common::MemoryBlockStore, + /// private::{ + /// PrivateDirectory, + /// forest::{hamt::HamtForest, traits::PrivateForest}, + /// }, + /// }; + /// + /// #[async_std::main] + /// async fn main() -> Result<()> { + /// let store = &mut MemoryBlockStore::new(); + /// let rng = &mut ChaCha12Rng::from_entropy(); + /// + /// let forest = &mut HamtForest::new_rsa_2048_rc(rng); + /// let root_dir = &mut PrivateDirectory::new_and_store( + /// &forest.empty_name(), + /// Utc::now(), + /// forest, + /// store, + /// rng + /// ).await?; + /// root_dir.as_node().store(forest, store, rng).await?; + /// + /// // Make two conflicting writes + /// let forest_one = &mut Arc::clone(forest); + /// let dir_one = &mut Arc::clone(root_dir); + /// dir_one.mkdir(&["DirOne".into()], true, Utc::now(), forest_one, store, rng).await?; + /// dir_one.as_node().store(forest_one, store, rng).await?; + /// + /// let forest_two = &mut Arc::clone(forest); + /// let dir_two = &mut Arc::clone(root_dir); + /// dir_two.mkdir(&["DirTwo".into()], true, Utc::now(), forest_two, store, rng).await?; + /// let access_key = dir_two.as_node().store(forest_two, store, rng).await?; + /// let label = access_key.get_label(); + /// let key = access_key.get_temporal_key()?; + /// + /// // Merge the forests together + /// let forest_merged = forest_one.merge(forest_two, store).await?; + /// + /// let multivalue: Vec<_> = forest_merged + /// .get_multivalue_by_hash(label, key, store, None) + /// .collect::>() + /// .await + /// .into_iter() + /// .filter_map(|result| result.ok()) + /// .collect::>(); + /// + /// // There's two conflicting values in the slot + /// assert_eq!(2, multivalue.len()); + /// + /// Ok(()) + /// } + /// ``` + pub async fn merge(&self, other: &Self, store: &impl BlockStore) -> Result { + if self.accumulator != other.accumulator { + return Err(FsError::IncompatibleAccumulatorSetups.into()); + } - /// Deserializes a forest from the given block store. - pub async fn load(cid: &Cid, store: &impl BlockStore) -> Result { - store.get_deserializable(cid).await + let merged_root = merge( + Link::from(Arc::clone(&self.hamt.root)), + Link::from(Arc::clone(&other.hamt.root)), + |a, b| Ok(Ciphertexts(a.0.union(&b.0).cloned().collect())), + store, + ) + .await?; + + // TODO(matheus23) Should we find some way to sensibly merge caches? + let name_cache = self.name_cache.clone(); + + Ok(Self { + hamt: Hamt { + version: self.hamt.version.clone(), + root: merged_root, + }, + accumulator: self.accumulator.clone(), + name_cache, + }) } } @@ -186,12 +284,11 @@ impl PrivateForest for HamtForest { let values = values.into_iter(); match self.hamt.root.get_mut(&accumulator, store).await? { - Some(cids) => cids.extend(values), + Some(ciphers) => ciphers.0.extend(values), None => { - self.hamt - .root - .set(accumulator.clone(), values.collect(), store) - .await?; + let label = accumulator.clone(); + let ciphers = Ciphertexts(values.collect()); + self.hamt.root.set(label, ciphers, store).await?; } } @@ -204,7 +301,12 @@ impl PrivateForest for HamtForest { name_hash: &HashOutput, store: &impl BlockStore, ) -> Result>> { - self.hamt.root.get_by_hash(name_hash, store).await + Ok(self + .hamt + .root + .get_by_hash(name_hash, store) + .await? + .map(|ciphers| &ciphers.0)) } async fn get_encrypted( @@ -222,7 +324,15 @@ impl PrivateForest for HamtForest { store: &impl BlockStore, ) -> Result>>> { let name_hash = &blake3::Hasher::hash(&self.get_accumulated_name(name)); - self.hamt.root.remove_by_hash(name_hash, store).await + Ok(self + .hamt + .root + .remove_by_hash(name_hash, store) + .await? + .map(|Pair { key, value }| Pair { + key, + value: value.0, + })) } } @@ -287,151 +397,30 @@ impl PrivateForest for Arc { } } -impl HamtForest { - /// Merges a private forest with another. If there is a conflict with the values,they are union - /// combined into a single value in the final merge node - /// - /// # Examples - /// - /// ``` - /// use std::sync::Arc; - /// use anyhow::Result; - /// use chrono::Utc; - /// use rand_chacha::ChaCha12Rng; - /// use rand_core::SeedableRng; - /// use futures::StreamExt; - /// use wnfs::{ - /// common::MemoryBlockStore, - /// private::{ - /// PrivateDirectory, - /// forest::{hamt::HamtForest, traits::PrivateForest}, - /// }, - /// }; - /// - /// #[async_std::main] - /// async fn main() -> Result<()> { - /// let store = &mut MemoryBlockStore::new(); - /// let rng = &mut ChaCha12Rng::from_entropy(); - /// - /// let forest = &mut HamtForest::new_rsa_2048_rc(rng); - /// let root_dir = &mut PrivateDirectory::new_and_store( - /// &forest.empty_name(), - /// Utc::now(), - /// forest, - /// store, - /// rng - /// ).await?; - /// root_dir.as_node().store(forest, store, rng).await?; - /// - /// // Make two conflicting writes - /// let forest_one = &mut Arc::clone(forest); - /// let dir_one = &mut Arc::clone(root_dir); - /// dir_one.mkdir(&["DirOne".into()], true, Utc::now(), forest_one, store, rng).await?; - /// dir_one.as_node().store(forest_one, store, rng).await?; - /// - /// let forest_two = &mut Arc::clone(forest); - /// let dir_two = &mut Arc::clone(root_dir); - /// dir_two.mkdir(&["DirTwo".into()], true, Utc::now(), forest_two, store, rng).await?; - /// let access_key = dir_two.as_node().store(forest_two, store, rng).await?; - /// let label = access_key.get_label(); - /// let key = access_key.get_temporal_key()?; - /// - /// // Merge the forests together - /// let forest_merged = forest_one.merge(forest_two, store).await?; - /// - /// let multivalue: Vec<_> = forest_merged - /// .get_multivalue_by_hash(label, key, store, None) - /// .collect::>() - /// .await - /// .into_iter() - /// .filter_map(|result| result.ok()) - /// .collect::>(); - /// - /// // There's two conflicting values in the slot - /// assert_eq!(2, multivalue.len()); - /// - /// Ok(()) - /// } - /// ``` - pub async fn merge(&self, other: &Self, store: &impl BlockStore) -> Result { - if self.accumulator != other.accumulator { - return Err(FsError::IncompatibleAccumulatorSetups.into()); - } - - let merged_root = merge( - Link::from(Arc::clone(&self.hamt.root)), - Link::from(Arc::clone(&other.hamt.root)), - |a, b| Ok(a.union(b).cloned().collect()), - store, - ) - .await?; - - // TODO(matheus23) Should we find some way to sensibly merge caches? - let name_cache = self.name_cache.clone(); - - Ok(Self { - hamt: Hamt { - version: self.hamt.version.clone(), - root: merged_root, - }, - accumulator: self.accumulator.clone(), - name_cache, - }) - } -} - #[cfg_attr(not(target_arch = "wasm32"), async_trait)] #[cfg_attr(target_arch = "wasm32", async_trait(?Send))] -impl AsyncSerialize for HamtForest { - async fn async_serialize(&self, serializer: S, store: &B) -> Result - where - S: Serializer + CondSend, - B: BlockStore + ?Sized, - { - let hamt_ipld = self - .hamt - .async_serialize(libipld_core::serde::Serializer, store) - .await - .map_err(serde::ser::Error::custom)?; - - let accumulator_ipld = self - .accumulator - .serialize(libipld_core::serde::Serializer) - .map_err(serde::ser::Error::custom)?; - - let Ipld::Map(mut ipld_map) = hamt_ipld else { - let msg = - format!("Expected HAMT root to serialize to an IPLD map, but got {hamt_ipld:#?}"); - return Err(SerError::custom(FsError::InvalidDeserialization(msg))); - }; - - ipld_map.insert("accumulator".into(), accumulator_ipld); - - Ipld::Map(ipld_map).serialize(serializer) +impl Storable for HamtForest { + type Serializable = HamtForestSerializable; + + async fn to_serializable(&self, store: &impl BlockStore) -> Result { + Ok(HamtForestSerializable { + root: self.hamt.root.to_serializable(store).await?, + version: HAMT_VERSION, + accumulator: self.accumulator.to_serializable(store).await?, + structure: "hamt".to_string(), + }) } -} - -impl<'de> Deserialize<'de> for HamtForest { - fn deserialize(deserializer: D) -> Result - where - D: Deserializer<'de>, - { - let ipld: Ipld = Deserialize::deserialize(deserializer)?; - let hamt = Hamt::deserialize(ipld.clone()).map_err(serde::de::Error::custom)?; - let Ipld::Map(ipld_map) = ipld else { - let msg = format!("Expected IPLD Map representing a private forest, but got {ipld:#?}"); - return Err(DeError::custom(FsError::InvalidDeserialization(msg))); - }; - let Some(accumulator_ipld) = ipld_map.get("accumulator").cloned() else { - let msg = "IPLD Map entry for 'accumulator' missing in private forest".to_string(); - return Err(DeError::custom(FsError::InvalidDeserialization(msg))); - }; - let accumulator = - AccumulatorSetup::deserialize(accumulator_ipld).map_err(serde::de::Error::custom)?; + async fn from_serializable( + _cid: Option<&Cid>, + serializable: Self::Serializable, + ) -> Result { Ok(Self { - hamt, - accumulator, + hamt: Hamt::with_root(Arc::new( + Node::from_serializable(None, serializable.root).await?, + )), + accumulator: AccumulatorSetup::from_serializable(None, serializable.accumulator) + .await?, name_cache: Arc::new(Cache::new(NAME_CACHE_CAPACITY)), }) } diff --git a/wnfs/src/public/directory.rs b/wnfs/src/public/directory.rs index b5439bc1..8c534e8f 100644 --- a/wnfs/src/public/directory.rs +++ b/wnfs/src/public/directory.rs @@ -8,17 +8,13 @@ use crate::{ }; use anyhow::{bail, ensure, Result}; use async_once_cell::OnceCell; -use async_recursion::async_recursion; use async_trait::async_trait; use chrono::{DateTime, Utc}; use libipld_core::cid::Cid; -use serde::{ - de::Error as DeError, ser::Error as SerError, Deserialize, Deserializer, Serialize, Serializer, -}; use std::collections::{BTreeMap, BTreeSet}; use wnfs_common::{ - utils::{error, Arc, CondSend}, - AsyncSerialize, BlockStore, Metadata, RemembersCid, + utils::{error, Arc}, + BlockStore, Metadata, NodeType, Storable, }; //-------------------------------------------------------------------------------------------------- @@ -40,9 +36,9 @@ use wnfs_common::{ #[derive(Debug)] pub struct PublicDirectory { persisted_as: OnceCell, - pub metadata: Metadata, - pub userland: BTreeMap, - pub previous: BTreeSet, + pub(crate) metadata: Metadata, + pub(crate) userland: BTreeMap, + pub(crate) previous: BTreeSet, } //-------------------------------------------------------------------------------------------------- @@ -799,62 +795,6 @@ impl PublicDirectory { Ok(()) } - - /// Stores directory in provided block store. - /// - /// This function can be recursive if the directory contains other directories. - /// - /// # Examples - /// - /// ``` - /// use wnfs::{ - /// public::PublicDirectory, - /// traits::Id, - /// common::{BlockStore, MemoryBlockStore} - /// }; - /// use chrono::Utc; - /// - /// #[async_std::main] - /// async fn main() { - /// let store = &MemoryBlockStore::default(); - /// let dir = PublicDirectory::new(Utc::now()); - /// - /// let cid = dir.store(store).await.unwrap(); - /// - /// assert_eq!( - /// dir, - /// store.get_deserializable(&cid).await.unwrap() - /// ); - /// } - /// ``` - #[cfg_attr(not(target_arch = "wasm32"), async_recursion)] - #[cfg_attr(target_arch = "wasm32", async_recursion(?Send))] - pub async fn store(&self, store: &impl BlockStore) -> Result { - Ok(*self - .persisted_as - .get_or_try_init(store.put_async_serializable(self)) - .await?) - } - - /// Creates a new directory from provided serializable. - pub(crate) fn from_serializable(serializable: PublicDirectorySerializable) -> Result { - if !is_readable_wnfs_version(&serializable.version) { - bail!(FsError::UnexpectedVersion(serializable.version)) - } - - let userland = serializable - .userland - .into_iter() - .map(|(name, cid)| (name, PublicLink::from_cid(cid))) - .collect(); - - Ok(Self { - persisted_as: OnceCell::new(), - metadata: serializable.metadata, - userland, - previous: serializable.previous.iter().cloned().collect(), - }) - } } impl Id for PublicDirectory { @@ -887,54 +827,60 @@ impl Clone for PublicDirectory { } } -impl RemembersCid for PublicDirectory { - fn persisted_as(&self) -> &OnceCell { - &self.persisted_as - } -} - #[cfg_attr(not(target_arch = "wasm32"), async_trait)] #[cfg_attr(target_arch = "wasm32", async_trait(?Send))] -impl AsyncSerialize for PublicDirectory { - async fn async_serialize(&self, serializer: S, store: &B) -> Result - where - S: Serializer + CondSend, - B: BlockStore + ?Sized, - { - let encoded_userland = { +impl Storable for PublicDirectory { + type Serializable = PublicNodeSerializable; + + async fn to_serializable(&self, store: &impl BlockStore) -> Result { + let userland = { let mut map = BTreeMap::new(); for (name, link) in self.userland.iter() { - map.insert( - name.clone(), - *link.resolve_cid(store).await.map_err(SerError::custom)?, - ); + map.insert(name.clone(), link.resolve_cid(store).await?); } map }; - (PublicNodeSerializable::Dir(PublicDirectorySerializable { + Ok(PublicNodeSerializable::Dir(PublicDirectorySerializable { version: WNFS_VERSION, metadata: self.metadata.clone(), - userland: encoded_userland, + userland, previous: self.previous.iter().cloned().collect(), })) - .serialize(serializer) } -} -impl<'de> Deserialize<'de> for PublicDirectory { - fn deserialize(deserializer: D) -> Result - where - D: Deserializer<'de>, - { - match PublicNodeSerializable::deserialize(deserializer)? { - PublicNodeSerializable::Dir(dir) => { - PublicDirectory::from_serializable(dir).map_err(DeError::custom) - } - _ => Err(DeError::custom(FsError::InvalidDeserialization( - "Expected directory".into(), - ))), + async fn from_serializable( + cid: Option<&Cid>, + serializable: Self::Serializable, + ) -> Result { + println!( + "Public Directory from serializable cid: {}", + cid.map(Cid::to_string).unwrap_or("None".to_string()) + ); + let PublicNodeSerializable::Dir(serializable) = serializable else { + bail!(FsError::UnexpectedNodeType(NodeType::PublicFile)); + }; + + if !is_readable_wnfs_version(&serializable.version) { + bail!(FsError::UnexpectedVersion(serializable.version)) } + + let userland = serializable + .userland + .into_iter() + .map(|(name, cid)| (name, PublicLink::from_cid(cid))) + .collect(); + + Ok(Self { + persisted_as: cid.cloned().map(OnceCell::new_with).unwrap_or_default(), + metadata: serializable.metadata, + userland, + previous: serializable.previous.iter().cloned().collect(), + }) + } + + fn persisted_as(&self) -> Option<&OnceCell> { + Some(&self.persisted_as) } } @@ -1299,7 +1245,10 @@ mod tests { root_dir.mkdir(&["test".into()], time, store).await.unwrap(); - let ipld = root_dir.async_serialize_ipld(store).await.unwrap(); + let ipld = store + .get_deserializable::(&root_dir.store(store).await.unwrap()) + .await + .unwrap(); match ipld { Ipld::Map(map) => match map.get("wnfs/pub/dir") { Some(Ipld::Map(content)) => match content.get("previous") { @@ -1390,6 +1339,8 @@ mod snapshot_tests { let root_dir = &mut PublicDirectory::new_rc(time); let _ = root_dir.store(store).await.unwrap(); + assert!(root_dir.persisted_as().and_then(OnceCell::get).is_some()); + for path in paths.iter() { root_dir .write(path, b"Hello".to_vec(), time, store) diff --git a/wnfs/src/public/file.rs b/wnfs/src/public/file.rs index 9d599fd2..1dfc6e3d 100644 --- a/wnfs/src/public/file.rs +++ b/wnfs/src/public/file.rs @@ -8,15 +8,12 @@ use async_trait::async_trait; use chrono::{DateTime, Utc}; use futures::{AsyncRead, AsyncReadExt}; use libipld_core::cid::Cid; -use serde::{ - de::Error as DeError, ser::Error as SerError, Deserialize, Deserializer, Serialize, Serializer, -}; use std::{collections::BTreeSet, io::SeekFrom}; use tokio::io::AsyncSeekExt; use tokio_util::compat::{FuturesAsyncReadCompatExt, TokioAsyncReadCompatExt}; use wnfs_common::{ utils::{Arc, CondSend}, - AsyncSerialize, BlockStore, Metadata, RemembersCid, + BlockStore, Link, Metadata, NodeType, Storable, }; use wnfs_unixfs_file::{builder::FileBuilder, unixfs::UnixFsFile}; @@ -35,15 +32,9 @@ use wnfs_unixfs_file::{builder::FileBuilder, unixfs::UnixFsFile}; #[derive(Debug)] pub struct PublicFile { persisted_as: OnceCell, - pub metadata: Metadata, - userland: FileUserland, - pub previous: BTreeSet, -} - -#[derive(Debug, Clone, PartialEq)] -enum FileUserland { - Loaded(UnixFsFile), - Stored(Cid), + pub(crate) metadata: Metadata, + pub(crate) userland: Link, + pub(crate) previous: BTreeSet, } //-------------------------------------------------------------------------------------------------- @@ -67,7 +58,7 @@ impl PublicFile { Self { persisted_as: OnceCell::new(), metadata: Metadata::new(time), - userland: FileUserland::Loaded(UnixFsFile::empty()), + userland: Link::from(UnixFsFile::empty()), previous: BTreeSet::new(), } } @@ -107,10 +98,11 @@ impl PublicFile { .build()? .store(store) .await?; + Ok(Self { persisted_as: OnceCell::new(), metadata: Metadata::new(time), - userland: FileUserland::Loaded(UnixFsFile::load(&content_cid, store).await?), + userland: Link::from_cid(content_cid), previous: BTreeSet::new(), }) } @@ -166,10 +158,11 @@ impl PublicFile { .build()? .store(store) .await?; + Ok(Self { persisted_as: OnceCell::new(), metadata: Metadata::new(time), - userland: FileUserland::Loaded(UnixFsFile::load(&content_cid, store).await?), + userland: Link::from_cid(content_cid), previous: BTreeSet::new(), }) } @@ -262,8 +255,9 @@ impl PublicFile { ) -> Result { let mut reader = self .userland - .get_cloned(store) + .resolve_value(store) .await? + .clone() .into_content_reader(store, None)?; reader.seek(SeekFrom::Start(byte_offset)).await?; Ok(TokioAsyncReadCompatExt::compat(reader)) @@ -345,11 +339,10 @@ impl PublicFile { .build()? .store(store) .await?; - let userland = UnixFsFile::load(&content_cid, store).await?; let file = self.prepare_next_revision(); file.metadata.upsert_mtime(time); - file.userland = FileUserland::Loaded(userland); + file.userland = Link::from_cid(content_cid); Ok(()) } @@ -384,88 +377,44 @@ impl PublicFile { pub fn get_metadata_mut_rc<'a>(self: &'a mut Arc) -> &'a mut Metadata { self.prepare_next_revision().get_metadata_mut() } +} - /// Stores file in provided block store. - /// - /// # Examples - /// - /// ``` - /// use wnfs::{ - /// public::PublicFile, - /// traits::Id, - /// common::MemoryBlockStore - /// }; - /// use chrono::Utc; - /// use libipld_core::cid::Cid; - /// - /// #[async_std::main] - /// async fn main() { - /// let store = &MemoryBlockStore::new(); - /// let file = PublicFile::new(Utc::now()); - /// - /// file.store(store).await.unwrap(); - /// } - /// ``` - pub async fn store(&self, store: &impl BlockStore) -> Result { - Ok(*self - .persisted_as - .get_or_try_init(store.put_async_serializable(self)) - .await?) +#[cfg_attr(not(target_arch = "wasm32"), async_trait)] +#[cfg_attr(target_arch = "wasm32", async_trait(?Send))] +impl Storable for PublicFile { + type Serializable = PublicNodeSerializable; + + async fn to_serializable(&self, store: &impl BlockStore) -> Result { + Ok(PublicNodeSerializable::File(PublicFileSerializable { + version: WNFS_VERSION, + metadata: self.metadata.clone(), + userland: self.userland.resolve_cid(store).await?, + previous: self.previous.iter().cloned().collect(), + })) } - /// Creates a new file from a serializable. - pub(crate) fn from_serializable(serializable: PublicFileSerializable) -> Result { + async fn from_serializable( + cid: Option<&Cid>, + serializable: Self::Serializable, + ) -> Result { + let PublicNodeSerializable::File(serializable) = serializable else { + bail!(FsError::UnexpectedNodeType(NodeType::PublicDirectory)); + }; + if !is_readable_wnfs_version(&serializable.version) { bail!(FsError::UnexpectedVersion(serializable.version)) } Ok(Self { - persisted_as: OnceCell::new(), + persisted_as: cid.cloned().map(OnceCell::new_with).unwrap_or_default(), metadata: serializable.metadata, - userland: FileUserland::Stored(serializable.userland), + userland: Link::from_cid(serializable.userland), previous: serializable.previous.iter().cloned().collect(), }) } -} - -#[cfg_attr(not(target_arch = "wasm32"), async_trait)] -#[cfg_attr(target_arch = "wasm32", async_trait(?Send))] -impl AsyncSerialize for PublicFile { - async fn async_serialize(&self, serializer: S, store: &B) -> Result - where - S: Serializer + CondSend, - S::Error: CondSend, - B: BlockStore, - { - let userland = self - .userland - .get_stored(store) - .await - .map_err(SerError::custom)?; - - PublicNodeSerializable::File(PublicFileSerializable { - version: WNFS_VERSION, - metadata: self.metadata.clone(), - userland, - previous: self.previous.iter().cloned().collect(), - }) - .serialize(serializer) - } -} -impl<'de> Deserialize<'de> for PublicFile { - fn deserialize(deserializer: D) -> Result - where - D: Deserializer<'de>, - { - match PublicNodeSerializable::deserialize(deserializer)? { - PublicNodeSerializable::File(file) => { - PublicFile::from_serializable(file).map_err(DeError::custom) - } - _ => Err(DeError::custom(FsError::InvalidDeserialization( - "Expected directory".into(), - ))), - } + fn persisted_as(&self) -> Option<&OnceCell> { + Some(&self.persisted_as) } } @@ -499,28 +448,6 @@ impl Clone for PublicFile { } } -impl RemembersCid for PublicFile { - fn persisted_as(&self) -> &OnceCell { - &self.persisted_as - } -} - -impl FileUserland { - async fn get_cloned(&self, store: &impl BlockStore) -> Result { - match self { - Self::Loaded(file) => Ok(file.clone()), - Self::Stored(cid) => UnixFsFile::load(cid, store).await, - } - } - - async fn get_stored(&self, store: &impl BlockStore) -> Result { - match self { - Self::Loaded(file) => file.encode()?.store(store).await, - Self::Stored(cid) => Ok(*cid), - } - } -} - //-------------------------------------------------------------------------------------------------- // Tests //-------------------------------------------------------------------------------------------------- diff --git a/wnfs/src/public/link.rs b/wnfs/src/public/link.rs index 67a57ce3..34048e65 100644 --- a/wnfs/src/public/link.rs +++ b/wnfs/src/public/link.rs @@ -52,7 +52,7 @@ impl PublicLink { /// Gets the Cid stored in type. It attempts to get it from the store if it is not present in type. #[inline] - pub async fn resolve_cid(&self, store: &(impl BlockStore + ?Sized)) -> Result<&Cid> { + pub async fn resolve_cid(&self, store: &(impl BlockStore + ?Sized)) -> Result { self.0.resolve_cid(store).await } diff --git a/wnfs/src/public/node/node.rs b/wnfs/src/public/node/node.rs index b62be310..fab6ca53 100644 --- a/wnfs/src/public/node/node.rs +++ b/wnfs/src/public/node/node.rs @@ -11,12 +11,8 @@ use async_once_cell::OnceCell; use async_trait::async_trait; use chrono::{DateTime, Utc}; use libipld_core::cid::Cid; -use serde::{de::Error as DeError, Deserialize, Deserializer, Serializer}; use std::collections::BTreeSet; -use wnfs_common::{ - utils::{Arc, CondSend}, - AsyncSerialize, BlockStore, RemembersCid, -}; +use wnfs_common::{utils::Arc, BlockStore, Storable}; //-------------------------------------------------------------------------------------------------- // Type Definitions @@ -234,20 +230,6 @@ impl PublicNode { pub fn is_file(&self) -> bool { matches!(self, Self::File(_)) } - - /// Serializes a node to the block store and returns its CID. - pub async fn store(&self, store: &impl BlockStore) -> Result { - Ok(match self { - Self::File(file) => file.store(store).await?, - Self::Dir(dir) => dir.store(store).await?, - }) - } - - /// Loads a node from the block store. - #[inline] - pub async fn load(cid: &Cid, store: &impl BlockStore) -> Result { - store.get_deserializable(cid).await - } } impl Id for PublicNode { @@ -285,46 +267,37 @@ impl From for PublicNode { } } -impl<'de> Deserialize<'de> for PublicNode { - fn deserialize(deserializer: D) -> Result - where - D: Deserializer<'de>, - { - Ok(match PublicNodeSerializable::deserialize(deserializer)? { - PublicNodeSerializable::File(file) => { - let file = PublicFile::from_serializable(file).map_err(DeError::custom)?; - Self::File(Arc::new(file)) - } - PublicNodeSerializable::Dir(dir) => { - let dir = PublicDirectory::from_serializable(dir).map_err(DeError::custom)?; - Self::Dir(Arc::new(dir)) - } +#[cfg_attr(not(target_arch = "wasm32"), async_trait)] +#[cfg_attr(target_arch = "wasm32", async_trait(?Send))] +impl Storable for PublicNode { + type Serializable = PublicNodeSerializable; + + async fn to_serializable(&self, store: &impl BlockStore) -> Result { + Ok(match self { + Self::File(file) => file.to_serializable(store).await?, + Self::Dir(dir) => dir.to_serializable(store).await?, }) } -} -/// Implements async deserialization for serde serializable types. -#[cfg_attr(not(target_arch = "wasm32"), async_trait)] -#[cfg_attr(target_arch = "wasm32", async_trait(?Send))] -impl AsyncSerialize for PublicNode { - async fn async_serialize(&self, serializer: S, store: &B) -> Result - where - S: Serializer + CondSend, - S::Error: CondSend, - B: BlockStore + ?Sized, - { - match self { - Self::File(file) => file.async_serialize(serializer, store).await, - Self::Dir(dir) => dir.async_serialize(serializer, store).await, - } + async fn from_serializable( + cid: Option<&Cid>, + serializable: Self::Serializable, + ) -> Result { + // TODO(matheus23) this is weird, refactor? + Ok(match serializable { + PublicNodeSerializable::File(file) => Self::File(Arc::new( + PublicFile::from_serializable(cid, PublicNodeSerializable::File(file)).await?, + )), + PublicNodeSerializable::Dir(dir) => Self::Dir(Arc::new( + PublicDirectory::from_serializable(cid, PublicNodeSerializable::Dir(dir)).await?, + )), + }) } -} -impl RemembersCid for PublicNode { - fn persisted_as(&self) -> &OnceCell { + fn persisted_as(&self) -> Option<&OnceCell> { match self { - PublicNode::File(file) => (*file).persisted_as(), - PublicNode::Dir(dir) => (*dir).persisted_as(), + PublicNode::File(file) => file.as_ref().persisted_as(), + PublicNode::Dir(dir) => dir.as_ref().persisted_as(), } } } @@ -338,7 +311,7 @@ mod tests { use crate::public::{PublicDirectory, PublicFile, PublicNode}; use chrono::Utc; use testresult::TestResult; - use wnfs_common::MemoryBlockStore; + use wnfs_common::{MemoryBlockStore, Storable}; #[async_std::test] async fn serialized_public_node_can_be_deserialized() -> TestResult { diff --git a/wnfs/src/public/node/serializable.rs b/wnfs/src/public/node/serializable.rs index ec651156..bee3c24b 100644 --- a/wnfs/src/public/node/serializable.rs +++ b/wnfs/src/public/node/serializable.rs @@ -9,7 +9,7 @@ use wnfs_common::Metadata; //-------------------------------------------------------------------------------------------------- #[derive(Debug, Clone, Serialize, Deserialize)] -pub(crate) enum PublicNodeSerializable { +pub enum PublicNodeSerializable { #[serde(rename = "wnfs/pub/file")] File(PublicFileSerializable), #[serde(rename = "wnfs/pub/dir")] @@ -17,17 +17,17 @@ pub(crate) enum PublicNodeSerializable { } #[derive(Debug, Clone, Serialize, Deserialize)] -pub(crate) struct PublicFileSerializable { +pub struct PublicFileSerializable { pub version: Version, pub metadata: Metadata, - pub userland: Cid, pub previous: Vec, + pub userland: Cid, } #[derive(Debug, Clone, Serialize, Deserialize)] -pub(crate) struct PublicDirectorySerializable { +pub struct PublicDirectorySerializable { pub version: Version, pub metadata: Metadata, - pub userland: BTreeMap, pub previous: Vec, + pub userland: BTreeMap, } diff --git a/wnfs/src/root_tree.rs b/wnfs/src/root_tree.rs index 80e3f33b..ec54ad52 100644 --- a/wnfs/src/root_tree.rs +++ b/wnfs/src/root_tree.rs @@ -26,7 +26,7 @@ use std::collections::HashMap; use wnfs_common::MemoryBlockStore; use wnfs_common::{ utils::{Arc, CondSend}, - BlockStore, Metadata, + BlockStore, Metadata, Storable, }; #[cfg(test)] use wnfs_nameaccumulator::AccumulatorSetup; @@ -320,8 +320,8 @@ where ) -> Result> { let deserialized: RootTreeSerializable = store.get_deserializable(cid).await?; let forest = Arc::new(HamtForest::load(&deserialized.forest, store).await?); - let public_root = Arc::new(store.get_deserializable(&deserialized.public).await?); - let exchange_root = Arc::new(store.get_deserializable(&deserialized.exchange).await?); + let public_root = Arc::new(PublicDirectory::load(&deserialized.public, store).await?); + let exchange_root = Arc::new(PublicDirectory::load(&deserialized.exchange, store).await?); Ok(Self { store,