Hi everyone, I'm new to Rust, Parquet and this library, but I managed to hack together this function using DataFusion 15.0.0. It takes a list of Parquet files on S3 and can pass batches to a callback function as the plan is executed:
pub async fn query_files(
    bucket: &String,
    files: Vec<String>,
    query: String,
    callback: Option<Arc<Box<dyn Fn(Vec<Map<String, Value>>) -> () + Send + Sync>>>,
) -> Result<Vec<Map<String, Value>>> {
    // Setup datafusion session and add S3 object store
    let ctx = SessionContext::new();
    let prefix = "s3";
    let table_name = "data_table";

    // Get AWS region and credentials from environment variables
    let s3 = AmazonS3Builder::new()
        .with_bucket_name(bucket)
        .with_region(env::var("AWS_REGION").expect("AWS_REGION is not set"))
        .with_access_key_id(env::var("AWS_ACCESS_KEY_ID").expect("AWS_ACCESS_KEY_ID is not set"))
        .with_secret_access_key(
            env::var("AWS_SECRET_ACCESS_KEY").expect("AWS_SECRET_ACCESS_KEY is not set"),
        )
        .build()?;

    ctx.runtime_env().register_object_store(prefix, bucket, Arc::new(s3));

    // Load Parquet files into datafusion
    let mut file_paths: Vec<ListingTableUrl> = Vec::new();
    for file in files {
        file_paths
            .push(ListingTableUrl::parse(format!("{}://{}/{}", prefix, bucket, file)).unwrap());
    }

    // Configure listing and table options
    let file_format = ParquetFormat::new(ctx.config_options()).with_enable_pruning(Some(true));
    let listing_options = ListingOptions::new(Arc::new(file_format))
        .with_file_extension(FileType::PARQUET.get_ext());
    let schema = listing_options
        .infer_schema(&ctx.state(), &file_paths[0])
        .await?;
    let table_config = ListingTableConfig::new_with_multi_paths(file_paths)
        .with_listing_options(listing_options)
        .with_schema(schema);
    let table = ListingTable::try_new(table_config)?;

    // Register the table
    ctx.register_table(table_name, Arc::new(table))?;

    // Execute the query and stream results back via callback function if provided
    // Otherwise collect results and return them when query is complete
    let df = ctx.sql(&query).await?;
    let mut rows: Vec<Map<String, Value>> = Vec::<Map<String, Value>>::new();
    let streams = df.execute_stream_partitioned().await?;
    for stream in streams {
        let batch = common::collect(stream).await?;
        let results = record_batches_to_json_rows(&batch)?;
        rows.append(&mut results.clone());
        if let Some(callback) = callback.clone() {
            if results.len() > 0 {
                // Only call callback function if there are results to process
                callback(results);
            }
        }
    }
    Ok(rows)
}
Question 1: How does this code run behind the scenes?
Based on watching some presentations and trying to read through the docs/source code, can you confirm that the out-of-the-box library does the following to ensure it's getting as much performance as possible:
The library only reads the required chunks of the Parquet files on S3 into memory and does not download the entire file.
When it infers the schema of the first file, it only fetches that file's Parquet metadata chunk from S3 to read the columns available.
For each file, it first reads the metadata chunk and uses the column statistics in that metadata to automatically skip over files that don't include a particular value or column?
In parallel, it creates a stream per partition of the physical plan and, using all available CPU cores on the machine, runs the query by fetching only the required chunks from S3 in parallel, processing those chunks in memory as they stream in from S3?
Is that generally correct, or am I misunderstanding anything about what the library actually does behind the scenes?
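In case it helps to confirm the above, here is a rough, untested sketch of how I've been thinking of inspecting the plans for one of these queries (assuming DataFrame::explain and show behave the way I expect):

    // Untested sketch: print the logical and physical plans for the query so the
    // Parquet/S3 scan behaviour can be checked. Assumes `ctx` is the SessionContext
    // built in the function above.
    let df = ctx.sql(&query).await?;
    df.explain(false, false)?.show().await?;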
Question 2: Parquet schemas
Right now the code takes the schema of the first Parquet file and uses that for the table. What happens if other Parquet files have additional or different columns?
How can you address the scenario where the column you're querying on exists somewhere in the file set, but potentially not in the first file? Is there a way to quickly merge the schemas across all the files, or to update the schema as each file's metadata is read, so that every column across the files is available to query?
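For illustration, this is roughly what I had in mind, though I haven't tested it and I'm assuming Arrow's Schema::try_merge (imported from arrow::datatypes) is the right tool for combining per-file schemas:

    // Untested sketch: infer a schema per file and merge them into one table schema.
    // Assumes `ctx`, `listing_options` and `file_paths` are the values built in the
    // function above, and that Schema::try_merge takes the union of the columns.
    let mut schemas = Vec::with_capacity(file_paths.len());
    for path in &file_paths {
        let schema = listing_options.infer_schema(&ctx.state(), path).await?;
        schemas.push(schema.as_ref().clone());
    }
    let merged_schema = Arc::new(Schema::try_merge(schemas)?);

    let table_config = ListingTableConfig::new_with_multi_paths(file_paths)
        .with_listing_options(listing_options)
        .with_schema(merged_schema);

I'm not sure whether inferring the schema of every file like this adds a round trip per file, so if there's a built-in way to do this I'd prefer that.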
Question 3: Code optimisations
Some general questions to help improve the code:
I want to pass a list of individual S3 paths, but I could only figure out how to do that by creating a custom ListingTableConfig, which is quite verbose. Is there a way to pass in an array of S3 paths without having to write all that code, or is that currently the only way?
How does it calculate partitions? Is there a way to modify this so my callback function can be called as soon as the first chunk is processed?
Is there a way to stream rows to the callback function as soon as they're found while executing the query, i.e. as soon as a row is found, pass it to the callback function in my code to process? (A rough sketch of what I mean is after this list.)
Any other code optimisations you would make if you were trying to query a large number of Parquet files in S3 and process the results as quickly as possible?
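To make the streaming questions concrete, here is a rough, untested sketch of the per-batch behaviour I'm hoping for, assuming DataFrame::execute_stream yields record batches as they are produced rather than waiting for a whole partition:

    // Untested sketch: consume record batches one at a time as the stream produces
    // them, calling the callback per batch instead of per partition. Assumes the same
    // `ctx`, `query`, `callback` and `rows` as in the function above.
    use futures::StreamExt;

    let df = ctx.sql(&query).await?;
    let mut stream = df.execute_stream().await?;
    while let Some(batch) = stream.next().await {
        let results = record_batches_to_json_rows(&[batch?])?;
        if !results.is_empty() {
            if let Some(callback) = &callback {
                callback(results.clone());
            }
            rows.extend(results);
        }
    }

Is something along those lines the intended way to get results as early as possible, or is there a better pattern?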