fix: (CDK) (Connector Builder) - refactor the MessageGrouper > TestRead #332

Open
wants to merge 10 commits into
base: main

Conversation

bazarnov
Contributor

@bazarnov bazarnov commented Feb 12, 2025

What

Resolving:

How

The pull request refactors the airbyte_cdk/connector_builder/message_grouper.py module by replacing the MessageGrouper class with the new TestReader class. Key changes include:

  • Imports and Class Replacements:

    • MessageGrouper is replaced by TestReader in connector_builder_handler.py.
    • The airbyte_cdk/connector_builder/test_reader directory holds the TestRead implementation and related logic:
      • reader holds the logic for how a test read is actually performed.
      • message_grouper holds the logic for grouping the emitted source messages.
      • helpers holds functions reused by reader and message_grouper.
      • types holds the type aliases used to annotate the output of certain methods in reader and message_grouper.
    • Imports are updated to use dataclass and field directly from the dataclasses module.
  • Unit Tests Updates:

    • Test files (test_connector_builder_handler.py and test_message_grouper.py) are updated to reflect the changes from MessageGrouper to TestReader.
    • No new test cases are added
    • No existing test cases are edited/changed - to make sure we don't break the previous implementations
  • New Files Added:

    • Several new files related to test_reader, including: __init__.py, helpers.py, message_grouper.py, reader.py, and types.py.
  • Test Markers:

    • Added @pytest.mark.slow to some test cases to indicate that they may take longer when run locally.

These changes aim to improve the code's readability and maintainability by introducing the TestReader class, which has enhanced functionality and updated testing procedures.
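
For the `@pytest.mark.slow` marker mentioned above, here is a minimal sketch of how such a marker could be registered so pytest does not warn about an unknown mark. The `conftest.py` location and the marker description are assumptions; the PR may register the marker elsewhere (for example in `pyproject.toml`).

```python
# conftest.py (hypothetical location; not taken from the PR)


def pytest_configure(config):
    # Register the custom marker so that `@pytest.mark.slow` is a known mark.
    config.addinivalue_line(
        "markers",
        "slow: marks a test as slow (deselect locally with -m 'not slow')",
    )
```

With the marker registered, the slow tests can be skipped locally with `pytest -m "not slow"`.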

User Impact

No impact is expected; the logic and behavior remain the same.

Local Testing using this guide:

Passed:

  • [ ]

Summary by CodeRabbit

  • New Features

    • Introduced enhanced test read capabilities for streamlined validation of connector configurations and partial data extraction.
    • Added a new class TestReader for performing test reads and validating configurations.
  • Refactor

    • Simplified import structures and updated data grouping mechanisms for clearer organization and improved clarity.
  • Tests

    • Updated testing infrastructure to align with the new test-read approach.
    • Added large-scale data fixtures to assess performance and memory usage.
    • Introduced a new pytest marker for slow tests to manage long-running test cases.

@bazarnov bazarnov self-assigned this Feb 12, 2025
@github-actions github-actions bot added the bug (Something isn't working) and security labels Feb 12, 2025
@bazarnov bazarnov marked this pull request as ready for review February 12, 2025 20:03
Contributor

coderabbitai bot commented Feb 12, 2025

Walkthrough

This pull request refactors the code for test reading and message grouping. The main changes include updating dataclass import statements and replacing the MessageGrouper class with a new TestReader class. Additionally, several utility functions and type aliases are added to support structured message processing. Corresponding test files have been updated to mock and call the new run_test_read method. New slow test fixtures have also been introduced in decoder tests.

Changes

| File(s) | Change Summary |
| --- | --- |
| airbyte_cdk/connector_builder/connector_builder_handler.py | Updated import statements to directly import dataclass, field, and asdict; modified the TestReadLimits class to use the new import style; replaced instantiation of MessageGrouper with TestReader and its method run_test_read. |
| airbyte_cdk/connector_builder/message_grouper.py | Removed the MessageGrouper class and its associated methods. |
| airbyte_cdk/connector_builder/test_reader/... | Added new modules: __init__.py (exports TestReader via __all__); helpers.py (several utility functions for processing log messages); message_grouper.py (new function get_message_groups; the message grouping logic now yields processed message groups); reader.py (the TestReader class with methods such as run_test_read and various private helper methods); types.py (new type aliases for message groups and schema outputs). |
| unit_tests/connector_builder/... | Updated test files (test_connector_builder_handler.py and test_message_grouper.py) to replace mocks of MessageGrouper.get_message_groups with the new TestReader.run_test_read, and adjusted import paths accordingly. |
| unit_tests/sources/declarative/decoders/... | Added a new slow-test fixture (large_event_response_fixture) in test_json_decoder.py and applied @pytest.mark.slow in both decoder tests to handle large JSONL responses. |

Sequence Diagram(s)

sequenceDiagram
  participant TS as Test Suite
  participant TR as TestReader
  participant DS as DeclarativeSource
  participant UH as Utility Helpers

  TS->>TR: run_test_read(source, config, catalog, state, limit)
  TR->>DS: _read_stream(...) 
  DS-->>TR: Iterator of AirbyteMessages
  TR->>UH: Invoke helper functions (_categorise_groups, schema inference, etc.)
  UH-->>TR: Processed message groups
  TR-->>TS: Returns StreamRead object with slices, log messages, and auxiliary requests

Possibly related PRs

  • chore(refactor): refactor partition generator to take any stream slicer #39: The changes in the main PR, which involve modifications to the TestReader class and the removal of the MessageGrouper class, are related to the changes in the retrieved PR, as both involve updates to how message grouping and reading are handled, specifically through the introduction of the run_test_read method in place of get_message_groups.
  • fix: fix sorting & __init__ imports #189: The changes in the main PR, which involve modifications to the TestReadLimits class and the introduction of the TestReader class, are related to the changes in the retrieved PR, as both involve updates to the handling of message processing and imports, specifically focusing on the transition from the MessageGrouper to the TestReader for reading streams.
  • feat: add download_decoder + download_extractor #50: The changes in the main PR, which involve modifications to the TestReader class and the replacement of the MessageGrouper class, are related to the changes in the retrieved PR, as both involve updates to message processing and handling within the Airbyte framework, specifically focusing on how messages are grouped and read.

Suggested labels

chore

Suggested reviewers

  • aaronsteers, wdyt?
  • maxi297, wdyt?
  • pnilan, wdyt?
Contributor

@coderabbitai coderabbitai bot left a comment


Actionable comments posted: 0

🔭 Outside diff range comments (2)
airbyte_cdk/connector_builder/test_reader/helpers.py (1)

556-582: 🛠️ Refactor suggestion

Potential optimization for frequent record fields.

_handle_record_message accumulates both schema and datetime formats on every record. If a stream is large, we might slow performance. Any interest in offering a caching approach or a sampling strategy (e.g., only inferring every N records)? Wdyt?
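
The "every N records" idea could look roughly like the sketch below. It is a stand-alone illustration, not the CDK's schema inferrer: `infer_fields_sampled` and `sample_every` are hypothetical names, and only top-level field names are collected.

```python
from typing import Any, Dict, Iterable, Set


def infer_fields_sampled(
    records: Iterable[Dict[str, Any]], sample_every: int = 100
) -> Set[str]:
    """Collect top-level field names, inspecting only every N-th record."""
    if sample_every < 1:
        raise ValueError("sample_every must be >= 1")
    seen: Set[str] = set()
    for index, record in enumerate(records):
        # The first record is always inspected, then every N-th one after it.
        if index % sample_every == 0:
            seen.update(record.keys())
    return seen
```

The trade-off is accuracy: a field that appears only in records that are not sampled is missed, which may matter for a test read whose purpose is to show the inferred schema.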

unit_tests/sources/declarative/decoders/test_decoders_memory_usage.py (1)

19-31: 🛠️ Refactor suggestion

Consider consolidating duplicate fixtures, wdyt?

This fixture is identical to the one in test_json_decoder.py. Consider moving it to a shared test utilities module to avoid duplication:

# test_utils.py
import pytest


@pytest.fixture(name="large_events_response")
def large_event_response_fixture():
    ...  # shared implementation goes here

Then import and use it in both test files.

🧹 Nitpick comments (9)
airbyte_cdk/connector_builder/test_reader/message_grouper.py (2)

40-82: Docstring clarity check.

This docstring is very thorough, but would you like to clarify in the docstring how partial slices/pages are handled if the loop ends early (e.g., due to record limit)? It might help future readers. Wdyt?


118-124: Consolidate duplicate slice parsing logic?

We call _parse_slice_description(message.log.message) at lines 119 and 124. Would you consider extracting that repeated call into a small helper to avoid duplication? Wdyt?

-            yield _processed_slice()
-            current_slice_descriptor = _parse_slice_description(message.log.message)  # type: ignore
+            yield _processed_slice()
+            current_slice_descriptor = _parse_current_slice_descriptor(message)
airbyte_cdk/connector_builder/test_reader/reader.py (4)

71-80: Constructor param naming.

The constructor uses _max_pages_per_slice, _max_slices, _max_record_limit. Any interest in clarifying them as well in the docstring or inline comment? For example, clarifying if they apply to all streams, or only a single stream read. Wdyt?


301-336: Typo in function name.

The function _get_infered_schema has a small spelling mismatch ("infered" vs. "inferred"). Would you consider renaming it to _get_inferred_schema for clarity? Wdyt?

-def _get_infered_schema(
+def _get_inferred_schema(

239-300: Add test coverage for unknown message group types.

In _categorise_groups, an unknown message group type raises a ValueError. Would you consider adding a unit test to confirm that scenario is handled gracefully and the error is raised as expected? Wdyt?
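
A test for that error path could be shaped like the sketch below. `categorise_groups_stub` is a hypothetical stand-in written only for this illustration; the real `_categorise_groups` has a different signature and group types, and only the "unknown type raises ValueError" behavior is taken from the comment above.

```python
from typing import Any, List


def categorise_groups_stub(groups: List[Any]) -> List[str]:
    """Hypothetical stand-in; only the error path mirrors the real function."""
    labels: List[str] = []
    for group in groups:
        if isinstance(group, dict):
            labels.append("slice")
        elif isinstance(group, str):
            labels.append("log")
        else:
            raise ValueError(f"Unknown message group type: {type(group)}")
    return labels


def test_unknown_group_type_raises() -> None:
    try:
        categorise_groups_stub([object()])
    except ValueError as error:
        assert "Unknown message group type" in str(error)
    else:
        raise AssertionError("expected ValueError for an unknown group type")
```

In the actual test suite, `pytest.raises(ValueError)` would express the same check more compactly.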


410-442: Validate multiple limits more explicitly.

_has_reached_limit checks slices, pages, and record counts. Is it worth adding a short docstring note or logging line to clarify which limit was triggered if the function returns True? For large codebases, that might simplify debugging. Wdyt?
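
One way to make the triggered limit visible is to return its name instead of a bare boolean. The sketch below is an assumption about shape only: the function name, parameter names, and the `>=` comparisons are hypothetical and may not match `_has_reached_limit`.

```python
from typing import Optional


def which_limit_reached(
    slices_read: int,
    pages_in_slice: int,
    records_read: int,
    max_slices: int,
    max_pages_per_slice: int,
    max_records: int,
) -> Optional[str]:
    """Return the name of the first limit that was hit, or None."""
    if slices_read >= max_slices:
        return "max_slices"
    if pages_in_slice >= max_pages_per_slice:
        return "max_pages_per_slice"
    if records_read >= max_records:
        return "max_record_limit"
    return None
```

A caller can still use the result as a truth value (`if which_limit_reached(...):`) and log the returned name when it is not None.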

airbyte_cdk/connector_builder/test_reader/helpers.py (2)

174-200: Consider merging _close_page and _close_current_page.

Currently, _close_page and _close_current_page compose similar operations. Would you like to merge them into a single function that closes a page and returns reset objects, avoiding duplication? Wdyt?


474-521: Refine approach for unknown log messages.

In _handle_log_message, only HTTP-based logs are processed in detail, while everything else is appended as a raw log. Would you consider capturing more metadata for non-HTTP logs (e.g., partial parsing) to aid debugging? Wdyt?

unit_tests/sources/declarative/decoders/test_json_decoder.py (1)

51-63: Consider extracting magic numbers into constants, wdyt?

The fixture uses magic numbers for the file size. Consider extracting these into named constants for better maintainability:

+LINES_IN_RESPONSE = 2_000_000  # ≈ 58 MB of response
+TEST_EMAIL = "[email protected]"

 @pytest.mark.slow
 @pytest.fixture(name="large_events_response")
 def large_event_response_fixture():
-    data = {"email": "[email protected]"}
-    jsonl_string = f"{json.dumps(data)}\n"
-    lines_in_response = 2_000_000  # ≈ 58 MB of response
+    data = {"email": TEST_EMAIL}
+    jsonl_string = f"{json.dumps(data)}\n"
📜 Review details

Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro

📥 Commits

Reviewing files that changed from the base of the PR and between fdcded3 and 2dce6ff.

📒 Files selected for processing (11)
  • airbyte_cdk/connector_builder/connector_builder_handler.py (3 hunks)
  • airbyte_cdk/connector_builder/message_grouper.py (0 hunks)
  • airbyte_cdk/connector_builder/test_reader/__init__.py (1 hunks)
  • airbyte_cdk/connector_builder/test_reader/helpers.py (1 hunks)
  • airbyte_cdk/connector_builder/test_reader/message_grouper.py (1 hunks)
  • airbyte_cdk/connector_builder/test_reader/reader.py (1 hunks)
  • airbyte_cdk/connector_builder/test_reader/types.py (1 hunks)
  • unit_tests/connector_builder/test_connector_builder_handler.py (2 hunks)
  • unit_tests/connector_builder/test_message_grouper.py (32 hunks)
  • unit_tests/sources/declarative/decoders/test_decoders_memory_usage.py (1 hunks)
  • unit_tests/sources/declarative/decoders/test_json_decoder.py (1 hunks)
💤 Files with no reviewable changes (1)
  • airbyte_cdk/connector_builder/message_grouper.py
✅ Files skipped from review due to trivial changes (1)
  • airbyte_cdk/connector_builder/test_reader/__init__.py
🔇 Additional comments (8)
airbyte_cdk/connector_builder/test_reader/message_grouper.py (2)

106-116: Consider verifying the page's partial content.

Inside the while records_count < limit loop, we close the page unconditionally once _need_to_close_page is true. Would you like to add an extra check to handle a scenario where the page might have zero records? Or is that an intentional design choice? Wdyt?


157-166: Ensure final slice is yielded if no records are processed.

In the else block, we only finalize the page and slice if current_page_request or current_page_response or current_page_records is truthy. Could a slice with zero records and no HTTP logs be missed? Would you like to confirm that scenario? Wdyt?

airbyte_cdk/connector_builder/test_reader/helpers.py (1)

263-283: Graceful error handling for slice JSON parse.

_parse_slice_description could potentially fail if the log message is malformed or missing the prefix. Do you want to catch JSON errors here to yield a more explanatory log message or fallback? Wdyt?
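
A defensive variant might look like the sketch below. The `SLICE_PREFIX` value and the function name are assumptions made for illustration; the real prefix constant lives in the CDK and should be used instead.

```python
import json
from typing import Any, Dict, Optional

SLICE_PREFIX = "slice:"  # assumed value; use the CDK's own constant in real code


def parse_slice_description_safe(log_message: str) -> Optional[Dict[str, Any]]:
    """Return the slice descriptor, or None if the message is not a valid one."""
    if not log_message.startswith(SLICE_PREFIX):
        return None
    try:
        parsed = json.loads(log_message[len(SLICE_PREFIX):])
    except json.JSONDecodeError:
        # Malformed payload: fall back to None instead of propagating the error.
        return None
    return parsed if isinstance(parsed, dict) else None
```

Whether returning None or raising a more descriptive error is preferable depends on how callers treat a missing descriptor.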

airbyte_cdk/connector_builder/test_reader/types.py (1)

1-76: Great job on the type definitions and documentation!

The type aliases are well-organized, comprehensive, and thoroughly documented. The docstrings provide clear explanations of each type's purpose and structure.

airbyte_cdk/connector_builder/connector_builder_handler.py (1)

6-9: Clean refactoring of imports and TestReader integration!

The changes improve code organization through better import structure and the transition to TestReader is clean and well-implemented.

Also applies to: 77-86

unit_tests/connector_builder/test_message_grouper.py (2)

19-20: LGTM! The imports look good.

The imports are correctly updated to use the new TestReader class and its helper functions.


148-148: LGTM! The test cases are correctly updated.

The test cases are properly updated to use TestReader instead of MessageGrouper, and the method calls are changed from get_message_groups to run_test_read. The test coverage is maintained.

Also applies to: 206-207, 290-292, 343-343, 446-449, 504-506, 617-619, 661-663, 689-691, 708-709, 733-734, 763-764, 791-792, 808-809, 822-823, 848-850, 877-879

unit_tests/connector_builder/test_connector_builder_handler.py (1)

553-553: LGTM! The mock patch paths are correctly updated.

The mock patch paths are properly updated to use TestReader.run_test_read instead of MessageGrouper.get_message_groups. The test coverage is maintained.

Also applies to: 1172-1172

@github-actions github-actions bot added the bug (Something isn't working) and security labels Feb 13, 2025
Contributor

@coderabbitai coderabbitai bot left a comment


Actionable comments posted: 1

🧹 Nitpick comments (8)
airbyte_cdk/connector_builder/test_reader/helpers.py (5)

65-83: Consider removing nested keys with __ prefixes as well?

Currently, clean_config only deletes top-level keys that start with __. Would you like to remove nested keys with __ prefixes too, ensuring thorough cleanup of internal or meta-data fields? wdyt?
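
A recursive cleanup could be sketched as follows. `strip_dunder_keys` is a hypothetical helper, not the PR's `clean_config`; unlike an in-place `del`, it returns a new structure and leaves the input untouched.

```python
from typing import Any


def strip_dunder_keys(value: Any) -> Any:
    """Return a copy of `value` without any dict keys starting with '__'."""
    if isinstance(value, dict):
        return {
            key: strip_dunder_keys(item)
            for key, item in value.items()
            if not (isinstance(key, str) and key.startswith("__"))
        }
    if isinstance(value, list):
        # Lists may contain nested dicts, so recurse into each element.
        return [strip_dunder_keys(item) for item in value]
    return value
```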


86-128: Default to an empty dictionary for headers?

create_request_from_log_message sets headers to whatever request.get("headers") provides. For safety, especially if headers can be absent, would you consider defaulting it to an empty dict to avoid NoneType issues? wdyt?
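
The defaulting itself is a one-liner; the sketch below isolates it in a hypothetical `extract_headers` helper. Using `or {}` rather than `.get("headers", {})` also covers the case where the key is present but set to None.

```python
from typing import Any, Dict


def extract_headers(request: Dict[str, Any]) -> Dict[str, Any]:
    """Return the request headers, or an empty dict if absent or None."""
    return request.get("headers") or {}
```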


151-173: Log malformed JSON errors for visibility?

parse_json silently returns None on JSONDecodeError. Would you consider logging a debug/info message to indicate that JSON parsing failed (for easier troubleshooting)? wdyt?
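
A logging variant could look like this sketch. The signature is an assumption (it takes the raw string, whereas the PR's `parse_json` may take a log message object), and `parse_json_logged` is a hypothetical name.

```python
import json
import logging
from typing import Any, Optional

logger = logging.getLogger(__name__)


def parse_json_logged(raw: str) -> Optional[Any]:
    """Parse `raw` as JSON; log at debug level and return None on failure."""
    try:
        return json.loads(raw)
    except json.JSONDecodeError as error:
        logger.debug("Could not parse log message as JSON: %s", error)
        return None
```

Debug level keeps ordinary non-JSON log lines from flooding the output while still leaving a trace when troubleshooting.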


198-228: Fix docstring reference to _is_page_http_request?

The docstring in should_close_page references _is_page_http_request, but the actual function is is_page_http_request. Want to update it for clarity? wdyt?


434-464: Return a more descriptive structure than (None, None)?

handle_current_page returns (None, None) to clear references. Would you consider returning a small named container (e.g., a tuple with fields) or an explicit indicator to improve readability? wdyt?
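
A named container for the reset values might be sketched as below. `PageState` and `reset_page_state` are hypothetical names; the field types are placeholders for whatever request and response objects the helper actually clears.

```python
from typing import Any, Dict, NamedTuple, Optional


class PageState(NamedTuple):
    request: Optional[Dict[str, Any]]
    response: Optional[Dict[str, Any]]


def reset_page_state() -> PageState:
    """Return an explicit 'nothing in flight' state instead of (None, None)."""
    return PageState(request=None, response=None)
```

Because a NamedTuple is still a tuple, existing call sites that unpack two values (`request, response = reset_page_state()`) keep working.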

airbyte_cdk/connector_builder/test_reader/message_grouper.py (1)

133-146: Ensure final slice is yielded on manual break.

In the else: block after the while, we only finalize pages if the loop finishes naturally. If we decide to break early in the future, we might skip yielding. Is that acceptable or do we need a fallback ensuring the final slice is yielded? wdyt?
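
The concern rests on Python's `while ... else` semantics: the `else` block runs only when the loop condition becomes false, and is skipped after a `break`. A small stand-alone sketch (the `drain` function is illustrative and unrelated to the PR's code):

```python
from typing import Iterable, List


def drain(items: Iterable[int], stop_at: int) -> List[str]:
    """Record which branches run for a while/else loop."""
    events: List[str] = []
    iterator = iter(items)
    while (item := next(iterator, None)) is not None:
        if item == stop_at:
            events.append("break")
            break
        events.append(f"item {item}")
    else:
        # Reached only when the loop ends without `break`.
        events.append("else")
    return events
```

So any finalization placed in the `else` block would be skipped if a `break` were later added to the loop body.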

unit_tests/connector_builder/test_message_grouper.py (2)

206-207: Refactored usage of TestReader in test functions.

All these line changes consistently replace MessageGrouper usage with TestReader calls. This aligns with the new code structure.
Would you like to rename connector_builder_handler to test_reader_handler in tests for clarity? wdyt?

Also applies to: 290-292, 314-314, 343-344, 422-423, 458-459, 504-507


647-671: General test flow and structure.

The series of tests confirm boundary conditions (maximum slices, pages, no slices, error streams, multiple control messages, etc.). Everything appears logically consistent.
Would you like to unify repeated test scaffolding (e.g., source set up) with a fixture or helper? wdyt?

Also applies to: 673-699, 701-719, 722-772, 774-800, 802-817, 819-888

📜 Review details

Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro

📥 Commits

Reviewing files that changed from the base of the PR and between 2dce6ff and 9da64f1.

📒 Files selected for processing (4)
  • airbyte_cdk/connector_builder/test_reader/helpers.py (1 hunks)
  • airbyte_cdk/connector_builder/test_reader/message_grouper.py (1 hunks)
  • airbyte_cdk/connector_builder/test_reader/reader.py (1 hunks)
  • unit_tests/connector_builder/test_message_grouper.py (32 hunks)
🚧 Files skipped from review as they are similar to previous changes (1)
  • airbyte_cdk/connector_builder/test_reader/reader.py
⏰ Context from checks skipped due to timeout of 90000ms (8)
  • GitHub Check: Check: 'source-pokeapi' (skip=false)
  • GitHub Check: Check: 'source-the-guardian-api' (skip=false)
  • GitHub Check: Check: 'source-shopify' (skip=false)
  • GitHub Check: Check: 'source-hardcoded-records' (skip=false)
  • GitHub Check: Pytest (All, Python 3.11, Ubuntu)
  • GitHub Check: Pytest (Fast)
  • GitHub Check: Pytest (All, Python 3.10, Ubuntu)
  • GitHub Check: Analyze (python)
🔇 Additional comments (4)
airbyte_cdk/connector_builder/test_reader/message_grouper.py (1)

78-78: Confirm processing logic for limit == 0.

The loop condition is while records_count < limit and (message := next(messages, None)):. If limit is 0, the loop never runs. Is that intended behavior, or should we handle that differently? wdyt?
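
The behavior of that condition can be checked in isolation. The sketch below copies only the loop condition quoted above; `take` and the string messages are illustrative stand-ins for the real message iterator.

```python
from typing import Iterator, List


def take(messages: Iterator[str], limit: int) -> List[str]:
    """Consume messages with the same loop condition as quoted above."""
    taken: List[str] = []
    records_count = 0
    while records_count < limit and (message := next(messages, None)):
        taken.append(message)
        records_count += 1
    return taken
```

With `limit == 0` the left operand is false, so `next()` is never called and no message is consumed. The walrus operand also stops the loop on any falsy value, not only on None, which is harmless only as long as every message object is truthy.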

unit_tests/connector_builder/test_message_grouper.py (3)

19-20: Imports look correct.

The new import references TestReader from airbyte_cdk.connector_builder.test_reader. This aligns with the recent refactor. All good here!


148-219: Use @patch consistently for the test?

The test at lines 148-219 correctly patches AirbyteEntrypoint.read. All logic appears valid. Perhaps confirm if we need additional patches or mocks for external network calls? wdyt?


592-645: Spot check large test coverage.

The test from lines 592-645 adds multiple slices verifying correct grouping. The scenario thoroughly tests partial slices, multiple slices, and the final state message. Looks solid!
