Skip to content

Commit

Permalink
Add support for reading CSV files with comments (#10467)
Browse files Browse the repository at this point in the history
This patch adds support for parsing CSV files containing comment lines.

Closes #10262.
  • Loading branch information
bbannier authored Jun 10, 2024
1 parent 29fda88 commit 5912025
Show file tree
Hide file tree
Showing 20 changed files with 156 additions and 1 deletion.
1 change: 1 addition & 0 deletions datafusion-examples/examples/csv_opener.rs
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,7 @@ async fn main() -> Result<()> {
b',',
b'"',
object_store,
Some(b'#'),
);

let opener = CsvOpener::new(Arc::new(config), FileCompressionType::UNCOMPRESSED);
Expand Down
1 change: 1 addition & 0 deletions datafusion/common/src/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1567,6 +1567,7 @@ config_namespace! {
pub timestamp_tz_format: Option<String>, default = None
pub time_format: Option<String>, default = None
pub null_value: Option<String>, default = None
pub comment: Option<u8>, default = None
}
}

Expand Down
13 changes: 12 additions & 1 deletion datafusion/core/src/datasource/file_format/csv.rs
Original file line number Diff line number Diff line change
Expand Up @@ -147,6 +147,12 @@ impl CsvFormat {
self.options.has_header
}

/// Lines beginning with this byte are ignored.
pub fn with_comment(mut self, comment: Option<u8>) -> Self {
self.options.comment = comment;
self
}

/// The character separating values within a row.
/// - default to ','
pub fn with_delimiter(mut self, delimiter: u8) -> Self {
Expand Down Expand Up @@ -252,6 +258,7 @@ impl FileFormat for CsvFormat {
self.options.delimiter,
self.options.quote,
self.options.escape,
self.options.comment,
self.options.compression.into(),
);
Ok(Arc::new(exec))
Expand Down Expand Up @@ -300,7 +307,7 @@ impl CsvFormat {
pin_mut!(stream);

while let Some(chunk) = stream.next().await.transpose()? {
let format = arrow::csv::reader::Format::default()
let mut format = arrow::csv::reader::Format::default()
.with_header(
first_chunk
&& self
Expand All @@ -310,6 +317,10 @@ impl CsvFormat {
)
.with_delimiter(self.options.delimiter);

if let Some(comment) = self.options.comment {
format = format.with_comment(comment);
}

let (Schema { fields, .. }, records_read) =
format.infer_schema(chunk.reader(), Some(records_to_read))?;

Expand Down
10 changes: 10 additions & 0 deletions datafusion/core/src/datasource/file_format/options.rs
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,8 @@ pub struct CsvReadOptions<'a> {
pub quote: u8,
/// An optional escape character. Defaults to None.
pub escape: Option<u8>,
/// If enabled, lines beginning with this byte are ignored.
pub comment: Option<u8>,
/// An optional schema representing the CSV files. If None, CSV reader will try to infer it
/// based on data in file.
pub schema: Option<&'a Schema>,
Expand Down Expand Up @@ -97,6 +99,7 @@ impl<'a> CsvReadOptions<'a> {
table_partition_cols: vec![],
file_compression_type: FileCompressionType::UNCOMPRESSED,
file_sort_order: vec![],
comment: None,
}
}

Expand All @@ -106,6 +109,12 @@ impl<'a> CsvReadOptions<'a> {
self
}

/// Specify comment char to use for CSV read
pub fn comment(mut self, comment: u8) -> Self {
self.comment = Some(comment);
self
}

/// Specify delimiter to use for CSV read
pub fn delimiter(mut self, delimiter: u8) -> Self {
self.delimiter = delimiter;
Expand Down Expand Up @@ -477,6 +486,7 @@ impl ReadOptions<'_> for CsvReadOptions<'_> {
let file_format = CsvFormat::default()
.with_options(table_options.csv)
.with_has_header(self.has_header)
.with_comment(self.comment)
.with_delimiter(self.delimiter)
.with_quote(self.quote)
.with_escape(self.escape)
Expand Down
22 changes: 22 additions & 0 deletions datafusion/core/src/datasource/physical_plan/csv.rs
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,7 @@ pub struct CsvExec {
delimiter: u8,
quote: u8,
escape: Option<u8>,
comment: Option<u8>,
/// Execution metrics
metrics: ExecutionPlanMetricsSet,
/// Compression type of the file associated with CsvExec
Expand All @@ -73,6 +74,7 @@ impl CsvExec {
delimiter: u8,
quote: u8,
escape: Option<u8>,
comment: Option<u8>,
file_compression_type: FileCompressionType,
) -> Self {
let (projected_schema, projected_statistics, projected_output_ordering) =
Expand All @@ -92,6 +94,7 @@ impl CsvExec {
metrics: ExecutionPlanMetricsSet::new(),
file_compression_type,
cache,
comment,
}
}

Expand All @@ -113,6 +116,11 @@ impl CsvExec {
self.quote
}

/// Lines beginning with this byte are ignored.
pub fn comment(&self) -> Option<u8> {
self.comment
}

/// The escape character
pub fn escape(&self) -> Option<u8> {
self.escape
Expand Down Expand Up @@ -234,6 +242,7 @@ impl ExecutionPlan for CsvExec {
quote: self.quote,
escape: self.escape,
object_store,
comment: self.comment,
});

let opener = CsvOpener {
Expand Down Expand Up @@ -265,9 +274,11 @@ pub struct CsvConfig {
quote: u8,
escape: Option<u8>,
object_store: Arc<dyn ObjectStore>,
comment: Option<u8>,
}

impl CsvConfig {
#[allow(clippy::too_many_arguments)]
/// Returns a [`CsvConfig`]
pub fn new(
batch_size: usize,
Expand All @@ -277,6 +288,7 @@ impl CsvConfig {
delimiter: u8,
quote: u8,
object_store: Arc<dyn ObjectStore>,
comment: Option<u8>,
) -> Self {
Self {
batch_size,
Expand All @@ -287,6 +299,7 @@ impl CsvConfig {
quote,
escape: None,
object_store,
comment,
}
}
}
Expand All @@ -309,6 +322,9 @@ impl CsvConfig {
if let Some(escape) = self.escape {
builder = builder.with_escape(escape)
}
if let Some(comment) = self.comment {
builder = builder.with_comment(comment);
}

builder
}
Expand Down Expand Up @@ -570,6 +586,7 @@ mod tests {
b',',
b'"',
None,
None,
file_compression_type.to_owned(),
);
assert_eq!(13, csv.base_config.file_schema.fields().len());
Expand Down Expand Up @@ -636,6 +653,7 @@ mod tests {
b',',
b'"',
None,
None,
file_compression_type.to_owned(),
);
assert_eq!(13, csv.base_config.file_schema.fields().len());
Expand Down Expand Up @@ -702,6 +720,7 @@ mod tests {
b',',
b'"',
None,
None,
file_compression_type.to_owned(),
);
assert_eq!(13, csv.base_config.file_schema.fields().len());
Expand Down Expand Up @@ -765,6 +784,7 @@ mod tests {
b',',
b'"',
None,
None,
file_compression_type.to_owned(),
);
assert_eq!(14, csv.base_config.file_schema.fields().len());
Expand Down Expand Up @@ -827,6 +847,7 @@ mod tests {
b',',
b'"',
None,
None,
file_compression_type.to_owned(),
);
assert_eq!(13, csv.base_config.file_schema.fields().len());
Expand Down Expand Up @@ -921,6 +942,7 @@ mod tests {
b',',
b'"',
None,
None,
file_compression_type.to_owned(),
);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1471,6 +1471,7 @@ pub(crate) mod tests {
b',',
b'"',
None,
None,
FileCompressionType::UNCOMPRESSED,
))
}
Expand All @@ -1494,6 +1495,7 @@ pub(crate) mod tests {
b',',
b'"',
None,
None,
FileCompressionType::UNCOMPRESSED,
))
}
Expand Down Expand Up @@ -3767,6 +3769,7 @@ pub(crate) mod tests {
b',',
b'"',
None,
None,
compression_type,
)),
vec![("a".to_string(), "a".to_string())],
Expand Down
3 changes: 3 additions & 0 deletions datafusion/core/src/physical_optimizer/projection_pushdown.rs
Original file line number Diff line number Diff line change
Expand Up @@ -185,6 +185,7 @@ fn try_swapping_with_csv(
csv.delimiter(),
csv.quote(),
csv.escape(),
csv.comment(),
csv.file_compression_type,
)) as _
})
Expand Down Expand Up @@ -1686,6 +1687,7 @@ mod tests {
0,
0,
None,
None,
FileCompressionType::UNCOMPRESSED,
))
}
Expand All @@ -1708,6 +1710,7 @@ mod tests {
0,
0,
None,
None,
FileCompressionType::UNCOMPRESSED,
))
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1502,6 +1502,7 @@ mod tests {
0,
b'"',
None,
None,
FileCompressionType::UNCOMPRESSED,
))
}
Expand Down
3 changes: 3 additions & 0 deletions datafusion/core/src/test/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -98,6 +98,7 @@ pub fn scan_partitioned_csv(partitions: usize, work_dir: &Path) -> Result<Arc<Cs
b',',
b'"',
None,
None,
FileCompressionType::UNCOMPRESSED,
)))
}
Expand Down Expand Up @@ -282,6 +283,7 @@ pub fn csv_exec_sorted(
0,
0,
None,
None,
FileCompressionType::UNCOMPRESSED,
))
}
Expand Down Expand Up @@ -337,6 +339,7 @@ pub fn csv_exec_ordered(
0,
b'"',
None,
None,
FileCompressionType::UNCOMPRESSED,
))
}
Expand Down
1 change: 1 addition & 0 deletions datafusion/proto-common/proto/datafusion_common.proto
Original file line number Diff line number Diff line change
Expand Up @@ -397,6 +397,7 @@ message CsvOptions {
string timestamp_tz_format = 10; // Optional timestamp with timezone format
string time_format = 11; // Optional time format
string null_value = 12; // Optional representation of null value
bytes comment = 13; // Optional comment character as a byte
}

// Options controlling CSV format
Expand Down
1 change: 1 addition & 0 deletions datafusion/proto-common/src/from_proto/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -867,6 +867,7 @@ impl TryFrom<&protobuf::CsvOptions> for CsvOptions {
.then(|| proto_opts.time_format.clone()),
null_value: (!proto_opts.null_value.is_empty())
.then(|| proto_opts.null_value.clone()),
comment: proto_opts.comment.first().copied(),
})
}
}
Expand Down
20 changes: 20 additions & 0 deletions datafusion/proto-common/src/generated/pbjson.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1850,6 +1850,9 @@ impl serde::Serialize for CsvOptions {
if !self.null_value.is_empty() {
len += 1;
}
if !self.comment.is_empty() {
len += 1;
}
let mut struct_ser = serializer.serialize_struct("datafusion_common.CsvOptions", len)?;
if !self.has_header.is_empty() {
#[allow(clippy::needless_borrow)]
Expand Down Expand Up @@ -1894,6 +1897,10 @@ impl serde::Serialize for CsvOptions {
if !self.null_value.is_empty() {
struct_ser.serialize_field("nullValue", &self.null_value)?;
}
if !self.comment.is_empty() {
#[allow(clippy::needless_borrow)]
struct_ser.serialize_field("comment", pbjson::private::base64::encode(&self.comment).as_str())?;
}
struct_ser.end()
}
}
Expand Down Expand Up @@ -1924,6 +1931,7 @@ impl<'de> serde::Deserialize<'de> for CsvOptions {
"timeFormat",
"null_value",
"nullValue",
"comment",
];

#[allow(clippy::enum_variant_names)]
Expand All @@ -1940,6 +1948,7 @@ impl<'de> serde::Deserialize<'de> for CsvOptions {
TimestampTzFormat,
TimeFormat,
NullValue,
Comment,
}
impl<'de> serde::Deserialize<'de> for GeneratedField {
fn deserialize<D>(deserializer: D) -> std::result::Result<GeneratedField, D::Error>
Expand Down Expand Up @@ -1973,6 +1982,7 @@ impl<'de> serde::Deserialize<'de> for CsvOptions {
"timestampTzFormat" | "timestamp_tz_format" => Ok(GeneratedField::TimestampTzFormat),
"timeFormat" | "time_format" => Ok(GeneratedField::TimeFormat),
"nullValue" | "null_value" => Ok(GeneratedField::NullValue),
"comment" => Ok(GeneratedField::Comment),
_ => Err(serde::de::Error::unknown_field(value, FIELDS)),
}
}
Expand Down Expand Up @@ -2004,6 +2014,7 @@ impl<'de> serde::Deserialize<'de> for CsvOptions {
let mut timestamp_tz_format__ = None;
let mut time_format__ = None;
let mut null_value__ = None;
let mut comment__ = None;
while let Some(k) = map_.next_key()? {
match k {
GeneratedField::HasHeader => {
Expand Down Expand Up @@ -2088,6 +2099,14 @@ impl<'de> serde::Deserialize<'de> for CsvOptions {
}
null_value__ = Some(map_.next_value()?);
}
GeneratedField::Comment => {
if comment__.is_some() {
return Err(serde::de::Error::duplicate_field("comment"));
}
comment__ =
Some(map_.next_value::<::pbjson::private::BytesDeserialize<_>>()?.0)
;
}
}
}
Ok(CsvOptions {
Expand All @@ -2103,6 +2122,7 @@ impl<'de> serde::Deserialize<'de> for CsvOptions {
timestamp_tz_format: timestamp_tz_format__.unwrap_or_default(),
time_format: time_format__.unwrap_or_default(),
null_value: null_value__.unwrap_or_default(),
comment: comment__.unwrap_or_default(),
})
}
}
Expand Down
Loading

0 comments on commit 5912025

Please sign in to comment.