Skip to content

Commit

Permalink
Convert eventsAfter to stream (missing: change usage in ChatMain)
Browse files Browse the repository at this point in the history
  • Loading branch information
corrideat committed Mar 6, 2024
1 parent 3e47421 commit 0dab835
Show file tree
Hide file tree
Showing 3 changed files with 130 additions and 52 deletions.
4 changes: 2 additions & 2 deletions backend/database.js
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ if (!fs.existsSync(dataFolder)) {
// Streams stored contract log entries since the given entry hash (inclusive!).
sbp('sbp/selectors/register', {
'backend/db/streamEntriesAfter': async function (contractID: string, hash: string, requestedLimit: number = MAX_EVENTS_AFTER): Promise<*> {
const limit = Math.min(requestedLimit, process.env.MAX_EVENTS_BATCH_SIZE ?? Number.POSITIVE_INFINITY)
const limit = Math.min(requestedLimit, process.env.MAX_EVENTS_BATCH_SIZE ?? 500)
const latestHEADinfo = await sbp('chelonia/db/latestHEADinfo', contractID)
if (!latestHEADinfo) {
throw Boom.notFound(`contractID ${contractID} doesn't exist!`)
Expand Down Expand Up @@ -85,7 +85,7 @@ sbp('sbp/selectors/register', {
return stream
},
'backend/db/streamEntriesBefore': async function (before: string, requestedLimit: number): Promise<*> {
let limit = Math.min(requestedLimit, process.env.MAX_EVENTS_BATCH_SIZE ?? Number.POSITIVE_INFINITY)
let limit = Math.min(requestedLimit, process.env.MAX_EVENTS_BATCH_SIZE ?? 500)
let prefix = '['
let currentHEAD = before
let entry = await sbp('chelonia/db/getEntry', currentHEAD)
Expand Down
151 changes: 113 additions & 38 deletions shared/domains/chelonia/chelonia.js
Original file line number Diff line number Diff line change
Expand Up @@ -820,47 +820,122 @@ export default (sbp('sbp/selectors/register', {
// TODO: r.body is a stream.Transform, should we use a callback to process
// the events one-by-one instead of converting to giant json object?
// however, note if we do that they would be processed in reverse...
'chelonia/out/eventsAfter': async function (contractID: string, since: string, limit?: number) {
const aggregateEvents = []
let remainingEvents = limit ?? Number.POSITIVE_INFINITY
for (;;) {
const requestLimit = Math.min(limit ?? MAX_EVENTS_AFTER, remainingEvents)
'chelonia/out/eventsAfter': function (contractID: string, since: string, limit?: number) {
const fetchEventsStreamReader = async () => {
requestLimit = Math.min(limit ?? MAX_EVENTS_AFTER, remainingEvents)
const eventsResponse = await fetch(`${this.config.connectionURL}/eventsAfter/${contractID}/${since}/${requestLimit}`, { signal: this.abortController.signal })
const events = await handleFetchResult('json')(eventsResponse)
// Sanity check
if (!Array.isArray(events)) throw new Error('Invalid response type')
if (events.length > requestLimit) {
throw new Error('Received too many events')
}
if (GIMessage.deserializeHEAD(b64ToStr(events[0])).hash !== since) {
throw new Error('hash() !== since')
}
// Avoid duplicating the intermediate event in the result.
if (aggregateEvents.length) {
events.unshift()
}
remainingEvents -= events.length
// Because the number of events could potentially be quite large and hit
// stack limits (or limits to how many arguments can be passed), if the
// number of events is larger than 8192, then split it into smaller
// chunks.
if (events.length <= 8192) {
aggregateEvents.push(...events.map(b64ToStr))
} else {
while (events.length) {
aggregateEvents.push(...events.splice(0, 8192).map(b64ToStr))
if (!eventsResponse.ok) throw new Error('Unexpected status code')
if (!eventsResponse.body) throw new Error('Missing body')
latestHEAD = eventsResponse.headers.get('shelter-headinfo-head')
requestCount++
// $FlowFixMe[incompatible-use]
return eventsResponse.body.getReader()
}
let requestCount = 0
let remainingEvents = limit ?? Number.POSITIVE_INFINITY
let eventsStreamReader
let latestHEAD
let state: 'fetch' | 'read-eos' | 'read-new-response' | 'read' | 'events' = 'fetch'
let requestLimit: number
let count: number
let buffer: string = ''
let currentEvent: string
return new ReadableStream({
async pull (controller) {
for (;;) {
switch (state) {
case 'fetch': {
eventsStreamReader = await fetchEventsStreamReader()
state = 'read-new-response'
count = 0
break
}
case 'read-eos':
case 'read-new-response':
case 'read': {
const { done, value } = await eventsStreamReader.read()
if (done) {
if (remainingEvents === 0 || since === latestHEAD) {
controller.close()
} else {
controller.error(new Error('Invalid response: done too early'))
}
return
}
if (!value) {
controller.error(new Error('Invalid response: missing body'))
return
}
buffer = buffer + Buffer.from(value).toString().trim()
// If there was only whitespace, try reading again
if (!buffer) break
if (state === 'read-new-response') {
// Response is in JSON format, so we look for the start of an
// array (`[`)
if (buffer[0] !== '[') {
console.error('@@no array start delimiter', buffer)
controller.error(new Error('Invalid response: no array start delimiter'))
return
}
buffer = buffer.slice(1)
}
state = 'events'
break
}
case 'events': {
// Find where the current event ends
const nextIdx = buffer.search(/(?<=\s*)[,\]]/)
// The current event isn't finished yet, so we try to read
// more data from the server
if (nextIdx < 0) {
state = 'read'
break
}
try {
const eventValue = buffer.slice(0, nextIdx).trim()
if (eventValue) {
if (count === requestLimit) {
controller.error(new Error('Received too many events'))
return
}
currentEvent = b64ToStr(JSON.parse(eventValue))
if (count === 0) {
const hash = GIMessage.deserializeHEAD(currentEvent).hash
if (hash !== since) {
controller.error(new Error('hash() !== since'))
return
}
}
buffer = buffer.slice(nextIdx + 1).trimStart()
count++
if (count++ === 0 && requestCount === 0) {
break
}
controller.enqueue(currentEvent)
remainingEvents--
break
}
// The response stream is finished (no more events to read)
if (buffer[nextIdx] === ']') {
if (currentEvent) {
since = GIMessage.deserializeHEAD(currentEvent).hash
}
state = 'read-eos'
} else {
controller.error(new Error('Missing end delimiter'))
return
}
} catch (e) {
console.error('[chelonia] Error during event parsing', e)
controller.error(e)
return
}
break
}
}
}
}
const latest = GIMessage.deserializeHEAD(aggregateEvents[aggregateEvents.length - 1]).hash
// If there are no remaining events, or if we've received the latest event,
// break out of the loop
if (
remainingEvents === 0 ||
latest === eventsResponse.headers.get('shelter-headinfo-head')
) break
since = latest
}
return aggregateEvents
})
},
'chelonia/out/latestHEADInfo': function (contractID: string) {
return fetch(`${this.config.connectionURL}/latestHEADinfo/${contractID}`, {
Expand Down
27 changes: 15 additions & 12 deletions shared/domains/chelonia/internals.js
Original file line number Diff line number Diff line change
Expand Up @@ -1125,26 +1125,29 @@ export default (sbp('sbp/selectors/register', {
if (latest !== recent) {
console.debug(`[chelonia] Synchronizing Contract ${contractID}: our recent was ${recent || 'undefined'} but the latest is ${latest}`)
// TODO: fetch events from localStorage instead of server if we have them
const events = await sbp('chelonia/out/eventsAfter', contractID, recent || contractID)
const eventsStream = sbp('chelonia/out/eventsAfter', contractID, recent || contractID)
// Sanity check: verify event with latest hash exists in list of events
// TODO: using findLastIndex, it will be more clean but it needs Cypress 9.7+ which has bad performance
// https://docs.cypress.io/guides/references/changelog#9-7-0
// https://github.com/cypress-io/cypress/issues/22868
let latestHashFound = false
for (let i = events.length - 1; i >= 0; i--) {
if (GIMessage.deserializeHEAD(events[i]).hash === latest) {
latestHashFound = true
// state.contracts[contractID] && events.shift()
const eventReader = eventsStream.getReader()
// remove the first element in cases where we are not getting the contract for the first time
for (let skip = !!state.contracts[contractID]; ; skip = false) {
const { done, value: event } = await eventReader.read()
if (done) {
if (!latestHashFound) {
throw new ChelErrorUnrecoverable(`expected hash ${latest} in list of events for contract ${contractID}`)
}
break
}
}
if (!latestHashFound) {
throw new ChelErrorUnrecoverable(`expected hash ${latest} in list of events for contract ${contractID}`)
}
// remove the first element in cases where we are not getting the contract for the first time
state.contracts[contractID] && events.shift()
for (let i = 0; i < events.length; i++) {
if (!latestHashFound) {
latestHashFound = GIMessage.deserializeHEAD(event).hash === latest
}
if (skip) continue
// this must be called directly, instead of via enqueueHandleEvent
await sbp('chelonia/private/in/handleEvent', contractID, events[i])
await sbp('chelonia/private/in/handleEvent', contractID, event)
}
} else if (!isSubcribed) {
this.subscriptionSet.add(contractID)
Expand Down

0 comments on commit 0dab835

Please sign in to comment.