Skip to content

Commit

Permalink
Initial sketch of GC algorithm for values.
Browse files Browse the repository at this point in the history
  • Loading branch information
kenwenzel committed Jan 7, 2022
1 parent 0cc3546 commit f873609
Show file tree
Hide file tree
Showing 2 changed files with 109 additions and 32 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -11,11 +11,14 @@
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.Objects;
import java.util.Set;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.locks.ReentrantLock;

Expand Down Expand Up @@ -285,6 +288,7 @@ public LmdbSailDataset dataset(IsolationLevel level) throws SailException {

private final class LmdbSailSink implements SailSink {

private Set<Long> removedIds = new HashSet<>();
private final boolean explicit;

public LmdbSailSink(boolean explicit) throws SailException {
Expand Down Expand Up @@ -312,6 +316,13 @@ public synchronized void flush() throws SailException {
contextStore.sync();
} finally {
if (storeTxnStarted.get()) {
if (!removedIds.isEmpty()) {
// int sizeBefore = removedIds.size();
tripleStore.filterUsedIds(removedIds);
// System.out.println("removed before: " + sizeBefore + " after: " + removedIds.size());
// TODO do something with removed ids
removedIds.clear();
}
tripleStore.commit();
valueStore.commit();
// do not set flag to false until _after_ commit is successfully completed.
Expand Down Expand Up @@ -492,10 +503,17 @@ private long removeStatements(Resource subj, IRI pred, Value obj, boolean explic

long removeCount = 0;
for (long contextId : contextIds) {
Map<Long, Long> result = tripleStore.removeTriplesByContext(subjID, predID, objID, contextId,
explicit);
RecordIterator records = tripleStore.getTriples(subjID, predID, objID, contextId, explicit);

for (Entry<Long, Long> entry : result.entrySet()) {
final Map<Long, Long> perContextCounts = new HashMap<>();
tripleStore.removeTriples(records, explicit, quad -> {
perContextCounts.merge(quad[3], 1L, (c, one) -> c + one);
for (long id : quad) {
removedIds.add(id);
}
});

for (Entry<Long, Long> entry : perContextCounts.entrySet()) {
Long entryContextId = entry.getKey();
if (entryContextId > 0) {
Resource modifiedContext = (Resource) valueStore.getValue(entryContextId);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -51,14 +51,17 @@
import java.io.OutputStream;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Comparator;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.Set;
import java.util.StringTokenizer;
import java.util.function.Consumer;

import org.eclipse.rdf4j.sail.SailException;
import org.eclipse.rdf4j.sail.lmdb.TxnRef.Mode;
Expand All @@ -76,7 +79,6 @@
* IDs. Each ID represent an RDF value that is stored in a {@link ValueStore}. The four IDs refer to the statement's
* subject, predicate, object and context. The ID <tt>0</tt> is used to represent the "null" context and doesn't map to
* an actual RDF value.
*
*/
@SuppressWarnings("deprecation")
class TripleStore implements Closeable {
Expand Down Expand Up @@ -116,12 +118,10 @@ class TripleStore implements Closeable {
* </ul>
*/
private static final int SCHEME_VERSION = 1;

/*-----------*
* Variables *
*-----------*/
private static final Logger logger = LoggerFactory.getLogger(TripleStore.class);

/**
* The directory that is used to store the index files.
*/
Expand All @@ -135,7 +135,6 @@ class TripleStore implements Closeable {
*/
private final List<TripleIndex> indexes = new ArrayList<>();
private final boolean forceSync;

private long env;
private long writeTxn = 0;
private TxnRef readTxnRef;
Expand Down Expand Up @@ -442,6 +441,87 @@ private RecordIterator getTriplesUsingIndex(long subj, long pred, long obj, long
return new LmdbRecordIterator(pool, index, rangeSearch, subj, pred, obj, context, explicit, readTxnRef);
}

protected void filterUsedIds(Collection<Long> ids) {
try (MemoryStack stack = stackPush()) {
MDBVal maxKey = MDBVal.malloc(stack);
ByteBuffer maxKeyBuf = stack.malloc(TripleStore.MAX_KEY_LENGTH);
MDBVal keyData = MDBVal.malloc(stack);
ByteBuffer keyBuf = stack.malloc(TripleStore.MAX_KEY_LENGTH);

MDBVal valueData = MDBVal.mallocStack(stack);

PointerBuffer pp = stack.mallocPointer(1);

// TODO currently this does not test for contextes (component == 3)
// because in most cases context indexes do not exist
for (int component = 0; component <= 2; component++) {
int c = component;

TripleIndex index = getBestIndex(component == 0 ? 1 : -1, component == 1 ? 1 : -1,
component == 2 ? 1 : -1, component == 3 ? 1 : -1);
for (boolean explicit : new boolean[] { true, false }) {
int dbi = index.getDB(explicit);

long cursor = 0;
try {
E(mdb_cursor_open(writeTxn, dbi, pp));
cursor = pp.get(0);

for (Iterator<Long> it = ids.iterator(); it.hasNext();) {
long id = it.next();
if (id < 0) {
it.remove();
continue;
}
if (component != 2 && (id & 1) == 1) {
// id is a literal and can only appear in object position
continue;
}

long subj = c == 0 ? id : -1, pred = c == 1 ? id : -1,
obj = c == 2 ? id : -1, context = c == 3 ? id : -1;

GroupMatcher matcher = index.createMatcher(subj, pred, obj, context);

maxKeyBuf.clear();
index.getMaxKey(maxKeyBuf, subj, pred, obj, context);
maxKeyBuf.flip();
maxKey.mv_data(maxKeyBuf);

keyBuf.clear();
index.getMinKey(keyBuf, subj, pred, obj, context);
keyBuf.flip();

// set cursor to min key
keyData.mv_data(keyBuf);
int rc = mdb_cursor_get(cursor, keyData, valueData, MDB_SET_RANGE);
boolean exists = false;
while (!exists && rc == 0) {
if (mdb_cmp(writeTxn, dbi, keyData, maxKey) > 0) {
// id was not found
break;
} else if (!matcher.matches(keyData.mv_data())) {
// value doesn't match search key/mask, fetch next value
rc = mdb_cursor_get(cursor, keyData, valueData, MDB_NEXT);
} else {
exists = true;
}
}

if (exists) {
it.remove();
}
}
} finally {
if (cursor != 0) {
mdb_cursor_close(cursor);
}
}
}
}
}
}

protected double cardinality(long subj, long pred, long obj, long context) throws IOException {
TripleIndex index = getBestIndex(subj, pred, obj, context);

Expand Down Expand Up @@ -595,33 +675,14 @@ public boolean storeTriple(long subj, long pred, long obj, long context, boolean
}
}

/**
* @param subj The subject for the pattern, or <tt>-1</tt> for a wildcard.
* @param pred The predicate for the pattern, or <tt>-1</tt> for a wildcard.
* @param obj The object for the pattern, or <tt>-1</tt> for a wildcard.
* @param context The context for the pattern, or <tt>-1</tt> for a wildcard.
* @param explicit Flag indicating whether explicit or inferred statements should be removed; <tt>true</tt> removes
* explicit statements that match the pattern, <tt>false</tt> removes inferred statements that match
* the pattern.
* @return A mapping of each modified context to the number of statements removed in that context.
* @throws IOException
*/
public Map<Long, Long> removeTriplesByContext(long subj, long pred, long obj, long context, boolean explicit)
throws IOException {
RecordIterator records = getTriples(subj, pred, obj, context, explicit);
return removeTriples(records, explicit);
}

private Map<Long, Long> removeTriples(RecordIterator iter, boolean explicit) throws IOException {
final Map<Long, Long> perContextCounts = new HashMap<>();

public void removeTriples(RecordIterator it, boolean explicit, Consumer<long[]> handler) throws IOException {
try (MemoryStack stack = MemoryStack.stackPush()) {
MDBVal keyValue = MDBVal.callocStack(stack);
ByteBuffer keyBuf = stack.malloc(MAX_KEY_LENGTH);

long[] quad = new long[4];
Record record;
while ((record = iter.next()) != null) {
while ((record = it.next()) != null) {
// store key before deleting from db
record.toQuad(quad);

Expand All @@ -636,13 +697,11 @@ private Map<Long, Long> removeTriples(RecordIterator iter, boolean explicit) thr
mdb_del(writeTxn, index.getDB(explicit), keyValue, null);
}

perContextCounts.merge(quad[CONTEXT_IDX], 1L, (c, one) -> c + one);
handler.accept(quad);
}
} finally {
iter.close();
it.close();
}

return perContextCounts;
}

public void startTransaction() throws IOException {
Expand Down

0 comments on commit f873609

Please sign in to comment.