From e1e430d3f342619ce2f16fe4c75212f6b8ec6d33 Mon Sep 17 00:00:00 2001 From: Xuanwo Date: Thu, 13 Jun 2024 00:54:30 +0800 Subject: [PATCH] refactor(ofs): Split fuse3 impl into fuse3_opendal (#4721) --- bin/ofs/Cargo.lock | 15 + bin/ofs/Cargo.toml | 3 +- bin/ofs/src/fuse/mod.rs | 134 --------- bin/ofs/src/{bin/ofs.rs => main.rs} | 32 +- bin/ofs/tests/common/mod.rs | 27 +- integrations/fuse3/.gitignore | 1 + integrations/fuse3/Cargo.toml | 40 +++ integrations/fuse3/src/file.rs | 66 +++++ .../fuse3/src/file_system.rs | 279 ++++++++---------- {bin/ofs => integrations/fuse3}/src/lib.rs | 5 +- 10 files changed, 297 insertions(+), 305 deletions(-) delete mode 100644 bin/ofs/src/fuse/mod.rs rename bin/ofs/src/{bin/ofs.rs => main.rs} (78%) create mode 100644 integrations/fuse3/.gitignore create mode 100644 integrations/fuse3/Cargo.toml create mode 100644 integrations/fuse3/src/file.rs rename bin/ofs/src/fuse/adapter.rs => integrations/fuse3/src/file_system.rs (96%) rename {bin/ofs => integrations/fuse3}/src/lib.rs (91%) diff --git a/bin/ofs/Cargo.lock b/bin/ofs/Cargo.lock index c50168f26250..4b5636cfbe32 100644 --- a/bin/ofs/Cargo.lock +++ b/bin/ofs/Cargo.lock @@ -488,6 +488,20 @@ dependencies = [ "which", ] +[[package]] +name = "fuse3_opendal" +version = "0.1.0" +dependencies = [ + "bytes", + "fuse3", + "futures-util", + "libc", + "log", + "opendal", + "sharded-slab", + "tokio", +] + [[package]] name = "futures" version = "0.3.30" @@ -936,6 +950,7 @@ dependencies = [ "clap", "env_logger", "fuse3", + "fuse3_opendal", "futures-util", "libc", "log", diff --git a/bin/ofs/Cargo.toml b/bin/ofs/Cargo.toml index 0903093fc841..3f831453a347 100644 --- a/bin/ofs/Cargo.toml +++ b/bin/ofs/Cargo.toml @@ -36,6 +36,7 @@ chrono = "0.4.34" clap = { version = "4.5.1", features = ["derive", "env"] } env_logger = "0.11.2" fuse3 = { "version" = "0.7.2", "features" = ["tokio-runtime", "unprivileged"] } +fuse3_opendal = { version = "0.1.0", path = "../../integrations/fuse3" } futures-util = "0.3.30" libc = "0.2.154" log = "0.4.21" @@ -57,9 +58,9 @@ services-fs = ["opendal/services-fs"] services-s3 = ["opendal/services-s3"] [dev-dependencies] +opendal = { version = "0.47.0", path = "../../core", features = ["tests"] } tempfile = "3.10.1" test-context = "0.3.0" urlencoding = "2.1.3" uuid = "1.7.0" walkdir = "2.5.0" -opendal = { version = "0.47.0", path = "../../core", features = ["tests"] } diff --git a/bin/ofs/src/fuse/mod.rs b/bin/ofs/src/fuse/mod.rs deleted file mode 100644 index a4998602d552..000000000000 --- a/bin/ofs/src/fuse/mod.rs +++ /dev/null @@ -1,134 +0,0 @@ -// Licensed to the Apache Software Foundation (ASF) under one -// or more contributor license agreements. See the NOTICE file -// distributed with this work for additional information -// regarding copyright ownership. The ASF licenses this file -// to you under the Apache License, Version 2.0 (the -// "License"); you may not use this file except in compliance -// with the License. You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, -// software distributed under the License is distributed on an -// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -// KIND, either express or implied. See the License for the -// specific language governing permissions and limitations -// under the License. - -mod adapter; - -use std::io; -use std::path::Path; - -use adapter::FuseAdapter; -use fuse3::path::Session; -pub use fuse3::raw::MountHandle; -use fuse3::MountOptions; -use opendal::Operator; - -/// Ofs fuse filesystem mounter. -#[derive(Debug, Clone)] -pub struct Fuse { - mount_options: MountOptions, - uid: Option, - gid: Option, -} - -impl Fuse { - pub fn new() -> Self { - Fuse::default() - } - - /// Set fuse filesystem mount user_id, default is current uid. - pub fn uid(mut self, uid: u32) -> Self { - self.uid.replace(uid); - self.mount_options.uid(uid); - self - } - - /// Set fuse filesystem mount group_id, default is current gid. - pub fn gid(mut self, gid: u32) -> Self { - self.gid.replace(gid); - self.mount_options.gid(gid); - self - } - - /// Set fuse filesystem name, default is __OpenDAL Filesystem__. - pub fn fs_name(mut self, name: impl Into) -> Self { - self.mount_options.fs_name(name); - self - } - - /// Set fuse filesystem `allow_root` mount option, default is disable. - pub fn allow_root(mut self, allow_root: bool) -> Self { - self.mount_options.allow_root(allow_root); - self - } - - /// Set fuse filesystem allow_other mount option, default is disable. - pub fn allow_other(mut self, allow_other: bool) -> Self { - self.mount_options.allow_other(allow_other); - self - } - - /// Set fuse filesystem `ro` mount option, default is disable. - pub fn read_only(mut self, read_only: bool) -> Self { - self.mount_options.read_only(read_only); - self - } - - /// Allow fuse filesystem mount on a non-empty directory, default is not allowed. - pub fn allow_non_empty(mut self, allow_non_empty: bool) -> Self { - self.mount_options.nonempty(allow_non_empty); - self - } - - /// Mount the filesystem with root permission. - pub async fn mount( - self, - mount_point: impl AsRef, - op: Operator, - ) -> io::Result { - let adapter = FuseAdapter::new( - op, - self.uid.unwrap_or_else(|| nix::unistd::getuid().into()), - self.gid.unwrap_or_else(|| nix::unistd::getgid().into()), - ); - Session::new(self.mount_options) - .mount(adapter, mount_point) - .await - } - - /// Mount the filesystem without root permission. - pub async fn mount_with_unprivileged( - self, - mount_point: impl AsRef, - op: Operator, - ) -> io::Result { - log::warn!("unprivileged mount may not detect external unmount, tracking issue: https://github.com/Sherlock-Holo/fuse3/issues/72"); - - let adapter = FuseAdapter::new( - op, - self.uid.unwrap_or_else(|| nix::unistd::getuid().into()), - self.gid.unwrap_or_else(|| nix::unistd::getgid().into()), - ); - Session::new(self.mount_options) - .mount_with_unprivileged(adapter, mount_point) - .await - } -} - -impl Default for Fuse { - fn default() -> Self { - let mut mount_options = MountOptions::default(); - mount_options - .fs_name("OpenDAL Filesystem") - .no_open_dir_support(true); - - Self { - mount_options, - uid: None, - gid: None, - } - } -} diff --git a/bin/ofs/src/bin/ofs.rs b/bin/ofs/src/main.rs similarity index 78% rename from bin/ofs/src/bin/ofs.rs rename to bin/ofs/src/main.rs index 0a9c862cd153..04e3f670baef 100644 --- a/bin/ofs/src/bin/ofs.rs +++ b/bin/ofs/src/main.rs @@ -17,6 +17,8 @@ use anyhow::Result; use clap::Parser; +use fuse3::path::Session; +use fuse3::MountOptions; use url::Url; #[derive(Parser, Debug)] @@ -48,7 +50,6 @@ async fn execute(cfg: Config) -> Result<()> { use std::str::FromStr; use anyhow::anyhow; - use ofs::fuse::Fuse; use opendal::Operator; use opendal::Scheme; @@ -69,25 +70,38 @@ async fn execute(cfg: Config) -> Result<()> { }?; let backend = Operator::via_map(scheme, op_args)?; + let mut mount_options = MountOptions::default(); + let mut gid = nix::unistd::getgid().into(); + mount_options.gid(gid); + let mut uid = nix::unistd::getuid().into(); + mount_options.uid(uid); + #[cfg(target_os = "linux")] let mut mount_handle = if nix::unistd::getuid().is_root() { - let mut fuse = Fuse::new(); - if let Some(gid) = env::var("SUDO_GID") + if let Some(sudo_gid) = env::var("SUDO_GID") .ok() .and_then(|gid_str| gid_str.parse::().ok()) { - fuse = fuse.gid(gid); + mount_options.gid(sudo_gid); + gid = sudo_gid; } - if let Some(uid) = env::var("SUDO_UID") + + if let Some(sudo_uid) = env::var("SUDO_UID") .ok() .and_then(|gid_str| gid_str.parse::().ok()) { - fuse = fuse.uid(uid); + mount_options.uid(uid); + uid = sudo_uid; } - fuse.mount(cfg.mount_path, backend).await? + + let fs = fuse3_opendal::Filesystem::new(backend, uid, gid); + Session::new(mount_options) + .mount(fs, cfg.mount_path) + .await? } else { - Fuse::new() - .mount_with_unprivileged(cfg.mount_path, backend) + let fs = fuse3_opendal::Filesystem::new(backend, uid, gid); + Session::new(mount_options) + .mount_with_unprivileged(fs, cfg.mount_path) .await? }; diff --git a/bin/ofs/tests/common/mod.rs b/bin/ofs/tests/common/mod.rs index b78a0d2a3f31..394d10622b5a 100644 --- a/bin/ofs/tests/common/mod.rs +++ b/bin/ofs/tests/common/mod.rs @@ -15,8 +15,11 @@ // specific language governing permissions and limitations // under the License. +use fuse3::path::Session; +use fuse3::MountOptions; use std::sync::OnceLock; +use fuse3::raw::MountHandle; use opendal::raw::tests; use opendal::Capability; use tempfile::TempDir; @@ -30,7 +33,7 @@ static RUNTIME: OnceLock = OnceLock::new(); pub struct OfsTestContext { pub mount_point: TempDir, pub capability: Capability, - mount_handle: ofs::fuse::MountHandle, + mount_handle: MountHandle, } impl TestContext for OfsTestContext { @@ -51,12 +54,22 @@ impl TestContext for OfsTestContext { .build() .expect("build runtime") }) - .block_on(async move { - ofs::fuse::Fuse::new() - .mount_with_unprivileged(mount_point_str, backend) - .await - }) - .unwrap(); + .block_on( + #[allow(clippy::async_yields_async)] + async move { + let mut mount_options = MountOptions::default(); + let gid = nix::unistd::getgid().into(); + mount_options.gid(gid); + let uid = nix::unistd::getuid().into(); + mount_options.uid(uid); + + let fs = fuse3_opendal::Filesystem::new(backend, uid, gid); + Session::new(mount_options) + .mount_with_unprivileged(fs, mount_point_str) + .await + .unwrap() + }, + ); OfsTestContext { mount_point, diff --git a/integrations/fuse3/.gitignore b/integrations/fuse3/.gitignore new file mode 100644 index 000000000000..03314f77b5aa --- /dev/null +++ b/integrations/fuse3/.gitignore @@ -0,0 +1 @@ +Cargo.lock diff --git a/integrations/fuse3/Cargo.toml b/integrations/fuse3/Cargo.toml new file mode 100644 index 000000000000..23a4bec14145 --- /dev/null +++ b/integrations/fuse3/Cargo.toml @@ -0,0 +1,40 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. + +[package] +description = "fuse3 integration for Apache OpenDAL" +name = "fuse3_opendal" + +authors = ["Apache OpenDAL "] +edition = "2021" +homepage = "https://opendal.apache.org/" +license = "Apache-2.0" +repository = "https://github.com/apache/opendal" +rust-version = "1.75" +version = "0.1.0" + +[dependencies] +bytes = "1.6.0" +fuse3 = { version = "0.7.2", "features" = ["tokio-runtime", "unprivileged"] } +futures-util = "0.3.30" +libc = "0.2.155" +log = "0.4.21" +opendal = { version = "0.47.0", path = "../../core" } +sharded-slab = "0.1.7" +tokio = "1.38.0" + +[dev-dependencies] diff --git a/integrations/fuse3/src/file.rs b/integrations/fuse3/src/file.rs new file mode 100644 index 000000000000..3e6bbb29c75f --- /dev/null +++ b/integrations/fuse3/src/file.rs @@ -0,0 +1,66 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +use fuse3::Errno; +use opendal::Writer; +use std::ffi::OsString; +use std::sync::Arc; +use tokio::sync::Mutex; + +/// Opened file represents file that opened in memory. +/// +/// # FIXME +/// +/// We should remove the `pub` filed to avoid unexpected changes. +pub struct OpenedFile { + pub path: OsString, + pub is_read: bool, + pub inner_writer: Option>>, +} + +/// # FIXME +/// +/// We need better naming and API for this struct. +pub struct InnerWriter { + pub writer: Writer, + pub written: u64, +} + +/// File key is the key of opened file. +/// +/// # FIXME +/// +/// We should remove the `pub` filed to avoid unexpected changes. +#[derive(Debug, Clone, Copy)] +pub struct FileKey(pub usize); + +impl TryFrom for FileKey { + type Error = Errno; + + fn try_from(value: u64) -> std::result::Result { + match value { + 0 => Err(Errno::from(libc::EBADF)), + _ => Ok(FileKey(value as usize - 1)), + } + } +} + +impl FileKey { + pub fn to_fh(self) -> u64 { + self.0 as u64 + 1 // ensure fh is not 0 + } +} diff --git a/bin/ofs/src/fuse/adapter.rs b/integrations/fuse3/src/file_system.rs similarity index 96% rename from bin/ofs/src/fuse/adapter.rs rename to integrations/fuse3/src/file_system.rs index ddccf08ab4ee..a57babe21482 100644 --- a/bin/ofs/src/fuse/adapter.rs +++ b/integrations/fuse3/src/file_system.rs @@ -16,7 +16,6 @@ // under the License. use std::ffi::OsStr; -use std::ffi::OsString; use std::num::NonZeroU32; use std::path::PathBuf; use std::sync::Arc; @@ -35,51 +34,26 @@ use opendal::ErrorKind; use opendal::Metadata; use opendal::Metakey; use opendal::Operator; -use opendal::Writer; use sharded_slab::Slab; use tokio::sync::Mutex; -const TTL: Duration = Duration::from_secs(1); // 1 second - -struct OpenedFile { - path: OsString, - is_read: bool, - inner_writer: Option>>, -} - -struct InnerWriter { - writer: Writer, - written: u64, -} - -#[derive(Debug, Clone, Copy)] -struct FileKey(usize); - -impl TryFrom for FileKey { - type Error = Errno; - - fn try_from(value: u64) -> std::result::Result { - match value { - 0 => Err(Errno::from(libc::EBADF)), - _ => Ok(FileKey(value as usize - 1)), - } - } -} +use super::file::FileKey; +use super::file::InnerWriter; +use super::file::OpenedFile; -impl FileKey { - fn to_fh(self) -> u64 { - self.0 as u64 + 1 // ensure fh is not 0 - } -} +const TTL: Duration = Duration::from_secs(1); // 1 second -pub(super) struct FuseAdapter { +/// Filesystem represents the filesystem that implements [`PathFilesystem`]. +pub struct Filesystem { op: Operator, gid: u32, uid: u32, + opened_files: Slab, } -impl FuseAdapter { +impl Filesystem { + /// Create a new filesystem with given operator, uid and gid. pub fn new(op: Operator, uid: u32, gid: u32) -> Self { Self { op, @@ -150,10 +124,7 @@ impl FuseAdapter { } } -impl PathFilesystem for FuseAdapter { - type DirEntryStream<'a> = BoxStream<'a, Result>; - type DirEntryPlusStream<'a> = BoxStream<'a, Result>; - +impl PathFilesystem for Filesystem { // Init a fuse filesystem async fn init(&self, _req: Request) -> Result { Ok(ReplyInit { @@ -366,119 +337,6 @@ impl PathFilesystem for FuseAdapter { Err(libc::EOPNOTSUPP.into()) } - async fn create( - &self, - _req: Request, - parent: &OsStr, - name: &OsStr, - mode: u32, - flags: u32, - ) -> Result { - log::debug!( - "create(parent={:?}, name={:?}, mode=0o{:o}, flags=0x{:x})", - parent, - name, - mode, - flags - ); - - let (is_read, is_trunc, is_append) = self.check_flags(flags | libc::O_CREAT as u32)?; - - let path = PathBuf::from(parent).join(name); - - let inner_writer = if is_trunc || is_append { - let writer = self - .op - .writer_with(&path.to_string_lossy()) - .chunk(4 * 1024 * 1024) - .append(is_append) - .await - .map_err(opendal_error2errno)?; - Some(Arc::new(Mutex::new(InnerWriter { writer, written: 0 }))) - } else { - None - }; - - let now = SystemTime::now(); - let attr = dummy_file_attr(FileType::RegularFile, now, self.uid, self.gid); - - let key = self - .opened_files - .insert(OpenedFile { - path: path.into(), - is_read, - inner_writer, - }) - .ok_or(Errno::from(libc::EBUSY))?; - - Ok(ReplyCreated { - ttl: TTL, - attr, - generation: 0, - fh: FileKey(key).to_fh(), - flags, - }) - } - - /// In design, flush could be called multiple times for a single open. But there is the only - /// place that we can handle the write operations. - /// - /// So we only support the use case that flush only be called once. - async fn flush( - &self, - _req: Request, - path: Option<&OsStr>, - fh: u64, - lock_owner: u64, - ) -> Result<()> { - log::debug!( - "flush(path={:?}, fh={}, lock_owner={})", - path, - fh, - lock_owner, - ); - - let file = self - .opened_files - .take(FileKey::try_from(fh)?.0) - .ok_or(Errno::from(libc::EBADF))?; - - if let Some(inner_writer) = file.inner_writer { - let mut lock = inner_writer.lock().await; - let res = lock.writer.close().await.map_err(opendal_error2errno); - return res; - } - - if matches!(path, Some(ref p) if p != &file.path) { - Err(Errno::from(libc::EBADF))?; - } - - Ok(()) - } - - async fn release( - &self, - _req: Request, - path: Option<&OsStr>, - fh: u64, - flags: u32, - lock_owner: u64, - flush: bool, - ) -> Result<()> { - log::debug!( - "release(path={:?}, fh={}, flags=0x{:x}, lock_owner={}, flush={})", - path, - fh, - flags, - lock_owner, - flush - ); - - // Just take and forget it. - let _ = self.opened_files.take(FileKey::try_from(fh)?.0); - Ok(()) - } - async fn open(&self, _req: Request, path: &OsStr, flags: u32) -> Result { log::debug!("open(path={:?}, flags=0x{:x})", path, flags); @@ -607,6 +465,67 @@ impl PathFilesystem for FuseAdapter { }) } + async fn release( + &self, + _req: Request, + path: Option<&OsStr>, + fh: u64, + flags: u32, + lock_owner: u64, + flush: bool, + ) -> Result<()> { + log::debug!( + "release(path={:?}, fh={}, flags=0x{:x}, lock_owner={}, flush={})", + path, + fh, + flags, + lock_owner, + flush + ); + + // Just take and forget it. + let _ = self.opened_files.take(FileKey::try_from(fh)?.0); + Ok(()) + } + + /// In design, flush could be called multiple times for a single open. But there is the only + /// place that we can handle the write operations. + /// + /// So we only support the use case that flush only be called once. + async fn flush( + &self, + _req: Request, + path: Option<&OsStr>, + fh: u64, + lock_owner: u64, + ) -> Result<()> { + log::debug!( + "flush(path={:?}, fh={}, lock_owner={})", + path, + fh, + lock_owner, + ); + + let file = self + .opened_files + .take(FileKey::try_from(fh)?.0) + .ok_or(Errno::from(libc::EBADF))?; + + if let Some(inner_writer) = file.inner_writer { + let mut lock = inner_writer.lock().await; + let res = lock.writer.close().await.map_err(opendal_error2errno); + return res; + } + + if matches!(path, Some(ref p) if p != &file.path) { + Err(Errno::from(libc::EBADF))?; + } + + Ok(()) + } + + type DirEntryStream<'a> = BoxStream<'a, Result>; + async fn readdir<'a>( &'a self, _req: Request, @@ -663,6 +582,62 @@ impl PathFilesystem for FuseAdapter { Ok(()) } + async fn create( + &self, + _req: Request, + parent: &OsStr, + name: &OsStr, + mode: u32, + flags: u32, + ) -> Result { + log::debug!( + "create(parent={:?}, name={:?}, mode=0o{:o}, flags=0x{:x})", + parent, + name, + mode, + flags + ); + + let (is_read, is_trunc, is_append) = self.check_flags(flags | libc::O_CREAT as u32)?; + + let path = PathBuf::from(parent).join(name); + + let inner_writer = if is_trunc || is_append { + let writer = self + .op + .writer_with(&path.to_string_lossy()) + .chunk(4 * 1024 * 1024) + .append(is_append) + .await + .map_err(opendal_error2errno)?; + Some(Arc::new(Mutex::new(InnerWriter { writer, written: 0 }))) + } else { + None + }; + + let now = SystemTime::now(); + let attr = dummy_file_attr(FileType::RegularFile, now, self.uid, self.gid); + + let key = self + .opened_files + .insert(OpenedFile { + path: path.into(), + is_read, + inner_writer, + }) + .ok_or(Errno::from(libc::EBUSY))?; + + Ok(ReplyCreated { + ttl: TTL, + attr, + generation: 0, + fh: FileKey(key).to_fh(), + flags, + }) + } + + type DirEntryPlusStream<'a> = BoxStream<'a, Result>; + async fn readdirplus<'a>( &'a self, _req: Request, diff --git a/bin/ofs/src/lib.rs b/integrations/fuse3/src/lib.rs similarity index 91% rename from bin/ofs/src/lib.rs rename to integrations/fuse3/src/lib.rs index 599e58249c69..ca6b64d4dec7 100644 --- a/bin/ofs/src/lib.rs +++ b/integrations/fuse3/src/lib.rs @@ -15,5 +15,6 @@ // specific language governing permissions and limitations // under the License. -#[cfg(any(target_os = "linux", target_os = "freebsd"))] -pub mod fuse; +mod file; +mod file_system; +pub use file_system::Filesystem;