Skip to content

Commit

Permalink
Better ignore exception on close.
Browse files Browse the repository at this point in the history
  • Loading branch information
davinchia committed Dec 19, 2023
1 parent 797d0f7 commit 6af53c5
Showing 1 changed file with 16 additions and 5 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -171,7 +171,8 @@ private void runInternal(final IntegrationConfig parsed) throws Exception {
if (featureFlags.concurrentSourceStreamRead()) {
LOGGER.info("Concurrent source stream read enabled.");
final Collection<AutoCloseableIterator<AirbyteMessage>> streams = source.readStreams(config, catalog, stateOptional.orElse(null));
readConcurrent(config, catalog, stateOptional);
final ConcurrentStreamConsumer streamConsumer = new ConcurrentStreamConsumer(this::consumeFromStream, streams.size());
readConcurrent(streams, streamConsumer);
} else {
AutoCloseableIterator<AirbyteMessage> messageIterator = source.read(config, catalog, stateOptional.orElse(null));
readSerial(messageIterator, outputRecordCollector);
Expand Down Expand Up @@ -246,10 +247,9 @@ private static void produceMessages(final AutoCloseableIterator<AirbyteMessage>
messageIterator.getAirbyteStream().ifPresent(s -> LOGGER.debug("Finished producing messages for stream {}..."));
}

private void readConcurrent(final JsonNode config, final ConfiguredAirbyteCatalog catalog, final Optional<JsonNode> stateOptional)
@VisibleForTesting
private static void readConcurrent(final Collection<AutoCloseableIterator<AirbyteMessage>> streams, final ConcurrentStreamConsumer streamConsumer)
throws Exception {
final Collection<AutoCloseableIterator<AirbyteMessage>> streams = source.readStreams(config, catalog, stateOptional.orElse(null));
final ConcurrentStreamConsumer streamConsumer = new ConcurrentStreamConsumer(this::consumeFromStream, streams.size());
try {
/*
* Break the streams into partitions equal to the number of concurrent streams supported by the
Expand All @@ -272,15 +272,26 @@ private void readConcurrent(final JsonNode config, final ConfiguredAirbyteCatalo
LOGGER.error("Unable to perform concurrent read.", e);
throw e;
}
// It is generally safe to ignore exceptions on close.
try {
streamConsumer.close();
for (AutoCloseableIterator<AirbyteMessage> stream : streams) {
stream.close();
}
} catch (final Exception e) {
LOGGER.warn("Exception closing connection: {}. This is generally fine as we've moved all data & are terminating everything. ",
e.getMessage());
}
}

@VisibleForTesting
static private void readSerial(final AutoCloseableIterator<AirbyteMessage> iter, Consumer<AirbyteMessage> outputRecordCollector) throws Exception {
produceMessages(iter, outputRecordCollector);
// It is generally safe to ignore exceptions on close.
try {
iter.close();
} catch (Exception e) {
LOGGER.error("Exception closing connection: {}. This is generally fine as we've moved all data & are terminating everything. ",
LOGGER.warn("Exception closing connection: {}. This is generally fine as we've moved all data & are terminating everything. ",
e.getMessage());
}
}
Expand Down

0 comments on commit 6af53c5

Please sign in to comment.