Skip to content

Commit

Permalink
fix(asset-handler): do not clear pending task queue on a transient fa…
Browse files Browse the repository at this point in the history
…ilure, today this is causing missing assets in final export when a transient failure is encountered
  • Loading branch information
j33ty committed May 8, 2024
1 parent 7242900 commit 593e496
Show file tree
Hide file tree
Showing 2 changed files with 27 additions and 13 deletions.
32 changes: 23 additions & 9 deletions src/AssetHandler.js
Original file line number Diff line number Diff line change
Expand Up @@ -131,13 +131,17 @@ class AssetHandler {
throw dlError
}

this.queue.add(() =>
doDownload().catch((err) => {
debug('Error downloading asset', err)
this.queue.clear()
this.reject(err)
}),
)
this.queue
.add(() =>
doDownload().catch((err) => {
debug('Failed to download the asset, aborting download', err)
this.queue.clear()
this.reject(err)
}),
)
.catch((error) => {
debug('Queued task failed', error)
})
}

maybeCreateAssetDirs() {
Expand Down Expand Up @@ -192,7 +196,6 @@ class AssetHandler {
}

if (stream.statusCode !== 200) {
this.queue.clear()
let errMsg
try {
const err = await tryGetErrorFromStream(stream)
Expand Down Expand Up @@ -389,9 +392,20 @@ function writeHashedStream(filePath, stream) {

function tryGetErrorFromStream(stream) {
return new Promise((resolve, reject) => {
miss.pipe(stream, miss.concat(parse), (err) => (err ? reject(err) : noop))
let receivedData = false

miss.pipe(stream, miss.concat(parse), (err) => {
if (err) {
reject(err)
} else if (!receivedData) {
// Resolve with null if no data was received, to let the caller
// know we couldn't parse the error.
resolve(null)
}
})

function parse(body) {
receivedData = true
try {
const parsed = JSON.parse(body.toString('utf8'))
resolve(parsed.message || parsed.error || null)
Expand Down
8 changes: 4 additions & 4 deletions src/export.js
Original file line number Diff line number Diff line change
Expand Up @@ -52,8 +52,8 @@ async function exportDataset(opts) {
maxRetries: options.maxAssetRetries,
})

debug('Outputting assets (temporarily) to %s', tmpDir)
debug('Outputting to %s', options.outputPath === '-' ? 'stdout' : options.outputPath)
debug('Downloading assets (temporarily) to %s', tmpDir)
debug('Downloading to %s', options.outputPath === '-' ? 'stdout' : options.outputPath)

let outputStream
if (isWritableStream(options.outputPath)) {
Expand All @@ -77,13 +77,13 @@ async function exportDataset(opts) {

miss.finished(archive, async (archiveErr) => {
if (archiveErr) {
debug('Archiving errored! %s', archiveErr.stack)
debug('Archiving errored: %s', archiveErr.stack)
await cleanup()
reject(archiveErr)
return
}

debug('Archive finished!')
debug('Archive finished')
})

debug('Getting dataset export stream')
Expand Down

0 comments on commit 593e496

Please sign in to comment.