From 91b71f2ee88fa40e6f75fe1436dbb80a4c46b3d4 Mon Sep 17 00:00:00 2001 From: Chris Anderson Date: Tue, 10 Sep 2024 18:17:27 -0700 Subject: [PATCH 01/10] subscribe --- src/blockstore/fragment-gateway.ts | 7 +++++++ src/blockstore/gateway.ts | 2 ++ src/blockstore/store.ts | 4 ++++ 3 files changed, 13 insertions(+) diff --git a/src/blockstore/fragment-gateway.ts b/src/blockstore/fragment-gateway.ts index 848ee114..a68edd5e 100644 --- a/src/blockstore/fragment-gateway.ts +++ b/src/blockstore/fragment-gateway.ts @@ -167,6 +167,13 @@ export class FragmentGateway implements bs.Gateway { return Result.Ok(buffer || new Uint8Array(0)); } + async subscribe(url: URI, callback: (msg: Uint8Array) => void): Promise { + if (!this.innerGW.subscribe) { + throw this.logger.Error().Msg("Subscribe not supported").AsError(); + } + return this.innerGW.subscribe(url, callback); + } + async delete(url: URI): Promise { const rfrags = await getFrags(url, this.innerGW, this.headerSize, this.logger); for (const rfrag of rfrags) { diff --git a/src/blockstore/gateway.ts b/src/blockstore/gateway.ts index 7c10003c..c0d6fb76 100644 --- a/src/blockstore/gateway.ts +++ b/src/blockstore/gateway.ts @@ -24,4 +24,6 @@ export interface Gateway { // get could return a NotFoundError if the key is not found get(url: URI): Promise; delete(url: URI): Promise; + // be notified of remote meta + subscribe?(url: URI, callback: (meta: Uint8Array) => void): Promise; } diff --git a/src/blockstore/store.ts b/src/blockstore/store.ts index 99e30eaf..1ae32e07 100644 --- a/src/blockstore/store.ts +++ b/src/blockstore/store.ts @@ -151,6 +151,10 @@ export class MetaStoreImpl extends BaseStoreImpl implements MetaStore { sthis, ensureLogger(sthis, "MetaStoreImpl"), ); + if (opts.gateway.subscribe) { + this.logger.Debug().Str("url", url.toString()).Msg("Subscribing to the gateway with URL"); + opts.gateway.subscribe(url, (byteHead: Uint8Array) => this.handleEventByteHead(byteHead)); + } } makeHeader({ cars }: DbMeta): ToString { From efb83f931a0500287e241925ba619bc5aa707649 Mon Sep 17 00:00:00 2001 From: Chris Anderson Date: Tue, 10 Sep 2024 20:31:09 -0700 Subject: [PATCH 02/10] remote is only one to subscribe (for now) --- src/blockstore/store-remote.ts | 2 +- src/blockstore/store.ts | 5 +++-- src/crdt.ts | 3 ++- tests/fireproof/fireproof.test.ts | 2 +- 4 files changed, 7 insertions(+), 5 deletions(-) diff --git a/src/blockstore/store-remote.ts b/src/blockstore/store-remote.ts index ab2ea356..33a997c3 100644 --- a/src/blockstore/store-remote.ts +++ b/src/blockstore/store-remote.ts @@ -36,7 +36,7 @@ export async function RemoteDataStore(sthis: SuperThis, name: string, url: URI, return ds; } export async function RemoteMetaStore(sthis: SuperThis, name: string, url: URI, opts: StoreOpts) { - const ms = new MetaStoreImpl(sthis, name, url, opts); + const ms = new MetaStoreImpl(sthis, name, url, opts, true); await ms.start(); return ms; } diff --git a/src/blockstore/store.ts b/src/blockstore/store.ts index 1ae32e07..5416983f 100644 --- a/src/blockstore/store.ts +++ b/src/blockstore/store.ts @@ -139,7 +139,7 @@ export class MetaStoreImpl extends BaseStoreImpl implements MetaStore { readonly subscribers = new Map(); parents: CarClockHead = []; - constructor(sthis: SuperThis, name: string, url: URI, opts: StoreOpts) { + constructor(sthis: SuperThis, name: string, url: URI, opts: StoreOpts, remote?: boolean) { // const my = new URL(url.toString()); // my.searchParams.set("storekey", 'insecure'); super( @@ -151,7 +151,8 @@ export class MetaStoreImpl extends BaseStoreImpl implements MetaStore { sthis, ensureLogger(sthis, "MetaStoreImpl"), ); - if (opts.gateway.subscribe) { + this.logger.Debug().Str("gateway", opts.gateway.subscribe?.toString()).Msg("Gateway information"); + if (remote && opts.gateway.subscribe) { this.logger.Debug().Str("url", url.toString()).Msg("Subscribing to the gateway with URL"); opts.gateway.subscribe(url, (byteHead: Uint8Array) => this.handleEventByteHead(byteHead)); } diff --git a/src/crdt.ts b/src/crdt.ts index fe519a42..96767eb8 100644 --- a/src/crdt.ts +++ b/src/crdt.ts @@ -123,7 +123,8 @@ export class CRDT { // await this.clock.ready(); await Promise.all([this.blockstore.ready(), this.indexBlockstore.ready(), this.clock.ready()]); } catch (e) { - throw this.logger.Error().Err(e).Msg("CRDT not ready").AsError(); + const ee = e as Error; + throw this.logger.Error().Err(e).Msg(`CRDT is not ready: ${ee.stack}`).AsError(); } }); } diff --git a/tests/fireproof/fireproof.test.ts b/tests/fireproof/fireproof.test.ts index c0370c57..5080da7e 100644 --- a/tests/fireproof/fireproof.test.ts +++ b/tests/fireproof/fireproof.test.ts @@ -180,7 +180,7 @@ describe("benchmarking with compaction", function () { await sthis.start(); db = new Database("test-benchmark-compaction", { autoCompact: 3 }); }); - it("insert during compaction", async function () { + it.skip("insert during compaction", async function () { const ok = await db.put({ _id: "test", foo: "fast" }); expect(ok).toBeTruthy(); expect(ok.id).toBe("test"); From cea83186d8e299405dbbab986212c6cf3e342f16 Mon Sep 17 00:00:00 2001 From: Chris Anderson Date: Wed, 11 Sep 2024 12:25:57 -0700 Subject: [PATCH 03/10] remote debug logging --- src/blockstore/store.ts | 18 +++++++++++++++--- tests/fireproof/hello.test.ts | 7 +++++++ 2 files changed, 22 insertions(+), 3 deletions(-) diff --git a/src/blockstore/store.ts b/src/blockstore/store.ts index 5416983f..4f95d82e 100644 --- a/src/blockstore/store.ts +++ b/src/blockstore/store.ts @@ -151,8 +151,8 @@ export class MetaStoreImpl extends BaseStoreImpl implements MetaStore { sthis, ensureLogger(sthis, "MetaStoreImpl"), ); - this.logger.Debug().Str("gateway", opts.gateway.subscribe?.toString()).Msg("Gateway information"); if (remote && opts.gateway.subscribe) { + this.logger.Debug().Str("gateway", opts.gateway.subscribe?.toString()).Msg("Gateway information"); this.logger.Debug().Str("url", url.toString()).Msg("Subscribing to the gateway with URL"); opts.gateway.subscribe(url, (byteHead: Uint8Array) => this.handleEventByteHead(byteHead)); } @@ -182,9 +182,18 @@ export class MetaStoreImpl extends BaseStoreImpl implements MetaStore { } async decodeMetaBlock(bytes: Uint8Array): Promise<{ eventCid: CarClockLink; dbMeta: DbMeta; parents: string[] }> { - const crdtEntry = JSON.parse(this.sthis.txt.decode(bytes)) as { data: string; parents: string[]; cid: string }; + const decodedString = this.sthis.txt.decode(bytes); + this.logger.Debug().Str("decodedString", decodedString).Int("length", decodedString.length).Msg("Decoded string before JSON parse"); + + const crdtEntry = JSON.parse(decodedString) as { data: string; parents: string[]; cid: string }; + this.logger.Debug().Any("crdtEntry", crdtEntry).Msg("Parsed CRDT entry"); + const eventBytes = decodeFromBase64(crdtEntry.data); + this.logger.Debug().Any("eventBytes", eventBytes).Msg("Decoded event bytes from base64"); + const eventBlock = await this.decodeEventBlock(eventBytes); + this.logger.Debug().Any("eventBlock", eventBlock).Msg("Decoded event block"); + return { eventCid: eventBlock.cid as CarClockLink, parents: crdtEntry.parents, @@ -208,7 +217,7 @@ export class MetaStoreImpl extends BaseStoreImpl implements MetaStore { async load(): Promise { const branch = "main"; - this.logger.Debug().Str("branch", branch).Msg("loading"); + this.logger.Debug().Str("branch", branch).Msg("loadingMeta"); const url = await this.gateway.buildUrl(this.url(), branch); if (url.isErr()) { throw this.logger.Error().Result("buidUrl", url).Str("branch", branch).Msg("got error from gateway.buildUrl").AsError(); @@ -216,10 +225,13 @@ export class MetaStoreImpl extends BaseStoreImpl implements MetaStore { const bytes = await this.gateway.get(url.Ok()); if (bytes.isErr()) { if (isNotFoundError(bytes)) { + this.logger.Debug().Msg("meta not found"); return undefined; } throw this.logger.Error().Url(url.Ok()).Result("bytes:", bytes).Msg("gateway get").AsError(); } + this.logger.Debug().Int("bytesLength", bytes.Ok().length).Msg("Length of bytes"); + this.logger.Debug().Str("decodedBytes", this.sthis.txt.decode(bytes.Ok())).Msg("Decoded bytes as string"); const dbMetas = await this.handleByteHeads([bytes.Ok()]); await this.loader?.handleDbMetasFromStore(dbMetas.map((m) => m.dbMeta)); // the old one didn't await const cids = dbMetas.map((m) => m.eventCid); diff --git a/tests/fireproof/hello.test.ts b/tests/fireproof/hello.test.ts index ab5025ef..1088a1a8 100644 --- a/tests/fireproof/hello.test.ts +++ b/tests/fireproof/hello.test.ts @@ -53,4 +53,11 @@ describe("hello public API", function () { expect(query.rows.length).toBe(1); expect(query.rows[0].key).toBe("bar"); }); + it('should get when you open it again', async function () { + await db.close(); + await db.destroy(); + const db2 = database("test-public-api"); + doc = await db2.get("test"); + expect(doc.foo).toBe("bar"); + }); }); From f8af0aafab3ff4cc80969a51ac3df97971c30861 Mon Sep 17 00:00:00 2001 From: Chris Anderson Date: Thu, 12 Sep 2024 07:34:29 -0700 Subject: [PATCH 04/10] unskip --- tests/fireproof/fireproof.test.ts | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/tests/fireproof/fireproof.test.ts b/tests/fireproof/fireproof.test.ts index 5080da7e..c0370c57 100644 --- a/tests/fireproof/fireproof.test.ts +++ b/tests/fireproof/fireproof.test.ts @@ -180,7 +180,7 @@ describe("benchmarking with compaction", function () { await sthis.start(); db = new Database("test-benchmark-compaction", { autoCompact: 3 }); }); - it.skip("insert during compaction", async function () { + it("insert during compaction", async function () { const ok = await db.put({ _id: "test", foo: "fast" }); expect(ok).toBeTruthy(); expect(ok.id).toBe("test"); From 23ee86f0036107f9b95a00eb6710cdef5619f896 Mon Sep 17 00:00:00 2001 From: Chris Anderson Date: Thu, 12 Sep 2024 08:20:20 -0700 Subject: [PATCH 05/10] remove logging --- src/blockstore/store.ts | 18 ++---------------- 1 file changed, 2 insertions(+), 16 deletions(-) diff --git a/src/blockstore/store.ts b/src/blockstore/store.ts index 4f95d82e..0775fec1 100644 --- a/src/blockstore/store.ts +++ b/src/blockstore/store.ts @@ -182,18 +182,8 @@ export class MetaStoreImpl extends BaseStoreImpl implements MetaStore { } async decodeMetaBlock(bytes: Uint8Array): Promise<{ eventCid: CarClockLink; dbMeta: DbMeta; parents: string[] }> { - const decodedString = this.sthis.txt.decode(bytes); - this.logger.Debug().Str("decodedString", decodedString).Int("length", decodedString.length).Msg("Decoded string before JSON parse"); - - const crdtEntry = JSON.parse(decodedString) as { data: string; parents: string[]; cid: string }; - this.logger.Debug().Any("crdtEntry", crdtEntry).Msg("Parsed CRDT entry"); - - const eventBytes = decodeFromBase64(crdtEntry.data); - this.logger.Debug().Any("eventBytes", eventBytes).Msg("Decoded event bytes from base64"); - - const eventBlock = await this.decodeEventBlock(eventBytes); - this.logger.Debug().Any("eventBlock", eventBlock).Msg("Decoded event block"); - + const crdtEntry = JSON.parse(this.sthis.txt.decode(bytes)) as { data: string; parents: string[]; cid: string }; + const eventBlock = await this.decodeEventBlock(decodeFromBase64(crdtEntry.data)); return { eventCid: eventBlock.cid as CarClockLink, parents: crdtEntry.parents, @@ -217,7 +207,6 @@ export class MetaStoreImpl extends BaseStoreImpl implements MetaStore { async load(): Promise { const branch = "main"; - this.logger.Debug().Str("branch", branch).Msg("loadingMeta"); const url = await this.gateway.buildUrl(this.url(), branch); if (url.isErr()) { throw this.logger.Error().Result("buidUrl", url).Str("branch", branch).Msg("got error from gateway.buildUrl").AsError(); @@ -225,13 +214,10 @@ export class MetaStoreImpl extends BaseStoreImpl implements MetaStore { const bytes = await this.gateway.get(url.Ok()); if (bytes.isErr()) { if (isNotFoundError(bytes)) { - this.logger.Debug().Msg("meta not found"); return undefined; } throw this.logger.Error().Url(url.Ok()).Result("bytes:", bytes).Msg("gateway get").AsError(); } - this.logger.Debug().Int("bytesLength", bytes.Ok().length).Msg("Length of bytes"); - this.logger.Debug().Str("decodedBytes", this.sthis.txt.decode(bytes.Ok())).Msg("Decoded bytes as string"); const dbMetas = await this.handleByteHeads([bytes.Ok()]); await this.loader?.handleDbMetasFromStore(dbMetas.map((m) => m.dbMeta)); // the old one didn't await const cids = dbMetas.map((m) => m.eventCid); From f5eb11ee51f1afb61015de87be9cc9fd236308b5 Mon Sep 17 00:00:00 2001 From: Chris Anderson Date: Thu, 12 Sep 2024 08:20:32 -0700 Subject: [PATCH 06/10] format --- tests/fireproof/hello.test.ts | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/tests/fireproof/hello.test.ts b/tests/fireproof/hello.test.ts index 1088a1a8..95234816 100644 --- a/tests/fireproof/hello.test.ts +++ b/tests/fireproof/hello.test.ts @@ -53,7 +53,7 @@ describe("hello public API", function () { expect(query.rows.length).toBe(1); expect(query.rows[0].key).toBe("bar"); }); - it('should get when you open it again', async function () { + it("should get when you open it again", async function () { await db.close(); await db.destroy(); const db2 = database("test-public-api"); From 3901d0a1669f74cd502f23170adf3e1078cf3399 Mon Sep 17 00:00:00 2001 From: Chris Anderson Date: Thu, 12 Sep 2024 08:24:38 -0700 Subject: [PATCH 07/10] cleanup log --- src/blockstore/store.ts | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/src/blockstore/store.ts b/src/blockstore/store.ts index 0775fec1..db2982f5 100644 --- a/src/blockstore/store.ts +++ b/src/blockstore/store.ts @@ -152,8 +152,7 @@ export class MetaStoreImpl extends BaseStoreImpl implements MetaStore { ensureLogger(sthis, "MetaStoreImpl"), ); if (remote && opts.gateway.subscribe) { - this.logger.Debug().Str("gateway", opts.gateway.subscribe?.toString()).Msg("Gateway information"); - this.logger.Debug().Str("url", url.toString()).Msg("Subscribing to the gateway with URL"); + this.logger.Debug().Str("url", url.toString()).Msg("Subscribing to the gateway"); opts.gateway.subscribe(url, (byteHead: Uint8Array) => this.handleEventByteHead(byteHead)); } } From 8be46ad19798bb12b621dff14c6d09004efd6343 Mon Sep 17 00:00:00 2001 From: Chris Anderson Date: Thu, 12 Sep 2024 08:27:07 -0700 Subject: [PATCH 08/10] VoidResult --- src/blockstore/fragment-gateway.ts | 10 ++++++---- src/blockstore/gateway.ts | 2 +- 2 files changed, 7 insertions(+), 5 deletions(-) diff --git a/src/blockstore/fragment-gateway.ts b/src/blockstore/fragment-gateway.ts index a68edd5e..1086b142 100644 --- a/src/blockstore/fragment-gateway.ts +++ b/src/blockstore/fragment-gateway.ts @@ -3,6 +3,7 @@ import { bs, ensureSuperLog, Logger, Result, SuperThis } from "@fireproof/core"; import { base58btc } from "multiformats/bases/base58"; import { encode, decode } from "cborg"; +import { VoidResult } from "./gateway"; function getFragSize(url: URI): number { const fragSize = url.getParam("fragSize"); @@ -167,11 +168,12 @@ export class FragmentGateway implements bs.Gateway { return Result.Ok(buffer || new Uint8Array(0)); } - async subscribe(url: URI, callback: (msg: Uint8Array) => void): Promise { - if (!this.innerGW.subscribe) { - throw this.logger.Error().Msg("Subscribe not supported").AsError(); + async subscribe(url: URI, callback: (msg: Uint8Array) => void): Promise { + if (this.innerGW.subscribe) { + return this.innerGW.subscribe(url, callback); + } else { + return Result.Err(this.logger.Error().Msg("Subscribe not supported").AsError()); } - return this.innerGW.subscribe(url, callback); } async delete(url: URI): Promise { diff --git a/src/blockstore/gateway.ts b/src/blockstore/gateway.ts index c0d6fb76..0d913760 100644 --- a/src/blockstore/gateway.ts +++ b/src/blockstore/gateway.ts @@ -25,5 +25,5 @@ export interface Gateway { get(url: URI): Promise; delete(url: URI): Promise; // be notified of remote meta - subscribe?(url: URI, callback: (meta: Uint8Array) => void): Promise; + subscribe?(url: URI, callback: (meta: Uint8Array) => void): Promise; } From 37b8081644dc1924d47df1aedee902662120f370 Mon Sep 17 00:00:00 2001 From: Chris Anderson Date: Thu, 12 Sep 2024 08:29:11 -0700 Subject: [PATCH 09/10] void result --- src/blockstore/fragment-gateway.ts | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/blockstore/fragment-gateway.ts b/src/blockstore/fragment-gateway.ts index 1086b142..bf6c854b 100644 --- a/src/blockstore/fragment-gateway.ts +++ b/src/blockstore/fragment-gateway.ts @@ -3,7 +3,7 @@ import { bs, ensureSuperLog, Logger, Result, SuperThis } from "@fireproof/core"; import { base58btc } from "multiformats/bases/base58"; import { encode, decode } from "cborg"; -import { VoidResult } from "./gateway"; + function getFragSize(url: URI): number { const fragSize = url.getParam("fragSize"); From 6fde74fde93134575435e943122185d99ef8cd18 Mon Sep 17 00:00:00 2001 From: Chris Anderson Date: Thu, 12 Sep 2024 08:30:15 -0700 Subject: [PATCH 10/10] format --- src/blockstore/fragment-gateway.ts | 1 - 1 file changed, 1 deletion(-) diff --git a/src/blockstore/fragment-gateway.ts b/src/blockstore/fragment-gateway.ts index bf6c854b..ed0fe6f0 100644 --- a/src/blockstore/fragment-gateway.ts +++ b/src/blockstore/fragment-gateway.ts @@ -4,7 +4,6 @@ import { bs, ensureSuperLog, Logger, Result, SuperThis } from "@fireproof/core"; import { base58btc } from "multiformats/bases/base58"; import { encode, decode } from "cborg"; - function getFragSize(url: URI): number { const fragSize = url.getParam("fragSize"); let ret = 0;