diff --git a/conjure-cli-native/pom.xml b/conjure-cli-native/pom.xml index 1ddcb50..24b8e14 100644 --- a/conjure-cli-native/pom.xml +++ b/conjure-cli-native/pom.xml @@ -54,8 +54,8 @@ - org.aksw.jena-sparql-api - jena-sparql-api-conjure + org.aksw.jenax + jenax-conjure-core @@ -64,13 +64,8 @@ - com.beust - jcommander - - - - org.slf4j - slf4j-log4j12 + org.apache.logging.log4j + log4j-slf4j18-impl diff --git a/conjure-cli-native/src/main/java/org/aksw/conjure/cli/main/ConfigCliConjureNative.java b/conjure-cli-native/src/main/java/org/aksw/conjure/cli/main/ConfigCliConjureNative.java index 585db3c..0c65040 100644 --- a/conjure-cli-native/src/main/java/org/aksw/conjure/cli/main/ConfigCliConjureNative.java +++ b/conjure-cli-native/src/main/java/org/aksw/conjure/cli/main/ConfigCliConjureNative.java @@ -7,7 +7,7 @@ import org.aksw.jena_sparql_api.conjure.dataset.engine.ConjureConstants; import org.aksw.jena_sparql_api.conjure.dataset.engine.ConjureFormatConfig; import org.aksw.jena_sparql_api.conjure.job.api.Job; -import org.aksw.jena_sparql_api.transform.result_set.QueryExecutionTransformResult; +import org.aksw.jenax.arq.util.node.NodeTransformLib2; import org.apache.jena.rdf.model.Model; import org.apache.jena.rdf.model.ModelFactory; import org.apache.jena.riot.RDFDataMgr; @@ -19,46 +19,46 @@ @Configuration public class ConfigCliConjureNative { - + // @Bean // public Object applicationRunner() { // System.out.println("Test"); // return 1; // } - @Bean - @Autowired - public ApplicationRunner applicationRunner( - DataRef catalogDataRef, - Job job, - ConjureFormatConfig formatConfig) { - return args -> { - List datasets = MainCliConjureNative.executeJob(catalogDataRef, job, formatConfig); - - Model model = ModelFactory.createDefaultModel(); - for(DcatDataset dataset : datasets) { - Model contribModel = dataset.getModel(); - model.add(contribModel); - } + @Bean + @Autowired + public ApplicationRunner applicationRunner( + DataRef catalogDataRef, + Job job, + ConjureFormatConfig formatConfig) { + return args -> { + List datasets = MainCliConjureNative.executeJob(catalogDataRef, job, formatConfig); + + Model model = ModelFactory.createDefaultModel(); + for(DcatDataset dataset : datasets) { + Model contribModel = dataset.getModel(); + model.add(contribModel); + } + + postProcessResultModel(model, job); + + RDFDataMgr.write(System.out, model, RDFFormat.TURTLE_PRETTY); - postProcessResultModel(model, job); - - RDFDataMgr.write(System.out, model, RDFFormat.TURTLE_PRETTY); - // RDFDataMgr.write(System.out, job.getModel(), RDFFormat.TURTLE_BLOCKS); // for(DcatDataset dataset : datasets) { // RDFDataMgr.write(System.out, dataset.getModel(), RDFFormat.TURTLE_BLOCKS); // } - }; - } - - public static void postProcessResultModel(Model model, Job job) { - model.add(job.getModel()); + }; + } + + public static void postProcessResultModel(Model model, Job job) { + model.add(job.getModel()); - QueryExecutionTransformResult.applyNodeTransform( - n -> ConjureConstants.PROV_PLACEHOLDER_NODE.equals(n) ? job.asNode() : n, - model); - } + NodeTransformLib2.applyNodeTransform( + n -> ConjureConstants.PROV_PLACEHOLDER_NODE.equals(n) ? 
job.asNode() : n, + model); + } } diff --git a/conjure-cli-native/src/main/java/org/aksw/conjure/cli/main/MainCliConjureNative.java b/conjure-cli-native/src/main/java/org/aksw/conjure/cli/main/MainCliConjureNative.java index 04aa39e..230d217 100644 --- a/conjure-cli-native/src/main/java/org/aksw/conjure/cli/main/MainCliConjureNative.java +++ b/conjure-cli-native/src/main/java/org/aksw/conjure/cli/main/MainCliConjureNative.java @@ -36,12 +36,12 @@ import org.aksw.jena_sparql_api.http.repository.api.HttpResourceRepositoryFromFileSystem; import org.aksw.jena_sparql_api.http.repository.api.ResourceStore; import org.aksw.jena_sparql_api.http.repository.impl.HttpResourceRepositoryFromFileSystemImpl; -import org.aksw.jena_sparql_api.mapper.proxy.JenaPluginUtils; -import org.aksw.jena_sparql_api.rx.SparqlRx; -import org.aksw.jena_sparql_api.stmt.SparqlStmt; -import org.aksw.jena_sparql_api.stmt.SparqlStmtParser; -import org.aksw.jena_sparql_api.stmt.SparqlStmtParserImpl; -import org.aksw.jena_sparql_api.utils.Vars; +import org.aksw.jenax.arq.util.var.Vars; +import org.aksw.jenax.reprogen.core.JenaPluginUtils; +import org.aksw.jenax.sparql.query.rx.SparqlRx; +import org.aksw.jenax.stmt.core.SparqlStmt; +import org.aksw.jenax.stmt.core.SparqlStmtParser; +import org.aksw.jenax.stmt.core.SparqlStmtParserImpl; import org.apache.jena.graph.Node; import org.apache.jena.graph.NodeFactory; import org.apache.jena.query.Query; @@ -69,74 +69,74 @@ @SpringBootApplication public class MainCliConjureNative { - public static String URL_SCHEME_FILE = "file://"; - - - private static final Logger logger = LoggerFactory.getLogger(MainCliConjureNative.class); - - public static CommandMain cm; - - public MainCliConjureNative() { - } - - public static Map loadSources(Path basePath, Collection sources) { - Map result = new HashMap<>(); - for(String source : sources) { - Path path = resolvePath(basePath, source); - if(path != null) { - try { - byte[] content = Files.readAllBytes(path); - result.put(source, content); - } catch (IOException e) { - throw new RuntimeException(e); - } - } - } - return result; - } - - /** - * Write a sourceToContent map to physical files, and return a map with - * sourceToPath, where path is the path to the written file - * - * @param baseFolder - * @param sourceToContent - * @return - * @throws IOException - */ - public static BiMap writeFiles(Path baseFolder, Map sourceToContent) throws IOException { - BiMap result = HashBiMap.create(); - for(Entry e : sourceToContent.entrySet()) { - String source = e.getKey(); - Path tmpPath = Paths.get(source); - Path relPath = tmpPath.getFileName(); - - byte[] content = e.getValue(); - - Path absPath = baseFolder.resolve(relPath); - logger.info("Writing file " + relPath + " with " + content.length + " to " + absPath); - Files.createDirectories(absPath.getParent()); - Files.write(absPath, content, StandardOpenOption.CREATE, StandardOpenOption.WRITE, StandardOpenOption.TRUNCATE_EXISTING); - - result.put(source, absPath); - } - - return result; - } - - - public static Set toFileUris(Collection paths) { - Set result = paths.stream() - .map(MainCliConjureNative::toFileUri) - .collect(Collectors.toSet()); - - return result; - } - - public static String toFileUri(Path path) { - String result = path.toUri().toString(); - return result; - } + public static String URL_SCHEME_FILE = "file://"; + + + private static final Logger logger = LoggerFactory.getLogger(MainCliConjureNative.class); + + public static CommandMain cm; + + public MainCliConjureNative() { + } + + 
public static Map loadSources(Path basePath, Collection sources) { + Map result = new HashMap<>(); + for(String source : sources) { + Path path = resolvePath(basePath, source); + if(path != null) { + try { + byte[] content = Files.readAllBytes(path); + result.put(source, content); + } catch (IOException e) { + throw new RuntimeException(e); + } + } + } + return result; + } + + /** + * Write a sourceToContent map to physical files, and return a map with + * sourceToPath, where path is the path to the written file + * + * @param baseFolder + * @param sourceToContent + * @return + * @throws IOException + */ + public static BiMap writeFiles(Path baseFolder, Map sourceToContent) throws IOException { + BiMap result = HashBiMap.create(); + for(Entry e : sourceToContent.entrySet()) { + String source = e.getKey(); + Path tmpPath = Paths.get(source); + Path relPath = tmpPath.getFileName(); + + byte[] content = e.getValue(); + + Path absPath = baseFolder.resolve(relPath); + logger.info("Writing file " + relPath + " with " + content.length + " to " + absPath); + Files.createDirectories(absPath.getParent()); + Files.write(absPath, content, StandardOpenOption.CREATE, StandardOpenOption.WRITE, StandardOpenOption.TRUNCATE_EXISTING); + + result.put(source, absPath); + } + + return result; + } + + + public static Set toFileUris(Collection paths) { + Set result = paths.stream() + .map(MainCliConjureNative::toFileUri) + .collect(Collectors.toSet()); + + return result; + } + + public static String toFileUri(Path path) { + String result = path.toUri().toString(); + return result; + } // public static String toFileUri(Path path) { // String result; @@ -149,20 +149,20 @@ public static String toFileUri(Path path) { // return result; // } - public static URL resolveOnClassPath(ClassLoader classLoader, String path) { - URL result = classLoader.getResource(path); - if(result != null) { - try(InputStream in = result.openStream()) { - - } catch (IOException e) { - result = null; - } - } - - return result; - } - - + public static URL resolveOnClassPath(ClassLoader classLoader, String path) { + URL result = classLoader.getResource(path); + if(result != null) { + try(InputStream in = result.openStream()) { + + } catch (IOException e) { + result = null; + } + } + + return result; + } + + // public static Path urlAsPath(Path basePath, String arg) { // Path result = // arg.startsWith(URL_SCHEME_FILE) ? Paths.get(arg.substring(URL_SCHEME_FILE.length())) : @@ -171,163 +171,163 @@ public static URL resolveOnClassPath(ClassLoader classLoader, String path) { // // return result; // } - - // We may need canonicalization to make cli arg handling and spring interop nicer - public static String canonicalizeSource(Path basePath, String arg) { - Path path = resolvePath(basePath, arg); - String result = path == null ? arg : toFileUri(path); - return result; - } - - public static Path resolvePath(Path basePath, String arg) { - - Path result = - arg.startsWith(URL_SCHEME_FILE) ? Paths.get(arg.substring(URL_SCHEME_FILE.length())) : - arg.startsWith("/") ? Paths.get(arg) : - resolveOnClassPath(MainCliConjureNative.class.getClassLoader(), arg) != null ? null : - arg.contains(":/") ? 
null : // URL-like arguments of any kind - basePath.resolve(arg); - - return result; - } - - public static Op loadConjureJob(String fileOrUri) { - Model model = RDFDataMgr.loadModel(fileOrUri); - List ops = model.listSubjects().mapWith(x -> JenaPluginUtils.polymorphicCast(x, Op.class)) - // .filter(op -> op.getParent()) // TODO Find the root - .toList(); - - // Expect 1 result - Op result = ops.iterator().next(); - - return result; - } - - public static void main(String[] args) throws Exception { + + // We may need canonicalization to make cli arg handling and spring interop nicer + public static String canonicalizeSource(Path basePath, String arg) { + Path path = resolvePath(basePath, arg); + String result = path == null ? arg : toFileUri(path); + return result; + } + + public static Path resolvePath(Path basePath, String arg) { + + Path result = + arg.startsWith(URL_SCHEME_FILE) ? Paths.get(arg.substring(URL_SCHEME_FILE.length())) : + arg.startsWith("/") ? Paths.get(arg) : + resolveOnClassPath(MainCliConjureNative.class.getClassLoader(), arg) != null ? null : + arg.contains(":/") ? null : // URL-like arguments of any kind + basePath.resolve(arg); + + return result; + } + + public static Op loadConjureJob(String fileOrUri) { + Model model = RDFDataMgr.loadModel(fileOrUri); + List ops = model.listSubjects().mapWith(x -> JenaPluginUtils.polymorphicCast(x, Op.class)) + // .filter(op -> op.getParent()) // TODO Find the root + .toList(); + + // Expect 1 result + Op result = ops.iterator().next(); + + return result; + } + + public static void main(String[] args) throws Exception { // if(true) { // System.out.println(MainCliConjureNative.toFileUri(Paths.get("test"))); // return; // } - - ConjureCliArgs cliArgs = ConjureCliArgs.parse(args); - - if (cliArgs.getCm().help) { - cliArgs.getJcommander().usage(); - return; - } - - SpringSourcesConfig config = ConfigConjureSparkBase.parseArgs(cliArgs); - Set sources = config.getSources(); - - SpringApplication app = new SpringApplicationBuilder() - .sources(ConfigConjureSparkBase.class, ConfigCliConjureNative.class) - .bannerMode(Banner.Mode.OFF) - .headless(false) - .web(WebApplicationType.NONE) - .build(); - - app.setSources(sources); - - try (ConfigurableApplicationContext ctx = app.run(args)) { - } - } - - - public static RDFNode clone(RDFNode rdfNode) { - Model clone = ModelFactory.createDefaultModel(); - clone.add(rdfNode.getModel()); - RDFNode result = rdfNode.inModel(clone); - return result; - } - - public static List createTasksContexts(DataRef catalogDataRef, Job job, - HttpResourceRepositoryFromFileSystem repo, - RDFFormat catalogFormat) throws Exception { - - //RDFFormat rdfFormat = formatConfig.getCatalogFormat(); - - // Create a copy of data catalogDataRef to prevent changing it - DataRef clone = JenaPluginUtils.polymorphicCast(clone(catalogDataRef), DataRef.class); - - Op catalogWorkflow = OpDataRefResource.from(clone.getModel(), clone); + + ConjureCliArgs cliArgs = ConjureCliArgs.parse(args); + + if (cliArgs.getCm().help) { + cliArgs.getJcommander().usage(); + return; + } + + SpringSourcesConfig config = ConfigConjureSparkBase.parseArgs(cliArgs); + Set sources = config.getSources(); + + SpringApplication app = new SpringApplicationBuilder() + .sources(ConfigConjureSparkBase.class, ConfigCliConjureNative.class) + .bannerMode(Banner.Mode.OFF) + .headless(false) + .web(WebApplicationType.NONE) + .build(); + + app.setSources(sources); + + try (ConfigurableApplicationContext ctx = app.run(args)) { + } + } + + + public static RDFNode 
clone(RDFNode rdfNode) { + Model clone = ModelFactory.createDefaultModel(); + clone.add(rdfNode.getModel()); + RDFNode result = rdfNode.inModel(clone); + return result; + } + + public static List createTasksContexts(DataRef catalogDataRef, Job job, + HttpResourceRepositoryFromFileSystem repo, + RDFFormat catalogFormat) throws Exception { + + //RDFFormat rdfFormat = formatConfig.getCatalogFormat(); + + // Create a copy of data catalogDataRef to prevent changing it + DataRef clone = JenaPluginUtils.polymorphicCast(clone(catalogDataRef), DataRef.class); + + Op catalogWorkflow = OpDataRefResource.from(clone.getModel(), clone); // String origHash = ResourceTreeUtils.createGenericHash(conjureWorkflow); // String coreHash = ResourceTreeUtils.createGenericHash(coreOp); - // Op catalogCreationWorkflow = job.getOp(); - - Function parser = SparqlStmtParser.wrapWithOptimizePrefixes( - SparqlStmtParserImpl.create(Syntax.syntaxARQ, DefaultPrefixes.prefixes, false)); - // HttpResourceRepositoryFromFileSystemImpl repo = - // HttpResourceRepositoryFromFileSystemImpl.createDefault(); - // ResourceStore cacheStore = repo.getCacheStore(); - OpExecutorDefault catalogExecutor = new OpExecutorDefault(repo, - new TaskContext(job, new HashMap<>(), new HashMap<>()), new HashMap<>(), catalogFormat); - - String queryStr = "CONSTRUCT {\n" - + " ?a ?b ?c .\n" - + " ?c ?d ?e\n" + " } {\n" - + "\n" - + " { SELECT DISTINCT ?a {\n" - + " ?a dcat:distribution [\n" - + -// " dcat:byteSize ?byteSize\n" + - " ]\n" - + " } }\n" - + "\n" - + " ?a ?b ?c\n" - + " OPTIONAL { ?c ?d ?e }\n" + "}"; - - Query dcatQuery = parser.apply(queryStr).getAsQueryStmt().getQuery(); - - List taskContexts = new ArrayList<>(); - // List inputRecords; -// try(RdfDataObject catalog = DataObjects.fromSparqlEndpoint("https://databus.dbpedia.org/repo/sparql", null, null)) { - try (RdfDataPod catalog = catalogWorkflow.accept(catalogExecutor)) { - try (RDFConnection conn = catalog.openConnection()) { - - List catalogRecords = SparqlRx.execConstructGrouped(conn, Vars.a, dcatQuery) - .map(RDFNode::asResource).toList().blockingGet(); - - // For every input record is a dcat entry, assign an anonymous dataref - for (Resource catalogRecord : catalogRecords) { - Map nameToDataRef = new HashMap<>(); - - Query q = parser.apply("SELECT DISTINCT ?x { ?x dcat:distribution [] }").getQuery(); - Model m = catalogRecord.getModel(); - - // QueryExecution qe = - - List dcatDataRefs = SparqlRx.execSelect(() -> QueryExecutionFactory.create(q, m)) - .map(qs -> qs.get("x")).map(RDFNode::asResource).toList().blockingGet(); - - int i = 0; - for (Resource r : dcatDataRefs) { - Model xxmodel = ModelFactory.createDefaultModel(); - xxmodel.add(r.getModel()); - r = r.inModel(xxmodel); - - DataRefDcat dr = DataRefDcat.create(xxmodel, r); - Op drOp = OpDataRefResource.from(xxmodel, dr); - - // TODO Add option whether to log the input record - // RDFDataMgr.write(System.err, dr.getModel(), RDFFormat.TURTLE_PRETTY); - - nameToDataRef.put("unnamedDataRef" + (i++), drOp); - } - - logger.info("Registered data refs for input " + catalogRecord + " are: " + nameToDataRef); - Map nameToModel = new HashMap<>(); - nameToModel.put("http://input", catalogRecord.getModel()); - - TaskContext taskContext = new TaskContext(catalogRecord, nameToDataRef, nameToModel); - taskContexts.add(taskContext); - // Note, that the dcat ref query was run on the inputContext models - // So the following assertion is assumed to hold: - // dcatDataRef.getModel() == inputRecord.getModel() - } - - logger.info("Created " 
+ taskContexts.size() + " task contexts"); + // Op catalogCreationWorkflow = job.getOp(); + + Function parser = SparqlStmtParser.wrapWithOptimizePrefixes( + SparqlStmtParserImpl.create(Syntax.syntaxARQ, DefaultPrefixes.get(), false)); + // HttpResourceRepositoryFromFileSystemImpl repo = + // HttpResourceRepositoryFromFileSystemImpl.createDefault(); + // ResourceStore cacheStore = repo.getCacheStore(); + OpExecutorDefault catalogExecutor = new OpExecutorDefault(repo, + new TaskContext(job, new HashMap<>(), new HashMap<>()), new HashMap<>(), catalogFormat); + + String queryStr = "CONSTRUCT {\n" + + " ?a ?b ?c .\n" + + " ?c ?d ?e\n" + " } {\n" + + "\n" + + " { SELECT DISTINCT ?a {\n" + + " ?a dcat:distribution [\n" + + +// " dcat:byteSize ?byteSize\n" + + " ]\n" + + " } }\n" + + "\n" + + " ?a ?b ?c\n" + + " OPTIONAL { ?c ?d ?e }\n" + "}"; + + Query dcatQuery = parser.apply(queryStr).getAsQueryStmt().getQuery(); + + List taskContexts = new ArrayList<>(); + // List inputRecords; +// try(RdfDataObject catalog = DataObjects.fromSparqlEndpoint("https://databus.dbpedia.org/repo/sparql", null, null)) { + try (RdfDataPod catalog = catalogWorkflow.accept(catalogExecutor)) { + try (RDFConnection conn = catalog.openConnection()) { + + List catalogRecords = SparqlRx.execConstructGrouped(conn, dcatQuery, Vars.a) + .map(RDFNode::asResource).toList().blockingGet(); + + // For every input record is a dcat entry, assign an anonymous dataref + for (Resource catalogRecord : catalogRecords) { + Map nameToDataRef = new HashMap<>(); + + Query q = parser.apply("SELECT DISTINCT ?x { ?x dcat:distribution [] }").getQuery(); + Model m = catalogRecord.getModel(); + + // QueryExecution qe = + + List dcatDataRefs = SparqlRx.execSelect(() -> QueryExecutionFactory.create(q, m)) + .map(qs -> qs.get("x")).map(RDFNode::asResource).toList().blockingGet(); + + int i = 0; + for (Resource r : dcatDataRefs) { + Model xxmodel = ModelFactory.createDefaultModel(); + xxmodel.add(r.getModel()); + r = r.inModel(xxmodel); + + DataRefDcat dr = DataRefDcat.create(xxmodel, r); + Op drOp = OpDataRefResource.from(xxmodel, dr); + + // TODO Add option whether to log the input record + // RDFDataMgr.write(System.err, dr.getModel(), RDFFormat.TURTLE_PRETTY); + + nameToDataRef.put("unnamedDataRef" + (i++), drOp); + } + + logger.info("Registered data refs for input " + catalogRecord + " are: " + nameToDataRef); + Map nameToModel = new HashMap<>(); + nameToModel.put("http://input", catalogRecord.getModel()); + + TaskContext taskContext = new TaskContext(catalogRecord, nameToDataRef, nameToModel); + taskContexts.add(taskContext); + // Note, that the dcat ref query was run on the inputContext models + // So the following assertion is assumed to hold: + // dcatDataRef.getModel() == inputRecord.getModel() + } + + logger.info("Created " + taskContexts.size() + " task contexts"); // if(true) { // return; @@ -340,36 +340,36 @@ public static List createTasksContexts(DataRef catalogDataRef, Job // .map(qs -> qs.get("o")) // .map(RDFNode::toString) // .toList() -// .blockingGet(); - } - } - - return taskContexts; - - } - - public static List executeJob( - DataRef catalogDataRef, - Job job, - ConjureFormatConfig formatConfig) throws Exception { - - RDFFormat catalogFormat = formatConfig.getCatalogFormat(); - - // Function parser = - // SparqlStmtParserImpl.create(Syntax.syntaxARQ, DefaultPrefixes.prefixes, - // false); - HttpResourceRepositoryFromFileSystemImpl repo = HttpResourceRepositoryFromFileSystemImpl.createDefault(); - ResourceStore cacheStore = 
repo.getCacheStore(); - // OpExecutorDefault catalogExecutor = new OpExecutorDefault(repo, new - // TaskContext(job, new HashMap<>(), new HashMap<>())); - - List taskContexts = createTasksContexts(catalogDataRef, job, repo, catalogFormat); - - List result = taskContexts.stream() - .map(taskContext -> ExecutionUtils.executeJob(job, taskContext, repo, cacheStore, formatConfig)) - .collect(Collectors.toList()); - return result; - } +// .blockingGet(); + } + } + + return taskContexts; + + } + + public static List executeJob( + DataRef catalogDataRef, + Job job, + ConjureFormatConfig formatConfig) throws Exception { + + RDFFormat catalogFormat = formatConfig.getCatalogFormat(); + + // Function parser = + // SparqlStmtParserImpl.create(Syntax.syntaxARQ, DefaultPrefixes.prefixes, + // false); + HttpResourceRepositoryFromFileSystemImpl repo = HttpResourceRepositoryFromFileSystemImpl.createDefault(); + ResourceStore cacheStore = repo.getCacheStore(); + // OpExecutorDefault catalogExecutor = new OpExecutorDefault(repo, new + // TaskContext(job, new HashMap<>(), new HashMap<>())); + + List taskContexts = createTasksContexts(catalogDataRef, job, repo, catalogFormat); + + List result = taskContexts.stream() + .map(taskContext -> ExecutionUtils.executeJob(job, taskContext, repo, cacheStore, formatConfig)) + .collect(Collectors.toList()); + return result; + } // public static List executeJob(List taskContexts, Job job, // HttpResourceRepositoryFromFileSystem repo, ResourceStore cacheStore) throws Exception { @@ -391,53 +391,53 @@ public static List executeJob( //// RDFDataMgr.write(System.out, closure.getModel(), RDFFormat.TURTLE_PRETTY); //// } // } - - public static Path resolveLocalUncFileUrl(String str, Set localHostNames) { - Path result = null; - if(str.startsWith(URL_SCHEME_FILE)) { - URL url = null; - try { - url = new URL(str); - } catch (MalformedURLException e) { - logger.warn("Invalid URL", e); - } - - if(url != null) { - String host = url.getHost(); - if(localHostNames.contains(host)) { - String pathStr = url.getPath(); - result = Paths.get(pathStr); - } - } - } - - return result; - } - - public static Path stringToPath(String str) { - Path result = str.startsWith(URL_SCHEME_FILE) ? Paths.get(str.substring(URL_SCHEME_FILE.length())) : null; - return result; - } - - public static NodeTransform asNodeTransform(Function pathToIri) { - return o -> { - String str = o.isURI() ? o.getURI() : o.isLiteral() ? o.getLiteralLexicalForm() : null; - - Path path = str == null ? null : stringToPath(str); - String iri = path == null ? null : pathToIri.apply(path); - Node r = iri == null ? o : NodeFactory.createURI(iri); - return r; - }; - } + + public static Path resolveLocalUncFileUrl(String str, Set localHostNames) { + Path result = null; + if(str.startsWith(URL_SCHEME_FILE)) { + URL url = null; + try { + url = new URL(str); + } catch (MalformedURLException e) { + logger.warn("Invalid URL", e); + } + + if(url != null) { + String host = url.getHost(); + if(localHostNames.contains(host)) { + String pathStr = url.getPath(); + result = Paths.get(pathStr); + } + } + } + + return result; + } + + public static Path stringToPath(String str) { + Path result = str.startsWith(URL_SCHEME_FILE) ? Paths.get(str.substring(URL_SCHEME_FILE.length())) : null; + return result; + } + + public static NodeTransform asNodeTransform(Function pathToIri) { + return o -> { + String str = o.isURI() ? o.getURI() : o.isLiteral() ? o.getLiteralLexicalForm() : null; + + Path path = str == null ? 
null : stringToPath(str); + String iri = path == null ? null : pathToIri.apply(path); + Node r = iri == null ? o : NodeFactory.createURI(iri); + return r; + }; + } // public void replaceNs(Model model, Function pathToIri) { // StmtIterator it = model.listStatements(); // while(it.hasNext()) { // Statement stmt = it.next(); // RDFNode o = stmt.getObject(); -// +// // } -// +// // for(DcatDataset resultDataset : resultDatasets) { // DcatDataset closure = resultDataset.inModel(ResourceUtils.reachableClosure(resultDataset)).as(DcatDataset.class); // RDFDataMgrEx.execSparql(closure.getModel(), "replacens.sparql", ImmutableMap.builder() diff --git a/conjure-cli-spark/pom.xml b/conjure-cli-spark/pom.xml index e324b04..3ae8c55 100644 --- a/conjure-cli-spark/pom.xml +++ b/conjure-cli-spark/pom.xml @@ -294,11 +294,6 @@ - - org.scalastyle - scalastyle-maven-plugin - - @@ -307,11 +302,6 @@ maven-compiler-plugin - - net.alchim31.maven - scala-maven-plugin - - org.apache.maven.plugins maven-surefire-plugin @@ -363,60 +353,6 @@ - - - net.alchim31.maven - scala-maven-plugin - 4.0.2 - - - - add-source - compile - testCompile - - - ${scala.version} - incremental - true - - -target:jvm-1.8 - -unchecked - -deprecation - -feature - -dependencyfile - ${project.build.directory}/.scala_dependencies - -Xmax-classfile-name - 128 - - - -Xss10m - -Xms1024m - -Xmx1024m - -XX:PermSize=${PermGen} - -XX:MaxPermSize=${MaxPermGen} - -XX:ReservedCodeCacheSize=${CodeCacheSize} - - - - - - - - - - - - - attach-javadocs - - doc-jar - - - - - - maven-compiler-plugin 3.8.1 @@ -462,27 +398,6 @@ 2.20 - - - org.scalatest - scalatest-maven-plugin - 2.0.0 - - net.sansa_stack.test.conformance.IntegrationTestSuite - ${project.build.directory}/surefire-reports - . - SANSA-RDF-Tests.txt - - - - test - - test - - - - - @@ -520,67 +435,6 @@ 2.5 - - org.apache.maven.plugins - maven-enforcer-plugin - 3.0.0-M1 - - - enforce-versions - - enforce - - - - - ${maven.version} - - - - - - - - org.jboss.netty - org.codehaus.groovy - - true - - - - - - - - - org.codehaus.mojo - build-helper-maven-plugin - 3.0.0 - - - add-source - generate-sources - - add-source - - - - src/main/scala_${scala.binary.version} - - - - - - - com.versioneye - versioneye-maven-plugin - 3.11.4 - - org.apache.maven.plugins @@ -669,64 +523,12 @@ - - org.codehaus.mojo - cobertura-maven-plugin - 2.7 - - xml - 256m - - true - - - - - - org.apache.maven.plugins - maven-pmd-plugin - 3.8 - - true - utf-8 - 100 - 1.8 - - - - org.codehaus.mojo jdepend-maven-plugin 2.0 - - - org.scalastyle - scalastyle-maven-plugin - 1.0.0 - - false - true - true - false - ${project.basedir}/src/main/scala - ${project.basedir}/src/test/scala - - ${scalastyle.config.path} - ${project.basedir}/scalastyle-output.xml - UTF-8 - - - - - check - - - - - diff --git a/conjure-cli-spark/src/main/java/net/sansa_stack/query/spark/conjure/kryo/SerializerTaskContext.java b/conjure-cli-spark/src/main/java/net/sansa_stack/query/spark/conjure/kryo/SerializerTaskContext.java index 5ab9584..47eccd9 100644 --- a/conjure-cli-spark/src/main/java/net/sansa_stack/query/spark/conjure/kryo/SerializerTaskContext.java +++ b/conjure-cli-spark/src/main/java/net/sansa_stack/query/spark/conjure/kryo/SerializerTaskContext.java @@ -6,7 +6,7 @@ import org.aksw.jena_sparql_api.conjure.dataset.algebra.Op; import org.aksw.jena_sparql_api.conjure.dataset.engine.TaskContext; -import org.aksw.jena_sparql_api.mapper.proxy.JenaPluginUtils; +import org.aksw.jenax.reprogen.core.JenaPluginUtils; import org.apache.jena.rdf.model.Model; import 
org.apache.jena.rdf.model.Resource; @@ -16,34 +16,34 @@ import com.esotericsoftware.kryo.io.Output; public class SerializerTaskContext - extends Serializer + extends Serializer { - @Override - public void write(Kryo kryo, Output output, TaskContext ctx) { - kryo.writeClassAndObject(output, ctx.getInputRecord()); - - Map tmpDataRefMap = ctx.getDataRefMapping().entrySet().stream() - .collect(Collectors.toMap(Entry::getKey, e -> e.getValue().asResource())); - - kryo.writeClassAndObject(output, tmpDataRefMap); - kryo.writeClassAndObject(output, ctx.getCtxModels()); - } - - @Override - public TaskContext read(Kryo kryo, Input input, Class type) { - Resource inputRecord = (Resource)kryo.readClassAndObject(input); - @SuppressWarnings("unchecked") - Map tmpDataRefMap = (Map)kryo.readClassAndObject(input); - Map dataRefMap = tmpDataRefMap.entrySet().stream() - .collect(Collectors.toMap(Entry::getKey, e -> JenaPluginUtils.polymorphicCast(e.getValue(), Op.class))); - - @SuppressWarnings("unchecked") - Map cxtModels = (Map)kryo.readClassAndObject(input); - - TaskContext result = new TaskContext(inputRecord, dataRefMap, cxtModels); - - return result; - } - + @Override + public void write(Kryo kryo, Output output, TaskContext ctx) { + kryo.writeClassAndObject(output, ctx.getInputRecord()); + + Map tmpDataRefMap = ctx.getDataRefMapping().entrySet().stream() + .collect(Collectors.toMap(Entry::getKey, e -> e.getValue().asResource())); + + kryo.writeClassAndObject(output, tmpDataRefMap); + kryo.writeClassAndObject(output, ctx.getCtxModels()); + } + + @Override + public TaskContext read(Kryo kryo, Input input, Class type) { + Resource inputRecord = (Resource)kryo.readClassAndObject(input); + @SuppressWarnings("unchecked") + Map tmpDataRefMap = (Map)kryo.readClassAndObject(input); + Map dataRefMap = tmpDataRefMap.entrySet().stream() + .collect(Collectors.toMap(Entry::getKey, e -> JenaPluginUtils.polymorphicCast(e.getValue(), Op.class))); + + @SuppressWarnings("unchecked") + Map cxtModels = (Map)kryo.readClassAndObject(input); + + TaskContext result = new TaskContext(inputRecord, dataRefMap, cxtModels); + + return result; + } + } diff --git a/conjure-datasources/pom.xml b/conjure-datasources/pom.xml new file mode 100644 index 0000000..e0ca952 --- /dev/null +++ b/conjure-datasources/pom.xml @@ -0,0 +1,69 @@ + + + 4.0.0 + + conjure-datasources + jar + + + org.aksw.conjure + conjure-parent + 1.0.1-SNAPSHOT + + + + + org.aksw.jenax + jenax-arq-connection-core + + + + org.aksw.jenax + jenax-arq-plugins-service-vfs + + + + org.aksw.difs + difs-system + + + + net.sansa-stack + sansa-spark-jena-java_2.12 + + + + org.apache.jena + jena-tdb2 + + + + + + + + junit + junit + test + + + + + + + + + + + + + + diff --git a/conjure-datasources/src/main/java/org/aksw/conjure/datasource/JenaPluginConjureDataSources.java b/conjure-datasources/src/main/java/org/aksw/conjure/datasource/JenaPluginConjureDataSources.java new file mode 100644 index 0000000..2115b36 --- /dev/null +++ b/conjure-datasources/src/main/java/org/aksw/conjure/datasource/JenaPluginConjureDataSources.java @@ -0,0 +1,31 @@ +package org.aksw.conjure.datasource; + +import org.aksw.jenax.arq.datasource.RdfDataSourceFactoryRegistry; +import org.aksw.jenax.arq.datasource.RdfDataSourceFactoryRemote; +import org.apache.jena.sys.JenaSubsystemLifecycle; + +public class JenaPluginConjureDataSources + implements JenaSubsystemLifecycle +{ + public void start() { + init(); + } + + @Override + public void stop() { + } + + + public static void init() { + 
addDefaults(RdfDataSourceFactoryRegistry.get()); + } + + public static RdfDataSourceFactoryRegistry addDefaults(RdfDataSourceFactoryRegistry registry) { + registry.putFactory("mem", new RdfDataSourceFactoryMem()); + registry.putFactory("tdb2", new RdfDataSourceFactoryTdb2()); + registry.putFactory("remote", new RdfDataSourceFactoryRemote()); + registry.putFactory("difs", new RdfDataSourceFactoryDifs()); + + return registry; + } +} diff --git a/conjure-datasources/src/main/java/org/aksw/conjure/datasource/RdfDataSourceDecoratorSansa.java b/conjure-datasources/src/main/java/org/aksw/conjure/datasource/RdfDataSourceDecoratorSansa.java new file mode 100644 index 0000000..d2eb72c --- /dev/null +++ b/conjure-datasources/src/main/java/org/aksw/conjure/datasource/RdfDataSourceDecoratorSansa.java @@ -0,0 +1,76 @@ +package org.aksw.conjure.datasource; + +import java.util.Map; + +import org.aksw.jenax.arq.connection.core.RDFLinkAdapterEx; +import org.aksw.jenax.arq.connection.link.RDFLinkDelegateWithWorkerThread; +import org.aksw.jenax.arq.connection.link.RDFLinkUtils; +import org.aksw.jenax.arq.datasource.RdfDataSourceDecorator; +import org.aksw.jenax.connection.datasource.RdfDataSource; +import org.aksw.jenax.connection.datasource.RdfDataSourceDelegateBase; +import org.apache.hadoop.conf.Configuration; +import org.apache.jena.rdfconnection.RDFConnection; +import org.apache.jena.rdflink.LinkDatasetGraph; +import org.apache.jena.rdflink.RDFConnectionAdapter; +import org.apache.jena.rdflink.RDFLink; +import org.apache.jena.rdflink.RDFLinkModular; + +import net.sansa_stack.spark.io.rdf.loader.LinkDatasetGraphSansa; + +public class RdfDataSourceDecoratorSansa + implements RdfDataSourceDecorator +{ + public static Configuration createDefaultHadoopConfiguration() { + Configuration conf = new Configuration(false); + conf.set("fs.defaultFS", "file:///"); + return conf; + } + + @Override + public RdfDataSource decorate(RdfDataSource dataSource, Map config) { + // RdfDataSourceSpecBasic spec = RdfDataSourceSpecBasicFromMap.wrap(config); + + RdfDataSource result = new RdfDataSourceDelegateBase(dataSource) { + @Override + public org.apache.jena.rdfconnection.RDFConnection getConnection() { + RDFConnection rawConn = dataSource.getConnection(); + // RDFLink queryLink = RDFLinkAdapterEx.adapt(rawConn); + + + // RDFConnection conn = RDFConnectionAdapter.adapt(RDFLinkDelegateWithWorkerThread.wrap(RDFLinkAdapterEx.adapt(connx))); + + // If true then the graphstore LOAD action may acquire multiple update connections for the INSERT requests + // Multiple concurrent update transaction are prone to deadlocks + + RDFLink rawUpdateLink = RDFLinkAdapterEx.adapt(rawConn); + + boolean enforceSameThreadOnLink = true; + RDFLink updateLink = enforceSameThreadOnLink + ? 
RDFLinkDelegateWithWorkerThread.wrap(rawUpdateLink) + : rawUpdateLink; + + + boolean allowMultipleConnections = false; + + LinkDatasetGraph linkDg; + if (allowMultipleConnections) { + linkDg = LinkDatasetGraphSansa.create(createDefaultHadoopConfiguration(), () -> RDFLinkAdapterEx.adapt(dataSource.getConnection())); + } else { + linkDg = LinkDatasetGraphSansa.create(createDefaultHadoopConfiguration(), () -> new RDFLinkAdapterEx(RDFConnectionAdapter.adapt(updateLink)) { + @Override + public void close() { + // noop as we reuse the primary connection - the primary one has to be closed + } + }); + } + + RDFConnection r = RDFConnectionAdapter.adapt( + RDFLinkUtils.wrapWithLoadViaLinkDatasetGraph(new RDFLinkModular(updateLink, updateLink, linkDg))); + return r; + } + }; + + return result; + } + +} diff --git a/conjure-datasources/src/main/java/org/aksw/conjure/datasource/RdfDataSourceFactoryDifs.java b/conjure-datasources/src/main/java/org/aksw/conjure/datasource/RdfDataSourceFactoryDifs.java new file mode 100644 index 0000000..70ac341 --- /dev/null +++ b/conjure-datasources/src/main/java/org/aksw/conjure/datasource/RdfDataSourceFactoryDifs.java @@ -0,0 +1,128 @@ +package org.aksw.conjure.datasource; + +import java.io.Closeable; +import java.nio.file.FileSystems; +import java.nio.file.Files; +import java.nio.file.Path; +import java.util.Arrays; +import java.util.Map; +import java.util.Map.Entry; + +import org.aksw.commons.io.util.FileUtils; +import org.aksw.commons.io.util.PathUtils; +import org.aksw.commons.io.util.symlink.SymbolicLinkStrategies; +import org.aksw.commons.util.exception.FinallyAll; +import org.aksw.difs.builder.DifsFactory; +import org.aksw.difs.system.domain.StoreDefinition; +import org.aksw.jena_sparql_api.arq.service.vfs.ServiceExecutorFactoryRegistratorVfs; +import org.aksw.jenax.arq.datasource.RdfDataSourceFactory; +import org.aksw.jenax.arq.datasource.RdfDataSourceFromDataset; +import org.aksw.jenax.arq.datasource.RdfDataSourceSpecBasic; +import org.aksw.jenax.arq.datasource.RdfDataSourceSpecBasicFromMap; +import org.aksw.jenax.arq.engine.quad.RDFConnectionFactoryQuadForm; +import org.aksw.jenax.connection.datasource.RdfDataSource; +import org.apache.jena.query.ARQ; +import org.apache.jena.query.Dataset; +import org.apache.jena.rdf.model.ModelFactory; +import org.apache.jena.sparql.util.Context; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public class RdfDataSourceFactoryDifs + implements RdfDataSourceFactory +{ + private static final Logger logger = LoggerFactory.getLogger(RdfDataSourceFactoryDifs.class); + + @Override + public RdfDataSource create(Map config) throws Exception { + + RdfDataSourceSpecBasic spec = RdfDataSourceSpecBasicFromMap.wrap(config); + + if (spec.getLocation() == null) { + throw new IllegalArgumentException("Difs engine requires the location of a store config file"); + } + + Entry fsInfo = PathUtils.resolveFsAndPath(spec.getLocationContext(), spec.getLocation()); + Path confFile = fsInfo.getKey(); + + boolean canWrite = confFile.getFileSystem().equals(FileSystems.getDefault()); + + Context cxt = ARQ.getContext().copy(); + ServiceExecutorFactoryRegistratorVfs.register(cxt); + + + // If the config does not exist then delete it upon completion + + Path basePath = confFile.getParent(); + if (basePath == null) { + throw new IllegalArgumentException("Location must be a file"); + } + + // TODO Get default paths from difs factory to ensure consistency + Path dftStorePath = basePath.resolve("store"); + Path dftIndexPath = 
basePath.resolve("index"); + + boolean deleteWhenDone = + Boolean.TRUE.equals(spec.isAutoDeleteIfCreated()) + && canWrite + && !Files.exists(confFile) && !Files.exists(dftStorePath) && !Files.exists(dftIndexPath); + + if (deleteWhenDone) { + logger.info(String.format("Creating temporary difs store with config file %s (files will be deleted when done)", confFile)); + } +// else { +// logger.info(String.format("Connecting to existing difs store using config %s", confFile)); +// } + + Path dftLocksPath = basePath.resolve("locks"); + Path dftTxnsPath = basePath.resolve("txns"); + + // Make sure we don't accidently delete possibly unrelated locks / txns folders (better safe than sorry) + if (deleteWhenDone) { + for (Path validation : Arrays.asList(dftLocksPath, dftTxnsPath) ) { + if (Files.exists(validation)) { + throw new IllegalStateException("Neither store/index folders nor config file found but either orphaned or unrelated file " + validation + " existed."); + } + } + } + + Path ancestorPath = FileUtils.getFirstExistingAncestor(confFile); + Files.createDirectories(basePath); + + // Path createDirs = ancestorPath.relativize(basePath); + + + + StoreDefinition defaultDefinition = ModelFactory.createDefaultModel().createResource().as(StoreDefinition.class) + .setStorePath("store") + .setIndexPath("index") + .setAllowEmptyGraphs(true); + + Dataset dataset = DifsFactory.newInstance() + .setStoreDefinition(defaultDefinition) + .setUseJournal(canWrite) + .setSymbolicLinkStrategy(SymbolicLinkStrategies.FILE) + .setConfigFile(confFile) + .setCreateIfNotExists(true) + .setMaximumNamedGraphCacheSize(10000) + .connectAsDataset(); + + RdfDataSource result = RdfDataSourceFromDataset.create(dataset, + ds -> RDFConnectionFactoryQuadForm.connect(ds, cxt), () -> { + if (deleteWhenDone) { + logger.info(String.format("Deleting difs files based at %s", basePath)); + FinallyAll.run( + () -> FileUtils.deleteRecursivelyIfExists(dftIndexPath), + () -> FileUtils.deleteRecursivelyIfExists(dftStorePath), + () -> FileUtils.deleteRecursivelyIfExists(dftTxnsPath), + () -> FileUtils.deleteRecursivelyIfExists(dftLocksPath), + () -> Files.deleteIfExists(confFile), + () -> FileUtils.deleteEmptyFolders(basePath, ancestorPath, false), + () -> fsInfo.getValue().close() + ); + } + }); + + return result; + } +} \ No newline at end of file diff --git a/conjure-datasources/src/main/java/org/aksw/conjure/datasource/RdfDataSourceFactoryMem.java b/conjure-datasources/src/main/java/org/aksw/conjure/datasource/RdfDataSourceFactoryMem.java new file mode 100644 index 0000000..4826503 --- /dev/null +++ b/conjure-datasources/src/main/java/org/aksw/conjure/datasource/RdfDataSourceFactoryMem.java @@ -0,0 +1,39 @@ +package org.aksw.conjure.datasource; + +import java.util.Map; + +import org.aksw.jena_sparql_api.arq.service.vfs.ServiceExecutorFactoryRegistratorVfs; +import org.aksw.jenax.arq.connection.dataset.DatasetRDFConnectionFactory; +import org.aksw.jenax.arq.connection.dataset.DatasetRDFConnectionFactoryBuilder; +import org.aksw.jenax.arq.datasource.RdfDataSourceFactory; +import org.aksw.jenax.arq.datasource.RdfDataSourceFromDataset; +import org.aksw.jenax.arq.datasource.RdfDataSourceSpecBasic; +import org.aksw.jenax.arq.datasource.RdfDataSourceSpecBasicFromMap; +import org.aksw.jenax.connection.datasource.RdfDataSource; +import org.apache.jena.query.ARQ; +import org.apache.jena.query.DatasetFactory; +import org.apache.jena.sparql.util.Context; + +public class RdfDataSourceFactoryMem + implements RdfDataSourceFactory +{ + @Override + 
public RdfDataSource create(Map config) { + RdfDataSourceSpecBasic spec = RdfDataSourceSpecBasicFromMap.wrap(config); + if (spec.getLocation() != null) { + throw new IllegalArgumentException("In-Memory data source does not accept a location."); + } + + Context cxt = ARQ.getContext().copy(); + ServiceExecutorFactoryRegistratorVfs.register(cxt); + + DatasetRDFConnectionFactory connector = DatasetRDFConnectionFactoryBuilder.create() + .setDefaultQueryEngineFactoryProvider() + .setDefaultUpdateEngineFactoryProvider() + .setContext(cxt) + .build(); + + RdfDataSource result = RdfDataSourceFromDataset.create(DatasetFactory.create(), connector::connect, null); + return result; + } +} \ No newline at end of file diff --git a/conjure-datasources/src/main/java/org/aksw/conjure/datasource/RdfDataSourceFactoryTdb2.java b/conjure-datasources/src/main/java/org/aksw/conjure/datasource/RdfDataSourceFactoryTdb2.java new file mode 100644 index 0000000..4d8d278 --- /dev/null +++ b/conjure-datasources/src/main/java/org/aksw/conjure/datasource/RdfDataSourceFactoryTdb2.java @@ -0,0 +1,117 @@ +package org.aksw.conjure.datasource; + +import java.io.Closeable; +import java.nio.file.Files; +import java.nio.file.Path; +import java.nio.file.Paths; +import java.util.Map; +import java.util.Map.Entry; + +import org.aksw.commons.io.util.PathUtils; +import org.aksw.jenax.arq.datasource.RdfDataSourceFactory; +import org.aksw.jenax.arq.datasource.RdfDataSourceFromDataset; +import org.aksw.jenax.arq.datasource.RdfDataSourceSpecBasic; +import org.aksw.jenax.arq.datasource.RdfDataSourceSpecBasicFromMap; +import org.aksw.jenax.connection.datasource.RdfDataSource; +import org.apache.jena.dboe.base.file.Location; +import org.apache.jena.query.Dataset; +import org.apache.jena.rdfconnection.RDFConnection; +import org.apache.jena.tdb2.TDB2Factory; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import com.google.common.base.StandardSystemProperty; +import com.google.common.io.MoreFiles; + +public class RdfDataSourceFactoryTdb2 + implements RdfDataSourceFactory +{ + private static final Logger logger = LoggerFactory.getLogger(RdfDataSourceFactoryTdb2.class); + + + @Override + public RdfDataSource create(Map config) throws Exception { + RdfDataSource result; + + RdfDataSourceSpecBasic spec = RdfDataSourceSpecBasicFromMap.wrap(config); + Entry fsInfo = PathUtils.resolveFsAndPath(spec.getLocationContext(), spec.getLocation()); + + Path dbPath = fsInfo == null ? null : fsInfo.getKey(); + Closeable fsCloseAction = fsInfo == null ? 
() -> {} : fsInfo.getValue(); + + + boolean createdDbDir = false; + + if (dbPath == null) { + String tmpDirStr = spec.getTempDir(); + if (tmpDirStr == null) { + tmpDirStr = StandardSystemProperty.JAVA_IO_TMPDIR.value(); + } + + if (tmpDirStr == null) { + throw new IllegalStateException("Temp dir neither specified nor obtainable from java.io.tmpdir"); + } + + Path tmpDir = Paths.get(tmpDirStr); + dbPath = Files.createTempDirectory(tmpDir, "sparql-integrate-tdb2-").toAbsolutePath(); + createdDbDir = true; + } else { + dbPath = dbPath.toAbsolutePath(); + if (!Files.exists(dbPath)) { + Files.createDirectories(dbPath); + createdDbDir = true; + } + } + + Path finalDbPath = dbPath; + Closeable deleteAction; + if (createdDbDir) { + if (Boolean.TRUE.equals(spec.isAutoDeleteIfCreated())) { + logger.info("Created new directory (its content will deleted when done): " + finalDbPath); + deleteAction = () -> { + logger.info("Deleting created directory: " + finalDbPath); + MoreFiles.deleteRecursively(finalDbPath); + }; + } else { + logger.info("Created new directory (will be kept after done): " + finalDbPath); + deleteAction = () -> {}; + } + } else { + logger.warn("Folder already existed - delete action disabled: " + finalDbPath); + deleteAction = () -> {}; + } + + // Set up a partial close action because connecting to the db may yet fail + Closeable partialCloseAction = () -> { + try { + deleteAction.close(); + } finally { + fsCloseAction.close(); + } + }; + + Location location = Location.create(finalDbPath); + try { + Dataset dataset = TDB2Factory.connectDataset(location); + + logger.info("Connecting to TDB2 database in folder " + finalDbPath); + Closeable finalDeleteAction = () -> { + try { + dataset.close(); + } finally { + partialCloseAction.close(); + } + }; + + result = RdfDataSourceFromDataset.create( + dataset, + RDFConnection::connect, + finalDeleteAction); + } catch (Exception e) { + partialCloseAction.close(); + throw new RuntimeException(e); + } + + return result; + } +} \ No newline at end of file diff --git a/conjure-datasources/src/main/resources/META-INF/services/org.apache.jena.sys.JenaSubsystemLifecycle b/conjure-datasources/src/main/resources/META-INF/services/org.apache.jena.sys.JenaSubsystemLifecycle new file mode 100644 index 0000000..bf76510 --- /dev/null +++ b/conjure-datasources/src/main/resources/META-INF/services/org.apache.jena.sys.JenaSubsystemLifecycle @@ -0,0 +1 @@ +org.aksw.conjure.datasource.JenaPluginConjureDataSources diff --git a/pom.xml b/pom.xml index e65df0b..47d00cb 100644 --- a/pom.xml +++ b/pom.xml @@ -15,6 +15,7 @@ https://github.com/SmartDataAnalytics/Conjure + conjure-datasources conjure-cli-native conjure-cli-spark conjure-debian-cli @@ -53,13 +54,18 @@ - 3.14.0 - 1-SNAPSHOT + 4.3.1 + 4.3.1-1-SNAPSHOT - ${jena.version}-${jena-sparql-api.subversion} + 0.8.0-RC2-SNAPSHOT + 1.0.0-SNAPSHOT + 1.8.0-beta4 + 2.16.0 + 4.13 2.2.1.RELEASE + - 1.8 - 1.8 + 11 http://java.sun.com/javase/8/docs/api/ UTF-8 UTF-8 @@ -82,40 +87,62 @@ - + - - + + org.aksw.jenax + jenax-arq-connection-core + ${jenax.version} + + + + org.aksw.jenax + jenax-arq-plugins-service-vfs + ${jenax.version} + + + + + org.aksw.difs + difs-system + ${difs.version} + + - org.apache.spark - spark-core_${scala.binary.version} - ${spark.version} - + net.sansa-stack + sansa-spark-jena-java_2.12 + ${sansa.version} - javax.ws.rs - javax.ws.rs-api + log4j + log4j + + + org.slf4j + slf4j-log4j12 - org.apache.spark - spark-sql_${scala.binary.version} - ${spark.version} + org.apache.jena + jena-tdb2 + 
${jena.version} + org.springframework.boot @@ -131,9 +158,9 @@ - org.aksw.jena-sparql-api - jena-sparql-api-conjure - ${jena-sparql-api.version} + org.aksw.jenax + jenax-conjure-core + ${jenax.version} @@ -143,21 +170,21 @@ - com.beust - jcommander - 1.78 + org.apache.logging.log4j + log4j-slf4j18-impl + ${log4j.version} org.slf4j - slf4j-log4j12 - 1.7.25 + slf4j-api + ${slf4j.version} junit junit - 4.12 + ${junit.version} @@ -176,10 +203,12 @@ maven-project-info-reports-plugin 2.9 + @@ -242,11 +271,10 @@ org.apache.maven.plugins maven-compiler-plugin - 3.6.0 + 3.8.1 - ${maven.compiler.source} - ${maven.compiler.target} - ${project.build.sourceEncoding} + ${maven.compiler.release} + @@ -478,13 +506,13 @@ maven.aksw.internal University Leipzig, AKSW Maven2 Internal Repository - http://maven.aksw.org/repository/internal/ + https://maven.aksw.org/repository/internal/ maven.aksw.snapshots University Leipzig, AKSW Maven2 Snapshot Repository - http://maven.aksw.org/repository/snapshots/ + https://maven.aksw.org/repository/snapshots/
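
For orientation, the sketch below shows one way the data-source factories added by this patch might be exercised once `JenaPluginConjureDataSources` has registered them via Jena's subsystem lifecycle. It is only an illustration built from the classes visible in the diff above, not part of the change set: the empty configuration map, the `main` wrapper, and the assumption that the in-memory factory accepts a config without a location are my own.

```java
package org.aksw.conjure.datasource;

import java.util.HashMap;

import org.aksw.jenax.connection.datasource.RdfDataSource;
import org.apache.jena.query.QuerySolution;
import org.apache.jena.rdfconnection.RDFConnection;

// Illustrative sketch only (not part of the patch). Assumes an empty config map is
// acceptable for the in-memory factory, since its create() only rejects a non-null location.
public class RdfDataSourceFactoryMemExample {
    public static void main(String[] args) throws Exception {
        // Obtain an in-memory data source from the factory introduced in this diff
        RdfDataSource dataSource = new RdfDataSourceFactoryMem().create(new HashMap<>());

        // RdfDataSource.getConnection() hands out a standard Jena RDFConnection
        try (RDFConnection conn = dataSource.getConnection()) {
            conn.update("INSERT DATA { <urn:ex:s> <urn:ex:p> <urn:ex:o> }");
            conn.querySelect("SELECT (COUNT(*) AS ?c) { ?s ?p ?o }",
                (QuerySolution qs) -> System.out.println("Triples: " + qs.get("c")));
        }
    }
}
```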