From 6af53c55ae722b489291fbe8d4e2f68fdc33f49b Mon Sep 17 00:00:00 2001 From: Davin Chia Date: Tue, 19 Dec 2023 15:52:57 -0800 Subject: [PATCH] Better ignore exception on close. --- .../integrations/base/IntegrationRunner.java | 21 ++++++++++++++----- 1 file changed, 16 insertions(+), 5 deletions(-) diff --git a/airbyte-cdk/java/airbyte-cdk/core/src/main/java/io/airbyte/cdk/integrations/base/IntegrationRunner.java b/airbyte-cdk/java/airbyte-cdk/core/src/main/java/io/airbyte/cdk/integrations/base/IntegrationRunner.java index 60802878f787..6ca3b27ffc29 100644 --- a/airbyte-cdk/java/airbyte-cdk/core/src/main/java/io/airbyte/cdk/integrations/base/IntegrationRunner.java +++ b/airbyte-cdk/java/airbyte-cdk/core/src/main/java/io/airbyte/cdk/integrations/base/IntegrationRunner.java @@ -171,7 +171,8 @@ private void runInternal(final IntegrationConfig parsed) throws Exception { if (featureFlags.concurrentSourceStreamRead()) { LOGGER.info("Concurrent source stream read enabled."); final Collection> streams = source.readStreams(config, catalog, stateOptional.orElse(null)); - readConcurrent(config, catalog, stateOptional); + final ConcurrentStreamConsumer streamConsumer = new ConcurrentStreamConsumer(this::consumeFromStream, streams.size()); + readConcurrent(streams, streamConsumer); } else { AutoCloseableIterator messageIterator = source.read(config, catalog, stateOptional.orElse(null)); readSerial(messageIterator, outputRecordCollector); @@ -246,10 +247,9 @@ private static void produceMessages(final AutoCloseableIterator messageIterator.getAirbyteStream().ifPresent(s -> LOGGER.debug("Finished producing messages for stream {}...")); } - private void readConcurrent(final JsonNode config, final ConfiguredAirbyteCatalog catalog, final Optional stateOptional) + @VisibleForTesting + private static void readConcurrent(final Collection> streams, final ConcurrentStreamConsumer streamConsumer) throws Exception { - final Collection> streams = source.readStreams(config, catalog, stateOptional.orElse(null)); - final ConcurrentStreamConsumer streamConsumer = new ConcurrentStreamConsumer(this::consumeFromStream, streams.size()); try { /* * Break the streams into partitions equal to the number of concurrent streams supported by the @@ -272,15 +272,26 @@ private void readConcurrent(final JsonNode config, final ConfiguredAirbyteCatalo LOGGER.error("Unable to perform concurrent read.", e); throw e; } + // It is generally safe to ignore exceptions on close. + try { + streamConsumer.close(); + for (AutoCloseableIterator stream : streams) { + stream.close(); + } + } catch (final Exception e) { + LOGGER.warn("Exception closing connection: {}. This is generally fine as we've moved all data & are terminating everything. ", + e.getMessage()); + } } @VisibleForTesting static private void readSerial(final AutoCloseableIterator iter, Consumer outputRecordCollector) throws Exception { produceMessages(iter, outputRecordCollector); + // It is generally safe to ignore exceptions on close. try { iter.close(); } catch (Exception e) { - LOGGER.error("Exception closing connection: {}. This is generally fine as we've moved all data & are terminating everything. ", + LOGGER.warn("Exception closing connection: {}. This is generally fine as we've moved all data & are terminating everything. ", e.getMessage()); } }