Skip to content

Commit

Permalink
Initial sketch of GC algorithm for values.
Browse files Browse the repository at this point in the history
  • Loading branch information
kenwenzel committed May 25, 2023
1 parent ef35869 commit d0c685a
Show file tree
Hide file tree
Showing 9 changed files with 780 additions and 32 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -12,16 +12,21 @@

import java.io.File;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.ByteOrder;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.Objects;
import java.util.Set;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.locks.ReentrantLock;

import org.eclipse.rdf4j.collection.factory.mapdb.MapDbCollectionFactory;
import org.eclipse.rdf4j.common.iteration.CloseableIteration;
import org.eclipse.rdf4j.common.iteration.CloseableIteratorIteration;
import org.eclipse.rdf4j.common.iteration.ConvertingIteration;
Expand Down Expand Up @@ -69,6 +74,8 @@ class LmdbSailStore implements SailStore {

private final boolean enableMultiThreading = true;

private final Set<Long> unusedIds;

/**
* A fast non-blocking circular buffer backed by an array.
*
Expand Down Expand Up @@ -134,6 +141,13 @@ class AddQuadOperation implements Operation {

@Override
public void execute() throws IOException {
if (!unusedIds.isEmpty()) {
// these ids are used again
unusedIds.remove(s);
unusedIds.remove(p);
unusedIds.remove(o);
unusedIds.remove(c);
}
boolean wasNew = tripleStore.storeTriple(s, p, o, c, explicit);
if (wasNew && context != null) {
contextStore.increment(context);
Expand Down Expand Up @@ -168,6 +182,21 @@ abstract static class StatefulOperation implements Operation {
* Creates a new {@link LmdbSailStore}.
*/
public LmdbSailStore(File dataDir, LmdbStoreConfig config) throws IOException, SailException {
// this.unusedIds = new HashSet<>();
// this.unusedIds = new MapDbCollectionFactory(Integer.MAX_VALUE).createSet();
this.unusedIds = new PersistentSet<>(dataDir) {
@Override
protected byte[] write(Long element) {
ByteBuffer bb = ByteBuffer.allocate(Long.BYTES).order(ByteOrder.BIG_ENDIAN);
bb.putLong(element);
return bb.array();
}

@Override
protected Long read(ByteBuffer buffer) {
return buffer.order(ByteOrder.BIG_ENDIAN).getLong();
}
};
boolean initialized = false;
try {
namespaceStore = new NamespaceStore(dataDir);
Expand Down Expand Up @@ -415,6 +444,19 @@ public void prepare() throws SailException {
// serializable is not supported at this level
}

protected void filterUsedIdsInTripleStore() throws IOException {
if (!unusedIds.isEmpty()) {
tripleStore.filterUsedIds(unusedIds);
}
}

protected void handleRemovedIdsInValueStore() throws IOException {
if (!unusedIds.isEmpty()) {
valueStore.gcIds(unusedIds);
unusedIds.clear();
}
}

@Override
public void flush() throws SailException {
sinkStoreAccessLock.lock();
Expand Down Expand Up @@ -446,6 +488,10 @@ public void flush() throws SailException {
contextStore.sync();
} finally {
if (activeTxn) {
if (!multiThreadingActive) {
filterUsedIdsInTripleStore();
}
handleRemovedIdsInValueStore();
valueStore.commit();
if (!multiThreadingActive) {
tripleStore.commit();
Expand Down Expand Up @@ -548,6 +594,8 @@ private void startTransaction(boolean preferThreading) throws SailException {
Operation op = opQueue.remove();
if (op != null) {
if (op == COMMIT_TRANSACTION) {
filterUsedIdsInTripleStore();

tripleStore.commit();
nextTransactionAsync = false;
asyncTransactionFinished = true;
Expand Down Expand Up @@ -642,9 +690,17 @@ private long removeStatements(long subj, long pred, long obj, boolean explicit,
throws IOException {
long removeCount = 0;
for (long contextId : contexts) {
Map<Long, Long> result = tripleStore.removeTriplesByContext(subj, pred, obj, contextId, explicit);
final Map<Long, Long> perContextCounts = new HashMap<>();
tripleStore.removeTriplesByContext(subj, pred, obj, contextId, explicit, quad -> {
perContextCounts.merge(quad[3], 1L, (c, one) -> c + one);
for (long id : quad) {
if (id != 0L) {
unusedIds.add(id);
}
}
});

for (Entry<Long, Long> entry : result.entrySet()) {
for (Entry<Long, Long> entry : perContextCounts.entrySet()) {
Long entryContextId = entry.getKey();
if (entryContextId > 0) {
Resource modifiedContext = (Resource) valueStore.getValue(entryContextId);
Expand Down Expand Up @@ -809,5 +865,4 @@ public CloseableIteration<? extends Statement, SailException> getStatements(Reso
}
}
}

}
}
Original file line number Diff line number Diff line change
Expand Up @@ -13,13 +13,21 @@
import java.io.IOException;

import org.eclipse.rdf4j.common.concurrent.locks.Lock;
import org.eclipse.rdf4j.common.iteration.CloseableIteration;
import org.eclipse.rdf4j.common.iteration.IterationWrapper;
import org.eclipse.rdf4j.model.IRI;
import org.eclipse.rdf4j.model.Resource;
import org.eclipse.rdf4j.model.Statement;
import org.eclipse.rdf4j.model.Value;
import org.eclipse.rdf4j.query.BindingSet;
import org.eclipse.rdf4j.query.Dataset;
import org.eclipse.rdf4j.query.QueryEvaluationException;
import org.eclipse.rdf4j.query.algebra.TupleExpr;
import org.eclipse.rdf4j.sail.SailException;
import org.eclipse.rdf4j.sail.SailReadOnlyException;
import org.eclipse.rdf4j.sail.base.SailSourceConnection;
import org.eclipse.rdf4j.sail.helpers.DefaultSailChangedEvent;
import org.eclipse.rdf4j.sail.lmdb.model.LmdbValue;

/**
* Connection to an {@link LmdbStore}.
Expand Down Expand Up @@ -123,6 +131,52 @@ public boolean addInferredStatement(Resource subj, IRI pred, Value obj, Resource
return ret;
}

@Override
protected CloseableIteration<? extends BindingSet, QueryEvaluationException> evaluateInternal(TupleExpr tupleExpr,
Dataset dataset,
BindingSet bindings, boolean includeInferred) throws SailException {
// ensure that all elements of the binding set are initialized (lazy values are resolved)
return new IterationWrapper<BindingSet, QueryEvaluationException>(
super.evaluateInternal(tupleExpr, dataset, bindings, includeInferred)) {
@Override
public BindingSet next() throws QueryEvaluationException {
BindingSet bs = super.next();
bs.forEach(b -> initValue(b.getValue()));
return bs;
}
};
}

@Override
protected CloseableIteration<? extends Statement, SailException> getStatementsInternal(Resource subj, IRI pred,
Value obj,
boolean includeInferred, Resource... contexts) throws SailException {
return new IterationWrapper<Statement, SailException>(
super.getStatementsInternal(subj, pred, obj, includeInferred, contexts)) {
@Override
public Statement next() throws SailException {
// ensure that all elements of the statement are initialized (lazy values are resolved)
Statement stmt = super.next();
initValue(stmt.getSubject());
initValue(stmt.getPredicate());
initValue(stmt.getObject());
initValue(stmt.getContext());
return stmt;
}
};
}

/**
* Ensures that all components of the value are initialized from the underlying database.
*
* @param value The value that should be initialized
*/
protected void initValue(Value value) {
if (value instanceof LmdbValue) {
((LmdbValue) value).init();
}
}

@Override
protected void removeStatementsInternal(Resource subj, IRI pred, Value obj, Resource... contexts)
throws SailException {
Expand Down
Loading

0 comments on commit d0c685a

Please sign in to comment.