Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

JDBC Destinations: Add airbyte_meta to raw table #35648

Closed
wants to merge 1 commit into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -65,11 +65,14 @@ protected BaseSerializedBuffer(final BufferStorage bufferStorage) throws Excepti
* TODO: (ryankfu) move destination to use serialized record string instead of passing entire
* AirbyteRecord
*
* @param recordString serialized record
* @param emittedAt timestamp of the record in milliseconds
* @param recordString serialized record
* @param airbyteMetaString The serialized airbyte_meta entry
* @param emittedAt timestamp of the record in milliseconds
* @throws IOException
*/
protected void writeRecord(final String recordString, final long emittedAt) throws IOException {
protected void writeRecord(final String recordString, String airbyteMetaString, final long emittedAt) throws IOException {
// TODO Why are we deserializing as an airbyte record message? recordString should just be a naked data blob.
// is this code ever actually called? do we always override it? can we make this method abstract?
writeRecord(Jsons.deserialize(recordString, AirbyteRecordMessage.class).withEmittedAt(emittedAt));
}

Expand Down Expand Up @@ -111,7 +114,7 @@ public long accept(final AirbyteRecordMessage record) throws Exception {
}

@Override
public long accept(final String recordString, final long emittedAt) throws Exception {
public long accept(final String recordString, final String airbyteMetaString, final long emittedAt) throws Exception {
if (!isStarted) {
if (useCompression) {
compressedBuffer = new GzipCompressorOutputStream(byteCounter);
Expand All @@ -123,7 +126,7 @@ public long accept(final String recordString, final long emittedAt) throws Excep
}
if (inputStream == null && !isClosed) {
final long startCount = byteCounter.getCount();
writeRecord(recordString, emittedAt);
writeRecord(recordString, airbyteMetaString, emittedAt);
return byteCounter.getCount() - startCount;
} else {
throw new IllegalCallerException("Buffer is already closed, it cannot accept more messages");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -43,11 +43,12 @@ public interface SerializableBuffer extends AutoCloseable {
* the entire AirbyteRecordMessage
*
* @param recordString serialized record
* @param airbyteMetaString The serialized airbyte_meta entry
* @param emittedAt timestamp of the record in milliseconds
* @return number of bytes written to the buffer
* @throws Exception
*/
long accept(String recordString, long emittedAt) throws Exception;
long accept(String recordString, String airbyteMetaString, long emittedAt) throws Exception;

/**
* Flush a buffer implementation.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
import com.fasterxml.jackson.annotation.JsonProperty;
import com.fasterxml.jackson.annotation.JsonPropertyDescription;
import com.fasterxml.jackson.databind.JsonNode;
import io.airbyte.protocol.models.v0.AirbyteRecordMessageMeta;
import io.airbyte.protocol.models.v0.StreamDescriptor;
import java.util.Objects;

Expand All @@ -27,6 +28,9 @@ public class PartialAirbyteRecordMessage {
@JsonPropertyDescription("when the data was emitted from the source. epoch in millisecond.")
private long emittedAt;

@JsonProperty("meta")
private AirbyteRecordMessageMeta meta;

public PartialAirbyteRecordMessage() {}

@JsonProperty("namespace")
Expand Down Expand Up @@ -89,6 +93,10 @@ public PartialAirbyteRecordMessage withEmittedAt(final Long emittedAt) {
return this;
}

public AirbyteRecordMessageMeta getMeta() {
return meta;
}

@Override
public boolean equals(final Object o) {
if (this == o) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,9 @@
import io.airbyte.integrations.base.destination.typing_deduping.NoopTyperDeduper;
import io.airbyte.integrations.base.destination.typing_deduping.NoopV2TableMigrator;
import io.airbyte.integrations.base.destination.typing_deduping.ParsedCatalog;
import io.airbyte.integrations.base.destination.typing_deduping.SqlGenerator;
import io.airbyte.integrations.base.destination.typing_deduping.TyperDeduper;
import io.airbyte.integrations.base.destination.typing_deduping.migrators.Migration;
import io.airbyte.integrations.base.destination.typing_deduping.migrators.MinimumDestinationState;
import io.airbyte.protocol.models.v0.AirbyteConnectionStatus;
import io.airbyte.protocol.models.v0.AirbyteConnectionStatus.Status;
Expand All @@ -54,7 +56,8 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public abstract class AbstractJdbcDestination extends JdbcConnector implements Destination {
public abstract class AbstractJdbcDestination<DestinationState extends MinimumDestinationState>
extends JdbcConnector implements Destination {

private static final Logger LOGGER = LoggerFactory.getLogger(AbstractJdbcDestination.class);

Expand Down Expand Up @@ -254,9 +257,19 @@ private void assertCustomParametersDontOverwriteDefaultParameters(final Map<Stri

protected abstract JdbcSqlGenerator getSqlGenerator();

protected abstract JdbcDestinationHandler<? extends MinimumDestinationState> getDestinationHandler(final String databaseName,
final JdbcDatabase database,
final String rawTableSchema);
protected abstract JdbcDestinationHandler<DestinationState> getDestinationHandler(final String databaseName,
final JdbcDatabase database,
final String rawTableSchema);

/**
* Provide any migrations that the destination needs to run. Most destinations will need to provide an instance of
* {@link io.airbyte.cdk.integrations.destination.jdbc.typing_deduping.JdbcV1V2Migrator} at minimum.
*/
protected abstract List<Migration<DestinationState>> getMigrations(
final JdbcDatabase database,
final String databaseName,
final SqlGenerator sqlGenerator,
final DestinationHandler<DestinationState> destinationHandler);

/**
* "database" key at root of the config json, for any other variants in config, override this
Expand Down Expand Up @@ -321,15 +334,16 @@ private TyperDeduper getV2TyperDeduper(final JsonNode config, final ConfiguredAi
final String databaseName = getDatabaseName(config);
final var migrator = new JdbcV1V2Migrator(namingResolver, database, databaseName);
final NoopV2TableMigrator v2TableMigrator = new NoopV2TableMigrator();
final DestinationHandler<? extends MinimumDestinationState> destinationHandler =
final DestinationHandler<DestinationState> destinationHandler =
getDestinationHandler(databaseName, database, rawNamespaceOverride.orElse(DEFAULT_AIRBYTE_INTERNAL_NAMESPACE));
final boolean disableTypeDedupe = config.has(DISABLE_TYPE_DEDUPE) && config.get(DISABLE_TYPE_DEDUPE).asBoolean(false);
final TyperDeduper typerDeduper;
List<Migration<DestinationState>> migrations = getMigrations(database, databaseName, sqlGenerator, destinationHandler);
if (disableTypeDedupe) {
typerDeduper = new NoOpTyperDeduperWithV1V2Migrations<>(sqlGenerator, destinationHandler, parsedCatalog, migrator, v2TableMigrator, List.of());
typerDeduper = new NoOpTyperDeduperWithV1V2Migrations<>(sqlGenerator, destinationHandler, parsedCatalog, migrator, v2TableMigrator, migrations);
} else {
typerDeduper =
new DefaultTyperDeduper<>(sqlGenerator, destinationHandler, parsedCatalog, migrator, v2TableMigrator, List.of());
new DefaultTyperDeduper<>(sqlGenerator, destinationHandler, parsedCatalog, migrator, v2TableMigrator, migrations);
}
return typerDeduper;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -115,12 +115,18 @@ protected String createTableQueryV2(final String schemaName, final String tableN
CREATE TABLE IF NOT EXISTS %s.%s (
%s VARCHAR PRIMARY KEY,
%s JSONB,
%s JSONB,
%s TIMESTAMP WITH TIME ZONE DEFAULT CURRENT_TIMESTAMP,
%s TIMESTAMP WITH TIME ZONE DEFAULT NULL
);
""",
schemaName, tableName, JavaBaseConstants.COLUMN_NAME_AB_RAW_ID, JavaBaseConstants.COLUMN_NAME_DATA,
JavaBaseConstants.COLUMN_NAME_AB_EXTRACTED_AT, JavaBaseConstants.COLUMN_NAME_AB_LOADED_AT);
schemaName,
tableName,
JavaBaseConstants.COLUMN_NAME_AB_RAW_ID,
JavaBaseConstants.COLUMN_NAME_DATA,
JavaBaseConstants.COLUMN_NAME_AB_META,
JavaBaseConstants.COLUMN_NAME_AB_EXTRACTED_AT,
JavaBaseConstants.COLUMN_NAME_AB_LOADED_AT);
}

// TODO: This method seems to be used by Postgres and others while staging to local temp files.
Expand All @@ -133,9 +139,10 @@ protected void writeBatchToFile(final File tmpFile, final List<PartialAirbyteMes
// TODO we only need to do this is formatData is overridden. If not, we can just do jsonData =
// record.getSerialized()
final var jsonData = Jsons.serialize(formatData(Jsons.deserializeExact(record.getSerialized())));
final var airbyteMeta = Jsons.serialize(record.getRecord().getMeta());
final var extractedAt = Timestamp.from(Instant.ofEpochMilli(record.getRecord().getEmittedAt()));
if (TypingAndDedupingFlag.isDestinationV2()) {
csvPrinter.printRecord(uuid, jsonData, extractedAt, null);
csvPrinter.printRecord(uuid, jsonData, airbyteMeta, extractedAt, null);
} else {
csvPrinter.printRecord(uuid, jsonData, extractedAt);
}
Expand Down
2 changes: 1 addition & 1 deletion airbyte-cdk/java/airbyte-cdk/dependencies/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -149,7 +149,7 @@ dependencies {
api 'com.fasterxml.jackson.datatype:jackson-datatype-jsr310'
api 'com.google.guava:guava:33.0.0-jre'
api 'commons-io:commons-io:2.15.1'
api ('io.airbyte.airbyte-protocol:protocol-models:0.5.0') { exclude group: 'com.google.api-client', module: 'google-api-client' }
api ('io.airbyte.airbyte-protocol:protocol-models:0.6.0') { exclude group: 'com.google.api-client', module: 'google-api-client' }
api 'javax.annotation:javax.annotation-api:1.3.2'
api 'org.apache.commons:commons-compress:1.25.0'
api 'org.apache.commons:commons-lang3:3.14.0'
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,8 @@ public List<Object> getDataRow(final JsonNode formattedData) {
return new LinkedList<>(getRecordColumns(formattedData));
}

public List<Object> getDataRow(final UUID id, final String formattedString, final long emittedAt) {
@Override
public List<Object> getDataRow(final UUID id, final String formattedString, final String airbyteMetaString, final long emittedAt) {
throw new UnsupportedOperationException("Not implemented in BaseSheetGenerator");
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -72,8 +72,8 @@ protected void writeRecord(final AirbyteRecordMessage record) throws IOException
}

@Override
protected void writeRecord(final String recordString, final long emittedAt) throws IOException {
csvPrinter.printRecord(csvSheetGenerator.getDataRow(UUID.randomUUID(), recordString, emittedAt));
protected void writeRecord(final String recordString, String airbyteMetaString, final long emittedAt) throws IOException {
csvPrinter.printRecord(csvSheetGenerator.getDataRow(UUID.randomUUID(), recordString, airbyteMetaString, emittedAt));
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ public interface CsvSheetGenerator {

List<Object> getDataRow(JsonNode formattedData);

List<Object> getDataRow(UUID id, String formattedString, long emittedAt);
List<Object> getDataRow(UUID id, String formattedString, String formattedAirbyteMetaString, long emittedAt);

final class Factory {

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,7 @@ public List<String> getHeaderRow() {

@Override
public List<Object> getDataRow(final UUID id, final AirbyteRecordMessage recordMessage) {
return getDataRow(id, Jsons.serialize(recordMessage.getData()), recordMessage.getEmittedAt());
return getDataRow(id, Jsons.serialize(recordMessage.getData()), Jsons.serialize(recordMessage.getMeta()), recordMessage.getEmittedAt());
}

@Override
Expand All @@ -57,13 +57,14 @@ public List<Object> getDataRow(final JsonNode formattedData) {
}

@Override
public List<Object> getDataRow(final UUID id, final String formattedString, final long emittedAt) {
public List<Object> getDataRow(final UUID id, final String formattedString, String formattedAirbyteMetaString, final long emittedAt) {
if (useDestinationsV2Columns) {
return List.of(
id,
Instant.ofEpochMilli(emittedAt),
"",
formattedString);
formattedString,
formattedAirbyteMetaString);
} else {
return List.of(
id,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -102,7 +102,7 @@ public long accept(final AirbyteRecordMessage record) throws Exception {
}

@Override
public long accept(final String recordString, final long emittedAt) throws Exception {
public long accept(final String recordString, final String airbyteMetaString, final long emittedAt) throws Exception {
throw new UnsupportedOperationException("This method is not supported for ParquetSerializedBuffer");
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -75,7 +75,7 @@ public void flush(final StreamDescriptor decs, final Stream<PartialAirbyteMessag
// todo (cgardens) - most writers just go ahead and re-serialize the contents of the record message.
// we should either just pass the raw string or at least have a way to do that and create a default
// impl that maintains backwards compatible behavior.
writer.accept(record.getSerialized(), record.getRecord().getEmittedAt());
writer.accept(record.getSerialized(), Jsons.serialize(record.getRecord().getMeta()), record.getRecord().getEmittedAt());
} catch (final Exception e) {
throw new RuntimeException(e);
}
Expand Down
Loading