diff --git a/Cargo.lock b/Cargo.lock index 522552a..ff76b6d 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -565,7 +565,7 @@ checksum = "f665ee40bc4a3c5590afb1e9677db74a508659dfd71e126420da8274909a0167" [[package]] name = "migrate-payload-archive" -version = "0.1.0" +version = "0.2.0" dependencies = [ "anyhow", "chrono", diff --git a/Cargo.toml b/Cargo.toml index 55a9d6e..e571bc7 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "migrate-payload-archive" -version = "0.1.0" +version = "0.2.0" edition = "2021" [dependencies] @@ -24,7 +24,10 @@ object_store = { version = "0.7.1", default-features = false, features = [ "aws", "gcp", ] } -serde = { version = "1.0.189", default-features = false, features = ["derive", "std"] } +serde = { version = "1.0.189", default-features = false, features = [ + "derive", + "std", +] } serde_json = { version = "1.0.107", default-features = false, features = [ "alloc", "std", diff --git a/src/bin/migrate-from-gcs.rs b/src/bin/migrate-from-gcs.rs index a8c46c1..b6daf52 100644 --- a/src/bin/migrate-from-gcs.rs +++ b/src/bin/migrate-from-gcs.rs @@ -3,39 +3,49 @@ use std::{ fs::File, io::{Read, Write}, path::Path, - time::{Duration, SystemTime, UNIX_EPOCH}, + sync::{atomic::AtomicU64, Arc, Mutex}, + time::{Duration, SystemTime}, }; use chrono::{DateTime, Datelike, Timelike, Utc}; use flate2::{read::GzDecoder, write::GzEncoder, Compression}; -use futures::{FutureExt, TryStreamExt}; +use futures::{channel::mpsc::channel, FutureExt, SinkExt, StreamExt, TryStreamExt}; use lazy_static::lazy_static; use migrate_payload_archive::{env::ENV_CONFIG, log}; use object_store::{ aws::AmazonS3Builder, gcp::GoogleCloudStorageBuilder, ObjectMeta, ObjectStore, RetryConfig, }; use serde::Serialize; -use tokio::time::interval; -use tokio_util::io::StreamReader; -use tracing::{debug, info}; +use tokio::{task::spawn_blocking, time::interval}; +use tokio_util::io::{StreamReader, SyncIoBridge}; +use tracing::{debug, info, trace}; const PROGRESS_FILE_PATH: &str = "progress.json"; -fn read_last_file() -> anyhow::Result { +fn read_progress() -> anyhow::Result> { let progress_file_path = Path::new(PROGRESS_FILE_PATH); if !progress_file_path.exists() { // No progress file found, returning empty string - return Ok(String::new()); + info!("no progress file found"); + return Ok(None); } let mut file = File::open(progress_file_path)?; let mut last_file = String::new(); file.read_to_string(&mut last_file)?; - Ok(last_file) + let mut iter = last_file.split(':'); + let progress = ( + iter.next().unwrap().to_string(), + iter.next().unwrap().to_string(), + ); + info!(last_file = %progress.0, progress_id = %progress.1, "found progress file"); + Ok(Some(progress)) } -fn write_last_file(last_file: &ObjectMeta) -> anyhow::Result<()> { +fn write_progress(last_file: &ObjectMeta, payload_id: &str) -> anyhow::Result<()> { + info!(last_file = %last_file.location, payload_id, "writing progress"); let mut file = File::create(Path::new(PROGRESS_FILE_PATH))?; - file.write_all(last_file.location.to_string().as_bytes())?; + let progress = format!("{}:{}", last_file.location.to_string(), payload_id); + file.write_all(progress.as_bytes())?; Ok(()) } @@ -70,6 +80,7 @@ fn get_ovh_object_store() -> anyhow::Result { let s3_store = AmazonS3Builder::from_env() .with_bucket_name(s3_bucket) .with_retry(RetryConfig::default()) + .with_retry(RetryConfig::default()) .build()?; Ok(s3_store) } @@ -140,99 +151,6 @@ impl fmt::Debug for ExecutionPayload { } } -async fn migrate_bundle( - gcs: &impl ObjectStore, - ovh: &impl ObjectStore, - object: &ObjectMeta, -) -> anyhow::Result<()> { - info!(object = %object.location, size_mib=object.size / 1_000_000, "migrating bundle"); - - // Introduce a counter and a timer - let mut payloads_migrated = 0u64; - let mut timer = interval(Duration::from_secs(10)); - let start_time = SystemTime::now(); - - let payload_stream = gcs.get(&object.location).await?.into_stream(); - let reader = StreamReader::new(payload_stream); - - let (decoded_tx, decoded_rx) = std::sync::mpsc::sync_channel(16); - - let handle = tokio::task::spawn_blocking(move || { - let reader_sync = tokio_util::io::SyncIoBridge::new(reader); - let decoder = GzDecoder::new(reader_sync); - let mut csv_reader = csv::Reader::from_reader(decoder); - let mut iter = csv_reader.byte_records(); - - while let Some(record) = iter.next().transpose().unwrap() { - let execution_payload = { - unsafe { - ExecutionPayload { - block_hash: String::from_utf8_unchecked(record[4].into()), - id: String::from_utf8_unchecked(record[0].into()), - inserted_at: String::from_utf8_unchecked(record[1].into()), - payload: serde_json::from_slice(&record[6]).unwrap(), - proposer_pubkey: String::from_utf8_unchecked(record[3].into()), - slot: std::str::from_utf8_unchecked(&record[2]) - .parse() - .map(Slot) - .unwrap(), - version: String::from_utf8_unchecked(record[5].into()), - } - } - }; - - decoded_tx.send(execution_payload).unwrap(); - } - }); - - while let Ok(payload) = decoded_rx.recv() { - let timestamp_micros = SystemTime::now() - .duration_since(UNIX_EPOCH) - .expect("expect duration since UNIX_EPOCH to be positive regardless of clock shift") - .as_micros(); - - let block_hash = &payload.block_hash; - - let slot = &payload.slot; - let slot_date_time = slot.date_time(); - let year = slot_date_time.year(); - let month = slot_date_time.month(); - let day = slot_date_time.day(); - let hour = slot_date_time.hour(); - let minute = slot_date_time.minute(); - - let path_string = - format!("old_formats/gcs/{year}/{month:02}/{day:02}/{hour:02}/{minute:02}/{slot}/{timestamp_micros}-{block_hash}.json.gz"); - let path = object_store::path::Path::from(path_string); - - let bytes = serde_json::to_vec(&payload).unwrap(); - let bytes_gz = Vec::new(); - let mut encoder = GzEncoder::new(bytes_gz, Compression::default()); - encoder.write_all(&bytes).unwrap(); - let bytes_gz = encoder.finish().unwrap(); - - ovh.put(&path, bytes_gz.into()).await.unwrap(); - - debug!(object = %path, "migrated"); - - payloads_migrated += 1; // Increment the counter for each payload migrated - - // Check if it's time to report the migration rate - if timer.tick().now_or_never().is_some() { - let elapsed = SystemTime::now() - .duration_since(start_time) - .unwrap() - .as_secs(); - - print_migration_rate(payloads_migrated, elapsed); - } - } - - handle.await?; - - Ok(()) -} - #[tokio::main] async fn main() -> anyhow::Result<()> { log::init(); @@ -241,7 +159,7 @@ async fn main() -> anyhow::Result<()> { let gcs = get_gcs_object_store()?; // Initialize OVH object store - let ovh = get_ovh_object_store()?; + let ovh = &get_ovh_object_store()?; debug!("initialized object stores"); debug!("listing day bundles"); @@ -249,9 +167,14 @@ async fn main() -> anyhow::Result<()> { let mut day_bundle_metas = day_bundle_meta_stream.try_collect::>().await?; debug!("found {} day bundles", day_bundle_metas.len()); - let last_file = read_last_file()?; + let progress = read_progress()?; - day_bundle_metas.retain(|file| file.location.to_string() > last_file); + if let Some((last_file, last_id)) = progress.as_ref() { + info!(last_file = %last_file, last_id, "resuming migration"); + day_bundle_metas.retain(|file| file.location.to_string() >= *last_file); + } else { + info!("starting migration from scratch"); + } if day_bundle_metas.is_empty() { info!("no new files to process"); @@ -261,15 +184,134 @@ async fn main() -> anyhow::Result<()> { // Sort the files by name to make sure we process them in order day_bundle_metas.sort_by(|a, b| a.location.to_string().cmp(&b.location.to_string())); - for file in &day_bundle_metas { - migrate_bundle(&gcs, &ovh, file).await?; + for object_meta in &day_bundle_metas { + info!(object = %object_meta.location, size_mib=object_meta.size / 1_000_000, "migrating bundle"); + + // Introduce a counter and a timer + let payloads_migrated_counter = &AtomicU64::new(0); + let interval_10_seconds = &Arc::new(Mutex::new(interval(Duration::from_secs(10)))); + let start_time = SystemTime::now(); + + let payload_stream = gcs.get(&object_meta.location).await?.into_stream(); + let reader = StreamReader::new(payload_stream); + + const DECODED_BUFFER_SIZE: usize = 32; + let (mut decoded_tx, decoded_rx) = channel(DECODED_BUFFER_SIZE); + + let handle = spawn_blocking(move || { + let reader_sync = SyncIoBridge::new(reader); + let decoder = GzDecoder::new(reader_sync); + let mut csv_reader = csv::Reader::from_reader(decoder); + let mut iter = csv_reader.byte_records(); + + while let Some(record) = iter.next().transpose().unwrap() { + let execution_payload = { + unsafe { + ExecutionPayload { + block_hash: String::from_utf8_unchecked(record[4].into()), + id: String::from_utf8_unchecked(record[0].into()), + inserted_at: String::from_utf8_unchecked(record[1].into()), + payload: serde_json::from_slice(&record[6]).unwrap(), + proposer_pubkey: String::from_utf8_unchecked(record[3].into()), + slot: std::str::from_utf8_unchecked(&record[2]) + .parse() + .map(Slot) + .unwrap(), + version: String::from_utf8_unchecked(record[5].into()), + } + } + }; + + futures::executor::block_on(decoded_tx.send(execution_payload)).unwrap(); + } + }); + + const CONCURRENT_PUT_LIMIT: usize = 1; + + // Skip payloads that have already been processed. + decoded_rx + .skip_while(|payload| { + match progress.as_ref() { + // If there was previous progress + Some((_last_file, last_id)) => { + // And the current payload matches our last progress, process remaining payloads in + // the stream. + if payload.id == *last_id { + debug!(payload_id = %payload.id, "found last processed payload"); + futures::future::ready(false) + } else { + // Otherwise, skip this one. + trace!(payload_id = %payload.id, "skipping payload"); + futures::future::ready(true) + } + } + // If there was no previous progress (first run), process all payloads in the stream. + None => futures::future::ready(false), + } + }) + .map(Ok) + .try_for_each_concurrent(CONCURRENT_PUT_LIMIT, |payload| async move { + let block_hash = payload.block_hash.clone(); + let payload_id = payload.id.clone(); + + debug!(block_hash, payload_id, "storing payload"); + + let slot = &payload.slot; + let slot_date_time = slot.date_time(); + let year = slot_date_time.year(); + let month = slot_date_time.month(); + let day = slot_date_time.day(); + let hour = slot_date_time.hour(); + let minute = slot_date_time.minute(); + + let path_string = + format!("old_formats/gcs/{year}/{month:02}/{day:02}/{hour:02}/{minute:02}/{slot}/{payload_id}-{block_hash}.json.gz"); + let path = object_store::path::Path::from(path_string); + + let payload_id = payload.id.clone(); + + let bytes_gz = spawn_blocking(move || { + let bytes = serde_json::to_vec(&payload).unwrap(); + let mut encoder = GzEncoder::new(Vec::new(), Compression::default()); + encoder.write_all(&bytes).unwrap(); + encoder.finish().unwrap() + }) + .await + .unwrap(); + + ovh.put(&path, bytes_gz.into()).await.unwrap(); + + let payloads_migrated_count = payloads_migrated_counter.fetch_add(1, std::sync::atomic::Ordering::Relaxed); + + // Check if it's time to report the migration rate + if interval_10_seconds.lock().unwrap().tick().now_or_never().is_some() { + let elapsed = SystemTime::now() + .duration_since(start_time) + .unwrap() + .as_secs(); + + print_migration_rate(payloads_migrated_count, elapsed); + + // As we process concurrently on a sudden shut down, we may lose payloads we + // processed before this one by skipping over them when we resume. + write_progress(&object_meta, &payload_id)?; + } + - write_last_file(file)?; + debug!(block_hash, payload_id, "payload stored"); + + Ok::<_, anyhow::Error>(()) + }) + .await?; + + handle.await?; } // Migration complete, clean up the progress file - if last_file == day_bundle_metas.last().unwrap().location.to_string() { - cleanup_last_file()?; + if let Some((last_file, _row)) = progress { + if last_file == day_bundle_metas.last().unwrap().location.to_string() { + cleanup_last_file()?; + } } Ok(())