Skip to content

Commit

Permalink
Polish Sink
Browse files Browse the repository at this point in the history
Signed-off-by: Xuanwo <[email protected]>
  • Loading branch information
Xuanwo committed Dec 5, 2024
1 parent fdc3049 commit 72e7e8a
Show file tree
Hide file tree
Showing 3 changed files with 10 additions and 7 deletions.
6 changes: 3 additions & 3 deletions core/src/types/delete/deleter.rs
Original file line number Diff line number Diff line change
Expand Up @@ -67,12 +67,12 @@ use std::pin::pin;
/// ```rust
/// use opendal::Operator;
/// use opendal::Result;
/// use futures::stream;
/// use futures::{stream, Sink};
/// use futures::SinkExt;
///
/// async fn example(op: Operator) -> Result<()> {
/// let mut sink = op.deleter().await?.into_sink();
/// sink.send_all(&mut stream::iter(vec!["path/to/file"])).await?;
/// sink.send("path/to/file").await?;
/// sink.close().await?;
/// Ok(())
/// }
Expand Down Expand Up @@ -210,7 +210,7 @@ impl Deleter {
}

/// Convert the deleter into a sink.
pub fn into_sink(self) -> FuturesDeleteSink {
pub fn into_sink<T: IntoDeleteInput>(self) -> FuturesDeleteSink<T> {
FuturesDeleteSink::new(self)
}
}
9 changes: 6 additions & 3 deletions core/src/types/delete/futures_delete_sink.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,12 +18,14 @@
use crate::raw::*;
use crate::*;
use futures::Sink;
use std::marker::PhantomData;
use std::pin::Pin;
use std::task::{ready, Context, Poll};

/// FuturesDeleteSink is a sink that generated by [`Deleter`]
pub struct FuturesDeleteSink {
pub struct FuturesDeleteSink<T: IntoDeleteInput> {
state: State,
_phantom: PhantomData<T>,
}

enum State {
Expand All @@ -33,16 +35,17 @@ enum State {
Close(BoxedStaticFuture<(Deleter, Result<()>)>),
}

impl FuturesDeleteSink {
impl<T: IntoDeleteInput> FuturesDeleteSink<T> {
#[inline]
pub(super) fn new(deleter: Deleter) -> Self {
Self {
state: State::Idle(Some(deleter)),
_phantom: PhantomData,
}
}
}

impl<T: IntoDeleteInput> Sink<T> for FuturesDeleteSink {
impl<T: IntoDeleteInput> Sink<T> for FuturesDeleteSink<T> {
type Error = Error;

fn poll_ready(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<()>> {
Expand Down
2 changes: 1 addition & 1 deletion core/src/types/delete/input.rs
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ pub struct DeleteInput {
}

/// IntoDeleteInput is a helper trait that makes it easier for users to play with `Deleter`.
pub trait IntoDeleteInput: Send + Sync {
pub trait IntoDeleteInput: Send + Sync + Unpin {
/// Convert `self` into a `DeleteInput`.
fn into_delete_input(self) -> DeleteInput;
}
Expand Down

0 comments on commit 72e7e8a

Please sign in to comment.