OAK-11290 - Improve logging of publish indexing job (#1887)
nfsantos authored Dec 6, 2024
1 parent e18d69c commit b74629c
Showing 9 changed files with 169 additions and 72 deletions.
IndexImporter.java
@@ -106,16 +106,16 @@ public IndexImporter(NodeStore nodeStore, File indexDir, IndexEditorProvider ind
AsyncIndexerLock indexerLock, StatisticsProvider statisticsProvider, IndexingReporter indexingReporter) throws IOException {
this.statisticsProvider = statisticsProvider;
this.indexingReporter = indexingReporter;
checkArgument(indexDir.exists() && indexDir.isDirectory(), "Path [%s] does not point " +
"to existing directory", indexDir.getAbsolutePath());
checkArgument(indexDir.exists() && indexDir.isDirectory(),
"Path [%s] does not point to existing directory", indexDir.getAbsolutePath());
this.nodeStore = nodeStore;
this.indexDir = indexDir;
this.indexEditorProvider = indexEditorProvider;
indexerInfo = IndexerInfo.fromDirectory(indexDir);
this.indexerInfo = IndexerInfo.fromDirectory(indexDir);
this.indexerLock = indexerLock;
indexes = indexerInfo.getIndexes();
indexedState = requireNonNull(nodeStore.retrieve(indexerInfo.checkpoint), String.format("Cannot retrieve " +
"checkpointed state [%s]", indexerInfo.checkpoint));
this.indexes = indexerInfo.getIndexes();
this.indexedState = requireNonNull(nodeStore.retrieve(indexerInfo.checkpoint),
"Cannot retrieve checkpointed state [" + indexerInfo.checkpoint + "]");
this.indexDefinitionUpdater = new IndexDefinitionUpdater(new File(indexDir, INDEX_DEFINITIONS_JSON));
this.asyncLaneToIndexMapping = mapIndexesToLanes(indexes);
this.indexPathsToUpdate = new HashSet<>();
@@ -127,7 +127,7 @@ enum IndexImportState {

public void importIndex() throws IOException, CommitFailedException {
try {
if (indexes.keySet().isEmpty()) {
if (indexes.isEmpty()) {
LOG.warn("No indexes to import (possibly index definitions outside of a oak:index node?)");
}
LOG.info("Proceeding to import {} indexes from {}", indexes.keySet(), indexDir.getAbsolutePath());
@@ -178,10 +178,9 @@ public void importIndex() throws IOException, CommitFailedException {
mergeWithConcurrentCheck(nodeStore, builder);
});
} catch (CommitFailedException commitFailedException) {
LOG.error("Unable to revert back index lanes for: "
+ indexPathsToUpdate.stream()
.collect(StringBuilder::new, StringBuilder::append, (a, b) -> a.append(",").append(b)),
commitFailedException);
LOG.error("Unable to revert back index lanes for: {}",
indexPathsToUpdate.stream()
.collect(StringBuilder::new, StringBuilder::append, (a, b) -> a.append(",").append(b)), commitFailedException);
throw e;
}
}
@@ -265,12 +264,12 @@ private void bringAsyncIndexUpToDate(String laneName, List<IndexInfo> indexInfos
boolean success = false;
try {
String checkpoint = getAsync().getString(laneName);
requireNonNull(checkpoint, String.format("No current checkpoint found for lane [%s]", laneName));
requireNonNull(checkpoint, "No current checkpoint found for lane [" + laneName + "]");

//TODO Support case where checkpoint got lost or complete reindexing is done

NodeState after = nodeStore.retrieve(checkpoint);
requireNonNull(after, String.format("No state found for checkpoint [%s] for lane [%s]", checkpoint, laneName));
requireNonNull(after, "No state found for checkpoint [" + checkpoint + "] for lane [" + laneName + "]");
LOG.info("Proceeding to update imported indexes {} to checkpoint [{}] for lane [{}]",
indexInfos, checkpoint, laneName);

@@ -297,8 +296,8 @@ private void bringAsyncIndexUpToDate(String laneName, List<IndexInfo> indexInfos
updateIndexImporterState(builder, IndexImportState.IMPORT_INDEX_DATA, IndexImportState.BRING_INDEX_UPTODATE, false);
mergeWithConcurrentCheck(nodeStore, builder);
success = true;
LOG.info("Imported index is updated to repository state at checkpoint [{}] for " +
"indexing lane [{}]", checkpoint, laneName);
LOG.info("Imported index is updated to repository state at checkpoint [{}] for indexing lane [{}]",
checkpoint, laneName);
} catch (CommitFailedException e) {
LOG.error("Failed while performing bringIndexUpToDate and updating indexImportState from [{}] to [{}]",
IndexImportState.IMPORT_INDEX_DATA, IndexImportState.BRING_INDEX_UPTODATE);
@@ -375,7 +374,7 @@ private LockToken interruptCurrentIndexing(String laneName) throws CommitFailedE

private IndexImporterProvider getImporter(String type) {
IndexImporterProvider provider = importers.get(type);
return requireNonNull(provider, String.format("No IndexImporterProvider found for type [%s]", type));
return requireNonNull(provider, "No IndexImporterProvider found for type [" + type + "]");
}

private ListMultimap<String, IndexInfo> mapIndexesToLanes(Map<String, File> indexes) {
@@ -391,7 +390,7 @@ private ListMultimap<String, IndexInfo> mapIndexesToLanes(Map<String, File> inde
boolean newIndex = !NodeStateUtils.getNode(rootState, indexPath).exists();

String type = indexState.getString(IndexConstants.TYPE_PROPERTY_NAME);
requireNonNull(type, String.format("No 'type' property found for index at path [%s]", indexPath));
requireNonNull(type, "No 'type' property found for index at path [" + indexPath + "]");

String asyncName = getAsyncLaneName(indexPath, indexState);
if (asyncName == null) {
@@ -448,8 +447,9 @@ private void releaseCheckpoint() throws CommitFailedException {

private void incrementReIndexCount(NodeBuilder definition) {
long count = 0;
if (definition.hasProperty(REINDEX_COUNT)) {
count = definition.getProperty(REINDEX_COUNT).getValue(Type.LONG);
PropertyState reindexCountProp = definition.getProperty(REINDEX_COUNT);
if (reindexCountProp != null) {
count = reindexCountProp.getValue(Type.LONG);
}
definition.setProperty(REINDEX_COUNT, count + 1);
}
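
Most of the IndexImporter changes above replace eagerly built messages (String.format or '+' concatenation) with SLF4J parameterized logging. A minimal, hedged sketch of that pattern follows; the class and method names are illustrative, not taken from the patch:

    import org.slf4j.Logger;
    import org.slf4j.LoggerFactory;

    import java.util.List;

    public class LoggingSketch {
        private static final Logger LOG = LoggerFactory.getLogger(LoggingSketch.class);

        void revertFailed(List<String> indexPaths, Exception cause) {
            // The "{}" placeholder is substituted only if ERROR is enabled, and a
            // Throwable passed as the last argument is logged with its stack trace.
            LOG.error("Unable to revert back index lanes for: {}", String.join(",", indexPaths), cause);
        }
    }
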
IndexerMetrics.java (new file)
@@ -0,0 +1,29 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.jackrabbit.oak.index;

public interface IndexerMetrics {
String INDEXER_METRICS_PREFIX = "oak_indexer_";
String METRIC_INDEXING_INDEX_DATA_SIZE = INDEXER_METRICS_PREFIX + "index_data_size";

String INDEXER_PUBLISH_METRICS_PREFIX = "oak_indexer_publish_";
String METRIC_INDEXING_PUBLISH_DURATION_SECONDS = INDEXER_PUBLISH_METRICS_PREFIX + "indexing_duration_seconds";
String METRIC_INDEXING_PUBLISH_NODES_TRAVERSED = INDEXER_PUBLISH_METRICS_PREFIX + "nodes_traversed";
String METRIC_INDEXING_PUBLISH_NODES_INDEXED = INDEXER_PUBLISH_METRICS_PREFIX + "nodes_indexed";
}
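
These constants are just stable metric names; the values are published later in this diff via MetricsUtils.addMetric(statisticsProvider, indexingReporter, name, value) in OutOfBandIndexerBase.reindex(). A hedged, self-contained sketch that uses a plain Map as a stand-in for the real reporter, with made-up values:

    import java.util.LinkedHashMap;
    import java.util.Map;

    import static org.apache.jackrabbit.oak.index.IndexerMetrics.METRIC_INDEXING_INDEX_DATA_SIZE;
    import static org.apache.jackrabbit.oak.index.IndexerMetrics.METRIC_INDEXING_PUBLISH_DURATION_SECONDS;
    import static org.apache.jackrabbit.oak.index.IndexerMetrics.METRIC_INDEXING_PUBLISH_NODES_INDEXED;
    import static org.apache.jackrabbit.oak.index.IndexerMetrics.METRIC_INDEXING_PUBLISH_NODES_TRAVERSED;

    public class PublishMetricsSketch {
        public static void main(String[] args) {
            // Stand-in for the real IndexingReporter/StatisticsProvider pair.
            Map<String, Long> reporter = new LinkedHashMap<>();
            reporter.put(METRIC_INDEXING_PUBLISH_DURATION_SECONDS, 5_400L);   // made-up values
            reporter.put(METRIC_INDEXING_PUBLISH_NODES_TRAVERSED, 12_000_000L);
            reporter.put(METRIC_INDEXING_PUBLISH_NODES_INDEXED, 3_500_000L);
            reporter.put(METRIC_INDEXING_INDEX_DATA_SIZE, 7_516_192_768L);    // bytes
            reporter.forEach((name, value) -> System.out.println(name + " = " + value));
        }
    }
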
IndexerSupport.java
@@ -21,6 +21,7 @@

import java.io.File;
import java.io.IOException;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashSet;
import java.util.Map;
@@ -29,6 +30,7 @@

import org.apache.commons.io.FileUtils;
import org.apache.felix.inventory.Format;

import java.util.function.Predicate;
import java.util.regex.Pattern;

@@ -54,7 +56,7 @@
import static java.util.Objects.requireNonNull;

public class IndexerSupport {
private final Logger log = LoggerFactory.getLogger(getClass());
private final Logger LOG = LoggerFactory.getLogger(getClass());
/**
* Directory name in output directory under which indexes are
* stored
Expand Down Expand Up @@ -131,7 +133,7 @@ public NodeState retrieveNodeStateForCheckpoint() {
NodeState checkpointedState;
if (HEAD_AS_CHECKPOINT.equals(checkpoint)) {
checkpointedState = indexHelper.getNodeStore().getRoot();
log.warn("Using head state for indexing. Such an index cannot be imported back");
LOG.warn("Using head state for indexing. Such an index cannot be imported back");
} else {
checkpointedState = indexHelper.getNodeStore().retrieve(checkpoint);
requireNonNull(checkpointedState, String.format("Not able to retrieve revision referred via checkpoint [%s]", checkpoint));
@@ -169,7 +171,7 @@ public void switchIndexLanesAndReindexFlag(NodeStore copyOnWriteStore) throws Co
}

copyOnWriteStore.merge(builder, EmptyHook.INSTANCE, CommitInfo.EMPTY);
log.info("Switched the async lane for indexes at {} to {} and marked them for reindex", indexHelper.getIndexPaths(), REINDEX_LANE);
LOG.info("Switched the async lane for indexes at {} to {} and marked them for reindex", indexHelper.getIndexPaths(), REINDEX_LANE);
}

public void postIndexWork(NodeStore copyOnWriteStore) throws CommitFailedException, IOException {
@@ -187,7 +189,7 @@ protected void switchIndexLanesBack(NodeStore copyOnWriteStore) throws CommitFai
}

copyOnWriteStore.merge(builder, EmptyHook.INSTANCE, CommitInfo.EMPTY);
log.info("Switched the async lane for indexes at {} back to there original lanes", indexHelper.getIndexPaths());
LOG.info("Switched the async lane for indexes at {} back to there original lanes", indexHelper.getIndexPaths());
}

public Map<String, String> getCheckpointInfo() {
Expand Down Expand Up @@ -236,7 +238,7 @@ public Set<String> getPreferredPathElements(Set<IndexDefinition> indexDefinition
}

/**
* @param indexDefinitions set of IndexDefinition to be used to calculate the Path Predicate
* @param indexDefinitions set of IndexDefinition to be used to calculate the Path Predicate
* @param typeToRepositoryPath Function to convert type <T> to valid repository path of type <String>
* @param <T>
* @return filter predicate based on the include/exclude path rules of the given set of index definitions.
@@ -246,12 +248,59 @@ public <T> Predicate<T> getFilterPredicate(Set<IndexDefinitions
}

/**
* @param pattern Pattern for a custom excludes regex based on which paths would be filtered out
* @param pattern Pattern for a custom excludes regex based on which paths would be filtered out
* @param typeToRepositoryPath Function to convert type <T> to valid repository path of type <String>
* @param <T>
* @return Return a predicate that should test true for all paths that do not match the provided regex pattern.
*/
public <T> Predicate<T> getFilterPredicateBasedOnCustomRegex(Pattern pattern, Function<T, String> typeToRepositoryPath) {
return t -> !pattern.matcher(typeToRepositoryPath.apply(t)).find();
}
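
A hedged usage sketch for the regex-based exclude filter above: items whose repository path matches the custom excludes pattern are filtered out, everything else passes. The pattern and paths below are made up for illustration; the helper mirrors the shape of getFilterPredicateBasedOnCustomRegex.

    import java.util.List;
    import java.util.function.Function;
    import java.util.function.Predicate;
    import java.util.regex.Pattern;

    public class ExcludeFilterSketch {
        // Same shape as getFilterPredicateBasedOnCustomRegex: true means "keep the item".
        static <T> Predicate<T> excludeByRegex(Pattern pattern, Function<T, String> toPath) {
            return t -> !pattern.matcher(toPath.apply(t)).find();
        }

        public static void main(String[] args) {
            Pattern excludes = Pattern.compile("/jcr:system|/var/.*/tmp");
            Predicate<String> keep = excludeByRegex(excludes, Function.identity());
            List<String> paths = List.of("/content/site/en", "/jcr:system/jcr:versionStorage", "/var/workflow/tmp/x");
            paths.stream().filter(keep).forEach(System.out::println); // prints only /content/site/en
        }
    }
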

/**
* Computes the total size of the generated index data. This method is intended to be used when creating Lucene
* indexes, which are created locally. With Elastic, this will not include the Lucene files since the indexes
* are updated remotely.
*
* @return The total size of the index data generated or -1 if there is some error while computing the size.
*/
public long computeSizeOfGeneratedIndexData() {
try {
File localIndexDir = getLocalIndexDir();
long totalSize = 0;
if (localIndexDir == null || !localIndexDir.isDirectory()) {
LOG.warn("Local index directory is invalid, this should not happen: {}", localIndexDir);
return -1;
} else {
// Each index is stored in a separate directory
File[] directories = localIndexDir.listFiles(File::isDirectory);
if (directories == null) {
LOG.warn("Error listing sub directories in the local index directory: {}", localIndexDir);
return -1;
}
// Print the indexes in alphabetic order
Arrays.sort(directories);
StringBuilder sb = new StringBuilder();
for (File indexDir : directories) {
long size = FileUtils.sizeOfDirectory(indexDir);
totalSize += size;
File[] files = indexDir.listFiles(File::isFile);
if (files == null) {
LOG.warn("Error listing files in directory: {}", indexDir);
// continue to the next index
} else {
long numberOfFiles = files.length;
sb.append("\n - " + indexDir.getName() + ": " + numberOfFiles + " files, " +
size + " (" + FileUtils.byteCountToDisplaySize(size) + ")");
}
}
LOG.info("Total size of index data generated: {} ({}){}",
totalSize, FileUtils.byteCountToDisplaySize(totalSize), sb);
return totalSize;
}
} catch (Throwable t) {
LOG.warn("Error while computing size of generated index data.", t);
return -1;
}
}
}
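
The per-index log line built above relies on two commons-io helpers, which determine the format of the "Total size of index data generated" message. A small hedged example of their output, run against the current working directory purely for illustration:

    import org.apache.commons.io.FileUtils;

    import java.io.File;

    public class ByteSizeSketch {
        public static void main(String[] args) {
            // Size of the current working directory, just to have a real number.
            long size = FileUtils.sizeOfDirectory(new File("."));
            // byteCountToDisplaySize rounds down to the largest whole unit, e.g. "3 GB".
            System.out.println(size + " bytes (" + FileUtils.byteCountToDisplaySize(size) + ")");
        }
    }
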
OutOfBandIndexerBase.java
@@ -19,11 +19,13 @@
package org.apache.jackrabbit.oak.index;

import com.codahale.metrics.MetricRegistry;
import org.apache.jackrabbit.guava.common.base.Stopwatch;
import org.apache.jackrabbit.guava.common.io.Closer;
import org.apache.jackrabbit.oak.api.CommitFailedException;
import org.apache.jackrabbit.oak.plugins.index.*;
import org.apache.jackrabbit.oak.plugins.index.progress.MetricRateEstimator;
import org.apache.jackrabbit.oak.plugins.index.progress.NodeCounterMBeanEstimator;
import org.apache.jackrabbit.oak.plugins.index.search.IndexDefinition;
import org.apache.jackrabbit.oak.plugins.memory.MemoryNodeStore;
import org.apache.jackrabbit.oak.plugins.metric.MetricStatisticsProvider;
import org.apache.jackrabbit.oak.spi.commit.CommitInfo;
@@ -38,15 +40,27 @@
import java.io.Closeable;
import java.io.File;
import java.io.IOException;
import java.util.List;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;

import static java.util.Objects.requireNonNull;
import static org.apache.jackrabbit.oak.index.IndexerMetrics.METRIC_INDEXING_INDEX_DATA_SIZE;
import static org.apache.jackrabbit.oak.index.IndexerMetrics.METRIC_INDEXING_PUBLISH_DURATION_SECONDS;
import static org.apache.jackrabbit.oak.index.IndexerMetrics.METRIC_INDEXING_PUBLISH_NODES_INDEXED;
import static org.apache.jackrabbit.oak.index.IndexerMetrics.METRIC_INDEXING_PUBLISH_NODES_TRAVERSED;
import static org.apache.jackrabbit.oak.plugins.index.IndexUtils.INDEXING_PHASE_LOGGER;

public abstract class OutOfBandIndexerBase implements Closeable, IndexUpdateCallback, NodeTraversalCallback{
public abstract class OutOfBandIndexerBase implements Closeable, IndexUpdateCallback, NodeTraversalCallback {

protected final Closer closer = Closer.create();
private final IndexHelper indexHelper;
private final IndexerSupport indexerSupport;
private final IndexingReporter indexingReporter;
private final StatisticsProvider statisticsProvider;
private NodeStore copyOnWriteStore;
private IndexerSupport indexerSupport;
private long nodesTraversed = 0;
private long nodesIndexed = 0;

/**
* Index lane name which is used for indexing
@@ -64,18 +78,38 @@ public abstract class OutOfBandIndexerBase implements Closeable, IndexUpdateCall
public OutOfBandIndexerBase(IndexHelper indexHelper, IndexerSupport indexerSupport) {
this.indexHelper = requireNonNull(indexHelper);
this.indexerSupport = requireNonNull(indexerSupport);
this.indexingReporter = indexHelper.getIndexReporter();
this.statisticsProvider = indexHelper.getStatisticsProvider();
}

public void reindex() throws CommitFailedException, IOException {
NodeState checkpointedState = indexerSupport.retrieveNodeStateForCheckpoint();

copyOnWriteStore = new MemoryNodeStore(checkpointedState);
NodeState baseState = copyOnWriteStore.getRoot();
//TODO Check for indexPaths being empty

indexerSupport.switchIndexLanesAndReindexFlag(copyOnWriteStore);
preformIndexUpdate(baseState);
indexerSupport.postIndexWork(copyOnWriteStore);
List<String> indexNames = indexerSupport.getIndexDefinitions().stream().map(IndexDefinition::getIndexName).collect(Collectors.toList());
indexingReporter.setIndexNames(indexNames);
INDEXING_PHASE_LOGGER.info("[TASK:FULL_INDEX_CREATION:START] Starting indexing job");
Stopwatch indexJobWatch = Stopwatch.createStarted();
try {
NodeState checkpointedState = indexerSupport.retrieveNodeStateForCheckpoint();

copyOnWriteStore = new MemoryNodeStore(checkpointedState);
NodeState baseState = copyOnWriteStore.getRoot();

indexerSupport.switchIndexLanesAndReindexFlag(copyOnWriteStore);
preformIndexUpdate(baseState);
indexerSupport.postIndexWork(copyOnWriteStore);

long indexingDurationSeconds = indexJobWatch.elapsed(TimeUnit.SECONDS);
long totalSize = indexerSupport.computeSizeOfGeneratedIndexData();
INDEXING_PHASE_LOGGER.info("[TASK:INDEXING:END] Metrics: {}", MetricsFormatter.createMetricsWithDurationOnly(indexingDurationSeconds));
MetricsUtils.addMetric(statisticsProvider, indexingReporter, METRIC_INDEXING_PUBLISH_DURATION_SECONDS, indexingDurationSeconds);
MetricsUtils.addMetric(statisticsProvider, indexingReporter, METRIC_INDEXING_PUBLISH_NODES_TRAVERSED, nodesTraversed);
MetricsUtils.addMetric(statisticsProvider, indexingReporter, METRIC_INDEXING_PUBLISH_NODES_INDEXED, nodesIndexed);
MetricsUtils.addMetricByteSize(statisticsProvider, indexingReporter, METRIC_INDEXING_INDEX_DATA_SIZE, totalSize);
indexingReporter.addTiming("Build Lucene Index", FormattingUtils.formatToSeconds(indexingDurationSeconds));
} catch (Throwable t) {
INDEXING_PHASE_LOGGER.info("[TASK:FULL_INDEX_CREATION:FAIL] Metrics: {}, Error: {}",
MetricsFormatter.createMetricsWithDurationOnly(indexJobWatch), t.toString());
throw t;
}
}
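
reindex() now brackets the whole job with a Stopwatch, reports the elapsed seconds as metrics on success, and logs the partial duration before rethrowing on failure. A hedged sketch of just that timing skeleton, using the same shaded Guava Stopwatch the patch imports; the printed messages are illustrative, not the patch's log format:

    import org.apache.jackrabbit.guava.common.base.Stopwatch;

    import java.util.concurrent.TimeUnit;

    public class TimedJobSketch {
        public static void main(String[] args) throws Throwable {
            Stopwatch watch = Stopwatch.createStarted();
            try {
                Thread.sleep(100); // stand-in for the actual indexing work
                System.out.println("job finished in " + watch.elapsed(TimeUnit.SECONDS) + " s");
            } catch (Throwable t) {
                // On failure, the elapsed time so far is still reported before rethrowing.
                System.out.println("job failed after " + watch.elapsed(TimeUnit.SECONDS) + " s: " + t);
                throw t;
            }
        }
    }
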

protected File getLocalIndexDir() throws IOException {
@@ -90,13 +124,13 @@ public void close() throws IOException {
//~---------------------------------------------------< callbacks >

@Override
public void indexUpdate() throws CommitFailedException {

public void indexUpdate() {
nodesIndexed++;
}

@Override
public void traversedNode(NodeTraversalCallback.PathSource pathSource) throws CommitFailedException {

public void traversedNode(NodeTraversalCallback.PathSource pathSource) {
nodesTraversed++;
}

protected void preformIndexUpdate(NodeState baseState) throws IOException, CommitFailedException {
(Diff truncated; the remaining 5 of 9 changed files are not shown.)
