Skip to content

Commit

Permalink
feat: flesh out more of the leaf-protocol crate and update Iroh.
Browse files Browse the repository at this point in the history
  • Loading branch information
zicklag committed Jul 24, 2024
1 parent 3eb391a commit 65113bd
Show file tree
Hide file tree
Showing 12 changed files with 2,949 additions and 1,989 deletions.
1,072 changes: 690 additions & 382 deletions Cargo.lock

Large diffs are not rendered by default.

2 changes: 1 addition & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -2,9 +2,9 @@
resolver = "2"
members = [
"backend",
"backend/leaf-protocol",
"backend/gdata",
"backend/gdata/gdata_explorer",
"backend/leaf-protocol",
"backend/weird",
]

Expand Down
2 changes: 1 addition & 1 deletion backend/gdata/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ serde = ["dep:serde", "smallvec/serde", "smallstr/serde"]
anyhow = "1.0.86"
cobs = "0.2.3"
futures = { version = "0.3.30", default-features = false }
iroh = { version = "0.18.0", default-features = false }
iroh = { version = "0.21.0", default-features = false }
quic-rpc = "0.10.1"
quick_cache = "0.5.1"
serde = { version = "1.0", features = ["derive"], optional = true }
Expand Down
2 changes: 1 addition & 1 deletion backend/gdata/gdata_explorer/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ crossterm = { version = "0.27.0", features = ["event-stream"] }
futures = "0.3.30"
futures-timer = "3.0.3"
gdata = { path = "../" }
iroh = { version = "0.18.0", default-features = false, features = ["fs-store"] }
iroh = { version = "0.21.0", default-features = false, features = ["fs-store"] }
once_cell = "1.19.0"
ratatui = "0.26.3"
tokio = { version = "1.38.0" }
9 changes: 3 additions & 6 deletions backend/gdata/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,20 +2,17 @@
use anyhow::Result;
use futures::{Stream, StreamExt};
use quic_rpc::transport::flume::FlumeConnection;
use quick_cache::sync::Cache;
use smallstr::SmallString;
use smallvec::SmallVec;
use std::{future::Future, sync::Arc};

use iroh::{
base::base32,
client::RpcService,
client::Doc,
docs::{store::Query, AuthorId, NamespaceId},
};

type Doc = iroh::client::docs::Doc<FlumeConnection<RpcService>>;

#[derive(Clone, PartialEq, Eq, PartialOrd, Ord, Hash)]
pub struct Link {
pub namespace: NamespaceId,
Expand Down Expand Up @@ -173,7 +170,7 @@ impl<T: Into<KeySegment>, const N: usize> From<[T; N]> for Key {

#[derive(Clone, Debug)]
pub struct IrohGStore {
pub iroh: iroh::client::MemIroh,
pub iroh: iroh::client::Iroh,
pub default_author: AuthorId,
pub docs: Arc<Cache<NamespaceId, Doc>>,
}
Expand All @@ -197,7 +194,7 @@ impl IrohGStore {
/// Create a new [`IrohGStore`] that wraps an iroh client.
///
/// The `default_author` is used when writing entries if another author is not specified.
pub fn new(iroh: iroh::client::MemIroh, default_author: AuthorId) -> Self {
pub fn new(iroh: iroh::client::Iroh, default_author: AuthorId) -> Self {
Self {
iroh,
default_author,
Expand Down
9 changes: 9 additions & 0 deletions backend/leaf-protocol/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -3,5 +3,14 @@ name = "leaf-protocol"
version = "0.1.0"
edition = "2021"

[features]
default = ["backend_iroh"]
backend_iroh = ["iroh"]

[dependencies]
anyhow = "1.0.86"
borsh = { version = "1.5.1", features = ["derive"] }

iroh = { version = "0.21.0", optional = true }
redb = "2.1.1"
tokio = { version = "1.39.1", default-features = false, features = ["rt"] }
29 changes: 3 additions & 26 deletions backend/leaf-protocol/src/lib.rs
Original file line number Diff line number Diff line change
@@ -1,29 +1,6 @@
pub mod store;
pub mod types;

pub trait KeyResolverImpl<KeyId> {
/// Returns the `EncryptionAlgoirthmId` that this implements.
fn id(&self) -> KeyId;
/// Resolve the given data to a key using this algorithm.
fn resolve(&self, data: &[u8]) -> KeyId;
pub struct Leaf<Store> {
pub store: Store,
}

pub trait EncryptionAlgorithmImpl<Digest> {
/// Returns the `EncryptionAlgoirthmId` that this implements.
fn id(&self) -> Digest;
/// Encrypts the data using the provided key.
fn encrypt(&self, key_id: [u8; 32], data: &[u8]) -> Vec<u8>;
/// Decrypts the data using the provided key.
fn decrypt(&self, key_id: [u8; 32], data: &[u8]) -> Vec<u8>;
}

pub trait LeafBackend {
/// The public key type.
type KeyId;
/// The content-addressed store digest/hash type.
type PayloadDigest;
/// Get an iterator over key resolver algorithms implemented by this backend.
fn key_resolvers(&self) -> impl Iterator<Item = &dyn KeyResolverImpl<Self::KeyId>>;
/// Get an iterator over encryption algorithms implemented by this backend.
fn encryption_algorithms(&self) -> impl Iterator<Item = &dyn EncryptionAlgorithmImpl<Self::PayloadDigest>>;
}

57 changes: 57 additions & 0 deletions backend/leaf-protocol/src/store.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,57 @@
use std::future::Future;

#[cfg(feature = "backend_iroh")]
pub mod iroh;

pub trait KeyResolverImpl<KeyId> {
/// Returns the `EncryptionAlgoirthmId` that this implements.
fn id(&self) -> KeyId;
/// Resolve the given data to a key using this algorithm.
fn resolve(&self, data: &[u8]) -> KeyId;
}

pub trait EncryptionAlgorithmImpl<Digest> {
/// Returns the `EncryptionAlgoirthmId` that this implements.
fn id(&self) -> Digest;
/// Encrypts the data using the provided key.
fn encrypt(&self, key_id: [u8; 32], data: &[u8]) -> Vec<u8>;
/// Decrypts the data using the provided key.
fn decrypt(&self, key_id: [u8; 32], data: &[u8]) -> Vec<u8>;
}

pub trait LeafStore {
type NamespaceId;
type SubspaceId;
/// The content-addressed store digest/hash type.
type PayloadDigest;

/// Get an iterator over key resolver algorithms implemented by this backend.
// TODO: try avoid allocating while still being object safe.
fn key_resolvers(
&self,
) -> Box<dyn Iterator<Item = &dyn KeyResolverImpl<Self::PayloadDigest>> + '_>;
/// Get an iterator over encryption algorithms implemented by this backend.
fn encryption_algorithms(
&self,
) -> Box<dyn Iterator<Item = &dyn EncryptionAlgorithmImpl<Self::PayloadDigest>> + '_>;

fn store_blob(&self, data: &[u8]) -> impl Future<Output = anyhow::Result<Self::PayloadDigest>>;
fn get_blob(
&self,
payload: Self::PayloadDigest,
) -> impl Future<Output = anyhow::Result<Vec<u8>>>;

fn write_entity(
&self,
namespace: Self::NamespaceId,
subspace: Self::SubspaceId,
path: &[crate::types::PathComponent],
digest: Self::PayloadDigest,
) -> impl Future<Output = anyhow::Result<()>>;
fn read_entity(
&self,
namespace: Self::NamespaceId,
subspace: Self::SubspaceId,
path: &[crate::types::PathComponent],
) -> impl Future<Output = anyhow::Result<Self::PayloadDigest>>;
}
200 changes: 200 additions & 0 deletions backend/leaf-protocol/src/store/iroh.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,200 @@
use std::sync::Arc;

use iroh::blobs::{format::collection::Collection, Tag};
use redb::ReadableTable;

pub type LeafIroh = crate::Leaf<LeafIrohStore>;

pub struct LeafIrohStore {
pub client: iroh::client::Iroh,
pub gc: TagGc,
}
impl LeafIrohStore {
pub fn new(client: iroh::client::Iroh, db: redb::Database) -> Self {
Self {
gc: TagGc {
iroh: client.clone(),
db: Arc::new(db),
},
client,
}
}
}

#[derive(Clone)]
pub struct TagGc {
pub iroh: iroh::client::Iroh,
pub db: Arc<redb::Database>,
}

#[derive(Eq, PartialEq, Debug, Clone, Copy)]
pub enum TagGcDecAction {
None,
DeleteTag,
}
#[derive(Eq, PartialEq, Debug, Clone, Copy)]
pub enum TagGcIncAction {
None,
CreateTag,
}

impl TagGc {
const TABLE_DEF: redb::TableDefinition<'static, iroh::blobs::Hash, u32> =
redb::TableDefinition::new("refcounts");

pub async fn decrement(&self, hash: iroh::blobs::Hash) -> anyhow::Result<TagGcDecAction> {
let this = self.clone();
let action = tokio::task::spawn_blocking(move || -> anyhow::Result<TagGcDecAction> {
let mut action = TagGcDecAction::None;
let transaction = this.db.begin_write()?;
{
let mut table = transaction.open_table(Self::TABLE_DEF)?;
let count = table.get(hash)?.map(|x| x.value()).unwrap_or_default();

if count <= 1 {
table.remove(hash)?;
action = TagGcDecAction::DeleteTag;
} else {
table.insert(hash, count - 1)?;
}
}
transaction.commit()?;

Ok(action)
})
.await??;

match action {
TagGcDecAction::None => (),
TagGcDecAction::DeleteTag => {
self.iroh
.tags()
.delete(Tag(hash.as_bytes().to_vec().into()))
.await?
}
}

Ok(action)
}

pub async fn increment(&self, hash: iroh::blobs::Hash) -> anyhow::Result<TagGcIncAction> {
let this = self.clone();
let action = tokio::task::spawn_blocking(move || -> anyhow::Result<TagGcIncAction> {
let mut action = TagGcIncAction::None;
let transaction = this.db.begin_write()?;
{
let mut table = transaction.open_table(Self::TABLE_DEF)?;
let count = table.get(hash)?.map(|x| x.value()).unwrap_or_default();

if count == 0 {
table.insert(hash, 1)?;
action = TagGcIncAction::CreateTag;
} else {
table.insert(hash, count + 1)?;
}
}

transaction.commit()?;

Ok(action)
})
.await??;

match action {
TagGcIncAction::None => (),
TagGcIncAction::CreateTag => {
let mut c = Collection::default();
c.push("".into(), hash);
self.iroh
.blobs()
.create_collection(
c,
iroh::blobs::util::SetTagOption::Named(Tag(hash
.as_bytes()
.to_vec()
.into())),
Default::default(),
)
.await?;
}
}

Ok(action)
}
}

impl crate::store::LeafStore for LeafIrohStore {
type NamespaceId = iroh::docs::NamespaceId;
type SubspaceId = iroh::docs::AuthorId;
type PayloadDigest = iroh::blobs::Hash;

fn key_resolvers(
&self,
) -> Box<dyn Iterator<Item = &dyn super::KeyResolverImpl<Self::PayloadDigest>> + '_> {
Box::new([].into_iter())
}

fn encryption_algorithms(
&self,
) -> Box<dyn Iterator<Item = &dyn super::EncryptionAlgorithmImpl<Self::PayloadDigest>> + '_>
{
Box::new([].into_iter())
}

async fn store_blob(&self, data: &[u8]) -> anyhow::Result<Self::PayloadDigest> {
let outcome = self.client.blobs().add_bytes(data.to_vec()).await?;
Ok(outcome.hash)
}
async fn get_blob(&self, payload: Self::PayloadDigest) -> anyhow::Result<Vec<u8>> {
Ok(self.client.blobs().read_to_bytes(payload).await?.into())
}

async fn write_entity(
&self,
namespace: Self::NamespaceId,
subspace: Self::SubspaceId,
path: &[crate::types::PathComponent],
digest: Self::PayloadDigest,
) -> anyhow::Result<()> {
todo!()
}
async fn read_entity(
&self,
namespace: Self::NamespaceId,
subspace: Self::SubspaceId,
path: &[crate::types::PathComponent],
) -> anyhow::Result<Self::PayloadDigest> {
todo!()
}
}

#[cfg(test)]
mod test {
use super::*;

#[tokio::test]
async fn basic_tag_gc_inc_dec() {
let node = iroh::node::Builder::default().spawn().await.unwrap();
let client = node.client().clone();
let db = redb::Database::builder()
.create_with_backend(redb::backends::InMemoryBackend::new())
.unwrap();
let blobs = client.blobs().clone();
let gc = TagGc {
iroh: client,
db: Arc::new(db),
};

let b1 = blobs.add_bytes("Hello").await.unwrap().hash;
let b2 = blobs.add_bytes("World").await.unwrap().hash;

assert_eq!(gc.increment(b1).await.unwrap(), TagGcIncAction::CreateTag);
assert_eq!(gc.increment(b1).await.unwrap(), TagGcIncAction::None);
assert_eq!(gc.increment(b2).await.unwrap(), TagGcIncAction::CreateTag);
assert_eq!(gc.decrement(b1).await.unwrap(), TagGcDecAction::None);
assert_eq!(gc.increment(b2).await.unwrap(), TagGcIncAction::None);
assert_eq!(gc.decrement(b1).await.unwrap(), TagGcDecAction::DeleteTag);
assert_eq!(gc.decrement(b2).await.unwrap(), TagGcDecAction::None);
assert_eq!(gc.decrement(b2).await.unwrap(), TagGcDecAction::DeleteTag);
}
}
Loading

0 comments on commit 65113bd

Please sign in to comment.