Skip to content

Commit

Permalink
Fix ObjectStore.LocalFileSystem.put_opts for blobfuse (#5094)
Browse files Browse the repository at this point in the history
* Fix ObjectStore.LocalFileSystem.put_opts for blobfuse

* Fix ObjectStore.LocalFileSystem.put_opts for blobfuse

* fix comment

* fix race condition

* add comment
  • Loading branch information
RobinLin666 authored Nov 27, 2023
1 parent 409bb81 commit d5a6cf4
Showing 1 changed file with 32 additions and 24 deletions.
56 changes: 32 additions & 24 deletions object_store/src/local.rs
Original file line number Diff line number Diff line change
Expand Up @@ -338,28 +338,41 @@ impl ObjectStore for LocalFileSystem {
maybe_spawn_blocking(move || {
let (mut file, suffix) = new_staged_upload(&path)?;
let staging_path = staged_upload_path(&path, &suffix);
let mut e_tag = None;

let err = match file.write_all(&bytes) {
Ok(_) => match opts.mode {
PutMode::Overwrite => match std::fs::rename(&staging_path, &path) {
Ok(_) => None,
Err(source) => Some(Error::UnableToRenameFile { source }),
},
PutMode::Create => match std::fs::hard_link(&staging_path, &path) {
Ok(_) => {
let _ = std::fs::remove_file(&staging_path); // Attempt to cleanup
None
Ok(_) => {
let metadata = file.metadata().map_err(|e| Error::Metadata {
source: e.into(),
path: path.to_string_lossy().to_string(),
})?;
e_tag = Some(get_etag(&metadata));
match opts.mode {
PutMode::Overwrite => {
// For some fuse types of file systems, the file must be closed first
// to trigger the upload operation, and then renamed, such as Blobfuse
std::mem::drop(file);
match std::fs::rename(&staging_path, &path) {
Ok(_) => None,
Err(source) => Some(Error::UnableToRenameFile { source }),
}
}
Err(source) => match source.kind() {
ErrorKind::AlreadyExists => Some(Error::AlreadyExists {
path: path.to_str().unwrap().to_string(),
source,
}),
_ => Some(Error::UnableToRenameFile { source }),
PutMode::Create => match std::fs::hard_link(&staging_path, &path) {
Ok(_) => {
let _ = std::fs::remove_file(&staging_path); // Attempt to cleanup
None
}
Err(source) => match source.kind() {
ErrorKind::AlreadyExists => Some(Error::AlreadyExists {
path: path.to_str().unwrap().to_string(),
source,
}),
_ => Some(Error::UnableToRenameFile { source }),
},
},
},
PutMode::Update(_) => unreachable!(),
},
PutMode::Update(_) => unreachable!(),
}
}
Err(source) => Some(Error::UnableToCopyDataToFile { source }),
};

Expand All @@ -368,13 +381,8 @@ impl ObjectStore for LocalFileSystem {
return Err(err.into());
}

let metadata = file.metadata().map_err(|e| Error::Metadata {
source: e.into(),
path: path.to_string_lossy().to_string(),
})?;

Ok(PutResult {
e_tag: Some(get_etag(&metadata)),
e_tag,
version: None,
})
})
Expand Down

0 comments on commit d5a6cf4

Please sign in to comment.