diff --git a/.github/workflows/test_destination_athena.yml b/.github/workflows/test_destination_athena.yml
index 16c9caff53..704e66522b 100644
--- a/.github/workflows/test_destination_athena.yml
+++ b/.github/workflows/test_destination_athena.yml
@@ -21,6 +21,7 @@ env:
   RUNTIME__DLTHUB_TELEMETRY_SEGMENT_WRITE_KEY: TLJiyRkGVZGCi2TtjClamXpFcxAA1rSB
   ACTIVE_DESTINATIONS: "[\"athena\"]"
   ALL_FILESYSTEM_DRIVERS: "[\"memory\"]"
+  EXCLUDED_DESTINATION_CONFIGURATIONS: "[\"athena-parquet-staging-iceberg\"]"
 
 jobs:
   get_docs_changes:
diff --git a/.github/workflows/test_destination_athena_iceberg.yml b/.github/workflows/test_destination_athena_iceberg.yml
new file mode 100644
index 0000000000..6892a96bf1
--- /dev/null
+++ b/.github/workflows/test_destination_athena_iceberg.yml
@@ -0,0 +1,94 @@
+
+name: test athena iceberg
+
+on:
+  pull_request:
+    branches:
+      - master
+      - devel
+  workflow_dispatch:
+
+env:
+  DESTINATION__FILESYSTEM__CREDENTIALS__AWS_ACCESS_KEY_ID: AKIAT4QMVMC4J46G55G4
+  DESTINATION__FILESYSTEM__CREDENTIALS__AWS_SECRET_ACCESS_KEY: ${{ secrets.AWS_SECRET_ACCESS_KEY }}
+  DESTINATION__ATHENA__CREDENTIALS__AWS_ACCESS_KEY_ID: AKIAT4QMVMC4J46G55G4
+  DESTINATION__ATHENA__CREDENTIALS__AWS_SECRET_ACCESS_KEY: ${{ secrets.AWS_SECRET_ACCESS_KEY }}
+  DESTINATION__ATHENA__CREDENTIALS__REGION_NAME: eu-central-1
+  DESTINATION__ATHENA__QUERY_RESULT_BUCKET: s3://dlt-athena-output
+
+  RUNTIME__SENTRY_DSN: https://6f6f7b6f8e0f458a89be4187603b55fe@o1061158.ingest.sentry.io/4504819859914752
+  RUNTIME__LOG_LEVEL: ERROR
+  RUNTIME__DLTHUB_TELEMETRY_SEGMENT_WRITE_KEY: TLJiyRkGVZGCi2TtjClamXpFcxAA1rSB
+  ACTIVE_DESTINATIONS: "[\"athena\"]"
+  ALL_FILESYSTEM_DRIVERS: "[\"memory\"]"
+  EXCLUDED_DESTINATION_CONFIGURATIONS: "[\"athena-no-staging\"]"
+
+jobs:
+  get_docs_changes:
+    uses: ./.github/workflows/get_docs_changes.yml
+    # Tests that require credentials do not run in forks
+    if: ${{ !github.event.pull_request.head.repo.fork }}
+
+  run_loader:
+    name: test destination athena iceberg
+    needs: get_docs_changes
+    if: needs.get_docs_changes.outputs.changes_outside_docs == 'true'
+    strategy:
+      fail-fast: false
+      matrix:
+        os: ["ubuntu-latest"]
+        # os: ["ubuntu-latest", "macos-latest", "windows-latest"]
+    defaults:
+      run:
+        shell: bash
+    runs-on: ${{ matrix.os }}
+
+    steps:
+
+      - name: Check out
+        uses: actions/checkout@master
+
+      - name: Setup Python
+        uses: actions/setup-python@v4
+        with:
+          python-version: "3.10.x"
+
+      - name: Install Poetry
+        uses: snok/install-poetry@v1.3.2
+        with:
+          virtualenvs-create: true
+          virtualenvs-in-project: true
+          installer-parallel: true
+
+      - name: Load cached venv
+        id: cached-poetry-dependencies
+        uses: actions/cache@v3
+        with:
+          # path: ${{ steps.pip-cache.outputs.dir }}
+          path: .venv
+          key: venv-${{ runner.os }}-${{ steps.setup-python.outputs.python-version }}-${{ hashFiles('**/poetry.lock') }}-athena
+
+      - name: Install dependencies
+        # if: steps.cached-poetry-dependencies.outputs.cache-hit != 'true'
+        run: poetry install --no-interaction -E athena
+
+      - run: |
+          poetry run pytest tests/load
+        if: runner.os != 'Windows'
+        name: Run tests Linux/MAC
+      - run: |
+          poetry run pytest tests/load
+        if: runner.os == 'Windows'
+        name: Run tests Windows
+        shell: cmd
+
+  matrix_job_required_check:
+    name: athena iceberg tests
+    needs: run_loader
+    runs-on: ubuntu-latest
+    if: always()
+    steps:
+      - name: Check matrix job results
+        if: contains(needs.*.result, 'failure') || contains(needs.*.result, 'cancelled')
+        run: |
+          echo "One or more matrix job tests failed or were cancelled. You may need to re-run them." && exit 1
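Note on the env blocks above: `ACTIVE_DESTINATIONS` and `EXCLUDED_DESTINATION_CONFIGURATIONS` hold JSON-encoded lists, and the two workflows exclude each other's configuration ("athena-parquet-staging-iceberg" vs. "athena-no-staging"), so each athena test configuration runs in exactly one workflow. A minimal sketch of how such a value decodes, assuming only that the env var holds a JSON list (the test suite itself reads it through `dlt.config`, not `json`/`os` directly):

```python
import json
import os

# hypothetical value, mirroring the iceberg workflow env above
os.environ["EXCLUDED_DESTINATION_CONFIGURATIONS"] = '["athena-no-staging"]'

# decode the JSON-encoded list into a set of excluded configuration names
excluded = set(json.loads(os.environ["EXCLUDED_DESTINATION_CONFIGURATIONS"]))
assert excluded == {"athena-no-staging"}
```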
diff --git a/dlt/common/destination/reference.py b/dlt/common/destination/reference.py
index 6f5689ed20..c2dc0ebcc6 100644
--- a/dlt/common/destination/reference.py
+++ b/dlt/common/destination/reference.py
@@ -289,7 +289,7 @@ def _verify_schema(self) -> None:
                 if not is_complete_column(column):
                     logger.warning(f"A column {column_name} in table {table_name} in schema {self.schema.name} is incomplete. It was not bound to the data during normalizations stage and its data type is unknown. Did you add this column manually in code ie. as a merge key?")
 
-    def get_load_table(self, table_name: str, staging: bool = False) -> TTableSchema:
+    def get_load_table(self, table_name: str, prepare_for_staging: bool = False) -> TTableSchema:
         try:
             # make a copy of the schema so modifications do not affect the original document
             table = deepcopy(self.schema.tables[table_name])
diff --git a/dlt/common/schema/utils.py b/dlt/common/schema/utils.py
index cc265891d1..b89925d9b0 100644
--- a/dlt/common/schema/utils.py
+++ b/dlt/common/schema/utils.py
@@ -511,11 +511,11 @@ def get_inherited_table_hint(tables: TSchemaTables, table_name: str, table_hint_
 
 def get_write_disposition(tables: TSchemaTables, table_name: str) -> TWriteDisposition:
     """Returns table hint of a table if present. If not, looks up into parent table"""
-    return get_inherited_table_hint(tables, table_name, "write_disposition", allow_none=False)
+    return cast(TWriteDisposition, get_inherited_table_hint(tables, table_name, "write_disposition", allow_none=False))
 
 
 def get_table_format(tables: TSchemaTables, table_name: str) -> TTableFormat:
-    return get_inherited_table_hint(tables, table_name, "table_format", allow_none=True)
+    return cast(TTableFormat, get_inherited_table_hint(tables, table_name, "table_format", allow_none=True))
 
 
 def table_schema_has_type(table: TTableSchema, _typ: TDataType) -> bool:
diff --git a/dlt/destinations/athena/configuration.py b/dlt/destinations/athena/configuration.py
index 7eca85fe41..a7f05e520d 100644
--- a/dlt/destinations/athena/configuration.py
+++ b/dlt/destinations/athena/configuration.py
@@ -13,6 +13,7 @@ class AthenaClientConfiguration(DestinationClientDwhWithStagingConfiguration):
     athena_work_group: Optional[str] = None
     aws_data_catalog: Optional[str] = "awsdatacatalog"
     supports_truncate_command: bool = False
+    force_iceberg: Optional[bool] = True
 
     __config_gen_annotations__: ClassVar[List[str]] = ["athena_work_group"]
 
diff --git a/dlt/destinations/job_client_impl.py b/dlt/destinations/job_client_impl.py
index e2768dcf76..ec3afced94 100644
--- a/dlt/destinations/job_client_impl.py
+++ b/dlt/destinations/job_client_impl.py
@@ -429,10 +429,16 @@ def _commit_schema_update(self, schema: Schema, schema_str: str) -> None:
 
 class SqlJobClientWithStaging(SqlJobClientBase, WithStagingDataset):
 
+    in_staging_mode: bool = False
+
     @contextlib.contextmanager
     def with_staging_dataset(self)-> Iterator["SqlJobClientBase"]:
-        with self.sql_client.with_staging_dataset(True):
-            yield self
+        try:
+            with self.sql_client.with_staging_dataset(True):
+                self.in_staging_mode = True
+                yield self
+        finally:
+            self.in_staging_mode = False
 
     def table_needs_staging(self, table: TTableSchema) -> bool:
         if table["write_disposition"] == "merge":
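The `with_staging_dataset` rewrite above wraps the `yield` in try/finally, so `in_staging_mode` is cleared even when a load job raises inside the context. A standalone sketch of that pattern under simplified names (no `sql_client`, not dlt's actual classes):

```python
import contextlib
from typing import Iterator


class Client:
    in_staging_mode: bool = False

    @contextlib.contextmanager
    def with_staging_dataset(self) -> Iterator["Client"]:
        try:
            self.in_staging_mode = True
            yield self
        finally:
            # runs on normal exit and when the with-body raises
            self.in_staging_mode = False


client = Client()
try:
    with client.with_staging_dataset():
        assert client.in_staging_mode
        raise RuntimeError("load failed")  # simulate an error mid-load
except RuntimeError:
    pass
assert not client.in_staging_mode  # flag was reset despite the exception
```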
diff --git a/dlt/load/load.py b/dlt/load/load.py
index c57d18fa0f..e1873761a6 100644
--- a/dlt/load/load.py
+++ b/dlt/load/load.py
@@ -101,9 +101,6 @@ def w_spool_job(self: "Load", file_path: str, load_id: str, schema: Schema) -> O
                 table = job_client.get_load_table(job_info.table_name)
                 if table["write_disposition"] not in ["append", "replace", "merge"]:
                     raise LoadClientUnsupportedWriteDisposition(job_info.table_name, table["write_disposition"], file_path)
-
-                table_needs_staging = isinstance(job_client, WithStagingDataset) and job_client.table_needs_staging(table)
-                table = job_client.get_load_table(job_info.table_name, table_needs_staging)
                 with self.maybe_with_staging_dataset(job_client, table):
                     job = job_client.start_file_load(table, self.load_storage.storage.make_full_path(file_path), load_id)
         except (DestinationTerminalException, TerminalValueError):
diff --git a/tests/load/test_dummy_client.py b/tests/load/test_dummy_client.py
index dcea7bd94d..aaa89ebfb1 100644
--- a/tests/load/test_dummy_client.py
+++ b/tests/load/test_dummy_client.py
@@ -100,16 +100,7 @@ def test_get_new_jobs_info() -> None:
     )
 
     # no write disposition specified - get all new jobs
-    assert len(load.get_new_jobs_info(load_id, schema)) == 2
-    # empty list - none
-    assert len(load.get_new_jobs_info(load_id, schema, [])) == 0
-    # two appends
-    assert len(load.get_new_jobs_info(load_id, schema, ["append"])) == 2
-    assert len(load.get_new_jobs_info(load_id, schema, ["replace"])) == 0
-    assert len(load.get_new_jobs_info(load_id, schema, ["replace", "append"])) == 2
-
-    load.load_storage.start_job(load_id, "event_loop_interrupted.839c6e6b514e427687586ccc65bf133f.0.jsonl")
-    assert len(load.get_new_jobs_info(load_id, schema, ["replace", "append"])) == 1
+    assert len(load.get_new_jobs_info(load_id)) == 2
 
 
 def test_get_completed_table_chain_single_job_per_table() -> None:
diff --git a/tests/load/utils.py b/tests/load/utils.py
index 2af16af496..60f389d064 100644
--- a/tests/load/utils.py
+++ b/tests/load/utils.py
@@ -26,7 +26,7 @@
 from dlt.destinations.sql_client import SqlClientBase
 from dlt.destinations.job_client_impl import SqlJobClientBase
 
-from tests.utils import ACTIVE_DESTINATIONS, IMPLEMENTED_DESTINATIONS, SQL_DESTINATIONS
+from tests.utils import ACTIVE_DESTINATIONS, IMPLEMENTED_DESTINATIONS, SQL_DESTINATIONS, EXCLUDED_DESTINATION_CONFIGURATIONS
 from tests.cases import TABLE_UPDATE_COLUMNS_SCHEMA, TABLE_UPDATE, TABLE_ROW_ALL_DATA_TYPES, assert_all_data_types_row
 
 # bucket urls
@@ -53,6 +53,7 @@ class DestinationTestConfiguration:
     staging_iam_role: Optional[str] = None
     extra_info: Optional[str] = None
     supports_merge: bool = True # TODO: take it from client base class
+    force_iceberg: bool = True
 
     @property
     def name(self) -> str:
@@ -72,6 +73,7 @@ def setup(self) -> None:
         os.environ['DESTINATION__FILESYSTEM__BUCKET_URL'] = self.bucket_url or ""
         os.environ['DESTINATION__STAGE_NAME'] = self.stage_name or ""
         os.environ['DESTINATION__STAGING_IAM_ROLE'] = self.staging_iam_role or ""
+        os.environ['DESTINATION__ATHENA__FORCE_ICEBERG'] = str(self.force_iceberg) or ""
 
         """For the filesystem destinations we disable compression to make analyzing the result easier"""
         if self.destination == "filesystem":
@@ -108,6 +110,7 @@ def destinations_configs(
         destination_configs += [DestinationTestConfiguration(destination=destination) for destination in SQL_DESTINATIONS if destination != "athena"]
         # athena needs filesystem staging, which will be automatically set, we have to supply a bucket url though
         destination_configs += [DestinationTestConfiguration(destination="athena", supports_merge=False, bucket_url=AWS_BUCKET)]
+        destination_configs += [DestinationTestConfiguration(destination="athena", staging="filesystem", file_format="parquet", bucket_url=AWS_BUCKET, force_iceberg=True, supports_merge=False, extra_info="iceberg")]
 
     if default_vector_configs:
         # for now only weaviate
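The exclusion strings used in the workflows ("athena-no-staging", "athena-parquet-staging-iceberg") are `DestinationTestConfiguration.name` values built from the fields set above. A hedged sketch of how such a name could be composed; the real `name` property in `tests/load/utils.py` may differ in detail:

```python
from dataclasses import dataclass
from typing import Optional


@dataclass
class DestinationTestConfiguration:
    destination: str
    staging: Optional[str] = None
    file_format: Optional[str] = None
    extra_info: Optional[str] = None

    @property
    def name(self) -> str:
        # assumed composition: destination, file format, staging marker, extra info
        parts = [self.destination]
        if self.file_format:
            parts.append(self.file_format)
        parts.append("staging" if self.staging else "no-staging")
        if self.extra_info:
            parts.append(self.extra_info)
        return "-".join(parts)


# reproduces both names referenced by the workflow env vars above
assert DestinationTestConfiguration(destination="athena").name == "athena-no-staging"
iceberg = DestinationTestConfiguration(destination="athena", staging="filesystem",
                                       file_format="parquet", extra_info="iceberg")
assert iceberg.name == "athena-parquet-staging-iceberg"
```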
diff --git a/tests/utils.py b/tests/utils.py
index 7321049c9d..2a0238cdfe 100644
--- a/tests/utils.py
+++ b/tests/utils.py
@@ -31,6 +31,10 @@
 
 NON_SQL_DESTINATIONS = {"filesystem", "weaviate", "dummy", "motherduck"}
 SQL_DESTINATIONS = IMPLEMENTED_DESTINATIONS - NON_SQL_DESTINATIONS
 
+# exclude destination configs (for now used for athena and athena iceberg separation)
+EXCLUDED_DESTINATION_CONFIGURATIONS = set(dlt.config.get("EXCLUDED_DESTINATION_CONFIGURATIONS", list) or set())
+
+
 # filter out active destinations for current tests
 ACTIVE_DESTINATIONS = set(dlt.config.get("ACTIVE_DESTINATIONS", list) or IMPLEMENTED_DESTINATIONS)
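With `EXCLUDED_DESTINATION_CONFIGURATIONS` exported from `tests/utils.py` and imported into `tests/load/utils.py`, the expected use is to drop matching configurations by name when the list is generated. A hypothetical filtering step, assuming the set is applied inside `destinations_configs()` (the diff imports the set but does not show the filter itself):

```python
# stand-in for the set read from config in tests/utils.py
EXCLUDED_DESTINATION_CONFIGURATIONS = {"athena-no-staging"}

# drop excluded configurations by their composed name
candidate_names = ["athena-no-staging", "athena-parquet-staging-iceberg"]
active = [name for name in candidate_names
          if name not in EXCLUDED_DESTINATION_CONFIGURATIONS]
assert active == ["athena-parquet-staging-iceberg"]
```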