-
Notifications
You must be signed in to change notification settings - Fork 4.2k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
🐛 Do not propagate exceptions on source close. #33609
base: master
Are you sure you want to change the base?
Conversation
The latest updates on your projects. Learn more about Vercel for Git ↗︎
|
default void close() throws Exception { | ||
try { | ||
closeSafely(); | ||
} catch (Exception e) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Is there other stuff that would want to fail (doing clean up of some sort) in close
where catching this doesn't make sense?
} | ||
|
||
@VisibleForTesting | ||
static void swallowIteratorCloseErrors(AutoCloseableIterator<AirbyteMessage> iter, Consumer<AirbyteMessage> outputRecordCollector) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Is this called anywhere?
.../java/airbyte-cdk/core/src/main/java/io/airbyte/cdk/integrations/base/IntegrationRunner.java
Outdated
Show resolved
Hide resolved
.../java/airbyte-cdk/core/src/main/java/io/airbyte/cdk/integrations/base/IntegrationRunner.java
Outdated
Show resolved
Hide resolved
@akashkulk PTAL! I removed all my WIP changes, so this should be much easier to review. Sorry and thanks! |
} | ||
} finally { | ||
stopOrphanedThreads(EXIT_HOOK, |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
call this only once instead of twice in each method.
@akashkulk @stephane-airbyte @rodireich PTAL - I think this is a pretty good middle ground to implementing the exception ignoring at the integration runner level while making things testable. If this makes sense, I will add more tests! |
final Collection<AutoCloseableIterator<AirbyteMessage>> streams = source.readStreams(config, catalog, stateOptional.orElse(null)); | ||
final ConcurrentStreamConsumer streamConsumer = new ConcurrentStreamConsumer(this::consumeFromStream, streams.size()); | ||
readConcurrent(streams, streamConsumer); | ||
} else { | ||
readSerial(config, catalog, stateOptional); | ||
AutoCloseableIterator<AirbyteMessage> messageIterator = source.read(config, catalog, stateOptional.orElse(null)); | ||
readSerial(messageIterator, outputRecordCollector); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
generally:
- standardise the method signature.
- make them static so they are easily tested/mocked in tests.
What
Based on #33211 , we see certain networking set ups can throw exceptions as we close up. This gives the false perception the sync is failed even though we've moved all data.
Further, we cannot actually action on these exceptions since:
How
Describe the solution
Recommended reading order
x.java
y.python
🚨 User Impact 🚨
Are there any breaking changes? What is the end result perceived by the user?
For connector PRs, use this section to explain which type of semantic versioning bump occurs as a result of the changes. Refer to our Semantic Versioning for Connectors guidelines for more information. Breaking changes to connectors must be documented by an Airbyte engineer (PR author, or reviewer for community PRs) by using the Breaking Change Release Playbook.
If there are breaking changes, please merge this PR with the 🚨🚨 emoji so changelog authors can further highlight this if needed.
Pre-merge Actions
Expand the relevant checklist and delete the others.
New Connector
Community member or Airbyter
./gradlew :airbyte-integrations:connectors:<name>:integrationTest
.0.0.1
Dockerfile
has version0.0.1
README.md
bootstrap.md
. See description and examplesdocs/integrations/<source or destination>/<name>.md
including changelog with an entry for the initial version. See changelog exampledocs/integrations/README.md
Airbyter
If this is a community PR, the Airbyte engineer reviewing this PR is responsible for the below items.
Updating a connector
Community member or Airbyter
Airbyter
If this is a community PR, the Airbyte engineer reviewing this PR is responsible for the below items.
Connector Generator
-scaffold
in their name) have been updated with the latest scaffold by running./gradlew :airbyte-integrations:connector-templates:generator:generateScaffolds
then checking in your changesUpdating the Python CDK
Airbyter
Before merging:
--use-local-cdk --name=source-<connector>
as optionsairbyte-ci connectors --use-local-cdk --name=source-<connector> test
After merging: