Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Code enhancements for future use #198

Merged
merged 5 commits into from
May 17, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
18 changes: 3 additions & 15 deletions internal/config/rule.go
Original file line number Diff line number Diff line change
Expand Up @@ -57,13 +57,7 @@ func (r *RuntimeConfig) fetchRules(ctx context.Context, tx *sqlx.Tx) error {

escalationsByID := make(map[int64]*rule.Escalation)
for _, escalation := range escalations {
escalationLogger := r.logger.With(
zap.Int64("id", escalation.ID),
zap.Int64("rule_id", escalation.RuleID),
zap.String("condition", escalation.ConditionExpr.String),
zap.String("name", escalation.NameRaw.String),
zap.Int64("fallback_for", escalation.FallbackForID.Int64),
)
escalationLogger := r.logger.With(zap.Inline(escalation))

rule := rulesByID[escalation.RuleID]
if rule == nil {
Expand All @@ -87,10 +81,6 @@ func (r *RuntimeConfig) fetchRules(ctx context.Context, tx *sqlx.Tx) error {
continue
}

if escalation.NameRaw.Valid {
escalation.Name = escalation.NameRaw.String
}

rule.Escalations[escalation.ID] = escalation
escalationsByID[escalation.ID] = escalation
escalationLogger.Debugw("loaded escalation config")
Expand Down Expand Up @@ -165,11 +155,11 @@ func (r *RuntimeConfig) applyPendingRules() {
recipientLogger := r.logger.With(
zap.Int64("id", recipient.ID),
zap.Int64("escalation_id", recipient.EscalationID),
zap.Int64("channel_id", recipient.ChannelID.Int64))
zap.Int64("channel_id", recipient.ChannelID.Int64),
zap.Inline(recipient.Key))

if recipient.ContactID.Valid {
id := recipient.ContactID.Int64
recipientLogger = recipientLogger.With(zap.Int64("contact_id", id))
if c := r.Contacts[id]; c != nil {
recipient.Recipient = c
} else {
Expand All @@ -178,7 +168,6 @@ func (r *RuntimeConfig) applyPendingRules() {
}
} else if recipient.GroupID.Valid {
id := recipient.GroupID.Int64
recipientLogger = recipientLogger.With(zap.Int64("contactgroup_id", id))
if g := r.Groups[id]; g != nil {
recipient.Recipient = g
} else {
Expand All @@ -187,7 +176,6 @@ func (r *RuntimeConfig) applyPendingRules() {
}
} else if recipient.ScheduleID.Valid {
id := recipient.ScheduleID.Int64
recipientLogger = recipientLogger.With(zap.Int64("schedule_id", id))
if s := r.Schedules[id]; s != nil {
recipient.Recipient = s
} else {
Expand Down
4 changes: 4 additions & 0 deletions internal/event/event.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package event
import (
"bytes"
"context"
"errors"
"fmt"
"github.com/icinga/icinga-notifications/internal/utils"
"github.com/icinga/icingadb/pkg/icingadb"
Expand All @@ -11,6 +12,9 @@ import (
"time"
)

// ErrSuperfluousStateChange indicates a superfluous state change being ignored and stopping further processing.
var ErrSuperfluousStateChange = errors.New("ignoring superfluous state change")

// Event received of a specified Type for internal processing.
//
// The JSON struct tags are being used to unmarshal a JSON representation received from the listener.Listener. Some
Expand Down
2 changes: 1 addition & 1 deletion internal/icinga2/launcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -128,7 +128,7 @@ func (launcher *Launcher) launch(src *config.Source) {

err := incident.ProcessEvent(subCtx, launcher.Db, launcher.Logs, launcher.RuntimeConfig, ev)
switch {
case errors.Is(err, incident.ErrSuperfluousStateChange):
case errors.Is(err, event.ErrSuperfluousStateChange):
l.Debugw("Stopped processing event with superfluous state change", zap.Error(err))
case err != nil:
l.Errorw("Cannot process event", zap.Error(err))
Expand Down
17 changes: 17 additions & 0 deletions internal/incident/db_types.go
Original file line number Diff line number Diff line change
@@ -1,9 +1,13 @@
package incident

import (
"context"
"github.com/icinga/icinga-notifications/internal/event"
"github.com/icinga/icinga-notifications/internal/recipient"
"github.com/icinga/icinga-notifications/internal/utils"
"github.com/icinga/icingadb/pkg/icingadb"
"github.com/icinga/icingadb/pkg/types"
"github.com/jmoiron/sqlx"
)

// EventRow represents a single incident event database entry.
Expand Down Expand Up @@ -85,6 +89,19 @@ func (h *HistoryRow) TableName() string {
return "incident_history"
}

// Sync persists the current state of this history to the database and retrieves the just inserted history ID.
// Returns error when failed to execute the query.
func (h *HistoryRow) Sync(ctx context.Context, db *icingadb.DB, tx *sqlx.Tx) error {
historyId, err := utils.InsertAndFetchId(ctx, tx, utils.BuildInsertStmtWithout(db, h, "id"), h)
if err != nil {
return err
}

h.ID = historyId

return nil
}

// NotificationEntry is used to cache a set of incident history fields of type Notified.
//
// The event processing workflow is performed in a separate transaction before trying to send the actual
Expand Down
113 changes: 51 additions & 62 deletions internal/incident/incident.go
Original file line number Diff line number Diff line change
Expand Up @@ -110,7 +110,7 @@ func (i *Incident) IsNotifiable(role ContactRole) bool {
}

// ProcessEvent processes the given event for the current incident in an own transaction.
func (i *Incident) ProcessEvent(ctx context.Context, ev *event.Event, created bool) error {
func (i *Incident) ProcessEvent(ctx context.Context, ev *event.Event) error {
i.Lock()
defer i.Unlock()

Expand All @@ -131,7 +131,8 @@ func (i *Incident) ProcessEvent(ctx context.Context, ev *event.Event, created bo
return errors.New("can't insert event and fetch its ID")
}

if created {
isNew := i.StartedAt.Time().IsZero()
if isNew {
err = i.processIncidentOpenedEvent(ctx, tx, ev)
if err != nil {
return err
Expand All @@ -148,7 +149,7 @@ func (i *Incident) ProcessEvent(ctx context.Context, ev *event.Event, created bo

switch ev.Type {
case event.TypeState:
if !created {
if !isNew {
if err := i.processSeverityChangedEvent(ctx, tx, ev); err != nil {
return err
}
Expand Down Expand Up @@ -235,9 +236,9 @@ func (i *Incident) RetriggerEscalations(ev *event.Event) {
return err
}

channels := make(contactChannels)
channels := make(rule.ContactChannels)
for _, escalation := range escalations {
channels.loadEscalationRecipientsChannel(escalation, i, ev.Time)
channels.LoadFromEscalationRecipients(escalation, ev.Time, i.isRecipientNotifiable)
}

notifications, err = i.addPendingNotifications(ctx, tx, ev, channels)
Expand All @@ -260,13 +261,14 @@ func (i *Incident) processSeverityChangedEvent(ctx context.Context, tx *sqlx.Tx,
oldSeverity := i.Severity
newSeverity := ev.Severity
if oldSeverity == newSeverity {
err := fmt.Errorf("%w: %s state event from source %d", ErrSuperfluousStateChange, ev.Severity.String(), ev.SourceId)
err := fmt.Errorf("%w: %s state event from source %d", event.ErrSuperfluousStateChange, ev.Severity.String(), ev.SourceId)
return err
}

i.logger.Infof("Incident severity changed from %s to %s", oldSeverity.String(), newSeverity.String())

history := &HistoryRow{
hr := &HistoryRow{
IncidentID: i.Id,
EventID: utils.ToDBInt(ev.ID),
Time: types.UnixMilli(time.Now()),
Type: IncidentSeverityChanged,
Expand All @@ -275,8 +277,7 @@ func (i *Incident) processSeverityChangedEvent(ctx context.Context, tx *sqlx.Tx,
Message: utils.ToDBString(ev.Message),
}

_, err := i.AddHistory(ctx, tx, history, false)
if err != nil {
if err := hr.Sync(ctx, i.db, tx); err != nil {
i.logger.Errorw("Failed to insert incident severity changed history", zap.Error(err))

return errors.New("failed to insert incident severity changed history")
Expand All @@ -288,14 +289,14 @@ func (i *Incident) processSeverityChangedEvent(ctx context.Context, tx *sqlx.Tx,

RemoveCurrent(i.Object)

history := &HistoryRow{
EventID: utils.ToDBInt(ev.ID),
Time: i.RecoveredAt,
Type: Closed,
hr = &HistoryRow{
IncidentID: i.Id,
EventID: utils.ToDBInt(ev.ID),
Time: i.RecoveredAt,
Type: Closed,
}

_, err = i.AddHistory(ctx, tx, history, false)
if err != nil {
if err := hr.Sync(ctx, i.db, tx); err != nil {
i.logger.Errorw("Can't insert incident closed history to the database", zap.Error(err))

return errors.New("can't insert incident closed history to the database")
Expand Down Expand Up @@ -327,15 +328,16 @@ func (i *Incident) processIncidentOpenedEvent(ctx context.Context, tx *sqlx.Tx,

i.logger.Infow(fmt.Sprintf("Source %d opened incident at severity %q", ev.SourceId, i.Severity.String()), zap.String("message", ev.Message))

historyRow := &HistoryRow{
hr := &HistoryRow{
IncidentID: i.Id,
Type: Opened,
Time: types.UnixMilli(ev.Time),
EventID: utils.ToDBInt(ev.ID),
NewSeverity: i.Severity,
Message: utils.ToDBString(ev.Message),
}

if _, err := i.AddHistory(ctx, tx, historyRow, false); err != nil {
if err := hr.Sync(ctx, i.db, tx); err != nil {
i.logger.Errorw("Can't insert incident opened history event", zap.Error(err))

return errors.New("can't insert incident opened history event")
Expand Down Expand Up @@ -379,14 +381,14 @@ func (i *Incident) evaluateRules(ctx context.Context, tx *sqlx.Tx, eventID int64
return errors.New("failed to insert incident rule")
}

history := &HistoryRow{
Time: types.UnixMilli(time.Now()),
EventID: utils.ToDBInt(eventID),
RuleID: utils.ToDBInt(r.ID),
Type: RuleMatched,
hr := &HistoryRow{
IncidentID: i.Id,
Time: types.UnixMilli(time.Now()),
EventID: utils.ToDBInt(eventID),
RuleID: utils.ToDBInt(r.ID),
Type: RuleMatched,
}
_, err = i.AddHistory(ctx, tx, history, false)
if err != nil {
if err := hr.Sync(ctx, i.db, tx); err != nil {
i.logger.Errorw("Failed to insert rule matched incident history", zap.String("rule", r.Name), zap.Error(err))

return errors.New("failed to insert rule matched incident history")
Expand Down Expand Up @@ -437,7 +439,7 @@ func (i *Incident) evaluateEscalations(eventTime time.Time) ([]*rule.Escalation,
if err != nil {
i.logger.Warnw(
"Failed to evaluate escalation condition", zap.String("rule", r.Name),
zap.String("escalation", escalation.DisplayName()), zap.Error(err),
zap.Object("escalation", escalation), zap.Error(err),
)

matched = false
Expand Down Expand Up @@ -481,32 +483,33 @@ func (i *Incident) evaluateEscalations(eventTime time.Time) ([]*rule.Escalation,
func (i *Incident) triggerEscalations(ctx context.Context, tx *sqlx.Tx, ev *event.Event, escalations []*rule.Escalation) error {
for _, escalation := range escalations {
r := i.runtimeConfig.Rules[escalation.RuleID]
i.logger.Infof("Rule %q reached escalation %q", r.Name, escalation.DisplayName())
i.logger.Infow("Rule reached escalation", zap.String("rule", r.Name), zap.Object("escalation", escalation))

state := &EscalationState{RuleEscalationID: escalation.ID, TriggeredAt: types.UnixMilli(time.Now())}
i.EscalationState[escalation.ID] = state

if err := i.AddEscalationTriggered(ctx, tx, state); err != nil {
i.logger.Errorw(
"Failed to upsert escalation state", zap.String("rule", r.Name),
zap.String("escalation", escalation.DisplayName()), zap.Error(err),
zap.Object("escalation", escalation), zap.Error(err),
)

return errors.New("failed to upsert escalation state")
}

history := &HistoryRow{
hr := &HistoryRow{
IncidentID: i.Id,
Time: state.TriggeredAt,
EventID: utils.ToDBInt(ev.ID),
RuleEscalationID: utils.ToDBInt(state.RuleEscalationID),
RuleID: utils.ToDBInt(r.ID),
Type: EscalationTriggered,
}

if _, err := i.AddHistory(ctx, tx, history, false); err != nil {
if err := hr.Sync(ctx, i.db, tx); err != nil {
i.logger.Errorw(
"Failed to insert escalation triggered incident history", zap.String("rule", r.Name),
zap.String("escalation", escalation.DisplayName()), zap.Error(err),
zap.Object("escalation", escalation), zap.Error(err),
)

return errors.New("failed to insert escalation triggered incident history")
Expand Down Expand Up @@ -602,6 +605,7 @@ func (i *Incident) processAcknowledgementEvent(ctx context.Context, tx *sqlx.Tx,
i.logger.Infof("Contact %q role changed from %s to %s", contact.String(), oldRole.String(), newRole.String())

hr := &HistoryRow{
IncidentID: i.Id,
Key: recipientKey,
EventID: utils.ToDBInt(ev.ID),
Type: RecipientRoleChanged,
Expand All @@ -611,8 +615,7 @@ func (i *Incident) processAcknowledgementEvent(ctx context.Context, tx *sqlx.Tx,
Message: utils.ToDBString(ev.Message),
}

_, err := i.AddHistory(ctx, tx, hr, false)
if err != nil {
if err := hr.Sync(ctx, i.db, tx); err != nil {
i.logger.Errorw(
"Failed to add recipient role changed history", zap.String("recipient", contact.String()), zap.Error(err),
)
Expand All @@ -623,7 +626,7 @@ func (i *Incident) processAcknowledgementEvent(ctx context.Context, tx *sqlx.Tx,
cr := &ContactRow{IncidentID: hr.IncidentID, Key: recipientKey, Role: newRole}

stmt, _ := i.db.BuildUpsertStmt(cr)
_, err = tx.NamedExecContext(ctx, stmt, cr)
_, err := tx.NamedExecContext(ctx, stmt, cr)
if err != nil {
i.logger.Errorw(
"Failed to upsert incident contact", zap.String("contact", contact.String()), zap.Error(err),
Expand All @@ -636,16 +639,16 @@ func (i *Incident) processAcknowledgementEvent(ctx context.Context, tx *sqlx.Tx,
}

// getRecipientsChannel returns all the configured channels of the current incident and escalation recipients.
func (i *Incident) getRecipientsChannel(t time.Time) contactChannels {
contactChs := make(contactChannels)
func (i *Incident) getRecipientsChannel(t time.Time) rule.ContactChannels {
contactChs := make(rule.ContactChannels)
// Load all escalations recipients channels
for escalationID := range i.EscalationState {
escalation := i.runtimeConfig.GetRuleEscalation(escalationID)
if escalation == nil {
continue
}

contactChs.loadEscalationRecipientsChannel(escalation, i, t)
contactChs.LoadFromEscalationRecipients(escalation, t, i.isRecipientNotifiable)
}

// Check whether all the incident recipients do have an appropriate contact channel configured.
Expand Down Expand Up @@ -695,6 +698,18 @@ func (i *Incident) restoreRecipients(ctx context.Context) error {
return nil
}

// isRecipientNotifiable checks whether the given recipient should be notified about the current incident.
// If the specified recipient has not yet been notified of this incident, it always returns false.
// Otherwise, the recipient role is forwarded to IsNotifiable and may or may not return true.
func (i *Incident) isRecipientNotifiable(key recipient.Key) bool {
state := i.Recipients[key]
if state == nil {
return false
}

return i.IsNotifiable(state.Role)
}

type EscalationState struct {
IncidentID int64 `db:"incident_id"`
RuleEscalationID int64 `db:"rule_escalation_id"`
Expand All @@ -710,32 +725,6 @@ type RecipientState struct {
Role ContactRole
}

// contactChannels stores a set of channel IDs for each set of individual contacts.
type contactChannels map[*recipient.Contact]map[int64]bool

// loadEscalationRecipientsChannel loads all the configured channel of all the provided escalation recipients.
func (rct contactChannels) loadEscalationRecipientsChannel(escalation *rule.Escalation, i *Incident, t time.Time) {
for _, escalationRecipient := range escalation.Recipients {
state := i.Recipients[escalationRecipient.Key]
if state == nil {
continue
}

if i.IsNotifiable(state.Role) {
for _, c := range escalationRecipient.Recipient.GetContactsAt(t) {
if rct[c] == nil {
rct[c] = make(map[int64]bool)
}
if escalationRecipient.ChannelID.Valid {
rct[c][escalationRecipient.ChannelID.Int64] = true
} else {
rct[c][c.DefaultChannelID] = true
}
}
}
}
}

var (
_ contracts.Incident = (*Incident)(nil)
)
Loading
Loading