diff --git a/pipelines/rj_smtr/__init__.py b/pipelines/rj_smtr/__init__.py index b9532af8d..2205e6c54 100644 --- a/pipelines/rj_smtr/__init__.py +++ b/pipelines/rj_smtr/__init__.py @@ -21,3 +21,4 @@ from pipelines.rj_smtr.br_rj_riodejaneiro_gtfs.flows import * from pipelines.rj_smtr.br_rj_riodejaneiro_stu.flows import * from pipelines.rj_smtr.br_rj_riodejaneiro_diretorios.flows import * +from pipelines.rj_smtr.janitor.flows import * diff --git a/pipelines/rj_smtr/janitor/__init__.py b/pipelines/rj_smtr/janitor/__init__.py new file mode 100644 index 000000000..e69de29bb diff --git a/pipelines/rj_smtr/janitor/flows.py b/pipelines/rj_smtr/janitor/flows.py new file mode 100644 index 000000000..6c0e4f10e --- /dev/null +++ b/pipelines/rj_smtr/janitor/flows.py @@ -0,0 +1,31 @@ +# -*- coding: utf-8 -*- +"Flows for janitor" +from prefect.run_configs import KubernetesRun +from prefect.storage import GCS +from pipelines.constants import constants as emd_constants +from pipelines.utils.decorators import Flow +from prefect.utilities.edges import unmapped +from pipelines.rj_smtr.schedules import every_5_minutes +from pipelines.rj_smtr.janitor.tasks import ( + query_active_flow_names, + query_not_active_flows, + cancel_flows, + get_prefect_client, +) + +with Flow( + "SMTR: Desagendamento de runs arquivadas", code_owners=["caio"] +) as janitor_flow: + client = get_prefect_client() + flows = query_active_flow_names(prefect_client=client) + archived_flow_runs = query_not_active_flows.map( + flows=flows, prefect_client=unmapped(client) + ) + cancel_flows.map(flows=archived_flow_runs, prefect_client=unmapped(client)) + +janitor_flow.storage = GCS(emd_constants.GCS_FLOWS_BUCKET.value) +janitor_flow.run_config = KubernetesRun( + image=emd_constants.DOCKER_IMAGE.value, + labels=[emd_constants.RJ_SMTR_AGENT_LABEL.value], +) +janitor_flow.schedule = every_5_minutes diff --git a/pipelines/rj_smtr/janitor/tasks.py b/pipelines/rj_smtr/janitor/tasks.py new file mode 100644 index 000000000..c8d309cf1 --- /dev/null +++ b/pipelines/rj_smtr/janitor/tasks.py @@ -0,0 +1,243 @@ +# -*- coding: utf-8 -*- +from typing import Dict, List +import traceback +from datetime import datetime + +from prefect import task +from prefect.client import Client + +from pipelines.utils.utils import log, get_vault_secret + +import requests + + +@task +def query_active_flow_names(prefix="%SMTR%", prefect_client=None): + query = """ +query ($prefix: String, $offset: Int){ + flow( + where: { + name: {_like: $prefix}, + archived: {_eq: false}, + project: {name:{_eq:"main"}} + } + offset: $offset + ){ + name + version + } +} +""" + if not prefect_client: + prefect_client = Client() + variables = {"prefix": prefix, "offset": 0} + # flow_names = [] + response = prefect_client.graphql(query=query, variables=variables)["data"] + active_flows = [] + for flow in response["flow"]: + active_flows.append((flow["name"], flow["version"])) + # flow_names.append(flow["name"]) + # flow_names = list(set(flow_names)) + active_flows = list(set(active_flows)) + return active_flows + + +@task +def query_not_active_flows(flows, prefect_client=None): + """ + Queries the graphql API for scheduled flow_runs of + archived versions of + + Args: + flow_name (str): flow name + """ + flow_name, last_version = flows + now = datetime.now().isoformat() + query = """ +query($flow_name: String, $last_version: Int, $now: timestamptz!, $offset: Int){ + flow( + where:{ + name: {_eq:$flow_name}, + version: {_lt:$last_version} + project: {name:{_eq:"main"}} + } + offset: $offset + order_by: {version:desc} + ){ + id + name + version + flow_runs( + where:{ + scheduled_start_time: {_gte: $now}, + state: {_nin: ["Cancelled"]} + } + order_by: {version:desc} + ){ + id + scheduled_start_time + } + } +} +""" + if not prefect_client: + prefect_client = Client() + + variables = { + "flow_name": flow_name, + "last_version": last_version, + "now": now, + "offset": 0, + } + archived_flows = [] + response = prefect_client.graphql(query=query, variables=variables)["data"] + # log(response) + for flow in response["flow"]: + if flow["flow_runs"]: + try: + archived_flows.append( + { + "id": flow["id"], + "name": flow["name"], + "version": flow["version"], + "count": len(flow["flow_runs"]), + } + ) + except Exception: + log(flow) + + return archived_flows + + +def send_cancelled_run_on_discord(flows, webhook_url): + message = f""" +Os Flows de nome {flows[0]['name']} tiveram as seguintes versões arquivadas: +Link para as versões:\n +""" + for flow in flows: + message.append( + f"Versão {flow['version']}: https://prefect.dados.rio/default/flow-run/{flow['id']}" + ) + + r = requests.post( + webhook_url, + data={"content": message}, + ) + + log(r.status_code) + log(r.text) + + +@task +def get_prefect_client(): + return Client() + + +@task +def get_active_flow_names(prefix="%SMTR%"): + flow_names = query_active_flow_names(prefix=prefix) + log(f"Got flow_names\n{flow_names[:10]}\n...\n{flow_names[-10:-1]}") + return flow_names + + +@task +def query_archived_scheduled_runs(flow_name, prefect_client=None): + """ + Queries the graphql API for scheduled flow_runs of + archived versions of + + Args: + flow_name (str): flow name + """ + query = """ +query($flow_name: String, $offset: Int){ + flow( + where:{ + name: {_eq:$flow_name}, + archived: {_eq:true}, + project: {name:{_eq:"main"}} + } + offset: $offset + order_by: {version:desc} + ){ + name + version + flow_runs( + where:{ + state: {_eq: "Scheduled"} + } + order_by: {version:desc} + ){ + id + scheduled_start_time + } + } +} +""" + if not prefect_client: + prefect_client = Client() + + variables = {"flow_name": flow_name, "offset": 0} + archived_flow_runs = [] + response = prefect_client.graphql(query=query, variables=variables)["data"] + + for flow in response["flow"]: + for flow_run in flow["flow_runs"]: + if flow["flow_runs"]: + archived_flow_runs.append(flow_run) + log( + f"Got flow_run {flow_run['id']}, scheduled: {flow_run['scheduled_start_time']}" + ) + + if archived_flow_runs: + log(f"O Flow {flow_name} possui runs a serem canceladas") + return {"flow_name": flow_name, "flow_runs": archived_flow_runs} + + +@task +def cancel_flows(flows, prefect_client: Client = None) -> None: + """ + Cancels a flow run from the API. + """ + if not flows: + # log(f"O flow {flow_runs['flow_name']} não possui runs para cancelar") + return + log(">>>>>>>>>> Cancelling flows") + + if not prefect_client: + prefect_client = Client() + + query = """ + mutation($flow_id: UUID!) { + archive_flow ( + input: { + flow_id: $flow_id + } + ) { + success + } + } + """ + cancelled_flows = [] + + for flow in flows: + try: + response = prefect_client.graphql( + query=query, variables=dict(flow_id=flow["id"]) + ) + # state: str = response["data"]["cancel_flow_run"]["state"] + log(response) + log(f">>>>>>>>>> Flow run {flow['id']} arquivada") + cancelled_flows.append(flow) + except Exception: + log(traceback.format_exc()) + log(f"Flow {flow['id']} could not be cancelled") + + # Notify cancellation + + try: + url = get_vault_secret("cancelled_runs_webhook")["url"] + send_cancelled_run_on_discord(cancelled_flows, flows, webhook_url=url) + except Exception: + log(traceback.format_exc()) + log("Could not get a webhook to send messages to")