Skip to content

Commit

Permalink
Proper support of directory listing
Browse files Browse the repository at this point in the history
  • Loading branch information
lemaitre-aneo committed Apr 2, 2024
1 parent 8977c82 commit 40f7b9e
Show file tree
Hide file tree
Showing 2 changed files with 215 additions and 4 deletions.
173 changes: 173 additions & 0 deletions src/client/dir.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,173 @@
// This file is part of the rusftp project
//
// Copyright (C) ANEO, 2024-2024. All rights reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License")
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

use std::{future::Future, pin::Pin, task::ready};

use futures::Stream;

use crate::{Close, Handle, Name, NameEntry, ReadDir, SftpClient, Status, StatusCode};

/// Directory accessible remotely with SFTP
pub struct Dir {
client: SftpClient,
handle: Option<Handle>,
buffer: Option<Name>,
pending: Option<PendingOperation>,
}

type PendingOperation = Pin<Box<dyn Future<Output = Result<Name, Status>> + Send + Sync + 'static>>;

impl Dir {
/// Create a directory from a raw [`Handle`].
///
/// The handle must come from `SftpClient::opendir`.
///
/// The remote dir will be closed when the object is dropped.
///
/// # Arguments
///
/// * `handle` - Handle of the open directory
pub fn new(client: SftpClient, handle: Handle) -> Self {
Dir {
client,
handle: Some(handle),
buffer: Some(Default::default()),
pending: None,
}
}

/// Create a closed directory.
///
/// The directory cannot be opened by any means.
pub const fn new_closed() -> Self {
Dir {
client: SftpClient::new_stopped(),
handle: None,
buffer: None,
pending: None,
}
}
}

pub static DIR_CLOSED: Dir = Dir::new_closed();

impl Dir {
/// Check whether the directory is closed
pub fn is_closed(&self) -> bool {
self.handle.is_none()
}

/// Close the remote dir
pub fn close(&mut self) -> impl Future<Output = Result<(), Status>> {
let future = if let Some(handle) = std::mem::take(&mut self.handle) {
Some(self.client.request(Close { handle }))
} else {
// If the dir was already closed, no need to close it
None
};

async move {
match future {
Some(future) => future.await,
None => Ok(()),
}
}
}
}

impl Drop for Dir {
fn drop(&mut self) {
_ = futures::executor::block_on(self.close());
}
}

impl std::fmt::Debug for Dir {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
f.debug_struct("Dir")
.field("client", &self.client)
.field("handle", &self.handle)
.field("buffer", &self.buffer)
.field("pending", &self.pending.as_ref().map(|_| "..."))
.finish()
}
}

impl Stream for Dir {
type Item = Result<NameEntry, std::io::Error>;

fn poll_next(
mut self: std::pin::Pin<&mut Self>,
cx: &mut std::task::Context<'_>,
) -> std::task::Poll<Option<Self::Item>> {
// If end of file reached, resturn None
let Some(buffer) = &mut self.buffer else {
return std::task::Poll::Ready(None);
};

// If still some entries in the buffer, get next
if let Some(entry) = buffer.0.pop() {
return std::task::Poll::Ready(Some(Ok(entry)));
}

let result = match &mut self.pending {
Some(pending) => ready!(pending.as_mut().poll(cx)),
None => {
let Some(handle) = &self.handle else {
// Force end of iteration
self.buffer = None;
return std::task::Poll::Ready(Some(Err(std::io::Error::new(
std::io::ErrorKind::BrokenPipe,
"Dir was closed",
))));
};

let readdir = self.client.request(ReadDir {
handle: handle.clone(),
});
let pending = self.pending.insert(Box::pin(readdir));

ready!(pending.as_mut().poll(cx))
}
};

// If the read was successful, the buffer will be populated again
// Stop the iteration otherwise
self.buffer = None;

let result = match result {
Ok(mut entries) => {
entries.reverse();

if let Some(entry) = entries.0.pop() {
self.buffer = Some(entries);
Some(Ok(entry))
} else {
Some(Err(std::io::Error::new(
std::io::ErrorKind::UnexpectedEof,
"Found no more directory entries while it was expecting some",
)))
}
}
Err(Status {
code: StatusCode::Eof,
..
}) => None,
Err(err) => Some(Err(err.into())),
};

std::task::Poll::Ready(result)
}
}
46 changes: 42 additions & 4 deletions src/client/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -31,8 +31,10 @@ use crate::{
Stat, Status, StatusCode, Symlink, Write,
};

mod dir;
mod file;

pub use dir::{Dir, DIR_CLOSED};
pub use file::{File, FILE_CLOSED};

/// SFTP client
Expand Down Expand Up @@ -542,6 +544,24 @@ impl SftpClient {
self.request(OpenDir { path: path.into() })
}

/// Open a directory for listing.
///
/// Returns a [`Dir`] for the directory specified.
/// It implements [`Stream<Item = Result<NameEntry, ...>>`](futures::stream::Stream).
///
/// # Arguments
///
/// * `path` - Path of the directory to open (convertible to [`Path`])
pub fn opendir<P: Into<Path>>(
&self,
path: P,
) -> impl Future<Output = Result<Dir, Status>> + Send + Sync + 'static {
let request = self.request(OpenDir { path: path.into() });
let client = self.clone();

async move { Ok(Dir::new(client, request.await?)) }
}

/// Read a portion of an opened file.
///
/// # Arguments
Expand Down Expand Up @@ -586,20 +606,38 @@ impl SftpClient {

/// Read a directory listing.
///
/// If you need an asynchronous [`Stream`](futures::stream::Stream), you can use `opendir()` instead
///
/// # Arguments
///
/// * `path`: Path of the directory to list (convertible to [`Path`])
pub fn readdir<P: Into<Path>>(
&self,
path: P,
) -> impl Future<Output = Result<Name, Status>> + Send + Sync + '_ {
) -> impl Future<Output = Result<Name, Status>> + Send + Sync + 'static {
let dir = self.request(OpenDir { path: path.into() });
let client = self.clone();
let mut entries = Name::default();

async move {
let handle = dir.await?;
let name = self.readdir_handle(handle.clone()).await?;
self.close(handle).await?;
Ok(name)

loop {
match client.readdir_handle(handle.clone()).await {
Ok(mut chunk) => entries.0.append(&mut chunk.0),
Err(Status {
code: StatusCode::Eof,
..
}) => break,
Err(err) => {
_ = client.close(handle).await;
return Err(err);
}
}
}

client.close(handle).await?;
Ok(entries)
}
}

Expand Down

0 comments on commit 40f7b9e

Please sign in to comment.