diff --git a/airbyte-cdk/java/airbyte-cdk/core/src/main/java/io/airbyte/cdk/integrations/base/IntegrationRunner.java b/airbyte-cdk/java/airbyte-cdk/core/src/main/java/io/airbyte/cdk/integrations/base/IntegrationRunner.java index 5887466c126d..bd558c1fbe74 100644 --- a/airbyte-cdk/java/airbyte-cdk/core/src/main/java/io/airbyte/cdk/integrations/base/IntegrationRunner.java +++ b/airbyte-cdk/java/airbyte-cdk/core/src/main/java/io/airbyte/cdk/integrations/base/IntegrationRunner.java @@ -32,10 +32,12 @@ import java.util.List; import java.util.Optional; import java.util.Set; +import java.util.concurrent.Callable; import java.util.concurrent.Executors; import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.TimeUnit; import java.util.function.Consumer; +import java.util.function.Function; import java.util.function.Predicate; import java.util.stream.Collectors; import org.apache.commons.lang3.ThreadUtils; @@ -168,11 +170,19 @@ private void runInternal(final IntegrationConfig parsed) throws Exception { try { if (featureFlags.concurrentSourceStreamRead()) { LOGGER.info("Concurrent source stream read enabled."); - readConcurrent(config, catalog, stateOptional); + final Collection> streams = source.readStreams(config, catalog, stateOptional.orElse(null)); + final ConcurrentStreamConsumer streamConsumer = new ConcurrentStreamConsumer(this::consumeFromStream, streams.size()); + readConcurrent(streams, streamConsumer); } else { - readSerial(config, catalog, stateOptional); + AutoCloseableIterator messageIterator = source.read(config, catalog, stateOptional.orElse(null)); + readSerial(messageIterator, outputRecordCollector); } } finally { + stopOrphanedThreads(EXIT_HOOK, + INTERRUPT_THREAD_DELAY_MINUTES, + TimeUnit.MINUTES, + EXIT_THREAD_DELAY_MINUTES, + TimeUnit.MINUTES); if (source instanceof AutoCloseable) { ((AutoCloseable) source).close(); } @@ -231,17 +241,16 @@ private void runInternal(final IntegrationConfig parsed) throws Exception { LOGGER.info("Completed integration: {}", integration.getClass().getName()); } - private void produceMessages(final AutoCloseableIterator messageIterator, final Consumer recordCollector) { + private static void produceMessages(final AutoCloseableIterator messageIterator, final Consumer recordCollector) { messageIterator.getAirbyteStream().ifPresent(s -> LOGGER.debug("Producing messages for stream {}...", s)); messageIterator.forEachRemaining(recordCollector); messageIterator.getAirbyteStream().ifPresent(s -> LOGGER.debug("Finished producing messages for stream {}...")); } - private void readConcurrent(final JsonNode config, final ConfiguredAirbyteCatalog catalog, final Optional stateOptional) + @VisibleForTesting + private static void readConcurrent(final Collection> streams, final ConcurrentStreamConsumer streamConsumer) throws Exception { - final Collection> streams = source.readStreams(config, catalog, stateOptional.orElse(null)); - - try (final ConcurrentStreamConsumer streamConsumer = new ConcurrentStreamConsumer(this::consumeFromStream, streams.size())) { + try { /* * Break the streams into partitions equal to the number of concurrent streams supported by the * stream consumer. @@ -262,24 +271,28 @@ private void readConcurrent(final JsonNode config, final ConfiguredAirbyteCatalo } catch (final Exception e) { LOGGER.error("Unable to perform concurrent read.", e); throw e; - } finally { - stopOrphanedThreads(EXIT_HOOK, - INTERRUPT_THREAD_DELAY_MINUTES, - TimeUnit.MINUTES, - EXIT_THREAD_DELAY_MINUTES, - TimeUnit.MINUTES); + } + // It is generally safe to ignore exceptions on close. + try { + streamConsumer.close(); + for (AutoCloseableIterator stream : streams) { + stream.close(); + } + } catch (final Exception e) { + LOGGER.warn("Exception closing connection: {}. This is generally fine as we've moved all data & are terminating everything. ", + e.getMessage()); } } - private void readSerial(final JsonNode config, final ConfiguredAirbyteCatalog catalog, final Optional stateOptional) throws Exception { - try (final AutoCloseableIterator messageIterator = source.read(config, catalog, stateOptional.orElse(null))) { - produceMessages(messageIterator, outputRecordCollector); - } finally { - stopOrphanedThreads(EXIT_HOOK, - INTERRUPT_THREAD_DELAY_MINUTES, - TimeUnit.MINUTES, - EXIT_THREAD_DELAY_MINUTES, - TimeUnit.MINUTES); + @VisibleForTesting + static private void readSerial(final AutoCloseableIterator iter, Consumer outputRecordCollector) throws Exception { + produceMessages(iter, outputRecordCollector); + // It is generally safe to ignore exceptions on close. + try { + iter.close(); + } catch (Exception e) { + LOGGER.warn("Exception closing connection: {}. This is generally fine as we've moved all data & are terminating everything. ", + e.getMessage()); } }