feat: implement support for append_dedup in iceberg dest #48731

Merged · 3 commits · Nov 27, 2024
Changes from 1 commit
```diff
@@ -93,25 +93,27 @@ class AirbyteTypeToIcebergSchema {
 }

 fun ObjectType.toIcebergSchema(primaryKeys: List<List<String>>): Schema {
-    val mutableListOf = mutableListOf<NestedField>()
+    val fields = mutableListOf<NestedField>()
     val identifierFields = mutableSetOf<Int>()
     val identifierFieldNames = primaryKeys.flatten().toSet()
     val icebergTypeConverter = AirbyteTypeToIcebergSchema()
     this.properties.entries.forEach { (name, field) ->
         val id = generatedSchemaFieldId()
-        mutableListOf.add(
+        val isPrimaryKey = identifierFieldNames.contains(name)
+        val isOptional = !isPrimaryKey && field.nullable
```
Review thread on `val isOptional = !isPrimaryKey && field.nullable`:

Contributor:

fyi, we tried enforcing PK requiredness at one point and it caused a lot of problems (there's a surprising number of sources that emit null PKs). If Iceberg supports null values in a partition key, we should do that instead.

Contributor (author):

It just won't allow you to mark a field as an identifier field if it's nullable. Only non-nullable fields can be identifier fields.

Contributor:

Ugh. I think we Officially Support composite PKs with nullable fields (#31926; see also Slack #p0-primay-keys-cannot-be-null), so that's going to be awkward.
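For reference, the constraint the author describes is enforced by Iceberg itself when the `Schema` is built, so a nullable column can never be registered as an identifier field. A minimal sketch against the plain Iceberg API:

```kotlin
import org.apache.iceberg.Schema
import org.apache.iceberg.types.Types

// Iceberg validates identifier fields at Schema construction time:
// marking an optional (nullable) column as an identifier field fails
// with an IllegalArgumentException ("... not a required field").
val rejected = Schema(
    listOf(Types.NestedField.optional(1, "id", Types.LongType.get())),
    setOf(1), // identifier field ids
)
```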

```diff
+        fields.add(
             NestedField.of(
                 id,
-                field.nullable,
+                isOptional,
                 name,
                 icebergTypeConverter.convert(field.type),
             ),
         )
-        if (identifierFieldNames.contains(name)) {
+        if (isPrimaryKey) {
             identifierFields.add(id)
         }
     }
-    return Schema(mutableListOf, identifierFields)
+    return Schema(fields, identifierFields)
 }

 private fun generatedSchemaFieldId() = UUID.randomUUID().hashCode()
```
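To illustrate the net effect of the change above: a column that appears in the primary key is forced to required and registered as an identifier field, while every other column keeps its declared nullability. A hypothetical example of the schema this produces for a stream with primary key `["id"]` (field ids are random hashes in the real code; fixed here for readability):

```kotlin
import org.apache.iceberg.Schema
import org.apache.iceberg.types.Types
import org.apache.iceberg.types.Types.NestedField

// Shape produced for columns {id: string (primary key), name: nullable string}.
val expected = Schema(
    listOf(
        NestedField.required(1, "id", Types.StringType.get()),   // PK => forced required
        NestedField.optional(2, "name", Types.StringType.get()), // keeps declared nullability
    ),
    setOf(1), // "id" registered as an identifier field
)
```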
```diff
@@ -74,13 +74,13 @@ class AirbyteValueToIcebergRecord {
 }

 fun ObjectValue.toIcebergRecord(schema: Schema): GenericRecord {
-    val create = GenericRecord.create(schema)
+    val record = GenericRecord.create(schema)
     val airbyteValueToIcebergRecord = AirbyteValueToIcebergRecord()
     schema.asStruct().fields().forEach { field ->
-        val value = this.values.get(field.name())
+        val value = this.values[field.name()]
         if (value != null) {
-            create.setField(field.name(), airbyteValueToIcebergRecord.convert(value, field.type()))
+            record.setField(field.name(), airbyteValueToIcebergRecord.convert(value, field.type()))
         }
     }
-    return create
+    return record
 }
```
```diff
@@ -11,13 +11,15 @@
 import org.apache.iceberg.PartitionSpec;
 import org.apache.iceberg.Schema;
 import org.apache.iceberg.StructLike;
+import org.apache.iceberg.data.GenericRecord;
 import org.apache.iceberg.data.InternalRecordWrapper;
 import org.apache.iceberg.data.Record;
 import org.apache.iceberg.io.BaseTaskWriter;
 import org.apache.iceberg.io.FileAppenderFactory;
 import org.apache.iceberg.io.FileIO;
 import org.apache.iceberg.io.OutputFileFactory;
 import org.apache.iceberg.types.TypeUtil;
+import org.apache.iceberg.types.Types;

 /**
  * Implementation of the Iceberg {@link BaseTaskWriter} that handles delta-based updates (insert,
@@ -57,16 +59,26 @@ public InternalRecordWrapper wrapper() {
     return wrapper;
   }

+  public Record constructIdentifierRecord(Record row) {
```
Review comment on `constructIdentifierRecord`:

Contributor:

Nit: does this need to be public?

```diff
+    final GenericRecord recordWithIds = GenericRecord.create(deleteSchema);
+
+    for (final Types.NestedField idField : deleteSchema.columns()) {
+      recordWithIds.setField(idField.name(), row.getField(idField.name()));
+    }
+
+    return recordWithIds;
+  }
+
   @Override
   public void write(final Record row) throws IOException {
     final RowDataDeltaWriter writer = route(row);
     final Operation rowOperation = getOperation(row);
     if (rowOperation == Operation.INSERT) {
       writer.write(row);
     } else if (rowOperation == Operation.DELETE) {
-      writer.deleteKey(row);
+      writer.deleteKey(constructIdentifierRecord(row));
     } else {
-      writer.deleteKey(row);
+      writer.deleteKey(constructIdentifierRecord(row));
       writer.write(row);
     }
   }
```
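Why the projection matters: the equality-delete writer expects records shaped like the delete schema (identifier columns only), not the full row. A hedged sketch of how such a delete schema is typically derived from the identifier fields — the helper name is hypothetical, and this class's actual wiring of `deleteSchema` is assumed:

```kotlin
import org.apache.iceberg.Schema
import org.apache.iceberg.types.TypeUtil

// TypeUtil.select projects a schema down to the given field ids, yielding
// the identifier-only schema that records passed to deleteKey(...) must
// match. (Hypothetical helper; the actual derivation is assumed.)
fun deleteSchemaFor(schema: Schema, identifierFieldIds: Set<Int>): Schema =
    TypeUtil.select(schema, identifierFieldIds)
```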
```diff
@@ -38,6 +38,7 @@ class IcebergV2SpecificationExtension : DestinationSpecificationExtension {
         listOf(
             DestinationSyncMode.OVERWRITE,
             DestinationSyncMode.APPEND,
+            DestinationSyncMode.APPEND_DEDUP
         )
     override val supportsIncremental = true
 }
```
```diff
@@ -8,6 +8,7 @@ import io.airbyte.cdk.load.command.Dedupe
 import io.airbyte.cdk.load.command.DestinationStream
 import io.airbyte.cdk.load.command.ImportType
 import io.airbyte.cdk.load.data.MapperPipeline
+import io.airbyte.cdk.load.data.NullValue
 import io.airbyte.cdk.load.data.ObjectValue
 import io.airbyte.cdk.load.data.iceberg.parquet.toIcebergRecord
 import io.airbyte.cdk.load.data.iceberg.parquet.toIcebergSchema
@@ -236,7 +237,8 @@ class IcebergUtil {
     ): Operation =
         if (
             record.data is ObjectValue &&
-                (record.data as ObjectValue).values[AIRBYTE_CDC_DELETE_COLUMN] != null
+                (record.data as ObjectValue).values[AIRBYTE_CDC_DELETE_COLUMN] != null &&
+                (record.data as ObjectValue).values[AIRBYTE_CDC_DELETE_COLUMN] !is NullValue
         ) {
             Operation.DELETE
         } else if (importType is Dedupe) {
```
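The extra `!is NullValue` check is needed because, in the CDK's `AirbyteValue` model, a JSON null is represented by the `NullValue` singleton rather than a Kotlin `null`, so a CDC delete column that is present but null survives a plain `!= null` map lookup. A minimal sketch (the column name `_ab_cdc_deleted_at` is assumed here):

```kotlin
import io.airbyte.cdk.load.data.NullValue

fun main() {
    // Present-but-null CDC delete column: the entry exists in the map.
    val values = mapOf("_ab_cdc_deleted_at" to NullValue)

    check(values["_ab_cdc_deleted_at"] != null)      // passes: entry exists
    check(values["_ab_cdc_deleted_at"] is NullValue) // but it is a JSON null, not a delete
}
```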
```diff
@@ -5,7 +5,6 @@
 package io.airbyte.integrations.destination.iceberg.v2.io

 import org.apache.iceberg.FileFormat
-import org.apache.iceberg.PartitionKey
 import org.apache.iceberg.PartitionSpec
 import org.apache.iceberg.Schema
 import org.apache.iceberg.data.Record
@@ -39,7 +38,7 @@ class UnpartitionedDeltaWriter(
         identifierFieldIds
     ) {

-    private val writer = RowDataDeltaWriter(PartitionKey(spec, schema))
+    private val writer = RowDataDeltaWriter(null)

     override fun route(row: Record): RowDataDeltaWriter {
         return writer
```
```diff
@@ -69,5 +69,5 @@
   "supportsIncremental" : true,
   "supportsNormalization" : false,
   "supportsDBT" : false,
-  "supported_destination_sync_modes" : [ "overwrite", "append" ]
+  "supported_destination_sync_modes" : [ "overwrite", "append", "append_dedup" ]
 }
```
```diff
@@ -69,5 +69,5 @@
   "supportsIncremental" : true,
   "supportsNormalization" : false,
   "supportsDBT" : false,
-  "supported_destination_sync_modes" : [ "overwrite", "append" ]
+  "supported_destination_sync_modes" : [ "overwrite", "append", "append_dedup" ]
 }
```