Skip to content

Commit

Permalink
Use trigger_and_wait for alias point step in create filtered index DAG (
Browse files Browse the repository at this point in the history
  • Loading branch information
AetherUnbound authored Jul 12, 2023
1 parent cf3f9d1 commit a6242cc
Showing 1 changed file with 11 additions and 7 deletions.
18 changes: 11 additions & 7 deletions catalog/dags/data_refresh/create_filtered_index_dag.py
Original file line number Diff line number Diff line change
Expand Up @@ -51,14 +51,15 @@
of what you are doing.
"""
import uuid
from datetime import datetime
from datetime import datetime, timedelta

from airflow import DAG
from airflow.decorators import task
from airflow.exceptions import AirflowSensorTimeout
from airflow.models.param import Param
from airflow.operators.empty import EmptyOperator
from airflow.sensors.external_task import ExternalTaskSensor
from airflow.utils.task_group import TaskGroup
from airflow.utils.trigger_rule import TriggerRule

from common import ingestion_server
Expand Down Expand Up @@ -129,17 +130,20 @@ def create_and_populate_filtered_index(
timeout=data_refresh.create_filtered_index_timeout,
)

def point_alias(destination_index_suffix: str):
def point_alias(destination_index_suffix: str) -> TaskGroup:
point_alias_payload = {
"alias": target_alias,
"index_suffix": f"{destination_index_suffix}-filtered",
}

return ingestion_server.trigger_task(
action="POINT_ALIAS",
model=media_type,
data=point_alias_payload,
)
with TaskGroup(group_id="point_alias") as point_alias_group:
ingestion_server.trigger_and_wait_for_task(
action="POINT_ALIAS",
model=media_type,
data=point_alias_payload,
timeout=timedelta(hours=12), # matches the ingestion server's wait time
)
return point_alias_group

with DAG(
dag_id=f"create_filtered_{media_type}_index",
Expand Down

0 comments on commit a6242cc

Please sign in to comment.