diff --git a/src/operator/src/statement.rs b/src/operator/src/statement.rs
index d1eb1d064263..38b3236fa21c 100644
--- a/src/operator/src/statement.rs
+++ b/src/operator/src/statement.rs
@@ -321,6 +321,7 @@ fn to_copy_table_request(stmt: CopyTable, query_ctx: QueryContextRef) -> Result<
         connection,
         with,
         table_name,
+        limit,
         ..
     } = match stmt {
         CopyTable::To(arg) => arg,
@@ -347,6 +348,7 @@ fn to_copy_table_request(stmt: CopyTable, query_ctx: QueryContextRef) -> Result<
         pattern,
         direction,
         timestamp_range,
+        limit,
     })
 }
 
diff --git a/src/operator/src/statement/copy_database.rs b/src/operator/src/statement/copy_database.rs
index fac6d77a6c15..daed3379aedc 100644
--- a/src/operator/src/statement/copy_database.rs
+++ b/src/operator/src/statement/copy_database.rs
@@ -90,6 +90,7 @@ impl StatementExecutor {
                         pattern: None,
                         direction: CopyDirection::Export,
                         timestamp_range: req.time_range,
+                        limit: None,
                     },
                     ctx.clone(),
                 )
@@ -155,6 +156,7 @@ impl StatementExecutor {
                 pattern: None,
                 direction: CopyDirection::Import,
                 timestamp_range: None,
+                limit: None,
             };
             debug!("Copy table, arg: {:?}", req);
             match self.copy_table_from(req, ctx.clone()).await {
diff --git a/src/operator/src/statement/copy_table_from.rs b/src/operator/src/statement/copy_table_from.rs
index aa4e2343a38a..52880d700e29 100644
--- a/src/operator/src/statement/copy_table_from.rs
+++ b/src/operator/src/statement/copy_table_from.rs
@@ -52,8 +52,6 @@ use crate::statement::StatementExecutor;
 
 const DEFAULT_BATCH_SIZE: usize = 8192;
 const DEFAULT_READ_BUFFER: usize = 256 * 1024;
-const MAX_INSERT_ROWS: &str = "max_insert_rows";
-const DEFAULT_MAX_INSERT_ROWS: usize = 1000;
 
 enum FileMetadata {
     Parquet {
@@ -379,11 +377,7 @@ impl StatementExecutor {
 
         let mut rows_inserted = 0;
         let mut insert_cost = 0;
-        let max_insert_rows = req
-            .with
-            .get(MAX_INSERT_ROWS)
-            .and_then(|val| val.parse::<usize>().ok())
-            .unwrap_or(DEFAULT_MAX_INSERT_ROWS);
+        let max_insert_rows = req.limit.map(|n| n as usize);
         for (compat_schema, file_schema_projection, projected_table_schema, file_metadata) in files
         {
             let mut stream = self
@@ -435,8 +429,10 @@ impl StatementExecutor {
                 insert_cost += cost;
             }
 
-            if rows_inserted >= max_insert_rows {
-                return Ok(gen_insert_output(rows_inserted, insert_cost));
+            if let Some(max_insert_rows) = max_insert_rows {
+                if rows_inserted >= max_insert_rows {
+                    return Ok(gen_insert_output(rows_inserted, insert_cost));
+                }
             }
         }
 
diff --git a/src/sql/src/parsers/copy_parser.rs b/src/sql/src/parsers/copy_parser.rs
index b9e9be85f732..a5ec78377b33 100644
--- a/src/sql/src/parsers/copy_parser.rs
+++ b/src/sql/src/parsers/copy_parser.rs
@@ -56,7 +56,14 @@ impl<'a> ParserContext<'a> {
             })?;
 
         let req = if self.parser.parse_keyword(Keyword::TO) {
-            let (with, connection, location) = self.parse_copy_parameters()?;
+            let (with, connection, location, limit) = self.parse_copy_parameters()?;
+            if limit.is_some() {
+                return error::InvalidSqlSnafu {
+                    msg: "limit is not supported",
+                }
+                .fail();
+            }
+
             let argument = CopyDatabaseArgument {
                 database_name,
                 with: with.into(),
@@ -68,7 +75,14 @@ impl<'a> ParserContext<'a> {
             self.parser
                 .expect_keyword(Keyword::FROM)
                 .context(error::SyntaxSnafu)?;
-            let (with, connection, location) = self.parse_copy_parameters()?;
+            let (with, connection, location, limit) = self.parse_copy_parameters()?;
+            if limit.is_some() {
+                return error::InvalidSqlSnafu {
+                    msg: "limit is not supported",
+                }
+                .fail();
+            }
+
             let argument = CopyDatabaseArgument {
                 database_name,
                 with: with.into(),
@@ -91,28 +105,30 @@ impl<'a> ParserContext<'a> {
         let table_name = Self::canonicalize_object_name(raw_table_name);
 
         if self.parser.parse_keyword(Keyword::TO) {
-            let (with, connection, location) = self.parse_copy_parameters()?;
+            let (with, connection, location, limit) = self.parse_copy_parameters()?;
             Ok(CopyTable::To(CopyTableArgument {
                 table_name,
                 with: with.into(),
                 connection: connection.into(),
                 location,
+                limit,
             }))
         } else {
             self.parser
                 .expect_keyword(Keyword::FROM)
                 .context(error::SyntaxSnafu)?;
-            let (with, connection, location) = self.parse_copy_parameters()?;
+            let (with, connection, location, limit) = self.parse_copy_parameters()?;
             Ok(CopyTable::From(CopyTableArgument {
                 table_name,
                 with: with.into(),
                 connection: connection.into(),
                 location,
+                limit,
             }))
         }
     }
 
-    fn parse_copy_parameters(&mut self) -> Result<(With, Connection, String)> {
+    fn parse_copy_parameters(&mut self) -> Result<(With, Connection, String, Option<u64>)> {
         let location =
             self.parser
                 .parse_literal_string()
@@ -142,7 +158,21 @@ impl<'a> ParserContext<'a> {
             .map(parse_option_string)
             .collect::<Result<Connection>>()?;
 
-        Ok((with, connection, location))
+        let limit = if self.parser.parse_keyword(Keyword::LIMIT) {
+            Some(
+                self.parser
+                    .parse_literal_uint()
+                    .with_context(|_| error::UnexpectedSnafu {
+                        sql: self.sql,
+                        expected: "the number of maximum rows",
+                        actual: self.peek_token_as_string(),
+                    })?,
+            )
+        } else {
+            None
+        };
+
+        Ok((with, connection, location, limit))
     }
 }
 
diff --git a/src/sql/src/statements/copy.rs b/src/sql/src/statements/copy.rs
index e99727f89a3f..c68b9d8c0321 100644
--- a/src/sql/src/statements/copy.rs
+++ b/src/sql/src/statements/copy.rs
@@ -111,6 +111,7 @@ pub struct CopyTableArgument {
     pub connection: OptionMap,
     /// Copy tbl [To|From] 'location'.
     pub location: String,
+    pub limit: Option<u64>,
 }
 
 #[cfg(test)]
diff --git a/src/table/src/requests.rs b/src/table/src/requests.rs
index 4ddea65aea0f..10182baeb463 100644
--- a/src/table/src/requests.rs
+++ b/src/table/src/requests.rs
@@ -228,6 +228,7 @@ pub struct CopyTableRequest {
     pub pattern: Option<String>,
     pub direction: CopyDirection,
     pub timestamp_range: Option<TimestampRange>,
+    pub limit: Option<u64>,
 }
 
 #[derive(Debug, Clone, Default)]
diff --git a/tests/cases/standalone/common/copy/copy_database_from_fs_parquet.result b/tests/cases/standalone/common/copy/copy_database_from_fs_parquet.result
index 8d0ef6e834a3..18badf93f1d0 100644
--- a/tests/cases/standalone/common/copy/copy_database_from_fs_parquet.result
+++ b/tests/cases/standalone/common/copy/copy_database_from_fs_parquet.result
@@ -52,6 +52,14 @@ SELECT * FROM demo ORDER BY ts;
 | host1 | 66.6 | 1024.0 | 2022-06-15T07:02:37 |
 +-------+------+--------+---------------------+
 
+DELETE FROM demo;
+
+Affected Rows: 1
+
+COPY DATABASE public FROM '/tmp/demo/export/parquet_range/' LIMIT 2;
+
+Error: 2000(InvalidSyntax), Invalid SQL, error: limit is not supported
+
 DROP TABLE demo;
 
 Affected Rows: 0
diff --git a/tests/cases/standalone/common/copy/copy_database_from_fs_parquet.sql b/tests/cases/standalone/common/copy/copy_database_from_fs_parquet.sql
index 80fee15303d3..5d3716dc2215 100644
--- a/tests/cases/standalone/common/copy/copy_database_from_fs_parquet.sql
+++ b/tests/cases/standalone/common/copy/copy_database_from_fs_parquet.sql
@@ -20,4 +20,8 @@ COPY DATABASE public FROM '/tmp/demo/export/parquet_range/';
 
 SELECT * FROM demo ORDER BY ts;
 
+DELETE FROM demo;
+
+COPY DATABASE public FROM '/tmp/demo/export/parquet_range/' LIMIT 2;
+
 DROP TABLE demo;
diff --git a/tests/cases/standalone/common/copy/copy_from_fs_parquet.result b/tests/cases/standalone/common/copy/copy_from_fs_parquet.result
index 3a2eaed6174c..54ec2f1af308 100644
--- a/tests/cases/standalone/common/copy/copy_from_fs_parquet.result
+++ b/tests/cases/standalone/common/copy/copy_from_fs_parquet.result
@@ -93,15 +93,15 @@ select count(*) from without_limit_rows;
 | 4        |
 +----------+
 
-CREATE TABLE with_limit_rows(host string, cpu double, memory double, ts timestamp time index);
+CREATE TABLE with_limit_rows_segment(host string, cpu double, memory double, ts timestamp time index);
 
 Affected Rows: 0
 
-Copy with_limit_rows FROM '/tmp/demo/export/parquet_files/' WITH (MAX_INSERT_ROWS = 2);
+Copy with_limit_rows_segment FROM '/tmp/demo/export/parquet_files/' LIMIT 2;
 
 Affected Rows: 2
 
-select count(*) from with_limit_rows;
+select count(*) from with_limit_rows_segment;
 
 +----------+
 | COUNT(*) |
@@ -109,6 +109,10 @@ select count(*) from with_limit_rows;
 | 2        |
 +----------+
 
+Copy with_limit_rows_segment FROM '/tmp/demo/export/parquet_files/' LIMIT hello;
+
+Error: 2000(InvalidSyntax), Unexpected token while parsing SQL statement: Copy with_limit_rows_segment FROM '/tmp/demo/export/parquet_files/' LIMIT hello;, expected: 'the number of maximum rows', found: ;: sql parser error: Expected literal int, found: hello at Line: 1, Column 75
+
 drop table demo;
 
 Affected Rows: 0
@@ -133,7 +137,7 @@ drop table without_limit_rows;
 
 Affected Rows: 0
 
-drop table with_limit_rows;
+drop table with_limit_rows_segment;
 
 Affected Rows: 0
 
diff --git a/tests/cases/standalone/common/copy/copy_from_fs_parquet.sql b/tests/cases/standalone/common/copy/copy_from_fs_parquet.sql
index d2916e4b9322..83cdc4f74ceb 100644
--- a/tests/cases/standalone/common/copy/copy_from_fs_parquet.sql
+++ b/tests/cases/standalone/common/copy/copy_from_fs_parquet.sql
@@ -34,11 +34,13 @@ Copy without_limit_rows FROM '/tmp/demo/export/parquet_files/';
 
 select count(*) from without_limit_rows;
 
-CREATE TABLE with_limit_rows(host string, cpu double, memory double, ts timestamp time index);
+CREATE TABLE with_limit_rows_segment(host string, cpu double, memory double, ts timestamp time index);
 
-Copy with_limit_rows FROM '/tmp/demo/export/parquet_files/' WITH (MAX_INSERT_ROWS = 2);
+Copy with_limit_rows_segment FROM '/tmp/demo/export/parquet_files/' LIMIT 2;
 
-select count(*) from with_limit_rows;
+select count(*) from with_limit_rows_segment;
+
+Copy with_limit_rows_segment FROM '/tmp/demo/export/parquet_files/' LIMIT hello;
 
 drop table demo;
 
@@ -52,4 +54,4 @@ drop table with_pattern;
 
 drop table without_limit_rows;
 
-drop table with_limit_rows;
+drop table with_limit_rows_segment;
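
For reference, a minimal usage sketch of the syntax exercised by this patch; the table name and paths below simply mirror the test fixtures and are illustrative only:

    -- COPY <table> FROM now accepts a trailing LIMIT clause that caps the number of rows inserted.
    COPY with_limit_rows_segment FROM '/tmp/demo/export/parquet_files/' LIMIT 2;

    -- COPY DATABASE does not support LIMIT; the parser rejects it with a "limit is not supported" error.
    COPY DATABASE public FROM '/tmp/demo/export/parquet_range/' LIMIT 2;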