Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

add empty chunk handling, importing python crate dependencies, tx filtering by address, support for new datatypes in python crate #147

Open
wants to merge 13 commits into
base: main
Choose a base branch
from
4 changes: 4 additions & 0 deletions crates/cli/src/args.rs
Original file line number Diff line number Diff line change
Expand Up @@ -196,6 +196,10 @@ pub struct Args {
#[arg(long, help_heading = "Output Options")]
pub no_report: bool,

/// Skip writing to dataframe if the df is empty
#[arg(long, help_heading = "Acquisition Options")]
pub skip_empty: Option<bool>,

/// Address(es)
#[arg(long, help_heading = "Dataset-specific Options", num_args(1..))]
pub address: Option<Vec<String>>,
Expand Down
1 change: 1 addition & 0 deletions crates/cli/src/parse/execution.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ pub(crate) fn parse_execution_env(args: &Args, n_tasks: u64) -> Result<Execution
.verbose(verbose)
.report(!args.no_report)
.report_dir(args.report_dir.clone())
.skip_empty(args.skip_empty)
.args(args_str);

let builder = if !args.no_verbose {
Expand Down
59 changes: 51 additions & 8 deletions crates/freeze/src/datasets/transactions.rs
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,7 @@ impl Dataset for Transactions {
}

/// Dimensions that callers may supply for this dataset but are not required to.
///
/// The filters built in `extract` consult the corresponding request fields
/// (`address`, `from_address`, `to_address`) only when they are present.
fn optional_parameters() -> Vec<Dim> {
vec![Dim::FromAddress, Dim::ToAddress]
vec![Dim::Address, Dim::FromAddress, Dim::ToAddress]
}
}

Expand All @@ -65,8 +65,22 @@ pub type TransactionAndReceipt = (Transaction, Option<TransactionReceipt>);
#[async_trait::async_trait]
impl CollectByBlock for Transactions {
type Response = (Block<Transaction>, Vec<TransactionAndReceipt>, bool);

async fn extract(request: Params, source: Arc<Source>, query: Arc<Query>) -> R<Self::Response> {
/// Collect the addresses passed after the `--address` flag of the CLI command.
///
/// Every argument following `--address`, up to (but not including) the next
/// argument that starts with `--`, is parsed as an `H160`. An empty vector is
/// returned when no CLI command is recorded or when `--address` is absent.
///
/// NOTE(review): the command is read from a freshly built default
/// `ExecutionEnv` rather than from `request`/`query`; confirm that its
/// `cli_command` reflects the actual invocation (e.g. when driven from the
/// python bindings).
///
/// # Panics
///
/// Panics with "Invalid H160" if one of the collected arguments cannot be
/// parsed as an `H160`.
fn get_addresses() -> Vec<H160> {
    let env = ExecutionEnv::default();
    // A missing command means "no addresses" rather than a panic on `unwrap`.
    let cli_command = match env.cli_command {
        Some(cli_command) => cli_command,
        None => return Vec::new(),
    };
    cli_command
        .iter()
        // drop everything up to the flag, then the flag itself
        .skip_while(|arg| *arg != "--address")
        .skip(1)
        // the address list ends at the next `--option`
        .take_while(|arg| !arg.starts_with("--"))
        .map(|s| s.parse::<H160>().expect("Invalid H160"))
        .collect()
}

let block = source
.get_block_with_txs(request.block_number()?)
.await?
Expand All @@ -77,7 +91,7 @@ impl CollectByBlock for Transactions {
// filter by from_address
let from_filter: Box<dyn Fn(&Transaction) -> bool + Send> =
if let Some(from_address) = &request.from_address {
Box::new(move |tx| tx.from.as_bytes() == from_address)
Box::new(move |tx| from_address == tx.from.as_bytes())
} else {
Box::new(|_| true)
};
Expand All @@ -88,16 +102,45 @@ impl CollectByBlock for Transactions {
} else {
Box::new(|_| true)
};
let transactions =
block.transactions.clone().into_iter().filter(from_filter).filter(to_filter).collect();
// filter by addresses (if either the to or from address are in the vector of addresses)
let addresses = get_addresses();
// Predicate selecting transactions that involve the requested address.
// When `request.address` is unset, every transaction passes.
let addr_filter: Box<dyn Fn(&Transaction) -> bool + Send> =
    if let Some(address) = &request.address {
        Box::new(move |tx| {
            // `tx.to` is optional (presumably absent for contract creations);
            // a missing recipient is treated as "not in the list" instead of
            // panicking on `unwrap`.
            let to_address_is_in =
                tx.to.as_ref().map_or(false, |to| addresses.contains(to));
            let from_address_is_in = addresses.contains(&tx.from);
            if addresses.len() == 1 {
                // single address: match it on either side of the transfer
                to_address_is_in || from_address_is_in
            } else if !(to_address_is_in && from_address_is_in) {
                // not both endpoints listed: match the request address as recipient
                tx.to.as_ref().map_or(false, |x| x.as_bytes() == address)
            } else {
                // both endpoints listed: match the request address as sender
                tx.from.as_bytes() == address
            }
        })
    } else {
        Box::new(|_| true)
    };
let transactions = block
.transactions
.clone()
.into_iter()
.filter(from_filter)
.filter(to_filter)
.filter(addr_filter)
.collect();

// 2. collect receipts if necessary
// if transactions are filtered fetch by set of transaction hashes, else fetch all receipts
// in block
let receipts: Vec<Option<_>> =
if schema.has_column("gas_used") | schema.has_column("success") {
// receipts required
let receipts = if request.from_address.is_some() || request.to_address.is_some() {
let receipts = if request.from_address.is_some() ||
request.to_address.is_some() ||
request.address.is_some()
{
source.get_tx_receipts(&transactions).await?
} else {
source.get_tx_receipts_in_block(&block).await?
Expand All @@ -107,8 +150,8 @@ impl CollectByBlock for Transactions {
vec![None; block.transactions.len()]
};

let transactions_with_receips = transactions.into_iter().zip(receipts).collect();
Ok((block, transactions_with_receips, query.exclude_failed))
let transactions_with_receipts = transactions.into_iter().zip(receipts).collect();
Ok((block, transactions_with_receipts, query.exclude_failed))
}

fn transform(response: Self::Response, columns: &mut Self, query: &Arc<Query>) -> R<()> {
Expand Down
6 changes: 5 additions & 1 deletion crates/freeze/src/freeze.rs
Original file line number Diff line number Diff line change
Expand Up @@ -183,7 +183,11 @@ async fn freeze_partition(payload: PartitionPayload) -> Result<u64, CollectError
// write dataframes to disk
let mut n_rows = 0;
for (datatype, mut df) in dfs {
n_rows += df.height() as u64;
let df_height = df.height() as u64;
if env.skip_empty == Some(true) && df_height == 0 {
continue;
};
n_rows += df_height;
let path = paths.get(&datatype).ok_or_else(|| {
CollectError::CollectError("could not get path for datatype".to_string())
})?;
Expand Down
10 changes: 10 additions & 0 deletions crates/freeze/src/types/execution.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,8 @@ pub struct ExecutionEnv {
pub t_end: Option<SystemTime>,
/// report directory
pub report_dir: Option<PathBuf>,
/// skips writing empty dfs
pub skip_empty: Option<bool>,
}

impl ExecutionEnv {
Expand Down Expand Up @@ -67,6 +69,7 @@ pub struct ExecutionEnvBuilder {
t_start: SystemTime,
t_end: Option<SystemTime>,
report_dir: Option<PathBuf>,
skip_empty: Option<bool>,
}

impl Default for ExecutionEnvBuilder {
Expand All @@ -82,6 +85,7 @@ impl Default for ExecutionEnvBuilder {
t_start: SystemTime::now(),
t_end: None,
report_dir: None,
skip_empty: Some(false),
}
}
}
Expand Down Expand Up @@ -134,6 +138,11 @@ impl ExecutionEnvBuilder {
self
}

/// Set the `skip_empty` option carried into the built `ExecutionEnv`.
///
/// `Some(true)` asks the freeze step to skip dataframes that have no rows
/// instead of writing them; any other value keeps them.
pub fn skip_empty(self, skip_empty: Option<bool>) -> Self {
    let mut builder = self;
    builder.skip_empty = skip_empty;
    builder
}
/// build final output
pub fn build(self) -> ExecutionEnv {
ExecutionEnv {
Expand All @@ -147,6 +156,7 @@ impl ExecutionEnvBuilder {
t_start: self.t_start,
t_end: self.t_end,
report_dir: self.report_dir,
skip_empty: self.skip_empty,
}
}
}
3 changes: 3 additions & 0 deletions crates/python/pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -6,13 +6,16 @@ build-backend = "maturin"
name = "cryo"
description = "cryo is the easiest way to extract blockchain data to parquet, csv, json, or a python dataframe."
requires-python = ">=3.7"
dependencies = ["polars","pyarrow"]
classifiers = [
"Programming Language :: Rust",
"Programming Language :: Python :: Implementation :: CPython",
"Programming Language :: Python :: Implementation :: PyPy",
]
readme = "README.md"

[project.optional-dependencies]
pandas = ["pandas>=2.1.3"]

[tool.maturin]
python-source = "python"
Expand Down
5 changes: 3 additions & 2 deletions crates/python/python/cryo/_args.py
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,8 @@ def parse_cli_args(
else:
raise Exception('unknown file_format')

kwargs['no_verbose'] = not verbose

if 'no_verbose' not in kwargs.keys():
kwargs['no_verbose'] = not verbose

return kwargs

10 changes: 10 additions & 0 deletions crates/python/python/cryo/_spec.py
Original file line number Diff line number Diff line change
Expand Up @@ -54,4 +54,14 @@ class CryoCliArgs(TypedDict, total=False):
topic3: str | bytes | None
inner_request_size: int | None
no_verbose: bool
address: typing.Sequence[str] | None
to_address: typing.Sequence[str] | None
from_address: typing.Sequence[str] | None
call_data: typing.Sequence[str] | None
function: typing.Sequence[str] | None
inputs: typing.Sequence[str] | None
slot: typing.Sequence[str] | None
event_signature: str | bytes | None
inner_request_size: int | None
js_tracer: str | bytes | None

3 changes: 3 additions & 0 deletions crates/python/rust/collect_adapter.rs
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,7 @@ use cryo_freeze::collect;
verbose = false,
no_verbose = false,
event_signature = None,
skip_empty = Some(false),
)
)]
#[allow(clippy::too_many_arguments)]
Expand Down Expand Up @@ -125,6 +126,7 @@ pub fn _collect(
verbose: bool,
no_verbose: bool,
event_signature: Option<String>,
skip_empty: Option<bool>,
) -> PyResult<&PyAny> {
if let Some(command) = command {
pyo3_asyncio::tokio::future_into_py(py, async move {
Expand Down Expand Up @@ -190,6 +192,7 @@ pub fn _collect(
verbose,
no_verbose,
event_signature,
skip_empty,
};
pyo3_asyncio::tokio::future_into_py(py, async move {
match run_collect(args).await {
Expand Down
3 changes: 3 additions & 0 deletions crates/python/rust/freeze_adapter.rs
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,7 @@ use cryo_cli::{run, Args};
verbose = false,
no_verbose = false,
event_signature = None,
skip_empty = Some(false),
)
)]
#[allow(clippy::too_many_arguments)]
Expand Down Expand Up @@ -122,6 +123,7 @@ pub fn _freeze(
verbose: bool,
no_verbose: bool,
event_signature: Option<String>,
skip_empty: Option<bool>,
) -> PyResult<&PyAny> {
if let Some(command) = command {
freeze_command(py, command)
Expand Down Expand Up @@ -182,6 +184,7 @@ pub fn _freeze(
verbose,
no_verbose,
event_signature,
skip_empty,
};

pyo3_asyncio::tokio::future_into_py(py, async move {
Expand Down