Skip to content

Commit

Permalink
[ issue #5 ] major refactoring on AbstractFactory (DAOs)
Browse files Browse the repository at this point in the history
  • Loading branch information
agazzarini committed Aug 29, 2014
1 parent 8a524bb commit 173b33a
Show file tree
Hide file tree
Showing 17 changed files with 199 additions and 124 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@
import org.gazzax.labs.jena.nosql.fwk.dictionary.TopLevelDictionary;
import org.gazzax.labs.jena.nosql.fwk.dictionary.node.TransientNodeDictionary;
import org.gazzax.labs.jena.nosql.fwk.ds.MapDAO;
import org.gazzax.labs.jena.nosql.fwk.ds.TripleIndexDAO;
import org.gazzax.labs.jena.nosql.fwk.ds.GraphDAO;
import org.gazzax.labs.jena.nosql.fwk.factory.ClientShutdownHook;
import org.gazzax.labs.jena.nosql.fwk.factory.StorageLayerFactory;

Expand All @@ -38,6 +38,7 @@
import com.datastax.driver.core.policies.Policies;
import com.datastax.driver.core.policies.ReconnectionPolicy;
import com.datastax.driver.core.policies.RetryPolicy;
import com.hp.hpl.jena.graph.Node;

/**
* Concrete factory for creating Cassandra-backed domain and data access objects.
Expand Down Expand Up @@ -75,12 +76,19 @@ public <K, V> MapDAO<K, V> getMapDAO(
}

@Override
public TripleIndexDAO<byte[][], byte[][]> getTripleIndexDAO() {
public GraphDAO<byte[][], byte[][]> getGraphDAO(final Node name) {
return new CassandraTripleIndexDAO(session, dictionary);
}

@Override
public GraphDAO<byte[][], byte[][]> getGraphDAO() {
return new CassandraTripleIndexDAO(session, dictionary);
}

@Override
public void accept(final Configuration<Map<String, Object>> configuration) {
deletionBatchSize = configuration.getParameter("delete-batch-size", Integer.valueOf(1000));

final String hosts = configuration.getParameter("cassandra-contact-points", "localhost");

final Cluster.Builder builder = Cluster.builder()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@

import org.gazzax.labs.jena.nosql.fwk.StorageLayerException;
import org.gazzax.labs.jena.nosql.fwk.dictionary.TopLevelDictionary;
import org.gazzax.labs.jena.nosql.fwk.ds.TripleIndexDAO;
import org.gazzax.labs.jena.nosql.fwk.ds.GraphDAO;

import com.datastax.driver.core.BatchStatement;
import com.datastax.driver.core.BoundStatement;
Expand All @@ -22,15 +22,15 @@
import com.google.common.collect.AbstractIterator;

/**
* Cassandra 2x (CQL-based) implementation of {@link TripleIndexDAO}.
* Cassandra 2x (CQL-based) implementation of {@link GraphDAO}.
*
* This class has been derived from CumulusRDF code, with many thanks to CumulusRDF team for allowing this.
*
* @see https://code.google.com/p/cumulusrdf
* @author Andrea Gazzarini
* @since 1.0
*/
public class CassandraTripleIndexDAO implements TripleIndexDAO<byte[][], byte[][]> {
public class CassandraTripleIndexDAO implements GraphDAO<byte[][], byte[][]> {
protected static final byte[] EMPTY_VAL = new byte[0];
protected static final String SELECT_SPOC_FROM = "SELECT s, p, o, c FROM ";

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,10 +7,10 @@
import org.gazzax.labs.jena.nosql.fwk.configuration.Configuration;
import org.gazzax.labs.jena.nosql.fwk.dictionary.TopLevelDictionary;
import org.gazzax.labs.jena.nosql.fwk.ds.MapDAO;
import org.gazzax.labs.jena.nosql.fwk.ds.TripleIndexDAO;
import org.gazzax.labs.jena.nosql.fwk.ds.GraphDAO;
import org.gazzax.labs.jena.nosql.fwk.factory.ClientShutdownHook;
import org.gazzax.labs.jena.nosql.fwk.factory.StorageLayerFactory;
import org.gazzax.labs.jena.nosql.solr.dao.SolrTripleIndexDAO;
import org.gazzax.labs.jena.nosql.solr.dao.SolrGraphDAO;
import org.gazzax.labs.jena.nosql.solr.graph.SolrGraph;

import com.hp.hpl.jena.graph.Graph;
Expand All @@ -30,11 +30,14 @@ public class SolrStorageLayerFactory extends StorageLayerFactory {

@Override
public void accept(final Configuration<Map<String, Object>> configuration) {
deletionBatchSize = configuration.getParameter("delete-batch-size", Integer.valueOf(1000));

final String address = configuration.getParameter("solr-address", "http://127.0.0.1:8080/solr/store");
try {
solr = (SolrServer) Class.forName(configuration.getParameter("solr-server-class", HttpSolrServer.class.getName()))
.getConstructor(String.class)
.newInstance(address);

} catch (final Exception exception) {
throw new IllegalArgumentException(exception);
}
Expand All @@ -51,19 +54,24 @@ public <K, V> MapDAO<K, V> getMapDAO(

@Override
public Graph getGraph() {
return new SolrGraph(this);
return new SolrGraph(this, deletionBatchSize);
}

@Override
public Graph getGraph(Node graphNode) {
return new SolrGraph(graphNode, this);
return new SolrGraph(graphNode, this, deletionBatchSize);
}

@Override
public TripleIndexDAO<Triple, TripleMatch> getTripleIndexDAO() {
return new SolrTripleIndexDAO(solr);
public GraphDAO<Triple, TripleMatch> getGraphDAO(final Node name) {
return new SolrGraphDAO(solr, name);
}


@Override
public GraphDAO<Triple, TripleMatch> getGraphDAO() {
return new SolrGraphDAO(solr);
}

@Override
public TopLevelDictionary getDictionary() {
return dictionary;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -117,7 +117,7 @@ public Triple next() {
* @param solr the SOLR facade.
* @param query the query that will be submitted.
*/
public SolrDeepPagingIterator(final SolrServer solr, final SolrQuery query) {
SolrDeepPagingIterator(final SolrServer solr, final SolrQuery query) {
this.solr = solr;
this.query = query;
this.sentCursorMark = CursorMarkParams.CURSOR_MARK_START;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@
import org.apache.solr.client.solrj.util.ClientUtils;
import org.apache.solr.common.SolrInputDocument;
import org.gazzax.labs.jena.nosql.fwk.StorageLayerException;
import org.gazzax.labs.jena.nosql.fwk.ds.TripleIndexDAO;
import org.gazzax.labs.jena.nosql.fwk.ds.GraphDAO;
import org.gazzax.labs.jena.nosql.fwk.log.Log;
import org.gazzax.labs.jena.nosql.fwk.log.MessageCatalog;
import org.gazzax.labs.jena.nosql.solr.Field;
Expand All @@ -24,18 +24,36 @@
import com.hp.hpl.jena.graph.Triple;
import com.hp.hpl.jena.graph.TripleMatch;

public class SolrTripleIndexDAO implements TripleIndexDAO<Triple, TripleMatch> {
protected final Log logger = new Log(LoggerFactory.getLogger(SolrTripleIndexDAO.class));
/**
* {@link GraphDAO} implementation for Apache SOLR.
*
* @see http://lucene.apache.org/solr
* @author Andrea Gazzarini
* @since 1.0
*/
public class SolrGraphDAO implements GraphDAO<Triple, TripleMatch> {
protected final Log logger = new Log(LoggerFactory.getLogger(SolrGraphDAO.class));

private final SolrServer solr;
private final Node name;

/**
* Builds a new {@link TripleIndexDAO} with the given SOLR client.
* Builds a new {@link GraphDAO} with the given SOLR client.
*
* @param solr the SOLR client.
*/
public SolrTripleIndexDAO(final SolrServer solr) {
public SolrGraphDAO(final SolrServer solr) {
this(solr, null);
}

/**
* Builds a new {@link GraphDAO} with the given SOLR client.
*
* @param solr the SOLR client.
*/
public SolrGraphDAO(final SolrServer solr, final Node name) {
this.solr = solr;
this.name = name;
}

@Override
Expand All @@ -45,6 +63,9 @@ public void insertTriple(final Triple triple) throws StorageLayerException {
document.setField(Field.P, asNtURI(triple.getPredicate()));
document.setField(Field.O, asNt(triple.getObject()));

// TODO : with a state pattern I could avoid this conditional logic.
document.setField(Field.C, name != null ? asNtURI(name) : null);

try {
solr.add(document);
} catch (final Exception exception) {
Expand All @@ -61,21 +82,6 @@ public void deleteTriple(final Triple triple) throws StorageLayerException {
}
}

/**
* Builds a delete query starting from a given triple.
*
* @param triple the triple.
* @return a delete query starting from a given triple.
*/
private String deleteQuery(final Triple triple) {

return new StringBuilder()
.append(Field.S).append(":\"").append(ClientUtils.escapeQueryChars(asNt(triple.getSubject()))).append("\" AND ")
.append(Field.P).append(":\"").append(ClientUtils.escapeQueryChars(asNt(triple.getPredicate()))).append("\" AND ")
.append(Field.O).append(":\"").append(ClientUtils.escapeQueryChars(asNt(triple.getObject()))).append("\"")
.toString();
}

// TODO: To be optimized...with this implementation wildcard queries are not supported
// so if I need to delete 5 triples then 5 commands should be issued.
@Override
Expand All @@ -100,17 +106,14 @@ public List<Triple> deleteTriples(

@Override
public void executePendingMutations() throws StorageLayerException {
try {
solr.commit();
} catch (final Exception exception) {
throw new StorageLayerException(exception);
}
// Do nothing here...
}

// TODO: delete without name deletes all??
@Override
public void clear() {
try {
solr.deleteByQuery("*:*");
solr.deleteByQuery(name != null ? "*:*" : Field.C + ":\"" + ClientUtils.escapeQueryChars(asNtURI(name)) + "\"");
} catch (final Exception exception) {
logger.error(MessageCatalog._00170_UNABLE_TO_CLEAR, exception);
}
Expand All @@ -126,15 +129,19 @@ public Iterator<Triple> query(final TripleMatch query) throws StorageLayerExcept
final Node o = query.getMatchObject();

if (s != null) {
q.addFilterQuery(newFilterQuery(Field.S, ClientUtils.escapeQueryChars(asNt(s))));
q.addFilterQuery(newFilterQuery(Field.S, asNt(s)));
}

if (p != null) {
q.addFilterQuery(newFilterQuery(Field.P, ClientUtils.escapeQueryChars(asNtURI(p))));
q.addFilterQuery(newFilterQuery(Field.P, asNtURI(p)));
}

if (o != null) {
q.addFilterQuery(newFilterQuery(Field.O, ClientUtils.escapeQueryChars(asNt(o))));
q.addFilterQuery(newFilterQuery(Field.O, asNt(o)));
}

if (name != null) {
q.addFilterQuery(newFilterQuery(Field.C, asNtURI(name)));
}

return new SolrDeepPagingIterator(solr, q);
Expand All @@ -156,4 +163,23 @@ String newFilterQuery(final String fieldName, final String value) {
.toString();
}

/**
* Builds a delete query starting from a given triple.
*
* @param triple the triple.
* @return a delete query starting from a given triple.
*/
String deleteQuery(final Triple triple) {

final StringBuilder builder = new StringBuilder()
.append(Field.S).append(":\"").append(ClientUtils.escapeQueryChars(asNt(triple.getSubject()))).append("\" AND ")
.append(Field.P).append(":\"").append(ClientUtils.escapeQueryChars(asNtURI(triple.getPredicate()))).append("\" AND ")
.append(Field.O).append(":\"").append(ClientUtils.escapeQueryChars(asNt(triple.getObject()))).append("\"");

if (name != null) {
builder.append(" AND ").append(Field.C).append(":\"").append(ClientUtils.escapeQueryChars(asNtURI(name))).append("\"");
}

return builder.toString();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@
import java.util.Iterator;

import org.gazzax.labs.jena.nosql.fwk.StorageLayerException;
import org.gazzax.labs.jena.nosql.fwk.ds.TripleIndexDAO;
import org.gazzax.labs.jena.nosql.fwk.ds.GraphDAO;
import org.gazzax.labs.jena.nosql.fwk.factory.StorageLayerFactory;
import org.gazzax.labs.jena.nosql.fwk.log.Log;
import org.gazzax.labs.jena.nosql.fwk.log.MessageCatalog;
Expand All @@ -29,38 +29,40 @@
*/
public class SolrGraph extends GraphBase {
private final static Log LOGGER = new Log(LoggerFactory.getLogger(SolrGraph.class));

private final static ExtendedIterator<Triple> EMPTY_TRIPLES_ITERATOR = WrappedIterator.createNoRemove(new ArrayList<Triple>(0).iterator());

private final int deletionBatchSize;

private final TripleIndexDAO<Triple, TripleMatch> dao;
private final Node name;

private final GraphDAO<Triple, TripleMatch> dao;

/**
* Builds a new unnamed graph with the given factory.
*
* @param factory the storage layer factory.
* @param deletionBatchSize the batch size in case of massive deletions.
*/
public SolrGraph(final StorageLayerFactory factory) {
this(null, factory);
public SolrGraph(final StorageLayerFactory factory, final int deletionBatchSize) {
this(null, factory, deletionBatchSize);
}

/**
* Builds a new named graph with the given data.
*
* @param name the graph name.
* @param factory the storage layer factory.
* @param deletionBatchSize the batch size in case of massive deletions.
*/
@SuppressWarnings("unchecked")
public SolrGraph(final Node name, final StorageLayerFactory factory) {
this.name = name;
this.dao = factory.getTripleIndexDAO();
@SuppressWarnings("unchecked")
public SolrGraph(final Node name, final StorageLayerFactory factory, final int deletionBatchSize) {
this.deletionBatchSize = deletionBatchSize;
this.dao = name != null ? factory.getGraphDAO(name) : factory.getGraphDAO();
}

@Override
public void performAdd(final Triple triple) {
try {
dao.insertTriple(triple);
dao.executePendingMutations();
// dao.executePendingMutations();
} catch (final StorageLayerException exception) {
final String message = MessageFactory.createMessage(MessageCatalog._00101_UNABLE_TO_ADD_TRIPLE, triple);
LOGGER.error(message, exception);
Expand All @@ -73,13 +75,10 @@ public void performDelete(final Triple triple) {
try {
if (triple.isConcrete()) {
dao.deleteTriple(triple);
} else if (triple.getSubject().isConcrete() &&
triple.getPredicate().isConcrete() &&
triple.getObject().isConcrete()){
} else if ( !triple.getSubject().isConcrete() && !triple.getPredicate().isConcrete() && !triple.getObject().isConcrete()){
clear();
} else {
// TODO: batch size must be configurable
dao.deleteTriples(query(triple), 1000);
dao.deleteTriples(query(triple), deletionBatchSize);
}
} catch (final StorageLayerException exception) {
final String message = MessageFactory.createMessage(MessageCatalog._00100_UNABLE_TO_DELETE_TRIPLE, triple);
Expand All @@ -89,16 +88,14 @@ public void performDelete(final Triple triple) {
}

@Override
public void clear()
{
public void clear() {
dao.clear();
getEventManager().notifyEvent(this, GraphEvents.removeAll ) ;
}

@Override
protected ExtendedIterator<Triple> graphBaseFind(final TripleMatch pattern) {
try
{
public ExtendedIterator<Triple> graphBaseFind(final TripleMatch pattern) {
try {
return WrappedIterator.createNoRemove(query(pattern));
} catch (StorageLayerException exception) {
LOGGER.error(MessageCatalog._00010_DATA_ACCESS_LAYER_FAILURE, exception);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,6 @@
## JENA-NOSQL default configuration ##
###########################################

# SorlServer implementation
solr-server-class: "org.apache.solr.client.solrj.impl.HttpSolrServer"
solr-address: "http://127.0.0.1:8080/solr/store"
delete-batch-size: 1000
Loading

0 comments on commit 173b33a

Please sign in to comment.