Skip to content

Commit

Permalink
Merge pull request #210 from fireproof-storage/gateway-subscribe
Browse files Browse the repository at this point in the history
subscribe
  • Loading branch information
jchris committed Sep 12, 2024
2 parents 3c74a81 + 6fde74f commit d40ca0b
Show file tree
Hide file tree
Showing 6 changed files with 26 additions and 6 deletions.
8 changes: 8 additions & 0 deletions src/blockstore/fragment-gateway.ts
Original file line number Diff line number Diff line change
Expand Up @@ -167,6 +167,14 @@ export class FragmentGateway implements bs.Gateway {
return Result.Ok(buffer || new Uint8Array(0));
}

async subscribe(url: URI, callback: (msg: Uint8Array) => void): Promise<bs.VoidResult> {
if (this.innerGW.subscribe) {
return this.innerGW.subscribe(url, callback);
} else {
return Result.Err(this.logger.Error().Msg("Subscribe not supported").AsError());
}
}

async delete(url: URI): Promise<bs.VoidResult> {
const rfrags = await getFrags(url, this.innerGW, this.headerSize, this.logger);
for (const rfrag of rfrags) {
Expand Down
2 changes: 2 additions & 0 deletions src/blockstore/gateway.ts
Original file line number Diff line number Diff line change
Expand Up @@ -24,4 +24,6 @@ export interface Gateway {
// get could return a NotFoundError if the key is not found
get(url: URI): Promise<GetResult>;
delete(url: URI): Promise<VoidResult>;
// be notified of remote meta
subscribe?(url: URI, callback: (meta: Uint8Array) => void): Promise<VoidResult>;
}
2 changes: 1 addition & 1 deletion src/blockstore/store-remote.ts
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ export async function RemoteDataStore(sthis: SuperThis, name: string, url: URI,
return ds;
}
export async function RemoteMetaStore(sthis: SuperThis, name: string, url: URI, opts: StoreOpts) {
const ms = new MetaStoreImpl(sthis, name, url, opts);
const ms = new MetaStoreImpl(sthis, name, url, opts, true);
await ms.start();
return ms;
}
Expand Down
10 changes: 6 additions & 4 deletions src/blockstore/store.ts
Original file line number Diff line number Diff line change
Expand Up @@ -139,7 +139,7 @@ export class MetaStoreImpl extends BaseStoreImpl implements MetaStore {
readonly subscribers = new Map<string, LoadHandler[]>();
parents: CarClockHead = [];

constructor(sthis: SuperThis, name: string, url: URI, opts: StoreOpts) {
constructor(sthis: SuperThis, name: string, url: URI, opts: StoreOpts, remote?: boolean) {
// const my = new URL(url.toString());
// my.searchParams.set("storekey", 'insecure');
super(
Expand All @@ -151,6 +151,10 @@ export class MetaStoreImpl extends BaseStoreImpl implements MetaStore {
sthis,
ensureLogger(sthis, "MetaStoreImpl"),
);
if (remote && opts.gateway.subscribe) {
this.logger.Debug().Str("url", url.toString()).Msg("Subscribing to the gateway");
opts.gateway.subscribe(url, (byteHead: Uint8Array) => this.handleEventByteHead(byteHead));
}
}

makeHeader({ cars }: DbMeta): ToString<DbMeta> {
Expand Down Expand Up @@ -178,8 +182,7 @@ export class MetaStoreImpl extends BaseStoreImpl implements MetaStore {

async decodeMetaBlock(bytes: Uint8Array): Promise<{ eventCid: CarClockLink; dbMeta: DbMeta; parents: string[] }> {
const crdtEntry = JSON.parse(this.sthis.txt.decode(bytes)) as { data: string; parents: string[]; cid: string };
const eventBytes = decodeFromBase64(crdtEntry.data);
const eventBlock = await this.decodeEventBlock(eventBytes);
const eventBlock = await this.decodeEventBlock(decodeFromBase64(crdtEntry.data));
return {
eventCid: eventBlock.cid as CarClockLink,
parents: crdtEntry.parents,
Expand All @@ -203,7 +206,6 @@ export class MetaStoreImpl extends BaseStoreImpl implements MetaStore {

async load(): Promise<DbMeta[] | Falsy> {
const branch = "main";
this.logger.Debug().Str("branch", branch).Msg("loading");
const url = await this.gateway.buildUrl(this.url(), branch);
if (url.isErr()) {
throw this.logger.Error().Result("buidUrl", url).Str("branch", branch).Msg("got error from gateway.buildUrl").AsError();
Expand Down
3 changes: 2 additions & 1 deletion src/crdt.ts
Original file line number Diff line number Diff line change
Expand Up @@ -123,7 +123,8 @@ export class CRDT<T extends DocTypes> {
// await this.clock.ready();
await Promise.all([this.blockstore.ready(), this.indexBlockstore.ready(), this.clock.ready()]);
} catch (e) {
throw this.logger.Error().Err(e).Msg("CRDT not ready").AsError();
const ee = e as Error;
throw this.logger.Error().Err(e).Msg(`CRDT is not ready: ${ee.stack}`).AsError();
}
});
}
Expand Down
7 changes: 7 additions & 0 deletions tests/fireproof/hello.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -53,4 +53,11 @@ describe("hello public API", function () {
expect(query.rows.length).toBe(1);
expect(query.rows[0].key).toBe("bar");
});
it("should get when you open it again", async function () {
await db.close();
await db.destroy();
const db2 = database("test-public-api");
doc = await db2.get("test");
expect(doc.foo).toBe("bar");
});
});

0 comments on commit d40ca0b

Please sign in to comment.