diff --git a/src/lib/Generator.class.ts b/src/lib/Generator.class.ts index e43e770..15b4f07 100644 --- a/src/lib/Generator.class.ts +++ b/src/lib/Generator.class.ts @@ -4,7 +4,7 @@ import getSPARQLQuery from '../utils/getSPARQLQuery.js'; import type {Quad, NamedNode} from '@rdfjs/types'; import getSPARQLQueryString from '../utils/getSPARQLQueryString.js'; import getEndpoint from '../utils/getEndpoint.js'; -import type {Endpoint, QueryEngine} from './types.js'; +import type {Endpoint, QueryEngine, QuerySource} from './types.js'; import getEngine from '../utils/getEngine.js'; import getEngineSource from '../utils/getEngineSource.js'; import EventEmitter from 'node:events'; @@ -23,10 +23,9 @@ export default class Generator extends EventEmitter { private iterationsProcessed = 0; private iterationsIncoming = 0; private statements = 0; - private source = ''; private $thisList: NamedNode[] = []; private readonly endpoint: Endpoint; - // private iteratorEnded: boolean = false; + private source?: QuerySource; public constructor( private readonly stage: Stage, private readonly index: number @@ -77,7 +76,6 @@ export default class Generator extends EventEmitter { this.source }: ${(e as Error).message}` ); - if (this.source === '') this.source = getEngineSource(this.endpoint); const unionQuery = getSPARQLQuery( getSPARQLQueryString(this.query), 'construct' @@ -92,7 +90,7 @@ export default class Generator extends EventEmitter { this.engine .queryQuads(getSPARQLQueryString(unionQuery), { - sources: [this.source], + sources: [(this.source ??= getEngineSource(this.endpoint))], }) .then(stream => { stream.on('data', (quad: Quad) => { diff --git a/src/lib/Iterator.class.ts b/src/lib/Iterator.class.ts index b85b014..a5c3dbf 100644 --- a/src/lib/Iterator.class.ts +++ b/src/lib/Iterator.class.ts @@ -6,7 +6,7 @@ import getSPARQLQuery from '../utils/getSPARQLQuery.js'; import {type Bindings} from '@comunica/types'; import getSPARQLQueryString from '../utils/getSPARQLQueryString.js'; import getEndpoint from '../utils/getEndpoint.js'; -import type {Endpoint, QueryEngine} from './types.js'; +import type {Endpoint, QueryEngine, QuerySource} from './types.js'; import getEngine from '../utils/getEngine.js'; import getEngineSource from '../utils/getEngineSource.js'; import parse from 'parse-duration'; @@ -24,7 +24,7 @@ export default class Iterator extends EventEmitter { public readonly endpoint: Endpoint; private readonly engine: QueryEngine; private readonly delay: number = 0; - private source = ''; + private source?: QuerySource; private $offset = 0; public totalResults = 0; @@ -50,7 +50,6 @@ export default class Iterator extends EventEmitter { public run(): void { setTimeout(() => { let resultsPerPage = 0; - if (this.source === '') this.source = getEngineSource(this.endpoint); this.query.offset = this.$offset; const queryString = getSPARQLQueryString(this.query); const error = (e: unknown): Error => @@ -63,7 +62,7 @@ export default class Iterator extends EventEmitter { ); this.engine .queryBindings(queryString, { - sources: [this.source], + sources: [(this.source ??= getEngineSource(this.endpoint))], }) .then(stream => { stream.on('data', (binding: Bindings) => { diff --git a/src/lib/Pipeline.class.ts b/src/lib/Pipeline.class.ts index b693f6d..80f1775 100644 --- a/src/lib/Pipeline.class.ts +++ b/src/lib/Pipeline.class.ts @@ -22,7 +22,6 @@ let spinner: Ora; class Pipeline { public readonly stages = new Map(); public dataDir: string; - private $isValidated = false; private stageNames: string[] = []; private startTime = performance.now(); private readonly destination: File | TriplyDB; @@ -64,6 +63,7 @@ class Pipeline { this.destination = isTriplyDBPathString(destinationFile) ? new TriplyDB(destinationFile).validate() : new File(destinationFile, true).validate(); + this.validate(); } private error(e: Error, stage?: string): void { @@ -76,7 +76,6 @@ class Pipeline { } public getPreviousStage(stage: Stage): Stage | undefined { - this.validate(); if (!this.stages.has(stage.name)) { throw new Error( `This is unexpected: missing stage "${stage.name}" in stages.` @@ -88,18 +87,18 @@ class Pipeline { else return this.stages.get(names[ix - 1]); } - public validate(): void { - if (this.$isValidated) return; - let i = 0; + private validate(): void { if (this.$configuration.stages.length === 0) { throw new Error('Your pipeline contains no stages.'); } + + if (this.$configuration.stages[0].iterator.endpoint === undefined) { + throw new Error( + 'The first stage of your pipeline must have an endpoint defined for the Iterator.' + ); + } + for (const stageConfiguration of this.$configuration.stages) { - if (i === 0 && stageConfiguration.iterator.endpoint === undefined) { - throw new Error( - 'The first stage of your pipeline must have an endpoint defined for the Iterator.' - ); - } if (this.stages.has(stageConfiguration.name)) { throw new Error( `Detected a duplicate name for stage \`${stageConfiguration.name}\` in your pipeline: each stage must have a unique name.` @@ -109,9 +108,7 @@ class Pipeline { stageConfiguration.name, new Stage(this, stageConfiguration) ); - i++; } - this.$isValidated = true; } public get configuration(): LDWorkbenchConfiguration { @@ -128,7 +125,6 @@ class Pipeline { if (!(this.opts?.silent === true)) spinner.start(); let startFromStage = 0; try { - this.validate(); if (this.opts?.startFromStageName !== undefined) { if (/^\d+$/.test(this.opts.startFromStageName)) { const ix = parseInt(this.opts.startFromStageName); diff --git a/src/lib/tests/Generator.class.test.ts b/src/lib/tests/Generator.class.test.ts index acce184..fa5aff3 100644 --- a/src/lib/tests/Generator.class.test.ts +++ b/src/lib/tests/Generator.class.test.ts @@ -68,7 +68,6 @@ describe('Generator Class', () => { expect(generator).to.have.property('query'); expect(generator).to.have.property('engine'); expect(generator).to.have.property('endpoint'); - expect(generator).to.have.property('source'); }); }); describe('run', () => { @@ -108,8 +107,6 @@ describe('Generator Class', () => { }; // read file after pipeline has finished const pipelineParallelGenerators = new Pipeline(config, {silent: true}); - pipelineParallelGenerators.validate(); - await pipelineParallelGenerators.run(); const file = fs.readFileSync(filePath, {encoding: 'utf-8'}); const fileLines = file.split('\n').sort(); @@ -148,7 +145,6 @@ describe('Generator Class', () => { ], }; const pipelineBatch = new Pipeline(batchConfiguration, {silent: true}); - pipelineBatch.validate(); pipelineBatch .run() .then(() => { diff --git a/src/lib/tests/Iterator.class.test.ts b/src/lib/tests/Iterator.class.test.ts index b0e3c39..a8a6b7a 100644 --- a/src/lib/tests/Iterator.class.test.ts +++ b/src/lib/tests/Iterator.class.test.ts @@ -65,7 +65,6 @@ describe('Iterator Class', () => { expect(iterator).to.have.property('query'); expect(iterator).to.have.property('endpoint'); expect(iterator).to.have.property('engine'); - expect(iterator).to.have.property('source'); expect(iterator).to.have.property('$offset', 0); expect(iterator).to.have.property('totalResults', 0); }); diff --git a/src/lib/tests/Pipeline.class.test.ts b/src/lib/tests/Pipeline.class.test.ts index 1bf2b74..cf40173 100644 --- a/src/lib/tests/Pipeline.class.test.ts +++ b/src/lib/tests/Pipeline.class.test.ts @@ -55,7 +55,6 @@ describe('Pipeline Class', () => { expect(pipeline).to.be.an.instanceOf(Pipeline); expect(pipeline).to.have.property('stages').that.is.a('Map'); expect(pipeline).to.have.property('dataDir').that.is.a('string'); - expect(pipeline).to.have.property('$isValidated', false); expect(pipeline).to.have.property('stageNames').that.is.an('array'); expect(pipeline).to.have.property('startTime').that.is.an('number'); expect(pipeline) @@ -99,7 +98,6 @@ describe('Pipeline Class', () => { ], }; const pipeline = new Pipeline(configuration, {silent: true}); - pipeline.validate(); const stage1 = pipeline.stages.get('Stage 1')!; const stage2 = pipeline.stages.get('Stage 2')!; @@ -155,10 +153,9 @@ describe('Pipeline Class', () => { destination: 'file://pipelines/data/example-pipeline.nt', stages: [], } as unknown as LDWorkbenchConfiguration; - const pipeline = new Pipeline(invalidConfiguration, {silent: true}); let failed = false; try { - pipeline.validate(); + new Pipeline(invalidConfiguration, {silent: true}); } catch (error) { if (error instanceof Error) { if (error.message === 'Your pipeline contains no stages.') { @@ -204,10 +201,9 @@ describe('Pipeline Class', () => { }, ], } as unknown as LDWorkbenchConfiguration; - const pipeline = new Pipeline(invalidConfiguration, {silent: true}); let failed = false; try { - pipeline.validate(); + new Pipeline(invalidConfiguration, {silent: true}); } catch (error) { if (error instanceof Error) { if ( @@ -259,10 +255,9 @@ describe('Pipeline Class', () => { }, ], }; - const pipeline = new Pipeline(configDuplicateStageName, {silent: true}); let failed = false; try { - pipeline.validate(); + new Pipeline(configDuplicateStageName, {silent: true}); } catch (error) { if (error instanceof Error) { if ( @@ -315,10 +310,9 @@ describe('Pipeline Class', () => { }, ], }; - const pipeline = new Pipeline(configDuplicateStageName, {silent: true}); let failed = false; try { - pipeline.validate(); + new Pipeline(configDuplicateStageName, {silent: true}); } catch (error) { failed = true; if (error instanceof Error) { diff --git a/src/lib/tests/PreviousStage.class.test.ts b/src/lib/tests/PreviousStage.class.test.ts index 4e3fba4..5c0701c 100644 --- a/src/lib/tests/PreviousStage.class.test.ts +++ b/src/lib/tests/PreviousStage.class.test.ts @@ -43,7 +43,6 @@ describe('PreviousStage Class', () => { ], }; const pipeline = new Pipeline(config, {silent: true}); - pipeline.validate(); const stage: Stage = new Stage(pipeline, config.stages[1]); const stagesSoFar = Array.from(stage.pipeline.stages.keys()); const previousStage = new PreviousStage(stage, stagesSoFar.pop()!); @@ -88,7 +87,6 @@ describe('PreviousStage Class', () => { ], }; const pipeline = new Pipeline(config, {silent: true}); - pipeline.validate(); const stage: Stage = new Stage(pipeline, config.stages[0]); const stagesSoFar = Array.from(stage.pipeline.stages.keys()); const previousStage = new PreviousStage(stage, stagesSoFar.pop()!); @@ -132,7 +130,6 @@ describe('PreviousStage Class', () => { ], }; const pipeline = new Pipeline(config, {silent: true}); - pipeline.validate(); const stageTwo: Stage = new Stage(pipeline, config.stages[1]); const stagesSoFar = Array.from(stageTwo.pipeline.stages.keys()); const previousStage = new PreviousStage(stageTwo, stagesSoFar.pop()!); // should be stage one @@ -178,7 +175,6 @@ describe('PreviousStage Class', () => { ], }; const pipeline = new Pipeline(config, {silent: true}); - pipeline.validate(); const stage: Stage = new Stage(pipeline, config.stages[1]); const stagesSoFar = Array.from(stage.pipeline.stages.keys()); const previousStage = new PreviousStage(stage, stagesSoFar.pop()!); diff --git a/src/lib/types.ts b/src/lib/types.ts index aaff9e2..efc22c7 100644 --- a/src/lib/types.ts +++ b/src/lib/types.ts @@ -5,3 +5,4 @@ import {type QueryEngine as QueryEngineFile} from '@comunica/query-sparql-file'; export type Endpoint = File | URL | PreviousStage; export type QueryEngine = QueryEngineSparql | QueryEngineFile; +export type QuerySource = {type?: string; value: string}; diff --git a/src/main.ts b/src/main.ts index b0dcae9..e0ae9f4 100644 --- a/src/main.ts +++ b/src/main.ts @@ -72,12 +72,11 @@ async function main(): Promise { ); } - const pipeline = new Pipeline(configuration, { - startFromStageName: cliArgs.stage, - silent: cliArgs.silent, - }); - try { + const pipeline = new Pipeline(configuration, { + startFromStageName: cliArgs.stage, + silent: cliArgs.silent, + }); await pipeline.run(); } catch (e) { error( diff --git a/src/utils/getEndpoint.ts b/src/utils/getEndpoint.ts index b2147f4..1dc5167 100644 --- a/src/utils/getEndpoint.ts +++ b/src/utils/getEndpoint.ts @@ -25,8 +25,7 @@ export default function getEndpoint( return new File(endpoint); } else if (endpoint !== undefined) { try { - // fix for GraphDB, see https://github.com/comunica/comunica/issues/962 - return new URL((endpoint as string).replace(/^sparql@/, '')); + return new URL(endpoint); } catch (e) { throw new Error(`"${endpoint as string}" is not a valid URL`); } diff --git a/src/utils/getEngineSource.ts b/src/utils/getEngineSource.ts index 39603cb..b6f19de 100644 --- a/src/utils/getEngineSource.ts +++ b/src/utils/getEngineSource.ts @@ -1,10 +1,9 @@ import {isPreviousStage} from './guards.js'; import {existsSync} from 'fs'; import path from 'path'; -import type {Endpoint} from '../lib/types.js'; +import type {Endpoint, QuerySource} from '../lib/types.js'; -export default function getEngineSource(endpoint: Endpoint): string { - let source: string; +export default function getEngineSource(endpoint: Endpoint): QuerySource { if (isPreviousStage(endpoint)) { const previousStage = endpoint.load(); if (!existsSync(previousStage.destinationPath)) { @@ -12,9 +11,18 @@ export default function getEngineSource(endpoint: Endpoint): string { `The result from stage "${previousStage.name}" (${previousStage.destinationPath}) is not available, make sure to run that stage first` ); } - source = path.resolve(previousStage.destinationPath); - } else { - source = endpoint.toString(); + return { + type: 'file', + value: path.resolve(previousStage.destinationPath), + }; + } else if (endpoint instanceof URL) { + return { + type: 'sparql', + value: endpoint.toString(), + }; } - return source; + + return { + value: endpoint.toString(), + }; } diff --git a/src/utils/tests/utilities.test.ts b/src/utils/tests/utilities.test.ts index dbe186b..597a0d6 100644 --- a/src/utils/tests/utilities.test.ts +++ b/src/utils/tests/utilities.test.ts @@ -402,41 +402,10 @@ describe('Utilities', () => { }, ], }; - const pipeline = new Pipeline(config, {silent: true}); - const stageConfig = config.stages[0]; - // getEndpoint is use in Stage's Iterator, and it will throw there. - expect(() => new Stage(pipeline, stageConfig)).to.throw( + expect(() => new Pipeline(config, {silent: true})).to.throw( 'Error in the iterator of stage `Stage 1`: "invalidExample" is not a valid URL' ); }); - it("should work with URL's prepended with 'sparql@'", () => { - const url = - 'sparql@https://www.goudatijdmachine.nl/sparql/repositories/nafotocollectie'; // will be accepted - const config: LDWorkbenchConfiguration = { - name: 'Example Pipeline', - description: - 'This is an example pipeline. It uses files that are available in this repository and SPARQL endpoints that should work.\n', - destination: 'file://pipelines/data/example-pipeline.nt', - stages: [ - { - name: 'Stage 1', - iterator: { - query: 'file://static/example/iterator-stage-1.rq', - endpoint: url, - }, - generator: [ - { - query: 'file://static/example/generator-stage-1-1.rq', - }, - ], - }, - ], - }; - const pipeline = new Pipeline(config, {silent: true}); - const stageConfig = config.stages[0]; - // getEndpoint is use in Stage's Iterator, and it will throw there. - expect(() => new Stage(pipeline, stageConfig)).to.not.throw(); - }); it('should throw if stage has undefined endpoint and is first stage', () => { const endpoint = undefined; const config: LDWorkbenchConfiguration = { @@ -459,10 +428,8 @@ describe('Utilities', () => { }, ], }; - const pipeline = new Pipeline(config, {silent: true}); - const stageConfig = config.stages[0]; - expect(() => new Stage(pipeline, stageConfig)).to.throw( - 'Error in the iterator of stage `Stage 1`: no destination defined for the iterator and no previous stage to use that result' + expect(() => new Pipeline(config, {silent: true})).to.throw( + 'The first stage of your pipeline must have an endpoint defined for the Iterator.' ); }); it('should return PreviousStage if stage has undefined endpoint', () => { @@ -500,7 +467,6 @@ describe('Utilities', () => { ], }; const pipeline = new Pipeline(config, {silent: true}); - pipeline.validate(); const stage = new Stage(pipeline, config.stages[1]); const retrievedEndpoint = getEndpoint(stage); expect(isPreviousStage(retrievedEndpoint)).to.equal(true); @@ -551,7 +517,6 @@ describe('Utilities', () => { ], }; const pipeline = new Pipeline(config, {silent: true}); - pipeline.validate(); const stage: Stage = new Stage(pipeline, config.stages[1]); const stagesSoFar = Array.from(stage.pipeline.stages.keys()); const previousStage = new PreviousStage(stage, stagesSoFar.pop()!); @@ -560,13 +525,16 @@ describe('Utilities', () => { }); }); describe('getEngineSource', () => { - it('should return string when input is File', () => { + it('should return generic source when input is File', () => { const f = new File(`file://${path.join('./static/example/config.yml')}`); - expect(typeof getEngineSource(f) === 'string').to.equal(true); + expect(getEngineSource(f)).to.deep.equal({value: f.toString()}); }); - it('should return string when input is URL', () => { + it('should return URL source when input is URL', () => { const url = new URL('https://www.example.com'); - expect(typeof getEngineSource(url) === 'string').to.equal(true); + expect(getEngineSource(url)).to.deep.equal({ + type: 'sparql', + value: url.toString(), + }); }); it('should return engine source string when input is PreviousStage with destinationPath', async () => { const config: LDWorkbenchConfiguration = { @@ -602,13 +570,12 @@ describe('Utilities', () => { ], }; const pipeline = new Pipeline(config, {silent: true}); - pipeline.validate(); const stage2: Stage = new Stage(pipeline, config.stages[1]); const stagesSoFar = Array.from(stage2.pipeline.stages.keys()); const previousStage = new PreviousStage(stage2, stagesSoFar.pop()!); const engineSource = getEngineSource(previousStage); expect( - engineSource === + engineSource.value === path.join( process.cwd(), '/pipelines/data/example-pipeline/stage-1.nt' @@ -619,7 +586,6 @@ describe('Utilities', () => { beforeEach(() => { const configuration = loadConfiguration('./static/example/config.yml'); const pipeline = new Pipeline(configuration, {silent: true}); - pipeline.validate(); const stage: Stage = new Stage(pipeline, configuration.stages[1]); const stagesSoFar = Array.from(stage.pipeline.stages.keys()); const previousStage = new PreviousStage(stage, stagesSoFar.pop()!); @@ -641,7 +607,6 @@ describe('Utilities', () => { afterEach(() => { const configuration = loadConfiguration('./static/example/config.yml'); const pipeline = new Pipeline(configuration, {silent: true}); - pipeline.validate(); const stage: Stage = new Stage(pipeline, configuration.stages[1]); const stagesSoFar = Array.from(stage.pipeline.stages.keys()); const previousStage = new PreviousStage(stage, stagesSoFar.pop()!); @@ -659,7 +624,6 @@ describe('Utilities', () => { it('should throw when input is PreviousStage and destinationPath does not exist', () => { const configuration = loadConfiguration('./static/example/config.yml'); const pipeline = new Pipeline(configuration, {silent: true}); - pipeline.validate(); const stage: Stage = new Stage(pipeline, configuration.stages[1]); const stagesSoFar = Array.from(stage.pipeline.stages.keys()); const previousStage = new PreviousStage(stage, stagesSoFar.pop()!);