Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

🐛 Do not propagate exceptions on source close. #33609

Draft
wants to merge 10 commits into
base: master
Choose a base branch
from

Conversation

davinchia
Copy link
Contributor

@davinchia davinchia commented Dec 18, 2023

What

Based on #33211 , we see certain networking set ups can throw exceptions as we close up. This gives the false perception the sync is failed even though we've moved all data.

Further, we cannot actually action on these exceptions since:

  • we are going to close down anyway.
  • there is minimal impact all round since we run on containerised infrastructure and an exception closing the db connection likely means the connection was already closed on the DB side (at least the cases we've observed in practice).

How

Describe the solution

Recommended reading order

  1. x.java
  2. y.python

🚨 User Impact 🚨

Are there any breaking changes? What is the end result perceived by the user?

For connector PRs, use this section to explain which type of semantic versioning bump occurs as a result of the changes. Refer to our Semantic Versioning for Connectors guidelines for more information. Breaking changes to connectors must be documented by an Airbyte engineer (PR author, or reviewer for community PRs) by using the Breaking Change Release Playbook.

If there are breaking changes, please merge this PR with the 🚨🚨 emoji so changelog authors can further highlight this if needed.

Pre-merge Actions

Expand the relevant checklist and delete the others.

New Connector

Community member or Airbyter

  • Community member? Grant edit access to maintainers (instructions)
  • Unit & integration tests added and passing. Community members, please provide proof of success locally e.g: screenshot or copy-paste unit, integration, and acceptance test output. To run acceptance tests for a Python connector, follow instructions in the README. For java connectors run ./gradlew :airbyte-integrations:connectors:<name>:integrationTest.
  • Connector version is set to 0.0.1
    • Dockerfile has version 0.0.1
  • Documentation updated
    • Connector's README.md
    • Connector's bootstrap.md. See description and examples
    • docs/integrations/<source or destination>/<name>.md including changelog with an entry for the initial version. See changelog example
    • docs/integrations/README.md

Airbyter

If this is a community PR, the Airbyte engineer reviewing this PR is responsible for the below items.

  • Create a non-forked branch based on this PR and test the below items on it
  • Build is successful
  • If new credentials are required for use in CI, add them to GSM. Instructions.
Updating a connector

Community member or Airbyter

  • Grant edit access to maintainers (instructions)
  • Unit & integration tests added

Airbyter

If this is a community PR, the Airbyte engineer reviewing this PR is responsible for the below items.

  • Create a non-forked branch based on this PR and test the below items on it
  • Build is successful
  • If new credentials are required for use in CI, add them to GSM. Instructions.
Connector Generator
  • Issue acceptance criteria met
  • PR name follows PR naming conventions
  • If adding a new generator, add it to the list of scaffold modules being tested
  • The generator test modules (all connectors with -scaffold in their name) have been updated with the latest scaffold by running ./gradlew :airbyte-integrations:connector-templates:generator:generateScaffolds then checking in your changes
  • Documentation which references the generator is updated as needed
Updating the Python CDK

Airbyter

Before merging:

  • Pull Request description explains what problem it is solving
  • Code change is unit tested
  • Build and my-py check pass
  • Smoke test the change on at least one affected connector
    • On Github: Run this workflow, passing --use-local-cdk --name=source-<connector> as options
    • Locally: airbyte-ci connectors --use-local-cdk --name=source-<connector> test
  • PR is reviewed and approved

After merging:

  • Publish the CDK
    • The CDK does not follow proper semantic versioning. Choose minor if this the change has significant user impact or is a breaking change. Choose patch otherwise.
    • Write a thoughtful changelog message so we know what was updated.
  • Merge the platform PR that was auto-created for updating the Connector Builder's CDK version
    • This step is optional if the change does not affect the connector builder or declarative connectors.

Copy link

vercel bot commented Dec 18, 2023

The latest updates on your projects. Learn more about Vercel for Git ↗︎

Name Status Preview Comments Updated (UTC)
airbyte-docs ✅ Ready (Inspect) Visit Preview 💬 Add feedback Dec 19, 2023 11:57pm

@octavia-squidington-iii octavia-squidington-iii added the CDK Connector Development Kit label Dec 18, 2023
default void close() throws Exception {
try {
closeSafely();
} catch (Exception e) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is there other stuff that would want to fail (doing clean up of some sort) in close where catching this doesn't make sense?

}

@VisibleForTesting
static void swallowIteratorCloseErrors(AutoCloseableIterator<AirbyteMessage> iter, Consumer<AirbyteMessage> outputRecordCollector) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is this called anywhere?

@davinchia
Copy link
Contributor Author

@akashkulk PTAL! I removed all my WIP changes, so this should be much easier to review. Sorry and thanks!

}
} finally {
stopOrphanedThreads(EXIT_HOOK,
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

call this only once instead of twice in each method.

@davinchia
Copy link
Contributor Author

@akashkulk @stephane-airbyte @rodireich PTAL - I think this is a pretty good middle ground to implementing the exception ignoring at the integration runner level while making things testable.

If this makes sense, I will add more tests!

Comment on lines +173 to +178
final Collection<AutoCloseableIterator<AirbyteMessage>> streams = source.readStreams(config, catalog, stateOptional.orElse(null));
final ConcurrentStreamConsumer streamConsumer = new ConcurrentStreamConsumer(this::consumeFromStream, streams.size());
readConcurrent(streams, streamConsumer);
} else {
readSerial(config, catalog, stateOptional);
AutoCloseableIterator<AirbyteMessage> messageIterator = source.read(config, catalog, stateOptional.orElse(null));
readSerial(messageIterator, outputRecordCollector);
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

generally:

  1. standardise the method signature.
  2. make them static so they are easily tested/mocked in tests.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
CDK Connector Development Kit
Projects
None yet
Development

Successfully merging this pull request may close these issues.

4 participants