Skip to content

Commit

Permalink
Added remove blob
Browse files Browse the repository at this point in the history
  • Loading branch information
danielSanchezQ committed Sep 7, 2023
1 parent 44ce2fb commit 87d4d99
Show file tree
Hide file tree
Showing 3 changed files with 65 additions and 10 deletions.
36 changes: 34 additions & 2 deletions nomos-services/data-availability/src/backend/memory_cache.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,9 @@
use crate::backend::{DaBackend, DaError};
use moka::future::{Cache, CacheBuilder};
use nomos_core::blob::Blob;
use std::time::Duration;

#[derive(Clone, Copy)]
pub struct BlobCacheSettings {
max_capacity: usize,
evicting_period: Duration,
Expand Down Expand Up @@ -35,7 +37,37 @@ where
self.0.remove(hash).await;
}

pub async fn pending_blobs(&self) -> Box<dyn Iterator<Item = B> + Send + '_> {
Box::new(self.0.iter().map(|t| t.1))
pub fn pending_blobs(&self) -> Box<dyn Iterator<Item = B> + Send> {
// bypass lifetime
let blobs: Vec<_> = self.0.iter().map(|t| t.1).collect();
Box::new(blobs.into_iter())
}
}

#[async_trait::async_trait]
impl<B> DaBackend for BlobCache<B::Hash, B>
where
B: Clone + Blob + Send + Sync + 'static,
B::Hash: Send + Sync + 'static,
{
type Settings = BlobCacheSettings;
type Blob = B;

fn new(settings: Self::Settings) -> Self {
BlobCache::new(settings)
}

async fn add_blob(&self, blob: Self::Blob) -> Result<(), DaError> {
self.add(blob).await;
Ok(())
}

async fn remove_blob(&self, blob: &<Self::Blob as Blob>::Hash) -> Result<(), DaError> {
self.remove(blob).await;
Ok(())
}

fn pending_blobs(&self) -> Box<dyn Iterator<Item = Self::Blob> + Send> {
BlobCache::pending_blobs(self)
}
}
6 changes: 4 additions & 2 deletions nomos-services/data-availability/src/backend/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,9 @@ pub trait DaBackend {

fn new(settings: Self::Settings) -> Self;

async fn add_blob(&mut self, blob: Self::Blob) -> Result<(), DaError>;
async fn add_blob(&self, blob: Self::Blob) -> Result<(), DaError>;

async fn pending_blobs(&self) -> Box<dyn Iterator<Item = Self::Blob> + Send>;
async fn remove_blob(&self, blob: &<Self::Blob as Blob>::Hash) -> Result<(), DaError>;

fn pending_blobs(&self) -> Box<dyn Iterator<Item = Self::Blob> + Send>;
}
33 changes: 27 additions & 6 deletions nomos-services/data-availability/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ use tokio::sync::oneshot::Sender;
// internal
use crate::backend::{DaBackend, DaError};
use crate::network::NetworkAdapter;
use nomos_core::blob::Blob;
use nomos_network::NetworkService;
use overwatch_rs::services::handle::ServiceStateHandle;
use overwatch_rs::services::relay::{Relay, RelayMessage};
Expand All @@ -27,23 +28,29 @@ where
network_relay: Relay<NetworkService<N::Backend>>,
}

pub enum DaMsg<Blob> {
pub enum DaMsg<B: Blob> {
PendingBlobs {
reply_channel: Sender<Box<dyn Iterator<Item = Blob> + Send>>,
reply_channel: Sender<Box<dyn Iterator<Item = B> + Send>>,
},
RemoveBlobs {
blobs: Box<dyn Iterator<Item = <B as Blob>::Hash> + Send>,
},
}

impl<Blob: 'static> Debug for DaMsg<Blob> {
impl<B: Blob + 'static> Debug for DaMsg<B> {
fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
match self {
DaMsg::PendingBlobs { .. } => {
write!(f, "DaMsg::PendingBlobs")
}
DaMsg::RemoveBlobs { .. } => {
write!(f, "DaMsg::RemoveBlobs")
}
}
}
}

impl<Blob: 'static> RelayMessage for DaMsg<Blob> {}
impl<B: Blob + 'static> RelayMessage for DaMsg<B> {}

impl<B, N> ServiceData for DataAvailabilityService<B, N>
where
Expand All @@ -64,6 +71,7 @@ where
B: DaBackend + Send + Sync,
B::Settings: Clone + Send + Sync + 'static,
B::Blob: Send,
<B::Blob as Blob>::Hash: Debug + Send + Sync,
// TODO: Reply type must be piped together, for now empty array.
N: NetworkAdapter<Blob = B::Blob, Reply = [u8; 32]> + Send + Sync,
{
Expand Down Expand Up @@ -122,14 +130,27 @@ async fn handle_new_blob<B: DaBackend, A: NetworkAdapter<Blob = B::Blob, Reply =
.map_err(DaError::Dyn)
}

async fn handle_da_msg<B: DaBackend>(backend: &mut B, msg: DaMsg<B::Blob>) -> Result<(), DaError> {
async fn handle_da_msg<B: DaBackend>(backend: &mut B, msg: DaMsg<B::Blob>) -> Result<(), DaError>
where
<B::Blob as Blob>::Hash: Debug,
{
match msg {
DaMsg::PendingBlobs { reply_channel } => {
let pending_blobs = backend.pending_blobs().await;
let pending_blobs = backend.pending_blobs();
if reply_channel.send(pending_blobs).is_err() {
tracing::debug!("Could not send pending blobs");
}
}
DaMsg::RemoveBlobs { blobs } => {
let backend = &*backend;
futures::stream::iter(blobs)
.for_each_concurrent(None, |blob| async move {
if let Err(e) = backend.remove_blob(&blob).await {
tracing::debug!("Could not remove blob {blob:?} due to: {e:?}");
}
})
.await;
}
}
Ok(())
}

0 comments on commit 87d4d99

Please sign in to comment.