diff --git a/core/sail/lmdb/src/main/java/org/eclipse/rdf4j/sail/lmdb/LmdbSailStore.java b/core/sail/lmdb/src/main/java/org/eclipse/rdf4j/sail/lmdb/LmdbSailStore.java index 454cdfc3bd6..322e55d91be 100644 --- a/core/sail/lmdb/src/main/java/org/eclipse/rdf4j/sail/lmdb/LmdbSailStore.java +++ b/core/sail/lmdb/src/main/java/org/eclipse/rdf4j/sail/lmdb/LmdbSailStore.java @@ -12,16 +12,21 @@ import java.io.File; import java.io.IOException; +import java.nio.ByteBuffer; +import java.nio.ByteOrder; import java.util.ArrayList; +import java.util.HashMap; import java.util.List; import java.util.Map; import java.util.Map.Entry; import java.util.Objects; +import java.util.Set; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.locks.ReentrantLock; +import org.eclipse.rdf4j.collection.factory.mapdb.MapDbCollectionFactory; import org.eclipse.rdf4j.common.iteration.CloseableIteration; import org.eclipse.rdf4j.common.iteration.CloseableIteratorIteration; import org.eclipse.rdf4j.common.iteration.ConvertingIteration; @@ -69,6 +74,8 @@ class LmdbSailStore implements SailStore { private final boolean enableMultiThreading = true; + private final Set unusedIds; + /** * A fast non-blocking circular buffer backed by an array. * @@ -134,6 +141,13 @@ class AddQuadOperation implements Operation { @Override public void execute() throws IOException { + if (!unusedIds.isEmpty()) { + // these ids are used again + unusedIds.remove(s); + unusedIds.remove(p); + unusedIds.remove(o); + unusedIds.remove(c); + } boolean wasNew = tripleStore.storeTriple(s, p, o, c, explicit); if (wasNew && context != null) { contextStore.increment(context); @@ -168,6 +182,19 @@ abstract static class StatefulOperation implements Operation { * Creates a new {@link LmdbSailStore}. 
*/ public LmdbSailStore(File dataDir, LmdbStoreConfig config) throws IOException, SailException { + this.unusedIds = new PersistentSet<>(dataDir) { + @Override + protected byte[] write(Long element) { + ByteBuffer bb = ByteBuffer.allocate(Long.BYTES).order(ByteOrder.BIG_ENDIAN); + bb.putLong(element); + return bb.array(); + } + + @Override + protected Long read(ByteBuffer buffer) { + return buffer.order(ByteOrder.BIG_ENDIAN).getLong(); + } + }; boolean initialized = false; try { namespaceStore = new NamespaceStore(dataDir); @@ -415,6 +442,19 @@ public void prepare() throws SailException { // serializable is not supported at this level } + protected void filterUsedIdsInTripleStore() throws IOException { + if (!unusedIds.isEmpty()) { + tripleStore.filterUsedIds(unusedIds); + } + } + + protected void handleRemovedIdsInValueStore() throws IOException { + if (!unusedIds.isEmpty()) { + valueStore.gcIds(unusedIds); + unusedIds.clear(); + } + } + @Override public void flush() throws SailException { sinkStoreAccessLock.lock(); @@ -446,6 +486,10 @@ public void flush() throws SailException { contextStore.sync(); } finally { if (activeTxn) { + if (!multiThreadingActive) { + filterUsedIdsInTripleStore(); + } + handleRemovedIdsInValueStore(); valueStore.commit(); if (!multiThreadingActive) { tripleStore.commit(); @@ -548,6 +592,8 @@ private void startTransaction(boolean preferThreading) throws SailException { Operation op = opQueue.remove(); if (op != null) { if (op == COMMIT_TRANSACTION) { + filterUsedIdsInTripleStore(); + tripleStore.commit(); nextTransactionAsync = false; asyncTransactionFinished = true; @@ -642,9 +688,17 @@ private long removeStatements(long subj, long pred, long obj, boolean explicit, throws IOException { long removeCount = 0; for (long contextId : contexts) { - Map result = tripleStore.removeTriplesByContext(subj, pred, obj, contextId, explicit); + final Map perContextCounts = new HashMap<>(); + tripleStore.removeTriplesByContext(subj, pred, obj, contextId, explicit, quad -> { + perContextCounts.merge(quad[3], 1L, (c, one) -> c + one); + for (long id : quad) { + if (id != 0L) { + unusedIds.add(id); + } + } + }); - for (Entry entry : result.entrySet()) { + for (Entry entry : perContextCounts.entrySet()) { Long entryContextId = entry.getKey(); if (entryContextId > 0) { Resource modifiedContext = (Resource) valueStore.getValue(entryContextId); @@ -809,5 +863,4 @@ public CloseableIteration getStatements(Resource subj, IRI } } } - -} +} \ No newline at end of file diff --git a/core/sail/lmdb/src/main/java/org/eclipse/rdf4j/sail/lmdb/LmdbStoreConnection.java b/core/sail/lmdb/src/main/java/org/eclipse/rdf4j/sail/lmdb/LmdbStoreConnection.java index a8441ec3b5e..8dc3c7019ed 100644 --- a/core/sail/lmdb/src/main/java/org/eclipse/rdf4j/sail/lmdb/LmdbStoreConnection.java +++ b/core/sail/lmdb/src/main/java/org/eclipse/rdf4j/sail/lmdb/LmdbStoreConnection.java @@ -11,13 +11,21 @@ package org.eclipse.rdf4j.sail.lmdb; import org.eclipse.rdf4j.common.concurrent.locks.Lock; +import org.eclipse.rdf4j.common.iteration.CloseableIteration; +import org.eclipse.rdf4j.common.iteration.IterationWrapper; import org.eclipse.rdf4j.model.IRI; import org.eclipse.rdf4j.model.Resource; +import org.eclipse.rdf4j.model.Statement; import org.eclipse.rdf4j.model.Value; +import org.eclipse.rdf4j.query.BindingSet; +import org.eclipse.rdf4j.query.Dataset; +import org.eclipse.rdf4j.query.QueryEvaluationException; +import org.eclipse.rdf4j.query.algebra.TupleExpr; import org.eclipse.rdf4j.sail.SailException; import 
org.eclipse.rdf4j.sail.SailReadOnlyException; import org.eclipse.rdf4j.sail.base.SailSourceConnection; import org.eclipse.rdf4j.sail.helpers.DefaultSailChangedEvent; +import org.eclipse.rdf4j.sail.lmdb.model.LmdbValue; /** * Connection to an {@link LmdbStore}. @@ -120,6 +128,52 @@ public boolean addInferredStatement(Resource subj, IRI pred, Value obj, Resource return ret; } + @Override + protected CloseableIteration evaluateInternal(TupleExpr tupleExpr, + Dataset dataset, + BindingSet bindings, boolean includeInferred) throws SailException { + // ensure that all elements of the binding set are initialized (lazy values are resolved) + return new IterationWrapper( + super.evaluateInternal(tupleExpr, dataset, bindings, includeInferred)) { + @Override + public BindingSet next() throws QueryEvaluationException { + BindingSet bs = super.next(); + bs.forEach(b -> initValue(b.getValue())); + return bs; + } + }; + } + + @Override + protected CloseableIteration getStatementsInternal(Resource subj, IRI pred, + Value obj, + boolean includeInferred, Resource... contexts) throws SailException { + return new IterationWrapper( + super.getStatementsInternal(subj, pred, obj, includeInferred, contexts)) { + @Override + public Statement next() throws SailException { + // ensure that all elements of the statement are initialized (lazy values are resolved) + Statement stmt = super.next(); + initValue(stmt.getSubject()); + initValue(stmt.getPredicate()); + initValue(stmt.getObject()); + initValue(stmt.getContext()); + return stmt; + } + }; + } + + /** + * Ensures that all components of the value are initialized from the underlying database. + * + * @param value The value that should be initialized + */ + protected void initValue(Value value) { + if (value instanceof LmdbValue) { + ((LmdbValue) value).init(); + } + } + @Override protected void removeStatementsInternal(Resource subj, IRI pred, Value obj, Resource... contexts) throws SailException { diff --git a/core/sail/lmdb/src/main/java/org/eclipse/rdf4j/sail/lmdb/PersistentSet.java b/core/sail/lmdb/src/main/java/org/eclipse/rdf4j/sail/lmdb/PersistentSet.java new file mode 100644 index 00000000000..401bae9424f --- /dev/null +++ b/core/sail/lmdb/src/main/java/org/eclipse/rdf4j/sail/lmdb/PersistentSet.java @@ -0,0 +1,390 @@ +/******************************************************************************* + * Copyright (c) 2023 Eclipse RDF4J contributors. + * + * All rights reserved. This program and the accompanying materials + * are made available under the terms of the Eclipse Distribution License v1.0 + * which accompanies this distribution, and is available at + * http://www.eclipse.org/org/documents/edl-v10.php. 
+ * + * SPDX-License-Identifier: BSD-3-Clause + *******************************************************************************/ +package org.eclipse.rdf4j.sail.lmdb; + +import static org.eclipse.rdf4j.sail.lmdb.LmdbUtil.E; +import static org.eclipse.rdf4j.sail.lmdb.LmdbUtil.openDatabase; +import static org.eclipse.rdf4j.sail.lmdb.LmdbUtil.readTransaction; +import static org.lwjgl.system.MemoryStack.stackPush; +import static org.lwjgl.system.MemoryUtil.NULL; +import static org.lwjgl.util.lmdb.LMDB.MDB_CREATE; +import static org.lwjgl.util.lmdb.LMDB.MDB_NEXT; +import static org.lwjgl.util.lmdb.LMDB.MDB_NOMETASYNC; +import static org.lwjgl.util.lmdb.LMDB.MDB_NOOVERWRITE; +import static org.lwjgl.util.lmdb.LMDB.MDB_NOSYNC; +import static org.lwjgl.util.lmdb.LMDB.MDB_NOTLS; +import static org.lwjgl.util.lmdb.LMDB.MDB_RDONLY; +import static org.lwjgl.util.lmdb.LMDB.MDB_SET; +import static org.lwjgl.util.lmdb.LMDB.MDB_SET_RANGE; +import static org.lwjgl.util.lmdb.LMDB.MDB_SUCCESS; +import static org.lwjgl.util.lmdb.LMDB.mdb_cursor_close; +import static org.lwjgl.util.lmdb.LMDB.mdb_cursor_get; +import static org.lwjgl.util.lmdb.LMDB.mdb_cursor_open; +import static org.lwjgl.util.lmdb.LMDB.mdb_cursor_renew; +import static org.lwjgl.util.lmdb.LMDB.mdb_del; +import static org.lwjgl.util.lmdb.LMDB.mdb_drop; +import static org.lwjgl.util.lmdb.LMDB.mdb_env_close; +import static org.lwjgl.util.lmdb.LMDB.mdb_env_create; +import static org.lwjgl.util.lmdb.LMDB.mdb_env_open; +import static org.lwjgl.util.lmdb.LMDB.mdb_env_set_mapsize; +import static org.lwjgl.util.lmdb.LMDB.mdb_env_set_maxdbs; +import static org.lwjgl.util.lmdb.LMDB.mdb_get; +import static org.lwjgl.util.lmdb.LMDB.mdb_put; +import static org.lwjgl.util.lmdb.LMDB.mdb_stat; +import static org.lwjgl.util.lmdb.LMDB.mdb_txn_abort; +import static org.lwjgl.util.lmdb.LMDB.mdb_txn_begin; +import static org.lwjgl.util.lmdb.LMDB.mdb_txn_commit; + +import java.io.ByteArrayOutputStream; +import java.io.File; +import java.io.IOException; +import java.io.InputStream; +import java.io.ObjectInputStream; +import java.io.ObjectOutputStream; +import java.io.Serializable; +import java.nio.ByteBuffer; +import java.nio.file.Files; +import java.nio.file.Path; +import java.util.AbstractSet; +import java.util.Iterator; +import java.util.NoSuchElementException; +import java.util.concurrent.locks.StampedLock; + +import org.apache.commons.io.FileUtils; +import org.eclipse.rdf4j.sail.lmdb.TxnManager.Mode; +import org.eclipse.rdf4j.sail.lmdb.TxnManager.Txn; +import org.lwjgl.PointerBuffer; +import org.lwjgl.system.MemoryStack; +import org.lwjgl.util.lmdb.MDBStat; +import org.lwjgl.util.lmdb.MDBVal; + +/** + * An LMDB-based persistent set. 
+ */ +class PersistentSet<T extends Serializable> extends AbstractSet<T> { + + private final Path dbDir; + private final long env; + private final int dbi; + private TxnManager txnManager; + private long writeTxn; + private PointerBuffer writeTxnPp = PointerBuffer.allocateDirect(1); + private long mapSize = 1048576; // 1 MiB + private long pageSize; + + private int size; + + public PersistentSet(File cacheDir) throws IOException { + try (MemoryStack stack = stackPush()) { + PointerBuffer pp = stack.mallocPointer(1); + E(mdb_env_create(pp)); + env = pp.get(0); + + txnManager = new TxnManager(env, Mode.ABORT); + + E(mdb_env_set_maxdbs(env, 2)); + E(mdb_env_set_mapsize(env, mapSize)); + + int flags = MDB_NOTLS | MDB_NOSYNC | MDB_NOMETASYNC; + + dbDir = Files.createTempDirectory(cacheDir.toPath(), "set"); + E(mdb_env_open(env, dbDir.toAbsolutePath().toString(), flags, 0664)); + dbi = openDatabase(env, "elements", MDB_CREATE, null); + + MDBStat stat = MDBStat.malloc(stack); + readTransaction(env, (stack2, txn) -> { + E(mdb_stat(txn, dbi, stat)); + pageSize = stat.ms_psize(); + return null; + }); + } + } + + public synchronized void close() throws IOException { + if (writeTxn != 0) { + mdb_txn_abort(writeTxn); + writeTxn = 0; + } + writeTxnPp.free(); + mdb_env_close(env); + FileUtils.deleteDirectory(dbDir.toFile()); + } + + protected synchronized void commit() throws IOException { + if (writeTxn != 0) { + E(mdb_txn_commit(writeTxn)); + writeTxn = 0; + } + } + + public synchronized void clear() { + if (writeTxn != 0) { + mdb_txn_abort(writeTxn); + writeTxn = 0; + } + try { + // start a write transaction + E(mdb_txn_begin(env, NULL, 0, writeTxnPp)); + writeTxn = writeTxnPp.get(0); + mdb_drop(writeTxn, dbi, false); + commit(); + } catch (Exception e) { + throw new RuntimeException(e); + } + size = 0; + } + + @Override + public Iterator<T> iterator() { + try { + commit(); + } catch (Exception e) { + throw new RuntimeException(e); + } + return new ElementIterator(dbi); + } + + @Override + public int size() { + return size; + } + + public boolean add(T element) { + try { + return update(element, true); + } catch (Exception e) { + throw new RuntimeException(e); + } + } + + public boolean remove(Object element) { + try { + return update(element, false); + } catch (Exception e) { + throw new RuntimeException(e); + } + } + + protected synchronized boolean update(Object element, boolean add) throws IOException { + try (MemoryStack stack = MemoryStack.stackPush()) { + if (writeTxn == 0) { + // start a write transaction + E(mdb_txn_begin(env, NULL, 0, writeTxnPp)); + writeTxn = writeTxnPp.get(0); + } + if (LmdbUtil.requiresResize(mapSize, pageSize, writeTxn, 0)) { + StampedLock lock = txnManager.lock(); + long stamp = lock.writeLock(); + try { + txnManager.deactivate(); + + // resize map + E(mdb_txn_commit(writeTxn)); + mapSize = LmdbUtil.autoGrowMapSize(mapSize, pageSize, 0); + E(mdb_env_set_mapsize(env, mapSize)); + + E(mdb_txn_begin(env, NULL, 0, writeTxnPp)); + writeTxn = writeTxnPp.get(0); + } finally { + try { + txnManager.activate(); + } finally { + lock.unlockWrite(stamp); + } + } + } + + MDBVal keyVal = MDBVal.malloc(stack); + // use calloc to get an empty data value + MDBVal dataVal = MDBVal.calloc(stack); + + byte[] data = write((T) element); + ByteBuffer keyBuf = stack.malloc(data.length); + keyBuf.put(data); + keyBuf.flip(); + keyVal.mv_data(keyBuf); + + if (add) { + if (mdb_put(writeTxn, dbi, keyVal, dataVal, MDB_NOOVERWRITE) == MDB_SUCCESS) { + size++; + return true; + } + } else { + // delete element + if 
(mdb_del(writeTxn, dbi, keyVal, dataVal) == MDB_SUCCESS) { + size--; + return true; + } + } + return false; + } + } + + protected byte[] write(T element) throws IOException { + ByteArrayOutputStream baos = new ByteArrayOutputStream(); + ObjectOutputStream out = new ObjectOutputStream(baos); + out.writeObject(element); + out.close(); + return baos.toByteArray(); + } + + protected T read(ByteBuffer buffer) throws IOException { + try { + return (T) new ObjectInputStream(new ByteBufferBackedInputStream(buffer)).readObject(); + } catch (ClassNotFoundException cnfe) { + throw new IOException(cnfe); + } + } + + protected class ElementIterator implements Iterator { + + private final MDBVal keyData = MDBVal.malloc(); + private final MDBVal valueData = MDBVal.malloc(); + private final long cursor; + + private final StampedLock txnLock; + private Txn txnRef; + private long txnRefVersion; + + private T next; + private T current; + + protected ElementIterator(int dbi) { + try { + this.txnRef = txnManager.createReadTxn(); + this.txnLock = txnRef.lock(); + + long stamp = txnLock.readLock(); + try { + this.txnRefVersion = txnRef.version(); + + try (MemoryStack stack = MemoryStack.stackPush()) { + PointerBuffer pp = stack.mallocPointer(1); + E(mdb_cursor_open(txnRef.get(), dbi, pp)); + cursor = pp.get(0); + } + } finally { + txnLock.unlockRead(stamp); + } + } catch (Exception e) { + throw new RuntimeException(e); + } + } + + @Override + public boolean hasNext() { + if (next == null && txnRef != null) { + try { + next = computeNext(); + } catch (Exception e) { + next = null; + } + if (next == null) { + close(); + } + } + return next != null; + } + + @Override + public T next() { + if (next == null) { + throw new NoSuchElementException(); + } + current = next; + next = null; + return current; + } + + public T computeNext() throws IOException { + long stamp = txnLock.readLock(); + try { + if (txnRefVersion != txnRef.version()) { + // cursor must be renewed + mdb_cursor_renew(txnRef.get(), cursor); + + try (MemoryStack stack = MemoryStack.stackPush()) { + keyData.mv_data(stack.bytes(write(current))); + if (mdb_cursor_get(cursor, keyData, valueData, MDB_SET) != 0) { + // use MDB_SET_RANGE if key was deleted + if (mdb_cursor_get(cursor, keyData, valueData, MDB_SET_RANGE) == 0) { + return read(keyData.mv_data()); + } + } + } + } + + if (mdb_cursor_get(cursor, keyData, valueData, MDB_NEXT) == 0) { + return read(keyData.mv_data()); + } + close(); + return null; + } finally { + txnLock.unlockRead(stamp); + } + } + + public void close() { + if (txnRef != null) { + keyData.close(); + valueData.close(); + long stamp = txnLock.readLock(); + try { + mdb_cursor_close(cursor); + txnRef.close(); + txnRef = null; + } finally { + txnLock.unlockRead(stamp); + } + } + } + + @Override + public void remove() { + PersistentSet.this.remove(current); + } + } + + public class ByteBufferBackedInputStream extends InputStream { + + final ByteBuffer buf; + + public ByteBufferBackedInputStream(ByteBuffer buf) { + this.buf = buf; + } + + public int read() throws IOException { + if (!buf.hasRemaining()) { + return -1; + } + return buf.get() & 0xFF; + } + + public int read(byte[] bytes, int off, int len) + throws IOException { + if (!buf.hasRemaining()) { + return -1; + } + + len = Math.min(len, buf.remaining()); + buf.get(bytes, off, len); + return len; + } + } +} diff --git a/core/sail/lmdb/src/main/java/org/eclipse/rdf4j/sail/lmdb/TripleStore.java b/core/sail/lmdb/src/main/java/org/eclipse/rdf4j/sail/lmdb/TripleStore.java index 
775b2ccfaf3..5aa88a1df60 100644 --- a/core/sail/lmdb/src/main/java/org/eclipse/rdf4j/sail/lmdb/TripleStore.java +++ b/core/sail/lmdb/src/main/java/org/eclipse/rdf4j/sail/lmdb/TripleStore.java @@ -19,6 +19,7 @@ import static org.lwjgl.system.MemoryStack.stackPush; import static org.lwjgl.system.MemoryUtil.NULL; import static org.lwjgl.util.lmdb.LMDB.MDB_CREATE; +import static org.lwjgl.util.lmdb.LMDB.MDB_FIRST; import static org.lwjgl.util.lmdb.LMDB.MDB_LAST; import static org.lwjgl.util.lmdb.LMDB.MDB_NEXT; import static org.lwjgl.util.lmdb.LMDB.MDB_NOMETASYNC; @@ -39,13 +40,13 @@ import static org.lwjgl.util.lmdb.LMDB.mdb_env_open; import static org.lwjgl.util.lmdb.LMDB.mdb_env_set_mapsize; import static org.lwjgl.util.lmdb.LMDB.mdb_env_set_maxdbs; +import static org.lwjgl.util.lmdb.LMDB.mdb_env_set_maxreaders; import static org.lwjgl.util.lmdb.LMDB.mdb_get; import static org.lwjgl.util.lmdb.LMDB.mdb_put; import static org.lwjgl.util.lmdb.LMDB.mdb_stat; import static org.lwjgl.util.lmdb.LMDB.mdb_txn_abort; import static org.lwjgl.util.lmdb.LMDB.mdb_txn_begin; import static org.lwjgl.util.lmdb.LMDB.mdb_txn_commit; -import static org.lwjgl.util.lmdb.LMDB.nmdb_env_set_maxreaders; import java.io.Closeable; import java.io.File; @@ -57,15 +58,18 @@ import java.nio.ByteBuffer; import java.util.ArrayList; import java.util.Arrays; +import java.util.Collection; import java.util.Comparator; import java.util.HashMap; import java.util.HashSet; +import java.util.Iterator; import java.util.List; import java.util.Map; import java.util.Properties; import java.util.Set; import java.util.StringTokenizer; import java.util.concurrent.locks.StampedLock; +import java.util.function.Consumer; import org.eclipse.rdf4j.sail.SailException; import org.eclipse.rdf4j.sail.lmdb.TxnManager.Mode; @@ -126,12 +130,10 @@ class TripleStore implements Closeable { * */ private static final int SCHEME_VERSION = 1; - /*-----------* * Variables * *-----------*/ private static final Logger logger = LoggerFactory.getLogger(TripleStore.class); - /** * The directory that is used to store the index files. */ @@ -191,8 +193,8 @@ public int compareRegion(ByteBuffer array1, int startIdx1, ByteBuffer array2, in env = pp.get(0); } - mdb_env_set_maxdbs(env, 12); - nmdb_env_set_maxreaders(env, 256); + E(mdb_env_set_maxdbs(env, 12)); + E(mdb_env_set_maxreaders(env, 256)); // Open environment int flags = MDB_NOTLS; @@ -504,6 +506,111 @@ protected void bucketStart(double fraction, long[] lowerValues, long[] upperValu } } + /** + * Removes all IDs that are still used by at least one triple from the given collection, leaving only those IDs + * that are no longer referenced by any index. + * + * @param ids Collection of candidate IDs; IDs that are still in use are removed from it. + * @throws IOException If an I/O error occurred. + */ + protected void filterUsedIds(Collection<Long> ids) throws IOException { + try (MemoryStack stack = stackPush()) { + MDBVal maxKey = MDBVal.malloc(stack); + ByteBuffer maxKeyBuf = stack.malloc(TripleStore.MAX_KEY_LENGTH); + MDBVal keyData = MDBVal.malloc(stack); + ByteBuffer keyBuf = stack.malloc(TripleStore.MAX_KEY_LENGTH); + + MDBVal valueData = MDBVal.mallocStack(stack); + + PointerBuffer pp = stack.mallocPointer(1); + + // TODO currently this does not test for contexts (component == 3) + // because in most cases context indexes do not exist + for (int component = 0; component <= 2; component++) { + int c = component; + + TripleIndex index = getBestIndex(component == 0 ? 1 : -1, component == 1 ? 1 : -1, + component == 2 ? 1 : -1, component == 3 ? 1 : -1); + + boolean fullScan = index.getPatternScore(component == 0 ? 1 : -1, component == 1 ? 1 : -1, + component == 2 ? 
1 : -1, component == 3 ? 1 : -1) == 0; + + for (boolean explicit : new boolean[] { true, false }) { + int dbi = index.getDB(explicit); + + long cursor = 0; + try { + E(mdb_cursor_open(writeTxn, dbi, pp)); + cursor = pp.get(0); + + if (fullScan) { + long[] quad = new long[4]; + int rc = mdb_cursor_get(cursor, keyData, valueData, MDB_FIRST); + while (rc == 0 && !ids.isEmpty()) { + index.keyToQuad(keyData.mv_data(), quad); + ids.remove(quad[0]); + ids.remove(quad[1]); + ids.remove(quad[2]); + ids.remove(quad[3]); + + rc = mdb_cursor_get(cursor, keyData, valueData, MDB_NEXT); + } + } else { + for (Iterator it = ids.iterator(); it.hasNext();) { + long id = it.next(); + if (id < 0) { + it.remove(); + continue; + } + if (component != 2 && (id & 1) == 1) { + // id is a literal and can only appear in object position + continue; + } + + long subj = c == 0 ? id : -1, pred = c == 1 ? id : -1, + obj = c == 2 ? id : -1, context = c == 3 ? id : -1; + + GroupMatcher matcher = index.createMatcher(subj, pred, obj, context); + + maxKeyBuf.clear(); + index.getMaxKey(maxKeyBuf, subj, pred, obj, context); + maxKeyBuf.flip(); + maxKey.mv_data(maxKeyBuf); + + keyBuf.clear(); + index.getMinKey(keyBuf, subj, pred, obj, context); + keyBuf.flip(); + + // set cursor to min key + keyData.mv_data(keyBuf); + int rc = mdb_cursor_get(cursor, keyData, valueData, MDB_SET_RANGE); + boolean exists = false; + while (!exists && rc == 0) { + if (mdb_cmp(writeTxn, dbi, keyData, maxKey) > 0) { + // id was not found + break; + } else if (!matcher.matches(keyData.mv_data())) { + // value doesn't match search key/mask, fetch next value + rc = mdb_cursor_get(cursor, keyData, valueData, MDB_NEXT); + } else { + exists = true; + } + } + + if (exists) { + it.remove(); + } + } + } + } finally { + if (cursor != 0) { + mdb_cursor_close(cursor); + } + } + } + } + } + } + protected double cardinality(long subj, long pred, long obj, long context) throws IOException { TripleIndex index = getBestIndex(subj, pred, obj, context); @@ -758,24 +865,22 @@ public boolean storeTriple(long subj, long pred, long obj, long context, boolean * @param explicit Flag indicating whether explicit or inferred statements should be removed; true removes * explicit statements that match the pattern, false removes inferred statements that match * the pattern. - * @return A mapping of each modified context to the number of statements removed in that context. 
+ * @param handler Function that gets notified about each deleted quad * @throws IOException */ - public Map removeTriplesByContext(long subj, long pred, long obj, long context, - boolean explicit) throws IOException { + public void removeTriplesByContext(long subj, long pred, long obj, long context, + boolean explicit, Consumer handler) throws IOException { RecordIterator records = getTriples(txnManager.createTxn(writeTxn), subj, pred, obj, context, explicit); - return removeTriples(records, explicit); + removeTriples(records, explicit, handler); } - private Map removeTriples(RecordIterator iter, boolean explicit) throws IOException { - final Map perContextCounts = new HashMap<>(); - - try (iter; MemoryStack stack = MemoryStack.stackPush()) { + public void removeTriples(RecordIterator it, boolean explicit, Consumer handler) throws IOException { + try (MemoryStack stack = MemoryStack.stackPush()) { MDBVal keyValue = MDBVal.callocStack(stack); ByteBuffer keyBuf = stack.malloc(MAX_KEY_LENGTH); long[] quad; - while ((quad = iter.next()) != null) { + while ((quad = it.next()) != null) { if (recordCache == null) { if (requiresResize()) { // map is full, resize required @@ -799,11 +904,11 @@ private Map removeTriples(RecordIterator iter, boolean explicit) thr E(mdb_del(writeTxn, index.getDB(explicit), keyValue, null)); } - perContextCounts.merge(quad[CONTEXT_IDX], 1L, Long::sum); + handler.accept(quad); } + } finally { + it.close(); } - - return perContextCounts; } protected void updateFromCache() throws IOException { diff --git a/core/sail/lmdb/src/main/java/org/eclipse/rdf4j/sail/lmdb/ValueStore.java b/core/sail/lmdb/src/main/java/org/eclipse/rdf4j/sail/lmdb/ValueStore.java index 8df0cb8fdb0..9c974e2b373 100644 --- a/core/sail/lmdb/src/main/java/org/eclipse/rdf4j/sail/lmdb/ValueStore.java +++ b/core/sail/lmdb/src/main/java/org/eclipse/rdf4j/sail/lmdb/ValueStore.java @@ -15,6 +15,7 @@ import static org.lwjgl.system.MemoryStack.stackPush; import static org.lwjgl.system.MemoryUtil.NULL; import static org.lwjgl.util.lmdb.LMDB.MDB_CREATE; +import static org.lwjgl.util.lmdb.LMDB.MDB_FIRST; import static org.lwjgl.util.lmdb.LMDB.MDB_NEXT; import static org.lwjgl.util.lmdb.LMDB.MDB_NOMETASYNC; import static org.lwjgl.util.lmdb.LMDB.MDB_NOSYNC; @@ -23,13 +24,17 @@ import static org.lwjgl.util.lmdb.LMDB.MDB_RESERVE; import static org.lwjgl.util.lmdb.LMDB.MDB_SET_RANGE; import static org.lwjgl.util.lmdb.LMDB.mdb_cursor_close; +import static org.lwjgl.util.lmdb.LMDB.mdb_cursor_del; import static org.lwjgl.util.lmdb.LMDB.mdb_cursor_get; import static org.lwjgl.util.lmdb.LMDB.mdb_cursor_open; +import static org.lwjgl.util.lmdb.LMDB.mdb_del; import static org.lwjgl.util.lmdb.LMDB.mdb_env_close; import static org.lwjgl.util.lmdb.LMDB.mdb_env_create; import static org.lwjgl.util.lmdb.LMDB.mdb_env_info; import static org.lwjgl.util.lmdb.LMDB.mdb_env_open; import static org.lwjgl.util.lmdb.LMDB.mdb_env_set_mapsize; +import static org.lwjgl.util.lmdb.LMDB.mdb_env_set_maxdbs; +import static org.lwjgl.util.lmdb.LMDB.mdb_env_set_maxreaders; import static org.lwjgl.util.lmdb.LMDB.mdb_get; import static org.lwjgl.util.lmdb.LMDB.mdb_put; import static org.lwjgl.util.lmdb.LMDB.mdb_stat; @@ -44,11 +49,17 @@ import java.nio.ByteBuffer; import java.nio.charset.StandardCharsets; import java.util.Arrays; +import java.util.Collection; +import java.util.Collections; +import java.util.HashSet; import java.util.Optional; +import java.util.Set; import java.util.concurrent.locks.ReadWriteLock; import 
java.util.concurrent.locks.ReentrantReadWriteLock; +import java.util.concurrent.locks.StampedLock; import java.util.zip.CRC32; +import org.eclipse.rdf4j.common.concurrent.locks.diagnostics.ConcurrentCleaner; import org.eclipse.rdf4j.common.io.ByteArrayUtil; import org.eclipse.rdf4j.model.BNode; import org.eclipse.rdf4j.model.IRI; @@ -76,6 +87,8 @@ */ class ValueStore extends AbstractValueFactory { + private static final long VALUE_EVICTION_INTERVAL = 60000; // 60 seconds + private static final byte URI_VALUE = 0x0; // 00 private static final byte LITERAL_VALUE = 0x1; // 01 @@ -98,6 +111,10 @@ class ValueStore extends AbstractValueFactory { * Used to do the actual storage of values, once they're translated to byte arrays. */ private final File dir; + /** + * Lock for clearing caches when values are removed. + */ + private final StampedLock revisionLock = new StampedLock(); /** * A simple cache containing the [VALUE_CACHE_SIZE] most-recently used values stored by their ID. */ @@ -121,10 +138,16 @@ class ValueStore extends AbstractValueFactory { private long env; private int pageSize; private long mapSize; + // main database private int dbi; + // database with unused IDs + private int unusedDbi; + // database with free IDs + private int freeDbi; private long writeTxn; private final boolean forceSync; private final boolean autoGrow; + private boolean invalidateRevisionOnCommit = false; /** * This lock is required to block transactions while auto-growing the map size. */ @@ -135,10 +158,24 @@ class ValueStore extends AbstractValueFactory { * valid. In order to be valid, the ValueStoreRevision object of a LmdbValue needs to be equal to this object. */ private volatile ValueStoreRevision revision; + /** + * A wrapper object for the revision of the value store, which is used within lazy (uninitialized) values. If this + * object is garbage-collected then it is safe to finally remove the ID-value associations and to reuse IDs. 
+ */ + private volatile ValueStoreRevision.Lazy lazyRevision; + /** * The next ID that is associated with a stored value */ private long nextId; + private boolean freeIdsAvailable; + + private volatile long nextValueEvictionTime = 0; + + // package-protected for testing + final Set unusedRevisionIds = new HashSet<>(); + + private final ConcurrentCleaner cleaner = new ConcurrentCleaner(); ValueStore(File dir, LmdbStoreConfig config) throws IOException { this.dir = dir; @@ -192,6 +229,9 @@ private void open() throws IOException { env = pp.get(0); } + E(mdb_env_set_maxdbs(env, 6)); + E(mdb_env_set_maxreaders(env, 256)); + // Open environment int flags = MDB_NOTLS; if (!forceSync) { @@ -199,7 +239,7 @@ private void open() throws IOException { } E(mdb_env_open(env, dir.getAbsolutePath(), flags, 0664)); - // Open database + // open main database dbi = openDatabase(env, null, MDB_CREATE, null); // initialize page size and set map size for env @@ -225,9 +265,66 @@ private void open() throws IOException { } return null; }); + + // open unused IDs database + unusedDbi = openDatabase(env, "unused_ids", MDB_CREATE, null); + // open free IDs database + freeDbi = openDatabase(env, "free_ids", MDB_CREATE, null); + + // check if free IDs are available + readTransaction(env, (stack, txn) -> { + MDBStat stat = MDBStat.malloc(stack); + mdb_stat(txn, freeDbi, stat); + freeIdsAvailable = stat.ms_entries() > 0; + + mdb_stat(txn, unusedDbi, stat); + if (stat.ms_entries() > 0) { + // free unused IDs + resizeMap(txn, stat.ms_entries() * (2 + Long.BYTES)); + + writeTransaction((stack2, txn2) -> { + freeUnusedIdsAndValues(stack2, txn2, null); + return null; + }); + } + return null; + }); } - private long nextId(byte type) { + private long nextId(byte type) throws IOException { + if (freeIdsAvailable) { + // next id from store + Long reusedId = writeTransaction((stack, txn) -> { + long cursor = 0; + try { + PointerBuffer pp = stack.mallocPointer(1); + E(mdb_cursor_open(txn, freeDbi, pp)); + cursor = pp.get(0); + + MDBVal keyData = MDBVal.calloc(stack); + MDBVal valueData = MDBVal.calloc(stack); + if (mdb_cursor_get(cursor, keyData, valueData, MDB_FIRST) == 0) { + // remove lower 2 type bits + long value = data2id(keyData.mv_data()) >> 2; + // delete entry + E(mdb_cursor_del(cursor, 0)); + return value; + } + freeIdsAvailable = mdb_cursor_get(cursor, keyData, valueData, MDB_NEXT) == 0; + return null; + } finally { + if (cursor != 0) { + mdb_cursor_close(cursor); + } + } + }); + if (reusedId != null) { + long result = reusedId; + // encode type in lower 2 bits of id + result = (result << 2) | type; + return result; + } + } long result = nextId; nextId++; // encode type in lower 2 bits of id @@ -256,10 +353,11 @@ protected long data2id(ByteBuffer bb) { * created by this value store. */ private void setNewRevision() { - revision = new ValueStoreRevision(this); + revision = new ValueStoreRevision.Default(this); + lazyRevision = new ValueStoreRevision.Lazy(revision); } - public ValueStoreRevision getRevision() { + ValueStoreRevision getRevision() { return revision; } @@ -314,29 +412,34 @@ void cacheValue(long id, LmdbValue value) { * @throws IOException If an I/O error occurred. 
*/ public LmdbValue getLazyValue(long id) throws IOException { - // Check value cache - Long cacheID = id; - LmdbValue resultValue = cachedValue(cacheID); - - if (resultValue == null) { - switch ((byte) (id & 0x3)) { - case URI_VALUE: - resultValue = new LmdbIRI(revision, id); - break; - case LITERAL_VALUE: - resultValue = new LmdbLiteral(revision, id); - break; - case BNODE_VALUE: - resultValue = new LmdbBNode(revision, id); - break; - default: - throw new IOException("Unsupported value with type id " + (id & 0x3)); + long stamp = revisionLock.readLock(); + try { + // Check value cache + Long cacheID = id; + LmdbValue resultValue = cachedValue(cacheID); + + if (resultValue == null) { + switch ((byte) (id & 0x3)) { + case URI_VALUE: + resultValue = new LmdbIRI(lazyRevision, id); + break; + case LITERAL_VALUE: + resultValue = new LmdbLiteral(lazyRevision, id); + break; + case BNODE_VALUE: + resultValue = new LmdbBNode(lazyRevision, id); + break; + default: + throw new IOException("Unsupported value with type id " + (id & 0x3)); + } + // Store value in cache + cacheValue(cacheID, resultValue); } - // Store value in cache - cacheValue(cacheID, resultValue); - } - return resultValue; + return resultValue; + } finally { + revisionLock.unlockRead(stamp); + } } /** @@ -347,22 +450,27 @@ public LmdbValue getLazyValue(long id) throws IOException { * @throws IOException If an I/O error occurred. */ public LmdbValue getValue(long id) throws IOException { - // Check value cache - Long cacheID = id; - LmdbValue resultValue = cachedValue(cacheID); - - if (resultValue == null) { - // Value not in cache, fetch it from file - byte[] data = getData(id); - - if (data != null) { - resultValue = data2value(id, data, null); - // Store value in cache - cacheValue(cacheID, resultValue); + long stamp = revisionLock.readLock(); + try { + // Check value cache + Long cacheID = id; + LmdbValue resultValue = cachedValue(cacheID); + + if (resultValue == null) { + // Value not in cache, fetch it from file + byte[] data = getData(id); + + if (data != null) { + resultValue = data2value(id, data, null); + // Store value in cache + cacheValue(cacheID, resultValue); + } } - } - return resultValue; + return resultValue; + } finally { + revisionLock.unlockRead(stamp); + } } /** @@ -385,7 +493,7 @@ public boolean resolveValue(long id, LmdbValue value) { return false; } - private void resizeMap(long txn, int requiredSize) throws IOException { + private void resizeMap(long txn, long requiredSize) throws IOException { if (autoGrow) { if (LmdbUtil.requiresResize(mapSize, pageSize, txn, requiredSize)) { // map is full, resize @@ -418,7 +526,7 @@ private void resizeMap(long txn, int requiredSize) throws IOException { private long findId(byte[] data, boolean create) throws IOException { Long id = readTransaction(env, (stack, txn) -> { - if (data.length < MAX_KEY_SIZE) { + if (data.length <= MAX_KEY_SIZE) { MDBVal dataVal = MDBVal.calloc(stack); dataVal.mv_data(stack.bytes(data)); MDBVal idVal = MDBVal.calloc(stack); @@ -507,8 +615,7 @@ private long findId(byte[] data, boolean create) throws IOException { ByteBuffer hashIdBb = hashVal.mv_data(); hashIdBb.position(hashLength); idVal.mv_data(hashIdBb); - if (mdb_get(txn, dbi, idVal, dataVal) == 0 && - dataVal.mv_data().compareTo(dataBb) == 0) { + if (mdb_get(txn, dbi, idVal, dataVal) == 0 && dataVal.mv_data().compareTo(dataBb) == 0) { // id was found if stored value is equal to requested value return data2id(hashIdBb); } @@ -618,47 +725,217 @@ public long getId(Value value, boolean 
create) throws IOException { } } - // Check cache - Long cachedID = valueIDCache.get(value); + long stamp = revisionLock.readLock(); + try { + // Check cache + Long cachedID = valueIDCache.get(value); + + if (cachedID != null) { + long id = cachedID; + + if (isOwnValue) { + // Store id in value for fast access in any consecutive calls + ((LmdbValue) value).setInternalID(id, revision); + } - if (cachedID != null) { - long id = cachedID; + return id; + } - if (isOwnValue) { - // Store id in value for fast access in any consecutive calls - ((LmdbValue) value).setInternalID(id, revision); + // ID not cached, search in file + byte[] data = value2data(value, create); + if (data == null && value instanceof Literal) { + data = literal2legacy((Literal) value); } - return id; + if (data != null) { + long id = findId(data, create); + + if (id != LmdbValue.UNKNOWN_ID) { + if (isOwnValue) { + // Store id in value for fast access in any consecutive calls + ((LmdbValue) value).setInternalID(id, revision); + // Store id in cache + valueIDCache.put((LmdbValue) value, id); + } else { + // Store id in cache + LmdbValue nv = getLmdbValue(value); + nv.setInternalID(id, revision); + valueIDCache.put(nv, id); + } + } + + return id; + } + } finally { + revisionLock.unlockRead(stamp); } - // ID not cached, search in file - byte[] data = value2data(value, create); - if (data == null && value instanceof Literal) { - data = literal2legacy((Literal) value); + return LmdbValue.UNKNOWN_ID; + } + + public void gcIds(Collection ids) throws IOException { + if (!ids.isEmpty()) { + resizeMap(writeTxn, 2 * ids.size() * (1 + Long.BYTES + 2 + Long.BYTES)); + + writeTransaction((stack, writeTxn) -> { + MDBVal revIdVal = MDBVal.calloc(stack); + MDBVal dataVal = MDBVal.calloc(stack); + + ByteBuffer revIdBb = stack.malloc(1 + Long.BYTES + 2 + Long.BYTES); + Varint.writeUnsigned(revIdBb, revision.getRevisionId()); + int revLength = revIdBb.position(); + for (Long id : ids) { + revIdBb.position(revLength).limit(revIdBb.capacity()); + revIdVal.mv_data(id2data(revIdBb, id).flip()); + E(mdb_put(writeTxn, unusedDbi, revIdVal, dataVal, 0)); + } + + deleteValueToIdMappings(stack, writeTxn, ids); + + invalidateRevisionOnCommit = true; + if (nextValueEvictionTime < 0) { + nextValueEvictionTime = System.currentTimeMillis() + VALUE_EVICTION_INTERVAL; + } + return null; + }); } + } - if (data != null) { - long id = findId(data, create); + protected void deleteValueToIdMappings(MemoryStack stack, long txn, Collection ids) throws IOException { + int maxHashKeyLength = 2 + 2 * Long.BYTES + 2; + ByteBuffer hashBb = stack.malloc(maxHashKeyLength); + MDBVal idVal = MDBVal.calloc(stack); + ByteBuffer idBb = idBuffer(stack); + MDBVal hashVal = MDBVal.calloc(stack); + MDBVal dataVal = MDBVal.calloc(stack); - if (id != LmdbValue.UNKNOWN_ID) { - if (isOwnValue) { - // Store id in value for fast access in any consecutive calls - ((LmdbValue) value).setInternalID(id, revision); - // Store id in cache - valueIDCache.put((LmdbValue) value, id); - } else { - // Store id in cache - LmdbValue nv = getLmdbValue(value); - nv.setInternalID(id, revision); - valueIDCache.put(nv, id); + long valuesCursor = 0; + try { + for (Long id : ids) { + idVal.mv_data(id2data(idBb.clear(), id).flip()); + if (mdb_get(txn, dbi, idVal, dataVal) == 0) { + ByteBuffer dataBuffer = dataVal.mv_data(); + int dataLength = dataVal.mv_data().remaining(); + if (dataLength > MAX_KEY_SIZE) { + byte[] data = new byte[dataLength]; + dataVal.mv_data().get(data); + long dataHash = hash(data); + + 
hashBb.clear(); + hashBb.put(HASH_KEY); + Varint.writeUnsigned(hashBb, dataHash); + int hashLength = hashBb.position(); + hashBb.flip(); + + hashVal.mv_data(hashBb); + + // delete HASH -> ID association + if (mdb_del(txn, dbi, hashVal, dataVal) == 0) { + // was first entry, find a possible next entry and make it the first + hashBb.put(0, HASHID_KEY); + hashBb.rewind(); + hashVal.mv_data(hashBb); + + if (valuesCursor == 0) { + // initialize cursor + PointerBuffer pp = stack.mallocPointer(1); + E(mdb_cursor_open(txn, dbi, pp)); + valuesCursor = pp.get(0); + } + + if (mdb_cursor_get(valuesCursor, hashVal, dataVal, MDB_SET_RANGE) == 0) { + if (compareRegion(hashVal.mv_data(), 0, hashBb, 0, hashLength) == 0) { + ByteBuffer idBuffer2 = hashVal.mv_data(); + idBuffer2.position(hashLength); + idVal.mv_data(idBuffer2); + + hashVal.mv_data(hashBb); + + // HASH -> ID + E(mdb_put(txn, dbi, hashVal, idVal, 0)); + // delete existing mapping + E(mdb_cursor_del(valuesCursor, 0)); + } + } + } else { + // was not the first entry, delete HASH+ID association + hashBb.put(0, HASHID_KEY); + hashBb.limit(hashLength + idVal.mv_data().remaining()); + hashBb.position(hashLength); + hashBb.put(idVal.mv_data()); + hashBb.flip(); + + hashVal.mv_data(hashBb); + // delete HASH+ID -> [] association + mdb_del(txn, dbi, hashVal, null); + } + } else { + // delete value -> ID association + dataVal.mv_data(dataBuffer); + mdb_del(txn, dbi, dataVal, null); + } + + // does not delete ID -> value association } } - - return id; + } finally { + if (valuesCursor != 0) { + mdb_cursor_close(valuesCursor); + } } + } - return LmdbValue.UNKNOWN_ID; + protected void freeUnusedIdsAndValues(MemoryStack stack, long txn, Set revisionIds) throws IOException { + MDBVal idVal = MDBVal.calloc(stack); + MDBVal revIdVal = MDBVal.calloc(stack); + MDBVal dataVal = MDBVal.calloc(stack); + MDBVal emptyVal = MDBVal.calloc(stack); + + ByteBuffer revIdBb = stack.malloc(1 + Long.BYTES + 2 + Long.BYTES); + + boolean freeIds = false; + long unusedIdsCursor = 0; + try { + PointerBuffer pp = stack.mallocPointer(1); + E(mdb_cursor_open(txn, unusedDbi, pp)); + unusedIdsCursor = pp.get(0); + + if (revisionIds == null) { + // marker to delete all IDs + revisionIds = Collections.singleton(0L); + } + for (Long revisionId : revisionIds) { + // iterate all unused IDs for revision + revIdBb.clear(); + Varint.writeUnsigned(revIdBb, revisionId); + revIdVal.mv_data(revIdBb.flip()); + if (mdb_cursor_get(unusedIdsCursor, revIdVal, dataVal, MDB_SET_RANGE) == 0) { + do { + ByteBuffer keyBb = revIdVal.mv_data(); + long revisionOfId = Varint.readUnsigned(keyBb); + if (revisionId == 0L || revisionOfId == revisionId) { + idVal.mv_data(keyBb); + + // add id to free list + E(mdb_put(txn, freeDbi, idVal, emptyVal, 0)); + // delete id -> value association + E(mdb_del(txn, dbi, idVal, null)); + // delete id and value from unused list + E(mdb_cursor_del(unusedIdsCursor, 0)); + + freeIds = true; + } else { + break; + } + } while (mdb_cursor_get(unusedIdsCursor, revIdVal, dataVal, MDB_NEXT) == 0); + } + } + } finally { + if (unusedIdsCursor != 0) { + mdb_cursor_close(unusedIdsCursor); + } + } + this.freeIdsAvailable |= freeIds; } public void startTransaction() throws IOException { @@ -667,6 +944,21 @@ public void startTransaction() throws IOException { E(mdb_txn_begin(env, NULL, 0, pp)); writeTxn = pp.get(0); + + // delete unused IDs if required on a regular basis + // this is also run after opening the database + if (nextValueEvictionTime >= 0 && System.currentTimeMillis() >= 
nextValueEvictionTime) { + synchronized (unusedRevisionIds) { + MDBStat stat = MDBStat.malloc(stack); + mdb_stat(writeTxn, unusedDbi, stat); + + resizeMap(writeTxn, stat.ms_entries() * (2 + Long.BYTES)); + + freeUnusedIdsAndValues(stack, writeTxn, unusedRevisionIds); + unusedRevisionIds.clear(); + } + nextValueEvictionTime = -1; + } } } @@ -676,11 +968,32 @@ public void startTransaction() throws IOException { void endTransaction(boolean commit) throws IOException { if (writeTxn != 0) { if (commit) { - E(mdb_txn_commit(writeTxn)); + if (invalidateRevisionOnCommit) { + long stamp = revisionLock.writeLock(); + try { + E(mdb_txn_commit(writeTxn)); + long revisionId = lazyRevision.getRevisionId(); + cleaner.register(lazyRevision, () -> { + synchronized (unusedRevisionIds) { + unusedRevisionIds.add(revisionId); + } + if (nextValueEvictionTime < 0) { + nextValueEvictionTime = System.currentTimeMillis() + VALUE_EVICTION_INTERVAL; + } + }); + setNewRevision(); + clearCaches(); + } finally { + revisionLock.unlockWrite(stamp); + } + } else { + E(mdb_txn_commit(writeTxn)); + } } else { mdb_txn_abort(writeTxn); } writeTxn = 0; + invalidateRevisionOnCommit = false; } } @@ -727,14 +1040,16 @@ public void clear() throws IOException { new File(dir, "data.mdb").delete(); new File(dir, "lock.mdb").delete(); + clearCaches(); + open(); + setNewRevision(); + } + + protected void clearCaches() { Arrays.fill(valueCache, null); valueIDCache.clear(); namespaceCache.clear(); namespaceIDCache.clear(); - - open(); - - setNewRevision(); } /** @@ -1096,4 +1411,8 @@ public LmdbLiteral getLmdbLiteral(Literal l) { return new LmdbLiteral(revision, l.getLabel(), datatype, l.getCoreDatatype()); } } -} + + public void forceEvictionOfValues() { + nextValueEvictionTime = 0L; + } +} \ No newline at end of file diff --git a/core/sail/lmdb/src/main/java/org/eclipse/rdf4j/sail/lmdb/ValueStoreRevision.java b/core/sail/lmdb/src/main/java/org/eclipse/rdf4j/sail/lmdb/ValueStoreRevision.java index f64eeadbfa0..55703efd865 100644 --- a/core/sail/lmdb/src/main/java/org/eclipse/rdf4j/sail/lmdb/ValueStoreRevision.java +++ b/core/sail/lmdb/src/main/java/org/eclipse/rdf4j/sail/lmdb/ValueStoreRevision.java @@ -11,6 +11,7 @@ package org.eclipse.rdf4j.sail.lmdb; import java.io.Serializable; +import java.util.Objects; import org.eclipse.rdf4j.sail.lmdb.model.LmdbValue; @@ -19,37 +20,85 @@ * LmdbValue to be valid, the revision object needs to be equal to the concerning ValueStore's revision object. The * ValueStore's revision object is changed whenever values are removed from it or IDs are changed. 
*/ -public class ValueStoreRevision implements Serializable { +public interface ValueStoreRevision { + abstract class Base implements ValueStoreRevision { + @Override + public boolean equals(Object o) { + if (this == o) { + return true; + } + if (!(o instanceof ValueStoreRevision)) { + return false; + } + ValueStoreRevision other = (ValueStoreRevision) o; + return getRevisionId() == other.getRevisionId() && Objects.equals(getValueStore(), other.getValueStore()); + } - /*-----------* - * Constants * - *-----------*/ + @Override + public int hashCode() { + return Objects.hash(getValueStore(), getRevisionId()); + } + } - private static final long serialVersionUID = -2434063125560285009L; + class Default extends Base implements Serializable { + private static final long serialVersionUID = -2434063125560285009L; - /*-----------* - * Variables * - *-----------*/ + private static volatile long revisionIdCounter = 0; - transient private final ValueStore valueStore; + transient private final ValueStore valueStore; - /*--------------* - * Constructors * - *--------------*/ + private final long revisionId = ++revisionIdCounter; - public ValueStoreRevision(ValueStore valueStore) { - this.valueStore = valueStore; - } + public Default(ValueStore valueStore) { + this.valueStore = valueStore; + } + + public long getRevisionId() { + return revisionId; + } - /*---------* - * Methods * - *---------*/ + public ValueStore getValueStore() { + return valueStore; + } - public ValueStore getValueStore() { - return valueStore; + public boolean resolveValue(long id, LmdbValue value) { + return valueStore.resolveValue(id, value); + } } - public boolean resolveValue(long id, LmdbValue value) { - return valueStore.resolveValue(id, value); + class Lazy extends Base implements Serializable { + private static final long serialVersionUID = -2434063125560285009L; + + private final ValueStoreRevision revision; + + public Lazy(ValueStoreRevision revision) { + this.revision = revision; + } + + @Override + public long getRevisionId() { + return revision.getRevisionId(); + } + + @Override + public ValueStore getValueStore() { + return revision.getValueStore(); + } + + @Override + public boolean resolveValue(long id, LmdbValue value) { + if (revision.resolveValue(id, value)) { + // set unwrapped version of revision + value.setInternalID(id, revision); + return true; + } + return false; + } } + + long getRevisionId(); + + ValueStore getValueStore(); + + boolean resolveValue(long id, LmdbValue value); } diff --git a/core/sail/lmdb/src/main/java/org/eclipse/rdf4j/sail/lmdb/model/LmdbBNode.java b/core/sail/lmdb/src/main/java/org/eclipse/rdf4j/sail/lmdb/model/LmdbBNode.java index 6d764e94c2c..eeab5e74797 100644 --- a/core/sail/lmdb/src/main/java/org/eclipse/rdf4j/sail/lmdb/model/LmdbBNode.java +++ b/core/sail/lmdb/src/main/java/org/eclipse/rdf4j/sail/lmdb/model/LmdbBNode.java @@ -83,7 +83,7 @@ public String getID() { return super.getID(); } - protected void init() { + public void init() { if (!initialized) { synchronized (this) { if (!initialized) { diff --git a/core/sail/lmdb/src/main/java/org/eclipse/rdf4j/sail/lmdb/model/LmdbIRI.java b/core/sail/lmdb/src/main/java/org/eclipse/rdf4j/sail/lmdb/model/LmdbIRI.java index 0c4806be299..8bc261d44d0 100644 --- a/core/sail/lmdb/src/main/java/org/eclipse/rdf4j/sail/lmdb/model/LmdbIRI.java +++ b/core/sail/lmdb/src/main/java/org/eclipse/rdf4j/sail/lmdb/model/LmdbIRI.java @@ -99,7 +99,7 @@ public String stringValue() { return super.stringValue(); } - protected void init() { + public void init() 
{ if (!initialized) { synchronized (this) { if (!initialized) { diff --git a/core/sail/lmdb/src/main/java/org/eclipse/rdf4j/sail/lmdb/model/LmdbLiteral.java b/core/sail/lmdb/src/main/java/org/eclipse/rdf4j/sail/lmdb/model/LmdbLiteral.java index 06f3be84c6f..d6efea7435d 100644 --- a/core/sail/lmdb/src/main/java/org/eclipse/rdf4j/sail/lmdb/model/LmdbLiteral.java +++ b/core/sail/lmdb/src/main/java/org/eclipse/rdf4j/sail/lmdb/model/LmdbLiteral.java @@ -192,7 +192,7 @@ public void setLanguage(String language) { this.language = language; } - protected void init() { + public void init() { if (!initialized) { synchronized (this) { if (!initialized) { diff --git a/core/sail/lmdb/src/main/java/org/eclipse/rdf4j/sail/lmdb/model/LmdbValue.java b/core/sail/lmdb/src/main/java/org/eclipse/rdf4j/sail/lmdb/model/LmdbValue.java index e617e9b66db..27cfe423baa 100644 --- a/core/sail/lmdb/src/main/java/org/eclipse/rdf4j/sail/lmdb/model/LmdbValue.java +++ b/core/sail/lmdb/src/main/java/org/eclipse/rdf4j/sail/lmdb/model/LmdbValue.java @@ -29,6 +29,11 @@ public interface LmdbValue extends Value { */ long getInternalID(); + /** + * Initializes this value if it was a lazy value (ID-only value) before. + */ + void init(); + /** * Gets the revision of the value store that created this value. The value's internal ID is only valid when it's * value store revision is equal to the value store's current revision. diff --git a/core/sail/lmdb/src/test/java/org/eclipse/rdf4j/sail/lmdb/ValueStoreTest.java b/core/sail/lmdb/src/test/java/org/eclipse/rdf4j/sail/lmdb/ValueStoreTest.java new file mode 100644 index 00000000000..0a4c9d3c866 --- /dev/null +++ b/core/sail/lmdb/src/test/java/org/eclipse/rdf4j/sail/lmdb/ValueStoreTest.java @@ -0,0 +1,136 @@ +/******************************************************************************* + * Copyright (c) 2023 Eclipse RDF4J contributors. + * + * All rights reserved. This program and the accompanying materials + * are made available under the terms of the Eclipse Distribution License v1.0 + * which accompanies this distribution, and is available at + * http://www.eclipse.org/org/documents/edl-v10.php. + * + * SPDX-License-Identifier: BSD-3-Clause + *******************************************************************************/ +package org.eclipse.rdf4j.sail.lmdb; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNotEquals; + +import java.io.File; +import java.io.IOException; +import java.util.Arrays; +import java.util.Collections; +import java.util.HashSet; +import java.util.Random; +import java.util.Set; + +import org.eclipse.rdf4j.sail.lmdb.config.LmdbStoreConfig; +import org.eclipse.rdf4j.sail.lmdb.model.LmdbValue; +import org.junit.jupiter.api.AfterEach; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.io.TempDir; + +/** + * Low-level tests for {@link ValueStore}. 
+ */ +public class ValueStoreTest { + + private ValueStore valueStore; + private File dataDir; + + @BeforeEach + public void before(@TempDir File dataDir) throws Exception { + this.dataDir = dataDir; + this.valueStore = createValueStore(); + } + + private ValueStore createValueStore() throws IOException { + return new ValueStore(new File(dataDir, "values"), new LmdbStoreConfig()); + } + + @Test + public void testGcValues() throws Exception { + Random random = new Random(1337); + LmdbValue values[] = new LmdbValue[1000]; + valueStore.startTransaction(); + for (int i = 0; i < values.length; i++) { + values[i] = valueStore.createLiteral("This is a random literal:" + random.nextLong()); + valueStore.storeValue(values[i]); + } + valueStore.commit(); + + ValueStoreRevision revBefore = valueStore.getRevision(); + + valueStore.startTransaction(); + Set ids = new HashSet<>(); + for (int i = 0; i < 30; i++) { + ids.add(values[i].getInternalID()); + } + valueStore.gcIds(ids); + valueStore.commit(); + + ValueStoreRevision revAfter = valueStore.getRevision(); + + assertNotEquals("revisions must change after gc of IDs", revBefore, revAfter); + + Arrays.fill(values, null); + // GC would collect revision at some point in time + // just add revision ID to free list for this test as forcing GC is not possible + valueStore.unusedRevisionIds.add(revBefore.getRevisionId()); + + valueStore.forceEvictionOfValues(); + valueStore.startTransaction(); + valueStore.commit(); + + valueStore.startTransaction(); + for (int i = 0; i < 30; i++) { + LmdbValue value = valueStore.createLiteral("This is a random literal:" + random.nextLong()); + values[i] = value; + valueStore.storeValue(value); + // this ID should have been reused + ids.remove(value.getInternalID()); + } + valueStore.commit(); + + assertEquals("IDs should have been reused", Collections.emptySet(), ids); + } + + @Test + public void testGcValuesAfterRestart() throws Exception { + Random random = new Random(1337); + LmdbValue values[] = new LmdbValue[1000]; + valueStore.startTransaction(); + for (int i = 0; i < values.length; i++) { + values[i] = valueStore.createLiteral("This is a random literal:" + random.nextLong()); + valueStore.storeValue(values[i]); + } + valueStore.commit(); + + valueStore.startTransaction(); + Set ids = new HashSet<>(); + for (int i = 0; i < 30; i++) { + ids.add(values[i].getInternalID()); + } + valueStore.gcIds(ids); + valueStore.commit(); + + // close and recreate store + valueStore.close(); + valueStore = createValueStore(); + + valueStore.startTransaction(); + for (int i = 0; i < 30; i++) { + LmdbValue value = valueStore.createLiteral("This is a random literal:" + random.nextLong()); + values[i] = value; + valueStore.storeValue(value); + // this ID should have been reused + ids.remove(value.getInternalID()); + } + valueStore.commit(); + + assertEquals("IDs should have been reused", Collections.emptySet(), ids); + } + + @AfterEach + public void after() throws Exception { + valueStore.close(); + } +}
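Reviewer note (not part of the patch): the sketch below shows how the new PersistentSet is meant to be wired up, mirroring the anonymous subclass that LmdbSailStore creates for its unusedIds field. It is a minimal, hypothetical usage example: the class name PersistentSetSketch, the temporary directory and the literal IDs are made up, and it assumes placement in the org.eclipse.rdf4j.sail.lmdb package because PersistentSet and its write/read hooks are not public.

```java
// Hypothetical sketch, not shipped with this change.
package org.eclipse.rdf4j.sail.lmdb;

import java.io.File;
import java.nio.ByteBuffer;
import java.nio.ByteOrder;
import java.nio.file.Files;

public class PersistentSetSketch {

    public static void main(String[] args) throws Exception {
        File dataDir = Files.createTempDirectory("lmdb-unused-ids").toFile();
        // same Long <-> byte[] mapping as the anonymous subclass in the LmdbSailStore constructor
        PersistentSet<Long> unusedIds = new PersistentSet<>(dataDir) {
            @Override
            protected byte[] write(Long element) {
                ByteBuffer bb = ByteBuffer.allocate(Long.BYTES).order(ByteOrder.BIG_ENDIAN);
                bb.putLong(element);
                return bb.array();
            }

            @Override
            protected Long read(ByteBuffer buffer) {
                return buffer.order(ByteOrder.BIG_ENDIAN).getLong();
            }
        };
        try {
            // IDs of removed statements are remembered across operations ...
            unusedIds.add(42L);
            unusedIds.add(43L);
            // ... and dropped again as soon as a later statement re-uses them
            unusedIds.remove(42L);
            // whatever remains is what LmdbSailStore hands to TripleStore.filterUsedIds(...)
            // and ValueStore.gcIds(...) when the transaction is flushed
            for (Long id : unusedIds) {
                System.out.println("candidate for GC: " + id);
            }
        } finally {
            unusedIds.close();
        }
    }
}
```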