Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: limit total rows copied in COPY TABLE FROM with LIMIT segment #3910

Merged
merged 18 commits into from
May 16, 2024
2 changes: 2 additions & 0 deletions src/operator/src/statement.rs
Original file line number Diff line number Diff line change
Expand Up @@ -302,6 +302,7 @@ fn to_copy_table_request(stmt: CopyTable, query_ctx: QueryContextRef) -> Result<
connection,
with,
table_name,
limit,
..
} = match stmt {
CopyTable::To(arg) => arg,
Expand All @@ -328,6 +329,7 @@ fn to_copy_table_request(stmt: CopyTable, query_ctx: QueryContextRef) -> Result<
pattern,
direction,
timestamp_range,
limit,
})
}

Expand Down
2 changes: 2 additions & 0 deletions src/operator/src/statement/copy_database.rs
Original file line number Diff line number Diff line change
Expand Up @@ -90,6 +90,7 @@ impl StatementExecutor {
pattern: None,
direction: CopyDirection::Export,
timestamp_range: req.time_range,
limit: None,
},
ctx.clone(),
)
Expand Down Expand Up @@ -155,6 +156,7 @@ impl StatementExecutor {
pattern: None,
direction: CopyDirection::Import,
timestamp_range: None,
limit: None,
};
debug!("Copy table, arg: {:?}", req);
match self.copy_table_from(req, ctx.clone()).await {
Expand Down
29 changes: 21 additions & 8 deletions src/operator/src/statement/copy_table_from.rs
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,6 @@ use crate::statement::StatementExecutor;
const DEFAULT_BATCH_SIZE: usize = 8192;
const DEFAULT_READ_BUFFER: usize = 256 * 1024;
const MAX_INSERT_ROWS: &str = "max_insert_rows";
const DEFAULT_MAX_INSERT_ROWS: usize = 1000;

enum FileMetadata {
Parquet {
Expand Down Expand Up @@ -379,11 +378,23 @@ impl StatementExecutor {

let mut rows_inserted = 0;
let mut insert_cost = 0;
let max_insert_rows = req
.with
.get(MAX_INSERT_ROWS)
.and_then(|val| val.parse::<usize>().ok())
.unwrap_or(DEFAULT_MAX_INSERT_ROWS);
let max_insert_rows = if let Some(num) = req.limit {
Some(num as usize)
} else {
match req.with.get(MAX_INSERT_ROWS) {
Some(num) => match num.parse::<usize>() {
Ok(num) => Some(num),
Err(_) => {
return Err(error::InvalidCopyParameterSnafu {
key: MAX_INSERT_ROWS,
value: num,
}
.build());
}
},
None => None,
}
};
for (compat_schema, file_schema_projection, projected_table_schema, file_metadata) in files
{
let mut stream = self
Expand Down Expand Up @@ -435,8 +446,10 @@ impl StatementExecutor {
insert_cost += cost;
}

if rows_inserted >= max_insert_rows {
return Ok(gen_insert_output(rows_inserted, insert_cost));
if let Some(max_insert_rows) = max_insert_rows {
if rows_inserted >= max_insert_rows {
return Ok(gen_insert_output(rows_inserted, insert_cost));
}
}
}

Expand Down
28 changes: 22 additions & 6 deletions src/sql/src/parsers/copy_parser.rs
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,7 @@ impl<'a> ParserContext<'a> {
})?;

let req = if self.parser.parse_keyword(Keyword::TO) {
let (with, connection, location) = self.parse_copy_parameters()?;
let (with, connection, location, _) = self.parse_copy_parameters()?;
let argument = CopyDatabaseArgument {
database_name,
with: with.into(),
Expand All @@ -68,7 +68,7 @@ impl<'a> ParserContext<'a> {
self.parser
.expect_keyword(Keyword::FROM)
.context(error::SyntaxSnafu)?;
let (with, connection, location) = self.parse_copy_parameters()?;
let (with, connection, location, _) = self.parse_copy_parameters()?;
let argument = CopyDatabaseArgument {
database_name,
with: with.into(),
Expand All @@ -91,28 +91,30 @@ impl<'a> ParserContext<'a> {
let table_name = Self::canonicalize_object_name(raw_table_name);

if self.parser.parse_keyword(Keyword::TO) {
let (with, connection, location) = self.parse_copy_parameters()?;
let (with, connection, location, limit) = self.parse_copy_parameters()?;
Ok(CopyTable::To(CopyTableArgument {
table_name,
with: with.into(),
connection: connection.into(),
location,
limit,
}))
} else {
self.parser
.expect_keyword(Keyword::FROM)
.context(error::SyntaxSnafu)?;
let (with, connection, location) = self.parse_copy_parameters()?;
let (with, connection, location, limit) = self.parse_copy_parameters()?;
Ok(CopyTable::From(CopyTableArgument {
table_name,
with: with.into(),
connection: connection.into(),
location,
limit,
}))
}
}

fn parse_copy_parameters(&mut self) -> Result<(With, Connection, String)> {
fn parse_copy_parameters(&mut self) -> Result<(With, Connection, String, Option<u64>)> {
let location =
self.parser
.parse_literal_string()
Expand Down Expand Up @@ -142,7 +144,21 @@ impl<'a> ParserContext<'a> {
.map(parse_option_string)
.collect::<Result<Connection>>()?;

Ok((with, connection, location))
let limit = if self.parser.parse_keyword(Keyword::LIMIT) {
Some(
self.parser
.parse_literal_uint()
.with_context(|_| error::UnexpectedSnafu {
sql: self.sql,
expected: "maximum rows",
actual: self.peek_token_as_string(),
})?,
)
} else {
None
};

Ok((with, connection, location, limit))
}
}

Expand Down
1 change: 1 addition & 0 deletions src/sql/src/statements/copy.rs
Original file line number Diff line number Diff line change
Expand Up @@ -111,6 +111,7 @@ pub struct CopyTableArgument {
pub connection: OptionMap,
/// Copy tbl [To|From] 'location'.
pub location: String,
pub limit: Option<u64>,
}

#[cfg(test)]
Expand Down
1 change: 1 addition & 0 deletions src/table/src/requests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -228,6 +228,7 @@ pub struct CopyTableRequest {
pub pattern: Option<String>,
pub direction: CopyDirection,
pub timestamp_range: Option<TimestampRange>,
pub limit: Option<u64>,
}

#[derive(Debug, Clone, Default)]
Expand Down
24 changes: 24 additions & 0 deletions tests/cases/standalone/common/copy/copy_from_fs_parquet.result
Original file line number Diff line number Diff line change
Expand Up @@ -109,6 +109,26 @@ select count(*) from with_limit_rows;
| 2 |
+----------+

CREATE TABLE with_limit_rows_segment(host string, cpu double, memory double, ts timestamp time index);

Affected Rows: 0

Copy with_limit_rows_segment FROM '/tmp/demo/export/parquet_files/' LIMIT 2;

Affected Rows: 2

select count(*) from with_limit_rows_segment;

+----------+
| COUNT(*) |
+----------+
| 2 |
+----------+

Copy with_limit_rows_segment FROM '/tmp/demo/export/parquet_files/' LIMIT -1;

Error: 2000(InvalidSyntax), Unexpected token while parsing SQL statement: Copy with_limit_rows_segment FROM '/tmp/demo/export/parquet_files/' LIMIT -1;, expected: 'maximum rows', found: 1: sql parser error: Expected literal int, found: - at Line: 1, Column 75

drop table demo;

Affected Rows: 0
Expand Down Expand Up @@ -137,3 +157,7 @@ drop table with_limit_rows;

Affected Rows: 0

drop table with_limit_rows_segment;

Affected Rows: 0

10 changes: 10 additions & 0 deletions tests/cases/standalone/common/copy/copy_from_fs_parquet.sql
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,14 @@ Copy with_limit_rows FROM '/tmp/demo/export/parquet_files/' WITH (MAX_INSERT_ROW

select count(*) from with_limit_rows;

CREATE TABLE with_limit_rows_segment(host string, cpu double, memory double, ts timestamp time index);

Copy with_limit_rows_segment FROM '/tmp/demo/export/parquet_files/' LIMIT 2;

select count(*) from with_limit_rows_segment;

Copy with_limit_rows_segment FROM '/tmp/demo/export/parquet_files/' LIMIT -1;

drop table demo;

drop table demo_2;
Expand All @@ -53,3 +61,5 @@ drop table with_pattern;
drop table without_limit_rows;

drop table with_limit_rows;

drop table with_limit_rows_segment;