Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add Customizable Serialization support through Transmog #39

Open
wants to merge 1 commit into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 3 additions & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ trust-dns = ["trust-dns-resolver"]

[dependencies]
async-trait = "0.1"
bincode = "1"
transmog = "0.1.0-dev.2"
bytes = "1"
ct-logs = "0.9"
flume = "0.10"
Expand Down Expand Up @@ -54,6 +54,8 @@ fabruic = { path = "", features = ["rcgen", "test"] }
quinn-proto = { version = "0.8", default-features = false }
tokio = { version = "1", features = ["macros"] }
trust-dns-proto = "0.21.0-alpha.4"
transmog-bincode = { version = "0.1.0-dev.2" }
transmog-pot = { version = "0.1.0-dev.2" }

[profile.release]
codegen-units = 1
Expand Down
5 changes: 3 additions & 2 deletions examples/basic.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
use anyhow::{Error, Result};
use fabruic::{Endpoint, KeyPair};
use futures_util::{future, StreamExt, TryFutureExt};
use transmog_bincode::Bincode;

const SERVER_NAME: &str = "test";
/// Some random port.
Expand Down Expand Up @@ -38,7 +39,7 @@ async fn main() -> Result<()> {
index,
connecting.remote_address()
);
let connection = connecting.accept::<()>().await?;
let connection = connecting.accept::<(), _>(Bincode::default()).await?;
println!(
"[client:{}] Successfully connected to {}",
index,
Expand Down Expand Up @@ -107,7 +108,7 @@ async fn main() -> Result<()> {
// every new incoming connections is handled in it's own task
connections.push(
tokio::spawn(async move {
let mut connection = connecting.accept::<()>().await?;
let mut connection = connecting.accept::<(), _>(Bincode::default()).await?;
println!("[server] New Connection: {}", connection.remote_address());

// start listening to new incoming streams
Expand Down
25 changes: 17 additions & 8 deletions src/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,11 +8,10 @@
// TODO: error type is becoming too big, split it up

use std::{
fmt::{self, Debug, Formatter},
fmt::{self, Debug, Display, Formatter},
io,
};

pub use bincode::ErrorKind;
use quinn::ConnectionClose;
pub use quinn::{ConnectError, ConnectionError, ReadError, WriteError};
use thiserror::Error;
Expand Down Expand Up @@ -207,7 +206,9 @@ impl From<ConnectionError> for Connecting {
reason,
}) if reason.as_ref() == b"peer doesn't support any known protocol"
&& error_code.to_string() == "the cryptographic handshake failed: error 120" =>
Self::ProtocolMismatch,
{
Self::ProtocolMismatch
}
other => Self::Connection(other),
}
}
Expand Down Expand Up @@ -241,23 +242,25 @@ pub enum Incoming {

/// Error receiving a message from a [`Receiver`](crate::Receiver).
#[derive(Debug, Error)]
#[allow(variant_size_differences)]
pub enum Receiver {
/// Failed to read from a [`Receiver`](crate::Receiver).
#[error("Error reading from `Receiver`: {0}")]
Read(#[from] ReadError),
/// Failed to [`Deserialize`](serde::Deserialize) a message from a
/// [`Receiver`](crate::Receiver).
#[error("Error deserializing a message from `Receiver`: {0}")]
Deserialize(#[from] ErrorKind),
Deserialize(Box<dyn SerializationError>),
}

/// Error sending a message to a [`Sender`](crate::Sender).
#[derive(Debug, Error)]
#[allow(variant_size_differences)]
pub enum Sender {
/// Failed to [`Serialize`](serde::Serialize) a message for a
/// [`Sender`](crate::Sender).
#[error("Error serializing a message to `Sender`: {0}")]
Serialize(ErrorKind),
Serialize(Box<dyn SerializationError>),
/// Failed to write to a [`Sender`](crate::Sender).
#[error("Error writing to `Sender`: {0}")]
Write(#[from] WriteError),
Expand All @@ -266,8 +269,14 @@ pub enum Sender {
Closed(#[from] AlreadyClosed),
}

impl From<Box<ErrorKind>> for Sender {
fn from(error: Box<ErrorKind>) -> Self {
Self::Serialize(*error)
impl Sender {
/// Returns a new instance after boxing `err`.
pub(crate) fn from_serialization<E: SerializationError>(err: E) -> Self {
Self::Serialize(Box::new(err))
}
}

/// An error raised from serialization.
pub trait SerializationError: Display + Debug + Send + Sync + 'static {}

impl<T> SerializationError for T where T: Display + Debug + Send + Sync + 'static {}
18 changes: 12 additions & 6 deletions src/quic/connection/connecting.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,9 +4,12 @@
use std::net::SocketAddr;

use quinn::{crypto::rustls::HandshakeData, NewConnection};
use serde::{de::DeserializeOwned, Serialize};
use transmog::OwnedDeserializer;

use crate::{error, Connection};
use crate::{
error::{self, SerializationError},
Connection,
};

/// Represent's an intermediate state to build a [`Connection`].
#[must_use = "`Connecting` does nothing unless accepted with `Connecting::accept`"]
Expand Down Expand Up @@ -47,17 +50,20 @@ impl Connecting {
///
/// # Errors
/// [`error::Connecting`] if the [`Connection`] failed to be established.
pub async fn accept<T: DeserializeOwned + Serialize + Send + 'static>(
self,
) -> Result<Connection<T>, error::Connecting> {
pub async fn accept<T, F>(self, format: F) -> Result<Connection<T, F>, error::Connecting>
where
T: Send + 'static,
F: OwnedDeserializer<T> + Clone,
F::Error: SerializationError,
{
self.0
.await
.map(
|NewConnection {
connection,
bi_streams,
..
}| Connection::new(connection, bi_streams),
}| Connection::new(connection, bi_streams, format),
)
.map_err(error::Connecting::from)
}
Expand Down
65 changes: 50 additions & 15 deletions src/quic/connection/incoming.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,24 +4,30 @@ use std::fmt::{self, Debug, Formatter};

use futures_util::StreamExt;
use quinn::{RecvStream, SendStream};
use serde::{de::DeserializeOwned, Serialize};
use transmog::{Format, OwnedDeserializer};

use super::ReceiverStream;
use crate::{error, Receiver, Sender};
use crate::{
error::{self, SerializationError},
Receiver, Sender,
};

/// An intermediate state to define which type to accept in this stream. See
/// [`accept_stream`](Self::accept).
#[must_use = "`Incoming` does nothing unless accepted with `Incoming::accept`"]
pub struct Incoming<T: DeserializeOwned> {
pub struct Incoming<T, F: OwnedDeserializer<T>> {
/// [`SendStream`] to build [`Sender`].
sender: SendStream,
/// [`RecvStream`] to build [`Receiver`].
receiver: ReceiverStream<T>,
receiver: ReceiverStream<T, F>,
/// Requested type.
r#type: Option<Result<T, error::Incoming>>,
}

impl<T: DeserializeOwned> Debug for Incoming<T> {
impl<T, F> Debug for Incoming<T, F>
where
F: OwnedDeserializer<T>,
{
fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result {
f.debug_struct("Incoming")
.field("sender", &self.sender)
Expand All @@ -31,12 +37,16 @@ impl<T: DeserializeOwned> Debug for Incoming<T> {
}
}

impl<T: DeserializeOwned> Incoming<T> {
impl<T, F> Incoming<T, F>
where
F: OwnedDeserializer<T> + Clone,
F::Error: SerializationError,
{
/// Builds a new [`Incoming`] from raw [`quinn`] types.
pub(super) fn new(sender: SendStream, receiver: RecvStream) -> Self {
pub(super) fn new(sender: SendStream, receiver: RecvStream, format: F) -> Self {
Self {
sender,
receiver: ReceiverStream::new(receiver),
receiver: ReceiverStream::new(receiver, format),
r#type: None,
}
}
Expand Down Expand Up @@ -80,12 +90,37 @@ impl<T: DeserializeOwned> Incoming<T> {
/// - [`error::Incoming::Receiver`] if receiving the type information to the
/// peer failed, see [`error::Receiver`] for more details
/// - [`error::Incoming::Closed`] if the stream was closed
pub async fn accept<
S: DeserializeOwned + Serialize + Send + 'static,
R: DeserializeOwned + Serialize + Send + 'static,
>(
pub async fn accept<S: Send + 'static, R: Send + 'static>(
self,
) -> Result<(Sender<S, F>, Receiver<R>), error::Incoming>
where
F: OwnedDeserializer<R> + Format<'static, S> + 'static,
<F as Format<'static, S>>::Error: SerializationError,
<F as Format<'static, R>>::Error: SerializationError,
{
let format = self.receiver.format.clone();
self.accept_with_format(format).await
}

/// Accept the incoming stream with the given types, using `format` for
/// serializing the stream.
///
/// Use `S` and `R` to define which type this stream is sending and
/// receiving.
///
/// # Errors
/// - [`error::Incoming::Receiver`] if receiving the type information to the
/// peer failed, see [`error::Receiver`] for more details
/// - [`error::Incoming::Closed`] if the stream was closed
pub async fn accept_with_format<S: Send + 'static, R: Send + 'static, NewFormat>(
mut self,
) -> Result<(Sender<S>, Receiver<R>), error::Incoming> {
format: NewFormat,
) -> Result<(Sender<S, NewFormat>, Receiver<R>), error::Incoming>
where
NewFormat: OwnedDeserializer<R> + Format<'static, S> + Clone + 'static,
<NewFormat as Format<'static, S>>::Error: SerializationError,
<NewFormat as Format<'static, R>>::Error: SerializationError,
{
match self.r#type {
Some(Ok(_)) => (),
Some(Err(error)) => return Err(error),
Expand All @@ -100,8 +135,8 @@ impl<T: DeserializeOwned> Incoming<T> {
}
}

let sender = Sender::new(self.sender);
let receiver = Receiver::new(self.receiver.transmute());
let sender = Sender::new(self.sender, format.clone());
let receiver = Receiver::new(self.receiver.transmute(format));

Ok((sender, receiver))
}
Expand Down
Loading