Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Defer ListingTable Path Creation To Write #8007

Closed
wants to merge 1 commit into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 1 addition & 9 deletions datafusion-cli/src/exec.rs
Original file line number Diff line number Diff line change
Expand Up @@ -387,15 +387,7 @@ mod tests {
// Ensure that local files are also registered
let sql =
format!("CREATE EXTERNAL TABLE test STORED AS PARQUET LOCATION '{location}'");
let err = create_external_table_test(location, &sql)
.await
.unwrap_err();

if let DataFusionError::IoError(e) = err {
assert_eq!(e.kind(), std::io::ErrorKind::NotFound);
} else {
return Err(err);
}
create_external_table_test(location, &sql).await.unwrap();
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This no longer returns an error, making it consistent with the other stores


Ok(())
}
Expand Down
9 changes: 6 additions & 3 deletions datafusion/core/src/datasource/listing/table.rs
Original file line number Diff line number Diff line change
Expand Up @@ -855,14 +855,17 @@ impl TableProvider for ListingTable {
let input_partitions = input.output_partitioning().partition_count();
let writer_mode = match self.options.insert_mode {
ListingTableInsertMode::AppendToFile => {
if input_partitions > file_groups.len() {
if file_groups.is_empty() && self.options.single_file {
// This is a hack, longer term append should be split out (#7994)
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Without this we get an error if this is the first write as there are no file_groups. Longer-term we shouldn't be conflating ListingTable IO and that of streaming use-cases

crate::datasource::file_format::write::FileWriterMode::PutMultipart
} else if input_partitions > file_groups.len() {
return plan_err!(
"Cannot append {input_partitions} partitions to {} files!",
file_groups.len()
);
} else {
crate::datasource::file_format::write::FileWriterMode::Append
}

crate::datasource::file_format::write::FileWriterMode::Append
}
ListingTableInsertMode::AppendNewFiles => {
crate::datasource::file_format::write::FileWriterMode::PutMultipart
Expand Down
137 changes: 91 additions & 46 deletions datafusion/core/src/datasource/listing/url.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@
// specific language governing permissions and limitations
// under the License.

use std::fs;
use std::path::{Component, PathBuf};

use crate::datasource::object_store::ObjectStoreUrl;
use crate::execution::context::SessionState;
Expand Down Expand Up @@ -46,37 +46,49 @@ pub struct ListingTableUrl {
impl ListingTableUrl {
/// Parse a provided string as a `ListingTableUrl`
///
/// A URL can either refer to a single object, or a collection of objects with a
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This isn't changing the pre-existing behaviour but attempting to better document it

/// common prefix, with the presence of a trailing `/` indicating a collection.
///
/// For example, `file:///foo.txt` refers to the file at `/foo.txt`, whereas
/// `file:///foo/` refers to all the files under the directory `/foo` and its
/// subdirectories.
///
/// Similarly `s3://BUCKET/blob.csv` refers to `blob.csv` in the S3 bucket `BUCKET`,
/// wherease `s3://BUCKET/foo/` refers to all objects with the prefix `foo/` in the
/// S3 bucket `BUCKET`
///
/// # Paths without a Scheme
///
/// If no scheme is provided, or the string is an absolute filesystem path
/// as determined [`std::path::Path::is_absolute`], the string will be
/// as determined by [`std::path::Path::is_absolute`], the string will be
/// interpreted as a path on the local filesystem using the operating
/// system's standard path delimiter, i.e. `\` on Windows, `/` on Unix.
///
/// If the path contains any of `'?', '*', '['`, it will be considered
/// a glob expression and resolved as described in the section below.
///
/// Otherwise, the path will be resolved to an absolute path, returning
/// an error if it does not exist, and converted to a [file URI]
/// Otherwise, the path will be resolved to an absolute path based on the current
/// working directory, and converted to a [file URI].
///
/// If you wish to specify a path that does not exist on the local
/// machine you must provide it as a fully-qualified [file URI]
/// e.g. `file:///myfile.txt`
/// If the path already exists in the local filesystem this will be used to determine if this
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is necessary to preserve the pre-existing behaviour where it would use the filesystem to determine if the path refers to a directory or not

/// [`ListingTableUrl`] refers to a collection or a single object, otherwise the presence
/// of a trailing path delimiter will be used to indicate a directory. For the avoidance
/// of ambiguity it is recommended users always include trailing `/` when intending to
/// refer to a directory.
///
/// ## Glob File Paths
///
/// If no scheme is provided, and the path contains a glob expression, it will
/// be resolved as follows.
///
/// The string up to the first path segment containing a glob expression will be extracted,
/// and resolved in the same manner as a normal scheme-less path. That is, resolved to
/// an absolute path on the local filesystem, returning an error if it does not exist,
/// and converted to a [file URI]
/// and resolved in the same manner as a normal scheme-less path above.
///
/// The remaining string will be interpreted as a [`glob::Pattern`] and used as a
/// filter when listing files from object storage
///
/// [file URI]: https://en.wikipedia.org/wiki/File_URI_scheme
/// [URL]: https://url.spec.whatwg.org/
pub fn parse(s: impl AsRef<str>) -> Result<Self> {
let s = s.as_ref();

Expand All @@ -92,32 +104,6 @@ impl ListingTableUrl {
}
}

/// Get object store for specified input_url
/// if input_url is actually not a url, we assume it is a local file path
/// if we have a local path, create it if not exists so ListingTableUrl::parse works
pub fn parse_create_local_if_not_exists(
s: impl AsRef<str>,
is_directory: bool,
) -> Result<Self> {
let s = s.as_ref();
let is_valid_url = Url::parse(s).is_ok();

match is_valid_url {
true => ListingTableUrl::parse(s),
false => {
let path = std::path::PathBuf::from(s);
if !path.exists() {
if is_directory {
fs::create_dir_all(path)?;
} else {
fs::File::create(path)?;
}
}
ListingTableUrl::parse(s)
}
}
}

/// Creates a new [`ListingTableUrl`] interpreting `s` as a filesystem path
fn parse_path(s: &str) -> Result<Self> {
let (prefix, glob) = match split_glob_expression(s) {
Expand All @@ -129,15 +115,9 @@ impl ListingTableUrl {
None => (s, None),
};

let path = std::path::Path::new(prefix).canonicalize()?;
let url = if path.is_dir() {
Url::from_directory_path(path)
} else {
Url::from_file_path(path)
}
.map_err(|_| DataFusionError::Internal(format!("Can not open path: {s}")))?;
// TODO: Currently we do not have an IO-related error variant that accepts ()
// or a string. Once we have such a variant, change the error type above.
let url = url_from_path(prefix).ok_or_else(|| {
DataFusionError::Internal(format!("Can not open path: {s}"))
})?;
Ok(Self::new(url, glob))
}

Expand Down Expand Up @@ -214,7 +194,12 @@ impl ListingTableUrl {
}
}
},
false => futures::stream::once(store.head(&self.prefix)).boxed(),
false => futures::stream::once(store.head(&self.prefix))
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This makes the behaviour consistent with listing above

.filter(|r| {
let p = !matches!(r, Err(object_store::Error::NotFound { .. }));
futures::future::ready(p)
})
.boxed(),
};
Ok(list
.try_filter(move |meta| {
Expand Down Expand Up @@ -257,6 +242,45 @@ impl std::fmt::Display for ListingTableUrl {
}
}

fn url_from_path(s: &str) -> Option<Url> {
let path = std::path::Path::new(s);
let is_dir = match path.exists() {
true => path.is_dir(),
// Fallback to inferring from trailing separator
false => std::path::is_separator(s.chars().last()?),
};

let p = match path.is_absolute() {
true => resolve_path(path)?,
false => {
let absolute = std::env::current_dir().ok()?.join(path);
resolve_path(&absolute)?
}
};

match is_dir {
true => Url::from_directory_path(p).ok(),
false => Url::from_file_path(p).ok(),
}
}

fn resolve_path(path: &std::path::Path) -> Option<PathBuf> {
let mut base = PathBuf::with_capacity(path.as_os_str().len());
for component in path.components() {
match component {
Component::Prefix(_) | Component::RootDir => base.push(component.as_os_str()),
Component::Normal(p) => base.push(p),
Component::CurDir => {} // Do nothing
Component::ParentDir => {
if !base.pop() {
return None;
}
}
}
}
Some(base)
}

const GLOB_START_CHARS: [char; 3] = ['?', '*', '['];

/// Splits `path` at the first path segment containing a glob expression, returning
Expand Down Expand Up @@ -368,4 +392,25 @@ mod tests {
Some(("/a/b/c//", "alltypes_plain*.parquet")),
);
}

#[test]
fn test_resolve_path() {
let r = resolve_path("/foo/bar/../baz.txt".as_ref()).unwrap();
assert_eq!(r.to_str().unwrap(), "/foo/baz.txt");

let r = resolve_path("/foo/bar/./baz.txt".as_ref()).unwrap();
assert_eq!(r.to_str().unwrap(), "/foo/bar/baz.txt");

let r = resolve_path("/foo/bar/../../../baz.txt".as_ref());
assert_eq!(r, None);
}

#[test]
fn test_url_from_path() {
let url = url_from_path("foo/bar").unwrap();
assert!(url.path().ends_with("foo/bar"));

let url = url_from_path("foo/bar/").unwrap();
assert!(url.path().ends_with("foo/bar/"));
}
}
11 changes: 1 addition & 10 deletions datafusion/core/src/datasource/listing_table_factory.rs
Original file line number Diff line number Diff line change
Expand Up @@ -147,9 +147,6 @@ impl TableProviderFactory for ListingTableFactory {
.unwrap_or(false)
};

let create_local_path = statement_options
.take_bool_option("create_local_path")?
.unwrap_or(false);
let single_file = statement_options
.take_bool_option("single_file")?
.unwrap_or(false);
Expand Down Expand Up @@ -205,13 +202,7 @@ impl TableProviderFactory for ListingTableFactory {
FileType::AVRO => file_type_writer_options,
};

let table_path = match create_local_path {
true => ListingTableUrl::parse_create_local_if_not_exists(
&cmd.location,
!single_file,
),
false => ListingTableUrl::parse(&cmd.location),
}?;
let table_path = ListingTableUrl::parse(&cmd.location)?;

let options = ListingOptions::new(file_format)
.with_collect_stat(state.config().collect_statistics())
Expand Down
6 changes: 1 addition & 5 deletions datafusion/core/src/physical_planner.rs
Original file line number Diff line number Diff line change
Expand Up @@ -565,11 +565,7 @@ impl DefaultPhysicalPlanner {
copy_options,
}) => {
let input_exec = self.create_initial_plan(input, session_state).await?;

// TODO: make this behavior configurable via options (should copy to create path/file as needed?)
// TODO: add additional configurable options for if existing files should be overwritten or
// appended to
let parsed_url = ListingTableUrl::parse_create_local_if_not_exists(output_url, !*single_file_output)?;
let parsed_url = ListingTableUrl::parse(output_url)?;
let object_store_url = parsed_url.object_store();

let schema: Schema = (**input.schema()).clone().into();
Expand Down
Loading
Loading