diff --git a/lib/activity-state.js b/lib/activity-state.js
index d9e675e..57d2062 100644
--- a/lib/activity-state.js
+++ b/lib/activity-state.js
@@ -1,26 +1,26 @@
 /* global Zinnia */
 
-// Create activity events when we go online or offline
+// Create activity events when we become healthy or produce errors
 export class ActivityState {
-  #ok = null
+  #healthy = null
 
   onOutdatedClient () {
     this.onError('SPARK is outdated. Please upgrade Filecoin Station to the latest version.')
   }
 
   onError (msg) {
-    if (this.#ok === null || this.#ok) {
-      this.#ok = false
+    if (this.#healthy === null || this.#healthy) {
+      this.#healthy = false
       Zinnia.activity.error(msg ?? 'SPARK failed reporting retrieval')
     }
   }
 
-  onSuccess () {
-    if (this.#ok === null) {
-      this.#ok = true
+  onHealthy () {
+    if (this.#healthy === null) {
+      this.#healthy = true
       Zinnia.activity.info('SPARK started reporting retrievals')
-    } else if (!this.#ok) {
-      this.#ok = true
+    } else if (!this.#healthy) {
+      this.#healthy = true
       Zinnia.activity.info('SPARK retrieval reporting resumed')
     }
   }
diff --git a/lib/constants.js b/lib/constants.js
index ddb75b8..df723e7 100644
--- a/lib/constants.js
+++ b/lib/constants.js
@@ -1,2 +1,3 @@
 export const SPARK_VERSION = '1.4.0'
 export const DELAY_BETWEEN_RETRIEVALS = 10_000
+export const MAX_CAR_SIZE = 200 * 1024 * 1024 // 200 MB
diff --git a/lib/deno-encoding-hex.js b/lib/deno-encoding-hex.js
new file mode 100644
index 0000000..10aba7a
--- /dev/null
+++ b/lib/deno-encoding-hex.js
@@ -0,0 +1,93 @@
+// deno-fmt-ignore-file
+// deno-lint-ignore-file
+// This code was bundled using `deno bundle` and it's not recommended to edit it manually
+//
+// You can re-create this file by running the following command:
+// deno bundle "https://deno.land/std@0.203.0/encoding/hex.ts" > lib/deno-encoding-hex.js
+
+const encoder = new TextEncoder()
+function getTypeName (value) {
+  const type = typeof value
+  if (type !== 'object') {
+    return type
+  } else if (value === null) {
+    return 'null'
+  } else {
+    return value?.constructor?.name ?? 'object'
+  }
+}
+function validateBinaryLike (source) {
+  if (typeof source === 'string') {
+    return encoder.encode(source)
+  } else if (source instanceof Uint8Array) {
+    return source
+  } else if (source instanceof ArrayBuffer) {
+    return new Uint8Array(source)
+  }
+  throw new TypeError(`The input must be a Uint8Array, a string, or an ArrayBuffer. Received a value of the type ${getTypeName(source)}.`)
+}
+const hexTable = new TextEncoder().encode('0123456789abcdef')
+const textEncoder = new TextEncoder()
+const textDecoder = new TextDecoder()
+function errInvalidByte (__byte) {
+  return new TypeError(`Invalid byte '${String.fromCharCode(__byte)}'`)
+}
+function errLength () {
+  return new RangeError('Odd length hex string')
+}
+function fromHexChar (__byte) {
+  if (__byte >= 48 && __byte <= 57) return __byte - 48
+  if (__byte >= 97 && __byte <= 102) return __byte - 97 + 10
+  if (__byte >= 65 && __byte <= 70) return __byte - 65 + 10
+  throw errInvalidByte(__byte)
+}
+function encode (src) {
+  const dst = new Uint8Array(src.length * 2)
+  for (let i = 0; i < dst.length; i++) {
+    const v = src[i]
+    dst[i * 2] = hexTable[v >> 4]
+    dst[i * 2 + 1] = hexTable[v & 0x0f]
+  }
+  return dst
+}
+function encodeHex (src) {
+  const u8 = validateBinaryLike(src)
+  const dst = new Uint8Array(u8.length * 2)
+  for (let i = 0; i < dst.length; i++) {
+    const v = u8[i]
+    dst[i * 2] = hexTable[v >> 4]
+    dst[i * 2 + 1] = hexTable[v & 0x0f]
+  }
+  return textDecoder.decode(dst)
+}
+function decode (src) {
+  const dst = new Uint8Array(src.length / 2)
+  for (let i = 0; i < dst.length; i++) {
+    const a = fromHexChar(src[i * 2])
+    const b = fromHexChar(src[i * 2 + 1])
+    dst[i] = a << 4 | b
+  }
+  if (src.length % 2 === 1) {
+    fromHexChar(src[dst.length * 2])
+    throw errLength()
+  }
+  return dst
+}
+function decodeHex (src) {
+  const u8 = textEncoder.encode(src)
+  const dst = new Uint8Array(u8.length / 2)
+  for (let i = 0; i < dst.length; i++) {
+    const a = fromHexChar(u8[i * 2])
+    const b = fromHexChar(u8[i * 2 + 1])
+    dst[i] = a << 4 | b
+  }
+  if (u8.length % 2 === 1) {
+    fromHexChar(u8[dst.length * 2])
+    throw errLength()
+  }
+  return dst
+}
+export { encode }
+export { encodeHex }
+export { decode }
+export { decodeHex }
diff --git a/lib/spark.js b/lib/spark.js
index a9d9143..9435ad3 100644
--- a/lib/spark.js
+++ b/lib/spark.js
@@ -1,7 +1,8 @@
 /* global Zinnia */
 
 import { ActivityState } from './activity-state.js'
-import { SPARK_VERSION, DELAY_BETWEEN_RETRIEVALS } from './constants.js'
+import { SPARK_VERSION, DELAY_BETWEEN_RETRIEVALS, MAX_CAR_SIZE } from './constants.js'
+import { encodeHex } from './deno-encoding-hex.js'
 
 const sleep = dt => new Promise(resolve => setTimeout(resolve, dt))
 
@@ -46,6 +47,11 @@ export default class Spark {
       }, 60_000)
     }
 
+    // WebCrypto API does not support streams yet, the hashing function requires entire data
+    // to be provided at once. See https://github.com/w3c/webcrypto/issues/73
+    const carBuffer = new ArrayBuffer(0, { maxByteLength: MAX_CAR_SIZE })
+    const carBytes = new Uint8Array(carBuffer)
+
     try {
       resetTimeout()
       const res = await this.#fetch(url, { signal })
@@ -58,8 +64,30 @@
             stats.firstByteAt = new Date()
           }
           stats.byteLength += value.byteLength
+
+          // We want to limit how large content we are willing to download.
+          // 1. To make sure we don't spend too much time (and network bandwidth) on a single task,
+          //    so that we can complete more tasks per round
+          // 2. Until we have streaming hashes, we need to keep the entire payload in memory, and so
+          //    we need to put an upper limit on how much memory we consume.
+          if (stats.byteLength > MAX_CAR_SIZE) {
+            stats.carTooLarge = true
+            break
+          }
+
+          const offset = carBuffer.byteLength
+          carBuffer.resize(offset + value.byteLength)
+          carBytes.set(value, offset)
+
           resetTimeout()
         }
+
+        if (!stats.carTooLarge) {
+          const digest = await crypto.subtle.digest('sha-256', carBytes)
+          // 12 is the code for sha2-256
+          // 20 is the digest length (32 bytes = 256 bits)
+          stats.carChecksum = '1220' + encodeHex(digest)
+        }
       } else {
         console.error('Retrieval failed with status code %s: %s', res.status,
           await res.text())
@@ -98,13 +126,14 @@ export default class Spark {
   async nextRetrieval () {
     const { id: retrievalId, ...retrieval } = await this.getRetrieval()
 
-    let success = false
     const stats = {
       timeout: false,
       startAt: new Date(),
       firstByteAt: null,
       endAt: null,
+      carTooLarge: false,
       byteLength: 0,
+      carChecksum: null,
       statusCode: null
     }
     const searchParams = new URLSearchParams({
@@ -114,13 +143,12 @@
     const url = `ipfs://${retrieval.cid}?${searchParams.toString()}`
     try {
       await this.fetchCAR(url, stats)
-      success = true
     } catch (err) {
       console.error(`Failed to fetch ${url}`)
       console.error(err)
     }
 
-    const measurementId = await this.submitMeasurement(retrieval, { success, ...stats })
+    const measurementId = await this.submitMeasurement(retrieval, { ...stats })
     Zinnia.jobCompleted()
     return measurementId
   }
@@ -129,7 +157,7 @@
     while (true) {
       try {
         await this.nextRetrieval()
-        this.#activity.onSuccess()
+        this.#activity.onHealthy()
       } catch (err) {
         if (err.statusCode === 400 && err.serverMessage === 'OUTDATED CLIENT') {
           this.#activity.onOutdatedClient()
diff --git a/test/spark.js b/test/spark.js
index b767255..54fde92 100644
--- a/test/spark.js
+++ b/test/spark.js
@@ -3,7 +3,7 @@
 import Spark from '../lib/spark.js'
 import { test } from 'zinnia:test'
 import { assertInstanceOf, assertEquals, assertArrayIncludes } from 'zinnia:assert'
-import { SPARK_VERSION } from '../lib/constants.js'
+import { SPARK_VERSION, MAX_CAR_SIZE } from '../lib/constants.js'
 
 test('getRetrieval', async () => {
   const round = {
@@ -64,7 +64,9 @@ test('fetchCAR', async () => {
     startAt: new Date(),
     firstByteAt: null,
     endAt: null,
+    carTooLarge: false,
     byteLength: 0,
+    carChecksum: null,
     statusCode: null
   }
   await spark.fetchCAR(URL, stats)
@@ -72,11 +74,42 @@ test('fetchCAR', async () => {
   assertInstanceOf(stats.startAt, Date)
   assertInstanceOf(stats.firstByteAt, Date)
   assertInstanceOf(stats.endAt, Date)
+  assertEquals(stats.carTooLarge, false)
   assertEquals(stats.byteLength, 3)
+  assertEquals(stats.carChecksum, '1220039058c6f2c0cb492c533b0a4d14ef77cc0f78abccced5287d84a1a2011cfb81')
   assertEquals(stats.statusCode, 200)
   assertEquals(requests, [{ url: URL }])
 })
 
+test('fetchCAR exceeding MAX_CAR_SIZE', async () => {
+  const URL = 'url'
+  const fetch = async url => {
+    return {
+      status: 200,
+      ok: true,
+      body: (async function * () {
+        const data = new Uint8Array(MAX_CAR_SIZE + 1)
+        data.fill(11, 0, -1)
+        yield data
+      })()
+    }
+  }
+  const spark = new Spark({ fetch })
+  const stats = {
+    timeout: false,
+    carTooLarge: false,
+    byteLength: 0,
+    carChecksum: null,
+    statusCode: null
+  }
+  await spark.fetchCAR(URL, stats)
+  assertEquals(stats.timeout, false)
+  assertEquals(stats.carTooLarge, true)
+  assertEquals(stats.byteLength, MAX_CAR_SIZE + 1)
+  assertEquals(stats.carChecksum, null)
+  assertEquals(stats.statusCode, 200)
+})
+
 test('submitRetrieval', async () => {
   const requests = []
   const fetch = async (url, opts) => {
@@ -84,7 +117,7 @@ test('submitRetrieval', async () => {
     return { status: 200, ok: true, async json () { return { id: 123 } } }
   }
   const spark = new Spark({ fetch })
-  await spark.submitMeasurement({ cid: 'bafytest' }, { success: true })
+  await spark.submitMeasurement({ cid: 'bafytest' }, {})
   assertEquals(requests, [
     {
       url: 'https://spark.fly.dev/measurements',
@@ -94,7 +127,6 @@ test('submitRetrieval', async () => {
         sparkVersion: SPARK_VERSION,
         zinniaVersion: Zinnia.versions.zinnia,
         cid: 'bafytest',
-        success: true,
         participantAddress: Zinnia.walletAddress
       }),
       headers: { 'Content-Type': 'application/json' }