Skip to content

Commit

Permalink
feat(core): Implement RFC-3574 Concurrent Stat In List (#3599)
Browse files Browse the repository at this point in the history
* feat: add concurrent stat in list

* feat: merge main syntax minor fix

* feat: modify for PR review

* chore: cargo fmt

* feat: fix early exit bug

* chore: cargo fmt & clippy

* feat: fix for PR review

* chore: switch to push_back and pop_front

* feature: discard the while loop with an if-else clause

* chore: prune code blocks

* chore: rename Task to StatTask

* chore: cargo fmt

* feat: fix miss changed push_back to pop_front

* feat: replace listing by Option<oio::Lister>
  • Loading branch information
morristai authored Nov 23, 2023
1 parent 3f868cc commit 2f98ea6
Show file tree
Hide file tree
Showing 3 changed files with 107 additions and 41 deletions.
21 changes: 21 additions & 0 deletions core/src/raw/ops.rs
Original file line number Diff line number Diff line change
Expand Up @@ -92,6 +92,13 @@ pub struct OpList {
/// - `Some(v)` means exist.
/// - `None` means services doesn't have this meta.
metakey: FlagSet<Metakey>,
/// The concurrent of stat operations inside list operation.
/// Users could use this to control the number of concurrent stat operation when metadata is unknown.
///
/// - If this is set to <= 1, the list operation will be sequential.
/// - If this is set to > 1, the list operation will be concurrent,
/// and the maximum number of concurrent operations will be determined by this value.
concurrent: usize,
}

impl Default for OpList {
Expand All @@ -102,6 +109,7 @@ impl Default for OpList {
recursive: false,
// By default, we want to know what's the mode of this entry.
metakey: Metakey::Mode.into(),
concurrent: 1,
}
}
}
Expand Down Expand Up @@ -162,6 +170,19 @@ impl OpList {
pub fn metakey(&self) -> FlagSet<Metakey> {
self.metakey
}

/// Change the concurrent of this list operation.
///
/// The default concurrent is 1.
pub fn with_concurrent(mut self, concurrent: usize) -> Self {
self.concurrent = concurrent;
self
}

/// Get the concurrent of list operation.
pub fn concurrent(&self) -> usize {
self.concurrent
}
}

/// Args for `presign` operation.
Expand Down
117 changes: 76 additions & 41 deletions core/src/types/list.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,23 +15,22 @@
// specific language governing permissions and limitations
// under the License.

use std::cmp;
use std::collections::VecDeque;
use std::pin::Pin;
use std::task::ready;
use std::task::Context;
use std::task::Poll;

use flagset::FlagSet;
use futures::future::BoxFuture;
use futures::FutureExt;
use futures::Stream;
use tokio::task::JoinHandle;

use crate::raw::oio::List;
use crate::raw::*;
use crate::*;

/// Future constructed by stating.
type StatFuture = BoxFuture<'static, (String, Result<RpStat>)>;

/// Lister is designed to list entries at given path in an asynchronous
/// manner.
///
Expand All @@ -41,14 +40,22 @@ type StatFuture = BoxFuture<'static, (String, Result<RpStat>)>;
/// - Lister will return `None` if there is no more entries or error has been returned.
pub struct Lister {
acc: FusedAccessor,
lister: oio::Lister,
lister: Option<oio::Lister>,
/// required_metakey is the metakey required by users.
required_metakey: FlagSet<Metakey>,

stating: Option<StatFuture>,
/// task_queue is used to store tasks that are run in concurrent.
task_queue: VecDeque<StatTask>,
errored: bool,
}

enum StatTask {
/// Handle is used to store the join handle of spawned task.
Handle(JoinHandle<(String, Result<RpStat>)>),
/// KnownEntry is used to store the entry that already contains the required metakey.
KnownEntry(Box<Option<(String, Metadata)>>),
}

/// # Safety
///
/// Lister will only be accessed by `&mut Self`
Expand All @@ -58,14 +65,16 @@ impl Lister {
/// Create a new lister.
pub(crate) async fn create(acc: FusedAccessor, path: &str, args: OpList) -> Result<Self> {
let required_metakey = args.metakey();
let concurrent = cmp::max(1, args.concurrent());

let (_, lister) = acc.list(path, args).await?;

Ok(Self {
acc,
lister,
lister: Some(lister),
required_metakey,

stating: None,
task_queue: VecDeque::with_capacity(concurrent),
errored: false,
})
}
Expand All @@ -80,44 +89,70 @@ impl Stream for Lister {
return Poll::Ready(None);
}

if let Some(fut) = self.stating.as_mut() {
let (path, rp) = ready!(fut.poll_unpin(cx));
let task_queue_len = self.task_queue.len();
let task_queue_cap = self.task_queue.capacity();

if let Some(lister) = self.lister.as_mut() {
if task_queue_len < task_queue_cap {
match lister.poll_next(cx) {
Poll::Pending => {
if task_queue_len == 0 {
return Poll::Pending;
}
}
Poll::Ready(Ok(Some(oe))) => {
let (path, metadata) = oe.into_entry().into_parts();
// TODO: we can optimize this by checking the provided metakey provided by services.
if metadata.contains_metakey(self.required_metakey) {
self.task_queue
.push_back(StatTask::KnownEntry(Box::new(Some((path, metadata)))));
} else {
let acc = self.acc.clone();
let fut = async move {
let res = acc.stat(&path, OpStat::default()).await;
(path, res)
};
self.task_queue
.push_back(StatTask::Handle(tokio::spawn(fut)));
}
}
Poll::Ready(Ok(None)) => {
self.lister = None;
}
Poll::Ready(Err(err)) => {
self.errored = true;
return Poll::Ready(Some(Err(err)));
}
};
}
}

// Make sure we will not poll this future again.
self.stating = None;
let metadata = match rp {
Ok(rp) => rp.into_metadata(),
Err(err) => {
self.errored = true;
return Poll::Ready(Some(Err(err)));
if let Some(handle) = self.task_queue.front_mut() {
return match handle {
StatTask::Handle(handle) => {
let (path, rp) = ready!(handle.poll_unpin(cx)).map_err(new_task_join_error)?;

match rp {
Ok(rp) => {
self.task_queue.pop_front();
let metadata = rp.into_metadata();
Poll::Ready(Some(Ok(Entry::new(path, metadata))))
}
Err(err) => {
self.errored = true;
Poll::Ready(Some(Err(err)))
}
}
}
StatTask::KnownEntry(entry) => {
let (path, metadata) = entry.take().expect("entry must be valid");
self.task_queue.pop_front();
Poll::Ready(Some(Ok(Entry::new(path, metadata))))
}
};

return Poll::Ready(Some(Ok(Entry::new(path, metadata))));
}

match ready!(self.lister.poll_next(cx)) {
Ok(Some(oe)) => {
let (path, metadata) = oe.into_entry().into_parts();
if metadata.contains_metakey(self.required_metakey) {
return Poll::Ready(Some(Ok(Entry::new(path, metadata))));
}

let acc = self.acc.clone();
let fut = async move {
let res = acc.stat(&path, OpStat::default()).await;

(path, res)
};
self.stating = Some(Box::pin(fut));
self.poll_next(cx)
}
Ok(None) => Poll::Ready(None),
Err(err) => {
self.errored = true;
Poll::Ready(Some(Err(err)))
}
}
Poll::Ready(None)
}
}

Expand Down
10 changes: 10 additions & 0 deletions core/src/types/operator/operator_futures.rs
Original file line number Diff line number Diff line change
Expand Up @@ -616,6 +616,16 @@ impl FutureLister {
self.0 = self.0.map_args(|args| args.with_metakey(v));
self
}

/// Concurrent is used to control the number of concurrent stat requests.
///
/// If concurrent is set to <=1, the lister will perform stat requests sequentially.
///
/// The default concurrent is 1.
pub fn concurrent(mut self, v: usize) -> Self {
self.0 = self.0.map_args(|args| args.with_concurrent(v));
self
}
}

impl Future for FutureLister {
Expand Down

0 comments on commit 2f98ea6

Please sign in to comment.