From 4d8bd876d41f90fe6571941375616c45493d98ae Mon Sep 17 00:00:00 2001 From: Dhruv Soni Date: Tue, 11 Jun 2024 06:04:16 +0530 Subject: [PATCH] splitting stuff inside the loader --- packages/encrypted-blockstore/src/loader.ts | 603 +++++++++++--------- 1 file changed, 343 insertions(+), 260 deletions(-) diff --git a/packages/encrypted-blockstore/src/loader.ts b/packages/encrypted-blockstore/src/loader.ts index 364115d8..50210396 100644 --- a/packages/encrypted-blockstore/src/loader.ts +++ b/packages/encrypted-blockstore/src/loader.ts @@ -1,6 +1,6 @@ -import pLimit from 'p-limit' -import { CarReader } from '@ipld/car' -import { CID } from 'multiformats' +import pLimit from "p-limit"; +import { CarReader } from "@ipld/car"; +import { CID } from "multiformats"; import type { AnyBlock, @@ -10,98 +10,108 @@ import type { DbMeta, TransactionMeta, CarGroup, - CarLog -} from './types' -import type { BlockstoreOpts } from './transaction' - -import { encodeCarFiles, encodeCarHeader, parseCarFile } from './loader-helpers' -import { decodeEncryptedCar, encryptedEncodeCarFile } from './encrypt-helpers' - -import { getCrypto, randomBytes } from './crypto-web' -import { DataStore, MetaStore } from './store' -import { RemoteWAL } from './remote-wal' - -import { DataStore as AbstractDataStore, MetaStore as AbstractMetaStore } from './store' -import type { CarTransaction } from './transaction' -import { CommitQueue } from './commit-queue' + CarLog, +} from "./types"; +import type { BlockstoreOpts } from "./transaction"; + +import { encodeCarFile, encodeCarHeader, parseCarFile } from "./loader-helpers"; +import { decodeEncryptedCar, encryptedEncodeCarFile } from "./encrypt-helpers"; + +import { getCrypto, randomBytes } from "./crypto-web"; +import { DataStore, MetaStore } from "./store"; +import { RemoteWAL } from "./remote-wal"; + +import { + DataStore as AbstractDataStore, + MetaStore as AbstractMetaStore, +} from "./store"; +import { CarTransaction } from "./transaction"; +import { CommitQueue } from "./commit-queue"; +import * as CBW from "@ipld/car/buffer-writer"; +import { Block, encode, decode } from "multiformats/block"; export function cidListIncludes(list: CarLog, cids: CarGroup) { return list.some((arr: CarGroup) => { - return arr.toString() === cids.toString() - }) + return arr.toString() === cids.toString(); + }); } // this works for car groups because toString looks like bafy,bafy function uniqueCids(list: CarLog, remove: Set = new Set()): CarLog { - const byString = new Map() + const byString = new Map(); for (const cid of list) { - if (remove.has(cid.toString())) continue - byString.set(cid.toString(), cid) + if (remove.has(cid.toString())) continue; + byString.set(cid.toString(), cid); } - return [...byString.values()] + return [...byString.values()]; } export function toHexString(byteArray: Uint8Array) { return Array.from(byteArray) - .map(byte => byte.toString(16).padStart(2, '0')) - .join('') + .map((byte) => byte.toString(16).padStart(2, "0")) + .join(""); } abstract class AbstractRemoteMetaStore extends AbstractMetaStore { - abstract handleByteHeads(byteHeads: Uint8Array[], branch?: string): Promise + abstract handleByteHeads( + byteHeads: Uint8Array[], + branch?: string + ): Promise; } export abstract class Loadable { - name: string = '' - remoteCarStore?: DataStore - carStore?: DataStore - carLog: CarLog = new Array() - remoteMetaStore?: AbstractRemoteMetaStore - remoteFileStore?: AbstractDataStore - fileStore?: DataStore + name: string = ""; + remoteCarStore?: DataStore; + carStore?: DataStore; + carLog: CarLog = new Array(); + remoteMetaStore?: AbstractRemoteMetaStore; + remoteFileStore?: AbstractDataStore; + fileStore?: DataStore; } export class Loader implements Loadable { - name: string - ebOpts: BlockstoreOpts - commitQueue: CommitQueue = new CommitQueue() - isCompacting = false - isWriting = false - remoteMetaStore?: AbstractRemoteMetaStore - remoteCarStore?: AbstractDataStore - fileStore: DataStore - remoteFileStore?: AbstractDataStore - remoteWAL: RemoteWAL - metaStore?: MetaStore - carStore: DataStore - carLog: CarLog = new Array() - carReaders: Map> = new Map() - ready: Promise - key?: string - keyId?: string - seenCompacted: Set = new Set() - processedCars: Set = new Set() - writing: Promise = Promise.resolve() - - private getBlockCache: Map = new Map() - private seenMeta: Set = new Set() + name: string; + ebOpts: BlockstoreOpts; + commitQueue: CommitQueue = new CommitQueue(); + isCompacting = false; + isWriting = false; + remoteMetaStore?: AbstractRemoteMetaStore; + remoteCarStore?: AbstractDataStore; + fileStore: DataStore; + remoteFileStore?: AbstractDataStore; + remoteWAL: RemoteWAL; + metaStore?: MetaStore; + carStore: DataStore; + carLog: CarLog = new Array(); + carReaders: Map> = new Map(); + ready: Promise; + key?: string; + keyId?: string; + seenCompacted: Set = new Set(); + processedCars: Set = new Set(); + writing: Promise = Promise.resolve(); + + private getBlockCache: Map = new Map(); + private seenMeta: Set = new Set(); constructor(name: string, ebOpts: BlockstoreOpts) { - this.name = name - this.ebOpts = ebOpts + this.name = name; + this.ebOpts = ebOpts; - this.carStore = ebOpts.store.makeDataStore(this.name) - this.fileStore = ebOpts.store.makeDataStore(this.name) - this.remoteWAL = ebOpts.store.makeRemoteWAL(this) + this.carStore = ebOpts.store.makeDataStore(this.name); + this.fileStore = ebOpts.store.makeDataStore(this.name); + this.remoteWAL = ebOpts.store.makeRemoteWAL(this); this.ready = Promise.resolve().then(async () => { - this.metaStore = ebOpts.store.makeMetaStore(this) + this.metaStore = ebOpts.store.makeMetaStore(this); if (!this.metaStore || !this.carStore || !this.remoteWAL) - throw new Error('stores not initialized') - const metas = this.ebOpts.meta ? [this.ebOpts.meta] : await this.metaStore!.load('main') + throw new Error("stores not initialized"); + const metas = this.ebOpts.meta + ? [this.ebOpts.meta] + : await this.metaStore!.load("main"); if (metas) { - await this.handleDbMetasFromStore(metas) + await this.handleDbMetasFromStore(metas); } - }) + }); } // async snapToCar(carCid: AnyLink | string) { @@ -118,66 +128,71 @@ export class Loader implements Loadable { async handleDbMetasFromStore(metas: DbMeta[]): Promise { for (const meta of metas) { const writingFn = async () => { - this.isWriting = true - await this.mergeDbMetaIntoClock(meta) - this.isWriting = false - } - this._setWaitForWrite(writingFn) - await writingFn() + this.isWriting = true; + await this.mergeDbMetaIntoClock(meta); + this.isWriting = false; + }; + this._setWaitForWrite(writingFn); + await writingFn(); } } async mergeDbMetaIntoClock(meta: DbMeta): Promise { if (this.isCompacting) { - throw new Error('cannot merge while compacting') + throw new Error("cannot merge while compacting"); } - if (this.seenMeta.has(meta.cars.toString())) return - this.seenMeta.add(meta.cars.toString()) + if (this.seenMeta.has(meta.cars.toString())) return; + this.seenMeta.add(meta.cars.toString()); if (meta.key) { - await this.setKey(meta.key) + await this.setKey(meta.key); } if (cidListIncludes(this.carLog, meta.cars)) { - return + return; } - const carHeader = (await this.loadCarHeaderFromMeta(meta)) as CarHeader + const carHeader = (await this.loadCarHeaderFromMeta(meta)) as CarHeader; // fetch other cars down the compact log? // todo we should use a CID set for the compacted cids (how to expire?) // console.log('merge carHeader', carHeader.head.length, carHeader.head.toString(), meta.car.toString()) - carHeader.compact.map(c => c.toString()).forEach(this.seenCompacted.add, this.seenCompacted) - await this.getMoreReaders(carHeader.cars.flat()) + carHeader.compact + .map((c) => c.toString()) + .forEach(this.seenCompacted.add, this.seenCompacted); + await this.getMoreReaders(carHeader.cars.flat()); this.carLog = [ - ...uniqueCids([meta.cars, ...this.carLog, ...carHeader.cars], this.seenCompacted) - ] - await this.ebOpts.applyMeta(carHeader.meta) + ...uniqueCids( + [meta.cars, ...this.carLog, ...carHeader.cars], + this.seenCompacted + ), + ]; + await this.ebOpts.applyMeta(carHeader.meta); } protected async ingestKeyFromMeta(meta: DbMeta): Promise { - const { key } = meta + const { key } = meta; if (key) { - await this.setKey(key) + await this.setKey(key); } } async loadCarHeaderFromMeta({ cars: cids }: DbMeta): Promise { //Call loadCar for every cid - const reader = await this.loadCar(cids[0]) - return await parseCarFile(reader) + const reader = await this.loadCar(cids[0]); + return await parseCarFile(reader); } // eslint-disable-next-line @typescript-eslint/require-await async _getKey(): Promise { - if (this.key) return this.key + if (this.key) return this.key; // generate a random key if (!this.ebOpts.public) { if (getCrypto()) { - await this.setKey(toHexString(randomBytes(32))) + await this.setKey(toHexString(randomBytes(32))); } else { - console.warn('missing crypto module, using public mode') + console.warn("missing crypto module, using public mode"); } } - return this.key + return this.key; } async commitFiles( @@ -185,7 +200,9 @@ export class Loader implements Loadable { done: TransactionMeta, opts: CommitOpts = { noLoader: false, compact: false } ): Promise { - return this.commitQueue.enqueue(() => this._commitInternalFiles(t, done, opts)) + return this.commitQueue.enqueue(() => + this._commitInternalFiles(t, done, opts) + ); } // can these skip the queue? or have a file queue? async _commitInternalFiles( @@ -193,25 +210,29 @@ export class Loader implements Loadable { done: TransactionMeta, opts: CommitOpts = { noLoader: false, compact: false } ): Promise { - await this.ready + await this.ready; const { files: roots } = this.makeFileCarHeader(done) as { - files: AnyLink[] - } - const cids:AnyLink[]=[] - const cars = await this.prepareCarFiles(roots[0], t, !!opts.public) - for(const car of cars) - { - const {cid,bytes}=car - await this.fileStore!.save({ cid, bytes }) - await this.remoteWAL!.enqueueFile(cid, !!opts.public) - cids.push(cid) + files: AnyLink[]; + }; + const cids: AnyLink[] = []; + const cars = await this.prepareCarFiles(roots, t, !!opts.public); + for (const car of cars) { + const { cid, bytes } = car; + await this.fileStore!.save({ cid, bytes }); + await this.remoteWAL!.enqueueFile(cid, !!opts.public); + cids.push(cid); } - - return cids + + return cids; } async loadFileCar(cid: AnyLink, isPublic = false): Promise { - return await this.storesLoadCar(cid, this.fileStore, this.remoteFileStore, isPublic) + return await this.storesLoadCar( + cid, + this.fileStore, + this.remoteFileStore, + isPublic + ); } async commit( @@ -219,25 +240,25 @@ export class Loader implements Loadable { done: TransactionMeta, opts: CommitOpts = { noLoader: false, compact: false } ): Promise { - return this.commitQueue.enqueue(() => this._commitInternal(t, done, opts)) + return this.commitQueue.enqueue(() => this._commitInternal(t, done, opts)); } async cacheTransaction(t: CarTransaction) { for await (const block of t.entries()) { - const sBlock = block.cid.toString() + const sBlock = block.cid.toString(); if (!this.getBlockCache.has(sBlock)) { - this.getBlockCache.set(sBlock, block) + this.getBlockCache.set(sBlock, block); } } } async cacheCarReader(carCidStr: string, reader: CarReader) { - if (this.processedCars.has(carCidStr)) return - this.processedCars.add(carCidStr) + if (this.processedCars.has(carCidStr)) return; + this.processedCars.add(carCidStr); for await (const block of reader.blocks()) { - const sBlock = block.cid.toString() + const sBlock = block.cid.toString(); if (!this.getBlockCache.has(sBlock)) { - this.getBlockCache.set(sBlock, block) + this.getBlockCache.set(sBlock, block); } } } @@ -247,77 +268,126 @@ export class Loader implements Loadable { done: TransactionMeta, opts: CommitOpts = { noLoader: false, compact: false } ): Promise { - await this.ready - const fp = this.makeCarHeader(done, this.carLog, !!opts.compact) as CarHeader - let roots: AnyLink[] = await this.prepareRoots(fp, t) + await this.ready; + const fp = this.makeCarHeader( + done, + this.carLog, + !!opts.compact + ) as CarHeader; + let roots: AnyLink[] = await this.prepareRoots(fp, t); //We need to split the data inside the prepareCarFile? //While splitting every CAR file should have a copy of root[0] meaning it should have a copy of fp // Maximum size of each CAR file as 1mb? - const cars = await this.prepareCarFiles(roots[0], t, !!opts.public) - const cids:AnyLink[]=[] - for(const car of cars) - { - const {cid,bytes}=car - await this.carStore!.save({ cid, bytes }) - await this.cacheTransaction(t) - const newDbMeta = { cars: [cid], key: this.key || null } as DbMeta - await this.remoteWAL!.enqueue(newDbMeta, opts) - await this.metaStore!.save(newDbMeta) - await this.updateCarLog([cid], fp, !!opts.compact) - cids.push(cid) + const cars = await this.prepareCarFiles(roots, t, !!opts.public); + const cids: AnyLink[] = []; + for (const car of cars) { + const { cid, bytes } = car; + await this.carStore!.save({ cid, bytes }); + await this.cacheTransaction(t); + const newDbMeta = { cars: [cid], key: this.key || null } as DbMeta; + await this.remoteWAL!.enqueue(newDbMeta, opts); + await this.metaStore!.save(newDbMeta); + await this.updateCarLog([cid], fp, !!opts.compact); + cids.push(cid); } - - return cids + + return cids; } async prepareRoots(fp: CarHeader, t: CarTransaction): Promise { - const header = await encodeCarHeader(fp) - await t.put(header.cid, header.bytes) + const header = await encodeCarHeader(fp); + await t.put(header.cid, header.bytes); // const got = await t.get(header.cid) // if (!got) throw new Error('missing header block: ' + header.cid.toString()) - return [header.cid] + return [header.cid]; } async prepareCarFiles( - root: AnyLink, + roots: AnyLink[], t: CarTransaction, isPublic: boolean ): Promise<{ cid: AnyLink; bytes: Uint8Array }[]> { - const theKey = isPublic ? null : await this._getKey() - const carFiles= theKey && this.ebOpts.crypto - ? await encryptedEncodeCarFile(this.ebOpts.crypto, theKey, root, t) - : await encodeCarFiles([root], t) - return carFiles + const theKey = isPublic ? null : await this._getKey(); + const carFiles: { cid: AnyLink; bytes: Uint8Array }[] = []; + let size = 0; + let threshold = 1024 * 1024; + let count = 0; + // @ts-ignore -- TODO: TypeScript does not like this casting + const headerSize = CBW.headerLength({ roots } as { + roots: CID[]; + }); + size += headerSize; + for (const { cid, bytes } of t.entries()) { + // @ts-ignore -- TODO: TypeScript does not like this casting + size += CBW.blockLength({ cid, bytes } as Block< + unknown, + number, + number, + 1 + >); + count++; + } + + let splitcount = Math.ceil(size / threshold); + let splitpercount = Math.floor(count / splitcount); + let newcount = 0; + let clonedt = new CarTransaction(t.parent); + for (const { cid, bytes } of t.entries()) { + newcount++; + clonedt.putSync(cid, bytes); + if (newcount > splitpercount) { + const carFile = + theKey && this.ebOpts.crypto + ? await encryptedEncodeCarFile( + this.ebOpts.crypto, + theKey, + roots[0], + clonedt + ) + : await encodeCarFile([roots[0]], clonedt); + carFiles.push(carFile); + clonedt=new CarTransaction(t.parent) + } + } + return carFiles; } protected makeFileCarHeader(result: TransactionMeta): TransactionMeta { - const files: AnyLink[] = [] + const files: AnyLink[] = []; for (const [, meta] of Object.entries(result.files!)) { - if (meta && typeof meta === 'object' && 'cid' in meta && meta !== null) { - files.push(meta.cid as AnyLink) + if (meta && typeof meta === "object" && "cid" in meta && meta !== null) { + files.push(meta.cid as AnyLink); } } - return { files } + return { files }; } - async updateCarLog(cids: CarGroup, fp: CarHeader, compact: boolean): Promise { + async updateCarLog( + cids: CarGroup, + fp: CarHeader, + compact: boolean + ): Promise { if (compact) { - const previousCompactCid = fp.compact[fp.compact.length - 1] - fp.compact.map(c => c.toString()).forEach(this.seenCompacted.add, this.seenCompacted) - this.carLog = [...uniqueCids([...this.carLog, ...fp.cars, cids], this.seenCompacted)] - void this.removeCidsForCompact(previousCompactCid[0]) + const previousCompactCid = fp.compact[fp.compact.length - 1]; + fp.compact + .map((c) => c.toString()) + .forEach(this.seenCompacted.add, this.seenCompacted); + this.carLog = [ + ...uniqueCids([...this.carLog, ...fp.cars, cids], this.seenCompacted), + ]; + void this.removeCidsForCompact(previousCompactCid[0]); } else { - this.carLog.unshift(cids) + this.carLog.unshift(cids); } } async removeCidsForCompact(cid: AnyLink) { const carHeader = await this.loadCarHeaderFromMeta({ - cars: [cid] - } as unknown as DbMeta) + cars: [cid], + } as unknown as DbMeta); for (const cids of carHeader.compact) { for (const cid of cids) { - await this.carStore!.remove(cid) + await this.carStore!.remove(cid); } } } @@ -332,23 +402,23 @@ export class Loader implements Loadable { // } async *entries(cache = true): AsyncIterableIterator { - await this.ready + await this.ready; if (cache) { for (const [, block] of this.getBlockCache) { - yield block + yield block; } } else { for (const [, block] of this.getBlockCache) { - yield block + yield block; } for (const cids of this.carLog) { for (const cid of cids) { - const reader = await this.loadCar(cid) - if (!reader) throw new Error(`missing car reader ${cid.toString()}`) + const reader = await this.loadCar(cid); + if (!reader) throw new Error(`missing car reader ${cid.toString()}`); for await (const block of reader.blocks()) { - const sCid = block.cid.toString() + const sCid = block.cid.toString(); if (!this.getBlockCache.has(sCid)) { - yield block + yield block; } } } @@ -357,85 +427,85 @@ export class Loader implements Loadable { } async getBlock(cid: AnyLink): Promise { - await this.ready - const sCid = cid.toString() - if (this.getBlockCache.has(sCid)) return this.getBlockCache.get(sCid) + await this.ready; + const sCid = cid.toString(); + if (this.getBlockCache.has(sCid)) return this.getBlockCache.get(sCid); const getCarCid = async (carCid: AnyLink) => { - const reader = await this.loadCar(carCid) + const reader = await this.loadCar(carCid); if (!reader) { - throw new Error(`missing car reader ${carCid.toString()}`) + throw new Error(`missing car reader ${carCid.toString()}`); } - await this.cacheCarReader(carCid.toString(), reader).catch(e => {}) - if (this.getBlockCache.has(sCid)) return this.getBlockCache.get(sCid) - throw new Error(`block not in reader: ${cid.toString()}`) - } + await this.cacheCarReader(carCid.toString(), reader).catch((e) => {}); + if (this.getBlockCache.has(sCid)) return this.getBlockCache.get(sCid); + throw new Error(`block not in reader: ${cid.toString()}`); + }; const getCompactCarCids = async (carCid: AnyLink) => { - const reader = await this.loadCar(carCid) + const reader = await this.loadCar(carCid); if (!reader) { - throw new Error(`missing car reader ${carCid.toString()}`) + throw new Error(`missing car reader ${carCid.toString()}`); } - const header = await parseCarFile(reader) + const header = await parseCarFile(reader); - const compacts = header.compact + const compacts = header.compact; - let got: AnyBlock | undefined - const batchSize = 5 + let got: AnyBlock | undefined; + const batchSize = 5; for (let i = 0; i < compacts.length; i += batchSize) { - const promises: Promise[] = [] + const promises: Promise[] = []; for (let j = i; j < Math.min(i + batchSize, compacts.length); j++) { for (const cid of compacts[j]) { - promises.push(getCarCid(cid)) + promises.push(getCarCid(cid)); } } try { - got = await Promise.any(promises) + got = await Promise.any(promises); } catch { // Ignore the error and continue with the next iteration } - if (got) break + if (got) break; } - if (this.getBlockCache.has(sCid)) return this.getBlockCache.get(sCid) - throw new Error(`block not in compact reader: ${cid.toString()}`) - } + if (this.getBlockCache.has(sCid)) return this.getBlockCache.get(sCid); + throw new Error(`block not in compact reader: ${cid.toString()}`); + }; - let got - const batchSize = 5 + let got; + const batchSize = 5; for (let i = 0; i < this.carLog.length; i += batchSize) { - const promises: Promise[] = [] + const promises: Promise[] = []; for (let j = i; j < Math.min(i + batchSize, this.carLog.length); j++) { for (const cid of this.carLog[j]) { - promises.push(getCarCid(cid)) + promises.push(getCarCid(cid)); } } try { - got = await Promise.any(promises) + got = await Promise.any(promises); } catch { // Ignore the error and continue with the next iteration } - if (got) break + if (got) break; } if (!got) { for (let i = 0; i < this.carLog.length; i += batchSize) { - const promises: Promise[] = [] + const promises: Promise[] = []; for (let j = i; j < Math.min(i + batchSize, this.carLog.length); j++) { for (const cid of this.carLog[j]) { - promises.push(getCarCid(cid)) + promises.push(getCarCid(cid)); } } try { - got = await Promise.any(promises) + got = await Promise.any(promises); } catch { // Ignore the error and continue with the next iteration } - if (got) break + if (got) break; } } - return got + return got; } protected makeCarHeader( @@ -443,14 +513,20 @@ export class Loader implements Loadable { cars: CarLog, compact: boolean = false ): CarHeader { - const coreHeader = compact ? { cars: [], compact: cars } : { cars, compact: [] } - return { ...coreHeader, meta: result } + const coreHeader = compact + ? { cars: [], compact: cars } + : { cars, compact: [] }; + return { ...coreHeader, meta: result }; } protected async loadCar(cid: AnyLink): Promise { - if (!this.carStore) throw new Error('car store not initialized') - const loaded = await this.storesLoadCar(cid, this.carStore, this.remoteCarStore) - return loaded + if (!this.carStore) throw new Error("car store not initialized"); + const loaded = await this.storesLoadCar( + cid, + this.carStore, + this.remoteCarStore + ); + return loaded; } //What if instead it returns an Array of CarHeader @@ -460,121 +536,128 @@ export class Loader implements Loadable { remote?: AbstractDataStore, publicFiles?: boolean ): Promise { - const cidsString = cid.toString() + const cidsString = cid.toString(); if (!this.carReaders.has(cidsString)) { this.carReaders.set( cidsString, (async () => { - let loadedCar: AnyBlock | null = null + let loadedCar: AnyBlock | null = null; try { //loadedCar now is an array of AnyBlocks - loadedCar = await local.load(cid) + loadedCar = await local.load(cid); } catch (e) { if (remote) { - const remoteCar = await remote.load(cid) + const remoteCar = await remote.load(cid); if (remoteCar) { // todo test for this - await local.save(remoteCar) - loadedCar = remoteCar + await local.save(remoteCar); + loadedCar = remoteCar; } } } - if (!loadedCar) throw new Error(`missing car files ${cidsString}`) + if (!loadedCar) throw new Error(`missing car files ${cidsString}`); //This needs a fix as well as the fromBytes function expects a Uint8Array //Either we can merge the bytes or return an array of rawReaders - const rawReader = await CarReader.fromBytes(loadedCar.bytes) + const rawReader = await CarReader.fromBytes(loadedCar.bytes); const readerP = publicFiles ? Promise.resolve(rawReader) - : this.ensureDecryptedReader(rawReader) - this.carReaders.set(cidsString, readerP) - return readerP - })().catch(e => { - this.carReaders.delete(cidsString) - throw e + : this.ensureDecryptedReader(rawReader); + this.carReaders.set(cidsString, readerP); + return readerP; + })().catch((e) => { + this.carReaders.delete(cidsString); + throw e; }) - ) + ); } - return this.carReaders.get(cidsString) as Promise + return this.carReaders.get(cidsString) as Promise; } protected async ensureDecryptedReader(reader: CarReader): Promise { - const theKey = await this._getKey() - if (this.ebOpts.public || !(theKey && this.ebOpts.crypto)) return reader - const { blocks, root } = await decodeEncryptedCar(this.ebOpts.crypto, theKey, reader) + const theKey = await this._getKey(); + if (this.ebOpts.public || !(theKey && this.ebOpts.crypto)) return reader; + const { blocks, root } = await decodeEncryptedCar( + this.ebOpts.crypto, + theKey, + reader + ); return { getRoots: () => [root], get: blocks.get.bind(blocks), - blocks: blocks.entries.bind(blocks) - } as unknown as CarReader + blocks: blocks.entries.bind(blocks), + } as unknown as CarReader; } protected async setKey(key: string) { - if (this.key && this.key !== key) throw new Error('key mismatch') - this.key = key - const crypto = getCrypto() - if (!crypto) throw new Error('missing crypto module') - const subtle = crypto.subtle - const encoder = new TextEncoder() - const data = encoder.encode(key) - const hashBuffer = await subtle.digest('SHA-256', data) - const hashArray = Array.from(new Uint8Array(hashBuffer)) - this.keyId = hashArray.map(b => b.toString(16).padStart(2, '0')).join('') + if (this.key && this.key !== key) throw new Error("key mismatch"); + this.key = key; + const crypto = getCrypto(); + if (!crypto) throw new Error("missing crypto module"); + const subtle = crypto.subtle; + const encoder = new TextEncoder(); + const data = encoder.encode(key); + const hashBuffer = await subtle.digest("SHA-256", data); + const hashArray = Array.from(new Uint8Array(hashBuffer)); + this.keyId = hashArray.map((b) => b.toString(16).padStart(2, "0")).join(""); } protected async getMoreReaders(cids: AnyLink[]) { - const limit = pLimit(5) - const missing = cids.filter(cid => !this.carReaders.has(cid.toString())) - await Promise.all(missing.map(cid => limit(() => this.loadCar(cid)))) + const limit = pLimit(5); + const missing = cids.filter((cid) => !this.carReaders.has(cid.toString())); + await Promise.all(missing.map((cid) => limit(() => this.loadCar(cid)))); } async _setWaitForWrite(_writingFn: () => Promise): Promise { - const wr = this.writing + const wr = this.writing; this.writing = wr.then(async () => { - await _writingFn() - return wr - }) - return this.writing.then(() => {}) + await _writingFn(); + return wr; + }); + return this.writing.then(() => {}); } } // duplicated in @fireproof/connect export type UploadMetaFnParams = { - name: string - branch: string -} + name: string; + branch: string; +}; export type UploadDataFnParams = { - type: 'data' | 'file' - name: string - car: string - size: string -} + type: "data" | "file"; + name: string; + car: string; + size: string; +}; -export type DownloadFnParamTypes = 'data' | 'file' +export type DownloadFnParamTypes = "data" | "file"; export type DownloadDataFnParams = { - type: DownloadFnParamTypes - name: string - car: string -} + type: DownloadFnParamTypes; + name: string; + car: string; +}; export type DownloadMetaFnParams = { - name: string - branch: string -} + name: string; + branch: string; +}; export interface Connection { - loader: Loader - loaded: Promise - connectMeta({ loader }: { loader: Loader }): void - connectStorage({ loader }: { loader: Loader }): void + loader: Loader; + loaded: Promise; + connectMeta({ loader }: { loader: Loader }): void; + connectStorage({ loader }: { loader: Loader }): void; - metaUpload(bytes: Uint8Array, params: UploadMetaFnParams): Promise + metaUpload( + bytes: Uint8Array, + params: UploadMetaFnParams + ): Promise; dataUpload( bytes: Uint8Array, params: UploadDataFnParams, opts?: { public?: boolean } - ): Promise - metaDownload(params: DownloadMetaFnParams): Promise - dataDownload(params: DownloadDataFnParams): Promise + ): Promise; + metaDownload(params: DownloadMetaFnParams): Promise; + dataDownload(params: DownloadDataFnParams): Promise; }