diff --git a/core/Cargo.lock b/core/Cargo.lock index fbaaf3f2a8a..a3f52e44e23 100644 --- a/core/Cargo.lock +++ b/core/Cargo.lock @@ -17,6 +17,17 @@ version = "1.0.2" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "f26201604c87b1e01bd3d98f8d5d9a8fcbb815e8cedb41ffccbeb4bf593a35fe" +[[package]] +name = "aes" +version = "0.8.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "ac1f845298e95f983ff1944b728ae08b8cebab80d684f0a832ed0fc74dfa27e2" +dependencies = [ + "cfg-if", + "cipher", + "cpufeatures", +] + [[package]] name = "ahash" version = "0.7.7" @@ -1055,6 +1066,15 @@ dependencies = [ "generic-array", ] +[[package]] +name = "block-padding" +version = "0.3.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "a8894febbff9f758034a5b8e12d87918f56dfc64a8e1fe757d65e29041538d93" +dependencies = [ + "generic-array", +] + [[package]] name = "blocking" version = "1.5.1" @@ -1253,6 +1273,15 @@ version = "0.3.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "37b2a672a2cb129a2e41c10b1224bb368f9f37a2b16b612598138befd7b37eb5" +[[package]] +name = "cbc" +version = "0.1.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "26b52a9543ae338f279b96b0b9fed9c8093744685043739079ce85cd58f289a6" +dependencies = [ + "cipher", +] + [[package]] name = "cc" version = "1.0.83" @@ -1325,6 +1354,16 @@ dependencies = [ "half", ] +[[package]] +name = "cipher" +version = "0.4.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "773f3b9af64447d2ce9850330c473515014aa235e6a783b02db81ff39e4a3dad" +dependencies = [ + "crypto-common", + "inout", +] + [[package]] name = "clang-sys" version = "1.7.0" @@ -3050,6 +3089,16 @@ version = "0.2.3" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "64e9829a50b42bb782c1df523f78d332fe371b10c661e78b7a3c34b0198e9fac" +[[package]] +name = "inout" +version = "0.1.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "a0c10553d664a4d0bcff9f4215d0aac67a639cc68ef660840afe309b807bc9f5" +dependencies = [ + "block-padding", + "generic-array", +] + [[package]] name = "instant" version = "0.1.12" @@ -3721,7 +3770,7 @@ dependencies = [ "hmac", "lazy_static", "md-5", - "pbkdf2", + "pbkdf2 0.11.0", "percent-encoding", "rand 0.8.5", "rustc_version_runtime", @@ -4393,6 +4442,16 @@ dependencies = [ "digest", ] +[[package]] +name = "pbkdf2" +version = "0.12.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f8ed6a7761f76e3b9f92dfb0a60a6a6477c61024b775147ff0973a02653abaf2" +dependencies = [ + "digest", + "hmac", +] + [[package]] name = "peeking_take_while" version = "0.1.2" @@ -4532,6 +4591,21 @@ dependencies = [ "spki 0.7.3", ] +[[package]] +name = "pkcs5" +version = "0.7.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e847e2c91a18bfa887dd028ec33f2fe6f25db77db3619024764914affe8b69a6" +dependencies = [ + "aes", + "cbc", + "der 0.7.8", + "pbkdf2 0.12.2", + "scrypt", + "sha2", + "spki 0.7.3", +] + [[package]] name = "pkcs8" version = "0.9.0" @@ -4549,6 +4623,8 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "f950b2377845cebe5cf8b5165cb3cc1a5e0fa5cfa3e1f7f55707d8fd82e0a7b7" dependencies = [ "der 0.7.8", + "pkcs5", + "rand_core 0.6.4", "spki 0.7.3", ] @@ -5259,8 +5335,7 @@ dependencies = [ [[package]] name = "reqsign" version = "0.14.7" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "ed08ac3aa0676637644b1b892202f1ae789c28c15ebfa906128d111ae8086062" +source = "git+https://github.com/xuanwo/reqsign.git#dfd0bd64f91c4a59e01876667cfe298a312e32cf" dependencies = [ "anyhow", "async-trait", @@ -5285,6 +5360,7 @@ dependencies = [ "serde_json", "sha1", "sha2", + "toml", ] [[package]] @@ -5452,6 +5528,7 @@ dependencies = [ "pkcs1", "pkcs8 0.10.2", "rand_core 0.6.4", + "sha2", "signature 2.2.0", "spki 0.7.3", "subtle", @@ -5653,6 +5730,15 @@ version = "1.0.16" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "f98d2aa92eebf49b69786be48e4477826b256916e84a57ff2a4f21923b48eb4c" +[[package]] +name = "salsa20" +version = "0.10.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "97a22f5af31f73a954c10289c93e8a50cc23d971e80ee446f1f6f7137a088213" +dependencies = [ + "cipher", +] + [[package]] name = "same-file" version = "1.0.6" @@ -5698,6 +5784,17 @@ version = "1.2.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "94143f37725109f92c262ed2cf5e59bce7498c01bcc1502d7b9afe439a4e9f49" +[[package]] +name = "scrypt" +version = "0.11.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "0516a385866c09368f0b5bcd1caff3366aace790fcd46e2bb032697bb172fd1f" +dependencies = [ + "pbkdf2 0.12.2", + "salsa20", + "sha2", +] + [[package]] name = "sct" version = "0.7.1" diff --git a/core/Cargo.toml b/core/Cargo.toml index 69d1010d16b..70501cfa788 100644 --- a/core/Cargo.toml +++ b/core/Cargo.toml @@ -51,6 +51,7 @@ default = [ "services-ipmfs", "services-memory", "services-obs", + "services-ocios", "services-oss", "services-s3", "services-webdav", @@ -176,6 +177,11 @@ services-obs = [ "reqsign?/services-huaweicloud", "reqsign?/reqwest_request", ] +services-ocios = [ + "dep:reqsign", + "reqsign?/services-oracle", + "reqsign?/reqwest_request", +] services-onedrive = [] services-oss = [ "dep:reqsign", @@ -279,7 +285,7 @@ sha1 = { version = "0.10.6", optional = true } sha2 = { version = "0.10", optional = true } # For http based services. -reqsign = { version = "0.14.7", default-features = false, optional = true } +reqsign = { git = "https://github.com/xuanwo/reqsign.git", default-features = false, optional = true } # for services-atomic-server atomic_lib = { version = "0.34.5", optional = true } diff --git a/core/src/services/mod.rs b/core/src/services/mod.rs index de5c0cef022..d8abd3d0531 100644 --- a/core/src/services/mod.rs +++ b/core/src/services/mod.rs @@ -224,6 +224,11 @@ pub use onedrive::Onedrive; #[cfg(feature = "services-onedrive")] pub use onedrive::OnedriveConfig; +#[cfg(feature = "services-ocios")] +mod ocios; +#[cfg(feature = "services-ocios")] +pub use ocios::OciOs; + #[cfg(feature = "services-gdrive")] mod gdrive; #[cfg(feature = "services-gdrive")] diff --git a/core/src/services/ocios/backend.rs b/core/src/services/ocios/backend.rs new file mode 100644 index 00000000000..a87510b9b44 --- /dev/null +++ b/core/src/services/ocios/backend.rs @@ -0,0 +1,278 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +use std::collections::HashMap; +use std::fmt::Debug; +use std::fmt::Formatter; +use std::sync::Arc; + +use async_trait::async_trait; +use http::StatusCode; +use log::debug; +use reqsign::OCIAPIKeySigner; +use reqsign::OCILoader; + +use super::core::*; +use super::error::parse_error; +use super::lister::OciOsLister; +use crate::raw::*; +use crate::*; + +/// Oracle Cloud Infrastructure Object Storage support +#[doc = include_str!("docs.md")] +#[derive(Default)] +pub struct OciOsBuilder { + root: Option, + + region: String, + namespace: String, + endpoint: String, + bucket: String, + + http_client: Option, +} + +impl Debug for OciOsBuilder { + fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result { + let mut d = f.debug_struct("Builder"); + d.field("root", &self.root) + .field("region", &self.region) + .field("bucket", &self.bucket) + .field("namespace", &self.namespace) + .field("endpoint", &self.endpoint); + + d.finish_non_exhaustive() + } +} + +impl OciOsBuilder { + /// Set root of this backend. + /// + /// All operations will happen under this root. + pub fn root(&mut self, root: &str) -> &mut Self { + self.root = if root.is_empty() { + None + } else { + Some(root.to_string()) + }; + + self + } + + /// Set bucket name of this backend. + pub fn bucket(&mut self, bucket: &str) -> &mut Self { + self.bucket = bucket.to_string(); + + self + } + + /// Set namespace of this backend. + pub fn namespace(&mut self, ns: &str) -> &mut Self { + self.namespace = ns.to_string(); + + self + } + + /// Specify the http client that used by this service. + /// + /// # Notes + /// + /// This API is part of OpenDAL's Raw API. `HttpClient` could be changed + /// during minor updates. + pub fn http_client(&mut self, client: HttpClient) -> &mut Self { + self.http_client = Some(client); + self + } +} + +impl Builder for OciOsBuilder { + const SCHEME: Scheme = Scheme::OciOs; + type Accessor = OciOsBackend; + + fn from_map(_map: HashMap) -> Self { + OciOsBuilder::default() + } + + fn build(&mut self) -> Result { + debug!("backend build started: {:?}", &self); + + let root = normalize_root(&self.root.clone().unwrap_or_default()); + debug!("backend use root {}", &root); + + let region = match self.region.is_empty() { + false => Ok(&self.region), + true => Err( + Error::new(ErrorKind::ConfigInvalid, "The region is not set") + .with_context("service", Scheme::OciOs), + ), + }?; + + let bucket = match self.bucket.is_empty() { + false => Ok(&self.bucket), + true => Err( + Error::new(ErrorKind::ConfigInvalid, "The bucket is misconfigured") + .with_context("service", Scheme::OciOs), + ), + }?; + + let namespace = match self.namespace.is_empty() { + false => Ok(&self.namespace), + true => Err( + Error::new(ErrorKind::ConfigInvalid, "The namespace is not set") + .with_context("service", Scheme::OciOs), + ), + }?; + + let client = if let Some(client) = self.http_client.take() { + client + } else { + HttpClient::new().map_err(|err| { + err.with_operation("Builder::build") + .with_context("service", Scheme::OciOs) + })? + }; + + let loader = OCILoader::default(); + + let signer = OCIAPIKeySigner::default(); + + debug!("Backend build finished"); + + Ok(OciOsBackend { + core: Arc::new(OciOsCore { + region: region.to_owned(), + root, + bucket: bucket.to_owned(), + namespace: namespace.to_owned(), + signer, + loader, + client, + }), + }) + } +} + +#[derive(Debug, Clone)] +/// Oracle Cloud Infrastructure Object Storage Service backend +pub struct OciOsBackend { + core: Arc, +} + +#[async_trait] +impl Accessor for OciOsBackend { + type Reader = IncomingAsyncBody; + type Lister = oio::PageLister; + type Writer = (); + type BlockingReader = (); + type BlockingWriter = (); + type BlockingLister = (); + + fn info(&self) -> AccessorInfo { + let mut am = AccessorInfo::default(); + am.set_scheme(Scheme::OciOs) + .set_root(&self.core.root) + .set_name(&self.core.bucket) + .set_native_capability(Capability { + stat: true, + stat_with_if_match: true, + stat_with_if_none_match: true, + + read: true, + read_can_next: true, + read_with_range: true, + read_with_if_match: true, + read_with_if_none_match: true, + + write: false, + + delete: false, + copy: false, + + list: true, + list_with_limit: true, + list_with_start_after: true, + list_with_recursive: true, + + presign: false, + presign_stat: true, + presign_read: true, + presign_write: true, + + batch: false, + + ..Default::default() + }); + + am + } + + async fn read(&self, path: &str, args: OpRead) -> Result<(RpRead, Self::Reader)> { + let resp = self + .core + .os_get_object( + path, + args.range(), + args.if_match(), + args.if_none_match(), + args.override_content_disposition(), + ) + .await?; + + let status = resp.status(); + + match status { + StatusCode::OK | StatusCode::PARTIAL_CONTENT => { + let size = parse_content_length(resp.headers())?; + let range = parse_content_range(resp.headers())?; + Ok(( + RpRead::new().with_size(size).with_range(range), + resp.into_body(), + )) + } + StatusCode::RANGE_NOT_SATISFIABLE => { + resp.into_body().consume().await?; + Ok((RpRead::new().with_size(Some(0)), IncomingAsyncBody::empty())) + } + _ => Err(parse_error(resp).await?), + } + } + + async fn stat(&self, path: &str, args: OpStat) -> Result { + let resp = self + .core + .os_head_object(path, args.if_match(), args.if_none_match()) + .await?; + + let status = resp.status(); + + match status { + StatusCode::OK => parse_into_metadata(path, resp.headers()).map(RpStat::new), + _ => Err(parse_error(resp).await?), + } + } + + async fn list(&self, path: &str, args: OpList) -> Result<(RpList, Self::Lister)> { + let l = OciOsLister::new( + self.core.clone(), + path, + args.recursive(), + args.limit(), + args.start_after(), + ); + Ok((RpList::default(), oio::PageLister::new(l))) + } +} diff --git a/core/src/services/ocios/core.rs b/core/src/services/ocios/core.rs new file mode 100644 index 00000000000..a7fe250c771 --- /dev/null +++ b/core/src/services/ocios/core.rs @@ -0,0 +1,309 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +use std::fmt::Debug; +use std::fmt::Formatter; + +use http::Request; +use http::Response; +use reqsign::OCIAPIKeySigner; +use reqsign::OCICredential; +use reqsign::OCILoader; +use serde::Deserialize; +use serde::Serialize; + +use crate::raw::*; +use crate::*; + +mod constants { + pub const QUERY_LIST_PREFIXY: &str = "prefix"; +} + +pub struct OciOsCore { + pub region: String, + pub root: String, + pub bucket: String, + pub namespace: String, + + pub client: HttpClient, + pub loader: OCILoader, + pub signer: OCIAPIKeySigner, +} + +impl Debug for OciOsCore { + fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result { + f.debug_struct("Backend") + .field("root", &self.root) + .field("bucket", &self.bucket) + .field("namespace", &self.namespace) + .finish_non_exhaustive() + } +} + +impl OciOsCore { + async fn load_credential(&self) -> Result> { + let cred = self + .loader + .load() + .await + .map_err(new_request_credential_error)?; + + if let Some(cred) = cred { + Ok(Some(cred)) + } else { + // Mark this error as temporary since it could be caused by Aliyun STS. + Err(Error::new( + ErrorKind::PermissionDenied, + "no valid credential found, please check configuration or try again", + ) + .set_temporary()) + } + } + + pub async fn sign(&self, req: &mut Request) -> Result<()> { + let cred = if let Some(cred) = self.load_credential().await? { + cred + } else { + return Ok(()); + }; + + self.signer.sign(req, &cred).map_err(new_request_sign_error) + } + + #[inline] + pub async fn send(&self, req: Request) -> Result> { + self.client.send(req).await + } +} + +impl OciOsCore { + pub fn os_get_object_request( + &self, + path: &str, + _range: BytesRange, + _is_presign: bool, + _if_match: Option<&str>, + _if_none_match: Option<&str>, + _override_content_disposition: Option<&str>, + ) -> Result> { + let p = build_abs_path(&self.root, path); + let url = format!( + "objectstorage.{}.oraclecloud.com/n/{}/b/{}/o/{}", + self.region, + self.namespace, + self.bucket, + percent_encode_path(&p) + ); + + let req = Request::get(&url) + .body(AsyncBody::Empty) + .map_err(new_request_build_error)?; + + Ok(req) + } + + pub fn os_head_object_request( + &self, + path: &str, + _is_presign: bool, + _if_match: Option<&str>, + _if_none_match: Option<&str>, + ) -> Result> { + let p = build_abs_path(&self.root, path); + let url = format!( + "objectstorage.{}.oraclecloud.com/n/{}/b/{}/o/{}", + self.region, + self.namespace, + self.bucket, + percent_encode_path(&p) + ); + + let req = Request::head(&url) + .body(AsyncBody::Empty) + .map_err(new_request_build_error)?; + + Ok(req) + } + + pub fn os_list_object_request( + &self, + path: &str, + _delimiter: &str, + _limit: Option, + _start_after: Option, + ) -> Result> { + let p = build_abs_path(&self.root, path); + let url = format!( + "objectstorage.{}.oraclecloud.com/n/{}/b/{}/o", + self.region, self.namespace, self.bucket, + ); + + // Add query arguments to the URL based on response overrides + let mut query_args = Vec::new(); + query_args.push(format!("{}={}", constants::QUERY_LIST_PREFIXY, p,)); + + let req = Request::get(&url) + .body(AsyncBody::Empty) + .map_err(new_request_build_error)?; + + Ok(req) + } + + pub async fn os_list_object( + &self, + path: &str, + _delimiter: &str, + _limit: Option, + start_after: Option, + ) -> Result> { + let mut req = self.os_list_object_request(path, _delimiter, _limit, start_after)?; + + self.sign(&mut req).await?; + self.send(req).await + } + + pub async fn os_get_object( + &self, + path: &str, + range: BytesRange, + if_match: Option<&str>, + if_none_match: Option<&str>, + override_content_disposition: Option<&str>, + ) -> Result> { + let mut req = self.os_get_object_request( + path, + range, + false, + if_match, + if_none_match, + override_content_disposition, + )?; + self.sign(&mut req).await?; + self.send(req).await + } + + pub async fn os_head_object( + &self, + path: &str, + if_match: Option<&str>, + if_none_match: Option<&str>, + ) -> Result> { + let mut req = self.os_head_object_request(path, false, if_match, if_none_match)?; + + self.sign(&mut req).await?; + self.send(req).await + } +} + +/// Request of DeleteObjects. +#[derive(Default, Debug, Serialize)] +#[serde(default, rename = "Delete", rename_all = "PascalCase")] +pub struct DeleteObjectsRequest { + pub object: Vec, +} + +#[derive(Default, Debug, Serialize)] +#[serde(rename_all = "PascalCase")] +pub struct DeleteObjectsRequestObject { + pub key: String, +} + +/// Result of DeleteObjects. +#[derive(Default, Debug, Deserialize)] +#[serde(default, rename = "DeleteResult", rename_all = "PascalCase")] +pub struct DeleteObjectsResult { + pub deleted: Vec, +} + +#[derive(Default, Debug, Deserialize)] +#[serde(rename_all = "PascalCase")] +pub struct DeleteObjectsResultDeleted { + pub key: String, +} + +#[derive(Default, Debug, Deserialize)] +#[serde(default, rename_all = "PascalCase")] +pub struct DeleteObjectsResultError { + pub code: String, + pub key: String, + pub message: String, +} + +#[derive(Default, Debug, Deserialize)] +#[serde(rename_all = "PascalCase")] +pub struct InitiateMultipartUploadResult { + #[cfg(test)] + pub bucket: String, + #[cfg(test)] + pub key: String, + pub upload_id: String, +} + +#[derive(Clone, Default, Debug, Serialize)] +#[serde(default, rename_all = "PascalCase")] +pub struct MultipartUploadPart { + #[serde(rename = "PartNumber")] + pub part_number: usize, + #[serde(rename = "ETag")] + pub etag: String, +} + +#[derive(Default, Debug, Serialize)] +#[serde(default, rename = "CompleteMultipartUpload", rename_all = "PascalCase")] +pub struct CompleteMultipartUploadRequest { + pub part: Vec, +} + +#[derive(Default, Debug, Deserialize)] +#[serde(rename_all = "PascalCase")] +pub struct CompleteMultipartUploadResult { + pub location: String, + pub bucket: String, + pub key: String, + #[serde(rename = "ETag")] + pub etag: String, +} + +#[derive(Default, Debug, Deserialize)] +#[serde(default, rename_all = "PascalCase")] +pub struct ListObjectsOutput { + pub prefix: String, + pub max_keys: u64, + pub encoding_type: String, + pub is_truncated: bool, + pub common_prefixes: Vec, + pub contents: Vec, + pub key_count: u64, + + pub next_continuation_token: Option, +} + +#[derive(Default, Debug, Deserialize, PartialEq, Eq)] +#[serde(default, rename_all = "PascalCase")] +pub struct ListObjectsOutputContent { + pub key: String, + pub last_modified: String, + #[serde(rename = "ETag")] + pub etag: String, + pub size: u64, +} + +#[derive(Default, Debug, Deserialize)] +#[serde(default, rename_all = "PascalCase")] +pub struct CommonPrefix { + pub prefix: String, +} diff --git a/core/src/services/ocios/docs.md b/core/src/services/ocios/docs.md new file mode 100644 index 00000000000..1209002dba0 --- /dev/null +++ b/core/src/services/ocios/docs.md @@ -0,0 +1,69 @@ +## Capabilities + +This service can be used to: + +- [x] stat +- [x] read +- [ ] write +- [ ] append +- [ ] create_dir +- [ ] delete +- [ ] copy +- [ ] rename +- [x] list +- [ ] scan +- [ ] presign +- [ ] blocking + +# Configuration + +This service will look into the OCI default configuration at `~/.oci/config`, +please follow [OCI Docs](https://docs.oracle.com/en-us/iaas/Content/API/Concepts/sdkconfig.htm#SDK_and_CLI_Configuration_File) to setup your configuration file. + +- `root`: Set the work dir for backend. +- `bucket`: Set the container name for backend. + +Refer to [`OciOsBuilder`]'s public API docs for more information, +and [Oracle IaaS API docs](https://docs.oracle.com/en-us/iaas/api/) for API Detail. + +# Example + +## Via Builder + +```rust +use std::sync::Arc; + +use anyhow::Result; +use opendal::services::OciOs; +use opendal::Operator; + +#[tokio::main] +async fn main() -> Result<()> { + // Create OciOs backend builder. + let mut builder = OciOs::default(); + // Set the root for oci os, all operations will happen under this root. + // + // NOTE: the root must be absolute path. + builder.root("/path/to/dir"); + // Set the bucket name, this is required. + builder.bucket("test"); + // Set the host. + // + // For example: + // - "compat.objectstorage.us-phoenix-1.oraclecloud.com" + builder.host("compat.objectstorage.us-phoenix-1.oraclecloud.com"); + // Set the namespace + builder.namespace("namespace"); + // Set the access_key_id and access_key_secret. + // + // OpenDAL will try load credential from the env. + // If credential not set and no valid credential in env, OpenDAL will + // send request without signing like anonymous user. + builder.access_key_id("access_key_id"); + builder.access_key_secret("access_key_secret"); + + let op: Operator = Operator::new(builder)?.finish(); + + Ok(()) +} +``` diff --git a/core/src/services/ocios/error.rs b/core/src/services/ocios/error.rs new file mode 100644 index 00000000000..ecead41c4f7 --- /dev/null +++ b/core/src/services/ocios/error.rs @@ -0,0 +1,108 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +use bytes::Buf; +use http::Response; +use http::StatusCode; +use quick_xml::de; +use serde::Deserialize; + +use crate::raw::*; +use crate::Error; +use crate::ErrorKind; +use crate::Result; + +/// OssError is the error returned by oss service. +#[derive(Default, Debug, Deserialize)] +#[serde(default, rename_all = "PascalCase")] +struct OssError { + code: String, + message: String, + request_id: String, + host_id: String, +} + +/// Parse error response into Error. +pub async fn parse_error(resp: Response) -> Result { + let (parts, body) = resp.into_parts(); + let bs = body.bytes().await?; + + let (kind, retryable) = match parts.status { + StatusCode::NOT_FOUND => (ErrorKind::NotFound, false), + StatusCode::FORBIDDEN => (ErrorKind::PermissionDenied, false), + StatusCode::PRECONDITION_FAILED | StatusCode::NOT_MODIFIED => { + (ErrorKind::ConditionNotMatch, false) + } + StatusCode::INTERNAL_SERVER_ERROR + | StatusCode::BAD_GATEWAY + | StatusCode::SERVICE_UNAVAILABLE + | StatusCode::GATEWAY_TIMEOUT => (ErrorKind::Unexpected, true), + _ => (ErrorKind::Unexpected, false), + }; + + let message = match de::from_reader::<_, OssError>(bs.clone().reader()) { + Ok(oss_err) => format!("{oss_err:?}"), + Err(_) => String::from_utf8_lossy(&bs).into_owned(), + }; + + let mut err = Error::new(kind, &message); + + err = with_error_response_context(err, parts); + + if retryable { + err = err.set_temporary(); + } + + Ok(err) +} + +#[cfg(test)] +mod tests { + use super::*; + + /// Error response example is from https://www.alibabacloud.com/help/en/object-storage-service/latest/error-responses + #[test] + fn test_parse_error() { + let bs = bytes::Bytes::from( + r#" + + + + AccessDenied + + + Query-string authentication requires the Signature, Expires and OSSAccessKeyId parameters + + + 1D842BC54255**** + + + oss-cn-hangzhou.aliyuncs.com + + +"#, + ); + + let out: OssError = de::from_reader(bs.reader()).expect("must success"); + println!("{out:?}"); + + assert_eq!(out.code, "AccessDenied"); + assert_eq!(out.message, "Query-string authentication requires the Signature, Expires and OSSAccessKeyId parameters"); + assert_eq!(out.request_id, "1D842BC54255****"); + assert_eq!(out.host_id, "oss-cn-hangzhou.aliyuncs.com"); + } +} diff --git a/core/src/services/ocios/lister.rs b/core/src/services/ocios/lister.rs new file mode 100644 index 00000000000..1028cf5dd15 --- /dev/null +++ b/core/src/services/ocios/lister.rs @@ -0,0 +1,117 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +use std::sync::Arc; + +use async_trait::async_trait; +use bytes::Buf; +use quick_xml::de; + +use super::core::*; +use super::error::parse_error; +use crate::raw::*; +use crate::*; + +pub struct OciOsLister { + core: Arc, + + path: String, + delimiter: &'static str, + limit: Option, + /// Filter results to objects whose names are lexicographically + /// **equal to or after** startOffset + start_after: Option, +} + +impl OciOsLister { + pub fn new( + core: Arc, + path: &str, + recursive: bool, + limit: Option, + start_after: Option<&str>, + ) -> Self { + let delimiter = if recursive { "" } else { "/" }; + Self { + core, + path: path.to_string(), + delimiter, + limit, + start_after: start_after.map(String::from), + } + } +} + +#[async_trait] +impl oio::PageList for OciOsLister { + async fn next_page(&self, ctx: &mut oio::PageContext) -> Result<()> { + let resp = self + .core + .os_list_object( + &self.path, + self.delimiter, + self.limit, + if ctx.token.is_empty() { + self.start_after.clone() + } else { + None + }, + ) + .await?; + + if resp.status() != http::StatusCode::OK { + return Err(parse_error(resp).await?); + } + + let bs = resp.into_body().bytes().await?; + + let output: ListObjectsOutput = de::from_reader(bs.reader()) + .map_err(|e| Error::new(ErrorKind::Unexpected, "deserialize xml").set_source(e))?; + + ctx.done = !output.is_truncated; + ctx.token = output.next_continuation_token.unwrap_or_default(); + + for prefix in output.common_prefixes { + let de = oio::Entry::new( + &build_rel_path(&self.core.root, &prefix.prefix), + Metadata::new(EntryMode::DIR), + ); + ctx.entries.push_back(de); + } + + for object in output.contents { + let path = build_rel_path(&self.core.root, &object.key); + if path == self.path { + continue; + } + if self.start_after.as_ref() == Some(&path) { + continue; + } + + let mut meta = Metadata::new(EntryMode::from_path(&path)); + meta.set_etag(&object.etag); + meta.set_content_md5(object.etag.trim_matches('"')); + meta.set_content_length(object.size); + meta.set_last_modified(parse_datetime_from_rfc3339(object.last_modified.as_str())?); + + let de = oio::Entry::with(path, meta); + ctx.entries.push_back(de); + } + + Ok(()) + } +} diff --git a/core/src/services/ocios/mod.rs b/core/src/services/ocios/mod.rs new file mode 100644 index 00000000000..1334a3a16b2 --- /dev/null +++ b/core/src/services/ocios/mod.rs @@ -0,0 +1,23 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +mod backend; +pub use backend::OciOsBuilder as OciOs; + +mod core; +mod error; +mod lister; diff --git a/core/src/types/scheme.rs b/core/src/types/scheme.rs index 6620fa81454..ce2b4553322 100644 --- a/core/src/types/scheme.rs +++ b/core/src/types/scheme.rs @@ -109,6 +109,8 @@ pub enum Scheme { Gdrive, /// [dropbox][crate::services::Dropbox]: Dropbox services. Dropbox, + /// [ocios][crate::services::OciOs]: Oracle Cloud Object Storage Services + OciOs, /// [oss][crate::services::Oss]: Aliyun Object Storage Services Oss, /// [persy][crate::services::Persy]: persy backend support. @@ -249,6 +251,8 @@ impl Scheme { Scheme::Postgresql, #[cfg(feature = "services-gdrive")] Scheme::Gdrive, + #[cfg(feature = "services-ocios")] + Scheme::OciOs, #[cfg(feature = "services-oss")] Scheme::Oss, #[cfg(feature = "services-persy")] @@ -433,6 +437,7 @@ impl From for &'static str { Scheme::Swift => "swift", Scheme::VercelArtifacts => "vercel_artifacts", Scheme::VercelBlob => "vercel_blob", + Scheme::OciOs => "ocios", Scheme::Oss => "oss", Scheme::Webdav => "webdav", Scheme::Webhdfs => "webhdfs",