forked from G-Research/alerta-contrib
-
Notifications
You must be signed in to change notification settings - Fork 0
/
alerta_alertops.py
77 lines (59 loc) · 2.53 KB
/
alerta_alertops.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
import logging
import os
import re
import requests
from alerta.exceptions import RejectException
from alerta.plugins import PluginBase
try:
from alerta.plugins import app # alerta >= 5.0
except ImportError:
from alerta.app import app # alerta < 5.0
from alerta.plugins import PluginBase
# Logger shared by this plugin; messages are emitted under 'alerta.plugins'.
LOG = logging.getLogger('alerta.plugins')

# Each setting is read from the process environment first. An unset or empty
# environment variable falls back to the Alerta app config, where a missing
# key raises KeyError at import time.
ALERTOPS_URL = os.getenv('ALERTOPS_URL') or app.config['ALERTOPS_URL']
DASHBOARD_URL = os.getenv('DASHBOARD_URL') or app.config['DASHBOARD_URL']
class TriggerEvent(PluginBase):
    """Alerta plugin that forwards alerts to AlertOps over HTTP.

    A JSON payload describing the alert is POSTed to ``ALERTOPS_URL`` when a
    non-repeat alert has been received and when an alert's status changes to
    ``ack`` or ``assign``.
    """

    # Severities reported to AlertOps as "close"; anything else is "open".
    _CLOSING_SEVERITIES = ('cleared', 'normal', 'ok')

    # Seconds to wait for AlertOps before the request is abandoned.
    _REQUEST_TIMEOUT = 2

    def pre_receive(self, alert, **kwargs):
        """Return ``alert`` unchanged; this plugin does no pre-processing."""
        return alert

    @staticmethod
    def _event_type(severity):
        """Map an alert severity to the AlertOps ``source_status`` value.

        :param severity: severity string of the alert.
        :returns: ``"close"`` for 'cleared', 'normal' or 'ok', otherwise
            ``"open"``.
        """
        if severity in TriggerEvent._CLOSING_SEVERITIES:
            return "close"
        return "open"

    @staticmethod
    def _build_payload(alert):
        """Build the JSON-serialisable body sent to AlertOps for ``alert``.

        :param alert: alert object exposing ``id``, ``environment``,
            ``severity``, ``service``, ``resource`` and ``get_body()``.
        :returns: dict with the keys posted to AlertOps.
        """
        message = "%s: %s alert for %s - %s" % (
            alert.environment,
            alert.severity.capitalize(),
            ','.join(alert.service),
            alert.resource,
        )
        return {
            "source_id": alert.id,
            "source_status": TriggerEvent._event_type(alert.severity),
            "description": message,
            "resource": alert.resource,
            "source": "alerta",
            # Deep link back to the alert in the Alerta dashboard.
            "source_url": '%s/#/alert/%s' % (DASHBOARD_URL, alert.id),
            "details": alert.get_body(history=False),
        }

    @staticmethod
    def _send(payload):
        """POST ``payload`` to AlertOps and log the response.

        :param payload: dict produced by ``_build_payload``.
        :raises RuntimeError: if the request itself fails (connection error,
            timeout, ...). HTTP error responses do not raise; they are logged
            as warnings.
        """
        try:
            r = requests.post(
                ALERTOPS_URL,
                json=payload,
                timeout=TriggerEvent._REQUEST_TIMEOUT,
            )
        except Exception as e:
            raise RuntimeError("AlertOps connection error: %s" % e)

        # Lazy %-args: the message is only formatted if DEBUG is enabled.
        LOG.debug('AlertOps response: %s - %s', r.status_code, r.text)
        if r.status_code >= 400:
            # Surface rejected requests without failing alert processing.
            LOG.warning(
                'AlertOps returned an error response: %s - %s',
                r.status_code, r.text)

    def post_receive(self, alert, **kwargs):
        """Notify AlertOps about a received alert.

        Alerts whose ``repeat`` attribute is truthy are skipped, so only
        non-repeat alerts are forwarded.

        :param alert: the alert that was received.
        :returns: ``None``.
        :raises RuntimeError: if the request to AlertOps fails.
        """
        if alert.repeat:
            return

        payload = self._build_payload(alert)
        LOG.debug('AlertOps Payload: %s', payload)
        self._send(payload)
        return

    def status_change(self, alert, status, text, **kwargs):
        """Notify AlertOps when an alert is acknowledged or assigned.

        Any status other than ``'ack'`` or ``'assign'`` is ignored.

        :param alert: the alert whose status is changing.
        :param status: the new status.
        :param text: text accompanying the status change (unused here).
        :returns: ``None``.
        :raises RuntimeError: if the request to AlertOps fails.
        """
        if status not in ['ack', 'assign']:
            return

        self._send(self._build_payload(alert))