Skip to content

Commit

Permalink
code reformat
Browse files Browse the repository at this point in the history
  • Loading branch information
Eugeny committed Nov 1, 2023
1 parent 42369c7 commit abbdbf3
Show file tree
Hide file tree
Showing 9 changed files with 57 additions and 57 deletions.
3 changes: 2 additions & 1 deletion russh-keys/src/key.rs
Original file line number Diff line number Diff line change
Expand Up @@ -165,7 +165,8 @@ impl PublicKey {
if key_algo != b"ssh-ed25519" {
return Err(Error::CouldNotReadKey);
}
let Ok(key_bytes) = <&[u8; ed25519_dalek::PUBLIC_KEY_LENGTH]>::try_from(key_bytes) else {
let Ok(key_bytes) = <&[u8; ed25519_dalek::PUBLIC_KEY_LENGTH]>::try_from(key_bytes)
else {
return Err(Error::CouldNotReadKey);
};
ed25519_dalek::VerifyingKey::from_bytes(key_bytes)
Expand Down
6 changes: 2 additions & 4 deletions russh/src/channels/channel_stream.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,10 +4,8 @@ use std::task::{Context, Poll};

use tokio::io::{AsyncRead, AsyncWrite};

use super::{
io::{ChannelRx, ChannelTx},
ChannelId, ChannelMsg,
};
use super::io::{ChannelRx, ChannelTx};
use super::{ChannelId, ChannelMsg};

/// AsyncRead/AsyncWrite wrapper for SSH Channels
pub struct ChannelStream<S>
Expand Down
2 changes: 1 addition & 1 deletion russh/src/channels/io/rx.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
use std::io;
use std::pin::Pin;
use std::task::{Context, Poll, ready};
use std::task::{ready, Context, Poll};

use tokio::io::AsyncRead;

Expand Down
72 changes: 40 additions & 32 deletions russh/src/channels/io/tx.rs
Original file line number Diff line number Diff line change
@@ -1,19 +1,21 @@
use std::io;
use std::pin::Pin;
use std::sync::Arc;
use std::task::{Context, Poll, ready};
use std::task::{ready, Context, Poll};

use futures::FutureExt;
use russh_cryptovec::CryptoVec;
use tokio::io::AsyncWrite;
use tokio::sync::mpsc::{self, OwnedPermit, error::SendError};
use tokio::sync::mpsc::error::SendError;
use tokio::sync::mpsc::{self, OwnedPermit};
use tokio::sync::{Mutex, OwnedMutexGuard};

use super::ChannelMsg;
use crate::ChannelId;

type BoxedThreadsafeFuture<T> = Pin<Box<dyn Sync + Send + std::future::Future<Output=T>>>;
type OwnedPermitFuture<S> = BoxedThreadsafeFuture<Result<(OwnedPermit<S>, ChannelMsg, usize), SendError<()>>>;
type BoxedThreadsafeFuture<T> = Pin<Box<dyn Sync + Send + std::future::Future<Output = T>>>;
type OwnedPermitFuture<S> =
BoxedThreadsafeFuture<Result<(OwnedPermit<S>, ChannelMsg, usize), SendError<()>>>;

pub struct ChannelTx<S> {
sender: mpsc::Sender<S>,
Expand Down Expand Up @@ -48,17 +50,17 @@ where
}
}

fn poll_mk_msg(
&mut self,
cx: &mut Context<'_>,
buf: &[u8],
) -> Poll<(ChannelMsg, usize)> {
fn poll_mk_msg(&mut self, cx: &mut Context<'_>, buf: &[u8]) -> Poll<(ChannelMsg, usize)> {
let window_size = self.window_size.clone();
let window_size_fut = self.window_size_fut.get_or_insert_with(|| Box::pin(window_size.lock_owned()));
let window_size_fut = self
.window_size_fut
.get_or_insert_with(|| Box::pin(window_size.lock_owned()));
let mut window_size = ready!(window_size_fut.poll_unpin(cx));
self.window_size_fut.take();

let writable = (self.max_packet_size).min(*window_size).min(buf.len() as u32) as usize;
let writable = (self.max_packet_size)
.min(*window_size)
.min(buf.len() as u32) as usize;
if writable == 0 {
// TODO fix this busywait
cx.waker().wake_by_ref();
Expand All @@ -82,19 +84,25 @@ where

fn activate(&mut self, msg: ChannelMsg, writable: usize) -> &mut OwnedPermitFuture<S> {
use futures::TryFutureExt;
self.send_fut.insert(Box::pin(self.sender.clone().reserve_owned().map_ok(move |p| (p, msg, writable))))
self.send_fut.insert(Box::pin(
self.sender
.clone()
.reserve_owned()
.map_ok(move |p| (p, msg, writable)),
))
}

fn handle_write_result(&mut self, r: Result<(OwnedPermit<S>, ChannelMsg, usize), SendError<()>>) -> Result<usize, io::Error> {
fn handle_write_result(
&mut self,
r: Result<(OwnedPermit<S>, ChannelMsg, usize), SendError<()>>,
) -> Result<usize, io::Error> {
self.send_fut = None;
match r {
Ok((permit, msg, writable)) => {
permit.send((self.id, msg).into());
Ok(writable)
}
Err(SendError(())) => {
Err(io::Error::new(io::ErrorKind::BrokenPipe, "channel closed"))
}
Err(SendError(())) => Err(io::Error::new(io::ErrorKind::BrokenPipe, "channel closed")),
}
}
}
Expand All @@ -109,13 +117,12 @@ where
cx: &mut Context<'_>,
buf: &[u8],
) -> Poll<Result<usize, io::Error>> {
let send_fut =
if let Some(x) = self.send_fut.as_mut() {
x
} else {
let (msg, writable) = ready!(self.poll_mk_msg(cx, buf));
self.activate(msg, writable)
};
let send_fut = if let Some(x) = self.send_fut.as_mut() {
x
} else {
let (msg, writable) = ready!(self.poll_mk_msg(cx, buf));
self.activate(msg, writable)
};
let r = ready!(send_fut.as_mut().poll_unpin(cx));
Poll::Ready(self.handle_write_result(r))
}
Expand All @@ -124,15 +131,16 @@ where
Poll::Ready(Ok(()))
}

fn poll_shutdown(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), io::Error>> {
let send_fut =
if let Some(x) = self.send_fut.as_mut() {
x
} else {
self.activate(ChannelMsg::Eof, 0)
};
let r = ready!(send_fut.as_mut().poll_unpin(cx))
.map(|(p, _, _)| (p, ChannelMsg::Eof, 0));
fn poll_shutdown(
mut self: Pin<&mut Self>,
cx: &mut Context<'_>,
) -> Poll<Result<(), io::Error>> {
let send_fut = if let Some(x) = self.send_fut.as_mut() {
x
} else {
self.activate(ChannelMsg::Eof, 0)
};
let r = ready!(send_fut.as_mut().poll_unpin(cx)).map(|(p, _, _)| (p, ChannelMsg::Eof, 0));
Poll::Ready(self.handle_write_result(r).map(drop))
}
}
6 changes: 2 additions & 4 deletions russh/src/channels/mod.rs
Original file line number Diff line number Diff line change
@@ -1,11 +1,9 @@
use std::sync::Arc;

use russh_cryptovec::CryptoVec;
use tokio::io::{AsyncRead, AsyncWrite};
use tokio::sync::mpsc::{Sender, UnboundedReceiver};
use tokio::sync::Mutex;
use tokio::{
io::{AsyncRead, AsyncWrite},
sync::mpsc::{Sender, UnboundedReceiver},
};

use crate::{ChannelId, ChannelOpenFailure, Error, Pty, Sig};

Expand Down
5 changes: 4 additions & 1 deletion russh/src/client/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1276,7 +1276,10 @@ pub struct Config {

impl Config {
fn keepalive_deadline(&self) -> tokio::time::Instant {
tokio::time::Instant::now() + self.keepalive_interval.unwrap_or(std::time::Duration::from_secs(86400*365))
tokio::time::Instant::now()
+ self
.keepalive_interval
.unwrap_or(std::time::Duration::from_secs(86400 * 365))
}
}

Expand Down
5 changes: 1 addition & 4 deletions russh/src/server/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -290,10 +290,7 @@ pub trait Handler: Sized {
user: &str,
public_key: &key::PublicKey,
) -> Result<(Self, Auth), Self::Error> {
Ok((
self,
Auth::Accept,
))
Ok((self, Auth::Accept))
}

/// Check authentication using the "publickey" method. This method
Expand Down
2 changes: 1 addition & 1 deletion russh/src/tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -138,10 +138,10 @@ mod compress {
mod channels {
use async_trait::async_trait;
use russh_cryptovec::CryptoVec;
use server::Session;
use tokio::io::{AsyncReadExt, AsyncWriteExt};

use super::*;
use server::Session;

async fn test_session<RC, RS, CH, SH, F1, F2>(
client_handler: CH,
Expand Down
13 changes: 4 additions & 9 deletions russh/tests/test_data_stream.rs
Original file line number Diff line number Diff line change
@@ -1,14 +1,9 @@
use std::{
net::{SocketAddr, TcpListener, TcpStream},
sync::Arc,
};
use std::net::{SocketAddr, TcpListener, TcpStream};
use std::sync::Arc;

use rand::RngCore;
use russh::{
client,
server::{self, Auth, Msg, Session},
Channel,
};
use russh::server::{self, Auth, Msg, Session};
use russh::{client, Channel};
use russh_keys::key;
use tokio::io::{AsyncReadExt, AsyncWriteExt};

Expand Down

0 comments on commit abbdbf3

Please sign in to comment.