diff --git a/nomos-services/data-availability/src/backend/memory_cache.rs b/nomos-services/data-availability/src/backend/memory_cache.rs index 8471f06ee..a97d7d095 100644 --- a/nomos-services/data-availability/src/backend/memory_cache.rs +++ b/nomos-services/data-availability/src/backend/memory_cache.rs @@ -1,7 +1,9 @@ +use crate::backend::{DaBackend, DaError}; use moka::future::{Cache, CacheBuilder}; use nomos_core::blob::Blob; use std::time::Duration; +#[derive(Clone, Copy)] pub struct BlobCacheSettings { max_capacity: usize, evicting_period: Duration, @@ -35,7 +37,37 @@ where self.0.remove(hash).await; } - pub async fn pending_blobs(&self) -> Box + Send + '_> { - Box::new(self.0.iter().map(|t| t.1)) + pub fn pending_blobs(&self) -> Box + Send> { + // bypass lifetime + let blobs: Vec<_> = self.0.iter().map(|t| t.1).collect(); + Box::new(blobs.into_iter()) + } +} + +#[async_trait::async_trait] +impl DaBackend for BlobCache +where + B: Clone + Blob + Send + Sync + 'static, + B::Hash: Send + Sync + 'static, +{ + type Settings = BlobCacheSettings; + type Blob = B; + + fn new(settings: Self::Settings) -> Self { + BlobCache::new(settings) + } + + async fn add_blob(&self, blob: Self::Blob) -> Result<(), DaError> { + self.add(blob).await; + Ok(()) + } + + async fn remove_blob(&self, blob: &::Hash) -> Result<(), DaError> { + self.remove(blob).await; + Ok(()) + } + + fn pending_blobs(&self) -> Box + Send> { + BlobCache::pending_blobs(self) } } diff --git a/nomos-services/data-availability/src/backend/mod.rs b/nomos-services/data-availability/src/backend/mod.rs index cf4668564..d19c3e810 100644 --- a/nomos-services/data-availability/src/backend/mod.rs +++ b/nomos-services/data-availability/src/backend/mod.rs @@ -16,7 +16,9 @@ pub trait DaBackend { fn new(settings: Self::Settings) -> Self; - async fn add_blob(&mut self, blob: Self::Blob) -> Result<(), DaError>; + async fn add_blob(&self, blob: Self::Blob) -> Result<(), DaError>; - async fn pending_blobs(&self) -> Box + Send>; + async fn remove_blob(&self, blob: &::Hash) -> Result<(), DaError>; + + fn pending_blobs(&self) -> Box + Send>; } diff --git a/nomos-services/data-availability/src/lib.rs b/nomos-services/data-availability/src/lib.rs index 7ff0653cf..ffc72166d 100644 --- a/nomos-services/data-availability/src/lib.rs +++ b/nomos-services/data-availability/src/lib.rs @@ -10,6 +10,7 @@ use tokio::sync::oneshot::Sender; // internal use crate::backend::{DaBackend, DaError}; use crate::network::NetworkAdapter; +use nomos_core::blob::Blob; use nomos_network::NetworkService; use overwatch_rs::services::handle::ServiceStateHandle; use overwatch_rs::services::relay::{Relay, RelayMessage}; @@ -27,23 +28,29 @@ where network_relay: Relay>, } -pub enum DaMsg { +pub enum DaMsg { PendingBlobs { - reply_channel: Sender + Send>>, + reply_channel: Sender + Send>>, + }, + RemoveBlobs { + blobs: Box::Hash> + Send>, }, } -impl Debug for DaMsg { +impl Debug for DaMsg { fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result { match self { DaMsg::PendingBlobs { .. } => { write!(f, "DaMsg::PendingBlobs") } + DaMsg::RemoveBlobs { .. } => { + write!(f, "DaMsg::RemoveBlobs") + } } } } -impl RelayMessage for DaMsg {} +impl RelayMessage for DaMsg {} impl ServiceData for DataAvailabilityService where @@ -64,6 +71,7 @@ where B: DaBackend + Send + Sync, B::Settings: Clone + Send + Sync + 'static, B::Blob: Send, + ::Hash: Debug + Send + Sync, // TODO: Reply type must be piped together, for now empty array. N: NetworkAdapter + Send + Sync, { @@ -122,14 +130,27 @@ async fn handle_new_blob(backend: &mut B, msg: DaMsg) -> Result<(), DaError> { +async fn handle_da_msg(backend: &mut B, msg: DaMsg) -> Result<(), DaError> +where + ::Hash: Debug, +{ match msg { DaMsg::PendingBlobs { reply_channel } => { - let pending_blobs = backend.pending_blobs().await; + let pending_blobs = backend.pending_blobs(); if reply_channel.send(pending_blobs).is_err() { tracing::debug!("Could not send pending blobs"); } } + DaMsg::RemoveBlobs { blobs } => { + let backend = &*backend; + futures::stream::iter(blobs) + .for_each_concurrent(None, |blob| async move { + if let Err(e) = backend.remove_blob(&blob).await { + tracing::debug!("Could not remove blob {blob:?} due to: {e:?}"); + } + }) + .await; + } } Ok(()) }