Skip to content

Commit

Permalink
Add Reader class and refactor shield sync (#471)
Browse files Browse the repository at this point in the history
* Add Reader class and refactor shield sync

* Comment

---------

Co-authored-by: Alessandro Rezzi <[email protected]>
  • Loading branch information
Duddino and panleone authored Nov 19, 2024
1 parent d60c69b commit c95fe65
Show file tree
Hide file tree
Showing 3 changed files with 272 additions and 62 deletions.
98 changes: 98 additions & 0 deletions scripts/reader.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,98 @@
export class Reader {
#i = 0;
#maxBytes = 0;
#availableBytes;
#done = false;
/**
* @type{()=>{} | null} Called when bytes are available.
* There can't be more than 1 awaiter
*/
#awaiter = null;

/**
* @returns {number} Content length if available, or an estimante
*/
get contentLength() {
return this.#availableBytes.length;
}

/**
* @returns {number} Number or bytes read
*/
get readBytes() {
return this.#i;
}
/**
* @param
*/
constructor(req) {
this.#availableBytes = new Uint8Array(
req.headers?.get('Content-Length') || 1024
);
const stream = req.body.getReader();
(async () => {
while (true) {
const { done, value } = await stream.read();
if (value) {
this.#appendBytes(value);
}
if (done) {
this.#done = true;
break;
}
}
})();
}

#resizeArray(newLength) {
if (newLength <= this.#availableBytes.length) {
throw new Error(
'New length must be greater than the current length.'
);
}

const newArray = new Uint8Array(newLength);
newArray.set(this.#availableBytes);
this.#availableBytes = newArray;
}

#appendBytes(bytes) {
// If we have content-length, there should never be a need to
// resize
if (bytes.length + this.#maxBytes > this.#availableBytes.length) {
this.#resizeArray((bytes.length + this.#maxBytes) * 2);
}

this.#availableBytes.set(bytes, this.#maxBytes);
this.#maxBytes += bytes.length;
// Notify the awaiter if there is one
if (this.#awaiter) this.#awaiter();
}

/**
* @param{number} byteLength
* @returns {Promise<Uint8Array | null>} bytes or null if there are no more bytes
*/
async read(byteLength) {
if (this.#awaiter) throw new Error('Called read more than once');
while (true) {
if (this.#maxBytes - this.#i >= byteLength) {
this.#awaiter = null;
// We have enough bytes to respond
const res = this.#availableBytes.subarray(
this.#i,
this.#i + byteLength
);
this.#i += byteLength;
return res;
}

// There are no more bytes to await, so we can return null
if (this.#done) return null;
// If we didn't respond, wait for the next batch of bytes, then try again
await new Promise((res) => {
this.#awaiter = res;
});
}
}
}
106 changes: 44 additions & 62 deletions scripts/wallet.js
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
import { validateMnemonic } from 'bip39';
import { Reader } from './reader.js';
import { decrypt } from './aes-gcm.js';
import { bytesToNum, parseWIF } from './encoding.js';
import { beforeUnloadListener, blockCount } from './global.js';
Expand Down Expand Up @@ -787,100 +788,81 @@ export class Wallet {
wallet.#shield.getLastSyncedBlock() + 1
);
if (!req.ok) throw new Error("Couldn't sync shield");
const reader = req.body.getReader();
const reader = new Reader(req);

/** @type{string[]} Array of txs in the current block */
let txs = [];
let processedBytes = 0;
const length = req.headers.get('Content-Length');
const length = reader.contentLength;
/** @type {Uint8Array} Array of bytes that we are processing **/
const processing = new Uint8Array(length);
getEventEmitter().emit(
'shield-sync-status-update',
0,
length,
false
);
let i = 0;
let max = 0;
while (true) {
/**
* @type {{done: boolean, value: Uint8Array?}}
*/
const { done, value } = await reader.read();
/**
* Array of blocks ready to pass to the shield library
* @type {{txs: string[]; height: number; time: number}[]}
*/
const blocksArray = [];

if (value) {
// Append received bytes in the processing array
processing.set(value, max);
max += value.length;
processedBytes += value.length;
// Loop until we have less than 4 bytes (length)
while (max - i >= 4) {
const length = Number(
bytesToNum(processing.subarray(i, i + 4))
);
// If we have less bytes than the length break and wait for the next
// batch of bytes
if (max - i < length) break;

i += 4;
const bytes = processing.subarray(i, length + i);
i += length;
// 0x5d rapresents the block
if (bytes[0] === 0x5d) {
const height = Number(
bytesToNum(bytes.slice(1, 5))
);
const time = Number(bytesToNum(bytes.slice(5, 9)));

blocksArray.push({ txs, height, time });
txs = [];
} else if (bytes[0] === 0x03) {
// 0x03 is the tx version. We should only get v3 transactions
const hex = bytesToHex(bytes);
txs.push({
hex,
txid: Transaction.getTxidFromHex(hex),
});
} else {
// This is neither a block or a tx.
throw new Error('Failed to parse shield binary');
}
}
}

/**
* Array of blocks ready to pass to the shield library
* @type {{txs: string[]; height: number; time: number}[]}
*/
let blocksArray = [];
const handleAllBlocks = async () => {
// Process the current batch of blocks before starting to parse the next one
if (blocksArray.length) {
await this.#shield.handleBlocks(blocksArray);
}
blocksArray = [];
// Emit status update
getEventEmitter().emit(
'shield-sync-status-update',
processedBytes,
reader.readBytes,
length,
false
);
if (done) break;
};
while (true) {
const packetLengthBytes = await reader.read(4);
if (!packetLengthBytes) break;
const packetLength = Number(bytesToNum(packetLengthBytes));

const bytes = await reader.read(packetLength);
if (!bytes) throw new Error('Stream was cut short');
if (bytes[0] === 0x5d) {
const height = Number(bytesToNum(bytes.slice(1, 5)));
const time = Number(bytesToNum(bytes.slice(5, 9)));

blocksArray.push({ txs, height, time });
txs = [];
} else if (bytes[0] === 0x03) {
// 0x03 is the tx version. We should only get v3 transactions
const hex = bytesToHex(bytes);
txs.push({
hex,
txid: Transaction.getTxidFromHex(hex),
});
} else {
// This is neither a block or a tx.
throw new Error('Failed to parse shield binary');
}
if (blocksArray.length > 1000) {
await handleAllBlocks();
}
}

getEventEmitter().emit('shield-sync-status-update', 0, 0, true);
await handleAllBlocks();
// At this point it should be safe to assume that shield is ready to use
await this.saveShieldOnDisk();
} catch (e) {
debugError(DebugTopics.WALLET, e);
}

// At this point it should be safe to assume that shield is ready to use
await this.saveShieldOnDisk();
const networkSaplingRoot = (
await getNetwork().getBlock(this.#shield.getLastSyncedBlock())
).finalsaplingroot;
if (networkSaplingRoot)
await this.#checkShieldSaplingRoot(networkSaplingRoot);
this.#isSynced = true;

getEventEmitter().emit('shield-sync-status-update', 0, 0, true);
}

/**
Expand Down
130 changes: 130 additions & 0 deletions tests/unit/reader.spec.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,130 @@
import { describe, it, expect } from 'vitest';
import { Reader } from '../../scripts/reader.js';

function createMockStream(chunks, contentLength = null) {
let i = 0;
return {
headers: {
get: (key) => (key === 'Content-Length' ? contentLength : null),
},
body: {
getReader: () => {
return {
read: async () => {
if (i < chunks.length) {
const value = chunks[i];
i++;
return { done: false, value };
}
return { done: true, value: null };
},
};
},
},
};
}

describe('Reader without content length', () => {
it('should read bytes correctly when available', async () => {
const mockStream = createMockStream([
new Uint8Array([1, 2, 3, 4]),
new Uint8Array([5, 6, 7, 8]),
]);

const reader = new Reader(mockStream);

const result1 = await reader.read(4);
expect(result1).toEqual(new Uint8Array([1, 2, 3, 4]));

const result2 = await reader.read(4);
expect(result2).toEqual(new Uint8Array([5, 6, 7, 8]));

// Reads after the stream is done should yield null
expect(await reader.read(10)).toBe(null);
});

it('should wait for more bytes if not enough are available', async () => {
const mockStream = createMockStream([
new Uint8Array([1, 2, 3]),
new Uint8Array([4, 5, 6]),
]);

const reader = new Reader(mockStream);

const result = await reader.read(6);
expect(result).toEqual(new Uint8Array([1, 2, 3, 4, 5, 6]));
// Reads after the stream is done should yield null
expect(await reader.read(1)).toBe(null);
});

it('should throw an error if read is called multiple times concurrently', async () => {
const mockStream = createMockStream([new Uint8Array([1, 2, 3])]);

const reader = new Reader(mockStream);

const read1 = reader.read(2);
const read2 = reader.read(2);

await expect(read2).rejects.toThrow('Called read more than once');
await expect(read1).resolves.toEqual(new Uint8Array([1, 2]));
});

it('should handle reading less than available bytes', async () => {
const mockStream = createMockStream([new Uint8Array([1, 2, 3, 4, 5])]);

const reader = new Reader(mockStream);

const result1 = await reader.read(3);
expect(result1).toEqual(new Uint8Array([1, 2, 3]));

const result2 = await reader.read(2);
expect(result2).toEqual(new Uint8Array([4, 5]));
});
});

describe('Reader with Content-Length', () => {
it('should initialize buffer size based on Content-Length header', async () => {
const mockStream = createMockStream([], 2048);
const reader = new Reader(mockStream);

// Read some bytes to indirectly validate initialization
const readPromise = reader.read(0); // No bytes to read, but ensures no errors
await expect(readPromise).resolves.toEqual(new Uint8Array(0));
});

it('should work if Content-Length is not set', async () => {
const mockStream = createMockStream([]);
const reader = new Reader(mockStream);

// Read some bytes to validate no Content-Length doesn't break initialization
const readPromise = reader.read(0);
await expect(readPromise).resolves.toEqual(new Uint8Array(0));
});

it('should handle reading bytes when Content-Length is specified', async () => {
const mockStream = createMockStream(
[new Uint8Array([1, 2, 3, 4])],
2048 // Content-Length
);

const reader = new Reader(mockStream);

const result = await reader.read(4);
expect(result).toEqual(new Uint8Array([1, 2, 3, 4]));
});

it('should resize the buffer if more bytes are received than Content-Length', async () => {
const mockStream = createMockStream(
[new Uint8Array([1, 2, 3, 4]), new Uint8Array([5, 6, 7, 8])],
4 // Content-Length is smaller than total bytes received
);

const reader = new Reader(mockStream);

const result1 = await reader.read(4);
expect(result1).toEqual(new Uint8Array([1, 2, 3, 4]));

const result2 = await reader.read(4);
expect(result2).toEqual(new Uint8Array([5, 6, 7, 8]));
});
});

0 comments on commit c95fe65

Please sign in to comment.