From 398d17d45b12a2f01a682d938f8423d023db4324 Mon Sep 17 00:00:00 2001 From: Xuanwo Date: Wed, 17 Jan 2024 17:58:52 +0800 Subject: [PATCH] feat(services/gdrive): Use trash instead of permanently deletes (#4002) * Refactor config Signed-off-by: Xuanwo * Use trash instead of delete Signed-off-by: Xuanwo * Fix CI Signed-off-by: Xuanwo * Fix rename Signed-off-by: Xuanwo * Fix copy Signed-off-by: Xuanwo * Fix typo Signed-off-by: Xuanwo * retry 405 errors Signed-off-by: Xuanwo * Ignore test for gdrive Signed-off-by: Xuanwo --------- Signed-off-by: Xuanwo --- core/src/services/gdrive/backend.rs | 12 ++--- core/src/services/gdrive/builder.rs | 74 +++++++++++++++++++---------- core/src/services/gdrive/core.rs | 11 +++-- core/src/services/gdrive/error.rs | 4 +- core/tests/behavior/async_delete.rs | 5 ++ core/tests/behavior/async_list.rs | 6 +++ 6 files changed, 77 insertions(+), 35 deletions(-) diff --git a/core/src/services/gdrive/backend.rs b/core/src/services/gdrive/backend.rs index ed9ed8354ea5..671fa79e1c08 100644 --- a/core/src/services/gdrive/backend.rs +++ b/core/src/services/gdrive/backend.rs @@ -148,9 +148,9 @@ impl Accessor for GdriveBackend { return Ok(RpDelete::default()); }; - let resp = self.core.gdrive_delete(&file_id).await?; + let resp = self.core.gdrive_trash(&file_id).await?; let status = resp.status(); - if status != StatusCode::NO_CONTENT && status != StatusCode::NOT_FOUND { + if status != StatusCode::OK { return Err(parse_error(resp).await?); } @@ -183,9 +183,9 @@ impl Accessor for GdriveBackend { // copy will overwrite `to`, delete it if exist if let Some(id) = self.core.path_cache.get(&to_path).await? { - let resp = self.core.gdrive_delete(&id).await?; + let resp = self.core.gdrive_trash(&id).await?; let status = resp.status(); - if status != StatusCode::NO_CONTENT && status != StatusCode::NOT_FOUND { + if status != StatusCode::OK { return Err(parse_error(resp).await?); } @@ -223,9 +223,9 @@ impl Accessor for GdriveBackend { // rename will overwrite `to`, delete it if exist if let Some(id) = self.core.path_cache.get(&target).await? { - let resp = self.core.gdrive_delete(&id).await?; + let resp = self.core.gdrive_trash(&id).await?; let status = resp.status(); - if status != StatusCode::NO_CONTENT && status != StatusCode::NOT_FOUND { + if status != StatusCode::OK { return Err(parse_error(resp).await?); } diff --git a/core/src/services/gdrive/builder.rs b/core/src/services/gdrive/builder.rs index f3ca9b1bc496..bcb495d8f703 100644 --- a/core/src/services/gdrive/builder.rs +++ b/core/src/services/gdrive/builder.rs @@ -23,41 +23,63 @@ use std::sync::Arc; use chrono::DateTime; use chrono::Utc; use log::debug; +use serde::Deserialize; use tokio::sync::Mutex; use super::backend::GdriveBackend; -use crate::raw::HttpClient; use crate::raw::{normalize_root, PathCacher}; +use crate::raw::{ConfigDeserializer, HttpClient}; use crate::services::gdrive::core::GdriveSigner; use crate::services::gdrive::core::{GdriveCore, GdrivePathQuery}; use crate::Scheme; use crate::*; +/// [GoogleDrive](https://drive.google.com/) configuration. +#[derive(Default, Deserialize)] +#[serde(default)] +#[non_exhaustive] +pub struct GdriveConfig { + /// The root for gdrive + pub root: Option, + /// Access token for gdrive. + pub access_token: Option, + /// Refresh token for gdrive. + pub refresh_token: Option, + /// Client id for gdrive. + pub client_id: Option, + /// Client secret for gdrive. + pub client_secret: Option, +} + +impl Debug for GdriveConfig { + fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result { + f.debug_struct("GdriveConfig") + .field("root", &self.root) + .finish_non_exhaustive() + } +} + /// [GoogleDrive](https://drive.google.com/) backend support. #[derive(Default)] #[doc = include_str!("docs.md")] pub struct GdriveBuilder { - root: Option, - - access_token: Option, - - refresh_token: Option, - client_id: Option, - client_secret: Option, + config: GdriveConfig, http_client: Option, } impl Debug for GdriveBuilder { fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result { - f.debug_struct("Backend").field("root", &self.root).finish() + f.debug_struct("Backend") + .field("config", &self.config) + .finish() } } impl GdriveBuilder { /// Set root path of GoogleDrive folder. pub fn root(&mut self, root: &str) -> &mut Self { - self.root = Some(root.to_string()); + self.config.root = Some(root.to_string()); self } @@ -72,7 +94,7 @@ impl GdriveBuilder { /// - If you want to use the access token for a long time, /// you can use the refresh token to get a new access token. pub fn access_token(&mut self, access_token: &str) -> &mut Self { - self.access_token = Some(access_token.to_string()); + self.config.access_token = Some(access_token.to_string()); self } @@ -82,7 +104,7 @@ impl GdriveBuilder { /// /// OpenDAL will use this refresh token to get a new access token when the old one is expired. pub fn refresh_token(&mut self, refresh_token: &str) -> &mut Self { - self.refresh_token = Some(refresh_token.to_string()); + self.config.refresh_token = Some(refresh_token.to_string()); self } @@ -90,7 +112,7 @@ impl GdriveBuilder { /// /// This is required for OAuth 2.0 Flow to refresh the access token. pub fn client_id(&mut self, client_id: &str) -> &mut Self { - self.client_id = Some(client_id.to_string()); + self.config.client_id = Some(client_id.to_string()); self } @@ -98,7 +120,7 @@ impl GdriveBuilder { /// /// This is required for OAuth 2.0 Flow with refresh the access token. pub fn client_secret(&mut self, client_secret: &str) -> &mut Self { - self.client_secret = Some(client_secret.to_string()); + self.config.client_secret = Some(client_secret.to_string()); self } @@ -120,19 +142,18 @@ impl Builder for GdriveBuilder { type Accessor = GdriveBackend; fn from_map(map: HashMap) -> Self { - let mut builder = Self::default(); + let config = GdriveConfig::deserialize(ConfigDeserializer::new(map)) + .expect("config deserialize must succeed"); - map.get("root").map(|v| builder.root(v)); - map.get("access_token").map(|v| builder.access_token(v)); - map.get("refresh_token").map(|v| builder.refresh_token(v)); - map.get("client_id").map(|v| builder.client_id(v)); - map.get("client_secret").map(|v| builder.client_secret(v)); + Self { + config, - builder + http_client: None, + } } fn build(&mut self) -> Result { - let root = normalize_root(&self.root.take().unwrap_or_default()); + let root = normalize_root(&self.config.root.take().unwrap_or_default()); debug!("backend use root {}", root); let client = if let Some(client) = self.http_client.take() { @@ -145,21 +166,24 @@ impl Builder for GdriveBuilder { }; let mut signer = GdriveSigner::new(client.clone()); - match (self.access_token.take(), self.refresh_token.take()) { + match ( + self.config.access_token.take(), + self.config.refresh_token.take(), + ) { (Some(access_token), None) => { signer.access_token = access_token; // We will never expire user specified access token. signer.expires_in = DateTime::::MAX_UTC; } (None, Some(refresh_token)) => { - let client_id = self.client_id.take().ok_or_else(|| { + let client_id = self.config.client_id.take().ok_or_else(|| { Error::new( ErrorKind::ConfigInvalid, "client_id must be set when refresh_token is set", ) .with_context("service", Scheme::Gdrive) })?; - let client_secret = self.client_secret.take().ok_or_else(|| { + let client_secret = self.config.client_secret.take().ok_or_else(|| { Error::new( ErrorKind::ConfigInvalid, "client_secret must be set when refresh_token is set", diff --git a/core/src/services/gdrive/core.rs b/core/src/services/gdrive/core.rs index 73685b863208..96ad5f1e11e2 100644 --- a/core/src/services/gdrive/core.rs +++ b/core/src/services/gdrive/core.rs @@ -161,11 +161,16 @@ impl GdriveCore { self.client.send(req).await } - pub async fn gdrive_delete(&self, file_id: &str) -> Result> { + pub async fn gdrive_trash(&self, file_id: &str) -> Result> { let url = format!("https://www.googleapis.com/drive/v3/files/{}", file_id); - let mut req = Request::delete(&url) - .body(AsyncBody::Empty) + let body = serde_json::to_vec(&json!({ + "trashed": true + })) + .map_err(new_json_serialize_error)?; + + let mut req = Request::patch(&url) + .body(AsyncBody::Bytes(Bytes::from(body))) .map_err(new_request_build_error)?; self.sign(&mut req).await?; diff --git a/core/src/services/gdrive/error.rs b/core/src/services/gdrive/error.rs index 9a1360fa3bc2..8b7ad781aed7 100644 --- a/core/src/services/gdrive/error.rs +++ b/core/src/services/gdrive/error.rs @@ -45,7 +45,9 @@ pub async fn parse_error(resp: Response) -> Result { StatusCode::INTERNAL_SERVER_ERROR | StatusCode::BAD_GATEWAY | StatusCode::SERVICE_UNAVAILABLE - | StatusCode::GATEWAY_TIMEOUT => (ErrorKind::Unexpected, true), + | StatusCode::GATEWAY_TIMEOUT + // Gdrive sometimes return METHOD_NOT_ALLOWED for our requests for abuse detection. + | StatusCode::METHOD_NOT_ALLOWED => (ErrorKind::Unexpected, true), _ => (ErrorKind::Unexpected, false), }; diff --git a/core/tests/behavior/async_delete.rs b/core/tests/behavior/async_delete.rs index 11e5c2ca4868..3efb3ad436ac 100644 --- a/core/tests/behavior/async_delete.rs +++ b/core/tests/behavior/async_delete.rs @@ -131,6 +131,11 @@ pub async fn test_delete_stream(op: Operator) -> Result<()> { if !op.info().full_capability().create_dir { return Ok(()); } + // Gdrive think that this test is an abuse of their service and redirect us + // to an infinite loop. Let's ignore this test for gdrive. + if op.info().scheme() == Scheme::Gdrive { + return Ok(()); + } let dir = uuid::Uuid::new_v4().to_string(); op.create_dir(&format!("{dir}/")) diff --git a/core/tests/behavior/async_list.rs b/core/tests/behavior/async_list.rs index ffb64e0058c9..9358ec4f0492 100644 --- a/core/tests/behavior/async_list.rs +++ b/core/tests/behavior/async_list.rs @@ -197,6 +197,12 @@ pub async fn test_list_prefix(op: Operator) -> Result<()> { /// listing a directory, which contains more objects than a single page can take. pub async fn test_list_rich_dir(op: Operator) -> Result<()> { + // Gdrive think that this test is an abuse of their service and redirect us + // to an infinite loop. Let's ignore this test for gdrive. + if op.info().scheme() == Scheme::Gdrive { + return Ok(()); + } + op.create_dir("test_list_rich_dir/").await?; let mut expected: Vec = (0..=100)