Skip to content

Commit

Permalink
feat: Support for multi-threaded hash calculation
Browse files Browse the repository at this point in the history
  • Loading branch information
devinxl committed Dec 1, 2023
1 parent 9a08e4a commit 085c6a3
Show file tree
Hide file tree
Showing 12 changed files with 613 additions and 56 deletions.
3 changes: 3 additions & 0 deletions examples/nextjs/src/components/object/index.tsx
Original file line number Diff line number Diff line change
@@ -1,8 +1,11 @@
import { CreateObject } from './create';
import { DeleteObject } from './del';
import { ObjectInfo } from './info';
import { getChecksumApiWorker } from '@bnb-chain/greenfiled-file-handle';

export const ObjectComponent = () => {
console.log('getChecksumApi', getChecksumApiWorker, getChecksumApiWorker());

return (
<>
<h2>Object</h2>
Expand Down
6 changes: 6 additions & 0 deletions packages/file-handle/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -61,5 +61,11 @@
"typescript": "^5.1.6",
"webpack": "^5.88.1",
"webpack-cli": "^5.1.4"
},
"dependencies": {
"buffer": "^6.0.3",
"comlink": "^4.4.1",
"hash-wasm": "^4.11.0",
"lodash-es": "^4.17.21"
}
}
14 changes: 13 additions & 1 deletion packages/file-handle/src/browser/index.js
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
import { ensureServiceIsRunning, initialize, instantiateWASM } from './init';
import { DEFAULT_DATA_BLOCKS, DEFAULT_PARITY_BLOCKS, DEFAULT_SEGMENT_SIZE } from '../constants';
import * as Comlink from 'comlink';

// 1. modify method of `exports` and `globalThis` export.
export const startRunningService = async (wasmURL) => {
Expand All @@ -10,7 +11,7 @@ export const startRunningService = async (wasmURL) => {
// const { add } = exports;

// `globalThis` is a map to complex way of `syscall/js` way.
const { getCheckSums } = globalThis;
const { getCheckSums } = globalThis.greenfieldSdk;

return {
getCheckSums,
Expand All @@ -26,3 +27,14 @@ export const getCheckSums = async (
await initialize();
return ensureServiceIsRunning().getCheckSums(bytes, segmentSize, dataBlocks, parityBlocks);
};

// Comlink-wrapped checksum worker API.
// Cached module-wide so repeated calls share one worker thread (the stated
// contract: "please keep singleton"). The previous version spawned a brand-new
// Worker — and therefore a new WASM instance — on every call.
let _checksumApi;

export const getChecksumApiWorker = () => {
  if (!_checksumApi) {
    const worker = new Worker(
      /* webpackChunkName: "workers/checksumWorker-worker" */ new URL(
        './worker/checksumWorker.js',
        import.meta.url,
      ),
    );
    _checksumApi = Comlink.wrap(worker);
  }
  return _checksumApi;
};
15 changes: 15 additions & 0 deletions packages/file-handle/src/browser/worker/calcPrimaryWorker.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
import { sha256 } from 'hash-wasm';

// SHA-256 one chunk's bytes and tag the hex digest with its chunk id,
// so the main thread can slot it back into position regardless of reply order.
const encodePrimary = async (chunkId, buffer) => {
  const digestHex = await sha256(new Uint8Array(buffer));
  return [chunkId, digestHex];
};

// Worker entry point: hash the posted chunk and echo the result with its
// taskId so stale replies can be filtered out by the consumer.
onmessage = async ({ data }) => {
  const { chunkId, buffer, taskId } = data;
  const result = await encodePrimary(chunkId, buffer);
  postMessage({ result, taskId });
};
54 changes: 54 additions & 0 deletions packages/file-handle/src/browser/worker/calcSecondWorker.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,54 @@
import { sha256 } from 'hash-wasm';
import { decodeBase64 } from '../../utils';
import Go from '../wasm_exec.js';

// One-time bootstrap of the Go WASM runtime inside this worker.
// BUG FIX: this file runs in a Worker scope, where `window` does not exist and
// throws a ReferenceError — use `globalThis`, which resolves in both contexts.
// NOTE(review): assumes __PUBLIC_FILE_HANDLE_WASM_PATH__ is visible in the
// worker's global scope (e.g. injected at build time) — confirm with the
// bundler config; workers do not share globals with the page.
const init = async () => {
  const go = new Go();
  const wasmPath = globalThis.__PUBLIC_FILE_HANDLE_WASM_PATH__;
  const result = await WebAssembly.instantiateStreaming(fetch(wasmPath), go.importObject);
  if (result) {
    go.run(result.instance);
    // Warm up hash-wasm immediately. If the user starts a large upload right
    // after page load, lazy initialization of hash-wasm can fail in Chrome
    // under memory pressure.
    await sha256('');
  }
};

// Kick off initialization eagerly; handle (rather than leak) a rejection so it
// does not surface as an unhandled promise rejection in the worker.
init().catch((e) => {
  // eslint-disable-next-line no-console
  console.error('file-handle wasm init failed', e);
});

// Erasure-encode one chunk via the Go WASM SDK, then SHA-256 every shard.
// Returns [chunkId, digests] where digests[i] is a one-element array holding
// shard i's hex digest (empty shards still produce a digest, keeping the
// output length equal to the shard count).
const encodeRawSegment = async (chunkId, buffer, dataBlocks, parityBlocks) => {
  const bytes = new Uint8Array(buffer);

  // The Go runtime publishes `greenfieldSdk` on the global scope; re-run the
  // bootstrap if it has not landed yet.
  if (typeof greenfieldSdk === 'undefined') {
    await init();
  }

  const encoded = greenfieldSdk.encodeRawSegment(bytes, dataBlocks, parityBlocks);
  const shards = JSON.parse(encoded.result);

  const digests = await Promise.all(
    shards.map(async (shard) => {
      const hex = await sha256(decodeBase64(shard || ''));
      return [hex];
    }),
  );

  return [chunkId, digests];
};

// Worker entry point: erasure-encode + hash the posted chunk, then reply with
// the taskId so the consumer can discard replies from superseded tasks.
onmessage = async ({ data }) => {
  const { chunkId, buffer, dataBlocks, parityBlocks, taskId } = data;
  const result = await encodeRawSegment(chunkId, buffer, dataBlocks, parityBlocks);
  postMessage({ result, taskId });
};
173 changes: 173 additions & 0 deletions packages/file-handle/src/browser/worker/checksumWorker.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,173 @@
import { Buffer } from 'buffer';
import * as Comlink from 'comlink';
import { sha256 } from 'hash-wasm';
import { values } from 'lodash-es';
import { encodeBase64 } from '../../utils';
import { DEFAULT_DATA_BLOCKS, DEFAULT_PARITY_BLOCKS, DEFAULT_SEGMENT_SIZE } from '../../constants';

const WORKER_POOL_SIZE = 6;
// Split a File/Blob into DEFAULT_SEGMENT_SIZE slices. An empty file still
// yields a single (whole-file) chunk so downstream hashing has one entry.
const _createFileChunks = (file) => {
  if (!file.size) return [{ file }];
  const chunks = [];
  for (let offset = 0; offset < file.size; offset += DEFAULT_SEGMENT_SIZE) {
    chunks.push({ file: file.slice(offset, offset + DEFAULT_SEGMENT_SIZE) });
  }
  return chunks;
};

// Concatenate the per-chunk hex digests, hash the raw bytes, and return the
// final digest as base64 (the on-chain checksum format).
const _generateIntegrityHash = async (hexDigests) => {
  const joinedBytes = Buffer.from(hexDigests.join(''), 'hex');
  const digestHex = await sha256(joinedBytes);
  return encodeBase64(Uint8Array.from(Buffer.from(digestHex, 'hex')));
};

// Spin up the pool of SHA-256 (primary checksum) workers.
// The `new Worker(new URL(..., import.meta.url))` expression must stay a
// literal so the bundler can statically locate the worker chunk.
const _initPrimaryWorkers = ({ consumers }) => {
  const workers = Array.from(
    { length: WORKER_POOL_SIZE },
    () =>
      new Worker(
        /* webpackChunkName: "workers/calcPrimaryWorker-worker" */ new URL(
          './calcPrimaryWorker.js',
          import.meta.url,
        ),
        {
          type: 'module',
        },
      ),
  );

  for (const worker of workers) {
    worker.onmessage = (e) => {
      const { result, taskId } = e.data;
      const [chunkId, digest] = result;
      const consumer = consumers[chunkId];
      // Drop replies nobody is waiting on, and replies from superseded tasks.
      if (!consumer || consumer.taskId !== taskId) return;
      consumer.data[chunkId] = digest;
      consumer.resolve();
    };
  }

  return workers;
};
// Spin up the pool of erasure-coding (secondary checksum) workers.
// As above, the `new Worker(new URL(...))` expression must remain a literal
// for the bundler's static analysis.
const _initSecondWorkers = ({ consumers }) => {
  const workers = Array.from(
    { length: WORKER_POOL_SIZE },
    () =>
      new Worker(
        /* webpackChunkName: "workers/calcSecondWorker-worker" */ new URL(
          './calcSecondWorker.js',
          import.meta.url,
        ),
      ),
  );

  for (const worker of workers) {
    worker.onmessage = (e) => {
      const { result, taskId } = e.data;
      const [chunkId, shardDigests] = result;
      const consumer = consumers[chunkId];
      // Drop replies nobody is waiting on, and replies from superseded tasks.
      if (!consumer || consumer.taskId !== taskId) return;
      consumer.data[chunkId] = shardDigests;
      consumer.resolve();
    };
  }

  return workers;
};

// Worker threads are created once per module load and reused: a JS VM does not
// release a worker's memory immediately, so churning workers per task would
// balloon memory usage.
const primaryWorkerConsumers = {};
const secondWorkerConsumers = {};

const primaryWorkers = _initPrimaryWorkers({ consumers: primaryWorkerConsumers });
const secondWorkers = _initSecondWorkers({ consumers: secondWorkerConsumers });

/**
 * Compute Greenfield checksums for a file using the worker pools.
 *
 * @param {File|Blob} file - object to checksum; falsy input returns {}.
 * @returns {Promise<{fileChunks: number, contentLength: number, expectCheckSums: string[]}|undefined>}
 *   Checksum summary, or undefined when a worker/WASM error occurred
 *   (errors are logged, not rethrown — NOTE(review): callers must handle
 *   an undefined result).
 */
export const generateCheckSumV2 = async (file) => {
  if (!file) return {};

  const taskId = Date.now();
  let checkSumRes;

  // Settle any consumers left over from a previous (possibly abandoned) run;
  // replies from those stale workers are then discarded by the taskId check.
  values(primaryWorkerConsumers).forEach((r) => r.resolve());
  values(secondWorkerConsumers).forEach((r) => r.resolve());

  try {
    const fileChunks = _createFileChunks(file);
    const secondResults = [];
    const primaryResults = [];

    const segments = fileChunks.map(async (fileItem, chunkId) => {
      const buffer = await fileItem.file.arrayBuffer();
      const workerIdx = chunkId % WORKER_POOL_SIZE;

      // Primary checksum: plain SHA-256 of the chunk.
      const primaryPromise = new Promise((resolve) => {
        primaryWorkerConsumers[chunkId] = {
          resolve,
          data: primaryResults,
          taskId,
        };
        primaryWorkers[workerIdx].postMessage({ chunkId, buffer, taskId });
      });

      // Secondary checksums: erasure-coded shards of the chunk.
      const shardsPromise = new Promise((resolve) => {
        secondWorkerConsumers[chunkId] = {
          resolve,
          data: secondResults,
          taskId,
        };
        // BUG FIX: calcSecondWorker destructures `dataBlocks`/`parityBlocks`
        // from the message. The previous shorthand sent the keys
        // `DEFAULT_DATA_BLOCKS`/`DEFAULT_PARITY_BLOCKS`, so the worker
        // received undefined for both.
        secondWorkers[workerIdx].postMessage({
          chunkId,
          buffer,
          dataBlocks: DEFAULT_DATA_BLOCKS,
          parityBlocks: DEFAULT_PARITY_BLOCKS,
          taskId,
        });
      });

      return Promise.all([shardsPromise, primaryPromise]);
    });

    await Promise.all(segments);

    // Transpose secondResults[chunkId][shardIdx] -> combinedShards[shardIdx][chunkId]
    // so each shard index gets the digest sequence across all chunks.
    const combinedShards = [];
    secondResults.forEach((shards, chunkIdx) => {
      shards.forEach((digests, shardIdx) => {
        if (!combinedShards[shardIdx]) {
          combinedShards[shardIdx] = [];
        }
        combinedShards[shardIdx][chunkIdx] = digests[0];
      });
    });

    const primaryCheckSum = await _generateIntegrityHash(primaryResults);
    const secondsCheckSum = await Promise.all(
      combinedShards.map((it) => _generateIntegrityHash(it)),
    );

    checkSumRes = {
      fileChunks: fileChunks.length,
      contentLength: file.size,
      expectCheckSums: [primaryCheckSum, ...secondsCheckSum],
    };
  } catch (e) {
    // eslint-disable-next-line no-console
    console.log('check sum error', e);
  }

  return checkSumRes;
};

// Expose the API to the spawning thread; consumers obtain a proxy to
// generateCheckSumV2 via Comlink.wrap(worker).
Comlink.expose({
  generateCheckSumV2,
});
31 changes: 29 additions & 2 deletions packages/file-handle/src/go-wasm/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,9 +3,11 @@ package main
import (
"bytes"
"encoding/json"
"fmt"
"syscall/js"

lib "github.com/bnb-chain/greenfield-common/go/hash"
redundancy "github.com/bnb-chain/greenfield-common/go/redundancy"
)

func hashFunc(this js.Value, args []js.Value) interface{} {
Expand Down Expand Up @@ -36,7 +38,32 @@ func hashFunc(this js.Value, args []js.Value) interface{} {
}
}

// encodeRawSegment is exported to JS (via js.FuncOf) as
// greenfieldSdk.encodeRawSegment(bytes, dataBlocks, parityBlocks).
// It erasure-encodes the given bytes and returns {"result": <JSON shard array>}.
func encodeRawSegment(this js.Value, args []js.Value) interface{} {
	array := args[0]
	dataBlocks := args[1].Int()
	parityBlocks := args[2].Int()

	// NOTE(review): leftover debug output — remove together with the `fmt`
	// import (which this is the sole use of) before release.
	fmt.Print("array: ", array, dataBlocks, parityBlocks)

	byteLength := array.Get("byteLength").Int()
	data := make([]byte, byteLength)
	// NOTE(review): redundant copy — `buffer` already holds the bytes after
	// CopyBytesToGo; the reader.Read into `data` duplicates them. Passing
	// `buffer` directly to EncodeRawSegment would suffice.
	var buffer []uint8 = make([]uint8, byteLength)
	js.CopyBytesToGo(buffer, array)
	reader := bytes.NewReader(buffer)
	reader.Read(data)

	// NOTE(review): both errors are silently discarded; on failure the JS side
	// receives "null"/"" in result and JSON.parse downstream misbehaves —
	// consider returning an "error" field.
	encodeShards, _ := redundancy.EncodeRawSegment(data, dataBlocks, parityBlocks)
	shardsJson, _ := json.Marshal(encodeShards)
	return map[string]interface{}{
		"result": string(shardsJson),
	}
}

func main() {
js.Global().Set("getCheckSums", js.FuncOf(hashFunc))
<-make(chan struct{})
done := make(chan int, 0)
js.Global().Set("greenfieldSdk", map[string]interface{}{
"getCheckSums": js.FuncOf(hashFunc),
"encodeRawSegment": js.FuncOf(encodeRawSegment),
})
<-done
}
2 changes: 1 addition & 1 deletion packages/file-handle/src/node/index.js
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ export const startRunningService = async () => {
// const { add } = exports;

// `globalThis` is a map to complex way of `syscall/js` way.
const { getCheckSums } = globalThis;
const { getCheckSums } = globalThis.greenfieldSdk;

return {
getCheckSums,
Expand Down
42 changes: 42 additions & 0 deletions packages/file-handle/src/utils.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,42 @@
// RFC 4648 base64 alphabet used by the hand-rolled encoder below.
const base64Chars = 'ABCDEFGHIJKLMNOPQRSTUVWXYZabcdefghijklmnopqrstuvwxyz0123456789+/';

/**
 * Encode a byte sequence as a base64 string.
 *
 * @param {Uint8Array|number[]} data - bytes to encode.
 * @param {boolean} [pad=true] - when true, append '='/'==' so the output
 *   length is a multiple of 4.
 * @returns {string} base64 text.
 */
export function encodeBase64(data, pad = true) {
  const out = [];
  const remainder = data.length % 3;
  const fullGroupsEnd = data.length - remainder;

  // Encode every complete 3-byte group into 4 characters.
  for (let i = 0; i < fullGroupsEnd; i += 3) {
    const word =
      ((data[i] << 16) & 0xff0000) | ((data[i + 1] << 8) & 0xff00) | (data[i + 2] & 0xff);
    out.push(
      base64Chars.charAt((word >> 18) & 0x3f) +
        base64Chars.charAt((word >> 12) & 0x3f) +
        base64Chars.charAt((word >> 6) & 0x3f) +
        base64Chars.charAt(word & 0x3f),
    );
  }

  // Encode the 1- or 2-byte tail, optionally padding to a 4-char boundary.
  if (remainder === 1) {
    const tail = data[data.length - 1];
    out.push(base64Chars.charAt(tail >> 2) + base64Chars.charAt((tail << 4) & 0x3f));
    if (pad) {
      out.push('==');
    }
  } else if (remainder === 2) {
    const tail = (data[data.length - 2] << 8) + data[data.length - 1];
    out.push(
      base64Chars.charAt(tail >> 10) +
        base64Chars.charAt((tail >> 4) & 0x3f) +
        base64Chars.charAt((tail << 2) & 0x3f),
    );
    if (pad) {
      out.push('=');
    }
  }

  return out.join('');
}
Binary file modified packages/file-handle/src/wasm/file-handle.wasm
Binary file not shown.
Loading

0 comments on commit 085c6a3

Please sign in to comment.