adds basic docstrings
rudolfix committed Dec 11, 2022
1 parent 8ab75c6 commit 54f0801
Showing 13 changed files with 291 additions and 32 deletions.
2 changes: 1 addition & 1 deletion .editorconfig
@@ -14,5 +14,5 @@ max_line_length = 200
# Use tabs for indentation (Makefiles require tabs)
indent_style = tab

[*.{yaml,yml,js}]
[*.{yaml,yml,js,md}]
indent_size = 2
21 changes: 10 additions & 11 deletions dlt/__init__.py
@@ -4,22 +4,21 @@
1. Write a pipeline script
>>> import dlt
>>> dlt.run(source=my_complicated_json, destination="duckdb")
>>> import requests
>>> dlt.run(requests.get("https://api.chess.com/pub/player/magnuscarlsen/games/2022/11").json()["games"], destination="bigquery", table_name="magnus_games")
2. Run your pipeline script
$ python my_pipeline.py
2. Run your pipeline script (no BigQuery default credentials? go here: https://dlthub.com/docs/destinations#google-bigquery)
$ python magnus_games.py
3. See and use your data
$ dlt pipeline show my_pipeline.py
3. See and query your data with autogenerated Streamlit app
$ dlt pipeline dlt_magnus_games show
This will auto-generate and run a Streamlit app where you can see the data and the schema
Or start with our pipeline template with sample chess.com data to bigquery
Or start with our pipeline template with sample chess.com data loaded to bigquery
$ dlt init chess bigquery
For more detailed info, see https://dlthub.com/docs
For more detailed info, see https://dlthub.com/docs/getting-started
"""

from dlt.version import __version__
@@ -35,9 +34,9 @@
pipeline = _pipeline

TSecretValue = _TSecretValue
"When typing source/resource function arguments indicates that given argument is a secret and should be taken from dlt.secrets. The value itself is a string"
"When typing source/resource function arguments it indicates that a given argument is a secret and should be taken from dlt.secrets."

TCredentials = _CredentialsConfiguration
"When typing source/resource function arguments indicates that given argument represents credentials and should be taken from dlt.secrets. Credentials may be string, dictionaries or any other types."
"When typing source/resource function arguments it indicates that a given argument represents credentials and should be taken from dlt.secrets. Credentials may be a string, dictionary or any other type."


4 changes: 2 additions & 2 deletions dlt/common/configuration/accessors.py
@@ -120,7 +120,7 @@ def default_type(self) -> AnyType:


config = _ConfigAccessor()
"""Dictionary-like access to all secrets known to dlt"""
"""Dictionary-like access to all config values to dlt"""

secrets = _SecretsAccessor()
"""Dictionary-like access to all config values known to dlt"""
"""Dictionary-like access to all secrets known known to dlt"""
2 changes: 1 addition & 1 deletion dlt/common/configuration/specs/base_configuration.py
@@ -114,7 +114,7 @@ def parse_native_representation(self, native_value: Any) -> None:
"""Initialize the configuration fields by parsing the `native_value` which should be a native representation of the configuration
or credentials, for example database connection string or JSON serialized GCP service credentials file.
Args:
### Args:
native_value (Any): A native representation of the configuration
Raises:
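As an illustration of the contract, a toy credentials class (hypothetical, not part of this commit) that parses a database connection string:

from typing import Any
from urllib.parse import urlparse

class ToyConnectionCredentials:  # hypothetical example
    def parse_native_representation(self, native_value: Any) -> None:
        if not isinstance(native_value, str):
            raise ValueError(native_value)
        # "postgresql://user:pass@host:5432/db" -> individual fields
        url = urlparse(native_value)
        self.drivername = url.scheme
        self.username = url.username
        self.password = url.password
        self.host = url.hostname
        self.port = url.port
        self.database = url.path.lstrip("/")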
10 changes: 7 additions & 3 deletions dlt/common/pipeline.py
@@ -73,13 +73,17 @@ class TPipelineState(TypedDict, total=False):
class SupportsPipeline(Protocol):
"""A protocol with core pipeline operations that lets high level abstractions ie. sources to access pipeline methods and properties"""
pipeline_name: str
"""Name of the pipeline"""
destination: DestinationReference
"""The destination reference which is ModuleType. `destination.__name__` returns the name string"""
dataset_name: str = None
"""Name of the dataset to which pipeline will be loaded to"""
runtime_config: RunConfiguration
"""A configuration of runtime options like logging level and format and various tracing options"""

@property
def state(self) -> TPipelineState:
...
"""Returns dictionary with pipeline state"""

def run(
self,
@@ -139,8 +143,8 @@ def __init__(self, deferred_pipeline: Callable[..., SupportsPipeline]) -> None:
self._deferred_pipeline = deferred_pipeline


def get_default_working_dir() -> str:
""" Gets default working dir of the pipeline, which may be
def get_default_pipelines_dir() -> str:
""" Gets default directory where pipelines' data will be stored
1. in user home directory ~/.dlt/pipelines/
2. if current user is root in /var/dlt/pipelines
3. if current user does not have a home directory in /tmp/dlt/pipelines
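A sketch of the lookup order described in that docstring (assumed logic, not the actual implementation):

import os
import tempfile

def _default_pipelines_dir_sketch() -> str:
    home = os.path.expanduser("~")
    if home == "/root":
        # current user is root, no per-user home is used
        return "/var/dlt/pipelines"
    if not home or home == "~":
        # no home directory could be resolved
        return os.path.join(tempfile.gettempdir(), "dlt", "pipelines")
    # regular user: keep pipelines' data under the home directory
    return os.path.join(home, ".dlt", "pipelines")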
2 changes: 1 addition & 1 deletion dlt/common/schema/detections.py
@@ -8,7 +8,7 @@


_NOW_TS: float = pendulum.now().timestamp()
_FLOAT_TS_RANGE = 31536000.0 # seconds in year
_FLOAT_TS_RANGE = 5 * 31536000.0  # 5 years in seconds


def is_timestamp(t: Type[Any], v: Any) -> Optional[TDataType]:
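The change widens the detection window from one year to five; a sketch of the heuristic built from the constants above (assumed, the actual `is_timestamp` may differ):

import pendulum

_NOW_TS: float = pendulum.now().timestamp()
_FLOAT_TS_RANGE = 5 * 31536000.0  # 5 years in seconds

def _looks_like_timestamp(v: float) -> bool:
    # floats within five years of "now" are treated as UNIX timestamps
    return _NOW_TS - _FLOAT_TS_RANGE <= v <= _NOW_TS + _FLOAT_TS_RANGE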
93 changes: 93 additions & 0 deletions dlt/extract/decorators.py
@@ -43,6 +43,36 @@ def source(func: None = ..., /, name: str = None, max_table_nesting: int = None,
...

def source(func: Optional[AnyFun] = None, /, name: str = None, max_table_nesting: int = None, schema: Schema = None, spec: Type[BaseConfiguration] = None) -> Any:
"""A decorator that transforms a function returning one or more `dlt resources` into a `dlt source` in order to load it with `dlt`.
### Summary
A `dlt source` is a logical grouping of resources that are often extracted and loaded together. A source is associated with a schema, which describes the structure of the loaded data and provides instructions on how to load it.
This schema contains table schemas that describe the structure of the data coming from the resources. See https://dlthub.com/docs/glossary for definitions of basic terms.
### Passing credentials
Another important function of the source decorator is to provide credentials and other configuration to the code that extracts data. The decorator may automatically bind the source function arguments to the secret and config values.
>>> def chess(username, chess_url: str = dlt.config.value, api_secret = dlt.secrets.value, title: str = "GM"):
>>> return user_profile(username, chess_url, api_secret), user_games(username, chess_url, api_secret, with_titles=title)
>>>
>>> list(chess("magnuscarlsen"))
Here `username` is a required, explicit Python argument. `chess_url` is a required argument that, if not explicitly passed, will be taken from configuration (i.e. `config.toml`). `api_secret` is a required argument that, if not explicitly passed, will be taken from dlt secrets (i.e. `secrets.toml`).
See https://dlthub.com/docs/customization/credentials for details.
### Args:
func: A function that returns a dlt resource, a list of resources, or a list of any data items that can be loaded by `dlt`.
name (str, optional): The name of the source, which is also the name of the associated schema. If not present, the function name will be used.
max_table_nesting (int, optional): A schema hint that sets the maximum depth of table nesting beyond which the remaining nodes are loaded as strings.
schema (Schema, optional): An explicit `Schema` instance to be associated with the source. If not present, `dlt` creates a new `Schema` object with the provided `name`. If such a `Schema` already exists in the same folder as the module containing the decorated function, it will be loaded from file.
spec (Type[BaseConfiguration], optional): A specification of configuration and secret values required by the source.
### Returns:
`DltSource` instance
"""

if name and schema:
raise ArgumentsOverloadException("'name' has no effect when `schema` argument is present", source.__name__)
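A minimal end-to-end sketch of the decorator in use (toy data and source name; assumes the `duckdb` destination is installed):

import dlt

@dlt.source
def my_numbers():
    # a single-resource source; the resource name becomes the table name
    return dlt.resource([{"n": 1}, {"n": 2}], name="numbers")

# load everything the source produces
dlt.run(my_numbers(), destination="duckdb")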
@@ -160,7 +190,52 @@ def resource(
spec: Type[BaseConfiguration] = None,
depends_on: TUnboundDltResource = None
) -> Any:
"""When used as a decorator, transforms any generator (yielding) function into a `dlt resource`. When used as a function, it transforms data in `data` argument into a `dlt resource`.
### Summary
A `resource` is a location within a `source` that holds data with a specific structure (schema) or coming from a specific origin. A resource may be a REST API endpoint, a database table or a tab in Google Sheets.
A `dlt resource` is a Python representation of a `resource` that combines both data and metadata (table schema) that describes the structure and instructs the loading of the data.
A `dlt resource` is also an `Iterable` and can be used like any other similar object, i.e. a list or tuple. See https://dlthub.com/docs/glossary for definitions of basic terms.
### Passing credentials
If used as a decorator (`data` argument is a `Generator`), it may automatically bind the source function arguments to the secret and config values.
>>> def user_games(username, chess_url: str = dlt.config.value, api_secret = dlt.secrets.value):
>>> return requests.get("%s/games/%s" % (chess_url, username), headers={"Authorization": f"Bearer {api_secret}"})
>>>
>>> list(user_games("magnuscarlsen"))
Here `username` is a required, explicit Python argument. `chess_url` is a required argument that, if not explicitly passed, will be taken from configuration (i.e. `config.toml`). `api_secret` is a required argument that, if not explicitly passed, will be taken from dlt secrets (i.e. `secrets.toml`).
See https://dlthub.com/docs/customization/credentials for details.
Note that if the decorated function is an inner function, credential passing will be disabled.
### Args:
data (Callable | Any, optional): a function to be decorated or data compatible with `dlt` `run`.
name (str, optional): The name of the resource that by default also becomes the name of the table to which the data is loaded.
If not present, the name of the decorated function will be used.
table_name (TTableHintTemplate[str], optional): A table name, if different from `name`.
This argument also accepts a callable that is used to dynamically create tables for stream-like resources yielding many datatypes.
write_disposition (Literal["skip", "append", "replace"], optional): Controls how to write data to a table. `append` will always add new data at the end of the table. `replace` will replace existing data with new data. `skip` will prevent data from loading. Defaults to "append".
This argument also accepts a callable that is used to dynamically set the write disposition for stream-like resources yielding many datatypes.
columns (Sequence[TColumnSchema], optional): A list of column schemas. Typed dictionary describing column names, data types, write disposition and performance hints that gives you full control over the created table schema.
This argument also accepts a callable that is used to dynamically assign column schemas for stream-like resources yielding many datatypes.
selected (bool, optional): When `True` `dlt pipeline` will extract and load this resource, if `False`, the resource will be ignored.
spec (Type[BaseConfiguration], optional): A specification of configuration and secret values required by the source.
depends_on (TUnboundDltResource, optional): Allows piping data from one resource to another to build multi-step pipelines.
### Raises
ResourceNameMissing: indicates that the name of the resource cannot be inferred from the `data` being passed.
InvalidResourceDataType: indicates that the `data` argument cannot be converted into a `dlt resource`.
### Returns:
DltResource instance which may be loaded, iterated or combined with other resources into a pipeline.
"""
def make_resource(_name: str, _data: Any) -> DltResource:
table_template = DltResource.new_table_template(table_name or _name, write_disposition=write_disposition, columns=columns)
return DltResource.from_data(_data, _name, table_template, selected, cast(DltResource, depends_on))
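And a sketch of the non-decorator form mentioned in the summary, wrapping in-memory data directly (toy data):

import dlt

# `name` also becomes the destination table name
numbers = dlt.resource([{"n": 1}, {"n": 2}], name="numbers", write_disposition="replace")

for row in numbers:  # a DltResource is an Iterable like any list or tuple
    print(row)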
@@ -214,6 +289,24 @@ def transformer(
selected: bool = True,
spec: Type[BaseConfiguration] = None
) -> Callable[[Callable[Concatenate[TDataItem, TResourceFunParams], Any]], Callable[TResourceFunParams, DltResource]]:
"""A form of `dlt resource` that takes input from other resources in order to enrich or transformer the data.
### Example
>>> @dlt.resource
>>> def players(title, chess_url=dlt.config.value):
>>> r = requests.get(f"{chess_url}titled/{title}")
>>> yield r.json()["players"] # returns list of player names
>>>
>>> # this resource takes data from players and returns profiles
>>> @dlt.transformer(data_from=players, write_disposition="replace")
>>> def player_profile(player: Any, chess_url: str = dlt.config.value) -> Iterator[TDataItems]:
>>> r = requests.get(f"{chess_url}player/{player}")
>>> r.raise_for_status()
>>> yield r.json()
>>>
>>> list(players("GM") | player_profile) # pipes the data from players into player profile to produce a list of player profiles
"""
f: AnyFun = None
# if data_from is a function we are called without parens
if inspect.isfunction(data_from):
17 changes: 16 additions & 1 deletion dlt/extract/source.py
@@ -196,7 +196,6 @@ def add_pipe(self, data: Any) -> None:
raise InvalidResourceDataTypeMultiplePipes(self.name, data, type(data))

def select_tables(self, *table_names: Iterable[str]) -> "DltResource":

def _filter(item: TDataItem, meta: Any = None) -> bool:
is_in_meta = isinstance(meta, TableNameMeta) and meta.table_name in table_names
is_in_dyn = self._table_name_hint_fun and self._table_name_hint_fun(item) in table_names
@@ -397,6 +396,17 @@ def __delitem__(self, resource_name: str) -> None:


class DltSource(Iterable[TDataItem]):
"""Groups several `dlt resources` under a single schema and allows to perform operations on them.
### Summary
The instance of this class is created whenever you call a `dlt.source`-decorated function. It takes care of several things for you:
* You can pass this instance to the `dlt` `run` method in order to load all data present in the `dlt resources`.
* You can select and deselect resources that you want to load via the `with_resources` method.
* You can access the resources (which are `DltResource` instances) as source attributes.
* It implements the `Iterable` interface, so you can get all the data from the resources yourself, without a dlt pipeline present.
* You can get the `schema` for the source and all the resources within it.
* You can use the `run` method to load the data with a default instance of the dlt pipeline.
"""
def __init__(self, name: str, schema: Schema, resources: Sequence[DltResource] = None) -> None:
self.name = name
self.exhausted = False
@@ -408,6 +418,7 @@ def __init__(self, name: str, schema: Schema, resources: Sequence[DltResource] = None) -> None:

@classmethod
def from_data(cls, name: str, schema: Schema, data: Any) -> "DltSource":
"""Converts any `data` supported by `dlt` `run` method into `dlt source` with a name `name` and `schema` schema"""
# creates source from various forms of data
if isinstance(data, DltSource):
return data
@@ -423,10 +434,12 @@ def from_data(cls, name: str, schema: Schema, data: Any) -> "DltSource":

@property
def resources(self) -> DltResourceDict:
"""A dictionary of all resources present in the source, where the key is a resource name."""
return self._resources

@property
def selected_resources(self) -> Dict[str, DltResource]:
"""A dictionary of all the resources that are selected to be loaded."""
return self._resources.selected

@property
@@ -447,11 +460,13 @@ def discover_schema(self) -> Schema:
return self._schema

def with_resources(self, *resource_names: str) -> "DltSource":
"""A convenience method to select one of more resources to be loaded. Returns a source with the specified resources selected."""
self._resources.select(*resource_names)
return self

@property
def run(self) -> SupportsPipelineRun:
"""A convenience method that will call `run` run on the currently active `dlt` pipeline. If pipeline instance is not found, one with default settings will be created."""
self_run: SupportsPipelineRun = makefun.partial(Container()[PipelineContext].pipeline().run, *(), data=self)
return self_run

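A short sketch that exercises the `DltSource` surface described above (toy source; resource names are made up):

import dlt

@dlt.source
def my_source():
    return [
        dlt.resource([{"a": 1}], name="table_a"),
        dlt.resource([{"b": 2}], name="table_b"),
    ]

source = my_source()                            # a DltSource instance
print(list(source.resources.keys()))            # all resources by name
rows = list(source.with_resources("table_a"))   # iterate selected resources, no pipeline needed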
1 change: 1 addition & 0 deletions dlt/normalize/normalize.py
@@ -152,6 +152,7 @@ def _w_normalize_chunk(load_storage: LoadStorage, schema: Schema, load_id: str,
columns = schema.get_table_columns(table_name)
column_schemas[table_name] = columns
# store row
# TODO: it is possible to write to single file from many processes using this: https://gitlab.com/warsaw/flufl.lock
load_storage.write_data_item(load_id, schema_name, table_name, row, columns)
# count total items
items_count += 1
