Skip to content

Commit

Permalink
feat: add support for exporting documents with "inconsistent" cursor
Browse files Browse the repository at this point in the history
  • Loading branch information
sgulseth committed Jul 2, 2024
1 parent edada48 commit 4294be0
Show file tree
Hide file tree
Showing 5 changed files with 236 additions and 2 deletions.
8 changes: 8 additions & 0 deletions src/constants.js
Original file line number Diff line number Diff line change
Expand Up @@ -33,3 +33,11 @@ exports.DOCUMENT_STREAM_DEBUG_INTERVAL = 10000
* @internal
*/
exports.REQUEST_READ_TIMEOUT = 3 * 60 * 1000 // 3 minutes

/**
 * What mode to use when exporting documents.
 *
 * - `stream`: Export all documents in the dataset in one request. This is
 *   consistent, but might be slow on large datasets.
 * - `cursor`: Export documents using a cursor across multiple requests. This
 *   might lead to inconsistent results if a mutation is performed while
 *   exporting.
 *
 * @internal
 */
exports.MODE_STREAM = 'stream'
exports.MODE_CURSOR = 'cursor'
16 changes: 14 additions & 2 deletions src/export.js
Original file line number Diff line number Diff line change
Expand Up @@ -11,13 +11,14 @@ const filterDocumentTypes = require('./filterDocumentTypes')
const filterDrafts = require('./filterDrafts')
const filterSystemDocuments = require('./filterSystemDocuments')
const getDocumentsStream = require('./getDocumentsStream')
const getDocumentCursorStream = require('./getDocumentCursorStream')
const logFirstChunk = require('./logFirstChunk')
const rejectOnApiError = require('./rejectOnApiError')
const stringifyStream = require('./stringifyStream')
const tryParseJson = require('./tryParseJson')
const rimraf = require('./util/rimraf')
const validateOptions = require('./validateOptions')
const {DOCUMENT_STREAM_DEBUG_INTERVAL} = require('./constants')
const {DOCUMENT_STREAM_DEBUG_INTERVAL, MODE_CURSOR, MODE_STREAM} = require('./constants')

const noop = () => null

Expand Down Expand Up @@ -118,7 +119,7 @@ async function exportDataset(opts) {
cb(null, doc)
}

const inputStream = await getDocumentsStream(options)
const inputStream = await getDocumentInputStream(options)
debug('Got HTTP %d', inputStream.statusCode)
debug('Response headers: %o', inputStream.headers)

Expand Down Expand Up @@ -250,6 +251,17 @@ async function exportDataset(opts) {
return result
}

/**
 * Resolves the response stream of documents for the given export options,
 * selecting the transport based on `options.mode`.
 *
 * @param {Object} options - validated export options (see validateOptions)
 * @returns {Promise<Stream>} response stream of NDJSON documents
 * @throws {Error} when `options.mode` is not a recognized mode
 */
function getDocumentInputStream(options) {
  switch (options.mode) {
    case MODE_STREAM:
      return getDocumentsStream(options)
    case MODE_CURSOR:
      return getDocumentCursorStream(options)
    default:
      throw new Error(`Invalid mode: ${options.mode}`)
  }
}

function isWritableStream(val) {
return (
val !== null &&
Expand Down
73 changes: 73 additions & 0 deletions src/getDocumentCursorStream.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,73 @@
const {Transform} = require('node:stream')

const pkg = require('../package.json')
const requestStream = require('./requestStream')

module.exports = async (options) => {
let streamsInflight = 0
const stream = new Transform({
async transform(chunk, encoding, callback) {
if (encoding !== 'buffer' && encoding !== 'string') {
callback(null, chunk)
return
}

let parsedChunk = null
try {
parsedChunk = JSON.parse(chunk.toString())
} catch (err) {
// Ignore JSON parse errors
// this can happen if the chunk is not a JSON object. We just pass it through and let the caller handle it.
}

if (
parsedChunk !== null &&
typeof parsedChunk === 'object' &&
'nextCursor' in parsedChunk &&
typeof parsedChunk.nextCursor === 'string' &&
!('_id' in parsedChunk)
) {
streamsInflight++

const reqStream = await startStream(options, parsedChunk.nextCursor)
reqStream.on('end', () => {
streamsInflight--
if (streamsInflight === 0) {
stream.end()
}
})
reqStream.pipe(this, {end: false})

callback()
return
}

callback(null, chunk)
},
})

streamsInflight++
const reqStream = await startStream(options, '')
reqStream.on('end', () => {
streamsInflight--
if (streamsInflight === 0) {
stream.end()
}
})

reqStream.pipe(stream, {end: false})
return stream
}

/**
 * Issues a single export request for the dataset, resuming from `nextCursor`
 * (empty string for the initial request). Retries are delegated to
 * requestStream via `options.maxRetries`.
 *
 * @param {Object} options - export options (client, dataset, maxRetries)
 * @param {string} nextCursor - cursor returned by the previous response
 * @returns {Promise<Stream>} the HTTP response stream
 */
function startStream(options, nextCursor) {
  const {client, dataset, maxRetries} = options
  const url = client.getUrl(
    `/data/export/${dataset}?nextCursor=${encodeURIComponent(nextCursor)}`,
  )

  const headers = {'User-Agent': `${pkg.name}@${pkg.version}`}
  const token = client.config().token
  if (token) {
    headers.Authorization = `Bearer ${token}`
  }

  return requestStream({url, headers, maxRetries})
}
12 changes: 12 additions & 0 deletions src/validateOptions.js
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,8 @@ const {
DOCUMENT_STREAM_MAX_RETRIES,
ASSET_DOWNLOAD_MAX_RETRIES,
REQUEST_READ_TIMEOUT,
MODE_STREAM,
MODE_CURSOR,
} = require('./constants')

const clientMethods = ['getUrl', 'config']
Expand All @@ -13,6 +15,7 @@ const exportDefaults = {
drafts: true,
assets: true,
raw: false,
mode: MODE_STREAM,
maxRetries: DOCUMENT_STREAM_MAX_RETRIES,
maxAssetRetries: ASSET_DOWNLOAD_MAX_RETRIES,
readTimeout: REQUEST_READ_TIMEOUT,
Expand All @@ -25,6 +28,15 @@ function validateOptions(opts) {
throw new Error(`options.dataset must be a valid dataset name`)
}

if (
typeof options.mode !== 'string' ||
(options.mode !== MODE_STREAM && options.mode !== MODE_CURSOR)
) {
throw new Error(
`options.mode must be either "${MODE_STREAM}" or "${MODE_CURSOR}", got "${options.mode}"`,
)
}

if (options.onProgress && typeof options.onProgress !== 'function') {
throw new Error(`options.onProgress must be a function`)
}
Expand Down
129 changes: 129 additions & 0 deletions test/export.test.js
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ const {mkdir, rm} = require('fs/promises')
const {afterAll, describe, expect, test, afterEach} = require('@jest/globals')

const exportDataset = require('../src/export')
const {MODE_CURSOR} = require('../src/constants')
const {assertContents} = require('./helpers')

const OUTPUT_ROOT_DIR = joinPath(os.tmpdir(), 'sanity-export-tests')
Expand Down Expand Up @@ -509,4 +510,132 @@ describe('export', () => {
files: {},
})
})

test('export mode must be either cursor or stream', async () => {
  // An unknown mode must be rejected by option validation before any request.
  const opts = await getOptions({mode: 'murg'})
  const expectedMessage = 'options.mode must be either "stream" or "cursor", got "murg"'
  await expect(exportDataset(opts)).rejects.toThrow(expectedMessage)
})

test('can export with cursor, multiple cursors', async () => {
  const port = 43215
  const documents = [
    {
      _id: 'first',
      _type: 'article',
      title: 'Hello, world!',
    },
    {
      _id: 'second',
      _type: 'article',
      title: 'Goodbye, cruel world!',
    },
    {
      _id: 'third-but-not-the-last',
      _type: 'article',
      title: 'Hello again, world!',
    },
    {
      _id: 'fourth-and-last',
      _type: 'article',
      title: 'Goodbye again, cruel world!',
    },
  ]
  // Serves one document per request; every response except the last ends with
  // a `nextCursor` marker, forcing the exporter to follow the cursor chain.
  server = await getServer(port, (req, res) => {
    res.writeHead(200, 'OK', {'Content-Type': 'application/x-ndjson'})
    const url = new URL(req.url, `http://localhost:${port}`)
    const cursor = url.searchParams.get('nextCursor')
    switch (cursor) {
      case '': {
        res.write(JSON.stringify(documents[0]))
        res.write('\n')
        res.write(JSON.stringify({nextCursor: 'cursor-1'}))
        res.end()
        return
      }

      case 'cursor-1': {
        res.write(JSON.stringify(documents[1]))
        res.write('\n')
        res.write(JSON.stringify({nextCursor: 'cursor-2'}))
        res.end()
        return
      }

      case 'cursor-2': {
        res.write(JSON.stringify(documents[2]))
        res.write('\n')
        res.write(JSON.stringify({nextCursor: 'cursor-3'}))
        res.end()
        return
      }

      case 'cursor-3': {
        res.write(JSON.stringify(documents[3]))
        res.write('\n')
        res.end()
        return
      }

      default: {
        // Fix: `req.query` does not exist on http.IncomingMessage - reuse the
        // cursor parsed from `url.searchParams` above.
        throw new Error(`Unexpected cursor: ${cursor}`)
      }
    }
  })
  const options = await getOptions({port, mode: MODE_CURSOR})
  const result = await exportDataset(options)
  expect(result).toMatchObject({
    assetCount: 0,
    documentCount: 4,
    outputPath: /out\.tar\.gz$/,
  })

  await assertContents(result.outputPath, {
    documents,
  })
})
test('can export with cursor, no cursor', async () => {
  const port = 43215
  const documents = [
    {
      _id: 'first',
      _type: 'article',
      title: 'Hello, world!',
    },
    {
      _id: 'second',
      _type: 'article',
      title: 'Goodbye, cruel world!',
    },
    {
      _id: 'third-but-not-the-last',
      _type: 'article',
      title: 'Hello again, world!',
    },
    {
      _id: 'fourth-and-last',
      _type: 'article',
      title: 'Goodbye again, cruel world!',
    },
  ]
  // The server answers the initial request with the whole dataset and no
  // `nextCursor` marker, so the exporter must finish after a single request.
  server = await getServer(port, (req, res) => {
    res.writeHead(200, 'OK', {'Content-Type': 'application/x-ndjson'})
    const body = documents.map((document) => `${JSON.stringify(document)}\n`).join('')
    res.end(body)
  })
  const options = await getOptions({port, mode: MODE_CURSOR})
  const result = await exportDataset(options)
  expect(result).toMatchObject({
    assetCount: 0,
    documentCount: 4,
    outputPath: /out\.tar\.gz$/,
  })

  await assertContents(result.outputPath, {
    documents,
  })
})
})

0 comments on commit 4294be0

Please sign in to comment.