Skip to content

Commit

Permalink
Improvements to prevent data loss in DynamoDB source (opensearch-project#3614)
Browse files Browse the repository at this point in the history

Signed-off-by: Aiden Dai <[email protected]>
  • Loading branch information
daixba authored Nov 12, 2023
1 parent 6e921ad commit cf6b8ee
Show file tree
Hide file tree
Showing 37 changed files with 1,670 additions and 998 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -9,11 +9,13 @@
import org.opensearch.dataprepper.model.source.coordinator.SourcePartitionStoreItem;

import java.time.Duration;
import java.util.List;
import java.util.Optional;

/**
* The interface to be implemented when creating a new store plugin for a {@link org.opensearch.dataprepper.model.source.coordinator.SourceCoordinator} to use
* to coordinate and save progress for processing source partitions
*
* @since 2.2
*/
public interface SourceCoordinationStore {
Expand All @@ -22,6 +24,17 @@ public interface SourceCoordinationStore {

Optional<SourcePartitionStoreItem> getSourcePartitionItem(final String sourceIdentifier, final String sourcePartitionKey);

/**
* Queries the partition items of a source that are in the given status, starting from the given priority.
* Note that an empty list is returned (rather than an absent value) when nothing is found.
*
* @param sourceIdentifier The identifier for the source
* @param sourcePartitionStatus Status of the partitions to look up
* @param startPartitionPriority Start time (priority) used as the lower bound of the query, passed as a string.
*                               NOTE(review): the DynamoDB-backed store applies this bound as a strict
*                               "greater than" on the sort key - confirm other stores treat it the same way.
* @return A list of {@link SourcePartitionStoreItem}; empty if no matching item was found
*/
List<SourcePartitionStoreItem> querySourcePartitionItemsByStatus(final String sourceIdentifier, final SourcePartitionStatus sourcePartitionStatus, final String startPartitionPriority);

boolean tryCreatePartitionItem(final String sourceIdentifier,
final String partitionKey,
final SourcePartitionStatus sourcePartitionStatus,
Expand All @@ -33,17 +46,19 @@ boolean tryCreatePartitionItem(final String sourceIdentifier,
* 1. The partition status is UNASSIGNED
* 2. The partition status is CLOSED and the reOpenAt timestamp has passed
* 3. The partition status is ASSIGNED and the partitionOwnershipTimeout has passed
*
* @param sourceIdentifier - The identifier for the source
* @param ownerId - The unique owner id for a sub-pipeline
* @param ownerId - The unique owner id for a sub-pipeline
* @param ownershipTimeout The amount of time before the ownership of the acquired partition expires
* @return The partition that was acquired successfully. Empty if no partition could be acquired.
*/
Optional<SourcePartitionStoreItem> tryAcquireAvailablePartition(final String sourceIdentifier, final String ownerId, final Duration ownershipTimeout);

/**
* This method attempts to update the partition item to the desired state.
*
* @param updateItem - The item to update in the source coordination store
* @throws org.opensearch.dataprepper.model.source.coordinator.exceptions.PartitionUpdateException when the partition was not updated successfully
*/
void tryUpdateSourcePartitionItem(final SourcePartitionStoreItem updateItem);
}
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,8 @@
package org.opensearch.dataprepper.model.source.coordinator.enhanced;

import java.time.Duration;
import java.time.Instant;
import java.util.List;
import java.util.Optional;

/**
Expand Down Expand Up @@ -38,14 +40,24 @@ public interface EnhancedSourceCoordinator {
* @param partitionType The partition type identifier
* @return A {@link EnhancedSourcePartition} instance
*/
Optional<EnhancedSourcePartition> acquireAvailablePartition(String partitionType);
Optional<EnhancedSourcePartition> acquireAvailablePartition(final String partitionType);


/**
* This method is used to query a list of completed partition items in the coordination store.
*
* @param partitionType Type of partition to query
* @param fromCompletionTime Lower bound on the completion time; only partitions completed since this
*                           instant are queried.
*                           NOTE(review): the DynamoDB-backed store compares with a strict "greater than",
*                           so a partition completed exactly at this instant may be excluded - confirm intent.
* @return A list of completed partitions
*/
List<EnhancedSourcePartition> queryCompletedPartitions(final String partitionType, final Instant fromCompletionTime);

/**
* This method is used to update progress state for a partition in the coordination store.
* It will also extend the timeout for ownership.
*
* @param partition The partition to be updated.
* @param <T> The progress state class
* @param partition The partition to be updated.
* @param <T> The progress state class
* @param ownershipTimeoutRenewal The amount of time to update ownership of the partition before another instance can acquire it.
* @throws org.opensearch.dataprepper.model.source.coordinator.exceptions.PartitionUpdateException when the partition was not updated successfully
*/
Expand Down Expand Up @@ -94,7 +106,7 @@ public interface EnhancedSourceCoordinator {
* @param partitionKey A unique key for that partition
* @return A {@link EnhancedSourcePartition} instance
*/
Optional<EnhancedSourcePartition> getPartition(String partitionKey);
Optional<EnhancedSourcePartition> getPartition(final String partitionKey);

/**
* This method is to perform initialization for the coordinator
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,9 +19,11 @@
import java.net.UnknownHostException;
import java.time.Duration;
import java.time.Instant;
import java.util.List;
import java.util.Objects;
import java.util.Optional;
import java.util.function.Function;
import java.util.stream.Collectors;


/**
Expand Down Expand Up @@ -112,9 +114,9 @@ public <T> boolean createPartition(EnhancedSourcePartition<T> partition) {


@Override
public Optional<EnhancedSourcePartition> acquireAvailablePartition(String partitionType) {
public Optional<EnhancedSourcePartition> acquireAvailablePartition(final String partitionType) {
// Not available for global state.
Objects.nonNull(partitionType);
Objects.requireNonNull(partitionType);
LOG.debug("Try to acquire an available {} partition", partitionType);
Optional<SourcePartitionStoreItem> sourceItem = coordinationStore.tryAcquireAvailablePartition(this.sourceIdentifier + "|" + partitionType, hostName, DEFAULT_LEASE_TIMEOUT);
if (sourceItem.isEmpty()) {
Expand All @@ -125,6 +127,23 @@ public Optional<EnhancedSourcePartition> acquireAvailablePartition(String partit
return Optional.of(partitionFactory.apply(sourceItem.get()));
}

/**
 * Queries the coordination store for partitions of the given type that are in
 * {@code COMPLETED} status, using {@code fromCompletionTime} (rendered through
 * {@link Instant#toString()}) as the starting priority of the query.
 * Every returned store item is converted with the partition factory, and the elapsed
 * time plus the number of items found are logged at INFO level.
 *
 * @param partitionType      The partition type identifier; must not be null
 * @param fromCompletionTime The instant from which completed partitions are queried; must not be null
 * @return The completed partitions built from the store items, in the order the store returned them
 * @throws NullPointerException if {@code partitionType} or {@code fromCompletionTime} is null
 */
@Override
public List<EnhancedSourcePartition> queryCompletedPartitions(final String partitionType, final Instant fromCompletionTime) {
    Objects.requireNonNull(partitionType);
    // Validate up front, like partitionType, so a null instant fails before any logging or
    // store access instead of surfacing as an anonymous NPE while building the query.
    Objects.requireNonNull(fromCompletionTime, "fromCompletionTime must not be null");
    LOG.debug("Try to query a list of completed {} partitions", partitionType);

    final long startTime = System.currentTimeMillis();
    final List<SourcePartitionStoreItem> sourcePartitionStoreItems = coordinationStore.querySourcePartitionItemsByStatus(
            this.sourceIdentifier + "|" + partitionType,
            SourcePartitionStatus.COMPLETED,
            fromCompletionTime.toString());

    final List<EnhancedSourcePartition> sourcePartitions = sourcePartitionStoreItems.stream()
            .map(partitionFactory::apply)
            .collect(Collectors.toList());
    final long endTime = System.currentTimeMillis();

    LOG.info("Query of completed partitions took {} milliseconds with {} items found", endTime - startTime, sourcePartitions.size());
    return sourcePartitions;
}

@Override
public <T> void saveProgressStateForPartition(EnhancedSourcePartition<T> partition, final Duration ownershipTimeoutRenewal) {
Expand All @@ -150,7 +169,7 @@ public <T> void saveProgressStateForPartition(EnhancedSourcePartition<T> partiti

@Override
public <T> void giveUpPartition(EnhancedSourcePartition<T> partition) {
Objects.nonNull(partition.getPartitionType());
Objects.requireNonNull(partition.getPartitionType());

LOG.debug("Try to give up the ownership for partition {} (Type {})", partition.getPartitionKey(), partition.getPartitionType());

Expand All @@ -173,7 +192,7 @@ public <T> void giveUpPartition(EnhancedSourcePartition<T> partition) {

@Override
public <T> void completePartition(EnhancedSourcePartition<T> partition) {
Objects.nonNull(partition.getPartitionType());
Objects.requireNonNull(partition.getPartitionType());

LOG.debug("Try to complete partition {} (Type {})", partition.getPartitionKey(), partition.getPartitionType());

Expand All @@ -197,7 +216,7 @@ public <T> void completePartition(EnhancedSourcePartition<T> partition) {
@Override
public <T> void closePartition(EnhancedSourcePartition<T> partition, final Duration reopenAfter, final int maxClosedCount) {

Objects.nonNull(partition.getPartitionType());
Objects.requireNonNull(partition.getPartitionType());

LOG.debug("Try to close partition {} (Type {})", partition.getPartitionKey(), partition.getPartitionType());
if (partition.getSourcePartitionStoreItem() == null) {
Expand Down Expand Up @@ -227,7 +246,7 @@ public <T> void closePartition(EnhancedSourcePartition<T> partition, final Durat


@Override
public Optional<EnhancedSourcePartition> getPartition(String partitionKey) {
public Optional<EnhancedSourcePartition> getPartition(final String partitionKey) {
// Default to Global State only.
final Optional<SourcePartitionStoreItem> sourceItem = coordinationStore.getSourcePartitionItem(this.sourceIdentifier + "|" + DEFAULT_GLOBAL_STATE_PARTITION_TYPE, partitionKey);
if (!sourceItem.isPresent()) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,8 @@

import java.time.Duration;
import java.time.Instant;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
Expand All @@ -51,6 +53,7 @@
public class DynamoDbClientWrapper {

private static final Logger LOG = LoggerFactory.getLogger(DynamoDbClientWrapper.class);
private static final int DEFAULT_QUERY_LIMIT = 1000;
static final String SOURCE_STATUS_COMBINATION_KEY_GLOBAL_SECONDARY_INDEX = "source-status";
static final String TTL_ATTRIBUTE_NAME = "expirationTime";

Expand Down Expand Up @@ -90,7 +93,7 @@ public void initializeTable(final DynamoStoreSettings dynamoStoreSettings,
}
}

try(final DynamoDbWaiter dynamoDbWaiter = DynamoDbWaiter.create()) {
try (final DynamoDbWaiter dynamoDbWaiter = DynamoDbWaiter.create()) {
final DescribeTableRequest describeTableRequest = DescribeTableRequest.builder().tableName(dynamoStoreSettings.getTableName()).build();
final ResponseOrException<DescribeTableResponse> response = dynamoDbWaiter
.waitUntilTableExists(describeTableRequest)
Expand Down Expand Up @@ -252,12 +255,36 @@ public Optional<SourcePartitionStoreItem> getAvailablePartition(final String own
}
}
}
} catch( final Exception e){
} catch (final Exception e) {
LOG.error("An exception occurred while attempting to acquire a DynamoDb partition item for {}", sourceStatusCombinationKey, e);
return Optional.empty();
}

return Optional.empty();
}


/**
 * Queries the {@code source-status} global secondary index for items whose partition key equals
 * {@code sourceStatusCombinationKey} and whose sort key is strictly greater than
 * {@code partitionPriority}, collecting the items of every returned page.
 * <p>
 * Any exception raised while querying is logged and not rethrown; the items gathered before
 * the failure (possibly none) are returned.
 *
 * @param sourceStatusCombinationKey The partition key value of the index to query
 * @param partitionPriority          The exclusive lower bound for the sort key
 * @return The matching items; empty if nothing was found or the query failed before any item was read
 */
public List<SourcePartitionStoreItem> queryPartitionsByStatus(final String sourceStatusCombinationKey, final String partitionPriority) {
    final List<SourcePartitionStoreItem> result = new ArrayList<>();
    try {
        final DynamoDbIndex<DynamoDbSourcePartitionItem> sourceStatusIndex = table.index(SOURCE_STATUS_COMBINATION_KEY_GLOBAL_SECONDARY_INDEX);
        final QueryEnhancedRequest queryEnhancedRequest = QueryEnhancedRequest.builder()
                .limit(DEFAULT_QUERY_LIMIT)
                .queryConditional(QueryConditional.sortGreaterThan(Key.builder().partitionValue(sourceStatusCombinationKey).sortValue(partitionPriority).build()))
                .build();

        final SdkIterable<Page<DynamoDbSourcePartitionItem>> availableItems = sourceStatusIndex.query(queryEnhancedRequest);

        // Iterate over every page so the result is not cut off at the first page.
        for (final Page<DynamoDbSourcePartitionItem> page : availableItems) {
            result.addAll(page.items());
        }
    } catch (final Exception e) {
        // The exception is passed as the trailing argument WITHOUT its own placeholder so the
        // logging framework records it as the throwable (with stack trace), matching the other
        // error logs in this class.
        LOG.error("An exception occurred while attempting to query partition items with {}", sourceStatusCombinationKey, e);
    }

    return result;
}
}

Original file line number Diff line number Diff line change
Expand Up @@ -17,16 +17,18 @@

import java.time.Duration;
import java.time.Instant;
import java.util.List;
import java.util.Objects;
import java.util.Optional;

/**
* An implementation of {@link org.opensearch.dataprepper.model.source.SourceCoordinationStore} when DynamoDB is used as the distributed store
* for source_coordination
*
* @since 2.2
*/

@DataPrepperPlugin(name = "dynamodb", pluginType = SourceCoordinationStore.class, pluginConfigurationType = DynamoStoreSettings.class )
@DataPrepperPlugin(name = "dynamodb", pluginType = SourceCoordinationStore.class, pluginConfigurationType = DynamoStoreSettings.class)
public class DynamoDbSourceCoordinationStore implements SourceCoordinationStore {

private static final Logger LOG = LoggerFactory.getLogger(DynamoDbSourceCoordinationStore.class);
Expand All @@ -42,9 +44,9 @@ public DynamoDbSourceCoordinationStore(final DynamoStoreSettings dynamoStoreSett
this.dynamoStoreSettings = dynamoStoreSettings;
this.pluginMetrics = pluginMetrics;
this.dynamoDbClientWrapper = DynamoDbClientWrapper.create(
dynamoStoreSettings.getRegion(),
dynamoStoreSettings.getStsRoleArn(),
dynamoStoreSettings.getStsExternalId());
dynamoStoreSettings.getRegion(),
dynamoStoreSettings.getStsRoleArn(),
dynamoStoreSettings.getStsExternalId());
}

@Override
Expand All @@ -58,6 +60,13 @@ public Optional<SourcePartitionStoreItem> getSourcePartitionItem(final String so
return dynamoDbClientWrapper.getSourcePartitionItem(sourceIdentifier, sourcePartitionKey);
}

/**
 * Builds the combined source/status key from the source identifier and the partition status,
 * then delegates the lookup to the DynamoDB client wrapper.
 *
 * @param sourceIdentifier       The identifier for the source
 * @param sourcePartitionStatus  Status of the partitions to look up
 * @param startPartitionPriority Priority passed through unchanged to the client wrapper query
 * @return The items returned by the client wrapper
 */
@Override
public List<SourcePartitionStoreItem> querySourcePartitionItemsByStatus(final String sourceIdentifier, final SourcePartitionStatus sourcePartitionStatus, final String startPartitionPriority) {
    return dynamoDbClientWrapper.queryPartitionsByStatus(
            String.format(SOURCE_STATUS_COMBINATION_KEY_FORMAT, sourceIdentifier, sourcePartitionStatus),
            startPartitionPriority);
}


@Override
public boolean tryCreatePartitionItem(final String sourceIdentifier,
final String sourcePartitionKey,
Expand Down
Loading

0 comments on commit cf6b8ee

Please sign in to comment.