[FLINK-35500][Connectors/DynamoDB] DynamoDb Table API Sink fails to delete elements due to key not found #152

Open · wants to merge 6 commits into base: main

Changes from 2 commits
40 changes: 40 additions & 0 deletions docs/content.zh/docs/connectors/table/dynamodb.md
@@ -302,6 +302,46 @@ WITH (
);
```

## Specifying Primary Key For Deletes

The DynamoDB sink supports DELETE requests to DynamoDB, but AWS requires that a DynamoDB Delete request contain **only** the key field(s); otherwise the request will fail with `DynamoDbException: The provided key element does not match the schema`.
Thus, if a changelog stream that contains DELETEs is being written to DynamoDB, you must specify the `PRIMARY KEY` on the table.
The `PRIMARY KEY` specified for the Flink SQL table must match the actual primary key of the DynamoDB table: either just the Partition Key, or, in the case of a composite primary key, the Partition Key and Sort Key.

Example for Partition Key as the only Primary Key:
```sql
CREATE TABLE DynamoDbTable (
`user_id` BIGINT,
`item_id` BIGINT,
`category_id` BIGINT,
`behavior` STRING,
PRIMARY KEY (user_id) NOT ENFORCED
)
WITH (
'connector' = 'dynamodb',
'table-name' = 'user_behavior',
'aws.region' = 'us-east-2'
);
```

Example for Partition Key and Sort Key as a composite Primary Key:
```sql
CREATE TABLE DynamoDbTable (
`user_id` BIGINT,
`item_id` BIGINT,
`category_id` BIGINT,
`behavior` STRING,
PRIMARY KEY (user_id, item_id) NOT ENFORCED
)
WITH (
'connector' = 'dynamodb',
'table-name' = 'user_behavior',
'aws.region' = 'us-east-2'
);
```

Note that this Primary Key functionality, specified by `PRIMARY KEY`, can be used alongside the sink partitioning mentioned above via `PARTITIONED BY` to deduplicate data and support DELETEs; see the sketch below.
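
A minimal sketch combining both clauses, assuming the same example table as above. Here the `PARTITIONED BY` columns mirror the `PRIMARY KEY`, so that within-batch deduplication and DELETE requests operate on the same key fields:

```sql
CREATE TABLE DynamoDbTable (
  `user_id` BIGINT,
  `item_id` BIGINT,
  `category_id` BIGINT,
  `behavior` STRING,
  PRIMARY KEY (user_id, item_id) NOT ENFORCED
)
PARTITIONED BY (user_id, item_id)
WITH (
  'connector' = 'dynamodb',
  'table-name' = 'user_behavior',
  'aws.region' = 'us-east-2'
);
```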

## Notice

The current implementation of the DynamoDB SQL connector is write-only and doesn't provide an implementation for source queries.
40 changes: 40 additions & 0 deletions docs/content/docs/connectors/table/dynamodb.md
@@ -303,6 +303,46 @@ WITH (
);
```

## Specifying Primary Key For Deletes

The DynamoDB sink supports DELETE requests to DynamoDB, but AWS requires that a DynamoDB Delete request contain **only** the key field(s); otherwise the request will fail with `DynamoDbException: The provided key element does not match the schema`.
Thus, if a changelog stream that contains DELETEs is being written to DynamoDB, you must specify the `PRIMARY KEY` on the table.
The `PRIMARY KEY` specified for the Flink SQL table must match the actual primary key of the DynamoDB table: either just the Partition Key, or, in the case of a composite primary key, the Partition Key and Sort Key.

Example for Partition Key as the only Primary Key:
```sql
CREATE TABLE DynamoDbTable (
`user_id` BIGINT,
`item_id` BIGINT,
`category_id` BIGINT,
`behavior` STRING,
PRIMARY KEY (user_id) NOT ENFORCED
Why do we use the NOT ENFORCED qualifier? In the case of DynamoDB, the Primary Key property (or properties) is always present on the record.

)
WITH (
'connector' = 'dynamodb',
'table-name' = 'user_behavior',
'aws.region' = 'us-east-2'
);
```

Example for Partition Key and Sort Key as a composite Primary Key:
```sql
CREATE TABLE DynamoDbTable (
`user_id` BIGINT,
`item_id` BIGINT,
`category_id` BIGINT,
`behavior` STRING,
PRIMARY KEY (user_id, item_id) NOT ENFORCED
)
WITH (
'connector' = 'dynamodb',
'table-name' = 'user_behavior',
'aws.region' = 'us-east-2'
);
```

Note that this Primary Key functionality, specified by `PRIMARY KEY`, can be used alongside the sink partitioning mentioned above via `PARTITIONED BY` to deduplicate data and support DELETEs; see the sketch below.
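
A minimal sketch combining both clauses, assuming the same example table as above. Here the `PARTITIONED BY` columns mirror the `PRIMARY KEY`, so that within-batch deduplication and DELETE requests operate on the same key fields:

```sql
CREATE TABLE DynamoDbTable (
  `user_id` BIGINT,
  `item_id` BIGINT,
  `category_id` BIGINT,
  `behavior` STRING,
  PRIMARY KEY (user_id, item_id) NOT ENFORCED
)
PARTITIONED BY (user_id, item_id)
WITH (
  'connector' = 'dynamodb',
  'table-name' = 'user_behavior',
  'aws.region' = 'us-east-2'
);
```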

I would clarify that PARTITIONED BY is used by Flink for deduplication within the same batch (and to support DELETEs), but it is different from DynamoDB's partition key. If the user is familiar with DynamoDB but not much with Flink, this may generate a lot of confusion.


Also, PARTITIONED BY and PRIMARY KEY should always be the same.
(see other comment by @dzikosc below)


## Notice

The current implementation of the DynamoDB SQL connector is write-only and doesn't provide an implementation for source queries.
@@ -51,6 +51,7 @@ public class DynamoDbDynamicSink extends AsyncDynamicTableSink<DynamoDbWriteRequest>
private final Properties dynamoDbClientProperties;
private final DataType physicalDataType;
private final Set<String> overwriteByPartitionKeys;
private final Set<String> primaryKeys;

protected DynamoDbDynamicSink(
@Nullable Integer maxBatchSize,
@@ -62,7 +63,8 @@
boolean failOnError,
Properties dynamoDbClientProperties,
DataType physicalDataType,
Set<String> overwriteByPartitionKeys) {
Set<String> overwriteByPartitionKeys,
Set<String> primaryKeys) {

In DynamoDB nomenclature, this set of properties is a primaryKey (not primaryKeys). Let's use a proper name.

+1

Author:

To be clear, you're just suggesting changing the variable name from primaryKeys to primaryKey, right? And leaving its data type as Set<String>, since a Primary Key can consist of two fields: the Partition Key and the Sort Key?

super(
maxBatchSize,
maxInFlightRequests,
@@ -74,6 +76,7 @@ protected DynamoDbDynamicSink(
this.dynamoDbClientProperties = dynamoDbClientProperties;
this.physicalDataType = physicalDataType;
this.overwriteByPartitionKeys = overwriteByPartitionKeys;
this.primaryKeys = primaryKeys;
}

@Override
@@ -89,7 +92,7 @@ public SinkRuntimeProvider getSinkRuntimeProvider(Context context) {
.setFailOnError(failOnError)
.setOverwriteByPartitionKeys(new ArrayList<>(overwriteByPartitionKeys))
.setDynamoDbProperties(dynamoDbClientProperties)
.setElementConverter(new RowDataElementConverter(physicalDataType));
.setElementConverter(new RowDataElementConverter(physicalDataType, primaryKeys));

addAsyncOptionsToSinkBuilder(builder);

@@ -108,7 +111,8 @@ public DynamicTableSink copy() {
failOnError,
dynamoDbClientProperties,
physicalDataType,
overwriteByPartitionKeys);
overwriteByPartitionKeys,
primaryKeys);
}

@Override
@@ -136,6 +140,7 @@ public static class DynamoDbDynamicTableSinkBuilder
private Properties dynamoDbClientProperties;
private DataType physicalDataType;
private Set<String> overwriteByPartitionKeys;
private Set<String> primaryKeys;

public DynamoDbDynamicTableSinkBuilder setTableName(String tableName) {
this.tableName = tableName;
@@ -164,6 +169,11 @@ public DynamoDbDynamicTableSinkBuilder setOverwriteByPartitionKeys(
return this;
}

public DynamoDbDynamicTableSinkBuilder setPrimaryKeys(Set<String> primaryKeys) {
this.primaryKeys = primaryKeys;
return this;
}

@Override
public AsyncDynamicTableSink<DynamoDbWriteRequest> build() {
return new DynamoDbDynamicSink(
@@ -176,7 +186,8 @@ public AsyncDynamicTableSink<DynamoDbWriteRequest> build() {
failOnError,
dynamoDbClientProperties,
physicalDataType,
overwriteByPartitionKeys);
overwriteByPartitionKeys,
primaryKeys);
}
}
}
@@ -58,6 +58,10 @@ public DynamicTableSink createDynamicTableSink(Context context) {
.setDynamoDbClientProperties(
dynamoDbConfiguration.getSinkClientProperties());

if (catalogTable.getResolvedSchema().getPrimaryKey().isPresent()) {
    builder =
            builder.setPrimaryKeys(
                    new HashSet<>(
                            catalogTable.getResolvedSchema().getPrimaryKey().get().getColumns()));
}

addAsyncOptionsToBuilder(dynamoDbConfiguration.getAsyncSinkProperties(), builder);

return builder.build();
@@ -27,6 +27,8 @@
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.types.DataType;

import java.util.Set;

/**
* Implementation of an {@link ElementConverter} for the DynamoDb Table sink. The element converter
* maps the Flink internal type of {@link RowData} to a {@link DynamoDbWriteRequest} to be used by
@@ -36,19 +38,21 @@
public class RowDataElementConverter implements ElementConverter<RowData, DynamoDbWriteRequest> {

private final DataType physicalDataType;
private final Set<String> primaryKeys;
private transient RowDataToAttributeValueConverter rowDataToAttributeValueConverter;

public RowDataElementConverter(DataType physicalDataType) {
public RowDataElementConverter(DataType physicalDataType, Set<String> primaryKeys) {
this.physicalDataType = physicalDataType;
this.primaryKeys = primaryKeys;
this.rowDataToAttributeValueConverter =
new RowDataToAttributeValueConverter(physicalDataType);
new RowDataToAttributeValueConverter(physicalDataType, primaryKeys);
}

@Override
public DynamoDbWriteRequest apply(RowData element, SinkWriter.Context context) {
if (rowDataToAttributeValueConverter == null) {
rowDataToAttributeValueConverter =
new RowDataToAttributeValueConverter(physicalDataType);
new RowDataToAttributeValueConverter(physicalDataType, primaryKeys);
}

DynamoDbWriteRequest.Builder builder =
@@ -21,19 +21,23 @@
import org.apache.flink.annotation.Internal;
import org.apache.flink.connector.dynamodb.table.converter.ArrayAttributeConverterProvider;
import org.apache.flink.table.api.DataTypes;
import org.apache.flink.table.api.TableException;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.data.conversion.DataStructureConverters;
import org.apache.flink.table.types.DataType;
import org.apache.flink.table.types.KeyValueDataType;
import org.apache.flink.types.RowKind;

import software.amazon.awssdk.enhanced.dynamodb.AttributeConverterProvider;
import software.amazon.awssdk.enhanced.dynamodb.EnhancedType;
import software.amazon.awssdk.enhanced.dynamodb.TableSchema;
import software.amazon.awssdk.enhanced.dynamodb.mapper.StaticTableSchema;
import software.amazon.awssdk.services.dynamodb.model.AttributeValue;

import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;

import static org.apache.flink.table.data.RowData.createFieldGetter;

@@ -43,14 +47,35 @@ public class RowDataToAttributeValueConverter {

private final DataType physicalDataType;
private final TableSchema<RowData> tableSchema;
private final Set<String> primaryKeys;

public RowDataToAttributeValueConverter(DataType physicalDataType) {
public RowDataToAttributeValueConverter(DataType physicalDataType, Set<String> primaryKeys) {
this.physicalDataType = physicalDataType;
this.primaryKeys = primaryKeys;
this.tableSchema = createTableSchema();
}

public Map<String, AttributeValue> convertRowData(RowData row) {
return tableSchema.itemToMap(row, false);
    Map<String, AttributeValue> itemMap = tableSchema.itemToMap(row, false);

    // In case of DELETE, only the primary key field(s) should be sent in the request.
    // To accomplish this, the PRIMARY KEY fields must have been set in the table definition.
    if (row.getRowKind() == RowKind.DELETE) {
        if (primaryKeys == null || primaryKeys.isEmpty()) {
            throw new TableException(
                    "PRIMARY KEY on Table must be set for DynamoDB DELETE operation");
        }
        Map<String, AttributeValue> pkOnlyMap = new HashMap<>();
        for (String key : primaryKeys) {
            pkOnlyMap.put(key, itemMap.get(key));
        }
        return pkOnlyMap;
    }
    return itemMap;
}

private StaticTableSchema<RowData> createTableSchema() {
@@ -33,6 +33,9 @@
import org.junit.jupiter.api.Test;

import java.io.IOException;
import java.util.Collections;
import java.util.HashSet;
import java.util.Set;

import static org.assertj.core.api.Assertions.assertThat;
import static org.assertj.core.api.Assertions.assertThatExceptionOfType;
@@ -45,10 +48,16 @@ public class RowDataElementConverterTest {
DataTypes.FIELD("partition_key", DataTypes.STRING()),
DataTypes.FIELD("payload", DataTypes.STRING()));
private static final RowDataElementConverter elementConverter =
new RowDataElementConverter(DATA_TYPE);
new RowDataElementConverter(DATA_TYPE, null);
private static final SinkWriter.Context context = new UnusedSinkWriterContext();
private static final RowDataToAttributeValueConverter rowDataToAttributeValueConverter =
new RowDataToAttributeValueConverter(DATA_TYPE);
new RowDataToAttributeValueConverter(DATA_TYPE, null);

private static final Set<String> primaryKeys = new HashSet<>(Collections.singletonList("partition_key"));
private static final RowDataElementConverter elementConverterWithPK =
new RowDataElementConverter(DATA_TYPE, primaryKeys);
private static final RowDataToAttributeValueConverter rowDataToAttributeValueConverterWithPK =
new RowDataToAttributeValueConverter(DATA_TYPE, primaryKeys);

@Test
void testInsert() {
@@ -91,11 +100,12 @@ void testUpdateBeforeIsUnsupported() {
@Test
void testDelete() {
RowData rowData = createElement(RowKind.DELETE);
DynamoDbWriteRequest actualWriteRequest = elementConverter.apply(rowData, context);
// In case of DELETE, a set of Primary Key(s) is required.
DynamoDbWriteRequest actualWriteRequest = elementConverterWithPK.apply(rowData, context);
DynamoDbWriteRequest expectedWriterequest =
DynamoDbWriteRequest.builder()
.setType(DynamoDbWriteRequestType.DELETE)
.setItem(rowDataToAttributeValueConverter.convertRowData(rowData))
.setItem(rowDataToAttributeValueConverterWithPK.convertRowData(rowData))
.build();

assertThat(actualWriteRequest).usingRecursiveComparison().isEqualTo(expectedWriterequest);
@@ -106,7 +116,7 @@ void testAttributeConverterReinitializedAfterSerialization()
throws IOException, ClassNotFoundException {
RowData rowData = createElement(RowKind.INSERT);

RowDataElementConverter originalConverter = new RowDataElementConverter(DATA_TYPE);
RowDataElementConverter originalConverter = new RowDataElementConverter(DATA_TYPE, null);
RowDataElementConverter transformedConverter =
InstantiationUtil.deserializeObject(
InstantiationUtil.serializeObject(originalConverter),