Skip to content

Commit

Permalink
feat(query): support with clause in copy into (databendlabs#15343)
Browse files Browse the repository at this point in the history
* feat(query): support with clause in copy into

* feat(query): support with clause in copy into

* feat(query): support with clause in copy into
  • Loading branch information
sundy-li authored Apr 26, 2024
1 parent f26dba0 commit 88331f0
Show file tree
Hide file tree
Showing 6 changed files with 56 additions and 6 deletions.
9 changes: 9 additions & 0 deletions src/query/ast/src/ast/statements/copy.rs
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@ use crate::ast::Hint;
use crate::ast::Identifier;
use crate::ast::Query;
use crate::ast::TableRef;
use crate::ast::With;

/// CopyIntoTableStmt is the parsed statement of `[WITH <ctes>] COPY INTO <table> FROM <location | (query)>`.
///
Expand All @@ -50,6 +51,7 @@ use crate::ast::TableRef;
/// ```
#[derive(Debug, Clone, PartialEq, Drive, DriveMut)]
pub struct CopyIntoTableStmt {
pub with: Option<With>,
pub src: CopyIntoTableSource,
pub dst: TableRef,
pub dst_columns: Option<Vec<Identifier>>,
Expand Down Expand Up @@ -137,6 +139,9 @@ impl CopyIntoTableStmt {

impl Display for CopyIntoTableStmt {
fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
if let Some(cte) = &self.with {
write!(f, "WITH {} ", cte)?;
}
write!(f, "COPY")?;
if let Some(hints) = &self.hints {
write!(f, "{} ", hints)?;
Expand Down Expand Up @@ -189,6 +194,7 @@ impl Display for CopyIntoTableStmt {
/// CopyIntoLocationStmt is the parsed statement of `[WITH <ctes>] COPY INTO <location> FROM <table | (query)> ...`
#[derive(Debug, Clone, PartialEq, Drive, DriveMut)]
pub struct CopyIntoLocationStmt {
pub with: Option<With>,
pub hints: Option<Hint>,
pub src: CopyIntoLocationSource,
pub dst: FileLocation,
Expand All @@ -204,6 +210,9 @@ pub struct CopyIntoLocationStmt {

impl Display for CopyIntoLocationStmt {
fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
if let Some(cte) = &self.with {
write!(f, "WITH {} ", cte)?;
}
write!(f, "COPY")?;
if let Some(hints) = &self.hints {
write!(f, "{} ", hints)?;
Expand Down
11 changes: 7 additions & 4 deletions src/query/ast/src/parser/copy.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
use nom::branch::alt;
use nom::combinator::map;

use super::query::with;
use crate::ast::CopyIntoLocationOption;
use crate::ast::CopyIntoLocationSource;
use crate::ast::CopyIntoLocationStmt;
Expand Down Expand Up @@ -50,14 +51,15 @@ pub fn copy_into_table(i: Input) -> IResult<Statement> {

map(
rule! {
COPY
#with? ~ COPY
~ #hint?
~ INTO ~ #table_ref ~ ( "(" ~ #comma_separated_list1(ident) ~ ")" )?
~ ^FROM ~ ^#copy_into_table_source
~ #copy_into_table_option*
},
|(_copy, opt_hints, _into, dst, dst_columns, _from, src, opts)| {
|(with, _copy, opt_hints, _into, dst, dst_columns, _from, src, opts)| {
let mut copy_stmt = CopyIntoTableStmt {
with,
hints: opt_hints,
src,
dst,
Expand Down Expand Up @@ -93,14 +95,15 @@ fn copy_into_location(i: Input) -> IResult<Statement> {

map(
rule! {
COPY
#with? ~ COPY
~ #hint?
~ INTO ~ #file_location
~ ^FROM ~ ^#copy_into_location_source
~ #copy_into_location_option*
},
|(_copy, opt_hints, _into, dst, _from, src, opts)| {
|(with, _copy, opt_hints, _into, dst, _from, src, opts)| {
let mut copy_stmt = CopyIntoLocationStmt {
with,
hints: opt_hints,
src,
dst,
Expand Down
20 changes: 20 additions & 0 deletions src/query/ast/tests/it/testdata/statement.txt
Original file line number Diff line number Diff line change
Expand Up @@ -12185,6 +12185,7 @@ COPY INTO mytable FROM '@~/mybucket/my data.csv' SIZE_LIMIT = 10 PURGE = false F
---------- AST ------------
CopyIntoTable(
CopyIntoTableStmt {
with: None,
src: Location(
Stage(
"~/mybucket/my data.csv",
Expand Down Expand Up @@ -12237,6 +12238,7 @@ COPY INTO mytable FROM '@~/mybucket/data.csv' FILE_FORMAT = (field_delimiter = '
---------- AST ------------
CopyIntoTable(
CopyIntoTableStmt {
with: None,
src: Location(
Stage(
"~/mybucket/data.csv",
Expand Down Expand Up @@ -12303,6 +12305,7 @@ COPY INTO mytable FROM 's3://mybucket/data.csv' FILE_FORMAT = (field_delimiter =
---------- AST ------------
CopyIntoTable(
CopyIntoTableStmt {
with: None,
src: Location(
Uri(
UriLocation {
Expand Down Expand Up @@ -12378,6 +12381,7 @@ COPY INTO mytable FROM 's3://mybucket/data.csv' FILE_FORMAT = (field_delimiter =
---------- AST ------------
CopyIntoTable(
CopyIntoTableStmt {
with: None,
src: Location(
Uri(
UriLocation {
Expand Down Expand Up @@ -12455,6 +12459,7 @@ COPY INTO mytable FROM 's3://mybucket/data.csv' CONNECTION = ( endpoint_url = 'h
---------- AST ------------
CopyIntoTable(
CopyIntoTableStmt {
with: None,
src: Location(
Uri(
UriLocation {
Expand Down Expand Up @@ -12534,6 +12539,7 @@ COPY INTO mytable FROM 's3://mybucket/data.csv' CONNECTION = ( endpoint_url = 'h
---------- AST ------------
CopyIntoTable(
CopyIntoTableStmt {
with: None,
src: Location(
Uri(
UriLocation {
Expand Down Expand Up @@ -12603,6 +12609,7 @@ COPY INTO mytable FROM 'https://127.0.0.1:9900/' PURGE = false FORCE = false DIS
---------- AST ------------
CopyIntoTable(
CopyIntoTableStmt {
with: None,
src: Location(
Uri(
UriLocation {
Expand Down Expand Up @@ -12657,6 +12664,7 @@ COPY INTO mytable FROM 'https://127.0.0.1/' PURGE = false FORCE = false DISABLE_
---------- AST ------------
CopyIntoTable(
CopyIntoTableStmt {
with: None,
src: Location(
Uri(
UriLocation {
Expand Down Expand Up @@ -12719,6 +12727,7 @@ COPY INTO mytable FROM '@my_stage' FILE_FORMAT = (error_on_column_count_mismatch
---------- AST ------------
CopyIntoTable(
CopyIntoTableStmt {
with: None,
src: Location(
Stage(
"my_stage",
Expand Down Expand Up @@ -12786,6 +12795,7 @@ COPY INTO 's3://mybucket/data.csv' FROM mytable FILE_FORMAT = (field_delimiter =
---------- AST ------------
CopyIntoLocation(
CopyIntoLocationStmt {
with: None,
hints: None,
src: Table(
TableRef {
Expand Down Expand Up @@ -12844,6 +12854,7 @@ COPY INTO '@my_stage/my data' FROM mytable SINGLE = false MAX_FILE_SIZE = 0 DETA
---------- AST ------------
CopyIntoLocation(
CopyIntoLocationStmt {
with: None,
hints: None,
src: Table(
TableRef {
Expand Down Expand Up @@ -12886,6 +12897,7 @@ COPY INTO '@my_stage' FROM mytable FILE_FORMAT = (field_delimiter = ',', record_
---------- AST ------------
CopyIntoLocation(
CopyIntoLocationStmt {
with: None,
hints: None,
src: Table(
TableRef {
Expand Down Expand Up @@ -12946,6 +12958,7 @@ COPY INTO mytable FROM 's3://mybucket/data.csv' CONNECTION = ( aws_key_id = 'acc
---------- AST ------------
CopyIntoTable(
CopyIntoTableStmt {
with: None,
src: Location(
Uri(
UriLocation {
Expand Down Expand Up @@ -13023,6 +13036,7 @@ COPY INTO mytable FROM '@external_stage/path/to/file.csv' FILE_FORMAT = (field_d
---------- AST ------------
CopyIntoTable(
CopyIntoTableStmt {
with: None,
src: Location(
Stage(
"external_stage/path/to/file.csv",
Expand Down Expand Up @@ -13088,6 +13102,7 @@ COPY INTO mytable FROM '@external_stage/path/to/dir/' FILE_FORMAT = (field_delim
---------- AST ------------
CopyIntoTable(
CopyIntoTableStmt {
with: None,
src: Location(
Stage(
"external_stage/path/to/dir/",
Expand Down Expand Up @@ -13153,6 +13168,7 @@ COPY INTO mytable FROM '@external_stage/path/to/file.csv' FILE_FORMAT = (field_d
---------- AST ------------
CopyIntoTable(
CopyIntoTableStmt {
with: None,
src: Location(
Stage(
"external_stage/path/to/file.csv",
Expand Down Expand Up @@ -13219,6 +13235,7 @@ COPY INTO mytable FROM 'fs:///path/to/data.csv' FILE_FORMAT = (field_delimiter =
---------- AST ------------
CopyIntoTable(
CopyIntoTableStmt {
with: None,
src: Location(
Uri(
UriLocation {
Expand Down Expand Up @@ -13292,6 +13309,7 @@ COPY INTO books FROM 's3://databend/books.csv' CONNECTION = ( access_key_id = 'R
---------- AST ------------
CopyIntoTable(
CopyIntoTableStmt {
with: None,
src: Location(
Uri(
UriLocation {
Expand Down Expand Up @@ -18953,6 +18971,7 @@ CreatePipe(
auto_ingest: true,
comments: "This is test pipe 1",
copy_stmt: CopyIntoTableStmt {
with: None,
src: Location(
Stage(
"~/MyStage1",
Expand Down Expand Up @@ -19007,6 +19026,7 @@ CreatePipe(
auto_ingest: false,
comments: "",
copy_stmt: CopyIntoTableStmt {
with: None,
src: Location(
Stage(
"~/mybucket/data.csv",
Expand Down
3 changes: 3 additions & 0 deletions src/query/sql/src/planner/binder/copy_into_location.rs
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,9 @@ impl<'a> Binder {
}
}
CopyIntoLocationSource::Query(query) => {
if let Some(with) = &stmt.with {
self.add_cte(with, bind_context)?;
}
let select_plan = self
.bind_statement(bind_context, &Statement::Query(query.clone()))
.await?;
Expand Down
4 changes: 4 additions & 0 deletions src/query/sql/src/planner/binder/copy_into_table.rs
Original file line number Diff line number Diff line change
Expand Up @@ -94,6 +94,10 @@ impl<'a> Binder {
.await
}
CopyIntoTableSource::Query(query) => {
if let Some(with) = &stmt.with {
self.add_cte(with, bind_context)?;
}

let mut max_column_position = MaxColumnPosition::new();
query.drive(&mut max_column_position);
self.metadata
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,18 @@ statement ok
CREATE STAGE IF NOT EXISTS hello

statement ok
COPY INTO @hello from (select number from numbers(1)) FILE_FORMAT = (type = parquet)
COPY INTO @hello from (select number from numbers(10)) FILE_FORMAT = (type = parquet)

statement ok
COPY INTO @hello from (select number from numbers(1) where number in (select max(number) from numbers(1000)) ) FILE_FORMAT = (type = parquet)

statement ok
with S as (select number from numbers(1000) where number > 100) COPY INTO @hello from (select number from numbers(1) where number not in (SELECT number FROM S)) FILE_FORMAT = (type = parquet)

query I
select sum($1) from @hello;
----
45

statement ok
CREATE TABLE world(c1 int , c2 int);
Expand All @@ -55,7 +66,7 @@ statement ok
DROP STAGE IF EXISTS hello

statement ok
drop table world
drop table world

statement ok
DROP DATABASE db1
Expand Down

0 comments on commit 88331f0

Please sign in to comment.