athena iceberg (#659)
* first iceberg prototype

* enable iceberg tests for athena

* fix iceberg detection

* move athena tests to default sql configs

* fix replace disposition

* fix datatype support

* fix append for merge in iceberg

* fix merge jobs for iceberg

* clean up followup jobs code

* set iceberg tests to merge supported

* fix sql merge syntax for iceberg

* separate regular athena and iceberg tests

* remove some iceberg specific code

* make type mapper table format sensitive

* disable dbt tests for athena iceberg

* updates athena dbt docs

* adds docstring on table format to decorators

---------

Co-authored-by: Marcin Rudolf <[email protected]>
sh-rp and rudolfix authored Oct 16, 2023
1 parent 90ff990 commit 87b210b
Showing 33 changed files with 606 additions and 253 deletions.
1 change: 1 addition & 0 deletions .github/workflows/test_destination_athena.yml
@@ -21,6 +21,7 @@ env:
   RUNTIME__DLTHUB_TELEMETRY_SEGMENT_WRITE_KEY: TLJiyRkGVZGCi2TtjClamXpFcxAA1rSB
   ACTIVE_DESTINATIONS: "[\"athena\"]"
   ALL_FILESYSTEM_DRIVERS: "[\"memory\"]"
+  EXCLUDED_DESTINATION_CONFIGURATIONS: "[\"athena-parquet-staging-iceberg\"]"
 
 jobs:
   get_docs_changes:
94 changes: 94 additions & 0 deletions .github/workflows/test_destination_athena_iceberg.yml
@@ -0,0 +1,94 @@

name: test athena iceberg

on:
  pull_request:
    branches:
      - master
      - devel
  workflow_dispatch:

env:
  DESTINATION__FILESYSTEM__CREDENTIALS__AWS_ACCESS_KEY_ID: AKIAT4QMVMC4J46G55G4
  DESTINATION__FILESYSTEM__CREDENTIALS__AWS_SECRET_ACCESS_KEY: ${{ secrets.AWS_SECRET_ACCESS_KEY }}
  DESTINATION__ATHENA__CREDENTIALS__AWS_ACCESS_KEY_ID: AKIAT4QMVMC4J46G55G4
  DESTINATION__ATHENA__CREDENTIALS__AWS_SECRET_ACCESS_KEY: ${{ secrets.AWS_SECRET_ACCESS_KEY }}
  DESTINATION__ATHENA__CREDENTIALS__REGION_NAME: eu-central-1
  DESTINATION__ATHENA__QUERY_RESULT_BUCKET: s3://dlt-athena-output

  RUNTIME__SENTRY_DSN: https://[email protected]/4504819859914752
  RUNTIME__LOG_LEVEL: ERROR
  RUNTIME__DLTHUB_TELEMETRY_SEGMENT_WRITE_KEY: TLJiyRkGVZGCi2TtjClamXpFcxAA1rSB
  ACTIVE_DESTINATIONS: "[\"athena\"]"
  ALL_FILESYSTEM_DRIVERS: "[\"memory\"]"
  EXCLUDED_DESTINATION_CONFIGURATIONS: "[\"athena-no-staging\"]"

jobs:
  get_docs_changes:
    uses: ./.github/workflows/get_docs_changes.yml
    # Tests that require credentials do not run in forks
    if: ${{ !github.event.pull_request.head.repo.fork }}

  run_loader:
    name: test destination athena iceberg
    needs: get_docs_changes
    if: needs.get_docs_changes.outputs.changes_outside_docs == 'true'
    strategy:
      fail-fast: false
      matrix:
        os: ["ubuntu-latest"]
        # os: ["ubuntu-latest", "macos-latest", "windows-latest"]
    defaults:
      run:
        shell: bash
    runs-on: ${{ matrix.os }}

    steps:

      - name: Check out
        uses: actions/checkout@master

      - name: Setup Python
        uses: actions/setup-python@v4
        with:
          python-version: "3.10.x"

      - name: Install Poetry
        uses: snok/[email protected]
        with:
          virtualenvs-create: true
          virtualenvs-in-project: true
          installer-parallel: true

      - name: Load cached venv
        id: cached-poetry-dependencies
        uses: actions/cache@v3
        with:
          # path: ${{ steps.pip-cache.outputs.dir }}
          path: .venv
          key: venv-${{ runner.os }}-${{ steps.setup-python.outputs.python-version }}-${{ hashFiles('**/poetry.lock') }}-athena

      - name: Install dependencies
        # if: steps.cached-poetry-dependencies.outputs.cache-hit != 'true'
        run: poetry install --no-interaction -E athena

      - run: |
          poetry run pytest tests/load
        if: runner.os != 'Windows'
        name: Run tests Linux/MAC
      - run: |
          poetry run pytest tests/load
        if: runner.os == 'Windows'
        name: Run tests Windows
        shell: cmd

  matrix_job_required_check:
    name: Redshift, PostgreSQL and DuckDB tests
    needs: run_loader
    runs-on: ubuntu-latest
    if: always()
    steps:
      - name: Check matrix job results
        if: contains(needs.*.result, 'failure') || contains(needs.*.result, 'cancelled')
        run: |
          echo "One or more matrix job tests failed or were cancelled. You may need to re-run them." && exit 1
37 changes: 31 additions & 6 deletions dlt/common/destination/reference.py
@@ -4,17 +4,20 @@
 from typing import ClassVar, Final, Optional, NamedTuple, Literal, Sequence, Iterable, Type, Protocol, Union, TYPE_CHECKING, cast, List, ContextManager, Dict, Any
 from contextlib import contextmanager
 import datetime  # noqa: 251
+from copy import deepcopy
 
 from dlt.common import logger
 from dlt.common.exceptions import IdentifierTooLongException, InvalidDestinationReference, UnknownDestinationModule
 from dlt.common.schema import Schema, TTableSchema, TSchemaTables
 from dlt.common.schema.typing import TWriteDisposition
 from dlt.common.schema.exceptions import InvalidDatasetName
+from dlt.common.schema.utils import get_write_disposition, get_table_format
 from dlt.common.configuration import configspec
 from dlt.common.configuration.specs import BaseConfiguration, CredentialsConfiguration
 from dlt.common.configuration.accessors import config
 from dlt.common.destination.capabilities import DestinationCapabilitiesContext
 from dlt.common.schema.utils import is_complete_column
+from dlt.common.schema.exceptions import UnknownTableException
 from dlt.common.storages import FileStorage
 from dlt.common.storages.load_storage import ParsedLoadJobFileName
 from dlt.common.utils import get_module_name
@@ -244,9 +247,8 @@ def restore_file_load(self, file_path: str) -> LoadJob:
         """Finds and restores already started loading job identified by `file_path` if destination supports it."""
         pass
 
-    def get_truncate_destination_table_dispositions(self) -> List[TWriteDisposition]:
-        # in the base job, all replace strategies are treated the same, see filesystem for example
-        return ["replace"]
+    def should_truncate_table_before_load(self, table: TTableSchema) -> bool:
+        return table["write_disposition"] == "replace"
 
     def create_table_chain_completed_followup_jobs(self, table_chain: Sequence[TTableSchema]) -> List[NewLoadJob]:
         """Creates a list of followup jobs that should be executed after a table chain is completed"""
@@ -287,6 +289,21 @@ def _verify_schema(self) -> None:
                 if not is_complete_column(column):
                     logger.warning(f"A column {column_name} in table {table_name} in schema {self.schema.name} is incomplete. It was not bound to the data during normalizations stage and its data type is unknown. Did you add this column manually in code ie. as a merge key?")
 
+    def get_load_table(self, table_name: str, prepare_for_staging: bool = False) -> TTableSchema:
+        if table_name not in self.schema.tables:
+            return None
+        try:
+            # make a copy of the schema so modifications do not affect the original document
+            table = deepcopy(self.schema.tables[table_name])
+            # add write disposition if not specified - in child tables
+            if "write_disposition" not in table:
+                table["write_disposition"] = get_write_disposition(self.schema.tables, table_name)
+            if "table_format" not in table:
+                table["table_format"] = get_table_format(self.schema.tables, table_name)
+            return table
+        except KeyError:
+            raise UnknownTableException(table_name)
+
 
 class WithStateSync(ABC):
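get_load_table is now the single place where a job client resolves effective table hints: child tables inherit write_disposition and table_format from their parents, and the deepcopy keeps the resolved values from leaking back into the stored schema. A hedged usage sketch, with client standing for any concrete JobClientBase and the table names illustrative:

# `client` is any concrete JobClientBase instance; table names are illustrative
table = client.get_load_table("events__payload")
if table is not None:
    # both hints are filled in even if only the root "events" table declared them
    print(table["write_disposition"], table.get("table_format"))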

@@ -309,15 +326,23 @@ class WithStagingDataset(ABC):
     """Adds capability to use staging dataset and request it from the loader"""
 
     @abstractmethod
-    def get_stage_dispositions(self) -> List[TWriteDisposition]:
-        """Returns a list of write dispositions that require staging dataset"""
-        return []
+    def should_load_data_to_staging_dataset(self, table: TTableSchema) -> bool:
+        return False
 
     @abstractmethod
     def with_staging_dataset(self) -> ContextManager["JobClientBase"]:
         """Executes job client methods on staging dataset"""
         return self  # type: ignore
 
+
+class SupportsStagingDestination():
+    """Adds capability to support a staging destination for the load"""
+
+    def should_load_data_to_staging_dataset_on_staging_destination(self, table: TTableSchema) -> bool:
+        return False
+
+    def should_truncate_table_before_load_on_staging_destination(self, table: TTableSchema) -> bool:
+        # the default is to truncate the tables on the staging destination...
+        return True
+
 TDestinationReferenceArg = Union["DestinationReference", ModuleType, None, str]
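SupportsStagingDestination is a plain mixin rather than an ABC, so destinations opt in by subclassing and override only what differs from the defaults. A minimal sketch of such an override (class name and policy hypothetical):

from dlt.common.destination.reference import SupportsStagingDestination
from dlt.common.schema import TTableSchema

class MyStagingAwareClient(SupportsStagingDestination):  # hypothetical
    def should_truncate_table_before_load_on_staging_destination(self, table: TTableSchema) -> bool:
        # e.g. keep iceberg staging tables intact so followup merge jobs can read them
        return table.get("table_format") != "iceberg"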
5 changes: 5 additions & 0 deletions dlt/common/schema/exceptions.py
@@ -69,3 +69,8 @@ def __init__(self, schema_name: str, init_engine: int, from_engine: int, to_engine: int) -> None:
         self.from_engine = from_engine
         self.to_engine = to_engine
         super().__init__(f"No engine upgrade path in schema {schema_name} from {init_engine} to {to_engine}, stopped at {from_engine}")
+
+class UnknownTableException(SchemaException):
+    def __init__(self, table_name: str) -> None:
+        self.table_name = table_name
+        super().__init__(f"Trying to access unknown table {table_name}.")
2 changes: 2 additions & 0 deletions dlt/common/schema/typing.py
@@ -24,6 +24,7 @@
 TColumnHint = Literal["not_null", "partition", "cluster", "primary_key", "foreign_key", "sort", "unique", "root_key", "merge_key"]
 """Known hints of a column used to declare hint regexes."""
 TWriteDisposition = Literal["skip", "append", "replace", "merge"]
+TTableFormat = Literal["iceberg"]
 TTypeDetections = Literal["timestamp", "iso_timestamp", "large_integer", "hexbytes_to_text", "wei_to_double"]
 TTypeDetectionFunc = Callable[[Type[Any], Any], Optional[TDataType]]
 TColumnNames = Union[str, Sequence[str]]
@@ -86,6 +87,7 @@ class TTableSchema(TypedDict, total=False):
     filters: Optional[TRowFilters]
     columns: TTableSchemaColumns
     resource: Optional[str]
+    table_format: Optional[TTableFormat]
 
 
 class TPartialTableSchema(TTableSchema):
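With table_format on TTableSchema (and, per the commit message, documented on the decorators), a resource can request iceberg directly. A sketch, assuming the @dlt.resource decorator forwards the new table_format hint:

import dlt

@dlt.resource(write_disposition="merge", primary_key="id", table_format="iceberg")
def events():
    yield {"id": 1, "value": "a"}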
36 changes: 25 additions & 11 deletions dlt/common/schema/utils.py
@@ -15,10 +15,10 @@
 from dlt.common.validation import TCustomValidator, validate_dict, validate_dict_ignoring_xkeys
 from dlt.common.schema import detections
 from dlt.common.schema.typing import (COLUMN_HINTS, SCHEMA_ENGINE_VERSION, LOADS_TABLE_NAME, SIMPLE_REGEX_PREFIX, VERSION_TABLE_NAME, TColumnName, TPartialTableSchema, TSchemaTables, TSchemaUpdate,
-    TSimpleRegex, TStoredSchema, TTableSchema, TTableSchemaColumns, TColumnSchemaBase, TColumnSchema, TColumnProp,
+    TSimpleRegex, TStoredSchema, TTableSchema, TTableSchemaColumns, TColumnSchemaBase, TColumnSchema, TColumnProp, TTableFormat,
     TColumnHint, TTypeDetectionFunc, TTypeDetections, TWriteDisposition)
 from dlt.common.schema.exceptions import (CannotCoerceColumnException, ParentTableNotFoundException, SchemaEngineNoUpgradePathException, SchemaException,
-    TablePropertiesConflictException, InvalidSchemaName)
+    TablePropertiesConflictException, InvalidSchemaName, UnknownTableException)
 
 from dlt.common.normalizers.utils import import_normalizers
 from dlt.common.schema.typing import TAnySchemaColumns
@@ -493,18 +493,29 @@ def merge_schema_updates(schema_updates: Sequence[TSchemaUpdate]) -> TSchemaTables:
     return aggregated_update
 
 
-def get_write_disposition(tables: TSchemaTables, table_name: str) -> TWriteDisposition:
-    """Returns write disposition of a table if present. If not, looks up into parent table"""
-    table = tables[table_name]
-    w_d = table.get("write_disposition")
-    if w_d:
-        return w_d
+def get_inherited_table_hint(tables: TSchemaTables, table_name: str, table_hint_name: str, allow_none: bool = False) -> Any:
+    table = tables.get(table_name, {})
+    hint = table.get(table_hint_name)
+    if hint:
+        return hint
 
     parent = table.get("parent")
     if parent:
-        return get_write_disposition(tables, parent)
+        return get_inherited_table_hint(tables, parent, table_hint_name, allow_none)
+
+    if allow_none:
+        return None
+
+    raise ValueError(f"No table hint '{table_hint_name}' found in the chain of tables for '{table_name}'.")
 
 
-    raise ValueError(f"No write disposition found in the chain of tables for '{table_name}'.")
+def get_write_disposition(tables: TSchemaTables, table_name: str) -> TWriteDisposition:
+    """Returns the write disposition of a table if present. If not, looks it up in the parent table."""
+    return cast(TWriteDisposition, get_inherited_table_hint(tables, table_name, "write_disposition", allow_none=False))
+
+
+def get_table_format(tables: TSchemaTables, table_name: str) -> TTableFormat:
+    return cast(TTableFormat, get_inherited_table_hint(tables, table_name, "table_format", allow_none=True))
 
 
 def table_schema_has_type(table: TTableSchema, _typ: TDataType) -> bool:
@@ -637,7 +648,8 @@ def new_table(
     write_disposition: TWriteDisposition = None,
     columns: Sequence[TColumnSchema] = None,
     validate_schema: bool = False,
-    resource: str = None
+    resource: str = None,
+    table_format: TTableFormat = None
 ) -> TTableSchema:
 
     table: TTableSchema = {
@@ -652,6 +664,8 @@
     # set write disposition only for root tables
     table["write_disposition"] = write_disposition or DEFAULT_WRITE_DISPOSITION
     table["resource"] = resource or table_name
+    if table_format:
+        table["table_format"] = table_format
     if validate_schema:
         validate_dict_ignoring_xkeys(
             spec=TColumnSchema,
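The generalized get_inherited_table_hint walks the parent chain once for any hint, with get_write_disposition and get_table_format as thin typed wrappers over it. A hedged sketch of the expected resolution, assuming new_table's parent_table_name argument as in dlt.common.schema.utils:

from dlt.common.schema.utils import new_table, get_table_format, get_write_disposition

tables = {
    "events": new_table("events", write_disposition="merge", table_format="iceberg"),
    # the child declares neither hint, so both resolve through its parent
    "events__payload": new_table("events__payload", parent_table_name="events"),
}
assert get_write_disposition(tables, "events__payload") == "merge"
assert get_table_format(tables, "events__payload") == "iceberg"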
5 changes: 4 additions & 1 deletion dlt/common/storages/load_storage.py
@@ -237,8 +237,11 @@ def list_failed_jobs(self, load_id: str) -> Sequence[str]:
         return self.storage.list_folder_files(self._get_job_folder_path(load_id, LoadStorage.FAILED_JOBS_FOLDER))
 
     def list_jobs_for_table(self, load_id: str, table_name: str) -> Sequence[LoadJobInfo]:
+        return [job for job in self.list_all_jobs(load_id) if job.job_file_info.table_name == table_name]
+
+    def list_all_jobs(self, load_id: str) -> Sequence[LoadJobInfo]:
         info = self.get_load_package_info(load_id)
-        return [job for job in flatten_list_or_items(iter(info.jobs.values())) if job.job_file_info.table_name == table_name]  # type: ignore
+        return [job for job in flatten_list_or_items(iter(info.jobs.values()))]  # type: ignore
 
     def list_completed_failed_jobs(self, load_id: str) -> Sequence[str]:
         return self.storage.list_folder_files(self._get_job_folder_completed_path(load_id, LoadStorage.FAILED_JOBS_FOLDER))
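list_all_jobs now owns the package traversal and list_jobs_for_table is just a filter over it, which the followup-jobs cleanup in this PR builds on. Usage stays the same (a sketch; load_storage is assumed to be an initialized LoadStorage and load_id an existing package id):

# `load_storage` / `load_id` are assumed to exist in the calling context
all_jobs = load_storage.list_all_jobs(load_id)
events_jobs = load_storage.list_jobs_for_table(load_id, "events")
# the per-table listing is a strict subset of the full listing
assert len(events_jobs) <= len(all_jobs)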
1 change: 1 addition & 0 deletions dlt/destinations/athena/__init__.py
@@ -36,6 +36,7 @@ def capabilities() -> DestinationCapabilitiesContext:
     caps.alter_add_multi_column = True
     caps.schema_supports_numeric_precision = False
     caps.timestamp_precision = 3
+    caps.supports_truncate_command = False
     return caps
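Athena has no TRUNCATE TABLE statement, so the loader must honor supports_truncate_command = False when it clears tables. A hedged sketch of the kind of branch a SQL client can take on this capability (not the literal dlt implementation):

def make_clear_table_sql(table_name: str, supports_truncate: bool) -> str:
    # when TRUNCATE is unsupported (as on Athena), fall back to DELETE
    if supports_truncate:
        return f"TRUNCATE TABLE {table_name}"
    return f"DELETE FROM {table_name}"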
(diffs for the remaining 25 changed files not shown)
