Skip to content

Commit

Permalink
Regression tests: run with airbyte-ci (#37440)
Browse files Browse the repository at this point in the history
clnoll authored Apr 25, 2024

Verified

This commit was signed with the committer’s verified signature.
shorwood Stanley Horwood
1 parent c4ad3d9 commit 7bd0324
Showing 44 changed files with 828 additions and 544 deletions.
4 changes: 2 additions & 2 deletions .github/workflows/airbyte-ci-tests.yml
Original file line number Diff line number Diff line change
@@ -99,7 +99,7 @@ jobs:
gcs_credentials: ${{ secrets.METADATA_SERVICE_PROD_GCS_CREDENTIALS }}
git_branch: ${{ github.head_ref }}
git_revision: ${{ steps.fetch_last_commit_id_pr.outputs.commit_id }}
github_token: ${{ github.token }}
github_token: ${{ secrets.GH_PAT_MAINTENANCE_OSS }}
sentry_dsn: ${{ secrets.SENTRY_AIRBYTE_CI_DSN }}
subcommand: "test --modified"

@@ -116,6 +116,6 @@ jobs:
gcs_credentials: ${{ secrets.METADATA_SERVICE_PROD_GCS_CREDENTIALS }}
git_branch: ${{ steps.extract_branch.outputs.branch }}
git_revision: ${{ steps.fetch_last_commit_id_pr.outputs.commit_id }}
github_token: ${{ github.token }}
github_token: ${{ secrets.GH_PAT_MAINTENANCE_OSS }}
sentry_dsn: ${{ secrets.SENTRY_AIRBYTE_CI_DSN }}
subcommand: "test ${{ inputs.airbyte_ci_subcommand}}"
3 changes: 3 additions & 0 deletions airbyte-ci/connectors/live-tests/README.md
Original file line number Diff line number Diff line change
@@ -237,6 +237,9 @@ The traffic recorded on the control connector is passed to the target connector

## Changelog

### 0.16.0
Enable running with airbyte-ci.

### 0.15.0
Automatic retrieval of connection objects for regression tests. The connection id is not required anymore.

337 changes: 170 additions & 167 deletions airbyte-ci/connectors/live-tests/poetry.lock

Large diffs are not rendered by default.

9 changes: 6 additions & 3 deletions airbyte-ci/connectors/live-tests/pyproject.toml
Original file line number Diff line number Diff line change
@@ -57,8 +57,11 @@ select = ["I", "F"]
known-first-party = ["connection-retriever"]

[tool.poe.tasks]
format = "ruff format src"
test = "pytest tests"
lint = "ruff check src"
type_check = "mypy src --disallow-untyped-defs"
pre-push = ["format", "lint", "test", "type_check"]
pre-push = []

[tool.airbyte_ci]
optional_poetry_groups = ["dev"]
poe_tasks = []
required_environment_variables = ["DOCKER_HUB_USERNAME", "DOCKER_HUB_PASSWORD"]
1 change: 1 addition & 0 deletions airbyte-ci/connectors/live-tests/src/live_tests/cli.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
# Copyright (c) 2023 Airbyte, Inc., all rights reserved.
from __future__ import annotations

import asyncclick as click
from live_tests.debug.cli import debug_cmd
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
# Copyright (c) 2023 Airbyte, Inc., all rights reserved.
from __future__ import annotations

from .base_backend import BaseBackend
from .duckdb_backend import DuckDbBackend
Original file line number Diff line number Diff line change
@@ -1,7 +1,8 @@
# Copyright (c) 2023 Airbyte, Inc., all rights reserved.
from __future__ import annotations

from abc import ABC, abstractmethod
from typing import Iterable
from collections.abc import Iterable

from airbyte_protocol.models import AirbyteMessage # type: ignore

Original file line number Diff line number Diff line change
@@ -1,10 +1,11 @@
# Copyright (c) 2023 Airbyte, Inc., all rights reserved.

from __future__ import annotations

import logging
import re
from collections.abc import Iterable
from pathlib import Path
from typing import Iterable, Optional
from typing import Optional

import duckdb
from airbyte_protocol.models import AirbyteMessage # type: ignore
Original file line number Diff line number Diff line change
@@ -1,9 +1,11 @@
# Copyright (c) 2023 Airbyte, Inc., all rights reserved.
from __future__ import annotations

import json
import logging
from collections.abc import Iterable
from pathlib import Path
from typing import Any, Dict, Iterable, TextIO, Tuple
from typing import Any, TextIO

from airbyte_protocol.models import AirbyteMessage # type: ignore
from airbyte_protocol.models import Type as AirbyteMessageType
@@ -13,7 +15,7 @@


class FileDescriptorLRUCache(LRUCache):
def popitem(self) -> Tuple[Any, Any]:
def popitem(self) -> tuple[Any, Any]:
filepath, fd = LRUCache.popitem(self)
fd.close() # type: ignore # Close the file descriptor when it's evicted from the cache
return filepath, fd
@@ -34,8 +36,8 @@ def __init__(self, output_directory: Path):
self._output_directory = output_directory
self.record_per_stream_directory = self._output_directory / "records_per_stream"
self.record_per_stream_directory.mkdir(exist_ok=True, parents=True)
self.record_per_stream_paths: Dict[str, Path] = {}
self.record_per_stream_paths_data_only: Dict[str, Path] = {}
self.record_per_stream_paths: dict[str, Path] = {}
self.record_per_stream_paths_data_only: dict[str, Path] = {}

@property
def jsonl_specs_path(self) -> Path:
@@ -101,14 +103,14 @@ def _open_file(path: Path) -> TextIO:
if not isinstance(_message, AirbyteMessage):
continue
filepaths, messages = self._get_filepaths_and_messages(_message)
for filepath, message in zip(filepaths, messages):
for filepath, message in zip(filepaths, messages, strict=False):
_open_file(self._output_directory / filepath).write(f"{message}\n")
logging.info("Finished writing airbyte messages to disk")
finally:
for f in self.CACHE.values():
f.close()

def _get_filepaths_and_messages(self, message: AirbyteMessage) -> Tuple[Tuple[str, ...], Tuple[str, ...]]:
def _get_filepaths_and_messages(self, message: AirbyteMessage) -> tuple[tuple[str, ...], tuple[str, ...]]:
if message.type == AirbyteMessageType.CATALOG:
return (self.RELATIVE_CATALOGS_PATH,), (message.catalog.json(),)

Original file line number Diff line number Diff line change
@@ -1,10 +1,11 @@
# Copyright (c) 2024 Airbyte, Inc., all rights reserved.
from __future__ import annotations

import json
import logging
import os
from pathlib import Path
from typing import Dict, Optional, Set
from typing import Optional

import rich
from connection_retriever import ConnectionObject, retrieve_objects # type: ignore
@@ -16,7 +17,7 @@
console = rich.get_console()


def parse_config(config: Dict | str | None) -> Optional[SecretDict]:
def parse_config(config: dict | str | None) -> Optional[SecretDict]:
if not config:
return None
if isinstance(config, str):
@@ -25,7 +26,7 @@ def parse_config(config: Dict | str | None) -> Optional[SecretDict]:
return SecretDict(config)


def parse_catalog(catalog: Dict | str | None) -> Optional[AirbyteCatalog]:
def parse_catalog(catalog: dict | str | None) -> Optional[AirbyteCatalog]:
if not catalog:
return None
if isinstance(catalog, str):
@@ -35,7 +36,7 @@ def parse_catalog(catalog: Dict | str | None) -> Optional[AirbyteCatalog]:


def parse_configured_catalog(
configured_catalog: Dict | str | None, selected_streams: Set[str] | None = None
configured_catalog: dict | str | None, selected_streams: set[str] | None = None
) -> Optional[ConfiguredAirbyteCatalog]:
if not configured_catalog:
return None
@@ -48,7 +49,7 @@ def parse_configured_catalog(
return catalog


def parse_state(state: Dict | str | None) -> Optional[Dict]:
def parse_state(state: dict | str | None) -> Optional[dict]:
if not state:
return None
if isinstance(state, str):
@@ -61,11 +62,11 @@ def get_connector_config_from_path(config_path: Path) -> Optional[SecretDict]:
return parse_config(config_path.read_text())


def get_state_from_path(state_path: Path) -> Optional[Dict]:
def get_state_from_path(state_path: Path) -> Optional[dict]:
return parse_state(state_path.read_text())


def get_configured_catalog_from_path(path: Path, selected_streams: Optional[Set[str]] = None) -> Optional[ConfiguredAirbyteCatalog]:
def get_configured_catalog_from_path(path: Path, selected_streams: Optional[set[str]] = None) -> Optional[ConfiguredAirbyteCatalog]:
return parse_configured_catalog(path.read_text(), selected_streams)


@@ -83,7 +84,7 @@ def get_configured_catalog_from_path(path: Path, selected_streams: Optional[Set[


def get_connection_objects(
requested_objects: Set[ConnectionObject],
requested_objects: set[ConnectionObject],
connection_id: Optional[str],
custom_config_path: Optional[Path],
custom_configured_catalog_path: Optional[Path],
@@ -92,7 +93,7 @@ def get_connection_objects(
fail_if_missing_objects: bool = True,
connector_image: Optional[str] = None,
auto_select_connection: bool = False,
selected_streams: Optional[Set[str]] = None,
selected_streams: Optional[set[str]] = None,
) -> ConnectionObjects:
"""This function retrieves the connection objects values.
It checks that the required objects are available and raises a UsageError if they are not.
Original file line number Diff line number Diff line change
@@ -1,13 +1,14 @@
#
# Copyright (c) 2023 Airbyte, Inc., all rights reserved.
#
from __future__ import annotations

import datetime
import json
import logging
import uuid
from pathlib import Path
from typing import List, Optional
from typing import Optional

import anyio
import asyncer
@@ -39,7 +40,7 @@ def __init__(
self.actor_id = execution_inputs.actor_id
self.environment_variables = execution_inputs.environment_variables if execution_inputs.environment_variables else {}

self.full_command: List[str] = self._get_full_command(execution_inputs.command)
self.full_command: list[str] = self._get_full_command(execution_inputs.command)
self.completion_event = anyio.Event()
self.http_proxy = http_proxy
self.logger = logging.getLogger(f"{self.connector_under_test.name}-{self.connector_under_test.version}")
@@ -57,7 +58,7 @@ def stdout_file_path(self) -> Path:
def stderr_file_path(self) -> Path:
return (self.output_dir / "stderr.log").resolve()

def _get_full_command(self, command: Command) -> List[str]:
def _get_full_command(self, command: Command) -> list[str]:
if command is Command.SPEC:
return ["spec"]
elif command is Command.CHECK:
@@ -184,7 +185,7 @@ async def _log_progress(self) -> None:
def format_duration(time_delta: datetime.timedelta) -> str:
total_seconds = time_delta.total_seconds()
if total_seconds < 60:
return "{:.2f}s".format(total_seconds)
return f"{total_seconds:.2f}s"
minutes = int(total_seconds // 60)
seconds = int(total_seconds % 60)
return "{:02d}mn{:02d}s".format(minutes, seconds)
return f"{minutes:02d}mn{seconds:02d}s"
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
# Copyright (c) 2024 Airbyte, Inc., all rights reserved.
from __future__ import annotations


class ExportError(Exception):
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
# Copyright (c) 2024 Airbyte, Inc., all rights reserved.
from __future__ import annotations

from urllib.parse import parse_qs, urlencode, urlparse

37 changes: 19 additions & 18 deletions airbyte-ci/connectors/live-tests/src/live_tests/commons/models.py
Original file line number Diff line number Diff line change
@@ -5,17 +5,18 @@
import logging
import tempfile
from collections import defaultdict
from collections.abc import Iterable, Iterator, MutableMapping
from dataclasses import dataclass, field
from enum import Enum
from pathlib import Path
from typing import Any, Dict, Iterable, Iterator, List, MutableMapping, Optional, Type
from typing import Any, Optional

import _collections_abc
import dagger
import requests

# type: ignore
from airbyte_protocol.models import AirbyteCatalog, AirbyteMessage, ConfiguredAirbyteCatalog # type: ignore
from airbyte_protocol.models import AirbyteCatalog # type: ignore
from airbyte_protocol.models import AirbyteMessage # type: ignore
from airbyte_protocol.models import ConfiguredAirbyteCatalog # type: ignore
from airbyte_protocol.models import Type as AirbyteMessageType
from genson import SchemaBuilder # type: ignore
from live_tests.commons.backends import DuckDbBackend, FileBackend
@@ -174,7 +175,7 @@ def actor_type(self) -> ActorType:

@classmethod
async def from_image_name(
cls: Type[ConnectorUnderTest],
cls: type[ConnectorUnderTest],
dagger_client: dagger.Client,
image_name: str,
target_or_control: TargetOrControl,
@@ -191,8 +192,8 @@ class ExecutionInputs:
command: Command
config: Optional[SecretDict] = None
configured_catalog: Optional[ConfiguredAirbyteCatalog] = None
state: Optional[Dict] = None
environment_variables: Optional[Dict] = None
state: Optional[dict] = None
environment_variables: Optional[dict] = None
duckdb_path: Optional[Path] = None

def raise_if_missing_attr_for_command(self, attribute: str) -> None:
@@ -232,8 +233,8 @@ class ExecutionResult:
success: bool
executed_container: Optional[dagger.Container]
http_dump: Optional[dagger.File] = None
http_flows: List[http.HTTPFlow] = field(default_factory=list)
stream_schemas: Optional[Dict[str, Any]] = None
http_flows: list[http.HTTPFlow] = field(default_factory=list)
stream_schemas: Optional[dict[str, Any]] = None
backend: Optional[FileBackend] = None

HTTP_DUMP_FILE_NAME = "http_dump.mitm"
@@ -253,7 +254,7 @@ def duckdb_schema(self) -> Iterable[str]:

@classmethod
async def load(
cls: Type[ExecutionResult],
cls: type[ExecutionResult],
connector_under_test: ConnectorUnderTest,
actor_id: str,
command: Command,
@@ -286,7 +287,7 @@ async def load_http_flows(self) -> None:
def parse_airbyte_messages_from_command_output(
self, command_output_path: Path, log_validation_errors: bool = False
) -> Iterable[AirbyteMessage]:
with open(command_output_path, "r") as command_output:
with open(command_output_path) as command_output:
for line in command_output:
try:
yield AirbyteMessage.parse_raw(line)
@@ -302,9 +303,9 @@ def get_records(self) -> Iterable[AirbyteMessage]:
if message.type is AirbyteMessageType.RECORD:
yield message

def generate_stream_schemas(self) -> Dict[str, Any]:
def generate_stream_schemas(self) -> dict[str, Any]:
self.logger.info("Generating stream schemas")
stream_builders: Dict[str, SchemaBuilder] = {}
stream_builders: dict[str, SchemaBuilder] = {}
for record in self.get_records():
stream = record.record.stream
if stream not in stream_builders:
@@ -328,8 +329,8 @@ def get_records_per_stream(self, stream: str) -> Iterator[AirbyteMessage]:
if message.type is AirbyteMessageType.RECORD:
yield message

def get_message_count_per_type(self) -> Dict[AirbyteMessageType, int]:
message_count: Dict[AirbyteMessageType, int] = defaultdict(int)
def get_message_count_per_type(self) -> dict[AirbyteMessageType, int]:
message_count: dict[AirbyteMessageType, int] = defaultdict(int)
for message in self.airbyte_messages:
message_count[message.type] += 1
return message_count
@@ -376,7 +377,7 @@ async def save_artifacts(self, output_dir: Path, duckdb_path: Optional[Path] = N
self.save_stream_schemas(output_dir)
self.logger.info("All artifacts saved to disk")

def get_updated_configuration(self, control_message_path: Path) -> Optional[Dict[str, Any]]:
def get_updated_configuration(self, control_message_path: Path) -> Optional[dict[str, Any]]:
"""Iterate through the control messages to find CONNECTOR_CONFIG message and return the last updated configuration."""
if not control_message_path.exists():
return None
@@ -403,7 +404,7 @@ def update_configuration(self) -> None:
payload = {
"configuration": {
**updated_configuration,
**{f"{self.connector_under_test.actor_type.value}Type": self.connector_under_test.name_without_type_prefix},
f"{self.connector_under_test.actor_type.value}Type": self.connector_under_test.name_without_type_prefix,
}
}
headers = {
@@ -427,7 +428,7 @@ class ConnectionObjects:
destination_config: Optional[SecretDict]
configured_catalog: Optional[ConfiguredAirbyteCatalog]
catalog: Optional[AirbyteCatalog]
state: Optional[Dict]
state: Optional[dict]
workspace_id: Optional[str]
source_id: Optional[str]
destination_id: Optional[str]
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
# Copyright (c) 2024 Airbyte, Inc., all rights reserved.
from __future__ import annotations

import logging
import uuid
Loading

0 comments on commit 7bd0324

Please sign in to comment.