Skip to content

Commit

Permalink
unrelated formatting fixes
Browse files Browse the repository at this point in the history
  • Loading branch information
sh-rp committed Jan 29, 2024
1 parent 9446e29 commit d830086
Show file tree
Hide file tree
Showing 15 changed files with 76 additions and 84 deletions.
24 changes: 13 additions & 11 deletions docs/examples/chess_production/chess.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,6 @@
from dlt.common.typing import StrAny, TDataItems
from dlt.sources.helpers.requests import client


@dlt.source
def chess(
chess_url: str = dlt.config.value,
Expand Down Expand Up @@ -60,7 +59,6 @@ def players_games(username: Any) -> Iterator[TDataItems]:

MAX_PLAYERS = 5


def load_data_with_retry(pipeline, data):
try:
for attempt in Retrying(
Expand All @@ -70,16 +68,18 @@ def load_data_with_retry(pipeline, data):
reraise=True,
):
with attempt:
logger.info(f"Running the pipeline, attempt={attempt.retry_state.attempt_number}")
logger.info(
f"Running the pipeline, attempt={attempt.retry_state.attempt_number}"
)
load_info = pipeline.run(data)
logger.info(str(load_info))

# raise on failed jobs
load_info.raise_on_failed_jobs()
# send notification
send_slack_message(
pipeline.runtime_config.slack_incoming_hook, "Data was successfully loaded!"
)
# raise on failed jobs
load_info.raise_on_failed_jobs()
# send notification
send_slack_message(
pipeline.runtime_config.slack_incoming_hook, "Data was successfully loaded!"
)
except Exception:
# we get here after all the failed retries
# send notification
Expand All @@ -92,7 +92,9 @@ def load_data_with_retry(pipeline, data):
# print the information on the first load package and all jobs inside
logger.info(f"First load package info: {load_info.load_packages[0]}")
# print the information on the first completed job in first load package
logger.info(f"First completed job info: {load_info.load_packages[0].jobs['completed_jobs'][0]}")
logger.info(
f"First completed job info: {load_info.load_packages[0].jobs['completed_jobs'][0]}"
)

# check for schema updates:
schema_updates = [p.schema_update for p in load_info.load_packages]
Expand Down Expand Up @@ -150,4 +152,4 @@ def load_data_with_retry(pipeline, data):
)
# get data for a few famous players
data = chess(chess_url="https://api.chess.com/pub/", max_players=MAX_PLAYERS)
load_data_with_retry(pipeline, data)
load_data_with_retry(pipeline, data)
10 changes: 4 additions & 6 deletions docs/examples/connector_x_arrow/load_arrow.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@
import dlt
from dlt.sources.credentials import ConnectionStringCredentials


def read_sql_x(
conn_str: ConnectionStringCredentials = dlt.secrets.value,
query: str = dlt.config.value,
Expand All @@ -15,17 +14,16 @@ def read_sql_x(
protocol="binary",
)


def genome_resource():
# create genome resource with merge on `upid` primary key
genome = dlt.resource(
name="genome",
name="acanthochromis_polyacanthus",
write_disposition="merge",
primary_key="upid",
primary_key="analysis_id",
standalone=True,
)(read_sql_x)(
"mysql://[email protected]:4497/Rfam", # type: ignore[arg-type]
"SELECT * FROM genome ORDER BY created LIMIT 1000",
"mysql://[email protected]:3306/acanthochromis_polyacanthus_core_100_1", # type: ignore[arg-type]
"SELECT * FROM analysis LIMIT 20",
)
# add incremental on created at
genome.apply_hints(incremental=dlt.sources.incremental("created"))
Expand Down
5 changes: 1 addition & 4 deletions docs/examples/google_sheets/google_sheets.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,15 +9,13 @@
)
from dlt.common.typing import DictStrAny, StrAny


def _initialize_sheets(
credentials: Union[GcpOAuthCredentials, GcpServiceAccountCredentials]
) -> Any:
# Build the service object.
service = build("sheets", "v4", credentials=credentials.to_native_credentials())
return service


@dlt.source
def google_spreadsheet(
spreadsheet_id: str,
Expand Down Expand Up @@ -57,7 +55,6 @@ def get_sheet(sheet_name: str) -> Iterator[DictStrAny]:
for name in sheet_names
]


if __name__ == "__main__":
pipeline = dlt.pipeline(destination="duckdb")
# see example.secrets.toml to where to put credentials
Expand All @@ -70,4 +67,4 @@ def get_sheet(sheet_name: str) -> Iterator[DictStrAny]:
sheet_names=range_names,
)
)
print(info)
print(info)
8 changes: 4 additions & 4 deletions docs/examples/incremental_loading/zendesk.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,11 +6,12 @@
from dlt.common.typing import TAnyDateTime
from dlt.sources.helpers.requests import client


@dlt.source(max_table_nesting=2)
def zendesk_support(
credentials: Dict[str, str] = dlt.secrets.value,
start_date: Optional[TAnyDateTime] = pendulum.datetime(year=2000, month=1, day=1), # noqa: B008
start_date: Optional[TAnyDateTime] = pendulum.datetime( # noqa: B008
year=2000, month=1, day=1
),
end_date: Optional[TAnyDateTime] = None,
):
"""
Expand Down Expand Up @@ -112,12 +113,11 @@ def get_pages(
if not response_json["end_of_stream"]:
get_url = response_json["next_page"]


if __name__ == "__main__":
# create dlt pipeline
pipeline = dlt.pipeline(
pipeline_name="zendesk", destination="duckdb", dataset_name="zendesk_data"
)

load_info = pipeline.run(zendesk_support())
print(load_info)
print(load_info)
2 changes: 0 additions & 2 deletions docs/examples/nested_data/nested_data.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,6 @@

CHUNK_SIZE = 10000


# You can limit how deep dlt goes when generating child tables.
# By default, the library will descend and generate child tables
# for all nested lists, without a limit.
Expand Down Expand Up @@ -82,7 +81,6 @@ def load_documents(self) -> Iterator[TDataItem]:
while docs_slice := list(islice(cursor, CHUNK_SIZE)):
yield map_nested_in_place(convert_mongo_objs, docs_slice)


def convert_mongo_objs(value: Any) -> Any:
if isinstance(value, (ObjectId, Decimal128)):
return str(value)
Expand Down
11 changes: 6 additions & 5 deletions docs/examples/pdf_to_weaviate/pdf_to_weaviate.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,14 +4,16 @@
from dlt.destinations.impl.weaviate import weaviate_adapter
from PyPDF2 import PdfReader


@dlt.resource(selected=False)
def list_files(folder_path: str):
folder_path = os.path.abspath(folder_path)
for filename in os.listdir(folder_path):
file_path = os.path.join(folder_path, filename)
yield {"file_name": filename, "file_path": file_path, "mtime": os.path.getmtime(file_path)}

yield {
"file_name": filename,
"file_path": file_path,
"mtime": os.path.getmtime(file_path),
}

@dlt.transformer(primary_key="page_id", write_disposition="merge")
def pdf_to_text(file_item, separate_pages: bool = False):
Expand All @@ -26,7 +28,6 @@ def pdf_to_text(file_item, separate_pages: bool = False):
page_item["page_id"] = file_item["file_name"] + "_" + str(page_no)
yield page_item


pipeline = dlt.pipeline(pipeline_name="pdf_to_text", destination="weaviate")

# this constructs a simple pipeline that: (1) reads files from "invoices" folder (2) filters only those ending with ".pdf"
Expand All @@ -50,4 +51,4 @@ def pdf_to_text(file_item, separate_pages: bool = False):

client = weaviate.Client("http://localhost:8080")
# get text of all the invoices in InvoiceText class we just created above
print(client.query.get("InvoiceText", ["text", "file_name", "mtime", "page_id"]).do())
print(client.query.get("InvoiceText", ["text", "file_name", "mtime", "page_id"]).do())
9 changes: 4 additions & 5 deletions docs/examples/qdrant_zendesk/qdrant.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,12 +10,13 @@

from dlt.common.configuration.inject import with_config


# function from: https://github.com/dlt-hub/verified-sources/tree/master/sources/zendesk
@dlt.source(max_table_nesting=2)
def zendesk_support(
credentials: Dict[str, str] = dlt.secrets.value,
start_date: Optional[TAnyDateTime] = pendulum.datetime(year=2000, month=1, day=1), # noqa: B008
start_date: Optional[TAnyDateTime] = pendulum.datetime( # noqa: B008
year=2000, month=1, day=1
),
end_date: Optional[TAnyDateTime] = None,
):
"""
Expand Down Expand Up @@ -79,15 +80,13 @@ def _parse_date_or_none(value: Optional[str]) -> Optional[pendulum.DateTime]:
return None
return ensure_pendulum_datetime(value)


# modify dates to return datetime objects instead
def _fix_date(ticket):
ticket["updated_at"] = _parse_date_or_none(ticket["updated_at"])
ticket["created_at"] = _parse_date_or_none(ticket["created_at"])
ticket["due_at"] = _parse_date_or_none(ticket["due_at"])
return ticket


# function from: https://github.com/dlt-hub/verified-sources/tree/master/sources/zendesk
def get_pages(
url: str,
Expand Down Expand Up @@ -128,7 +127,6 @@ def get_pages(
if not response_json["end_of_stream"]:
get_url = response_json["next_page"]


if __name__ == "__main__":
# create a pipeline with an appropriate name
pipeline = dlt.pipeline(
Expand All @@ -148,6 +146,7 @@ def get_pages(

print(load_info)


# running the Qdrant client to connect to your Qdrant database

@with_config(sections=("destination", "qdrant", "credentials"))
Expand Down
4 changes: 1 addition & 3 deletions docs/examples/transformers/pokemon.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
import dlt
from dlt.sources.helpers import requests


@dlt.source(max_table_nesting=2)
def source(pokemon_api_url: str):
""""""
Expand Down Expand Up @@ -47,7 +46,6 @@ def species(pokemon_details):

return (pokemon_list | pokemon, pokemon_list | pokemon | species)


if __name__ == "__main__":
# build duck db pipeline
pipeline = dlt.pipeline(
Expand All @@ -56,4 +54,4 @@ def species(pokemon_details):

# the pokemon_list resource does not need to be loaded
load_info = pipeline.run(source("https://pokeapi.co/api/v2/pokemon"))
print(load_info)
print(load_info)
12 changes: 6 additions & 6 deletions docs/website/docs/examples/chess_production/index.md
Original file line number Diff line number Diff line change
Expand Up @@ -110,12 +110,12 @@ def load_data_with_retry(pipeline, data):
load_info = pipeline.run(data)
logger.info(str(load_info))

# raise on failed jobs
load_info.raise_on_failed_jobs()
# send notification
send_slack_message(
pipeline.runtime_config.slack_incoming_hook, "Data was successfully loaded!"
)
# raise on failed jobs
load_info.raise_on_failed_jobs()
# send notification
send_slack_message(
pipeline.runtime_config.slack_incoming_hook, "Data was successfully loaded!"
)
except Exception:
# we get here after all the failed retries
# send notification
Expand Down
8 changes: 4 additions & 4 deletions docs/website/docs/examples/connector_x_arrow/index.md
Original file line number Diff line number Diff line change
Expand Up @@ -56,13 +56,13 @@ def read_sql_x(
def genome_resource():
# create genome resource with merge on `upid` primary key
genome = dlt.resource(
name="genome",
name="acanthochromis_polyacanthus",
write_disposition="merge",
primary_key="upid",
primary_key="analysis_id",
standalone=True,
)(read_sql_x)(
"mysql://[email protected]:4497/Rfam", # type: ignore[arg-type]
"SELECT * FROM genome ORDER BY created LIMIT 1000",
"mysql://[email protected]:3306/acanthochromis_polyacanthus_core_100_1", # type: ignore[arg-type]
"SELECT * FROM analysis LIMIT 20",
)
# add incremental on created at
genome.apply_hints(incremental=dlt.sources.incremental("created"))
Expand Down
13 changes: 3 additions & 10 deletions docs/website/docs/examples/pdf_to_weaviate/index.md
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,6 @@ import dlt
from dlt.destinations.impl.weaviate import weaviate_adapter
from PyPDF2 import PdfReader


@dlt.resource(selected=False)
def list_files(folder_path: str):
folder_path = os.path.abspath(folder_path)
Expand All @@ -37,10 +36,9 @@ def list_files(folder_path: str):
yield {
"file_name": filename,
"file_path": file_path,
"mtime": os.path.getmtime(file_path)
"mtime": os.path.getmtime(file_path),
}


@dlt.transformer(primary_key="page_id", write_disposition="merge")
def pdf_to_text(file_item, separate_pages: bool = False):
if not separate_pages:
Expand All @@ -54,10 +52,7 @@ def pdf_to_text(file_item, separate_pages: bool = False):
page_item["page_id"] = file_item["file_name"] + "_" + str(page_no)
yield page_item

pipeline = dlt.pipeline(
pipeline_name='pdf_to_text',
destination='weaviate'
)
pipeline = dlt.pipeline(pipeline_name="pdf_to_text", destination="weaviate")

# this constructs a simple pipeline that: (1) reads files from "invoices" folder (2) filters only those ending with ".pdf"
# (3) sends them to pdf_to_text transformer with pipe (|) operator
Expand All @@ -70,9 +65,7 @@ pdf_pipeline = list_files("assets/invoices").add_filter(
pdf_pipeline.table_name = "InvoiceText"

# use weaviate_adapter to tell destination to vectorize "text" column
load_info = pipeline.run(
weaviate_adapter(pdf_pipeline, vectorize="text")
)
load_info = pipeline.run(weaviate_adapter(pdf_pipeline, vectorize="text"))
row_counts = pipeline.last_trace.last_normalize_info
print(row_counts)
print("------")
Expand Down
12 changes: 10 additions & 2 deletions docs/website/docs/general-usage/destination.md
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ Above we want to use **filesystem** built-in destination. You can use shorthand
<!--@@@DLT_SNIPPET_START ./snippets/destination-snippets.py::class_type-->
```py
import dlt

pipeline = dlt.pipeline("pipeline", destination="dlt.destinations.filesystem")
```
<!--@@@DLT_SNIPPET_END ./snippets/destination-snippets.py::class_type-->
Expand All @@ -37,6 +38,7 @@ Above we use built in **filesystem** destination by providing a class type `file
```py
import dlt
from dlt.destinations import filesystem

pipeline = dlt.pipeline("pipeline", destination=filesystem)
```
<!--@@@DLT_SNIPPET_END ./snippets/destination-snippets.py::class-->
Expand All @@ -50,6 +52,7 @@ You can instantiate **destination class** yourself to configure it explicitly. W
<!--@@@DLT_SNIPPET_START ./snippets/destination-snippets.py::instance-->
```py
import dlt

azure_bucket = filesystem("az://dlt-azure-bucket", destination_name="production_az_bucket")
pipeline = dlt.pipeline("pipeline", destination=azure_bucket)
```
Expand Down Expand Up @@ -99,7 +102,10 @@ import dlt
from dlt.destinations import postgres

# pass full credentials - together with the password (not recommended)
pipeline = dlt.pipeline("pipeline", destination=postgres(credentials="postgresql://loader:loader@localhost:5432/dlt_data"))
pipeline = dlt.pipeline(
"pipeline",
destination=postgres(credentials="postgresql://loader:loader@localhost:5432/dlt_data"),
)
```
<!--@@@DLT_SNIPPET_END ./snippets/destination-snippets.py::config_explicit-->

Expand All @@ -126,7 +132,9 @@ from dlt.sources.credentials import AzureCredentials
credentials = AzureCredentials()
# fill only the account name, leave key to be taken from secrets
credentials.azure_storage_account_name = "production_storage"
pipeline = dlt.pipeline("pipeline", destination=filesystem("az://dlt-azure-bucket", credentials=credentials))
pipeline = dlt.pipeline(
"pipeline", destination=filesystem("az://dlt-azure-bucket", credentials=credentials)
)
```
<!--@@@DLT_SNIPPET_END ./snippets/destination-snippets.py::config_partial_spec-->

Expand Down
Loading

0 comments on commit d830086

Please sign in to comment.