Skip to content

Commit

Permalink
Merge branch 'main' into merge-main
Browse files Browse the repository at this point in the history
# Conflicts:
#	core/queryalgebra/evaluation/src/main/java/org/eclipse/rdf4j/query/algebra/evaluation/impl/DefaultEvaluationStrategy.java
#	core/sail/api/src/main/java/org/eclipse/rdf4j/sail/helpers/AbstractSailConnection.java
#	tools/federation/src/main/java/org/eclipse/rdf4j/federated/evaluation/FederationEvalStrategy.java
  • Loading branch information
hmottestad committed Oct 3, 2023
2 parents 080b086 + c0ab946 commit c0abaab
Show file tree
Hide file tree
Showing 29 changed files with 547 additions and 96 deletions.
9 changes: 4 additions & 5 deletions .github/workflows/dash-license.yml
Original file line number Diff line number Diff line change
Expand Up @@ -10,17 +10,16 @@ jobs:
runs-on: ubuntu-latest

steps:
- uses: actions/checkout@v3
- uses: actions/checkout@v2
- name: Set up JDK
uses: actions/setup-java@v3
uses: actions/setup-java@v1
with:
distribution: 'temurin'
java-version: '20'
java-version: '21'
- name: Cache local Maven repository
uses: actions/cache@v3
with:
path: ~/.m2/repository
key: ${{ runner.os }}-jdk20-maven-${{ hashFiles('**/pom.xml') }}
key: ${{ runner.os }}-jdk21-maven-${{ hashFiles('**/pom.xml') }}
restore-keys: |
${{ runner.os }}-jdk20-maven-
- name: Run license-check
Expand Down
2 changes: 1 addition & 1 deletion .github/workflows/develop-status.yml
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ jobs:
runs-on: ubuntu-latest
strategy:
matrix:
jdk: [11, 20]
jdk: [11, 21]

steps:
- uses: actions/checkout@v2
Expand Down
2 changes: 1 addition & 1 deletion .github/workflows/main-status.yml
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ jobs:
runs-on: ubuntu-latest
strategy:
matrix:
jdk: [11, 20]
jdk: [11, 21]

steps:
- uses: actions/checkout@v2
Expand Down
2 changes: 1 addition & 1 deletion .github/workflows/pr-verify.yml
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ jobs:
runs-on: ubuntu-latest
strategy:
matrix:
jdk: [11, 20]
jdk: [11, 21]
steps:
- uses: actions/checkout@v2
- name: Set up JDK
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -715,9 +715,20 @@ public void close() throws QueryEvaluationException {
protected QueryEvaluationStep prepare(DescribeOperator node, QueryEvaluationContext context)
throws QueryEvaluationException {
QueryEvaluationStep child = precompile(node.getArg(), context);
return bs -> new DescribeIteration(child.evaluate(bs), DefaultEvaluationStrategy.this,
node.getBindingNames(),
bs);
return bs -> {
CloseableIteration<BindingSet> evaluate = null;

try {
evaluate = child.evaluate(bs);
return new DescribeIteration(evaluate, DefaultEvaluationStrategy.this, node.getBindingNames(), bs);
} catch (Throwable t) {
if (evaluate != null) {
evaluate.close();
}
throw t;
}

};
}

protected QueryEvaluationStep prepare(Distinct node, QueryEvaluationContext context)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -219,4 +219,20 @@ protected CloseableIteration<BindingSet> createNextIteration(Value subject, Valu
return strategy.evaluate(pattern, parentBindings);
}

@Override
protected void handleClose() throws QueryEvaluationException {
try {
super.handleClose();

} finally {
try {
if (currentDescribeExprIter != null) {
currentDescribeExprIter.close();
}
} finally {
sourceIter.close();
}

}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -349,11 +349,19 @@ public RepositoryResult<Statement> getStatements(Resource subj, IRI pred, Value
Resource... contexts) throws RepositoryException {
Objects.requireNonNull(contexts,
"contexts argument may not be null; either the value should be cast to Resource or an empty array should be supplied");

CloseableIteration<? extends Statement> statements = null;
try {
return createRepositoryResult(sailConnection.getStatements(subj, pred, obj, includeInferred, contexts));
} catch (SailException e) {
throw new RepositoryException("Unable to get statements from Sail", e);
statements = sailConnection.getStatements(subj, pred, obj, includeInferred, contexts);
return createRepositoryResult(statements);
} catch (Throwable t) {
if (statements != null) {
statements.close();
}
if (t instanceof SailException) {
throw new RepositoryException("Unable to get statements from Sail", t);
} else {
throw t;
}
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -222,6 +222,38 @@ public void shutDown() throws SailException {
activeConnectionsCopy = new IdentityHashMap<>(activeConnections);
}

// Interrupt any threads that are still using a connection, in case they are waiting for a lock
for (Map.Entry<SailConnection, Throwable> entry : activeConnectionsCopy.entrySet()) {
try {
SailConnection con = entry.getKey();

if (con instanceof AbstractSailConnection) {
AbstractSailConnection sailCon = (AbstractSailConnection) con;
Thread owner = sailCon.getOwner();
if (owner != Thread.currentThread()) {
owner.interrupt();
// wait up to 1 second for the owner thread to die
owner.join(1000);
if (owner.isAlive()) {
logger.error(
"Closing active connection due to shut down and interrupted the owning thread of the connection {} but thread is still alive after 1000 ms!",
owner);
}
}
}

} catch (Throwable e) {
if (e instanceof InterruptedException) {
throw new SailException(e);
} else if (e instanceof AssertionError) {
// ignore assertions errors
} else if (e instanceof Error) {
throw (Error) e;
}
// ignore all other exceptions
}
}

// Forcefully close any connections that are still open
for (Map.Entry<SailConnection, Throwable> entry : activeConnectionsCopy.entrySet()) {
SailConnection con = entry.getKey();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,13 +12,15 @@

import java.lang.invoke.MethodHandles;
import java.lang.invoke.VarHandle;
import java.time.Duration;
import java.util.Collection;
import java.util.Collections;
import java.util.IdentityHashMap;
import java.util.LinkedList;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.LongAdder;
import java.util.concurrent.locks.LockSupport;
import java.util.concurrent.locks.ReentrantLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;

Expand Down Expand Up @@ -97,15 +99,7 @@ public abstract class AbstractSailConnection implements SailConnection {
private boolean isOpen = true;
private static final VarHandle IS_OPEN;

static {
try {
IS_OPEN = MethodHandles.lookup()
.in(AbstractSailConnection.class)
.findVarHandle(AbstractSailConnection.class, "isOpen", boolean.class);
} catch (ReflectiveOperationException e) {
throw new Error(e);
}
}
private Thread owner;

/**
* Lock used to prevent concurrent calls to update methods like addStatement, clear, commit, etc. within a
Expand Down Expand Up @@ -152,6 +146,7 @@ public AbstractSailConnection(AbstractSail sailBase) {
} else {
activeIterationsDebug = Collections.emptyMap();
}
owner = Thread.currentThread();
}

/*---------*
Expand Down Expand Up @@ -236,6 +231,8 @@ public boolean isActive() throws UnknownSailTransactionStateException {

@Override
public final void close() throws SailException {
Thread deadlockPreventionThread = startDeadlockPreventionThread();

// obtain an exclusive lock so that any further operations on this
// connection (including those from any concurrent threads) are blocked.
if (!IS_OPEN.compareAndSet(this, true, false)) {
Expand All @@ -249,7 +246,12 @@ public final void close() throws SailException {
if (sumDone == sumBlocking) {
break;
} else {
Thread.onSpinWait();
if (Thread.currentThread().isInterrupted()) {
throw new SailException(
"Connection was interrupted while waiting on active operations before it could be closed.");
} else {
LockSupport.parkNanos(Duration.ofMillis(10).toNanos());
}
}
}

Expand Down Expand Up @@ -278,10 +280,58 @@ public final void close() throws SailException {
sailBase.connectionClosed(this);
}
} finally {
if (deadlockPreventionThread != null) {
deadlockPreventionThread.interrupt();
}
}

}

/**
* If the current thread is not the owner, starts a thread to handle potential deadlocks by interrupting the owner.
*
* @return The started deadlock prevention thread or null if the current thread is the owner.
*/
private Thread startDeadlockPreventionThread() {
Thread deadlockPreventionThread = null;

if (Thread.currentThread() != owner) {

if (logger.isInfoEnabled()) {
// use info level for this because FedX prevalently closes connections from different threads
logger.info(
"Closing connection from a different thread than the one that opened it. Connections should not be shared between threads. Opened by {} closed by {}",
owner, Thread.currentThread(), new Throwable("Throwable used for stacktrace"));
}

deadlockPreventionThread = new Thread(() -> {
try {
// This thread should sleep for a while so that the callee has a chance to finish.
// The callee will interrupt this thread when it is finished, which means that there were no
// deadlocks and we can exit.
Thread.sleep(sailBase.connectionTimeOut / 2);

owner.interrupt();
// wait for up to 1 second for the owner thread to die
owner.join(1000);
if (owner.isAlive()) {
logger.error("Interrupted thread {} but thread is still alive after 1000 ms!", owner);
}

} catch (InterruptedException ignored) {
// this thread is interrupted as a signal that there were no deadlocks, so the exception can be
// ignored and we can simply exit
}

});

deadlockPreventionThread.setDaemon(true);
deadlockPreventionThread.start();

}
return deadlockPreventionThread;
}

@Override
public final CloseableIteration<? extends BindingSet> evaluate(TupleExpr tupleExpr,
Dataset dataset, BindingSet bindings, boolean includeInferred) throws SailException {
Expand Down Expand Up @@ -764,6 +814,16 @@ protected void setStatementsRemoved() {
statementsRemoved = true;
}

/**
* This is for internal use only. It returns the thread that opened this connection.
*
* @return the thread that opened this connection.
*/
@InternalUseOnly
public Thread getOwner() {
return owner;
}

/**
* Registers an iteration as active by wrapping it in a {@link SailBaseIteration} object and adding it to the list
* of active iterations.
Expand Down Expand Up @@ -847,40 +907,50 @@ protected AbstractSail getSailBase() {
}

private void forceCloseActiveOperations() throws SailException {
for (int i = 0; i < 10 && isActiveOperation() && !debugEnabled; i++) {
System.gc();
try {
Thread.sleep(1);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
Thread deadlockPreventionThread = startDeadlockPreventionThread();
try {
for (int i = 0; i < 10 && isActiveOperation() && !debugEnabled; i++) {
System.gc();
try {
Thread.sleep(1);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
throw new SailException(e);
}
}
}

if (debugEnabled) {

var activeIterationsCopy = new IdentityHashMap<>(activeIterationsDebug);
activeIterationsDebug.clear();

if (!activeIterationsCopy.isEmpty()) {
for (var entry : activeIterationsCopy.entrySet()) {
try {
logger.warn("Unclosed iteration", entry.getValue());
entry.getKey().close();
} catch (Exception e) {
if (e instanceof InterruptedException) {
Thread.currentThread().interrupt();
if (debugEnabled) {

var activeIterationsCopy = new IdentityHashMap<>(activeIterationsDebug);
activeIterationsDebug.clear();

if (!activeIterationsCopy.isEmpty()) {
for (var entry : activeIterationsCopy.entrySet()) {
try {
logger.warn("Unclosed iteration", entry.getValue());
entry.getKey().close();
} catch (Exception e) {
if (e instanceof InterruptedException) {
Thread.currentThread().interrupt();
throw new SailException(e);
}
logger.warn("Exception occurred while closing unclosed iterations.", e);
}
logger.warn("Exception occurred while closing unclosed iterations.", e);
}
}

var entry = activeIterationsCopy.entrySet().stream().findAny().orElseThrow();
var entry = activeIterationsCopy.entrySet().stream().findAny().orElseThrow();

throw new SailException(
"Connection closed before all iterations were closed: " + entry.getKey().toString(),
entry.getValue());
throw new SailException(
"Connection closed before all iterations were closed: " + entry.getKey().toString(),
entry.getValue());
}
}
} finally {
if (deadlockPreventionThread != null) {
deadlockPreventionThread.interrupt();
}
}

}

/**
Expand Down Expand Up @@ -1025,4 +1095,14 @@ public synchronized void release() {
}
}
}

static {
try {
IS_OPEN = MethodHandles.lookup()
.in(AbstractSailConnection.class)
.findVarHandle(AbstractSailConnection.class, "isOpen", boolean.class);
} catch (ReflectiveOperationException e) {
throw new Error(e);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -150,4 +150,5 @@ public Supplier<CollectionFactory> getCollectionFactory() {
verifyBaseSailSet();
return baseSail.getCollectionFactory();
}

}
Loading

0 comments on commit c0abaab

Please sign in to comment.