Skip to content

Commit

Permalink
[ issue #5 ] NoSql Graph implementation (framework)
Browse files Browse the repository at this point in the history
  • Loading branch information
agazzarini committed Aug 13, 2014
1 parent 86746b3 commit 0a68618
Show file tree
Hide file tree
Showing 8 changed files with 162 additions and 18 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,6 @@
import org.gazzax.labs.jena.nosql.fwk.log.MessageCatalog;
import org.slf4j.LoggerFactory;

import com.datastax.driver.core.Cluster;
import com.datastax.driver.core.Session;

/**
Expand All @@ -18,6 +17,11 @@ public class CassandraClientShutdownHook implements ClientShutdownHook {
private final static Log LOGGER = new Log(LoggerFactory.getLogger(ClientShutdownHook.class));
private final Session session;

/**
* Builds a new Cassandra Client shutdown hook.
*
* @param session the connection to Cassandra.
*/
public CassandraClientShutdownHook(final Session session) {
this.session = session;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,6 @@
import org.gazzax.labs.jena.nosql.cassandra.dao.Cassandra2xBidirectionalMapDAO;
import org.gazzax.labs.jena.nosql.cassandra.dao.Cassandra2xMapDAO;
import org.gazzax.labs.jena.nosql.cassandra.dao.CassandraTripleIndexDAO;
import org.gazzax.labs.jena.nosql.cassandra.graph.CassandraGraph;
import org.gazzax.labs.jena.nosql.fwk.InitialisationException;
import org.gazzax.labs.jena.nosql.fwk.configuration.Configuration;
import org.gazzax.labs.jena.nosql.fwk.dictionary.TopLevelDictionary;
Expand Down Expand Up @@ -46,7 +45,6 @@
import com.datastax.driver.core.policies.Policies;
import com.datastax.driver.core.policies.ReconnectionPolicy;
import com.datastax.driver.core.policies.RetryPolicy;
import com.hp.hpl.jena.graph.Graph;

/**
* Concrete factory for creating Cassandra-backed domain and data access objects.
Expand Down Expand Up @@ -163,12 +161,6 @@ public TopLevelDictionary getDictionary() {
return dictionary;
}


@Override
public Graph getGraph() {
return new CassandraGraph(this);
}

@Override
public ClientShutdownHook getClientShutdownHook() {
return new CassandraClientShutdownHook(session);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,7 @@ public void initialise(final StorageLayerFactory factory) throws InitialisationE
* @return the id associated with the given value.
* @throws StorageLayerException in case of data access failure.
*/
public byte[] get(final String value) throws StorageLayerException {
public byte[] getId(final String value) throws StorageLayerException {
return byValue.get(value);
}

Expand Down Expand Up @@ -95,11 +95,11 @@ public boolean contains(final byte[] id) throws StorageLayerException {
/**
* Removes the given resource from this index.
*
* @param n3 the n3 representation of the resource to be removed.
* @param value the n3 representation of the resource to be removed.
* @throws StorageLayerException in case of data access failure.
*/
public void remove(final String n3) throws StorageLayerException {
byId.remove(byValue.get(n3));
byValue.remove(n3);
public void remove(final String value) throws StorageLayerException {
byId.remove(byValue.get(value));
byValue.remove(value);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -83,7 +83,7 @@ protected Node getValueInternal(final byte[] id, final boolean p) throws Storage
protected byte[] getID(final String n3, final boolean p) throws StorageLayerException {
return (n3 == null || n3.isEmpty() || n3.charAt(0) == '?')
? null
: p ? pIndex.get(n3) : soIndex.get(n3);
: p ? pIndex.getId(n3) : soIndex.getId(n3);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,7 @@ protected void initialiseInternal(final StorageLayerFactory factory) throws Init
protected byte[] getID(final String n3, final boolean p) throws StorageLayerException {
return (n3 == null || n3.isEmpty() || n3.charAt(0) == '?')
? null
: index.get(n3);
: index.getId(n3);
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ protected byte[] getIdInternal(final String value, final boolean p) throws Stora
byte[] id = null;

synchronized (this) {
id = index.get(value);
id = index.getId(value);
if (id[0] == NOT_SET[0]) {
id = newId(value, index);
index.putEntry(value, id);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@
import org.gazzax.labs.jena.nosql.fwk.dictionary.TopLevelDictionary;
import org.gazzax.labs.jena.nosql.fwk.ds.MapDAO;
import org.gazzax.labs.jena.nosql.fwk.ds.TripleIndexDAO;
import org.gazzax.labs.jena.nosql.fwk.graph.NoSqlGraph;

import com.hp.hpl.jena.graph.Graph;

Expand Down Expand Up @@ -69,7 +70,9 @@ public abstract <K, V> MapDAO<K, V> getMapDAO(
*
* @return the {@link Graph} specific implementation associated with the underlying kind of storage.
*/
public abstract Graph getGraph();
public Graph getGraph() {
return new NoSqlGraph(this);
}

/**
* Factory method for obtaining a concrete factory.
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,145 @@
package org.gazzax.labs.jena.nosql.fwk.graph;

import java.util.ArrayList;
import java.util.Iterator;

import org.gazzax.labs.jena.nosql.fwk.StorageLayerException;
import org.gazzax.labs.jena.nosql.fwk.dictionary.TopLevelDictionary;
import org.gazzax.labs.jena.nosql.fwk.ds.TripleIndexDAO;
import org.gazzax.labs.jena.nosql.fwk.factory.StorageLayerFactory;
import org.gazzax.labs.jena.nosql.fwk.log.Log;
import org.gazzax.labs.jena.nosql.fwk.log.MessageCatalog;
import org.gazzax.labs.jena.nosql.fwk.log.MessageFactory;
import org.slf4j.LoggerFactory;

import com.hp.hpl.jena.graph.Graph;
import com.hp.hpl.jena.graph.GraphEvents;
import com.hp.hpl.jena.graph.Node;
import com.hp.hpl.jena.graph.Triple;
import com.hp.hpl.jena.graph.TripleMatch;
import com.hp.hpl.jena.graph.impl.GraphBase;
import com.hp.hpl.jena.shared.AddDeniedException;
import com.hp.hpl.jena.shared.DeleteDeniedException;
import com.hp.hpl.jena.util.iterator.ExtendedIterator;
import com.hp.hpl.jena.util.iterator.WrappedIterator;

/**
* NoSQL {@link Graph} implementation.
*
* @author Andrea Gazzarini
* @since 1.0
*/
public class NoSqlGraph extends GraphBase {
private final static Log LOGGER = new Log(LoggerFactory.getLogger(NoSqlGraph.class));

private final static Iterator<byte[][]> EMPTY_IDS_ITERATOR = new ArrayList<byte[][]>(0).iterator();
private final static ExtendedIterator<Triple> EMPTY_TRIPLES_ITERATOR = WrappedIterator.createNoRemove(new ArrayList<Triple>(0).iterator());

private final TripleIndexDAO dao;
private final TopLevelDictionary dictionary;
private final Node name;

/**
* Builds a new unnamed graph with the given factory.
*
* @param factory the storage layer factory.
*/
public NoSqlGraph(final StorageLayerFactory factory) {
this(null, factory);
}

/**
* Builds a new named graph with the given data.
*
* @param name the graph name.
* @param factory the storage layer factory.
*/
public NoSqlGraph(final Node name, final StorageLayerFactory factory) {
this.name = name;
this.dao = factory.getTripleIndexDAO();
this.dictionary = factory.getDictionary();
}

@Override
public void performAdd(final Triple triple) {
try {
final byte [][] ids =
(name == null)
? dictionary.asIdentifiers(triple.getSubject(), triple.getPredicate(), triple.getObject())
: dictionary.asIdentifiers(triple.getSubject(), triple.getPredicate(), triple.getObject(), name);
dao.insertTriple(ids);
dao.executePendingMutations();
} catch (final StorageLayerException exception) {
final String message = MessageFactory.createMessage(MessageCatalog._00101_UNABLE_TO_ADD_TRIPLE, triple);
LOGGER.error(message, exception);
throw new AddDeniedException(message, triple);
}
}

@Override
public void performDelete(final Triple triple) {
try {
final byte [][] identifiers =
(name == null)
? dictionary.asIdentifiers(
triple.getSubject(),
triple.getPredicate(),
triple.getObject())
: dictionary.asIdentifiers(
triple.getSubject(),
triple.getPredicate(),
triple.getObject(),
name);

if (triple.isConcrete()) {
dao.deleteTriple(identifiers);
} else if (triple.getSubject().isConcrete() &&
triple.getPredicate().isConcrete() &&
triple.getObject().isConcrete()){
clear();
} else {
// TODO: batch size must be configurable
dao.deleteTriples(query(identifiers), 1000);
}
} catch (final StorageLayerException exception) {
final String message = MessageFactory.createMessage(MessageCatalog._00100_UNABLE_TO_DELETE_TRIPLE, triple);
LOGGER.error(message, exception);
throw new DeleteDeniedException(message, triple);
}
}

@Override
public void clear()
{
dao.clear();
getEventManager().notifyEvent(this, GraphEvents.removeAll ) ;
}

@Override
protected ExtendedIterator<Triple> graphBaseFind(final TripleMatch pattern) {
try
{
final byte [][] identifiers =
(name == null)
? dictionary.asIdentifiers(
pattern.getMatchSubject(),
pattern.getMatchPredicate(),
pattern.getMatchObject())
: dictionary.asIdentifiers(
pattern.getMatchSubject(),
pattern.getMatchPredicate(),
pattern.getMatchObject(),
name);
return WrappedIterator.createNoRemove(dictionary.asTripleIterator(query(identifiers)));
} catch (StorageLayerException exception) {
LOGGER.error(MessageCatalog._00010_DATA_ACCESS_LAYER_FAILURE, exception);
return EMPTY_TRIPLES_ITERATOR;
}
}

Iterator<byte[][]> query(final byte[][] query) throws StorageLayerException {
return (query != null && query.length >= 3)
? dao.query(query)
: EMPTY_IDS_ITERATOR;
}
}

0 comments on commit 0a68618

Please sign in to comment.