Skip to content

Commit

Permalink
some work on the transformer example
Browse files Browse the repository at this point in the history
  • Loading branch information
sh-rp committed Sep 27, 2023
1 parent 5ff9649 commit 4301dc0
Show file tree
Hide file tree
Showing 14 changed files with 203 additions and 173 deletions.
3 changes: 3 additions & 0 deletions docs/examples/transformers/.dlt/config.toml
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@

[runtime]
log_level="WARNING"
File renamed without changes.
56 changes: 56 additions & 0 deletions docs/examples/transformers/run.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,56 @@
import asyncio
from typing import Sequence, Iterable

import dlt
from dlt.common.typing import TDataItem
from dlt.extract.source import DltResource
from dlt.sources.helpers import requests

# constants
POKEMON_URL = "https://pokeapi.co/api/v2/pokemon"

# retrieve pokemon list
@dlt.resource(write_disposition="replace")
def pokemon_list() -> Iterable[TDataItem]:
    """
    Request the pokemon listing and emit its entries one by one.

    Yields:
        dict: One entry of the "results" array of the listing response.
    """
    listing = requests.get(POKEMON_URL).json()
    for entry in listing["results"]:
        yield entry

# asynchronously retrieve details for each pokemon in the list
@dlt.transformer(data_from=pokemon_list)
async def pokemon(pokemon: TDataItem):
    """
    Fetch the full details of a single pokemon.

    Args:
        pokemon: One item received from `pokemon_list`; the address stored
            under its "url" key is requested.

    Returns:
        The decoded JSON body of the details response.
    """
    # `requests.get` blocks until the response arrives. Calling it directly
    # inside a coroutine would stall the event loop and serialize all
    # requests, so it is handed to the loop's default executor instead.
    loop = asyncio.get_running_loop()
    response = await loop.run_in_executor(None, requests.get, pokemon["url"])
    # just return the result, if you yield,
    # generator will be evaluated in main thread
    return response.json()


# asynchronously retrieve details for the species of each pokemon
@dlt.transformer(data_from=pokemon)
async def species(pokemon: TDataItem):
    """
    Fetch the species details that belong to a single pokemon.

    Args:
        pokemon: One item received from the `pokemon` transformer; its
            "species" -> "url" entry is requested and its "id" is copied
            into the result.

    Returns:
        The decoded JSON body of the species response, extended with a
        "pokemon_id" key.
    """
    # `requests.get` blocks until the response arrives. Calling it directly
    # inside a coroutine would stall the event loop and serialize all
    # requests, so it is handed to the loop's default executor instead.
    loop = asyncio.get_running_loop()
    response = await loop.run_in_executor(
        None, requests.get, pokemon["species"]["url"]
    )
    species_data = response.json()
    # optionally add pokemon_id to result json
    species_data["pokemon_id"] = pokemon["id"]
    # just return the result, if you yield,
    # generator will be evaluated in main thread
    return species_data


# set up a pipeline that loads into duckdb
pipeline = dlt.pipeline(
    pipeline_name="pokemon",
    destination="duckdb",
    dataset_name="pokemon_data",
)

# only the two transformers are passed to the run:
# the pokemon_list resource does not need to be loaded
load_info = pipeline.run(
    [
        pokemon(),
        species(),
    ]
)
print(load_info)
5 changes: 0 additions & 5 deletions docs/examples/transformers_and_parallelism/.dlt/config.toml

This file was deleted.

46 changes: 0 additions & 46 deletions docs/examples/transformers_and_parallelism/run.py

This file was deleted.

Original file line number Diff line number Diff line change
@@ -1,7 +1,5 @@
# @@@DLT_SNIPPET_START example
# @@@DLT_SNIPPET_START toml
pokemon_url="https://pokeapi.co/api/v2/pokemon"
berry_url="https://pokeapi.co/api/v2/berry"
# @@@DLT_SNIPPET_END toml

[runtime]
Expand Down
67 changes: 67 additions & 0 deletions docs/website/docs/examples/transformers/code/run-snippets.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,67 @@

def transformers_snippet() -> None:
    """
    Run the transformers example and check the resulting row counts.

    The code between the `DLT_SNIPPET_START example` and
    `DLT_SNIPPET_END example` markers is the example itself; the assertions
    after the end marker only verify the run.
    """

    # @@@DLT_SNIPPET_START example
    import asyncio
    from typing import Sequence, Iterable
    import dlt
    from dlt.common.typing import TDataItem
    from dlt.extract.source import DltResource
    from dlt.sources.helpers import requests

    # constants
    POKEMON_URL = "https://pokeapi.co/api/v2/pokemon"

    # retrieve pokemon list
    @dlt.resource(write_disposition="replace")
    def pokemon_list() -> Iterable[TDataItem]:
        """
        Request the pokemon listing and emit its entries.

        Yields:
            dict: One entry of the "results" array of the listing response.
        """
        yield from requests.get(POKEMON_URL).json()["results"]

    # asynchronously retrieve details for each pokemon in the list
    @dlt.transformer(data_from=pokemon_list)
    async def pokemon(pokemon: TDataItem):
        """
        Fetch the full details of a single pokemon.

        Returns:
            The decoded JSON body of the details response.
        """
        # `requests.get` blocks until the response arrives. Calling it
        # directly inside a coroutine would stall the event loop and
        # serialize all requests, so it runs in the loop's default executor.
        loop = asyncio.get_running_loop()
        response = await loop.run_in_executor(None, requests.get, pokemon["url"])
        # just return the result, if you yield,
        # generator will be evaluated in main thread
        return response.json()

    # asynchronously retrieve details for the species of each pokemon
    @dlt.transformer(data_from=pokemon)
    async def species(pokemon: TDataItem):
        """
        Fetch the species details that belong to a single pokemon.

        Returns:
            The decoded JSON body of the species response, extended with a
            "pokemon_id" key.
        """
        # run the blocking request in the loop's default executor so the
        # event loop stays free to start further requests
        loop = asyncio.get_running_loop()
        response = await loop.run_in_executor(
            None, requests.get, pokemon["species"]["url"]
        )
        species_data = response.json()
        # optionally add pokemon_id to result json
        species_data["pokemon_id"] = pokemon["id"]
        # just return the result, if you yield,
        # generator will be evaluated in main thread
        return species_data

    # build duck db pipeline
    pipeline = dlt.pipeline(
        pipeline_name="pokemon", destination="duckdb", dataset_name="pokemon_data"
    )

    # the pokemon_list resource does not need to be loaded
    load_info = pipeline.run([pokemon(), species()])
    print(load_info)
    # @@@DLT_SNIPPET_END example

    # test assertions
    row_counts = pipeline.last_trace.last_normalize_info.row_counts
    assert row_counts["pokemon"] == 20
    assert row_counts["species"] == 20
    assert "pokemon_list" not in row_counts
76 changes: 76 additions & 0 deletions docs/website/docs/examples/transformers/index.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,76 @@
---
title: Enriching loaded data with transformers
description: Learn how to use dlt transformers and how to speed up your loads with parallelism
keywords: [transformers, parallelism, example]
---

import Header from '../_examples-header.md';

<Header
intro="In this tutorial you will learn how to load a list of pokemon from the PokeAPI and, with the help of dlt transformers,
automatically query additional data for each retrieved pokemon. You will also learn how to harness parallelism with futures."
slug="transformer"
title="Enriching loaded data with transformers" />


## Use transformers
<!--@@@DLT_SNIPPET_START ./code/run-snippets.py::example-->
```py
import asyncio
from typing import Sequence, Iterable
import dlt
from dlt.common.typing import TDataItem
from dlt.extract.source import DltResource
from dlt.sources.helpers import requests

# constants
POKEMON_URL = "https://pokeapi.co/api/v2/pokemon"

# retrieve pokemon list
@dlt.resource(write_disposition="replace")
def pokemon_list() -> Iterable[TDataItem]:
    """
    Request the pokemon listing and emit its entries.

    Yields:
        dict: One entry of the "results" array of the listing response.
    """
    yield from requests.get(POKEMON_URL).json()["results"]

# asynchronously retrieve details for each pokemon in the list
@dlt.transformer(data_from=pokemon_list)
async def pokemon(pokemon: TDataItem):
    """
    Fetch the full details of a single pokemon.

    Returns:
        The decoded JSON body of the details response.
    """
    # `requests.get` blocks until the response arrives. Calling it
    # directly inside a coroutine would stall the event loop and
    # serialize all requests, so it runs in the loop's default executor.
    loop = asyncio.get_running_loop()
    response = await loop.run_in_executor(None, requests.get, pokemon["url"])
    # just return the result, if you yield,
    # generator will be evaluated in main thread
    return response.json()


# asynchronously retrieve details for the species of each pokemon
@dlt.transformer(data_from=pokemon)
async def species(pokemon: TDataItem):
    """
    Fetch the species details that belong to a single pokemon.

    Returns:
        The decoded JSON body of the species response, extended with a
        "pokemon_id" key.
    """
    # run the blocking request in the loop's default executor so the
    # event loop stays free to start further requests
    loop = asyncio.get_running_loop()
    response = await loop.run_in_executor(
        None, requests.get, pokemon["species"]["url"]
    )
    species_data = response.json()
    # optionally add pokemon_id to result json
    species_data["pokemon_id"] = pokemon["id"]
    # just return the result, if you yield,
    # generator will be evaluated in main thread
    return species_data


# build duck db pipeline
pipeline = dlt.pipeline(
    pipeline_name="pokemon", destination="duckdb", dataset_name="pokemon_data"
)

# the pokemon_list resource does not need to be loaded
load_info = pipeline.run([pokemon(), species()])
print(load_info)
```
<!--@@@DLT_SNIPPET_END ./code/run-snippets.py::example-->

This file was deleted.

57 changes: 0 additions & 57 deletions docs/website/docs/examples/transformers_and_parallelism/index.md

This file was deleted.

7 changes: 0 additions & 7 deletions docs/website/docusaurus.config.js
Original file line number Diff line number Diff line change
Expand Up @@ -84,13 +84,6 @@ const config = {
label: 'Docs',
},
{ to: 'blog', label: 'Blog', position: 'left' },
{
type: 'doc',
docId: 'examples',
label: 'Code Examples',
position:'right',
className: 'examples-link',
},
{
href: 'https://join.slack.com/t/dlthub-community/shared_invite/zt-1slox199h-HAE7EQoXmstkP_bTqal65g',
label: '.',
Expand Down
Loading

0 comments on commit 4301dc0

Please sign in to comment.