diff --git a/config/custom-environment-variables.json b/config/custom-environment-variables.json index a76feebc231..dc4c1b4e018 100644 --- a/config/custom-environment-variables.json +++ b/config/custom-environment-variables.json @@ -1,11 +1,17 @@ { "port": "PORT", "env": "OC_ENV", + "services": { + "server": "ENABLE_SERVICE_SERVER", + "searchSync": "ENABLE_SERVICE_SEARCH_SYNC" + }, "mailpit": { "client": "MAILPIT_CLIENT" }, "elasticSearch": { - "url": "ELASTICSEARCH_URL" + "url": "ELASTICSEARCH_URL", + "maxSyncDelay": "ELASTICSEARCH_MAX_SYNC_DELAY", + "indexesPrefix": "ELASTICSEARCH_INDEXES_PREFIX" }, "database": { "url": "PG_URL", diff --git a/config/default.json b/config/default.json index 2e720647bd2..0781749d44e 100644 --- a/config/default.json +++ b/config/default.json @@ -1,5 +1,9 @@ { "port": "3060", + "services": { + "server": true, + "searchSync": false + }, "mailpit": { "client": false }, @@ -16,6 +20,9 @@ }, "readOnly": false }, + "elasticSearch": { + "maxSyncDelay": 5000 + }, "maintenancedb": { "url": "postgres://127.0.0.1:5432/postgres" }, diff --git a/config/e2e.json b/config/e2e.json index e3e4c31f317..f6870e5425d 100644 --- a/config/e2e.json +++ b/config/e2e.json @@ -2,6 +2,9 @@ "database": { "url": "postgres://opencollective@127.0.0.1:5432/opencollective_e2e" }, + "services": { + "searchSync": false + }, "mailpit": { "client": true }, diff --git a/config/staging.json b/config/staging.json index dfc78b7c89c..a1f20cab650 100644 --- a/config/staging.json +++ b/config/staging.json @@ -14,6 +14,9 @@ } } }, + "elasticSearch": { + "indexesPrefix": "staging" + }, "host": { "api": "https://api-staging.opencollective.com", "frontend": "https://frontend-staging.opencollective.com", diff --git a/config/test.json b/config/test.json index 60ec13e2174..1d7d8933c78 100644 --- a/config/test.json +++ b/config/test.json @@ -1,10 +1,15 @@ { "port": "3061", + "services": { + "server": true, + "searchSync": false + }, "database": { "url": "postgres://opencollective@127.0.0.1:5432/opencollective_test" }, "elasticSearch": { - "url": "http://localhost:9200" + "url": "http://localhost:9200", + "indexesPrefix": "test" }, "fees": { "default": { diff --git a/package-lock.json b/package-lock.json index 0b5245fb27b..d741a668dc8 100644 --- a/package-lock.json +++ b/package-lock.json @@ -95,6 +95,7 @@ "pg": "8.13.1", "pg-connection-string": "2.7.0", "pg-format": "1.0.4", + "pg-listen": "1.7.0", "plaid": "29.0.0", "prepend-http": "3.0.1", "redis": "4.6.6", @@ -23187,6 +23188,20 @@ "node": ">=4.0.0" } }, + "node_modules/pg-listen": { + "version": "1.7.0", + "resolved": "https://registry.npmjs.org/pg-listen/-/pg-listen-1.7.0.tgz", + "integrity": "sha512-MKDwKLm4ryhy7iq1yw1K1MvUzBdTkaT16HZToddX9QaT8XSdt3Kins5mYH6DLECGFzFWG09VdXvWOIYogjXrsg==", + "license": "MIT", + "dependencies": { + "debug": "^4.1.1", + "pg-format": "^1.0.4", + "typed-emitter": "^0.1.0" + }, + "peerDependencies": { + "pg": "7.x || 8.x" + } + }, "node_modules/pg-pool": { "version": "3.7.0", "resolved": "https://registry.npmjs.org/pg-pool/-/pg-pool-3.7.0.tgz", @@ -26227,6 +26242,12 @@ "url": "https://github.com/sponsors/ljharb" } }, + "node_modules/typed-emitter": { + "version": "0.1.0", + "resolved": "https://registry.npmjs.org/typed-emitter/-/typed-emitter-0.1.0.tgz", + "integrity": "sha512-Tfay0l6gJMP5rkil8CzGbLthukn+9BN/VXWcABVFPjOoelJ+koW8BuPZYk+h/L+lEeIp1fSzVRiWRPIjKVjPdg==", + "license": "MIT" + }, "node_modules/typedarray": { "version": "0.0.6", "resolved": "https://registry.npmjs.org/typedarray/-/typedarray-0.0.6.tgz", diff --git 
a/package.json b/package.json index 5b0ce930f3a..593cf5f90fe 100644 --- a/package.json +++ b/package.json @@ -116,6 +116,7 @@ "pg": "8.13.1", "pg-connection-string": "2.7.0", "pg-format": "1.0.4", + "pg-listen": "1.7.0", "plaid": "29.0.0", "prepend-http": "3.0.1", "redis": "4.6.6", diff --git a/scripts/search.ts b/scripts/search.ts index ae8a0719676..88335ba1938 100644 --- a/scripts/search.ts +++ b/scripts/search.ts @@ -1,9 +1,11 @@ import '../server/env'; import { Command } from 'commander'; +import config from 'config'; import { partition, uniq } from 'lodash'; import { getElasticSearchClient } from '../server/lib/elastic-search/client'; +import { formatIndexNameForElasticSearch } from '../server/lib/elastic-search/common'; import { ElasticSearchIndexName } from '../server/lib/elastic-search/constants'; import { elasticSearchGlobalSearch } from '../server/lib/elastic-search/search'; import { @@ -77,7 +79,8 @@ program const indexes = parseIndexesFromInput(indexesInput); logger.info('Creating all indices...'); for (const indexName of indexes) { - logger.info(`Creating index ${indexName}`); + const realIndexName = formatIndexNameForElasticSearch(indexName); + logger.info(`Creating index ${indexName}${realIndexName !== indexName ? ` (${realIndexName})` : ''}`); await createElasticSearchIndex(indexName); } logger.info('Create completed!'); @@ -98,9 +101,10 @@ program ) { logger.info('Syncing all models...'); for (const indexName of indexes) { - logger.info(`Dropping index ${indexName}`); + const realIndexName = formatIndexNameForElasticSearch(indexName); + logger.info(`Dropping index ${indexName}${realIndexName !== indexName ? ` (${realIndexName})` : ''}`); await deleteElasticSearchIndex(indexName, { throwIfMissing: false }); - logger.info(`Re-creating index ${indexName}`); + logger.info(`Re-creating index ${indexName}${realIndexName !== indexName ? 
` (${realIndexName})` : ''}`); await createElasticSearchIndex(indexName); await syncElasticSearchIndex(indexName, { log: true }); } @@ -192,17 +196,35 @@ program .command('stats') .description('Show information about the ElasticSearch indices') .argument('[indexes...]', 'Only show information about specific indexes') - .action(async indexesInput => { + .option('--all', 'Show information about all indexes, even those not matching the prefix') + .action(async (indexesInput, options) => { checkElasticSearchAvailable(); - const indexes = parseIndexesFromInput(indexesInput); + const indexesFromArgs = parseIndexesFromInput(indexesInput, null); const client = getElasticSearchClient(); - const availableIndexes = await getAvailableElasticSearchIndexes(); - const [availableIndexesToQuery, unknownIndexes] = partition(indexes, index => availableIndexes.includes(index)); - if (unknownIndexes.length) { - logger.warn(`Unknown indexes: ${unknownIndexes.join(', ')}`); + let availableIndexes = await getAvailableElasticSearchIndexes(); + + // Only get the indexes specified in args + if (indexesFromArgs) { + const partitionedIndexes = partition(indexesFromArgs, index => availableIndexes.includes(index)); + availableIndexes = partitionedIndexes[0]; + if (partitionedIndexes[1].length) { + logger.warn(`Unknown indexes: ${partitionedIndexes[1].join(', ')}`); + } + } + + // Filter out indexes that don't match the prefix + if (!options.all) { + const prefix = config.elasticSearch.indexesPrefix; + if (prefix) { + availableIndexes = availableIndexes.filter(index => index.startsWith(prefix)); + } else { + availableIndexes = availableIndexes.filter(index => + Object.values(ElasticSearchIndexName).includes(index as ElasticSearchIndexName), + ); + } } - const result = await client.indices.stats({ index: availableIndexesToQuery.join(',') }); + const result = await client.indices.stats({ index: availableIndexes.join(',') }); let nbDocs = 0; let totalSize = 0; const formatSize = (bytes: number) => `${(bytes / 1024 / 1024).toFixed(2)} MB`; diff --git a/server/graphql/loaders/search.ts b/server/graphql/loaders/search.ts index 8b2363dd328..38bdd04094c 100644 --- a/server/graphql/loaders/search.ts +++ b/server/graphql/loaders/search.ts @@ -4,6 +4,7 @@ import { AggregationsMultiBucketAggregateBase, SearchResponse } from '@elastic/e import DataLoader from 'dataloader'; import { groupBy, pick } from 'lodash'; +import { formatIndexNameForElasticSearch } from '../../lib/elastic-search/common'; import { ElasticSearchIndexName, ElasticSearchIndexParams } from '../../lib/elastic-search/constants'; import { elasticSearchGlobalSearch, ElasticSearchIndexRequest } from '../../lib/elastic-search/search'; import { @@ -12,7 +13,7 @@ import { Collective } from '../../models'; type SearchParams = { requestId: string; searchTerm: string; - index: string; + index: ElasticSearchIndexName; indexParams: ElasticSearchIndexParams[ElasticSearchIndexName]; limit: number; adminOfAccountIds: number[]; @@ -20,7 +21,7 @@ host: Collective; }; -export type SearchResultBucket = { +type SearchResultBucket = { key: string; doc_count: number; top_hits_by_index: { @@ -41,6 +42,18 @@ }; }; +export type SearchResult = { + count: number; + maxScore: number; + hits: Array<{ + indexName: ElasticSearchIndexName; + score: number; + id: string; + source: Record<string, unknown>; + highlight: Record<string, string[]>; + }>; +}; + const getSearchIndexes = (requests: SearchParams[]):
ElasticSearchIndexRequest[] => { const results: Partial<Record<ElasticSearchIndexName, ElasticSearchIndexRequest>> = {}; for (const request of requests) { @@ -56,9 +69,10 @@ * A loader to batch search requests on multiple indexes into a single ElasticSearch query. */ export const generateSearchLoaders = req => { - return new DataLoader(async (entries: SearchParams[]) => { - const groupedRequests = groupBy(entries, 'requestId'); + return new DataLoader(async (requests: SearchParams[]) => { + const groupedRequests = groupBy(requests, 'requestId'); const requestsResults = new Map(); + const failures = []; // All grouped requests must have the same searchTerm assert( @@ -79,29 +93,40 @@ }); if (results) { + requestsResults.set(requestId, results); if (results._shards?.failures) { - reportMessageToSentry('ElasticSearch search shard failures', { - extra: { - failures: results._shards.failures, - request: firstRequest, - indexes, - }, - }); + failures.push({ request: firstRequest, indexes, items: results._shards.failures }); } - - requestsResults.set(requestId, results); } } - return entries.map(entry => { - const results = requestsResults.get(entry.requestId); + if (failures.length > 0) { + reportMessageToSentry('ElasticSearch shard failures', { extra: { failures } }); + } + + return requests.map(request => { + const results = requestsResults.get(request.requestId); const resultsAggregationsByIndex = results?.aggregations?.by_index as AggregationsMultiBucketAggregateBase; const buckets = resultsAggregationsByIndex?.buckets as Array<SearchResultBucket>; if (!buckets) { return null; } - return buckets.find(bucket => bucket.key === entry.index); + const expectedBucket = formatIndexNameForElasticSearch(request.index); + const bucket = buckets.find(bucket => bucket.key === expectedBucket); + if (bucket) { + return { + count: bucket.doc_count, + maxScore: bucket.top_hits_by_index.hits.max_score || 0, + hits: bucket.top_hits_by_index.hits.hits.map(hit => ({ + indexName: request.index, + score: hit._score, + id: hit._id, + source: hit._source, + highlight: hit.highlight, + })), + }; + } }); }); }; diff --git a/server/index.js b/server/index.js index 0277b08d228..b64ce007916 100644 --- a/server/index.js +++ b/server/index.js @@ -7,14 +7,17 @@ import config from 'config'; import express from 'express'; import throng from 'throng'; +import { isElasticSearchConfigured } from './lib/elastic-search/client'; +import { startElasticSearchPostgresSync, stopElasticSearchPostgresSync } from './lib/elastic-search/sync-postgres'; import expressLib from './lib/express'; import logger from './lib/logger'; import { updateCachedFidoMetadata } from './lib/two-factor-authentication/fido-metadata'; +import { parseToBoolean } from './lib/utils'; import routes from './routes'; const workers = process.env.WEB_CONCURRENCY || 1; -async function start(i) { +async function startExpressServer(workerId) { const expressApp = express(); await updateCachedFidoMetadata(); @@ -35,7 +38,7 @@ host, server.address().port, config.env, - i, + workerId, ); }); @@ -45,15 +48,54 @@ return expressApp; } -let app; +// Start the express server +let appPromise; +if (parseToBoolean(config.services.server)) { + if (['production', 'staging'].includes(config.env) && workers > 1) { + throng({ worker: startExpressServer, count: workers }); // TODO: throng is not compatible with the shutdown logic below + } else { + appPromise = startExpressServer(1); + } +}
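A note on the two `services` flags gating this block and the one below: with node-config, defaults in config/default.json are real booleans, but overrides mapped through config/custom-environment-variables.json (e.g. `ENABLE_SERVICE_SERVER=false`) always arrive as strings, so a bare `if (config.services.server)` would be truthy either way. A minimal sketch of the normalization this relies on; this is a hypothetical stand-in for `parseToBoolean` from server/lib/utils, and the exact accepted spellings are an assumption:

```typescript
// Sketch only: stand-in for parseToBoolean from server/lib/utils.
const parseToBoolean = (value: unknown): boolean => {
  if (typeof value === 'boolean') {
    return value; // defaults from config/default.json
  }
  const str = String(value).trim().toLowerCase();
  return ['true', '1', 'yes'].includes(str); // accepted spellings are an assumption
};

parseToBoolean(true); // true  (config/default.json)
parseToBoolean('false'); // false (ENABLE_SERVICE_SERVER=false)
parseToBoolean('TRUE'); // true  (case-insensitive env override)
```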
+ +// Start the search sync job +if (parseToBoolean(config.services.searchSync)) { + if (!isElasticSearchConfigured()) { + logger.warn('ElasticSearch is not configured. Skipping sync job.'); + } else { + startElasticSearchPostgresSync(); + } + + // Add a handler to make sure we flush the Elastic Search sync queue before shutting down + let isShuttingDown = false; + const gracefullyShutdown = async signal => { + if (!isShuttingDown) { + logger.info(`Received ${signal}. Shutting down.`); + isShuttingDown = true; + + if (appPromise) { + await appPromise.then(app => { + if (app.__server__) { + logger.info('Closing express server'); + app.__server__.close(); + } + }); + } + + if (parseToBoolean(config.services.searchSync) && isElasticSearchConfigured()) { + await stopElasticSearchPostgresSync(); + } + + process.exit(); + } + }; -if (['production', 'staging'].includes(config.env) && workers > 1) { - throng({ worker: start, count: workers }); -} else { - app = start(1); + process.on('exit', () => gracefullyShutdown('exit')); + process.on('SIGINT', () => gracefullyShutdown('SIGINT')); + process.on('SIGTERM', () => gracefullyShutdown('SIGTERM')); } // This is used by tests -export default async function () { - return app ? app : start(1); +export default async function startServerForTest() { + return appPromise ?? startExpressServer(1); } diff --git a/server/lib/db.js b/server/lib/db.js index 4905ecf7cbd..b0569e29e18 100644 --- a/server/lib/db.js +++ b/server/lib/db.js @@ -13,6 +13,7 @@ import { get, has } from 'lodash'; import pg from 'pg'; import pgConnectionString from 'pg-connection-string'; import format from 'pg-format'; +import createSubscriber from 'pg-listen'; /** Load a dump file into the current database. * @@ -160,3 +161,7 @@ export async function recreateDatabase(destroy = true) { const clientApp = await getConnectedClient(getDBUrl('database')); return [client, clientApp]; } + +export const createPostgresListener = () => { + return createSubscriber({ connectionString: getDBUrl('database') }); +}; diff --git a/server/lib/elastic-search/adapters/ElasticSearchCollectivesAdapter.ts b/server/lib/elastic-search/adapters/ElasticSearchCollectivesAdapter.ts index ae4e2f4ce98..4a992f1eea8 100644 --- a/server/lib/elastic-search/adapters/ElasticSearchCollectivesAdapter.ts +++ b/server/lib/elastic-search/adapters/ElasticSearchCollectivesAdapter.ts @@ -2,10 +2,9 @@ import models, { Op } from '../../../models'; import { stripHTMLOrEmpty } from '../../sanitize-html'; import { ElasticSearchIndexName } from '../constants'; -import { ElasticSearchModelAdapter } from './ElasticSearchModelAdapter'; +import { ElasticSearchModelAdapter, FindEntriesToIndexOptions } from './ElasticSearchModelAdapter'; export class ElasticSearchCollectivesAdapter implements ElasticSearchModelAdapter { - public readonly model = models.Collective; public readonly index = ElasticSearchIndexName.COLLECTIVES; public readonly mappings = { properties: { @@ -30,15 +29,11 @@ export class ElasticSearchCollectivesAdapter implements ElasticSearchModelAdapte }, } as const; - public async findEntriesToIndex( - options: { - offset?: number; - limit?: number; - fromDate?: Date; - maxId?: number; - ids?: number[]; - } = {}, - ): Promise>> { + public getModel() { + return models.Collective; + } + + public async findEntriesToIndex(options: FindEntriesToIndexOptions = {}) { return models.Collective.findAll({ attributes: Object.keys(this.mappings.properties), order: [['id', 'DESC']], @@ -48,7 +43,16 @@ export class ElasticSearchCollectivesAdapter 
implements ElasticSearchModelAdapte where: { ...(options.fromDate ? { updatedAt: options.fromDate } : null), ...(options.maxId ? { id: { [Op.lte]: options.maxId } } : null), - ...(options.ids?.length ? { id: { [Op.in]: options.ids } } : null), + ...(options.ids?.length ? { id: options.ids } : null), + ...(options.relatedToCollectiveIds?.length + ? { + [Op.or]: [ + { id: options.relatedToCollectiveIds }, + { HostCollectiveId: options.relatedToCollectiveIds }, + { ParentCollectiveId: options.relatedToCollectiveIds }, + ], + } + : null), }, }); } @@ -72,7 +76,7 @@ export class ElasticSearchCollectivesAdapter implements ElasticSearchModelAdapte isActive: instance.isActive, isHostAccount: instance.isHostAccount, deactivatedAt: instance.deactivatedAt, - HostCollectiveId: instance.HostCollectiveId, + HostCollectiveId: !instance.isActive ? null : instance.HostCollectiveId, ParentCollectiveId: instance.ParentCollectiveId, }; } diff --git a/server/lib/elastic-search/adapters/ElasticSearchCommentsAdapter.ts b/server/lib/elastic-search/adapters/ElasticSearchCommentsAdapter.ts index 7848270d7d0..2caed48ccd5 100644 --- a/server/lib/elastic-search/adapters/ElasticSearchCommentsAdapter.ts +++ b/server/lib/elastic-search/adapters/ElasticSearchCommentsAdapter.ts @@ -7,10 +7,9 @@ import { CommentType } from '../../../models/Comment'; import { stripHTMLOrEmpty } from '../../sanitize-html'; import { ElasticSearchIndexName } from '../constants'; -import { ElasticSearchModelAdapter } from './ElasticSearchModelAdapter'; +import { ElasticSearchModelAdapter, FindEntriesToIndexOptions } from './ElasticSearchModelAdapter'; export class ElasticSearchCommentsAdapter implements ElasticSearchModelAdapter { - public readonly model = models.Comment; public readonly index = ElasticSearchIndexName.COMMENTS; public readonly mappings = { properties: { @@ -29,15 +28,11 @@ export class ElasticSearchCommentsAdapter implements ElasticSearchModelAdapter { }, } as const; - public findEntriesToIndex( - options: { - offset?: number; - limit?: number; - fromDate?: Date; - maxId?: number; - ids?: number[]; - } = {}, - ) { + public getModel() { + return models.Comment; + } + + public findEntriesToIndex(options: FindEntriesToIndexOptions = {}) { return models.Comment.findAll({ attributes: omit(Object.keys(this.mappings.properties), ['HostCollectiveId', 'ParentCollectiveId']), order: [['id', 'DESC']], @@ -46,13 +41,21 @@ export class ElasticSearchCommentsAdapter implements ElasticSearchModelAdapter { where: { ...(options.fromDate ? { updatedAt: options.fromDate } : null), ...(options.maxId ? { id: { [Op.lte]: options.maxId } } : null), - ...(options.ids?.length ? { id: { [Op.in]: options.ids } } : null), + ...(options.ids?.length ? { id: options.ids } : null), + ...(options.relatedToCollectiveIds?.length + ? { + [Op.or]: [ + { CollectiveId: options.relatedToCollectiveIds }, + { FromCollectiveId: options.relatedToCollectiveIds }, + ], + } + : null), }, include: [ { association: 'collective', required: true, - attributes: ['HostCollectiveId', 'ParentCollectiveId'], + attributes: ['isActive', 'HostCollectiveId', 'ParentCollectiveId'], }, { association: 'expense', @@ -84,7 +87,7 @@ export class ElasticSearchCommentsAdapter implements ElasticSearchModelAdapter { HostCollectiveId: instance.expense?.HostCollectiveId ?? instance.hostApplication?.HostCollectiveId ?? - instance.collective.HostCollectiveId, + (!instance.collective.isActive ? 
null : instance.collective.HostCollectiveId), }; } diff --git a/server/lib/elastic-search/adapters/ElasticSearchExpensesAdapter.ts b/server/lib/elastic-search/adapters/ElasticSearchExpensesAdapter.ts index da46b533ebf..bc94a7017c3 100644 --- a/server/lib/elastic-search/adapters/ElasticSearchExpensesAdapter.ts +++ b/server/lib/elastic-search/adapters/ElasticSearchExpensesAdapter.ts @@ -5,10 +5,9 @@ import models from '../../../models'; import { stripHTMLOrEmpty } from '../../sanitize-html'; import { ElasticSearchIndexName } from '../constants'; -import { ElasticSearchModelAdapter } from './ElasticSearchModelAdapter'; +import { ElasticSearchModelAdapter, FindEntriesToIndexOptions } from './ElasticSearchModelAdapter'; export class ElasticSearchExpensesAdapter implements ElasticSearchModelAdapter { - public readonly model = models.Expense; public readonly index = ElasticSearchIndexName.EXPENSES; public readonly mappings = { properties: { @@ -34,15 +33,11 @@ }, } as const; - public findEntriesToIndex( - options: { - offset?: number; - limit?: number; - fromDate?: Date; - maxId?: number; - ids?: number[]; - } = {}, - ) { + public getModel() { + return models.Expense; + } + + public findEntriesToIndex(options: FindEntriesToIndexOptions = {}) { return models.Expense.findAll({ attributes: omit(Object.keys(this.mappings.properties), ['ParentCollectiveId', 'items']), order: [['id', 'DESC']], @@ -51,13 +46,27 @@ where: { ...(options.fromDate ? { updatedAt: options.fromDate } : null), ...(options.maxId ? { id: { [Op.lte]: options.maxId } } : null), - ...(options.ids?.length ? { id: { [Op.in]: options.ids } } : null), + ...(options.ids?.length ? { id: options.ids } : null), + ...(options.relatedToCollectiveIds?.length + ? { + [Op.or]: [ + { CollectiveId: options.relatedToCollectiveIds }, + { FromCollectiveId: options.relatedToCollectiveIds }, + { HostCollectiveId: options.relatedToCollectiveIds }, + ], + } + : null), }, include: [ { association: 'collective', required: true, - attributes: ['HostCollectiveId', 'ParentCollectiveId'], + attributes: ['isActive', 'HostCollectiveId', 'ParentCollectiveId'], }, { association: 'items', @@ -87,8 +96,9 @@ export class ElasticSearchExpensesAdapter implements ElasticSearchModelAdapter { ParentCollectiveId: instance.collective.ParentCollectiveId, FromCollectiveId: instance.FromCollectiveId, UserId: instance.UserId, - HostCollectiveId: instance.HostCollectiveId || instance.collective.HostCollectiveId, - items: instance.items.map(item => item.description).join(' '), + items: instance.items.map(item => item.description).join(', '), + HostCollectiveId: + instance.HostCollectiveId || (!instance.collective.isActive ?
null : instance.collective.HostCollectiveId), }; } diff --git a/server/lib/elastic-search/adapters/ElasticSearchHostApplicationsAdapter.ts b/server/lib/elastic-search/adapters/ElasticSearchHostApplicationsAdapter.ts index a6430de094d..e400a062da4 100644 --- a/server/lib/elastic-search/adapters/ElasticSearchHostApplicationsAdapter.ts +++ b/server/lib/elastic-search/adapters/ElasticSearchHostApplicationsAdapter.ts @@ -4,10 +4,9 @@ import { Op } from 'sequelize'; import models from '../../../models'; import { ElasticSearchIndexName } from '../constants'; -import { ElasticSearchModelAdapter } from './ElasticSearchModelAdapter'; +import { ElasticSearchModelAdapter, FindEntriesToIndexOptions } from './ElasticSearchModelAdapter'; export class ElasticSearchHostApplicationsAdapter implements ElasticSearchModelAdapter { - public readonly model = models.HostApplication; public readonly index = ElasticSearchIndexName.HOST_APPLICATIONS; public readonly mappings = { properties: { @@ -24,15 +23,11 @@ export class ElasticSearchHostApplicationsAdapter implements ElasticSearchModelA }, } as const; - public findEntriesToIndex( - options: { - offset?: number; - limit?: number; - fromDate?: Date; - maxId?: number; - ids?: number[]; - } = {}, - ) { + public getModel() { + return models.HostApplication; + } + + public findEntriesToIndex(options: FindEntriesToIndexOptions = {}) { return models.HostApplication.findAll({ attributes: omit(Object.keys(this.mappings.properties), ['ParentCollectiveId']), order: [['id', 'DESC']], @@ -41,7 +36,15 @@ export class ElasticSearchHostApplicationsAdapter implements ElasticSearchModelA where: { ...(options.fromDate ? { updatedAt: options.fromDate } : null), ...(options.maxId ? { id: { [Op.lte]: options.maxId } } : null), - ...(options.ids?.length ? { id: { [Op.in]: options.ids } } : null), + ...(options.ids?.length ? { id: options.ids } : null), + ...(options.relatedToCollectiveIds?.length + ? 
{ + [Op.or]: [ + { CollectiveId: options.relatedToCollectiveIds }, + { HostCollectiveId: options.relatedToCollectiveIds }, + ], + } + : null), }, include: [ { diff --git a/server/lib/elastic-search/adapters/ElasticSearchModelAdapter.ts b/server/lib/elastic-search/adapters/ElasticSearchModelAdapter.ts index 8f27874de5d..fa5f672d774 100644 --- a/server/lib/elastic-search/adapters/ElasticSearchModelAdapter.ts +++ b/server/lib/elastic-search/adapters/ElasticSearchModelAdapter.ts @@ -12,20 +12,24 @@ type ElasticSearchModelPermissions = { fields?: Record; }; +export type FindEntriesToIndexOptions = { + offset?: number; + limit?: number; + fromDate?: Date; + maxId?: number; + ids?: number[]; + relatedToCollectiveIds?: number[]; +}; + export interface ElasticSearchModelAdapter { - readonly model: ModelStatic<Model>; readonly index: ElasticSearchIndexName; readonly mappings: MappingTypeMapping; readonly settings?: IndicesIndexSettings; + getModel(): ModelStatic<Model>; + /** Returns the attributes that `mapModelInstanceToDocument` needs to build the document */ - findEntriesToIndex(options?: { - offset?: number; - limit?: number; - fromDate?: Date; - maxId?: number; - ids?: number[]; - }): Promise<Array<InstanceType<ModelStatic<Model>>>>; + findEntriesToIndex(options?: FindEntriesToIndexOptions): Promise<Array<InstanceType<ModelStatic<Model>>>>; /** Maps a model instance to an ElasticSearch document */ mapModelInstanceToDocument( diff --git a/server/lib/elastic-search/adapters/ElasticSearchOrdersAdapter.ts b/server/lib/elastic-search/adapters/ElasticSearchOrdersAdapter.ts index 8eb250bf547..a5b7e7a4278 100644 --- a/server/lib/elastic-search/adapters/ElasticSearchOrdersAdapter.ts +++ b/server/lib/elastic-search/adapters/ElasticSearchOrdersAdapter.ts @@ -1,13 +1,12 @@ import { omit } from 'lodash'; import { Op } from 'sequelize'; -import models from '../../../models'; +import models, { Subscription } from '../../../models'; import { ElasticSearchIndexName } from '../constants'; -import { ElasticSearchModelAdapter } from './ElasticSearchModelAdapter'; +import { ElasticSearchModelAdapter, FindEntriesToIndexOptions } from './ElasticSearchModelAdapter'; export class ElasticSearchOrdersAdapter implements ElasticSearchModelAdapter { - public readonly model = models.Order; public readonly index = ElasticSearchIndexName.ORDERS; public readonly mappings = { properties: { @@ -18,6 +17,7 @@ // Relationships CollectiveId: { type: 'keyword' }, FromCollectiveId: { type: 'keyword' }, + CreatedByUserId: { type: 'keyword' }, // Special fields HostCollectiveId: { type: 'keyword' }, ParentCollectiveId: { type: 'keyword' }, @@ -25,15 +25,11 @@ }, } as const; - public findEntriesToIndex( - options: { - offset?: number; - limit?: number; - fromDate?: Date; - maxId?: number; - ids?: number[]; - } = {}, - ) { + public getModel() { + return models.Order; + } + + public findEntriesToIndex(options: FindEntriesToIndexOptions = {}) { return models.Order.findAll({ attributes: omit(Object.keys(this.mappings.properties), ['HostCollectiveId', 'ParentCollectiveId']), order: [['id', 'DESC']], @@ -42,16 +38,24 @@ where: { ...(options.fromDate ? { updatedAt: options.fromDate } : null), ...(options.maxId ? { id: { [Op.lte]: options.maxId } } : null), - ...(options.ids?.length ? { id: { [Op.in]: options.ids } } : null), + ...(options.ids?.length ?
{ id: options.ids } : null), + ...(options.relatedToCollectiveIds?.length + ? { + [Op.or]: [ + { CollectiveId: options.relatedToCollectiveIds }, + { FromCollectiveId: options.relatedToCollectiveIds }, + ], + } + : null), }, include: [ { association: 'collective', required: true, - attributes: ['HostCollectiveId', 'ParentCollectiveId'], + attributes: ['isActive', 'HostCollectiveId', 'ParentCollectiveId'], }, { - model: models.Subscription, + model: Subscription, required: false, attributes: ['paypalSubscriptionId'], }, @@ -69,8 +73,9 @@ export class ElasticSearchOrdersAdapter implements ElasticSearchModelAdapter { description: instance.description, CollectiveId: instance.CollectiveId, FromCollectiveId: instance.FromCollectiveId, - HostCollectiveId: instance.collective.HostCollectiveId, + HostCollectiveId: !instance.collective.isActive ? null : instance.collective.HostCollectiveId, ParentCollectiveId: instance.collective.ParentCollectiveId, + CreatedByUserId: instance.CreatedByUserId, paypalSubscriptionId: instance.Subscription?.paypalSubscriptionId, }; } @@ -83,6 +88,9 @@ export class ElasticSearchOrdersAdapter implements ElasticSearchModelAdapter { paypalSubscriptionId: { terms: { HostCollectiveId: adminOfAccountIds }, }, + CreatedByUserId: { + terms: { HostCollectiveId: adminOfAccountIds }, + }, }, }; /* eslint-enable camelcase */ diff --git a/server/lib/elastic-search/adapters/ElasticSearchTiersAdapter.ts b/server/lib/elastic-search/adapters/ElasticSearchTiersAdapter.ts index b2319d4d652..0cea679a65f 100644 --- a/server/lib/elastic-search/adapters/ElasticSearchTiersAdapter.ts +++ b/server/lib/elastic-search/adapters/ElasticSearchTiersAdapter.ts @@ -5,10 +5,9 @@ import models from '../../../models'; import { stripHTMLOrEmpty } from '../../sanitize-html'; import { ElasticSearchIndexName } from '../constants'; -import { ElasticSearchModelAdapter } from './ElasticSearchModelAdapter'; +import { ElasticSearchModelAdapter, FindEntriesToIndexOptions } from './ElasticSearchModelAdapter'; export class ElasticSearchTiersAdapter implements ElasticSearchModelAdapter { - public readonly model = models.Tier; public readonly index = ElasticSearchIndexName.TIERS; public readonly mappings = { properties: { @@ -28,15 +27,11 @@ export class ElasticSearchTiersAdapter implements ElasticSearchModelAdapter { }, } as const; - public findEntriesToIndex( - options: { - offset?: number; - limit?: number; - fromDate?: Date; - maxId?: number; - ids?: number[]; - } = {}, - ) { + public getModel() { + return models.Tier; + } + + public findEntriesToIndex(options: FindEntriesToIndexOptions = {}) { return models.Tier.findAll({ attributes: omit(Object.keys(this.mappings.properties), ['HostCollectiveId', 'ParentCollectiveId']), order: [['id', 'DESC']], @@ -45,13 +40,14 @@ export class ElasticSearchTiersAdapter implements ElasticSearchModelAdapter { where: { ...(options.fromDate ? { updatedAt: options.fromDate } : null), ...(options.maxId ? { id: { [Op.lte]: options.maxId } } : null), - ...(options.ids?.length ? { id: { [Op.in]: options.ids } } : null), + ...(options.ids?.length ? { id: options.ids } : null), + ...(options.relatedToCollectiveIds?.length ? 
{ CollectiveId: options.relatedToCollectiveIds } : null), }, include: [ { association: 'Collective', required: true, - attributes: ['HostCollectiveId', 'ParentCollectiveId'], + attributes: ['isActive', 'HostCollectiveId', 'ParentCollectiveId'], }, ], }); @@ -70,7 +66,7 @@ export class ElasticSearchTiersAdapter implements ElasticSearchModelAdapter { description: instance.description, longDescription: stripHTMLOrEmpty(instance.longDescription), CollectiveId: instance.CollectiveId, - HostCollectiveId: instance.Collective.HostCollectiveId, + HostCollectiveId: !instance.Collective.isActive ? null : instance.Collective.HostCollectiveId, ParentCollectiveId: instance.Collective.ParentCollectiveId, }; } diff --git a/server/lib/elastic-search/adapters/ElasticSearchTransactionsAdapter.ts b/server/lib/elastic-search/adapters/ElasticSearchTransactionsAdapter.ts index 01db817a41a..e6980cd98c2 100644 --- a/server/lib/elastic-search/adapters/ElasticSearchTransactionsAdapter.ts +++ b/server/lib/elastic-search/adapters/ElasticSearchTransactionsAdapter.ts @@ -4,10 +4,9 @@ import { Op } from 'sequelize'; import models from '../../../models'; import { ElasticSearchIndexName } from '../constants'; -import { ElasticSearchModelAdapter } from './ElasticSearchModelAdapter'; +import { ElasticSearchModelAdapter, FindEntriesToIndexOptions } from './ElasticSearchModelAdapter'; export class ElasticSearchTransactionsAdapter implements ElasticSearchModelAdapter { - public readonly model = models.Transaction; public readonly index = ElasticSearchIndexName.TRANSACTIONS; public readonly mappings = { properties: { @@ -28,15 +27,11 @@ export class ElasticSearchTransactionsAdapter implements ElasticSearchModelAdapt }, } as const; - public findEntriesToIndex( - options: { - offset?: number; - limit?: number; - fromDate?: Date; - maxId?: number; - ids?: number[]; - } = {}, - ) { + public getModel() { + return models.Transaction; + } + + public findEntriesToIndex(options: FindEntriesToIndexOptions = {}) { return models.Transaction.findAll({ attributes: omit(Object.keys(this.mappings.properties), ['merchantId']), order: [['id', 'DESC']], @@ -45,7 +40,16 @@ export class ElasticSearchTransactionsAdapter implements ElasticSearchModelAdapt where: { ...(options.fromDate ? { updatedAt: options.fromDate } : null), ...(options.maxId ? { id: { [Op.lte]: options.maxId } } : null), - ...(options.ids?.length ? { id: { [Op.in]: options.ids } } : null), + ...(options.ids?.length ? { id: options.ids } : null), + ...(options.relatedToCollectiveIds?.length + ? 
{ + [Op.or]: [ + { CollectiveId: options.relatedToCollectiveIds }, + { FromCollectiveId: options.relatedToCollectiveIds }, + { HostCollectiveId: options.relatedToCollectiveIds }, + ], + } + : null), }, }); } diff --git a/server/lib/elastic-search/adapters/ElasticSearchUpdatesAdapter.ts b/server/lib/elastic-search/adapters/ElasticSearchUpdatesAdapter.ts index 3d56ef81b57..fb168824e7a 100644 --- a/server/lib/elastic-search/adapters/ElasticSearchUpdatesAdapter.ts +++ b/server/lib/elastic-search/adapters/ElasticSearchUpdatesAdapter.ts @@ -5,10 +5,9 @@ import models from '../../../models'; import { stripHTMLOrEmpty } from '../../sanitize-html'; import { ElasticSearchIndexName } from '../constants'; -import { ElasticSearchModelAdapter } from './ElasticSearchModelAdapter'; +import { ElasticSearchModelAdapter, FindEntriesToIndexOptions } from './ElasticSearchModelAdapter'; export class ElasticSearchUpdatesAdapter implements ElasticSearchModelAdapter { - public readonly model = models.Update; public readonly index = ElasticSearchIndexName.UPDATES; public readonly mappings = { properties: { @@ -29,15 +28,11 @@ export class ElasticSearchUpdatesAdapter implements ElasticSearchModelAdapter { }, } as const; - public findEntriesToIndex( - options: { - offset?: number; - limit?: number; - fromDate?: Date; - maxId?: number; - ids?: number[]; - } = {}, - ) { + public getModel() { + return models.Update; + } + + public findEntriesToIndex(options: FindEntriesToIndexOptions = {}) { return models.Update.findAll({ attributes: omit(Object.keys(this.mappings.properties), ['HostCollectiveId', 'ParentCollectiveId']), order: [['id', 'DESC']], @@ -46,13 +41,21 @@ export class ElasticSearchUpdatesAdapter implements ElasticSearchModelAdapter { where: { ...(options.fromDate ? { updatedAt: options.fromDate } : null), ...(options.maxId ? { id: { [Op.lte]: options.maxId } } : null), - ...(options.ids?.length ? { id: { [Op.in]: options.ids } } : null), + ...(options.ids?.length ? { id: options.ids } : null), + ...(options.relatedToCollectiveIds?.length + ? { + [Op.or]: [ + { CollectiveId: options.relatedToCollectiveIds }, + { FromCollectiveId: options.relatedToCollectiveIds }, + ], + } + : null), }, include: [ { association: 'collective', required: true, - attributes: ['HostCollectiveId', 'ParentCollectiveId'], + attributes: ['isActive', 'HostCollectiveId', 'ParentCollectiveId'], }, ], }); @@ -72,7 +75,7 @@ export class ElasticSearchUpdatesAdapter implements ElasticSearchModelAdapter { CollectiveId: instance.CollectiveId, FromCollectiveId: instance.FromCollectiveId, CreatedByUserId: instance.CreatedByUserId, - HostCollectiveId: instance.collective.HostCollectiveId, + HostCollectiveId: !instance.collective.isActive ? 
null : instance.collective.HostCollectiveId, ParentCollectiveId: instance.collective.ParentCollectiveId, }; } diff --git a/server/lib/elastic-search/adapters/index.ts b/server/lib/elastic-search/adapters/index.ts index 2db4bfa7494..ec42f45cb06 100644 --- a/server/lib/elastic-search/adapters/index.ts +++ b/server/lib/elastic-search/adapters/index.ts @@ -20,3 +20,19 @@ export const ElasticSearchModelsAdapters: Record; + +let AdaptersFromTableNames: Record<string, ElasticSearchModelAdapter>; + +export const getAdapterFromTableName = (table: string): ElasticSearchModelAdapter | undefined => { + if (!AdaptersFromTableNames) { + AdaptersFromTableNames = Object.values(ElasticSearchModelsAdapters).reduce( + (acc, adapter) => { + acc[adapter.getModel().tableName] = adapter; + return acc; + }, + {} as Record<string, ElasticSearchModelAdapter>, + ); + } + + return AdaptersFromTableNames[table]; +}; diff --git a/server/lib/elastic-search/batch-processor.ts b/server/lib/elastic-search/batch-processor.ts new file mode 100644 index 00000000000..0e39e9715e9 --- /dev/null +++ b/server/lib/elastic-search/batch-processor.ts @@ -0,0 +1,293 @@ +import { Client } from '@elastic/elasticsearch'; +import { BulkOperationContainer, DeleteByQueryRequest } from '@elastic/elasticsearch/lib/api/types'; +import config from 'config'; +import debugLib from 'debug'; +import { groupBy, keyBy } from 'lodash'; + +import logger from '../logger'; +import { HandlerType, reportErrorToSentry, reportMessageToSentry } from '../sentry'; + +import { ElasticSearchModelsAdapters, getAdapterFromTableName } from './adapters'; +import { getElasticSearchClient } from './client'; +import { ElasticSearchIndexName } from './constants'; +import { ElasticSearchRequest, ElasticSearchRequestType, isFullAccountReIndexRequest } from './types'; + +const debug = debugLib('elasticsearch-batch-processor'); + +/** + * This class processes ElasticSearch requests in batches, to reduce the number of requests sent to + * the ElasticSearch server.
+ */ +export class ElasticSearchBatchProcessor { + public maxBatchSize: number = 1_000; + private static instance: ElasticSearchBatchProcessor; + private client: Client; + private _queue: ElasticSearchRequest[] = []; + private _maxWaitTimeInMs: number = config.elasticSearch.maxSyncDelay; + private _timeoutHandle: NodeJS.Timeout | null = null; + private _isStarted: boolean = false; + private _isProcessing: boolean = false; + private _processBatchPromise: Promise<void> | null = null; + + static getInstance(): ElasticSearchBatchProcessor { + if (!ElasticSearchBatchProcessor.instance) { + ElasticSearchBatchProcessor.instance = new ElasticSearchBatchProcessor(); + } + + return ElasticSearchBatchProcessor.instance; + } + + start() { + this._isStarted = true; + } + + get isProcessing() { + return this._isProcessing; + } + + get hasScheduledBatch() { + return Boolean(this._timeoutHandle); + } + + async flushAndClose() { + debug('Flushing and closing Elastic Search Batch Processor'); + this._isStarted = false; + return this.callProcessBatch(); + } + + addToQueue(request: ElasticSearchRequest) { + if (!this._isStarted) { + return; + } + + debug('New request:', request.type, request['table'] || '', request.payload); + this._queue.push(request); + + if (this._queue.length >= this.maxBatchSize || isFullAccountReIndexRequest(request)) { + this.callProcessBatch(); + } else { + this.scheduleCallProcessBatch(); + } + } + + // ---- Private methods ---- + private constructor() { + this.client = getElasticSearchClient({ throwIfUnavailable: true }); + } + + private scheduleCallProcessBatch(wait = this._maxWaitTimeInMs) { + if (!this._timeoutHandle) { + this._timeoutHandle = setTimeout(() => this.callProcessBatch(), wait); + } + } + + /** + * A wrapper around `processBatch` that either calls it immediately or returns a promise that resolves + * once the batch is fully processed or after a timeout. + */ + private async callProcessBatch(): Promise<void> { + // Scenario 1: we are already processing a batch. + if (this._processBatchPromise) { + debug('callProcessBatch: waiting on existing batch processing'); + await this._processBatchPromise; + } + // Scenario 2: there is a pending batch processing. We cancel the timeout and run the batch immediately. + else if (this._timeoutHandle) { + debug('callProcessBatch: running batch early'); + clearTimeout(this._timeoutHandle); + this._timeoutHandle = null; + this._processBatchPromise = this._processBatch(); + await this._processBatchPromise; + } + // Scenario 3: there is no pending batch processing and no timeout, but there are requests in the queue. + else if (this._queue.length) { + debug('callProcessBatch: running batch now'); + this._processBatchPromise = this._processBatch(); + await this._processBatchPromise; + } + // Scenario 4: there is no pending batch processing, no timeout and no requests in the queue. We're done.
+ else { + debug('callProcessBatch: all done'); + return; + } + + await this.callProcessBatch(); + } + + private async _processBatch() { + // Clear the timeout + if (this._timeoutHandle) { + clearTimeout(this._timeoutHandle); + this._timeoutHandle = null; + } + + // Skip if no messages + if (this._queue.length === 0) { + debug('No messages to process'); + return; + } + + // Skip if already processing + if (this._isProcessing) { + return; + } + + // Immediately move up to maxBatchSize items from the queue to the processing queue + this._isProcessing = true; + const processingQueue = this._queue.splice(0, this.maxBatchSize); + debug('Processing batch of', processingQueue.length, 'requests'); + + try { + // Prepare bulk indexing body + const { operations, deleteQuery } = await this.convertRequestsToBulkOperations(processingQueue); + + if (deleteQuery) { + debug('Running delete query for', deleteQuery.query.bool.should[0].bool.must[1].terms._id); + const deleteQueryResult = await this.client.deleteByQuery(deleteQuery); + debug('Delete query took', deleteQueryResult.took, 'ms'); + } + + if (operations.length > 0) { + const bulkResponse = await this.client.bulk({ operations }); + debug('Synchronized', bulkResponse.items.length, 'items in', bulkResponse.took, 'ms'); + + // Handle any indexing errors + if (bulkResponse.errors) { + reportMessageToSentry('ElasticSearchBatchProcessor: Bulk indexing errors', { + severity: 'warning', + extra: { processingQueue, bulkResponse }, + }); + } + } + } catch (error) { + debug('Error processing batch:', error); + reportErrorToSentry(error, { + handler: HandlerType.ELASTICSEARCH_SYNC_JOB, + extra: { processingQueue }, + }); + } + + // End of processing + this._isProcessing = false; + this._processBatchPromise = null; + + // If the queue is ready to be processed again, do it + if (this._queue.length && !this._timeoutHandle) { + const wait = this._queue.length >= this.maxBatchSize ? 
0 : this._maxWaitTimeInMs; + this.scheduleCallProcessBatch(wait); + } + } + + private async convertRequestsToBulkOperations( + requests: ElasticSearchRequest[], + ): Promise<{ operations: BulkOperationContainer[]; deleteQuery: DeleteByQueryRequest }> { + const { accountsToReIndex, requestsGroupedByTableName } = this.preprocessRequests(requests); + const operations: BulkOperationContainer[] = []; + let deleteQuery: DeleteByQueryRequest | null = null; + // Start with FULL_ACCOUNT_RE_INDEX requests + if (accountsToReIndex.length > 0) { + deleteQuery = this.getAccountsReIndexDeleteQuery(accountsToReIndex); + for (const adapter of Object.values(ElasticSearchModelsAdapters)) { + const entriesToIndex = await adapter.findEntriesToIndex({ relatedToCollectiveIds: accountsToReIndex }); + for (const entry of entriesToIndex) { + operations.push( + { index: { _index: adapter.index, _id: entry['id'].toString() } }, + adapter.mapModelInstanceToDocument(entry), + ); + } + } + } + + // Then process the rest + for (const [table, requests] of Object.entries(requestsGroupedByTableName)) { + const adapter = getAdapterFromTableName(table); + if (!adapter) { + logger.error(`No ElasticSearch adapter found for table ${table}`); + continue; + } + + // Preload all updated entries + let groupedEntriesToIndex = {}; + const updateRequests = requests.filter(request => request.type === ElasticSearchRequestType.UPDATE); + if (updateRequests.length) { + const updateRequestsIds = updateRequests.map(request => request.payload.id); + const entriesToIndex = await adapter.findEntriesToIndex({ ids: updateRequestsIds }); + groupedEntriesToIndex = keyBy(entriesToIndex, 'id'); + } + + // Iterate over requests and create bulk indexing operations + for (const request of requests) { + if (request.type === ElasticSearchRequestType.UPDATE) { + const entry = groupedEntriesToIndex[request.payload.id]; + if (!entry) { + operations.push({ delete: { _index: adapter.index, _id: request.payload.id.toString() } }); + } else { + operations.push( + { index: { _index: adapter.index, _id: request.payload.id.toString() } }, + adapter.mapModelInstanceToDocument(entry), + ); + } + } else if (request.type === ElasticSearchRequestType.DELETE) { + operations.push({ delete: { _index: adapter.index, _id: request.payload.id.toString() } }); + } + } + } + + return { operations, deleteQuery }; + } + + private getAccountsReIndexDeleteQuery(accountIds: Array<number>): DeleteByQueryRequest { + if (!accountIds.length) { + return null; + } + + const allIndexes = Object.values(ElasticSearchModelsAdapters).map(adapter => adapter.index); + return { + index: allIndexes.join(','), + wait_for_completion: true, // eslint-disable-line camelcase + query: { + bool: { + should: [ + // Delete all collectives + { + bool: { + must: [{ term: { _index: ElasticSearchIndexName.COLLECTIVES } }, { terms: { _id: accountIds } }], + }, + }, + // Delete all relationships + { bool: { must: [{ terms: { HostCollectiveId: accountIds } }] } }, + { bool: { must: [{ terms: { ParentCollectiveId: accountIds } }] } }, + { bool: { must: [{ terms: { FromCollectiveId: accountIds } }] } }, + { bool: { must: [{ terms: { CollectiveId: accountIds } }] } }, + ], + }, + }, + }; + } + + /** + * Deduplicates requests, returning only the latest request for each entity, unless it's a + * FULL_ACCOUNT_RE_INDEX request - which always takes maximum priority - then groups them by table.
+ */ + private preprocessRequests(requests: ElasticSearchRequest[]): { + accountsToReIndex: Array<number>; + requestsGroupedByTableName: Record<string, ElasticSearchRequest[]>; + } { + const accountsToReIndex = new Set<number>(); + const otherRequests: Record<string, ElasticSearchRequest> = {}; + + for (const request of requests) { + if (isFullAccountReIndexRequest(request)) { + accountsToReIndex.add(request.payload.id); + delete otherRequests[`Collectives-${request.payload.id}`]; // FULL_ACCOUNT_RE_INDEX requests take priority + } else if (request.table !== 'Collectives' || !accountsToReIndex.has(request.payload.id)) { + otherRequests[`${request.table}-${request.payload.id}`] = request; // Key per (table, id) so ids from different tables don't collide + } + } + + return { + accountsToReIndex: Array.from(accountsToReIndex), + requestsGroupedByTableName: groupBy(Object.values(otherRequests), request => request['table']), + }; + } +} diff --git a/server/lib/elastic-search/client.ts b/server/lib/elastic-search/client.ts index 01c9c0c3fe6..36472e7878e 100644 --- a/server/lib/elastic-search/client.ts +++ b/server/lib/elastic-search/client.ts @@ -1,8 +1,10 @@ import { Client } from '@elastic/elasticsearch'; import config from 'config'; +export const isElasticSearchConfigured = (): boolean => !!config.elasticSearch?.url; + export const getElasticSearchClient = ({ throwIfUnavailable = false } = {}): Client | undefined => { - if (config.elasticSearch?.url) { + if (isElasticSearchConfigured()) { return new Client({ node: config.elasticSearch.url }); } else if (throwIfUnavailable) { throw new Error('ElasticSearch is not configured'); diff --git a/server/lib/elastic-search/common.ts b/server/lib/elastic-search/common.ts new file mode 100644 index 00000000000..49456d77df5 --- /dev/null +++ b/server/lib/elastic-search/common.ts @@ -0,0 +1,18 @@ +import config from 'config'; + +import { ElasticSearchIndexName } from './constants'; + +const getIndexesPrefix = () => config.elasticSearch?.indexesPrefix; + +/** + * Formats the index name before querying ElasticSearch. Allows sharing a single ElasticSearch + * instance between multiple environments (e.g. staging and production, dev and test). + */ +export const formatIndexNameForElasticSearch = (indexName: ElasticSearchIndexName): string => { + const prefix = getIndexesPrefix(); + if (prefix) { + return `${prefix}_${indexName}`; + } else { + return indexName; + } +}; diff --git a/server/lib/elastic-search/graphql-search.ts b/server/lib/elastic-search/graphql-search.ts index 3c29ac59e6e..3dcec4f6bc8 100644 --- a/server/lib/elastic-search/graphql-search.ts +++ b/server/lib/elastic-search/graphql-search.ts @@ -2,7 +2,6 @@ * This file contains the logic to bind the ElasticSearch functionality to the GraphQL API.
*/ -import DataLoader from 'dataloader'; import { GraphQLBoolean, GraphQLFieldConfigArgumentMap, GraphQLInt, GraphQLList, GraphQLNonNull, GraphQLObjectType, GraphQLString, } from 'graphql'; import { GraphQLJSONObject } from 'graphql-scalars'; -import { groupBy, mapKeys, mapValues } from 'lodash'; +import { keyBy, mapKeys, mapValues } from 'lodash'; import { FieldsToGraphQLFieldConfigArgumentMap } from '../../graphql/common/typescript-helpers'; -import { SearchResultBucket } from '../../graphql/loaders/search'; +import { SearchResult } from '../../graphql/loaders/search'; import { GraphQLAccountCollection } from '../../graphql/v2/collection/AccountCollection'; import { CommentCollection } from '../../graphql/v2/collection/CommentCollection'; import { GraphQLExpenseCollection } from '../../graphql/v2/collection/ExpenseCollection'; @@ -56,7 +55,7 @@ const GraphQLSearchResultsStrategy: Record< ElasticSearchIndexName, { // A loader to use for loading entities from the (optionally encoded) ID - loadMany: (req, ids) => DataLoader; + loadMany: (req, ids) => Promise<Array<Record<string, unknown>>>; // A function to encode the ID for use in the GraphQL API getGraphQLId: (result: Record<string, unknown>) => string; // A function to get Elastic Search index-specific parameters from the GraphQL arguments. By default, it returns the raw arguments. @@ -135,9 +134,9 @@ resolve: async (baseSearchParams: GraphQLSearchParams, args, req) => { const indexParams = strategy.prepareArguments ? strategy.prepareArguments(args) : args; const fullSearchParams = { ...baseSearchParams, index, indexParams }; - const results = (await req.loaders.search.load(fullSearchParams)) as SearchResultBucket; + const result = (await req.loaders.search.load(fullSearchParams)) as SearchResult | null; - if (!results || results['doc_count'] === 0) { + if (!result || result.count === 0) { return { maxScore: 0, collection: { totalCount: 0, offset: 0, limit: baseSearchParams.limit, nodes: () => [] }, @@ -145,23 +144,22 @@ }; } - const hits = results['top_hits_by_index']['hits']['hits']; - const maxScore = results['top_hits_by_index']['hits']['max_score'] ??
0; - const getSQLIdFromHit = hit => hit['_source']['id']; - const hitsGroupedBySQLId = groupBy(hits, getSQLIdFromHit); - const hitsGroupedByGraphQLKey = mapKeys(hitsGroupedBySQLId, result => - strategy.getGraphQLId(result[0]['_source']), - ); - const highlights = mapValues(hitsGroupedByGraphQLKey, hits => hits[0]['highlight']); + const getSQLIdFromHit = (hit: (typeof result.hits)[0]): number => hit.source['id'] as number; + const hitsGroupedBySQLId = keyBy(result.hits, getSQLIdFromHit); + const hitsGroupedByGraphQLKey = mapKeys(hitsGroupedBySQLId, result => strategy.getGraphQLId(result.source)); + const highlights = mapValues(hitsGroupedByGraphQLKey, hit => hit.highlight); return { - maxScore, + maxScore: result.maxScore, highlights, collection: { - totalCount: results['doc_count'], + totalCount: result.count, offset: 0, limit: baseSearchParams.limit, - nodes: () => strategy.loadMany(req, hits.map(getSQLIdFromHit)), + nodes: async () => { + const entries = await strategy.loadMany(req, result.hits.map(getSQLIdFromHit)); + return entries.filter(Boolean); // Entries in ElasticSearch may have been deleted in the DB + }, }, }; }, diff --git a/server/lib/elastic-search/search.ts b/server/lib/elastic-search/search.ts index 424d194db54..56d3ad3463d 100644 --- a/server/lib/elastic-search/search.ts +++ b/server/lib/elastic-search/search.ts @@ -10,6 +10,7 @@ import { reportErrorToSentry } from '../sentry'; import { ElasticSearchModelsAdapters } from './adapters'; import { getElasticSearchClient } from './client'; +import { formatIndexNameForElasticSearch } from './common'; import { ElasticSearchIndexName, ElasticSearchIndexParams } from './constants'; /** @@ -112,7 +113,7 @@ const buildQuery = ( { bool: { filter: [ - { term: { _index: index } }, + { term: { _index: formatIndexNameForElasticSearch(index) } }, ...(permissions.default === 'PUBLIC' ? 
[] : [permissions.default]), ...getIndexConditions(index, indexParams), ], @@ -178,7 +179,7 @@ export const elasticSearchGlobalSearch = async ( return await client.search({ /* eslint-disable camelcase */ timeout: `${timeoutInSeconds}s`, - index: Array.from(indexes).join(','), + index: Array.from(indexes).map(formatIndexNameForElasticSearch).join(','), body: { size: 0, // We don't need hits at the top level query, diff --git a/server/lib/elastic-search/sync-postgres.ts b/server/lib/elastic-search/sync-postgres.ts new file mode 100644 index 00000000000..275962b8809 --- /dev/null +++ b/server/lib/elastic-search/sync-postgres.ts @@ -0,0 +1,171 @@ +import { createPostgresListener } from '../db'; +import logger from '../logger'; +import { runWithTimeout } from '../promises'; +import { HandlerType, reportErrorToSentry, reportMessageToSentry } from '../sentry'; +import sequelize from '../sequelize'; + +import { ElasticSearchModelsAdapters } from './adapters'; +import { ElasticSearchBatchProcessor } from './batch-processor'; +import { isElasticSearchConfigured } from './client'; +import { ElasticSearchRequestType, isValidElasticSearchRequest } from './types'; + +const CHANNEL_NAME = 'elasticsearch-requests'; + +const setupPostgresTriggers = async () => { + try { + await sequelize.query(` + -- Create a trigger function to send notifications on table changes + CREATE OR REPLACE FUNCTION notify_elasticsearch_on_change() + RETURNS TRIGGER AS $$ + DECLARE + notification JSON; + BEGIN + -- Determine the type of operation + IF (TG_OP = 'INSERT') THEN + notification = json_build_object('type', 'UPDATE', 'table', TG_TABLE_NAME, 'payload', json_build_object('id', NEW.id)); + ELSIF (TG_OP = 'UPDATE') THEN + IF (OLD."deletedAt" IS NULL AND NEW."deletedAt" IS NOT NULL) THEN + notification = json_build_object('type', 'DELETE', 'table', TG_TABLE_NAME, 'payload', json_build_object('id', NEW.id)); + ELSIF (OLD."deletedAt" IS NOT NULL AND NEW."deletedAt" IS NOT NULL) THEN + RETURN NULL; -- Do not notify on updates of deleted rows + ELSE + notification = json_build_object('type', 'UPDATE', 'table', TG_TABLE_NAME, 'payload', json_build_object('id', NEW.id)); + END IF; + ELSIF (TG_OP = 'DELETE') THEN + notification = json_build_object('type', 'DELETE', 'table', TG_TABLE_NAME, 'payload', json_build_object('id', OLD.id)); + END IF; + + -- Publish the notification to the Elastic Search requests channel + PERFORM pg_notify('${CHANNEL_NAME}', notification::text); + + RETURN NULL; + END; + $$ LANGUAGE plpgsql; + + ${Object.values(ElasticSearchModelsAdapters) + .map( + adapter => ` + -- Create the trigger for INSERT operations + CREATE OR REPLACE TRIGGER ${adapter.getModel().tableName}_insert_trigger + AFTER INSERT ON "${adapter.getModel().tableName}" + FOR EACH ROW + EXECUTE FUNCTION notify_elasticsearch_on_change(); + + -- Create the trigger for UPDATE operations + CREATE OR REPLACE TRIGGER ${adapter.getModel().tableName}_update_trigger + AFTER UPDATE ON "${adapter.getModel().tableName}" + FOR EACH ROW + EXECUTE FUNCTION notify_elasticsearch_on_change(); + + -- Create the trigger for DELETE operations + CREATE OR REPLACE TRIGGER ${adapter.getModel().tableName}_delete_trigger + AFTER DELETE ON "${adapter.getModel().tableName}" + FOR EACH ROW + EXECUTE FUNCTION notify_elasticsearch_on_change(); + `, + ) + .join('\n')} + `); + } catch (error) { + logger.error(`Error setting up Postgres triggers: ${JSON.stringify(error)}`); + reportErrorToSentry(error, { handler: HandlerType.ELASTICSEARCH_SYNC_JOB }); + throw new Error('Failed 
to setup Postgres triggers'); + } +}; + +export const removeElasticSearchPostgresTriggers = async () => { + await sequelize.query(` + ${Object.values(ElasticSearchModelsAdapters) + .map( + adapter => ` + DROP TRIGGER IF EXISTS ${adapter.getModel().tableName}_insert_trigger ON "${adapter.getModel().tableName}"; + DROP TRIGGER IF EXISTS ${adapter.getModel().tableName}_update_trigger ON "${adapter.getModel().tableName}"; + DROP TRIGGER IF EXISTS ${adapter.getModel().tableName}_delete_trigger ON "${adapter.getModel().tableName}"; + DROP TRIGGER IF EXISTS ${adapter.getModel().tableName}_truncate_trigger ON "${adapter.getModel().tableName}"; + `, + ) + .join('\n')} + + DROP FUNCTION IF EXISTS notify_elasticsearch_on_change(); + `); +}; + +// Some shared variables +let shutdownPromise: Promise<void> | null = null; +let subscriber: ReturnType<typeof createPostgresListener>; + +export const startElasticSearchPostgresSync = async () => { + const elasticSearchBatchProcessor = ElasticSearchBatchProcessor.getInstance(); + elasticSearchBatchProcessor.start(); + + // Setup DB message queue + subscriber = createPostgresListener(); + subscriber.notifications.on(CHANNEL_NAME, async event => { + if (!isValidElasticSearchRequest(event)) { + reportMessageToSentry('Invalid ElasticSearch request', { + extra: { event }, + handler: HandlerType.ELASTICSEARCH_SYNC_JOB, + severity: 'error', + }); + return; + } + + try { + elasticSearchBatchProcessor.addToQueue(event); + } catch (error) { + reportErrorToSentry(error, { handler: HandlerType.ELASTICSEARCH_SYNC_JOB }); + } + }); + + subscriber.events.on('error', error => { + reportErrorToSentry(error, { handler: HandlerType.ELASTICSEARCH_SYNC_JOB }); + }); + + await subscriber.connect(); + await subscriber.listenTo(CHANNEL_NAME); + + // Setup postgres triggers + await setupPostgresTriggers(); + + logger.info('ElasticSearch <-> Postgres sync job started'); + + return subscriber; +}; + +export const stopElasticSearchPostgresSync = (): Promise<void> => { + if (!shutdownPromise) { + logger.info('Shutting down ElasticSearch <-> Postgres sync job'); + if (subscriber) { + subscriber.close(); + } + + shutdownPromise = runWithTimeout( + (async () => { + await removeElasticSearchPostgresTriggers(); + const elasticSearchBatchProcessor = ElasticSearchBatchProcessor.getInstance(); + await elasticSearchBatchProcessor.flushAndClose(); + logger.info('ElasticSearch <-> Postgres sync job shutdown complete'); + })(), + 30_000, + 'Elastic Search <-> Postgres sync job took too long to shutdown, forcing exit', + ); + } + + return shutdownPromise; +}; + +/** + * Re-indexes all entries across all indexes related to this `collectiveId`, either through `CollectiveId`, + * `HostCollectiveId`, `FromCollectiveId`...etc.
+ */
+export const elasticSearchFullAccountReIndex = async (collectiveId: number): Promise<void> => {
+  if (!isElasticSearchConfigured()) {
+    logger.debug(`ElasticSearch is not configured, skipping full account re-index for collective ${collectiveId}`);
+    return;
+  }
+
+  ElasticSearchBatchProcessor.getInstance().addToQueue({
+    type: ElasticSearchRequestType.FULL_ACCOUNT_RE_INDEX,
+    payload: { id: collectiveId },
+  });
+};
diff --git a/server/lib/elastic-search/sync.ts b/server/lib/elastic-search/sync.ts
index 7cd5f07ef32..c2ac4974229 100644
--- a/server/lib/elastic-search/sync.ts
+++ b/server/lib/elastic-search/sync.ts
@@ -2,6 +2,7 @@
  * Functions to sync data between the database and elastic search
  */
 
+import config from 'config';
 import { chunk } from 'lodash';
 
 import { Op } from '../../models';
@@ -9,6 +10,7 @@ import logger from '../logger';
 
 import { ElasticSearchModelsAdapters } from './adapters';
 import { getElasticSearchClient } from './client';
+import { formatIndexNameForElasticSearch } from './common';
 import { ElasticSearchIndexName } from './constants';
 
 export async function createElasticSearchIndex(indexName: ElasticSearchIndexName) {
@@ -19,7 +21,7 @@ export async function createElasticSearchIndex(indexName: ElasticSearchIndexName
   }
 
   return client.indices.create({
-    index: adapter.index,
+    index: formatIndexNameForElasticSearch(indexName),
     body: { mappings: adapter['mappings'], settings: adapter['settings'] },
   });
 }
@@ -31,7 +33,7 @@ async function removeDeletedEntries(indexName: ElasticSearchIndexName, fromDate:
   let offset = 0;
   let deletedEntries = [];
   do {
-    deletedEntries = await adapter.model.findAll({
+    deletedEntries = await adapter.getModel().findAll({
       attributes: ['id'],
       where: { deletedAt: { [Op.gt]: fromDate } },
       raw: true,
@@ -44,7 +46,7 @@ async function removeDeletedEntries(indexName: ElasticSearchIndexName, fromDate:
       return;
     }
     await client.bulk({
-      index: indexName,
+      index: formatIndexNameForElasticSearch(indexName),
       body: deletedEntries.flatMap(entry => [{ delete: { _id: entry.id } }]),
     });
     offset += pageSize;
@@ -61,7 +63,7 @@ export async function restoreUndeletedEntries(indexName: ElasticSearchIndexName,
 
   /* eslint-disable camelcase */
   let scrollSearch = await client.search({
-    index: indexName,
+    index: formatIndexNameForElasticSearch(indexName),
     body: { _source: false },
     filter_path: ['hits.hits._id', '_scroll_id'],
     size: 10_000, // Max value allowed by ES
@@ -83,7 +85,7 @@ export async function restoreUndeletedEntries(indexName: ElasticSearchIndexName,
   /* eslint-enable camelcase */
 
   // Search for entries that are not marked as deleted in the database
-  const undeletedEntries = (await adapter.model.findAll({
+  const undeletedEntries = (await adapter.getModel().findAll({
     attributes: ['id'],
     where: { id: { [Op.not]: allIds } },
     raw: true,
@@ -115,7 +117,7 @@ export async function restoreUndeletedEntries(indexName: ElasticSearchIndexName,
 
   // Send data to ElasticSearch
   await client.bulk({
-    index: indexName,
+    index: formatIndexNameForElasticSearch(indexName),
     body: modelEntries.flatMap(entry => [{ index: { _id: entry.id } }, adapter.mapModelInstanceToDocument(entry)]),
   });
 
@@ -133,7 +135,10 @@ export async function syncElasticSearchIndex(
   const { fromDate } = options;
 
   if (options.log) {
-    logger.info(`Syncing index ${indexName}...`);
+    const realIndexName = formatIndexNameForElasticSearch(indexName);
+    logger.info(
+      `Syncing index ${indexName}${realIndexName !== indexName ? ` (${realIndexName})` : ''}${fromDate ? ` from ${fromDate}` : ''}...`,
+    );
   }
 
   // If there's a fromDate, it means we are doing a simple sync (not a full resync) and therefore need to look at deleted entries
@@ -159,7 +164,7 @@ export async function syncElasticSearchIndex(
 
     // Send data to ElasticSearch
     await client.bulk({
-      index: indexName,
+      index: formatIndexNameForElasticSearch(indexName),
       body: modelEntries.flatMap(entry => [{ index: { _id: entry.id } }, adapter.mapModelInstanceToDocument(entry)]),
     });
 
@@ -188,10 +193,10 @@ export const deleteElasticSearchIndex = async (indexName: ElasticSearchIndexName
   }
 
   const client = getElasticSearchClient({ throwIfUnavailable: true });
-  await client.indices.delete({ index: indexName });
+  await client.indices.delete({ index: formatIndexNameForElasticSearch(indexName) });
 };
 
-export const waitForAllIndexesRefresh = async () => {
+export const waitForAllIndexesRefresh = async (prefix = config.elasticSearch.indexesPrefix) => {
   const client = getElasticSearchClient({ throwIfUnavailable: true });
-  await client.indices.refresh({ index: '_all' });
+  await client.indices.refresh({ index: !prefix ? '_all' : `${prefix}_*` });
 };
diff --git a/server/lib/elastic-search/types.ts b/server/lib/elastic-search/types.ts
new file mode 100644
index 00000000000..081a2b9e54a
--- /dev/null
+++ b/server/lib/elastic-search/types.ts
@@ -0,0 +1,43 @@
+export enum ElasticSearchRequestType {
+  FULL_ACCOUNT_RE_INDEX = 'FULL_ACCOUNT_RE_INDEX',
+  UPDATE = 'UPDATE',
+  DELETE = 'DELETE',
+}
+
+type ElasticSearchRequestBase = {
+  type: ElasticSearchRequestType;
+};
+
+export type ElasticSearchRequest = ElasticSearchRequestBase &
+  (
+    | {
+        type: ElasticSearchRequestType.FULL_ACCOUNT_RE_INDEX;
+        payload: { id: number };
+      }
+    | {
+        type: ElasticSearchRequestType.UPDATE | ElasticSearchRequestType.DELETE;
+        table: string;
+        payload: { id: number };
+      }
+  );
+
+export const isFullAccountReIndexRequest = (
+  request: ElasticSearchRequest,
+): request is ElasticSearchRequest & { type: ElasticSearchRequestType.FULL_ACCOUNT_RE_INDEX } =>
+  request?.type === ElasticSearchRequestType.FULL_ACCOUNT_RE_INDEX;
+
+export const isValidElasticSearchRequest = (message: any): message is ElasticSearchRequest => {
+  if (typeof message !== 'object' || message === null) {
+    return false;
+  } else {
+    switch (message.type) {
+      case ElasticSearchRequestType.FULL_ACCOUNT_RE_INDEX:
+        return typeof message.payload?.id === 'number'; // Optional chaining so a missing payload doesn't throw
+      case ElasticSearchRequestType.UPDATE:
+      case ElasticSearchRequestType.DELETE:
+        return 'table' in message && typeof message.payload?.id === 'number';
+      default:
+        return false;
+    }
+  }
+};
diff --git a/server/lib/promises.ts b/server/lib/promises.ts
new file mode 100644
index 00000000000..c0a37594ae8
--- /dev/null
+++ b/server/lib/promises.ts
@@ -0,0 +1,27 @@
+class PromiseTimeoutError extends Error {
+  constructor(message: string) {
+    super(message);
+    this.name = 'PromiseTimeoutError';
+  }
+}
+
+export const runWithTimeout = async <T>(
+  promise: Promise<T>,
+  timeoutInMs = 10000,
+  message = `Promise did not resolve within ${timeoutInMs}ms`,
+): Promise<T> => {
+  let timeout: NodeJS.Timeout;
+
+  const result = await Promise.race([
+    promise,
+    new Promise((_, reject) => {
+      timeout = setTimeout(() => reject(new PromiseTimeoutError(message)), timeoutInMs);
+    }),
+  ]);
+
+  if (timeout) {
+    clearTimeout(timeout);
+  }
+
+  return result as T;
+};
diff --git a/server/lib/sentry.ts b/server/lib/sentry.ts
index 9631ed8aaeb..26b4017c625 100644
--- a/server/lib/sentry.ts
+++ b/server/lib/sentry.ts
@@ -96,6 +96,7 @@ export enum HandlerType {
   CRON = 'CRON',
   FALLBACK = 'FALLBACK',
   WEBHOOK = 'WEBHOOK',
+  ELASTICSEARCH_SYNC_JOB = 'ELASTICSEARCH_SYNC_JOB',
 }
 
 export type CaptureErrorParams = {
diff --git a/server/models/Collective.ts b/server/models/Collective.ts
index 1ae09bb23ae..2161bb72afe 100644
--- a/server/models/Collective.ts
+++ b/server/models/Collective.ts
@@ -85,6 +85,7 @@ import {
 } from '../lib/collectivelib';
 import { invalidateContributorsCache } from '../lib/contributors';
 import { getFxRate } from '../lib/currency';
+import { elasticSearchFullAccountReIndex } from '../lib/elastic-search/sync-postgres';
 import emailLib from '../lib/email';
 import { formatAddress } from '../lib/format-address';
 import { getGithubHandleFromUrl, getGithubUrlFromHandle } from '../lib/github';
@@ -2609,12 +2610,17 @@ class Collective extends Model<
     }
   }
 
-  return this.addHost(newHostCollective, remoteUser, {
+  await this.addHost(newHostCollective, remoteUser, {
     message,
     applicationData,
     shouldAutomaticallyApprove,
   });
 }
+
+  // Queue a full search re-index for this account
+  elasticSearchFullAccountReIndex(this.id);
+
+  return this;
 };
 
 // edit the list of members and admins of this collective (create/update/remove)
diff --git a/test/server/lib/elastic-search/batch-processor.test.ts b/test/server/lib/elastic-search/batch-processor.test.ts
new file mode 100644
index 00000000000..32d40edf5d5
--- /dev/null
+++ b/test/server/lib/elastic-search/batch-processor.test.ts
@@ -0,0 +1,253 @@
+import { expect } from 'chai';
+import sinon from 'sinon';
+
+import { ElasticSearchBatchProcessor } from '../../../../server/lib/elastic-search/batch-processor';
+import * as ElasticSearchClient from '../../../../server/lib/elastic-search/client';
+import { ElasticSearchRequestType } from '../../../../server/lib/elastic-search/types';
+import * as SentryLib from '../../../../server/lib/sentry';
+
+describe('server/lib/elastic-search/batch-processor', () => {
+  let processor: ElasticSearchBatchProcessor;
+  let clientStub;
+  let sentryReportMessageStub;
+  let sentryReportErrorStub;
+
+  beforeEach(() => {
+    // Reset singleton instance
+    (ElasticSearchBatchProcessor as any).instance = null;
+
+    // Create stub for ES client
+    clientStub = {
+      bulk: sinon.stub().resolves({ items: [], errors: false, took: 0 }),
+      deleteByQuery: sinon.stub().resolves({ took: 0 }),
+    };
+
+    // Mock the ES client
+    sinon.stub(ElasticSearchClient, 'getElasticSearchClient').returns(clientStub);
+    processor = ElasticSearchBatchProcessor.getInstance();
+
+    sentryReportMessageStub = sinon.stub(SentryLib, 'reportMessageToSentry');
+    sentryReportErrorStub = sinon.stub(SentryLib, 'reportErrorToSentry');
+  });
+
+  afterEach(() => {
+    sinon.restore();
+  });
+
+  describe('addToQueue()', () => {
+    it('should add requests to the queue and schedule batch processing', async () => {
+      processor.start();
+      processor.addToQueue({
+        type: ElasticSearchRequestType.UPDATE,
+        table: 'Collectives',
+        payload: { id: 1 },
+      });
+
+      expect((processor as any)._queue).to.have.length(1);
+      expect(processor.hasScheduledBatch).to.be.true;
+    });
+
+    it('should process batch immediately when the queue is full', async () => {
+      processor.start();
+      const processSpy = sinon.spy(processor, '_processBatch');
+
+      // Fill queue to maxBatchSize
+      for (let i = 0; i < processor.maxBatchSize; i++) {
+        processor.addToQueue({
+          type: ElasticSearchRequestType.UPDATE,
+          table: 'Collectives',
+          payload: { id: i }, // Need to have unique payloads to prevent deduplication
+        });
+      }
+
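+      // Filling the queue to maxBatchSize should have triggered an immediate flush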
+      expect(processSpy.calledOnce).to.be.true;
+    });
+
+    it('should process batch immediately when receiving a FULL_ACCOUNT_RE_INDEX request', async () => {
+      processor.start();
+      const processSpy = sinon.spy(processor, '_processBatch');
+
+      processor.addToQueue({
+        type: ElasticSearchRequestType.FULL_ACCOUNT_RE_INDEX,
+        payload: { id: 1 },
+      });
+
+      expect(processSpy.calledOnce).to.be.true;
+    });
+
+    it('should not add requests when processor is not started', async () => {
+      processor.addToQueue({
+        type: ElasticSearchRequestType.UPDATE,
+        table: 'Collectives',
+        payload: { id: 1 },
+      });
+
+      expect((processor as any)._queue).to.have.length(0);
+    });
+  });
+
+  describe('flushAndClose()', () => {
+    it('should process remaining items and stop accepting new ones', async () => {
+      processor.start();
+      const processSpy = sinon.spy(processor as any, '_processBatch');
+
+      processor.addToQueue({
+        type: ElasticSearchRequestType.UPDATE,
+        table: 'Collectives',
+        payload: { id: 1 },
+      });
+
+      await processor.flushAndClose();
+
+      expect(processSpy.calledOnce).to.be.true;
+      expect((processor as any)._queue).to.have.length(0);
+      expect(processor.hasScheduledBatch).to.be.false;
+      expect((processor as any).isProcessing).to.be.false;
+    });
+  });
+
+  describe('callProcessBatch()', () => {
+    it('should wait for existing batch to complete before processing new requests', async () => {
+      processor.start();
+      const processSpy = sinon.spy(processor as any, '_processBatch');
+
+      // Simulate a batch that is already in flight
+      (processor as any).processBatchPromise = Promise.resolve();
+
+      processor.addToQueue({
+        type: ElasticSearchRequestType.UPDATE,
+        table: 'Collectives',
+        payload: { id: 1 },
+      });
+
+      expect(processSpy.called).to.be.false;
+      (processor as any).processBatchPromise = null;
+
+      await (processor as any).callProcessBatch();
+      expect(processSpy.calledOnce).to.be.true;
+    });
+
+    it('should cancel pending timeout and process immediately', async () => {
+      processor.start();
+      const processSpy = sinon.spy(processor as any, '_processBatch');
+
+      // Simulate a batch that is already in flight
+      (processor as any).processBatchPromise = Promise.resolve();
+
+      processor.addToQueue({
+        type: ElasticSearchRequestType.UPDATE,
+        table: 'Collectives',
+        payload: { id: 1 },
+      });
+
+      expect(processSpy.called).to.be.false;
+      (processor as any).processBatchPromise = null;
+
+      await (processor as any).callProcessBatch(true);
+      expect(processSpy.calledOnce).to.be.true;
+    });
+  });
+
+  describe('preprocessRequests()', () => {
+    it('should prioritize FULL_ACCOUNT_RE_INDEX requests', () => {
+      const requests = [
+        {
+          type: ElasticSearchRequestType.UPDATE,
+          table: 'Collectives',
+          payload: { id: 1 },
+        },
+        {
+          type: ElasticSearchRequestType.FULL_ACCOUNT_RE_INDEX,
+          payload: { id: 1 },
+        },
+      ];
+
+      const result = (processor as any).preprocessRequests(requests);
+      expect(result).to.deep.eq({
+        accountsToReIndex: [1],
+        requestsGroupedByTableName: {}, // No other requests, because FULL_ACCOUNT_RE_INDEX is prioritized
+      });
+    });
+
+    it('should group non-reindex requests by table', () => {
+      const requests = [
+        {
+          type: ElasticSearchRequestType.UPDATE,
+          table: 'Collectives',
+          payload: { id: 1 },
+        },
+        {
+          type: ElasticSearchRequestType.UPDATE,
+          table: 'Transactions',
+          payload: { id: 2 },
+        },
+      ];
+
+      const result = (processor as any).preprocessRequests(requests);
+      expect(result).to.deep.eq({
+        accountsToReIndex: [],
+        requestsGroupedByTableName: {
+          Collectives: [{ type: ElasticSearchRequestType.UPDATE, table: 'Collectives', payload: { id: 1 } }],
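+          // Requests for other tables land in their own buckets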
+          Transactions: [{ type: ElasticSearchRequestType.UPDATE, table: 'Transactions', payload: { id: 2 } }],
+        },
+      });
+    });
+
+    it('should take the most recent request for each entry', () => {
+      const requests = [
+        {
+          type: ElasticSearchRequestType.DELETE,
+          table: 'Collectives',
+          payload: { id: 1 },
+        },
+        {
+          type: ElasticSearchRequestType.UPDATE,
+          table: 'Collectives',
+          payload: { id: 1 },
+        },
+      ];
+
+      const result = (processor as any).preprocessRequests(requests);
+      expect(result).to.deep.eq({
+        accountsToReIndex: [],
+        requestsGroupedByTableName: {
+          Collectives: [{ type: ElasticSearchRequestType.UPDATE, table: 'Collectives', payload: { id: 1 } }],
+        },
+      });
+    });
+  });
+
+  describe('_processBatch()', () => {
+    it('should handle errors gracefully', async () => {
+      processor.start();
+      clientStub.bulk.rejects(new Error('Test error'));
+
+      processor.addToQueue({
+        type: ElasticSearchRequestType.UPDATE,
+        table: 'Collectives',
+        payload: { id: 1 },
+      });
+
+      await (processor as any)._processBatch();
+
+      expect((processor as any).isProcessing).to.be.false;
+      expect(sentryReportErrorStub.calledOnce).to.be.true;
+    });
+
+    it('should handle bulk response errors', async () => {
+      processor.start();
+      clientStub.bulk.resolves({ items: [], errors: true, took: 0 });
+
+      processor.addToQueue({
+        type: ElasticSearchRequestType.UPDATE,
+        table: 'Collectives',
+        payload: { id: 1 },
+      });
+
+      await (processor as any)._processBatch();
+
+      expect((processor as any).isProcessing).to.be.false;
+      expect(sentryReportMessageStub.calledOnce).to.be.true;
+    });
+  });
+});
diff --git a/test/server/lib/elastic-search/sync-postgres.test.ts b/test/server/lib/elastic-search/sync-postgres.test.ts
new file mode 100644
index 00000000000..4e59c6df78a
--- /dev/null
+++ b/test/server/lib/elastic-search/sync-postgres.test.ts
@@ -0,0 +1,96 @@
+import { expect } from 'chai';
+import sinon from 'sinon';
+
+import { ElasticSearchBatchProcessor } from '../../../../server/lib/elastic-search/batch-processor';
+import {
+  removeElasticSearchPostgresTriggers,
+  startElasticSearchPostgresSync,
+  stopElasticSearchPostgresSync,
+} from '../../../../server/lib/elastic-search/sync-postgres';
+import * as SentryLib from '../../../../server/lib/sentry';
+import { fakeCollective, sequelize } from '../../../test-helpers/fake-data';
+import { waitForCondition } from '../../../utils';
+
+const checkIfElasticSearchTriggerExists = async () => {
+  // NB: `_` is a single-character wildcard in LIKE patterns, so this loosely matches any trigger name ending in "trigger"
+  const [result] = await sequelize.query(`SELECT * FROM pg_trigger WHERE tgname LIKE '%_%_trigger'`);
+  return result.length > 0;
+};
+
+describe('server/lib/elastic-search/sync-postgres', () => {
+  let processorStub;
+  let sentryReportMessageStub;
+  let sentryReportErrorStub;
+
+  beforeEach(() => {
+    processorStub = sinon.createStubInstance(ElasticSearchBatchProcessor);
+    sinon.stub(ElasticSearchBatchProcessor, 'getInstance').returns(processorStub);
+
+    sentryReportMessageStub = sinon.stub(SentryLib, 'reportMessageToSentry');
+    sentryReportErrorStub = sinon.stub(SentryLib, 'reportErrorToSentry');
+  });
+
+  afterEach(async () => {
+    sinon.restore();
+    await removeElasticSearchPostgresTriggers(); // Always remove triggers so they don't slow down the other tests
+  });
+
+  describe('startElasticSearchPostgresSync', () => {
+    let listener;
+
+    afterEach(async () => {
+      if (listener) {
+        await listener.close();
+        listener = null;
+      }
+    });
+
+    it('should dispatch events to the batch processor', async () => {
+      listener = await startElasticSearchPostgresSync();
+      await fakeCollective();
+      await waitForCondition(() => processorStub.addToQueue.called, { timeout: 2_000 });
+      expect(sentryReportMessageStub.called).to.be.false;
+      expect(sentryReportErrorStub.called).to.be.false;
+      expect(await checkIfElasticSearchTriggerExists()).to.be.true;
+    });
+
+    it('should report errors to Sentry', async () => {
+      listener = await startElasticSearchPostgresSync();
+      await listener.notify('elasticsearch-requests', { type: 'INVALID' });
+      await waitForCondition(() => sentryReportMessageStub.called, { timeout: 2_000 });
+      expect(processorStub.addToQueue.called).to.be.false;
+      expect(await checkIfElasticSearchTriggerExists()).to.be.true;
+    });
+  });
+
+  describe('stopElasticSearchPostgresSync', () => {
+    let listener;
+
+    afterEach(async () => {
+      if (listener) {
+        await listener.close();
+        listener = null;
+      }
+    });
+
+    it('should close connections and flush processor', async () => {
+      listener = await startElasticSearchPostgresSync();
+      const listenerStopSpy = sinon.spy(listener, 'close');
+      await stopElasticSearchPostgresSync();
+
+      expect(processorStub.flushAndClose.calledOnce).to.be.true;
+      expect(listenerStopSpy.calledOnce).to.be.true;
+      expect(await checkIfElasticSearchTriggerExists()).to.be.false;
+    });
+
+    it('should not create multiple shutdown promises', async () => {
+      listener = await startElasticSearchPostgresSync();
+
+      const firstStop = stopElasticSearchPostgresSync();
+      const secondStop = stopElasticSearchPostgresSync();
+
+      expect(firstStop).to.equal(secondStop);
+    });
+
+    // TODO: left unimplemented — testing the 30s shutdown timeout would require fake timers
+    // and a way to reset the module-level `shutdownPromise` between tests.
+    it.skip('should time out if closing takes too long', async () => {});
+  });
+});