Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Make COPY TO align with CREATE EXTERNAL TABLE #9604

Merged
merged 8 commits into from
Mar 18, 2024
Merged
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 3 additions & 3 deletions datafusion-cli/src/exec.rs
Original file line number Diff line number Diff line change
Expand Up @@ -412,7 +412,7 @@ mod tests {
)
})?;
for location in locations {
let sql = format!("copy (values (1,2)) to '{}';", location);
let sql = format!("copy (values (1,2)) to '{}' STORED AS PARQUET;", location);
let statements = DFParser::parse_sql_with_dialect(&sql, dialect.as_ref())?;
for statement in statements {
//Should not fail
Expand All @@ -438,8 +438,8 @@ mod tests {
let location = "s3://bucket/path/file.parquet";

// Missing region, use object_store defaults
let sql = format!("COPY (values (1,2)) TO '{location}'
(format parquet, 'aws.access_key_id' '{access_key_id}', 'aws.secret_access_key' '{secret_access_key}')");
let sql = format!("COPY (values (1,2)) TO '{location}' STORED AS PARQUET
OPTIONS ('aws.access_key_id' '{access_key_id}', 'aws.secret_access_key' '{secret_access_key}')");
copy_to_table_test(location, &sql).await?;

Ok(())
Expand Down
83 changes: 25 additions & 58 deletions datafusion/common/src/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1131,10 +1131,21 @@ impl ConfigField for TableOptions {
fn set(&mut self, key: &str, value: &str) -> Result<()> {
// Extensions are handled in the public `ConfigOptions::set`
let (key, rem) = key.split_once('.').unwrap_or((key, ""));
let format = if let Some(format) = &self.current_format {
format
} else {
return _config_err!("Specify a format for TableOptions");
};
match key {
"csv" => self.csv.set(rem, value),
"parquet" => self.parquet.set(rem, value),
"json" => self.json.set(rem, value),
"format" => match format {
#[cfg(feature = "parquet")]
FileType::PARQUET => self.parquet.set(rem, value),
FileType::CSV => self.csv.set(rem, value),
FileType::JSON => self.json.set(rem, value),
_ => {
_config_err!("Config value \"{key}\" is not supported on {}", format)
}
},
_ => _config_err!("Config value \"{key}\" not found on TableOptions"),
}
}
Expand Down Expand Up @@ -1170,28 +1181,7 @@ impl TableOptions {
))
})?;

if prefix == "csv" || prefix == "json" || prefix == "parquet" {
if let Some(format) = &self.current_format {
match format {
FileType::CSV if prefix != "csv" => {
return Err(DataFusionError::Configuration(format!(
"Key \"{key}\" is not applicable for CSV format"
)))
}
#[cfg(feature = "parquet")]
FileType::PARQUET if prefix != "parquet" => {
return Err(DataFusionError::Configuration(format!(
"Key \"{key}\" is not applicable for PARQUET format"
)))
}
FileType::JSON if prefix != "json" => {
return Err(DataFusionError::Configuration(format!(
"Key \"{key}\" is not applicable for JSON format"
)))
}
_ => {}
}
}
if prefix == "format" {
return ConfigField::set(self, key, value);
}

Expand Down Expand Up @@ -1251,9 +1241,7 @@ impl TableOptions {
}

let mut v = Visitor(vec![]);
self.visit(&mut v, "csv", "");
self.visit(&mut v, "json", "");
self.visit(&mut v, "parquet", "");
self.visit(&mut v, "format", "");

v.0.extend(self.extensions.0.values().flat_map(|e| e.0.entries()));
v.0
Expand Down Expand Up @@ -1558,6 +1546,7 @@ mod tests {
use crate::config::{
ConfigEntry, ConfigExtension, ExtensionOptions, Extensions, TableOptions,
};
use crate::FileType;

#[derive(Default, Debug, Clone)]
pub struct TestExtensionConfig {
Expand Down Expand Up @@ -1611,12 +1600,13 @@ mod tests {
}

#[test]
fn alter_kafka_config() {
fn alter_test_extension_config() {
let mut extension = Extensions::new();
extension.insert(TestExtensionConfig::default());
let mut table_config = TableOptions::new().with_extensions(extension);
table_config.set("parquet.write_batch_size", "10").unwrap();
assert_eq!(table_config.parquet.global.write_batch_size, 10);
table_config.set_file_format(FileType::CSV);
table_config.set("format.delimiter", ";").unwrap();
assert_eq!(table_config.csv.delimiter, b';');
table_config.set("test.bootstrap.servers", "asd").unwrap();
let kafka_config = table_config
.extensions
Expand All @@ -1628,38 +1618,15 @@ mod tests {
);
}

#[test]
fn parquet_table_options() {
let mut table_config = TableOptions::new();
table_config
.set("parquet.bloom_filter_enabled::col1", "true")
.unwrap();
assert_eq!(
table_config.parquet.column_specific_options["col1"].bloom_filter_enabled,
Some(true)
);
}

metesynnada marked this conversation as resolved.
Show resolved Hide resolved
#[test]
fn csv_u8_table_options() {
let mut table_config = TableOptions::new();
table_config.set("csv.delimiter", ";").unwrap();
table_config.set_file_format(FileType::CSV);
table_config.set("format.delimiter", ";").unwrap();
assert_eq!(table_config.csv.delimiter as char, ';');
table_config.set("csv.escape", "\"").unwrap();
table_config.set("format.escape", "\"").unwrap();
assert_eq!(table_config.csv.escape.unwrap() as char, '"');
table_config.set("csv.escape", "\'").unwrap();
table_config.set("format.escape", "\'").unwrap();
assert_eq!(table_config.csv.escape.unwrap() as char, '\'');
}

#[test]
fn parquet_table_options_config_entry() {
let mut table_config = TableOptions::new();
table_config
.set("parquet.bloom_filter_enabled::col1", "true")
.unwrap();
let entries = table_config.entries();
assert!(entries
.iter()
.any(|item| item.key == "parquet.bloom_filter_enabled::col1"))
}
metesynnada marked this conversation as resolved.
Show resolved Hide resolved
}
85 changes: 43 additions & 42 deletions datafusion/common/src/file_options/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ mod tests {
config::TableOptions,
file_options::{csv_writer::CsvWriterOptions, json_writer::JsonWriterOptions},
parsers::CompressionTypeVariant,
Result,
FileType, Result,
};

use parquet::{
Expand All @@ -47,35 +47,36 @@ mod tests {
#[test]
fn test_writeroptions_parquet_from_statement_options() -> Result<()> {
let mut option_map: HashMap<String, String> = HashMap::new();
option_map.insert("parquet.max_row_group_size".to_owned(), "123".to_owned());
option_map.insert("parquet.data_pagesize_limit".to_owned(), "123".to_owned());
option_map.insert("parquet.write_batch_size".to_owned(), "123".to_owned());
option_map.insert("parquet.writer_version".to_owned(), "2.0".to_owned());
option_map.insert("format.max_row_group_size".to_owned(), "123".to_owned());
option_map.insert("format.data_pagesize_limit".to_owned(), "123".to_owned());
option_map.insert("format.write_batch_size".to_owned(), "123".to_owned());
option_map.insert("format.writer_version".to_owned(), "2.0".to_owned());
option_map.insert(
"parquet.dictionary_page_size_limit".to_owned(),
"format.dictionary_page_size_limit".to_owned(),
"123".to_owned(),
);
option_map.insert(
"parquet.created_by".to_owned(),
"format.created_by".to_owned(),
"df write unit test".to_owned(),
);
option_map.insert(
"parquet.column_index_truncate_length".to_owned(),
"format.column_index_truncate_length".to_owned(),
"123".to_owned(),
);
option_map.insert(
"parquet.data_page_row_count_limit".to_owned(),
"format.data_page_row_count_limit".to_owned(),
"123".to_owned(),
);
option_map.insert("parquet.bloom_filter_enabled".to_owned(), "true".to_owned());
option_map.insert("parquet.encoding".to_owned(), "plain".to_owned());
option_map.insert("parquet.dictionary_enabled".to_owned(), "true".to_owned());
option_map.insert("parquet.compression".to_owned(), "zstd(4)".to_owned());
option_map.insert("parquet.statistics_enabled".to_owned(), "page".to_owned());
option_map.insert("parquet.bloom_filter_fpp".to_owned(), "0.123".to_owned());
option_map.insert("parquet.bloom_filter_ndv".to_owned(), "123".to_owned());
option_map.insert("format.bloom_filter_enabled".to_owned(), "true".to_owned());
option_map.insert("format.encoding".to_owned(), "plain".to_owned());
option_map.insert("format.dictionary_enabled".to_owned(), "true".to_owned());
option_map.insert("format.compression".to_owned(), "zstd(4)".to_owned());
option_map.insert("format.statistics_enabled".to_owned(), "page".to_owned());
option_map.insert("format.bloom_filter_fpp".to_owned(), "0.123".to_owned());
option_map.insert("format.bloom_filter_ndv".to_owned(), "123".to_owned());

let mut table_config = TableOptions::new();
table_config.set_file_format(FileType::PARQUET);
table_config.alter_with_string_hash_map(&option_map)?;

let parquet_options = ParquetWriterOptions::try_from(&table_config.parquet)?;
Expand Down Expand Up @@ -131,54 +132,52 @@ mod tests {
let mut option_map: HashMap<String, String> = HashMap::new();

option_map.insert(
"parquet.bloom_filter_enabled::col1".to_owned(),
"format.bloom_filter_enabled::col1".to_owned(),
"true".to_owned(),
);
option_map.insert(
"parquet.bloom_filter_enabled::col2.nested".to_owned(),
"format.bloom_filter_enabled::col2.nested".to_owned(),
"true".to_owned(),
);
option_map.insert("parquet.encoding::col1".to_owned(), "plain".to_owned());
option_map.insert("parquet.encoding::col2.nested".to_owned(), "rle".to_owned());
option_map.insert("format.encoding::col1".to_owned(), "plain".to_owned());
option_map.insert("format.encoding::col2.nested".to_owned(), "rle".to_owned());
option_map.insert(
"parquet.dictionary_enabled::col1".to_owned(),
"format.dictionary_enabled::col1".to_owned(),
"true".to_owned(),
);
option_map.insert(
"parquet.dictionary_enabled::col2.nested".to_owned(),
"format.dictionary_enabled::col2.nested".to_owned(),
"true".to_owned(),
);
option_map.insert("parquet.compression::col1".to_owned(), "zstd(4)".to_owned());
option_map.insert("format.compression::col1".to_owned(), "zstd(4)".to_owned());
option_map.insert(
"parquet.compression::col2.nested".to_owned(),
"format.compression::col2.nested".to_owned(),
"zstd(10)".to_owned(),
);
option_map.insert(
"parquet.statistics_enabled::col1".to_owned(),
"format.statistics_enabled::col1".to_owned(),
"page".to_owned(),
);
option_map.insert(
"parquet.statistics_enabled::col2.nested".to_owned(),
"format.statistics_enabled::col2.nested".to_owned(),
"none".to_owned(),
);
option_map.insert(
"parquet.bloom_filter_fpp::col1".to_owned(),
"format.bloom_filter_fpp::col1".to_owned(),
"0.123".to_owned(),
);
option_map.insert(
"parquet.bloom_filter_fpp::col2.nested".to_owned(),
"format.bloom_filter_fpp::col2.nested".to_owned(),
"0.456".to_owned(),
);
option_map.insert("format.bloom_filter_ndv::col1".to_owned(), "123".to_owned());
option_map.insert(
"parquet.bloom_filter_ndv::col1".to_owned(),
"123".to_owned(),
);
option_map.insert(
"parquet.bloom_filter_ndv::col2.nested".to_owned(),
"format.bloom_filter_ndv::col2.nested".to_owned(),
"456".to_owned(),
);

let mut table_config = TableOptions::new();
table_config.set_file_format(FileType::PARQUET);
table_config.alter_with_string_hash_map(&option_map)?;

let parquet_options = ParquetWriterOptions::try_from(&table_config.parquet)?;
Expand Down Expand Up @@ -271,16 +270,17 @@ mod tests {
// for StatementOptions
fn test_writeroptions_csv_from_statement_options() -> Result<()> {
let mut option_map: HashMap<String, String> = HashMap::new();
option_map.insert("csv.has_header".to_owned(), "true".to_owned());
option_map.insert("csv.date_format".to_owned(), "123".to_owned());
option_map.insert("csv.datetime_format".to_owned(), "123".to_owned());
option_map.insert("csv.timestamp_format".to_owned(), "2.0".to_owned());
option_map.insert("csv.time_format".to_owned(), "123".to_owned());
option_map.insert("csv.null_value".to_owned(), "123".to_owned());
option_map.insert("csv.compression".to_owned(), "gzip".to_owned());
option_map.insert("csv.delimiter".to_owned(), ";".to_owned());
option_map.insert("format.has_header".to_owned(), "true".to_owned());
option_map.insert("format.date_format".to_owned(), "123".to_owned());
option_map.insert("format.datetime_format".to_owned(), "123".to_owned());
option_map.insert("format.timestamp_format".to_owned(), "2.0".to_owned());
option_map.insert("format.time_format".to_owned(), "123".to_owned());
option_map.insert("format.null_value".to_owned(), "123".to_owned());
option_map.insert("format.compression".to_owned(), "gzip".to_owned());
option_map.insert("format.delimiter".to_owned(), ";".to_owned());

let mut table_config = TableOptions::new();
table_config.set_file_format(FileType::CSV);
table_config.alter_with_string_hash_map(&option_map)?;

let csv_options = CsvWriterOptions::try_from(&table_config.csv)?;
Expand All @@ -299,9 +299,10 @@ mod tests {
// for StatementOptions
fn test_writeroptions_json_from_statement_options() -> Result<()> {
let mut option_map: HashMap<String, String> = HashMap::new();
option_map.insert("json.compression".to_owned(), "gzip".to_owned());
option_map.insert("format.compression".to_owned(), "gzip".to_owned());

let mut table_config = TableOptions::new();
table_config.set_file_format(FileType::JSON);
table_config.alter_with_string_hash_map(&option_map)?;

let json_options = JsonWriterOptions::try_from(&table_config.json)?;
Expand Down
2 changes: 1 addition & 1 deletion datafusion/core/src/datasource/listing_table_factory.rs
Original file line number Diff line number Diff line change
Expand Up @@ -227,7 +227,7 @@ mod tests {
let name = OwnedTableReference::bare("foo".to_string());

let mut options = HashMap::new();
options.insert("csv.schema_infer_max_rec".to_owned(), "1000".to_owned());
options.insert("format.schema_infer_max_rec".to_owned(), "1000".to_owned());
let cmd = CreateExternalTable {
name,
location: csv_file.path().to_str().unwrap().to_string(),
Expand Down
6 changes: 1 addition & 5 deletions datafusion/core/src/execution/context/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1785,11 +1785,7 @@ impl SessionState {
.0
.insert(ObjectName(vec![Ident::from(table.name.as_str())]));
}
DFStatement::CopyTo(CopyToStatement {
source,
target: _,
options: _,
}) => match source {
DFStatement::CopyTo(CopyToStatement { source, .. }) => match source {
CopyToSource::Relation(table_name) => {
visitor.insert(table_name);
}
Expand Down
5 changes: 4 additions & 1 deletion datafusion/core/tests/sql/sql_api.rs
Original file line number Diff line number Diff line change
Expand Up @@ -67,7 +67,10 @@ async fn unsupported_copy_returns_error() {

let options = SQLOptions::new().with_allow_dml(false);

let sql = format!("copy (values(1)) to '{}'", tmpfile.to_string_lossy());
let sql = format!(
"copy (values(1)) to '{}' STORED AS parquet",
tmpfile.to_string_lossy()
);
let df = ctx.sql_with_options(&sql, options).await;
assert_eq!(
df.unwrap_err().strip_backtrace(),
Expand Down
9 changes: 4 additions & 5 deletions datafusion/proto/tests/cases/roundtrip_logical_plan.rs
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ use datafusion_common::config::{FormatOptions, TableOptions};
use datafusion_common::scalar::ScalarStructBuilder;
use datafusion_common::{
internal_err, not_impl_err, plan_err, DFField, DFSchema, DFSchemaRef,
DataFusionError, Result, ScalarValue,
DataFusionError, FileType, Result, ScalarValue,
};
use datafusion_expr::dml::CopyTo;
use datafusion_expr::expr::{
Expand Down Expand Up @@ -314,10 +314,9 @@ async fn roundtrip_logical_plan_copy_to_sql_options() -> Result<()> {
let ctx = SessionContext::new();

let input = create_csv_scan(&ctx).await?;

let mut table_options =
TableOptions::default_from_session_config(ctx.state().config_options());
table_options.set("csv.delimiter", ";")?;
let mut table_options = ctx.state().default_table_options().clone();
table_options.set_file_format(FileType::CSV);
table_options.set("format.delimiter", ";")?;

let plan = LogicalPlan::Copy(CopyTo {
input: Arc::new(input),
Expand Down
Loading
Loading