From dca16aaaf5f87a68db57a9ecdfaa2a8dae4df218 Mon Sep 17 00:00:00 2001 From: Benjamin Piouffle Date: Wed, 4 Dec 2024 12:15:10 +0100 Subject: [PATCH] feat(ElasticSearch): Postgres synchronization --- config/default.json | 4 + config/test.json | 4 + package-lock.json | 21 +++ package.json | 1 + server/index.js | 25 ++- .../ElasticSearchCollectivesAdapter.ts | 13 +- .../adapters/ElasticSearchCommentsAdapter.ts | 13 +- .../adapters/ElasticSearchExpensesAdapter.ts | 20 ++- .../ElasticSearchHostApplicationsAdapter.ts | 8 +- .../adapters/ElasticSearchOrdersAdapter.ts | 12 +- .../adapters/ElasticSearchTiersAdapter.ts | 12 +- .../ElasticSearchTransactionsAdapter.ts | 8 +- .../adapters/ElasticSearchUpdatesAdapter.ts | 12 +- server/lib/elastic-search/adapters/index.ts | 14 ++ server/lib/elastic-search/batch-processor.ts | 164 ++++++++++++++++++ server/lib/elastic-search/sync-postgres.ts | 119 +++++++++++++ server/lib/elastic-search/types.ts | 33 ++++ server/lib/sentry.ts | 1 + server/models/Collective.ts | 9 +- 19 files changed, 438 insertions(+), 55 deletions(-) create mode 100644 server/lib/elastic-search/batch-processor.ts create mode 100644 server/lib/elastic-search/sync-postgres.ts create mode 100644 server/lib/elastic-search/types.ts diff --git a/config/default.json b/config/default.json index e941e8200f7..5b0dbff0220 100644 --- a/config/default.json +++ b/config/default.json @@ -1,5 +1,9 @@ { "port": "3060", + "services": { + "server": true, + "searchSync": true + }, "mailpit": { "client": false }, diff --git a/config/test.json b/config/test.json index 60ec13e2174..a17a1ecd112 100644 --- a/config/test.json +++ b/config/test.json @@ -1,5 +1,9 @@ { "port": "3061", + "services": { + "server": true, + "searchSync": false + }, "database": { "url": "postgres://opencollective@127.0.0.1:5432/opencollective_test" }, diff --git a/package-lock.json b/package-lock.json index f235a4cee8d..8fb8639fec1 100644 --- a/package-lock.json +++ b/package-lock.json @@ -95,6 +95,7 @@ "pg": "8.13.1", "pg-connection-string": "2.7.0", "pg-format": "1.0.4", + "pg-listen": "1.7.0", "plaid": "29.0.0", "prepend-http": "3.0.1", "redis": "4.6.6", @@ -23011,6 +23012,20 @@ "node": ">=4.0.0" } }, + "node_modules/pg-listen": { + "version": "1.7.0", + "resolved": "https://registry.npmjs.org/pg-listen/-/pg-listen-1.7.0.tgz", + "integrity": "sha512-MKDwKLm4ryhy7iq1yw1K1MvUzBdTkaT16HZToddX9QaT8XSdt3Kins5mYH6DLECGFzFWG09VdXvWOIYogjXrsg==", + "license": "MIT", + "dependencies": { + "debug": "^4.1.1", + "pg-format": "^1.0.4", + "typed-emitter": "^0.1.0" + }, + "peerDependencies": { + "pg": "7.x || 8.x" + } + }, "node_modules/pg-pool": { "version": "3.7.0", "resolved": "https://registry.npmjs.org/pg-pool/-/pg-pool-3.7.0.tgz", @@ -26027,6 +26042,12 @@ "url": "https://github.com/sponsors/ljharb" } }, + "node_modules/typed-emitter": { + "version": "0.1.0", + "resolved": "https://registry.npmjs.org/typed-emitter/-/typed-emitter-0.1.0.tgz", + "integrity": "sha512-Tfay0l6gJMP5rkil8CzGbLthukn+9BN/VXWcABVFPjOoelJ+koW8BuPZYk+h/L+lEeIp1fSzVRiWRPIjKVjPdg==", + "license": "MIT" + }, "node_modules/typedarray": { "version": "0.0.6", "resolved": "https://registry.npmjs.org/typedarray/-/typedarray-0.0.6.tgz", diff --git a/package.json b/package.json index b1e34366079..9a4a53d61f9 100644 --- a/package.json +++ b/package.json @@ -116,6 +116,7 @@ "pg": "8.13.1", "pg-connection-string": "2.7.0", "pg-format": "1.0.4", + "pg-listen": "1.7.0", "plaid": "29.0.0", "prepend-http": "3.0.1", "redis": "4.6.6", diff --git a/server/index.js b/server/index.js index 0277b08d228..5a9f8b6848c 100644 --- a/server/index.js +++ b/server/index.js @@ -7,14 +7,16 @@ import config from 'config'; import express from 'express'; import throng from 'throng'; +import { startElasticSearchPostgresSync } from './lib/elastic-search/sync-postgres'; import expressLib from './lib/express'; import logger from './lib/logger'; import { updateCachedFidoMetadata } from './lib/two-factor-authentication/fido-metadata'; +import { parseToBoolean } from './lib/utils'; import routes from './routes'; const workers = process.env.WEB_CONCURRENCY || 1; -async function start(i) { +async function startExpressServer(workerId) { const expressApp = express(); await updateCachedFidoMetadata(); @@ -35,7 +37,7 @@ async function start(i) { host, server.address().port, config.env, - i, + workerId, ); }); @@ -45,15 +47,22 @@ async function start(i) { return expressApp; } +// Start the express server let app; +if (parseToBoolean(config.services.server)) { + if (['production', 'staging'].includes(config.env) && workers > 1) { + throng({ worker: startExpressServer, count: workers }); + } else { + app = startExpressServer(1); + } +} -if (['production', 'staging'].includes(config.env) && workers > 1) { - throng({ worker: start, count: workers }); -} else { - app = start(1); +// Start the search sync job +if (parseToBoolean(config.services.searchSync)) { + startElasticSearchPostgresSync(); } // This is used by tests -export default async function () { - return app ? app : start(1); +export default async function startServerForTest() { + return app ? app : parseToBoolean(config.services.server) ? startExpressServer(1) : null; } diff --git a/server/lib/elastic-search/adapters/ElasticSearchCollectivesAdapter.ts b/server/lib/elastic-search/adapters/ElasticSearchCollectivesAdapter.ts index ae4e2f4ce98..b6319d735bd 100644 --- a/server/lib/elastic-search/adapters/ElasticSearchCollectivesAdapter.ts +++ b/server/lib/elastic-search/adapters/ElasticSearchCollectivesAdapter.ts @@ -1,11 +1,12 @@ -import models, { Op } from '../../../models'; +import { Op } from '../../../models'; +import Collective from '../../../models/Collective'; import { stripHTMLOrEmpty } from '../../sanitize-html'; import { ElasticSearchIndexName } from '../constants'; import { ElasticSearchModelAdapter } from './ElasticSearchModelAdapter'; export class ElasticSearchCollectivesAdapter implements ElasticSearchModelAdapter { - public readonly model = models.Collective; + public readonly model = Collective; public readonly index = ElasticSearchIndexName.COLLECTIVES; public readonly mappings = { properties: { @@ -38,8 +39,8 @@ export class ElasticSearchCollectivesAdapter implements ElasticSearchModelAdapte maxId?: number; ids?: number[]; } = {}, - ): Promise>> { - return models.Collective.findAll({ + ): Promise>> { + return Collective.findAll({ attributes: Object.keys(this.mappings.properties), order: [['id', 'DESC']], limit: options.limit, @@ -54,7 +55,7 @@ export class ElasticSearchCollectivesAdapter implements ElasticSearchModelAdapte } public mapModelInstanceToDocument( - instance: InstanceType, + instance: InstanceType, ): Record { return { id: instance.id, @@ -72,7 +73,7 @@ export class ElasticSearchCollectivesAdapter implements ElasticSearchModelAdapte isActive: instance.isActive, isHostAccount: instance.isHostAccount, deactivatedAt: instance.deactivatedAt, - HostCollectiveId: instance.HostCollectiveId, + HostCollectiveId: !instance.isActive ? null : instance.HostCollectiveId, ParentCollectiveId: instance.ParentCollectiveId, }; } diff --git a/server/lib/elastic-search/adapters/ElasticSearchCommentsAdapter.ts b/server/lib/elastic-search/adapters/ElasticSearchCommentsAdapter.ts index 7848270d7d0..1b772daf265 100644 --- a/server/lib/elastic-search/adapters/ElasticSearchCommentsAdapter.ts +++ b/server/lib/elastic-search/adapters/ElasticSearchCommentsAdapter.ts @@ -2,15 +2,14 @@ import { QueryDslQueryContainer } from '@elastic/elasticsearch/lib/api/types'; import { omit } from 'lodash'; import { Op } from 'sequelize'; -import models from '../../../models'; -import { CommentType } from '../../../models/Comment'; +import Comment, { CommentType } from '../../../models/Comment'; import { stripHTMLOrEmpty } from '../../sanitize-html'; import { ElasticSearchIndexName } from '../constants'; import { ElasticSearchModelAdapter } from './ElasticSearchModelAdapter'; export class ElasticSearchCommentsAdapter implements ElasticSearchModelAdapter { - public readonly model = models.Comment; + public readonly model = Comment; public readonly index = ElasticSearchIndexName.COMMENTS; public readonly mappings = { properties: { @@ -38,7 +37,7 @@ export class ElasticSearchCommentsAdapter implements ElasticSearchModelAdapter { ids?: number[]; } = {}, ) { - return models.Comment.findAll({ + return Comment.findAll({ attributes: omit(Object.keys(this.mappings.properties), ['HostCollectiveId', 'ParentCollectiveId']), order: [['id', 'DESC']], limit: options.limit, @@ -52,7 +51,7 @@ export class ElasticSearchCommentsAdapter implements ElasticSearchModelAdapter { { association: 'collective', required: true, - attributes: ['HostCollectiveId', 'ParentCollectiveId'], + attributes: ['isActive', 'HostCollectiveId', 'ParentCollectiveId'], }, { association: 'expense', @@ -69,7 +68,7 @@ export class ElasticSearchCommentsAdapter implements ElasticSearchModelAdapter { } public mapModelInstanceToDocument( - instance: InstanceType, + instance: InstanceType, ): Record { return { id: instance.id, @@ -84,7 +83,7 @@ export class ElasticSearchCommentsAdapter implements ElasticSearchModelAdapter { HostCollectiveId: instance.expense?.HostCollectiveId ?? instance.hostApplication?.HostCollectiveId ?? - instance.collective.HostCollectiveId, + (!instance.collective.isActive ? null : instance.collective.HostCollectiveId), }; } diff --git a/server/lib/elastic-search/adapters/ElasticSearchExpensesAdapter.ts b/server/lib/elastic-search/adapters/ElasticSearchExpensesAdapter.ts index da46b533ebf..b13bf5047ac 100644 --- a/server/lib/elastic-search/adapters/ElasticSearchExpensesAdapter.ts +++ b/server/lib/elastic-search/adapters/ElasticSearchExpensesAdapter.ts @@ -1,14 +1,14 @@ import { omit } from 'lodash'; import { Op } from 'sequelize'; -import models from '../../../models'; +import Expense from '../../../models/Expense'; import { stripHTMLOrEmpty } from '../../sanitize-html'; import { ElasticSearchIndexName } from '../constants'; import { ElasticSearchModelAdapter } from './ElasticSearchModelAdapter'; export class ElasticSearchExpensesAdapter implements ElasticSearchModelAdapter { - public readonly model = models.Expense; + public readonly model = Expense; public readonly index = ElasticSearchIndexName.EXPENSES; public readonly mappings = { properties: { @@ -43,7 +43,7 @@ export class ElasticSearchExpensesAdapter implements ElasticSearchModelAdapter { ids?: number[]; } = {}, ) { - return models.Expense.findAll({ + return Expense.findAll({ attributes: omit(Object.keys(this.mappings.properties), ['ParentCollectiveId', 'items']), order: [['id', 'DESC']], limit: options.limit, @@ -57,7 +57,12 @@ export class ElasticSearchExpensesAdapter implements ElasticSearchModelAdapter { { association: 'collective', required: true, - attributes: ['HostCollectiveId', 'ParentCollectiveId'], + attributes: ['isActive', 'HostCollectiveId', 'ParentCollectiveId'], + }, + { + association: 'items', + required: true, + attributes: ['description'], }, { association: 'items', @@ -69,7 +74,7 @@ export class ElasticSearchExpensesAdapter implements ElasticSearchModelAdapter { } public mapModelInstanceToDocument( - instance: InstanceType, + instance: InstanceType, ): Record { return { id: instance.id, @@ -87,8 +92,9 @@ export class ElasticSearchExpensesAdapter implements ElasticSearchModelAdapter { ParentCollectiveId: instance.collective.ParentCollectiveId, FromCollectiveId: instance.FromCollectiveId, UserId: instance.UserId, - HostCollectiveId: instance.HostCollectiveId || instance.collective.HostCollectiveId, - items: instance.items.map(item => item.description).join(' '), + items: instance.items.map(item => item.description).join(', '), + HostCollectiveId: + instance.HostCollectiveId || (!instance.collective.isActive ? null : instance.collective.HostCollectiveId), }; } diff --git a/server/lib/elastic-search/adapters/ElasticSearchHostApplicationsAdapter.ts b/server/lib/elastic-search/adapters/ElasticSearchHostApplicationsAdapter.ts index a6430de094d..a4878175ef4 100644 --- a/server/lib/elastic-search/adapters/ElasticSearchHostApplicationsAdapter.ts +++ b/server/lib/elastic-search/adapters/ElasticSearchHostApplicationsAdapter.ts @@ -1,13 +1,13 @@ import { omit } from 'lodash'; import { Op } from 'sequelize'; -import models from '../../../models'; +import HostApplication from '../../../models/HostApplication'; import { ElasticSearchIndexName } from '../constants'; import { ElasticSearchModelAdapter } from './ElasticSearchModelAdapter'; export class ElasticSearchHostApplicationsAdapter implements ElasticSearchModelAdapter { - public readonly model = models.HostApplication; + public readonly model = HostApplication; public readonly index = ElasticSearchIndexName.HOST_APPLICATIONS; public readonly mappings = { properties: { @@ -33,7 +33,7 @@ export class ElasticSearchHostApplicationsAdapter implements ElasticSearchModelA ids?: number[]; } = {}, ) { - return models.HostApplication.findAll({ + return HostApplication.findAll({ attributes: omit(Object.keys(this.mappings.properties), ['ParentCollectiveId']), order: [['id', 'DESC']], limit: options.limit, @@ -54,7 +54,7 @@ export class ElasticSearchHostApplicationsAdapter implements ElasticSearchModelA } public mapModelInstanceToDocument( - instance: InstanceType, + instance: InstanceType, ): Record { return { id: instance.id, diff --git a/server/lib/elastic-search/adapters/ElasticSearchOrdersAdapter.ts b/server/lib/elastic-search/adapters/ElasticSearchOrdersAdapter.ts index 8eb250bf547..10a2d6fdce4 100644 --- a/server/lib/elastic-search/adapters/ElasticSearchOrdersAdapter.ts +++ b/server/lib/elastic-search/adapters/ElasticSearchOrdersAdapter.ts @@ -1,13 +1,13 @@ import { omit } from 'lodash'; import { Op } from 'sequelize'; -import models from '../../../models'; +import Order from '../../../models/Order'; import { ElasticSearchIndexName } from '../constants'; import { ElasticSearchModelAdapter } from './ElasticSearchModelAdapter'; export class ElasticSearchOrdersAdapter implements ElasticSearchModelAdapter { - public readonly model = models.Order; + public readonly model = Order; public readonly index = ElasticSearchIndexName.ORDERS; public readonly mappings = { properties: { @@ -34,7 +34,7 @@ export class ElasticSearchOrdersAdapter implements ElasticSearchModelAdapter { ids?: number[]; } = {}, ) { - return models.Order.findAll({ + return Order.findAll({ attributes: omit(Object.keys(this.mappings.properties), ['HostCollectiveId', 'ParentCollectiveId']), order: [['id', 'DESC']], limit: options.limit, @@ -48,7 +48,7 @@ export class ElasticSearchOrdersAdapter implements ElasticSearchModelAdapter { { association: 'collective', required: true, - attributes: ['HostCollectiveId', 'ParentCollectiveId'], + attributes: ['isActive', 'HostCollectiveId', 'ParentCollectiveId'], }, { model: models.Subscription, @@ -60,7 +60,7 @@ export class ElasticSearchOrdersAdapter implements ElasticSearchModelAdapter { } public mapModelInstanceToDocument( - instance: InstanceType, + instance: InstanceType, ): Record { return { id: instance.id, @@ -69,7 +69,7 @@ export class ElasticSearchOrdersAdapter implements ElasticSearchModelAdapter { description: instance.description, CollectiveId: instance.CollectiveId, FromCollectiveId: instance.FromCollectiveId, - HostCollectiveId: instance.collective.HostCollectiveId, + HostCollectiveId: !instance.collective.isActive ? null : instance.collective.HostCollectiveId, ParentCollectiveId: instance.collective.ParentCollectiveId, paypalSubscriptionId: instance.Subscription?.paypalSubscriptionId, }; diff --git a/server/lib/elastic-search/adapters/ElasticSearchTiersAdapter.ts b/server/lib/elastic-search/adapters/ElasticSearchTiersAdapter.ts index d96509c5b26..8686eab0304 100644 --- a/server/lib/elastic-search/adapters/ElasticSearchTiersAdapter.ts +++ b/server/lib/elastic-search/adapters/ElasticSearchTiersAdapter.ts @@ -1,14 +1,14 @@ import { omit } from 'lodash'; import { Op } from 'sequelize'; -import models from '../../../models'; +import Tier from '../../../models/Tier'; import { stripHTMLOrEmpty } from '../../sanitize-html'; import { ElasticSearchIndexName } from '../constants'; import { ElasticSearchModelAdapter } from './ElasticSearchModelAdapter'; export class ElasticSearchTiersAdapter implements ElasticSearchModelAdapter { - public readonly model = models.Tier; + public readonly model = Tier; public readonly index = ElasticSearchIndexName.TIERS; public readonly mappings = { properties: { @@ -37,7 +37,7 @@ export class ElasticSearchTiersAdapter implements ElasticSearchModelAdapter { ids?: number[]; } = {}, ) { - return models.Tier.findAll({ + return Tier.findAll({ attributes: omit(Object.keys(this.mappings.properties), ['HostCollectiveId', 'ParentCollectiveId']), order: [['id', 'DESC']], limit: options.limit, @@ -51,14 +51,14 @@ export class ElasticSearchTiersAdapter implements ElasticSearchModelAdapter { { association: 'Collective', required: true, - attributes: ['HostCollectiveId', 'ParentCollectiveId'], + attributes: ['isActive', 'HostCollectiveId', 'ParentCollectiveId'], }, ], }); } public mapModelInstanceToDocument( - instance: InstanceType, + instance: InstanceType, ): Record { return { id: instance.id, @@ -70,7 +70,7 @@ export class ElasticSearchTiersAdapter implements ElasticSearchModelAdapter { description: instance.description, longDescription: stripHTMLOrEmpty(instance.longDescription), CollectiveId: instance.CollectiveId, - HostCollectiveId: instance.Collective.HostCollectiveId, + HostCollectiveId: !instance.Collective.isActive ? null : instance.Collective.HostCollectiveId, ParentCollectiveId: instance.Collective.ParentCollectiveId, }; } diff --git a/server/lib/elastic-search/adapters/ElasticSearchTransactionsAdapter.ts b/server/lib/elastic-search/adapters/ElasticSearchTransactionsAdapter.ts index 1c8fef34f5f..22bd095f74b 100644 --- a/server/lib/elastic-search/adapters/ElasticSearchTransactionsAdapter.ts +++ b/server/lib/elastic-search/adapters/ElasticSearchTransactionsAdapter.ts @@ -1,13 +1,13 @@ import { omit } from 'lodash'; import { Op } from 'sequelize'; -import models from '../../../models'; +import Transaction from '../../../models/Transaction'; import { ElasticSearchIndexName } from '../constants'; import { ElasticSearchModelAdapter } from './ElasticSearchModelAdapter'; export class ElasticSearchTransactionsAdapter implements ElasticSearchModelAdapter { - public readonly model = models.Transaction; + public readonly model = Transaction; public readonly index = ElasticSearchIndexName.TRANSACTIONS; public readonly mappings = { properties: { @@ -37,7 +37,7 @@ export class ElasticSearchTransactionsAdapter implements ElasticSearchModelAdapt ids?: number[]; } = {}, ) { - return models.Transaction.findAll({ + return Transaction.findAll({ attributes: omit(Object.keys(this.mappings.properties), ['merchantId']), order: [['id', 'DESC']], limit: options.limit, @@ -51,7 +51,7 @@ export class ElasticSearchTransactionsAdapter implements ElasticSearchModelAdapt } public mapModelInstanceToDocument( - instance: InstanceType, + instance: InstanceType, ): Record { return { id: instance.id, diff --git a/server/lib/elastic-search/adapters/ElasticSearchUpdatesAdapter.ts b/server/lib/elastic-search/adapters/ElasticSearchUpdatesAdapter.ts index 3d56ef81b57..23d452f63cf 100644 --- a/server/lib/elastic-search/adapters/ElasticSearchUpdatesAdapter.ts +++ b/server/lib/elastic-search/adapters/ElasticSearchUpdatesAdapter.ts @@ -1,14 +1,14 @@ import { omit } from 'lodash'; import { Op } from 'sequelize'; -import models from '../../../models'; +import Update from '../../../models/Update'; import { stripHTMLOrEmpty } from '../../sanitize-html'; import { ElasticSearchIndexName } from '../constants'; import { ElasticSearchModelAdapter } from './ElasticSearchModelAdapter'; export class ElasticSearchUpdatesAdapter implements ElasticSearchModelAdapter { - public readonly model = models.Update; + public readonly model = Update; public readonly index = ElasticSearchIndexName.UPDATES; public readonly mappings = { properties: { @@ -38,7 +38,7 @@ export class ElasticSearchUpdatesAdapter implements ElasticSearchModelAdapter { ids?: number[]; } = {}, ) { - return models.Update.findAll({ + return Update.findAll({ attributes: omit(Object.keys(this.mappings.properties), ['HostCollectiveId', 'ParentCollectiveId']), order: [['id', 'DESC']], limit: options.limit, @@ -52,14 +52,14 @@ export class ElasticSearchUpdatesAdapter implements ElasticSearchModelAdapter { { association: 'collective', required: true, - attributes: ['HostCollectiveId', 'ParentCollectiveId'], + attributes: ['isActive', 'HostCollectiveId', 'ParentCollectiveId'], }, ], }); } public mapModelInstanceToDocument( - instance: InstanceType, + instance: InstanceType, ): Record { return { id: instance.id, @@ -72,7 +72,7 @@ export class ElasticSearchUpdatesAdapter implements ElasticSearchModelAdapter { CollectiveId: instance.CollectiveId, FromCollectiveId: instance.FromCollectiveId, CreatedByUserId: instance.CreatedByUserId, - HostCollectiveId: instance.collective.HostCollectiveId, + HostCollectiveId: !instance.collective.isActive ? null : instance.collective.HostCollectiveId, ParentCollectiveId: instance.collective.ParentCollectiveId, }; } diff --git a/server/lib/elastic-search/adapters/index.ts b/server/lib/elastic-search/adapters/index.ts index 2db4bfa7494..9fa9bbea564 100644 --- a/server/lib/elastic-search/adapters/index.ts +++ b/server/lib/elastic-search/adapters/index.ts @@ -20,3 +20,17 @@ export const ElasticSearchModelsAdapters: Record = Object.values( + ElasticSearchModelsAdapters, +).reduce( + (acc, adapter) => { + acc[adapter.model.tableName] = adapter; + return acc; + }, + {} as Record, +); + +export const getAdapterFromTableName = (table: string): ElasticSearchModelAdapter | undefined => { + return AdaptersFromTableNames[table]; +}; diff --git a/server/lib/elastic-search/batch-processor.ts b/server/lib/elastic-search/batch-processor.ts new file mode 100644 index 00000000000..af9f973c448 --- /dev/null +++ b/server/lib/elastic-search/batch-processor.ts @@ -0,0 +1,164 @@ +import { Client } from '@elastic/elasticsearch'; +import { BulkOperationContainer } from '@elastic/elasticsearch/lib/api/types'; +import { groupBy } from 'lodash'; + +import logger from '../logger'; + +import { getAdapterFromTableName } from './adapters'; +import { getElasticSearchClient } from './client'; +import { ElasticSearchRequest, ElasticSearchRequestType, isFullAccountReIndexRequest } from './types'; + +/** + * This class processes ElasticSearch requests in batches, to reduce the number of requests sent to + * the server. + */ +export class ElasticSearchBatchProcessor { + private static instance: ElasticSearchBatchProcessor; + private client: Client; + private queue: ElasticSearchRequest[] = []; + private maxBatchSize: number = 1_000; + private maxWaitTimeInSeconds: number = 5_000; // 5 seconds + private timeoutHandle: NodeJS.Timeout | null = null; + private isActive: boolean = true; + + static getInstance(): ElasticSearchBatchProcessor { + if (!ElasticSearchBatchProcessor.instance) { + ElasticSearchBatchProcessor.instance = new ElasticSearchBatchProcessor(); + } + + return ElasticSearchBatchProcessor.instance; + } + + async addToQueue(request: ElasticSearchRequest) { + if (!this.isActive) { + logger.warn('Elastic Search Batch Processor received a message after being closed'); + return; + } + + this.queue.push(request); + + // If we've reached batch size, process immediately + if (this.queue.length >= this.maxBatchSize) { + await this.processBatch(); + return; + } + + // If no timeout is set, create one + if (!this.timeoutHandle) { + this.timeoutHandle = setTimeout(async () => { + await this.processBatch(); + }, this.maxWaitTimeInSeconds); + } + } + + async processBatch() { + // Clear the timeout + if (this.timeoutHandle) { + clearTimeout(this.timeoutHandle); + this.timeoutHandle = null; + } + + // Skip if no messages + if (this.queue.length === 0) { + return; + } + + try { + // TODO Add pagination: only process a subset of the queue + + // Prepare bulk indexing body + // TODO: Perform bulk indexing + const operations = this.queue.flatMap(message => []); + const bulkResponse = await this.client.bulk({ operations }); + + // Handle any indexing errors + // if (bulkResponse.errors) { + // console.error( + // 'Bulk indexing errors:', + // bulkResponse.items.filter(item => item.index.status >= 400), + // ); + // } + + // Clear the queue after processing + this.queue = []; + } catch (error) { + console.error('Batch processing failed', error); + // TODO: Optionally implement retry or dead-letter queue logic + } + } + + async flushAndClose() { + this.isActive = false; + await this.processBatch(); + } + + // ---- Private methods ---- + private constructor() { + this.client = getElasticSearchClient({ throwIfUnavailable: true }); + } + + private async convertRequestsToBulkIndexingBody(requests: ElasticSearchRequest[]): Promise { + const body: BulkOperationContainer[] = []; + const preparedRequests = this.preprocessRequests(requests); + + for (const [tableOrSpecialAction, requests] of Object.entries(preparedRequests)) { + if (tableOrSpecialAction === 'FULL_ACCOUNT_RE_INDEX') { + // TODO: Handle FULL_ACCOUNT_RE_INDEX requests + } else { + const adapter = getAdapterFromTableName(tableOrSpecialAction); + if (!adapter) { + logger.error(`No ElasticSearch adapter found for table ${tableOrSpecialAction}`); + continue; + } + + // Preload all updated entries + const updateRequests = requests.filter(request => request.type === ElasticSearchRequestType.UPDATE); + const updateRequestsIds = updateRequests.map(request => request.payload.id); + const entriesToIndex = await adapter.findEntriesToIndex({ ids: updateRequestsIds }); + const groupedEntriesToIndex = groupBy(entriesToIndex, entry => entry['id']); + + // Iterate over requests and create bulk indexing operations + for (const request of requests) { + if (request.type === ElasticSearchRequestType.UPDATE) { + const entry = groupedEntriesToIndex[request.payload.id]; + if (entry) { + body.push( + { index: { _index: adapter.index, _id: request.payload.id.toString() } }, + adapter.mapModelInstanceToDocument(entry), + ); + } else { + body.push({ delete: { _index: adapter.index, _id: request.payload.id.toString() } }); + } + } else if (request.type === ElasticSearchRequestType.DELETE) { + body.push({ delete: { _index: adapter.index, _id: request.payload.id.toString() } }); + } else if (request.type === ElasticSearchRequestType.TRUNCATE) { + // TODO + } + } + } + } + + return body; + } + + /** + * Deduplicates requests, returning only the latest request for each entity, unless it's a + * FULL_ACCOUNT_RE_INDEX request - which always takes maximum priority - then groups them by table. + */ + private preprocessRequests(requests: ElasticSearchRequest[]): Record { + const deduplicatedRequests: Record = {}; + + for (const request of requests) { + if ( + isFullAccountReIndexRequest(request) || + !isFullAccountReIndexRequest(deduplicatedRequests[request.payload.id]) + ) { + deduplicatedRequests[request.payload.id] = request; + } + } + + return groupBy(Object.values(deduplicatedRequests), request => + isFullAccountReIndexRequest(request) ? 'FULL_ACCOUNT_RE_INDEX' : request.table, + ); + } +} diff --git a/server/lib/elastic-search/sync-postgres.ts b/server/lib/elastic-search/sync-postgres.ts new file mode 100644 index 00000000000..c697cee6be7 --- /dev/null +++ b/server/lib/elastic-search/sync-postgres.ts @@ -0,0 +1,119 @@ +import createSubscriber from 'pg-listen'; + +import { getDBUrl } from '../db'; +import logger from '../logger'; +import { HandlerType, reportErrorToSentry } from '../sentry'; +import sequelize from '../sequelize'; + +import { ElasticSearchModelsAdapters } from './adapters'; +import { ElasticSearchBatchProcessor } from './batch-processor'; +import { ElasticSearchRequestType } from './types'; + +const CHANNEL_NAME = 'elasticsearch-requests'; + +const setupPostgresTriggers = async () => { + await sequelize.query(` + -- Create a trigger function to send notifications on table changes + CREATE OR REPLACE FUNCTION notify_elasticsearch_on_change() + RETURNS TRIGGER AS $$ + DECLARE + notification JSON; + BEGIN + -- Determine the type of operation + IF (TG_OP = 'INSERT') THEN + notification = json_build_object('type', 'UPDATE', 'table', TG_TABLE_NAME, 'payload', json_build_object('id', NEW.id)); + ELSIF (TG_OP = 'UPDATE') THEN + IF (OLD."deletedAt" IS NULL AND NEW."deletedAt" IS NOT NULL) THEN + notification = json_build_object('type', 'DELETE', 'table', TG_TABLE_NAME, 'payload', json_build_object('id', NEW.id)); + ELSE + notification = json_build_object('type', 'UPDATE', 'table', TG_TABLE_NAME, 'payload', json_build_object('id', NEW.id)); + END IF; + ELSIF (TG_OP = 'DELETE') THEN + notification = json_build_object('type', 'DELETE', 'table', TG_TABLE_NAME, 'payload', json_build_object('id', OLD.id)); + ELSIF (TG_OP = 'TRUNCATE') THEN + notification = json_build_object('type', 'TRUNCATE', 'table', TG_TABLE_NAME, 'payload', json_build_object()); + END IF; + + -- Publish the notification to the Elastic Search requests channel + PERFORM pg_notify('${CHANNEL_NAME}', notification::text); + + RETURN NULL; + END; + $$ LANGUAGE plpgsql; + + ${Object.values(ElasticSearchModelsAdapters) + .map( + adapter => ` + -- Create the trigger for INSERT operations + CREATE OR REPLACE TRIGGER ${adapter.model.tableName}_insert_trigger + AFTER INSERT ON "${adapter.model.tableName}" + FOR EACH ROW + EXECUTE FUNCTION notify_elasticsearch_on_change(); + + -- Create the trigger for UPDATE operations + CREATE OR REPLACE TRIGGER ${adapter.model.tableName}_update_trigger + AFTER UPDATE ON "${adapter.model.tableName}" + FOR EACH ROW + EXECUTE FUNCTION notify_elasticsearch_on_change(); + + -- Create the trigger for DELETE operations + CREATE OR REPLACE TRIGGER ${adapter.model.tableName}_delete_trigger + AFTER DELETE ON "${adapter.model.tableName}" + FOR EACH ROW + EXECUTE FUNCTION notify_elasticsearch_on_change(); + + -- Create the trigger for TRUNCATE operations + CREATE OR REPLACE TRIGGER ${adapter.model.tableName}_truncate_trigger + AFTER TRUNCATE ON "${adapter.model.tableName}" + FOR EACH STATEMENT + EXECUTE FUNCTION notify_elasticsearch_on_change(); + `, + ) + .join('\n')} + `); +}; + +export const startElasticSearchPostgresSync = async () => { + const elasticSearchBatchProcessor = ElasticSearchBatchProcessor.getInstance(); + const subscriber = createSubscriber({ connectionString: getDBUrl('database') }); + + subscriber.notifications.on(CHANNEL_NAME, async event => { + console.log(event); + try { + // TODO: Check message format + await elasticSearchBatchProcessor.addToQueue(event); + // await handleElasticSearchRequest(event); + } catch (error) { + // TODO: maybe error handling in the batch processor? + reportErrorToSentry(error, { handler: HandlerType.ELASTICSEARCH_SYNC_JOB }); + } + }); + + subscriber.events.on('error', error => { + reportErrorToSentry(error, { handler: HandlerType.ELASTICSEARCH_SYNC_JOB }); + }); + + process.on('exit', async () => { + await elasticSearchBatchProcessor.flushAndClose(); + subscriber.close(); + }); + + await subscriber.connect(); + await subscriber.listenTo(CHANNEL_NAME); + + await setupPostgresTriggers(); + + logger.info('ElasticSearch <-> Postgres sync job started'); +}; + +/** + * Re-indexes all entries across all indexes related to this `collectiveId`, either through `CollectiveId`, + * `HostCollectiveId`, `FromCollectiveId`...etc. + */ +export const elasticSearchFullAccountReIndex = async (collectiveId: number): Promise => { + const elasticSearchBatchProcessor = ElasticSearchBatchProcessor.getInstance(); + await elasticSearchBatchProcessor.addToQueue({ + type: ElasticSearchRequestType.FULL_ACCOUNT_RE_INDEX, + payload: { id: collectiveId }, + }); +}; diff --git a/server/lib/elastic-search/types.ts b/server/lib/elastic-search/types.ts new file mode 100644 index 00000000000..36ffc5cef43 --- /dev/null +++ b/server/lib/elastic-search/types.ts @@ -0,0 +1,33 @@ +export enum ElasticSearchRequestType { + FULL_ACCOUNT_RE_INDEX = 'FULL_ACCOUNT_RE_INDEX', + UPDATE = 'UPDATE', + DELETE = 'DELETE', + TRUNCATE = 'TRUNCATE', +} + +export type ElasticSearchRequestBase = { + type: ElasticSearchRequestType; +}; + +export type ElasticSearchRequest = ElasticSearchRequestBase & + ( + | { + type: ElasticSearchRequestType.FULL_ACCOUNT_RE_INDEX; + payload: { id: number }; + } + | { + type: ElasticSearchRequestType.UPDATE | ElasticSearchRequestType.DELETE; + table: string; + payload: { id: number }; + } + | { + type: ElasticSearchRequestType.TRUNCATE; + table: string; + payload: Record; + } + ); + +export const isFullAccountReIndexRequest = ( + request: ElasticSearchRequest, +): request is ElasticSearchRequest & { type: ElasticSearchRequestType.FULL_ACCOUNT_RE_INDEX } => + request?.type === ElasticSearchRequestType.FULL_ACCOUNT_RE_INDEX; diff --git a/server/lib/sentry.ts b/server/lib/sentry.ts index d05df8e3b34..e577a94e4ae 100644 --- a/server/lib/sentry.ts +++ b/server/lib/sentry.ts @@ -96,6 +96,7 @@ export enum HandlerType { CRON = 'CRON', FALLBACK = 'FALLBACK', WEBHOOK = 'WEBHOOK', + ELASTICSEARCH_SYNC_JOB = 'ELASTICSEARCH_SYNC_JOB', } export type CaptureErrorParams = { diff --git a/server/models/Collective.ts b/server/models/Collective.ts index 1ae09bb23ae..1042ebe713b 100644 --- a/server/models/Collective.ts +++ b/server/models/Collective.ts @@ -85,6 +85,7 @@ import { } from '../lib/collectivelib'; import { invalidateContributorsCache } from '../lib/contributors'; import { getFxRate } from '../lib/currency'; +import { elasticSearchFullAccountReIndex } from '../lib/elastic-search/sync-postgres'; import emailLib from '../lib/email'; import { formatAddress } from '../lib/format-address'; import { getGithubHandleFromUrl, getGithubUrlFromHandle } from '../lib/github'; @@ -2609,12 +2610,18 @@ class Collective extends Model< } } - return this.addHost(newHostCollective, remoteUser, { + await this.addHost(newHostCollective, remoteUser, { message, applicationData, shouldAutomaticallyApprove, }); } + + // Update caches + purgeCacheForCollective(this.slug); + elasticSearchFullAccountReIndex(this.id); + + return this; }; // edit the list of members and admins of this collective (create/update/remove)