diff --git a/core/src/services/hdfs_native/backend.rs b/core/src/services/hdfs_native/backend.rs index 48ef1caae699..c1f10bcebc11 100644 --- a/core/src/services/hdfs_native/backend.rs +++ b/core/src/services/hdfs_native/backend.rs @@ -131,7 +131,7 @@ impl Builder for HdfsNativeBuilder { Some(v) => v, None => { return Err(Error::new(ErrorKind::ConfigInvalid, "url is empty") - .with_context("service", Scheme::HdfsNative)) + .with_context("service", Scheme::HdfsNative)); } }; @@ -181,7 +181,30 @@ impl Accessor for HdfsNativeBackend { type BlockingLister = (); fn info(&self) -> AccessorInfo { - todo!() + let mut am = AccessorInfo::default(); + am.set_scheme(Scheme::HdfsNative) + .set_root(&self.root) + .set_native_capability(Capability { + stat: true, + + read: true, + read_can_seek: true, + + write: true, + write_can_append: self._enable_append, + + create_dir: true, + delete: true, + + list: true, + + rename: true, + blocking: true, + + ..Default::default() + }); + + am } async fn create_dir(&self, path: &str, _args: OpCreateDir) -> Result { @@ -218,20 +241,52 @@ impl Accessor for HdfsNativeBackend { Ok((RpWrite::new(), w)) } - async fn copy(&self, _from: &str, _to: &str, _args: OpCopy) -> Result { - todo!() - } + async fn rename(&self, from: &str, to: &str, _args: OpRename) -> Result { + let from_path = build_rooted_abs_path(&self.root, from); + let to_path = build_rooted_abs_path(&self.root, to); - async fn rename(&self, _from: &str, _to: &str, _args: OpRename) -> Result { - todo!() + self.client + .rename(&from_path, &to_path, false) + .await + .map_err(parse_hdfs_error)?; + + Ok(RpRename::default()) } - async fn stat(&self, _path: &str, _args: OpStat) -> Result { - todo!() + async fn stat(&self, path: &str, _args: OpStat) -> Result { + let p = build_rooted_abs_path(&self.root, path); + + let status: hdfs_native::client::FileStatus = self + .client + .get_file_info(&p) + .await + .map_err(parse_hdfs_error)?; + + let mode = if status.isdir { + EntryMode::DIR + } else { + EntryMode::FILE + }; + + let mut metadata = Metadata::new(mode); + metadata + .set_last_modified(parse_datetime_from_from_timestamp_millis( + status.modification_time as i64, + )?) + .set_content_length(status.length as u64); + + Ok(RpStat::new(metadata)) } - async fn delete(&self, _path: &str, _args: OpDelete) -> Result { - todo!() + async fn delete(&self, path: &str, _args: OpDelete) -> Result { + let p = build_rooted_abs_path(&self.root, path); + + self.client + .delete(&p, true) + .await + .map_err(parse_hdfs_error)?; + + Ok(RpDelete::default()) } async fn list(&self, path: &str, _args: OpList) -> Result<(RpList, Self::Lister)> { diff --git a/core/src/services/hdfs_native/docs.md b/core/src/services/hdfs_native/docs.md index 7d720d49865a..4f7e245b9d82 100644 --- a/core/src/services/hdfs_native/docs.md +++ b/core/src/services/hdfs_native/docs.md @@ -1 +1,35 @@ A distributed file system that provides high-throughput access to application data. +Using [Native Rust HDFS client](https://github.com/Kimahriman/hdfs-native). + +## Capabilities + +This service can be used to: + +- [x] stat +- [x] read +- [x] write +- [x] create_dir +- [x] delete +- [x] rename +- [x] list +- [x] blocking +- [x] append + +## Differences with webhdfs + +[Webhdfs][crate::services::Webhdfs] is powered by hdfs's RESTful HTTP API. + +## Differences with hdfs + +[hdfs][crate::services::Hdfs] is powered by libhdfs and require the Java dependencies + +## Features + +HDFS-native support needs to enable feature `services-hdfs-native`. + +## Configuration + +- `root`: Set the work dir for backend. +- `url`: Set the url for backend. +- `enable_append`: enable the append capacity. Default is false. +