From ec629dd2069ef75112c3ff0e6d32e87df0863398 Mon Sep 17 00:00:00 2001
From: Brent Gardner
Date: Fri, 22 Nov 2024 07:34:43 -0700
Subject: [PATCH 1/2] Working on test

---
 .../core/src/datasource/file_format/csv.rs | 56 ++++++++++++++++++-
 1 file changed, 55 insertions(+), 1 deletion(-)

diff --git a/datafusion/core/src/datasource/file_format/csv.rs b/datafusion/core/src/datasource/file_format/csv.rs
index 369534d620c7..aa03b218e3ce 100644
--- a/datafusion/core/src/datasource/file_format/csv.rs
+++ b/datafusion/core/src/datasource/file_format/csv.rs
@@ -538,12 +538,13 @@ mod tests {
     use crate::assert_batches_eq;
     use crate::datasource::file_format::file_compression_type::FileCompressionType;
     use crate::datasource::file_format::test_util::VariableStream;
-    use crate::datasource::listing::ListingOptions;
+    use crate::datasource::listing::{ListingOptions, ListingTableUrl};
     use crate::physical_plan::collect;
     use crate::prelude::{CsvReadOptions, SessionConfig, SessionContext};
     use crate::test_util::arrow_test_data;
 
     use arrow::compute::concat_batches;
+    use arrow_array::StringArray;
     use datafusion_common::cast::as_string_array;
     use datafusion_common::internal_err;
     use datafusion_common::stats::Precision;
@@ -556,6 +557,59 @@ mod tests {
     use object_store::path::Path;
     use regex::Regex;
     use rstest::*;
+    use url::Url;
+    use datafusion_common::parsers::CompressionTypeVariant;
+    use datafusion_physical_plan::stream::RecordBatchStreamAdapter;
+
+    #[tokio::test]
+    async fn write_multipart_csv_with_signature() {
+        // setup session
+        let config = SessionConfig::new().with_batch_size(2);
+        let session_ctx = SessionContext::new_with_config(config);
+        let task_ctx = session_ctx.task_ctx();
+
+        // setup sink
+        let str = "s3://eu-west-1_cgx-production-c4c-archive-data/cx/exports/team_id=555585/file9.csv";
+        let url = Url::parse(str).unwrap();
+        let table = ListingTableUrl::parse(str).unwrap();
+        let cfg = FileSinkConfig {
+            object_store_url: url,
+            file_groups: vec![],
+            table_paths: vec![table],
+            output_schema: Arc::new(Schema {
+                fields: Fields::from(vec![
+                    Field::new("applicationname", DataType::Utf8, true),
+                ]),
+                metadata: Default::default(),
+            }),
+            table_partition_cols: vec![],
+            overwrite: false,
+        };
+        let opts = CsvWriterOptions {
+            writer_options: Default::default(),
+            compression: CompressionTypeVariant::UNCOMPRESSED
+        };
+        let sink = CsvSink::new(cfg, opts);
+
+        // Generate data
+        let id_array = StringArray::from(vec!["hello", "world"]);
+        let schema = Schema::new(vec![
+            Field::new("applicationname", DataType::Utf8, false)
+        ]);
+        let batch = RecordBatch::try_new(
+            Arc::new(schema),
+            vec![Arc::new(id_array)]
+        ).unwrap();
+        let schema = batch.schema();
+        let batches = Box::pin(RecordBatchStreamAdapter::new(
+            schema.clone(),
+            futures::stream::once(async { Ok(batch) }),
+        )) as SendableRecordBatchStream;
+
+        // write the data
+        let res = sink.write_all(batches, &task_ctx).await.unwrap();
+        assert_eq!(res, 0);
+    }
 
     #[tokio::test]
     async fn read_small_batches() -> Result<()> {

From 44ab53abd6ca629b93db1ae7ea80e6fe6d4d0753 Mon Sep 17 00:00:00 2001
From: Brent Gardner
Date: Fri, 22 Nov 2024 07:56:44 -0700
Subject: [PATCH 2/2] failing test case

---
 datafusion/core/Cargo.toml                  |  1 +
 .../core/src/datasource/file_format/csv.rs  | 51 ++++++++++++-------
 2 files changed, 34 insertions(+), 18 deletions(-)

diff --git a/datafusion/core/Cargo.toml b/datafusion/core/Cargo.toml
index 7533e2cff198..8f2488389048 100644
--- a/datafusion/core/Cargo.toml
+++ b/datafusion/core/Cargo.toml
@@ -143,6 +143,7 @@ ctor = { workspace = true }
 doc-comment = { workspace = true }
 env_logger = { workspace = true }
 half = { workspace = true, default-features = true }
+object_store = { workspace = true, features = ["aws", "http"] }
 paste = "^1.0"
 postgres-protocol = "0.6.4"
 postgres-types = { version = "0.2.4", features = ["derive", "with-chrono-0_4"] }
diff --git a/datafusion/core/src/datasource/file_format/csv.rs b/datafusion/core/src/datasource/file_format/csv.rs
index aa03b218e3ce..195c9b177445 100644
--- a/datafusion/core/src/datasource/file_format/csv.rs
+++ b/datafusion/core/src/datasource/file_format/csv.rs
@@ -542,6 +542,7 @@ mod tests {
     use crate::physical_plan::collect;
     use crate::prelude::{CsvReadOptions, SessionConfig, SessionContext};
     use crate::test_util::arrow_test_data;
+    use std::iter::repeat;
 
     use arrow::compute::concat_batches;
     use arrow_array::StringArray;
@@ -553,13 +554,15 @@ mod tests {
     use datafusion_expr::{col, lit};
 
     use chrono::DateTime;
+    use datafusion_common::parsers::CompressionTypeVariant;
+    use datafusion_physical_plan::stream::RecordBatchStreamAdapter;
+    use object_store::aws::AmazonS3Builder;
+    use object_store::aws::Checksum;
     use object_store::local::LocalFileSystem;
     use object_store::path::Path;
     use regex::Regex;
     use rstest::*;
     use url::Url;
-    use datafusion_common::parsers::CompressionTypeVariant;
-    use datafusion_physical_plan::stream::RecordBatchStreamAdapter;
 
     #[tokio::test]
     async fn write_multipart_csv_with_signature() {
@@ -568,18 +571,30 @@ mod tests {
         let session_ctx = SessionContext::new_with_config(config);
         let task_ctx = session_ctx.task_ctx();
 
+        let bucket = "cgx-production-c4c-archive-data";
+        let s3root = format!("s3://{bucket}");
+        let store = AmazonS3Builder::from_env()
+            .with_bucket_name(bucket)
+            .with_checksum_algorithm(Checksum::SHA256) // fails for large text
+            .build()
+            .unwrap();
+        let _ = session_ctx
+            .register_object_store(&Url::parse(&s3root).unwrap(), Arc::new(store));
+
         // setup sink
-        let str = "s3://eu-west-1_cgx-production-c4c-archive-data/cx/exports/team_id=555585/file9.csv";
-        let url = Url::parse(str).unwrap();
+        let str = format!("{s3root}/cx/exports/team_id=555585/file9.csv");
+        let url = Url::parse(&str).unwrap();
         let table = ListingTableUrl::parse(str).unwrap();
         let cfg = FileSinkConfig {
             object_store_url: url,
             file_groups: vec![],
             table_paths: vec![table],
             output_schema: Arc::new(Schema {
-                fields: Fields::from(vec![
-                    Field::new("applicationname", DataType::Utf8, true),
-                ]),
+                fields: Fields::from(vec![Field::new(
+                    "applicationname",
+                    DataType::Utf8,
+                    true,
+                )]),
                 metadata: Default::default(),
             }),
             table_partition_cols: vec![],
@@ -587,19 +602,19 @@ mod tests {
         };
         let opts = CsvWriterOptions {
             writer_options: Default::default(),
-            compression: CompressionTypeVariant::UNCOMPRESSED
+            compression: CompressionTypeVariant::UNCOMPRESSED,
         };
         let sink = CsvSink::new(cfg, opts);
 
         // Generate data
-        let id_array = StringArray::from(vec!["hello", "world"]);
-        let schema = Schema::new(vec![
-            Field::new("applicationname", DataType::Utf8, false)
-        ]);
-        let batch = RecordBatch::try_new(
-            Arc::new(schema),
-            vec![Arc::new(id_array)]
-        ).unwrap();
+        let text = "Hello, World!".to_string();
+        let row_count = 1_000_000;
+        let values: Vec<String> = repeat(text).take(row_count).collect();
+        let id_array = StringArray::from(values);
+        let schema =
+            Schema::new(vec![Field::new("applicationname", DataType::Utf8, false)]);
+        let batch =
+            RecordBatch::try_new(Arc::new(schema), vec![Arc::new(id_array)]).unwrap();
         let schema = batch.schema();
         let batches = Box::pin(RecordBatchStreamAdapter::new(
             schema.clone(),
@@ -607,8 +622,8 @@ mod tests {
         )) as SendableRecordBatchStream;
 
         // write the data
-        let res = sink.write_all(batches, &task_ctx).await.unwrap();
-        assert_eq!(res, 0);
+        let num_rows = sink.write_all(batches, &task_ctx).await.unwrap();
+        assert_eq!(num_rows, row_count);
     }
 
     #[tokio::test]