File-based CDK: make full refresh concurrent #34411

Merged
merged 11 commits into from
Jan 30, 2024

Conversation

@clnoll clnoll commented Jan 22, 2024

Uses the thread-based concurrent CDK to add concurrency to the file-based CDK.

This is done by creating a FileBasedStreamFacade and associated file-based classes to use as partitions.

For now this just uses a FileBasedNoopCursor; once incremental is put into place we will also have a FileBasedConcurrentCursor.

Note: as this PR stands, it relies on the file-based streams creating new connections to the source when a read is being done. On the face of it this is not entirely optimal, as described in the ticket for this work, but we can control the number of StreamReaders created using the concurrency limits, and this may be sufficient. Rather than putting the optimization in with this PR I plan to do some production testing to determine whether it's needed.
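The trade-off in the note above can be sketched as follows. This is a minimal illustration under stated assumptions, not the CDK's implementation: `FakeStreamReader`, `read_partition`, `read_all`, and `MAX_CONCURRENCY` are hypothetical names, and a standard-library thread pool stands in for the concurrent CDK's worker management. Each partition read opens its own reader, and the pool size caps how many readers exist at once.

```python
import threading
from concurrent.futures import ThreadPoolExecutor

MAX_CONCURRENCY = 2  # hypothetical concurrency limit


class FakeStreamReader:
    """Hypothetical stand-in for a reader that opens a connection to the source."""

    _lock = threading.Lock()
    open_count = 0  # readers currently open
    peak_open = 0   # highest number of readers open at the same time

    def __enter__(self):
        with FakeStreamReader._lock:
            FakeStreamReader.open_count += 1
            FakeStreamReader.peak_open = max(
                FakeStreamReader.peak_open, FakeStreamReader.open_count
            )
        return self

    def __exit__(self, *exc):
        with FakeStreamReader._lock:
            FakeStreamReader.open_count -= 1
        return False

    def read(self, file_name):
        return [f"{file_name}:record-{i}" for i in range(2)]


def read_partition(file_name):
    # Every partition creates a new reader, i.e. a new connection to the source.
    with FakeStreamReader() as reader:
        return reader.read(file_name)


def read_all(files):
    # The pool size bounds how many readers can be open concurrently.
    with ThreadPoolExecutor(max_workers=MAX_CONCURRENCY) as pool:
        results = pool.map(read_partition, files)
        return [record for records in results for record in records]


records = read_all(["a.csv", "b.csv", "c.csv"])
```

In this sketch the number of simultaneously open readers never exceeds the pool size, which is the property the note relies on when deferring the connection-reuse optimization.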

Recommended reading order

  1. airbyte-cdk/python/airbyte_cdk/sources/file_based/file_based_source.py
  2. airbyte-cdk/python/airbyte_cdk/sources/file_based/stream/concurrent/adapters.py

@clnoll clnoll requested a review from a team as a code owner January 22, 2024 15:13
@clnoll clnoll marked this pull request as draft January 22, 2024 15:13
@octavia-squidington-iii octavia-squidington-iii added area/connectors Connector related issues CDK Connector Development Kit connectors/source/s3 labels Jan 22, 2024
@octavia-squidington-iv octavia-squidington-iv requested a review from a team January 22, 2024 15:16
@octavia-squidington-iii octavia-squidington-iii removed the area/connectors Connector related issues label Jan 22, 2024
@clnoll clnoll requested review from girarda and maxi297 January 23, 2024 02:08
@clnoll clnoll marked this pull request as ready for review January 23, 2024 02:08

Contributor

@maxi297 maxi297 left a comment

My big concern is related to creating a new StreamFacade. Can we make explicit why we decided to go this way?

@@ -58,6 +59,6 @@ def _select_abstract_streams(self, config: Mapping[str, Any], configured_catalog
         f"The stream {configured_stream.stream.name} no longer exists in the configuration. "
         f"Refresh the schema in replication settings and remove this stream from future sync attempts."
     )
-    if isinstance(stream_instance, StreamFacade):
+    if isinstance(stream_instance, (StreamFacade, FileBasedStreamFacade)):

Contributor

I haven't checked much yet but it seems odd that the CCDK knows about the FCDK and vice versa. Given that, it feels like both will be tightly coupled

Contributor Author

@clnoll clnoll Jan 28, 2024

Agree but I'd like to address this separately (see comment below).

Contributor Author

Update: I actually decided to pull the change in from a comment in the incremental sync PR (this commit).

 try:
     parsed_config = self._get_parsed_config(config)
     self.stream_reader.config = parsed_config
-    streams: List[Stream] = []
+    streams: List[AbstractFileBasedStream] = []

Contributor

Why change the type here?

Contributor Author

We specifically require file-based streams since they have additional methods. I think this makes sense since this is a file-based source - probably should have been this way from the beginning.
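The reasoning behind the narrower type can be illustrated with a small sketch. The classes below are simplified stand-ins written for this example, not the CDK's actual definitions; `list_files` is used here only as an example of an "additional method", and `CsvStream` and `all_files` are hypothetical names.

```python
from abc import ABC, abstractmethod
from typing import List


class Stream(ABC):
    """Simplified stand-in for the generic stream interface."""

    @property
    @abstractmethod
    def name(self) -> str: ...


class AbstractFileBasedStream(Stream):
    """Simplified stand-in: adds a file-specific method to the generic interface."""

    @abstractmethod
    def list_files(self) -> List[str]: ...


class CsvStream(AbstractFileBasedStream):
    """Hypothetical concrete file-based stream."""

    @property
    def name(self) -> str:
        return "csv"

    def list_files(self) -> List[str]:
        return ["a.csv", "b.csv"]


def all_files(streams: List[AbstractFileBasedStream]) -> List[str]:
    # With List[Stream], a type checker would reject list_files() here;
    # the narrower element type makes the file-specific call valid.
    return [f for stream in streams for f in stream.list_files()]


files = all_files([CsvStream()])
```

Annotating the list with the file-based type documents the requirement at the point where the streams are built, so callers do not need casts or `isinstance` checks.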



@deprecated("This class is experimental. Use at your own risk.")
class FileBasedStreamFacade(Stream):

Contributor

There seems to be a lot of duplicated code between this class and StreamFacade. Is there a reason why we don't instantiate a StreamFacade for the FCDK instead of creating a new one? For each item identified, can we make it part of the StreamFacade?

I see:

  • primary_key being different, though I'm not sure why the implementation that calls as_airbyte_stream would not be sufficient
  • read_records using state = str(self._cursor.state) instead of state = self._cursor.state which seems odd to me

Contributor Author

I hear you on the duplicated code. I'm unable to use StreamFacade here because there are file-specific methods and types that are required by the callers.

I've played around a lot with this to try to clean it up, e.g. creating an interface that StreamFacade and FileBasedStreamFacade can use, avoiding the use of a StreamFacade-like pattern altogether, and avoiding use of DefaultStream, but I did not come up with something that I felt worked with the existing framework.

What I'd like to do is keep this as-is, and consider a separate refactor that would help make the concurrency interface a little more flexible, but I'd prefer not to block this on it. In the meantime I'm not worried about the fact that we have a parallel file-based flow that has some duplicated code, since it still fits into the concurrency patterns that we've put in place rather than reinventing them from scratch.

Contributor

@clnoll can you create an issue specifying exactly what problems the refactor should resolve so we don't lose track of it?

The bit I'm particularly surprised about is that the file-based classes need to know about the stream facade. Maybe FileBasedStreamFacade should implement an AbstractFileBasedStream? This way, the mess is kept to the facade and the file-based framework doesn't know anything about it.

Contributor Author

Update: I was at least able to get rid of the need for DefaultFileBasedStream.

Contributor Author

@girarda I wrote that last comment before I saw yours.

Good call re having FileBasedStreamFacade implement AbstractFileBasedStream. I made that update and it cleaned up some of the issues with types.
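The suggestion discussed in this thread, a facade that itself implements the file-based interface, might look roughly like the sketch below. All classes are simplified stand-ins written for this example: the real AbstractFileBasedStream and FileBasedStreamFacade have larger interfaces, and `LegacyFileStream`, `SketchFacade`, and `describe` are hypothetical names.

```python
from abc import ABC, abstractmethod
from typing import Any, Iterable, List, Mapping


class AbstractFileBasedStream(ABC):
    """Simplified stand-in for the file-based stream interface."""

    @property
    @abstractmethod
    def name(self) -> str: ...

    @abstractmethod
    def list_files(self) -> List[str]: ...

    @abstractmethod
    def read_records(self) -> Iterable[Mapping[str, Any]]: ...


class LegacyFileStream(AbstractFileBasedStream):
    """Hypothetical existing, non-concurrent stream."""

    @property
    def name(self) -> str:
        return "legacy"

    def list_files(self) -> List[str]:
        return ["a.csv", "b.csv"]

    def read_records(self) -> Iterable[Mapping[str, Any]]:
        return [{"file": f} for f in self.list_files()]


class SketchFacade(AbstractFileBasedStream):
    """Facade that satisfies the same interface by delegating to the wrapped stream."""

    def __init__(self, legacy_stream: AbstractFileBasedStream) -> None:
        self._legacy_stream = legacy_stream

    @property
    def name(self) -> str:
        return self._legacy_stream.name

    def list_files(self) -> List[str]:
        return self._legacy_stream.list_files()

    def read_records(self) -> Iterable[Mapping[str, Any]]:
        # A real facade would route reads through partitions; this sketch delegates directly.
        return self._legacy_stream.read_records()


def describe(stream: AbstractFileBasedStream) -> str:
    # Framework code depends only on the interface and never names the facade.
    return f"{stream.name}: {len(stream.list_files())} file(s)"


summary = describe(SketchFacade(LegacyFileStream()))
```

Because the facade is just another implementation of the interface, callers such as `describe` accept it without special-casing, which is the decoupling the reviewers asked for.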

Contributor

@girarda girarda left a comment

looks good! main question is whether we can avoid leaking the facade into the file_based module.

    state = str(self._cursor.state)
else:
    # This shouldn't happen if the ConcurrentCursor was used
    state = "unknown; no state attribute was available on the cursor"

Contributor

should this fail loudly?

Contributor Author

This was copied over from the non-file-based adapters but is no longer needed now that we're inheriting from AbstractFileBasedStream.
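For context on the "fail loudly" question, the two behaviours can be compared in a small sketch. It assumes the elided condition in the snippet above checks whether the cursor has a `state` attribute; the cursor classes and both helper functions are hypothetical and exist only for this comparison.

```python
from typing import Any


class CursorWithState:
    """Hypothetical cursor exposing a state attribute."""

    state = {"history": {}}


class CursorWithoutState:
    """Hypothetical cursor with no state attribute."""


def describe_state_lenient(cursor: Any) -> str:
    # Mirrors the fallback in the reviewed snippet: never raises.
    if hasattr(cursor, "state"):
        return str(cursor.state)
    return "unknown; no state attribute was available on the cursor"


def describe_state_strict(cursor: Any) -> str:
    # "Fail loudly" alternative: surface the unexpected cursor type immediately.
    if not hasattr(cursor, "state"):
        raise AttributeError(f"{type(cursor).__name__} has no 'state' attribute")
    return str(cursor.state)


lenient = describe_state_lenient(CursorWithoutState())
```

The lenient form keeps a sync running with a placeholder message, while the strict form turns a condition that "shouldn't happen" into an immediate, traceable error.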

Contributor

@maxi297 maxi297 left a comment

I haven't played with/manually tested the solution but from reading the code, I don't see any issue. 🚢

return f"FileBasedStreamPartition({self._stream.name}, {self._slice})"


class FileBasedStreamPartitionGenerator(PartitionGenerator):

Contributor

Should we have unit tests for the classes in this file?

Contributor Author

They do have coverage via scenario-based tests, but unit tests are a good idea too. Added.

def generate(self) -> Iterable[FileBasedStreamPartition]:
    pending_partitions = []
    for _slice in self._stream.stream_slices(sync_mode=self._sync_mode, cursor_field=self._cursor_field, stream_state=self._state):
        if _slice is not None:

Contributor

In which case can this occur? What does it mean? It feels odd that the stream is creating a slice but we won't read records for that slice

Contributor Author

The signature for stream_slices is Iterable[Optional[Mapping[str, Any]]] so we have to handle that case. I don't expect it to ever be None for file-based.
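The point about the signature can be shown with a short sketch. It assumes only the `Iterable[Optional[Mapping[str, Any]]]` return type quoted above; `stream_slices` and `generate_partitions` here are hypothetical free functions, not the CDK's methods, and the slice contents are made up for illustration.

```python
from typing import Any, Iterable, Iterator, Mapping, Optional

Slice = Mapping[str, Any]


def stream_slices() -> Iterable[Optional[Slice]]:
    # Hypothetical slice source: the declared type allows None, so one is included.
    yield {"files": ["a.csv"]}
    yield None
    yield {"files": ["b.csv", "c.csv"]}


def generate_partitions(slices: Iterable[Optional[Slice]]) -> Iterator[Slice]:
    # Skip None so downstream code can rely on every partition having a real slice.
    for _slice in slices:
        if _slice is not None:
            yield _slice


partitions = list(generate_partitions(stream_slices()))
```

The `is not None` check narrows the element type from `Optional[Slice]` to `Slice`, which satisfies a type checker even if file-based streams never yield `None` in practice.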

Contributor

@girarda girarda left a comment

:shipit: !

@clnoll clnoll merged commit eb31e4d into master Jan 30, 2024
22 checks passed
@clnoll clnoll deleted the fcdk-with-ccdk branch January 30, 2024 00:33
clnoll added a commit that referenced this pull request Jan 30, 2024

sentry-io bot commented Jan 30, 2024

Suspect Issues

This pull request was deployed and Sentry observed the following issues:

  • ‼️ airbyte_cdk.sources.file_based.exceptions.SchemaInferenceError: Error inferring schema from files. Are the files valid? Contact Support if you need assistance. /usr/local/lib/python3.9/site-packages/airbyte_... View Issue
  • ‼️ RuntimeError: No sync mode was found for bounce. /usr/local/lib/python3.9/site-packages/airbyte_... View Issue

jatinyadav-cc pushed a commit to ollionorg/datapipes-airbyte that referenced this pull request Feb 21, 2024
jatinyadav-cc pushed a commit to ollionorg/datapipes-airbyte that referenced this pull request Feb 26, 2024
jatinyadav-cc pushed a commit to ollionorg/datapipes-airbyte that referenced this pull request Feb 26, 2024
Labels
CDK Connector Development Kit connectors/source/s3
4 participants