From 1ddff1a77ccf74607826e82f4b4f621f6f15fa8f Mon Sep 17 00:00:00 2001 From: David de Boer Date: Sat, 15 Jun 2024 13:43:10 +0200 Subject: [PATCH] refactor: Extract Progress class (#85) --- src/file.ts | 6 +-- src/pipeline.ts | 130 ++++++++++++++++++++++------------------------- src/progress.ts | 46 +++++++++++++++++ src/triply-db.ts | 6 +-- 4 files changed, 112 insertions(+), 76 deletions(-) create mode 100644 src/progress.ts diff --git a/src/file.ts b/src/file.ts index d65d360..891e710 100644 --- a/src/file.ts +++ b/src/file.ts @@ -9,9 +9,9 @@ import { import {isFile, isFilePathString} from './utils/guards.js'; import {dirname} from 'path'; import chalk from 'chalk'; -import {type Ora} from 'ora'; import type Pipeline from './pipeline.js'; import {pipeline as streamPipeline} from 'stream/promises'; +import {Progress} from './progress.js'; export default class File { public readonly $id = 'File'; @@ -64,10 +64,10 @@ export default class File { return isFile(value); } - public async write(pipeline: Pipeline, spinner: Ora): Promise { + public async write(pipeline: Pipeline, progress: Progress): Promise { const stageNames = Array.from(pipeline.stages.keys()); for (const stageName of stageNames) { - if (spinner !== undefined) spinner.suffixText = chalk.bold(stageName); + progress.suffixText(chalk.bold(stageName)); await streamPipeline( createReadStream(pipeline.stages.get(stageName)!.destinationPath), this.getStream(true) diff --git a/src/pipeline.ts b/src/pipeline.ts index 1a15a4f..5bb4047 100644 --- a/src/pipeline.ts +++ b/src/pipeline.ts @@ -1,10 +1,7 @@ -import ora, {Ora} from 'ora'; import kebabcase from 'lodash.kebabcase'; import type {Configuration} from './configuration.js'; import chalk from 'chalk'; import Stage from './stage.js'; -import formatDuration from './utils/formatDuration.js'; -import {millify} from 'millify'; import File from './file.js'; import path from 'node:path'; import * as fs from 'node:fs'; @@ -12,13 +9,14 @@ import {isFilePathString, isTriplyDBPathString} from './utils/guards.js'; import TriplyDB from './triply-db.js'; import prettyMilliseconds from 'pretty-ms'; import {memoryConsumption} from './utils/memory.js'; +import {Progress} from './progress.js'; +import {millify} from 'millify'; +import formatDuration from './utils/formatDuration.js'; interface PipelineOptions { startFromStageName?: string; silent?: boolean; } -let spinner: Ora; - class Pipeline { public readonly stages = new Map(); public dataDir: string; @@ -116,13 +114,9 @@ class Pipeline { } public async run(): Promise { - this.startTime = performance.now(); - if (!(this.opts?.silent === true)) - console.info( - chalk.cyan(`▶ Starting pipeline “${chalk.bold(this.name)}”`) - ); - spinner = ora('Validating pipeline'); - if (!(this.opts?.silent === true)) spinner.start(); + const progress = new Progress({silent: this.opts?.silent === true}) + .line(chalk.cyan(`▶ Starting pipeline “${chalk.bold(this.name)}”`)) + .start('Validating pipeline'); let startFromStage = 0; try { if (this.opts?.startFromStageName !== undefined) { @@ -136,7 +130,6 @@ class Pipeline { this.opts.startFromStageName )}.` ); - if (!(this.opts?.silent === true)) spinner.fail(e.message); this.error(e); } else { startFromStage = ix - 1; @@ -149,7 +142,6 @@ class Pipeline { this.opts.startFromStageName )}.` ); - if (!(this.opts?.silent === true)) spinner.fail(e.message); this.error(e); } else { startFromStage = Array.from(this.stages.keys()).findIndex( @@ -157,9 +149,8 @@ class Pipeline { ); } } - if (!(this.opts?.silent === true)) spinner.succeed(); + progress.succeed(); } catch (e) { - spinner.fail((e as Error).message); this.error(e as Error); } @@ -168,87 +159,84 @@ class Pipeline { Array.from(this.stages.keys()) .slice(0, startFromStage) .forEach(stagename => { - ora() - .start() - .info(`stage "${chalk.bold(stagename)}" was skipped`) + new Progress({silent: this.opts?.silent === true}) + .start(`stage "${chalk.bold(stagename)}" was skipped`) .stop(); }); - await this.runRecursive(); + await this.runStage(); } - private async runRecursive(): Promise { + private async runStage(): Promise { const stage = this.stages.get(this.stageNames.shift()!)!; - spinner = ora('Loading results from Iterator'); + const progress = new Progress({silent: this.opts?.silent === true}).start( + 'Loading results from iterator' + ); const startTime = performance.now(); let iterationsProcessed = 0; - if (!(this.opts?.silent === true)) spinner.start(); await new Promise((resolve, reject) => { stage.on('iteratorResult', (_$this, quadsGenerated) => { iterationsProcessed++; - this.updateSpinner( + this.increaseProgress( + progress, stage, - startTime, iterationsProcessed, quadsGenerated ); }); stage.on('generatorResult', count => { - this.updateSpinner(stage, startTime, iterationsProcessed, count); + this.increaseProgress(progress, stage, iterationsProcessed, count); }); stage.on('error', e => { - spinner.fail(); - this.error(e); reject(e); + this.error(e); }); stage.on('end', (iris, statements) => { - if (!(this.opts?.silent === true)) - spinner.succeed( - `Stage “${chalk.bold( - stage.name - )}” resulted in ${statements.toLocaleString()} statement${ - statements === 1 ? '' : 's' - } in ${iris.toLocaleString()} iteration${ - iris === 1 ? '' : 's' - } (took ${prettyMilliseconds(performance.now() - startTime)})` - ); + progress.succeed( + `Stage “${chalk.bold( + stage.name + )}” resulted in ${statements.toLocaleString()} statement${ + statements === 1 ? '' : 's' + } in ${iris.toLocaleString()} iteration${ + iris === 1 ? '' : 's' + } (took ${prettyMilliseconds(performance.now() - startTime)})` + ); resolve(); }); try { stage.run(); } catch (e) { - spinner.fail((e as Error).message); + progress.fail((e as Error).message); reject(e); } }); - if (this.stageNames.length !== 0) return this.runRecursive(); + if (this.stageNames.length !== 0) return this.runStage(); try { await this.writeResult(); } catch (e) { throw new Error('Pipeline failed: ' + (e as Error).message); } - if (!(this.opts?.silent === true)) - console.info( - chalk.green( - `✔ Your pipeline “${chalk.bold( - this.name - )}” was completed in ${prettyMilliseconds( - performance.now() - this.startTime - )} using ${memoryConsumption()} MB of memory` - ) - ); + progress.line( + chalk.green( + `✔ Your pipeline “${chalk.bold( + this.name + )}” was completed in ${prettyMilliseconds( + performance.now() - this.startTime + )} using ${memoryConsumption()} MB of memory` + ) + ); } private async writeResult(): Promise { - spinner = ora('Writing results to destination'); - if (!(this.opts?.silent === true)) spinner.start(); - await this.destination.write(this, spinner); - if (!(this.opts?.silent === true)) - spinner.suffixText = `${chalk.bold( - path.relative(process.cwd(), this.destination.path) - )}`; - if (!(this.opts?.silent === true)) spinner.succeed(); + const progress = new Progress({silent: this.opts?.silent === true}).start( + 'Writing results to destination' + ); + await this.destination.write(this, progress); + progress.suffixText( + chalk.bold(path.relative(process.cwd(), this.destination.path)) + ); + progress.succeed(); } get name(): string { @@ -259,9 +247,9 @@ class Pipeline { return this.$configuration.description; } - private updateSpinner( + private increaseProgress( + progress: Progress, stage: Stage, - startTime: number, iterationsProcessed: number, quadsGenerated: number ) { @@ -269,16 +257,18 @@ class Pipeline { return; } - spinner.text = `Running stage “${chalk.bold( - stage.name - )}”:\n\n Processed elements: ${millify( - iterationsProcessed - )}\n Generated quads: ${millify( - quadsGenerated - )}\n Duration: ${formatDuration( - startTime, - performance.now() - )}\n Memory: ${memoryConsumption()} MB`; + progress.text( + `Running stage “${chalk.bold( + stage.name + )}”:\n\n Processed elements: ${millify( + iterationsProcessed + )}\n Generated quads: ${millify( + quadsGenerated + )}\n Duration: ${formatDuration( + progress.startTime, + performance.now() + )}\n Memory: ${memoryConsumption()} MB` + ); } } diff --git a/src/progress.ts b/src/progress.ts new file mode 100644 index 0000000..d61ec81 --- /dev/null +++ b/src/progress.ts @@ -0,0 +1,46 @@ +import ora, {Ora} from 'ora'; + +export class Progress { + public readonly startTime = performance.now(); + private readonly spinner?: Ora; + + constructor(private options: {silent: boolean}) { + if (!options.silent) { + this.spinner = ora(); + } + return this; + } + + start(text: string) { + this.spinner?.start(text); + return this; + } + + line(text: string) { + if (!this.options.silent) { + console.info(text); + } + return this; + } + + text(text: string) { + if (this.spinner) this.spinner.text = text; + } + + fail(message: string) { + this.spinner?.fail(message); + } + + suffixText(text: string) { + if (this.spinner === undefined) return; + this.spinner.suffixText = text; + } + + succeed(text?: string) { + this.spinner?.succeed(text); + } + + stop() { + this.spinner?.stop(); + } +} diff --git a/src/triply-db.ts b/src/triply-db.ts index 795fe6f..1fd897f 100644 --- a/src/triply-db.ts +++ b/src/triply-db.ts @@ -1,6 +1,6 @@ -import {type Ora} from 'ora'; import type Pipeline from './pipeline.js'; import App from '@triply/triplydb'; +import {Progress} from './progress.js'; const pattern = /^triplydb:\/\/([a-z0-9-]+)\/([a-z0-9-]+)$/; @@ -46,7 +46,7 @@ export default class TriplyDB { return this.$datasetUrl ?? this.$dsn; } - public async write(pipeline: Pipeline, spinner: Ora): Promise { + public async write(pipeline: Pipeline, progress: Progress): Promise { const filenames = Array.from(pipeline.stages.keys()).map( stageName => pipeline.stages.get(stageName)!.destinationPath ); @@ -56,7 +56,7 @@ export default class TriplyDB { .then(async dataset => { const appInfo = await this.app!.getInfo(); this.$datasetUrl = `${appInfo.consoleUrl}/${this.accountname}/${this.datasetname}`; - spinner.info(`uploading data to ${this.$dsn}`); + progress.line(`uploading data to ${this.$dsn}`); await dataset.importFromFiles(filenames, {mergeGraphs: true}); }) .catch(e => {