From 945d564e41bac517be4e8f41fa0533ac1b4e930c Mon Sep 17 00:00:00 2001 From: irenjj Date: Fri, 10 May 2024 20:48:40 +0800 Subject: [PATCH 01/18] feat: limit total rows copied in COPY TABLE FROM with LIMIT segment --- src/operator/src/statement.rs | 2 ++ src/operator/src/statement/copy_database.rs | 2 ++ src/operator/src/statement/copy_table_from.rs | 13 ++++++---- src/sql/src/parsers/copy_parser.rs | 24 ++++++++++++++----- src/sql/src/statements/copy.rs | 1 + src/table/src/requests.rs | 1 + .../common/copy/copy_from_fs_parquet.result | 20 ++++++++++++++++ .../common/copy/copy_from_fs_parquet.sql | 8 +++++++ 8 files changed, 60 insertions(+), 11 deletions(-) diff --git a/src/operator/src/statement.rs b/src/operator/src/statement.rs index 4dadffd1240d..d622485a2bf2 100644 --- a/src/operator/src/statement.rs +++ b/src/operator/src/statement.rs @@ -302,6 +302,7 @@ fn to_copy_table_request(stmt: CopyTable, query_ctx: QueryContextRef) -> Result< connection, with, table_name, + limit, .. } = match stmt { CopyTable::To(arg) => arg, @@ -328,6 +329,7 @@ fn to_copy_table_request(stmt: CopyTable, query_ctx: QueryContextRef) -> Result< pattern, direction, timestamp_range, + limit, }) } diff --git a/src/operator/src/statement/copy_database.rs b/src/operator/src/statement/copy_database.rs index fac6d77a6c15..daed3379aedc 100644 --- a/src/operator/src/statement/copy_database.rs +++ b/src/operator/src/statement/copy_database.rs @@ -90,6 +90,7 @@ impl StatementExecutor { pattern: None, direction: CopyDirection::Export, timestamp_range: req.time_range, + limit: None, }, ctx.clone(), ) @@ -155,6 +156,7 @@ impl StatementExecutor { pattern: None, direction: CopyDirection::Import, timestamp_range: None, + limit: None, }; debug!("Copy table, arg: {:?}", req); match self.copy_table_from(req, ctx.clone()).await { diff --git a/src/operator/src/statement/copy_table_from.rs b/src/operator/src/statement/copy_table_from.rs index aa4e2343a38a..374df8380949 100644 --- a/src/operator/src/statement/copy_table_from.rs +++ b/src/operator/src/statement/copy_table_from.rs @@ -379,11 +379,14 @@ impl StatementExecutor { let mut rows_inserted = 0; let mut insert_cost = 0; - let max_insert_rows = req - .with - .get(MAX_INSERT_ROWS) - .and_then(|val| val.parse::().ok()) - .unwrap_or(DEFAULT_MAX_INSERT_ROWS); + let max_insert_rows = if let Some(num) = req.limit { + num as usize + } else { + req.with + .get(MAX_INSERT_ROWS) + .and_then(|val| val.parse::().ok()) + .unwrap_or(DEFAULT_MAX_INSERT_ROWS) + }; for (compat_schema, file_schema_projection, projected_table_schema, file_metadata) in files { let mut stream = self diff --git a/src/sql/src/parsers/copy_parser.rs b/src/sql/src/parsers/copy_parser.rs index b9e9be85f732..ef05177a4386 100644 --- a/src/sql/src/parsers/copy_parser.rs +++ b/src/sql/src/parsers/copy_parser.rs @@ -56,7 +56,7 @@ impl<'a> ParserContext<'a> { })?; let req = if self.parser.parse_keyword(Keyword::TO) { - let (with, connection, location) = self.parse_copy_parameters()?; + let (with, connection, location, _) = self.parse_copy_parameters()?; let argument = CopyDatabaseArgument { database_name, with: with.into(), @@ -68,7 +68,7 @@ impl<'a> ParserContext<'a> { self.parser .expect_keyword(Keyword::FROM) .context(error::SyntaxSnafu)?; - let (with, connection, location) = self.parse_copy_parameters()?; + let (with, connection, location, _) = self.parse_copy_parameters()?; let argument = CopyDatabaseArgument { database_name, with: with.into(), @@ -91,28 +91,30 @@ impl<'a> ParserContext<'a> { let table_name = Self::canonicalize_object_name(raw_table_name); if self.parser.parse_keyword(Keyword::TO) { - let (with, connection, location) = self.parse_copy_parameters()?; + let (with, connection, location, limit) = self.parse_copy_parameters()?; Ok(CopyTable::To(CopyTableArgument { table_name, with: with.into(), connection: connection.into(), location, + limit, })) } else { self.parser .expect_keyword(Keyword::FROM) .context(error::SyntaxSnafu)?; - let (with, connection, location) = self.parse_copy_parameters()?; + let (with, connection, location, limit) = self.parse_copy_parameters()?; Ok(CopyTable::From(CopyTableArgument { table_name, with: with.into(), connection: connection.into(), location, + limit, })) } } - fn parse_copy_parameters(&mut self) -> Result<(With, Connection, String)> { + fn parse_copy_parameters(&mut self) -> Result<(With, Connection, String, Option)> { let location = self.parser .parse_literal_string() @@ -142,7 +144,17 @@ impl<'a> ParserContext<'a> { .map(parse_option_string) .collect::>()?; - Ok((with, connection, location)) + let limit = if self.parser.parse_keyword(Keyword::LIMIT) { + Some(self.parser.parse_literal_uint().with_context(|_| error::UnexpectedSnafu { + sql: self.sql, + expected: "maximum rows", + actual: self.peek_token_as_string(), + })?) + } else { + None + }; + + Ok((with, connection, location, limit)) } } diff --git a/src/sql/src/statements/copy.rs b/src/sql/src/statements/copy.rs index e99727f89a3f..c68b9d8c0321 100644 --- a/src/sql/src/statements/copy.rs +++ b/src/sql/src/statements/copy.rs @@ -111,6 +111,7 @@ pub struct CopyTableArgument { pub connection: OptionMap, /// Copy tbl [To|From] 'location'. pub location: String, + pub limit: Option, } #[cfg(test)] diff --git a/src/table/src/requests.rs b/src/table/src/requests.rs index 4ddea65aea0f..10182baeb463 100644 --- a/src/table/src/requests.rs +++ b/src/table/src/requests.rs @@ -228,6 +228,7 @@ pub struct CopyTableRequest { pub pattern: Option, pub direction: CopyDirection, pub timestamp_range: Option, + pub limit: Option, } #[derive(Debug, Clone, Default)] diff --git a/tests/cases/standalone/common/copy/copy_from_fs_parquet.result b/tests/cases/standalone/common/copy/copy_from_fs_parquet.result index 3a2eaed6174c..dba6606bb534 100644 --- a/tests/cases/standalone/common/copy/copy_from_fs_parquet.result +++ b/tests/cases/standalone/common/copy/copy_from_fs_parquet.result @@ -109,6 +109,22 @@ select count(*) from with_limit_rows; | 2 | +----------+ +CREATE TABLE with_limit_rows_segment(host string, cpu double, memory double, ts timestamp time index); + +Affected Rows: 0 + +Copy with_limit_rows FROM '/tmp/demo/export/parquet_files/' LIMIT 2; + +Affected Rows: 2 + +select count(*) from with_limit_rows_segment; + ++----------+ +| COUNT(*) | ++----------+ +| 0 | ++----------+ + drop table demo; Affected Rows: 0 @@ -137,3 +153,7 @@ drop table with_limit_rows; Affected Rows: 0 +drop table with_limit_rows_segment; + +Affected Rows: 0 + diff --git a/tests/cases/standalone/common/copy/copy_from_fs_parquet.sql b/tests/cases/standalone/common/copy/copy_from_fs_parquet.sql index d2916e4b9322..5bd87a3f49d7 100644 --- a/tests/cases/standalone/common/copy/copy_from_fs_parquet.sql +++ b/tests/cases/standalone/common/copy/copy_from_fs_parquet.sql @@ -40,6 +40,12 @@ Copy with_limit_rows FROM '/tmp/demo/export/parquet_files/' WITH (MAX_INSERT_ROW select count(*) from with_limit_rows; +CREATE TABLE with_limit_rows_segment(host string, cpu double, memory double, ts timestamp time index); + +Copy with_limit_rows FROM '/tmp/demo/export/parquet_files/' LIMIT 2; + +select count(*) from with_limit_rows_segment; + drop table demo; drop table demo_2; @@ -53,3 +59,5 @@ drop table with_pattern; drop table without_limit_rows; drop table with_limit_rows; + +drop table with_limit_rows_segment; From 847f9d7a14ec2edcda238be89478461008b6e800 Mon Sep 17 00:00:00 2001 From: irenjj Date: Fri, 10 May 2024 21:07:23 +0800 Subject: [PATCH 02/18] fmt --- src/sql/src/parsers/copy_parser.rs | 14 +++++++++----- 1 file changed, 9 insertions(+), 5 deletions(-) diff --git a/src/sql/src/parsers/copy_parser.rs b/src/sql/src/parsers/copy_parser.rs index ef05177a4386..ac0615610815 100644 --- a/src/sql/src/parsers/copy_parser.rs +++ b/src/sql/src/parsers/copy_parser.rs @@ -145,11 +145,15 @@ impl<'a> ParserContext<'a> { .collect::>()?; let limit = if self.parser.parse_keyword(Keyword::LIMIT) { - Some(self.parser.parse_literal_uint().with_context(|_| error::UnexpectedSnafu { - sql: self.sql, - expected: "maximum rows", - actual: self.peek_token_as_string(), - })?) + Some( + self.parser + .parse_literal_uint() + .with_context(|_| error::UnexpectedSnafu { + sql: self.sql, + expected: "maximum rows", + actual: self.peek_token_as_string(), + })?, + ) } else { None }; From 852f740fa4c73031f0860f938eb092eaf46232b6 Mon Sep 17 00:00:00 2001 From: irenjj Date: Sat, 11 May 2024 22:36:16 +0800 Subject: [PATCH 03/18] disable default limit --- src/operator/src/statement/copy_table_from.rs | 15 +++++++-------- 1 file changed, 7 insertions(+), 8 deletions(-) diff --git a/src/operator/src/statement/copy_table_from.rs b/src/operator/src/statement/copy_table_from.rs index 374df8380949..560edb533a4c 100644 --- a/src/operator/src/statement/copy_table_from.rs +++ b/src/operator/src/statement/copy_table_from.rs @@ -53,7 +53,6 @@ use crate::statement::StatementExecutor; const DEFAULT_BATCH_SIZE: usize = 8192; const DEFAULT_READ_BUFFER: usize = 256 * 1024; const MAX_INSERT_ROWS: &str = "max_insert_rows"; -const DEFAULT_MAX_INSERT_ROWS: usize = 1000; enum FileMetadata { Parquet { @@ -379,13 +378,13 @@ impl StatementExecutor { let mut rows_inserted = 0; let mut insert_cost = 0; - let max_insert_rows = if let Some(num) = req.limit { - num as usize + let (max_insert_rows, is_limit) = if let Some(num) = req.limit { + (num as usize, true) } else { - req.with - .get(MAX_INSERT_ROWS) - .and_then(|val| val.parse::().ok()) - .unwrap_or(DEFAULT_MAX_INSERT_ROWS) + match req.with.get(MAX_INSERT_ROWS) { + Some(num) => (num.parse::().ok().unwrap(), true), + None => (0, false), + } }; for (compat_schema, file_schema_projection, projected_table_schema, file_metadata) in files { @@ -438,7 +437,7 @@ impl StatementExecutor { insert_cost += cost; } - if rows_inserted >= max_insert_rows { + if is_limit && rows_inserted >= max_insert_rows { return Ok(gen_insert_output(rows_inserted, insert_cost)); } } From 5f1cac291a71bcd1e6d7c82da20d0e6ee6272e1f Mon Sep 17 00:00:00 2001 From: irenjj Date: Sun, 12 May 2024 22:07:20 +0800 Subject: [PATCH 04/18] fix: check parse --- src/operator/src/statement/copy_table_from.rs | 23 ++++++++++++++----- 1 file changed, 17 insertions(+), 6 deletions(-) diff --git a/src/operator/src/statement/copy_table_from.rs b/src/operator/src/statement/copy_table_from.rs index 560edb533a4c..a3cf366533bf 100644 --- a/src/operator/src/statement/copy_table_from.rs +++ b/src/operator/src/statement/copy_table_from.rs @@ -378,12 +378,21 @@ impl StatementExecutor { let mut rows_inserted = 0; let mut insert_cost = 0; - let (max_insert_rows, is_limit) = if let Some(num) = req.limit { - (num as usize, true) + let max_insert_rows = if let Some(num) = req.limit { + Some(num as usize) } else { match req.with.get(MAX_INSERT_ROWS) { - Some(num) => (num.parse::().ok().unwrap(), true), - None => (0, false), + Some(num) => match num.parse::() { + Ok(num) => Some(num), + Err(_) => { + return Err(error::InvalidCopyParameterSnafu { + key: MAX_INSERT_ROWS, + value: num, + } + .build()); + } + }, + None => None, } }; for (compat_schema, file_schema_projection, projected_table_schema, file_metadata) in files @@ -437,8 +446,10 @@ impl StatementExecutor { insert_cost += cost; } - if is_limit && rows_inserted >= max_insert_rows { - return Ok(gen_insert_output(rows_inserted, insert_cost)); + if let Some(max_insert_rows) = max_insert_rows { + if rows_inserted >= max_insert_rows { + return Ok(gen_insert_output(rows_inserted, insert_cost)); + } } } From 3f7198d6ccfe3fe204f7c8c28a330dc014aa6c9f Mon Sep 17 00:00:00 2001 From: irenjj Date: Mon, 13 May 2024 20:49:29 +0800 Subject: [PATCH 05/18] fix test, add error case --- .../standalone/common/copy/copy_from_fs_parquet.result | 8 ++++++-- .../cases/standalone/common/copy/copy_from_fs_parquet.sql | 4 +++- 2 files changed, 9 insertions(+), 3 deletions(-) diff --git a/tests/cases/standalone/common/copy/copy_from_fs_parquet.result b/tests/cases/standalone/common/copy/copy_from_fs_parquet.result index dba6606bb534..1e34604654f9 100644 --- a/tests/cases/standalone/common/copy/copy_from_fs_parquet.result +++ b/tests/cases/standalone/common/copy/copy_from_fs_parquet.result @@ -113,7 +113,7 @@ CREATE TABLE with_limit_rows_segment(host string, cpu double, memory double, ts Affected Rows: 0 -Copy with_limit_rows FROM '/tmp/demo/export/parquet_files/' LIMIT 2; +Copy with_limit_rows_segment FROM '/tmp/demo/export/parquet_files/' LIMIT 2; Affected Rows: 2 @@ -122,9 +122,13 @@ select count(*) from with_limit_rows_segment; +----------+ | COUNT(*) | +----------+ -| 0 | +| 2 | +----------+ +Copy with_limit_rows_segment FROM '/tmp/demo/export/parquet_files/' LIMIT -1; + +Error: 2000(InvalidSyntax), Unexpected token while parsing SQL statement: Copy with_limit_rows_segment FROM '/tmp/demo/export/parquet_files/' LIMIT -1;, expected: 'maximum rows', found: 1: sql parser error: Expected literal int, found: - at Line: 1, Column 75 + drop table demo; Affected Rows: 0 diff --git a/tests/cases/standalone/common/copy/copy_from_fs_parquet.sql b/tests/cases/standalone/common/copy/copy_from_fs_parquet.sql index 5bd87a3f49d7..dd5e38143b35 100644 --- a/tests/cases/standalone/common/copy/copy_from_fs_parquet.sql +++ b/tests/cases/standalone/common/copy/copy_from_fs_parquet.sql @@ -42,10 +42,12 @@ select count(*) from with_limit_rows; CREATE TABLE with_limit_rows_segment(host string, cpu double, memory double, ts timestamp time index); -Copy with_limit_rows FROM '/tmp/demo/export/parquet_files/' LIMIT 2; +Copy with_limit_rows_segment FROM '/tmp/demo/export/parquet_files/' LIMIT 2; select count(*) from with_limit_rows_segment; +Copy with_limit_rows_segment FROM '/tmp/demo/export/parquet_files/' LIMIT -1; + drop table demo; drop table demo_2; From 979229cbef46216a1b680787992dd89b136ac7f6 Mon Sep 17 00:00:00 2001 From: irenjj Date: Wed, 15 May 2024 22:05:39 +0800 Subject: [PATCH 06/18] fix: forbide LIMIT in database --- src/sql/src/error.rs | 8 ++++++++ src/sql/src/parsers/copy_parser.rs | 20 +++++++++++++++++-- .../copy/copy_database_from_fs_parquet.result | 8 ++++++++ .../copy/copy_database_from_fs_parquet.sql | 4 ++++ 4 files changed, 38 insertions(+), 2 deletions(-) diff --git a/src/sql/src/error.rs b/src/sql/src/error.rs index 6f1516d78f69..b706d07e851b 100644 --- a/src/sql/src/error.rs +++ b/src/sql/src/error.rs @@ -149,6 +149,13 @@ pub enum Error { location: Location, }, + #[snafu(display("Unrecognized database option key: {}, value: {}", key, value))] + InvalidDatabaseOptionValue { + key: Ident, + value: u64, + location: Location, + }, + #[snafu(display("Failed to serialize column default constraint"))] SerializeColumnDefaultConstraint { location: Location, @@ -210,6 +217,7 @@ impl ErrorExt for Error { InvalidColumnOption { .. } | InvalidTableOptionValue { .. } + | InvalidDatabaseOptionValue { .. } | InvalidDatabaseName { .. } | ColumnTypeMismatch { .. } | InvalidTableName { .. } diff --git a/src/sql/src/parsers/copy_parser.rs b/src/sql/src/parsers/copy_parser.rs index ac0615610815..24efbc20a146 100644 --- a/src/sql/src/parsers/copy_parser.rs +++ b/src/sql/src/parsers/copy_parser.rs @@ -56,7 +56,15 @@ impl<'a> ParserContext<'a> { })?; let req = if self.parser.parse_keyword(Keyword::TO) { - let (with, connection, location, _) = self.parse_copy_parameters()?; + let (with, connection, location, limit) = self.parse_copy_parameters()?; + if let Some(num) = limit { + return Err(error::InvalidDatabaseOptionValueSnafu { + key: "LIMIT", + value: num, + } + .build()); + } + let argument = CopyDatabaseArgument { database_name, with: with.into(), @@ -68,7 +76,15 @@ impl<'a> ParserContext<'a> { self.parser .expect_keyword(Keyword::FROM) .context(error::SyntaxSnafu)?; - let (with, connection, location, _) = self.parse_copy_parameters()?; + let (with, connection, location, limit) = self.parse_copy_parameters()?; + if let Some(num) = limit { + return Err(error::InvalidDatabaseOptionValueSnafu { + key: "LIMIT", + value: num, + } + .build()); + } + let argument = CopyDatabaseArgument { database_name, with: with.into(), diff --git a/tests/cases/standalone/common/copy/copy_database_from_fs_parquet.result b/tests/cases/standalone/common/copy/copy_database_from_fs_parquet.result index 8d0ef6e834a3..36cb9afbd4ec 100644 --- a/tests/cases/standalone/common/copy/copy_database_from_fs_parquet.result +++ b/tests/cases/standalone/common/copy/copy_database_from_fs_parquet.result @@ -52,6 +52,14 @@ SELECT * FROM demo ORDER BY ts; | host1 | 66.6 | 1024.0 | 2022-06-15T07:02:37 | +-------+------+--------+---------------------+ +DELETE FROM demo; + +Affected Rows: 1 + +COPY DATABASE public FROM '/tmp/demo/export/parquet_range/' LIMIT 2; + +Error: 1004(InvalidArguments), Unrecognized database option key: LIMIT, value: 2 + DROP TABLE demo; Affected Rows: 0 diff --git a/tests/cases/standalone/common/copy/copy_database_from_fs_parquet.sql b/tests/cases/standalone/common/copy/copy_database_from_fs_parquet.sql index 80fee15303d3..5d3716dc2215 100644 --- a/tests/cases/standalone/common/copy/copy_database_from_fs_parquet.sql +++ b/tests/cases/standalone/common/copy/copy_database_from_fs_parquet.sql @@ -20,4 +20,8 @@ COPY DATABASE public FROM '/tmp/demo/export/parquet_range/'; SELECT * FROM demo ORDER BY ts; +DELETE FROM demo; + +COPY DATABASE public FROM '/tmp/demo/export/parquet_range/' LIMIT 2; + DROP TABLE demo; From a32c7005b5423776b6f9944c103922eec6f51caf Mon Sep 17 00:00:00 2001 From: irenjj Date: Wed, 15 May 2024 22:13:47 +0800 Subject: [PATCH 07/18] fix: only support LIMIT segment --- src/operator/src/statement/copy_table_from.rs | 21 ++----------------- .../common/copy/copy_from_fs_parquet.result | 20 ------------------ .../common/copy/copy_from_fs_parquet.sql | 8 ------- 3 files changed, 2 insertions(+), 47 deletions(-) diff --git a/src/operator/src/statement/copy_table_from.rs b/src/operator/src/statement/copy_table_from.rs index a3cf366533bf..a2cf97191cf9 100644 --- a/src/operator/src/statement/copy_table_from.rs +++ b/src/operator/src/statement/copy_table_from.rs @@ -52,7 +52,6 @@ use crate::statement::StatementExecutor; const DEFAULT_BATCH_SIZE: usize = 8192; const DEFAULT_READ_BUFFER: usize = 256 * 1024; -const MAX_INSERT_ROWS: &str = "max_insert_rows"; enum FileMetadata { Parquet { @@ -378,23 +377,7 @@ impl StatementExecutor { let mut rows_inserted = 0; let mut insert_cost = 0; - let max_insert_rows = if let Some(num) = req.limit { - Some(num as usize) - } else { - match req.with.get(MAX_INSERT_ROWS) { - Some(num) => match num.parse::() { - Ok(num) => Some(num), - Err(_) => { - return Err(error::InvalidCopyParameterSnafu { - key: MAX_INSERT_ROWS, - value: num, - } - .build()); - } - }, - None => None, - } - }; + let max_insert_rows = req.limit; for (compat_schema, file_schema_projection, projected_table_schema, file_metadata) in files { let mut stream = self @@ -447,7 +430,7 @@ impl StatementExecutor { } if let Some(max_insert_rows) = max_insert_rows { - if rows_inserted >= max_insert_rows { + if rows_inserted >= max_insert_rows as usize { return Ok(gen_insert_output(rows_inserted, insert_cost)); } } diff --git a/tests/cases/standalone/common/copy/copy_from_fs_parquet.result b/tests/cases/standalone/common/copy/copy_from_fs_parquet.result index 1e34604654f9..b284d24ffdc1 100644 --- a/tests/cases/standalone/common/copy/copy_from_fs_parquet.result +++ b/tests/cases/standalone/common/copy/copy_from_fs_parquet.result @@ -93,22 +93,6 @@ select count(*) from without_limit_rows; | 4 | +----------+ -CREATE TABLE with_limit_rows(host string, cpu double, memory double, ts timestamp time index); - -Affected Rows: 0 - -Copy with_limit_rows FROM '/tmp/demo/export/parquet_files/' WITH (MAX_INSERT_ROWS = 2); - -Affected Rows: 2 - -select count(*) from with_limit_rows; - -+----------+ -| COUNT(*) | -+----------+ -| 2 | -+----------+ - CREATE TABLE with_limit_rows_segment(host string, cpu double, memory double, ts timestamp time index); Affected Rows: 0 @@ -153,10 +137,6 @@ drop table without_limit_rows; Affected Rows: 0 -drop table with_limit_rows; - -Affected Rows: 0 - drop table with_limit_rows_segment; Affected Rows: 0 diff --git a/tests/cases/standalone/common/copy/copy_from_fs_parquet.sql b/tests/cases/standalone/common/copy/copy_from_fs_parquet.sql index dd5e38143b35..741c3a7fb361 100644 --- a/tests/cases/standalone/common/copy/copy_from_fs_parquet.sql +++ b/tests/cases/standalone/common/copy/copy_from_fs_parquet.sql @@ -34,12 +34,6 @@ Copy without_limit_rows FROM '/tmp/demo/export/parquet_files/'; select count(*) from without_limit_rows; -CREATE TABLE with_limit_rows(host string, cpu double, memory double, ts timestamp time index); - -Copy with_limit_rows FROM '/tmp/demo/export/parquet_files/' WITH (MAX_INSERT_ROWS = 2); - -select count(*) from with_limit_rows; - CREATE TABLE with_limit_rows_segment(host string, cpu double, memory double, ts timestamp time index); Copy with_limit_rows_segment FROM '/tmp/demo/export/parquet_files/' LIMIT 2; @@ -60,6 +54,4 @@ drop table with_pattern; drop table without_limit_rows; -drop table with_limit_rows; - drop table with_limit_rows_segment; From 1325a5c5595da2e916394ae9347830b3e4f871fc Mon Sep 17 00:00:00 2001 From: irenjj Date: Wed, 15 May 2024 22:16:28 +0800 Subject: [PATCH 08/18] fix: simplify --- src/operator/src/statement/copy_table_from.rs | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/src/operator/src/statement/copy_table_from.rs b/src/operator/src/statement/copy_table_from.rs index a2cf97191cf9..52880d700e29 100644 --- a/src/operator/src/statement/copy_table_from.rs +++ b/src/operator/src/statement/copy_table_from.rs @@ -377,7 +377,7 @@ impl StatementExecutor { let mut rows_inserted = 0; let mut insert_cost = 0; - let max_insert_rows = req.limit; + let max_insert_rows = req.limit.map(|n| n as usize); for (compat_schema, file_schema_projection, projected_table_schema, file_metadata) in files { let mut stream = self @@ -430,7 +430,7 @@ impl StatementExecutor { } if let Some(max_insert_rows) = max_insert_rows { - if rows_inserted >= max_insert_rows as usize { + if rows_inserted >= max_insert_rows { return Ok(gen_insert_output(rows_inserted, insert_cost)); } } From 81da1354d46a6bb9dfbff5443bc11244d2749733 Mon Sep 17 00:00:00 2001 From: irenjj Date: Wed, 15 May 2024 22:38:35 +0800 Subject: [PATCH 09/18] fix --- src/sql/src/parsers/copy_parser.rs | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/src/sql/src/parsers/copy_parser.rs b/src/sql/src/parsers/copy_parser.rs index 24efbc20a146..bdbd3419b739 100644 --- a/src/sql/src/parsers/copy_parser.rs +++ b/src/sql/src/parsers/copy_parser.rs @@ -58,11 +58,11 @@ impl<'a> ParserContext<'a> { let req = if self.parser.parse_keyword(Keyword::TO) { let (with, connection, location, limit) = self.parse_copy_parameters()?; if let Some(num) = limit { - return Err(error::InvalidDatabaseOptionValueSnafu { + return error::InvalidDatabaseOptionValueSnafu { key: "LIMIT", value: num, } - .build()); + .fail(); } let argument = CopyDatabaseArgument { From a03396baf13b1f961f6bcb61fe862afaa1d5dab6 Mon Sep 17 00:00:00 2001 From: irenjj Date: Wed, 15 May 2024 22:45:24 +0800 Subject: [PATCH 10/18] fix --- src/sql/src/error.rs | 2 +- src/sql/src/parsers/copy_parser.rs | 4 ++-- 2 files changed, 3 insertions(+), 3 deletions(-) diff --git a/src/sql/src/error.rs b/src/sql/src/error.rs index b706d07e851b..634d755d0266 100644 --- a/src/sql/src/error.rs +++ b/src/sql/src/error.rs @@ -151,7 +151,7 @@ pub enum Error { #[snafu(display("Unrecognized database option key: {}, value: {}", key, value))] InvalidDatabaseOptionValue { - key: Ident, + key: String, value: u64, location: Location, }, diff --git a/src/sql/src/parsers/copy_parser.rs b/src/sql/src/parsers/copy_parser.rs index bdbd3419b739..771de885f778 100644 --- a/src/sql/src/parsers/copy_parser.rs +++ b/src/sql/src/parsers/copy_parser.rs @@ -78,11 +78,11 @@ impl<'a> ParserContext<'a> { .context(error::SyntaxSnafu)?; let (with, connection, location, limit) = self.parse_copy_parameters()?; if let Some(num) = limit { - return Err(error::InvalidDatabaseOptionValueSnafu { + return error::InvalidDatabaseOptionValueSnafu { key: "LIMIT", value: num, } - .build()); + .fail(); } let argument = CopyDatabaseArgument { From 1d321f3969b971944708d9924f4b10d21d375e34 Mon Sep 17 00:00:00 2001 From: irenjj Date: Wed, 15 May 2024 22:53:05 +0800 Subject: [PATCH 11/18] fix --- src/sql/src/error.rs | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/src/sql/src/error.rs b/src/sql/src/error.rs index 634d755d0266..4ab3c28617ad 100644 --- a/src/sql/src/error.rs +++ b/src/sql/src/error.rs @@ -146,12 +146,11 @@ pub enum Error { InvalidTableOptionValue { key: Ident, value: Expr, - location: Location, }, #[snafu(display("Unrecognized database option key: {}, value: {}", key, value))] InvalidDatabaseOptionValue { - key: String, + key: Ident, value: u64, location: Location, }, From 0c7372b47f9ecff09be1cf2563b82e2f2c6029cb Mon Sep 17 00:00:00 2001 From: irenjj Date: Wed, 15 May 2024 22:59:02 +0800 Subject: [PATCH 12/18] fix --- src/sql/src/error.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/sql/src/error.rs b/src/sql/src/error.rs index 4ab3c28617ad..a02deaac75ae 100644 --- a/src/sql/src/error.rs +++ b/src/sql/src/error.rs @@ -146,13 +146,13 @@ pub enum Error { InvalidTableOptionValue { key: Ident, value: Expr, + location: Location, }, #[snafu(display("Unrecognized database option key: {}, value: {}", key, value))] InvalidDatabaseOptionValue { key: Ident, value: u64, - location: Location, }, #[snafu(display("Failed to serialize column default constraint"))] From 082d5c482abcf357d77ddfbdbad49cc0bbfb1797 Mon Sep 17 00:00:00 2001 From: irenjj Date: Wed, 15 May 2024 23:05:20 +0800 Subject: [PATCH 13/18] fix: test --- src/sql/src/error.rs | 2 ++ 1 file changed, 2 insertions(+) diff --git a/src/sql/src/error.rs b/src/sql/src/error.rs index a02deaac75ae..e312a41b71fb 100644 --- a/src/sql/src/error.rs +++ b/src/sql/src/error.rs @@ -153,6 +153,8 @@ pub enum Error { InvalidDatabaseOptionValue { key: Ident, value: u64, + #[snafu(implicit)] + location: Location, }, #[snafu(display("Failed to serialize column default constraint"))] From 628a33d044e7483c57e16f007e5b8cc7c4810838 Mon Sep 17 00:00:00 2001 From: irenjj Date: Wed, 15 May 2024 23:33:37 +0800 Subject: [PATCH 14/18] fix: change error info --- src/sql/src/error.rs | 9 --------- src/sql/src/parsers/copy_parser.rs | 10 ++++------ .../common/copy/copy_database_from_fs_parquet.result | 2 +- 3 files changed, 5 insertions(+), 16 deletions(-) diff --git a/src/sql/src/error.rs b/src/sql/src/error.rs index e312a41b71fb..6f1516d78f69 100644 --- a/src/sql/src/error.rs +++ b/src/sql/src/error.rs @@ -149,14 +149,6 @@ pub enum Error { location: Location, }, - #[snafu(display("Unrecognized database option key: {}, value: {}", key, value))] - InvalidDatabaseOptionValue { - key: Ident, - value: u64, - #[snafu(implicit)] - location: Location, - }, - #[snafu(display("Failed to serialize column default constraint"))] SerializeColumnDefaultConstraint { location: Location, @@ -218,7 +210,6 @@ impl ErrorExt for Error { InvalidColumnOption { .. } | InvalidTableOptionValue { .. } - | InvalidDatabaseOptionValue { .. } | InvalidDatabaseName { .. } | ColumnTypeMismatch { .. } | InvalidTableName { .. } diff --git a/src/sql/src/parsers/copy_parser.rs b/src/sql/src/parsers/copy_parser.rs index 771de885f778..cc775b12340a 100644 --- a/src/sql/src/parsers/copy_parser.rs +++ b/src/sql/src/parsers/copy_parser.rs @@ -58,9 +58,8 @@ impl<'a> ParserContext<'a> { let req = if self.parser.parse_keyword(Keyword::TO) { let (with, connection, location, limit) = self.parse_copy_parameters()?; if let Some(num) = limit { - return error::InvalidDatabaseOptionValueSnafu { - key: "LIMIT", - value: num, + return error::InvalidSqlSnafu { + msg: "limit xxx is not supported", } .fail(); } @@ -78,9 +77,8 @@ impl<'a> ParserContext<'a> { .context(error::SyntaxSnafu)?; let (with, connection, location, limit) = self.parse_copy_parameters()?; if let Some(num) = limit { - return error::InvalidDatabaseOptionValueSnafu { - key: "LIMIT", - value: num, + return error::InvalidSqlSnafu { + msg: "limit xxx is not supported", } .fail(); } diff --git a/tests/cases/standalone/common/copy/copy_database_from_fs_parquet.result b/tests/cases/standalone/common/copy/copy_database_from_fs_parquet.result index 36cb9afbd4ec..9b5dc243360a 100644 --- a/tests/cases/standalone/common/copy/copy_database_from_fs_parquet.result +++ b/tests/cases/standalone/common/copy/copy_database_from_fs_parquet.result @@ -58,7 +58,7 @@ Affected Rows: 1 COPY DATABASE public FROM '/tmp/demo/export/parquet_range/' LIMIT 2; -Error: 1004(InvalidArguments), Unrecognized database option key: LIMIT, value: 2 +Error: 2000(InvalidSyntax), Invalid SQL, error: limit xxx is not supported DROP TABLE demo; From f06d64ac1cd612a426a73180aed1cda24d334aa3 Mon Sep 17 00:00:00 2001 From: irenjj Date: Wed, 15 May 2024 23:42:37 +0800 Subject: [PATCH 15/18] fix clippy --- src/sql/src/parsers/copy_parser.rs | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/src/sql/src/parsers/copy_parser.rs b/src/sql/src/parsers/copy_parser.rs index cc775b12340a..9f91dba6124b 100644 --- a/src/sql/src/parsers/copy_parser.rs +++ b/src/sql/src/parsers/copy_parser.rs @@ -57,7 +57,7 @@ impl<'a> ParserContext<'a> { let req = if self.parser.parse_keyword(Keyword::TO) { let (with, connection, location, limit) = self.parse_copy_parameters()?; - if let Some(num) = limit { + if limit.is_some() { return error::InvalidSqlSnafu { msg: "limit xxx is not supported", } @@ -76,7 +76,7 @@ impl<'a> ParserContext<'a> { .expect_keyword(Keyword::FROM) .context(error::SyntaxSnafu)?; let (with, connection, location, limit) = self.parse_copy_parameters()?; - if let Some(num) = limit { + if limit.is_some() { return error::InvalidSqlSnafu { msg: "limit xxx is not supported", } From 5eea38535f1792903a746a9df84fa21526d01e92 Mon Sep 17 00:00:00 2001 From: irenjj Date: Thu, 16 May 2024 13:54:14 +0800 Subject: [PATCH 16/18] fix: fix error msg --- src/sql/src/parsers/copy_parser.rs | 2 +- .../standalone/common/copy/copy_database_from_fs_parquet.result | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/src/sql/src/parsers/copy_parser.rs b/src/sql/src/parsers/copy_parser.rs index 9f91dba6124b..da923c8d9131 100644 --- a/src/sql/src/parsers/copy_parser.rs +++ b/src/sql/src/parsers/copy_parser.rs @@ -59,7 +59,7 @@ impl<'a> ParserContext<'a> { let (with, connection, location, limit) = self.parse_copy_parameters()?; if limit.is_some() { return error::InvalidSqlSnafu { - msg: "limit xxx is not supported", + msg: "limit is not supported", } .fail(); } diff --git a/tests/cases/standalone/common/copy/copy_database_from_fs_parquet.result b/tests/cases/standalone/common/copy/copy_database_from_fs_parquet.result index 9b5dc243360a..18badf93f1d0 100644 --- a/tests/cases/standalone/common/copy/copy_database_from_fs_parquet.result +++ b/tests/cases/standalone/common/copy/copy_database_from_fs_parquet.result @@ -58,7 +58,7 @@ Affected Rows: 1 COPY DATABASE public FROM '/tmp/demo/export/parquet_range/' LIMIT 2; -Error: 2000(InvalidSyntax), Invalid SQL, error: limit xxx is not supported +Error: 2000(InvalidSyntax), Invalid SQL, error: limit is not supported DROP TABLE demo; From 633b63fc3154d6ba061ebe0da38e012be37cbdf9 Mon Sep 17 00:00:00 2001 From: irenjj Date: Thu, 16 May 2024 14:39:39 +0800 Subject: [PATCH 17/18] fix test --- src/sql/src/parsers/copy_parser.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/sql/src/parsers/copy_parser.rs b/src/sql/src/parsers/copy_parser.rs index da923c8d9131..b09cc57537e3 100644 --- a/src/sql/src/parsers/copy_parser.rs +++ b/src/sql/src/parsers/copy_parser.rs @@ -78,7 +78,7 @@ impl<'a> ParserContext<'a> { let (with, connection, location, limit) = self.parse_copy_parameters()?; if limit.is_some() { return error::InvalidSqlSnafu { - msg: "limit xxx is not supported", + msg: "limit is not supported", } .fail(); } From f78088c22f30ee4d7fb2ce6016f0ac255fb54568 Mon Sep 17 00:00:00 2001 From: irenjj Date: Thu, 16 May 2024 21:03:04 +0800 Subject: [PATCH 18/18] fix: test error info --- src/sql/src/parsers/copy_parser.rs | 2 +- .../cases/standalone/common/copy/copy_from_fs_parquet.result | 4 ++-- tests/cases/standalone/common/copy/copy_from_fs_parquet.sql | 2 +- 3 files changed, 4 insertions(+), 4 deletions(-) diff --git a/src/sql/src/parsers/copy_parser.rs b/src/sql/src/parsers/copy_parser.rs index b09cc57537e3..a5ec78377b33 100644 --- a/src/sql/src/parsers/copy_parser.rs +++ b/src/sql/src/parsers/copy_parser.rs @@ -164,7 +164,7 @@ impl<'a> ParserContext<'a> { .parse_literal_uint() .with_context(|_| error::UnexpectedSnafu { sql: self.sql, - expected: "maximum rows", + expected: "the number of maximum rows", actual: self.peek_token_as_string(), })?, ) diff --git a/tests/cases/standalone/common/copy/copy_from_fs_parquet.result b/tests/cases/standalone/common/copy/copy_from_fs_parquet.result index b284d24ffdc1..54ec2f1af308 100644 --- a/tests/cases/standalone/common/copy/copy_from_fs_parquet.result +++ b/tests/cases/standalone/common/copy/copy_from_fs_parquet.result @@ -109,9 +109,9 @@ select count(*) from with_limit_rows_segment; | 2 | +----------+ -Copy with_limit_rows_segment FROM '/tmp/demo/export/parquet_files/' LIMIT -1; +Copy with_limit_rows_segment FROM '/tmp/demo/export/parquet_files/' LIMIT hello; -Error: 2000(InvalidSyntax), Unexpected token while parsing SQL statement: Copy with_limit_rows_segment FROM '/tmp/demo/export/parquet_files/' LIMIT -1;, expected: 'maximum rows', found: 1: sql parser error: Expected literal int, found: - at Line: 1, Column 75 +Error: 2000(InvalidSyntax), Unexpected token while parsing SQL statement: Copy with_limit_rows_segment FROM '/tmp/demo/export/parquet_files/' LIMIT hello;, expected: 'the number of maximum rows', found: ;: sql parser error: Expected literal int, found: hello at Line: 1, Column 75 drop table demo; diff --git a/tests/cases/standalone/common/copy/copy_from_fs_parquet.sql b/tests/cases/standalone/common/copy/copy_from_fs_parquet.sql index 741c3a7fb361..83cdc4f74ceb 100644 --- a/tests/cases/standalone/common/copy/copy_from_fs_parquet.sql +++ b/tests/cases/standalone/common/copy/copy_from_fs_parquet.sql @@ -40,7 +40,7 @@ Copy with_limit_rows_segment FROM '/tmp/demo/export/parquet_files/' LIMIT 2; select count(*) from with_limit_rows_segment; -Copy with_limit_rows_segment FROM '/tmp/demo/export/parquet_files/' LIMIT -1; +Copy with_limit_rows_segment FROM '/tmp/demo/export/parquet_files/' LIMIT hello; drop table demo;