Skip to content

Commit

Permalink
Stream management to read sequentially
Browse files Browse the repository at this point in the history
  • Loading branch information
corrideat committed Mar 25, 2024
1 parent 21d4bda commit e3c5587
Showing 1 changed file with 8 additions and 2 deletions.
10 changes: 8 additions & 2 deletions backend/database.js
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ if (!fs.existsSync(dataFolder)) {
// Streams stored contract log entries since the given entry hash (inclusive!).
sbp('sbp/selectors/register', {
'backend/db/streamEntriesAfter': async function (contractID: string, height: string, requestedLimit: ?number): Promise<*> {
const limit = Math.min(requestedLimit ?? Number.POSITIVE_INFINITY, process.env.MAX_EVENTS_BATCH_SIZE ?? 500)
const limit = Math.min(requestedLimit ?? Number.POSITIVE_INFINITY, process.env.MAX_EVENTS_BATCH_SIZE ?? 5)
const latestHEADinfo = await sbp('chelonia/db/latestHEADinfo', contractID)
if (!latestHEADinfo) {
throw Boom.notFound(`contractID ${contractID} doesn't exist!`)
Expand All @@ -49,29 +49,35 @@ sbp('sbp/selectors/register', {
let counter = 0
let currentHash = await sbp('chelonia/db/get', `_private_hidx=${contractID}#${height}`)
let prefix = '['
let ended = false
// NOTE: if this ever stops working you can also try Readable.from():
// https://nodejs.org/api/stream.html#stream_stream_readable_from_iterable_options
const stream = new Readable({
read (): void {
if (ended) return
if (currentHash && counter < limit) {
sbp('chelonia/db/getEntry', currentHash).then(async entry => {
if (entry) {
this.push(`${prefix}"${strToB64(entry.serialize())}"`)
const currentPrefix = prefix
prefix = ','
counter++
currentHash = await sbp('chelonia/db/get', `_private_hidx=${contractID}#${entry.height() + 1}`)
this.push(`${currentPrefix}"${strToB64(entry.serialize())}"`)
} else {
this.push(counter > 0 ? ']' : '[]')
this.push(null)
ended = true
}
}).catch(e => {
console.error(`[backend] streamEntriesAfter: read(): ${e.message}:`, e)
this.push(counter > 0 ? ']' : '[]')
this.push(null)
ended = true
})
} else {
this.push(counter > 0 ? ']' : '[]')
this.push(null)
ended = true
}
}
})
Expand Down

0 comments on commit e3c5587

Please sign in to comment.