-
I've recently tried switching transport in my libp2p application from TCP to QUIC. When I'm sending messages with
What could be the reason? Is my QUIC transport misconfigured? Rust-libp2p version: let gossipsub_config = gossipsub::ConfigBuilder::default()
.validate_messages()
.message_id_fn(gossipsub_msg_id)
.build()
.expect("config should be valid");
// Closure that assembles the composite `Behaviour` from the node keypair and a
// relay behaviour supplied by the caller. `protocol`, `local_peer_id` and
// `gossipsub_config` are captured from the enclosing scope.
let behaviour = |keypair: &Keypair, relay| Behaviour {
relay,
// Identify: interval set to 60 s, pushing listen-address updates enabled.
identify: identify::Behaviour::new(
identify::Config::new(protocol, keypair.public())
.with_interval(Duration::from_secs(60))
.with_push_listen_addr_updates(true),
),
// Kademlia with an in-memory record store and the default configuration.
kademlia: kad::Behaviour::with_config(
local_peer_id,
MemoryStore::new(local_peer_id),
Default::default(),
),
dcutr: dcutr::Behaviour::new(local_peer_id),
// Request/response over WORKER_PROTOCOL with full protocol support and a
// 60 s request timeout.
request: request_response::Behaviour::new(
vec![(WORKER_PROTOCOL, ProtocolSupport::Full)],
request_response::Config::default().with_request_timeout(Duration::from_secs(60)),
),
// Gossipsub with messages signed by this node's keypair.
gossipsub: gossipsub::Behaviour::new(
MessageAuthenticity::Signed(keypair.clone()),
gossipsub_config,
)
// Panics if `gossipsub::Behaviour::new` returns an error.
.unwrap(),
ping: ping::Behaviour::new(Default::default()),
autonat: autonat::Behaviour::new(local_peer_id, Default::default()),
};
// SwarmBuilder::with_tokio(transport, behaviour, local_peer_id).build()
// Assemble the swarm on the tokio runtime: TCP (noise + yamux) and QUIC
// transports, DNS, a relay client (noise + yamux), then the `behaviour`
// closure defined above.
Ok(SwarmBuilder::with_existing_identity(keypair)
.with_tokio()
.with_tcp(Default::default(), noise::Config::new, yamux::Config::default)?
.with_quic()
.with_dns()?
.with_relay_client(noise::Config::new, yamux::Config::default)?
.with_behaviour(behaviour)
.expect("infallible")
// Idle connection timeout is set to 120 s.
.with_swarm_config(|c| c.with_idle_connection_timeout(Duration::from_secs(120)))
.build()) |
Beta Was this translation helpful? Give feedback.
Replies: 6 comments 3 replies
-
Potentially important further information: The setup in which I'm experiencing the issue is this: Node A (without public addr) connects to node B (with public addr). Then it sends message X (a few kB) to B. After a few seconds, B sends message Y (a few MB) to A. Then A sends message X again, and so on... The responses are sent straight after receiving messages, but they're empty
|
Beta Was this translation helpful? Give feedback.
-
My message codec (after changing response type to struct MessageCodec<T: MsgContent> {
_phantom: PhantomData<T>,
}
impl<T: MsgContent> Default for MessageCodec<T> {
fn default() -> Self {
Self {
_phantom: Default::default(),
}
}
}
impl<T: MsgContent> Clone for MessageCodec<T> {
fn clone(&self) -> Self {
*self
}
}
impl<T: MsgContent> Copy for MessageCodec<T> {}
#[async_trait]
impl<M: MsgContent> request_response::Codec for MessageCodec<M> {
type Protocol = &'static str;
type Request = M;
type Response = u8;
/// Reads one length-prefixed request from `io`.
///
/// The wire format is a big-endian `usize` byte count followed by that many
/// payload bytes, which are read directly into a freshly created `M`.
///
/// # Errors
/// Returns the underlying I/O error if the stream ends before the prefix or
/// the announced payload has been read in full.
async fn read_request<T>(
    &mut self,
    _protocol: &Self::Protocol,
    io: &mut T,
) -> std::io::Result<Self::Request>
where
    T: futures::AsyncRead + Unpin + Send,
{
    // NOTE(review): the prefix is `usize`-wide, so its size differs between
    // 32-bit and 64-bit peers, and the announced length is not capped here
    // before `M::new` is called with it — confirm `M::new` bounds it.
    const PREFIX_LEN: usize = std::mem::size_of::<usize>();

    let mut len_prefix = [0u8; PREFIX_LEN];
    io.read_exact(&mut len_prefix).await?;
    let payload_len = usize::from_be_bytes(len_prefix);

    let mut request = M::new(payload_len);
    io.read_exact(request.as_mut_slice()).await?;
    log::trace!(
        "New message decoded: {}",
        String::from_utf8_lossy(request.as_slice())
    );
    Ok(request)
}
/// Reads the one-byte response from `io`.
///
/// # Errors
/// Returns the underlying I/O error if the stream ends before a byte arrives.
async fn read_response<T>(
    &mut self,
    _protocol: &Self::Protocol,
    io: &mut T,
) -> std::io::Result<Self::Response>
where
    T: futures::AsyncRead + Unpin + Send,
{
    let mut ack = [0u8; 1];
    io.read_exact(&mut ack).await?;
    let [code] = ack;
    Ok(code)
}
/// Writes `req` to `io` as a big-endian `usize` byte count followed by the
/// payload bytes — the framing that `read_request` expects.
///
/// # Errors
/// Returns the underlying I/O error if either write fails.
async fn write_request<T>(
    &mut self,
    _protocol: &Self::Protocol,
    io: &mut T,
    req: Self::Request,
) -> std::io::Result<()>
where
    T: futures::AsyncWrite + Unpin + Send,
{
    let payload = req.as_slice();
    log::trace!(
        "New message to encode: {}",
        String::from_utf8_lossy(payload)
    );

    // Length prefix first, then the body.
    io.write_all(&payload.len().to_be_bytes()).await?;
    io.write_all(payload).await?;
    Ok(())
}
/// Writes the one-byte response `res` to `io`.
///
/// # Errors
/// Returns the underlying I/O error if the write fails.
async fn write_response<T>(
    &mut self,
    _protocol: &Self::Protocol,
    io: &mut T,
    res: Self::Response,
) -> std::io::Result<()>
where
    T: futures::AsyncWrite + Unpin + Send,
{
    let ack = [res];
    io.write_all(&ack).await?;
    Ok(())
}
} |
Beta Was this translation helpful? Give feedback.
-
Alright, so the errors stopped when I removed length-prefixing requests and simply did this: async fn read_request<T>(
&mut self,
_protocol: &Self::Protocol,
io: &mut T,
) -> std::io::Result<Self::Request>
where
T: futures::AsyncRead + Unpin + Send,
{
let mut buf = Vec::new();
io.take(MAX_REQ_SIZE).read_to_end(&mut buf).await?;
Ok(M::from_vec(buf))
} I would still be very grateful if someone could please 🙏🏻 🙏🏻 explain:
|
Beta Was this translation helpful? Give feedback.
-
@dhuseby Tagging |
Beta Was this translation helpful? Give feedback.
-
Hi @Wiezzel, are you still having this with the latest |
Beta Was this translation helpful? Give feedback.
-
@jxs I'm afraid I have another (quite possibly related) problem now. Quite often I see that an empty response is received. I didn't spot it right away, because protobuf decodes empty buffer without any errors. Here is my current codec code for the context (I see the use std::marker::PhantomData;
use async_trait::async_trait;
use futures::{AsyncReadExt, AsyncWriteExt};
use libp2p::request_response;
use prost::Message;
// NOTE(review): not referenced anywhere in the visible code; presumably the
// size in bytes of an acknowledgement message — confirm, or remove if unused.
pub const ACK_SIZE: u64 = 4;
/// Request/response codec parameterised over the request type `Req` and the
/// response type `Res`, with a separate read limit for each direction.
pub struct ProtoCodec<Req, Res> {
    /// Maximum number of bytes read for a single request.
    max_req_size: u64,
    /// Maximum number of bytes read for a single response.
    max_res_size: u64,
    /// Zero-sized marker tying the codec to its request type.
    _req: PhantomData<Req>,
    /// Zero-sized marker tying the codec to its response type.
    _res: PhantomData<Res>,
}

// `Copy` and `Clone` are written by hand rather than derived: `#[derive]`
// would require `Req` and `Res` themselves to be `Copy` / `Clone`, although
// the codec stores neither.
impl<Req, Res> Copy for ProtoCodec<Req, Res> {}

impl<Req, Res> Clone for ProtoCodec<Req, Res> {
    /// Returns a copy of the codec (two size limits and two markers).
    fn clone(&self) -> Self {
        *self
    }
}

impl<Req, Res> ProtoCodec<Req, Res> {
    /// Creates a codec with the given request and response size limits, in
    /// bytes.
    pub fn new(max_req_size: u64, max_res_size: u64) -> Self {
        Self {
            max_req_size,
            max_res_size,
            _req: PhantomData,
            _res: PhantomData,
        }
    }
}
#[async_trait]
impl<Req: Message + Default, Res: Message + Default> request_response::Codec
for ProtoCodec<Req, Res>
{
type Protocol = &'static str;
type Request = Req;
type Response = Res;
/// Reads a protobuf-encoded request from `io` until end of stream and decodes
/// it as `Req`.
///
/// At most `max_req_size` bytes are accepted. A larger payload is rejected
/// instead of being silently cut off at the limit: a protobuf message
/// truncated on a field boundary can still decode successfully, which would
/// yield a request with fields missing.
///
/// An empty payload is logged as a warning and still handed to the decoder.
///
/// # Errors
/// Returns the underlying I/O error if reading fails,
/// `ErrorKind::InvalidData` if the payload exceeds `max_req_size`, and the
/// converted decode error if the bytes are not a valid `Req`.
async fn read_request<T>(
    &mut self,
    _protocol: &Self::Protocol,
    io: &mut T,
) -> std::io::Result<Self::Request>
where
    T: futures::AsyncRead + Unpin + Send,
{
    let limit = self.max_req_size;
    let mut buf = Vec::new();
    // Allow one byte past the limit so that an oversized payload can be told
    // apart from one that is exactly `limit` bytes long.
    io.take(limit.saturating_add(1))
        .read_to_end(&mut buf)
        .await?;
    // `usize` is never wider than 64 bits, so this widening cast is lossless.
    if buf.len() as u64 > limit {
        return Err(std::io::Error::new(
            std::io::ErrorKind::InvalidData,
            format!("request exceeds the maximum size of {limit} bytes"),
        ));
    }
    if buf.is_empty() {
        log::warn!("Received an empty request");
    }
    Ok(Req::decode(buf.as_slice())?)
}
/// Reads a protobuf-encoded response from `io` until end of stream and
/// decodes it as `Res`.
///
/// At most `max_res_size` bytes are accepted. A larger payload is rejected
/// instead of being silently cut off at the limit: a protobuf message
/// truncated on a field boundary can still decode successfully, which would
/// yield a response with fields missing.
///
/// An empty payload is logged as a warning and still handed to the decoder.
///
/// # Errors
/// Returns the underlying I/O error if reading fails,
/// `ErrorKind::InvalidData` if the payload exceeds `max_res_size`, and the
/// converted decode error if the bytes are not a valid `Res`.
async fn read_response<T>(
    &mut self,
    _protocol: &Self::Protocol,
    io: &mut T,
) -> std::io::Result<Self::Response>
where
    T: futures::AsyncRead + Unpin + Send,
{
    let limit = self.max_res_size;
    let mut buf = Vec::new();
    // Allow one byte past the limit so that an oversized payload can be told
    // apart from one that is exactly `limit` bytes long.
    io.take(limit.saturating_add(1))
        .read_to_end(&mut buf)
        .await?;
    // `usize` is never wider than 64 bits, so this widening cast is lossless.
    if buf.len() as u64 > limit {
        return Err(std::io::Error::new(
            std::io::ErrorKind::InvalidData,
            format!("response exceeds the maximum size of {limit} bytes"),
        ));
    }
    if buf.is_empty() {
        log::warn!("Received an empty response");
    }
    Ok(Res::decode(buf.as_slice())?)
}
/// Encodes `req` to bytes and writes them to `io`.
///
/// No length prefix is written; the matching `read_request` reads until end
/// of stream.
///
/// # Errors
/// Returns the underlying I/O error if the write fails.
async fn write_request<T>(
    &mut self,
    _protocol: &Self::Protocol,
    io: &mut T,
    req: Self::Request,
) -> std::io::Result<()>
where
    T: futures::AsyncWrite + Unpin + Send,
{
    let encoded = req.encode_to_vec();
    io.write_all(&encoded).await?;
    Ok(())
}
/// Encodes `res` to bytes and writes them to `io`.
///
/// No length prefix is written; the matching `read_response` reads until end
/// of stream.
///
/// # Errors
/// Returns the underlying I/O error if the write fails.
async fn write_response<T>(
    &mut self,
    _protocol: &Self::Protocol,
    io: &mut T,
    res: Self::Response,
) -> std::io::Result<()>
where
    T: futures::AsyncWrite + Unpin + Send,
{
    let encoded = res.encode_to_vec();
    io.write_all(&encoded).await?;
    Ok(())
}
} |
Beta Was this translation helpful? Give feedback.
@jxs I've confirmed that with the latest
libp2p-quic
these errors are gone. 👍🏻