
PR changes
sh-rp committed Oct 11, 2023
1 parent ab8a01c commit 3d0c4ed
Showing 10 changed files with 116 additions and 19 deletions.
1 change: 1 addition & 0 deletions .github/workflows/test_destination_athena.yml
@@ -21,6 +21,7 @@ env:
  RUNTIME__DLTHUB_TELEMETRY_SEGMENT_WRITE_KEY: TLJiyRkGVZGCi2TtjClamXpFcxAA1rSB
  ACTIVE_DESTINATIONS: "[\"athena\"]"
  ALL_FILESYSTEM_DRIVERS: "[\"memory\"]"
  EXCLUDED_DESTINATION_CONFIGURATIONS: "[\"athena-parquet-staging-iceberg\"]"

jobs:
  get_docs_changes:
94 changes: 94 additions & 0 deletions .github/workflows/test_destination_athena_iceberg.yml
@@ -0,0 +1,94 @@

name: test athena iceberg

on:
  pull_request:
    branches:
      - master
      - devel
  workflow_dispatch:

env:
  DESTINATION__FILESYSTEM__CREDENTIALS__AWS_ACCESS_KEY_ID: AKIAT4QMVMC4J46G55G4
  DESTINATION__FILESYSTEM__CREDENTIALS__AWS_SECRET_ACCESS_KEY: ${{ secrets.AWS_SECRET_ACCESS_KEY }}
  DESTINATION__ATHENA__CREDENTIALS__AWS_ACCESS_KEY_ID: AKIAT4QMVMC4J46G55G4
  DESTINATION__ATHENA__CREDENTIALS__AWS_SECRET_ACCESS_KEY: ${{ secrets.AWS_SECRET_ACCESS_KEY }}
  DESTINATION__ATHENA__CREDENTIALS__REGION_NAME: eu-central-1
  DESTINATION__ATHENA__QUERY_RESULT_BUCKET: s3://dlt-athena-output

  RUNTIME__SENTRY_DSN: https://[email protected]/4504819859914752
  RUNTIME__LOG_LEVEL: ERROR
  RUNTIME__DLTHUB_TELEMETRY_SEGMENT_WRITE_KEY: TLJiyRkGVZGCi2TtjClamXpFcxAA1rSB
  ACTIVE_DESTINATIONS: "[\"athena\"]"
  ALL_FILESYSTEM_DRIVERS: "[\"memory\"]"
  EXCLUDED_DESTINATION_CONFIGURATIONS: "[\"athena-no-staging\"]"

jobs:
  get_docs_changes:
    uses: ./.github/workflows/get_docs_changes.yml
    # Tests that require credentials do not run in forks
    if: ${{ !github.event.pull_request.head.repo.fork }}

  run_loader:
    name: test destination athena iceberg
    needs: get_docs_changes
    if: needs.get_docs_changes.outputs.changes_outside_docs == 'true'
    strategy:
      fail-fast: false
      matrix:
        os: ["ubuntu-latest"]
        # os: ["ubuntu-latest", "macos-latest", "windows-latest"]
    defaults:
      run:
        shell: bash
    runs-on: ${{ matrix.os }}

    steps:

      - name: Check out
        uses: actions/checkout@master

      - name: Setup Python
        uses: actions/setup-python@v4
        with:
          python-version: "3.10.x"

      - name: Install Poetry
        uses: snok/[email protected]
        with:
          virtualenvs-create: true
          virtualenvs-in-project: true
          installer-parallel: true

      - name: Load cached venv
        id: cached-poetry-dependencies
        uses: actions/cache@v3
        with:
          # path: ${{ steps.pip-cache.outputs.dir }}
          path: .venv
          key: venv-${{ runner.os }}-${{ steps.setup-python.outputs.python-version }}-${{ hashFiles('**/poetry.lock') }}-athena

      - name: Install dependencies
        # if: steps.cached-poetry-dependencies.outputs.cache-hit != 'true'
        run: poetry install --no-interaction -E athena

      - run: |
          poetry run pytest tests/load
        if: runner.os != 'Windows'
        name: Run tests Linux/MAC
      - run: |
          poetry run pytest tests/load
        if: runner.os == 'Windows'
        name: Run tests Windows
        shell: cmd

  matrix_job_required_check:
    name: Redshift, PostgreSQL and DuckDB tests
    needs: run_loader
    runs-on: ubuntu-latest
    if: always()
    steps:
      - name: Check matrix job results
        if: contains(needs.*.result, 'failure') || contains(needs.*.result, 'cancelled')
        run: |
          echo "One or more matrix job tests failed or were cancelled. You may need to re-run them." && exit 1
2 changes: 1 addition & 1 deletion dlt/common/destination/reference.py
@@ -289,7 +289,7 @@ def _verify_schema(self) -> None:
                if not is_complete_column(column):
                    logger.warning(f"A column {column_name} in table {table_name} in schema {self.schema.name} is incomplete. It was not bound to the data during normalizations stage and its data type is unknown. Did you add this column manually in code ie. as a merge key?")

-    def get_load_table(self, table_name: str, staging: bool = False) -> TTableSchema:
+    def get_load_table(self, table_name: str, prepare_for_staging: bool = False) -> TTableSchema:
        try:
            # make a copy of the schema so modifications do not affect the original document
            table = deepcopy(self.schema.tables[table_name])
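The staging flag is renamed to prepare_for_staging here; the diff does not show how Athena uses it, but a plausible reading is that a destination can return an adjusted table schema when the table is about to be loaded into the staging dataset. A minimal, hypothetical sketch of that idea (class name and the iceberg-stripping logic are assumptions, not the actual implementation in this commit):

from copy import deepcopy
from typing import Any, Dict

class IcebergAwareClient:  # hypothetical example, not part of this commit
    def __init__(self, tables: Dict[str, Dict[str, Any]]) -> None:
        self.tables = tables

    def get_load_table(self, table_name: str, prepare_for_staging: bool = False) -> Dict[str, Any]:
        # copy the schema so modifications do not affect the original document
        table = deepcopy(self.tables[table_name])
        # assumption: the staging copy of an iceberg table is written without the iceberg format
        if prepare_for_staging and table.get("table_format") == "iceberg":
            table.pop("table_format")
        return table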
4 changes: 2 additions & 2 deletions dlt/common/schema/utils.py
@@ -511,11 +511,11 @@ def get_inherited_table_hint(tables: TSchemaTables, table_name: str, table_hint_

def get_write_disposition(tables: TSchemaTables, table_name: str) -> TWriteDisposition:
    """Returns table hint of a table if present. If not, looks up into parent table"""
-    return get_inherited_table_hint(tables, table_name, "write_disposition", allow_none=False)
+    return cast(TWriteDisposition, get_inherited_table_hint(tables, table_name, "write_disposition", allow_none=False))


def get_table_format(tables: TSchemaTables, table_name: str) -> TTableFormat:
-    return get_inherited_table_hint(tables, table_name, "table_format", allow_none=True)
+    return cast(TTableFormat, get_inherited_table_hint(tables, table_name, "table_format", allow_none=True))


def table_schema_has_type(table: TTableSchema, _typ: TDataType) -> bool:
1 change: 1 addition & 0 deletions dlt/destinations/athena/configuration.py
@@ -13,6 +13,7 @@ class AthenaClientConfiguration(DestinationClientDwhWithStagingConfiguration):
    athena_work_group: Optional[str] = None
    aws_data_catalog: Optional[str] = "awsdatacatalog"
    supports_truncate_command: bool = False
    force_iceberg: Optional[bool] = True

    __config_gen_annotations__: ClassVar[List[str]] = ["athena_work_group"]

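force_iceberg is a regular configuration field, so it should be overridable like any other destination setting; the test utilities later in this diff set it through the DESTINATION__ATHENA__FORCE_ICEBERG environment variable. A small sketch of doing the same in user code (the value is an example only):

import os

# example: turn off forced iceberg table creation for a pipeline run;
# the variable name mirrors the one used in tests/load/utils.py below
os.environ["DESTINATION__ATHENA__FORCE_ICEBERG"] = "False"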
10 changes: 8 additions & 2 deletions dlt/destinations/job_client_impl.py
@@ -429,10 +429,16 @@ def _commit_schema_update(self, schema: Schema, schema_str: str) -> None:

class SqlJobClientWithStaging(SqlJobClientBase, WithStagingDataset):

+    in_staging_mode: bool = False
+
    @contextlib.contextmanager
    def with_staging_dataset(self)-> Iterator["SqlJobClientBase"]:
-        with self.sql_client.with_staging_dataset(True):
-            yield self
+        try:
+            with self.sql_client.with_staging_dataset(True):
+                self.in_staging_mode = True
+                yield self
+        finally:
+            self.in_staging_mode = False

    def table_needs_staging(self, table: TTableSchema) -> bool:
        if table["write_disposition"] == "merge":
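The try/finally added here guarantees that in_staging_mode is reset even when loading into the staging dataset fails. A condensed, self-contained sketch of the same pattern with the dlt-specific pieces stubbed out:

import contextlib
from typing import Iterator

class StagingAwareClient:
    """Toy stand-in for SqlJobClientWithStaging; only the flag handling is shown."""
    in_staging_mode: bool = False

    @contextlib.contextmanager
    def with_staging_dataset(self) -> Iterator["StagingAwareClient"]:
        try:
            self.in_staging_mode = True   # other methods can branch on this while staging
            yield self
        finally:
            self.in_staging_mode = False  # always reset, even if the load raises

client = StagingAwareClient()
with client.with_staging_dataset():
    assert client.in_staging_mode
assert not client.in_staging_mode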
3 changes: 0 additions & 3 deletions dlt/load/load.py
@@ -101,9 +101,6 @@ def w_spool_job(self: "Load", file_path: str, load_id: str, schema: Schema) -> O
                table = job_client.get_load_table(job_info.table_name)
                if table["write_disposition"] not in ["append", "replace", "merge"]:
                    raise LoadClientUnsupportedWriteDisposition(job_info.table_name, table["write_disposition"], file_path)
-
-                table_needs_staging = isinstance(job_client, WithStagingDataset) and job_client.table_needs_staging(table)
-                table = job_client.get_load_table(job_info.table_name, table_needs_staging)
                with self.maybe_with_staging_dataset(job_client, table):
                    job = job_client.start_file_load(table, self.load_storage.storage.make_full_path(file_path), load_id)
        except (DestinationTerminalException, TerminalValueError):
11 changes: 1 addition & 10 deletions tests/load/test_dummy_client.py
@@ -100,16 +100,7 @@ def test_get_new_jobs_info() -> None:
    )

    # no write disposition specified - get all new jobs
-    assert len(load.get_new_jobs_info(load_id, schema)) == 2
-    # empty list - none
-    assert len(load.get_new_jobs_info(load_id, schema, [])) == 0
-    # two appends
-    assert len(load.get_new_jobs_info(load_id, schema, ["append"])) == 2
-    assert len(load.get_new_jobs_info(load_id, schema, ["replace"])) == 0
-    assert len(load.get_new_jobs_info(load_id, schema, ["replace", "append"])) == 2
-
-    load.load_storage.start_job(load_id, "event_loop_interrupted.839c6e6b514e427687586ccc65bf133f.0.jsonl")
-    assert len(load.get_new_jobs_info(load_id, schema, ["replace", "append"])) == 1
+    assert len(load.get_new_jobs_info(load_id)) == 2


def test_get_completed_table_chain_single_job_per_table() -> None:
5 changes: 4 additions & 1 deletion tests/load/utils.py
@@ -26,7 +26,7 @@
from dlt.destinations.sql_client import SqlClientBase
from dlt.destinations.job_client_impl import SqlJobClientBase

-from tests.utils import ACTIVE_DESTINATIONS, IMPLEMENTED_DESTINATIONS, SQL_DESTINATIONS
+from tests.utils import ACTIVE_DESTINATIONS, IMPLEMENTED_DESTINATIONS, SQL_DESTINATIONS, EXCLUDED_DESTINATION_CONFIGURATIONS
from tests.cases import TABLE_UPDATE_COLUMNS_SCHEMA, TABLE_UPDATE, TABLE_ROW_ALL_DATA_TYPES, assert_all_data_types_row

# bucket urls
@@ -53,6 +53,7 @@ class DestinationTestConfiguration:
    staging_iam_role: Optional[str] = None
    extra_info: Optional[str] = None
    supports_merge: bool = True # TODO: take it from client base class
    force_iceberg: bool = True

    @property
    def name(self) -> str:
Expand All @@ -72,6 +73,7 @@ def setup(self) -> None:
        os.environ['DESTINATION__FILESYSTEM__BUCKET_URL'] = self.bucket_url or ""
        os.environ['DESTINATION__STAGE_NAME'] = self.stage_name or ""
        os.environ['DESTINATION__STAGING_IAM_ROLE'] = self.staging_iam_role or ""
        os.environ['DESTINATION__ATHENA__FORCE_ICEBERG'] = str(self.force_iceberg) or ""

        """For the filesystem destinations we disable compression to make analyzing the result easier"""
        if self.destination == "filesystem":
@@ -108,6 +110,7 @@ def destinations_configs(
        destination_configs += [DestinationTestConfiguration(destination=destination) for destination in SQL_DESTINATIONS if destination != "athena"]
        # athena needs filesystem staging, which will be automatically set, we have to supply a bucket url though
        destination_configs += [DestinationTestConfiguration(destination="athena", supports_merge=False, bucket_url=AWS_BUCKET)]
        destination_configs += [DestinationTestConfiguration(destination="athena", staging="filesystem", file_format="parquet", bucket_url=AWS_BUCKET, force_iceberg=True, supports_merge=False, extra_info="iceberg")]

    if default_vector_configs:
        # for now only weaviate
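EXCLUDED_DESTINATION_CONFIGURATIONS is imported above so that the generated test configurations can be filtered by name; the two workflows exclude "athena-parquet-staging-iceberg" and "athena-no-staging" respectively. The filtering code itself is not shown in this excerpt; a hedged, self-contained sketch of how such a filter could look:

from typing import List

# example values; the real set is read from the EXCLUDED_DESTINATION_CONFIGURATIONS config/env var
EXCLUDED_DESTINATION_CONFIGURATIONS = {"athena-no-staging"}

def filter_configs(config_names: List[str]) -> List[str]:
    """Drop test configurations whose name is excluded."""
    return [name for name in config_names if name not in EXCLUDED_DESTINATION_CONFIGURATIONS]

assert filter_configs(["athena-no-staging", "athena-parquet-staging-iceberg"]) == ["athena-parquet-staging-iceberg"]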
4 changes: 4 additions & 0 deletions tests/utils.py
@@ -31,6 +31,10 @@
NON_SQL_DESTINATIONS = {"filesystem", "weaviate", "dummy", "motherduck"}
SQL_DESTINATIONS = IMPLEMENTED_DESTINATIONS - NON_SQL_DESTINATIONS

# exclude destination configs (for now used for athena and athena iceberg separation)
EXCLUDED_DESTINATION_CONFIGURATIONS = set(dlt.config.get("EXCLUDED_DESTINATION_CONFIGURATIONS", list) or set())


# filter out active destinations for current tests
ACTIVE_DESTINATIONS = set(dlt.config.get("ACTIVE_DESTINATIONS", list) or IMPLEMENTED_DESTINATIONS)

