Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix(low-code): Fix legacy state migration in SubstreamPartitionRouter #261

Merged
merged 1 commit into from
Jan 23, 2025

Conversation

tolik0
Copy link
Contributor

@tolik0 tolik0 commented Jan 23, 2025

Summary by CodeRabbit

  • Bug Fixes

    • Improved state handling in substream partition routing to prevent incorrect state assignments
    • Enhanced robustness of state filtering mechanism
  • Tests

    • Added comprehensive test cases for substream partition router state management
    • Expanded test coverage for different initial state scenarios

@github-actions github-actions bot added the bug Something isn't working label Jan 23, 2025
Copy link
Contributor

coderabbitai bot commented Jan 23, 2025

📝 Walkthrough

Walkthrough

The changes focus on improving the SubstreamPartitionRouter class in the Airbyte CDK, specifically enhancing the set_initial_state method. The modifications involve renaming the substream_state variable to substream_state_values and adding a more robust state filtering mechanism. The update ensures that only valid state formats (non-list, non-dictionary) are processed, preventing potential issues with state assignment in substream partition routing.

Changes

File Change Summary
airbyte_cdk/sources/declarative/partition_routers/substream_partition_router.py - Renamed substream_state to substream_state_values
- Added conditional check to filter state types
unit_tests/sources/declarative/partition_routers/test_substream_partition_router.py - Added new test_set_initial_state function
- Introduced parameterized test cases for state management

Sequence Diagram

sequenceDiagram
    participant SR as SubstreamPartitionRouter
    participant IS as Initial State
    
    SR->>IS: set_initial_state()
    IS-->>SR: Validate state type
    alt Valid State
        SR->SR: Copy state to parent streams
    else Invalid State
        SR->SR: Ignore state
    end
Loading

Looks like you've added some nice defensive programming to the state handling! A couple of quick thoughts:

  1. The renaming from substream_state to substream_state_values makes the intent clearer - good move! 👍
  2. The type filtering adds a nice layer of robustness. Wdyt about adding a log warning when an invalid state type is encountered?

Would you be interested in exploring that logging addition? It could help with debugging unexpected state scenarios. Cheers! 🚀

✨ Finishing Touches
  • 📝 Generate Docstrings (Beta)

Thank you for using CodeRabbit. We offer it for free to the OSS community and would appreciate your support in helping us grow. If you find it useful, would you consider giving us a shout-out on your favorite social media?

❤️ Share
🪧 Tips

Chat

There are 3 ways to chat with CodeRabbit:

‼️ IMPORTANT
Auto-reply has been disabled for this repository in the CodeRabbit settings. The CodeRabbit bot will not respond to your replies unless it is explicitly tagged.

  • Files and specific lines of code (under the "Files changed" tab): Tag @coderabbitai in a new review comment at the desired location with your query. Examples:
    • @coderabbitai generate unit testing code for this file.
    • @coderabbitai modularize this function.
  • PR comments: Tag @coderabbitai in a new PR comment to ask questions about the PR branch. For the best results, please provide a very specific query, as very limited context is provided in this mode. Examples:
    • @coderabbitai gather interesting stats about this repository and render them as a table. Additionally, render a pie chart showing the language distribution in the codebase.
    • @coderabbitai read src/utils.ts and generate unit testing code.
    • @coderabbitai read the files in the src/scheduler package and generate a class diagram using mermaid and a README in the markdown format.
    • @coderabbitai help me debug CodeRabbit configuration file.

Note: Be mindful of the bot's finite context window. It's strongly recommended to break down tasks such as reading entire modules into smaller chunks. For a focused discussion, use review comments to chat about specific files and their changes, instead of using the PR comments.

CodeRabbit Commands (Invoked using PR comments)

  • @coderabbitai pause to pause the reviews on a PR.
  • @coderabbitai resume to resume the paused reviews.
  • @coderabbitai review to trigger an incremental review. This is useful when automatic reviews are disabled for the repository.
  • @coderabbitai full review to do a full review from scratch and review all the files again.
  • @coderabbitai summary to regenerate the summary of the PR.
  • @coderabbitai generate docstrings to generate docstrings for this PR. (Beta)
  • @coderabbitai resolve resolve all the CodeRabbit review comments.
  • @coderabbitai configuration to show the current CodeRabbit configuration for the repository.
  • @coderabbitai help to get help.

Other keywords and placeholders

  • Add @coderabbitai ignore anywhere in the PR description to prevent this PR from being reviewed.
  • Add @coderabbitai summary to generate the high-level summary at a specific location in the PR description.
  • Add @coderabbitai anywhere in the PR title to generate the title automatically.

CodeRabbit Configuration File (.coderabbit.yaml)

  • You can programmatically configure CodeRabbit by adding a .coderabbit.yaml file to the root of your repository.
  • Please see the configuration documentation for more information.
  • If your editor has YAML language server enabled, you can add the path at the top of this file to enable auto-completion and validation: # yaml-language-server: $schema=https://coderabbit.ai/integrations/schema.v2.json

Documentation and Community

  • Visit our Documentation for detailed information on how to use CodeRabbit.
  • Join our Discord Community to get help, request features, and share feedback.
  • Follow us on X/Twitter for updates and announcements.

Copy link
Contributor

@coderabbitai coderabbitai bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actionable comments posted: 0

🧹 Nitpick comments (1)
airbyte_cdk/sources/declarative/partition_routers/substream_partition_router.py (1)

299-304: LGTM! The state handling improvements look solid.

The changes make the state migration more robust by filtering out non-scalar values. The variable rename from substream_state to substream_state_values better reflects its purpose.

What do you think about adding a debug log when filtering out a state value? This could help with troubleshooting, wdyt? 🤔

 substream_state_values = list(stream_state.values())
 substream_state = substream_state_values[0] if substream_state_values else {}
 # Filter out per partition state. Because we pass the state to the parent stream in the format {cursor_field: substream_state}
 if isinstance(substream_state, (list, dict)):
+    self.logger.debug(f"Filtering out non-scalar state value: {substream_state}")
     substream_state = {}
📜 Review details

Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro

📥 Commits

Reviewing files that changed from the base of the PR and between ec7e961 and 544effb.

📒 Files selected for processing (2)
  • airbyte_cdk/sources/declarative/partition_routers/substream_partition_router.py (1 hunks)
  • unit_tests/sources/declarative/partition_routers/test_substream_partition_router.py (1 hunks)
⏰ Context from checks skipped due to timeout of 90000ms (4)
  • GitHub Check: Pytest (All, Python 3.11, Ubuntu)
  • GitHub Check: Pytest (All, Python 3.10, Ubuntu)
  • GitHub Check: Pytest (Fast)
  • GitHub Check: Analyze (python)
🔇 Additional comments (1)
unit_tests/sources/declarative/partition_routers/test_substream_partition_router.py (1)

405-512: Great test coverage! 🎯

The test cases thoroughly validate the state migration logic with well-structured parameterized tests. The test IDs and docstring provide clear documentation of what's being tested.

@tolik0
Copy link
Contributor Author

tolik0 commented Jan 23, 2025

I found this error in CATs for Jira: The parent state was missing, causing the substream partition router to attempt migrating the substream state for the parent stream. However, it should only be migrated if it is in the legacy format. We didn’t encounter errors related to this issue previously because the parent state was updated during the sync. In contrast, with the concurrent per-partition cursor, the parent state is updated at the end of the sync, which created this corner case where the parent state is missing. With the fix, tests are successful.

Copy link
Contributor

@aldogonzalez8 aldogonzalez8 left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

APPROVED

@tolik0 tolik0 merged commit 97419d0 into main Jan 23, 2025
20 of 24 checks passed
@tolik0 tolik0 deleted the tolik0/fix-legacy-parent-state-migration branch January 23, 2025 23:41
rpopov added a commit to rpopov/airbyte-python-cdk that referenced this pull request Jan 26, 2025
Created a DPath Enhancing Extractor
Refactored the record enhancement logic - moved to the extracted class
Split the tests of DPathExtractor and DPathEnhancingExtractor

Fix the failing tests:

FAILED unit_tests/sources/declarative/parsers/test_model_to_component_factory.py::test_create_custom_components[test_create_custom_component_with_subcomponent_that_uses_parameters]
FAILED unit_tests/sources/declarative/parsers/test_model_to_component_factory.py::test_custom_components_do_not_contain_extra_fields
FAILED unit_tests/sources/declarative/parsers/test_model_to_component_factory.py::test_parse_custom_component_fields_if_subcomponent
FAILED unit_tests/sources/declarative/parsers/test_model_to_component_factory.py::test_create_page_increment
FAILED unit_tests/sources/declarative/parsers/test_model_to_component_factory.py::test_create_offset_increment
FAILED unit_tests/sources/file_based/test_file_based_scenarios.py::test_file_based_read[simple_unstructured_scenario]
FAILED unit_tests/sources/file_based/test_file_based_scenarios.py::test_file_based_read[no_file_extension_unstructured_scenario]

They faile because of comparing string and int values of the page_size (public) attribute.
Imposed an invariant:
  on construction, page_size can be set to a string or int
  keep only values of one type in page_size for uniform comparison (convert the values of the other type)
  _page_size holds the internal / working value
... unless manipulated directly.

Merged:
feat(low-code concurrent): Allow async job low-code streams that are incremental to be run by the concurrent framework (airbytehq#228)
fix(low-code): Fix declarative low-code state migration in SubstreamPartitionRouter (airbytehq#267)
feat: combine slash command jobs into single job steps (airbytehq#266)
feat(low-code): add items and property mappings to dynamic schemas (airbytehq#256)
feat: add help response for unrecognized slash commands (airbytehq#264)
ci: post direct links to html connector test reports (airbytehq#252) (airbytehq#263)
fix(low-code): Fix legacy state migration in SubstreamPartitionRouter (airbytehq#261)
fix(airbyte-cdk): Fix RequestOptionsProvider for PerPartitionWithGlobalCursor (airbytehq#254)
feat(low-code): add profile assertion flow to oauth authenticator component (airbytehq#236)
feat(Low-Code Concurrent CDK): Add ConcurrentPerPartitionCursor (airbytehq#111)
fix: don't mypy unit_tests (airbytehq#241)
fix: handle backoff_strategies in CompositeErrorHandler (airbytehq#225)
feat(concurrent cursor): attempt at clamping datetime (airbytehq#234)
fix(airbyte-cdk): Fix RequestOptionsProvider for PerPartitionWithGlobalCursor (airbytehq#254)
feat(low-code): add profile assertion flow to oauth authenticator component (airbytehq#236)
feat(Low-Code Concurrent CDK): Add ConcurrentPerPartitionCursor (airbytehq#111)
fix: don't mypy unit_tests (airbytehq#241)
fix: handle backoff_strategies in CompositeErrorHandler (airbytehq#225)
feat(concurrent cursor): attempt at clamping datetime (airbytehq#234)
ci: use `ubuntu-24.04` explicitly (resolves CI warnings) (airbytehq#244)
Fix(sdm): module ref issue in python components import (airbytehq#243)
feat(source-declarative-manifest): add support for custom Python components from dynamic text input (airbytehq#174)
chore(deps): bump avro from 1.11.3 to 1.12.0 (airbytehq#133)
docs: comments on what the `Dockerfile` is for (airbytehq#240)
chore: move ruff configuration to dedicated ruff.toml file (airbytehq#237)
Fix(sdm): module ref issue in python components import (airbytehq#243)
feat(low-code): add DpathFlattenFields (airbytehq#227)
feat(source-declarative-manifest): add support for custom Python components from dynamic text input (airbytehq#174)
chore(deps): bump avro from 1.11.3 to 1.12.0 (airbytehq#133)
docs: comments on what the `Dockerfile` is for (airbytehq#240)
chore: move ruff configuration to dedicated ruff.toml file (airbytehq#237)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
bug Something isn't working
Projects
None yet
Development

Successfully merging this pull request may close these issues.

2 participants