From 4fcba9dddf66f5d36db30016999a89c2870216e3 Mon Sep 17 00:00:00 2001 From: hoslo Date: Sun, 4 Feb 2024 10:12:49 +0800 Subject: [PATCH] fix(services/fs,hdfs): fix poll_close when retry --- core/src/services/fs/writer.rs | 51 +++++++++++++++++++++++--------- core/src/services/hdfs/writer.rs | 22 ++++++++++---- 2 files changed, 53 insertions(+), 20 deletions(-) diff --git a/core/src/services/fs/writer.rs b/core/src/services/fs/writer.rs index 12e5a4fffe77..a8ebb4fd3c39 100644 --- a/core/src/services/fs/writer.rs +++ b/core/src/services/fs/writer.rs @@ -35,7 +35,7 @@ pub struct FsWriter { tmp_path: Option, f: Option, - fut: Option>>, + fut: Option)>>, } impl FsWriter { @@ -69,23 +69,35 @@ impl oio::Write for FsWriter { if let Some(fut) = self.fut.as_mut() { let res = ready!(fut.poll_unpin(cx)); self.fut = None; - return Poll::Ready(res); + if let Err(e) = res.1 { + self.f = Some(res.0); + return Poll::Ready(Err(e)); + } + return Poll::Ready(Ok(())); } let mut f = self.f.take().expect("FsWriter must be initialized"); let tmp_path = self.tmp_path.clone(); let target_path = self.target_path.clone(); self.fut = Some(Box::pin(async move { - f.flush().await.map_err(new_std_io_error)?; - f.sync_all().await.map_err(new_std_io_error)?; + if let Err(e) = f.flush().await.map_err(new_std_io_error) { + // Reserve the original error for retry. + return (f, Err(e)); + } + if let Err(e) = f.sync_all().await.map_err(new_std_io_error) { + return (f, Err(e)); + } if let Some(tmp_path) = &tmp_path { - tokio::fs::rename(tmp_path, &target_path) + if let Err(e) = tokio::fs::rename(tmp_path, &target_path) .await - .map_err(new_std_io_error)?; + .map_err(new_std_io_error) + { + return (f, Err(e)); + } } - Ok(()) + (f, Ok(())) })); } } @@ -95,21 +107,32 @@ impl oio::Write for FsWriter { if let Some(fut) = self.fut.as_mut() { let res = ready!(fut.poll_unpin(cx)); self.fut = None; - return Poll::Ready(res); + if let Err(e) = res.1 { + self.f = Some(res.0); + return Poll::Ready(Err(e)); + } + return Poll::Ready(Ok(())); } - let _ = self.f.take().expect("FsWriter must be initialized"); + let f = self.f.take().expect("FsWriter must be initialized"); let tmp_path = self.tmp_path.clone(); self.fut = Some(Box::pin(async move { if let Some(tmp_path) = &tmp_path { - tokio::fs::remove_file(tmp_path) + if let Err(e) = tokio::fs::remove_file(tmp_path) .await .map_err(new_std_io_error) + { + return (f, Err(e)); + } + (f, Ok(())) } else { - Err(Error::new( - ErrorKind::Unsupported, - "Fs doesn't support abort if atomic_write_dir is not set", - )) + ( + f, + Err(Error::new( + ErrorKind::Unsupported, + "Fs doesn't support abort if atomic_write_dir is not set", + )), + ) } })); } diff --git a/core/src/services/hdfs/writer.rs b/core/src/services/hdfs/writer.rs index 6c77097d842f..535e97c38b5c 100644 --- a/core/src/services/hdfs/writer.rs +++ b/core/src/services/hdfs/writer.rs @@ -36,7 +36,7 @@ pub struct HdfsWriter { tmp_path: Option, f: Option, client: Arc, - fut: Option>>, + fut: Option)>>, } /// # Safety @@ -76,7 +76,11 @@ impl oio::Write for HdfsWriter { if let Some(fut) = self.fut.as_mut() { let res = ready!(fut.poll_unpin(cx)); self.fut = None; - return Poll::Ready(res); + if let Err(e) = res.1 { + self.f = Some(res.0); + return Poll::Ready(Err(e)); + } + return Poll::Ready(Ok(())); } let mut f = self.f.take().expect("HdfsWriter must be initialized"); @@ -86,15 +90,21 @@ impl oio::Write for HdfsWriter { let client = self.client.clone(); self.fut = Some(Box::pin(async move { - f.close().await.map_err(new_std_io_error)?; + if let Err(e) = f.close().await.map_err(new_std_io_error) { + // Reserve the original error for retry. + return (f, Err(e)); + } if let Some(tmp_path) = tmp_path { - client + if let Err(e) = client .rename_file(&tmp_path, &target_path) - .map_err(new_std_io_error)?; + .map_err(new_std_io_error) + { + return (f, Err(e)); + } } - Ok(()) + (f, Ok(())) })); } }