GH-4819 Merge Join #4822

Merged
merged 7 commits into from
Dec 20, 2023

Conversation

hmottestad
Contributor

GitHub issue resolved: #4819

Briefly describe the changes proposed in this PR:


PR Author Checklist (see the contributor guidelines for more details):

  • my pull request is self-contained
  • I've added tests for the changes I made
  • I've applied code formatting (you can use mvn process-resources to format from the command line)
  • I've squashed my commits where necessary
  • every commit message starts with the issue number (GH-xxxx) followed by a meaningful description of the change

Comment on lines +398 to +399
@Override
public final CloseableIteration<? extends Statement> getStatements(StatementOrder order, Resource subj, IRI pred,
Value obj, boolean includeInferred, Resource... contexts) throws SailException {
Contributor Author

@hmottestad hmottestad Oct 19, 2023

This is at the SailConnection level. It might be better to have the order argument be after includeInferred and before Resource... contexts. Though I do prefer to keep things consistent between the Sail level and the Dataset level, which doesn't have an inferred argument. Keeping the order as the first argument also makes sense since the specified order could be for context.

@hmottestad
Contributor Author

@abrokenjester @kenwenzel I'm working on support for merge join. The first step is to add support for the sail to return ordered statements. I would really appreciate some feedback on how to make my changes as consistent as possible with the existing interfaces. Any suggestions on naming are also very welcome.

@hmottestad
Contributor Author

Multi-variable joins might be problematic:

 ?a a ?b.
 ?a ex:type ?b.

@kenwenzel
Contributor

@hmottestad I can offer to add support for ordered indexes to LmdbStore. Do you already have any performance figures?

Comment on lines 896 to 918

@Override
public CloseableIteration<? extends Statement> getStatements(StatementOrder statementOrder, Resource subj,
IRI pred, Value obj, Resource... contexts) throws SailException {
throw new UnsupportedOperationException("Not implemented yet");
}

@Override
public Set<StatementOrder> getSupportedOrders(Resource subj, IRI pred, Value obj, Resource... contexts) {
return Set.of();
}

@Override
public Comparator<Value> getComparator() {
throw new UnsupportedOperationException("Not implemented yet");
}
Contributor Author

@kenwenzel here are the three methods you need to implement to get merge join working.
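
To make the shape of these three methods more concrete, here is a minimal, self-contained sketch that does not use RDF4J at all. `ToyOrderedStore`, its nested `Triple` class, and its `Order` enum are hypothetical stand-ins for a Sail dataset, `Statement`, and `StatementOrder`, and plain strings stand in for `Resource`, `IRI` and `Value`. Like the test setup described later in this thread, it simply sorts the matching data on every request; a real store would read from a suitable index instead.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.EnumSet;
import java.util.Iterator;
import java.util.List;
import java.util.Set;

// Hypothetical toy store; none of these types are RDF4J classes.
final class ToyOrderedStore {

    enum Order { S, P, O }

    static final class Triple {
        final String s, p, o;

        Triple(String s, String p, String o) {
            this.s = s;
            this.p = p;
            this.o = o;
        }

        String get(Order order) {
            switch (order) {
                case S: return s;
                case P: return p;
                default: return o;
            }
        }
    }

    private final List<Triple> triples = new ArrayList<>();

    void add(String s, String p, String o) {
        triples.add(new Triple(s, p, o));
    }

    // Analogue of getSupportedOrders: this toy sorts per request, so every order is available.
    Set<Order> getSupportedOrders(String s, String p, String o) {
        return EnumSet.allOf(Order.class);
    }

    // Analogue of getComparator: the ordering that sorted results follow.
    Comparator<String> getComparator() {
        return Comparator.naturalOrder();
    }

    // Analogue of the ordered getStatements: null arguments are wildcards.
    Iterator<Triple> getStatements(Order order, String s, String p, String o) {
        List<Triple> matches = new ArrayList<>();
        for (Triple t : triples) {
            if ((s == null || s.equals(t.s)) && (p == null || p.equals(t.p)) && (o == null || o.equals(t.o))) {
                matches.add(t);
            }
        }
        matches.sort(Comparator.comparing((Triple t) -> t.get(order), getComparator()));
        return matches.iterator();
    }
}
```

The important contract in this sketch is that the iterator returned for a given order is sorted consistently with the comparator the store exposes, since a merge join would need to compare values from both inputs with that same comparator.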

Contributor

@hmottestad I've added an initial implementation in:
https://github.com/kenwenzel/rdf4j/tree/GH-4819-merge-join

Unfortunately, QueryBenchmark.complexQuery() directly fails with an Exception. Maybe it helps to locate the problem.

Contributor Author

I fixed it. It was an optimisation I had made where I hadn't quite accounted for all the edge cases.

@hmottestad
Contributor Author

hmottestad commented Oct 24, 2023

> @hmottestad I can offer to add support for ordered indexes to LmdbStore. Do you already have any performance figures?

This would be really awesome. I've tagged you above in a comment to show what needs to be implemented. That should hopefully be all that is needed. One thing I'm not certain about is transaction isolation, so it might be best to test it out with IsolationLevels.NONE. There is also some work left to be done on the DualUnionIteration; let me know if you run into the UnsupportedOperationException in that class.

I haven't run any benchmarks, mostly because I've only implemented this in the ExtensibleStore with a backing data structure that sorts the data for each request. That is just so I can test things out locally. The end goal is for https://github.com/the-qa-company/qEndpoint to use it for analytical queries. They have a really innovative product that uses HDT for storing the data on disk and can work with massive datasets using very little memory.

The queries that we want to support are ones that would need to read most of the data from a range query on the index anyway.

For instance:

select (count(?a) as ?count) where {
    ?a a foaf:Person; # using an OPSC index
       foaf:age ?age. # using a PSOC index
}
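
The query above joins two patterns on ?a, and each pattern can be read from an index already sorted by subject. The core of a merge join over two such sorted inputs can be sketched as follows. This is a simplified illustration and not RDF4J's `InnerMergeJoinIterator`: the class name `MergeJoinSketch` and the `{key, value}` string-array rows are assumptions made for the example, and both inputs are assumed to be sorted by key already.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical sketch of an inner merge join over two inputs sorted by their join key.
final class MergeJoinSketch {

    /** Each row is {key, value}. Returns rows of {key, leftValue, rightValue}. */
    static List<String[]> mergeJoin(List<String[]> left, List<String[]> right) {
        List<String[]> out = new ArrayList<>();
        int l = 0;
        int r = 0;
        while (l < left.size() && r < right.size()) {
            int cmp = left.get(l)[0].compareTo(right.get(r)[0]);
            if (cmp < 0) {
                l++; // left key is smaller: advance left
            } else if (cmp > 0) {
                r++; // right key is smaller: advance right
            } else {
                String key = left.get(l)[0];
                // Find the run of right rows sharing this key.
                int runEnd = r;
                while (runEnd < right.size() && right.get(runEnd)[0].equals(key)) {
                    runEnd++;
                }
                // Pair every left row with this key against the whole right run.
                while (l < left.size() && left.get(l)[0].equals(key)) {
                    for (int i = r; i < runEnd; i++) {
                        out.add(new String[] { key, left.get(l)[1], right.get(i)[1] });
                    }
                    l++;
                }
                r = runEnd;
            }
        }
        return out;
    }

    public static void main(String[] args) {
        List<String[]> persons = List.of(
                new String[] { "ex:a", "foaf:Person" },
                new String[] { "ex:b", "foaf:Person" });
        List<String[]> ages = List.of(
                new String[] { "ex:a", "30" },
                new String[] { "ex:c", "41" });
        for (String[] row : mergeJoin(persons, ages)) {
            System.out.println(String.join(" ", row));
        }
    }
}
```

Each input is consumed once from start to end, which is why this approach suits queries that have to scan most of an index range regardless.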

If you want to see if the query planner chooses merge join you can use the query explanation:

Explanation explain = connection
    .prepareTupleQuery(query)
    .explain(Explanation.Level.Executed);
System.out.println(explain);

And you should see Join (InnerMergeJoinIterator) in the query explanation.

Here is an example for a completely different query:

Projection (resultSizeActual=5)
╠══ ProjectionElemList
║     ProjectionElem "s"
║     ProjectionElem "o"
║     ProjectionElem "o2"
║     ProjectionElem "s2"
║     ProjectionElem "o3"
╚══ Join (JoinIterator) (resultSizeActual=5)
   ├── Join (InnerMergeJoinIterator) (resultSizeActual=5) [left]
   │  ╠══ Join (InnerMergeJoinIterator) (resultSizeActual=5) [left]
   │  ║  ├── StatementPattern [statementOrder: S]  (costEstimate=0, resultSizeEstimate=0, resultSizeActual=5) [left]
   │  ║  │     s: Var (name=s)
   │  ║  │     p: Var (name=_const_9285ccfc_uri, value=http://www.w3.org/2000/01/rdf-schema#label, anonymous)
   │  ║  │     o: Var (name=o)
   │  ║  └── StatementPattern [statementOrder: S]  (costEstimate=0, resultSizeEstimate=0, resultSizeActual=14) [right]
   │  ║        s: Var (name=s)
   │  ║        p: Var (name=_const_44b0da67_uri, value=http://www.w3.org/2000/01/rdf-schema#comment, anonymous)
   │  ║        o: Var (name=o2)
   │  ╚══ StatementPattern [statementOrder: S]  (costEstimate=0, resultSizeEstimate=0, resultSizeActual=14) [right]
   │        s: Var (name=s)
   │        p: Var (name=_const_531c5f7d_uri, value=http://xmlns.com/foaf/0.1/knows, anonymous)
   │        o: Var (name=s2)
   └── StatementPattern (costEstimate=0, resultSizeEstimate=0, resultSizeActual=5) [right]
         s: Var (name=s2)
         p: Var (name=_const_44b0da67_uri, value=http://www.w3.org/2000/01/rdf-schema#comment, anonymous)
         o: Var (name=o3)

@hmottestad
Contributor Author

This all gives me an idea for making the query explanation smarter: it could include which index is being used and recommend the optimal index, so the user can configure the best indexes for their particular queries.

@hmottestad
Contributor Author

@kenwenzel I think the performance is a bit bad because it doesn't pick the optimal index.

?a foaf:knows ?b sorted by S should use the PSOC index: since P is fixed in the range query, all the data will be sorted on S.

I changed up the code that picks the index just to see what would happen (TripleStore.java):

if (statementOrder != null) {
    // First letter of the requested order, e.g. 's' for StatementOrder.S
    char component = statementOrder.name().toLowerCase().charAt(0);
    for (TripleIndex candidate : indexes) {
        if (pred != -1 && statementOrder == StatementOrder.S) {
            // Predicate is bound: an index starting with "ps" yields a range sorted by subject
            if (candidate.getFieldSeq()[0] == 'p' && candidate.getFieldSeq()[1] == 's') {
                index = candidate;
                break;
            }
        } else if (pred != -1 && statementOrder == StatementOrder.O) {
            // Predicate is bound: an index starting with "po" yields a range sorted by object
            if (candidate.getFieldSeq()[0] == 'p' && candidate.getFieldSeq()[1] == 'o') {
                index = candidate;
                break;
            }
        } else if (candidate.fieldSeq[0] == component) {
            // Otherwise take any index whose first field is the requested order
            index = candidate;
            break;
        }
    }
    if (index == null) {
        throw new IOException("No index for statement order '" + statementOrder.name() + "' available.");
    }
} else {

Combined with changing the indexes used in the QueryBenchmark:

		LmdbStoreConfig config = new LmdbStoreConfig("spoc,ospc,psoc,posc");
		repository = new SailRepository(new LmdbStore(file, config));

With both changes, the performance is on par with the current performance: not really any faster or slower.
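
The index-selection idea discussed above (with a fixed predicate, a range scan over a PSOC index comes back sorted by subject) can be sketched generically. This is an illustration under assumptions, not the LmdbStore code: `IndexOrderSketch` and its methods are hypothetical, indexes are represented only by their field-sequence strings such as "psoc", and bound components are given as a set of characters.

```java
import java.util.List;
import java.util.Set;

// Hypothetical sketch: which order does an index deliver for a given set of bound components?
final class IndexOrderSketch {

    /** Number of leading index fields that are bound, i.e. the usable range-scan prefix. */
    static int boundPrefixLength(String fieldSeq, Set<Character> bound) {
        int n = 0;
        while (n < fieldSeq.length() && bound.contains(fieldSeq.charAt(n))) {
            n++;
        }
        return n;
    }

    /** The component the scan is sorted by: the first field after the bound prefix ('-' if none). */
    static char sortedBy(String fieldSeq, Set<Character> bound) {
        int n = boundPrefixLength(fieldSeq, bound);
        return n < fieldSeq.length() ? fieldSeq.charAt(n) : '-';
    }

    /** Picks the index that delivers the wanted order with the longest bound prefix, or null. */
    static String pickIndex(List<String> indexes, Set<Character> bound, char wanted) {
        String best = null;
        int bestPrefix = -1;
        for (String candidate : indexes) {
            if (sortedBy(candidate, bound) != wanted) {
                continue;
            }
            int prefix = boundPrefixLength(candidate, bound);
            if (prefix > bestPrefix) {
                best = candidate;
                bestPrefix = prefix;
            }
        }
        return best;
    }

    public static void main(String[] args) {
        List<String> indexes = List.of("spoc", "ospc", "psoc", "posc");
        // Predicate bound, results wanted in subject order.
        System.out.println(pickIndex(indexes, Set.of('p'), 's'));
    }
}
```

Preferring the longest bound prefix is a design choice of this sketch: "spoc" also returns subject-sorted data when only the predicate is bound, but it would have to scan the whole index and filter, whereas "psoc" can restrict the scan to the predicate's range.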

Looking at the query explanation I can see that there isn't much that is joined using merge join:

Projection (resultSizeActual=1.4K)
╠══ ProjectionElemList
║     ProjectionElem "type1"
║     ProjectionElem "type2"
║     ProjectionElem "language"
║     ProjectionElem "mbox"
╚══ Join (JoinIterator) (resultSizeActual=1.4K)
   ├── Join (InnerMergeJoinIterator) (resultSizeActual=1.4K) [left]
   │  ╠══ StatementPattern [statementOrder: S]  (costEstimate=24, resultSizeEstimate=47, resultSizeActual=47) [left]
   │  ║     s: Var (name=_anon_80ec52ca59b749b1829959ee8abc6b207, anonymous)
   │  ║     p: Var (name=_const_23b75369_uri, value=http://xmlns.com/foaf/0.1/mbox, anonymous)
   │  ║     o: Var (name=mbox)
   │  ╚══ StatementPattern [statementOrder: O]  (costEstimate=5, resultSizeEstimate=739, resultSizeActual=10.6K) [right]
   │        s: Var (name=a)
   │        p: Var (name=_const_c4c0156c_uri, value=http://purl.org/dc/terms/publisher, anonymous)
   │        o: Var (name=_anon_80ec52ca59b749b1829959ee8abc6b207, anonymous)
   └── Join (JoinIterator) (resultSizeActual=1.4K) [right]
      ╠══ StatementPattern (costEstimate=10, resultSizeEstimate=368, resultSizeActual=1.4K) [left]
      ║     s: Var (name=b)
      ║     p: Var (name=_const_7420303d_uri, value=http://www.w3.org/ns/dcat#dataset, anonymous)
      ║     o: Var (name=a)
      ╚══ Join (JoinIterator) (resultSizeActual=1.4K) [right]
         ├── StatementPattern (costEstimate=19, resultSizeEstimate=368, resultSizeActual=1.4K) [left]
         │     s: Var (name=a)
         │     p: Var (name=_const_5ca739_uri, value=http://purl.org/dc/terms/identifier, anonymous)
         │     o: Var (name=identifier)
         └── Join (JoinIterator) (resultSizeActual=1.4K) [right]
            ╠══ StatementPattern (costEstimate=19, resultSizeEstimate=372, resultSizeActual=1.4K) [left]
            ║     s: Var (name=a)
            ║     p: Var (name=_const_2783f1e8_uri, value=http://purl.org/dc/terms/language, anonymous)
            ║     o: Var (name=language)
            ╚══ Join (JoinIterator) (resultSizeActual=1.4K) [right]
               ├── StatementPattern (costEstimate=80, resultSizeEstimate=6.4K, resultSizeActual=1.4K) [left]
               │     s: Var (name=b)
               │     p: Var (name=_const_f5e5585a_uri, value=http://www.w3.org/1999/02/22-rdf-syntax-ns#type, anonymous)
               │     o: Var (name=type1)
               └── StatementPattern (costEstimate=80, resultSizeEstimate=6.4K, resultSizeActual=1.4K) [right]
                     s: Var (name=a)
                     p: Var (name=_const_f5e5585a_uri, value=http://www.w3.org/1999/02/22-rdf-syntax-ns#type, anonymous)
                     o: Var (name=type2)

I'll try out some other queries to see how they look.

@hmottestad
Contributor Author

@kenwenzel try this query:

PREFIX ex: <http://example.com/ns#>
PREFIX owl: <http://www.w3.org/2002/07/owl#>
PREFIX rdf: <http://www.w3.org/1999/02/22-rdf-syntax-ns#>
PREFIX rdfs: <http://www.w3.org/2000/01/rdf-schema#>
PREFIX sh: <http://www.w3.org/ns/shacl#>
PREFIX xsd: <http://www.w3.org/2001/XMLSchema#>
PREFIX dcat: <http://www.w3.org/ns/dcat#>
PREFIX dc: <http://purl.org/dc/terms/>
PREFIX skos:  <http://www.w3.org/2004/02/skos/core#>
PREFIX foaf:  <http://xmlns.com/foaf/0.1/>
PREFIX dct: <http://purl.org/dc/terms/>

SELECT  (count(?a) as ?count) where {
        ?a  dct:identifier ?identifier ;
            dct:description ?description ;
            dcat:contactPoint ?contactPoint .
}

On my machine this is 3x faster with merge join.

PS: You can disable merge join by changing the set of orders that are returned:

		@Override
		public Set<StatementOrder> getSupportedOrders(Resource subj, IRI pred, Value obj, Resource... contexts) {
// 			return supportedOrders;
			return Set.of();
		}
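
Why an empty set disables merge join can be illustrated with a small sketch of the planning decision. This is an assumption about the general idea, not RDF4J's actual planner logic: `JoinChoiceSketch`, its `Pos` enum, and the returned labels are hypothetical. The join variable may sit in a different position on each side (for example subject on the left and object on the right), so each side is checked against its own position.

```java
import java.util.EnumSet;
import java.util.Set;

// Hypothetical sketch of choosing a join strategy from the orders each side supports.
final class JoinChoiceSketch {

    // Stand-in for the statement positions an order can refer to.
    enum Pos { S, P, O, C }

    /**
     * A merge join on a shared variable needs both inputs sorted by that variable,
     * so each side must support ordering by the position where the variable occurs.
     */
    static String chooseJoin(Pos leftVarPos, Set<Pos> leftSupported,
                             Pos rightVarPos, Set<Pos> rightSupported) {
        boolean bothSorted = leftSupported.contains(leftVarPos)
                && rightSupported.contains(rightVarPos);
        return bothSorted ? "MERGE" : "FALLBACK";
    }

    public static void main(String[] args) {
        Set<Pos> supported = EnumSet.of(Pos.S, Pos.O);
        System.out.println(chooseJoin(Pos.S, supported, Pos.O, supported));
        // With no supported orders on one side, merge join cannot be chosen.
        System.out.println(chooseJoin(Pos.S, supported, Pos.O, Set.of()));
    }
}
```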

@kenwenzel
Contributor

@hmottestad Probably the merge join will show its strength when the IO performance degrades due to memory mapping and/or slow storage systems. This should be the case if the databases grow larger than RAM.

@hmottestad
Contributor Author

@kenwenzel collaborating on merge join would be a lot simpler if you were a committer. Would you be interested in me nominating you?

@kenwenzel
Contributor

> @kenwenzel collaborating on merge join would be a lot simpler if you were a committer. Would you be interested in me nominating you?

I can't make guarantees regarding contributions but I don't want to refuse the offer a second time (Jeen already asked a while ago). So yes and thank you for asking.

@JervenBolleman
Contributor

@hmottestad I was asked by the team (@D063520) behind the qEndpoint to help with implementing merge joins :) and I see you are already working on it.

@hmottestad
Contributor Author

> @hmottestad I was asked by the team (@D063520) behind the qEndpoint to help with implementing merge joins :) and I see you are already working on it.

They are sponsoring this feature :)

@D063520

D063520 commented Nov 1, 2023

@JervenBolleman: we reached out to different people and I understood you were too busy. @hmottestad had some free time and he was super responsive.

@JervenBolleman
Contributor

> @JervenBolleman: we reached out to different people and I understood you were too busy. @hmottestad had some free time and he was super responsive.

I think it is wonderful that you/the qa company are sponsoring this feature, and @hmottestad is much better at this kind of work than I am :) Win-win all round.

@hmottestad
Contributor Author

hmottestad commented Dec 2, 2023

TODO:

  • Make tests for edge cases when using the new unmark feature
  • Add method to iterators to return which variable the iterator is sorted by, if any
  • Add support for ordered statements to the DualUnionIteration

There is no simple way to do "Add method to iterators to return which variable the iterator is sorted by, if any" because of the way the Maven project is structured.
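
For the DualUnionIteration item in the TODO list, the general technique for keeping a union ordered is to merge the two sorted inputs instead of concatenating them. The sketch below shows that technique in isolation; `SortedUnionSketch` is a hypothetical class, not the RDF4J implementation, and it assumes both inputs are already sorted by the given comparator and contain no null elements.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.Iterator;
import java.util.List;
import java.util.NoSuchElementException;

// Hypothetical sketch: union of two sorted iterators that preserves the sort order.
final class SortedUnionSketch<T> implements Iterator<T> {
    private final Iterator<T> left;
    private final Iterator<T> right;
    private final Comparator<? super T> comparator;
    private T nextLeft;  // buffered head of the left input, null once exhausted
    private T nextRight; // buffered head of the right input, null once exhausted

    SortedUnionSketch(Iterator<T> left, Iterator<T> right, Comparator<? super T> comparator) {
        this.left = left;
        this.right = right;
        this.comparator = comparator;
        this.nextLeft = left.hasNext() ? left.next() : null;
        this.nextRight = right.hasNext() ? right.next() : null;
    }

    @Override
    public boolean hasNext() {
        return nextLeft != null || nextRight != null;
    }

    @Override
    public T next() {
        if (!hasNext()) {
            throw new NoSuchElementException();
        }
        // Take from the left when the right is exhausted or the left head is not larger.
        boolean takeLeft = nextRight == null
                || (nextLeft != null && comparator.compare(nextLeft, nextRight) <= 0);
        T result;
        if (takeLeft) {
            result = nextLeft;
            nextLeft = left.hasNext() ? left.next() : null;
        } else {
            result = nextRight;
            nextRight = right.hasNext() ? right.next() : null;
        }
        return result;
    }

    /** Convenience helper that drains the merged iterator into a list. */
    static <T> List<T> union(List<T> a, List<T> b, Comparator<? super T> comparator) {
        List<T> out = new ArrayList<>();
        Iterator<T> merged = new SortedUnionSketch<T>(a.iterator(), b.iterator(), comparator);
        while (merged.hasNext()) {
            out.add(merged.next());
        }
        return out;
    }
}
```

This sketch keeps duplicates from both inputs; whether a real union should deduplicate is a separate decision that it does not address.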

import org.eclipse.rdf4j.rio.RDFHandler;
import org.eclipse.rdf4j.rio.RDFHandlerException;

public abstract class AbstractQueryPreparer implements QueryPreparer {
Contributor Author

This class was deleted by accident in 5.0.0-M2.

@hmottestad hmottestad marked this pull request as ready for review December 20, 2023 15:03
@hmottestad
Contributor Author

hmottestad commented Dec 20, 2023

TODO

  • Check performance of MemoryStore
  • Check performance of ShaclSail
  • Squash/rename commits

@hmottestad
Contributor Author

hmottestad commented Dec 20, 2023

Develop branch

Benchmark                                                     Mode  Cnt    Score    Error  Units
QueryBenchmark.complexQuery                                   avgt    5    1.024 ±  0.030  ms/op
QueryBenchmark.different_datasets_with_similar_distributions  avgt    5    0.462 ±  0.004  ms/op
QueryBenchmark.groupByQuery                                   avgt    5    0.606 ±  0.005  ms/op
QueryBenchmark.long_chain                                     avgt    5  172.359 ±  9.939  ms/op
QueryBenchmark.lots_of_optional                               avgt    5   39.920 ±  2.006  ms/op
QueryBenchmark.minus                                          avgt    5  870.831 ± 46.413  ms/op
QueryBenchmark.nested_optionals                               avgt    5   51.299 ±  2.751  ms/op
QueryBenchmark.pathExpressionQuery1                           avgt    5    5.000 ±  0.082  ms/op
QueryBenchmark.pathExpressionQuery2                           avgt    5    0.527 ±  0.007  ms/op
QueryBenchmark.query_distinct_predicates                      avgt    5   51.837 ±  1.054  ms/op
QueryBenchmark.simple_filter_not                              avgt    5    1.655 ±  0.038  ms/op
Benchmark                                    Mode  Cnt   Score   Error  Units
ParallelQueryBenchmark.mixedQueriesAndReads  avgt    5  15.747 ± 0.534  ms/op
Benchmark                                          Mode  Cnt     Score     Error  Units
ComplexLargeBenchmark.noPreloadingNonEmptyRemoved  avgt    5  3603.342 ± 180.153  ms/op

This branch

Benchmark                                                     Mode  Cnt    Score    Error  Units
QueryBenchmark.complexQuery                                   avgt    5    1.039 ±  0.030  ms/op
QueryBenchmark.different_datasets_with_similar_distributions  avgt    5    0.452 ±  0.003  ms/op
QueryBenchmark.groupByQuery                                   avgt    5    0.575 ±  0.026  ms/op
QueryBenchmark.long_chain                                     avgt    5  155.290 ±  9.074  ms/op
QueryBenchmark.lots_of_optional                               avgt    5   41.770 ±  1.891  ms/op
QueryBenchmark.minus                                          avgt    5  870.915 ± 46.426  ms/op
QueryBenchmark.nested_optionals                               avgt    5   51.305 ±  2.648  ms/op
QueryBenchmark.pathExpressionQuery1                           avgt    5    5.495 ±  1.240  ms/op
QueryBenchmark.pathExpressionQuery2                           avgt    5    0.493 ±  0.006  ms/op
QueryBenchmark.query_distinct_predicates                      avgt    5   51.940 ±  4.232  ms/op
QueryBenchmark.simple_filter_not                              avgt    5    1.909 ±  0.665  ms/op
Benchmark                                    Mode  Cnt   Score   Error  Units
ParallelQueryBenchmark.mixedQueriesAndReads  avgt    5  15.657 ± 0.555  ms/op
Benchmark                                          Mode  Cnt     Score     Error  Units
ComplexLargeBenchmark.noPreloadingNonEmptyRemoved  avgt    5  3708.608 ± 295.725  ms/op

@hmottestad
Contributor Author

There doesn't seem to be any performance degradation.
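
One rough way to read the two tables is to check whether the score ± error intervals of the same benchmark overlap between the branches. The sketch below does that for a few rows copied from the tables above. Treating overlapping intervals as "no clear difference" is a heuristic assumed here for illustration, not a formal statistical test, and `BenchmarkOverlapSketch` is a hypothetical helper.

```java
// Hypothetical helper for comparing two "score ± error" benchmark results.
final class BenchmarkOverlapSketch {

    /** True if [scoreA - errA, scoreA + errA] and [scoreB - errB, scoreB + errB] intersect. */
    static boolean overlaps(double scoreA, double errA, double scoreB, double errB) {
        return scoreA - errA <= scoreB + errB && scoreB - errB <= scoreA + errA;
    }

    public static void main(String[] args) {
        // Values are (develop score, develop error, branch score, branch error) in ms/op.
        System.out.println("long_chain: " + overlaps(172.359, 9.939, 155.290, 9.074));
        System.out.println("simple_filter_not: " + overlaps(1.655, 0.038, 1.909, 0.665));
        System.out.println("noPreloadingNonEmptyRemoved: " + overlaps(3603.342, 180.153, 3708.608, 295.725));
        System.out.println("different_datasets_with_similar_distributions: "
                + overlaps(0.462, 0.004, 0.452, 0.003));
    }
}
```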

…he write transaction pointer because it was allocated by the java.nio.ByteBuffer and not using unsafe or equivalent.
@hmottestad hmottestad merged commit a6ec965 into develop Dec 20, 2023
9 checks passed
@hmottestad hmottestad deleted the GH-4819-merge-join branch December 20, 2023 19:25
@hmottestad hmottestad linked an issue Dec 20, 2023 that may be closed by this pull request
Development

Successfully merging this pull request may close these issues.

Support for merge join and ordered indexes
4 participants