Skip to content

Commit

Permalink
feat(services/compfs): implement auxiliary functions (#4778)
Browse files Browse the repository at this point in the history
* fix reader

* builder

* implement aux fn's
  • Loading branch information
George-Miao authored Jun 20, 2024
1 parent a7b1a6c commit 8027430
Show file tree
Hide file tree
Showing 5 changed files with 109 additions and 10 deletions.
106 changes: 100 additions & 6 deletions core/src/services/compfs/backend.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,10 @@
// specific language governing permissions and limitations
// under the License.

use compio::{dispatcher::Dispatcher, fs::OpenOptions};

use super::{core::CompfsCore, lister::CompfsLister, reader::CompfsReader, writer::CompfsWriter};

use crate::raw::*;
use crate::*;

Expand All @@ -42,7 +45,7 @@ impl CompfsBuilder {

impl Builder for CompfsBuilder {
const SCHEME: Scheme = Scheme::Compfs;
type Accessor = ();
type Accessor = CompfsBackend;

fn from_map(map: HashMap<String, String>) -> Self {
let mut builder = CompfsBuilder::default();
Expand All @@ -53,7 +56,27 @@ impl Builder for CompfsBuilder {
}

fn build(&mut self) -> Result<Self::Accessor> {
todo!()
let root = match self.root.take() {
Some(root) => Ok(root),
None => Err(Error::new(
ErrorKind::ConfigInvalid,
"root is not specified",
)),
}?;
let dispatcher = Dispatcher::new().map_err(|_| {
Error::new(
ErrorKind::Unexpected,
"failed to initiate compio dispatcher",
)
})?;
let core = CompfsCore {
root,
dispatcher,
buf_pool: oio::PooledBuf::new(16),
};
Ok(CompfsBackend {
core: Arc::new(core),
})
}
}

Expand Down Expand Up @@ -89,16 +112,86 @@ impl Access for CompfsBackend {

copy: true,
rename: true,
blocking: true,

..Default::default()
});

am
}

async fn create_dir(&self, path: &str, _: OpCreateDir) -> Result<RpCreateDir> {
let path = self.core.prepare_path(path);

self.core
.exec(move || async move { compio::fs::create_dir_all(path).await })
.await?;

Ok(RpCreateDir::default())
}

async fn stat(&self, path: &str, _: OpStat) -> Result<RpStat> {
let path = self.core.prepare_path(path);

let meta = self
.core
.exec(move || async move { compio::fs::metadata(path).await })
.await?;
let ty = meta.file_type();
let mode = if ty.is_dir() {
EntryMode::DIR
} else if ty.is_file() {
EntryMode::FILE
} else {
EntryMode::Unknown
};
let last_mod = meta.modified().map_err(new_std_io_error)?.into();
let ret = Metadata::new(mode).with_last_modified(last_mod);

Ok(RpStat::new(ret))
}

async fn delete(&self, path: &str, _: OpDelete) -> Result<RpDelete> {
let path = self.core.prepare_path(path);

self.core
.exec(move || async move { compio::fs::remove_file(path).await })
.await?;

Ok(RpDelete::default())
}

async fn copy(&self, from: &str, to: &str, _: OpCopy) -> Result<RpCopy> {
let from = self.core.prepare_path(from);
let to = self.core.prepare_path(to);

self.core
.exec(move || async move {
let from = OpenOptions::new().read(true).open(from).await?;
let to = OpenOptions::new().write(true).create(true).open(to).await?;

let (mut from, mut to) = (Cursor::new(from), Cursor::new(to));
compio::io::copy(&mut from, &mut to).await?;

Ok(())
})
.await?;

Ok(RpCopy::default())
}

async fn rename(&self, from: &str, to: &str, _: OpRename) -> Result<RpRename> {
let from = self.core.prepare_path(from);
let to = self.core.prepare_path(to);

self.core
.exec(move || async move { compio::fs::rename(from, to).await })
.await?;

Ok(RpRename::default())
}

async fn read(&self, path: &str, op: OpRead) -> Result<(RpRead, Self::Reader)> {
let path = self.core.root.join(path.trim_end_matches('/'));
let path = self.core.prepare_path(path);

let file = self
.core
Expand All @@ -110,7 +203,7 @@ impl Access for CompfsBackend {
}

async fn write(&self, path: &str, args: OpWrite) -> Result<(RpWrite, Self::Writer)> {
let path = self.core.root.join(path.trim_end_matches('/'));
let path = self.core.prepare_path(path);
let append = args.append();
let file = self
.core
Expand All @@ -130,7 +223,8 @@ impl Access for CompfsBackend {
}

async fn list(&self, path: &str, _: OpList) -> Result<(RpList, Self::Lister)> {
let path = self.core.root.join(path.trim_end_matches('/'));
let path = self.core.prepare_path(path);

let read_dir = match self
.core
.exec_blocking(move || std::fs::read_dir(path))
Expand Down
4 changes: 4 additions & 0 deletions core/src/services/compfs/core.rs
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,10 @@ pub(super) struct CompfsCore {
}

impl CompfsCore {
pub fn prepare_path(&self, path: &str) -> PathBuf {
self.root.join(path.trim_end_matches('/'))
}

pub async fn exec<Fn, Fut, R>(&self, f: Fn) -> crate::Result<R>
where
Fn: FnOnce() -> Fut + Send + 'static,
Expand Down
2 changes: 1 addition & 1 deletion core/src/services/compfs/lister.rs
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ pub struct CompfsLister {
}

impl CompfsLister {
pub fn new(core: Arc<CompfsCore>, read_dir: ReadDir) -> Self {
pub(super) fn new(core: Arc<CompfsCore>, read_dir: ReadDir) -> Self {
Self {
core,
read_dir: Some(read_dir),
Expand Down
5 changes: 3 additions & 2 deletions core/src/services/compfs/reader.rs
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ pub struct CompfsReader {
}

impl CompfsReader {
pub fn new(core: Arc<CompfsCore>, file: compio::fs::File, range: BytesRange) -> Self {
pub(super) fn new(core: Arc<CompfsCore>, file: compio::fs::File, range: BytesRange) -> Self {
Self { core, file, range }
}
}
Expand All @@ -40,13 +40,14 @@ impl oio::Read for CompfsReader {
async fn read(&mut self) -> Result<Buffer> {
let mut bs = self.core.buf_pool.get();

let pos = self.range.offset();
let len = self.range.size().expect("range size is always Some");
bs.reserve(len as _);
let f = self.file.clone();
let mut bs = self
.core
.exec(move || async move {
let (_, bs) = buf_try!(@try f.read_at(bs, len).await);
let (_, bs) = buf_try!(@try f.read_at(bs, pos).await);
Ok(bs)
})
.await?;
Expand Down
2 changes: 1 addition & 1 deletion core/src/services/compfs/writer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ pub struct CompfsWriter {
}

impl CompfsWriter {
pub fn new(core: Arc<CompfsCore>, file: Cursor<File>) -> Self {
pub(super) fn new(core: Arc<CompfsCore>, file: Cursor<File>) -> Self {
Self { core, file }
}
}
Expand Down

0 comments on commit 8027430

Please sign in to comment.