From 0dab8350b69a706f4d2ce8fbbc85688f910f2982 Mon Sep 17 00:00:00 2001
From: =?UTF-8?q?Ricardo=20Iv=C3=A1n=20Vieitez=20Parra?=
 <3857362+corrideat@users.noreply.github.com>
Date: Wed, 6 Mar 2024 12:08:07 +0100
Subject: [PATCH] Convert eventsAfter to stream (missing: change usage in
 ChatMain)
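
Instead of resolving to an array with every event, the
'chelonia/out/eventsAfter' selector now returns a ReadableStream that
emits events one at a time and issues follow-up requests to the server
until the latest event (or the requested limit) has been reached.

Consumers read from the stream with a reader. A minimal sketch of the
intended usage, mirroring the new code in internals.js:

    const eventsStream = sbp('chelonia/out/eventsAfter', contractID, since)
    const eventReader = eventsStream.getReader()
    for (;;) {
      const { done, value: event } = await eventReader.read()
      if (done) break
      // `event` is a serialized GIMessage string, in the same format as
      // the array elements returned previously
      await sbp('chelonia/private/in/handleEvent', contractID, event)
    }

ChatMain still expects an array and is left for a follow-up change.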

---
 backend/database.js                  |   4 +-
 shared/domains/chelonia/chelonia.js  | 151 ++++++++++++++++++++-------
 shared/domains/chelonia/internals.js |  27 ++---
 3 files changed, 130 insertions(+), 52 deletions(-)

diff --git a/backend/database.js b/backend/database.js
index 5a237e92d3..69789ce4f0 100644
--- a/backend/database.js
+++ b/backend/database.js
@@ -42,7 +42,7 @@ if (!fs.existsSync(dataFolder)) {
 // Streams stored contract log entries since the given entry hash (inclusive!).
 sbp('sbp/selectors/register', {
   'backend/db/streamEntriesAfter': async function (contractID: string, hash: string, requestedLimit: number = MAX_EVENTS_AFTER): Promise<*> {
-    const limit = Math.min(requestedLimit, process.env.MAX_EVENTS_BATCH_SIZE ?? Number.POSITIVE_INFINITY)
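+    // Cap the number of events returned in a single response (default 500)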
+    const limit = Math.min(requestedLimit, process.env.MAX_EVENTS_BATCH_SIZE ?? 500)
     const latestHEADinfo = await sbp('chelonia/db/latestHEADinfo', contractID)
     if (!latestHEADinfo) {
       throw Boom.notFound(`contractID ${contractID} doesn't exist!`)
@@ -85,7 +85,7 @@ sbp('sbp/selectors/register', {
     return stream
   },
   'backend/db/streamEntriesBefore': async function (before: string, requestedLimit: number): Promise<*> {
-    let limit = Math.min(requestedLimit, process.env.MAX_EVENTS_BATCH_SIZE ?? Number.POSITIVE_INFINITY)
+    let limit = Math.min(requestedLimit, process.env.MAX_EVENTS_BATCH_SIZE ?? 500)
     let prefix = '['
     let currentHEAD = before
     let entry = await sbp('chelonia/db/getEntry', currentHEAD)
diff --git a/shared/domains/chelonia/chelonia.js b/shared/domains/chelonia/chelonia.js
index d729c2f56a..7a1868d201 100644
--- a/shared/domains/chelonia/chelonia.js
+++ b/shared/domains/chelonia/chelonia.js
@@ -820,47 +820,122 @@ export default (sbp('sbp/selectors/register', {
   // TODO: r.body is a stream.Transform, should we use a callback to process
   //       the events one-by-one instead of converting to giant json object?
   //       however, note if we do that they would be processed in reverse...
-  'chelonia/out/eventsAfter': async function (contractID: string, since: string, limit?: number) {
-    const aggregateEvents = []
-    let remainingEvents = limit ?? Number.POSITIVE_INFINITY
-    for (;;) {
-      const requestLimit = Math.min(limit ?? MAX_EVENTS_AFTER, remainingEvents)
+  'chelonia/out/eventsAfter': function (contractID: string, since: string, limit?: number) {
+    const fetchEventsStreamReader = async () => {
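+      // Ask for at most `remainingEvents` events; when no limit is given,
+      // request up to MAX_EVENTS_AFTER events at a time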
+      requestLimit = Math.min(limit ?? MAX_EVENTS_AFTER, remainingEvents)
       const eventsResponse = await fetch(`${this.config.connectionURL}/eventsAfter/${contractID}/${since}/${requestLimit}`, { signal: this.abortController.signal })
-      const events = await handleFetchResult('json')(eventsResponse)
-      // Sanity check
-      if (!Array.isArray(events)) throw new Error('Invalid response type')
-      if (events.length > requestLimit) {
-        throw new Error('Received too many events')
-      }
-      if (GIMessage.deserializeHEAD(b64ToStr(events[0])).hash !== since) {
-        throw new Error('hash() !== since')
-      }
-      // Avoid duplicating the intermediate event in the result.
-      if (aggregateEvents.length) {
-        events.unshift()
-      }
-      remainingEvents -= events.length
-      // Because the number of events could potentially be quite large and hit
-      // stack limits (or limits to how many arguments can be passed), if the
-      // number of events is larger than 8192, then split it into smaller
-      // chunks.
-      if (events.length <= 8192) {
-        aggregateEvents.push(...events.map(b64ToStr))
-      } else {
-        while (events.length) {
-          aggregateEvents.push(...events.splice(0, 8192).map(b64ToStr))
+      if (!eventsResponse.ok) throw new Error('Unexpected status code')
+      if (!eventsResponse.body) throw new Error('Missing body')
+      latestHEAD = eventsResponse.headers.get('shelter-headinfo-head')
+      requestCount++
+      // $FlowFixMe[incompatible-use]
+      return eventsResponse.body.getReader()
+    }
+    let requestCount = 0
+    let remainingEvents = limit ?? Number.POSITIVE_INFINITY
+    let eventsStreamReader
+    let latestHEAD
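+    // Parser state machine for the streamed JSON response:
+    //   'fetch': request the next batch of events from the server
+    //   'read-new-response': read the first chunk(s) of a response and
+    //     consume the leading `[`
+    //   'read': read more chunks while the current event is incomplete
+    //   'read-eos': the closing `]` has been parsed; expect the response to
+    //     end (and fetch a new batch if more events are needed)
+    //   'events': extract complete events from `buffer` and enqueue them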
+    let state: 'fetch' | 'read-eos' | 'read-new-response' | 'read' | 'events' = 'fetch'
+    let requestLimit: number
+    let count: number
+    let buffer: string = ''
+    let currentEvent: string
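+    // Events are emitted one at a time, in the same format as the array
+    // elements previously returned (serialized GIMessage strings)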
+    return new ReadableStream({
+      async pull (controller) {
+        for (;;) {
+          switch (state) {
+            case 'fetch': {
+              eventsStreamReader = await fetchEventsStreamReader()
+              state = 'read-new-response'
+              count = 0
+              break
+            }
+            case 'read-eos':
+            case 'read-new-response':
+            case 'read': {
+              const { done, value } = await eventsStreamReader.read()
+              if (done) {
+                // A response may only end after the closing `]` was processed
+                // ('read-eos'); ending anywhere else means it was truncated
+                if (state !== 'read-eos') {
+                  controller.error(new Error('Invalid response: done too early'))
+                  return
+                }
+                if (remainingEvents === 0 || since === latestHEAD) {
+                  controller.close()
+                  return
+                }
+                // There are still events to fetch: request the next batch,
+                // starting at the last event received (`since`)
+                state = 'fetch'
+                break
+              }
+              if (!value) {
+                controller.error(new Error('Invalid response: missing body'))
+                return
+              }
+              buffer = buffer + Buffer.from(value).toString().trim()
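+              // The response body is JSON containing base64 strings (ASCII),
+              // so each chunk can safely be decoded and trimmed on its own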
+              // If there was only whitespace, try reading again
+              if (!buffer) break
+              if (state === 'read-new-response') {
+                // Response is in JSON format, so we look for the start of an
+                // array (`[`)
+                if (buffer[0] !== '[') {
+                  console.error('[chelonia] Invalid response: no array start delimiter', buffer)
+                  controller.error(new Error('Invalid response: no array start delimiter'))
+                  return
+                }
+                buffer = buffer.slice(1)
+              }
+              state = 'events'
+              break
+            }
+            case 'events': {
+              // Find where the current event ends
+              const nextIdx = buffer.search(/(?<=\s*)[,\]]/)
+              // The current event isn't finished yet, so we try to read
+              // more data from the server
+              if (nextIdx < 0) {
+                state = 'read'
+                break
+              }
+              try {
+                const eventValue = buffer.slice(0, nextIdx).trim()
+                if (eventValue) {
+                  if (count === requestLimit) {
+                    controller.error(new Error('Received too many events'))
+                    return
+                  }
+                  currentEvent = b64ToStr(JSON.parse(eventValue))
+                  // The first event of each response must be the event we
+                  // requested (`since`)
+                  if (count === 0) {
+                    const hash = GIMessage.deserializeHEAD(currentEvent).hash
+                    if (hash !== since) {
+                      controller.error(new Error('hash() !== since'))
+                      return
+                    }
+                  }
+                  count++
+                  // The first event of every response after the first one
+                  // duplicates the last event of the previous response
+                  // (requests start at `since`, inclusive), so don't emit it
+                  // again
+                  if (count !== 1 || requestCount === 1) {
+                    controller.enqueue(currentEvent)
+                    remainingEvents--
+                  }
+                } else if (buffer[nextIdx] !== ']') {
+                  // A separator with no event before it is malformed
+                  controller.error(new Error('Missing end delimiter'))
+                  return
+                }
+                // The closing `]` marks the end of the current response
+                if (buffer[nextIdx] === ']') {
+                  if (currentEvent) {
+                    since = GIMessage.deserializeHEAD(currentEvent).hash
+                  }
+                  state = 'read-eos'
+                }
+                buffer = buffer.slice(nextIdx + 1).trimStart()
+              } catch (e) {
+                console.error('[chelonia] Error during event parsing', e)
+                controller.error(e)
+                return
+              }
+              break
+            }
+          }
         }
       }
-      const latest = GIMessage.deserializeHEAD(aggregateEvents[aggregateEvents.length - 1]).hash
-      // If there are no remaining events, or if we've received the latest event,
-      // break out of the loop
-      if (
-        remainingEvents === 0 ||
-        latest === eventsResponse.headers.get('shelter-headinfo-head')
-      ) break
-      since = latest
-    }
-    return aggregateEvents
+    })
   },
   'chelonia/out/latestHEADInfo': function (contractID: string) {
     return fetch(`${this.config.connectionURL}/latestHEADinfo/${contractID}`, {
diff --git a/shared/domains/chelonia/internals.js b/shared/domains/chelonia/internals.js
index 48fa6953bc..47c137bbea 100644
--- a/shared/domains/chelonia/internals.js
+++ b/shared/domains/chelonia/internals.js
@@ -1125,26 +1125,29 @@ export default (sbp('sbp/selectors/register', {
       if (latest !== recent) {
         console.debug(`[chelonia] Synchronizing Contract ${contractID}: our recent was ${recent || 'undefined'} but the latest is ${latest}`)
         // TODO: fetch events from localStorage instead of server if we have them
-        const events = await sbp('chelonia/out/eventsAfter', contractID, recent || contractID)
+        const eventsStream = sbp('chelonia/out/eventsAfter', contractID, recent || contractID)
         // Sanity check: verify event with latest hash exists in list of events
         // TODO: using findLastIndex, it will be more clean but it needs Cypress 9.7+ which has bad performance
         //       https://docs.cypress.io/guides/references/changelog#9-7-0
         //       https://github.com/cypress-io/cypress/issues/22868
         let latestHashFound = false
-        for (let i = events.length - 1; i >= 0; i--) {
-          if (GIMessage.deserializeHEAD(events[i]).hash === latest) {
-            latestHashFound = true
+        const eventReader = eventsStream.getReader()
+        // Skip the first event when the contract is already present in our
+        // state, since in that case it has already been processed
+        for (let skip = !!state.contracts[contractID]; ; skip = false) {
+          const { done, value: event } = await eventReader.read()
+          if (done) {
+            if (!latestHashFound) {
+              throw new ChelErrorUnrecoverable(`expected hash ${latest} in list of events for contract ${contractID}`)
+            }
             break
           }
-        }
-        if (!latestHashFound) {
-          throw new ChelErrorUnrecoverable(`expected hash ${latest} in list of events for contract ${contractID}`)
-        }
-        // remove the first element in cases where we are not getting the contract for the first time
-        state.contracts[contractID] && events.shift()
-        for (let i = 0; i < events.length; i++) {
+          if (!latestHashFound) {
+            latestHashFound = GIMessage.deserializeHEAD(event).hash === latest
+          }
+          if (skip) continue
           // this must be called directly, instead of via enqueueHandleEvent
-          await sbp('chelonia/private/in/handleEvent', contractID, events[i])
+          await sbp('chelonia/private/in/handleEvent', contractID, event)
         }
       } else if (!isSubcribed) {
         this.subscriptionSet.add(contractID)