Skip to content

Commit

Permalink
LmdbStore GC algorithm for values.
Browse files Browse the repository at this point in the history
- uses revision-based eviction to support lazy values
  • Loading branch information
kenwenzel committed Sep 6, 2023
1 parent 38e0b42 commit 3a9ad95
Show file tree
Hide file tree
Showing 11 changed files with 1,234 additions and 123 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -12,16 +12,21 @@

import java.io.File;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.ByteOrder;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.Objects;
import java.util.Set;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.locks.ReentrantLock;

import org.eclipse.rdf4j.collection.factory.mapdb.MapDbCollectionFactory;
import org.eclipse.rdf4j.common.iteration.CloseableIteration;
import org.eclipse.rdf4j.common.iteration.CloseableIteratorIteration;
import org.eclipse.rdf4j.common.iteration.ConvertingIteration;
Expand Down Expand Up @@ -69,6 +74,8 @@ class LmdbSailStore implements SailStore {

private final boolean enableMultiThreading = true;

private final Set<Long> unusedIds;

/**
* A fast non-blocking circular buffer backed by an array.
*
Expand Down Expand Up @@ -134,6 +141,13 @@ class AddQuadOperation implements Operation {

@Override
public void execute() throws IOException {
if (!unusedIds.isEmpty()) {
// these ids are used again
unusedIds.remove(s);
unusedIds.remove(p);
unusedIds.remove(o);
unusedIds.remove(c);
}
boolean wasNew = tripleStore.storeTriple(s, p, o, c, explicit);
if (wasNew && context != null) {
contextStore.increment(context);
Expand Down Expand Up @@ -168,6 +182,19 @@ abstract static class StatefulOperation implements Operation {
* Creates a new {@link LmdbSailStore}.
*/
public LmdbSailStore(File dataDir, LmdbStoreConfig config) throws IOException, SailException {
this.unusedIds = new PersistentSet<>(dataDir) {
@Override
protected byte[] write(Long element) {
ByteBuffer bb = ByteBuffer.allocate(Long.BYTES).order(ByteOrder.BIG_ENDIAN);
bb.putLong(element);
return bb.array();
}

@Override
protected Long read(ByteBuffer buffer) {
return buffer.order(ByteOrder.BIG_ENDIAN).getLong();
}
};
boolean initialized = false;
try {
namespaceStore = new NamespaceStore(dataDir);
Expand Down Expand Up @@ -415,6 +442,19 @@ public void prepare() throws SailException {
// serializable is not supported at this level
}

protected void filterUsedIdsInTripleStore() throws IOException {
if (!unusedIds.isEmpty()) {
tripleStore.filterUsedIds(unusedIds);
}
}

protected void handleRemovedIdsInValueStore() throws IOException {
if (!unusedIds.isEmpty()) {
valueStore.gcIds(unusedIds);
unusedIds.clear();
}
}

@Override
public void flush() throws SailException {
sinkStoreAccessLock.lock();
Expand Down Expand Up @@ -446,6 +486,10 @@ public void flush() throws SailException {
contextStore.sync();
} finally {
if (activeTxn) {
if (!multiThreadingActive) {
filterUsedIdsInTripleStore();
}
handleRemovedIdsInValueStore();
valueStore.commit();
if (!multiThreadingActive) {
tripleStore.commit();
Expand Down Expand Up @@ -548,6 +592,8 @@ private void startTransaction(boolean preferThreading) throws SailException {
Operation op = opQueue.remove();
if (op != null) {
if (op == COMMIT_TRANSACTION) {
filterUsedIdsInTripleStore();

tripleStore.commit();
nextTransactionAsync = false;
asyncTransactionFinished = true;
Expand Down Expand Up @@ -642,9 +688,17 @@ private long removeStatements(long subj, long pred, long obj, boolean explicit,
throws IOException {
long removeCount = 0;
for (long contextId : contexts) {
Map<Long, Long> result = tripleStore.removeTriplesByContext(subj, pred, obj, contextId, explicit);
final Map<Long, Long> perContextCounts = new HashMap<>();
tripleStore.removeTriplesByContext(subj, pred, obj, contextId, explicit, quad -> {
perContextCounts.merge(quad[3], 1L, (c, one) -> c + one);
for (long id : quad) {
if (id != 0L) {
unusedIds.add(id);
}
}
});

for (Entry<Long, Long> entry : result.entrySet()) {
for (Entry<Long, Long> entry : perContextCounts.entrySet()) {
Long entryContextId = entry.getKey();
if (entryContextId > 0) {
Resource modifiedContext = (Resource) valueStore.getValue(entryContextId);
Expand Down Expand Up @@ -809,5 +863,4 @@ public CloseableIteration<? extends Statement> getStatements(Resource subj, IRI
}
}
}

}
}
Original file line number Diff line number Diff line change
Expand Up @@ -11,13 +11,21 @@
package org.eclipse.rdf4j.sail.lmdb;

import org.eclipse.rdf4j.common.concurrent.locks.Lock;
import org.eclipse.rdf4j.common.iteration.CloseableIteration;
import org.eclipse.rdf4j.common.iteration.IterationWrapper;
import org.eclipse.rdf4j.model.IRI;
import org.eclipse.rdf4j.model.Resource;
import org.eclipse.rdf4j.model.Statement;
import org.eclipse.rdf4j.model.Value;
import org.eclipse.rdf4j.query.BindingSet;
import org.eclipse.rdf4j.query.Dataset;
import org.eclipse.rdf4j.query.QueryEvaluationException;
import org.eclipse.rdf4j.query.algebra.TupleExpr;
import org.eclipse.rdf4j.sail.SailException;
import org.eclipse.rdf4j.sail.SailReadOnlyException;
import org.eclipse.rdf4j.sail.base.SailSourceConnection;
import org.eclipse.rdf4j.sail.helpers.DefaultSailChangedEvent;
import org.eclipse.rdf4j.sail.lmdb.model.LmdbValue;

/**
* Connection to an {@link LmdbStore}.
Expand Down Expand Up @@ -120,6 +128,52 @@ public boolean addInferredStatement(Resource subj, IRI pred, Value obj, Resource
return ret;
}

@Override
protected CloseableIteration<? extends BindingSet> evaluateInternal(TupleExpr tupleExpr,
Dataset dataset,
BindingSet bindings, boolean includeInferred) throws SailException {
// ensure that all elements of the binding set are initialized (lazy values are resolved)
return new IterationWrapper<BindingSet>(
super.evaluateInternal(tupleExpr, dataset, bindings, includeInferred)) {
@Override
public BindingSet next() throws QueryEvaluationException {
BindingSet bs = super.next();
bs.forEach(b -> initValue(b.getValue()));
return bs;
}
};
}

@Override
protected CloseableIteration<? extends Statement> getStatementsInternal(Resource subj, IRI pred,
Value obj,
boolean includeInferred, Resource... contexts) throws SailException {
return new IterationWrapper<Statement>(
super.getStatementsInternal(subj, pred, obj, includeInferred, contexts)) {
@Override
public Statement next() throws SailException {
// ensure that all elements of the statement are initialized (lazy values are resolved)
Statement stmt = super.next();
initValue(stmt.getSubject());
initValue(stmt.getPredicate());
initValue(stmt.getObject());
initValue(stmt.getContext());
return stmt;
}
};
}

/**
* Ensures that all components of the value are initialized from the underlying database.
*
* @param value The value that should be initialized
*/
protected void initValue(Value value) {
if (value instanceof LmdbValue) {
((LmdbValue) value).init();
}
}

@Override
protected void removeStatementsInternal(Resource subj, IRI pred, Value obj, Resource... contexts)
throws SailException {
Expand Down
Loading

0 comments on commit 3a9ad95

Please sign in to comment.