Using Flow instead of iterators for query evaluation? #4320
Replies: 12 comments 25 replies
-
Interesting idea. I'm not fully across what this would look like, but it sounds worth writing a draft proposal for. Of course, as you rightly say, the impact on products like GraphDB will likely be major. On the other hand, if we can do this in a way where we can at least offer a good migration path, that should not be a massive concern. In the long run, getting rid of the wrapped iterator stack will have significant performance benefits.
-
Looks like Flow is part of the Reactive Systems field. Does Flow solve some of the same problems as RxJava? I'm curious what will happen to the likes of RxJava once Loom is introduced. My understanding is that the issues that lead developers to use RxJava can be solved in new and better ways once Loom becomes available. I can't say that I actually understand how or why, but it was the impression I got from the various talks I've heard about Loom. From the same talks I also get the impression that developers don't really like the complexity that comes with RxJava, but those speakers would naturally be biased and I don't have any first-hand knowledge myself.
-
Not much experience with this, but IIRC the main difference is that RxJava (and Flow) also deal with backpressure, while Loom doesn't (or at least not as gracefully)?
-
TL;DR I think Loom is really, really cool, but I think its benefits for a query execution engine are limited.

To my understanding, Loom will dramatically lower the cost of multi-threading in terms of memory and thread-switching overhead. Thread pooling also helps with this, but with a poorer developer experience. CompletableFuture in Java partly addresses this; RxJava and the like improve on it by providing a dataflow paradigm. Some also provide tools for back-pressure and for scheduling different parts of a dataflow on different thread pools (e.g. to separate IO-bound from CPU-bound work).

I think that Loom will have a tremendous impact on workloads that do a lot of (network) IO, e.g. web servers and services in a micro-services environment that to a large extent 'orchestrate' with external services / clients, with the convenience of imperative programming. For query execution I don't know if Loom brings that much benefit. For CPU-bound parts it doesn't make sense to have (many) more threads than cores. For IO-bound parts it could be desirable to pipeline requests to storage in order to take advantage of reordering / batching, especially with spinning disks. Maybe this could be cheaper with Loom than with a thread pool? I'm not so sure though how well Loom / Java supports async disk I/O. At least, using async disk I/O (on Linux) in Java is not available out of the (OpenJDK) box.
-
The dataflow paradigm of Flow, RxJava, reactivestreams, etc. does fit query execution quite well, although I wonder whether it will provide a substantial benefit over plain old iterators. Async disk I/O could be one of these areas. In order to improve efficiency in RDF4J's query execution, vectorisation, batching, pipelining and re-ordering are definitely interesting. A challenge with Flow and related libraries could be that the dataflow is 'record' oriented: you request n elements and then get up to n elements, but one-at-a-time via onNext. For vectorisation etc. I think many systems are taking a 'record batch' approach. Perhaps that could be developed based on Flow or the like. @Jerven, could you outline in some more detail how you would see Flow helping with vectorisation and multi-threading?

(As for some context on why I am trying to chime in on this discussion at all: I'm working on an append-mostly, read-many RDF-star storage with dictionary encoding and a reverse index from values per 'position' to statements, based on RDF4J.)
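A 'record batch' dataflow on top of the JDK's `Flow` could look something like the sketch below. This is purely illustrative and not RDF4J code: `String` stands in for `BindingSet`, each `onNext` carries a whole batch, and back-pressure is expressed in batches rather than single solutions.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Flow;
import java.util.concurrent.SubmissionPublisher;

// Sketch: batch-oriented dataflow. Instead of one onNext per solution,
// each onNext carries a whole batch (List<String> stands in for a
// hypothetical List<BindingSet> record batch).
public class BatchFlowSketch {

    public static List<List<String>> consumeBatches(List<List<String>> batches) throws InterruptedException {
        List<List<String>> received = new ArrayList<>();
        CountDownLatch done = new CountDownLatch(1);
        try (SubmissionPublisher<List<String>> pub = new SubmissionPublisher<>()) {
            pub.subscribe(new Flow.Subscriber<List<String>>() {
                private Flow.Subscription subscription;

                public void onSubscribe(Flow.Subscription s) {
                    subscription = s;
                    s.request(1); // the unit of demand is a batch, not a record
                }

                public void onNext(List<String> batch) {
                    received.add(batch); // a real operator would process the batch here
                    subscription.request(1);
                }

                public void onError(Throwable t) { done.countDown(); }

                public void onComplete() { done.countDown(); }
            });
            batches.forEach(pub::submit);
        } // closing the publisher signals onComplete after pending deliveries
        done.await();
        return received;
    }

    public static void main(String[] args) throws InterruptedException {
        List<List<String>> out = consumeBatches(List.of(List.of("a", "b"), List.of("c")));
        System.out.println(out); // [[a, b], [c]]
    }
}
```

The point of the sketch is only that the reactive-streams demand protocol works unchanged when the element type is a batch, which is what a vectorised evaluator would want.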
-
I think generally reactive services and async code are horrible. I regret picking vert.x for one of my work services. However, the win of Flow (with Loom or without it) in our situation is that it gives us thread safety if we go into query parallelization. Still, the main issue is taking the result of a Flow and turning it into a lazy closeable iterator as we have now.

So let's take a simple query as an example:

```sparql
SELECT ?x (count(?y) as ?ys)
WHERE {
  ?x rdf:value ?y .
  FILTER(?y > 1)
} GROUP BY ?x
```

Here we currently have a stack of iterators evaluating the query. Each iterator may (but does not have to) run in its own thread. The publish/subscribe is buffered, and each iterator, when scheduled, can work on a few hundred items before the next iterator is scheduled. This gives us DB-style vectored results. In the end we move from a pull architecture to a push one, which is interesting for Halyard as that seems to do the same.

Notes on loom:
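As a hypothetical illustration of this buffered push pipeline for the query above, here is a sketch using the JDK's `SubmissionPublisher`. A `Row` record stands in for `BindingSet`, each stage runs on the publisher's executor threads, and none of this is actual RDF4J code.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.SubmissionPublisher;

// Sketch: the query plan as push-based stages. A source publishes (x, y)
// pairs, a filter stage re-publishes pairs with y > 1, and a terminal
// subscriber performs the GROUP BY ?x with count(?y).
public class PushPipelineSketch {

    record Row(String x, int y) {}

    public static Map<String, Long> run(Row... rows) throws InterruptedException {
        ConcurrentHashMap<String, Long> counts = new ConcurrentHashMap<>();
        CountDownLatch done = new CountDownLatch(1);
        SubmissionPublisher<Row> source = new SubmissionPublisher<>();
        SubmissionPublisher<Row> filtered = new SubmissionPublisher<>();

        // FILTER(?y > 1): subscribes to the source, publishes downstream.
        source.consume(r -> { if (r.y() > 1) filtered.submit(r); })
                .whenComplete((v, t) -> filtered.close());

        // GROUP BY ?x with count(?y): terminal aggregation stage.
        filtered.consume(r -> counts.merge(r.x(), 1L, Long::sum))
                .whenComplete((v, t) -> done.countDown());

        for (Row r : rows) {
            source.submit(r); // push into the pipeline
        }
        source.close();
        done.await();
        return counts;
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println(run(new Row("a", 1), new Row("a", 2), new Row("b", 3)));
    }
}
```

`SubmissionPublisher` buffers between stages by default, which is what gives each stage a chance to process a run of items before the next stage is scheduled.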
Beta Was this translation helpful? Give feedback.
-
Well, there is an attempt at trying to be iterator-compatible. This deadlocks though, and the implementation is not correct :(
-
Anyone revisited this idea recently? I've embarked on a mission with my team to build an RDF4J layer on top of our existing HBase tables and Elasticsearch indices. We're able to greatly reduce query latency by employing a pipelined version of the nested-loop join. Essentially the nested loop becomes:

```java
/**
 * Joins left bindings with right in a nested-loop like fashion. The left {@link Flux} is flat-mapped against the
 * right query evaluation step. The sequential version of flat-map is used to preserve order. Subscribing to the
 * nested flux is done on a different thread for the pipelined (eager) evaluation.
 */
public Flux<BindingSet> evaluateReactive(BindingSet bindings) {
    return evaluate(leftPrepared, bindings)
            .flatMapSequential(
                    leftBindings -> evaluate(rightPrepared, leftBindings)
                            .subscribeOn(SCHEDULER)
            );
}
```

Combined with a buffering subscription to adapt the `Flux` to `CloseableIteration` to work within the RDF4J architecture, we've seen the latency of some queries drop from just shy of 3 minutes to 10-15 seconds. A small step was taken to support stacking such operators (e.g. a join on a pipelined join), and also to provide this from 'leaf' data accessors:

```java
public interface ReactiveQueryEvaluationStep extends QueryEvaluationStep {

    Flux<BindingSet> evaluateReactive(BindingSet bindings);

    int getBlockingBufferSize();

    @Override
    default CloseableIteration<BindingSet, QueryEvaluationException> evaluate(BindingSet bindings) {
        Flux<BindingSet> flux = evaluateReactive(bindings);
        return new CloseableFluxIteration<>(flux, getBlockingBufferSize());
    }
}
```

Ideally though, such adaptation would only be necessary at the 'edge of the system'. E.g.:

```java
TupleQueryResult result = connection.prepareTupleQuery("select * where { ... }").evaluate();
Flux<BindingSet> flux = result.flux();
```

Such a `QueryResult` could look like:

```java
public interface QueryResult<T> extends AutoCloseable, Iterable<T> {

    Flux<T> flux();

    @Override
    default Iterator<T> iterator() {
        return flux().toIterable().iterator();
    }

    default Stream<T> stream() {
        return flux().toStream();
    }
}
```

I'll park the whole _Closeable_Iteration concern here 🥸
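The 'buffering subscription' idea can be sketched against the JDK's `Flow` API. The real code above adapts a reactor `Flux` to RDF4J's `CloseableIteration`; `BlockingBufferIterator` below is a hypothetical stand-in that shows the same mechanics with a bounded queue: the subscriber prefetches a buffer's worth of items, and the pull side blocks on the queue and tops demand back up as it consumes.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.NoSuchElementException;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.Flow;
import java.util.concurrent.SubmissionPublisher;

// Sketch: adapt a push-based Flow.Publisher to a pull-based Iterator
// via a bounded blocking queue. Error handling is deliberately crude.
public class BlockingBufferIterator<T> implements Iterator<T>, AutoCloseable, Flow.Subscriber<T> {

    private static final Object DONE = new Object(); // completion sentinel
    private final BlockingQueue<Object> buffer;
    private final int prefetch;
    private Flow.Subscription subscription;
    private Object next;

    public BlockingBufferIterator(int bufferSize) {
        this.buffer = new ArrayBlockingQueue<>(bufferSize + 1); // + 1 slot for the sentinel
        this.prefetch = bufferSize;
    }

    @Override public void onSubscribe(Flow.Subscription s) {
        subscription = s;
        s.request(prefetch); // prefetch one buffer's worth
    }
    @Override public void onNext(T item) { buffer.add(item); }
    @Override public void onError(Throwable t) { buffer.add(DONE); } // a real adapter would propagate t
    @Override public void onComplete() { buffer.add(DONE); }

    @Override public boolean hasNext() {
        if (next == null) {
            try {
                next = buffer.take(); // block until the producer pushes
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                next = DONE;
            }
        }
        return next != DONE;
    }

    @Override public T next() {
        if (!hasNext()) throw new NoSuchElementException();
        @SuppressWarnings("unchecked") T item = (T) next;
        next = null;
        subscription.request(1); // keep the buffer topped up
        return item;
    }

    @Override public void close() { if (subscription != null) subscription.cancel(); }

    public static void main(String[] args) {
        try (SubmissionPublisher<Integer> pub = new SubmissionPublisher<>()) {
            BlockingBufferIterator<Integer> it = new BlockingBufferIterator<>(2);
            pub.subscribe(it);
            new Thread(() -> { List.of(1, 2, 3).forEach(pub::submit); pub.close(); }).start();
            List<Integer> out = new ArrayList<>();
            it.forEachRemaining(out::add);
            System.out.println(out); // [1, 2, 3]
        }
    }
}
```

The bounded queue is what makes the adaptation safe: the producer can run ahead by at most one buffer, which is the same role `getBlockingBufferSize()` plays in the interface above.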
Beta Was this translation helpful? Give feedback.
-
This is also very similar to what Halyard is doing for query evaluation. |
Beta Was this translation helpful? Give feedback.
-
@hmottestad I don't know exactly what the state of the elasticsearch-store is, but the latency issue also plays a big role in that implementation. I can imagine
-
I've dabbled a bit in an implementation based on reactor.io: frensjan@4d9fad5. The changes show that it's possible to use reactive flows in RDF4J, and I think that it's possible to do so with a migration phase wherein both iterators and reactive streams can be mixed. It's awfully rough around the edges though; it cuts corners, ignores the fact that iterators in RDF4J need to be closed, and what not. Next to the question of whether this is desirable from an architectural point of view, there is the matter of performance. I've used the query benchmarks to compare.

Some profiling shows that the reactive-streams protocol is simply a lot more involved than the more straightforward volcano-style iterators. I guess that in many use cases such extra cost is negligible in comparison to the actual work. But in the 'complex' query benchmark, the tight nested-loop evaluation of the joins (implemented with the flat-map sequential operator) generates substantial overhead. I haven't found an easy way to improve this.

From an architecture view this of course does open some avenues, e.g. concurrency for operators that access remote systems through multi-threaded pipelined joins. But also some areas which are CPU-bound could be sped up by parallelisation.
-
Edit: it was too good to be true and the advantage was due to broken code.

I did an experiment with Loom but the results look too good to be true. I will investigate next week. This is on the memory store, so mostly a big advantage in compute, or a major advantage in broken code ;)
-
I have been looking at the `Flow` class as an alternative to the internal use of iterators during query evaluation. I think we could use this to replace the current iterator approach. This would not only make query evaluation multi-threaded but also give us "vectored" volcano for free.

The `QueryContext` will need to be extended to carry along the `SubmissionPublisher`, and each `QueryEvaluationStep` needs to subscribe to the priors in the execution plan and publish as well.

This would be a major change inside the query evaluation and is unlikely to be backwards compatible. Halyard and GraphDB might be majorly affected.
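A minimal sketch of what such a subscribe-and-publish step could look like, using only `java.util.concurrent`. The `FilterStep` name and shape are assumptions for illustration, not existing RDF4J API; it is a `Flow.Processor` that subscribes to the prior step in the plan and publishes its own output downstream.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Flow;
import java.util.concurrent.SubmissionPublisher;
import java.util.function.Predicate;

// Sketch: a query operator as a Flow.Processor. It consumes from the prior
// step (Subscriber side) and publishes to the next step (Publisher side,
// inherited from SubmissionPublisher).
public class FilterStep<T> extends SubmissionPublisher<T> implements Flow.Processor<T, T> {

    private final Predicate<T> condition;

    public FilterStep(Predicate<T> condition) { this.condition = condition; }

    @Override public void onSubscribe(Flow.Subscription s) { s.request(Long.MAX_VALUE); } // unbounded demand in this sketch
    @Override public void onNext(T item) { if (condition.test(item)) submit(item); }      // publish matches downstream
    @Override public void onError(Throwable t) { closeExceptionally(t); }
    @Override public void onComplete() { close(); }

    // Demo: scan -> filter -> collect, mimicking FILTER(?y > 1).
    public static List<Integer> demo(List<Integer> input) throws InterruptedException {
        List<Integer> out = new ArrayList<>();
        CountDownLatch done = new CountDownLatch(1);
        FilterStep<Integer> filter = new FilterStep<>(y -> y > 1);
        try (SubmissionPublisher<Integer> scan = new SubmissionPublisher<>()) {
            scan.subscribe(filter); // the step subscribes to its prior in the plan
            filter.consume(out::add).whenComplete((v, t) -> done.countDown());
            input.forEach(scan::submit);
        }
        done.await();
        return out;
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println(demo(List.of(1, 2, 3))); // [2, 3]
    }
}
```

A real version would need bounded demand (the buffered, vectored behaviour discussed above) and a way to propagate close/cancel through the plan, which is exactly the `CloseableIteration` concern raised in this thread.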