Skip to content

Commit

Permalink
Use aws default
Browse files Browse the repository at this point in the history
  • Loading branch information
stacimc committed Jul 22, 2024
1 parent cf2383b commit ad209e3
Show file tree
Hide file tree
Showing 2 changed files with 9 additions and 10 deletions.
1 change: 0 additions & 1 deletion catalog/dags/common/constants.py
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,6 @@
AWS_CONN_ID = "aws_default"
AWS_CLOUDWATCH_CONN_ID = os.environ.get("AWS_CLOUDWATCH_CONN_ID", AWS_CONN_ID)
AWS_RDS_CONN_ID = os.environ.get("AWS_RDS_CONN_ID", AWS_CONN_ID)
AWS_ASG_CONN_ID = os.environ.get("AWS_ASG_CONN_ID", AWS_CONN_ID)
ES_PROD_HTTP_CONN_ID = "elasticsearch_http_production"
REFRESH_POKE_INTERVAL = int(os.getenv("DATA_REFRESH_POKE_INTERVAL", 60 * 30))

Expand Down
18 changes: 9 additions & 9 deletions catalog/dags/data_refresh/distributed_reindex.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@
from requests import Response

from common.constants import (
AWS_ASG_CONN_ID,
AWS_CONN_ID,
OPENLEDGER_API_CONN_ID,
PRODUCTION,
Environment,
Expand Down Expand Up @@ -103,7 +103,7 @@ def get_worker_params(
estimated_record_count: int,
environment: str,
target_environment: Environment,
aws_conn_id: str = AWS_ASG_CONN_ID,
aws_conn_id: str = AWS_CONN_ID,
):
"""Determine the set of start/end indices to be passed to each indexer worker."""
# Defaults to one indexer worker in local development
Expand All @@ -127,7 +127,7 @@ def get_worker_params(
def get_launch_template_version(
environment: str,
target_environment: Environment,
aws_conn_id: str = AWS_ASG_CONN_ID,
aws_conn_id: str = AWS_CONN_ID,
):
"""
Get the latest version of the launch template. Indexer workers will all be created with this
Expand All @@ -154,7 +154,7 @@ def create_worker(
environment: str,
target_environment: Environment,
launch_template_version: int,
aws_conn_id: str = AWS_ASG_CONN_ID,
aws_conn_id: str = AWS_CONN_ID,
):
"""
Create a new EC2 instance using the launch template for the target
Expand Down Expand Up @@ -197,7 +197,7 @@ def create_worker(
def wait_for_worker(
environment: str,
instance_id: str,
aws_conn_id: str = AWS_ASG_CONN_ID,
aws_conn_id: str = AWS_CONN_ID,
):
"""Await the EC2 instance with the given id to be in a healthy running state."""
if environment != PRODUCTION:
Expand Down Expand Up @@ -225,7 +225,7 @@ def wait_for_worker(
trigger_rule=TriggerRule.NONE_FAILED
)
def get_instance_ip_address(
environment: str, instance_id: str, aws_conn_id: str = AWS_ASG_CONN_ID
environment: str, instance_id: str, aws_conn_id: str = AWS_CONN_ID
):
if environment != PRODUCTION:
return "catalog_indexer_worker"
Expand Down Expand Up @@ -274,7 +274,7 @@ def create_connection(
def terminate_indexer_worker(
environment: str,
instance_id: str,
aws_conn_id: str = AWS_ASG_CONN_ID,
aws_conn_id: str = AWS_CONN_ID,
):
"""Terminate an individual indexer worker."""
if environment != PRODUCTION:
Expand Down Expand Up @@ -304,7 +304,7 @@ def reindex(
end_id: int,
environment: str,
target_environment: Environment,
aws_conn_id: str = AWS_ASG_CONN_ID,
aws_conn_id: str = AWS_CONN_ID,
):
"""
Trigger a reindexing task on a remote indexer worker and wait for it to complete. Once done,
Expand Down Expand Up @@ -390,7 +390,7 @@ def perform_distributed_reindex(
target_environment: Environment,
target_index: str,
data_refresh_config: DataRefreshConfig,
aws_conn_id: str = AWS_ASG_CONN_ID,
aws_conn_id: str = AWS_CONN_ID,
):
"""Perform the distributed reindex on a fleet of remote indexer workers."""
estimated_record_count = PGExecuteQueryOperator(
Expand Down

0 comments on commit ad209e3

Please sign in to comment.