From f86309ecf0b4126fefa4bdf715d9327f77f3858d Mon Sep 17 00:00:00 2001
From: Xuanwo
Date: Mon, 16 Dec 2024 06:29:10 +0000
Subject: [PATCH] deploy: 97f8a792b66afbce805b1c5a4733d16ce88e602a

---
 api/help.html                                    |   2 +-
 .../base_writer/data_file_writer/index.html      |   2 +-
 api/settings.html                                |   2 +-
 .../base_writer/data_file_writer.rs.html         | 236 +++++++++++++++++-
 4 files changed, 230 insertions(+), 12 deletions(-)
[Diffs for api/help.html, api/iceberg/writer/base_writer/data_file_writer/index.html, and api/settings.html omitted: regenerated rustdoc page chrome. The module index page additionally gains a "Structs" section.]
diff --git a/api/src/iceberg/writer/base_writer/data_file_writer.rs.html b/api/src/iceberg/writer/base_writer/data_file_writer.rs.html
index 9dce8a709..c49c23e2a 100644
--- a/api/src/iceberg/writer/base_writer/data_file_writer.rs.html
+++ b/api/src/iceberg/writer/base_writer/data_file_writer.rs.html
@@ -139,6 +139,115 @@
// Licensed to the Apache Software Foundation (ASF) under one
 // or more contributor license agreements.  See the NOTICE file
 // distributed with this work for additional information
@@ -241,16 +350,21 @@
 mod test {
     use std::sync::Arc;
 
+    use arrow_array::{Int32Array, StringArray};
+    use arrow_schema::{DataType, Field};
+    use parquet::arrow::arrow_reader::{ArrowReaderMetadata, ArrowReaderOptions};
     use parquet::file::properties::WriterProperties;
     use tempfile::TempDir;
 
     use crate::io::FileIOBuilder;
-    use crate::spec::{DataContentType, DataFileFormat, Schema, Struct};
+    use crate::spec::{
+        DataContentType, DataFileFormat, Literal, NestedField, PrimitiveType, Schema, Struct, Type,
+    };
     use crate::writer::base_writer::data_file_writer::DataFileWriterBuilder;
     use crate::writer::file_writer::location_generator::test::MockLocationGenerator;
     use crate::writer::file_writer::location_generator::DefaultFileNameGenerator;
     use crate::writer::file_writer::ParquetWriterBuilder;
-    use crate::writer::{IcebergWriter, IcebergWriterBuilder};
+    use crate::writer::{IcebergWriter, IcebergWriterBuilder, RecordBatch};
     use crate::Result;
 
     #[tokio::test]
@@ -262,20 +376,124 @@
         let file_name_gen =
             DefaultFileNameGenerator::new("test".to_string(), None, DataFileFormat::Parquet);
 
+        let schema = Schema::builder()
+            .with_schema_id(3)
+            .with_fields(vec![
+                NestedField::required(3, "foo", Type::Primitive(PrimitiveType::Int)).into(),
+                NestedField::required(4, "bar", Type::Primitive(PrimitiveType::String)).into(),
+            ])
+            .build()?;
+
         let pw = ParquetWriterBuilder::new(
             WriterProperties::builder().build(),
-            Arc::new(Schema::builder().build().unwrap()),
+            Arc::new(schema),
+            file_io.clone(),
+            location_gen,
+            file_name_gen,
+        );
+
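+        // Passing `None` as the partition value: the resulting data file reports
+        // an empty partition struct.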
+        let mut data_file_writer = DataFileWriterBuilder::new(pw, None).build().await.unwrap();
+
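+        // Closing without a single write() still yields one (empty) data file.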
+        let data_files = data_file_writer.close().await.unwrap();
+        assert_eq!(data_files.len(), 1);
+
+        let data_file = &data_files[0];
+        assert_eq!(data_file.file_format, DataFileFormat::Parquet);
+        assert_eq!(data_file.content, DataContentType::Data);
+        assert_eq!(data_file.partition, Struct::empty());
+
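+        // Read the file back and check that the Parquet footer carries the
+        // Iceberg field IDs from the schema above.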
+        let input_file = file_io.new_input(data_file.file_path.clone())?;
+        let input_content = input_file.read().await?;
+
+        let parquet_reader =
+            ArrowReaderMetadata::load(&input_content, ArrowReaderOptions::default())
+                .expect("Failed to load Parquet metadata");
+
+        let field_ids: Vec<i32> = parquet_reader
+            .parquet_schema()
+            .columns()
+            .iter()
+            .map(|col| col.self_type().get_basic_info().id())
+            .collect();
+
+        assert_eq!(field_ids, vec![3, 4]);
+        Ok(())
+    }
+
+    #[tokio::test]
+    async fn test_parquet_writer_with_partition() -> Result<()> {
+        let temp_dir = TempDir::new().unwrap();
+        let file_io = FileIOBuilder::new_fs_io().build().unwrap();
+        let location_gen =
+            MockLocationGenerator::new(temp_dir.path().to_str().unwrap().to_string());
+        let file_name_gen = DefaultFileNameGenerator::new(
+            "test_partitioned".to_string(),
+            None,
+            DataFileFormat::Parquet,
+        );
+
+        let schema = Schema::builder()
+            .with_schema_id(5)
+            .with_fields(vec![
+                NestedField::required(5, "id", Type::Primitive(PrimitiveType::Int)).into(),
+                NestedField::required(6, "name", Type::Primitive(PrimitiveType::String)).into(),
+            ])
+            .build()?;
+
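+        // The partition value below is attached verbatim to every data file
+        // this writer produces.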
+        let partition_value = Struct::from_iter([Some(Literal::int(1))]);
+
+        let parquet_writer_builder = ParquetWriterBuilder::new(
+            WriterProperties::builder().build(),
+            Arc::new(schema.clone()),
             file_io.clone(),
             location_gen,
             file_name_gen,
         );
-        let mut data_file_writer = DataFileWriterBuilder::new(pw, None).build().await?;
 
-        let data_file = data_file_writer.close().await.unwrap();
-        assert_eq!(data_file.len(), 1);
-        assert_eq!(data_file[0].file_format, DataFileFormat::Parquet);
-        assert_eq!(data_file[0].content, DataContentType::Data);
-        assert_eq!(data_file[0].partition, Struct::empty());
+        let mut data_file_writer =
+            DataFileWriterBuilder::new(parquet_writer_builder, Some(partition_value.clone()))
+                .build()
+                .await?;
+
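+        // Arrow schema mirroring the Iceberg schema above: Int -> Int32,
+        // String -> Utf8.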
+        let arrow_schema = arrow_schema::Schema::new(vec![
+            Field::new("id", DataType::Int32, false),
+            Field::new("name", DataType::Utf8, false),
+        ]);
+        let batch = RecordBatch::try_new(Arc::new(arrow_schema.clone()), vec![
+            Arc::new(Int32Array::from(vec![1, 2, 3])),
+            Arc::new(StringArray::from(vec!["Alice", "Bob", "Charlie"])),
+        ])?;
+        data_file_writer.write(batch).await?;
+
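+        // close() finalizes the Parquet file and returns the collected
+        // DataFile metadata.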
+        let data_files = data_file_writer.close().await.unwrap();
+        assert_eq!(data_files.len(), 1);
+
+        let data_file = &data_files[0];
+        assert_eq!(data_file.file_format, DataFileFormat::Parquet);
+        assert_eq!(data_file.content, DataContentType::Data);
+        assert_eq!(data_file.partition, partition_value);
+
+        let input_file = file_io.new_input(data_file.file_path.clone())?;
+        let input_content = input_file.read().await?;
+
+        let parquet_reader =
+            ArrowReaderMetadata::load(&input_content, ArrowReaderOptions::default())?;
+
+        let field_ids: Vec<i32> = parquet_reader
+            .parquet_schema()
+            .columns()
+            .iter()
+            .map(|col| col.self_type().get_basic_info().id())
+            .collect();
+        assert_eq!(field_ids, vec![5, 6]);
+
+        let field_names: Vec<&str> = parquet_reader
+            .parquet_schema()
+            .columns()
+            .iter()
+            .map(|col| col.name())
+            .collect();
+        assert_eq!(field_names, vec!["id", "name"]);
 
         Ok(())
     }
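
A natural follow-up to the additions above would assert that several write() calls
accumulate into a single data file. The sketch below stays in the style of the new
tests and reuses the same test-only helpers (MockLocationGenerator,
DefaultFileNameGenerator); the test name, the schema, and the record_count field
access are illustrative assumptions, not part of this commit:

    #[tokio::test]
    async fn test_parquet_writer_multiple_batches() -> Result<()> {
        let temp_dir = TempDir::new().unwrap();
        let file_io = FileIOBuilder::new_fs_io().build().unwrap();
        let location_gen =
            MockLocationGenerator::new(temp_dir.path().to_str().unwrap().to_string());
        let file_name_gen =
            DefaultFileNameGenerator::new("test_batches".to_string(), None, DataFileFormat::Parquet);

        let schema = Schema::builder()
            .with_schema_id(1)
            .with_fields(vec![
                NestedField::required(1, "id", Type::Primitive(PrimitiveType::Int)).into(),
            ])
            .build()?;

        let pw = ParquetWriterBuilder::new(
            WriterProperties::builder().build(),
            Arc::new(schema),
            file_io.clone(),
            location_gen,
            file_name_gen,
        );
        let mut writer = DataFileWriterBuilder::new(pw, None).build().await?;

        // Two write() calls against the same open writer.
        let arrow_schema = arrow_schema::Schema::new(vec![Field::new("id", DataType::Int32, false)]);
        for chunk in [vec![1, 2, 3], vec![4, 5, 6]] {
            let batch = RecordBatch::try_new(Arc::new(arrow_schema.clone()), vec![Arc::new(
                Int32Array::from(chunk),
            )])?;
            writer.write(batch).await?;
        }

        // Both batches should land in the same Parquet data file.
        let data_files = writer.close().await?;
        assert_eq!(data_files.len(), 1);
        // Assumes DataFile exposes a record_count field, like the fields
        // accessed in the tests above.
        assert_eq!(data_files[0].record_count, 6);
        Ok(())
    }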