From fd1d734c89d12791bb8402dfd42a0b68be6dc71a Mon Sep 17 00:00:00 2001 From: Amit Phulera Date: Tue, 3 Dec 2024 14:09:32 +0530 Subject: [PATCH 1/6] add parent_index_cname prop to multiplex adapter --- corehq/apps/es/client.py | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/corehq/apps/es/client.py b/corehq/apps/es/client.py index 1a8a2267fe08..9c2975c6b269 100644 --- a/corehq/apps/es/client.py +++ b/corehq/apps/es/client.py @@ -1118,6 +1118,10 @@ def __init__(self, primary_adapter, secondary_adapter): def mapping(self): return self.primary.mapping + @property + def parent_index_cname(self): + return self.primary.parent_index_cname + def export_adapter(self): adapter = copy.copy(self) adapter.primary = adapter.primary.export_adapter() From 97677ebf67111352c5f0baa3ecaab4444b43e897 Mon Sep 17 00:00:00 2001 From: Amit Phulera Date: Tue, 3 Dec 2024 14:20:10 +0530 Subject: [PATCH 2/6] ignore subindices in calculating reindex size we will not reindex the sub indices as we can create them on the fly after we have new indices ready --- .../management/commands/elastic_sync_multiplexed.py | 11 ++++++++--- 1 file changed, 8 insertions(+), 3 deletions(-) diff --git a/corehq/apps/es/management/commands/elastic_sync_multiplexed.py b/corehq/apps/es/management/commands/elastic_sync_multiplexed.py index 5acc94919540..df46c1e05c85 100644 --- a/corehq/apps/es/management/commands/elastic_sync_multiplexed.py +++ b/corehq/apps/es/management/commands/elastic_sync_multiplexed.py @@ -297,7 +297,7 @@ def _copy_checkpoints(self, pillow, new_checkpoint_id): def estimate_disk_space_for_reindex(self, stdout=None): indices_info = es_manager.indices_info() - index_cname_map = self._get_index_name_cname_map() + index_cname_map = self._get_index_name_cname_map(ignore_subindices=True) index_size_rows = [] total_size = 0 for index_name in index_cname_map.keys(): @@ -314,8 +314,13 @@ def estimate_disk_space_for_reindex(self, stdout=None): print("\n\n") print(f"Minimum free disk space recommended before starting the reindex: {recommended_disk}") - def _get_index_name_cname_map(self): - return {adapter.index_name: cname for cname, adapter in CANONICAL_NAME_ADAPTER_MAP.items()} + def _get_index_name_cname_map(self, ignore_subindices=False): + index_name_cname_map = {} + for cname, adapter in CANONICAL_NAME_ADAPTER_MAP.items(): + if ignore_subindices and adapter.parent_index_cname: + continue + index_name_cname_map[adapter.index_name] = cname + return index_name_cname_map def _format_bytes(self, size): units = ['B', 'KB', 'MB', 'GB', 'TB'] From 304d8ecd04ec7a061ed7bb0f1719df2db53d592e Mon Sep 17 00:00:00 2001 From: Amit Phulera Date: Tue, 3 Dec 2024 15:45:59 +0530 Subject: [PATCH 3/6] add batch_size to the reindex command which was removed by mistake earlier --- .../apps/es/management/commands/elastic_sync_multiplexed.py | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/corehq/apps/es/management/commands/elastic_sync_multiplexed.py b/corehq/apps/es/management/commands/elastic_sync_multiplexed.py index df46c1e05c85..6f0779c4606c 100644 --- a/corehq/apps/es/management/commands/elastic_sync_multiplexed.py +++ b/corehq/apps/es/management/commands/elastic_sync_multiplexed.py @@ -57,7 +57,8 @@ def start_reindex(self, cname, reindex_batch_size=1000, requests_per_second=None logger.info("Starting ReIndex process") task_id = es_manager.reindex( - source_index, destination_index, requests_per_second=requests_per_second + source_index, destination_index, + requests_per_second=requests_per_second, batch_size=reindex_batch_size ) logger.info(f"Copying docs from index {source_index} to index {destination_index}") task_number = task_id.split(':')[1] @@ -462,7 +463,7 @@ class Command(BaseCommand): For getting current count of both the indices ```bash - /manage.py elastic_sync_multiplexed display_doc_counts + ./manage.py elastic_sync_multiplexed display_doc_counts ``` For getting current shard allocation status for the cluster From 3748ba4120ccc2039d57e4f076cfc486eda163d1 Mon Sep 17 00:00:00 2001 From: Amit Phulera Date: Thu, 12 Dec 2024 13:45:07 +0530 Subject: [PATCH 4/6] add back option to purge ids --- corehq/apps/es/client.py | 48 +++++++++++-------- .../commands/elastic_sync_multiplexed.py | 4 +- 2 files changed, 31 insertions(+), 21 deletions(-) diff --git a/corehq/apps/es/client.py b/corehq/apps/es/client.py index 9c2975c6b269..f0618a08c456 100644 --- a/corehq/apps/es/client.py +++ b/corehq/apps/es/client.py @@ -399,7 +399,8 @@ def _validate_single_index(index): def reindex( self, source, dest, wait_for_completion=False, - refresh=False, batch_size=1000, requests_per_second=None, copy_doc_ids=True, query=None, + refresh=False, batch_size=1000, requests_per_second=None, + copy_doc_ids=True, query=None, purge_ids=False ): """ Starts the reindex process in elastic search cluster @@ -416,6 +417,7 @@ def reindex( and can be reduced if you encounter scroll timeouts. :param query: ``dict`` optional parameter to include a term query to filter which documents are included in the reindex + :param purge_ids: ``bool`` if True, will remove the _id field from the documents :returns: None if wait_for_completion is True else would return task_id of reindex task """ @@ -435,27 +437,35 @@ def reindex( "conflicts": "proceed" } - # Should be removed after ES 5-6 migration - if copy_doc_ids and source == const.HQ_USERS_INDEX_NAME: - # Remove password from form index + if copy_doc_ids or purge_ids: reindex_body["script"] = { "lang": "painless", - "source": """ - ctx._source.remove('password'); - if (!ctx._source.containsKey('doc_id')) { - ctx._source['doc_id'] = ctx._id; - } - """ - } - elif copy_doc_ids: - reindex_body["script"] = { - "lang": "painless", - "source": """ - if (!ctx._source.containsKey('doc_id')) { - ctx._source['doc_id'] = ctx._id; - } - """ + "source": "" } + script_parts = [] + + if purge_ids: + script_parts.append(""" + if (ctx._source.containsKey('_id')) { + ctx._source.remove('_id'); + } + """) + + if source == const.HQ_USERS_INDEX_NAME: + # Remove password field from users index + script_parts.append(""" + ctx._source.remove('password'); + """) + + if copy_doc_ids: + # Add doc_id field to the documents + script_parts.append(""" + if (!ctx._source.containsKey('doc_id')) { + ctx._source['doc_id'] = ctx._id; + } + """) + + reindex_body["script"]["source"] = " ".join(script_parts) reindex_kwargs = { "wait_for_completion": wait_for_completion, diff --git a/corehq/apps/es/management/commands/elastic_sync_multiplexed.py b/corehq/apps/es/management/commands/elastic_sync_multiplexed.py index 6f0779c4606c..c62bd781bac8 100644 --- a/corehq/apps/es/management/commands/elastic_sync_multiplexed.py +++ b/corehq/apps/es/management/commands/elastic_sync_multiplexed.py @@ -42,7 +42,7 @@ class ESSyncUtil: def __init__(self): self.es = get_client() - def start_reindex(self, cname, reindex_batch_size=1000, requests_per_second=None): + def start_reindex(self, cname, reindex_batch_size=1000, requests_per_second=None, purge_ids=False): adapter = doc_adapter_from_cname(cname) @@ -58,7 +58,7 @@ def start_reindex(self, cname, reindex_batch_size=1000, requests_per_second=None logger.info("Starting ReIndex process") task_id = es_manager.reindex( source_index, destination_index, - requests_per_second=requests_per_second, batch_size=reindex_batch_size + requests_per_second=requests_per_second, batch_size=reindex_batch_size, purge_ids=purge_ids ) logger.info(f"Copying docs from index {source_index} to index {destination_index}") task_number = task_id.split(':')[1] From 657dd1e8ca6bcb8c32cd58119557b3d542045f8f Mon Sep 17 00:00:00 2001 From: Amit Phulera Date: Thu, 12 Dec 2024 13:53:13 +0530 Subject: [PATCH 5/6] skip sub indices on non saas environments --- corehq/apps/hqadmin/views/data.py | 9 ++++++++- 1 file changed, 8 insertions(+), 1 deletion(-) diff --git a/corehq/apps/hqadmin/views/data.py b/corehq/apps/hqadmin/views/data.py index 7d0ac1a1ae29..73bb34cd46a6 100644 --- a/corehq/apps/hqadmin/views/data.py +++ b/corehq/apps/hqadmin/views/data.py @@ -1,12 +1,13 @@ import json +from django.conf import settings from django.http import Http404, HttpResponse, JsonResponse from django.shortcuts import render from django.utils.translation import gettext as _ from corehq.apps.domain.decorators import require_superuser from corehq.apps.es.es_query import ESQuery -from corehq.apps.es.transient_util import iter_index_cnames +from corehq.apps.es.transient_util import doc_adapter_from_cname, iter_index_cnames from corehq.apps.hqwebapp.doc_lookup import ( get_databases, get_db_from_db_name, @@ -31,6 +32,12 @@ def to_json(doc): found_indices = {} es_doc_type = None for index in iter_index_cnames(): + if not settings.IS_SAAS_ENVIRONMENT: + # If we're not in a SaaS environment, we don't need to check the sub-indices + # because they were not created in non-saas environments. + doc_adapter = doc_adapter_from_cname(index) + if doc_adapter.parent_index_cname: + continue es_doc = lookup_doc_in_es(doc_id, index) if es_doc: found_indices[index] = to_json(es_doc) From 079beeb95f69104dde09cf3b69ee959d4df8a8b3 Mon Sep 17 00:00:00 2001 From: Amit Phulera Date: Thu, 12 Dec 2024 14:19:24 +0530 Subject: [PATCH 6/6] add purge_id option to the command as well --- .../es/management/commands/elastic_sync_multiplexed.py | 7 ++++++- 1 file changed, 6 insertions(+), 1 deletion(-) diff --git a/corehq/apps/es/management/commands/elastic_sync_multiplexed.py b/corehq/apps/es/management/commands/elastic_sync_multiplexed.py index d12493b464f6..2367fb03b5e6 100644 --- a/corehq/apps/es/management/commands/elastic_sync_multiplexed.py +++ b/corehq/apps/es/management/commands/elastic_sync_multiplexed.py @@ -608,7 +608,12 @@ def handle(self, **options): sub_cmd = options['sub_command'] cmd_func = options.get('func') if sub_cmd == 'start': - cmd_func(options['index_cname'], options['batch_size'], options['requests_per_second']) + cmd_func( + options['index_cname'], + options['batch_size'], + options['requests_per_second'], + options['purge_ids'] + ) elif sub_cmd == 'delete': cmd_func(options['index_cname']) elif sub_cmd == 'cleanup' or sub_cmd == 'display_doc_counts':