Skip to content

Commit

Permalink
refactor(core): Remove unused size for RangeWrite.
Browse files Browse the repository at this point in the history
  • Loading branch information
reswqa committed Jun 18, 2024
1 parent dbb9b9f commit 343491d
Show file tree
Hide file tree
Showing 2 changed files with 20 additions and 35 deletions.
34 changes: 14 additions & 20 deletions core/src/raw/oio/write/range_write.rs
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,7 @@ pub trait RangeWrite: Send + Sync + Unpin + 'static {
/// RangeWriter will call this API when:
///
/// - All the data has been written to the buffer and we can perform the upload at once.
fn write_once(&self, size: u64, body: Buffer) -> impl Future<Output = Result<()>> + MaybeSend;
fn write_once(&self, body: Buffer) -> impl Future<Output = Result<()>> + MaybeSend;

/// Initiate range the range write, the returning value is the location.
fn initiate_range(&self) -> impl Future<Output = Result<String>> + MaybeSend;
Expand All @@ -68,7 +68,6 @@ pub trait RangeWrite: Send + Sync + Unpin + 'static {
&self,
location: &str,
offset: u64,
size: u64,
body: Buffer,
) -> impl Future<Output = Result<()>> + MaybeSend;

Expand All @@ -77,7 +76,6 @@ pub trait RangeWrite: Send + Sync + Unpin + 'static {
&self,
location: &str,
offset: u64,
size: u64,
body: Buffer,
) -> impl Future<Output = Result<()>> + MaybeSend;

Expand Down Expand Up @@ -119,12 +117,10 @@ impl<W: RangeWrite> RangeWriter<W> {

tasks: ConcurrentTasks::new(executor, concurrent, |input| {
Box::pin(async move {
let fut = input.w.write_range(
&input.location,
input.offset,
input.bytes.len() as u64,
input.bytes.clone(),
);
let fut =
input
.w
.write_range(&input.location, input.offset, input.bytes.clone());
match input.executor.timeout() {
None => {
let result = fut.await;
Expand Down Expand Up @@ -197,12 +193,9 @@ impl<W: RangeWrite> oio::Write for RangeWriter<W> {

async fn close(&mut self) -> Result<()> {
let Some(location) = self.location.clone() else {
let (size, body) = match self.cache.clone() {
Some(cache) => (cache.len(), cache),
None => (0, Buffer::new()),
};
let body = self.cache.clone().unwrap_or_default();
// Call write_once if there is no data in buffer and no location.
self.w.write_once(size as u64, body).await?;
self.w.write_once(body).await?;
self.cache = None;
return Ok(());
};
Expand All @@ -212,9 +205,7 @@ impl<W: RangeWrite> oio::Write for RangeWriter<W> {

if let Some(buffer) = self.cache.clone() {
let offset = self.next_offset;
self.w
.complete_range(&location, offset, buffer.len() as u64, buffer)
.await?;
self.w.complete_range(&location, offset, buffer).await?;
self.cache = None;
}

Expand Down Expand Up @@ -265,8 +256,9 @@ mod tests {
}

impl RangeWrite for Arc<Mutex<TestWrite>> {
async fn write_once(&self, size: u64, _: Buffer) -> Result<()> {
async fn write_once(&self, body: Buffer) -> Result<()> {
let mut test = self.lock().unwrap();
let size = body.len() as u64;
test.length += size;
test.bytes.extend(0..size);

Expand All @@ -277,7 +269,7 @@ mod tests {
Ok("test".to_string())
}

async fn write_range(&self, _: &str, offset: u64, size: u64, _: Buffer) -> Result<()> {
async fn write_range(&self, _: &str, offset: u64, body: Buffer) -> Result<()> {
// Add an async sleep here to enforce some pending.
sleep(Duration::from_millis(50)).await;

Expand All @@ -289,6 +281,7 @@ mod tests {
}

let mut test = self.lock().unwrap();
let size = body.len() as u64;
test.length += size;

let input = (offset..offset + size).collect::<HashSet<_>>();
Expand All @@ -302,7 +295,7 @@ mod tests {
Ok(())
}

async fn complete_range(&self, _: &str, offset: u64, size: u64, _: Buffer) -> Result<()> {
async fn complete_range(&self, _: &str, offset: u64, body: Buffer) -> Result<()> {
// Add an async sleep here to enforce some pending.
sleep(Duration::from_millis(50)).await;

Expand All @@ -314,6 +307,7 @@ mod tests {
}

let mut test = self.lock().unwrap();
let size = body.len() as u64;
test.length += size;

let input = (offset..offset + size).collect::<HashSet<_>>();
Expand Down
21 changes: 6 additions & 15 deletions core/src/services/gcs/writer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,8 @@ impl GcsWriter {
}

impl oio::RangeWrite for GcsWriter {
async fn write_once(&self, size: u64, body: Buffer) -> Result<()> {
async fn write_once(&self, body: Buffer) -> Result<()> {
let size = body.len() as u64;
let mut req = self.core.gcs_insert_object_request(
&percent_encode_path(&self.path),
Some(size),
Expand Down Expand Up @@ -83,13 +84,8 @@ impl oio::RangeWrite for GcsWriter {
}
}

async fn write_range(
&self,
location: &str,
written: u64,
size: u64,
body: Buffer,
) -> Result<()> {
async fn write_range(&self, location: &str, written: u64, body: Buffer) -> Result<()> {
let size = body.len() as u64;
let mut req = self
.core
.gcs_upload_in_resumable_upload(location, size, written, body)?;
Expand All @@ -105,13 +101,8 @@ impl oio::RangeWrite for GcsWriter {
}
}

async fn complete_range(
&self,
location: &str,
written: u64,
size: u64,
body: Buffer,
) -> Result<()> {
async fn complete_range(&self, location: &str, written: u64, body: Buffer) -> Result<()> {
let size = body.len() as u64;
let resp = self
.core
.gcs_complete_resumable_upload(location, written, size, body)
Expand Down

0 comments on commit 343491d

Please sign in to comment.