Skip to content

Commit

Permalink
Pin launch template version for all indexer workers
Browse files Browse the repository at this point in the history
  • Loading branch information
stacimc committed Aug 19, 2024
1 parent 6eca45c commit 098972f
Showing 1 changed file with 42 additions and 12 deletions.
54 changes: 42 additions & 12 deletions catalog/dags/data_refresh/distributed_reindex.py
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
OPENLEDGER_API_CONN_ID,
PRODUCTION,
REFRESH_POKE_INTERVAL,
Environment,
)
from common.sql import PGExecuteQueryOperator, single_value
from data_refresh.constants import INDEXER_LAUNCH_TEMPLATES, INDEXER_WORKER_COUNTS
Expand All @@ -41,7 +42,7 @@
class TempConnectionHTTPOperator(HttpOperator):
"""
Wrapper around the HTTPOperator which allows templating of the conn_id,
in order to support using a temporary conn_id passed through XCOM.
in order to support using a temporary conn_id passed through XCOMs.
"""

template_fields: Sequence[str] = (
Expand All @@ -56,7 +57,7 @@ class TempConnectionHTTPOperator(HttpOperator):
class TempConnectionHTTPSensor(HttpSensor):
"""
Wrapper around the HTTPSensor which allows templating of the conn_id,
in order to support using a temporary conn_id passed through XCOM.
in order to support using a temporary conn_id passed through XCOMs.
"""

template_fields: Sequence[str] = (
Expand Down Expand Up @@ -102,8 +103,8 @@ def response_check_wait_for_completion(response: Response) -> bool:
def get_worker_params(
estimated_record_count: int,
environment: str,
target_environment: str,
aws_conn_id: str = AWS_ASG_CONN_ID, # TODO
target_environment: Environment,
aws_conn_id: str = AWS_ASG_CONN_ID,
):
"""Determine the set of start/end indices to be passed to each indexer worker."""
# Defaults to one indexer worker in local development
Expand All @@ -123,10 +124,37 @@ def get_worker_params(
]


@task
def get_launch_template_version(
    environment: str,
    target_environment: Environment,
    aws_conn_id: str = AWS_ASG_CONN_ID,
):
    """
    Get the latest version number of the indexer worker launch template.

    Indexer workers will all be created with this version. Importantly, this
    allows us to retry an individual indexer worker and ensure that it will run
    with the same version of the indexer worker as the others, even if code has
    been deployed to the indexer worker in the meantime.

    :param environment: the environment in which the DAG itself is running;
        the task is skipped unless this is ``PRODUCTION``
    :param target_environment: key used to look up the launch template name
        in ``INDEXER_LAUNCH_TEMPLATES``
    :param aws_conn_id: id of the Airflow connection used to build the EC2 hook
    :return: the ``LatestVersionNumber`` reported for the launch template
    :raises AirflowSkipException: when not running in production
    :raises Exception: when no launch template or version number is found
    """
    if environment != PRODUCTION:
        raise AirflowSkipException("Skipping instance creation in local environment.")

    ec2_hook = EC2Hook(aws_conn_id=aws_conn_id, api_type="client_type")
    response = ec2_hook.conn.describe_launch_templates(
        # `LaunchTemplateNames` must be a list of names, even when only a
        # single template is being looked up.
        LaunchTemplateNames=[INDEXER_LAUNCH_TEMPLATES.get(target_environment)]
    )

    # Treat a missing key the same as an empty result rather than failing
    # with a TypeError on `len(None)`.
    launch_templates = response.get("LaunchTemplates")
    if not launch_templates:
        raise Exception("Unable to determine launch template version.")

    version = launch_templates[0].get("LatestVersionNumber")
    if version is None:
        # Fail here instead of letting downstream tasks request the
        # nonsensical version string "None".
        raise Exception("Unable to determine launch template version.")

    return version


@task
def create_worker(
environment: str,
target_environment: str,
target_environment: Environment,
launch_template_version: int,
aws_conn_id: str = AWS_ASG_CONN_ID,
):
"""
Expand All @@ -142,11 +170,7 @@ def create_worker(
MaxCount=1,
LaunchTemplate={
"LaunchTemplateName": INDEXER_LAUNCH_TEMPLATES.get(target_environment),
"Version": "$Latest",
# TODO we could add a task before all of this to get the version number of
# the launch template and then use it in all these tasks, that ensures
# that all indexer workers are running the same code even if a deploy
# happens in the middle of a data refresh
"Version": str(launch_template_version),
},
# Name the instance by applying a tag
TagSpecifications=[
Expand Down Expand Up @@ -279,18 +303,24 @@ def reindex(
start_id: int,
end_id: int,
environment: str,
target_environment: str,
target_environment: Environment,
aws_conn_id: str = AWS_ASG_CONN_ID,
):
"""
Trigger a reindexing task on a remote indexer worker and wait for it to complete. Once done,
terminate the indexer worker instance.
"""

launch_template_version = get_launch_template_version(
environment=environment,
target_environment=target_environment,
)

# Create a new EC2 instance
instance_id = create_worker(
environment=environment,
target_environment=target_environment,
launch_template_version=launch_template_version,
aws_conn_id=aws_conn_id,
)

Expand Down Expand Up @@ -356,7 +386,7 @@ def reindex(
)
def perform_distributed_reindex(
environment: str,
target_environment: str, # TODO, update types
target_environment: Environment,
target_index: str,
data_refresh_config: DataRefreshConfig,
aws_conn_id: str = AWS_ASG_CONN_ID,
Expand Down

0 comments on commit 098972f

Please sign in to comment.