From 18964075c16721f8bf8c9098236646290ec0d5ca Mon Sep 17 00:00:00 2001
From: =?UTF-8?q?Ha=CC=8Avard=20Ottestad?=
Date: Thu, 9 May 2024 13:10:06 +0200
Subject: [PATCH] GH-2998 add all when overflowing instead of one statement at a time

---
Review notes (this section is ignored when the patch is applied):
 * MemoryOverflowModel: the heap-size test compared Runtime.maxMemory() (a
   byte count) with 1024, so the 128 MB limit was always selected. It now
   compares with 1 GiB. NOTE(review): 1 GiB is an assumption about what
   "large heap" was meant to be - confirm.
 * MemoryOverflowModel: System.out.println debugging replaced by the
   existing SLF4J logger; the commented-out disk.addAll(memory) line is
   replaced by an explanatory comment.
 * NativeSailStore.approveAll: startTriplestoreTransaction() moved inside the
   try block so sinkStoreAccessLock is released if it throws.
 * SailSourceModel.addAll: the cached size is now updated for every approved
   batch, the same way add(Statement) updates it.
 * The post-image blob ids on the "index" lines predate these changes.

 .../sail/nativerdf/MemoryOverflowModel.java   | 123 +++++++++----
 .../rdf4j/sail/nativerdf/NativeSailStore.java |  38 ++++
 .../rdf4j/sail/nativerdf/SailSourceModel.java | 178 ++++++++++++------
 3 files changed, 255 insertions(+), 84 deletions(-)

diff --git a/core/sail/nativerdf/src/main/java/org/eclipse/rdf4j/sail/nativerdf/MemoryOverflowModel.java b/core/sail/nativerdf/src/main/java/org/eclipse/rdf4j/sail/nativerdf/MemoryOverflowModel.java
index 001e25de584..14dfc19935d 100644
--- a/core/sail/nativerdf/src/main/java/org/eclipse/rdf4j/sail/nativerdf/MemoryOverflowModel.java
+++ b/core/sail/nativerdf/src/main/java/org/eclipse/rdf4j/sail/nativerdf/MemoryOverflowModel.java
@@ -16,6 +16,7 @@
 import java.io.ObjectOutputStream;
 import java.nio.file.Files;
 import java.util.Collection;
+import java.util.HashSet;
 import java.util.Iterator;
 import java.util.Optional;
 import java.util.Set;
@@ -50,9 +51,12 @@ abstract class MemoryOverflowModel extends AbstractModel {
 
 	private static final int LARGE_BLOCK = 10000;
 
+	private static volatile boolean overflow;
+
 	// To reduce the chance of OOM we will always overflow once we get close to running out of memory even if we think
-	// we have space for one more block. The limit is currently set at 32 MB
-	private static final int MIN_AVAILABLE_MEM_BEFORE_OVERFLOWING = 32 * 1024 * 1024;
+	// we have space for one more block. The limit is 32 MB for heaps smaller than 1 GB and 128 MB for larger heaps.
+	private static final int MIN_AVAILABLE_MEM_BEFORE_OVERFLOWING = RUNTIME.maxMemory() >= 1024L * 1024 * 1024
+			? 128 * 1024 * 1024 : 32 * 1024 * 1024;
 
 	final Logger logger = LoggerFactory.getLogger(MemoryOverflowModel.class);
 
@@ -138,6 +142,33 @@ public boolean add(Statement st) {
 		return getDelegate().add(st);
 	}
 
+	@Override
+	public boolean addAll(Collection c) {
+		checkMemoryOverflow();
+		if (disk != null || c.size() <= 1024) {
+			return getDelegate().addAll(c);
+		} else {
+			boolean ret = false;
+			HashSet buffer = new HashSet<>();
+			for (Statement st : c) {
+				buffer.add(st);
+				if (buffer.size() >= 1024) {
+					ret |= getDelegate().addAll(buffer);
+					buffer.clear();
+					innerCheckMemoryOverflow();
+				}
+			}
+			if (!buffer.isEmpty()) {
+				ret |= getDelegate().addAll(buffer);
+				buffer.clear();
+			}
+
+			return ret;
+
+		}
+
+	}
+
 	@Override
 	public boolean remove(Resource subj, IRI pred, Value obj, Resource... contexts) {
 		return getDelegate().remove(subj, pred, obj, contexts);
@@ -234,43 +265,69 @@ private void readObject(ObjectInputStream s) throws IOException, ClassNotFoundEx
 
 	private synchronized void checkMemoryOverflow() {
 		if (disk == null) {
+			if (overflow) {
+				logger.trace("overflow flag is set, checking memory before every add");
+				innerCheckMemoryOverflow();
+			}
 			int size = size();
 			if (size >= LARGE_BLOCK && size % LARGE_BLOCK == 0) {
-				// maximum heap size the JVM can allocate
-				long maxMemory = RUNTIME.maxMemory();
-
-				// total currently allocated JVM memory
-				long totalMemory = RUNTIME.totalMemory();
-
-				// amount of memory free in the currently allocated JVM memory
-				long freeMemory = RUNTIME.freeMemory();
-
-				// estimated memory used
-				long used = totalMemory - freeMemory;
-
-				// amount of memory the JVM can still allocate from the OS (upper boundary is the max heap)
-				long freeToAllocateMemory = maxMemory - used;
-
-				if (baseline > 0) {
-					long blockSize = used - baseline;
-					if (blockSize > maxBlockSize) {
-						maxBlockSize = blockSize;
-					}
-
-					// Sync if either the estimated size of the next block is larger than remaining memory, or
-					// if less than 15% of the heap is still free (this last condition to avoid GC overhead limit)
-					if (freeToAllocateMemory < MIN_AVAILABLE_MEM_BEFORE_OVERFLOWING ||
-							freeToAllocateMemory < Math.min(0.15 * maxMemory, maxBlockSize)) {
-						logger.debug("syncing at {} triples. max block size: {}", size, maxBlockSize);
-						overflowToDisk();
-					}
+				innerCheckMemoryOverflow();
+			}
+		}
+	}
+
+	private void innerCheckMemoryOverflow() {
+		if (disk != null) {
+			return;
+		}
+
+		// maximum heap size the JVM can allocate
+		long maxMemory = RUNTIME.maxMemory();
+
+		// total currently allocated JVM memory
+		long totalMemory = RUNTIME.totalMemory();
+
+		// amount of memory free in the currently allocated JVM memory
+		long freeMemory = RUNTIME.freeMemory();
+
+		// estimated memory used
+		long used = totalMemory - freeMemory;
+
+		// amount of memory the JVM can still allocate from the OS (upper boundary is the max heap)
+		long freeToAllocateMemory = maxMemory - used;
+
+		if (baseline > 0) {
+			long blockSize = used - baseline;
+			if (blockSize > maxBlockSize) {
+				maxBlockSize = blockSize;
+			}
+			if (overflow && freeToAllocateMemory < MIN_AVAILABLE_MEM_BEFORE_OVERFLOWING * 2) {
+				// stricter memory requirements to not overflow if other models are overflowing
+				logger.debug("syncing at {} triples. max block size: {}", size(), maxBlockSize);
+				overflowToDisk();
+				System.gc();
+			} else if (freeToAllocateMemory < MIN_AVAILABLE_MEM_BEFORE_OVERFLOWING ||
+					freeToAllocateMemory < Math.min(0.15 * maxMemory, maxBlockSize)) {
+				// Sync if either the estimated size of the next block is larger than remaining memory, or
+				// if less than 15% of the heap is still free (this last condition to avoid GC overhead limit)
+
+				logger.debug("syncing at {} triples. max block size: {}", size(), maxBlockSize);
+				overflowToDisk();
+				System.gc();
+			} else {
+				if (overflow) {
+					logger.debug("enough memory available again, clearing the overflow flag");
+					overflow = false;
 				}
-				baseline = used;
 			}
 		}
+		baseline = used;
 	}
 
 	private synchronized void overflowToDisk() {
+		logger.debug("overflowing to disk");
+		overflow = true;
+
 		try {
 			LinkedHashModel memory = this.memory;
 			this.memory = null;
@@ -279,8 +336,8 @@ private synchronized void overflowToDisk() {
 			dataDir = Files.createTempDirectory("model").toFile();
 			logger.debug("memory overflow using temp directory {}", dataDir);
 			store = createSailStore(dataDir);
-			disk = new SailSourceModel(store);
-			disk.addAll(memory);
+			// the two-argument constructor approves all in-memory statements in one call and flushes them
+			disk = new SailSourceModel(store, memory);
 			logger.debug("overflow synced to disk");
 		} catch (IOException | SailException e) {
 			String path = dataDir != null ? dataDir.getAbsolutePath() : "(unknown)";
diff --git a/core/sail/nativerdf/src/main/java/org/eclipse/rdf4j/sail/nativerdf/NativeSailStore.java b/core/sail/nativerdf/src/main/java/org/eclipse/rdf4j/sail/nativerdf/NativeSailStore.java
index da790e890ac..7aeb8d97109 100644
--- a/core/sail/nativerdf/src/main/java/org/eclipse/rdf4j/sail/nativerdf/NativeSailStore.java
+++ b/core/sail/nativerdf/src/main/java/org/eclipse/rdf4j/sail/nativerdf/NativeSailStore.java
@@ -444,6 +444,44 @@ public void approve(Resource subj, IRI pred, Value obj, Resource ctx) throws Sai
 			addStatement(subj, pred, obj, explicit, ctx);
 		}
 
+		@Override
+		public void approveAll(Set approved, Set approvedContexts) {
+			sinkStoreAccessLock.lock();
+			try {
+				startTriplestoreTransaction();
+
+				for (Statement statement : approved) {
+					Resource subj = statement.getSubject();
+					IRI pred = statement.getPredicate();
+					Value obj = statement.getObject();
+					Resource context = statement.getContext();
+
+					int subjID = valueStore.storeValue(subj);
+					int predID = valueStore.storeValue(pred);
+					int objID = valueStore.storeValue(obj);
+
+					int contextID = 0;
+					if (context != null) {
+						contextID = valueStore.storeValue(context);
+					}
+
+					boolean wasNew = tripleStore.storeTriple(subjID, predID, objID, contextID, explicit);
+					if (wasNew && context != null) {
+						contextStore.increment(context);
+					}
+
+				}
+			} catch (IOException e) {
+				throw new SailException(e);
+			} catch (RuntimeException e) {
+				logger.error("Encountered an unexpected problem while trying to add a statement", e);
+				throw e;
+			} finally {
+				sinkStoreAccessLock.unlock();
+			}
+
+		}
+
 		@Override
 		public void deprecate(Statement statement) throws SailException {
 			removeStatements(statement.getSubject(), statement.getPredicate(), statement.getObject(), explicit,
diff --git a/core/sail/nativerdf/src/main/java/org/eclipse/rdf4j/sail/nativerdf/SailSourceModel.java b/core/sail/nativerdf/src/main/java/org/eclipse/rdf4j/sail/nativerdf/SailSourceModel.java
index 1e73b9cdffc..9b334528fe8 100644
--- a/core/sail/nativerdf/src/main/java/org/eclipse/rdf4j/sail/nativerdf/SailSourceModel.java
+++ b/core/sail/nativerdf/src/main/java/org/eclipse/rdf4j/sail/nativerdf/SailSourceModel.java
@@ -11,6 +11,8 @@
 package org.eclipse.rdf4j.sail.nativerdf;
 
 import java.util.ArrayList;
+import java.util.Collection;
+import java.util.HashSet;
 import java.util.Iterator;
 import java.util.LinkedHashSet;
 import java.util.List;
@@ -28,6 +30,7 @@
 import org.eclipse.rdf4j.model.impl.AbstractModel;
 import org.eclipse.rdf4j.model.impl.EmptyModel;
 import org.eclipse.rdf4j.model.impl.FilteredModel;
+import org.eclipse.rdf4j.model.impl.LinkedHashModel;
 import org.eclipse.rdf4j.model.impl.SimpleNamespace;
 import org.eclipse.rdf4j.model.util.ModelException;
 import org.eclipse.rdf4j.sail.SailException;
@@ -47,52 +50,6 @@ class SailSourceModel extends AbstractModel {
 
 	private static final Logger logger = LoggerFactory.getLogger(SailSourceModel.class);
 
-	private final class StatementIterator implements Iterator {
-
-		final CloseableIteration stmts;
-
-		Statement last;
-
-		StatementIterator(CloseableIteration closeableIteration) {
-			this.stmts = closeableIteration;
-		}
-
-		@Override
-		public boolean hasNext() {
-			try {
-				if (stmts.hasNext()) {
-					return true;
-				}
-				stmts.close();
-				return false;
-			} catch (SailException e) {
-				throw new ModelException(e);
-			}
-		}
-
-		@Override
-		public Statement next() {
-			try {
-				last = stmts.next();
-				if (last == null) {
-					stmts.close();
-				}
-				return last;
-			} catch (SailException e) {
-				throw new ModelException(e);
-			}
-		}
-
-		@Override
-		public void remove() {
-			if (last == null) {
-				throw new IllegalStateException("next() not yet called");
-			}
-			SailSourceModel.this.remove(last);
-			last = null;
-		}
-	}
-
 	final SailSource source;
 
 	SailDataset dataset;
@@ -111,6 +68,13 @@ public SailSourceModel(SailSource source) {
 		this.source = source;
 	}
 
+	public SailSourceModel(SailStore store, Model bulk) {
+		this(store);
+		sink().approveAll(bulk, bulk.contexts());
+		size = bulk.size();
+		sink.flush();
+	}
+
 	@Override
 	public void closeIterator(Iterator iter) {
 		super.closeIterator(iter);
@@ -265,6 +229,76 @@ public synchronized boolean add(Resource subj, IRI pred, Value obj, Resource... 
 		}
 	}
 
+	@Override
+	public boolean add(Statement st) {
+		Resource subj = st.getSubject();
+		IRI pred = st.getPredicate();
+		Value obj = st.getObject();
+		Resource ctx = st.getContext();
+		try {
+			if (contains(subj, pred, obj, ctx)) {
+				logger.trace("already contains statement {} {} {} {}", subj, pred, obj, ctx);
+				return false;
+			}
+			if (size >= 0) {
+				size++;
+			}
+
+			sink().approve(subj, pred, obj, ctx);
+
+			return true;
+		} catch (SailException e) {
+			throw new ModelException(e);
+		}
+	}
+
+	@Override
+	public boolean addAll(Collection statements) {
+		if (statements.isEmpty()) {
+			return false;
+		}
+
+		if (statements.size() == 1) {
+			Statement st = statements.iterator().next();
+			return add(st.getSubject(), st.getPredicate(), st.getObject(), st.getContext());
+		}
+
+		boolean added = false;
+
+		HashSet tempSet = new HashSet<>();
+		HashSet contexts = new HashSet<>();
+		SailSink sink = sink();
+
+		for (Statement statement : statements) {
+			if (tempSet.size() >= 1024) {
+				sink.approveAll(tempSet, contexts);
+				if (size >= 0) {
+					// keep the cached size up to date, as add(Statement) does
+					size += tempSet.size();
+				}
+				tempSet.clear();
+				contexts.clear();
+				added = true;
+			}
+			if (!contains(statement.getSubject(), statement.getPredicate(), statement.getObject(),
+					statement.getContext())) {
+				contexts.add(statement.getContext());
+				tempSet.add(statement);
+			}
+		}
+
+		if (!tempSet.isEmpty()) {
+			sink.approveAll(tempSet, contexts);
+			if (size >= 0) {
+				size += tempSet.size();
+			}
+			added = true;
+		}
+
+		return added;
+
+	}
+
 	@Override
 	public synchronized boolean clear(Resource... contexts) {
 		try {
@@ -411,12 +445,8 @@ private boolean contains(SailDataset dataset, Resource subj, IRI pred, Value obj
 		if (dataset == null) {
 			return false;
 		}
-		CloseableIteration stmts;
-		stmts = dataset.getStatements(subj, pred, obj, contexts);
-		try {
+		try (CloseableIteration stmts = dataset.getStatements(subj, pred, obj, contexts)) {
 			return stmts.hasNext();
-		} finally {
-			stmts.close();
 		}
 	}
 
@@ -466,4 +496,50 @@ Resource[] cast(Value[] contexts) {
 		return result;
 	}
 
+	private final class StatementIterator implements Iterator {
+
+		final CloseableIteration stmts;
+
+		Statement last;
+
+		StatementIterator(CloseableIteration closeableIteration) {
+			this.stmts = closeableIteration;
+		}
+
+		@Override
+		public boolean hasNext() {
+			try {
+				if (stmts.hasNext()) {
+					return true;
+				}
+				stmts.close();
+				return false;
+			} catch (SailException e) {
+				throw new ModelException(e);
+			}
+		}
+
+		@Override
+		public Statement next() {
+			try {
+				last = stmts.next();
+				if (last == null) {
+					stmts.close();
+				}
+				return last;
+			} catch (SailException e) {
+				throw new ModelException(e);
+			}
+		}
+
+		@Override
+		public void remove() {
+			if (last == null) {
+				throw new IllegalStateException("next() not yet called");
+			}
+			SailSourceModel.this.remove(last);
+			last = null;
+		}
+	}
+
 }