fix: remove stream_state from interpolation contexts #325

Open
wants to merge 44 commits into
base: main

Conversation

devin-ai-integration[bot]
Contributor

@devin-ai-integration devin-ai-integration bot commented Feb 7, 2025

Remove stream_state from interpolation contexts as it is not thread-safe.

Link to Devin run: https://app.devin.ai/sessions/5b6bba9b365143ef813af2e410691083
Requested by: [email protected]

Changes:

  • Remove stream_state from all interpolation contexts in schema
  • Add validation to prevent stream_state usage in interpolation
  • Restore stream_state aliasing in jinja.py for backward compatibility
  • Remove stream_interval parameters from components
  • Update tests to use stream_interval instead of stream_state

Fixes: airbytehq/airbyte-internal-issues#11591

Contributor Author

🤖 Devin AI Engineer

Original prompt from [email protected]:

Hey @Devin, let's work on this issue: <https://github.com/airbytehq/airbyte-internal-issues/issues/11591>

Our goal is to make sure that:
• `stream_state` is no longer passed to low-code components for interpolation
• `stream_state` is removed from `interpolation_context` in `declarative_component_schema.yaml`
• `stream_state` is removed from spots where it's explicitly included in interpolation context:
• There are 9 instances across 4 components where stream_state is explicitly included within the interpolation_context:
    ◦ AddedFieldDefinition[“properties”][“value”]
    ◦ HttpRequester[“properties”][“path”]
    ◦ HttpRequester[“properties”][“request_body_data”]
    ◦ HttpRequester[“properties”][“request_body_json”]
    ◦ HttpRequester[“properties”][“request_headers”]
    ◦ HttpRequester[“properties”][“request_parameters”]
    ◦ KeysReplace[“properties”][“old”]
    ◦ KeysReplace[“properties”][“new”]
    ◦ RecordFilter[“properties”][“condition”]
*Some more notes on how you can approach this:* 

Removing the stream_state from a component’s interpolation_context does not automatically prohibit its use. In order to fail if a connector interpolates stream_state, we’ll need to find and remove instances where stream_state is passed into the interpolator. For example, the RecordFilter component’s JinjaInterpolation is aware of stream_state because it’s passed to it via kwargs:


    def filter_records(
        self,
        records: Iterable[Mapping[str, Any]],
        stream_state: StreamState,
        stream_slice: Optional[StreamSlice] = None,
        next_page_token: Optional[Mapping[str, Any]] = None,
    ) -> Iterable[Mapping[str, Any]]:
        kwargs = {
            "stream_state": stream_state, # Remove stream_state here
            "stream_slice": stream_slice,
            "next_page_token": next_page_token,
            "stream_slice.extra_fields": stream_slice.extra_fields if stream_slice else {},
        }
        for record in records:
            if self._filter_interpolator.eval(self.config, record=record, **kwargs):
                yield record

After removing all places where stream_state is passed to the interpolator, we will want to raise an exception with an appropriate error message informing the user of the change and include recommendations on ways to resolve (e.g. recommend using stream_interval instead).

To keep this change within the context of the interpolation, we can provide a similar check when evaluating an interpolated expression. This could be included in the eval method of the JinjaInterpolation class: when evaluating an expression, we can raise an AirbyteTracedException if stream_state is included within the input string. The primary benefit is that the interpolation validation lives within the JinjaInterpolation class, so the ConcurrentDeclarativeSource does not need to know about it. The primary drawback is that these evaluations are invoked only when needed. For example, if stream_state was interpolated within a record filter, an exception would only be raised after making a request, receiving a response, and then attempting to filter the records based on the condition.
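A minimal sketch of the eval-time check described above, for illustration only. `InterpolationError` is a hypothetical stand-in for AirbyteTracedException so the snippet stays self-contained, and `check_expression` is an assumed helper name, not something taken from the CDK. A plain string scan like this can also flag a string literal that merely contains the text stream_state, which may or may not be acceptable.

```python
import re


class InterpolationError(ValueError):
    """Hypothetical stand-in for AirbyteTracedException."""


# Match stream_state only as a whole identifier, so a name such as
# my_stream_state_v2 is not flagged.
_STREAM_STATE = re.compile(r"\bstream_state\b")


def check_expression(expression: str) -> None:
    """Raise if the raw expression references stream_state (assumed helper)."""
    if _STREAM_STATE.search(expression):
        raise InterpolationError(
            "stream_state is no longer available for interpolation; "
            "consider using stream_interval instead. "
            f"Offending expression: {expression!r}"
        )
```

Calling `check_expression` at the top of an eval method would keep the restriction inside the interpolation layer, at the cost noted above: the error only surfaces when the expression is actually evaluated.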

I'll be helping with this pull request! Here's what you should know:

✅ I will automatically:

  • Address comments on this PR. Add "(aside)" to your comment to have me ignore it.
  • Look at CI failures and help fix them

Note: I can only respond to comments from users who have write access to this repository.


@github-actions bot added the bug (Something isn't working) and security labels on Feb 7, 2025
) -> None:
if config is None:
config = {}
kwargs = {"record": record, "stream_state": stream_state, "stream_slice": stream_slice}
kwargs = {
Contributor

Remove stream_state from here as well

Contributor Author

Devin is currently unreachable - the session may have died.

Contributor

@brianjlai brianjlai left a comment

There are some fundamental problems with how this PR has been designed. Primarily, the issue is that Devin has either misinterpreted or been directed incorrectly to try to inject stream_interval into every part of the codebase.

The main piece of context missing is that the stream_interval object does not actually exist across all the different components at runtime because under the hood in jinja.py, we alias stream_state into stream_interval and partition. See this block:

    for alias, equivalent in _ALIASES.items():
        if alias in context:
            # This is unexpected. We could ignore or log a warning, but failing loudly should result in fewer surprises
            raise ValueError(
                f"Found reserved keyword {alias} in interpolation context. This is unexpected and indicative of a bug in the CDK."
            )
        elif equivalent in context:
            context[alias] = context[equivalent]

At worst this might break a lot of connectors without us knowing because jinja.py won't receive the stream_state object we need to alias.
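To illustrate the concern, here is a small standalone sketch of the aliasing loop. The `ALIASES` mapping below is an assumption that follows the description in this comment; the real `_ALIASES` contents in jinja.py are not shown in this thread and may differ. `apply_aliases` is a hypothetical helper name.

```python
from typing import Any, Dict, Mapping

# Assumed mapping of alias -> equivalent, per the description in this review.
ALIASES: Mapping[str, str] = {"stream_interval": "stream_state"}


def apply_aliases(
    context: Dict[str, Any], aliases: Mapping[str, str] = ALIASES
) -> Dict[str, Any]:
    """Return a copy of context with each alias filled in from its equivalent."""
    resolved = dict(context)
    for alias, equivalent in aliases.items():
        if alias in resolved:
            # Callers must not pass the alias directly; fail loudly.
            raise ValueError(f"Found reserved keyword {alias} in interpolation context.")
        elif equivalent in resolved:
            resolved[alias] = resolved[equivalent]
    return resolved


# With the equivalent present, the alias is populated.
with_state = apply_aliases({"stream_state": {"updated_at": "2025-02-07"}})

# Without it, the alias silently never appears in the context.
without_state = apply_aliases({"config": {}})
```

Under that assumption, a component that stops passing the equivalent key would leave `stream_interval` undefined at render time without any error from the aliasing step itself.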

What probably needs to happen is we walk back a lot of this refactor where we're replacing stream_state with stream_interval. But the error handling with custom error messages and the YAML schema changes should be fine.
