Skip to content

Commit

Permalink
feat(raw/oio): block write change to buffer (#4466)
Browse files Browse the repository at this point in the history
  • Loading branch information
hoslo authored Apr 11, 2024
1 parent 6dae58e commit bf79f73
Showing 1 changed file with 8 additions and 9 deletions.
17 changes: 8 additions & 9 deletions core/src/raw/oio/write/block_write.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,6 @@ use std::sync::Arc;
use std::task::Context;
use std::task::Poll;

use bytes::Bytes;
use futures::Future;
use futures::FutureExt;
use futures::StreamExt;
Expand Down Expand Up @@ -110,7 +109,7 @@ pub trait BlockWrite: Send + Sync + Unpin + 'static {
/// WriteBlockResult is the result returned by [`WriteBlockFuture`].
///
/// The error part will carries input `(block_id, bytes, err)` so caller can retry them.
type WriteBlockResult = std::result::Result<Uuid, (Uuid, Bytes, Error)>;
type WriteBlockResult = std::result::Result<Uuid, (Uuid, Buffer, Error)>;

struct WriteBlockFuture(BoxedStaticFuture<WriteBlockResult>);

Expand All @@ -132,9 +131,9 @@ impl Future for WriteBlockFuture {
}

impl WriteBlockFuture {
pub fn new<W: BlockWrite>(w: Arc<W>, block_id: Uuid, bytes: Bytes) -> Self {
pub fn new<W: BlockWrite>(w: Arc<W>, block_id: Uuid, bytes: Buffer) -> Self {
let fut = async move {
w.write_block(block_id, bytes.len() as u64, Buffer::from(bytes.clone()))
w.write_block(block_id, bytes.len() as u64, bytes.clone())
.await
// Return bytes while we got an error to allow retry.
.map_err(|err| (block_id, bytes, err))
Expand All @@ -152,7 +151,7 @@ pub struct BlockWriter<W: BlockWrite> {
w: Arc<W>,

block_ids: Vec<Uuid>,
cache: Option<Bytes>,
cache: Option<Buffer>,
futures: ConcurrentFutures<WriteBlockFuture>,
}

Expand All @@ -167,7 +166,7 @@ impl<W: BlockWrite> BlockWriter<W> {
}
}

fn fill_cache(&mut self, bs: Bytes) -> usize {
fn fill_cache(&mut self, bs: Buffer) -> usize {
let size = bs.len();
assert!(self.cache.is_none());
self.cache = Some(bs);
Expand All @@ -184,7 +183,7 @@ where
if self.futures.has_remaining() {
// Fill cache with the first write.
if self.cache.is_none() {
let size = self.fill_cache(bs.to_bytes());
let size = self.fill_cache(bs);
return Ok(size);
}

Expand All @@ -195,7 +194,7 @@ where
cache,
));

let size = self.fill_cache(bs.to_bytes());
let size = self.fill_cache(bs);
return Ok(size);
} else if let Some(res) = self.futures.next().await {
match res {
Expand All @@ -220,7 +219,7 @@ where
// No write block has been sent.
if self.futures.is_empty() && self.block_ids.is_empty() {
let (size, body) = match self.cache.clone() {
Some(cache) => (cache.len(), Buffer::from(cache)),
Some(cache) => (cache.len(), cache),
None => (0, Buffer::new()),
};
self.w.write_once(size as u64, body).await?;
Expand Down

0 comments on commit bf79f73

Please sign in to comment.