Merge branch 'devel' into d#/data_sink_decorator
# Conflicts:
#	dlt/common/storages/load_package.py
#	dlt/destinations/impl/athena/athena.py
#	dlt/destinations/impl/bigquery/bigquery.py
#	dlt/load/load.py
sh-rp committed Mar 4, 2024
2 parents 27b8b2c + fc34dd0 commit e60f2f1
Showing 213 changed files with 9,267 additions and 2,514 deletions.
16 changes: 16 additions & 0 deletions .github/workflows/deploy_docs.yml
@@ -0,0 +1,16 @@
name: deploy docs

on:
  schedule:
    - cron: '0 2 * * *'
  workflow_dispatch:

env:
  NETLIFY_DOCS_PRODUCTION_DEPLOY_HOOK: ${{ secrets.NETLIFY_DOCS_PRODUCTION_DEPLOY_HOOK }}

jobs:
  deploy:
    runs-on: ubuntu-latest
    steps:
      - name: Trigger deploy hook
        run: curl ${{ env.NETLIFY_DOCS_PRODUCTION_DEPLOY_HOOK }} -X POST
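The workflow above only issues a POST to the Netlify build hook. A minimal sketch of triggering the same hook outside CI, assuming the hook URL is exported under the same variable name (illustrative, not part of the commit):

```python
import os
import urllib.request

# Assumes the hook URL is exported under the same name as in the workflow above.
hook_url = os.environ["NETLIFY_DOCS_PRODUCTION_DEPLOY_HOOK"]

# Netlify build hooks are triggered with an empty POST request.
req = urllib.request.Request(hook_url, data=b"", method="POST")
with urllib.request.urlopen(req) as resp:
    print(resp.status)  # a 2xx status means the deploy was queued
```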
2 changes: 1 addition & 1 deletion .github/workflows/get_docs_changes.yml
@@ -8,7 +8,7 @@ on:
value: ${{ jobs.get_docs_changes.outputs.changes_outside_docs }}

env:
EXCLUDED_FILE_PATTERNS: '^docs/|^README.md|^LICENSE\.txt|\.editorconfig|\.gitignore'
EXCLUDED_FILE_PATTERNS: '^docs/|^README.md|^CONTRIBUTING.md|^LICENSE\.txt|\.editorconfig|\.gitignore|get_docs_changes.yml'


jobs:
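The pattern is matched against each changed path to decide whether anything outside the docs changed. A rough sketch of that filtering logic (hypothetical helper, not the actual workflow script):

```python
import re

# Pattern from the hunk above: docs, README, CONTRIBUTING, LICENSE and meta
# files do not count as "changes outside docs".
EXCLUDED = re.compile(
    r"^docs/|^README.md|^CONTRIBUTING.md|^LICENSE\.txt|\.editorconfig|\.gitignore|get_docs_changes.yml"
)

def changes_outside_docs(paths):
    """Return True if any changed path is not covered by the exclusion pattern."""
    return any(not EXCLUDED.search(p) for p in paths)

print(changes_outside_docs(["docs/website/intro.md", "CONTRIBUTING.md"]))  # False
print(changes_outside_docs(["dlt/load/load.py"]))                          # True
```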
2 changes: 2 additions & 0 deletions .github/workflows/test_common.yml
@@ -32,6 +32,8 @@ jobs:
os: "ubuntu-latest"
- python-version: "3.10.x"
os: "ubuntu-latest"
- python-version: "3.12.x"
os: "ubuntu-latest"

defaults:
run:
2 changes: 1 addition & 1 deletion .github/workflows/test_destination_athena_iceberg.yml
@@ -65,7 +65,7 @@ jobs:

- name: Install dependencies
# if: steps.cached-poetry-dependencies.outputs.cache-hit != 'true'
run: poetry install --no-interaction -E --with sentry-sdk --with pipeline
run: poetry install --no-interaction -E athena --with sentry-sdk --with pipeline

- name: create secrets.toml
run: pwd && echo "$DLT_SECRETS_TOML" > tests/.dlt/secrets.toml
3 changes: 2 additions & 1 deletion .github/workflows/test_doc_snippets.yml
@@ -17,7 +17,8 @@ env:

# Slack hook for chess in production example
RUNTIME__SLACK_INCOMING_HOOK: ${{ secrets.RUNTIME__SLACK_INCOMING_HOOK }}

# detect if the workflow is executed in a repo fork
IS_FORK: ${{ github.event.pull_request.head.repo.fork }}
jobs:

run_lint:
12 changes: 6 additions & 6 deletions CONTRIBUTING.md
@@ -38,12 +38,12 @@ When you're ready to contribute, follow these steps:

We use **devel** (which is our default Github branch) to prepare a next release of `dlt`. We accept all regular contributions there (including most of the bugfixes).

We use **master** branch for hot fixes (including documentation) that needs to be released out of normal schedule.
We use **master** branch for hot fixes (including documentation) that needs to be released out of the normal schedule.

On the release day, **devel** branch is merged into **master**. All releases of `dlt` happen only from the **master**.

### Submitting a hotfix
We'll fix critical bugs and release `dlt` our of the schedule. Follow the regular procedure, but make your PR against **master** branch. Please ping us on Slack if you do it.
We'll fix critical bugs and release `dlt` out of the schedule. Follow the regular procedure, but make your PR against **master** branch. Please ping us on Slack if you do it.

### Testing with Github Actions
We enable our CI to run tests for contributions from forks. All the tests are run, but not all destinations are available due to credentials. Currently
@@ -71,7 +71,7 @@ To test local destinations (`duckdb` and `postgres`), run `make test-load-local`

### External Destinations

To test external destinations use `make test`. You will need following external resources
To test external destinations use `make test`. You will need the following external resources

1. `BigQuery` project
2. `Redshift` cluster
@@ -95,7 +95,7 @@ This section is intended for project maintainers who have the necessary permissi

Please read how we [version the library](README.md#adding-as-dependency) first.

The source of truth of the current version is is `pyproject.toml`, and we use `poetry` to manage it.
The source of truth for the current version is `pyproject.toml`, and we use `poetry` to manage it.

### Regular release

@@ -104,14 +104,14 @@ Before publishing a new release, make sure to bump the project's version accordi
1. Check out the **devel** branch.
2. Use `poetry version patch` to increase the **patch** version
3. Run `make build-library` to apply the changes to the project.
4. Create a new branch, and submit the PR to **devel**. Go through standard process to merge it.
4. Create a new branch, and submit the PR to **devel**. Go through the standard process to merge it.
5. Create a merge PR from `devel` to `master` and merge it with a merge commit.

### Hotfix release
1. Check out the **master** branch
2. Use `poetry version patch` to increase the **patch** version
3. Run `make build-library` to apply the changes to the project.
4. Create a new branch, and submit the PR to **master** and merge it.
4. Create a new branch, submit the PR to **master** and merge it.

### Pre-release
Occasionally we may release an alpha version directly from the **branch**.
11 changes: 6 additions & 5 deletions Makefile
@@ -24,7 +24,7 @@ help:
@echo " test"
@echo " tests all the components including destinations"
@echo " test-load-local"
@echo " tests all components unsing local destinations: duckdb and postgres"
@echo " tests all components using local destinations: duckdb and postgres"
@echo " test-common"
@echo " tests common components"
@echo " test-and-lint-snippets"
@@ -74,7 +74,7 @@ test-load-local:
DESTINATION__POSTGRES__CREDENTIALS=postgresql://loader:loader@localhost:5432/dlt_data DESTINATION__DUCKDB__CREDENTIALS=duckdb:///_storage/test_quack.duckdb poetry run pytest tests -k '(postgres or duckdb)'

test-common:
poetry run pytest tests/common tests/normalize tests/extract tests/pipeline tests/reflection tests/sources tests/cli/common
poetry run pytest tests/common tests/normalize tests/extract tests/pipeline tests/reflection tests/sources tests/cli/common tests/load/test_dummy_client.py tests/libs tests/destinations

reset-test-storage:
-rm -r _storage
@@ -89,9 +89,10 @@ publish-library: build-library
poetry publish

test-build-images: build-library
poetry export -f requirements.txt --output _gen_requirements.txt --without-hashes --extras gcp --extras redshift
grep `cat compiled_packages.txt` _gen_requirements.txt > compiled_requirements.txt
# TODO: enable when we can remove special duckdb setting for python 3.12
# poetry export -f requirements.txt --output _gen_requirements.txt --without-hashes --extras gcp --extras redshift
# grep `cat compiled_packages.txt` _gen_requirements.txt > compiled_requirements.txt
docker build -f deploy/dlt/Dockerfile.airflow --build-arg=COMMIT_SHA="$(shell git log -1 --pretty=%h)" --build-arg=IMAGE_VERSION="$(shell poetry version -s)" .
docker build -f deploy/dlt/Dockerfile --build-arg=COMMIT_SHA="$(shell git log -1 --pretty=%h)" --build-arg=IMAGE_VERSION="$(shell poetry version -s)" .
# docker build -f deploy/dlt/Dockerfile --build-arg=COMMIT_SHA="$(shell git log -1 --pretty=%h)" --build-arg=IMAGE_VERSION="$(shell poetry version -s)" .


42 changes: 40 additions & 2 deletions dlt/common/configuration/providers/google_secrets.py
@@ -1,12 +1,38 @@
import base64
import string
import re
from typing import Tuple

from dlt.common import json
from dlt.common.configuration.specs import GcpServiceAccountCredentials
from dlt.common.exceptions import MissingDependencyException

from .toml import VaultTomlProvider
from .provider import get_key_name

# Create a translation table to replace punctuation with ""
# since Google secrets allow "-" and "_" we need to exclude them
punctuation = "".join(set(string.punctuation) - {"-", "_"})
translator = str.maketrans("", "", punctuation)


def normalize_key(in_string: str) -> str:
"""Replaces punctuation characters in a string
Note: We exclude `_` and `-` from punctuation characters
Args:
in_string(str): input string
Returns:
(str): a string without punctuation characters and whitespace
"""

# Strip punctuation from the string
stripped_text = in_string.translate(translator)
whitespace = re.compile(r"\s+")
stripped_whitespace = whitespace.sub("", stripped_text)
return stripped_whitespace


class GoogleSecretsProvider(VaultTomlProvider):
def __init__(
@@ -20,7 +46,19 @@ def __init__(

@staticmethod
def get_key_name(key: str, *sections: str) -> str:
return get_key_name(key, "-", *sections)
"""Make key name for the secret
Per Google, a secret name may contain only the following, so we normalize keys accordingly:
1. Uppercase and lowercase letters,
2. Numerals,
3. Hyphens,
4. Underscores.
"""
key = normalize_key(key)
normalized_sections = [normalize_key(section) for section in sections if section]
key_name = get_key_name(normalize_key(key), "-", *normalized_sections)
return key_name

@property
def name(self) -> str:
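A quick illustration of how the normalization above behaves on hypothetical keys and sections (outputs shown as expected, assuming the functions in this diff):

```python
from dlt.common.configuration.providers.google_secrets import (
    GoogleSecretsProvider,
    normalize_key,
)

# punctuation other than "-" and "_" plus whitespace is stripped
print(normalize_key("sources.my source!.api_key"))
# -> "sourcesmysourceapi_key"

# sections and key are normalized, then joined with "-" into the secret name
print(GoogleSecretsProvider.get_key_name("api_key", "sources", "my source"))
# -> something like "sources-mysource-api_key"
```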
@@ -1,5 +1,5 @@
from typing import Any, ClassVar, Dict, List, Optional
from sqlalchemy.engine import URL, make_url
from dlt.common.libs.sql_alchemy import URL, make_url
from dlt.common.configuration.specs.exceptions import InvalidConnectionString

from dlt.common.typing import TSecretValue
Expand All @@ -26,6 +26,7 @@ def parse_native_representation(self, native_value: Any) -> None:
# update only values that are not None
self.update({k: v for k, v in url._asdict().items() if v is not None})
if self.query is not None:
# query may be immutable so make it mutable
self.query = dict(self.query)
except Exception:
raise InvalidConnectionString(self.__class__, native_value, self.drivername)
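The added copy matters because the parsed URL exposes its query as an immutable mapping. A small sketch of the idea using SQLAlchemy directly (the code above uses dlt's vendored copy of the same API):

```python
from sqlalchemy.engine import make_url

url = make_url("postgresql://loader:loader@localhost:5432/dlt_data?sslmode=require")

query = dict(url.query)            # url.query is read-only; copy into a plain dict
query["connect_timeout"] = "15"    # now the parameters can be updated freely
print(query)                       # {'sslmode': 'require', 'connect_timeout': '15'}
```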
10 changes: 5 additions & 5 deletions dlt/common/data_types/type_helpers.py
@@ -1,5 +1,6 @@
import binascii
import base64
import dataclasses
import datetime # noqa: I251
from collections.abc import Mapping as C_Mapping, Sequence as C_Sequence
from typing import Any, Type, Literal, Union, cast
@@ -12,7 +13,6 @@
from dlt.common.data_types.typing import TDataType
from dlt.common.time import (
ensure_pendulum_datetime,
parse_iso_like_datetime,
ensure_pendulum_date,
ensure_pendulum_time,
)
@@ -55,7 +55,7 @@ def py_type_to_sc_type(t: Type[Any]) -> TDataType:
return "bigint"
if issubclass(t, bytes):
return "binary"
if issubclass(t, (C_Mapping, C_Sequence)):
if dataclasses.is_dataclass(t) or issubclass(t, (C_Mapping, C_Sequence)):
return "complex"
# Enum is coerced to str or int respectively
if issubclass(t, Enum):
@@ -81,13 +81,13 @@ def coerce_from_date_types(
if to_type == "text":
return v.isoformat()
if to_type == "bigint":
return v.int_timestamp # type: ignore
return v.int_timestamp
if to_type == "double":
return v.timestamp() # type: ignore
return v.timestamp()
if to_type == "date":
return ensure_pendulum_date(v)
if to_type == "time":
return v.time() # type: ignore[no-any-return]
return v.time()
raise TypeError(f"Cannot convert timestamp to {to_type}")


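With the `dataclasses.is_dataclass` check added above, dataclass types are classified the same way as mappings and sequences. A small sketch of the expected behaviour (illustrative class, assuming the function is importable as shown):

```python
import dataclasses
from dlt.common.data_types.type_helpers import py_type_to_sc_type

@dataclasses.dataclass
class Address:
    street: str
    zip_code: str

print(py_type_to_sc_type(Address))  # "complex" - stored as a nested/JSON value
print(py_type_to_sc_type(dict))     # "complex"
print(py_type_to_sc_type(int))      # "bigint"
```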
67 changes: 57 additions & 10 deletions dlt/common/destination/reference.py
@@ -20,7 +20,6 @@
Generic,
Final,
)
from contextlib import contextmanager
import datetime # noqa: 251
from copy import deepcopy
import inspect
@@ -32,18 +31,22 @@
UnknownDestinationModule,
)
from dlt.common.schema import Schema, TTableSchema, TSchemaTables
from dlt.common.schema.typing import TWriteDisposition
from dlt.common.schema.exceptions import InvalidDatasetName
from dlt.common.schema.utils import get_write_disposition, get_table_format
from dlt.common.configuration import configspec, with_config, resolve_configuration, known_sections
from dlt.common.schema.exceptions import SchemaException
from dlt.common.schema.utils import (
get_write_disposition,
get_table_format,
get_columns_names_with_prop,
has_column_with_prop,
get_first_column_name_with_prop,
)
from dlt.common.configuration import configspec, resolve_configuration, known_sections
from dlt.common.configuration.specs import BaseConfiguration, CredentialsConfiguration
from dlt.common.configuration.accessors import config
from dlt.common.destination.capabilities import DestinationCapabilitiesContext
from dlt.common.schema.utils import is_complete_column
from dlt.common.schema.exceptions import UnknownTableException
from dlt.common.storages import FileStorage
from dlt.common.storages.load_storage import ParsedLoadJobFileName
from dlt.common.utils import get_module_name
from dlt.common.configuration.specs import GcpCredentials, AwsCredentialsWithoutDefaults


@@ -252,7 +255,8 @@ def new_file_path(self) -> str:
class FollowupJob:
"""Adds a trait that allows to create a followup job"""

def create_followup_jobs(self, next_state: str) -> List[NewLoadJob]:
def create_followup_jobs(self, final_state: TLoadJobState) -> List[NewLoadJob]:
"""Return list of new jobs. `final_state` is state to which this job transits"""
return []


@@ -345,6 +349,49 @@ def _verify_schema(self) -> None:
table_name,
self.capabilities.max_identifier_length,
)
if has_column_with_prop(table, "hard_delete"):
if len(get_columns_names_with_prop(table, "hard_delete")) > 1:
raise SchemaException(
f'Found multiple "hard_delete" column hints for table "{table_name}" in'
f' schema "{self.schema.name}" while only one is allowed:'
f' {", ".join(get_columns_names_with_prop(table, "hard_delete"))}.'
)
if table.get("write_disposition") in ("replace", "append"):
logger.warning(
f"""The "hard_delete" column hint for column "{get_first_column_name_with_prop(table, 'hard_delete')}" """
f'in table "{table_name}" with write disposition'
f' "{table.get("write_disposition")}"'
f' in schema "{self.schema.name}" will be ignored.'
' The "hard_delete" column hint is only applied when using'
' the "merge" write disposition.'
)
if has_column_with_prop(table, "dedup_sort"):
if len(get_columns_names_with_prop(table, "dedup_sort")) > 1:
raise SchemaException(
f'Found multiple "dedup_sort" column hints for table "{table_name}" in'
f' schema "{self.schema.name}" while only one is allowed:'
f' {", ".join(get_columns_names_with_prop(table, "dedup_sort"))}.'
)
if table.get("write_disposition") in ("replace", "append"):
logger.warning(
f"""The "dedup_sort" column hint for column "{get_first_column_name_with_prop(table, 'dedup_sort')}" """
f'in table "{table_name}" with write disposition'
f' "{table.get("write_disposition")}"'
f' in schema "{self.schema.name}" will be ignored.'
' The "dedup_sort" column hint is only applied when using'
' the "merge" write disposition.'
)
if table.get("write_disposition") == "merge" and not has_column_with_prop(
table, "primary_key"
):
logger.warning(
f"""The "dedup_sort" column hint for column "{get_first_column_name_with_prop(table, 'dedup_sort')}" """
f'in table "{table_name}" with write disposition'
f' "{table.get("write_disposition")}"'
f' in schema "{self.schema.name}" will be ignored.'
' The "dedup_sort" column hint is only applied when a'
" primary key has been specified."
)
for column_name, column in dict(table["columns"]).items():
if len(column_name) > self.capabilities.max_column_identifier_length:
raise IdentifierTooLongException(
@@ -361,9 +408,9 @@ def _verify_schema(self) -> None:
" column manually in code ie. as a merge key?"
)

def get_load_table(self, table_name: str, prepare_for_staging: bool = False) -> TTableSchema:
if table_name not in self.schema.tables:
return None
def prepare_load_table(
self, table_name: str, prepare_for_staging: bool = False
) -> TTableSchema:
try:
# make a copy of the schema so modifications do not affect the original document
table = deepcopy(self.schema.tables[table_name])
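These checks correspond to column-level hints set on resources. A minimal sketch of a resource that passes the validation above (table and column names are illustrative):

```python
import dlt

@dlt.resource(
    name="customers",
    write_disposition="merge",   # hard_delete / dedup_sort only apply to merge
    primary_key="id",            # required for dedup_sort to take effect
    columns={
        "deleted": {"hard_delete": True},       # at most one hard_delete column
        "updated_at": {"dedup_sort": "desc"},   # at most one dedup_sort column
    },
)
def customers():
    yield {"id": 1, "updated_at": "2024-03-04T00:00:00Z", "deleted": False}
```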