
πŸŽ‰ New Destination deepset #48875

Draft pull request: wants to merge 34 commits into base branch `master`.
7edf0fa feat: add new destination for deepset cloud (abrahamy, Dec 10, 2024)
f97568e chore: format code (abrahamy, Dec 10, 2024)
9d48dc4 bump base image version (abrahamy, Dec 10, 2024)
1b6afc5 add spec, icon and the config model (abrahamy, Dec 10, 2024)
9c80988 implement api for health check and file uploads (abrahamy, Dec 11, 2024)
bf9e3bd wip: implement destination check interface (abrahamy, Dec 11, 2024)
5448b32 Merge branch 'master' into feat/new-destination-deepset (abrahamy, Dec 11, 2024)
d527120 chore: format code (abrahamy, Dec 11, 2024)
c82e2d3 chore: format code (abrahamy, Dec 11, 2024)
e621354 refactor: cleanup client initialization (abrahamy, Dec 11, 2024)
cd7a7fe code: implement destination class and writer (abrahamy, Dec 12, 2024)
0e027a5 chore: format code (abrahamy, Dec 12, 2024)
ed6cd03 fix: from_destination_sync_mode on WriteMode enum (abrahamy, Dec 12, 2024)
8212c49 chore: format code (abrahamy, Dec 12, 2024)
61060c9 Merge branch 'master' into feat/new-destination-deepset (abrahamy, Dec 16, 2024)
a5765ff Add support for SourceMicrosoftSharepoint document file type format (abrahamy, Dec 16, 2024)
8afa444 chore: format fix (abrahamy, Dec 16, 2024)
c54e9c2 chore: format code (abrahamy, Dec 16, 2024)
be20db3 rename models (abrahamy, Dec 16, 2024)
6059666 reduce default retry count (abrahamy, Dec 16, 2024)
1ebf237 cleanup (abrahamy, Dec 16, 2024)
5dd8d46 refactor meta (abrahamy, Dec 16, 2024)
0dad431 test api health check endpoint (abrahamy, Dec 16, 2024)
35cb826 chore: format code (abrahamy, Dec 16, 2024)
a22528c Merge branch 'master' into feat/new-destination-deepset (abrahamy, Dec 16, 2024)
6eb3912 Merge branch 'master' into feat/new-destination-deepset (abrahamy, Dec 17, 2024)
03f20fd add util tests (abrahamy, Dec 17, 2024)
9cbaae6 chore: format code (abrahamy, Dec 17, 2024)
5056560 fix: from record (abrahamy, Dec 17, 2024)
adae6d6 Add tests for writer (abrahamy, Dec 17, 2024)
84bbae2 add dedicated methods for create trace and log messages (abrahamy, Dec 17, 2024)
b2a3595 cover more test cases (abrahamy, Dec 17, 2024)
b900b5f add tests for destination (abrahamy, Dec 17, 2024)
97d47d4 Merge branch 'master' into feat/new-destination-deepset (abrahamy, Dec 17, 2024)
airbyte-integrations/connectors/destination-deepset/README.md (103 additions, 0 deletions)
# Deepset Destination

This is the repository for the Deepset destination connector, written in Python.
For information about how to use this connector within Airbyte, see [the documentation](https://docs.airbyte.com/integrations/destinations/deepset).

## Local development

### Prerequisites

* Python (`^3.9`)
* Poetry (`^1.7`) - installation instructions [here](https://python-poetry.org/docs/#installation)



### Installing the connector

From this connector directory, run:
```bash
poetry install --with dev
```


#### Create credentials

**If you are a community contributor**, follow the instructions in the [documentation](https://docs.airbyte.com/integrations/destinations/deepset)
to generate the necessary credentials. Then create a file `secrets/config.json` conforming to the `destination_deepset/spec.json` file.
Note that the `secrets` directory is gitignored by default, so there is no danger of accidentally checking in sensitive information.
See `integration_tests/sample_config.json` for a sample config file.
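
A plausible shape for `secrets/config.json`, inferred from the fields the connector's `DeepsetCloudApi` reads (`base_url`, `api_key`, `workspace`, `retries`); the values below are placeholders and the authoritative schema is `destination_deepset/spec.json`:

```json
{
  "base_url": "https://api.cloud.deepset.ai",
  "api_key": "<your deepset Cloud API key>",
  "workspace": "<your deepset Cloud workspace name>",
  "retries": 5
}
```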

**If you are an Airbyte core member**, copy the credentials in Lastpass under the secret name `destination deepset test creds`
and place them into `secrets/config.json`.

### Locally running the connector
```bash
poetry run destination-deepset spec
poetry run destination-deepset check --config secrets/config.json
poetry run destination-deepset write --config secrets/config.json --catalog sample_files/configured_catalog.json
```

### Running tests

To run tests locally, from the connector directory run:

```bash
poetry run pytest tests
```

### Building the docker image

1. Install [`airbyte-ci`](https://github.com/airbytehq/airbyte/blob/master/airbyte-ci/connectors/pipelines/README.md)
2. Run the following command to build the docker image:
```bash
airbyte-ci connectors --name=destination-deepset build
```

An image will be available on your host with the tag `airbyte/destination-deepset:dev`.

### Running as a docker container

Run any of the connector commands in the container as follows:
```bash
docker run --rm airbyte/destination-deepset:dev spec
docker run --rm -v $(pwd)/secrets:/secrets airbyte/destination-deepset:dev check --config /secrets/config.json
docker run --rm -v $(pwd)/secrets:/secrets -v $(pwd)/integration_tests:/integration_tests airbyte/destination-deepset:dev write --config /secrets/config.json --catalog /integration_tests/configured_catalog.json
```

### Running our CI test suite

You can run our full test suite locally using [`airbyte-ci`](https://github.com/airbytehq/airbyte/blob/master/airbyte-ci/connectors/pipelines/README.md):

```bash
airbyte-ci connectors --name=destination-deepset test
```

### Customizing acceptance tests

Customize the `acceptance-test-config.yml` file to configure acceptance tests. See [Connector Acceptance Tests](https://docs.airbyte.com/connector-development/testing-connectors/connector-acceptance-tests-reference) for more information.
If your connector requires creating or destroying resources for use during acceptance tests, create fixtures for them and place them inside `integration_tests/acceptance.py`.

### Dependency Management

All of your dependencies should be managed via Poetry.
To add a new dependency, run:

```bash
poetry add <package-name>
```

Please commit the changes to `pyproject.toml` and `poetry.lock` files.

## Publishing a new version of the connector

You've checked out the repo, implemented a million dollar feature, and you're ready to share your changes with the world. Now what?
1. Make sure your changes are passing our test suite: `airbyte-ci connectors --name=destination-deepset test`
2. Bump the connector version (please follow [semantic versioning for connectors](https://docs.airbyte.com/contributing-to-airbyte/resources/pull-requests-handbook/#semantic-versioning-for-connectors)):
- bump the `dockerImageTag` value in `metadata.yaml`
- bump the `version` value in `pyproject.toml`
3. Make sure the `metadata.yaml` content is up to date.
4. Make sure the connector documentation and its changelog are up to date (`docs/integrations/destinations/deepset.md`).
5. Create a Pull Request: use [our PR naming conventions](https://docs.airbyte.com/contributing-to-airbyte/resources/pull-requests-handbook/#pull-request-title-convention).
6. Pat yourself on the back for being an awesome contributor.
7. Someone from Airbyte will take a look at your PR and iterate with you to merge it into master.
8. Once your PR is merged, the new version of the connector will be automatically published to Docker Hub and our connector registry.
destination_deepset/__init__.py (9 additions)
#
# Copyright (c) 2024 Airbyte, Inc., all rights reserved.
#
from __future__ import annotations

from .destination import DestinationDeepset


__all__ = ["DestinationDeepset"]
destination_deepset/api.py (125 additions)
# Copyright (c) 2024 Airbyte, Inc., all rights reserved.

from __future__ import annotations

from uuid import UUID

import httpx
from destination_deepset.models import SUPPORTED_FILE_EXTENSIONS, DeepsetCloudConfig, DeepsetCloudFile, WriteMode
from httpx import HTTPError, HTTPStatusError
from tenacity import Retrying, retry_if_exception_type, stop_after_attempt, wait_random_exponential


class APIError(RuntimeError):
    pass


class ConfigurationError(ValueError, APIError):
    """Raised when the configuration is missing or incorrect."""

    def __str__(self) -> str:
        return "Configuration is missing, cannot create an HTTP client."


class FileTypeError(APIError):
    """Raised when the file's extension does not match one of the supported file types."""

    def __str__(self) -> str:
        return f"File type not supported. Supported file extensions: {SUPPORTED_FILE_EXTENSIONS}"


class FileUploadError(APIError):
    """Raised when the server is unable to successfully upload the file."""

    def __str__(self) -> str:
        return "File upload failed."


class DeepsetCloudApi:
    def __init__(self, config: DeepsetCloudConfig) -> None:
        self.config = config
        self._client: httpx.Client | None = None

        # retry backoff settings, in seconds
        self.max = 60
        self.multiplier = 0.5

    @property
    def client(self) -> httpx.Client:
        if not self.config:
            raise ConfigurationError

        if self._client is None:
            self._client = httpx.Client(
                base_url=self.config.base_url,
                headers={
                    "Accept": "application/json",
                    "Authorization": f"Bearer {self.config.api_key}",
                    "X-Client-Source": "deepset-cloud-airbyte",
                },
                follow_redirects=True,
            )

        return self._client

    def health_check(self) -> bool:
        """Check the health of the deepset Cloud API.

        Returns:
            bool: `True` if the health check succeeded, `False` otherwise.
        """
        try:
            # tenacity's `retry` is a decorator, not a context manager; `Retrying`
            # is the object meant for inline retry loops like this one.
            for attempt in Retrying(
                retry=retry_if_exception_type(HTTPError),
                stop=stop_after_attempt(self.config.retries),
                wait=wait_random_exponential(multiplier=self.multiplier, max=self.max),
                reraise=True,
            ):
                with attempt:
                    response = self.client.get("/health")
                    response.raise_for_status()
        except Exception:
            return False
        else:
            return True

    def upload(self, file: DeepsetCloudFile, write_mode: WriteMode = WriteMode.KEEP) -> UUID:
        """Upload a file to deepset Cloud.

        Args:
            file (DeepsetCloudFile): The file to upload
            write_mode (WriteMode): How to handle a name collision with an already uploaded file

        Raises:
            APIError: Raised whenever the file upload fails

        Returns:
            UUID: The unique identifier of the uploaded file
        """
        if file.extension not in SUPPORTED_FILE_EXTENSIONS:
            raise FileTypeError

        try:
            for attempt in Retrying(
                retry=retry_if_exception_type(HTTPError),
                stop=stop_after_attempt(self.config.retries),
                wait=wait_random_exponential(multiplier=self.multiplier, max=self.max),
                reraise=True,
            ):
                with attempt:
                    response = self.client.post(
                        f"/api/v1/workspaces/{self.config.workspace}/files",
                        files={"file": (file.name, file.content)},
                        data={"meta": file.meta_as_string},
                        params={"write_mode": write_mode},
                    )
                    response.raise_for_status()

            if file_id := response.json().get("file_id"):
                return UUID(file_id)
        except HTTPStatusError as ex:
            status_code, response_text = ex.response.status_code, ex.response.text
            message = f"File upload failed: {status_code = }, {response_text = }."
            raise FileUploadError(message) from ex
        except Exception as ex:
            raise FileUploadError from ex

        raise FileUploadError
destination_deepset/destination.py (87 additions)
#
# Copyright (c) 2024 Airbyte, Inc., all rights reserved.
#
from __future__ import annotations

import logging
from collections.abc import Iterable, Mapping
from typing import Any

from airbyte_cdk.destinations import Destination
from airbyte_cdk.models import AirbyteConnectionStatus, AirbyteMessage, ConfiguredAirbyteCatalog, Status, Type
from destination_deepset.api import DeepsetCloudApi
from destination_deepset.models import DeepsetCloudConfig, WriteMode
from destination_deepset.writer import DeepsetCloudFileWriter, WriterError

logger = logging.getLogger("airbyte")


class DestinationDeepset(Destination):
    def get_deepset_cloud_api(self, config: Mapping[str, Any]) -> DeepsetCloudApi:
        deepset_cloud_config = DeepsetCloudConfig.parse_obj(config)
        return DeepsetCloudApi(deepset_cloud_config)

    def write(
        self,
        config: Mapping[str, Any],
        configured_catalog: ConfiguredAirbyteCatalog,
        input_messages: Iterable[AirbyteMessage],
    ) -> Iterable[AirbyteMessage]:
        """Reads the input stream of messages, config, and catalog to write data to the destination.

        This method returns an iterable (typically a generator of AirbyteMessages via yield) containing state messages
        received in the input message stream. Outputting a state message means that every AirbyteRecordMessage which
        came before it has been successfully persisted to the destination. This ensures fault tolerance: if a sync
        fails before fully completing, the source is given the last state message output from this method as the
        starting point of the next sync.

        Args:
            config (Mapping[str, Any]): dict of JSON configuration matching the configuration declared in spec.json
            configured_catalog (ConfiguredAirbyteCatalog): The configured catalog describing the schema of the data
                being received and how it should be persisted in the destination
            input_messages (Iterable[AirbyteMessage]): The stream of input messages received from the source

        Returns:
            Iterable[AirbyteMessage]: Iterable of AirbyteStateMessages wrapped in AirbyteMessage structs
        """
        writer = DeepsetCloudFileWriter.factory(config=config)
        streams = {s.stream.name: s.destination_sync_mode for s in configured_catalog.streams}

        for message in input_messages:
            match message.type:
                case Type.STATE:
                    yield message
                case Type.RECORD:
                    if (stream := message.record.stream) not in streams:
                        logger.debug(f"Stream {stream} was not present in configured streams, skipping")
                        continue

                    destination_sync_mode = streams[stream]
                    write_mode = WriteMode.from_destination_sync_mode(destination_sync_mode)

                    yield writer.write(message=message, write_mode=write_mode)
                case _:
                    continue

    def check(self, logger: logging.Logger, config: Mapping[str, Any]) -> AirbyteConnectionStatus:
        """Tests whether the input configuration can be used to successfully connect to the destination with the
        needed permissions, e.g. whether a provided API token or password can be used to connect and write to the
        destination.

        Args:
            logger (logging.Logger): Logging object to display debug/info/error to the logs (logs will not be
                accessible via the Airbyte UI if they are not passed to this logger)
            config (Mapping[str, Any]): JSON object containing the configuration of this destination, as specified in
                the properties of the spec.json file

        Returns:
            AirbyteConnectionStatus: AirbyteConnectionStatus indicating Success or Failure
        """
        try:
            writer = DeepsetCloudFileWriter.factory(config=config)
        except WriterError:
            logger.exception("Failed to initialize writer!")
        else:
            if writer.client.health_check():
                return AirbyteConnectionStatus(status=Status.SUCCEEDED)

        return AirbyteConnectionStatus(status=Status.FAILED, message="Connection is down.")
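
The message dispatch inside `write` can be sketched with hypothetical stand-in types (`Msg` and `dispatch` are illustrations; the real connector uses `AirbyteMessage` and the deepset writer): state messages pass through as checkpoints, records for configured streams are written, and everything else is skipped.

```python
from dataclasses import dataclass
from typing import Iterable, Iterator, Optional, Set


@dataclass
class Msg:
    type: str                     # "STATE" or "RECORD"
    stream: Optional[str] = None  # stream name, set for records


def dispatch(messages: Iterable[Msg], configured: Set[str]) -> Iterator[str]:
    """Pass state messages through; write records only for configured streams."""
    for m in messages:
        if m.type == "STATE":
            # Checkpoint: every record before this point has been persisted.
            yield "state"
        elif m.type == "RECORD" and m.stream in configured:
            # Stands in for writer.write(message=..., write_mode=...).
            yield f"wrote:{m.stream}"
        # Other message types (logs, traces) are skipped.


out = list(dispatch([Msg("RECORD", "a"), Msg("STATE"), Msg("RECORD", "b")], {"a"}))
```

Records for stream `b` are dropped because `b` is not in the configured set, mirroring the "not present in configured streams" branch above.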