Skip to content

Commit

Permalink
Checkpoint: implement for readSerial.
Browse files Browse the repository at this point in the history
  • Loading branch information
davinchia committed Dec 19, 2023
1 parent 3e4aa52 commit 797d0f7
Showing 1 changed file with 22 additions and 26 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -32,10 +32,12 @@
import java.util.List;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.Callable;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;
import java.util.function.Function;
import java.util.function.Predicate;
import java.util.stream.Collectors;
import org.apache.commons.lang3.ThreadUtils;
Expand Down Expand Up @@ -168,11 +170,18 @@ private void runInternal(final IntegrationConfig parsed) throws Exception {
try {
if (featureFlags.concurrentSourceStreamRead()) {
LOGGER.info("Concurrent source stream read enabled.");
final Collection<AutoCloseableIterator<AirbyteMessage>> streams = source.readStreams(config, catalog, stateOptional.orElse(null));
readConcurrent(config, catalog, stateOptional);
} else {
readSerial(config, catalog, stateOptional);
AutoCloseableIterator<AirbyteMessage> messageIterator = source.read(config, catalog, stateOptional.orElse(null));
readSerial(messageIterator, outputRecordCollector);
}
} finally {
stopOrphanedThreads(EXIT_HOOK,
INTERRUPT_THREAD_DELAY_MINUTES,
TimeUnit.MINUTES,
EXIT_THREAD_DELAY_MINUTES,
TimeUnit.MINUTES);
if (source instanceof AutoCloseable) {
((AutoCloseable) source).close();
}
Expand Down Expand Up @@ -231,7 +240,7 @@ private void runInternal(final IntegrationConfig parsed) throws Exception {
LOGGER.info("Completed integration: {}", integration.getClass().getName());
}

private void produceMessages(final AutoCloseableIterator<AirbyteMessage> messageIterator, final Consumer<AirbyteMessage> recordCollector) {
private static void produceMessages(final AutoCloseableIterator<AirbyteMessage> messageIterator, final Consumer<AirbyteMessage> recordCollector) {
messageIterator.getAirbyteStream().ifPresent(s -> LOGGER.debug("Producing messages for stream {}...", s));
messageIterator.forEachRemaining(recordCollector);
messageIterator.getAirbyteStream().ifPresent(s -> LOGGER.debug("Finished producing messages for stream {}..."));
Expand All @@ -240,8 +249,8 @@ private void produceMessages(final AutoCloseableIterator<AirbyteMessage> message
private void readConcurrent(final JsonNode config, final ConfiguredAirbyteCatalog catalog, final Optional<JsonNode> stateOptional)
throws Exception {
final Collection<AutoCloseableIterator<AirbyteMessage>> streams = source.readStreams(config, catalog, stateOptional.orElse(null));

try (final ConcurrentStreamConsumer streamConsumer = new ConcurrentStreamConsumer(this::consumeFromStream, streams.size())) {
final ConcurrentStreamConsumer streamConsumer = new ConcurrentStreamConsumer(this::consumeFromStream, streams.size());
try {
/*
* Break the streams into partitions equal to the number of concurrent streams supported by the
* stream consumer.
Expand All @@ -262,31 +271,17 @@ private void readConcurrent(final JsonNode config, final ConfiguredAirbyteCatalo
} catch (final Exception e) {
LOGGER.error("Unable to perform concurrent read.", e);
throw e;
} finally {
stopOrphanedThreads(EXIT_HOOK,
INTERRUPT_THREAD_DELAY_MINUTES,
TimeUnit.MINUTES,
EXIT_THREAD_DELAY_MINUTES,
TimeUnit.MINUTES);
}
}

private void readSerial(final JsonNode config, final ConfiguredAirbyteCatalog catalog, final Optional<JsonNode> stateOptional) throws Exception {
final AutoCloseableIterator<AirbyteMessage> messageIterator = source.read(config, catalog, stateOptional.orElse(null));
@VisibleForTesting
static private void readSerial(final AutoCloseableIterator<AirbyteMessage> iter, Consumer<AirbyteMessage> outputRecordCollector) throws Exception {
produceMessages(iter, outputRecordCollector);
try {
produceMessages(messageIterator, outputRecordCollector);
try {
messageIterator.close();
} catch (Exception e) {
LOGGER.warn("Exception closing connection: {}. This is generally fine as we've moved all data & are terminating everything. ",
e.getMessage());
}
} finally {
stopOrphanedThreads(EXIT_HOOK,
INTERRUPT_THREAD_DELAY_MINUTES,
TimeUnit.MINUTES,
EXIT_THREAD_DELAY_MINUTES,
TimeUnit.MINUTES);
iter.close();
} catch (Exception e) {
LOGGER.error("Exception closing connection: {}. This is generally fine as we've moved all data & are terminating everything. ",
e.getMessage());
}
}

Expand Down Expand Up @@ -356,7 +351,8 @@ static String parseConnectorVersion(final String connectorImage) {
}

@VisibleForTesting
static void swallowIteratorCloseErrors(AutoCloseableIterator<AirbyteMessage> iter, Consumer<AirbyteMessage> outputRecordCollector) {
static void swallowIteratorCloseErrors(AutoCloseableIterator<AirbyteMessage> iter, Callable runner) {
runner.call();

Check failure on line 355 in airbyte-cdk/java/airbyte-cdk/core/src/main/java/io/airbyte/cdk/integrations/base/IntegrationRunner.java

View workflow job for this annotation

GitHub Actions / Gradle Check

[Task :airbyte-cdk:java:airbyte-cdk:core:compileJava] unreported exception Exception; must be caught or declared to be thrown runner.call(); ^
try {
iter.close();
} catch (Exception e) {
Expand Down

0 comments on commit 797d0f7

Please sign in to comment.