Skip to content

Commit

Permalink
fix bug in fedx
Browse files Browse the repository at this point in the history
  • Loading branch information
hmottestad committed Sep 30, 2023
1 parent 7b72a75 commit a566033
Show file tree
Hide file tree
Showing 10 changed files with 178 additions and 135 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -106,10 +106,6 @@ public void remove() throws X {
*/
@Override
protected void handleClose() throws X {
try {
super.handleClose();
} finally {
Iterations.closeCloseable(iter);
}
Iterations.closeCloseable(iter);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -121,10 +121,6 @@ public void remove() throws X {
*/
@Override
protected void handleClose() throws X {
try {
super.handleClose();
} finally {
Iterations.closeCloseable(wrappedIter);
}
Iterations.closeCloseable(wrappedIter);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -352,50 +352,58 @@ public TupleExpr optimize(TupleExpr expr, EvaluationStatistics evaluationStatist
public CloseableIteration<BindingSet, QueryEvaluationException> evaluate(TupleExpr expr, BindingSet bindings)
throws QueryEvaluationException {

CloseableIteration<BindingSet, QueryEvaluationException> ret;
CloseableIteration<BindingSet, QueryEvaluationException> ret = null;

if (expr instanceof StatementPattern) {
ret = evaluate((StatementPattern) expr, bindings);
} else if (expr instanceof UnaryTupleOperator) {
ret = evaluate((UnaryTupleOperator) expr, bindings);
} else if (expr instanceof BinaryTupleOperator) {
ret = evaluate((BinaryTupleOperator) expr, bindings);
} else if (expr instanceof SingletonSet) {
ret = evaluate((SingletonSet) expr, bindings);
} else if (expr instanceof EmptySet) {
ret = evaluate((EmptySet) expr, bindings);
} else if (expr instanceof ZeroLengthPath) {
ret = evaluate((ZeroLengthPath) expr, bindings);
} else if (expr instanceof ArbitraryLengthPath) {
ret = evaluate((ArbitraryLengthPath) expr, bindings);
} else if (expr instanceof BindingSetAssignment) {
ret = evaluate((BindingSetAssignment) expr, bindings);
} else if (expr instanceof TripleRef) {
ret = evaluate((TripleRef) expr, bindings);
} else if (expr instanceof TupleFunctionCall) {
if (getQueryEvaluationMode().compareTo(QueryEvaluationMode.STANDARD) < 0) {
throw new QueryEvaluationException(
"Tuple function call not supported in query evaluation mode " + getQueryEvaluationMode());
try {
if (expr instanceof StatementPattern) {
ret = evaluate((StatementPattern) expr, bindings);
} else if (expr instanceof UnaryTupleOperator) {
ret = evaluate((UnaryTupleOperator) expr, bindings);
} else if (expr instanceof BinaryTupleOperator) {
ret = evaluate((BinaryTupleOperator) expr, bindings);
} else if (expr instanceof SingletonSet) {
ret = evaluate((SingletonSet) expr, bindings);
} else if (expr instanceof EmptySet) {
ret = evaluate((EmptySet) expr, bindings);
} else if (expr instanceof ZeroLengthPath) {
ret = evaluate((ZeroLengthPath) expr, bindings);
} else if (expr instanceof ArbitraryLengthPath) {
ret = evaluate((ArbitraryLengthPath) expr, bindings);
} else if (expr instanceof BindingSetAssignment) {
ret = evaluate((BindingSetAssignment) expr, bindings);
} else if (expr instanceof TripleRef) {
ret = evaluate((TripleRef) expr, bindings);
} else if (expr instanceof TupleFunctionCall) {
if (getQueryEvaluationMode().compareTo(QueryEvaluationMode.STANDARD) < 0) {
throw new QueryEvaluationException(
"Tuple function call not supported in query evaluation mode " + getQueryEvaluationMode());
}
return evaluate(expr, bindings);
} else if (expr == null) {
throw new IllegalArgumentException("expr must not be null");
} else {
throw new QueryEvaluationException("Unsupported tuple expr type: " + expr.getClass());
}
return evaluate((TupleFunctionCall) expr, bindings);
} else if (expr == null) {
throw new IllegalArgumentException("expr must not be null");
} else {
throw new QueryEvaluationException("Unsupported tuple expr type: " + expr.getClass());
}

if (trackTime) {
// set resultsSizeActual to at least be 0 so we can track iterations that don't procude anything
expr.setTotalTimeNanosActual(Math.max(0, expr.getTotalTimeNanosActual()));
ret = new TimedIterator(ret, expr);
}
if (trackTime) {
// set resultsSizeActual to at least be 0 so we can track iterations that don't procude anything
expr.setTotalTimeNanosActual(Math.max(0, expr.getTotalTimeNanosActual()));
ret = new TimedIterator(ret, expr);
}

if (trackResultSize) {
// set resultsSizeActual to at least be 0 so we can track iterations that don't procude anything
expr.setResultSizeActual(Math.max(0, expr.getResultSizeActual()));
ret = new ResultSizeCountingIterator(ret, expr);
if (trackResultSize) {
// set resultsSizeActual to at least be 0 so we can track iterations that don't procude anything
expr.setResultSizeActual(Math.max(0, expr.getResultSizeActual()));
ret = new ResultSizeCountingIterator(ret, expr);
}
return ret;

} catch (Throwable t) {
if (ret != null) {
ret.close();
}
throw t;
}
return ret;
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -220,4 +220,21 @@ protected CloseableIteration<BindingSet, QueryEvaluationException> createNextIte
return strategy.evaluate(pattern, parentBindings);
}

@Override
protected void handleClose() throws QueryEvaluationException {
try {
super.handleClose();

} finally {
try {
if (currentDescribeExprIter != null)
currentDescribeExprIter.close();
} finally {

}
if (sourceIter instanceof CloseableIteration) {
((CloseableIteration<?, QueryEvaluationException>) sourceIter).close();
}
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -349,11 +349,19 @@ public RepositoryResult<Statement> getStatements(Resource subj, IRI pred, Value
Resource... contexts) throws RepositoryException {
Objects.requireNonNull(contexts,
"contexts argument may not be null; either the value should be cast to Resource or an empty array should be supplied");

CloseableIteration<? extends Statement, SailException> statements = null;
try {
return createRepositoryResult(sailConnection.getStatements(subj, pred, obj, includeInferred, contexts));
} catch (SailException e) {
throw new RepositoryException("Unable to get statements from Sail", e);
statements = sailConnection.getStatements(subj, pred, obj, includeInferred, contexts);
return createRepositoryResult(statements);
} catch (Throwable t) {
if (statements != null) {
statements.close();
}
if (t instanceof SailException) {
throw new RepositoryException("Unable to get statements from Sail", t);
} else {
throw t;
}
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -229,12 +229,14 @@ public void shutDown() throws SailException {

if (con instanceof AbstractSailConnection) {
AbstractSailConnection sailCon = (AbstractSailConnection) con;
sailCon.getOwner().interrupt();
sailCon.getOwner().join(1000);
if (sailCon.getOwner().isAlive()) {
logger.error(
"Closing active connection due to shut down and interrupted the owning thread of the connection {} but thread is still alive after 1000 ms!",
sailCon.getOwner());
if (sailCon.getOwner() != Thread.currentThread()) {
sailCon.getOwner().interrupt();
sailCon.getOwner().join(1000);
if (sailCon.getOwner().isAlive()) {
logger.error(
"Closing active connection due to shut down and interrupted the owning thread of the connection {} but thread is still alive after 1000 ms!",
sailCon.getOwner());
}
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -247,32 +247,7 @@ public boolean isActive() throws UnknownSailTransactionStateException {

@Override
public final void close() throws SailException {
Thread deadlockPreventionThread = null;

if (Thread.currentThread() != owner) {
logger.info(
"Closing connection from a different thread than the one that opened it. Connections should not be shared between threads. Opened by "
+ owner + " closed by " + Thread.currentThread(),
new Throwable("Throwable used for stacktrace"));

deadlockPreventionThread = new Thread(() -> {
try {
Thread.sleep(sailBase.connectionTimeOut);
owner.interrupt();

owner.join(1000);
if (owner.isAlive()) {
logger.error("Interrupted thread {} but thread is still alive after 1000 ms!", owner);
}

} catch (InterruptedException ignored) {
}

});
deadlockPreventionThread.setDaemon(true);
deadlockPreventionThread.start();

}
Thread deadlockPreventionThread = startDeadlockPreventionThread();

// obtain an exclusive lock so that any further operations on this
// connection (including those from any concurrent threads) are blocked.
Expand Down Expand Up @@ -333,6 +308,36 @@ public final void close() throws SailException {

}

private Thread startDeadlockPreventionThread() {
Thread deadlockPreventionThread = null;

if (Thread.currentThread() != owner) {
logger.info(
"Closing connection from a different thread than the one that opened it. Connections should not be shared between threads. Opened by "
+ owner + " closed by " + Thread.currentThread(),
new Throwable("Throwable used for stacktrace"));

deadlockPreventionThread = new Thread(() -> {
try {
Thread.sleep(sailBase.connectionTimeOut);
owner.interrupt();

owner.join(1000);
if (owner.isAlive()) {
logger.error("Interrupted thread {} but thread is still alive after 1000 ms!", owner);
}

} catch (InterruptedException ignored) {
}

});
deadlockPreventionThread.setDaemon(true);
deadlockPreventionThread.start();

}
return deadlockPreventionThread;
}

@Override
public final CloseableIteration<? extends BindingSet, QueryEvaluationException> evaluate(TupleExpr tupleExpr,
Dataset dataset, BindingSet bindings, boolean includeInferred) throws SailException {
Expand Down Expand Up @@ -998,50 +1003,51 @@ protected AbstractSail getSailBase() {
}

private void forceCloseActiveOperations() throws SailException {
if (owner != Thread.currentThread()) {
owner.interrupt();
try {
owner.join(1000);
} catch (InterruptedException e) {
throw new SailException(e);
}
}

for (int i = 0; i < 10 && isActiveOperation() && !debugEnabled; i++) {
System.gc();
try {
Thread.sleep(1);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
Thread deadlockPreventionThread = startDeadlockPreventionThread();
try {
for (int i = 0; i < 10 && isActiveOperation() && !debugEnabled; i++) {
System.gc();
try {
Thread.sleep(1);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
throw new SailException(e);
}
}
}

if (debugEnabled) {

var activeIterationsCopy = new IdentityHashMap<>(activeIterationsDebug);
activeIterationsDebug.clear();

if (!activeIterationsCopy.isEmpty()) {
for (var entry : activeIterationsCopy.entrySet()) {
try {
logger.warn("Unclosed iteration", entry.getValue());
entry.getKey().close();
} catch (Exception e) {
if (e instanceof InterruptedException) {
Thread.currentThread().interrupt();
if (debugEnabled) {

var activeIterationsCopy = new IdentityHashMap<>(activeIterationsDebug);
activeIterationsDebug.clear();

if (!activeIterationsCopy.isEmpty()) {
for (var entry : activeIterationsCopy.entrySet()) {
try {
logger.warn("Unclosed iteration", entry.getValue());
entry.getKey().close();
} catch (Exception e) {
if (e instanceof InterruptedException) {
Thread.currentThread().interrupt();
throw new SailException(e);
}
logger.warn("Exception occurred while closing unclosed iterations.", e);
}
logger.warn("Exception occurred while closing unclosed iterations.", e);
}
}

var entry = activeIterationsCopy.entrySet().stream().findAny().orElseThrow();
var entry = activeIterationsCopy.entrySet().stream().findAny().orElseThrow();

throw new SailException(
"Connection closed before all iterations were closed: " + entry.getKey().toString(),
entry.getValue());
}
throw new SailException(
"Connection closed before all iterations were closed: " + entry.getKey().toString(),
entry.getValue());
}

}
} finally {
if (deadlockPreventionThread != null) {
deadlockPreventionThread.interrupt();
}
}

}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -872,10 +872,19 @@ public CloseableIteration<BindingSet, QueryEvaluationException> evaluate(Describ
throw new FedXRuntimeException(
"Expected a FedXDescribeOperator Node. Found " + operator.getClass() + " instead.");
}
CloseableIteration<BindingSet, QueryEvaluationException> iter = evaluate(operator.getArg(), bindings);
// Note: we need to evaluate the DESCRIBE over the entire federation
return new FederatedDescribeIteration(iter, this, operator.getBindingNames(), bindings,
((FederatedDescribeOperator) operator).getQueryInfo());
CloseableIteration<BindingSet, QueryEvaluationException> iter = null;
try {
iter = evaluate(operator.getArg(), bindings);
// Note: we need to evaluate the DESCRIBE over the entire federation
return new FederatedDescribeIteration(iter, this, operator.getBindingNames(), bindings,
((FederatedDescribeOperator) operator).getQueryInfo());
} catch (Throwable t) {
if (iter != null) {
iter.close();
}
throw t;
}

}

protected CloseableIteration<BindingSet, QueryEvaluationException> evaluateAtStatementSources(Object preparedQuery,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -60,16 +60,11 @@ public void remove() throws QueryEvaluationException {
@Override
protected void handleClose() throws QueryEvaluationException {
try {
super.handleClose();
inner.close();
} finally {
try {
inner.close();
} finally {
// make sure to close all scheduled / running parallel executions
// (e.g. if the query result is not fully consumed)
queryInfo.close();
}

// make sure to close all scheduled / running parallel executions
// (e.g. if the query result is not fully consumed)
queryInfo.close();
}
}

Expand Down
Loading

0 comments on commit a566033

Please sign in to comment.