From ffeb2021b8fd4ab853460aec9e82435b7d29f9fa Mon Sep 17 00:00:00 2001 From: Edward Gao Date: Mon, 26 Feb 2024 16:28:24 -0800 Subject: [PATCH 1/4] Interface changes to raw table meta and transformers Summary ======= add airbyte_meta in raw table creation also add migrations to jdbc destinations write airbyte_meta in staging data transformer interface --- .../integrations/base/JavaBaseConstants.java | 11 ++- .../record_buffer/BaseSerializedBuffer.java | 10 +-- .../record_buffer/SerializableBuffer.java | 3 +- .../AsyncStreamConsumer.java | 89 ++++++++----------- .../deser/DeserializationUtil.kt | 63 +++++++++++++ .../deser/IdentityDataTransformer.java | 24 +++++ .../deser/StreamAwareDataTransformer.java | 26 ++++++ .../PartialAirbyteRecordMessage.java | 12 +++ .../AsyncStreamConsumerTest.java | 16 ++-- .../jdbc/AbstractJdbcDestination.java | 29 ++++-- .../destination/jdbc/JdbcSqlOperations.java | 17 +++- .../typing_deduping/JdbcSqlGenerator.java | 6 +- .../jdbc/AbstractJdbcDestinationTest.java | 18 +++- .../JdbcSqlGeneratorIntegrationTest.java | 4 +- .../airbyte-cdk/dependencies/build.gradle | 2 +- .../s3/avro/AvroSerializedBuffer.java | 7 ++ .../s3/csv/BaseSheetGenerator.java | 5 +- .../s3/csv/CsvSerializedBuffer.java | 4 +- .../destination/s3/csv/CsvSheetGenerator.java | 2 +- .../csv/StagingDatabaseCsvSheetGenerator.java | 7 +- .../s3/jsonl/JsonLSerializedBuffer.java | 6 ++ .../s3/parquet/ParquetSerializedBuffer.java | 2 +- .../destination/staging/AsyncFlush.java | 2 +- .../BaseDestinationV1V2Migrator.java | 4 +- .../BaseSqlGeneratorIntegrationTest.java | 4 +- .../resources/dat/sync1_messages.jsonl | 4 +- .../sqlgenerator/alltypes_inputrecords.jsonl | 2 +- 27 files changed, 282 insertions(+), 97 deletions(-) create mode 100644 airbyte-cdk/java/airbyte-cdk/core/src/main/java/io/airbyte/cdk/integrations/destination_async/deser/DeserializationUtil.kt create mode 100644 airbyte-cdk/java/airbyte-cdk/core/src/main/java/io/airbyte/cdk/integrations/destination_async/deser/IdentityDataTransformer.java create mode 100644 airbyte-cdk/java/airbyte-cdk/core/src/main/java/io/airbyte/cdk/integrations/destination_async/deser/StreamAwareDataTransformer.java diff --git a/airbyte-cdk/java/airbyte-cdk/core/src/main/java/io/airbyte/cdk/integrations/base/JavaBaseConstants.java b/airbyte-cdk/java/airbyte-cdk/core/src/main/java/io/airbyte/cdk/integrations/base/JavaBaseConstants.java index 5001d6119e7a..cc4839816598 100644 --- a/airbyte-cdk/java/airbyte-cdk/core/src/main/java/io/airbyte/cdk/integrations/base/JavaBaseConstants.java +++ b/airbyte-cdk/java/airbyte-cdk/core/src/main/java/io/airbyte/cdk/integrations/base/JavaBaseConstants.java @@ -31,11 +31,20 @@ private JavaBaseConstants() {} public static final String COLUMN_NAME_AB_LOADED_AT = "_airbyte_loaded_at"; public static final String COLUMN_NAME_AB_EXTRACTED_AT = "_airbyte_extracted_at"; public static final String COLUMN_NAME_AB_META = "_airbyte_meta"; - public static final List V2_RAW_TABLE_COLUMN_NAMES = List.of( + + // Meta was introduced later, so to avoid triggering raw table soft-reset in v1->v2 + // use this column list. + public static final List V2_RAW_TABLE_COLUMN_NAMES_WITHOUT_META = List.of( COLUMN_NAME_AB_RAW_ID, COLUMN_NAME_AB_EXTRACTED_AT, COLUMN_NAME_AB_LOADED_AT, COLUMN_NAME_DATA); + public static final List V2_RAW_TABLE_COLUMN_NAMES = List.of( + COLUMN_NAME_AB_RAW_ID, + COLUMN_NAME_AB_EXTRACTED_AT, + COLUMN_NAME_AB_LOADED_AT, + COLUMN_NAME_DATA, + COLUMN_NAME_AB_META); public static final List V2_FINAL_TABLE_METADATA_COLUMNS = List.of( COLUMN_NAME_AB_RAW_ID, COLUMN_NAME_AB_EXTRACTED_AT, diff --git a/airbyte-cdk/java/airbyte-cdk/core/src/main/java/io/airbyte/cdk/integrations/destination/record_buffer/BaseSerializedBuffer.java b/airbyte-cdk/java/airbyte-cdk/core/src/main/java/io/airbyte/cdk/integrations/destination/record_buffer/BaseSerializedBuffer.java index 38f951030951..9b4a9800acaf 100644 --- a/airbyte-cdk/java/airbyte-cdk/core/src/main/java/io/airbyte/cdk/integrations/destination/record_buffer/BaseSerializedBuffer.java +++ b/airbyte-cdk/java/airbyte-cdk/core/src/main/java/io/airbyte/cdk/integrations/destination/record_buffer/BaseSerializedBuffer.java @@ -5,7 +5,6 @@ package io.airbyte.cdk.integrations.destination.record_buffer; import com.google.common.io.CountingOutputStream; -import io.airbyte.commons.json.Jsons; import io.airbyte.protocol.models.v0.AirbyteRecordMessage; import java.io.File; import java.io.IOException; @@ -66,12 +65,11 @@ protected BaseSerializedBuffer(final BufferStorage bufferStorage) throws Excepti * AirbyteRecord * * @param recordString serialized record + * @param airbyteMetaString * @param emittedAt timestamp of the record in milliseconds * @throws IOException */ - protected void writeRecord(final String recordString, final long emittedAt) throws IOException { - writeRecord(Jsons.deserialize(recordString, AirbyteRecordMessage.class).withEmittedAt(emittedAt)); - } + protected abstract void writeRecord(final String recordString, String airbyteMetaString, final long emittedAt) throws IOException; /** * Stops the writer from receiving new data and prepares it for being finalized and converted into @@ -111,7 +109,7 @@ public long accept(final AirbyteRecordMessage record) throws Exception { } @Override - public long accept(final String recordString, final long emittedAt) throws Exception { + public long accept(final String recordString, final String airbyteMetaString, final long emittedAt) throws Exception { if (!isStarted) { if (useCompression) { compressedBuffer = new GzipCompressorOutputStream(byteCounter); @@ -123,7 +121,7 @@ public long accept(final String recordString, final long emittedAt) throws Excep } if (inputStream == null && !isClosed) { final long startCount = byteCounter.getCount(); - writeRecord(recordString, emittedAt); + writeRecord(recordString, airbyteMetaString, emittedAt); return byteCounter.getCount() - startCount; } else { throw new IllegalCallerException("Buffer is already closed, it cannot accept more messages"); diff --git a/airbyte-cdk/java/airbyte-cdk/core/src/main/java/io/airbyte/cdk/integrations/destination/record_buffer/SerializableBuffer.java b/airbyte-cdk/java/airbyte-cdk/core/src/main/java/io/airbyte/cdk/integrations/destination/record_buffer/SerializableBuffer.java index 79477ab5cc5b..42e67f689d35 100644 --- a/airbyte-cdk/java/airbyte-cdk/core/src/main/java/io/airbyte/cdk/integrations/destination/record_buffer/SerializableBuffer.java +++ b/airbyte-cdk/java/airbyte-cdk/core/src/main/java/io/airbyte/cdk/integrations/destination/record_buffer/SerializableBuffer.java @@ -43,11 +43,12 @@ public interface SerializableBuffer extends AutoCloseable { * the entire AirbyteRecordMessage * * @param recordString serialized record + * @param airbyteMetaString The serialized airbyte_meta entry * @param emittedAt timestamp of the record in milliseconds * @return number of bytes written to the buffer * @throws Exception */ - long accept(String recordString, long emittedAt) throws Exception; + long accept(String recordString, String airbyteMetaString, long emittedAt) throws Exception; /** * Flush a buffer implementation. diff --git a/airbyte-cdk/java/airbyte-cdk/core/src/main/java/io/airbyte/cdk/integrations/destination_async/AsyncStreamConsumer.java b/airbyte-cdk/java/airbyte-cdk/core/src/main/java/io/airbyte/cdk/integrations/destination_async/AsyncStreamConsumer.java index 711326fd919b..0d09877bc1f0 100644 --- a/airbyte-cdk/java/airbyte-cdk/core/src/main/java/io/airbyte/cdk/integrations/destination_async/AsyncStreamConsumer.java +++ b/airbyte-cdk/java/airbyte-cdk/core/src/main/java/io/airbyte/cdk/integrations/destination_async/AsyncStreamConsumer.java @@ -4,6 +4,7 @@ package io.airbyte.cdk.integrations.destination_async; +import static io.airbyte.cdk.integrations.destination_async.deser.DeserializationUtil.*; import static java.util.stream.Collectors.toMap; import com.google.common.annotations.VisibleForTesting; @@ -14,6 +15,8 @@ import io.airbyte.cdk.integrations.destination.buffered_stream_consumer.OnStartFunction; import io.airbyte.cdk.integrations.destination_async.buffers.BufferEnqueue; import io.airbyte.cdk.integrations.destination_async.buffers.BufferManager; +import io.airbyte.cdk.integrations.destination_async.deser.IdentityDataTransformer; +import io.airbyte.cdk.integrations.destination_async.deser.StreamAwareDataTransformer; import io.airbyte.cdk.integrations.destination_async.partial_messages.PartialAirbyteMessage; import io.airbyte.cdk.integrations.destination_async.state.FlushFailure; import io.airbyte.commons.json.Jsons; @@ -56,6 +59,8 @@ public class AsyncStreamConsumer implements SerializedAirbyteMessageConsumer { private final Set streamNames; private final FlushFailure flushFailure; private final String defaultNamespace; + + private final StreamAwareDataTransformer dataTransformer; // Note that this map will only be populated for streams with nonzero records. private final ConcurrentMap recordCounts; @@ -69,6 +74,9 @@ public class AsyncStreamConsumer implements SerializedAirbyteMessageConsumer { // a 64 bit JVM. final int PARTIAL_DESERIALIZE_REF_BYTES = 10 * 8; + // TODO: What the.. combinatorics of the constructors are getting out of hand. We should consider + // refactoring this to use a builder pattern with enforced defaults. + public AsyncStreamConsumer(final Consumer outputRecordCollector, final OnStartFunction onStart, final OnCloseFunction onClose, @@ -79,6 +87,18 @@ public AsyncStreamConsumer(final Consumer outputRecordCollector, this(outputRecordCollector, onStart, onClose, flusher, catalog, bufferManager, new FlushFailure(), defaultNamespace); } + public AsyncStreamConsumer(final Consumer outputRecordCollector, + final OnStartFunction onStart, + final OnCloseFunction onClose, + final DestinationFlushFunction flusher, + final ConfiguredAirbyteCatalog catalog, + final BufferManager bufferManager, + final String defaultNamespace, + final StreamAwareDataTransformer dataTransformer) { + this(outputRecordCollector, onStart, onClose, flusher, catalog, bufferManager, new FlushFailure(), defaultNamespace, + Executors.newFixedThreadPool(5), dataTransformer); + } + public AsyncStreamConsumer(final Consumer outputRecordCollector, final OnStartFunction onStart, final OnCloseFunction onClose, @@ -87,7 +107,21 @@ public AsyncStreamConsumer(final Consumer outputRecordCollector, final BufferManager bufferManager, final String defaultNamespace, final ExecutorService workerPool) { - this(outputRecordCollector, onStart, onClose, flusher, catalog, bufferManager, new FlushFailure(), defaultNamespace, workerPool); + this(outputRecordCollector, onStart, onClose, flusher, catalog, bufferManager, new FlushFailure(), defaultNamespace, workerPool, + new IdentityDataTransformer()); + } + + @VisibleForTesting + public AsyncStreamConsumer(final Consumer outputRecordCollector, + final OnStartFunction onStart, + final OnCloseFunction onClose, + final DestinationFlushFunction flusher, + final ConfiguredAirbyteCatalog catalog, + final BufferManager bufferManager, + final FlushFailure flushFailure, + final String defaultNamespace) { + this(outputRecordCollector, onStart, onClose, flusher, catalog, bufferManager, flushFailure, defaultNamespace, Executors.newFixedThreadPool(5), + new IdentityDataTransformer()); } @VisibleForTesting @@ -99,7 +133,8 @@ public AsyncStreamConsumer(final Consumer outputRecordCollector, final BufferManager bufferManager, final FlushFailure flushFailure, final String defaultNamespace, - final ExecutorService workerPool) { + final ExecutorService workerPool, + final StreamAwareDataTransformer dataTransformer) { this.defaultNamespace = defaultNamespace; hasStarted = false; hasClosed = false; @@ -114,18 +149,7 @@ public AsyncStreamConsumer(final Consumer outputRecordCollector, new FlushWorkers(bufferManager.getBufferDequeue(), flusher, outputRecordCollector, flushFailure, bufferManager.getStateManager(), workerPool); streamNames = StreamDescriptorUtils.fromConfiguredCatalog(catalog); this.recordCounts = new ConcurrentHashMap<>(); - } - - @VisibleForTesting - public AsyncStreamConsumer(final Consumer outputRecordCollector, - final OnStartFunction onStart, - final OnCloseFunction onClose, - final DestinationFlushFunction flusher, - final ConfiguredAirbyteCatalog catalog, - final BufferManager bufferManager, - final FlushFailure flushFailure, - final String defaultNamespace) { - this(outputRecordCollector, onStart, onClose, flusher, catalog, bufferManager, flushFailure, defaultNamespace, Executors.newFixedThreadPool(5)); + this.dataTransformer = dataTransformer; } @Override @@ -148,7 +172,7 @@ public void accept(final String messageString, final Integer sizeInBytes) throws * to try to use a thread pool to partially deserialize to get record type and stream name, we can * do it without touching buffer manager. */ - final var message = deserializeAirbyteMessage(messageString); + final var message = deserializeAirbyteMessage(messageString, this.dataTransformer); if (Type.RECORD.equals(message.getType())) { if (Strings.isNullOrEmpty(message.getRecord().getNamespace())) { message.getRecord().setNamespace(defaultNamespace); @@ -160,41 +184,6 @@ public void accept(final String messageString, final Integer sizeInBytes) throws bufferEnqueue.addRecord(message, sizeInBytes + PARTIAL_DESERIALIZE_REF_BYTES, defaultNamespace); } - /** - * Deserializes to a {@link PartialAirbyteMessage} which can represent both a Record or a State - * Message - * - * PartialAirbyteMessage holds either: - *
  • entire serialized message string when message is a valid State Message - *
  • serialized AirbyteRecordMessage when message is a valid Record Message
  • - * - * @param messageString the string to deserialize - * @return PartialAirbyteMessage if the message is valid, empty otherwise - */ - @VisibleForTesting - public static PartialAirbyteMessage deserializeAirbyteMessage(final String messageString) { - // TODO: (ryankfu) plumb in the serialized AirbyteStateMessage to match AirbyteRecordMessage code - // parity. https://github.com/airbytehq/airbyte/issues/27530 for additional context - final var partial = Jsons.tryDeserializeExact(messageString, PartialAirbyteMessage.class) - .orElseThrow(() -> new RuntimeException("Unable to deserialize PartialAirbyteMessage.")); - - final var msgType = partial.getType(); - if (Type.RECORD.equals(msgType) && partial.getRecord().getData() != null) { - // store serialized json - partial.withSerialized(partial.getRecord().getData().toString()); - // The connector doesn't need to be able to access to the record value. We can serialize it here and - // drop the json - // object. Having this data stored as a string is slightly more optimal for the memory usage. - partial.getRecord().setData(null); - } else if (Type.STATE.equals(msgType)) { - partial.withSerialized(messageString); - } else { - throw new RuntimeException(String.format("Unsupported message type: %s", msgType)); - } - - return partial; - } - @Override public void close() throws Exception { Preconditions.checkState(hasStarted, "Cannot close; has not started."); diff --git a/airbyte-cdk/java/airbyte-cdk/core/src/main/java/io/airbyte/cdk/integrations/destination_async/deser/DeserializationUtil.kt b/airbyte-cdk/java/airbyte-cdk/core/src/main/java/io/airbyte/cdk/integrations/destination_async/deser/DeserializationUtil.kt new file mode 100644 index 000000000000..083769241bda --- /dev/null +++ b/airbyte-cdk/java/airbyte-cdk/core/src/main/java/io/airbyte/cdk/integrations/destination_async/deser/DeserializationUtil.kt @@ -0,0 +1,63 @@ +/* + * Copyright (c) 2024 Airbyte, Inc., all rights reserved. + */ +package io.airbyte.cdk.integrations.destination_async.deser + +import com.google.common.annotations.VisibleForTesting +import io.airbyte.cdk.integrations.destination_async.partial_messages.PartialAirbyteMessage +import io.airbyte.commons.json.Jsons +import io.airbyte.protocol.models.v0.AirbyteMessage + +object DeserializationUtil { + /** + * Deserializes to a [PartialAirbyteMessage] which can represent both a Record or a State + * Message + * + * PartialAirbyteMessage holds either: + * * entire serialized message string when message is a valid State Message + * * serialized AirbyteRecordMessage when message is a valid Record Message + * + * @param messageString the string to deserialize + * @return PartialAirbyteMessage if the message is valid, empty otherwise + */ + @JvmStatic + @VisibleForTesting + fun deserializeAirbyteMessage( + messageString: String?, + dataTransformer: StreamAwareDataTransformer + ): PartialAirbyteMessage { + // TODO: This is doing some sketchy assumptions by deserializing either the whole or the + // partial based on type. + // Use JsonSubTypes and extend StdDeserializer to properly handle this. + // Make immutability a first class citizen in the PartialAirbyteMessage class. + val partial = + Jsons.tryDeserializeExact(messageString, PartialAirbyteMessage::class.java) + .orElseThrow { RuntimeException("Unable to deserialize PartialAirbyteMessage.") } + + val msgType = partial.type + if (AirbyteMessage.Type.RECORD == msgType && partial.record.data != null) { + // Transform data provided by destination. + val transformedData = + dataTransformer.transform( + partial.record.streamDescriptor, + partial.record.data, + partial.record.meta + ) + // store serialized json & meta + partial.withSerialized(Jsons.serialize(transformedData.getLeft())) + partial.record.meta = transformedData.getRight() + // The connector doesn't need to be able to access to the record value. We can serialize + // it here and + // drop the json + // object. Having this data stored as a string is slightly more optimal for the memory + // usage. + partial.record.data = null + } else if (AirbyteMessage.Type.STATE == msgType) { + partial.withSerialized(messageString) + } else { + throw RuntimeException(String.format("Unsupported message type: %s", msgType)) + } + + return partial + } +} diff --git a/airbyte-cdk/java/airbyte-cdk/core/src/main/java/io/airbyte/cdk/integrations/destination_async/deser/IdentityDataTransformer.java b/airbyte-cdk/java/airbyte-cdk/core/src/main/java/io/airbyte/cdk/integrations/destination_async/deser/IdentityDataTransformer.java new file mode 100644 index 000000000000..d822689bbc93 --- /dev/null +++ b/airbyte-cdk/java/airbyte-cdk/core/src/main/java/io/airbyte/cdk/integrations/destination_async/deser/IdentityDataTransformer.java @@ -0,0 +1,24 @@ +/* + * Copyright (c) 2024 Airbyte, Inc., all rights reserved. + */ + +package io.airbyte.cdk.integrations.destination_async.deser; + +import com.fasterxml.jackson.databind.JsonNode; +import io.airbyte.protocol.models.v0.AirbyteRecordMessageMeta; +import io.airbyte.protocol.models.v0.StreamDescriptor; +import org.apache.commons.lang3.tuple.ImmutablePair; + +/** + * Identity transformer which echoes back the original data and meta. + */ +public class IdentityDataTransformer implements StreamAwareDataTransformer { + + @Override + public ImmutablePair transform(StreamDescriptor streamDescriptor, + JsonNode data, + AirbyteRecordMessageMeta meta) { + return ImmutablePair.of(data, meta); + } + +} diff --git a/airbyte-cdk/java/airbyte-cdk/core/src/main/java/io/airbyte/cdk/integrations/destination_async/deser/StreamAwareDataTransformer.java b/airbyte-cdk/java/airbyte-cdk/core/src/main/java/io/airbyte/cdk/integrations/destination_async/deser/StreamAwareDataTransformer.java new file mode 100644 index 000000000000..c8ed5a8c5929 --- /dev/null +++ b/airbyte-cdk/java/airbyte-cdk/core/src/main/java/io/airbyte/cdk/integrations/destination_async/deser/StreamAwareDataTransformer.java @@ -0,0 +1,26 @@ +/* + * Copyright (c) 2024 Airbyte, Inc., all rights reserved. + */ + +package io.airbyte.cdk.integrations.destination_async.deser; + +import com.fasterxml.jackson.databind.JsonNode; +import io.airbyte.protocol.models.v0.AirbyteRecordMessageMeta; +import io.airbyte.protocol.models.v0.StreamDescriptor; +import org.apache.commons.lang3.tuple.ImmutablePair; + +public interface StreamAwareDataTransformer { + + /** + * Transforms the input data by applying destination limitations and populating + * {@link AirbyteRecordMessageMeta}. The returned pair contains the transformed data and the merged + * meta information from upstream. + * + * @param streamDescriptor + * @param data + * @param meta + * @return + */ + ImmutablePair transform(StreamDescriptor streamDescriptor, JsonNode data, AirbyteRecordMessageMeta meta); + +} diff --git a/airbyte-cdk/java/airbyte-cdk/core/src/main/java/io/airbyte/cdk/integrations/destination_async/partial_messages/PartialAirbyteRecordMessage.java b/airbyte-cdk/java/airbyte-cdk/core/src/main/java/io/airbyte/cdk/integrations/destination_async/partial_messages/PartialAirbyteRecordMessage.java index ebd903fcfc87..b7c598276e2a 100644 --- a/airbyte-cdk/java/airbyte-cdk/core/src/main/java/io/airbyte/cdk/integrations/destination_async/partial_messages/PartialAirbyteRecordMessage.java +++ b/airbyte-cdk/java/airbyte-cdk/core/src/main/java/io/airbyte/cdk/integrations/destination_async/partial_messages/PartialAirbyteRecordMessage.java @@ -7,6 +7,7 @@ import com.fasterxml.jackson.annotation.JsonProperty; import com.fasterxml.jackson.annotation.JsonPropertyDescription; import com.fasterxml.jackson.databind.JsonNode; +import io.airbyte.protocol.models.v0.AirbyteRecordMessageMeta; import io.airbyte.protocol.models.v0.StreamDescriptor; import java.util.Objects; @@ -27,6 +28,9 @@ public class PartialAirbyteRecordMessage { @JsonPropertyDescription("when the data was emitted from the source. epoch in millisecond.") private long emittedAt; + @JsonProperty("meta") + private AirbyteRecordMessageMeta meta; + public PartialAirbyteRecordMessage() {} @JsonProperty("namespace") @@ -89,6 +93,14 @@ public PartialAirbyteRecordMessage withEmittedAt(final Long emittedAt) { return this; } + public AirbyteRecordMessageMeta getMeta() { + return meta; + } + + public void setMeta(AirbyteRecordMessageMeta meta) { + this.meta = meta; + } + @Override public boolean equals(final Object o) { if (this == o) { diff --git a/airbyte-cdk/java/airbyte-cdk/core/src/test/java/io/airbyte/cdk/integrations/destination_async/AsyncStreamConsumerTest.java b/airbyte-cdk/java/airbyte-cdk/core/src/test/java/io/airbyte/cdk/integrations/destination_async/AsyncStreamConsumerTest.java index 67bc7c7dc427..aa0192140042 100644 --- a/airbyte-cdk/java/airbyte-cdk/core/src/test/java/io/airbyte/cdk/integrations/destination_async/AsyncStreamConsumerTest.java +++ b/airbyte-cdk/java/airbyte-cdk/core/src/test/java/io/airbyte/cdk/integrations/destination_async/AsyncStreamConsumerTest.java @@ -19,6 +19,8 @@ import io.airbyte.cdk.integrations.destination.buffered_stream_consumer.OnStartFunction; import io.airbyte.cdk.integrations.destination.buffered_stream_consumer.RecordSizeEstimator; import io.airbyte.cdk.integrations.destination_async.buffers.BufferManager; +import io.airbyte.cdk.integrations.destination_async.deser.DeserializationUtil; +import io.airbyte.cdk.integrations.destination_async.deser.IdentityDataTransformer; import io.airbyte.cdk.integrations.destination_async.partial_messages.PartialAirbyteMessage; import io.airbyte.cdk.integrations.destination_async.partial_messages.PartialAirbyteRecordMessage; import io.airbyte.cdk.integrations.destination_async.state.FlushFailure; @@ -252,7 +254,7 @@ void deserializeAirbyteMessageWithAirbyteRecord() { .withData(PAYLOAD)); final String serializedAirbyteMessage = Jsons.serialize(airbyteMessage); final String airbyteRecordString = Jsons.serialize(PAYLOAD); - final PartialAirbyteMessage partial = AsyncStreamConsumer.deserializeAirbyteMessage(serializedAirbyteMessage); + final PartialAirbyteMessage partial = DeserializationUtil.deserializeAirbyteMessage(serializedAirbyteMessage, new IdentityDataTransformer()); assertEquals(airbyteRecordString, partial.getSerialized()); } @@ -268,7 +270,7 @@ void deserializeAirbyteMessageWithBigDecimalAirbyteRecord() { .withData(payload)); final String serializedAirbyteMessage = Jsons.serialize(airbyteMessage); final String airbyteRecordString = Jsons.serialize(payload); - final PartialAirbyteMessage partial = AsyncStreamConsumer.deserializeAirbyteMessage(serializedAirbyteMessage); + final PartialAirbyteMessage partial = DeserializationUtil.deserializeAirbyteMessage(serializedAirbyteMessage, new IdentityDataTransformer()); assertEquals(airbyteRecordString, partial.getSerialized()); } @@ -282,7 +284,7 @@ void deserializeAirbyteMessageWithEmptyAirbyteRecord() { .withNamespace(SCHEMA_NAME) .withData(Jsons.jsonNode(emptyMap))); final String serializedAirbyteMessage = Jsons.serialize(airbyteMessage); - final PartialAirbyteMessage partial = AsyncStreamConsumer.deserializeAirbyteMessage(serializedAirbyteMessage); + final PartialAirbyteMessage partial = DeserializationUtil.deserializeAirbyteMessage(serializedAirbyteMessage, new IdentityDataTransformer()); assertEquals(emptyMap.toString(), partial.getSerialized()); } @@ -292,13 +294,14 @@ void deserializeAirbyteMessageWithNoStateOrRecord() { .withType(Type.LOG) .withLog(new AirbyteLogMessage()); final String serializedAirbyteMessage = Jsons.serialize(airbyteMessage); - assertThrows(RuntimeException.class, () -> AsyncStreamConsumer.deserializeAirbyteMessage(serializedAirbyteMessage)); + assertThrows(RuntimeException.class, + () -> DeserializationUtil.deserializeAirbyteMessage(serializedAirbyteMessage, new IdentityDataTransformer())); } @Test void deserializeAirbyteMessageWithAirbyteState() { final String serializedAirbyteMessage = Jsons.serialize(STATE_MESSAGE1); - final PartialAirbyteMessage partial = AsyncStreamConsumer.deserializeAirbyteMessage(serializedAirbyteMessage); + final PartialAirbyteMessage partial = DeserializationUtil.deserializeAirbyteMessage(serializedAirbyteMessage, new IdentityDataTransformer()); assertEquals(serializedAirbyteMessage, partial.getSerialized()); } @@ -309,7 +312,8 @@ void deserializeAirbyteMessageWithBadAirbyteState() { .withType(AirbyteStateType.STREAM) .withStream(new AirbyteStreamState().withStreamDescriptor(STREAM1_DESC).withStreamState(Jsons.jsonNode(1)))); final String serializedAirbyteMessage = Jsons.serialize(badState); - assertThrows(RuntimeException.class, () -> AsyncStreamConsumer.deserializeAirbyteMessage(serializedAirbyteMessage)); + assertThrows(RuntimeException.class, + () -> DeserializationUtil.deserializeAirbyteMessage(serializedAirbyteMessage, new IdentityDataTransformer())); } @Nested diff --git a/airbyte-cdk/java/airbyte-cdk/db-destinations/src/main/java/io/airbyte/cdk/integrations/destination/jdbc/AbstractJdbcDestination.java b/airbyte-cdk/java/airbyte-cdk/db-destinations/src/main/java/io/airbyte/cdk/integrations/destination/jdbc/AbstractJdbcDestination.java index 86935bbe2937..03eb3203794f 100644 --- a/airbyte-cdk/java/airbyte-cdk/db-destinations/src/main/java/io/airbyte/cdk/integrations/destination/jdbc/AbstractJdbcDestination.java +++ b/airbyte-cdk/java/airbyte-cdk/db-destinations/src/main/java/io/airbyte/cdk/integrations/destination/jdbc/AbstractJdbcDestination.java @@ -36,7 +36,9 @@ import io.airbyte.integrations.base.destination.typing_deduping.NoopTyperDeduper; import io.airbyte.integrations.base.destination.typing_deduping.NoopV2TableMigrator; import io.airbyte.integrations.base.destination.typing_deduping.ParsedCatalog; +import io.airbyte.integrations.base.destination.typing_deduping.SqlGenerator; import io.airbyte.integrations.base.destination.typing_deduping.TyperDeduper; +import io.airbyte.integrations.base.destination.typing_deduping.migrators.Migration; import io.airbyte.integrations.base.destination.typing_deduping.migrators.MinimumDestinationState; import io.airbyte.protocol.models.v0.AirbyteConnectionStatus; import io.airbyte.protocol.models.v0.AirbyteConnectionStatus.Status; @@ -54,7 +56,8 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; -public abstract class AbstractJdbcDestination extends JdbcConnector implements Destination { +public abstract class AbstractJdbcDestination + extends JdbcConnector implements Destination { private static final Logger LOGGER = LoggerFactory.getLogger(AbstractJdbcDestination.class); @@ -254,9 +257,20 @@ private void assertCustomParametersDontOverwriteDefaultParameters(final Map getDestinationHandler(final String databaseName, - final JdbcDatabase database, - final String rawTableSchema); + protected abstract JdbcDestinationHandler getDestinationHandler(final String databaseName, + final JdbcDatabase database, + final String rawTableSchema); + + /** + * Provide any migrations that the destination needs to run. Most destinations will need to provide + * an instande of + * {@link io.airbyte.cdk.integrations.destination.jdbc.typing_deduping.JdbcV1V2Migrator} at minimum. + */ + protected abstract List> getMigrations( + final JdbcDatabase database, + final String databaseName, + final SqlGenerator sqlGenerator, + final DestinationHandler destinationHandler); /** * "database" key at root of the config json, for any other variants in config, override this @@ -321,15 +335,16 @@ private TyperDeduper getV2TyperDeduper(final JsonNode config, final ConfiguredAi final String databaseName = getDatabaseName(config); final var migrator = new JdbcV1V2Migrator(namingResolver, database, databaseName); final NoopV2TableMigrator v2TableMigrator = new NoopV2TableMigrator(); - final DestinationHandler destinationHandler = + final DestinationHandler destinationHandler = getDestinationHandler(databaseName, database, rawNamespaceOverride.orElse(DEFAULT_AIRBYTE_INTERNAL_NAMESPACE)); final boolean disableTypeDedupe = config.has(DISABLE_TYPE_DEDUPE) && config.get(DISABLE_TYPE_DEDUPE).asBoolean(false); final TyperDeduper typerDeduper; + List> migrations = getMigrations(database, databaseName, sqlGenerator, destinationHandler); if (disableTypeDedupe) { - typerDeduper = new NoOpTyperDeduperWithV1V2Migrations<>(sqlGenerator, destinationHandler, parsedCatalog, migrator, v2TableMigrator, List.of()); + typerDeduper = new NoOpTyperDeduperWithV1V2Migrations<>(sqlGenerator, destinationHandler, parsedCatalog, migrator, v2TableMigrator, migrations); } else { typerDeduper = - new DefaultTyperDeduper<>(sqlGenerator, destinationHandler, parsedCatalog, migrator, v2TableMigrator, List.of()); + new DefaultTyperDeduper<>(sqlGenerator, destinationHandler, parsedCatalog, migrator, v2TableMigrator, migrations); } return typerDeduper; } diff --git a/airbyte-cdk/java/airbyte-cdk/db-destinations/src/main/java/io/airbyte/cdk/integrations/destination/jdbc/JdbcSqlOperations.java b/airbyte-cdk/java/airbyte-cdk/db-destinations/src/main/java/io/airbyte/cdk/integrations/destination/jdbc/JdbcSqlOperations.java index 1ffd5f0c93ae..296d8a25ff05 100644 --- a/airbyte-cdk/java/airbyte-cdk/db-destinations/src/main/java/io/airbyte/cdk/integrations/destination/jdbc/JdbcSqlOperations.java +++ b/airbyte-cdk/java/airbyte-cdk/db-destinations/src/main/java/io/airbyte/cdk/integrations/destination/jdbc/JdbcSqlOperations.java @@ -110,17 +110,25 @@ protected String createTableQueryV1(final String schemaName, final String tableN } protected String createTableQueryV2(final String schemaName, final String tableName) { + // Note that Meta is the last column in order, there was a time when tables didn't have meta, + // we issued Alter to add that column so it should be the last column. return String.format( """ CREATE TABLE IF NOT EXISTS %s.%s ( %s VARCHAR PRIMARY KEY, %s JSONB, %s TIMESTAMP WITH TIME ZONE DEFAULT CURRENT_TIMESTAMP, - %s TIMESTAMP WITH TIME ZONE DEFAULT NULL + %s TIMESTAMP WITH TIME ZONE DEFAULT NULL, + %s JSONB ); """, - schemaName, tableName, JavaBaseConstants.COLUMN_NAME_AB_RAW_ID, JavaBaseConstants.COLUMN_NAME_DATA, - JavaBaseConstants.COLUMN_NAME_AB_EXTRACTED_AT, JavaBaseConstants.COLUMN_NAME_AB_LOADED_AT); + schemaName, + tableName, + JavaBaseConstants.COLUMN_NAME_AB_RAW_ID, + JavaBaseConstants.COLUMN_NAME_DATA, + JavaBaseConstants.COLUMN_NAME_AB_EXTRACTED_AT, + JavaBaseConstants.COLUMN_NAME_AB_LOADED_AT, + JavaBaseConstants.COLUMN_NAME_AB_META); } // TODO: This method seems to be used by Postgres and others while staging to local temp files. @@ -133,9 +141,10 @@ protected void writeBatchToFile(final File tmpFile, final List new TestJdbcDestination().getConnectionProperties(buildConfigWithExtraJdbcParameters(extraParam))); } - static class TestJdbcDestination extends AbstractJdbcDestination { + static class TestJdbcDestination extends AbstractJdbcDestination { private final Map defaultProperties; @@ -142,10 +148,18 @@ protected JdbcSqlGenerator getSqlGenerator() { } @Override - protected JdbcDestinationHandler getDestinationHandler(String databaseName, JdbcDatabase database, String rawTableSchema) { + protected JdbcDestinationHandler getDestinationHandler(String databaseName, JdbcDatabase database, String rawTableSchema) { return null; } + @Override + protected List> getMigrations(JdbcDatabase database, + String databaseName, + SqlGenerator sqlGenerator, + DestinationHandler destinationHandler) { + return Collections.emptyList(); + } + } } diff --git a/airbyte-cdk/java/airbyte-cdk/db-destinations/src/testFixtures/java/io/airbyte/cdk/integrations/standardtest/destination/typing_deduping/JdbcSqlGeneratorIntegrationTest.java b/airbyte-cdk/java/airbyte-cdk/db-destinations/src/testFixtures/java/io/airbyte/cdk/integrations/standardtest/destination/typing_deduping/JdbcSqlGeneratorIntegrationTest.java index 2688b6d6e42b..565eb11f55c8 100644 --- a/airbyte-cdk/java/airbyte-cdk/db-destinations/src/testFixtures/java/io/airbyte/cdk/integrations/standardtest/destination/typing_deduping/JdbcSqlGeneratorIntegrationTest.java +++ b/airbyte-cdk/java/airbyte-cdk/db-destinations/src/testFixtures/java/io/airbyte/cdk/integrations/standardtest/destination/typing_deduping/JdbcSqlGeneratorIntegrationTest.java @@ -110,6 +110,7 @@ protected void createRawTable(final StreamId streamId) throws Exception { .column(COLUMN_NAME_AB_EXTRACTED_AT, getTimestampWithTimeZoneType().nullable(false)) .column(COLUMN_NAME_AB_LOADED_AT, getTimestampWithTimeZoneType()) .column(COLUMN_NAME_DATA, getStructType().nullable(false)) + .column(COLUMN_NAME_AB_META, getStructType().nullable(true)) .getSQL(ParamType.INLINED)); } @@ -128,7 +129,8 @@ protected void insertRawTableRecords(final StreamId streamId, final List getDataRow(final JsonNode formattedData) { return new LinkedList<>(getRecordColumns(formattedData)); } - public List getDataRow(final UUID id, final String formattedString, final long emittedAt) { + @Override + public List getDataRow(final UUID id, final String formattedString, final long emittedAt, final String airbyteMetaString) { + // TODO: Make this abstract or default if No-op is intended in NoFlatteningSheetGenerator or + // RootLevelFlatteningSheetGenerator throw new UnsupportedOperationException("Not implemented in BaseSheetGenerator"); } diff --git a/airbyte-cdk/java/airbyte-cdk/s3-destinations/src/main/java/io/airbyte/cdk/integrations/destination/s3/csv/CsvSerializedBuffer.java b/airbyte-cdk/java/airbyte-cdk/s3-destinations/src/main/java/io/airbyte/cdk/integrations/destination/s3/csv/CsvSerializedBuffer.java index 8555dc0d58e4..8941530af736 100644 --- a/airbyte-cdk/java/airbyte-cdk/s3-destinations/src/main/java/io/airbyte/cdk/integrations/destination/s3/csv/CsvSerializedBuffer.java +++ b/airbyte-cdk/java/airbyte-cdk/s3-destinations/src/main/java/io/airbyte/cdk/integrations/destination/s3/csv/CsvSerializedBuffer.java @@ -72,8 +72,8 @@ protected void writeRecord(final AirbyteRecordMessage record) throws IOException } @Override - protected void writeRecord(final String recordString, final long emittedAt) throws IOException { - csvPrinter.printRecord(csvSheetGenerator.getDataRow(UUID.randomUUID(), recordString, emittedAt)); + protected void writeRecord(final String recordString, String airbyteMetaString, final long emittedAt) throws IOException { + csvPrinter.printRecord(csvSheetGenerator.getDataRow(UUID.randomUUID(), recordString, emittedAt, airbyteMetaString)); } @Override diff --git a/airbyte-cdk/java/airbyte-cdk/s3-destinations/src/main/java/io/airbyte/cdk/integrations/destination/s3/csv/CsvSheetGenerator.java b/airbyte-cdk/java/airbyte-cdk/s3-destinations/src/main/java/io/airbyte/cdk/integrations/destination/s3/csv/CsvSheetGenerator.java index 3da15a08780c..04730e2f4cfe 100644 --- a/airbyte-cdk/java/airbyte-cdk/s3-destinations/src/main/java/io/airbyte/cdk/integrations/destination/s3/csv/CsvSheetGenerator.java +++ b/airbyte-cdk/java/airbyte-cdk/s3-destinations/src/main/java/io/airbyte/cdk/integrations/destination/s3/csv/CsvSheetGenerator.java @@ -24,7 +24,7 @@ public interface CsvSheetGenerator { List getDataRow(JsonNode formattedData); - List getDataRow(UUID id, String formattedString, long emittedAt); + List getDataRow(UUID id, String formattedString, long emittedAt, String formattedAirbyteMetaString); final class Factory { diff --git a/airbyte-cdk/java/airbyte-cdk/s3-destinations/src/main/java/io/airbyte/cdk/integrations/destination/s3/csv/StagingDatabaseCsvSheetGenerator.java b/airbyte-cdk/java/airbyte-cdk/s3-destinations/src/main/java/io/airbyte/cdk/integrations/destination/s3/csv/StagingDatabaseCsvSheetGenerator.java index 9ff3ecb9dab6..546862719c99 100644 --- a/airbyte-cdk/java/airbyte-cdk/s3-destinations/src/main/java/io/airbyte/cdk/integrations/destination/s3/csv/StagingDatabaseCsvSheetGenerator.java +++ b/airbyte-cdk/java/airbyte-cdk/s3-destinations/src/main/java/io/airbyte/cdk/integrations/destination/s3/csv/StagingDatabaseCsvSheetGenerator.java @@ -48,7 +48,7 @@ public List getHeaderRow() { @Override public List getDataRow(final UUID id, final AirbyteRecordMessage recordMessage) { - return getDataRow(id, Jsons.serialize(recordMessage.getData()), recordMessage.getEmittedAt()); + return getDataRow(id, Jsons.serialize(recordMessage.getData()), recordMessage.getEmittedAt(), Jsons.serialize(recordMessage.getMeta())); } @Override @@ -57,13 +57,14 @@ public List getDataRow(final JsonNode formattedData) { } @Override - public List getDataRow(final UUID id, final String formattedString, final long emittedAt) { + public List getDataRow(final UUID id, final String formattedString, final long emittedAt, String formattedAirbyteMetaString) { if (useDestinationsV2Columns) { return List.of( id, Instant.ofEpochMilli(emittedAt), "", - formattedString); + formattedString, + formattedAirbyteMetaString); } else { return List.of( id, diff --git a/airbyte-cdk/java/airbyte-cdk/s3-destinations/src/main/java/io/airbyte/cdk/integrations/destination/s3/jsonl/JsonLSerializedBuffer.java b/airbyte-cdk/java/airbyte-cdk/s3-destinations/src/main/java/io/airbyte/cdk/integrations/destination/s3/jsonl/JsonLSerializedBuffer.java index 6e901ce7a19f..3a85a8c0c136 100644 --- a/airbyte-cdk/java/airbyte-cdk/s3-destinations/src/main/java/io/airbyte/cdk/integrations/destination/s3/jsonl/JsonLSerializedBuffer.java +++ b/airbyte-cdk/java/airbyte-cdk/s3-destinations/src/main/java/io/airbyte/cdk/integrations/destination/s3/jsonl/JsonLSerializedBuffer.java @@ -61,6 +61,12 @@ protected void writeRecord(final AirbyteRecordMessage record) { printWriter.println(Jsons.serialize(json)); } + @Override + protected void writeRecord(String recordString, String airbyteMetaString, long emittedAt) { + // TODO Remove this double deserialization when S3 Destinations moves to Async. + writeRecord(Jsons.deserialize(recordString, AirbyteRecordMessage.class).withEmittedAt(emittedAt)); + } + @Override protected void flushWriter() { printWriter.flush(); diff --git a/airbyte-cdk/java/airbyte-cdk/s3-destinations/src/main/java/io/airbyte/cdk/integrations/destination/s3/parquet/ParquetSerializedBuffer.java b/airbyte-cdk/java/airbyte-cdk/s3-destinations/src/main/java/io/airbyte/cdk/integrations/destination/s3/parquet/ParquetSerializedBuffer.java index f33778d751b7..d4f9324260c5 100644 --- a/airbyte-cdk/java/airbyte-cdk/s3-destinations/src/main/java/io/airbyte/cdk/integrations/destination/s3/parquet/ParquetSerializedBuffer.java +++ b/airbyte-cdk/java/airbyte-cdk/s3-destinations/src/main/java/io/airbyte/cdk/integrations/destination/s3/parquet/ParquetSerializedBuffer.java @@ -102,7 +102,7 @@ public long accept(final AirbyteRecordMessage record) throws Exception { } @Override - public long accept(final String recordString, final long emittedAt) throws Exception { + public long accept(final String recordString, final String airbyteMetaString, final long emittedAt) throws Exception { throw new UnsupportedOperationException("This method is not supported for ParquetSerializedBuffer"); } diff --git a/airbyte-cdk/java/airbyte-cdk/s3-destinations/src/main/java/io/airbyte/cdk/integrations/destination/staging/AsyncFlush.java b/airbyte-cdk/java/airbyte-cdk/s3-destinations/src/main/java/io/airbyte/cdk/integrations/destination/staging/AsyncFlush.java index 0ad036367837..aa4008bd850d 100644 --- a/airbyte-cdk/java/airbyte-cdk/s3-destinations/src/main/java/io/airbyte/cdk/integrations/destination/staging/AsyncFlush.java +++ b/airbyte-cdk/java/airbyte-cdk/s3-destinations/src/main/java/io/airbyte/cdk/integrations/destination/staging/AsyncFlush.java @@ -75,7 +75,7 @@ public void flush(final StreamDescriptor decs, final Stream?"}}} diff --git a/airbyte-cdk/java/airbyte-cdk/typing-deduping/src/testFixtures/resources/sqlgenerator/alltypes_inputrecords.jsonl b/airbyte-cdk/java/airbyte-cdk/typing-deduping/src/testFixtures/resources/sqlgenerator/alltypes_inputrecords.jsonl index c21fc0bbb6ab..8f8ced8a26a1 100644 --- a/airbyte-cdk/java/airbyte-cdk/typing-deduping/src/testFixtures/resources/sqlgenerator/alltypes_inputrecords.jsonl +++ b/airbyte-cdk/java/airbyte-cdk/typing-deduping/src/testFixtures/resources/sqlgenerator/alltypes_inputrecords.jsonl @@ -2,6 +2,6 @@ {"_airbyte_raw_id": "53ce75a5-5bcc-47a3-b45c-96c2015cfe35", "_airbyte_extracted_at": "2023-01-01T00:00:00Z", "_airbyte_data": {"id1": 2, "id2": 100, "updated_at": "2023-01-01T01:00:00Z", "array": null, "struct": null, "string": null, "number": null, "integer": null, "boolean": null, "timestamp_with_timezone": null, "timestamp_without_timezone": null, "time_with_timezone": null, "time_without_timezone": null, "date": null, "unknown": null}} {"_airbyte_raw_id": "7e1fac0c-017e-4ad6-bc78-334a34d64fbe", "_airbyte_extracted_at": "2023-01-01T00:00:00Z", "_airbyte_data": {"id1": 3, "id2": 100, "updated_at": "2023-01-01T01:00:00Z"}} // Note that array and struct have invalid values ({} and [] respectively). -{"_airbyte_raw_id": "84242b60-3a34-4531-ad75-a26702960a9a", "_airbyte_extracted_at": "2023-01-01T00:00:00Z", "_airbyte_data": {"id1": 4, "id2": 100, "updated_at": "2023-01-01T01:00:00Z", "array": {}, "struct": [], "string": null, "number": "foo", "integer": "bar", "boolean": "fizz", "timestamp_with_timezone": {}, "timestamp_without_timezone": {}, "time_with_timezone": {}, "time_without_timezone": {}, "date": "airbyte", "unknown": null}} +{"_airbyte_raw_id": "84242b60-3a34-4531-ad75-a26702960a9a", "_airbyte_extracted_at": "2023-01-01T00:00:00Z", "_airbyte_data": {"id1": 4, "id2": 100, "updated_at": "2023-01-01T01:00:00Z", "array": {}, "struct": [], "string": null, "number": "foo", "integer": "bar", "boolean": "fizz", "timestamp_with_timezone": {}, "timestamp_without_timezone": {}, "time_with_timezone": {}, "time_without_timezone": {}, "date": "airbyte", "unknown": null}, "_airbyte_meta": {"changes": [{"field": "string", "change": "NULLED", "reason": "SOURCE_SERIALIZATION_ERROR"}]}} {"_airbyte_raw_id": "a4a783b5-7729-4d0b-b659-48ceb08713f1", "_airbyte_extracted_at": "2023-01-01T00:00:00Z", "_airbyte_data": {"id1": 5, "id2": 100, "updated_at": "2023-01-01T01:00:00Z", "number": 67.174118, "struct": {"nested_number": 67.174118}, "array": [67.174118], "unknown": 67.174118}} {"_airbyte_raw_id": "7e1fac0c-017e-4ad6-bc78-334a34d64fce", "_airbyte_extracted_at": "2023-01-01T00:00:00Z", "_airbyte_data": {"id1": 6, "id2": 100, "updated_at": "2023-01-01T01:00:00Z", "IamACaseSensitiveColumnName": "Case senstive value"}} \ No newline at end of file From f31f42c045765b167db9229cd803916ff3ad5ebf Mon Sep 17 00:00:00 2001 From: Gireesh Sreepathi Date: Tue, 12 Mar 2024 19:55:00 -0700 Subject: [PATCH 2/4] fix unit test mock --- .../typing_deduping/DestinationV1V2MigratorTest.java | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/airbyte-cdk/java/airbyte-cdk/typing-deduping/src/test/java/io/airbyte/integrations/base/destination/typing_deduping/DestinationV1V2MigratorTest.java b/airbyte-cdk/java/airbyte-cdk/typing-deduping/src/test/java/io/airbyte/integrations/base/destination/typing_deduping/DestinationV1V2MigratorTest.java index 2f582274438b..03ef289f07ef 100644 --- a/airbyte-cdk/java/airbyte-cdk/typing-deduping/src/test/java/io/airbyte/integrations/base/destination/typing_deduping/DestinationV1V2MigratorTest.java +++ b/airbyte-cdk/java/airbyte-cdk/typing-deduping/src/test/java/io/airbyte/integrations/base/destination/typing_deduping/DestinationV1V2MigratorTest.java @@ -5,7 +5,7 @@ package io.airbyte.integrations.base.destination.typing_deduping; import static io.airbyte.cdk.integrations.base.JavaBaseConstants.LEGACY_RAW_TABLE_COLUMNS; -import static io.airbyte.cdk.integrations.base.JavaBaseConstants.V2_RAW_TABLE_COLUMN_NAMES; +import static io.airbyte.cdk.integrations.base.JavaBaseConstants.V2_RAW_TABLE_COLUMN_NAMES_WITHOUT_META; import static org.mockito.ArgumentMatchers.any; import io.airbyte.protocol.models.v0.DestinationSyncMode; @@ -97,7 +97,7 @@ public static BaseDestinationV1V2Migrator makeMockMigrator(final boolean v2Names Mockito.when(migrator.doesAirbyteInternalNamespaceExist(any())).thenReturn(v2NamespaceExists); final var existingTable = v2TableExists ? Optional.of("v2_raw") : Optional.empty(); Mockito.when(migrator.getTableIfExists("raw", "raw_table")).thenReturn(existingTable); - Mockito.when(migrator.schemaMatchesExpectation("v2_raw", V2_RAW_TABLE_COLUMN_NAMES)).thenReturn(v2RawSchemaMatches); + Mockito.when(migrator.schemaMatchesExpectation("v2_raw", V2_RAW_TABLE_COLUMN_NAMES_WITHOUT_META)).thenReturn(v2RawSchemaMatches); Mockito.when(migrator.convertToV1RawName(any())).thenReturn(new NamespacedTableName("v1_raw_namespace", "v1_raw_table")); final var existingV1RawTable = v1RawTableExists ? Optional.of("v1_raw") : Optional.empty(); From 0154bbcbbcb3012ad85e26aca1e0dc13a707e91b Mon Sep 17 00:00:00 2001 From: Gireesh Sreepathi Date: Wed, 13 Mar 2024 13:01:36 -0700 Subject: [PATCH 3/4] Add additional check to skip v1v2 migration --- .../io/airbyte/cdk/integrations/base/JavaBaseConstants.java | 3 ++- .../typing_deduping/BaseDestinationV1V2Migrator.java | 6 +++++- .../typing_deduping/DestinationV1V2MigratorTest.java | 2 ++ 3 files changed, 9 insertions(+), 2 deletions(-) diff --git a/airbyte-cdk/java/airbyte-cdk/core/src/main/java/io/airbyte/cdk/integrations/base/JavaBaseConstants.java b/airbyte-cdk/java/airbyte-cdk/core/src/main/java/io/airbyte/cdk/integrations/base/JavaBaseConstants.java index cc4839816598..97b611cb7d60 100644 --- a/airbyte-cdk/java/airbyte-cdk/core/src/main/java/io/airbyte/cdk/integrations/base/JavaBaseConstants.java +++ b/airbyte-cdk/java/airbyte-cdk/core/src/main/java/io/airbyte/cdk/integrations/base/JavaBaseConstants.java @@ -5,6 +5,7 @@ package io.airbyte.cdk.integrations.base; import java.util.List; +import java.util.Set; public final class JavaBaseConstants { @@ -34,7 +35,7 @@ private JavaBaseConstants() {} // Meta was introduced later, so to avoid triggering raw table soft-reset in v1->v2 // use this column list. - public static final List V2_RAW_TABLE_COLUMN_NAMES_WITHOUT_META = List.of( + public static final Set V2_RAW_TABLE_COLUMN_NAMES_WITHOUT_META = Set.of( COLUMN_NAME_AB_RAW_ID, COLUMN_NAME_AB_EXTRACTED_AT, COLUMN_NAME_AB_LOADED_AT, diff --git a/airbyte-cdk/java/airbyte-cdk/typing-deduping/src/main/java/io/airbyte/integrations/base/destination/typing_deduping/BaseDestinationV1V2Migrator.java b/airbyte-cdk/java/airbyte-cdk/typing-deduping/src/main/java/io/airbyte/integrations/base/destination/typing_deduping/BaseDestinationV1V2Migrator.java index afc3a684a60e..e8ab5ad8950c 100644 --- a/airbyte-cdk/java/airbyte-cdk/typing-deduping/src/main/java/io/airbyte/integrations/base/destination/typing_deduping/BaseDestinationV1V2Migrator.java +++ b/airbyte-cdk/java/airbyte-cdk/typing-deduping/src/main/java/io/airbyte/integrations/base/destination/typing_deduping/BaseDestinationV1V2Migrator.java @@ -5,6 +5,7 @@ package io.airbyte.integrations.base.destination.typing_deduping; import static io.airbyte.cdk.integrations.base.JavaBaseConstants.LEGACY_RAW_TABLE_COLUMNS; +import static io.airbyte.cdk.integrations.base.JavaBaseConstants.V2_RAW_TABLE_COLUMN_NAMES; import static io.airbyte.cdk.integrations.base.JavaBaseConstants.V2_RAW_TABLE_COLUMN_NAMES_WITHOUT_META; import io.airbyte.protocol.models.v0.DestinationSyncMode; @@ -89,7 +90,10 @@ private boolean doesV1RawTableMatchExpectedSchema(final DialectTableDefinition e * @param existingV2AirbyteRawTable the v2 raw table */ private void validateAirbyteInternalNamespaceRawTableMatchExpectedV2Schema(final DialectTableDefinition existingV2AirbyteRawTable) { - if (!schemaMatchesExpectation(existingV2AirbyteRawTable, V2_RAW_TABLE_COLUMN_NAMES_WITHOUT_META)) { + // Account for the fact that the meta column was added later, so skip the rebuilding of the raw + // table. + if (!(schemaMatchesExpectation(existingV2AirbyteRawTable, V2_RAW_TABLE_COLUMN_NAMES_WITHOUT_META) || + schemaMatchesExpectation(existingV2AirbyteRawTable, V2_RAW_TABLE_COLUMN_NAMES))) { throw new UnexpectedSchemaException("Destination V2 Raw Table does not match expected Schema"); } } diff --git a/airbyte-cdk/java/airbyte-cdk/typing-deduping/src/test/java/io/airbyte/integrations/base/destination/typing_deduping/DestinationV1V2MigratorTest.java b/airbyte-cdk/java/airbyte-cdk/typing-deduping/src/test/java/io/airbyte/integrations/base/destination/typing_deduping/DestinationV1V2MigratorTest.java index 03ef289f07ef..47382e099cb9 100644 --- a/airbyte-cdk/java/airbyte-cdk/typing-deduping/src/test/java/io/airbyte/integrations/base/destination/typing_deduping/DestinationV1V2MigratorTest.java +++ b/airbyte-cdk/java/airbyte-cdk/typing-deduping/src/test/java/io/airbyte/integrations/base/destination/typing_deduping/DestinationV1V2MigratorTest.java @@ -5,6 +5,7 @@ package io.airbyte.integrations.base.destination.typing_deduping; import static io.airbyte.cdk.integrations.base.JavaBaseConstants.LEGACY_RAW_TABLE_COLUMNS; +import static io.airbyte.cdk.integrations.base.JavaBaseConstants.V2_RAW_TABLE_COLUMN_NAMES; import static io.airbyte.cdk.integrations.base.JavaBaseConstants.V2_RAW_TABLE_COLUMN_NAMES_WITHOUT_META; import static org.mockito.ArgumentMatchers.any; @@ -97,6 +98,7 @@ public static BaseDestinationV1V2Migrator makeMockMigrator(final boolean v2Names Mockito.when(migrator.doesAirbyteInternalNamespaceExist(any())).thenReturn(v2NamespaceExists); final var existingTable = v2TableExists ? Optional.of("v2_raw") : Optional.empty(); Mockito.when(migrator.getTableIfExists("raw", "raw_table")).thenReturn(existingTable); + Mockito.when(migrator.schemaMatchesExpectation("v2_raw", V2_RAW_TABLE_COLUMN_NAMES)).thenReturn(false); Mockito.when(migrator.schemaMatchesExpectation("v2_raw", V2_RAW_TABLE_COLUMN_NAMES_WITHOUT_META)).thenReturn(v2RawSchemaMatches); Mockito.when(migrator.convertToV1RawName(any())).thenReturn(new NamespacedTableName("v1_raw_namespace", "v1_raw_table")); From 1c628a39c653f6da11206f4044f5c296760dd4d5 Mon Sep 17 00:00:00 2001 From: Gireesh Sreepathi Date: Wed, 13 Mar 2024 14:37:03 -0700 Subject: [PATCH 4/4] version logistics --- airbyte-cdk/java/airbyte-cdk/README.md | 1 + .../java/airbyte-cdk/core/src/main/resources/version.properties | 2 +- 2 files changed, 2 insertions(+), 1 deletion(-) diff --git a/airbyte-cdk/java/airbyte-cdk/README.md b/airbyte-cdk/java/airbyte-cdk/README.md index 78fab79dba44..a1c091720796 100644 --- a/airbyte-cdk/java/airbyte-cdk/README.md +++ b/airbyte-cdk/java/airbyte-cdk/README.md @@ -166,6 +166,7 @@ MavenLocal debugging steps: | Version | Date | Pull Request | Subject | |:--------|:-----------|:-----------------------------------------------------------|:---------------------------------------------------------------------------------------------------------------------------------------------------------------| +| 0.24.0 | 2024-03-13 | [\#35944](https://github.com/airbytehq/airbyte/pull/35944) | Add `_airbyte_meta` in raw table and test fixture updates | | 0.23.20 | 2024-03-12 | [\#36011](https://github.com/airbytehq/airbyte/pull/36011) | Debezium configuration for conversion of null value on a column with default value. | | 0.23.19 | 2024-03-11 | [\#35904](https://github.com/airbytehq/airbyte/pull/35904) | Add retries to the debezium engine. | | 0.23.18 | 2024-03-07 | [\#35899](https://github.com/airbytehq/airbyte/pull/35899) | Null check when retrieving destination state | diff --git a/airbyte-cdk/java/airbyte-cdk/core/src/main/resources/version.properties b/airbyte-cdk/java/airbyte-cdk/core/src/main/resources/version.properties index 8eff806ec32c..31662eadd539 100644 --- a/airbyte-cdk/java/airbyte-cdk/core/src/main/resources/version.properties +++ b/airbyte-cdk/java/airbyte-cdk/core/src/main/resources/version.properties @@ -1 +1 @@ -version=0.23.20 +version=0.24.0