From a56603326c3cebb0ff81d446336f1168f4cd5706 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Ha=CC=8Avard=20Ottestad?= Date: Sat, 30 Sep 2023 12:47:37 +0200 Subject: [PATCH] fix bug in fedx --- .../common/iteration/ConvertingIteration.java | 6 +- .../common/iteration/IterationWrapper.java | 6 +- .../impl/DefaultEvaluationStrategy.java | 86 ++++++------ .../iterator/DescribeIteration.java | 17 +++ .../sail/SailRepositoryConnection.java | 16 ++- .../rdf4j/sail/helpers/AbstractSail.java | 14 +- .../sail/helpers/AbstractSailConnection.java | 130 +++++++++--------- .../evaluation/FederationEvalStrategy.java | 17 ++- ...opRemainingExecutionsOnCloseIteration.java | 13 +- .../eclipse/rdf4j/federated/FedXBaseTest.java | 8 +- 10 files changed, 178 insertions(+), 135 deletions(-) diff --git a/core/common/iterator/src/main/java/org/eclipse/rdf4j/common/iteration/ConvertingIteration.java b/core/common/iterator/src/main/java/org/eclipse/rdf4j/common/iteration/ConvertingIteration.java index eace1cd3e35..1d3a691bd5f 100644 --- a/core/common/iterator/src/main/java/org/eclipse/rdf4j/common/iteration/ConvertingIteration.java +++ b/core/common/iterator/src/main/java/org/eclipse/rdf4j/common/iteration/ConvertingIteration.java @@ -106,10 +106,6 @@ public void remove() throws X { */ @Override protected void handleClose() throws X { - try { - super.handleClose(); - } finally { - Iterations.closeCloseable(iter); - } + Iterations.closeCloseable(iter); } } diff --git a/core/common/iterator/src/main/java/org/eclipse/rdf4j/common/iteration/IterationWrapper.java b/core/common/iterator/src/main/java/org/eclipse/rdf4j/common/iteration/IterationWrapper.java index 4c3243e0833..b4f3fe7fd56 100644 --- a/core/common/iterator/src/main/java/org/eclipse/rdf4j/common/iteration/IterationWrapper.java +++ b/core/common/iterator/src/main/java/org/eclipse/rdf4j/common/iteration/IterationWrapper.java @@ -121,10 +121,6 @@ public void remove() throws X { */ @Override protected void handleClose() throws X { - try { - super.handleClose(); - } finally { - Iterations.closeCloseable(wrappedIter); - } + Iterations.closeCloseable(wrappedIter); } } diff --git a/core/queryalgebra/evaluation/src/main/java/org/eclipse/rdf4j/query/algebra/evaluation/impl/DefaultEvaluationStrategy.java b/core/queryalgebra/evaluation/src/main/java/org/eclipse/rdf4j/query/algebra/evaluation/impl/DefaultEvaluationStrategy.java index 08ac9611680..91326abd064 100644 --- a/core/queryalgebra/evaluation/src/main/java/org/eclipse/rdf4j/query/algebra/evaluation/impl/DefaultEvaluationStrategy.java +++ b/core/queryalgebra/evaluation/src/main/java/org/eclipse/rdf4j/query/algebra/evaluation/impl/DefaultEvaluationStrategy.java @@ -352,50 +352,58 @@ public TupleExpr optimize(TupleExpr expr, EvaluationStatistics evaluationStatist public CloseableIteration evaluate(TupleExpr expr, BindingSet bindings) throws QueryEvaluationException { - CloseableIteration ret; + CloseableIteration ret = null; - if (expr instanceof StatementPattern) { - ret = evaluate((StatementPattern) expr, bindings); - } else if (expr instanceof UnaryTupleOperator) { - ret = evaluate((UnaryTupleOperator) expr, bindings); - } else if (expr instanceof BinaryTupleOperator) { - ret = evaluate((BinaryTupleOperator) expr, bindings); - } else if (expr instanceof SingletonSet) { - ret = evaluate((SingletonSet) expr, bindings); - } else if (expr instanceof EmptySet) { - ret = evaluate((EmptySet) expr, bindings); - } else if (expr instanceof ZeroLengthPath) { - ret = evaluate((ZeroLengthPath) expr, bindings); - } else if (expr instanceof ArbitraryLengthPath) { - ret = evaluate((ArbitraryLengthPath) expr, bindings); - } else if (expr instanceof BindingSetAssignment) { - ret = evaluate((BindingSetAssignment) expr, bindings); - } else if (expr instanceof TripleRef) { - ret = evaluate((TripleRef) expr, bindings); - } else if (expr instanceof TupleFunctionCall) { - if (getQueryEvaluationMode().compareTo(QueryEvaluationMode.STANDARD) < 0) { - throw new QueryEvaluationException( - "Tuple function call not supported in query evaluation mode " + getQueryEvaluationMode()); + try { + if (expr instanceof StatementPattern) { + ret = evaluate((StatementPattern) expr, bindings); + } else if (expr instanceof UnaryTupleOperator) { + ret = evaluate((UnaryTupleOperator) expr, bindings); + } else if (expr instanceof BinaryTupleOperator) { + ret = evaluate((BinaryTupleOperator) expr, bindings); + } else if (expr instanceof SingletonSet) { + ret = evaluate((SingletonSet) expr, bindings); + } else if (expr instanceof EmptySet) { + ret = evaluate((EmptySet) expr, bindings); + } else if (expr instanceof ZeroLengthPath) { + ret = evaluate((ZeroLengthPath) expr, bindings); + } else if (expr instanceof ArbitraryLengthPath) { + ret = evaluate((ArbitraryLengthPath) expr, bindings); + } else if (expr instanceof BindingSetAssignment) { + ret = evaluate((BindingSetAssignment) expr, bindings); + } else if (expr instanceof TripleRef) { + ret = evaluate((TripleRef) expr, bindings); + } else if (expr instanceof TupleFunctionCall) { + if (getQueryEvaluationMode().compareTo(QueryEvaluationMode.STANDARD) < 0) { + throw new QueryEvaluationException( + "Tuple function call not supported in query evaluation mode " + getQueryEvaluationMode()); + } + return evaluate(expr, bindings); + } else if (expr == null) { + throw new IllegalArgumentException("expr must not be null"); + } else { + throw new QueryEvaluationException("Unsupported tuple expr type: " + expr.getClass()); } - return evaluate((TupleFunctionCall) expr, bindings); - } else if (expr == null) { - throw new IllegalArgumentException("expr must not be null"); - } else { - throw new QueryEvaluationException("Unsupported tuple expr type: " + expr.getClass()); - } - if (trackTime) { - // set resultsSizeActual to at least be 0 so we can track iterations that don't procude anything - expr.setTotalTimeNanosActual(Math.max(0, expr.getTotalTimeNanosActual())); - ret = new TimedIterator(ret, expr); - } + if (trackTime) { + // set resultsSizeActual to at least be 0 so we can track iterations that don't procude anything + expr.setTotalTimeNanosActual(Math.max(0, expr.getTotalTimeNanosActual())); + ret = new TimedIterator(ret, expr); + } - if (trackResultSize) { - // set resultsSizeActual to at least be 0 so we can track iterations that don't procude anything - expr.setResultSizeActual(Math.max(0, expr.getResultSizeActual())); - ret = new ResultSizeCountingIterator(ret, expr); + if (trackResultSize) { + // set resultsSizeActual to at least be 0 so we can track iterations that don't procude anything + expr.setResultSizeActual(Math.max(0, expr.getResultSizeActual())); + ret = new ResultSizeCountingIterator(ret, expr); + } + return ret; + + } catch (Throwable t) { + if (ret != null) { + ret.close(); + } + throw t; } - return ret; } @Override diff --git a/core/queryalgebra/evaluation/src/main/java/org/eclipse/rdf4j/query/algebra/evaluation/iterator/DescribeIteration.java b/core/queryalgebra/evaluation/src/main/java/org/eclipse/rdf4j/query/algebra/evaluation/iterator/DescribeIteration.java index 8f8de47721e..3ee75a19595 100644 --- a/core/queryalgebra/evaluation/src/main/java/org/eclipse/rdf4j/query/algebra/evaluation/iterator/DescribeIteration.java +++ b/core/queryalgebra/evaluation/src/main/java/org/eclipse/rdf4j/query/algebra/evaluation/iterator/DescribeIteration.java @@ -220,4 +220,21 @@ protected CloseableIteration createNextIte return strategy.evaluate(pattern, parentBindings); } + @Override + protected void handleClose() throws QueryEvaluationException { + try { + super.handleClose(); + + } finally { + try { + if (currentDescribeExprIter != null) + currentDescribeExprIter.close(); + } finally { + + } + if (sourceIter instanceof CloseableIteration) { + ((CloseableIteration) sourceIter).close(); + } + } + } } diff --git a/core/repository/sail/src/main/java/org/eclipse/rdf4j/repository/sail/SailRepositoryConnection.java b/core/repository/sail/src/main/java/org/eclipse/rdf4j/repository/sail/SailRepositoryConnection.java index 019bd105a0e..b53930ff72a 100644 --- a/core/repository/sail/src/main/java/org/eclipse/rdf4j/repository/sail/SailRepositoryConnection.java +++ b/core/repository/sail/src/main/java/org/eclipse/rdf4j/repository/sail/SailRepositoryConnection.java @@ -349,11 +349,19 @@ public RepositoryResult getStatements(Resource subj, IRI pred, Value Resource... contexts) throws RepositoryException { Objects.requireNonNull(contexts, "contexts argument may not be null; either the value should be cast to Resource or an empty array should be supplied"); - + CloseableIteration statements = null; try { - return createRepositoryResult(sailConnection.getStatements(subj, pred, obj, includeInferred, contexts)); - } catch (SailException e) { - throw new RepositoryException("Unable to get statements from Sail", e); + statements = sailConnection.getStatements(subj, pred, obj, includeInferred, contexts); + return createRepositoryResult(statements); + } catch (Throwable t) { + if (statements != null) { + statements.close(); + } + if (t instanceof SailException) { + throw new RepositoryException("Unable to get statements from Sail", t); + } else { + throw t; + } } } diff --git a/core/sail/api/src/main/java/org/eclipse/rdf4j/sail/helpers/AbstractSail.java b/core/sail/api/src/main/java/org/eclipse/rdf4j/sail/helpers/AbstractSail.java index 108974263e0..a9b973efc5f 100644 --- a/core/sail/api/src/main/java/org/eclipse/rdf4j/sail/helpers/AbstractSail.java +++ b/core/sail/api/src/main/java/org/eclipse/rdf4j/sail/helpers/AbstractSail.java @@ -229,12 +229,14 @@ public void shutDown() throws SailException { if (con instanceof AbstractSailConnection) { AbstractSailConnection sailCon = (AbstractSailConnection) con; - sailCon.getOwner().interrupt(); - sailCon.getOwner().join(1000); - if (sailCon.getOwner().isAlive()) { - logger.error( - "Closing active connection due to shut down and interrupted the owning thread of the connection {} but thread is still alive after 1000 ms!", - sailCon.getOwner()); + if (sailCon.getOwner() != Thread.currentThread()) { + sailCon.getOwner().interrupt(); + sailCon.getOwner().join(1000); + if (sailCon.getOwner().isAlive()) { + logger.error( + "Closing active connection due to shut down and interrupted the owning thread of the connection {} but thread is still alive after 1000 ms!", + sailCon.getOwner()); + } } } diff --git a/core/sail/api/src/main/java/org/eclipse/rdf4j/sail/helpers/AbstractSailConnection.java b/core/sail/api/src/main/java/org/eclipse/rdf4j/sail/helpers/AbstractSailConnection.java index 3c3e2610e52..b312c438178 100644 --- a/core/sail/api/src/main/java/org/eclipse/rdf4j/sail/helpers/AbstractSailConnection.java +++ b/core/sail/api/src/main/java/org/eclipse/rdf4j/sail/helpers/AbstractSailConnection.java @@ -247,32 +247,7 @@ public boolean isActive() throws UnknownSailTransactionStateException { @Override public final void close() throws SailException { - Thread deadlockPreventionThread = null; - - if (Thread.currentThread() != owner) { - logger.info( - "Closing connection from a different thread than the one that opened it. Connections should not be shared between threads. Opened by " - + owner + " closed by " + Thread.currentThread(), - new Throwable("Throwable used for stacktrace")); - - deadlockPreventionThread = new Thread(() -> { - try { - Thread.sleep(sailBase.connectionTimeOut); - owner.interrupt(); - - owner.join(1000); - if (owner.isAlive()) { - logger.error("Interrupted thread {} but thread is still alive after 1000 ms!", owner); - } - - } catch (InterruptedException ignored) { - } - - }); - deadlockPreventionThread.setDaemon(true); - deadlockPreventionThread.start(); - - } + Thread deadlockPreventionThread = startDeadlockPreventionThread(); // obtain an exclusive lock so that any further operations on this // connection (including those from any concurrent threads) are blocked. @@ -333,6 +308,36 @@ public final void close() throws SailException { } + private Thread startDeadlockPreventionThread() { + Thread deadlockPreventionThread = null; + + if (Thread.currentThread() != owner) { + logger.info( + "Closing connection from a different thread than the one that opened it. Connections should not be shared between threads. Opened by " + + owner + " closed by " + Thread.currentThread(), + new Throwable("Throwable used for stacktrace")); + + deadlockPreventionThread = new Thread(() -> { + try { + Thread.sleep(sailBase.connectionTimeOut); + owner.interrupt(); + + owner.join(1000); + if (owner.isAlive()) { + logger.error("Interrupted thread {} but thread is still alive after 1000 ms!", owner); + } + + } catch (InterruptedException ignored) { + } + + }); + deadlockPreventionThread.setDaemon(true); + deadlockPreventionThread.start(); + + } + return deadlockPreventionThread; + } + @Override public final CloseableIteration evaluate(TupleExpr tupleExpr, Dataset dataset, BindingSet bindings, boolean includeInferred) throws SailException { @@ -998,50 +1003,51 @@ protected AbstractSail getSailBase() { } private void forceCloseActiveOperations() throws SailException { - if (owner != Thread.currentThread()) { - owner.interrupt(); - try { - owner.join(1000); - } catch (InterruptedException e) { - throw new SailException(e); - } - } - - for (int i = 0; i < 10 && isActiveOperation() && !debugEnabled; i++) { - System.gc(); - try { - Thread.sleep(1); - } catch (InterruptedException e) { - Thread.currentThread().interrupt(); + Thread deadlockPreventionThread = startDeadlockPreventionThread(); + try { + for (int i = 0; i < 10 && isActiveOperation() && !debugEnabled; i++) { + System.gc(); + try { + Thread.sleep(1); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + throw new SailException(e); + } } - } - - if (debugEnabled) { - var activeIterationsCopy = new IdentityHashMap<>(activeIterationsDebug); - activeIterationsDebug.clear(); - - if (!activeIterationsCopy.isEmpty()) { - for (var entry : activeIterationsCopy.entrySet()) { - try { - logger.warn("Unclosed iteration", entry.getValue()); - entry.getKey().close(); - } catch (Exception e) { - if (e instanceof InterruptedException) { - Thread.currentThread().interrupt(); + if (debugEnabled) { + + var activeIterationsCopy = new IdentityHashMap<>(activeIterationsDebug); + activeIterationsDebug.clear(); + + if (!activeIterationsCopy.isEmpty()) { + for (var entry : activeIterationsCopy.entrySet()) { + try { + logger.warn("Unclosed iteration", entry.getValue()); + entry.getKey().close(); + } catch (Exception e) { + if (e instanceof InterruptedException) { + Thread.currentThread().interrupt(); + throw new SailException(e); + } + logger.warn("Exception occurred while closing unclosed iterations.", e); } - logger.warn("Exception occurred while closing unclosed iterations.", e); } - } - var entry = activeIterationsCopy.entrySet().stream().findAny().orElseThrow(); + var entry = activeIterationsCopy.entrySet().stream().findAny().orElseThrow(); - throw new SailException( - "Connection closed before all iterations were closed: " + entry.getKey().toString(), - entry.getValue()); - } + throw new SailException( + "Connection closed before all iterations were closed: " + entry.getKey().toString(), + entry.getValue()); + } + } + } finally { + if (deadlockPreventionThread != null) { + deadlockPreventionThread.interrupt(); + } } + } /** diff --git a/tools/federation/src/main/java/org/eclipse/rdf4j/federated/evaluation/FederationEvalStrategy.java b/tools/federation/src/main/java/org/eclipse/rdf4j/federated/evaluation/FederationEvalStrategy.java index 5261ed3c668..7617941bc38 100644 --- a/tools/federation/src/main/java/org/eclipse/rdf4j/federated/evaluation/FederationEvalStrategy.java +++ b/tools/federation/src/main/java/org/eclipse/rdf4j/federated/evaluation/FederationEvalStrategy.java @@ -872,10 +872,19 @@ public CloseableIteration evaluate(Describ throw new FedXRuntimeException( "Expected a FedXDescribeOperator Node. Found " + operator.getClass() + " instead."); } - CloseableIteration iter = evaluate(operator.getArg(), bindings); - // Note: we need to evaluate the DESCRIBE over the entire federation - return new FederatedDescribeIteration(iter, this, operator.getBindingNames(), bindings, - ((FederatedDescribeOperator) operator).getQueryInfo()); + CloseableIteration iter = null; + try { + iter = evaluate(operator.getArg(), bindings); + // Note: we need to evaluate the DESCRIBE over the entire federation + return new FederatedDescribeIteration(iter, this, operator.getBindingNames(), bindings, + ((FederatedDescribeOperator) operator).getQueryInfo()); + } catch (Throwable t) { + if (iter != null) { + iter.close(); + } + throw t; + } + } protected CloseableIteration evaluateAtStatementSources(Object preparedQuery, diff --git a/tools/federation/src/main/java/org/eclipse/rdf4j/federated/evaluation/iterator/StopRemainingExecutionsOnCloseIteration.java b/tools/federation/src/main/java/org/eclipse/rdf4j/federated/evaluation/iterator/StopRemainingExecutionsOnCloseIteration.java index bb7a092f93d..bfaf9599ab5 100644 --- a/tools/federation/src/main/java/org/eclipse/rdf4j/federated/evaluation/iterator/StopRemainingExecutionsOnCloseIteration.java +++ b/tools/federation/src/main/java/org/eclipse/rdf4j/federated/evaluation/iterator/StopRemainingExecutionsOnCloseIteration.java @@ -60,16 +60,11 @@ public void remove() throws QueryEvaluationException { @Override protected void handleClose() throws QueryEvaluationException { try { - super.handleClose(); + inner.close(); } finally { - try { - inner.close(); - } finally { - // make sure to close all scheduled / running parallel executions - // (e.g. if the query result is not fully consumed) - queryInfo.close(); - } - + // make sure to close all scheduled / running parallel executions + // (e.g. if the query result is not fully consumed) + queryInfo.close(); } } diff --git a/tools/federation/src/test/java/org/eclipse/rdf4j/federated/FedXBaseTest.java b/tools/federation/src/test/java/org/eclipse/rdf4j/federated/FedXBaseTest.java index 531647dcac4..27263a14e6d 100644 --- a/tools/federation/src/test/java/org/eclipse/rdf4j/federated/FedXBaseTest.java +++ b/tools/federation/src/test/java/org/eclipse/rdf4j/federated/FedXBaseTest.java @@ -109,7 +109,13 @@ protected void execute(String queryFile, String expectedResultFile, boolean chec // Some query results will automatically close themselves when they are exhausted. To properly test that // query results are closed correctly we need to evaluate the query without retrieving any elements. if (doubleCheckClose) { - ((GraphQuery) query).evaluate().close(); + try { + GraphQueryResult evaluate = ((GraphQuery) query).evaluate(); + evaluate.close(); + } catch (Throwable t) { + throw t; + } + } try (GraphQueryResult gqr = ((GraphQuery) query).evaluate()) {