From 8027430dba32cc57311e30bd925853a1c94f8ae2 Mon Sep 17 00:00:00 2001 From: Pop Date: Thu, 20 Jun 2024 19:24:54 +0900 Subject: [PATCH] feat(services/compfs): implement auxiliary functions (#4778) * fix reader * builder * implement aux fn's --- core/src/services/compfs/backend.rs | 106 ++++++++++++++++++++++++++-- core/src/services/compfs/core.rs | 4 ++ core/src/services/compfs/lister.rs | 2 +- core/src/services/compfs/reader.rs | 5 +- core/src/services/compfs/writer.rs | 2 +- 5 files changed, 109 insertions(+), 10 deletions(-) diff --git a/core/src/services/compfs/backend.rs b/core/src/services/compfs/backend.rs index 65bd189b683a..9926e04f977f 100644 --- a/core/src/services/compfs/backend.rs +++ b/core/src/services/compfs/backend.rs @@ -15,7 +15,10 @@ // specific language governing permissions and limitations // under the License. +use compio::{dispatcher::Dispatcher, fs::OpenOptions}; + use super::{core::CompfsCore, lister::CompfsLister, reader::CompfsReader, writer::CompfsWriter}; + use crate::raw::*; use crate::*; @@ -42,7 +45,7 @@ impl CompfsBuilder { impl Builder for CompfsBuilder { const SCHEME: Scheme = Scheme::Compfs; - type Accessor = (); + type Accessor = CompfsBackend; fn from_map(map: HashMap) -> Self { let mut builder = CompfsBuilder::default(); @@ -53,7 +56,27 @@ impl Builder for CompfsBuilder { } fn build(&mut self) -> Result { - todo!() + let root = match self.root.take() { + Some(root) => Ok(root), + None => Err(Error::new( + ErrorKind::ConfigInvalid, + "root is not specified", + )), + }?; + let dispatcher = Dispatcher::new().map_err(|_| { + Error::new( + ErrorKind::Unexpected, + "failed to initiate compio dispatcher", + ) + })?; + let core = CompfsCore { + root, + dispatcher, + buf_pool: oio::PooledBuf::new(16), + }; + Ok(CompfsBackend { + core: Arc::new(core), + }) } } @@ -89,7 +112,6 @@ impl Access for CompfsBackend { copy: true, rename: true, - blocking: true, ..Default::default() }); @@ -97,8 +119,79 @@ impl Access for CompfsBackend { am } + async fn create_dir(&self, path: &str, _: OpCreateDir) -> Result { + let path = self.core.prepare_path(path); + + self.core + .exec(move || async move { compio::fs::create_dir_all(path).await }) + .await?; + + Ok(RpCreateDir::default()) + } + + async fn stat(&self, path: &str, _: OpStat) -> Result { + let path = self.core.prepare_path(path); + + let meta = self + .core + .exec(move || async move { compio::fs::metadata(path).await }) + .await?; + let ty = meta.file_type(); + let mode = if ty.is_dir() { + EntryMode::DIR + } else if ty.is_file() { + EntryMode::FILE + } else { + EntryMode::Unknown + }; + let last_mod = meta.modified().map_err(new_std_io_error)?.into(); + let ret = Metadata::new(mode).with_last_modified(last_mod); + + Ok(RpStat::new(ret)) + } + + async fn delete(&self, path: &str, _: OpDelete) -> Result { + let path = self.core.prepare_path(path); + + self.core + .exec(move || async move { compio::fs::remove_file(path).await }) + .await?; + + Ok(RpDelete::default()) + } + + async fn copy(&self, from: &str, to: &str, _: OpCopy) -> Result { + let from = self.core.prepare_path(from); + let to = self.core.prepare_path(to); + + self.core + .exec(move || async move { + let from = OpenOptions::new().read(true).open(from).await?; + let to = OpenOptions::new().write(true).create(true).open(to).await?; + + let (mut from, mut to) = (Cursor::new(from), Cursor::new(to)); + compio::io::copy(&mut from, &mut to).await?; + + Ok(()) + }) + .await?; + + Ok(RpCopy::default()) + } + + async fn rename(&self, from: &str, to: &str, _: OpRename) -> Result { + let from = self.core.prepare_path(from); + let to = self.core.prepare_path(to); + + self.core + .exec(move || async move { compio::fs::rename(from, to).await }) + .await?; + + Ok(RpRename::default()) + } + async fn read(&self, path: &str, op: OpRead) -> Result<(RpRead, Self::Reader)> { - let path = self.core.root.join(path.trim_end_matches('/')); + let path = self.core.prepare_path(path); let file = self .core @@ -110,7 +203,7 @@ impl Access for CompfsBackend { } async fn write(&self, path: &str, args: OpWrite) -> Result<(RpWrite, Self::Writer)> { - let path = self.core.root.join(path.trim_end_matches('/')); + let path = self.core.prepare_path(path); let append = args.append(); let file = self .core @@ -130,7 +223,8 @@ impl Access for CompfsBackend { } async fn list(&self, path: &str, _: OpList) -> Result<(RpList, Self::Lister)> { - let path = self.core.root.join(path.trim_end_matches('/')); + let path = self.core.prepare_path(path); + let read_dir = match self .core .exec_blocking(move || std::fs::read_dir(path)) diff --git a/core/src/services/compfs/core.rs b/core/src/services/compfs/core.rs index 60c68106f154..e600edbe7d2e 100644 --- a/core/src/services/compfs/core.rs +++ b/core/src/services/compfs/core.rs @@ -44,6 +44,10 @@ pub(super) struct CompfsCore { } impl CompfsCore { + pub fn prepare_path(&self, path: &str) -> PathBuf { + self.root.join(path.trim_end_matches('/')) + } + pub async fn exec(&self, f: Fn) -> crate::Result where Fn: FnOnce() -> Fut + Send + 'static, diff --git a/core/src/services/compfs/lister.rs b/core/src/services/compfs/lister.rs index 7a380d48b71b..ac12fc7820ef 100644 --- a/core/src/services/compfs/lister.rs +++ b/core/src/services/compfs/lister.rs @@ -28,7 +28,7 @@ pub struct CompfsLister { } impl CompfsLister { - pub fn new(core: Arc, read_dir: ReadDir) -> Self { + pub(super) fn new(core: Arc, read_dir: ReadDir) -> Self { Self { core, read_dir: Some(read_dir), diff --git a/core/src/services/compfs/reader.rs b/core/src/services/compfs/reader.rs index 758c110894db..421efd8af111 100644 --- a/core/src/services/compfs/reader.rs +++ b/core/src/services/compfs/reader.rs @@ -31,7 +31,7 @@ pub struct CompfsReader { } impl CompfsReader { - pub fn new(core: Arc, file: compio::fs::File, range: BytesRange) -> Self { + pub(super) fn new(core: Arc, file: compio::fs::File, range: BytesRange) -> Self { Self { core, file, range } } } @@ -40,13 +40,14 @@ impl oio::Read for CompfsReader { async fn read(&mut self) -> Result { let mut bs = self.core.buf_pool.get(); + let pos = self.range.offset(); let len = self.range.size().expect("range size is always Some"); bs.reserve(len as _); let f = self.file.clone(); let mut bs = self .core .exec(move || async move { - let (_, bs) = buf_try!(@try f.read_at(bs, len).await); + let (_, bs) = buf_try!(@try f.read_at(bs, pos).await); Ok(bs) }) .await?; diff --git a/core/src/services/compfs/writer.rs b/core/src/services/compfs/writer.rs index 95a97519104c..d46c1b69b7c3 100644 --- a/core/src/services/compfs/writer.rs +++ b/core/src/services/compfs/writer.rs @@ -30,7 +30,7 @@ pub struct CompfsWriter { } impl CompfsWriter { - pub fn new(core: Arc, file: Cursor) -> Self { + pub(super) fn new(core: Arc, file: Cursor) -> Self { Self { core, file } } }