From 7edff9638e70344252ce753932fcd867a228faba Mon Sep 17 00:00:00 2001 From: Raphael Taylor-Davies Date: Tue, 31 Oct 2023 12:03:41 +0000 Subject: [PATCH] Don't canonicalize ListingTableUrl (#7994) --- datafusion-cli/src/exec.rs | 10 +- .../core/src/datasource/listing/table.rs | 9 +- datafusion/core/src/datasource/listing/url.rs | 137 ++++++++++++------ .../src/datasource/listing_table_factory.rs | 11 +- datafusion/core/src/physical_planner.rs | 6 +- .../test_files/insert_to_external.slt | 34 ++--- docs/source/user-guide/sql/write_options.md | 12 +- 7 files changed, 115 insertions(+), 104 deletions(-) diff --git a/datafusion-cli/src/exec.rs b/datafusion-cli/src/exec.rs index b62ad12dbfbb..4861d6b1ddc3 100644 --- a/datafusion-cli/src/exec.rs +++ b/datafusion-cli/src/exec.rs @@ -387,15 +387,7 @@ mod tests { // Ensure that local files are also registered let sql = format!("CREATE EXTERNAL TABLE test STORED AS PARQUET LOCATION '{location}'"); - let err = create_external_table_test(location, &sql) - .await - .unwrap_err(); - - if let DataFusionError::IoError(e) = err { - assert_eq!(e.kind(), std::io::ErrorKind::NotFound); - } else { - return Err(err); - } + create_external_table_test(location, &sql).await.unwrap(); Ok(()) } diff --git a/datafusion/core/src/datasource/listing/table.rs b/datafusion/core/src/datasource/listing/table.rs index d26d417bd8b2..0e62666bf7b4 100644 --- a/datafusion/core/src/datasource/listing/table.rs +++ b/datafusion/core/src/datasource/listing/table.rs @@ -855,14 +855,17 @@ impl TableProvider for ListingTable { let input_partitions = input.output_partitioning().partition_count(); let writer_mode = match self.options.insert_mode { ListingTableInsertMode::AppendToFile => { - if input_partitions > file_groups.len() { + if file_groups.is_empty() && self.options.single_file { + // This is a hack, longer term append should be split out (#7994) + crate::datasource::file_format::write::FileWriterMode::PutMultipart + } else if input_partitions > file_groups.len() { return plan_err!( "Cannot append {input_partitions} partitions to {} files!", file_groups.len() ); + } else { + crate::datasource::file_format::write::FileWriterMode::Append } - - crate::datasource::file_format::write::FileWriterMode::Append } ListingTableInsertMode::AppendNewFiles => { crate::datasource::file_format::write::FileWriterMode::PutMultipart diff --git a/datafusion/core/src/datasource/listing/url.rs b/datafusion/core/src/datasource/listing/url.rs index 4d1ca4853a73..7514fd2ad43c 100644 --- a/datafusion/core/src/datasource/listing/url.rs +++ b/datafusion/core/src/datasource/listing/url.rs @@ -15,7 +15,7 @@ // specific language governing permissions and limitations // under the License. -use std::fs; +use std::path::{Component, PathBuf}; use crate::datasource::object_store::ObjectStoreUrl; use crate::execution::context::SessionState; @@ -46,22 +46,35 @@ pub struct ListingTableUrl { impl ListingTableUrl { /// Parse a provided string as a `ListingTableUrl` /// + /// A URL can either refer to a single object, or a collection of objects with a + /// common prefix, with the presence of a trailing `/` indicating a collection. + /// + /// For example, `file:///foo.txt` refers to the file at `/foo.txt`, whereas + /// `file:///foo/` refers to all the files under the directory `/foo` and its + /// subdirectories. + /// + /// Similarly `s3://BUCKET/blob.csv` refers to `blob.csv` in the S3 bucket `BUCKET`, + /// wherease `s3://BUCKET/foo/` refers to all objects with the prefix `foo/` in the + /// S3 bucket `BUCKET` + /// /// # Paths without a Scheme /// /// If no scheme is provided, or the string is an absolute filesystem path - /// as determined [`std::path::Path::is_absolute`], the string will be + /// as determined by [`std::path::Path::is_absolute`], the string will be /// interpreted as a path on the local filesystem using the operating /// system's standard path delimiter, i.e. `\` on Windows, `/` on Unix. /// /// If the path contains any of `'?', '*', '['`, it will be considered /// a glob expression and resolved as described in the section below. /// - /// Otherwise, the path will be resolved to an absolute path, returning - /// an error if it does not exist, and converted to a [file URI] + /// Otherwise, the path will be resolved to an absolute path based on the current + /// working directory, and converted to a [file URI]. /// - /// If you wish to specify a path that does not exist on the local - /// machine you must provide it as a fully-qualified [file URI] - /// e.g. `file:///myfile.txt` + /// If the path already exists in the local filesystem this will be used to determine if this + /// [`ListingTableUrl`] refers to a collection or a single object, otherwise the presence + /// of a trailing path delimiter will be used to indicate a directory. For the avoidance + /// of ambiguity it is recommended users always include trailing `/` when intending to + /// refer to a directory. /// /// ## Glob File Paths /// @@ -69,14 +82,13 @@ impl ListingTableUrl { /// be resolved as follows. /// /// The string up to the first path segment containing a glob expression will be extracted, - /// and resolved in the same manner as a normal scheme-less path. That is, resolved to - /// an absolute path on the local filesystem, returning an error if it does not exist, - /// and converted to a [file URI] + /// and resolved in the same manner as a normal scheme-less path above. /// /// The remaining string will be interpreted as a [`glob::Pattern`] and used as a /// filter when listing files from object storage /// /// [file URI]: https://en.wikipedia.org/wiki/File_URI_scheme + /// [URL]: https://url.spec.whatwg.org/ pub fn parse(s: impl AsRef) -> Result { let s = s.as_ref(); @@ -92,32 +104,6 @@ impl ListingTableUrl { } } - /// Get object store for specified input_url - /// if input_url is actually not a url, we assume it is a local file path - /// if we have a local path, create it if not exists so ListingTableUrl::parse works - pub fn parse_create_local_if_not_exists( - s: impl AsRef, - is_directory: bool, - ) -> Result { - let s = s.as_ref(); - let is_valid_url = Url::parse(s).is_ok(); - - match is_valid_url { - true => ListingTableUrl::parse(s), - false => { - let path = std::path::PathBuf::from(s); - if !path.exists() { - if is_directory { - fs::create_dir_all(path)?; - } else { - fs::File::create(path)?; - } - } - ListingTableUrl::parse(s) - } - } - } - /// Creates a new [`ListingTableUrl`] interpreting `s` as a filesystem path fn parse_path(s: &str) -> Result { let (prefix, glob) = match split_glob_expression(s) { @@ -129,15 +115,9 @@ impl ListingTableUrl { None => (s, None), }; - let path = std::path::Path::new(prefix).canonicalize()?; - let url = if path.is_dir() { - Url::from_directory_path(path) - } else { - Url::from_file_path(path) - } - .map_err(|_| DataFusionError::Internal(format!("Can not open path: {s}")))?; - // TODO: Currently we do not have an IO-related error variant that accepts () - // or a string. Once we have such a variant, change the error type above. + let url = url_from_path(prefix).ok_or_else(|| { + DataFusionError::Internal(format!("Can not open path: {s}")) + })?; Ok(Self::new(url, glob)) } @@ -214,7 +194,12 @@ impl ListingTableUrl { } } }, - false => futures::stream::once(store.head(&self.prefix)).boxed(), + false => futures::stream::once(store.head(&self.prefix)) + .filter(|r| { + let p = !matches!(r, Err(object_store::Error::NotFound { .. })); + futures::future::ready(p) + }) + .boxed(), }; Ok(list .try_filter(move |meta| { @@ -257,6 +242,45 @@ impl std::fmt::Display for ListingTableUrl { } } +fn url_from_path(s: &str) -> Option { + let path = std::path::Path::new(s); + let is_dir = match path.exists() { + true => path.is_dir(), + // Fallback to inferring from trailing separator + false => std::path::is_separator(s.chars().last()?), + }; + + let p = match path.is_absolute() { + true => resolve_path(path)?, + false => { + let absolute = std::env::current_dir().ok()?.join(path); + resolve_path(&absolute)? + } + }; + + match is_dir { + true => Url::from_directory_path(p).ok(), + false => Url::from_file_path(p).ok(), + } +} + +fn resolve_path(path: &std::path::Path) -> Option { + let mut base = PathBuf::with_capacity(path.as_os_str().len()); + for component in path.components() { + match component { + Component::Prefix(_) | Component::RootDir => base.push(component.as_os_str()), + Component::Normal(p) => base.push(p), + Component::CurDir => {} // Do nothing + Component::ParentDir => { + if !base.pop() { + return None; + } + } + } + } + Some(base) +} + const GLOB_START_CHARS: [char; 3] = ['?', '*', '[']; /// Splits `path` at the first path segment containing a glob expression, returning @@ -368,4 +392,25 @@ mod tests { Some(("/a/b/c//", "alltypes_plain*.parquet")), ); } + + #[test] + fn test_resolve_path() { + let r = resolve_path("/foo/bar/../baz.txt".as_ref()).unwrap(); + assert_eq!(r.to_str().unwrap(), "/foo/baz.txt"); + + let r = resolve_path("/foo/bar/./baz.txt".as_ref()).unwrap(); + assert_eq!(r.to_str().unwrap(), "/foo/bar/baz.txt"); + + let r = resolve_path("/foo/bar/../../../baz.txt".as_ref()); + assert_eq!(r, None); + } + + #[test] + fn test_url_from_path() { + let url = url_from_path("foo/bar").unwrap(); + assert!(url.path().ends_with("foo/bar")); + + let url = url_from_path("foo/bar/").unwrap(); + assert!(url.path().ends_with("foo/bar/")); + } } diff --git a/datafusion/core/src/datasource/listing_table_factory.rs b/datafusion/core/src/datasource/listing_table_factory.rs index 26f40518979a..16fa1b9b5ffd 100644 --- a/datafusion/core/src/datasource/listing_table_factory.rs +++ b/datafusion/core/src/datasource/listing_table_factory.rs @@ -147,9 +147,6 @@ impl TableProviderFactory for ListingTableFactory { .unwrap_or(false) }; - let create_local_path = statement_options - .take_bool_option("create_local_path")? - .unwrap_or(false); let single_file = statement_options .take_bool_option("single_file")? .unwrap_or(false); @@ -205,13 +202,7 @@ impl TableProviderFactory for ListingTableFactory { FileType::AVRO => file_type_writer_options, }; - let table_path = match create_local_path { - true => ListingTableUrl::parse_create_local_if_not_exists( - &cmd.location, - !single_file, - ), - false => ListingTableUrl::parse(&cmd.location), - }?; + let table_path = ListingTableUrl::parse(&cmd.location)?; let options = ListingOptions::new(file_format) .with_collect_stat(state.config().collect_statistics()) diff --git a/datafusion/core/src/physical_planner.rs b/datafusion/core/src/physical_planner.rs index f941e88f3a36..aa43879dfcb0 100644 --- a/datafusion/core/src/physical_planner.rs +++ b/datafusion/core/src/physical_planner.rs @@ -565,11 +565,7 @@ impl DefaultPhysicalPlanner { copy_options, }) => { let input_exec = self.create_initial_plan(input, session_state).await?; - - // TODO: make this behavior configurable via options (should copy to create path/file as needed?) - // TODO: add additional configurable options for if existing files should be overwritten or - // appended to - let parsed_url = ListingTableUrl::parse_create_local_if_not_exists(output_url, !*single_file_output)?; + let parsed_url = ListingTableUrl::parse(output_url)?; let object_store_url = parsed_url.object_store(); let schema: Schema = (**input.schema()).clone().into(); diff --git a/datafusion/sqllogictest/test_files/insert_to_external.slt b/datafusion/sqllogictest/test_files/insert_to_external.slt index 8b01a14568e7..a9cf97136e02 100644 --- a/datafusion/sqllogictest/test_files/insert_to_external.slt +++ b/datafusion/sqllogictest/test_files/insert_to_external.slt @@ -57,10 +57,9 @@ CREATE EXTERNAL TABLE dictionary_encoded_parquet_partitioned( b varchar, ) STORED AS parquet -LOCATION 'test_files/scratch/insert_to_external/parquet_types_partitioned' +LOCATION 'test_files/scratch/insert_to_external/parquet_types_partitioned/' PARTITIONED BY (b) OPTIONS( -create_local_path 'true', insert_mode 'append_new_files', ); @@ -88,7 +87,6 @@ STORED AS csv LOCATION 'test_files/scratch/insert_to_external/insert_to_ordered/' WITH ORDER (a ASC, B DESC) OPTIONS( -create_local_path 'true', insert_mode 'append_new_files', ); @@ -130,9 +128,8 @@ CREATE EXTERNAL TABLE partitioned_insert_test(a string, b string, c bigint) STORED AS csv LOCATION 'test_files/scratch/insert_to_external/insert_to_partitioned/' -PARTITIONED BY (a, b) +PARTITIONED BY (a, b) OPTIONS( -create_local_path 'true', insert_mode 'append_new_files', ); @@ -172,9 +169,8 @@ CREATE EXTERNAL TABLE partitioned_insert_test_json(a string, b string) STORED AS json LOCATION 'test_files/scratch/insert_to_external/insert_to_partitioned_json/' -PARTITIONED BY (a) +PARTITIONED BY (a) OPTIONS( -create_local_path 'true', insert_mode 'append_new_files', ); @@ -207,9 +203,8 @@ CREATE EXTERNAL TABLE partitioned_insert_test_pq(a string, b bigint) STORED AS parquet LOCATION 'test_files/scratch/insert_to_external/insert_to_partitioned_pq/' -PARTITIONED BY (a) +PARTITIONED BY (a) OPTIONS( -create_local_path 'true', insert_mode 'append_new_files', ); @@ -250,7 +245,6 @@ single_file_test(a bigint, b bigint) STORED AS csv LOCATION 'test_files/scratch/insert_to_external/single_csv_table.csv' OPTIONS( -create_local_path 'true', single_file 'true', ); @@ -276,10 +270,7 @@ statement ok CREATE EXTERNAL TABLE directory_test(a bigint, b bigint) STORED AS parquet -LOCATION 'test_files/scratch/insert_to_external/external_parquet_table_q0' -OPTIONS( -create_local_path 'true', -); +LOCATION 'test_files/scratch/insert_to_external/external_parquet_table_q0/'; query II INSERT INTO directory_test values (1, 2), (3, 4); @@ -296,8 +287,7 @@ statement ok CREATE EXTERNAL TABLE table_without_values(field1 BIGINT NULL, field2 BIGINT NULL) STORED AS parquet -LOCATION 'test_files/scratch/insert_to_external/external_parquet_table_q1' -OPTIONS (create_local_path 'true'); +LOCATION 'test_files/scratch/insert_to_external/external_parquet_table_q1/'; query TT EXPLAIN @@ -362,8 +352,7 @@ statement ok CREATE EXTERNAL TABLE table_without_values(field1 BIGINT NULL, field2 BIGINT NULL) STORED AS parquet -LOCATION 'test_files/scratch/insert_to_external/external_parquet_table_q2' -OPTIONS (create_local_path 'true'); +LOCATION 'test_files/scratch/insert_to_external/external_parquet_table_q2/'; query TT EXPLAIN @@ -407,8 +396,7 @@ statement ok CREATE EXTERNAL TABLE table_without_values(c1 varchar NULL) STORED AS parquet -LOCATION 'test_files/scratch/insert_to_external/external_parquet_table_q3' -OPTIONS (create_local_path 'true'); +LOCATION 'test_files/scratch/insert_to_external/external_parquet_table_q3/'; # verify that the sort order of the insert query is maintained into the # insert (there should be a SortExec in the following plan) @@ -447,8 +435,7 @@ statement ok CREATE EXTERNAL TABLE table_without_values(id BIGINT, name varchar) STORED AS parquet -LOCATION 'test_files/scratch/insert_to_external/external_parquet_table_q4' -OPTIONS (create_local_path 'true'); +LOCATION 'test_files/scratch/insert_to_external/external_parquet_table_q4/'; query IT insert into table_without_values(id, name) values(1, 'foo'); @@ -486,8 +473,7 @@ statement ok CREATE EXTERNAL TABLE table_without_values(field1 BIGINT NOT NULL, field2 BIGINT NULL) STORED AS parquet -LOCATION 'test_files/scratch/insert_to_external/external_parquet_table_q5' -OPTIONS (create_local_path 'true'); +LOCATION 'test_files/scratch/insert_to_external/external_parquet_table_q5/'; query II insert into table_without_values values(1, 100); diff --git a/docs/source/user-guide/sql/write_options.md b/docs/source/user-guide/sql/write_options.md index 941484e84efd..cf6407e7b708 100644 --- a/docs/source/user-guide/sql/write_options.md +++ b/docs/source/user-guide/sql/write_options.md @@ -42,12 +42,11 @@ WITH HEADER ROW DELIMITER ';' LOCATION '/test/location/my_csv_table/' OPTIONS( -CREATE_LOCAL_PATH 'true', NULL_VALUE 'NAN' ); ``` -When running `INSERT INTO my_table ...`, the options from the `CREATE TABLE` will be respected (gzip compression, special delimiter, and header row included). Note that compression, header, and delimiter settings can also be specified within the `OPTIONS` tuple list. Dedicated syntax within the SQL statement always takes precedence over arbitrary option tuples, so if both are specified the `OPTIONS` setting will be ignored. CREATE_LOCAL_PATH is a special option that indicates if DataFusion should create local file paths when writing new files if they do not already exist. This option is useful if you wish to create an external table from scratch, using only DataFusion SQL statements. Finally, NULL_VALUE is a CSV format specific option that determines how null values should be encoded within the CSV file. +When running `INSERT INTO my_table ...`, the options from the `CREATE TABLE` will be respected (gzip compression, special delimiter, and header row included). Note that compression, header, and delimiter settings can also be specified within the `OPTIONS` tuple list. Dedicated syntax within the SQL statement always takes precedence over arbitrary option tuples, so if both are specified the `OPTIONS` setting will be ignored. Finally, NULL_VALUE is a CSV format specific option that determines how null values should be encoded within the CSV file. Finally, options can be passed when running a `COPY` command. @@ -78,11 +77,10 @@ The following special options are specific to the `COPY` command. The following special options are specific to creating an external table. -| Option | Description | Default Value | -| ----------------- | ------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------ | ---------------------------------------------------------------------------- | -| SINGLE_FILE | If true, indicates that this external table is backed by a single file. INSERT INTO queries will append to this file. | false | -| CREATE_LOCAL_PATH | If true, the folder or file backing this table will be created on the local file system if it does not already exist when running INSERT INTO queries. | false | -| INSERT_MODE | Determines if INSERT INTO queries should append to existing files or append new files to an existing directory. Valid values are append_to_file, append_new_files, and error. Note that "error" will block inserting data into this table. | CSV and JSON default to append_to_file. Parquet defaults to append_new_files | +| Option | Description | Default Value | +| ----------- | ------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------ | ---------------------------------------------------------------------------- | +| SINGLE_FILE | If true, indicates that this external table is backed by a single file. INSERT INTO queries will append to this file. | false | +| INSERT_MODE | Determines if INSERT INTO queries should append to existing files or append new files to an existing directory. Valid values are append_to_file, append_new_files, and error. Note that "error" will block inserting data into this table. | CSV and JSON default to append_to_file. Parquet defaults to append_new_files | ### JSON Format Specific Options