
Commit b131cac

Authored by cj-zhukov (Sergey Zhukov) and alamb
Consolidate custom data source examples (#18142) (#18553)
## Which issue does this PR close?

This PR consolidates all the `custom_data_source` examples into a single example binary. We agreed on this pattern and can apply it to the remaining examples.

- Part of #18142.

Co-authored-by: Sergey Zhukov <[email protected]>
Co-authored-by: Andrew Lamb <[email protected]>
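After this consolidation, the individual data-source examples run as subcommands of one `custom_data_source` binary instead of as separate example binaries. For instance:

```bash
# before: one binary per example
cargo run --example custom_datasource

# after: a single binary that dispatches on a subcommand
cargo run --example custom_data_source -- custom_datasource
cargo run --example custom_data_source -- csv_sql_streaming
```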
1 parent 812bb35 commit b131cac

File tree: 12 files changed, +210 −70 lines

datafusion-examples/Cargo.toml (0 additions, 4 deletions)

```diff
@@ -43,10 +43,6 @@ path = "examples/external_dependency/dataframe-to-s3.rs"
 name = "query_aws_s3"
 path = "examples/external_dependency/query-aws-s3.rs"
 
-[[example]]
-name = "custom_file_casts"
-path = "examples/custom_file_casts.rs"
-
 [dev-dependencies]
 arrow = { workspace = true }
 # arrow_schema is required for record_batch! macro :sad:
```
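The `[[example]]` entry is deleted rather than updated because Cargo auto-discovers multi-file examples: a directory under `examples/` containing a `main.rs` becomes an example named after the directory, so the consolidated binary needs no manifest entry. The resulting layout (paths taken from the renames below) looks like:

```text
datafusion-examples/examples/custom_data_source/
├── main.rs                   # entry point, dispatches on a subcommand
├── csv_json_opener.rs
├── csv_sql_streaming.rs
├── custom_datasource.rs
├── custom_file_casts.rs
├── custom_file_format.rs
└── file_stream_provider.rs
```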

datafusion-examples/README.md (6 additions, 6 deletions)

```diff
@@ -54,18 +54,18 @@ cargo run --example dataframe
 - [`analyzer_rule.rs`](examples/analyzer_rule.rs): Use a custom AnalyzerRule to change a query's semantics (row level access control)
 - [`catalog.rs`](examples/catalog.rs): Register the table into a custom catalog
 - [`composed_extension_codec`](examples/composed_extension_codec.rs): Example of using multiple extension codecs for serialization / deserialization
-- [`csv_sql_streaming.rs`](examples/csv_sql_streaming.rs): Build and run a streaming query plan from a SQL statement against a local CSV file
-- [`csv_json_opener.rs`](examples/csv_json_opener.rs): Use low level `FileOpener` APIs to read CSV/JSON into Arrow `RecordBatch`es
-- [`custom_datasource.rs`](examples/custom_datasource.rs): Run queries against a custom datasource (TableProvider)
-- [`custom_file_casts.rs`](examples/custom_file_casts.rs): Implement custom casting rules to adapt file schemas
-- [`custom_file_format.rs`](examples/custom_file_format.rs): Write data to a custom file format
+- [`examples/custom_data_source/csv_sql_streaming.rs`](examples/custom_data_source/csv_sql_streaming.rs): Build and run a streaming query plan from a SQL statement against a local CSV file
+- [`examples/custom_data_source/csv_json_opener.rs`](examples/custom_data_source/csv_json_opener.rs): Use low level `FileOpener` APIs to read CSV/JSON into Arrow `RecordBatch`es
+- [`examples/custom_data_source/custom_datasource.rs`](examples/custom_data_source/custom_datasource.rs): Run queries against a custom datasource (TableProvider)
+- [`examples/custom_data_source/custom_file_casts.rs`](examples/custom_data_source/custom_file_casts.rs): Implement custom casting rules to adapt file schemas
+- [`examples/custom_data_source/custom_file_format.rs`](examples/custom_data_source/custom_file_format.rs): Write data to a custom file format
 - [`dataframe-to-s3.rs`](examples/external_dependency/dataframe-to-s3.rs): Run a query using a DataFrame against a parquet file from s3 and writing back to s3
 - [`dataframe.rs`](examples/dataframe.rs): Run a query using a DataFrame API against parquet files, csv files, and in-memory data, including multiple subqueries. Also demonstrates the various methods to write out a DataFrame to a table, parquet file, csv file, and json file.
 - [`examples/builtin_functions/date_time`](examples/builtin_functions/date_time.rs): Examples of date-time related functions and queries
 - [`default_column_values.rs`](examples/default_column_values.rs): Implement custom default value handling for missing columns using field metadata and PhysicalExprAdapter
 - [`deserialize_to_struct.rs`](examples/deserialize_to_struct.rs): Convert query results (Arrow ArrayRefs) into Rust structs
 - [`expr_api.rs`](examples/expr_api.rs): Create, execute, simplify, analyze and coerce `Expr`s
-- [`file_stream_provider.rs`](examples/file_stream_provider.rs): Run a query on `FileStreamProvider` which implements `StreamProvider` for reading and writing to arbitrary stream sources / sinks.
+- [`examples/custom_data_source/file_stream_provider.rs`](examples/custom_data_source/file_stream_provider.rs): Run a query on `FileStreamProvider` which implements `StreamProvider` for reading and writing to arbitrary stream sources / sinks.
 - [`flight/sql_server.rs`](examples/flight/sql_server.rs): Run DataFusion as a standalone process and execute SQL queries from Flight and FlightSQL (e.g. JDBC) clients
 - [`examples/builtin_functions/function_factory.rs`](examples/builtin_functions/function_factory.rs): Register `CREATE FUNCTION` handler to implement SQL macros
 - [`memory_pool_tracking.rs`](examples/memory_pool_tracking.rs): Demonstrates TrackConsumersPool for memory tracking and debugging with enhanced error messages
```

datafusion-examples/examples/builtin_functions/main.rs (5 additions, 0 deletions)

```diff
@@ -19,6 +19,11 @@
 //!
 //! These examples demonstrate miscellaneous function-related features.
 //!
+//! ## Usage
+//! ```bash
+//! cargo run --example builtin_functions -- [date_time|function_factory|regexp]
+//! ```
+//!
 //! Each subcommand runs a corresponding example:
 //! - `date_time` — examples of date-time related functions and queries
 //! - `function_factory` — register `CREATE FUNCTION` handler to implement SQL macros
```

datafusion-examples/examples/csv_json_opener.rs renamed to datafusion-examples/examples/custom_data_source/csv_json_opener.rs (1 addition, 2 deletions)

```diff
@@ -40,8 +40,7 @@ use object_store::{local::LocalFileSystem, memory::InMemory, ObjectStore};
 /// read data from (CSV/JSON) into Arrow RecordBatches.
 ///
 /// If you want to query data in CSV or JSON files, see the [`dataframe.rs`] and [`sql_query.rs`] examples
-#[tokio::main]
-async fn main() -> Result<()> {
+pub async fn csv_json_opener() -> Result<()> {
     csv_opener().await?;
     json_opener().await?;
     Ok(())
```
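This rename shows the mechanical pattern applied to each file in this PR: drop `#[tokio::main]`, rename `main` to a `pub async fn` named after the example, and let the consolidated `main.rs` own the runtime. A minimal standalone sketch of the pattern, with a hypothetical `my_example` and assuming the `tokio` and `datafusion` dependencies these examples already use:

```rust
use datafusion::error::Result;

// Each former example `main` becomes a plain async function that the
// consolidated binary can call as a library-style entry point.
pub async fn my_example() -> Result<()> {
    println!("example body runs unchanged here");
    Ok(())
}

// Only the consolidated binary keeps #[tokio::main], so exactly one
// Tokio runtime is created no matter which subcommand is chosen.
#[tokio::main]
async fn main() -> Result<()> {
    my_example().await
}
```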

datafusion-examples/examples/csv_sql_streaming.rs renamed to datafusion-examples/examples/custom_data_source/csv_sql_streaming.rs (1 addition, 2 deletions)

```diff
@@ -21,8 +21,7 @@ use datafusion::prelude::*;
 
 /// This example demonstrates executing a simple query against an Arrow data source (CSV) and
 /// fetching results with streaming aggregation and streaming window
-#[tokio::main]
-async fn main() -> Result<()> {
+pub async fn csv_sql_streaming() -> Result<()> {
     // create local execution context
     let ctx = SessionContext::new();
 
```
datafusion-examples/examples/custom_datasource.rs renamed to datafusion-examples/examples/custom_data_source/custom_datasource.rs

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -42,8 +42,7 @@ use datafusion::catalog::Session;
4242
use tokio::time::timeout;
4343

4444
/// This example demonstrates executing a simple query against a custom datasource
45-
#[tokio::main]
46-
async fn main() -> Result<()> {
45+
pub async fn custom_datasource() -> Result<()> {
4746
// create our custom datasource and adding some users
4847
let db = CustomDataSource::default();
4948
db.populate_users();

datafusion-examples/examples/custom_file_casts.rs renamed to datafusion-examples/examples/custom_data_source/custom_file_casts.rs (1 addition, 3 deletions)

```diff
@@ -44,9 +44,7 @@ use object_store::{ObjectStore, PutPayload};
 // This example enforces that casts must be strictly widening: if the file type is Int64 and the table type is Int32, it will error
 // before even reading the data.
 // Without this custom cast rule DataFusion would happily do the narrowing cast, potentially erroring only if it found a row with data it could not cast.
-
-#[tokio::main]
-async fn main() -> Result<()> {
+pub async fn custom_file_casts() -> Result<()> {
     println!("=== Creating example data ===");
 
     // Create a logical / table schema with an Int32 column
```

datafusion-examples/examples/custom_file_format.rs renamed to datafusion-examples/examples/custom_data_source/custom_file_format.rs (36 additions, 38 deletions)

```diff
@@ -48,6 +48,42 @@ use tempfile::tempdir;
 /// TSVFileFormatFactory is responsible for creating instances of TSVFileFormat.
 /// The former, once registered with the SessionState, will then be used
 /// to facilitate SQL operations on TSV files, such as `COPY TO` shown here.
+pub async fn custom_file_format() -> Result<()> {
+    // Create a new context with the default configuration
+    let mut state = SessionStateBuilder::new().with_default_features().build();
+
+    // Register the custom file format
+    let file_format = Arc::new(TSVFileFactory::new());
+    state.register_file_format(file_format, true)?;
+
+    // Create a new context with the custom file format
+    let ctx = SessionContext::new_with_state(state);
+
+    let mem_table = create_mem_table();
+    ctx.register_table("mem_table", mem_table)?;
+
+    let temp_dir = tempdir().unwrap();
+    let table_save_path = temp_dir.path().join("mem_table.tsv");
+
+    let d = ctx
+        .sql(&format!(
+            "COPY mem_table TO '{}' STORED AS TSV;",
+            table_save_path.display(),
+        ))
+        .await?;
+
+    let results = d.collect().await?;
+    println!(
+        "Number of inserted rows: {:?}",
+        (results[0]
+            .column_by_name("count")
+            .unwrap()
+            .as_primitive::<UInt64Type>()
+            .value(0))
+    );
+
+    Ok(())
+}
 
 #[derive(Debug)]
 /// Custom file format that reads and writes TSV files
@@ -181,44 +217,6 @@ impl GetExt for TSVFileFactory {
     }
 }
 
-#[tokio::main]
-async fn main() -> Result<()> {
-    // Create a new context with the default configuration
-    let mut state = SessionStateBuilder::new().with_default_features().build();
-
-    // Register the custom file format
-    let file_format = Arc::new(TSVFileFactory::new());
-    state.register_file_format(file_format, true).unwrap();
-
-    // Create a new context with the custom file format
-    let ctx = SessionContext::new_with_state(state);
-
-    let mem_table = create_mem_table();
-    ctx.register_table("mem_table", mem_table).unwrap();
-
-    let temp_dir = tempdir().unwrap();
-    let table_save_path = temp_dir.path().join("mem_table.tsv");
-
-    let d = ctx
-        .sql(&format!(
-            "COPY mem_table TO '{}' STORED AS TSV;",
-            table_save_path.display(),
-        ))
-        .await?;
-
-    let results = d.collect().await?;
-    println!(
-        "Number of inserted rows: {:?}",
-        (results[0]
-            .column_by_name("count")
-            .unwrap()
-            .as_primitive::<UInt64Type>()
-            .value(0))
-    );
-
-    Ok(())
-}
-
 // create a simple mem table
 fn create_mem_table() -> Arc<MemTable> {
     let fields = vec![
```
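Besides moving the body, the new `custom_file_format` function replaces two `.unwrap()` calls (on `register_file_format` and `register_table`) with `?`, so registration failures propagate as `DataFusionError`s to the dispatcher instead of panicking. A small sketch of the difference, using a hypothetical fallible `register` helper:

```rust
use datafusion::error::{DataFusionError, Result};

// Hypothetical fallible registration step standing in for
// register_file_format / register_table.
fn register() -> Result<()> {
    Err(DataFusionError::Execution("registration failed".into()))
}

fn run_example() -> Result<()> {
    // register().unwrap();  // would abort the whole example binary
    register()?; // propagates the error for the caller to report
    Ok(())
}

fn main() {
    if let Err(e) = run_example() {
        eprintln!("example failed: {e}");
    }
}
```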

datafusion-examples/examples/file_stream_provider.rs renamed to datafusion-examples/examples/custom_data_source/file_stream_provider.rs (23 additions, 13 deletions)

```diff
@@ -15,6 +15,29 @@
 // specific language governing permissions and limitations
 // under the License.
 
+/// Demonstrates how to use [`FileStreamProvider`] and [`StreamTable`] to stream data
+/// from a file-like source (FIFO) into DataFusion for continuous querying.
+///
+/// On non-Windows systems, this example creates a named pipe (FIFO) and
+/// writes rows into it asynchronously while DataFusion reads the data
+/// through a `FileStreamProvider`.
+///
+/// This illustrates how to integrate dynamically updated data sources
+/// with DataFusion without needing to reload the entire dataset each time.
+///
+/// This example does not work on Windows.
+pub async fn file_stream_provider() -> datafusion::error::Result<()> {
+    #[cfg(target_os = "windows")]
+    {
+        println!("file_stream_provider example does not work on windows");
+        Ok(())
+    }
+    #[cfg(not(target_os = "windows"))]
+    {
+        non_windows::main().await
+    }
+}
+
 #[cfg(not(target_os = "windows"))]
 mod non_windows {
     use datafusion::assert_batches_eq;
@@ -186,16 +209,3 @@ mod non_windows {
         Ok(())
     }
 }
-
-#[tokio::main]
-async fn main() -> datafusion::error::Result<()> {
-    #[cfg(target_os = "windows")]
-    {
-        println!("file_stream_provider example does not work on windows");
-        Ok(())
-    }
-    #[cfg(not(target_os = "windows"))]
-    {
-        non_windows::main().await
-    }
-}
```
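The Windows guard moves verbatim from the old `main` into the new public entry point, keeping the cfg-gated-block pattern: both branches are block expressions and only the one matching the target platform compiles. A standalone sketch of the same pattern, with hypothetical names and no DataFusion dependency:

```rust
// Hypothetical stand-in for the Unix-only body (e.g. FIFO streaming).
#[cfg(not(target_os = "windows"))]
async fn unix_only_body() -> std::io::Result<()> {
    println!("doing Unix-only work");
    Ok(())
}

pub async fn platform_gated_example() -> std::io::Result<()> {
    #[cfg(target_os = "windows")]
    {
        // Named pipes (FIFOs) are not available here, so exit gracefully.
        println!("this example does not work on Windows");
        Ok(())
    }
    #[cfg(not(target_os = "windows"))]
    {
        unix_only_body().await
    }
}

#[tokio::main]
async fn main() -> std::io::Result<()> {
    platform_gated_example().await
}
```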
datafusion-examples/examples/custom_data_source/main.rs (new file, 126 additions)

```rust
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

//! # These examples are all related to extending or defining how DataFusion reads data
//!
//! These examples demonstrate how DataFusion reads data.
//!
//! ## Usage
//! ```bash
//! cargo run --example custom_data_source -- [csv_json_opener|csv_sql_streaming|custom_datasource|custom_file_casts|custom_file_format|file_stream_provider]
//! ```
//!
//! Each subcommand runs a corresponding example:
//! - `csv_json_opener` — use low level FileOpener APIs to read CSV/JSON into Arrow RecordBatches
//! - `csv_sql_streaming` — build and run a streaming query plan from a SQL statement against a local CSV file
//! - `custom_datasource` — run queries against a custom datasource (TableProvider)
//! - `custom_file_casts` — implement custom casting rules to adapt file schemas
//! - `custom_file_format` — write data to a custom file format
//! - `file_stream_provider` — run a query on FileStreamProvider which implements StreamProvider for reading and writing to arbitrary stream sources/sinks

mod csv_json_opener;
mod csv_sql_streaming;
mod custom_datasource;
mod custom_file_casts;
mod custom_file_format;
mod file_stream_provider;

use std::str::FromStr;

use datafusion::error::{DataFusionError, Result};

enum ExampleKind {
    CsvJsonOpener,
    CsvSqlStreaming,
    CustomDatasource,
    CustomFileCasts,
    CustomFileFormat,
    FileStreamProvider,
}

impl AsRef<str> for ExampleKind {
    fn as_ref(&self) -> &str {
        match self {
            Self::CsvJsonOpener => "csv_json_opener",
            Self::CsvSqlStreaming => "csv_sql_streaming",
            Self::CustomDatasource => "custom_datasource",
            Self::CustomFileCasts => "custom_file_casts",
            Self::CustomFileFormat => "custom_file_format",
            Self::FileStreamProvider => "file_stream_provider",
        }
    }
}

impl FromStr for ExampleKind {
    type Err = DataFusionError;

    fn from_str(s: &str) -> Result<Self> {
        match s {
            "csv_json_opener" => Ok(Self::CsvJsonOpener),
            "csv_sql_streaming" => Ok(Self::CsvSqlStreaming),
            "custom_datasource" => Ok(Self::CustomDatasource),
            "custom_file_casts" => Ok(Self::CustomFileCasts),
            "custom_file_format" => Ok(Self::CustomFileFormat),
            "file_stream_provider" => Ok(Self::FileStreamProvider),
            _ => Err(DataFusionError::Execution(format!("Unknown example: {s}"))),
        }
    }
}

impl ExampleKind {
    const ALL: [Self; 6] = [
        Self::CsvJsonOpener,
        Self::CsvSqlStreaming,
        Self::CustomDatasource,
        Self::CustomFileCasts,
        Self::CustomFileFormat,
        Self::FileStreamProvider,
    ];

    const EXAMPLE_NAME: &str = "custom_data_source";

    fn variants() -> Vec<&'static str> {
        Self::ALL.iter().map(|x| x.as_ref()).collect()
    }
}

#[tokio::main]
async fn main() -> Result<()> {
    let usage = format!(
        "Usage: cargo run --example {} -- [{}]",
        ExampleKind::EXAMPLE_NAME,
        ExampleKind::variants().join("|")
    );

    let arg = std::env::args().nth(1).ok_or_else(|| {
        eprintln!("{usage}");
        DataFusionError::Execution("Missing argument".to_string())
    })?;

    match arg.parse::<ExampleKind>()? {
        ExampleKind::CsvJsonOpener => csv_json_opener::csv_json_opener().await?,
        ExampleKind::CsvSqlStreaming => csv_sql_streaming::csv_sql_streaming().await?,
        ExampleKind::CustomDatasource => custom_datasource::custom_datasource().await?,
        ExampleKind::CustomFileCasts => custom_file_casts::custom_file_casts().await?,
        ExampleKind::CustomFileFormat => custom_file_format::custom_file_format().await?,
        ExampleKind::FileStreamProvider => {
            file_stream_provider::file_stream_provider().await?
        }
    }

    Ok(())
}
```
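With the dispatcher above, each consolidated example is selected by name. A missing argument prints the usage string, and an unknown name fails with the `Unknown example` execution error from `FromStr`:

```bash
# run one of the consolidated examples
cargo run --example custom_data_source -- custom_file_format

# prints the usage line, then returns an Execution error
cargo run --example custom_data_source

# fails with "Unknown example: not_an_example"
cargo run --example custom_data_source -- not_an_example
```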
