Skip to content

Commit

Permalink
Xo tasks @xo/backups (#8012)
Browse files Browse the repository at this point in the history
  • Loading branch information
b-Nollet authored Dec 16, 2024
1 parent ba3753a commit 542e801
Show file tree
Hide file tree
Showing 22 changed files with 139 additions and 117 deletions.
4 changes: 2 additions & 2 deletions @xen-orchestra/backups/HealthCheckVmBackup.mjs
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
import { Task } from './Task.mjs'
import { Task } from '@vates/task'

export class HealthCheckVmBackup {
#restoredVm
Expand All @@ -14,7 +14,7 @@ export class HealthCheckVmBackup {
async run() {
return Task.run(
{
name: 'vmstart',
properties: { name: 'vmstart' },
},
async () => {
let restoredVm = this.#restoredVm
Expand Down
4 changes: 2 additions & 2 deletions @xen-orchestra/backups/ImportVmBackup.mjs
Original file line number Diff line number Diff line change
Expand Up @@ -2,10 +2,10 @@ import assert from 'node:assert'

import { formatFilenameDate } from './_filenameDate.mjs'
import { importIncrementalVm } from './_incrementalVm.mjs'
import { Task } from './Task.mjs'
import { watchStreamSize } from './_watchStreamSize.mjs'
import { VhdNegative, VhdSynthetic } from 'vhd-lib'
import { decorateClass } from '@vates/decorate-with'
import { Task } from '@vates/task'
import { createLogger } from '@xen-orchestra/log'
import { dirname, join } from 'node:path'
import pickBy from 'lodash/pickBy.js'
Expand Down Expand Up @@ -240,7 +240,7 @@ export class ImportVmBackup {

return Task.run(
{
name: 'transfer',
properties: { name: 'transfer' },
},
async () => {
const xapi = this._xapi
Expand Down
6 changes: 3 additions & 3 deletions @xen-orchestra/backups/_backupWorker.mjs
Original file line number Diff line number Diff line change
Expand Up @@ -14,10 +14,10 @@ import { decorateMethodsWith } from '@vates/decorate-with'
import { deduped } from '@vates/disposable/deduped.js'
import { getHandler } from '@xen-orchestra/fs'
import { parseDuration } from '@vates/parse-duration'
import { Task } from '@vates/task'
import { Xapi } from '@xen-orchestra/xapi'

import { RemoteAdapter } from './RemoteAdapter.mjs'
import { Task } from './Task.mjs'

createCachedLookup().patchGlobal()

Expand Down Expand Up @@ -178,8 +178,8 @@ process.on('message', async message => {
const result = message.runWithLogs
? await Task.run(
{
name: 'backup run',
onLog: data =>
properties: { name: 'backup run' },
onProgress: data =>
emitMessage({
data,
type: 'log',
Expand Down
8 changes: 4 additions & 4 deletions @xen-orchestra/backups/_cleanVm.mjs
Original file line number Diff line number Diff line change
Expand Up @@ -8,8 +8,8 @@ import { isMetadataFile, isVhdFile, isXvaFile, isXvaSumFile } from './_backupTyp
import { limitConcurrency } from 'limit-concurrency-decorator'
import { mergeVhdChain } from 'vhd-lib/merge.js'

import { Task } from './Task.mjs'
import { Disposable } from 'promise-toolbox'
import { Task } from '@vates/task'
import handlerPath from '@xen-orchestra/fs/path'

const { DISK_TYPES } = Constants
Expand Down Expand Up @@ -201,9 +201,9 @@ export async function cleanVm(

// remove broken VHDs
await asyncMap(vhds, async path => {
if(removeTmp && basename(path)[0] === '.'){
if (removeTmp && basename(path)[0] === '.') {
logInfo('deleting temporary VHD', { path })
return VhdAbstract.unlink(handler, path)
return VhdAbstract.unlink(handler, path)
}
try {
await Disposable.use(openVhd(handler, path, { checkSecondFooter: !interruptedVhds.has(path) }), vhd => {
Expand Down Expand Up @@ -488,7 +488,7 @@ export async function cleanVm(

await Promise.all([
...unusedVhdsDeletion,
toMerge.length !== 0 && (merge ? Task.run({ name: 'merge' }, doMerge) : () => Promise.resolve()),
toMerge.length !== 0 && (merge ? Task.run({ properties: { name: 'merge' } }, doMerge) : () => Promise.resolve()),
asyncMap(unusedXvas, path => {
logWarn('unused XVA', { path })
if (remove) {
Expand Down
2 changes: 1 addition & 1 deletion @xen-orchestra/backups/_incrementalVm.mjs
Original file line number Diff line number Diff line change
Expand Up @@ -4,9 +4,9 @@ import { asyncMap } from '@xen-orchestra/async-map'
import { CancelToken } from 'promise-toolbox'
import { compareVersions } from 'compare-versions'
import { defer } from 'golike-defer'
import { Task } from '@vates/task'

import { cancelableMap } from './_cancelableMap.mjs'
import { Task } from './Task.mjs'
import pick from 'lodash/pick.js'
import { BASE_DELTA_VDI, COPY_OF, VM_UUID } from './_otherConfig.mjs'

Expand Down
31 changes: 18 additions & 13 deletions @xen-orchestra/backups/_runners/Metadata.mjs
Original file line number Diff line number Diff line change
@@ -1,14 +1,16 @@
import { asyncMap } from '@xen-orchestra/async-map'
import { Task } from '@vates/task'
import Disposable from 'promise-toolbox/Disposable'
import ignoreErrors from 'promise-toolbox/ignoreErrors'

import { extractIdsFromSimplePattern } from '../extractIdsFromSimplePattern.mjs'
import { PoolMetadataBackup } from './_PoolMetadataBackup.mjs'
import { XoMetadataBackup } from './_XoMetadataBackup.mjs'
import { DEFAULT_SETTINGS, Abstract } from './_Abstract.mjs'
import { runTask } from './_runTask.mjs'
import { getAdaptersByRemote } from './_getAdaptersByRemote.mjs'

const noop = Function.prototype

const DEFAULT_METADATA_SETTINGS = {
retentionPoolMetadata: 0,
retentionXoMetadata: 0,
Expand Down Expand Up @@ -55,13 +57,16 @@ export const Metadata = class MetadataBackupRunner extends Abstract {
poolIds.map(id =>
this._getRecord('pool', id).catch(error => {
// See https://github.com/vatesfr/xen-orchestra/commit/6aa6cfba8ec939c0288f0fa740f6dfad98c43cbb
runTask(
Task.run(
{
name: 'get pool record',
data: { type: 'pool', id },
properties: {
id,
name: 'get pool record',
type: 'pool',
},
},
() => Promise.reject(error)
)
).catch(noop)
})
)
),
Expand All @@ -81,11 +86,11 @@ export const Metadata = class MetadataBackupRunner extends Abstract {
if (pools.length !== 0 && settings.retentionPoolMetadata !== 0) {
promises.push(
asyncMap(pools, async pool =>
runTask(
Task.run(
{
name: `Starting metadata backup for the pool (${pool.$id}). (${job.id})`,
data: {
properties: {
id: pool.$id,
name: `Starting metadata backup for the pool (${pool.$id}). (${job.id})`,
pool,
poolMaster: await ignoreErrors.call(pool.$xapi.getRecord('host', pool.master)),
type: 'pool',
Expand All @@ -100,17 +105,17 @@ export const Metadata = class MetadataBackupRunner extends Abstract {
schedule,
settings,
}).run()
)
).catch(noop)
)
)
}

if (job.xoMetadata !== undefined && settings.retentionXoMetadata !== 0) {
promises.push(
runTask(
Task.run(
{
name: `Starting XO metadata backup. (${job.id})`,
data: {
properties: {
name: `Starting XO metadata backup. (${job.id})`,
type: 'xo',
},
},
Expand All @@ -122,7 +127,7 @@ export const Metadata = class MetadataBackupRunner extends Abstract {
schedule,
settings,
}).run()
)
).catch(noop)
)
}
await Promise.all(promises)
Expand Down
38 changes: 21 additions & 17 deletions @xen-orchestra/backups/_runners/VmsRemote.mjs
Original file line number Diff line number Diff line change
@@ -1,9 +1,9 @@
import { asyncMapSettled } from '@xen-orchestra/async-map'
import Disposable from 'promise-toolbox/Disposable'
import { limitConcurrency } from 'limit-concurrency-decorator'
import { Task } from '@vates/task'

import { extractIdsFromSimplePattern } from '../extractIdsFromSimplePattern.mjs'
import { Task } from '../Task.mjs'
import createStreamThrottle from './_createStreamThrottle.mjs'
import { DEFAULT_SETTINGS, Abstract } from './_Abstract.mjs'
import { getAdaptersByRemote } from './_getAdaptersByRemote.mjs'
Expand Down Expand Up @@ -76,7 +76,7 @@ export const VmsRemote = class RemoteVmsBackupRunner extends Abstract {
}
nTriesByVmId[vmUuid]++

const taskStart = { name: 'backup VM', data: { type: 'VM', id: vmUuid } }
const taskStart = { properties: { id: vmUuid, name: 'backup VM', type: 'VM' } }
const vmSettings = { ...settings, ...allSettings[vmUuid] }
const isLastRun = nTriesByVmId[vmUuid] === vmSettings.nRetriesVmBackupFailures + 1

Expand Down Expand Up @@ -110,23 +110,27 @@ export const VmsRemote = class RemoteVmsBackupRunner extends Abstract {
taskByVmId[vmUuid] = new Task(taskStart)
}
const task = taskByVmId[vmUuid]
// error has to be caught in the task to prevent its failure, but handled outside the task to execute another task.run()
let taskError
return task
.run(async () => {
try {
const result = await vmBackup.run()
task.success(result)
return result
} catch (error) {
if (isLastRun) {
throw error
} else {
Task.warning(`Retry the VM mirror backup due to an error`, {
attempt: nTriesByVmId[vmUuid],
error: error.message,
})
queue.add(vmUuid)
}
.runInside(async () =>
vmBackup.run().catch(error => {
taskError = error
})
)
.then(result => {
if (taskError === undefined) {
return task.success(result)
}
if (isLastRun) {
return task.failure(taskError)
}
// don't end the task
task.warning(`Retry the VM mirror backup due to an error`, {
attempt: nTriesByVmId[vmUuid],
error: taskError.message,
})
queue.add(vmUuid)
})
.catch(noop)
}
Expand Down
47 changes: 24 additions & 23 deletions @xen-orchestra/backups/_runners/VmsXapi.mjs
Original file line number Diff line number Diff line change
@@ -1,12 +1,11 @@
import { asyncMapSettled } from '@xen-orchestra/async-map'
import Disposable from 'promise-toolbox/Disposable'
import { limitConcurrency } from 'limit-concurrency-decorator'
import { Task } from '@vates/task'

import { extractIdsFromSimplePattern } from '../extractIdsFromSimplePattern.mjs'
import { Task } from '../Task.mjs'
import createStreamThrottle from './_createStreamThrottle.mjs'
import { DEFAULT_SETTINGS, Abstract } from './_Abstract.mjs'
import { runTask } from './_runTask.mjs'
import { getAdaptersByRemote } from './_getAdaptersByRemote.mjs'
import { IncrementalXapi } from './_vmRunners/IncrementalXapi.mjs'
import { FullXapi } from './_vmRunners/FullXapi.mjs'
Expand Down Expand Up @@ -63,13 +62,12 @@ export const VmsXapi = class VmsXapiBackupRunner extends Abstract {
Disposable.all(
extractIdsFromSimplePattern(job.srs).map(id =>
this._getRecord('SR', id).catch(error => {
runTask(
Task.run(
{
name: 'get SR record',
data: { type: 'SR', id },
properties: { id, name: 'get SR record', type: 'SR' },
},
() => Promise.reject(error)
)
).catch(noop)
})
)
),
Expand Down Expand Up @@ -106,11 +104,12 @@ export const VmsXapi = class VmsXapiBackupRunner extends Abstract {
}
return taskByVmId[vmUuid]
}
const vmBackupFailed = error => {
const vmBackupFailed = async (error, task) => {
if (isLastRun) {
throw error
return task.failure(error)
} else {
Task.warning(`Retry the VM backup due to an error`, {
// don't end the task
task.warning(`Retry the VM backup due to an error`, {
attempt: nTriesByVmId[vmUuid],
error: error.message,
})
Expand All @@ -124,19 +123,21 @@ export const VmsXapi = class VmsXapiBackupRunner extends Abstract {
nTriesByVmId[vmUuid]++

const vmSettings = { ...settings, ...allSettings[vmUuid] }
const taskStart = { name: 'backup VM', data: { type: 'VM', id: vmUuid } }
const taskStart = { properties: { id: vmUuid, name: 'backup VM', type: 'VM' } }
const isLastRun = nTriesByVmId[vmUuid] === vmSettings.nRetriesVmBackupFailures + 1

return this._getRecord('VM', vmUuid).then(
disposableVm =>
Disposable.use(disposableVm, async vm => {
if (taskStart.data.name_label === undefined) {
taskStart.data.name_label = vm.name_label
if (taskStart.properties.name_label === undefined) {
taskStart.properties.name_label = vm.name_label
}

const task = getVmTask()
// error has to be caught in the task to prevent its failure, but handled outside the task to execute another task.run()
let taskError
return task
.run(async () => {
.runInside(async () => {
const opts = {
baseSettings,
config,
Expand All @@ -161,21 +162,21 @@ export const VmsXapi = class VmsXapiBackupRunner extends Abstract {
throw new Error(`Job mode ${job.mode} not implemented`)
}
}

try {
const result = await vmBackup.run()
return vmBackup.run().catch(error => {
taskError = error
})
})
.then(result => {
if (taskError === undefined) {
task.success(result)
return result
} catch (error) {
vmBackupFailed(error)
} else {
// ending the task with error or not ending the task
vmBackupFailed(taskError, task)
}
})
.catch(noop) // errors are handled by logs
}),
error =>
getVmTask().run(() => {
vmBackupFailed(error)
})
error => vmBackupFailed(error, getVmTask())
)
}
const { concurrency } = settings
Expand Down
15 changes: 10 additions & 5 deletions @xen-orchestra/backups/_runners/_Abstract.mjs
Original file line number Diff line number Diff line change
@@ -1,8 +1,10 @@
import Disposable from 'promise-toolbox/Disposable'
import pTimeout from 'promise-toolbox/timeout'
import { compileTemplate } from '@xen-orchestra/template'
import { runTask } from './_runTask.mjs'
import { RemoteTimeoutError } from './_RemoteTimeoutError.mjs'
import { Task } from '@vates/task'

const noop = Function.prototype

export const DEFAULT_SETTINGS = {
getRemoteTimeout: 300e3,
Expand Down Expand Up @@ -36,13 +38,16 @@ export const Abstract = class AbstractRunner {
})
} catch (error) {
// See https://github.com/vatesfr/xen-orchestra/commit/6aa6cfba8ec939c0288f0fa740f6dfad98c43cbb
runTask(
Task.run(
{
name: 'get remote adapter',
data: { type: 'remote', id: remoteId },
properties: {
id: remoteId,
name: 'get remote adapter',
type: 'remote',
},
},
() => Promise.reject(error)
)
).catch(noop)
}
}
}
Expand Down
Loading

0 comments on commit 542e801

Please sign in to comment.