Skip to content

Commit

Permalink
test(blocking): tests for blocking append (#3023)
Browse files Browse the repository at this point in the history
* test for blocking append & fix fs

Signed-off-by: suyanhanx <[email protected]>

* move path check to path build block

Signed-off-by: suyanhanx <[email protected]>

* path check when append enabled only

Signed-off-by: suyanhanx <[email protected]>

---------

Signed-off-by: suyanhanx <[email protected]>
  • Loading branch information
suyanhanx authored Sep 18, 2023
1 parent 82a865f commit 4b02228
Show file tree
Hide file tree
Showing 4 changed files with 262 additions and 8 deletions.
46 changes: 38 additions & 8 deletions core/src/services/fs/backend.rs
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,11 @@ impl FsBuilder {
}

/// Set temp dir for atomic write.
///
/// # Notes
///
/// - When append is enabled, we will not use atomic write
/// to avoid data loss and performance issue.
pub fn atomic_write_dir(&mut self, dir: &str) -> &mut Self {
self.atomic_write_dir = if dir.is_empty() {
None
Expand Down Expand Up @@ -367,15 +372,24 @@ impl Accessor for FsBackend {
let target_path = Self::ensure_write_abs_path(&self.root, path).await?;
let tmp_path =
Self::ensure_write_abs_path(atomic_write_dir, &tmp_file_of(path)).await?;
(target_path, Some(tmp_path))

// If the target file exists, we should append to the end of it directly.
if op.append()
&& tokio::fs::try_exists(&target_path)
.await
.map_err(parse_io_error)?
{
(target_path, None)
} else {
(target_path, Some(tmp_path))
}
} else {
let p = Self::ensure_write_abs_path(&self.root, path).await?;

(p, None)
};

let mut open_options = tokio::fs::OpenOptions::new();

open_options.create(true).write(true);
if op.append() {
open_options.append(true);
Expand Down Expand Up @@ -554,22 +568,38 @@ impl Accessor for FsBackend {
Ok((RpRead::new(end - start), r))
}

fn blocking_write(&self, path: &str, _: OpWrite) -> Result<(RpWrite, Self::BlockingWriter)> {
fn blocking_write(&self, path: &str, op: OpWrite) -> Result<(RpWrite, Self::BlockingWriter)> {
let (target_path, tmp_path) = if let Some(atomic_write_dir) = &self.atomic_write_dir {
let target_path = Self::blocking_ensure_write_abs_path(&self.root, path)?;
let tmp_path =
Self::blocking_ensure_write_abs_path(atomic_write_dir, &tmp_file_of(path))?;
(target_path, Some(tmp_path))

// If the target file exists, we should append to the end of it directly.
if op.append()
&& Path::new(&target_path)
.try_exists()
.map_err(parse_io_error)?
{
(target_path, None)
} else {
(target_path, Some(tmp_path))
}
} else {
let p = Self::blocking_ensure_write_abs_path(&self.root, path)?;

(p, None)
};

let f = std::fs::OpenOptions::new()
.create(true)
.truncate(true)
.write(true)
let mut f = std::fs::OpenOptions::new();
f.create(true).write(true);

if op.append() {
f.append(true);
} else {
f.truncate(true);
}

let f = f
.open(tmp_path.as_ref().unwrap_or(&target_path))
.map_err(parse_io_error)?;

Expand Down
1 change: 1 addition & 0 deletions core/src/services/fs/writer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@ impl<F> FsWriter<F> {
Self {
target_path,
tmp_path,

f: Some(f),
fut: None,
}
Expand Down
220 changes: 220 additions & 0 deletions core/tests/behavior/blocking_append.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,220 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

use std::vec;

use anyhow::Result;
use sha2::Digest;
use sha2::Sha256;
use std::io::BufReader;
use std::io::Cursor;

use crate::*;

pub fn behavior_blocking_append_tests(op: &Operator) -> Vec<Trial> {
let cap = op.info().full_capability();

if !(cap.read && cap.write && cap.blocking && cap.write_can_append) {
return vec![];
}

blocking_trials!(
op,
test_blocking_append_create_append,
test_blocking_append_with_dir_path,
test_blocking_append_with_cache_control,
test_blocking_append_with_content_type,
test_blocking_append_with_content_disposition,
test_blocking_appender_std_copy,
test_blocking_fuzz_appender
)
}

/// Test append to a file must success.
pub fn test_blocking_append_create_append(op: BlockingOperator) -> Result<()> {
let path = uuid::Uuid::new_v4().to_string();
let (content_one, size_one) = gen_bytes();
let (content_two, size_two) = gen_bytes();

op.write_with(&path, content_one.clone())
.append(true)
.call()
.expect("append file first time must success");

op.write_with(&path, content_two.clone())
.append(true)
.call()
.expect("append to an existing file must success");

let bs = op.read(&path).expect("read file must success");

assert_eq!(bs.len(), size_one + size_two);
assert_eq!(bs[..size_one], content_one);
assert_eq!(bs[size_one..], content_two);

op.delete(&path).expect("delete file must success");

Ok(())
}

/// Test append to a directory path must fail.
pub fn test_blocking_append_with_dir_path(op: BlockingOperator) -> Result<()> {
let path = format!("{}/", uuid::Uuid::new_v4());
let (content, _) = gen_bytes();

let res = op.write_with(&path, content).append(true).call();
assert!(res.is_err());
assert_eq!(res.unwrap_err().kind(), ErrorKind::IsADirectory);

Ok(())
}

/// Test append with cache control must success.
pub fn test_blocking_append_with_cache_control(op: BlockingOperator) -> Result<()> {
if !op.info().full_capability().write_with_cache_control {
return Ok(());
}

let path = uuid::Uuid::new_v4().to_string();
let (content, _) = gen_bytes();

let target_cache_control = "no-cache, no-store, max-age=300";
op.write_with(&path, content)
.append(true)
.cache_control(target_cache_control)
.call()?;

let meta = op.stat(&path).expect("stat must succeed");
assert_eq!(meta.mode(), EntryMode::FILE);
assert_eq!(
meta.cache_control().expect("cache control must exist"),
target_cache_control
);

op.delete(&path).expect("delete must succeed");

Ok(())
}

/// Test append with content type must success.
pub fn test_blocking_append_with_content_type(op: BlockingOperator) -> Result<()> {
if !op.info().full_capability().write_with_content_type {
return Ok(());
}

let path = uuid::Uuid::new_v4().to_string();
let (content, size) = gen_bytes();

let target_content_type = "application/json";
op.write_with(&path, content)
.append(true)
.content_type(target_content_type)
.call()?;

let meta = op.stat(&path).expect("stat must succeed");
assert_eq!(meta.mode(), EntryMode::FILE);
assert_eq!(
meta.content_type().expect("content type must exist"),
target_content_type
);
assert_eq!(meta.content_length(), size as u64);

op.delete(&path).expect("delete must succeed");

Ok(())
}

/// Write a single file with content disposition should succeed.
pub fn test_blocking_append_with_content_disposition(op: BlockingOperator) -> Result<()> {
if !op.info().full_capability().write_with_content_disposition {
return Ok(());
}

let path = uuid::Uuid::new_v4().to_string();
let (content, size) = gen_bytes();

let target_content_disposition = "attachment; filename=\"filename.jpg\"";
op.write_with(&path, content)
.append(true)
.content_disposition(target_content_disposition)
.call()?;

let meta = op.stat(&path).expect("stat must succeed");
assert_eq!(meta.mode(), EntryMode::FILE);
assert_eq!(
meta.content_disposition().expect("content type must exist"),
target_content_disposition
);
assert_eq!(meta.content_length(), size as u64);

op.delete(&path).expect("delete must succeed");

Ok(())
}

/// Copy data from reader to writer
pub fn test_blocking_appender_std_copy(op: BlockingOperator) -> Result<()> {
let path = uuid::Uuid::new_v4().to_string();
let (content, size): (Vec<u8>, usize) =
gen_bytes_with_range(10 * 1024 * 1024..20 * 1024 * 1024);

let mut a = op.writer_with(&path).append(true).call()?;

// Wrap a buf reader here to make sure content is read in 1MiB chunks.
let mut cursor = BufReader::with_capacity(1024 * 1024, Cursor::new(content.clone()));
std::io::copy(&mut cursor, &mut a)?;
a.close()?;

let meta = op.stat(&path).expect("stat must succeed");
assert_eq!(meta.content_length(), size as u64);

let bs = op.read(&path)?;
assert_eq!(bs.len(), size, "read size");
assert_eq!(
format!("{:x}", Sha256::digest(&bs[..size])),
format!("{:x}", Sha256::digest(content)),
"read content"
);

op.delete(&path).expect("delete must succeed");
Ok(())
}

/// Test for fuzzing appender.
pub fn test_blocking_fuzz_appender(op: BlockingOperator) -> Result<()> {
let path = uuid::Uuid::new_v4().to_string();

let mut fuzzer = ObjectWriterFuzzer::new(&path, None);

let mut a = op.writer_with(&path).append(true).call()?;

for _ in 0..100 {
match fuzzer.fuzz() {
ObjectWriterAction::Write(bs) => {
a.write(bs)?;
}
}
}
a.close()?;

let content = op.read(&path)?;
fuzzer.check(&content);

op.delete(&path).expect("delete file must success");

Ok(())
}
3 changes: 3 additions & 0 deletions core/tests/behavior/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -42,12 +42,14 @@ use rename::behavior_rename_tests;
use write::behavior_write_tests;

// Blocking test cases
mod blocking_append;
mod blocking_copy;
mod blocking_list;
mod blocking_read_only;
mod blocking_rename;
mod blocking_write;

use blocking_append::behavior_blocking_append_tests;
use blocking_copy::behavior_blocking_copy_tests;
use blocking_list::behavior_blocking_list_tests;
use blocking_read_only::behavior_blocking_read_only_tests;
Expand All @@ -73,6 +75,7 @@ fn behavior_test<B: Builder>() -> Vec<Trial> {

let mut trials = vec![];
// Blocking tests
trials.extend(behavior_blocking_append_tests(&operator));
trials.extend(behavior_blocking_copy_tests(&operator));
trials.extend(behavior_blocking_list_tests(&operator));
trials.extend(behavior_blocking_read_only_tests(&operator));
Expand Down

0 comments on commit 4b02228

Please sign in to comment.