From 634945b676c1e25828a0597b4e2b4397db20d5c0 Mon Sep 17 00:00:00 2001 From: Quentin Roy Date: Wed, 31 Jan 2024 16:06:31 +0100 Subject: [PATCH] log-server: reduce store's getLogs memory footprint (#220) * llimit the number logs loaded in mem * test select query limit * changeset * fix missing logs with limitSelectQuery * fix lint --- .changeset/old-zebras-tease.md | 5 + .../__snapshots__/sqlite-store.test.ts.snap | 135 ++++++++ .../{store.test.ts => sqlite-store.test.ts} | 301 ++++++++++-------- .../src/db-migrations/2023-08-07-init.ts | 5 + packages/log-server/src/store.ts | 147 ++++++--- 5 files changed, 411 insertions(+), 182 deletions(-) create mode 100644 .changeset/old-zebras-tease.md create mode 100644 packages/log-server/__tests__/__snapshots__/sqlite-store.test.ts.snap rename packages/log-server/__tests__/{store.test.ts => sqlite-store.test.ts} (85%) diff --git a/.changeset/old-zebras-tease.md b/.changeset/old-zebras-tease.md new file mode 100644 index 00000000..2a635f55 --- /dev/null +++ b/.changeset/old-zebras-tease.md @@ -0,0 +1,5 @@ +--- +'@lightmill/log-server': minor +--- + +Add a select query result limit for SQliteStore to prevent too many logs to be loaded in memory at the same time. Once the limit is attained logs are yielded and the next logs are loaded once their done being processed. diff --git a/packages/log-server/__tests__/__snapshots__/sqlite-store.test.ts.snap b/packages/log-server/__tests__/__snapshots__/sqlite-store.test.ts.snap new file mode 100644 index 00000000..2bdf8551 --- /dev/null +++ b/packages/log-server/__tests__/__snapshots__/sqlite-store.test.ts.snap @@ -0,0 +1,135 @@ +// Vitest Snapshot v1, https://vitest.dev/guide/snapshot.html + +exports[`SQLiteStore#getLogs (selectQueryLimit: 2) > should return the logs in order of experimentId, runId, and ascending number 2`] = ` +[ + { + "experimentId": "experiment1", + "number": 1, + "runId": "run1", + "type": "log1", + "values": { + "msg": "hello", + "recipient": "Anna", + }, + }, + { + "experimentId": "experiment1", + "number": 2, + "runId": "run1", + "type": "log1", + "values": { + "msg": "bonjour", + "recipient": "Jo", + }, + }, + { + "experimentId": "experiment1", + "number": 3, + "runId": "run1", + "type": "log3", + "values": { + "foo": true, + "x": 25, + "y": 0, + }, + }, + { + "experimentId": "experiment1", + "number": 1, + "runId": "run2", + "type": "log1", + "values": { + "bar": null, + "message": "hola", + }, + }, + { + "experimentId": "experiment1", + "number": 2, + "runId": "run2", + "type": "log2", + "values": { + "foo": false, + "x": 12, + }, + }, + { + "experimentId": "experiment2", + "number": 1, + "runId": "run1", + "type": "log2", + "values": { + "foo": true, + "x": 25, + "y": 0, + }, + }, +] +`; + +exports[`SQLiteStore#getLogs (selectQueryLimit: 10000) > should return the logs in order of experimentId, runId, and ascending number 2`] = ` +[ + { + "experimentId": "experiment1", + "number": 1, + "runId": "run1", + "type": "log1", + "values": { + "msg": "hello", + "recipient": "Anna", + }, + }, + { + "experimentId": "experiment1", + "number": 2, + "runId": "run1", + "type": "log1", + "values": { + "msg": "bonjour", + "recipient": "Jo", + }, + }, + { + "experimentId": "experiment1", + "number": 3, + "runId": "run1", + "type": "log3", + "values": { + "foo": true, + "x": 25, + "y": 0, + }, + }, + { + "experimentId": "experiment1", + "number": 1, + "runId": "run2", + "type": "log1", + "values": { + "bar": null, + "message": "hola", + }, + }, + { + "experimentId": "experiment1", + "number": 2, + 
"runId": "run2", + "type": "log2", + "values": { + "foo": false, + "x": 12, + }, + }, + { + "experimentId": "experiment2", + "number": 1, + "runId": "run1", + "type": "log2", + "values": { + "foo": true, + "x": 25, + "y": 0, + }, + }, +] +`; diff --git a/packages/log-server/__tests__/store.test.ts b/packages/log-server/__tests__/sqlite-store.test.ts similarity index 85% rename from packages/log-server/__tests__/store.test.ts rename to packages/log-server/__tests__/sqlite-store.test.ts index ce3f9016..568ddde5 100644 --- a/packages/log-server/__tests__/store.test.ts +++ b/packages/log-server/__tests__/sqlite-store.test.ts @@ -767,35 +767,44 @@ describe('SQLiteStore#getLogValueNames', () => { }); }); -describe('SQLiteStore#getLogs', () => { - let store: SQLiteStore; - beforeEach(async () => { - store = new SQLiteStore(':memory:'); - await store.migrateDatabase(); - await store.addRun({ runId: 'run1', experimentId: 'experiment1' }); - await store.addRun({ runId: 'run2', experimentId: 'experiment1' }); - await store.addRun({ runId: 'run1', experimentId: 'experiment2' }); - await store.addLogs('experiment1', 'run1', [ - { type: 'log1', number: 1, values: { msg: 'hello', recipient: 'Anna' } }, - { type: 'log1', number: 2, values: { msg: 'bonjour', recipient: 'Jo' } }, - ]); - await store.addLogs('experiment1', 'run2', [ - { type: 'log1', number: 1, values: { message: 'hola', bar: null } }, - { type: 'log2', number: 2, values: { x: 12, foo: false } }, - ]); - await store.addLogs('experiment2', 'run1', [ - { type: 'log2', number: 1, values: { x: 25, y: 0, foo: true } }, - ]); - await store.addLogs('experiment1', 'run1', [ - { type: 'log3', number: 3, values: { x: 25, y: 0, foo: true } }, - ]); - }); - afterEach(async () => { - await store.close(); - }); +for (const limit of [10000, 2]) { + describe(`SQLiteStore#getLogs (selectQueryLimit: ${limit})`, () => { + let store: SQLiteStore; + beforeEach(async () => { + store = new SQLiteStore(':memory:', { selectQueryLimit: limit }); + await store.migrateDatabase(); + await store.addRun({ runId: 'run1', experimentId: 'experiment1' }); + await store.addRun({ runId: 'run2', experimentId: 'experiment1' }); + await store.addRun({ runId: 'run1', experimentId: 'experiment2' }); + await store.addLogs('experiment1', 'run1', [ + { + type: 'log1', + number: 1, + values: { msg: 'hello', recipient: 'Anna' }, + }, + { + type: 'log1', + number: 2, + values: { msg: 'bonjour', recipient: 'Jo' }, + }, + ]); + await store.addLogs('experiment1', 'run2', [ + { type: 'log1', number: 1, values: { message: 'hola', bar: null } }, + { type: 'log2', number: 2, values: { x: 12, foo: false } }, + ]); + await store.addLogs('experiment2', 'run1', [ + { type: 'log2', number: 1, values: { x: 25, y: 0, foo: true } }, + ]); + await store.addLogs('experiment1', 'run1', [ + { type: 'log3', number: 3, values: { x: 25, y: 0, foo: true } }, + ]); + }); + afterEach(async () => { + await store.close(); + }); - it('should return the logs in order of experimentId, runId, and ascending number', async () => { - await expect(fromAsync(store.getLogs())).resolves.toMatchInlineSnapshot(` + it('should return the logs in order of experimentId, runId, and ascending number', async () => { + await expect(fromAsync(store.getLogs())).resolves.toMatchInlineSnapshot(` [ { "experimentId": "experiment1", @@ -861,17 +870,36 @@ describe('SQLiteStore#getLogs', () => { }, ] `); - }); - it('should ignore missing logs', async () => { - await store.addLogs('experiment2', 'run1', [ - { type: 'log1', number: 11, values: { 
msg: 'hello', recipient: 'Anna' } }, - { type: 'log1', number: 33, values: { msg: 'bonjour', recipient: 'Jo' } }, - ]); - await store.addLogs('experiment2', 'run1', [ - { type: 'log1', number: 22, values: { msg: 'hello', recipient: 'Anna' } }, - { type: 'log1', number: 44, values: { msg: 'bonjour', recipient: 'Jo' } }, - ]); - await expect(fromAsync(store.getLogs())).resolves.toMatchInlineSnapshot(` + }); + it('should return the logs in order of experimentId, runId, and ascending number', async () => { + await expect(fromAsync(store.getLogs())).resolves.toMatchSnapshot(); + }); + it('should ignore missing logs', async () => { + await store.addLogs('experiment2', 'run1', [ + { + type: 'log1', + number: 11, + values: { msg: 'hello', recipient: 'Anna' }, + }, + { + type: 'log1', + number: 33, + values: { msg: 'bonjour', recipient: 'Jo' }, + }, + ]); + await store.addLogs('experiment2', 'run1', [ + { + type: 'log1', + number: 22, + values: { msg: 'hello', recipient: 'Anna' }, + }, + { + type: 'log1', + number: 44, + values: { msg: 'bonjour', recipient: 'Jo' }, + }, + ]); + await expect(fromAsync(store.getLogs())).resolves.toMatchInlineSnapshot(` [ { "experimentId": "experiment1", @@ -977,10 +1005,10 @@ describe('SQLiteStore#getLogs', () => { }, ] `); - }); - it('should be able to filter logs of a particular type', async () => { - await expect(fromAsync(store.getLogs({ type: 'log1' }))).resolves - .toMatchInlineSnapshot(` + }); + it('should be able to filter logs of a particular type', async () => { + await expect(fromAsync(store.getLogs({ type: 'log1' }))).resolves + .toMatchInlineSnapshot(` [ { "experimentId": "experiment1", @@ -1014,8 +1042,8 @@ describe('SQLiteStore#getLogs', () => { }, ] `); - await expect(fromAsync(store.getLogs({ type: 'log2' }))).resolves - .toMatchInlineSnapshot(` + await expect(fromAsync(store.getLogs({ type: 'log2' }))).resolves + .toMatchInlineSnapshot(` [ { "experimentId": "experiment1", @@ -1040,10 +1068,10 @@ describe('SQLiteStore#getLogs', () => { }, ] `); - }); - it('should be able to filter logs from a particular experiment', async () => { - await expect(fromAsync(store.getLogs({ experimentId: 'experiment1' }))) - .resolves.toMatchInlineSnapshot(` + }); + it('should be able to filter logs from a particular experiment', async () => { + await expect(fromAsync(store.getLogs({ experimentId: 'experiment1' }))) + .resolves.toMatchInlineSnapshot(` [ { "experimentId": "experiment1", @@ -1098,8 +1126,8 @@ describe('SQLiteStore#getLogs', () => { }, ] `); - await expect(fromAsync(store.getLogs({ experimentId: 'experiment2' }))) - .resolves.toMatchInlineSnapshot(` + await expect(fromAsync(store.getLogs({ experimentId: 'experiment2' }))) + .resolves.toMatchInlineSnapshot(` [ { "experimentId": "experiment2", @@ -1114,10 +1142,10 @@ describe('SQLiteStore#getLogs', () => { }, ] `); - }); - it('should be able to filter logs from a particular run', async () => { - await expect(fromAsync(store.getLogs({ runId: 'run1' }))).resolves - .toMatchInlineSnapshot(` + }); + it('should be able to filter logs from a particular run', async () => { + await expect(fromAsync(store.getLogs({ runId: 'run1' }))).resolves + .toMatchInlineSnapshot(` [ { "experimentId": "experiment1", @@ -1163,8 +1191,8 @@ describe('SQLiteStore#getLogs', () => { }, ] `); - await expect(fromAsync(store.getLogs({ runId: 'run2' }))).resolves - .toMatchInlineSnapshot(` + await expect(fromAsync(store.getLogs({ runId: 'run2' }))).resolves + .toMatchInlineSnapshot(` [ { "experimentId": "experiment1", @@ -1188,11 +1216,11 
@@ describe('SQLiteStore#getLogs', () => { }, ] `); - }); - it('should be able to filter logs by run, experiment, and type all at once', async () => { - await expect( - fromAsync(store.getLogs({ experimentId: 'experiment1', type: 'log2' })), - ).resolves.toMatchInlineSnapshot(` + }); + it('should be able to filter logs by run, experiment, and type all at once', async () => { + await expect( + fromAsync(store.getLogs({ experimentId: 'experiment1', type: 'log2' })), + ).resolves.toMatchInlineSnapshot(` [ { "experimentId": "experiment1", @@ -1206,15 +1234,15 @@ describe('SQLiteStore#getLogs', () => { }, ] `); - await expect( - fromAsync( - store.getLogs({ - experimentId: 'experiment1', - type: 'log1', - runId: 'run1', - }), - ), - ).resolves.toMatchInlineSnapshot(` + await expect( + fromAsync( + store.getLogs({ + experimentId: 'experiment1', + type: 'log1', + runId: 'run1', + }), + ), + ).resolves.toMatchInlineSnapshot(` [ { "experimentId": "experiment1", @@ -1238,33 +1266,35 @@ describe('SQLiteStore#getLogs', () => { }, ] `); - }); - it('should resolve with an empty array if no log matches the filter', async () => { - await expect( - fromAsync(store.getLogs({ experimentId: 'experiment2', type: 'log1' })), - ).resolves.toEqual([]); - await expect( - fromAsync(store.getLogs({ experimentId: 'do not exist' })), - ).resolves.toEqual([]); - await expect( - fromAsync(store.getLogs({ runId: 'do not exist' })), - ).resolves.toEqual([]); - await expect( - fromAsync(store.getLogs({ type: 'do not exist' })), - ).resolves.toEqual([]); - }); - it('should return logs added after resuming', async () => { - await store.resumeRun({ - experimentId: 'experiment2', - runId: 'run1', - resumeFrom: 2, }); - await store.addLogs('experiment2', 'run1', [ - { type: 'log3', number: 2, values: { x: 25, y: 0, foo: true } }, - ]); - await expect( - fromAsync(store.getLogs({ experimentId: 'experiment2', runId: 'run1' })), - ).resolves.toMatchInlineSnapshot(` + it('should resolve with an empty array if no log matches the filter', async () => { + await expect( + fromAsync(store.getLogs({ experimentId: 'experiment2', type: 'log1' })), + ).resolves.toEqual([]); + await expect( + fromAsync(store.getLogs({ experimentId: 'do not exist' })), + ).resolves.toEqual([]); + await expect( + fromAsync(store.getLogs({ runId: 'do not exist' })), + ).resolves.toEqual([]); + await expect( + fromAsync(store.getLogs({ type: 'do not exist' })), + ).resolves.toEqual([]); + }); + it('should return logs added after resuming', async () => { + await store.resumeRun({ + experimentId: 'experiment2', + runId: 'run1', + resumeFrom: 2, + }); + await store.addLogs('experiment2', 'run1', [ + { type: 'log3', number: 2, values: { x: 25, y: 0, foo: true } }, + ]); + await expect( + fromAsync( + store.getLogs({ experimentId: 'experiment2', runId: 'run1' }), + ), + ).resolves.toMatchInlineSnapshot(` [ { "experimentId": "experiment2", @@ -1290,16 +1320,18 @@ describe('SQLiteStore#getLogs', () => { }, ] `); - }); - it('should not return logs canceled from resuming', async () => { - await store.resumeRun({ - experimentId: 'experiment1', - runId: 'run2', - resumeFrom: 2, }); - await expect( - fromAsync(store.getLogs({ experimentId: 'experiment1', runId: 'run2' })), - ).resolves.toMatchInlineSnapshot(` + it('should not return logs canceled from resuming', async () => { + await store.resumeRun({ + experimentId: 'experiment1', + runId: 'run2', + resumeFrom: 2, + }); + await expect( + fromAsync( + store.getLogs({ experimentId: 'experiment1', runId: 'run2' }), + ), + 
).resolves.toMatchInlineSnapshot(` [ { "experimentId": "experiment1", @@ -1313,32 +1345,36 @@ describe('SQLiteStore#getLogs', () => { }, ] `); - await store.resumeRun({ - experimentId: 'experiment1', - runId: 'run2', - resumeFrom: 1, - }); - await expect( - fromAsync(store.getLogs({ experimentId: 'experiment1', runId: 'run2' })), - ).resolves.toEqual([]); - }); - it('should return logs overwriting other logs after resuming', async () => { - await store.addLogs('experiment1', 'run2', [ - { type: 'log1', number: 3, values: { x: 5 } }, - { type: 'log1', number: 4, values: { x: 6 } }, - ]); - await store.resumeRun({ - experimentId: 'experiment1', - runId: 'run2', - resumeFrom: 2, + await store.resumeRun({ + experimentId: 'experiment1', + runId: 'run2', + resumeFrom: 1, + }); + await expect( + fromAsync( + store.getLogs({ experimentId: 'experiment1', runId: 'run2' }), + ), + ).resolves.toEqual([]); }); - await store.addLogs('experiment1', 'run2', [ - { type: 'overwriting', number: 2, values: { x: 1 } }, - { type: 'overwriting', number: 3, values: { x: 2 } }, - ]); - await expect( - fromAsync(store.getLogs({ experimentId: 'experiment1', runId: 'run2' })), - ).resolves.toMatchInlineSnapshot(` + it('should return logs overwriting other logs after resuming', async () => { + await store.addLogs('experiment1', 'run2', [ + { type: 'log1', number: 3, values: { x: 5 } }, + { type: 'log1', number: 4, values: { x: 6 } }, + ]); + await store.resumeRun({ + experimentId: 'experiment1', + runId: 'run2', + resumeFrom: 2, + }); + await store.addLogs('experiment1', 'run2', [ + { type: 'overwriting', number: 2, values: { x: 1 } }, + { type: 'overwriting', number: 3, values: { x: 2 } }, + ]); + await expect( + fromAsync( + store.getLogs({ experimentId: 'experiment1', runId: 'run2' }), + ), + ).resolves.toMatchInlineSnapshot(` [ { "experimentId": "experiment1", @@ -1370,8 +1406,9 @@ describe('SQLiteStore#getLogs', () => { }, ] `); + }); }); -}); +} async function fromAsync(iterable: AsyncIterable) { let values: T[] = []; diff --git a/packages/log-server/src/db-migrations/2023-08-07-init.ts b/packages/log-server/src/db-migrations/2023-08-07-init.ts index fe132561..a0f833e5 100644 --- a/packages/log-server/src/db-migrations/2023-08-07-init.ts +++ b/packages/log-server/src/db-migrations/2023-08-07-init.ts @@ -218,5 +218,10 @@ export async function up(db: Kysely) { SELECT RAISE(ABORT, 'Cannot delete existing log value'); END; `.execute(trx); + await trx.schema + .createIndex('logValueLogIdName') + .on('logValue') + .columns(['logId', 'name']) + .execute(); }); } diff --git a/packages/log-server/src/store.ts b/packages/log-server/src/store.ts index 825012a0..33eaa9ac 100644 --- a/packages/log-server/src/store.ts +++ b/packages/log-server/src/store.ts @@ -15,7 +15,7 @@ import { } from 'kysely'; import { JsonObject } from 'type-fest'; import loglevel, { LogLevelDesc } from 'loglevel'; -import { groupBy } from 'remeda'; +import { groupBy, last } from 'remeda'; import { arrayify } from './utils.js'; const __dirname = url.fileURLToPath(new URL('.', import.meta.url)); @@ -68,13 +68,18 @@ type Database = { export class SQLiteStore { #db: Kysely; + #selectQueryLimit: number; constructor( db: string, - { logLevel = loglevel.getLevel() }: { logLevel?: LogLevelDesc } = {}, + { + logLevel = loglevel.getLevel(), + selectQueryLimit = 1000, + }: { logLevel?: LogLevelDesc; selectQueryLimit?: number } = {}, ) { const logger = loglevel.getLogger('store'); logger.setLevel(logLevel); + this.#selectQueryLimit = selectQueryLimit; this.#db = 
new Kysely({ dialect: new SqliteDialect({ database: new SQLiteDB(db) }), log: (event) => { @@ -449,63 +454,105 @@ export class SQLiteStore { }); } + async *#getLogValues(filter: LogFilter) { + let lastRow: { + logNumber: number; + logId: number; + experimentId: string; + runId: string; + name: string; + } | null = null; + let isFirst = true; + let isDone = false; + while (!isDone) { + let query = this.#db + .selectFrom('runLogView as l') + .innerJoin('logValue as v', 'l.logId', 'v.logId') + .$if(filter.experimentId != null, (qb) => + qb.where('l.experimentId', 'in', arrayify(filter.experimentId, true)), + ) + .$if(filter.runId != null, (qb) => + qb.where('l.runId', 'in', arrayify(filter.runId, true)), + ) + .$if(filter.type != null, (qb) => + qb.where('l.type', 'in', arrayify(filter.type, true)), + ) + .where('l.type', 'is not', null) + .select([ + 'l.experimentId as experimentId', + 'l.runId as runId', + 'l.logId as logId', + 'l.type as type', + 'l.logNumber as logNumber', + 'v.name as name', + 'v.value as value', + ]) + .$narrowType<{ type: string }>() + .orderBy('experimentId') + .orderBy('runId') + .orderBy('logNumber') + .orderBy('name') + .limit(this.#selectQueryLimit) + .$if(!isFirst, (qb) => + qb.where((eb) => { + if (lastRow === null) throw new Error('lastRow is null'); + return eb.or([ + eb('experimentId', '>', lastRow.experimentId), + eb.and([ + eb('experimentId', '=', lastRow.experimentId), + eb('runId', '>', lastRow.runId), + ]), + eb.and([ + eb('experimentId', '=', lastRow.experimentId), + eb('runId', '=', lastRow.runId), + eb('logNumber', '>', lastRow.logNumber), + ]), + eb.and([ + eb('experimentId', '=', lastRow.experimentId), + eb('runId', '=', lastRow.runId), + eb('logNumber', '=', lastRow.logNumber), + eb('name', '>', lastRow.name), + ]), + ]); + }), + ); + let result = await query.execute(); + isFirst = false; + lastRow = last(result) ?? null; + isDone = lastRow == null; + yield* result; + } + } + async *getLogs(filter: LogFilter = {}): AsyncGenerator { - // It would probably be better not to read everything at once because - // this could be a lot of data. Instead we could read a few yield, and - // restart with the remaining. However until this becomes a problem, this - // is good enough. 
- let result = await this.#db - .selectFrom('runLogView as l') - .innerJoin('logValue as v', 'l.logId', 'v.logId') - .$if(filter.experimentId != null, (qb) => - qb.where('l.experimentId', 'in', arrayify(filter.experimentId, true)), - ) - .$if(filter.runId != null, (qb) => - qb.where('l.runId', 'in', arrayify(filter.runId, true)), - ) - .$if(filter.type != null, (qb) => - qb.where('l.type', 'in', arrayify(filter.type, true)), - ) - .where('l.type', 'is not', null) - .select([ - 'l.experimentId as experimentId', - 'l.runId as runId', - 'l.logId as logId', - 'l.type as type', - 'l.logNumber as number', - 'v.name', - 'v.value', - ]) - .$narrowType<{ type: string }>() - .orderBy('experimentId') - .orderBy('runId') - .orderBy('number') - .execute(); + const logValuesIterator = this.#getLogValues(filter); - if (result.length === 0) return; + let first = await logValuesIterator.next(); + if (first.done) return; + let currentValues = [first.value]; - function reconstructLog(start: number, end: number) { - let first = result[start]; + function getLogFromCurrentValues(): Log { + let { experimentId, runId, logNumber, type } = currentValues[0]; return { - experimentId: first.experimentId, - runId: first.runId, - type: first.type, - number: first.number, - values: reconstructValues(result.slice(start, end)), + experimentId, + runId, + number: logNumber, + type, + values: reconstructValues(currentValues), }; } - let currentLogStart = 0; - let currentLogId = result[0].logId; - for (let i = 1; i < result.length; i++) { - let row = result[i]; - if (row.logId !== currentLogId) { - yield reconstructLog(currentLogStart, i); - currentLogStart = i; - currentLogId = row.logId; + for await (let logValue of logValuesIterator) { + let logFirst = currentValues[0]; + if (logValue.logId !== logFirst.logId) { + yield getLogFromCurrentValues(); + currentValues = [logValue]; + } else { + currentValues.push(logValue); } } - yield reconstructLog(currentLogStart, result.length); + + yield getLogFromCurrentValues(); } async migrateDatabase() {
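The patch above replaces the single all-at-once SELECT in `getLogs` with `#getLogValues`, an async generator that pages through log values using keyset pagination: each query is capped at `selectQueryLimit` rows, ordered by `experimentId`, `runId`, `logNumber`, `name`, and the next query resumes strictly after the last row of the previous page. The sketch below is a minimal, self-contained model of that loop outside of Kysely and SQLite; the `Row` type and the `fetchPage` helper are illustrative assumptions standing in for the real SQL query, not part of the log-server API.

```ts
// Minimal model of the keyset-pagination loop used by #getLogValues.
// Row and fetchPage are hypothetical stand-ins for the real query, which
// orders by experimentId, runId, logNumber, name and applies LIMIT.
type Row = {
  experimentId: string;
  runId: string;
  logNumber: number;
  name: string;
};

// Total order matching ORDER BY experimentId, runId, logNumber, name.
function compareRows(a: Row, b: Row): number {
  return (
    a.experimentId.localeCompare(b.experimentId) ||
    a.runId.localeCompare(b.runId) ||
    a.logNumber - b.logNumber ||
    a.name.localeCompare(b.name)
  );
}

// Stand-in for one LIMITed SELECT: up to `limit` rows strictly after `cursor`.
async function fetchPage(
  table: Row[],
  cursor: Row | null,
  limit: number,
): Promise<Row[]> {
  return [...table]
    .sort(compareRows)
    .filter((row) => cursor == null || compareRows(row, cursor) > 0)
    .slice(0, limit);
}

// Mirrors the while loop in #getLogValues: fetch a page, remember its last
// row as the cursor, yield the rows, and stop once a page comes back empty.
// At most `limit` rows are held in memory per iteration.
async function* paginate(table: Row[], limit: number): AsyncGenerator<Row> {
  let cursor: Row | null = null;
  while (true) {
    const page = await fetchPage(table, cursor, limit);
    if (page.length === 0) return;
    cursor = page[page.length - 1];
    yield* page;
  }
}
```

Iterating with `for await (const row of paginate(rows, 2)) { ... }` visits every row exactly once while only ever holding one page in memory, which is the behavior the `selectQueryLimit: 2` test variant exercises; in the real store, `getLogs` then groups consecutive rows sharing a `logId` back into complete log objects via `reconstructValues`.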