Commit 4420fe8: Fix timeouts

stacimc committed Aug 19, 2024, 1 parent 098972f
Showing 3 changed files with 29 additions and 22 deletions.
catalog/dags/data_refresh/dag_factory.py (4 changes: 2 additions & 2 deletions)
@@ -66,7 +66,7 @@ def wait_for_conflicting_dags(
         task_id="wait_for_data_refresh",
         external_dag_ids=external_dag_ids,
         check_existence=True,
-        poke_interval=data_refresh_config.data_refresh_poke_interval,
+        poke_interval=data_refresh_config.concurrency_check_poke_interval,
         mode="reschedule",
         pool=DATA_REFRESH_POOL,
     )
@@ -99,7 +99,7 @@ def create_data_refresh_dag(
         data_refresh: dataclass containing configuration information for the
             DAG
-        target_environment: the environment in which the data refresh is performed
+        target_environment: the API environment in which the data refresh is performed
         external_dag_ids: list of ids of the other data refresh DAGs. The data refresh step
             of this DAG will not run concurrently with the corresponding step
             of any dependent DAG.
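
The renamed kwarg above is a standard Airflow sensor parameter. As a rough sketch of the pattern (using a generic PythonSensor and a hypothetical no_conflicting_dag_running callable rather than the project's actual sensor), a reschedule-mode concurrency check consumes the configured interval like this:

# Sketch only: approximates wait_for_conflicting_dags with a generic sensor.
from airflow.sensors.python import PythonSensor


def no_conflicting_dag_running() -> bool:
    # Hypothetical check; the real task inspects the listed external DAG ids.
    return True


wait_for_data_refresh = PythonSensor(
    task_id="wait_for_data_refresh",
    python_callable=no_conflicting_dag_running,
    mode="reschedule",  # frees the worker slot between pokes
    poke_interval=60 * 30,  # seconds; the config's concurrency_check_poke_interval here
)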
catalog/dags/data_refresh/data_refresh_types.py (19 changes: 13 additions & 6 deletions)
@@ -71,8 +71,10 @@ class DataRefreshConfig:
         the distributed reindex
     index_readiness_timeout: timedelta expressing amount of time it may take
         to await a healthy ES index after reindexing
-    data_refresh_poke_interval: int number of seconds to wait between
-        checks to see if conflicting DAGs are running.
+    concurrency_check_poke_interval: int number of seconds to wait between
+        checks to see if conflicting DAGs are running
+    reindex_poke_interval: int number of seconds to wait between checks to see if
+        the reindexing task has completed
     doc_md: str used for the DAG's documentation markdown
     """

@@ -85,9 +87,10 @@ class DataRefreshConfig:
     default_args: dict = field(default_factory=dict)
     dag_timeout: timedelta = timedelta(days=1)
     copy_data_timeout: timedelta = timedelta(hours=1)
-    indexer_worker_timeout: timedelta = timedelta(hours=6)  # TODO
+    indexer_worker_timeout: timedelta = timedelta(hours=12)
     index_readiness_timeout: timedelta = timedelta(days=1)
-    data_refresh_poke_interval: int = REFRESH_POKE_INTERVAL
+    concurrency_check_poke_interval: int = REFRESH_POKE_INTERVAL
+    reindex_poke_interval: int = REFRESH_POKE_INTERVAL

     def __post_init__(self):
         self.dag_id = f"{self.media_type}_data_refresh"
@@ -106,7 +109,10 @@ def __post_init__(self):
         ],
         dag_timeout=timedelta(days=4),
         copy_data_timeout=timedelta(hours=12),
-        data_refresh_poke_interval=int(os.getenv("DATA_REFRESH_POKE_INTERVAL", 60)),
+        concurrency_check_poke_interval=int(
+            os.getenv("DATA_REFRESH_POKE_INTERVAL", 60)
+        ),
+        reindex_poke_interval=int(os.getenv("DATA_REFRESH_POKE_INTERVAL", 60)),
     ),
     AUDIO: DataRefreshConfig(
         media_type=AUDIO,
@@ -125,8 +131,9 @@ def __post_init__(self):
                 copy_data_query=queries.BASIC_COPY_DATA_QUERY,
             ),
         ],
-        data_refresh_poke_interval=int(
+        concurrency_check_poke_interval=int(
             os.getenv("DATA_REFRESH_POKE_INTERVAL", 60 * 30)
         ),
+        reindex_poke_interval=int(os.getenv("DATA_REFRESH_POKE_INTERVAL", 60)),
     ),
 }
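
For illustration, a minimal sketch of the configuration pattern above (field names taken from the diff; the actual value of REFRESH_POKE_INTERVAL is assumed): both intervals default to a shared constant, and each media type can override them from the same environment variable with different fallbacks.

import os
from dataclasses import dataclass

REFRESH_POKE_INTERVAL = 60 * 30  # assumed default, in seconds


@dataclass
class PokeConfig:  # hypothetical, reduced version of DataRefreshConfig
    concurrency_check_poke_interval: int = REFRESH_POKE_INTERVAL
    reindex_poke_interval: int = REFRESH_POKE_INTERVAL


# Mirrors the audio config above: a slow concurrency check (30 min fallback)
# and a fast reindex poll (60 s fallback), both overridable by one env var.
audio_like = PokeConfig(
    concurrency_check_poke_interval=int(
        os.getenv("DATA_REFRESH_POKE_INTERVAL", 60 * 30)
    ),
    reindex_poke_interval=int(os.getenv("DATA_REFRESH_POKE_INTERVAL", 60)),
)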
catalog/dags/data_refresh/distributed_reindex.py (28 changes: 14 additions & 14 deletions)
@@ -25,7 +25,6 @@
     AWS_ASG_CONN_ID,
     OPENLEDGER_API_CONN_ID,
     PRODUCTION,
-    REFRESH_POKE_INTERVAL,
     Environment,
 )
 from common.sql import PGExecuteQueryOperator, single_value
@@ -194,7 +193,6 @@ def create_worker(
     return instances[0]["InstanceId"]


-# TODO configure interval/timeout
 @task.sensor(poke_interval=60, timeout=3600, mode="reschedule")
 def wait_for_worker(
     environment: str,
@@ -222,7 +220,10 @@ def wait_for_worker(
     )


-@task
+@task(
+    # Run locally when create instance tasks are skipped
+    trigger_rule=TriggerRule.NONE_FAILED
+)
 def get_instance_ip_address(
     environment: str, instance_id: str, aws_conn_id: str = AWS_ASG_CONN_ID
 ):
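
The trigger rule change matters because Airflow's default ALL_SUCCESS rule would skip this task whenever an upstream task was skipped; NONE_FAILED runs it as long as nothing upstream actually failed. A minimal sketch with a hypothetical task, not the project's code:

from airflow.decorators import task
from airflow.utils.trigger_rule import TriggerRule


@task(trigger_rule=TriggerRule.NONE_FAILED)
def get_ip(instance_id: str | None = None) -> str:
    # Still runs when the create-instance branch was skipped (local runs),
    # falling back to a local address.
    return "127.0.0.1" if instance_id is None else f"ip-for-{instance_id}"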
Expand Down Expand Up @@ -297,8 +298,7 @@ def drop_connection(worker_conn: str):

@task_group(group_id="reindex")
def reindex(
model_name: str,
table_name: str,
data_refresh_config: DataRefreshConfig,
target_index: str,
start_id: int,
end_id: int,
@@ -325,9 +325,10 @@ def reindex(
     )

     # Wait for the worker to finish initializing
-    await_worker = wait_for_worker(
-        environment=environment, instance_id=instance_id, aws_conn_id=aws_conn_id
-    )
+    await_worker = wait_for_worker.override(
+        poke_interval=data_refresh_config.reindex_poke_interval,
+        timeout=data_refresh_config.indexer_worker_timeout.total_seconds(),
+    )(environment=environment, instance_id=instance_id, aws_conn_id=aws_conn_id)

     instance_ip_address = get_instance_ip_address(
         environment=environment, instance_id=instance_id, aws_conn_id=aws_conn_id
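
The .override() call is the TaskFlow mechanism for swapping operator kwargs at the call site without redefining the task; here it lets each DAG supply its own poke interval and timeout from config. A rough sketch, assuming a decorated sensor shaped like wait_for_worker:

from datetime import timedelta

from airflow.decorators import task


@task.sensor(poke_interval=60, timeout=3600, mode="reschedule")
def wait_for_thing(thing_id: str) -> bool:
    return True  # hypothetical readiness check


indexer_worker_timeout = timedelta(hours=12)

# Replace the decorator defaults at DAG-build time:
await_thing = wait_for_thing.override(
    poke_interval=60 * 5,
    timeout=indexer_worker_timeout.total_seconds(),  # sensor timeout is in seconds
)(thing_id="abc")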
@@ -345,8 +346,8 @@ def reindex(
         http_conn_id=worker_conn,
         endpoint="task",
         data={
-            "model_name": model_name,
-            "table_name": table_name,
+            "model_name": data_refresh_config.media_type,
+            "table_name": data_refresh_config.media_type,
             "target_index": target_index,
             "start_id": start_id,
             "end_id": end_id,
@@ -362,8 +363,8 @@ def reindex(
         method="GET",
         response_check=response_check_wait_for_completion,
         mode="reschedule",
-        poke_interval=REFRESH_POKE_INTERVAL,
-        timeout=data_refresh_config.indexer_worker_timeout,
-        timeout=24 * 60 * 60,  # 1 day TODO
+        poke_interval=data_refresh_config.reindex_poke_interval,
+        timeout=data_refresh_config.indexer_worker_timeout,
     )

     terminate_instance = terminate_indexer_worker.override(
@@ -416,8 +417,7 @@ def perform_distributed_reindex(
     estimated_record_count >> worker_params

     reindex.partial(
-        model_name=data_refresh_config.media_type,
-        table_name=data_refresh_config.media_type,
+        data_refresh_config=data_refresh_config,
         target_index=target_index,
         environment=environment,
         target_environment=target_environment,
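
reindex.partial(...) is Airflow's dynamic task mapping API (mapping over task groups requires Airflow 2.5+): constant arguments go in .partial() and the per-worker arguments in .expand(), producing one reindex group per batch. A reduced sketch with hypothetical names:

from airflow.decorators import task, task_group


@task
def make_batches() -> list[dict]:
    return [{"start_id": 0, "end_id": 100}, {"start_id": 100, "end_id": 200}]


@task
def do_reindex(media_type: str, batch: dict) -> None:
    print(f"{media_type}: reindexing ids {batch['start_id']}..{batch['end_id']}")


@task_group(group_id="reindex")
def reindex(media_type: str, batch: dict):
    do_reindex(media_type=media_type, batch=batch)


# One mapped group per batch; media_type stays constant across all of them.
reindex.partial(media_type="image").expand(batch=make_batches())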
