Skip to content

Commit

Permalink
chore: set up logging, partition diffs and update pyiceberg
Browse files Browse the repository at this point in the history
  • Loading branch information
JasperHG90 committed Nov 22, 2024
1 parent 38c9bd0 commit 3fea4d8
Show file tree
Hide file tree
Showing 9 changed files with 403 additions and 38 deletions.
2 changes: 1 addition & 1 deletion pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ authors = [
requires-python = ">=3.10"
readme = "README.md"
dependencies = [
"pyiceberg[pyarrow]==0.8.0rc1",
"pyiceberg[pyarrow]>=0.8",
"dagster>=1.8.2",
"pendulum>=3.0.0",
"tenacity>=8.5.0",
Expand Down
62 changes: 53 additions & 9 deletions src/dagster_pyiceberg/_utils/io.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
import logging
from typing import Dict, List, Optional, Sequence, Union

import pyarrow as pa
Expand All @@ -6,18 +7,25 @@
from pyiceberg import expressions as E
from pyiceberg import table as iceberg_table
from pyiceberg.catalog import Catalog
from pyiceberg.exceptions import CommitFailedException, TableAlreadyExistsError
from pyiceberg.exceptions import (
CommitFailedException,
NoSuchTableError,
TableAlreadyExistsError,
)
from pyiceberg.partitioning import PartitionSpec
from pyiceberg.schema import Schema

from dagster_pyiceberg._utils.partitions import (
DagsterPartitionToPyIcebergExpressionMapper,
update_table_partition_spec,
)
from dagster_pyiceberg._utils.properties import update_table_properties
from dagster_pyiceberg._utils.retries import PyIcebergOperationWithRetry
from dagster_pyiceberg._utils.schema import update_table_schema
from dagster_pyiceberg.version import __version__ as dagster_pyiceberg_version

logger = logging.getLogger("dagster_pyiceberg._utils.io")


def table_writer(
table_slice: TableSlice,
Expand Down Expand Up @@ -53,6 +61,9 @@ def table_writer(
"pyiceberg-version": iceberg_version,
"dagster-pyiceberg-version": dagster_pyiceberg_version,
}
logger.debug(
f"Writing data to table {table_path} with properties {base_properties}"
)
# In practice, partition_dimensions is an empty list for unpartitioned assets and not None
# even though it's the default value.
partition_exprs: List[str] | None = None
Expand All @@ -69,31 +80,42 @@ def table_writer(
" 'partition_expr' in the asset metadata?"
)
partition_dimensions = table_slice.partition_dimensions
if catalog.table_exists(table_path):
logger.debug(f"Partition dimensions: {partition_dimensions}")
if table_exists(catalog, table_path):
logger.debug("Updating existing table")
table = catalog.load_table(table_path)
# Check if the table has partition dimensions set
num_partition_fields = len(table.spec().fields)
logger.debug(
f"Current table version has {num_partition_fields} partition fields"
)
# Check if schema matches. If not, update
update_table_schema(
table=table,
new_table_schema=data.schema,
schema_update_mode=schema_update_mode,
)
# Check if partitions match. If not, update
if partition_dimensions is not None:
if (partition_dimensions is not None) or (num_partition_fields > 0):
update_table_partition_spec(
table=table,
# Refresh metadata just in case a partition column was dropped
table=table.refresh(),
table_slice=table_slice,
partition_spec_update_mode=partition_spec_update_mode,
)
if table_properties is not None:
update_table_properties(
table=table,
current_table_properties=table.properties,
new_table_properties=table_properties,
)
else:
logger.debug("Creating new table")
table = create_table_if_not_exists(
catalog=catalog,
table_path=table_path,
schema=data.schema,
properties=(
table_properties | base_properties
if table_properties is not None
else base_properties
),
properties=(table_properties if table_properties is not None else {}),
)
if partition_dimensions is not None:
update_table_partition_spec(
Expand Down Expand Up @@ -145,6 +167,27 @@ def get_expression_row_filter(
)


def table_exists(catalog: Catalog, table_path: str) -> bool:
"""Checks if a table exists in the iceberg catalog
NB: This is custom logic because for some reason, the PyIceberg REST implementation
doesn't seem to work properly for catalog.table_exists(table_path). This is a workaround
so that users don't run into strange issues when updating an existing table.
Args:
catalog (Catalog): PyIceberg catalogs supported by this library
table_path (str): Table path
Returns:
bool: True if the table exists, False otherwise
"""
try:
catalog.load_table(table_path)
return True
except NoSuchTableError:
return False


def create_table_if_not_exists(
catalog: Catalog,
table_path: str,
Expand Down Expand Up @@ -225,6 +268,7 @@ def operation(
overwrite_filter: Union[E.BooleanExpression, str],
snapshot_properties: Optional[Dict[str, str]] = None,
):
self.logger.debug(f"Overwriting table with filter: {overwrite_filter}")
self.table.overwrite(
df=data,
overwrite_filter=overwrite_filter,
Expand Down
16 changes: 14 additions & 2 deletions src/dagster_pyiceberg/_utils/partitions.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
import datetime as dt
import itertools
import logging
from abc import abstractmethod
from typing import (
Dict,
Expand Down Expand Up @@ -227,6 +228,7 @@ def update_table_partition_spec(
class PyIcebergPartitionSpecUpdaterWithRetry(PyIcebergOperationWithRetry):

def operation(self, table_slice: TableSlice, partition_spec_update_mode: str):
self.logger.debug("Updating table partition spec")
IcebergTableSpecUpdater(
partition_mapping=PartitionMapper(
table_slice=table_slice,
Expand Down Expand Up @@ -399,15 +401,19 @@ def new(self) -> List[TablePartitionDimension]:
"""Retrieve partition dimensions that are not yet present in the iceberg table."""
return [
p
for p in self.get_table_slice_partition_dimensions()
for p in self.get_table_slice_partition_dimensions(
allow_empty_dagster_partitions=True
)
if p.partition_expr in self.new_partition_field_names
]

def updated(self) -> List[TablePartitionDimension]:
"""Retrieve partition dimensions that have been updated."""
return [
p
for p in self.get_table_slice_partition_dimensions()
for p in self.get_table_slice_partition_dimensions(
allow_empty_dagster_partitions=True
)
if p.partition_expr == self.updated_dagster_time_partition_field
]

Expand All @@ -429,6 +435,9 @@ def __init__(
):
self.partition_spec_update_mode = partition_spec_update_mode
self.partition_mapping = partition_mapping
self.logger = logging.getLogger(
"dagster_pyiceberg._utils.partitions.IcebergTableSpecUpdater"
)

def _changes(
self,
Expand All @@ -444,13 +453,16 @@ def _spec_update(self, update: UpdateSpec, partition: TablePartitionDimension):
self._spec_new(update=update, partition=partition)

def _spec_delete(self, update: UpdateSpec, partition_name: str):
self.logger.debug("Removing partition column: %s", partition_name)
update.remove_field(name=partition_name)

def _spec_new(self, update: UpdateSpec, partition: TablePartitionDimension):
if isinstance(partition.partitions, TimeWindow):
transform = diff_to_transformation(*partition.partitions)
else:
transform = IdentityTransform()
self.logger.debug("Setting new partition column: %s", partition.partition_expr)
self.logger.debug("Using transform: %s", transform)
update.add_field(
source_column_name=partition.partition_expr,
transform=transform,
Expand Down
104 changes: 104 additions & 0 deletions src/dagster_pyiceberg/_utils/properties.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,104 @@
import logging
from functools import cached_property
from typing import List

from pyiceberg import table
from pyiceberg.exceptions import CommitFailedException

from dagster_pyiceberg._utils.retries import PyIcebergOperationWithRetry


def update_table_properties(
table: table.Table, current_table_properties: dict, new_table_properties: dict
):
PyIcebergPropertiesUpdaterWithRetry(table=table).execute(
retries=3,
exception_types=CommitFailedException,
current_table_properties=current_table_properties,
new_table_properties=new_table_properties,
)


class PyIcebergPropertiesUpdaterWithRetry(PyIcebergOperationWithRetry):

def operation(self, current_table_properties: dict, new_table_properties: dict):
IcebergTablePropertiesUpdater(
table_properties_differ=TablePropertiesDiffer(
current_table_properties=current_table_properties,
new_table_properties=new_table_properties,
),
).update_table_properties(self.table, table_properties=new_table_properties)


class TablePropertiesDiffer:

def __init__(self, current_table_properties: dict, new_table_properties: dict):
self.current_table_properties = current_table_properties
self.new_table_properties = new_table_properties

@property
def has_changes(self) -> bool:
return (
not (
len(self.updated_properties)
+ len(self.deleted_properties)
+ len(self.new_properties)
)
== 0
)

@cached_property
def updated_properties(self) -> List[str]:
updated = []
for k in self.new_table_properties.keys():
if (
k in self.current_table_properties
and self.current_table_properties[k] != self.new_table_properties[k]
):
updated.append(k)
return updated

@cached_property
def deleted_properties(self) -> List[str]:
return list(
set(self.current_table_properties.keys())
- set(self.new_table_properties.keys())
)

@cached_property
def new_properties(self) -> List[str]:
return list(
set(self.new_table_properties.keys())
- set(self.current_table_properties.keys())
)


class IcebergTablePropertiesUpdater:

def __init__(
self,
table_properties_differ: TablePropertiesDiffer,
):
self.table_properties_differ = table_properties_differ
self.logger = logging.getLogger(
"dagster_pyiceberg._utils.schema.IcebergTablePropertiesUpdater"
)

@property
def deleted_properties(self):
return self.table_properties_differ.deleted_properties

def update_table_properties(self, table: table.Table, table_properties: dict):
if not self.table_properties_differ.has_changes:
return
else:
self.logger.debug("Updating table properties")
with table.transaction() as tx:
self.logger.debug(
f"Deleting table properties '{self.deleted_properties}'"
)
tx.remove_properties(*self.deleted_properties)
self.logger.debug(
f"Updating table properties if applicable using '{table_properties}'"
)
tx.set_properties(table_properties)
4 changes: 4 additions & 0 deletions src/dagster_pyiceberg/_utils/retries.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
import logging
from abc import ABCMeta, abstractmethod
from typing import Tuple

Expand All @@ -18,6 +19,9 @@ class PyIcebergOperationWithRetry(metaclass=ABCMeta):

def __init__(self, table: Table):
self.table = table
self.logger = logging.getLogger(
f"dagster_pyiceberg._utils.{self.__class__.__name__}"
)

@abstractmethod
def operation(self, *args, **kwargs): ...
Expand Down
9 changes: 8 additions & 1 deletion src/dagster_pyiceberg/_utils/schema.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
import logging
from functools import cached_property
from typing import List

Expand All @@ -16,7 +17,6 @@ def update_table_schema(
new_table_schema=new_table_schema,
schema_update_mode=schema_update_mode,
)
new_table_schema.names


class PyIcebergSchemaUpdaterWithRetry(PyIcebergOperationWithRetry):
Expand Down Expand Up @@ -65,6 +65,9 @@ def __init__(
):
self.schema_update_mode = schema_update_mode
self.schema_differ = schema_differ
self.logger = logging.getLogger(
"dagster_pyiceberg._utils.schema.IcebergTableSchemaUpdater"
)

def update_table_schema(self, table: table.Table):
if self.schema_update_mode == "error" and self.schema_differ.has_changes:
Expand All @@ -76,6 +79,10 @@ def update_table_schema(self, table: table.Table):
else:
with table.update_schema() as update:
for column in self.schema_differ.deleted_columns:
self.logger.debug(f"Deleting column '{column}' from schema")
update.delete_column(column)
if self.schema_differ.new_columns:
self.logger.debug(
f"Merging schemas with new columns {self.schema_differ.new_columns}"
)
update.union_by_name(self.schema_differ.new_table_schema)
Loading

0 comments on commit 3fea4d8

Please sign in to comment.