Expose parser on DFParser to enable user controlled parsing (apache#9729)

* poc: custom parser

* play with extension statement

* tweak

* Revert "tweak"

This reverts commit e57006e.

* Revert "play with extension statement"

This reverts commit 86588e4.

* style: cargo fmt

* Update datafusion-examples/examples/sql_parsing.rs

Co-authored-by: Andrew Lamb <[email protected]>

* Apply suggestions from code review

Co-authored-by: Andrew Lamb <[email protected]>

* style: cargo cmt

* refactor: less nesting in parse statement

* docs: better example description

---------

Co-authored-by: Andrew Lamb <[email protected]>
2 people authored and Lordworms committed Apr 1, 2024
1 parent 21c7254 commit 1c7a385
Showing 3 changed files with 146 additions and 11 deletions.
21 changes: 11 additions & 10 deletions datafusion-examples/README.md
@@ -42,36 +42,37 @@ cargo run --example csv_sql

 ## Single Process

+- [`advanced_udaf.rs`](examples/advanced_udaf.rs): Define and invoke a more complicated User Defined Aggregate Function (UDAF)
+- [`advanced_udf.rs`](examples/advanced_udf.rs): Define and invoke a more complicated User Defined Scalar Function (UDF)
+- [`advanced_udwf.rs`](examples/advanced_udwf.rs): Define and invoke a more complicated User Defined Window Function (UDWF)
 - [`avro_sql.rs`](examples/avro_sql.rs): Build and run a query plan from a SQL statement against a local AVRO file
+- [`catalog.rs`](examples/catalog.rs): Register the table into a custom catalog
 - [`csv_sql.rs`](examples/csv_sql.rs): Build and run a query plan from a SQL statement against a local CSV file
 - [`csv_sql_streaming.rs`](examples/csv_sql_streaming.rs): Build and run a streaming query plan from a SQL statement against a local CSV file
-- [`catalog.rs`](examples/catalog.rs): Register the table into a custom catalog
 - [`custom_datasource.rs`](examples/custom_datasource.rs): Run queries against a custom datasource (TableProvider)
-- [`dataframe.rs`](examples/dataframe.rs): Run a query using a DataFrame against a local parquet file
 - [`dataframe-to-s3.rs`](examples/external_dependency/dataframe-to-s3.rs): Run a query using a DataFrame against a parquet file from s3 and writing back to s3
-- [`dataframe_output.rs`](examples/dataframe_output.rs): Examples of methods which write data out from a DataFrame
+- [`dataframe.rs`](examples/dataframe.rs): Run a query using a DataFrame against a local parquet file
 - [`dataframe_in_memory.rs`](examples/dataframe_in_memory.rs): Run a query using a DataFrame against data in memory
+- [`dataframe_output.rs`](examples/dataframe_output.rs): Examples of methods which write data out from a DataFrame
 - [`deserialize_to_struct.rs`](examples/deserialize_to_struct.rs): Convert query results into rust structs using serde
 - [`expr_api.rs`](examples/expr_api.rs): Create, execute, simplify and analyze `Expr`s
 - [`flight_sql_server.rs`](examples/flight/flight_sql_server.rs): Run DataFusion as a standalone process and execute SQL queries from JDBC clients
 - [`function_factory.rs`](examples/function_factory.rs): Register `CREATE FUNCTION` handler to implement SQL macros
 - [`make_date.rs`](examples/make_date.rs): Examples of using the make_date function
 - [`memtable.rs`](examples/memtable.rs): Create an query data in memory using SQL and `RecordBatch`es
-- [`pruning.rs`](examples/parquet_sql.rs): Use pruning to rule out files based on statistics
 - [`parquet_sql.rs`](examples/parquet_sql.rs): Build and run a query plan from a SQL statement against a local Parquet file
 - [`parquet_sql_multiple_files.rs`](examples/parquet_sql_multiple_files.rs): Build and run a query plan from a SQL statement against multiple local Parquet files
+- [`pruning.rs`](examples/parquet_sql.rs): Use pruning to rule out files based on statistics
 - [`query-aws-s3.rs`](examples/external_dependency/query-aws-s3.rs): Configure `object_store` and run a query against files stored in AWS S3
 - [`query-http-csv.rs`](examples/query-http-csv.rs): Configure `object_store` and run a query against files vi HTTP
 - [`regexp.rs`](examples/regexp.rs): Examples of using regular expression functions
 - [`rewrite_expr.rs`](examples/rewrite_expr.rs): Define and invoke a custom Query Optimizer pass
-- [`to_char.rs`](examples/to_char.rs): Examples of using the to_char function
-- [`to_timestamp.rs`](examples/to_timestamp.rs): Examples of using to_timestamp functions
-- [`simple_udf.rs`](examples/simple_udf.rs): Define and invoke a User Defined Scalar Function (UDF)
-- [`advanced_udf.rs`](examples/advanced_udf.rs): Define and invoke a more complicated User Defined Scalar Function (UDF)
 - [`simple_udaf.rs`](examples/simple_udaf.rs): Define and invoke a User Defined Aggregate Function (UDAF)
-- [`advanced_udaf.rs`](examples/advanced_udaf.rs): Define and invoke a more complicated User Defined Aggregate Function (UDAF)
+- [`simple_udf.rs`](examples/simple_udf.rs): Define and invoke a User Defined Scalar Function (UDF)
 - [`simple_udfw.rs`](examples/simple_udwf.rs): Define and invoke a User Defined Window Function (UDWF)
-- [`advanced_udwf.rs`](examples/advanced_udwf.rs): Define and invoke a more complicated User Defined Window Function (UDWF)
+- [`sql_dialect.rs`](examples/sql_dialect.rs): Example of implementing a custom SQL dialect on top of `DFParser`
+- [`to_char.rs`](examples/to_char.rs): Examples of using the to_char function
+- [`to_timestamp.rs`](examples/to_timestamp.rs): Examples of using to_timestamp functions

 ## Distributed

134 changes: 134 additions & 0 deletions datafusion-examples/examples/sql_dialect.rs
@@ -0,0 +1,134 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use std::fmt::Display;
+
+use datafusion::error::Result;
+use datafusion_sql::{
+    parser::{CopyToSource, CopyToStatement, DFParser, Statement},
+    sqlparser::{keywords::Keyword, parser::ParserError, tokenizer::Token},
+};
+
+/// This example demonstrates how to use the DFParser to parse a statement in a custom way
+///
+/// This technique can be used to implement a custom SQL dialect, for example.
+#[tokio::main]
+async fn main() -> Result<()> {
+    let mut my_parser =
+        MyParser::new("COPY source_table TO 'file.fasta' STORED AS FASTA")?;
+
+    let my_statement = my_parser.parse_statement()?;
+
+    match my_statement {
+        MyStatement::DFStatement(s) => println!("df: {}", s),
+        MyStatement::MyCopyTo(s) => println!("my_copy: {}", s),
+    }
+
+    Ok(())
+}
+
+/// Here we define a Parser for our new SQL dialect that wraps the existing `DFParser`
+struct MyParser<'a> {
+    df_parser: DFParser<'a>,
+}
+
+impl MyParser<'_> {
+    fn new(sql: &str) -> Result<Self> {
+        let df_parser = DFParser::new(sql)?;
+        Ok(Self { df_parser })
+    }
+
+    /// Returns true if the next token is `COPY` keyword, false otherwise
+    fn is_copy(&self) -> bool {
+        matches!(
+            self.df_parser.parser.peek_token().token,
+            Token::Word(w) if w.keyword == Keyword::COPY
+        )
+    }
+
+    /// This is the entry point to our parser -- it handles `COPY` statements specially
+    /// but otherwise delegates to the existing DataFusion parser.
+    pub fn parse_statement(&mut self) -> Result<MyStatement, ParserError> {
+        if self.is_copy() {
+            self.df_parser.parser.next_token(); // COPY
+            let df_statement = self.df_parser.parse_copy()?;
+
+            if let Statement::CopyTo(s) = df_statement {
+                Ok(MyStatement::from(s))
+            } else {
+                Ok(MyStatement::DFStatement(Box::from(df_statement)))
+            }
+        } else {
+            let df_statement = self.df_parser.parse_statement()?;
+            Ok(MyStatement::from(df_statement))
+        }
+    }
+}
+
+enum MyStatement {
+    DFStatement(Box<Statement>),
+    MyCopyTo(MyCopyToStatement),
+}
+
+impl Display for MyStatement {
+    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
+        match self {
+            MyStatement::DFStatement(s) => write!(f, "{}", s),
+            MyStatement::MyCopyTo(s) => write!(f, "{}", s),
+        }
+    }
+}
+
+impl From<Statement> for MyStatement {
+    fn from(s: Statement) -> Self {
+        Self::DFStatement(Box::from(s))
+    }
+}
+
+impl From<CopyToStatement> for MyStatement {
+    fn from(s: CopyToStatement) -> Self {
+        if s.stored_as == Some("FASTA".to_string()) {
+            Self::MyCopyTo(MyCopyToStatement::from(s))
+        } else {
+            Self::DFStatement(Box::from(Statement::CopyTo(s)))
+        }
+    }
+}
+
+struct MyCopyToStatement {
+    pub source: CopyToSource,
+    pub target: String,
+}
+
+impl From<CopyToStatement> for MyCopyToStatement {
+    fn from(s: CopyToStatement) -> Self {
+        Self {
+            source: s.source,
+            target: s.target,
+        }
+    }
+}
+
+impl Display for MyCopyToStatement {
+    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
+        write!(
+            f,
+            "COPY {} TO '{}' STORED AS FASTA",
+            self.source, self.target
+        )
+    }
+}
2 changes: 1 addition & 1 deletion datafusion/sql/src/parser.rs
@@ -278,7 +278,7 @@ fn ensure_not_set<T>(field: &Option<T>, name: &str) -> Result<(), ParserError> {
 /// `CREATE EXTERNAL TABLE` have special syntax in DataFusion. See
 /// [`Statement`] for a list of this special syntax
 pub struct DFParser<'a> {
-    parser: Parser<'a>,
+    pub parser: Parser<'a>,
 }

 impl<'a> DFParser<'a> {
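
The visibility change above is what lets a wrapper peek at the next token before deciding whether to delegate to the inner parser. A minimal std-only sketch of that peek-then-delegate pattern follows. `TokenStream`, `BaseParser`, `MyParser`, and `MyStatement` here are hypothetical stand-ins, not the real `DFParser` or sqlparser types, and whitespace splitting stands in for real tokenization:

```rust
// Hypothetical stand-ins only: these types mimic the shape of `DFParser`
// and its now-public `parser` field; they are not the DataFusion API.

/// Plays the role of sqlparser's `Parser`: owns tokens and a cursor.
pub struct TokenStream {
    tokens: Vec<String>,
    pos: usize,
}

impl TokenStream {
    /// Look at the next token without consuming it.
    pub fn peek(&self) -> Option<&str> {
        self.tokens.get(self.pos).map(String::as_str)
    }

    /// Consume and return the next token, if any.
    pub fn advance(&mut self) -> Option<String> {
        let tok = self.tokens.get(self.pos).cloned();
        if tok.is_some() {
            self.pos += 1;
        }
        tok
    }
}

/// Plays the role of `DFParser`; `tokens` is public so wrappers can peek.
pub struct BaseParser {
    pub tokens: TokenStream,
}

impl BaseParser {
    pub fn new(sql: &str) -> Self {
        // Assumption: splitting on whitespace is enough for this sketch.
        let tokens = sql.split_whitespace().map(str::to_string).collect();
        Self { tokens: TokenStream { tokens, pos: 0 } }
    }

    /// Default behaviour: consume all remaining tokens as one statement.
    pub fn parse_statement(&mut self) -> String {
        let mut parts = Vec::new();
        while let Some(t) = self.tokens.advance() {
            parts.push(t);
        }
        parts.join(" ")
    }
}

#[derive(Debug, PartialEq)]
pub enum MyStatement {
    /// Statement handled entirely by the base parser.
    Base(String),
    /// `COPY ... STORED AS FASTA`, handled by the custom dialect.
    CustomCopy(String),
}

/// Wrapper that special-cases `COPY` and delegates everything else.
pub struct MyParser {
    base: BaseParser,
}

impl MyParser {
    pub fn new(sql: &str) -> Self {
        Self { base: BaseParser::new(sql) }
    }

    pub fn parse_statement(&mut self) -> MyStatement {
        // Peek through the public field, mirroring `is_copy` in the example.
        let is_copy = self
            .base
            .tokens
            .peek()
            .map_or(false, |t| t.eq_ignore_ascii_case("COPY"));
        let stmt = self.base.parse_statement();
        if is_copy && stmt.to_ascii_uppercase().ends_with("STORED AS FASTA") {
            MyStatement::CustomCopy(stmt)
        } else {
            MyStatement::Base(stmt)
        }
    }
}

fn main() {
    let mut parser = MyParser::new("COPY source_table TO 'file.fasta' STORED AS FASTA");
    println!("{:?}", parser.parse_statement());
}
```

Keeping the inner parser as a field and wrapping it, rather than forking its logic, means every statement the wrapper does not recognize still goes through the unmodified base parser.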