reorganizes api reference

rudolfix committed Sep 19, 2023
1 parent 7a0edb8 commit 682d9b3
Showing 137 changed files with 97 additions and 9,168 deletions.
19 changes: 9 additions & 10 deletions dlt/__init__.py
@@ -2,21 +2,20 @@
How to create a data loading pipeline with dlt in 3 seconds:
- 1. Write a pipeline script
- >>> import dlt
- >>> from dlt.sources.helpers import requests
- >>> dlt.run(requests.get("https://pokeapi.co/api/v2/pokemon/").json()["results"], destination="duckdb", table_name="pokemon")
+ 1. Write a pipeline script
+ >>> import dlt
+ >>> from dlt.sources.helpers import requests
+ >>> dlt.run(requests.get("https://pokeapi.co/api/v2/pokemon/").json()["results"], destination="duckdb", table_name="pokemon")
- 2. Run your pipeline script
- $ python pokemon.py
- 3. See and query your data with autogenerated Streamlit app
- $ dlt pipeline dlt_pokemon show
+ 2. Run your pipeline script
+ > $ python pokemon.py
+ 3. See and query your data with autogenerated Streamlit app
+ > $ dlt pipeline dlt_pokemon show
Or start with our pipeline template with sample PokeAPI (pokeapi.co) data loaded to bigquery
- $ dlt init pokemon bigquery
+ > $ dlt init pokemon bigquery
For more detailed info, see https://dlthub.com/docs/getting-started
"""
2 changes: 1 addition & 1 deletion dlt/common/configuration/specs/base_configuration.py
@@ -186,7 +186,7 @@ def parse_native_representation(self, native_value: Any) -> None:
"""Initialize the configuration fields by parsing the `native_value` which should be a native representation of the configuration
or credentials, for example database connection string or JSON serialized GCP service credentials file.
### Args:
#### Args:
native_value (Any): A native representation of the configuration
Raises:
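A minimal sketch of how `parse_native_representation` is typically called, assuming the `ConnectionStringCredentials` spec and its field names (the import path and connection string are illustrative):

    from dlt.common.configuration.specs import ConnectionStringCredentials

    creds = ConnectionStringCredentials()
    # a database connection string is the "native representation" of these credentials
    creds.parse_native_representation("postgresql://loader:secret@localhost:5432/dlt_data")
    print(creds.drivername, creds.database)  # postgresql dlt_data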
52 changes: 26 additions & 26 deletions dlt/common/pipeline.py
@@ -325,7 +325,7 @@ def source_state() -> DictStrAny:
"""Returns a dictionary with the source-scoped state. Source-scoped state may be shared across the resources of a particular source. Please avoid using source scoped state. Check
the `resource_state` function for resource-scoped state that is visible within particular resource. Dlt state is preserved across pipeline runs and may be used to implement incremental loads.
### Summary
#### Note:
The source state is a python dictionary-like object that is available within the `@dlt.source` and `@dlt.resource` decorated functions and may be read and written to.
The data within the state is loaded into destination together with any other extracted data and made automatically available to the source/resource extractor functions when they are run next time.
When using the state:
@@ -374,31 +374,31 @@ def resource_state(resource_name: str = None, source_state_: Optional[DictStrAny
Note that this function accepts the resource name as an optional argument. There are rare cases when `dlt` is not able to resolve the resource name because the requesting function
runs in a different thread than the main one. You'll need to pass the name explicitly when you request resource_state from async functions or functions decorated with @defer.
- ### Summary
- The resource state is a python dictionary-like object that is available within the `@dlt.resource` decorated functions and may be read and written to.
- The data within the state is loaded into the destination together with any other extracted data and made automatically available to the source/resource extractor functions when they are run next time.
- When using the state:
- * The resource state is scoped to a particular resource requesting it.
- * Any JSON-serializable values can be written and then read from the state. `dlt` dumps and restores instances of Python bytes, DateTime, Date and Decimal types.
- * The state available in the resource decorated function is writable and written values will be available on the next pipeline run
- ### Example
- The most typical use case for the state is to implement incremental load.
- >>> @dlt.resource(write_disposition="append")
- >>> def players_games(chess_url, players, start_month=None, end_month=None):
- >>>     checked_archives = dlt.current.resource_state().setdefault("archives", [])
- >>>     archives = players_archives(chess_url, players)
- >>>     for url in archives:
- >>>         if url in checked_archives:
- >>>             print(f"skipping archive {url}")
- >>>             continue
- >>>         else:
- >>>             print(f"getting archive {url}")
- >>>             checked_archives.append(url)
- >>>         # get the filtered archive
- >>>         r = requests.get(url)
- >>>         r.raise_for_status()
- >>>         yield r.json().get("games", [])
+ Summary:
+ The resource state is a python dictionary-like object that is available within the `@dlt.resource` decorated functions and may be read and written to.
+ The data within the state is loaded into the destination together with any other extracted data and made automatically available to the source/resource extractor functions when they are run next time.
+ When using the state:
+ * The resource state is scoped to a particular resource requesting it.
+ * Any JSON-serializable values can be written and then read from the state. `dlt` dumps and restores instances of Python bytes, DateTime, Date and Decimal types.
+ * The state available in the resource decorated function is writable and written values will be available on the next pipeline run
+ Example:
+ The most typical use case for the state is to implement incremental load.
+ >>> @dlt.resource(write_disposition="append")
+ >>> def players_games(chess_url, players, start_month=None, end_month=None):
+ >>>     checked_archives = dlt.current.resource_state().setdefault("archives", [])
+ >>>     archives = players_archives(chess_url, players)
+ >>>     for url in archives:
+ >>>         if url in checked_archives:
+ >>>             print(f"skipping archive {url}")
+ >>>             continue
+ >>>         else:
+ >>>             print(f"getting archive {url}")
+ >>>             checked_archives.append(url)
+ >>>         # get the filtered archive
+ >>>         r = requests.get(url)
+ >>>         r.raise_for_status()
+ >>>         yield r.json().get("games", [])
Here we store all the urls with game archives in the state and we skip loading them on the next run. The archives are immutable. The state will grow with the coming months (and more players).
Up to a few thousand archives we should be good though.
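A sketch of wiring the `players_games` resource documented above into a pipeline; the chess.com URL and player list are illustrative values:

    import dlt

    pipeline = dlt.pipeline(pipeline_name="chess", destination="duckdb", dataset_name="games")
    # first run fetches all archives; later runs skip the urls already
    # recorded in the resource state under "archives"
    info = pipeline.run(players_games("https://api.chess.com/pub/", ["magnuscarlsen"]))
    print(info)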
18 changes: 9 additions & 9 deletions dlt/extract/decorators.py
@@ -81,13 +81,13 @@ def source(
) -> Any:
"""A decorator that transforms a function returning one or more `dlt resources` into a `dlt source` in order to load it with `dlt`.
- ### Summary
+ #### Note:
A `dlt source` is a logical grouping of resources that are often extracted and loaded together. A source is associated with a schema, which describes the structure of the loaded data and provides instructions on how to load it.
Such a schema contains table schemas that describe the structure of the data coming from the resources.
Please refer to https://dlthub.com/docs/general-usage/source for complete documentation.
- ### Passing credentials
+ #### Credentials:
Another important function of the source decorator is to provide credentials and other configuration to the code that extracts data. The decorator may automatically bind the source function arguments to the secret and config values.
>>> @dlt.source
>>> def chess(username, chess_url: str = dlt.config.value, api_secret = dlt.secrets.value, title: str = "GM"):
Expand All @@ -98,7 +98,7 @@ def source(
Here `username` is a required, explicit python argument. `chess_url` is a required argument that, if not explicitly passed, will be taken from configuration, i.e. `config.toml`. `api_secret` is a required argument that, if not explicitly passed, will be taken from dlt secrets, i.e. `secrets.toml`.
See https://dlthub.com/docs/general-usage/credentials for details.
- ### Args:
+ #### Args:
func: A function that returns a dlt resource or a list of those or a list of any data items that can be loaded by `dlt`.
name (str, optional): A name of the source which is also the name of the associated schema. If not present, the function name will be used.
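A minimal sketch of a complete source built on the decorator described above. The endpoint shape and the use of `api_secret` in a header are illustrative assumptions; `chess_url` and `api_secret` are injected from config and secrets as documented:

    import dlt
    from dlt.sources.helpers import requests

    @dlt.source
    def chess(username: str, chess_url: str = dlt.config.value, api_secret: str = dlt.secrets.value):

        @dlt.resource(write_disposition="replace")
        def profile():
            # hypothetical endpoint and auth scheme, for illustration only
            yield requests.get(f"{chess_url}player/{username}", headers={"Authorization": api_secret}).json()

        return profile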
@@ -251,14 +251,14 @@ def resource(
) -> Any:
"""When used as a decorator, transforms any generator (yielding) function into a `dlt resource`. When used as a function, it transforms data in `data` argument into a `dlt resource`.
- ### Summary
+ #### Note:
A `resource` is a location within a `source` that holds data with a specific structure (schema) or that comes from a specific origin. A resource may be a rest API endpoint, a table in a database or a tab in Google Sheets.
A `dlt resource` is a python representation of a `resource` that combines both data and metadata (table schema) that describes the structure and instructs the loading of the data.
A `dlt resource` is also an `Iterable` and can be used like any other iterable object, i.e. a list or tuple.
Please refer to https://dlthub.com/docs/general-usage/resource for complete documentation.
- ### Passing credentials
+ #### Credentials:
If used as a decorator (`data` argument is a `Generator`), it may automatically bind the source function arguments to the secret and config values.
>>> @dlt.resource
>>> def user_games(username, chess_url: str = dlt.config.value, api_secret = dlt.secrets.value):
Expand All @@ -270,7 +270,7 @@ def resource(
See https://dlthub.com/docs/general-usage/credentials for details.
Note that if the decorated function is an inner function, passing of the credentials will be disabled.
- ### Args:
+ #### Args:
data (Callable | Any, optional): a function to be decorated or a data compatible with `dlt` `run`.
name (str, optional): A name of the resource that by default also becomes the name of the table to which the data is loaded.
Expand All @@ -297,7 +297,7 @@ def resource(
depends_on (TUnboundDltResource, optional): Allows piping data from one resource to another to build multi-step pipelines.
- ### Raises
+ Raises:
ResourceNameMissing: indicates that the name of the resource cannot be inferred from the `data` being passed.
InvalidResourceDataType: indicates that the `data` argument cannot be converted into a `dlt resource`.
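A minimal sketch of both usages, as a decorator on a generator function and as a plain function over in-memory data (the GitHub endpoint is illustrative):

    import dlt
    from dlt.sources.helpers import requests

    # decorator usage: the generator becomes a resource loaded to the "issues" table
    @dlt.resource(name="issues", write_disposition="append")
    def issues(repo: str = "dlt-hub/dlt"):
        yield from requests.get(f"https://api.github.com/repos/{repo}/issues").json()

    # function usage: wrap data that already exists into a resource
    rows = dlt.resource([{"id": 1}, {"id": 2}], name="rows")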
@@ -428,7 +428,7 @@ def transformer(  # type: ignore
You can bind the transformer early by specifying the resource in `data_from` when the transformer is created, or create dynamic bindings later with the | operator,
which is demonstrated in the example below:
- ### Example
+ Example:
>>> @dlt.resource
>>> def players(title, chess_url=dlt.config.value):
>>>     r = requests.get(f"{chess_url}titled/{title}")
@@ -444,7 +444,7 @@ def transformer(  # type: ignore
>>> # pipes the data from players into player profile to produce a list of player profiles
>>> list(players("GM") | player_profile)
- ### Args:
+ Args:
f (Callable): a function taking at least one argument of TDataItems type, which will receive data yielded from the `data_from` resource.
data_from (Callable | Any, optional): a resource that will send data to the decorated function `f`.
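Putting the documented snippets together, a sketch of the chess example with the dynamic `|` binding; the body of `players` past the request (the `"players"` key) is an assumption, as the original example is truncated here:

    import dlt
    from dlt.sources.helpers import requests

    @dlt.resource
    def players(title, chess_url=dlt.config.value):
        r = requests.get(f"{chess_url}titled/{title}")
        yield from r.json()["players"]  # assumed response shape

    @dlt.transformer
    def player_profile(player, chess_url=dlt.config.value):
        yield requests.get(f"{chess_url}player/{player}").json()

    # pipes the data from players into player_profile, as documented above
    print(list(players("GM") | player_profile))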
1 change: 0 additions & 1 deletion dlt/extract/source.py
@@ -553,7 +553,6 @@ def __delitem__(self, resource_name: str) -> None:
class DltSource(Iterable[TDataItem]):
"""Groups several `dlt resources` under a single schema and allows to perform operations on them.
### Summary
The instance of this class is created whenever you call the `dlt.source` decorated function. It automates several functions for you:
* You can pass this instance to `dlt` `run` method in order to load all data present in the `dlt resources`.
* You can select and deselect resources that you want to load via `with_resources` method
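A short sketch of both operations, assuming a `chess` source like the one sketched earlier:

    import dlt

    source = chess("magnuscarlsen")
    # load only the selected resources of the source
    dlt.run(source.with_resources("profile"), destination="duckdb")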
2 changes: 1 addition & 1 deletion dlt/helpers/streamlit_helper.py
@@ -205,7 +205,7 @@ def _query_data_live(query: str, schema_name: str = None) -> pd.DataFrame:
def write_data_explorer_page(pipeline: Pipeline, schema_name: str = None, show_dlt_tables: bool = False, example_query: str = "", show_charts: bool = True) -> None:
"""Writes Streamlit app page with a schema and live data preview.
- ### Args:
+ #### Args:
pipeline (Pipeline): Pipeline instance to use.
schema_name (str, optional): Name of the schema to display. If None, default schema is used.
show_dlt_tables (bool, optional): Should show DLT internal tables. Defaults to False.
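A sketch of embedding the page in a Streamlit script (run with `streamlit run app.py`); attaching to an existing pipeline via `dlt.attach` is an assumption about how to obtain the `Pipeline` instance:

    import dlt
    from dlt.helpers.streamlit_helper import write_data_explorer_page

    # attach to the pipeline created by the pokemon quickstart
    pipeline = dlt.attach(pipeline_name="dlt_pokemon")
    write_data_explorer_page(pipeline, show_dlt_tables=False, show_charts=True)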
16 changes: 8 additions & 8 deletions dlt/pipeline/__init__.py
@@ -32,7 +32,7 @@ def pipeline(
) -> Pipeline:
"""Creates a new instance of `dlt` pipeline, which moves the data from the source ie. a REST API to a destination ie. database or a data lake.
### Summary
#### Note:
The `pipeline` functions allows you to pass the destination name to which the data should be loaded, the name of the dataset and several other options that govern loading of the data.
The created `Pipeline` object lets you load the data from any source with `run` method or to have more granular control over the loading process with `extract`, `normalize` and `load` methods.
@@ -41,7 +41,7 @@ def pipeline(
- Pipeline architecture and data loading steps: https://dlthub.com/docs/reference
- List of supported destinations: https://dlthub.com/docs/dlt-ecosystem/destinations
- ### Args:
+ #### Args:
pipeline_name (str, optional): A name of the pipeline that will be used to identify it in monitoring events and to restore its state and data schemas on subsequent runs.
Defaults to the file name of pipeline script with `dlt_` prefix added.
@@ -73,7 +73,7 @@ def pipeline(
`extract`, `normalize` and `load` stage. Pass a string with a collector name or configure your own by choosing from `dlt.progress` module.
We support most of the progress libraries: try passing `tqdm`, `enlighten` or `alive_progress` or `log` to write to console/log.
- ### Returns:
+ #### Returns:
Pipeline: An instance of the `Pipeline` class. Please check the documentation of the `run` method for information on what to do with it.
"""

@@ -179,7 +179,7 @@ def run(
) -> LoadInfo:
"""Loads the data in `data` argument into the destination specified in `destination` and dataset specified in `dataset_name`.
- ### Summary
+ #### Note:
This method will `extract` the data from the `data` argument, infer the schema, `normalize` the data into a load package (ie. jsonl or PARQUET files representing tables) and then `load` such packages into the `destination`.
The data may be supplied in several forms:
Expand All @@ -190,12 +190,12 @@ def run(
Please note that `dlt` deals with `bytes`, `datetime`, `decimal` and `uuid` objects so you are free to load binary data or documents containing dates.
- ### Execution
+ #### Execution:
The `run` method will first use the `sync_destination` method to synchronize pipeline state and schemas with the destination. You can disable this behavior with the `restore_from_destination` configuration option.
Next it will make sure that data from the previous run is fully processed. If not, the `run` method normalizes and loads pending data items.
Only then is the new data from the `data` argument extracted, normalized and loaded.
- ### Args:
+ #### Args:
data (Any): Data to be loaded to destination
destination (str | DestinationReference, optional): A name of the destination to which dlt will load the data, or a destination module imported from `dlt.destination`.
Expand All @@ -220,9 +220,9 @@ def run(
schema (Schema, optional): An explicit `Schema` object in which all table schemas will be grouped. By default `dlt` takes the schema from the source (if passed in `data` argument) or creates a default one itself.
- ### Raises:
+ Raises:
PipelineStepFailed when a problem happened during `extract`, `normalize` or `load` steps.
- ### Returns:
+ Returns:
LoadInfo: Information on loaded data including the list of package ids and failed job statuses. Please note that `dlt` will not raise if a single job terminally fails. Such information is provided via LoadInfo.
"""
destination = DestinationReference.from_name(destination)
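A sketch of a complete `run` call and of inspecting the returned LoadInfo; the sample rows are illustrative:

    import dlt

    info = dlt.run(
        [{"id": 1, "name": "bulbasaur"}, {"id": 2, "name": "ivysaur"}],
        destination="duckdb",
        table_name="pokemon",
        write_disposition="replace",
    )
    # dlt does not raise when a single job terminally fails - inspect LoadInfo instead
    print(info)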
12 changes: 6 additions & 6 deletions dlt/pipeline/pipeline.py
@@ -393,7 +393,7 @@ def run(
) -> LoadInfo:
"""Loads the data from `data` argument into the destination specified in `destination` and dataset specified in `dataset_name`.
- ### Summary
+ #### Note:
This method will `extract` the data from the `data` argument, infer the schema, `normalize` the data into a load package (ie. jsonl or PARQUET files representing tables) and then `load` such packages into the `destination`.
The data may be supplied in several forms:
@@ -404,12 +404,12 @@ def run(
Please note that `dlt` deals with `bytes`, `datetime`, `decimal` and `uuid` objects, so you are free to load documents containing e.g. binary data or dates.
- ### Execution
+ #### Execution:
The `run` method will first use the `sync_destination` method to synchronize pipeline state and schemas with the destination. You can disable this behavior with the `restore_from_destination` configuration option.
Next it will make sure that data from the previous run is fully processed. If not, the `run` method normalizes, loads pending data items and **exits**.
If there was no pending data, new data from the `data` argument is extracted, normalized and loaded.
- ### Args:
+ #### Args:
data (Any): Data to be loaded to destination
destination (str | DestinationReference, optional): A name of the destination to which dlt will load the data, or a destination module imported from `dlt.destination`.
@@ -439,9 +439,9 @@ def run(
loader_file_format (Literal["jsonl", "insert_values", "parquet"], optional): The file format the loader will use to create the load package. Not all file_formats are compatible with all destinations. Defaults to the preferred file format of the selected destination.
- ### Raises:
+ Raises:
PipelineStepFailed when a problem happened during `extract`, `normalize` or `load` steps.
- ### Returns:
+ Returns:
LoadInfo: Information on loaded data including the list of package ids and failed job statuses. Please note that `dlt` will not raise if a single job terminally fails. Such information is provided via LoadInfo.
"""
signals.raise_if_signalled()
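The granular alternative to `run` mentioned above, sketched step by step; pipeline name and data are illustrative:

    import dlt

    pipeline = dlt.pipeline(pipeline_name="pokemon", destination="duckdb", dataset_name="pokemon_data")
    # run == extract + normalize + load; each step can also be invoked separately
    pipeline.extract([{"id": 1, "name": "bulbasaur"}], table_name="pokemon")
    pipeline.normalize()
    load_info = pipeline.load()
    print(load_info)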
@@ -475,7 +475,7 @@ def run(
def sync_destination(self, destination: TDestinationReferenceArg = None, staging: TDestinationReferenceArg = None, dataset_name: str = None) -> None:
"""Synchronizes pipeline state with the `destination`'s state kept in `dataset_name`
- ### Summary
+ #### Note:
Attempts to restore pipeline state and schemas from the destination. Requires the state that is present at the destination to have a higher version number than the state kept locally in the working directory.
In such a situation the local state, schemas and intermediate files with the data will be deleted and replaced with the state and schema present in the destination.
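A sketch of an explicit sync, e.g. to restore a pipeline on a fresh machine; names are illustrative:

    import dlt

    pipeline = dlt.pipeline(pipeline_name="pokemon", destination="duckdb", dataset_name="pokemon_data")
    # pulls state and schemas from the destination if they are newer than the local ones
    pipeline.sync_destination()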
6 changes: 3 additions & 3 deletions dlt/sources/helpers/requests/retry.py
@@ -116,11 +116,11 @@ def _make_retry(
class Client:
"""Wrapper for `requests` to create a `Session` with configurable retry functionality.
- ### Summary
+ #### Note:
Create a `requests.Session` which automatically retries requests in case of error.
By default retries are triggered for `5xx` and `429` status codes and when the server is unreachable or drops the connection.
- ### Custom retry condition
+ #### Custom retry condition
You can provide one or more custom predicates for a specific retry condition. The predicate is called after every request with the resulting response and/or exception.
For example, this will trigger a retry when the response text is `error`:
Expand All @@ -134,7 +134,7 @@ class Client:
The retry is triggered when any of the predicates or the default conditions based on status code/exception are `True`.
- ### Args:
+ #### Args:
request_timeout: Timeout for requests in seconds. May be passed as `timedelta` or `float/int` number of seconds.
max_connections: Max connections per host in the HTTPAdapter pool
raise_for_status: Whether to raise exception on error status codes (using `response.raise_for_status()`)
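A sketch of the custom-predicate setup described above; the `retry_condition` parameter name is an assumption based on this docstring:

    from dlt.sources.helpers.requests import Client

    def retry_if_error_text(response, exception):
        # retry when the body is literally "error", as in the docstring example
        return response is not None and response.text == "error"

    client = Client(
        request_timeout=10.0,
        raise_for_status=False,
        retry_condition=retry_if_error_text,  # assumed parameter name
    )
    resp = client.session.get("https://example.com/data")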