Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[docs] added api reference, test run #620

Closed
wants to merge 10 commits into from
99 changes: 71 additions & 28 deletions dlt/pipeline/__init__.py
Original file line number Diff line number Diff line change
@@ -1,16 +1,13 @@
from typing import Sequence, cast, overload

from dlt.common.schema import Schema
from dlt.common.schema.typing import TColumnSchema, TWriteDisposition

from dlt.common.typing import TSecretValue, Any
from dlt.common.configuration import with_config
from dlt.common.configuration.container import Container
from dlt.common.configuration.inject import get_orig_args, last_config
from dlt.common.destination.reference import DestinationReference, TDestinationReferenceArg
from dlt.common.pipeline import LoadInfo, PipelineContext, get_dlt_pipelines_dir
from dlt.common.data_writers import TLoaderFileFormat

from dlt.common.schema import Schema
from dlt.common.schema.typing import TColumnSchema, TWriteDisposition
from dlt.common.typing import TSecretValue, Any
from dlt.pipeline.configuration import PipelineConfiguration, ensure_correct_pipeline_kwargs
from dlt.pipeline.pipeline import Pipeline
from dlt.pipeline.progress import _from_name as collector_from_name, TCollectorArg, _NULL_COLLECTOR
Expand All @@ -32,16 +29,16 @@ def pipeline(
) -> Pipeline:
"""Creates a new instance of `dlt` pipeline, which moves the data from the source ie. a REST API to a destination ie. database or a data lake.

### Summary
The `pipeline` functions allows you to pass the destination name to which the data should be loaded, the name of the dataset and several other options that govern loading of the data.
The created `Pipeline` object lets you load the data from any source with `run` method or to have more granular control over the loading process with `extract`, `normalize` and `load` methods.
#### Summary
The `pipeline` functions allows you to pass the destination name to which the data should be loaded, the name of the dataset and several other options that govern loading of the data.
The created `Pipeline` object lets you load the data from any source with `run` method or to have more granular control over the loading process with `extract`, `normalize` and `load` methods.

Please refer to the following doc pages
- Write your first pipeline walkthrough: https://dlthub.com/docs/walkthroughs/create-a-pipeline
- Pipeline architecture and data loading steps: https://dlthub.com/docs/reference
- List of supported destinations: https://dlthub.com/docs/dlt-ecosystem/destinations
Please refer to the following doc pages
- Write your first pipeline walkthrough: https://dlthub.com/docs/walkthroughs/create-a-pipeline
- Pipeline architecture and data loading steps: https://dlthub.com/docs/reference
- List of supported destinations: https://dlthub.com/docs/dlt-ecosystem/destinations

### Args:
#### Args:
pipeline_name (str, optional): A name of the pipeline that will be used to identify it in monitoring events and to restore its state and data schemas on subsequent runs.
Defaults to the file name of pipeline script with `dlt_` prefix added.

Expand Down Expand Up @@ -73,7 +70,7 @@ def pipeline(
`extract`, `normalize` and `load` stage. Pass a string with a collector name or configure your own by choosing from `dlt.progress` module.
We support most of the progress libraries: try passing `tqdm`, `enlighten` or `alive_progress` or `log` to write to console/log.

### Returns:
Returns:
Pipeline: An instance of `Pipeline` class with. Please check the documentation of `run` method for information on what to do with it.
"""

Expand All @@ -99,6 +96,52 @@ def pipeline(
progress: TCollectorArg = _NULL_COLLECTOR,
**kwargs: Any
) -> Pipeline:
"""Creates a new instance of `dlt` pipeline, which moves the data from the source ie. a REST API to a destination ie. database or a data lake.

Summary
The `pipeline` functions allows you to pass the destination name to which the data should be loaded, the name of the dataset and several other options that govern loading of the data.
The created `Pipeline` object lets you load the data from any source with `run` method or to have more granular control over the loading process with `extract`, `normalize` and `load` methods.

Please refer to the following doc pages
- Write your first pipeline walkthrough: https://dlthub.com/docs/walkthroughs/create-a-pipeline
- Pipeline architecture and data loading steps: https://dlthub.com/docs/reference
- List of supported destinations: https://dlthub.com/docs/dlt-ecosystem/destinations

Args:
pipeline_name (str, optional): A name of the pipeline that will be used to identify it in monitoring events and to restore its state and data schemas on subsequent runs.
Defaults to the file name of pipeline script with `dlt_` prefix added.

pipelines_dir (str, optional): A working directory in which pipeline state and temporary files will be stored. Defaults to user home directory: `~/dlt/pipelines/`.

pipeline_salt (TSecretValue, optional): A random value used for deterministic hashing during data anonymization. Defaults to a value derived from the pipeline name.
Default value should not be used for any cryptographic purposes.

destination (str | DestinationReference, optional): A name of the destination to which dlt will load the data, or a destination module imported from `dlt.destination`.
May also be provided to `run` method of the `pipeline`.

staging (str | DestinationReference, optional): A name of the destination where dlt will stage the data before final loading, or a destination module imported from `dlt.destination`.
May also be provided to `run` method of the `pipeline`.

dataset_name (str, optional): A name of the dataset to which the data will be loaded. A dataset is a logical group of tables ie. `schema` in relational databases or folder grouping many files.
May also be provided later to the `run` or `load` methods of the `Pipeline`. If not provided at all then defaults to the `pipeline_name`

import_schema_path (str, optional): A path from which the schema `yaml` file will be imported on each pipeline run. Defaults to None which disables importing.

export_schema_path (str, optional): A path where the schema `yaml` file will be exported after every schema change. Defaults to None which disables exporting.

full_refresh (bool, optional): When set to True, each instance of the pipeline with the `pipeline_name` starts from scratch when run and loads the data to a separate dataset.
The datasets are identified by `dataset_name_` + datetime suffix. Use this setting whenever you experiment with your data to be sure you start fresh on each run. Defaults to False.

credentials (Any, optional): Credentials for the `destination` ie. database connection string or a dictionary with google cloud credentials.
In most cases should be set to None, which lets `dlt` to use `secrets.toml` or environment variables to infer right credentials values.

progress(str, Collector): A progress monitor that shows progress bars, console or log messages with current information on sources, resources, data items etc. processed in
`extract`, `normalize` and `load` stage. Pass a string with a collector name or configure your own by choosing from `dlt.progress` module.
We support most of the progress libraries: try passing `tqdm`, `enlighten` or `alive_progress` or `log` to write to console/log.

Returns:
Pipeline: An instance of `Pipeline` class with. Please check the documentation of `run` method for information on what to do with it.
"""
ensure_correct_pipeline_kwargs(pipeline, **kwargs)
# call without arguments returns current pipeline
orig_args = get_orig_args(**kwargs) # original (*args, **kwargs)
Expand Down Expand Up @@ -179,23 +222,23 @@ def run(
) -> LoadInfo:
"""Loads the data in `data` argument into the destination specified in `destination` and dataset specified in `dataset_name`.

### Summary
This method will `extract` the data from the `data` argument, infer the schema, `normalize` the data into a load package (ie. jsonl or PARQUET files representing tables) and then `load` such packages into the `destination`.
#### Summary
This method will `extract` the data from the `data` argument, infer the schema, `normalize` the data into a load package (ie. jsonl or PARQUET files representing tables) and then `load` such packages into the `destination`.

The data may be supplied in several forms:
* a `list` or `Iterable` of any JSON-serializable objects ie. `dlt.run([1, 2, 3], table_name="numbers")`
* any `Iterator` or a function that yield (`Generator`) ie. `dlt.run(range(1, 10), table_name="range")`
* a function or a list of functions decorated with @dlt.resource ie. `dlt.run([chess_players(title="GM"), chess_games()])`
* a function or a list of functions decorated with @dlt.source.
* a `list` or `Iterable` of any JSON-serializable objects ie. `dlt.run([1, 2, 3], table_name="numbers")`
* any `Iterator` or a function that yield (`Generator`) ie. `dlt.run(range(1, 10), table_name="range")`
* a function or a list of functions decorated with @dlt.resource ie. `dlt.run([chess_players(title="GM"), chess_games()])`
* a function or a list of functions decorated with @dlt.source.

Please note that `dlt` deals with `bytes`, `datetime`, `decimal` and `uuid` objects so you are free to load binary data or documents containing dates.

### Execution
The `run` method will first use `sync_destination` method to synchronize pipeline state and schemas with the destination. You can disable this behavior with `restore_from_destination` configuration option.
Next it will make sure that data from the previous is fully processed. If not, `run` method normalizes and loads pending data items.
Only then the new data from `data` argument is extracted, normalized and loaded.
#### Execution
The `run` method will first use `sync_destination` method to synchronize pipeline state and schemas with the destination. You can disable this behavior with `restore_from_destination` configuration option.
Next it will make sure that data from the previous is fully processed. If not, `run` method normalizes and loads pending data items.
Only then the new data from `data` argument is extracted, normalized and loaded.

### Args:
#### Args:
data (Any): Data to be loaded to destination

destination (str | DestinationReference, optional): A name of the destination to which dlt will load the data, or a destination module imported from `dlt.destination`.
Expand All @@ -220,9 +263,9 @@ def run(

schema (Schema, optional): An explicit `Schema` object in which all table schemas will be grouped. By default `dlt` takes the schema from the source (if passed in `data` argument) or creates a default one itself.

### Raises:
Raises:
PipelineStepFailed when a problem happened during `extract`, `normalize` or `load` steps.
### Returns:
Returns:
LoadInfo: Information on loaded data including the list of package ids and failed job statuses. Please not that `dlt` will not raise if a single job terminally fails. Such information is provided via LoadInfo.
"""
destination = DestinationReference.from_name(destination)
Expand Down
Loading