Skip to content

Commit

Permalink
Feat/reed solomon (#446)
Browse files Browse the repository at this point in the history
* chore: Update docs

* docs: Update

* docs: Update

* feat: Set Webworker

* feat: Clean InjectWorker

* feat: Clean InjectWorker
  • Loading branch information
rrr523 authored Dec 21, 2023
1 parent 3d19853 commit 65333cd
Show file tree
Hide file tree
Showing 10 changed files with 138 additions and 141 deletions.
5 changes: 5 additions & 0 deletions .changeset/mighty-laws-suffer.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
---
'@bnb-chain/reed-solomon': minor
---

feat: Clean InjectWorker
5 changes: 5 additions & 0 deletions .changeset/tidy-boats-laugh.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
---
'@bnb-chain/reed-solomon': minor
---

feat: Support setting worker counts
40 changes: 17 additions & 23 deletions packages/reed-solomon/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,6 @@ Use directly in the browser via script tag:
get reed solomon
</button>

<script src="https://cdn.jsdelivr.net/npm/@bnb-chain/reed-solomon/dist/index.aio.js"></script>
<script src="https://cdn.jsdelivr.net/npm/@bnb-chain/reed-solomon/dist/index.aio.js"></script>
<script>
const fileInput = document.getElementById('file');
Expand All @@ -55,42 +54,37 @@ Use directly in the browser via script tag:
### Browser(WebWorker)

```html
<!-- prefetch js -->
<link rel="prefetch" href="https://unpkg.com/@bnb-chain/reed-solomon/dist/web.adapter.aio.js" />
<link rel="prefetch" href="https://unpkg.com/@bnb-chain/reed-solomon/dist/utils.aio.js" />
<script src="https://unpkg.com/@bnb-chain/reed-solomon/dist/web.adapter.aio.js"></script>
<script>
const rs = new WebAdapter.WebAdapterReedSolomon()
rs.initWorkers({
workerNum: 6,
injectWorker,
})
document.getElementById('worker-btn').onclick = async function() {
const selectFile = fileInput.files[0];
const arrBuffer = await selectFile.arrayBuffer()
if (!arrBuffer) alert('no file selected');
const sourceData = new Uint8Array(arrBuffer)
const rs = new WebAdapter.WebAdapterReedSolomon()
const res = await rs.encodeInWorker(injectWorker, sourceData)
const res = await rs.encodeInWorker(sourceData)
}
// inject worker
function injectWorker() {
// replace your CDN url
importScripts('http://localhost:9002/dist/web.adapter.aio.js');
importScripts('http://localhost:9002/dist/utils.aio.js');
// or download this file and put it to your CDN server
importScripts('https://unpkg.com/@bnb-chain/reed-solomon/dist/web.adapter.aio.js');
importScripts('https://unpkg.com/@bnb-chain/reed-solomon/dist/utils.aio.js');
const rs = new WebAdapter.WebAdapterReedSolomon();
onmessage = function (event) {
const { index, chunk } = event.data;
const encodeShards = rs.encodeSegment(chunk);
let encodeDataHash = [];
for (let i = 0; i < encodeShards.length; i++) {
const priceHash = RSUtils.sha256(encodeShards[i]);
encodeDataHash.push(priceHash);
}
postMessage({
index,
segChecksum: RSUtils.sha256(chunk),
encodeDataHash,
});
self.close();
const encodeShard = rs.getEncodeShard(chunk, index)
postMessage(encodeShard);
};
}
</script>
Expand Down Expand Up @@ -128,6 +122,6 @@ const res = await rs.encodeInWorker(__filename, Uint8Array.from(fileBuffer))

[Code](./examples/node-worker.js)

## Benchark
## Benchmark

[benchmark](./benchmark.md)
4 changes: 2 additions & 2 deletions packages/reed-solomon/examples/node.js
Original file line number Diff line number Diff line change
Expand Up @@ -3,13 +3,13 @@ const fs = require('node:fs');
const path = require('node:path');
const { ReedSolomon } = require('../dist/index');

const fileBuffer = fs.readFileSync('./README.md');
const sourceData = fs.readFileSync('./README.md');

(async () => {
const rs = new ReedSolomon();
console.log('file size', sourceData.length / 1024 / 1024, 'm');
console.time('cost');
const res = await rs.encode(Uint8Array.from(fileBuffer));
const res = await rs.encode(Uint8Array.from(sourceData));
console.log('res', res);
console.timeEnd('cost');
})();
29 changes: 12 additions & 17 deletions packages/reed-solomon/examples/web-worker.html
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,12 @@
<script type="module">
const fileInput = document.getElementById('file');

const rs = new WebAdapter.WebAdapterReedSolomon()
rs.initWorkers({
workerNum: 6,
injectWorker,
})

// use webworker
document.getElementById('worker-btn').onclick = async function() {
const selectFile = fileInput.files[0];
Expand All @@ -26,35 +32,24 @@
const sourceData = new Uint8Array(arrBuffer)
console.time('webworker cost')
console.log('file size', sourceData.length / 1024 / 1024, 'm')
const rs = new WebAdapter.WebAdapterReedSolomon()
const res = await rs.encodeInWorker(injectWorker, sourceData)
const res = await rs.encodeInWorker(sourceData)
console.log('res', res)
console.timeEnd('webworker cost')
}

function injectWorker() {
importScripts('http://localhost:9002/dist/web.adapter.aio.js');
importScripts('http://localhost:9002/dist/utils.aio.js');
// or use public CDN
// importScripts('https://unpkg.com/@bnb-chain/reed-solomon/dist/web.adapter.aio.js');
// importScripts('https://unpkg.com/@bnb-chain/reed-solomon/dist/utils.aio.js');

const rs = new WebAdapter.WebAdapterReedSolomon();

onmessage = function (event) {
const { index, chunk } = event.data;
const encodeShards = rs.encodeSegment(chunk);
let encodeDataHash = [];

for (let i = 0; i < encodeShards.length; i++) {
const priceHash = RSUtils.sha256(encodeShards[i]);
encodeDataHash.push(priceHash);
}

postMessage({
index,
segChecksum: RSUtils.sha256(chunk),
encodeDataHash,
});

self.close();
const encodeShard = rs.getEncodeShard(chunk, index)
postMessage(encodeShard);
};
}

Expand Down
49 changes: 40 additions & 9 deletions packages/reed-solomon/src/index.js
Original file line number Diff line number Diff line change
Expand Up @@ -161,19 +161,46 @@ export class ReedSolomon {
for (let i = 0; i < encodeDataHashList.length; i++) {
encodeDataHashList[i] = [];
}

let encodeShards = chunkList.map((chunk, index) => {
return this.getEncodeShard(chunk, index);
});

return this.getChecksumsByEncodeShards(encodeShards);
}

getEncodeShard(chunk, index) {
const encodeShards = this.encodeSegment(chunk);
let encodeDataHash = [];
for (let i = 0; i < encodeShards.length; i++) {
const priceHash = sha256(encodeShards[i]);
encodeDataHash.push(priceHash);
}
return {
index,
segChecksum: sha256(chunk),
encodeDataHash,
};
}

/**
* @param {Array[{index, segChecksum, encodeDataHash}]} encodeShards
*/
getChecksumsByEncodeShards(encodeShards) {
let hashList = [];
let segChecksumList = [];
let encodeDataHashList = new Array(this.totalShards);
for (let i = 0; i < encodeDataHashList.length; i++) {
encodeDataHashList[i] = [];
}

for (let i = 0; i < chunkList.length; i++) {
const data = chunkList[i];
// console.log('data i', i)
const encodeShards = this.encodeSegment(data);
// console.log('data done', i)
segChecksumList.push(sha256(data));
for (let i = 0; i < encodeShards.length; i++) {
segChecksumList.push(encodeShards[i].segChecksum);
}

for (let i = 0; i < encodeShards.length; i++) {
const priceHash = sha256(encodeShards[i]);
encodeDataHashList[i].push(priceHash);
for (let i = 0; i < encodeShards.length; i++) {
for (let j = 0; j < encodeDataHashList.length; j++) {
encodeDataHashList[j][i] = encodeShards[i].encodeDataHash[j];
}
}

Expand All @@ -185,4 +212,8 @@ export class ReedSolomon {

return toBase64(hashList);
}

sortByIndex(encodeShards) {
return encodeShards.sort((a, b) => a.index - b.index);
}
}
54 changes: 8 additions & 46 deletions packages/reed-solomon/src/node.adapter.js
Original file line number Diff line number Diff line change
@@ -1,24 +1,15 @@
import { sha256 } from 'ethereum-cryptography/sha256.js';
import { isMainThread, parentPort, Worker, workerData } from 'node:worker_threads';
import { ReedSolomon } from './index';
import { getIntegrityUint8Array, toBase64, splitPrice } from './utils';
import { splitPrice } from './utils';

export class NodeAdapterReedSolomon extends ReedSolomon {
async encodeInWorker(p, sourceData) {
return new Promise((resolve, reject) => {
if (isMainThread) {
// RES is `encodeShards` Array
let RES = [];
const chunkList = splitPrice(sourceData, this.segmentSize);

let hashList = [];
let segChecksumList = [];
let encodeDataHashList = new Array(6);

for (let i = 0; i < encodeDataHashList.length; i++) {
encodeDataHashList[i] = [];
}

const threads = new Set();
let RES = [];

for (let i = 0; i < chunkList.length; i++) {
const worker = new Worker(p, {
Expand All @@ -38,24 +29,8 @@ export class NodeAdapterReedSolomon extends ReedSolomon {
threads.delete(w);
// console.log(`Thread exiting, ${threads.size} running...`)
if (threads.size === 0) {
for (let i = 0; i < RES.length; i++) {
segChecksumList.push(RES[i].segChecksum);
}

for (let i = 0; i < chunkList.length; i++) {
for (let j = 0; j < encodeDataHashList.length; j++) {
encodeDataHashList[j][i] = RES[i].encodeDataHash[j];
}
}

hashList[0] = sha256(getIntegrityUint8Array(segChecksumList));

for (let i = 0; i < encodeDataHashList.length; i++) {
hashList[i + 1] = sha256(getIntegrityUint8Array(encodeDataHashList[i]));
}

const res = toBase64(hashList);
resolve(res);
const sortedRes = this.sortByIndex(RES);
resolve(this.getChecksumsByEncodeShards(sortedRes));
}
});

Expand All @@ -65,23 +40,10 @@ export class NodeAdapterReedSolomon extends ReedSolomon {
});
}
} else {
const encodeShards = this.encodeSegment(workerData.chunk);
let encodeDataHash = [];

for (let i = 0; i < encodeShards.length; i++) {
const priceHash = sha256(encodeShards[i]);
encodeDataHash.push(priceHash);
}

// console.log('encodeShards', encodeShards.length)
// console.log('encodeDataHash', encodeDataHash.length)
// console.log('workerData.index', workerData.index)
const { chunk, index } = workerData;

parentPort.postMessage({
index: workerData.index,
segChecksum: sha256(workerData.chunk),
encodeDataHash: encodeDataHash,
});
const encodeShard = this.getEncodeShard(chunk, index);
parentPort.postMessage(encodeShard);
}
});
}
Expand Down
79 changes: 36 additions & 43 deletions packages/reed-solomon/src/web.adapter.js
Original file line number Diff line number Diff line change
@@ -1,58 +1,51 @@
import { ReedSolomon } from '.';
import { sha256, getIntegrityUint8Array, toBase64, splitPrice } from './utils';
import { splitPrice } from './utils';

export class WebAdapterReedSolomon extends ReedSolomon {
async encodeInWorker(workerFn, sourceData) {
const chunkList = splitPrice(sourceData, this.segmentSize);
initWorkers({ injectWorker, workerNum = 10 }) {
this.workerNum = workerNum;
this.workers = [];
for (let i = 0; i < workerNum; i++) {
const worker = createWorker(injectWorker);
this.workers.push(worker);
}
}

const workers = [];
async encodeInWorker(sourceData) {
// RES is `encodeShards` Array
let RES = [];
const chunkList = splitPrice(sourceData, this.segmentSize);
const queue = [];

for (let i = 0; i < chunkList.length; i++) {
// const worker = new Worker('worker.js');
const worker = createWorker(workerFn);
workers.push(worker);
worker.postMessage({
queue.push({
index: i,
chunk: chunkList[i],
});
}

const plist = workers.map(
(worker) =>
new Promise((resolve) => {
worker.onmessage = (e) => {
resolve(e.data);
};
}),
);

return Promise.all(plist).then((RES) => {
let hashList = [];
let segChecksumList = [];
let encodeDataHashList = new Array(this.totalShards);
for (let i = 0; i < encodeDataHashList.length; i++) {
encodeDataHashList[i] = [];
}

for (let i = 0; i < RES.length; i++) {
segChecksumList.push(RES[i].segChecksum);
return new Promise((resolve) => {
let completedWorkers = 0;

for (let i = 0; i < queue.length; i++) {
const worker = this.workers[i % this.workerNum];
worker.postMessage({
index: queue[i].index,
chunk: queue[i].chunk,
});

worker.onmessage = (e) => {
// console.log('worker data', e.data)
completedWorkers++;
RES.push(e.data);

if (completedWorkers === queue.length) {
// console.log('RES', RES)
const sortedRes = this.sortByIndex(RES);
resolve(this.getChecksumsByEncodeShards(sortedRes));
}
};
}

for (let i = 0; i < chunkList.length; i++) {
for (let j = 0; j < encodeDataHashList.length; j++) {
encodeDataHashList[j][i] = RES[i].encodeDataHash[j];
}
}

hashList[0] = sha256(getIntegrityUint8Array(segChecksumList));

for (let i = 0; i < encodeDataHashList.length; i++) {
hashList[i + 1] = sha256(getIntegrityUint8Array(encodeDataHashList[i]));
}

const res = toBase64(hashList);

return res;
});
}
}
Expand Down
Loading

0 comments on commit 65333cd

Please sign in to comment.