diff --git a/docs/examples/chess_production/chess.py b/docs/examples/chess_production/chess.py
index 5b767f0eb6..2e85805781 100644
--- a/docs/examples/chess_production/chess.py
+++ b/docs/examples/chess_production/chess.py
@@ -6,7 +6,6 @@
 from dlt.common.typing import StrAny, TDataItems
 from dlt.sources.helpers.requests import client
 
-
 @dlt.source
 def chess(
     chess_url: str = dlt.config.value,
@@ -60,7 +59,6 @@ def players_games(username: Any) -> Iterator[TDataItems]:
 
 MAX_PLAYERS = 5
 
-
 def load_data_with_retry(pipeline, data):
     try:
         for attempt in Retrying(
@@ -70,16 +68,18 @@ def load_data_with_retry(pipeline, data):
             reraise=True,
         ):
             with attempt:
-                logger.info(f"Running the pipeline, attempt={attempt.retry_state.attempt_number}")
+                logger.info(
+                    f"Running the pipeline, attempt={attempt.retry_state.attempt_number}"
+                )
                 load_info = pipeline.run(data)
                 logger.info(str(load_info))
 
-        # raise on failed jobs
-        load_info.raise_on_failed_jobs()
-        # send notification
-        send_slack_message(
-            pipeline.runtime_config.slack_incoming_hook, "Data was successfully loaded!"
-        )
+                # raise on failed jobs
+                load_info.raise_on_failed_jobs()
+                # send notification
+                send_slack_message(
+                    pipeline.runtime_config.slack_incoming_hook, "Data was successfully loaded!"
+                )
     except Exception:
         # we get here after all the failed retries
         # send notification
@@ -92,7 +92,9 @@ def load_data_with_retry(pipeline, data):
     # print the information on the first load package and all jobs inside
     logger.info(f"First load package info: {load_info.load_packages[0]}")
     # print the information on the first completed job in first load package
-    logger.info(f"First completed job info: {load_info.load_packages[0].jobs['completed_jobs'][0]}")
+    logger.info(
+        f"First completed job info: {load_info.load_packages[0].jobs['completed_jobs'][0]}"
+    )
 
     # check for schema updates:
     schema_updates = [p.schema_update for p in load_info.load_packages]
@@ -150,4 +152,4 @@ def load_data_with_retry(pipeline, data):
     )
     # get data for a few famous players
     data = chess(chess_url="https://api.chess.com/pub/", max_players=MAX_PLAYERS)
-    load_data_with_retry(pipeline, data)
+    load_data_with_retry(pipeline, data)
\ No newline at end of file
diff --git a/docs/examples/connector_x_arrow/load_arrow.py b/docs/examples/connector_x_arrow/load_arrow.py
index b3c654cef9..24ba2acb0e 100644
--- a/docs/examples/connector_x_arrow/load_arrow.py
+++ b/docs/examples/connector_x_arrow/load_arrow.py
@@ -3,7 +3,6 @@
 import dlt
 from dlt.sources.credentials import ConnectionStringCredentials
 
-
 def read_sql_x(
     conn_str: ConnectionStringCredentials = dlt.secrets.value,
     query: str = dlt.config.value,
@@ -15,17 +14,16 @@ def read_sql_x(
         protocol="binary",
     )
 
-
 def genome_resource():
     # create genome resource with merge on `upid` primary key
     genome = dlt.resource(
-        name="genome",
+        name="acanthochromis_polyacanthus",
         write_disposition="merge",
-        primary_key="upid",
+        primary_key="analysis_id",
         standalone=True,
     )(read_sql_x)(
-        "mysql://rfamro@mysql-rfam-public.ebi.ac.uk:4497/Rfam",  # type: ignore[arg-type]
-        "SELECT * FROM genome ORDER BY created LIMIT 1000",
+        "mysql://anonymous@ensembldb.ensembl.org:3306/acanthochromis_polyacanthus_core_100_1",  # type: ignore[arg-type]
+        "SELECT * FROM analysis LIMIT 20",
     )
     # add incremental on created at
     genome.apply_hints(incremental=dlt.sources.incremental("created"))
diff --git a/docs/examples/google_sheets/google_sheets.py b/docs/examples/google_sheets/google_sheets.py
index 1ba330e4ca..8a93df9970 100644
--- a/docs/examples/google_sheets/google_sheets.py
+++ b/docs/examples/google_sheets/google_sheets.py
@@ -9,7 +9,6 @@
 )
 from dlt.common.typing import DictStrAny, StrAny
 
-
 def _initialize_sheets(
     credentials: Union[GcpOAuthCredentials, GcpServiceAccountCredentials]
 ) -> Any:
@@ -17,7 +16,6 @@ def _initialize_sheets(
     service = build("sheets", "v4", credentials=credentials.to_native_credentials())
     return service
 
-
 @dlt.source
 def google_spreadsheet(
     spreadsheet_id: str,
@@ -57,7 +55,6 @@ def get_sheet(sheet_name: str) -> Iterator[DictStrAny]:
         for name in sheet_names
     ]
 
-
 if __name__ == "__main__":
     pipeline = dlt.pipeline(destination="duckdb")
     # see example.secrets.toml to where to put credentials
@@ -70,4 +67,4 @@ def get_sheet(sheet_name: str) -> Iterator[DictStrAny]:
             sheet_names=range_names,
         )
     )
-    print(info)
+    print(info)
\ No newline at end of file
diff --git a/docs/examples/incremental_loading/zendesk.py b/docs/examples/incremental_loading/zendesk.py
index 6113f98793..4b8597886a 100644
--- a/docs/examples/incremental_loading/zendesk.py
+++ b/docs/examples/incremental_loading/zendesk.py
@@ -6,11 +6,12 @@
 from dlt.common.typing import TAnyDateTime
 from dlt.sources.helpers.requests import client
 
-
 @dlt.source(max_table_nesting=2)
 def zendesk_support(
     credentials: Dict[str, str] = dlt.secrets.value,
-    start_date: Optional[TAnyDateTime] = pendulum.datetime(year=2000, month=1, day=1),  # noqa: B008
+    start_date: Optional[TAnyDateTime] = pendulum.datetime(  # noqa: B008
+        year=2000, month=1, day=1
+    ),
     end_date: Optional[TAnyDateTime] = None,
 ):
     """
@@ -112,7 +113,6 @@ def get_pages(
         if not response_json["end_of_stream"]:
             get_url = response_json["next_page"]
 
-
 if __name__ == "__main__":
     # create dlt pipeline
     pipeline = dlt.pipeline(
@@ -120,4 +120,4 @@ def get_pages(
     )
 
     load_info = pipeline.run(zendesk_support())
-    print(load_info)
+    print(load_info)
\ No newline at end of file
diff --git a/docs/examples/nested_data/nested_data.py b/docs/examples/nested_data/nested_data.py
index 7f85f0522e..3464448de6 100644
--- a/docs/examples/nested_data/nested_data.py
+++ b/docs/examples/nested_data/nested_data.py
@@ -13,7 +13,6 @@
 
 CHUNK_SIZE = 10000
 
-
 # You can limit how deep dlt goes when generating child tables.
 # By default, the library will descend and generate child tables
 # for all nested lists, without a limit.
@@ -82,7 +81,6 @@ def load_documents(self) -> Iterator[TDataItem]:
         while docs_slice := list(islice(cursor, CHUNK_SIZE)):
             yield map_nested_in_place(convert_mongo_objs, docs_slice)
 
-
 def convert_mongo_objs(value: Any) -> Any:
     if isinstance(value, (ObjectId, Decimal128)):
         return str(value)
diff --git a/docs/examples/pdf_to_weaviate/pdf_to_weaviate.py b/docs/examples/pdf_to_weaviate/pdf_to_weaviate.py
index fecd842214..8f7833e7d7 100644
--- a/docs/examples/pdf_to_weaviate/pdf_to_weaviate.py
+++ b/docs/examples/pdf_to_weaviate/pdf_to_weaviate.py
@@ -4,14 +4,16 @@
 from dlt.destinations.impl.weaviate import weaviate_adapter
 from PyPDF2 import PdfReader
 
-
 @dlt.resource(selected=False)
 def list_files(folder_path: str):
     folder_path = os.path.abspath(folder_path)
     for filename in os.listdir(folder_path):
         file_path = os.path.join(folder_path, filename)
-        yield {"file_name": filename, "file_path": file_path, "mtime": os.path.getmtime(file_path)}
-
+        yield {
+            "file_name": filename,
+            "file_path": file_path,
+            "mtime": os.path.getmtime(file_path),
+        }
 
 @dlt.transformer(primary_key="page_id", write_disposition="merge")
 def pdf_to_text(file_item, separate_pages: bool = False):
@@ -26,7 +28,6 @@ def pdf_to_text(file_item, separate_pages: bool = False):
         page_item["page_id"] = file_item["file_name"] + "_" + str(page_no)
         yield page_item
 
-
 pipeline = dlt.pipeline(pipeline_name="pdf_to_text", destination="weaviate")
 
 # this constructs a simple pipeline that: (1) reads files from "invoices" folder (2) filters only those ending with ".pdf"
@@ -50,4 +51,4 @@ def pdf_to_text(file_item, separate_pages: bool = False):
 client = weaviate.Client("http://localhost:8080")
 
 # get text of all the invoices in InvoiceText class we just created above
-print(client.query.get("InvoiceText", ["text", "file_name", "mtime", "page_id"]).do())
+print(client.query.get("InvoiceText", ["text", "file_name", "mtime", "page_id"]).do())
\ No newline at end of file
diff --git a/docs/examples/qdrant_zendesk/qdrant.py b/docs/examples/qdrant_zendesk/qdrant.py
index bd0cbafc99..300d8dc6ad 100644
--- a/docs/examples/qdrant_zendesk/qdrant.py
+++ b/docs/examples/qdrant_zendesk/qdrant.py
@@ -10,12 +10,13 @@
 
 from dlt.common.configuration.inject import with_config
 
-
 # function from: https://github.com/dlt-hub/verified-sources/tree/master/sources/zendesk
 @dlt.source(max_table_nesting=2)
 def zendesk_support(
     credentials: Dict[str, str] = dlt.secrets.value,
-    start_date: Optional[TAnyDateTime] = pendulum.datetime(year=2000, month=1, day=1),  # noqa: B008
+    start_date: Optional[TAnyDateTime] = pendulum.datetime(  # noqa: B008
+        year=2000, month=1, day=1
+    ),
     end_date: Optional[TAnyDateTime] = None,
 ):
     """
@@ -79,7 +80,6 @@ def _parse_date_or_none(value: Optional[str]) -> Optional[pendulum.DateTime]:
         return None
     return ensure_pendulum_datetime(value)
 
-
 # modify dates to return datetime objects instead
 def _fix_date(ticket):
     ticket["updated_at"] = _parse_date_or_none(ticket["updated_at"])
@@ -87,7 +87,6 @@ def _fix_date(ticket):
     ticket["due_at"] = _parse_date_or_none(ticket["due_at"])
     return ticket
 
-
 # function from: https://github.com/dlt-hub/verified-sources/tree/master/sources/zendesk
 def get_pages(
     url: str,
@@ -128,7 +127,6 @@ def get_pages(
         if not response_json["end_of_stream"]:
             get_url = response_json["next_page"]
 
-
 if __name__ == "__main__":
     # create a pipeline with an appropriate name
     pipeline = dlt.pipeline(
@@ -148,6 +146,7 @@ def get_pages(
 
 
     print(load_info)
 
+
 # running the Qdrant client to connect to your Qdrant database
 @with_config(sections=("destination", "qdrant", "credentials"))
diff --git a/docs/examples/transformers/pokemon.py b/docs/examples/transformers/pokemon.py
index 97b9a98b11..c17beff6a8 100644
--- a/docs/examples/transformers/pokemon.py
+++ b/docs/examples/transformers/pokemon.py
@@ -1,7 +1,6 @@
 import dlt
 from dlt.sources.helpers import requests
 
-
 @dlt.source(max_table_nesting=2)
 def source(pokemon_api_url: str):
     """"""
@@ -47,7 +46,6 @@ def species(pokemon_details):
 
     return (pokemon_list | pokemon, pokemon_list | pokemon | species)
 
-
 if __name__ == "__main__":
     # build duck db pipeline
     pipeline = dlt.pipeline(
@@ -56,4 +54,4 @@ def species(pokemon_details):
 
     # the pokemon_list resource does not need to be loaded
     load_info = pipeline.run(source("https://pokeapi.co/api/v2/pokemon"))
-    print(load_info)
+    print(load_info)
\ No newline at end of file
diff --git a/docs/website/docs/examples/chess_production/index.md b/docs/website/docs/examples/chess_production/index.md
index b812e47ef8..f372d26d80 100644
--- a/docs/website/docs/examples/chess_production/index.md
+++ b/docs/website/docs/examples/chess_production/index.md
@@ -110,12 +110,12 @@ def load_data_with_retry(pipeline, data):
                 load_info = pipeline.run(data)
                 logger.info(str(load_info))
 
-        # raise on failed jobs
-        load_info.raise_on_failed_jobs()
-        # send notification
-        send_slack_message(
-            pipeline.runtime_config.slack_incoming_hook, "Data was successfully loaded!"
-        )
+                # raise on failed jobs
+                load_info.raise_on_failed_jobs()
+                # send notification
+                send_slack_message(
+                    pipeline.runtime_config.slack_incoming_hook, "Data was successfully loaded!"
+                )
     except Exception:
         # we get here after all the failed retries
         # send notification
diff --git a/docs/website/docs/examples/connector_x_arrow/index.md b/docs/website/docs/examples/connector_x_arrow/index.md
index 6702b8bbef..92941e1988 100644
--- a/docs/website/docs/examples/connector_x_arrow/index.md
+++ b/docs/website/docs/examples/connector_x_arrow/index.md
@@ -56,13 +56,13 @@ def read_sql_x(
 def genome_resource():
     # create genome resource with merge on `upid` primary key
     genome = dlt.resource(
-        name="genome",
+        name="acanthochromis_polyacanthus",
         write_disposition="merge",
-        primary_key="upid",
+        primary_key="analysis_id",
         standalone=True,
     )(read_sql_x)(
-        "mysql://rfamro@mysql-rfam-public.ebi.ac.uk:4497/Rfam",  # type: ignore[arg-type]
-        "SELECT * FROM genome ORDER BY created LIMIT 1000",
+        "mysql://anonymous@ensembldb.ensembl.org:3306/acanthochromis_polyacanthus_core_100_1",  # type: ignore[arg-type]
+        "SELECT * FROM analysis LIMIT 20",
     )
     # add incremental on created at
     genome.apply_hints(incremental=dlt.sources.incremental("created"))
diff --git a/docs/website/docs/examples/pdf_to_weaviate/index.md b/docs/website/docs/examples/pdf_to_weaviate/index.md
index c67f1f9253..cc2ef01e33 100644
--- a/docs/website/docs/examples/pdf_to_weaviate/index.md
+++ b/docs/website/docs/examples/pdf_to_weaviate/index.md
@@ -28,7 +28,6 @@ import dlt
 from dlt.destinations.impl.weaviate import weaviate_adapter
 from PyPDF2 import PdfReader
 
-
 @dlt.resource(selected=False)
 def list_files(folder_path: str):
     folder_path = os.path.abspath(folder_path)
@@ -37,10 +36,9 @@ def list_files(folder_path: str):
         yield {
             "file_name": filename,
             "file_path": file_path,
-            "mtime": os.path.getmtime(file_path)
+            "mtime": os.path.getmtime(file_path),
         }
 
-
 @dlt.transformer(primary_key="page_id", write_disposition="merge")
 def pdf_to_text(file_item, separate_pages: bool = False):
     if not separate_pages:
@@ -54,10 +52,7 @@ def pdf_to_text(file_item, separate_pages: bool = False):
         page_item["page_id"] = file_item["file_name"] + "_" + str(page_no)
         yield page_item
 
-pipeline = dlt.pipeline(
-    pipeline_name='pdf_to_text',
-    destination='weaviate'
-)
+pipeline = dlt.pipeline(pipeline_name="pdf_to_text", destination="weaviate")
 
 # this constructs a simple pipeline that: (1) reads files from "invoices" folder (2) filters only those ending with ".pdf"
 # (3) sends them to pdf_to_text transformer with pipe (|) operator
@@ -70,9 +65,7 @@ pdf_pipeline = list_files("assets/invoices").add_filter(
 pdf_pipeline.table_name = "InvoiceText"
 
 # use weaviate_adapter to tell destination to vectorize "text" column
-load_info = pipeline.run(
-    weaviate_adapter(pdf_pipeline, vectorize="text")
-)
+load_info = pipeline.run(weaviate_adapter(pdf_pipeline, vectorize="text"))
 row_counts = pipeline.last_trace.last_normalize_info
 print(row_counts)
 print("------")
diff --git a/docs/website/docs/general-usage/destination.md b/docs/website/docs/general-usage/destination.md
index be3f8d8296..c20aa62d16 100644
--- a/docs/website/docs/general-usage/destination.md
+++ b/docs/website/docs/general-usage/destination.md
@@ -27,6 +27,7 @@ Above we want to use **filesystem** built-in destination. You can use shorthand
 
 ```py
 import dlt
+
 pipeline = dlt.pipeline("pipeline", destination="dlt.destinations.filesystem")
 ```
 
@@ -37,6 +38,7 @@ Above we use built in **filesystem** destination by providing a class type `file
 ```py
 import dlt
 from dlt.destinations import filesystem
+
 pipeline = dlt.pipeline("pipeline", destination=filesystem)
 ```
 
@@ -50,6 +52,7 @@ You can instantiate **destination class** yourself to configure it explicitly. W
 
 ```py
 import dlt
+
 azure_bucket = filesystem("az://dlt-azure-bucket", destination_name="production_az_bucket")
 pipeline = dlt.pipeline("pipeline", destination=azure_bucket)
 ```
@@ -99,7 +102,10 @@ import dlt
 from dlt.destinations import postgres
 
 # pass full credentials - together with the password (not recommended)
-pipeline = dlt.pipeline("pipeline", destination=postgres(credentials="postgresql://loader:loader@localhost:5432/dlt_data"))
+pipeline = dlt.pipeline(
+    "pipeline",
+    destination=postgres(credentials="postgresql://loader:loader@localhost:5432/dlt_data"),
+)
 ```
 
 
@@ -126,7 +132,9 @@ from dlt.sources.credentials import AzureCredentials
 credentials = AzureCredentials()
 # fill only the account name, leave key to be taken from secrets
 credentials.azure_storage_account_name = "production_storage"
-pipeline = dlt.pipeline("pipeline", destination=filesystem("az://dlt-azure-bucket", credentials=credentials))
+pipeline = dlt.pipeline(
+    "pipeline", destination=filesystem("az://dlt-azure-bucket", credentials=credentials)
+)
 ```
 
 
diff --git a/docs/website/docs/intro.md b/docs/website/docs/intro.md
index 5662d6302d..ba00e593a5 100644
--- a/docs/website/docs/intro.md
+++ b/docs/website/docs/intro.md
@@ -49,7 +49,7 @@ for player in ["magnuscarlsen", "rpragchess"]:
     response.raise_for_status()
     data.append(response.json())
 # Extract, normalize, and load the data
-load_info = pipeline.run(data, table_name='player')
+load_info = pipeline.run(data, table_name="player")
 ```
 
 
@@ -143,23 +143,24 @@ from sqlalchemy import create_engine
 # MySQL instance to get data.
 # NOTE: you'll need to install pymysql with `pip install pymysql`
 # NOTE: loading data from public mysql instance may take several seconds
-engine = create_engine("mysql+pymysql://rfamro@mysql-rfam-public.ebi.ac.uk:4497/Rfam")
+engine = create_engine(
+    "mysql+pymysql://anonymous@ensembldb.ensembl.org:3306/acanthochromis_polyacanthus_core_100_1"
+)
 
 with engine.connect() as conn:
     # Select genome table, stream data in batches of 100 elements
-    query = "SELECT * FROM genome LIMIT 1000"
+    query = "SELECT * FROM analysis LIMIT 1000"
     rows = conn.execution_options(yield_per=100).exec_driver_sql(query)
 
     pipeline = dlt.pipeline(
         pipeline_name="from_database",
         destination="duckdb",
-        dataset_name="genome_data",
+        dataset_name="acanthochromis_polyacanthus_data",
     )
 
     # Convert the rows into dictionaries on the fly with a map function
     load_info = pipeline.run(
-        map(lambda row: dict(row._mapping), rows),
-        table_name="genome"
+        map(lambda row: dict(row._mapping), rows), table_name="acanthochromis_polyacanthus"
     )
 
 print(load_info)
diff --git a/docs/website/docs/tutorial/load-data-from-an-api.md b/docs/website/docs/tutorial/load-data-from-an-api.md
index 4452d22d59..0491080156 100644
--- a/docs/website/docs/tutorial/load-data-from-an-api.md
+++ b/docs/website/docs/tutorial/load-data-from-an-api.md
@@ -32,9 +32,9 @@ response = requests.get(url)
 response.raise_for_status()
 
 pipeline = dlt.pipeline(
-    pipeline_name='github_issues',
-    destination='duckdb',
-    dataset_name='github_data',
+    pipeline_name="github_issues",
+    destination="duckdb",
+    dataset_name="github_data",
 )
 # The response contains a list of issues
 load_info = pipeline.run(response.json(), table_name="issues")
@@ -152,9 +152,9 @@ def get_issues(
         url = response.links["next"]["url"]
 
 pipeline = dlt.pipeline(
-    pipeline_name='github_issues_incremental',
-    destination='duckdb',
-    dataset_name='github_data_append',
+    pipeline_name="github_issues_incremental",
+    destination="duckdb",
+    dataset_name="github_data_append",
 )
 
 load_info = pipeline.run(get_issues)
@@ -212,15 +212,15 @@ from dlt.sources.helpers import requests
     primary_key="id",
 )
 def get_issues(
-    updated_at = dlt.sources.incremental("updated_at", initial_value="1970-01-01T00:00:00Z")
+    updated_at=dlt.sources.incremental("updated_at", initial_value="1970-01-01T00:00:00Z")
 ):
     # NOTE: we read only open issues to minimize number of calls to
     # the API. There's a limit of ~50 calls for not authenticated
     # Github users
     url = (
-        f"https://api.github.com/repos/dlt-hub/dlt/issues"
+        "https://api.github.com/repos/dlt-hub/dlt/issues"
         f"?since={updated_at.last_value}&per_page=100&sort=updated"
-        f"&directions=desc&state=open"
+        "&directions=desc&state=open"
     )
 
     while True:
@@ -234,9 +234,9 @@ def get_issues(
         url = response.links["next"]["url"]
 
 pipeline = dlt.pipeline(
-    pipeline_name='github_issues_merge',
-    destination='duckdb',
-    dataset_name='github_data_merge',
+    pipeline_name="github_issues_merge",
+    destination="duckdb",
+    dataset_name="github_data_merge",
 )
 load_info = pipeline.run(get_issues)
 row_counts = pipeline.last_trace.last_normalize_info
diff --git a/docs/website/package-lock.json b/docs/website/package-lock.json
index 577b362611..c45374f83b 100644
--- a/docs/website/package-lock.json
+++ b/docs/website/package-lock.json
@@ -25,7 +25,6 @@
       },
       "devDependencies": {
         "@docusaurus/module-type-aliases": "2.4.1"
-
       },
       "engines": {
         "node": ">=16.14"
@@ -5714,7 +5713,6 @@
       "version": "16.3.1",
       "resolved": "https://registry.npmjs.org/dotenv/-/dotenv-16.3.1.tgz",
      "integrity": "sha512-IPzF4w4/Rd94bA9imS68tZBaYyBWSCE47V1RGuMrB94iyTOIEwRmVL2x/4An+6mETpLrKJ5hQkB8W4kFAadeIQ==",
-      "dev": true,
       "engines": {
         "node": ">=12"
       },
@@ -11526,8 +11524,7 @@
     "node_modules/toml": {
       "version": "3.0.0",
       "resolved": "https://registry.npmjs.org/toml/-/toml-3.0.0.tgz",
-      "integrity": "sha512-y/mWCZinnvxjTKYhJ+pYxwD0mRLVvOtdS2Awbgxln6iEnt4rk0yBxeSBHkGJcPucRiG0e55mwWp+g/05rsrd6w==",
-      "dev": true
+      "integrity": "sha512-y/mWCZinnvxjTKYhJ+pYxwD0mRLVvOtdS2Awbgxln6iEnt4rk0yBxeSBHkGJcPucRiG0e55mwWp+g/05rsrd6w=="
     },
     "node_modules/totalist": {
       "version": "3.0.1",