Skip to content

Commit

Permalink
fix(datastore): only store cached replays
Browse files Browse the repository at this point in the history
  • Loading branch information
blakebyrnes committed Nov 16, 2024
1 parent 17c50c3 commit 56a00b0
Show file tree
Hide file tree
Showing 19 changed files with 190 additions and 28 deletions.
14 changes: 14 additions & 0 deletions datastore/core/lib/QueryRunner.ts
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ import IDatastoreApiContext from '../interfaces/IDatastoreApiContext';
import { IDatastoreManifestWithRuntime } from './DatastoreRegistry';
import { validateAuthentication, validateFunctionCoreVersions } from './datastoreUtils';
import PaymentsProcessor from './PaymentsProcessor';
import { ICacheUpdates } from '@ulixee/datastore/interfaces/IExtractorPluginCore';

export default class QueryRunner {
public startTime = Date.now();
Expand Down Expand Up @@ -150,6 +151,11 @@ export default class QueryRunner {
}
}

const cacheOutputs: ICacheUpdates = {};
options.onCacheUpdated = (sessionId, crawler, action) => {
cacheOutputs[sessionId] = { crawler, action };
};

try {
outputs = await this.context.workTracker.trackRun(run(options));
// release the hold
Expand All @@ -163,6 +169,14 @@ export default class QueryRunner {
runError = error;
}

for (const plugin of Object.values(pluginCoresByName)) {
if (plugin.afterRunExtractor)
await plugin.afterRunExtractor(options, outputs, cacheOutputs, {
scriptEntrypoint: this.datastoreManifest.runtimePath,
functionName: name,
});
}

await this.context.statsTracker.recordEntityStats({
version: options.version,
datastoreId: options.id,
Expand Down
2 changes: 1 addition & 1 deletion datastore/main/interfaces/ICrawlerOutputSchema.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
import { string } from '@ulixee/schema';
import { boolean, object, string } from '@ulixee/schema';

Check warning on line 1 in datastore/main/interfaces/ICrawlerOutputSchema.ts

View workflow job for this annotation

GitHub Actions / Test Node.js 18 on ubuntu-latest

'boolean' is defined but never used. Allowed unused vars must match /^_/u

Check warning on line 1 in datastore/main/interfaces/ICrawlerOutputSchema.ts

View workflow job for this annotation

GitHub Actions / Test Node.js 18 on ubuntu-latest

'object' is defined but never used. Allowed unused vars must match /^_/u

export const CrawlerOutputSchema = {
crawler: string({ description: 'The type of crawler output that has been produced.' }),
Expand Down
5 changes: 5 additions & 0 deletions datastore/main/interfaces/IExtractorContext.ts
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,11 @@ export default interface IExtractorContext<TSchema extends IExtractorSchema> {
readonly onQueryResult?: (result: IDatastoreQueryResult) => Promise<any> | void;
readonly queryId: string;
readonly authentication: IDatastoreApiTypes['Datastore.query']['args']['authentication'];
readonly onCacheUpdated?: (
sessionId: string,
crawler: string,
action: 'cached' | 'evicted',
) => Promise<void> | void;

readonly payment: IPayment;
crawl<T extends Crawler>(crawler: T, options?: T['runArgsType']): Promise<ICrawlerOutputSchema>;
Expand Down
12 changes: 12 additions & 0 deletions datastore/main/interfaces/IExtractorPluginCore.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,9 @@ import type IExtractorRunOptions from '@ulixee/datastore/interfaces/IExtractorRu
import type IExtractorSchema from '@ulixee/datastore/interfaces/IExtractorSchema';
import { ConnectionToClient, ConnectionToCore } from '@ulixee/net';

export interface ICacheUpdates {
[sessionId: string]: { action: 'cached' | 'evicted'; crawler: string };
}
export default interface IExtractorPluginCore<ISchema extends IExtractorSchema = any> {
name: string;
version: string;
Expand All @@ -22,5 +25,14 @@ export default interface IExtractorPluginCore<ISchema extends IExtractorSchema =
functionName: string;
},
): void | Promise<void>;
afterRunExtractor?(
options: IExtractorRunOptions<ISchema>,
output: ISchema['output'],
cacheUpdates: ICacheUpdates,
runtime?: {
scriptEntrypoint: string;
functionName: string;
},
): void | Promise<void>;
onCoreClose?(): void | Promise<void>;
}
5 changes: 5 additions & 0 deletions datastore/main/interfaces/IQueryOptions.ts
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,11 @@ import {
export default interface IQueryOptions extends IDatastoreQueryMetadata {
domain?: string;
onQueryResult?: (result: IDatastoreQueryResult) => Promise<any> | void;
onCacheUpdated?: (
sessionId: string,
crawler: string,
status: 'cached' | 'evicted',
) => Promise<void> | void;
}

export type IQueryOptionsWithoutId = Partial<Omit<IQueryOptions, 'payment'>>;
23 changes: 18 additions & 5 deletions datastore/main/lib/Crawler.ts
Original file line number Diff line number Diff line change
Expand Up @@ -111,6 +111,7 @@ export default class Crawler<
datastoreMetadata: _d,
input,
schema,
onCacheUpdated,
...rest
} = context as IExtractorContext<TSchema>;
const cached = await this.findCached(input);
Expand All @@ -121,17 +122,26 @@ export default class Crawler<

const result = await originalRun({ input, schema, ...rest } as any);
const output = await result.toCrawlerOutput();
await this.saveToCache(input, output, onCacheUpdated);
Output.emit(output as any);
await this.saveToCache(input, output);
}

protected async saveToCache(
input: TContext['input'],
output: ICrawlerOutputSchema,
): Promise<void> {
if (this.crawlerComponents.disableCache || !output.sessionId) return null;
onCacheUpdated?: TContext['onCacheUpdated'],
): Promise<{ didCache?: boolean; replacedSessionId?: string }> {
if (this.crawlerComponents.disableCache || !output.sessionId) return {};
const serializedInput = this.getSerializedInput(input);
await this.cache.queryInternal('DELETE FROM self WHERE input=$1', [serializedInput]);
const previousResult = await this.cache.queryInternal(
'DELETE FROM self WHERE input=$1 RETURNING sessionId',
[serializedInput],
);

const deletedSessionId = previousResult?.[0]?.sessionId;
if (onCacheUpdated && deletedSessionId) {
await onCacheUpdated(deletedSessionId, output.crawler, 'evicted');
}

const data = {
input: serializedInput,
Expand All @@ -140,12 +150,15 @@ export default class Crawler<
};

const fields = Object.keys(data);
const fieldKeys = fields.map((_, i) => `$${i + 1}`);

const fieldKeys = fields.map((_, i) => `$${i + 1}`);
await this.cache.queryInternal(
`INSERT INTO self (${fields.join(', ')}) VALUES (${fieldKeys.join(', ')})`,
Object.values(data),
);
if (onCacheUpdated) {
await onCacheUpdated(output.sessionId, output.crawler, 'cached');
}
}

protected async findCached(input: TContext['input']): Promise<ICrawlerOutputSchema> {
Expand Down
5 changes: 3 additions & 2 deletions datastore/main/lib/CreditsTable.ts
Original file line number Diff line number Diff line change
Expand Up @@ -76,13 +76,14 @@ export default class CreditsTable extends Table<typeof CreditsSchema> {
if (credit.remainingCredits < amount)
throw new Error('This Credit has insufficient balance remaining to create a payment.');

const result = (await this.queryInternal(
const results = await this.queryInternal(
'UPDATE self SET remainingCredits = remainingCredits - $2 ' +
'WHERE id = $1 AND (remainingCredits - $2) >= 0 ' +
'RETURNING remainingCredits',
[id, amount],
)) as any;
);

const result = results?.[0];
if (result === undefined) throw new Error('Could not create a payment from the given Credits.');
return result.remainingCredits;
}
Expand Down
8 changes: 8 additions & 0 deletions datastore/main/lib/ExtractorContext.ts
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,14 @@ export default class ExtractorContext<
return this.#extractorInternal.options.onQueryResult;
}

public get onCacheUpdated(): (
sessionId: string,
crawler: string,
action: 'cached' | 'evicted',
) => Promise<void> | void {
return this.#extractorInternal.options.onCacheUpdated;
}

#extractorInternal: ExtractorInternal<ISchema>;
#datastoreInternal: DatastoreInternal;
#callbacks: IQueryInternalCallbacks;
Expand Down
4 changes: 2 additions & 2 deletions datastore/main/payments/LocalchainWithSync.ts
Original file line number Diff line number Diff line change
Expand Up @@ -286,7 +286,7 @@ export default class LocalchainWithSync
return;
}
let millisToNextTick = Number(this.#localchain.ticker.millisToNextTick());
if (Number.isNaN(millisToNextTick)) {
if (Number.isNaN(millisToNextTick) || millisToNextTick < 0) {
millisToNextTick = 1000;
}
this.nextTick = setTimeout(async () => {
Expand Down Expand Up @@ -324,7 +324,7 @@ export default class LocalchainWithSync
this.isSynching = false;
this.scheduleNextTick();
}
}, millisToNextTick);
}, millisToNextTick).unref();
}

public static async load(
Expand Down
2 changes: 1 addition & 1 deletion datastore/main/storage-engines/SqliteStorageEngine.ts
Original file line number Diff line number Diff line change
Expand Up @@ -89,7 +89,7 @@ export default class SqliteStorageEngine extends AbstractStorageEngine {
const parsedSql = sqlParser.toSql();
if (sqlParser.isInsert() || sqlParser.isDelete() || sqlParser.isUpdate()) {
if (sqlParser.hasReturn() === true) {
return this.#db.prepare(parsedSql).get(valueMap) as any;
return this.#db.prepare(parsedSql).all(valueMap) as any;
}

const result = this.#db.prepare(parsedSql).run(valueMap);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,9 @@ export default class ReplayRegistryEndpoints {
await ctx.replayRegistry.replayStorageRegistry.store(sessionId, db);
return { success: true };
},
'ReplayRegistry.delete': async ({ sessionId }, ctx) => {
return await ctx.replayRegistry.replayStorageRegistry.delete(sessionId);
},
'ReplayRegistry.get': async ({ sessionId }, ctx) => {
const result = await ctx.replayRegistry.replayStorageRegistry.get(sessionId);
if (result) return { db: await result.db };
Expand Down
21 changes: 21 additions & 0 deletions datastore/plugins/hero-core/index.ts
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
import IDatastoreCoreConfigureOptions from '@ulixee/datastore-core/interfaces/IDatastoreCoreConfigureOptions';
import { IHeroExtractorRunOptions } from '@ulixee/datastore-plugins-hero';
import type IExtractorPluginCore from '@ulixee/datastore/interfaces/IExtractorPluginCore';
import type { ICacheUpdates } from '@ulixee/datastore/interfaces/IExtractorPluginCore';
import { ConnectionToHeroCore } from '@ulixee/hero';
import HeroCore from '@ulixee/hero-core';
import { ConnectionToClient, ConnectionToCore } from '@ulixee/net';
Expand All @@ -9,6 +10,7 @@ import { nanoid } from 'nanoid';
import * as Path from 'path';
import ReplayRegistryEndpoints from './endpoints/ReplayRegistryEndpoints';
import ReplayRegistry from './lib/ReplayRegistry';
import ICrawlerOutputSchema from '@ulixee/datastore/interfaces/ICrawlerOutputSchema';

Check warning on line 13 in datastore/plugins/hero-core/index.ts

View workflow job for this annotation

GitHub Actions / Test Node.js 18 on ubuntu-latest

'ICrawlerOutputSchema' is defined but never used. Allowed unused vars must match /^_/u

const pkg = require('@ulixee/datastore-plugins-hero/package.json');

Expand Down Expand Up @@ -85,6 +87,25 @@ export default class DatastoreForHeroPluginCore implements IExtractorPluginCore
options.connectionToCore = this.connectionToCore;
}

public async afterRunExtractor(
_options: IHeroExtractorRunOptions<any>,
_output: unknown,
cacheUpdates: ICacheUpdates,
): Promise<void> {
if (!cacheUpdates) return;
const actions = [];
for (const [sessionId, { action, crawler }] of Object.entries(cacheUpdates)) {
if (crawler !== 'Hero') continue;
if (action === 'evicted') {
actions.push(this.replayRegistry.delete(sessionId));
}
if (action === 'cached') {
actions.push(this.replayRegistry.store(sessionId));
}
}
await Promise.allSettled(actions);
}

public registerHostedServices(connectionToClient: ConnectionToClient<any, any>): void {
this.endpoints?.attachToConnection(connectionToClient, { replayRegistry: this.replayRegistry });
}
Expand Down
52 changes: 38 additions & 14 deletions datastore/plugins/hero-core/lib/ReplayRegistry.ts
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ export default class ReplayRegistry

public readonly replayStorageRegistry?: ReplayRegistryDiskStore;
private readonly serviceClient?: ReplayRegistryServiceClient;
private readonly storePromises = new Set<Promise<any>>();
private readonly storePromises = new Map<string, Promise<any>[]>();

private defaultSessionRegistry: DefaultSessionRegistry;

Expand All @@ -46,7 +46,7 @@ export default class ReplayRegistry
}

public async flush(): Promise<void> {
const storage = [...this.storePromises];
const storage = [...this.storePromises.values()];
this.storePromises.clear();
await Promise.allSettled(storage);
}
Expand All @@ -56,7 +56,9 @@ export default class ReplayRegistry
}

public async retain(sessionId: string, customPath?: string): Promise<SessionDb> {
const record = await this.defaultSessionRegistry.retain(sessionId, customPath).catch(() => null);
const record = await this.defaultSessionRegistry
.retain(sessionId, customPath)
.catch(() => null);
if (record) return record;

for (const store of [this.replayStorageRegistry, this.serviceClient]) {
Expand Down Expand Up @@ -94,17 +96,44 @@ export default class ReplayRegistry
}

public async close(sessionId: string, isDeleteRequested: boolean): Promise<void> {
const timestamp = Date.now();
const entry = await this.defaultSessionRegistry.get(sessionId);
const sessionRecord = entry?.session.get();
if (this.storePromises.has(sessionId)) {
await Promise.allSettled(this.storePromises.get(sessionId));
this.storePromises.delete(sessionId);
}
await this.defaultSessionRegistry.close(sessionId, isDeleteRequested);
}

public async delete(sessionId: string): Promise<void> {
if (this.storePromises.has(sessionId)) {
await Promise.allSettled(this.storePromises.get(sessionId));
this.storePromises.delete(sessionId);
}
await this.defaultSessionRegistry.close(sessionId, true);
if (this.serviceClient) {
await this.serviceClient.delete(sessionId);
} else {
await this.replayStorageRegistry.delete(sessionId);
}
}

if (sessionRecord.scriptRuntime === 'datastore') {
this.enqueue(this.store(sessionId, timestamp, entry.path));
public async store(sessionId: string): Promise<void> {
if (!this.storePromises.has(sessionId)) {
this.storePromises.set(sessionId, []);
}
this.storePromises
.get(sessionId)
.push(
this.storeInternal(sessionId).catch(e => console.warn(`Error storing cached session`, e)),
);
}

private async store(sessionId: string, timestamp: number, path: string): Promise<void> {
private async storeInternal(sessionId: string): Promise<void> {
const entry = await this.defaultSessionRegistry.get(sessionId);
if (!entry?.session) throw new Error(`Session not able to be retained: ${sessionId}`);

const path = entry.path;
const timestamp = Date.now();

const db = await ReplayRegistryDiskStore.getCompressedDb(path);
if (this.serviceClient) {
await this.serviceClient.store({
Expand All @@ -116,9 +145,4 @@ export default class ReplayRegistry
await this.replayStorageRegistry.store(sessionId, db);
}
}

private enqueue(promise: Promise<any>): void {
this.storePromises.add(promise);
void promise.then(() => this.storePromises.delete(promise)).catch(console.error);
}
}
8 changes: 8 additions & 0 deletions datastore/plugins/hero-core/lib/ReplayRegistryDiskStore.ts
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,14 @@ export default class ReplayRegistryDiskStore {
}
}

public async delete(
sessionId: string,
): Promise<IReplayRegistryApiTypes['ReplayRegistry.delete']['result']> {
const path = Path.join(this.storageDir, `${sessionId}.db.gz`);
const didFail = await Fs.promises.unlink(path).catch(() => true);
return { success: !didFail };
}

public async store(
sessionId: string,
db: Buffer,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,15 @@ export default class ReplayRegistryServiceClient {
});
}

public async delete(
sessionId: string,
): Promise<IReplayRegistryApiTypes['ReplayRegistry.delete']['result']> {
return await this.client.sendRequest({
command: 'ReplayRegistry.delete',
args: [{ sessionId }],
});
}

public async get(
sessionId: string,
): Promise<IReplayRegistryApiTypes['ReplayRegistry.get']['result']> {
Expand Down
Loading

0 comments on commit 56a00b0

Please sign in to comment.