Commit

Add LanceDB custom destination example code (#1323)
* Add LanceDB custom destination example code

Signed-off-by: Marcel Coetzee <[email protected]>

* Format

Signed-off-by: Marcel Coetzee <[email protected]>

* Remove Postgres credentials from example.secrets.toml

Signed-off-by: Marcel Coetzee <[email protected]>

* Format

Signed-off-by: Marcel Coetzee <[email protected]>

* Add typing

Signed-off-by: Marcel Coetzee <[email protected]>

* Refactor code documentation and add type ignore comments

Signed-off-by: Marcel Coetzee <[email protected]>

* Ignore checks

Signed-off-by: Marcel Coetzee <[email protected]>

* wrap in main if statement

Signed-off-by: Marcel Coetzee <[email protected]>

* Add lancedb to install dependencies in test_doc_snippets workflow

Signed-off-by: Marcel Coetzee <[email protected]>

* poetry

Signed-off-by: Marcel Coetzee <[email protected]>

* Update deps

Signed-off-by: Marcel Coetzee <[email protected]>

* Update LanceDB version and replace Sentence-Transformers with OpenAIEmbeddings

Signed-off-by: Marcel Coetzee <[email protected]>

* Poetry lock

Signed-off-by: Marcel Coetzee <[email protected]>

* Format

Signed-off-by: Marcel Coetzee <[email protected]>

* Update versions

Signed-off-by: Marcel Coetzee <[email protected]>

* Replace OpenAI with Cohere in LanceDB custom destination example

Signed-off-by: Marcel Coetzee <[email protected]>

* Format

Signed-off-by: Marcel Coetzee <[email protected]>

* Add error handling to custom destination lanceDB example

Signed-off-by: Marcel Coetzee <[email protected]>

* Lift config to secrets/config

Signed-off-by: Marcel Coetzee <[email protected]>

* Ignore example lancedb local dir

Signed-off-by: Marcel Coetzee <[email protected]>

* Why was this uncommented

Signed-off-by: Marcel Coetzee <[email protected]>

* Remove unnecessary lock

Signed-off-by: Marcel Coetzee <[email protected]>

* Cleanup

Signed-off-by: Marcel Coetzee <[email protected]>

* Remove print statements from custom_destination_lancedb.py

Signed-off-by: Marcel Coetzee <[email protected]>

* Print info

Signed-off-by: Marcel Coetzee <[email protected]>

* Print info

Signed-off-by: Marcel Coetzee <[email protected]>

* Use rest_client

Signed-off-by: Marcel Coetzee <[email protected]>

* noqa

Signed-off-by: Marcel Coetzee <[email protected]>

* Remove `cohere` dependency and add `embeddings` extra to `lancedb`

Signed-off-by: Marcel Coetzee <[email protected]>

* changing secrets path for cohere to pass docs tests

* fixes lock file

* moves get lancedb path to run within the test

* fix dependencies

* fix linting

* fix lancedb deps

* update lock file

* change source name

* moved client_id to secrets

* switch lancedb example to openai and small fixes

* small fixes

* add openai to docs deps

* fix grammar gpt typing

---------

Signed-off-by: Marcel Coetzee <[email protected]>
Co-authored-by: Marcin Rudolf <[email protected]>
Co-authored-by: rahuljo <[email protected]>
Co-authored-by: Dave <[email protected]>
Co-authored-by: Alena <[email protected]>
5 people authored Jun 24, 2024
1 parent 6466ce4 commit 6b83cee
Showing 8 changed files with 319 additions and 2 deletions.
2 changes: 2 additions & 0 deletions docs/examples/custom_destination_lancedb/.dlt/config.toml
@@ -0,0 +1,2 @@
[lancedb]
db_path = "spotify.db"
7 changes: 7 additions & 0 deletions docs/examples/custom_destination_lancedb/.dlt/example.secrets.toml
@@ -0,0 +1,7 @@
[spotify]
client_id = ""
client_secret = ""

# provide the openai api key here
[destination.lancedb.credentials]
embedding_model_provider_api_key = ""
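The example script consumes these secrets in two ways: the Spotify credentials are injected into the dlt source via dlt.secrets.value, and the OpenAI key is read explicitly to configure LanceDB's embedding registry. A minimal sketch, mirroring the code shown below:

import dlt
from lancedb.embeddings import get_registry

openai_api_key = dlt.secrets.get(
    "destination.lancedb.credentials.embedding_model_provider_api_key"
)
# the registered OpenAI function embeds the `description` field when rows are added
func = get_registry().get("openai").create(name="text-embedding-3-small", api_key=openai_api_key)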
1 change: 1 addition & 0 deletions docs/examples/custom_destination_lancedb/.gitignore
@@ -0,0 +1 @@
spotify.db
Empty file.
155 changes: 155 additions & 0 deletions docs/examples/custom_destination_lancedb/custom_destination_lancedb.py
@@ -0,0 +1,155 @@
"""
---
title: Custom Destination with LanceDB
description: Learn how to use the custom destination to load data to LanceDB.
keywords: [destination, credentials, example, lancedb, custom destination, vectorstore, AI, LLM]
---
This example shows how to integrate LanceDB, an open-source vector database, as a custom destination within the dlt ecosystem.
The script implements a custom destination and populates the LanceDB vector store with Spotify podcast episode data,
highlighting the interoperability between dlt and LanceDB.
You can get a Spotify client ID and secret from https://developer.spotify.com/.
We'll learn how to:
- Use the [custom destination](../dlt-ecosystem/destinations/destination.md)
- Delegate the embeddings to LanceDB using OpenAI Embeddings
"""

__source_name__ = "spotify"

import datetime # noqa: I251
import os
from dataclasses import dataclass, fields
from pathlib import Path
from typing import Any

import lancedb # type: ignore
from lancedb.embeddings import get_registry # type: ignore
from lancedb.pydantic import LanceModel, Vector # type: ignore

import dlt
from dlt.common.configuration import configspec
from dlt.common.schema import TTableSchema
from dlt.common.typing import TDataItems, TSecretStrValue
from dlt.sources.helpers import requests
from dlt.sources.helpers.rest_client import RESTClient, AuthConfigBase

# access secrets to get openai key and instantiate embedding function
openai_api_key: str = dlt.secrets.get("destination.lancedb.credentials.embedding_model_provider_api_key")
func = get_registry().get("openai").create(name="text-embedding-3-small", api_key=openai_api_key)


class EpisodeSchema(LanceModel):
    id: str  # noqa: A003
    name: str
    description: str = func.SourceField()
    vector: Vector(func.ndims()) = func.VectorField()  # type: ignore[valid-type]
    release_date: datetime.date
    href: str


@dataclass(frozen=True)
class Shows:
    monday_morning_data_chat: str = "3Km3lBNzJpc1nOTJUtbtMh"
    latest_space_podcast: str = "2p7zZVwVF6Yk0Zsb4QmT7t"
    superdatascience_podcast: str = "1n8P7ZSgfVLVJ3GegxPat1"
    lex_fridman: str = "2MAi0BvDc6GTFvKFPXnkCL"


@configspec
class SpotifyAuth(AuthConfigBase):
    client_id: str = None
    client_secret: TSecretStrValue = None

    def __call__(self, request) -> Any:
        if not hasattr(self, "access_token"):
            self.access_token = self._get_access_token()
        request.headers["Authorization"] = f"Bearer {self.access_token}"
        return request

    def _get_access_token(self) -> Any:
        auth_url = "https://accounts.spotify.com/api/token"
        auth_response = requests.post(
            auth_url,
            {
                "grant_type": "client_credentials",
                "client_id": self.client_id,
                "client_secret": self.client_secret,
            },
        )
        return auth_response.json()["access_token"]


@dlt.source
def spotify_shows(
    client_id: str = dlt.secrets.value,
    client_secret: str = dlt.secrets.value,
):
    spotify_base_api_url = "https://api.spotify.com/v1"
    client = RESTClient(
        base_url=spotify_base_api_url,
        auth=SpotifyAuth(client_id=client_id, client_secret=client_secret),  # type: ignore[arg-type]
    )

    for show in fields(Shows):
        show_name = show.name
        show_id = show.default
        url = f"/shows/{show_id}/episodes"
        yield dlt.resource(
            client.paginate(url, params={"limit": 50}),
            name=show_name,
            write_disposition="merge",
            primary_key="id",
            parallelized=True,
            max_table_nesting=0,
        )


@dlt.destination(batch_size=250, name="lancedb")
def lancedb_destination(items: TDataItems, table: TTableSchema) -> None:
    db_path = Path(dlt.config.get("lancedb.db_path"))
    db = lancedb.connect(db_path)

    # since we are embedding the description field, we need to do some additional cleaning
    # for OpenAI. OpenAI will not accept empty strings or input with more than 8191 tokens
    for item in items:
        item["description"] = item.get("description") or "No Description"
        item["description"] = item["description"][0:8000]
    try:
        tbl = db.open_table(table["name"])
    except FileNotFoundError:
        tbl = db.create_table(table["name"], schema=EpisodeSchema)
    tbl.add(items)


if __name__ == "__main__":
    db_path = Path(dlt.config.get("lancedb.db_path"))
    db = lancedb.connect(db_path)

    for show in fields(Shows):
        db.drop_table(show.name, ignore_missing=True)

    pipeline = dlt.pipeline(
        pipeline_name="spotify",
        destination=lancedb_destination,
        dataset_name="spotify_podcast_data",
        progress="log",
    )

    load_info = pipeline.run(spotify_shows())
    load_info.raise_on_failed_jobs()
    print(load_info)

    row_counts = pipeline.last_trace.last_normalize_info
    print(row_counts)

    query = "French AI scientist with Lex, talking about AGI and Meta and Llama"
    table_to_query = "lex_fridman"

    tbl = db.open_table(table_to_query)

    results = tbl.search(query=query).to_list()
    assert results
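Once the pipeline has run, the populated tables can also be queried from a separate script. A minimal sketch, assuming the same .dlt configuration is available; the table name and query text here are only illustrative:

import dlt
import lancedb

db = lancedb.connect(dlt.config.get("lancedb.db_path"))
tbl = db.open_table("lex_fridman")
# the query text is embedded with the same registered OpenAI function before the vector search
for row in tbl.search(query="open source LLMs and AGI").limit(3).to_list():
    print(row["name"], row["release_date"])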
2 changes: 1 addition & 1 deletion docs/tools/fix_grammar_gpt.py
@@ -120,7 +120,7 @@ def get_chunk_length(chunk: List[str]) -> int:
temperature=0,
)

-    fixed_chunks.append(response.choices[0].message.content)
+    fixed_chunks.append(response.choices[0].message.content)  # type: ignore

with open(file_path, "w", encoding="utf-8") as f:
for c in fixed_chunks:
152 changes: 151 additions & 1 deletion poetry.lock
