diff --git a/datafusion-examples/examples/csv_opener.rs b/datafusion-examples/examples/csv_opener.rs index 96753c8c52608..fd1c485eec396 100644 --- a/datafusion-examples/examples/csv_opener.rs +++ b/datafusion-examples/examples/csv_opener.rs @@ -49,6 +49,7 @@ async fn main() -> Result<()> { b',', b'"', object_store, + Some(b'#'), ); let opener = CsvOpener::new(Arc::new(config), FileCompressionType::UNCOMPRESSED); diff --git a/datafusion/common/src/config.rs b/datafusion/common/src/config.rs index a4f937b6e2a3b..1c431d04cd35e 100644 --- a/datafusion/common/src/config.rs +++ b/datafusion/common/src/config.rs @@ -1567,6 +1567,7 @@ config_namespace! { pub timestamp_tz_format: Option, default = None pub time_format: Option, default = None pub null_value: Option, default = None + pub comment: Option, default = None } } diff --git a/datafusion/core/src/datasource/file_format/csv.rs b/datafusion/core/src/datasource/file_format/csv.rs index 17bc7aafce85d..970ea7968bae2 100644 --- a/datafusion/core/src/datasource/file_format/csv.rs +++ b/datafusion/core/src/datasource/file_format/csv.rs @@ -147,6 +147,12 @@ impl CsvFormat { self.options.has_header } + /// Lines beginning with this byte are ignored. + pub fn with_comment(mut self, comment: Option) -> Self { + self.options.comment = comment; + self + } + /// The character separating values within a row. /// - default to ',' pub fn with_delimiter(mut self, delimiter: u8) -> Self { @@ -252,6 +258,7 @@ impl FileFormat for CsvFormat { self.options.delimiter, self.options.quote, self.options.escape, + self.options.comment, self.options.compression.into(), ); Ok(Arc::new(exec)) @@ -304,7 +311,7 @@ impl CsvFormat { pin_mut!(stream); while let Some(chunk) = stream.next().await.transpose()? { - let format = arrow::csv::reader::Format::default() + let mut format = arrow::csv::reader::Format::default() .with_header( first_chunk && self @@ -314,6 +321,10 @@ impl CsvFormat { ) .with_delimiter(self.options.delimiter); + if let Some(comment) = self.options.comment { + format = format.with_comment(comment); + } + let (Schema { fields, .. }, records_read) = format.infer_schema(chunk.reader(), Some(records_to_read))?; diff --git a/datafusion/core/src/datasource/file_format/options.rs b/datafusion/core/src/datasource/file_format/options.rs index f5bd72495d662..c6d143ed6749a 100644 --- a/datafusion/core/src/datasource/file_format/options.rs +++ b/datafusion/core/src/datasource/file_format/options.rs @@ -61,6 +61,8 @@ pub struct CsvReadOptions<'a> { pub quote: u8, /// An optional escape character. Defaults to None. pub escape: Option, + /// If enabled, lines beginning with this byte are ignored. + pub comment: Option, /// An optional schema representing the CSV files. If None, CSV reader will try to infer it /// based on data in file. pub schema: Option<&'a Schema>, @@ -97,6 +99,7 @@ impl<'a> CsvReadOptions<'a> { table_partition_cols: vec![], file_compression_type: FileCompressionType::UNCOMPRESSED, file_sort_order: vec![], + comment: None, } } @@ -106,6 +109,12 @@ impl<'a> CsvReadOptions<'a> { self } + /// Specify comment char to use for CSV read + pub fn comment(mut self, comment: u8) -> Self { + self.comment = Some(comment); + self + } + /// Specify delimiter to use for CSV read pub fn delimiter(mut self, delimiter: u8) -> Self { self.delimiter = delimiter; @@ -477,6 +486,7 @@ impl ReadOptions<'_> for CsvReadOptions<'_> { let file_format = CsvFormat::default() .with_options(table_options.csv) .with_has_header(self.has_header) + .with_comment(self.comment) .with_delimiter(self.delimiter) .with_quote(self.quote) .with_escape(self.escape) diff --git a/datafusion/core/src/datasource/physical_plan/csv.rs b/datafusion/core/src/datasource/physical_plan/csv.rs index cc7c837e471e3..0cbcf4380b04b 100644 --- a/datafusion/core/src/datasource/physical_plan/csv.rs +++ b/datafusion/core/src/datasource/physical_plan/csv.rs @@ -58,6 +58,7 @@ pub struct CsvExec { delimiter: u8, quote: u8, escape: Option, + comment: Option, /// Execution metrics metrics: ExecutionPlanMetricsSet, /// Compression type of the file associated with CsvExec @@ -73,6 +74,7 @@ impl CsvExec { delimiter: u8, quote: u8, escape: Option, + comment: Option, file_compression_type: FileCompressionType, ) -> Self { let (projected_schema, projected_statistics, projected_output_ordering) = @@ -92,6 +94,7 @@ impl CsvExec { metrics: ExecutionPlanMetricsSet::new(), file_compression_type, cache, + comment, } } @@ -113,6 +116,11 @@ impl CsvExec { self.quote } + /// Lines beginning with this byte are ignored. + pub fn comment(&self) -> Option { + self.comment + } + /// The escape character pub fn escape(&self) -> Option { self.escape @@ -234,6 +242,7 @@ impl ExecutionPlan for CsvExec { quote: self.quote, escape: self.escape, object_store, + comment: self.comment, }); let opener = CsvOpener { @@ -265,9 +274,11 @@ pub struct CsvConfig { quote: u8, escape: Option, object_store: Arc, + comment: Option, } impl CsvConfig { + #[allow(clippy::too_many_arguments)] /// Returns a [`CsvConfig`] pub fn new( batch_size: usize, @@ -277,6 +288,7 @@ impl CsvConfig { delimiter: u8, quote: u8, object_store: Arc, + comment: Option, ) -> Self { Self { batch_size, @@ -287,6 +299,7 @@ impl CsvConfig { quote, escape: None, object_store, + comment, } } } @@ -309,6 +322,9 @@ impl CsvConfig { if let Some(escape) = self.escape { builder = builder.with_escape(escape) } + if let Some(comment) = self.comment { + builder = builder.with_comment(comment); + } builder } @@ -570,6 +586,7 @@ mod tests { b',', b'"', None, + None, file_compression_type.to_owned(), ); assert_eq!(13, csv.base_config.file_schema.fields().len()); @@ -636,6 +653,7 @@ mod tests { b',', b'"', None, + None, file_compression_type.to_owned(), ); assert_eq!(13, csv.base_config.file_schema.fields().len()); @@ -702,6 +720,7 @@ mod tests { b',', b'"', None, + None, file_compression_type.to_owned(), ); assert_eq!(13, csv.base_config.file_schema.fields().len()); @@ -765,6 +784,7 @@ mod tests { b',', b'"', None, + None, file_compression_type.to_owned(), ); assert_eq!(14, csv.base_config.file_schema.fields().len()); @@ -827,6 +847,7 @@ mod tests { b',', b'"', None, + None, file_compression_type.to_owned(), ); assert_eq!(13, csv.base_config.file_schema.fields().len()); @@ -921,6 +942,7 @@ mod tests { b',', b'"', None, + None, file_compression_type.to_owned(), ); diff --git a/datafusion/core/src/physical_optimizer/enforce_distribution.rs b/datafusion/core/src/physical_optimizer/enforce_distribution.rs index c07f2c5dcf249..a673d755c7eb7 100644 --- a/datafusion/core/src/physical_optimizer/enforce_distribution.rs +++ b/datafusion/core/src/physical_optimizer/enforce_distribution.rs @@ -1496,6 +1496,7 @@ pub(crate) mod tests { b',', b'"', None, + None, FileCompressionType::UNCOMPRESSED, )) } @@ -1526,6 +1527,7 @@ pub(crate) mod tests { b',', b'"', None, + None, FileCompressionType::UNCOMPRESSED, )) } @@ -3803,6 +3805,7 @@ pub(crate) mod tests { b',', b'"', None, + None, compression_type, )), vec![("a".to_string(), "a".to_string())], diff --git a/datafusion/core/src/physical_optimizer/projection_pushdown.rs b/datafusion/core/src/physical_optimizer/projection_pushdown.rs index 0190f35cc97b8..df010e8cb32ed 100644 --- a/datafusion/core/src/physical_optimizer/projection_pushdown.rs +++ b/datafusion/core/src/physical_optimizer/projection_pushdown.rs @@ -185,6 +185,7 @@ fn try_swapping_with_csv( csv.delimiter(), csv.quote(), csv.escape(), + csv.comment(), csv.file_compression_type, )) as _ }) @@ -1694,6 +1695,7 @@ mod tests { 0, 0, None, + None, FileCompressionType::UNCOMPRESSED, )) } @@ -1720,6 +1722,7 @@ mod tests { 0, 0, None, + None, FileCompressionType::UNCOMPRESSED, )) } diff --git a/datafusion/core/src/physical_optimizer/replace_with_order_preserving_variants.rs b/datafusion/core/src/physical_optimizer/replace_with_order_preserving_variants.rs index f69c0df32e8a5..757d48ed09ee1 100644 --- a/datafusion/core/src/physical_optimizer/replace_with_order_preserving_variants.rs +++ b/datafusion/core/src/physical_optimizer/replace_with_order_preserving_variants.rs @@ -1508,6 +1508,7 @@ mod tests { 0, b'"', None, + None, FileCompressionType::UNCOMPRESSED, )) } diff --git a/datafusion/core/src/test/mod.rs b/datafusion/core/src/test/mod.rs index 1152c70d43915..1857ac8e18101 100644 --- a/datafusion/core/src/test/mod.rs +++ b/datafusion/core/src/test/mod.rs @@ -98,6 +98,7 @@ pub fn scan_partitioned_csv(partitions: usize, work_dir: &Path) -> Result serde::Deserialize<'de> for CsvOptions { "timeFormat", "null_value", "nullValue", + "comment", ]; #[allow(clippy::enum_variant_names)] @@ -5527,6 +5535,7 @@ impl<'de> serde::Deserialize<'de> for CsvOptions { TimestampTzFormat, TimeFormat, NullValue, + Comment, } impl<'de> serde::Deserialize<'de> for GeneratedField { fn deserialize(deserializer: D) -> std::result::Result @@ -5560,6 +5569,7 @@ impl<'de> serde::Deserialize<'de> for CsvOptions { "timestampTzFormat" | "timestamp_tz_format" => Ok(GeneratedField::TimestampTzFormat), "timeFormat" | "time_format" => Ok(GeneratedField::TimeFormat), "nullValue" | "null_value" => Ok(GeneratedField::NullValue), + "comment" => Ok(GeneratedField::Comment), _ => Err(serde::de::Error::unknown_field(value, FIELDS)), } } @@ -5591,6 +5601,7 @@ impl<'de> serde::Deserialize<'de> for CsvOptions { let mut timestamp_tz_format__ = None; let mut time_format__ = None; let mut null_value__ = None; + let mut comment__ = None; while let Some(k) = map_.next_key()? { match k { GeneratedField::HasHeader => { @@ -5675,6 +5686,14 @@ impl<'de> serde::Deserialize<'de> for CsvOptions { } null_value__ = Some(map_.next_value()?); } + GeneratedField::Comment => { + if comment__.is_some() { + return Err(serde::de::Error::duplicate_field("comment")); + } + comment__ = + Some(map_.next_value::<::pbjson::private::BytesDeserialize<_>>()?.0) + ; + } } } Ok(CsvOptions { @@ -5690,6 +5709,7 @@ impl<'de> serde::Deserialize<'de> for CsvOptions { timestamp_tz_format: timestamp_tz_format__.unwrap_or_default(), time_format: time_format__.unwrap_or_default(), null_value: null_value__.unwrap_or_default(), + comment: comment__.unwrap_or_default(), }) } } @@ -5719,6 +5739,9 @@ impl serde::Serialize for CsvScanExecNode { if self.optional_escape.is_some() { len += 1; } + if self.optional_comment.is_some() { + len += 1; + } let mut struct_ser = serializer.serialize_struct("datafusion.CsvScanExecNode", len)?; if let Some(v) = self.base_conf.as_ref() { struct_ser.serialize_field("baseConf", v)?; @@ -5739,6 +5762,13 @@ impl serde::Serialize for CsvScanExecNode { } } } + if let Some(v) = self.optional_comment.as_ref() { + match v { + csv_scan_exec_node::OptionalComment::Comment(v) => { + struct_ser.serialize_field("comment", v)?; + } + } + } struct_ser.end() } } @@ -5756,6 +5786,7 @@ impl<'de> serde::Deserialize<'de> for CsvScanExecNode { "delimiter", "quote", "escape", + "comment", ]; #[allow(clippy::enum_variant_names)] @@ -5765,6 +5796,7 @@ impl<'de> serde::Deserialize<'de> for CsvScanExecNode { Delimiter, Quote, Escape, + Comment, } impl<'de> serde::Deserialize<'de> for GeneratedField { fn deserialize(deserializer: D) -> std::result::Result @@ -5791,6 +5823,7 @@ impl<'de> serde::Deserialize<'de> for CsvScanExecNode { "delimiter" => Ok(GeneratedField::Delimiter), "quote" => Ok(GeneratedField::Quote), "escape" => Ok(GeneratedField::Escape), + "comment" => Ok(GeneratedField::Comment), _ => Err(serde::de::Error::unknown_field(value, FIELDS)), } } @@ -5815,6 +5848,7 @@ impl<'de> serde::Deserialize<'de> for CsvScanExecNode { let mut delimiter__ = None; let mut quote__ = None; let mut optional_escape__ = None; + let mut optional_comment__ = None; while let Some(k) = map_.next_key()? { match k { GeneratedField::BaseConf => { @@ -5847,6 +5881,12 @@ impl<'de> serde::Deserialize<'de> for CsvScanExecNode { } optional_escape__ = map_.next_value::<::std::option::Option<_>>()?.map(csv_scan_exec_node::OptionalEscape::Escape); } + GeneratedField::Comment => { + if optional_comment__.is_some() { + return Err(serde::de::Error::duplicate_field("comment")); + } + optional_comment__ = map_.next_value::<::std::option::Option<_>>()?.map(csv_scan_exec_node::OptionalComment::Comment); + } } } Ok(CsvScanExecNode { @@ -5855,6 +5895,7 @@ impl<'de> serde::Deserialize<'de> for CsvScanExecNode { delimiter: delimiter__.unwrap_or_default(), quote: quote__.unwrap_or_default(), optional_escape: optional_escape__, + optional_comment: optional_comment__, }) } } diff --git a/datafusion/proto/src/generated/prost.rs b/datafusion/proto/src/generated/prost.rs index 64e72ba038783..9ebe9cfeac330 100644 --- a/datafusion/proto/src/generated/prost.rs +++ b/datafusion/proto/src/generated/prost.rs @@ -1713,6 +1713,9 @@ pub struct CsvOptions { /// Optional representation of null value #[prost(string, tag = "12")] pub null_value: ::prost::alloc::string::String, + /// Optional comment character as a byte + #[prost(bytes = "vec", tag = "13")] + pub comment: ::prost::alloc::vec::Vec, } /// Options controlling CSV format #[allow(clippy::derive_partial_eq_without_eq)] @@ -2355,6 +2358,8 @@ pub struct CsvScanExecNode { pub quote: ::prost::alloc::string::String, #[prost(oneof = "csv_scan_exec_node::OptionalEscape", tags = "5")] pub optional_escape: ::core::option::Option, + #[prost(oneof = "csv_scan_exec_node::OptionalComment", tags = "6")] + pub optional_comment: ::core::option::Option, } /// Nested message and enum types in `CsvScanExecNode`. pub mod csv_scan_exec_node { @@ -2364,6 +2369,12 @@ pub mod csv_scan_exec_node { #[prost(string, tag = "5")] Escape(::prost::alloc::string::String), } + #[allow(clippy::derive_partial_eq_without_eq)] + #[derive(Clone, PartialEq, ::prost::Oneof)] + pub enum OptionalComment { + #[prost(string, tag = "6")] + Comment(::prost::alloc::string::String), + } } #[allow(clippy::derive_partial_eq_without_eq)] #[derive(Clone, PartialEq, ::prost::Message)] diff --git a/datafusion/proto/src/physical_plan/from_proto.rs b/datafusion/proto/src/physical_plan/from_proto.rs index c907e991fb86c..465ee61761548 100644 --- a/datafusion/proto/src/physical_plan/from_proto.rs +++ b/datafusion/proto/src/physical_plan/from_proto.rs @@ -807,6 +807,7 @@ impl TryFrom<&protobuf::CsvOptions> for CsvOptions { Ok(CsvOptions { has_header: proto_opts.has_header.first().map(|h| *h != 0), delimiter: proto_opts.delimiter[0], + comment: proto_opts.comment.first().copied(), quote: proto_opts.quote[0], escape: proto_opts.escape.first().copied(), compression: proto_opts.compression().into(), diff --git a/datafusion/proto/src/physical_plan/mod.rs b/datafusion/proto/src/physical_plan/mod.rs index 1c5ba861d2974..926991d544d54 100644 --- a/datafusion/proto/src/physical_plan/mod.rs +++ b/datafusion/proto/src/physical_plan/mod.rs @@ -204,6 +204,14 @@ impl AsExecutionPlan for protobuf::PhysicalPlanNode { } else { None }, + if let Some(protobuf::csv_scan_exec_node::OptionalComment::Comment( + comment, + )) = &scan.optional_comment + { + Some(str_to_byte(comment, "comment")?) + } else { + None + }, FileCompressionType::UNCOMPRESSED, ))), #[cfg(feature = "parquet")] @@ -1577,6 +1585,13 @@ impl AsExecutionPlan for protobuf::PhysicalPlanNode { } else { None }, + optional_comment: if let Some(comment) = exec.comment() { + Some(protobuf::csv_scan_exec_node::OptionalComment::Comment( + byte_to_string(comment, "comment")?, + )) + } else { + None + }, }, )), }); diff --git a/datafusion/proto/src/physical_plan/to_proto.rs b/datafusion/proto/src/physical_plan/to_proto.rs index c6b94a934f235..67fe545cfd746 100644 --- a/datafusion/proto/src/physical_plan/to_proto.rs +++ b/datafusion/proto/src/physical_plan/to_proto.rs @@ -985,6 +985,7 @@ impl TryFrom<&CsvOptions> for protobuf::CsvOptions { timestamp_tz_format: opts.timestamp_tz_format.clone().unwrap_or_default(), time_format: opts.time_format.clone().unwrap_or_default(), null_value: opts.null_value.clone().unwrap_or_default(), + comment: opts.comment.map_or_else(Vec::new, |c| vec![c]), }) } }