Skip to content

Commit

Permalink
Retry on dynamic index creation when an OpenSearchException is thrown
Browse files Browse the repository at this point in the history
Signed-off-by: Chase Engelbrecht <[email protected]>
  • Loading branch information
engechas committed Oct 23, 2023
1 parent d118cf6 commit e338559
Show file tree
Hide file tree
Showing 3 changed files with 90 additions and 3 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -331,8 +331,8 @@ public void doOutput(final Collection<Record<Event>> records) {
String indexName = configuredIndexAlias;
try {
indexName = indexManager.getIndexName(event.formatString(indexName, expressionEvaluator));
} catch (IOException | EventKeyNotFoundException e) {
LOG.error("There was an exception when constructing the index name. Check the dlq if configured to see details about the affected Event: {}", e.getMessage());
} catch (final Exception e) {
LOG.error("There was an exception when constructing the index name. Check the dlq if configured to see details about the affected Event: {}", e.getMessage(), e);
dynamicIndexDroppedEvents.increment();
logFailureForDlqObjects(List.of(DlqObject.builder()
.withEventHandle(event.getEventHandle())
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,18 +5,25 @@

package org.opensearch.dataprepper.plugins.sink.opensearch.index;

import org.opensearch.client.opensearch._types.OpenSearchException;
import org.opensearch.client.RestHighLevelClient;
import org.opensearch.client.opensearch.OpenSearchClient;
import org.opensearch.dataprepper.plugins.sink.opensearch.OpenSearchSinkConfiguration;

import com.google.common.cache.Cache;
import com.google.common.cache.CacheBuilder;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.IOException;
import java.util.concurrent.TimeUnit;

import static com.google.common.base.Preconditions.checkNotNull;

public class DynamicIndexManager extends AbstractIndexManager {
private static final Logger LOG = LoggerFactory.getLogger(DynamicIndexManager.class);
private static final int INDEX_SETUP_RETRY_WAIT_TIME_MS = 1000;

private Cache<String, IndexManager> indexManagerCache;
final int CACHE_EXPIRE_AFTER_ACCESS_TIME_MINUTES = 30;
final int APPROXIMATE_INDEX_MANAGER_SIZE = 32;
Expand Down Expand Up @@ -72,9 +79,27 @@ public String getIndexName(final String dynamicIndexAlias) throws IOException {
indexManager = indexManagerFactory.getIndexManager(
indexType, openSearchClient, restHighLevelClient, openSearchSinkConfiguration, templateStrategy, fullIndexAlias);
indexManagerCache.put(fullIndexAlias, indexManager);
indexManager.setupIndex();
setupIndexWithRetries(indexManager);
}
return indexManager.getIndexName(fullIndexAlias);
}

private void setupIndexWithRetries(final IndexManager indexManager) throws IOException {
boolean isIndexSetup = false;

while (!isIndexSetup) {
try {
indexManager.setupIndex();
isIndexSetup = true;
} catch (final OpenSearchException e) {
LOG.warn("Failed to setup dynamic index with an exception. ", e);
try {
Thread.sleep(INDEX_SETUP_RETRY_WAIT_TIME_MS);
} catch (final InterruptedException ex) {
LOG.warn("Interrupted while sleeping between index setup retries");
}
}
}
}
}

Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.mockito.Mock;
import org.opensearch.client.opensearch._types.OpenSearchException;
import org.opensearch.client.IndicesClient;
import org.opensearch.client.RestHighLevelClient;
import org.opensearch.client.opensearch.OpenSearchClient;
Expand All @@ -28,6 +29,7 @@
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.anyString;
import static org.mockito.ArgumentMatchers.eq;
import static org.mockito.Mockito.doThrow;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
Expand Down Expand Up @@ -165,4 +167,64 @@ public void missingDynamicIndexTest() throws IOException {
JacksonEvent event = JacksonEvent.builder().withEventType(EVENT_TYPE).withData(Map.of(RandomStringUtils.randomAlphabetic(10), DYNAMIC)).build();
assertThrows(EventKeyNotFoundException.class, () -> dynamicIndexManager.getIndexName(event.formatString(configuredIndexAlias)));
}

@Test
public void getIndexName_DoesNotRetryOnNonOpenSearchExceptions() throws IOException {
when(indexConfiguration.getIndexAlias()).thenReturn(INDEX_ALIAS);
String configuredIndexAlias = openSearchSinkConfiguration.getIndexConfiguration().getIndexAlias();
String expectedIndexAlias = INDEX_ALIAS.replace("${" + ID + "}", DYNAMIC);
innerIndexManager = mock(IndexManager.class);
when(mockIndexManagerFactory.getIndexManager(
IndexType.CUSTOM, openSearchClient, restHighLevelClient, openSearchSinkConfiguration, templateStrategy, expectedIndexAlias)).thenReturn(innerIndexManager);
doThrow(new RuntimeException())
.when(innerIndexManager).setupIndex();

JacksonEvent event = JacksonEvent.builder().withEventType(EVENT_TYPE).withData(Map.of(ID, DYNAMIC)).build();
assertThrows(RuntimeException.class, () -> dynamicIndexManager.getIndexName(event.formatString(configuredIndexAlias)));

verify(innerIndexManager, times(1)).setupIndex();
}

@Test
public void getIndexName_DoesRetryOnOpenSearchExceptions_UntilSuccess() throws IOException {
when(indexConfiguration.getIndexAlias()).thenReturn(INDEX_ALIAS);
when(clusterSettingsParser.getStringValueClusterSetting(any(GetClusterSettingsResponse.class), eq(IndexConstants.ISM_ENABLED_SETTING)))
.thenReturn("true");
String configuredIndexAlias = openSearchSinkConfiguration.getIndexConfiguration().getIndexAlias();
String expectedIndexAlias = INDEX_ALIAS.replace("${" + ID + "}", DYNAMIC);
innerIndexManager = mock(IndexManager.class);
when(mockIndexManagerFactory.getIndexManager(
IndexType.CUSTOM, openSearchClient, restHighLevelClient, openSearchSinkConfiguration, templateStrategy, expectedIndexAlias)).thenReturn(innerIndexManager);
doThrow(new OpenSearchException(new RuntimeException()))
.doThrow(new OpenSearchException(new RuntimeException()))
.doNothing()
.when(innerIndexManager).setupIndex();
when(innerIndexManager.getIndexName(expectedIndexAlias)).thenReturn(expectedIndexAlias);

JacksonEvent event = JacksonEvent.builder().withEventType(EVENT_TYPE).withData(Map.of(ID, DYNAMIC)).build();
final String indexName = dynamicIndexManager.getIndexName(event.formatString(configuredIndexAlias));
assertThat(expectedIndexAlias, equalTo(indexName));

verify(innerIndexManager, times(3)).setupIndex();
}

@Test
public void getIndexName_DoesRetryOnOpenSearchExceptions_UntilFailure() throws IOException {
when(indexConfiguration.getIndexAlias()).thenReturn(INDEX_ALIAS);
String configuredIndexAlias = openSearchSinkConfiguration.getIndexConfiguration().getIndexAlias();
String expectedIndexAlias = INDEX_ALIAS.replace("${" + ID + "}", DYNAMIC);
innerIndexManager = mock(IndexManager.class);
when(mockIndexManagerFactory.getIndexManager(
IndexType.CUSTOM, openSearchClient, restHighLevelClient, openSearchSinkConfiguration, templateStrategy, expectedIndexAlias)).thenReturn(innerIndexManager);
doThrow(new OpenSearchException(new RuntimeException()))
.doThrow(new OpenSearchException(new RuntimeException()))
.doThrow(new RuntimeException())
.when(innerIndexManager).setupIndex();
when(innerIndexManager.getIndexName(expectedIndexAlias)).thenReturn(expectedIndexAlias);

JacksonEvent event = JacksonEvent.builder().withEventType(EVENT_TYPE).withData(Map.of(ID, DYNAMIC)).build();
assertThrows(RuntimeException.class, () -> dynamicIndexManager.getIndexName(event.formatString(configuredIndexAlias)));

verify(innerIndexManager, times(3)).setupIndex();
}
}

0 comments on commit e338559

Please sign in to comment.