From f7bb83a6731b293619526cde29b17f99c0cd934a Mon Sep 17 00:00:00 2001 From: Quentin Roy Date: Tue, 30 Jan 2024 18:52:24 +0100 Subject: [PATCH 1/5] log-server: limit the number logs loaded in mem --- packages/log-server/src/store.ts | 125 ++++++++++++++++++------------- 1 file changed, 75 insertions(+), 50 deletions(-) diff --git a/packages/log-server/src/store.ts b/packages/log-server/src/store.ts index 825012a0..d05ab52b 100644 --- a/packages/log-server/src/store.ts +++ b/packages/log-server/src/store.ts @@ -15,7 +15,7 @@ import { } from 'kysely'; import { JsonObject } from 'type-fest'; import loglevel, { LogLevelDesc } from 'loglevel'; -import { groupBy } from 'remeda'; +import { groupBy, last, pick } from 'remeda'; import { arrayify } from './utils.js'; const __dirname = url.fileURLToPath(new URL('.', import.meta.url)); @@ -68,13 +68,18 @@ type Database = { export class SQLiteStore { #db: Kysely; + #selectQueryLimit: number; constructor( db: string, - { logLevel = loglevel.getLevel() }: { logLevel?: LogLevelDesc } = {}, + { + logLevel = loglevel.getLevel(), + selectQueryLimit = 1000, + }: { logLevel?: LogLevelDesc; selectQueryLimit?: number } = {}, ) { const logger = loglevel.getLogger('store'); logger.setLevel(logLevel); + this.#selectQueryLimit = selectQueryLimit; this.#db = new Kysely({ dialect: new SqliteDialect({ database: new SQLiteDB(db) }), log: (event) => { @@ -449,63 +454,83 @@ export class SQLiteStore { }); } + async *#getLogValues(filter: LogFilter) { + let lastRow: { + number: number; + logId: number; + experimentId: string; + runId: string; + } | null = null; + let isFirst = true; + let isDone = false; + while (!isDone) { + let result = await this.#db + .selectFrom('runLogView as l') + .innerJoin('logValue as v', 'l.logId', 'v.logId') + .$if(filter.experimentId != null, (qb) => + qb.where('l.experimentId', 'in', arrayify(filter.experimentId, true)), + ) + .$if(filter.runId != null, (qb) => + qb.where('l.runId', 'in', arrayify(filter.runId, true)), + ) + .$if(filter.type != null, (qb) => + qb.where('l.type', 'in', arrayify(filter.type, true)), + ) + .$if(!isFirst, (qb) => { + if (lastRow === null) throw new Error('lastRow is null'); + return qb + .where('l.experimentId', '>=', lastRow.experimentId) + .where('l.runId', '>=', lastRow.runId) + .where('l.logNumber', '>', lastRow.number); + }) + .where('l.type', 'is not', null) + .select([ + 'l.experimentId as experimentId', + 'l.runId as runId', + 'l.logId as logId', + 'l.type as type', + 'l.logNumber as number', + 'v.name', + 'v.value', + ]) + .$narrowType<{ type: string }>() + .orderBy('experimentId') + .orderBy('runId') + .orderBy('number') + .limit(this.#selectQueryLimit) + .execute(); + isFirst = false; + lastRow = last(result) ?? null; + isDone = lastRow == null; + yield* result; + } + } + async *getLogs(filter: LogFilter = {}): AsyncGenerator { - // It would probably be better not to read everything at once because - // this could be a lot of data. Instead we could read a few yield, and - // restart with the remaining. However until this becomes a problem, this - // is good enough. 
- let result = await this.#db - .selectFrom('runLogView as l') - .innerJoin('logValue as v', 'l.logId', 'v.logId') - .$if(filter.experimentId != null, (qb) => - qb.where('l.experimentId', 'in', arrayify(filter.experimentId, true)), - ) - .$if(filter.runId != null, (qb) => - qb.where('l.runId', 'in', arrayify(filter.runId, true)), - ) - .$if(filter.type != null, (qb) => - qb.where('l.type', 'in', arrayify(filter.type, true)), - ) - .where('l.type', 'is not', null) - .select([ - 'l.experimentId as experimentId', - 'l.runId as runId', - 'l.logId as logId', - 'l.type as type', - 'l.logNumber as number', - 'v.name', - 'v.value', - ]) - .$narrowType<{ type: string }>() - .orderBy('experimentId') - .orderBy('runId') - .orderBy('number') - .execute(); + const logValuesIterator = this.#getLogValues(filter); - if (result.length === 0) return; + let first = await logValuesIterator.next(); + if (first.done) return; + let currentValues = [first.value]; - function reconstructLog(start: number, end: number) { - let first = result[start]; + function getLogFromCurrentValues() { return { - experimentId: first.experimentId, - runId: first.runId, - type: first.type, - number: first.number, - values: reconstructValues(result.slice(start, end)), + ...pick(currentValues[0], ['experimentId', 'runId', 'number', 'type']), + values: reconstructValues(currentValues), }; } - let currentLogStart = 0; - let currentLogId = result[0].logId; - for (let i = 1; i < result.length; i++) { - let row = result[i]; - if (row.logId !== currentLogId) { - yield reconstructLog(currentLogStart, i); - currentLogStart = i; - currentLogId = row.logId; + for await (let logValue of logValuesIterator) { + let logFirst = currentValues[0]; + if (logValue.logId !== logFirst.logId) { + yield getLogFromCurrentValues(); + currentValues = [logValue]; + } else { + currentValues.push(logValue); } } - yield reconstructLog(currentLogStart, result.length); + + yield getLogFromCurrentValues(); } async migrateDatabase() { From 3c88646345b47f0141459d25131ea1070bf29b02 Mon Sep 17 00:00:00 2001 From: Quentin Roy Date: Wed, 31 Jan 2024 11:20:03 +0100 Subject: [PATCH 2/5] log-server: test select query limit --- .../__snapshots__/sqlite-store.test.ts.snap | 135 ++++++++ .../{store.test.ts => sqlite-store.test.ts} | 301 ++++++++++-------- 2 files changed, 304 insertions(+), 132 deletions(-) create mode 100644 packages/log-server/__tests__/__snapshots__/sqlite-store.test.ts.snap rename packages/log-server/__tests__/{store.test.ts => sqlite-store.test.ts} (85%) diff --git a/packages/log-server/__tests__/__snapshots__/sqlite-store.test.ts.snap b/packages/log-server/__tests__/__snapshots__/sqlite-store.test.ts.snap new file mode 100644 index 00000000..2bdf8551 --- /dev/null +++ b/packages/log-server/__tests__/__snapshots__/sqlite-store.test.ts.snap @@ -0,0 +1,135 @@ +// Vitest Snapshot v1, https://vitest.dev/guide/snapshot.html + +exports[`SQLiteStore#getLogs (selectQueryLimit: 2) > should return the logs in order of experimentId, runId, and ascending number 2`] = ` +[ + { + "experimentId": "experiment1", + "number": 1, + "runId": "run1", + "type": "log1", + "values": { + "msg": "hello", + "recipient": "Anna", + }, + }, + { + "experimentId": "experiment1", + "number": 2, + "runId": "run1", + "type": "log1", + "values": { + "msg": "bonjour", + "recipient": "Jo", + }, + }, + { + "experimentId": "experiment1", + "number": 3, + "runId": "run1", + "type": "log3", + "values": { + "foo": true, + "x": 25, + "y": 0, + }, + }, + { + "experimentId": "experiment1", + "number": 
1, + "runId": "run2", + "type": "log1", + "values": { + "bar": null, + "message": "hola", + }, + }, + { + "experimentId": "experiment1", + "number": 2, + "runId": "run2", + "type": "log2", + "values": { + "foo": false, + "x": 12, + }, + }, + { + "experimentId": "experiment2", + "number": 1, + "runId": "run1", + "type": "log2", + "values": { + "foo": true, + "x": 25, + "y": 0, + }, + }, +] +`; + +exports[`SQLiteStore#getLogs (selectQueryLimit: 10000) > should return the logs in order of experimentId, runId, and ascending number 2`] = ` +[ + { + "experimentId": "experiment1", + "number": 1, + "runId": "run1", + "type": "log1", + "values": { + "msg": "hello", + "recipient": "Anna", + }, + }, + { + "experimentId": "experiment1", + "number": 2, + "runId": "run1", + "type": "log1", + "values": { + "msg": "bonjour", + "recipient": "Jo", + }, + }, + { + "experimentId": "experiment1", + "number": 3, + "runId": "run1", + "type": "log3", + "values": { + "foo": true, + "x": 25, + "y": 0, + }, + }, + { + "experimentId": "experiment1", + "number": 1, + "runId": "run2", + "type": "log1", + "values": { + "bar": null, + "message": "hola", + }, + }, + { + "experimentId": "experiment1", + "number": 2, + "runId": "run2", + "type": "log2", + "values": { + "foo": false, + "x": 12, + }, + }, + { + "experimentId": "experiment2", + "number": 1, + "runId": "run1", + "type": "log2", + "values": { + "foo": true, + "x": 25, + "y": 0, + }, + }, +] +`; diff --git a/packages/log-server/__tests__/store.test.ts b/packages/log-server/__tests__/sqlite-store.test.ts similarity index 85% rename from packages/log-server/__tests__/store.test.ts rename to packages/log-server/__tests__/sqlite-store.test.ts index ce3f9016..568ddde5 100644 --- a/packages/log-server/__tests__/store.test.ts +++ b/packages/log-server/__tests__/sqlite-store.test.ts @@ -767,35 +767,44 @@ describe('SQLiteStore#getLogValueNames', () => { }); }); -describe('SQLiteStore#getLogs', () => { - let store: SQLiteStore; - beforeEach(async () => { - store = new SQLiteStore(':memory:'); - await store.migrateDatabase(); - await store.addRun({ runId: 'run1', experimentId: 'experiment1' }); - await store.addRun({ runId: 'run2', experimentId: 'experiment1' }); - await store.addRun({ runId: 'run1', experimentId: 'experiment2' }); - await store.addLogs('experiment1', 'run1', [ - { type: 'log1', number: 1, values: { msg: 'hello', recipient: 'Anna' } }, - { type: 'log1', number: 2, values: { msg: 'bonjour', recipient: 'Jo' } }, - ]); - await store.addLogs('experiment1', 'run2', [ - { type: 'log1', number: 1, values: { message: 'hola', bar: null } }, - { type: 'log2', number: 2, values: { x: 12, foo: false } }, - ]); - await store.addLogs('experiment2', 'run1', [ - { type: 'log2', number: 1, values: { x: 25, y: 0, foo: true } }, - ]); - await store.addLogs('experiment1', 'run1', [ - { type: 'log3', number: 3, values: { x: 25, y: 0, foo: true } }, - ]); - }); - afterEach(async () => { - await store.close(); - }); +for (const limit of [10000, 2]) { + describe(`SQLiteStore#getLogs (selectQueryLimit: ${limit})`, () => { + let store: SQLiteStore; + beforeEach(async () => { + store = new SQLiteStore(':memory:', { selectQueryLimit: limit }); + await store.migrateDatabase(); + await store.addRun({ runId: 'run1', experimentId: 'experiment1' }); + await store.addRun({ runId: 'run2', experimentId: 'experiment1' }); + await store.addRun({ runId: 'run1', experimentId: 'experiment2' }); + await store.addLogs('experiment1', 'run1', [ + { + type: 'log1', + number: 1, + values: { msg: 
'hello', recipient: 'Anna' }, + }, + { + type: 'log1', + number: 2, + values: { msg: 'bonjour', recipient: 'Jo' }, + }, + ]); + await store.addLogs('experiment1', 'run2', [ + { type: 'log1', number: 1, values: { message: 'hola', bar: null } }, + { type: 'log2', number: 2, values: { x: 12, foo: false } }, + ]); + await store.addLogs('experiment2', 'run1', [ + { type: 'log2', number: 1, values: { x: 25, y: 0, foo: true } }, + ]); + await store.addLogs('experiment1', 'run1', [ + { type: 'log3', number: 3, values: { x: 25, y: 0, foo: true } }, + ]); + }); + afterEach(async () => { + await store.close(); + }); - it('should return the logs in order of experimentId, runId, and ascending number', async () => { - await expect(fromAsync(store.getLogs())).resolves.toMatchInlineSnapshot(` + it('should return the logs in order of experimentId, runId, and ascending number', async () => { + await expect(fromAsync(store.getLogs())).resolves.toMatchInlineSnapshot(` [ { "experimentId": "experiment1", @@ -861,17 +870,36 @@ describe('SQLiteStore#getLogs', () => { }, ] `); - }); - it('should ignore missing logs', async () => { - await store.addLogs('experiment2', 'run1', [ - { type: 'log1', number: 11, values: { msg: 'hello', recipient: 'Anna' } }, - { type: 'log1', number: 33, values: { msg: 'bonjour', recipient: 'Jo' } }, - ]); - await store.addLogs('experiment2', 'run1', [ - { type: 'log1', number: 22, values: { msg: 'hello', recipient: 'Anna' } }, - { type: 'log1', number: 44, values: { msg: 'bonjour', recipient: 'Jo' } }, - ]); - await expect(fromAsync(store.getLogs())).resolves.toMatchInlineSnapshot(` + }); + it('should return the logs in order of experimentId, runId, and ascending number', async () => { + await expect(fromAsync(store.getLogs())).resolves.toMatchSnapshot(); + }); + it('should ignore missing logs', async () => { + await store.addLogs('experiment2', 'run1', [ + { + type: 'log1', + number: 11, + values: { msg: 'hello', recipient: 'Anna' }, + }, + { + type: 'log1', + number: 33, + values: { msg: 'bonjour', recipient: 'Jo' }, + }, + ]); + await store.addLogs('experiment2', 'run1', [ + { + type: 'log1', + number: 22, + values: { msg: 'hello', recipient: 'Anna' }, + }, + { + type: 'log1', + number: 44, + values: { msg: 'bonjour', recipient: 'Jo' }, + }, + ]); + await expect(fromAsync(store.getLogs())).resolves.toMatchInlineSnapshot(` [ { "experimentId": "experiment1", @@ -977,10 +1005,10 @@ describe('SQLiteStore#getLogs', () => { }, ] `); - }); - it('should be able to filter logs of a particular type', async () => { - await expect(fromAsync(store.getLogs({ type: 'log1' }))).resolves - .toMatchInlineSnapshot(` + }); + it('should be able to filter logs of a particular type', async () => { + await expect(fromAsync(store.getLogs({ type: 'log1' }))).resolves + .toMatchInlineSnapshot(` [ { "experimentId": "experiment1", @@ -1014,8 +1042,8 @@ describe('SQLiteStore#getLogs', () => { }, ] `); - await expect(fromAsync(store.getLogs({ type: 'log2' }))).resolves - .toMatchInlineSnapshot(` + await expect(fromAsync(store.getLogs({ type: 'log2' }))).resolves + .toMatchInlineSnapshot(` [ { "experimentId": "experiment1", @@ -1040,10 +1068,10 @@ describe('SQLiteStore#getLogs', () => { }, ] `); - }); - it('should be able to filter logs from a particular experiment', async () => { - await expect(fromAsync(store.getLogs({ experimentId: 'experiment1' }))) - .resolves.toMatchInlineSnapshot(` + }); + it('should be able to filter logs from a particular experiment', async () => { + await expect(fromAsync(store.getLogs({ 
experimentId: 'experiment1' }))) + .resolves.toMatchInlineSnapshot(` [ { "experimentId": "experiment1", @@ -1098,8 +1126,8 @@ describe('SQLiteStore#getLogs', () => { }, ] `); - await expect(fromAsync(store.getLogs({ experimentId: 'experiment2' }))) - .resolves.toMatchInlineSnapshot(` + await expect(fromAsync(store.getLogs({ experimentId: 'experiment2' }))) + .resolves.toMatchInlineSnapshot(` [ { "experimentId": "experiment2", @@ -1114,10 +1142,10 @@ describe('SQLiteStore#getLogs', () => { }, ] `); - }); - it('should be able to filter logs from a particular run', async () => { - await expect(fromAsync(store.getLogs({ runId: 'run1' }))).resolves - .toMatchInlineSnapshot(` + }); + it('should be able to filter logs from a particular run', async () => { + await expect(fromAsync(store.getLogs({ runId: 'run1' }))).resolves + .toMatchInlineSnapshot(` [ { "experimentId": "experiment1", @@ -1163,8 +1191,8 @@ describe('SQLiteStore#getLogs', () => { }, ] `); - await expect(fromAsync(store.getLogs({ runId: 'run2' }))).resolves - .toMatchInlineSnapshot(` + await expect(fromAsync(store.getLogs({ runId: 'run2' }))).resolves + .toMatchInlineSnapshot(` [ { "experimentId": "experiment1", @@ -1188,11 +1216,11 @@ describe('SQLiteStore#getLogs', () => { }, ] `); - }); - it('should be able to filter logs by run, experiment, and type all at once', async () => { - await expect( - fromAsync(store.getLogs({ experimentId: 'experiment1', type: 'log2' })), - ).resolves.toMatchInlineSnapshot(` + }); + it('should be able to filter logs by run, experiment, and type all at once', async () => { + await expect( + fromAsync(store.getLogs({ experimentId: 'experiment1', type: 'log2' })), + ).resolves.toMatchInlineSnapshot(` [ { "experimentId": "experiment1", @@ -1206,15 +1234,15 @@ describe('SQLiteStore#getLogs', () => { }, ] `); - await expect( - fromAsync( - store.getLogs({ - experimentId: 'experiment1', - type: 'log1', - runId: 'run1', - }), - ), - ).resolves.toMatchInlineSnapshot(` + await expect( + fromAsync( + store.getLogs({ + experimentId: 'experiment1', + type: 'log1', + runId: 'run1', + }), + ), + ).resolves.toMatchInlineSnapshot(` [ { "experimentId": "experiment1", @@ -1238,33 +1266,35 @@ describe('SQLiteStore#getLogs', () => { }, ] `); - }); - it('should resolve with an empty array if no log matches the filter', async () => { - await expect( - fromAsync(store.getLogs({ experimentId: 'experiment2', type: 'log1' })), - ).resolves.toEqual([]); - await expect( - fromAsync(store.getLogs({ experimentId: 'do not exist' })), - ).resolves.toEqual([]); - await expect( - fromAsync(store.getLogs({ runId: 'do not exist' })), - ).resolves.toEqual([]); - await expect( - fromAsync(store.getLogs({ type: 'do not exist' })), - ).resolves.toEqual([]); - }); - it('should return logs added after resuming', async () => { - await store.resumeRun({ - experimentId: 'experiment2', - runId: 'run1', - resumeFrom: 2, }); - await store.addLogs('experiment2', 'run1', [ - { type: 'log3', number: 2, values: { x: 25, y: 0, foo: true } }, - ]); - await expect( - fromAsync(store.getLogs({ experimentId: 'experiment2', runId: 'run1' })), - ).resolves.toMatchInlineSnapshot(` + it('should resolve with an empty array if no log matches the filter', async () => { + await expect( + fromAsync(store.getLogs({ experimentId: 'experiment2', type: 'log1' })), + ).resolves.toEqual([]); + await expect( + fromAsync(store.getLogs({ experimentId: 'do not exist' })), + ).resolves.toEqual([]); + await expect( + fromAsync(store.getLogs({ runId: 'do not exist' })), + 
).resolves.toEqual([]); + await expect( + fromAsync(store.getLogs({ type: 'do not exist' })), + ).resolves.toEqual([]); + }); + it('should return logs added after resuming', async () => { + await store.resumeRun({ + experimentId: 'experiment2', + runId: 'run1', + resumeFrom: 2, + }); + await store.addLogs('experiment2', 'run1', [ + { type: 'log3', number: 2, values: { x: 25, y: 0, foo: true } }, + ]); + await expect( + fromAsync( + store.getLogs({ experimentId: 'experiment2', runId: 'run1' }), + ), + ).resolves.toMatchInlineSnapshot(` [ { "experimentId": "experiment2", @@ -1290,16 +1320,18 @@ describe('SQLiteStore#getLogs', () => { }, ] `); - }); - it('should not return logs canceled from resuming', async () => { - await store.resumeRun({ - experimentId: 'experiment1', - runId: 'run2', - resumeFrom: 2, }); - await expect( - fromAsync(store.getLogs({ experimentId: 'experiment1', runId: 'run2' })), - ).resolves.toMatchInlineSnapshot(` + it('should not return logs canceled from resuming', async () => { + await store.resumeRun({ + experimentId: 'experiment1', + runId: 'run2', + resumeFrom: 2, + }); + await expect( + fromAsync( + store.getLogs({ experimentId: 'experiment1', runId: 'run2' }), + ), + ).resolves.toMatchInlineSnapshot(` [ { "experimentId": "experiment1", @@ -1313,32 +1345,36 @@ describe('SQLiteStore#getLogs', () => { }, ] `); - await store.resumeRun({ - experimentId: 'experiment1', - runId: 'run2', - resumeFrom: 1, - }); - await expect( - fromAsync(store.getLogs({ experimentId: 'experiment1', runId: 'run2' })), - ).resolves.toEqual([]); - }); - it('should return logs overwriting other logs after resuming', async () => { - await store.addLogs('experiment1', 'run2', [ - { type: 'log1', number: 3, values: { x: 5 } }, - { type: 'log1', number: 4, values: { x: 6 } }, - ]); - await store.resumeRun({ - experimentId: 'experiment1', - runId: 'run2', - resumeFrom: 2, + await store.resumeRun({ + experimentId: 'experiment1', + runId: 'run2', + resumeFrom: 1, + }); + await expect( + fromAsync( + store.getLogs({ experimentId: 'experiment1', runId: 'run2' }), + ), + ).resolves.toEqual([]); }); - await store.addLogs('experiment1', 'run2', [ - { type: 'overwriting', number: 2, values: { x: 1 } }, - { type: 'overwriting', number: 3, values: { x: 2 } }, - ]); - await expect( - fromAsync(store.getLogs({ experimentId: 'experiment1', runId: 'run2' })), - ).resolves.toMatchInlineSnapshot(` + it('should return logs overwriting other logs after resuming', async () => { + await store.addLogs('experiment1', 'run2', [ + { type: 'log1', number: 3, values: { x: 5 } }, + { type: 'log1', number: 4, values: { x: 6 } }, + ]); + await store.resumeRun({ + experimentId: 'experiment1', + runId: 'run2', + resumeFrom: 2, + }); + await store.addLogs('experiment1', 'run2', [ + { type: 'overwriting', number: 2, values: { x: 1 } }, + { type: 'overwriting', number: 3, values: { x: 2 } }, + ]); + await expect( + fromAsync( + store.getLogs({ experimentId: 'experiment1', runId: 'run2' }), + ), + ).resolves.toMatchInlineSnapshot(` [ { "experimentId": "experiment1", @@ -1370,8 +1406,9 @@ describe('SQLiteStore#getLogs', () => { }, ] `); + }); }); -}); +} async function fromAsync(iterable: AsyncIterable) { let values: T[] = []; From 8e8bd92eae0205bb1e4321ec5f78914fa195d746 Mon Sep 17 00:00:00 2001 From: Quentin Roy Date: Wed, 31 Jan 2024 11:22:33 +0100 Subject: [PATCH 3/5] log-server: changeset --- .changeset/old-zebras-tease.md | 5 +++++ 1 file changed, 5 insertions(+) create mode 100644 .changeset/old-zebras-tease.md diff --git 
a/.changeset/old-zebras-tease.md b/.changeset/old-zebras-tease.md new file mode 100644 index 00000000..2a635f55 --- /dev/null +++ b/.changeset/old-zebras-tease.md @@ -0,0 +1,5 @@ +--- +'@lightmill/log-server': minor +--- + +Add a select query result limit for SQliteStore to prevent too many logs to be loaded in memory at the same time. Once the limit is attained logs are yielded and the next logs are loaded once their done being processed. From 5cf4748a6addbedd43f60e674495749ce6958e8c Mon Sep 17 00:00:00 2001 From: Quentin Roy Date: Wed, 31 Jan 2024 16:03:41 +0100 Subject: [PATCH 4/5] fix missing logs with limitSelectQuery --- .../src/db-migrations/2023-08-07-init.ts | 5 ++ packages/log-server/src/store.ts | 56 +++++++++++++------ 2 files changed, 44 insertions(+), 17 deletions(-) diff --git a/packages/log-server/src/db-migrations/2023-08-07-init.ts b/packages/log-server/src/db-migrations/2023-08-07-init.ts index fe132561..ae005bfb 100644 --- a/packages/log-server/src/db-migrations/2023-08-07-init.ts +++ b/packages/log-server/src/db-migrations/2023-08-07-init.ts @@ -218,5 +218,10 @@ export async function up(db: Kysely) { SELECT RAISE(ABORT, 'Cannot delete existing log value'); END; `.execute(trx); + await trx.schema + .createIndex('logValueLogIdName') + .on('logValue') + .columns(['logId', 'name']) + .execute(); }); } diff --git a/packages/log-server/src/store.ts b/packages/log-server/src/store.ts index d05ab52b..33eaa9ac 100644 --- a/packages/log-server/src/store.ts +++ b/packages/log-server/src/store.ts @@ -15,7 +15,7 @@ import { } from 'kysely'; import { JsonObject } from 'type-fest'; import loglevel, { LogLevelDesc } from 'loglevel'; -import { groupBy, last, pick } from 'remeda'; +import { groupBy, last } from 'remeda'; import { arrayify } from './utils.js'; const __dirname = url.fileURLToPath(new URL('.', import.meta.url)); @@ -456,15 +456,16 @@ export class SQLiteStore { async *#getLogValues(filter: LogFilter) { let lastRow: { - number: number; + logNumber: number; logId: number; experimentId: string; runId: string; + name: string; } | null = null; let isFirst = true; let isDone = false; while (!isDone) { - let result = await this.#db + let query = this.#db .selectFrom('runLogView as l') .innerJoin('logValue as v', 'l.logId', 'v.logId') .$if(filter.experimentId != null, (qb) => @@ -476,29 +477,46 @@ export class SQLiteStore { .$if(filter.type != null, (qb) => qb.where('l.type', 'in', arrayify(filter.type, true)), ) - .$if(!isFirst, (qb) => { - if (lastRow === null) throw new Error('lastRow is null'); - return qb - .where('l.experimentId', '>=', lastRow.experimentId) - .where('l.runId', '>=', lastRow.runId) - .where('l.logNumber', '>', lastRow.number); - }) .where('l.type', 'is not', null) .select([ 'l.experimentId as experimentId', 'l.runId as runId', 'l.logId as logId', 'l.type as type', - 'l.logNumber as number', - 'v.name', - 'v.value', + 'l.logNumber as logNumber', + 'v.name as name', + 'v.value as value', ]) .$narrowType<{ type: string }>() .orderBy('experimentId') .orderBy('runId') - .orderBy('number') + .orderBy('logNumber') + .orderBy('name') .limit(this.#selectQueryLimit) - .execute(); + .$if(!isFirst, (qb) => + qb.where((eb) => { + if (lastRow === null) throw new Error('lastRow is null'); + return eb.or([ + eb('experimentId', '>', lastRow.experimentId), + eb.and([ + eb('experimentId', '=', lastRow.experimentId), + eb('runId', '>', lastRow.runId), + ]), + eb.and([ + eb('experimentId', '=', lastRow.experimentId), + eb('runId', '=', lastRow.runId), + eb('logNumber', '>', 
lastRow.logNumber), + ]), + eb.and([ + eb('experimentId', '=', lastRow.experimentId), + eb('runId', '=', lastRow.runId), + eb('logNumber', '=', lastRow.logNumber), + eb('name', '>', lastRow.name), + ]), + ]); + }), + ); + let result = await query.execute(); isFirst = false; lastRow = last(result) ?? null; isDone = lastRow == null; @@ -513,9 +531,13 @@ export class SQLiteStore { if (first.done) return; let currentValues = [first.value]; - function getLogFromCurrentValues() { + function getLogFromCurrentValues(): Log { + let { experimentId, runId, logNumber, type } = currentValues[0]; return { - ...pick(currentValues[0], ['experimentId', 'runId', 'number', 'type']), + experimentId, + runId, + number: logNumber, + type, values: reconstructValues(currentValues), }; } From 8e7acd82442f09bea65f1bdfaa39271adc1580e6 Mon Sep 17 00:00:00 2001 From: Quentin Roy Date: Wed, 31 Jan 2024 16:04:58 +0100 Subject: [PATCH 5/5] fix lint --- packages/log-server/src/db-migrations/2023-08-07-init.ts | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/packages/log-server/src/db-migrations/2023-08-07-init.ts b/packages/log-server/src/db-migrations/2023-08-07-init.ts index ae005bfb..a0f833e5 100644 --- a/packages/log-server/src/db-migrations/2023-08-07-init.ts +++ b/packages/log-server/src/db-migrations/2023-08-07-init.ts @@ -219,9 +219,9 @@ export async function up(db: Kysely) { END; `.execute(trx); await trx.schema - .createIndex('logValueLogIdName') - .on('logValue') - .columns(['logId', 'name']) - .execute(); + .createIndex('logValueLogIdName') + .on('logValue') + .columns(['logId', 'name']) + .execute(); }); }
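
A note on the pagination scheme this series introduces: patches 1 and 4 together replace the single unbounded SELECT in SQLiteStore#getLogs with keyset (cursor) pagination. Each batch is capped at selectQueryLimit rows, and the next batch resumes strictly after the last row of the previous one, compared on the full sort key (experimentId, runId, logNumber, name). The TypeScript sketch below illustrates the idea on its own, with a plain sorted array standing in for the LIMITed SELECT; the LogValueRow, isAfter, and paginate names are illustrative only and are not part of the @lightmill/log-server API.

type LogValueRow = {
  experimentId: string;
  runId: string;
  logNumber: number;
  name: string;
  value: string;
};

// Strict lexicographic "greater than cursor" test over the composite sort key.
// This is the condition the patch-4 expression builder encodes in SQL:
//   experimentId > e'
//   OR (experimentId = e' AND runId > r')
//   OR (experimentId = e' AND runId = r' AND logNumber > n')
//   OR (experimentId = e' AND runId = r' AND logNumber = n' AND name > name')
function isAfter(row: LogValueRow, cursor: LogValueRow): boolean {
  if (row.experimentId !== cursor.experimentId)
    return row.experimentId > cursor.experimentId;
  if (row.runId !== cursor.runId) return row.runId > cursor.runId;
  if (row.logNumber !== cursor.logNumber)
    return row.logNumber > cursor.logNumber;
  return row.name > cursor.name;
}

// Keyset pagination over rows already sorted by the same key. A plain array
// stands in for the database so the sketch is self-contained and runnable.
async function* paginate(
  sortedRows: readonly LogValueRow[],
  pageSize: number,
): AsyncGenerator<LogValueRow> {
  let cursor: LogValueRow | null = null;
  for (;;) {
    const last = cursor;
    // Keep only rows strictly after the cursor, then take the next page.
    const page = sortedRows
      .filter((row) => last === null || isAfter(row, last))
      .slice(0, pageSize);
    if (page.length === 0) return;
    yield* page;
    cursor = page[page.length - 1];
  }
}

Compare this with the condition removed in patch 4 (experimentId >= last, runId >= last, logNumber > last): that is not a valid cursor, because a row in a later run whose logNumber is not greater than the previous batch's final logNumber gets filtered out, which is the "missing logs" symptom the fourth commit addresses. Ordering by v.name and including it in the cursor also lets a single log's value rows straddle a page boundary without being skipped, and the new logValue(logId, name) index keeps the repeated LIMITed scans cheap.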