From 3176d21a66052244e7f0a9f05014afd6d075c859 Mon Sep 17 00:00:00 2001 From: Flash <71162630+Young-Flash@users.noreply.github.com> Date: Sat, 7 Oct 2023 19:09:22 +0800 Subject: [PATCH] feat(oay): port `WebdavFs` to dav-server-fs-opendal (#3119) --- Cargo.lock | 17 ++ Cargo.toml | 1 + integrations/README.md | 2 + integrations/dav-server/.gitignore | 0 integrations/dav-server/Cargo.toml | 47 +++++ integrations/dav-server/README.md | 3 + integrations/dav-server/src/dir_entry.rs | 54 ++++++ integrations/dav-server/src/file.rs | 98 +++++++++++ integrations/dav-server/src/lib.rs | 23 +++ integrations/dav-server/src/metadata.rs | 62 +++++++ integrations/dav-server/src/opendalfs.rs | 210 +++++++++++++++++++++++ integrations/dav-server/tests/test.rs | 41 +++++ 12 files changed, 558 insertions(+) create mode 100644 integrations/dav-server/.gitignore create mode 100644 integrations/dav-server/Cargo.toml create mode 100644 integrations/dav-server/README.md create mode 100644 integrations/dav-server/src/dir_entry.rs create mode 100644 integrations/dav-server/src/file.rs create mode 100644 integrations/dav-server/src/lib.rs create mode 100644 integrations/dav-server/src/metadata.rs create mode 100644 integrations/dav-server/src/opendalfs.rs create mode 100644 integrations/dav-server/tests/test.rs diff --git a/Cargo.lock b/Cargo.lock index 5cc7e68be272..ef379ea43123 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1556,6 +1556,23 @@ dependencies = [ "xmltree", ] +[[package]] +name = "dav-server-opendalfs" +version = "0.40.0" +dependencies = [ + "anyhow", + "bytes", + "chrono", + "dav-server", + "dirs 5.0.1", + "futures", + "futures-util", + "opendal", + "quick-xml 0.29.0", + "serde", + "tokio", +] + [[package]] name = "deadpool" version = "0.9.5" diff --git a/Cargo.toml b/Cargo.toml index b761b8b48d55..ac45ee858484 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -38,6 +38,7 @@ members = [ "bin/oay", "integrations/object_store", + "integrations/dav-server", ] resolver = "2" diff --git a/integrations/README.md b/integrations/README.md index dd89e76cf124..ba5dfb6b4a75 100644 --- a/integrations/README.md +++ b/integrations/README.md @@ -5,3 +5,5 @@ This folder contains the integrations for OpenDAL. Integrations are used to inte ## Available Integrations - [`object_store_opendal`](./object_store): Use OpenDAL as a backend for the [object_store](https://docs.rs/object_store/latest/object_store/). + +- [`dav-server-opendalfs`](./dav-server-opendalfs/): Use OpenDAL as a backend to access data in various service with WebDAV protocol. diff --git a/integrations/dav-server/.gitignore b/integrations/dav-server/.gitignore new file mode 100644 index 000000000000..e69de29bb2d1 diff --git a/integrations/dav-server/Cargo.toml b/integrations/dav-server/Cargo.toml new file mode 100644 index 000000000000..aa82c51a14f0 --- /dev/null +++ b/integrations/dav-server/Cargo.toml @@ -0,0 +1,47 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. + +[package] +description = "Use OpenDAL as a backend to access data in various service with WebDAV protocol" +name = "dav-server-opendalfs" + +authors.workspace = true +edition.workspace = true +homepage.workspace = true +license.workspace = true +repository.workspace = true +rust-version.workspace = true +version.workspace = true + +[dependencies] +anyhow = "1" +chrono = "0.4.28" +dirs = "5.0.0" +bytes = { version = "1.4.0" } +dav-server = { version = "0.5.5" } +futures = "0.3" +futures-util = { version = "0.3.16" } +opendal.workspace = true +quick-xml = { version = "0.29", features = ["serialize", "overlapped-lists"] } +serde = { version = "1", features = ["derive"] } +tokio = { version = "1.27", features = [ + "fs", + "macros", + "rt-multi-thread", + "io-std", +] } + diff --git a/integrations/dav-server/README.md b/integrations/dav-server/README.md new file mode 100644 index 000000000000..15420debac0d --- /dev/null +++ b/integrations/dav-server/README.md @@ -0,0 +1,3 @@ +# dav-server-opendalfs + +`dav-server-opendalfs` is a integration which use OpenDAL as a backend to access data in various service with WebDAV protocol. diff --git a/integrations/dav-server/src/dir_entry.rs b/integrations/dav-server/src/dir_entry.rs new file mode 100644 index 000000000000..955266546cf4 --- /dev/null +++ b/integrations/dav-server/src/dir_entry.rs @@ -0,0 +1,54 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +use dav_server::fs::DavDirEntry; +use futures::FutureExt; +use opendal::Entry; +use opendal::Operator; + +use super::file::convert_error; +use super::metadata::WebdavMetaData; + +pub struct WebDAVDirEntry { + dir_entry: Entry, + op: Operator, +} + +impl DavDirEntry for WebDAVDirEntry { + fn name(&self) -> Vec { + self.dir_entry.name().as_bytes().to_vec() + } + + fn metadata(&self) -> dav_server::fs::FsFuture> { + async move { + self.op + .stat(self.dir_entry.path()) + .await + .map(|metadata| { + Box::new(WebdavMetaData::new(metadata)) as Box + }) + .map_err(convert_error) + } + .boxed() + } +} + +impl WebDAVDirEntry { + pub fn new(dir_entry: Entry, op: Operator) -> Self { + WebDAVDirEntry { dir_entry, op } + } +} diff --git a/integrations/dav-server/src/file.rs b/integrations/dav-server/src/file.rs new file mode 100644 index 000000000000..c8983d1fc258 --- /dev/null +++ b/integrations/dav-server/src/file.rs @@ -0,0 +1,98 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +use std::io::SeekFrom; + +use bytes::Bytes; +use dav_server::davpath::DavPath; +use dav_server::fs::DavFile; +use dav_server::fs::DavMetaData; +use dav_server::fs::FsFuture; +use futures::FutureExt; +use opendal::Operator; + +use super::metadata::WebdavMetaData; + +#[derive(Debug)] +pub struct WebdavFile { + op: Operator, + path: DavPath, +} + +impl WebdavFile { + pub fn new(op: Operator, path: DavPath) -> Self { + Self { op, path } + } +} + +impl DavFile for WebdavFile { + fn read_bytes(&mut self, count: usize) -> FsFuture { + async move { + let file_path = self.path.as_url_string(); + self.op + .read_with(&file_path) + .range(0..count as u64) + .await + .map(Bytes::from) + .map_err(convert_error) + } + .boxed() + } + + fn metadata(&mut self) -> FsFuture> { + async move { + self.op + .stat(self.path.as_url_string().as_str()) + .await + .map(|opendal_metadata| { + Box::new(WebdavMetaData::new(opendal_metadata)) as Box + }) + .map_err(convert_error) + } + .boxed() + } + + fn write_buf(&mut self, buf: Box) -> FsFuture<()> { + self.write_bytes(Bytes::copy_from_slice(buf.chunk())) + } + + fn write_bytes(&mut self, buf: Bytes) -> FsFuture<()> { + async move { + let file_path = self.path.as_url_string(); + self.op.write(&file_path, buf).await.map_err(convert_error) + } + .boxed() + } + + fn seek(&mut self, _pos: SeekFrom) -> FsFuture { + futures_util::future::err(dav_server::fs::FsError::NotImplemented).boxed() + } + + fn flush(&mut self) -> FsFuture<()> { + futures_util::future::ok(()).boxed() + } +} + +pub fn convert_error(opendal_error: opendal::Error) -> dav_server::fs::FsError { + match opendal_error.kind() { + opendal::ErrorKind::AlreadyExists | opendal::ErrorKind::IsSameFile => { + dav_server::fs::FsError::Exists + } + opendal::ErrorKind::NotFound => dav_server::fs::FsError::NotFound, + _ => dav_server::fs::FsError::GeneralFailure, + } +} diff --git a/integrations/dav-server/src/lib.rs b/integrations/dav-server/src/lib.rs new file mode 100644 index 000000000000..6436fd7370b4 --- /dev/null +++ b/integrations/dav-server/src/lib.rs @@ -0,0 +1,23 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +mod dir_entry; +mod file; +mod metadata; +mod opendalfs; + +pub use opendalfs::OpendalFs; diff --git a/integrations/dav-server/src/metadata.rs b/integrations/dav-server/src/metadata.rs new file mode 100644 index 000000000000..03295125d5ee --- /dev/null +++ b/integrations/dav-server/src/metadata.rs @@ -0,0 +1,62 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +use dav_server::fs::DavMetaData; +use dav_server::fs::FsError; +use opendal::Metadata; + +#[derive(Debug, Clone)] +pub struct WebdavMetaData { + metadata: Metadata, +} + +impl WebdavMetaData { + pub fn new(metadata: Metadata) -> Self { + WebdavMetaData { metadata } + } +} + +impl DavMetaData for WebdavMetaData { + fn len(&self) -> u64 { + self.metadata.content_length() + } + + fn modified(&self) -> dav_server::fs::FsResult { + match self.metadata.last_modified() { + Some(t) => Ok(t.into()), + None => Err(FsError::GeneralFailure), + } + } + + fn is_dir(&self) -> bool { + self.metadata.is_dir() + } + + fn is_file(&self) -> bool { + self.metadata.is_file() + } + + fn etag(&self) -> Option { + self.metadata.etag().map(|s| s.to_string()) + } + + fn status_changed(&self) -> dav_server::fs::FsResult { + self.metadata + .last_modified() + .map_or(Err(FsError::GeneralFailure), |t| Ok(t.into())) + } +} diff --git a/integrations/dav-server/src/opendalfs.rs b/integrations/dav-server/src/opendalfs.rs new file mode 100644 index 000000000000..92a2d7783749 --- /dev/null +++ b/integrations/dav-server/src/opendalfs.rs @@ -0,0 +1,210 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +use std::path::Path; +use std::task::ready; +use std::task::Poll::Ready; + +use dav_server::davpath::DavPath; +use dav_server::fs::DavDirEntry; +use dav_server::fs::DavFile; +use dav_server::fs::DavFileSystem; +use dav_server::fs::DavMetaData; +use dav_server::fs::FsError; +use futures::FutureExt; +use futures_util::Stream; +use futures_util::StreamExt; +use opendal::Lister; +use opendal::Operator; + +use super::file::convert_error; +use super::file::WebdavFile; +use super::metadata::WebdavMetaData; +use crate::dir_entry::WebDAVDirEntry; + +#[derive(Clone)] +pub struct OpendalFs { + pub op: Operator, +} + +impl DavFileSystem for OpendalFs { + fn open<'a>( + &'a self, + path: &'a dav_server::davpath::DavPath, + _options: dav_server::fs::OpenOptions, + ) -> dav_server::fs::FsFuture> { + async move { + let file = WebdavFile::new(self.op.clone(), path.clone()); + Ok(Box::new(file) as Box) + } + .boxed() + } + + fn read_dir<'a>( + &'a self, + path: &'a dav_server::davpath::DavPath, + _meta: dav_server::fs::ReadDirMeta, + ) -> dav_server::fs::FsFuture>> + { + async move { + self.op + .lister(path.as_url_string().as_str()) + .await + .map(|lister| DavStream::new(self.op.clone(), lister).boxed()) + .map_err(convert_error) + } + .boxed() + } + + fn metadata<'a>( + &'a self, + path: &'a dav_server::davpath::DavPath, + ) -> dav_server::fs::FsFuture> { + async move { + let opendal_metadata = self.op.stat(path.as_url_string().as_str()).await; + match opendal_metadata { + Ok(metadata) => { + let webdav_metadata = WebdavMetaData::new(metadata); + Ok(Box::new(webdav_metadata) as Box) + } + Err(e) => Err(convert_error(e)), + } + } + .boxed() + } + + fn create_dir<'a>(&'a self, path: &'a DavPath) -> dav_server::fs::FsFuture<()> { + async move { + let path = path.as_url_string(); + + // check if the parent path is exist. + // During MKCOL processing, a server MUST make the Request-URI a member of its parent collection, unless the Request-URI is "/". If no such ancestor exists, the method MUST fail. + // refer to https://datatracker.ietf.org/doc/html/rfc2518#section-8.3.1 + let parent = Path::new(&path).parent().unwrap(); + match self.op.is_exist(parent.to_str().unwrap()).await { + Ok(exist) => { + if !exist && parent != Path::new("/") { + return Err(dav_server::fs::FsError::NotFound); + } + } + Err(e) => { + return Err(convert_error(e)); + } + } + + let path = path.as_str(); + // check if the given path is exist (MKCOL on existing collection should fail (RFC2518:8.3.1)) + let exist = self.op.is_exist(path).await; + match exist { + Ok(exist) => match exist { + true => Err(dav_server::fs::FsError::Exists), + false => { + let res = self.op.create_dir(path).await; + match res { + Ok(_) => Ok(()), + Err(e) => Err(convert_error(e)), + } + } + }, + Err(e) => Err(convert_error(e)), + } + } + .boxed() + } + + fn remove_file<'a>(&'a self, path: &'a DavPath) -> dav_server::fs::FsFuture<()> { + async move { + self.op + .delete(path.as_url_string().as_str()) + .await + .map_err(convert_error) + } + .boxed() + } + + fn remove_dir<'a>(&'a self, path: &'a DavPath) -> dav_server::fs::FsFuture<()> { + self.remove_file(path) + } + + fn copy<'a>(&'a self, from: &'a DavPath, to: &'a DavPath) -> dav_server::fs::FsFuture<()> { + async move { + let from_path = from + .as_rel_ospath() + .to_str() + .ok_or(FsError::GeneralFailure)?; + let to_path = to.as_rel_ospath().to_str().ok_or(FsError::GeneralFailure)?; + self.op + .copy(from_path, to_path) + .await + .map_err(convert_error) + } + .boxed() + } + + fn rename<'a>(&'a self, from: &'a DavPath, to: &'a DavPath) -> dav_server::fs::FsFuture<()> { + async move { + let from_path = from + .as_rel_ospath() + .to_str() + .ok_or(FsError::GeneralFailure)?; + let to_path = to.as_rel_ospath().to_str().ok_or(FsError::GeneralFailure)?; + if from.is_collection() { + let _ = self.remove_file(to).await; + } + self.op + .rename(from_path, to_path) + .await + .map_err(convert_error) + } + .boxed() + } +} + +impl OpendalFs { + pub fn new(op: Operator) -> Box { + Box::new(OpendalFs { op }) + } +} + +struct DavStream { + op: Operator, + lister: Lister, +} + +impl Stream for DavStream { + type Item = Box; + + fn poll_next( + self: std::pin::Pin<&mut Self>, + cx: &mut std::task::Context<'_>, + ) -> std::task::Poll> { + let dav_stream = self.get_mut(); + match ready!(dav_stream.lister.poll_next_unpin(cx)) { + Some(entry) => { + let webdav_entry = WebDAVDirEntry::new(entry.unwrap(), dav_stream.op.clone()); + Ready(Some(Box::new(webdav_entry) as Box)) + } + None => Ready(None), + } + } +} + +impl DavStream { + fn new(op: Operator, lister: Lister) -> Self { + DavStream { op, lister } + } +} diff --git a/integrations/dav-server/tests/test.rs b/integrations/dav-server/tests/test.rs new file mode 100644 index 000000000000..0ad79f720762 --- /dev/null +++ b/integrations/dav-server/tests/test.rs @@ -0,0 +1,41 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +use anyhow::Result; +use dav_server::davpath::DavPath; +use dav_server::fs::DavFileSystem; +use dav_server_opendalfs::OpendalFs; +use opendal::services::Fs; +use opendal::Operator; + +#[tokio::test] +async fn test() -> Result<()> { + let mut builder = Fs::default(); + builder.root("/tmp"); + + let op = Operator::new(builder)?.finish(); + + let webdavfs = OpendalFs::new(op); + + let metadata = webdavfs + .metadata(&DavPath::new("/").unwrap()) + .await + .unwrap(); + println!("{}", metadata.is_dir()); + + Ok(()) +}