Skip to content
Merged
Original file line number Diff line number Diff line change
Expand Up @@ -169,6 +169,7 @@ def __init__(
component_factory = ModelToComponentFactory(
emit_connector_builder_messages=emit_connector_builder_messages,
message_repository=ConcurrentMessageRepository(queue, message_repository),
configured_catalog=catalog,
connector_state_manager=self._connector_state_manager,
max_concurrent_async_job_count=source_config.get("max_concurrent_async_job_count"),
limit_pages_fetched_per_slice=limits.max_pages_per_slice if limits else None,
Expand Down
36 changes: 34 additions & 2 deletions airbyte_cdk/sources/declarative/declarative_component_schema.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -2520,6 +2520,34 @@ definitions:
type:
type: string
enum: [JsonlDecoder]
JsonSchemaPropertySelector:
title: Json Schema Property Selector
description: When configured, the JSON schema supplied in the catalog containing which columns are selected for the current stream will be used to reduce which query properties will be included in the outbound API request. This can improve the performance of API requests, especially for those requiring multiple requests to get a complete record.
type: object
required:
- type
properties:
type:
type: string
enum: [JsonSchemaPropertySelector]
transformations:
title: Transformations
description: A list of transformations to be applied on the customer configured schema that will be used to filter out unselected fields when specifying query properties for API requests.
linkable: true
type: array
items:
anyOf:
- "$ref": "#/definitions/AddFields"
- "$ref": "#/definitions/RemoveFields"
- "$ref": "#/definitions/KeysToLower"
- "$ref": "#/definitions/KeysToSnakeCase"
- "$ref": "#/definitions/FlattenFields"
- "$ref": "#/definitions/DpathFlattenFields"
- "$ref": "#/definitions/KeysReplace"
- "$ref": "#/definitions/CustomTransformation"
$parameters:
type: object
additionalProperties: true
KeysToLower:
title: Keys to Lower Case
description: A transformation that renames all keys to lower case.
Expand Down Expand Up @@ -3410,6 +3438,10 @@ definitions:
title: Property Chunking
description: Defines how query properties will be grouped into smaller sets for APIs with limitations on the number of properties fetched per API request.
"$ref": "#/definitions/PropertyChunking"
property_selector:
title: Property Selector
description: Defines where to look for and which query properties that should be sent in outbound API requests. For example, you can specify that only the selected columns of a stream should be in the request.
"$ref": "#/definitions/JsonSchemaPropertySelector"
$parameters:
type: object
additionalProperties: true
Expand Down Expand Up @@ -3746,7 +3778,7 @@ definitions:
properties:
type:
type: string
enum: [ PaginationReset ]
enum: [PaginationReset]
action:
type: string
enum:
Expand All @@ -3763,7 +3795,7 @@ definitions:
properties:
type:
type: string
enum: [ PaginationResetLimits ]
enum: [PaginationResetLimits]
number_of_records:
type: integer
GzipDecoder:
Expand Down
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
# Copyright (c) 2025 Airbyte, Inc., all rights reserved.

# generated by datamodel-codegen:
# filename: declarative_component_schema.yaml

Expand Down Expand Up @@ -2029,6 +2031,29 @@ class SessionTokenRequestApiKeyAuthenticator(BaseModel):
)


class JsonSchemaPropertySelector(BaseModel):
type: Literal["JsonSchemaPropertySelector"]
transformations: Optional[
List[
Union[
AddFields,
RemoveFields,
KeysToLower,
KeysToSnakeCase,
FlattenFields,
DpathFlattenFields,
KeysReplace,
CustomTransformation,
]
]
] = Field(
None,
description="A list of transformations to be applied on the customer configured schema that will be used to filter out unselected fields when specifying query properties for API requests.",
title="Transformations",
)
parameters: Optional[Dict[str, Any]] = Field(None, alias="$parameters")


class ListPartitionRouter(BaseModel):
type: Literal["ListPartitionRouter"]
cursor_field: str = Field(
Expand Down Expand Up @@ -2799,6 +2824,11 @@ class QueryProperties(BaseModel):
description="Defines how query properties will be grouped into smaller sets for APIs with limitations on the number of properties fetched per API request.",
title="Property Chunking",
)
property_selector: Optional[JsonSchemaPropertySelector] = Field(
None,
description="Defines where to look for and which query properties that should be sent in outbound API requests. For example, you can specify that only the selected columns of a stream should be in the request.",
title="Property Selector",
)
parameters: Optional[Dict[str, Any]] = Field(None, alias="$parameters")


Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
get_type_hints,
)

from airbyte_protocol_dataclasses.models import ConfiguredAirbyteStream
from isodate import parse_duration
from pydantic.v1 import BaseModel
from requests import Response
Expand All @@ -42,6 +43,7 @@
AirbyteStateMessage,
AirbyteStateType,
AirbyteStreamState,
ConfiguredAirbyteCatalog,
FailureType,
Level,
StreamDescriptor,
Expand Down Expand Up @@ -314,6 +316,9 @@
from airbyte_cdk.sources.declarative.models.declarative_component_schema import (
JsonlDecoder as JsonlDecoderModel,
)
from airbyte_cdk.sources.declarative.models.declarative_component_schema import (
JsonSchemaPropertySelector as JsonSchemaPropertySelectorModel,
)
from airbyte_cdk.sources.declarative.models.declarative_component_schema import (
JwtAuthenticator as JwtAuthenticatorModel,
)
Expand Down Expand Up @@ -501,6 +506,9 @@
from airbyte_cdk.sources.declarative.requesters.query_properties.property_chunking import (
PropertyLimitType,
)
from airbyte_cdk.sources.declarative.requesters.query_properties.property_selector import (
JsonSchemaPropertySelector,
)
from airbyte_cdk.sources.declarative.requesters.query_properties.strategies import (
GroupByKey,
)
Expand Down Expand Up @@ -668,6 +676,7 @@ def __init__(
message_repository: Optional[MessageRepository] = None,
connector_state_manager: Optional[ConnectorStateManager] = None,
max_concurrent_async_job_count: Optional[int] = None,
configured_catalog: Optional[ConfiguredAirbyteCatalog] = None,
):
self._init_mappings()
self._limit_pages_fetched_per_slice = limit_pages_fetched_per_slice
Expand All @@ -678,6 +687,9 @@ def __init__(
self._message_repository = message_repository or InMemoryMessageRepository(
self._evaluate_log_level(emit_connector_builder_messages)
)
self._stream_name_to_configured_stream = self._create_stream_name_to_configured_stream(
configured_catalog
)
self._connector_state_manager = connector_state_manager or ConnectorStateManager()
self._api_budget: Optional[Union[APIBudget, HttpAPIBudget]] = None
self._job_tracker: JobTracker = JobTracker(max_concurrent_async_job_count or 1)
Expand Down Expand Up @@ -734,6 +746,7 @@ def _init_mappings(self) -> None:
InlineSchemaLoaderModel: self.create_inline_schema_loader,
JsonDecoderModel: self.create_json_decoder,
JsonlDecoderModel: self.create_jsonl_decoder,
JsonSchemaPropertySelectorModel: self.create_json_schema_property_selector,
GzipDecoderModel: self.create_gzip_decoder,
KeysToLowerModel: self.create_keys_to_lower_transformation,
KeysToSnakeCaseModel: self.create_keys_to_snake_transformation,
Expand Down Expand Up @@ -796,6 +809,16 @@ def _init_mappings(self) -> None:
# Needed for the case where we need to perform a second parse on the fields of a custom component
self.TYPE_NAME_TO_MODEL = {cls.__name__: cls for cls in self.PYDANTIC_MODEL_TO_CONSTRUCTOR}

@staticmethod
def _create_stream_name_to_configured_stream(
configured_catalog: Optional[ConfiguredAirbyteCatalog],
) -> Mapping[str, ConfiguredAirbyteStream]:
return (
{stream.stream.name: stream for stream in configured_catalog.streams}
if configured_catalog
else {}
)

def create_component(
self,
model_type: Type[BaseModel],
Expand Down Expand Up @@ -2987,7 +3010,7 @@ def create_property_chunking(
)

def create_query_properties(
self, model: QueryPropertiesModel, config: Config, **kwargs: Any
self, model: QueryPropertiesModel, config: Config, *, stream_name: str, **kwargs: Any
) -> QueryProperties:
if isinstance(model.property_list, list):
property_list = model.property_list
Expand All @@ -3004,10 +3027,43 @@ def create_query_properties(
else None
)

property_selector = (
self._create_component_from_model(
model=model.property_selector, config=config, stream_name=stream_name, **kwargs
)
if model.property_selector
else None
)

return QueryProperties(
property_list=property_list,
always_include_properties=model.always_include_properties,
property_chunking=property_chunking,
property_selector=property_selector,
config=config,
parameters=model.parameters or {},
)

def create_json_schema_property_selector(
self,
model: JsonSchemaPropertySelectorModel,
config: Config,
*,
stream_name: str,
**kwargs: Any,
) -> JsonSchemaPropertySelector:
configured_stream = self._stream_name_to_configured_stream.get(stream_name)

transformations = []
if model.transformations:
for transformation_model in model.transformations:
transformations.append(
self._create_component_from_model(model=transformation_model, config=config)
)

return JsonSchemaPropertySelector(
configured_stream=configured_stream,
properties_transformations=transformations,
config=config,
parameters=model.parameters or {},
)
Expand Down Expand Up @@ -3235,7 +3291,7 @@ def _get_url(req: Requester) -> str:

if len(query_properties_definitions) == 1:
query_properties = self._create_component_from_model(
model=query_properties_definitions[0], config=config
model=query_properties_definitions[0], stream_name=name, config=config
)

# Removes QueryProperties components from the interpolated mappings because it has been designed
Expand All @@ -3261,11 +3317,13 @@ def _get_url(req: Requester) -> str:

query_properties = self.create_query_properties(
model=query_properties_definition,
stream_name=name,
config=config,
)
elif hasattr(model.requester, "query_properties") and model.requester.query_properties:
query_properties = self.create_query_properties(
model=model.requester.query_properties,
stream_name=name,
config=config,
)

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@

from dataclasses import InitVar, dataclass
from enum import Enum
from typing import Any, Iterable, List, Mapping, Optional
from typing import Any, Iterable, List, Mapping, Optional, Set

from airbyte_cdk.sources.declarative.requesters.query_properties.strategies import GroupByKey
from airbyte_cdk.sources.declarative.requesters.query_properties.strategies.merge_strategy import (
Expand Down Expand Up @@ -40,7 +40,10 @@ def __post_init__(self, parameters: Mapping[str, Any]) -> None:
)

def get_request_property_chunks(
self, property_fields: Iterable[str], always_include_properties: Optional[List[str]]
self,
property_fields: Iterable[str],
always_include_properties: Optional[List[str]],
configured_properties: Optional[Set[str]],
) -> Iterable[List[str]]:
if not self.property_limit:
single_property_chunk = list(property_fields)
Expand All @@ -53,6 +56,8 @@ def get_request_property_chunks(
for property_field in property_fields:
# If property_limit_type is not defined, we default to property_count which is just an incrementing count
# todo: Add ability to specify parameter delimiter representation and take into account in property_field_size
if configured_properties is not None and property_field not in configured_properties:
continue
property_field_size = (
len(property_field)
+ 3 # The +3 represents the extra characters for encoding the delimiter in between properties
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
# Copyright (c) 2025 Airbyte, Inc., all rights reserved.

from airbyte_cdk.sources.declarative.requesters.query_properties.property_selector.json_schema_property_selector import (
JsonSchemaPropertySelector,
)
from airbyte_cdk.sources.declarative.requesters.query_properties.property_selector.property_selector import (
PropertySelector,
)

__all__ = ["JsonSchemaPropertySelector", "PropertySelector"]
Original file line number Diff line number Diff line change
@@ -0,0 +1,54 @@
# Copyright (c) 2025 Airbyte, Inc., all rights reserved.
import copy
from dataclasses import InitVar, dataclass, field
from typing import Any, List, Mapping, Optional, Set

from airbyte_protocol_dataclasses.models import ConfiguredAirbyteStream

from airbyte_cdk.sources.declarative.requesters.query_properties.property_selector.property_selector import (
PropertySelector,
)
from airbyte_cdk.sources.declarative.transformations import RecordTransformation
from airbyte_cdk.sources.types import Config


@dataclass
class JsonSchemaPropertySelector(PropertySelector):
"""
A class that contains a list of transformations to apply to properties.
"""

config: Config
parameters: InitVar[Mapping[str, Any]]
# For other non-read operations, there is no configured catalog and therefore no schema selection
configured_stream: Optional[ConfiguredAirbyteStream] = None
properties_transformations: List[RecordTransformation] = field(default_factory=lambda: [])

def __post_init__(self, parameters: Mapping[str, Any]) -> None:
self._parameters = parameters

def select(self) -> Optional[Set[str]]:
"""
Returns the set of properties that have been selected for the configured stream. The intent being that
we should only query for selected properties not all since disabled properties are discarded.

When configured_stream is None, then there was no incoming catalog and all fields should be retrieved.
This is different from the empty set where the json_schema was empty and no schema fields were selected.
"""

# For CHECK/DISCOVER operations, there is no catalog and therefore no configured stream or selected
# columns. In this case we return None which is interpreted by the QueryProperties component to not
# perform any filtering of schema properties and fetch all of them
if self.configured_stream is None:
return None

schema_properties = copy.deepcopy(
self.configured_stream.stream.json_schema.get("properties", {})
)
if self.properties_transformations:
for transformation in self.properties_transformations:
transformation.transform(
record=schema_properties,
config=self.config,
)
return set(schema_properties.keys())
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
# Copyright (c) 2025 Airbyte, Inc., all rights reserved.

from abc import ABC, abstractmethod
from dataclasses import dataclass
from typing import Optional, Set


@dataclass
class PropertySelector(ABC):
"""
Describes the interface for selecting and transforming properties from a configured stream's schema
to determine which properties should be queried from the API.
"""

@abstractmethod
def select(self) -> Optional[Set[str]]:
"""
Selects and returns the set of properties that should be queried from the API based on the
configured stream's schema and any applicable transformations.

Returns:
Set[str]: The set of property names to query
"""
pass
Loading
Loading