diff --git a/.github/workflows/python_build.yml b/.github/workflows/python_build.yml index 51626310be..bc2f20cc9a 100644 --- a/.github/workflows/python_build.yml +++ b/.github/workflows/python_build.yml @@ -127,11 +127,6 @@ jobs: python -m pytest -m "not pandas and not integration and not benchmark" pip install pandas - - name: Build Sphinx documentation - run: | - source venv/bin/activate - make build-documentation - benchmark: name: Python Benchmark runs-on: ubuntu-latest diff --git a/crates/deltalake-core/src/operations/optimize.rs b/crates/deltalake-core/src/operations/optimize.rs index ef8905e0c9..09f9de087e 100644 --- a/crates/deltalake-core/src/operations/optimize.rs +++ b/crates/deltalake-core/src/operations/optimize.rs @@ -900,10 +900,14 @@ fn build_compaction_plan( // Prune merge bins with only 1 file, since they have no effect for (_, bins) in operations.iter_mut() { - if bins.len() == 1 && bins[0].len() == 1 { - metrics.total_files_skipped += 1; - bins.clear(); - } + bins.retain(|bin| { + if bin.len() == 1 { + metrics.total_files_skipped += 1; + false + } else { + true + } + }) } operations.retain(|_, files| !files.is_empty()); diff --git a/crates/deltalake-core/tests/command_optimize.rs b/crates/deltalake-core/tests/command_optimize.rs index 14f9d4c410..b91558ce08 100644 --- a/crates/deltalake-core/tests/command_optimize.rs +++ b/crates/deltalake-core/tests/command_optimize.rs @@ -508,6 +508,71 @@ async fn test_idempotent_metrics() -> Result<(), Box<dyn Error>> { Ok(()) } +#[tokio::test] +/// Validate that bin packing with multiple merge bins is idempotent. +async fn test_idempotent_with_multiple_bins() -> Result<(), Box<dyn Error>> { + //TODO: Compression makes it hard to get the target file size... + //Maybe just commit files with a known size + let context = setup_test(true).await?; + let mut dt = context.table; + let mut writer = RecordBatchWriter::for_table(&dt)?; + + write( + &mut writer, + &mut dt, + generate_random_batch(records_for_size(6_000_000), "2022-05-22")?, + ) + .await?; + write( + &mut writer, + &mut dt, + generate_random_batch(records_for_size(3_000_000), "2022-05-22")?, + ) + .await?; + write( + &mut writer, + &mut dt, + generate_random_batch(records_for_size(6_000_000), "2022-05-22")?, + ) + .await?; + write( + &mut writer, + &mut dt, + generate_random_batch(records_for_size(3_000_000), "2022-05-22")?, + ) + .await?; + write( + &mut writer, + &mut dt, + generate_random_batch(records_for_size(9_900_000), "2022-05-22")?, + ) + .await?; + + let version = dt.version(); + + let filter = vec![PartitionFilter::try_from(("date", "=", "2022-05-22"))?]; + + let optimize = DeltaOps(dt) + .optimize() + .with_filters(&filter) + .with_target_size(10_000_000); + let (dt, metrics) = optimize.await?; + assert_eq!(metrics.num_files_added, 2); + assert_eq!(metrics.num_files_removed, 4); + assert_eq!(dt.version(), version + 1); + + let optimize = DeltaOps(dt) + .optimize() + .with_filters(&filter) + .with_target_size(10_000_000); + let (dt, metrics) = optimize.await?; + assert_eq!(metrics.num_files_added, 0); + assert_eq!(metrics.num_files_removed, 0); + assert_eq!(dt.version(), version + 1); + + Ok(()) +} + #[tokio::test] /// Validate operation data and metadata was written async fn test_commit_info() -> Result<(), Box<dyn Error>> { diff --git a/docs/api/catalog.md b/docs/api/catalog.md new file mode 100644 index 0000000000..d75dd648db --- /dev/null +++ b/docs/api/catalog.md @@ -0,0 +1 @@ +::: deltalake.data_catalog.DataCatalog \ No newline at end of file diff --git a/docs/api/delta_table.md b/docs/api/delta_table.md
deleted file mode 100644 index 75284664fe..0000000000 --- a/docs/api/delta_table.md +++ /dev/null @@ -1,10 +0,0 @@ -# DeltaTable - -::: deltalake.table - options: - show_root_heading: false - -## Writing Delta Tables - -::: deltalake.write_deltalake - diff --git a/docs/api/delta_table/delta_table_merger.md b/docs/api/delta_table/delta_table_merger.md new file mode 100644 index 0000000000..7d707a16f2 --- /dev/null +++ b/docs/api/delta_table/delta_table_merger.md @@ -0,0 +1,5 @@ +# TableMerger + +::: deltalake.table.TableMerger + options: + show_root_heading: true \ No newline at end of file diff --git a/docs/api/delta_table/delta_table_optimizer.md b/docs/api/delta_table/delta_table_optimizer.md new file mode 100644 index 0000000000..2275cbd0ca --- /dev/null +++ b/docs/api/delta_table/delta_table_optimizer.md @@ -0,0 +1,5 @@ +# TableOptimizer + +::: deltalake.table.TableOptimizer + options: + show_root_heading: true \ No newline at end of file diff --git a/docs/api/delta_table/index.md b/docs/api/delta_table/index.md new file mode 100644 index 0000000000..46a65af8d3 --- /dev/null +++ b/docs/api/delta_table/index.md @@ -0,0 +1,5 @@ +# DeltaTable + +::: deltalake.DeltaTable + options: + show_root_heading: true \ No newline at end of file diff --git a/docs/api/delta_table/metadata.md b/docs/api/delta_table/metadata.md new file mode 100644 index 0000000000..92ff62370a --- /dev/null +++ b/docs/api/delta_table/metadata.md @@ -0,0 +1,6 @@ +# Metadata + +::: deltalake.Metadata + options: + show_root_heading: true + diff --git a/docs/api/delta_writer.md b/docs/api/delta_writer.md new file mode 100644 index 0000000000..71c31534b0 --- /dev/null +++ b/docs/api/delta_writer.md @@ -0,0 +1,7 @@ +# Writer +## Write to Delta Tables + +::: deltalake.write_deltalake + +## Convert to Delta Tables +::: deltalake.convert_to_deltalake \ No newline at end of file diff --git a/docs/api/exceptions.md b/docs/api/exceptions.md new file mode 100644 index 0000000000..afe99f92f1 --- /dev/null +++ b/docs/api/exceptions.md @@ -0,0 +1,6 @@ +# Exceptions + +::: deltalake.exceptions.DeltaError +::: deltalake.exceptions.DeltaProtocolError +::: deltalake.exceptions.TableNotFoundError +::: deltalake.exceptions.CommitFailedError diff --git a/docs/api/schema.md b/docs/api/schema.md index 9a91f61062..9a6ba7b2e6 100644 --- a/docs/api/schema.md +++ b/docs/api/schema.md @@ -1,28 +1,29 @@ -## Delta Lake Schemas - +## Schema and field Schemas, fields, and data types are provided in the ``deltalake.schema`` submodule. 
-::: deltalake.schema.Schema +::: deltalake.Schema options: show_root_heading: true show_root_toc_entry: true -::: deltalake.schema.PrimitiveType +::: deltalake.Field options: show_root_heading: true show_root_toc_entry: true -::: deltalake.schema.ArrayType + +## Data types +::: deltalake.schema.PrimitiveType options: show_root_heading: true show_root_toc_entry: true -::: deltalake.schema.MapType +::: deltalake.schema.ArrayType options: show_root_heading: true show_root_toc_entry: true -::: deltalake.schema.Field +::: deltalake.schema.MapType options: show_root_heading: true show_root_toc_entry: true @@ -30,10 +31,4 @@ Schemas, fields, and data types are provided in the ``deltalake.schema`` submodu ::: deltalake.schema.StructType options: show_root_heading: true - show_root_toc_entry: true - -::: deltalake.data_catalog - -## Delta Storage Handler - -::: deltalake.fs + show_root_toc_entry: true \ No newline at end of file diff --git a/docs/api/storage.md b/docs/api/storage.md index 77fd28c81a..ddb18250cf 100644 --- a/docs/api/storage.md +++ b/docs/api/storage.md @@ -1,3 +1,5 @@ -## Delta Storage Handler +# Storage -::: deltalake.fs +The delta filesystem handler for the pyarrow engine writer. + +::: deltalake.fs.DeltaStorageHandler diff --git a/docs/usage/appending-overwriting-delta-lake-table.md b/docs/usage/appending-overwriting-delta-lake-table.md index 0930d8da1e..397edb9d0d 100644 --- a/docs/usage/appending-overwriting-delta-lake-table.md +++ b/docs/usage/appending-overwriting-delta-lake-table.md @@ -63,7 +63,7 @@ Here are the contents of the Delta table after the overwrite operation: Overwriting just performs a logical delete. It doesn't physically remove the previous data from storage. Time travel back to the previous version to confirm that the old version of the table is still accessable. -``` +```python dt = DeltaTable("tmp/some-table", version=1) +-------+----------+ diff --git a/docs/usage/optimize/small-file-compaction-with-optimize.md b/docs/usage/optimize/small-file-compaction-with-optimize.md index ece15deea4..78d8778ff5 100644 --- a/docs/usage/optimize/small-file-compaction-with-optimize.md +++ b/docs/usage/optimize/small-file-compaction-with-optimize.md @@ -16,7 +16,7 @@ Let’s start by creating a Delta table with a lot of small files so we can demo Start by writing a function that generates on thousand rows of random data given a timestamp. -``` +```python def record_observations(date: datetime) -> pa.Table: """Pulls data for a certain datetime""" nrows = 1000 @@ -31,7 +31,7 @@ def record_observations(date: datetime) -> pa.Table: Let’s run this function and observe the output: -``` +```python record_observations(datetime(2021, 1, 1, 12)).to_pandas() date timestamp value @@ -44,7 +44,7 @@ record_observations(datetime(2021, 1, 1, 12)).to_pandas() Let’s write 100 hours worth of data to the Delta table. -``` +```python # Every hour starting at midnight on 2021-01-01 hours_iter = (datetime(2021, 1, 1) + timedelta(hours=i) for i in itertools.count()) @@ -60,7 +60,7 @@ for timestamp in itertools.islice(hours_iter, 100): This data was appended to the Delta table in 100 separate transactions, so the table will contain 100 transaction log entries and 100 data files. You can see the number of files with the `files()` method. -``` +```python dt = DeltaTable("observation_data") len(dt.files()) # 100 ``` @@ -101,7 +101,7 @@ Each of these Parquet files are tiny - they’re only 10 KB. 
Let’s see how to Let’s run the optimize command to compact the existing small files into larger files: -``` +```python dt = DeltaTable("observation_data") dt.optimize() @@ -109,7 +109,7 @@ dt.optimize() Here’s the output of the command: -``` +```python {'numFilesAdded': 5, 'numFilesRemoved': 100, 'filesAdded': {'min': 39000, @@ -137,7 +137,7 @@ Let’s append some more data to the Delta table and see how we can selectively Let’s append another 24 hours of data to the Delta table: -``` +```python for timestamp in itertools.islice(hours_iter, 24): write_deltalake( dt, @@ -149,7 +149,7 @@ for timestamp in itertools.islice(hours_iter, 24): We can use `get_add_actions()` to introspect the table state. We can see that `2021-01-06` has only a few hours of data so far, so we don't want to optimize that yet. But `2021-01-05` has all 24 hours of data, so it's ready to be optimized. -``` +```python dt.get_add_actions(flatten=True).to_pandas()[ "partition.date" ].value_counts().sort_index() @@ -164,7 +164,7 @@ dt.get_add_actions(flatten=True).to_pandas()[ To optimize a single partition, you can pass in a `partition_filters` argument speficying which partitions to optimize. -``` +```python dt.optimize(partition_filters=[("date", "=", "2021-01-05")]) {'numFilesAdded': 1, @@ -188,7 +188,7 @@ dt.optimize(partition_filters=[("date", "=", "2021-01-05")]) This optimize operation tombstones 21 small data files and adds one file with all the existing data properly condensed. Let’s take a look a portion of the `_delta_log/00000000000000000125.json` file, which is the transaction log entry that corresponds with this incremental optimize command. -``` +```python { "remove": { "path": "date=2021-01-05/part-00000-41178aab-2491-488f-943d-8f03867295ee-c000.snappy.parquet", @@ -248,13 +248,13 @@ It’s normally a good idea to have a retention period of at least 7 days. 
For Let’s run the vacuum command: -``` +```python dt.vacuum(retention_hours=0, enforce_retention_duration=False, dry_run=False) ``` The command returns a list of all the files that are removed from storage: -``` +```python ['date=2021-01-02/39-a98680f2-0e0e-4f26-a491-18b183f9eb05-0.parquet', 'date=2021-01-02/41-e96bc8bb-c571-484c-b534-e897424fb7da-0.parquet', … diff --git a/mkdocs.yml b/mkdocs.yml index 97b6e91b0e..a86257c932 100644 --- a/mkdocs.yml +++ b/mkdocs.yml @@ -10,10 +10,14 @@ theme: features: - navigation.tracking - navigation.instant + - navigation.expand - navigation.tabs + - navigation.indexes - navigation.tabs.sticky - navigation.footer - content.tabs.link + - content.code.annotation + - content.code.copy nav: - Home: index.md - Usage: @@ -31,11 +35,17 @@ nav: - Small file compaction: usage/optimize/small-file-compaction-with-optimize.md - Z Order: usage/optimize/delta-lake-z-order.md - API Reference: - - api/delta_table.md + - api/delta_writer.md + - Table: + - api/delta_table/index.md + - api/delta_table/metadata.md + - api/delta_table/delta_table_merger.md + - api/delta_table/delta_table_optimizer.md - api/schema.md - api/storage.md + - api/catalog.md + - api/exceptions.md - Integrations: - - Arrow: integrations/delta-lake-arrow.md - pandas: integrations/delta-lake-pandas.md not_in_nav: | /_build/ @@ -61,7 +71,7 @@ plugins: show_source: false show_symbol_type_in_heading: true show_signature_annotations: true - show_root_heading: false + show_root_heading: true show_root_full_path: true separate_signature: true docstring_options: @@ -81,6 +91,11 @@ plugins: on_page_markdown: 'docs._build.hooks:on_page_markdown' markdown_extensions: + - pymdownx.highlight: + anchor_linenums: true + line_spans: __span + pygments_lang_class: true + - pymdownx.inlinehilite - admonition - pymdownx.details - attr_list @@ -97,4 +112,9 @@ markdown_extensions: - footnotes extra: - python_api_url: https://delta-io.github.io/delta-rs/api/ \ No newline at end of file + python_api_url: https://delta-io.github.io/delta-rs/api/ + generator: false + social: + - icon: fontawesome/brands/slack + link: https://go.delta.io/slack + name: Delta slack channel \ No newline at end of file diff --git a/python/Cargo.toml b/python/Cargo.toml index 5194a2fc22..a9936a483c 100644 --- a/python/Cargo.toml +++ b/python/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "deltalake-python" -version = "0.13.0" +version = "0.14.0" authors = ["Qingping Hou ", "Will Jones "] homepage = "https://github.com/delta-io/delta-rs" license = "Apache-2.0" diff --git a/python/deltalake/_internal.pyi b/python/deltalake/_internal.pyi index d7c0e1a8f9..e1f5288b81 100644 --- a/python/deltalake/_internal.pyi +++ b/python/deltalake/_internal.pyi @@ -210,36 +210,39 @@ class PrimitiveType: The JSON representation for a primitive type is just a quoted string: `PrimitiveType.from_json('"integer"')` Args: - json: A JSON string + json: a JSON string - Returns a [PrimitiveType][deltalake.schema.PrimitiveType] type + Returns: + a PrimitiveType type """ def to_pyarrow(self) -> pyarrow.DataType: """Get the equivalent PyArrow type (pyarrow.DataType)""" @staticmethod def from_pyarrow(type: pyarrow.DataType) -> PrimitiveType: - """Create a [PrimitiveType][deltalake.schema.PrimitiveType] from a PyArrow type + """Create a PrimitiveType from a PyArrow datatype Will raise `TypeError` if the PyArrow type is not a primitive type. 
Args: - type: A PyArrow [DataType][pyarrow.DataType] type + type: A PyArrow DataType - Returns: a [PrimitiveType][deltalake.schema.PrimitiveType] type + Returns: + a PrimitiveType """ class ArrayType: """An Array (List) DataType - Can either pass the element type explicitly or can pass a string - if it is a primitive type: - ``` - ArrayType(PrimitiveType("integer")) - # Returns ArrayType(PrimitiveType("integer"), contains_null=True) + Example: + Can either pass the element type explicitly or can pass a string + if it is a primitive type: + ```python + ArrayType(PrimitiveType("integer")) + # Returns ArrayType(PrimitiveType("integer"), contains_null=True) - ArrayType("integer", contains_null=False) - # Returns ArrayType(PrimitiveType("integer"), contains_null=False) - ``` + ArrayType("integer", contains_null=False) + # Returns ArrayType(PrimitiveType("integer"), contains_null=False) + ``` """ def __init__( @@ -269,23 +272,25 @@ class ArrayType: def from_json(json: str) -> "ArrayType": """Create an ArrayType from a JSON string - The JSON representation for an array type is an object with `type` (set to - `"array"`), `elementType`, and `containsNull`: - ``` - ArrayType.from_json( - '''{ - "type": "array", - "elementType": "integer", - "containsNull": false - }''' - ) - # Returns ArrayType(PrimitiveType("integer"), contains_null=False) - ``` - Args: - json: A JSON string + json: a JSON string + + Returns: + an ArrayType - Returns: an [ArrayType][deltalake.schema.ArrayType] type + Example: + The JSON representation for an array type is an object with `type` (set to + `"array"`), `elementType`, and `containsNull`. + ```python + ArrayType.from_json( + '''{ + "type": "array", + "elementType": "integer", + "containsNull": false + }''' + ) + # Returns ArrayType(PrimitiveType("integer"), contains_null=False) + ``` """ def to_pyarrow( self, @@ -298,9 +303,10 @@ class ArrayType: Will raise `TypeError` if a different PyArrow DataType is provided. Args: - type: The PyArrow [ListType][pyarrow.ListType] + type: The PyArrow ListType - Returns: an [ArrayType][deltalake.schema.ArrayType] type + Returns: + an ArrayType """ class MapType: @@ -310,13 +316,14 @@ class MapType: or [StructType][deltalake.schema.StructType]. A string can also be passed, which will be parsed as a primitive type: - ``` - MapType(PrimitiveType("integer"), PrimitiveType("string")) - # Returns MapType(PrimitiveType("integer"), PrimitiveType("string"), value_contains_null=True) + Example: + ```python + MapType(PrimitiveType("integer"), PrimitiveType("string")) + # Returns MapType(PrimitiveType("integer"), PrimitiveType("string"), value_contains_null=True) - MapType("integer", "string", value_contains_null=False) - # Returns MapType(PrimitiveType("integer"), PrimitiveType("string"), value_contains_null=False) - ``` + MapType("integer", "string", value_contains_null=False) + # Returns MapType(PrimitiveType("integer"), PrimitiveType("string"), value_contains_null=False) + ``` """ def __init__( @@ -352,29 +359,36 @@ class MapType: """ def to_json(self) -> str: - """Get JSON string representation of map type.""" + """Get JSON string representation of map type. 
+ + Returns: + a JSON string + """ @staticmethod def from_json(json: str) -> MapType: """Create a MapType from a JSON string - The JSON representation for a map type is an object with `type` (set to `map`), - `keyType`, `valueType`, and `valueContainsNull`: - ``` - MapType.from_json( - '''{ - "type": "map", - "keyType": "integer", - "valueType": "string", - "valueContainsNull": true - }''' - ) - # Returns MapType(PrimitiveType("integer"), PrimitiveType("string"), value_contains_null=True) - ``` - Args: - json: A JSON string + json: a JSON string - Returns: a [MapType][deltalake.schema.MapType] type + Returns: + an ArrayType + + Example: + The JSON representation for a map type is an object with `type` (set to `map`), + `keyType`, `valueType`, and `valueContainsNull`: + + ```python + MapType.from_json( + '''{ + "type": "map", + "keyType": "integer", + "valueType": "string", + "valueContainsNull": true + }''' + ) + # Returns MapType(PrimitiveType("integer"), PrimitiveType("string"), value_contains_null=True) + ``` """ def to_pyarrow(self) -> pyarrow.MapType: """Get the equivalent PyArrow data type.""" @@ -387,25 +401,27 @@ class MapType: Args: type: the PyArrow MapType - Returns: a [MapType][deltalake.schema.MapType] type + Returns: + a MapType """ class Field: """A field in a Delta StructType or Schema - Can create with just a name and a type: - ``` - Field("my_int_col", "integer") - # Returns Field("my_int_col", PrimitiveType("integer"), nullable=True, metadata=None) - ``` + Example: + Can create with just a name and a type: + ```python + Field("my_int_col", "integer") + # Returns Field("my_int_col", PrimitiveType("integer"), nullable=True, metadata=None) + ``` - Can also attach metadata to the field. Metadata should be a dictionary with - string keys and JSON-serializable values (str, list, int, float, dict): + Can also attach metadata to the field. Metadata should be a dictionary with + string keys and JSON-serializable values (str, list, int, float, dict): - ``` - Field("my_col", "integer", metadata={"custom_metadata": {"test": 2}}) - # Returns Field("my_col", PrimitiveType("integer"), nullable=True, metadata={"custom_metadata": {"test": 2}}) - ``` + ```python + Field("my_col", "integer", metadata={"custom_metadata": {"test": 2}}) + # Returns Field("my_col", PrimitiveType("integer"), nullable=True, metadata={"custom_metadata": {"test": 2}}) + ``` """ def __init__( @@ -440,10 +456,15 @@ class Field: def to_json(self) -> str: """Get the field as JSON string. - ``` - Field("col", "integer").to_json() - # Returns '{"name":"col","type":"integer","nullable":true,"metadata":{}}' - ``` + + Returns: + a JSON string + + Example: + ```python + Field("col", "integer").to_json() + # Returns '{"name":"col","type":"integer","nullable":true,"metadata":{}}' + ``` """ @staticmethod def from_json(json: str) -> Field: @@ -452,25 +473,27 @@ class Field: Args: json: the JSON string. - Returns: Field + Returns: + Field Example: - ``` - Field.from_json('''{ - "name": "col", - "type": "integer", - "nullable": true, - "metadata": {} - }''' - ) - # Returns Field(col, PrimitiveType("integer"), nullable=True) - ``` + ``` + Field.from_json('''{ + "name": "col", + "type": "integer", + "nullable": true, + "metadata": {} + }''' + ) + # Returns Field(col, PrimitiveType("integer"), nullable=True) + ``` """ def to_pyarrow(self) -> pyarrow.Field: """Convert to an equivalent PyArrow field Note: This currently doesn't preserve field metadata. 
- Returns: a [pyarrow.Field][pyarrow.Field] type + Returns: + a pyarrow Field """ @staticmethod def from_pyarrow(field: pyarrow.Field) -> Field: @@ -478,21 +501,21 @@ class Field: Note: This currently doesn't preserve field metadata. Args: - field: a PyArrow Field type + field: a PyArrow Field - Returns: a [Field][deltalake.schema.Field] type + Returns: + a Field """ class StructType: """A struct datatype, containing one or more subfields Example: - - Create with a list of :class:`Field`: - ``` - StructType([Field("x", "integer"), Field("y", "string")]) - # Creates: StructType([Field(x, PrimitiveType("integer"), nullable=True), Field(y, PrimitiveType("string"), nullable=True)]) - ``` + Create with a list of :class:`Field`: + ```python + StructType([Field("x", "integer"), Field("y", "string")]) + # Creates: StructType([Field(x, PrimitiveType("integer"), nullable=True), Field(y, PrimitiveType("string"), nullable=True)]) + ``` """ def __init__(self, fields: List[Field]) -> None: ... @@ -503,33 +526,42 @@ class StructType: def to_json(self) -> str: """Get the JSON representation of the type. - ``` - StructType([Field("x", "integer")]).to_json() - # Returns '{"type":"struct","fields":[{"name":"x","type":"integer","nullable":true,"metadata":{}}]}' - ``` + + Returns: + a JSON string + + Example: + ```python + StructType([Field("x", "integer")]).to_json() + # Returns '{"type":"struct","fields":[{"name":"x","type":"integer","nullable":true,"metadata":{}}]}' + ``` """ @staticmethod def from_json(json: str) -> StructType: """Create a new StructType from a JSON string. - ``` - StructType.from_json( - '''{ - "type": "struct", - "fields": [{"name": "x", "type": "integer", "nullable": true, "metadata": {}}] - }''' - ) - # Returns StructType([Field(x, PrimitiveType("integer"), nullable=True)]) - ``` Args: json: a JSON string - Returns: a [StructType][deltalake.schema.StructType] type + Returns: + a StructType + + Example: + ```python + StructType.from_json( + '''{ + "type": "struct", + "fields": [{"name": "x", "type": "integer", "nullable": true, "metadata": {}}] + }''' + ) + # Returns StructType([Field(x, PrimitiveType("integer"), nullable=True)]) + ``` """ def to_pyarrow(self) -> pyarrow.StructType: """Get the equivalent PyArrow StructType - Returns: a PyArrow [StructType][pyarrow.StructType] type + Returns: + a PyArrow StructType """ @staticmethod def from_pyarrow(type: pyarrow.StructType) -> StructType: @@ -540,7 +572,8 @@ class StructType: Args: type: a PyArrow struct type. - Returns: a [StructType][deltalake.schema.StructType] type + Returns: + a StructType """ class Schema: @@ -553,38 +586,44 @@ class Schema: """ def to_json(self) -> str: """Get the JSON string representation of the Schema. - A schema has the same JSON format as a StructType. - ``` - Schema([Field("x", "integer")]).to_json() - # Returns '{"type":"struct","fields":[{"name":"x","type":"integer","nullable":true,"metadata":{}}]}' - ``` - Returns: a JSON string + + Returns: + a JSON string + + Example: + A schema has the same JSON format as a StructType. + ```python + Schema([Field("x", "integer")]).to_json() + # Returns '{"type":"struct","fields":[{"name":"x","type":"integer","nullable":true,"metadata":{}}]}' + ``` """ @staticmethod def from_json(json: str) -> Schema: """Create a new Schema from a JSON string. - A schema has the same JSON format as a StructType. 
- ``` - Schema.from_json('''{ - "type": "struct", - "fields": [{"name": "x", "type": "integer", "nullable": true, "metadata": {}}] - } - )''' - # Returns Schema([Field(x, PrimitiveType("integer"), nullable=True)]) - ``` - Args: json: a JSON string + + Example: + A schema has the same JSON format as a StructType. + ```python + Schema.from_json('''{ + "type": "struct", + "fields": [{"name": "x", "type": "integer", "nullable": true, "metadata": {}}] + } + )''' + # Returns Schema([Field(x, PrimitiveType("integer"), nullable=True)]) + ``` """ def to_pyarrow(self, as_large_types: bool = False) -> pyarrow.Schema: """Return equivalent PyArrow schema Args: - as_large_types: get schema with all variable size types (list, binary, string) as large variants (with int64 indices). This is for compatibility with systems like Polars that only support the large versions of Arrow types. + as_large_types: get schema with all variable size types (list, binary, string) as large variants (with int64 indices). + This is for compatibility with systems like Polars that only support the large versions of Arrow types. Returns: - a PyArrow [Schema][pyarrow.Schema] type + a PyArrow Schema """ @staticmethod def from_pyarrow(type: pyarrow.Schema) -> Schema: @@ -593,9 +632,10 @@ class Schema: Will raise `TypeError` if the PyArrow type is not a primitive type. Args: - type: A PyArrow [Schema][pyarrow.Schema] type + type: A PyArrow Schema - Returns: a [Schema][deltalake.schema.Schema] type + Returns: + a Schema """ class ObjectInputFile: diff --git a/python/deltalake/table.py b/python/deltalake/table.py index b238af7929..adf3ca92af 100644 --- a/python/deltalake/table.py +++ b/python/deltalake/table.py @@ -31,6 +31,8 @@ ) if TYPE_CHECKING: + import os + import pandas from deltalake._internal import DeltaDataChecker as _DeltaDataChecker @@ -209,13 +211,13 @@ def _filters_to_expression(filters: FilterType) -> Expression: the op is in or not in, the value must be a collection such as a list, a set or a tuple. The supported type for value is str. Use empty string `''` for Null partition value. -Examples: -``` -("x", "=", "a") -("x", "!=", "a") -("y", "in", ["a", "b", "c"]) -("z", "not in", ["a","b"]) -``` +Example: + ``` + ("x", "=", "a") + ("x", "!=", "a") + ("y", "in", ["a", "b", "c"]) + ("z", "not in", ["a","b"]) + ``` """ @@ -225,7 +227,7 @@ class DeltaTable: def __init__( self, - table_uri: Union[str, Path], + table_uri: Union[str, Path, "os.PathLike[str]"], version: Optional[int] = None, storage_options: Optional[Dict[str, str]] = None, without_files: bool = False, @@ -329,13 +331,13 @@ def files( the op is in or not in, the value must be a collection such as a list, a set or a tuple. The supported type for value is str. Use empty string `''` for Null partition value. - Examples: - ``` - ("x", "=", "a") - ("x", "!=", "a") - ("y", "in", ["a", "b", "c"]) - ("z", "not in", ["a","b"]) - ``` + Example: + ``` + ("x", "=", "a") + ("x", "!=", "a") + ("y", "in", ["a", "b", "c"]) + ("z", "not in", ["a","b"]) + ``` """ return self._table.files(self.__stringify_partition_values(partition_filters)) @@ -366,13 +368,13 @@ def file_uris( the op is in or not in, the value must be a collection such as a list, a set or a tuple. The supported type for value is str. Use empty string `''` for Null partition value. 
- Examples: - ``` - ("x", "=", "a") - ("x", "!=", "a") - ("y", "in", ["a", "b", "c"]) - ("z", "not in", ["a","b"]) - ``` + Example: + ``` + ("x", "=", "a") + ("x", "!=", "a") + ("y", "in", ["a", "b", "c"]) + ("z", "not in", ["a","b"]) + ``` """ return self._table.file_uris( self.__stringify_partition_values(partition_filters) @@ -397,12 +399,12 @@ def load_with_datetime(self, datetime_string: str) -> None: Args: datetime_string: the identifier of the datetime point of the DeltaTable to load - Examples: - ``` - "2018-01-26T18:30:09Z" - "2018-12-19T16:39:57-08:00" - "2018-01-26T18:30:09.453+00:00" - ``` + Example: + ``` + "2018-01-26T18:30:09Z" + "2018-12-19T16:39:57-08:00" + "2018-01-26T18:30:09.453+00:00" + ``` """ self._table.load_with_datetime(datetime_string) @@ -511,7 +513,7 @@ def update( Args: updates: a mapping of column name to update SQL expression. new_values: a mapping of column name to python datatype. - predicate: a logical expression, defaults to None + predicate: a logical expression. writer_properties: Pass writer properties to the Rust parquet writer, see options https://arrow.apache.org/rust/parquet/file/properties/struct.WriterProperties.html, only the following fields are supported: `data_page_size_limit`, `dictionary_page_size_limit`, `data_page_row_count_limit`, `write_batch_size`, `max_row_group_size`. @@ -520,34 +522,43 @@ def update( Returns: the metrics from update - Examples: - - Update some row values with SQL predicate. This is equivalent to `UPDATE table SET deleted = true WHERE id = '3'` + Example: + **Update some row values with SQL predicate** + + This is equivalent to `UPDATE table SET deleted = true WHERE id = '3'` + ```py + from deltalake import write_deltalake, DeltaTable + import pandas as pd + df = pd.DataFrame( + {"id": ["1", "2", "3"], + "deleted": [False, False, False], + "price": [10., 15., 20.] + }) + write_deltalake("tmp", df) + dt = DeltaTable("tmp") + dt.update(predicate="id = '3'", updates = {"deleted": 'True'}) - >>> from deltalake import write_deltalake, DeltaTable - >>> import pandas as pd - >>> df = pd.DataFrame({"id": ["1", "2", "3"], "deleted": [False, False, False], "price": [10., 15., 20.]}) - >>> write_deltalake("tmp", df) - >>> dt = DeltaTable("tmp") - >>> dt.update(predicate="id = '3'", updates = {"deleted": 'True'}) - {'num_added_files': 1, 'num_removed_files': 1, 'num_updated_rows': 1, 'num_copied_rows': 2, 'execution_time_ms': ..., 'scan_time_ms': ...} + ``` - Update all row values. This is equivalent to - ``UPDATE table SET deleted = true, id = concat(id, '_old')``. + **Update all row values** + + This is equivalent to ``UPDATE table SET deleted = true, id = concat(id, '_old')``. + ```py + dt.update(updates = {"deleted": 'True', "id": "concat(id, '_old')"}) - >>> dt.update(updates = {"deleted": 'True', "id": "concat(id, '_old')"}) - - {'num_added_files': 1, 'num_removed_files': 1, 'num_updated_rows': 3, 'num_copied_rows': 0, 'execution_time_ms': ..., 'scan_time_ms': ...} + ``` - To use Python objects instead of SQL strings, use the `new_values` parameter - instead of the `updates` parameter. For example, this is equivalent to - ``UPDATE table SET price = 150.10 WHERE id = '1'`` + **Use Python objects instead of SQL strings** - >>> dt.update(predicate="id = '1_old'", new_values = {"price": 150.10}) - {'num_added_files': 1, 'num_removed_files': 1, 'num_updated_rows': 1, 'num_copied_rows': 2, 'execution_time_ms': ..., 'scan_time_ms': ...} + Use the `new_values` parameter instead of the `updates` parameter. 
For example, + this is equivalent to ``UPDATE table SET price = 150.10 WHERE id = '1'`` + ```py + dt.update(predicate="id = '1_old'", new_values = {"price": 150.10}) + {'num_added_files': 1, 'num_removed_files': 1, 'num_updated_rows': 1, 'num_copied_rows': 2, 'execution_time_ms': ..., 'scan_time_ms': ...} + ``` """ if updates is None and new_values is not None: updates = {} @@ -614,11 +625,11 @@ def merge( match the underlying table. Args: - source (pyarrow.Table | pyarrow.RecordBatch | pyarrow.RecordBatchReader ): source data - predicate (str): SQL like predicate on how to merge - source_alias (str): Alias for the source table - target_alias (str): Alias for the target table - error_on_type_mismatch (bool): specify if merge will return error if data types are mismatching :default = True + source: source data + predicate: SQL like predicate on how to merge + source_alias: Alias for the source table + target_alias: Alias for the target table + error_on_type_mismatch: specify if merge will return error if data types are mismatching :default = True Returns: TableMerger: TableMerger Object @@ -858,26 +869,27 @@ def get_add_actions(self, flatten: bool = False) -> pyarrow.RecordBatch: a PyArrow RecordBatch containing the add action data. Example: - - >>> from pprint import pprint - >>> from deltalake import DeltaTable, write_deltalake - >>> import pyarrow as pa - >>> data = pa.table({"x": [1, 2, 3], "y": [4, 5, 6]}) - >>> write_deltalake("tmp", data, partition_by=["x"]) - >>> dt = DeltaTable("tmp") - >>> df = dt.get_add_actions().to_pandas() - >>> df["path"].sort_values(ignore_index=True) - 0 x=1/0-... - 1 x=2/0-... - 2 x=3/0-... - ... - >>> df = dt.get_add_actions(flatten=True).to_pandas() - >>> df["partition.x"].sort_values(ignore_index=True) + ```python + from pprint import pprint + from deltalake import DeltaTable, write_deltalake + import pyarrow as pa + data = pa.table({"x": [1, 2, 3], "y": [4, 5, 6]}) + write_deltalake("tmp", data, partition_by=["x"]) + dt = DeltaTable("tmp") + df = dt.get_add_actions().to_pandas() + df["path"].sort_values(ignore_index=True) + 0 x=1/0 + 1 x=2/0 + 2 x=3/0 + ``` + + ```python + df = dt.get_add_actions(flatten=True).to_pandas() + df["partition.x"].sort_values(ignore_index=True) 0 1 1 2 2 3 - ... - + ``` """ return self._table.get_add_actions(flatten) @@ -911,16 +923,16 @@ def repair(self, dry_run: bool = False) -> Dict[str, Any]: Returns: The metrics from repair (FSCK) action. - Examples: - ``` - from deltalake import DeltaTable - dt = DeltaTable('TEST') - dt.repair(dry_run=False) - ``` - Results in - ``` - {'dry_run': False, 'files_removed': ['6-0d084325-6885-4847-b008-82c1cf30674c-0.parquet', 5-4fba1d3e-3e20-4de1-933d-a8e13ac59f53-0.parquet']} - ``` + Example: + ```python + from deltalake import DeltaTable + dt = DeltaTable('TEST') + dt.repair(dry_run=False) + ``` + Results in + ``` + {'dry_run': False, 'files_removed': ['6-0d084325-6885-4847-b008-82c1cf30674c-0.parquet', 5-4fba1d3e-3e20-4de1-933d-a8e13ac59f53-0.parquet']} + ``` """ metrics = self._table.repair(dry_run) return json.loads(metrics) @@ -969,11 +981,11 @@ def with_writer_properties( """Pass writer properties to the Rust parquet writer, see options https://arrow.apache.org/rust/parquet/file/properties/struct.WriterProperties.html: Args: - data_page_size_limit (int|None, Optional): Limit DataPage size to this in bytes. Defaults to None. - dictionary_page_size_limit (int|None, Optional): Limit the size of each DataPage to store dicts to this amount in bytes. Defaults to None. 
- data_page_row_count_limit (int|None, Optional): Limit the number of rows in each DataPage. Defaults to None. - write_batch_size (int|None, Optional): Splits internally to smaller batch size. Defaults to None. - max_row_group_size (int|None, Optional): Max number of rows in row group. Defaults to None. + data_page_size_limit: Limit DataPage size to this in bytes. + dictionary_page_size_limit: Limit the size of each DataPage to store dicts to this amount in bytes. + data_page_row_count_limit: Limit the number of rows in each DataPage. + write_batch_size: Splits internally to smaller batch size. + max_row_group_size: Max number of rows in row group. Returns: TableMerger: TableMerger Object @@ -995,36 +1007,39 @@ def when_matched_update( If a ``predicate`` is specified, then it must evaluate to true for the row to be updated. Args: - updates (dict): a mapping of column name to update SQL expression. - predicate (str | None, Optional): SQL like predicate on when to update. Defaults to None. + updates: a mapping of column name to update SQL expression. + predicate: SQL like predicate on when to update. Returns: TableMerger: TableMerger Object - Examples: - - >>> from deltalake import DeltaTable, write_deltalake - >>> import pyarrow as pa - >>> data = pa.table({"x": [1, 2, 3], "y": [4, 5, 6]}) - >>> write_deltalake("tmp", data) - >>> dt = DeltaTable("tmp") - >>> new_data = pa.table({"x": [1], "y": [7]}) - >>> ( - ... dt.merge( - ... source=new_data, - ... predicate="target.x = source.x", - ... source_alias="source", - ... target_alias="target") - ... .when_matched_update(updates={"x": "source.x", "y": "source.y"}) - ... .execute() - ... ) + Example: + ```python + from deltalake import DeltaTable, write_deltalake + import pyarrow as pa + + data = pa.table({"x": [1, 2, 3], "y": [4, 5, 6]}) + write_deltalake("tmp", data) + dt = DeltaTable("tmp") + new_data = pa.table({"x": [1], "y": [7]}) + + ( + dt.merge( + source=new_data, + predicate="target.x = source.x", + source_alias="source", + target_alias="target") + .when_matched_update(updates={"x": "source.x", "y": "source.y"}) + .execute() + ) {'num_source_rows': 1, 'num_target_rows_inserted': 0, 'num_target_rows_updated': 1, 'num_target_rows_deleted': 0, 'num_target_rows_copied': 2, 'num_output_rows': 3, 'num_target_files_added': 1, 'num_target_files_removed': 1, 'execution_time_ms': ..., 'scan_time_ms': ..., 'rewrite_time_ms': ...} - >>> dt.to_pandas() + + dt.to_pandas() x y 0 1 7 1 2 5 2 3 6 - + ``` """ if isinstance(self.matched_update_updates, list) and isinstance( self.matched_update_predicate, list @@ -1041,35 +1056,38 @@ def when_matched_update_all(self, predicate: Optional[str] = None) -> "TableMerg If a ``predicate`` is specified, then it must evaluate to true for the row to be updated. Args: - predicate (str | None, Optional): SQL like predicate on when to update all columns. Defaults to None. + predicate: SQL like predicate on when to update all columns. Returns: TableMerger: TableMerger Object - Examples: - - >>> from deltalake import DeltaTable, write_deltalake - >>> import pyarrow as pa - >>> data = pa.table({"x": [1, 2, 3], "y": [4, 5, 6]}) - >>> write_deltalake("tmp", data) - >>> dt = DeltaTable("tmp") - >>> new_data = pa.table({"x": [1], "y": [7]}) - >>> ( - ... dt.merge( - ... source=new_data, - ... predicate="target.x = source.x", - ... source_alias="source", - ... target_alias="target") - ... .when_matched_update_all() - ... .execute() - ... 
) + Example: + ```python + from deltalake import DeltaTable, write_deltalake + import pyarrow as pa + + data = pa.table({"x": [1, 2, 3], "y": [4, 5, 6]}) + write_deltalake("tmp", data) + dt = DeltaTable("tmp") + new_data = pa.table({"x": [1], "y": [7]}) + + ( + dt.merge( + source=new_data, + predicate="target.x = source.x", + source_alias="source", + target_alias="target") + .when_matched_update_all() + .execute() + ) {'num_source_rows': 1, 'num_target_rows_inserted': 0, 'num_target_rows_updated': 1, 'num_target_rows_deleted': 0, 'num_target_rows_copied': 2, 'num_output_rows': 3, 'num_target_files_added': 1, 'num_target_files_removed': 1, 'execution_time_ms': ..., 'scan_time_ms': ..., 'rewrite_time_ms': ...} - >>> dt.to_pandas() + + dt.to_pandas() x y 0 1 7 1 2 5 2 3 6 - + ``` """ src_alias = (self.source_alias + ".") if self.source_alias is not None else "" @@ -1096,54 +1114,59 @@ def when_matched_delete(self, predicate: Optional[str] = None) -> "TableMerger": true for the matched row. If not specified it deletes all matches. Args: - predicate (str | None, Optional): SQL like predicate on when to delete. Defaults to None. + predicate (str | None, Optional): SQL like predicate on when to delete. Returns: TableMerger: TableMerger Object - Examples: - - Delete on a predicate - - >>> from deltalake import DeltaTable, write_deltalake - >>> import pyarrow as pa - >>> data = pa.table({"x": [1, 2, 3], "y": [4, 5, 6]}) - >>> write_deltalake("tmp", data) - >>> dt = DeltaTable("tmp") - >>> new_data = pa.table({"x": [2, 3], "deleted": [False, True]}) - >>> ( - ... dt.merge( - ... source=new_data, - ... predicate='target.x = source.x', - ... source_alias='source', - ... target_alias='target') - ... .when_matched_delete( - ... predicate="source.deleted = true") - ... .execute() - ... ) + Example: + **Delete on a predicate** + + ```python + from deltalake import DeltaTable, write_deltalake + import pyarrow as pa + + data = pa.table({"x": [1, 2, 3], "y": [4, 5, 6]}) + write_deltalake("tmp", data) + dt = DeltaTable("tmp") + new_data = pa.table({"x": [2, 3], "deleted": [False, True]}) + + ( + dt.merge( + source=new_data, + predicate='target.x = source.x', + source_alias='source', + target_alias='target') + .when_matched_delete( + predicate="source.deleted = true") + .execute() + ) {'num_source_rows': 2, 'num_target_rows_inserted': 0, 'num_target_rows_updated': 0, 'num_target_rows_deleted': 1, 'num_target_rows_copied': 2, 'num_output_rows': 2, 'num_target_files_added': 1, 'num_target_files_removed': 1, 'execution_time_ms': ..., 'scan_time_ms': ..., 'rewrite_time_ms': ...} - >>> dt.to_pandas().sort_values("x", ignore_index=True) + + dt.to_pandas().sort_values("x", ignore_index=True) x y 0 1 4 1 2 5 - - Delete all records that were matched - - >>> dt = DeltaTable("tmp") - >>> ( - ... dt.merge( - ... source=new_data, - ... predicate='target.x = source.x', - ... source_alias='source', - ... target_alias='target') - ... .when_matched_delete() - ... .execute() - ... 
) + ``` + + **Delete all records that were matched** + ```python + dt = DeltaTable("tmp") + ( + dt.merge( + source=new_data, + predicate='target.x = source.x', + source_alias='source', + target_alias='target') + .when_matched_delete() + .execute() + ) {'num_source_rows': 2, 'num_target_rows_inserted': 0, 'num_target_rows_updated': 0, 'num_target_rows_deleted': 1, 'num_target_rows_copied': 1, 'num_output_rows': 1, 'num_target_files_added': 1, 'num_target_files_removed': 1, 'execution_time_ms': ..., 'scan_time_ms': ..., 'rewrite_time_ms': ...} - >>> dt.to_pandas() + + dt.to_pandas() x y 0 1 4 - + ``` """ if self.matched_delete_all is not None: raise ValueError( @@ -1168,40 +1191,43 @@ def when_not_matched_insert( Args: updates (dict): a mapping of column name to insert SQL expression. - predicate (str | None, Optional): SQL like predicate on when to insert. Defaults to None. + predicate (str | None, Optional): SQL like predicate on when to insert. Returns: TableMerger: TableMerger Object - Examples: - - >>> from deltalake import DeltaTable, write_deltalake - >>> import pyarrow as pa - >>> data = pa.table({"x": [1, 2, 3], "y": [4, 5, 6]}) - >>> write_deltalake("tmp", data) - >>> dt = DeltaTable("tmp") - >>> new_data = pa.table({"x": [4], "y": [7]}) - >>> ( - ... dt.merge( - ... source=new_data, - ... predicate='target.x = source.x', - ... source_alias='source', - ... target_alias='target') - ... .when_not_matched_insert( - ... updates = { - ... "x": "source.x", - ... "y": "source.y", - ... }) - ... .execute() - ... ) + Example: + ```python + from deltalake import DeltaTable, write_deltalake + import pyarrow as pa + + data = pa.table({"x": [1, 2, 3], "y": [4, 5, 6]}) + write_deltalake("tmp", data) + dt = DeltaTable("tmp") + new_data = pa.table({"x": [4], "y": [7]}) + + ( + dt.merge( + source=new_data, + predicate="target.x = source.x", + source_alias="source", + target_alias="target",) + .when_not_matched_insert( + updates={ + "x": "source.x", + "y": "source.y", + }) + .execute() + ) {'num_source_rows': 1, 'num_target_rows_inserted': 1, 'num_target_rows_updated': 0, 'num_target_rows_deleted': 0, 'num_target_rows_copied': 3, 'num_output_rows': 4, 'num_target_files_added': 1, 'num_target_files_removed': 1, 'execution_time_ms': ..., 'scan_time_ms': ..., 'rewrite_time_ms': ...} - >>> dt.to_pandas().sort_values("x", ignore_index=True) + + dt.to_pandas().sort_values("x", ignore_index=True) x y 0 1 4 1 2 5 2 3 6 3 4 7 - + ``` """ if isinstance(self.not_matched_insert_updates, list) and isinstance( @@ -1223,36 +1249,39 @@ def when_not_matched_insert_all( the new row to be inserted. Args: - predicate (str | None, Optional): SQL like predicate on when to insert. Defaults to None. + predicate: SQL like predicate on when to insert. Returns: TableMerger: TableMerger Object - Examples: - - >>> from deltalake import DeltaTable, write_deltalake - >>> import pyarrow as pa - >>> data = pa.table({"x": [1, 2, 3], "y": [4, 5, 6]}) - >>> write_deltalake("tmp", data) - >>> dt = DeltaTable("tmp") - >>> new_data = pa.table({"x": [4], "y": [7]}) - >>> ( - ... dt.merge( - ... source=new_data, - ... predicate='target.x = source.x', - ... source_alias='source', - ... target_alias='target') - ... .when_not_matched_insert_all() - ... .execute() - ... 
) + Example: + ```python + from deltalake import DeltaTable, write_deltalake + import pyarrow as pa + + data = pa.table({"x": [1, 2, 3], "y": [4, 5, 6]}) + write_deltalake("tmp", data) + dt = DeltaTable("tmp") + new_data = pa.table({"x": [4], "y": [7]}) + + ( + dt.merge( + source=new_data, + predicate='target.x = source.x', + source_alias='source', + target_alias='target') + .when_not_matched_insert_all() + .execute() + ) {'num_source_rows': 1, 'num_target_rows_inserted': 1, 'num_target_rows_updated': 0, 'num_target_rows_deleted': 0, 'num_target_rows_copied': 3, 'num_output_rows': 4, 'num_target_files_added': 1, 'num_target_files_removed': 1, 'execution_time_ms': ..., 'scan_time_ms': ..., 'rewrite_time_ms': ...} - >>> dt.to_pandas().sort_values("x", ignore_index=True) + + dt.to_pandas().sort_values("x", ignore_index=True) x y 0 1 4 1 2 5 2 3 6 3 4 7 - + ``` """ src_alias = (self.source_alias + ".") if self.source_alias is not None else "" @@ -1279,38 +1308,41 @@ def when_not_matched_by_source_update( If a ``predicate`` is specified, then it must evaluate to true for the row to be updated. Args: - updates (dict): a mapping of column name to update SQL expression. - predicate (str | None, Optional): SQL like predicate on when to update. Defaults to None. + updates: a mapping of column name to update SQL expression. + predicate: SQL like predicate on when to update. Returns: TableMerger: TableMerger Object - Examples: - - >>> from deltalake import DeltaTable, write_deltalake - >>> import pyarrow as pa - >>> data = pa.table({"x": [1, 2, 3], "y": [4, 5, 6]}) - >>> write_deltalake("tmp", data) - >>> dt = DeltaTable("tmp") - >>> new_data = pa.table({"x": [2, 3, 4]}) - >>> ( - ... dt.merge( - ... source=new_data, - ... predicate='target.x = source.x', - ... source_alias='source', - ... target_alias='target') - ... .when_not_matched_by_source_update( - ... predicate = "y > 3", - ... updates = {"y": "0"}) - ... .execute() - ... ) + Example: + ```python + from deltalake import DeltaTable, write_deltalake + import pyarrow as pa + + data = pa.table({"x": [1, 2, 3], "y": [4, 5, 6]}) + write_deltalake("tmp", data) + dt = DeltaTable("tmp") + new_data = pa.table({"x": [2, 3, 4]}) + + ( + dt.merge( + source=new_data, + predicate='target.x = source.x', + source_alias='source', + target_alias='target') + .when_not_matched_by_source_update( + predicate = "y > 3", + updates = {"y": "0"}) + .execute() + ) {'num_source_rows': 3, 'num_target_rows_inserted': 0, 'num_target_rows_updated': 1, 'num_target_rows_deleted': 0, 'num_target_rows_copied': 2, 'num_output_rows': 3, 'num_target_files_added': 1, 'num_target_files_removed': 1, 'execution_time_ms': ..., 'scan_time_ms': ..., 'rewrite_time_ms': ...} - >>> dt.to_pandas().sort_values("x", ignore_index=True) + + dt.to_pandas().sort_values("x", ignore_index=True) x y 0 1 0 1 2 5 2 3 6 - + ``` """ if isinstance(self.not_matched_by_source_update_updates, list) and isinstance( @@ -1330,7 +1362,7 @@ def when_not_matched_by_source_delete( ``predicate`` (if specified) is true for the target row. Args: - predicate (str | None, Optional): SQL like predicate on when to delete when not matched by source. Defaults to None. + predicate: SQL like predicate on when to delete when not matched by source. Returns: TableMerger: TableMerger Object @@ -1354,7 +1386,7 @@ def execute(self) -> Dict[str, Any]: """Executes `MERGE` with the previously provided settings in Rust with Apache Datafusion query engine. 
Returns: - Dict[str, Any]: metrics + Dict: metrics """ metrics = self.table._table.merge_execute( source=self.source, @@ -1434,19 +1466,21 @@ def compact( Returns: the metrics from optimize - Examples: + Example: Use a timedelta object to specify the seconds, minutes or hours of the interval. + ```python + from deltalake import DeltaTable, write_deltalake + from datetime import timedelta + import pyarrow as pa - >>> from deltalake import DeltaTable, write_deltalake - >>> from datetime import timedelta - >>> import pyarrow as pa - >>> write_deltalake("tmp", pa.table({"x": [1], "y": [4]})) - >>> write_deltalake("tmp", pa.table({"x": [2], "y": [5]}), mode="append") - >>> dt = DeltaTable("tmp") - >>> time_delta = timedelta(minutes=10) - >>> dt.optimize.compact(min_commit_interval=time_delta) - {'numFilesAdded': 1, 'numFilesRemoved': 2, 'filesAdded': ..., 'filesRemoved': ..., 'partitionsOptimized': 1, 'numBatches': 2, 'totalConsideredFiles': 2, 'totalFilesSkipped': 0, 'preserveInsertionOrder': True} + write_deltalake("tmp", pa.table({"x": [1], "y": [4]})) + write_deltalake("tmp", pa.table({"x": [2], "y": [5]}), mode="append") + dt = DeltaTable("tmp") + time_delta = timedelta(minutes=10) + dt.optimize.compact(min_commit_interval=time_delta) + {'numFilesAdded': 1, 'numFilesRemoved': 2, 'filesAdded': ..., 'filesRemoved': ..., 'partitionsOptimized': 1, 'numBatches': 2, 'totalConsideredFiles': 2, 'totalFilesSkipped': 0, 'preserveInsertionOrder': True} + ``` """ if isinstance(min_commit_interval, timedelta): min_commit_interval = int(min_commit_interval.total_seconds()) @@ -1488,19 +1522,21 @@ def z_order( Returns: the metrics from optimize - Examples: + Example: Use a timedelta object to specify the seconds, minutes or hours of the interval. + ```python + from deltalake import DeltaTable, write_deltalake + from datetime import timedelta + import pyarrow as pa - >>> from deltalake import DeltaTable, write_deltalake - >>> from datetime import timedelta - >>> import pyarrow as pa - >>> write_deltalake("tmp", pa.table({"x": [1], "y": [4]})) - >>> write_deltalake("tmp", pa.table({"x": [2], "y": [5]}), mode="append") - >>> dt = DeltaTable("tmp") - >>> time_delta = timedelta(minutes=10) - >>> dt.optimize.z_order(["x"], min_commit_interval=time_delta) - {'numFilesAdded': 1, 'numFilesRemoved': 2, 'filesAdded': ..., 'filesRemoved': ..., 'partitionsOptimized': 0, 'numBatches': 1, 'totalConsideredFiles': 2, 'totalFilesSkipped': 0, 'preserveInsertionOrder': True} + write_deltalake("tmp", pa.table({"x": [1], "y": [4]})) + write_deltalake("tmp", pa.table({"x": [2], "y": [5]}), mode="append") + dt = DeltaTable("tmp") + time_delta = timedelta(minutes=10) + dt.optimize.z_order(["x"], min_commit_interval=time_delta) + {'numFilesAdded': 1, 'numFilesRemoved': 2, 'filesAdded': ..., 'filesRemoved': ..., 'partitionsOptimized': 0, 'numBatches': 1, 'totalConsideredFiles': 2, 'totalFilesSkipped': 0, 'preserveInsertionOrder': True} + ``` """ if isinstance(min_commit_interval, timedelta): min_commit_interval = int(min_commit_interval.total_seconds()) diff --git a/python/deltalake/writer.py b/python/deltalake/writer.py index 2b4814f98b..bb69fee457 100644 --- a/python/deltalake/writer.py +++ b/python/deltalake/writer.py @@ -21,7 +21,7 @@ ) from urllib.parse import unquote -from deltalake import Schema +from deltalake import Schema as DeltaSchema from deltalake.fs import DeltaStorageHandler from ._util import encode_partition_value @@ -82,7 +82,7 @@ def write_deltalake( RecordBatchReader, ], *, - schema: Optional[Union[pa.Schema, 
Schema]] = ..., + schema: Optional[Union[pa.Schema, DeltaSchema]] = ..., partition_by: Optional[Union[List[str], str]] = ..., filesystem: Optional[pa_fs.FileSystem] = None, mode: Literal["error", "append", "overwrite", "ignore"] = ..., @@ -116,7 +116,7 @@ def write_deltalake( RecordBatchReader, ], *, - schema: Optional[Union[pa.Schema, Schema]] = ..., + schema: Optional[Union[pa.Schema, DeltaSchema]] = ..., partition_by: Optional[Union[List[str], str]] = ..., mode: Literal["error", "append", "overwrite", "ignore"] = ..., max_rows_per_group: int = ..., @@ -143,7 +143,7 @@ def write_deltalake( RecordBatchReader, ], *, - schema: Optional[Union[pa.Schema, Schema]] = None, + schema: Optional[Union[pa.Schema, DeltaSchema]] = None, partition_by: Optional[Union[List[str], str]] = None, filesystem: Optional[pa_fs.FileSystem] = None, mode: Literal["error", "append", "overwrite", "ignore"] = "error", @@ -231,7 +231,9 @@ def write_deltalake( storage_options: options passed to the native delta filesystem. Unused if 'filesystem' is defined. predicate: When using `Overwrite` mode, replace data that matches a predicate. Only used in rust engine. partition_filters: the partition filters that will be used for partition overwrite. Only used in pyarrow engine. - large_dtypes: If True, the data schema is kept in large_dtypes, has no effect on pandas dataframe input + large_dtypes: If True, the data schema is kept in large_dtypes, has no effect on pandas dataframe input. + engine: writer engine to write the delta table. `Rust` engine is still experimental but you may + see up to 4x performance improvements over pyarrow. """ table, table_uri = try_get_table_and_table_uri(table_or_uri, storage_options) if table is not None: @@ -245,7 +247,7 @@ def write_deltalake( if isinstance(partition_by, str): partition_by = [partition_by] - if isinstance(schema, Schema): + if isinstance(schema, DeltaSchema): schema = schema.to_pyarrow() if isinstance(data, RecordBatchReader):
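The `build_compaction_plan` change at the top of this diff drops every single-file merge bin (counting it in `total_files_skipped`), so re-running optimize on an already-compacted table rewrites nothing. Below is a minimal Python sketch of the same idempotency check, using only the `write_deltalake`, `DeltaTable`, and `optimize.compact` APIs documented in this diff; the table path, the loop data, and the commented metric values are illustrative assumptions, not output from a real run:

```python
import pyarrow as pa
from deltalake import DeltaTable, write_deltalake

# Commit a few small files in separate transactions (hypothetical local path).
for i in range(3):
    write_deltalake("tmp/compaction-demo", pa.table({"x": [i], "y": [i * 10]}), mode="append")

dt = DeltaTable("tmp/compaction-demo")

# First pass packs the small files into a larger one.
first = dt.optimize.compact()
print(first["numFilesAdded"], first["numFilesRemoved"])    # e.g. 1 3

# Second pass sees only single-file bins, skips them (counted in
# 'totalFilesSkipped'), and adds or removes nothing.
second = dt.optimize.compact()
print(second["numFilesAdded"], second["numFilesRemoved"])  # 0 0
```

This mirrors what `test_idempotent_with_multiple_bins` asserts in Rust: the second pass reports zero files added or removed and leaves the table version unchanged.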