Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

🐛 Do not propagate exceptions on source close. #33609

Draft
wants to merge 10 commits into
base: master
Choose a base branch
from
Original file line number Diff line number Diff line change
Expand Up @@ -32,10 +32,12 @@
import java.util.List;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.Callable;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;
import java.util.function.Function;
import java.util.function.Predicate;
import java.util.stream.Collectors;
import org.apache.commons.lang3.ThreadUtils;
Expand Down Expand Up @@ -168,11 +170,19 @@ private void runInternal(final IntegrationConfig parsed) throws Exception {
try {
if (featureFlags.concurrentSourceStreamRead()) {
LOGGER.info("Concurrent source stream read enabled.");
readConcurrent(config, catalog, stateOptional);
final Collection<AutoCloseableIterator<AirbyteMessage>> streams = source.readStreams(config, catalog, stateOptional.orElse(null));
final ConcurrentStreamConsumer streamConsumer = new ConcurrentStreamConsumer(this::consumeFromStream, streams.size());
readConcurrent(streams, streamConsumer);
} else {
readSerial(config, catalog, stateOptional);
AutoCloseableIterator<AirbyteMessage> messageIterator = source.read(config, catalog, stateOptional.orElse(null));
readSerial(messageIterator, outputRecordCollector);
Comment on lines +173 to +178
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

generally:

  1. standardise the method signature.
  2. make them static so they are easily tested/mocked in tests.

}
} finally {
stopOrphanedThreads(EXIT_HOOK,
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

call this only once instead of twice in each method.

INTERRUPT_THREAD_DELAY_MINUTES,
TimeUnit.MINUTES,
EXIT_THREAD_DELAY_MINUTES,
TimeUnit.MINUTES);
if (source instanceof AutoCloseable) {
((AutoCloseable) source).close();
}
Expand Down Expand Up @@ -231,17 +241,16 @@ private void runInternal(final IntegrationConfig parsed) throws Exception {
LOGGER.info("Completed integration: {}", integration.getClass().getName());
}

private void produceMessages(final AutoCloseableIterator<AirbyteMessage> messageIterator, final Consumer<AirbyteMessage> recordCollector) {
private static void produceMessages(final AutoCloseableIterator<AirbyteMessage> messageIterator, final Consumer<AirbyteMessage> recordCollector) {
messageIterator.getAirbyteStream().ifPresent(s -> LOGGER.debug("Producing messages for stream {}...", s));
messageIterator.forEachRemaining(recordCollector);
messageIterator.getAirbyteStream().ifPresent(s -> LOGGER.debug("Finished producing messages for stream {}..."));
}

private void readConcurrent(final JsonNode config, final ConfiguredAirbyteCatalog catalog, final Optional<JsonNode> stateOptional)
@VisibleForTesting
private static void readConcurrent(final Collection<AutoCloseableIterator<AirbyteMessage>> streams, final ConcurrentStreamConsumer streamConsumer)
throws Exception {
final Collection<AutoCloseableIterator<AirbyteMessage>> streams = source.readStreams(config, catalog, stateOptional.orElse(null));

try (final ConcurrentStreamConsumer streamConsumer = new ConcurrentStreamConsumer(this::consumeFromStream, streams.size())) {
try {
/*
* Break the streams into partitions equal to the number of concurrent streams supported by the
* stream consumer.
Expand All @@ -262,24 +271,28 @@ private void readConcurrent(final JsonNode config, final ConfiguredAirbyteCatalo
} catch (final Exception e) {
LOGGER.error("Unable to perform concurrent read.", e);
throw e;
} finally {
stopOrphanedThreads(EXIT_HOOK,
INTERRUPT_THREAD_DELAY_MINUTES,
TimeUnit.MINUTES,
EXIT_THREAD_DELAY_MINUTES,
TimeUnit.MINUTES);
}
// It is generally safe to ignore exceptions on close.
try {
streamConsumer.close();
for (AutoCloseableIterator<AirbyteMessage> stream : streams) {
stream.close();
}
} catch (final Exception e) {
LOGGER.warn("Exception closing connection: {}. This is generally fine as we've moved all data & are terminating everything. ",
e.getMessage());
}
}

private void readSerial(final JsonNode config, final ConfiguredAirbyteCatalog catalog, final Optional<JsonNode> stateOptional) throws Exception {
try (final AutoCloseableIterator<AirbyteMessage> messageIterator = source.read(config, catalog, stateOptional.orElse(null))) {
produceMessages(messageIterator, outputRecordCollector);
} finally {
stopOrphanedThreads(EXIT_HOOK,
INTERRUPT_THREAD_DELAY_MINUTES,
TimeUnit.MINUTES,
EXIT_THREAD_DELAY_MINUTES,
TimeUnit.MINUTES);
@VisibleForTesting
static private void readSerial(final AutoCloseableIterator<AirbyteMessage> iter, Consumer<AirbyteMessage> outputRecordCollector) throws Exception {
produceMessages(iter, outputRecordCollector);
// It is generally safe to ignore exceptions on close.
try {
iter.close();
} catch (Exception e) {
LOGGER.warn("Exception closing connection: {}. This is generally fine as we've moved all data & are terminating everything. ",
e.getMessage());
}
}

Expand Down
Loading