diff --git a/crates/deltalake-core/src/operations/write.rs b/crates/deltalake-core/src/operations/write.rs
index 2ef9afa4af..5230fd09dd 100644
--- a/crates/deltalake-core/src/operations/write.rs
+++ b/crates/deltalake-core/src/operations/write.rs
@@ -115,8 +115,8 @@ pub struct WriteBuilder {
     name: Option<String>,
     /// Description of the table, only used when table doesn't exist yet
     description: Option<String>,
-    // /// Configurations of the delta table, only used when table doesn't exist
-    // configuration: Option<HashMap<String, Option<String>>>,
+    /// Configurations of the delta table, only used when table doesn't exist
+    configuration: HashMap<String, Option<String>>,
 }
 
 impl WriteBuilder {
@@ -139,7 +139,7 @@ impl WriteBuilder {
             app_metadata: None,
             name: None,
             description: None,
-            // configuration: None,
+            configuration: Default::default(),
         }
     }
 
@@ -236,17 +236,17 @@ impl WriteBuilder {
         self
     }
 
-    // /// Set configuration on created table
-    // pub fn with_configuration(
-    //     mut self,
-    //     configuration: impl IntoIterator<Item = (impl Into<String>, Option<impl Into<String>>)>,
-    // ) -> Self {
-    //     self.configuration = configuration
-    //         .into_iter()
-    //         .map(|(k, v)| (k.into(), v.map(|s| s.into())))
-    //         .collect();
-    //     self
-    // }
+    /// Set configuration on created table
+    pub fn with_configuration(
+        mut self,
+        configuration: impl IntoIterator<Item = (impl Into<String>, Option<impl Into<String>>)>,
+    ) -> Self {
+        self.configuration = configuration
+            .into_iter()
+            .map(|(k, v)| (k.into(), v.map(|s| s.into())))
+            .collect();
+        self
+    }
 
     async fn check_preconditions(&self) -> DeltaResult<Vec<Action>> {
         match self.log_store.is_delta_table_location().await? {
@@ -272,7 +272,8 @@ impl WriteBuilder {
         }?;
         let mut builder = CreateBuilder::new()
             .with_log_store(self.log_store.clone())
-            .with_columns(schema.fields().clone());
+            .with_columns(schema.fields().clone())
+            .with_configuration(self.configuration.clone());
         if let Some(partition_columns) = self.partition_columns.as_ref() {
             builder = builder.with_partition_columns(partition_columns.clone())
         }
@@ -285,10 +286,6 @@ impl WriteBuilder {
             builder = builder.with_comment(desc.clone());
         };
 
-        // if let Some(config) = self.configuration.as_ref() {
-        //     builder = builder.with_configuration(config.clone());
-        // };
-
        let (_, actions, _) = builder.into_table_and_actions()?;
        Ok(actions)
    }
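For context, a minimal sketch of what the restored pass-through enables end to end, assuming the public Python API as exercised by the tests below (the table path and configuration values here are illustrative, not from this PR):

```python
import pyarrow as pa
from deltalake import DeltaTable, write_deltalake

data = pa.table({"x": pa.array([1, 2, 3])})

# Table-level configuration now reaches the Rust CreateBuilder when the
# rust engine creates a new table.
write_deltalake(
    "/tmp/example_table",  # hypothetical path
    data,
    name="example",
    description="example table",
    configuration={"delta.appendOnly": "false", "foo": "bar"},
    engine="rust",
)

# The configuration should round-trip through the metadata action.
assert DeltaTable("/tmp/example_table").metadata().configuration == {
    "delta.appendOnly": "false",
    "foo": "bar",
}
```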
diff --git a/python/deltalake/_internal.pyi b/python/deltalake/_internal.pyi
index 1989e39e8c..2a4f77993c 100644
--- a/python/deltalake/_internal.pyi
+++ b/python/deltalake/_internal.pyi
@@ -149,7 +149,7 @@ def write_to_deltalake(
     overwrite_schema: bool,
     name: Optional[str],
     description: Optional[str],
-    _configuration: Optional[Mapping[str, Optional[str]]],
+    configuration: Optional[Mapping[str, Optional[str]]],
     storage_options: Optional[Dict[str, str]],
 ) -> None: ...
 def batch_distinct(batch: pyarrow.RecordBatch) -> pyarrow.RecordBatch: ...
diff --git a/python/deltalake/writer.py b/python/deltalake/writer.py
index 35323f9a49..b1e301b39e 100644
--- a/python/deltalake/writer.py
+++ b/python/deltalake/writer.py
@@ -125,7 +125,7 @@ def write_deltalake(
         table_or_uri: URI of a table or a DeltaTable object.
         data: Data to write. If passing iterable, the schema must also be given.
         schema: Optional schema to write.
-        partition_by: List of columns to partition the table by. Only required
+        partition_by: List of columns to partition the table by. Only required
             when creating a new table.
         filesystem: Optional filesystem to pass to PyArrow. If not provided will
             be inferred from uri. The file system has to be rooted in the table root.
@@ -137,20 +137,20 @@ def write_deltalake(
         file_options: Optional write options for Parquet (ParquetFileWriteOptions).
             Can be provided with defaults using ParquetFileWriteOptions().make_write_options().
             Please refer to https://github.com/apache/arrow/blob/master/python/pyarrow/_dataset_parquet.pyx#L492-L533
-            for the list of available options
-        max_partitions: the maximum number of partitions that will be used.
+            for the list of available options. Only used in pyarrow engine.
+        max_partitions: the maximum number of partitions that will be used. Only used in pyarrow engine.
         max_open_files: Limits the maximum number of files that can be left open while writing.
             If an attempt is made to open too many files then the least recently
             used file will be closed.
             If this setting is set too low you may end up fragmenting your
-            data into many small files.
+            data into many small files. Only used in pyarrow engine.
         max_rows_per_file: Maximum number of rows per file.
             If greater than 0 then this will limit how many rows are placed in any single file.
             Otherwise there will be no limit and one file will be created in each output directory
             unless files need to be closed to respect max_open_files
         min_rows_per_group: Minimum number of rows per group. When the value is set,
             the dataset writer will batch incoming data and only write the row groups to the disk
-            when sufficient rows have accumulated.
+            when sufficient rows have accumulated. Only used in pyarrow engine.
         max_rows_per_group: Maximum number of rows per group. If the value is set,
             then the dataset writer may split up large incoming batches into multiple row groups.
             If this value is set, then min_rows_per_group should also be set.
@@ -159,7 +159,7 @@ def write_deltalake(
         configuration: A map containing configuration options for the metadata action.
         overwrite_schema: If True, allows updating the schema of the table.
         storage_options: options passed to the native delta filesystem. Unused if 'filesystem' is defined.
-        partition_filters: the partition filters that will be used for partition overwrite.
+        partition_filters: the partition filters that will be used for partition overwrite. Only used in pyarrow engine.
         large_dtypes: If True, the table schema is checked against large_dtypes
     """
     table, table_uri = try_get_table_and_table_uri(table_or_uri, storage_options)
@@ -174,7 +174,12 @@ def write_deltalake(
 
     if table:
         table.update_incremental()
-
+        if table.protocol().min_writer_version > MAX_SUPPORTED_WRITER_VERSION:
+            raise DeltaProtocolError(
+                "This table's min_writer_version is "
+                f"{table.protocol().min_writer_version}, "
+                "but this method only supports version 2."
+            )
     if engine == "rust":
         if table is not None and mode == "ignore":
             return
@@ -216,7 +221,7 @@ def write_deltalake(
             overwrite_schema=overwrite_schema,
             name=name,
             description=description,
-            _configuration=configuration,
+            configuration=configuration,
             storage_options=storage_options,
         )
         if table:
@@ -264,12 +269,6 @@ def write_deltalake(
         else:
             partition_by = table.metadata().partition_columns
 
-        if table.protocol().min_writer_version > MAX_SUPPORTED_WRITER_VERSION:
-            raise DeltaProtocolError(
-                "This table's min_writer_version is "
-                f"{table.protocol().min_writer_version}, "
-                "but this method only supports version 2."
-            )
     else:  # creating a new table
         current_version = -1
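With the protocol check hoisted above the engine dispatch, both engines now reject tables whose min_writer_version exceeds the supported maximum. A hedged sketch of the resulting behavior (hypothetical table path; DeltaProtocolError import assumed per the deltalake.exceptions module):

```python
import pyarrow as pa
from deltalake import DeltaTable, write_deltalake
from deltalake.exceptions import DeltaProtocolError

data = pa.table({"x": pa.array([1, 2, 3])})

# Hypothetical existing table whose protocol requires min_writer_version > 2.
table = DeltaTable("/tmp/table_with_high_writer_version")
try:
    write_deltalake(table, data, mode="append", engine="rust")
except DeltaProtocolError as err:
    # e.g. "This table's min_writer_version is 3, but this method only
    # supports version 2."
    print(err)
```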
diff --git a/python/src/lib.rs b/python/src/lib.rs
index c22fb408f0..eddd742951 100644
--- a/python/src/lib.rs
+++ b/python/src/lib.rs
@@ -1146,7 +1146,7 @@ fn write_to_deltalake(
     partition_by: Option<Vec<String>>,
     name: Option<String>,
     description: Option<String>,
-    _configuration: Option<HashMap<String, Option<String>>>,
+    configuration: Option<HashMap<String, Option<String>>>,
     storage_options: Option<HashMap<String, String>>,
 ) -> PyResult<()> {
     let batches = data.0.map(|batch| batch.unwrap()).collect::<Vec<RecordBatch>>();
@@ -1177,6 +1177,10 @@ fn write_to_deltalake(
         builder = builder.with_description(description);
     };
 
+    if let Some(config) = configuration {
+        builder = builder.with_configuration(config);
+    };
+
     rt()?
         .block_on(builder.into_future())
         .map_err(PythonError::from)?;
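The test changes below collapse the duplicated pyarrow/rust test pairs into single engine-parametrized tests, branching on the engine only where the two writers surface errors differently. A distilled sketch of the pattern (test name and data are illustrative, not from the suite):

```python
import pathlib

import pyarrow as pa
import pytest
from deltalake import write_deltalake
from deltalake.exceptions import DeltaError


@pytest.mark.parametrize("engine", ["pyarrow", "rust"])
def test_error_mode(tmp_path: pathlib.Path, engine):
    data = pa.table({"x": pa.array([1, 2, 3])})
    write_deltalake(tmp_path, data, engine=engine)

    # The engines report conflicting writes differently: the pyarrow
    # writer asserts in Python, the rust writer raises DeltaError.
    expected = AssertionError if engine == "pyarrow" else DeltaError
    with pytest.raises(expected):
        write_deltalake(tmp_path, data, mode="error", engine=engine)
```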
diff --git a/python/tests/test_writer.py b/python/tests/test_writer.py
index 797b9365a9..808a2f9563 100644
--- a/python/tests/test_writer.py
+++ b/python/tests/test_writer.py
@@ -145,7 +145,7 @@ def test_update_schema(existing_table: DeltaTable):
     assert existing_table.schema().to_pyarrow() == new_data.schema
 
 
-def test_update_schema_rust_writer(existing_table: DeltaTable):  # Test fails
+def test_update_schema_rust_writer(existing_table: DeltaTable):
     new_data = pa.table({"x": pa.array([1, 2, 3])})
 
     with pytest.raises(DeltaError):
@@ -156,6 +156,7 @@ def test_update_schema_rust_writer(existing_table: DeltaTable):
             overwrite_schema=True,
             engine="rust",
         )
+    # TODO(ion): Remove this once we add schema overwrite support
     with pytest.raises(NotImplementedError):
         write_deltalake(
             existing_table,
@@ -189,33 +190,15 @@ def test_local_path(
     assert table == sample_data
 
 
-def test_roundtrip_metadata_rust(tmp_path: pathlib.Path, sample_data: pa.Table):
-    write_deltalake(
-        tmp_path,
-        sample_data,
-        name="test_name",
-        description="test_desc",
-        configuration={"delta.appendOnly": "false", "foo": "bar"},
-        engine="rust",
-    )
-
-    delta_table = DeltaTable(tmp_path)
-
-    metadata = delta_table.metadata()
-
-    assert metadata.name == "test_name"
-    assert metadata.description == "test_desc"
-    # assert metadata.configuration == {"delta.appendOnly": "false", "foo": "bar"}
-
-
-def test_roundtrip_metadata(tmp_path: pathlib.Path, sample_data: pa.Table):
+@pytest.mark.parametrize("engine", ["pyarrow", "rust"])
+def test_roundtrip_metadata(tmp_path: pathlib.Path, sample_data: pa.Table, engine):
     write_deltalake(
         tmp_path,
         sample_data,
         name="test_name",
         description="test_desc",
         configuration={"delta.appendOnly": "false", "foo": "bar"},
-        engine="pyarrow",
+        engine=engine,
     )
 
     delta_table = DeltaTable(tmp_path)
@@ -300,30 +283,17 @@ def test_roundtrip_multi_partitioned(
     assert add_path.count("/") == 2
 
 
-def test_write_modes(tmp_path: pathlib.Path, sample_data: pa.Table):
-    write_deltalake(tmp_path, sample_data)
-    assert DeltaTable(tmp_path).to_pyarrow_table() == sample_data
-
-    with pytest.raises(AssertionError):
-        write_deltalake(tmp_path, sample_data, mode="error")
-
-    write_deltalake(tmp_path, sample_data, mode="ignore")
-    assert ("0" * 19 + "1.json") not in os.listdir(tmp_path / "_delta_log")
-
-    write_deltalake(tmp_path, sample_data, mode="append")
-    expected = pa.concat_tables([sample_data, sample_data])
-    assert DeltaTable(tmp_path).to_pyarrow_table() == expected
-
-    write_deltalake(tmp_path, sample_data, mode="overwrite")
-    assert DeltaTable(tmp_path).to_pyarrow_table() == sample_data
-
-
-def test_write_modes_rust(tmp_path: pathlib.Path, sample_data: pa.Table):
-    write_deltalake(tmp_path, sample_data)
+@pytest.mark.parametrize("engine", ["pyarrow", "rust"])
+def test_write_modes(tmp_path: pathlib.Path, sample_data: pa.Table, engine):
+    write_deltalake(tmp_path, sample_data, engine=engine)
     assert DeltaTable(tmp_path).to_pyarrow_table() == sample_data
 
-    with pytest.raises(DeltaError):
-        write_deltalake(tmp_path, sample_data, mode="error", engine="rust")
+    if engine == "pyarrow":
+        with pytest.raises(AssertionError):
+            write_deltalake(tmp_path, sample_data, mode="error")
+    elif engine == "rust":
+        with pytest.raises(DeltaError):
+            write_deltalake(tmp_path, sample_data, mode="error", engine="rust")
 
     write_deltalake(tmp_path, sample_data, mode="ignore", engine="rust")
     assert ("0" * 19 + "1.json") not in os.listdir(tmp_path / "_delta_log")
@@ -336,15 +306,18 @@ def test_write_modes_rust(tmp_path: pathlib.Path, sample_data: pa.Table):
     assert DeltaTable(tmp_path).to_pyarrow_table() == sample_data
 
 
+@pytest.mark.parametrize("engine", ["pyarrow", "rust"])
 def test_append_only_should_append_only_with_the_overwrite_mode(  # Create rust equivalent rust
-    tmp_path: pathlib.Path, sample_data: pa.Table
+    tmp_path: pathlib.Path, sample_data: pa.Table, engine
 ):
     config = {"delta.appendOnly": "true"}
 
-    write_deltalake(tmp_path, sample_data, mode="append", configuration=config)
+    write_deltalake(
+        tmp_path, sample_data, mode="append", configuration=config, engine=engine
+    )
 
     table = DeltaTable(tmp_path)
-    write_deltalake(table, sample_data, mode="append")
+    write_deltalake(table, sample_data, mode="append", engine=engine)
 
     data_store_types = [tmp_path, table]
     fail_modes = ["overwrite", "ignore", "error"]
@@ -357,7 +330,7 @@ def test_append_only_should_append_only_with_the_overwrite_mode(  # Create rust
                 f" 'append'. Mode is currently {mode}"
             ),
         ):
-            write_deltalake(data_store_type, sample_data, mode=mode)
+            write_deltalake(data_store_type, sample_data, mode=mode, engine=engine)
 
     expected = pa.concat_tables([sample_data, sample_data])
 
@@ -371,24 +344,31 @@ def test_writer_with_table(existing_table: DeltaTable, sample_data: pa.Table, engine):
     assert existing_table.to_pyarrow_table() == sample_data
 
 
-def test_fails_wrong_partitioning(existing_table: DeltaTable, sample_data: pa.Table):
-    with pytest.raises(AssertionError):
-        write_deltalake(
-            existing_table, sample_data, mode="append", partition_by="int32"
-        )
-
-
-def test_fails_wrong_partitioning_rust_writer(
-    existing_table: DeltaTable, sample_data: pa.Table
+@pytest.mark.parametrize("engine", ["pyarrow", "rust"])
+def test_fails_wrong_partitioning(
+    existing_table: DeltaTable, sample_data: pa.Table, engine
 ):
-    with pytest.raises(DeltaError):
-        write_deltalake(
-            existing_table,
-            sample_data,
-            mode="append",
-            partition_by="int32",
-            engine="rust",
-        )
+    if engine == "pyarrow":
+        with pytest.raises(AssertionError):
+            write_deltalake(
+                existing_table,
+                sample_data,
+                mode="append",
+                partition_by="int32",
+                engine=engine,
+            )
+    elif engine == "rust":
+        with pytest.raises(
+            DeltaError,
+            match='Generic error: Specified table partitioning does not match table partitioning: expected: [], got: ["int32"]',
+        ):
+            write_deltalake(
+                existing_table,
+                sample_data,
+                mode="append",
+                partition_by="int32",
+                engine=engine,
+            )
 
 
 @pytest.mark.parametrize("engine", ["pyarrow", "rust"])
@@ -556,7 +536,7 @@ def test_writer_null_stats(tmp_path: pathlib.Path, engine: Literal["pyarrow", "rust"]):
     assert stats["nullCount"] == expected_nulls
 
 
-@pytest.mark.parametrize("engine", ["pyarrow"])  # This one is broken
+@pytest.mark.parametrize("engine", ["pyarrow", "rust"])
 def test_writer_fails_on_protocol(
     existing_table: DeltaTable,
     sample_data: pa.Table,
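Finally, a sketch of the append-only enforcement that the parametrized test above now covers for both engines (hypothetical path; the exact exception type is not visible in this diff, DeltaProtocolError is assumed):

```python
import pyarrow as pa
from deltalake import write_deltalake
from deltalake.exceptions import DeltaProtocolError  # assumed error type

data = pa.table({"x": pa.array([1, 2, 3])})
path = "/tmp/append_only_table"  # hypothetical path

# Creating the table with delta.appendOnly pins it to append-only writes.
write_deltalake(
    path, data, mode="append",
    configuration={"delta.appendOnly": "true"}, engine="rust",
)

# Further appends succeed; overwrite/ignore/error modes are rejected.
write_deltalake(path, data, mode="append", engine="rust")
try:
    write_deltalake(path, data, mode="overwrite", engine="rust")
except DeltaProtocolError:
    pass  # append-only tables reject non-append modes
```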