From 40f7b9ee29f8f6c684bcb5c461b93e76967f2a6f Mon Sep 17 00:00:00 2001 From: Florian Lemaitre Date: Tue, 2 Apr 2024 23:55:51 +0200 Subject: [PATCH] Proper support of directory listing --- src/client/dir.rs | 173 ++++++++++++++++++++++++++++++++++++++++++++++ src/client/mod.rs | 46 ++++++++++-- 2 files changed, 215 insertions(+), 4 deletions(-) create mode 100644 src/client/dir.rs diff --git a/src/client/dir.rs b/src/client/dir.rs new file mode 100644 index 0000000..44583ed --- /dev/null +++ b/src/client/dir.rs @@ -0,0 +1,173 @@ +// This file is part of the rusftp project +// +// Copyright (C) ANEO, 2024-2024. All rights reserved. +// +// Licensed under the Apache License, Version 2.0 (the "License") +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +use std::{future::Future, pin::Pin, task::ready}; + +use futures::Stream; + +use crate::{Close, Handle, Name, NameEntry, ReadDir, SftpClient, Status, StatusCode}; + +/// Directory accessible remotely with SFTP +pub struct Dir { + client: SftpClient, + handle: Option, + buffer: Option, + pending: Option, +} + +type PendingOperation = Pin> + Send + Sync + 'static>>; + +impl Dir { + /// Create a directory from a raw [`Handle`]. + /// + /// The handle must come from `SftpClient::opendir`. + /// + /// The remote dir will be closed when the object is dropped. + /// + /// # Arguments + /// + /// * `handle` - Handle of the open directory + pub fn new(client: SftpClient, handle: Handle) -> Self { + Dir { + client, + handle: Some(handle), + buffer: Some(Default::default()), + pending: None, + } + } + + /// Create a closed directory. + /// + /// The directory cannot be opened by any means. + pub const fn new_closed() -> Self { + Dir { + client: SftpClient::new_stopped(), + handle: None, + buffer: None, + pending: None, + } + } +} + +pub static DIR_CLOSED: Dir = Dir::new_closed(); + +impl Dir { + /// Check whether the directory is closed + pub fn is_closed(&self) -> bool { + self.handle.is_none() + } + + /// Close the remote dir + pub fn close(&mut self) -> impl Future> { + let future = if let Some(handle) = std::mem::take(&mut self.handle) { + Some(self.client.request(Close { handle })) + } else { + // If the dir was already closed, no need to close it + None + }; + + async move { + match future { + Some(future) => future.await, + None => Ok(()), + } + } + } +} + +impl Drop for Dir { + fn drop(&mut self) { + _ = futures::executor::block_on(self.close()); + } +} + +impl std::fmt::Debug for Dir { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + f.debug_struct("Dir") + .field("client", &self.client) + .field("handle", &self.handle) + .field("buffer", &self.buffer) + .field("pending", &self.pending.as_ref().map(|_| "...")) + .finish() + } +} + +impl Stream for Dir { + type Item = Result; + + fn poll_next( + mut self: std::pin::Pin<&mut Self>, + cx: &mut std::task::Context<'_>, + ) -> std::task::Poll> { + // If end of file reached, resturn None + let Some(buffer) = &mut self.buffer else { + return std::task::Poll::Ready(None); + }; + + // If still some entries in the buffer, get next + if let Some(entry) = buffer.0.pop() { + return std::task::Poll::Ready(Some(Ok(entry))); + } + + let result = match &mut self.pending { + Some(pending) => ready!(pending.as_mut().poll(cx)), + None => { + let Some(handle) = &self.handle else { + // Force end of iteration + self.buffer = None; + return std::task::Poll::Ready(Some(Err(std::io::Error::new( + std::io::ErrorKind::BrokenPipe, + "Dir was closed", + )))); + }; + + let readdir = self.client.request(ReadDir { + handle: handle.clone(), + }); + let pending = self.pending.insert(Box::pin(readdir)); + + ready!(pending.as_mut().poll(cx)) + } + }; + + // If the read was successful, the buffer will be populated again + // Stop the iteration otherwise + self.buffer = None; + + let result = match result { + Ok(mut entries) => { + entries.reverse(); + + if let Some(entry) = entries.0.pop() { + self.buffer = Some(entries); + Some(Ok(entry)) + } else { + Some(Err(std::io::Error::new( + std::io::ErrorKind::UnexpectedEof, + "Found no more directory entries while it was expecting some", + ))) + } + } + Err(Status { + code: StatusCode::Eof, + .. + }) => None, + Err(err) => Some(Err(err.into())), + }; + + std::task::Poll::Ready(result) + } +} diff --git a/src/client/mod.rs b/src/client/mod.rs index 91ddacd..9b7615c 100644 --- a/src/client/mod.rs +++ b/src/client/mod.rs @@ -31,8 +31,10 @@ use crate::{ Stat, Status, StatusCode, Symlink, Write, }; +mod dir; mod file; +pub use dir::{Dir, DIR_CLOSED}; pub use file::{File, FILE_CLOSED}; /// SFTP client @@ -542,6 +544,24 @@ impl SftpClient { self.request(OpenDir { path: path.into() }) } + /// Open a directory for listing. + /// + /// Returns a [`Dir`] for the directory specified. + /// It implements [`Stream>`](futures::stream::Stream). + /// + /// # Arguments + /// + /// * `path` - Path of the directory to open (convertible to [`Path`]) + pub fn opendir>( + &self, + path: P, + ) -> impl Future> + Send + Sync + 'static { + let request = self.request(OpenDir { path: path.into() }); + let client = self.clone(); + + async move { Ok(Dir::new(client, request.await?)) } + } + /// Read a portion of an opened file. /// /// # Arguments @@ -586,20 +606,38 @@ impl SftpClient { /// Read a directory listing. /// + /// If you need an asynchronous [`Stream`](futures::stream::Stream), you can use `opendir()` instead + /// /// # Arguments /// /// * `path`: Path of the directory to list (convertible to [`Path`]) pub fn readdir>( &self, path: P, - ) -> impl Future> + Send + Sync + '_ { + ) -> impl Future> + Send + Sync + 'static { let dir = self.request(OpenDir { path: path.into() }); + let client = self.clone(); + let mut entries = Name::default(); async move { let handle = dir.await?; - let name = self.readdir_handle(handle.clone()).await?; - self.close(handle).await?; - Ok(name) + + loop { + match client.readdir_handle(handle.clone()).await { + Ok(mut chunk) => entries.0.append(&mut chunk.0), + Err(Status { + code: StatusCode::Eof, + .. + }) => break, + Err(err) => { + _ = client.close(handle).await; + return Err(err); + } + } + } + + client.close(handle).await?; + Ok(entries) } }