Docs/website/docs/examples/qdrant eg #775

Merged
29 commits merged on Dec 14, 2023
Changes from 22 commits
Commits
ad36202
qdrant zendesk example
hibajamal Nov 17, 2023
18e8bb5
pushing after rebase
hibajamal Nov 29, 2023
98c2fc3
edded all code snippets and sidebars.js
hibajamal Nov 17, 2023
4506e9b
added all code snippets and sidebars.js
hibajamal Nov 17, 2023
3eeb33b
added test and few code explanations
hibajamal Nov 17, 2023
9573291
code to be fixed to work for zendesk source
hibajamal Nov 24, 2023
64afe34
fixed code and qdrant creds
hibajamal Nov 29, 2023
6aa5a68
changes incorporated
hibajamal Dec 7, 2023
4afdc17
return back article
hibajamal Dec 7, 2023
218cff7
return package.json
hibajamal Dec 7, 2023
43a0a5a
fixing destinations in all examples
hibajamal Dec 7, 2023
02421a9
fix __name__ with _remove
hibajamal Dec 7, 2023
02b644a
add dependencies in index.md
hibajamal Dec 7, 2023
185f460
add dependencies in pyproject.toml
hibajamal Dec 7, 2023
ffea520
add generated files
AstrakhantsevaAA Dec 7, 2023
2f9a8e9
pin python version, and reduce on top version from 3.13 to 3.12, upda…
AstrakhantsevaAA Dec 7, 2023
a8f64af
revert check-package.sh
AstrakhantsevaAA Dec 7, 2023
cfd2fac
revert poetry-deps.sh
AstrakhantsevaAA Dec 7, 2023
13a99f7
removed unused variables
hibajamal Dec 7, 2023
bc268f9
regenerate script example
AstrakhantsevaAA Dec 7, 2023
1f0674a
generate script example for connector_x_arrow, small fixes in code an…
AstrakhantsevaAA Dec 7, 2023
c0131e1
added env variable for qdrant
hibajamal Dec 7, 2023
9f54373
fix deps and secrets.toml's
AstrakhantsevaAA Dec 8, 2023
0c89555
fix creds for qdrant
AstrakhantsevaAA Dec 8, 2023
cd3f385
fix creds for zendesk
AstrakhantsevaAA Dec 8, 2023
a1d3a28
fix source name for zendesk
AstrakhantsevaAA Dec 8, 2023
890164a
fix qdrant creds path
AstrakhantsevaAA Dec 8, 2023
e874cff
revert stupid bash scripts
AstrakhantsevaAA Dec 8, 2023
a20af27
no updated lock file
AstrakhantsevaAA Dec 8, 2023
2 changes: 2 additions & 0 deletions .github/workflows/test_doc_snippets.yml
@@ -24,6 +24,8 @@ env:
RUNTIME__SLACK_INCOMING_HOOK: ${{ secrets.RUNTIME__SLACK_INCOMING_HOOK }}
# Mongodb url for nested data example
MONGODB_PIPELINE__SOURCES__CONNECTION_URL: ${{ secrets.MONGODB_PIPELINE__SOURCES__CONNECTION_URL }}
# Qdrant credentials
DESTINATION__QDRANT__CREDENTIALS: ${{ secrets.QDRANT__CREDENTIALS }}
jobs:

run_lint:
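The new DESTINATION__QDRANT__CREDENTIALS variable passes the repository secret to the snippet tests so the Qdrant example can authenticate in CI. To run the example locally, a minimal sketch of an equivalent setup through environment variables (the URL and key are placeholders, not values from this PR; dlt resolves configuration from environment variables whose names spell out the section path with double underscores):

import os

# hypothetical placeholder values for a hosted Qdrant instance
os.environ["DESTINATION__QDRANT__CREDENTIALS__LOCATION"] = "https://your-qdrant-instance:6333"
os.environ["DESTINATION__QDRANT__CREDENTIALS__API_KEY"] = "your-qdrant-api-key"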
Empty file modified check-package.sh
100755 → 100644
Empty file.
39 changes: 39 additions & 0 deletions docs/examples/connector_x_arrow/load_arrow.py
@@ -0,0 +1,39 @@
import connectorx as cx

import dlt
from dlt.sources.credentials import ConnectionStringCredentials

def read_sql_x(
conn_str: ConnectionStringCredentials = dlt.secrets.value,
query: str = dlt.config.value,
):
yield cx.read_sql(
conn_str.to_native_representation(),
query,
return_type="arrow2",
protocol="binary",
)

def genome_resource():
# create genome resource with merge on `upid` primary key
genome = dlt.resource(
name="genome",
write_disposition="merge",
primary_key="upid",
standalone=True,
)(read_sql_x)(
"mysql://[email protected]:4497/Rfam", # type: ignore[arg-type]
"SELECT * FROM genome ORDER BY created LIMIT 1000",
)
# add incremental on created at
genome.apply_hints(incremental=dlt.sources.incremental("created"))
return genome


if __name__ == "__main__":
pipeline = dlt.pipeline(destination="duckdb")
genome = genome_resource()

print(pipeline.run(genome))
print(pipeline.last_trace.last_normalize_info)
# NOTE: run pipeline again to see that no more records got loaded thanks to incremental loading
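To make the NOTE above concrete, a minimal sketch of a second run, assuming the script has already completed once against the same duckdb destination. The incremental hint on "created" only extracts records newer than the highest value seen so far, so the normalize step should report no (or almost no) new rows:

# second run: incremental loading filters out rows that were already loaded
pipeline = dlt.pipeline(destination="duckdb")
print(pipeline.run(genome_resource()))
# expect no new "genome" rows in the row counts this time
print(pipeline.last_trace.last_normalize_info.row_counts)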
Empty file.
174 changes: 174 additions & 0 deletions docs/examples/qdrant_zendesk/qdrant.py
@@ -0,0 +1,174 @@
from typing import Optional, Dict, Any, Tuple

import dlt
from dlt.common import pendulum
from dlt.common.time import ensure_pendulum_datetime
from dlt.common.typing import TAnyDateTime
from dlt.sources.helpers.requests import client
from dlt.destinations.qdrant import qdrant_adapter
from qdrant_client import QdrantClient

from dlt.common.configuration.inject import with_config

# function from: https://github.com/dlt-hub/verified-sources/tree/master/sources/zendesk
@dlt.source(max_table_nesting=2)
def zendesk_support(
credentials: Dict[str, str] = dlt.secrets.value,
start_date: Optional[TAnyDateTime] = pendulum.datetime(year=2000, month=1, day=1), # noqa: B008
end_date: Optional[TAnyDateTime] = None,
):
"""
Retrieves data from Zendesk Support for tickets events.

Args:
credentials: Zendesk credentials (default: dlt.secrets.value)
start_date: Start date for data extraction (default: 2000-01-01)
end_date: End date for data extraction (default: None).
If end time is not provided, the incremental loading will be
enabled, and after the initial run, only new data will be retrieved.

Returns:
DltResource.
"""
# Convert start_date and end_date to Pendulum datetime objects
start_date_obj = ensure_pendulum_datetime(start_date)
end_date_obj = ensure_pendulum_datetime(end_date) if end_date else None

# Extract credentials from secrets dictionary
auth = (credentials["email"], credentials["password"])
subdomain = credentials["subdomain"]
url = f"https://{subdomain}.zendesk.com"

# we use the `append` write disposition, because objects in the tickets_data endpoint are never updated,
# so we do not need to merge
# we set primary_key to allow deduplication of events by the `incremental` below in the rare case
# when two events have the same timestamp
@dlt.resource(primary_key="id", write_disposition="append")
def tickets_data(
updated_at: dlt.sources.incremental[
pendulum.DateTime
] = dlt.sources.incremental(
"updated_at",
initial_value=start_date_obj,
end_value=end_date_obj,
allow_external_schedulers=True,
)
):
# URL for ticket events, e.g.
# 'https://d3v-dlthub.zendesk.com/api/v2/incremental/tickets?start_time=946684800'
event_pages = get_pages(
url=url,
endpoint="/api/v2/incremental/tickets",
auth=auth,
data_point_name="tickets",
params={"start_time": updated_at.last_value.int_timestamp},
)
for page in event_pages:
yield ([_fix_date(ticket) for ticket in page])

# stop loading when end_value is set and the end is reached.
# unfortunately, the Zendesk API does not have an "end_time" parameter, so we stop iterating ourselves
if updated_at.end_out_of_range:
return

return tickets_data


# helper function to fix the datetime format
def _parse_date_or_none(value: Optional[str]) -> Optional[pendulum.DateTime]:
if not value:
return None
return ensure_pendulum_datetime(value)

# modify dates to return datetime objects instead
def _fix_date(ticket):
ticket["updated_at"] = _parse_date_or_none(ticket["updated_at"])
ticket["created_at"] = _parse_date_or_none(ticket["created_at"])
ticket["due_at"] = _parse_date_or_none(ticket["due_at"])
return ticket

# function from: https://github.com/dlt-hub/verified-sources/tree/master/sources/zendesk
def get_pages(
url: str,
endpoint: str,
auth: Tuple[str, str],
data_point_name: str,
params: Optional[Dict[str, Any]] = None,
):
"""
Makes a request to a paginated endpoint and returns a generator of data items per page.

Args:
url: The base URL.
endpoint: The URL path of the endpoint, e.g. /api/v2/calls
auth: Credentials for authentication.
data_point_name: The key under which data items are nested in the response object (e.g. calls)
params: Optional dict of query params to include in the request.

Returns:
Generator of pages, each page is a list of dict data items.
"""
# update the page size to enable cursor pagination
params = params or {}
params["per_page"] = 1000
headers = None

# make request and keep looping until there is no next page
get_url = f"{url}{endpoint}"
while get_url:
response = client.get(
get_url, headers=headers, auth=auth, params=params
)
response.raise_for_status()
response_json = response.json()
result = response_json[data_point_name]
yield result

get_url = None
# See https://developer.zendesk.com/api-reference/ticketing/ticket-management/incremental_exports/#json-format
if not response_json["end_of_stream"]:
get_url = response_json["next_page"]

if __name__ == "__main__":

# create a pipeline with an appropriate name
pipeline = dlt.pipeline(
pipeline_name="qdrant_zendesk_pipeline",
destination="qdrant",
dataset_name="zendesk_data",
)

# run the dlt pipeline and save info about the load process
load_info = pipeline.run(
# here we use a special function to tell Qdrant which fields to embed
qdrant_adapter(
zendesk_support(), # retrieve tickets data
embed=["subject", "description"],
)
)

print(load_info)


# define a helper that reads Qdrant credentials from dlt secrets and builds a client

@with_config(sections=("destination", "credentials"))
def get_qdrant_client(location=dlt.secrets.value, api_key=dlt.secrets.value):
return QdrantClient(
url=location,
api_key=api_key,
)

# running the Qdrant client to connect to your Qdrant database
qdrant_client = get_qdrant_client()

# view Qdrant collections; you'll find your dataset here:
print(qdrant_client.get_collections())

# query Qdrant with a prompt: find tickets close to "cancellation"
response = qdrant_client.query(
"zendesk_data_content", # collection/dataset name with the 'content' suffix -> tickets content table
query_text=["cancel", "cancel subscription"], # prompt to search
limit=3 # limit the number of results to the nearest 3 embeddings
)
4 changes: 2 additions & 2 deletions docs/website/docs/examples/_examples-header.md
@@ -14,8 +14,8 @@ import CodeBlock from '@theme/CodeBlock';
git clone git@github.com:dlt-hub/dlt.git
# go to example directory
cd ./dlt/docs/examples/${props.slug}
# install dlt with duckdb
pip install "dlt[duckdb]"
# install dlt with ${props.destination}
pip install "dlt[${props.destination}]"
# run the example script
python ${props.run_file}.py`}
</CodeBlock>
Original file line number Diff line number Diff line change
@@ -179,3 +179,4 @@ def load_data_with_retry(pipeline, data):
data = chess(chess_url="https://api.chess.com/pub/", max_players=MAX_PLAYERS)
load_data_with_retry(pipeline, data)
# @@@DLT_SNIPPET_END markdown_pipeline
# @@@DLT_SNIPPET_END example
3 changes: 2 additions & 1 deletion docs/website/docs/examples/chess_production/index.md
@@ -9,7 +9,8 @@ import Header from '../_examples-header.md';
<Header
intro="In this tutorial, you will learn how to investigate, track, retry and test your loads."
slug="chess_production"
run_file="chess" />
run_file="chess"
destination="duckdb" />

## Run chess pipeline in production

Original file line number Diff line number Diff line change
@@ -1,41 +1,52 @@
def connector_x_snippet() -> None:

# @@@DLT_SNIPPET_START example
# @@@DLT_SNIPPET_START markdown_source

import connectorx as cx

import dlt
from dlt.sources.credentials import ConnectionStringCredentials

def read_sql_x(
conn_str: ConnectionStringCredentials = dlt.secrets.value,
query: str = dlt.config.value
query: str = dlt.config.value,
):
yield cx.read_sql(conn_str.to_native_representation(), query, return_type="arrow2", protocol="binary")

# create genome resource with merge on `upid` primary key
genome = dlt.resource(
name="genome",
write_disposition="merge",
primary_key="upid",
standalone=True
)(read_sql_x)(
"mysql://[email protected]:4497/Rfam", # type: ignore[arg-type]
"SELECT * FROM genome ORDER BY created LIMIT 1000"
)
# add incremental on created at
genome.apply_hints(incremental=dlt.sources.incremental("created"))
yield cx.read_sql(
conn_str.to_native_representation(),
query,
return_type="arrow2",
protocol="binary",
)

def genome_resource():
# create genome resource with merge on `upid` primary key
genome = dlt.resource(
name="genome",
write_disposition="merge",
primary_key="upid",
standalone=True,
)(read_sql_x)(
"mysql://[email protected]:4497/Rfam", # type: ignore[arg-type]
"SELECT * FROM genome ORDER BY created LIMIT 1000",
)
# add incremental on created at
genome.apply_hints(incremental=dlt.sources.incremental("created"))
return genome

# @@@DLT_SNIPPET_END markdown_source

# @@@DLT_SNIPPET_START markdown_pipeline
__name__ = "__main__" # @@@DLT_REMOVE
__name__ = "__main__" # @@@DLT_REMOVE
if __name__ == "__main__":
pipeline = dlt.pipeline(destination="duckdb")
genome = genome_resource()

print(pipeline.run(genome))
print(pipeline.last_trace.last_normalize_info)
# NOTE: run pipeline again to see that no more records got loaded thanks to incremental working
# NOTE: run pipeline again to see that no more records got loaded thanks to incremental loading
# @@@DLT_SNIPPET_END markdown_pipeline

# check that stuff was loaded
row_counts = pipeline.last_trace.last_normalize_info.row_counts
assert row_counts["genome"] == 1000
# check that stuff was loaded # @@@DLT_REMOVE
row_counts = pipeline.last_trace.last_normalize_info.row_counts # @@@DLT_REMOVE
assert row_counts["genome"] == 1000 # @@@DLT_REMOVE

# @@@DLT_SNIPPET_END example