diff --git a/bin/ofs/src/bin/ofs.rs b/bin/ofs/src/bin/ofs.rs index 08399b56ef22..89a619928724 100644 --- a/bin/ofs/src/bin/ofs.rs +++ b/bin/ofs/src/bin/ofs.rs @@ -15,69 +15,13 @@ // specific language governing permissions and limitations // under the License. -use std::collections::HashMap; -use std::str::FromStr; - -use anyhow::anyhow; -use anyhow::Context; use anyhow::Result; use clap::Parser; -use fuse3::path::Session; -use fuse3::MountOptions; -use ofs::Ofs; -use opendal::Operator; -use opendal::Scheme; -use url::Url; #[tokio::main] async fn main() -> Result<()> { - env_logger::init(); - fuse().await -} - -#[derive(Parser, Debug)] -#[command(version, about)] -struct Config { - /// fuse mount path - #[arg(env = "OFS_MOUNT_PATH", index = 1)] - mount_path: String, - - /// location of opendal service - /// format: ://?=&= - /// example: fs://root=/tmp - #[arg(env = "OFS_BACKEND", index = 2)] - backend: String, -} - -async fn fuse() -> Result<()> { - let cfg = Config::try_parse().context("parse command line arguments")?; - - let location = Url::parse(&cfg.backend)?; - if location.has_host() { - Err(anyhow!("Host part in a location is not supported."))?; - } + let cfg = ofs::Config::parse(); - let scheme_str = location.scheme(); - - let op_args = location - .query_pairs() - .into_owned() - .collect::>(); - - let scheme = Scheme::from_str(scheme_str).context("unsupported scheme")?; - let op = Operator::via_map(scheme, op_args)?; - - let mut mount_option = MountOptions::default(); - mount_option.uid(nix::unistd::getuid().into()); - mount_option.gid(nix::unistd::getgid().into()); - - let ofs = Ofs { op }; - - let mounthandle = Session::new(mount_option) - .mount_with_unprivileged(ofs, cfg.mount_path) - .await?; - - mounthandle.await?; - - Ok(()) + env_logger::init(); + ofs::new_app(cfg).await } diff --git a/bin/ofs/src/config.rs b/bin/ofs/src/config.rs new file mode 100644 index 000000000000..6afa729a79ef --- /dev/null +++ b/bin/ofs/src/config.rs @@ -0,0 +1,33 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +use clap::Parser; +use url::Url; + +#[derive(Parser, Debug)] +#[command(version, about)] +pub struct Config { + /// fuse mount path + #[arg(env = "OFS_MOUNT_PATH", index = 1)] + pub(crate) mount_path: String, + + /// location of opendal service + /// format: ://?=&= + /// example: fs://?root=/tmp + #[arg(env = "OFS_BACKEND", index = 2)] + pub(crate) backend: Url, +} diff --git a/bin/ofs/src/frontend/fuse.rs b/bin/ofs/src/frontend/fuse.rs new file mode 100644 index 000000000000..090f12a7cd89 --- /dev/null +++ b/bin/ofs/src/frontend/fuse.rs @@ -0,0 +1,229 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +use std::ffi::OsStr; +use std::vec::IntoIter; + +use async_trait::async_trait; +use fuse3::path::prelude::*; +use fuse3::Result; +use futures_util::stream::Empty; +use futures_util::stream::Iter; +use opendal::Operator; + +pub(super) struct Ofs { + op: Operator, +} + +impl Ofs { + pub fn new(op: Operator) -> Self { + Self { op } + } +} + +#[async_trait] +impl PathFilesystem for Ofs { + type DirEntryStream = Empty>; + type DirEntryPlusStream = Iter>>; + + // Init a fuse filesystem + async fn init(&self, _req: Request) -> Result<()> { + Ok(()) + } + + // Callback when fs is being destroyed + async fn destroy(&self, _req: Request) {} + + async fn lookup(&self, _req: Request, _parent: &OsStr, _name: &OsStr) -> Result { + // TODO + Err(libc::ENOSYS.into()) + } + + async fn getattr( + &self, + _req: Request, + path: Option<&OsStr>, + _fh: Option, + _flags: u32, + ) -> Result { + // TODO + log::debug!("getattr(path={:?})", path); + + Err(libc::ENOSYS.into()) + } + + async fn read( + &self, + _req: Request, + path: Option<&OsStr>, + fh: u64, + offset: u64, + size: u32, + ) -> Result { + // TODO + log::debug!( + "read(path={:?}, fh={}, offset={}, size={})", + path, + fh, + offset, + size + ); + + Err(libc::ENOSYS.into()) + } + + async fn mkdir( + &self, + _req: Request, + parent: &OsStr, + name: &OsStr, + mode: u32, + _umask: u32, + ) -> Result { + // TODO + log::debug!( + "mkdir(parent={:?}, name={:?}, mode=0o{:o})", + parent, + name, + mode + ); + + Err(libc::ENOSYS.into()) + } + + async fn readdir( + &self, + _req: Request, + path: &OsStr, + fh: u64, + offset: i64, + ) -> Result> { + // TODO + log::debug!("readdir(path={:?}, fh={}, offset={})", path, fh, offset); + + Err(libc::ENOSYS.into()) + } + + async fn mknod( + &self, + _req: Request, + parent: &OsStr, + name: &OsStr, + mode: u32, + _rdev: u32, + ) -> Result { + // TODO + log::debug!( + "mknod(parent={:?}, name={:?}, mode=0o{:o})", + parent, + name, + mode + ); + + Err(libc::ENOSYS.into()) + } + + async fn open(&self, _req: Request, path: &OsStr, flags: u32) -> Result { + // TODO + log::debug!("open(path={:?}, flags=0x{:x})", path, flags); + + Err(libc::ENOSYS.into()) + } + + async fn setattr( + &self, + _req: Request, + path: Option<&OsStr>, + _fh: Option, + _set_attr: SetAttr, + ) -> Result { + // TODO + log::debug!("setattr(path={:?})", path); + + Err(libc::ENOSYS.into()) + } + + async fn write( + &self, + _req: Request, + path: Option<&OsStr>, + fh: u64, + offset: u64, + data: &[u8], + flags: u32, + ) -> Result { + // TODO + log::debug!( + "write(path={:?}, fh={}, offset={}, len={}, flags=0x{:x})", + path, + fh, + offset, + data.len(), + flags + ); + + Err(libc::ENOSYS.into()) + } + + async fn release( + &self, + _req: Request, + path: Option<&OsStr>, + fh: u64, + flags: u32, + _lock_owner: u64, + flush: bool, + ) -> Result<()> { + // TODO + log::debug!( + "release(path={:?}, fh={}, flags={}, flush={})", + path, + fh, + flags, + flush + ); + + Err(libc::ENOSYS.into()) + } + + async fn rename( + &self, + _req: Request, + origin_parent: &OsStr, + origin_name: &OsStr, + parent: &OsStr, + name: &OsStr, + ) -> Result<()> { + // TODO + log::debug!( + "rename(p={:?}, name={:?}, newp={:?}, newname={:?})", + origin_parent, + origin_name, + parent, + name + ); + + Err(libc::ENOSYS.into()) + } + + async fn unlink(&self, _req: Request, parent: &OsStr, name: &OsStr) -> Result<()> { + // TODO + log::debug!("unlink(parent={:?}, name={:?})", parent, name); + + Err(libc::ENOSYS.into()) + } +} diff --git a/bin/ofs/src/frontend/mod.rs b/bin/ofs/src/frontend/mod.rs new file mode 100644 index 000000000000..7ea87ec29875 --- /dev/null +++ b/bin/ofs/src/frontend/mod.rs @@ -0,0 +1,56 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +use anyhow::Result; +use opendal::Operator; + +#[cfg(target_os = "linux")] +mod fuse; + +pub(crate) struct FrontendArgs { + pub mount_path: String, + pub backend: Operator, +} + +pub(crate) struct Frontend; + +impl Frontend { + #[cfg(any(not(target_os = "linux")))] + pub async fn execute(_: FrontendArgs) -> Result<()> { + Err(anyhow::anyhow!("platform not supported")) + } + + #[cfg(target_os = "linux")] + pub async fn execute(args: FrontendArgs) -> Result<()> { + use fuse3::path::Session; + use fuse3::MountOptions; + + let mut mount_option = MountOptions::default(); + mount_option.uid(nix::unistd::getuid().into()); + mount_option.gid(nix::unistd::getgid().into()); + + let ofs = fuse::Ofs::new(args.backend); + + let mount_handle = Session::new(mount_option) + .mount_with_unprivileged(ofs, args.mount_path) + .await?; + + mount_handle.await?; + + Ok(()) + } +} diff --git a/bin/ofs/src/lib.rs b/bin/ofs/src/lib.rs index 3fc0cadae6b5..d7972eeaf6a0 100644 --- a/bin/ofs/src/lib.rs +++ b/bin/ofs/src/lib.rs @@ -15,209 +15,42 @@ // specific language governing permissions and limitations // under the License. -use std::ffi::OsStr; -use std::vec::IntoIter; +use std::collections::HashMap; +use std::str::FromStr; -use async_trait::async_trait; -use fuse3::path::prelude::*; -use fuse3::Result; -use futures_util::stream::Empty; -use futures_util::stream::Iter; +use anyhow::anyhow; +use anyhow::Result; +use frontend::Frontend; +use frontend::FrontendArgs; use opendal::Operator; +use opendal::Scheme; -pub struct Ofs { - pub op: Operator, -} - -#[async_trait] -impl PathFilesystem for Ofs { - type DirEntryStream = Empty>; - type DirEntryPlusStream = Iter>>; - - // Init a fuse filesystem - async fn init(&self, _req: Request) -> Result<()> { - Ok(()) - } - - // Callback when fs is being destroyed - async fn destroy(&self, _req: Request) {} - - async fn lookup(&self, _req: Request, _parent: &OsStr, _name: &OsStr) -> Result { - // TODO - Err(libc::ENOSYS.into()) - } - - async fn getattr( - &self, - _req: Request, - path: Option<&OsStr>, - _fh: Option, - _flags: u32, - ) -> Result { - // TODO - log::debug!("getattr(path={:?})", path); - - Err(libc::ENOSYS.into()) - } - - async fn read( - &self, - _req: Request, - path: Option<&OsStr>, - fh: u64, - offset: u64, - size: u32, - ) -> Result { - // TODO - log::debug!( - "read(path={:?}, fh={}, offset={}, size={})", - path, - fh, - offset, - size - ); - - Err(libc::ENOSYS.into()) - } - - async fn mkdir( - &self, - _req: Request, - parent: &OsStr, - name: &OsStr, - mode: u32, - _umask: u32, - ) -> Result { - // TODO - log::debug!( - "mkdir(parent={:?}, name={:?}, mode=0o{:o})", - parent, - name, - mode - ); - - Err(libc::ENOSYS.into()) - } - - async fn readdir( - &self, - _req: Request, - path: &OsStr, - fh: u64, - offset: i64, - ) -> Result> { - // TODO - log::debug!("readdir(path={:?}, fh={}, offset={})", path, fh, offset); +pub mod config; +pub use config::Config; - Err(libc::ENOSYS.into()) - } - - async fn mknod( - &self, - _req: Request, - parent: &OsStr, - name: &OsStr, - mode: u32, - _rdev: u32, - ) -> Result { - // TODO - log::debug!( - "mknod(parent={:?}, name={:?}, mode=0o{:o})", - parent, - name, - mode - ); - - Err(libc::ENOSYS.into()) - } - - async fn open(&self, _req: Request, path: &OsStr, flags: u32) -> Result { - // TODO - log::debug!("open(path={:?}, flags=0x{:x})", path, flags); +mod frontend; - Err(libc::ENOSYS.into()) +pub async fn new_app(cfg: Config) -> Result<()> { + if cfg.backend.has_host() { + log::warn!("backend host will be ignored"); } - async fn setattr( - &self, - _req: Request, - path: Option<&OsStr>, - _fh: Option, - _set_attr: SetAttr, - ) -> Result { - // TODO - log::debug!("setattr(path={:?})", path); + let scheme_str = cfg.backend.scheme(); + let op_args = cfg + .backend + .query_pairs() + .into_owned() + .collect::>(); - Err(libc::ENOSYS.into()) - } - - async fn write( - &self, - _req: Request, - path: Option<&OsStr>, - fh: u64, - offset: u64, - data: &[u8], - flags: u32, - ) -> Result { - // TODO - log::debug!( - "write(path={:?}, fh={}, offset={}, len={}, flags=0x{:x})", - path, - fh, - offset, - data.len(), - flags - ); - - Err(libc::ENOSYS.into()) - } + let scheme = match Scheme::from_str(scheme_str) { + Ok(Scheme::Custom(_)) | Err(_) => Err(anyhow!("invalid scheme: {}", scheme_str)), + Ok(s) => Ok(s), + }?; + let backend = Operator::via_map(scheme, op_args)?; - async fn release( - &self, - _req: Request, - path: Option<&OsStr>, - fh: u64, - flags: u32, - _lock_owner: u64, - flush: bool, - ) -> Result<()> { - // TODO - log::debug!( - "release(path={:?}, fh={}, flags={}, flush={})", - path, - fh, - flags, - flush - ); - - Err(libc::ENOSYS.into()) - } - - async fn rename( - &self, - _req: Request, - origin_parent: &OsStr, - origin_name: &OsStr, - parent: &OsStr, - name: &OsStr, - ) -> Result<()> { - // TODO - log::debug!( - "rename(p={:?}, name={:?}, newp={:?}, newname={:?})", - origin_parent, - origin_name, - parent, - name - ); - - Err(libc::ENOSYS.into()) - } - - async fn unlink(&self, _req: Request, parent: &OsStr, name: &OsStr) -> Result<()> { - // TODO - log::debug!("unlink(parent={:?}, name={:?})", parent, name); - - Err(libc::ENOSYS.into()) - } + let args = FrontendArgs { + mount_path: cfg.mount_path, + backend, + }; + Frontend::execute(args).await }