Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix size estimation for reindex #35463

Open
wants to merge 3 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions corehq/apps/es/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -1118,6 +1118,10 @@ def __init__(self, primary_adapter, secondary_adapter):
def mapping(self):
return self.primary.mapping

@property
def parent_index_cname(self):
return self.primary.parent_index_cname

def export_adapter(self):
adapter = copy.copy(self)
adapter.primary = adapter.primary.export_adapter()
Expand Down
16 changes: 11 additions & 5 deletions corehq/apps/es/management/commands/elastic_sync_multiplexed.py
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,8 @@ def start_reindex(self, cname, reindex_batch_size=1000, requests_per_second=None

logger.info("Starting ReIndex process")
task_id = es_manager.reindex(
source_index, destination_index, requests_per_second=requests_per_second
source_index, destination_index,
requests_per_second=requests_per_second, batch_size=reindex_batch_size
)
logger.info(f"Copying docs from index {source_index} to index {destination_index}")
task_number = task_id.split(':')[1]
Expand Down Expand Up @@ -297,7 +298,7 @@ def _copy_checkpoints(self, pillow, new_checkpoint_id):

def estimate_disk_space_for_reindex(self, stdout=None):
indices_info = es_manager.indices_info()
index_cname_map = self._get_index_name_cname_map()
index_cname_map = self._get_index_name_cname_map(ignore_subindices=True)
index_size_rows = []
total_size = 0
for index_name in index_cname_map.keys():
Expand All @@ -314,8 +315,13 @@ def estimate_disk_space_for_reindex(self, stdout=None):
print("\n\n")
print(f"Minimum free disk space recommended before starting the reindex: {recommended_disk}")

def _get_index_name_cname_map(self):
return {adapter.index_name: cname for cname, adapter in CANONICAL_NAME_ADAPTER_MAP.items()}
def _get_index_name_cname_map(self, ignore_subindices=False):
index_name_cname_map = {}
for cname, adapter in CANONICAL_NAME_ADAPTER_MAP.items():
if ignore_subindices and adapter.parent_index_cname:
continue
index_name_cname_map[adapter.index_name] = cname
return index_name_cname_map

def _format_bytes(self, size):
units = ['B', 'KB', 'MB', 'GB', 'TB']
Expand Down Expand Up @@ -457,7 +463,7 @@ class Command(BaseCommand):

For getting current count of both the indices
```bash
/manage.py elastic_sync_multiplexed display_doc_counts <index_cname>
./manage.py elastic_sync_multiplexed display_doc_counts <index_cname>
```

For getting current shard allocation status for the cluster
Expand Down
Loading