diff --git a/docker/sweepers_driver.py b/docker/sweepers_driver.py
index a4df683..9d05e0e 100755
--- a/docker/sweepers_driver.py
+++ b/docker/sweepers_driver.py
@@ -1,138 +1,8 @@
 #! /usr/bin/env python3
-#
-# Copyright © 2023, California Institute of Technology ("Caltech").
-# U.S. Government sponsorship acknowledged.
-#
-# All rights reserved.
-#
-# Redistribution and use in source and binary forms, with or without
-# modification, are permitted provided that the following conditions are met:
-#
-# • Redistributions of source code must retain the above copyright notice,
-#   this list of conditions and the following disclaimer.
-# • Redistributions must reproduce the above copyright notice, this list of
-#   conditions and the following disclaimer in the documentation and/or other
-#   materials provided with the distribution.
-# • Neither the name of Caltech nor its operating division, the Jet Propulsion
-#   Laboratory, nor the names of its contributors may be used to endorse or
-#   promote products derived from this software without specific prior written
-#   permission.
-#
-# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
-# AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
-# IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
-# ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE
-# LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
-# CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
-# SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
-# INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
-# CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
-# ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
-# POSSIBILITY OF SUCH DAMAGE.
-#
-# Python driver for provenance (OUTDATED TODO: Update documentation)
-# ============================
-#
-# This script is provided to support the scheduled execution of PDS Registry
-# Provenance, typically in AWS via Event Bridge and ECS/Fargate.
-#
-# This script makes the following assumptions for its run-time:
-#
-# - The EN (i.e. primary) OpenSearch endpoint is provided in the environment
-#   variable PROV_ENDPOINT
-# - The username/password is provided as a JSON key/value in the environment
-#   variable PROV_CREDENTIALS
-# - The remotes available through cross cluster search to be processed are
-#   provided as a JSON list of strings - each string containing the space
-#   separated list of remotes (as they appear on the provenance command line)
-#   Each set of remotes is used in an execution of provenance. The value of
-#   this is specified in the environment variable PROV_REMOTES. If this
-#   variable is empty or not defined, provenance is run without specifying
-#   remotes and only the PROV_ENDPOINT is processed.
-# - The directory containing the provenance.py file is in PATH and is
-#   executable.
-#
-#
+from pds.registrysweepers.driver import run as run_sweepers
-import functools
-import inspect
-import argparse
-import json
-import logging
-import os
-from datetime import datetime
-from typing import Callable
-
-from pds.registrysweepers import provenance, ancestry, repairkit, legacy_registry_sync
-from pds.registrysweepers.reindexer import main as reindexer
-from pds.registrysweepers.utils import configure_logging, parse_log_level
-from pds.registrysweepers.utils.db.client import get_opensearch_client_from_environment
-from pds.registrysweepers.utils.misc import get_human_readable_elapsed_since
-from pds.registrysweepers.utils.misc import is_dev_mode
-
-configure_logging(filepath=None, log_level=logging.INFO)
-log = logging.getLogger(__name__)
-
-dev_mode = is_dev_mode()
-if dev_mode:
-    log.warning('Operating in development mode - host verification disabled')
-    import urllib3
-
-    urllib3.disable_warnings()
-
-log_level = parse_log_level(os.environ.get('LOGLEVEL', 'INFO'))
-
-
-def run_factory(sweeper_f: Callable) -> Callable:
-    return functools.partial(
-        sweeper_f,
-        client=get_opensearch_client_from_environment(verify_certs=True if not dev_mode else False),
-        # enable for development if required - not necessary in production
-        # log_filepath='registry-sweepers.log',
-        log_level=log_level
-    )
-
-
-parser = argparse.ArgumentParser(
-    prog='registry-sweepers',
-    description='sweeps the PDS registry with different routines meant to run regularly on the database'
-)
-
-# define optional sweepers
-parser.add_argument('--legacy-sync', action='store_true')
-optional_sweepers = {
-    'legacy_sync': legacy_registry_sync.run
-}
-
-args = parser.parse_args()
-
-# Define default sweepers to be run here, in order of execution
-sweepers = [
-    repairkit.run,
-    provenance.run,
-    ancestry.run,
-    reindexer.run
-]
-
-for option, sweeper in optional_sweepers.items():
-    if getattr(args, option):
-        sweepers.append(sweeper)
-
-sweeper_descriptions = [inspect.getmodule(f).__name__ for f in sweepers]
-log.info(f'Running sweepers: {sweeper_descriptions}')
-
-total_execution_begin = datetime.now()
-
-sweeper_execution_duration_strs = []
-
-for sweeper in sweepers:
-    sweeper_execution_begin = datetime.now()
-    run_sweeper_f = run_factory(sweeper)
-
-    run_sweeper_f()
-
-    sweeper_name = inspect.getmodule(sweeper).__name__
-    sweeper_execution_duration_strs.append(f'{sweeper_name}: {get_human_readable_elapsed_since(sweeper_execution_begin)}')
-
-log.info(f'Sweepers successfully executed in {get_human_readable_elapsed_since(total_execution_begin)}\n    '
-         + '\n    '.join(sweeper_execution_duration_strs))
+# To reduce the potential for error when deployed, this file has been left in place after its contents were
+# extracted to registrysweepers.driver.run()
+# TODO: remove this dependency and have docker run driver.py directly
+if __name__ == '__main__':
+    run_sweepers()
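The shim above now delegates to the extracted `registrysweepers.driver.run()`. A minimal sketch of what resolving the TODO might look like, assuming a `__main__` guard is added to `driver.py` (hypothetical, not part of this change):

```python
# driver.py (hypothetical addition at the bottom of the module)
if __name__ == '__main__':
    run()
```

The container could then invoke the module directly, e.g. `python -m pds.registrysweepers.driver`, and `docker/sweepers_driver.py` could be deleted.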
diff --git a/scratch/manual-aoss-reindexer/reindex.py b/scratch/manual-aoss-reindexer/reindex.py
new file mode 100644
index 0000000..8845518
--- /dev/null
+++ b/scratch/manual-aoss-reindexer/reindex.py
@@ -0,0 +1,175 @@
+import logging
+import os
+import time
+from datetime import timedelta, datetime
+from typing import Iterator
+
+from pds.registrysweepers.utils import configure_logging
+from pds.registrysweepers.utils.db import query_registry_db_with_search_after, write_updated_docs, get_query_hits_count
+from pds.registrysweepers.utils.db.client import get_opensearch_client_from_environment
+from pds.registrysweepers.utils.db.indexing import ensure_index_mapping
+from pds.registrysweepers.utils.db.update import Update
+
+from pds.registrysweepers.driver import run as run_sweepers
+from pds.registrysweepers.utils.misc import get_human_readable_elapsed_since
+
+# Baseline mappings which are required to facilitate successful execution before any data is copied
+necessary_mappings = {
+    'lidvid': 'keyword',
+    'ops:Harvest_Info/ops:harvest_date_time': 'date'
+}
+
+
+def ensure_valid_state(dest_index_name: str):
+    """Ensure that all necessary preconditions for safe/successful reindexing are met"""
+    with get_opensearch_client_from_environment() as client:
+        try:
+            # ensure that the destination is a temporary index, to prevent inadvertently writing to a real index
+            allowed_prefixes = {'temp', 'edunn'}
+            if not any([dest_index_name.startswith(prefix) for prefix in allowed_prefixes]):
+                raise ValueError(
+                    f'Destination index name {dest_index_name} is not prefixed with one of {allowed_prefixes} and may not be the intended value - aborting')
+
+            # ensure correct destination index configuration
+            try:
+                dynamic_mapping_disabled = client.indices.get(dest_index_name)[dest_index_name]['mappings'][
+                                               'dynamic'] == 'false'
+                if not dynamic_mapping_disabled:
+                    raise ValueError
+
+            except (KeyError, ValueError):
+                raise RuntimeError(
+                    f'Index "{dest_index_name}" is not correctly configured - "dynamic" mapping setting is not set to "false" - aborting')
+
+            # ensure necessary mappings to facilitate execution of reindexing sweeper and other steps
+            for property_name, property_mapping_type in necessary_mappings.items():
+                ensure_index_mapping(client, dest_index_name, property_name, property_mapping_type)
+
+            # other conditions may be populated later
+
+        except Exception as err:
+            logging.error(err)
+            exit(1)
+
+
+def migrate_bulk_data(src_index_name: str, dest_index_name: str):
+    """Stream documents from the source index to the destination index, which may already be partially populated"""
+    if get_outstanding_document_count(src_index_name, dest_index_name, as_proportion=True) < 0.1:
+        logging.warning('Less than 10% of documents outstanding - skipping bulk streaming migration stage')
+        return
+
+    with get_opensearch_client_from_environment() as client:
+        try:
+            sort_keys = sorted(necessary_mappings.keys())
+
+            docs = query_registry_db_with_search_after(client, src_index_name, {"query": {"match_all": {}}}, {},
+                                                       sort_fields=sort_keys, request_timeout_seconds=20)
+            updates = map(lambda doc: Update(id=doc['_id'], content=doc['_source']), docs)
+            write_updated_docs(client, updates, dest_index_name, as_upsert=True)
+
+            # TODO: Implement non-redundant pickup after partial completion, if possible
+        except Exception as err:
+            logging.error(f'Reindex from {src_index_name} to {dest_index_name} failed: {err}')
+            exit(1)
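migrate_bulk_data() streams every source document into the destination as an upsert. A minimal sketch of the `_bulk` body produced for a single streamed document, mirroring the `as_upsert` path added to `update_as_statements()` further down this diff (the id and fields here are hypothetical):

```python
import json

doc_id = 'urn:nasa:pds:example::1.0'              # hypothetical lidvid/_id
source = {'lidvid': doc_id, 'title': 'Example'}   # hypothetical _source

action = {'update': {'_id': doc_id}}
content = {'doc': source, 'doc_as_upsert': True}  # create the doc if absent

# each update contributes an action line and a content line to the bulk body
bulk_body = '\n'.join(json.dumps(obj) for obj in (action, content)) + '\n'
print(bulk_body)
```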
+ """ + + logging.info( + f'Ensuring document consistency - {get_outstanding_document_count(src_index_name, dest_index_name)} documents remain to copy from {src_index_name} to {dest_index_name}') + + with get_opensearch_client_from_environment() as client: + # TODO instead of iteratively pulling/creating, use multi-search (maybe) and bulk write (definitely) to avoid + # the request overhead + for doc_id in enumerate_outstanding_doc_ids(src_index_name, dest_index_name): + try: + src_doc = client.get(src_index_name, doc_id) + client.create(dest_index_name, doc_id, src_doc['_source']) + logging.info(f'Created missing doc with id {doc_id}') + except Exception as err: + logging.error(f'Failed to create doc with id "{doc_id}": {err}') + + wait_until_indexed() + + +def enumerate_outstanding_doc_ids(src_index_name: str, dest_index_name: str) -> Iterator[str]: + with get_opensearch_client_from_environment() as client: + pseudoid_field = "lidvid" + + src_docs = iter(query_registry_db_with_search_after(client, src_index_name, {"query": {"match_all": {}}}, + {"includes": [pseudoid_field]}, + sort_fields=[pseudoid_field], request_timeout_seconds=20)) + dest_docs = iter(query_registry_db_with_search_after(client, dest_index_name, {"query": {"match_all": {}}}, + {"includes": [pseudoid_field]}, + sort_fields=[pseudoid_field], request_timeout_seconds=20)) + + # yield any documents which are present in source but not in destination + src_ids = {doc["_id"] for doc in src_docs} + dest_ids = {doc["_id"] for doc in dest_docs} + + ids_missing_from_src = dest_ids.difference(src_ids) + if len(ids_missing_from_src) > 0: + logging.error( + f'{len(ids_missing_from_src)} ids are present in {dest_index_name} but not in {src_index_name} - this indicates a potential error: {sorted(ids_missing_from_src)}') + exit(1) + + ids_missing_from_dest = src_ids.difference(dest_ids) + return iter(ids_missing_from_dest) + + +def get_outstanding_document_count(src_index_name: str, dest_index_name: str, as_proportion: bool = False) -> int: + """return count(src) - count(dest)""" + with get_opensearch_client_from_environment() as client: + src_docs_count = get_query_hits_count(client, src_index_name, {"query": {"match_all": {}}}) + dest_docs_count = get_query_hits_count(client, dest_index_name, {"query": {"match_all": {}}}) + logging.info(f'index {src_index_name} contains {src_docs_count} docs') + logging.info(f'index {dest_index_name} contains {dest_docs_count} docs') + + outstanding_docs_count = src_docs_count - dest_docs_count + return (outstanding_docs_count / src_docs_count) if as_proportion else outstanding_docs_count + + +def wait_until_indexed(timeout: timedelta = timedelta(minutes=30)): + start = datetime.now() + expiry = start + timeout + + while get_outstanding_document_count(src_index_name, dest_index_name) > 0: + if datetime.now() > expiry: + raise RuntimeError( + f'Failed to ensure consistency - there is remaining disparity in document count between indices "{src_index_name}" and "{dest_index_name}" after {get_human_readable_elapsed_since(start)}') + logging.info('Waiting for indexation to complete...') + time.sleep(15) + + +def run_registry_sweepers(): + """Run sweepers on the migrated data""" + try: + run_sweepers() + except Exception as err: + logging.error(f'Post-reindex sweeper execution failed with {err}') + raise err + + +if __name__ == '__main__': + configure_logging(filepath=None, log_level=logging.INFO) + + src_node_name = 'atm' + dest_pseudonode_name = f'temp-{src_node_name}' + + # set node id env var to 
diff --git a/src/pds/registrysweepers/ancestry/__init__.py b/src/pds/registrysweepers/ancestry/__init__.py
index 7d85c0c..edf996f 100644
--- a/src/pds/registrysweepers/ancestry/__init__.py
+++ b/src/pds/registrysweepers/ancestry/__init__.py
@@ -71,18 +71,20 @@ def run(
         METADATA_PARENT_COLLECTION_KEY,
         SWEEPERS_ANCESTRY_VERSION_METADATA_KEY,
     ]:
-        ensure_index_mapping(client, resolve_multitenant_index_name("registry"), metadata_key, "keyword")
+        ensure_index_mapping(client, resolve_multitenant_index_name(client, "registry"), metadata_key, "keyword")
 
     for metadata_key in [
         SWEEPERS_ANCESTRY_VERSION_METADATA_KEY,
     ]:
-        ensure_index_mapping(client, resolve_multitenant_index_name("registry-refs"), metadata_key, "keyword")
+        ensure_index_mapping(
+            client, resolve_multitenant_index_name(client, "registry-refs"), metadata_key, "keyword"
+        )
 
     log.info("Writing bulk updates to database...")
     write_updated_docs(
         client,
         updates,
-        index_name=resolve_multitenant_index_name("registry"),
+        index_name=resolve_multitenant_index_name(client, "registry"),
     )
 
     log.info("Generating updates from deferred records...")
     deferred_updates = generate_deferred_updates(client, deferred_records_file.name, registry_mock_query_f)
@@ -91,7 +93,7 @@ def run(
         write_updated_docs(
             client,
             deferred_updates,
-            index_name=resolve_multitenant_index_name("registry"),
+            index_name=resolve_multitenant_index_name(client, "registry"),
         )
     else:
         # consume generator to dump bulk updates to sink
@@ -99,7 +101,7 @@ def run(
             pass
 
     log.info("Checking indexes for orphaned documents")
-    index_names = [resolve_multitenant_index_name(index_label) for index_label in ["registry", "registry-refs"]]
+    index_names = [resolve_multitenant_index_name(client, index_label) for index_label in ["registry", "registry-refs"]]
     for index_name in index_names:
         if log.isEnabledFor(logging.DEBUG):
             orphaned_docs = get_orphaned_documents(client, registry_mock_query_f, index_name)
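The recurring mechanical change in this diff threads the client into resolve_multitenant_index_name() so that aliases can be resolved against the cluster. A sketch of the updated call pattern (the node id here is hypothetical):

```python
import os

from pds.registrysweepers.utils.db.client import get_opensearch_client_from_environment
from pds.registrysweepers.utils.db.multitenancy import resolve_multitenant_index_name

os.environ['MULTITENANCY_NODE_ID'] = 'atm'  # hypothetical node id
with get_opensearch_client_from_environment() as client:
    # resolves to 'atm-registry', then follows the alias (if any) to the
    # concrete index backing it - see multitenancy.py at the end of this diff
    index_name = resolve_multitenant_index_name(client, 'registry')
```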
"{doc.get("_id")}" in index "{doc.get("_index")}" due to {type(err).__name__}: {err}' else: @@ -490,6 +490,9 @@ def generate_update(doc: RefDocBookkeepingEntry) -> Update: f"Updating {len(docs)} registry-refs docs with {SWEEPERS_ANCESTRY_VERSION_METADATA_KEY}={SWEEPERS_ANCESTRY_VERSION}" ) write_updated_docs( - client, updates, index_name=resolve_multitenant_index_name("registry-refs"), bulk_chunk_max_update_count=20000 + client, + updates, + index_name=resolve_multitenant_index_name(client, "registry-refs"), + bulk_chunk_max_update_count=20000, ) logging.info("registry-refs metadata update complete") diff --git a/src/pds/registrysweepers/ancestry/queries.py b/src/pds/registrysweepers/ancestry/queries.py index 288b4a6..95a2ee7 100644 --- a/src/pds/registrysweepers/ancestry/queries.py +++ b/src/pds/registrysweepers/ancestry/queries.py @@ -42,7 +42,7 @@ def get_bundle_ancestry_records_query(client: OpenSearch, db_mock: DbMockTypeDef query = product_class_query_factory(ProductClass.BUNDLE) _source = {"includes": ["lidvid", SWEEPERS_ANCESTRY_VERSION_METADATA_KEY]} query_f = query_registry_db_or_mock(db_mock, "get_bundle_ancestry_records", use_search_after=True) - docs = query_f(client, resolve_multitenant_index_name("registry"), query, _source) + docs = query_f(client, resolve_multitenant_index_name(client, "registry"), query, _source) return docs @@ -51,7 +51,7 @@ def get_collection_ancestry_records_bundles_query(client: OpenSearch, db_mock: D query = product_class_query_factory(ProductClass.BUNDLE) _source = {"includes": ["lidvid", "ref_lid_collection"]} query_f = query_registry_db_or_mock(db_mock, "get_collection_ancestry_records_bundles", use_search_after=True) - docs = query_f(client, resolve_multitenant_index_name("registry"), query, _source) + docs = query_f(client, resolve_multitenant_index_name(client, "registry"), query, _source) return docs @@ -63,7 +63,7 @@ def get_collection_ancestry_records_collections_query( query = product_class_query_factory(ProductClass.COLLECTION) _source = {"includes": ["lidvid", SWEEPERS_ANCESTRY_VERSION_METADATA_KEY]} query_f = query_registry_db_or_mock(db_mock, "get_collection_ancestry_records_collections", use_search_after=True) - docs = query_f(client, resolve_multitenant_index_name("registry"), query, _source) + docs = query_f(client, resolve_multitenant_index_name(client, "registry"), query, _source) return docs @@ -84,7 +84,7 @@ def get_nonaggregate_ancestry_records_query(client: OpenSearch, registry_db_mock # each document will have many product lidvids, so a smaller page size is warranted here docs = query_f( client, - resolve_multitenant_index_name("registry-refs"), + resolve_multitenant_index_name(client, "registry-refs"), query, _source, page_size=AncestryRuntimeConstants.nonaggregate_ancestry_records_query_page_size, @@ -118,7 +118,7 @@ def get_nonaggregate_ancestry_records_for_collection_lid_query( # each document will have many product lidvids, so a smaller page size is warranted here docs = query_f( client, - resolve_multitenant_index_name("registry-refs"), + resolve_multitenant_index_name(client, "registry-refs"), query, _source, page_size=AncestryRuntimeConstants.nonaggregate_ancestry_records_query_page_size, @@ -185,7 +185,7 @@ def get_existing_ancestry_for_product( docs = query_f( client, - resolve_multitenant_index_name("registry"), + resolve_multitenant_index_name(client, "registry"), query, _source, ) diff --git a/src/pds/registrysweepers/driver.py b/src/pds/registrysweepers/driver.py new file mode 100644 index 0000000..57556c3 --- 
diff --git a/src/pds/registrysweepers/driver.py b/src/pds/registrysweepers/driver.py
new file mode 100644
index 0000000..57556c3
--- /dev/null
+++ b/src/pds/registrysweepers/driver.py
@@ -0,0 +1,137 @@
+#
+# Copyright © 2023, California Institute of Technology ("Caltech").
+# U.S. Government sponsorship acknowledged.
+#
+# All rights reserved.
+#
+# Redistribution and use in source and binary forms, with or without
+# modification, are permitted provided that the following conditions are met:
+#
+# • Redistributions of source code must retain the above copyright notice,
+#   this list of conditions and the following disclaimer.
+# • Redistributions must reproduce the above copyright notice, this list of
+#   conditions and the following disclaimer in the documentation and/or other
+#   materials provided with the distribution.
+# • Neither the name of Caltech nor its operating division, the Jet Propulsion
+#   Laboratory, nor the names of its contributors may be used to endorse or
+#   promote products derived from this software without specific prior written
+#   permission.
+#
+# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
+# AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
+# IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
+# ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE
+# LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
+# CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
+# SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
+# INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
+# CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
+# ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
+# POSSIBILITY OF SUCH DAMAGE.
+#
+# Python driver for provenance (OUTDATED TODO: Update documentation)
+# ============================
+#
+# This script is provided to support the scheduled execution of PDS Registry
+# Provenance, typically in AWS via Event Bridge and ECS/Fargate.
+#
+# This script makes the following assumptions for its run-time:
+#
+# - The EN (i.e. primary) OpenSearch endpoint is provided in the environment
+#   variable PROV_ENDPOINT
+# - The username/password is provided as a JSON key/value in the environment
+#   variable PROV_CREDENTIALS
+# - The remotes available through cross cluster search to be processed are
+#   provided as a JSON list of strings - each string containing the space
+#   separated list of remotes (as they appear on the provenance command line)
+#   Each set of remotes is used in an execution of provenance. The value of
+#   this is specified in the environment variable PROV_REMOTES. If this
+#   variable is empty or not defined, provenance is run without specifying
+#   remotes and only the PROV_ENDPOINT is processed.
+# - The directory containing the provenance.py file is in PATH and is
+#   executable.
+#
+#
+import argparse
+import functools
+import inspect
+import json
+import logging
+import os
+from datetime import datetime
+from typing import Callable
+
+from pds.registrysweepers import ancestry
+from pds.registrysweepers import legacy_registry_sync
+from pds.registrysweepers import provenance
+from pds.registrysweepers import repairkit
+from pds.registrysweepers.reindexer import main as reindexer
+from pds.registrysweepers.utils import configure_logging
+from pds.registrysweepers.utils import parse_log_level
+from pds.registrysweepers.utils.db.client import get_opensearch_client_from_environment
+from pds.registrysweepers.utils.misc import get_human_readable_elapsed_since
+from pds.registrysweepers.utils.misc import is_dev_mode
+
+
+def run():
+    configure_logging(filepath=None, log_level=logging.INFO)
+    log = logging.getLogger(__name__)
+
+    dev_mode = is_dev_mode()
+    if dev_mode:
+        log.warning("Operating in development mode - host verification disabled")
+        import urllib3
+
+        urllib3.disable_warnings()
+
+    log_level = parse_log_level(os.environ.get("LOGLEVEL", "INFO"))
+
+    def run_factory(sweeper_f: Callable) -> Callable:
+        return functools.partial(
+            sweeper_f,
+            client=get_opensearch_client_from_environment(verify_certs=True if not dev_mode else False),
+            # enable for development if required - not necessary in production
+            # log_filepath='registry-sweepers.log',
+            log_level=log_level,
+        )
+
+    parser = argparse.ArgumentParser(
+        prog="registry-sweepers",
+        description="sweeps the PDS registry with different routines meant to run regularly on the database",
+    )
+
+    # define optional sweepers
+    parser.add_argument("--legacy-sync", action="store_true")
+    optional_sweepers = {"legacy_sync": legacy_registry_sync.run}
+
+    args = parser.parse_args()
+
+    # Define default sweepers to be run here, in order of execution
+    sweepers = [repairkit.run, provenance.run, ancestry.run, reindexer.run]
+
+    for option, sweeper in optional_sweepers.items():
+        if getattr(args, option):
+            sweepers.append(sweeper)
+
+    sweeper_descriptions = [inspect.getmodule(f).__name__ for f in sweepers]
+    log.info(f"Running sweepers: {sweeper_descriptions}")
+
+    total_execution_begin = datetime.now()
+
+    sweeper_execution_duration_strs = []
+
+    for sweeper in sweepers:
+        sweeper_execution_begin = datetime.now()
+        run_sweeper_f = run_factory(sweeper)
+
+        run_sweeper_f()
+
+        sweeper_name = inspect.getmodule(sweeper).__name__
+        sweeper_execution_duration_strs.append(
+            f"{sweeper_name}: {get_human_readable_elapsed_since(sweeper_execution_begin)}"
+        )
+
+    log.info(
+        f"Sweepers successfully executed in {get_human_readable_elapsed_since(total_execution_begin)}\n    "
+        + "\n    ".join(sweeper_execution_duration_strs)
+    )
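run() binds the same keyword arguments to every sweeper via functools.partial, so each sweeper can then be invoked with no arguments. A toy equivalent of run_factory(), with a hypothetical sweeper signature:

```python
import functools

def example_sweeper(client=None, log_level=None):  # hypothetical sweeper
    print(f'sweeping with client={client!r}, log_level={log_level!r}')

# bind the shared kwargs once, then call with no arguments
run_sweeper_f = functools.partial(example_sweeper, client='client-instance', log_level=20)
run_sweeper_f()  # sweeping with client='client-instance', log_level=20
```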
diff --git a/src/pds/registrysweepers/provenance/__init__.py b/src/pds/registrysweepers/provenance/__init__.py
index 3f99b50..bb06509 100644
--- a/src/pds/registrysweepers/provenance/__init__.py
+++ b/src/pds/registrysweepers/provenance/__init__.py
@@ -73,7 +73,9 @@ def get_records(client: OpenSearch) -> Iterable[ProvenanceRecord]:
     }
     _source = {"includes": ["lidvid", METADATA_SUCCESSOR_KEY, SWEEPERS_PROVENANCE_VERSION_METADATA_KEY]}
 
-    docs = query_registry_db_with_search_after(client, resolve_multitenant_index_name("registry"), query, _source)
+    docs = query_registry_db_with_search_after(
+        client, resolve_multitenant_index_name(client, "registry"), query, _source
+    )
 
     for doc in docs:
         try:
@@ -141,7 +143,7 @@ def run(
     write_updated_docs(
         client,
         updates,
-        index_name=resolve_multitenant_index_name("registry"),
+        index_name=resolve_multitenant_index_name(client, "registry"),
     )
 
     log.info("Completed provenance sweeper processing!")
diff --git a/src/pds/registrysweepers/reindexer/main.py b/src/pds/registrysweepers/reindexer/main.py
index c07f19c..baeeef4 100644
--- a/src/pds/registrysweepers/reindexer/main.py
+++ b/src/pds/registrysweepers/reindexer/main.py
@@ -55,7 +55,7 @@ def get_docs_query(filter_to_harvested_before: datetime):
 
 
 def fetch_dd_field_types(client: OpenSearch) -> Dict[str, str]:
-    dd_index_name = resolve_multitenant_index_name("registry-dd")
+    dd_index_name = resolve_multitenant_index_name(client, "registry-dd")
     name_key = "es_field_name"
     type_key = "es_data_type"
     dd_docs = query_registry_db_with_search_after(
@@ -279,7 +279,7 @@ def run(
     configure_logging(filepath=log_filepath, log_level=log_level)
 
     sweeper_start_timestamp = datetime.now()
-    products_index_name = resolve_multitenant_index_name("registry")
+    products_index_name = resolve_multitenant_index_name(client, "registry")
 
     ensure_index_mapping(client, products_index_name, REINDEXER_FLAG_METADATA_KEY, "date")
 
     dd_field_types_by_field_name = fetch_dd_field_types(client)
diff --git a/src/pds/registrysweepers/repairkit/__init__.py b/src/pds/registrysweepers/repairkit/__init__.py
index fa2be0f..a53aed1 100644
--- a/src/pds/registrysweepers/repairkit/__init__.py
+++ b/src/pds/registrysweepers/repairkit/__init__.py
@@ -100,7 +100,7 @@ def get_unprocessed_docs_query():
 
     # page_size and bulk_chunk_max_update_count constraints are necessary to avoid choking nodes with very-large docs
     # i.e. ATM and GEO
-    index_name = resolve_multitenant_index_name("registry")
+    index_name = resolve_multitenant_index_name(client, "registry")
     update_max_chunk_size = 20000
     while get_query_hits_count(client, index_name, get_unprocessed_docs_query()) > 0:
         all_docs = query_registry_db_with_search_after(
@@ -114,12 +114,15 @@ def get_unprocessed_docs_query():
         )
         updates = generate_updates(all_docs, SWEEPERS_REPAIRKIT_VERSION_METADATA_KEY, SWEEPERS_REPAIRKIT_VERSION)
         ensure_index_mapping(
-            client, resolve_multitenant_index_name("registry"), SWEEPERS_REPAIRKIT_VERSION_METADATA_KEY, "integer"
+            client,
+            resolve_multitenant_index_name(client, "registry"),
+            SWEEPERS_REPAIRKIT_VERSION_METADATA_KEY,
+            "integer",
         )
         write_updated_docs(
             client,
             updates,
-            index_name=resolve_multitenant_index_name("registry"),
+            index_name=resolve_multitenant_index_name(client, "registry"),
             bulk_chunk_max_update_count=update_max_chunk_size,
         )
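Several writers above cap their _bulk requests with bulk_chunk_max_update_count so that very large documents do not overwhelm a node in a single request. A sketch of the chunking idea (write_updated_docs below additionally flushes on buffer size in MB):

```python
import itertools
from typing import Iterable, Iterator, List, TypeVar

T = TypeVar('T')

def chunked(items: Iterable[T], chunk_max_count: int) -> Iterator[List[T]]:
    """Yield successive lists of at most chunk_max_count items."""
    iterator = iter(items)
    while chunk := list(itertools.islice(iterator, chunk_max_count)):
        yield chunk

for chunk in chunked(range(45000), 20000):
    print(len(chunk))  # 20000, 20000, 5000
```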
diff --git a/src/pds/registrysweepers/utils/db/__init__.py b/src/pds/registrysweepers/utils/db/__init__.py
index 521dd99..0e8d7f6 100644
--- a/src/pds/registrysweepers/utils/db/__init__.py
+++ b/src/pds/registrysweepers/utils/db/__init__.py
@@ -123,7 +123,7 @@ def query_registry_db_with_search_after(
     index_name: str,
     query: Dict,
     _source: Dict,
-    page_size: int = 10000,
+    page_size: int = 5000,
     limit: Union[int, None] = None,
     sort_fields: Union[List[str], None] = None,
     request_timeout_seconds: int = 20,
@@ -279,6 +279,7 @@ def write_updated_docs(
     updates: Iterable[Update],
     index_name: str,
     bulk_chunk_max_update_count: Union[int, None] = None,
+    as_upsert: bool = False,
 ):
     log.info("Writing document updates...")
     updated_doc_count = 0
@@ -305,7 +306,7 @@ def write_updated_docs(
             bulk_updates_buffer = []
             bulk_buffer_size_mb = 0.0
 
-        update_statement_strs = update_as_statements(update)
+        update_statement_strs = update_as_statements(update, as_upsert=as_upsert)
 
         for s in update_statement_strs:
             bulk_buffer_size_mb += sys.getsizeof(s) / 1024**2
@@ -320,13 +321,16 @@ def write_updated_docs(
     log.info(f"Updated documents for {updated_doc_count} products!")
 
 
-def update_as_statements(update: Update) -> Iterable[str]:
-    """Given an Update, convert it to an ElasticSearch-style set of request body content strings"""
+def update_as_statements(update: Update, as_upsert: bool = False) -> Iterable[str]:
+    """
+    Given an Update, convert it to an ElasticSearch-style set of request body content strings.
+    Optionally, mark the update as an upsert (the document is indexed if it does not already exist).
+    """
     metadata_statement: Dict[str, Any] = {"update": {"_id": update.id}}
     if update.has_versioning_information():
         metadata_statement["if_primary_term"] = update.primary_term
         metadata_statement["if_seq_no"] = update.seq_no
-    content_statement = {"doc": update.content}
+    content_statement = {"doc": update.content, "doc_as_upsert": as_upsert}
     update_objs = [metadata_statement, content_statement]
     updates_strs = [json.dumps(obj) for obj in update_objs]
     return updates_strs
diff --git a/src/pds/registrysweepers/utils/db/multitenancy.py b/src/pds/registrysweepers/utils/db/multitenancy.py
index 90bd454..4d7a808 100644
--- a/src/pds/registrysweepers/utils/db/multitenancy.py
+++ b/src/pds/registrysweepers/utils/db/multitenancy.py
@@ -1,13 +1,37 @@
+import logging
 import os
+from typing import Optional
+
+from opensearchpy import OpenSearch
 
 
-def resolve_multitenant_index_name(index_type: str):
+def resolve_multitenant_index_name(client: Optional[OpenSearch], index_type: str):
     supported_index_types = {"registry", "registry-refs", "registry-dd"}
     node_id = os.environ.get("MULTITENANCY_NODE_ID", "").strip(" ")
 
-    if node_id == "":
+    # if no client is available, fall back to static resolution without alias lookup
+    if client is None:
         return index_type
+
+    if node_id == "":
+        return resolve_index_name_if_aliased(client, index_type)
     elif index_type not in supported_index_types:
         raise ValueError(f'index_type "{index_type}" not supported (expected one of {supported_index_types})')
     else:
-        return f"{node_id}-{index_type}"
+        return resolve_index_name_if_aliased(client, f"{node_id}-{index_type}")
+
+
+def index_exists(client: OpenSearch, index_or_alias_name: str) -> bool:
+    # counterintuitively, indices.exists does not return False if its argument is an alias.
+    # Possibly a bug in opensearch-py: https://github.com/opensearch-project/opensearch-py/issues/888
+    return client.indices.exists(index_or_alias_name) and not client.indices.exists_alias(index_or_alias_name)
+
+
+def resolve_index_name_if_aliased(client: OpenSearch, index_or_alias_name: str) -> str:
+    if index_exists(client, index_or_alias_name):
+        return index_or_alias_name
+    elif client.indices.exists_alias(index_or_alias_name):
+        index_name = next(iter(client.indices.get(index_or_alias_name).keys()))
+        logging.debug(f"Resolved alias {index_or_alias_name} to index {index_name}")
+        return index_name
+    else:
+        raise ValueError(f'Could not resolve index for index_or_alias_name "{index_or_alias_name}"')
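The workaround above is needed because indices.exists alone cannot distinguish a concrete index from an alias. A sketch of how the new helpers behave, assuming an index 'atm-registry-123abc' fronted by an alias 'atm-registry' (both names hypothetical):

```python
from pds.registrysweepers.utils.db.client import get_opensearch_client_from_environment
from pds.registrysweepers.utils.db.multitenancy import (
    index_exists,
    resolve_index_name_if_aliased,
)

with get_opensearch_client_from_environment() as client:
    print(index_exists(client, 'atm-registry'))  # False - it is an alias
    print(resolve_index_name_if_aliased(client, 'atm-registry'))
    # -> 'atm-registry-123abc', the concrete index behind the alias
```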