diff --git a/.github/services/pcloud/pcloud/action.yml b/.github/services/pcloud/pcloud/action.yml new file mode 100644 index 000000000000..0d01853d1de3 --- /dev/null +++ b/.github/services/pcloud/pcloud/action.yml @@ -0,0 +1,32 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. + +name: pcloud +description: 'Behavior test for Pcloud.' + +runs: + using: "composite" + steps: + - name: Setup + uses: 1password/load-secrets-action@v1 + with: + export-env: true + env: + OPENDAL_PCLOUD_ROOT: / + OPENDAL_PCLOUD_ENDPOINT: op://services/pcloud/endpoint + OPENDAL_PCLOUD_USERNAME: op://services/pcloud/username + OPENDAL_PCLOUD_PASSWORD: op://services/pcloud/password diff --git a/bindings/java/Cargo.toml b/bindings/java/Cargo.toml index 9d199342c31b..f91b64e41253 100644 --- a/bindings/java/Cargo.toml +++ b/bindings/java/Cargo.toml @@ -71,6 +71,7 @@ services-all = [ "services-onedrive", "services-persy", "services-postgresql", + "services-pcloud", "services-koofr", "services-mysql", "services-redb", @@ -135,6 +136,7 @@ services-mysql = ["opendal/services-mysql"] services-onedrive = ["opendal/services-onedrive"] services-persy = ["opendal/services-persy"] services-postgresql = ["opendal/services-postgresql"] +services-pcloud = ["opendal/services-pcloud"] services-redb = ["opendal/services-redb"] services-redis = ["opendal/services-redis"] services-rocksdb = ["opendal/services-rocksdb"] diff --git a/bindings/nodejs/Cargo.toml b/bindings/nodejs/Cargo.toml index f9182e3a88e1..afa30bcd0f85 100644 --- a/bindings/nodejs/Cargo.toml +++ b/bindings/nodejs/Cargo.toml @@ -74,6 +74,7 @@ services-all = [ "services-onedrive", "services-persy", "services-postgresql", + "services-pcloud", "services-mysql", "services-redb", "services-redis", @@ -130,6 +131,7 @@ services-mysql = ["opendal/services-mysql"] services-onedrive = ["opendal/services-onedrive"] services-persy = ["opendal/services-persy"] services-postgresql = ["opendal/services-postgresql"] +services-pcloud = ["opendal/services-pcloud"] services-redb = ["opendal/services-redb"] services-redis = ["opendal/services-redis"] services-rocksdb = ["opendal/services-rocksdb"] diff --git a/bindings/python/Cargo.toml b/bindings/python/Cargo.toml index f2145c8ee539..c7ccc6565ef8 100644 --- a/bindings/python/Cargo.toml +++ b/bindings/python/Cargo.toml @@ -68,6 +68,7 @@ services-all = [ "services-onedrive", "services-persy", "services-postgresql", + "services-pcloud", "services-mysql", "services-redb", "services-redis", @@ -132,6 +133,7 @@ services-mysql = ["opendal/services-mysql"] services-onedrive = ["opendal/services-onedrive"] services-persy = ["opendal/services-persy"] services-postgresql = ["opendal/services-postgresql"] +services-pcloud = ["opendal/services-pcloud"] services-redb = ["opendal/services-redb"] services-redis = ["opendal/services-redis"] services-rocksdb = ["opendal/services-rocksdb"] diff --git a/core/Cargo.toml b/core/Cargo.toml index 880f4ff3da1c..b8c4c2ea2fef 100644 --- a/core/Cargo.toml +++ b/core/Cargo.toml @@ -52,6 +52,7 @@ default = [ "services-webdav", "services-webhdfs", "services-azfile", + "services-pcloud" ] # Build test utils or not. diff --git a/core/src/services/pcloud/backend.rs b/core/src/services/pcloud/backend.rs index 8283ae884610..2696704e26aa 100644 --- a/core/src/services/pcloud/backend.rs +++ b/core/src/services/pcloud/backend.rs @@ -27,6 +27,7 @@ use serde::Deserialize; use super::core::*; use super::error::parse_error; +use super::error::parse_result; use super::error::PcloudError; use super::lister::PcloudLister; use super::writer::PcloudWriter; @@ -277,6 +278,9 @@ impl Accessor for PcloudBackend { let resp: StatResponse = serde_json::from_slice(&bs).map_err(new_json_deserialize_error)?; let result = resp.result; + + parse_result(result)?; + if result == 2010 || result == 2055 || result == 2002 { return Err(Error::new(ErrorKind::NotFound, &format!("{resp:?}"))); } @@ -338,9 +342,10 @@ impl Accessor for PcloudBackend { let resp: PcloudError = serde_json::from_slice(&bs).map_err(new_json_deserialize_error)?; let result = resp.result; - - // pCloud returns 2005 or 2009 if the file or folder is not found - if result != 0 && result != 2005 && result != 2009 { + parse_result(result)?; + // pCloud returns 2005 if the folder is not found. + // And 2009 or 2002 if the file is not found. + if result != 0 && result != 2005 && result != 2009 && result != 2002 { return Err(Error::new(ErrorKind::Unexpected, &format!("{resp:?}"))); } @@ -372,6 +377,8 @@ impl Accessor for PcloudBackend { let resp: PcloudError = serde_json::from_slice(&bs).map_err(new_json_deserialize_error)?; let result = resp.result; + + parse_result(result)?; if result == 2009 || result == 2010 || result == 2055 || result == 2002 { return Err(Error::new(ErrorKind::NotFound, &format!("{resp:?}"))); } @@ -402,6 +409,8 @@ impl Accessor for PcloudBackend { let resp: PcloudError = serde_json::from_slice(&bs).map_err(new_json_deserialize_error)?; let result = resp.result; + + parse_result(result)?; if result == 2009 || result == 2010 || result == 2055 || result == 2002 { return Err(Error::new(ErrorKind::NotFound, &format!("{resp:?}"))); } diff --git a/core/src/services/pcloud/core.rs b/core/src/services/pcloud/core.rs index 21d3dfedd780..3fff6abbd846 100644 --- a/core/src/services/pcloud/core.rs +++ b/core/src/services/pcloud/core.rs @@ -19,12 +19,14 @@ use std::fmt::Debug; use std::fmt::Formatter; use bytes::Bytes; +use http::header; use http::Request; use http::Response; use http::StatusCode; use serde::Deserialize; use super::error::parse_error; +use super::error::parse_result; use super::error::PcloudError; use crate::raw::*; use crate::*; @@ -88,6 +90,9 @@ impl PcloudCore { let resp: GetFileLinkResponse = serde_json::from_slice(&bs).map_err(new_json_deserialize_error)?; let result = resp.result; + + parse_result(result)?; + if result == 2010 || result == 2055 || result == 2002 { return Err(Error::new(ErrorKind::NotFound, &format!("{resp:?}"))); } @@ -136,6 +141,9 @@ impl PcloudCore { let resp: PcloudError = serde_json::from_slice(&bs).map_err(new_json_deserialize_error)?; let result = resp.result; + + parse_result(result)?; + if result == 2010 || result == 2055 || result == 2002 { return Err(Error::new(ErrorKind::NotFound, &format!("{resp:?}"))); } @@ -331,28 +339,73 @@ impl PcloudCore { self.send(req).await } - pub async fn upload_file(&self, path: &str, bs: Bytes) -> Result> { + pub async fn upload_file( + &self, + path: &str, + progresshash: &str, + bs: Bytes, + ) -> Result> { let path = build_abs_path(&self.root, path); let (name, path) = (get_basename(&path), get_parent(&path).trim_end_matches('/')); let url = format!( - "{}/uploadfile?path=/{}&filename={}&username={}&password={}", + "{}/uploadfile?path=/{}&progresshash={}&nopartial=1&mtime={}&username={}&password={}", self.endpoint, percent_encode_path(path), - percent_encode_path(name), + percent_encode_path(progresshash), + chrono::Local::now().timestamp(), self.username, self.password ); - let req = Request::put(url); + let req = Request::post(url); + + let file_part = FormDataPart::new("file") + .header( + header::CONTENT_DISPOSITION, + format!("form-data; name=\"file\"; filename=\"{name}\"") + .parse() + .unwrap(), + ) + .content(bs); + + let multipart = Multipart::new().part(file_part); + + let req = multipart.apply(req)?; + + self.send(req).await + } + + pub async fn get_upload_progress(&self, progresshash: &str) -> Result { + let url = format!( + "{}/uploadprogress?progresshash={}&username={}&password={}", + self.endpoint, + percent_encode_path(progresshash), + self.username, + self.password + ); + + let req = Request::get(url); // set body let req = req - .body(AsyncBody::Bytes(bs)) + .body(AsyncBody::Empty) .map_err(new_request_build_error)?; - self.send(req).await + let resp = self.send(req).await?; + + let status = resp.status(); + match status { + StatusCode::OK => { + let bs = resp.into_body().bytes().await?; + let resp: UploadProgressResponse = + serde_json::from_slice(&bs).map_err(new_json_deserialize_error)?; + + Ok(resp) + } + _ => Err(parse_error(resp).await?), + } } pub async fn list_folder(&self, path: &str) -> Result> { @@ -423,14 +476,14 @@ pub(super) fn parse_list_metadata(content: ListMetadata) -> Result { #[derive(Debug, Deserialize)] pub struct GetFileLinkResponse { - pub result: u64, + pub result: u32, pub path: Option, pub hosts: Option>, } #[derive(Debug, Deserialize)] pub struct StatResponse { - pub result: u64, + pub result: u32, pub metadata: Option, } @@ -445,7 +498,7 @@ pub struct StatMetadata { #[derive(Debug, Deserialize)] pub struct ListFolderResponse { - pub result: u64, + pub result: u32, pub metadata: Option, } @@ -459,3 +512,15 @@ pub struct ListMetadata { pub contenttype: Option, pub contents: Option>, } + +#[derive(Debug, Deserialize)] +pub struct UploadFileResponse { + pub result: u32, + pub metadata: Vec, +} + +#[derive(Debug, Deserialize)] +pub struct UploadProgressResponse { + pub result: u32, + pub finished: Option, +} diff --git a/core/src/services/pcloud/error.rs b/core/src/services/pcloud/error.rs index e59bcb672cc8..054859b8e112 100644 --- a/core/src/services/pcloud/error.rs +++ b/core/src/services/pcloud/error.rs @@ -42,16 +42,38 @@ impl Debug for PcloudError { } } +/// Deal with error response result. +pub fn parse_result(result: u32) -> Result<()> { + match result / 1000 { + 4 | 5 => { + let mut err = Error::new(ErrorKind::Unexpected, "Pcloud service returns an error."); + err = err.set_temporary(); + Err(err) + } + _ => Ok(()), + } +} + /// Parse error response into Error. pub async fn parse_error(resp: Response) -> Result { let (parts, body) = resp.into_parts(); + + let (kind, retryable) = match parts.status.as_u16() { + 429 | 500 | 502 | 503 | 504 | 509 => (ErrorKind::Unexpected, true), + _ => (ErrorKind::Unexpected, false), + }; + let bs = body.bytes().await?; let message = String::from_utf8_lossy(&bs).into_owned(); - let mut err = Error::new(ErrorKind::Unexpected, &message); + let mut err = Error::new(kind, &message); err = with_error_response_context(err, parts); + if retryable { + err = err.set_temporary(); + } + Ok(err) } diff --git a/core/src/services/pcloud/lister.rs b/core/src/services/pcloud/lister.rs index e4dd65c38127..7334a7ea4a3e 100644 --- a/core/src/services/pcloud/lister.rs +++ b/core/src/services/pcloud/lister.rs @@ -21,7 +21,7 @@ use async_trait::async_trait; use http::StatusCode; use super::core::*; -use super::error::parse_error; +use super::error::{parse_error, parse_result}; use crate::raw::oio::Entry; use crate::raw::*; use crate::*; @@ -56,6 +56,8 @@ impl oio::PageList for PcloudLister { serde_json::from_slice(&bs).map_err(new_json_deserialize_error)?; let result = resp.result; + parse_result(result)?; + if result == 2005 { ctx.done = true; return Ok(()); diff --git a/core/src/services/pcloud/writer.rs b/core/src/services/pcloud/writer.rs index 108080ff172a..5c23d9e43964 100644 --- a/core/src/services/pcloud/writer.rs +++ b/core/src/services/pcloud/writer.rs @@ -21,8 +21,9 @@ use async_trait::async_trait; use http::StatusCode; use super::core::PcloudCore; +use super::core::UploadFileResponse; use super::error::parse_error; -use super::error::PcloudError; +use super::error::parse_result; use crate::raw::*; use crate::*; @@ -46,24 +47,50 @@ impl oio::OneShotWrite for PcloudWriter { self.core.ensure_dir_exists(&self.path).await?; - let resp = self.core.upload_file(&self.path, bs).await?; + let mut finished = false; - let status = resp.status(); + while !finished { + let progresshash = uuid::Uuid::new_v4().to_string(); - match status { - StatusCode::OK => { - let bs = resp.into_body().bytes().await?; - let resp: PcloudError = - serde_json::from_slice(&bs).map_err(new_json_deserialize_error)?; - let result = resp.result; + let resp = self + .core + .upload_file(&self.path, &progresshash, bs.clone()) + .await?; - if result != 0 { - return Err(Error::new(ErrorKind::Unexpected, &format!("{resp:?}"))); - } + let status = resp.status(); + + match status { + StatusCode::OK => { + let bs = resp.into_body().bytes().await?; + let resp: UploadFileResponse = + serde_json::from_slice(&bs).map_err(new_json_deserialize_error)?; + let result = resp.result; + + parse_result(result)?; + + if result != 0 { + return Err(Error::new(ErrorKind::Unexpected, &format!("{resp:?}"))); + } + + if resp.metadata.len() != 1 { + return Err(Error::new(ErrorKind::Unexpected, &format!("{resp:?}"))); + } - Ok(()) + let resp = self.core.get_upload_progress(&progresshash).await?; + + let result = resp.result; + + if result == 1009 { + continue; + } + + parse_result(result)?; + + finished = true; + } + _ => return Err(parse_error(resp).await?), } - _ => Err(parse_error(resp).await?), } + Ok(()) } }