Skip to content

Commit

Permalink
Initial sketch of GC algorithm for values.
Browse files Browse the repository at this point in the history
  • Loading branch information
kenwenzel committed Jan 17, 2022
1 parent eec39af commit 23aeb58
Show file tree
Hide file tree
Showing 2 changed files with 145 additions and 34 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -10,10 +10,13 @@
import java.io.File;
import java.io.IOException;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.Objects;
import java.util.Set;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicBoolean;
Expand Down Expand Up @@ -66,6 +69,8 @@ class LmdbSailStore implements SailStore {

private boolean enableMultiThreading = true;

private Set<Long> removedIds = new HashSet<>();

/**
* A fast non-blocking circular buffer backed by an array.
*
Expand Down Expand Up @@ -125,6 +130,13 @@ class AddQuadOperation implements Operation {

@Override
public void execute() throws IOException {
if (!removedIds.isEmpty()) {
// these ids are used again
removedIds.remove(s);
removedIds.remove(p);
removedIds.remove(o);
removedIds.remove(c);
}
boolean wasNew = tripleStore.storeTriple(s, p, o, c, explicit);
if (wasNew && context != null) {
contextStore.increment(context);
Expand Down Expand Up @@ -366,6 +378,16 @@ public void prepare() throws SailException {
// serializable is not supported at this level
}

protected void handleRemovedIds() {
if (!removedIds.isEmpty()) {
// int sizeBefore = removedIds.size();
tripleStore.filterUsedIds(removedIds);
// System.out.println("removed before: " + sizeBefore + " after: " + removedIds.size());
// TODO do something with removed ids
removedIds.clear();
}
}

@Override
public void flush() throws SailException {
sinkStoreAccessLock.lock();
Expand Down Expand Up @@ -397,6 +419,9 @@ public void flush() throws SailException {
contextStore.sync();
} finally {
if (activeTxn) {
if (!multiThreadingActive) {
handleRemovedIds();
}
valueStore.commit();
if (!multiThreadingActive) {
tripleStore.commit();
Expand Down Expand Up @@ -497,6 +522,8 @@ private void startTransaction(boolean preferThreading) throws SailException {
Operation op = opQueue.remove();
if (op != null) {
if (op == END_TRANSACTION) {
handleRemovedIds();

tripleStore.commit();
nextTransactionAsync = false;
asyncTransactionFinished = true;
Expand Down Expand Up @@ -581,10 +608,17 @@ private long removeStatements(long subj, long pred, long obj, boolean explicit,
throws IOException {
long removeCount = 0;
for (long contextId : contexts) {
Map<Long, Long> result = tripleStore.removeTriplesByContext(subj, pred, obj, contextId,
explicit);
RecordIterator records = tripleStore.getTriples(subj, pred, obj, contextId, explicit);

for (Entry<Long, Long> entry : result.entrySet()) {
final Map<Long, Long> perContextCounts = new HashMap<>();
tripleStore.removeTriples(records, explicit, quad -> {
perContextCounts.merge(quad[3], 1L, (c, one) -> c + one);
for (long id : quad) {
removedIds.add(id);
}
});

for (Entry<Long, Long> entry : perContextCounts.entrySet()) {
Long entryContextId = entry.getKey();
if (entryContextId > 0) {
Resource modifiedContext = (Resource) valueStore.getValue(entryContextId);
Expand Down Expand Up @@ -740,5 +774,4 @@ public CloseableIteration<? extends Statement, SailException> getStatements(Reso
}
}
}

}
}
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
import static org.lwjgl.system.MemoryStack.stackPush;
import static org.lwjgl.system.MemoryUtil.NULL;
import static org.lwjgl.util.lmdb.LMDB.MDB_CREATE;
import static org.lwjgl.util.lmdb.LMDB.MDB_FIRST;
import static org.lwjgl.util.lmdb.LMDB.MDB_LAST;
import static org.lwjgl.util.lmdb.LMDB.MDB_NEXT;
import static org.lwjgl.util.lmdb.LMDB.MDB_NOMETASYNC;
Expand Down Expand Up @@ -51,14 +52,17 @@
import java.io.OutputStream;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Comparator;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.Set;
import java.util.StringTokenizer;
import java.util.function.Consumer;

import org.eclipse.rdf4j.sail.SailException;
import org.eclipse.rdf4j.sail.lmdb.TxnRef.Mode;
Expand All @@ -76,7 +80,6 @@
* IDs. Each ID represent an RDF value that is stored in a {@link ValueStore}. The four IDs refer to the statement's
* subject, predicate, object and context. The ID <tt>0</tt> is used to represent the "null" context and doesn't map to
* an actual RDF value.
*
*/
@SuppressWarnings("deprecation")
class TripleStore implements Closeable {
Expand Down Expand Up @@ -116,12 +119,10 @@ class TripleStore implements Closeable {
* </ul>
*/
private static final int SCHEME_VERSION = 1;

/*-----------*
* Variables *
*-----------*/
private static final Logger logger = LoggerFactory.getLogger(TripleStore.class);

/**
* The directory that is used to store the index files.
*/
Expand All @@ -135,7 +136,6 @@ class TripleStore implements Closeable {
*/
private final List<TripleIndex> indexes = new ArrayList<>();
private final boolean forceSync;

private long env;
private long writeTxn = 0;
private TxnRef readTxnRef;
Expand Down Expand Up @@ -442,6 +442,105 @@ private RecordIterator getTriplesUsingIndex(long subj, long pred, long obj, long
return new LmdbRecordIterator(pool, index, rangeSearch, subj, pred, obj, context, explicit, readTxnRef);
}

protected void filterUsedIds(Collection<Long> ids) {
try (MemoryStack stack = stackPush()) {
MDBVal maxKey = MDBVal.malloc(stack);
ByteBuffer maxKeyBuf = stack.malloc(TripleStore.MAX_KEY_LENGTH);
MDBVal keyData = MDBVal.malloc(stack);
ByteBuffer keyBuf = stack.malloc(TripleStore.MAX_KEY_LENGTH);

MDBVal valueData = MDBVal.mallocStack(stack);

PointerBuffer pp = stack.mallocPointer(1);

// TODO currently this does not test for contexts (component == 3)
// because in most cases context indexes do not exist
for (int component = 0; component <= 2; component++) {
int c = component;

TripleIndex index = getBestIndex(component == 0 ? 1 : -1, component == 1 ? 1 : -1,
component == 2 ? 1 : -1, component == 3 ? 1 : -1);

boolean fullScan = index.getPatternScore(component == 0 ? 1 : -1, component == 1 ? 1 : -1,
component == 2 ? 1 : -1, component == 3 ? 1 : -1) == 0;

for (boolean explicit : new boolean[] { true, false }) {
int dbi = index.getDB(explicit);

long cursor = 0;
try {
E(mdb_cursor_open(writeTxn, dbi, pp));
cursor = pp.get(0);

if (fullScan) {
long[] quad = new long[4];
int rc = mdb_cursor_get(cursor, keyData, valueData, MDB_FIRST);
while (rc == 0 && !ids.isEmpty()) {
index.keyToQuad(keyData.mv_data(), quad);
ids.remove(quad[0]);
ids.remove(quad[1]);
ids.remove(quad[2]);
ids.remove(quad[3]);

rc = mdb_cursor_get(cursor, keyData, valueData, MDB_NEXT);
}
} else {
for (Iterator<Long> it = ids.iterator(); it.hasNext();) {
long id = it.next();
if (id < 0) {
it.remove();
continue;
}
if (component != 2 && (id & 1) == 1) {
// id is a literal and can only appear in object position
continue;
}

long subj = c == 0 ? id : -1, pred = c == 1 ? id : -1,
obj = c == 2 ? id : -1, context = c == 3 ? id : -1;

GroupMatcher matcher = index.createMatcher(subj, pred, obj, context);

maxKeyBuf.clear();
index.getMaxKey(maxKeyBuf, subj, pred, obj, context);
maxKeyBuf.flip();
maxKey.mv_data(maxKeyBuf);

keyBuf.clear();
index.getMinKey(keyBuf, subj, pred, obj, context);
keyBuf.flip();

// set cursor to min key
keyData.mv_data(keyBuf);
int rc = mdb_cursor_get(cursor, keyData, valueData, MDB_SET_RANGE);
boolean exists = false;
while (!exists && rc == 0) {
if (mdb_cmp(writeTxn, dbi, keyData, maxKey) > 0) {
// id was not found
break;
} else if (!matcher.matches(keyData.mv_data())) {
// value doesn't match search key/mask, fetch next value
rc = mdb_cursor_get(cursor, keyData, valueData, MDB_NEXT);
} else {
exists = true;
}
}

if (exists) {
it.remove();
}
}
}
} finally {
if (cursor != 0) {
mdb_cursor_close(cursor);
}
}
}
}
}
}

protected double cardinality(long subj, long pred, long obj, long context) throws IOException {
TripleIndex index = getBestIndex(subj, pred, obj, context);

Expand Down Expand Up @@ -595,33 +694,14 @@ public boolean storeTriple(long subj, long pred, long obj, long context, boolean
}
}

/**
* @param subj The subject for the pattern, or <tt>-1</tt> for a wildcard.
* @param pred The predicate for the pattern, or <tt>-1</tt> for a wildcard.
* @param obj The object for the pattern, or <tt>-1</tt> for a wildcard.
* @param context The context for the pattern, or <tt>-1</tt> for a wildcard.
* @param explicit Flag indicating whether explicit or inferred statements should be removed; <tt>true</tt> removes
* explicit statements that match the pattern, <tt>false</tt> removes inferred statements that match
* the pattern.
* @return A mapping of each modified context to the number of statements removed in that context.
* @throws IOException
*/
public Map<Long, Long> removeTriplesByContext(long subj, long pred, long obj, long context, boolean explicit)
throws IOException {
RecordIterator records = getTriples(subj, pred, obj, context, explicit);
return removeTriples(records, explicit);
}

private Map<Long, Long> removeTriples(RecordIterator iter, boolean explicit) throws IOException {
final Map<Long, Long> perContextCounts = new HashMap<>();

public void removeTriples(RecordIterator it, boolean explicit, Consumer<long[]> handler) throws IOException {
try (MemoryStack stack = MemoryStack.stackPush()) {
MDBVal keyValue = MDBVal.callocStack(stack);
ByteBuffer keyBuf = stack.malloc(MAX_KEY_LENGTH);

long[] quad = new long[4];
Record record;
while ((record = iter.next()) != null) {
while ((record = it.next()) != null) {
// store key before deleting from db
record.toQuad(quad);

Expand All @@ -636,13 +716,11 @@ private Map<Long, Long> removeTriples(RecordIterator iter, boolean explicit) thr
mdb_del(writeTxn, index.getDB(explicit), keyValue, null);
}

perContextCounts.merge(quad[CONTEXT_IDX], 1L, (c, one) -> c + one);
handler.accept(quad);
}
} finally {
iter.close();
it.close();
}

return perContextCounts;
}

public void startTransaction() throws IOException {
Expand Down

0 comments on commit 23aeb58

Please sign in to comment.