Merge pull request #10388 from ErykKul/10381_index_after_publish
semaphore for async indexing and sync index in transaction after publish
landreev committed Mar 27, 2024
2 parents 472bb3d + 1479403 commit d5bb0c2
Showing 8 changed files with 97 additions and 9 deletions.
3 changes: 3 additions & 0 deletions doc/release-notes/10381-index-after-publish.md
@@ -0,0 +1,3 @@
This release adds a new MicroProfile setting for the maximum number of simultaneously running asynchronous dataset index operations, which defaults to ``4``:

dataverse.solr.concurrency.max-async-indexes
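
If the default does not suit your installation, the setting can be changed like any other MicroProfile setting, e.g. via its environment variable (the value ``8`` below is only an illustration):

```shell
export DATAVERSE_SOLR_CONCURRENCY_MAX_ASYNC_INDEXES=8
```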
8 changes: 8 additions & 0 deletions doc/sphinx-guides/source/admin/monitoring.rst
@@ -150,3 +150,11 @@ Tips:
- It's possible to view and act on **RDS Events** such as snapshots, parameter changes, etc. See `Working with Amazon RDS events <https://docs.aws.amazon.com/AmazonRDS/latest/UserGuide/USER_PerfInsights.access-control.html>`_ for details.
- RDS monitoring is available via API and the ``aws`` command line tool. For example, see `Retrieving metrics with the Performance Insights API <https://docs.aws.amazon.com/AmazonRDS/latest/UserGuide/USER_PerfInsights.API.html>`_.
- To play with monitoring RDS using a server configured by `dataverse-ansible <https://github.com/GlobalDataverseCommunityConsortium/dataverse-ansible>`_ set ``use_rds`` to true to skip some steps that aren't necessary when using RDS. See also the :doc:`/developers/deployment` section of the Developer Guide.

MicroProfile Metrics endpoint
-----------------------------

Payara provides a MicroProfile Metrics endpoint; see `Metrics Rest Endpoint <https://docs.payara.fish/community/docs/6.2023.9/Technical%20Documentation/MicroProfile/Metrics/Metrics%20Rest%20Endpoint.html>`_ in the Payara documentation.
The following metrics can be retrieved from that endpoint:

- ``index_permit_wait_time_seconds_mean`` displays how long it takes to receive a permit to index a dataset.
- ``index_time_seconds`` displays how long it takes to index a dataset.
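
For example, you can pull these timers with a plain HTTP request (a sketch assuming Payara's default HTTP listener on port 8080; adjust host and port for your installation):

.. code-block:: bash

  curl -s http://localhost:8080/metrics | grep index_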
9 changes: 9 additions & 0 deletions doc/sphinx-guides/source/installation/config.rst
@@ -2425,6 +2425,15 @@ when using it to configure your core name!

Can also be set via *MicroProfile Config API* sources, e.g. the environment variable ``DATAVERSE_SOLR_PATH``.

dataverse.solr.concurrency.max-async-indexes
++++++++++++++++++++++++++++++++++++++++++++

Maximum number of simultaneously running asynchronous dataset index operations.

Defaults to ``4``.

Can also be set via *MicroProfile Config API* sources, e.g. the environment variable ``DATAVERSE_SOLR_CONCURRENCY_MAX_ASYNC_INDEXES``.
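
Internally the application reads this setting through the MicroProfile Config API, falling back to the default; a minimal sketch of an equivalent lookup (the variable name is illustrative):

.. code-block:: java

    import org.eclipse.microprofile.config.ConfigProvider;

    // reads dataverse.solr.concurrency.max-async-indexes, defaulting to 4
    int maxAsyncIndexes = ConfigProvider.getConfig()
            .getOptionalValue("dataverse.solr.concurrency.max-async-indexes", Integer.class)
            .orElse(4);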

dataverse.rserve.host
+++++++++++++++++++++

5 changes: 5 additions & 0 deletions pom.xml
@@ -190,6 +190,11 @@
<artifactId>microprofile-config-api</artifactId>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.eclipse.microprofile.metrics</groupId>
<artifactId>microprofile-metrics-api</artifactId>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>jakarta.platform</groupId>
<artifactId>jakarta.jakartaee-api</artifactId>
14 changes: 10 additions & 4 deletions src/main/java/edu/harvard/iq/dataverse/DatasetPage.java
@@ -790,11 +790,17 @@ public boolean isIndexedVersion() {
return isIndexedVersion = false;
}
// If this is the latest published version, we want to confirm that this
// version was successfully indexed after the last publication
if (isThisLatestReleasedVersion()) {
return isIndexedVersion = (workingVersion.getDataset().getIndexTime() != null)
&& workingVersion.getDataset().getIndexTime().after(workingVersion.getReleaseTime());
if (workingVersion.getDataset().getIndexTime() == null) {
return isIndexedVersion = false;
}
// We add a 3-hour grace period to the index time to prevent false negatives
// when the index time gets overwritten during the finalize-publication step
// with a value that predates the release time
final long duration = 3 * 60 * 60 * 1000;
final Timestamp movedIndexTime = new Timestamp(workingVersion.getDataset().getIndexTime().getTime() + duration);
return isIndexedVersion = movedIndexTime.after(workingVersion.getReleaseTime());
}

// Drafts don't have the indextime stamps set/incremented when indexed,
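To see the grace-window comparison above in isolation, here is a minimal self-contained sketch (the class name and sample timestamps are hypothetical, not part of DatasetPage):

import java.sql.Timestamp;

public class IndexTimeGraceDemo {

    // same 3-hour grace window as in DatasetPage.isIndexedVersion()
    private static final long GRACE_MS = 3 * 60 * 60 * 1000L;

    static boolean indexedAfterRelease(Timestamp indexTime, Timestamp releaseTime) {
        if (indexTime == null) {
            return false; // never indexed
        }
        // shift the index time forward, so an index time recorded slightly before
        // the release time (while publication was being finalized) still counts
        Timestamp movedIndexTime = new Timestamp(indexTime.getTime() + GRACE_MS);
        return movedIndexTime.after(releaseTime);
    }

    public static void main(String[] args) {
        Timestamp release = Timestamp.valueOf("2024-03-27 12:00:00");
        // indexed one minute before release: inside the grace window -> true
        System.out.println(indexedAfterRelease(Timestamp.valueOf("2024-03-27 11:59:00"), release));
        // indexed four hours before release: outside the grace window -> false
        System.out.println(indexedAfterRelease(Timestamp.valueOf("2024-03-27 08:00:00"), release));
    }
}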
src/main/java/edu/harvard/iq/dataverse/engine/command/impl/FinalizeDatasetPublicationCommand.java
@@ -267,7 +267,6 @@ public boolean onSuccess(CommandContext ctxt, Object r) {
} catch (Exception e) {
logger.warning("Failure to send dataset published messages for : " + dataset.getId() + " : " + e.getMessage());
}
ctxt.index().asyncIndexDataset(dataset, true);

//re-indexing dataverses that have additional subjects
if (!dataversesToIndex.isEmpty()){
@@ -297,7 +296,8 @@ public boolean onSuccess(CommandContext ctxt, Object r) {
logger.log(Level.WARNING, "Finalization: exception caught while exporting: "+ex.getMessage(), ex);
// ... but it is important to only update the export time stamp if the
// export was indeed successful.
}
}
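        // kick off the asynchronous (re-)indexing of the dataset, now that the metadata exports above have been attempted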
ctxt.index().asyncIndexDataset(dataset, true);

return retVal;
}
src/main/java/edu/harvard/iq/dataverse/search/IndexServiceBean.java
@@ -35,6 +35,7 @@
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Future;
import java.util.concurrent.Semaphore;
import java.util.function.Function;
import java.util.logging.Logger;
import java.util.stream.Collectors;
@@ -47,6 +48,8 @@
import jakarta.ejb.Stateless;
import jakarta.ejb.TransactionAttribute;
import static jakarta.ejb.TransactionAttributeType.REQUIRES_NEW;

import jakarta.inject.Inject;
import jakarta.inject.Named;
import jakarta.json.JsonObject;
import jakarta.persistence.EntityManager;
@@ -71,6 +74,9 @@
import org.apache.tika.sax.BodyContentHandler;
import org.eclipse.microprofile.config.Config;
import org.eclipse.microprofile.config.ConfigProvider;
import org.eclipse.microprofile.metrics.MetricUnits;
import org.eclipse.microprofile.metrics.Timer;
import org.eclipse.microprofile.metrics.annotation.Metric;
import org.xml.sax.ContentHandler;

@Stateless
@@ -341,6 +347,29 @@ public void indexDatasetInNewTransaction(Long datasetId) { //Dataset dataset) {
private static final Map<Long, Dataset> NEXT_TO_INDEX = new ConcurrentHashMap<>();
// indexingNow is a set of dataset ids of datasets being indexed asynchronously right now
private static final Map<Long, Boolean> INDEXING_NOW = new ConcurrentHashMap<>();
// fair (FIFO) semaphore limiting how many asynchronous indexing jobs can run at the same time
private static final Semaphore ASYNC_INDEX_SEMAPHORE = new Semaphore(JvmSettings.MAX_ASYNC_INDEXES.lookupOptional(Integer.class).orElse(4), true);

@Inject
@Metric(name = "index_permit_wait_time", absolute = true, unit = MetricUnits.NANOSECONDS,
description = "Displays how long does it take to receive a permit to index a dataset")
Timer indexPermitWaitTimer;

@Inject
@Metric(name = "index_time", absolute = true, unit = MetricUnits.NANOSECONDS,
description = "Displays how long does it take to index a dataset")
Timer indexTimer;

    /**
     * Try to acquire a permit from the semaphore, so that too many parallel indexing
     * operations do not overwhelm Solr.
     * The time spent waiting for the permit is recorded, allowing indexing performance
     * to be measured.
     * @throws InterruptedException if the thread is interrupted while waiting for a permit
     */
private void acquirePermitFromSemaphore() throws InterruptedException {
try (var timeContext = indexPermitWaitTimer.time()) {
ASYNC_INDEX_SEMAPHORE.acquire();
}
}

// When you pass null as Dataset parameter to this method, it indicates that the indexing of the dataset with "id" has finished
// Pass non-null Dataset to schedule it for indexing
@@ -385,10 +414,24 @@ synchronized private static Dataset getNextToIndex(Long id, Dataset d) {
*/
@Asynchronous
public void asyncIndexDataset(Dataset dataset, boolean doNormalSolrDocCleanUp) {
        try {
            acquirePermitFromSemaphore();
            try {
                doAyncIndexDataset(dataset, doNormalSolrDocCleanUp);
            } finally {
                // release only a permit that was actually acquired; releasing after a failed
                // acquire would quietly grow the number of available permits
                ASYNC_INDEX_SEMAPHORE.release();
            }
        } catch (InterruptedException e) {
            String failureLogText = "Indexing failed: interrupted. You can kick off a re-index of this dataset with: \r\n curl http://localhost:8080/api/admin/index/datasets/" + dataset.getId().toString();
            failureLogText += "\r\n" + e.getLocalizedMessage();
            LoggingUtil.writeOnSuccessFailureLog(null, failureLogText, dataset);
        }
}

private void doAyncIndexDataset(Dataset dataset, boolean doNormalSolrDocCleanUp) {
Long id = dataset.getId();
Dataset next = getNextToIndex(id, dataset); // if there is an ongoing index job for this dataset, next is null (ongoing index job will reindex the newest version after current indexing finishes)
while (next != null) {
try {
// Time context will automatically start on creation and stop when leaving the try block
try (var timeContext = indexTimer.time()) {
indexDataset(next, doNormalSolrDocCleanUp);
            } catch (Exception e) { // catch all possible exceptions; otherwise, when something unexpected happens, the dataset would remain locked and impossible to reindex
                String failureLogText = "Indexing failed. You can kick off a re-index of this dataset with: \r\n curl http://localhost:8080/api/admin/index/datasets/" + dataset.getId().toString();
@@ -402,7 +445,16 @@ public void asyncIndexDataset(Dataset dataset, boolean doNormalSolrDocCleanUp) {
@Asynchronous
public void asyncIndexDatasetList(List<Dataset> datasets, boolean doNormalSolrDocCleanUp) {
for(Dataset dataset : datasets) {
asyncIndexDataset(dataset, true);
            try {
                acquirePermitFromSemaphore();
                try {
                    doAyncIndexDataset(dataset, true);
                } finally {
                    // release only a permit that was actually acquired
                    ASYNC_INDEX_SEMAPHORE.release();
                }
            } catch (InterruptedException e) {
                String failureLogText = "Indexing failed: interrupted. You can kick off a re-index of this dataset with: \r\n curl http://localhost:8080/api/admin/index/datasets/" + dataset.getId().toString();
                failureLogText += "\r\n" + e.getLocalizedMessage();
                LoggingUtil.writeOnSuccessFailureLog(null, failureLogText, dataset);
            }
}
}

Expand All @@ -414,7 +466,7 @@ public void indexDvObject(DvObject objectIn) throws SolrServerException, IOExce
}
}

private void indexDataset(Dataset dataset, boolean doNormalSolrDocCleanUp) throws SolrServerException, IOException {
public void indexDataset(Dataset dataset, boolean doNormalSolrDocCleanUp) throws SolrServerException, IOException {
doIndexDataset(dataset, doNormalSolrDocCleanUp);
updateLastIndexedTime(dataset.getId());
}
@@ -1495,6 +1547,7 @@ private void updateLastIndexedTimeInNewTransaction(Long id) {
DvObject dvObjectToModify = em.find(DvObject.class, id);
dvObjectToModify.setIndexTime(new Timestamp(new Date().getTime()));
dvObjectToModify = em.merge(dvObjectToModify);
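        // flush the updated index time to the database right away instead of waiting for the transaction to commit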
em.flush();
}

/**
src/main/java/edu/harvard/iq/dataverse/settings/JvmSettings.java
@@ -60,6 +60,10 @@ public enum JvmSettings {
SOLR_CORE(SCOPE_SOLR, "core"),
SOLR_PATH(SCOPE_SOLR, "path"),

// INDEX CONCURRENCY
SCOPE_SOLR_CONCURENCY(SCOPE_SOLR, "concurrency"),
MAX_ASYNC_INDEXES(SCOPE_SOLR_CONCURENCY, "max-async-indexes"),

// RSERVE CONNECTION
SCOPE_RSERVE(PREFIX, "rserve"),
RSERVE_HOST(SCOPE_RSERVE, "host"),
