Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Notification API reject out of order notifications #232

Merged
Merged
Show file tree
Hide file tree
Changes from 4 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
package org.apache.polaris.core.entity;

import com.fasterxml.jackson.annotation.JsonIgnore;
import java.util.Optional;
import org.apache.iceberg.catalog.Namespace;
import org.apache.iceberg.catalog.TableIdentifier;
import org.apache.iceberg.rest.RESTUtil;
Expand All @@ -31,6 +32,9 @@ public class TableLikeEntity extends PolarisEntity {
public static final String USER_SPECIFIED_WRITE_DATA_LOCATION_KEY = "write.data.path";
public static final String USER_SPECIFIED_WRITE_METADATA_LOCATION_KEY = "write.metadata.path";

public static final String LAST_ADMITTED_NOTIFICATION_TIMESTAMP_KEY =
"last-notification-timestamp";

/**
 * Creates a table-like entity view over an existing base entity by delegating to the {@code
 * PolarisEntity} constructor.
 *
 * @param sourceEntity the base entity passed through to the superclass constructor
 */
public TableLikeEntity(PolarisBaseEntity sourceEntity) {
super(sourceEntity);
}
Expand Down Expand Up @@ -63,6 +67,13 @@ public String getMetadataLocation() {
return getInternalPropertiesAsMap().get(METADATA_LOCATION_KEY);
}

/**
 * Reads the timestamp stored under {@link #LAST_ADMITTED_NOTIFICATION_TIMESTAMP_KEY} in the
 * entity's internal properties.
 *
 * @return the stored value parsed as a {@code long}, or an empty {@link Optional} when the
 *     property is not set
 * @throws NumberFormatException if the stored value is not a parsable {@code long}
 */
@JsonIgnore
public Optional<Long> getLastAdmittedNotificationTimestamp() {
  String storedTimestamp =
      getInternalPropertiesAsMap().get(LAST_ADMITTED_NOTIFICATION_TIMESTAMP_KEY);
  if (storedTimestamp == null) {
    return Optional.empty();
  }
  return Optional.of(Long.parseLong(storedTimestamp));
}

@JsonIgnore
public String getBaseLocation() {
return getPropertiesAsMap().get(PolarisEntityConstants.ENTITY_BASE_LOCATION);
Expand Down Expand Up @@ -109,5 +120,10 @@ public Builder setMetadataLocation(String location) {
internalProperties.put(METADATA_LOCATION_KEY, location);
return this;
}

/**
 * Stores the given notification timestamp, in decimal string form, under {@link
 * #LAST_ADMITTED_NOTIFICATION_TIMESTAMP_KEY} in the builder's internal properties.
 *
 * @param timestamp the notification timestamp to record
 * @return this builder, to allow call chaining
 */
public Builder setLastNotificationTimestamp(long timestamp) {
  String serializedTimestamp = Long.toString(timestamp);
  internalProperties.put(LAST_ADMITTED_NOTIFICATION_TIMESTAMP_KEY, serializedTimestamp);
  return this;
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -1874,10 +1874,23 @@ private boolean sendNotificationForTableLike(
.getMetaStoreManager()
.generateNewEntityId(getCurrentPolarisContext())
.getId())
.setLastNotificationTimestamp(request.getPayload().getTimestamp())
.build();
} else {
// If the notification timestamp is out-of-order, we should not update the table
if (entity.getLastAdmittedNotificationTimestamp().isPresent()
&& request.getPayload().getTimestamp()
<= entity.getLastAdmittedNotificationTimestamp().get()) {
throw new AlreadyExistsException(
"A notification with a newer timestamp has been admitted for table %s",
tableIdentifier);
}
existingLocation = entity.getMetadataLocation();
entity = new TableLikeEntity.Builder(entity).setMetadataLocation(newLocation).build();
entity =
new TableLikeEntity.Builder(entity)
.setMetadataLocation(newLocation)
.setLastNotificationTimestamp(request.getPayload().getTimestamp())
.build();
}
// first validate we can read the metadata file
validateLocationForTableLike(tableIdentifier, newLocation);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,7 @@
import org.apache.iceberg.catalog.SupportsNamespaces;
import org.apache.iceberg.catalog.TableIdentifier;
import org.apache.iceberg.exceptions.BadRequestException;
import org.apache.iceberg.exceptions.CommitFailedException;
import org.apache.iceberg.exceptions.ForbiddenException;
import org.apache.iceberg.exceptions.NoSuchNamespaceException;
import org.apache.iceberg.inmemory.InMemoryFileIO;
Expand Down Expand Up @@ -623,7 +624,7 @@ public void testUpdateNotificationCreateTableInExternalLocation() {
update.setMetadataLocation(maliciousMetadataFile);
update.setTableName(table.name());
update.setTableUuid(UUID.randomUUID().toString());
update.setTimestamp(230950845L);
update.setTimestamp(230950849L);
updateRequest.setPayload(update);

Assertions.assertThatThrownBy(() -> catalog.sendNotification(table, updateRequest))
Expand Down Expand Up @@ -913,6 +914,70 @@ public void testUpdateNotificationWhenTableExistsInDisallowedLocation() {
.hasMessageContaining("Invalid location");
}

@Test
public void testUpdateNotificationRejectOutOfOrderTimestamp() {
Assumptions.assumeTrue(
requiresNamespaceCreate(),
"Only applicable if namespaces must be created before adding children");
Assumptions.assumeTrue(
supportsNestedNamespaces(), "Only applicable if nested namespaces are supported");
Assumptions.assumeTrue(
supportsNotifications(), "Only applicable if notifications are supported");

final String tableLocation = "s3://externally-owned-bucket/table/";
final String tableMetadataLocation = tableLocation + "metadata/v1.metadata.json";
BasePolarisCatalog catalog = catalog();

Namespace namespace = Namespace.of("parent", "child1");
TableIdentifier table = TableIdentifier.of(namespace, "table");

long timestamp = 230950845L;
NotificationRequest request = new NotificationRequest();
request.setNotificationType(NotificationType.CREATE);
TableUpdateNotification update = new TableUpdateNotification();
update.setMetadataLocation(tableMetadataLocation);
update.setTableName(table.name());
update.setTableUuid(UUID.randomUUID().toString());
update.setTimestamp(timestamp);
request.setPayload(update);

InMemoryFileIO fileIO = (InMemoryFileIO) catalog.getIo();

fileIO.addFile(
tableMetadataLocation,
TableMetadataParser.toJson(createSampleTableMetadata(tableLocation)).getBytes());

catalog.sendNotification(table, request);

// Send a notification with a timestamp same as that of the previous notification, should fail
NotificationRequest request2 = new NotificationRequest();
request2.setNotificationType(NotificationType.UPDATE);
TableUpdateNotification update2 = new TableUpdateNotification();
update2.setMetadataLocation(tableLocation + "metadata/v2.metadata.json");
update2.setTableName(table.name());
update2.setTableUuid(UUID.randomUUID().toString());
update2.setTimestamp(timestamp);
tzuan16 marked this conversation as resolved.
Show resolved Hide resolved
request2.setPayload(update2);

Assertions.assertThatThrownBy(() -> catalog.sendNotification(table, request2))
.isInstanceOf(CommitFailedException.class)
.hasMessageContaining("Notification timestamp is outdated for table parent.child1.table");

// Verify that DROP notification won't be rejected due to timestamp
NotificationRequest request3 = new NotificationRequest();
request3.setNotificationType(NotificationType.DROP);
TableUpdateNotification update3 = new TableUpdateNotification();
update3.setMetadataLocation(tableLocation + "metadata/v2.metadata.json");
update3.setTableName(table.name());
update3.setTableUuid(UUID.randomUUID().toString());
update3.setTimestamp(timestamp);
request3.setPayload(update3);

Assertions.assertThat(catalog.sendNotification(table, request3))
.as("Drop notification should not fail despite timestamp being outdated")
.isTrue();
}

@Test
public void testUpdateNotificationWhenTableExistsFileSpecifiesDisallowedLocation() {
Assumptions.assumeTrue(
Expand Down
32 changes: 31 additions & 1 deletion spec/rest-catalog-open-api.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -976,7 +976,14 @@ paths:
summary: Sends a notification to the table
operationId: sendNotification
requestBody:
description: The request containing the notification to be sent
description:
The request containing the notification to be sent.

For each table, Polaris will reject any notification where the timestamp in the request body
is older than or equal to the most recent timestamp Polaris has already processed for the table.
The responsibility of ensuring the correct order of timestamps for a sequence of notifications
lies with the caller of the API. This includes managing potential clock skew or inconsistencies
when notifications are sent from multiple sources.
content:
application/json:
schema:
Expand All @@ -1001,6 +1008,16 @@ paths:
examples:
TableToLoadDoesNotExist:
$ref: '#/components/examples/NoSuchTableError'
409:
description:
Conflict - The timestamp of the received notification is older than or equal to the
most recent timestamp Polaris has already processed for the specified table.
content:
application/json:
schema:
$ref: '#/components/schemas/IcebergErrorResponse'
example:
$ref: '#/components/examples/OutOfOrderNotificationError'
419:
$ref: '#/components/responses/AuthenticationTimeoutResponse'
503:
Expand Down Expand Up @@ -3213,6 +3230,7 @@ components:
NotificationRequest:
required:
- notification-type
- payload
properties:
notification-type:
$ref: '#/components/schemas/NotificationType'
Expand Down Expand Up @@ -4130,6 +4148,18 @@ components:
"updates": {"owner": "Raoul"}
}

OutOfOrderNotificationError:
summary:
The timestamp of the received notification is older than or equal to the most recent timestamp
Polaris has already processed for the specified table.
value: {
"error": {
"message": "A notification with a newer timestamp has been admitted for table",
"type": "AlreadyExistsException",
"code": 409
}
}

securitySchemes:
OAuth2:
type: oauth2
Expand Down
Loading