Skip to content

Commit

Permalink
feat: add possibility to send in multiple alerts simultaneously
Browse files Browse the repository at this point in the history
  • Loading branch information
sbgap committed Nov 8, 2024
1 parent a7a9e3d commit 13d68b0
Show file tree
Hide file tree
Showing 5 changed files with 390 additions and 1 deletion.
124 changes: 124 additions & 0 deletions alerta/database/backends/postgres/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -280,6 +280,120 @@ def correlate_alert(self, alert, history):
)
return self._updateone(update, vars(alert), returning=True)

def correlate_multiple_alerts(self, alerts):
    """
    Correlate a batch of alerts with a single UPDATE statement.

    For every stored row whose environment and resource match an incoming
    alert, and whose event either matches with a different severity or
    appears in the stored alert's ``correlate`` list, this replaces event,
    severity, status, service, value, text, create/receive times, timeout
    and raw data, resets repeat=False, merges tags (de-duplicated) and
    attributes, records last receive id/time, and prepends the incoming
    history entries capped at HISTORY_LIMIT.
    """
    objs = {}
    alerts_values = []
    for i, alert in enumerate(alerts):
        # One VALUES tuple per alert; every placeholder is suffixed with the
        # alert's index so a single flat params dict serves the whole batch.
        alerts_values.append(
            f"""
            (
                %(environment{i})s, %(resource{i})s, %(event{i})s, %(severity{i})s,
                %(status{i})s,%(service{i})s, %(value{i})s, %(text{i})s,
                %(timeout{i})s, %(raw_data{i})s, %(last_receive_id{i})s, (%(last_receive_time{i})s)::timestamp without time zone,
                %(tags{i})s, (%(attributes{i})s)::jsonb, (%(update_time{i})s)::timestamp without time zone,
                (%(history{i})s), %(customer{i})s, (%(create_time{i})s)::timestamp without time zone, %(previous_severity{i})s,
                %(trend_indication{i})s, (%(receive_time{i})s)::timestamp without time zone, %(duplicate_count{i})s
            )
            """
        )
        objs.update({f"{key}{i}": value for key, value in vars(alert).items()})
    update = f"""
        UPDATE alerts
        SET event=al.event, severity=al.severity, status=al.status, service=al.service, value=al.value, text=al.text,
            create_time=al.create_time, timeout=al.timeout, raw_data=al.raw_data, repeat=FALSE, previous_severity=al.previous_severity,
            trend_indication=al.trend_indication, receive_time=al.receive_time, last_receive_id=al.last_receive_id, last_receive_time=al.last_receive_time,
            tags=ARRAY(SELECT DISTINCT UNNEST(alerts.tags || al.tags)), attributes=alerts.attributes || al.attributes,
            duplicate_count=al.duplicate_count, update_time=COALESCE(al.update_time, alerts.update_time),
            history=CASE WHEN al.history IS NULL THEN alerts.history ELSE (al.history || alerts.history)[1:{current_app.config['HISTORY_LIMIT']}] END
        FROM (VALUES
            {",".join(alerts_values)}
        ) as al(
            environment, resource, event, severity,
            status, service, value, text,
            timeout, raw_data, last_receive_id, last_receive_time,
            tags, attributes, update_time, history, customer, create_time, previous_severity,
            trend_indication, receive_time, duplicate_count
        )
        WHERE alerts.environment=al.environment
        AND alerts.resource=al.resource
        AND ((alerts.event=al.event AND alerts.severity!=al.severity) OR (alerts.event!=al.event AND al.event=ANY(alerts.correlate)))
        RETURNING alerts.*
    """
    return self._updateall(update, objs, returning=True)

def dedup_multiple_alerts(self, alerts):
    """
    De-duplicate a batch of alerts with a single UPDATE statement.

    For every stored row whose environment, resource, event and severity
    match an incoming alert, this updates status, service, value, text,
    timeout and raw data, increments the duplicate count, sets repeat=TRUE,
    records last receive id/time, merges tags (de-duplicated) and
    attributes, and prepends the incoming history (if any) capped at
    HISTORY_LIMIT.
    """
    objs = {}
    alerts_values = []
    for i, alert in enumerate(alerts):
        # One VALUES tuple per alert; placeholders are index-suffixed so a
        # single flat params dict serves the whole batch.
        alerts_values.append(
            f"""
            (
                %(environment{i})s, %(resource{i})s, %(event{i})s, %(severity{i})s,
                %(status{i})s,%(service{i})s,%(value{i})s, %(text{i})s,
                %(timeout{i})s, %(raw_data{i})s, %(last_receive_id{i})s, (%(last_receive_time{i})s)::timestamp without time zone,
                %(tags{i})s, (%(attributes{i})s)::jsonb, (%(update_time{i})s)::timestamp without time zone,
                (%(history{i})s)::history, %(customer{i})s
            )
            """
        )
        objs.update({f"{key}{i}": value for key, value in vars(alert).items()})
    update = f"""
        UPDATE alerts
        SET status=al.status, service=al.service, value=al.value, text=al.text,
            timeout=al.timeout, raw_data=al.raw_data, repeat=TRUE,
            last_receive_id=al.last_receive_id, last_receive_time=al.last_receive_time,
            tags=ARRAY(SELECT DISTINCT UNNEST(alerts.tags || al.tags)), attributes=alerts.attributes || al.attributes,
            duplicate_count=alerts.duplicate_count + 1, update_time=COALESCE(al.update_time, alerts.update_time),
            history=CASE WHEN al.history IS NULL THEN alerts.history ELSE (al.history || alerts.history)[1:{current_app.config['HISTORY_LIMIT']}] END
        FROM (VALUES
            {",".join(alerts_values)}
        ) as al(
            environment, resource, event, severity,
            status, service, value, text,
            timeout, raw_data, last_receive_id, last_receive_time,
            tags, attributes, update_time, history, customer
        )
        WHERE alerts.environment=al.environment
        AND alerts.resource=al.resource
        AND alerts.event=al.event
        AND alerts.severity=al.severity
        RETURNING alerts.*
    """
    return self._updateall(update, objs, returning=True)

def create_multiple_alerts(self, alerts):
    """
    Insert a batch of brand-new alerts with one multi-row INSERT and
    return every inserted row.
    """
    values_sql = []
    params = {}
    # Single pass: collect one VALUES tuple and the index-suffixed
    # parameters for each alert.
    for i, alert in enumerate(alerts):
        values_sql.append(
            f"""
            (
                %(id{i})s, %(resource{i})s, %(event{i})s, %(environment{i})s, %(severity{i})s, %(correlate{i})s, %(status{i})s,
                %(service{i})s, %(group{i})s, %(value{i})s, %(text{i})s, %(tags{i})s, %(attributes{i})s, %(origin{i})s,
                %(event_type{i})s, %(create_time{i})s, %(timeout{i})s, %(raw_data{i})s, %(customer{i})s, %(duplicate_count{i})s,
                %(repeat{i})s, %(previous_severity{i})s, %(trend_indication{i})s, %(receive_time{i})s, %(last_receive_id{i})s,
                %(last_receive_time{i})s, %(update_time{i})s, %(history{i})s::history[]
            )
            """
        )
        for key, value in vars(alert).items():
            params[f"{key}{i}"] = value

    insert = f"""
        INSERT INTO alerts (id, resource, event, environment, severity, correlate, status, service, "group",
                            value, text, tags, attributes, origin, type, create_time, timeout, raw_data, customer,
                            duplicate_count, repeat, previous_severity, trend_indication, receive_time, last_receive_id,
                            last_receive_time, update_time, history)
        VALUES {",".join(values_sql)}
        RETURNING *
    """
    return self._insert_all(insert, params)

def create_alert(self, alert):
insert = """
INSERT INTO alerts (id, resource, event, environment, severity, correlate, status, service, "group",
Expand Down Expand Up @@ -2286,6 +2400,16 @@ def _insert(self, query, vars):
self.get_db().commit()
return cursor.fetchone()

def _insert_all(self, query, vars):
    """
    Execute a multi-row INSERT and return all rows produced by its
    RETURNING clause.

    :param query: SQL INSERT statement with pyformat placeholders.
    :param vars: mapping of placeholder names to values.
    :return: list of inserted rows.
    """
    cursor = self.get_db().cursor()
    self._log(cursor, query, vars)
    cursor.execute(query, vars)
    self.get_db().commit()
    # fetchall (not fetchone): a batch insert returns one row per alert.
    return cursor.fetchall()

def _fetchone(self, query, vars):
"""
Return none or one row.
Expand Down
9 changes: 9 additions & 0 deletions alerta/database/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -119,9 +119,18 @@ def is_flapping(self, alert, window=1800, count=2):
def dedup_alert(self, alert, history):
    """De-duplicate a single alert against an existing one. Backends must implement."""
    raise NotImplementedError

def dedup_multiple_alerts(self, alerts):
    """De-duplicate a batch of alerts in one backend call. Backends must implement."""
    raise NotImplementedError

def correlate_alert(self, alert, history):
    """Correlate a single alert with an existing one. Backends must implement."""
    raise NotImplementedError

def correlate_multiple_alerts(self, alerts):
    """Correlate a batch of alerts in one backend call. Backends must implement."""
    raise NotImplementedError

def create_multiple_alerts(self, alerts):
    """Insert a batch of new alerts in one backend call. Backends must implement."""
    raise NotImplementedError

def create_alert(self, alert):
    """Insert a single new alert. Backends must implement."""
    raise NotImplementedError

Expand Down
144 changes: 144 additions & 0 deletions alerta/models/alert.py
Original file line number Diff line number Diff line change
Expand Up @@ -565,6 +565,150 @@ def get_services(query: Query = None) -> List[str]:
def get_groups(query: Query = None) -> List[str]:
return db.get_alert_groups(query)

@staticmethod
def create_multiple_alerts(alerts: 'list[Alert]') -> 'list[Alert]':
    """
    Initialise and persist a batch of brand-new alerts in one call.

    Each alert receives the alarm model's default previous severity and
    derived trend indication, an initial status from the alarm model
    transition, a zeroed duplicate count, receive/update timestamps and a
    single 'new' history entry; the batch is then inserted with a single
    database round trip.

    :param alerts: alerts that matched neither an existing duplicate nor
        a correlated alert.
    :return: the created alerts as read back from the database.
    """
    if not alerts:
        return []
    now = datetime.utcnow()
    for alert in alerts:

        trend_indication = alarm_model.trend(alarm_model.DEFAULT_PREVIOUS_SEVERITY, alert.severity)

        _, alert.status = alarm_model.transition(
            alert=alert
        )

        alert.duplicate_count = 0
        alert.repeat = False
        alert.previous_severity = alarm_model.DEFAULT_PREVIOUS_SEVERITY
        alert.trend_indication = trend_indication
        alert.receive_time = now
        alert.last_receive_id = alert.id
        alert.last_receive_time = now
        alert.update_time = now

        # Seed history with a single 'new' entry recorded at create time.
        alert.history = [History(
            id=alert.id,
            event=alert.event,
            severity=alert.severity,
            status=alert.status,
            value=alert.value,
            text=alert.text,
            change_type=ChangeType.new,
            update_time=alert.create_time,
            user=g.login,
            timeout=alert.timeout
        )]
    return [Alert.from_db(alert) for alert in db.create_multiple_alerts(alerts)]

@staticmethod
def dedup_multiple_alerts(duplicates: 'list[dict[str, Alert]]') -> 'list[Alert]':
    """
    Apply de-duplication to a batch of alerts and persist them in bulk.

    Each item pairs the incoming alert (key ``'alert'``) with the existing
    alert it duplicates (key ``'duplicate'``). The incoming alert is
    marked as a repeat, given a new status from the alarm model and, when
    the status (or, if configured, the value) changes, a corresponding
    history entry; the batch is then written with one database call.
    """
    if len(duplicates) < 1:
        return []
    now = datetime.utcnow()
    alerts = []
    for alert in duplicates:
        alert, duplicate_of = (alert['alert'], alert['duplicate'])
        status, previous_value, previous_status, _ = alert._get_hist_info()

        _, new_status = alarm_model.transition(
            alert=alert,
            current_status=status,
            previous_status=previous_status
        )

        alert.repeat = True
        alert.last_receive_id = alert.id
        alert.last_receive_time = now

        if new_status != status:
            # Status changed: let status-change hooks adjust status/text first.
            # NOTE(review): assumes at least one status_change_hook receiver
            # is connected, otherwise r[0] raises IndexError — confirm.
            r = status_change_hook.send(duplicate_of, status=new_status, text=alert.text)
            _, (_, new_status, text) = r[0]
            alert.update_time = now

            history = History(
                id=alert.id,
                event=alert.event,
                severity=alert.severity,
                status=new_status,
                value=alert.value,
                text=text,
                change_type=ChangeType.status,
                update_time=alert.create_time,
                user=g.login,
                timeout=alert.timeout,
            )  # type: Optional[History]

        elif current_app.config['HISTORY_ON_VALUE_CHANGE'] and alert.value != previous_value:
            history = History(
                id=alert.id,
                event=alert.event,
                severity=alert.severity,
                status=status,
                value=alert.value,
                text=alert.text,
                change_type=ChangeType.value,
                update_time=alert.create_time,
                user=g.login,
                timeout=alert.timeout,
            )
        else:
            history = None

        # A single History entry (or None), not a list — the postgres
        # backend casts it with '::history' accordingly.
        alert.history = history
        alert.status = new_status
        alerts.append(alert)
    return [Alert.from_db(alert) for alert in db.dedup_multiple_alerts(alerts)]

@staticmethod
def update_multiple(correlates: 'list[dict[str, Alert]]') -> 'list[Alert]':
    """
    Apply correlation to a batch of alerts and persist them in bulk.

    Each item pairs the incoming alert (key ``'alert'``) with the existing
    alert it correlates with (key ``'correlate'``). The incoming alert
    gets its previous severity and trend indication computed, a new status
    from the alarm model, a 'severity' history entry, and is then written
    with one database call.

    Note: the return annotation was corrected to ``list[Alert]``; the
    function returns a list, not a single Alert.
    """
    if len(correlates) < 1:
        return []
    now = datetime.utcnow()
    alerts = []
    for alert in correlates:
        alert, correlate_with = (alert['alert'], alert['correlate'])
        alert.previous_severity = db.get_severity(alert)
        alert.trend_indication = alarm_model.trend(alert.previous_severity, alert.severity)

        status, _, previous_status, _ = alert._get_hist_info()

        _, new_status = alarm_model.transition(
            alert=alert,
            current_status=status,
            previous_status=previous_status
        )

        alert.duplicate_count = 0
        alert.repeat = False
        alert.receive_time = now
        alert.last_receive_id = alert.id
        alert.last_receive_time = now

        if new_status != status:
            # Status changed: let status-change hooks adjust status/text first.
            # NOTE(review): assumes at least one status_change_hook receiver
            # is connected, otherwise r[0] raises IndexError — confirm.
            r = status_change_hook.send(correlate_with, status=new_status, text=alert.text)
            _, (_, new_status, text) = r[0]
            alert.update_time = now
        else:
            text = alert.text

        alert.history = [History(
            id=alert.id,
            event=alert.event,
            severity=alert.severity,
            status=new_status,
            value=alert.value,
            text=text,
            change_type=ChangeType.severity,
            update_time=alert.create_time,
            user=g.login,
            timeout=alert.timeout
        )]

        alert.status = new_status
        alerts.append(alert)
    return [Alert.from_db(alert) for alert in db.correlate_multiple_alerts(alerts)]

# get tags
@staticmethod
def get_tags(query: Query = None) -> List[str]:
Expand Down
75 changes: 75 additions & 0 deletions alerta/utils/api.py
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,81 @@ def assign_customer(wanted: str = None, permission: str = Scope.admin_alerts) ->
return None


def process_multiple_alert(alerts: 'list[Alert]') -> 'list[Alert]':
    """
    Run the full receive pipeline for a batch of alerts.

    Each alert is passed through its routed pre-receive plugins, then
    classified as a duplicate, a correlation or brand new; each category is
    persisted in bulk, and the resulting alerts are passed through their
    post-receive plugins.

    :param alerts: incoming alerts to process.
    :return: the persisted alerts, possibly modified by plugins.
    :raises ApiError: when classification or a post-receive plugin fails.
    """
    new: 'list[Alert]' = []
    correlates: 'list[dict[str, Alert]]' = []
    duplicates: 'list[dict[str, Alert]]' = []

    for alert in alerts:

        wanted_plugins, wanted_config = plugins.routing(alert)

        for plugin in wanted_plugins:
            # Suppressed alerts (e.g. in a blackout period) skip plugins.
            if alert.is_suppressed:
                break
            try:
                alert = plugin.pre_receive(alert, config=wanted_config)
            except TypeError:
                alert = plugin.pre_receive(alert)  # for backward compatibility
            except (RejectException, HeartbeatReceived, BlackoutPeriod, RateLimit, ForwardingLoop, AlertaException):
                raise
            except Exception as e:
                if current_app.config['PLUGINS_RAISE_ON_ERROR']:
                    raise RuntimeError(f"Error while running pre-receive plugin '{plugin.name}': {str(e)}")
                else:
                    logging.error(f"Error while running pre-receive plugin '{plugin.name}': {str(e)}")
            if not alert:
                raise SyntaxError(f"Plugin '{plugin.name}' pre-receive hook did not return modified alert")

        # Classify against existing alerts: duplicate, correlated or new.
        try:
            is_duplicate = alert.is_duplicate()
            if is_duplicate:
                duplicates.append({'alert': alert, 'duplicate': is_duplicate})
            else:
                is_correlated = alert.is_correlated()
                if is_correlated:
                    correlates.append({'alert': alert, 'correlate': is_correlated})
                else:
                    new.append(alert)
        except Exception as e:
            raise ApiError(str(e))

    processed: 'list[Alert]' = []

    for alert in [*Alert.update_multiple(correlates), *Alert.dedup_multiple_alerts(duplicates), *Alert.create_multiple_alerts(new)]:

        wanted_plugins, wanted_config = plugins.routing(alert)

        alert_was_updated: bool = False
        for plugin in wanted_plugins:
            # FIX: re-check suppression per alert. The previous code tested a
            # 'skip_plugins' flag left over from the last iteration of the
            # pre-receive loop above, so the wrong alert's suppression state
            # was applied to every alert in this loop.
            if alert.is_suppressed:
                break
            try:
                updated = plugin.post_receive(alert, config=wanted_config)
            except TypeError:
                updated = plugin.post_receive(alert)  # for backward compatibility
            except AlertaException:
                raise
            except Exception as e:
                if current_app.config['PLUGINS_RAISE_ON_ERROR']:
                    raise ApiError(f"Error while running post-receive plugin '{plugin.name}': {str(e)}")
                else:
                    logging.error(f"Error while running post-receive plugin '{plugin.name}': {str(e)}")
            if updated:
                alert = updated
                alert_was_updated = True

        if alert_was_updated:
            alert.update_tags(alert.tags)
            alert.attributes = alert.update_attributes(alert.attributes)

        processed.append(alert)

    return processed


def process_alert(alert: Alert) -> Alert:

wanted_plugins, wanted_config = plugins.routing(alert)
Expand Down
Loading

0 comments on commit 13d68b0

Please sign in to comment.