From cb31813ab2c1a90a0d1e9ee08ebb02ea0b8de028 Mon Sep 17 00:00:00 2001 From: Staci Cooper Date: Mon, 22 Jul 2024 12:53:52 -0700 Subject: [PATCH] Use aws default --- catalog/dags/common/constants.py | 1 - .../dags/data_refresh/distributed_reindex.py | 18 +++++++++--------- 2 files changed, 9 insertions(+), 10 deletions(-) diff --git a/catalog/dags/common/constants.py b/catalog/dags/common/constants.py index b94d3aac555..f2b1f45a407 100644 --- a/catalog/dags/common/constants.py +++ b/catalog/dags/common/constants.py @@ -47,7 +47,6 @@ AWS_CONN_ID = "aws_default" AWS_CLOUDWATCH_CONN_ID = os.environ.get("AWS_CLOUDWATCH_CONN_ID", AWS_CONN_ID) AWS_RDS_CONN_ID = os.environ.get("AWS_RDS_CONN_ID", AWS_CONN_ID) -AWS_ASG_CONN_ID = os.environ.get("AWS_ASG_CONN_ID", AWS_CONN_ID) ES_PROD_HTTP_CONN_ID = "elasticsearch_http_production" REFRESH_POKE_INTERVAL = int(os.getenv("DATA_REFRESH_POKE_INTERVAL", 60 * 30)) diff --git a/catalog/dags/data_refresh/distributed_reindex.py b/catalog/dags/data_refresh/distributed_reindex.py index b5d15e14b28..fcbfd62c243 100644 --- a/catalog/dags/data_refresh/distributed_reindex.py +++ b/catalog/dags/data_refresh/distributed_reindex.py @@ -22,7 +22,7 @@ from requests import Response from common.constants import ( - AWS_ASG_CONN_ID, + AWS_CONN_ID, OPENLEDGER_API_CONN_ID, PRODUCTION, Environment, @@ -103,7 +103,7 @@ def get_worker_params( estimated_record_count: int, environment: str, target_environment: Environment, - aws_conn_id: str = AWS_ASG_CONN_ID, + aws_conn_id: str = AWS_CONN_ID, ): """Determine the set of start/end indices to be passed to each indexer worker.""" # Defaults to one indexer worker in local development @@ -127,7 +127,7 @@ def get_worker_params( def get_launch_template_version( environment: str, target_environment: Environment, - aws_conn_id: str = AWS_ASG_CONN_ID, + aws_conn_id: str = AWS_CONN_ID, ): """ Get the latest version of the launch template. Indexer workers will all be created with this @@ -154,7 +154,7 @@ def create_worker( environment: str, target_environment: Environment, launch_template_version: int, - aws_conn_id: str = AWS_ASG_CONN_ID, + aws_conn_id: str = AWS_CONN_ID, ): """ Create a new EC2 instance using the launch template for the target @@ -197,7 +197,7 @@ def create_worker( def wait_for_worker( environment: str, instance_id: str, - aws_conn_id: str = AWS_ASG_CONN_ID, + aws_conn_id: str = AWS_CONN_ID, ): """Await the EC2 instance with the given id to be in a healthy running state.""" if environment != PRODUCTION: @@ -225,7 +225,7 @@ def wait_for_worker( trigger_rule=TriggerRule.NONE_FAILED ) def get_instance_ip_address( - environment: str, instance_id: str, aws_conn_id: str = AWS_ASG_CONN_ID + environment: str, instance_id: str, aws_conn_id: str = AWS_CONN_ID ): if environment != PRODUCTION: return "catalog_indexer_worker" @@ -274,7 +274,7 @@ def create_connection( def terminate_indexer_worker( environment: str, instance_id: str, - aws_conn_id: str = AWS_ASG_CONN_ID, + aws_conn_id: str = AWS_CONN_ID, ): """Terminate an individual indexer worker.""" if environment != PRODUCTION: @@ -304,7 +304,7 @@ def reindex( end_id: int, environment: str, target_environment: Environment, - aws_conn_id: str = AWS_ASG_CONN_ID, + aws_conn_id: str = AWS_CONN_ID, ): """ Trigger a reindexing task on a remote indexer worker and wait for it to complete. Once done, @@ -390,7 +390,7 @@ def perform_distributed_reindex( target_environment: Environment, target_index: str, data_refresh_config: DataRefreshConfig, - aws_conn_id: str = AWS_ASG_CONN_ID, + aws_conn_id: str = AWS_CONN_ID, ): """Perform the distributed reindex on a fleet of remote indexer workers.""" estimated_record_count = PGExecuteQueryOperator(