Skip to content

Commit

Permalink
feat(services/gdrive): Use trash instead of permanent deletion (#4002)
Browse files Browse the repository at this point in the history
* Refactor config

Signed-off-by: Xuanwo <[email protected]>

* Use trash instead of delete

Signed-off-by: Xuanwo <[email protected]>

* Fix CI

Signed-off-by: Xuanwo <[email protected]>

* Fix rename

Signed-off-by: Xuanwo <[email protected]>

* Fix copy

Signed-off-by: Xuanwo <[email protected]>

* Fix typo

Signed-off-by: Xuanwo <[email protected]>

* retry 405 errors

Signed-off-by: Xuanwo <[email protected]>

* Ignore test for gdrive

Signed-off-by: Xuanwo <[email protected]>

---------

Signed-off-by: Xuanwo <[email protected]>
  • Loading branch information
Xuanwo authored Jan 17, 2024
1 parent 746cf43 commit 398d17d
Show file tree
Hide file tree
Showing 6 changed files with 77 additions and 35 deletions.
12 changes: 6 additions & 6 deletions core/src/services/gdrive/backend.rs
Original file line number Diff line number Diff line change
Expand Up @@ -148,9 +148,9 @@ impl Accessor for GdriveBackend {
return Ok(RpDelete::default());
};

let resp = self.core.gdrive_delete(&file_id).await?;
let resp = self.core.gdrive_trash(&file_id).await?;
let status = resp.status();
if status != StatusCode::NO_CONTENT && status != StatusCode::NOT_FOUND {
if status != StatusCode::OK {
return Err(parse_error(resp).await?);
}

Expand Down Expand Up @@ -183,9 +183,9 @@ impl Accessor for GdriveBackend {

// copy will overwrite `to`, so delete it if it exists
if let Some(id) = self.core.path_cache.get(&to_path).await? {
let resp = self.core.gdrive_delete(&id).await?;
let resp = self.core.gdrive_trash(&id).await?;
let status = resp.status();
if status != StatusCode::NO_CONTENT && status != StatusCode::NOT_FOUND {
if status != StatusCode::OK {
return Err(parse_error(resp).await?);
}

Expand Down Expand Up @@ -223,9 +223,9 @@ impl Accessor for GdriveBackend {

// rename will overwrite `to`, so delete it if it exists
if let Some(id) = self.core.path_cache.get(&target).await? {
let resp = self.core.gdrive_delete(&id).await?;
let resp = self.core.gdrive_trash(&id).await?;
let status = resp.status();
if status != StatusCode::NO_CONTENT && status != StatusCode::NOT_FOUND {
if status != StatusCode::OK {
return Err(parse_error(resp).await?);
}

Expand Down
74 changes: 49 additions & 25 deletions core/src/services/gdrive/builder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,41 +23,63 @@ use std::sync::Arc;
use chrono::DateTime;
use chrono::Utc;
use log::debug;
use serde::Deserialize;
use tokio::sync::Mutex;

use super::backend::GdriveBackend;
use crate::raw::HttpClient;
use crate::raw::{normalize_root, PathCacher};
use crate::raw::{ConfigDeserializer, HttpClient};
use crate::services::gdrive::core::GdriveSigner;
use crate::services::gdrive::core::{GdriveCore, GdrivePathQuery};
use crate::Scheme;
use crate::*;

/// [GoogleDrive](https://drive.google.com/) configuration.
///
/// Every field is optional. Because of `#[serde(default)]`, keys that are
/// missing from the input deserialize to `None` instead of failing.
#[derive(Default, Deserialize)]
#[serde(default)]
#[non_exhaustive]
pub struct GdriveConfig {
/// Root path of the GoogleDrive folder to operate on.
pub root: Option<String>,
/// Access token used to authenticate against GoogleDrive.
pub access_token: Option<String>,
/// Refresh token used to obtain a new access token.
pub refresh_token: Option<String>,
/// OAuth 2.0 client id; needed when `refresh_token` is set.
pub client_id: Option<String>,
/// OAuth 2.0 client secret; needed when `refresh_token` is set.
pub client_secret: Option<String>,
}

impl Debug for GdriveConfig {
    /// Formats the config for diagnostics.
    ///
    /// Only `root` is printed; all other fields are elided and shown as `..`
    /// (presumably to keep credentials out of logs).
    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
        let mut repr = f.debug_struct("GdriveConfig");
        repr.field("root", &self.root);
        repr.finish_non_exhaustive()
    }
}

/// [GoogleDrive](https://drive.google.com/) backend support.
#[derive(Default)]
#[doc = include_str!("docs.md")]
pub struct GdriveBuilder {
root: Option<String>,

access_token: Option<String>,

refresh_token: Option<String>,
client_id: Option<String>,
client_secret: Option<String>,
config: GdriveConfig,

http_client: Option<HttpClient>,
}

impl Debug for GdriveBuilder {
fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
f.debug_struct("Backend").field("root", &self.root).finish()
f.debug_struct("Backend")
.field("config", &self.config)
.finish()
}
}

impl GdriveBuilder {
/// Set root path of GoogleDrive folder.
pub fn root(&mut self, root: &str) -> &mut Self {
self.root = Some(root.to_string());
self.config.root = Some(root.to_string());
self
}

Expand All @@ -72,7 +94,7 @@ impl GdriveBuilder {
/// - If you want to use the access token for a long time,
/// you can use the refresh token to get a new access token.
pub fn access_token(&mut self, access_token: &str) -> &mut Self {
self.access_token = Some(access_token.to_string());
self.config.access_token = Some(access_token.to_string());
self
}

Expand All @@ -82,23 +104,23 @@ impl GdriveBuilder {
///
/// OpenDAL will use this refresh token to get a new access token when the old one is expired.
pub fn refresh_token(&mut self, refresh_token: &str) -> &mut Self {
self.refresh_token = Some(refresh_token.to_string());
self.config.refresh_token = Some(refresh_token.to_string());
self
}

/// Set the client id for GoogleDrive.
///
/// This is required for OAuth 2.0 Flow to refresh the access token.
pub fn client_id(&mut self, client_id: &str) -> &mut Self {
self.client_id = Some(client_id.to_string());
self.config.client_id = Some(client_id.to_string());
self
}

/// Set the client secret for GoogleDrive.
///
/// This is required for OAuth 2.0 Flow to refresh the access token.
pub fn client_secret(&mut self, client_secret: &str) -> &mut Self {
self.client_secret = Some(client_secret.to_string());
self.config.client_secret = Some(client_secret.to_string());
self
}

Expand All @@ -120,19 +142,18 @@ impl Builder for GdriveBuilder {
type Accessor = GdriveBackend;

fn from_map(map: HashMap<String, String>) -> Self {
let mut builder = Self::default();
let config = GdriveConfig::deserialize(ConfigDeserializer::new(map))
.expect("config deserialize must succeed");

map.get("root").map(|v| builder.root(v));
map.get("access_token").map(|v| builder.access_token(v));
map.get("refresh_token").map(|v| builder.refresh_token(v));
map.get("client_id").map(|v| builder.client_id(v));
map.get("client_secret").map(|v| builder.client_secret(v));
Self {
config,

builder
http_client: None,
}
}

fn build(&mut self) -> Result<Self::Accessor> {
let root = normalize_root(&self.root.take().unwrap_or_default());
let root = normalize_root(&self.config.root.take().unwrap_or_default());
debug!("backend use root {}", root);

let client = if let Some(client) = self.http_client.take() {
Expand All @@ -145,21 +166,24 @@ impl Builder for GdriveBuilder {
};

let mut signer = GdriveSigner::new(client.clone());
match (self.access_token.take(), self.refresh_token.take()) {
match (
self.config.access_token.take(),
self.config.refresh_token.take(),
) {
(Some(access_token), None) => {
signer.access_token = access_token;
// We will never expire user specified access token.
signer.expires_in = DateTime::<Utc>::MAX_UTC;
}
(None, Some(refresh_token)) => {
let client_id = self.client_id.take().ok_or_else(|| {
let client_id = self.config.client_id.take().ok_or_else(|| {
Error::new(
ErrorKind::ConfigInvalid,
"client_id must be set when refresh_token is set",
)
.with_context("service", Scheme::Gdrive)
})?;
let client_secret = self.client_secret.take().ok_or_else(|| {
let client_secret = self.config.client_secret.take().ok_or_else(|| {
Error::new(
ErrorKind::ConfigInvalid,
"client_secret must be set when refresh_token is set",
Expand Down
11 changes: 8 additions & 3 deletions core/src/services/gdrive/core.rs
Original file line number Diff line number Diff line change
Expand Up @@ -161,11 +161,16 @@ impl GdriveCore {
self.client.send(req).await
}

pub async fn gdrive_delete(&self, file_id: &str) -> Result<Response<IncomingAsyncBody>> {
pub async fn gdrive_trash(&self, file_id: &str) -> Result<Response<IncomingAsyncBody>> {
let url = format!("https://www.googleapis.com/drive/v3/files/{}", file_id);

let mut req = Request::delete(&url)
.body(AsyncBody::Empty)
let body = serde_json::to_vec(&json!({
"trashed": true
}))
.map_err(new_json_serialize_error)?;

let mut req = Request::patch(&url)
.body(AsyncBody::Bytes(Bytes::from(body)))
.map_err(new_request_build_error)?;

self.sign(&mut req).await?;
Expand Down
4 changes: 3 additions & 1 deletion core/src/services/gdrive/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,9 @@ pub async fn parse_error(resp: Response<IncomingAsyncBody>) -> Result<Error> {
StatusCode::INTERNAL_SERVER_ERROR
| StatusCode::BAD_GATEWAY
| StatusCode::SERVICE_UNAVAILABLE
| StatusCode::GATEWAY_TIMEOUT => (ErrorKind::Unexpected, true),
| StatusCode::GATEWAY_TIMEOUT
// Gdrive sometimes returns METHOD_NOT_ALLOWED for our requests as part of its abuse detection.
| StatusCode::METHOD_NOT_ALLOWED => (ErrorKind::Unexpected, true),
_ => (ErrorKind::Unexpected, false),
};

Expand Down
5 changes: 5 additions & 0 deletions core/tests/behavior/async_delete.rs
Original file line number Diff line number Diff line change
Expand Up @@ -131,6 +131,11 @@ pub async fn test_delete_stream(op: Operator) -> Result<()> {
if !op.info().full_capability().create_dir {
return Ok(());
}
// Gdrive thinks that this test is an abuse of their service and redirects us
// into an infinite loop. Let's skip this test for gdrive.
if op.info().scheme() == Scheme::Gdrive {
return Ok(());
}

let dir = uuid::Uuid::new_v4().to_string();
op.create_dir(&format!("{dir}/"))
Expand Down
6 changes: 6 additions & 0 deletions core/tests/behavior/async_list.rs
Original file line number Diff line number Diff line change
Expand Up @@ -197,6 +197,12 @@ pub async fn test_list_prefix(op: Operator) -> Result<()> {

/// listing a directory, which contains more objects than a single page can take.
pub async fn test_list_rich_dir(op: Operator) -> Result<()> {
// Gdrive thinks that this test is an abuse of their service and redirects us
// into an infinite loop. Let's skip this test for gdrive.
if op.info().scheme() == Scheme::Gdrive {
return Ok(());
}

op.create_dir("test_list_rich_dir/").await?;

let mut expected: Vec<String> = (0..=100)
Expand Down

0 comments on commit 398d17d

Please sign in to comment.