Skip to content

Commit

Permalink
Fixing RDF import handling
Browse files Browse the repository at this point in the history
  • Loading branch information
namedgraph committed Oct 22, 2023
1 parent 69a7d2c commit 72f40c7
Show file tree
Hide file tree
Showing 6 changed files with 47 additions and 39 deletions.
1 change: 0 additions & 1 deletion http-tests/imports/csv-test.rq
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,6 @@ CONSTRUCT
{
?item a dh:Item ;
sioc:has_container ?container ;
dh:slug ?id ;
dct:title ?string ;
rdf:value ?integer
}
Expand Down
25 changes: 16 additions & 9 deletions scripts/imports/create-rdf-import.sh
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,11 @@ print_usage()

hash turtle 2>/dev/null || { echo >&2 "turtle not on \$PATH. Need to set \$JENA_HOME. Aborting."; exit 1; }

# Percent-encode $1 for safe use in a URI, using Python's urllib.parse.quote.
# NOTE(review): relies on the global $urlencode_safe as quote()'s second
# ("safe") argument — the set of characters left unencoded. If it is unset,
# an empty string is passed and even "/" gets percent-encoded; confirm that
# is the intended default for slug values.
urlencode() {
python -c 'import urllib.parse, sys; print(urllib.parse.quote(sys.argv[1], sys.argv[2]))' \
"$1" "$urlencode_safe"
}

args=()
while [[ $# -gt 0 ]]
do
Expand Down Expand Up @@ -109,6 +114,11 @@ if [ -z "$file" ] ; then
exit 1
fi

# default the document slug to a random UUID when none was supplied on the CLI
if [ -z "$slug" ] ; then
slug=$(uuidgen | tr '[:upper:]' '[:lower:]') # lowercase
fi
encoded_slug=$(urlencode "$slug") # percent-encode so the slug is safe inside the document URI

container="${base}imports/"

args+=("-f")
Expand All @@ -117,7 +127,7 @@ args+=("-p")
args+=("$cert_password")
args+=("-t")
args+=("text/turtle") # content type
args+=("${base}importer")
args+=("${container}${encoded_slug}/")

turtle+="@prefix ldh: <https://w3id.org/atomgraph/linkeddatahub#> .\n"
turtle+="@prefix dh: <https://www.w3.org/ns/ldt/document-hierarchy#> .\n"
Expand All @@ -127,10 +137,10 @@ turtle+="@prefix sioc: <http://rdfs.org/sioc/ns#> .\n"
turtle+="_:import a ldh:RDFImport .\n"
turtle+="_:import dct:title \"${title}\" .\n"
turtle+="_:import ldh:file <${file}> .\n"
turtle+="_:item a dh:Item .\n"
turtle+="_:item foaf:primaryTopic _:import .\n"
turtle+="_:item sioc:has_container <${container}> .\n"
turtle+="_:item dct:title \"${title}\" .\n"
turtle+="<${container}${encoded_slug}/> a dh:Item .\n"
turtle+="<${container}${encoded_slug}/> foaf:primaryTopic _:import .\n"
turtle+="<${container}${encoded_slug}/> sioc:has_container <${container}> .\n"
turtle+="<${container}${encoded_slug}/> dct:title \"${title}\" .\n"

if [ -n "$graph" ] ; then
turtle+="@prefix sd: <http://www.w3.org/ns/sparql-service-description#> .\n"
Expand All @@ -143,12 +153,9 @@ fi
if [ -n "$description" ] ; then
turtle+="_:import dct:description \"${description}\" .\n"
fi
if [ -n "$slug" ] ; then
turtle+="_:item dh:slug \"${slug}\" .\n"
fi
if [ -n "$fragment" ] ; then
turtle+="_:import ldh:fragment \"${fragment}\" .\n"
fi

# submit Turtle doc to the server
echo -e "$turtle" | turtle --base="$base" | ../create-document.sh "${args[@]}"
echo -e "$turtle" | turtle --base="$base" | ../put-document.sh "${args[@]}"
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,6 @@

import com.atomgraph.client.MediaTypes;
import com.atomgraph.client.vocabulary.LDT;
import com.atomgraph.core.client.GraphStoreClient;
import com.atomgraph.core.client.LinkedDataClient;
import com.atomgraph.core.model.DatasetAccessor;
import com.atomgraph.linkeddatahub.imports.stream.RDFGraphStoreOutput;
Expand Down Expand Up @@ -110,7 +109,7 @@ public ImportExecutor(ExecutorService execService)
* @param ldc Linked Data client
* @param graphStoreClient GSP client
*/
public void start(Service service, Service adminService, String appBaseURI, LinkedDataClient ldc, GraphStoreClient graphStoreClient, CSVImport csvImport)
public void start(Service service, Service adminService, String appBaseURI, LinkedDataClient ldc, LinkedDataClient graphStoreClient, CSVImport csvImport)
{
if (csvImport == null) throw new IllegalArgumentException("CSVImport cannot be null");
if (log.isDebugEnabled()) log.debug("Submitting new import to thread pool: {}", csvImport.toString());
Expand Down Expand Up @@ -143,7 +142,7 @@ public void start(Service service, Service adminService, String appBaseURI, Link
* @param graphStoreClient GSP client
*/

public void start(Service service, Service adminService, String appBaseURI, LinkedDataClient ldc, GraphStoreClient graphStoreClient, RDFImport rdfImport)
public void start(Service service, Service adminService, String appBaseURI, LinkedDataClient ldc, LinkedDataClient graphStoreClient, RDFImport rdfImport)
{
if (rdfImport == null) throw new IllegalArgumentException("RDFImport cannot be null");
if (log.isDebugEnabled()) log.debug("Submitting new import to thread pool: {}", rdfImport.toString());
Expand Down Expand Up @@ -323,15 +322,15 @@ protected Function<Response, CSVGraphStoreOutput> getStreamRDFOutputWriter(Servi
*
* @param service SPARQL service of the application
* @param adminService SPARQL service of the admin application
* @param graphStoreClient GSP client
* @param ldc Linked Data client
* @param baseURI base URI
* @param query transformation query
* @param imp import resource
* @return function
*/
protected Function<Response, RDFGraphStoreOutput> getStreamRDFOutputWriter(Service service, Service adminService, GraphStoreClient graphStoreClient, String baseURI, Query query, RDFImport imp)
protected Function<Response, RDFGraphStoreOutput> getStreamRDFOutputWriter(Service service, Service adminService, LinkedDataClient ldc, String baseURI, Query query, RDFImport imp)
{
return new StreamRDFOutputWriter(service, adminService, graphStoreClient, baseURI, query, imp.getGraphName() != null ? imp.getGraphName().getURI() : null);
return new StreamRDFOutputWriter(service, adminService, ldc, baseURI, query, imp.getGraphName() != null ? imp.getGraphName().getURI() : null);
}


Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,11 +16,12 @@
*/
package com.atomgraph.linkeddatahub.imports.stream;

import com.atomgraph.core.client.GraphStoreClient;
import com.atomgraph.core.client.LinkedDataClient;
import com.atomgraph.linkeddatahub.model.Service;
import java.io.InputStream;
import jakarta.ws.rs.client.Client;
import jakarta.ws.rs.core.Response;
import java.net.URI;
import org.apache.jena.query.Dataset;
import org.apache.jena.query.Query;
import org.apache.jena.query.QueryExecution;
Expand All @@ -41,7 +42,7 @@ public class RDFGraphStoreOutput
{

private final Service service, adminService;
private final GraphStoreClient graphStoreClient;
private final LinkedDataClient ldc;
private final String base;
private final InputStream is;
private final Query query;
Expand All @@ -53,18 +54,18 @@ public class RDFGraphStoreOutput
*
* @param service SPARQL service of the application
* @param adminService SPARQL service of the admin application
* @param graphStoreClient GSP client for RDF results
* @param ldc Linked Data client for RDF results
* @param is RDF input stream
* @param base base URI
* @param query <code>CONSTRUCT</code> transformation query or null
* @param lang RDF language
* @param graphURI named graph URI
*/
public RDFGraphStoreOutput(Service service, Service adminService, GraphStoreClient graphStoreClient, InputStream is, String base, Query query, Lang lang, String graphURI)
public RDFGraphStoreOutput(Service service, Service adminService, LinkedDataClient ldc, InputStream is, String base, Query query, Lang lang, String graphURI)
{
this.service = service;
this.adminService = adminService;
this.graphStoreClient = graphStoreClient;
this.ldc = ldc;
this.is = is;
this.base = base;
this.query = query;
Expand All @@ -76,6 +77,7 @@ public RDFGraphStoreOutput(Service service, Service adminService, GraphStoreClie
* Reads RDF and writes (possibly transformed) RDF into a named graph.
* The input is transformed if the SPARQL transformation query was provided.
* Extended SPARQL syntax is used to allow the <code>CONSTRUCT GRAPH</code> query form.
* The default graph output is ignored.
*/
public void write()
{
Expand All @@ -90,8 +92,9 @@ public void write()

dataset.listNames().forEachRemaining(graphUri ->
{
// exceptions get swallowed by the client! TO-DO: wait for completion
if (!dataset.getNamedModel(graphUri).isEmpty()) getGraphStoreClient().add(graphUri, dataset.getNamedModel(graphUri));
Model namedModel = dataset.getNamedModel(graphUri);
// exceptions get swallowed by the client? TO-DO: wait for completion
if (!namedModel.isEmpty()) getLinkedDataClient().put(URI.create(graphUri), namedModel);

// purge cache entries that include the graph URI
if (getService().getBackendProxy() != null) ban(getService().getClient(), getService().getBackendProxy(), graphUri).close();
Expand All @@ -104,7 +107,7 @@ public void write()
{
if (getGraphURI() == null) throw new IllegalStateException("Neither RDFImport query nor graph name is specified");

getGraphStoreClient().add(getGraphURI(), model); // exceptions get swallowed by the client! TO-DO: wait for completion
getLinkedDataClient().put(URI.create(getGraphURI()), model); // exceptions get swallowed by the client? TO-DO: wait for completion

// purge cache entries that include the graph URI
if (getService().getBackendProxy() != null) ban(getService().getClient(), getService().getBackendProxy(), getGraphURI()).close();
Expand Down Expand Up @@ -153,13 +156,13 @@ public Service getAdminService()
}

/**
* Returns Graph Store Protocol client.
* Returns Linked Data client.
*
* @return GSP client
* @return client object
*/
public GraphStoreClient getGraphStoreClient()
public LinkedDataClient getLinkedDataClient()
{
return graphStoreClient;
return ldc;
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@
package com.atomgraph.linkeddatahub.imports.stream;

import com.atomgraph.core.MediaType;
import com.atomgraph.core.client.GraphStoreClient;
import com.atomgraph.core.client.LinkedDataClient;
import com.atomgraph.linkeddatahub.model.Service;
import java.io.File;
import java.io.FileInputStream;
Expand Down Expand Up @@ -48,7 +48,7 @@ public class StreamRDFOutputWriter implements Function<Response, RDFGraphStoreOu
private static final Logger log = LoggerFactory.getLogger(StreamRDFOutputWriter.class);

private final Service service, adminService;
private final GraphStoreClient graphStoreClient;
private final LinkedDataClient ldc;
private final String baseURI, graphURI;
private final Query query;

Expand All @@ -57,16 +57,16 @@ public class StreamRDFOutputWriter implements Function<Response, RDFGraphStoreOu
*
* @param service SPARQL service of the application
* @param adminService SPARQL service of the admin application
* @param graphStoreClient GSP client
* @param ldc GSP client
* @param baseURI base URI
* @param query transformation query or null
* @param graphURI target graph URI
*/
public StreamRDFOutputWriter(Service service, Service adminService, GraphStoreClient graphStoreClient, String baseURI, Query query, String graphURI)
public StreamRDFOutputWriter(Service service, Service adminService, LinkedDataClient ldc, String baseURI, Query query, String graphURI)
{
this.service = service;
this.adminService = adminService;
this.graphStoreClient = graphStoreClient;
this.ldc = ldc;
this.baseURI = baseURI;
this.query = query;
this.graphURI = graphURI;
Expand All @@ -92,7 +92,7 @@ public RDFGraphStoreOutput apply(Response rdfInput)
Lang lang = RDFLanguages.contentTypeToLang(mediaType.toString()); // convert media type to RDF language
if (lang == null) throw new BadRequestException("Content type '" + mediaType + "' is not an RDF media type");

RDFGraphStoreOutput output = new RDFGraphStoreOutput(getService(), getAdminService(), getGraphStoreClient(), fis, getBaseURI(), getQuery(), lang, getGraphURI());
RDFGraphStoreOutput output = new RDFGraphStoreOutput(getService(), getAdminService(), getLinkedDataClient(), fis, getBaseURI(), getQuery(), lang, getGraphURI());
output.write();
return output;
}
Expand Down Expand Up @@ -129,13 +129,13 @@ public Service getAdminService()
}

/**
* Returns the Graph Store Protocol client.
* Returns the Linked Data client.
*
* @return GSP client
* @return client object
*/
public GraphStoreClient getGraphStoreClient()
public LinkedDataClient getLinkedDataClient()
{
return graphStoreClient;
return ldc;
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -83,7 +83,7 @@ public void rowProcessed(String[] row, ParsingContext context)
{
// exceptions get swallowed by the client? TO-DO: wait for completion
Model namedModel = rowDataset.getNamedModel(graphUri);
put(Entity.entity(namedModel, APPLICATION_NTRIPLES_TYPE), graphUri);
if (!namedModel.isEmpty()) put(Entity.entity(namedModel, APPLICATION_NTRIPLES_TYPE), graphUri);

// purge cache entries that include the graph URI
if (getService().getBackendProxy() != null) ban(getService().getClient(), getService().getBackendProxy(), graphUri).close();
Expand Down

0 comments on commit 72f40c7

Please sign in to comment.