OAK-11545 - Use a single long-lived bulk processor for Elastic reindexing #2183

Open. Wants to merge 17 commits into base: trunk.
Changes from all commits
@@ -18,7 +18,6 @@
  */
 package org.apache.jackrabbit.oak.plugins.index;
 
-import java.io.Closeable;
 import java.io.IOException;
 import java.util.ArrayList;
 import java.util.Arrays;
@@ -166,12 +165,7 @@ private void registerAsyncReindexSupport(Whiteboard whiteboard) {
                 registerMBean(whiteboard, PropertyIndexAsyncReindexMBean.class, asyncPI,
                         PropertyIndexAsyncReindexMBean.TYPE, "async"),
                 registerMBean(whiteboard, IndexStatsMBean.class, task.getIndexStats(), IndexStatsMBean.TYPE, name));
-        closer.register(new Closeable() {
-            @Override
-            public void close() throws IOException {
-                reg.unregister();
-            }
-        });
+        closer.register(reg::unregister);
     }
 
     @Deactivate
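Note: `Closeable` has a single abstract method, so the anonymous class can be replaced by a method reference. `reg.unregister()` declares no checked exception, which is compatible with `close() throws IOException`:

```java
// The two registrations are equivalent; the method reference is just shorter:
closer.register(reg::unregister);          // after this PR
closer.register(() -> reg.unregister());   // same thing, written as a lambda
```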
@@ -19,15 +19,17 @@
 
 package org.apache.jackrabbit.oak.index.indexer.document;
 
+import org.apache.jackrabbit.oak.api.CommitFailedException;
+import org.apache.jackrabbit.oak.plugins.document.NodeDocument;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
 import java.io.IOException;
 import java.util.HashSet;
 import java.util.List;
 import java.util.Set;
 import java.util.stream.Collectors;
 
-import org.apache.jackrabbit.oak.api.CommitFailedException;
-import org.apache.jackrabbit.oak.plugins.document.NodeDocument;
-
 import static java.util.Objects.requireNonNull;
 
 /**
@@ -36,6 +38,8 @@
  */
 public class CompositeIndexer implements NodeStateIndexer {
 
+    private final static Logger LOG = LoggerFactory.getLogger(CompositeIndexer.class);
+
     private final List<NodeStateIndexer> indexers;
 
     public CompositeIndexer(List<NodeStateIndexer> indexers) {
@@ -98,7 +102,13 @@ public String getIndexName() {
 
     @Override
     public void close() throws IOException {
-
+        for (NodeStateIndexer indexer : indexers) {
+            try {
+                indexer.close();
+            } catch (IOException e) {
+                LOG.warn("Error closing indexer {}. Suppressing exception.", indexer, e);
+            }
+        }
     }
 
     public List<NodeStateIndexer> getIndexers() {
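Note: `CompositeIndexer.close()` was previously a no-op; it now closes every child indexer, logging and swallowing each per-indexer `IOException` so that one failing indexer cannot prevent the others from releasing their resources. A minimal JUnit 4 / Mockito sketch of that contract (the test class and setup are illustrative, not part of this PR):

```java
import static org.mockito.Mockito.doThrow;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.verify;

import java.io.IOException;
import java.util.List;
import org.junit.Test;

public class CompositeIndexerCloseTest {

    @Test
    public void closeReachesAllIndexersDespiteFailure() throws IOException {
        NodeStateIndexer failing = mock(NodeStateIndexer.class);
        NodeStateIndexer healthy = mock(NodeStateIndexer.class);
        doThrow(new IOException("boom")).when(failing).close();

        // close() must not propagate the IOException and must still
        // reach the second indexer.
        new CompositeIndexer(List.of(failing, healthy)).close();

        verify(healthy).close();
    }
}
```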
@@ -378,41 +378,38 @@ public void reindex() throws CommitFailedException, IOException {
         progressReporter.reset();
 
         progressReporter.reindexingTraversalStart("/");
 
         NodeBuilder builder = copyOnWriteStore.getRoot().builder();
-        CompositeIndexer compositeIndexer = prepareIndexers(copyOnWriteStore, builder, progressReporter);
-        closer.register(compositeIndexer);
-        preIndexOperations(compositeIndexer.getIndexers());
-
         INDEXING_PHASE_LOGGER.info("[TASK:INDEXING:START] Starting indexing");
         Stopwatch indexerWatch = Stopwatch.createStarted();
-        try {
-            if (indexStores.size() > 1) {
-                indexParallel(indexStores, compositeIndexer, progressReporter);
-            } else if (indexStores.size() == 1) {
-                IndexStore indexStore = indexStores.get(0);
-                TopKSlowestPaths slowestTopKElements = new TopKSlowestPaths(TOP_SLOWEST_PATHS_TO_LOG);
-                compositeIndexer.onIndexingStarting();
-                long entryStart = System.nanoTime();
-                for (NodeStateEntry entry : indexStore) {
-                    reportDocumentRead(entry.getPath(), progressReporter);
-                    compositeIndexer.index(entry);
-                    // Avoid calling System.nanoTime() twice per each entry, by reusing the timestamp taken at the end
-                    // of indexing an entry as the start time of the following entry. This is less accurate, because
-                    // the measured times will also include the bookkeeping at the end of indexing each entry, but
-                    // we are only interested in entries that take a significant time to index, so this extra
-                    // inaccuracy will not significantly change the results.
-                    long entryEnd = System.nanoTime();
-                    long elapsedMillis = (entryEnd - entryStart) / 1_000_000;
-                    entryStart = entryEnd;
-                    slowestTopKElements.add(entry.getPath(), elapsedMillis);
-                    if (elapsedMillis > 1000) {
-                        log.info("Indexing {} took {} ms", entry.getPath(), elapsedMillis);
-                    }
-                }
-                log.info("Top slowest nodes to index (ms): {}", slowestTopKElements);
-            }
+        try (CompositeIndexer compositeIndexer = prepareIndexers(copyOnWriteStore, builder, progressReporter)) {
+            preIndexOperations(compositeIndexer.getIndexers());
+            if (indexStores.size() > 1) {
+                indexParallel(indexStores, compositeIndexer, progressReporter);
+            } else if (indexStores.size() == 1) {
+                IndexStore indexStore = indexStores.get(0);
+                TopKSlowestPaths slowestTopKElements = new TopKSlowestPaths(TOP_SLOWEST_PATHS_TO_LOG);
+                compositeIndexer.onIndexingStarting();
+                long entryStart = System.nanoTime();
+                for (NodeStateEntry entry : indexStore) {
+                    reportDocumentRead(entry.getPath(), progressReporter);
+                    compositeIndexer.index(entry);
+                    // Avoid calling System.nanoTime() twice per each entry, by reusing the timestamp taken at the end
+                    // of indexing an entry as the start time of the following entry. This is less accurate, because
+                    // the measured times will also include the bookkeeping at the end of indexing each entry, but
+                    // we are only interested in entries that take a significant time to index, so this extra
+                    // inaccuracy will not significantly change the results.
+                    long entryEnd = System.nanoTime();
+                    long elapsedMillis = (entryEnd - entryStart) / 1_000_000;
+                    entryStart = entryEnd;
+                    slowestTopKElements.add(entry.getPath(), elapsedMillis);
+                    if (elapsedMillis > 1000) {
+                        log.info("Indexing {} took {} ms", entry.getPath(), elapsedMillis);
+                    }
+                }
+                log.info("Top slowest nodes to index (ms): {}", slowestTopKElements);
+            }
 
             for (NodeStateIndexerProvider indexerProvider : indexerProviders) {
                 ExtractedTextCache extractedTextCache = indexerProvider.getTextCache();
                 CacheStats cacheStats = extractedTextCache == null ? null : extractedTextCache.getCacheStats();
@@ -545,7 +542,6 @@ protected CompositeIndexer prepareIndexers(NodeStore copyOnWriteStore, NodeBuild
             NodeStateIndexer indexer = indexerProvider.getIndexer(type, indexPath, idxBuilder, root, progressReporter);
             if (indexer != null) {
                 indexers.add(indexer);
-                closer.register(indexer);
                 progressReporter.registerIndex(indexPath, true, -1);
             }
         }
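Note: the indexer lifecycle changes in two ways here. `prepareIndexers` no longer registers each `NodeStateIndexer` with the service-scoped `closer`, and `reindex()` no longer registers the `CompositeIndexer` either; instead the composite is managed by try-with-resources, so `close()` now runs as soon as the indexing phase completes (or fails) rather than at service shutdown. That deterministic close is what gives the Elastic indexer a well-defined point at which to flush and stop its bulk processor.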
@@ -24,6 +24,7 @@
 import org.apache.jackrabbit.oak.plugins.index.elastic.ElasticIndexDefinition;
 import org.apache.jackrabbit.oak.plugins.index.elastic.ElasticIndexTracker;
 import org.apache.jackrabbit.oak.plugins.index.elastic.ElasticMetricHandler;
+import org.apache.jackrabbit.oak.plugins.index.elastic.index.ElasticBulkProcessorHandler;
 import org.apache.jackrabbit.oak.plugins.index.elastic.index.ElasticDocument;
 import org.apache.jackrabbit.oak.plugins.index.elastic.index.ElasticIndexEditorProvider;
 import org.apache.jackrabbit.oak.plugins.index.elastic.index.ElasticIndexWriterFactory;
@@ -36,10 +37,11 @@
 import org.apache.jackrabbit.oak.spi.state.NodeState;
 import org.apache.jackrabbit.oak.stats.StatisticsProvider;
 import org.jetbrains.annotations.NotNull;
-
 import org.jetbrains.annotations.Nullable;
 
+import java.io.IOException;
 import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
 
 import static org.apache.jackrabbit.oak.plugins.index.IndexConstants.TYPE_PROPERTY_NAME;

@@ -49,12 +51,16 @@ public class ElasticIndexerProvider implements NodeStateIndexerProvider {
     private final IndexHelper indexHelper;
     private final ElasticIndexWriterFactory indexWriterFactory;
     private final ElasticConnection connection;
+    private final ElasticBulkProcessorHandler bulkProcessorHandler;
+    private final AtomicBoolean closed = new AtomicBoolean(false);
 
     public ElasticIndexerProvider(IndexHelper indexHelper, ElasticConnection connection) {
         this.indexHelper = indexHelper;
-        this.indexWriterFactory = new ElasticIndexWriterFactory(connection,
-                new ElasticIndexTracker(connection, new ElasticMetricHandler(StatisticsProvider.NOOP)));
         this.connection = connection;
+        this.bulkProcessorHandler = new ElasticBulkProcessorHandler(connection);
+        this.indexWriterFactory = new ElasticIndexWriterFactory(connection,
+                new ElasticIndexTracker(connection, new ElasticMetricHandler(StatisticsProvider.NOOP)), bulkProcessorHandler);
 
     }
 
     @Override
@@ -79,5 +85,9 @@ public ExtractedTextCache getTextCache() {
     }
 
     @Override
-    public void close() {}
+    public void close() throws IOException {
+        if (closed.compareAndSet(false, true)) {
+            this.bulkProcessorHandler.close();
+        }
+    }
 }
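Note: this is the heart of the PR. The provider now creates one `ElasticBulkProcessorHandler` up front and threads it through `ElasticIndexWriterFactory`, so every writer produced during reindexing shares a single long-lived bulk processor instead of each index spinning up and tearing down its own. The `AtomicBoolean` makes `close()` idempotent, so the handler is flushed and stopped exactly once. A usage sketch (assuming `NodeStateIndexerProvider` extends `Closeable`, as the `close()` override suggests; the single-argument `ElasticBulkProcessorHandler` constructor is taken from this diff):

```java
void reindexAll(IndexHelper indexHelper, ElasticConnection connection) throws IOException {
    // One provider == one shared bulk processor for the entire reindexing run.
    try (ElasticIndexerProvider provider = new ElasticIndexerProvider(indexHelper, connection)) {
        // Every NodeStateIndexer obtained from this provider writes through the
        // same ElasticBulkProcessorHandler, so bulk batching spans all indexes
        // being rebuilt instead of restarting per index definition.
        // ... create indexers via provider.getIndexer(...) and index entries ...
    }
    // provider.close() has flushed and stopped the shared bulk processor; a second
    // close() would be a harmless no-op thanks to the AtomicBoolean guard.
}
```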
@@ -26,6 +26,8 @@
 import org.apache.jackrabbit.oak.plugins.index.elastic.ElasticConnection;
 import org.apache.jackrabbit.oak.plugins.index.elastic.ElasticIndexDefinition;
 import org.apache.jackrabbit.oak.plugins.index.elastic.ElasticIndexTracker;
+import org.apache.jackrabbit.oak.plugins.index.elastic.index.ElasticBulkProcessorHandler;
+import org.apache.jackrabbit.oak.plugins.index.elastic.index.ElasticDocument;
 import org.apache.jackrabbit.oak.plugins.index.elastic.index.ElasticIndexEditorProvider;
 import org.apache.jackrabbit.oak.plugins.index.elastic.index.ElasticIndexWriterFactory;
@@ -71,8 +73,8 @@ public void nodeIndexed_WithIncludedPaths() throws Exception {
         when(elasticsearchAsyncClientMock._jsonpMapper()).thenReturn(jsonMapperMock);
         when(elasticConnectionMock.getAsyncClient()).thenReturn(elasticsearchAsyncClientMock);
 
-        FulltextIndexWriter indexWriter = new ElasticIndexWriterFactory(elasticConnectionMock,
-                mock(ElasticIndexTracker.class)).newInstance(idxDefn, defn.builder(), CommitInfo.EMPTY, false);
+        FulltextIndexWriter<ElasticDocument> indexWriter = new ElasticIndexWriterFactory(elasticConnectionMock,
+                mock(ElasticIndexTracker.class), mock(ElasticBulkProcessorHandler.class)).newInstance(idxDefn, defn.builder(), CommitInfo.EMPTY, false);
         ElasticIndexer indexer = new ElasticIndexer(idxDefn, mock(FulltextBinaryTextExtractor.class), builder,
                 mock(IndexingProgressReporter.class), indexWriter, mock(ElasticIndexEditorProvider.class), mock(IndexHelper.class));
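Note: the test adapts to the factory's new three-argument constructor by passing a mocked `ElasticBulkProcessorHandler`, and tightens the raw `FulltextIndexWriter` type to `FulltextIndexWriter<ElasticDocument>` (hence the new `ElasticDocument` import above).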
@@ -44,15 +44,6 @@ public class ElasticIndexDefinition extends IndexDefinition {
 
     public static final String TYPE_ELASTICSEARCH = "elasticsearch";
 
-    public static final String BULK_ACTIONS = "bulkActions";
-    public static final int BULK_ACTIONS_DEFAULT = 250;
-
-    public static final String BULK_SIZE_BYTES = "bulkSizeBytes";
-    public static final long BULK_SIZE_BYTES_DEFAULT = 1024 * 1024; // 1MB
-
-    public static final String BULK_FLUSH_INTERVAL_MS = "bulkFlushIntervalMs";
-    public static final long BULK_FLUSH_INTERVAL_MS_DEFAULT = 3000;
-
     public static final String NUMBER_OF_SHARDS = "numberOfShards";
     public static final int NUMBER_OF_SHARDS_DEFAULT = 1;
 
@@ -82,10 +73,6 @@ public class ElasticIndexDefinition extends IndexDefinition {
     public static final String LIMIT_TOTAL_FIELDS = "limitTotalFields";
     public static final long LIMIT_TOTAL_FIELDS_DEFAULT = 1000L;
 
-    // when true, fails indexing in case of bulk failures
-    public static final String FAIL_ON_ERROR = "failOnError";
-    public static final boolean FAIL_ON_ERROR_DEFAULT = true;
-
     /**
      * When 0, the index name gets dynamically generated by adding a random suffix to the index name.
      */
@@ -178,9 +165,6 @@
 
     private final String indexPrefix;
     private final String indexAlias;
-    public final int bulkActions;
-    public final long bulkSizeBytes;
-    public final long bulkFlushIntervalMs;
     private final boolean similarityTagsEnabled;
     private final float similarityTagsBoost;
     public final int numberOfShards;
@@ -189,7 +173,6 @@
     public final long queryTimeoutMs;
     public final Integer trackTotalHits;
     public final String dynamicMapping;
-    public final boolean failOnError;
     public final long indexNameSeed;
     public final InferenceDefinition inferenceDefinition;
     public final long limitTotalFields;
@@ -211,9 +194,6 @@ public ElasticIndexDefinition(NodeState root, NodeState defn, String indexPath,
         } else {
             this.indexAlias = ElasticIndexNameHelper.getElasticSafeIndexName(indexPrefix, getIndexPath());
         }
-        this.bulkActions = getOptionalValue(defn, BULK_ACTIONS, BULK_ACTIONS_DEFAULT);
-        this.bulkSizeBytes = getOptionalValue(defn, BULK_SIZE_BYTES, BULK_SIZE_BYTES_DEFAULT);
-        this.bulkFlushIntervalMs = getOptionalValue(defn, BULK_FLUSH_INTERVAL_MS, BULK_FLUSH_INTERVAL_MS_DEFAULT);
         this.numberOfShards = getOptionalValue(defn, NUMBER_OF_SHARDS, NUMBER_OF_SHARDS_DEFAULT);
         this.numberOfReplicas = getOptionalValue(defn, NUMBER_OF_REPLICAS, NUMBER_OF_REPLICAS_DEFAULT);
         this.similarityTagsEnabled = getOptionalValue(defn, SIMILARITY_TAGS_ENABLED, SIMILARITY_TAGS_ENABLED_DEFAULT);
@@ -223,9 +203,6 @@
         this.queryTimeoutMs = getOptionalValue(defn, QUERY_TIMEOUT_MS, QUERY_TIMEOUT_MS_DEFAULT);
         this.trackTotalHits = getOptionalValue(defn, TRACK_TOTAL_HITS, TRACK_TOTAL_HITS_DEFAULT);
         this.dynamicMapping = getOptionalValue(defn, DYNAMIC_MAPPING, DYNAMIC_MAPPING_DEFAULT);
-        this.failOnError = getOptionalValue(defn, FAIL_ON_ERROR,
-                Boolean.parseBoolean(System.getProperty(TYPE_ELASTICSEARCH + "." + FAIL_ON_ERROR, Boolean.toString(FAIL_ON_ERROR_DEFAULT)))
-        );
         this.indexNameSeed = getOptionalValue(defn, INDEX_NAME_SEED, INDEX_NAME_SEED_DEFAULT);
         this.similarityTagsFields = getOptionalValues(defn, SIMILARITY_TAGS_FIELDS, Type.STRINGS, String.class, SIMILARITY_TAGS_FIELDS_DEFAULT);
         this.limitTotalFields = getOptionalValue(defn, LIMIT_TOTAL_FIELDS, LIMIT_TOTAL_FIELDS_DEFAULT);
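Note: with batching now owned by the shared `ElasticBulkProcessorHandler`, the per-index bulk tuning options (`bulkActions`, `bulkSizeBytes`, `bulkFlushIntervalMs`) and the `failOnError` flag, together with their defaults and the `elasticsearch.failOnError` system-property fallback, are removed from `ElasticIndexDefinition`. Index definitions that still set these properties will simply have them ignored; equivalent knobs, if any, would now live with the bulk processor handler (not shown in this diff).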