Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

documents schema and data contract #782

Merged
merged 2 commits into from
Nov 23, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 4 additions & 4 deletions dlt/common/schema/exceptions.py
Original file line number Diff line number Diff line change
Expand Up @@ -108,14 +108,14 @@ def __init__(
schema_name: str,
table_name: str,
column_name: str,
contract_entity: TSchemaContractEntities,
schema_entity: TSchemaContractEntities,
contract_mode: TSchemaEvolutionMode,
table_schema: Any,
schema_contract: TSchemaContractDict,
data_item: Any = None,
extended_info: str = None,
) -> None:
"""Raised when `data_item` violates `contract_mode` on a `contract_entity` as defined by `table_schema`
"""Raised when `data_item` violates `contract_mode` on a `schema_entity` as defined by `table_schema`

Schema, table and column names are given as a context and full `schema_contract` and causing `data_item` as an evidence.
"""
Expand All @@ -128,7 +128,7 @@ def __init__(
msg = (
"In "
+ msg
+ f" . Contract on {contract_entity} with mode {contract_mode} is violated. "
+ f" . Contract on {schema_entity} with mode {contract_mode} is violated. "
+ (extended_info or "")
)
super().__init__(msg)
Expand All @@ -137,7 +137,7 @@ def __init__(
self.column_name = column_name

# violated contract
self.contract_entity = contract_entity
self.schema_entity = schema_entity
self.contract_mode = contract_mode

# some evidence
Expand Down
12 changes: 5 additions & 7 deletions docs/examples/chess_production/chess.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
from dlt.common.typing import StrAny, TDataItems
from dlt.sources.helpers.requests import client


@dlt.source
def chess(
chess_url: str = dlt.config.value,
Expand Down Expand Up @@ -59,6 +60,7 @@ def players_games(username: Any) -> Iterator[TDataItems]:

MAX_PLAYERS = 5


def load_data_with_retry(pipeline, data):
try:
for attempt in Retrying(
Expand All @@ -68,9 +70,7 @@ def load_data_with_retry(pipeline, data):
reraise=True,
):
with attempt:
logger.info(
f"Running the pipeline, attempt={attempt.retry_state.attempt_number}"
)
logger.info(f"Running the pipeline, attempt={attempt.retry_state.attempt_number}")
load_info = pipeline.run(data)
logger.info(str(load_info))

Expand All @@ -92,9 +92,7 @@ def load_data_with_retry(pipeline, data):
# print the information on the first load package and all jobs inside
logger.info(f"First load package info: {load_info.load_packages[0]}")
# print the information on the first completed job in first load package
logger.info(
f"First completed job info: {load_info.load_packages[0].jobs['completed_jobs'][0]}"
)
logger.info(f"First completed job info: {load_info.load_packages[0].jobs['completed_jobs'][0]}")

# check for schema updates:
schema_updates = [p.schema_update for p in load_info.load_packages]
Expand Down Expand Up @@ -152,4 +150,4 @@ def load_data_with_retry(pipeline, data):
)
# get data for a few famous players
data = chess(chess_url="https://api.chess.com/pub/", max_players=MAX_PLAYERS)
load_data_with_retry(pipeline, data)
load_data_with_retry(pipeline, data)
8 changes: 4 additions & 4 deletions docs/examples/incremental_loading/zendesk.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,12 +6,11 @@
from dlt.common.typing import TAnyDateTime
from dlt.sources.helpers.requests import client


@dlt.source(max_table_nesting=2)
def zendesk_support(
credentials: Dict[str, str] = dlt.secrets.value,
start_date: Optional[TAnyDateTime] = pendulum.datetime( # noqa: B008
year=2000, month=1, day=1
),
start_date: Optional[TAnyDateTime] = pendulum.datetime(year=2000, month=1, day=1), # noqa: B008
end_date: Optional[TAnyDateTime] = None,
):
"""
Expand Down Expand Up @@ -113,11 +112,12 @@ def get_pages(
if not response_json["end_of_stream"]:
get_url = response_json["next_page"]


if __name__ == "__main__":
# create dlt pipeline
pipeline = dlt.pipeline(
pipeline_name="zendesk", destination="duckdb", dataset_name="zendesk_data"
)

load_info = pipeline.run(zendesk_support())
print(load_info)
print(load_info)
2 changes: 2 additions & 0 deletions docs/examples/nested_data/nested_data.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@

CHUNK_SIZE = 10000


# You can limit how deep dlt goes when generating child tables.
# By default, the library will descend and generate child tables
# for all nested lists, without a limit.
Expand Down Expand Up @@ -81,6 +82,7 @@ def load_documents(self) -> Iterator[TDataItem]:
while docs_slice := list(islice(cursor, CHUNK_SIZE)):
yield map_nested_in_place(convert_mongo_objs, docs_slice)


def convert_mongo_objs(value: Any) -> Any:
if isinstance(value, (ObjectId, Decimal128)):
return str(value)
Expand Down
4 changes: 3 additions & 1 deletion docs/examples/transformers/pokemon.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
import dlt
from dlt.sources.helpers import requests


@dlt.source(max_table_nesting=2)
def source(pokemon_api_url: str):
""""""
Expand Down Expand Up @@ -46,6 +47,7 @@ def species(pokemon_details):

return (pokemon_list | pokemon, pokemon_list | pokemon | species)


if __name__ == "__main__":
# build duck db pipeline
pipeline = dlt.pipeline(
Expand All @@ -54,4 +56,4 @@ def species(pokemon_details):

# the pokemon_list resource does not need to be loaded
load_info = pipeline.run(source("https://pokeapi.co/api/v2/pokemon"))
print(load_info)
print(load_info)
3 changes: 1 addition & 2 deletions docs/website/docs/dlt-ecosystem/destinations/motherduck.md
Original file line number Diff line number Diff line change
Expand Up @@ -93,5 +93,4 @@ We also see them.
My observation is that if you write a lot of data into the database, then close the connection, and then open it again to write, there's a chance of such a timeout. Possibly a **WAL** file is being written to the remote duckdb database.

### Invalid Input Error: Initialization function "motherduck_init" from file
Use `duckdb 0.8.1`

Use `duckdb 0.8.1` or above.
Original file line number Diff line number Diff line change
Expand Up @@ -210,7 +210,7 @@ You can pass destination credentials and ignore the default lookup:
pipeline = dlt.pipeline(destination="postgres", credentials=dlt.secrets["postgres_dsn"])
```

:::Note
:::note
**dlt.config** and **dlt.secrets** can be also used as setters. For example:
```python
dlt.config["sheet_id"] = "23029402349032049"
Expand Down
81 changes: 0 additions & 81 deletions docs/website/docs/general-usage/data-contracts.md

This file was deleted.

8 changes: 7 additions & 1 deletion docs/website/docs/general-usage/resource.md
Original file line number Diff line number Diff line change
Expand Up @@ -73,6 +73,9 @@ accepts following arguments:
> 💡 You can mark some resource arguments as [configuration and credentials](credentials)
> values so `dlt` can pass them automatically to your functions.

### Put a contract on tables, columns and data
Use the `schema_contract` argument to tell dlt how to [deal with new tables, data types and bad data types](schema-contracts.md). For example, if you set it to **freeze**, `dlt` will not allow any new tables, columns or data types to be introduced to the schema - it will raise an exception. Learn more about the available contract modes [here](schema-contracts.md#setting-up-the-contract).

### Define a schema with Pydantic

You can alternatively use a [Pydantic](https://pydantic-docs.helpmanual.io/) model to define the schema.
Expand Down Expand Up @@ -106,6 +109,8 @@ def get_users():
The data types of the table columns are inferred from the types of the pydantic fields. These use the same type conversions
as when the schema is automatically generated from the data.

Pydantic models integrate well with [schema contracts](schema-contracts.md) as data validators.

Things to note:

- Fields with an `Optional` type are marked as `nullable`
Expand All @@ -131,6 +136,7 @@ behaviour of creating child tables for these fields.

We do not support `RootModel` that validate simple types. You can add such validator yourself, see [data filtering section](#filter-transform-and-pivot-data).


### Dispatch data to many tables

You can load data to many tables from a single resource. The most common case is a stream of events
Expand Down Expand Up @@ -307,7 +313,7 @@ assert list(r) == list(range(10))
> 💡 You cannot limit transformers. They should process all the data they receive fully to avoid
> inconsistencies in generated datasets.

### Set table and adjust schema
### Set table name and adjust schema

You can change the schema of a resource, be it standalone or as a part of a source. Look for method
named `apply_hints` which takes the same arguments as resource decorator. Obviously you should call
Expand Down
Loading
Loading