diff --git a/.github/workflows/test_destination_synapse.yml b/.github/workflows/test_destination_synapse.yml
deleted file mode 100644
index 83800fa789..0000000000
--- a/.github/workflows/test_destination_synapse.yml
+++ /dev/null
@@ -1,96 +0,0 @@
-name: test synapse
-
-on:
-  pull_request:
-    branches:
-      - master
-      - devel
-
-  workflow_dispatch:
-
-env:
-  DLT_SECRETS_TOML: ${{ secrets.DLT_SECRETS_TOML }}
-
-  RUNTIME__SENTRY_DSN: https://cf6086f7d263462088b9fb9f9947caee@o4505514867163136.ingest.sentry.io/4505516212682752
-  RUNTIME__LOG_LEVEL: ERROR
-
-  ACTIVE_DESTINATIONS: "[\"synapse\"]"
-  ALL_FILESYSTEM_DRIVERS: "[\"memory\"]"
-
-jobs:
-
-  build:
-    runs-on: ubuntu-latest
-
-    steps:
-      - name: Check source branch name
-        run: |
-          if [[ "${{ github.head_ref }}" != "synapse" ]]; then
-            exit 1
-          fi
-
-  run_loader:
-    name: Tests Synapse loader
-    strategy:
-      fail-fast: false
-      matrix:
-        os: ["ubuntu-latest"]
-    defaults:
-      run:
-        shell: bash
-    runs-on: ${{ matrix.os }}
-
-    steps:
-
-      - name: Check out
-        uses: actions/checkout@master
-
-      - name: Install ODBC driver for SQL Server
-        run: |
-          sudo ACCEPT_EULA=Y apt-get install --yes msodbcsql18
-
-      - name: Setup Python
-        uses: actions/setup-python@v4
-        with:
-          python-version: "3.10.x"
-
-      - name: Install Poetry
-        uses: snok/install-poetry@v1.3.2
-        with:
-          virtualenvs-create: true
-          virtualenvs-in-project: true
-          installer-parallel: true
-
-      - name: Load cached venv
-        id: cached-poetry-dependencies
-        uses: actions/cache@v3
-        with:
-          path: .venv
-          key: venv-${{ runner.os }}-${{ steps.setup-python.outputs.python-version }}-${{ hashFiles('**/poetry.lock') }}-gcp
-
-      - name: Install dependencies
-        run: poetry install --no-interaction -E synapse -E s3 -E gs -E az --with sentry-sdk --with pipeline
-
-      - name: create secrets.toml
-        run: pwd && echo "$DLT_SECRETS_TOML" > tests/.dlt/secrets.toml
-
-      - run: |
-          poetry run pytest tests/load --ignore tests/load/pipeline/test_dbt_helper.py
-        if: runner.os != 'Windows'
-        name: Run tests Linux/MAC
-      - run: |
-          poetry run pytest tests/load --ignore tests/load/pipeline/test_dbt_helper.py
-        if: runner.os == 'Windows'
-        name: Run tests Windows
-        shell: cmd
-
-  matrix_job_required_check:
-    name: Synapse loader tests
-    needs: run_loader
-    runs-on: ubuntu-latest
-    if: always()
-    steps:
-      - name: Check matrix job results
-        if: contains(needs.*.result, 'failure') || contains(needs.*.result, 'cancelled')
-        run: |
-          echo "One or more matrix job tests failed or were cancelled. You may need to re-run them." && exit 1
diff --git a/docs/examples/google_sheets/google_sheets.py b/docs/examples/google_sheets/google_sheets.py
index 8a93df9970..1ba330e4ca 100644
--- a/docs/examples/google_sheets/google_sheets.py
+++ b/docs/examples/google_sheets/google_sheets.py
@@ -9,6 +9,7 @@
 )
 from dlt.common.typing import DictStrAny, StrAny
 
+
 def _initialize_sheets(
     credentials: Union[GcpOAuthCredentials, GcpServiceAccountCredentials]
 ) -> Any:
@@ -16,6 +17,7 @@ def _initialize_sheets(
     service = build("sheets", "v4", credentials=credentials.to_native_credentials())
     return service
 
+
 @dlt.source
 def google_spreadsheet(
     spreadsheet_id: str,
@@ -55,6 +57,7 @@ def get_sheet(sheet_name: str) -> Iterator[DictStrAny]:
         for name in sheet_names
     ]
 
+
 if __name__ == "__main__":
     pipeline = dlt.pipeline(destination="duckdb")
     # see example.secrets.toml to where to put credentials
@@ -67,4 +70,4 @@ def get_sheet(sheet_name: str) -> Iterator[DictStrAny]:
             sheet_names=range_names,
         )
     )
-    print(info)
\ No newline at end of file
+    print(info)
diff --git a/docs/examples/pdf_to_weaviate/pdf_to_weaviate.py b/docs/examples/pdf_to_weaviate/pdf_to_weaviate.py
index d1ba3537ea..fecd842214 100644
--- a/docs/examples/pdf_to_weaviate/pdf_to_weaviate.py
+++ b/docs/examples/pdf_to_weaviate/pdf_to_weaviate.py
@@ -10,11 +10,7 @@ def list_files(folder_path: str):
     folder_path = os.path.abspath(folder_path)
     for filename in os.listdir(folder_path):
         file_path = os.path.join(folder_path, filename)
-        yield {
-            "file_name": filename,
-            "file_path": file_path,
-            "mtime": os.path.getmtime(file_path)
-        }
+        yield {"file_name": filename, "file_path": file_path, "mtime": os.path.getmtime(file_path)}
 
 
 @dlt.transformer(primary_key="page_id", write_disposition="merge")
@@ -30,10 +26,8 @@ def pdf_to_text(file_item, separate_pages: bool = False):
         page_item["page_id"] = file_item["file_name"] + "_" + str(page_no)
         yield page_item
 
-pipeline = dlt.pipeline(
-    pipeline_name='pdf_to_text',
-    destination='weaviate'
-)
+
+pipeline = dlt.pipeline(pipeline_name="pdf_to_text", destination="weaviate")
 
 # this constructs a simple pipeline that: (1) reads files from "invoices" folder (2) filters only those ending with ".pdf"
 # (3) sends them to pdf_to_text transformer with pipe (|) operator
@@ -46,9 +40,7 @@ def pdf_to_text(file_item, separate_pages: bool = False):
 pdf_pipeline.table_name = "InvoiceText"
 
 # use weaviate_adapter to tell destination to vectorize "text" column
-load_info = pipeline.run(
-    weaviate_adapter(pdf_pipeline, vectorize="text")
-)
+load_info = pipeline.run(weaviate_adapter(pdf_pipeline, vectorize="text"))
 row_counts = pipeline.last_trace.last_normalize_info
 print(row_counts)
 print("------")
@@ -58,4 +50,4 @@ def pdf_to_text(file_item, separate_pages: bool = False):
 client = weaviate.Client("http://localhost:8080")
 
 # get text of all the invoices in InvoiceText class we just created above
-print(client.query.get("InvoiceText", ["text", "file_name", "mtime", "page_id"]).do())
\ No newline at end of file
+print(client.query.get("InvoiceText", ["text", "file_name", "mtime", "page_id"]).do())
diff --git a/docs/website/docs/.dlt/.gitignore b/docs/website/docs/.dlt/.gitignore
new file mode 100644
index 0000000000..da95bc542a
--- /dev/null
+++ b/docs/website/docs/.dlt/.gitignore
@@ -0,0 +1 @@
+/secrets.toml
diff --git a/docs/website/docs/examples/connector_x_arrow/code/load_arrow-snippets.py b/docs/website/docs/examples/connector_x_arrow/code/load_arrow-snippets.py
index db96efab86..1a6b77da1b 100644
--- a/docs/website/docs/examples/connector_x_arrow/code/load_arrow-snippets.py
+++ b/docs/website/docs/examples/connector_x_arrow/code/load_arrow-snippets.py
@@ -20,13 +20,13 @@ def read_sql_x(
     def genome_resource():
         # create genome resource with merge on `upid` primary key
         genome = dlt.resource(
-            name="genome",
+            name="acanthochromis_polyacanthus",
             write_disposition="merge",
-            primary_key="upid",
+            primary_key="analysis_id",
             standalone=True,
         )(read_sql_x)(
-            "mysql://rfamro@mysql-rfam-public.ebi.ac.uk:4497/Rfam",  # type: ignore[arg-type]
-            "SELECT * FROM genome ORDER BY created LIMIT 1000",
+            "mysql://anonymous@ensembldb.ensembl.org:3306/acanthochromis_polyacanthus_core_100_1",  # type: ignore[arg-type]
+            "SELECT * FROM analysis LIMIT 20",
         )
         # add incremental on created at
         genome.apply_hints(incremental=dlt.sources.incremental("created"))
@@ -47,6 +47,6 @@ def genome_resource():
 
     # check that stuff was loaded  # @@@DLT_REMOVE
     row_counts = pipeline.last_trace.last_normalize_info.row_counts  # @@@DLT_REMOVE
-    assert row_counts["genome"] == 1000  # @@@DLT_REMOVE
+    assert row_counts["acanthochromis_polyacanthus"] == 20  # @@@DLT_REMOVE
     # @@@DLT_SNIPPET_END example
 
diff --git a/docs/website/docs/examples/google_sheets/code/google_sheets-snippets.py b/docs/website/docs/examples/google_sheets/code/google_sheets-snippets.py
index 4f32f65370..f56861e9e9 100644
--- a/docs/website/docs/examples/google_sheets/code/google_sheets-snippets.py
+++ b/docs/website/docs/examples/google_sheets/code/google_sheets-snippets.py
@@ -80,8 +80,8 @@ def get_sheet(sheet_name: str) -> Iterator[DictStrAny]:
         )
     )
     print(info)
-    # @@@DLT_SNIPPET_END google_sheets_run
-    # @@@DLT_SNIPPET_END example
+    # @@@DLT_SNIPPET_END google_sheets_run
+    # @@@DLT_SNIPPET_END example
     row_counts = pipeline.last_trace.last_normalize_info.row_counts
     print(row_counts.keys())
     assert row_counts["hidden_columns_merged_cells"] == 7
diff --git a/docs/website/docs/examples/pdf_to_weaviate/code/pdf_to_weaviate-snippets.py b/docs/website/docs/examples/pdf_to_weaviate/code/pdf_to_weaviate-snippets.py
index fddae74ddf..1ad7cc8159 100644
--- a/docs/website/docs/examples/pdf_to_weaviate/code/pdf_to_weaviate-snippets.py
+++ b/docs/website/docs/examples/pdf_to_weaviate/code/pdf_to_weaviate-snippets.py
@@ -1,5 +1,6 @@
 from tests.pipeline.utils import assert_load_info
 
+
 def pdf_to_weaviate_snippet() -> None:
     # @@@DLT_SNIPPET_START example
     # @@@DLT_SNIPPET_START pdf_to_weaviate
@@ -9,7 +10,6 @@ def pdf_to_weaviate_snippet() -> None:
     from dlt.destinations.impl.weaviate import weaviate_adapter
     from PyPDF2 import PdfReader
 
-
     @dlt.resource(selected=False)
     def list_files(folder_path: str):
         folder_path = os.path.abspath(folder_path)
@@ -18,10 +18,9 @@ def list_files(folder_path: str):
             yield {
                 "file_name": filename,
                 "file_path": file_path,
-                "mtime": os.path.getmtime(file_path)
+                "mtime": os.path.getmtime(file_path),
             }
 
-
     @dlt.transformer(primary_key="page_id", write_disposition="merge")
     def pdf_to_text(file_item, separate_pages: bool = False):
         if not separate_pages:
@@ -35,10 +34,7 @@ def pdf_to_text(file_item, separate_pages: bool = False):
             page_item["page_id"] = file_item["file_name"] + "_" + str(page_no)
             yield page_item
 
-    pipeline = dlt.pipeline(
-        pipeline_name='pdf_to_text',
-        destination='weaviate'
-    )
+    pipeline = dlt.pipeline(pipeline_name="pdf_to_text", destination="weaviate")
 
     # this constructs a simple pipeline that: (1) reads files from "invoices" folder (2) filters only those ending with ".pdf"
     # (3) sends them to pdf_to_text transformer with pipe (|) operator
@@ -51,9 +47,7 @@ def pdf_to_text(file_item, separate_pages: bool = False):
     pdf_pipeline.table_name = "InvoiceText"
 
     # use weaviate_adapter to tell destination to vectorize "text" column
-    load_info = pipeline.run(
-        weaviate_adapter(pdf_pipeline, vectorize="text")
-    )
+    load_info = pipeline.run(weaviate_adapter(pdf_pipeline, vectorize="text"))
     row_counts = pipeline.last_trace.last_normalize_info
     print(row_counts)
     print("------")
diff --git a/docs/website/docs/getting-started-snippets.py b/docs/website/docs/getting-started-snippets.py
index eb00df9986..8b7a01e192 100644
--- a/docs/website/docs/getting-started-snippets.py
+++ b/docs/website/docs/getting-started-snippets.py
@@ -99,21 +99,25 @@ def db_snippet() -> None:
     # use any sql database supported by SQLAlchemy, below we use a public mysql instance to get data
     # NOTE: you'll need to install pymysql with "pip install pymysql"
     # NOTE: loading data from public mysql instance may take several seconds
-    engine = create_engine("mysql+pymysql://rfamro@mysql-rfam-public.ebi.ac.uk:4497/Rfam")
+    engine = create_engine(
+        "mysql+pymysql://anonymous@ensembldb.ensembl.org:3306/acanthochromis_polyacanthus_core_100_1"
+    )
     with engine.connect() as conn:
         # select genome table, stream data in batches of 100 elements
         rows = conn.execution_options(yield_per=100).exec_driver_sql(
-            "SELECT * FROM genome LIMIT 1000"
+            "SELECT * FROM analysis LIMIT 1000"
         )
 
         pipeline = dlt.pipeline(
             pipeline_name="from_database",
             destination="duckdb",
-            dataset_name="genome_data",
+            dataset_name="acanthochromis_polyacanthus_data",
         )
 
         # here we convert the rows into dictionaries on the fly with a map function
-        load_info = pipeline.run(map(lambda row: dict(row._mapping), rows), table_name="genome")
+        load_info = pipeline.run(
+            map(lambda row: dict(row._mapping), rows), table_name="acanthochromis_polyacanthus"
+        )
 
     print(load_info)
     # @@@DLT_SNIPPET_END db
diff --git a/docs/website/docs/intro-snippets.py b/docs/website/docs/intro-snippets.py
index 340a6ff262..f1edfb0d9e 100644
--- a/docs/website/docs/intro-snippets.py
+++ b/docs/website/docs/intro-snippets.py
@@ -18,14 +18,13 @@ def intro_snippet() -> None:
         response.raise_for_status()
         data.append(response.json())
     # Extract, normalize, and load the data
-    load_info = pipeline.run(data, table_name='player')
+    load_info = pipeline.run(data, table_name="player")
     # @@@DLT_SNIPPET_END api
     assert_load_info(load_info)
 
 
 def csv_snippet() -> None:
-
     # @@@DLT_SNIPPET_START csv
     import dlt
     import pandas as pd
@@ -50,8 +49,8 @@ def csv_snippet() -> None:
 
     assert_load_info(load_info)
 
-def db_snippet() -> None:
+
+def db_snippet() -> None:
     # @@@DLT_SNIPPET_START db
     import dlt
     from sqlalchemy import create_engine
@@ -60,27 +59,27 @@ def db_snippet() -> None:
     # MySQL instance to get data.
     # NOTE: you'll need to install pymysql with `pip install pymysql`
     # NOTE: loading data from public mysql instance may take several seconds
-    engine = create_engine("mysql+pymysql://rfamro@mysql-rfam-public.ebi.ac.uk:4497/Rfam")
+    engine = create_engine(
+        "mysql+pymysql://anonymous@ensembldb.ensembl.org:3306/acanthochromis_polyacanthus_core_100_1"
+    )
     with engine.connect() as conn:
         # Select genome table, stream data in batches of 100 elements
-        query = "SELECT * FROM genome LIMIT 1000"
+        query = "SELECT * FROM analysis LIMIT 1000"
         rows = conn.execution_options(yield_per=100).exec_driver_sql(query)
 
         pipeline = dlt.pipeline(
             pipeline_name="from_database",
             destination="duckdb",
-            dataset_name="genome_data",
+            dataset_name="acanthochromis_polyacanthus_data",
         )
 
         # Convert the rows into dictionaries on the fly with a map function
         load_info = pipeline.run(
-            map(lambda row: dict(row._mapping), rows),
-            table_name="genome"
+            map(lambda row: dict(row._mapping), rows), table_name="acanthochromis_polyacanthus"
         )
 
     print(load_info)
     # @@@DLT_SNIPPET_END db
 
     assert_load_info(load_info)
-
diff --git a/docs/website/docs/tutorial/load-data-from-an-api-snippets.py b/docs/website/docs/tutorial/load-data-from-an-api-snippets.py
index cd7004bdbe..d53af9e3d9 100644
--- a/docs/website/docs/tutorial/load-data-from-an-api-snippets.py
+++ b/docs/website/docs/tutorial/load-data-from-an-api-snippets.py
@@ -3,7 +3,6 @@
 
 
 def basic_api_snippet() -> None:
-
     # @@@DLT_SNIPPET_START basic_api
     import dlt
     from dlt.sources.helpers import requests
@@ -15,9 +14,9 @@ def basic_api_snippet() -> None:
     response.raise_for_status()
 
     pipeline = dlt.pipeline(
-        pipeline_name='github_issues',
-        destination='duckdb',
-        dataset_name='github_data',
+        pipeline_name="github_issues",
+        destination="duckdb",
+        dataset_name="github_data",
     )
     # The response contains a list of issues
     load_info = pipeline.run(response.json(), table_name="issues")
@@ -29,19 +28,15 @@ def basic_api_snippet() -> None:
 
 
 def replace_snippet() -> None:
-
     # @@@DLT_SNIPPET_START replace
     import dlt
 
-    data = [
-        {'id': 1, 'name': 'Alice'},
-        {'id': 2, 'name': 'Bob'}
-    ]
+    data = [{"id": 1, "name": "Alice"}, {"id": 2, "name": "Bob"}]
 
     pipeline = dlt.pipeline(
-        pipeline_name='replace_data',
-        destination='duckdb',
-        dataset_name='mydata',
+        pipeline_name="replace_data",
+        destination="duckdb",
+        dataset_name="mydata",
     )
 
     load_info = pipeline.run(data, table_name="users", write_disposition="replace")
@@ -52,7 +47,6 @@ def replace_snippet() -> None:
 
 
 def incremental_snippet() -> None:
-
     # @@@DLT_SNIPPET_START incremental
     import dlt
     from dlt.sources.helpers import requests
@@ -86,9 +80,9 @@ def get_issues(
             url = response.links["next"]["url"]
 
     pipeline = dlt.pipeline(
-        pipeline_name='github_issues_incremental',
-        destination='duckdb',
-        dataset_name='github_data_append',
+        pipeline_name="github_issues_incremental",
+        destination="duckdb",
+        dataset_name="github_data_append",
     )
     load_info = pipeline.run(get_issues)
 
@@ -103,7 +97,6 @@ def get_issues(
 
 
 def incremental_merge_snippet() -> None:
-
     # @@@DLT_SNIPPET_START incremental_merge
     import dlt
     from dlt.sources.helpers import requests
@@ -114,15 +107,15 @@ def incremental_merge_snippet() -> None:
         primary_key="id",
     )
     def get_issues(
-        updated_at = dlt.sources.incremental("updated_at", initial_value="1970-01-01T00:00:00Z")
+        updated_at=dlt.sources.incremental("updated_at", initial_value="1970-01-01T00:00:00Z")
    ):
         # NOTE: we read only open issues to minimize number of calls to
         # the API. There's a limit of ~50 calls for not authenticated
         # Github users
         url = (
-            f"https://api.github.com/repos/dlt-hub/dlt/issues"
+            "https://api.github.com/repos/dlt-hub/dlt/issues"
             f"?since={updated_at.last_value}&per_page=100&sort=updated"
-            f"&directions=desc&state=open"
+            "&directions=desc&state=open"
         )
 
         while True:
@@ -136,9 +129,9 @@ def get_issues(
             url = response.links["next"]["url"]
 
     pipeline = dlt.pipeline(
-        pipeline_name='github_issues_merge',
-        destination='duckdb',
-        dataset_name='github_data_merge',
+        pipeline_name="github_issues_merge",
+        destination="duckdb",
+        dataset_name="github_data_merge",
     )
     load_info = pipeline.run(get_issues)
     row_counts = pipeline.last_trace.last_normalize_info
@@ -152,15 +145,12 @@ def get_issues(
 
 
 def table_dispatch_snippet() -> None:
-
     # @@@DLT_SNIPPET_START table_dispatch
     import dlt
     from dlt.sources.helpers import requests
 
     @dlt.resource(primary_key="id", table_name=lambda i: i["type"], write_disposition="append")
-    def repo_events(
-        last_created_at = dlt.sources.incremental("created_at")
-    ):
+    def repo_events(last_created_at=dlt.sources.incremental("created_at")):
         url = "https://api.github.com/repos/dlt-hub/dlt/events?per_page=100"
 
         while True:
@@ -179,9 +169,9 @@ def repo_events(
             url = response.links["next"]["url"]
 
     pipeline = dlt.pipeline(
-        pipeline_name='github_events',
-        destination='duckdb',
-        dataset_name='github_events_data',
+        pipeline_name="github_events",
+        destination="duckdb",
+        dataset_name="github_events_data",
     )
     load_info = pipeline.run(repo_events)
     row_counts = pipeline.last_trace.last_normalize_info
diff --git a/tests/common/storages/utils.py b/tests/common/storages/utils.py
index 91d8c3c77f..3319480c4f 100644
--- a/tests/common/storages/utils.py
+++ b/tests/common/storages/utils.py
@@ -67,8 +67,8 @@ def assert_sample_files(
             assert len(lines) >= 1
             assert isinstance(lines[0], str)
 
-    assert len(all_file_items) == 10
-    assert set([item["file_name"] for item in all_file_items]) == {
+    assert len(all_file_items) >= 10
+    assert set([item["file_name"] for item in all_file_items]) >= {
         "csv/freshman_kgs.csv",
         "csv/freshman_lbs.csv",
         "csv/mlb_players.csv",