Skip to content

Commit

Permalink
fix(services/dropbox): Workaround for dropbox limitations for create_…
Browse files Browse the repository at this point in the history
…folder (#3719)

try to fix create dir

Signed-off-by: Xuanwo <[email protected]>
  • Loading branch information
Xuanwo authored Dec 6, 2023
1 parent 876b185 commit 989dedb
Show file tree
Hide file tree
Showing 2 changed files with 216 additions and 237 deletions.
174 changes: 42 additions & 132 deletions core/src/services/dropbox/backend.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,29 +17,17 @@

use std::fmt::Debug;
use std::sync::Arc;
use std::time::Duration;

use async_trait::async_trait;
use backon::ExponentialBuilder;
use backon::Retryable;
use http::StatusCode;
use once_cell::sync::Lazy;
use serde::Deserialize;

use super::core::DropboxCore;
use super::error::parse_error;
use super::core::*;
use super::error::*;
use super::writer::DropboxWriter;
use crate::raw::*;
use crate::services::dropbox::error::DropboxErrorResponse;
use crate::*;

static BACKOFF: Lazy<ExponentialBuilder> = Lazy::new(|| {
ExponentialBuilder::default()
.with_max_delay(Duration::from_secs(10))
.with_max_times(10)
.with_jitter()
});

#[derive(Clone, Debug)]
pub struct DropboxBackend {
pub core: Arc<DropboxCore>,
Expand Down Expand Up @@ -96,18 +84,17 @@ impl Accessor for DropboxBackend {
}
}

let resp = self.core.dropbox_create_folder(path).await?;
let status = resp.status();
match status {
StatusCode::OK => Ok(RpCreateDir::default()),
_ => {
let err = parse_error(resp).await?;
match err.kind() {
ErrorKind::AlreadyExists => Ok(RpCreateDir::default()),
_ => Err(err),
}
}
}
// Dropbox has very, very, very strong limitation on the create_folder requests.
//
// Let's try our best to make sure it won't failed for rate limited issues.
let res = { || self.core.dropbox_create_folder(path) }
.retry(&*BACKOFF)
.when(|e| e.is_temporary())
.await
// Set this error to permanent to avoid retrying.
.map_err(|e| e.set_permanent())?;

Ok(res)
}

async fn read(&self, path: &str, args: OpRead) -> Result<(RpRead, Self::Reader)> {
Expand Down Expand Up @@ -199,115 +186,38 @@ impl Accessor for DropboxBackend {
let paths = ops.into_iter().map(|(p, _)| p).collect::<Vec<_>>();

let resp = self.core.dropbox_delete_batch(paths).await?;
if resp.status() != StatusCode::OK {
return Err(parse_error(resp).await?);
}

let status = resp.status();

match status {
StatusCode::OK => {
let (_parts, body) = resp.into_parts();
let bs = body.bytes().await?;
let decoded_response = serde_json::from_slice::<DropboxDeleteBatchResponse>(&bs)
.map_err(new_json_deserialize_error)?;

match decoded_response.tag.as_str() {
"complete" => {
let entries = decoded_response.entries.unwrap_or_default();
let results = self.core.handle_batch_delete_complete_result(entries);
Ok(RpBatch::new(results))
}
"async_job_id" => {
let job_id = decoded_response
.async_job_id
.expect("async_job_id should be present");
let res = { || self.core.dropbox_delete_batch_check(job_id.clone()) }
.retry(&*BACKOFF)
.when(|e| e.is_temporary())
.await?;
let bs = resp.into_body().bytes().await?;
let decoded_response = serde_json::from_slice::<DropboxDeleteBatchResponse>(&bs)
.map_err(new_json_deserialize_error)?;

Ok(res)
}
_ => Err(Error::new(
ErrorKind::Unexpected,
&format!(
"delete batch failed with unexpected tag {}",
decoded_response.tag
),
)),
}
match decoded_response.tag.as_str() {
"complete" => {
let entries = decoded_response.entries.unwrap_or_default();
let results = self.core.handle_batch_delete_complete_result(entries);
Ok(RpBatch::new(results))
}
_ => Err(parse_error(resp).await?),
"async_job_id" => {
let job_id = decoded_response
.async_job_id
.expect("async_job_id should be present");
let res = { || self.core.dropbox_delete_batch_check(job_id.clone()) }
.retry(&*BACKOFF)
.when(|e| e.is_temporary())
.await?;

Ok(res)
}
_ => Err(Error::new(
ErrorKind::Unexpected,
&format!(
"delete batch failed with unexpected tag {}",
decoded_response.tag
),
)),
}
}
}

#[derive(Default, Debug, Deserialize)]
#[serde(default)]
pub struct DropboxMetadataResponse {
#[serde(rename(deserialize = ".tag"))]
pub tag: String,
pub client_modified: String,
pub content_hash: Option<String>,
pub file_lock_info: Option<DropboxMetadataFileLockInfo>,
pub has_explicit_shared_members: Option<bool>,
pub id: String,
pub is_downloadable: Option<bool>,
pub name: String,
pub path_display: String,
pub path_lower: String,
pub property_groups: Option<Vec<DropboxMetadataPropertyGroup>>,
pub rev: Option<String>,
pub server_modified: Option<String>,
pub sharing_info: Option<DropboxMetadataSharingInfo>,
pub size: Option<u64>,
}

#[derive(Default, Debug, Deserialize)]
#[serde(default)]
pub struct DropboxMetadataFileLockInfo {
pub created: Option<String>,
pub is_lockholder: bool,
pub lockholder_name: Option<String>,
}

#[derive(Default, Debug, Deserialize)]
#[serde(default)]
pub struct DropboxMetadataPropertyGroup {
pub fields: Vec<DropboxMetadataPropertyGroupField>,
pub template_id: String,
}

#[derive(Default, Debug, Deserialize)]
#[serde(default)]
pub struct DropboxMetadataPropertyGroupField {
pub name: String,
pub value: String,
}

#[derive(Default, Debug, Deserialize)]
#[serde(default)]
pub struct DropboxMetadataSharingInfo {
pub modified_by: Option<String>,
pub parent_shared_folder_id: Option<String>,
pub read_only: Option<bool>,
pub shared_folder_id: Option<String>,
pub traverse_only: Option<bool>,
pub no_access: Option<bool>,
}

#[derive(Default, Debug, Deserialize)]
#[serde(default)]
pub struct DropboxDeleteBatchResponse {
#[serde(rename(deserialize = ".tag"))]
pub tag: String,
pub async_job_id: Option<String>,
pub entries: Option<Vec<DropboxDeleteBatchResponseEntry>>,
}

#[derive(Default, Debug, Deserialize)]
#[serde(default)]
pub struct DropboxDeleteBatchResponseEntry {
#[serde(rename(deserialize = ".tag"))]
pub tag: String,
pub metadata: Option<DropboxMetadataResponse>,
pub error: Option<DropboxErrorResponse>,
}
Loading

0 comments on commit 989dedb

Please sign in to comment.