Skip to content

Commit

Permalink
eclipse-rdf4jGH-4819 [LmdbStore] Add initial support for merge joins.
Browse files Browse the repository at this point in the history
  • Loading branch information
kenwenzel committed Oct 25, 2023
1 parent dd731f6 commit 16eb699
Show file tree
Hide file tree
Showing 3 changed files with 64 additions and 11 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -181,6 +181,8 @@ abstract static class StatefulOperation implements Operation {
*/
private final AtomicBoolean storeTxnStarted = new AtomicBoolean(false);

private final Set<StatementOrder> supportedOrders;

/**
* Creates a new {@link LmdbSailStore}.
*/
Expand All @@ -203,6 +205,7 @@ protected Long read(ByteBuffer buffer) {
namespaceStore = new NamespaceStore(dataDir);
valueStore = new ValueStore(new File(dataDir, "values"), config);
tripleStore = new TripleStore(new File(dataDir, "triples"), config);
supportedOrders = tripleStore.getStatementOrders();
contextStore = new ContextStore(this, dataDir);
initialized = true;
} finally {
Expand Down Expand Up @@ -315,7 +318,7 @@ CloseableIteration<Resource> getContexts() throws IOException {
CloseableIteration<? extends Statement> stIter1;
if (records == null) {
// Iterator over all statements
stIter1 = createStatementIterator(txn, null, null, null, true);
stIter1 = createStatementIterator(null, txn, null, null, null, true);
} else {
stIter1 = new LmdbStatementIterator(records, valueStore);
}
Expand Down Expand Up @@ -359,6 +362,7 @@ protected void handleClose() throws SailException {
* @return A StatementIterator that can be used to iterate over the statements that match the specified pattern.
*/
CloseableIteration<? extends Statement> createStatementIterator(
StatementOrder statementOrder,
Txn txn, Resource subj, IRI pred, Value obj, boolean explicit, Resource... contexts) throws IOException {
long subjID = LmdbValue.UNKNOWN_ID;
if (subj != null) {
Expand Down Expand Up @@ -405,7 +409,8 @@ CloseableIteration<? extends Statement> createStatementIterator(
ArrayList<LmdbStatementIterator> perContextIterList = new ArrayList<>(contextIDList.size());

for (long contextID : contextIDList) {
RecordIterator records = tripleStore.getTriples(txn, subjID, predID, objID, contextID, explicit);
RecordIterator records = tripleStore.getTriples(statementOrder, txn, subjID, predID, objID, contextID,
explicit);
perContextIterList.add(new LmdbStatementIterator(records, valueStore));
}

Expand Down Expand Up @@ -888,7 +893,7 @@ public CloseableIteration<? extends Resource> getContextIDs() throws SailExcepti
public CloseableIteration<? extends Statement> getStatements(Resource subj, IRI pred, Value obj,
Resource... contexts) throws SailException {
try {
return createStatementIterator(txn, subj, pred, obj, explicit, contexts);
return createStatementIterator(null, txn, subj, pred, obj, explicit, contexts);
} catch (IOException e) {
throw new SailException("Unable to get statements", e);
}
Expand All @@ -897,17 +902,25 @@ public CloseableIteration<? extends Statement> getStatements(Resource subj, IRI
@Override
public CloseableIteration<? extends Statement> getStatements(StatementOrder statementOrder, Resource subj,
IRI pred, Value obj, Resource... contexts) throws SailException {
throw new UnsupportedOperationException("Not implemented yet");
try {
return createStatementIterator(statementOrder, txn, subj, pred, obj, explicit, contexts);
} catch (IOException e) {
throw new SailException("Unable to get statements", e);
}
}

@Override
public Set<StatementOrder> getSupportedOrders(Resource subj, IRI pred, Value obj, Resource... contexts) {
return Set.of();
return supportedOrders;
}

@Override
public Comparator<Value> getComparator() {
throw new UnsupportedOperationException("Not implemented yet");
return (a, b) -> {
long id1 = (a instanceof LmdbValue) ? ((LmdbValue) a).getInternalID() : 0;
long id2 = (b instanceof LmdbValue) ? ((LmdbValue) b).getInternalID() : 0;
return Long.compare(id1, id2);
};
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,10 @@
import java.util.StringTokenizer;
import java.util.concurrent.locks.StampedLock;
import java.util.function.Consumer;
import java.util.stream.Collectors;
import java.util.stream.Stream;

import org.eclipse.rdf4j.common.order.StatementOrder;
import org.eclipse.rdf4j.sail.SailException;
import org.eclipse.rdf4j.sail.lmdb.TxnManager.Mode;
import org.eclipse.rdf4j.sail.lmdb.TxnManager.Txn;
Expand Down Expand Up @@ -267,6 +270,24 @@ private void checkVersion() throws SailException {
}
}

Set<StatementOrder> getStatementOrders() {
Set<String> indexSpecs = getIndexSpecs();
return indexSpecs.stream().flatMap(s -> {
switch (s.charAt(0)) {
case 's':
return Stream.of(StatementOrder.S);
case 'p':
return Stream.of(StatementOrder.P);
case 'o':
return Stream.of(StatementOrder.O);
case 'c':
return Stream.of(StatementOrder.C);
}
return Stream.empty();
}
).collect(Collectors.toSet());
}

private Set<String> getIndexSpecs() throws SailException {
String indexesStr = properties.getProperty(INDEXES_KEY);

Expand All @@ -288,8 +309,8 @@ TxnManager getTxnManager() {
}

/**
* Parses a comma/whitespace-separated list of index specifications. Index specifications are required to consists
* of 4 characters: 's', 'p', 'o' and 'c'.
* Parses a comma/whitespace-separated list of index specifications. Index specifications are required to consist of
* 4 characters: 's', 'p', 'o' and 'c'.
*
* @param indexSpecStr A string like "spoc, pocs, cosp".
* @return A Set containing the parsed index specifications.
Expand Down Expand Up @@ -473,7 +494,26 @@ public RecordIterator getAllTriplesSortedByContext(Txn txn) throws IOException {

public RecordIterator getTriples(Txn txn, long subj, long pred, long obj, long context, boolean explicit)
throws IOException {
TripleIndex index = getBestIndex(subj, pred, obj, context);
return getTriples(null, txn, subj, pred, obj, context, explicit);
}

public RecordIterator getTriples(StatementOrder statementOrder, Txn txn, long subj, long pred, long obj,
long context, boolean explicit)
throws IOException {
TripleIndex index = null;
if (statementOrder != null) {
char component = statementOrder.name().toLowerCase().charAt(0);
for (TripleIndex candidate : indexes) {
if (candidate.fieldSeq[0] == component) {
index = candidate;
}
}
if (index == null) {
throw new IOException("No index for statement order '" + statementOrder.name() + "' available.");
}
} else {
index = getBestIndex(subj, pred, obj, context);
}
// System.out.println("get triples: " + Arrays.asList(subj, pred, obj,context));
boolean doRangeSearch = index.getPatternScore(subj, pred, obj, context) > 0;
return getTriplesUsingIndex(txn, subj, pred, obj, context, explicit, index, doRangeSearch);
Expand Down Expand Up @@ -870,7 +910,7 @@ public boolean storeTriple(long subj, long pred, long obj, long context, boolean
*/
public void removeTriplesByContext(long subj, long pred, long obj, long context,
boolean explicit, Consumer<long[]> handler) throws IOException {
RecordIterator records = getTriples(txnManager.createTxn(writeTxn), subj, pred, obj, context, explicit);
RecordIterator records = getTriples(null, txnManager.createTxn(writeTxn), subj, pred, obj, context, explicit);
removeTriples(records, explicit, handler);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -87,7 +87,7 @@ public class QueryBenchmark {

public static void main(String[] args) throws RunnerException {
Options opt = new OptionsBuilder()
.include("QueryBenchmark") // adapt to control which benchmark tests to run
.include("QueryBenchmark.complexQuery") // adapt to control which benchmark tests to run
.forks(1)
.build();

Expand Down

0 comments on commit 16eb699

Please sign in to comment.