diff --git a/datafusion-examples/README.md b/datafusion-examples/README.md index 75fcaddf8def..528e7dd857e5 100644 --- a/datafusion-examples/README.md +++ b/datafusion-examples/README.md @@ -57,8 +57,7 @@ cargo run --example dataframe - [`custom_datasource.rs`](examples/custom_datasource.rs): Run queries against a custom datasource (TableProvider) - [`custom_file_format.rs`](examples/custom_file_format.rs): Write data to a custom file format - [`dataframe-to-s3.rs`](examples/external_dependency/dataframe-to-s3.rs): Run a query using a DataFrame against a parquet file from s3 and writing back to s3 -- [`dataframe.rs`](examples/dataframe.rs): Run a query using a DataFrame against a local parquet file -- [`dataframe_in_memory.rs`](examples/dataframe_in_memory.rs): Run a query using a DataFrame against data in memory +- [`dataframe.rs`](examples/dataframe.rs): Run a query using a DataFrame API against parquet files, csv files, and in-memory data - [`dataframe_output.rs`](examples/dataframe_output.rs): Examples of methods which write data out from a DataFrame - [`deserialize_to_struct.rs`](examples/deserialize_to_struct.rs): Convert query results into rust structs using serde - [`expr_api.rs`](examples/expr_api.rs): Create, execute, simplify, analyze and coerce `Expr`s diff --git a/datafusion-examples/examples/dataframe.rs b/datafusion-examples/examples/dataframe.rs index d7e0068ef88f..59766e881e8b 100644 --- a/datafusion-examples/examples/dataframe.rs +++ b/datafusion-examples/examples/dataframe.rs @@ -15,90 +15,82 @@ // specific language governing permissions and limitations // under the License. +use arrow::array::{ArrayRef, Int32Array, RecordBatch, StringArray}; use datafusion::arrow::datatypes::{DataType, Field, Schema}; use datafusion::error::Result; use datafusion::prelude::*; use std::fs::File; use std::io::Write; +use std::sync::Arc; use tempfile::tempdir; -/// This example demonstrates executing a simple query against an Arrow data source (Parquet) and -/// fetching results, using the DataFrame trait +/// This example demonstrates using DataFusion's DataFrame API to +/// +/// * [read_parquet]: execute queries against parquet files +/// * [read_csv]: execute queries against csv files +/// * [read_memory]: execute queries against in-memory arrow data #[tokio::main] async fn main() -> Result<()> { - // create local execution context + // The SessionContext is the main high level API for interacting with DataFusion let ctx = SessionContext::new(); + read_parquet(&ctx).await?; + read_csv(&ctx).await?; + read_memory(&ctx).await?; + Ok(()) +} +/// Use DataFrame API to +/// 1. Read parquet files, +/// 2. Show the schema +/// 3. Select columns and rows +async fn read_parquet(ctx: &SessionContext) -> Result<()> { + // Find the local path of "alltypes_plain.parquet" let testdata = datafusion::test_util::parquet_test_data(); - let filename = &format!("{testdata}/alltypes_plain.parquet"); - // define the query using the DataFrame trait - let df = ctx - .read_parquet(filename, ParquetReadOptions::default()) - .await? - .select_columns(&["id", "bool_col", "timestamp_col"])? - .filter(col("id").gt(lit(1)))?; - - // print the results - df.show().await?; - - // create a csv file waiting to be written - let dir = tempdir()?; - let file_path = dir.path().join("example.csv"); - let file = File::create(&file_path)?; - write_csv_file(file); - - // Reading CSV file with inferred schema example - let csv_df = - example_read_csv_file_with_inferred_schema(file_path.to_str().unwrap()).await; - csv_df.show().await?; - - // Reading CSV file with defined schema - let csv_df = example_read_csv_file_with_schema(file_path.to_str().unwrap()).await; - csv_df.show().await?; - - // Reading PARQUET file and print describe + // Read the parquet files and show its schema using 'describe' let parquet_df = ctx .read_parquet(filename, ParquetReadOptions::default()) .await?; - parquet_df.describe().await.unwrap().show().await?; - let dyn_ctx = ctx.enable_url_table(); - let df = dyn_ctx - .sql(&format!("SELECT * FROM '{}'", file_path.to_str().unwrap())) + // show its schema using 'describe' + parquet_df.clone().describe().await?.show().await?; + + // Select three columns and filter the results + // so that only rows where id > 1 are returned + parquet_df + .select_columns(&["id", "bool_col", "timestamp_col"])? + .filter(col("id").gt(lit(1)))? + .show() .await?; - df.show().await?; Ok(()) } -// Function to create an test CSV file -fn write_csv_file(mut file: File) { - // Create the data to put into the csv file with headers - let content = r#"id,time,vote,unixtime,rating -a1,"10 6, 2013",3,1381017600,5.0 -a2,"08 9, 2013",2,1376006400,4.5"#; - // write the data - file.write_all(content.as_ref()) - .expect("Problem with writing file!"); -} +/// Use the DataFrame API to +/// 1. Read CSV files +/// 2. Optionally specify schema +async fn read_csv(ctx: &SessionContext) -> Result<()> { + // create example.csv file in a temporary directory + let dir = tempdir()?; + let file_path = dir.path().join("example.csv"); + { + let mut file = File::create(&file_path)?; + // write CSV data + file.write_all( + r#"id,time,vote,unixtime,rating + a1,"10 6, 2013",3,1381017600,5.0 + a2,"08 9, 2013",2,1376006400,4.5"# + .as_bytes(), + )?; + } // scope closes the file + let file_path = file_path.to_str().unwrap(); -// Example to read data from a csv file with inferred schema -async fn example_read_csv_file_with_inferred_schema(file_path: &str) -> DataFrame { - // Create a session context - let ctx = SessionContext::new(); - // Register a lazy DataFrame using the context - ctx.read_csv(file_path, CsvReadOptions::default()) - .await - .unwrap() -} + // You can read a CSV file and DataFusion will infer the schema automatically + let csv_df = ctx.read_csv(file_path, CsvReadOptions::default()).await?; + csv_df.show().await?; -// Example to read csv file with a defined schema for the csv file -async fn example_read_csv_file_with_schema(file_path: &str) -> DataFrame { - // Create a session context - let ctx = SessionContext::new(); - // Define the schema + // If you know the types of your data you can specify them explicitly let schema = Schema::new(vec![ Field::new("id", DataType::Utf8, false), Field::new("time", DataType::Utf8, false), @@ -112,6 +104,38 @@ async fn example_read_csv_file_with_schema(file_path: &str) -> DataFrame { schema: Some(&schema), ..Default::default() }; - // Register a lazy DataFrame by using the context and option provider - ctx.read_csv(file_path, csv_read_option).await.unwrap() + let csv_df = ctx.read_csv(file_path, csv_read_option).await?; + csv_df.show().await?; + + // You can also create DataFrames from the result of sql queries + // and using the `enable_url_table` refer to local files directly + let dyn_ctx = ctx.clone().enable_url_table(); + let csv_df = dyn_ctx + .sql(&format!("SELECT rating, unixtime FROM '{}'", file_path)) + .await?; + csv_df.show().await?; + + Ok(()) +} + +/// Use the DataFrame API to: +/// 1. Read in-memory data. +async fn read_memory(ctx: &SessionContext) -> Result<()> { + // define data in memory + let a: ArrayRef = Arc::new(StringArray::from(vec!["a", "b", "c", "d"])); + let b: ArrayRef = Arc::new(Int32Array::from(vec![1, 10, 10, 100])); + let batch = RecordBatch::try_from_iter(vec![("a", a), ("b", b)])?; + + // declare a table in memory. In Apache Spark API, this corresponds to createDataFrame(...). + ctx.register_batch("t", batch)?; + let df = ctx.table("t").await?; + + // construct an expression corresponding to "SELECT a, b FROM t WHERE b = 10" in SQL + let filter = col("b").eq(lit(10)); + let df = df.select_columns(&["a", "b"])?.filter(filter)?; + + // print the results + df.show().await?; + + Ok(()) } diff --git a/datafusion-examples/examples/dataframe_in_memory.rs b/datafusion-examples/examples/dataframe_in_memory.rs deleted file mode 100644 index c57c38870a7e..000000000000 --- a/datafusion-examples/examples/dataframe_in_memory.rs +++ /dev/null @@ -1,60 +0,0 @@ -// Licensed to the Apache Software Foundation (ASF) under one -// or more contributor license agreements. See the NOTICE file -// distributed with this work for additional information -// regarding copyright ownership. The ASF licenses this file -// to you under the Apache License, Version 2.0 (the -// "License"); you may not use this file except in compliance -// with the License. You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, -// software distributed under the License is distributed on an -// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -// KIND, either express or implied. See the License for the -// specific language governing permissions and limitations -// under the License. - -use std::sync::Arc; - -use datafusion::arrow::array::{Int32Array, StringArray}; -use datafusion::arrow::datatypes::{DataType, Field, Schema}; -use datafusion::arrow::record_batch::RecordBatch; -use datafusion::error::Result; -use datafusion::prelude::*; - -/// This example demonstrates how to use the DataFrame API against in-memory data. -#[tokio::main] -async fn main() -> Result<()> { - // define a schema. - let schema = Arc::new(Schema::new(vec![ - Field::new("a", DataType::Utf8, false), - Field::new("b", DataType::Int32, false), - ])); - - // define data. - let batch = RecordBatch::try_new( - schema, - vec![ - Arc::new(StringArray::from(vec!["a", "b", "c", "d"])), - Arc::new(Int32Array::from(vec![1, 10, 10, 100])), - ], - )?; - - // declare a new context. In spark API, this corresponds to a new spark SQLsession - let ctx = SessionContext::new(); - - // declare a table in memory. In spark API, this corresponds to createDataFrame(...). - ctx.register_batch("t", batch)?; - let df = ctx.table("t").await?; - - // construct an expression corresponding to "SELECT a, b FROM t WHERE b = 10" in SQL - let filter = col("b").eq(lit(10)); - - let df = df.select_columns(&["a", "b"])?.filter(filter)?; - - // print the results - df.show().await?; - - Ok(()) -}