diff --git a/core/sail/lmdb/src/main/java/org/eclipse/rdf4j/sail/lmdb/LmdbSailStore.java b/core/sail/lmdb/src/main/java/org/eclipse/rdf4j/sail/lmdb/LmdbSailStore.java index bc68e68797e..4ddb3c033bb 100644 --- a/core/sail/lmdb/src/main/java/org/eclipse/rdf4j/sail/lmdb/LmdbSailStore.java +++ b/core/sail/lmdb/src/main/java/org/eclipse/rdf4j/sail/lmdb/LmdbSailStore.java @@ -10,10 +10,13 @@ import java.io.File; import java.io.IOException; import java.util.ArrayList; +import java.util.HashMap; +import java.util.HashSet; import java.util.List; import java.util.Map; import java.util.Map.Entry; import java.util.Objects; +import java.util.Set; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.atomic.AtomicBoolean; @@ -66,6 +69,8 @@ class LmdbSailStore implements SailStore { private boolean enableMultiThreading = true; + private Set removedIds = new HashSet<>(); + /** * A fast non-blocking circular buffer backed by an array. * @@ -125,6 +130,13 @@ class AddQuadOperation implements Operation { @Override public void execute() throws IOException { + if (!removedIds.isEmpty()) { + // these ids are used again + removedIds.remove(s); + removedIds.remove(p); + removedIds.remove(o); + removedIds.remove(c); + } boolean wasNew = tripleStore.storeTriple(s, p, o, c, explicit); if (wasNew && context != null) { contextStore.increment(context); @@ -366,6 +378,16 @@ public void prepare() throws SailException { // serializable is not supported at this level } + protected void handleRemovedIds() { + if (!removedIds.isEmpty()) { + // int sizeBefore = removedIds.size(); + tripleStore.filterUsedIds(removedIds); + // System.out.println("removed before: " + sizeBefore + " after: " + removedIds.size()); + // TODO do something with removed ids + removedIds.clear(); + } + } + @Override public void flush() throws SailException { sinkStoreAccessLock.lock(); @@ -397,6 +419,9 @@ public void flush() throws SailException { contextStore.sync(); } finally { if (activeTxn) { + if (!multiThreadingActive) { + handleRemovedIds(); + } valueStore.commit(); if (!multiThreadingActive) { tripleStore.commit(); @@ -497,6 +522,8 @@ private void startTransaction(boolean preferThreading) throws SailException { Operation op = opQueue.remove(); if (op != null) { if (op == END_TRANSACTION) { + handleRemovedIds(); + tripleStore.commit(); nextTransactionAsync = false; asyncTransactionFinished = true; @@ -581,10 +608,17 @@ private long removeStatements(long subj, long pred, long obj, boolean explicit, throws IOException { long removeCount = 0; for (long contextId : contexts) { - Map result = tripleStore.removeTriplesByContext(subj, pred, obj, contextId, - explicit); + RecordIterator records = tripleStore.getTriples(subj, pred, obj, contextId, explicit); - for (Entry entry : result.entrySet()) { + final Map perContextCounts = new HashMap<>(); + tripleStore.removeTriples(records, explicit, quad -> { + perContextCounts.merge(quad[3], 1L, (c, one) -> c + one); + for (long id : quad) { + removedIds.add(id); + } + }); + + for (Entry entry : perContextCounts.entrySet()) { Long entryContextId = entry.getKey(); if (entryContextId > 0) { Resource modifiedContext = (Resource) valueStore.getValue(entryContextId); @@ -740,5 +774,4 @@ public CloseableIteration getStatements(Reso } } } - -} +} \ No newline at end of file diff --git a/core/sail/lmdb/src/main/java/org/eclipse/rdf4j/sail/lmdb/TripleStore.java b/core/sail/lmdb/src/main/java/org/eclipse/rdf4j/sail/lmdb/TripleStore.java index b0650cfeb26..2594c054d96 100644 --- a/core/sail/lmdb/src/main/java/org/eclipse/rdf4j/sail/lmdb/TripleStore.java +++ b/core/sail/lmdb/src/main/java/org/eclipse/rdf4j/sail/lmdb/TripleStore.java @@ -15,6 +15,7 @@ import static org.lwjgl.system.MemoryStack.stackPush; import static org.lwjgl.system.MemoryUtil.NULL; import static org.lwjgl.util.lmdb.LMDB.MDB_CREATE; +import static org.lwjgl.util.lmdb.LMDB.MDB_FIRST; import static org.lwjgl.util.lmdb.LMDB.MDB_LAST; import static org.lwjgl.util.lmdb.LMDB.MDB_NEXT; import static org.lwjgl.util.lmdb.LMDB.MDB_NOMETASYNC; @@ -51,14 +52,17 @@ import java.io.OutputStream; import java.nio.ByteBuffer; import java.util.ArrayList; +import java.util.Collection; import java.util.Comparator; import java.util.HashMap; import java.util.HashSet; +import java.util.Iterator; import java.util.List; import java.util.Map; import java.util.Properties; import java.util.Set; import java.util.StringTokenizer; +import java.util.function.Consumer; import org.eclipse.rdf4j.sail.SailException; import org.eclipse.rdf4j.sail.lmdb.TxnRef.Mode; @@ -76,7 +80,6 @@ * IDs. Each ID represent an RDF value that is stored in a {@link ValueStore}. The four IDs refer to the statement's * subject, predicate, object and context. The ID 0 is used to represent the "null" context and doesn't map to * an actual RDF value. - * */ @SuppressWarnings("deprecation") class TripleStore implements Closeable { @@ -116,12 +119,10 @@ class TripleStore implements Closeable { * */ private static final int SCHEME_VERSION = 1; - /*-----------* * Variables * *-----------*/ private static final Logger logger = LoggerFactory.getLogger(TripleStore.class); - /** * The directory that is used to store the index files. */ @@ -135,7 +136,6 @@ class TripleStore implements Closeable { */ private final List indexes = new ArrayList<>(); private final boolean forceSync; - private long env; private long writeTxn = 0; private TxnRef readTxnRef; @@ -442,6 +442,105 @@ private RecordIterator getTriplesUsingIndex(long subj, long pred, long obj, long return new LmdbRecordIterator(pool, index, rangeSearch, subj, pred, obj, context, explicit, readTxnRef); } + protected void filterUsedIds(Collection ids) { + try (MemoryStack stack = stackPush()) { + MDBVal maxKey = MDBVal.malloc(stack); + ByteBuffer maxKeyBuf = stack.malloc(TripleStore.MAX_KEY_LENGTH); + MDBVal keyData = MDBVal.malloc(stack); + ByteBuffer keyBuf = stack.malloc(TripleStore.MAX_KEY_LENGTH); + + MDBVal valueData = MDBVal.mallocStack(stack); + + PointerBuffer pp = stack.mallocPointer(1); + + // TODO currently this does not test for contexts (component == 3) + // because in most cases context indexes do not exist + for (int component = 0; component <= 2; component++) { + int c = component; + + TripleIndex index = getBestIndex(component == 0 ? 1 : -1, component == 1 ? 1 : -1, + component == 2 ? 1 : -1, component == 3 ? 1 : -1); + + boolean fullScan = index.getPatternScore(component == 0 ? 1 : -1, component == 1 ? 1 : -1, + component == 2 ? 1 : -1, component == 3 ? 1 : -1) == 0; + + for (boolean explicit : new boolean[] { true, false }) { + int dbi = index.getDB(explicit); + + long cursor = 0; + try { + E(mdb_cursor_open(writeTxn, dbi, pp)); + cursor = pp.get(0); + + if (fullScan) { + long[] quad = new long[4]; + int rc = mdb_cursor_get(cursor, keyData, valueData, MDB_FIRST); + while (rc == 0 && !ids.isEmpty()) { + index.keyToQuad(keyData.mv_data(), quad); + ids.remove(quad[0]); + ids.remove(quad[1]); + ids.remove(quad[2]); + ids.remove(quad[3]); + + rc = mdb_cursor_get(cursor, keyData, valueData, MDB_NEXT); + } + } else { + for (Iterator it = ids.iterator(); it.hasNext();) { + long id = it.next(); + if (id < 0) { + it.remove(); + continue; + } + if (component != 2 && (id & 1) == 1) { + // id is a literal and can only appear in object position + continue; + } + + long subj = c == 0 ? id : -1, pred = c == 1 ? id : -1, + obj = c == 2 ? id : -1, context = c == 3 ? id : -1; + + GroupMatcher matcher = index.createMatcher(subj, pred, obj, context); + + maxKeyBuf.clear(); + index.getMaxKey(maxKeyBuf, subj, pred, obj, context); + maxKeyBuf.flip(); + maxKey.mv_data(maxKeyBuf); + + keyBuf.clear(); + index.getMinKey(keyBuf, subj, pred, obj, context); + keyBuf.flip(); + + // set cursor to min key + keyData.mv_data(keyBuf); + int rc = mdb_cursor_get(cursor, keyData, valueData, MDB_SET_RANGE); + boolean exists = false; + while (!exists && rc == 0) { + if (mdb_cmp(writeTxn, dbi, keyData, maxKey) > 0) { + // id was not found + break; + } else if (!matcher.matches(keyData.mv_data())) { + // value doesn't match search key/mask, fetch next value + rc = mdb_cursor_get(cursor, keyData, valueData, MDB_NEXT); + } else { + exists = true; + } + } + + if (exists) { + it.remove(); + } + } + } + } finally { + if (cursor != 0) { + mdb_cursor_close(cursor); + } + } + } + } + } + } + protected double cardinality(long subj, long pred, long obj, long context) throws IOException { TripleIndex index = getBestIndex(subj, pred, obj, context); @@ -595,33 +694,14 @@ public boolean storeTriple(long subj, long pred, long obj, long context, boolean } } - /** - * @param subj The subject for the pattern, or -1 for a wildcard. - * @param pred The predicate for the pattern, or -1 for a wildcard. - * @param obj The object for the pattern, or -1 for a wildcard. - * @param context The context for the pattern, or -1 for a wildcard. - * @param explicit Flag indicating whether explicit or inferred statements should be removed; true removes - * explicit statements that match the pattern, false removes inferred statements that match - * the pattern. - * @return A mapping of each modified context to the number of statements removed in that context. - * @throws IOException - */ - public Map removeTriplesByContext(long subj, long pred, long obj, long context, boolean explicit) - throws IOException { - RecordIterator records = getTriples(subj, pred, obj, context, explicit); - return removeTriples(records, explicit); - } - - private Map removeTriples(RecordIterator iter, boolean explicit) throws IOException { - final Map perContextCounts = new HashMap<>(); - + public void removeTriples(RecordIterator it, boolean explicit, Consumer handler) throws IOException { try (MemoryStack stack = MemoryStack.stackPush()) { MDBVal keyValue = MDBVal.callocStack(stack); ByteBuffer keyBuf = stack.malloc(MAX_KEY_LENGTH); long[] quad = new long[4]; Record record; - while ((record = iter.next()) != null) { + while ((record = it.next()) != null) { // store key before deleting from db record.toQuad(quad); @@ -636,13 +716,11 @@ private Map removeTriples(RecordIterator iter, boolean explicit) thr mdb_del(writeTxn, index.getDB(explicit), keyValue, null); } - perContextCounts.merge(quad[CONTEXT_IDX], 1L, (c, one) -> c + one); + handler.accept(quad); } } finally { - iter.close(); + it.close(); } - - return perContextCounts; } public void startTransaction() throws IOException {