Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Da service network #384

Merged
merged 6 commits into from
Sep 11, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions nomos-services/data-availability/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -12,5 +12,10 @@ moka = { version = "0.11", features = ["future"] }
nomos-core = { path = "../../nomos-core" }
nomos-network = { path = "../network" }
overwatch-rs = { git = "https://github.com/logos-co/Overwatch", branch = "main" }
serde = "1.0"
tracing = "0.1"
tokio = { version = "1", features = ["sync", "macros"] }
tokio-stream = "0.1"

[features]
libp2p = ["nomos-network/nomos-libp2p"]
113 changes: 113 additions & 0 deletions nomos-services/data-availability/src/network/adapters/libp2p.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,113 @@
// std
use futures::Stream;
use overwatch_rs::DynError;
use std::marker::PhantomData;
// crates

// internal
use crate::network::NetworkAdapter;
use nomos_core::wire;
use nomos_network::backends::libp2p::{Command, Event, EventKind, Libp2p, Message, TopicHash};
use nomos_network::{NetworkMsg, NetworkService};
use overwatch_rs::services::relay::OutboundRelay;
use overwatch_rs::services::ServiceData;
use serde::de::DeserializeOwned;
use serde::Serialize;
use tokio_stream::wrappers::BroadcastStream;
use tokio_stream::StreamExt;
use tracing::log::error;

pub const NOMOS_DA_TOPIC: &str = "NomosDa";

pub struct Libp2pAdapter<B, A> {
network_relay: OutboundRelay<<NetworkService<Libp2p> as ServiceData>::Message>,
_blob: PhantomData<B>,
_attestation: PhantomData<A>,
}

impl<B, A> Libp2pAdapter<B, A>
where
B: Serialize + DeserializeOwned + Send + Sync + 'static,
A: Serialize + DeserializeOwned + Send + Sync + 'static,
{
async fn stream_for<E: DeserializeOwned>(&self) -> Box<dyn Stream<Item = E> + Unpin + Send> {
let topic_hash = TopicHash::from_raw(NOMOS_DA_TOPIC);
let (sender, receiver) = tokio::sync::oneshot::channel();
self.network_relay
.send(NetworkMsg::Subscribe {
kind: EventKind::Message,
sender,
})
.await
.expect("Network backend should be ready");
let receiver = receiver.await.unwrap();
Box::new(Box::pin(BroadcastStream::new(receiver).filter_map(
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

👍

move |msg| match msg {
Ok(Event::Message(Message { topic, data, .. })) if topic == topic_hash => {
match wire::deserialize::<E>(&data) {
Ok(msg) => Some(msg),
Err(e) => {
error!("Unrecognized Blob message: {e}");
None

Check warning on line 51 in nomos-services/data-availability/src/network/adapters/libp2p.rs

View check run for this annotation

Codecov / codecov/patch

nomos-services/data-availability/src/network/adapters/libp2p.rs#L33-L51

Added lines #L33 - L51 were not covered by tests
}
}
}
_ => None,
},
)))
}

Check warning on line 58 in nomos-services/data-availability/src/network/adapters/libp2p.rs

View check run for this annotation

Codecov / codecov/patch

nomos-services/data-availability/src/network/adapters/libp2p.rs#L55-L58

Added lines #L55 - L58 were not covered by tests

async fn send<E: Serialize>(&self, data: E) -> Result<(), DynError> {
let message = wire::serialize(&data)?.into_boxed_slice();
self.network_relay
.send(NetworkMsg::Process(Command::Broadcast {
topic: NOMOS_DA_TOPIC.to_string(),
message,
}))
.await
.map_err(|(e, _)| Box::new(e) as DynError)
}

Check warning on line 69 in nomos-services/data-availability/src/network/adapters/libp2p.rs

View check run for this annotation

Codecov / codecov/patch

nomos-services/data-availability/src/network/adapters/libp2p.rs#L60-L69

Added lines #L60 - L69 were not covered by tests
}

#[async_trait::async_trait]
impl<B, A> NetworkAdapter for Libp2pAdapter<B, A>
where
B: Serialize + DeserializeOwned + Send + Sync + 'static,
A: Serialize + DeserializeOwned + Send + Sync + 'static,
{
type Backend = Libp2p;
type Blob = B;
type Attestation = A;

async fn new(
network_relay: OutboundRelay<<NetworkService<Self::Backend> as ServiceData>::Message>,
) -> Self {
network_relay
.send(NetworkMsg::Process(Command::Subscribe(
NOMOS_DA_TOPIC.to_string(),
)))
.await
.expect("Network backend should be ready");
Self {
network_relay,
_blob: Default::default(),
_attestation: Default::default(),
}
}

Check warning on line 96 in nomos-services/data-availability/src/network/adapters/libp2p.rs

View check run for this annotation

Codecov / codecov/patch

nomos-services/data-availability/src/network/adapters/libp2p.rs#L82-L96

Added lines #L82 - L96 were not covered by tests

async fn blob_stream(&self) -> Box<dyn Stream<Item = Self::Blob> + Unpin + Send> {
self.stream_for::<Self::Blob>().await
}

Check warning on line 100 in nomos-services/data-availability/src/network/adapters/libp2p.rs

View check run for this annotation

Codecov / codecov/patch

nomos-services/data-availability/src/network/adapters/libp2p.rs#L98-L100

Added lines #L98 - L100 were not covered by tests

async fn attestation_stream(&self) -> Box<dyn Stream<Item = Self::Attestation> + Unpin + Send> {
self.stream_for::<Self::Attestation>().await
}

Check warning on line 104 in nomos-services/data-availability/src/network/adapters/libp2p.rs

View check run for this annotation

Codecov / codecov/patch

nomos-services/data-availability/src/network/adapters/libp2p.rs#L102-L104

Added lines #L102 - L104 were not covered by tests

async fn send_attestation(&self, attestation: Self::Attestation) -> Result<(), DynError> {
self.send(attestation).await
}

Check warning on line 108 in nomos-services/data-availability/src/network/adapters/libp2p.rs

View check run for this annotation

Codecov / codecov/patch

nomos-services/data-availability/src/network/adapters/libp2p.rs#L106-L108

Added lines #L106 - L108 were not covered by tests

async fn send_blob(&self, blob: Self::Blob) -> Result<(), DynError> {
self.send(blob).await
}

Check warning on line 112 in nomos-services/data-availability/src/network/adapters/libp2p.rs

View check run for this annotation

Codecov / codecov/patch

nomos-services/data-availability/src/network/adapters/libp2p.rs#L110-L112

Added lines #L110 - L112 were not covered by tests
}
2 changes: 2 additions & 0 deletions nomos-services/data-availability/src/network/adapters/mod.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
#[cfg(feature = "libp2p")]
pub mod libp2p;
12 changes: 10 additions & 2 deletions nomos-services/data-availability/src/network/mod.rs
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
mod adapters;

// std
// crates
use futures::Stream;
Expand All @@ -7,19 +9,25 @@ use nomos_network::NetworkService;
use overwatch_rs::services::relay::OutboundRelay;
use overwatch_rs::services::ServiceData;
use overwatch_rs::DynError;
use serde::de::DeserializeOwned;
use serde::Serialize;

#[async_trait::async_trait]
pub trait NetworkAdapter {
type Backend: NetworkBackend + 'static;

type Blob: Send + Sync + 'static;
type Attestation: Send + Sync + 'static;
type Blob: Serialize + DeserializeOwned + Send + Sync + 'static;
type Attestation: Serialize + DeserializeOwned + Send + Sync + 'static;

async fn new(
network_relay: OutboundRelay<<NetworkService<Self::Backend> as ServiceData>::Message>,
) -> Self;

async fn blob_stream(&self) -> Box<dyn Stream<Item = Self::Blob> + Unpin + Send>;

async fn attestation_stream(&self) -> Box<dyn Stream<Item = Self::Attestation> + Unpin + Send>;

async fn send_attestation(&self, attestation: Self::Attestation) -> Result<(), DynError>;

async fn send_blob(&self, blob: Self::Blob) -> Result<(), DynError>;
}
Loading