Skip to content

Commit

Permalink
[ issue #5 ] triple deletion has been implemented on C*Graph
Browse files Browse the repository at this point in the history
  • Loading branch information
agazzarini committed Aug 13, 2014
1 parent acd0f73 commit 668fb72
Show file tree
Hide file tree
Showing 4 changed files with 78 additions and 64 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,10 @@
import com.datastax.driver.core.BatchStatement;
import com.datastax.driver.core.BoundStatement;
import com.datastax.driver.core.PreparedStatement;
import com.datastax.driver.core.Row;
import com.datastax.driver.core.Session;
import com.datastax.driver.core.utils.Bytes;
import com.google.common.collect.AbstractIterator;

/**
* Cassandra 2x (CQL-based) implementation of {@link TripleIndexDAO}.
Expand Down Expand Up @@ -50,10 +53,6 @@ protected BatchStatement initialValue() {
private PreparedStatement clearSPOCStatement;
private PreparedStatement clearOSPCStatement;
private PreparedStatement clearPOSCStatement;
private PreparedStatement clearNSPOStatement;
private PreparedStatement clearNPOSStatement;
private PreparedStatement clearDSPOStatement;
private PreparedStatement clearDPOSStatement;

private PreparedStatement[] queries;

Expand Down Expand Up @@ -159,22 +158,50 @@ public void executePendingMutations() throws StorageLayerException {
}
}

@Override
public Iterator<byte[][]> query(final byte[][] query) throws StorageLayerException {
int qindex = (query[0] == null) ? 4 : 0;
qindex += (query[1] == null) ? 2 : 0;
qindex += (query[2] == null) ? 1 : 0;

final BoundStatement statement = queries[qindex].bind();
int index = 0;
for (final byte[] binding : query) {
if (binding != null) {
statement.setBytesUnsafe(index++, ByteBuffer.wrap(binding));
}
}

final Iterator<Row> iterator = session.executeAsync(statement).getUninterruptibly().iterator();
return new AbstractIterator<byte[][]>() {
@Override
protected byte[][] computeNext() {
return iterator.hasNext()? asByteArray(iterator.next()) : endOfData();
}
};
}

private byte[][] asByteArray(final Row row) {
final byte[] s = Bytes.getArray(row.getBytesUnsafe(0));
final byte[] p = Bytes.getArray(row.getBytesUnsafe(1));
final byte[] o = Bytes.getArray(row.getBytesUnsafe(2));
final ByteBuffer c = row.getBytesUnsafe(3);
return (c != null)
? new byte[][] {s, p, o}
: new byte[][] {s, p, o, Bytes.getArray(c)};
}

@Override
public void clear() {
session.execute(clearSPOCStatement.bind());
session.execute(clearOSPCStatement.bind());
session.execute(clearPOSCStatement.bind());
session.execute(clearNSPOStatement.bind());
session.execute(clearNPOSStatement.bind());
session.execute(clearDSPOStatement.bind());
session.execute(clearDPOSStatement.bind());
}

/**
* Initializes PreparedStatements.
*/
protected void prepareStatements() {

insertSPOCStatement = session.prepare("INSERT INTO " + S_POC + "(s, p, o, c) VALUES (?, ?, ?, ?)");
insertOSPCStatement = session.prepare("INSERT INTO " + O_SPC + "(o, s, p, c) VALUES (?, ?, ?, ?)");
insertPOSCStatement = session.prepare("INSERT INTO " + PO_SC + "(p, o, s, c, p_index) VALUES (?, ?, ?, ?, ?)");
Expand All @@ -187,58 +214,18 @@ protected void prepareStatements() {
clearOSPCStatement = session.prepare("TRUNCATE " + O_SPC);
clearPOSCStatement = session.prepare("TRUNCATE " + PO_SC);

queries = new PreparedStatement[8];
queries[0] = session.prepare(SELECT_SPOC_FROM + S_POC + " WHERE s = ? AND p = ? AND o = ? LIMIT ?");
queries[1] = session.prepare(SELECT_SPOC_FROM + S_POC + " WHERE s = ? AND p = ? LIMIT ?");
queries[2] = session.prepare(SELECT_SPOC_FROM + O_SPC + " WHERE s = ? AND o = ? LIMIT ?");
queries[3] = session.prepare(SELECT_SPOC_FROM + S_POC + " WHERE s = ? LIMIT ?");
queries[4] = session.prepare(SELECT_SPOC_FROM + PO_SC + " WHERE p = ? AND o = ? LIMIT ?");
queries[5] = session.prepare(SELECT_SPOC_FROM + PO_SC + " WHERE p_index = ? LIMIT ?");
queries[6] = session.prepare(SELECT_SPOC_FROM + O_SPC + " WHERE o = ? LIMIT ?");
queries[7] = session.prepare(SELECT_SPOC_FROM + S_POC + " LIMIT ?");
}

/**
* Returns the index of the prepared statement to handle the range query with the given parameters.
*
* @param reverse True if the result should be returned reversed, false if it should be returned normally.
* @param subjectIsVariable True if the subject of the query is variable, false if it is set.
* @param typeIsDouble True if the type of the range is double, false if it is date.
* @param upperBoundIsOpen True if the upper bound is smaller-than relation, false if it is a smaller-than-or-equal-to relation.
* @param lowerBoundIsOpen True if the lower bound is greater-than relation, false if it is a greater-than-or-equal-to relation.
* @return The index of the prepared statement.
*/
int getRangeQueryIndex(
final boolean reverse,
final boolean subjectIsVariable,
final boolean typeIsDouble,
final boolean upperBoundIsOpen,
final boolean lowerBoundIsOpen) {
int index = 0;

if (reverse) {
index += 16;
}

if (subjectIsVariable) {
index += 8;
}

if (typeIsDouble) {
index += 4;
}

if (upperBoundIsOpen) {
index += 2;
}

if (lowerBoundIsOpen) {
index += 1;
}

return index;
}

queries = new PreparedStatement[] {
session.prepare(SELECT_SPOC_FROM + S_POC + " WHERE s = ? AND p = ? AND o = ?"),
session.prepare(SELECT_SPOC_FROM + S_POC + " WHERE s = ? AND p = ?"),
session.prepare(SELECT_SPOC_FROM + O_SPC + " WHERE s = ? AND o = ?"),
session.prepare(SELECT_SPOC_FROM + S_POC + " WHERE s = ?"),
session.prepare(SELECT_SPOC_FROM + PO_SC + " WHERE p = ? AND o = ?"),
session.prepare(SELECT_SPOC_FROM + PO_SC + " WHERE p_index = ?"),
session.prepare(SELECT_SPOC_FROM + O_SPC + " WHERE o = ?"),
session.prepare(SELECT_SPOC_FROM + S_POC)
};
}

/**
* Internal method used for reuse delete stuff.
*
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
package org.gazzax.labs.jena.nosql.cassandra.graph;

import java.util.ArrayList;
import java.util.Iterator;

import org.gazzax.labs.jena.nosql.fwk.StorageLayerException;
Expand All @@ -12,6 +13,7 @@
import org.slf4j.LoggerFactory;

import com.hp.hpl.jena.graph.Graph;
import com.hp.hpl.jena.graph.GraphEvents;
import com.hp.hpl.jena.graph.Node;
import com.hp.hpl.jena.graph.Triple;
import com.hp.hpl.jena.graph.TripleMatch;
Expand All @@ -31,6 +33,8 @@
public class CassandraGraph extends GraphBase {
private final static Log LOGGER = new Log(LoggerFactory.getLogger(CassandraGraph.class));

private final static Iterator<byte[][]> EMPTY_ITERATOR = new ArrayList<byte[][]>(0).iterator();

private final TripleIndexDAO dao;
private final TopLevelDictionary dictionary;
private final Node name;
Expand Down Expand Up @@ -104,13 +108,22 @@ public void performDelete(final Triple triple) {
}
}

@Override
public void clear()
{
dao.clear();
getEventManager().notifyEvent(this, GraphEvents.removeAll ) ;
}

@Override
protected ExtendedIterator<Triple> graphBaseFind(TripleMatch m) {
// TODO Auto-generated method stub
return null;
}

Iterator<byte[][]> query(final byte[][] query) {
return null;
Iterator<byte[][]> query(final byte[][] query) throws StorageLayerException {
return (query != null && query.length >= 3)
? dao.query(query)
: EMPTY_ITERATOR;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,9 @@
import com.hp.hpl.jena.graph.Graph;
import com.hp.hpl.jena.rdf.model.Model;
import com.hp.hpl.jena.rdf.model.ModelFactory;
import com.hp.hpl.jena.rdf.model.Statement;
import com.hp.hpl.jena.rdf.model.StmtIterator;
import com.hp.hpl.jena.sparql.vocabulary.FOAF;

public class ExampleTest {

Expand All @@ -22,9 +25,18 @@ public static void main(String[] args) throws Exception {
// implementations of family members (Dictionary, Graph and so on)
final Graph graph = factory.getGraph(); // This is a CassandraGraph
final Model model = ModelFactory.createModelForGraph(graph);

Statement st = model.createStatement(
model.createResource("http://rdf.gx.org/id/resources#me"),
FOAF.name,
model.createLiteral("Andrea Gazzarini"));

model.read(new FileReader("/work/data/rdf/sample.nt"), "http://base.example.org", "N3");
model.add(st);
model.remove(st);

model.read(new FileReader("/work/data/rdf/sample.nt"), "http://base.example.org", "N3");
model.removeAll();

factory.getClientShutdownHook().close(); // This is a CassandraClientShutdownHook
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -53,4 +53,6 @@ List<byte[][]> deleteTriples(
* Clears the storage (i.e. remove all triples from the storage).
*/
void clear();

Iterator<byte[][]> query(byte[][] query) throws StorageLayerException;
}

0 comments on commit 668fb72

Please sign in to comment.