diff --git a/airbyte-integrations/connectors/source-declarative-manifest/metadata.yaml b/airbyte-integrations/connectors/source-declarative-manifest/metadata.yaml index 95c37b1e2e40..53039337afab 100644 --- a/airbyte-integrations/connectors/source-declarative-manifest/metadata.yaml +++ b/airbyte-integrations/connectors/source-declarative-manifest/metadata.yaml @@ -8,7 +8,7 @@ data: connectorType: source definitionId: 64a2f99c-542f-4af8-9a6f-355f1217b436 # This version should not be updated manually - it is updated by the CDK release workflow. - dockerImageTag: 6.2.0 + dockerImageTag: 6.2.1 dockerRepository: airbyte/source-declarative-manifest # This page is hidden from the docs for now, since the connector is not in any Airbyte registries. documentationUrl: https://docs.airbyte.com/integrations/sources/low-code diff --git a/airbyte-integrations/connectors/source-declarative-manifest/pyproject.toml b/airbyte-integrations/connectors/source-declarative-manifest/pyproject.toml index d90d10ca93b8..d44f0c01b34a 100644 --- a/airbyte-integrations/connectors/source-declarative-manifest/pyproject.toml +++ b/airbyte-integrations/connectors/source-declarative-manifest/pyproject.toml @@ -3,7 +3,7 @@ requires = ["poetry-core>=1.0.0"] build-backend = "poetry.core.masonry.api" [tool.poetry] -version = "6.2.0" +version = "6.2.1" name = "source-declarative-manifest" description = "Base source implementation for low-code sources." authors = ["Airbyte "] diff --git a/airbyte-integrations/connectors/source-declarative-manifest/source_declarative_manifest/run.py b/airbyte-integrations/connectors/source-declarative-manifest/source_declarative_manifest/run.py index b7303e9c6927..2dfd888804b8 100644 --- a/airbyte-integrations/connectors/source-declarative-manifest/source_declarative_manifest/run.py +++ b/airbyte-integrations/connectors/source-declarative-manifest/source_declarative_manifest/run.py @@ -1,6 +1,7 @@ # # Copyright (c) 2023 Airbyte, Inc., all rights reserved. # + from __future__ import annotations import json @@ -11,21 +12,21 @@ from pathlib import Path from typing import Any, List, Mapping, Optional -from airbyte_cdk import TState -from airbyte_cdk.connector import BaseConnector from airbyte_cdk.entrypoint import AirbyteEntrypoint, launch from airbyte_cdk.models import ( AirbyteErrorTraceMessage, AirbyteMessage, AirbyteMessageSerializer, + AirbyteStateMessage, AirbyteTraceMessage, ConfiguredAirbyteCatalog, ConnectorSpecificationSerializer, TraceType, Type, ) -from airbyte_cdk.sources.declarative.manifest_declarative_source import ManifestDeclarativeSource +from airbyte_cdk.sources.declarative.concurrent_declarative_source import ConcurrentDeclarativeSource from airbyte_cdk.sources.declarative.yaml_declarative_source import YamlDeclarativeSource +from airbyte_cdk.sources.source import TState from orjson import orjson @@ -61,16 +62,10 @@ def handle_command(args: List[str]) -> None: handle_remote_manifest_command(args) -def _get_local_yaml_source(args: List[str]): - catalog_path = AirbyteEntrypoint.extract_catalog(args) - config_path = AirbyteEntrypoint.extract_config(args) - state_path = AirbyteEntrypoint.extract_state(args) +def _get_local_yaml_source(args: List[str]) -> SourceLocalYaml: try: - return SourceLocalYaml( - SourceLocalYaml.read_catalog(catalog_path) if catalog_path else None, - SourceLocalYaml.read_config(config_path) if config_path else None, - SourceLocalYaml.read_state(state_path) if state_path else None, - ) + config, catalog, state = _parse_inputs_into_config_catalog_state(args) + return SourceLocalYaml(config=config, catalog=catalog, state=state) except Exception as error: print( orjson.dumps( @@ -112,24 +107,56 @@ def handle_remote_manifest_command(args: List[str]) -> None: message = AirbyteMessage(type=Type.SPEC, spec=spec) print(AirbyteEntrypoint.airbyte_message_to_string(message)) else: - source = create_manifest(args) + source = create_declarative_source(args) launch(source, args) -def create_manifest(args: List[str]) -> ManifestDeclarativeSource: +def create_declarative_source(args: List[str]) -> ConcurrentDeclarativeSource: """Creates the source with the injected config. This essentially does what other low-code sources do at build time, but at runtime, with a user-provided manifest in the config. This better reflects what happens in the connector builder. """ - parsed_args = AirbyteEntrypoint.parse_args(args) - config = BaseConnector.read_config(parsed_args.config) - if "__injected_declarative_manifest" not in config: - raise ValueError( - f"Invalid config: `__injected_declarative_manifest` should be provided at the root of the config but config only has keys {list(config.keys())}" + try: + config, catalog, state = _parse_inputs_into_config_catalog_state(args) + if "__injected_declarative_manifest" not in config: + raise ValueError( + f"Invalid config: `__injected_declarative_manifest` should be provided at the root of the config but config only has keys {list(config.keys())}" + ) + return ConcurrentDeclarativeSource( + config=config, catalog=catalog, state=state, source_config=config.get("__injected_declarative_manifest") ) - return ManifestDeclarativeSource(config.get("__injected_declarative_manifest")) + except Exception as error: + print( + orjson.dumps( + AirbyteMessageSerializer.dump( + AirbyteMessage( + type=Type.TRACE, + trace=AirbyteTraceMessage( + type=TraceType.ERROR, + emitted_at=int(datetime.now().timestamp() * 1000), + error=AirbyteErrorTraceMessage( + message=f"Error starting the sync. This could be due to an invalid configuration or catalog. Please contact Support for assistance. Error: {error}", + stack_trace=traceback.format_exc(), + ), + ), + ) + ) + ).decode() + ) + raise error + + +def _parse_inputs_into_config_catalog_state( + args: List[str], +) -> (Optional[Mapping[str, Any]], Optional[ConfiguredAirbyteCatalog], List[AirbyteStateMessage]): + parsed_args = AirbyteEntrypoint.parse_args(args) + config = ConcurrentDeclarativeSource.read_config(parsed_args.config) if hasattr(parsed_args, "config") else None + catalog = ConcurrentDeclarativeSource.read_catalog(parsed_args.catalog) if hasattr(parsed_args, "catalog") else None + state = ConcurrentDeclarativeSource.read_state(parsed_args.state) if hasattr(parsed_args, "state") else [] + + return config, catalog, state def run(): diff --git a/airbyte-integrations/connectors/source-declarative-manifest/unit_tests/test_source_declarative_local_manifest.py b/airbyte-integrations/connectors/source-declarative-manifest/unit_tests/test_source_declarative_local_manifest.py index dddd710bdbee..e2f03e369f8d 100644 --- a/airbyte-integrations/connectors/source-declarative-manifest/unit_tests/test_source_declarative_local_manifest.py +++ b/airbyte-integrations/connectors/source-declarative-manifest/unit_tests/test_source_declarative_local_manifest.py @@ -2,6 +2,7 @@ # Copyright (c) 2023 Airbyte, Inc., all rights reserved. # +import json from unittest.mock import patch import pytest @@ -10,6 +11,8 @@ POKEAPI_JSON_SPEC_SUBSTRING = '"required":["pokemon_name"]' SUCCESS_CHECK_SUBSTRING = '"connectionStatus":{"status":"SUCCEEDED"}' +FAILED_CHECK_SUBSTRING = '"connectionStatus":{"status":"FAILED"}' + @pytest.fixture(autouse=True) def setup(valid_local_manifest_yaml): @@ -17,21 +20,26 @@ def setup(valid_local_manifest_yaml): with patch('source_declarative_manifest.run.YamlDeclarativeSource._read_and_parse_yaml_file', return_value=valid_local_manifest_yaml): yield + def test_spec_is_poke_api(capsys): run.handle_command(["spec"]) stdout = capsys.readouterr() assert POKEAPI_JSON_SPEC_SUBSTRING in stdout.out + def test_invalid_yaml_throws(capsys, invalid_local_manifest_yaml): with patch('source_declarative_manifest.run.YamlDeclarativeSource._read_and_parse_yaml_file', return_value=invalid_local_manifest_yaml): with pytest.raises(ValidationError): run.handle_command(["spec"]) -def test_given_invalid_config_then_raise_value_error(invalid_local_config_file): - with pytest.raises(ValueError): - run.create_manifest(["check", "--config", str(invalid_local_config_file)]) -def test_given_invalid_config_then_raise_value_error(capsys, valid_local_config_file): +def test_given_invalid_config_then_unsuccessful_check(capsys, invalid_local_config_file): + run.handle_command(["check", "--config", str(invalid_local_config_file)]) + stdout = capsys.readouterr() + assert json.loads(stdout.out).get("connectionStatus").get("status") == "FAILED" + + +def test_given_valid_config_with_successful_check(capsys, valid_local_config_file): run.handle_command(["check", "--config", str(valid_local_config_file)]) stdout = capsys.readouterr() assert SUCCESS_CHECK_SUBSTRING in stdout.out diff --git a/airbyte-integrations/connectors/source-declarative-manifest/unit_tests/test_source_declarative_remote_manifest.py b/airbyte-integrations/connectors/source-declarative-manifest/unit_tests/test_source_declarative_remote_manifest.py index 35091ac0785b..52a363a298c1 100644 --- a/airbyte-integrations/connectors/source-declarative-manifest/unit_tests/test_source_declarative_remote_manifest.py +++ b/airbyte-integrations/connectors/source-declarative-manifest/unit_tests/test_source_declarative_remote_manifest.py @@ -4,7 +4,7 @@ import pytest from airbyte_cdk.sources.declarative.manifest_declarative_source import ManifestDeclarativeSource -from source_declarative_manifest.run import create_manifest, handle_command +from source_declarative_manifest.run import create_declarative_source, handle_command REMOTE_MANIFEST_SPEC_SUBSTRING = '"required":["__injected_declarative_manifest"]' @@ -17,9 +17,9 @@ def test_spec_does_not_raise_value_error(capsys): def test_given_no_injected_declarative_manifest_then_raise_value_error(invalid_remote_config): with pytest.raises(ValueError): - create_manifest(["check", "--config", str(invalid_remote_config)]) + create_declarative_source(["check", "--config", str(invalid_remote_config)]) def test_given_injected_declarative_manifest_then_return_declarative_manifest(valid_remote_config): - source = create_manifest(["check", "--config", str(valid_remote_config)]) + source = create_declarative_source(["check", "--config", str(valid_remote_config)]) assert isinstance(source, ManifestDeclarativeSource) diff --git a/docs/integrations/sources/low-code.md b/docs/integrations/sources/low-code.md index a14575a9a378..273b561b3f01 100644 --- a/docs/integrations/sources/low-code.md +++ b/docs/integrations/sources/low-code.md @@ -9,6 +9,7 @@ The changelog below is automatically updated by the `bump_version` command as pa | Version | Date | Pull Request | Subject | | :------ | :--------- | :------------------------------------------------------- | :------------------------------------------------------------------- | +| 6.2.1 | 2024-11-05 | [48373](https://github.com/airbytehq/airbyte/pull/48373) | Add CDK v6 support to remote manifest mode | | 6.2.0 | 2024-11-05 | [36501](https://github.com/airbytehq/airbyte/pull/36501) | Bump CDK version to 6.2.0 | | 6.1.2 | 2024-11-05 | [48344](https://github.com/airbytehq/airbyte/pull/48344) | Fix discover failing on new CDK | | 6.1.1 | 2024-11-04 | [36501](https://github.com/airbytehq/airbyte/pull/36501) | Bump CDK version to 6.1.1 |