diff --git a/nomos-core/src/blob/mod.rs b/nomos-core/src/blob/mod.rs new file mode 100644 index 000000000..f7780db4a --- /dev/null +++ b/nomos-core/src/blob/mod.rs @@ -0,0 +1,13 @@ +use bytes::Bytes; +use std::hash::Hash; + +pub type BlobHasher = fn(&T) -> ::Hash; + +pub trait Blob { + const HASHER: BlobHasher; + type Hash: Hash + Eq + Clone; + fn hash(&self) -> Self::Hash { + Self::HASHER(self) + } + fn as_bytes(&self) -> Bytes; +} diff --git a/nomos-core/src/lib.rs b/nomos-core/src/lib.rs index cb48e37b4..ded3b7a2b 100644 --- a/nomos-core/src/lib.rs +++ b/nomos-core/src/lib.rs @@ -1,4 +1,5 @@ pub mod account; +pub mod blob; pub mod block; pub mod crypto; pub mod fountain; diff --git a/nomos-services/data-availability/Cargo.toml b/nomos-services/data-availability/Cargo.toml index a4816c73f..2a60bbc41 100644 --- a/nomos-services/data-availability/Cargo.toml +++ b/nomos-services/data-availability/Cargo.toml @@ -8,6 +8,8 @@ edition = "2021" [dependencies] async-trait = "0.1" futures = "0.3" +moka = { version = "0.11", features = ["future"] } +nomos-core = { path = "../../nomos-core" } nomos-network = { path = "../network" } overwatch-rs = { git = "https://github.com/logos-co/Overwatch", branch = "main" } tracing = "0.1" diff --git a/nomos-services/data-availability/src/backend/memory_cache.rs b/nomos-services/data-availability/src/backend/memory_cache.rs new file mode 100644 index 000000000..a97d7d095 --- /dev/null +++ b/nomos-services/data-availability/src/backend/memory_cache.rs @@ -0,0 +1,73 @@ +use crate::backend::{DaBackend, DaError}; +use moka::future::{Cache, CacheBuilder}; +use nomos_core::blob::Blob; +use std::time::Duration; + +#[derive(Clone, Copy)] +pub struct BlobCacheSettings { + max_capacity: usize, + evicting_period: Duration, +} + +pub struct BlobCache(Cache); + +impl BlobCache +where + B: Clone + Blob + Send + Sync + 'static, + B::Hash: Send + Sync + 'static, +{ + pub fn new(settings: BlobCacheSettings) -> Self { + let BlobCacheSettings { + max_capacity, + evicting_period, + } = settings; + let cache = CacheBuilder::new(max_capacity as u64) + .time_to_live(evicting_period) + // can we leverage this to evict really old blobs? + .time_to_idle(evicting_period) + .build(); + Self(cache) + } + + pub async fn add(&self, blob: B) { + self.0.insert(blob.hash(), blob).await + } + + pub async fn remove(&self, hash: &B::Hash) { + self.0.remove(hash).await; + } + + pub fn pending_blobs(&self) -> Box + Send> { + // bypass lifetime + let blobs: Vec<_> = self.0.iter().map(|t| t.1).collect(); + Box::new(blobs.into_iter()) + } +} + +#[async_trait::async_trait] +impl DaBackend for BlobCache +where + B: Clone + Blob + Send + Sync + 'static, + B::Hash: Send + Sync + 'static, +{ + type Settings = BlobCacheSettings; + type Blob = B; + + fn new(settings: Self::Settings) -> Self { + BlobCache::new(settings) + } + + async fn add_blob(&self, blob: Self::Blob) -> Result<(), DaError> { + self.add(blob).await; + Ok(()) + } + + async fn remove_blob(&self, blob: &::Hash) -> Result<(), DaError> { + self.remove(blob).await; + Ok(()) + } + + fn pending_blobs(&self) -> Box + Send> { + BlobCache::pending_blobs(self) + } +} diff --git a/nomos-services/data-availability/src/backend/mod.rs b/nomos-services/data-availability/src/backend/mod.rs index 52ee258ad..d19c3e810 100644 --- a/nomos-services/data-availability/src/backend/mod.rs +++ b/nomos-services/data-availability/src/backend/mod.rs @@ -1,3 +1,6 @@ +mod memory_cache; + +use nomos_core::blob::Blob; use overwatch_rs::DynError; #[derive(Debug)] @@ -5,14 +8,17 @@ pub enum DaError { Dyn(DynError), } +#[async_trait::async_trait] pub trait DaBackend { type Settings: Clone; - type Blob; + type Blob: Blob; fn new(settings: Self::Settings) -> Self; - fn add_blob(&mut self, blob: Self::Blob) -> Result<(), DaError>; + async fn add_blob(&self, blob: Self::Blob) -> Result<(), DaError>; + + async fn remove_blob(&self, blob: &::Hash) -> Result<(), DaError>; fn pending_blobs(&self) -> Box + Send>; } diff --git a/nomos-services/data-availability/src/lib.rs b/nomos-services/data-availability/src/lib.rs index 9101a5bff..4b3c8205b 100644 --- a/nomos-services/data-availability/src/lib.rs +++ b/nomos-services/data-availability/src/lib.rs @@ -10,6 +10,7 @@ use tokio::sync::oneshot::Sender; // internal use crate::backend::{DaBackend, DaError}; use crate::network::NetworkAdapter; +use nomos_core::blob::Blob; use nomos_network::NetworkService; use overwatch_rs::services::handle::ServiceStateHandle; use overwatch_rs::services::relay::{Relay, RelayMessage}; @@ -27,23 +28,29 @@ where network_relay: Relay>, } -pub enum DaMsg { +pub enum DaMsg { PendingBlobs { - reply_channel: Sender + Send>>, + reply_channel: Sender + Send>>, + }, + RemoveBlobs { + blobs: Box::Hash> + Send>, }, } -impl Debug for DaMsg { +impl Debug for DaMsg { fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result { match self { DaMsg::PendingBlobs { .. } => { write!(f, "DaMsg::PendingBlobs") } + DaMsg::RemoveBlobs { .. } => { + write!(f, "DaMsg::RemoveBlobs") + } } } } -impl RelayMessage for DaMsg {} +impl RelayMessage for DaMsg {} impl ServiceData for DataAvailabilityService where @@ -61,11 +68,12 @@ where #[async_trait::async_trait] impl ServiceCore for DataAvailabilityService where - B: DaBackend + Send, + B: DaBackend + Send + Sync, B::Settings: Clone + Send + Sync + 'static, B::Blob: Send, + ::Hash: Debug + Send + Sync, // TODO: Reply type must be piped together, for now empty array. - N: NetworkAdapter + Send + Sync, + N: NetworkAdapter + Send + Sync, { fn init(service_state: ServiceStateHandle) -> Result { let network_relay = service_state.overwatch_handle.relay(); @@ -109,20 +117,26 @@ where } } -async fn handle_new_blob>( +async fn handle_new_blob< + B: DaBackend, + A: NetworkAdapter, +>( backend: &mut B, adapter: &A, blob: B::Blob, ) -> Result<(), DaError> { // we need to handle the reply (verification + signature) - backend.add_blob(blob)?; + backend.add_blob(blob).await?; adapter .send_attestation([0u8; 32]) .await .map_err(DaError::Dyn) } -async fn handle_da_msg(backend: &mut B, msg: DaMsg) -> Result<(), DaError> { +async fn handle_da_msg(backend: &mut B, msg: DaMsg) -> Result<(), DaError> +where + ::Hash: Debug, +{ match msg { DaMsg::PendingBlobs { reply_channel } => { let pending_blobs = backend.pending_blobs(); @@ -130,6 +144,16 @@ async fn handle_da_msg(backend: &mut B, msg: DaMsg) -> Re tracing::debug!("Could not send pending blobs"); } } + DaMsg::RemoveBlobs { blobs } => { + let backend = &*backend; + futures::stream::iter(blobs) + .for_each_concurrent(None, |blob| async move { + if let Err(e) = backend.remove_blob(&blob).await { + tracing::debug!("Could not remove blob {blob:?} due to: {e:?}"); + } + }) + .await; + } } Ok(()) } diff --git a/nomos-services/data-availability/src/network/mod.rs b/nomos-services/data-availability/src/network/mod.rs index c09dcd31c..38dd1f2fe 100644 --- a/nomos-services/data-availability/src/network/mod.rs +++ b/nomos-services/data-availability/src/network/mod.rs @@ -1,19 +1,19 @@ // std // crates use futures::Stream; -use overwatch_rs::DynError; // internal use nomos_network::backends::NetworkBackend; use nomos_network::NetworkService; use overwatch_rs::services::relay::OutboundRelay; use overwatch_rs::services::ServiceData; +use overwatch_rs::DynError; #[async_trait::async_trait] pub trait NetworkAdapter { type Backend: NetworkBackend + 'static; type Blob: Send + Sync + 'static; - type Reply: Send + Sync + 'static; + type Attestation: Send + Sync + 'static; async fn new( network_relay: OutboundRelay< as ServiceData>::Message>, @@ -21,5 +21,5 @@ pub trait NetworkAdapter { async fn blob_stream(&self) -> Box + Unpin + Send>; - async fn send_attestation(&self, attestation: Self::Reply) -> Result<(), DynError>; + async fn send_attestation(&self, attestation: Self::Attestation) -> Result<(), DynError>; }