From 62920770d55356c280c15b540efed8a0052b4133 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Ha=CC=8Avard=20Ottestad?= Date: Wed, 27 Sep 2023 15:26:54 +0200 Subject: [PATCH] GH-4784 interrupt thread that is owning a connection before closing if we are closing from a different thread --- .../rdf4j/sail/helpers/AbstractSail.java | 28 +++++ .../sail/helpers/AbstractSailConnection.java | 55 +++++++-- .../rdf4j/sail/helpers/SailWrapper.java | 1 + .../rdf4j/sail/lmdb/LmdbSailStore.java | 8 +- .../rdf4j/sail/memory/MemorySailStore.java | 45 ++++++-- .../eclipse/rdf4j/sail/shacl/ShaclSail.java | 9 +- .../rdf4j/sail/shacl/ShaclSailConnection.java | 24 ++++ .../testsuite/sail/SailConcurrencyTest.java | 107 ++++++++++++++++++ 8 files changed, 256 insertions(+), 21 deletions(-) diff --git a/core/sail/api/src/main/java/org/eclipse/rdf4j/sail/helpers/AbstractSail.java b/core/sail/api/src/main/java/org/eclipse/rdf4j/sail/helpers/AbstractSail.java index e5d206ad226..108974263e0 100644 --- a/core/sail/api/src/main/java/org/eclipse/rdf4j/sail/helpers/AbstractSail.java +++ b/core/sail/api/src/main/java/org/eclipse/rdf4j/sail/helpers/AbstractSail.java @@ -222,6 +222,34 @@ public void shutDown() throws SailException { activeConnectionsCopy = new IdentityHashMap<>(activeConnections); } + // Interrupt any threads that are still using a connection, in case they are waiting for a lock + for (Map.Entry entry : activeConnectionsCopy.entrySet()) { + try { + SailConnection con = entry.getKey(); + + if (con instanceof AbstractSailConnection) { + AbstractSailConnection sailCon = (AbstractSailConnection) con; + sailCon.getOwner().interrupt(); + sailCon.getOwner().join(1000); + if (sailCon.getOwner().isAlive()) { + logger.error( + "Closing active connection due to shut down and interrupted the owning thread of the connection {} but thread is still alive after 1000 ms!", + sailCon.getOwner()); + } + } + + } catch (Throwable e) { + if (e instanceof InterruptedException) { + throw new SailException(e); + } else if (e instanceof AssertionError) { + // ignore assertions errors + } else if (e instanceof Error) { + throw (Error) e; + } + // ignore all other exceptions + } + } + // Forcefully close any connections that are still open for (Map.Entry entry : activeConnectionsCopy.entrySet()) { SailConnection con = entry.getKey(); diff --git a/core/sail/api/src/main/java/org/eclipse/rdf4j/sail/helpers/AbstractSailConnection.java b/core/sail/api/src/main/java/org/eclipse/rdf4j/sail/helpers/AbstractSailConnection.java index 019bff3a5e3..0f3a6b70d67 100644 --- a/core/sail/api/src/main/java/org/eclipse/rdf4j/sail/helpers/AbstractSailConnection.java +++ b/core/sail/api/src/main/java/org/eclipse/rdf4j/sail/helpers/AbstractSailConnection.java @@ -12,6 +12,7 @@ import java.lang.invoke.MethodHandles; import java.lang.invoke.VarHandle; +import java.time.Duration; import java.util.Collection; import java.util.Collections; import java.util.IdentityHashMap; @@ -19,6 +20,7 @@ import java.util.Map; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.atomic.LongAdder; +import java.util.concurrent.locks.LockSupport; import java.util.concurrent.locks.ReentrantLock; import java.util.concurrent.locks.ReentrantReadWriteLock; @@ -98,15 +100,7 @@ public abstract class AbstractSailConnection implements SailConnection { private boolean isOpen = true; private static final VarHandle IS_OPEN; - static { - try { - IS_OPEN = MethodHandles.lookup() - .in(AbstractSailConnection.class) - .findVarHandle(AbstractSailConnection.class, "isOpen", boolean.class); - } catch (ReflectiveOperationException e) { - throw new Error(e); - } - } + private Thread owner; /** * Lock used to prevent concurrent calls to update methods like addStatement, clear, commit, etc. within a @@ -161,6 +155,7 @@ public AbstractSailConnection(AbstractSail sailBase) { } else { activeIterationsDebug = Collections.emptyMap(); } + owner = Thread.currentThread(); } /*---------* @@ -252,6 +247,23 @@ public boolean isActive() throws UnknownSailTransactionStateException { @Override public final void close() throws SailException { + if (Thread.currentThread() != owner) { + logger.error( + "Closing connection from a different thread than the one that opened it. Connections should not be shared between threads. Opened by " + + owner + " closed by " + Thread.currentThread(), + new Throwable("Throwable used for stacktrace")); + owner.interrupt(); + try { + owner.join(1000); + if (owner.isAlive()) { + logger.error("Interrupted thread {} but thread is still alive after 1000 ms!", owner); + } + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + throw new SailException(e); + } + } + // obtain an exclusive lock so that any further operations on this // connection (including those from any concurrent threads) are blocked. if (!IS_OPEN.compareAndSet(this, true, false)) { @@ -268,7 +280,7 @@ public final void close() throws SailException { if (sumDone == sumBlocking) { break; } else { - Thread.onSpinWait(); + LockSupport.parkNanos(Duration.ofMillis(10).toNanos()); } } @@ -876,6 +888,10 @@ protected Lock getTransactionLock() throws SailException { return new JavaLock(updateLock); } + public Thread getOwner() { + return owner; + } + /** * Registers an iteration as active by wrapping it in a {@link SailBaseIteration} object and adding it to the list * of active iterations. @@ -959,6 +975,15 @@ protected AbstractSail getSailBase() { } private void forceCloseActiveOperations() throws SailException { + if (owner != Thread.currentThread()) { + owner.interrupt(); + try { + owner.join(1000); + } catch (InterruptedException e) { + throw new SailException(e); + } + } + for (int i = 0; i < 10 && isActiveOperation() && !debugEnabled; i++) { System.gc(); try { @@ -1138,4 +1163,14 @@ public synchronized void release() { } } } + + static { + try { + IS_OPEN = MethodHandles.lookup() + .in(AbstractSailConnection.class) + .findVarHandle(AbstractSailConnection.class, "isOpen", boolean.class); + } catch (ReflectiveOperationException e) { + throw new Error(e); + } + } } diff --git a/core/sail/api/src/main/java/org/eclipse/rdf4j/sail/helpers/SailWrapper.java b/core/sail/api/src/main/java/org/eclipse/rdf4j/sail/helpers/SailWrapper.java index 91863ac8131..099e3e5d813 100644 --- a/core/sail/api/src/main/java/org/eclipse/rdf4j/sail/helpers/SailWrapper.java +++ b/core/sail/api/src/main/java/org/eclipse/rdf4j/sail/helpers/SailWrapper.java @@ -150,4 +150,5 @@ public Supplier getCollectionFactory() { verifyBaseSailSet(); return baseSail.getCollectionFactory(); } + } diff --git a/core/sail/lmdb/src/main/java/org/eclipse/rdf4j/sail/lmdb/LmdbSailStore.java b/core/sail/lmdb/src/main/java/org/eclipse/rdf4j/sail/lmdb/LmdbSailStore.java index 239a40cfb16..9eb26a3ab2f 100644 --- a/core/sail/lmdb/src/main/java/org/eclipse/rdf4j/sail/lmdb/LmdbSailStore.java +++ b/core/sail/lmdb/src/main/java/org/eclipse/rdf4j/sail/lmdb/LmdbSailStore.java @@ -12,6 +12,7 @@ import java.io.File; import java.io.IOException; +import java.time.Duration; import java.util.ArrayList; import java.util.List; import java.util.Map; @@ -20,6 +21,7 @@ import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.locks.LockSupport; import java.util.concurrent.locks.ReentrantLock; import org.eclipse.rdf4j.common.iteration.CloseableIteration; @@ -236,7 +238,8 @@ public void close() throws SailException { running.set(false); tripleStoreExecutor.shutdown(); while (!tripleStoreExecutor.isTerminated()) { - Thread.yield(); + tripleStoreExecutor.shutdownNow(); + LockSupport.parkNanos(Duration.ofMillis(100).toNanos()); } tripleStore.close(); } @@ -561,6 +564,9 @@ private void startTransaction(boolean preferThreading) throws SailException { op.execute(); } } else { + if (Thread.interrupted()) { + throw new InterruptedException(); + } Thread.yield(); } } diff --git a/core/sail/memory/src/main/java/org/eclipse/rdf4j/sail/memory/MemorySailStore.java b/core/sail/memory/src/main/java/org/eclipse/rdf4j/sail/memory/MemorySailStore.java index 89be7f9d112..59aca80393d 100644 --- a/core/sail/memory/src/main/java/org/eclipse/rdf4j/sail/memory/MemorySailStore.java +++ b/core/sail/memory/src/main/java/org/eclipse/rdf4j/sail/memory/MemorySailStore.java @@ -577,10 +577,10 @@ public MemorySailSink(boolean explicit, boolean serializable) throws SailExcepti this.explicit = explicit; if (serializable) { this.serializable = currentSnapshot; - reservedSnapshot = snapshotMonitor.reserve(this.serializable, this); + this.reservedSnapshot = snapshotMonitor.reserve(this.serializable, this); } else { this.serializable = Integer.MAX_VALUE; - reservedSnapshot = null; + this.reservedSnapshot = null; } } @@ -631,6 +631,7 @@ public synchronized void prepare() throws SailException { } } } + assert txnLock && txnLockManager.isHeldByCurrentThread() : "Should still be holding lock"; } @Override @@ -641,6 +642,7 @@ public synchronized void flush() throws SailException { if (requireCleanup) { scheduleSnapshotCleanup(); } + assert txnLock && txnLockManager.isHeldByCurrentThread() : "Should still be holding lock"; } } @@ -653,12 +655,26 @@ public void close() { reservedSnapshot.release(); } } finally { - boolean toCloseTxnLock = txnLock; - txnLock = false; - if (toCloseTxnLock) { - txnLockManager.unlock(); + try { + releaseLock(); + } finally { + observations = null; } - observations = null; + + } + + } + } + + synchronized private void releaseLock() { + if (txnLock) { + try { + txnLock = false; + txnLockManager.unlock(); + } catch (IllegalMonitorStateException t) { + txnLock = true; + throw new SailException("Failed to release lock from thread " + Thread.currentThread() + + " because it was locked by another thread.", t); } } @@ -668,18 +684,21 @@ public void close() { public synchronized void setNamespace(String prefix, String name) { acquireExclusiveTransactionLock(); namespaceStore.setNamespace(prefix, name); + assert txnLock && txnLockManager.isHeldByCurrentThread() : "Should still be holding lock"; } @Override public synchronized void removeNamespace(String prefix) { acquireExclusiveTransactionLock(); namespaceStore.removeNamespace(prefix); + assert txnLock && txnLockManager.isHeldByCurrentThread() : "Should still be holding lock"; } @Override public synchronized void clearNamespaces() { acquireExclusiveTransactionLock(); namespaceStore.clear(); + assert txnLock && txnLockManager.isHeldByCurrentThread() : "Should still be holding lock"; } @Override @@ -715,6 +734,7 @@ public synchronized void clear(Resource... contexts) { } catch (InterruptedException e) { throw convertToSailException(e); } + assert txnLock && txnLockManager.isHeldByCurrentThread() : "Should still be holding lock"; } @Override @@ -726,6 +746,7 @@ public synchronized void approve(Resource subj, IRI pred, Value obj, Resource ct } catch (InterruptedException e) { throw convertToSailException(e); } + assert txnLock && txnLockManager.isHeldByCurrentThread() : "Should still be holding lock"; } @Override @@ -740,6 +761,7 @@ public synchronized void approveAll(Set approved, Set appro } catch (InterruptedException e) { throw convertToSailException(e); } + assert txnLock && txnLockManager.isHeldByCurrentThread() : "Should still be holding lock"; } @Override @@ -751,6 +773,7 @@ public synchronized void deprecateAll(Set deprecated) { for (Statement statement : deprecated) { innerDeprecate(statement, nextSnapshot); } + assert txnLock && txnLockManager.isHeldByCurrentThread() : "Should still be holding lock"; } @Override @@ -759,6 +782,7 @@ public synchronized void deprecate(Statement statement) throws SailException { invalidateCache(); requireCleanup = true; innerDeprecate(statement, nextSnapshot); + assert txnLock && txnLockManager.isHeldByCurrentThread() : "Should still be holding lock"; } private void innerDeprecate(Statement statement, int nextSnapshot) { @@ -795,7 +819,11 @@ private void acquireExclusiveTransactionLock() throws SailException { if (!txnLock) { synchronized (this) { if (!txnLock) { - txnLockManager.lock(); + try { + txnLockManager.lockInterruptibly(); + } catch (InterruptedException e) { + throw convertToSailException(e); + } nextSnapshot = currentSnapshot + 1; txnLock = true; } @@ -900,6 +928,7 @@ public boolean deprecateByQuery(Resource subj, IRI pred, Value obj, Resource[] c throw convertToSailException(e); } invalidateCache(); + assert txnLock && txnLockManager.isHeldByCurrentThread() : "Should still be holding lock"; return deprecated; } diff --git a/core/sail/shacl/src/main/java/org/eclipse/rdf4j/sail/shacl/ShaclSail.java b/core/sail/shacl/src/main/java/org/eclipse/rdf4j/sail/shacl/ShaclSail.java index eeb9c99cee9..ffccd562f2c 100644 --- a/core/sail/shacl/src/main/java/org/eclipse/rdf4j/sail/shacl/ShaclSail.java +++ b/core/sail/shacl/src/main/java/org/eclipse/rdf4j/sail/shacl/ShaclSail.java @@ -359,9 +359,14 @@ public void init() throws SailException { logger.info("Shapes will be persisted in: " + path); - shapesRepo = new SailRepository(new MemoryStore(new File(path))); + MemoryStore sail = new MemoryStore(new File(path)); + sail.setConnectionTimeOut(1000); + shapesRepo = new SailRepository(sail); + } else { - shapesRepo = new SailRepository(new MemoryStore()); + MemoryStore sail = new MemoryStore(); + sail.setConnectionTimeOut(1000); + shapesRepo = new SailRepository(sail); } shapesRepo.init(); diff --git a/core/sail/shacl/src/main/java/org/eclipse/rdf4j/sail/shacl/ShaclSailConnection.java b/core/sail/shacl/src/main/java/org/eclipse/rdf4j/sail/shacl/ShaclSailConnection.java index 43687cb2090..779bdd7472e 100644 --- a/core/sail/shacl/src/main/java/org/eclipse/rdf4j/sail/shacl/ShaclSailConnection.java +++ b/core/sail/shacl/src/main/java/org/eclipse/rdf4j/sail/shacl/ShaclSailConnection.java @@ -44,6 +44,7 @@ import org.eclipse.rdf4j.sail.SailConnectionListener; import org.eclipse.rdf4j.sail.SailException; import org.eclipse.rdf4j.sail.UpdateContext; +import org.eclipse.rdf4j.sail.helpers.AbstractSailConnection; import org.eclipse.rdf4j.sail.helpers.NotifyingSailConnectionWrapper; import org.eclipse.rdf4j.sail.memory.MemoryStore; import org.eclipse.rdf4j.sail.shacl.ShaclSail.TransactionSettings.ValidationApproach; @@ -739,6 +740,29 @@ synchronized public void close() throws SailException { return; } + if (getWrappedConnection() instanceof AbstractSailConnection) { + AbstractSailConnection abstractSailConnection = (AbstractSailConnection) getWrappedConnection(); + + if (Thread.currentThread() != abstractSailConnection.getOwner()) { + Thread owner = abstractSailConnection.getOwner(); + logger.error( + "Closing connection from a different thread than the one that opened it. Connections should not be shared between threads. Opened by " + + owner + " closed by " + Thread.currentThread(), + new Throwable("Throwable used for stacktrace")); + owner.interrupt(); + try { + owner.join(1000); + if (owner.isAlive()) { + logger.error("Interrupted thread {} but thread is still alive after 1000 ms!", owner); + } + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + throw new SailException(e); + } + } + + } + try { if (isActive()) { rollback(); diff --git a/testsuites/sail/src/main/java/org/eclipse/rdf4j/testsuite/sail/SailConcurrencyTest.java b/testsuites/sail/src/main/java/org/eclipse/rdf4j/testsuite/sail/SailConcurrencyTest.java index 69c817621a6..f31175fb40e 100644 --- a/testsuites/sail/src/main/java/org/eclipse/rdf4j/testsuite/sail/SailConcurrencyTest.java +++ b/testsuites/sail/src/main/java/org/eclipse/rdf4j/testsuite/sail/SailConcurrencyTest.java @@ -17,15 +17,20 @@ import java.util.concurrent.CountDownLatch; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicInteger; +import java.util.concurrent.atomic.AtomicReference; import org.eclipse.rdf4j.common.iteration.CloseableIteration; +import org.eclipse.rdf4j.common.transaction.IsolationLevels; import org.eclipse.rdf4j.model.IRI; import org.eclipse.rdf4j.model.Resource; import org.eclipse.rdf4j.model.ValueFactory; +import org.eclipse.rdf4j.model.vocabulary.RDF; import org.eclipse.rdf4j.model.vocabulary.RDFS; import org.eclipse.rdf4j.sail.Sail; import org.eclipse.rdf4j.sail.SailConnection; import org.eclipse.rdf4j.sail.SailException; +import org.eclipse.rdf4j.sail.helpers.AbstractSail; +import org.eclipse.rdf4j.sail.helpers.SailWrapper; import org.junit.jupiter.api.AfterEach; import org.junit.jupiter.api.Assertions; import org.junit.jupiter.api.BeforeEach; @@ -307,6 +312,108 @@ public void testGetContextIDs() throws Exception { } } + @Test + public void testConcurrentConnectionsShutdown() throws InterruptedException { + if (store instanceof AbstractSail) { + ((AbstractSail) store).setConnectionTimeOut(200); + } else if (store instanceof SailWrapper) { + Sail baseSail = ((SailWrapper) store).getBaseSail(); + if (baseSail instanceof AbstractSail) { + ((AbstractSail) baseSail).setConnectionTimeOut(200); + } + } + + CountDownLatch countDownLatch = new CountDownLatch(1); + Thread thread = new Thread(() -> { + SailConnection connection = store.getConnection(); + countDownLatch.countDown(); + connection.begin(IsolationLevels.NONE); + connection.addStatement(RDF.FIRST, RDF.TYPE, RDF.PROPERTY); + }); + thread.setName("Thread 1"); + thread.start(); + + CountDownLatch countDownLatch2 = new CountDownLatch(1); + Thread thread2 = new Thread(() -> { + SailConnection connection = store.getConnection(); + countDownLatch2.countDown(); + connection.begin(IsolationLevels.NONE); + connection.addStatement(RDF.REST, RDF.TYPE, RDF.PROPERTY); + + }); + thread2.setName("Thread 2"); + thread2.start(); + + countDownLatch.await(); + countDownLatch2.await(); + + Thread.sleep(1000); + + store.shutDown(); + + } + + @Test + public void testConcurrentConnectionsShutdownAndClose() throws InterruptedException { + if (store instanceof AbstractSail) { + ((AbstractSail) store).setConnectionTimeOut(200); + } + + try (SailConnection connection = store.getConnection()) { + connection.begin(); + connection.addStatement(RDF.TYPE, RDF.TYPE, RDF.PROPERTY); + connection.commit(); + } + + AtomicReference connection1 = new AtomicReference<>(); + AtomicReference connection2 = new AtomicReference<>(); + + CountDownLatch countDownLatch = new CountDownLatch(1); + Thread thread = new Thread(() -> { + connection1.set(store.getConnection()); + countDownLatch.countDown(); + connection1.get().begin(IsolationLevels.NONE); + connection1.get().clear(); + }); + thread.setName("Thread 1"); + thread.start(); + + CountDownLatch countDownLatch2 = new CountDownLatch(1); + Thread thread2 = new Thread(() -> { + connection2.set(store.getConnection()); + countDownLatch2.countDown(); + connection2.get().begin(IsolationLevels.NONE); + connection2.get().clear(); + + }); + thread2.setName("Thread 2"); + thread2.start(); + + countDownLatch.await(); + countDownLatch2.await(); + + Thread.sleep(1000); + + Thread thread3 = new Thread(() -> { + + }); + thread3.setName("Thread 3"); + thread3.start(); + + try { + if (thread2.isAlive()) { + connection2.get().close(); + connection1.get().close(); + } else { + connection1.get().close(); + connection2.get().close(); + } + } catch (SailException ignored) { + } + + store.shutDown(); + } + protected synchronized void fail(String message, Throwable t) { System.err.println(message); t.printStackTrace();