Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Destinations CDK: Plumbing related to airbyte_meta from protocol to raw table #35944

Merged
merged 4 commits into from
Mar 13, 2024
Merged
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
package io.airbyte.cdk.integrations.base;

import java.util.List;
import java.util.Set;

public final class JavaBaseConstants {

Expand All @@ -31,11 +32,20 @@ private JavaBaseConstants() {}
public static final String COLUMN_NAME_AB_LOADED_AT = "_airbyte_loaded_at";
public static final String COLUMN_NAME_AB_EXTRACTED_AT = "_airbyte_extracted_at";
public static final String COLUMN_NAME_AB_META = "_airbyte_meta";
public static final List<String> V2_RAW_TABLE_COLUMN_NAMES = List.of(

// Meta was introduced later, so to avoid triggering raw table soft-reset in v1->v2
// use this column list.
public static final Set<String> V2_RAW_TABLE_COLUMN_NAMES_WITHOUT_META = Set.of(
COLUMN_NAME_AB_RAW_ID,
COLUMN_NAME_AB_EXTRACTED_AT,
COLUMN_NAME_AB_LOADED_AT,
COLUMN_NAME_DATA);
public static final List<String> V2_RAW_TABLE_COLUMN_NAMES = List.of(
COLUMN_NAME_AB_RAW_ID,
COLUMN_NAME_AB_EXTRACTED_AT,
COLUMN_NAME_AB_LOADED_AT,
COLUMN_NAME_DATA,
COLUMN_NAME_AB_META);
public static final List<String> V2_FINAL_TABLE_METADATA_COLUMNS = List.of(
COLUMN_NAME_AB_RAW_ID,
COLUMN_NAME_AB_EXTRACTED_AT,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,6 @@
package io.airbyte.cdk.integrations.destination.record_buffer;

import com.google.common.io.CountingOutputStream;
import io.airbyte.commons.json.Jsons;
import io.airbyte.protocol.models.v0.AirbyteRecordMessage;
import java.io.File;
import java.io.IOException;
Expand Down Expand Up @@ -66,12 +65,11 @@ protected BaseSerializedBuffer(final BufferStorage bufferStorage) throws Excepti
* AirbyteRecord
*
* @param recordString serialized record
* @param airbyteMetaString
* @param emittedAt timestamp of the record in milliseconds
* @throws IOException
*/
protected void writeRecord(final String recordString, final long emittedAt) throws IOException {
writeRecord(Jsons.deserialize(recordString, AirbyteRecordMessage.class).withEmittedAt(emittedAt));
}
protected abstract void writeRecord(final String recordString, String airbyteMetaString, final long emittedAt) throws IOException;

/**
* Stops the writer from receiving new data and prepares it for being finalized and converted into
Expand Down Expand Up @@ -111,7 +109,7 @@ public long accept(final AirbyteRecordMessage record) throws Exception {
}

@Override
public long accept(final String recordString, final long emittedAt) throws Exception {
public long accept(final String recordString, final String airbyteMetaString, final long emittedAt) throws Exception {
if (!isStarted) {
if (useCompression) {
compressedBuffer = new GzipCompressorOutputStream(byteCounter);
Expand All @@ -123,7 +121,7 @@ public long accept(final String recordString, final long emittedAt) throws Excep
}
if (inputStream == null && !isClosed) {
final long startCount = byteCounter.getCount();
writeRecord(recordString, emittedAt);
writeRecord(recordString, airbyteMetaString, emittedAt);
return byteCounter.getCount() - startCount;
} else {
throw new IllegalCallerException("Buffer is already closed, it cannot accept more messages");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -43,11 +43,12 @@ public interface SerializableBuffer extends AutoCloseable {
* the entire AirbyteRecordMessage
*
* @param recordString serialized record
* @param airbyteMetaString The serialized airbyte_meta entry
* @param emittedAt timestamp of the record in milliseconds
* @return number of bytes written to the buffer
* @throws Exception
*/
long accept(String recordString, long emittedAt) throws Exception;
long accept(String recordString, String airbyteMetaString, long emittedAt) throws Exception;

/**
* Flush a buffer implementation.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@

package io.airbyte.cdk.integrations.destination_async;

import static io.airbyte.cdk.integrations.destination_async.deser.DeserializationUtil.*;
import static java.util.stream.Collectors.toMap;

import com.google.common.annotations.VisibleForTesting;
Expand All @@ -14,6 +15,8 @@
import io.airbyte.cdk.integrations.destination.buffered_stream_consumer.OnStartFunction;
import io.airbyte.cdk.integrations.destination_async.buffers.BufferEnqueue;
import io.airbyte.cdk.integrations.destination_async.buffers.BufferManager;
import io.airbyte.cdk.integrations.destination_async.deser.IdentityDataTransformer;
import io.airbyte.cdk.integrations.destination_async.deser.StreamAwareDataTransformer;
import io.airbyte.cdk.integrations.destination_async.partial_messages.PartialAirbyteMessage;
import io.airbyte.cdk.integrations.destination_async.state.FlushFailure;
import io.airbyte.commons.json.Jsons;
Expand Down Expand Up @@ -56,6 +59,8 @@ public class AsyncStreamConsumer implements SerializedAirbyteMessageConsumer {
private final Set<StreamDescriptor> streamNames;
private final FlushFailure flushFailure;
private final String defaultNamespace;

private final StreamAwareDataTransformer dataTransformer;
// Note that this map will only be populated for streams with nonzero records.
private final ConcurrentMap<StreamDescriptor, AtomicLong> recordCounts;

Expand All @@ -69,6 +74,9 @@ public class AsyncStreamConsumer implements SerializedAirbyteMessageConsumer {
// a 64 bit JVM.
final int PARTIAL_DESERIALIZE_REF_BYTES = 10 * 8;

// TODO: What the.. combinatorics of the constructors are getting out of hand. We should consider
// refactoring this to use a builder pattern with enforced defaults.

public AsyncStreamConsumer(final Consumer<AirbyteMessage> outputRecordCollector,
final OnStartFunction onStart,
final OnCloseFunction onClose,
Expand All @@ -79,6 +87,18 @@ public AsyncStreamConsumer(final Consumer<AirbyteMessage> outputRecordCollector,
this(outputRecordCollector, onStart, onClose, flusher, catalog, bufferManager, new FlushFailure(), defaultNamespace);
}

public AsyncStreamConsumer(final Consumer<AirbyteMessage> outputRecordCollector,
final OnStartFunction onStart,
final OnCloseFunction onClose,
final DestinationFlushFunction flusher,
final ConfiguredAirbyteCatalog catalog,
final BufferManager bufferManager,
final String defaultNamespace,
final StreamAwareDataTransformer dataTransformer) {
this(outputRecordCollector, onStart, onClose, flusher, catalog, bufferManager, new FlushFailure(), defaultNamespace,
Executors.newFixedThreadPool(5), dataTransformer);
gisripa marked this conversation as resolved.
Show resolved Hide resolved
}

public AsyncStreamConsumer(final Consumer<AirbyteMessage> outputRecordCollector,
final OnStartFunction onStart,
final OnCloseFunction onClose,
Expand All @@ -87,7 +107,21 @@ public AsyncStreamConsumer(final Consumer<AirbyteMessage> outputRecordCollector,
final BufferManager bufferManager,
final String defaultNamespace,
final ExecutorService workerPool) {
this(outputRecordCollector, onStart, onClose, flusher, catalog, bufferManager, new FlushFailure(), defaultNamespace, workerPool);
this(outputRecordCollector, onStart, onClose, flusher, catalog, bufferManager, new FlushFailure(), defaultNamespace, workerPool,
new IdentityDataTransformer());
}

@VisibleForTesting
public AsyncStreamConsumer(final Consumer<AirbyteMessage> outputRecordCollector,
final OnStartFunction onStart,
final OnCloseFunction onClose,
final DestinationFlushFunction flusher,
final ConfiguredAirbyteCatalog catalog,
final BufferManager bufferManager,
final FlushFailure flushFailure,
final String defaultNamespace) {
this(outputRecordCollector, onStart, onClose, flusher, catalog, bufferManager, flushFailure, defaultNamespace, Executors.newFixedThreadPool(5),
new IdentityDataTransformer());
}

@VisibleForTesting
Expand All @@ -99,7 +133,8 @@ public AsyncStreamConsumer(final Consumer<AirbyteMessage> outputRecordCollector,
final BufferManager bufferManager,
final FlushFailure flushFailure,
final String defaultNamespace,
final ExecutorService workerPool) {
final ExecutorService workerPool,
final StreamAwareDataTransformer dataTransformer) {
this.defaultNamespace = defaultNamespace;
hasStarted = false;
hasClosed = false;
Expand All @@ -114,18 +149,7 @@ public AsyncStreamConsumer(final Consumer<AirbyteMessage> outputRecordCollector,
new FlushWorkers(bufferManager.getBufferDequeue(), flusher, outputRecordCollector, flushFailure, bufferManager.getStateManager(), workerPool);
streamNames = StreamDescriptorUtils.fromConfiguredCatalog(catalog);
this.recordCounts = new ConcurrentHashMap<>();
}

@VisibleForTesting
public AsyncStreamConsumer(final Consumer<AirbyteMessage> outputRecordCollector,
final OnStartFunction onStart,
final OnCloseFunction onClose,
final DestinationFlushFunction flusher,
final ConfiguredAirbyteCatalog catalog,
final BufferManager bufferManager,
final FlushFailure flushFailure,
final String defaultNamespace) {
this(outputRecordCollector, onStart, onClose, flusher, catalog, bufferManager, flushFailure, defaultNamespace, Executors.newFixedThreadPool(5));
this.dataTransformer = dataTransformer;
}

@Override
Expand All @@ -148,7 +172,7 @@ public void accept(final String messageString, final Integer sizeInBytes) throws
* to try to use a thread pool to partially deserialize to get record type and stream name, we can
* do it without touching buffer manager.
*/
final var message = deserializeAirbyteMessage(messageString);
final var message = deserializeAirbyteMessage(messageString, this.dataTransformer);
if (Type.RECORD.equals(message.getType())) {
if (Strings.isNullOrEmpty(message.getRecord().getNamespace())) {
message.getRecord().setNamespace(defaultNamespace);
Expand All @@ -160,41 +184,6 @@ public void accept(final String messageString, final Integer sizeInBytes) throws
bufferEnqueue.addRecord(message, sizeInBytes + PARTIAL_DESERIALIZE_REF_BYTES, defaultNamespace);
}

/**
* Deserializes to a {@link PartialAirbyteMessage} which can represent both a Record or a State
* Message
*
* PartialAirbyteMessage holds either:
* <li>entire serialized message string when message is a valid State Message
* <li>serialized AirbyteRecordMessage when message is a valid Record Message</li>
*
* @param messageString the string to deserialize
* @return PartialAirbyteMessage if the message is valid, empty otherwise
*/
@VisibleForTesting
public static PartialAirbyteMessage deserializeAirbyteMessage(final String messageString) {
// TODO: (ryankfu) plumb in the serialized AirbyteStateMessage to match AirbyteRecordMessage code
// parity. https://github.com/airbytehq/airbyte/issues/27530 for additional context
final var partial = Jsons.tryDeserializeExact(messageString, PartialAirbyteMessage.class)
.orElseThrow(() -> new RuntimeException("Unable to deserialize PartialAirbyteMessage."));

final var msgType = partial.getType();
if (Type.RECORD.equals(msgType) && partial.getRecord().getData() != null) {
// store serialized json
partial.withSerialized(partial.getRecord().getData().toString());
// The connector doesn't need to be able to access to the record value. We can serialize it here and
// drop the json
// object. Having this data stored as a string is slightly more optimal for the memory usage.
partial.getRecord().setData(null);
} else if (Type.STATE.equals(msgType)) {
partial.withSerialized(messageString);
} else {
throw new RuntimeException(String.format("Unsupported message type: %s", msgType));
}

return partial;
}

@Override
public void close() throws Exception {
Preconditions.checkState(hasStarted, "Cannot close; has not started.");
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,63 @@
/*
* Copyright (c) 2024 Airbyte, Inc., all rights reserved.
*/
package io.airbyte.cdk.integrations.destination_async.deser

import com.google.common.annotations.VisibleForTesting
import io.airbyte.cdk.integrations.destination_async.partial_messages.PartialAirbyteMessage
import io.airbyte.commons.json.Jsons
import io.airbyte.protocol.models.v0.AirbyteMessage

object DeserializationUtil {
/**
* Deserializes to a [PartialAirbyteMessage] which can represent both a Record or a State
* Message
*
* PartialAirbyteMessage holds either:
* * entire serialized message string when message is a valid State Message
* * serialized AirbyteRecordMessage when message is a valid Record Message
*
* @param messageString the string to deserialize
* @return PartialAirbyteMessage if the message is valid, empty otherwise
*/
@JvmStatic
@VisibleForTesting
fun deserializeAirbyteMessage(
messageString: String?,
dataTransformer: StreamAwareDataTransformer
): PartialAirbyteMessage {
// TODO: This is doing some sketchy assumptions by deserializing either the whole or the
// partial based on type.
// Use JsonSubTypes and extend StdDeserializer to properly handle this.
// Make immutability a first class citizen in the PartialAirbyteMessage class.
val partial =
Jsons.tryDeserializeExact(messageString, PartialAirbyteMessage::class.java)
.orElseThrow { RuntimeException("Unable to deserialize PartialAirbyteMessage.") }

val msgType = partial.type
if (AirbyteMessage.Type.RECORD == msgType && partial.record.data != null) {
gisripa marked this conversation as resolved.
Show resolved Hide resolved
// Transform data provided by destination.
val transformedData =
dataTransformer.transform(
partial.record.streamDescriptor,
partial.record.data,
partial.record.meta
)
// store serialized json & meta
partial.withSerialized(Jsons.serialize(transformedData.getLeft()))
partial.record.meta = transformedData.getRight()
// The connector doesn't need to be able to access to the record value. We can serialize
// it here and
// drop the json
// object. Having this data stored as a string is slightly more optimal for the memory
// usage.
partial.record.data = null
} else if (AirbyteMessage.Type.STATE == msgType) {
partial.withSerialized(messageString)
} else {
throw RuntimeException(String.format("Unsupported message type: %s", msgType))
}

return partial
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
/*
* Copyright (c) 2024 Airbyte, Inc., all rights reserved.
*/

package io.airbyte.cdk.integrations.destination_async.deser;

import com.fasterxml.jackson.databind.JsonNode;
import io.airbyte.protocol.models.v0.AirbyteRecordMessageMeta;
import io.airbyte.protocol.models.v0.StreamDescriptor;
import org.apache.commons.lang3.tuple.ImmutablePair;

/**
* Identity transformer which echoes back the original data and meta.
*/
public class IdentityDataTransformer implements StreamAwareDataTransformer {

@Override
public ImmutablePair<JsonNode, AirbyteRecordMessageMeta> transform(StreamDescriptor streamDescriptor,
JsonNode data,
AirbyteRecordMessageMeta meta) {
return ImmutablePair.of(data, meta);
}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
/*
* Copyright (c) 2024 Airbyte, Inc., all rights reserved.
*/

package io.airbyte.cdk.integrations.destination_async.deser;

import com.fasterxml.jackson.databind.JsonNode;
import io.airbyte.protocol.models.v0.AirbyteRecordMessageMeta;
import io.airbyte.protocol.models.v0.StreamDescriptor;
import org.apache.commons.lang3.tuple.ImmutablePair;

public interface StreamAwareDataTransformer {

/**
* Transforms the input data by applying destination limitations and populating
* {@link AirbyteRecordMessageMeta}. The returned pair contains the transformed data and the merged
* meta information from upstream.
*
* @param streamDescriptor
* @param data
* @param meta
* @return
*/
ImmutablePair<JsonNode, AirbyteRecordMessageMeta> transform(StreamDescriptor streamDescriptor, JsonNode data, AirbyteRecordMessageMeta meta);

}
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
import com.fasterxml.jackson.annotation.JsonProperty;
import com.fasterxml.jackson.annotation.JsonPropertyDescription;
import com.fasterxml.jackson.databind.JsonNode;
import io.airbyte.protocol.models.v0.AirbyteRecordMessageMeta;
import io.airbyte.protocol.models.v0.StreamDescriptor;
import java.util.Objects;

Expand All @@ -27,6 +28,9 @@ public class PartialAirbyteRecordMessage {
@JsonPropertyDescription("when the data was emitted from the source. epoch in millisecond.")
private long emittedAt;

@JsonProperty("meta")
private AirbyteRecordMessageMeta meta;

public PartialAirbyteRecordMessage() {}

@JsonProperty("namespace")
Expand Down Expand Up @@ -89,6 +93,14 @@ public PartialAirbyteRecordMessage withEmittedAt(final Long emittedAt) {
return this;
}

public AirbyteRecordMessageMeta getMeta() {
return meta;
}

public void setMeta(AirbyteRecordMessageMeta meta) {
this.meta = meta;
}

@Override
public boolean equals(final Object o) {
if (this == o) {
Expand Down
Loading
Loading