Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[1/n][dagster-airbyte] Scaffold AirbyteCloudWorkspace and AirbyteClient for rework #26204

Merged
merged 5 commits into from
Dec 5, 2024
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
115 changes: 115 additions & 0 deletions python_modules/libraries/dagster-airbyte/dagster_airbyte/resources.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,8 +17,11 @@
get_dagster_logger,
resource,
)
from dagster._annotations import experimental
from dagster._config.pythonic_config import infer_schema_from_config_class
from dagster._core.definitions.resource_definition import dagster_maintained_resource
from dagster._record import record
from dagster._serdes.serdes import whitelist_for_serdes
from dagster._utils.cached_method import cached_method
from dagster._utils.merger import deep_merge_dicts
from pydantic import Field, PrivateAttr
Expand Down Expand Up @@ -791,3 +794,115 @@ def airbyte_cloud_resource(context) -> AirbyteCloudResource:

"""
return AirbyteCloudResource.from_resource_context(context)


# -------------
# Resources v2
# -------------


@whitelist_for_serdes
@record
class AirbyteConnection:
"""Represents an Airbyte connection, based on data as returned from the API."""

@classmethod
def from_connection_details(
cls,
connection_details: Mapping[str, Any],
) -> "AirbyteConnection":
raise NotImplementedError()


@whitelist_for_serdes
@record
class AirbyteDestination:
"""Represents an Airbyte destination, based on data as returned from the API."""

@classmethod
def from_destination_details(
cls,
destination_details: Mapping[str, Any],
) -> "AirbyteDestination":
raise NotImplementedError()


@record
class AirbyteWorkspaceData:
Copy link
Contributor Author

@maximearmstrong maximearmstrong Dec 2, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The workspace data is not specific to Airbyte Cloud, which is why this object is named AirbyteWorkspaceData

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

feels worth mentioning that in the docstring, that it applies to both airbyte OSS and airbyte cloud potentially? Or at least a comment

"""A record representing all content in an Airbyte workspace.
This applies to both Airbyte OSS and Cloud.
"""

connections_by_id: Mapping[str, AirbyteConnection]
destinations_by_id: Mapping[str, AirbyteDestination]


@experimental
class AirbyteCloudClient:
"""This class exposes methods on top of the Airbyte APIs for Airbyte Cloud."""

_access_token_value: Optional[str] = PrivateAttr(default=None)
_access_token_timestamp: Optional[float] = PrivateAttr(default=None)

def __init__(
self,
workspace_id: str,
client_id: str,
client_secret: str,
):
self.workspace_id = workspace_id
self.client_id = client_id
self.client_secret = client_secret

@property
@cached_method
def _log(self) -> logging.Logger:
return get_dagster_logger()

@property
def api_base_url(self) -> str:
raise NotImplementedError()

def _make_request(
self, method: str, endpoint: str, data: Optional[str] = None
) -> Mapping[str, Any]:
raise NotImplementedError()

def get_connections(self) -> Mapping[str, Any]:
"""Fetches all connections of an Airbyte workspace from the Airbyte API."""
raise NotImplementedError()

def get_destination_details(self, destination_id: str) -> Mapping[str, Any]:
"""Fetches details about a given destination from the Airbyte API."""
raise NotImplementedError()


@experimental
class AirbyteCloudWorkspace(ConfigurableResource):
"""This class represents a Airbyte Cloud workspace and provides utilities
to interact with Airbyte APIs.
"""

workspace_id: str = Field(..., description="The Airbyte Cloud workspace ID")
client_id: str = Field(..., description="The Airbyte Cloud client ID.")
client_secret: str = Field(..., description="The Airbyte Cloud client secret.")

_client: AirbyteCloudClient = PrivateAttr(default=None)

@cached_method
def get_client(self) -> AirbyteCloudClient:
return AirbyteCloudClient(
workspace_id=self.workspace_id,
client_id=self.client_id,
client_secret=self.client_secret,
)

def fetch_airbyte_workspace_data(
self,
) -> AirbyteWorkspaceData:
"""Retrieves all Airbyte content from the workspace and returns it as a AirbyteWorkspaceData object.

Returns:
AirbyteWorkspaceData: A snapshot of the Airbyte workspace's content.
"""
raise NotImplementedError()