diff --git a/daft/daft.pyi b/daft/daft.pyi
index d016c7cd3c..ecc1cb11d0 100644
--- a/daft/daft.pyi
+++ b/daft/daft.pyi
@@ -190,6 +190,7 @@ class CsvSourceConfig:
     delimiter: str
     has_headers: bool
+    double_quote: bool
     buffer_size: int | None
     chunk_size: int | None
@@ -197,6 +198,7 @@ class CsvSourceConfig:
         self,
         delimiter: str,
         has_headers: bool,
+        double_quote: bool,
         buffer_size: int | None = None,
         chunk_size: int | None = None,
     ): ...
@@ -432,6 +434,7 @@ def read_csv(
     num_rows: int | None = None,
     has_header: bool | None = None,
     delimiter: str | None = None,
+    double_quote: bool | None = None,
     io_config: IOConfig | None = None,
     multithreaded_io: bool | None = None,
     schema: PySchema | None = None,
@@ -442,6 +445,7 @@ def read_csv_schema(
     uri: str,
     has_header: bool | None = None,
     delimiter: str | None = None,
+    double_quote: bool | None = None,
     io_config: IOConfig | None = None,
     multithreaded_io: bool | None = None,
 ): ...
@@ -778,6 +782,7 @@ class PyMicroPartition:
         num_rows: int | None = None,
         has_header: bool | None = None,
         delimiter: str | None = None,
+        double_quote: bool | None = None,
         io_config: IOConfig | None = None,
         multithreaded_io: bool | None = None,
         schema: PySchema | None = None,
diff --git a/daft/execution/execution_step.py b/daft/execution/execution_step.py
index 7dd8b2d18d..da03267d5e 100644
--- a/daft/execution/execution_step.py
+++ b/daft/execution/execution_step.py
@@ -373,6 +373,7 @@ def _handle_tabular_files_scan(
                 csv_options=TableParseCSVOptions(
                     delimiter=format_config.delimiter,
                     header_index=0 if format_config.has_headers else None,
+                    double_quote=format_config.double_quote,
                     buffer_size=format_config.buffer_size,
                     chunk_size=format_config.chunk_size,
                 ),
diff --git a/daft/io/_csv.py b/daft/io/_csv.py
index 6925e5e5fc..babb6c4c35 100644
--- a/daft/io/_csv.py
+++ b/daft/io/_csv.py
@@ -23,6 +23,7 @@ def read_csv(
     has_headers: bool = True,
     column_names: Optional[List[str]] = None,
     delimiter: str = ",",
+    double_quote: bool = True,
     io_config: Optional["IOConfig"] = None,
     use_native_downloader: bool = True,
     _buffer_size: Optional[int] = None,
@@ -42,6 +43,7 @@ def read_csv(
             disable all schema inference on data being read, and throw an error if data being read is incompatible.
         has_headers (bool): Whether the CSV has a header or not, defaults to True
         delimiter (Str): Delimiter used in the CSV, defaults to ","
+        double_quote (bool): Whether to support double quote escapes, defaults to True
         io_config (IOConfig): Config to be used with the native downloader
         use_native_downloader: Whether to use the native downloader instead of PyArrow for reading Parquet. This
             is currently experimental.
@@ -62,6 +64,7 @@ def read_csv(
     csv_config = CsvSourceConfig(
         delimiter=delimiter,
         has_headers=has_headers,
+        double_quote=double_quote,
         buffer_size=_buffer_size,
         chunk_size=_chunk_size,
     )
diff --git a/daft/logical/schema.py b/daft/logical/schema.py
index 89fad30b31..43fb79e65e 100644
--- a/daft/logical/schema.py
+++ b/daft/logical/schema.py
@@ -166,6 +166,7 @@ def from_csv(
         path: str,
         has_header: bool | None = None,
         delimiter: str | None = None,
+        double_quote: bool | None = None,
         io_config: IOConfig | None = None,
         multithreaded_io: bool | None = None,
     ) -> Schema:
@@ -174,6 +175,7 @@ def from_csv(
             uri=path,
             has_header=has_header,
             delimiter=delimiter,
+            double_quote=double_quote,
             io_config=io_config,
             multithreaded_io=multithreaded_io,
         )
diff --git a/daft/runners/partitioning.py b/daft/runners/partitioning.py
index 6b6f2f001e..16ed931274 100644
--- a/daft/runners/partitioning.py
+++ b/daft/runners/partitioning.py
@@ -44,12 +44,14 @@ class TableParseCSVOptions:
     Args:
         delimiter: The delimiter to use when parsing CSVs, defaults to ","
        header_index: Index of the header row, or None if no header
+        double_quote: Whether to support escaping quotes by doubling them, defaults to True
         buffer_size: Size of the buffer (in bytes) used by the streaming reader.
         chunk_size: Size of the chunks (in bytes) deserialized in parallel by the streaming reader.
     """

     delimiter: str = ","
     header_index: int | None = 0
+    double_quote: bool = True
     buffer_size: int | None = None
     chunk_size: int | None = None

diff --git a/daft/table/micropartition.py b/daft/table/micropartition.py
index 10ffb63fed..af25430725 100644
--- a/daft/table/micropartition.py
+++ b/daft/table/micropartition.py
@@ -351,6 +351,7 @@ def read_csv(
         num_rows: int | None = None,
         has_header: bool | None = None,
         delimiter: str | None = None,
+        double_quote: bool | None = None,
         io_config: IOConfig | None = None,
         multithreaded_io: bool | None = None,
         schema: Schema | None = None,
@@ -365,6 +366,7 @@ def read_csv(
             num_rows=num_rows,
             has_header=has_header,
             delimiter=delimiter,
+            double_quote=double_quote,
             io_config=io_config,
             multithreaded_io=multithreaded_io,
             schema=schema._schema if schema is not None else None,
diff --git a/daft/table/table.py b/daft/table/table.py
index 3828e6f7e4..d4e28fb80a 100644
--- a/daft/table/table.py
+++ b/daft/table/table.py
@@ -448,6 +448,7 @@ def read_csv(
         num_rows: int | None = None,
         has_header: bool | None = None,
         delimiter: str | None = None,
+        double_quote: bool | None = None,
         io_config: IOConfig | None = None,
         multithreaded_io: bool | None = None,
         schema: Schema | None = None,
@@ -462,6 +463,7 @@ def read_csv(
             num_rows=num_rows,
             has_header=has_header,
             delimiter=delimiter,
+            double_quote=double_quote,
             io_config=io_config,
             multithreaded_io=multithreaded_io,
             schema=schema._schema if schema is not None else None,
diff --git a/daft/table/table_io.py b/daft/table/table_io.py
index 73aff2f66a..90eaa3d5e3 100644
--- a/daft/table/table_io.py
+++ b/daft/table/table_io.py
@@ -222,6 +222,7 @@ def read_csv(
             num_rows=read_options.num_rows,
             has_header=has_header,
             delimiter=csv_options.delimiter,
+            double_quote=csv_options.double_quote,
             io_config=config.io_config,
             schema=schema,
             buffer_size=csv_options.buffer_size,
diff --git a/src/daft-csv/src/metadata.rs b/src/daft-csv/src/metadata.rs
index 76b3d14c9f..10a1d132ce 100644
--- a/src/daft-csv/src/metadata.rs
+++ b/src/daft-csv/src/metadata.rs
@@ -21,6 +21,7 @@ pub fn read_csv_schema(
     uri: &str,
     has_header: bool,
     delimiter: Option<u8>,
+    double_quote: bool,
     max_bytes: Option<usize>,
     io_client: Arc<IOClient>,
     io_stats: Option<IOStatsRef>,
@@ -32,6 +33,7 @@ pub fn read_csv_schema(
         uri,
         has_header,
         delimiter,
+        double_quote,
         // Default to 1 MiB.
         max_bytes.or(Some(1024 * 1024)),
         io_client,
@@ -45,6 +47,7 @@ pub(crate) async fn read_csv_schema_single(
     uri: &str,
     has_header: bool,
     delimiter: Option<u8>,
+    double_quote: bool,
     max_bytes: Option<usize>,
     io_client: Arc<IOClient>,
     io_stats: Option<IOStatsRef>,
@@ -60,6 +63,7 @@ pub(crate) async fn read_csv_schema_single(
             compression_codec,
             has_header,
             delimiter,
+            double_quote,
             max_bytes,
         )
         .await
@@ -70,6 +74,7 @@ pub(crate) async fn read_csv_schema_single(
             compression_codec,
             has_header,
             delimiter,
+            double_quote,
             // Truncate max_bytes to size if both are set.
             max_bytes.map(|m| size.map(|s| m.min(s)).unwrap_or(m)),
         )
@@ -83,6 +88,7 @@ async fn read_csv_schema_from_compressed_reader<R>(
     compression_codec: Option<CompressionCodec>,
     has_header: bool,
     delimiter: Option<u8>,
+    double_quote: bool,
     max_bytes: Option<usize>,
 ) -> DaftResult<(Schema, usize, usize, f64, f64)>
 where
@@ -94,12 +100,20 @@ where
             compression.to_decoder(reader),
             has_header,
             delimiter,
+            double_quote,
             max_bytes,
         )
         .await
         }
         None => {
-            read_csv_schema_from_uncompressed_reader(reader, has_header, delimiter, max_bytes).await
+            read_csv_schema_from_uncompressed_reader(
+                reader,
+                has_header,
+                delimiter,
+                double_quote,
+                max_bytes,
+            )
+            .await
         }
     }
 }
@@ -108,14 +122,21 @@ async fn read_csv_schema_from_uncompressed_reader<R>(
     reader: R,
     has_header: bool,
     delimiter: Option<u8>,
+    double_quote: bool,
     max_bytes: Option<usize>,
 ) -> DaftResult<(Schema, usize, usize, f64, f64)>
 where
     R: AsyncRead + Unpin + Send,
 {
     let (schema, total_bytes_read, num_records_read, mean_size, std_size) =
-        read_csv_arrow_schema_from_uncompressed_reader(reader, has_header, delimiter, max_bytes)
-            .await?;
+        read_csv_arrow_schema_from_uncompressed_reader(
+            reader,
+            has_header,
+            delimiter,
+            double_quote,
+            max_bytes,
+        )
+        .await?;
     Ok((
         Schema::try_from(&schema)?,
         total_bytes_read,
@@ -129,6 +150,7 @@ async fn read_csv_arrow_schema_from_uncompressed_reader<R>(
     reader: R,
     has_header: bool,
     delimiter: Option<u8>,
+    double_quote: bool,
     max_bytes: Option<usize>,
 ) -> DaftResult<(arrow2::datatypes::Schema, usize, usize, f64, f64)>
 where
@@ -137,6 +159,7 @@ where
     let mut reader = AsyncReaderBuilder::new()
         .has_headers(has_header)
         .delimiter(delimiter.unwrap_or(b','))
+        .double_quote(double_quote)
         .buffer_capacity(max_bytes.unwrap_or(1 << 20).min(1 << 20))
         .create_reader(reader.compat());
     let (fields, total_bytes_read, num_records_read, mean_size, std_size) =
@@ -275,8 +298,15 @@ mod tests {
         io_config.s3.anonymous = true;
         let io_client = Arc::new(IOClient::new(io_config.into())?);

-        let (schema, total_bytes_read, num_records_read, _, _) =
-            read_csv_schema(file.as_ref(), true, None, None, io_client.clone(), None)?;
+        let (schema, total_bytes_read, num_records_read, _, _) = read_csv_schema(
+            file.as_ref(),
+            true,
+            None,
+            true,
+            None,
+            io_client.clone(),
+            None,
+        )?;
         assert_eq!(
             schema,
             Schema::new(vec![
@@ -308,6 +338,7 @@ mod tests {
             file.as_ref(),
             true,
             Some(b'|'),
+            true,
             None,
             io_client.clone(),
             None,
@@ -336,8 +367,15 @@ mod tests {
         io_config.s3.anonymous = true;
         let io_client = Arc::new(IOClient::new(io_config.into())?);

-        let (_, total_bytes_read, num_records_read, _, _) =
-            read_csv_schema(file.as_ref(), true, None, None, io_client.clone(), None)?;
+        let (_, total_bytes_read, num_records_read, _, _) = read_csv_schema(
+            file.as_ref(),
+            true,
+            None,
+            true,
+            None,
+            io_client.clone(),
+            None,
+        )?;
         assert_eq!(total_bytes_read, 328);
         assert_eq!(num_records_read, 20);
@@ -355,8 +393,15 @@ mod tests {
         io_config.s3.anonymous = true;
         let io_client = Arc::new(IOClient::new(io_config.into())?);

-        let (schema, total_bytes_read, num_records_read, _, _) =
-            read_csv_schema(file.as_ref(), false, None, None, io_client.clone(), None)?;
+        let (schema, total_bytes_read, num_records_read, _, _) = read_csv_schema(
+            file.as_ref(),
+            false,
+            None,
+            true,
+            None,
+            io_client.clone(),
+            None,
+        )?;
         assert_eq!(
             schema,
             Schema::new(vec![
@@ -384,8 +429,15 @@ mod tests {
         io_config.s3.anonymous = true;
         let io_client = Arc::new(IOClient::new(io_config.into())?);

-        let (schema, total_bytes_read, num_records_read, _, _) =
-            read_csv_schema(file.as_ref(), true, None, None, io_client.clone(), None)?;
+        let (schema, total_bytes_read, num_records_read, _, _) = read_csv_schema(
+            file.as_ref(),
+            true,
+            None,
+            true,
+            None,
+            io_client.clone(),
+            None,
+        )?;
         assert_eq!(
             schema,
             Schema::new(vec![
@@ -410,8 +462,15 @@ mod tests {
         io_config.s3.anonymous = true;
         let io_client = Arc::new(IOClient::new(io_config.into())?);

-        let (schema, total_bytes_read, num_records_read, _, _) =
-            read_csv_schema(file.as_ref(), true, None, None, io_client.clone(), None)?;
+        let (schema, total_bytes_read, num_records_read, _, _) = read_csv_schema(
+            file.as_ref(),
+            true,
+            None,
+            true,
+            None,
+            io_client.clone(),
+            None,
+        )?;
         assert_eq!(
             schema,
             Schema::new(vec![
@@ -439,8 +498,15 @@ mod tests {
         io_config.s3.anonymous = true;
         let io_client = Arc::new(IOClient::new(io_config.into())?);

-        let (schema, total_bytes_read, num_records_read, _, _) =
-            read_csv_schema(file.as_ref(), true, None, None, io_client.clone(), None)?;
+        let (schema, total_bytes_read, num_records_read, _, _) = read_csv_schema(
+            file.as_ref(),
+            true,
+            None,
+            true,
+            None,
+            io_client.clone(),
+            None,
+        )?;
         assert_eq!(
             schema,
             Schema::new(vec![
@@ -470,6 +536,7 @@ mod tests {
             file.as_ref(),
             true,
             None,
+            true,
             Some(100),
             io_client.clone(),
             None,
@@ -502,7 +569,15 @@ mod tests {
         io_config.s3.anonymous = true;
         let io_client = Arc::new(IOClient::new(io_config.into())?);

-        let err = read_csv_schema(file.as_ref(), true, None, None, io_client.clone(), None);
+        let err = read_csv_schema(
+            file.as_ref(),
+            true,
+            None,
+            true,
+            None,
+            io_client.clone(),
+            None,
+        );
         assert!(err.is_err());
         let err = err.unwrap_err();
         assert!(matches!(err, DaftError::ArrowError(_)), "{}", err);
@@ -527,7 +602,15 @@ mod tests {
         io_config.s3.anonymous = true;
         let io_client = Arc::new(IOClient::new(io_config.into())?);

-        let err = read_csv_schema(file.as_ref(), true, None, None, io_client.clone(), None);
+        let err = read_csv_schema(
+            file.as_ref(),
+            true,
+            None,
+            true,
+            None,
+            io_client.clone(),
+            None,
+        );
         assert!(err.is_err());
         let err = err.unwrap_err();
         assert!(matches!(err, DaftError::ArrowError(_)), "{}", err);
@@ -574,8 +657,15 @@ mod tests {
         io_config.s3.anonymous = true;
         let io_client = Arc::new(IOClient::new(io_config.into())?);

-        let (schema, _, _, _, _) =
-            read_csv_schema(file.as_ref(), true, None, None, io_client.clone(), None)?;
+        let (schema, _, _, _, _) = read_csv_schema(
+            file.as_ref(),
+            true,
+            None,
+            true,
+            None,
+            io_client.clone(),
+            None,
+        )?;
         assert_eq!(
             schema,
             Schema::new(vec![
diff --git a/src/daft-csv/src/python.rs b/src/daft-csv/src/python.rs
index def9bfa966..4799cb7218 100644
--- a/src/daft-csv/src/python.rs
+++ b/src/daft-csv/src/python.rs
@@ -30,6 +30,7 @@ pub mod pylib {
         num_rows: Option<usize>,
         has_header: Option<bool>,
         delimiter: Option<&str>,
+        double_quote: Option<bool>,
         io_config: Option<IOConfig>,
         multithreaded_io: Option<bool>,
         schema: Option<PySchema>,
@@ -50,6 +51,7 @@ pub mod pylib {
             num_rows,
             has_header.unwrap_or(true),
             str_delimiter_to_byte(delimiter)?,
+            double_quote.unwrap_or(true),
             io_client,
             Some(io_stats),
             multithreaded_io.unwrap_or(true),
@@ -63,11 +65,13 @@ pub mod pylib {
     }

     #[pyfunction]
+    #[allow(clippy::too_many_arguments)]
     pub fn read_csv_schema(
         py: Python,
         uri: &str,
         has_header: Option<bool>,
         delimiter: Option<&str>,
+        double_quote: Option<bool>,
         max_bytes: Option<usize>,
         io_config: Option<IOConfig>,
         multithreaded_io: Option<bool>,
@@ -83,6 +87,7 @@ pub mod pylib {
             uri,
             has_header.unwrap_or(true),
             str_delimiter_to_byte(delimiter)?,
+            double_quote.unwrap_or(true),
             max_bytes,
             io_client,
             Some(io_stats),
diff --git a/src/daft-csv/src/read.rs b/src/daft-csv/src/read.rs
index 1cf4f4d479..97ebfb9ae3 100644
--- a/src/daft-csv/src/read.rs
+++ b/src/daft-csv/src/read.rs
@@ -41,6 +41,7 @@ pub fn read_csv(
     num_rows: Option<usize>,
     has_header: bool,
     delimiter: Option<u8>,
+    double_quote: bool,
     io_client: Arc<IOClient>,
     io_stats: Option<IOStatsRef>,
     multithreaded_io: bool,
@@ -59,6 +60,7 @@ pub fn read_csv(
         num_rows,
         has_header,
         delimiter.unwrap_or(b','),
+        double_quote,
         io_client,
         io_stats,
         schema,
@@ -78,6 +80,7 @@ async fn read_csv_single(
     num_rows: Option<usize>,
     has_header: bool,
     delimiter: u8,
+    double_quote: bool,
     io_client: Arc<IOClient>,
     io_stats: Option<IOStatsRef>,
     schema: Option<SchemaRef>,
@@ -92,6 +95,7 @@ async fn read_csv_single(
         uri,
         has_header,
         Some(delimiter),
+        double_quote,
         // Read at most 1 MiB when doing schema inference.
         Some(1024 * 1024),
         io_client.clone(),
@@ -115,6 +119,7 @@ async fn read_csv_single(
             num_rows,
             has_header,
             delimiter,
+            double_quote,
             schema,
             // Default buffer size of 512 KiB.
             buffer_size.unwrap_or(512 * 1024),
@@ -144,6 +149,7 @@ async fn read_csv_single(
             num_rows,
             has_header,
             delimiter,
+            double_quote,
             schema,
             // Default buffer size of 512 KiB.
             buffer_size.unwrap_or(512 * 1024),
@@ -176,6 +182,7 @@ async fn read_csv_from_compressed_reader(
     num_rows: Option<usize>,
     has_header: bool,
     delimiter: u8,
+    double_quote: bool,
     schema: arrow2::datatypes::Schema,
     buffer_size: usize,
     chunk_size: usize,
@@ -195,6 +202,7 @@ where
             num_rows,
             has_header,
             delimiter,
+            double_quote,
             schema,
             buffer_size,
             chunk_size,
@@ -212,6 +220,7 @@ where
             num_rows,
             has_header,
             delimiter,
+            double_quote,
             schema,
             buffer_size,
             chunk_size,
@@ -232,6 +241,7 @@ async fn read_csv_from_uncompressed_reader(
     num_rows: Option<usize>,
     has_header: bool,
     delimiter: u8,
+    double_quote: bool,
     schema: arrow2::datatypes::Schema,
     buffer_size: usize,
     chunk_size: usize,
@@ -245,6 +255,7 @@ where
     let reader = AsyncReaderBuilder::new()
         .has_headers(has_header)
         .delimiter(delimiter)
+        .double_quote(double_quote)
         .buffer_capacity(buffer_size)
         .create_reader(stream_reader.compat());
     let mut fields = schema.fields;
@@ -453,12 +464,14 @@ mod tests {
         out: &Table,
         has_header: bool,
         delimiter: Option<u8>,
+        double_quote: bool,
         column_names: Option<Vec<&str>>,
         projection: Option<Vec<usize>>,
         limit: Option<usize>,
     ) {
         let mut reader = ReaderBuilder::new()
             .delimiter(delimiter.unwrap_or(b','))
+            .double_quote(double_quote)
             .from_path(path)
             .unwrap();
         let (mut fields, _) = infer_schema(&mut reader, None, has_header, &infer).unwrap();
@@ -534,6 +547,7 @@ mod tests {
             None,
             true,
             None,
+            true,
             io_client,
             None,
             true,
@@ -555,7 +569,7 @@ mod tests {
             .into(),
         );
         if compression.is_none() {
-            check_equal_local_arrow2(file.as_ref(), &table, true, None, None, None, None);
+            check_equal_local_arrow2(file.as_ref(), &table, true, None, true, None, None, None);
         }

         Ok(())
@@ -587,6 +601,7 @@ mod tests {
             None,
             false,
             None,
+            true,
             io_client,
             None,
             true,
@@ -612,6 +627,7 @@ mod tests {
             &table,
             false,
             None,
+            true,
             Some(column_names),
             None,
             None,
@@ -639,6 +655,7 @@ mod tests {
             Some(5),
             true,
             Some(b'|'),
+            true,
             io_client,
             None,
             true,
@@ -659,7 +676,70 @@ mod tests {
             ])?
             .into(),
         );
-        check_equal_local_arrow2(file.as_ref(), &table, true, Some(b'|'), None, None, Some(5));
+        check_equal_local_arrow2(
+            file.as_ref(),
+            &table,
+            true,
+            Some(b'|'),
+            true,
+            None,
+            None,
+            Some(5),
+        );
+
+        Ok(())
+    }
+
+    #[test]
+    fn test_csv_read_local_double_quote() -> DaftResult<()> {
+        let file = format!(
+            "{}/test/iris_tiny_double_quote.csv",
+            env!("CARGO_MANIFEST_DIR"),
+        );
+
+        let mut io_config = IOConfig::default();
+        io_config.s3.anonymous = true;
+
+        let io_client = Arc::new(IOClient::new(io_config.into())?);
+
+        let table = read_csv(
+            file.as_ref(),
+            None,
+            None,
+            Some(5),
+            true,
+            None,
+            false,
+            io_client,
+            None,
+            true,
+            None,
+            None,
+            None,
+            None,
+        )?;
+        assert_eq!(table.len(), 5);
+        assert_eq!(
+            table.schema,
+            Schema::new(vec![
+                Field::new("\"sepal.\"\"length\"", DataType::Float64),
+                Field::new("sepal.width", DataType::Float64),
+                Field::new("petal.length", DataType::Float64),
+                Field::new("petal.width", DataType::Float64),
+                Field::new("variety", DataType::Utf8),
+            ])?
+            .into(),
+        );
+        check_equal_local_arrow2(
+            file.as_ref(),
+            &table,
+            true,
+            None,
+            false,
+            None,
+            None,
+            Some(5),
+        );

         Ok(())
     }
@@ -680,6 +760,7 @@ mod tests {
             Some(5),
             true,
             None,
+            true,
             io_client,
             None,
             true,
@@ -700,7 +781,7 @@ mod tests {
             ])?
             .into(),
         );
-        check_equal_local_arrow2(file.as_ref(), &table, true, None, None, None, Some(5));
+        check_equal_local_arrow2(file.as_ref(), &table, true, None, true, None, None, Some(5));

         Ok(())
     }
@@ -721,6 +802,7 @@ mod tests {
             None,
             true,
             None,
+            true,
             io_client,
             None,
             true,
@@ -743,6 +825,7 @@ mod tests {
             &table,
             true,
             None,
+            true,
             None,
             Some(vec![2, 3]),
             None,
@@ -777,6 +860,7 @@ mod tests {
             None,
             false,
             None,
+            true,
             io_client,
             None,
             true,
@@ -799,6 +883,7 @@ mod tests {
             &table,
             false,
             None,
+            true,
             Some(column_names),
             Some(vec![2, 3]),
             None,
@@ -823,6 +908,7 @@ mod tests {
             None,
             true,
             None,
+            true,
             io_client,
             None,
             true,
@@ -843,7 +929,7 @@ mod tests {
             ])?
             .into(),
         );
-        check_equal_local_arrow2(file.as_ref(), &table, true, None, None, None, None);
+        check_equal_local_arrow2(file.as_ref(), &table, true, None, true, None, None, None);

         Ok(())
     }
@@ -864,6 +950,7 @@ mod tests {
             None,
             true,
             None,
+            true,
             io_client,
             None,
             true,
@@ -884,7 +971,7 @@ mod tests {
             ])?
             .into(),
         );
-        check_equal_local_arrow2(file.as_ref(), &table, true, None, None, None, None);
+        check_equal_local_arrow2(file.as_ref(), &table, true, None, true, None, None, None);

         Ok(())
     }
@@ -905,6 +992,7 @@ mod tests {
             None,
             true,
             None,
+            true,
             io_client,
             None,
             true,
@@ -925,7 +1013,7 @@ mod tests {
             ])?
             .into(),
         );
-        check_equal_local_arrow2(file.as_ref(), &table, true, None, None, None, None);
+        check_equal_local_arrow2(file.as_ref(), &table, true, None, true, None, None, None);

         Ok(())
     }
@@ -946,6 +1034,7 @@ mod tests {
             None,
             true,
             None,
+            true,
             io_client,
             None,
             true,
@@ -966,7 +1055,7 @@ mod tests {
             ])?
             .into(),
         );
-        check_equal_local_arrow2(file.as_ref(), &table, true, None, None, None, None);
+        check_equal_local_arrow2(file.as_ref(), &table, true, None, true, None, None, None);

         Ok(())
     }
@@ -990,6 +1079,7 @@ mod tests {
             None,
             true,
             None,
+            true,
             io_client,
             None,
             true,
@@ -1051,6 +1141,7 @@ mod tests {
             None,
             true,
             None,
+            true,
             io_client,
             None,
             true,
@@ -1105,6 +1196,7 @@ mod tests {
             None,
             true,
             None,
+            true,
             io_client,
             None,
             true,
@@ -1125,7 +1217,7 @@ mod tests {
             ])?
             .into(),
         );
-        check_equal_local_arrow2(file.as_ref(), &table, true, None, None, None, None);
+        check_equal_local_arrow2(file.as_ref(), &table, true, None, true, None, None, None);

         Ok(())
     }
@@ -1154,6 +1246,7 @@ mod tests {
             None,
             true,
             None,
+            true,
             io_client,
             None,
             true,
@@ -1192,6 +1285,7 @@ mod tests {
             None,
             true,
             None,
+            true,
             io_client,
             None,
             true,
@@ -1232,6 +1326,7 @@ mod tests {
             None,
             false,
             None,
+            true,
             io_client,
             None,
             true,
@@ -1294,6 +1389,7 @@ mod tests {
             None,
             true,
             None,
+            true,
             io_client,
             None,
             true,
@@ -1332,6 +1428,7 @@ mod tests {
             None,
             false,
             None,
+            true,
             io_client,
             None,
             true,
@@ -1370,6 +1467,7 @@ mod tests {
             None,
             false,
             None,
+            true,
             io_client,
             None,
             true,
@@ -1403,6 +1501,7 @@ mod tests {
             Some(10),
             true,
             None,
+            true,
             io_client,
             None,
             true,
@@ -1440,6 +1539,7 @@ mod tests {
             None,
             true,
             None,
+            true,
             io_client,
             None,
             true,
@@ -1473,6 +1573,7 @@ mod tests {
             None,
             true,
             None,
+            true,
             io_client,
             None,
             true,
@@ -1502,6 +1603,7 @@ mod tests {
             None,
             true,
             None,
+            true,
             io_client,
             None,
             true,
@@ -1531,6 +1633,7 @@ mod tests {
             None,
             true,
             None,
+            true,
             io_client,
             None,
             true,
diff --git a/src/daft-csv/test/iris_tiny_double_quote.csv b/src/daft-csv/test/iris_tiny_double_quote.csv
new file mode 100644
index 0000000000..8a78523533
--- /dev/null
+++ b/src/daft-csv/test/iris_tiny_double_quote.csv
@@ -0,0 +1,21 @@
+"""sepal.""length",sepal.width,petal.length,petal.width,variety
+5.1,3.5,1.4,.2,"""Setosa"
+4.9,3,1.4,.2,"Setosa"""
+4.7,3.2,1.3,.2,"""Setosa"""
+4.6,3.1,1.5,.2,"Se""tosa"
+5,3.6,1.4,.2,"Se""to""sa"
+5.4,3.9,1.7,.4,"Se""""tosa"
+4.6,3.4,1.4,.3,"Setosa""""
+5,3.4,1.5,.2,Setosa"
+4.4,2.9,1.4,.2,"Setosa
+4.9,3.1,1.5,.1,"Setosa"
+5.4,3.7,1.5,.2,"Setosa"
+4.8,3.4,1.6,.2,"Setosa"
+4.8,3,1.4,.1,"Setosa"
+4.3,3,1.1,.1,"Setosa"
+5.8,4,1.2,.2,"Setosa"
+5.7,4.4,1.5,.4,"Setosa"
+5.4,3.9,1.3,.4,"Setosa"
+5.1,3.5,1.4,.3,"Setosa"
+5.7,3.8,1.7,.3,"Setosa"
+5.1,3.8,1.5,.3,"Setosa"
diff --git a/src/daft-micropartition/src/micropartition.rs b/src/daft-micropartition/src/micropartition.rs
index b2a382165e..b3b45d44e3 100644
--- a/src/daft-micropartition/src/micropartition.rs
+++ b/src/daft-micropartition/src/micropartition.rs
@@ -258,6 +258,7 @@ pub(crate) fn read_csv_into_micropartition(
     num_rows: Option<usize>,
     has_header: bool,
     delimiter: Option<u8>,
+    double_quote: bool,
     io_config: Arc<IOConfig>,
     multithreaded_io: bool,
     io_stats: Option<IOStatsRef>,
@@ -285,6 +286,7 @@ pub(crate) fn read_csv_into_micropartition(
         remaining_rows,
         has_header,
         delimiter,
+        double_quote,
         io_client.clone(),
         io_stats.clone(),
         multithreaded_io,
diff --git a/src/daft-micropartition/src/python.rs b/src/daft-micropartition/src/python.rs
index 9e76142bbb..4f430eb237 100644
--- a/src/daft-micropartition/src/python.rs
+++ b/src/daft-micropartition/src/python.rs
@@ -353,6 +353,7 @@ impl PyMicroPartition {
         num_rows: Option<usize>,
         has_header: Option<bool>,
         delimiter: Option<&str>,
+        double_quote: Option<bool>,
         io_config: Option<IOConfig>,
         multithreaded_io: Option<bool>,
         schema: Option<PySchema>,
@@ -379,6 +380,7 @@ impl PyMicroPartition {
             num_rows,
             has_header.unwrap_or(true),
             delimiter,
+            double_quote.unwrap_or(true),
             io_config,
             multithreaded_io.unwrap_or(true),
             Some(io_stats),
diff --git a/src/daft-plan/src/source_info/file_format.rs b/src/daft-plan/src/source_info/file_format.rs
index 2bcb542f0e..60f6acc863 100644
--- a/src/daft-plan/src/source_info/file_format.rs
+++ b/src/daft-plan/src/source_info/file_format.rs
@@ -79,6 +79,7 @@ impl_bincode_py_state_serialization!(ParquetSourceConfig);
 pub struct CsvSourceConfig {
     pub delimiter: String,
     pub has_headers: bool,
+    pub double_quote: bool,
     pub buffer_size: Option<usize>,
     pub chunk_size: Option<usize>,
 }
@@ -98,12 +99,14 @@ impl CsvSourceConfig {
     fn new(
         delimiter: String,
         has_headers: bool,
+        double_quote: bool,
         buffer_size: Option<usize>,
         chunk_size: Option<usize>,
     ) -> Self {
         Self {
             delimiter,
             has_headers,
+            double_quote,
             buffer_size,
             chunk_size,
         }
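
For reference, once this patch is applied the new flag is reachable end-to-end from the user-facing Python API. A minimal usage sketch (the file path is illustrative, not part of this diff):

    import daft

    # double_quote=True (the default) treats "" inside a quoted field as an
    # escaped quote character, per RFC 4180. Passing False disables that
    # escape, which is what the test_csv_read_local_double_quote test above
    # does for its deliberately odd iris_tiny_double_quote.csv fixture.
    df = daft.read_csv("iris_tiny_double_quote.csv", double_quote=False)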