Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Make COPY TO align with CREATE EXTERNAL TABLE #9604

Merged
merged 8 commits into from
Mar 18, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion datafusion-cli/src/catalog.rs
Original file line number Diff line number Diff line change
Expand Up @@ -189,7 +189,7 @@ impl SchemaProvider for DynamicFileSchemaProvider {
&state,
table_url.scheme(),
url,
state.default_table_options(),
&state.default_table_options(),
)
.await?;
state.runtime_env().register_object_store(url, store);
Expand Down
6 changes: 3 additions & 3 deletions datafusion-cli/src/exec.rs
Original file line number Diff line number Diff line change
Expand Up @@ -412,7 +412,7 @@ mod tests {
)
})?;
for location in locations {
let sql = format!("copy (values (1,2)) to '{}';", location);
let sql = format!("copy (values (1,2)) to '{}' STORED AS PARQUET;", location);
let statements = DFParser::parse_sql_with_dialect(&sql, dialect.as_ref())?;
for statement in statements {
//Should not fail
Expand All @@ -438,8 +438,8 @@ mod tests {
let location = "s3://bucket/path/file.parquet";

// Missing region, use object_store defaults
let sql = format!("COPY (values (1,2)) TO '{location}'
(format parquet, 'aws.access_key_id' '{access_key_id}', 'aws.secret_access_key' '{secret_access_key}')");
let sql = format!("COPY (values (1,2)) TO '{location}' STORED AS PARQUET
OPTIONS ('aws.access_key_id' '{access_key_id}', 'aws.secret_access_key' '{secret_access_key}')");
copy_to_table_test(location, &sql).await?;

Ok(())
Expand Down
221 changes: 166 additions & 55 deletions datafusion/common/src/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1109,87 +1109,171 @@ macro_rules! extensions_options {
}
}

/// Represents the configuration options available for handling different table formats within a data processing application.
/// This struct encompasses options for various file formats including CSV, Parquet, and JSON, allowing for flexible configuration
/// of parsing and writing behaviors specific to each format. Additionally, it supports extending functionality through custom extensions.
///
/// Format-specific keys are addressed through the `format.` prefix once a
/// file format has been selected with [`TableOptions::set_file_format`]
/// (for example `format.delimiter` when the format is CSV).
#[derive(Debug, Clone, Default)]
pub struct TableOptions {
/// Configuration options for CSV file handling, such as the delimiter and
/// the escape character.
pub csv: CsvOptions,

/// Configuration options for Parquet file handling. Holds the global
/// Parquet settings (`global`) as well as per-column overrides
/// (`column_specific_options`).
pub parquet: TableParquetOptions,

/// Configuration options for JSON file handling.
pub json: JsonOptions,

/// The current file format that the table operations should assume. This option allows
/// for dynamic switching between the supported file types (e.g., CSV, Parquet, JSON).
///
/// Defaults to `None`; `format.`-prefixed keys can only be set after a
/// format has been chosen.
pub current_format: Option<FileType>,

/// Optional extensions that can be used to extend or customize the behavior of the table
/// options. Extensions can be registered using [`Extensions::insert`] and might include
/// custom file handling logic, additional configuration parameters, or other enhancements.
pub extensions: Extensions,
}

impl ConfigField for TableOptions {
/// Visits configuration settings for the current file format, or all formats if none is selected.
///
/// This method adapts the behavior based on whether a file format is currently selected in `current_format`.
/// If a format is selected, it visits only the settings relevant to that format. Otherwise,
/// it visits all available format settings.
fn visit<V: Visit>(&self, v: &mut V, _key_prefix: &str, _description: &'static str) {
self.csv.visit(v, "csv", "");
self.parquet.visit(v, "parquet", "");
self.json.visit(v, "json", "");
if let Some(file_type) = &self.current_format {
match file_type {
#[cfg(feature = "parquet")]
FileType::PARQUET => self.parquet.visit(v, "format", ""),
FileType::CSV => self.csv.visit(v, "format", ""),
FileType::JSON => self.json.visit(v, "format", ""),
_ => {}
}
} else {
self.csv.visit(v, "csv", "");
self.parquet.visit(v, "parquet", "");
self.json.visit(v, "json", "");
}
}

/// Sets a configuration value for a specific key within `TableOptions`.
///
/// This method delegates setting configuration values to the specific file format configurations,
/// based on the current format selected. If no format is selected, it returns an error.
///
/// # Parameters
///
/// * `key`: The configuration key specifying which setting to adjust, prefixed with the format (e.g., "format.delimiter")
/// for CSV format.
/// * `value`: The value to set for the specified configuration key.
///
/// # Returns
///
/// A result indicating success or an error if the key is not recognized, if a format is not specified,
/// or if setting the configuration value fails for the specific format.
fn set(&mut self, key: &str, value: &str) -> Result<()> {
// Extensions are handled in the public `ConfigOptions::set`
let (key, rem) = key.split_once('.').unwrap_or((key, ""));
let Some(format) = &self.current_format else {
return _config_err!("Specify a format for TableOptions");
};
match key {
"csv" => self.csv.set(rem, value),
"parquet" => self.parquet.set(rem, value),
"json" => self.json.set(rem, value),
"format" => match format {
#[cfg(feature = "parquet")]
FileType::PARQUET => self.parquet.set(rem, value),
FileType::CSV => self.csv.set(rem, value),
FileType::JSON => self.json.set(rem, value),
_ => {
_config_err!("Config value \"{key}\" is not supported on {}", format)
}
},
_ => _config_err!("Config value \"{key}\" not found on TableOptions"),
}
}
}

impl TableOptions {
/// Creates a new [`ConfigOptions`] with default values
/// Constructs a new instance of `TableOptions` with default settings.
///
/// # Returns
///
/// A new `TableOptions` instance with default configuration values.
pub fn new() -> Self {
Self::default()
}

/// Selects the file format that `format.`-prefixed options apply to.
///
/// Any previously selected format is overwritten.
///
/// # Parameters
///
/// * `format`: The file format to use (e.g., CSV, Parquet).
pub fn set_file_format(&mut self, format: FileType) {
    // The previously selected format (if any) is intentionally dropped.
    let _previous = self.current_format.replace(format);
}

/// Creates a new `TableOptions` instance initialized with settings from a given session config.
///
/// Starts from [`TableOptions::default`] and applies the session settings
/// through [`TableOptions::combine_with_session_config`].
///
/// # Parameters
///
/// * `config`: A reference to the session `ConfigOptions` from which to derive initial settings.
///
/// # Returns
///
/// A new `TableOptions` instance with settings applied from the session config.
pub fn default_from_session_config(config: &ConfigOptions) -> Self {
    // `combine_with_session_config` takes `&self` and returns an updated
    // copy, so its result must be the value returned here. Calling it as a
    // bare statement discards the session settings and hands back plain
    // defaults.
    TableOptions::default().combine_with_session_config(config)
}

/// Set extensions to provided value
/// Updates the current `TableOptions` with settings from a given session config.
///
/// # Parameters
///
/// * `config`: A reference to the session `ConfigOptions` whose settings are to be applied.
///
/// # Returns
///
/// A new `TableOptions` instance with updated settings from the session config.
pub fn combine_with_session_config(&self, config: &ConfigOptions) -> Self {
let mut clone = self.clone();
clone.parquet.global = config.execution.parquet.clone();
clone
}

/// Sets the extensions for this `TableOptions` instance.
///
/// # Parameters
///
/// * `extensions`: The `Extensions` instance to set.
///
/// # Returns
///
/// A new `TableOptions` instance with the specified extensions applied.
pub fn with_extensions(mut self, extensions: Extensions) -> Self {
self.extensions = extensions;
self
}

/// Set a configuration option
/// Sets a specific configuration option.
///
/// # Parameters
///
/// * `key`: The configuration key (e.g., "format.delimiter").
/// * `value`: The value to set for the specified key.
///
/// # Returns
///
/// A result indicating success or failure in setting the configuration option.
pub fn set(&mut self, key: &str, value: &str) -> Result<()> {
let (prefix, _) = key.split_once('.').ok_or_else(|| {
DataFusionError::Configuration(format!(
"could not find config namespace for key \"{key}\""
))
})?;

if prefix == "csv" || prefix == "json" || prefix == "parquet" {
if let Some(format) = &self.current_format {
match format {
FileType::CSV if prefix != "csv" => {
return Err(DataFusionError::Configuration(format!(
"Key \"{key}\" is not applicable for CSV format"
)))
}
#[cfg(feature = "parquet")]
FileType::PARQUET if prefix != "parquet" => {
return Err(DataFusionError::Configuration(format!(
"Key \"{key}\" is not applicable for PARQUET format"
)))
}
FileType::JSON if prefix != "json" => {
return Err(DataFusionError::Configuration(format!(
"Key \"{key}\" is not applicable for JSON format"
)))
}
_ => {}
}
}
if prefix == "format" {
return ConfigField::set(self, key, value);
}

Expand All @@ -1202,6 +1286,15 @@ impl TableOptions {
e.0.set(key, value)
}

/// Initializes a new `TableOptions` from a hash map of string settings.
///
/// # Parameters
///
/// * `settings`: A hash map where each key-value pair represents a configuration setting.
///
/// # Returns
///
/// A result containing the new `TableOptions` instance or an error if any setting could not be applied.
pub fn from_string_hash_map(settings: &HashMap<String, String>) -> Result<Self> {
let mut ret = Self::default();
for (k, v) in settings {
Expand All @@ -1211,6 +1304,15 @@ impl TableOptions {
Ok(ret)
}

/// Modifies the current `TableOptions` instance with settings from a hash map.
///
/// # Parameters
///
/// * `settings`: A hash map where each key-value pair represents a configuration setting.
///
/// # Returns
///
/// A result indicating success or failure in applying the settings.
pub fn alter_with_string_hash_map(
&mut self,
settings: &HashMap<String, String>,
Expand All @@ -1221,7 +1323,11 @@ impl TableOptions {
Ok(())
}

/// Returns the [`ConfigEntry`] stored within this [`ConfigOptions`]
/// Retrieves all configuration entries from this `TableOptions`.
///
/// # Returns
///
/// A vector of `ConfigEntry` instances, representing all the configuration options within this `TableOptions`.
pub fn entries(&self) -> Vec<ConfigEntry> {
struct Visitor(Vec<ConfigEntry>);

Expand Down Expand Up @@ -1249,9 +1355,7 @@ impl TableOptions {
}

let mut v = Visitor(vec![]);
self.visit(&mut v, "csv", "");
self.visit(&mut v, "json", "");
self.visit(&mut v, "parquet", "");
self.visit(&mut v, "format", "");

v.0.extend(self.extensions.0.values().flat_map(|e| e.0.entries()));
v.0
Expand Down Expand Up @@ -1556,6 +1660,7 @@ mod tests {
use crate::config::{
ConfigEntry, ConfigExtension, ExtensionOptions, Extensions, TableOptions,
};
use crate::FileType;

#[derive(Default, Debug, Clone)]
pub struct TestExtensionConfig {
Expand Down Expand Up @@ -1609,12 +1714,13 @@ mod tests {
}

#[test]
fn alter_kafka_config() {
fn alter_test_extension_config() {
let mut extension = Extensions::new();
extension.insert(TestExtensionConfig::default());
let mut table_config = TableOptions::new().with_extensions(extension);
table_config.set("parquet.write_batch_size", "10").unwrap();
assert_eq!(table_config.parquet.global.write_batch_size, 10);
table_config.set_file_format(FileType::CSV);
table_config.set("format.delimiter", ";").unwrap();
assert_eq!(table_config.csv.delimiter, b';');
table_config.set("test.bootstrap.servers", "asd").unwrap();
let kafka_config = table_config
.extensions
Expand All @@ -1626,38 +1732,43 @@ mod tests {
);
}

#[test]
fn csv_u8_table_options() {
let mut table_config = TableOptions::new();
table_config.set_file_format(FileType::CSV);
table_config.set("format.delimiter", ";").unwrap();
assert_eq!(table_config.csv.delimiter as char, ';');
table_config.set("format.escape", "\"").unwrap();
assert_eq!(table_config.csv.escape.unwrap() as char, '"');
table_config.set("format.escape", "\'").unwrap();
assert_eq!(table_config.csv.escape.unwrap() as char, '\'');
}

#[cfg(feature = "parquet")]
#[test]
fn parquet_table_options() {
let mut table_config = TableOptions::new();
table_config.set_file_format(FileType::PARQUET);
table_config
.set("parquet.bloom_filter_enabled::col1", "true")
.set("format.bloom_filter_enabled::col1", "true")
.unwrap();
assert_eq!(
table_config.parquet.column_specific_options["col1"].bloom_filter_enabled,
Some(true)
);
}

#[test]
fn csv_u8_table_options() {
let mut table_config = TableOptions::new();
table_config.set("csv.delimiter", ";").unwrap();
assert_eq!(table_config.csv.delimiter as char, ';');
table_config.set("csv.escape", "\"").unwrap();
assert_eq!(table_config.csv.escape.unwrap() as char, '"');
table_config.set("csv.escape", "\'").unwrap();
assert_eq!(table_config.csv.escape.unwrap() as char, '\'');
}

#[cfg(feature = "parquet")]
#[test]
fn parquet_table_options_config_entry() {
let mut table_config = TableOptions::new();
table_config.set_file_format(FileType::PARQUET);
table_config
.set("parquet.bloom_filter_enabled::col1", "true")
.set("format.bloom_filter_enabled::col1", "true")
.unwrap();
let entries = table_config.entries();
assert!(entries
.iter()
.any(|item| item.key == "parquet.bloom_filter_enabled::col1"))
.any(|item| item.key == "format.bloom_filter_enabled::col1"))
}
}
Loading
Loading