Skip to content

Commit

Permalink
feat(ElasticSearch): Postgres synchronization (#10521)
Browse files Browse the repository at this point in the history
  • Loading branch information
Betree authored Dec 19, 2024
1 parent 65e6f84 commit a4316d7
Show file tree
Hide file tree
Showing 34 changed files with 1,292 additions and 187 deletions.
8 changes: 7 additions & 1 deletion config/custom-environment-variables.json
Original file line number Diff line number Diff line change
@@ -1,11 +1,17 @@
{
"port": "PORT",
"env": "OC_ENV",
"services": {
"server": "ENABLE_SERVICE_SERVER",
"searchSync": "ENABLE_SERVICE_SEARCH_SYNC"
},
"mailpit": {
"client": "MAILPIT_CLIENT"
},
"elasticSearch": {
"url": "ELASTICSEARCH_URL"
"url": "ELASTICSEARCH_URL",
"maxSyncDelay": "ELASTICSEARCH_MAX_SYNC_DELAY",
"indexesPrefix": "ELASTICSEARCH_INDEXES_PREFIX"
},
"database": {
"url": "PG_URL",
Expand Down
7 changes: 7 additions & 0 deletions config/default.json
Original file line number Diff line number Diff line change
@@ -1,5 +1,9 @@
{
"port": "3060",
"services": {
"server": true,
"searchSync": false
},
"mailpit": {
"client": false
},
Expand All @@ -16,6 +20,9 @@
},
"readOnly": false
},
"elasticSearch": {
"maxSyncDelay": 5000
},
"maintenancedb": {
"url": "postgres://127.0.0.1:5432/postgres"
},
Expand Down
3 changes: 3 additions & 0 deletions config/e2e.json
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,9 @@
"database": {
"url": "postgres://[email protected]:5432/opencollective_e2e"
},
"services": {
"searchSync": false
},
"mailpit": {
"client": true
},
Expand Down
3 changes: 3 additions & 0 deletions config/staging.json
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,9 @@
}
}
},
"elasticSearch": {
"indexesPrefix": "staging"
},
"host": {
"api": "https://api-staging.opencollective.com",
"frontend": "https://frontend-staging.opencollective.com",
Expand Down
7 changes: 6 additions & 1 deletion config/test.json
Original file line number Diff line number Diff line change
@@ -1,10 +1,15 @@
{
"port": "3061",
"services": {
"server": true,
"searchSync": false
},
"database": {
"url": "postgres://[email protected]:5432/opencollective_test"
},
"elasticSearch": {
"url": "http://localhost:9200"
"url": "http://localhost:9200",
"indexesPrefix": "test"
},
"fees": {
"default": {
Expand Down
21 changes: 21 additions & 0 deletions package-lock.json

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions package.json
Original file line number Diff line number Diff line change
Expand Up @@ -116,6 +116,7 @@
"pg": "8.13.1",
"pg-connection-string": "2.7.0",
"pg-format": "1.0.4",
"pg-listen": "1.7.0",
"plaid": "29.0.0",
"prepend-http": "3.0.1",
"redis": "4.6.6",
Expand Down
42 changes: 32 additions & 10 deletions scripts/search.ts
Original file line number Diff line number Diff line change
@@ -1,9 +1,11 @@
import '../server/env';

import { Command } from 'commander';
import config from 'config';
import { partition, uniq } from 'lodash';

import { getElasticSearchClient } from '../server/lib/elastic-search/client';
import { formatIndexNameForElasticSearch } from '../server/lib/elastic-search/common';
import { ElasticSearchIndexName } from '../server/lib/elastic-search/constants';
import { elasticSearchGlobalSearch } from '../server/lib/elastic-search/search';
import {
Expand Down Expand Up @@ -77,7 +79,8 @@ program
const indexes = parseIndexesFromInput(indexesInput);
logger.info('Creating all indices...');
for (const indexName of indexes) {
logger.info(`Creating index ${indexName}`);
const realIndexName = formatIndexNameForElasticSearch(indexName);
logger.info(`Creating index ${indexName}${realIndexName !== indexName ? ` (${realIndexName})` : ''}`);
await createElasticSearchIndex(indexName);
}
logger.info('Create completed!');
Expand All @@ -98,9 +101,10 @@ program
) {
logger.info('Syncing all models...');
for (const indexName of indexes) {
logger.info(`Dropping index ${indexName}`);
const realIndexName = formatIndexNameForElasticSearch(indexName);
logger.info(`Dropping index ${indexName}${realIndexName !== indexName ? ` (${realIndexName})` : ''}`);
await deleteElasticSearchIndex(indexName, { throwIfMissing: false });
logger.info(`Re-creating index ${indexName}`);
logger.info(`Re-creating index ${indexName}${realIndexName !== indexName ? ` (${realIndexName})` : ''}`);
await createElasticSearchIndex(indexName);
await syncElasticSearchIndex(indexName, { log: true });
}
Expand Down Expand Up @@ -192,17 +196,35 @@ program
.command('stats')
.description('Show information about the ElasticSearch indices')
.argument('[indexes...]', 'Only show information about specific indexes')
.action(async indexesInput => {
.option('--all', 'Show information about all indexes, even those not matching the prefix')
.action(async (indexesInput, options) => {
checkElasticSearchAvailable();
const indexes = parseIndexesFromInput(indexesInput);
const indexesFromArgs = parseIndexesFromInput(indexesInput, null);
const client = getElasticSearchClient();
const availableIndexes = await getAvailableElasticSearchIndexes();
const [availableIndexesToQuery, unknownIndexes] = partition(indexes, index => availableIndexes.includes(index));
if (unknownIndexes.length) {
logger.warn(`Unknown indexes: ${unknownIndexes.join(', ')}`);
let availableIndexes = await getAvailableElasticSearchIndexes();

// Only get the indexes specified in args
if (indexesFromArgs) {
const partitionedIndexes = partition(indexesFromArgs, index => availableIndexes.includes(index));
availableIndexes = partitionedIndexes[0];
if (partitionedIndexes[1].length) {
logger.warn(`Unknown indexes: ${partitionedIndexes[1].join(', ')}`);
}
}

// Filter out indexes that don't match the prefix
if (!options.all) {
const prefix = config.elasticSearch.indexesPrefix;
if (prefix) {
availableIndexes = availableIndexes.filter(index => index.startsWith(prefix));
} else {
availableIndexes = availableIndexes.filter(index =>
Object.values(ElasticSearchIndexName).includes(index as ElasticSearchIndexName),
);
}
}

const result = await client.indices.stats({ index: availableIndexesToQuery.join(',') });
const result = await client.indices.stats({ index: availableIndexes.join(',') });
let nbDocs = 0;
let totalSize = 0;
const formatSize = (bytes: number) => `${(bytes / 1024 / 1024).toFixed(2)} MB`;
Expand Down
57 changes: 41 additions & 16 deletions server/graphql/loaders/search.ts
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import { AggregationsMultiBucketAggregateBase, SearchResponse } from '@elastic/e
import DataLoader from 'dataloader';
import { groupBy, pick } from 'lodash';

import { formatIndexNameForElasticSearch } from '../../lib/elastic-search/common';
import { ElasticSearchIndexName, ElasticSearchIndexParams } from '../../lib/elastic-search/constants';
import { elasticSearchGlobalSearch, ElasticSearchIndexRequest } from '../../lib/elastic-search/search';
import { reportMessageToSentry } from '../../lib/sentry';
Expand All @@ -12,15 +13,15 @@ import { Collective } from '../../models';
type SearchParams = {
requestId: string;
searchTerm: string;
index: string;
index: ElasticSearchIndexName;
indexParams: ElasticSearchIndexParams[ElasticSearchIndexName];
limit: number;
adminOfAccountIds: number[];
account: Collective;
host: Collective;
};

export type SearchResultBucket = {
type SearchResultBucket = {
key: string;
doc_count: number;
top_hits_by_index: {
Expand All @@ -41,6 +42,18 @@ export type SearchResultBucket = {
};
};

export type SearchResult = {
count: number;
maxScore: number;
hits: Array<{
indexName: ElasticSearchIndexName;
score: number;
id: string;
source: Record<string, unknown>;
highlight: Record<string, string[]>;
}>;
};

const getSearchIndexes = (requests: SearchParams[]): ElasticSearchIndexRequest[] => {
const results: Partial<Record<ElasticSearchIndexName, ElasticSearchIndexRequest>> = {};
for (const request of requests) {
Expand All @@ -56,9 +69,10 @@ const getSearchIndexes = (requests: SearchParams[]): ElasticSearchIndexRequest[]
* A loader to batch search requests on multiple indexes into a single ElasticSearch query.
*/
export const generateSearchLoaders = req => {
return new DataLoader<SearchParams, SearchResultBucket>(async (entries: SearchParams[]) => {
const groupedRequests = groupBy(entries, 'requestId');
return new DataLoader<SearchParams, SearchResult | null>(async (requests: SearchParams[]) => {
const groupedRequests = groupBy(requests, 'requestId');
const requestsResults = new Map<string, SearchResponse>();
const failures = [];

// All grouped requests must have the same searchTerm
assert(
Expand All @@ -79,29 +93,40 @@ export const generateSearchLoaders = req => {
});

if (results) {
requestsResults.set(requestId, results);
if (results._shards?.failures) {
reportMessageToSentry('ElasticSearch search shard failures', {
extra: {
failures: results._shards.failures,
request: firstRequest,
indexes,
},
});
failures.push({ request: firstRequest, indexes, items: results._shards.failures });
}

requestsResults.set(requestId, results);
}
}

return entries.map(entry => {
const results = requestsResults.get(entry.requestId);
if (failures.length > 0) {
reportMessageToSentry('ElasticSearch shard failures', { extra: { failures } });
}

return requests.map(request => {
const results = requestsResults.get(request.requestId);
const resultsAggregationsByIndex = results?.aggregations?.by_index as AggregationsMultiBucketAggregateBase;
const buckets = resultsAggregationsByIndex?.buckets as Array<SearchResultBucket>;
if (!buckets) {
return null;
}

return buckets.find(bucket => bucket.key === entry.index);
const expectedBucket = formatIndexNameForElasticSearch(request.index);
const bucket = buckets.find(bucket => bucket.key === expectedBucket);
if (bucket) {
return {
count: bucket.doc_count,
maxScore: bucket.top_hits_by_index.hits.max_score || 0,
hits: bucket.top_hits_by_index.hits.hits.map(hit => ({
indexName: request.index,
score: hit._score,
id: hit._id,
source: hit._source,
highlight: hit.highlight,
})),
};
}
});
});
};
Loading

0 comments on commit a4316d7

Please sign in to comment.