Skip to content

Commit

Permalink
feat: Qdrant destination support (#724)
Browse files Browse the repository at this point in the history
  • Loading branch information
Anush008 authored Nov 7, 2023
1 parent 7f2f042 commit e92cf75
Show file tree
Hide file tree
Showing 14 changed files with 5,230 additions and 3,778 deletions.
82 changes: 82 additions & 0 deletions .github/workflows/test_destination_qdrant.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1,82 @@
# Runs the loader test suite against the Qdrant destination on Linux, macOS and Windows.
name: test Qdrant

on:
  pull_request:
    branches:
      - master
      - devel
  workflow_dispatch:

env:
  DESTINATION__QDRANT__CREDENTIALS__LOCATION: ${{ secrets.DESTINATION__QDRANT__CREDENTIALS__LOCATION }}
  DESTINATION__QDRANT__CREDENTIALS__API_KEY: ${{ secrets.DESTINATION__QDRANT__CREDENTIALS__API_KEY }}

  # NOTE(review): this value looks mangled by e-mail obfuscation in this copy of the
  # file - restore the real DSN from the repository before relying on it.
  RUNTIME__SENTRY_DSN: https://[email protected]/4504819859914752
  RUNTIME__LOG_LEVEL: ERROR

  ACTIVE_DESTINATIONS: "[\"qdrant\"]"
  ALL_FILESYSTEM_DRIVERS: "[\"memory\"]"

jobs:
  get_docs_changes:
    uses: ./.github/workflows/get_docs_changes.yml
    # pull requests from forks are excluded by this condition
    if: ${{ !github.event.pull_request.head.repo.fork }}

  run_loader:
    name: Tests Qdrant loader
    needs: get_docs_changes
    # skip the (slow) loader tests when only docs changed
    if: needs.get_docs_changes.outputs.changes_outside_docs == 'true'
    strategy:
      fail-fast: false
      matrix:
        os: ["ubuntu-latest", "macos-latest", "windows-latest"]
    defaults:
      run:
        shell: bash
    runs-on: ${{ matrix.os }}

    steps:
      - name: Check out
        uses: actions/checkout@master

      - name: Setup Python
        # the id is required so that `steps.setup-python.outputs.python-version`
        # in the cache key below resolves to a value instead of an empty string
        id: setup-python
        uses: actions/setup-python@v4
        with:
          python-version: "3.10.x"

      - name: Install Poetry
        # NOTE(review): the action version looks mangled by e-mail obfuscation in this
        # copy of the file - restore the real `snok/install-poetry@<version>` reference.
        uses: snok/[email protected]
        with:
          virtualenvs-create: true
          virtualenvs-in-project: true
          installer-parallel: true

      - name: Load cached venv
        id: cached-poetry-dependencies
        uses: actions/cache@v3
        with:
          path: .venv
          # destination-specific suffix: the venv is installed with the qdrant extras,
          # so it must not share a cache entry with workflows installing other extras
          key: venv-${{ runner.os }}-${{ steps.setup-python.outputs.python-version }}-${{ hashFiles('**/poetry.lock') }}-qdrant

      - name: Install dependencies
        run: poetry install --no-interaction -E qdrant -E parquet

      - run: |
          poetry run pytest tests/load/
        if: runner.os != 'Windows'
        name: Run tests Linux/MAC

      - run: |
          poetry run pytest tests/load/
        if: runner.os == 'Windows'
        name: Run tests Windows
        shell: cmd

  # Single job that aggregates the matrix results into one status check.
  matrix_job_required_check:
    name: Qdrant loader tests
    needs: run_loader
    runs-on: ubuntu-latest
    # always run, so a failed or cancelled matrix job still produces a result here
    if: always()
    steps:
      - name: Check matrix job results
        if: contains(needs.*.result, 'failure') || contains(needs.*.result, 'cancelled')
        run: |
          echo "One or more matrix job tests failed or were cancelled. You may need to re-run them." && exit 1
5 changes: 4 additions & 1 deletion .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -138,4 +138,7 @@ logs/

# temp
tmp
**/tmp
**/tmp

# Qdrant embedding models cache
local_cache/
53 changes: 53 additions & 0 deletions dlt/destinations/qdrant/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,53 @@
from typing import Type

from dlt.common.schema.schema import Schema
from dlt.common.configuration import with_config, known_sections
from dlt.common.configuration.accessors import config
from dlt.common.destination.reference import (
JobClientBase,
DestinationClientConfiguration,
)
from dlt.common.destination import DestinationCapabilitiesContext
from dlt.destinations.qdrant.qdrant_adapter import qdrant_adapter

from dlt.destinations.qdrant.configuration import QdrantClientConfiguration


@with_config(
    spec=QdrantClientConfiguration,
    sections=(
        known_sections.DESTINATION,
        "qdrant",
    ),
)
def _configure(
    config: QdrantClientConfiguration = config.value,
) -> QdrantClientConfiguration:
    """Return the Qdrant client configuration received as `config`.

    The body is a plain pass-through. The `with_config` decorator is given
    `QdrantClientConfiguration` as the spec and the destination / "qdrant"
    sections.
    NOTE(review): presumably the decorator resolves and injects the
    configuration from those sections - confirm against `with_config`.
    """
    return config


def capabilities() -> DestinationCapabilitiesContext:
    """Build the capabilities descriptor of the Qdrant destination.

    Returns:
        DestinationCapabilitiesContext: a fresh context that accepts only
        `jsonl` load files, caps identifiers at 200 characters and column
        identifiers at 1024, limits queries and text values to 8 * 1024 * 1024
        (not counted in bytes) and reports no DDL transaction support.
    """
    # shared upper bound for both the query length and the text value length
    length_limit = 8 * 1024 * 1024

    context = DestinationCapabilitiesContext()

    # load file formats
    context.preferred_loader_file_format = "jsonl"
    context.supported_loader_file_formats = ["jsonl"]

    # identifier limits
    context.max_identifier_length = 200
    context.max_column_identifier_length = 1024

    # query / text limits, not expressed in bytes
    context.max_query_length = length_limit
    context.is_max_query_length_in_bytes = False
    context.max_text_data_type_length = length_limit
    context.is_max_text_data_type_length_in_bytes = False

    context.supports_ddl_transactions = False

    return context


def client(
    schema: Schema, initial_config: DestinationClientConfiguration = config.value
) -> JobClientBase:
    """Create the Qdrant job client for `schema`.

    Args:
        schema (Schema): Schema handed to the client.
        initial_config (DestinationClientConfiguration): Initial configuration;
            it is passed through `_configure` before being given to the client.

    Returns:
        JobClientBase: A `QdrantClient` instance.
    """
    # Imported inside the function rather than at module level.
    # NOTE(review): presumably so that importing this package does not load the
    # Qdrant client dependencies - confirm.
    from dlt.destinations.qdrant.qdrant_client import QdrantClient
    return QdrantClient(schema, _configure(initial_config))  # type: ignore


def spec() -> Type[QdrantClientConfiguration]:
    """Return the configuration class (not an instance) of the Qdrant destination."""
    return QdrantClientConfiguration
79 changes: 79 additions & 0 deletions dlt/destinations/qdrant/configuration.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,79 @@
from typing import Optional, Final

from dlt.common.configuration import configspec
from dlt.common.configuration.specs.base_configuration import BaseConfiguration, CredentialsConfiguration
from dlt.common.destination.reference import DestinationClientDwhConfiguration


@configspec
class QdrantCredentials(CredentialsConfiguration):
    """Connection credentials of a Qdrant instance."""

    # If `:memory:` - use in-memory Qdrant instance.
    # If `str` - use it as a `url` parameter.
    # If `None` - use default values for `host` and `port`
    location: Optional[str] = None
    # API key for authentication in Qdrant Cloud. Default: `None`
    # The default is spelled out so the field matches its comment and the
    # sibling `location` field instead of relying on the decorator to supply it.
    api_key: Optional[str] = None

    def __str__(self) -> str:
        """Return the configured location, or "localhost" when none is set."""
        return self.location or "localhost"


@configspec
class QdrantClientOptions(BaseConfiguration):
    # Optional connection settings of the Qdrant client.

    # Port of the REST API interface. Default: 6333
    port: int = 6333
    # Port of the gRPC interface. Default: 6334
    grpc_port: int = 6334
    # If `true` - use gRPC interface whenever possible in custom methods. Default: `False`
    prefer_grpc: bool = False
    # If `true` - use HTTPS(SSL) protocol. Default: `False`
    https: bool = False
    # If not `None` - add `prefix` to the REST URL path.
    # Example: `service/v1` will result in `http://localhost:6333/service/v1/{qdrant-endpoint}` for REST API.
    # Default: `None`
    prefix: Optional[str] = None
    # Timeout for REST and gRPC API requests.
    # Default: 5.0 seconds for REST and unlimited for gRPC
    timeout: Optional[int] = None
    # Host name of Qdrant service. If url and host are None, set to 'localhost'.
    # Default: `None`
    host: Optional[str] = None
    # Persistence path for QdrantLocal. Default: `None`
    path: Optional[str] = None


@configspec
class QdrantClientConfiguration(DestinationClientDwhConfiguration):
    """Configuration of the Qdrant destination client."""

    destination_name: Final[str] = "qdrant"  # type: ignore
    # character for the dataset separator
    dataset_separator: str = "_"

    # make it optional so an empty dataset name is allowed
    dataset_name: Optional[str] = None  # type: ignore

    # Batch size for generating embeddings
    embedding_batch_size: int = 32
    # Number of parallel processes for generating embeddings
    embedding_parallelism: int = 0

    # Batch size for uploading embeddings
    upload_batch_size: int = 64
    # Number of parallel processes for uploading embeddings
    upload_parallelism: int = 1
    # Number of retries for uploading embeddings
    upload_max_retries: int = 3

    # Qdrant client options
    options: QdrantClientOptions

    # Qdrant connection credentials
    credentials: QdrantCredentials

    # FlagEmbedding model to use
    # Find the list here. https://qdrant.github.io/fastembed/examples/Supported_Models/.
    model: str = "BAAI/bge-small-en"

    def fingerprint(self) -> str:
        """Return the configured Qdrant location used as connection fingerprint.

        Returns:
            str: `credentials.location` as configured, or an empty string when
            the credentials or the location are not set, so the declared `str`
            return type always holds (`location` defaults to `None`).
        """
        if self.credentials and self.credentials.location:
            return self.credentials.location
        return ""
67 changes: 67 additions & 0 deletions dlt/destinations/qdrant/qdrant_adapter.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,67 @@
from typing import Any

from dlt.common.schema.typing import TColumnNames, TTableSchemaColumns
from dlt.extract.decorators import resource as make_resource
from dlt.extract.source import DltResource

# Key of the column hint that `qdrant_adapter` sets to True on every column
# selected for embedding.
VECTORIZE_HINT = "x-qdrant-embed"

def qdrant_adapter(
    data: Any,
    embed: TColumnNames = None,
) -> DltResource:
    """Mark columns of `data` for embedding by the Qdrant destination.

    Args:
        data (Any): Raw data or a DltResource. Anything that is not already a
            DltResource is wrapped into one; objects without a `__name__`
            attribute get the resource name "content".
        embed (TColumnNames, optional): Column(s) to generate embeddings for,
            given as a single column name or as a list of column names.

    Returns:
        DltResource: The resource with the qdrant embedding hint applied to
        every column listed in `embed`.

    Raises:
        ValueError: If `embed` is neither a string nor a list, or if it is
            missing or empty.

    Examples:
        >>> data = [{"name": "Anush", "description": "Integrations Hacker"}]
        >>> qdrant_adapter(data, embed="description")
        [DltResource with hints applied]
    """
    # reuse an existing resource, otherwise wrap the raw data in a new one
    if isinstance(data, DltResource):
        resource = data
    else:
        fallback_name = None if hasattr(data, "__name__") else "content"
        resource = make_resource(data, name=fallback_name)

    # normalise `embed` into a list of column names; falsy values select nothing
    selected_columns = []
    if embed:
        selected_columns = [embed] if isinstance(embed, str) else embed
        if not isinstance(selected_columns, list):
            raise ValueError(
                "embed must be a list of column names or a single "
                "column name as a string"
            )

    column_hints: TTableSchemaColumns = {
        column_name: {
            "name": column_name,
            VECTORIZE_HINT: True,  # type: ignore
        }
        for column_name in selected_columns
    }

    # without at least one column there is nothing to hint
    if not column_hints:
        raise ValueError(
            "A value for 'embed' must be specified.")

    resource.apply_hints(columns=column_hints)
    return resource
Loading

0 comments on commit e92cf75

Please sign in to comment.