Commit

Merge branch 'devel' into d#/data_contracts
rudolfix committed Oct 29, 2023
2 parents 912dd8b + b61ea0b commit ef1b10f
Showing 100 changed files with 2,651 additions and 272 deletions.
4 changes: 2 additions & 2 deletions .github/ISSUE_TEMPLATE/bug_report.yml
@@ -7,7 +7,7 @@ body:
attributes:
value: |
Thanks for reporting a bug for dlt! Please fill out the sections below.
If you are not sure if this is a bug or not, please join our [Slack](https://join.slack.com/t/dlthub-community/shared_invite/zt-1slox199h-HAE7EQoXmstkP_bTqal65g)
If you are not sure if this is a bug or not, please join our [Slack](https://join.slack.com/t/dlthub-community/shared_invite/zt-1n5193dbq-rCBmJ6p~ckpSFK4hCF2dYA)
and ask in the #3-technical-help channel.
- type: input
attributes:
@@ -34,7 +34,7 @@
attributes:
label: Steps to reproduce
description: >
How can we replicate the issue? If it's not straightforward to reproduce, please join our [Slack](https://join.slack.com/t/dlthub-community/shared_invite/zt-1slox199h-HAE7EQoXmstkP_bTqal65g)
How can we replicate the issue? If it's not straightforward to reproduce, please join our [Slack](https://join.slack.com/t/dlthub-community/shared_invite/zt-1n5193dbq-rCBmJ6p~ckpSFK4hCF2dYA)
and ask in the #3-technical-help channel.
placeholder: >
Provide a step-by-step description of how to reproduce the problem you are running into.
2 changes: 1 addition & 1 deletion .github/ISSUE_TEMPLATE/config.yml
@@ -2,5 +2,5 @@
blank_issues_enabled: true
contact_links:
- name: Ask a question or get support on dlt Slack
url: https://join.slack.com/t/dlthub-community/shared_invite/zt-1slox199h-HAE7EQoXmstkP_bTqal65g
url: https://join.slack.com/t/dlthub-community/shared_invite/zt-1n5193dbq-rCBmJ6p~ckpSFK4hCF2dYA
about: Need help or support? Join our dlt community on Slack and get assistance.
2 changes: 1 addition & 1 deletion .github/ISSUE_TEMPLATE/feature_request.yml
@@ -7,7 +7,7 @@ body:
attributes:
value: |
Thanks for suggesting a feature for dlt!
If you like to discuss your idea first, please join our [Slack](https://join.slack.com/t/dlthub-community/shared_invite/zt-1slox199h-HAE7EQoXmstkP_bTqal65g)
If you like to discuss your idea first, please join our [Slack](https://join.slack.com/t/dlthub-community/shared_invite/zt-1n5193dbq-rCBmJ6p~ckpSFK4hCF2dYA)
and pose your questions in the #3-technical-help channel.
For minor features and improvements, feel free to open a [pull request](https://github.com/dlt-hub/dlt/pulls) directly.
- type: textarea
86 changes: 86 additions & 0 deletions .github/workflows/test_dbt_cloud.yml
@@ -0,0 +1,86 @@

name: test dbt cloud

on:
pull_request:
branches:
- master
- devel
workflow_dispatch:

env:
# all credentials must be present to be passed to dbt cloud
DBT_CLOUD__ACCOUNT_ID: ${{ secrets.DBT_CLOUD__ACCOUNT_ID }}
DBT_CLOUD__JOB_ID: ${{ secrets.DBT_CLOUD__JOB_ID }}
DBT_CLOUD__API_TOKEN: ${{ secrets.DBT_CLOUD__API_TOKEN }}

RUNTIME__LOG_LEVEL: ERROR

jobs:
get_docs_changes:
uses: ./.github/workflows/get_docs_changes.yml
if: ${{ !github.event.pull_request.head.repo.fork }}

run_dbt_cloud:
name: Tests dbt cloud
needs: get_docs_changes
if: needs.get_docs_changes.outputs.changes_outside_docs == 'true'
strategy:
fail-fast: false
matrix:
os: ["ubuntu-latest"]
# os: ["ubuntu-latest", "macos-latest", "windows-latest"]
defaults:
run:
shell: bash
runs-on: ${{ matrix.os }}

steps:

- name: Check out
uses: actions/checkout@master

- name: Setup Python
uses: actions/setup-python@v4
with:
python-version: "3.10.x"

- name: Install Poetry without dbt
uses: snok/[email protected]
with:
virtualenvs-create: true
virtualenvs-in-project: true
installer-parallel: true

- name: Load cached venv
id: cached-poetry-dependencies
uses: actions/cache@v3
with:
# path: ${{ steps.pip-cache.outputs.dir }}
path: .venv
key: venv-${{ runner.os }}-${{ steps.setup-python.outputs.python-version }}-${{ hashFiles('**/poetry.lock') }}-dbt-cloud

- name: Install dependencies
# install dlt with postgres support
run: poetry install --no-interaction

- run: |
poetry run pytest tests/helpers/dbt_cloud_tests -k '(not venv)'
if: runner.os != 'Windows'
name: Run dbt cloud - Linux/MAC
- run: |
poetry run pytest tests/helpers/dbt_cloud_tests -k "(not venv)"
if: runner.os == 'Windows'
name: Run dbt cloud - Windows
shell: cmd
matrix_job_required_check:
name: dbt cloud tests
needs: run_dbt_cloud
runs-on: ubuntu-latest
if: always()
steps:
- name: Check matrix job results
if: contains(needs.*.result, 'failure') || contains(needs.*.result, 'cancelled')
run: |
echo "One or more matrix job tests failed or were cancelled. You may need to re-run them." && exit 1
6 changes: 6 additions & 0 deletions .github/workflows/test_destinations.yml
@@ -16,6 +16,12 @@ env:
DESTINATION__FILESYSTEM__CREDENTIALS__AWS_SECRET_ACCESS_KEY: ${{ secrets.AWS_SECRET_ACCESS_KEY }}
DESTINATION__FILESYSTEM__CREDENTIALS__AZURE_STORAGE_ACCOUNT_NAME: dltdata
DESTINATION__FILESYSTEM__CREDENTIALS__AZURE_STORAGE_ACCOUNT_KEY: ${{ secrets.AZURE_STORAGE_ACCOUNT_KEY }}

# For s3 compatible tests
TESTS__R2_AWS_ACCESS_KEY_ID: a4950a5003b26f5a71ac97ef3848ff4c
TESTS__R2_AWS_SECRET_ACCESS_KEY: ${{ secrets.CLOUDFLARE_R2_SECRET_ACCESS_KEY }}
TESTS__R2_ENDPOINT_URL: https://9830548e4e4b582989be0811f2a0a97f.r2.cloudflarestorage.com

# DESTINATION__ATHENA__CREDENTIALS__AWS_ACCESS_KEY_ID: AKIAT4QMVMC4J46G55G4
# DESTINATION__ATHENA__CREDENTIALS__AWS_SECRET_ACCESS_KEY: ${{ secrets.AWS_SECRET_ACCESS_KEY }}
# DESTINATION__ATHENA__CREDENTIALS__REGION_NAME: eu-central-1
2 changes: 1 addition & 1 deletion .github/workflows/test_doc_snippets.yml
@@ -56,7 +56,7 @@ jobs:

- name: Install dependencies
# if: steps.cached-poetry-dependencies.outputs.cache-hit != 'true'
run: poetry install --no-interaction -E duckdb -E weaviate --with docs --without airflow
run: poetry install --no-interaction -E duckdb -E weaviate -E parquet --with docs --without airflow

- name: Run linter and tests
run: make test-and-lint-snippets
4 changes: 2 additions & 2 deletions README.md
@@ -13,7 +13,7 @@ Be it a Google Colab notebook, AWS Lambda function, an Airflow DAG, your local l
</h3>

<div align="center">
<a target="_blank" href="https://join.slack.com/t/dlthub-community/shared_invite/zt-1slox199h-HAE7EQoXmstkP_bTqal65g" style="background:none">
<a target="_blank" href="https://join.slack.com/t/dlthub-community/shared_invite/zt-1n5193dbq-rCBmJ6p~ckpSFK4hCF2dYA" style="background:none">
<img src="https://img.shields.io/badge/slack-join-dlt.svg?labelColor=191937&color=6F6FF7&logo=slack" style="width: 260px;" />
</a>
</div>
@@ -90,7 +90,7 @@ You can find examples for various use cases in the [examples](docs/examples) fol

The dlt project is quickly growing, and we're excited to have you join our community! Here's how you can get involved:

- **Connect with the Community**: Join other dlt users and contributors on our [Slack](https://join.slack.com/t/dlthub-community/shared_invite/zt-1slox199h-HAE7EQoXmstkP_bTqal65g)
- **Connect with the Community**: Join other dlt users and contributors on our [Slack](https://join.slack.com/t/dlthub-community/shared_invite/zt-1n5193dbq-rCBmJ6p~ckpSFK4hCF2dYA)
- **Report issues and suggest features**: Please use the [GitHub Issues](https://github.com/dlt-hub/dlt/issues) to report bugs or suggest new features. Before creating a new issue, make sure to search the tracker for possible duplicates and add a comment if you find one.
- **Track progress of our work and our plans**: Please check out our [public Github project](https://github.com/orgs/dlt-hub/projects/9)
- **Contribute Verified Sources**: Contribute your custom sources to the [dlt-hub/verified-sources](https://github.com/dlt-hub/verified-sources) to help other folks in handling their data tasks.
10 changes: 10 additions & 0 deletions dlt/common/configuration/exceptions.py
@@ -1,6 +1,8 @@
import os
from typing import Any, Mapping, Type, Tuple, NamedTuple, Sequence

from dlt.common.exceptions import DltException, TerminalException
from dlt.common.utils import main_module_file_path


class LookupTrace(NamedTuple):
@@ -48,6 +50,14 @@ def __str__(self) -> str:
msg += f'\tfor field "{f}" config providers and keys were tried in following order:\n'
for tr in field_traces:
msg += f'\t\tIn {tr.provider} key {tr.key} was not found.\n'
# check if entry point is run with path. this is common problem so warn the user
main_path = main_module_file_path()
main_dir = os.path.dirname(main_path)
abs_main_dir = os.path.abspath(main_dir)
if abs_main_dir != os.getcwd():
# directory was specified
msg += "WARNING: dlt looks for .dlt folder in your current working directory and your cwd (%s) is different from directory of your pipeline script (%s).\n" % (os.getcwd(), abs_main_dir)
msg += "If you keep your secret files in the same folder as your pipeline script but run your script from some other folder, secrets/configs will not be found\n"
msg += "Please refer to https://dlthub.com/docs/general-usage/credentials for more information\n"
return msg

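The new warning in `__str__` hinges on comparing the pipeline script's directory with the current working directory. A minimal standalone sketch of that check — the function name `detect_cwd_mismatch` is made up for illustration and is not part of dlt:

```python
import os


def detect_cwd_mismatch(main_path: str, cwd: str) -> str:
    """Return a warning when the pipeline script's directory differs from cwd, else ''."""
    abs_main_dir = os.path.abspath(os.path.dirname(main_path))
    if abs_main_dir != os.path.abspath(cwd):
        # dlt resolves the .dlt folder relative to cwd, not the script location
        return (
            f"WARNING: dlt looks for the .dlt folder in your current working "
            f"directory ({cwd}), which differs from the directory of your "
            f"pipeline script ({abs_main_dir})."
        )
    return ""
```

Running a script via `python some/other/dir/pipeline.py` would trigger the warning, which is exactly the situation the commit flags.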
11 changes: 8 additions & 3 deletions dlt/common/configuration/specs/aws_credentials.py
@@ -1,7 +1,7 @@
from typing import Optional, Dict, Any

from dlt.common.exceptions import MissingDependencyException
from dlt.common.typing import TSecretStrValue
from dlt.common.typing import TSecretStrValue, DictStrAny
from dlt.common.configuration.specs import CredentialsConfiguration, CredentialsWithDefault, configspec
from dlt.common.configuration.specs.exceptions import InvalidBoto3Session
from dlt import version
@@ -15,15 +15,20 @@ class AwsCredentialsWithoutDefaults(CredentialsConfiguration):
aws_session_token: Optional[TSecretStrValue] = None
profile_name: Optional[str] = None
region_name: Optional[str] = None
endpoint_url: Optional[str] = None

def to_s3fs_credentials(self) -> Dict[str, Optional[str]]:
"""Dict of keyword arguments that can be passed to s3fs"""
return dict(
credentials: DictStrAny = dict(
key=self.aws_access_key_id,
secret=self.aws_secret_access_key,
token=self.aws_session_token,
profile=self.profile_name
profile=self.profile_name,
endpoint_url=self.endpoint_url,
)
if self.region_name:
credentials["client_kwargs"] = {"region_name": self.region_name}
return credentials

def to_native_representation(self) -> Dict[str, Optional[str]]:
"""Return a dict that can be passed as kwargs to boto3 session"""
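The reshaped `to_s3fs_credentials` folds `region_name` into `client_kwargs` (where s3fs forwards boto3 client options) and passes `endpoint_url` through, which is what lets the same spec target S3-compatible stores like Cloudflare R2. A sketch of the same mapping as a free function, using plain dicts and no s3fs import:

```python
from typing import Any, Dict, Optional


def to_s3fs_credentials(
    aws_access_key_id: Optional[str],
    aws_secret_access_key: Optional[str],
    aws_session_token: Optional[str] = None,
    profile_name: Optional[str] = None,
    region_name: Optional[str] = None,
    endpoint_url: Optional[str] = None,
) -> Dict[str, Any]:
    """Build the kwargs dict accepted by s3fs.S3FileSystem."""
    credentials: Dict[str, Any] = dict(
        key=aws_access_key_id,
        secret=aws_secret_access_key,
        token=aws_session_token,
        profile=profile_name,
        endpoint_url=endpoint_url,  # e.g. an R2 or MinIO endpoint
    )
    if region_name:
        # s3fs passes client_kwargs verbatim to the underlying boto3 client
        credentials["client_kwargs"] = {"region_name": region_name}
    return credentials
```

The resulting dict could then be splatted into `s3fs.S3FileSystem(**credentials)`.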
3 changes: 3 additions & 0 deletions dlt/common/configuration/specs/known_sections.py
@@ -21,3 +21,6 @@

DBT_PACKAGE_RUNNER = "dbt_package_runner"
"""dbt package runner configuration (DBTRunnerConfiguration)"""

DBT_CLOUD = "dbt_cloud"
"""dbt cloud helpers configuration (DBTCloudConfiguration)"""
18 changes: 16 additions & 2 deletions dlt/common/data_writers/buffered.py
@@ -1,4 +1,5 @@
import gzip
from functools import reduce
from typing import List, IO, Any, Optional, Type, TypeVar, Generic

from dlt.common.utils import uniq_id
@@ -58,6 +59,7 @@ def __init__(
self._current_columns: TTableSchemaColumns = None
self._file_name: str = None
self._buffered_items: List[TDataItem] = []
self._buffered_items_count: int = 0
self._writer: TWriter = None
self._file: IO[Any] = None
self._closed = False
@@ -79,10 +81,20 @@ def write_data_item(self, item: TDataItems, columns: TTableSchemaColumns) -> Non
if isinstance(item, List):
# items coming in a single list will be written together, no matter how many there are
self._buffered_items.extend(item)
# update row count, if item supports "num_rows" it will be used to count items
if len(item) > 0 and hasattr(item[0], "num_rows"):
self._buffered_items_count += sum(tbl.num_rows for tbl in item)
else:
self._buffered_items_count += len(item)
else:
self._buffered_items.append(item)
# update row count, if item supports "num_rows" it will be used to count items
if hasattr(item, "num_rows"):
self._buffered_items_count += item.num_rows
else:
self._buffered_items_count += 1
# flush if max buffer exceeded
if len(self._buffered_items) >= self.buffer_max_items:
if self._buffered_items_count >= self.buffer_max_items:
self._flush_items()
# rotate the file if max_bytes exceeded
if self._file:
@@ -118,7 +130,7 @@ def _rotate_file(self) -> None:
self._file_name = self.file_name_template % uniq_id(5) + "." + self._file_format_spec.file_extension

def _flush_items(self, allow_empty_file: bool = False) -> None:
if len(self._buffered_items) > 0 or allow_empty_file:
if self._buffered_items_count > 0 or allow_empty_file:
# we only open a writer when there are any items in the buffer and first flush is requested
if not self._writer:
# create new writer and write header
@@ -131,7 +143,9 @@ def _flush_items(self, allow_empty_file: bool = False) -> None:
# write buffer
if self._buffered_items:
self._writer.write_data(self._buffered_items)
# reset buffer and counter
self._buffered_items.clear()
self._buffered_items_count = 0

def _flush_and_close_file(self) -> None:
# if any buffered items exist, flush them
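The buffered writer now flushes based on a row count rather than the number of buffered objects, so a single Arrow table holding thousands of rows triggers a flush at the right time. The duck-typed `num_rows` counting can be sketched in isolation — `RowCountingBuffer` is a simplified stand-in for dlt's `BufferedDataWriter`, and any object exposing `num_rows` (such as a pyarrow Table) counts as that many rows:

```python
from typing import Any, List


class RowCountingBuffer:
    """Counts rows via a duck-typed `num_rows` attribute, falling back to 1 per item."""

    def __init__(self, buffer_max_items: int) -> None:
        self.buffer_max_items = buffer_max_items
        self._buffered_items: List[Any] = []
        self._buffered_items_count = 0
        self.flush_count = 0

    def write_data_item(self, item: Any) -> None:
        if isinstance(item, list):
            # items arriving as one list are buffered together
            self._buffered_items.extend(item)
            if item and hasattr(item[0], "num_rows"):
                self._buffered_items_count += sum(tbl.num_rows for tbl in item)
            else:
                self._buffered_items_count += len(item)
        else:
            self._buffered_items.append(item)
            self._buffered_items_count += getattr(item, "num_rows", 1)
        # flush on the row count, not on len(self._buffered_items)
        if self._buffered_items_count >= self.buffer_max_items:
            self._flush_items()

    def _flush_items(self) -> None:
        self.flush_count += 1
        self._buffered_items.clear()
        self._buffered_items_count = 0
```

With the old `len(self._buffered_items)` check, two 500-row tables would have counted as only 2 items and never flushed.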
2 changes: 2 additions & 0 deletions dlt/common/data_writers/writers.py
@@ -274,6 +274,8 @@ def write_data(self, rows: Sequence[Any]) -> None:
self.writer.write_batch(row)
else:
raise ValueError(f"Unsupported type {type(row)}")
# count rows that got written
self.items_count += row.num_rows

@classmethod
def data_format(cls) -> TFileFormatSpec:
12 changes: 12 additions & 0 deletions dlt/common/schema/schema.py
@@ -479,6 +479,18 @@ def set_schema_contract(self, settings: TSchemaContract, update_table_settings:
if not table.get("parent"):
table["schema_contract"] = settings

def add_type_detection(self, detection: TTypeDetections) -> None:
"""Add type auto detection to the schema."""
if detection not in self.settings["detections"]:
self.settings["detections"].append(detection)
self._compile_settings()

def remove_type_detection(self, detection: TTypeDetections) -> None:
"""Adds type auto detection to the schema."""
if detection in self.settings["detections"]:
self.settings["detections"].remove(detection)
self._compile_settings()

def _infer_column(self, k: str, v: Any, data_type: TDataType = None, is_variant: bool = False) -> TColumnSchema:
column_schema = TColumnSchema(
name=k,
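Both new schema methods are idempotent list mutations: adding an existing detection or removing a missing one is a no-op. The same pattern in isolation — `DetectionSettings` is a toy stand-in, not a dlt class:

```python
from typing import List


class DetectionSettings:
    """Idempotent add/remove of type-detection names, preserving insertion order."""

    def __init__(self, detections: List[str]) -> None:
        self.detections = detections

    def add(self, detection: str) -> None:
        if detection not in self.detections:
            self.detections.append(detection)

    def remove(self, detection: str) -> None:
        if detection in self.detections:
            self.detections.remove(detection)
```

In dlt proper, each mutation is followed by `self._compile_settings()` so the schema picks up the change.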
3 changes: 1 addition & 2 deletions dlt/common/schema/utils.py
@@ -709,5 +709,4 @@ def standard_hints() -> Dict[TColumnHint, List[TSimpleRegex]]:


def standard_type_detections() -> List[TTypeDetections]:
return ["timestamp", "iso_timestamp"]

return ["iso_timestamp"]
14 changes: 14 additions & 0 deletions dlt/destinations/duckdb/duck.py
@@ -3,6 +3,7 @@

from dlt.common.destination import DestinationCapabilitiesContext
from dlt.common.data_types import TDataType
from dlt.common.exceptions import TerminalValueError
from dlt.common.schema import TColumnSchema, TColumnHint, Schema
from dlt.common.destination.reference import LoadJob, FollowupJob, TLoadJobState
from dlt.common.schema.typing import TTableSchema, TColumnType, TTableFormat
@@ -63,6 +64,9 @@ class DuckDbTypeMapper(TypeMapper):
"INTEGER": "bigint",
"BIGINT": "bigint",
"HUGEINT": "bigint",
"TIMESTAMP_S": "timestamp",
"TIMESTAMP_MS": "timestamp",
"TIMESTAMP_NS": "timestamp",
}

def to_db_integer_type(self, precision: Optional[int], table_format: TTableFormat = None) -> str:
@@ -79,6 +83,16 @@ def to_db_integer_type(self, precision: Optional[int], table_format: TTableForma
return "BIGINT"
return "HUGEINT"

def to_db_datetime_type(self, precision: Optional[int], table_format: TTableFormat = None) -> str:
if precision is None or precision == 6:
return super().to_db_datetime_type(precision, table_format)
if precision == 0:
return "TIMESTAMP_S"
if precision == 3:
return "TIMESTAMP_MS"
if precision == 9:
return "TIMESTAMP_NS"
raise TerminalValueError(f"timestamp {precision} cannot be mapped into duckdb TIMESTAMP type")

def from_db_type(self, db_type: str, precision: Optional[int], scale: Optional[int]) -> TColumnType:
# duckdb provides the types with scale and precision
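The new `to_db_datetime_type` covers exactly the fractional-second precisions duckdb stores natively (0, 3, 6, 9) and fails fast on anything else. A standalone sketch of the same mapping, assuming precision `None` or 6 resolves to plain `TIMESTAMP` as the base mapper does:

```python
from typing import Optional


def duckdb_timestamp_type(precision: Optional[int]) -> str:
    """Map a fractional-seconds precision to a duckdb timestamp type name."""
    if precision is None or precision == 6:
        return "TIMESTAMP"      # microseconds, duckdb's default resolution
    if precision == 0:
        return "TIMESTAMP_S"    # whole seconds
    if precision == 3:
        return "TIMESTAMP_MS"   # milliseconds
    if precision == 9:
        return "TIMESTAMP_NS"   # nanoseconds
    raise ValueError(
        f"timestamp precision {precision} cannot be mapped to a duckdb TIMESTAMP type"
    )
```

Raising on unsupported precisions (e.g. 1 or 7) surfaces a configuration error at schema time instead of silently truncating data.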
2 changes: 1 addition & 1 deletion dlt/destinations/filesystem/filesystem.py
@@ -131,7 +131,7 @@ def initialize_storage(self, truncate_tables: Iterable[str] = None) -> None:
for search_prefix in truncate_prefixes:
if item.startswith(search_prefix):
# NOTE: deleting in chunks on s3 does not raise on access denied, file non existing and probably other errors
logger.info(f"DEL {item}")
# logger.info(f"DEL {item}")
# print(f"DEL {item}")
self.fs_client.rm(item)
except FileNotFoundError:
9 changes: 5 additions & 4 deletions dlt/destinations/motherduck/configuration.py
@@ -30,13 +30,14 @@ def _token_to_password(self) -> None:
self.password = TSecretValue(self.query.pop("token"))

def borrow_conn(self, read_only: bool) -> Any:
from duckdb import HTTPException
from duckdb import HTTPException, InvalidInputException
try:
return super().borrow_conn(read_only)
except HTTPException as http_ex:
if http_ex.status_code == 403 and 'Failed to download extension "motherduck"' in str(http_ex):
except (InvalidInputException, HTTPException) as ext_ex:
if 'Failed to download extension' in str(ext_ex) and "motherduck" in str(ext_ex):
from importlib.metadata import version as pkg_version
raise MotherduckLocalVersionNotSupported(pkg_version("duckdb")) from http_ex
raise MotherduckLocalVersionNotSupported(pkg_version("duckdb")) from ext_ex

raise

def parse_native_representation(self, native_value: Any) -> None:
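The widened `except` clause matters because the failed motherduck extension download can surface as either `HTTPException` or `InvalidInputException` depending on the duckdb version, so the guard now matches on the message rather than an HTTP status code. The pattern, sketched with stand-in exception classes in place of the real duckdb ones:

```python
class HTTPException(Exception):
    """Stand-in for duckdb.HTTPException."""


class InvalidInputException(Exception):
    """Stand-in for duckdb.InvalidInputException."""


class MotherduckLocalVersionNotSupported(Exception):
    """Raised when the local duckdb cannot load the motherduck extension."""


def borrow_conn(connect):
    try:
        return connect()
    except (InvalidInputException, HTTPException) as ext_ex:
        # either exception type can signal that the motherduck extension
        # could not be downloaded for the installed duckdb version
        if "Failed to download extension" in str(ext_ex) and "motherduck" in str(ext_ex):
            raise MotherduckLocalVersionNotSupported() from ext_ex
        # unrelated failures propagate unchanged
        raise
```

Unrelated `HTTPException`s (say, a plain 403) still propagate, so only the version-mismatch case is translated into the friendlier error.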