Skip to content

Commit

Permalink
feat: use worker thread for piece hashing (#198)
Browse files Browse the repository at this point in the history
Worker threads in Node.js can share memory, so the CAR bytes do not have to be
copied when they are handed to the worker. This allows WASM piece hashing to run
in a worker without stalling the main thread.

#### Before

Notice how the spinner is stalled for multiple seconds as piece hashing
is done:


https://github.com/storacha-network/w3cli/assets/152863/5d88f8c5-e7ae-4da3-96ce-a686fb3f2258

#### After

No stall!


https://github.com/storacha-network/w3cli/assets/152863/747a412e-1c27-456b-913f-a4ec6e29498a

Co-authored-by: Alan Shaw <[email protected]>
  • Loading branch information
alanshaw and Alan Shaw authored Nov 15, 2024
1 parent 3f59da7 commit dfafbd3
Show file tree
Hide file tree
Showing 3 changed files with 45 additions and 21 deletions.
18 changes: 2 additions & 16 deletions index.js
Original file line number Diff line number Diff line change
Expand Up @@ -5,12 +5,10 @@ import ora from 'ora'
import { CID } from 'multiformats/cid'
import { base64 } from 'multiformats/bases/base64'
import { identity } from 'multiformats/hashes/identity'
import * as Digest from 'multiformats/hashes/digest'
import * as DID from '@ipld/dag-ucan/did'
import * as dagJSON from '@ipld/dag-json'
import { CarWriter } from '@ipld/car'
import { filesFromPaths } from 'files-from-path'
import * as PieceHasher from 'fr32-sha2-256-trunc254-padded-binary-tree-multihash'
import * as Account from './account.js'

import { spaceAccess } from '@web3-storage/w3up-client/capability/access'
Expand All @@ -26,6 +24,7 @@ import {
readProofFromBytes,
uploadListResponseToString,
startOfLastMonth,
pieceHasher,
} from './lib.js'
import * as ucanto from '@ucanto/core'
import { ed25519 } from '@ucanto/principal'
Expand Down Expand Up @@ -160,20 +159,7 @@ export async function upload(firstPath, opts) {
: `Storing ${Math.min(Math.round((totalSent / totalSize) * 100), 100)}%`

const root = await uploadFn({
pieceHasher: {
code: PieceHasher.code,
name: 'fr32-sha2-256-trunc254-padded-binary-tree-multihash',
async digest (input) {
const hasher = PieceHasher.create()
hasher.write(input)

const bytes = new Uint8Array(hasher.multihashByteLength())
hasher.digestInto(bytes, 0, true)
hasher.free()

return Digest.decode(bytes)
}
},
pieceHasher,
onShardStored: ({ cid, size, piece }) => {
totalSent += size
if (opts?.verbose) {
Expand Down
37 changes: 32 additions & 5 deletions lib.js
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
import fs from 'fs'
import path from 'path'
import fs from 'node:fs'
import path from 'node:path'
import { Worker } from 'node:worker_threads'
import { fileURLToPath } from 'node:url'
// @ts-expect-error no typings :(
import tree from 'pretty-tree'
import { importDAG } from '@ucanto/core/delegation'
Expand Down Expand Up @@ -28,9 +30,9 @@ import chalk from 'chalk'
* @typedef {import('@web3-storage/capabilities/types').FilecoinInfoSuccess} FilecoinInfoSuccess
*/

/**
*
*/
// Recreate the CommonJS __filename/__dirname globals, which are not defined
// in ES modules; __dirname is needed to locate the piece hasher worker script.
const __filename = fileURLToPath(import.meta.url)
const __dirname = path.dirname(__filename)

export function getPkg() {
// @ts-ignore JSON.parse works with Buffer in Node.js
return JSON.parse(fs.readFileSync(new URL('./package.json', import.meta.url)))
Expand Down Expand Up @@ -347,3 +349,28 @@ export const streamToBlob = async source => {
}))
return new Blob(chunks)
}

const workerPath = path.join(__dirname, 'piece-hasher-worker.js')

/** @see https://github.com/multiformats/multicodec/pull/331/files */
const pieceHasherCode = 0x1011

/**
 * Spawn the piece hashing worker with the passed bytes and resolve with the
 * raw multihash bytes it posts back. Rejects if the worker errors or exits
 * with a non-zero code before posting a result.
 *
 * @param {Uint8Array} input
 * @returns {Promise<Uint8Array>}
 */
const hashInWorker = (input) =>
  new Promise((resolve, reject) => {
    const worker = new Worker(workerPath, { workerData: input })
    worker.once('message', resolve)
    worker.once('error', reject)
    worker.once('exit', (exitCode) => {
      if (exitCode !== 0) {
        reject(new Error(`Piece hasher worker exited with code: ${exitCode}`))
      }
    })
  })

/** @type {import('multiformats').MultihashHasher<typeof pieceHasherCode>} */
export const pieceHasher = {
  code: pieceHasherCode,
  name: 'fr32-sha2-256-trunc254-padded-binary-tree',
  async digest (input) {
    const bytes = await hashInWorker(input)
    const digest =
      /** @type {import('multiformats').MultihashDigest<typeof pieceHasherCode>} */
      (Digest.decode(bytes))
    return digest
  }
}
11 changes: 11 additions & 0 deletions piece-hasher-worker.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
import { parentPort, workerData } from 'node:worker_threads'
import * as PieceHasher from 'fr32-sha2-256-trunc254-padded-binary-tree-multihash'

const hasher = PieceHasher.create()
hasher.write(workerData)

const bytes = new Uint8Array(hasher.multihashByteLength())
hasher.digestInto(bytes, 0, true)
hasher.free()

parentPort?.postMessage(bytes)

0 comments on commit dfafbd3

Please sign in to comment.