Commit

Merge branch 'main' into fix-write-to-fuse

ion-elgreco authored Dec 2, 2023
2 parents 33b7583 + 71586b1 commit 8da7d52
Showing 20 changed files with 621 additions and 437 deletions.
5 changes: 0 additions & 5 deletions .github/workflows/python_build.yml
@@ -127,11 +127,6 @@ jobs:
python -m pytest -m "not pandas and not integration and not benchmark"
pip install pandas
- name: Build Sphinx documentation
run: |
source venv/bin/activate
make build-documentation
benchmark:
name: Python Benchmark
runs-on: ubuntu-latest
12 changes: 8 additions & 4 deletions crates/deltalake-core/src/operations/optimize.rs
@@ -900,10 +900,14 @@ fn build_compaction_plan(

// Prune merge bins with only 1 file, since they have no effect
for (_, bins) in operations.iter_mut() {
-    if bins.len() == 1 && bins[0].len() == 1 {
-        metrics.total_files_skipped += 1;
-        bins.clear();
-    }
+    bins.retain(|bin| {
+        if bin.len() == 1 {
+            metrics.total_files_skipped += 1;
+            false
+        } else {
+            true
+        }
+    })
}
operations.retain(|_, files| !files.is_empty());

65 changes: 65 additions & 0 deletions crates/deltalake-core/tests/command_optimize.rs
@@ -508,6 +508,71 @@ async fn test_idempotent_metrics() -> Result<(), Box<dyn Error>> {
Ok(())
}

#[tokio::test]
/// Validate that packing into multiple bins is idempotent.
async fn test_idempotent_with_multiple_bins() -> Result<(), Box<dyn Error>> {
//TODO: Compression makes it hard to get the target file size...
//Maybe just commit files with a known size
let context = setup_test(true).await?;
let mut dt = context.table;
let mut writer = RecordBatchWriter::for_table(&dt)?;

write(
&mut writer,
&mut dt,
generate_random_batch(records_for_size(6_000_000), "2022-05-22")?,
)
.await?;
write(
&mut writer,
&mut dt,
generate_random_batch(records_for_size(3_000_000), "2022-05-22")?,
)
.await?;
write(
&mut writer,
&mut dt,
generate_random_batch(records_for_size(6_000_000), "2022-05-22")?,
)
.await?;
write(
&mut writer,
&mut dt,
generate_random_batch(records_for_size(3_000_000), "2022-05-22")?,
)
.await?;
write(
&mut writer,
&mut dt,
generate_random_batch(records_for_size(9_900_000), "2022-05-22")?,
)
.await?;

let version = dt.version();

let filter = vec![PartitionFilter::try_from(("date", "=", "2022-05-22"))?];

let optimize = DeltaOps(dt)
.optimize()
.with_filters(&filter)
.with_target_size(10_000_000);
let (dt, metrics) = optimize.await?;
assert_eq!(metrics.num_files_added, 2);
assert_eq!(metrics.num_files_removed, 4);
assert_eq!(dt.version(), version + 1);

let optimize = DeltaOps(dt)
.optimize()
.with_filters(&filter)
.with_target_size(10_000_000);
let (dt, metrics) = optimize.await?;
assert_eq!(metrics.num_files_added, 0);
assert_eq!(metrics.num_files_removed, 0);
assert_eq!(dt.version(), version + 1);

Ok(())
}

#[tokio::test]
/// Validate operation data and metadata was written
async fn test_commit_info() -> Result<(), Box<dyn Error>> {
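
For reference, the user-facing behaviour this test pins down can also be sketched from Python: a second compaction pass over an already-compacted table should rewrite nothing, because single-file bins are now skipped. This is only an illustrative sketch with a hypothetical local table path, not part of the diff:

```python
from deltalake import DeltaTable

# Hypothetical table containing several small files per partition.
dt = DeltaTable("observation_data")

# First pass: compact small files toward a ~10 MB target size.
first = dt.optimize.compact(target_size=10_000_000)
print(first["numFilesAdded"], first["numFilesRemoved"])

# Second pass: every remaining bin holds a single file, so nothing is rewritten.
second = dt.optimize.compact(target_size=10_000_000)
assert second["numFilesAdded"] == 0
assert second["numFilesRemoved"] == 0
```
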
1 change: 1 addition & 0 deletions docs/api/catalog.md
@@ -0,0 +1 @@
::: deltalake.data_catalog.DataCatalog
10 changes: 0 additions & 10 deletions docs/api/delta_table.md

This file was deleted.

5 changes: 5 additions & 0 deletions docs/api/delta_table/delta_table_merger.md
@@ -0,0 +1,5 @@
# TableMerger

::: deltalake.table.TableMerger
options:
show_root_heading: true
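
As a rough illustration of the API this page documents (hypothetical table path and source data, not taken from the diff), a merge built with `TableMerger` typically chains match clauses before `execute()`:

```python
import pyarrow as pa
from deltalake import DeltaTable

dt = DeltaTable("tmp/some-table")  # hypothetical existing table
source = pa.table({"id": [1, 4], "value": [10, 40]})

(
    dt.merge(
        source=source,
        predicate="target.id = source.id",
        source_alias="source",
        target_alias="target",
    )
    .when_matched_update_all()
    .when_not_matched_insert_all()
    .execute()
)
```
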
5 changes: 5 additions & 0 deletions docs/api/delta_table/delta_table_optimizer.md
@@ -0,0 +1,5 @@
# TableOptimizer

::: deltalake.table.TableOptimizer
options:
show_root_heading: true
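
For illustration, the `TableOptimizer` methods are reached through the `DeltaTable.optimize` property; a sketch with a hypothetical table path and column names:

```python
from deltalake import DeltaTable

dt = DeltaTable("tmp/some-table")  # hypothetical table path

# Compact small files, optionally restricted to a single partition.
dt.optimize.compact(partition_filters=[("date", "=", "2021-01-05")])

# Or cluster the data files by one or more columns with Z-ordering.
dt.optimize.z_order(["timestamp"])
```
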
5 changes: 5 additions & 0 deletions docs/api/delta_table/index.md
@@ -0,0 +1,5 @@
# DeltaTable

::: deltalake.DeltaTable
options:
show_root_heading: true
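
A minimal sketch of loading and inspecting a table with the `DeltaTable` class documented here (hypothetical path):

```python
from deltalake import DeltaTable

dt = DeltaTable("tmp/some-table")  # hypothetical table path

print(dt.version())     # current table version
print(len(dt.files()))  # number of data files in this version
df = dt.to_pandas()     # read the table into a pandas DataFrame
```
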
6 changes: 6 additions & 0 deletions docs/api/delta_table/metadata.md
@@ -0,0 +1,6 @@
# Metadata

::: deltalake.Metadata
options:
show_root_heading: true
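
For context, the `Metadata` object is obtained from an existing table; a small sketch with a hypothetical path:

```python
from deltalake import DeltaTable

dt = DeltaTable("tmp/some-table")  # hypothetical table path
meta = dt.metadata()

print(meta.id)                 # unique table id
print(meta.partition_columns)  # e.g. ["date"]
print(meta.configuration)      # table properties as a dict
```
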

7 changes: 7 additions & 0 deletions docs/api/delta_writer.md
@@ -0,0 +1,7 @@
# Writer
## Write to Delta Tables

::: deltalake.write_deltalake

## Convert to Delta Tables
::: deltalake.convert_to_deltalake
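
As a rough sketch of the two entry points listed above (hypothetical paths and data, not taken from the diff):

```python
import pyarrow as pa
from deltalake import write_deltalake, convert_to_deltalake

# Write a new Delta table, or append to an existing one.
data = pa.table({"id": [1, 2, 3]})
write_deltalake("tmp/new-table", data, mode="append")

# Convert an existing Parquet directory into a Delta table in place.
convert_to_deltalake("tmp/existing-parquet-dir")
```
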
6 changes: 6 additions & 0 deletions docs/api/exceptions.md
@@ -0,0 +1,6 @@
# Exceptions

::: deltalake.exceptions.DeltaError
::: deltalake.exceptions.DeltaProtocolError
::: deltalake.exceptions.TableNotFoundError
::: deltalake.exceptions.CommitFailedError
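
A brief sketch of how these exception classes surface in practice (hypothetical path; `DeltaError` is the common base class the others derive from):

```python
from deltalake import DeltaTable
from deltalake.exceptions import DeltaError, TableNotFoundError

try:
    dt = DeltaTable("tmp/does-not-exist")  # hypothetical missing table
except TableNotFoundError:
    print("no Delta table at that location")
except DeltaError as err:
    # Any other Delta failure (protocol violations, failed commits, ...).
    print(f"Delta operation failed: {err}")
```
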
23 changes: 9 additions & 14 deletions docs/api/schema.md
@@ -1,39 +1,34 @@
## Delta Lake Schemas

## Schema and field
Schemas, fields, and data types are provided in the ``deltalake.schema`` submodule.

::: deltalake.schema.Schema
::: deltalake.Schema
options:
show_root_heading: true
show_root_toc_entry: true

::: deltalake.schema.PrimitiveType
::: deltalake.Field
options:
show_root_heading: true
show_root_toc_entry: true

::: deltalake.schema.ArrayType

## Data types
::: deltalake.schema.PrimitiveType
options:
show_root_heading: true
show_root_toc_entry: true

::: deltalake.schema.MapType
::: deltalake.schema.ArrayType
options:
show_root_heading: true
show_root_toc_entry: true

::: deltalake.schema.Field
::: deltalake.schema.MapType
options:
show_root_heading: true
show_root_toc_entry: true

::: deltalake.schema.StructType
options:
show_root_heading: true
show_root_toc_entry: true

::: deltalake.data_catalog

## Delta Storage Handler

::: deltalake.fs
show_root_toc_entry: true
6 changes: 4 additions & 2 deletions docs/api/storage.md
@@ -1,3 +1,5 @@
-## Delta Storage Handler
+# Storage

-::: deltalake.fs
+The delta filesystem handler for the pyarrow engine writer.
+
+::: deltalake.fs.DeltaStorageHandler
2 changes: 1 addition & 1 deletion docs/usage/appending-overwriting-delta-lake-table.md
@@ -63,7 +63,7 @@ Here are the contents of the Delta table after the overwrite operation:

Overwriting just performs a logical delete. It doesn't physically remove the previous data from storage. Time travel back to the previous version to confirm that the old version of the table is still accessible.

-```
+```python
dt = DeltaTable("tmp/some-table", version=1)

+-------+----------+
24 changes: 12 additions & 12 deletions docs/usage/optimize/small-file-compaction-with-optimize.md
@@ -16,7 +16,7 @@ Let’s start by creating a Delta table with a lot of small files so we can demo

Start by writing a function that generates one thousand rows of random data given a timestamp.

-```
+```python
def record_observations(date: datetime) -> pa.Table:
"""Pulls data for a certain datetime"""
nrows = 1000
@@ -31,7 +31,7 @@ def record_observations(date: datetime) -> pa.Table:

Let’s run this function and observe the output:

-```
+```python
record_observations(datetime(2021, 1, 1, 12)).to_pandas()

date timestamp value
@@ -44,7 +44,7 @@ record_observations(datetime(2021, 1, 1, 12)).to_pandas()

Let’s write 100 hours worth of data to the Delta table.

-```
+```python
# Every hour starting at midnight on 2021-01-01
hours_iter = (datetime(2021, 1, 1) + timedelta(hours=i) for i in itertools.count())

@@ -60,7 +60,7 @@ for timestamp in itertools.islice(hours_iter, 100):

This data was appended to the Delta table in 100 separate transactions, so the table will contain 100 transaction log entries and 100 data files. You can see the number of files with the `files()` method.

-```
+```python
dt = DeltaTable("observation_data")
len(dt.files()) # 100
```
@@ -101,15 +101,15 @@ Each of these Parquet files are tiny - they’re only 10 KB. Let’s see how to

Let’s run the optimize command to compact the existing small files into larger files:

-```
+```python
dt = DeltaTable("observation_data")

dt.optimize()
```

Here’s the output of the command:

-```
+```python
{'numFilesAdded': 5,
'numFilesRemoved': 100,
'filesAdded': {'min': 39000,
@@ -137,7 +137,7 @@ Let’s append some more data to the Delta table and see how we can selectively

Let’s append another 24 hours of data to the Delta table:

-```
+```python
for timestamp in itertools.islice(hours_iter, 24):
write_deltalake(
dt,
@@ -149,7 +149,7 @@ for timestamp in itertools.islice(hours_iter, 24):

We can use `get_add_actions()` to introspect the table state. We can see that `2021-01-06` has only a few hours of data so far, so we don't want to optimize that yet. But `2021-01-05` has all 24 hours of data, so it's ready to be optimized.

-```
+```python
dt.get_add_actions(flatten=True).to_pandas()[
"partition.date"
].value_counts().sort_index()
@@ -164,7 +164,7 @@ dt.get_add_actions(flatten=True).to_pandas()[

To optimize a single partition, you can pass in a `partition_filters` argument specifying which partitions to optimize.

-```
+```python
dt.optimize(partition_filters=[("date", "=", "2021-01-05")])

{'numFilesAdded': 1,
@@ -188,7 +188,7 @@ dt.optimize(partition_filters=[("date", "=", "2021-01-05")])

This optimize operation tombstones 21 small data files and adds one file with all the existing data properly condensed. Let’s take a look at a portion of the `_delta_log/00000000000000000125.json` file, which is the transaction log entry that corresponds with this incremental optimize command.

-```
+```python
{
"remove": {
"path": "date=2021-01-05/part-00000-41178aab-2491-488f-943d-8f03867295ee-c000.snappy.parquet",
@@ -248,13 +248,13 @@ It’s normally a good idea to have a retention period of at least 7 days. For

Let’s run the vacuum command:

-```
+```python
dt.vacuum(retention_hours=0, enforce_retention_duration=False, dry_run=False)
```

The command returns a list of all the files that are removed from storage:

-```
+```python
['date=2021-01-02/39-a98680f2-0e0e-4f26-a491-18b183f9eb05-0.parquet',
'date=2021-01-02/41-e96bc8bb-c571-484c-b534-e897424fb7da-0.parquet',