Docs/website/docs/examples/qdrant eg (#775)
* qdrant zendesk example

* pushing after rebase

* added all code snippets and sidebars.js

* added all code snippets and sidebars.js

* added test and few code explanations

* code to be fixed to work for zendesk source

* fixed code and qdrant creds

* changes incorporated

* return back article

* return package.json

* fixing destinations in all examples

* fix __name__ with _remove

* add dependencies in index.md

* add dependencies in pyproject.toml

* add generated files

* pin python version, reduce the upper version bound from 3.13 to 3.12, update lock file

* revert check-package.sh

* revert poetry-deps.sh

* removed unused variables

* regenerate script example

* generate script example for connector_x_arrow, small fixes in code and text

* added env variable for qdrant

* fix deps and secrets.toml's

* fix creds for qdrant

* fix creds for zendesk

* fix source name for zendesk

* fix qdrant creds path

* revert stupid bash scripts

* no updated lock file

---------

Co-authored-by: AstrakhantsevaAA <[email protected]>
hibajamal and AstrakhantsevaAA authored Dec 14, 2023
1 parent 6dce0eb commit 2c1cd58
Showing 24 changed files with 4,375 additions and 4,006 deletions.
6 changes: 5 additions & 1 deletion .github/workflows/test_doc_snippets.yml
@@ -24,6 +24,10 @@ env:
  RUNTIME__SLACK_INCOMING_HOOK: ${{ secrets.RUNTIME__SLACK_INCOMING_HOOK }}
  # Mongodb url for nested data example
  MONGODB_PIPELINE__SOURCES__CONNECTION_URL: ${{ secrets.MONGODB_PIPELINE__SOURCES__CONNECTION_URL }}
+  # Qdrant credentials
+  DESTINATION__QDRANT__CREDENTIALS__LOCATION: ${{ secrets.DESTINATION__QDRANT__CREDENTIALS__LOCATION }}
+  DESTINATION__QDRANT__CREDENTIALS__API_KEY: ${{ secrets.DESTINATION__QDRANT__CREDENTIALS__API_KEY }}

jobs:

  run_lint:
@@ -59,7 +63,7 @@ jobs:

      - name: Install dependencies
        # if: steps.cached-poetry-dependencies.outputs.cache-hit != 'true'
-        run: poetry install --no-interaction -E duckdb -E weaviate -E parquet --with docs --without airflow
+        run: poetry install --no-interaction -E duckdb -E weaviate -E parquet -E qdrant --with docs --without airflow

      - name: Run linter and tests
        run: make test-and-lint-snippets
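The two new repository secrets follow dlt's environment-variable naming, where config sections are joined with double underscores, so DESTINATION__QDRANT__CREDENTIALS__LOCATION resolves to destination.qdrant.credentials.location. A minimal local sketch of the same mechanism, assuming dlt is installed with the qdrant extra and with placeholder values standing in for the CI secrets:

import os
import dlt

# placeholder values; in CI these come from the repository secrets above
os.environ["DESTINATION__QDRANT__CREDENTIALS__LOCATION"] = "https://your-qdrant-cluster.example.com:6333"
os.environ["DESTINATION__QDRANT__CREDENTIALS__API_KEY"] = "your-qdrant-api-key"

# the env vars resolve to the same values as [destination.qdrant.credentials] in secrets.toml
pipeline = dlt.pipeline(
    pipeline_name="qdrant_zendesk_pipeline",
    destination="qdrant",
    dataset_name="zendesk_data",
)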
39 changes: 39 additions & 0 deletions docs/examples/connector_x_arrow/load_arrow.py
@@ -0,0 +1,39 @@
import connectorx as cx

import dlt
from dlt.sources.credentials import ConnectionStringCredentials

def read_sql_x(
    conn_str: ConnectionStringCredentials = dlt.secrets.value,
    query: str = dlt.config.value,
):
    yield cx.read_sql(
        conn_str.to_native_representation(),
        query,
        return_type="arrow2",
        protocol="binary",
    )

def genome_resource():
    # create genome resource with merge on `upid` primary key
    genome = dlt.resource(
        name="genome",
        write_disposition="merge",
        primary_key="upid",
        standalone=True,
    )(read_sql_x)(
        "mysql://rfamro@mysql-rfam-public.ebi.ac.uk:4497/Rfam",  # type: ignore[arg-type]
        "SELECT * FROM genome ORDER BY created LIMIT 1000",
    )
    # add incremental on created at
    genome.apply_hints(incremental=dlt.sources.incremental("created"))
    return genome


if __name__ == "__main__":
    pipeline = dlt.pipeline(destination="duckdb")
    genome = genome_resource()

    print(pipeline.run(genome))
    print(pipeline.last_trace.last_normalize_info)
    # NOTE: run pipeline again to see that no more records got loaded thanks to incremental loading
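As the final NOTE suggests, a second run of the same pipeline should pick up no new rows, because the incremental hint on `created` remembers the last value seen. A quick way to check this, sketched as lines appended to the script above and assuming the public Rfam database is reachable:

# first run loads up to 1000 rows; the second finds nothing newer than the stored `created` value
pipeline = dlt.pipeline(destination="duckdb")
print(pipeline.run(genome_resource()))
print(pipeline.last_trace.last_normalize_info.row_counts)  # expect a "genome" entry with 1000 rows

print(pipeline.run(genome_resource()))
print(pipeline.last_trace.last_normalize_info.row_counts)  # expect no (or zero) "genome" entry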
File renamed without changes.
8 changes: 8 additions & 0 deletions docs/examples/qdrant_zendesk/.dlt/example.secrets.toml
@@ -0,0 +1,8 @@
[destination.qdrant.credentials]
location = ""
api_key = ""

[sources.zendesk.credentials]
password = ""
subdomain = ""
email = ""
File renamed without changes.
172 changes: 172 additions & 0 deletions docs/examples/qdrant_zendesk/qdrant.py
@@ -0,0 +1,172 @@
from typing import Optional, Dict, Any, Tuple

import dlt
from dlt.common import pendulum
from dlt.common.time import ensure_pendulum_datetime
from dlt.common.typing import TAnyDateTime
from dlt.sources.helpers.requests import client
from dlt.destinations.qdrant import qdrant_adapter
from qdrant_client import QdrantClient

from dlt.common.configuration.inject import with_config

# function from: https://github.com/dlt-hub/verified-sources/tree/master/sources/zendesk
@dlt.source(max_table_nesting=2)
def zendesk_support(
    credentials: Dict[str, str] = dlt.secrets.value,
    start_date: Optional[TAnyDateTime] = pendulum.datetime(year=2000, month=1, day=1),  # noqa: B008
    end_date: Optional[TAnyDateTime] = None,
):
    """
    Retrieves data from Zendesk Support for tickets events.
    Args:
        credentials: Zendesk credentials (default: dlt.secrets.value)
        start_date: Start date for data extraction (default: 2000-01-01)
        end_date: End date for data extraction (default: None).
            If end time is not provided, the incremental loading will be
            enabled, and after the initial run, only new data will be retrieved.
    Returns:
        DltResource.
    """
    # Convert start_date and end_date to Pendulum datetime objects
    start_date_obj = ensure_pendulum_datetime(start_date)
    end_date_obj = ensure_pendulum_datetime(end_date) if end_date else None

    # Extract credentials from secrets dictionary
    auth = (credentials["email"], credentials["password"])
    subdomain = credentials["subdomain"]
    url = f"https://{subdomain}.zendesk.com"

    # we use `append` write disposition, because objects in tickets_data endpoint are never updated
    # so we do not need to merge
    # we set primary_key to allow deduplication of events by the `incremental` below in the rare case
    # when two events have the same timestamp
    @dlt.resource(primary_key="id", write_disposition="append")
    def tickets_data(
        updated_at: dlt.sources.incremental[
            pendulum.DateTime
        ] = dlt.sources.incremental(
            "updated_at",
            initial_value=start_date_obj,
            end_value=end_date_obj,
            allow_external_schedulers=True,
        )
    ):
        # URL for ticket events
        # 'https://d3v-dlthub.zendesk.com/api/v2/incremental/tickets_data.json?start_time=946684800'
        event_pages = get_pages(
            url=url,
            endpoint="/api/v2/incremental/tickets",
            auth=auth,
            data_point_name="tickets",
            params={"start_time": updated_at.last_value.int_timestamp},
        )
        for page in event_pages:
            yield ([_fix_date(ticket) for ticket in page])

            # stop loading when using end_value and end is reached.
            # unfortunately, Zendesk API does not have the "end_time" parameter, so we stop iterating ourselves
            if updated_at.end_out_of_range:
                return

    return tickets_data


# helper function to fix the datetime format
def _parse_date_or_none(value: Optional[str]) -> Optional[pendulum.DateTime]:
    if not value:
        return None
    return ensure_pendulum_datetime(value)

# modify dates to return datetime objects instead
def _fix_date(ticket):
    ticket["updated_at"] = _parse_date_or_none(ticket["updated_at"])
    ticket["created_at"] = _parse_date_or_none(ticket["created_at"])
    ticket["due_at"] = _parse_date_or_none(ticket["due_at"])
    return ticket

# function from: https://github.com/dlt-hub/verified-sources/tree/master/sources/zendesk
def get_pages(
    url: str,
    endpoint: str,
    auth: Tuple[str, str],
    data_point_name: str,
    params: Optional[Dict[str, Any]] = None,
):
    """
    Makes a request to a paginated endpoint and returns a generator of data items per page.
    Args:
        url: The base URL.
        endpoint: The url to the endpoint, e.g. /api/v2/calls
        auth: Credentials for authentication.
        data_point_name: The key which data items are nested under in the response object (e.g. calls)
        params: Optional dict of query params to include in the request.
    Returns:
        Generator of pages, each page is a list of dict data items.
    """
    # update the page size to enable cursor pagination
    params = params or {}
    params["per_page"] = 1000
    headers = None

    # make request and keep looping until there is no next page
    get_url = f"{url}{endpoint}"
    while get_url:
        response = client.get(
            get_url, headers=headers, auth=auth, params=params
        )
        response.raise_for_status()
        response_json = response.json()
        result = response_json[data_point_name]
        yield result

        get_url = None
        # See https://developer.zendesk.com/api-reference/ticketing/ticket-management/incremental_exports/#json-format
        if not response_json["end_of_stream"]:
            get_url = response_json["next_page"]

if __name__ == "__main__":
    # create a pipeline with an appropriate name
    pipeline = dlt.pipeline(
        pipeline_name="qdrant_zendesk_pipeline",
        destination="qdrant",
        dataset_name="zendesk_data",
    )

    # run the dlt pipeline and save info about the load process
    load_info = pipeline.run(
        # here we use a special function to tell Qdrant which fields to embed
        qdrant_adapter(
            zendesk_support(),  # retrieve tickets data
            embed=["subject", "description"],
        )
    )

    print(load_info)

    # define a helper that reads the Qdrant credentials injected by dlt
    @with_config(sections=("destination", "credentials"))
    def get_qdrant_client(location=dlt.secrets.value, api_key=dlt.secrets.value):
        return QdrantClient(
            url=location,
            api_key=api_key,
        )

    # running the Qdrant client to connect to your Qdrant database
    qdrant_client = get_qdrant_client()

    # view Qdrant collections; you'll find your dataset here:
    print(qdrant_client.get_collections())

    # query Qdrant with a prompt: getting tickets info close to "cancellation"
    response = qdrant_client.query(
        "zendesk_data_content",  # collection/dataset name with the 'content' suffix -> tickets content table
        query_text=["cancel", "cancel subscription"],  # prompt to search
        limit=3,  # limit the number of results to the nearest 3 embeddings
    )
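For a quick look at the returned matches, the scored points can be iterated directly. This is a sketch, not part of the commit, and the exact attribute names (score, metadata) depend on the installed qdrant-client version:

for point in response:
    # each result carries a similarity score and the stored payload of the ticket
    print(point.score, point.metadata)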
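And because zendesk_support accepts an optional end_date (see its docstring above), the same source can be pointed at a fixed window for a one-off backfill instead of open-ended incremental loading. A sketch with arbitrary dates, reusing the pipeline and imports from the example:

# load only tickets updated between the two dates (see the zendesk_support docstring)
backfill_info = pipeline.run(
    qdrant_adapter(
        zendesk_support(
            start_date=pendulum.datetime(2023, 1, 1),
            end_date=pendulum.datetime(2023, 7, 1),
        ),
        embed=["subject", "description"],
    )
)
print(backfill_info)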
4 changes: 2 additions & 2 deletions docs/website/docs/examples/_examples-header.md
@@ -14,8 +14,8 @@ import CodeBlock from '@theme/CodeBlock';
git clone git@github.com:dlt-hub/dlt.git
# go to example directory
cd ./dlt/docs/examples/${props.slug}
-# install dlt with duckdb
-pip install "dlt[duckdb]"
+# install dlt with ${props.destination}
+pip install "dlt[${props.destination}]"
# run the example script
python ${props.run_file}.py`}
</CodeBlock>
@@ -179,3 +179,4 @@ def load_data_with_retry(pipeline, data):
    data = chess(chess_url="https://api.chess.com/pub/", max_players=MAX_PLAYERS)
    load_data_with_retry(pipeline, data)
    # @@@DLT_SNIPPET_END markdown_pipeline
+    # @@@DLT_SNIPPET_END example
3 changes: 2 additions & 1 deletion docs/website/docs/examples/chess_production/index.md
@@ -9,7 +9,8 @@ import Header from '../_examples-header.md';
<Header
    intro="In this tutorial, you will learn how to investigate, track, retry and test your loads."
    slug="chess_production"
-    run_file="chess" />
+    run_file="chess"
+    destination="duckdb" />

## Run chess pipeline in production

@@ -1,41 +1,52 @@
def connector_x_snippet() -> None:

    # @@@DLT_SNIPPET_START example
    # @@@DLT_SNIPPET_START markdown_source

    import connectorx as cx

    import dlt
    from dlt.sources.credentials import ConnectionStringCredentials

    def read_sql_x(
        conn_str: ConnectionStringCredentials = dlt.secrets.value,
-        query: str = dlt.config.value
+        query: str = dlt.config.value,
    ):
-        yield cx.read_sql(conn_str.to_native_representation(), query, return_type="arrow2", protocol="binary")
-
-    # create genome resource with merge on `upid` primary key
-    genome = dlt.resource(
-        name="genome",
-        write_disposition="merge",
-        primary_key="upid",
-        standalone=True
-    )(read_sql_x)(
-        "mysql://rfamro@mysql-rfam-public.ebi.ac.uk:4497/Rfam",  # type: ignore[arg-type]
-        "SELECT * FROM genome ORDER BY created LIMIT 1000"
-    )
-    # add incremental on created at
-    genome.apply_hints(incremental=dlt.sources.incremental("created"))
+        yield cx.read_sql(
+            conn_str.to_native_representation(),
+            query,
+            return_type="arrow2",
+            protocol="binary",
+        )
+
+    def genome_resource():
+        # create genome resource with merge on `upid` primary key
+        genome = dlt.resource(
+            name="genome",
+            write_disposition="merge",
+            primary_key="upid",
+            standalone=True,
+        )(read_sql_x)(
+            "mysql://rfamro@mysql-rfam-public.ebi.ac.uk:4497/Rfam",  # type: ignore[arg-type]
+            "SELECT * FROM genome ORDER BY created LIMIT 1000",
+        )
+        # add incremental on created at
+        genome.apply_hints(incremental=dlt.sources.incremental("created"))
+        return genome

    # @@@DLT_SNIPPET_END markdown_source

    # @@@DLT_SNIPPET_START markdown_pipeline
-    __name__ = "__main__" # @@@DLT_REMOVE
+    __name__ = "__main__"  # @@@DLT_REMOVE
    if __name__ == "__main__":
        pipeline = dlt.pipeline(destination="duckdb")
+        genome = genome_resource()

        print(pipeline.run(genome))
        print(pipeline.last_trace.last_normalize_info)
-        # NOTE: run pipeline again to see that no more records got loaded thanks to incremental working
+        # NOTE: run pipeline again to see that no more records got loaded thanks to incremental loading
    # @@@DLT_SNIPPET_END markdown_pipeline

-    # check that stuff was loaded
-    row_counts = pipeline.last_trace.last_normalize_info.row_counts
-    assert row_counts["genome"] == 1000
+    # check that stuff was loaded  # @@@DLT_REMOVE
+    row_counts = pipeline.last_trace.last_normalize_info.row_counts  # @@@DLT_REMOVE
+    assert row_counts["genome"] == 1000  # @@@DLT_REMOVE

    # @@@DLT_SNIPPET_END example