-
Notifications
You must be signed in to change notification settings - Fork 0
/
alerta_nctalk.py
83 lines (68 loc) · 2.3 KB
/
alerta_nctalk.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
import logging
import os
from hashlib import sha256
from secrets import token_bytes
import requests
try:
from alerta.plugins import app # alerta >= 5.0
except ImportError:
from alerta.app import app # alerta < 5.0
from alerta.plugins import PluginBase
LOG = logging.getLogger("alerta.plugins.nctalk")
NEXTCLOUD_SERVER = os.environ.get("NEXTCLOUD_SERVER") or app.config["NEXTCLOUD_SERVER"]
NEXTCLOUD_TALK_TOKEN = (
os.environ.get("NEXTCLOUD_TALK_TOKEN") or app.config["NEXTCLOUD_TALK_TOKEN"]
)
NEXTCLOUD_USERNAME = (
os.environ.get("NEXTCLOUD_USERNAME") or app.config["NEXTCLOUD_USERNAME"]
)
NEXTCLOUD_PASSWORD = (
os.environ.get("NEXTCLOUD_PASSWORD") or app.config["NEXTCLOUD_PASSWORD"]
)
DASHBOARD_URL = os.environ.get("DASHBOARD_URL") or app.config.get("DASHBOARD_URL", "")
SEVERITY_ICON = {
"critical": "🔴 ",
"warning": "⚠ ",
"ok": "✅ ",
"cleared": "✅ ",
"normal": "✅ ",
}
class SendMessage(PluginBase):
def pre_receive(self, alert):
return alert
def post_receive(self, alert):
if alert.repeat:
return
severity = SEVERITY_ICON.get(alert.severity, "")
body = "{}{}: {} alert for {} \n{} - {} - {} \n{} \nDate: {} | {}/#/alert/{}".format(
severity,
alert.environment,
alert.severity.capitalize(),
",".join(alert.service),
alert.resource,
alert.event,
alert.value,
alert.text,
alert.create_time,
DASHBOARD_URL,
alert.id,
)
headers = {"Content-Type": "application/json", "OCS-APIRequest": "true"}
payload = {
"message": body,
"referenceId": sha256(token_bytes(256)).hexdigest(),
}
LOG.debug("NC Talk: %s", payload)
try:
r = requests.post(
f"{NEXTCLOUD_SERVER}/ocs/v2.php/apps/spreed/api/v1/chat/{NEXTCLOUD_TALK_TOKEN}",
json=payload,
headers=headers,
auth=(NEXTCLOUD_USERNAME, NEXTCLOUD_PASSWORD),
timeout=2,
)
except Exception as e:
raise RuntimeError("NC Talk: ERROR - %s" % e)
LOG.debug("NC Talk: %s - %s", r.status_code, r.text)
def status_change(self, alert, status, text):
return