schema contract (#594)
* basic schema freezing

* small changes

* temp

* add new schema update mode

* fix linting errors and one bug

* move freeze code to schema

* some work on schema evolution modes

* add tests

* small tests change

* small fix

* fix some tests

* add global override for schema evolution

* finish implementation of global override

* better tests

* carry over schema settings on update

* add tests for single values

* small changes to tests and code

* fix small error

* add tests for data contract interaction

* fix tests

* some PR work

* update schema management

* fix schema related tests

* add nice schema tests

* add docs page

* small test fix

* smaller PR fixes

* more work

* tests update

* almost there

* tmp

* fix freeze tests

* cleanup

* create data contracts page

* small cleanup

* add pydantic dep to destination tests

* rename contract settings

* rename schema contract dict keys

* some work

* more work...

* more work

* move checking of new tables into extract function

* fix most tests

* fix linter after merge

* small cleanup

* post merge code updates

* small fixes

* some cleanup

* update docs

* makes bumping version optional in Schema, preserves hashes on replace schema content

* extracts on single pipeline schema

* allows controlling relational normalizer descend with send

* refactors data contract apply to generate filters instead of actual filtering

* detects if a bytes string possibly contains PUA characters

* applies schema contracts in item normalizer, uses binary stream, detects PUA to skip decoding

* methods to remove and rename arrow columns, need arrow 12+

* implements contracts in extract, fixes issues in apply hints, arrow data filtering still missing

* always uses pipeline schema when extracting

* returns new items count from buffered write

* bumps pyarrow to 12, temporary removes snowflake extra

* fixes arrow imports and normalizer config

* fixes normalizer config tests and pipeline state serialization

* normalizes arrow tables before saving

* adds validation and model synth for contracts to pydantic helper

* splits extractor into files, improves pydantic validator

* runs tests on ci with minimal dependencies

* fixes deps in ci workflows

* re-adds snowflake connector

* updates pydantic helper

* improves contract violation exception

* splits source and resource in extract, adds more tests

* temp disable pydantic 1 tests

* fixes generic type parametrization on 3.8

---------

Co-authored-by: Marcin Rudolf <[email protected]>
sh-rp and rudolfix authored Nov 21, 2023
1 parent 105795c commit 02bac98
Showing 119 changed files with 5,304 additions and 2,122 deletions.
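
The headline change in this PR is the schema contract / evolution mode support described in the commit list above. Below is a minimal sketch of how a contract might be declared on a resource, based on the docs page added in this PR; the argument name, dict keys, and mode names are assumptions (the commits above mention several renames) rather than a definitive API reference.

```python
import dlt

# Hedged sketch: keys ("tables", "columns", "data_type") and modes ("evolve",
# "freeze", "discard_row", "discard_value") follow the contract settings this PR
# documents; exact names may differ slightly in the merged docs.
@dlt.resource(schema_contract={
    "tables": "evolve",         # new tables are allowed
    "columns": "freeze",        # new columns raise a contract violation
    "data_type": "discard_row"  # rows with non-coercible values are dropped
})
def users():
    yield {"id": 1, "name": "alice"}

pipeline = dlt.pipeline(pipeline_name="contracts_demo", destination="duckdb")
pipeline.run(users())
```
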
2 changes: 1 addition & 1 deletion .github/workflows/lint.yml
@@ -45,7 +45,7 @@ jobs:

- name: Install dependencies
# if: steps.cached-poetry-dependencies.outputs.cache-hit != 'true'
run: poetry install --no-interaction --all-extras --with airflow
run: poetry install --no-interaction --all-extras --with airflow --with docs --with providers --with sentry-sdk --with pipeline

# - name: Install self
# run: poetry install --no-interaction
2 changes: 1 addition & 1 deletion .github/workflows/test_airflow.yml
@@ -41,7 +41,7 @@ jobs:
key: venv-${{ runner.os }}-${{ steps.setup-python.outputs.python-version }}-${{ hashFiles('**/poetry.lock') }}-airflow-runner

- name: Install dependencies
run: poetry install --no-interaction --with airflow -E duckdb -E parquet
run: poetry install --no-interaction --with airflow --with pipeline -E duckdb -E parquet --with sentry-sdk

- run: |
poetry run pytest tests/helpers/airflow_tests
65 changes: 46 additions & 19 deletions .github/workflows/test_common.yml
@@ -55,40 +55,67 @@ jobs:
virtualenvs-in-project: true
installer-parallel: true

- name: Load cached venv
id: cached-poetry-dependencies
uses: actions/cache@v3
with:
# path: ${{ steps.pip-cache.outputs.dir }}
path: .venv
key: venv-${{ matrix.os }}-${{ matrix.python-version }}-${{ hashFiles('**/poetry.lock') }}
# NOTE: do not cache. we want to have a clean state each run and we upgrade dependencies later
# - name: Load cached venv
# id: cached-poetry-dependencies
# uses: actions/cache@v3
# with:
# # path: ${{ steps.pip-cache.outputs.dir }}
# path: .venv
# key: venv-${{ matrix.os }}-${{ matrix.python-version }}-${{ hashFiles('**/poetry.lock') }}

- name: Install dependencies
run: poetry install --no-interaction --with sentry-sdk

- run: |
poetry run pytest tests/common tests/normalize tests/reflection tests/sources tests/load/test_dummy_client.py tests/extract/test_extract.py tests/extract/test_sources.py tests/pipeline/test_pipeline_state.py
if: runner.os != 'Windows'
name: Run common tests with minimum dependencies Linux/MAC
- run: |
poetry run pytest tests/common tests/normalize tests/reflection tests/sources tests/load/test_dummy_client.py tests/extract/test_extract.py tests/extract/test_sources.py tests/pipeline/test_pipeline_state.py -m "not forked"
if: runner.os == 'Windows'
name: Run common tests with minimum dependencies Windows
shell: cmd
- name: Install dependencies + sentry
run: poetry install --no-interaction -E parquet -E pydantic && pip install sentry-sdk
- name: Install duckdb dependencies
run: poetry install --no-interaction -E duckdb --with sentry-sdk

- run: |
poetry run pytest tests/common tests/normalize tests/reflection tests/sources
poetry run pytest tests/pipeline/test_pipeline.py
if: runner.os != 'Windows'
name: Run tests Linux/MAC
name: Run pipeline smoke tests with minimum deps Linux/MAC
- run: |
poetry run pytest tests/common tests/normalize tests/reflection tests/sources -m "not forked"
poetry run pytest tests/pipeline/test_pipeline.py
if: runner.os == 'Windows'
name: Run tests Windows
name: Run smoke tests with minimum deps Windows
shell: cmd
- name: Install extra dependencies
run: poetry install --no-interaction -E duckdb -E cli -E parquet -E pydantic
- name: Install pipeline dependencies
run: poetry install --no-interaction -E duckdb -E cli -E parquet --with sentry-sdk --with pipeline

- run: |
poetry run pytest tests/extract tests/pipeline tests/cli/common
poetry run pytest tests/extract tests/pipeline tests/libs tests/cli/common
if: runner.os != 'Windows'
name: Run extra tests Linux/MAC
name: Run extract and pipeline tests Linux/MAC
- run: |
poetry run pytest tests/extract tests/pipeline tests/cli/common
poetry run pytest tests/extract tests/pipeline tests/libs tests/cli/common
if: runner.os == 'Windows'
name: Run extra tests Windows
name: Run extract tests Windows
shell: cmd
# - name: Install Pydantic 1.0
# run: pip install "pydantic<2"

# - run: |
# poetry run pytest tests/libs
# if: runner.os != 'Windows'
# name: Run extract and pipeline tests Linux/MAC
# - run: |
# poetry run pytest tests/libs
# if: runner.os == 'Windows'
# name: Run extract tests Windows
# shell: cmd

matrix_job_required_check:
name: Common tests
needs: run_common
2 changes: 1 addition & 1 deletion .github/workflows/test_dbt_runner.yml
@@ -68,7 +68,7 @@ jobs:

- name: Install dependencies
# install dlt with postgres support
run: poetry install --no-interaction -E postgres -E dbt
run: poetry install --no-interaction -E postgres -E dbt --with sentry-sdk

- run: |
poetry run pytest tests/helpers/dbt_tests -k '(not venv)'
2 changes: 1 addition & 1 deletion .github/workflows/test_destination_athena.yml
@@ -70,7 +70,7 @@ jobs:

- name: Install dependencies
# if: steps.cached-poetry-dependencies.outputs.cache-hit != 'true'
run: poetry install --no-interaction -E athena
run: poetry install --no-interaction -E athena --with sentry-sdk --with pipeline

- run: |
poetry run pytest tests/load
2 changes: 1 addition & 1 deletion .github/workflows/test_destination_athena_iceberg.yml
@@ -70,7 +70,7 @@ jobs:

- name: Install dependencies
# if: steps.cached-poetry-dependencies.outputs.cache-hit != 'true'
run: poetry install --no-interaction -E athena
run: poetry install --no-interaction -E athena --with sentry-sdk --with pipeline

- run: |
poetry run pytest tests/load
2 changes: 1 addition & 1 deletion .github/workflows/test_destination_bigquery.yml
@@ -79,7 +79,7 @@ jobs:

- name: Install dependencies
# if: steps.cached-poetry-dependencies.outputs.cache-hit != 'true'
run: poetry install --no-interaction -E bigquery --with providers -E parquet
run: poetry install --no-interaction -E bigquery --with providers -E parquet --with sentry-sdk --with pipeline

# - name: Install self
# run: poetry install --no-interaction
2 changes: 1 addition & 1 deletion .github/workflows/test_destination_mssql.yml
@@ -65,7 +65,7 @@ jobs:
key: venv-${{ runner.os }}-${{ steps.setup-python.outputs.python-version }}-${{ hashFiles('**/poetry.lock') }}-gcp

- name: Install dependencies
run: poetry install --no-interaction -E mssql -E s3 -E gs -E az -E parquet
run: poetry install --no-interaction -E mssql -E s3 -E gs -E az -E parquet --with sentry-sdk --with pipeline

- run: |
poetry run pytest tests/load --ignore tests/load/pipeline/test_dbt_helper.py
2 changes: 1 addition & 1 deletion .github/workflows/test_destination_qdrant.yml
@@ -59,7 +59,7 @@ jobs:
key: venv-${{ runner.os }}-${{ steps.setup-python.outputs.python-version }}-${{ hashFiles('**/poetry.lock') }}-gcp

- name: Install dependencies
run: poetry install --no-interaction -E qdrant -E parquet
run: poetry install --no-interaction -E qdrant -E parquet --with sentry-sdk --with pipeline
- run: |
poetry run pytest tests/load/
if: runner.os != 'Windows'
2 changes: 1 addition & 1 deletion .github/workflows/test_destination_snowflake.yml
@@ -71,7 +71,7 @@ jobs:
key: venv-${{ runner.os }}-${{ steps.setup-python.outputs.python-version }}-${{ hashFiles('**/poetry.lock') }}-gcp

- name: Install dependencies
run: poetry install --no-interaction -E snowflake -E s3 -E gs -E az
run: poetry install --no-interaction -E snowflake -E s3 -E gs -E az --with sentry-sdk --with pipeline

- run: |
poetry run pytest tests/load
8 changes: 4 additions & 4 deletions .github/workflows/test_destination_synapse.yml
@@ -5,9 +5,9 @@ on:
branches:
- master
- devel

workflow_dispatch:

env:
DESTINATION__SYNAPSE__CREDENTIALS: ${{ secrets.SYNAPSE_CREDENTIALS }}
DESTINATION__SYNAPSE__CREDENTIALS__PASSWORD: ${{ secrets.SYNAPSE_PASSWORD }}
@@ -42,7 +42,7 @@ jobs:
runs-on: ${{ matrix.os }}

steps:

- name: Check out
uses: actions/checkout@master

@@ -70,7 +70,7 @@ jobs:
key: venv-${{ runner.os }}-${{ steps.setup-python.outputs.python-version }}-${{ hashFiles('**/poetry.lock') }}-gcp

- name: Install dependencies
run: poetry install --no-interaction -E synapse -E s3 -E gs -E az
run: poetry install --no-interaction -E synapse -E s3 -E gs -E az --with sentry-sdk --with pipeline

- run: |
poetry run pytest tests/load --ignore tests/load/pipeline/test_dbt_helper.py
2 changes: 1 addition & 1 deletion .github/workflows/test_destination_weaviate.yml
@@ -61,7 +61,7 @@ jobs:
key: venv-${{ runner.os }}-${{ steps.setup-python.outputs.python-version }}-${{ hashFiles('**/poetry.lock') }}-gcp

- name: Install dependencies
run: poetry install --no-interaction -E weaviate -E parquet
run: poetry install --no-interaction -E weaviate -E parquet --with sentry-sdk --with pipeline
- run: |
poetry run pytest tests/load/
if: runner.os != 'Windows'
2 changes: 1 addition & 1 deletion .github/workflows/test_destinations.yml
@@ -87,7 +87,7 @@ jobs:

- name: Install dependencies
# if: steps.cached-poetry-dependencies.outputs.cache-hit != 'true'
run: poetry install --no-interaction -E redshift -E gs -E s3 -E az -E parquet -E duckdb -E cli
run: poetry install --no-interaction -E redshift -E gs -E s3 -E az -E parquet -E duckdb -E cli --with sentry-sdk --with pipeline

# - name: Install self
# run: poetry install --no-interaction
2 changes: 1 addition & 1 deletion .github/workflows/test_doc_snippets.yml
@@ -59,7 +59,7 @@ jobs:

- name: Install dependencies
# if: steps.cached-poetry-dependencies.outputs.cache-hit != 'true'
run: poetry install --no-interaction -E duckdb -E weaviate -E parquet --with docs --without airflow
run: poetry install --no-interaction -E duckdb -E weaviate -E parquet --with docs --without airflow --with sentry-sdk --with pipeline

- name: Run linter and tests
run: make test-and-lint-snippets
2 changes: 1 addition & 1 deletion .github/workflows/test_local_destinations.yml
@@ -84,7 +84,7 @@ jobs:
key: venv-${{ runner.os }}-${{ steps.setup-python.outputs.python-version }}-${{ hashFiles('**/poetry.lock') }}-local-destinations

- name: Install dependencies
run: poetry install --no-interaction -E postgres -E duckdb -E parquet -E filesystem -E cli -E weaviate
run: poetry install --no-interaction -E postgres -E duckdb -E parquet -E filesystem -E cli -E weaviate --with sentry-sdk --with pipeline

- run: poetry run pytest tests/load && poetry run pytest tests/cli
name: Run tests Linux
2 changes: 1 addition & 1 deletion Makefile
@@ -44,7 +44,7 @@ has-poetry:
poetry --version

dev: has-poetry
poetry install --all-extras --with airflow --with docs --with providers
poetry install --all-extras --with airflow --with docs --with providers --with pipeline --with sentry-sdk

lint:
./check-package.sh
14 changes: 9 additions & 5 deletions dlt/common/data_writers/buffered.py
@@ -68,7 +68,7 @@ def __init__(
except TypeError:
raise InvalidFileNameTemplateException(file_name_template)

def write_data_item(self, item: TDataItems, columns: TTableSchemaColumns) -> None:
def write_data_item(self, item: TDataItems, columns: TTableSchemaColumns) -> int:
self._ensure_open()
# rotate file if columns changed and writer does not allow for that
# as the only allowed change is to add new column (no updates/deletes), we detect the change by comparing lengths
@@ -78,21 +78,24 @@ def write_data_item(self, item: TDataItems, columns: TTableSchemaColumns) -> None:
# until the first chunk is written we can change the columns schema freely
if columns is not None:
self._current_columns = dict(columns)

new_rows_count: int
if isinstance(item, List):
# items coming in single list will be written together, no matter how many are there
self._buffered_items.extend(item)
# update row count, if item supports "num_rows" it will be used to count items
if len(item) > 0 and hasattr(item[0], "num_rows"):
self._buffered_items_count += sum(tbl.num_rows for tbl in item)
new_rows_count = sum(tbl.num_rows for tbl in item)
else:
self._buffered_items_count += len(item)
new_rows_count = len(item)
else:
self._buffered_items.append(item)
# update row count, if item supports "num_rows" it will be used to count items
if hasattr(item, "num_rows"):
self._buffered_items_count += item.num_rows
new_rows_count = item.num_rows
else:
self._buffered_items_count += 1
new_rows_count = 1
self._buffered_items_count += new_rows_count
# flush if max buffer exceeded
if self._buffered_items_count >= self.buffer_max_items:
self._flush_items()
@@ -104,6 +107,7 @@ def write_data_item(self, item: TDataItems, columns: TTableSchemaColumns) -> None:
# rotate on max items
elif self.file_max_items and self._writer.items_count >= self.file_max_items:
self._rotate_file()
return new_rows_count

def write_empty_file(self, columns: TTableSchemaColumns) -> None:
if columns is not None:
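
The hunk above makes `write_data_item` return how many rows a call added to the buffer. A small, self-contained sketch of the counting rule it implements follows (illustrative only; it mirrors the branches in the diff rather than dlt's actual `BufferedDataWriter` class):

```python
from typing import Any

def count_new_rows(item: Any) -> int:
    """Mirror of the counting logic in the hunk: lists of Arrow tables are summed
    by num_rows, plain lists by length, a single table by its num_rows, and any
    other single item counts as one row."""
    if isinstance(item, list):
        if len(item) > 0 and hasattr(item[0], "num_rows"):
            return sum(tbl.num_rows for tbl in item)
        return len(item)
    if hasattr(item, "num_rows"):
        return item.num_rows  # e.g. a pyarrow.Table
    return 1

# quick check of the two plain-Python branches
assert count_new_rows([{"a": 1}, {"a": 2}]) == 2
assert count_new_rows({"a": 1}) == 1
```
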
2 changes: 1 addition & 1 deletion dlt/common/data_writers/writers.py
@@ -220,7 +220,7 @@ def __init__(self,
self.parquet_row_group_size = row_group_size

def _create_writer(self, schema: "pa.Schema") -> "pa.parquet.ParquetWriter":
from dlt.common.libs.pyarrow import pyarrow, get_py_arrow_datatype
from dlt.common.libs.pyarrow import pyarrow
return pyarrow.parquet.ParquetWriter(self._f, schema, flavor=self.parquet_flavor, version=self.parquet_version, data_page_size=self.parquet_data_page_size)

def write_header(self, columns_schema: TTableSchemaColumns) -> None:
5 changes: 5 additions & 0 deletions dlt/common/json/__init__.py
@@ -181,6 +181,11 @@ def custom_pua_remove(obj: Any) -> Any:
return obj


def may_have_pua(line: bytes) -> bool:
"""Checks if bytes string contains pua marker"""
return b'\xef\x80' in line


# pick the right impl
json: SupportsJson = None
if os.environ.get("DLT_USE_JSON") == "simplejson":
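
`may_have_pua` lets callers skip the custom PUA decoding pass when a raw line cannot contain dlt's custom-encoded types. A sketch of the intended usage is below; the surrounding reader code is an assumption for illustration, not part of the diff — only the membership test mirrors `may_have_pua`.

```python
import json

PUA_MARKER = b"\xef\x80"  # UTF-8 prefix of the Private Use Area range used as a marker

def parse_jsonl_line(line: bytes) -> tuple:
    """Parse one JSONL line and report whether it may still need the extra
    PUA decoding pass; the check is the same test as may_have_pua(line)."""
    return json.loads(line), PUA_MARKER in line

obj, needs_pua_decode = parse_jsonl_line(b'{"id": 1, "ts": "2023-11-21"}')
assert needs_pua_decode is False  # no marker bytes, so decoding can be skipped
```
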