Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Da service backend #381

Merged
merged 9 commits into from
Sep 8, 2023
Merged
Show file tree
Hide file tree
Changes from 8 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
13 changes: 13 additions & 0 deletions nomos-core/src/blob/mod.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
use bytes::Bytes;
use std::hash::Hash;

pub type BlobHasher<T> = fn(&T) -> <T as Blob>::Hash;

pub trait Blob {
const HASHER: BlobHasher<Self>;
type Hash: Hash + Eq + Clone;
fn hash(&self) -> Self::Hash {
Self::HASHER(self)
}
fn as_bytes(&self) -> Bytes;
}
Comment on lines +4 to +13
Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This may be expanded later if needed when we know more about how would it looks.

1 change: 1 addition & 0 deletions nomos-core/src/lib.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
pub mod account;
pub mod blob;
pub mod block;
pub mod crypto;
pub mod fountain;
Expand Down
2 changes: 2 additions & 0 deletions nomos-services/data-availability/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,8 @@ edition = "2021"
[dependencies]
async-trait = "0.1"
futures = "0.3"
moka = { version = "0.11", features = ["future"] }
nomos-core = { path = "../../nomos-core" }
nomos-network = { path = "../network" }
overwatch-rs = { git = "https://github.com/logos-co/Overwatch", branch = "main" }
tracing = "0.1"
Expand Down
73 changes: 73 additions & 0 deletions nomos-services/data-availability/src/backend/memory_cache.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,73 @@
use crate::backend::{DaBackend, DaError};
use moka::future::{Cache, CacheBuilder};
use nomos_core::blob::Blob;
use std::time::Duration;

#[derive(Clone, Copy)]
pub struct BlobCacheSettings {
max_capacity: usize,
evicting_period: Duration,
}

pub struct BlobCache<H, B>(Cache<H, B>);

impl<B> BlobCache<B::Hash, B>
where
B: Clone + Blob + Send + Sync + 'static,
B::Hash: Send + Sync + 'static,
{
pub fn new(settings: BlobCacheSettings) -> Self {
let BlobCacheSettings {
max_capacity,
evicting_period,
} = settings;
let cache = CacheBuilder::new(max_capacity as u64)
.time_to_live(evicting_period)
// can we leverage this to evict really old blobs?
.time_to_idle(evicting_period)
Comment on lines +26 to +27
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

As discussed in https://github.com/logos-co/nomos-node/pull/376/files#r1317083701, the expectation is that the outer service retrieves pending blobs from this service, does something with those blobs, and after that, requests removing those blobs? If so, it seems it's okay to use time_to_idle. This will reset eviction timers whenever pending_blobs() is called, but I guess it's okay if it wouldn't take too long for the outer service to process and remove blobs.

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes. So the idea here is that. Either a blob is pending for the whole life until is added into a block. Or is removed. (that is why is using the same evicting period as the time to live). But it would make sense that if it is not included it dies as well after some time.

.build();
Self(cache)
}

pub async fn add(&self, blob: B) {
self.0.insert(blob.hash(), blob).await
}

pub async fn remove(&self, hash: &B::Hash) {
self.0.remove(hash).await;
}

pub fn pending_blobs(&self) -> Box<dyn Iterator<Item = B> + Send> {
// bypass lifetime
let blobs: Vec<_> = self.0.iter().map(|t| t.1).collect();
Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

moka clones by default.

Box::new(blobs.into_iter())
}
}

#[async_trait::async_trait]
impl<B> DaBackend for BlobCache<B::Hash, B>
where
B: Clone + Blob + Send + Sync + 'static,
B::Hash: Send + Sync + 'static,
{
type Settings = BlobCacheSettings;
type Blob = B;

fn new(settings: Self::Settings) -> Self {
BlobCache::new(settings)
}

async fn add_blob(&self, blob: Self::Blob) -> Result<(), DaError> {
self.add(blob).await;
Ok(())
}

async fn remove_blob(&self, blob: &<Self::Blob as Blob>::Hash) -> Result<(), DaError> {
self.remove(blob).await;
Ok(())
}

fn pending_blobs(&self) -> Box<dyn Iterator<Item = Self::Blob> + Send> {
BlobCache::pending_blobs(self)
}
}
10 changes: 8 additions & 2 deletions nomos-services/data-availability/src/backend/mod.rs
Original file line number Diff line number Diff line change
@@ -1,18 +1,24 @@
mod memory_cache;

use nomos_core::blob::Blob;
use overwatch_rs::DynError;

#[derive(Debug)]
pub enum DaError {
Dyn(DynError),
}

#[async_trait::async_trait]
pub trait DaBackend {
type Settings: Clone;

type Blob;
type Blob: Blob;

fn new(settings: Self::Settings) -> Self;

fn add_blob(&mut self, blob: Self::Blob) -> Result<(), DaError>;
async fn add_blob(&self, blob: Self::Blob) -> Result<(), DaError>;

async fn remove_blob(&self, blob: &<Self::Blob as Blob>::Hash) -> Result<(), DaError>;

fn pending_blobs(&self) -> Box<dyn Iterator<Item = Self::Blob> + Send>;
}
35 changes: 28 additions & 7 deletions nomos-services/data-availability/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ use tokio::sync::oneshot::Sender;
// internal
use crate::backend::{DaBackend, DaError};
use crate::network::NetworkAdapter;
use nomos_core::blob::Blob;
use nomos_network::NetworkService;
use overwatch_rs::services::handle::ServiceStateHandle;
use overwatch_rs::services::relay::{Relay, RelayMessage};
Expand All @@ -27,23 +28,29 @@ where
network_relay: Relay<NetworkService<N::Backend>>,
}

pub enum DaMsg<Blob> {
pub enum DaMsg<B: Blob> {
PendingBlobs {
reply_channel: Sender<Box<dyn Iterator<Item = Blob> + Send>>,
reply_channel: Sender<Box<dyn Iterator<Item = B> + Send>>,
},
RemoveBlobs {
blobs: Box<dyn Iterator<Item = <B as Blob>::Hash> + Send>,
},
}

impl<Blob: 'static> Debug for DaMsg<Blob> {
impl<B: Blob + 'static> Debug for DaMsg<B> {
fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
match self {
DaMsg::PendingBlobs { .. } => {
write!(f, "DaMsg::PendingBlobs")
}
DaMsg::RemoveBlobs { .. } => {
write!(f, "DaMsg::RemoveBlobs")
}
}
}
}

impl<Blob: 'static> RelayMessage for DaMsg<Blob> {}
impl<B: Blob + 'static> RelayMessage for DaMsg<B> {}

impl<B, N> ServiceData for DataAvailabilityService<B, N>
where
Expand All @@ -61,9 +68,10 @@ where
#[async_trait::async_trait]
impl<B, N> ServiceCore for DataAvailabilityService<B, N>
where
B: DaBackend + Send,
B: DaBackend + Send + Sync,
B::Settings: Clone + Send + Sync + 'static,
B::Blob: Send,
<B::Blob as Blob>::Hash: Debug + Send + Sync,
// TODO: Reply type must be piped together, for now empty array.
N: NetworkAdapter<Blob = B::Blob, Reply = [u8; 32]> + Send + Sync,
{
Expand Down Expand Up @@ -115,21 +123,34 @@ async fn handle_new_blob<B: DaBackend, A: NetworkAdapter<Blob = B::Blob, Reply =
blob: B::Blob,
) -> Result<(), DaError> {
// we need to handle the reply (verification + signature)
backend.add_blob(blob)?;
backend.add_blob(blob).await?;
adapter
.send_attestation([0u8; 32])
.await
.map_err(DaError::Dyn)
youngjoon-lee marked this conversation as resolved.
Show resolved Hide resolved
}

async fn handle_da_msg<B: DaBackend>(backend: &mut B, msg: DaMsg<B::Blob>) -> Result<(), DaError> {
async fn handle_da_msg<B: DaBackend>(backend: &mut B, msg: DaMsg<B::Blob>) -> Result<(), DaError>
where
<B::Blob as Blob>::Hash: Debug,
{
match msg {
DaMsg::PendingBlobs { reply_channel } => {
let pending_blobs = backend.pending_blobs();
if reply_channel.send(pending_blobs).is_err() {
tracing::debug!("Could not send pending blobs");
}
}
DaMsg::RemoveBlobs { blobs } => {
let backend = &*backend;
futures::stream::iter(blobs)
.for_each_concurrent(None, |blob| async move {
if let Err(e) = backend.remove_blob(&blob).await {
tracing::debug!("Could not remove blob {blob:?} due to: {e:?}");
}
})
.await;
}
}
Ok(())
}
2 changes: 1 addition & 1 deletion nomos-services/data-availability/src/network/mod.rs
Original file line number Diff line number Diff line change
@@ -1,12 +1,12 @@
// std
// crates
use futures::Stream;
use overwatch_rs::DynError;
// internal
use nomos_network::backends::NetworkBackend;
use nomos_network::NetworkService;
use overwatch_rs::services::relay::OutboundRelay;
use overwatch_rs::services::ServiceData;
use overwatch_rs::DynError;

#[async_trait::async_trait]
pub trait NetworkAdapter {
Expand Down
Loading