initial generalized Sink trait #92

Draft · wants to merge 1 commit into base: main
13 changes: 5 additions & 8 deletions crates/cli/src/parse/args.rs

@@ -1,20 +1,17 @@
-use std::sync::Arc;
-
-use cryo_freeze::{ExecutionEnv, FileOutput, ParseError, Query, Source};
-
+use super::{execution, query, sink, source};
 use crate::args::Args;
 use clap_cryo::Parser;
-
-use super::{execution, file_output, query, source};
+use cryo_freeze::{ExecutionEnv, ParseError, Query, Sink, Source};
+use std::sync::Arc;
 
 /// parse options for running freeze
 pub async fn parse_args(
     args: &Args,
-) -> Result<(Query, Source, FileOutput, ExecutionEnv), ParseError> {
+) -> Result<(Query, Source, Arc<dyn Sink>, ExecutionEnv), ParseError> {
     let source = source::parse_source(args).await?;
     let query = query::parse_query(args, Arc::clone(&source.fetcher)).await?;
-    let sink = file_output::parse_file_output(args, &source)?;
     let env = execution::parse_execution_env(args, query.n_tasks() as u64)?;
+    let sink = sink::parse_sink(args, &source)?;
     Ok((query, source, sink, env))
 }
7 changes: 6 additions & 1 deletion crates/cli/src/parse/execution.rs

@@ -12,11 +12,16 @@ pub(crate) fn parse_execution_env(args: &Args, n_tasks: u64) -> Result<Execution
         (false, false) => 1,
     };
 
+    let report_dir = match args.report_dir.clone() {
+        Some(report_dir) => Some(report_dir),
+        None => Some(std::path::Path::new(&args.output_dir).join(".cryo/reports")),
+    };
+
     let builder = ExecutionEnvBuilder::new()
         .dry(args.dry)
         .verbose(verbose)
         .report(!args.no_report)
-        .report_dir(args.report_dir.clone())
+        .report_dir(report_dir)
         .args(args_str);
 
     let builder = if !args.no_verbose {
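Net effect of the execution.rs change above: an explicit --report-dir still wins, and otherwise reports now default to .cryo/reports under the output directory rather than passing None through to the builder. A minimal sketch of the resolution rule (the helper name and paths here are illustrative only, not part of the PR):

use std::path::{Path, PathBuf};

// illustrative helper, not part of the PR
fn resolve_report_dir(cli_report_dir: Option<PathBuf>, output_dir: &str) -> PathBuf {
    cli_report_dir.unwrap_or_else(|| Path::new(output_dir).join(".cryo/reports"))
}

fn main() {
    assert_eq!(resolve_report_dir(None, "data"), PathBuf::from("data/.cryo/reports"));
    assert_eq!(resolve_report_dir(Some("reports".into()), "data"), PathBuf::from("reports"));
}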
25 changes: 3 additions & 22 deletions crates/cli/src/parse/file_output.rs

@@ -62,28 +62,9 @@ pub(crate) fn parse_subdirs(args: &Args) -> Vec<SubDir> {
 pub(crate) fn parse_network_name(args: &Args, chain_id: u64) -> String {
     match &args.network_name {
         Some(name) => name.clone(),
-        None => match chain_id {
-            1 => "ethereum".to_string(),
-            5 => "goerli".to_string(),
-            10 => "optimism".to_string(),
-            56 => "bnb".to_string(),
-            69 => "optimism_kovan".to_string(),
-            100 => "gnosis".to_string(),
-            137 => "polygon".to_string(),
-            420 => "optimism_goerli".to_string(),
-            1101 => "polygon_zkevm".to_string(),
-            1442 => "polygon_zkevm_testnet".to_string(),
-            8453 => "base".to_string(),
-            10200 => "gnosis_chidao".to_string(),
-            17000 => "holesky".to_string(),
-            42161 => "arbitrum".to_string(),
-            42170 => "arbitrum_nova".to_string(),
-            43114 => "avalanche".to_string(),
-            80001 => "polygon_mumbai".to_string(),
-            84531 => "base_goerli".to_string(),
-            7777777 => "zora".to_string(),
-            11155111 => "sepolia".to_string(),
-            chain_id => "network_".to_string() + chain_id.to_string().as_str(),
+        None => match cryo_freeze::get_network_name(chain_id) {
+            Some(name) => name,
+            None => "network_".to_string() + chain_id.to_string().as_str(),
         },
     }
 }
1 change: 1 addition & 0 deletions crates/cli/src/parse/mod.rs

@@ -6,6 +6,7 @@ mod parse_utils;
 mod partitions;
 mod query;
 pub(crate) mod schemas;
+mod sink;
 mod source;
 
 pub use args::*;
10 changes: 10 additions & 0 deletions crates/cli/src/parse/sink.rs

@@ -0,0 +1,10 @@
+use super::file_output;
+use crate::args::Args;
+use cryo_freeze::{FileSink, ParseError, Sink, Source};
+use std::sync::Arc;
+
+pub(crate) fn parse_sink(args: &Args, source: &Source) -> Result<Arc<dyn Sink>, ParseError> {
+    let file_output = file_output::parse_file_output(args, source)?;
+    let sink = FileSink(file_output);
+    Ok(Arc::new(sink))
+}
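Note for readers: the Sink trait definition itself is not expanded in this diff — it lives in the new sinks module re-exported from crates/freeze/src/types/mod.rs further down, with FileSink as a tuple struct wrapping FileOutput. Judging only from the call sites in freeze.rs below (get_ids, overwrite, exists, sink_df) and from the PartitionPayload type, its shape is plausibly something like this sketch; the exact bounds and signatures are guesses, with Query, Partition, MetaDatatype, Datatype, and CollectError assumed in scope from cryo_freeze:

use std::collections::HashMap;

use polars::prelude::DataFrame;

// sketch only -- inferred from call sites, not the PR's actual definition
pub trait Sink: Send + Sync {
    /// map each datatype of a partition to an output id
    /// (for FileSink, ids are presumably file paths rendered as Strings)
    fn get_ids(
        &self,
        query: &Query,
        partition: &Partition,
        datatypes: Option<Vec<MetaDatatype>>,
    ) -> Result<HashMap<Datatype, String>, CollectError>;

    /// whether existing outputs should be rewritten
    fn overwrite(&self) -> bool;

    /// whether the output identified by `id` already exists
    fn exists(&self, id: &str) -> bool;

    /// write one dataframe to the output identified by `id`
    fn sink_df(&self, df: &mut DataFrame, id: &str) -> Result<(), CollectError>;
}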
2 changes: 1 addition & 1 deletion crates/cli/src/run.rs

@@ -51,7 +51,7 @@ pub async fn run(args: args::Args) -> Result<Option<FreezeSummary>, CollectError
     let source = Arc::new(source);
     let env = ExecutionEnv { t_start_parse, ..env };
     let env = env.set_start_time();
-    cryo_freeze::freeze(&query, &source, &sink, &env).await
+    cryo_freeze::freeze(&query, &source, sink, &env).await
 }
 
 async fn handle_help_subcommands(args: args::Args) -> Result<Option<FreezeSummary>, CollectError> {
47 changes: 21 additions & 26 deletions crates/freeze/src/freeze.rs

@@ -1,23 +1,22 @@
 use crate::{
-    collect_partition, dataframes, err, reports, summaries, CollectError, Datatype, ExecutionEnv,
-    FileOutput, FreezeSummary, MetaDatatype, Partition, Query, Source,
+    collect_partition, err, reports, summaries, CollectError, Datatype, ExecutionEnv,
+    FreezeSummary, MetaDatatype, Partition, Query, Sink, Source,
 };
 use chrono::{DateTime, Local};
 use futures::{stream::FuturesUnordered, StreamExt};
 use std::{
     collections::{HashMap, HashSet},
-    path::PathBuf,
     sync::Arc,
 };
 use tokio::sync::Semaphore;
 
 type PartitionPayload = (
     Partition,
     MetaDatatype,
-    HashMap<Datatype, PathBuf>,
+    HashMap<Datatype, String>,
     Arc<Query>,
     Arc<Source>,
-    FileOutput,
+    Arc<dyn Sink>,
     ExecutionEnv,
     Option<std::sync::Arc<Semaphore>>,
 );
@@ -26,18 +25,18 @@ type PartitionPayload = (
 pub async fn freeze(
     query: &Query,
     source: &Source,
-    sink: &FileOutput,
+    sink: Arc<dyn Sink>,
     env: &ExecutionEnv,
 ) -> Result<Option<FreezeSummary>, CollectError> {
     // check validity of query
     query.is_valid()?;
 
     // get partitions
-    let (payloads, skipping) = get_payloads(query, source, sink, env)?;
+    let (payloads, skipping) = get_payloads(query, source, &sink, env)?;
 
     // print summary
     if env.verbose >= 1 {
-        summaries::print_cryo_intro(query, source, sink, env, payloads.len() as u64);
+        summaries::print_cryo_intro(query, source, &sink, env, payloads.len() as u64);
     }
 
     // check dry run
@@ -56,7 +55,7 @@ pub async fn freeze(
 
     // create initial report
     if env.report {
-        reports::write_report(env, query, sink, None)?;
+        reports::write_report(env, query, &sink, None)?;
     };
 
     // perform collection
@@ -69,7 +68,7 @@
 
     // create final report
     if env.report {
-        reports::write_report(env, query, sink, Some(&results))?;
+        reports::write_report(env, query, &sink, Some(&results))?;
     };
 
     // return
@@ -79,7 +78,7 @@ pub async fn freeze(
 fn get_payloads(
     query: &Query,
     source: &Source,
-    sink: &FileOutput,
+    sink: &Arc<dyn Sink>,
     env: &ExecutionEnv,
 ) -> Result<(Vec<PartitionPayload>, Vec<Partition>), CollectError> {
     let semaphore = source
@@ -89,29 +88,28 @@ fn get_payloads(
     let arc_query = Arc::new(query.clone());
     let mut payloads = Vec::new();
     let mut skipping = Vec::new();
-    let mut all_paths = HashSet::new();
+    let mut all_ids = HashSet::new();
     for datatype in query.datatypes.clone().into_iter() {
         for partition in query.partitions.clone().into_iter() {
-            let paths = sink.get_paths(query, &partition, Some(vec![datatype.clone()]))?;
-            if !sink.overwrite && paths.values().all(|path| path.exists()) {
+            let ids = sink.get_ids(query, &partition, Some(vec![datatype.clone()]))?;
+            if !sink.overwrite() && ids.values().all(|path| sink.exists(path)) {
                 skipping.push(partition);
                 continue
             }
 
             // check for path collisions
-            let paths_set: HashSet<_> = paths.clone().into_values().collect();
-            if paths_set.intersection(&all_paths).next().is_none() {
-                all_paths.extend(paths_set);
+            let ids_set: HashSet<_> = ids.clone().into_values().collect();
+            if ids_set.intersection(&all_ids).next().is_none() {
+                all_ids.extend(ids_set);
             } else {
-                let message =
-                    format!("output path collision: {:?}", paths_set.intersection(&all_paths));
+                let message = format!("output id collision: {:?}", ids_set.intersection(&all_ids));
                 return Err(err(&message))
             };
 
             let payload = (
                 partition.clone(),
                 datatype.clone(),
-                paths,
+                ids,
                 arc_query.clone(),
                 source.clone(),
                 sink.clone(),
@@ -165,7 +163,7 @@ async fn freeze_partitions(
 }
 
 async fn freeze_partition(payload: PartitionPayload) -> Result<(), CollectError> {
-    let (partition, datatype, paths, query, source, sink, env, semaphore) = payload;
+    let (partition, datatype, ids, query, source, sink, env, semaphore) = payload;
 
     // acquire chunk semaphore
     let _permit = match &semaphore {
@@ -178,11 +176,8 @@ async fn freeze_partition(payload: PartitionPayload) -> Result<(), CollectError>
 
     // write dataframes to disk
    for (datatype, mut df) in dfs {
-        let path = paths.get(&datatype).ok_or_else(|| {
-            CollectError::CollectError("could not get path for datatype".to_string())
-        })?;
-        let result = dataframes::df_to_file(&mut df, path, &sink);
-        result.map_err(|_| CollectError::CollectError("error writing file".to_string()))?
+        let id = ids.get(&datatype).ok_or_else(|| err("could not get id for datatype"))?;
+        sink.sink_df(&mut df, id)?;
     }
 
     // update progress bar
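As a quick illustration of what the trait-object plumbing above buys — freeze now holds an Arc<dyn Sink> rather than a concrete FileOutput, so collectors can write anywhere a Sink implementation points — here is a self-contained toy using a trimmed-down stand-in trait. Both the trait shown here and MemorySink are invented for illustration and are not part of this PR:

use std::collections::HashMap;
use std::sync::{Arc, Mutex};

// trimmed-down stand-in for the PR's Sink trait
trait Sink: Send + Sync {
    fn overwrite(&self) -> bool;
    fn exists(&self, id: &str) -> bool;
    fn sink(&self, id: &str, rows: Vec<String>) -> Result<(), String>;
}

// invented for illustration: collects output in memory instead of files
struct MemorySink {
    store: Mutex<HashMap<String, Vec<String>>>,
}

impl Sink for MemorySink {
    fn overwrite(&self) -> bool {
        true
    }
    fn exists(&self, id: &str) -> bool {
        self.store.lock().unwrap().contains_key(id)
    }
    fn sink(&self, id: &str, rows: Vec<String>) -> Result<(), String> {
        self.store.lock().unwrap().insert(id.to_string(), rows);
        Ok(())
    }
}

fn main() {
    // callers hold an Arc<dyn Sink>, mirroring freeze()'s new signature
    let sink: Arc<dyn Sink> = Arc::new(MemorySink { store: Mutex::new(HashMap::new()) });
    sink.sink("blocks__00000000_to_00000999", vec!["row".to_string()]).unwrap();
    assert!(sink.exists("blocks__00000000_to_00000999"));
}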
3 changes: 2 additions & 1 deletion crates/freeze/src/types/dataframes/export.rs

@@ -7,9 +7,10 @@ use crate::types::{FileError, FileOutput};
 /// write polars dataframe to file
 pub(crate) fn df_to_file(
     df: &mut DataFrame,
-    filename: &Path,
+    filename: &str,
     file_output: &FileOutput,
 ) -> Result<(), FileError> {
+    let filename: std::path::PathBuf = filename.into();
     let tmp_filename = filename.with_extension("_tmp");
     let result = match filename.extension().and_then(|ex| ex.to_str()) {
         Some("parquet") => df_to_parquet(df, &tmp_filename, file_output),
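Side note on the _tmp name above: df_to_file writes to a temporary sibling first, presumably renaming it over the target once the write succeeds (the rename step is outside the visible hunk, so this is an assumption). A minimal standalone sketch of that idiom:

use std::path::PathBuf;

// write-then-rename sketch; the PR's actual tail of df_to_file is not shown
fn write_atomic(filename: &str, bytes: &[u8]) -> std::io::Result<()> {
    let path: PathBuf = filename.into();
    let tmp = path.with_extension("_tmp"); // "out.parquet" -> "out._tmp"
    std::fs::write(&tmp, bytes)?;
    std::fs::rename(tmp, path) // readers never observe a partially written file
}

fn main() -> std::io::Result<()> {
    write_atomic("example.parquet", b"not real parquet, just bytes")?;
    std::fs::remove_file("example.parquet")
}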
27 changes: 27 additions & 0 deletions crates/freeze/src/types/directory.rs

@@ -0,0 +1,27 @@
+/// get name of network if known
+pub fn get_network_name(chain_id: u64) -> Option<String> {
+    match chain_id {
+        1 => Some("ethereum"),
+        5 => Some("goerli"),
+        10 => Some("optimism"),
+        56 => Some("bnb"),
+        69 => Some("optimism_kovan"),
+        100 => Some("gnosis"),
+        137 => Some("polygon"),
+        420 => Some("optimism_goerli"),
+        1101 => Some("polygon_zkevm"),
+        1442 => Some("polygon_zkevm_testnet"),
+        8453 => Some("base"),
+        10200 => Some("gnosis_chidao"),
+        17000 => Some("holesky"),
+        42161 => Some("arbitrum"),
+        42170 => Some("arbitrum_nova"),
+        43114 => Some("avalanche"),
+        80001 => Some("polygon_mumbai"),
+        84531 => Some("base_goerli"),
+        7777777 => Some("zora"),
+        11155111 => Some("sepolia"),
+        _ => None,
+    }
+    .map(|x| x.to_string())
+}
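This is the same chain-id table previously inlined in the CLI, now a reusable helper. A usage sketch — the file_output.rs call site earlier in this diff shows it is reachable as cryo_freeze::get_network_name:

fn main() {
    assert_eq!(cryo_freeze::get_network_name(1), Some("ethereum".to_string()));
    assert_eq!(cryo_freeze::get_network_name(8453), Some("base".to_string()));
    // unknown ids return None; parse_network_name then falls back to "network_<chain_id>"
    assert_eq!(cryo_freeze::get_network_name(123456789), None);
}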
19 changes: 0 additions & 19 deletions crates/freeze/src/types/files.rs

@@ -111,22 +111,3 @@ impl FileFormat {
         }
     }
 }
-
-/// Encoding for binary data in a column
-#[derive(Clone, Eq, PartialEq, Debug)]
-pub enum ColumnEncoding {
-    /// Raw binary encoding
-    Binary,
-    /// Hex binary encoding
-    Hex,
-}
-
-impl ColumnEncoding {
-    /// convert ColumnEncoding to str
-    pub fn as_str(&self) -> &'static str {
-        match *self {
-            ColumnEncoding::Binary => "binary",
-            ColumnEncoding::Hex => "hex",
-        }
-    }
-}
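ColumnEncoding is removed here because it moves into the schemas module (see the re-export swap in crates/freeze/src/types/mod.rs below). Assuming the crate root keeps glob-re-exporting these types, downstream code importing it from the crate root should be unaffected; for instance:

use cryo_freeze::ColumnEncoding; // still resolves if the crate root re-exports schemas::*

fn main() {
    assert_eq!(ColumnEncoding::Hex.as_str(), "hex");
    assert_eq!(ColumnEncoding::Binary.as_str(), "binary");
}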
12 changes: 10 additions & 2 deletions crates/freeze/src/types/mod.rs

@@ -56,9 +56,9 @@ pub use chunks::{
 pub use conversions::{bytes_to_u32, ToVecHex, ToVecU8};
 pub use dataframes::*;
 pub use datatypes::*;
-pub use files::{ColumnEncoding, FileFormat, FileOutput, SubDir};
+pub use files::{FileFormat, FileOutput, SubDir};
 pub use queries::{Query, QueryLabels, TimeDimension};
-pub use schemas::{ColumnType, SchemaFunctions, Schemas, Table, U256Type};
+pub use schemas::{ColumnEncoding, ColumnType, SchemaFunctions, Schemas, Table, U256Type};
 pub use sources::{Fetcher, RateLimiter, Source, SourceLabels};
 // pub(crate) use summaries::FreezeSummaryAgg;
 // pub use summaries::{FreezeChunkSummary, FreezeSummary};
@@ -74,3 +74,11 @@ pub use signatures::*;
 /// decoders
 pub mod decoders;
 pub use decoders::*;
+
+/// sinks
+pub mod sinks;
+pub use sinks::*;
+
+/// directory
+pub mod directory;
+pub use directory::*;