Skip to content

Commit

Permalink
GH-4784 interrupt thread that is owning a connection before closing i…
Browse files Browse the repository at this point in the history
…f we are closing from a different thread
  • Loading branch information
hmottestad committed Sep 29, 2023
1 parent 6bc981a commit 6292077
Show file tree
Hide file tree
Showing 8 changed files with 256 additions and 21 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -222,6 +222,34 @@ public void shutDown() throws SailException {
activeConnectionsCopy = new IdentityHashMap<>(activeConnections);
}

// Interrupt any threads that are still using a connection, in case they are waiting for a lock
for (Map.Entry<SailConnection, Throwable> entry : activeConnectionsCopy.entrySet()) {
try {
SailConnection con = entry.getKey();

if (con instanceof AbstractSailConnection) {
AbstractSailConnection sailCon = (AbstractSailConnection) con;
sailCon.getOwner().interrupt();
sailCon.getOwner().join(1000);
if (sailCon.getOwner().isAlive()) {
logger.error(
"Closing active connection due to shut down and interrupted the owning thread of the connection {} but thread is still alive after 1000 ms!",
sailCon.getOwner());
}
}

} catch (Throwable e) {
if (e instanceof InterruptedException) {
throw new SailException(e);
} else if (e instanceof AssertionError) {
// ignore assertions errors
} else if (e instanceof Error) {
throw (Error) e;
}
// ignore all other exceptions
}
}

// Forcefully close any connections that are still open
for (Map.Entry<SailConnection, Throwable> entry : activeConnectionsCopy.entrySet()) {
SailConnection con = entry.getKey();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,13 +12,15 @@

import java.lang.invoke.MethodHandles;
import java.lang.invoke.VarHandle;
import java.time.Duration;
import java.util.Collection;
import java.util.Collections;
import java.util.IdentityHashMap;
import java.util.LinkedList;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.LongAdder;
import java.util.concurrent.locks.LockSupport;
import java.util.concurrent.locks.ReentrantLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;

Expand Down Expand Up @@ -98,15 +100,7 @@ public abstract class AbstractSailConnection implements SailConnection {
private boolean isOpen = true;
private static final VarHandle IS_OPEN;

static {
try {
IS_OPEN = MethodHandles.lookup()
.in(AbstractSailConnection.class)
.findVarHandle(AbstractSailConnection.class, "isOpen", boolean.class);
} catch (ReflectiveOperationException e) {
throw new Error(e);
}
}
private Thread owner;

/**
* Lock used to prevent concurrent calls to update methods like addStatement, clear, commit, etc. within a
Expand Down Expand Up @@ -161,6 +155,7 @@ public AbstractSailConnection(AbstractSail sailBase) {
} else {
activeIterationsDebug = Collections.emptyMap();
}
owner = Thread.currentThread();
}

/*---------*
Expand Down Expand Up @@ -252,6 +247,23 @@ public boolean isActive() throws UnknownSailTransactionStateException {

@Override
public final void close() throws SailException {
if (Thread.currentThread() != owner) {
logger.error(
"Closing connection from a different thread than the one that opened it. Connections should not be shared between threads. Opened by "
+ owner + " closed by " + Thread.currentThread(),
new Throwable("Throwable used for stacktrace"));
owner.interrupt();
try {
owner.join(1000);
if (owner.isAlive()) {
logger.error("Interrupted thread {} but thread is still alive after 1000 ms!", owner);
}
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
throw new SailException(e);
}
}

// obtain an exclusive lock so that any further operations on this
// connection (including those from any concurrent threads) are blocked.
if (!IS_OPEN.compareAndSet(this, true, false)) {
Expand All @@ -268,7 +280,7 @@ public final void close() throws SailException {
if (sumDone == sumBlocking) {
break;
} else {
Thread.onSpinWait();
LockSupport.parkNanos(Duration.ofMillis(10).toNanos());
}
}

Expand Down Expand Up @@ -876,6 +888,10 @@ protected Lock getTransactionLock() throws SailException {
return new JavaLock(updateLock);
}

public Thread getOwner() {
return owner;
}

/**
* Registers an iteration as active by wrapping it in a {@link SailBaseIteration} object and adding it to the list
* of active iterations.
Expand Down Expand Up @@ -959,6 +975,15 @@ protected AbstractSail getSailBase() {
}

private void forceCloseActiveOperations() throws SailException {
if (owner != Thread.currentThread()) {
owner.interrupt();
try {
owner.join(1000);
} catch (InterruptedException e) {
throw new SailException(e);
}
}

for (int i = 0; i < 10 && isActiveOperation() && !debugEnabled; i++) {
System.gc();
try {
Expand Down Expand Up @@ -1138,4 +1163,14 @@ public synchronized void release() {
}
}
}

static {
try {
IS_OPEN = MethodHandles.lookup()
.in(AbstractSailConnection.class)
.findVarHandle(AbstractSailConnection.class, "isOpen", boolean.class);
} catch (ReflectiveOperationException e) {
throw new Error(e);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -150,4 +150,5 @@ public Supplier<CollectionFactory> getCollectionFactory() {
verifyBaseSailSet();
return baseSail.getCollectionFactory();
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@

import java.io.File;
import java.io.IOException;
import java.time.Duration;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
Expand All @@ -20,6 +21,7 @@
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.locks.LockSupport;
import java.util.concurrent.locks.ReentrantLock;

import org.eclipse.rdf4j.common.iteration.CloseableIteration;
Expand Down Expand Up @@ -236,7 +238,8 @@ public void close() throws SailException {
running.set(false);
tripleStoreExecutor.shutdown();
while (!tripleStoreExecutor.isTerminated()) {
Thread.yield();
tripleStoreExecutor.shutdownNow();
LockSupport.parkNanos(Duration.ofMillis(100).toNanos());
}
tripleStore.close();
}
Expand Down Expand Up @@ -561,6 +564,9 @@ private void startTransaction(boolean preferThreading) throws SailException {
op.execute();
}
} else {
if (Thread.interrupted()) {
throw new InterruptedException();
}
Thread.yield();
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -577,10 +577,10 @@ public MemorySailSink(boolean explicit, boolean serializable) throws SailExcepti
this.explicit = explicit;
if (serializable) {
this.serializable = currentSnapshot;
reservedSnapshot = snapshotMonitor.reserve(this.serializable, this);
this.reservedSnapshot = snapshotMonitor.reserve(this.serializable, this);
} else {
this.serializable = Integer.MAX_VALUE;
reservedSnapshot = null;
this.reservedSnapshot = null;
}
}

Expand Down Expand Up @@ -631,6 +631,7 @@ public synchronized void prepare() throws SailException {
}
}
}
assert txnLock && txnLockManager.isHeldByCurrentThread() : "Should still be holding lock";
}

@Override
Expand All @@ -641,6 +642,7 @@ public synchronized void flush() throws SailException {
if (requireCleanup) {
scheduleSnapshotCleanup();
}
assert txnLock && txnLockManager.isHeldByCurrentThread() : "Should still be holding lock";
}
}

Expand All @@ -653,12 +655,26 @@ public void close() {
reservedSnapshot.release();
}
} finally {
boolean toCloseTxnLock = txnLock;
txnLock = false;
if (toCloseTxnLock) {
txnLockManager.unlock();
try {
releaseLock();
} finally {
observations = null;
}
observations = null;

}

}
}

synchronized private void releaseLock() {
if (txnLock) {
try {
txnLock = false;
txnLockManager.unlock();
} catch (IllegalMonitorStateException t) {
txnLock = true;
throw new SailException("Failed to release lock from thread " + Thread.currentThread()
+ " because it was locked by another thread.", t);
}

}
Expand All @@ -668,18 +684,21 @@ public void close() {
public synchronized void setNamespace(String prefix, String name) {
acquireExclusiveTransactionLock();
namespaceStore.setNamespace(prefix, name);
assert txnLock && txnLockManager.isHeldByCurrentThread() : "Should still be holding lock";
}

@Override
public synchronized void removeNamespace(String prefix) {
acquireExclusiveTransactionLock();
namespaceStore.removeNamespace(prefix);
assert txnLock && txnLockManager.isHeldByCurrentThread() : "Should still be holding lock";
}

@Override
public synchronized void clearNamespaces() {
acquireExclusiveTransactionLock();
namespaceStore.clear();
assert txnLock && txnLockManager.isHeldByCurrentThread() : "Should still be holding lock";
}

@Override
Expand Down Expand Up @@ -715,6 +734,7 @@ public synchronized void clear(Resource... contexts) {
} catch (InterruptedException e) {
throw convertToSailException(e);
}
assert txnLock && txnLockManager.isHeldByCurrentThread() : "Should still be holding lock";
}

@Override
Expand All @@ -726,6 +746,7 @@ public synchronized void approve(Resource subj, IRI pred, Value obj, Resource ct
} catch (InterruptedException e) {
throw convertToSailException(e);
}
assert txnLock && txnLockManager.isHeldByCurrentThread() : "Should still be holding lock";
}

@Override
Expand All @@ -740,6 +761,7 @@ public synchronized void approveAll(Set<Statement> approved, Set<Resource> appro
} catch (InterruptedException e) {
throw convertToSailException(e);
}
assert txnLock && txnLockManager.isHeldByCurrentThread() : "Should still be holding lock";
}

@Override
Expand All @@ -751,6 +773,7 @@ public synchronized void deprecateAll(Set<Statement> deprecated) {
for (Statement statement : deprecated) {
innerDeprecate(statement, nextSnapshot);
}
assert txnLock && txnLockManager.isHeldByCurrentThread() : "Should still be holding lock";
}

@Override
Expand All @@ -759,6 +782,7 @@ public synchronized void deprecate(Statement statement) throws SailException {
invalidateCache();
requireCleanup = true;
innerDeprecate(statement, nextSnapshot);
assert txnLock && txnLockManager.isHeldByCurrentThread() : "Should still be holding lock";
}

private void innerDeprecate(Statement statement, int nextSnapshot) {
Expand Down Expand Up @@ -795,7 +819,11 @@ private void acquireExclusiveTransactionLock() throws SailException {
if (!txnLock) {
synchronized (this) {
if (!txnLock) {
txnLockManager.lock();
try {
txnLockManager.lockInterruptibly();
} catch (InterruptedException e) {
throw convertToSailException(e);
}
nextSnapshot = currentSnapshot + 1;
txnLock = true;
}
Expand Down Expand Up @@ -900,6 +928,7 @@ public boolean deprecateByQuery(Resource subj, IRI pred, Value obj, Resource[] c
throw convertToSailException(e);
}
invalidateCache();
assert txnLock && txnLockManager.isHeldByCurrentThread() : "Should still be holding lock";

return deprecated;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -359,9 +359,14 @@ public void init() throws SailException {

logger.info("Shapes will be persisted in: " + path);

shapesRepo = new SailRepository(new MemoryStore(new File(path)));
MemoryStore sail = new MemoryStore(new File(path));
sail.setConnectionTimeOut(1000);
shapesRepo = new SailRepository(sail);

} else {
shapesRepo = new SailRepository(new MemoryStore());
MemoryStore sail = new MemoryStore();
sail.setConnectionTimeOut(1000);
shapesRepo = new SailRepository(sail);
}

shapesRepo.init();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@
import org.eclipse.rdf4j.sail.SailConnectionListener;
import org.eclipse.rdf4j.sail.SailException;
import org.eclipse.rdf4j.sail.UpdateContext;
import org.eclipse.rdf4j.sail.helpers.AbstractSailConnection;
import org.eclipse.rdf4j.sail.helpers.NotifyingSailConnectionWrapper;
import org.eclipse.rdf4j.sail.memory.MemoryStore;
import org.eclipse.rdf4j.sail.shacl.ShaclSail.TransactionSettings.ValidationApproach;
Expand Down Expand Up @@ -739,6 +740,29 @@ synchronized public void close() throws SailException {
return;
}

if (getWrappedConnection() instanceof AbstractSailConnection) {
AbstractSailConnection abstractSailConnection = (AbstractSailConnection) getWrappedConnection();

if (Thread.currentThread() != abstractSailConnection.getOwner()) {
Thread owner = abstractSailConnection.getOwner();
logger.error(
"Closing connection from a different thread than the one that opened it. Connections should not be shared between threads. Opened by "
+ owner + " closed by " + Thread.currentThread(),
new Throwable("Throwable used for stacktrace"));
owner.interrupt();
try {
owner.join(1000);
if (owner.isAlive()) {
logger.error("Interrupted thread {} but thread is still alive after 1000 ms!", owner);
}
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
throw new SailException(e);
}
}

}

try {
if (isActive()) {
rollback();
Expand Down
Loading

0 comments on commit 6292077

Please sign in to comment.