diff --git a/core/src/raw/oio/write/block_write.rs b/core/src/raw/oio/write/block_write.rs index a88b00cb460a..d3625c41d048 100644 --- a/core/src/raw/oio/write/block_write.rs +++ b/core/src/raw/oio/write/block_write.rs @@ -20,7 +20,6 @@ use std::sync::Arc; use std::task::Context; use std::task::Poll; -use bytes::Bytes; use futures::Future; use futures::FutureExt; use futures::StreamExt; @@ -110,7 +109,7 @@ pub trait BlockWrite: Send + Sync + Unpin + 'static { /// WriteBlockResult is the result returned by [`WriteBlockFuture`]. /// /// The error part will carries input `(block_id, bytes, err)` so caller can retry them. -type WriteBlockResult = std::result::Result; +type WriteBlockResult = std::result::Result; struct WriteBlockFuture(BoxedStaticFuture); @@ -132,9 +131,9 @@ impl Future for WriteBlockFuture { } impl WriteBlockFuture { - pub fn new(w: Arc, block_id: Uuid, bytes: Bytes) -> Self { + pub fn new(w: Arc, block_id: Uuid, bytes: Buffer) -> Self { let fut = async move { - w.write_block(block_id, bytes.len() as u64, Buffer::from(bytes.clone())) + w.write_block(block_id, bytes.len() as u64, bytes.clone()) .await // Return bytes while we got an error to allow retry. .map_err(|err| (block_id, bytes, err)) @@ -152,7 +151,7 @@ pub struct BlockWriter { w: Arc, block_ids: Vec, - cache: Option, + cache: Option, futures: ConcurrentFutures, } @@ -167,7 +166,7 @@ impl BlockWriter { } } - fn fill_cache(&mut self, bs: Bytes) -> usize { + fn fill_cache(&mut self, bs: Buffer) -> usize { let size = bs.len(); assert!(self.cache.is_none()); self.cache = Some(bs); @@ -184,7 +183,7 @@ where if self.futures.has_remaining() { // Fill cache with the first write. if self.cache.is_none() { - let size = self.fill_cache(bs.to_bytes()); + let size = self.fill_cache(bs); return Ok(size); } @@ -195,7 +194,7 @@ where cache, )); - let size = self.fill_cache(bs.to_bytes()); + let size = self.fill_cache(bs); return Ok(size); } else if let Some(res) = self.futures.next().await { match res { @@ -220,7 +219,7 @@ where // No write block has been sent. if self.futures.is_empty() && self.block_ids.is_empty() { let (size, body) = match self.cache.clone() { - Some(cache) => (cache.len(), Buffer::from(cache)), + Some(cache) => (cache.len(), cache), None => (0, Buffer::new()), }; self.w.write_once(size as u64, body).await?;