Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Feat] Adiciona flow de teste para subir erros no Glitch Tip #14

Open
wants to merge 13 commits into
base: main
Choose a base branch
from
5 changes: 2 additions & 3 deletions pipelines/capture/templates/flows.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@
from prefect.tasks.core.function import FunctionTask
from prefeitura_rio.pipelines_utils.custom import Flow
from prefeitura_rio.pipelines_utils.state_handlers import (
handler_initialize_sentry,
handler_inject_bd_credentials,
handler_skip_if_running,
)
Expand Down Expand Up @@ -239,9 +240,7 @@ def create_default_capture_flow(
image=constants.DOCKER_IMAGE.value,
labels=[agent_label],
)
capture_flow.state_handlers = [
handler_inject_bd_credentials,
]
capture_flow.state_handlers = [handler_inject_bd_credentials, handler_initialize_sentry]

if skip_if_running:
capture_flow.state_handlers.append(handler_skip_if_running)
Expand Down
4 changes: 4 additions & 0 deletions pipelines/constants.py
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,10 @@ class constants(Enum): # pylint: disable=c0103
MODE_INCR = "incr"
FLOW_RUN_URL_PATTERN = "https://pipelines.dados.rio/smtr/flow-run/{run_id}"

GLITCH_API_ENDPOINT = "https://glitch.dados.rio/api/0/projects/smtr/pipelines/issues/"
GLITCH_URL = "https://glitch.dados.rio/"
GLITCH_AUTH = "glitch_auth"
GLITCH_WEBHOOK = "glitch"
# URLS #
REPO_URL = "https://api.github.com/repos/prefeitura-rio/pipelines_rj_smtr"
DATAPLEX_URL = "https://console.cloud.google.com/dataplex/govern/quality"
Expand Down
1 change: 1 addition & 0 deletions pipelines/flows.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,5 +6,6 @@
from pipelines.capture.jae.flows import * # noqa
from pipelines.capture.templates.flows import * # noqa
from pipelines.exemplo import * # noqa
from pipelines.glitchtip.flows import * # noqa
from pipelines.treatment.bilhetagem.flows import * # noqa
from pipelines.veiculo.flows import * # noqa
Empty file added pipelines/glitchtip/__init__.py
Empty file.
47 changes: 47 additions & 0 deletions pipelines/glitchtip/flows.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,47 @@
# -*- coding: utf-8 -*-
from prefect.run_configs import KubernetesRun
from prefect.storage import GCS

# isort: off
# EMD Imports #

from prefeitura_rio.pipelines_utils.custom import Flow
from prefeitura_rio.pipelines_utils.state_handlers import (
handler_initialize_sentry,
handler_inject_bd_credentials,
)

from pipelines.constants import constants as emd_constants
from pipelines.schedules import every_minute, every_day_noon
from pipelines.glitchtip.tasks import (
test_raise_errors,
glitch_api_get_issues,
format_glitch_issue_messages,
send_issue_report,
)
from pipelines.utils.backup.tasks import get_current_timestamp

with Flow("SMTR - Teste de Erros do Glitch Tip") as raise_flow:
datetime = get_current_timestamp()
test_raise_errors(datetime=datetime)

raise_flow.storage = GCS(emd_constants.GCS_FLOWS_BUCKET.value)
raise_flow.run_config = KubernetesRun(
image=emd_constants.DOCKER_IMAGE.value,
labels=[emd_constants.RJ_SMTR_DEV_AGENT_LABEL.value],
)
raise_flow.state_handlers = [handler_initialize_sentry, handler_inject_bd_credentials]
raise_flow.schedule = every_minute

with Flow("SMTR - Report de Issues do Glitch Tip") as glitch_flow:
issues = glitch_api_get_issues()
messages = format_glitch_issue_messages(issues)
send_issue_report(messages)

glitch_flow.storage = GCS(emd_constants.GCS_FLOWS_BUCKET.value)
glitch_flow.run_config = KubernetesRun(
image=emd_constants.DOCKER_IMAGE.value,
labels=[emd_constants.RJ_SMTR_DEV_AGENT_LABEL.value],
)
glitch_flow.state_handlers = [handler_initialize_sentry]
glitch_flow.schedule = every_day_noon
79 changes: 79 additions & 0 deletions pipelines/glitchtip/tasks.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,79 @@
# -*- coding: utf-8 -*-
from datetime import datetime

import requests
from prefect import task
from prefeitura_rio.pipelines_utils.logging import log

from pipelines.constants import constants
from pipelines.utils.secret import get_secret

# EMD Imports #


# SMTR Imports #


@task
def test_raise_errors(datetime: datetime):
if datetime.minute % 5 == 0:
raise ValueError(f"{datetime} % 5 is equal to zero")
if datetime.minute % 3 == 0:
raise ValueError(f"{datetime} % 3 is equal to zero")
else:
return datetime.minute / 0


@task
def glitch_api_get_issues(query="is:unresolved"):
base_url = constants.GLITCH_API_ENDPOINT.value

headers = get_secret(secret_path=constants.GLITCH_AUTH.value)
if query:
base_url += f"?query={query}"
log(f"Will query glitch tip api at: {base_url}")
issues = requests.get(base_url, headers=headers)

return issues.json()


@task
def format_glitch_issue_messages(issues):
messages = []
issue_count = len(issues)
base_message = f"**Issues não resolvidos: {issue_count}**"
messages.append(base_message)
for issue in issues:
msg = f"""**Fonte: {issue['culprit']}**
**Erro**: {issue['title']}
**Event Count:** {issue['count']}
**Criado:** {issue['firstSeen'].split('.')[0]}
**Link:** {constants.GLITCH_URL.value}/smtr/issues/{issue['id']}
"""
messages.append({"name": f"Issue {issue['id']}", "value": msg})

return messages


@task
def send_issue_report(messages):
timestamp = datetime.now().isoformat()
webhook = get_secret(
secret_path=constants.WEBHOOKS_SECRET_PATH.value, secret_name=constants.GLITCH_WEBHOOK.value
)["glitch"]
headers = {"Content-Type": "application/json"}
message = {
"content": messages[0],
"embeds": [
{
"color": 16515072,
"timestamp": timestamp,
"author": {"name": "Glitch Tip Issues", "url": constants.GLITCH_URL.value},
"fields": messages[1:],
}
],
}
response = requests.post(url=webhook, headers=headers, json=message)

log(response.text)
log(response.status_code)
12 changes: 12 additions & 0 deletions pipelines/schedules.py
Original file line number Diff line number Diff line change
Expand Up @@ -114,6 +114,18 @@ def generate_interval_schedule(
]
)

every_day_noon = Schedule(
clocks=[
IntervalClock(
interval=timedelta(days=1),
start_date=datetime(2021, 1, 1, 12, 0, tzinfo=timezone(constants.TIMEZONE.value)),
labels=[
emd_constants.RJ_SMTR_AGENT_LABEL.value,
],
),
]
)

ftp_clocks = generate_ftp_schedules(
interval_minutes=60, label=emd_constants.RJ_SMTR_DEV_AGENT_LABEL
)
Expand Down
Loading