
Commit

PR changes

sh-rp committed Oct 11, 2023
1 parent 0924bc5 commit 122d035
Showing 10 changed files with 135 additions and 32 deletions.
1 change: 1 addition & 0 deletions .github/workflows/test_destination_athena.yml
@@ -21,6 +21,7 @@ env:
  RUNTIME__DLTHUB_TELEMETRY_SEGMENT_WRITE_KEY: TLJiyRkGVZGCi2TtjClamXpFcxAA1rSB
  ACTIVE_DESTINATIONS: "[\"athena\"]"
  ALL_FILESYSTEM_DRIVERS: "[\"memory\"]"
+  EXCLUDED_DESTINATION_CONFIGURATIONS: "[\"athena-parquet-staging-iceberg\"]"

jobs:
  get_docs_changes:
94 changes: 94 additions & 0 deletions .github/workflows/test_destination_athena_iceberg.yml
@@ -0,0 +1,94 @@

name: test athena iceberg

on:
  pull_request:
    branches:
      - master
      - devel
  workflow_dispatch:

env:
  DESTINATION__FILESYSTEM__CREDENTIALS__AWS_ACCESS_KEY_ID: AKIAT4QMVMC4J46G55G4
  DESTINATION__FILESYSTEM__CREDENTIALS__AWS_SECRET_ACCESS_KEY: ${{ secrets.AWS_SECRET_ACCESS_KEY }}
  DESTINATION__ATHENA__CREDENTIALS__AWS_ACCESS_KEY_ID: AKIAT4QMVMC4J46G55G4
  DESTINATION__ATHENA__CREDENTIALS__AWS_SECRET_ACCESS_KEY: ${{ secrets.AWS_SECRET_ACCESS_KEY }}
  DESTINATION__ATHENA__CREDENTIALS__REGION_NAME: eu-central-1
  DESTINATION__ATHENA__QUERY_RESULT_BUCKET: s3://dlt-athena-output

  RUNTIME__SENTRY_DSN: https://[email protected]/4504819859914752
  RUNTIME__LOG_LEVEL: ERROR
  RUNTIME__DLTHUB_TELEMETRY_SEGMENT_WRITE_KEY: TLJiyRkGVZGCi2TtjClamXpFcxAA1rSB
  ACTIVE_DESTINATIONS: "[\"athena\"]"
  ALL_FILESYSTEM_DRIVERS: "[\"memory\"]"
  EXCLUDED_DESTINATION_CONFIGURATIONS: "[\"athena-no-staging\"]"

jobs:
  get_docs_changes:
    uses: ./.github/workflows/get_docs_changes.yml
    # Tests that require credentials do not run in forks
    if: ${{ !github.event.pull_request.head.repo.fork }}

  run_loader:
    name: test destination athena iceberg
    needs: get_docs_changes
    if: needs.get_docs_changes.outputs.changes_outside_docs == 'true'
    strategy:
      fail-fast: false
      matrix:
        os: ["ubuntu-latest"]
        # os: ["ubuntu-latest", "macos-latest", "windows-latest"]
    defaults:
      run:
        shell: bash
    runs-on: ${{ matrix.os }}

    steps:
      - name: Check out
        uses: actions/checkout@master

      - name: Setup Python
        id: setup-python
        uses: actions/setup-python@v4
        with:
          python-version: "3.10.x"

      - name: Install Poetry
        uses: snok/[email protected]
        with:
          virtualenvs-create: true
          virtualenvs-in-project: true
          installer-parallel: true

      - name: Load cached venv
        id: cached-poetry-dependencies
        uses: actions/cache@v3
        with:
          # path: ${{ steps.pip-cache.outputs.dir }}
          path: .venv
          key: venv-${{ runner.os }}-${{ steps.setup-python.outputs.python-version }}-${{ hashFiles('**/poetry.lock') }}-athena

      - name: Install dependencies
        # if: steps.cached-poetry-dependencies.outputs.cache-hit != 'true'
        run: poetry install --no-interaction -E athena

      - name: Run tests Linux/macOS
        if: runner.os != 'Windows'
        run: |
          poetry run pytest tests/load

      - name: Run tests Windows
        if: runner.os == 'Windows'
        shell: cmd
        run: |
          poetry run pytest tests/load

  matrix_job_required_check:
    name: athena iceberg tests
    needs: run_loader
    runs-on: ubuntu-latest
    if: always()
    steps:
      - name: Check matrix job results
        if: contains(needs.*.result, 'failure') || contains(needs.*.result, 'cancelled')
        run: |
          echo "One or more matrix job tests failed or were cancelled. You may need to re-run them." && exit 1
17 changes: 16 additions & 1 deletion dlt/common/destination/reference.py
@@ -4,18 +4,20 @@
from typing import ClassVar, Final, Optional, NamedTuple, Literal, Sequence, Iterable, Type, Protocol, Union, TYPE_CHECKING, cast, List, ContextManager, Dict, Any
from contextlib import contextmanager
import datetime # noqa: 251
+from copy import deepcopy

from dlt.common import logger
from dlt.common.exceptions import IdentifierTooLongException, InvalidDestinationReference, UnknownDestinationModule
from dlt.common.schema import Schema, TTableSchema, TSchemaTables
from dlt.common.schema.typing import TWriteDisposition
from dlt.common.schema.exceptions import InvalidDatasetName
-from dlt.common.schema.utils import get_load_table
+from dlt.common.schema.utils import get_write_disposition, get_table_format
from dlt.common.configuration import configspec
from dlt.common.configuration.specs import BaseConfiguration, CredentialsConfiguration
from dlt.common.configuration.accessors import config
from dlt.common.destination.capabilities import DestinationCapabilitiesContext
from dlt.common.schema.utils import is_complete_column
+from dlt.common.schema.exceptions import UnknownTableException
from dlt.common.storages import FileStorage
from dlt.common.storages.load_storage import ParsedLoadJobFileName
from dlt.common.utils import get_module_name
@@ -287,6 +289,19 @@ def _verify_schema(self) -> None:
                if not is_complete_column(column):
                    logger.warning(f"A column {column_name} in table {table_name} in schema {self.schema.name} is incomplete. It was not bound to the data during the normalization stage and its data type is unknown. Did you add this column manually in code, i.e. as a merge key?")

+    def get_load_table(self, table_name: str, staging: bool = False) -> TTableSchema:
+        try:
+            # make a copy of the schema so modifications do not affect the original document
+            table = deepcopy(self.schema.tables[table_name])
+            # add write disposition if not specified - in child tables
+            if "write_disposition" not in table:
+                table["write_disposition"] = get_write_disposition(self.schema.tables, table_name)
+            if "table_format" not in table:
+                table["table_format"] = get_table_format(self.schema.tables, table_name)
+            return table
+        except KeyError:
+            raise UnknownTableException(table_name)


class WithStateSync(ABC):

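A quick sketch of what the new get_load_table on the job client base class does with hint inheritance — plain dicts stand in for the schema tables, and the table names are made up:

from copy import deepcopy

tables = {
    "event": {"name": "event", "write_disposition": "merge"},
    "event__payload": {"name": "event__payload", "parent": "event"},  # child table, no hint of its own
}

def get_write_disposition(tables, name):
    # walk up the parent chain until a write_disposition is found
    t = tables[name]
    while "write_disposition" not in t:
        t = tables[t["parent"]]
    return t["write_disposition"]

load_table = deepcopy(tables["event__payload"])
if "write_disposition" not in load_table:
    load_table["write_disposition"] = get_write_disposition(tables, "event__payload")
assert load_table["write_disposition"] == "merge"
# the deepcopy means the original schema document stays untouched
assert "write_disposition" not in tables["event__payload"]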
16 changes: 2 additions & 14 deletions dlt/common/schema/utils.py
@@ -511,11 +511,11 @@ def get_inherited_table_hint(tables: TSchemaTables, table_name: str, table_hint_

def get_write_disposition(tables: TSchemaTables, table_name: str) -> TWriteDisposition:
    """Returns the table's write disposition if present. If not, looks it up in the parent table"""
-    return get_inherited_table_hint(tables, table_name, "write_disposition", allow_none=False)
+    return cast(TWriteDisposition, get_inherited_table_hint(tables, table_name, "write_disposition", allow_none=False))


def get_table_format(tables: TSchemaTables, table_name: str) -> TTableFormat:
-    return get_inherited_table_hint(tables, table_name, "table_format", allow_none=True)
+    return cast(TTableFormat, get_inherited_table_hint(tables, table_name, "table_format", allow_none=True))


def table_schema_has_type(table: TTableSchema, _typ: TDataType) -> bool:
@@ -536,18 +536,6 @@ def get_top_level_table(tables: TSchemaTables, table_name: str) -> TTableSchema:
        return get_top_level_table(tables, parent)
    return table

-def get_load_table(tables: TSchemaTables, table_name: str) -> TTableSchema:
-    try:
-        # make a copy of the schema so modifications do not affect the original document
-        table = copy(tables[table_name])
-        # add write disposition if not specified - in child tables
-        if "write_disposition" not in table:
-            table["write_disposition"] = get_write_disposition(tables, table_name)
-        if "table_format" not in table:
-            table["table_format"] = get_table_format(tables, table_name)
-        return table
-    except KeyError:
-        raise UnknownTableException(table_name)

def get_child_tables(tables: TSchemaTables, table_name: str) -> List[TTableSchema]:
    """Get child tables for table name and return a list of tables ordered by ancestry so the child tables are always after their parents"""
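Note that the cast(...) wrappers above only narrow the static type for the type checker; runtime behavior is unchanged. get_inherited_table_hint itself is not part of this diff — a plausible sketch of its contract, assuming the parent-walk that its callers' docstrings describe:

def get_inherited_table_hint(tables, table_name, table_hint_name, allow_none=False):
    # look for the hint on the table itself, then walk up the parent chain
    table = tables.get(table_name)
    while table is not None:
        if table_hint_name in table:
            return table[table_hint_name]
        table = tables.get(table.get("parent"))
    if allow_none:
        return None
    raise ValueError(f"No table hint '{table_hint_name}' found for table '{table_name}'")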
10 changes: 8 additions & 2 deletions dlt/destinations/athena/athena.py
@@ -15,7 +15,7 @@
from dlt.common import logger
from dlt.common.utils import without_none
from dlt.common.data_types import TDataType
-from dlt.common.schema import TColumnSchema, Schema
+from dlt.common.schema import TColumnSchema, Schema, TSchemaTables, TTableSchema
from dlt.common.schema.typing import TTableSchema, TColumnType, TWriteDisposition
from dlt.common.schema.utils import table_schema_has_type, get_table_format
from dlt.common.destination import DestinationCapabilitiesContext
@@ -325,7 +325,7 @@ def _get_table_update_sql(self, table_name: str, new_columns: Sequence[TColumnSc

        # for the system tables we need to create empty iceberg tables to be able to run DELETE and UPDATE queries
        # or if we are in iceberg mode, we create iceberg tables for all tables
-        is_iceberg = (self.schema.tables[table_name].get("write_disposition", None) == "skip") or (self._is_iceberg_table(self.schema.tables[table_name]) and not self.in_staging_mode)
+        is_iceberg = (self.schema.tables[table_name].get("write_disposition", None) == "skip") or self._is_iceberg_table(self.schema.tables[table_name])
        columns = ", ".join([self._get_column_def_sql(c) for c in new_columns])

        # this will fail if the table prefix is not properly defined
@@ -381,6 +381,12 @@ def table_needs_staging(self, table: TTableSchema) -> bool:
        if self._is_iceberg_table(table):
            return True
        return super().table_needs_staging(table)

+    def get_load_table(self, table_name: str, staging: bool = False) -> TTableSchema:
+        table = super().get_load_table(table_name, staging)
+        if staging and table.get("table_format", None) == "iceberg":
+            table.pop("table_format")
+        return table

    @staticmethod
    def is_dbapi_exception(ex: Exception) -> bool:
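A minimal illustration of the new override: the staging copy of an iceberg table drops its table_format, since staging data is written as a plain table. A plain dict stands in for TTableSchema here:

table = {"name": "events", "write_disposition": "append", "table_format": "iceberg"}

def load_table_for(table, staging=False):
    # mirrors the override above: strip the iceberg hint only from the staging variant
    t = dict(table)
    if staging and t.get("table_format") == "iceberg":
        t.pop("table_format")
    return t

assert "table_format" not in load_table_for(table, staging=True)
assert load_table_for(table)["table_format"] == "iceberg"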
1 change: 1 addition & 0 deletions dlt/destinations/athena/configuration.py
@@ -13,6 +13,7 @@ class AthenaClientConfiguration(DestinationClientDwhWithStagingConfiguration):
    athena_work_group: Optional[str] = None
    aws_data_catalog: Optional[str] = "awsdatacatalog"
    supports_truncate_command: bool = False
+    force_iceberg: Optional[bool] = True

    __config_gen_annotations__: ClassVar[List[str]] = ["athena_work_group"]

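The new force_iceberg flag resolves like any other dlt config value, so it can be set from the environment using the section path that the test setup in tests/load/utils.py below exports; the config.toml form is an assumption based on that same convention:

import os

# environment variable form (exactly what DestinationTestConfiguration.setup exports)
os.environ["DESTINATION__ATHENA__FORCE_ICEBERG"] = "True"

# assumed config.toml equivalent:
# [destination.athena]
# force_iceberg = true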
4 changes: 2 additions & 2 deletions dlt/load/load.py
@@ -10,7 +10,7 @@
from dlt.common.configuration import with_config, known_sections
from dlt.common.configuration.accessors import config
from dlt.common.pipeline import LoadInfo, SupportsPipeline
-from dlt.common.schema.utils import get_child_tables, get_top_level_table, get_load_table
+from dlt.common.schema.utils import get_child_tables, get_top_level_table
from dlt.common.storages.load_storage import LoadPackageInfo, ParsedLoadJobFileName, TJobState
from dlt.common.typing import StrAny
from dlt.common.runners import TRunMetrics, Runnable, workermethod
@@ -98,7 +98,7 @@ def w_spool_job(self: "Load", file_path: str, load_id: str, schema: Schema) -> O
            if job_info.file_format not in self.load_storage.supported_file_formats:
                raise LoadClientUnsupportedFileFormats(job_info.file_format, self.capabilities.supported_loader_file_formats, file_path)
            logger.info(f"Will load file {file_path} with table name {job_info.table_name}")
-            table = get_load_table(schema.tables, job_info.table_name)
+            table = job_client.get_load_table(job_info.table_name)
            if table["write_disposition"] not in ["append", "replace", "merge"]:
                raise LoadClientUnsupportedWriteDisposition(job_info.table_name, table["write_disposition"], file_path)
            with self.maybe_with_staging_dataset(job_client, table):
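The point of this change: the loader now asks the job client for the load table instead of the schema utils, so destination-specific overrides (like Athena's staging handling above) apply automatically. A self-contained sketch of that dispatch, with simplified dict tables:

class BaseClient:
    # stands in for JobClientBase.get_load_table, which resolves hints from the schema
    def get_load_table(self, table_name, staging=False):
        return {"name": table_name, "write_disposition": "append", "table_format": "iceberg"}

class AthenaLikeClient(BaseClient):
    def get_load_table(self, table_name, staging=False):
        table = super().get_load_table(table_name, staging)
        if staging:
            table.pop("table_format", None)  # staging copies are plain tables
        return table

def spool_job_sketch(job_client, table_name):
    # the loader calls through the client, so the override is picked up here
    table = job_client.get_load_table(table_name)
    assert table["write_disposition"] in ["append", "replace", "merge"]
    return table

spool_job_sketch(AthenaLikeClient(), "events")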
11 changes: 1 addition & 10 deletions tests/load/test_dummy_client.py
@@ -100,16 +100,7 @@ def test_get_new_jobs_info() -> None:
    )

    # no write disposition specified - get all new jobs
-    assert len(load.get_new_jobs_info(load_id, schema)) == 2
-    # empty list - none
-    assert len(load.get_new_jobs_info(load_id, schema, [])) == 0
-    # two appends
-    assert len(load.get_new_jobs_info(load_id, schema, ["append"])) == 2
-    assert len(load.get_new_jobs_info(load_id, schema, ["replace"])) == 0
-    assert len(load.get_new_jobs_info(load_id, schema, ["replace", "append"])) == 2
-
-    load.load_storage.start_job(load_id, "event_loop_interrupted.839c6e6b514e427687586ccc65bf133f.0.jsonl")
-    assert len(load.get_new_jobs_info(load_id, schema, ["replace", "append"])) == 1
+    assert len(load.get_new_jobs_info(load_id)) == 2


def test_get_completed_table_chain_single_job_per_table() -> None:
9 changes: 6 additions & 3 deletions tests/load/utils.py
@@ -17,7 +17,7 @@
from dlt.common.data_writers import DataWriter
from dlt.common.schema import TColumnSchema, TTableSchemaColumns, Schema
from dlt.common.storages import SchemaStorage, FileStorage, SchemaStorageConfiguration
-from dlt.common.schema.utils import new_table, get_load_table
+from dlt.common.schema.utils import new_table
from dlt.common.storages.load_storage import ParsedLoadJobFileName, LoadStorage
from dlt.common.typing import StrAny
from dlt.common.utils import uniq_id
@@ -26,7 +26,7 @@
from dlt.destinations.sql_client import SqlClientBase
from dlt.destinations.job_client_impl import SqlJobClientBase

-from tests.utils import ACTIVE_DESTINATIONS, IMPLEMENTED_DESTINATIONS, SQL_DESTINATIONS
+from tests.utils import ACTIVE_DESTINATIONS, IMPLEMENTED_DESTINATIONS, SQL_DESTINATIONS, EXCLUDED_DESTINATION_CONFIGURATIONS
from tests.cases import TABLE_UPDATE_COLUMNS_SCHEMA, TABLE_UPDATE, TABLE_ROW_ALL_DATA_TYPES, assert_all_data_types_row

# bucket urls
@@ -53,6 +53,7 @@ class DestinationTestConfiguration:
    staging_iam_role: Optional[str] = None
    extra_info: Optional[str] = None
    supports_merge: bool = True  # TODO: take it from client base class
+    force_iceberg: bool = True

    @property
    def name(self) -> str:
@@ -72,6 +73,7 @@ def setup(self) -> None:
        os.environ['DESTINATION__FILESYSTEM__BUCKET_URL'] = self.bucket_url or ""
        os.environ['DESTINATION__STAGE_NAME'] = self.stage_name or ""
        os.environ['DESTINATION__STAGING_IAM_ROLE'] = self.staging_iam_role or ""
+        os.environ['DESTINATION__ATHENA__FORCE_ICEBERG'] = str(self.force_iceberg)

        # for the filesystem destinations we disable compression to make analyzing the result easier
        if self.destination == "filesystem":
@@ -108,6 +110,7 @@ def destinations_configs(
        destination_configs += [DestinationTestConfiguration(destination=destination) for destination in SQL_DESTINATIONS if destination != "athena"]
        # athena needs filesystem staging, which will be set automatically; we do have to supply a bucket url though
        destination_configs += [DestinationTestConfiguration(destination="athena", supports_merge=False, bucket_url=AWS_BUCKET)]
+        destination_configs += [DestinationTestConfiguration(destination="athena", staging="filesystem", file_format="parquet", bucket_url=AWS_BUCKET, force_iceberg=True, supports_merge=False, extra_info="iceberg")]

    if default_vector_configs:
        # for now only weaviate
@@ -170,7 +173,7 @@ def load_table(name: str) -> Dict[str, TTableSchemaColumns]:
def expect_load_file(client: JobClientBase, file_storage: FileStorage, query: str, table_name: str, status="completed") -> LoadJob:
    file_name = ParsedLoadJobFileName(table_name, uniq_id(), 0, client.capabilities.preferred_loader_file_format).job_id()
    file_storage.save(file_name, query.encode("utf-8"))
-    table = get_load_table(client.schema.tables, table_name)
+    table = client.get_load_table(table_name)
    job = client.start_file_load(table, file_storage.make_full_path(file_name), uniq_id())
    while job.state() == "running":
        sleep(0.5)
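A hedged sketch of how load tests typically consume these configurations; the parametrize pattern matches this suite's style, but the exact keyword arguments to destinations_configs are assumed:

import pytest
from tests.load.utils import destinations_configs, DestinationTestConfiguration

@pytest.mark.parametrize(
    "destination_config",
    destinations_configs(default_sql_configs=True),
    ids=lambda x: x.name,  # yields ids like "athena-parquet-staging-iceberg"
)
def test_load_with_config(destination_config: DestinationTestConfiguration) -> None:
    # setup() exports the env vars, including DESTINATION__ATHENA__FORCE_ICEBERG
    destination_config.setup()
    assert destination_config.name  # run the actual pipeline assertions here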
4 changes: 4 additions & 0 deletions tests/utils.py
@@ -31,6 +31,10 @@
NON_SQL_DESTINATIONS = {"filesystem", "weaviate", "dummy", "motherduck"}
SQL_DESTINATIONS = IMPLEMENTED_DESTINATIONS - NON_SQL_DESTINATIONS

+# exclude destination configs (for now used for athena and athena iceberg separation)
+EXCLUDED_DESTINATION_CONFIGURATIONS = set(dlt.config.get("EXCLUDED_DESTINATION_CONFIGURATIONS", list) or set())


# filter out active destinations for current tests
ACTIVE_DESTINATIONS = set(dlt.config.get("ACTIVE_DESTINATIONS", list) or IMPLEMENTED_DESTINATIONS)

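A sketch of how the two env-driven lists work together — the helper below is an assumption; the real filtering may live inside destinations_configs in tests/load/utils.py:

import dlt

IMPLEMENTED_DESTINATIONS = {"athena", "duckdb"}  # trimmed set, just for this sketch

excluded = set(dlt.config.get("EXCLUDED_DESTINATION_CONFIGURATIONS", list) or set())
active = set(dlt.config.get("ACTIVE_DESTINATIONS", list) or IMPLEMENTED_DESTINATIONS)

def is_selected(destination: str, config_name: str) -> bool:
    # a test configuration runs only if its destination is active and its full
    # name (e.g. "athena-no-staging") is not explicitly excluded
    return destination in active and config_name not in excluded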
