Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

🐛 follow up to #35471: update the cartesian stream slicer #35865

Merged
merged 4 commits into from
Mar 7, 2024

Conversation

girarda
Copy link
Contributor

@girarda girarda commented Mar 6, 2024

What

This caused connectors to fail because dicts are returned instead of StreamSlices

{"type": "TRACE", "trace": {"type": "ERROR", "emitted_at": 1709764005297.951, "error": {"message": "Something went wrong in the connector. See the logs for more details.", "internal_message": "'dict' object has no attribute 'partition'", "stack_trace": "Traceback (most recent call last):\n  File \"/Users/alex/code/airbyte/airbyte-cdk/python/airbyte_cdk/sources/abstract_source.py\", line 129, in read\n    yield from self._read_stream(\n  File \"/Users/alex/code/airbyte/airbyte-cdk/python/airbyte_cdk/sources/abstract_source.py\", line 229, in _read_stream\n    for record_data_or_message in record_iterator:\n  File \"/Users/alex/code/airbyte/airbyte-cdk/python/airbyte_cdk/sources/streams/core.py\", line 135, in read\n    for _slice in slices:\n  File \"/Users/alex/code/airbyte/airbyte-cdk/python/airbyte_cdk/sources/declarative/incremental/per_partition_cursor.py\", line 74, in stream_slices\n    cursor = self._cursor_per_partition.get(self._to_partition_key(partition.partition))\nAttributeError: 'dict' object has no attribute 'partition'\n", "failure_type": "system_error", "stream_descriptor": {"name": "billable_rates"}}}}

How

  • Create a StreamSlice by ChainMapping the partitions and the cursor_slice separately

Testing

I tested this bugfix with Patrick's harvest low-code connector and confirmed we were able to pull the records without failing

{"type": "LOG", "log": {"level": "INFO", "message": "Read 4 records from billable_rates stream"}}
{"type": "LOG", "log": {"level": "INFO", "message": "Marking stream billable_rates as STOPPED"}}
{"type": "TRACE", "trace": {"type": "STREAM_STATUS", "emitted_at": 1709763953537.6611, "stream_status": {"stream_descriptor": {"name": "billable_rates", "namespace": null}, "status": "COMPLETE"}}}
{"type": "LOG", "log": {"level": "INFO", "message": "Finished syncing billable_rates"}}
{"type": "LOG", "log": {"level": "INFO", "message": "SourceHarvest runtimes:\nSyncing stream billable_rates 0:00:08.429028"}}
{"type": "LOG", "log": {"level": "INFO", "message": "Finished syncing SourceHarvest"}

Recommended reading order

  1. airbyte-cdk/python/airbyte_cdk/sources/declarative/stream_slicers/cartesian_product_stream_slicer.py
  2. airbyte-cdk/python/unit_tests/sources/declarative/stream_slicers/test_cartesian_product_stream_slicer.py

Copy link

vercel bot commented Mar 6, 2024

The latest updates on your projects. Learn more about Vercel for Git ↗︎

1 Ignored Deployment
Name Status Preview Comments Updated (UTC)
airbyte-docs ⬜️ Ignored (Inspect) Visit Preview Mar 7, 2024 3:59pm

@octavia-squidington-iii octavia-squidington-iii added the CDK Connector Development Kit label Mar 6, 2024
@girarda girarda marked this pull request as ready for review March 6, 2024 22:27
@girarda girarda requested a review from a team as a code owner March 6, 2024 22:27
@girarda girarda requested review from maxi297, pnilan and brianjlai March 6, 2024 22:27
Copy link
Contributor

@pnilan pnilan left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks for tackling this so quickly.

Copy link
Contributor

@maxi297 maxi297 left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should we also have this check if the list has only one element?

product = itertools.product(*sub_slices)
for stream_slice_tuple in product:
partition = dict(ChainMap(*[s.partition for s in stream_slice_tuple]))
cursor_slice = dict(ChainMap(*[s.cursor_slice for s in stream_slice_tuple]))
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It feels weird that we would have multiple cursor here but I guess it has always been like that. I'm just wondering if we should fail loudly because we don't understand this business case or just let it happen and have an undetermined behavior

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

agreed. fixed

@girarda
Copy link
Contributor Author

girarda commented Mar 7, 2024

Should we also have this check if the list has only one element?

I'd prefer avoiding a breaking change as part of a bug fix, but happy to revisit

@girarda girarda merged commit 4a808ee into master Mar 7, 2024
28 checks passed
@girarda girarda deleted the alex/substream_bugfix branch March 7, 2024 16:20
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
CDK Connector Development Kit
Projects
None yet
Development

Successfully merging this pull request may close these issues.

4 participants