From 1ddc7619e867ebacb4ea65552c3ea720f3dd582b Mon Sep 17 00:00:00 2001 From: pnilan Date: Wed, 5 Feb 2025 11:01:02 -0800 Subject: [PATCH 01/12] remove from `interpolation_context` and fail loudly in `ConcurrentDeclarativeSource` --- .../concurrent_declarative_source.py | 37 +++++++++---------- .../declarative_component_schema.yaml | 9 ----- 2 files changed, 17 insertions(+), 29 deletions(-) diff --git a/airbyte_cdk/sources/declarative/concurrent_declarative_source.py b/airbyte_cdk/sources/declarative/concurrent_declarative_source.py index d4ecc0084..1ba5f8893 100644 --- a/airbyte_cdk/sources/declarative/concurrent_declarative_source.py +++ b/airbyte_cdk/sources/declarative/concurrent_declarative_source.py @@ -10,6 +10,7 @@ AirbyteMessage, AirbyteStateMessage, ConfiguredAirbyteCatalog, + FailureType ) from airbyte_cdk.sources.concurrent_source.concurrent_source import ConcurrentSource from airbyte_cdk.sources.connector_state_manager import ConnectorStateManager @@ -53,6 +54,7 @@ from airbyte_cdk.sources.streams.concurrent.cursor import ConcurrentCursor, FinalStateCursor from airbyte_cdk.sources.streams.concurrent.default_stream import DefaultStream from airbyte_cdk.sources.streams.concurrent.helpers import get_primary_key_from_stream +from airbyte_cdk.utils import AirbyteTracedException class ConcurrentDeclarativeSource(ManifestDeclarativeSource, Generic[TState]): @@ -416,17 +418,15 @@ def _stream_supports_concurrent_partition_processing( ): http_requester = declarative_stream.retriever.requester if "stream_state" in http_requester._path.string: - self.logger.warning( - f"Low-code stream '{declarative_stream.name}' uses interpolation of stream_state in the HttpRequester which is not thread-safe. Defaulting to synchronous processing" - ) - return False + error_message = f"Low-code stream '{declarative_stream.name}' uses interpolation of `stream_state` in the `HttpRequester` component which is no longer supported. Please remove this interpolation. Similar functionality may be achieved by interpolating the `stream_interval` variable instead." + self.logger.error(error_message) + raise AirbyteTracedException(internal_message=error_message, message=error_message, failure_type=FailureType.config_error) request_options_provider = http_requester._request_options_provider if request_options_provider.request_options_contain_stream_state(): - self.logger.warning( - f"Low-code stream '{declarative_stream.name}' uses interpolation of stream_state in the HttpRequester which is not thread-safe. Defaulting to synchronous processing" - ) - return False + error_message = f"Low-code stream '{declarative_stream.name}' uses interpolation of `stream_state` in the `HttpRequester` component which is no longer supported. Please remove this interpolation. Similar functionality may be achieved by interpolating the `stream_interval` variable instead." + self.logger.error(error_message) + raise AirbyteTracedException(internal_message=error_message, message=error_message, failure_type=FailureType.config_error) record_selector = declarative_stream.retriever.record_selector if isinstance(record_selector, RecordSelector): @@ -437,10 +437,9 @@ def _stream_supports_concurrent_partition_processing( ) and "stream_state" in record_selector.record_filter.condition ): - self.logger.warning( - f"Low-code stream '{declarative_stream.name}' uses interpolation of stream_state in the RecordFilter which is not thread-safe. 
Defaulting to synchronous processing" - ) - return False + error_message = f"Low-code stream '{declarative_stream.name}' uses interpolation of `stream_state` in the `RecordFilter` component which is no longer supported. Please remove this interpolation. Similar functionality may be achieved by interpolating the `stream_interval` variable instead." + self.logger.error(error_message) + raise AirbyteTracedException(internal_message=error_message, message=error_message, failure_type=FailureType.config_error) for add_fields in [ transformation @@ -449,18 +448,16 @@ def _stream_supports_concurrent_partition_processing( ]: for field in add_fields.fields: if isinstance(field.value, str) and "stream_state" in field.value: - self.logger.warning( - f"Low-code stream '{declarative_stream.name}' uses interpolation of stream_state in the AddFields which is not thread-safe. Defaulting to synchronous processing" - ) - return False + error_message = f"Low-code stream '{declarative_stream.name}' uses interpolation of `stream_state` in the `AddFields` component which is no longer supported. Please remove this interpolation. Similar functionality may be achieved by interpolating the `stream_interval` variable instead." + self.logger.error(error_message) + raise AirbyteTracedException(internal_message=error_message, message=error_message, failure_type=FailureType.config_error) if ( isinstance(field.value, InterpolatedString) and "stream_state" in field.value.string ): - self.logger.warning( - f"Low-code stream '{declarative_stream.name}' uses interpolation of stream_state in the AddFields which is not thread-safe. Defaulting to synchronous processing" - ) - return False + error_message = f"Low-code stream '{declarative_stream.name}' uses interpolation of `stream_state` in the `AddFields` component which is no longer supported. Please remove this interpolation. Similar functionality may be achieved by interpolating the `stream_interval` variable instead." 
+ self.logger.error(error_message) + raise AirbyteTracedException(internal_message=error_message, message=error_message, failure_type=FailureType.config_error) return True @staticmethod diff --git a/airbyte_cdk/sources/declarative/declarative_component_schema.yaml b/airbyte_cdk/sources/declarative/declarative_component_schema.yaml index 072a1efcd..3528747f7 100644 --- a/airbyte_cdk/sources/declarative/declarative_component_schema.yaml +++ b/airbyte_cdk/sources/declarative/declarative_component_schema.yaml @@ -80,7 +80,6 @@ definitions: - stream_interval - stream_partition - stream_slice - - stream_state examples: - "{{ record['updates'] }}" - "{{ record['MetaData']['LastUpdatedTime'] }}" @@ -1611,7 +1610,6 @@ definitions: - stream_interval - stream_partition - stream_slice - - stream_state examples: - "/products" - "/quotes/{{ stream_partition['id'] }}/quote_line_groups" @@ -1661,7 +1659,6 @@ definitions: - stream_interval - stream_partition - stream_slice - - stream_state examples: - | [{"clause": {"type": "timestamp", "operator": 10, "parameters": @@ -1679,7 +1676,6 @@ definitions: - stream_interval - stream_partition - stream_slice - - stream_state examples: - sort_order: "ASC" sort_field: "CREATED_AT" @@ -1700,7 +1696,6 @@ definitions: - stream_interval - stream_partition - stream_slice - - stream_state examples: - Output-Format: JSON - Version: "{{ config['version'] }}" @@ -1717,7 +1712,6 @@ definitions: - stream_interval - stream_partition - stream_slice - - stream_state examples: - unit: "day" - query: 'last_event_time BETWEEN TIMESTAMP "{{ stream_interval.start_time }}" AND TIMESTAMP "{{ stream_interval.end_time }}"' @@ -2072,7 +2066,6 @@ definitions: interpolation_context: - config - record - - stream_state - stream_slice new: type: string @@ -2086,7 +2079,6 @@ definitions: interpolation_context: - config - record - - stream_state - stream_slice $parameters: type: object @@ -2753,7 +2745,6 @@ definitions: - stream_interval - stream_partition - stream_slice - - stream_state examples: - "{{ record['created_at'] >= stream_interval['start_time'] }}" - "{{ record.status in ['active', 'expired'] }}" From ac48f8367a7befacf020900ab3cf60a670bc5015 Mon Sep 17 00:00:00 2001 From: pnilan Date: Wed, 5 Feb 2025 12:53:05 -0800 Subject: [PATCH 02/12] consolidate stream_state interpolation logging and exceptiosn --- .../concurrent_declarative_source.py | 32 +++++++++---------- 1 file changed, 16 insertions(+), 16 deletions(-) diff --git a/airbyte_cdk/sources/declarative/concurrent_declarative_source.py b/airbyte_cdk/sources/declarative/concurrent_declarative_source.py index 1ba5f8893..8c4c1c07a 100644 --- a/airbyte_cdk/sources/declarative/concurrent_declarative_source.py +++ b/airbyte_cdk/sources/declarative/concurrent_declarative_source.py @@ -4,6 +4,7 @@ import logging from typing import Any, Generic, Iterator, List, Mapping, Optional, Tuple +from typing_extensions import deprecated from airbyte_cdk.models import ( AirbyteCatalog, @@ -45,7 +46,7 @@ ) from airbyte_cdk.sources.declarative.transformations.add_fields import AddFields from airbyte_cdk.sources.declarative.types import ConnectionDefinition -from airbyte_cdk.sources.source import TState +from airbyte_cdk.sources.source import TState, ExperimentalClassWarning from airbyte_cdk.sources.streams import Stream from airbyte_cdk.sources.streams.concurrent.abstract_stream import AbstractStream from airbyte_cdk.sources.streams.concurrent.availability_strategy import ( @@ -399,6 +400,15 @@ def _is_datetime_incremental_without_partition_routing( ) 
) + def _log_and_raise_stream_state_interpolation_error(self, stream_name: str, interpolation_variable: str, component_name: str) -> None: + error_message = f"Low-code stream '{stream_name}' uses interpolation of `{interpolation_variable}` in the `{component_name}` component which is no longer supported. Please remove this interpolation." + self.logger.error(error_message) + raise AirbyteTracedException(internal_message=error_message, message=error_message, failure_type=FailureType.config_error) + + @deprecated( + "This class is temporary and used to incrementally deliver low-code to concurrent", + category=ExperimentalClassWarning, + ) def _stream_supports_concurrent_partition_processing( self, declarative_stream: DeclarativeStream ) -> bool: @@ -418,15 +428,11 @@ def _stream_supports_concurrent_partition_processing( ): http_requester = declarative_stream.retriever.requester if "stream_state" in http_requester._path.string: - error_message = f"Low-code stream '{declarative_stream.name}' uses interpolation of `stream_state` in the `HttpRequester` component which is no longer supported. Please remove this interpolation. Similar functionality may be achieved by interpolating the `stream_interval` variable instead." - self.logger.error(error_message) - raise AirbyteTracedException(internal_message=error_message, message=error_message, failure_type=FailureType.config_error) + self._log_and_raise_stream_state_interpolation_error(stream_name=declarative_stream.name, interpolation_variable="stream_state", component_name="HttpRequester") request_options_provider = http_requester._request_options_provider if request_options_provider.request_options_contain_stream_state(): - error_message = f"Low-code stream '{declarative_stream.name}' uses interpolation of `stream_state` in the `HttpRequester` component which is no longer supported. Please remove this interpolation. Similar functionality may be achieved by interpolating the `stream_interval` variable instead." - self.logger.error(error_message) - raise AirbyteTracedException(internal_message=error_message, message=error_message, failure_type=FailureType.config_error) + self._log_and_raise_stream_state_interpolation_error(stream_name=declarative_stream.name, interpolation_variable="stream_state", component_name="HttpRequester") record_selector = declarative_stream.retriever.record_selector if isinstance(record_selector, RecordSelector): @@ -437,9 +443,7 @@ def _stream_supports_concurrent_partition_processing( ) and "stream_state" in record_selector.record_filter.condition ): - error_message = f"Low-code stream '{declarative_stream.name}' uses interpolation of `stream_state` in the `RecordFilter` component which is no longer supported. Please remove this interpolation. Similar functionality may be achieved by interpolating the `stream_interval` variable instead." - self.logger.error(error_message) - raise AirbyteTracedException(internal_message=error_message, message=error_message, failure_type=FailureType.config_error) + self._log_and_raise_stream_state_interpolation_error(stream_name=declarative_stream.name, interpolation_variable="stream_state", component_name="RecordFilter") for add_fields in [ transformation @@ -448,16 +452,12 @@ def _stream_supports_concurrent_partition_processing( ]: for field in add_fields.fields: if isinstance(field.value, str) and "stream_state" in field.value: - error_message = f"Low-code stream '{declarative_stream.name}' uses interpolation of `stream_state` in the `AddFields` component which is no longer supported. 
Please remove this interpolation. Similar functionality may be achieved by interpolating the `stream_interval` variable instead." - self.logger.error(error_message) - raise AirbyteTracedException(internal_message=error_message, message=error_message, failure_type=FailureType.config_error) + self._log_and_raise_stream_state_interpolation_error(stream_name=declarative_stream.name, interpolation_variable="stream_state", component_name="AddFields") if ( isinstance(field.value, InterpolatedString) and "stream_state" in field.value.string ): - error_message = f"Low-code stream '{declarative_stream.name}' uses interpolation of `stream_state` in the `AddFields` component which is no longer supported. Please remove this interpolation. Similar functionality may be achieved by interpolating the `stream_interval` variable instead." - self.logger.error(error_message) - raise AirbyteTracedException(internal_message=error_message, message=error_message, failure_type=FailureType.config_error) + self._log_and_raise_stream_state_interpolation_error(stream_name=declarative_stream.name, interpolation_variable="stream_state", component_name="AddFields") return True @staticmethod From 8b11425961dccb073fc010dfccad5f6a3f8b952d Mon Sep 17 00:00:00 2001 From: pnilan Date: Thu, 6 Feb 2025 11:23:56 -0800 Subject: [PATCH 03/12] Remove `stream_state` from `interpolation_contexts`, no longer pass `stream_state` to jinja interpolation --- .../concurrent_declarative_source.py | 39 +++++++++-------- .../declarative_component_schema.yaml | 2 +- .../declarative/interpolation/jinja.py | 13 ++++++ .../declarative/requesters/http_requester.py | 1 - .../interpolated_request_input_provider.py | 2 - .../interpolated_request_options_provider.py | 43 +------------------ .../declarative/transformations/add_fields.py | 2 +- ...est_interpolated_request_input_provider.py | 2 +- 8 files changed, 38 insertions(+), 66 deletions(-) diff --git a/airbyte_cdk/sources/declarative/concurrent_declarative_source.py b/airbyte_cdk/sources/declarative/concurrent_declarative_source.py index 8c4c1c07a..d4ecc0084 100644 --- a/airbyte_cdk/sources/declarative/concurrent_declarative_source.py +++ b/airbyte_cdk/sources/declarative/concurrent_declarative_source.py @@ -4,14 +4,12 @@ import logging from typing import Any, Generic, Iterator, List, Mapping, Optional, Tuple -from typing_extensions import deprecated from airbyte_cdk.models import ( AirbyteCatalog, AirbyteMessage, AirbyteStateMessage, ConfiguredAirbyteCatalog, - FailureType ) from airbyte_cdk.sources.concurrent_source.concurrent_source import ConcurrentSource from airbyte_cdk.sources.connector_state_manager import ConnectorStateManager @@ -46,7 +44,7 @@ ) from airbyte_cdk.sources.declarative.transformations.add_fields import AddFields from airbyte_cdk.sources.declarative.types import ConnectionDefinition -from airbyte_cdk.sources.source import TState, ExperimentalClassWarning +from airbyte_cdk.sources.source import TState from airbyte_cdk.sources.streams import Stream from airbyte_cdk.sources.streams.concurrent.abstract_stream import AbstractStream from airbyte_cdk.sources.streams.concurrent.availability_strategy import ( @@ -55,7 +53,6 @@ from airbyte_cdk.sources.streams.concurrent.cursor import ConcurrentCursor, FinalStateCursor from airbyte_cdk.sources.streams.concurrent.default_stream import DefaultStream from airbyte_cdk.sources.streams.concurrent.helpers import get_primary_key_from_stream -from airbyte_cdk.utils import AirbyteTracedException class 
ConcurrentDeclarativeSource(ManifestDeclarativeSource, Generic[TState]): @@ -400,15 +397,6 @@ def _is_datetime_incremental_without_partition_routing( ) ) - def _log_and_raise_stream_state_interpolation_error(self, stream_name: str, interpolation_variable: str, component_name: str) -> None: - error_message = f"Low-code stream '{stream_name}' uses interpolation of `{interpolation_variable}` in the `{component_name}` component which is no longer supported. Please remove this interpolation." - self.logger.error(error_message) - raise AirbyteTracedException(internal_message=error_message, message=error_message, failure_type=FailureType.config_error) - - @deprecated( - "This class is temporary and used to incrementally deliver low-code to concurrent", - category=ExperimentalClassWarning, - ) def _stream_supports_concurrent_partition_processing( self, declarative_stream: DeclarativeStream ) -> bool: @@ -428,11 +416,17 @@ def _stream_supports_concurrent_partition_processing( ): http_requester = declarative_stream.retriever.requester if "stream_state" in http_requester._path.string: - self._log_and_raise_stream_state_interpolation_error(stream_name=declarative_stream.name, interpolation_variable="stream_state", component_name="HttpRequester") + self.logger.warning( + f"Low-code stream '{declarative_stream.name}' uses interpolation of stream_state in the HttpRequester which is not thread-safe. Defaulting to synchronous processing" + ) + return False request_options_provider = http_requester._request_options_provider if request_options_provider.request_options_contain_stream_state(): - self._log_and_raise_stream_state_interpolation_error(stream_name=declarative_stream.name, interpolation_variable="stream_state", component_name="HttpRequester") + self.logger.warning( + f"Low-code stream '{declarative_stream.name}' uses interpolation of stream_state in the HttpRequester which is not thread-safe. Defaulting to synchronous processing" + ) + return False record_selector = declarative_stream.retriever.record_selector if isinstance(record_selector, RecordSelector): @@ -443,7 +437,10 @@ def _stream_supports_concurrent_partition_processing( ) and "stream_state" in record_selector.record_filter.condition ): - self._log_and_raise_stream_state_interpolation_error(stream_name=declarative_stream.name, interpolation_variable="stream_state", component_name="RecordFilter") + self.logger.warning( + f"Low-code stream '{declarative_stream.name}' uses interpolation of stream_state in the RecordFilter which is not thread-safe. Defaulting to synchronous processing" + ) + return False for add_fields in [ transformation @@ -452,12 +449,18 @@ def _stream_supports_concurrent_partition_processing( ]: for field in add_fields.fields: if isinstance(field.value, str) and "stream_state" in field.value: - self._log_and_raise_stream_state_interpolation_error(stream_name=declarative_stream.name, interpolation_variable="stream_state", component_name="AddFields") + self.logger.warning( + f"Low-code stream '{declarative_stream.name}' uses interpolation of stream_state in the AddFields which is not thread-safe. 
Defaulting to synchronous processing" + ) + return False if ( isinstance(field.value, InterpolatedString) and "stream_state" in field.value.string ): - self._log_and_raise_stream_state_interpolation_error(stream_name=declarative_stream.name, interpolation_variable="stream_state", component_name="AddFields") + self.logger.warning( + f"Low-code stream '{declarative_stream.name}' uses interpolation of stream_state in the AddFields which is not thread-safe. Defaulting to synchronous processing" + ) + return False return True @staticmethod diff --git a/airbyte_cdk/sources/declarative/declarative_component_schema.yaml b/airbyte_cdk/sources/declarative/declarative_component_schema.yaml index 3528747f7..f97644ab2 100644 --- a/airbyte_cdk/sources/declarative/declarative_component_schema.yaml +++ b/airbyte_cdk/sources/declarative/declarative_component_schema.yaml @@ -793,7 +793,7 @@ definitions: description: This option is used to adjust the upper and lower boundaries of each datetime window to beginning and end of the provided target period (day, week, month) type: object required: - - target + - target properties: target: title: Target diff --git a/airbyte_cdk/sources/declarative/interpolation/jinja.py b/airbyte_cdk/sources/declarative/interpolation/jinja.py index 3bb9b0c20..2991a3cab 100644 --- a/airbyte_cdk/sources/declarative/interpolation/jinja.py +++ b/airbyte_cdk/sources/declarative/interpolation/jinja.py @@ -15,6 +15,8 @@ from airbyte_cdk.sources.declarative.interpolation.interpolation import Interpolation from airbyte_cdk.sources.declarative.interpolation.macros import macros from airbyte_cdk.sources.types import Config +from airbyte_cdk.models import FailureType +from airbyte_cdk.utils import AirbyteTracedException class StreamPartitionAccessEnvironment(SandboxedEnvironment): @@ -36,6 +38,10 @@ def is_safe_attribute(self, obj: Any, attr: str, value: Any) -> bool: "stream_partition": "stream_slice", # Use stream_partition to access partition router's values } +_UNSUPPORTED_INTERPOLATION_VARIABLES: Mapping[str, str] = { + "stream_state": "`stream_state` is no longer supported for interpolation. We recommend using `stream_interval` instead. Please reference the CDK Migration Guide for more information.", +} + # These extensions are not installed so they're not currently a problem, # but we're still explicitly removing them from the jinja context. 
# At worst, this is documentation that we do NOT want to include these extensions because of the potential security risks @@ -95,6 +101,13 @@ def eval( elif equivalent in context: context[alias] = context[equivalent] + for variable_name in _UNSUPPORTED_INTERPOLATION_VARIABLES: + if variable_name in input_str: + raise AirbyteTracedException( + message=_UNSUPPORTED_INTERPOLATION_VARIABLES[variable_name], + internal_message=_UNSUPPORTED_INTERPOLATION_VARIABLES[variable_name], + failure_type=FailureType.config_error, + ) try: if isinstance(input_str, str): result = self._eval(input_str, context) diff --git a/airbyte_cdk/sources/declarative/requesters/http_requester.py b/airbyte_cdk/sources/declarative/requesters/http_requester.py index ad23f4d06..87ec44888 100644 --- a/airbyte_cdk/sources/declarative/requesters/http_requester.py +++ b/airbyte_cdk/sources/declarative/requesters/http_requester.py @@ -120,7 +120,6 @@ def get_path( next_page_token: Optional[Mapping[str, Any]], ) -> str: kwargs = { - "stream_state": stream_state, "stream_slice": stream_slice, "next_page_token": next_page_token, } diff --git a/airbyte_cdk/sources/declarative/requesters/request_options/interpolated_request_input_provider.py b/airbyte_cdk/sources/declarative/requesters/request_options/interpolated_request_input_provider.py index 0278df351..a409ebcbd 100644 --- a/airbyte_cdk/sources/declarative/requesters/request_options/interpolated_request_input_provider.py +++ b/airbyte_cdk/sources/declarative/requesters/request_options/interpolated_request_input_provider.py @@ -37,7 +37,6 @@ def __post_init__(self, parameters: Mapping[str, Any]) -> None: def eval_request_inputs( self, - stream_state: Optional[StreamState] = None, stream_slice: Optional[StreamSlice] = None, next_page_token: Optional[Mapping[str, Any]] = None, valid_key_types: Optional[Tuple[Type[Any]]] = None, @@ -54,7 +53,6 @@ def eval_request_inputs( :return: The request inputs to set on an outgoing HTTP request """ kwargs = { - "stream_state": stream_state, "stream_slice": stream_slice, "next_page_token": next_page_token, } diff --git a/airbyte_cdk/sources/declarative/requesters/request_options/interpolated_request_options_provider.py b/airbyte_cdk/sources/declarative/requesters/request_options/interpolated_request_options_provider.py index c327b83da..9e299d696 100644 --- a/airbyte_cdk/sources/declarative/requesters/request_options/interpolated_request_options_provider.py +++ b/airbyte_cdk/sources/declarative/requesters/request_options/interpolated_request_options_provider.py @@ -5,8 +5,6 @@ from dataclasses import InitVar, dataclass, field from typing import Any, Mapping, MutableMapping, Optional, Union -from typing_extensions import deprecated - from airbyte_cdk.sources.declarative.interpolation.interpolated_nested_mapping import NestedMapping from airbyte_cdk.sources.declarative.requesters.request_options.interpolated_nested_request_input_provider import ( InterpolatedNestedRequestInputProvider, @@ -17,7 +15,6 @@ from airbyte_cdk.sources.declarative.requesters.request_options.request_options_provider import ( RequestOptionsProvider, ) -from airbyte_cdk.sources.source import ExperimentalClassWarning from airbyte_cdk.sources.types import Config, StreamSlice, StreamState RequestInput = Union[str, Mapping[str, str]] @@ -80,7 +77,6 @@ def get_request_params( next_page_token: Optional[Mapping[str, Any]] = None, ) -> MutableMapping[str, Any]: interpolated_value = self._parameter_interpolator.eval_request_inputs( - stream_state, stream_slice, next_page_token, 
valid_key_types=(str,), @@ -98,7 +94,7 @@ def get_request_headers( next_page_token: Optional[Mapping[str, Any]] = None, ) -> Mapping[str, Any]: return self._headers_interpolator.eval_request_inputs( - stream_state, stream_slice, next_page_token + stream_slice, next_page_token ) def get_request_body_data( @@ -109,7 +105,6 @@ def get_request_body_data( next_page_token: Optional[Mapping[str, Any]] = None, ) -> Union[Mapping[str, Any], str]: return self._body_data_interpolator.eval_request_inputs( - stream_state, stream_slice, next_page_token, valid_key_types=(str,), @@ -126,39 +121,3 @@ def get_request_body_json( return self._body_json_interpolator.eval_request_inputs( stream_state, stream_slice, next_page_token ) - - @deprecated( - "This class is temporary and used to incrementally deliver low-code to concurrent", - category=ExperimentalClassWarning, - ) - def request_options_contain_stream_state(self) -> bool: - """ - Temporary helper method used as we move low-code streams to the concurrent framework. This method determines if - the InterpolatedRequestOptionsProvider has is a dependency on a non-thread safe interpolation context such as - stream_state. - """ - - return ( - self._check_if_interpolation_uses_stream_state(self.request_parameters) - or self._check_if_interpolation_uses_stream_state(self.request_headers) - or self._check_if_interpolation_uses_stream_state(self.request_body_data) - or self._check_if_interpolation_uses_stream_state(self.request_body_json) - ) - - @staticmethod - def _check_if_interpolation_uses_stream_state( - request_input: Optional[Union[RequestInput, NestedMapping]], - ) -> bool: - if not request_input: - return False - elif isinstance(request_input, str): - return "stream_state" in request_input - else: - for key, val in request_input.items(): - # Covers the case of RequestInput in the form of a string or Mapping[str, str]. It also covers the case - # of a NestedMapping where the value is a string. 
- # Note: Doesn't account for nested mappings for request_body_json, but I don't see stream_state used in that way - # in our code - if "stream_state" in key or (isinstance(val, str) and "stream_state" in val): - return True - return False diff --git a/airbyte_cdk/sources/declarative/transformations/add_fields.py b/airbyte_cdk/sources/declarative/transformations/add_fields.py index 4c9d5366c..909768756 100644 --- a/airbyte_cdk/sources/declarative/transformations/add_fields.py +++ b/airbyte_cdk/sources/declarative/transformations/add_fields.py @@ -128,7 +128,7 @@ def transform( ) -> None: if config is None: config = {} - kwargs = {"record": record, "stream_state": stream_state, "stream_slice": stream_slice} + kwargs = {"record": record, "stream_slice": stream_slice} for parsed_field in self._parsed_fields: valid_types = (parsed_field.value_type,) if parsed_field.value_type else None value = parsed_field.value.eval(config, valid_types=valid_types, **kwargs) diff --git a/unit_tests/sources/declarative/requesters/test_interpolated_request_input_provider.py b/unit_tests/sources/declarative/requesters/test_interpolated_request_input_provider.py index 6a332b528..5161fe765 100644 --- a/unit_tests/sources/declarative/requesters/test_interpolated_request_input_provider.py +++ b/unit_tests/sources/declarative/requesters/test_interpolated_request_input_provider.py @@ -45,7 +45,7 @@ def test_initialize_interpolated_mapping_request_input_provider( provider = InterpolatedRequestInputProvider( request_inputs=input_request_data, config=config, parameters=parameters ) - actual_request_data = provider.eval_request_inputs(stream_state={}, stream_slice=stream_slice) + actual_request_data = provider.eval_request_inputs(stream_slice=stream_slice) assert isinstance(provider._interpolator, InterpolatedMapping) assert actual_request_data == expected_request_data From e94cdddd78639e57c8a36d7ab3671a165a7637f2 Mon Sep 17 00:00:00 2001 From: pnilan Date: Thu, 6 Feb 2025 11:39:44 -0800 Subject: [PATCH 04/12] Removes stream_state from a missed location, formats code --- .../interpolated_nested_request_input_provider.py | 5 +---- .../interpolated_request_input_provider.py | 1 - .../interpolated_request_options_provider.py | 8 ++------ 3 files changed, 3 insertions(+), 11 deletions(-) diff --git a/airbyte_cdk/sources/declarative/requesters/request_options/interpolated_nested_request_input_provider.py b/airbyte_cdk/sources/declarative/requesters/request_options/interpolated_nested_request_input_provider.py index 6403417c9..4e175bb28 100644 --- a/airbyte_cdk/sources/declarative/requesters/request_options/interpolated_nested_request_input_provider.py +++ b/airbyte_cdk/sources/declarative/requesters/request_options/interpolated_nested_request_input_provider.py @@ -10,7 +10,7 @@ NestedMapping, ) from airbyte_cdk.sources.declarative.interpolation.interpolated_string import InterpolatedString -from airbyte_cdk.sources.types import Config, StreamSlice, StreamState +from airbyte_cdk.sources.types import Config, StreamSlice @dataclass @@ -42,20 +42,17 @@ def __post_init__(self, parameters: Mapping[str, Any]) -> None: def eval_request_inputs( self, - stream_state: Optional[StreamState] = None, stream_slice: Optional[StreamSlice] = None, next_page_token: Optional[Mapping[str, Any]] = None, ) -> Mapping[str, Any]: """ Returns the request inputs to set on an outgoing HTTP request - :param stream_state: The stream state :param stream_slice: The stream slice :param next_page_token: The pagination token :return: The request inputs to set 
on an outgoing HTTP request """ kwargs = { - "stream_state": stream_state, "stream_slice": stream_slice, "next_page_token": next_page_token, } diff --git a/airbyte_cdk/sources/declarative/requesters/request_options/interpolated_request_input_provider.py b/airbyte_cdk/sources/declarative/requesters/request_options/interpolated_request_input_provider.py index a409ebcbd..ed0e54c60 100644 --- a/airbyte_cdk/sources/declarative/requesters/request_options/interpolated_request_input_provider.py +++ b/airbyte_cdk/sources/declarative/requesters/request_options/interpolated_request_input_provider.py @@ -45,7 +45,6 @@ def eval_request_inputs( """ Returns the request inputs to set on an outgoing HTTP request - :param stream_state: The stream state :param stream_slice: The stream slice :param next_page_token: The pagination token :param valid_key_types: A tuple of types that the interpolator should allow diff --git a/airbyte_cdk/sources/declarative/requesters/request_options/interpolated_request_options_provider.py b/airbyte_cdk/sources/declarative/requesters/request_options/interpolated_request_options_provider.py index 9e299d696..e14c64de0 100644 --- a/airbyte_cdk/sources/declarative/requesters/request_options/interpolated_request_options_provider.py +++ b/airbyte_cdk/sources/declarative/requesters/request_options/interpolated_request_options_provider.py @@ -93,9 +93,7 @@ def get_request_headers( stream_slice: Optional[StreamSlice] = None, next_page_token: Optional[Mapping[str, Any]] = None, ) -> Mapping[str, Any]: - return self._headers_interpolator.eval_request_inputs( - stream_slice, next_page_token - ) + return self._headers_interpolator.eval_request_inputs(stream_slice, next_page_token) def get_request_body_data( self, @@ -118,6 +116,4 @@ def get_request_body_json( stream_slice: Optional[StreamSlice] = None, next_page_token: Optional[Mapping[str, Any]] = None, ) -> Mapping[str, Any]: - return self._body_json_interpolator.eval_request_inputs( - stream_state, stream_slice, next_page_token - ) + return self._body_json_interpolator.eval_request_inputs(stream_slice, next_page_token) From 22b41719a609a823aa453ced29bd210062ef0354 Mon Sep 17 00:00:00 2001 From: pnilan Date: Thu, 6 Feb 2025 11:46:39 -0800 Subject: [PATCH 05/12] remove helper methods checking for stream_state interpolation --- .../concurrent_declarative_source.py | 75 ------------------- 1 file changed, 75 deletions(-) diff --git a/airbyte_cdk/sources/declarative/concurrent_declarative_source.py b/airbyte_cdk/sources/declarative/concurrent_declarative_source.py index d4ecc0084..880b73f2b 100644 --- a/airbyte_cdk/sources/declarative/concurrent_declarative_source.py +++ b/airbyte_cdk/sources/declarative/concurrent_declarative_source.py @@ -24,7 +24,6 @@ from airbyte_cdk.sources.declarative.incremental.per_partition_with_global import ( PerPartitionWithGlobalCursor, ) -from airbyte_cdk.sources.declarative.interpolation import InterpolatedString from airbyte_cdk.sources.declarative.manifest_declarative_source import ManifestDeclarativeSource from airbyte_cdk.sources.declarative.models.declarative_component_schema import ( ConcurrencyLevel as ConcurrencyLevelModel, @@ -36,13 +35,11 @@ ModelToComponentFactory, ) from airbyte_cdk.sources.declarative.partition_routers import AsyncJobPartitionRouter -from airbyte_cdk.sources.declarative.requesters import HttpRequester from airbyte_cdk.sources.declarative.retrievers import AsyncRetriever, Retriever, SimpleRetriever from airbyte_cdk.sources.declarative.stream_slicers.declarative_partition_generator 
import ( DeclarativePartitionFactory, StreamSlicerPartitionGenerator, ) -from airbyte_cdk.sources.declarative.transformations.add_fields import AddFields from airbyte_cdk.sources.declarative.types import ConnectionDefinition from airbyte_cdk.sources.source import TState from airbyte_cdk.sources.streams import Stream @@ -320,9 +317,6 @@ def _group_streams( incremental_sync_component_definition and incremental_sync_component_definition.get("type", "") == DatetimeBasedCursorModel.__name__ - and self._stream_supports_concurrent_partition_processing( - declarative_stream=declarative_stream - ) and hasattr(declarative_stream.retriever, "stream_slicer") and isinstance( declarative_stream.retriever.stream_slicer, PerPartitionWithGlobalCursor @@ -387,9 +381,6 @@ def _is_datetime_incremental_without_partition_routing( and bool(incremental_sync_component_definition) and incremental_sync_component_definition.get("type", "") == DatetimeBasedCursorModel.__name__ - and self._stream_supports_concurrent_partition_processing( - declarative_stream=declarative_stream - ) and hasattr(declarative_stream.retriever, "stream_slicer") and ( isinstance(declarative_stream.retriever.stream_slicer, DatetimeBasedCursor) @@ -397,72 +388,6 @@ def _is_datetime_incremental_without_partition_routing( ) ) - def _stream_supports_concurrent_partition_processing( - self, declarative_stream: DeclarativeStream - ) -> bool: - """ - Many connectors make use of stream_state during interpolation on a per-partition basis under the assumption that - state is updated sequentially. Because the concurrent CDK engine processes different partitions in parallel, - stream_state is no longer a thread-safe interpolation context. It would be a race condition because a cursor's - stream_state can be updated in any order depending on which stream partition's finish first. - - We should start to move away from depending on the value of stream_state for low-code components that operate - per-partition, but we need to gate this otherwise some connectors will be blocked from publishing. See the - cdk-migrations.md for the full list of connectors. - """ - - if isinstance(declarative_stream.retriever, SimpleRetriever) and isinstance( - declarative_stream.retriever.requester, HttpRequester - ): - http_requester = declarative_stream.retriever.requester - if "stream_state" in http_requester._path.string: - self.logger.warning( - f"Low-code stream '{declarative_stream.name}' uses interpolation of stream_state in the HttpRequester which is not thread-safe. Defaulting to synchronous processing" - ) - return False - - request_options_provider = http_requester._request_options_provider - if request_options_provider.request_options_contain_stream_state(): - self.logger.warning( - f"Low-code stream '{declarative_stream.name}' uses interpolation of stream_state in the HttpRequester which is not thread-safe. Defaulting to synchronous processing" - ) - return False - - record_selector = declarative_stream.retriever.record_selector - if isinstance(record_selector, RecordSelector): - if ( - record_selector.record_filter - and not isinstance( - record_selector.record_filter, ClientSideIncrementalRecordFilterDecorator - ) - and "stream_state" in record_selector.record_filter.condition - ): - self.logger.warning( - f"Low-code stream '{declarative_stream.name}' uses interpolation of stream_state in the RecordFilter which is not thread-safe. 
Defaulting to synchronous processing" - ) - return False - - for add_fields in [ - transformation - for transformation in record_selector.transformations - if isinstance(transformation, AddFields) - ]: - for field in add_fields.fields: - if isinstance(field.value, str) and "stream_state" in field.value: - self.logger.warning( - f"Low-code stream '{declarative_stream.name}' uses interpolation of stream_state in the AddFields which is not thread-safe. Defaulting to synchronous processing" - ) - return False - if ( - isinstance(field.value, InterpolatedString) - and "stream_state" in field.value.string - ): - self.logger.warning( - f"Low-code stream '{declarative_stream.name}' uses interpolation of stream_state in the AddFields which is not thread-safe. Defaulting to synchronous processing" - ) - return False - return True - @staticmethod def _get_retriever( declarative_stream: DeclarativeStream, stream_state: Mapping[str, Any] From 315b3d89945c18591f1cbd2d566fe35919ee0c1a Mon Sep 17 00:00:00 2001 From: pnilan Date: Thu, 6 Feb 2025 11:50:58 -0800 Subject: [PATCH 06/12] chore: format --- airbyte_cdk/sources/declarative/interpolation/jinja.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/airbyte_cdk/sources/declarative/interpolation/jinja.py b/airbyte_cdk/sources/declarative/interpolation/jinja.py index 2991a3cab..8f8548aee 100644 --- a/airbyte_cdk/sources/declarative/interpolation/jinja.py +++ b/airbyte_cdk/sources/declarative/interpolation/jinja.py @@ -11,11 +11,11 @@ from jinja2.exceptions import UndefinedError from jinja2.sandbox import SandboxedEnvironment +from airbyte_cdk.models import FailureType from airbyte_cdk.sources.declarative.interpolation.filters import filters from airbyte_cdk.sources.declarative.interpolation.interpolation import Interpolation from airbyte_cdk.sources.declarative.interpolation.macros import macros from airbyte_cdk.sources.types import Config -from airbyte_cdk.models import FailureType from airbyte_cdk.utils import AirbyteTracedException From 15aa931e45de71728eeb6f1549fc45fdbc27d3ef Mon Sep 17 00:00:00 2001 From: pnilan Date: Mon, 10 Feb 2025 10:32:11 -0800 Subject: [PATCH 07/12] remove stream state interpolation test --- .../test_concurrent_declarative_source.py | 32 ------------------- 1 file changed, 32 deletions(-) diff --git a/unit_tests/sources/declarative/test_concurrent_declarative_source.py b/unit_tests/sources/declarative/test_concurrent_declarative_source.py index 1877e11bb..d35071df5 100644 --- a/unit_tests/sources/declarative/test_concurrent_declarative_source.py +++ b/unit_tests/sources/declarative/test_concurrent_declarative_source.py @@ -1438,38 +1438,6 @@ def test_concurrency_level_initial_number_partitions_to_generate_is_always_one_o assert source._concurrent_source._initial_number_partitions_to_generate == 1 -def test_streams_with_stream_state_interpolation_should_be_synchronous(): - manifest_with_stream_state_interpolation = copy.deepcopy(_MANIFEST) - - # Add stream_state interpolation to the location stream's HttpRequester - manifest_with_stream_state_interpolation["definitions"]["locations_stream"]["retriever"][ - "requester" - ]["request_parameters"] = { - "after": "{{ stream_state['updated_at'] }}", - } - - # Add a RecordFilter component that uses stream_state interpolation to the party member stream - manifest_with_stream_state_interpolation["definitions"]["party_members_stream"]["retriever"][ - "record_selector" - ]["record_filter"] = { - "type": "RecordFilter", - "condition": "{{ record.updated_at > 
stream_state['updated_at'] }}", - } - - source = ConcurrentDeclarativeSource( - source_config=manifest_with_stream_state_interpolation, - config=_CONFIG, - catalog=_CATALOG, - state=None, - ) - concurrent_streams, synchronous_streams = source._group_streams(config=_CONFIG) - - # 1 full refresh stream, 2 with parent stream without incremental dependency, 1 stream with async retriever, 1 incremental with parent stream (palace_enemies) - assert len(concurrent_streams) == 5 - # 2 incremental stream with interpolation on state (locations and party_members) - assert len(synchronous_streams) == 2 - - def test_given_partition_routing_and_incremental_sync_then_stream_is_concurrent(): manifest = { "version": "5.0.0", From 4a1211ee4c366099d117e13247f90c05043440fe Mon Sep 17 00:00:00 2001 From: pnilan Date: Thu, 13 Feb 2025 16:17:18 -0800 Subject: [PATCH 08/12] update tests to remove stream_state interpolation --- .../datetime/test_min_max_datetime.py | 32 ++--- .../decoders/test_decoders_memory_usage.py | 126 +++++++++--------- .../extractors/test_record_filter.py | 2 +- .../extractors/test_record_selector.py | 8 +- .../test_concurrent_perpartitioncursor.py | 4 +- .../test_parent_state_stream.py | 4 +- ...t_interpolated_request_options_provider.py | 107 ++------------- .../transformations/test_add_fields.py | 16 --- 8 files changed, 101 insertions(+), 198 deletions(-) diff --git a/unit_tests/sources/declarative/datetime/test_min_max_datetime.py b/unit_tests/sources/declarative/datetime/test_min_max_datetime.py index 1cc0e6014..590b5dcf1 100644 --- a/unit_tests/sources/declarative/datetime/test_min_max_datetime.py +++ b/unit_tests/sources/declarative/datetime/test_min_max_datetime.py @@ -22,13 +22,13 @@ ( "test_time_is_greater_than_min", "{{ config['older'] }}", - "{{ stream_state['newer'] }}", + "{{ stream_slice['start_date'] }}", "", new_date, ), ( "test_time_is_less_than_min", - "{{ stream_state['newer'] }}", + "{{ stream_slice['start_date'] }}", "{{ config['older'] }}", "", new_date, @@ -42,7 +42,7 @@ ), ( "test_time_is_greater_than_max", - "{{ stream_state['newer'] }}", + "{{ stream_slice['start_date'] }}", "", "{{ config['older'] }}", old_date, @@ -51,13 +51,13 @@ "test_time_is_less_than_max", "{{ config['older'] }}", "", - "{{ stream_state['newer'] }}", + "{{ stream_slice['start_date'] }}", old_date, ), ( "test_time_is_equal_to_min", - "{{ stream_state['newer'] }}", - "{{ stream_state['newer'] }}", + "{{ stream_slice['start_date'] }}", + "{{ stream_slice['start_date'] }}", "", new_date, ), @@ -65,7 +65,7 @@ "test_time_is_between_min_and_max", "{{ config['middle'] }}", "{{ config['older'] }}", - "{{ stream_state['newer'] }}", + "{{ stream_slice['start_date'] }}", middle_date, ), ( @@ -77,7 +77,7 @@ ), ( "test_max_newer_time_from_parameters", - "{{ stream_state['newer'] }}", + "{{ stream_slice['start_date'] }}", "", "{{ parameters['older'] }}", old_date, @@ -86,29 +86,29 @@ ) def test_min_max_datetime(test_name, date, min_date, max_date, expected_date): config = {"older": old_date, "middle": middle_date} - stream_state = {"newer": new_date} + stream_slice = {"start_date": new_date} parameters = {"newer": new_date, "older": old_date} min_max_date = MinMaxDatetime( datetime=date, min_datetime=min_date, max_datetime=max_date, parameters=parameters ) - actual_date = min_max_date.get_datetime(config, **{"stream_state": stream_state}) + actual_date = min_max_date.get_datetime(config, **{"stream_slice": stream_slice}) assert actual_date == datetime.datetime.strptime(expected_date, date_format) def 
test_custom_datetime_format(): config = {"older": "2021-01-01T20:12:19", "middle": "2022-01-01T20:12:19"} - stream_state = {"newer": "2022-06-24T20:12:19"} + stream_slice = {"newer": "2022-06-24T20:12:19"} min_max_date = MinMaxDatetime( datetime="{{ config['middle'] }}", datetime_format="%Y-%m-%dT%H:%M:%S", min_datetime="{{ config['older'] }}", - max_datetime="{{ stream_state['newer'] }}", + max_datetime="{{ stream_slice['newer'] }}", parameters={}, ) - actual_date = min_max_date.get_datetime(config, **{"stream_state": stream_state}) + actual_date = min_max_date.get_datetime(config, **{"stream_slice": stream_slice}) assert actual_date == datetime.datetime.strptime( "2022-01-01T20:12:19", "%Y-%m-%dT%H:%M:%S" @@ -117,16 +117,16 @@ def test_custom_datetime_format(): def test_format_is_a_number(): config = {"older": "20210101", "middle": "20220101"} - stream_state = {"newer": "20220624"} + stream_slice = {"newer": "20220624"} min_max_date = MinMaxDatetime( datetime="{{ config['middle'] }}", datetime_format="%Y%m%d", min_datetime="{{ config['older'] }}", - max_datetime="{{ stream_state['newer'] }}", + max_datetime="{{ stream_slice['newer'] }}", parameters={}, ) - actual_date = min_max_date.get_datetime(config, **{"stream_state": stream_state}) + actual_date = min_max_date.get_datetime(config, **{"stream_slice": stream_slice}) assert actual_date == datetime.datetime.strptime("20220101", "%Y%m%d").replace( tzinfo=datetime.timezone.utc diff --git a/unit_tests/sources/declarative/decoders/test_decoders_memory_usage.py b/unit_tests/sources/declarative/decoders/test_decoders_memory_usage.py index 241b45822..908d29a51 100644 --- a/unit_tests/sources/declarative/decoders/test_decoders_memory_usage.py +++ b/unit_tests/sources/declarative/decoders/test_decoders_memory_usage.py @@ -30,71 +30,71 @@ def large_event_response_fixture(): os.remove(file_path) -@pytest.mark.slow -@pytest.mark.limit_memory("20 MB") -@pytest.mark.parametrize( - "decoder_yaml_definition", - [ - "type: JsonlDecoder", - ], -) -def test_jsonl_decoder_memory_usage( - requests_mock, large_events_response, decoder_yaml_definition: str -): - # - lines_in_response, file_path = large_events_response - content = f""" - name: users - type: DeclarativeStream - retriever: - type: SimpleRetriever - decoder: - {decoder_yaml_definition} - paginator: - type: "NoPagination" - requester: - path: "users/{{{{ stream_slice.slice }}}}" - type: HttpRequester - url_base: "https://for-all-mankind.nasa.com/api/v1" - http_method: GET - authenticator: - type: NoAuth - request_headers: {{}} - request_body_json: {{}} - record_selector: - type: RecordSelector - extractor: - type: DpathExtractor - field_path: [] - partition_router: - type: ListPartitionRouter - cursor_field: "slice" - values: - - users1 - - users2 - - users3 - - users4 - primary_key: [] - """ +# @pytest.mark.slow +# @pytest.mark.limit_memory("20 MB") +# @pytest.mark.parametrize( +# "decoder_yaml_definition", +# [ +# "type: JsonlDecoder", +# ], +# ) +# def test_jsonl_decoder_memory_usage( +# requests_mock, large_events_response, decoder_yaml_definition: str +# ): +# # +# lines_in_response, file_path = large_events_response +# content = f""" +# name: users +# type: DeclarativeStream +# retriever: +# type: SimpleRetriever +# decoder: +# {decoder_yaml_definition} +# paginator: +# type: "NoPagination" +# requester: +# path: "users/{{{{ stream_slice.slice }}}}" +# type: HttpRequester +# url_base: "https://for-all-mankind.nasa.com/api/v1" +# http_method: GET +# authenticator: +# type: NoAuth +# 
request_headers: {{}} +# request_body_json: {{}} +# record_selector: +# type: RecordSelector +# extractor: +# type: DpathExtractor +# field_path: [] +# partition_router: +# type: ListPartitionRouter +# cursor_field: "slice" +# values: +# - users1 +# - users2 +# - users3 +# - users4 +# primary_key: [] +# """ - factory = ModelToComponentFactory() - stream_manifest = YamlDeclarativeSource._parse(content) - stream = factory.create_component( - model_type=DeclarativeStreamModel, component_definition=stream_manifest, config={} - ) +# factory = ModelToComponentFactory() +# stream_manifest = YamlDeclarativeSource._parse(content) +# stream = factory.create_component( +# model_type=DeclarativeStreamModel, component_definition=stream_manifest, config={} +# ) - def get_body(): - return open(file_path, "rb", buffering=30) +# def get_body(): +# return open(file_path, "rb", buffering=30) - counter = 0 - requests_mock.get("https://for-all-mankind.nasa.com/api/v1/users/users1", body=get_body()) - requests_mock.get("https://for-all-mankind.nasa.com/api/v1/users/users2", body=get_body()) - requests_mock.get("https://for-all-mankind.nasa.com/api/v1/users/users3", body=get_body()) - requests_mock.get("https://for-all-mankind.nasa.com/api/v1/users/users4", body=get_body()) +# counter = 0 +# requests_mock.get("https://for-all-mankind.nasa.com/api/v1/users/users1", body=get_body()) +# requests_mock.get("https://for-all-mankind.nasa.com/api/v1/users/users2", body=get_body()) +# requests_mock.get("https://for-all-mankind.nasa.com/api/v1/users/users3", body=get_body()) +# requests_mock.get("https://for-all-mankind.nasa.com/api/v1/users/users4", body=get_body()) - stream_slices = list(stream.stream_slices(sync_mode=SyncMode.full_refresh)) - for stream_slice in stream_slices: - for _ in stream.retriever.read_records(records_schema={}, stream_slice=stream_slice): - counter += 1 +# stream_slices = list(stream.stream_slices(sync_mode=SyncMode.full_refresh)) +# for stream_slice in stream_slices: +# for _ in stream.retriever.read_records(records_schema={}, stream_slice=stream_slice): +# counter += 1 - assert counter == lines_in_response * len(stream_slices) +# assert counter == lines_in_response * len(stream_slices) diff --git a/unit_tests/sources/declarative/extractors/test_record_filter.py b/unit_tests/sources/declarative/extractors/test_record_filter.py index 5df391327..24956078f 100644 --- a/unit_tests/sources/declarative/extractors/test_record_filter.py +++ b/unit_tests/sources/declarative/extractors/test_record_filter.py @@ -56,7 +56,7 @@ "filter_template, records, expected_records", [ ( - "{{ record['created_at'] > stream_state['created_at'] }}", + "{{ record['created_at'] >= stream_interval.extra_fields['created_at'] }}", [ {"id": 1, "created_at": "06-06-21"}, {"id": 2, "created_at": "06-07-21"}, diff --git a/unit_tests/sources/declarative/extractors/test_record_selector.py b/unit_tests/sources/declarative/extractors/test_record_selector.py index 5ec883ad2..2d32bc75c 100644 --- a/unit_tests/sources/declarative/extractors/test_record_selector.py +++ b/unit_tests/sources/declarative/extractors/test_record_selector.py @@ -23,7 +23,7 @@ ( "test_with_extractor_and_filter", ["data"], - "{{ record['created_at'] > stream_state['created_at'] }}", + "{{ record['created_at'] > stream_interval.extra_fields['created_at'] }}", { "data": [ {"id": 1, "created_at": "06-06-21"}, @@ -80,7 +80,11 @@ def test_record_filter(test_name, field_path, filter_template, body, expected_da config = {"response_override": "stop_if_you_see_me"} 
parameters = {"parameters_field": "data", "created_at": "06-07-21"} stream_state = {"created_at": "06-06-21"} - stream_slice = StreamSlice(partition={}, cursor_slice={"last_seen": "06-10-21"}) + stream_slice = StreamSlice( + partition={}, + cursor_slice={"last_seen": "06-10-21"}, + extra_fields={"created_at": "06-06-21"}, + ) next_page_token = {"last_seen_id": 14} schema = create_schema() first_transformation = Mock(spec=RecordTransformation) diff --git a/unit_tests/sources/declarative/incremental/test_concurrent_perpartitioncursor.py b/unit_tests/sources/declarative/incremental/test_concurrent_perpartitioncursor.py index ef06676f5..3ec3bee99 100644 --- a/unit_tests/sources/declarative/incremental/test_concurrent_perpartitioncursor.py +++ b/unit_tests/sources/declarative/incremental/test_concurrent_perpartitioncursor.py @@ -143,7 +143,7 @@ "type": "RecordSelector", "extractor": {"type": "DpathExtractor", "field_path": ["comments"]}, "record_filter": { - "condition": "{{ record['updated_at'] >= stream_state.get('updated_at', config.get('start_date')) }}" + "condition": "{{ record['updated_at'] >= stream_interval.extra_fields.get('updated_at', config.get('start_date')) }}" }, }, "paginator": "#/definitions/retriever/paginator", @@ -2543,7 +2543,7 @@ def test_incremental_error( "type": "RecordSelector", "extractor": {"type": "DpathExtractor", "field_path": ["comments"]}, "record_filter": { - "condition": "{{ record['updated_at'] >= stream_state.get('updated_at', config.get('start_date')) }}" + "condition": "{{ record['updated_at'] >= stream_interval['extra_fields'].get('updated_at', config.get('start_date')) }}" }, }, "paginator": "#/definitions/retriever/paginator", diff --git a/unit_tests/sources/declarative/partition_routers/test_parent_state_stream.py b/unit_tests/sources/declarative/partition_routers/test_parent_state_stream.py index b65f1f724..2cc6080e9 100644 --- a/unit_tests/sources/declarative/partition_routers/test_parent_state_stream.py +++ b/unit_tests/sources/declarative/partition_routers/test_parent_state_stream.py @@ -145,7 +145,7 @@ "type": "RecordSelector", "extractor": {"type": "DpathExtractor", "field_path": ["comments"]}, "record_filter": { - "condition": "{{ record['updated_at'] >= stream_state.get('updated_at', config.get('start_date')) }}" + "condition": "{{ record['updated_at'] >= stream_interval.get('start_date', config.get('start_date')) }}" }, }, "paginator": "#/definitions/retriever/paginator", @@ -1655,7 +1655,7 @@ def test_incremental_parent_state_no_incremental_dependency( "type": "RecordSelector", "extractor": {"type": "DpathExtractor", "field_path": ["comments"]}, "record_filter": { - "condition": "{{ record['updated_at'] >= stream_state.get('updated_at', config.get('start_date')) }}" + "condition": "{{ record['updated_at'] >= stream_interval.get('start_date', config.get('start_date')) }}" }, }, "paginator": "#/definitions/retriever/paginator", diff --git a/unit_tests/sources/declarative/requesters/request_options/test_interpolated_request_options_provider.py b/unit_tests/sources/declarative/requesters/request_options/test_interpolated_request_options_provider.py index d244cfc4c..2c646cf43 100644 --- a/unit_tests/sources/declarative/requesters/request_options/test_interpolated_request_options_provider.py +++ b/unit_tests/sources/declarative/requesters/request_options/test_interpolated_request_options_provider.py @@ -23,9 +23,9 @@ {"a_static_request_param": "a_static_value"}, ), ( - "test_value_depends_on_state", - {"read_from_state": "{{ stream_state['date'] 
}}"}, - {"read_from_state": "2021-01-01"}, + "test_value_depends_on_stream_interval", + {"read_from_stream_interval": "{{ stream_interval['start_date'] }}"}, + {"read_from_stream_interval": "2020-01-01"}, ), ( "test_value_depends_on_stream_slice", @@ -45,9 +45,9 @@ ( "test_parameter_is_interpolated", { - "{{ stream_state['date'] }} - {{stream_slice['start_date']}} - {{next_page_token['offset']}} - {{config['option']}}": "ABC" + "{{ stream_interval['start_date'] }} - {{stream_slice['start_date']}} - {{next_page_token['offset']}} - {{config['option']}}": "ABC" }, - {"2021-01-01 - 2020-01-01 - 12345 - OPTION": "ABC"}, + {"2020-01-01 - 2020-01-01 - 12345 - OPTION": "ABC"}, ), ("test_boolean_false_value", {"boolean_false": "{{ False }}"}, {"boolean_false": "False"}), ("test_integer_falsy_value", {"integer_falsy": "{{ 0 }}"}, {"integer_falsy": "0"}), @@ -76,11 +76,6 @@ def test_interpolated_request_params(test_name, input_request_params, expected_r {"a_static_request_param": "a_static_value"}, {"a_static_request_param": "a_static_value"}, ), - ( - "test_value_depends_on_state", - {"read_from_state": "{{ stream_state['date'] }}"}, - {"read_from_state": "2021-01-01"}, - ), ( "test_value_depends_on_stream_slice", {"read_from_slice": "{{ stream_slice['start_date'] }}"}, @@ -98,8 +93,8 @@ def test_interpolated_request_params(test_name, input_request_params, expected_r ), ( "test_interpolated_keys", - {"{{ stream_state['date'] }}": 123, "{{ config['option'] }}": "ABC"}, - {"2021-01-01": 123, "OPTION": "ABC"}, + {"{{ stream_interval['start_date'] }}": 123, "{{ config['option'] }}": "ABC"}, + {"2020-01-01": 123, "OPTION": "ABC"}, ), ("test_boolean_false_value", {"boolean_false": "{{ False }}"}, {"boolean_false": False}), ("test_integer_falsy_value", {"integer_falsy": "{{ 0 }}"}, {"integer_falsy": 0}), @@ -118,8 +113,8 @@ def test_interpolated_request_params(test_name, input_request_params, expected_r ), ( "test_nested_objects_interpolated keys", - {"nested": {"{{ stream_state['date'] }}": "{{ config['option'] }}"}}, - {"nested": {"2021-01-01": "OPTION"}}, + {"nested": {"{{ stream_interval['start_date'] }}": "{{ config['option'] }}"}}, + {"nested": {"2020-01-01": "OPTION"}}, ), ], ) @@ -156,8 +151,8 @@ def test_interpolated_request_json(test_name, input_request_json, expected_reque ("test_defaults_to_empty_dict", None, {}), ( "test_interpolated_keys", - {"{{ stream_state['date'] }} - {{ next_page_token['offset'] }}": "ABC"}, - {"2021-01-01 - 12345": "ABC"}, + {"{{ stream_interval['start_date'] }} - {{ next_page_token['offset'] }}": "ABC"}, + {"2020-01-01 - 12345": "ABC"}, ), ], ) @@ -183,83 +178,3 @@ def test_error_on_create_for_both_request_json_and_data(): request_body_data=request_data, parameters={}, ) - - -@pytest.mark.parametrize( - "request_option_type,request_input,contains_state", - [ - pytest.param( - "request_parameter", - {"start": "{{ stream_state.get('start_date') }}"}, - True, - id="test_request_parameter_has_state", - ), - pytest.param( - "request_parameter", - {"start": "{{ slice_interval.get('start_date') }}"}, - False, - id="test_request_parameter_no_state", - ), - pytest.param( - "request_header", - {"start": "{{ stream_state.get('start_date') }}"}, - True, - id="test_request_header_has_state", - ), - pytest.param( - "request_header", - {"start": "{{ slice_interval.get('start_date') }}"}, - False, - id="test_request_header_no_state", - ), - pytest.param( - "request_body_data", - "[{'query': {'type': 'timestamp', 'value': stream_state.get('start_date')}}]", - True, - 
id="test_request_body_data_has_state", - ), - pytest.param( - "request_body_data", - "[{'query': {'type': 'timestamp', 'value': stream_interval.get('start_date')}}]", - False, - id="test_request_body_data_no_state", - ), - pytest.param( - "request_body_json", - {"start": "{{ stream_state.get('start_date') }}"}, - True, - id="test_request_body_json_has_state", - ), - pytest.param( - "request_body_json", - {"start": "{{ slice_interval.get('start_date') }}"}, - False, - id="test_request_request_body_json_no_state", - ), - ], -) -def test_request_options_contain_stream_state(request_option_type, request_input, contains_state): - request_options_provider: InterpolatedRequestOptionsProvider - match request_option_type: - case "request_parameter": - request_options_provider = InterpolatedRequestOptionsProvider( - config=config, request_parameters=request_input, parameters={} - ) - case "request_header": - request_options_provider = InterpolatedRequestOptionsProvider( - config=config, request_headers=request_input, parameters={} - ) - case "request_body_data": - request_options_provider = InterpolatedRequestOptionsProvider( - config=config, request_body_data=request_input, parameters={} - ) - case "request_body_json": - request_options_provider = InterpolatedRequestOptionsProvider( - config=config, request_body_json=request_input, parameters={} - ) - case _: - request_options_provider = InterpolatedRequestOptionsProvider( - config=config, parameters={} - ) - - assert request_options_provider.request_options_contain_stream_state() == contains_state diff --git a/unit_tests/sources/declarative/transformations/test_add_fields.py b/unit_tests/sources/declarative/transformations/test_add_fields.py index 507f60abc..929af28a6 100644 --- a/unit_tests/sources/declarative/transformations/test_add_fields.py +++ b/unit_tests/sources/declarative/transformations/test_add_fields.py @@ -86,22 +86,6 @@ {"k": "v", "k2": "in-n-out"}, id="set a value from the config using dot notation", ), - pytest.param( - {"k": "v"}, - [(["k2"], '{{ stream_state["cursor"] }}')], - None, - {"stream_state": {"cursor": "t0"}}, - {"k": "v", "k2": "t0"}, - id="set a value from the state using bracket notation", - ), - pytest.param( - {"k": "v"}, - [(["k2"], "{{ stream_state.cursor }}")], - None, - {"stream_state": {"cursor": "t0"}}, - {"k": "v", "k2": "t0"}, - id="set a value from the state using dot notation", - ), pytest.param( {"k": "v"}, [(["k2"], '{{ stream_slice["start_date"] }}')], From 45f4c9075c5710a2314e5ae85a3119bfc3c0a16b Mon Sep 17 00:00:00 2001 From: pnilan Date: Thu, 13 Feb 2025 16:28:06 -0800 Subject: [PATCH 09/12] re-enable test --- .../decoders/test_decoders_memory_usage.py | 126 +++++++++--------- 1 file changed, 63 insertions(+), 63 deletions(-) diff --git a/unit_tests/sources/declarative/decoders/test_decoders_memory_usage.py b/unit_tests/sources/declarative/decoders/test_decoders_memory_usage.py index 908d29a51..241b45822 100644 --- a/unit_tests/sources/declarative/decoders/test_decoders_memory_usage.py +++ b/unit_tests/sources/declarative/decoders/test_decoders_memory_usage.py @@ -30,71 +30,71 @@ def large_event_response_fixture(): os.remove(file_path) -# @pytest.mark.slow -# @pytest.mark.limit_memory("20 MB") -# @pytest.mark.parametrize( -# "decoder_yaml_definition", -# [ -# "type: JsonlDecoder", -# ], -# ) -# def test_jsonl_decoder_memory_usage( -# requests_mock, large_events_response, decoder_yaml_definition: str -# ): -# # -# lines_in_response, file_path = large_events_response -# content = f""" -# name: users 
-# type: DeclarativeStream -# retriever: -# type: SimpleRetriever -# decoder: -# {decoder_yaml_definition} -# paginator: -# type: "NoPagination" -# requester: -# path: "users/{{{{ stream_slice.slice }}}}" -# type: HttpRequester -# url_base: "https://for-all-mankind.nasa.com/api/v1" -# http_method: GET -# authenticator: -# type: NoAuth -# request_headers: {{}} -# request_body_json: {{}} -# record_selector: -# type: RecordSelector -# extractor: -# type: DpathExtractor -# field_path: [] -# partition_router: -# type: ListPartitionRouter -# cursor_field: "slice" -# values: -# - users1 -# - users2 -# - users3 -# - users4 -# primary_key: [] -# """ +@pytest.mark.slow +@pytest.mark.limit_memory("20 MB") +@pytest.mark.parametrize( + "decoder_yaml_definition", + [ + "type: JsonlDecoder", + ], +) +def test_jsonl_decoder_memory_usage( + requests_mock, large_events_response, decoder_yaml_definition: str +): + # + lines_in_response, file_path = large_events_response + content = f""" + name: users + type: DeclarativeStream + retriever: + type: SimpleRetriever + decoder: + {decoder_yaml_definition} + paginator: + type: "NoPagination" + requester: + path: "users/{{{{ stream_slice.slice }}}}" + type: HttpRequester + url_base: "https://for-all-mankind.nasa.com/api/v1" + http_method: GET + authenticator: + type: NoAuth + request_headers: {{}} + request_body_json: {{}} + record_selector: + type: RecordSelector + extractor: + type: DpathExtractor + field_path: [] + partition_router: + type: ListPartitionRouter + cursor_field: "slice" + values: + - users1 + - users2 + - users3 + - users4 + primary_key: [] + """ -# factory = ModelToComponentFactory() -# stream_manifest = YamlDeclarativeSource._parse(content) -# stream = factory.create_component( -# model_type=DeclarativeStreamModel, component_definition=stream_manifest, config={} -# ) + factory = ModelToComponentFactory() + stream_manifest = YamlDeclarativeSource._parse(content) + stream = factory.create_component( + model_type=DeclarativeStreamModel, component_definition=stream_manifest, config={} + ) -# def get_body(): -# return open(file_path, "rb", buffering=30) + def get_body(): + return open(file_path, "rb", buffering=30) -# counter = 0 -# requests_mock.get("https://for-all-mankind.nasa.com/api/v1/users/users1", body=get_body()) -# requests_mock.get("https://for-all-mankind.nasa.com/api/v1/users/users2", body=get_body()) -# requests_mock.get("https://for-all-mankind.nasa.com/api/v1/users/users3", body=get_body()) -# requests_mock.get("https://for-all-mankind.nasa.com/api/v1/users/users4", body=get_body()) + counter = 0 + requests_mock.get("https://for-all-mankind.nasa.com/api/v1/users/users1", body=get_body()) + requests_mock.get("https://for-all-mankind.nasa.com/api/v1/users/users2", body=get_body()) + requests_mock.get("https://for-all-mankind.nasa.com/api/v1/users/users3", body=get_body()) + requests_mock.get("https://for-all-mankind.nasa.com/api/v1/users/users4", body=get_body()) -# stream_slices = list(stream.stream_slices(sync_mode=SyncMode.full_refresh)) -# for stream_slice in stream_slices: -# for _ in stream.retriever.read_records(records_schema={}, stream_slice=stream_slice): -# counter += 1 + stream_slices = list(stream.stream_slices(sync_mode=SyncMode.full_refresh)) + for stream_slice in stream_slices: + for _ in stream.retriever.read_records(records_schema={}, stream_slice=stream_slice): + counter += 1 -# assert counter == lines_in_response * len(stream_slices) + assert counter == lines_in_response * len(stream_slices) From 
4a1c4c91b3d6e6e2b158313e579290ecb210cbb6 Mon Sep 17 00:00:00 2001 From: pnilan Date: Fri, 14 Feb 2025 14:56:06 -0800 Subject: [PATCH 10/12] update per comments --- .../declarative/retrievers/simple_retriever.py | 2 -- .../declarative/transformations/add_fields.py | 6 +++--- .../datetime/test_min_max_datetime.py | 16 ++++++++-------- .../declarative/interpolation/test_jinja.py | 6 ++++++ .../retrievers/test_simple_retriever.py | 2 +- 5 files changed, 18 insertions(+), 14 deletions(-) diff --git a/airbyte_cdk/sources/declarative/retrievers/simple_retriever.py b/airbyte_cdk/sources/declarative/retrievers/simple_retriever.py index a5a8a71bc..a535a9b3d 100644 --- a/airbyte_cdk/sources/declarative/retrievers/simple_retriever.py +++ b/airbyte_cdk/sources/declarative/retrievers/simple_retriever.py @@ -133,7 +133,6 @@ def _get_request_options( mappings = [ paginator_method( - stream_state=stream_state, stream_slice=stream_slice, next_page_token=next_page_token, ), @@ -141,7 +140,6 @@ def _get_request_options( if not next_page_token or not self.ignore_stream_slicer_parameters_on_paginated_requests: mappings.append( stream_slicer_method( - stream_state=stream_state, stream_slice=stream_slice, next_page_token=next_page_token, ) diff --git a/airbyte_cdk/sources/declarative/transformations/add_fields.py b/airbyte_cdk/sources/declarative/transformations/add_fields.py index 909768756..59e0c2aeb 100644 --- a/airbyte_cdk/sources/declarative/transformations/add_fields.py +++ b/airbyte_cdk/sources/declarative/transformations/add_fields.py @@ -64,9 +64,9 @@ class AddFields(RecordTransformation): - path: ["shop_id"] value: "{{ config.shop_id }}" - # from state - - path: ["current_state"] - value: "{{ stream_state.cursor_field }}" # Or {{ stream_state['cursor_field'] }} + # from stream_interval + - path: ["date"] + value: "{{ stream_interval.start_date }}" # from record - path: ["unnested_value"] diff --git a/unit_tests/sources/declarative/datetime/test_min_max_datetime.py b/unit_tests/sources/declarative/datetime/test_min_max_datetime.py index 590b5dcf1..c00cdd2ac 100644 --- a/unit_tests/sources/declarative/datetime/test_min_max_datetime.py +++ b/unit_tests/sources/declarative/datetime/test_min_max_datetime.py @@ -22,13 +22,13 @@ ( "test_time_is_greater_than_min", "{{ config['older'] }}", - "{{ stream_slice['start_date'] }}", + "{{ stream_interval['start_date'] }}", "", new_date, ), ( "test_time_is_less_than_min", - "{{ stream_slice['start_date'] }}", + "{{ stream_interval['start_date'] }}", "{{ config['older'] }}", "", new_date, @@ -42,7 +42,7 @@ ), ( "test_time_is_greater_than_max", - "{{ stream_slice['start_date'] }}", + "{{ stream_interval['start_date'] }}", "", "{{ config['older'] }}", old_date, @@ -51,13 +51,13 @@ "test_time_is_less_than_max", "{{ config['older'] }}", "", - "{{ stream_slice['start_date'] }}", + "{{ stream_interval['start_date'] }}", old_date, ), ( "test_time_is_equal_to_min", - "{{ stream_slice['start_date'] }}", - "{{ stream_slice['start_date'] }}", + "{{ stream_interval['start_date'] }}", + "{{ stream_interval['start_date'] }}", "", new_date, ), @@ -65,7 +65,7 @@ "test_time_is_between_min_and_max", "{{ config['middle'] }}", "{{ config['older'] }}", - "{{ stream_slice['start_date'] }}", + "{{ stream_interval['start_date'] }}", middle_date, ), ( @@ -77,7 +77,7 @@ ), ( "test_max_newer_time_from_parameters", - "{{ stream_slice['start_date'] }}", + "{{ stream_interval['start_date'] }}", "", "{{ parameters['older'] }}", old_date, diff --git 
a/unit_tests/sources/declarative/interpolation/test_jinja.py b/unit_tests/sources/declarative/interpolation/test_jinja.py index a99236324..dba770f6e 100644 --- a/unit_tests/sources/declarative/interpolation/test_jinja.py +++ b/unit_tests/sources/declarative/interpolation/test_jinja.py @@ -10,6 +10,7 @@ from airbyte_cdk import StreamSlice from airbyte_cdk.sources.declarative.interpolation.jinja import JinjaInterpolation +from airbyte_cdk.utils import AirbyteTracedException interpolation = JinjaInterpolation() @@ -208,6 +209,11 @@ def test_invalid_jinja_statements(template_string): interpolation.eval(template_string, config=config) +def test_given_unsupported_jinja_expression_then_raises_airbyte_traced_exception(): + with pytest.raises(AirbyteTracedException): + interpolation.eval("{{ stream_state.get('some_field) }}", config={}) + + @pytest.mark.parametrize( "template_string", [ diff --git a/unit_tests/sources/declarative/retrievers/test_simple_retriever.py b/unit_tests/sources/declarative/retrievers/test_simple_retriever.py index fe03c6ad4..0b5778b7b 100644 --- a/unit_tests/sources/declarative/retrievers/test_simple_retriever.py +++ b/unit_tests/sources/declarative/retrievers/test_simple_retriever.py @@ -129,7 +129,7 @@ def test_simple_retriever_full(mock_http_stream): retriever._next_page_token(response, last_page_size, last_record, last_page_token_value) == next_page_token ) - assert retriever._request_params(None, None, None) == {} + assert retriever._request_params(None, None) == {} assert retriever.stream_slices() == stream_slices From e24e4082279679a0341eccf6397e32f2ba2c6f1b Mon Sep 17 00:00:00 2001 From: pnilan Date: Fri, 14 Feb 2025 16:29:51 -0800 Subject: [PATCH 11/12] update errors --- .../sources/declarative/declarative_component_schema.yaml | 6 ------ unit_tests/sources/declarative/interpolation/test_jinja.py | 2 +- 2 files changed, 1 insertion(+), 7 deletions(-) diff --git a/airbyte_cdk/sources/declarative/declarative_component_schema.yaml b/airbyte_cdk/sources/declarative/declarative_component_schema.yaml index a3489e82b..b4eef5f03 100644 --- a/airbyte_cdk/sources/declarative/declarative_component_schema.yaml +++ b/airbyte_cdk/sources/declarative/declarative_component_schema.yaml @@ -3680,12 +3680,6 @@ interpolation: - title: stream_slice description: This variable is deprecated. Use stream_interval or stream_partition instead. type: object - - title: stream_state - description: The current state of the stream. The object's keys are defined by the incremental sync's cursor_field the and partition router's values. - type: object - examples: - - created_at: "2020-01-01 00:00:00.000+00:00" - - updated_at: "2020-01-02 00:00:00.000+00:00" macros: - title: now_utc description: Returns the current date and time in the UTC timezone. 
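With `stream_state` removed from the documented interpolation variables, a manifest that previously read state inside request options can interpolate the current slice via `stream_interval` instead. The sketch below is illustrative only: the `since` parameter, the URL, and the `start_time` key are assumptions (slice key names depend on the cursor's partition field configuration), not something mandated by the schema change above.

```yaml
# Minimal sketch: replace stream_state lookups with the slice boundaries.
requester:
  type: HttpRequester
  url_base: "https://api.example.com/v1"  # placeholder
  path: "/events"
  http_method: GET
  request_parameters:
    # was: since: "{{ stream_state['updated_at'] }}"
    since: "{{ stream_interval['start_time'] }}"
```

Because every slice carries its own interval, request generation no longer reads shared mutable state.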
diff --git a/unit_tests/sources/declarative/interpolation/test_jinja.py b/unit_tests/sources/declarative/interpolation/test_jinja.py
index dba770f6e..7335e056b 100644
--- a/unit_tests/sources/declarative/interpolation/test_jinja.py
+++ b/unit_tests/sources/declarative/interpolation/test_jinja.py
@@ -211,7 +211,7 @@ def test_invalid_jinja_statements(template_string):
 def test_given_unsupported_jinja_expression_then_raises_airbyte_traced_exception():
     with pytest.raises(AirbyteTracedException):
-        interpolation.eval("{{ stream_state.get('some_field) }}", config={})
+        interpolation.eval("{{ stream_state.get('some_field') }}", config={})
 
 @pytest.mark.parametrize(
     "template_string",
     [

From ed506ee3fd17e3f0f9ac323c821452708c69b60e Mon Sep 17 00:00:00 2001
From: pnilan
Date: Fri, 14 Feb 2025 17:05:10 -0800
Subject: [PATCH 12/12] update cdk-migration

---
 cdk-migrations.md | 31 +++++++++++++++++++++++++++++++
 1 file changed, 31 insertions(+)

diff --git a/cdk-migrations.md b/cdk-migrations.md
index 8173a5edd..dc39a8a67 100644
--- a/cdk-migrations.md
+++ b/cdk-migrations.md
@@ -1,10 +1,41 @@
 # CDK Migration Guide
 
+## Upgrading to 6.X.X
+
+Version 6.X.X of the CDK removes support for `stream_state` in the Jinja interpolation context. This is a breaking change for any low-code connector that interpolates `stream_state`.
+
+The following components are impacted by this change:
+
+- `HttpRequester`
+  - `request_parameters`
+  - `request_body_json`
+  - `request_body_data`
+  - `request_headers`
+- `RecordFilter`
+- `AddFields`
+
+Where applicable, we recommend updating to use `stream_interval` instead.
+
+### Example
+
+```yaml
+# Before
+record_filter:
+  type: RecordFilter
+  condition: "{{ record['updated_at'] >= stream_state['updated_at'] }}"
+
+# After
+record_filter:
+  type: RecordFilter
+  condition: "{{ record['updated_at'] >= stream_interval['start_date'] }}"
+```
+
 ## Upgrading to 6.28.0
 
 Starting from version 6.28.0, the CDK no longer includes Pendulum as a transitive dependency. If your connector relies on Pendulum without explicitly declaring it as a dependency, you will need to add it to your connector's dependencies going forward.
 
 More info:
+
 - https://deptry.com/rules-violations/#transitive-dependencies-dep003
 
 ## Upgrading to 6.0.0