diff --git a/jena-nosql-binding/jena-nosql-binding-cassandra/src/main/java/org/gazzax/labs/jena/nosql/cassandra/CassandraClientShutdownHook.java b/jena-nosql-binding/jena-nosql-binding-cassandra/src/main/java/org/gazzax/labs/jena/nosql/cassandra/CassandraClientShutdownHook.java index 413cc9d..8d1cf27 100644 --- a/jena-nosql-binding/jena-nosql-binding-cassandra/src/main/java/org/gazzax/labs/jena/nosql/cassandra/CassandraClientShutdownHook.java +++ b/jena-nosql-binding/jena-nosql-binding-cassandra/src/main/java/org/gazzax/labs/jena/nosql/cassandra/CassandraClientShutdownHook.java @@ -5,7 +5,6 @@ import org.gazzax.labs.jena.nosql.fwk.log.MessageCatalog; import org.slf4j.LoggerFactory; -import com.datastax.driver.core.Cluster; import com.datastax.driver.core.Session; /** @@ -18,6 +17,11 @@ public class CassandraClientShutdownHook implements ClientShutdownHook { private final static Log LOGGER = new Log(LoggerFactory.getLogger(ClientShutdownHook.class)); private final Session session; + /** + * Builds a new Cassandra Client shutdown hook. + * + * @param session the connection to Cassandra. + */ public CassandraClientShutdownHook(final Session session) { this.session = session; } diff --git a/jena-nosql-binding/jena-nosql-binding-cassandra/src/main/java/org/gazzax/labs/jena/nosql/cassandra/CassandraStorageLayerFactory.java b/jena-nosql-binding/jena-nosql-binding-cassandra/src/main/java/org/gazzax/labs/jena/nosql/cassandra/CassandraStorageLayerFactory.java index eafbbb5..999801f 100644 --- a/jena-nosql-binding/jena-nosql-binding-cassandra/src/main/java/org/gazzax/labs/jena/nosql/cassandra/CassandraStorageLayerFactory.java +++ b/jena-nosql-binding/jena-nosql-binding-cassandra/src/main/java/org/gazzax/labs/jena/nosql/cassandra/CassandraStorageLayerFactory.java @@ -15,7 +15,6 @@ import org.gazzax.labs.jena.nosql.cassandra.dao.Cassandra2xBidirectionalMapDAO; import org.gazzax.labs.jena.nosql.cassandra.dao.Cassandra2xMapDAO; import org.gazzax.labs.jena.nosql.cassandra.dao.CassandraTripleIndexDAO; -import org.gazzax.labs.jena.nosql.cassandra.graph.CassandraGraph; import org.gazzax.labs.jena.nosql.fwk.InitialisationException; import org.gazzax.labs.jena.nosql.fwk.configuration.Configuration; import org.gazzax.labs.jena.nosql.fwk.dictionary.TopLevelDictionary; @@ -46,7 +45,6 @@ import com.datastax.driver.core.policies.Policies; import com.datastax.driver.core.policies.ReconnectionPolicy; import com.datastax.driver.core.policies.RetryPolicy; -import com.hp.hpl.jena.graph.Graph; /** * Concrete factory for creating Cassandra-backed domain and data access objects. @@ -163,12 +161,6 @@ public TopLevelDictionary getDictionary() { return dictionary; } - - @Override - public Graph getGraph() { - return new CassandraGraph(this); - } - @Override public ClientShutdownHook getClientShutdownHook() { return new CassandraClientShutdownHook(session); diff --git a/jena-nosql-framework/src/main/java/org/gazzax/labs/jena/nosql/fwk/BIndex.java b/jena-nosql-framework/src/main/java/org/gazzax/labs/jena/nosql/fwk/BIndex.java index cf1796b..c096895 100644 --- a/jena-nosql-framework/src/main/java/org/gazzax/labs/jena/nosql/fwk/BIndex.java +++ b/jena-nosql-framework/src/main/java/org/gazzax/labs/jena/nosql/fwk/BIndex.java @@ -53,7 +53,7 @@ public void initialise(final StorageLayerFactory factory) throws InitialisationE * @return the id associated with the given value. * @throws StorageLayerException in case of data access failure. */ - public byte[] get(final String value) throws StorageLayerException { + public byte[] getId(final String value) throws StorageLayerException { return byValue.get(value); } @@ -95,11 +95,11 @@ public boolean contains(final byte[] id) throws StorageLayerException { /** * Removes the given resource from this index. * - * @param n3 the n3 representation of the resource to be removed. + * @param value the n3 representation of the resource to be removed. * @throws StorageLayerException in case of data access failure. */ - public void remove(final String n3) throws StorageLayerException { - byId.remove(byValue.get(n3)); - byValue.remove(n3); + public void remove(final String value) throws StorageLayerException { + byId.remove(byValue.get(value)); + byValue.remove(value); } } \ No newline at end of file diff --git a/jena-nosql-framework/src/main/java/org/gazzax/labs/jena/nosql/fwk/dictionary/node/PersistentNodeDictionary.java b/jena-nosql-framework/src/main/java/org/gazzax/labs/jena/nosql/fwk/dictionary/node/PersistentNodeDictionary.java index dc27a45..fe259c3 100644 --- a/jena-nosql-framework/src/main/java/org/gazzax/labs/jena/nosql/fwk/dictionary/node/PersistentNodeDictionary.java +++ b/jena-nosql-framework/src/main/java/org/gazzax/labs/jena/nosql/fwk/dictionary/node/PersistentNodeDictionary.java @@ -83,7 +83,7 @@ protected Node getValueInternal(final byte[] id, final boolean p) throws Storage protected byte[] getID(final String n3, final boolean p) throws StorageLayerException { return (n3 == null || n3.isEmpty() || n3.charAt(0) == '?') ? null - : p ? pIndex.get(n3) : soIndex.get(n3); + : p ? pIndex.getId(n3) : soIndex.getId(n3); } @Override diff --git a/jena-nosql-framework/src/main/java/org/gazzax/labs/jena/nosql/fwk/dictionary/node/SingleIndexNodeDictionary.java b/jena-nosql-framework/src/main/java/org/gazzax/labs/jena/nosql/fwk/dictionary/node/SingleIndexNodeDictionary.java index cde2a46..f30d624 100644 --- a/jena-nosql-framework/src/main/java/org/gazzax/labs/jena/nosql/fwk/dictionary/node/SingleIndexNodeDictionary.java +++ b/jena-nosql-framework/src/main/java/org/gazzax/labs/jena/nosql/fwk/dictionary/node/SingleIndexNodeDictionary.java @@ -49,7 +49,7 @@ protected void initialiseInternal(final StorageLayerFactory factory) throws Init protected byte[] getID(final String n3, final boolean p) throws StorageLayerException { return (n3 == null || n3.isEmpty() || n3.charAt(0) == '?') ? null - : index.get(n3); + : index.getId(n3); } /** diff --git a/jena-nosql-framework/src/main/java/org/gazzax/labs/jena/nosql/fwk/dictionary/string/PersistentStringDictionary.java b/jena-nosql-framework/src/main/java/org/gazzax/labs/jena/nosql/fwk/dictionary/string/PersistentStringDictionary.java index 21196bd..d3f84c1 100644 --- a/jena-nosql-framework/src/main/java/org/gazzax/labs/jena/nosql/fwk/dictionary/string/PersistentStringDictionary.java +++ b/jena-nosql-framework/src/main/java/org/gazzax/labs/jena/nosql/fwk/dictionary/string/PersistentStringDictionary.java @@ -35,7 +35,7 @@ protected byte[] getIdInternal(final String value, final boolean p) throws Stora byte[] id = null; synchronized (this) { - id = index.get(value); + id = index.getId(value); if (id[0] == NOT_SET[0]) { id = newId(value, index); index.putEntry(value, id); diff --git a/jena-nosql-framework/src/main/java/org/gazzax/labs/jena/nosql/fwk/factory/StorageLayerFactory.java b/jena-nosql-framework/src/main/java/org/gazzax/labs/jena/nosql/fwk/factory/StorageLayerFactory.java index c775aec..7f171ee 100644 --- a/jena-nosql-framework/src/main/java/org/gazzax/labs/jena/nosql/fwk/factory/StorageLayerFactory.java +++ b/jena-nosql-framework/src/main/java/org/gazzax/labs/jena/nosql/fwk/factory/StorageLayerFactory.java @@ -11,6 +11,7 @@ import org.gazzax.labs.jena.nosql.fwk.dictionary.TopLevelDictionary; import org.gazzax.labs.jena.nosql.fwk.ds.MapDAO; import org.gazzax.labs.jena.nosql.fwk.ds.TripleIndexDAO; +import org.gazzax.labs.jena.nosql.fwk.graph.NoSqlGraph; import com.hp.hpl.jena.graph.Graph; @@ -69,7 +70,9 @@ public abstract MapDAO getMapDAO( * * @return the {@link Graph} specific implementation associated with the underlying kind of storage. */ - public abstract Graph getGraph(); + public Graph getGraph() { + return new NoSqlGraph(this); + } /** * Factory method for obtaining a concrete factory. diff --git a/jena-nosql-framework/src/main/java/org/gazzax/labs/jena/nosql/fwk/graph/NoSqlGraph.java b/jena-nosql-framework/src/main/java/org/gazzax/labs/jena/nosql/fwk/graph/NoSqlGraph.java new file mode 100644 index 0000000..e15e707 --- /dev/null +++ b/jena-nosql-framework/src/main/java/org/gazzax/labs/jena/nosql/fwk/graph/NoSqlGraph.java @@ -0,0 +1,145 @@ +package org.gazzax.labs.jena.nosql.fwk.graph; + +import java.util.ArrayList; +import java.util.Iterator; + +import org.gazzax.labs.jena.nosql.fwk.StorageLayerException; +import org.gazzax.labs.jena.nosql.fwk.dictionary.TopLevelDictionary; +import org.gazzax.labs.jena.nosql.fwk.ds.TripleIndexDAO; +import org.gazzax.labs.jena.nosql.fwk.factory.StorageLayerFactory; +import org.gazzax.labs.jena.nosql.fwk.log.Log; +import org.gazzax.labs.jena.nosql.fwk.log.MessageCatalog; +import org.gazzax.labs.jena.nosql.fwk.log.MessageFactory; +import org.slf4j.LoggerFactory; + +import com.hp.hpl.jena.graph.Graph; +import com.hp.hpl.jena.graph.GraphEvents; +import com.hp.hpl.jena.graph.Node; +import com.hp.hpl.jena.graph.Triple; +import com.hp.hpl.jena.graph.TripleMatch; +import com.hp.hpl.jena.graph.impl.GraphBase; +import com.hp.hpl.jena.shared.AddDeniedException; +import com.hp.hpl.jena.shared.DeleteDeniedException; +import com.hp.hpl.jena.util.iterator.ExtendedIterator; +import com.hp.hpl.jena.util.iterator.WrappedIterator; + +/** + * NoSQL {@link Graph} implementation. + * + * @author Andrea Gazzarini + * @since 1.0 + */ +public class NoSqlGraph extends GraphBase { + private final static Log LOGGER = new Log(LoggerFactory.getLogger(NoSqlGraph.class)); + + private final static Iterator EMPTY_IDS_ITERATOR = new ArrayList(0).iterator(); + private final static ExtendedIterator EMPTY_TRIPLES_ITERATOR = WrappedIterator.createNoRemove(new ArrayList(0).iterator()); + + private final TripleIndexDAO dao; + private final TopLevelDictionary dictionary; + private final Node name; + + /** + * Builds a new unnamed graph with the given factory. + * + * @param factory the storage layer factory. + */ + public NoSqlGraph(final StorageLayerFactory factory) { + this(null, factory); + } + + /** + * Builds a new named graph with the given data. + * + * @param name the graph name. + * @param factory the storage layer factory. + */ + public NoSqlGraph(final Node name, final StorageLayerFactory factory) { + this.name = name; + this.dao = factory.getTripleIndexDAO(); + this.dictionary = factory.getDictionary(); + } + + @Override + public void performAdd(final Triple triple) { + try { + final byte [][] ids = + (name == null) + ? dictionary.asIdentifiers(triple.getSubject(), triple.getPredicate(), triple.getObject()) + : dictionary.asIdentifiers(triple.getSubject(), triple.getPredicate(), triple.getObject(), name); + dao.insertTriple(ids); + dao.executePendingMutations(); + } catch (final StorageLayerException exception) { + final String message = MessageFactory.createMessage(MessageCatalog._00101_UNABLE_TO_ADD_TRIPLE, triple); + LOGGER.error(message, exception); + throw new AddDeniedException(message, triple); + } + } + + @Override + public void performDelete(final Triple triple) { + try { + final byte [][] identifiers = + (name == null) + ? dictionary.asIdentifiers( + triple.getSubject(), + triple.getPredicate(), + triple.getObject()) + : dictionary.asIdentifiers( + triple.getSubject(), + triple.getPredicate(), + triple.getObject(), + name); + + if (triple.isConcrete()) { + dao.deleteTriple(identifiers); + } else if (triple.getSubject().isConcrete() && + triple.getPredicate().isConcrete() && + triple.getObject().isConcrete()){ + clear(); + } else { + // TODO: batch size must be configurable + dao.deleteTriples(query(identifiers), 1000); + } + } catch (final StorageLayerException exception) { + final String message = MessageFactory.createMessage(MessageCatalog._00100_UNABLE_TO_DELETE_TRIPLE, triple); + LOGGER.error(message, exception); + throw new DeleteDeniedException(message, triple); + } + } + + @Override + public void clear() + { + dao.clear(); + getEventManager().notifyEvent(this, GraphEvents.removeAll ) ; + } + + @Override + protected ExtendedIterator graphBaseFind(final TripleMatch pattern) { + try + { + final byte [][] identifiers = + (name == null) + ? dictionary.asIdentifiers( + pattern.getMatchSubject(), + pattern.getMatchPredicate(), + pattern.getMatchObject()) + : dictionary.asIdentifiers( + pattern.getMatchSubject(), + pattern.getMatchPredicate(), + pattern.getMatchObject(), + name); + return WrappedIterator.createNoRemove(dictionary.asTripleIterator(query(identifiers))); + } catch (StorageLayerException exception) { + LOGGER.error(MessageCatalog._00010_DATA_ACCESS_LAYER_FAILURE, exception); + return EMPTY_TRIPLES_ITERATOR; + } + } + + Iterator query(final byte[][] query) throws StorageLayerException { + return (query != null && query.length >= 3) + ? dao.query(query) + : EMPTY_IDS_ITERATOR; + } +} \ No newline at end of file