LanceDB Destination (#1375)
* Added lancedb as an optional dependency

Signed-off-by: Marcel Coetzee <[email protected]>

* Added lancedb to dependencies in test workflow

Signed-off-by: Marcel Coetzee <[email protected]>

* Add initial capabilities for LanceDB destination

Signed-off-by: Marcel Coetzee <[email protected]>

* Added new lancedb_adapter

Signed-off-by: Marcel Coetzee <[email protected]>

* Added LanceDB factory in destinations implementation

Signed-off-by: Marcel Coetzee <[email protected]>

* Added LanceDB client configuration with embedding details

Signed-off-by: Marcel Coetzee <[email protected]>

* Added LanceDB Client with data load and schema management functionalities

Signed-off-by: Marcel Coetzee <[email protected]>

* Lockfile

Signed-off-by: Marcel Coetzee <[email protected]>

* Wireframe LanceDB client implementation

Signed-off-by: Marcel Coetzee <[email protected]>

* Add abstract methods

Signed-off-by: Marcel Coetzee <[email protected]>

* Enhance LanceDB client with additional functionality

Signed-off-by: Marcel Coetzee <[email protected]>

* Add tests and GitHub workflow for LanceDB destination

Signed-off-by: Marcel Coetzee <[email protected]>

* Update Python version to 3.11.x in GitHub workflow

Signed-off-by: Marcel Coetzee <[email protected]>

* Refactor and cleanup LanceDBClient and LoadLanceDBJob classes

Signed-off-by: Marcel Coetzee <[email protected]>

* Refactor load tests in lancedb/utils.py and add test for LanceDB model inference

Signed-off-by: Marcel Coetzee <[email protected]>

* Added functionality to infer LanceDB model from data and refactored name for reserved fields

Signed-off-by: Marcel Coetzee <[email protected]>

* Remove storage options

Storage options are only available in asynchronous Python API. See https://lancedb.github.io/lancedb/guides/storage/

Signed-off-by: Marcel Coetzee <[email protected]>

* Refactor test pipeline and implement lancedb_adapter in LanceDBClient

Signed-off-by: Marcel Coetzee <[email protected]>

* Add schema argument to LoadLanceDBJob function

Signed-off-by: Marcel Coetzee <[email protected]>

* Format

Signed-off-by: Marcel Coetzee <[email protected]>

* Refactor LanceDB related code and increase type hint coverage

Signed-off-by: Marcel Coetzee <[email protected]>

* Refactor LanceDB client and tests, enhance DB type mapping

Signed-off-by: Marcel Coetzee <[email protected]>

* Refactor code to improve readability by reducing line breaks

Signed-off-by: Marcel Coetzee <[email protected]>

* Refactor LanceDB client code by adding schema_conversion and utils modules

Signed-off-by: Marcel Coetzee <[email protected]>

* Remove redundant variables in lancedb_client.py

Signed-off-by: Marcel Coetzee <[email protected]>

* Refactor code to improve readability and move environment variable set function to utils.py

Signed-off-by: Marcel Coetzee <[email protected]>

* Refactor LanceDB client implementation and error handling

Signed-off-by: Marcel Coetzee <[email protected]>

* Refactor code for better readability and add type ignore comments

Signed-off-by: Marcel Coetzee <[email protected]>

* Dependency Versioning

Signed-off-by: Marcel Coetzee <[email protected]>

* Remove unnecessary dependencies and update lancedb and pylance versions

Signed-off-by: Marcel Coetzee <[email protected]>

* Silence mypy warnings

Signed-off-by: Marcel Coetzee <[email protected]>

* Revert mypy ignores

Signed-off-by: Marcel Coetzee <[email protected]>

* Revert mypy ignores

Signed-off-by: Marcel Coetzee <[email protected]>

* Fix versioning with 3.8

Signed-off-by: Marcel Coetzee <[email protected]>

* Fix versioning

Signed-off-by: Marcel Coetzee <[email protected]>

* Update default URI and dataset separator in LanceDB configuration

Signed-off-by: Marcel Coetzee <[email protected]>

* Refactor LanceDB typemapper with timestamp and decimal precision adjustments

Signed-off-by: Marcel Coetzee <[email protected]>

* Updated method for retrieving sentinel table name

Signed-off-by: Marcel Coetzee <[email protected]>

* Remove redundant table normalisation for version_table_name

Signed-off-by: Marcel Coetzee <[email protected]>

* Refactor LanceDB functionalities and improve handling of optional embedding fields

Signed-off-by: Marcel Coetzee <[email protected]>

* Refactor LanceDBClient and update parameter defaults in schema.py

Signed-off-by: Marcel Coetzee <[email protected]>

* Added lancedb to default vector configs and improved type annotations in tests.

Signed-off-by: Marcel Coetzee <[email protected]>

* Return self in enter context manager method

Signed-off-by: Marcel Coetzee <[email protected]>

* Handle FileNotFoundError

Signed-off-by: Marcel Coetzee <[email protected]>

* Replace FileNotFoundError with DestinationUndefinedEntity in lancedb_client.py

Signed-off-by: Marcel Coetzee <[email protected]>

* Refactor LanceDB client for simplified table name handling

Signed-off-by: Marcel Coetzee <[email protected]>

* Refactored LanceDB schema creation and storage update processes to pyarrow

Signed-off-by: Marcel Coetzee <[email protected]>

* Remove LanceModels

Signed-off-by: Marcel Coetzee <[email protected]>

* Ensure 'records' is a list in lancedb_client.py

Signed-off-by: Marcel Coetzee <[email protected]>

* Refactor code and add batch error handling in lancedb client

Signed-off-by: Marcel Coetzee <[email protected]>

* Refactor LanceDB client and schema for improved embedding handling

Signed-off-by: Marcel Coetzee <[email protected]>

* Improve error handling and retries in LanceDB client

Signed-off-by: Marcel Coetzee <[email protected]>

* Add error decorator to get_stored_state method in lancedb_client

Signed-off-by: Marcel Coetzee <[email protected]>

* Change error handling from FileNotFoundError to IndexError

Signed-off-by: Marcel Coetzee <[email protected]>

* Refactor lancedb_client.py and add error decorators

Signed-off-by: Marcel Coetzee <[email protected]>

* Add configurable read consistency to LanceDB client

Signed-off-by: Marcel Coetzee <[email protected]>

* Versioning

Signed-off-by: Marcel Coetzee <[email protected]>

* Refactor code for readability and change return type in tests

Signed-off-by: Marcel Coetzee <[email protected]>

* Update queries in lancedb_client to order by insertion date

Signed-off-by: Marcel Coetzee <[email protected]>

* Refactor LanceDB client and schema for better table creation and management

Signed-off-by: Marcel Coetzee <[email protected]>

* Combine "skip" and "append" write dispositions in batch upload

Signed-off-by: Marcel Coetzee <[email protected]>

* Add schema version hash check in LanceDB client write operations

Signed-off-by: Marcel Coetzee <[email protected]>

* Remove testing code

Signed-off-by: Marcel Coetzee <[email protected]>

* Refactor return statement in lancedb_client for successful state loads

Signed-off-by: Marcel Coetzee <[email protected]>

* Update lancedb_client.py to improve table handling and embedding fields

Signed-off-by: Marcel Coetzee <[email protected]>

* Refactor LanceDB schema generation and handle metadata for embedding functions

Signed-off-by: Marcel Coetzee <[email protected]>

* Refactor schema creation and remove unused code

Signed-off-by: Marcel Coetzee <[email protected]>

* Add mapping for provider environment variables and update schema comment

Signed-off-by: Marcel Coetzee <[email protected]>

* Update package versions in pyproject.toml and poetry.lock

Signed-off-by: Marcel Coetzee <[email protected]>

* Refactor LanceDB utils and client, handle exception and remove unnecessary comment

Signed-off-by: Marcel Coetzee <[email protected]>

* Refactor utility functions in lancedb tests

Signed-off-by: Marcel Coetzee <[email protected]>

* Update 'replace' mode and improve table handling in lancedb client

Signed-off-by: Marcel Coetzee <[email protected]>

* Refactor assert_unordered_list_equal to handle dictionaries

Signed-off-by: Marcel Coetzee <[email protected]>

* Refactor code for better readability and remove unnecessary blank lines

Signed-off-by: Marcel Coetzee <[email protected]>

* Refactor code for readability and remove redundant comments

Signed-off-by: Marcel Coetzee <[email protected]>

* Update sentinel table name in test_pipeline.py

Signed-off-by: Marcel Coetzee <[email protected]>

* "Add order by clause to database query in lancedb_client"

Signed-off-by: Marcel Coetzee <[email protected]>

* Use super method to reduce redundancy

Signed-off-by: Marcel Coetzee <[email protected]>

* Syntax

Signed-off-by: Marcel Coetzee <[email protected]>

* Remove bare except clauses

Signed-off-by: Marcel Coetzee <[email protected]>

* Revert "Remove bare except clauses"

This reverts commit 3b44631.

* Remove bare except clause

Signed-off-by: Marcel Coetzee <[email protected]>

* Remove bare except clause

Signed-off-by: Marcel Coetzee <[email protected]>

* Remove bare except clause

Signed-off-by: Marcel Coetzee <[email protected]>

* Remove bare except clause

Signed-off-by: Marcel Coetzee <[email protected]>

* Refactor error handling in LanceDB client

Signed-off-by: Marcel Coetzee <[email protected]>

* Add configurable sentinel table name in LanceDB client configuration

Signed-off-by: Marcel Coetzee <[email protected]>

* Update embedding model config and schema in LanceDB

Signed-off-by: Marcel Coetzee <[email protected]>

* Refactor lancedb_client.py, remove unused methods and imports

Signed-off-by: Marcel Coetzee <[email protected]>

* Add support for adding multiple fields to LanceDB table in a single operation

Signed-off-by: Marcel Coetzee <[email protected]>

* Only filter by successful loads

Signed-off-by: Marcel Coetzee <[email protected]>

* Remove redundant exception handling in JSON extraction

Signed-off-by: Marcel Coetzee <[email protected]>

* Refactor lancedb_client.py for better code readability

Signed-off-by: Marcel Coetzee <[email protected]>

* Refactor lancedb_client.py for improved code readability

Signed-off-by: Marcel Coetzee <[email protected]>

* Fix module docstring

Signed-off-by: Marcel Coetzee <[email protected]>

* Remove embedding_fields from make_arrow_field_schema function

Signed-off-by: Marcel Coetzee <[email protected]>

* Add merge key support

Signed-off-by: Marcel Coetzee <[email protected]>

* Refactor `get_stored_state` to perform join in memory

Signed-off-by: Marcel Coetzee <[email protected]>

* Packaging

Signed-off-by: Marcel Coetzee <[email protected]>

* Format

Signed-off-by: Marcel Coetzee <[email protected]>

* Update dependencies in GitHub workflow for testing lancedb

Signed-off-by: Marcel Coetzee <[email protected]>

* Add "cohere" to package dependencies in pyproject.toml

Signed-off-by: Marcel Coetzee <[email protected]>

* Dependencies

Signed-off-by: Marcel Coetzee <[email protected]>

* Update dependencies installation in GitHub workflow

Signed-off-by: Marcel Coetzee <[email protected]>

* Dependencies

Signed-off-by: Marcel Coetzee <[email protected]>

* Update dependency in GitHub workflow

Signed-off-by: Marcel Coetzee <[email protected]>

* Dependencies

Signed-off-by: Marcel Coetzee <[email protected]>

* Dependencies

Signed-off-by: Marcel Coetzee <[email protected]>

* Add documentation for LanceDB

Signed-off-by: Marcel Coetzee <[email protected]>

* Add limitations

Signed-off-by: Marcel Coetzee <[email protected]>

* Offload ordering logic from LanceDB

Signed-off-by: Marcel Coetzee <[email protected]>

* Update import statements in lancedb client and exceptions files

Signed-off-by: Marcel Coetzee <[email protected]>

* Create `_get_table_name` getter

Signed-off-by: Marcel Coetzee <[email protected]>

* Format

Signed-off-by: Marcel Coetzee <[email protected]>

* Avoid race conditions by delegating all state management to dlt

Signed-off-by: Marcel Coetzee <[email protected]>

* Imports

Signed-off-by: Marcel Coetzee <[email protected]>

* small doc and test fixes

* Fix OpenAI embedding handling of empty strings

Replace empty strings with a placeholder before sending to the OpenAI API, and handle the placeholder as an empty embedding in the results. This avoids BadRequestErrors from the API when empty strings are present in the input data.

Implemented by subclassing OpenAIEmbeddings and overriding sanitize_input and generate_embeddings methods.

Signed-off-by: Marcel Coetzee <[email protected]>

* Add 'embeddings' dependencies manually

Signed-off-by: Marcel Coetzee <[email protected]>

* Finally...

Signed-off-by: Marcel Coetzee <[email protected]>

* Dependencies

Signed-off-by: Marcel Coetzee <[email protected]>

* Dependencies

Signed-off-by: Marcel Coetzee <[email protected]>

* Docs

Signed-off-by: Marcel Coetzee <[email protected]>

* Remove superfluous helper method.

Signed-off-by: Marcel Coetzee <[email protected]>

* Lock File

Signed-off-by: Marcel Coetzee <[email protected]>

* Make api_key and embedding_model_provider_api_key optional

Signed-off-by: Marcel Coetzee <[email protected]>

* Clear environment for config test

Signed-off-by: Marcel Coetzee <[email protected]>

* Minor test config

Signed-off-by: Marcel Coetzee <[email protected]>

* test config

Signed-off-by: Marcel Coetzee <[email protected]>

* lancedb config

Signed-off-by: Marcel Coetzee <[email protected]>

* Config test

Signed-off-by: Marcel Coetzee <[email protected]>

* Config

Signed-off-by: Marcel Coetzee <[email protected]>

* config

Signed-off-by: Marcel Coetzee <[email protected]>

* Import lancedb_adapter function instead of module in adapter collection module

Signed-off-by: Marcel Coetzee <[email protected]>

* Clarify embedding facilities in LanceDB docs

Signed-off-by: Marcel Coetzee <[email protected]>

* update lancedb to support new naming setup (cleanup will follow)

* update lockfile

---------

Signed-off-by: Marcel Coetzee <[email protected]>
Co-authored-by: Dave <[email protected]>
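
The "Fix OpenAI embedding handling of empty strings" entry above describes swapping empty strings for a placeholder before the API call and treating the placeholder as an empty embedding afterwards. A minimal sketch of that idea follows. It does not reproduce the actual `OpenAIEmbeddings` subclass: `PLACEHOLDER`, `embed_with_placeholder` and `fake_embed` are hypothetical names, and representing an "empty embedding" as `None` is an assumption made for illustration.

```python
from typing import Callable, List, Optional, Sequence

# Hypothetical marker; the value used by the real subclass is not shown in this commit.
PLACEHOLDER = "___EMPTY___"


def embed_with_placeholder(
    texts: Sequence[str],
    embed: Callable[[List[str]], List[List[float]]],
) -> List[Optional[List[float]]]:
    """Embed `texts`, returning None for inputs that were empty strings."""
    # Replace empty inputs so the provider never receives an empty string.
    sanitized = [text if text else PLACEHOLDER for text in texts]
    vectors = embed(sanitized)
    # Discard whatever the provider returned for the placeholder positions.
    return [
        None if original == "" else vector
        for original, vector in zip(texts, vectors)
    ]


def fake_embed(batch: List[str]) -> List[List[float]]:
    # Stand-in for a provider call; rejects empty strings like a strict API might.
    if any(item == "" for item in batch):
        raise ValueError("empty input")
    return [[float(len(item))] for item in batch]


result = embed_with_placeholder(["moon", "", "base"], fake_embed)
```

Keeping the output list the same length as the input means each record still lines up with its (possibly missing) vector.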
Pipboyguy and sh-rp authored Jun 27, 2024
1 parent c00d408 commit 78cdb0b
Showing 24 changed files with 2,121 additions and 208 deletions.
81 changes: 81 additions & 0 deletions .github/workflows/test_destination_lancedb.yml
@@ -0,0 +1,81 @@
name: dest | lancedb

on:
  pull_request:
    branches:
      - master
      - devel
  workflow_dispatch:
  schedule:
    - cron: '0 2 * * *'

concurrency:
  group: ${{ github.workflow }}-${{ github.event.pull_request.number || github.ref }}
  cancel-in-progress: true

env:
  DLT_SECRETS_TOML: ${{ secrets.DLT_SECRETS_TOML }}

  RUNTIME__SENTRY_DSN: https://[email protected]/4504819859914752
  RUNTIME__LOG_LEVEL: ERROR
  RUNTIME__DLTHUB_TELEMETRY_ENDPOINT: ${{ secrets.RUNTIME__DLTHUB_TELEMETRY_ENDPOINT }}

  ACTIVE_DESTINATIONS: "[\"lancedb\"]"
  ALL_FILESYSTEM_DRIVERS: "[\"memory\"]"

jobs:
  get_docs_changes:
    name: docs changes
    uses: ./.github/workflows/get_docs_changes.yml
    if: ${{ !github.event.pull_request.head.repo.fork || contains(github.event.pull_request.labels.*.name, 'ci from fork')}}

  run_loader:
    name: dest | lancedb tests
    needs: get_docs_changes
    if: needs.get_docs_changes.outputs.changes_outside_docs == 'true'
    defaults:
      run:
        shell: bash
    runs-on: "ubuntu-latest"

    steps:
      - name: Check out
        uses: actions/checkout@master

      - name: Setup Python
        uses: actions/setup-python@v4
        with:
          python-version: "3.11.x"

      - name: Install Poetry
        uses: snok/[email protected]
        with:
          virtualenvs-create: true
          virtualenvs-in-project: true
          installer-parallel: true

      - name: Load cached venv
        id: cached-poetry-dependencies
        uses: actions/cache@v3
        with:
          path: .venv
          key: venv-${{ runner.os }}-${{ steps.setup-python.outputs.python-version }}-${{ hashFiles('**/poetry.lock') }}-gcp

      - name: create secrets.toml
        run: pwd && echo "$DLT_SECRETS_TOML" > tests/.dlt/secrets.toml

      - name: Install dependencies
        run: poetry install --no-interaction -E lancedb -E parquet --with sentry-sdk --with pipeline

      - name: Install embedding provider dependencies
        run: poetry run pip install openai

      - run: |
          poetry run pytest tests/load -m "essential"
        name: Run essential tests Linux
        if: ${{ ! (contains(github.event.pull_request.labels.*.name, 'ci full') || github.event_name == 'schedule')}}
      - run: |
          poetry run pytest tests/load
        name: Run all tests Linux
        if: ${{ contains(github.event.pull_request.labels.*.name, 'ci full') || github.event_name == 'schedule'}}
2 changes: 1 addition & 1 deletion .github/workflows/test_doc_snippets.yml
@@ -61,7 +61,7 @@ jobs:

       - name: Install dependencies
         # if: steps.cached-poetry-dependencies.outputs.cache-hit != 'true'
-        run: poetry install --no-interaction -E duckdb -E weaviate -E parquet -E qdrant -E bigquery -E postgres --with docs,sentry-sdk --without airflow
+        run: poetry install --no-interaction -E duckdb -E weaviate -E parquet -E qdrant -E bigquery -E postgres -E lancedb --with docs,sentry-sdk --without airflow

       - name: create secrets.toml for examples
         run: pwd && echo "$DLT_SECRETS_TOML" > docs/examples/.dlt/secrets.toml
2 changes: 2 additions & 0 deletions dlt/destinations/__init__.py
@@ -8,6 +8,7 @@
 from dlt.destinations.impl.athena.factory import athena
 from dlt.destinations.impl.redshift.factory import redshift
 from dlt.destinations.impl.qdrant.factory import qdrant
+from dlt.destinations.impl.lancedb.factory import lancedb
 from dlt.destinations.impl.motherduck.factory import motherduck
 from dlt.destinations.impl.weaviate.factory import weaviate
 from dlt.destinations.impl.destination.factory import destination
@@ -28,6 +29,7 @@
     "athena",
     "redshift",
     "qdrant",
+    "lancedb",
     "motherduck",
     "weaviate",
     "synapse",
2 changes: 2 additions & 0 deletions dlt/destinations/adapters.py
@@ -2,6 +2,7 @@

 from dlt.destinations.impl.weaviate.weaviate_adapter import weaviate_adapter
 from dlt.destinations.impl.qdrant.qdrant_adapter import qdrant_adapter
+from dlt.destinations.impl.lancedb import lancedb_adapter
 from dlt.destinations.impl.bigquery.bigquery_adapter import bigquery_adapter
 from dlt.destinations.impl.synapse.synapse_adapter import synapse_adapter
 from dlt.destinations.impl.clickhouse.clickhouse_adapter import clickhouse_adapter
@@ -10,6 +11,7 @@
 __all__ = [
     "weaviate_adapter",
     "qdrant_adapter",
+    "lancedb_adapter",
     "bigquery_adapter",
     "synapse_adapter",
     "clickhouse_adapter",
1 change: 1 addition & 0 deletions dlt/destinations/impl/lancedb/__init__.py
@@ -0,0 +1 @@
from dlt.destinations.impl.lancedb.lancedb_adapter import lancedb_adapter
111 changes: 111 additions & 0 deletions dlt/destinations/impl/lancedb/configuration.py
@@ -0,0 +1,111 @@
import dataclasses
from typing import Optional, Final, Literal, ClassVar, List

from dlt.common.configuration import configspec
from dlt.common.configuration.specs.base_configuration import (
    BaseConfiguration,
    CredentialsConfiguration,
)
from dlt.common.destination.reference import DestinationClientDwhConfiguration
from dlt.common.typing import TSecretStrValue
from dlt.common.utils import digest128


@configspec
class LanceDBCredentials(CredentialsConfiguration):
    uri: Optional[str] = ".lancedb"
    """LanceDB database URI. Defaults to local, on-disk instance.
    The available schemas are:
    - `/path/to/database` - local database.
    - `db://host:port` - remote database (LanceDB cloud).
    """
    api_key: Optional[TSecretStrValue] = None
    """API key for the remote connections (LanceDB cloud)."""
    embedding_model_provider_api_key: Optional[str] = None
    """API key for the embedding model provider."""

    __config_gen_annotations__: ClassVar[List[str]] = [
        "uri",
        "api_key",
        "embedding_model_provider_api_key",
    ]


@configspec
class LanceDBClientOptions(BaseConfiguration):
    max_retries: Optional[int] = 3
    """`EmbeddingFunction` class wraps the calls for source and query embedding
    generation inside a rate limit handler that retries the requests with exponential
    backoff after successive failures.
    You can tune it by setting it to a different number, or disable it by setting it to 0."""

    __config_gen_annotations__: ClassVar[List[str]] = [
        "max_retries",
    ]


TEmbeddingProvider = Literal[
    "gemini-text",
    "bedrock-text",
    "cohere",
    "gte-text",
    "imagebind",
    "instructor",
    "open-clip",
    "openai",
    "sentence-transformers",
    "huggingface",
    "colbert",
]


@configspec
class LanceDBClientConfiguration(DestinationClientDwhConfiguration):
    destination_type: Final[str] = dataclasses.field(  # type: ignore
        default="LanceDB", init=False, repr=False, compare=False
    )
    credentials: LanceDBCredentials = None
    dataset_separator: str = "___"
    """Character for the dataset separator."""
    dataset_name: Final[Optional[str]] = dataclasses.field(  # type: ignore
        default=None, init=False, repr=False, compare=False
    )

    options: Optional[LanceDBClientOptions] = None
    """LanceDB client options."""

    embedding_model_provider: TEmbeddingProvider = "cohere"
    """Embedding provider used for generating embeddings. Default is "cohere". You can find the full list of
    providers at https://github.com/lancedb/lancedb/tree/main/python/python/lancedb/embeddings as well as
    https://lancedb.github.io/lancedb/embeddings/default_embedding_functions/."""
    embedding_model: str = "embed-english-v3.0"
    """The model used by the embedding provider for generating embeddings.
    Check with the embedding provider which options are available.
    Reference https://lancedb.github.io/lancedb/embeddings/default_embedding_functions/."""
    embedding_model_dimensions: Optional[int] = None
    """The dimensions of the embeddings generated. In most cases it will be automatically inferred, by LanceDB,
    but it is configurable in rare cases.
    Make sure it corresponds with the associated embedding model's dimensionality."""
    vector_field_name: str = "vector__"
    """Name of the special field to store the vector embeddings."""
    id_field_name: str = "id__"
    """Name of the special field to manage deduplication."""
    sentinel_table_name: str = "dltSentinelTable"
    """Name of the sentinel table that encapsulates datasets. Since LanceDB has no
    concept of schemas, this table serves as a proxy to group related dlt tables together."""

    __config_gen_annotations__: ClassVar[List[str]] = [
        "embedding_model",
        "embedding_model_provider",
    ]

    def fingerprint(self) -> str:
        """Returns a fingerprint of a connection string."""

        if self.credentials and self.credentials.uri:
            return digest128(self.credentials.uri)
        return ""
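
The `max_retries` docstring above mentions retrying with exponential backoff, where `0` disables retries. The sketch below illustrates that general technique only. It is not LanceDB's `EmbeddingFunction` implementation, and `call_with_backoff`, `base_delay` and the injectable `sleep` parameter are hypothetical names chosen for the example.

```python
import time
from typing import Callable, List, TypeVar

T = TypeVar("T")


def call_with_backoff(
    func: Callable[[], T],
    max_retries: int = 3,
    base_delay: float = 0.5,
    sleep: Callable[[float], None] = time.sleep,
) -> T:
    """Call `func`, retrying up to `max_retries` times with doubling delays."""
    attempt = 0
    while True:
        try:
            return func()
        except Exception:
            if attempt >= max_retries:
                raise  # retries exhausted, or disabled when max_retries == 0
            # Delay doubles on each failure: base, 2*base, 4*base, ...
            sleep(base_delay * (2 ** attempt))
            attempt += 1


# Demo: fail twice, then succeed; delays are recorded instead of slept.
calls = {"count": 0}
delays: List[float] = []


def flaky() -> str:
    calls["count"] += 1
    if calls["count"] < 3:
        raise RuntimeError("rate limited")
    return "ok"


outcome = call_with_backoff(flaky, max_retries=3, sleep=delays.append)
```

Passing `sleep` as a parameter is a testing convenience in this sketch, so the demo runs without waiting.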
30 changes: 30 additions & 0 deletions dlt/destinations/impl/lancedb/exceptions.py
@@ -0,0 +1,30 @@
from functools import wraps
from typing import (
    Any,
)

from lancedb.exceptions import MissingValueError, MissingColumnError  # type: ignore

from dlt.common.destination.exceptions import (
    DestinationUndefinedEntity,
    DestinationTerminalException,
)
from dlt.common.destination.reference import JobClientBase
from dlt.common.typing import TFun


def lancedb_error(f: TFun) -> TFun:
    @wraps(f)
    def _wrap(self: JobClientBase, *args: Any, **kwargs: Any) -> Any:
        try:
            return f(self, *args, **kwargs)
        except (
            FileNotFoundError,
            MissingValueError,
            MissingColumnError,
        ) as status_ex:
            raise DestinationUndefinedEntity(status_ex) from status_ex
        except Exception as e:
            raise DestinationTerminalException(e) from e

    return _wrap  # type: ignore[return-value]
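
The decorator above translates low-level errors into destination-level exception types. A standalone sketch of the same pattern, runnable without dlt or lancedb, might look like this. `UndefinedEntity`, `TerminalError`, `translate_errors` and `open_table` are hypothetical stand-ins, and `KeyError` takes the place of the LanceDB-specific exceptions.

```python
from functools import wraps
from typing import Any, Callable


class UndefinedEntity(Exception):
    """Hypothetical stand-in for dlt's DestinationUndefinedEntity."""


class TerminalError(Exception):
    """Hypothetical stand-in for dlt's DestinationTerminalException."""


def translate_errors(f: Callable[..., Any]) -> Callable[..., Any]:
    @wraps(f)
    def _wrap(*args: Any, **kwargs: Any) -> Any:
        try:
            return f(*args, **kwargs)
        except (FileNotFoundError, KeyError) as ex:
            # "Missing thing" errors map to the undefined-entity type.
            raise UndefinedEntity(ex) from ex
        except Exception as ex:
            # Anything else is reported as terminal.
            raise TerminalError(ex) from ex

    return _wrap


@translate_errors
def open_table(name: str) -> str:
    tables = {"events": "events-handle"}
    return tables[name]  # raises KeyError for an unknown table


try:
    open_table("missing")
except UndefinedEntity as err:
    caught = type(err.__cause__).__name__
```

The specific `except` clause has to come before the broad `except Exception`, otherwise the missing-entity cases would also be reported as terminal. Using `raise ... from ex` keeps the original error reachable through `__cause__`.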
53 changes: 53 additions & 0 deletions dlt/destinations/impl/lancedb/factory.py
@@ -0,0 +1,53 @@
import typing as t

from dlt.common.destination import Destination, DestinationCapabilitiesContext
from dlt.destinations.impl.lancedb.configuration import (
    LanceDBCredentials,
    LanceDBClientConfiguration,
)


if t.TYPE_CHECKING:
    from dlt.destinations.impl.lancedb.lancedb_client import LanceDBClient


class lancedb(Destination[LanceDBClientConfiguration, "LanceDBClient"]):
    spec = LanceDBClientConfiguration

    def _raw_capabilities(self) -> DestinationCapabilitiesContext:
        caps = DestinationCapabilitiesContext()
        caps.preferred_loader_file_format = "jsonl"
        caps.supported_loader_file_formats = ["jsonl"]

        caps.max_identifier_length = 200
        caps.max_column_identifier_length = 1024
        caps.max_query_length = 8 * 1024 * 1024
        caps.is_max_query_length_in_bytes = False
        caps.max_text_data_type_length = 8 * 1024 * 1024
        caps.is_max_text_data_type_length_in_bytes = False
        caps.supports_ddl_transactions = False

        caps.decimal_precision = (38, 18)
        caps.timestamp_precision = 6

        return caps

    @property
    def client_class(self) -> t.Type["LanceDBClient"]:
        from dlt.destinations.impl.lancedb.lancedb_client import LanceDBClient

        return LanceDBClient

    def __init__(
        self,
        credentials: t.Union[LanceDBCredentials, t.Dict[str, t.Any]] = None,
        destination_name: t.Optional[str] = None,
        environment: t.Optional[str] = None,
        **kwargs: t.Any,
    ) -> None:
        super().__init__(
            credentials=credentials,
            destination_name=destination_name,
            environment=environment,
            **kwargs,
        )
58 changes: 58 additions & 0 deletions dlt/destinations/impl/lancedb/lancedb_adapter.py
@@ -0,0 +1,58 @@
from typing import Any

from dlt.common.schema.typing import TColumnNames, TTableSchemaColumns
from dlt.destinations.utils import ensure_resource
from dlt.extract import DltResource


VECTORIZE_HINT = "x-lancedb-embed"


def lancedb_adapter(
    data: Any,
    embed: TColumnNames = None,
) -> DltResource:
    """Prepares data for the LanceDB destination by specifying which columns should be embedded.

    Args:
        data (Any): The data to be transformed. It can be raw data or an instance
            of DltResource. If raw data, the function wraps it into a DltResource
            object.
        embed (TColumnNames, optional): Specify columns to generate embeddings for.
            It can be a single column name as a string, or a list of column names.

    Returns:
        DltResource: A resource with applied LanceDB-specific hints.

    Raises:
        ValueError: If input for `embed` invalid or empty.

    Examples:
        >>> data = [{"name": "Marcel", "description": "Moonbase Engineer"}]
        >>> lancedb_adapter(data, embed="description")
        [DltResource with hints applied]
    """
    resource = ensure_resource(data)

    column_hints: TTableSchemaColumns = {}

    if embed:
        if isinstance(embed, str):
            embed = [embed]
        if not isinstance(embed, list):
            raise ValueError(
                "'embed' must be a list of column names or a single column name as a string."
            )

        for column_name in embed:
            column_hints[column_name] = {
                "name": column_name,
                VECTORIZE_HINT: True,  # type: ignore[misc]
            }

    if not column_hints:
        raise ValueError("A value for 'embed' must be specified.")
    else:
        resource.apply_hints(columns=column_hints)

    return resource
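
To see what the adapter attaches to a resource, the hint-building step can be isolated into a small function that runs without dlt. This is a sketch of the logic shown above, not part of the commit: `build_embed_hints` is a hypothetical helper, and only the `"x-lancedb-embed"` key and the validation rules are taken from the diff.

```python
from typing import Any, Dict, List, Union

VECTORIZE_HINT = "x-lancedb-embed"  # same hint key as in lancedb_adapter.py


def build_embed_hints(embed: Union[str, List[str], None]) -> Dict[str, Dict[str, Any]]:
    """Return per-column hints marking which columns should be embedded."""
    if embed:
        if isinstance(embed, str):
            embed = [embed]  # a single column name becomes a one-item list
        if not isinstance(embed, list):
            raise ValueError(
                "'embed' must be a list of column names or a single column name as a string."
            )
    if not embed:
        raise ValueError("A value for 'embed' must be specified.")
    return {name: {"name": name, VECTORIZE_HINT: True} for name in embed}


hints = build_embed_hints("description")
multi = build_embed_hints(["title", "description"])
```

In the adapter itself, the resulting mapping is passed to `resource.apply_hints(columns=...)`, so the embedding flag travels with the column schema rather than with the data.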