Skip to content

Commit

Permalink
fix(services/fs,hdfs): fix poll_close when retry
Browse files Browse the repository at this point in the history
  • Loading branch information
hoslo committed Feb 4, 2024
1 parent 0f316b4 commit 4fcba9d
Show file tree
Hide file tree
Showing 2 changed files with 53 additions and 20 deletions.
51 changes: 37 additions & 14 deletions core/src/services/fs/writer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ pub struct FsWriter<F> {
tmp_path: Option<PathBuf>,

f: Option<F>,
fut: Option<BoxFuture<'static, Result<()>>>,
fut: Option<BoxFuture<'static, (F, Result<()>)>>,
}

impl<F> FsWriter<F> {
Expand Down Expand Up @@ -69,23 +69,35 @@ impl oio::Write for FsWriter<tokio::fs::File> {
if let Some(fut) = self.fut.as_mut() {
let res = ready!(fut.poll_unpin(cx));
self.fut = None;
return Poll::Ready(res);
if let Err(e) = res.1 {
self.f = Some(res.0);
return Poll::Ready(Err(e));
}
return Poll::Ready(Ok(()));
}

let mut f = self.f.take().expect("FsWriter must be initialized");
let tmp_path = self.tmp_path.clone();
let target_path = self.target_path.clone();
self.fut = Some(Box::pin(async move {
f.flush().await.map_err(new_std_io_error)?;
f.sync_all().await.map_err(new_std_io_error)?;
if let Err(e) = f.flush().await.map_err(new_std_io_error) {
// Reserve the original error for retry.
return (f, Err(e));
}
if let Err(e) = f.sync_all().await.map_err(new_std_io_error) {
return (f, Err(e));
}

if let Some(tmp_path) = &tmp_path {
tokio::fs::rename(tmp_path, &target_path)
if let Err(e) = tokio::fs::rename(tmp_path, &target_path)
.await
.map_err(new_std_io_error)?;
.map_err(new_std_io_error)
{
return (f, Err(e));
}
}

Ok(())
(f, Ok(()))
}));
}
}
Expand All @@ -95,21 +107,32 @@ impl oio::Write for FsWriter<tokio::fs::File> {
if let Some(fut) = self.fut.as_mut() {
let res = ready!(fut.poll_unpin(cx));
self.fut = None;
return Poll::Ready(res);
if let Err(e) = res.1 {
self.f = Some(res.0);
return Poll::Ready(Err(e));
}
return Poll::Ready(Ok(()));
}

let _ = self.f.take().expect("FsWriter must be initialized");
let f = self.f.take().expect("FsWriter must be initialized");
let tmp_path = self.tmp_path.clone();
self.fut = Some(Box::pin(async move {
if let Some(tmp_path) = &tmp_path {
tokio::fs::remove_file(tmp_path)
if let Err(e) = tokio::fs::remove_file(tmp_path)
.await
.map_err(new_std_io_error)
{
return (f, Err(e));
}
(f, Ok(()))
} else {
Err(Error::new(
ErrorKind::Unsupported,
"Fs doesn't support abort if atomic_write_dir is not set",
))
(
f,
Err(Error::new(
ErrorKind::Unsupported,
"Fs doesn't support abort if atomic_write_dir is not set",
)),
)
}
}));
}
Expand Down
22 changes: 16 additions & 6 deletions core/src/services/hdfs/writer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ pub struct HdfsWriter<F> {
tmp_path: Option<String>,
f: Option<F>,
client: Arc<hdrs::Client>,
fut: Option<BoxFuture<'static, Result<()>>>,
fut: Option<BoxFuture<'static, (F, Result<()>)>>,
}

/// # Safety
Expand Down Expand Up @@ -76,7 +76,11 @@ impl oio::Write for HdfsWriter<hdrs::AsyncFile> {
if let Some(fut) = self.fut.as_mut() {
let res = ready!(fut.poll_unpin(cx));
self.fut = None;
return Poll::Ready(res);
if let Err(e) = res.1 {
self.f = Some(res.0);
return Poll::Ready(Err(e));
}
return Poll::Ready(Ok(()));
}

let mut f = self.f.take().expect("HdfsWriter must be initialized");
Expand All @@ -86,15 +90,21 @@ impl oio::Write for HdfsWriter<hdrs::AsyncFile> {
let client = self.client.clone();

self.fut = Some(Box::pin(async move {
f.close().await.map_err(new_std_io_error)?;
if let Err(e) = f.close().await.map_err(new_std_io_error) {
// Reserve the original error for retry.
return (f, Err(e));
}

if let Some(tmp_path) = tmp_path {
client
if let Err(e) = client
.rename_file(&tmp_path, &target_path)
.map_err(new_std_io_error)?;
.map_err(new_std_io_error)
{
return (f, Err(e));
}
}

Ok(())
(f, Ok(()))
}));
}
}
Expand Down

0 comments on commit 4fcba9d

Please sign in to comment.