diff --git a/core/model/src/main/java/org/eclipse/rdf4j/model/impl/AbstractMemoryOverflowModel.java b/core/model/src/main/java/org/eclipse/rdf4j/model/impl/AbstractMemoryOverflowModel.java new file mode 100644 index 0000000000..5c5002d96d --- /dev/null +++ b/core/model/src/main/java/org/eclipse/rdf4j/model/impl/AbstractMemoryOverflowModel.java @@ -0,0 +1,438 @@ +/******************************************************************************* + * Copyright (c) 2024 Eclipse RDF4J contributors. + * + * All rights reserved. This program and the accompanying materials + * are made available under the terms of the Eclipse Distribution License v1.0 + * which accompanies this distribution, and is available at + * http://www.eclipse.org/org/documents/edl-v10.php. + * + * SPDX-License-Identifier: BSD-3-Clause + ******************************************************************************/ + +package org.eclipse.rdf4j.model.impl; + +import java.io.IOException; +import java.io.ObjectInputStream; +import java.io.ObjectOutputStream; +import java.lang.management.GarbageCollectorMXBean; +import java.lang.management.ManagementFactory; +import java.util.Collection; +import java.util.HashSet; +import java.util.Iterator; +import java.util.List; +import java.util.Optional; +import java.util.Queue; +import java.util.Set; +import java.util.concurrent.ConcurrentLinkedQueue; +import java.util.concurrent.locks.ReentrantLock; + +import javax.management.NotificationEmitter; +import javax.management.openmbean.CompositeData; + +import org.eclipse.rdf4j.common.annotation.InternalUseOnly; +import org.eclipse.rdf4j.model.IRI; +import org.eclipse.rdf4j.model.Model; +import org.eclipse.rdf4j.model.Namespace; +import org.eclipse.rdf4j.model.Resource; +import org.eclipse.rdf4j.model.Statement; +import org.eclipse.rdf4j.model.Value; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import com.sun.management.GarbageCollectionNotificationInfo; +import com.sun.management.GcInfo; + 
+@InternalUseOnly +public abstract class AbstractMemoryOverflowModel extends AbstractModel { + + private static final long serialVersionUID = 4119844228099208169L; + + private static final Runtime RUNTIME = Runtime.getRuntime(); + + /** + * The default batch size is 1024. This is the number of statements that will be written to disk at a time. + */ + @SuppressWarnings("StaticNonFinalField") + public static int BATCH_SIZE = 1024; + + /** + * ms GC activity over the past second that triggers overflow to disk + */ + @SuppressWarnings("StaticNonFinalField") + public static int MEMORY_THRESHOLD_HIGH = 300; + + /** + * ms GC activity over the past second that disables overflow to disk + */ + @SuppressWarnings("StaticNonFinalField") + public static int MEMORY_THRESHOLD_MEDIUM = 200; + + /** + * ms GC activity over the past second that skips overflow to disk in anticipation of GC freeing up memory + */ + @SuppressWarnings("StaticNonFinalField") + public static int MEMORY_THRESHOLD_LOW = 100; + + private static volatile boolean overflow; + + // To reduce the chance of OOM we will always overflow once we get close to running out of memory even if we think + // we have space for one more block. The limit is currently set at 32 MB for small heaps and 128 MB for large heaps. + /** + * The minimum amount of free memory before overflowing to disk. Defaults to 32 MB for heaps smaller than 1 GB and + * 128 MB for heaps larger than 1 GB. + */ + @SuppressWarnings("StaticNonFinalField") + public static int MIN_AVAILABLE_MEM_BEFORE_OVERFLOWING = RUNTIME.maxMemory() >= 1024 * 1024 * 1024 ? 
128 : 32; + + static final Logger logger = LoggerFactory.getLogger(AbstractMemoryOverflowModel.class); + + private volatile LinkedHashModel memory; + + protected transient volatile T disk; + + private final SimpleValueFactory vf = SimpleValueFactory.getInstance(); + + private static final int[] GC_LOAD = new int[10]; + private static int prevBucket; + + private static volatile boolean highGcLoad = false; + private static final Queue gcInfos = new ConcurrentLinkedQueue<>(); + + // if we are in a low memory situation and the GC load is low we will not overflow to disk size it is likely that + // the GC will be able to free up enough memory + private static volatile boolean lowMemLowGcSum = false; + + private final ReentrantLock lock = new ReentrantLock(); + + static { + List gcBeans = ManagementFactory.getGarbageCollectorMXBeans(); + for (GarbageCollectorMXBean gcBean : gcBeans) { + NotificationEmitter emitter = (NotificationEmitter) gcBean; + emitter.addNotificationListener((notification, o) -> { + + long currentBucket = (System.currentTimeMillis() / 100) % 10; + while (currentBucket != prevBucket) { + prevBucket = (prevBucket + 1) % 10; + GC_LOAD[prevBucket] = 0; + } + + while (true) { + GcInfo poll = gcInfos.poll(); + if (poll == null) { + break; + } + GC_LOAD[(int) currentBucket] += (int) poll.getDuration(); + } + + // extract garbage collection information from notification. 
+ GarbageCollectionNotificationInfo gcNotificationInfo = GarbageCollectionNotificationInfo + .from((CompositeData) notification.getUserData()); + GcInfo gcInfo = gcNotificationInfo.getGcInfo(); + gcInfos.add(gcInfo); + long gcSum = 0; + long gcMax = 0; + for (int i : GC_LOAD) { + gcSum += i; + if (i > gcMax) { + gcMax = i; + } + } + + if (gcSum < gcMax * 1.3) { + gcSum -= (gcMax / 2); + } + + double v = mbFree(); + if (!highGcLoad && !lowMemLowGcSum && v < MIN_AVAILABLE_MEM_BEFORE_OVERFLOWING + && gcSum < MEMORY_THRESHOLD_LOW) { + lowMemLowGcSum = true; + return; + } + + lowMemLowGcSum = false; + + if (!highGcLoad && v < 256 + && (gcSum > MEMORY_THRESHOLD_HIGH || v < MIN_AVAILABLE_MEM_BEFORE_OVERFLOWING)) { + logger.debug("High GC load detected. Free memory: {} MB, GC sum: {} ms in past 1000 ms", v, gcSum); + highGcLoad = true; + } else if ((v > 256 || gcSum < MEMORY_THRESHOLD_MEDIUM) && highGcLoad + && v > MIN_AVAILABLE_MEM_BEFORE_OVERFLOWING) { + logger.debug("GC load back to normal. Free memory: {} MB, GC sum: {} ms in past 1000 ms", v, gcSum); + highGcLoad = false; + } + + }, null, null); + } + } + + private static double mbFree() { + // maximum heap size the JVM can allocate + long maxMemory = RUNTIME.maxMemory(); + + // total currently allocated JVM memory + long totalMemory = RUNTIME.totalMemory(); + + // amount of memory free in the currently allocated JVM memory + long freeMemory = RUNTIME.freeMemory(); + + // estimated memory used + long used = totalMemory - freeMemory; + + // amount of memory the JVM can still allocate from the OS (upper boundary is the max heap) + long freeToAllocateMemory = maxMemory - used; + + return (freeToAllocateMemory / 1024.0 / 1024.0); + + } + + private volatile boolean closed; + + public AbstractMemoryOverflowModel() { + memory = new LinkedHashModel(); + } + + public AbstractMemoryOverflowModel(Set namespaces) { + memory = new LinkedHashModel(namespaces, 0); + } + + @Override + public synchronized Set getNamespaces() { + 
return memory.getNamespaces(); + } + + @Override + public synchronized Optional getNamespace(String prefix) { + return memory.getNamespace(prefix); + } + + @Override + public synchronized Namespace setNamespace(String prefix, String name) { + return memory.setNamespace(prefix, name); + } + + @Override + public void setNamespace(Namespace namespace) { + memory.setNamespace(namespace); + } + + @Override + public synchronized Optional removeNamespace(String prefix) { + return memory.removeNamespace(prefix); + } + + @Override + public boolean contains(Resource subj, IRI pred, Value obj, Resource... contexts) { + return getDelegate().contains(subj, pred, obj, contexts); + } + + @Override + public boolean add(Resource subj, IRI pred, Value obj, Resource... contexts) { + checkMemoryOverflow(); + return getDelegate().add(subj, pred, obj, contexts); + } + + @Override + public boolean add(Statement st) { + checkMemoryOverflow(); + return getDelegate().add(st); + } + + @Override + public boolean addAll(Collection c) { + checkMemoryOverflow(); + if (disk != null || c.size() <= BATCH_SIZE) { + return getDelegate().addAll(c); + } else { + boolean ret = false; + HashSet buffer = new HashSet<>(); + for (Statement st : c) { + buffer.add(st); + if (buffer.size() >= BATCH_SIZE) { + ret |= getDelegate().addAll(buffer); + buffer.clear(); + checkMemoryOverflow(); + } + } + if (!buffer.isEmpty()) { + ret |= getDelegate().addAll(buffer); + buffer.clear(); + } + + return ret; + + } + + } + + @Override + public boolean remove(Resource subj, IRI pred, Value obj, Resource... contexts) { + return getDelegate().remove(subj, pred, obj, contexts); + } + + @Override + public int size() { + return getDelegate().size(); + } + + @Override + public Iterator iterator() { + return getDelegate().iterator(); + } + + @Override + public boolean clear(Resource... 
contexts) { + return getDelegate().clear(contexts); + } + + @Override + public Model filter(final Resource subj, final IRI pred, final Value obj, final Resource... contexts) { + return new FilteredModel(this, subj, pred, obj, contexts) { + + private static final long serialVersionUID = -475666402618133101L; + + @Override + public int size() { + return getDelegate().filter(subj, pred, obj, contexts).size(); + } + + @Override + public Iterator iterator() { + return getDelegate().filter(subj, pred, obj, contexts).iterator(); + } + + @Override + protected void removeFilteredTermIteration(Iterator iter, Resource subj, IRI pred, Value obj, + Resource... contexts) { + AbstractMemoryOverflowModel.this.removeTermIteration(iter, subj, pred, obj, contexts); + } + }; + } + + @Override + public synchronized void removeTermIteration(Iterator iter, Resource subj, IRI pred, Value obj, + Resource... contexts) { + if (disk == null) { + memory.removeTermIteration(iter, subj, pred, obj, contexts); + } else { + disk.removeTermIteration(iter, subj, pred, obj, contexts); + } + } + + private Model getDelegate() { + var memory = this.memory; + if (memory != null) { + return memory; + } else { + var disk = this.disk; + if (disk != null) { + return disk; + } + + try { + lock.lockInterruptibly(); + try { + if (this.memory != null) { + return this.memory; + } + if (this.disk != null) { + return this.disk; + } + if (closed) { + throw new IllegalStateException("MemoryOverflowModel is closed"); + } + throw new IllegalStateException("MemoryOverflowModel is in an inconsistent state"); + } finally { + lock.unlock(); + } + + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + throw new RuntimeException(e); + } + + } + } + + private void writeObject(ObjectOutputStream s) throws IOException { + // Write out any hidden serialization magic + s.defaultWriteObject(); + // Write in size + Model delegate = getDelegate(); + s.writeInt(delegate.size()); + // Write in all elements + for 
(Statement st : delegate) { + Resource subj = st.getSubject(); + IRI pred = st.getPredicate(); + Value obj = st.getObject(); + Resource ctx = st.getContext(); + s.writeObject(vf.createStatement(subj, pred, obj, ctx)); + } + } + + private void readObject(ObjectInputStream s) throws IOException, ClassNotFoundException { + // Read in any hidden serialization magic + s.defaultReadObject(); + // Read in size + int size = s.readInt(); + // Read in all elements + for (int i = 0; i < size; i++) { + add((Statement) s.readObject()); + } + } + + private synchronized void checkMemoryOverflow() { + try { + lock.lockInterruptibly(); + try { + + if (disk == getDelegate()) { + return; + } + + if (overflow || highGcLoad) { + logger.debug("Syncing triples to disk due to gc load"); + overflowToDisk(); + if (!highGcLoad) { + overflow = false; + } + } + } finally { + lock.unlock(); + } + + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + throw new RuntimeException(e); + } + + } + + private synchronized void overflowToDisk() { + + try { + lock.lockInterruptibly(); + try { + overflow = true; + if (memory == null) { + assert disk != null; + return; + } + + LinkedHashModel memory = this.memory; + this.memory = null; + overflowToDiskInner(memory); + + logger.debug("overflow synced to disk"); + System.gc(); + } finally { + lock.unlock(); + } + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + throw new RuntimeException(e); + } + + } + + protected abstract void overflowToDiskInner(Model memory); +} diff --git a/core/sail/lmdb/src/main/java/org/eclipse/rdf4j/sail/lmdb/LmdbStore.java b/core/sail/lmdb/src/main/java/org/eclipse/rdf4j/sail/lmdb/LmdbStore.java index 51927d83b9..f4f41b58fb 100644 --- a/core/sail/lmdb/src/main/java/org/eclipse/rdf4j/sail/lmdb/LmdbStore.java +++ b/core/sail/lmdb/src/main/java/org/eclipse/rdf4j/sail/lmdb/LmdbStore.java @@ -251,7 +251,7 @@ protected void initializeInternal() throws SailException { 
FileUtils.writeStringToFile(versionFile, VERSION, StandardCharsets.UTF_8); } backingStore = new LmdbSailStore(dataDir, config); - this.store = new SnapshotSailStore(backingStore, () -> new MemoryOverflowModel() { + this.store = new SnapshotSailStore(backingStore, () -> new MemoryOverflowModel(false) { @Override protected LmdbSailStore createSailStore(File dataDir) throws IOException, SailException { // Model can't fit into memory, use another LmdbSailStore to store delta diff --git a/core/sail/lmdb/src/main/java/org/eclipse/rdf4j/sail/lmdb/MemoryOverflowModel.java b/core/sail/lmdb/src/main/java/org/eclipse/rdf4j/sail/lmdb/MemoryOverflowModel.java index e9e8fd7e30..861a12233b 100644 --- a/core/sail/lmdb/src/main/java/org/eclipse/rdf4j/sail/lmdb/MemoryOverflowModel.java +++ b/core/sail/lmdb/src/main/java/org/eclipse/rdf4j/sail/lmdb/MemoryOverflowModel.java @@ -13,26 +13,12 @@ import java.io.File; import java.io.IOException; -import java.io.ObjectInputStream; -import java.io.ObjectOutputStream; import java.nio.file.Files; -import java.util.Collection; -import java.util.HashSet; -import java.util.Iterator; -import java.util.Optional; -import java.util.Set; import org.eclipse.rdf4j.common.io.FileUtil; -import org.eclipse.rdf4j.model.IRI; import org.eclipse.rdf4j.model.Model; -import org.eclipse.rdf4j.model.Namespace; -import org.eclipse.rdf4j.model.Resource; -import org.eclipse.rdf4j.model.Statement; -import org.eclipse.rdf4j.model.Value; -import org.eclipse.rdf4j.model.impl.AbstractModel; -import org.eclipse.rdf4j.model.impl.FilteredModel; +import org.eclipse.rdf4j.model.impl.AbstractMemoryOverflowModel; import org.eclipse.rdf4j.model.impl.LinkedHashModel; -import org.eclipse.rdf4j.model.impl.SimpleValueFactory; import org.eclipse.rdf4j.sail.SailException; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -42,309 +28,25 @@ * estimated memory usage is more than the amount of free memory available. 
Once the threshold is cross this * implementation seamlessly changes to a disk based {@link SailSourceModel}. */ -abstract class MemoryOverflowModel extends AbstractModel { - - private static final long serialVersionUID = 4119844228099208169L; - - private static final Runtime RUNTIME = Runtime.getRuntime(); - - private static final int LARGE_BLOCK = 5 * 1024; - - private static volatile boolean overflow; - - // To reduce the chance of OOM we will always overflow once we get close to running out of memory even if we think - // we have space for one more block. The limit is currently set at 32 MB for small heaps and 128 MB for large heaps. - private static final int MIN_AVAILABLE_MEM_BEFORE_OVERFLOWING = RUNTIME.maxMemory() >= 1024 ? 128 * 1024 * 1024 - : 32 * 1024 * 1024; +abstract class MemoryOverflowModel extends AbstractMemoryOverflowModel { final Logger logger = LoggerFactory.getLogger(MemoryOverflowModel.class); - - private volatile LinkedHashModel memory; + private final boolean verifyAdditions; private transient File dataDir; private transient LmdbSailStore store; - private transient volatile SailSourceModel disk; - - private long baseline = 0; - - private long maxBlockSize = 0; - - SimpleValueFactory vf = SimpleValueFactory.getInstance(); - - public MemoryOverflowModel() { - memory = new LinkedHashModel(LARGE_BLOCK * 2); - } - - public MemoryOverflowModel(Model model) { - this(model.getNamespaces()); - addAll(model); - } - - public MemoryOverflowModel(Set namespaces, Collection c) { - this(namespaces); - addAll(c); - } - - public MemoryOverflowModel(Set namespaces) { - memory = new LinkedHashModel(namespaces, LARGE_BLOCK); - } - - @Override - public synchronized void closeIterator(Iterator iter) { - super.closeIterator(iter); - if (disk != null) { - disk.closeIterator(iter); - } - } - - @Override - public synchronized Set getNamespaces() { - return memory.getNamespaces(); - } - - @Override - public synchronized Optional getNamespace(String prefix) { - return 
memory.getNamespace(prefix); - } - - @Override - public synchronized Namespace setNamespace(String prefix, String name) { - return memory.setNamespace(prefix, name); - } - - @Override - public void setNamespace(Namespace namespace) { - memory.setNamespace(namespace); - } - - @Override - public synchronized Optional removeNamespace(String prefix) { - return memory.removeNamespace(prefix); - } - - @Override - public boolean contains(Resource subj, IRI pred, Value obj, Resource... contexts) { - return getDelegate().contains(subj, pred, obj, contexts); - } - - @Override - public boolean add(Resource subj, IRI pred, Value obj, Resource... contexts) { - checkMemoryOverflow(); - return getDelegate().add(subj, pred, obj, contexts); - } - - @Override - public boolean add(Statement st) { - checkMemoryOverflow(); - return getDelegate().add(st); - } - - @Override - public boolean addAll(Collection c) { - checkMemoryOverflow(); - if (disk != null || c.size() <= 1024) { - return getDelegate().addAll(c); - } else { - boolean ret = false; - HashSet buffer = new HashSet<>(); - for (Statement st : c) { - buffer.add(st); - if (buffer.size() >= 1024) { - ret |= getDelegate().addAll(buffer); - buffer.clear(); - innerCheckMemoryOverflow(); - } - } - if (!buffer.isEmpty()) { - ret |= getDelegate().addAll(buffer); - buffer.clear(); - } - - return ret; - - } - - } - - @Override - public boolean remove(Resource subj, IRI pred, Value obj, Resource... contexts) { - return getDelegate().remove(subj, pred, obj, contexts); - } - - @Override - public int size() { - return getDelegate().size(); - } - - @Override - public Iterator iterator() { - return getDelegate().iterator(); - } - - @Override - public boolean clear(Resource... contexts) { - return getDelegate().clear(contexts); - } - - @Override - public Model filter(final Resource subj, final IRI pred, final Value obj, final Resource... 
contexts) { - return new FilteredModel(this, subj, pred, obj, contexts) { - - private static final long serialVersionUID = -475666402618133101L; - - @Override - public int size() { - return getDelegate().filter(subj, pred, obj, contexts).size(); - } - - @Override - public Iterator iterator() { - return getDelegate().filter(subj, pred, obj, contexts).iterator(); - } - - @Override - protected void removeFilteredTermIteration(Iterator iter, Resource subj, IRI pred, Value obj, - Resource... contexts) { - MemoryOverflowModel.this.removeTermIteration(iter, subj, pred, obj, contexts); - } - }; - } - - @Override - public synchronized void removeTermIteration(Iterator iter, Resource subj, IRI pred, Value obj, - Resource... contexts) { - if (disk == null) { - memory.removeTermIteration(iter, subj, pred, obj, contexts); - } else { - disk.removeTermIteration(iter, subj, pred, obj, contexts); - } + public MemoryOverflowModel(boolean verifyAdditions) { + super(); + this.verifyAdditions = verifyAdditions; } protected abstract LmdbSailStore createSailStore(File dataDir) throws IOException, SailException; - private Model getDelegate() { - var memory = this.memory; - if (memory != null) { - return memory; - } else { - var disk = this.disk; - if (disk != null) { - return disk; - } - synchronized (this) { - if (this.memory != null) { - return this.memory; - } - if (this.disk != null) { - return this.disk; - } - throw new IllegalStateException("MemoryOverflowModel is in an inconsistent state"); - } - } - } - - private void writeObject(ObjectOutputStream s) throws IOException { - // Write out any hidden serialization magic - s.defaultWriteObject(); - // Write in size - Model delegate = getDelegate(); - s.writeInt(delegate.size()); - // Write in all elements - for (Statement st : delegate) { - Resource subj = st.getSubject(); - IRI pred = st.getPredicate(); - Value obj = st.getObject(); - Resource ctx = st.getContext(); - s.writeObject(vf.createStatement(subj, pred, obj, ctx)); - } - } - 
- private void readObject(ObjectInputStream s) throws IOException, ClassNotFoundException { - // Read in any hidden serialization magic - s.defaultReadObject(); - // Read in size - int size = s.readInt(); - // Read in all elements - for (int i = 0; i < size; i++) { - add((Statement) s.readObject()); - } - } - - private void checkMemoryOverflow() { - if (disk == getDelegate()) { - return; - } - - if (overflow) { - innerCheckMemoryOverflow(); - } - int size = size() + 1; - if (size >= LARGE_BLOCK && size % LARGE_BLOCK == 0) { - innerCheckMemoryOverflow(); - } - - } - - private void innerCheckMemoryOverflow() { - if (disk == getDelegate()) { - return; - } - - // maximum heap size the JVM can allocate - long maxMemory = RUNTIME.maxMemory(); - - // total currently allocated JVM memory - long totalMemory = RUNTIME.totalMemory(); - - // amount of memory free in the currently allocated JVM memory - long freeMemory = RUNTIME.freeMemory(); - - // estimated memory used - long used = totalMemory - freeMemory; - - // amount of memory the JVM can still allocate from the OS (upper boundary is the max heap) - long freeToAllocateMemory = maxMemory - used; - - if (baseline > 0) { - long blockSize = used - baseline; - if (blockSize > maxBlockSize) { - maxBlockSize = blockSize; - } - if (overflow && freeToAllocateMemory < MIN_AVAILABLE_MEM_BEFORE_OVERFLOWING * 2) { - // stricter memory requirements to not overflow if other models are overflowing - logger.debug("syncing at {} triples. max block size: {}", size(), maxBlockSize); - overflowToDisk(); - System.gc(); - } else if (freeToAllocateMemory < MIN_AVAILABLE_MEM_BEFORE_OVERFLOWING || - freeToAllocateMemory < Math.min(0.15 * maxMemory, maxBlockSize)) { - // Sync if either the estimated size of the next block is larger than remaining memory, or - // if less than 15% of the heap is still free (this last condition to avoid GC overhead limit) - - logger.debug("syncing at {} triples. 
max block size: {}", size(), maxBlockSize); - overflowToDisk(); - System.gc(); - } else { - if (overflow) { - overflow = false; - } - } - } - baseline = used; - } - - private synchronized void overflowToDisk() { - overflow = true; - if (memory == null) { - assert disk != null; - return; - } - + @Override + protected void overflowToDiskInner(Model memory) { try { - LinkedHashModel memory = this.memory; - this.memory = null; - assert disk == null; dataDir = Files.createTempDirectory("model").toFile(); logger.debug("memory overflow using temp directory {}", dataDir); @@ -369,11 +71,10 @@ protected void finalize() throws Throwable { super.finalize(); } }; - - logger.debug("overflow synced to disk"); } catch (IOException | SailException e) { String path = dataDir != null ? dataDir.getAbsolutePath() : "(unknown)"; logger.error("Error while writing to overflow directory " + path, e); } } + } diff --git a/core/sail/lmdb/src/test/java/org/eclipse/rdf4j/sail/lmdb/benchmark/OverflowBenchmarkConcurrent.java b/core/sail/lmdb/src/test/java/org/eclipse/rdf4j/sail/lmdb/benchmark/OverflowBenchmarkConcurrent.java index 419ff93cc3..eef34f93d1 100644 --- a/core/sail/lmdb/src/test/java/org/eclipse/rdf4j/sail/lmdb/benchmark/OverflowBenchmarkConcurrent.java +++ b/core/sail/lmdb/src/test/java/org/eclipse/rdf4j/sail/lmdb/benchmark/OverflowBenchmarkConcurrent.java @@ -29,6 +29,7 @@ import org.apache.commons.io.FileUtils; import org.assertj.core.util.Files; +import org.eclipse.rdf4j.common.io.FileUtil; import org.eclipse.rdf4j.model.IRI; import org.eclipse.rdf4j.model.Model; import org.eclipse.rdf4j.model.Statement; @@ -56,6 +57,7 @@ import org.openjdk.jmh.annotations.Scope; import org.openjdk.jmh.annotations.Setup; import org.openjdk.jmh.annotations.State; +import org.openjdk.jmh.annotations.TearDown; import org.openjdk.jmh.annotations.Warmup; import org.openjdk.jmh.runner.Runner; import org.openjdk.jmh.runner.RunnerException; @@ -71,8 +73,8 @@ @State(Scope.Benchmark) @Warmup(iterations = 
5) @BenchmarkMode({ Mode.AverageTime }) -@Fork(value = 1, jvmArgs = { "-Xms1G", "-Xmx1G", "-XX:+UseParallelGC" }) -@Measurement(iterations = 5, batchSize = 1, time = 1, timeUnit = TimeUnit.MILLISECONDS) +@Fork(value = 1, jvmArgs = { "-Xms1G", "-Xmx1G", "-XX:+UseG1GC" }) +@Measurement(iterations = 5) @OutputTimeUnit(TimeUnit.MILLISECONDS) public class OverflowBenchmarkConcurrent { @@ -81,14 +83,24 @@ public void setup() { ((Logger) (LoggerFactory .getLogger("org.eclipse.rdf4j.sail.lmdb.MemoryOverflowModel"))) .setLevel(ch.qos.logback.classic.Level.DEBUG); + + ((Logger) (LoggerFactory + .getLogger("org.eclipse.rdf4j.model.impl.AbstractMemoryOverflowModel"))) + .setLevel(ch.qos.logback.classic.Level.DEBUG); } - public static void main(String[] args) throws RunnerException { - Options opt = new OptionsBuilder() - .include("OverflowBenchmarkConcurrent") // adapt to run other benchmark tests - .build(); + @TearDown(Level.Trial) + public void tearDown() throws InterruptedException { + System.out.println("HERE"); +// Thread.sleep(1000*60*60*60); + } - new Runner(opt).run(); + public static void main(String[] args) throws RunnerException, IOException { + OverflowBenchmarkConcurrent overflowBenchmarkConcurrent = new OverflowBenchmarkConcurrent(); + overflowBenchmarkConcurrent.setup(); + while (true) { + overflowBenchmarkConcurrent.manyConcurrentTransactions(); + } } @Benchmark @@ -166,7 +178,11 @@ public void manyConcurrentTransactions() throws IOException { try (RepositoryResult statements = connection.getStatements(null, null, null)) { statements.stream().limit(10_000).forEach(connection::remove); } - connection.commit(); + try { + connection.commit(); + } catch (Exception e) { + connection.rollback(); + } System.out.println("Removed"); } @@ -194,7 +210,7 @@ public void manyConcurrentTransactions() throws IOException { try { sailRepository.shutDown(); } finally { - FileUtils.deleteDirectory(temporaryFolder); + FileUtil.deleteDir(temporaryFolder); } } } diff --git 
a/core/sail/nativerdf/src/main/java/org/eclipse/rdf4j/sail/nativerdf/MemoryOverflowModel.java b/core/sail/nativerdf/src/main/java/org/eclipse/rdf4j/sail/nativerdf/MemoryOverflowModel.java index e651d77b07..70e4f7cf05 100644 --- a/core/sail/nativerdf/src/main/java/org/eclipse/rdf4j/sail/nativerdf/MemoryOverflowModel.java +++ b/core/sail/nativerdf/src/main/java/org/eclipse/rdf4j/sail/nativerdf/MemoryOverflowModel.java @@ -12,25 +12,11 @@ import java.io.File; import java.io.IOException; -import java.io.ObjectInputStream; -import java.io.ObjectOutputStream; import java.nio.file.Files; -import java.util.Collection; -import java.util.HashSet; -import java.util.Iterator; -import java.util.Optional; -import java.util.Set; -import org.eclipse.rdf4j.model.IRI; import org.eclipse.rdf4j.model.Model; -import org.eclipse.rdf4j.model.Namespace; -import org.eclipse.rdf4j.model.Resource; -import org.eclipse.rdf4j.model.Statement; -import org.eclipse.rdf4j.model.Value; -import org.eclipse.rdf4j.model.impl.AbstractModel; -import org.eclipse.rdf4j.model.impl.FilteredModel; +import org.eclipse.rdf4j.model.impl.AbstractMemoryOverflowModel; import org.eclipse.rdf4j.model.impl.LinkedHashModel; -import org.eclipse.rdf4j.model.impl.SimpleValueFactory; import org.eclipse.rdf4j.sail.SailException; import org.eclipse.rdf4j.sail.base.SailStore; import org.slf4j.Logger; @@ -43,21 +29,10 @@ * * @author James Leigh */ -abstract class MemoryOverflowModel extends AbstractModel { +abstract class MemoryOverflowModel extends AbstractMemoryOverflowModel { private static final long serialVersionUID = 4119844228099208169L; - private static final Runtime RUNTIME = Runtime.getRuntime(); - - private static final int LARGE_BLOCK = 1024 * 5; - - private static volatile boolean overflow; - - // To reduce the chance of OOM we will always overflow once we get close to running out of memory even if we think - // we have space for one more block. 
The limit is currently set at 32 MB for small heaps and 128 MB for large heaps. - private static final int MIN_AVAILABLE_MEM_BEFORE_OVERFLOWING = RUNTIME.maxMemory() >= 1024 ? 128 * 1024 * 1024 - : 32 * 1024 * 1024; - final Logger logger = LoggerFactory.getLogger(MemoryOverflowModel.class); private volatile LinkedHashModel memory; @@ -66,287 +41,11 @@ abstract class MemoryOverflowModel extends AbstractModel { private transient SailStore store; - private transient volatile SailSourceModel disk; - - private long baseline = 0; - - private long maxBlockSize = 0; - - SimpleValueFactory vf = SimpleValueFactory.getInstance(); - - public MemoryOverflowModel() { - memory = new LinkedHashModel(LARGE_BLOCK * 2); - } - - public MemoryOverflowModel(Model model) { - this(model.getNamespaces()); - addAll(model); - } - - public MemoryOverflowModel(Set namespaces, Collection c) { - this(namespaces); - addAll(c); - } - - public MemoryOverflowModel(Set namespaces) { - memory = new LinkedHashModel(namespaces, LARGE_BLOCK); - } - - @Override - public synchronized void closeIterator(Iterator iter) { - super.closeIterator(iter); - if (disk != null) { - disk.closeIterator(iter); - } - } - - @Override - public synchronized Set getNamespaces() { - return memory.getNamespaces(); - } - - @Override - public synchronized Optional getNamespace(String prefix) { - return memory.getNamespace(prefix); - } - - @Override - public synchronized Namespace setNamespace(String prefix, String name) { - return memory.setNamespace(prefix, name); - } - - @Override - public void setNamespace(Namespace namespace) { - memory.setNamespace(namespace); - } - - @Override - public synchronized Optional removeNamespace(String prefix) { - return memory.removeNamespace(prefix); - } - - @Override - public boolean contains(Resource subj, IRI pred, Value obj, Resource... 
contexts) { - return getDelegate().contains(subj, pred, obj, contexts); - } - - @Override - public boolean add(Resource subj, IRI pred, Value obj, Resource... contexts) { - checkMemoryOverflow(); - return getDelegate().add(subj, pred, obj, contexts); - } - - @Override - public boolean add(Statement st) { - checkMemoryOverflow(); - return getDelegate().add(st); - } - - @Override - public boolean addAll(Collection c) { - checkMemoryOverflow(); - if (disk != null || c.size() <= 1024) { - return getDelegate().addAll(c); - } else { - boolean ret = false; - HashSet buffer = new HashSet<>(); - for (Statement st : c) { - buffer.add(st); - if (buffer.size() >= 1024) { - ret |= getDelegate().addAll(buffer); - buffer.clear(); - innerCheckMemoryOverflow(); - } - } - if (!buffer.isEmpty()) { - ret |= getDelegate().addAll(buffer); - buffer.clear(); - } - - return ret; - - } - - } - - @Override - public boolean remove(Resource subj, IRI pred, Value obj, Resource... contexts) { - return getDelegate().remove(subj, pred, obj, contexts); - } - - @Override - public int size() { - return getDelegate().size(); - } - - @Override - public Iterator iterator() { - return getDelegate().iterator(); - } - - @Override - public boolean clear(Resource... contexts) { - return getDelegate().clear(contexts); - } - - @Override - public Model filter(final Resource subj, final IRI pred, final Value obj, final Resource... contexts) { - return new FilteredModel(this, subj, pred, obj, contexts) { - - private static final long serialVersionUID = -475666402618133101L; - - @Override - public int size() { - return getDelegate().filter(subj, pred, obj, contexts).size(); - } - - @Override - public Iterator iterator() { - return getDelegate().filter(subj, pred, obj, contexts).iterator(); - } - - @Override - protected void removeFilteredTermIteration(Iterator iter, Resource subj, IRI pred, Value obj, - Resource... 
contexts) { - MemoryOverflowModel.this.removeTermIteration(iter, subj, pred, obj, contexts); - } - }; - } - - @Override - public synchronized void removeTermIteration(Iterator iter, Resource subj, IRI pred, Value obj, - Resource... contexts) { - if (disk == null) { - memory.removeTermIteration(iter, subj, pred, obj, contexts); - } else { - disk.removeTermIteration(iter, subj, pred, obj, contexts); - } - } - protected abstract SailStore createSailStore(File dataDir) throws IOException, SailException; - private Model getDelegate() { - var memory = this.memory; - if (memory != null) { - return memory; - } else { - var disk = this.disk; - if (disk != null) { - return disk; - } - synchronized (this) { - if (this.memory != null) { - return this.memory; - } - if (this.disk != null) { - return this.disk; - } - throw new IllegalStateException("MemoryOverflowModel is in an inconsistent state"); - } - } - } - - private void writeObject(ObjectOutputStream s) throws IOException { - // Write out any hidden serialization magic - s.defaultWriteObject(); - // Write in size - Model delegate = getDelegate(); - s.writeInt(delegate.size()); - // Write in all elements - for (Statement st : delegate) { - Resource subj = st.getSubject(); - IRI pred = st.getPredicate(); - Value obj = st.getObject(); - Resource ctx = st.getContext(); - s.writeObject(vf.createStatement(subj, pred, obj, ctx)); - } - } - - private void readObject(ObjectInputStream s) throws IOException, ClassNotFoundException { - // Read in any hidden serialization magic - s.defaultReadObject(); - // Read in size - int size = s.readInt(); - // Read in all elements - for (int i = 0; i < size; i++) { - add((Statement) s.readObject()); - } - } - - private void checkMemoryOverflow() { - if (disk == getDelegate()) { - return; - } - - if (overflow) { - innerCheckMemoryOverflow(); - } - int size = size() + 1; - if (size >= LARGE_BLOCK && size % LARGE_BLOCK == 0) { - innerCheckMemoryOverflow(); - } - - } - - private void 
innerCheckMemoryOverflow() { - if (disk == getDelegate()) { - return; - } - - // maximum heap size the JVM can allocate - long maxMemory = RUNTIME.maxMemory(); - - // total currently allocated JVM memory - long totalMemory = RUNTIME.totalMemory(); - - // amount of memory free in the currently allocated JVM memory - long freeMemory = RUNTIME.freeMemory(); - - // estimated memory used - long used = totalMemory - freeMemory; - - // amount of memory the JVM can still allocate from the OS (upper boundary is the max heap) - long freeToAllocateMemory = maxMemory - used; - - if (baseline > 0) { - long blockSize = used - baseline; - if (blockSize > maxBlockSize) { - maxBlockSize = blockSize; - } - if (overflow && freeToAllocateMemory < MIN_AVAILABLE_MEM_BEFORE_OVERFLOWING * 2) { - // stricter memory requirements to not overflow if other models are overflowing - logger.debug("syncing at {} triples. max block size: {}", size(), maxBlockSize); - overflowToDisk(); - System.gc(); - } else if (freeToAllocateMemory < MIN_AVAILABLE_MEM_BEFORE_OVERFLOWING || - freeToAllocateMemory < Math.min(0.15 * maxMemory, maxBlockSize)) { - // Sync if either the estimated size of the next block is larger than remaining memory, or - // if less than 15% of the heap is still free (this last condition to avoid GC overhead limit) - - logger.debug("syncing at {} triples. 
max block size: {}", size(), maxBlockSize); - overflowToDisk(); - System.gc(); - } else { - if (overflow) { - overflow = false; - } - } - } - baseline = used; - } - - private synchronized void overflowToDisk() { - overflow = true; - - if (memory == null) { - assert disk != null; - return; - } - + @Override + protected void overflowToDiskInner(Model memory) { try { - LinkedHashModel memory = this.memory; - this.memory = null; - assert disk == null; dataDir = Files.createTempDirectory("model").toFile(); logger.debug("memory overflow using temp directory {}", dataDir); diff --git a/core/sail/nativerdf/src/test/java/org/eclipse/rdf4j/sail/nativerdf/benchmark/OverflowBenchmarkConcurrent.java b/core/sail/nativerdf/src/test/java/org/eclipse/rdf4j/sail/nativerdf/benchmark/OverflowBenchmarkConcurrent.java index 826f138674..277c6c498e 100644 --- a/core/sail/nativerdf/src/test/java/org/eclipse/rdf4j/sail/nativerdf/benchmark/OverflowBenchmarkConcurrent.java +++ b/core/sail/nativerdf/src/test/java/org/eclipse/rdf4j/sail/nativerdf/benchmark/OverflowBenchmarkConcurrent.java @@ -69,10 +69,10 @@ * @author Håvard Ottestad */ @State(Scope.Benchmark) -@Warmup(iterations = 0) +@Warmup(iterations = 5) @BenchmarkMode({ Mode.AverageTime }) -@Fork(value = 1, jvmArgs = { "-Xms1G", "-Xmx1G", "-XX:+UseParallelGC" }) -@Measurement(iterations = 10, batchSize = 1, time = 1, timeUnit = TimeUnit.MILLISECONDS) +@Fork(value = 1, jvmArgs = { "-Xms1G", "-Xmx1G", "-XX:+UseG1GC" }) +@Measurement(iterations = 5) @OutputTimeUnit(TimeUnit.MILLISECONDS) public class OverflowBenchmarkConcurrent { @@ -81,6 +81,10 @@ public void setup() { ((Logger) (LoggerFactory .getLogger("org.eclipse.rdf4j.sail.nativerdf.MemoryOverflowModel"))) .setLevel(ch.qos.logback.classic.Level.DEBUG); + + ((Logger) (LoggerFactory + .getLogger("org.eclipse.rdf4j.model.impl.AbstractMemoryOverflowModel"))) + .setLevel(ch.qos.logback.classic.Level.DEBUG); } public static void main(String[] args) throws RunnerException {