Skip to content

Commit

Permalink
feat(ElasticSearch): add support for index prefix
Browse files Browse the repository at this point in the history
  • Loading branch information
Betree committed Dec 19, 2024
1 parent 0a91e0d commit c3c17bf
Show file tree
Hide file tree
Showing 9 changed files with 130 additions and 56 deletions.
3 changes: 2 additions & 1 deletion config/custom-environment-variables.json
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,8 @@
},
"elasticSearch": {
"url": "ELASTICSEARCH_URL",
"maxSyncDelay": "ELASTICSEARCH_MAX_SYNC_DELAY"
"maxSyncDelay": "ELASTICSEARCH_MAX_SYNC_DELAY",
"indexesPrefix": "ELASTICSEARCH_INDEXES_PREFIX"
},
"database": {
"url": "PG_URL",
Expand Down
3 changes: 3 additions & 0 deletions config/staging.json
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,9 @@
}
}
},
"elasticSearch": {
"indexesPrefix": "staging"
},
"host": {
"api": "https://api-staging.opencollective.com",
"frontend": "https://frontend-staging.opencollective.com",
Expand Down
3 changes: 2 additions & 1 deletion config/test.json
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,8 @@
"url": "postgres://[email protected]:5432/opencollective_test"
},
"elasticSearch": {
"url": "http://localhost:9200"
"url": "http://localhost:9200",
"indexesPrefix": "test"
},
"fees": {
"default": {
Expand Down
42 changes: 32 additions & 10 deletions scripts/search.ts
Original file line number Diff line number Diff line change
@@ -1,9 +1,11 @@
import '../server/env';

import { Command } from 'commander';
import config from 'config';
import { partition, uniq } from 'lodash';

import { getElasticSearchClient } from '../server/lib/elastic-search/client';
import { formatIndexNameForElasticSearch } from '../server/lib/elastic-search/common';
import { ElasticSearchIndexName } from '../server/lib/elastic-search/constants';
import { elasticSearchGlobalSearch } from '../server/lib/elastic-search/search';
import {
Expand Down Expand Up @@ -77,7 +79,8 @@ program
const indexes = parseIndexesFromInput(indexesInput);
logger.info('Creating all indices...');
for (const indexName of indexes) {
logger.info(`Creating index ${indexName}`);
const realIndexName = formatIndexNameForElasticSearch(indexName);
logger.info(`Creating index ${indexName}${realIndexName !== indexName ? ` (${realIndexName})` : ''}`);
await createElasticSearchIndex(indexName);
}
logger.info('Create completed!');
Expand All @@ -98,9 +101,10 @@ program
) {
logger.info('Syncing all models...');
for (const indexName of indexes) {
logger.info(`Dropping index ${indexName}`);
const realIndexName = formatIndexNameForElasticSearch(indexName);
logger.info(`Dropping index ${indexName}${realIndexName !== indexName ? ` (${realIndexName})` : ''}`);
await deleteElasticSearchIndex(indexName, { throwIfMissing: false });
logger.info(`Re-creating index ${indexName}`);
logger.info(`Re-creating index ${indexName}${realIndexName !== indexName ? ` (${realIndexName})` : ''}`);
await createElasticSearchIndex(indexName);
await syncElasticSearchIndex(indexName, { log: true });
}
Expand Down Expand Up @@ -192,17 +196,35 @@ program
.command('stats')
.description('Show information about the ElasticSearch indices')
.argument('[indexes...]', 'Only show information about specific indexes')
.action(async indexesInput => {
.option('--all', 'Show information about all indexes, even those not matching the prefix')
.action(async (indexesInput, options) => {
checkElasticSearchAvailable();
const indexes = parseIndexesFromInput(indexesInput);
const indexesFromArgs = parseIndexesFromInput(indexesInput, null);
const client = getElasticSearchClient();
const availableIndexes = await getAvailableElasticSearchIndexes();
const [availableIndexesToQuery, unknownIndexes] = partition(indexes, index => availableIndexes.includes(index));
if (unknownIndexes.length) {
logger.warn(`Unknown indexes: ${unknownIndexes.join(', ')}`);
let availableIndexes = await getAvailableElasticSearchIndexes();

// Only get the indexes specified in args
if (indexesFromArgs) {
const partitionedIndexes = partition(indexesFromArgs, index => availableIndexes.includes(index));
availableIndexes = partitionedIndexes[0];
if (partitionedIndexes[1].length) {
logger.warn(`Unknown indexes: ${partitionedIndexes[1].join(', ')}`);
}
}

// Filter out indexes that don't match the prefix
if (!options.all) {
const prefix = config.elasticSearch.indexesPrefix;
if (prefix) {
availableIndexes = availableIndexes.filter(index => index.startsWith(prefix));
} else {
availableIndexes = availableIndexes.filter(index =>
Object.values(ElasticSearchIndexName).includes(index as ElasticSearchIndexName),
);
}
}

const result = await client.indices.stats({ index: availableIndexesToQuery.join(',') });
const result = await client.indices.stats({ index: availableIndexes.join(',') });
let nbDocs = 0;
let totalSize = 0;
const formatSize = (bytes: number) => `${(bytes / 1024 / 1024).toFixed(2)} MB`;
Expand Down
57 changes: 41 additions & 16 deletions server/graphql/loaders/search.ts
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import { AggregationsMultiBucketAggregateBase, SearchResponse } from '@elastic/e
import DataLoader from 'dataloader';
import { groupBy, pick } from 'lodash';

import { formatIndexNameForElasticSearch } from '../../lib/elastic-search/common';
import { ElasticSearchIndexName, ElasticSearchIndexParams } from '../../lib/elastic-search/constants';
import { elasticSearchGlobalSearch, ElasticSearchIndexRequest } from '../../lib/elastic-search/search';
import { reportMessageToSentry } from '../../lib/sentry';
Expand All @@ -12,15 +13,15 @@ import { Collective } from '../../models';
type SearchParams = {
requestId: string;
searchTerm: string;
index: string;
index: ElasticSearchIndexName;
indexParams: ElasticSearchIndexParams[ElasticSearchIndexName];
limit: number;
adminOfAccountIds: number[];
account: Collective;
host: Collective;
};

export type SearchResultBucket = {
type SearchResultBucket = {
key: string;
doc_count: number;
top_hits_by_index: {
Expand All @@ -41,6 +42,18 @@ export type SearchResultBucket = {
};
};

export type SearchResult = {
count: number;
maxScore: number;
hits: Array<{
indexName: ElasticSearchIndexName;
score: number;
id: string;
source: Record<string, unknown>;
highlight: Record<string, string[]>;
}>;
};

const getSearchIndexes = (requests: SearchParams[]): ElasticSearchIndexRequest[] => {
const results: Partial<Record<ElasticSearchIndexName, ElasticSearchIndexRequest>> = {};
for (const request of requests) {
Expand All @@ -56,9 +69,10 @@ const getSearchIndexes = (requests: SearchParams[]): ElasticSearchIndexRequest[]
* A loader to batch search requests on multiple indexes into a single ElasticSearch query.
*/
export const generateSearchLoaders = req => {
return new DataLoader<SearchParams, SearchResultBucket>(async (entries: SearchParams[]) => {
const groupedRequests = groupBy(entries, 'requestId');
return new DataLoader<SearchParams, SearchResult | null>(async (requests: SearchParams[]) => {
const groupedRequests = groupBy(requests, 'requestId');
const requestsResults = new Map<string, SearchResponse>();
const failures = [];

// All grouped requests must have the same searchTerm
assert(
Expand All @@ -79,29 +93,40 @@ export const generateSearchLoaders = req => {
});

if (results) {
requestsResults.set(requestId, results);
if (results._shards?.failures) {
reportMessageToSentry('ElasticSearch search shard failures', {
extra: {
failures: results._shards.failures,
request: firstRequest,
indexes,
},
});
failures.push({ request: firstRequest, indexes, items: results._shards.failures });
}

requestsResults.set(requestId, results);
}
}

return entries.map(entry => {
const results = requestsResults.get(entry.requestId);
if (failures.length > 0) {
reportMessageToSentry('ElasticSearch shard failures', { extra: { failures } });
}

return requests.map(request => {
const results = requestsResults.get(request.requestId);
const resultsAggregationsByIndex = results?.aggregations?.by_index as AggregationsMultiBucketAggregateBase;
const buckets = resultsAggregationsByIndex?.buckets as Array<SearchResultBucket>;
if (!buckets) {
return null;
}

return buckets.find(bucket => bucket.key === entry.index);
const expectedBucket = formatIndexNameForElasticSearch(request.index);
const bucket = buckets.find(bucket => bucket.key === expectedBucket);
if (bucket) {
return {
count: bucket.doc_count,
maxScore: bucket.top_hits_by_index.hits.max_score || 0,
hits: bucket.top_hits_by_index.hits.hits.map(hit => ({
indexName: request.index,
score: hit._score,
id: hit._id,
source: hit._source,
highlight: hit.highlight,
})),
};
}
});
});
};
18 changes: 18 additions & 0 deletions server/lib/elastic-search/common.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
import config from 'config';

import { ElasticSearchIndexName } from './constants';

const getIndexesPrefix = () => config.elasticSearch?.indexesPrefix;

/**
* Formats the index name before querying ElasticSearch. Allows to share a single ElasticSearch
* instance between multiple environments (e.g. staging and production, dev and test).
*/
export const formatIndexNameForElasticSearch = (indexName: ElasticSearchIndexName): string => {
const prefix = getIndexesPrefix();
if (prefix) {
return `${prefix}_${indexName}`;
} else {
return indexName;
}
};
32 changes: 15 additions & 17 deletions server/lib/elastic-search/graphql-search.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@
* This file contains the logic to bind the ElasticSearch functionality to the GraphQL API.
*/

import DataLoader from 'dataloader';
import {
GraphQLBoolean,
GraphQLFieldConfigArgumentMap,
Expand All @@ -13,10 +12,10 @@ import {
GraphQLString,
} from 'graphql';
import { GraphQLJSONObject } from 'graphql-scalars';
import { groupBy, mapKeys, mapValues } from 'lodash';
import { keyBy, mapKeys, mapValues } from 'lodash';

import { FieldsToGraphQLFieldConfigArgumentMap } from '../../graphql/common/typescript-helpers';
import { SearchResultBucket } from '../../graphql/loaders/search';
import { SearchResult } from '../../graphql/loaders/search';
import { GraphQLAccountCollection } from '../../graphql/v2/collection/AccountCollection';
import { CommentCollection } from '../../graphql/v2/collection/CommentCollection';
import { GraphQLExpenseCollection } from '../../graphql/v2/collection/ExpenseCollection';
Expand Down Expand Up @@ -56,7 +55,7 @@ const GraphQLSearchResultsStrategy: Record<
ElasticSearchIndexName,
{
// A loader to use for loading entities from the (optionally encoded) ID
loadMany: (req, ids) => DataLoader<unknown, unknown>;
loadMany: (req, ids) => Array<unknown | null>;
// A function to encode the ID for use in the GraphQL API
getGraphQLId: (result: Record<string, unknown>) => string;
// A function to get Elastic Search index-specific parameters from the GraphQL arguments. By default, it returns the raw arguments.
Expand Down Expand Up @@ -135,33 +134,32 @@ const buildSearchResultsType = (index: ElasticSearchIndexName, name: string, col
resolve: async (baseSearchParams: GraphQLSearchParams, args, req) => {
const indexParams = strategy.prepareArguments ? strategy.prepareArguments(args) : args;
const fullSearchParams = { ...baseSearchParams, index, indexParams };
const results = (await req.loaders.search.load(fullSearchParams)) as SearchResultBucket;
const result = (await req.loaders.search.load(fullSearchParams)) as SearchResult | null;

if (!results || results['doc_count'] === 0) {
if (!result || result.count === 0) {
return {
maxScore: 0,
collection: { totalCount: 0, offset: 0, limit: baseSearchParams.limit, nodes: () => [] },
highlights: {},
};
}

const hits = results['top_hits_by_index']['hits']['hits'];
const maxScore = results['top_hits_by_index']['hits']['max_score'] ?? 0;
const getSQLIdFromHit = hit => hit['_source']['id'];
const hitsGroupedBySQLId = groupBy(hits, getSQLIdFromHit);
const hitsGroupedByGraphQLKey = mapKeys(hitsGroupedBySQLId, result =>
strategy.getGraphQLId(result[0]['_source']),
);
const highlights = mapValues(hitsGroupedByGraphQLKey, hits => hits[0]['highlight']);
const getSQLIdFromHit = (hit: (typeof result.hits)[0]): number => hit.source['id'] as number;
const hitsGroupedBySQLId = keyBy(result.hits, getSQLIdFromHit);
const hitsGroupedByGraphQLKey = mapKeys(hitsGroupedBySQLId, result => strategy.getGraphQLId(result.source));
const highlights = mapValues(hitsGroupedByGraphQLKey, hit => hit.highlight);

return {
maxScore,
maxScore: result.maxScore,
highlights,
collection: {
totalCount: results['doc_count'],
totalCount: result.count,
offset: 0,
limit: baseSearchParams.limit,
nodes: () => strategy.loadMany(req, hits.map(getSQLIdFromHit)),
nodes: async () => {
const entries = await strategy.loadMany(req, result.hits.map(getSQLIdFromHit));
return entries.filter(Boolean); // Entries in ElasticSearch may have been deleted in the DB
},
},
};
},
Expand Down
5 changes: 3 additions & 2 deletions server/lib/elastic-search/search.ts
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ import { reportErrorToSentry } from '../sentry';

import { ElasticSearchModelsAdapters } from './adapters';
import { getElasticSearchClient } from './client';
import { formatIndexNameForElasticSearch } from './common';
import { ElasticSearchIndexName, ElasticSearchIndexParams } from './constants';

/**
Expand Down Expand Up @@ -112,7 +113,7 @@ const buildQuery = (
{
bool: {
filter: [
{ term: { _index: index } },
{ term: { _index: formatIndexNameForElasticSearch(index) } },
...(permissions.default === 'PUBLIC' ? [] : [permissions.default]),
...getIndexConditions(index, indexParams),
],
Expand Down Expand Up @@ -178,7 +179,7 @@ export const elasticSearchGlobalSearch = async (
return await client.search({
/* eslint-disable camelcase */
timeout: `${timeoutInSeconds}s`,
index: Array.from(indexes).join(','),
index: Array.from(indexes).map(formatIndexNameForElasticSearch).join(','),
body: {
size: 0, // We don't need hits at the top level
query,
Expand Down
Loading

0 comments on commit c3c17bf

Please sign in to comment.