From a514726225d9e8f7c07f7d8ba631dc5c4c6dcd5d Mon Sep 17 00:00:00 2001 From: Jakub Riedl Date: Tue, 13 May 2025 18:49:27 +1000 Subject: [PATCH] Implement partial autoload in postgres-persister --- src/persisters/common/database/postgresql.ts | 94 +++++++++++++++++++- 1 file changed, 90 insertions(+), 4 deletions(-) diff --git a/src/persisters/common/database/postgresql.ts b/src/persisters/common/database/postgresql.ts index ca9052bdc9..0ac84a663d 100644 --- a/src/persisters/common/database/postgresql.ts +++ b/src/persisters/common/database/postgresql.ts @@ -2,11 +2,13 @@ import type { DatabaseChangeListener, DatabaseExecuteCommand, DatabasePersisterConfig, + PersistedChanges, PersistedStore, Persister, PersisterListener, Persists, } from '../../../@types/persisters/index.d.ts'; +import type {Changes} from '../../../@types/store/index.d.ts'; import {arrayJoin, arrayMap} from '../../../common/array.ts'; import {collHas, collValues} from '../../../common/coll.ts'; import {getHash} from '../../../common/hash.ts'; @@ -16,6 +18,7 @@ import { jsonStringWithUndefined, } from '../../../common/json.ts'; import {mapGet} from '../../../common/map.ts'; +import {objDel, objMap} from '../../../common/obj.ts'; import {ifNotUndefined, promiseAll} from '../../../common/other.ts'; import { EMPTY_STRING, @@ -27,6 +30,7 @@ import { import { CREATE, CREATE_TABLE, + DEFAULT_ROW_ID_COLUMN_NAME, DELETE, FUNCTION, INSERT, @@ -46,7 +50,8 @@ import {createTabularPersister} from './tabular.ts'; const TABLE_CREATED = 'c'; const DATA_CHANGED = 'd'; -const EVENT_REGEX = /^([cd]:)(.+)/; +const CHANGES_PAYLOAD_JSON = `json_build_object('NEW',NEW,'OLD',OLD)::text`; +const EVENT_REGEX = /^([cd]:)(.+?)(?::(.+))?$/; export const createCustomPostgreSqlPersister = < ListenerHandle, @@ -133,6 +138,85 @@ export const createCustomPostgreSqlPersister = < newOrOldOrBoth == 0 ? 'NEW' : 'OLD', ); + const detectChanges = ( + tableName: string, + payload: string | undefined, + ): Changes | undefined => { + // trigger isn't row change but table change we want to trigger full reload + if (!payload) return undefined; + // with JSON mode we can always trigger full reload as it's always referring to single row anyways + if (isJson) return undefined; + + const {NEW, OLD} = JSON.parse(payload) as { + NEW: Record | undefined | null; + OLD: Record | undefined | null; + }; + + const tabularLoadConfig = defaultedConfig[0]; + const valuesConfig = defaultedConfig[2]; + const shouldLoadValues = + typeof valuesConfig === 'string' ? true : valuesConfig[0]; + const valuesTableName = + typeof valuesConfig === 'string' ? valuesConfig : valuesConfig[2]; + + if (shouldLoadValues && tableName === valuesTableName) { + return [ + {}, + objMap(objDel(NEW ?? OLD ?? {}, DEFAULT_ROW_ID_COLUMN_NAME), (field) => + jsonParse(field as string), + ), + 1, + ]; + } + if (!managedTableNamesSet.has(tableName) || !tabularLoadConfig) { + return undefined; + } + + const rowIdColumnName = + typeof tabularLoadConfig === 'string' + ? DEFAULT_ROW_ID_COLUMN_NAME + : (tabularLoadConfig.get(tableName)?.[1] ?? DEFAULT_ROW_ID_COLUMN_NAME); + + const tableId = + typeof tabularLoadConfig === 'string' + ? tabularLoadConfig + : (tabularLoadConfig.get(tableName)?.[0] ?? tableName); + + // row delete + if (!NEW && OLD) { + const rowId = OLD[rowIdColumnName]; + return [{[tableId]: {[rowId]: undefined}}, {}, 1]; + } + + // row insert + if (NEW && !OLD) { + const rowId = NEW[rowIdColumnName]; + const row = objMap(objDel(NEW, rowIdColumnName), (field) => + jsonParse(field as string), + ); + return [{[tableId]: {[rowId]: row}}, {}, 1]; + } + + // row update + if (NEW && OLD) { + const rowId = NEW[rowIdColumnName]; + const changedCells: Record = {}; + let hasChanged = false; + + for (const key in NEW) { + if (NEW[key] !== OLD[key] && key !== rowIdColumnName) { + changedCells[key] = jsonParse(NEW[key]); + hasChanged = true; + } + } + + if (!hasChanged) return undefined; + return [{[tableId]: {[rowId]: changedCells}}, {}, 1]; + } + + return undefined; + }; + const addDataChangedTriggers = ( tableName: string, dataChangedFunction: string, @@ -171,7 +255,8 @@ export const createCustomPostgreSqlPersister = < const dataChangedFunctionName = await createFunction( DATA_CHANGED, - notify(`'d:'||TG_TABLE_NAME`) + `RETURN NULL;`, + notify(`'d:'||TG_TABLE_NAME||':'||${CHANGES_PAYLOAD_JSON}`) + + `RETURN NULL;`, ); await promiseAll( @@ -189,7 +274,7 @@ export const createCustomPostgreSqlPersister = < (prefixAndTableName) => ifNotUndefined( strMatch(prefixAndTableName, EVENT_REGEX), - async ([, eventType, tableName]) => { + async ([, eventType, tableName, payload]) => { if (collHas(managedTableNamesSet, tableName)) { if (eventType == 'c:') { await addDataChangedTriggers( @@ -197,7 +282,8 @@ export const createCustomPostgreSqlPersister = < dataChangedFunctionName, ); } - listener(); + const changes = detectChanges(tableName, payload); + listener(undefined, changes as PersistedChanges); } }, ),