From 6c78aba04a1d44f502c7e6f38bb098f06c5e09be Mon Sep 17 00:00:00 2001
From: sslivkoff
Date: Sat, 21 Oct 2023 18:05:17 -0700
Subject: [PATCH] initial generalized Sink trait

---
 crates/cli/src/parse/args.rs                 | 13 ++---
 crates/cli/src/parse/execution.rs            |  7 ++-
 crates/cli/src/parse/file_output.rs          | 25 ++--------
 crates/cli/src/parse/mod.rs                  |  1 +
 crates/cli/src/parse/sink.rs                 | 10 ++++
 crates/cli/src/run.rs                        |  2 +-
 crates/freeze/src/freeze.rs                  | 47 ++++++++----------
 crates/freeze/src/types/dataframes/export.rs |  3 +-
 crates/freeze/src/types/directory.rs         | 27 ++++++++++
 crates/freeze/src/types/files.rs             | 19 -------
 crates/freeze/src/types/mod.rs               | 12 ++++-
 crates/freeze/src/types/reports.rs           | 36 ++++++--------
 crates/freeze/src/types/schemas.rs           | 21 +++++++-
 crates/freeze/src/types/sinks/bucket_sink.rs | 39 +++++++++++++++
 crates/freeze/src/types/sinks/file_sink.rs   | 52 ++++++++++++++++++++
 crates/freeze/src/types/sinks/mod.rs         | 11 +++++
 crates/freeze/src/types/sinks/sink.rs        | 50 +++++++++++++++++++
 crates/freeze/src/types/summaries.rs         | 27 ++++++----
 18 files changed, 291 insertions(+), 111 deletions(-)
 create mode 100644 crates/cli/src/parse/sink.rs
 create mode 100644 crates/freeze/src/types/directory.rs
 create mode 100644 crates/freeze/src/types/sinks/bucket_sink.rs
 create mode 100644 crates/freeze/src/types/sinks/file_sink.rs
 create mode 100644 crates/freeze/src/types/sinks/mod.rs
 create mode 100644 crates/freeze/src/types/sinks/sink.rs

diff --git a/crates/cli/src/parse/args.rs b/crates/cli/src/parse/args.rs
index f2b03e27..a05b8201 100644
--- a/crates/cli/src/parse/args.rs
+++ b/crates/cli/src/parse/args.rs
@@ -1,20 +1,17 @@
-use std::sync::Arc;
-
-use cryo_freeze::{ExecutionEnv, FileOutput, ParseError, Query, Source};
-
+use super::{execution, query, sink, source};
 use crate::args::Args;
 use clap_cryo::Parser;
-
-use super::{execution, file_output, query, source};
+use cryo_freeze::{ExecutionEnv, ParseError, Query, Sink, Source};
+use std::sync::Arc;
 
 /// parse options for running freeze
 pub async fn parse_args(
     args: &Args,
-) -> Result<(Query, Source, FileOutput, ExecutionEnv), ParseError> {
+) -> Result<(Query, Source, Arc<dyn Sink>, ExecutionEnv), ParseError> {
     let source = source::parse_source(args).await?;
     let query = query::parse_query(args, Arc::clone(&source.fetcher)).await?;
-    let sink = file_output::parse_file_output(args, &source)?;
     let env = execution::parse_execution_env(args, query.n_tasks() as u64)?;
+    let sink = sink::parse_sink(args, &source)?;
     Ok((query, source, sink, env))
 }
diff --git a/crates/cli/src/parse/execution.rs b/crates/cli/src/parse/execution.rs
index 949a6356..e3acb117 100644
--- a/crates/cli/src/parse/execution.rs
+++ b/crates/cli/src/parse/execution.rs
@@ -12,11 +12,16 @@ pub(crate) fn parse_execution_env(args: &Args, n_tasks: u64) -> Result<Execution
         _ => 1,
     };
 
+    let report_dir = match args.report_dir.clone() {
+        Some(report_dir) => Some(report_dir),
+        None => Some(std::path::Path::new(&args.output_dir).join(".cryo/reports")),
+    };
+
     let builder = ExecutionEnvBuilder::new()
         .dry(args.dry)
         .verbose(verbose)
         .report(!args.no_report)
-        .report_dir(args.report_dir.clone())
+        .report_dir(report_dir)
         .args(args_str);
 
     let builder = if !args.no_verbose {
diff --git a/crates/cli/src/parse/file_output.rs b/crates/cli/src/parse/file_output.rs
index f4bca65a..2bf2d515 100644
--- a/crates/cli/src/parse/file_output.rs
+++ b/crates/cli/src/parse/file_output.rs
@@ -62,28 +62,9 @@ pub(crate) fn parse_subdirs(args: &Args) -> Vec<SubDir> {
 pub(crate) fn parse_network_name(args: &Args, chain_id: u64) -> String {
     match &args.network_name {
         Some(name) => name.clone(),
-        None => match chain_id {
-            1 => "ethereum".to_string(),
-            5 => "goerli".to_string(),
-            10 => "optimism".to_string(),
-            56 => "bnb".to_string(),
-            69 => "optimism_kovan".to_string(),
-            100 => "gnosis".to_string(),
-            137 => "polygon".to_string(),
-            420 => "optimism_goerli".to_string(),
-            1101 => "polygon_zkevm".to_string(),
-            1442 => "polygon_zkevm_testnet".to_string(),
-            8453 => "base".to_string(),
-            10200 => "gnosis_chidao".to_string(),
-            17000 => "holesky".to_string(),
-            42161 => "arbitrum".to_string(),
-            42170 => "arbitrum_nova".to_string(),
-            43114 => "avalanche".to_string(),
-            80001 => "polygon_mumbai".to_string(),
-            84531 => "base_goerli".to_string(),
-            7777777 => "zora".to_string(),
-            11155111 => "sepolia".to_string(),
-            chain_id => "network_".to_string() + chain_id.to_string().as_str(),
+        None => match cryo_freeze::get_network_name(chain_id) {
+            Some(name) => name,
+            None => "network_".to_string() + chain_id.to_string().as_str(),
         },
     }
 }
diff --git a/crates/cli/src/parse/mod.rs b/crates/cli/src/parse/mod.rs
index b1c48874..0cd6281c 100644
--- a/crates/cli/src/parse/mod.rs
+++ b/crates/cli/src/parse/mod.rs
@@ -6,6 +6,7 @@ mod parse_utils;
 mod partitions;
 mod query;
 pub(crate) mod schemas;
+mod sink;
 mod source;
 
 pub use args::*;
diff --git a/crates/cli/src/parse/sink.rs b/crates/cli/src/parse/sink.rs
new file mode 100644
index 00000000..9dab4ee1
--- /dev/null
+++ b/crates/cli/src/parse/sink.rs
@@ -0,0 +1,10 @@
+use super::file_output;
+use crate::args::Args;
+use cryo_freeze::{FileSink, ParseError, Sink, Source};
+use std::sync::Arc;
+
+pub(crate) fn parse_sink(args: &Args, source: &Source) -> Result<Arc<dyn Sink>, ParseError> {
+    let file_output = file_output::parse_file_output(args, source)?;
+    let sink = FileSink(file_output);
+    Ok(Arc::new(sink))
+}
diff --git a/crates/cli/src/run.rs b/crates/cli/src/run.rs
index 97f1def8..1e7671ac 100644
--- a/crates/cli/src/run.rs
+++ b/crates/cli/src/run.rs
@@ -51,7 +51,7 @@ pub async fn run(args: args::Args) -> Result<Option<FreezeSummary>, CollectError
     let source = Arc::new(source);
     let env = ExecutionEnv { t_start_parse, ..env };
     let env = env.set_start_time();
-    cryo_freeze::freeze(&query, &source, &sink, &env).await
+    cryo_freeze::freeze(&query, &source, sink, &env).await
 }
 
 async fn handle_help_subcommands(args: args::Args) -> Result<Option<FreezeSummary>, CollectError> {
diff --git a/crates/freeze/src/freeze.rs b/crates/freeze/src/freeze.rs
index 805baa4e..b2de6298 100644
--- a/crates/freeze/src/freeze.rs
+++ b/crates/freeze/src/freeze.rs
@@ -1,12 +1,11 @@
 use crate::{
-    collect_partition, dataframes, err, reports, summaries, CollectError, Datatype, ExecutionEnv,
-    FileOutput, FreezeSummary, MetaDatatype, Partition, Query, Source,
+    collect_partition, err, reports, summaries, CollectError, Datatype, ExecutionEnv,
+    FreezeSummary, MetaDatatype, Partition, Query, Sink, Source,
 };
 use chrono::{DateTime, Local};
 use futures::{stream::FuturesUnordered, StreamExt};
 use std::{
     collections::{HashMap, HashSet},
-    path::PathBuf,
     sync::Arc,
 };
 use tokio::sync::Semaphore;
@@ -14,10 +13,10 @@
 type PartitionPayload = (
     Partition,
     MetaDatatype,
-    HashMap<Datatype, PathBuf>,
+    HashMap<Datatype, String>,
     Arc<Query>,
     Arc<Source>,
-    FileOutput,
+    Arc<dyn Sink>,
     ExecutionEnv,
     Option<Arc<Semaphore>>,
 );
@@ -26,18 +25,18 @@
 pub async fn freeze(
     query: &Query,
     source: &Source,
-    sink: &FileOutput,
+    sink: Arc<dyn Sink>,
     env: &ExecutionEnv,
 ) -> Result<Option<FreezeSummary>, CollectError> {
     // check validity of query
     query.is_valid()?;
 
     // get partitions
-    let (payloads, skipping) = get_payloads(query, source, sink, env)?;
+    let (payloads, skipping) = get_payloads(query, source, &sink, env)?;
 
     // print summary
     if env.verbose >= 1 {
-        summaries::print_cryo_intro(query, source, sink, env, payloads.len() as u64);
+        summaries::print_cryo_intro(query, source, &sink, env, payloads.len() as u64);
     }
 
     // check dry run
@@ -56,7 +55,7 @@ pub async fn freeze(
 
     // create initial report
     if env.report {
-        reports::write_report(env, query, sink, None)?;
+        reports::write_report(env, query, &sink, None)?;
     };
 
     // perform collection
@@ -69,7 +68,7 @@ pub async fn freeze(
 
     // create final report
     if env.report {
-        reports::write_report(env, query, sink, Some(&results))?;
+        reports::write_report(env, query, &sink, Some(&results))?;
     };
 
     // return
@@ -79,7 +78,7 @@ fn get_payloads(
     query: &Query,
     source: &Source,
-    sink: &FileOutput,
+    sink: &Arc<dyn Sink>,
     env: &ExecutionEnv,
 ) -> Result<(Vec<PartitionPayload>, Vec<Partition>), CollectError> {
     let semaphore = source
@@ -89,29 +88,28 @@ fn get_payloads(
     let arc_query = Arc::new(query.clone());
     let mut payloads = Vec::new();
     let mut skipping = Vec::new();
-    let mut all_paths = HashSet::new();
+    let mut all_ids = HashSet::new();
     for datatype in query.datatypes.clone().into_iter() {
         for partition in query.partitions.clone().into_iter() {
-            let paths = sink.get_paths(query, &partition, Some(vec![datatype.clone()]))?;
-            if !sink.overwrite && paths.values().all(|path| path.exists()) {
+            let ids = sink.get_ids(query, &partition, Some(vec![datatype.clone()]))?;
+            if !sink.overwrite() && ids.values().all(|id| sink.exists(id)) {
                 skipping.push(partition);
                 continue
             }
 
             // check for path collisions
-            let paths_set: HashSet<_> = paths.clone().into_values().collect();
-            if paths_set.intersection(&all_paths).next().is_none() {
-                all_paths.extend(paths_set);
+            let ids_set: HashSet<_> = ids.clone().into_values().collect();
+            if ids_set.intersection(&all_ids).next().is_none() {
+                all_ids.extend(ids_set);
             } else {
-                let message =
-                    format!("output path collision: {:?}", paths_set.intersection(&all_paths));
+                let message = format!("output id collision: {:?}", ids_set.intersection(&all_ids));
                 return Err(err(&message))
             };
 
             let payload = (
                 partition.clone(),
                 datatype.clone(),
-                paths,
+                ids,
                 arc_query.clone(),
                 source.clone(),
                 sink.clone(),
@@ -165,7 +163,7 @@ async fn freeze_partitions(
 }
 
 async fn freeze_partition(payload: PartitionPayload) -> Result<(), CollectError> {
-    let (partition, datatype, paths, query, source, sink, env, semaphore) = payload;
+    let (partition, datatype, ids, query, source, sink, env, semaphore) = payload;
 
     // acquire chunk semaphore
     let _permit = match &semaphore {
@@ -178,11 +176,8 @@ async fn freeze_partition(payload: PartitionPayload) -> Result<(), CollectError>
 
     // write dataframes to disk
     for (datatype, mut df) in dfs {
-        let path = paths.get(&datatype).ok_or_else(|| {
-            CollectError::CollectError("could not get path for datatype".to_string())
-        })?;
-        let result = dataframes::df_to_file(&mut df, path, &sink);
-        result.map_err(|_| CollectError::CollectError("error writing file".to_string()))?
+        let id = ids.get(&datatype).ok_or_else(|| err("could not get id for datatype"))?;
+        sink.sink_df(&mut df, id)?;
     }
 
     // update progress bar
diff --git a/crates/freeze/src/types/dataframes/export.rs b/crates/freeze/src/types/dataframes/export.rs
index 1e544988..0d042115 100644
--- a/crates/freeze/src/types/dataframes/export.rs
+++ b/crates/freeze/src/types/dataframes/export.rs
@@ -7,9 +7,10 @@ use crate::types::{FileError, FileOutput};
 /// write polars dataframe to file
 pub(crate) fn df_to_file(
     df: &mut DataFrame,
-    filename: &Path,
+    filename: &str,
     file_output: &FileOutput,
 ) -> Result<(), FileError> {
+    let filename: std::path::PathBuf = filename.into();
     let tmp_filename = filename.with_extension("_tmp");
     let result = match filename.extension().and_then(|ex| ex.to_str()) {
         Some("parquet") => df_to_parquet(df, &tmp_filename, file_output),
diff --git a/crates/freeze/src/types/directory.rs b/crates/freeze/src/types/directory.rs
new file mode 100644
index 00000000..0282d025
--- /dev/null
+++ b/crates/freeze/src/types/directory.rs
@@ -0,0 +1,27 @@
+/// get name of network if known
+pub fn get_network_name(chain_id: u64) -> Option<String> {
+    match chain_id {
+        1 => Some("ethereum"),
+        5 => Some("goerli"),
+        10 => Some("optimism"),
+        56 => Some("bnb"),
+        69 => Some("optimism_kovan"),
+        100 => Some("gnosis"),
+        137 => Some("polygon"),
+        420 => Some("optimism_goerli"),
+        1101 => Some("polygon_zkevm"),
+        1442 => Some("polygon_zkevm_testnet"),
+        8453 => Some("base"),
+        10200 => Some("gnosis_chidao"),
+        17000 => Some("holesky"),
+        42161 => Some("arbitrum"),
+        42170 => Some("arbitrum_nova"),
+        43114 => Some("avalanche"),
+        80001 => Some("polygon_mumbai"),
+        84531 => Some("base_goerli"),
+        7777777 => Some("zora"),
+        11155111 => Some("sepolia"),
+        _ => None,
+    }
+    .map(|x| x.to_string())
+}
diff --git a/crates/freeze/src/types/files.rs b/crates/freeze/src/types/files.rs
index 0dffb25c..47791655 100644
--- a/crates/freeze/src/types/files.rs
+++ b/crates/freeze/src/types/files.rs
@@ -111,22 +111,3 @@ impl FileFormat {
         }
     }
 }
-
-/// Encoding for binary data in a column
-#[derive(Clone, Eq, PartialEq, Debug)]
-pub enum ColumnEncoding {
-    /// Raw binary encoding
-    Binary,
-    /// Hex binary encoding
-    Hex,
-}
-
-impl ColumnEncoding {
-    /// convert ColumnEncoding to str
-    pub fn as_str(&self) -> &'static str {
-        match *self {
-            ColumnEncoding::Binary => "binary",
-            ColumnEncoding::Hex => "hex",
-        }
-    }
-}
diff --git a/crates/freeze/src/types/mod.rs b/crates/freeze/src/types/mod.rs
index d4eb84c3..216cebfb 100644
--- a/crates/freeze/src/types/mod.rs
+++ b/crates/freeze/src/types/mod.rs
@@ -56,9 +56,9 @@ pub use chunks::{
 pub use conversions::{bytes_to_u32, ToVecHex, ToVecU8};
 pub use dataframes::*;
 pub use datatypes::*;
-pub use files::{ColumnEncoding, FileFormat, FileOutput, SubDir};
+pub use files::{FileFormat, FileOutput, SubDir};
 pub use queries::{Query, QueryLabels, TimeDimension};
-pub use schemas::{ColumnType, SchemaFunctions, Schemas, Table, U256Type};
+pub use schemas::{ColumnEncoding, ColumnType, SchemaFunctions, Schemas, Table, U256Type};
 pub use sources::{Fetcher, RateLimiter, Source, SourceLabels};
 // pub(crate) use summaries::FreezeSummaryAgg;
 // pub use summaries::{FreezeChunkSummary, FreezeSummary};
@@ -74,3 +74,11 @@ pub use signatures::*;
 /// decoders
 pub mod decoders;
 pub use decoders::*;
+
+/// sinks
+pub mod sinks;
+pub use sinks::*;
+
+/// directory
+pub mod directory;
+pub use directory::*;
diff --git a/crates/freeze/src/types/reports.rs b/crates/freeze/src/types/reports.rs
index ca6e5736..2b10df84 100644
--- a/crates/freeze/src/types/reports.rs
+++ b/crates/freeze/src/types/reports.rs
@@ -1,10 +1,6 @@
-use crate::{err, CollectError, ExecutionEnv, FileOutput, FreezeSummary, Query};
+use crate::{err, CollectError, ExecutionEnv, FreezeSummary, Query, Sink};
 use chrono::{DateTime, Local};
-use std::{
-    fs::File,
-    io::Write,
-    path::{Path, PathBuf},
-};
+use std::{fs::File, io::Write, path::PathBuf, sync::Arc};
 
 #[derive(serde::Serialize, Debug)]
 struct FreezeReport {
@@ -17,20 +13,18 @@ struct FreezeReport {
 
 #[derive(serde::Serialize, Debug)]
 struct SerializedFreezeSummary {
-    completed_paths: Vec<PathBuf>,
-    errored_paths: Vec<PathBuf>,
+    completed_paths: Vec<String>,
+    errored_paths: Vec<String>,
     n_skipped: u64,
 }
 
 pub(crate) fn get_report_path(
     env: &ExecutionEnv,
-    sink: &FileOutput,
     is_complete: bool,
 ) -> Result<PathBuf, CollectError> {
-    // create directory
-    let report_dir = match &env.report_dir {
-        Some(report_dir) => Path::new(&report_dir).into(),
-        None => Path::new(&sink.output_dir).join(".cryo/reports"),
+    let report_dir = match env.report_dir.clone() {
+        Some(report_dir) => report_dir,
+        None => return Err(err("report dir unspecified")),
     };
     std::fs::create_dir_all(&report_dir)
         .map_err(|_| CollectError::CollectError("could not create report dir".to_string()))?;
@@ -51,7 +45,7 @@ pub(crate) fn get_report_path(
 pub(crate) fn write_report(
     env: &ExecutionEnv,
     query: &Query,
-    sink: &FileOutput,
+    sink: &Arc<dyn Sink>,
     freeze_summary: Option<&FreezeSummary>,
 ) -> Result<PathBuf, CollectError> {
     // determine version
@@ -70,7 +64,7 @@ pub(crate) fn write_report(
         .map_err(|_| CollectError::CollectError("could not serialize report".to_string()))?;
 
     // create path
-    let path = get_report_path(env, sink, freeze_summary.is_some())?;
+    let path = get_report_path(env, freeze_summary.is_some())?;
 
     // save to file
     let mut file = File::create(&path)
@@ -80,7 +74,7 @@ pub(crate) fn write_report(
 
     // delete initial report
     if freeze_summary.is_some() {
-        let incomplete_path = get_report_path(env, sink, false)?;
+        let incomplete_path = get_report_path(env, false)?;
         std::fs::remove_file(incomplete_path)
             .map_err(|_| err("could not delete initial report file"))?;
     }
@@ -91,13 +85,13 @@ pub(crate) fn write_report(
 fn serialize_summary(
     summary: &FreezeSummary,
     query: &Query,
-    sink: &FileOutput,
+    sink: &Arc<dyn Sink>,
 ) -> Result<SerializedFreezeSummary, CollectError> {
-    let completed_paths: Vec<PathBuf> = summary
+    let completed_paths: Vec<String> = summary
         .completed
         .iter()
         .map(|partition| {
-            sink.get_paths(query, partition, None)
+            sink.get_ids(query, partition, None)
                 .map(|paths| paths.values().cloned().collect::<Vec<_>>())
         })
         .collect::<Result<Vec<Vec<_>>, _>>()?
@@ -105,12 +99,12 @@ fn serialize_summary(
         .flatten()
         .collect();
 
-    let errored_paths: Vec<PathBuf> = summary
+    let errored_paths: Vec<String> = summary
         .errored
         .iter()
         .filter_map(|(partition_option, _error)| {
             partition_option.as_ref().map(|partition| {
-                sink.get_paths(query, partition, None)
+                sink.get_ids(query, partition, None)
                     .map(|paths| paths.values().cloned().collect::<Vec<_>>())
             })
         })
diff --git a/crates/freeze/src/types/schemas.rs b/crates/freeze/src/types/schemas.rs
index c0a2be92..d95edc92 100644
--- a/crates/freeze/src/types/schemas.rs
+++ b/crates/freeze/src/types/schemas.rs
@@ -1,7 +1,7 @@
 /// types and functions related to schemas
 use std::collections::{HashMap, HashSet};
 
-use crate::{err, CollectError, ColumnEncoding, Datatype, LogDecoder};
+use crate::{err, CollectError, Datatype, LogDecoder};
 use indexmap::{IndexMap, IndexSet};
 use thiserror::Error;
 
@@ -162,6 +162,25 @@ pub enum SchemaError {
     InvalidColumn,
 }
 
+/// Encoding for binary data in a column
+#[derive(Clone, Eq, PartialEq, Debug)]
+pub enum ColumnEncoding {
+    /// Raw binary encoding
+    Binary,
+    /// Hex binary encoding
+    Hex,
+}
+
+impl ColumnEncoding {
+    /// convert ColumnEncoding to str
+    pub fn as_str(&self) -> &'static str {
+        match *self {
+            ColumnEncoding::Binary => "binary",
+            ColumnEncoding::Hex => "hex",
+        }
+    }
+}
+
 impl Datatype {
     /// get schema for a particular datatype
     #[allow(clippy::too_many_arguments)]
diff --git a/crates/freeze/src/types/sinks/bucket_sink.rs b/crates/freeze/src/types/sinks/bucket_sink.rs
new file mode 100644
index 00000000..708a9f8c
--- /dev/null
+++ b/crates/freeze/src/types/sinks/bucket_sink.rs
@@ -0,0 +1,39 @@
+use crate::*;
+use polars::prelude::*;
+
+/// Bucket sink
+pub struct BucketSink {
+    /// file output
+    pub file_output: FileOutput,
+}
+
+impl Sink for BucketSink {
+    fn exists(&self, _id: &str) -> bool {
+        todo!()
+    }
+
+    fn get_id(
+        &self,
+        _query: &Query,
+        _partition: &Partition,
+        _datatype: Datatype,
+    ) -> Result<String, CollectError> {
+        todo!()
+    }
+
+    fn sink_df(&self, _dataframe: &mut DataFrame, _id: &str) -> Result<(), CollectError> {
+        todo!()
+    }
+
+    fn overwrite(&self) -> bool {
+        todo!()
+    }
+
+    fn output_format(&self) -> String {
+        todo!()
+    }
+
+    fn output_location(&self) -> Result<String, CollectError> {
+        todo!()
+    }
+}
diff --git a/crates/freeze/src/types/sinks/file_sink.rs b/crates/freeze/src/types/sinks/file_sink.rs
new file mode 100644
index 00000000..4e1fe3ed
--- /dev/null
+++ b/crates/freeze/src/types/sinks/file_sink.rs
@@ -0,0 +1,52 @@
+use crate::*;
+use polars::prelude::*;
+
+/// file sink
+pub struct FileSink(pub FileOutput);
+
+impl Sink for FileSink {
+    fn exists(&self, id: &str) -> bool {
+        let path: std::path::PathBuf = id.into();
+        path.exists()
+    }
+
+    fn get_id(
+        &self,
+        query: &Query,
+        partition: &Partition,
+        datatype: Datatype,
+    ) -> Result<String, CollectError> {
+        let FileSink(file_output) = self;
+        file_output
+            .get_path(query, partition, datatype)?
+            .into_os_string()
+            .into_string()
+            .map_err(|_| err("could not convert path to string"))
+    }
+
+    fn sink_df(&self, dataframe: &mut DataFrame, id: &str) -> Result<(), CollectError> {
+        let FileSink(file_output) = self;
+        dataframes::df_to_file(dataframe, id, file_output)
+            .map_err(|_| CollectError::CollectError("error writing file".to_string()))
+    }
+
+    fn overwrite(&self) -> bool {
+        let FileSink(output) = self;
+        output.overwrite
+    }
+
+    fn output_location(&self) -> Result<String, CollectError> {
+        let FileSink(file_output) = self;
+        file_output
+            .output_dir
+            .clone()
+            .into_os_string()
+            .into_string()
+            .map_err(|_| err("could not convert output directory to string"))
+    }
+
+    fn output_format(&self) -> String {
+        let FileSink(file_output) = self;
+        file_output.format.as_str().to_string()
+    }
+}
diff --git a/crates/freeze/src/types/sinks/mod.rs b/crates/freeze/src/types/sinks/mod.rs
new file mode 100644
index 00000000..c144bbf1
--- /dev/null
+++ b/crates/freeze/src/types/sinks/mod.rs
@@ -0,0 +1,11 @@
+/// sink trait
+pub mod sink;
+pub use sink::Sink;
+
+/// file sink
+pub mod file_sink;
+pub use file_sink::FileSink;
+
+/// bucket sink
+pub mod bucket_sink;
+pub use bucket_sink::BucketSink;
diff --git a/crates/freeze/src/types/sinks/sink.rs b/crates/freeze/src/types/sinks/sink.rs
new file mode 100644
index 00000000..10b3f40b
--- /dev/null
+++ b/crates/freeze/src/types/sinks/sink.rs
@@ -0,0 +1,50 @@
+use crate::*;
+use polars::prelude::*;
+use std::collections::HashMap;
+
+/// data sink
+pub trait Sink: Sync + Send {
+    /// check whether data with id exists
+    fn exists(&self, id: &str) -> bool;
+
+    /// get id of data
+    fn get_id(
+        &self,
+        query: &Query,
+        partition: &Partition,
+        datatype: Datatype,
+    ) -> Result<String, CollectError>;
+
+    /// send data to sink
+    fn sink_df(&self, dataframe: &mut DataFrame, id: &str) -> Result<(), CollectError>;
+
+    /// overwrite data if it already exists
+    fn overwrite(&self) -> bool;
+
+    /// String representation of output format (e.g. parquet, csv, json)
+    fn output_format(&self) -> String;
+
+    /// String representation of output location (e.g. root output directory)
+    fn output_location(&self) -> Result<String, CollectError>;
+
+    /// get multiple ids at once
+    fn get_ids(
+        &self,
+        query: &Query,
+        partition: &Partition,
+        meta_datatypes: Option<Vec<MetaDatatype>>,
+    ) -> Result<HashMap<Datatype, String>, CollectError> {
+        let mut ids = HashMap::new();
+        let meta_datatypes = if let Some(meta_datatypes) = meta_datatypes {
+            meta_datatypes
+        } else {
+            query.datatypes.clone()
+        };
+        for meta_datatype in meta_datatypes.iter() {
+            for datatype in meta_datatype.datatypes().into_iter() {
+                ids.insert(datatype, self.get_id(query, partition, datatype)?);
+            }
+        }
+        Ok(ids)
+    }
+}
diff --git a/crates/freeze/src/types/summaries.rs b/crates/freeze/src/types/summaries.rs
index f26ee41b..af5cc464 100644
--- a/crates/freeze/src/types/summaries.rs
+++ b/crates/freeze/src/types/summaries.rs
@@ -2,11 +2,12 @@ use std::collections::HashMap;
 
 use chrono::{DateTime, Local};
 use colored::Colorize;
+use std::sync::Arc;
 use thousands::Separable;
 
 use crate::{
-    chunks::chunk_ops::ValueToString, ChunkData, ChunkStats, CollectError, ColumnType, Datatype,
-    Dim, ExecutionEnv, FileOutput, MultiDatatype, Partition, Query, Source, Table,
+    chunks::chunk_ops::ValueToString, get_network_name, ChunkData, ChunkStats, CollectError,
+    ColumnType, Datatype, Dim, ExecutionEnv, MultiDatatype, Partition, Query, Sink, Source, Table,
 };
 use std::path::PathBuf;
 
@@ -134,7 +135,7 @@ fn print_bullet_indent<A: AsRef<str>, B: AsRef<str>>(key: A, value: B, indent: u
 pub(crate) fn print_cryo_intro(
     query: &Query,
     source: &Source,
-    sink: &FileOutput,
+    sink: &Arc<dyn Sink>,
     env: &ExecutionEnv,
     n_chunks_remaining: u64,
 ) {
@@ -152,8 +153,13 @@ pub(crate) fn print_cryo_intro(
         print_bullet_indent("exclude failed items", query.exclude_failed.to_string(), 4);
     }
 
+    let network = match get_network_name(source.chain_id) {
+        Some(name) => name,
+        None => source.chain_id.to_string(),
+    };
+
     print_bullet("source", "");
-    print_bullet_indent("network", &sink.prefix, 4);
+    print_bullet_indent("network", network, 4);
     print_bullet_indent("rpc url", &source.rpc_url, 4);
     match source.labels.max_requests_per_second {
         Some(max_requests_per_second) => print_bullet_indent(
@@ -219,15 +225,18 @@ pub(crate) fn print_cryo_intro(
         (n_datatypes * query.partitions.len()).separate_with_commas()
     );
     print_bullet_indent("chunks to collect", chunk_text, 4);
-    print_bullet_indent("output format", sink.format.as_str(), 4);
-    print_bullet_indent("output dir", sink.output_dir.clone().to_string_lossy(), 4);
+    print_bullet_indent("output format", sink.output_format(), 4);
+    let output_dir = match sink.output_location() {
+        Ok(output_dir) => output_dir,
+        Err(_) => "bad output location".to_string(),
+    };
+    print_bullet_indent("output dir", output_dir.clone(), 4);
 
     // print report path
     let report_path = if env.report && n_chunks_remaining > 0 {
-        match super::reports::get_report_path(env, sink, true) {
+        match super::reports::get_report_path(env, true) {
             Ok(report_path) => {
-                let stripped_path: PathBuf = match report_path.strip_prefix(sink.output_dir.clone())
-                {
+                let stripped_path: PathBuf = match report_path.strip_prefix(output_dir) {
                     Ok(stripped) => PathBuf::from("$OUTPUT_DIR").join(PathBuf::from(stripped)),
                     Err(_) => report_path,
                 };
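
Usage sketch (hypothetical, not part of this patch): any type that implements the six required methods above can be handed to freeze as an Arc<dyn Sink>, with get_ids coming for free as a provided method. The MemorySink below is an illustrative example only; it assumes the cryo_freeze re-exports shown in this patch are in scope and that Datatype implements Debug.

use cryo_freeze::{CollectError, Datatype, Partition, Query, Sink};
use polars::prelude::DataFrame;
use std::{collections::HashMap, sync::Mutex};

/// hypothetical sink that buffers frames in memory instead of writing files
pub struct MemorySink {
    /// collected frames, keyed by id
    pub frames: Mutex<HashMap<String, DataFrame>>,
}

impl Sink for MemorySink {
    fn exists(&self, id: &str) -> bool {
        // a frame "exists" once it has been sunk under this id
        self.frames.lock().unwrap().contains_key(id)
    }

    fn get_id(
        &self,
        _query: &Query,
        _partition: &Partition,
        datatype: Datatype,
    ) -> Result<String, CollectError> {
        // freeze() rejects duplicate ids, so a real sink must also encode the
        // partition bounds; the datatype alone only works for single-partition queries
        Ok(format!("{:?}", datatype))
    }

    fn sink_df(&self, dataframe: &mut DataFrame, id: &str) -> Result<(), CollectError> {
        // store a copy of the frame under its id
        self.frames.lock().unwrap().insert(id.to_string(), dataframe.clone());
        Ok(())
    }

    fn overwrite(&self) -> bool {
        true
    }

    fn output_format(&self) -> String {
        "dataframe".to_string()
    }

    fn output_location(&self) -> Result<String, CollectError> {
        Ok("memory".to_string())
    }
}

Because the sink is shared as Arc<dyn Sink>, cloning it into every PartitionPayload is cheap, which is presumably why parse_sink returns Arc<dyn Sink> rather than a concrete sink type.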