Skip to content

Commit

Permalink
feat: Nodejs support worker
Browse files Browse the repository at this point in the history
  • Loading branch information
rrr523 committed Dec 18, 2023
1 parent 089a224 commit deee365
Show file tree
Hide file tree
Showing 6 changed files with 165 additions and 48 deletions.
3 changes: 2 additions & 1 deletion packages/reed-solomon/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,8 @@
"main": "./dist/index.js",
"default": "./dist/index.js",
"types": "./types/index.d.ts"
}
},
"./utils": "./dist/utils.js"
},
"scripts": {
"prebuild": "rimraf ./dist",
Expand Down
5 changes: 3 additions & 2 deletions packages/reed-solomon/rollup.config.js
Original file line number Diff line number Diff line change
Expand Up @@ -27,10 +27,11 @@ export default async () => {
],
},
{
input: './src/index.js',
input: ['./src/index.js', './src/node.adapter.js', './src/utils.js'],
output: {
format: 'cjs',
file: 'dist/index.js',
// file: 'dist/index.js',
dir: 'dist',
sourcemap: true,
},
external: resolveExternal(),
Expand Down
83 changes: 38 additions & 45 deletions packages/reed-solomon/src/index.js
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
import { sha256 } from 'ethereum-cryptography/sha256.js';
import { galMulSlice, galMulSliceXor } from './galois';
import { buildMatrix } from './matrix';
import { concat, getIntegrityUint8Array, toBase64 } from './utils';
import { concat, getIntegrityUint8Array, splitPrice, toBase64 } from './utils';

export class ReedSolomon {
constructor(
Expand All @@ -16,7 +16,7 @@ export class ReedSolomon {
this.segmentSize = segmentSize;
}

allocAligned(shards, each) {
_allocAligned(shards, each) {
// const eachAligned = (parseInt((each + 63) / 64)) * 64; // Use Math.ceil instead of ((each + 63) / 64) * 64

const eachAligned = ((each + 63) >>> 6) << 6;
Expand All @@ -31,7 +31,7 @@ export class ReedSolomon {
return res;
}

split(data) {
_split(data) {
if (data.length === 0) {
return [];
}
Expand Down Expand Up @@ -68,7 +68,7 @@ export class ReedSolomon {
if (data.length < needTotal) {
const fullShards = data.length / perShard;
// padding = new Array(this.totalShards - fullShards).fill(0);
padding = this.allocAligned(this.totalShards - fullShards, perShard);
padding = this._allocAligned(this.totalShards - fullShards, perShard);

if (dataLen > perShard * fullShards) {
const copyFrom = data.slice(perShard * fullShards, dataLen);
Expand Down Expand Up @@ -97,30 +97,7 @@ export class ReedSolomon {
return dst;
}

encodeSegment(data) {
if (data.length == 0) throw new Error('data buffer length is 0');

const shared = this.split(data);

const output = shared.slice(this.dataShards);

// r.m
const matrix = buildMatrix(this.totalShards, this.dataShards);

let parity = [];
for (let i = 0; i < this.parityShards; i++) {
parity.push(matrix[this.dataShards + i]);
}

return this.codeSomeShards(
parity,
shared.slice(0, this.dataShards),
output.slice(0, this.parityShards),
shared[0].length,
);
}

codeSomeShards(matrixRows, inputs, outputs) {
_codeSomeShards(matrixRows, inputs, outputs) {
let start = 0;
let end = inputs[0].length;

Expand Down Expand Up @@ -150,44 +127,60 @@ export class ReedSolomon {
return concat(inputs, outputs);
}

encode(sourceData) {
let chunkList = [];
let cur = 0;
encodeSegment(data) {
if (data.length == 0) throw new Error('data buffer length is 0');

// TODO: if totalShards is not `5JCeuQeRkm5NMpJWZG3hSuFU=`
if (sourceData.length == 0) {
return new Array(this.totalShards + 1).fill('47DEQpj8HBSa+/TImW+5JCeuQeRkm5NMpJWZG3hSuFU=');
}
const shared = this._split(data);

const output = shared.slice(this.dataShards);

// r.m
const matrix = buildMatrix(this.totalShards, this.dataShards);

while (cur < sourceData.length) {
chunkList.push(sourceData.slice(cur, cur + this.segmentSize));
cur += this.segmentSize;
let parity = [];
for (let i = 0; i < this.parityShards; i++) {
parity.push(matrix[this.dataShards + i]);
}

let encodeDataHash = new Array(this.totalShards);
return this._codeSomeShards(
parity,
shared.slice(0, this.dataShards),
output.slice(0, this.parityShards),
shared[0].length,
);
}

for (let i = 0; i < encodeDataHash.length; i++) {
encodeDataHash[i] = [];
encode(sourceData) {
if (sourceData.length == 0) {
throw new Error('file buffer is empty');
}

const chunkList = splitPrice(sourceData, this.segmentSize);

let encodeDataHashList = new Array(this.totalShards);
for (let i = 0; i < encodeDataHashList.length; i++) {
encodeDataHashList[i] = [];
}
let hashList = [];
let segChecksumList = [];

for (let i = 0; i < chunkList.length; i++) {
const data = chunkList[i];
// console.log('data i', i)
const encodeShards = this.encodeSegment(data);
// console.log('data i done')
// console.log('data done', i)
segChecksumList.push(sha256(data));

for (let i = 0; i < encodeShards.length; i++) {
const priceHash = sha256(encodeShards[i]);
encodeDataHash[i].push(priceHash);
encodeDataHashList[i].push(priceHash);
}
}

hashList[0] = sha256(getIntegrityUint8Array(segChecksumList));

for (let i = 0; i < encodeDataHash.length; i++) {
hashList[i + 1] = sha256(getIntegrityUint8Array(encodeDataHash[i]));
for (let i = 0; i < encodeDataHashList.length; i++) {
hashList[i + 1] = sha256(getIntegrityUint8Array(encodeDataHashList[i]));
}

return toBase64(hashList);
Expand Down
88 changes: 88 additions & 0 deletions packages/reed-solomon/src/node.adapter.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,88 @@
import { sha256 } from 'ethereum-cryptography/sha256.js';
import { isMainThread, parentPort, Worker, workerData } from 'node:worker_threads';
import { ReedSolomon } from './index';
import { getIntegrityUint8Array, toBase64, splitPrice } from './utils';

export class NodeAdapterReedSolomon extends ReedSolomon {
async encodeInWorker(p, sourceData) {
return new Promise((resolve, reject) => {
if (isMainThread) {
const chunkList = splitPrice(sourceData, this.segmentSize);

let hashList = [];
let segChecksumList = [];
let encodeDataHashList = new Array(6);

for (let i = 0; i < encodeDataHashList.length; i++) {
encodeDataHashList[i] = [];
}

const threads = new Set();
let RES = [];

for (let i = 0; i < chunkList.length; i++) {
const worker = new Worker(p, {
workerData: {
index: i,
chunk: chunkList[i],
},
});
threads.add(worker);
}

for (let w of threads) {
w.on('error', (err) => {
throw err;
});
w.on('exit', () => {
threads.delete(w);
// console.log(`Thread exiting, ${threads.size} running...`)
if (threads.size === 0) {
for (let i = 0; i < RES.length; i++) {
segChecksumList.push(RES[i].segChecksum);
}

for (let i = 0; i < chunkList.length; i++) {
for (let j = 0; j < encodeDataHashList.length; j++) {
encodeDataHashList[j][i] = RES[i].encodeDataHash[j];
}
}

hashList[0] = sha256(getIntegrityUint8Array(segChecksumList));

for (let i = 0; i < encodeDataHashList.length; i++) {
hashList[i + 1] = sha256(getIntegrityUint8Array(encodeDataHashList[i]));
}

const res = toBase64(hashList);
resolve(res);
}
});

w.on('message', (message) => {
// console.log('message', message.encodeData.index)
RES[message.index] = message;
});
}
} else {
const encodeShards = this.encodeSegment(workerData.chunk);
let encodeDataHash = [];

for (let i = 0; i < encodeShards.length; i++) {
const priceHash = sha256(encodeShards[i]);
encodeDataHash.push(priceHash);
}

// console.log('encodeShards', encodeShards.length)
// console.log('encodeDataHash', encodeDataHash.length)
// console.log('workerData.index', workerData.index)

parentPort.postMessage({
index: workerData.index,
segChecksum: sha256(workerData.chunk),
encodeDataHash: encodeDataHash,
});
}
});
}
}
15 changes: 15 additions & 0 deletions packages/reed-solomon/src/utils.js
Original file line number Diff line number Diff line change
Expand Up @@ -26,3 +26,18 @@ export function toBase64(hashList) {
}
return res;
}

/**
* split data to same length price
*/
export function splitPrice(data, size) {
let chunkList = [];
let cur = 0;

while (cur < data.length) {
chunkList.push(data.slice(cur, cur + size));
cur += size;
}

return chunkList;
}
19 changes: 19 additions & 0 deletions packages/reed-solomon/types/index.d.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
declare class ReedSolomon {
constructor(dataShards = 4, parityShards = 2, segmentSize = 16777216);

encodeSegment(data: Uint8Array): Uint8Array[];

encode(data: Uint8Arary): string[];
}

declare class NodeAdapterReedSolomon {
encodeInWorker(p: string, data: Uint8Array): Promise<string[]>;
}

export function concat(a: Uint8Array[], b: Uint8Array[]): Uint8Array[];

export function getIntegrityUint8Array(uin8arr: Uint8Array[]): Uint8Array;

export function toBase64(hashList: Uint8Array[]): string[];

export function splitPrice(data: Uint8Array, size: number): Uint8Array[];

0 comments on commit deee365

Please sign in to comment.