Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[WIP] Notebook-friendly connectors as importable classes #2685

Draft
wants to merge 9 commits into
base: main
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -18,3 +18,7 @@ config.yml

**/venv
scripts/stack/connectors-config

# package related
package/elastic_connectors
package/dist
31 changes: 30 additions & 1 deletion Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,6 @@ bin/elastic-ingest: bin/python
bin/black: bin/python
bin/pip install -r requirements/$(ARCH).txt
bin/pip install -r requirements/tests.txt


bin/pytest: bin/python
bin/pip install -r requirements/$(ARCH).txt
Expand Down Expand Up @@ -85,3 +84,33 @@ docker-run:

docker-push:
docker push $(DOCKER_IMAGE_NAME):$(VERSION)-SNAPSHOT

bin/package-dev: requirements/package-dev.txt
bin/pip install -r requirements/$(ARCH).txt
bin/pip install -r requirements/package-dev.txt

generate-connector-package-code: bin/package-dev
bin/python scripts/package/codegen/generate_connectors.py
bin/python scripts/package/codegen/generate_connectors_init.py

# Move everything under `elastic_connectors` temporary folder
generate-connector-package: generate-connector-package-code
mkdir -p package/elastic_connectors
cp -r package/* package/elastic_connectors
rm -rf package/elastic_connectors/elastic_connectors
cp -r connectors requirements package/elastic_connectors
bin/python scripts/package/update_imports.py

# Clean temporary folder and distribution files
clean-connector-package:
cd package && rm -rf elastic_connectors build dist *.egg-info

generate-connector-package-docs:
PYTHONPATH=./package lazydocs generated --remove-package-prefix --no-watermark --output-path package/docs --overview-file README

# Build the connector package
build-connector-package: clean-connector-package generate-connector-package generate-connector-package-docs
cd package && ../bin/python setup.py sdist bdist_wheel

publish-connector-package: build-connector-package
cd package && twine upload --repository testpypi dist/*
63 changes: 63 additions & 0 deletions package/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,63 @@
# elastic-connectors

## Overview

`elastic-connectors` is an async-first Python package that provides connectors to various third-party services. Each connector class in this package exposes an asynchronous method to fetch documents from the third-party service.

## Installation

To install the package, use pip:

```bash
pip install elastic-connectors
```

## Usage

### Importing a Connector
Each connector module can be imported as follows:

```python
from elastic_connectors import SharepointOnlineConnector
```

### Constructor
The constructor for each connector module requires arguments relevant to the third-party integration along with optional parameters:

- `logger` (logging.Logger, optional): Logger instance. Defaults to None.
- `download_content` (bool, optional): Flag to determine if content should be downloaded. Defaults to True.

### Methods

Each connector module exposes the following asynchronous method to fetch the data from a 3rd party source:

```python
async def async_get_docs(self) -> AsyncIterator[Dict]:
"""
Asynchronously retrieves documents from the third-party service.

Yields:
AsyncIterator[Dict]: An asynchronous iterator of dictionaries containing document data.
"""
```

### Example
Below is an example demonstrating how to use a connector module

```python
docs = []

async with SharepointOnlineConnector(
tenant_id=SPO_TENANT_ID,
tenant_name=SPO_TENANT_NAME,
client_id=SPO_CLIENT_ID,
secret_value=SPO_CLIENT_SECRET
) as connector:
spo_docs = []
async for doc in connector.async_get_docs():
spo_docs.append(doc)
```

### API overview

See [API overview](./docs/README.md) with all available connectors.
99 changes: 99 additions & 0 deletions package/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,99 @@
# Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
# or more contributor license agreements. Licensed under the Elastic License 2.0;
# you may not use this file except in compliance with the Elastic License 2.0.
#
#
# This is a generated code. Do not modify directly.
# Run `make generate_connector_package` to update.


from .generated.azure_blob_storage import AzureBlobStorageConnector

from .generated.box import BoxConnector

from .generated.confluence import ConfluenceConnector

from .generated.dir import DirectoryConnector

from .generated.dropbox import DropboxConnector

from .generated.github import GitHubConnector

from .generated.gmail import GMailConnector

from .generated.google_cloud_storage import GoogleCloudStorageConnector

from .generated.google_drive import GoogleDriveConnector

from .generated.graphql import GraphQLConnector

from .generated.jira import JiraConnector

from .generated.microsoft_teams import MicrosoftTeamsConnector

from .generated.mongodb import MongoConnector

from .generated.mssql import MSSQLConnector

from .generated.mysql import MySqlConnector

from .generated.network_drive import NASConnector

from .generated.notion import NotionConnector

from .generated.onedrive import OneDriveConnector

from .generated.oracle import OracleConnector

from .generated.outlook import OutlookConnector

from .generated.postgresql import PostgreSQLConnector

from .generated.redis import RedisConnector

from .generated.s3 import S3Connector

from .generated.salesforce import SalesforceConnector

from .generated.servicenow import ServiceNowConnector

from .generated.sharepoint_online import SharepointOnlineConnector

from .generated.sharepoint_server import SharepointServerConnector

from .generated.slack import SlackConnector

from .generated.zoom import ZoomConnector


__all__ = [
"AzureBlobStorageConnector",
"BoxConnector",
"ConfluenceConnector",
"DirectoryConnector",
"DropboxConnector",
"GitHubConnector",
"GMailConnector",
"GoogleCloudStorageConnector",
"GoogleDriveConnector",
"GraphQLConnector",
"JiraConnector",
"MicrosoftTeamsConnector",
"MongoConnector",
"MSSQLConnector",
"MySqlConnector",
"NASConnector",
"NotionConnector",
"OneDriveConnector",
"OracleConnector",
"OutlookConnector",
"PostgreSQLConnector",
"RedisConnector",
"S3Connector",
"SalesforceConnector",
"ServiceNowConnector",
"SharepointOnlineConnector",
"SharepointServerConnector",
"SlackConnector",
"ZoomConnector",
]
142 changes: 142 additions & 0 deletions package/connector_base.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,142 @@
#
# Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
# or more contributor license agreements. Licensed under the Elastic License 2.0;
# you may not use this file except in compliance with the Elastic License 2.0.
#

import base64
import logging
from typing import AsyncIterator, Dict

from tika import parser

from connectors.es.settings import TIMESTAMP_FIELD


def _extract_content_with_tika(b64_content: str) -> str:
"""
Extracts text content from a base64-encoded binary content using Tika.

Args:
b64_content (str): Base64 encoded content.

Returns:
str: Extracted text content.
"""
binary_data = base64.b64decode(b64_content)

# Parse the binary data using Tika
parsed = parser.from_buffer(binary_data)

# Extract text and metadata
text = parsed.get("content", "")

return text


class ConnectorBase:
def __init__(self, data_provider, logger=None, download_content=True):
"""
Initializes the ConnectorBase instance.

Args:
data_provider: An instance of the data provider.
logger (logging.Logger, optional): Logger instance. Defaults to None.
download_content (bool, optional): Flag to determine if content should be downloaded. Defaults to True.
"""
self.data_provider = data_provider
self.download_content = download_content

if logger is None:
logger = logging.getLogger("elastic-connectors")
self.logger = logger
self.data_provider.set_logger(logger)

def get_configuration(self):
"""
Gets the configuration from the data provider.

Returns:
The configuration of the data provider.
"""
return self.data_provider.configuration

async def validate(self):
"""
Validates the data provider configuration and pings the data provider.

Raises:
Exception: If validation or ping fails.
"""
try:
await self.data_provider.validate_config()
await self.ping()
except Exception as e:
self.logger.error("Validation failed", exc_info=True)
raise e

async def ping(self):
"""
Pings the data provider.

Raises:
Exception: If ping fails.
"""
try:
return await self.data_provider.ping()
except Exception as e:
self.logger.error("Ping failed", exc_info=True)
raise e

async def async_get_docs(self) -> AsyncIterator[Dict]:
"""
Asynchronously retrieves documents from the data provider.

Yields:
dict: A document from the data provider.

Raises:
Exception: If an error occurs while extracting content.
"""
async for doc, lazy_download in self.data_provider.get_docs(filtering=None):

doc["id"] = doc.pop("_id")

if lazy_download is not None and self.download_content:
# content downloaded and represented in a binary format {'_attachment': <binary data>}
try:
data = await lazy_download(
doit=True, timestamp=doc[TIMESTAMP_FIELD]
)
# binary to string conversion
binary_data = data.get("_attachment", None)

text = _extract_content_with_tika(binary_data)

doc.update({"body": text})
except Exception as e:
print(f"Error extracting content: {e}")

yield doc

async def close(self):
"""
Closes the data provider connection.
"""
await self.data_provider.close()

async def __aenter__(self):
"""
Asynchronous context manager entry. Validates the configuration.

Returns:
ConnectorBase: The instance itself.
"""
await self.validate()
return self

async def __aexit__(self, exc_type, exc_value, traceback):
"""
Asynchronous context manager exit. Closes the data provider connection.
"""
await self.close()
4 changes: 4 additions & 0 deletions package/docs/.pages
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
title: API Reference
nav:
- Overview: README.md
- ...
Loading