Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(low-code cdk): add component resolver and http component resolver #88

Merged
merged 22 commits into from
Dec 5, 2024
Merged
Show file tree
Hide file tree
Changes from 10 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 7 additions & 3 deletions airbyte_cdk/sources/declarative/concurrent_declarative_source.py
Original file line number Diff line number Diff line change
Expand Up @@ -180,9 +180,13 @@ def _group_streams(

state_manager = ConnectorStateManager(state=self._state) # type: ignore # state is always in the form of List[AirbyteStateMessage]. The ConnectorStateManager should use generics, but this can be done later

name_to_stream_mapping = {
stream["name"]: stream for stream in self.resolved_manifest["streams"]
lazebnyi marked this conversation as resolved.
Show resolved Hide resolved
}
# Combine streams and dynamic_streams. Note: both cannot be empty at the same time,
# and this is validated during the initialization of the source.
streams = self.resolved_manifest.get("streams", []) + self.resolved_manifest.get(
"dynamic_streams", []
)

name_to_stream_mapping = {stream["name"]: stream for stream in streams}

for declarative_stream in self.streams(config=config):
# Some low-code sources use a combination of DeclarativeStream and regular Python streams. We can't inspect
Expand Down
100 changes: 99 additions & 1 deletion airbyte_cdk/sources/declarative/declarative_component_schema.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -7,8 +7,12 @@ version: 1.0.0
required:
- type
- check
- streams
- version
anyOf:
- required:
- streams
- required:
- dynamic_streams
properties:
type:
type: string
Expand All @@ -19,6 +23,10 @@ properties:
type: array
items:
"$ref": "#/definitions/DeclarativeStream"
dynamic_streams:
type: array
items:
"$ref": "#/definitions/DynamicDeclarativeStream"
version:
type: string
description: The version of the Airbyte CDK used to build and test the source.
Expand Down Expand Up @@ -2700,6 +2708,96 @@ definitions:
$parameters:
type: object
additionalProperties: true
ComponentMappingDefinition:
title: Component Mapping Definition
description: (This component is experimental. Use at your own risk.) Specifies a mapping definition to update or add fields in a record or configuration. This allows dynamic mapping of data by interpolating values into the template based on provided contexts.
type: object
required:
- type
- field_path
- value
properties:
type:
type: string
enum: [ComponentMappingDefinition]
field_path:
title: Field Path
description: A list of potentially nested fields indicating the full path where value will be added or updated.
type: array
items:
- type: string
interpolation_content:
lazebnyi marked this conversation as resolved.
Show resolved Hide resolved
lazebnyi marked this conversation as resolved.
Show resolved Hide resolved
- config
- components_values
- stream_template_config
examples:
- ["data"]
- ["data", "records"]
- ["data", "{{ parameters.name }}"]
- ["data", "*", "record"]
value:
title: Value
description: The dynamic or static value to assign to the key. Interpolated values can be used to dynamically determine the value during runtime.
type: string
interpolation_context:
- config
- stream_template_config
- components_values
examples:
- "{{ components_values['updates'] }}"
- "{{ components_values['MetaData']['LastUpdatedTime'] }}"
- "{{ config['segment_id'] }}"
value_type:
title: Value Type
description: The expected data type of the value. If omitted, the type will be inferred from the value provided.
"$ref": "#/definitions/ValueType"
$parameters:
type: object
additionalProperties: true
HttpComponentsResolver:
type: object
description: Component resolve and populates stream templates with components fetched via an HTTP retriever. (This component is experimental. Use at your own risk.)
properties:
type:
type: string
enum: [HttpComponentsResolver]
retriever:
title: Retriever
description: Component used to coordinate how records are extracted across stream slices and request pages.
anyOf:
- "$ref": "#/definitions/AsyncRetriever"
- "$ref": "#/definitions/CustomRetriever"
- "$ref": "#/definitions/SimpleRetriever"
components_mapping:
type: array
items:
"$ref": "#/definitions/ComponentMappingDefinition"
$parameters:
type: object
additionalProperties: true
required:
- type
- retriever
- components_mapping
DynamicDeclarativeStream:
type: object
description: A component that described how will be created declarative streams based on stream template. (This component is experimental. Use at your own risk.)
properties:
type:
type: string
enum: [DynamicDeclarativeStream]
stream_template:
title: Stream Template
description: Reference to the stream template.
"$ref": "#/definitions/DeclarativeStream"
components_resolver:
title: Components Resolver
description: Component resolve and populates stream templates with components values.
"$ref": "#/definitions/HttpComponentsResolver"
required:
- type
- stream_template
- components_resolver
lazebnyi marked this conversation as resolved.
Show resolved Hide resolved
interpolation:
variables:
- title: config
Expand Down
53 changes: 51 additions & 2 deletions airbyte_cdk/sources/declarative/manifest_declarative_source.py
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,8 @@
from airbyte_cdk.sources.declarative.parsers.manifest_component_transformer import (
ManifestComponentTransformer,
)
from airbyte_cdk.sources.declarative.resolvers import COMPONENTS_RESOLVER_TYPE_MAPPING

from airbyte_cdk.sources.declarative.parsers.manifest_reference_resolver import (
ManifestReferenceResolver,
)
Expand Down Expand Up @@ -119,7 +121,10 @@ def streams(self, config: Mapping[str, Any]) -> List[Stream]:
self._emit_manifest_debug_message(
extra_args={"source_name": self.name, "parsed_config": json.dumps(self._source_config)}
)
stream_configs = self._stream_configs(self._source_config)

stream_configs = self._stream_configs(self._source_config) + self._dynamic_stream_configs(
self._source_config, config
)

source_streams = [
self._constructor.create_component(
Expand Down Expand Up @@ -233,7 +238,8 @@ def _validate_source(self) -> None:
)

streams = self._source_config.get("streams")
if not streams:
dynamic_streams = self._source_config.get("dynamic_streams")
if not (streams or dynamic_streams):
lazebnyi marked this conversation as resolved.
Show resolved Hide resolved
raise ValidationError(
f"A valid manifest should have at least one stream defined. Got {streams}"
)
Expand Down Expand Up @@ -302,5 +308,48 @@ def _stream_configs(self, manifest: Mapping[str, Any]) -> List[Dict[str, Any]]:
s["type"] = "DeclarativeStream"
return stream_configs

def _dynamic_stream_configs(
self, manifest: Mapping[str, Any], config: Mapping[str, Any]
) -> List[Dict[str, Any]]:
dynamic_stream_definitions: List[Dict[str, Any]] = manifest.get("dynamic_streams", [])
dynamic_stream_configs: List[Dict[str, Any]] = []

for dynamic_definition in dynamic_stream_definitions:
components_resolver_config = dynamic_definition["components_resolver"]

if not components_resolver_config:
raise ValueError(
f"Missing 'components_resolver' in dynamic definition: {dynamic_definition}"
)
lazebnyi marked this conversation as resolved.
Show resolved Hide resolved

resolver_type = components_resolver_config.get("type")
if not resolver_type:
raise ValueError(
f"Missing 'type' in components resolver configuration: {components_resolver_config}"
)

if resolver_type not in COMPONENTS_RESOLVER_TYPE_MAPPING:
raise ValueError(
f"Invalid components resolver type '{resolver_type}'. "
f"Expected one of {list(COMPONENTS_RESOLVER_TYPE_MAPPING.keys())}."
)

# Create a resolver for dynamic components based on type
components_resolver = self._constructor.create_component(
COMPONENTS_RESOLVER_TYPE_MAPPING[resolver_type], components_resolver_config, config
)

stream_template_config = dynamic_definition["stream_template"]

for dynamic_stream in components_resolver.resolve_components(
stream_template_config=stream_template_config
):
if "type" not in dynamic_stream:
dynamic_stream["type"] = "DeclarativeStream"

dynamic_stream_configs.append(dynamic_stream)

return dynamic_stream_configs

def _emit_manifest_debug_message(self, extra_args: dict[str, Any]) -> None:
self.logger.debug("declarative source created from manifest", extra=extra_args)
Loading
Loading