Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Consolidate dataframe example #13410

Merged
merged 4 commits into from
Nov 14, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 1 addition & 2 deletions datafusion-examples/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -57,8 +57,7 @@ cargo run --example dataframe
- [`custom_datasource.rs`](examples/custom_datasource.rs): Run queries against a custom datasource (TableProvider)
- [`custom_file_format.rs`](examples/custom_file_format.rs): Write data to a custom file format
- [`dataframe-to-s3.rs`](examples/external_dependency/dataframe-to-s3.rs): Run a query using a DataFrame against a parquet file from s3 and writing back to s3
- [`dataframe.rs`](examples/dataframe.rs): Run a query using a DataFrame against a local parquet file
- [`dataframe_in_memory.rs`](examples/dataframe_in_memory.rs): Run a query using a DataFrame against data in memory
- [`dataframe.rs`](examples/dataframe.rs): Run queries using the DataFrame API against parquet files, csv files, and in-memory data
- [`dataframe_output.rs`](examples/dataframe_output.rs): Examples of methods which write data out from a DataFrame
- [`deserialize_to_struct.rs`](examples/deserialize_to_struct.rs): Convert query results into rust structs using serde
- [`expr_api.rs`](examples/expr_api.rs): Create, execute, simplify, analyze and coerce `Expr`s
Expand Down
146 changes: 85 additions & 61 deletions datafusion-examples/examples/dataframe.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,90 +15,82 @@
// specific language governing permissions and limitations
// under the License.

use arrow::array::{ArrayRef, Int32Array, RecordBatch, StringArray};
use datafusion::arrow::datatypes::{DataType, Field, Schema};
use datafusion::error::Result;
use datafusion::prelude::*;
use std::fs::File;
use std::io::Write;
use std::sync::Arc;
use tempfile::tempdir;

/// This example demonstrates using DataFusion's DataFrame API to
///
/// * [read_parquet]: execute queries against parquet files
/// * [read_csv]: execute queries against csv files
/// * [read_memory]: execute queries against in-memory arrow data
#[tokio::main]
async fn main() -> Result<()> {
    // A single SessionContext (the main high level API for interacting with
    // DataFusion) is created once and lent to each walkthrough in turn.
    let session = SessionContext::new();
    read_parquet(&session).await?;
    read_csv(&session).await?;
    // The final walkthrough's result doubles as the program's result.
    read_memory(&session).await
}

/// Use DataFrame API to
/// 1. Read parquet files,
/// 2. Show the schema
/// 3. Select columns and rows
async fn read_parquet(ctx: &SessionContext) -> Result<()> {
// Find the local path of "alltypes_plain.parquet"
let testdata = datafusion::test_util::parquet_test_data();

let filename = &format!("{testdata}/alltypes_plain.parquet");

// define the query using the DataFrame trait
let df = ctx
.read_parquet(filename, ParquetReadOptions::default())
.await?
.select_columns(&["id", "bool_col", "timestamp_col"])?
.filter(col("id").gt(lit(1)))?;

// print the results
df.show().await?;

// create a csv file waiting to be written
let dir = tempdir()?;
let file_path = dir.path().join("example.csv");
let file = File::create(&file_path)?;
write_csv_file(file);

// Reading CSV file with inferred schema example
let csv_df =
example_read_csv_file_with_inferred_schema(file_path.to_str().unwrap()).await;
csv_df.show().await?;

// Reading CSV file with defined schema
let csv_df = example_read_csv_file_with_schema(file_path.to_str().unwrap()).await;
csv_df.show().await?;

// Reading PARQUET file and print describe
// Read the parquet files and show its schema using 'describe'
let parquet_df = ctx
.read_parquet(filename, ParquetReadOptions::default())
.await?;
parquet_df.describe().await.unwrap().show().await?;

let dyn_ctx = ctx.enable_url_table();
let df = dyn_ctx
.sql(&format!("SELECT * FROM '{}'", file_path.to_str().unwrap()))
// show its schema using 'describe'
parquet_df.clone().describe().await?.show().await?;

// Select three columns and filter the results
// so that only rows where id > 1 are returned
parquet_df
.select_columns(&["id", "bool_col", "timestamp_col"])?
.filter(col("id").gt(lit(1)))?
.show()
.await?;
df.show().await?;

Ok(())
}

/// Writes a small test CSV document (a header row followed by two data rows)
/// into `file`.
///
/// # Panics
///
/// Panics with the message "Problem with writing file!" if the write fails.
fn write_csv_file(mut file: File) {
    // Header plus two records; the quoted `time` values contain commas.
    const CSV_FIXTURE: &str = r#"id,time,vote,unixtime,rating
a1,"10 6, 2013",3,1381017600,5.0
a2,"08 9, 2013",2,1376006400,4.5"#;

    // `file` is owned by this function, so it is closed when we return.
    file.write_all(CSV_FIXTURE.as_bytes())
        .expect("Problem with writing file!");
}
/// Use the DataFrame API to
/// 1. Read CSV files
/// 2. Optionally specify schema
async fn read_csv(ctx: &SessionContext) -> Result<()> {
// create example.csv file in a temporary directory
let dir = tempdir()?;
let file_path = dir.path().join("example.csv");
{
let mut file = File::create(&file_path)?;
// write CSV data
file.write_all(
r#"id,time,vote,unixtime,rating
a1,"10 6, 2013",3,1381017600,5.0
a2,"08 9, 2013",2,1376006400,4.5"#
.as_bytes(),
)?;
} // scope closes the file
let file_path = file_path.to_str().unwrap();

// Example to read data from a csv file with inferred schema
async fn example_read_csv_file_with_inferred_schema(file_path: &str) -> DataFrame {
// Create a session context
let ctx = SessionContext::new();
// Register a lazy DataFrame using the context
ctx.read_csv(file_path, CsvReadOptions::default())
.await
.unwrap()
}
// You can read a CSV file and DataFusion will infer the schema automatically
let csv_df = ctx.read_csv(file_path, CsvReadOptions::default()).await?;
csv_df.show().await?;

// Example to read csv file with a defined schema for the csv file
async fn example_read_csv_file_with_schema(file_path: &str) -> DataFrame {
// Create a session context
let ctx = SessionContext::new();
// Define the schema
// If you know the types of your data you can specify them explicitly
let schema = Schema::new(vec![
Field::new("id", DataType::Utf8, false),
Field::new("time", DataType::Utf8, false),
Expand All @@ -112,6 +104,38 @@ async fn example_read_csv_file_with_schema(file_path: &str) -> DataFrame {
schema: Some(&schema),
..Default::default()
};
// Register a lazy DataFrame by using the context and option provider
ctx.read_csv(file_path, csv_read_option).await.unwrap()
let csv_df = ctx.read_csv(file_path, csv_read_option).await?;
csv_df.show().await?;

// You can also create DataFrames from the result of sql queries
// and, by using `enable_url_table`, refer to local files directly
let dyn_ctx = ctx.clone().enable_url_table();
let csv_df = dyn_ctx
.sql(&format!("SELECT rating, unixtime FROM '{}'", file_path))
.await?;
csv_df.show().await?;

Ok(())
}

/// Use the DataFrame API to:
/// 1. Register in-memory Arrow data as a table.
/// 2. Select columns and filter rows from that table.
async fn read_memory(ctx: &SessionContext) -> Result<()> {
    // Build the in-memory data: a string column "a" and an Int32 column "b"
    let letters: ArrayRef = Arc::new(StringArray::from(vec!["a", "b", "c", "d"]));
    let numbers: ArrayRef = Arc::new(Int32Array::from(vec![1, 10, 10, 100]));
    let columns = vec![("a", letters), ("b", numbers)];
    let batch = RecordBatch::try_from_iter(columns)?;

    // Expose the batch as table "t". In Apache Spark API, this corresponds
    // to createDataFrame(...).
    ctx.register_batch("t", batch)?;

    // Equivalent to "SELECT a, b FROM t WHERE b = 10" in SQL; the results
    // are printed by `show`.
    ctx.table("t")
        .await?
        .select_columns(&["a", "b"])?
        .filter(col("b").eq(lit(10)))?
        .show()
        .await?;

    Ok(())
}
60 changes: 0 additions & 60 deletions datafusion-examples/examples/dataframe_in_memory.rs

This file was deleted.