fix tests #903

Merged (6 commits, Jan 22, 2024)
96 changes: 0 additions & 96 deletions .github/workflows/test_destination_synapse.yml

This file was deleted.

5 changes: 4 additions & 1 deletion docs/examples/google_sheets/google_sheets.py
@@ -9,13 +9,15 @@
)
from dlt.common.typing import DictStrAny, StrAny


def _initialize_sheets(
credentials: Union[GcpOAuthCredentials, GcpServiceAccountCredentials]
) -> Any:
# Build the service object.
service = build("sheets", "v4", credentials=credentials.to_native_credentials())
return service


@dlt.source
def google_spreadsheet(
spreadsheet_id: str,
@@ -55,6 +57,7 @@ def get_sheet(sheet_name: str) -> Iterator[DictStrAny]:
for name in sheet_names
]


if __name__ == "__main__":
pipeline = dlt.pipeline(destination="duckdb")
# see example.secrets.toml to where to put credentials
@@ -67,4 +70,4 @@ def get_sheet(sheet_name: str) -> Iterator[DictStrAny]:
sheet_names=range_names,
)
)
print(info)
print(info)
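
For context on how the service object returned by `_initialize_sheets` is consumed, here is a minimal sketch of reading one named range through the Google Sheets v4 API. The function name, spreadsheet ID, and range name are placeholders for illustration, not values taken from this example:

```python
from typing import Any


def read_range(service: Any, spreadsheet_id: str, range_name: str) -> list:
    # values().get() returns a dict whose "values" key holds rows as lists of cell values
    result = (
        service.spreadsheets()
        .values()
        .get(spreadsheetId=spreadsheet_id, range=range_name)
        .execute()
    )
    return result.get("values", [])
```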
18 changes: 5 additions & 13 deletions docs/examples/pdf_to_weaviate/pdf_to_weaviate.py
@@ -10,11 +10,7 @@ def list_files(folder_path: str):
folder_path = os.path.abspath(folder_path)
for filename in os.listdir(folder_path):
file_path = os.path.join(folder_path, filename)
yield {
"file_name": filename,
"file_path": file_path,
"mtime": os.path.getmtime(file_path)
}
yield {"file_name": filename, "file_path": file_path, "mtime": os.path.getmtime(file_path)}


@dlt.transformer(primary_key="page_id", write_disposition="merge")
@@ -30,10 +26,8 @@ def pdf_to_text(file_item, separate_pages: bool = False):
page_item["page_id"] = file_item["file_name"] + "_" + str(page_no)
yield page_item

pipeline = dlt.pipeline(
pipeline_name='pdf_to_text',
destination='weaviate'
)

pipeline = dlt.pipeline(pipeline_name="pdf_to_text", destination="weaviate")

# this constructs a simple pipeline that: (1) reads files from "invoices" folder (2) filters only those ending with ".pdf"
# (3) sends them to pdf_to_text transformer with pipe (|) operator
@@ -46,9 +40,7 @@ def pdf_to_text(file_item, separate_pages: bool = False):
pdf_pipeline.table_name = "InvoiceText"

# use weaviate_adapter to tell destination to vectorize "text" column
load_info = pipeline.run(
weaviate_adapter(pdf_pipeline, vectorize="text")
)
load_info = pipeline.run(weaviate_adapter(pdf_pipeline, vectorize="text"))
row_counts = pipeline.last_trace.last_normalize_info
print(row_counts)
print("------")
@@ -58,4 +50,4 @@ def pdf_to_text(file_item, separate_pages: bool = False):

client = weaviate.Client("http://localhost:8080")
# get text of all the invoices in InvoiceText class we just created above
print(client.query.get("InvoiceText", ["text", "file_name", "mtime", "page_id"]).do())
print(client.query.get("InvoiceText", ["text", "file_name", "mtime", "page_id"]).do())
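
The hunks above use `pdf_pipeline` without showing how it is composed, since that part of the file is collapsed. A rough sketch of the composition they rely on, assuming the `assets/invoices` folder name and the `.pdf` filter from the published example (both elided here):

```python
# list_files is declared with selected=False, so it only feeds the transformer
# through dlt's pipe (|) operator and is not loaded as a table on its own
pdf_pipeline = list_files("assets/invoices").add_filter(
    lambda item: item["file_name"].endswith(".pdf")
) | pdf_to_text(separate_pages=True)

# pages end up in a Weaviate class named after the table
pdf_pipeline.table_name = "InvoiceText"
```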
1 change: 1 addition & 0 deletions docs/website/docs/.dlt/.gitignore
@@ -0,0 +1 @@
/secrets.toml
@@ -20,13 +20,13 @@ def read_sql_x(
def genome_resource():
# create genome resource with merge on `upid` primary key
genome = dlt.resource(
name="genome",
name="acanthochromis_polyacanthus",
write_disposition="merge",
primary_key="upid",
primary_key="analysis_id",
standalone=True,
)(read_sql_x)(
"mysql://[email protected]:4497/Rfam", # type: ignore[arg-type]
"SELECT * FROM genome ORDER BY created LIMIT 1000",
"mysql://[email protected]:3306/acanthochromis_polyacanthus_core_100_1", # type: ignore[arg-type]
"SELECT * FROM analysis LIMIT 20",
)
# add incremental on created at
genome.apply_hints(incremental=dlt.sources.incremental("created"))
@@ -47,6 +47,6 @@ def genome_resource():

# check that stuff was loaded # @@@DLT_REMOVE
row_counts = pipeline.last_trace.last_normalize_info.row_counts # @@@DLT_REMOVE
assert row_counts["genome"] == 1000 # @@@DLT_REMOVE
assert row_counts["acanthochromis_polyacanthus"] == 20 # @@@DLT_REMOVE

# @@@DLT_SNIPPET_END example
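
The `read_sql_x` helper wrapped by `dlt.resource` above is cut off by the hunk boundary. A minimal sketch of what such a helper looks like with connectorx, assuming an Arrow return type as in the published example; the exact signature in the file may differ:

```python
import connectorx as cx


def read_sql_x(conn_str: str, query: str):
    # connectorx reads the full query result as one Arrow table,
    # which dlt can normalize and load directly
    yield cx.read_sql(conn_str, query, return_type="arrow")
```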
@@ -80,8 +80,8 @@ def get_sheet(sheet_name: str) -> Iterator[DictStrAny]:
)
)
print(info)
# @@@DLT_SNIPPET_END google_sheets_run
# @@@DLT_SNIPPET_END example
# @@@DLT_SNIPPET_END google_sheets_run
# @@@DLT_SNIPPET_END example
row_counts = pipeline.last_trace.last_normalize_info.row_counts
print(row_counts.keys())
assert row_counts["hidden_columns_merged_cells"] == 7
@@ -1,5 +1,6 @@
from tests.pipeline.utils import assert_load_info


def pdf_to_weaviate_snippet() -> None:
# @@@DLT_SNIPPET_START example
# @@@DLT_SNIPPET_START pdf_to_weaviate
@@ -9,7 +10,6 @@ def pdf_to_weaviate_snippet() -> None:
from dlt.destinations.impl.weaviate import weaviate_adapter
from PyPDF2 import PdfReader


@dlt.resource(selected=False)
def list_files(folder_path: str):
folder_path = os.path.abspath(folder_path)
@@ -18,10 +18,9 @@ def list_files(folder_path: str):
yield {
"file_name": filename,
"file_path": file_path,
"mtime": os.path.getmtime(file_path)
"mtime": os.path.getmtime(file_path),
}


@dlt.transformer(primary_key="page_id", write_disposition="merge")
def pdf_to_text(file_item, separate_pages: bool = False):
if not separate_pages:
@@ -35,10 +34,7 @@ def pdf_to_text(file_item, separate_pages: bool = False):
page_item["page_id"] = file_item["file_name"] + "_" + str(page_no)
yield page_item

pipeline = dlt.pipeline(
pipeline_name='pdf_to_text',
destination='weaviate'
)
pipeline = dlt.pipeline(pipeline_name="pdf_to_text", destination="weaviate")

# this constructs a simple pipeline that: (1) reads files from "invoices" folder (2) filters only those ending with ".pdf"
# (3) sends them to pdf_to_text transformer with pipe (|) operator
@@ -51,9 +47,7 @@ def pdf_to_text(file_item, separate_pages: bool = False):
pdf_pipeline.table_name = "InvoiceText"

# use weaviate_adapter to tell destination to vectorize "text" column
load_info = pipeline.run(
weaviate_adapter(pdf_pipeline, vectorize="text")
)
load_info = pipeline.run(weaviate_adapter(pdf_pipeline, vectorize="text"))
row_counts = pipeline.last_trace.last_normalize_info
print(row_counts)
print("------")
12 changes: 8 additions & 4 deletions docs/website/docs/getting-started-snippets.py
@@ -99,21 +99,25 @@ def db_snippet() -> None:
# use any sql database supported by SQLAlchemy, below we use a public mysql instance to get data
# NOTE: you'll need to install pymysql with "pip install pymysql"
# NOTE: loading data from public mysql instance may take several seconds
engine = create_engine("mysql+pymysql://[email protected]:4497/Rfam")
engine = create_engine(
"mysql+pymysql://[email protected]:3306/acanthochromis_polyacanthus_core_100_1"
)
with engine.connect() as conn:
# select genome table, stream data in batches of 100 elements
rows = conn.execution_options(yield_per=100).exec_driver_sql(
"SELECT * FROM genome LIMIT 1000"
"SELECT * FROM analysis LIMIT 1000"
)

pipeline = dlt.pipeline(
pipeline_name="from_database",
destination="duckdb",
dataset_name="genome_data",
dataset_name="acanthochromis_polyacanthus_data",
)

# here we convert the rows into dictionaries on the fly with a map function
load_info = pipeline.run(map(lambda row: dict(row._mapping), rows), table_name="genome")
load_info = pipeline.run(
map(lambda row: dict(row._mapping), rows), table_name="acanthochromis_polyacanthus"
)

print(load_info)
# @@@DLT_SNIPPET_END db
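
After a run like the one in this hunk, a quick way to sanity-check what landed in DuckDB is dlt's SQL client. A small sketch, assuming the table name used in this PR; `execute_sql` returns plain tuples:

```python
# sql_client() opens a connection scoped to the pipeline's dataset
with pipeline.sql_client() as client:
    rows = client.execute_sql("SELECT COUNT(*) FROM acanthochromis_polyacanthus")
    print(rows)  # e.g. [(1000,)] if the full LIMIT was loaded
```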
17 changes: 8 additions & 9 deletions docs/website/docs/intro-snippets.py
@@ -18,14 +18,13 @@ def intro_snippet() -> None:
response.raise_for_status()
data.append(response.json())
# Extract, normalize, and load the data
load_info = pipeline.run(data, table_name='player')
load_info = pipeline.run(data, table_name="player")
# @@@DLT_SNIPPET_END api

assert_load_info(load_info)


def csv_snippet() -> None:

# @@@DLT_SNIPPET_START csv
import dlt
import pandas as pd
@@ -50,8 +49,8 @@ def csv_snippet() -> None:

assert_load_info(load_info)

def db_snippet() -> None:

def db_snippet() -> None:
# @@@DLT_SNIPPET_START db
import dlt
from sqlalchemy import create_engine
@@ -60,27 +59,27 @@ def db_snippet() -> None:
# MySQL instance to get data.
# NOTE: you'll need to install pymysql with `pip install pymysql`
# NOTE: loading data from public mysql instance may take several seconds
engine = create_engine("mysql+pymysql://[email protected]:4497/Rfam")
engine = create_engine(
"mysql+pymysql://[email protected]:3306/acanthochromis_polyacanthus_core_100_1"
)

with engine.connect() as conn:
# Select genome table, stream data in batches of 100 elements
query = "SELECT * FROM genome LIMIT 1000"
query = "SELECT * FROM analysis LIMIT 1000"
rows = conn.execution_options(yield_per=100).exec_driver_sql(query)

pipeline = dlt.pipeline(
pipeline_name="from_database",
destination="duckdb",
dataset_name="genome_data",
dataset_name="acanthochromis_polyacanthus_data",
)

# Convert the rows into dictionaries on the fly with a map function
load_info = pipeline.run(
map(lambda row: dict(row._mapping), rows),
table_name="genome"
map(lambda row: dict(row._mapping), rows), table_name="acanthochromis_polyacanthus"
)

print(load_info)
# @@@DLT_SNIPPET_END db

assert_load_info(load_info)
