From c95fe6597fd470690830ced24efb88ae83c7c8f3 Mon Sep 17 00:00:00 2001 From: Duddino Date: Tue, 19 Nov 2024 13:49:25 +0100 Subject: [PATCH] Add Reader class and refactor shield sync (#471) * Add Reader class and refactor shield sync * Comment --------- Co-authored-by: Alessandro Rezzi --- scripts/reader.js | 98 ++++++++++++++++++++++++++++ scripts/wallet.js | 106 +++++++++++++------------------ tests/unit/reader.spec.js | 130 ++++++++++++++++++++++++++++++++++++++ 3 files changed, 272 insertions(+), 62 deletions(-) create mode 100644 scripts/reader.js create mode 100644 tests/unit/reader.spec.js diff --git a/scripts/reader.js b/scripts/reader.js new file mode 100644 index 000000000..65623b49a --- /dev/null +++ b/scripts/reader.js @@ -0,0 +1,98 @@ +export class Reader { + #i = 0; + #maxBytes = 0; + #availableBytes; + #done = false; + /** + * @type{()=>{} | null} Called when bytes are available. + * There can't be more than 1 awaiter + */ + #awaiter = null; + + /** + * @returns {number} Content length if available, or an estimante + */ + get contentLength() { + return this.#availableBytes.length; + } + + /** + * @returns {number} Number or bytes read + */ + get readBytes() { + return this.#i; + } + /** + * @param + */ + constructor(req) { + this.#availableBytes = new Uint8Array( + req.headers?.get('Content-Length') || 1024 + ); + const stream = req.body.getReader(); + (async () => { + while (true) { + const { done, value } = await stream.read(); + if (value) { + this.#appendBytes(value); + } + if (done) { + this.#done = true; + break; + } + } + })(); + } + + #resizeArray(newLength) { + if (newLength <= this.#availableBytes.length) { + throw new Error( + 'New length must be greater than the current length.' + ); + } + + const newArray = new Uint8Array(newLength); + newArray.set(this.#availableBytes); + this.#availableBytes = newArray; + } + + #appendBytes(bytes) { + // If we have content-length, there should never be a need to + // resize + if (bytes.length + this.#maxBytes > this.#availableBytes.length) { + this.#resizeArray((bytes.length + this.#maxBytes) * 2); + } + + this.#availableBytes.set(bytes, this.#maxBytes); + this.#maxBytes += bytes.length; + // Notify the awaiter if there is one + if (this.#awaiter) this.#awaiter(); + } + + /** + * @param{number} byteLength + * @returns {Promise} bytes or null if there are no more bytes + */ + async read(byteLength) { + if (this.#awaiter) throw new Error('Called read more than once'); + while (true) { + if (this.#maxBytes - this.#i >= byteLength) { + this.#awaiter = null; + // We have enough bytes to respond + const res = this.#availableBytes.subarray( + this.#i, + this.#i + byteLength + ); + this.#i += byteLength; + return res; + } + + // There are no more bytes to await, so we can return null + if (this.#done) return null; + // If we didn't respond, wait for the next batch of bytes, then try again + await new Promise((res) => { + this.#awaiter = res; + }); + } + } +} diff --git a/scripts/wallet.js b/scripts/wallet.js index 1fd5ef567..aced92f58 100644 --- a/scripts/wallet.js +++ b/scripts/wallet.js @@ -1,4 +1,5 @@ import { validateMnemonic } from 'bip39'; +import { Reader } from './reader.js'; import { decrypt } from './aes-gcm.js'; import { bytesToNum, parseWIF } from './encoding.js'; import { beforeUnloadListener, blockCount } from './global.js'; @@ -787,100 +788,81 @@ export class Wallet { wallet.#shield.getLastSyncedBlock() + 1 ); if (!req.ok) throw new Error("Couldn't sync shield"); - const reader = req.body.getReader(); + const reader = new Reader(req); /** @type{string[]} Array of txs in the current block */ let txs = []; - let processedBytes = 0; - const length = req.headers.get('Content-Length'); + const length = reader.contentLength; /** @type {Uint8Array} Array of bytes that we are processing **/ - const processing = new Uint8Array(length); getEventEmitter().emit( 'shield-sync-status-update', 0, length, false ); - let i = 0; - let max = 0; - while (true) { - /** - * @type {{done: boolean, value: Uint8Array?}} - */ - const { done, value } = await reader.read(); - /** - * Array of blocks ready to pass to the shield library - * @type {{txs: string[]; height: number; time: number}[]} - */ - const blocksArray = []; - - if (value) { - // Append received bytes in the processing array - processing.set(value, max); - max += value.length; - processedBytes += value.length; - // Loop until we have less than 4 bytes (length) - while (max - i >= 4) { - const length = Number( - bytesToNum(processing.subarray(i, i + 4)) - ); - // If we have less bytes than the length break and wait for the next - // batch of bytes - if (max - i < length) break; - - i += 4; - const bytes = processing.subarray(i, length + i); - i += length; - // 0x5d rapresents the block - if (bytes[0] === 0x5d) { - const height = Number( - bytesToNum(bytes.slice(1, 5)) - ); - const time = Number(bytesToNum(bytes.slice(5, 9))); - - blocksArray.push({ txs, height, time }); - txs = []; - } else if (bytes[0] === 0x03) { - // 0x03 is the tx version. We should only get v3 transactions - const hex = bytesToHex(bytes); - txs.push({ - hex, - txid: Transaction.getTxidFromHex(hex), - }); - } else { - // This is neither a block or a tx. - throw new Error('Failed to parse shield binary'); - } - } - } + /** + * Array of blocks ready to pass to the shield library + * @type {{txs: string[]; height: number; time: number}[]} + */ + let blocksArray = []; + const handleAllBlocks = async () => { // Process the current batch of blocks before starting to parse the next one if (blocksArray.length) { await this.#shield.handleBlocks(blocksArray); } + blocksArray = []; // Emit status update getEventEmitter().emit( 'shield-sync-status-update', - processedBytes, + reader.readBytes, length, false ); - if (done) break; + }; + while (true) { + const packetLengthBytes = await reader.read(4); + if (!packetLengthBytes) break; + const packetLength = Number(bytesToNum(packetLengthBytes)); + + const bytes = await reader.read(packetLength); + if (!bytes) throw new Error('Stream was cut short'); + if (bytes[0] === 0x5d) { + const height = Number(bytesToNum(bytes.slice(1, 5))); + const time = Number(bytesToNum(bytes.slice(5, 9))); + + blocksArray.push({ txs, height, time }); + txs = []; + } else if (bytes[0] === 0x03) { + // 0x03 is the tx version. We should only get v3 transactions + const hex = bytesToHex(bytes); + txs.push({ + hex, + txid: Transaction.getTxidFromHex(hex), + }); + } else { + // This is neither a block or a tx. + throw new Error('Failed to parse shield binary'); + } + if (blocksArray.length > 1000) { + await handleAllBlocks(); + } } - - getEventEmitter().emit('shield-sync-status-update', 0, 0, true); + await handleAllBlocks(); + // At this point it should be safe to assume that shield is ready to use + await this.saveShieldOnDisk(); } catch (e) { debugError(DebugTopics.WALLET, e); } - // At this point it should be safe to assume that shield is ready to use - await this.saveShieldOnDisk(); const networkSaplingRoot = ( await getNetwork().getBlock(this.#shield.getLastSyncedBlock()) ).finalsaplingroot; if (networkSaplingRoot) await this.#checkShieldSaplingRoot(networkSaplingRoot); this.#isSynced = true; + + getEventEmitter().emit('shield-sync-status-update', 0, 0, true); } /** diff --git a/tests/unit/reader.spec.js b/tests/unit/reader.spec.js new file mode 100644 index 000000000..ffec131c3 --- /dev/null +++ b/tests/unit/reader.spec.js @@ -0,0 +1,130 @@ +import { describe, it, expect } from 'vitest'; +import { Reader } from '../../scripts/reader.js'; + +function createMockStream(chunks, contentLength = null) { + let i = 0; + return { + headers: { + get: (key) => (key === 'Content-Length' ? contentLength : null), + }, + body: { + getReader: () => { + return { + read: async () => { + if (i < chunks.length) { + const value = chunks[i]; + i++; + return { done: false, value }; + } + return { done: true, value: null }; + }, + }; + }, + }, + }; +} + +describe('Reader without content length', () => { + it('should read bytes correctly when available', async () => { + const mockStream = createMockStream([ + new Uint8Array([1, 2, 3, 4]), + new Uint8Array([5, 6, 7, 8]), + ]); + + const reader = new Reader(mockStream); + + const result1 = await reader.read(4); + expect(result1).toEqual(new Uint8Array([1, 2, 3, 4])); + + const result2 = await reader.read(4); + expect(result2).toEqual(new Uint8Array([5, 6, 7, 8])); + + // Reads after the stream is done should yield null + expect(await reader.read(10)).toBe(null); + }); + + it('should wait for more bytes if not enough are available', async () => { + const mockStream = createMockStream([ + new Uint8Array([1, 2, 3]), + new Uint8Array([4, 5, 6]), + ]); + + const reader = new Reader(mockStream); + + const result = await reader.read(6); + expect(result).toEqual(new Uint8Array([1, 2, 3, 4, 5, 6])); + // Reads after the stream is done should yield null + expect(await reader.read(1)).toBe(null); + }); + + it('should throw an error if read is called multiple times concurrently', async () => { + const mockStream = createMockStream([new Uint8Array([1, 2, 3])]); + + const reader = new Reader(mockStream); + + const read1 = reader.read(2); + const read2 = reader.read(2); + + await expect(read2).rejects.toThrow('Called read more than once'); + await expect(read1).resolves.toEqual(new Uint8Array([1, 2])); + }); + + it('should handle reading less than available bytes', async () => { + const mockStream = createMockStream([new Uint8Array([1, 2, 3, 4, 5])]); + + const reader = new Reader(mockStream); + + const result1 = await reader.read(3); + expect(result1).toEqual(new Uint8Array([1, 2, 3])); + + const result2 = await reader.read(2); + expect(result2).toEqual(new Uint8Array([4, 5])); + }); +}); + +describe('Reader with Content-Length', () => { + it('should initialize buffer size based on Content-Length header', async () => { + const mockStream = createMockStream([], 2048); + const reader = new Reader(mockStream); + + // Read some bytes to indirectly validate initialization + const readPromise = reader.read(0); // No bytes to read, but ensures no errors + await expect(readPromise).resolves.toEqual(new Uint8Array(0)); + }); + + it('should work if Content-Length is not set', async () => { + const mockStream = createMockStream([]); + const reader = new Reader(mockStream); + + // Read some bytes to validate no Content-Length doesn't break initialization + const readPromise = reader.read(0); + await expect(readPromise).resolves.toEqual(new Uint8Array(0)); + }); + + it('should handle reading bytes when Content-Length is specified', async () => { + const mockStream = createMockStream( + [new Uint8Array([1, 2, 3, 4])], + 2048 // Content-Length + ); + + const reader = new Reader(mockStream); + + const result = await reader.read(4); + expect(result).toEqual(new Uint8Array([1, 2, 3, 4])); + }); + + it('should resize the buffer if more bytes are received than Content-Length', async () => { + const mockStream = createMockStream( + [new Uint8Array([1, 2, 3, 4]), new Uint8Array([5, 6, 7, 8])], + 4 // Content-Length is smaller than total bytes received + ); + + const reader = new Reader(mockStream); + + const result1 = await reader.read(4); + expect(result1).toEqual(new Uint8Array([1, 2, 3, 4])); + + const result2 = await reader.read(4); + expect(result2).toEqual(new Uint8Array([5, 6, 7, 8])); + }); +});