[FLINK-35500][Connectors/DynamoDB] DynamoDb Table API Sink fails to delete elements due to key not found #152
@@ -303,6 +303,46 @@ WITH (
);
```

## Specifying Primary Key For Deletes

The DynamoDB sink supports Delete requests to DynamoDB, but AWS requires that a DynamoDB Delete request contain **only** the key field(s); otherwise the Delete request fails with `DynamoDbException: The provided key element does not match the schema`.
Thus, if a changelog stream that contains DELETEs is being written to DynamoDB, you must specify the `PRIMARY KEY` on the table.
The `PRIMARY KEY` specified for the Flink SQL table must match the actual Primary Key of the DynamoDB table: either just the Partition Key, or, in the case of a composite primary key, the Partition Key and the Sort Key.
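For context, a changelog that contains DELETE rows typically comes from an updating or CDC source. The following sketch is illustrative only: the Kafka source table, topic, broker address, and options are assumptions and not part of this connector. It simply shows a `debezium-json` stream whose DELETE rows the DynamoDB sink table (defined in the examples below) would turn into Delete requests.

```sql
-- Hypothetical CDC source; its changelog contains INSERT, UPDATE and DELETE rows.
CREATE TABLE user_behavior_cdc (
  `user_id` BIGINT,
  `item_id` BIGINT,
  `category_id` BIGINT,
  `behavior` STRING
) WITH (
  'connector' = 'kafka',
  'topic' = 'user_behavior_changes',
  'properties.bootstrap.servers' = 'localhost:9092',
  'scan.startup.mode' = 'earliest-offset',
  'format' = 'debezium-json'
);

-- DELETE rows become DynamoDB Delete requests, which only succeed when the
-- sink table declares a PRIMARY KEY matching the DynamoDB table's key schema.
INSERT INTO DynamoDbTable SELECT * FROM user_behavior_cdc;
```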

Example For Partition Key as only Primary Key:
```sql
CREATE TABLE DynamoDbTable (
  `user_id` BIGINT,
  `item_id` BIGINT,
  `category_id` BIGINT,
  `behavior` STRING,
  PRIMARY KEY (user_id) NOT ENFORCED
)
WITH (
  'connector' = 'dynamodb',
  'table-name' = 'user_behavior',
  'aws.region' = 'us-east-2'
);
```

Example For Partition Key and Sort Key as Composite Primary Key:
```sql
CREATE TABLE DynamoDbTable (
  `user_id` BIGINT,
  `item_id` BIGINT,
  `category_id` BIGINT,
  `behavior` STRING,
  PRIMARY KEY (user_id, item_id) NOT ENFORCED
)
WITH (
  'connector' = 'dynamodb',
  'table-name' = 'user_behavior',
  'aws.region' = 'us-east-2'
);
```

Note that this Primary Key functionality, specified by `PRIMARY KEY`, can be used alongside the Sink Partitioning mentioned above via `PARTITIONED BY` to deduplicate data and support DELETEs.
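As a sketch of combining the two clauses (reusing the table and column names from the examples above; whether these columns are the right keys depends on your DynamoDB table), the same key columns can be listed in both:

```sql
CREATE TABLE DynamoDbTable (
  `user_id` BIGINT,
  `item_id` BIGINT,
  `category_id` BIGINT,
  `behavior` STRING,
  PRIMARY KEY (user_id, item_id) NOT ENFORCED
)
PARTITIONED BY ( user_id, item_id )
WITH (
  'connector' = 'dynamodb',
  'table-name' = 'user_behavior',
  'aws.region' = 'us-east-2'
);
```

Listing the same columns in `PARTITIONED BY` and `PRIMARY KEY` keeps the sink's batch deduplication consistent with the key attributes used for Delete requests.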
Review comment: I would clarify that […]
Review comment: Also, […]

## Notice

The current implementation of the DynamoDB SQL connector is write-only and doesn't provide an implementation for source queries.
@@ -51,6 +51,7 @@ public class DynamoDbDynamicSink extends AsyncDynamicTableSink<DynamoDbWriteRequest>
    private final Properties dynamoDbClientProperties;
    private final DataType physicalDataType;
    private final Set<String> overwriteByPartitionKeys;
    private final Set<String> primaryKeys;

    protected DynamoDbDynamicSink(
            @Nullable Integer maxBatchSize,

@@ -62,7 +63,8 @@ protected DynamoDbDynamicSink(
            boolean failOnError,
            Properties dynamoDbClientProperties,
            DataType physicalDataType,
            Set<String> overwriteByPartitionKeys) {
            Set<String> overwriteByPartitionKeys,
            Set<String> primaryKeys) {
Review comment: In DynamoDB nomenclature, this set of properties is a primaryKey (not primaryKeys). Let's use a proper name.
Review comment: +1
Review comment: To be clear, you're just suggesting changing the variable name from `primaryKeys` to `primaryKey`?
        super(
                maxBatchSize,
                maxInFlightRequests,

@@ -74,6 +76,7 @@ protected DynamoDbDynamicSink(
        this.dynamoDbClientProperties = dynamoDbClientProperties;
        this.physicalDataType = physicalDataType;
        this.overwriteByPartitionKeys = overwriteByPartitionKeys;
        this.primaryKeys = primaryKeys;
    }

    @Override

@@ -89,7 +92,8 @@ public SinkRuntimeProvider getSinkRuntimeProvider(Context context) {
                        .setFailOnError(failOnError)
                        .setOverwriteByPartitionKeys(new ArrayList<>(overwriteByPartitionKeys))
                        .setDynamoDbProperties(dynamoDbClientProperties)
                        .setElementConverter(new RowDataElementConverter(physicalDataType));
                        .setElementConverter(
                                new RowDataElementConverter(physicalDataType, primaryKeys));

        addAsyncOptionsToSinkBuilder(builder);

@@ -108,7 +112,8 @@ public DynamicTableSink copy() {
                failOnError,
                dynamoDbClientProperties,
                physicalDataType,
                overwriteByPartitionKeys);
                overwriteByPartitionKeys,
                primaryKeys);
    }

    @Override

@@ -136,6 +141,7 @@ public static class DynamoDbDynamicTableSinkBuilder
        private Properties dynamoDbClientProperties;
        private DataType physicalDataType;
        private Set<String> overwriteByPartitionKeys;
        private Set<String> primaryKeys;

        public DynamoDbDynamicTableSinkBuilder setTableName(String tableName) {
            this.tableName = tableName;

@@ -164,6 +170,11 @@ public DynamoDbDynamicTableSinkBuilder setOverwriteByPartitionKeys(
            return this;
        }

        public DynamoDbDynamicTableSinkBuilder setPrimaryKeys(Set<String> primaryKeys) {
            this.primaryKeys = primaryKeys;
            return this;
        }

        @Override
        public AsyncDynamicTableSink<DynamoDbWriteRequest> build() {
            return new DynamoDbDynamicSink(

@@ -176,7 +187,8 @@ public AsyncDynamicTableSink<DynamoDbWriteRequest> build() {
                    failOnError,
                    dynamoDbClientProperties,
                    physicalDataType,
                    overwriteByPartitionKeys);
                    overwriteByPartitionKeys,
                    primaryKeys);
        }
    }
}
@@ -58,6 +58,17 @@ public DynamicTableSink createDynamicTableSink(Context context) {
                        .setDynamoDbClientProperties(
                                dynamoDbConfiguration.getSinkClientProperties());

        if (catalogTable.getResolvedSchema().getPrimaryKey().isPresent()) {
Review comment: In the case of DynamoDB, the contents of the PrimaryKey and PARTITION BY should be the same to ensure semantic correctness of various operations. To avoid repetitiveness and simplify the usage, we should aim to use just the PrimaryKey property, as it's a fairly intuitive and well understood concept for DDB users. In the meantime, we should probably aim to start the deprecation process of the […]. The first step could be […]
Review comment: +1
Review comment: @dzikosc - Isn't there a potential use case wherein a user of the existing […]? Today, the user of the connector could specify a […]. Either way, could we keep the deprecation of […]?
            builder =
                    builder.setPrimaryKeys(
                            new HashSet<>(
                                    catalogTable
                                            .getResolvedSchema()
                                            .getPrimaryKey()
                                            .get()
                                            .getColumns()));
        }
Review comment: The DynamoDB primary key is an ordered set of properties: the partitionKey and then the sortKey. Let's model it like that. I would suggest using those as dedicated properties in the Sink model instead. We could add extra validation here to ensure consistency with the DDB schema.
Review comment: +1. Suggestion: if not too complicated, from a user perspective it would be clearer to pass a PrimaryKey object with two fields, partitionKey and sortKey, where partitionKey […]
Review comment: @nicusX - While having a dedicated […], I don't think there's any reason here why we would need to separate the partition key and sort key for the purpose of identifying the Primary Key; in fact, there is also a naming collision, as the current Table API / SQL connector supports passing in a […]
Review comment: Ack for not having an object with separate fields.

        addAsyncOptionsToBuilder(dynamoDbConfiguration.getAsyncSinkProperties(), builder);

        return builder.build();
Review comment: Why do we use the NOT ENFORCED qualifier? In the case of DynamoDB, the primary key property (or properties) is always present on the record.