Skip to content

Commit

Permalink
refactor: AsyncReadWrite (#822)
Browse files Browse the repository at this point in the history
  • Loading branch information
Devdutt Shenoi committed Mar 24, 2024
1 parent 353a477 commit df348cf
Show file tree
Hide file tree
Showing 7 changed files with 25 additions and 24 deletions.
2 changes: 2 additions & 0 deletions rumqttc/CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,8 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0

### Changed

* rename `N` as `AsyncReadWrite` to describe usage.

### Deprecated

### Removed
Expand Down
4 changes: 2 additions & 2 deletions rumqttc/src/eventloop.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ use crate::{framed::Network, Transport};
use crate::{Incoming, MqttState, NetworkOptions, Packet, Request, StateError};
use crate::{MqttOptions, Outgoing};

use crate::framed::N;
use crate::framed::AsyncReadWrite;
use crate::mqttbytes::v4::*;
use flume::{bounded, Receiver, Sender};
use tokio::net::{lookup_host, TcpSocket, TcpStream};
Expand Down Expand Up @@ -369,7 +369,7 @@ async fn network_connect(
_ => options.broker_address(),
};

let tcp_stream: Box<dyn N> = {
let tcp_stream: Box<dyn AsyncReadWrite> = {
#[cfg(feature = "proxy")]
match options.proxy() {
Some(proxy) => proxy.connect(&domain, port, network_options).await?,
Expand Down
10 changes: 5 additions & 5 deletions rumqttc/src/framed.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ use std::io;
/// appropriate to achieve performance
pub struct Network {
/// Socket for IO
socket: Box<dyn N>,
socket: Box<dyn AsyncReadWrite>,
/// Buffered reads
read: BytesMut,
/// Maximum packet size
Expand All @@ -20,8 +20,8 @@ pub struct Network {
}

impl Network {
pub fn new(socket: impl N + 'static, max_incoming_size: usize) -> Network {
let socket = Box::new(socket) as Box<dyn N>;
pub fn new(socket: impl AsyncReadWrite + 'static, max_incoming_size: usize) -> Network {
let socket = Box::new(socket) as Box<dyn AsyncReadWrite>;
Network {
socket,
read: BytesMut::with_capacity(10 * 1024),
Expand Down Expand Up @@ -117,5 +117,5 @@ impl Network {
}
}

pub trait N: AsyncRead + AsyncWrite + Send + Unpin {}
impl<T> N for T where T: AsyncRead + AsyncWrite + Send + Unpin {}
pub trait AsyncReadWrite: AsyncRead + AsyncWrite + Send + Unpin {}
impl<T> AsyncReadWrite for T where T: AsyncRead + AsyncWrite + Send + Unpin {}
8 changes: 4 additions & 4 deletions rumqttc/src/proxy.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
use crate::eventloop::socket_connect;
use crate::framed::N;
use crate::framed::AsyncReadWrite;
use crate::NetworkOptions;

use std::io;
Expand Down Expand Up @@ -46,10 +46,10 @@ impl Proxy {
broker_addr: &str,
broker_port: u16,
network_options: NetworkOptions,
) -> Result<Box<dyn N>, ProxyError> {
) -> Result<Box<dyn AsyncReadWrite>, ProxyError> {
let proxy_addr = format!("{}:{}", self.addr, self.port);

let tcp: Box<dyn N> = Box::new(socket_connect(proxy_addr, network_options).await?);
let tcp: Box<dyn AsyncReadWrite> = Box::new(socket_connect(proxy_addr, network_options).await?);
let mut tcp = match self.ty {
ProxyType::Http => tcp,
#[cfg(any(feature = "use-rustls", feature = "use-native-tls"))]
Expand All @@ -67,7 +67,7 @@ impl ProxyAuth {
self,
host: &str,
port: u16,
tcp_stream: &mut Box<dyn N>,
tcp_stream: &mut Box<dyn AsyncReadWrite>,
) -> Result<(), ProxyError> {
match self {
Self::None => async_http_proxy::http_connect_tokio(tcp_stream, host, port).await?,
Expand Down
8 changes: 4 additions & 4 deletions rumqttc/src/tls.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ use std::io::{BufReader, Cursor};
#[cfg(feature = "use-rustls")]
use std::sync::Arc;

use crate::framed::N;
use crate::framed::AsyncReadWrite;
use crate::TlsConfiguration;

#[cfg(feature = "use-native-tls")]
Expand Down Expand Up @@ -166,9 +166,9 @@ pub async fn tls_connect(
addr: &str,
_port: u16,
tls_config: &TlsConfiguration,
tcp: Box<dyn N>,
) -> Result<Box<dyn N>, Error> {
let tls: Box<dyn N> = match tls_config {
tcp: Box<dyn AsyncReadWrite>,
) -> Result<Box<dyn AsyncReadWrite>, Error> {
let tls: Box<dyn AsyncReadWrite> = match tls_config {
#[cfg(feature = "use-rustls")]
TlsConfiguration::Simple { .. } | TlsConfiguration::Rustls(_) => {
let connector = rustls_connector(tls_config).await?;
Expand Down
4 changes: 2 additions & 2 deletions rumqttc/src/v5/eventloop.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ use super::framed::Network;
use super::mqttbytes::v5::*;
use super::{Incoming, MqttOptions, MqttState, Outgoing, Request, StateError, Transport};
use crate::eventloop::socket_connect;
use crate::framed::N;
use crate::framed::AsyncReadWrite;

use flume::{bounded, Receiver, Sender};
use tokio::select;
Expand Down Expand Up @@ -304,7 +304,7 @@ async fn network_connect(options: &MqttOptions) -> Result<Network, ConnectionErr
_ => options.broker_address(),
};

let tcp_stream: Box<dyn N> = {
let tcp_stream: Box<dyn AsyncReadWrite> = {
#[cfg(feature = "proxy")]
match options.proxy() {
Some(proxy) => {
Expand Down
13 changes: 6 additions & 7 deletions rumqttc/src/v5/framed.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
use bytes::BytesMut;
use tokio::io::{AsyncRead, AsyncReadExt, AsyncWrite, AsyncWriteExt};
use tokio::io::{AsyncReadExt, AsyncWriteExt};

use crate::framed::AsyncReadWrite;

use super::mqttbytes;
use super::mqttbytes::v5::{Connect, Login, Packet};
Expand All @@ -11,7 +13,7 @@ use std::io;
/// appropriate to achieve performance
pub struct Network {
/// Socket for IO
socket: Box<dyn N>,
socket: Box<dyn AsyncReadWrite>,
/// Buffered reads
read: BytesMut,
/// Maximum packet size
Expand All @@ -21,8 +23,8 @@ pub struct Network {
}

impl Network {
pub fn new(socket: impl N + 'static, max_incoming_size: Option<usize>) -> Network {
let socket = Box::new(socket) as Box<dyn N>;
pub fn new(socket: impl AsyncReadWrite + 'static, max_incoming_size: Option<usize>) -> Network {
let socket = Box::new(socket) as Box<dyn AsyncReadWrite>;
Network {
socket,
read: BytesMut::with_capacity(10 * 1024),
Expand Down Expand Up @@ -127,6 +129,3 @@ impl Network {
Ok(())
}
}

pub trait N: AsyncRead + AsyncWrite + Send + Unpin {}
impl<T> N for T where T: AsyncRead + AsyncWrite + Send + Unpin {}

0 comments on commit df348cf

Please sign in to comment.