diff --git a/.env.example b/.env.example index 849e09786bef..448f94ce194c 100644 --- a/.env.example +++ b/.env.example @@ -77,6 +77,7 @@ OPENDAL_WEBDAV_ENDPOINT=http://127.0.0.1:8080 OPENDAL_WEBHDFS_ROOT=/tmp/opendal/ OPENDAL_WEBHDFS_ENDPOINT=http://127.0.0.1:9870 OPENDAL_WEBHDFS_DELEGATION= +OPENDAL_WEBHDFS_ATOMIC_WRITE_DIR=.opendal_tmp/ OPENDAL_WEBHDFS_DISABLE_LIST_BATCH=false # supbase OPENDAL_SUPABASE_BUCKET= diff --git a/.github/services/webhdfs/webhdfs/action.yml b/.github/services/webhdfs/webhdfs/action.yml index ae9cdfa73c93..1a748bbebb5b 100644 --- a/.github/services/webhdfs/webhdfs/action.yml +++ b/.github/services/webhdfs/webhdfs/action.yml @@ -32,4 +32,5 @@ runs: cat << EOF >> $GITHUB_ENV OPENDAL_WEBHDFS_ROOT=/ OPENDAL_WEBHDFS_ENDPOINT=http://127.0.0.1:9870 + OPENDAL_WEBHDFS_ATOMIC_WRITE_DIR=.opendal_tmp/ EOF diff --git a/.github/services/webhdfs/webhdfs_with_list_batch_disabled/action.yml b/.github/services/webhdfs/webhdfs_with_list_batch_disabled/action.yml index 7fb294b045e5..b27ab6a6e426 100644 --- a/.github/services/webhdfs/webhdfs_with_list_batch_disabled/action.yml +++ b/.github/services/webhdfs/webhdfs_with_list_batch_disabled/action.yml @@ -32,5 +32,6 @@ runs: cat << EOF >> $GITHUB_ENV OPENDAL_WEBHDFS_ROOT=/ OPENDAL_WEBHDFS_ENDPOINT=http://127.0.0.1:9870 + OPENDAL_WEBHDFS_ATOMIC_WRITE_DIR=.opendal_tmp/ OPENDAL_WEBHDFS_DISABLE_LIST_BATCH=true EOF diff --git a/core/src/raw/oio/write/block_write.rs b/core/src/raw/oio/write/block_write.rs index 8665d50c00e8..56db176f3262 100644 --- a/core/src/raw/oio/write/block_write.rs +++ b/core/src/raw/oio/write/block_write.rs @@ -170,8 +170,8 @@ where }))); let size = self.fill_cache(bs); return Poll::Ready(Ok(size)); - } else { - ready!(self.futures.poll_next_unpin(cx)); + } else if let Some(res) = ready!(self.futures.poll_next_unpin(cx)) { + res?; } } State::Close(_) => { @@ -209,8 +209,7 @@ where })); } } - } - if self.futures.is_empty() && self.cache.is_none() { + } else if self.futures.is_empty() && self.cache.is_none() { self.state = State::Close(Box::pin( async move { w.complete_block(block_ids).await }, @@ -232,7 +231,9 @@ where }))); } } - while ready!(self.futures.poll_next_unpin(cx)).is_some() {} + while let Some(res) = ready!(self.futures.poll_next_unpin(cx)) { + res?; + } } } State::Close(fut) => { diff --git a/core/src/services/webhdfs/backend.rs b/core/src/services/webhdfs/backend.rs index ba804db99596..80b9e3e72ed1 100644 --- a/core/src/services/webhdfs/backend.rs +++ b/core/src/services/webhdfs/backend.rs @@ -49,6 +49,8 @@ pub struct WebhdfsBuilder { endpoint: Option, delegation: Option, disable_list_batch: bool, + /// atomic_write_dir of this backend + pub atomic_write_dir: Option, } impl Debug for WebhdfsBuilder { @@ -56,6 +58,7 @@ impl Debug for WebhdfsBuilder { f.debug_struct("Builder") .field("root", &self.root) .field("endpoint", &self.endpoint) + .field("atomic_write_dir", &self.atomic_write_dir) .finish_non_exhaustive() } } @@ -117,6 +120,20 @@ impl WebhdfsBuilder { self.disable_list_batch = true; self } + + /// Set temp dir for atomic write. + /// + /// # Notes + /// + /// If not set, write multi not support, eg: `.opendal_tmp/`. + pub fn atomic_write_dir(&mut self, dir: &str) -> &mut Self { + self.atomic_write_dir = if dir.is_empty() { + None + } else { + Some(String::from(dir)) + }; + self + } } impl Builder for WebhdfsBuilder { @@ -132,6 +149,8 @@ impl Builder for WebhdfsBuilder { map.get("disable_list_batch") .filter(|v| v == &"true") .map(|_| builder.disable_list_batch()); + map.get("atomic_write_dir") + .map(|v| builder.atomic_write_dir(v)); builder } @@ -162,6 +181,8 @@ impl Builder for WebhdfsBuilder { }; debug!("backend use endpoint {}", endpoint); + let atomic_write_dir = self.atomic_write_dir.take(); + let auth = self .delegation .take() @@ -175,6 +196,7 @@ impl Builder for WebhdfsBuilder { auth, client, root_checker: OnceCell::new(), + atomic_write_dir, disable_list_batch: self.disable_list_batch, }; @@ -190,52 +212,78 @@ pub struct WebhdfsBackend { auth: Option, root_checker: OnceCell<()>, + pub atomic_write_dir: Option, pub disable_list_batch: bool, pub client: HttpClient, } impl WebhdfsBackend { - /// create object or make a directory - /// - /// TODO: we should split it into mkdir and create - pub fn webhdfs_create_object_request( + pub fn webhdfs_create_dir_request(&self, path: &str) -> Result> { + let p = build_abs_path(&self.root, path); + + let mut url = format!( + "{}/webhdfs/v1/{}?op=MKDIRS&overwrite=true&noredirect=true", + self.endpoint, + percent_encode_path(&p), + ); + if let Some(auth) = &self.auth { + url += format!("&{auth}").as_str(); + } + + let req = Request::put(&url); + + req.body(AsyncBody::Empty).map_err(new_request_build_error) + } + /// create object + pub async fn webhdfs_create_object_request( &self, path: &str, - size: Option, + size: Option, args: &OpWrite, body: AsyncBody, ) -> Result> { let p = build_abs_path(&self.root, path); - let op = if path.ends_with('/') { - "MKDIRS" - } else { - "CREATE" - }; + let mut url = format!( - "{}/webhdfs/v1/{}?op={}&overwrite=true", + "{}/webhdfs/v1/{}?op=CREATE&overwrite=true&noredirect=true", self.endpoint, percent_encode_path(&p), - op, ); if let Some(auth) = &self.auth { url += format!("&{auth}").as_str(); } - let mut req = Request::put(&url); + let req = Request::put(&url); + + let req = req + .body(AsyncBody::Empty) + .map_err(new_request_build_error)?; - // mkdir does not redirect - if path.ends_with('/') { - return req.body(AsyncBody::Empty).map_err(new_request_build_error); + let resp = self.client.send(req).await?; + + let status = resp.status(); + + if status != StatusCode::CREATED && status != StatusCode::OK { + return Err(parse_error(resp).await?); } + let bs = resp.into_body().bytes().await?; + + let resp = + serde_json::from_slice::(&bs).map_err(new_json_deserialize_error)?; + + let mut req = Request::put(&resp.location); + if let Some(size) = size { - req = req.header(CONTENT_LENGTH, size.to_string()); - } + req = req.header(CONTENT_LENGTH, size); + }; + if let Some(content_type) = args.content_type() { req = req.header(CONTENT_TYPE, content_type); - } + }; + let req = req.body(body).map_err(new_request_build_error)?; - req.body(body).map_err(new_request_build_error) + Ok(req) } pub async fn webhdfs_init_append_request(&self, path: &str) -> Result { @@ -260,7 +308,7 @@ impl WebhdfsBackend { match status { StatusCode::OK => { let bs = resp.into_body().bytes().await?; - let resp: InitAppendResponse = + let resp: LocationResponse = serde_json::from_slice(&bs).map_err(new_json_deserialize_error)?; Ok(resp.location) @@ -269,6 +317,32 @@ impl WebhdfsBackend { } } + pub async fn webhdfs_rename_object( + &self, + from: &str, + to: &str, + ) -> Result> { + let from = build_abs_path(&self.root, from); + let to = build_rooted_abs_path(&self.root, to); + + let mut url = format!( + "{}/webhdfs/v1/{}?op=RENAME&destination={}", + self.endpoint, + percent_encode_path(&from), + percent_encode_path(&to) + ); + + if let Some(auth) = &self.auth { + url += &format!("&{auth}"); + } + + let req = Request::put(&url) + .body(AsyncBody::Empty) + .map_err(new_request_build_error)?; + + self.client.send(req).await + } + pub async fn webhdfs_append_request( &self, location: &str, @@ -288,6 +362,36 @@ impl WebhdfsBackend { req.body(body).map_err(new_request_build_error) } + /// CONCAT will concat sources to the path + pub fn webhdfs_concat_request( + &self, + path: &str, + sources: Vec, + ) -> Result> { + let p = build_abs_path(&self.root, path); + + let sources = sources + .iter() + .map(|p| build_rooted_abs_path(&self.root, p)) + .collect::>() + .join(","); + + let mut url = format!( + "{}/webhdfs/v1/{}?op=CONCAT&sources={}", + self.endpoint, + percent_encode_path(&p), + percent_encode_path(&sources), + ); + + if let Some(auth) = &self.auth { + url += &format!("&{auth}"); + } + + let req = Request::post(url); + + req.body(AsyncBody::Empty).map_err(new_request_build_error) + } + async fn webhdfs_open_request( &self, path: &str, @@ -403,7 +507,7 @@ impl WebhdfsBackend { self.client.send(req).await } - async fn webhdfs_delete(&self, path: &str) -> Result> { + pub async fn webhdfs_delete(&self, path: &str) -> Result> { let p = build_abs_path(&self.root, path); let mut url = format!( "{}/webhdfs/v1/{}?op=DELETE&recursive=false", @@ -469,6 +573,8 @@ impl Accessor for WebhdfsBackend { write: true, write_can_append: true, + write_can_multi: self.atomic_write_dir.is_some(), + create_dir: true, delete: true, @@ -481,12 +587,7 @@ impl Accessor for WebhdfsBackend { /// Create a file or directory async fn create_dir(&self, path: &str, _: OpCreateDir) -> Result { - let req = self.webhdfs_create_object_request( - path, - Some(0), - &OpWrite::default(), - AsyncBody::Empty, - )?; + let req = self.webhdfs_create_dir_request(path)?; let resp = self.client.send(req).await?; @@ -585,7 +686,7 @@ impl Accessor for WebhdfsBackend { let w = if args.append() { WebhdfsWriters::Two(oio::AppendObjectWriter::new(w)) } else { - WebhdfsWriters::One(oio::OneShotWriter::new(w)) + WebhdfsWriters::One(oio::BlockWriter::new(w, args.concurrent())) }; Ok((RpWrite::default(), w)) @@ -619,6 +720,6 @@ impl Accessor for WebhdfsBackend { #[derive(Debug, Deserialize)] #[serde(rename_all = "PascalCase")] -pub(super) struct InitAppendResponse { +pub(super) struct LocationResponse { pub location: String, } diff --git a/core/src/services/webhdfs/docs.md b/core/src/services/webhdfs/docs.md index 314b81cf97fa..497c46a9dc7c 100644 --- a/core/src/services/webhdfs/docs.md +++ b/core/src/services/webhdfs/docs.md @@ -28,6 +28,7 @@ This service can be used to: - `root`: The root path of the WebHDFS service. - `endpoint`: The endpoint of the WebHDFS service. - `delegation`: The delegation token for WebHDFS. +- `atomic_write_dir`: The tmp write dir of multi write for WebHDFS. Refer to [`Builder`]'s public API docs for more information. @@ -58,6 +59,8 @@ async fn main() -> Result<()> { builder.endpoint("http://127.0.0.1:9870"); // set the delegation_token for builder builder.delegation("delegation_token"); + // set atomic_write_dir for builder + builder.atomic_write_dir(".opendal_tmp/"); let op: Operator = Operator::new(builder)?.finish(); diff --git a/core/src/services/webhdfs/writer.rs b/core/src/services/webhdfs/writer.rs index 296f3edb1063..52a955b1aed6 100644 --- a/core/src/services/webhdfs/writer.rs +++ b/core/src/services/webhdfs/writer.rs @@ -20,12 +20,11 @@ use http::StatusCode; use super::backend::WebhdfsBackend; use super::error::parse_error; -use crate::raw::oio::WriteBuf; use crate::raw::*; use crate::*; pub type WebhdfsWriters = - TwoWays, oio::AppendObjectWriter>; + TwoWays, oio::AppendObjectWriter>; pub struct WebhdfsWriter { backend: WebhdfsBackend, @@ -41,17 +40,41 @@ impl WebhdfsWriter { } #[async_trait] -impl oio::OneShotWrite for WebhdfsWriter { - /// Using `bytes` instead of `vectored_bytes` to allow request to be redirected. - async fn write_once(&self, bs: &dyn WriteBuf) -> Result<()> { - let bs = bs.bytes(bs.remaining()); - - let req = self.backend.webhdfs_create_object_request( - &self.path, - Some(bs.len()), - &self.op, - AsyncBody::Bytes(bs), - )?; +impl oio::BlockWrite for WebhdfsWriter { + async fn write_once(&self, size: u64, body: AsyncBody) -> Result<()> { + let req = self + .backend + .webhdfs_create_object_request(&self.path, Some(size), &self.op, body) + .await?; + + let resp = self.backend.client.send(req).await?; + + let status = resp.status(); + match status { + StatusCode::CREATED | StatusCode::OK => { + resp.into_body().consume().await?; + Ok(()) + } + _ => Err(parse_error(resp).await?), + } + } + + async fn write_block(&self, size: u64, block_id: String, body: AsyncBody) -> Result<()> { + let Some(ref atomic_write_dir) = self.backend.atomic_write_dir else { + return Err(Error::new( + ErrorKind::Unsupported, + "write multi is not supported when atomic is not set", + )); + }; + let req = self + .backend + .webhdfs_create_object_request( + &format!("{}{}", atomic_write_dir, block_id), + Some(size), + &self.op, + body, + ) + .await?; let resp = self.backend.client.send(req).await?; @@ -64,6 +87,69 @@ impl oio::OneShotWrite for WebhdfsWriter { _ => Err(parse_error(resp).await?), } } + + async fn complete_block(&self, block_ids: Vec) -> Result<()> { + let Some(ref atomic_write_dir) = self.backend.atomic_write_dir else { + return Err(Error::new( + ErrorKind::Unsupported, + "write multi is not supported when atomic is not set", + )); + }; + let first_block_id = format!("{}{}", atomic_write_dir, block_ids[0].clone()); + if block_ids.len() >= 2 { + let sources: Vec = block_ids[1..] + .iter() + .map(|s| format!("{}{}", atomic_write_dir, s)) + .collect(); + // concat blocks + let req = self + .backend + .webhdfs_concat_request(&first_block_id, sources)?; + + let resp = self.backend.client.send(req).await?; + + let status = resp.status(); + + if status != StatusCode::OK { + return Err(parse_error(resp).await?); + } + } + // delete the path file + let resp = self.backend.webhdfs_delete(&self.path).await?; + let status = resp.status(); + if status != StatusCode::OK { + return Err(parse_error(resp).await?); + } + + // rename concat file to path + let resp = self + .backend + .webhdfs_rename_object(&first_block_id, &self.path) + .await?; + + let status = resp.status(); + + match status { + StatusCode::OK => { + resp.into_body().consume().await?; + Ok(()) + } + _ => Err(parse_error(resp).await?), + } + } + + async fn abort_block(&self, block_ids: Vec) -> Result<()> { + for block_id in block_ids { + let resp = self.backend.webhdfs_delete(&block_id).await?; + match resp.status() { + StatusCode::OK => { + resp.into_body().consume().await?; + } + _ => return Err(parse_error(resp).await?), + } + } + Ok(()) + } } #[async_trait] @@ -84,12 +170,10 @@ impl oio::AppendObjectWrite for WebhdfsWriter { location = self.backend.webhdfs_init_append_request(&self.path).await?; } StatusCode::NOT_FOUND => { - let req = self.backend.webhdfs_create_object_request( - &self.path, - None, - &self.op, - AsyncBody::Empty, - )?; + let req = self + .backend + .webhdfs_create_object_request(&self.path, None, &self.op, AsyncBody::Empty) + .await?; let resp = self.backend.client.send(req).await?;