Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

closes issue #8 #9

Merged
merged 1 commit into from
Nov 29, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
14 changes: 9 additions & 5 deletions src/lib/File.class.ts
Original file line number Diff line number Diff line change
@@ -1,10 +1,11 @@
import { type WriteStream, createWriteStream, existsSync, statSync } from 'fs'
import { type WriteStream, createWriteStream, existsSync, statSync, mkdirSync } from 'fs'
import { isFile, isFilePathString } from '../utils/guards.js'
import { dirname } from 'path'

export default class File {
public static $id = 'File'
private readonly $isValid?: boolean
public constructor(private $path: string) {}
public constructor(private $path: string, private readonly skipExistsCheck: boolean = false) {}

public validate(): File {
if (this.$isValid !== undefined) return this
Expand All @@ -13,7 +14,7 @@ export default class File {
throw new Error(`The filename \`${wrongFilePath}\` should start with \`file://\``)
}
this.$path = this.$path.replace(/^file:\/\//, '')
if (!existsSync(this.$path) || !statSync(this.$path).isFile()) {
if (!this.skipExistsCheck && (!existsSync(this.$path) || !statSync(this.$path).isFile())) {
throw new Error(`File not found: \`${this.$path}\``)
}
return this
Expand All @@ -23,11 +24,14 @@ export default class File {
return this.$path
}

public getStream(): WriteStream {
public getStream(append: boolean = false): WriteStream {
if (existsSync(this.$path)) {
// throw new Error(`File already exists: \`${this.$path}\``)
}
return createWriteStream(this.$path)
if (!existsSync(dirname(this.$path))) {
mkdirSync(dirname(this.$path), { recursive: true})
}
return createWriteStream(this.$path, append ? {flags: 'a'} : {})
}

public toString(): string {
Expand Down
2 changes: 1 addition & 1 deletion src/lib/LDWorkbenchConfiguration.d.ts
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ export interface LDWorkbenchConfiguration {
/**
* The file where the final result of your pipeline is saved.
*/
destination: string;
destination?: string;
/**
* This is where you define the individual iterator/generator for each step.
*
Expand Down
25 changes: 20 additions & 5 deletions src/lib/Pipeline.class.ts
Original file line number Diff line number Diff line change
Expand Up @@ -4,22 +4,33 @@ import type { LDWorkbenchConfiguration } from "./LDWorkbenchConfiguration.js";
import chalk from "chalk";
import Stage from "./Stage.class.js";
import duration from "../utils/duration.js";
import File from './File.class.js'
import path from "node:path";
import * as fs from "node:fs";
import { isFilePathString } from '../utils/guards.js';

class Pipeline {
public readonly stages = new Map<string, Stage>();
public dataDir: string;
private $isValidated: boolean = false;
private stageNames: string[] = [];
private now = new Date();
private readonly destination: File

public constructor(
private readonly $configuration: LDWorkbenchConfiguration
) {
// create data folder:
this.dataDir = path.join("data", kebabcase(this.$configuration.name));
fs.mkdirSync(this.dataDir, { recursive: true });
const destinationFile = this.configuration.destination ?? `file://${path.join(this.dataDir, 'statements.nt')}`
if (!isFilePathString(destinationFile)) {
throw new Error('We currently only allow publishing data to local files.')
}
if(!destinationFile.endsWith('.nt')) {
throw new Error('We currently only writing results in N-Triples format,\nmake sure your destination filename ends with \'.nt\'.')
}
this.destination = new File(destinationFile, true)
}

private error(e: Error, stage?: string): void {
Expand Down Expand Up @@ -142,7 +153,7 @@ class Pipeline {
if (this.stageNames.length !== 0) {
this.runRecursive();
} else {
this.concat()
this.writeResult()
console.info(
chalk.green(
`✔ your pipeline "${chalk.bold(
Expand All @@ -159,10 +170,14 @@ class Pipeline {
}
}

private concat(): void {
private writeResult(): void {
const spinner = ora("Combining statements from all stages:").start();
const destinationPath = path.join(this.dataDir, 'statements.nt')
const destinationStream = fs.createWriteStream(destinationPath, {flags:'a'})

const destinationPathNew = this.configuration.destination
if (!isFilePathString(destinationPathNew)) {
throw new Error('We currently only allow publishing data to local files.')
}
const destinationStream = this.destination.getStream()
const stageNames = Array.from(this.stages.keys())
for (const stageName of stageNames) {
spinner.suffixText = chalk.bold(stageName)
Expand All @@ -171,7 +186,7 @@ class Pipeline {
destinationStream.write(buffer)
})
}
spinner.suffixText = chalk.bold(destinationPath)
spinner.suffixText = chalk.bold(this.destination.toString())
spinner.succeed()
}

Expand Down
3 changes: 1 addition & 2 deletions static/example/config.yml
Original file line number Diff line number Diff line change
@@ -1,14 +1,13 @@
# Metadata for your pipeline:
name: Example Pipeline
destination: file://data/example/example.ttl
description: >
This is an example pipeline. It uses files that are available in this repository
and SPARQL endpoints that should work.

# The individual stages for your pipeline
stages:
- name: "Stage 1"
iterator:
iterator:
query: file://static/example/iterator-stage-1.rq
endpoint: https://api.triplydb.com/datasets/Triply/iris/services/demo-service/sparql
generator:
Expand Down
2 changes: 1 addition & 1 deletion static/ld-workbench.schema.json
Original file line number Diff line number Diff line change
Expand Up @@ -77,5 +77,5 @@
}
}
},
"required": ["name", "destination", "stages"]
"required": ["name", "stages"]
}