From f4d97edda657d470bab2d4021e9676b6a896372d Mon Sep 17 00:00:00 2001 From: danielsanchezq Date: Wed, 6 Sep 2023 15:48:35 +0200 Subject: [PATCH 1/6] Make da backend async --- nomos-services/data-availability/src/lib.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/nomos-services/data-availability/src/lib.rs b/nomos-services/data-availability/src/lib.rs index 4b3c8205b..156bdc333 100644 --- a/nomos-services/data-availability/src/lib.rs +++ b/nomos-services/data-availability/src/lib.rs @@ -139,7 +139,7 @@ where { match msg { DaMsg::PendingBlobs { reply_channel } => { - let pending_blobs = backend.pending_blobs(); + let pending_blobs = backend.pending_blobs().await; if reply_channel.send(pending_blobs).is_err() { tracing::debug!("Could not send pending blobs"); } From 2301f2fb03212b04f61310b43880153e80b6edc6 Mon Sep 17 00:00:00 2001 From: danielsanchezq Date: Wed, 6 Sep 2023 16:26:23 +0200 Subject: [PATCH 2/6] Added remove blob --- nomos-services/data-availability/src/lib.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/nomos-services/data-availability/src/lib.rs b/nomos-services/data-availability/src/lib.rs index 156bdc333..4b3c8205b 100644 --- a/nomos-services/data-availability/src/lib.rs +++ b/nomos-services/data-availability/src/lib.rs @@ -139,7 +139,7 @@ where { match msg { DaMsg::PendingBlobs { reply_channel } => { - let pending_blobs = backend.pending_blobs().await; + let pending_blobs = backend.pending_blobs(); if reply_channel.send(pending_blobs).is_err() { tracing::debug!("Could not send pending blobs"); } From 16a47f78b4a81a3d4c9953c3e3bcf9a4eabf3623 Mon Sep 17 00:00:00 2001 From: danielsanchezq Date: Thu, 7 Sep 2023 13:27:51 +0200 Subject: [PATCH 3/6] Added send_blob method to network adapter trait --- nomos-services/data-availability/src/network/mod.rs | 2 ++ 1 file changed, 2 insertions(+) diff --git a/nomos-services/data-availability/src/network/mod.rs b/nomos-services/data-availability/src/network/mod.rs index 38dd1f2fe..a4b69d5c8 100644 --- a/nomos-services/data-availability/src/network/mod.rs +++ b/nomos-services/data-availability/src/network/mod.rs @@ -22,4 +22,6 @@ pub trait NetworkAdapter { async fn blob_stream(&self) -> Box + Unpin + Send>; async fn send_attestation(&self, attestation: Self::Attestation) -> Result<(), DynError>; + + async fn send_blob(&self, blob: Self::Blob) -> Result<(), DynError>; } From b0d5caf741ccff1d109cf30653abb60fac4898f8 Mon Sep 17 00:00:00 2001 From: danielsanchezq Date: Thu, 7 Sep 2023 18:40:47 +0200 Subject: [PATCH 4/6] Added libp2p backend Implemented blob stream --- nomos-services/data-availability/Cargo.toml | 5 ++ .../src/network/adapters/libp2p.rs | 88 +++++++++++++++++++ .../src/network/adapters/mod.rs | 2 + .../data-availability/src/network/mod.rs | 8 +- 4 files changed, 101 insertions(+), 2 deletions(-) create mode 100644 nomos-services/data-availability/src/network/adapters/libp2p.rs create mode 100644 nomos-services/data-availability/src/network/adapters/mod.rs diff --git a/nomos-services/data-availability/Cargo.toml b/nomos-services/data-availability/Cargo.toml index 2a60bbc41..b1cfcbfdd 100644 --- a/nomos-services/data-availability/Cargo.toml +++ b/nomos-services/data-availability/Cargo.toml @@ -12,5 +12,10 @@ moka = { version = "0.11", features = ["future"] } nomos-core = { path = "../../nomos-core" } nomos-network = { path = "../network" } overwatch-rs = { git = "https://github.com/logos-co/Overwatch", branch = "main" } +serde = "1.0" tracing = "0.1" tokio = { version = "1", features = ["sync", "macros"] } +tokio-stream = "0.1" + +[features] +libp2p = ["nomos-network/nomos-libp2p"] \ No newline at end of file diff --git a/nomos-services/data-availability/src/network/adapters/libp2p.rs b/nomos-services/data-availability/src/network/adapters/libp2p.rs new file mode 100644 index 000000000..1faac59f4 --- /dev/null +++ b/nomos-services/data-availability/src/network/adapters/libp2p.rs @@ -0,0 +1,88 @@ +// std +use futures::Stream; +use overwatch_rs::DynError; +use std::marker::PhantomData; +// crates + +// internal +use crate::network::NetworkAdapter; +use nomos_core::wire; +use nomos_network::backends::libp2p::{Command, Event, EventKind, Libp2p, Message, TopicHash}; +use nomos_network::{NetworkMsg, NetworkService}; +use overwatch_rs::services::relay::OutboundRelay; +use overwatch_rs::services::ServiceData; +use serde::de::DeserializeOwned; +use serde::Serialize; +use tokio_stream::wrappers::BroadcastStream; +use tokio_stream::StreamExt; +use tracing::log::error; + +pub const NOMOS_DA_BLOBS_TOPIC: &str = "DaBlobs"; + +pub struct Libp2pAdapter { + network_relay: OutboundRelay< as ServiceData>::Message>, + _blob: PhantomData, + _attestation: PhantomData, +} + +#[async_trait::async_trait] +impl NetworkAdapter for Libp2pAdapter +where + B: Serialize + DeserializeOwned + Send + Sync + 'static, + A: Serialize + DeserializeOwned + Send + Sync + 'static, +{ + type Backend = Libp2p; + type Blob = B; + type Attestation = A; + + async fn new( + network_relay: OutboundRelay< as ServiceData>::Message>, + ) -> Self { + network_relay + .send(NetworkMsg::Process(Command::Subscribe( + NOMOS_DA_BLOBS_TOPIC.to_string(), + ))) + .await + .expect("Network backend should be ready"); + Self { + network_relay, + _blob: Default::default(), + _attestation: Default::default(), + } + } + + async fn blob_stream(&self) -> Box + Unpin + Send> { + let topic_hash = TopicHash::from_raw(NOMOS_DA_BLOBS_TOPIC); + let (sender, receiver) = tokio::sync::oneshot::channel(); + self.network_relay + .send(NetworkMsg::Subscribe { + kind: EventKind::Message, + sender, + }) + .await + .expect("Network backend should be ready"); + let receiver = receiver.await.unwrap(); + Box::new(Box::pin(BroadcastStream::new(receiver).filter_map( + move |msg| match msg { + Ok(Event::Message(Message { topic, data, .. })) if topic == topic_hash => { + match wire::deserialize::(&data) { + Ok(msg) => Some(msg), + Err(e) => { + error!("Unrecognized Blob message: {e}"); + None + } + } + } + _ => None, + }, + ))) + } + + async fn send_attestation(&self, attestation: Self::Attestation) -> Result<(), DynError> { + todo!() + } + + async fn send_blob(&self, blob: Self::Blob) -> Result<(), DynError> { + todo!() + } +} diff --git a/nomos-services/data-availability/src/network/adapters/mod.rs b/nomos-services/data-availability/src/network/adapters/mod.rs new file mode 100644 index 000000000..a22ade976 --- /dev/null +++ b/nomos-services/data-availability/src/network/adapters/mod.rs @@ -0,0 +1,2 @@ +#[cfg(feature = "libp2p")] +pub mod libp2p; diff --git a/nomos-services/data-availability/src/network/mod.rs b/nomos-services/data-availability/src/network/mod.rs index a4b69d5c8..d01328d17 100644 --- a/nomos-services/data-availability/src/network/mod.rs +++ b/nomos-services/data-availability/src/network/mod.rs @@ -1,3 +1,5 @@ +mod adapters; + // std // crates use futures::Stream; @@ -7,13 +9,15 @@ use nomos_network::NetworkService; use overwatch_rs::services::relay::OutboundRelay; use overwatch_rs::services::ServiceData; use overwatch_rs::DynError; +use serde::de::DeserializeOwned; +use serde::Serialize; #[async_trait::async_trait] pub trait NetworkAdapter { type Backend: NetworkBackend + 'static; - type Blob: Send + Sync + 'static; - type Attestation: Send + Sync + 'static; + type Blob: Serialize + DeserializeOwned + Send + Sync + 'static; + type Attestation: Serialize + DeserializeOwned + Send + Sync + 'static; async fn new( network_relay: OutboundRelay< as ServiceData>::Message>, From 233318b8ac5fc043d6db159d64d4050724663070 Mon Sep 17 00:00:00 2001 From: danielsanchezq Date: Thu, 7 Sep 2023 18:52:01 +0200 Subject: [PATCH 5/6] Implement attestation stream --- .../src/network/adapters/libp2p.rs | 66 +++++++++++-------- .../data-availability/src/network/mod.rs | 2 + 2 files changed, 42 insertions(+), 26 deletions(-) diff --git a/nomos-services/data-availability/src/network/adapters/libp2p.rs b/nomos-services/data-availability/src/network/adapters/libp2p.rs index 1faac59f4..3b63007b3 100644 --- a/nomos-services/data-availability/src/network/adapters/libp2p.rs +++ b/nomos-services/data-availability/src/network/adapters/libp2p.rs @@ -17,7 +17,7 @@ use tokio_stream::wrappers::BroadcastStream; use tokio_stream::StreamExt; use tracing::log::error; -pub const NOMOS_DA_BLOBS_TOPIC: &str = "DaBlobs"; +pub const NOMOS_DA_TOPIC: &str = "NomosDa"; pub struct Libp2pAdapter { network_relay: OutboundRelay< as ServiceData>::Message>, @@ -25,34 +25,13 @@ pub struct Libp2pAdapter { _attestation: PhantomData, } -#[async_trait::async_trait] -impl NetworkAdapter for Libp2pAdapter +impl Libp2pAdapter where B: Serialize + DeserializeOwned + Send + Sync + 'static, A: Serialize + DeserializeOwned + Send + Sync + 'static, { - type Backend = Libp2p; - type Blob = B; - type Attestation = A; - - async fn new( - network_relay: OutboundRelay< as ServiceData>::Message>, - ) -> Self { - network_relay - .send(NetworkMsg::Process(Command::Subscribe( - NOMOS_DA_BLOBS_TOPIC.to_string(), - ))) - .await - .expect("Network backend should be ready"); - Self { - network_relay, - _blob: Default::default(), - _attestation: Default::default(), - } - } - - async fn blob_stream(&self) -> Box + Unpin + Send> { - let topic_hash = TopicHash::from_raw(NOMOS_DA_BLOBS_TOPIC); + async fn stream_for(&self) -> Box + Unpin + Send> { + let topic_hash = TopicHash::from_raw(NOMOS_DA_TOPIC); let (sender, receiver) = tokio::sync::oneshot::channel(); self.network_relay .send(NetworkMsg::Subscribe { @@ -65,7 +44,7 @@ where Box::new(Box::pin(BroadcastStream::new(receiver).filter_map( move |msg| match msg { Ok(Event::Message(Message { topic, data, .. })) if topic == topic_hash => { - match wire::deserialize::(&data) { + match wire::deserialize::(&data) { Ok(msg) => Some(msg), Err(e) => { error!("Unrecognized Blob message: {e}"); @@ -77,6 +56,41 @@ where }, ))) } +} + +#[async_trait::async_trait] +impl NetworkAdapter for Libp2pAdapter +where + B: Serialize + DeserializeOwned + Send + Sync + 'static, + A: Serialize + DeserializeOwned + Send + Sync + 'static, +{ + type Backend = Libp2p; + type Blob = B; + type Attestation = A; + + async fn new( + network_relay: OutboundRelay< as ServiceData>::Message>, + ) -> Self { + network_relay + .send(NetworkMsg::Process(Command::Subscribe( + NOMOS_DA_TOPIC.to_string(), + ))) + .await + .expect("Network backend should be ready"); + Self { + network_relay, + _blob: Default::default(), + _attestation: Default::default(), + } + } + + async fn blob_stream(&self) -> Box + Unpin + Send> { + self.stream_for::().await + } + + async fn attestation_stream(&self) -> Box + Unpin + Send> { + self.stream_for::().await + } async fn send_attestation(&self, attestation: Self::Attestation) -> Result<(), DynError> { todo!() diff --git a/nomos-services/data-availability/src/network/mod.rs b/nomos-services/data-availability/src/network/mod.rs index d01328d17..d6b783d60 100644 --- a/nomos-services/data-availability/src/network/mod.rs +++ b/nomos-services/data-availability/src/network/mod.rs @@ -25,6 +25,8 @@ pub trait NetworkAdapter { async fn blob_stream(&self) -> Box + Unpin + Send>; + async fn attestation_stream(&self) -> Box + Unpin + Send>; + async fn send_attestation(&self, attestation: Self::Attestation) -> Result<(), DynError>; async fn send_blob(&self, blob: Self::Blob) -> Result<(), DynError>; From 11186f226b32901c5a79a5d838cd791feaa4040b Mon Sep 17 00:00:00 2001 From: danielsanchezq Date: Thu, 7 Sep 2023 19:04:07 +0200 Subject: [PATCH 6/6] Implement send methods --- .../src/network/adapters/libp2p.rs | 15 +++++++++++++-- 1 file changed, 13 insertions(+), 2 deletions(-) diff --git a/nomos-services/data-availability/src/network/adapters/libp2p.rs b/nomos-services/data-availability/src/network/adapters/libp2p.rs index 3b63007b3..0755acfbb 100644 --- a/nomos-services/data-availability/src/network/adapters/libp2p.rs +++ b/nomos-services/data-availability/src/network/adapters/libp2p.rs @@ -56,6 +56,17 @@ where }, ))) } + + async fn send(&self, data: E) -> Result<(), DynError> { + let message = wire::serialize(&data)?.into_boxed_slice(); + self.network_relay + .send(NetworkMsg::Process(Command::Broadcast { + topic: NOMOS_DA_TOPIC.to_string(), + message, + })) + .await + .map_err(|(e, _)| Box::new(e) as DynError) + } } #[async_trait::async_trait] @@ -93,10 +104,10 @@ where } async fn send_attestation(&self, attestation: Self::Attestation) -> Result<(), DynError> { - todo!() + self.send(attestation).await } async fn send_blob(&self, blob: Self::Blob) -> Result<(), DynError> { - todo!() + self.send(blob).await } }