From c65db37b11454349882f7ce25cbbdb6ab4a1b078 Mon Sep 17 00:00:00 2001 From: edunn Date: Wed, 4 Dec 2024 11:29:42 -0800 Subject: [PATCH 01/15] implement as_upsert kwarg in update_as_statements() and write_updated_docs() --- src/pds/registrysweepers/utils/db/__init__.py | 12 ++++++++---- 1 file changed, 8 insertions(+), 4 deletions(-) diff --git a/src/pds/registrysweepers/utils/db/__init__.py b/src/pds/registrysweepers/utils/db/__init__.py index 521dd99..b317613 100644 --- a/src/pds/registrysweepers/utils/db/__init__.py +++ b/src/pds/registrysweepers/utils/db/__init__.py @@ -279,6 +279,7 @@ def write_updated_docs( updates: Iterable[Update], index_name: str, bulk_chunk_max_update_count: Union[int, None] = None, + as_upsert: bool = False ): log.info("Writing document updates...") updated_doc_count = 0 @@ -305,7 +306,7 @@ def write_updated_docs( bulk_updates_buffer = [] bulk_buffer_size_mb = 0.0 - update_statement_strs = update_as_statements(update) + update_statement_strs = update_as_statements(update, as_upsert=as_upsert) for s in update_statement_strs: bulk_buffer_size_mb += sys.getsizeof(s) / 1024**2 @@ -320,13 +321,16 @@ def write_updated_docs( log.info(f"Updated documents for {updated_doc_count} products!") -def update_as_statements(update: Update) -> Iterable[str]: - """Given an Update, convert it to an ElasticSearch-style set of request body content strings""" +def update_as_statements(update: Update, as_upsert: bool = False) -> Iterable[str]: + """ + Given an Update, convert it to an ElasticSearch-style set of request body content strings + Optionally, specify as upsert (index if does not already exist) + """ metadata_statement: Dict[str, Any] = {"update": {"_id": update.id}} if update.has_versioning_information(): metadata_statement["if_primary_term"] = update.primary_term metadata_statement["if_seq_no"] = update.seq_no - content_statement = {"doc": update.content} + content_statement = {"doc": update.content, "doc_as_upsert": as_upsert} update_objs = [metadata_statement, content_statement] updates_strs = [json.dumps(obj) for obj in update_objs] return updates_strs From d0b85a5fc419a75fadd7be5cd411273f896ca4e3 Mon Sep 17 00:00:00 2001 From: edunn Date: Wed, 4 Dec 2024 15:00:37 -0800 Subject: [PATCH 02/15] make sweepers_driver main routine importable --- docker/sweepers_driver.py | 142 ++--------------------------- src/pds/registrysweepers/driver.py | 138 ++++++++++++++++++++++++++++ 2 files changed, 144 insertions(+), 136 deletions(-) create mode 100644 src/pds/registrysweepers/driver.py diff --git a/docker/sweepers_driver.py b/docker/sweepers_driver.py index a4df683..9d05e0e 100755 --- a/docker/sweepers_driver.py +++ b/docker/sweepers_driver.py @@ -1,138 +1,8 @@ #! /usr/bin/env python3 -# -# Copyright © 2023, California Institute of Technology ("Caltech"). -# U.S. Government sponsorship acknowledged. -# -# All rights reserved. -# -# Redistribution and use in source and binary forms, with or without -# modification, are permitted provided that the following conditions are met: -# -# • Redistributions of source code must retain the above copyright notice, -# this list of conditions and the following disclaimer. -# • Redistributions must reproduce the above copyright notice, this list of -# conditions and the following disclaimer in the documentation and/or other -# materials provided with the distribution. 
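For reference, a rough sketch of the two request-body lines that update_as_statements() in PATCH 01 above emits per document when as_upsert=True; the id and doc content below are placeholders, not real registry values, and the versioning fields are only added when the Update carries them:

import json

# Sketch of one bulk update action pair with doc_as_upsert enabled
# (placeholder id/content).
metadata_statement = {"update": {"_id": "urn:nasa:pds:example::1.0"}}
content_statement = {"doc": {"example_field": "example_value"}, "doc_as_upsert": True}
bulk_body = "\n".join(json.dumps(obj) for obj in (metadata_statement, content_statement))
# -> {"update": {"_id": "urn:nasa:pds:example::1.0"}}
#    {"doc": {"example_field": "example_value"}, "doc_as_upsert": true}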
-# • Neither the name of Caltech nor its operating division, the Jet Propulsion -# Laboratory, nor the names of its contributors may be used to endorse or -# promote products derived from this software without specific prior written -# permission. -# -# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" -# AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE -# IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE -# ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE -# LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR -# CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF -# SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS -# INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN -# CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) -# ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE -# POSSIBILITY OF SUCH DAMAGE. -# -# Python driver for provenance (OUTDATED TODO: Update documentation) -# ============================ -# -# This script is provided to support the scheduled execution of PDS Registry -# Provenance, typically in AWS via Event Bridge and ECS/Fargate. -# -# This script makes the following assumptions for its run-time: -# -# - The EN (i.e. primary) OpenSearch endpoint is provided in the environment -# variable PROV_ENDPOINT -# - The username/password is provided as a JSON key/value in the environment -# variable PROV_CREDENTIALS -# - The remotes available through cross cluster search to be processed are -# provided as a JSON list of strings - each string containing the space -# separated list of remotes (as they appear on the provenance command line) -# Each set of remotes is used in an execution of provenance. The value of -# this is specified in the environment variable PROV_REMOTES. If this -# variable is empty or not defined, provenance is run without specifying -# remotes and only the PROV_ENDPOINT is processed. -# - The directory containing the provenance.py file is in PATH and is -# executable. 
-# -# +from pds.registrysweepers.driver import run as run_sweepers -import functools -import inspect -import argparse -import json -import logging -import os -from datetime import datetime -from typing import Callable - -from pds.registrysweepers import provenance, ancestry, repairkit, legacy_registry_sync -from pds.registrysweepers.reindexer import main as reindexer -from pds.registrysweepers.utils import configure_logging, parse_log_level -from pds.registrysweepers.utils.db.client import get_opensearch_client_from_environment -from pds.registrysweepers.utils.misc import get_human_readable_elapsed_since -from pds.registrysweepers.utils.misc import is_dev_mode - -configure_logging(filepath=None, log_level=logging.INFO) -log = logging.getLogger(__name__) - -dev_mode = is_dev_mode() -if dev_mode: - log.warning('Operating in development mode - host verification disabled') - import urllib3 - - urllib3.disable_warnings() - -log_level = parse_log_level(os.environ.get('LOGLEVEL', 'INFO')) - - -def run_factory(sweeper_f: Callable) -> Callable: - return functools.partial( - sweeper_f, - client=get_opensearch_client_from_environment(verify_certs=True if not dev_mode else False), - # enable for development if required - not necessary in production - # log_filepath='registry-sweepers.log', - log_level=log_level - ) - - -parser = argparse.ArgumentParser( - prog='registry-sweepers', - description='sweeps the PDS registry with different routines meant to run regularly on the database' -) - -# define optional sweepers -parser.add_argument('--legacy-sync', action='store_true') -optional_sweepers = { - 'legacy_sync': legacy_registry_sync.run -} - -args = parser.parse_args() - -# Define default sweepers to be run here, in order of execution -sweepers = [ - repairkit.run, - provenance.run, - ancestry.run, - reindexer.run -] - -for option, sweeper in optional_sweepers.items(): - if getattr(args, option): - sweepers.append(sweeper) - -sweeper_descriptions = [inspect.getmodule(f).__name__ for f in sweepers] -log.info(f'Running sweepers: {sweeper_descriptions}') - -total_execution_begin = datetime.now() - -sweeper_execution_duration_strs = [] - -for sweeper in sweepers: - sweeper_execution_begin = datetime.now() - run_sweeper_f = run_factory(sweeper) - - run_sweeper_f() - - sweeper_name = inspect.getmodule(sweeper).__name__ - sweeper_execution_duration_strs.append(f'{sweeper_name}: {get_human_readable_elapsed_since(sweeper_execution_begin)}') - -log.info(f'Sweepers successfully executed in {get_human_readable_elapsed_since(total_execution_begin)}\n ' - + '\n '.join(sweeper_execution_duration_strs)) +# To reduce potential for error when deployed, this file has been left after extraction of contents to +# registrysweepers.driver.run() +# TODO: remove this dependency and have docker run driver.py directly +if __name__ == '__main__': + run_sweepers() diff --git a/src/pds/registrysweepers/driver.py b/src/pds/registrysweepers/driver.py new file mode 100644 index 0000000..d4f86ff --- /dev/null +++ b/src/pds/registrysweepers/driver.py @@ -0,0 +1,138 @@ +# +# Copyright © 2023, California Institute of Technology ("Caltech"). +# U.S. Government sponsorship acknowledged. +# +# All rights reserved. +# +# Redistribution and use in source and binary forms, with or without +# modification, are permitted provided that the following conditions are met: +# +# • Redistributions of source code must retain the above copyright notice, +# this list of conditions and the following disclaimer. 
+# • Redistributions must reproduce the above copyright notice, this list of +# conditions and the following disclaimer in the documentation and/or other +# materials provided with the distribution. +# • Neither the name of Caltech nor its operating division, the Jet Propulsion +# Laboratory, nor the names of its contributors may be used to endorse or +# promote products derived from this software without specific prior written +# permission. +# +# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" +# AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE +# IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE +# ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE +# LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR +# CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF +# SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS +# INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN +# CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) +# ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE +# POSSIBILITY OF SUCH DAMAGE. +# +# Python driver for provenance (OUTDATED TODO: Update documentation) +# ============================ +# +# This script is provided to support the scheduled execution of PDS Registry +# Provenance, typically in AWS via Event Bridge and ECS/Fargate. +# +# This script makes the following assumptions for its run-time: +# +# - The EN (i.e. primary) OpenSearch endpoint is provided in the environment +# variable PROV_ENDPOINT +# - The username/password is provided as a JSON key/value in the environment +# variable PROV_CREDENTIALS +# - The remotes available through cross cluster search to be processed are +# provided as a JSON list of strings - each string containing the space +# separated list of remotes (as they appear on the provenance command line) +# Each set of remotes is used in an execution of provenance. The value of +# this is specified in the environment variable PROV_REMOTES. If this +# variable is empty or not defined, provenance is run without specifying +# remotes and only the PROV_ENDPOINT is processed. +# - The directory containing the provenance.py file is in PATH and is +# executable. 
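A minimal sketch of invoking the extracted driver programmatically, assuming the PROV_ENDPOINT/PROV_CREDENTIALS variables described in the header above are still what get_opensearch_client_from_environment() reads; the endpoint and credential values shown are placeholders:

import json
import os

# Placeholder connection settings for a local development cluster.
os.environ["PROV_ENDPOINT"] = "https://localhost:9200"
os.environ["PROV_CREDENTIALS"] = json.dumps({"admin": "admin"})
os.environ["LOGLEVEL"] = "INFO"

from pds.registrysweepers.driver import run as run_sweepers

run_sweepers()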
+# +# + +import functools +import inspect +import argparse +import json +import logging +import os +from datetime import datetime +from typing import Callable + +from pds.registrysweepers import provenance, ancestry, repairkit, legacy_registry_sync +from pds.registrysweepers.reindexer import main as reindexer +from pds.registrysweepers.utils import configure_logging, parse_log_level +from pds.registrysweepers.utils.db.client import get_opensearch_client_from_environment +from pds.registrysweepers.utils.misc import get_human_readable_elapsed_since + + +def run(): + configure_logging(filepath=None, log_level=logging.INFO) + log = logging.getLogger(__name__) + + dev_mode = str(os.environ.get("DEV_MODE")).lower() not in {'none', '', '0', 'false'} + if dev_mode: + log.warning('Operating in development mode - host verification disabled') + import urllib3 + + urllib3.disable_warnings() + + log_level = parse_log_level(os.environ.get('LOGLEVEL', 'INFO')) + + + def run_factory(sweeper_f: Callable) -> Callable: + return functools.partial( + sweeper_f, + client=get_opensearch_client_from_environment(verify_certs=True if not dev_mode else False), + # enable for development if required - not necessary in production + # log_filepath='registry-sweepers.log', + log_level=log_level + ) + + + parser = argparse.ArgumentParser( + prog='registry-sweepers', + description='sweeps the PDS registry with different routines meant to run regularly on the database' + ) + + # define optional sweepers + parser.add_argument('--legacy-sync', action='store_true') + optional_sweepers = { + 'legacy_sync': legacy_registry_sync.run + } + + args = parser.parse_args() + + # Define default sweepers to be run here, in order of execution + sweepers = [ + repairkit.run, + provenance.run, + ancestry.run, + reindexer.run + ] + + for option, sweeper in optional_sweepers.items(): + if getattr(args, option): + sweepers.append(sweeper) + + sweeper_descriptions = [inspect.getmodule(f).__name__ for f in sweepers] + log.info(f'Running sweepers: {sweeper_descriptions}') + + total_execution_begin = datetime.now() + + sweeper_execution_duration_strs = [] + + for sweeper in sweepers: + sweeper_execution_begin = datetime.now() + run_sweeper_f = run_factory(sweeper) + + run_sweeper_f() + + sweeper_name = inspect.getmodule(sweeper).__name__ + sweeper_execution_duration_strs.append(f'{sweeper_name}: {get_human_readable_elapsed_since(sweeper_execution_begin)}') + + log.info(f'Sweepers successfully executed in {get_human_readable_elapsed_since(total_execution_begin)}\n ' + + '\n '.join(sweeper_execution_duration_strs)) From 40ce41b334611e92df77d854f85991138aae389d Mon Sep 17 00:00:00 2001 From: edunn Date: Wed, 4 Dec 2024 15:22:53 -0800 Subject: [PATCH 03/15] [incomplete] implement reindex.py scratch script --- scratch/manual-aoss-reindexer/reindex.py | 77 ++++++++++++++++++++++++ 1 file changed, 77 insertions(+) create mode 100644 scratch/manual-aoss-reindexer/reindex.py diff --git a/scratch/manual-aoss-reindexer/reindex.py b/scratch/manual-aoss-reindexer/reindex.py new file mode 100644 index 0000000..36d95c5 --- /dev/null +++ b/scratch/manual-aoss-reindexer/reindex.py @@ -0,0 +1,77 @@ +import logging +import os + +from pds.registrysweepers.utils.db import query_registry_db_with_search_after, write_updated_docs +from pds.registrysweepers.utils.db.client import get_opensearch_client_from_environment +from pds.registrysweepers.utils.db.indexing import ensure_index_mapping +from pds.registrysweepers.utils.db.update import Update + +from 
pds.registrysweepers.driver import run as run_sweepers + +if __name__ == '__main__': + + src_node_name = 'geo' + dest_pseudonode_name = f'edunn-{src_node_name}' # TODO: change this from 'edunn' to 'temp' + + # set node id env var to facilitate sweepers + os.environ["MULTITENANCY_NODE_ID"] = dest_pseudonode_name + + # change these to use a resolution function later - need to decouple resolve_multitenant_index_name() from env var + src_index_name = f'{src_node_name}-registry' + dest_index_name = f'{dest_pseudonode_name}-registry' + + with get_opensearch_client_from_environment() as client: + + # ensure that all necessary preconditions for safe/successful reindexing are met + try: + # ensure that the destination is a temporary index, to prevent inadvertently writing to a real index + allowed_prefixes = {'temp', 'edunn'} + if not any([dest_index_name.startswith(prefix) for prefix in allowed_prefixes]): + raise ValueError( + f'Destination index name {dest_index_name} is not prefixed with one of {allowed_prefixes} and may not be the intended value - aborting') + + # ensure correct destination index configuration + try: + dynamic_mapping_disabled = client.indices.get(dest_index_name)[dest_index_name]['mappings'][ + 'dynamic'] == 'false' + if not dynamic_mapping_disabled: + raise ValueError + + except (KeyError, ValueError): + raise RuntimeError( + f'Index "{dest_index_name}" is not correctly configured - "dynamic" mapping setting is not set to "false - aborting"') + + # other conditions may be populated later + + except Exception as err: + logging.error(err) + exit(1) + + try: + # ensure that sort field is in mapping to facilitate execution of reindexing sweeper + necessary_mappings = { + 'lidvid': 'keyword' + } + for property_name, property_mapping_type in necessary_mappings.items(): + ensure_index_mapping(client, dest_index_name, property_name, property_mapping_type) + + sort_keys = sorted(necessary_mappings.keys()) + + docs = query_registry_db_with_search_after(client, src_index_name, {"query": {"match_all": {}}}, {}, + sort_fields=sort_keys, request_timeout_seconds=20) + updates = map(lambda doc: Update(id=doc['_id'], content=doc['_source']), docs) + write_updated_docs(client, updates, dest_index_name, as_upsert=True) + + # TODO: Implement non-redundant pickup after partial completion, if possible + except Exception as err: + print(f'Reindex from {src_index_name} to {dest_index_name} failed: {err}') + exit(1) + + # TODO: Implement consistency check to ensure that all data was successfully migrated + + # Run sweepers on the migrated data + try: + run_sweepers() + except Exception as err: + logging.error(f'Post-reindex sweeper execution failed with {err}') + exit(1) \ No newline at end of file From 6e824ca2f244a4af7e349b19544ca5832ebe611e Mon Sep 17 00:00:00 2001 From: edunn Date: Mon, 9 Dec 2024 11:08:46 -0800 Subject: [PATCH 04/15] refactor reindex.py --- scratch/manual-aoss-reindexer/reindex.py | 50 ++++++++++++++---------- 1 file changed, 30 insertions(+), 20 deletions(-) diff --git a/scratch/manual-aoss-reindexer/reindex.py b/scratch/manual-aoss-reindexer/reindex.py index 36d95c5..a54c55f 100644 --- a/scratch/manual-aoss-reindexer/reindex.py +++ b/scratch/manual-aoss-reindexer/reindex.py @@ -8,21 +8,10 @@ from pds.registrysweepers.driver import run as run_sweepers -if __name__ == '__main__': - - src_node_name = 'geo' - dest_pseudonode_name = f'edunn-{src_node_name}' # TODO: change this from 'edunn' to 'temp' - - # set node id env var to facilitate sweepers - 
os.environ["MULTITENANCY_NODE_ID"] = dest_pseudonode_name - - # change these to use a resolution function later - need to decouple resolve_multitenant_index_name() from env var - src_index_name = f'{src_node_name}-registry' - dest_index_name = f'{dest_pseudonode_name}-registry' +def ensure_valid_state(dest_index_name: str): + """Ensure that all necessary preconditions for safe/successful reindexing are met""" with get_opensearch_client_from_environment() as client: - - # ensure that all necessary preconditions for safe/successful reindexing are met try: # ensure that the destination is a temporary index, to prevent inadvertently writing to a real index allowed_prefixes = {'temp', 'edunn'} @@ -47,6 +36,10 @@ logging.error(err) exit(1) + +def migrate_bulk_data(src_index_name: str, dest_index_name: str): + """Stream documents from source index to destination index, which may """ + with get_opensearch_client_from_environment() as client: try: # ensure that sort field is in mapping to facilitate execution of reindexing sweeper necessary_mappings = { @@ -67,11 +60,28 @@ print(f'Reindex from {src_index_name} to {dest_index_name} failed: {err}') exit(1) - # TODO: Implement consistency check to ensure that all data was successfully migrated - # Run sweepers on the migrated data - try: - run_sweepers() - except Exception as err: - logging.error(f'Post-reindex sweeper execution failed with {err}') - exit(1) \ No newline at end of file +def run_sweepers(): + """Run sweepers on the migrated data""" + try: + run_sweepers() + except Exception as err: + logging.error(f'Post-reindex sweeper execution failed with {err}') + exit(1) + + +if __name__ == '__main__': + src_node_name = 'geo' + dest_pseudonode_name = f'edunn-{src_node_name}' # TODO: change this from 'edunn' to 'temp' + + # set node id env var to facilitate sweepers + os.environ["MULTITENANCY_NODE_ID"] = dest_pseudonode_name + + # change these to use a resolution function later - need to decouple resolve_multitenant_index_name() from env var + src_index_name = f'{src_node_name}-registry' + dest_index_name = f'{dest_pseudonode_name}-registry' + + ensure_valid_state(dest_index_name) + migrate_bulk_data(src_index_name, dest_index_name) + # TODO: Implement consistency check to ensure that all data was successfully migrated + run_sweepers() From 1b3e3b6f0be18cd46d829aa38d268a4304bf23d1 Mon Sep 17 00:00:00 2001 From: edunn Date: Mon, 9 Dec 2024 17:01:56 -0800 Subject: [PATCH 05/15] [bugged] implement consistency check this currently yields strange results which may indicate that OS/python are inconsistent in their lexical sorting --- scratch/manual-aoss-reindexer/reindex.py | 97 +++++++++++++++++++++++- 1 file changed, 94 insertions(+), 3 deletions(-) diff --git a/scratch/manual-aoss-reindexer/reindex.py b/scratch/manual-aoss-reindexer/reindex.py index a54c55f..fba5f0b 100644 --- a/scratch/manual-aoss-reindexer/reindex.py +++ b/scratch/manual-aoss-reindexer/reindex.py @@ -1,7 +1,9 @@ import logging import os +from typing import Iterator -from pds.registrysweepers.utils.db import query_registry_db_with_search_after, write_updated_docs +from pds.registrysweepers.utils import configure_logging +from pds.registrysweepers.utils.db import query_registry_db_with_search_after, write_updated_docs, get_query_hits_count from pds.registrysweepers.utils.db.client import get_opensearch_client_from_environment from pds.registrysweepers.utils.db.indexing import ensure_index_mapping from pds.registrysweepers.utils.db.update import Update @@ -39,6 +41,10 @@ def 
ensure_valid_state(dest_index_name: str): def migrate_bulk_data(src_index_name: str, dest_index_name: str): """Stream documents from source index to destination index, which may """ + if get_outstanding_document_count(src_index_name, dest_index_name, as_proportion=True) < 0.1: + logging.warning(f'Less than 10% of documents outstanding - skipping bulk streaming migration stage') + return + with get_opensearch_client_from_environment() as client: try: # ensure that sort field is in mapping to facilitate execution of reindexing sweeper @@ -61,6 +67,89 @@ def migrate_bulk_data(src_index_name: str, dest_index_name: str): exit(1) +def ensure_doc_consistency(src_index_name: str, dest_index_name: str): + """ + Ensure that all documents present in the source index are also present in the destination index. + Discovers and fixes any quiet failures encountered during bulk document streaming. + Yes, this could be accomplished within the bulk streaming, but implementation is simpler this way and there is less + opportunity for error. + """ + + logging.info(f'Ensuring document consistency - {get_outstanding_document_count(src_index_name, dest_index_name)} documents remain to copy from {src_index_name} to {dest_index_name}') + + with get_opensearch_client_from_environment() as client: + for doc_id in enumerate_outstanding_doc_ids(src_index_name, dest_index_name): + try: + src_doc = client.get(src_index_name, doc_id) + client.create(dest_index_name, doc_id, src_doc['_source']) + logging.info(f'Created missing doc with id {doc_id}') + except Exception as err: + logging.error(f'Failed to create doc with id "{doc_id}": {err}') + + if not get_outstanding_document_count(src_index_name, dest_index_name) == 0: + raise RuntimeError( + f'Failed to ensure consistency - there is remaining disparity in document count between indices "{src_index_name}" and "{dest_index_name}"') + + +def enumerate_outstanding_doc_ids(src_index_name: str, dest_index_name: str) -> Iterator[str]: + with get_opensearch_client_from_environment() as client: + + pseudoid_field = "lidvid" + + src_docs = iter(query_registry_db_with_search_after(client, src_index_name, {"query": {"match_all": {}}}, + {"includes": [pseudoid_field]}, + sort_fields=[pseudoid_field], request_timeout_seconds=20)) + dest_docs = iter(query_registry_db_with_search_after(client, dest_index_name, {"query": {"match_all": {}}}, + {"includes": [pseudoid_field]}, + sort_fields=[pseudoid_field], request_timeout_seconds=20)) + + # yield any documents which are present in source but not in destination + try: + src_doc = next(src_docs) + dest_doc = next(dest_docs) + + while True: + src_doc_pseudoid = src_doc["_source"][pseudoid_field] + src_doc_id = src_doc["_id"] + dest_doc_pseudoid = dest_doc["_source"][pseudoid_field] + dest_doc_id = dest_doc["_id"] + + + if src_doc_pseudoid < dest_doc_pseudoid: # if id present in src but not dest + yield src_doc_id + src_doc = next(src_docs) + elif dest_doc_pseudoid < src_doc_pseudoid: # if id present in dest but not src + logging.warning( + f'Document with id "{dest_doc_pseudoid}" is present in destination index {dest_index_name} file but not in source index {src_index_name}') + dest_doc = next(dest_docs) + else: # if id is present in both files + src_doc = next(src_docs) + dest_doc = next(dest_docs) + except StopIteration: + pass + + # yield any remaining documents in source iterable + try: + src_doc = next(src_docs) + while True: + src_doc_pseudoid = src_doc["_source"][pseudoid_field] + + yield src_doc_pseudoid + src_doc = next(src_docs) + 
except StopIteration: + pass + + +def get_outstanding_document_count(src_index_name: str, dest_index_name: str, as_proportion: bool = False) -> int: + """return count(src) - count(dest)""" + with get_opensearch_client_from_environment() as client: + src_docs_count = get_query_hits_count(client, src_index_name, {"query": {"match_all": {}}}) + dest_docs_count = get_query_hits_count(client, dest_index_name, {"query": {"match_all": {}}}) + + outstanding_docs_count = src_docs_count - dest_docs_count + return (outstanding_docs_count / src_docs_count) if as_proportion else outstanding_docs_count + + def run_sweepers(): """Run sweepers on the migrated data""" try: @@ -71,6 +160,8 @@ def run_sweepers(): if __name__ == '__main__': + configure_logging(filepath=None, log_level=logging.INFO) + src_node_name = 'geo' dest_pseudonode_name = f'edunn-{src_node_name}' # TODO: change this from 'edunn' to 'temp' @@ -83,5 +174,5 @@ def run_sweepers(): ensure_valid_state(dest_index_name) migrate_bulk_data(src_index_name, dest_index_name) - # TODO: Implement consistency check to ensure that all data was successfully migrated - run_sweepers() + ensure_doc_consistency(src_index_name, dest_index_name) + # run_sweepers() From 7a3aeb7ce86d52be8b07f4023922221b2b2e1166 Mon Sep 17 00:00:00 2001 From: edunn Date: Mon, 9 Dec 2024 20:39:32 -0800 Subject: [PATCH 06/15] [bugged] switch consistency check to non-streaming comparison of src/dest This avoids issues where ES/python have inconsistent lexical sort approaches --- scratch/manual-aoss-reindexer/reindex.py | 62 ++++++++---------------- 1 file changed, 20 insertions(+), 42 deletions(-) diff --git a/scratch/manual-aoss-reindexer/reindex.py b/scratch/manual-aoss-reindexer/reindex.py index fba5f0b..835730f 100644 --- a/scratch/manual-aoss-reindexer/reindex.py +++ b/scratch/manual-aoss-reindexer/reindex.py @@ -75,7 +75,8 @@ def ensure_doc_consistency(src_index_name: str, dest_index_name: str): opportunity for error. 
""" - logging.info(f'Ensuring document consistency - {get_outstanding_document_count(src_index_name, dest_index_name)} documents remain to copy from {src_index_name} to {dest_index_name}') + logging.info( + f'Ensuring document consistency - {get_outstanding_document_count(src_index_name, dest_index_name)} documents remain to copy from {src_index_name} to {dest_index_name}') with get_opensearch_client_from_environment() as client: for doc_id in enumerate_outstanding_doc_ids(src_index_name, dest_index_name): @@ -93,51 +94,28 @@ def ensure_doc_consistency(src_index_name: str, dest_index_name: str): def enumerate_outstanding_doc_ids(src_index_name: str, dest_index_name: str) -> Iterator[str]: with get_opensearch_client_from_environment() as client: - + pseudoid_field = "lidvid" src_docs = iter(query_registry_db_with_search_after(client, src_index_name, {"query": {"match_all": {}}}, - {"includes": [pseudoid_field]}, - sort_fields=[pseudoid_field], request_timeout_seconds=20)) + {"includes": [pseudoid_field]}, + sort_fields=[pseudoid_field], request_timeout_seconds=20)) dest_docs = iter(query_registry_db_with_search_after(client, dest_index_name, {"query": {"match_all": {}}}, - {"includes": [pseudoid_field]}, - sort_fields=[pseudoid_field], request_timeout_seconds=20)) + {"includes": [pseudoid_field]}, + sort_fields=[pseudoid_field], request_timeout_seconds=20)) # yield any documents which are present in source but not in destination - try: - src_doc = next(src_docs) - dest_doc = next(dest_docs) - - while True: - src_doc_pseudoid = src_doc["_source"][pseudoid_field] - src_doc_id = src_doc["_id"] - dest_doc_pseudoid = dest_doc["_source"][pseudoid_field] - dest_doc_id = dest_doc["_id"] - - - if src_doc_pseudoid < dest_doc_pseudoid: # if id present in src but not dest - yield src_doc_id - src_doc = next(src_docs) - elif dest_doc_pseudoid < src_doc_pseudoid: # if id present in dest but not src - logging.warning( - f'Document with id "{dest_doc_pseudoid}" is present in destination index {dest_index_name} file but not in source index {src_index_name}') - dest_doc = next(dest_docs) - else: # if id is present in both files - src_doc = next(src_docs) - dest_doc = next(dest_docs) - except StopIteration: - pass - - # yield any remaining documents in source iterable - try: - src_doc = next(src_docs) - while True: - src_doc_pseudoid = src_doc["_source"][pseudoid_field] + src_ids = {doc["_id"] for doc in src_docs} + dest_ids = {doc["_id"] for doc in dest_docs} + + ids_missing_from_src = dest_ids.difference(src_ids) + if len(ids_missing_from_src) > 0: + logging.error( + f'{len(ids_missing_from_src)} ids are present in {dest_index_name} but not in {src_index_name} - this indicates a potential error: {sorted(ids_missing_from_src)}') + exit(1) - yield src_doc_pseudoid - src_doc = next(src_docs) - except StopIteration: - pass + ids_missing_from_dest = src_ids.difference(dest_ids) + return iter(ids_missing_from_dest) def get_outstanding_document_count(src_index_name: str, dest_index_name: str, as_proportion: bool = False) -> int: @@ -150,13 +128,13 @@ def get_outstanding_document_count(src_index_name: str, dest_index_name: str, as return (outstanding_docs_count / src_docs_count) if as_proportion else outstanding_docs_count -def run_sweepers(): +def run_registry_sweepers(): """Run sweepers on the migrated data""" try: run_sweepers() except Exception as err: logging.error(f'Post-reindex sweeper execution failed with {err}') - exit(1) + raise err if __name__ == '__main__': @@ -175,4 +153,4 @@ def run_sweepers(): 
ensure_valid_state(dest_index_name) migrate_bulk_data(src_index_name, dest_index_name) ensure_doc_consistency(src_index_name, dest_index_name) - # run_sweepers() + run_registry_sweepers() From 946429e4e2eaef94735c4051b8bde35007c16a59 Mon Sep 17 00:00:00 2001 From: edunn Date: Tue, 10 Dec 2024 15:26:33 -0800 Subject: [PATCH 07/15] switch to atm arguments TODO: implement argparse CLI --- scratch/manual-aoss-reindexer/reindex.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/scratch/manual-aoss-reindexer/reindex.py b/scratch/manual-aoss-reindexer/reindex.py index 835730f..5e3f44b 100644 --- a/scratch/manual-aoss-reindexer/reindex.py +++ b/scratch/manual-aoss-reindexer/reindex.py @@ -140,8 +140,8 @@ def run_registry_sweepers(): if __name__ == '__main__': configure_logging(filepath=None, log_level=logging.INFO) - src_node_name = 'geo' - dest_pseudonode_name = f'edunn-{src_node_name}' # TODO: change this from 'edunn' to 'temp' + src_node_name = 'atm' + dest_pseudonode_name = f'temp-{src_node_name}' # set node id env var to facilitate sweepers os.environ["MULTITENANCY_NODE_ID"] = dest_pseudonode_name From 44df072619d73270016101ff33e5432b3c46bed5 Mon Sep 17 00:00:00 2001 From: edunn Date: Thu, 12 Dec 2024 14:20:02 -0800 Subject: [PATCH 08/15] various minor updates --- scratch/manual-aoss-reindexer/reindex.py | 40 +++++++++++++++++------- 1 file changed, 29 insertions(+), 11 deletions(-) diff --git a/scratch/manual-aoss-reindexer/reindex.py b/scratch/manual-aoss-reindexer/reindex.py index 5e3f44b..0894243 100644 --- a/scratch/manual-aoss-reindexer/reindex.py +++ b/scratch/manual-aoss-reindexer/reindex.py @@ -1,5 +1,7 @@ import logging import os +import time +from datetime import timedelta, datetime from typing import Iterator from pds.registrysweepers.utils import configure_logging @@ -9,6 +11,12 @@ from pds.registrysweepers.utils.db.update import Update from pds.registrysweepers.driver import run as run_sweepers +from pds.registrysweepers.utils.misc import get_human_readable_elapsed_since + +# Baseline mappings which are required to facilitate successful execution before any data is copied +necessary_mappings = { + 'lidvid': 'keyword' +} def ensure_valid_state(dest_index_name: str): @@ -32,6 +40,10 @@ def ensure_valid_state(dest_index_name: str): raise RuntimeError( f'Index "{dest_index_name}" is not correctly configured - "dynamic" mapping setting is not set to "false - aborting"') + # ensure necessary mappings to facilitate execution of reindexing sweeper and other steps + for property_name, property_mapping_type in necessary_mappings.items(): + ensure_index_mapping(client, dest_index_name, property_name, property_mapping_type) + # other conditions may be populated later except Exception as err: @@ -47,13 +59,6 @@ def migrate_bulk_data(src_index_name: str, dest_index_name: str): with get_opensearch_client_from_environment() as client: try: - # ensure that sort field is in mapping to facilitate execution of reindexing sweeper - necessary_mappings = { - 'lidvid': 'keyword' - } - for property_name, property_mapping_type in necessary_mappings.items(): - ensure_index_mapping(client, dest_index_name, property_name, property_mapping_type) - sort_keys = sorted(necessary_mappings.keys()) docs = query_registry_db_with_search_after(client, src_index_name, {"query": {"match_all": {}}}, {}, @@ -79,6 +84,8 @@ def ensure_doc_consistency(src_index_name: str, dest_index_name: str): f'Ensuring document consistency - {get_outstanding_document_count(src_index_name, dest_index_name)} documents 
remain to copy from {src_index_name} to {dest_index_name}') with get_opensearch_client_from_environment() as client: + # TODO instead of iteratively pulling/creating, use multi-search (maybe) and bulk write (definitely) to avoid + # the request overhead for doc_id in enumerate_outstanding_doc_ids(src_index_name, dest_index_name): try: src_doc = client.get(src_index_name, doc_id) @@ -87,14 +94,11 @@ def ensure_doc_consistency(src_index_name: str, dest_index_name: str): except Exception as err: logging.error(f'Failed to create doc with id "{doc_id}": {err}') - if not get_outstanding_document_count(src_index_name, dest_index_name) == 0: - raise RuntimeError( - f'Failed to ensure consistency - there is remaining disparity in document count between indices "{src_index_name}" and "{dest_index_name}"') + wait_until_indexed() def enumerate_outstanding_doc_ids(src_index_name: str, dest_index_name: str) -> Iterator[str]: with get_opensearch_client_from_environment() as client: - pseudoid_field = "lidvid" src_docs = iter(query_registry_db_with_search_after(client, src_index_name, {"query": {"match_all": {}}}, @@ -123,11 +127,25 @@ def get_outstanding_document_count(src_index_name: str, dest_index_name: str, as with get_opensearch_client_from_environment() as client: src_docs_count = get_query_hits_count(client, src_index_name, {"query": {"match_all": {}}}) dest_docs_count = get_query_hits_count(client, dest_index_name, {"query": {"match_all": {}}}) + logging.info(f'index {src_index_name} contains {src_docs_count} docs') + logging.info(f'index {dest_index_name} contains {dest_docs_count} docs') outstanding_docs_count = src_docs_count - dest_docs_count return (outstanding_docs_count / src_docs_count) if as_proportion else outstanding_docs_count +def wait_until_indexed(timeout: timedelta = timedelta(minutes=30)): + start = datetime.now() + expiry = start + timeout + + while get_outstanding_document_count(src_index_name, dest_index_name) > 0: + if datetime.now() > expiry: + raise RuntimeError( + f'Failed to ensure consistency - there is remaining disparity in document count between indices "{src_index_name}" and "{dest_index_name}" after {get_human_readable_elapsed_since(start)}') + logging.info('Waiting for indexation to complete...') + time.sleep(15) + + def run_registry_sweepers(): """Run sweepers on the migrated data""" try: From efa7ebba7f59aa6c534b8b119b0716be909acc96 Mon Sep 17 00:00:00 2001 From: edunn Date: Thu, 12 Dec 2024 16:43:10 -0800 Subject: [PATCH 09/15] add harvest_date_time to necessary mappings without this in place prior to migration, the reindexer sweeper will fail --- scratch/manual-aoss-reindexer/reindex.py | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/scratch/manual-aoss-reindexer/reindex.py b/scratch/manual-aoss-reindexer/reindex.py index 0894243..8845518 100644 --- a/scratch/manual-aoss-reindexer/reindex.py +++ b/scratch/manual-aoss-reindexer/reindex.py @@ -15,7 +15,8 @@ # Baseline mappings which are required to facilitate successful execution before any data is copied necessary_mappings = { - 'lidvid': 'keyword' + 'lidvid': 'keyword', + 'ops:Harvest_Info/ops:harvest_date_time': 'date' } From 4aa9bb43cdc45298df18fb692b6d210bdd1bda33 Mon Sep 17 00:00:00 2001 From: edunn Date: Wed, 29 Jan 2025 10:07:32 -0800 Subject: [PATCH 10/15] reduce page size to prevent too-large response in some cases of large documents this is a bandaid, but should be replaced by detection/handling of "RequestError(400, 'illegal_argument_exception', 'ReleasableBytesStreamOutput cannot 
hold more than 2GB of data'" such that --- src/pds/registrysweepers/utils/db/__init__.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/pds/registrysweepers/utils/db/__init__.py b/src/pds/registrysweepers/utils/db/__init__.py index b317613..2748361 100644 --- a/src/pds/registrysweepers/utils/db/__init__.py +++ b/src/pds/registrysweepers/utils/db/__init__.py @@ -123,7 +123,7 @@ def query_registry_db_with_search_after( index_name: str, query: Dict, _source: Dict, - page_size: int = 10000, + page_size: int = 5000, limit: Union[int, None] = None, sort_fields: Union[List[str], None] = None, request_timeout_seconds: int = 20, From 7f55fa8ceb695c6687767b6947d040bae6db472a Mon Sep 17 00:00:00 2001 From: edunn Date: Wed, 29 Jan 2025 11:11:20 -0800 Subject: [PATCH 11/15] implement index alias resolution utility functions --- .../registrysweepers/utils/db/multitenancy.py | 20 +++++++++++++++++++ 1 file changed, 20 insertions(+) diff --git a/src/pds/registrysweepers/utils/db/multitenancy.py b/src/pds/registrysweepers/utils/db/multitenancy.py index 90bd454..c13a678 100644 --- a/src/pds/registrysweepers/utils/db/multitenancy.py +++ b/src/pds/registrysweepers/utils/db/multitenancy.py @@ -1,5 +1,8 @@ +import logging import os +from opensearchpy import OpenSearch + def resolve_multitenant_index_name(index_type: str): supported_index_types = {"registry", "registry-refs", "registry-dd"} @@ -11,3 +14,20 @@ def resolve_multitenant_index_name(index_type: str): raise ValueError(f'index_type "{index_type}" not supported (expected one of {supported_index_types})') else: return f"{node_id}-{index_type}" + + +def index_exists(client: OpenSearch, index_or_alias_name: str) -> bool: + # counterintuitively, indices.exists does not return False if its argument is an alias. 
+ # Possibly a bug in opensearch-py: https://github.com/opensearch-project/opensearch-py/issues/888 + return client.indices.exists(index_or_alias_name) and not client.indices.exists_alias(index_or_alias_name) + + +def resolve_index_name_if_aliased(client: OpenSearch, index_or_alias_name: str) -> str: + if index_exists(client, index_or_alias_name): + return index_or_alias_name + elif client.indices.exists_alias(index_or_alias_name): + index_name = next(iter(client.indices.get_alias('index_or_alias_name').keys())) + logging.debug(f'Resolved alias {index_or_alias_name} to index {index_name}') + return index_name + else: + raise ValueError(f'Could not resolve index for index_or_alias_name "{index_or_alias_name}"') From 128fa9de09b2da0cdddd5926baf5d6ccc66c25a6 Mon Sep 17 00:00:00 2001 From: edunn Date: Wed, 29 Jan 2025 14:19:11 -0800 Subject: [PATCH 12/15] incorporate resolve_index_name_if_aliased() into multitenant index name resolution and all calls --- src/pds/registrysweepers/ancestry/__init__.py | 10 +++++----- src/pds/registrysweepers/ancestry/generation.py | 4 ++-- src/pds/registrysweepers/ancestry/queries.py | 12 ++++++------ src/pds/registrysweepers/provenance/__init__.py | 4 ++-- src/pds/registrysweepers/reindexer/main.py | 4 ++-- src/pds/registrysweepers/repairkit/__init__.py | 6 +++--- src/pds/registrysweepers/utils/db/multitenancy.py | 8 ++++---- 7 files changed, 24 insertions(+), 24 deletions(-) diff --git a/src/pds/registrysweepers/ancestry/__init__.py b/src/pds/registrysweepers/ancestry/__init__.py index 7d85c0c..75910b5 100644 --- a/src/pds/registrysweepers/ancestry/__init__.py +++ b/src/pds/registrysweepers/ancestry/__init__.py @@ -71,18 +71,18 @@ def run( METADATA_PARENT_COLLECTION_KEY, SWEEPERS_ANCESTRY_VERSION_METADATA_KEY, ]: - ensure_index_mapping(client, resolve_multitenant_index_name("registry"), metadata_key, "keyword") + ensure_index_mapping(client, resolve_multitenant_index_name(client, "registry"), metadata_key, "keyword") for metadata_key in [ SWEEPERS_ANCESTRY_VERSION_METADATA_KEY, ]: - ensure_index_mapping(client, resolve_multitenant_index_name("registry-refs"), metadata_key, "keyword") + ensure_index_mapping(client, resolve_multitenant_index_name(client, "registry-refs"), metadata_key, "keyword") log.info("Writing bulk updates to database...") write_updated_docs( client, updates, - index_name=resolve_multitenant_index_name("registry"), + index_name=resolve_multitenant_index_name(client, "registry"), ) log.info("Generating updates from deferred records...") deferred_updates = generate_deferred_updates(client, deferred_records_file.name, registry_mock_query_f) @@ -91,7 +91,7 @@ def run( write_updated_docs( client, deferred_updates, - index_name=resolve_multitenant_index_name("registry"), + index_name=resolve_multitenant_index_name(client, "registry"), ) else: # consume generator to dump bulk updates to sink @@ -99,7 +99,7 @@ def run( pass log.info("Checking indexes for orphaned documents") - index_names = [resolve_multitenant_index_name(index_label) for index_label in ["registry", "registry-refs"]] + index_names = [resolve_multitenant_index_name(client, index_label) for index_label in ["registry", "registry-refs"]] for index_name in index_names: if log.isEnabledFor(logging.DEBUG): orphaned_docs = get_orphaned_documents(client, registry_mock_query_f, index_name) diff --git a/src/pds/registrysweepers/ancestry/generation.py b/src/pds/registrysweepers/ancestry/generation.py index 14c4cf3..6ca8dcc 100644 --- a/src/pds/registrysweepers/ancestry/generation.py +++ 
b/src/pds/registrysweepers/ancestry/generation.py @@ -425,7 +425,7 @@ def _get_nonaggregate_ancestry_records_with_chunking( isinstance(err, KeyError) and most_recent_attempted_collection_lidvid not in bundle_ancestry_by_collection_lidvid ): - probable_cause = f'[Probable Cause]: Collection primary document with id "{doc["_source"].get("collection_lidvid")}" not found in index {resolve_multitenant_index_name("registry")} for {resolve_multitenant_index_name("registry-refs")} doc with id "{doc.get("_id")}"' + probable_cause = f'[Probable Cause]: Collection primary document with id "{doc["_source"].get("collection_lidvid")}" not found in index {resolve_multitenant_index_name(client, "registry")} for {resolve_multitenant_index_name(client, "registry-refs")} doc with id "{doc.get("_id")}"' elif isinstance(err, ValueError): probable_cause = f'[Probable Cause]: Failed to parse collection and/or product LIDVIDs from document with id "{doc.get("_id")}" in index "{doc.get("_index")}" due to {type(err).__name__}: {err}' else: @@ -490,6 +490,6 @@ def generate_update(doc: RefDocBookkeepingEntry) -> Update: f"Updating {len(docs)} registry-refs docs with {SWEEPERS_ANCESTRY_VERSION_METADATA_KEY}={SWEEPERS_ANCESTRY_VERSION}" ) write_updated_docs( - client, updates, index_name=resolve_multitenant_index_name("registry-refs"), bulk_chunk_max_update_count=20000 + client, updates, index_name=resolve_multitenant_index_name(client, "registry-refs"), bulk_chunk_max_update_count=20000 ) logging.info("registry-refs metadata update complete") diff --git a/src/pds/registrysweepers/ancestry/queries.py b/src/pds/registrysweepers/ancestry/queries.py index 288b4a6..95a2ee7 100644 --- a/src/pds/registrysweepers/ancestry/queries.py +++ b/src/pds/registrysweepers/ancestry/queries.py @@ -42,7 +42,7 @@ def get_bundle_ancestry_records_query(client: OpenSearch, db_mock: DbMockTypeDef query = product_class_query_factory(ProductClass.BUNDLE) _source = {"includes": ["lidvid", SWEEPERS_ANCESTRY_VERSION_METADATA_KEY]} query_f = query_registry_db_or_mock(db_mock, "get_bundle_ancestry_records", use_search_after=True) - docs = query_f(client, resolve_multitenant_index_name("registry"), query, _source) + docs = query_f(client, resolve_multitenant_index_name(client, "registry"), query, _source) return docs @@ -51,7 +51,7 @@ def get_collection_ancestry_records_bundles_query(client: OpenSearch, db_mock: D query = product_class_query_factory(ProductClass.BUNDLE) _source = {"includes": ["lidvid", "ref_lid_collection"]} query_f = query_registry_db_or_mock(db_mock, "get_collection_ancestry_records_bundles", use_search_after=True) - docs = query_f(client, resolve_multitenant_index_name("registry"), query, _source) + docs = query_f(client, resolve_multitenant_index_name(client, "registry"), query, _source) return docs @@ -63,7 +63,7 @@ def get_collection_ancestry_records_collections_query( query = product_class_query_factory(ProductClass.COLLECTION) _source = {"includes": ["lidvid", SWEEPERS_ANCESTRY_VERSION_METADATA_KEY]} query_f = query_registry_db_or_mock(db_mock, "get_collection_ancestry_records_collections", use_search_after=True) - docs = query_f(client, resolve_multitenant_index_name("registry"), query, _source) + docs = query_f(client, resolve_multitenant_index_name(client, "registry"), query, _source) return docs @@ -84,7 +84,7 @@ def get_nonaggregate_ancestry_records_query(client: OpenSearch, registry_db_mock # each document will have many product lidvids, so a smaller page size is warranted here docs = query_f( client, - 
resolve_multitenant_index_name("registry-refs"), + resolve_multitenant_index_name(client, "registry-refs"), query, _source, page_size=AncestryRuntimeConstants.nonaggregate_ancestry_records_query_page_size, @@ -118,7 +118,7 @@ def get_nonaggregate_ancestry_records_for_collection_lid_query( # each document will have many product lidvids, so a smaller page size is warranted here docs = query_f( client, - resolve_multitenant_index_name("registry-refs"), + resolve_multitenant_index_name(client, "registry-refs"), query, _source, page_size=AncestryRuntimeConstants.nonaggregate_ancestry_records_query_page_size, @@ -185,7 +185,7 @@ def get_existing_ancestry_for_product( docs = query_f( client, - resolve_multitenant_index_name("registry"), + resolve_multitenant_index_name(client, "registry"), query, _source, ) diff --git a/src/pds/registrysweepers/provenance/__init__.py b/src/pds/registrysweepers/provenance/__init__.py index 3f99b50..26ff0d9 100644 --- a/src/pds/registrysweepers/provenance/__init__.py +++ b/src/pds/registrysweepers/provenance/__init__.py @@ -73,7 +73,7 @@ def get_records(client: OpenSearch) -> Iterable[ProvenanceRecord]: } _source = {"includes": ["lidvid", METADATA_SUCCESSOR_KEY, SWEEPERS_PROVENANCE_VERSION_METADATA_KEY]} - docs = query_registry_db_with_search_after(client, resolve_multitenant_index_name("registry"), query, _source) + docs = query_registry_db_with_search_after(client, resolve_multitenant_index_name(client, "registry"), query, _source) for doc in docs: try: @@ -141,7 +141,7 @@ def run( write_updated_docs( client, updates, - index_name=resolve_multitenant_index_name("registry"), + index_name=resolve_multitenant_index_name(client, "registry"), ) log.info("Completed provenance sweeper processing!") diff --git a/src/pds/registrysweepers/reindexer/main.py b/src/pds/registrysweepers/reindexer/main.py index c07f19c..baeeef4 100644 --- a/src/pds/registrysweepers/reindexer/main.py +++ b/src/pds/registrysweepers/reindexer/main.py @@ -55,7 +55,7 @@ def get_docs_query(filter_to_harvested_before: datetime): def fetch_dd_field_types(client: OpenSearch) -> Dict[str, str]: - dd_index_name = resolve_multitenant_index_name("registry-dd") + dd_index_name = resolve_multitenant_index_name(client, "registry-dd") name_key = "es_field_name" type_key = "es_data_type" dd_docs = query_registry_db_with_search_after( @@ -279,7 +279,7 @@ def run( configure_logging(filepath=log_filepath, log_level=log_level) sweeper_start_timestamp = datetime.now() - products_index_name = resolve_multitenant_index_name("registry") + products_index_name = resolve_multitenant_index_name(client, "registry") ensure_index_mapping(client, products_index_name, REINDEXER_FLAG_METADATA_KEY, "date") dd_field_types_by_field_name = fetch_dd_field_types(client) diff --git a/src/pds/registrysweepers/repairkit/__init__.py b/src/pds/registrysweepers/repairkit/__init__.py index fa2be0f..09dcb0c 100644 --- a/src/pds/registrysweepers/repairkit/__init__.py +++ b/src/pds/registrysweepers/repairkit/__init__.py @@ -100,7 +100,7 @@ def get_unprocessed_docs_query(): # page_size and bulk_chunk_max_update_count constraints are necessary to avoid choking nodes with very-large docs # i.e. 
ATM and GEO - index_name = resolve_multitenant_index_name("registry") + index_name = resolve_multitenant_index_name(client, "registry") update_max_chunk_size = 20000 while get_query_hits_count(client, index_name, get_unprocessed_docs_query()) > 0: all_docs = query_registry_db_with_search_after( @@ -114,12 +114,12 @@ def get_unprocessed_docs_query(): ) updates = generate_updates(all_docs, SWEEPERS_REPAIRKIT_VERSION_METADATA_KEY, SWEEPERS_REPAIRKIT_VERSION) ensure_index_mapping( - client, resolve_multitenant_index_name("registry"), SWEEPERS_REPAIRKIT_VERSION_METADATA_KEY, "integer" + client, resolve_multitenant_index_name(client, "registry"), SWEEPERS_REPAIRKIT_VERSION_METADATA_KEY, "integer" ) write_updated_docs( client, updates, - index_name=resolve_multitenant_index_name("registry"), + index_name=resolve_multitenant_index_name(client, "registry"), bulk_chunk_max_update_count=update_max_chunk_size, ) diff --git a/src/pds/registrysweepers/utils/db/multitenancy.py b/src/pds/registrysweepers/utils/db/multitenancy.py index c13a678..6c9152e 100644 --- a/src/pds/registrysweepers/utils/db/multitenancy.py +++ b/src/pds/registrysweepers/utils/db/multitenancy.py @@ -4,16 +4,16 @@ from opensearchpy import OpenSearch -def resolve_multitenant_index_name(index_type: str): +def resolve_multitenant_index_name(client, index_type: str): supported_index_types = {"registry", "registry-refs", "registry-dd"} node_id = os.environ.get("MULTITENANCY_NODE_ID", "").strip(" ") if node_id == "": - return index_type + return resolve_index_name_if_aliased(client, index_type) elif index_type not in supported_index_types: raise ValueError(f'index_type "{index_type}" not supported (expected one of {supported_index_types})') else: - return f"{node_id}-{index_type}" + return resolve_index_name_if_aliased(client, f"{node_id}-{index_type}") def index_exists(client: OpenSearch, index_or_alias_name: str) -> bool: @@ -26,7 +26,7 @@ def resolve_index_name_if_aliased(client: OpenSearch, index_or_alias_name: str) if index_exists(client, index_or_alias_name): return index_or_alias_name elif client.indices.exists_alias(index_or_alias_name): - index_name = next(iter(client.indices.get_alias('index_or_alias_name').keys())) + index_name = next(iter(client.indices.get(index_or_alias_name).keys())) logging.debug(f'Resolved alias {index_or_alias_name} to index {index_name}') return index_name else: From 37b8b09d34e1c5ee39f598e4196257ee550d3ac5 Mon Sep 17 00:00:00 2001 From: edunn Date: Thu, 30 Jan 2025 00:00:56 -0800 Subject: [PATCH 13/15] reinstate use of utils.misc.is_dev_mode() in driver code this was removed during rebase/merge in d0b85a5f --- src/pds/registrysweepers/driver.py | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/src/pds/registrysweepers/driver.py b/src/pds/registrysweepers/driver.py index d4f86ff..9288c5a 100644 --- a/src/pds/registrysweepers/driver.py +++ b/src/pds/registrysweepers/driver.py @@ -68,12 +68,14 @@ from pds.registrysweepers.utils.db.client import get_opensearch_client_from_environment from pds.registrysweepers.utils.misc import get_human_readable_elapsed_since +from src.pds.registrysweepers.utils.misc import is_dev_mode + def run(): configure_logging(filepath=None, log_level=logging.INFO) log = logging.getLogger(__name__) - dev_mode = str(os.environ.get("DEV_MODE")).lower() not in {'none', '', '0', 'false'} + dev_mode = is_dev_mode() if dev_mode: log.warning('Operating in development mode - host verification disabled') import urllib3 From 8425128293348da7c888dadabbc209864dae0e71 Mon Sep 17 
00:00:00 2001 From: edunn Date: Fri, 31 Jan 2025 12:13:19 -0800 Subject: [PATCH 14/15] lint --- src/pds/registrysweepers/ancestry/__init__.py | 4 +- .../registrysweepers/ancestry/generation.py | 5 +- src/pds/registrysweepers/driver.py | 51 +++++++++---------- .../registrysweepers/provenance/__init__.py | 4 +- .../registrysweepers/repairkit/__init__.py | 5 +- src/pds/registrysweepers/utils/db/__init__.py | 2 +- .../registrysweepers/utils/db/multitenancy.py | 2 +- 7 files changed, 40 insertions(+), 33 deletions(-) diff --git a/src/pds/registrysweepers/ancestry/__init__.py b/src/pds/registrysweepers/ancestry/__init__.py index 75910b5..edf996f 100644 --- a/src/pds/registrysweepers/ancestry/__init__.py +++ b/src/pds/registrysweepers/ancestry/__init__.py @@ -76,7 +76,9 @@ def run( for metadata_key in [ SWEEPERS_ANCESTRY_VERSION_METADATA_KEY, ]: - ensure_index_mapping(client, resolve_multitenant_index_name(client, "registry-refs"), metadata_key, "keyword") + ensure_index_mapping( + client, resolve_multitenant_index_name(client, "registry-refs"), metadata_key, "keyword" + ) log.info("Writing bulk updates to database...") write_updated_docs( diff --git a/src/pds/registrysweepers/ancestry/generation.py b/src/pds/registrysweepers/ancestry/generation.py index 6ca8dcc..d3ba7f2 100644 --- a/src/pds/registrysweepers/ancestry/generation.py +++ b/src/pds/registrysweepers/ancestry/generation.py @@ -490,6 +490,9 @@ def generate_update(doc: RefDocBookkeepingEntry) -> Update: f"Updating {len(docs)} registry-refs docs with {SWEEPERS_ANCESTRY_VERSION_METADATA_KEY}={SWEEPERS_ANCESTRY_VERSION}" ) write_updated_docs( - client, updates, index_name=resolve_multitenant_index_name(client, "registry-refs"), bulk_chunk_max_update_count=20000 + client, + updates, + index_name=resolve_multitenant_index_name(client, "registry-refs"), + bulk_chunk_max_update_count=20000, ) logging.info("registry-refs metadata update complete") diff --git a/src/pds/registrysweepers/driver.py b/src/pds/registrysweepers/driver.py index 9288c5a..57556c3 100644 --- a/src/pds/registrysweepers/driver.py +++ b/src/pds/registrysweepers/driver.py @@ -52,23 +52,25 @@ # executable. 
# # - +import argparse import functools import inspect -import argparse import json import logging import os from datetime import datetime from typing import Callable -from pds.registrysweepers import provenance, ancestry, repairkit, legacy_registry_sync +from pds.registrysweepers import ancestry +from pds.registrysweepers import legacy_registry_sync +from pds.registrysweepers import provenance +from pds.registrysweepers import repairkit from pds.registrysweepers.reindexer import main as reindexer -from pds.registrysweepers.utils import configure_logging, parse_log_level +from pds.registrysweepers.utils import configure_logging +from pds.registrysweepers.utils import parse_log_level from pds.registrysweepers.utils.db.client import get_opensearch_client_from_environment from pds.registrysweepers.utils.misc import get_human_readable_elapsed_since - -from src.pds.registrysweepers.utils.misc import is_dev_mode +from pds.registrysweepers.utils.misc import is_dev_mode def run(): @@ -77,13 +79,12 @@ def run(): dev_mode = is_dev_mode() if dev_mode: - log.warning('Operating in development mode - host verification disabled') + log.warning("Operating in development mode - host verification disabled") import urllib3 urllib3.disable_warnings() - log_level = parse_log_level(os.environ.get('LOGLEVEL', 'INFO')) - + log_level = parse_log_level(os.environ.get("LOGLEVEL", "INFO")) def run_factory(sweeper_f: Callable) -> Callable: return functools.partial( @@ -91,37 +92,29 @@ def run_factory(sweeper_f: Callable) -> Callable: client=get_opensearch_client_from_environment(verify_certs=True if not dev_mode else False), # enable for development if required - not necessary in production # log_filepath='registry-sweepers.log', - log_level=log_level + log_level=log_level, ) - parser = argparse.ArgumentParser( - prog='registry-sweepers', - description='sweeps the PDS registry with different routines meant to run regularly on the database' + prog="registry-sweepers", + description="sweeps the PDS registry with different routines meant to run regularly on the database", ) # define optional sweepers - parser.add_argument('--legacy-sync', action='store_true') - optional_sweepers = { - 'legacy_sync': legacy_registry_sync.run - } + parser.add_argument("--legacy-sync", action="store_true") + optional_sweepers = {"legacy_sync": legacy_registry_sync.run} args = parser.parse_args() # Define default sweepers to be run here, in order of execution - sweepers = [ - repairkit.run, - provenance.run, - ancestry.run, - reindexer.run - ] + sweepers = [repairkit.run, provenance.run, ancestry.run, reindexer.run] for option, sweeper in optional_sweepers.items(): if getattr(args, option): sweepers.append(sweeper) sweeper_descriptions = [inspect.getmodule(f).__name__ for f in sweepers] - log.info(f'Running sweepers: {sweeper_descriptions}') + log.info(f"Running sweepers: {sweeper_descriptions}") total_execution_begin = datetime.now() @@ -134,7 +127,11 @@ def run_factory(sweeper_f: Callable) -> Callable: run_sweeper_f() sweeper_name = inspect.getmodule(sweeper).__name__ - sweeper_execution_duration_strs.append(f'{sweeper_name}: {get_human_readable_elapsed_since(sweeper_execution_begin)}') + sweeper_execution_duration_strs.append( + f"{sweeper_name}: {get_human_readable_elapsed_since(sweeper_execution_begin)}" + ) - log.info(f'Sweepers successfully executed in {get_human_readable_elapsed_since(total_execution_begin)}\n ' - + '\n '.join(sweeper_execution_duration_strs)) + log.info( + f"Sweepers successfully executed in 
{get_human_readable_elapsed_since(total_execution_begin)}\n " + + "\n ".join(sweeper_execution_duration_strs) + ) diff --git a/src/pds/registrysweepers/provenance/__init__.py b/src/pds/registrysweepers/provenance/__init__.py index 26ff0d9..bb06509 100644 --- a/src/pds/registrysweepers/provenance/__init__.py +++ b/src/pds/registrysweepers/provenance/__init__.py @@ -73,7 +73,9 @@ def get_records(client: OpenSearch) -> Iterable[ProvenanceRecord]: } _source = {"includes": ["lidvid", METADATA_SUCCESSOR_KEY, SWEEPERS_PROVENANCE_VERSION_METADATA_KEY]} - docs = query_registry_db_with_search_after(client, resolve_multitenant_index_name(client, "registry"), query, _source) + docs = query_registry_db_with_search_after( + client, resolve_multitenant_index_name(client, "registry"), query, _source + ) for doc in docs: try: diff --git a/src/pds/registrysweepers/repairkit/__init__.py b/src/pds/registrysweepers/repairkit/__init__.py index 09dcb0c..a53aed1 100644 --- a/src/pds/registrysweepers/repairkit/__init__.py +++ b/src/pds/registrysweepers/repairkit/__init__.py @@ -114,7 +114,10 @@ def get_unprocessed_docs_query(): ) updates = generate_updates(all_docs, SWEEPERS_REPAIRKIT_VERSION_METADATA_KEY, SWEEPERS_REPAIRKIT_VERSION) ensure_index_mapping( - client, resolve_multitenant_index_name(client, "registry"), SWEEPERS_REPAIRKIT_VERSION_METADATA_KEY, "integer" + client, + resolve_multitenant_index_name(client, "registry"), + SWEEPERS_REPAIRKIT_VERSION_METADATA_KEY, + "integer", ) write_updated_docs( client, diff --git a/src/pds/registrysweepers/utils/db/__init__.py b/src/pds/registrysweepers/utils/db/__init__.py index 2748361..0e8d7f6 100644 --- a/src/pds/registrysweepers/utils/db/__init__.py +++ b/src/pds/registrysweepers/utils/db/__init__.py @@ -279,7 +279,7 @@ def write_updated_docs( updates: Iterable[Update], index_name: str, bulk_chunk_max_update_count: Union[int, None] = None, - as_upsert: bool = False + as_upsert: bool = False, ): log.info("Writing document updates...") updated_doc_count = 0 diff --git a/src/pds/registrysweepers/utils/db/multitenancy.py b/src/pds/registrysweepers/utils/db/multitenancy.py index 6c9152e..d4c667d 100644 --- a/src/pds/registrysweepers/utils/db/multitenancy.py +++ b/src/pds/registrysweepers/utils/db/multitenancy.py @@ -27,7 +27,7 @@ def resolve_index_name_if_aliased(client: OpenSearch, index_or_alias_name: str) return index_or_alias_name elif client.indices.exists_alias(index_or_alias_name): index_name = next(iter(client.indices.get(index_or_alias_name).keys())) - logging.debug(f'Resolved alias {index_or_alias_name} to index {index_name}') + logging.debug(f"Resolved alias {index_or_alias_name} to index {index_name}") return index_name else: raise ValueError(f'Could not resolve index for index_or_alias_name "{index_or_alias_name}"') From be6086e76b19c526ae785aab9a0c63f6348c0828 Mon Sep 17 00:00:00 2001 From: edunn Date: Fri, 31 Jan 2025 12:14:49 -0800 Subject: [PATCH 15/15] implement support for unit test stubs in multitenancy.resolve_multitenant_index_name() --- src/pds/registrysweepers/utils/db/multitenancy.py | 3 +++ 1 file changed, 3 insertions(+) diff --git a/src/pds/registrysweepers/utils/db/multitenancy.py b/src/pds/registrysweepers/utils/db/multitenancy.py index d4c667d..4d7a808 100644 --- a/src/pds/registrysweepers/utils/db/multitenancy.py +++ b/src/pds/registrysweepers/utils/db/multitenancy.py @@ -8,6 +8,9 @@ def resolve_multitenant_index_name(client, index_type: str): supported_index_types = {"registry", "registry-refs", "registry-dd"} node_id = 
os.environ.get("MULTITENANCY_NODE_ID", "").strip(" ") + if client is None: + return index_type + if node_id == "": return resolve_index_name_if_aliased(client, index_type) elif index_type not in supported_index_types: