From 4b02228f7fe9a7c0f21a1660fee95716910c7a0a Mon Sep 17 00:00:00 2001 From: Suyan Date: Mon, 18 Sep 2023 13:16:11 +0800 Subject: [PATCH] test(blocking): tests for blocking append (#3023) * test for blocking append & fix fs Signed-off-by: suyanhanx * move path check to path build block Signed-off-by: suyanhanx * path check when append enabled only Signed-off-by: suyanhanx --------- Signed-off-by: suyanhanx --- core/src/services/fs/backend.rs | 46 +++++- core/src/services/fs/writer.rs | 1 + core/tests/behavior/blocking_append.rs | 220 +++++++++++++++++++++++++ core/tests/behavior/main.rs | 3 + 4 files changed, 262 insertions(+), 8 deletions(-) create mode 100644 core/tests/behavior/blocking_append.rs diff --git a/core/src/services/fs/backend.rs b/core/src/services/fs/backend.rs index b1f1fdf49c19..a225958d2cea 100644 --- a/core/src/services/fs/backend.rs +++ b/core/src/services/fs/backend.rs @@ -55,6 +55,11 @@ impl FsBuilder { } /// Set temp dir for atomic write. + /// + /// # Notes + /// + /// - When append is enabled, we will not use atomic write + /// to avoid data loss and performance issue. pub fn atomic_write_dir(&mut self, dir: &str) -> &mut Self { self.atomic_write_dir = if dir.is_empty() { None @@ -367,7 +372,17 @@ impl Accessor for FsBackend { let target_path = Self::ensure_write_abs_path(&self.root, path).await?; let tmp_path = Self::ensure_write_abs_path(atomic_write_dir, &tmp_file_of(path)).await?; - (target_path, Some(tmp_path)) + + // If the target file exists, we should append to the end of it directly. + if op.append() + && tokio::fs::try_exists(&target_path) + .await + .map_err(parse_io_error)? 
+ { + (target_path, None) + } else { + (target_path, Some(tmp_path)) + } } else { let p = Self::ensure_write_abs_path(&self.root, path).await?; @@ -375,7 +390,6 @@ impl Accessor for FsBackend { }; let mut open_options = tokio::fs::OpenOptions::new(); - open_options.create(true).write(true); if op.append() { open_options.append(true); @@ -554,22 +568,38 @@ impl Accessor for FsBackend { Ok((RpRead::new(end - start), r)) } - fn blocking_write(&self, path: &str, _: OpWrite) -> Result<(RpWrite, Self::BlockingWriter)> { + fn blocking_write(&self, path: &str, op: OpWrite) -> Result<(RpWrite, Self::BlockingWriter)> { let (target_path, tmp_path) = if let Some(atomic_write_dir) = &self.atomic_write_dir { let target_path = Self::blocking_ensure_write_abs_path(&self.root, path)?; let tmp_path = Self::blocking_ensure_write_abs_path(atomic_write_dir, &tmp_file_of(path))?; - (target_path, Some(tmp_path)) + + // If the target file exists, we should append to the end of it directly. + if op.append() + && Path::new(&target_path) + .try_exists() + .map_err(parse_io_error)? 
+ { + (target_path, None) + } else { + (target_path, Some(tmp_path)) + } } else { let p = Self::blocking_ensure_write_abs_path(&self.root, path)?; (p, None) }; - let f = std::fs::OpenOptions::new() - .create(true) - .truncate(true) - .write(true) + let mut f = std::fs::OpenOptions::new(); + f.create(true).write(true); + + if op.append() { + f.append(true); + } else { + f.truncate(true); + } + + let f = f .open(tmp_path.as_ref().unwrap_or(&target_path)) .map_err(parse_io_error)?; diff --git a/core/src/services/fs/writer.rs b/core/src/services/fs/writer.rs index 5b1ae953ea87..d1283d4ca8c1 100644 --- a/core/src/services/fs/writer.rs +++ b/core/src/services/fs/writer.rs @@ -45,6 +45,7 @@ impl FsWriter { Self { target_path, tmp_path, + f: Some(f), fut: None, } diff --git a/core/tests/behavior/blocking_append.rs b/core/tests/behavior/blocking_append.rs new file mode 100644 index 000000000000..ccb89f786a63 --- /dev/null +++ b/core/tests/behavior/blocking_append.rs @@ -0,0 +1,220 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. 
+ +use std::vec; + +use anyhow::Result; +use sha2::Digest; +use sha2::Sha256; +use std::io::BufReader; +use std::io::Cursor; + +use crate::*; + +pub fn behavior_blocking_append_tests(op: &Operator) -> Vec<Trial> { + let cap = op.info().full_capability(); + + if !(cap.read && cap.write && cap.blocking && cap.write_can_append) { + return vec![]; + } + + blocking_trials!( + op, + test_blocking_append_create_append, + test_blocking_append_with_dir_path, + test_blocking_append_with_cache_control, + test_blocking_append_with_content_type, + test_blocking_append_with_content_disposition, + test_blocking_appender_std_copy, + test_blocking_fuzz_appender + ) +} + +/// Test append to a file must succeed. +pub fn test_blocking_append_create_append(op: BlockingOperator) -> Result<()> { + let path = uuid::Uuid::new_v4().to_string(); + let (content_one, size_one) = gen_bytes(); + let (content_two, size_two) = gen_bytes(); + + op.write_with(&path, content_one.clone()) + .append(true) + .call() + .expect("append file first time must success"); + + op.write_with(&path, content_two.clone()) + .append(true) + .call() + .expect("append to an existing file must success"); + + let bs = op.read(&path).expect("read file must success"); + + assert_eq!(bs.len(), size_one + size_two); + assert_eq!(bs[..size_one], content_one); + assert_eq!(bs[size_one..], content_two); + + op.delete(&path).expect("delete file must success"); + + Ok(()) +} + +/// Test append to a directory path must fail. +pub fn test_blocking_append_with_dir_path(op: BlockingOperator) -> Result<()> { + let path = format!("{}/", uuid::Uuid::new_v4()); + let (content, _) = gen_bytes(); + + let res = op.write_with(&path, content).append(true).call(); + assert!(res.is_err()); + assert_eq!(res.unwrap_err().kind(), ErrorKind::IsADirectory); + + Ok(()) +} + +/// Test append with cache control must succeed. 
+pub fn test_blocking_append_with_cache_control(op: BlockingOperator) -> Result<()> { + if !op.info().full_capability().write_with_cache_control { + return Ok(()); + } + + let path = uuid::Uuid::new_v4().to_string(); + let (content, _) = gen_bytes(); + + let target_cache_control = "no-cache, no-store, max-age=300"; + op.write_with(&path, content) + .append(true) + .cache_control(target_cache_control) + .call()?; + + let meta = op.stat(&path).expect("stat must succeed"); + assert_eq!(meta.mode(), EntryMode::FILE); + assert_eq!( + meta.cache_control().expect("cache control must exist"), + target_cache_control + ); + + op.delete(&path).expect("delete must succeed"); + + Ok(()) +} + +/// Test append with content type must succeed. +pub fn test_blocking_append_with_content_type(op: BlockingOperator) -> Result<()> { + if !op.info().full_capability().write_with_content_type { + return Ok(()); + } + + let path = uuid::Uuid::new_v4().to_string(); + let (content, size) = gen_bytes(); + + let target_content_type = "application/json"; + op.write_with(&path, content) + .append(true) + .content_type(target_content_type) + .call()?; + + let meta = op.stat(&path).expect("stat must succeed"); + assert_eq!(meta.mode(), EntryMode::FILE); + assert_eq!( + meta.content_type().expect("content type must exist"), + target_content_type + ); + assert_eq!(meta.content_length(), size as u64); + + op.delete(&path).expect("delete must succeed"); + + Ok(()) +} + +/// Test append with content disposition must succeed. 
+pub fn test_blocking_append_with_content_disposition(op: BlockingOperator) -> Result<()> { + if !op.info().full_capability().write_with_content_disposition { + return Ok(()); + } + + let path = uuid::Uuid::new_v4().to_string(); + let (content, size) = gen_bytes(); + + let target_content_disposition = "attachment; filename=\"filename.jpg\""; + op.write_with(&path, content) + .append(true) + .content_disposition(target_content_disposition) + .call()?; + + let meta = op.stat(&path).expect("stat must succeed"); + assert_eq!(meta.mode(), EntryMode::FILE); + assert_eq!( + meta.content_disposition().expect("content disposition must exist"), + target_content_disposition + ); + assert_eq!(meta.content_length(), size as u64); + + op.delete(&path).expect("delete must succeed"); + + Ok(()) +} + +/// Test that `std::io::copy` from a reader into an appender must succeed. +pub fn test_blocking_appender_std_copy(op: BlockingOperator) -> Result<()> { + let path = uuid::Uuid::new_v4().to_string(); + let (content, size): (Vec<u8>, usize) = + gen_bytes_with_range(10 * 1024 * 1024..20 * 1024 * 1024); + + let mut a = op.writer_with(&path).append(true).call()?; + + // Wrap a buf reader here to make sure content is read in 1MiB chunks. + let mut cursor = BufReader::with_capacity(1024 * 1024, Cursor::new(content.clone())); + std::io::copy(&mut cursor, &mut a)?; + a.close()?; + + let meta = op.stat(&path).expect("stat must succeed"); + assert_eq!(meta.content_length(), size as u64); + + let bs = op.read(&path)?; + assert_eq!(bs.len(), size, "read size"); + assert_eq!( + format!("{:x}", Sha256::digest(&bs[..size])), + format!("{:x}", Sha256::digest(content)), + "read content" + ); + + op.delete(&path).expect("delete must succeed"); + Ok(()) +} + +/// Test for fuzzing appender. 
+pub fn test_blocking_fuzz_appender(op: BlockingOperator) -> Result<()> { + let path = uuid::Uuid::new_v4().to_string(); + + let mut fuzzer = ObjectWriterFuzzer::new(&path, None); + + let mut a = op.writer_with(&path).append(true).call()?; + + for _ in 0..100 { + match fuzzer.fuzz() { + ObjectWriterAction::Write(bs) => { + a.write(bs)?; + } + } + } + a.close()?; + + let content = op.read(&path)?; + fuzzer.check(&content); + + op.delete(&path).expect("delete file must success"); + + Ok(()) +} diff --git a/core/tests/behavior/main.rs b/core/tests/behavior/main.rs index c46ba4245244..188da6365d3c 100644 --- a/core/tests/behavior/main.rs +++ b/core/tests/behavior/main.rs @@ -42,12 +42,14 @@ use rename::behavior_rename_tests; use write::behavior_write_tests; // Blocking test cases +mod blocking_append; mod blocking_copy; mod blocking_list; mod blocking_read_only; mod blocking_rename; mod blocking_write; +use blocking_append::behavior_blocking_append_tests; use blocking_copy::behavior_blocking_copy_tests; use blocking_list::behavior_blocking_list_tests; use blocking_read_only::behavior_blocking_read_only_tests; @@ -73,6 +75,7 @@ fn behavior_test() -> Vec<Trial> { let mut trials = vec![]; // Blocking tests + trials.extend(behavior_blocking_append_tests(&operator)); trials.extend(behavior_blocking_copy_tests(&operator)); trials.extend(behavior_blocking_list_tests(&operator)); trials.extend(behavior_blocking_read_only_tests(&operator));