Skip to content

Commit

Permalink
fix: handle multiple json docs inside one chunk when streaming with c…
Browse files Browse the repository at this point in the history
…ursor
  • Loading branch information
sgulseth committed Jul 22, 2024
1 parent 11e78c7 commit b17e562
Show file tree
Hide file tree
Showing 3 changed files with 45 additions and 40 deletions.
5 changes: 5 additions & 0 deletions src/filterSystemDocuments.js
Original file line number Diff line number Diff line change
Expand Up @@ -2,13 +2,18 @@ const miss = require('mississippi')
const debug = require('./debug')

const isSystemDocument = (doc) => doc && doc._id && doc._id.indexOf('_.') === 0
const isCursor = (doc) => doc && !doc._id && doc.nextCursor !== undefined

module.exports = () =>
miss.through.obj((doc, enc, callback) => {
if (isSystemDocument(doc)) {
debug('%s is a system document, skipping', doc && doc._id)
return callback()
}
if (isCursor(doc)) {
debug('%o is a cursor, skipping', doc)
return callback()
}

return callback(null, doc)
})
72 changes: 36 additions & 36 deletions src/getDocumentCursorStream.js
Original file line number Diff line number Diff line change
Expand Up @@ -4,63 +4,63 @@ const pkg = require('../package.json')
const debug = require('./debug')
const requestStream = require('./requestStream')

// same regex as split2 is using by default: https://github.com/mcollina/split2/blob/53432f54bd5bf422bd55d91d38f898b6c9496fc1/index.js#L86
const splitRegex = /\r?\n/

module.exports = async (options) => {
let streamsInflight = 0
function decrementInflight(stream) {
streamsInflight--
if (streamsInflight === 0) {
stream.end()
}
}

const stream = new Transform({
async transform(chunk, encoding, callback) {
if (encoding !== 'buffer' && encoding !== 'string') {
callback(null, chunk)
return
}
this.push(chunk, encoding)

let parsedChunk = null
try {
const chunkStr = chunk.toString()
if (chunkStr.trim() !== '') {
parsedChunk = JSON.parse(chunkStr)
for (const chunkStr of chunk.toString().split(splitRegex)) {
if (chunkStr.trim() === '') {
continue
}
} catch (err) {
// Ignore JSON parse errors
// this can happen if the chunk is not a JSON object. We just pass it through and let the caller handle it.
debug('Failed to parse JSON chunk, ignoring', err, chunk.toString())
}

if (
parsedChunk !== null &&
typeof parsedChunk === 'object' &&
'nextCursor' in parsedChunk &&
typeof parsedChunk.nextCursor === 'string' &&
!('_id' in parsedChunk)
) {
debug('Got next cursor "%s", fetching next stream', parsedChunk.nextCursor)
streamsInflight++
try {
parsedChunk = JSON.parse(chunkStr)
} catch (err) {
// Ignore JSON parse errors
// this can happen if the chunk is not a JSON object. We just pass it through and let the caller handle it.
debug('Failed to parse JSON chunk, ignoring', err, chunkStr)
}

const reqStream = await startStream(options, parsedChunk.nextCursor)
reqStream.on('end', () => {
streamsInflight--
if (streamsInflight === 0) {
stream.end()
}
})
reqStream.pipe(this, {end: false})
if (
parsedChunk !== null &&
typeof parsedChunk === 'object' &&
'nextCursor' in parsedChunk &&
typeof parsedChunk.nextCursor === 'string' &&
!('_id' in parsedChunk)
) {
debug('Got next cursor "%s", fetching next stream', parsedChunk.nextCursor)
streamsInflight++

callback()
return
const reqStream = await startStream(options, parsedChunk.nextCursor)
reqStream.on('end', () => decrementInflight(this))
reqStream.pipe(this, {end: false})
}
}

callback(null, chunk)
callback()
},
})

streamsInflight++
const reqStream = await startStream(options, '')
reqStream.on('end', () => {
streamsInflight--
if (streamsInflight === 0) {
stream.end()
}
})

reqStream.on('end', () => decrementInflight(stream))
reqStream.pipe(stream, {end: false})
return stream
}
Expand Down
8 changes: 4 additions & 4 deletions test/export.test.js
Original file line number Diff line number Diff line change
Expand Up @@ -534,7 +534,7 @@ describe('export', () => {
{
_id: 'third-but-not-the-last',
_type: 'article',
title: 'Hello again, world!',
title: 'Hello again, \r\nworld!',
},
{
_id: 'fourth-and-last',
Expand All @@ -550,6 +550,7 @@ describe('export', () => {
res.write(JSON.stringify(documents[0]))
res.write('\n')
res.write(JSON.stringify({nextCursor: 'cursor-1'}))
res.write('\n')
res.end()
return
}
Expand All @@ -558,21 +559,20 @@ describe('export', () => {
res.write(JSON.stringify(documents[1]))
res.write('\n')
res.write(JSON.stringify({nextCursor: 'cursor-2'}))
res.write('\n')
res.end()
return
}

case 'cursor-2': {
res.write(JSON.stringify(documents[2]))
res.write(`${JSON.stringify(documents[2])}\n${JSON.stringify({nextCursor: 'cursor-3'})}`)
res.write('\n')
res.write(JSON.stringify({nextCursor: 'cursor-3'}))
res.end()
return
}

case 'cursor-3': {
res.write(JSON.stringify(documents[3]))
res.write('\n')
res.end()
return
}
Expand Down

0 comments on commit b17e562

Please sign in to comment.