Skip to content

Commit

Permalink
feat: limit total rows copied in COPY TABLE FROM with LIMIT segment (#3910)
Browse files Browse the repository at this point in the history

* feat: limit total rows copied in COPY TABLE FROM with LIMIT segment

* fmt

* disable default limit

* fix: check parse

* fix test, add error case

* fix: forbid LIMIT in COPY DATABASE

* fix: only support LIMIT segment

* fix: simplify

* fix

* fix

* fix

* fix

* fix: test

* fix: change error info

* fix clippy

* fix: fix error msg

* fix test

* fix: test error info
  • Loading branch information
irenjj authored May 16, 2024
1 parent 669a6d8 commit f93b5b1
Show file tree
Hide file tree
Showing 10 changed files with 73 additions and 23 deletions.
2 changes: 2 additions & 0 deletions src/operator/src/statement.rs
Original file line number Diff line number Diff line change
Expand Up @@ -321,6 +321,7 @@ fn to_copy_table_request(stmt: CopyTable, query_ctx: QueryContextRef) -> Result<
connection,
with,
table_name,
limit,
..
} = match stmt {
CopyTable::To(arg) => arg,
Expand All @@ -347,6 +348,7 @@ fn to_copy_table_request(stmt: CopyTable, query_ctx: QueryContextRef) -> Result<
pattern,
direction,
timestamp_range,
limit,
})
}

Expand Down
2 changes: 2 additions & 0 deletions src/operator/src/statement/copy_database.rs
Original file line number Diff line number Diff line change
Expand Up @@ -90,6 +90,7 @@ impl StatementExecutor {
pattern: None,
direction: CopyDirection::Export,
timestamp_range: req.time_range,
limit: None,
},
ctx.clone(),
)
Expand Down Expand Up @@ -155,6 +156,7 @@ impl StatementExecutor {
pattern: None,
direction: CopyDirection::Import,
timestamp_range: None,
limit: None,
};
debug!("Copy table, arg: {:?}", req);
match self.copy_table_from(req, ctx.clone()).await {
Expand Down
14 changes: 5 additions & 9 deletions src/operator/src/statement/copy_table_from.rs
Original file line number Diff line number Diff line change
Expand Up @@ -52,8 +52,6 @@ use crate::statement::StatementExecutor;

const DEFAULT_BATCH_SIZE: usize = 8192;
const DEFAULT_READ_BUFFER: usize = 256 * 1024;
const MAX_INSERT_ROWS: &str = "max_insert_rows";
const DEFAULT_MAX_INSERT_ROWS: usize = 1000;

enum FileMetadata {
Parquet {
Expand Down Expand Up @@ -379,11 +377,7 @@ impl StatementExecutor {

let mut rows_inserted = 0;
let mut insert_cost = 0;
let max_insert_rows = req
.with
.get(MAX_INSERT_ROWS)
.and_then(|val| val.parse::<usize>().ok())
.unwrap_or(DEFAULT_MAX_INSERT_ROWS);
let max_insert_rows = req.limit.map(|n| n as usize);
for (compat_schema, file_schema_projection, projected_table_schema, file_metadata) in files
{
let mut stream = self
Expand Down Expand Up @@ -435,8 +429,10 @@ impl StatementExecutor {
insert_cost += cost;
}

if rows_inserted >= max_insert_rows {
return Ok(gen_insert_output(rows_inserted, insert_cost));
if let Some(max_insert_rows) = max_insert_rows {
if rows_inserted >= max_insert_rows {
return Ok(gen_insert_output(rows_inserted, insert_cost));
}
}
}

Expand Down
42 changes: 36 additions & 6 deletions src/sql/src/parsers/copy_parser.rs
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,14 @@ impl<'a> ParserContext<'a> {
})?;

let req = if self.parser.parse_keyword(Keyword::TO) {
let (with, connection, location) = self.parse_copy_parameters()?;
let (with, connection, location, limit) = self.parse_copy_parameters()?;
if limit.is_some() {
return error::InvalidSqlSnafu {
msg: "limit is not supported",
}
.fail();
}

let argument = CopyDatabaseArgument {
database_name,
with: with.into(),
Expand All @@ -68,7 +75,14 @@ impl<'a> ParserContext<'a> {
self.parser
.expect_keyword(Keyword::FROM)
.context(error::SyntaxSnafu)?;
let (with, connection, location) = self.parse_copy_parameters()?;
let (with, connection, location, limit) = self.parse_copy_parameters()?;
if limit.is_some() {
return error::InvalidSqlSnafu {
msg: "limit is not supported",
}
.fail();
}

let argument = CopyDatabaseArgument {
database_name,
with: with.into(),
Expand All @@ -91,28 +105,30 @@ impl<'a> ParserContext<'a> {
let table_name = Self::canonicalize_object_name(raw_table_name);

if self.parser.parse_keyword(Keyword::TO) {
let (with, connection, location) = self.parse_copy_parameters()?;
let (with, connection, location, limit) = self.parse_copy_parameters()?;
Ok(CopyTable::To(CopyTableArgument {
table_name,
with: with.into(),
connection: connection.into(),
location,
limit,
}))
} else {
self.parser
.expect_keyword(Keyword::FROM)
.context(error::SyntaxSnafu)?;
let (with, connection, location) = self.parse_copy_parameters()?;
let (with, connection, location, limit) = self.parse_copy_parameters()?;
Ok(CopyTable::From(CopyTableArgument {
table_name,
with: with.into(),
connection: connection.into(),
location,
limit,
}))
}
}

fn parse_copy_parameters(&mut self) -> Result<(With, Connection, String)> {
fn parse_copy_parameters(&mut self) -> Result<(With, Connection, String, Option<u64>)> {
let location =
self.parser
.parse_literal_string()
Expand Down Expand Up @@ -142,7 +158,21 @@ impl<'a> ParserContext<'a> {
.map(parse_option_string)
.collect::<Result<Connection>>()?;

Ok((with, connection, location))
let limit = if self.parser.parse_keyword(Keyword::LIMIT) {
Some(
self.parser
.parse_literal_uint()
.with_context(|_| error::UnexpectedSnafu {
sql: self.sql,
expected: "the number of maximum rows",
actual: self.peek_token_as_string(),
})?,
)
} else {
None
};

Ok((with, connection, location, limit))
}
}

Expand Down
1 change: 1 addition & 0 deletions src/sql/src/statements/copy.rs
Original file line number Diff line number Diff line change
Expand Up @@ -111,6 +111,7 @@ pub struct CopyTableArgument {
pub connection: OptionMap,
/// Copy tbl [To|From] 'location'.
pub location: String,
pub limit: Option<u64>,
}

#[cfg(test)]
Expand Down
1 change: 1 addition & 0 deletions src/table/src/requests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -228,6 +228,7 @@ pub struct CopyTableRequest {
pub pattern: Option<String>,
pub direction: CopyDirection,
pub timestamp_range: Option<TimestampRange>,
pub limit: Option<u64>,
}

#[derive(Debug, Clone, Default)]
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,14 @@ SELECT * FROM demo ORDER BY ts;
| host1 | 66.6 | 1024.0 | 2022-06-15T07:02:37 |
+-------+------+--------+---------------------+

DELETE FROM demo;

Affected Rows: 1

COPY DATABASE public FROM '/tmp/demo/export/parquet_range/' LIMIT 2;

Error: 2000(InvalidSyntax), Invalid SQL, error: limit is not supported

DROP TABLE demo;

Affected Rows: 0
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,4 +20,8 @@ COPY DATABASE public FROM '/tmp/demo/export/parquet_range/';

SELECT * FROM demo ORDER BY ts;

DELETE FROM demo;

COPY DATABASE public FROM '/tmp/demo/export/parquet_range/' LIMIT 2;

DROP TABLE demo;
12 changes: 8 additions & 4 deletions tests/cases/standalone/common/copy/copy_from_fs_parquet.result
Original file line number Diff line number Diff line change
Expand Up @@ -93,22 +93,26 @@ select count(*) from without_limit_rows;
| 4 |
+----------+

CREATE TABLE with_limit_rows(host string, cpu double, memory double, ts timestamp time index);
CREATE TABLE with_limit_rows_segment(host string, cpu double, memory double, ts timestamp time index);

Affected Rows: 0

Copy with_limit_rows FROM '/tmp/demo/export/parquet_files/' WITH (MAX_INSERT_ROWS = 2);
Copy with_limit_rows_segment FROM '/tmp/demo/export/parquet_files/' LIMIT 2;

Affected Rows: 2

select count(*) from with_limit_rows;
select count(*) from with_limit_rows_segment;

+----------+
| COUNT(*) |
+----------+
| 2 |
+----------+

Copy with_limit_rows_segment FROM '/tmp/demo/export/parquet_files/' LIMIT hello;

Error: 2000(InvalidSyntax), Unexpected token while parsing SQL statement: Copy with_limit_rows_segment FROM '/tmp/demo/export/parquet_files/' LIMIT hello;, expected: 'the number of maximum rows', found: ;: sql parser error: Expected literal int, found: hello at Line: 1, Column 75

drop table demo;

Affected Rows: 0
Expand All @@ -133,7 +137,7 @@ drop table without_limit_rows;

Affected Rows: 0

drop table with_limit_rows;
drop table with_limit_rows_segment;

Affected Rows: 0

10 changes: 6 additions & 4 deletions tests/cases/standalone/common/copy/copy_from_fs_parquet.sql
Original file line number Diff line number Diff line change
Expand Up @@ -34,11 +34,13 @@ Copy without_limit_rows FROM '/tmp/demo/export/parquet_files/';

select count(*) from without_limit_rows;

CREATE TABLE with_limit_rows(host string, cpu double, memory double, ts timestamp time index);
CREATE TABLE with_limit_rows_segment(host string, cpu double, memory double, ts timestamp time index);

Copy with_limit_rows FROM '/tmp/demo/export/parquet_files/' WITH (MAX_INSERT_ROWS = 2);
Copy with_limit_rows_segment FROM '/tmp/demo/export/parquet_files/' LIMIT 2;

select count(*) from with_limit_rows;
select count(*) from with_limit_rows_segment;

Copy with_limit_rows_segment FROM '/tmp/demo/export/parquet_files/' LIMIT hello;

drop table demo;

Expand All @@ -52,4 +54,4 @@ drop table with_pattern;

drop table without_limit_rows;

drop table with_limit_rows;
drop table with_limit_rows_segment;

0 comments on commit f93b5b1

Please sign in to comment.