diff --git a/@xen-orchestra/backups/HealthCheckVmBackup.mjs b/@xen-orchestra/backups/HealthCheckVmBackup.mjs index 41b9e96952b..382ca28f575 100644 --- a/@xen-orchestra/backups/HealthCheckVmBackup.mjs +++ b/@xen-orchestra/backups/HealthCheckVmBackup.mjs @@ -1,4 +1,4 @@ -import { Task } from './Task.mjs' +import { Task } from '@vates/task' export class HealthCheckVmBackup { #restoredVm @@ -14,7 +14,7 @@ export class HealthCheckVmBackup { async run() { return Task.run( { - name: 'vmstart', + properties: { name: 'vmstart' }, }, async () => { let restoredVm = this.#restoredVm diff --git a/@xen-orchestra/backups/ImportVmBackup.mjs b/@xen-orchestra/backups/ImportVmBackup.mjs index 5c2482c29ae..4bcaf7da672 100644 --- a/@xen-orchestra/backups/ImportVmBackup.mjs +++ b/@xen-orchestra/backups/ImportVmBackup.mjs @@ -2,10 +2,10 @@ import assert from 'node:assert' import { formatFilenameDate } from './_filenameDate.mjs' import { importIncrementalVm } from './_incrementalVm.mjs' -import { Task } from './Task.mjs' import { watchStreamSize } from './_watchStreamSize.mjs' import { VhdNegative, VhdSynthetic } from 'vhd-lib' import { decorateClass } from '@vates/decorate-with' +import { Task } from '@vates/task' import { createLogger } from '@xen-orchestra/log' import { dirname, join } from 'node:path' import pickBy from 'lodash/pickBy.js' @@ -240,7 +240,7 @@ export class ImportVmBackup { return Task.run( { - name: 'transfer', + properties: { name: 'transfer' }, }, async () => { const xapi = this._xapi diff --git a/@xen-orchestra/backups/_backupWorker.mjs b/@xen-orchestra/backups/_backupWorker.mjs index 6f5ab6b6794..b56f40db94e 100644 --- a/@xen-orchestra/backups/_backupWorker.mjs +++ b/@xen-orchestra/backups/_backupWorker.mjs @@ -14,10 +14,10 @@ import { decorateMethodsWith } from '@vates/decorate-with' import { deduped } from '@vates/disposable/deduped.js' import { getHandler } from '@xen-orchestra/fs' import { parseDuration } from '@vates/parse-duration' +import { Task } from '@vates/task' import { Xapi } from '@xen-orchestra/xapi' import { RemoteAdapter } from './RemoteAdapter.mjs' -import { Task } from './Task.mjs' createCachedLookup().patchGlobal() @@ -178,8 +178,8 @@ process.on('message', async message => { const result = message.runWithLogs ? await Task.run( { - name: 'backup run', - onLog: data => + properties: { name: 'backup run' }, + onProgress: data => emitMessage({ data, type: 'log', diff --git a/@xen-orchestra/backups/_cleanVm.mjs b/@xen-orchestra/backups/_cleanVm.mjs index 32f4d9dbfd7..6508765690b 100644 --- a/@xen-orchestra/backups/_cleanVm.mjs +++ b/@xen-orchestra/backups/_cleanVm.mjs @@ -8,8 +8,8 @@ import { isMetadataFile, isVhdFile, isXvaFile, isXvaSumFile } from './_backupTyp import { limitConcurrency } from 'limit-concurrency-decorator' import { mergeVhdChain } from 'vhd-lib/merge.js' -import { Task } from './Task.mjs' import { Disposable } from 'promise-toolbox' +import { Task } from '@vates/task' import handlerPath from '@xen-orchestra/fs/path' const { DISK_TYPES } = Constants @@ -201,9 +201,9 @@ export async function cleanVm( // remove broken VHDs await asyncMap(vhds, async path => { - if(removeTmp && basename(path)[0] === '.'){ + if (removeTmp && basename(path)[0] === '.') { logInfo('deleting temporary VHD', { path }) - return VhdAbstract.unlink(handler, path) + return VhdAbstract.unlink(handler, path) } try { await Disposable.use(openVhd(handler, path, { checkSecondFooter: !interruptedVhds.has(path) }), vhd => { @@ -488,7 +488,7 @@ export async function cleanVm( await Promise.all([ ...unusedVhdsDeletion, - toMerge.length !== 0 && (merge ? Task.run({ name: 'merge' }, doMerge) : () => Promise.resolve()), + toMerge.length !== 0 && (merge ? Task.run({ properties: { name: 'merge' } }, doMerge) : () => Promise.resolve()), asyncMap(unusedXvas, path => { logWarn('unused XVA', { path }) if (remove) { diff --git a/@xen-orchestra/backups/_incrementalVm.mjs b/@xen-orchestra/backups/_incrementalVm.mjs index beeb64b7088..7f0d972da1e 100644 --- a/@xen-orchestra/backups/_incrementalVm.mjs +++ b/@xen-orchestra/backups/_incrementalVm.mjs @@ -4,9 +4,9 @@ import { asyncMap } from '@xen-orchestra/async-map' import { CancelToken } from 'promise-toolbox' import { compareVersions } from 'compare-versions' import { defer } from 'golike-defer' +import { Task } from '@vates/task' import { cancelableMap } from './_cancelableMap.mjs' -import { Task } from './Task.mjs' import pick from 'lodash/pick.js' import { BASE_DELTA_VDI, COPY_OF, VM_UUID } from './_otherConfig.mjs' diff --git a/@xen-orchestra/backups/_runners/Metadata.mjs b/@xen-orchestra/backups/_runners/Metadata.mjs index 9e0a7d26276..325a95fc518 100644 --- a/@xen-orchestra/backups/_runners/Metadata.mjs +++ b/@xen-orchestra/backups/_runners/Metadata.mjs @@ -1,4 +1,5 @@ import { asyncMap } from '@xen-orchestra/async-map' +import { Task } from '@vates/task' import Disposable from 'promise-toolbox/Disposable' import ignoreErrors from 'promise-toolbox/ignoreErrors' @@ -6,9 +7,10 @@ import { extractIdsFromSimplePattern } from '../extractIdsFromSimplePattern.mjs' import { PoolMetadataBackup } from './_PoolMetadataBackup.mjs' import { XoMetadataBackup } from './_XoMetadataBackup.mjs' import { DEFAULT_SETTINGS, Abstract } from './_Abstract.mjs' -import { runTask } from './_runTask.mjs' import { getAdaptersByRemote } from './_getAdaptersByRemote.mjs' +const noop = Function.prototype + const DEFAULT_METADATA_SETTINGS = { retentionPoolMetadata: 0, retentionXoMetadata: 0, @@ -55,13 +57,16 @@ export const Metadata = class MetadataBackupRunner extends Abstract { poolIds.map(id => this._getRecord('pool', id).catch(error => { // See https://github.com/vatesfr/xen-orchestra/commit/6aa6cfba8ec939c0288f0fa740f6dfad98c43cbb - runTask( + Task.run( { - name: 'get pool record', - data: { type: 'pool', id }, + properties: { + id, + name: 'get pool record', + type: 'pool', + }, }, () => Promise.reject(error) - ) + ).catch(noop) }) ) ), @@ -81,11 +86,11 @@ export const Metadata = class MetadataBackupRunner extends Abstract { if (pools.length !== 0 && settings.retentionPoolMetadata !== 0) { promises.push( asyncMap(pools, async pool => - runTask( + Task.run( { - name: `Starting metadata backup for the pool (${pool.$id}). (${job.id})`, - data: { + properties: { id: pool.$id, + name: `Starting metadata backup for the pool (${pool.$id}). (${job.id})`, pool, poolMaster: await ignoreErrors.call(pool.$xapi.getRecord('host', pool.master)), type: 'pool', @@ -100,17 +105,17 @@ export const Metadata = class MetadataBackupRunner extends Abstract { schedule, settings, }).run() - ) + ).catch(noop) ) ) } if (job.xoMetadata !== undefined && settings.retentionXoMetadata !== 0) { promises.push( - runTask( + Task.run( { - name: `Starting XO metadata backup. (${job.id})`, - data: { + properties: { + name: `Starting XO metadata backup. (${job.id})`, type: 'xo', }, }, @@ -122,7 +127,7 @@ export const Metadata = class MetadataBackupRunner extends Abstract { schedule, settings, }).run() - ) + ).catch(noop) ) } await Promise.all(promises) diff --git a/@xen-orchestra/backups/_runners/VmsRemote.mjs b/@xen-orchestra/backups/_runners/VmsRemote.mjs index 2ad14d2a024..8e3a74e7aeb 100644 --- a/@xen-orchestra/backups/_runners/VmsRemote.mjs +++ b/@xen-orchestra/backups/_runners/VmsRemote.mjs @@ -1,9 +1,9 @@ import { asyncMapSettled } from '@xen-orchestra/async-map' import Disposable from 'promise-toolbox/Disposable' import { limitConcurrency } from 'limit-concurrency-decorator' +import { Task } from '@vates/task' import { extractIdsFromSimplePattern } from '../extractIdsFromSimplePattern.mjs' -import { Task } from '../Task.mjs' import createStreamThrottle from './_createStreamThrottle.mjs' import { DEFAULT_SETTINGS, Abstract } from './_Abstract.mjs' import { getAdaptersByRemote } from './_getAdaptersByRemote.mjs' @@ -76,7 +76,7 @@ export const VmsRemote = class RemoteVmsBackupRunner extends Abstract { } nTriesByVmId[vmUuid]++ - const taskStart = { name: 'backup VM', data: { type: 'VM', id: vmUuid } } + const taskStart = { properties: { id: vmUuid, name: 'backup VM', type: 'VM' } } const vmSettings = { ...settings, ...allSettings[vmUuid] } const isLastRun = nTriesByVmId[vmUuid] === vmSettings.nRetriesVmBackupFailures + 1 @@ -110,23 +110,27 @@ export const VmsRemote = class RemoteVmsBackupRunner extends Abstract { taskByVmId[vmUuid] = new Task(taskStart) } const task = taskByVmId[vmUuid] + // error has to be caught in the task to prevent its failure, but handled outside the task to execute another task.run() + let taskError return task - .run(async () => { - try { - const result = await vmBackup.run() - task.success(result) - return result - } catch (error) { - if (isLastRun) { - throw error - } else { - Task.warning(`Retry the VM mirror backup due to an error`, { - attempt: nTriesByVmId[vmUuid], - error: error.message, - }) - queue.add(vmUuid) - } + .runInside(async () => + vmBackup.run().catch(error => { + taskError = error + }) + ) + .then(result => { + if (taskError === undefined) { + return task.success(result) } + if (isLastRun) { + return task.failure(taskError) + } + // don't end the task + task.warning(`Retry the VM mirror backup due to an error`, { + attempt: nTriesByVmId[vmUuid], + error: taskError.message, + }) + queue.add(vmUuid) }) .catch(noop) } diff --git a/@xen-orchestra/backups/_runners/VmsXapi.mjs b/@xen-orchestra/backups/_runners/VmsXapi.mjs index 57b5cc3f8b4..0396208d348 100644 --- a/@xen-orchestra/backups/_runners/VmsXapi.mjs +++ b/@xen-orchestra/backups/_runners/VmsXapi.mjs @@ -1,12 +1,11 @@ import { asyncMapSettled } from '@xen-orchestra/async-map' import Disposable from 'promise-toolbox/Disposable' import { limitConcurrency } from 'limit-concurrency-decorator' +import { Task } from '@vates/task' import { extractIdsFromSimplePattern } from '../extractIdsFromSimplePattern.mjs' -import { Task } from '../Task.mjs' import createStreamThrottle from './_createStreamThrottle.mjs' import { DEFAULT_SETTINGS, Abstract } from './_Abstract.mjs' -import { runTask } from './_runTask.mjs' import { getAdaptersByRemote } from './_getAdaptersByRemote.mjs' import { IncrementalXapi } from './_vmRunners/IncrementalXapi.mjs' import { FullXapi } from './_vmRunners/FullXapi.mjs' @@ -63,13 +62,12 @@ export const VmsXapi = class VmsXapiBackupRunner extends Abstract { Disposable.all( extractIdsFromSimplePattern(job.srs).map(id => this._getRecord('SR', id).catch(error => { - runTask( + Task.run( { - name: 'get SR record', - data: { type: 'SR', id }, + properties: { id, name: 'get SR record', type: 'SR' }, }, () => Promise.reject(error) - ) + ).catch(noop) }) ) ), @@ -106,11 +104,12 @@ export const VmsXapi = class VmsXapiBackupRunner extends Abstract { } return taskByVmId[vmUuid] } - const vmBackupFailed = error => { + const vmBackupFailed = async (error, task) => { if (isLastRun) { - throw error + return task.failure(error) } else { - Task.warning(`Retry the VM backup due to an error`, { + // don't end the task + task.warning(`Retry the VM backup due to an error`, { attempt: nTriesByVmId[vmUuid], error: error.message, }) @@ -124,19 +123,21 @@ export const VmsXapi = class VmsXapiBackupRunner extends Abstract { nTriesByVmId[vmUuid]++ const vmSettings = { ...settings, ...allSettings[vmUuid] } - const taskStart = { name: 'backup VM', data: { type: 'VM', id: vmUuid } } + const taskStart = { properties: { id: vmUuid, name: 'backup VM', type: 'VM' } } const isLastRun = nTriesByVmId[vmUuid] === vmSettings.nRetriesVmBackupFailures + 1 return this._getRecord('VM', vmUuid).then( disposableVm => Disposable.use(disposableVm, async vm => { - if (taskStart.data.name_label === undefined) { - taskStart.data.name_label = vm.name_label + if (taskStart.properties.name_label === undefined) { + taskStart.properties.name_label = vm.name_label } const task = getVmTask() + // error has to be caught in the task to prevent its failure, but handled outside the task to execute another task.run() + let taskError return task - .run(async () => { + .runInside(async () => { const opts = { baseSettings, config, @@ -161,21 +162,21 @@ export const VmsXapi = class VmsXapiBackupRunner extends Abstract { throw new Error(`Job mode ${job.mode} not implemented`) } } - - try { - const result = await vmBackup.run() + return vmBackup.run().catch(error => { + taskError = error + }) + }) + .then(result => { + if (taskError === undefined) { task.success(result) - return result - } catch (error) { - vmBackupFailed(error) + } else { + // ending the task with error or not ending the task + vmBackupFailed(taskError, task) } }) .catch(noop) // errors are handled by logs }), - error => - getVmTask().run(() => { - vmBackupFailed(error) - }) + error => vmBackupFailed(error, getVmTask()) ) } const { concurrency } = settings diff --git a/@xen-orchestra/backups/_runners/_Abstract.mjs b/@xen-orchestra/backups/_runners/_Abstract.mjs index 8694f605728..df732f40eca 100644 --- a/@xen-orchestra/backups/_runners/_Abstract.mjs +++ b/@xen-orchestra/backups/_runners/_Abstract.mjs @@ -1,8 +1,10 @@ import Disposable from 'promise-toolbox/Disposable' import pTimeout from 'promise-toolbox/timeout' import { compileTemplate } from '@xen-orchestra/template' -import { runTask } from './_runTask.mjs' import { RemoteTimeoutError } from './_RemoteTimeoutError.mjs' +import { Task } from '@vates/task' + +const noop = Function.prototype export const DEFAULT_SETTINGS = { getRemoteTimeout: 300e3, @@ -36,13 +38,16 @@ export const Abstract = class AbstractRunner { }) } catch (error) { // See https://github.com/vatesfr/xen-orchestra/commit/6aa6cfba8ec939c0288f0fa740f6dfad98c43cbb - runTask( + Task.run( { - name: 'get remote adapter', - data: { type: 'remote', id: remoteId }, + properties: { + id: remoteId, + name: 'get remote adapter', + type: 'remote', + }, }, () => Promise.reject(error) - ) + ).catch(noop) } } } diff --git a/@xen-orchestra/backups/_runners/_PoolMetadataBackup.mjs b/@xen-orchestra/backups/_runners/_PoolMetadataBackup.mjs index 0c8156b2064..8d68552174b 100644 --- a/@xen-orchestra/backups/_runners/_PoolMetadataBackup.mjs +++ b/@xen-orchestra/backups/_runners/_PoolMetadataBackup.mjs @@ -1,9 +1,9 @@ import { asyncMap } from '@xen-orchestra/async-map' +import { Task } from '@vates/task' import { DIR_XO_POOL_METADATA_BACKUPS } from '../RemoteAdapter.mjs' import { forkStreamUnpipe } from './_forkStreamUnpipe.mjs' import { formatFilenameDate } from '../_filenameDate.mjs' -import { Task } from '../Task.mjs' export const PATH_DB_DUMP = '/pool/xmldbdump' @@ -54,8 +54,8 @@ export class PoolMetadataBackup { ([remoteId, adapter]) => Task.run( { - name: `Starting metadata backup for the pool (${pool.$id}) for the remote (${remoteId}). (${job.id})`, - data: { + properties: { + name: `Starting metadata backup for the pool (${pool.$id}) for the remote (${remoteId}). (${job.id})`, id: remoteId, type: 'remote', }, diff --git a/@xen-orchestra/backups/_runners/_XoMetadataBackup.mjs b/@xen-orchestra/backups/_runners/_XoMetadataBackup.mjs index 49ae47a5847..a5cf6caba43 100644 --- a/@xen-orchestra/backups/_runners/_XoMetadataBackup.mjs +++ b/@xen-orchestra/backups/_runners/_XoMetadataBackup.mjs @@ -1,9 +1,9 @@ import { asyncMap } from '@xen-orchestra/async-map' import { join } from '@xen-orchestra/fs/path' +import { Task } from '@vates/task' import { DIR_XO_CONFIG_BACKUPS } from '../RemoteAdapter.mjs' import { formatFilenameDate } from '../_filenameDate.mjs' -import { Task } from '../Task.mjs' export class XoMetadataBackup { constructor({ config, job, remoteAdapters, schedule, settings }) { @@ -51,8 +51,8 @@ export class XoMetadataBackup { ([remoteId, adapter]) => Task.run( { - name: `Starting XO metadata backup for the remote (${remoteId}). (${job.id})`, - data: { + properties: { + name: `Starting XO metadata backup for the remote (${remoteId}). (${job.id})`, id: remoteId, type: 'remote', }, diff --git a/@xen-orchestra/backups/_runners/_vmRunners/IncrementalXapi.mjs b/@xen-orchestra/backups/_runners/_vmRunners/IncrementalXapi.mjs index 2bd432901a4..895825edbfb 100644 --- a/@xen-orchestra/backups/_runners/_vmRunners/IncrementalXapi.mjs +++ b/@xen-orchestra/backups/_runners/_vmRunners/IncrementalXapi.mjs @@ -1,6 +1,7 @@ import { asyncEach } from '@vates/async-each' import { createLogger } from '@xen-orchestra/log' import { pipeline } from 'node:stream' +import { Task } from '@vates/task' import isVhdDifferencingDisk from 'vhd-lib/isVhdDifferencingDisk.js' import keyBy from 'lodash/keyBy.js' import mapValues from 'lodash/mapValues.js' @@ -11,7 +12,6 @@ import { exportIncrementalVm } from '../../_incrementalVm.mjs' import { forkDeltaExport } from './_forkDeltaExport.mjs' import { IncrementalRemoteWriter } from '../_writers/IncrementalRemoteWriter.mjs' import { IncrementalXapiWriter } from '../_writers/IncrementalXapiWriter.mjs' -import { Task } from '../../Task.mjs' import { watchStreamSize } from '../../_watchStreamSize.mjs' import { DATETIME, diff --git a/@xen-orchestra/backups/_runners/_vmRunners/_Abstract.mjs b/@xen-orchestra/backups/_runners/_vmRunners/_Abstract.mjs index d9db561d175..bc048813c49 100644 --- a/@xen-orchestra/backups/_runners/_vmRunners/_Abstract.mjs +++ b/@xen-orchestra/backups/_runners/_vmRunners/_Abstract.mjs @@ -1,6 +1,6 @@ import { asyncMap } from '@xen-orchestra/async-map' import { createLogger } from '@xen-orchestra/log' -import { Task } from '../../Task.mjs' +import { Task } from '@vates/task' const { debug, warn } = createLogger('xo:backups:AbstractVmRunner') @@ -80,7 +80,7 @@ export const Abstract = class AbstractVmBackupRunner { // create a task to have an info in the logs and reports return Task.run( { - name: 'health check', + properties: { name: 'health check' }, }, () => { Task.info(`This VM doesn't match the health check's tags for this schedule`) diff --git a/@xen-orchestra/backups/_runners/_vmRunners/_AbstractRemote.mjs b/@xen-orchestra/backups/_runners/_vmRunners/_AbstractRemote.mjs index 1ae1a10243f..03c8f9254e9 100644 --- a/@xen-orchestra/backups/_runners/_vmRunners/_AbstractRemote.mjs +++ b/@xen-orchestra/backups/_runners/_vmRunners/_AbstractRemote.mjs @@ -4,12 +4,12 @@ import { decorateMethodsWith } from '@vates/decorate-with' import { defer } from 'golike-defer' import { Disposable } from 'promise-toolbox' import { createPredicate } from 'value-matcher' +import { Task } from '@vates/task' import { getVmBackupDir } from '../../_getVmBackupDir.mjs' import { Abstract } from './_Abstract.mjs' import { extractIdsFromSimplePattern } from '../../extractIdsFromSimplePattern.mjs' -import { Task } from '../../Task.mjs' export const AbstractRemote = class AbstractRemoteVmBackupRunner extends Abstract { _filterPredicate diff --git a/@xen-orchestra/backups/_runners/_vmRunners/_AbstractXapi.mjs b/@xen-orchestra/backups/_runners/_vmRunners/_AbstractXapi.mjs index f6c9cffc511..4bdac62063c 100644 --- a/@xen-orchestra/backups/_runners/_vmRunners/_AbstractXapi.mjs +++ b/@xen-orchestra/backups/_runners/_vmRunners/_AbstractXapi.mjs @@ -4,9 +4,9 @@ import ignoreErrors from 'promise-toolbox/ignoreErrors' import { asyncMap } from '@xen-orchestra/async-map' import { decorateMethodsWith } from '@vates/decorate-with' import { defer } from 'golike-defer' +import { Task } from '@vates/task' import { getOldEntries } from '../../_getOldEntries.mjs' -import { Task } from '../../Task.mjs' import { Abstract } from './_Abstract.mjs' import { DATETIME, @@ -142,7 +142,7 @@ export const AbstractXapi = class AbstractXapiVmBackupRunner extends Abstract { const settings = this._settings if (this._mustDoSnapshot()) { - await Task.run({ name: 'snapshot' }, async () => { + await Task.run({ properties: { name: 'snapshot' } }, async () => { if (!settings.bypassVdiChainsCheck) { await vm.$assertHealthyVdiChains() } diff --git a/@xen-orchestra/backups/_runners/_writers/FullRemoteWriter.mjs b/@xen-orchestra/backups/_runners/_writers/FullRemoteWriter.mjs index 9d1066dbccc..22db9335824 100644 --- a/@xen-orchestra/backups/_runners/_writers/FullRemoteWriter.mjs +++ b/@xen-orchestra/backups/_runners/_writers/FullRemoteWriter.mjs @@ -1,6 +1,7 @@ +import { Task } from '@vates/task' + import { formatFilenameDate } from '../../_filenameDate.mjs' import { getOldEntries } from '../../_getOldEntries.mjs' -import { Task } from '../../Task.mjs' import { MixinRemoteWriter } from './_MixinRemoteWriter.mjs' import { AbstractFullWriter } from './_AbstractFullWriter.mjs' @@ -9,11 +10,11 @@ export class FullRemoteWriter extends MixinRemoteWriter(AbstractFullWriter) { constructor(props) { super(props) - this.run = Task.wrapFn( + this.run = Task.wrap( { - name: 'export', - data: { + properties: { id: props.remoteId, + name: 'export', type: 'remote', // necessary? @@ -63,7 +64,7 @@ export class FullRemoteWriter extends MixinRemoteWriter(AbstractFullWriter) { await deleteOldBackups() } - await Task.run({ name: 'transfer' }, async () => { + await Task.run({ properties: { name: 'transfer' } }, async () => { await adapter.outputStream(dataFilename, stream, { maxStreamLength, streamLength, diff --git a/@xen-orchestra/backups/_runners/_writers/FullXapiWriter.mjs b/@xen-orchestra/backups/_runners/_writers/FullXapiWriter.mjs index 001f978c4b2..ce97285d74c 100644 --- a/@xen-orchestra/backups/_runners/_writers/FullXapiWriter.mjs +++ b/@xen-orchestra/backups/_runners/_writers/FullXapiWriter.mjs @@ -1,9 +1,9 @@ import ignoreErrors from 'promise-toolbox/ignoreErrors' import { asyncMap, asyncMapSettled } from '@xen-orchestra/async-map' +import { Task } from '@vates/task' import { formatFilenameDate } from '../../_filenameDate.mjs' import { getOldEntries } from '../../_getOldEntries.mjs' -import { Task } from '../../Task.mjs' import { AbstractFullWriter } from './_AbstractFullWriter.mjs' import { MixinXapiWriter } from './_MixinXapiWriter.mjs' @@ -14,11 +14,11 @@ export class FullXapiWriter extends MixinXapiWriter(AbstractFullWriter) { constructor(props) { super(props) - this.run = Task.wrapFn( + this.run = Task.wrap( { - name: 'export', - data: { + properties: { id: props.sr.uuid, + name: 'export', name_label: this._sr.name_label, type: 'SR', @@ -52,7 +52,7 @@ export class FullXapiWriter extends MixinXapiWriter(AbstractFullWriter) { } let targetVmRef - await Task.run({ name: 'transfer' }, async () => { + await Task.run({ properties: { name: 'transfer' } }, async () => { targetVmRef = await xapi.VM_import(stream, sr.$ref, vm => Promise.all([ !_warmMigration && vm.add_tags('Disaster Recovery'), diff --git a/@xen-orchestra/backups/_runners/_writers/IncrementalRemoteWriter.mjs b/@xen-orchestra/backups/_runners/_writers/IncrementalRemoteWriter.mjs index 40a5558f1c1..de28dbcf862 100644 --- a/@xen-orchestra/backups/_runners/_writers/IncrementalRemoteWriter.mjs +++ b/@xen-orchestra/backups/_runners/_writers/IncrementalRemoteWriter.mjs @@ -7,10 +7,10 @@ import { createLogger } from '@xen-orchestra/log' import { decorateClass } from '@vates/decorate-with' import { defer } from 'golike-defer' import { dirname, basename } from 'node:path' +import { Task } from '@vates/task' import { formatFilenameDate } from '../../_filenameDate.mjs' import { getOldEntries } from '../../_getOldEntries.mjs' -import { Task } from '../../Task.mjs' import { MixinRemoteWriter } from './_MixinRemoteWriter.mjs' import { AbstractIncrementalWriter } from './_AbstractIncrementalWriter.mjs' @@ -72,19 +72,20 @@ export class IncrementalRemoteWriter extends MixinRemoteWriter(AbstractIncrement prepare({ isFull }) { // create the task related to this export and ensure all methods are called in this context const task = new Task({ - name: 'export', - data: { + properties: { id: this._remoteId, isFull, + name: 'export', type: 'remote', }, }) - this.transfer = task.wrapFn(this.transfer) - this.healthCheck = task.wrapFn(this.healthCheck) - this.cleanup = task.wrapFn(this.cleanup) - this.afterBackup = task.wrapFn(this.afterBackup, true) + this._prepare = task.wrapInside(this._prepare) + this.transfer = task.wrapInside(this.transfer) + this.healthCheck = task.wrapInside(this.healthCheck) + this.cleanup = task.wrapInside(this.cleanup) + this.afterBackup = task.wrap(this.afterBackup) - return task.run(() => this._prepare()) + return this._prepare() } async _prepare() { @@ -219,7 +220,7 @@ export class IncrementalRemoteWriter extends MixinRemoteWriter(AbstractIncrement vmSnapshot, } - const { size } = await Task.run({ name: 'transfer' }, async () => { + const { size } = await Task.run({ properties: { name: 'transfer' } }, async () => { let transferSize = 0 await asyncEach( Object.keys(deltaExport.vdis), diff --git a/@xen-orchestra/backups/_runners/_writers/IncrementalXapiWriter.mjs b/@xen-orchestra/backups/_runners/_writers/IncrementalXapiWriter.mjs index ce1c2ad1503..9bd2fd3aa5f 100644 --- a/@xen-orchestra/backups/_runners/_writers/IncrementalXapiWriter.mjs +++ b/@xen-orchestra/backups/_runners/_writers/IncrementalXapiWriter.mjs @@ -1,10 +1,10 @@ import { asyncMap, asyncMapSettled } from '@xen-orchestra/async-map' +import { Task } from '@vates/task' import ignoreErrors from 'promise-toolbox/ignoreErrors' import { formatFilenameDate } from '../../_filenameDate.mjs' import { getOldEntries } from '../../_getOldEntries.mjs' import { importIncrementalVm } from '../../_incrementalVm.mjs' -import { Task } from '../../Task.mjs' import { AbstractIncrementalWriter } from './_AbstractIncrementalWriter.mjs' import { MixinXapiWriter } from './_MixinXapiWriter.mjs' @@ -43,20 +43,24 @@ export class IncrementalXapiWriter extends MixinXapiWriter(AbstractIncrementalWr prepare({ isFull }) { // create the task related to this export and ensure all methods are called in this context const task = new Task({ - name: 'export', - data: { + properties: { id: this._sr.uuid, isFull, + name: 'export', name_label: this._sr.name_label, type: 'SR', }, }) - const hasHealthCheckSr = this._healthCheckSr !== undefined - this.transfer = task.wrapFn(this.transfer) - this.cleanup = task.wrapFn(this.cleanup, !hasHealthCheckSr) - this.healthCheck = task.wrapFn(this.healthCheck, hasHealthCheckSr) + this._prepare = task.wrapInside(this._prepare) + this.transfer = task.wrapInside(this.transfer) + if (this._healthCheckSr !== undefined) { + this.cleanup = task.wrapInside(this.cleanup) + this.healthCheck = task.wrap(this.healthCheck) + } else { + this.cleanup = task.wrap(this.cleanup) + } - return task.run(() => this._prepare(isFull)) + return this._prepare(isFull) } async _prepare(isFull) { @@ -139,7 +143,7 @@ export class IncrementalXapiWriter extends MixinXapiWriter(AbstractIncrementalWr const { uuid: srUuid, $xapi: xapi } = sr let targetVmRef - await Task.run({ name: 'transfer' }, async () => { + await Task.run({ properties: { name: 'transfer' } }, async () => { targetVmRef = await importIncrementalVm(this.#decorateVmMetadata(deltaExport), sr) return { size: Object.values(sizeContainers).reduce((sum, { size }) => sum + size, 0), diff --git a/@xen-orchestra/backups/_runners/_writers/_MixinRemoteWriter.mjs b/@xen-orchestra/backups/_runners/_writers/_MixinRemoteWriter.mjs index 20011b7dbef..9cfb75d4acf 100644 --- a/@xen-orchestra/backups/_runners/_writers/_MixinRemoteWriter.mjs +++ b/@xen-orchestra/backups/_runners/_writers/_MixinRemoteWriter.mjs @@ -1,12 +1,12 @@ import { createLogger } from '@xen-orchestra/log' import { join } from 'node:path' +import { Task } from '@vates/task' import assert from 'node:assert' import { formatFilenameDate } from '../../_filenameDate.mjs' import { getVmBackupDir } from '../../_getVmBackupDir.mjs' import { HealthCheckVmBackup } from '../../HealthCheckVmBackup.mjs' import { ImportVmBackup } from '../../ImportVmBackup.mjs' -import { Task } from '../../Task.mjs' import * as MergeWorker from '../../merge-worker/index.mjs' import ms from 'ms' @@ -27,7 +27,7 @@ export const MixinRemoteWriter = (BaseClass = Object) => async _cleanVm(options) { try { - return await Task.run({ name: 'clean-vm' }, () => { + return await Task.run({ properties: { name: 'clean-vm' } }, () => { return this._adapter.cleanVm(this._vmBackupDir, { ...options, fixMetadata: true, @@ -86,7 +86,7 @@ export const MixinRemoteWriter = (BaseClass = Object) => } return Task.run( { - name: 'health check', + properties: { name: 'health check' }, }, async () => { const xapi = sr.$xapi diff --git a/@xen-orchestra/backups/_runners/_writers/_MixinXapiWriter.mjs b/@xen-orchestra/backups/_runners/_writers/_MixinXapiWriter.mjs index f7d6419a331..8c551f4ee25 100644 --- a/@xen-orchestra/backups/_runners/_writers/_MixinXapiWriter.mjs +++ b/@xen-orchestra/backups/_runners/_writers/_MixinXapiWriter.mjs @@ -1,7 +1,7 @@ import assert from 'node:assert/strict' +import { Task } from '@vates/task' import { HealthCheckVmBackup } from '../../HealthCheckVmBackup.mjs' -import { Task } from '../../Task.mjs' import ms from 'ms' export const MixinXapiWriter = (BaseClass = Object) => @@ -33,7 +33,7 @@ export const MixinXapiWriter = (BaseClass = Object) => // copy VM return Task.run( { - name: 'health check', + properties: { name: 'health check' }, }, async () => { const { $xapi: xapi } = sr @@ -47,12 +47,12 @@ export const MixinXapiWriter = (BaseClass = Object) => } if (await this.#isAlreadyOnHealthCheckSr(baseVm)) { healthCheckVmRef = await Task.run( - { name: 'cloning-vm' }, + { properties: { name: 'cloning-vm' } }, async () => await xapi.callAsync('VM.clone', this._targetVmRef, `Health Check - ${baseVm.name_label}`) ) } else { healthCheckVmRef = await Task.run( - { name: 'copying-vm' }, + { properties: { name: 'copying-vm' } }, async () => await xapi.callAsync( 'VM.copy', diff --git a/@xen-orchestra/backups/package.json b/@xen-orchestra/backups/package.json index beb0f62cb0e..6ccf6a0e236 100644 --- a/@xen-orchestra/backups/package.json +++ b/@xen-orchestra/backups/package.json @@ -27,6 +27,7 @@ "@vates/fuse-vhd": "^2.1.1", "@vates/nbd-client": "^3.1.0", "@vates/parse-duration": "^0.1.1", + "@vates/task": "^0.4.0", "@xen-orchestra/async-map": "^0.1.2", "@xen-orchestra/fs": "^4.1.7", "@xen-orchestra/log": "^0.6.0",