@Aklakan (Contributor) commented May 12, 2025

GitHub issue resolved #2701

Pull request description: ARQ plugin + Fuseki plugin to track and abort ongoing SPARQL executions.
I tried to design the changes to Jena's core so that execution tracking can be enabled without requiring any changes to existing code.

Summary of changes.

Interception of SPARQL Requests and Execution Tracking

Changes in jena-arq:

  • Added SparqlDispatcherRegistry + infrastructure which allows interception of SPARQL update and query statements (as objects or strings) against DatasetGraphs. Update-/QueryExecDataset now first delegates to the dispatcher chain.
  • The usual Update-/QueryEngineFactory handling is now the last handler in this dispatcher chain.
  • Execution tracking is the first element in the chain, initialized with the plugin InitExecTracking. If there is a TaskListener in the context, the Update-/QueryExec instances are wrapped with a tracking wrapper so that the listener is notified.
  • The default TaskListener implementation is TaskEventBroker which supports de-/registering of further listeners.
  • For keeping track of running tasks and a history of the last n executions, there is TaskEventHistory which is a subclass of TaskEventBroker.
  • Added a parseCheck dataset context attribute. If false, then update-/query requests are forwarded via the dispatcher without parsing.
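
The bullet points above describe a chain-of-responsibility setup. The following stand-alone sketch illustrates the dispatch order - tracking first, the usual engine handling last. All names here are illustrative stand-ins, not the actual classes added by this PR:

```java
import java.util.ArrayDeque;
import java.util.Deque;

// Stand-alone sketch of the dispatcher chain. "Dispatcher", "Chain" and the
// String-based "executions" are illustrative stand-ins for the PR's
// dispatcher machinery and QueryExec, not its actual API.
public class DispatcherChainSketch {
    /** A handler either produces an execution itself or delegates to the rest of the chain. */
    interface Dispatcher {
        String dispatch(String query, Chain rest);
    }

    /** The remainder of the chain after the current handler. */
    interface Chain {
        String dispatch(String query);
    }

    /** Walks the handlers in registration order. */
    static String dispatch(Deque<Dispatcher> chain, String query) {
        Dispatcher head = chain.poll();
        if (head == null) {
            throw new IllegalStateException("no handler accepted the request");
        }
        return head.dispatch(query, q -> dispatch(chain, q));
    }

    public static String run(String query) {
        Deque<Dispatcher> chain = new ArrayDeque<>();
        // Execution tracking is the first element: it wraps whatever the rest produces.
        chain.add((q, rest) -> "tracked(" + rest.dispatch(q) + ")");
        // The usual engine-factory handling is the last handler in the chain.
        chain.add((q, rest) -> "exec(" + q + ")");
        return dispatch(chain, query);
    }

    public static void main(String[] args) {
        System.out.println(run("SELECT * { ?s ?p ?o }"));
        // prints: tracked(exec(SELECT * { ?s ?p ?o }))
    }
}
```

Because the tracking handler wraps the result of the rest of the chain, the caller receives a tracked execution without any change to the code that builds it.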

RDFLink-based Execution Tracking

This adds infrastructure to jena-rdfconnection in order to track executions against RDFLinks via the newly introduced class DatasetGraphOverRDFLink.

  • Replaced the unfinished class GraphSPARQLService with DatasetGraphOverSparql. This is a base class in jena-arq that implements all methods by means of SPARQL requests. It is wired up with the tests in TS_SparqlCore. Caveat: As each update is a separate request and bulk updates may be split into multiple requests, blank nodes may not work as expected.
  • Added DatasetGraphOverRDFLink as a subclass of DatasetGraphOverSparql which provides a newRDFLink() method and implements all DatasetGraph methods based on the RDFLink.
  • Added dispatchers that intercept requests against DatasetGraphOverRDFLink and delegate them to the RDFLink.
  • Added a DatasetAssemblerHTTP that configures a DatasetGraphOverRDFLink instance, allowing this system to be used with Fuseki.
  • Added ExampleDBpediaViaRemoteDataset which demonstrates a query against such a dataset making use of Virtuoso-specific features.
PREFIX fuseki:    <http://jena.apache.org/fuseki#>
PREFIX rdf:       <http://www.w3.org/1999/02/22-rdf-syntax-ns#>
PREFIX ja:        <http://jena.hpl.hp.com/2005/11/Assembler#>
PREFIX remote:    <http://jena.apache.org/2025/http#>

<#service> rdf:type fuseki:Service ;
    fuseki:name "dbpedia-remote" ;
    fuseki:endpoint [
      fuseki:operation fuseki:query ;
    ] ;
    fuseki:endpoint [
      fuseki:operation fuseki:tracker ;
      fuseki:name "tracker" ;
    ] ;
    fuseki:dataset  <#baseDS>
   .

<#baseDS> rdf:type remote:DatasetHTTP ;
    remote:queryEndpoint "http://dbpedia.org/sparql" ;
    .
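
The idea that DatasetGraphOverSparql implements dataset operations by issuing SPARQL requests can be illustrated with a stand-alone sketch. The string-building here is deliberately simplified; the real class works on Node/Quad objects and proper term serialization:

```java
// Stand-alone sketch of the DatasetGraphOverSparql idea: dataset operations
// are expressed as SPARQL query/update strings. This version only shows the
// mapping for concrete, already-formatted terms.
public class SparqlBridgeSketch {
    /** contains(g, s, p, o) maps to an ASK with a GRAPH clause. */
    public static String containsAsAsk(String g, String s, String p, String o) {
        return "ASK { GRAPH " + g + " { " + s + " " + p + " " + o + " } }";
    }

    /** delete(s, p, o) on the default graph maps to a DELETE DATA update. */
    public static String deleteAsUpdate(String s, String p, String o) {
        return "DELETE DATA { " + s + " " + p + " " + o + " }";
    }

    public static void main(String[] args) {
        System.out.println(containsAsAsk("<urn:g>", "<urn:s>", "<urn:p>", "<urn:o>"));
    }
}
```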

Fuseki Mod: Execution Tracker

Added a simple web page that will show a live view of ongoing and completed queries.

  • Fuseki plugin will register a TaskEventHistory in the endpoint context and connect it to the dataset context's TaskEventBroker (broker will be created if absent).
  • Web page will disconnect from the event stream if there is too much activity and prompt for a manual reload - this prevents the browser tab from freezing.
  • Abort action can be disabled via context attribute:
<#service> rdf:type fuseki:Service ;
  fuseki:endpoint [
    fuseki:operation fuseki:tracker ;
    fuseki:name "tracker" ;
    ja:context [ ja:cxtName "allowAbort" ; ja:cxtValue false ] ; # Disable abort buttons in the Web page and the REST API
  ] ;
  fuseki:dataset <#baseDS> .
Jena-Query-Dashboard.webm

Misc changes

  • QueryExecHTTP: Improved support for cancelling HTTP requests. Previously, cancel would hang until the InputStream of the HTTP response could be obtained; now the HTTP request can be cancelled immediately.
  • HttpLib: Updated the methods to work on the async variant in order to make immediate abort in QueryExecHTTP work.
  • Consolidated Task system of jena-geosparql with that of the execution tracking.
  • Endpoint context is now never null so that Fuseki mods can always associate information with endpoints.
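
The cancellation improvement described above can be illustrated with the JDK's own async HTTP API: holding a CompletableFuture allows an abort before the response InputStream ever becomes available. The endpoint URL below is a placeholder and nothing meaningful is fetched, since the request is cancelled immediately:

```java
import java.net.URI;
import java.net.http.HttpClient;
import java.net.http.HttpRequest;
import java.net.http.HttpResponse;
import java.util.concurrent.CompletableFuture;

// Sketch of the async-cancellation idea with the JDK HttpClient: because
// sendAsync() returns immediately with a CompletableFuture, the caller can
// abort without waiting for the response InputStream. The endpoint URL is a
// placeholder; the request is cancelled right away.
public class CancelSketch {
    public static boolean startAndCancel() {
        HttpClient client = HttpClient.newHttpClient();
        HttpRequest request = HttpRequest.newBuilder()
                .uri(URI.create("http://example.org/sparql?query=ASK%7B%7D")) // placeholder
                .GET()
                .build();
        CompletableFuture<HttpResponse<String>> future =
                client.sendAsync(request, HttpResponse.BodyHandlers.ofString());
        future.cancel(true); // abort immediately; no need to wait for a response body
        return future.isDone(); // done right away: cancelled (or, rarely, already failed)
    }

    public static void main(String[] args) {
        System.out.println(startAndCancel());
    }
}
```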

  • Tests are included.
  • Documentation changes and updates are provided for the Apache Jena website
  • Commits have been squashed to remove intermediate development commit messages.
  • Key commit messages start with the issue number (GH-xxxx)

By submitting this pull request, I acknowledge that I am making a contribution to the Apache Software Foundation under the terms and conditions of the Contributor's Agreement.


See the Apache Jena "Contributing" guide.

@Aklakan Aklakan marked this pull request as draft May 12, 2025 14:12
@Aklakan Aklakan force-pushed the 2025-05-11-exectracker branch from 0f8a131 to a5ca301 on May 12, 2025 15:47
@rvesse (Member) commented May 13, 2025

Really interesting piece of work - I once did something much cruder (at least UI-wise) at a previous $dayjob.

Perhaps this could also serve as a base for discussion about further improvements and any necessary core changes for Jena 6. Ideally the ExecTracker mechanism would not require wrapping with DatasetGraphWithExecTracker; instead this would be handled in the core machinery already. In cases where a specific DatasetGraph implementation is expected, the need for a wrapper to track executions may make things complex.

Yes I think this would be much cleaner if the tracking mechanism was integrated directly into the execution machinery without requiring extra wrapping as you do in this PR.

It would be nice if there were programmatic APIs for interacting with tracked queries/updates (there's some pieces towards that here but appears mostly focused on exposing stuff to the UI from my skim-reading of the code) so that applications that embed Jena could access and manage tracked queries/updates as desired.

Fuseki already has the concept of Tasks that's used for things like backups and compactions - would it make sense to integrate query/update tracking into that rather than creating a separate tracking mechanism? That might require generalising the mechanism, or pulling it more into Jena's core rather than the Fuseki machinery, so it might not be worth the effort - wdyt?

@Aklakan (Contributor, Author) commented May 13, 2025

It would be nice if there were programmatic APIs for interacting with tracked queries/updates (there's some pieces towards that here but appears mostly focused on exposing stuff to the UI from my skim-reading of the code) so that applications that embed Jena could access and manage tracked queries/updates as desired.

The ARQ Plugin adds ExecTracker which so far would be the programmatic API. (Still a bit crude but intended as a starting point towards this goal.)
The DatasetGraphWithExecTracker wrapping is needed so that the query/update engines process queries with the tracking added.
By integrating the tracking closer to the core, DatasetGraphWithExecTracker would only be needed for adding tracking to alternative query engines; the main query/update engines could interact with the tracker directly.

An important question is whether tracking executions within the DatasetGraph's context is the way to move forward.

Fuseki already has the concept of Tasks [...] pulling it more into Jena's core [...]

Yes, I still need to look into how much effort it would be to disentangle Fuseki's task tracker from Fuseki itself - but adding such a mechanism to core (and updating Fuseki to use it) would most likely be the way to go.

@Aklakan Aklakan force-pushed the 2025-05-11-exectracker branch 3 times, most recently from 28ccc1b to 603c8e7 on May 14, 2025 19:37
@afs afs changed the title GH2701: Fuseki Mod to list and abort running executions. GH-2701: Fuseki Mod to list and abort running executions. Jun 23, 2025
@Aklakan Aklakan force-pushed the 2025-05-11-exectracker branch 4 times, most recently from 0d4c658 to 61039ae on July 19, 2025 19:05
Comment on lines 12 to 42
public class ChainingQueryDispatcherExecTracker
    implements ChainingQueryDispatcher
{
    @Override
    public QueryExec create(Query query, DatasetGraph dsg, Binding initialBinding, Context context,
                            QueryDispatcher chain) {
        QueryExec delegate = chain.create(query, dsg, initialBinding, context);
        QueryExec result = TaskTrackerRegistry.track(context, delegate);
        TaskTrackerRegistry.remove(context);
        return result;
    }
}
@Aklakan (Contributor, Author) commented:

Relevant code for intercepting query execution construction over a dataset graph using the newly proposed dispatcher system. Here, execution tracking is added.

@Aklakan (Contributor, Author) commented Jul 19, 2025

The original code integrated exec tracking into the Update-/QueryEngineFactory system. However, this was sub-par because execution could only be intercepted at the QueryIterator level.

My updated proposal is to introduce a new layer, Update-/QueryDispatcher, on top of the Update-/QueryEngineFactory machinery. It is now possible to intercept any query/update request to a dataset - even without having to parse the query.

Old design: QueryExecBuilderDataset -build-> QueryExecDataset -exec-> QueryEngineRegistry

New design: QueryExecBuilderDataset -build-> QueryDispatcherRegistry -customized build-> QueryExec

The last element of the dispatcher chain forwards the request to the usual Update-/QueryEngineFactory system.
The returned QueryExec may be of any type, such as a tracking wrapper - it is no longer necessarily a QueryExecDataset.

Consequences:

  • Code that relies on QueryExecBuilderDataset always returning a QueryExecDataset may break when custom interceptors are added. This should be a low price to pay because conventional code relies on the QueryExec interface.
  • Execution Tracking can be added without any change to the existing code.
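
The "tracking wrapper" return type mentioned above is essentially a decorator. A minimal stand-alone sketch, where "Exec" stands in for QueryExec and the listener is reduced to an event list (none of this is the PR's actual API):

```java
import java.util.ArrayList;
import java.util.List;

// Stand-alone sketch of a tracking wrapper: the object handed back to the
// caller has the same interface as the plain execution but notifies a
// listener around exec().
public class TrackingWrapperSketch {
    interface Exec { String exec(); }

    /** Decorates a delegate so start/end events are recorded around execution. */
    static Exec tracked(Exec delegate, List<String> events) {
        return () -> {
            events.add("start");
            try {
                return delegate.exec();
            } finally {
                events.add("end");
            }
        };
    }

    public static List<String> run() {
        List<String> events = new ArrayList<>();
        Exec plain = () -> "result";
        String value = tracked(plain, events).exec();
        events.add(value);
        return events; // [start, end, result]
    }

    public static void main(String[] args) {
        System.out.println(run());
    }
}
```

From the caller's perspective nothing changes: the wrapped object is used exactly like the plain execution.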

Related Ongoing Work

As a demo of related work based on this infrastructure, we are using it to integrate third-party triple stores - such as Qlever - into Fuseki. This way we can use one server framework to manage several triple stores.
Technically, we created a DatasetGraph wrapper implementation that is backed by a factory of HTTP RDFConnection instances (such as to a remote Qlever database). The query dispatcher simply forwards query/update requests to an RDFConnection obtained from such a DatasetGraph implementation.
Any opinions on this design?

As a final note, we also already created an assembler that starts Qlever from a Docker image as part of Fuseki (via the Java Testcontainers framework), so the configuration looks like this:

<#baseDS> rdf:type qlever:Dataset ;
  qlever:location "/run/databases/qlever/mydb/" ;
  qlever:indexName "myindex" ;
  qlever:accessToken "abcde" ;
  qlever:memoryMaxSize "4G" ;
  qlever:defaultQueryTimeout "600s" ;

Example of Qlever in Fuseki

@Aklakan Aklakan force-pushed the 2025-05-11-exectracker branch 8 times, most recently from 1833ad7 to 7f59fa7 on July 22, 2025 17:33
@Aklakan (Contributor, Author) commented Jul 23, 2025

I added an assembler to build datasets backed by remote (HTTP) SPARQL endpoints based on a new DatasetGraphOverRDFLink class in combination with the new dispatcher system, and this works out of the box with the execution tracking:

The PR should be review ready in the next few days.


@Aklakan Aklakan force-pushed the 2025-05-11-exectracker branch 6 times, most recently from 91e9758 to c107201 on July 29, 2025 16:43
@Aklakan Aklakan marked this pull request as ready for review July 29, 2025 16:48
@Aklakan Aklakan force-pushed the 2025-05-11-exectracker branch from f91303c to df88500 on August 28, 2025 11:30
@afs (Member) commented Sep 1, 2025

The Fuseki module contains an HTML interface that includes a control to stop an update or query execution.

  1. It is per dataset, so there isn't a single page of all requests in progress, which would be useful for seeing the current state of the server.
  2. It is uncontrolled - anyone can stop a request.
  3. Administration functions go under /$/ so that the pattern can have security applied.

I'm not sure what the best thing to do is, but one possibility is:

  1. The endpoint is /$/exectracker and it returns the state of all requests.
  2. operation:tracker still exists to cause registration to happen for the dataset.

@Aklakan (Contributor, Author) commented Sep 1, 2025

  1. It is uncontrolled - any one can stop a request.

It's possible to combine this with Fuseki's security, and there is a context flag to disable the stop action (both in the HTML and the API):

<#service> rdf:type fuseki:Service ;
  fuseki:name "coypu" ;
  # ...
  fuseki:endpoint [
    fuseki:operation fuseki:tracker ;
    fuseki:name "tracker" ;
    ja:context [ ja:cxtName "allowAbort" ; ja:cxtValue false ] ; # Disable stop action
  ] ;
  fuseki:endpoint [
    fuseki:operation fuseki:tracker ;
    fuseki:name "admin-tracker" ;
    fuseki:allowedUsers "admin" ;

Our deployment:

Pending improvements:

  • I think that enabling the stop action should be opt-in rather than opt-out.
  • The HTML table should not exceed 100% width.
  1. Administration functions go under /$/ so that the pattern can have security applied.

Hm, if there were a context at the server level (not sure right now whether there already is one), then the exec tracker fmod could wire up the listeners so that all events from the datasets are delegated to the server-wide listener.

@Aklakan Aklakan force-pushed the 2025-05-11-exectracker branch 2 times, most recently from a08d2bd to 81058d6 on September 10, 2025 12:14
@Aklakan Aklakan force-pushed the 2025-05-11-exectracker branch from a7868ba to b1013cb on September 14, 2025 08:55
@namedgraph (Contributor) commented:
Unrelated but DatasetGraphOverSparql, DatasetGraphOverRDFLink etc. look like the wrong design to me. They wrap the transport implementations inside what should be RDF spec implementations (Model and Dataset). That is a dead-end IMO because the class hierarchy becomes open-ended - every new transport (let's imagine, I don't know, some binary-based protocol) will now require a new Dataset subclass. Using inheritance this way feels wrong to me.

IMO the transport details should be handled by clients external to Dataset, e.g. the same StreamRDFToUpdateRequest.sendGraphTriplesToStream(graph, graphName, sink); that is used in this PR (although I'm not crazy about that class either 😅 ).
The number of Model/Dataset implementations should stay finite - implement the specs and then subclass for some transactional/TDB mechanisms but that's it.

@Aklakan (Contributor, Author) commented Sep 15, 2025

The goal of DatasetGraphOverSparql is to act as a bridge between ARQ and external SPARQL-capable stores.
The design of the sparql dispatcher system proposed by this PR allows both QueryExec.dataset(dsgOverSparql) and RDFLink.connect(dsgOverSparql) to efficiently proxy queries and updates to the backend.
However, for graph-store-protocol (GSP) operations, ARQ relies on the DatasetGraph API, which is indeed far from ideal for use with a SPARQL backend.
It is consistent, however, because the class name (at the ARQ level) is DatasetGraphOverSparql and not DatasetGraphOverSparqlAndGSP - so the protocol is fixed to SPARQL.
Sidenote: Following my line of reasoning, DatasetGraphOverSparqlViaRDFLink would be more accurate than DatasetGraphOverRDFLink.

For SPARQL, the current design already allows configuration of protocol matters on the RDFLink level.
The snippet below is a variation of this PR's ExampleDBpediaViaRemoteDataset.java:

    Creator<RDFLink> linkCreator = () -> RDFLinkHTTP.newBuilder()
            .destination("http://dbpedia.org/sparql")
            // Request Thrift instead of the default application/sparql-results+json.
            .acceptHeaderSelectQuery(WebContent.contentTypeResultsThrift)
            .build();

    DatasetGraph dsg = new DatasetGraphOverRDFLink(linkCreator);
    QueryExec.dataset(dsg).query("...")...; // Queries will be dispatched to the link;
                                            // execution won't use the DatasetGraph API.

The fundamental issue is that DatasetGraph is central to most parts of Jena (up to Fuseki).

At some point in the future - in the appropriate places - it might be worth superseding DatasetGraph with a more general RDFLinkSource (a factory of RDFLinks - similar to JDBC's DataSource).
This way, Fuseki could forward GSP requests to vendor-specific driver implementations - but I feel that these changes are outside the scope of this PR.
The idea of a JDBC-like DataSource was briefly mentioned in #1390 (comment)

Transforming update requests via StreamRDFToUpdateRequest.sendGraphTriplesToStream

I think in principle the abstraction with a configurable update sink is ok, but I agree that a custom graph-to-update-requests mapper should not require subclassing!
Also, the default strategy should put all inserts into a single request instead of performing some magic splitting that will break blank nodes.

[Edit] A thought: If both RDFLink and DatasetGraph had an addAll(Iterator<Quad>) method then at least this kind of insert could be delegated relatively efficiently to a graph implementation compared to dsg::add(individualQuad). Wrapping an iterator as a DatasetGraph and passing it to DatasetGraph.addAll(DatasetGraph) would already be possible but it feels hacky. So far I still feel that improving the situation for GSP is better handled as a separate issue.
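
The single-request strategy argued for above - all inserts in one request so that blank node labels stay consistent - can be sketched as follows. Triples are pre-formatted strings here, whereas the real code would serialize Quad objects:

```java
import java.util.Iterator;
import java.util.List;
import java.util.StringJoiner;

// Sketch of the single-request bulk insert: every triple of a batch goes into
// one INSERT DATA, so a blank node label such as _:b0 denotes the same node
// across the whole batch.
public class BulkInsertSketch {
    public static String insertDataRequest(Iterator<String> triples) {
        StringJoiner body = new StringJoiner(" . ");
        while (triples.hasNext()) {
            body.add(triples.next());
        }
        return "INSERT DATA { " + body + " }";
    }

    public static void main(String[] args) {
        List<String> triples = List.of(
                "_:b0 <urn:p> <urn:o1>",
                "_:b0 <urn:p> <urn:o2>"); // same blank node, same request
        System.out.println(insertDataRequest(triples.iterator()));
    }
}
```

Splitting the batch into several requests would turn _:b0 into two distinct blank nodes on the server side, which is exactly the breakage described above.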

@namedgraph (Contributor) commented:

@Aklakan I understand the need for a proxy but I'm not convinced it should live in a Dataset implementation. It might feel that way because many of Jena's core classes unfortunately conflate the domain model implementation with the transport details - which are orthogonal concerns IMO. I know I've said this before, but this is much more cleanly separated in JAX-RS.

Conceptually the proxy is not of a dataset - it is of an endpoint. A dataset should not care where it came from - it's just a bag of quads.

To illustrate, this is a SPARQL endpoint implementation in JAX-RS. The separation of concerns is much cleaner IMO.
https://github.com/AtomGraph/Core/blob/master/src/main/java/com/atomgraph/core/model/impl/SPARQLEndpointImpl.java
It can use different EndpointAccessor implementations - one based on a Dataset and one for remote endpoints.

@Aklakan (Contributor, Author) commented Sep 16, 2025

DatasetGraph can play a dual role:

  • Domain API
  • Endpoint (something that can be connected to)

In jena-rdfconnection, RDFLink.connect(dsg) is already an API that could be extended to handle accessor indirection: connect() could use a registry of Function<DatasetGraph, RDFLink> linkProviders to establish an appropriate link to the dataset graph.

The same goes for QueryExec.dataset() and UpdateExec.dataset() - these methods could return a Query/UpdateExecBuilder based on a registry of providers. But instead of having two additional ARQ-level provider registries, I think they should rely on the RDFLink-level one.

I am not too fond of the proposed dispatcher/interception system because it only intercepts query/update execution, not the connection process itself. So I am indeed thinking about moving the interception to a higher level.

The main aspects I am pondering over are:

  • RDFLink is in jena-rdfconnection - links so far don't exist on ARQ level.
  • For the sake of tracking update/query executions, a simple "Query/ExecPostProcessor" registry - conceptually a list of Function<QueryExec, QueryExec> - would be sufficient. {Query, Update}ExecBuilder{Dataset, HTTP} could natively support a post-processor registry - no interceptors needed.
  • I realized that for transaction-capable datasets, Fuseki processes updates in a streaming way via an UpdateSink. This is pretty cool, but it currently goes directly to the DatasetGraph API and thus bypasses UpdateExecBuilder, does not even create an UpdateExec, and thus cannot yet be exec-tracked. I think this streaming API needs to be added to UpdateExecBuilder.
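
The post-processor registry mentioned above is conceptually just function composition. In this stand-alone sketch a String stands in for QueryExec, and all names are illustrative only:

```java
import java.util.List;
import java.util.function.Function;

// Sketch of the post-processor registry: a list of Function<QueryExec,
// QueryExec> applied in order to a freshly built execution.
public class PostProcessorSketch {
    public static String apply(List<Function<String, String>> registry, String exec) {
        String result = exec;
        for (Function<String, String> f : registry) {
            result = f.apply(result);
        }
        return result;
    }

    public static void main(String[] args) {
        List<Function<String, String>> registry = List.of(
                e -> "tracked(" + e + ")",
                e -> "timed(" + e + ")");
        System.out.println(apply(registry, "exec"));
        // prints: timed(tracked(exec))
    }
}
```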

It can use different EndpointAccessor implementations - one based on a Dataset and one for remote endpoints.

This is indeed conceptually similar to RDFLink.

@namedgraph (Contributor) commented:
Agree to disagree :)

connect() to me smells of JDBC or some protocol like that - Linked Data and SPARQL are based on REST that does not require any "connection".

This is why I'm only using Jena's in-memory Model/Dataset and readers/parsers and not the HTTP layer - I've replaced it all with JAX-RS/Jersey.

@afs (Member) commented Sep 19, 2025

DatasetGraph is local, fine grained for code usage and Jena-specific. Something somewhere has to implement storage.

The world could be built on this but it will be unsatisfactory because it is too fine-grained for web use.

RDFLink is remote-first, coarse grained and uses a standardized protocol.
It is connection oriented. One RDFLink is a connection.

RDFLink can present the same API to local and remote; if local it goes to DatasetGraph.

Sometimes you have code that works on datasets but you want to apply it to something that only supports the SPARQL protocols. You need an adapter to invert the stack. Adapters that map low-level APIs to higher-level APIs are rarely great for performance or for using the details of the high-level API, but if the difference is access/no-access, an adapter gets the job done.

Comment on lines 670 to 673
// public void cancel() {
// closed = true;
// abort();
// }
A project member commented:

Please remove.

@afs (Member) commented Sep 19, 2025

@Aklakan --

In order to make progress, can this be split into multiple PRs? The first would be the changes in ARQ for async.

This would give us some focus to complete subtasks, and would also help review, because this single PR is causing the GitHub review UI to struggle!

A question I'm trying to answer for myself is how much of this is fundamental machinery and how much is driven by your usage of Jena.

@Aklakan Aklakan mentioned this pull request Sep 23, 2025
@Aklakan (Contributor, Author) commented Sep 23, 2025

Yes, it makes sense to split the (mostly) finished parts out from this PR for isolated review. I factored the async HTTP part out in #3464. My suggestion is to use this PR as the "full stack" branch from which smaller PRs can be derived if the overall system works. I plan on making an updated proposal on the query/update execution tracking part around next week.


Successfully merging this pull request may close these issues.

API call to get a snapshot of running queries