Skip to content

Commit

Permalink
DV2: Only run T+D if we have zero records or the previous sync left b…
Browse files Browse the repository at this point in the history
…ehind un-T+D-ed records (#33232)

Co-authored-by: edgao <[email protected]>
  • Loading branch information
edgao and edgao authored Dec 13, 2023
1 parent 5b7dfda commit d27ea33
Show file tree
Hide file tree
Showing 35 changed files with 313 additions and 149 deletions.
4 changes: 3 additions & 1 deletion airbyte-cdk/java/airbyte-cdk/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -156,7 +156,9 @@ MavenLocal debugging steps:

| Version | Date | Pull Request | Subject |
|:--------|:-----------|:-----------------------------------------------------------|:---------------------------------------------------------------------------------------------------------------------------------------------------------------|
| 0.7.2 | 2023-12-11 | [\#33307](https://github.com/airbytehq/airbyte/pull/33307) | Fix DV2 JDBC type mappings (code changes in [\#33307](https://github.com/airbytehq/airbyte/pull/33307)). |
| 0.7.4 | 2023-12-13 | [\#33232](https://github.com/airbytehq/airbyte/pull/33232) | Track stream record count during sync; only run T+D if a stream had nonzero records or the previous sync left unprocessed records. |
| 0.7.3 | 2023-12-13 | [\#33369](https://github.com/airbytehq/airbyte/pull/33369) | Extract shared JDBC T+D code. |
| 0.7.2 | 2023-12-11 | [\#33307](https://github.com/airbytehq/airbyte/pull/33307) | Fix DV2 JDBC type mappings (code changes in [\#33307](https://github.com/airbytehq/airbyte/pull/33307)). |
| 0.7.1 | 2023-12-01 | [\#33027](https://github.com/airbytehq/airbyte/pull/33027) | Add the abstract DB source debugger. |
| 0.7.0 | 2023-12-07 | [\#32326](https://github.com/airbytehq/airbyte/pull/32326) | Destinations V2 changes for JDBC destinations |
| 0.6.4 | 2023-12-06 | [\#33082](https://github.com/airbytehq/airbyte/pull/33082) | Improvements to schema snapshot error handling + schema snapshot history scope (scoped to configured DB). |
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
/*
* Copyright (c) 2023 Airbyte, Inc., all rights reserved.
*/

package io.airbyte.cdk.integrations.destination;

import java.util.Optional;

/**
 * Per-stream summary of a sync, passed to the destination's close logic.
 *
 * @param recordsWritten The number of records written to the stream, or empty if the caller does
 *        not track this information. (this is primarily for backwards-compatibility with the legacy
 *        destinations framework; new implementations should always provide this information). If
 *        this value is empty, consumers should assume that the sync wrote nonzero records for this
 *        stream. Must not be {@code null}.
 */
public record StreamSyncSummary(Optional<Long> recordsWritten) {

  /** Summary for callers that do not track record counts: {@code recordsWritten} is empty. */
  public static final StreamSyncSummary DEFAULT = new StreamSyncSummary(Optional.empty());

  /**
   * Rejects a {@code null} optional so that "count unknown" is always expressed as
   * {@link Optional#empty()} and consumers never have to null-check the component.
   *
   * @throws NullPointerException if {@code recordsWritten} is {@code null}
   */
  public StreamSyncSummary {
    if (recordsWritten == null) {
      throw new NullPointerException("recordsWritten must not be null; use Optional.empty() when the count is unknown");
    }
  }

}
Original file line number Diff line number Diff line change
Expand Up @@ -310,14 +310,16 @@ protected void close(final boolean hasFailed) throws Exception {
* not bother committing. otherwise attempt to commit
*/
if (stateManager.listFlushed().isEmpty()) {
onClose.accept(hasFailed);
// Not updating this class to track record count, because we want to kill it in favor of the
// AsyncStreamConsumer
onClose.accept(hasFailed, new HashMap<>());
} else {
/*
* if any state message was flushed that means we should try to commit what we have. if
* hasFailed=false, then it could be full success. if hasFailed=true, then going for partial
* success.
*/
onClose.accept(false);
onClose.accept(false, null);
}

stateManager.listCommitted().forEach(outputRecordCollector);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,15 +4,18 @@

package io.airbyte.cdk.integrations.destination.buffered_stream_consumer;

import io.airbyte.commons.functional.CheckedConsumer;
import io.airbyte.cdk.integrations.destination.StreamSyncSummary;
import io.airbyte.commons.functional.CheckedBiConsumer;
import io.airbyte.protocol.models.v0.StreamDescriptor;
import java.util.Map;

/**
* Interface allowing destination to specify clean up logic that must be executed after all
* record-related logic has finished.
* <p>
* The map of StreamSyncSummaries MUST be non-null, but MAY be empty. Streams not present in the map
* will be treated as equivalent to {@link StreamSyncSummary#DEFAULT}.
*/
public interface OnCloseFunction extends CheckedConsumer<Boolean, Exception> {

@Override
void accept(Boolean hasFailed) throws Exception;
public interface OnCloseFunction extends CheckedBiConsumer<Boolean, Map<StreamDescriptor, StreamSyncSummary>, Exception> {

  // Declares no members of its own. The two-argument accept(hasFailed, streamSyncSummaries) that
  // callers invoke is presumably inherited from CheckedBiConsumer (defined outside this file).

}
Original file line number Diff line number Diff line change
Expand Up @@ -4,10 +4,13 @@

package io.airbyte.cdk.integrations.destination_async;

import static java.util.stream.Collectors.toMap;

import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
import com.google.common.base.Strings;
import io.airbyte.cdk.integrations.base.SerializedAirbyteMessageConsumer;
import io.airbyte.cdk.integrations.destination.StreamSyncSummary;
import io.airbyte.cdk.integrations.destination.buffered_stream_consumer.OnStartFunction;
import io.airbyte.cdk.integrations.destination_async.buffers.BufferEnqueue;
import io.airbyte.cdk.integrations.destination_async.buffers.BufferManager;
Expand All @@ -18,8 +21,14 @@
import io.airbyte.protocol.models.v0.AirbyteMessage.Type;
import io.airbyte.protocol.models.v0.ConfiguredAirbyteCatalog;
import io.airbyte.protocol.models.v0.StreamDescriptor;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.Consumer;
import lombok.extern.slf4j.Slf4j;
import org.slf4j.Logger;
Expand Down Expand Up @@ -47,6 +56,8 @@ public class AsyncStreamConsumer implements SerializedAirbyteMessageConsumer {
private final Set<StreamDescriptor> streamNames;
private final FlushFailure flushFailure;
private final String defaultNamespace;
// Counters are created lazily by getRecordCounter: on a stream's first record, or with a zero
// count when close() builds the per-stream summaries.
private final ConcurrentMap<StreamDescriptor, AtomicLong> recordCounts;

private boolean hasStarted;
private boolean hasClosed;
Expand Down Expand Up @@ -102,6 +113,7 @@ public AsyncStreamConsumer(final Consumer<AirbyteMessage> outputRecordCollector,
flushWorkers =
new FlushWorkers(bufferManager.getBufferDequeue(), flusher, outputRecordCollector, flushFailure, bufferManager.getStateManager(), workerPool);
streamNames = StreamDescriptorUtils.fromConfiguredCatalog(catalog);
this.recordCounts = new ConcurrentHashMap<>();
}

@VisibleForTesting
Expand All @@ -113,18 +125,7 @@ public AsyncStreamConsumer(final Consumer<AirbyteMessage> outputRecordCollector,
final BufferManager bufferManager,
final FlushFailure flushFailure,
final String defaultNamespace) {
this.defaultNamespace = defaultNamespace;
hasStarted = false;
hasClosed = false;

this.onStart = onStart;
this.onClose = onClose;
this.catalog = catalog;
this.bufferManager = bufferManager;
bufferEnqueue = bufferManager.getBufferEnqueue();
this.flushFailure = flushFailure;
flushWorkers = new FlushWorkers(bufferManager.getBufferDequeue(), flusher, outputRecordCollector, flushFailure, bufferManager.getStateManager());
streamNames = StreamDescriptorUtils.fromConfiguredCatalog(catalog);
this(outputRecordCollector, onStart, onClose, flusher, catalog, bufferManager, flushFailure, defaultNamespace, Executors.newFixedThreadPool(5));
}

@Override
Expand Down Expand Up @@ -153,6 +154,8 @@ public void accept(final String messageString, final Integer sizeInBytes) throws
message.getRecord().setNamespace(defaultNamespace);
}
validateRecord(message);

getRecordCounter(message.getRecord().getStreamDescriptor()).incrementAndGet();
}
bufferEnqueue.addRecord(message, sizeInBytes + PARTIAL_DESERIALIZE_REF_BYTES);
}
Expand Down Expand Up @@ -204,13 +207,22 @@ public void close() throws Exception {
flushWorkers.close();

bufferManager.close();
onClose.accept(hasFailed);

final Map<StreamDescriptor, StreamSyncSummary> streamSyncSummaries = streamNames.stream().collect(toMap(
streamDescriptor -> streamDescriptor,
streamDescriptor -> new StreamSyncSummary(
Optional.of(getRecordCounter(streamDescriptor).get()))));
onClose.accept(hasFailed, streamSyncSummaries);

// as this throws an exception, we need to be after all other close functions.
propagateFlushWorkerExceptionIfPresent();
LOGGER.info("{} closed", AsyncStreamConsumer.class);
}

/**
 * Looks up the record counter for a stream, registering a fresh zero-valued counter the first
 * time the stream is seen.
 *
 * @param streamDescriptor the stream whose counter is requested
 * @return the counter associated with {@code streamDescriptor}
 */
private AtomicLong getRecordCounter(final StreamDescriptor streamDescriptor) {
  final AtomicLong counter = recordCounts.computeIfAbsent(streamDescriptor, unused -> new AtomicLong());
  return counter;
}

private void propagateFlushWorkerExceptionIfPresent() throws Exception {
if (flushFailure.isFailed()) {
hasFailed = true;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,13 +4,16 @@

package io.airbyte.cdk.integrations.destination_async;

import java.util.function.Consumer;
import io.airbyte.cdk.integrations.destination.StreamSyncSummary;
import io.airbyte.protocol.models.v0.StreamDescriptor;
import java.util.Map;
import java.util.function.BiConsumer;

/**
* Async version of
* {@link io.airbyte.cdk.integrations.destination.buffered_stream_consumer.OnCloseFunction}.
* Separated out for easier versioning.
*/
public interface OnCloseFunction extends Consumer<Boolean> {
@FunctionalInterface
public interface OnCloseFunction extends BiConsumer<Boolean, Map<StreamDescriptor, StreamSyncSummary>> {

  // Intentionally empty: the single abstract method is BiConsumer#accept, called with the
  // hasFailed flag and the per-stream sync summaries. @FunctionalInterface makes the compiler
  // reject any change that would stop lambdas from being usable as an OnCloseFunction.

}
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
import com.fasterxml.jackson.annotation.JsonProperty;
import com.fasterxml.jackson.annotation.JsonPropertyDescription;
import com.fasterxml.jackson.databind.JsonNode;
import io.airbyte.protocol.models.v0.StreamDescriptor;
import java.util.Objects;

// TODO: (ryankfu) remove this and test with low memory resources to ensure OOM is still not a
Expand Down Expand Up @@ -116,4 +117,8 @@ public String toString() {
'}';
}

/**
 * Builds a {@link StreamDescriptor} from this message's {@code stream} and {@code namespace}
 * fields.
 *
 * @return a newly constructed descriptor for the stream this record belongs to
 */
public StreamDescriptor getStreamDescriptor() {
  final StreamDescriptor named = new StreamDescriptor().withName(stream);
  return named.withNamespace(namespace);
}

}
Original file line number Diff line number Diff line change
@@ -1 +1 @@
version=0.7.3
version=0.7.4
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@
import java.time.Duration;
import java.time.Instant;
import java.util.Collection;
import java.util.HashMap;
import java.util.List;
import java.util.Optional;
import java.util.concurrent.TimeUnit;
Expand Down Expand Up @@ -237,7 +238,7 @@ void testExceptionAfterNoStateMessages() throws Exception {

@Test
void testExceptionDuringOnClose() throws Exception {
doThrow(new IllegalStateException("induced exception")).when(onClose).accept(false);
doThrow(new IllegalStateException("induced exception")).when(onClose).accept(false, new HashMap<>());

final List<AirbyteMessage> expectedRecordsBatch1 = generateRecords(1_000);
final List<AirbyteMessage> expectedRecordsBatch2 = generateRecords(1_000);
Expand Down Expand Up @@ -507,13 +508,13 @@ private BufferedStreamConsumer getConsumerWithFlushFrequency() {

private void verifyStartAndClose() throws Exception {
verify(onStart).call();
verify(onClose).accept(false);
verify(onClose).accept(false, new HashMap<>());
}

/** Indicates that a failure occurred while consuming AirbyteMessages */
private void verifyStartAndCloseFailure() throws Exception {
verify(onStart).call();
verify(onClose).accept(true);
verify(onClose).accept(true, new HashMap<>());
}

private static void consumeRecords(final BufferedStreamConsumer consumer, final Collection<AirbyteMessage> records) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -186,7 +186,7 @@ void testBackPressure() throws Exception {
consumer = new AsyncStreamConsumer(
m -> {},
() -> {},
(hasFailed) -> {},
(hasFailed, recordCounts) -> {},
flushFunction,
CATALOG,
new BufferManager(1024 * 10),
Expand Down Expand Up @@ -365,7 +365,7 @@ private static List<AirbyteMessage> generateRecords(final long targetSizeInBytes

private void verifyStartAndClose() throws Exception {
verify(onStart).call();
verify(onClose).accept(any());
verify(onClose).accept(any(), any());
}

@SuppressWarnings({"unchecked", "SameParameterValue"})
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -214,9 +214,9 @@ private static RecordWriter<PartialAirbyteMessage> recordWriterFunction(final Jd
* Tear down functionality
*/
private static OnCloseFunction onCloseFunction(final TyperDeduper typerDeduper) {
return (hasFailed) -> {
return (hasFailed, streamSyncSummaries) -> {
try {
typerDeduper.typeAndDedupe();
typerDeduper.typeAndDedupe(streamSyncSummaries);
typerDeduper.commitFinalTables();
typerDeduper.cleanup();
} catch (final Exception e) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -93,7 +93,7 @@ private static OnStartFunction onStartFunction(final Map<AirbyteStreamNameNamesp
private static RecordWriter<AirbyteRecordMessage> recordWriterFunction(final Map<AirbyteStreamNameNamespacePair, StreamCopier> pairToCopier,
final SqlOperations sqlOperations,
final Map<AirbyteStreamNameNamespacePair, Long> pairToIgnoredRecordCount) {
return (AirbyteStreamNameNamespacePair pair, List<AirbyteRecordMessage> records) -> {
return (final AirbyteStreamNameNamespacePair pair, final List<AirbyteRecordMessage> records) -> {
final var fileName = pairToCopier.get(pair).prepareStagingFile();
for (final AirbyteRecordMessage recordMessage : records) {
final var id = UUID.randomUUID();
Expand All @@ -109,7 +109,7 @@ private static RecordWriter<AirbyteRecordMessage> recordWriterFunction(final Map
}

private static CheckAndRemoveRecordWriter removeStagingFilePrinter(final Map<AirbyteStreamNameNamespacePair, StreamCopier> pairToCopier) {
return (AirbyteStreamNameNamespacePair pair, String stagingFileName) -> {
return (final AirbyteStreamNameNamespacePair pair, final String stagingFileName) -> {
final String currentFileName = pairToCopier.get(pair).getCurrentFile();
if (stagingFileName != null && currentFileName != null && !stagingFileName.equals(currentFileName)) {
pairToCopier.get(pair).closeNonCurrentStagingFileWriters();
Expand All @@ -123,7 +123,7 @@ private static OnCloseFunction onCloseFunction(final Map<AirbyteStreamNameNamesp
final SqlOperations sqlOperations,
final Map<AirbyteStreamNameNamespacePair, Long> pairToIgnoredRecordCount,
final DataSource dataSource) {
return (hasFailed) -> {
return (hasFailed, streamSyncSummaries) -> {
pairToIgnoredRecordCount
.forEach((pair, count) -> LOGGER.warn("A total of {} record(s) of data from stream {} were invalid and were ignored.", count, pair));
closeAsOneTransaction(pairToCopier, hasFailed, database, sqlOperations, dataSource);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,6 @@
import java.sql.ResultSet;
import java.sql.SQLException;
import java.sql.SQLType;
import java.time.Instant;
import java.util.LinkedHashMap;
import java.util.Optional;
import java.util.UUID;
Expand Down Expand Up @@ -59,8 +58,8 @@ public boolean isFinalTableEmpty(final StreamId id) throws Exception {
}

@Override
public Optional<Instant> getMinTimestampForSync(final StreamId id) throws Exception {
return Optional.empty();
public InitialRawTableState getInitialRawTableState(final StreamId id) throws Exception {
return new InitialRawTableState(true, Optional.empty());
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -116,11 +116,11 @@ public static OnCloseFunction onCloseFunction(final JdbcDatabase database,
final List<WriteConfig> writeConfigs,
final boolean purgeStagingData,
final TyperDeduper typerDeduper) {
return (hasFailed) -> {
return (hasFailed, streamSyncSummaries) -> {
// After moving data from staging area to the target table (airbyte_raw) clean up the staging
// area (if user configured)
log.info("Cleaning up destination started for {} streams", writeConfigs.size());
typerDeduper.typeAndDedupe();
typerDeduper.typeAndDedupe(streamSyncSummaries);
for (final WriteConfig writeConfig : writeConfigs) {
final String schemaName = writeConfig.getOutputSchemaName();
if (purgeStagingData) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -139,7 +139,7 @@ private FlushBufferFunction flushBufferFunction(final BlobStorageOperations stor

private OnCloseFunction onCloseFunction(final BlobStorageOperations storageOperations,
final List<WriteConfig> writeConfigs) {
return (hasFailed) -> {
return (hasFailed, streamSyncSummaries) -> {
if (hasFailed) {
LOGGER.info("Cleaning up destination started for {} streams", writeConfigs.size());
for (final WriteConfig writeConfig : writeConfigs) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -108,10 +108,15 @@ public SerializedAirbyteMessageConsumer createAsync(final Consumer<AirbyteMessag
outputRecordCollector,
GeneralStagingFunctions.onStartFunction(database, stagingOperations, writeConfigs, typerDeduper),
// todo (cgardens) - wrapping the old close function to avoid more code churn.
(hasFailed) -> {
(hasFailed, streamSyncSummaries) -> {
try {
GeneralStagingFunctions.onCloseFunction(database, stagingOperations, writeConfigs, purgeStagingData, typerDeduper).accept(false);
} catch (Exception e) {
GeneralStagingFunctions.onCloseFunction(
database,
stagingOperations,
writeConfigs,
purgeStagingData,
typerDeduper).accept(false, streamSyncSummaries);
} catch (final Exception e) {
throw new RuntimeException(e);
}
},
Expand All @@ -121,7 +126,7 @@ public SerializedAirbyteMessageConsumer createAsync(final Consumer<AirbyteMessag
defaultNamespace);
}

private static long getMemoryLimit(Optional<Long> bufferMemoryLimit) {
private static long getMemoryLimit(final Optional<Long> bufferMemoryLimit) {
return bufferMemoryLimit.orElse((long) (Runtime.getRuntime().maxMemory() * MEMORY_LIMIT_RATIO));
}

Expand Down
Loading

0 comments on commit d27ea33

Please sign in to comment.