diff --git a/src/bin/migrate-from-gcs.rs b/src/bin/migrate-from-gcs.rs index 2180d7f..cb04657 100644 --- a/src/bin/migrate-from-gcs.rs +++ b/src/bin/migrate-from-gcs.rs @@ -8,7 +8,7 @@ use std::{ time::{Duration, SystemTime}, }; -use backoff::{backoff::Backoff, ExponentialBackoff}; +use backoff::ExponentialBackoff; use bytes::Bytes; use chrono::{DateTime, Datelike, Timelike, Utc}; use flate2::{read::GzDecoder, write::GzEncoder, Compression}; @@ -279,23 +279,24 @@ async fn main() -> anyhow::Result<()> { } let mut encoder = GzEncoder::new(Vec::new(), Compression::default()); encoder.write_all(&bytes).unwrap(); - encoder.finish().unwrap() + let bytes_gz = encoder.finish().unwrap(); + Bytes::from(bytes_gz) }) .await .unwrap(); - - let mut backoff = ExponentialBackoff::default(); - let bytes_gz_shared = Bytes::from(bytes_gz); - while let Err(err) = ovh.put(&path, bytes_gz_shared.clone()).await { - if let Some(wait) = backoff.next_backoff() { - warn!("failed to execute OVH put operation: {}, retrying", err); - tokio::time::sleep(wait).await; - continue; - } - error!("failed to execute OVH put operation: {}", err); - break; - } + let op = || async { + ovh.put(&path, bytes_gz.clone()).await.map_err(|err| { + if err.to_string().contains("409 Conflict") { + warn!("failed to execute OVH put operation: {}, retrying", err); + backoff::Error::Transient { err, retry_after: None } + } else { + error!("failed to execute OVH put operation: {}", err); + backoff::Error::Permanent(err) + } + }) + }; + backoff::future::retry(ExponentialBackoff::default(), op).await?; let payloads_migrated_count = payloads_migrated_counter.fetch_add(payloads_count, std::sync::atomic::Ordering::Relaxed);