diff --git a/pipelines/capture/templates/flows.py b/pipelines/capture/templates/flows.py index 341982e4..a16701c8 100644 --- a/pipelines/capture/templates/flows.py +++ b/pipelines/capture/templates/flows.py @@ -11,6 +11,7 @@ from prefect.tasks.core.function import FunctionTask from prefeitura_rio.pipelines_utils.custom import Flow from prefeitura_rio.pipelines_utils.state_handlers import ( + handler_initialize_sentry, handler_inject_bd_credentials, handler_skip_if_running, ) @@ -239,9 +240,7 @@ def create_default_capture_flow( image=constants.DOCKER_IMAGE.value, labels=[agent_label], ) - capture_flow.state_handlers = [ - handler_inject_bd_credentials, - ] + capture_flow.state_handlers = [handler_inject_bd_credentials, handler_initialize_sentry] if skip_if_running: capture_flow.state_handlers.append(handler_skip_if_running) diff --git a/pipelines/constants.py b/pipelines/constants.py index 00c38c3f..a9ad6a21 100644 --- a/pipelines/constants.py +++ b/pipelines/constants.py @@ -54,6 +54,10 @@ class constants(Enum): # pylint: disable=c0103 MODE_INCR = "incr" FLOW_RUN_URL_PATTERN = "https://pipelines.dados.rio/smtr/flow-run/{run_id}" + GLITCH_API_ENDPOINT = "https://glitch.dados.rio/api/0/projects/smtr/pipelines/issues/" + GLITCH_URL = "https://glitch.dados.rio/" + GLITCH_AUTH = "glitch_auth" + GLITCH_WEBHOOK = "glitch" # URLS # REPO_URL = "https://api.github.com/repos/prefeitura-rio/pipelines_rj_smtr" DATAPLEX_URL = "https://console.cloud.google.com/dataplex/govern/quality" diff --git a/pipelines/flows.py b/pipelines/flows.py index 3fff458c..6fdda596 100644 --- a/pipelines/flows.py +++ b/pipelines/flows.py @@ -6,5 +6,6 @@ from pipelines.capture.jae.flows import * # noqa from pipelines.capture.templates.flows import * # noqa from pipelines.exemplo import * # noqa +from pipelines.glitchtip.flows import * # noqa from pipelines.treatment.bilhetagem.flows import * # noqa from pipelines.veiculo.flows import * # noqa diff --git a/pipelines/glitchtip/__init__.py b/pipelines/glitchtip/__init__.py new file mode 100644 index 00000000..e69de29b diff --git a/pipelines/glitchtip/flows.py b/pipelines/glitchtip/flows.py new file mode 100644 index 00000000..4da1db3f --- /dev/null +++ b/pipelines/glitchtip/flows.py @@ -0,0 +1,47 @@ +# -*- coding: utf-8 -*- +from prefect.run_configs import KubernetesRun +from prefect.storage import GCS + +# isort: off +# EMD Imports # + +from prefeitura_rio.pipelines_utils.custom import Flow +from prefeitura_rio.pipelines_utils.state_handlers import ( + handler_initialize_sentry, + handler_inject_bd_credentials, +) + +from pipelines.constants import constants as emd_constants +from pipelines.schedules import every_minute, every_day_noon +from pipelines.glitchtip.tasks import ( + test_raise_errors, + glitch_api_get_issues, + format_glitch_issue_messages, + send_issue_report, +) +from pipelines.utils.backup.tasks import get_current_timestamp + +with Flow("SMTR - Teste de Erros do Glitch Tip") as raise_flow: + datetime = get_current_timestamp() + test_raise_errors(datetime=datetime) + +raise_flow.storage = GCS(emd_constants.GCS_FLOWS_BUCKET.value) +raise_flow.run_config = KubernetesRun( + image=emd_constants.DOCKER_IMAGE.value, + labels=[emd_constants.RJ_SMTR_DEV_AGENT_LABEL.value], +) +raise_flow.state_handlers = [handler_initialize_sentry, handler_inject_bd_credentials] +raise_flow.schedule = every_minute + +with Flow("SMTR - Report de Issues do Glitch Tip") as glitch_flow: + issues = glitch_api_get_issues() + messages = format_glitch_issue_messages(issues) + send_issue_report(messages) + +glitch_flow.storage = GCS(emd_constants.GCS_FLOWS_BUCKET.value) +glitch_flow.run_config = KubernetesRun( + image=emd_constants.DOCKER_IMAGE.value, + labels=[emd_constants.RJ_SMTR_DEV_AGENT_LABEL.value], +) +glitch_flow.state_handlers = [handler_initialize_sentry] +glitch_flow.schedule = every_day_noon diff --git a/pipelines/glitchtip/tasks.py b/pipelines/glitchtip/tasks.py new file mode 100644 index 00000000..0a69c231 --- /dev/null +++ b/pipelines/glitchtip/tasks.py @@ -0,0 +1,79 @@ +# -*- coding: utf-8 -*- +from datetime import datetime + +import requests +from prefect import task +from prefeitura_rio.pipelines_utils.logging import log + +from pipelines.constants import constants +from pipelines.utils.secret import get_secret + +# EMD Imports # + + +# SMTR Imports # + + +@task +def test_raise_errors(datetime: datetime): + if datetime.minute % 5 == 0: + raise ValueError(f"{datetime} % 5 is equal to zero") + if datetime.minute % 3 == 0: + raise ValueError(f"{datetime} % 3 is equal to zero") + else: + return datetime.minute / 0 + + +@task +def glitch_api_get_issues(query="is:unresolved"): + base_url = constants.GLITCH_API_ENDPOINT.value + + headers = get_secret(secret_path=constants.GLITCH_AUTH.value) + if query: + base_url += f"?query={query}" + log(f"Will query glitch tip api at: {base_url}") + issues = requests.get(base_url, headers=headers) + + return issues.json() + + +@task +def format_glitch_issue_messages(issues): + messages = [] + issue_count = len(issues) + base_message = f"**Issues não resolvidos: {issue_count}**" + messages.append(base_message) + for issue in issues: + msg = f"""**Fonte: {issue['culprit']}** +**Erro**: {issue['title']} +**Event Count:** {issue['count']} +**Criado:** {issue['firstSeen'].split('.')[0]} +**Link:** {constants.GLITCH_URL.value}/smtr/issues/{issue['id']} +""" + messages.append({"name": f"Issue {issue['id']}", "value": msg}) + + return messages + + +@task +def send_issue_report(messages): + timestamp = datetime.now().isoformat() + webhook = get_secret( + secret_path=constants.WEBHOOKS_SECRET_PATH.value, secret_name=constants.GLITCH_WEBHOOK.value + )["glitch"] + headers = {"Content-Type": "application/json"} + message = { + "content": messages[0], + "embeds": [ + { + "color": 16515072, + "timestamp": timestamp, + "author": {"name": "Glitch Tip Issues", "url": constants.GLITCH_URL.value}, + "fields": messages[1:], + } + ], + } + response = requests.post(url=webhook, headers=headers, json=message) + + log(response.text) + log(response.status_code) diff --git a/pipelines/schedules.py b/pipelines/schedules.py index f87fcf1f..d0e112da 100644 --- a/pipelines/schedules.py +++ b/pipelines/schedules.py @@ -114,6 +114,18 @@ def generate_interval_schedule( ] ) +every_day_noon = Schedule( + clocks=[ + IntervalClock( + interval=timedelta(days=1), + start_date=datetime(2021, 1, 1, 12, 0, tzinfo=timezone(constants.TIMEZONE.value)), + labels=[ + emd_constants.RJ_SMTR_AGENT_LABEL.value, + ], + ), + ] +) + ftp_clocks = generate_ftp_schedules( interval_minutes=60, label=emd_constants.RJ_SMTR_DEV_AGENT_LABEL )