From dfafbd3e65cb74b64a62b2f63129a927ddcd79f4 Mon Sep 17 00:00:00 2001 From: ash Date: Fri, 15 Nov 2024 23:05:48 +0700 Subject: [PATCH] feat: use worker thread for piece hashing (#198) Worker threads actually share memory in Node.js so there's no copy getting the CAR bytes into the worker. This allows WASM piece hashing in a worker that doesn't stall the main thread. #### Before Notice how the spinner is stalled for multiple seconds as piece hashing is done: https://github.com/storacha-network/w3cli/assets/152863/5d88f8c5-e7ae-4da3-96ce-a686fb3f2258 #### After No stall! https://github.com/storacha-network/w3cli/assets/152863/747a412e-1c27-456b-913f-a4ec6e29498a Co-authored-by: Alan Shaw --- index.js | 18 ++---------------- lib.js | 37 ++++++++++++++++++++++++++++++++----- piece-hasher-worker.js | 11 +++++++++++ 3 files changed, 45 insertions(+), 21 deletions(-) create mode 100644 piece-hasher-worker.js diff --git a/index.js b/index.js index 5dbdeac..1080cd0 100644 --- a/index.js +++ b/index.js @@ -5,12 +5,10 @@ import ora from 'ora' import { CID } from 'multiformats/cid' import { base64 } from 'multiformats/bases/base64' import { identity } from 'multiformats/hashes/identity' -import * as Digest from 'multiformats/hashes/digest' import * as DID from '@ipld/dag-ucan/did' import * as dagJSON from '@ipld/dag-json' import { CarWriter } from '@ipld/car' import { filesFromPaths } from 'files-from-path' -import * as PieceHasher from 'fr32-sha2-256-trunc254-padded-binary-tree-multihash' import * as Account from './account.js' import { spaceAccess } from '@web3-storage/w3up-client/capability/access' @@ -26,6 +24,7 @@ import { readProofFromBytes, uploadListResponseToString, startOfLastMonth, + pieceHasher, } from './lib.js' import * as ucanto from '@ucanto/core' import { ed25519 } from '@ucanto/principal' @@ -160,20 +159,7 @@ export async function upload(firstPath, opts) { : `Storing ${Math.min(Math.round((totalSent / totalSize) * 100), 100)}%` const root = await uploadFn({ - pieceHasher: { - code: PieceHasher.code, - name: 'fr32-sha2-256-trunc254-padded-binary-tree-multihash', - async digest (input) { - const hasher = PieceHasher.create() - hasher.write(input) - - const bytes = new Uint8Array(hasher.multihashByteLength()) - hasher.digestInto(bytes, 0, true) - hasher.free() - - return Digest.decode(bytes) - } - }, + pieceHasher, onShardStored: ({ cid, size, piece }) => { totalSent += size if (opts?.verbose) { diff --git a/lib.js b/lib.js index 51388b3..e5f5dab 100644 --- a/lib.js +++ b/lib.js @@ -1,5 +1,7 @@ -import fs from 'fs' -import path from 'path' +import fs from 'node:fs' +import path from 'node:path' +import { Worker } from 'node:worker_threads' +import { fileURLToPath } from 'node:url' // @ts-expect-error no typings :( import tree from 'pretty-tree' import { importDAG } from '@ucanto/core/delegation' @@ -28,9 +30,9 @@ import chalk from 'chalk' * @typedef {import('@web3-storage/capabilities/types').FilecoinInfoSuccess} FilecoinInfoSuccess */ -/** - * - */ +const __filename = fileURLToPath(import.meta.url) +const __dirname = path.dirname(__filename) + export function getPkg() { // @ts-ignore JSON.parse works with Buffer in Node.js return JSON.parse(fs.readFileSync(new URL('./package.json', import.meta.url))) @@ -347,3 +349,28 @@ export const streamToBlob = async source => { })) return new Blob(chunks) } + +const workerPath = path.join(__dirname, 'piece-hasher-worker.js') + +/** @see https://github.com/multiformats/multicodec/pull/331/files */ +const pieceHasherCode = 0x1011 + +/** @type {import('multiformats').MultihashHasher} */ +export const pieceHasher = { + code: pieceHasherCode, + name: 'fr32-sha2-256-trunc254-padded-binary-tree', + async digest (input) { + const bytes = await new Promise((resolve, reject) => { + const worker = new Worker(workerPath, { workerData: input }) + worker.on('message', resolve) + worker.on('error', reject) + worker.on('exit', (code) => { + if (code !== 0) reject(new Error(`Piece hasher worker exited with code: ${code}`)) + }) + }) + const digest = + /** @type {import('multiformats').MultihashDigest} */ + (Digest.decode(bytes)) + return digest + } +} diff --git a/piece-hasher-worker.js b/piece-hasher-worker.js new file mode 100644 index 0000000..88f72fe --- /dev/null +++ b/piece-hasher-worker.js @@ -0,0 +1,11 @@ +import { parentPort, workerData } from 'node:worker_threads' +import * as PieceHasher from 'fr32-sha2-256-trunc254-padded-binary-tree-multihash' + +const hasher = PieceHasher.create() +hasher.write(workerData) + +const bytes = new Uint8Array(hasher.multihashByteLength()) +hasher.digestInto(bytes, 0, true) +hasher.free() + +parentPort?.postMessage(bytes)