Implement reindexing sweeper #155

Merged 15 commits on Feb 12, 2025
142 changes: 6 additions & 136 deletions docker/sweepers_driver.py
@@ -1,138 +1,8 @@
#! /usr/bin/env python3
#
# Copyright © 2023, California Institute of Technology ("Caltech").
# U.S. Government sponsorship acknowledged.
#
# All rights reserved.
#
# Redistribution and use in source and binary forms, with or without
# modification, are permitted provided that the following conditions are met:
#
# • Redistributions of source code must retain the above copyright notice,
# this list of conditions and the following disclaimer.
# • Redistributions must reproduce the above copyright notice, this list of
# conditions and the following disclaimer in the documentation and/or other
# materials provided with the distribution.
# • Neither the name of Caltech nor its operating division, the Jet Propulsion
# Laboratory, nor the names of its contributors may be used to endorse or
# promote products derived from this software without specific prior written
# permission.
#
# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
# AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
# IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
# ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE
# LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
# CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
# SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
# INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
# CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
# ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
# POSSIBILITY OF SUCH DAMAGE.
#
# Python driver for provenance (OUTDATED TODO: Update documentation)
# ============================
#
# This script is provided to support the scheduled execution of PDS Registry
# Provenance, typically in AWS via Event Bridge and ECS/Fargate.
#
# This script makes the following assumptions for its run-time:
#
# - The EN (i.e. primary) OpenSearch endpoint is provided in the environment
# variable PROV_ENDPOINT
# - The username/password is provided as a JSON key/value in the environment
# variable PROV_CREDENTIALS
# - The remotes available through cross cluster search to be processed are
# provided as a JSON list of strings - each string containing the space
# separated list of remotes (as they appear on the provenance command line)
# Each set of remotes is used in an execution of provenance. The value of
# this is specified in the environment variable PROV_REMOTES. If this
# variable is empty or not defined, provenance is run without specifying
# remotes and only the PROV_ENDPOINT is processed.
# - The directory containing the provenance.py file is in PATH and is
# executable.
#
#
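# Illustrative sketch (not part of this diff): how the environment-variable contract described above
# could be consumed. The header marks this documentation as outdated, so the variable names and
# formats here are assumptions taken from the comment text alone, and the function name is invented.
def load_provenance_environment_sketch():
    import json
    import os

    endpoint = os.environ['PROV_ENDPOINT']                    # primary (EN) OpenSearch endpoint
    credentials = json.loads(os.environ['PROV_CREDENTIALS'])  # JSON key/value of username/password
    remotes_raw = os.environ.get('PROV_REMOTES', '')
    # each entry is a space-separated set of remotes, consumed by one provenance execution apiece
    remote_sets = json.loads(remotes_raw) if remotes_raw else []
    return endpoint, credentials, remote_sets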
from pds.registrysweepers.driver import run as run_sweepers

import functools
import inspect
import argparse
import json
import logging
import os
from datetime import datetime
from typing import Callable

from pds.registrysweepers import provenance, ancestry, repairkit, legacy_registry_sync
from pds.registrysweepers.reindexer import main as reindexer
from pds.registrysweepers.utils import configure_logging, parse_log_level
from pds.registrysweepers.utils.db.client import get_opensearch_client_from_environment
from pds.registrysweepers.utils.misc import get_human_readable_elapsed_since
from pds.registrysweepers.utils.misc import is_dev_mode

configure_logging(filepath=None, log_level=logging.INFO)
log = logging.getLogger(__name__)

dev_mode = is_dev_mode()
if dev_mode:
    log.warning('Operating in development mode - host verification disabled')
    import urllib3

    urllib3.disable_warnings()

log_level = parse_log_level(os.environ.get('LOGLEVEL', 'INFO'))


def run_factory(sweeper_f: Callable) -> Callable:
    return functools.partial(
        sweeper_f,
        client=get_opensearch_client_from_environment(verify_certs=True if not dev_mode else False),
        # enable for development if required - not necessary in production
        # log_filepath='registry-sweepers.log',
        log_level=log_level
    )


parser = argparse.ArgumentParser(
    prog='registry-sweepers',
    description='sweeps the PDS registry with different routines meant to run regularly on the database'
)

# define optional sweepers
parser.add_argument('--legacy-sync', action='store_true')
optional_sweepers = {
    'legacy_sync': legacy_registry_sync.run
}

args = parser.parse_args()

# Define default sweepers to be run here, in order of execution
sweepers = [
    repairkit.run,
    provenance.run,
    ancestry.run,
    reindexer.run
]

for option, sweeper in optional_sweepers.items():
    if getattr(args, option):
        sweepers.append(sweeper)

sweeper_descriptions = [inspect.getmodule(f).__name__ for f in sweepers]
log.info(f'Running sweepers: {sweeper_descriptions}')

total_execution_begin = datetime.now()

sweeper_execution_duration_strs = []

for sweeper in sweepers:
    sweeper_execution_begin = datetime.now()
    run_sweeper_f = run_factory(sweeper)

    run_sweeper_f()

    sweeper_name = inspect.getmodule(sweeper).__name__
    sweeper_execution_duration_strs.append(f'{sweeper_name}: {get_human_readable_elapsed_since(sweeper_execution_begin)}')

log.info(f'Sweepers successfully executed in {get_human_readable_elapsed_since(total_execution_begin)}\n '
         + '\n '.join(sweeper_execution_duration_strs))
# To reduce potential for error when deployed, this file has been left after extraction of contents to
# registrysweepers.driver.run()
# TODO: remove this dependency and have docker run driver.py directly
if __name__ == '__main__':
    run_sweepers()
175 changes: 175 additions & 0 deletions scratch/manual-aoss-reindexer/reindex.py
@@ -0,0 +1,175 @@
import logging
import os
import time
from datetime import timedelta, datetime
from typing import Iterator

from pds.registrysweepers.utils import configure_logging
from pds.registrysweepers.utils.db import query_registry_db_with_search_after, write_updated_docs, get_query_hits_count
from pds.registrysweepers.utils.db.client import get_opensearch_client_from_environment
from pds.registrysweepers.utils.db.indexing import ensure_index_mapping
from pds.registrysweepers.utils.db.update import Update

from pds.registrysweepers.driver import run as run_sweepers
from pds.registrysweepers.utils.misc import get_human_readable_elapsed_since

# Baseline mappings which are required to facilitate successful execution before any data is copied
necessary_mappings = {
    'lidvid': 'keyword',
    'ops:Harvest_Info/ops:harvest_date_time': 'date'
}


def ensure_valid_state(dest_index_name: str):
"""Ensure that all necessary preconditions for safe/successful reindexing are met"""
with get_opensearch_client_from_environment() as client:
try:
# ensure that the destination is a temporary index, to prevent inadvertently writing to a real index
allowed_prefixes = {'temp', 'edunn'}
if not any([dest_index_name.startswith(prefix) for prefix in allowed_prefixes]):
raise ValueError(
f'Destination index name {dest_index_name} is not prefixed with one of {allowed_prefixes} and may not be the intended value - aborting')

# ensure correct destination index configuration
try:
dynamic_mapping_disabled = client.indices.get(dest_index_name)[dest_index_name]['mappings'][
'dynamic'] == 'false'
if not dynamic_mapping_disabled:
raise ValueError

except (KeyError, ValueError):
raise RuntimeError(
f'Index "{dest_index_name}" is not correctly configured - "dynamic" mapping setting is not set to "false - aborting"')

# ensure necessary mappings to facilitate execution of reindexing sweeper and other steps
for property_name, property_mapping_type in necessary_mappings.items():
ensure_index_mapping(client, dest_index_name, property_name, property_mapping_type)

# other conditions may be populated later

except Exception as err:
logging.error(err)
exit(1)
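
# Illustrative sketch (not part of this diff): one way to create a destination index that would pass
# the checks in ensure_valid_state() above, assuming the standard opensearch-py index-management API.
# The function name is invented for illustration; the "temp" prefix, disabled dynamic mapping, and
# baseline property mappings mirror the enforced preconditions.
def create_temp_destination_index_sketch(dest_index_name: str = 'temp-atm-registry'):
    with get_opensearch_client_from_environment() as client:
        client.indices.create(
            index=dest_index_name,
            body={
                'mappings': {
                    'dynamic': 'false',  # read back as the string 'false' by ensure_valid_state()
                    'properties': {
                        'lidvid': {'type': 'keyword'},
                        'ops:Harvest_Info/ops:harvest_date_time': {'type': 'date'},
                    },
                }
            },
        )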


def migrate_bulk_data(src_index_name: str, dest_index_name: str):
"""Stream documents from source index to destination index, which may """
if get_outstanding_document_count(src_index_name, dest_index_name, as_proportion=True) < 0.1:
logging.warning(f'Less than 10% of documents outstanding - skipping bulk streaming migration stage')
return

with get_opensearch_client_from_environment() as client:
try:
sort_keys = sorted(necessary_mappings.keys())

docs = query_registry_db_with_search_after(client, src_index_name, {"query": {"match_all": {}}}, {},
sort_fields=sort_keys, request_timeout_seconds=20)
updates = map(lambda doc: Update(id=doc['_id'], content=doc['_source']), docs)
write_updated_docs(client, updates, dest_index_name, as_upsert=True)

# TODO: Implement non-redundant pickup after partial completion, if possible
except Exception as err:
print(f'Reindex from {src_index_name} to {dest_index_name} failed: {err}')
exit(1)


def ensure_doc_consistency(src_index_name: str, dest_index_name: str):
"""
Ensure that all documents present in the source index are also present in the destination index.
Discovers and fixes any quiet failures encountered during bulk document streaming.
Yes, this could be accomplished within the bulk streaming, but implementation is simpler this way and there is less
opportunity for error.
"""

logging.info(
f'Ensuring document consistency - {get_outstanding_document_count(src_index_name, dest_index_name)} documents remain to copy from {src_index_name} to {dest_index_name}')

with get_opensearch_client_from_environment() as client:
# TODO instead of iteratively pulling/creating, use multi-search (maybe) and bulk write (definitely) to avoid
# the request overhead
for doc_id in enumerate_outstanding_doc_ids(src_index_name, dest_index_name):
try:
src_doc = client.get(src_index_name, doc_id)
client.create(dest_index_name, doc_id, src_doc['_source'])
logging.info(f'Created missing doc with id {doc_id}')
except Exception as err:
logging.error(f'Failed to create doc with id "{doc_id}": {err}')

wait_until_indexed()
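
# Illustrative sketch (not part of this diff): the bulk-write alternative suggested by the TODO in
# ensure_doc_consistency(). It reuses Update and write_updated_docs() as imported above, streaming
# the outstanding documents as upserts instead of issuing one create() call per document; whether a
# multi-get/multi-search fetch is also worthwhile is left open. The function name is invented here.
def ensure_doc_consistency_bulk_sketch(src_index_name: str, dest_index_name: str):
    with get_opensearch_client_from_environment() as client:
        outstanding_ids = enumerate_outstanding_doc_ids(src_index_name, dest_index_name)
        updates = (
            Update(id=doc_id, content=client.get(src_index_name, doc_id)['_source'])
            for doc_id in outstanding_ids
        )
        write_updated_docs(client, updates, dest_index_name, as_upsert=True)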


def enumerate_outstanding_doc_ids(src_index_name: str, dest_index_name: str) -> Iterator[str]:
    with get_opensearch_client_from_environment() as client:
        pseudoid_field = "lidvid"

        src_docs = iter(query_registry_db_with_search_after(client, src_index_name, {"query": {"match_all": {}}},
                                                            {"includes": [pseudoid_field]},
                                                            sort_fields=[pseudoid_field], request_timeout_seconds=20))
        dest_docs = iter(query_registry_db_with_search_after(client, dest_index_name, {"query": {"match_all": {}}},
                                                             {"includes": [pseudoid_field]},
                                                             sort_fields=[pseudoid_field], request_timeout_seconds=20))

        # yield any documents which are present in source but not in destination
        src_ids = {doc["_id"] for doc in src_docs}
        dest_ids = {doc["_id"] for doc in dest_docs}

        ids_missing_from_src = dest_ids.difference(src_ids)
        if len(ids_missing_from_src) > 0:
            logging.error(
                f'{len(ids_missing_from_src)} ids are present in {dest_index_name} but not in {src_index_name} - this indicates a potential error: {sorted(ids_missing_from_src)}')
            exit(1)

        ids_missing_from_dest = src_ids.difference(dest_ids)
        return iter(ids_missing_from_dest)


def get_outstanding_document_count(src_index_name: str, dest_index_name: str, as_proportion: bool = False) -> float:
    """Return count(src) - count(dest), or that difference as a proportion of count(src) if as_proportion is True"""
    with get_opensearch_client_from_environment() as client:
        src_docs_count = get_query_hits_count(client, src_index_name, {"query": {"match_all": {}}})
        dest_docs_count = get_query_hits_count(client, dest_index_name, {"query": {"match_all": {}}})
        logging.info(f'index {src_index_name} contains {src_docs_count} docs')
        logging.info(f'index {dest_index_name} contains {dest_docs_count} docs')

        outstanding_docs_count = src_docs_count - dest_docs_count
        return (outstanding_docs_count / src_docs_count) if as_proportion else outstanding_docs_count
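
# Worked example (illustrative): with 1,000,000 documents in the source index and 950,000 already in
# the destination, the outstanding count is 50,000 and the proportion is 50,000 / 1,000,000 = 0.05.
# Since 0.05 < 0.1, migrate_bulk_data() would skip the bulk streaming stage and leave the remaining
# copies to ensure_doc_consistency().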


def wait_until_indexed(src_index_name: str, dest_index_name: str, timeout: timedelta = timedelta(minutes=30)):
    start = datetime.now()
    expiry = start + timeout

    while get_outstanding_document_count(src_index_name, dest_index_name) > 0:
        if datetime.now() > expiry:
            raise RuntimeError(
                f'Failed to ensure consistency - there is remaining disparity in document count between indices "{src_index_name}" and "{dest_index_name}" after {get_human_readable_elapsed_since(start)}')
        logging.info('Waiting for indexation to complete...')
        time.sleep(15)


def run_registry_sweepers():
"""Run sweepers on the migrated data"""
try:
run_sweepers()
except Exception as err:
logging.error(f'Post-reindex sweeper execution failed with {err}')
raise err


if __name__ == '__main__':
    configure_logging(filepath=None, log_level=logging.INFO)

    src_node_name = 'atm'
    dest_pseudonode_name = f'temp-{src_node_name}'

    # set node id env var to facilitate sweepers
    os.environ["MULTITENANCY_NODE_ID"] = dest_pseudonode_name

    # change these to use a resolution function later - need to decouple resolve_multitenant_index_name() from env var
    src_index_name = f'{src_node_name}-registry'
    dest_index_name = f'{dest_pseudonode_name}-registry'

    ensure_valid_state(dest_index_name)
    migrate_bulk_data(src_index_name, dest_index_name)
    ensure_doc_consistency(src_index_name, dest_index_name)
    run_registry_sweepers()
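
The node name is hard-coded to 'atm' above. As a small, hypothetical convenience (the REINDEX_SRC_NODE variable name is invented here and is not part of the PR), the same __main__ block could read it from the environment instead:

src_node_name = os.environ.get('REINDEX_SRC_NODE', 'atm')  # hypothetical env var name
dest_pseudonode_name = f'temp-{src_node_name}'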
12 changes: 7 additions & 5 deletions src/pds/registrysweepers/ancestry/__init__.py
@@ -71,18 +71,20 @@ def run(
METADATA_PARENT_COLLECTION_KEY,
SWEEPERS_ANCESTRY_VERSION_METADATA_KEY,
]:
ensure_index_mapping(client, resolve_multitenant_index_name("registry"), metadata_key, "keyword")
ensure_index_mapping(client, resolve_multitenant_index_name(client, "registry"), metadata_key, "keyword")

for metadata_key in [
SWEEPERS_ANCESTRY_VERSION_METADATA_KEY,
]:
ensure_index_mapping(client, resolve_multitenant_index_name("registry-refs"), metadata_key, "keyword")
ensure_index_mapping(
client, resolve_multitenant_index_name(client, "registry-refs"), metadata_key, "keyword"
)

log.info("Writing bulk updates to database...")
write_updated_docs(
client,
updates,
index_name=resolve_multitenant_index_name("registry"),
index_name=resolve_multitenant_index_name(client, "registry"),
)
log.info("Generating updates from deferred records...")
deferred_updates = generate_deferred_updates(client, deferred_records_file.name, registry_mock_query_f)
@@ -91,15 +93,15 @@
write_updated_docs(
client,
deferred_updates,
index_name=resolve_multitenant_index_name("registry"),
index_name=resolve_multitenant_index_name(client, "registry"),
)
else:
# consume generator to dump bulk updates to sink
for _ in updates:
pass

log.info("Checking indexes for orphaned documents")
index_names = [resolve_multitenant_index_name(index_label) for index_label in ["registry", "registry-refs"]]
index_names = [resolve_multitenant_index_name(client, index_label) for index_label in ["registry", "registry-refs"]]
for index_name in index_names:
if log.isEnabledFor(logging.DEBUG):
orphaned_docs = get_orphaned_documents(client, registry_mock_query_f, index_name)
7 changes: 5 additions & 2 deletions src/pds/registrysweepers/ancestry/generation.py
@@ -425,7 +425,7 @@ def _get_nonaggregate_ancestry_records_with_chunking(
isinstance(err, KeyError)
and most_recent_attempted_collection_lidvid not in bundle_ancestry_by_collection_lidvid
):
probable_cause = f'[Probable Cause]: Collection primary document with id "{doc["_source"].get("collection_lidvid")}" not found in index {resolve_multitenant_index_name("registry")} for {resolve_multitenant_index_name("registry-refs")} doc with id "{doc.get("_id")}"'
probable_cause = f'[Probable Cause]: Collection primary document with id "{doc["_source"].get("collection_lidvid")}" not found in index {resolve_multitenant_index_name(client, "registry")} for {resolve_multitenant_index_name(client, "registry-refs")} doc with id "{doc.get("_id")}"'
elif isinstance(err, ValueError):
probable_cause = f'[Probable Cause]: Failed to parse collection and/or product LIDVIDs from document with id "{doc.get("_id")}" in index "{doc.get("_index")}" due to {type(err).__name__}: {err}'
else:
@@ -490,6 +490,9 @@ def generate_update(doc: RefDocBookkeepingEntry) -> Update:
f"Updating {len(docs)} registry-refs docs with {SWEEPERS_ANCESTRY_VERSION_METADATA_KEY}={SWEEPERS_ANCESTRY_VERSION}"
)
write_updated_docs(
client, updates, index_name=resolve_multitenant_index_name("registry-refs"), bulk_chunk_max_update_count=20000
client,
updates,
index_name=resolve_multitenant_index_name(client, "registry-refs"),
bulk_chunk_max_update_count=20000,
)
logging.info("registry-refs metadata update complete")