Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

s3s-fs: fix incomplete uploads by writing via a temp file #116

Merged
merged 1 commit into the base branch on
Dec 6, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
81 changes: 79 additions & 2 deletions crates/s3s-fs/src/fs.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,10 +7,11 @@ use s3s::dto;
use std::env;
use std::ops::Not;
use std::path::{Path, PathBuf};
use std::sync::atomic::{AtomicU64, Ordering};

use tokio::fs;
use tokio::fs::File;
use tokio::io::{AsyncReadExt, AsyncWriteExt, BufWriter};

use md5::{Digest, Md5};
use path_absolutize::Absolutize;
Expand All @@ -19,14 +20,35 @@ use uuid::Uuid;
/// Filesystem-backed storage backend: buckets and objects live under `root`.
#[derive(Debug)]
pub struct FileSystem {
    // Absolute, canonicalized root directory (see `FileSystem::new`).
    pub(crate) root: PathBuf,
    // Monotonic per-process counter used to generate unique temp-file
    // names for atomic writes (see `prepare_file_write`).
    tmp_file_counter: AtomicU64,
}

/// JSON object map holding internal bookkeeping info for stored objects.
/// NOTE(review): usage sites are not visible in this chunk — confirm semantics against callers.
pub(crate) type InternalInfo = serde_json::Map<String, serde_json::Value>;

/// Removes temp files left behind by writes that never completed
/// (e.g. after a crash). A missing `root` directory is not an error.
fn clean_old_tmp_files(root: &Path) -> std::io::Result<()> {
    let entries = match std::fs::read_dir(root) {
        Ok(entries) => entries,
        Err(err) if err.kind() == std::io::ErrorKind::NotFound => return Ok(()),
        Err(err) => return Err(err),
    };
    for entry in entries {
        let path = entry?.path();
        // Matches the temp-file naming scheme `.tmp.{counter}.internal.part`
        // used when preparing atomic writes. Non-UTF-8 names are skipped.
        let is_tmp_file = path
            .file_name()
            .and_then(|name| name.to_str())
            .map_or(false, |name| name.starts_with(".tmp.") && name.ends_with(".internal.part"));
        if is_tmp_file {
            std::fs::remove_file(&path)?;
        }
    }
    Ok(())
}

impl FileSystem {
/// Creates a backend rooted at `root` (resolved to an absolute,
/// canonical path) and removes temp files left over from previous runs.
pub fn new(root: impl AsRef<Path>) -> Result<Self> {
    let root = env::current_dir()?.join(root).canonicalize()?;
    // Best to clean stale `.tmp.*.internal.part` files before serving requests.
    clean_old_tmp_files(&root)?;
    Ok(Self {
        root,
        tmp_file_counter: AtomicU64::new(0),
    })
}

pub(crate) fn resolve_abs_path(&self, path: impl AsRef<Path>) -> Result<PathBuf> {
Expand Down Expand Up @@ -146,4 +168,59 @@ impl FileSystem {
}
Ok(())
}

/// Begins an atomic write for `bucket`/`key`.
///
/// Bytes are first streamed into a uniquely named temporary file under the
/// root; calling [`FileWriter::done`] renames it into its final location.
pub(crate) async fn prepare_file_write(&self, bucket: &str, key: &str) -> Result<FileWriter> {
    let final_path = self.get_object_path(bucket, key)?;
    // Unique per-process name; `clean_old_tmp_files` relies on this pattern.
    let counter = self.tmp_file_counter.fetch_add(1, Ordering::SeqCst);
    let tmp_path = self.resolve_abs_path(format!(".tmp.{counter}.internal.part"))?;
    let file = File::create(&tmp_path).await?;
    Ok(FileWriter {
        tmp_path,
        final_path: Some(final_path),
        writer: BufWriter::new(file),
        clean_tmp: true,
    })
}
}

/// An in-progress atomic file write: bytes go to `tmp_path` and are
/// renamed to the final destination when the write is finalized.
pub(crate) struct FileWriter {
    // Temporary file currently receiving the data.
    tmp_path: PathBuf,
    // Destination path; taken (becomes `None`) when the write is finalized.
    final_path: Option<PathBuf>,
    // Buffered writer over the temp file.
    writer: BufWriter<File>,
    // When true, `Drop` deletes the temp file — i.e. the write was
    // never successfully finalized.
    clean_tmp: bool,
}

impl FileWriter {
    /// Path of the temporary file currently receiving data.
    pub(crate) fn tmp_path(&self) -> &Path {
        &self.tmp_path
    }

    /// Final destination path.
    ///
    /// # Panics
    /// Panics if the path has already been taken; since `done` consumes
    /// `self`, this cannot happen from outside this type.
    pub(crate) fn final_path(&self) -> &Path {
        self.final_path.as_ref().unwrap()
    }

    /// Buffered writer over the temp file; callers stream object bytes here.
    pub(crate) fn writer(&mut self) -> &mut BufWriter<File> {
        &mut self.writer
    }

    /// Finalizes the write: flushes buffered bytes, ensures the destination
    /// directory exists, then renames the temp file into place.
    /// Returns the final path.
    pub(crate) async fn done(mut self) -> Result<PathBuf> {
        // tokio's `BufWriter` does NOT flush its buffer on drop; without an
        // explicit flush, trailing buffered bytes could be lost before the
        // rename, producing a truncated object.
        self.writer.flush().await?;

        if let Some(final_dir_path) = self.final_path().parent() {
            fs::create_dir_all(final_dir_path).await?;
        }

        fs::rename(&self.tmp_path, self.final_path()).await?;
        // The rename consumed the temp path; nothing left for Drop to clean.
        self.clean_tmp = false;
        Ok(self.final_path.take().unwrap())
    }
}

impl Drop for FileWriter {
    /// Best-effort cleanup: if the write was never finalized, delete the
    /// temp file so it does not accumulate on disk. Errors are ignored —
    /// leftover files are also swept at startup.
    fn drop(&mut self) {
        if !self.clean_tmp {
            return;
        }
        let _ = std::fs::remove_file(&self.tmp_path);
    }
}
20 changes: 7 additions & 13 deletions crates/s3s-fs/src/s3.rs
Original file line number Diff line number Diff line change
Expand Up @@ -466,21 +466,17 @@ impl S3 for FileSystem {
return Ok(S3Response::new(output));
}

let object_path = self.get_object_path(&bucket, &key)?;
if let Some(dir_path) = object_path.parent() {
try_!(fs::create_dir_all(&dir_path).await);
}
let mut file_writer = self.prepare_file_write(&bucket, &key).await?;

let mut md5_hash = Md5::new();
let stream = body.inspect_ok(|bytes| {
md5_hash.update(bytes.as_ref());
checksum.update(bytes.as_ref());
});

let file = try_!(fs::File::create(&object_path).await);
let mut writer = BufWriter::new(file);
let size = copy_bytes(stream, file_writer.writer()).await?;
let object_path = file_writer.done().await?;

let size = copy_bytes(stream, &mut writer).await?;
let md5_sum = hex(md5_hash.finalize());

let checksum = checksum.finalize();
Expand Down Expand Up @@ -711,9 +707,7 @@ impl S3 for FileSystem {

self.delete_upload_id(&upload_id).await?;

let object_path = self.get_object_path(&bucket, &key)?;
let file = try_!(fs::File::create(&object_path).await);
let mut writer = BufWriter::new(file);
let mut file_writer = self.prepare_file_write(&bucket, &key).await?;

let mut cnt: i32 = 0;
for part in multipart_upload.parts.into_iter().flatten() {
Expand All @@ -726,12 +720,12 @@ impl S3 for FileSystem {
let part_path = self.resolve_abs_path(format!(".upload_id-{upload_id}.part-{part_number}"))?;

let mut reader = try_!(fs::File::open(&part_path).await);
let size = try_!(tokio::io::copy(&mut reader, &mut writer).await);
let size = try_!(tokio::io::copy(&mut reader, &mut file_writer.writer()).await);

debug!(from = %part_path.display(), to = %object_path.display(), ?size, "write file");
debug!(from = %part_path.display(), tmp = %file_writer.tmp_path().display(), to = %file_writer.final_path().display(), ?size, "write file");
try_!(fs::remove_file(&part_path).await);
}
drop(writer);
let object_path = file_writer.done().await?;

let file_size = try_!(fs::metadata(&object_path).await).len();
let md5_sum = self.get_md5_sum(&bucket, &key).await?;
Expand Down