From 122d035d147114562639a4651f9ad38f80eca33d Mon Sep 17 00:00:00 2001
From: Dave
Date: Wed, 11 Oct 2023 16:55:29 +0200
Subject: [PATCH] PR changes

---
 .github/workflows/test_destination_athena.yml |  1 +
 .../test_destination_athena_iceberg.yml       | 94 +++++++++++++++++++
 dlt/common/destination/reference.py           | 17 +++-
 dlt/common/schema/utils.py                    | 16 +---
 dlt/destinations/athena/athena.py             | 10 +-
 dlt/destinations/athena/configuration.py      |  1 +
 dlt/load/load.py                              |  4 +-
 tests/load/test_dummy_client.py               | 11 +--
 tests/load/utils.py                           |  9 +-
 tests/utils.py                                |  4 +
 10 files changed, 135 insertions(+), 32 deletions(-)
 create mode 100644 .github/workflows/test_destination_athena_iceberg.yml

diff --git a/.github/workflows/test_destination_athena.yml b/.github/workflows/test_destination_athena.yml
index 16c9caff53..704e66522b 100644
--- a/.github/workflows/test_destination_athena.yml
+++ b/.github/workflows/test_destination_athena.yml
@@ -21,6 +21,7 @@ env:
   RUNTIME__DLTHUB_TELEMETRY_SEGMENT_WRITE_KEY: TLJiyRkGVZGCi2TtjClamXpFcxAA1rSB
   ACTIVE_DESTINATIONS: "[\"athena\"]"
   ALL_FILESYSTEM_DRIVERS: "[\"memory\"]"
+  EXCLUDED_DESTINATION_CONFIGURATIONS: "[\"athena-parquet-staging-iceberg\"]"
 
 jobs:
   get_docs_changes:
diff --git a/.github/workflows/test_destination_athena_iceberg.yml b/.github/workflows/test_destination_athena_iceberg.yml
new file mode 100644
index 0000000000..6892a96bf1
--- /dev/null
+++ b/.github/workflows/test_destination_athena_iceberg.yml
@@ -0,0 +1,94 @@
+
+name: test athena iceberg
+
+on:
+  pull_request:
+    branches:
+      - master
+      - devel
+  workflow_dispatch:
+
+env:
+  DESTINATION__FILESYSTEM__CREDENTIALS__AWS_ACCESS_KEY_ID: AKIAT4QMVMC4J46G55G4
+  DESTINATION__FILESYSTEM__CREDENTIALS__AWS_SECRET_ACCESS_KEY: ${{ secrets.AWS_SECRET_ACCESS_KEY }}
+  DESTINATION__ATHENA__CREDENTIALS__AWS_ACCESS_KEY_ID: AKIAT4QMVMC4J46G55G4
+  DESTINATION__ATHENA__CREDENTIALS__AWS_SECRET_ACCESS_KEY: ${{ secrets.AWS_SECRET_ACCESS_KEY }}
+  DESTINATION__ATHENA__CREDENTIALS__REGION_NAME: eu-central-1
+  DESTINATION__ATHENA__QUERY_RESULT_BUCKET: s3://dlt-athena-output
+
+  RUNTIME__SENTRY_DSN: https://6f6f7b6f8e0f458a89be4187603b55fe@o1061158.ingest.sentry.io/4504819859914752
+  RUNTIME__LOG_LEVEL: ERROR
+  RUNTIME__DLTHUB_TELEMETRY_SEGMENT_WRITE_KEY: TLJiyRkGVZGCi2TtjClamXpFcxAA1rSB
+  ACTIVE_DESTINATIONS: "[\"athena\"]"
+  ALL_FILESYSTEM_DRIVERS: "[\"memory\"]"
+  EXCLUDED_DESTINATION_CONFIGURATIONS: "[\"athena-no-staging\"]"
+
+jobs:
+  get_docs_changes:
+    uses: ./.github/workflows/get_docs_changes.yml
+    # Tests that require credentials do not run in forks
+    if: ${{ !github.event.pull_request.head.repo.fork }}
+
+  run_loader:
+    name: test destination athena iceberg
+    needs: get_docs_changes
+    if: needs.get_docs_changes.outputs.changes_outside_docs == 'true'
+    strategy:
+      fail-fast: false
+      matrix:
+        os: ["ubuntu-latest"]
+        # os: ["ubuntu-latest", "macos-latest", "windows-latest"]
+    defaults:
+      run:
+        shell: bash
+    runs-on: ${{ matrix.os }}
+
+    steps:
+
+      - name: Check out
+        uses: actions/checkout@master
+
+      - name: Setup Python
+        uses: actions/setup-python@v4
+        with:
+          python-version: "3.10.x"
+
+      - name: Install Poetry
+        uses: snok/install-poetry@v1.3.2
+        with:
+          virtualenvs-create: true
+          virtualenvs-in-project: true
+          installer-parallel: true
+
+      - name: Load cached venv
+        id: cached-poetry-dependencies
+        uses: actions/cache@v3
+        with:
+          # path: ${{ steps.pip-cache.outputs.dir }}
+          path: .venv
+          key: venv-${{ runner.os }}-${{ steps.setup-python.outputs.python-version }}-${{ hashFiles('**/poetry.lock') }}-athena
+
+      - name: Install dependencies
+        # if: steps.cached-poetry-dependencies.outputs.cache-hit != 'true'
+        run: poetry install --no-interaction -E athena
+
+      - run: |
+          poetry run pytest tests/load
+        if: runner.os != 'Windows'
+        name: Run tests Linux/MAC
+      - run: |
+          poetry run pytest tests/load
+        if: runner.os == 'Windows'
+        name: Run tests Windows
+        shell: cmd
+
+  matrix_job_required_check:
+    name: athena iceberg tests
+    needs: run_loader
+    runs-on: ubuntu-latest
+    if: always()
+    steps:
+      - name: Check matrix job results
+        if: contains(needs.*.result, 'failure') || contains(needs.*.result, 'cancelled')
+        run: |
+          echo "One or more matrix job tests failed or were cancelled. You may need to re-run them." && exit 1
diff --git a/dlt/common/destination/reference.py b/dlt/common/destination/reference.py
index 5fea462159..c2dc0ebcc6 100644
--- a/dlt/common/destination/reference.py
+++ b/dlt/common/destination/reference.py
@@ -4,18 +4,20 @@
 from typing import ClassVar, Final, Optional, NamedTuple, Literal, Sequence, Iterable, Type, Protocol, Union, TYPE_CHECKING, cast, List, ContextManager, Dict, Any
 from contextlib import contextmanager
 import datetime  # noqa: 251
+from copy import deepcopy
 
 from dlt.common import logger
 from dlt.common.exceptions import IdentifierTooLongException, InvalidDestinationReference, UnknownDestinationModule
 from dlt.common.schema import Schema, TTableSchema, TSchemaTables
 from dlt.common.schema.typing import TWriteDisposition
 from dlt.common.schema.exceptions import InvalidDatasetName
-from dlt.common.schema.utils import get_load_table
+from dlt.common.schema.utils import get_write_disposition, get_table_format
 from dlt.common.configuration import configspec
 from dlt.common.configuration.specs import BaseConfiguration, CredentialsConfiguration
 from dlt.common.configuration.accessors import config
 from dlt.common.destination.capabilities import DestinationCapabilitiesContext
 from dlt.common.schema.utils import is_complete_column
+from dlt.common.schema.exceptions import UnknownTableException
 from dlt.common.storages import FileStorage
 from dlt.common.storages.load_storage import ParsedLoadJobFileName
 from dlt.common.utils import get_module_name
@@ -287,6 +289,19 @@ def _verify_schema(self) -> None:
                 if not is_complete_column(column):
                     logger.warning(f"A column {column_name} in table {table_name} in schema {self.schema.name} is incomplete. It was not bound to the data during normalizations stage and its data type is unknown. Did you add this column manually in code ie. as a merge key?")
as a merge key?") + def get_load_table(self, table_name: str, prepare_for_staging: bool = False) -> TTableSchema: + try: + # make a copy of the schema so modifications do not affect the original document + table = deepcopy(self.schema.tables[table_name]) + # add write disposition if not specified - in child tables + if "write_disposition" not in table: + table["write_disposition"] = get_write_disposition(self.schema.tables, table_name) + if "table_format" not in table: + table["table_format"] = get_table_format(self.schema.tables, table_name) + return table + except KeyError: + raise UnknownTableException(table_name) + class WithStateSync(ABC): diff --git a/dlt/common/schema/utils.py b/dlt/common/schema/utils.py index 93f0913550..b89925d9b0 100644 --- a/dlt/common/schema/utils.py +++ b/dlt/common/schema/utils.py @@ -511,11 +511,11 @@ def get_inherited_table_hint(tables: TSchemaTables, table_name: str, table_hint_ def get_write_disposition(tables: TSchemaTables, table_name: str) -> TWriteDisposition: """Returns table hint of a table if present. If not, looks up into parent table""" - return get_inherited_table_hint(tables, table_name, "write_disposition", allow_none=False) + return cast(TWriteDisposition, get_inherited_table_hint(tables, table_name, "write_disposition", allow_none=False)) def get_table_format(tables: TSchemaTables, table_name: str) -> TTableFormat: - return get_inherited_table_hint(tables, table_name, "table_format", allow_none=True) + return cast(TTableFormat, get_inherited_table_hint(tables, table_name, "table_format", allow_none=True)) def table_schema_has_type(table: TTableSchema, _typ: TDataType) -> bool: @@ -536,18 +536,6 @@ def get_top_level_table(tables: TSchemaTables, table_name: str) -> TTableSchema: return get_top_level_table(tables, parent) return table -def get_load_table(tables: TSchemaTables, table_name: str) -> TTableSchema: - try: - # make a copy of the schema so modifications do not affect the original document - table = copy(tables[table_name]) - # add write disposition if not specified - in child tables - if "write_disposition" not in table: - table["write_disposition"] = get_write_disposition(tables, table_name) - if "table_format" not in table: - table["table_format"] = get_table_format(tables, table_name) - return table - except KeyError: - raise UnknownTableException(table_name) def get_child_tables(tables: TSchemaTables, table_name: str) -> List[TTableSchema]: """Get child tables for table name and return a list of tables ordered by ancestry so the child tables are always after their parents""" diff --git a/dlt/destinations/athena/athena.py b/dlt/destinations/athena/athena.py index c9d6f3abb7..514d868047 100644 --- a/dlt/destinations/athena/athena.py +++ b/dlt/destinations/athena/athena.py @@ -15,7 +15,7 @@ from dlt.common import logger from dlt.common.utils import without_none from dlt.common.data_types import TDataType -from dlt.common.schema import TColumnSchema, Schema +from dlt.common.schema import TColumnSchema, Schema, TSchemaTables, TTableSchema from dlt.common.schema.typing import TTableSchema, TColumnType, TWriteDisposition from dlt.common.schema.utils import table_schema_has_type, get_table_format from dlt.common.destination import DestinationCapabilitiesContext @@ -325,7 +325,7 @@ def _get_table_update_sql(self, table_name: str, new_columns: Sequence[TColumnSc # for the system tables we need to create empty iceberg tables to be able to run, DELETE and UPDATE queries # or if we are in iceberg mode, we create iceberg tables for all tables 
-        is_iceberg = (self.schema.tables[table_name].get("write_disposition", None) == "skip") or (self._is_iceberg_table(self.schema.tables[table_name]) and not self.in_staging_mode)
+        is_iceberg = (self.schema.tables[table_name].get("write_disposition", None) == "skip") or self._is_iceberg_table(self.schema.tables[table_name])
         columns = ", ".join([self._get_column_def_sql(c) for c in new_columns])
 
         # this will fail if the table prefix is not properly defined
@@ -381,6 +381,12 @@ def table_needs_staging(self, table: TTableSchema) -> bool:
         if self._is_iceberg_table(table):
             return True
         return super().table_needs_staging(table)
+
+    def get_load_table(self, table_name: str, staging: bool = False) -> TTableSchema:
+        table = super().get_load_table(table_name, staging)
+        if staging and table.get("table_format", None) == "iceberg":
+            table.pop("table_format")
+        return table
 
     @staticmethod
     def is_dbapi_exception(ex: Exception) -> bool:
diff --git a/dlt/destinations/athena/configuration.py b/dlt/destinations/athena/configuration.py
index 7eca85fe41..a7f05e520d 100644
--- a/dlt/destinations/athena/configuration.py
+++ b/dlt/destinations/athena/configuration.py
@@ -13,6 +13,7 @@ class AthenaClientConfiguration(DestinationClientDwhWithStagingConfiguration):
     athena_work_group: Optional[str] = None
     aws_data_catalog: Optional[str] = "awsdatacatalog"
     supports_truncate_command: bool = False
+    force_iceberg: Optional[bool] = True
 
     __config_gen_annotations__: ClassVar[List[str]] = ["athena_work_group"]
diff --git a/dlt/load/load.py b/dlt/load/load.py
index ddce9bf8e9..e1873761a6 100644
--- a/dlt/load/load.py
+++ b/dlt/load/load.py
@@ -10,7 +10,7 @@
 from dlt.common.configuration import with_config, known_sections
 from dlt.common.configuration.accessors import config
 from dlt.common.pipeline import LoadInfo, SupportsPipeline
-from dlt.common.schema.utils import get_child_tables, get_top_level_table, get_load_table
+from dlt.common.schema.utils import get_child_tables, get_top_level_table
 from dlt.common.storages.load_storage import LoadPackageInfo, ParsedLoadJobFileName, TJobState
 from dlt.common.typing import StrAny
 from dlt.common.runners import TRunMetrics, Runnable, workermethod
@@ -98,7 +98,7 @@ def w_spool_job(self: "Load", file_path: str, load_id: str, schema: Schema) -> O
             if job_info.file_format not in self.load_storage.supported_file_formats:
                 raise LoadClientUnsupportedFileFormats(job_info.file_format, self.capabilities.supported_loader_file_formats, file_path)
             logger.info(f"Will load file {file_path} with table name {job_info.table_name}")
-            table = get_load_table(schema.tables, job_info.table_name)
+            table = job_client.get_load_table(job_info.table_name)
             if table["write_disposition"] not in ["append", "replace", "merge"]:
                 raise LoadClientUnsupportedWriteDisposition(job_info.table_name, table["write_disposition"], file_path)
             with self.maybe_with_staging_dataset(job_client, table):
diff --git a/tests/load/test_dummy_client.py b/tests/load/test_dummy_client.py
index dcea7bd94d..aaa89ebfb1 100644
--- a/tests/load/test_dummy_client.py
+++ b/tests/load/test_dummy_client.py
@@ -100,16 +100,7 @@ def test_get_new_jobs_info() -> None:
     )
 
     # no write disposition specified - get all new jobs
-    assert len(load.get_new_jobs_info(load_id, schema)) == 2
-    # empty list - none
-    assert len(load.get_new_jobs_info(load_id, schema, [])) == 0
-    # two appends
-    assert len(load.get_new_jobs_info(load_id, schema, ["append"])) == 2
-    assert len(load.get_new_jobs_info(load_id, schema, ["replace"])) == 0
-    assert len(load.get_new_jobs_info(load_id, schema, ["replace", "append"])) == 2
-
-    load.load_storage.start_job(load_id, "event_loop_interrupted.839c6e6b514e427687586ccc65bf133f.0.jsonl")
-    assert len(load.get_new_jobs_info(load_id, schema, ["replace", "append"])) == 1
+    assert len(load.get_new_jobs_info(load_id)) == 2
 
 
 def test_get_completed_table_chain_single_job_per_table() -> None:
diff --git a/tests/load/utils.py b/tests/load/utils.py
index 9fd4f033b7..60f389d064 100644
--- a/tests/load/utils.py
+++ b/tests/load/utils.py
@@ -17,7 +17,7 @@
 from dlt.common.data_writers import DataWriter
 from dlt.common.schema import TColumnSchema, TTableSchemaColumns, Schema
 from dlt.common.storages import SchemaStorage, FileStorage, SchemaStorageConfiguration
-from dlt.common.schema.utils import new_table, get_load_table
+from dlt.common.schema.utils import new_table
 from dlt.common.storages.load_storage import ParsedLoadJobFileName, LoadStorage
 from dlt.common.typing import StrAny
 from dlt.common.utils import uniq_id
@@ -26,7 +26,7 @@
 from dlt.destinations.sql_client import SqlClientBase
 from dlt.destinations.job_client_impl import SqlJobClientBase
 
-from tests.utils import ACTIVE_DESTINATIONS, IMPLEMENTED_DESTINATIONS, SQL_DESTINATIONS
+from tests.utils import ACTIVE_DESTINATIONS, IMPLEMENTED_DESTINATIONS, SQL_DESTINATIONS, EXCLUDED_DESTINATION_CONFIGURATIONS
 from tests.cases import TABLE_UPDATE_COLUMNS_SCHEMA, TABLE_UPDATE, TABLE_ROW_ALL_DATA_TYPES, assert_all_data_types_row
 
 # bucket urls
@@ -53,6 +53,7 @@ class DestinationTestConfiguration:
     staging_iam_role: Optional[str] = None
     extra_info: Optional[str] = None
     supports_merge: bool = True # TODO: take it from client base class
+    force_iceberg: bool = True
 
     @property
     def name(self) -> str:
@@ -72,6 +73,7 @@ def setup(self) -> None:
         os.environ['DESTINATION__FILESYSTEM__BUCKET_URL'] = self.bucket_url or ""
         os.environ['DESTINATION__STAGE_NAME'] = self.stage_name or ""
         os.environ['DESTINATION__STAGING_IAM_ROLE'] = self.staging_iam_role or ""
+        os.environ['DESTINATION__ATHENA__FORCE_ICEBERG'] = str(self.force_iceberg) or ""
 
         """For the filesystem destinations we disable compression to make analyzing the result easier"""
         if self.destination == "filesystem":
@@ -108,6 +110,7 @@ def destinations_configs(
         destination_configs += [DestinationTestConfiguration(destination=destination) for destination in SQL_DESTINATIONS if destination != "athena"]
         # athena needs filesystem staging, which will be automatically set, we have to supply a bucket url though
         destination_configs += [DestinationTestConfiguration(destination="athena", supports_merge=False, bucket_url=AWS_BUCKET)]
+        destination_configs += [DestinationTestConfiguration(destination="athena", staging="filesystem", file_format="parquet", bucket_url=AWS_BUCKET, force_iceberg=True, supports_merge=False, extra_info="iceberg")]
 
     if default_vector_configs:
         # for now only weaviate
@@ -170,7 +173,7 @@ def load_table(name: str) -> Dict[str, TTableSchemaColumns]:
 def expect_load_file(client: JobClientBase, file_storage: FileStorage, query: str, table_name: str, status = "completed") -> LoadJob:
     file_name = ParsedLoadJobFileName(table_name, uniq_id(), 0, client.capabilities.preferred_loader_file_format).job_id()
     file_storage.save(file_name, query.encode("utf-8"))
-    table = get_load_table(client.schema.tables, table_name)
+    table = client.get_load_table(table_name)
     job = client.start_file_load(table, file_storage.make_full_path(file_name), uniq_id())
     while job.state() == "running":
         sleep(0.5)
diff --git a/tests/utils.py b/tests/utils.py
index 7321049c9d..2a0238cdfe 100644
--- a/tests/utils.py
+++ b/tests/utils.py
@@ -31,6 +31,10 @@
 NON_SQL_DESTINATIONS = {"filesystem", "weaviate", "dummy", "motherduck"}
 SQL_DESTINATIONS = IMPLEMENTED_DESTINATIONS - NON_SQL_DESTINATIONS
 
+# exclude destination configs (for now used for athena and athena iceberg separation)
+EXCLUDED_DESTINATION_CONFIGURATIONS = set(dlt.config.get("EXCLUDED_DESTINATION_CONFIGURATIONS", list) or set())
+
+
 # filter out active destinations for current tests
 ACTIVE_DESTINATIONS = set(dlt.config.get("ACTIVE_DESTINATIONS", list) or IMPLEMENTED_DESTINATIONS)
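
Note on the new get_load_table hook: this patch moves get_load_table from dlt.common.schema.utils
onto the job client so that destinations can adjust the load-time table schema, and AthenaClient
overrides it to drop the iceberg table_format when loading into the staging dataset. A minimal
sketch of how another destination could use the same hook; the class name OtherClient is
hypothetical, and only the base-class behavior visible in this diff is assumed:

    from dlt.common.schema.typing import TTableSchema
    from dlt.destinations.job_client_impl import SqlJobClientBase

    class OtherClient(SqlJobClientBase):
        def get_load_table(self, table_name: str, staging: bool = False) -> TTableSchema:
            # the base implementation returns a deep copy of the table schema with
            # inherited write_disposition / table_format hints filled in
            table = super().get_load_table(table_name, staging)
            # drop hints that should not apply to the staging dataset
            if staging:
                table.pop("table_format", None)
            return table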
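
Note on the test selection env var: the workflows set EXCLUDED_DESTINATION_CONFIGURATIONS
("athena-parquet-staging-iceberg" in the plain athena job, "athena-no-staging" in the iceberg job)
and tests/utils.py reads it into a set, but the hunk that applies the exclusion inside
destinations_configs() is not shown in this diff. A sketch of the kind of filtering presumably
intended, using the DestinationTestConfiguration.name property visible above; the exact placement
and variable names are assumptions:

    # keep only configurations whose computed name is not excluded
    destination_configs = [
        conf for conf in destination_configs
        if conf.name not in EXCLUDED_DESTINATION_CONFIGURATIONS
    ]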