diff --git a/src/lib/Generator.class.ts b/src/lib/Generator.class.ts index 76de66d..6a1e3b1 100644 --- a/src/lib/Generator.class.ts +++ b/src/lib/Generator.class.ts @@ -1,19 +1,29 @@ +/* eslint-disable @typescript-eslint/method-signature-style */ import type { ConstructQuery } from "sparqljs"; import type Stage from "./Stage.class.js"; import getSPARQLQuery from "../utils/getSPARQLQuery.js"; -import type { Quad, NamedNode, ResultStream } from "@rdfjs/types"; +import type { Quad, NamedNode } from "@rdfjs/types"; import getSPARQLQueryString from "../utils/getSPARQLQueryString.js"; import getEndpoint from "../utils/getEndpoint.js"; import type { Endpoint, QueryEngine } from "./types.js"; import getEngine from '../utils/getEngine.js'; import getEngineSource from '../utils/getEngineSource.js'; +import EventEmitter from 'node:events'; -export default class Generator { +declare interface Generator { + on(event: "data", listener: (statement: Quad) => void): this; + on(event: "end", listener: (numResults: number) => void): this; + + emit(event: "data", statement: Quad): boolean; + emit(event: "end", numResults: number): boolean; +} +class Generator extends EventEmitter { private readonly query: ConstructQuery; private readonly engine: QueryEngine; private source: string = '' private readonly endpoint: Endpoint; public constructor(stage: Stage) { + super() this.query = getSPARQLQuery( stage.configuration.generator.query, "construct" @@ -27,7 +37,7 @@ export default class Generator { this.engine = getEngine(this.endpoint) } - public async loadStatements($this: NamedNode): Promise> { + public run($this: NamedNode): void { // Prebinding, see https://www.w3.org/TR/shacl/#pre-binding // we know the query is safe to use replacement since we checked it before const queryString = getSPARQLQueryString(this.query) @@ -36,8 +46,21 @@ export default class Generator { `<${$this.value}>` ); if (this.source === '') this.source = getEngineSource(this.endpoint) - return this.engine.queryQuads(queryString, { + let numberOfStatements = 0 + this.engine.queryQuads(queryString, { sources: [this.source] - }); + }).then(stream => { + stream.on('data', (quad: Quad) => { + numberOfStatements ++ + this.emit('data', quad) + }) + stream.on('end', () => { + this.emit('end', numberOfStatements) + }) + }).catch(e => { + throw e as Error + }) } } + +export default Generator \ No newline at end of file diff --git a/src/lib/Iterator.class.ts b/src/lib/Iterator.class.ts index f90bef2..3ef2f89 100644 --- a/src/lib/Iterator.class.ts +++ b/src/lib/Iterator.class.ts @@ -1,7 +1,8 @@ +/* eslint-disable @typescript-eslint/method-signature-style */ +import EventEmitter from 'node:events'; import type { SelectQuery } from "sparqljs"; import type Stage from "./Stage.class.js"; -import type { Term, NamedNode } from "@rdfjs/types"; -import { DataFactory } from "n3"; +import type { NamedNode } from "@rdfjs/types"; import getSPARQLQuery from "../utils/getSPARQLQuery.js"; import { type Bindings } from "@comunica/types"; import getSPARQLQueryString from "../utils/getSPARQLQueryString.js"; @@ -12,63 +13,65 @@ import getEngineSource from '../utils/getEngineSource.js'; const DEFAULT_LIMIT = 10; -export default class Iterator - implements AsyncIterator, AsyncIterable +declare interface Iterator { + on(event: "data", listener: ($this: NamedNode) => void): this; + on(event: "end", listener: (numResults: number) => void): this; + + emit(event: "data", $this: NamedNode): boolean; + emit(event: "end", numResults: number): boolean; +} + +class Iterator + extends EventEmitter { private readonly query: SelectQuery; public readonly endpoint: Endpoint; private readonly engine: QueryEngine; private source: string = '' - private bindings?: Bindings[]; private $offset = 0; - private $index = 0; - constructor(private readonly stage: Stage) { + private totalResults = 0 + + constructor(stage: Stage) { + super() this.query = getSPARQLQuery(stage.configuration.iterator.query, "select"); this.query.limit = stage.configuration.iterator.batchSize ?? DEFAULT_LIMIT; this.endpoint = getEndpoint(stage) this.engine = getEngine(this.endpoint) } - private async loadBindings(): Promise { + public run(): void { + let resultsPerPage = 0 if (this.source === '') this.source = getEngineSource(this.endpoint) this.query.offset = this.$offset; const queryString = getSPARQLQueryString(this.query); - const bindings = await this.engine.queryBindings(queryString, { + this.engine.queryBindings(queryString, { sources: [this.source], - }); - return bindings.toArray(); - } - - public async next(): Promise<{ value: NamedNode; done: boolean }> { - if (this.bindings === undefined) this.bindings = await this.loadBindings(); - if (this.$index > this.bindings.length - 1) { - this.$offset += this.query.limit ?? DEFAULT_LIMIT; - this.bindings = await this.loadBindings(); - this.$index = 0; - } - const done: boolean = - this.bindings === undefined || this.bindings.length === 0; - let value: Term; - if (!done) { - const $this = this.bindings[this.$index].get("this"); - if ($this === undefined) { - throw new Error( - `Missing binding \`$this\` in your SPARQL query in \`${this.stage.configuration.iterator.query}\`.` - ); - } else if ($this.termType !== "NamedNode") { - throw new Error( - `Binding \`$this\` in your SPARQL query in \`${this.stage.configuration.iterator.query}\` should be an Iri (NamedNode), found a ${$this.termType}.` - ); - } - value = $this as NamedNode; - } else { - value = DataFactory.namedNode(""); - } - this.$index++; - return { value, done }; - } + }).then(stream => { + stream.on('data', (binding: Bindings) => { + resultsPerPage++ + if (!binding.has('this')) throw new Error('Missing binding $this in the Iterator result.') + const $this = binding.get('this')! + if ($this.termType !== 'NamedNode') { + throw new Error(`Binding $this in the Iterator result must be an Iri/NamedNode, but it is of type ${$this.termType}.`) + } else { + this.emit('data', $this) + } + }); - public [Symbol.asyncIterator](): Iterator { - return this; + stream.on('end', () => { + this.totalResults += resultsPerPage + this.$offset += this.query.limit! + if (resultsPerPage < this.query.limit!) { + this.emit('end', this.totalResults) + } else { + this.run() + } + }); + }) + .catch(e => { + throw e + }) } } + +export default Iterator \ No newline at end of file diff --git a/src/lib/Pipeline.class.ts b/src/lib/Pipeline.class.ts index 5f46f1d..2986e4e 100644 --- a/src/lib/Pipeline.class.ts +++ b/src/lib/Pipeline.class.ts @@ -99,35 +99,33 @@ class Pipeline { spinner.fail((e as Error).message); this.error(e as Error); } - let i = -1 - for (const name of this.stages.keys()) { - i++ - if (i < startFromStage) { - ora().start().info(`skipping stage "${chalk.bold(name)}" as requested`).stop() - } else { - const spinner = ora("Loading results from Iterator").start(); - const stage = this.stages.get(name)!; - stage.on("iteratorResult", ($this) => { - spinner.text = $this.value; - }); - let count = 0 - stage.on("generatorResult", () => { - count++ - }); - try { - await stage.run() - // @TODO: the # of quads is not correct, should be something in the async loop... - spinner.succeed(`stage "${chalk.bold(name)}" resulted in ${count} quads`) - } catch (e) { - spinner.fail((e as Error).message); - this.error(e as Error); + const stageNames = Array.from(this.stages.keys()).splice(startFromStage) + + function run(stages: Map): void { + const stage = stages.get(stageNames.shift()!)! + const spinner = ora("Loading results from Iterator").start(); + stage.on("iteratorResult", ($this) => { + spinner.text = $this.value; + }); + stage.on("end", (iris, statements) => { + spinner.succeed(`stage "${chalk.bold(stage.name)}" resulted in ${statements} statement${statements === 1 ?'':'s'} in ${iris} iteration${iris === 1 ?'':'s'}.`) + if (stageNames.length !== 0) { + run(stages) + } else { + console.info( + chalk.green(`✔ your pipeline was completed in ${duration(now)}`) + ); } + }); + try { + stage.run() + } catch(e) { + spinner.fail((e as Error).message); } } - console.info( - chalk.green(`✔ your pipeline was completed in ${duration(now)}`) - ); + + run(this.stages) } get name(): string { diff --git a/src/lib/Stage.class.ts b/src/lib/Stage.class.ts index dd54793..3d7d2f4 100644 --- a/src/lib/Stage.class.ts +++ b/src/lib/Stage.class.ts @@ -1,4 +1,3 @@ -/* eslint-disable @typescript-eslint/adjacent-overload-signatures */ /* eslint-disable @typescript-eslint/method-signature-style */ import EventEmitter from 'node:events'; import File from './File.class.js'; @@ -9,25 +8,16 @@ import kebabcase from 'lodash.kebabcase' import type Pipeline from './Pipeline.class.js'; import path from 'node:path'; import { Writer } from 'n3' -import type { Quad, NamedNode } from '@rdfjs/types' +import type { NamedNode } from '@rdfjs/types' import type { WriteStream } from 'node:fs'; declare interface Stage { - on(event: "generatorResult", listener: () => void): this; - off(event: "generatorResult", listener: () => void): this; - emit(event: "generatorResult"): boolean; - - on(event: "generatorResultFinished", listener: (statements: number) => void): this; - off(event: "generatorResultFinished", listener: (statements: number) => void): this; - emit(event: "generatorResultFinished", statements: number): boolean; - - on(event: "finished", listener: (statements: number) => void): this; - off(event: "finished", listener: (statements: number) => void): this; - emit(event: "finished", statements: number): boolean; - + on(event: "generatorResult", listener: (count: number) => void): this; + on(event: "end", listener: (iteratorCount: number, statements: number) => void): this; on(event: "iteratorResult", listener: ($this: NamedNode) => void): this; - off(event: "iteratorResult", listener: ($this: NamedNode) => void): this; - emit(event: "iteratorResult", $this: NamedNode): boolean; + emit(event: "generatorResult", count: number): boolean; + emit(event: "end", iteratorCount: number, statements: number): boolean; + emit(event: "iteratorResult", $this: NamedNode): boolean; } class Stage extends EventEmitter { @@ -62,27 +52,29 @@ class Stage extends EventEmitter { return this.configuration.name } - public async run(): Promise { + public run(): void { let quadCount = 0 - const writeStream = this.destination() - for await (const $this of this.iterator) { - let qc = 0 + let iteratorCount = 0 + let generatorCount = 0 + const writer = new Writer(this.destination(), { end: false, format: 'N-Triples' }) + this.generator.on('data', quad => { + writer.addQuad(quad) + quadCount ++ + }) + this.generator.on('end', _ => { + generatorCount++ + if (generatorCount === iteratorCount) { + this.emit('end', iteratorCount, quadCount) + } + }) + this.iterator.on('data', $this => { + this.generator.run($this) this.emit('iteratorResult', $this) - const quadStream = await this.generator.loadStatements($this) - const writer = new Writer(writeStream, { end: false, format: 'N-Triples' }) - quadStream.on('data', (quad: Quad) => { - this.emit('generatorResult') - quadCount++ - qc++ - writer.addQuad(quad) - }) - - quadStream.on('end', () => { - this.emit('generatorResultFinished', qc) - }) - } - this.emit('finished', quadCount) - return quadCount + }) + this.iterator.on('end', count => { + iteratorCount = count + }) + this.iterator.run() } }