Skip to content

Commit

Permalink
Merge branch 'master' of github.com:calimero-network/core into 370-ad…
Browse files Browse the repository at this point in the history
…d-near-app-side-library
  • Loading branch information
saeed-zil committed Jun 19, 2024
2 parents 3d38742 + 721d851 commit 66d3042
Show file tree
Hide file tree
Showing 12 changed files with 410 additions and 78 deletions.
176 changes: 105 additions & 71 deletions Cargo.lock

Large diffs are not rendered by default.

4 changes: 4 additions & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@ axum = "0.7.4"
base64 = "0.22.0"
borsh = "1.3.1"
bs58 = "0.5.0"
bytes = "1.6.0"
camino = "1.1.6"
chrono = "0.4.37"
clap = "4.4.18"
Expand All @@ -46,6 +47,7 @@ fragile = "2.0.0"
futures-util = "0.3.30"
hex = "0.4.3"
libp2p = "0.53.2"
libp2p-stream = "0.1.0-alpha.1"
multiaddr = "0.18.1"
multibase = "0.9.1"
near-jsonrpc-client = "0.8.0"
Expand All @@ -70,6 +72,8 @@ serde_json = "1.0.113"
syn = "2.0"
thiserror = "1.0.56"
tokio = "1.35.1"
tokio-test = "0.4.4"
tokio-util = "0.7.11"
toml = "0.8.9"
tower = "0.4.13"
tower-http = "0.5.2"
Expand Down
26 changes: 25 additions & 1 deletion crates/network/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -7,14 +7,38 @@ repository.workspace = true
license.workspace = true

[dependencies]
bytes.workspace = true
eyre.workspace = true
libp2p = { workspace = true, features = ["full"] }
futures-util.workspace = true
libp2p = { workspace = true, features = [
"dcutr",
"gossipsub",
"identify",
"kad",
"macros",
"mdns",
"noise",
"ping",
"quic",
"rendezvous",
"relay",
"tokio",
"tcp",
"tls",
"yamux",
] }
libp2p-stream.workspace = true
multiaddr.workspace = true
owo-colors.workspace = true
serde = { workspace = true, features = ["derive"] }
serde_json.workspace = true
thiserror.workspace = true
tokio.workspace = true
tokio-util = { workspace = true, features = ["codec", "compat"] }
tracing.workspace = true

calimero-node-primitives = { path = "../node-primitives" }
calimero-primitives = { path = "../primitives" }

[dev-dependencies]
tokio-test.workspace = true
13 changes: 12 additions & 1 deletion crates/network/src/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ use std::collections::HashSet;
use libp2p::{gossipsub, Multiaddr, PeerId};
use tokio::sync::{mpsc, oneshot};

use crate::Command;
use crate::{stream, Command};

#[derive(Clone)]
pub struct NetworkClient {
Expand Down Expand Up @@ -63,6 +63,17 @@ impl NetworkClient {
receiver.await.expect("Sender not to be dropped.")
}

pub async fn open_stream(&self, peer_id: PeerId) -> eyre::Result<stream::Stream> {
let (sender, receiver) = oneshot::channel();

self.sender
.send(Command::OpenStream { peer_id, sender })
.await
.expect("Command receiver not to be dropped.");

receiver.await.expect("Sender not to be dropped.")
}

pub async fn peer_count(&self) -> usize {
let (sender, receiver) = oneshot::channel();

Expand Down
1 change: 1 addition & 0 deletions crates/network/src/events.rs
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ impl EventLoop {
BehaviourEvent::Rendezvous(event) => {
events::EventHandler::handle(self, event).await
}
BehaviourEvent::Stream(()) => {}
},
SwarmEvent::NewListenAddr {
listener_id,
Expand Down
40 changes: 38 additions & 2 deletions crates/network/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ pub mod client;
pub mod config;
mod discovery;
mod events;
pub mod stream;
pub mod types;

use client::NetworkClient;
Expand All @@ -34,6 +35,7 @@ struct Behaviour {
ping: ping::Behaviour,
rendezvous: rendezvous::client::Behaviour,
relay: relay::client::Behaviour,
stream: libp2p_stream::Behaviour,
}

pub async fn run(
Expand Down Expand Up @@ -126,12 +128,25 @@ async fn init(
ping: ping::Behaviour::default(),
relay: relay_behaviour,
rendezvous: rendezvous::client::Behaviour::new(key.clone()),
stream: libp2p_stream::Behaviour::new(),
})?
.with_swarm_config(|cfg| {
cfg.with_idle_connection_timeout(tokio::time::Duration::from_secs(30))
})
.build();

let incoming_streams = match swarm
.behaviour()
.stream
.new_control()
.accept(stream::CALIMERO_STREAM_PROTOCOL)
{
Ok(incoming_streams) => incoming_streams,
Err(err) => {
eyre::bail!("Failed to setup control for stream protocol: {:?}", err)
}
};

let (command_sender, command_receiver) = mpsc::channel(32);
let (event_sender, event_receiver) = mpsc::channel(32);

Expand All @@ -141,13 +156,20 @@ async fn init(

let discovery = discovery::Discovery::new(&config.discovery.rendezvous);

let event_loop = EventLoop::new(swarm, command_receiver, event_sender, discovery);
let event_loop = EventLoop::new(
swarm,
incoming_streams,
command_receiver,
event_sender,
discovery,
);

Ok((client, event_receiver, event_loop))
}

pub(crate) struct EventLoop {
swarm: Swarm<Behaviour>,
incoming_streams: libp2p_stream::IncomingStreams,
command_receiver: mpsc::Receiver<Command>,
event_sender: mpsc::Sender<types::NetworkEvent>,
discovery: discovery::Discovery,
Expand All @@ -160,12 +182,14 @@ pub(crate) struct EventLoop {
impl EventLoop {
fn new(
swarm: Swarm<Behaviour>,
incoming_streams: libp2p_stream::IncomingStreams,
command_receiver: mpsc::Receiver<Command>,
event_sender: mpsc::Sender<types::NetworkEvent>,
discovery: discovery::Discovery,
) -> Self {
Self {
swarm,
incoming_streams,
command_receiver,
event_sender,
discovery,
Expand All @@ -182,7 +206,12 @@ impl EventLoop {

loop {
tokio::select! {
event = self.swarm.next() => self.handle_swarm_event(event.expect("Swarm stream to be infinite.")).await,
event = self.swarm.next() => {
self.handle_swarm_event(event.expect("Swarm stream to be infinite.")).await;
},
incoming_stream = self.incoming_streams.next() => {
self.handle_incoming_stream(incoming_stream.expect("Incoming streams to be infinite.")).await;
},
command = self.command_receiver.recv() => {
let Some(c) = command else { break };
self.handle_command(c).await;
Expand Down Expand Up @@ -260,6 +289,9 @@ impl EventLoop {

let _ = sender.send(Ok(topic));
}
Command::OpenStream { peer_id, sender } => {
let _ = sender.send(self.open_stream(peer_id).await.map_err(Into::into));
}
Command::PeerCount { sender } => {
let _ = sender.send(self.swarm.connected_peers().count());
}
Expand Down Expand Up @@ -329,6 +361,10 @@ enum Command {
topic: gossipsub::IdentTopic,
sender: oneshot::Sender<eyre::Result<gossipsub::IdentTopic>>,
},
OpenStream {
peer_id: PeerId,
sender: oneshot::Sender<eyre::Result<stream::Stream>>,
},
PeerCount {
sender: oneshot::Sender<usize>,
},
Expand Down
92 changes: 92 additions & 0 deletions crates/network/src/stream.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,92 @@
use std::pin::Pin;
use std::task::{Context, Poll};

use futures_util::{Sink as FuturesSink, SinkExt, Stream as FuturesStream};
use libp2p::PeerId;
use tokio::io::BufStream;
use tokio_util::codec::Framed;
use tokio_util::compat::{Compat, FuturesAsyncReadCompatExt};

use super::{types, EventLoop};

mod codec;

pub use codec::{CodecError, Message};

pub(crate) const CALIMERO_STREAM_PROTOCOL: libp2p::StreamProtocol =
libp2p::StreamProtocol::new("/calimero/stream/0.0.1");

#[derive(Debug)]
pub struct Stream {
inner: Framed<BufStream<Compat<libp2p::Stream>>, codec::MessageJsonCodec>,
}

impl Stream {
pub fn new(stream: libp2p::Stream) -> Self {
let stream = BufStream::new(stream.compat());
let stream = Framed::new(stream, codec::MessageJsonCodec);
Stream { inner: stream }
}
}

impl FuturesStream for Stream {
type Item = Result<Message, CodecError>;

fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
let inner = Pin::new(&mut self.get_mut().inner);
inner.poll_next(cx)
}
}

impl FuturesSink<Message> for Stream {
type Error = CodecError;

fn poll_ready(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
self.inner.poll_ready_unpin(cx)
}

fn start_send(mut self: Pin<&mut Self>, item: Message) -> Result<(), Self::Error> {
self.inner.start_send_unpin(item)
}

fn poll_flush(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
self.inner.poll_flush_unpin(cx)
}

fn poll_close(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
self.inner.poll_close_unpin(cx)
}
}

impl EventLoop {
pub(crate) async fn handle_incoming_stream(
&mut self,
(peer, stream): (PeerId, libp2p::Stream),
) {
self.event_sender
.send(types::NetworkEvent::StreamOpened {
peer_id: peer,
stream: Stream::new(stream),
})
.await
.expect("Failed to send stream opened event");
}

pub(crate) async fn open_stream(&mut self, peer_id: PeerId) -> eyre::Result<Stream> {
let stream = match self
.swarm
.behaviour()
.stream
.new_control()
.open_stream(peer_id, CALIMERO_STREAM_PROTOCOL)
.await
{
Ok(stream) => stream,
Err(err) => {
eyre::bail!("Failed to open stream: {:?}", err);
}
};

Ok(Stream::new(stream))
}
}
Loading

0 comments on commit 66d3042

Please sign in to comment.