
Commit

notify alarms subscriptions
pixelsoccupied committed Jan 3, 2025
1 parent 510e791 commit 7c09fe5
Showing 15 changed files with 688 additions and 227 deletions.
6 changes: 5 additions & 1 deletion internal/controllers/utils/utils.go
@@ -14,6 +14,7 @@ import (
"sort"
"strconv"
"strings"
"time"

"github.com/google/uuid"
"golang.org/x/oauth2"
@@ -674,7 +675,10 @@ func SetupOAuthClient(ctx context.Context, config OAuthClientConfig) (*http.Clie

c := &http.Client{
Transport: &http.Transport{
TLSClientConfig: tlsConfig}}
TLSClientConfig: tlsConfig,
},
Timeout: 30 * time.Second,
}

if config.ClientId != "" {
config := clientcredentials.Config{
84 changes: 84 additions & 0 deletions internal/service/alarms/DEVELOPING.md
@@ -0,0 +1,84 @@
# Tools to test the alarms server

- A minimal server that can be run to accept subscriber notifications

```go
package main

import (
    "encoding/json"
    "fmt"
    "log"
    "net/http"
)

func main() {
    http.HandleFunc("/notify", func(w http.ResponseWriter, r *http.Request) {
        if r.Method != http.MethodPost {
            http.Error(w, "Method not allowed", http.StatusMethodNotAllowed)
            return
        }

        // Decode to interface{} to accept any JSON structure
        var payload interface{}
        if err := json.NewDecoder(r.Body).Decode(&payload); err != nil {
            http.Error(w, err.Error(), http.StatusBadRequest)
            return
        }

        // Pretty print the received payload
        prettyJSON, _ := json.MarshalIndent(payload, "", " ")
        log.Printf("Received notification:\n%s\n", string(prettyJSON))

        w.WriteHeader(http.StatusOK)
        fmt.Fprintf(w, "Notification received successfully")
    })

    log.Println("Starting server on :8080")
    if err := http.ListenAndServe(":8080", nil); err != nil {
        log.Fatal("Server failed to start:", err)
    }
}
```
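
- A sketch for registering a subscription that targets the receiver above. This is a minimal example, assuming the alarms server is reachable at `localhost:8000` and exposes the subscription collection as `alarmSubscriptions` under the versioned base path from `server.go`; the exact host, port, and path are deployment-specific and may differ.

```go
package main

import (
	"bytes"
	"encoding/json"
	"fmt"
	"log"
	"net/http"
)

func main() {
	// Assumed endpoint: adjust host, port, and path to match your deployment.
	url := "http://localhost:8000/o2ims-infrastructureMonitoring/v1/alarmSubscriptions"

	// Callback points at the test receiver above; filter is optional and can
	// be omitted to receive every notification type.
	payload, err := json.Marshal(map[string]string{
		"callback": "http://localhost:8080/notify",
		"filter":   "NEW",
	})
	if err != nil {
		log.Fatal(err)
	}

	resp, err := http.Post(url, "application/json", bytes.NewReader(payload))
	if err != nil {
		log.Fatal(err)
	}
	defer resp.Body.Close()

	fmt.Println("create subscription status:", resp.Status)
}
```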

- A test PrometheusRule that emits an alert. Apply to the Hub or a Spoke; ACM automatically applies the cluster ID.

```yaml
apiVersion: monitoring.coreos.com/v1
kind: PrometheusRule
metadata:
  name: fake-alerting-rules
  namespace: open-cluster-management-addon-observability
spec:
  groups:
    - name: ./example.rules
      rules:
        - alert: ExampleAlert
          expr: vector(1)
          labels:
            severity: major
```
- A test Thanos rule that is accepted by ACM on the Hub and propagated everywhere.
  Note that this is the type of rule that is missing the cluster ID.
```yaml
kind: ConfigMap
apiVersion: v1
metadata:
  name: thanos-ruler-custom-rules
  namespace: open-cluster-management-observability
data:
  custom_rules.yaml: |
    groups:
      - name: node-health
        rules:
          - alert: NodeOutOfMemoryFakeButFromAllNodes
            expr: instance:node_memory_utilisation:ratio * 100 > 0
            for: 1m
            labels:
              instance: "{{ $labels.instance }}"
              cluster: "{{ $labels.cluster }}"
              clusterID: "{{ $labels.clusterID }}"
              severity: warning
```
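
- A quick way to verify that a test alert turned into an alarm event record on the server is to list the records. This is a rough sketch, assuming the alarms server listens on `localhost:8000` and serves the records at `alarms` under the versioned base path; adjust the URL (and any auth) to match your deployment.

```go
package main

import (
	"fmt"
	"io"
	"log"
	"net/http"
)

func main() {
	// Assumed endpoint: the exact path for listing alarm event records may differ.
	url := "http://localhost:8000/o2ims-infrastructureMonitoring/v1/alarms"

	resp, err := http.Get(url)
	if err != nil {
		log.Fatal(err)
	}
	defer resp.Body.Close()

	body, err := io.ReadAll(resp.Body)
	if err != nil {
		log.Fatal(err)
	}
	fmt.Printf("status: %s\nbody:\n%s\n", resp.Status, body)
}
```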
198 changes: 99 additions & 99 deletions internal/service/alarms/api/generated/alarms.generated.go

Large diffs are not rendered by default.

15 changes: 8 additions & 7 deletions internal/service/alarms/api/openapi.yaml
@@ -18,8 +18,9 @@ servers:
tags:
- name: alarms
description: Alarm management
- name: service configuration
- name: Alarm Service Configuration
- name: serviceConfiguration
description: Alarm Service Configuration
- name: subscriptions
description: Alarm subscription management
- name: probableCauses
description: Probable cause information
@@ -227,7 +228,7 @@ paths:
operationId: GetServiceConfiguration
summary: Retrieve the alarm service configuration
tags:
- service configuration
- serviceConfiguration
responses:
'200':
description: Successful response
@@ -252,7 +253,7 @@ paths:
operationId: UpdateAlarmServiceConfiguration
summary: Modify all fields of the Alarm Service Configuration.
tags:
- service configuration
- serviceConfiguration
requestBody:
required: true
content:
@@ -289,7 +290,7 @@ paths:
operationId: PatchAlarmServiceConfiguration
summary: Modify individual fields of the Alarm Service Configuration.
tags:
- service configuration
- serviceConfiguration
requestBody:
required: true
content:
@@ -709,13 +710,13 @@ components:
example: 16d5fc54-cee0-4532-9826-2369f8240e1b
filter:
type: string
enum: [ new, change, clear, acknowledge ]
enum: [ NEW, CHANGE, CLEAR, ACKNOWLEDGE ]
description: |
Criteria for events which do not need to be reported or will be filtered by the subscription
notification service. Therefore, if a filter is not provided then all events are reported.
It can be filtered by criteria based on the type of notification of fields of the
AlarmEventRecord.
example: (eq,perceivedSeverity,0)
example: NEW
callback:
type: string
format: uri
128 changes: 124 additions & 4 deletions internal/service/alarms/api/server.go
@@ -7,8 +7,11 @@ import (
"log/slog"
"net/http"
"strings"
"sync"
"time"

"github.com/openshift-kni/oran-o2ims/internal/service/common/notifier"

"github.com/google/uuid"

api "github.com/openshift-kni/oran-o2ims/internal/service/alarms/api/generated"
@@ -40,6 +43,12 @@ type AlarmsServer struct {
AlarmsRepository *repo.AlarmsRepository
// ClusterServer contains the cluster server client and fetched objects
ClusterServer *clusterserver.ClusterServer
// HttpClient HTTP client capable of acquiring an OAuth token used to authorize client requests
HttpClient *http.Client
// Logger used when global logger may not be available
Logger *slog.Logger
// Wg to allow alarm server level background tasks to finish before graceful exit
Wg sync.WaitGroup
}

// AlarmsServer implements StrictServerInterface. This ensures that we've conformed to the `StrictServerInterface` with a compile-time check
@@ -50,7 +59,7 @@ var baseURL = "/o2ims-infrastructureMonitoring/v1"
var currentVersion = "1.0.0"

// GetAllVersions receives the API request to this endpoint, executes the request, and responds appropriately
func (r *AlarmsServer) GetAllVersions(ctx context.Context, request api.GetAllVersionsRequestObject) (api.GetAllVersionsResponseObject, error) {
func (a *AlarmsServer) GetAllVersions(ctx context.Context, request api.GetAllVersionsRequestObject) (api.GetAllVersionsResponseObject, error) {
// We currently only support a single version
versions := []common.APIVersion{
{
@@ -65,7 +74,7 @@ func (r *AlarmsServer) GetAllVersions(ctx context.Context, request api.GetAllVer
}

// GetMinorVersions receives the API request to this endpoint, executes the request, and responds appropriately
func (r *AlarmsServer) GetMinorVersions(ctx context.Context, request api.GetMinorVersionsRequestObject) (api.GetMinorVersionsResponseObject, error) {
func (a *AlarmsServer) GetMinorVersions(ctx context.Context, request api.GetMinorVersionsRequestObject) (api.GetMinorVersionsResponseObject, error) {
// We currently only support a single version
versions := []common.APIVersion{
{
@@ -137,6 +146,7 @@ func (a *AlarmsServer) CreateSubscription(ctx context.Context, request api.Creat
}), nil
}

slog.Info("Successfully created Alarm Subscription", "record", record)
return api.CreateSubscription201JSONResponse(models.ConvertSubscriptionModelToApi(*record)), nil
}

@@ -477,6 +487,8 @@ func (a *AlarmsServer) GetProbableCause(ctx context.Context, request api.GetProb
// AmNotification handles an API request coming from AlertManager with CaaS alerts. This api is used internally.
// Note: the errors returned can also be viewed in the alertmanager pod logs, but we also log here for convenience
func (a *AlarmsServer) AmNotification(ctx context.Context, request api.AmNotificationRequestObject) (api.AmNotificationResponseObject, error) {
// TODO: AM auto retries if it receives 5xx error code. That means any error, even if permanent (e.g postgres syntax), will be processed the same way. Once we have a better retry mechanism for pg, update all 5xx to 4xx as needed.

// Audit the table with full list of alerts in the current payload. If missing set them to resolve
if err := a.AlarmsRepository.ResolveNotificationIfNotInCurrent(ctx, request.Body); err != nil {
msg := "failed to resolve notification that are not present"
@@ -502,12 +514,120 @@ func (a *AlarmsServer) AmNotification(ctx context.Context, request api.AmNotific
return nil, fmt.Errorf("%s: %w", msg, err)
}

//TODO: Get subscriber
// Return 200 before subscription processing to free up AM conn
a.Wg.Add(1)
go func() {
start := time.Now()
defer func() {
slog.Info("Subscription processing completed", "duration", time.Since(start).String())
}()
defer a.Wg.Done()
// Create new background context for subscription processing
subCtx := context.Background()

subscriptions, err := a.AlarmsRepository.GetAlarmSubscriptions(subCtx)
if err != nil {
slog.Error("failed to get all subscriptions", "error", err)
return
}
if len(subscriptions) == 0 {
slog.Info("No subscriptions to notify")
return
}

slog.Info("Processing subscriptions")
if err := a.processSubscriptions(subCtx, subscriptions); err != nil {
slog.Error("failed to process at least one subscription", "error", err)
return
}
}()

//TODO: Notify subscriber
slog.Info("Successfully handled all alertmanager alerts")
return api.AmNotification200Response{}, nil
}

// Max number of subscriptions that can be notified at the same time
const subsMaxConcurrent = 5

// processSubscriptions processes a list of oran subscriptions concurrently
func (a *AlarmsServer) processSubscriptions(ctx context.Context, subscriptions []models.AlarmSubscription) error {
var (
sem = make(chan struct{}, subsMaxConcurrent)
errChan = make(chan struct {
err error
subID string
}, len(subscriptions))
)

var wg sync.WaitGroup
for _, subscription := range subscriptions {
wg.Add(1)
sem <- struct{}{} // Acquire semaphore
go func(sub models.AlarmSubscription) {
defer wg.Done()
defer func() { <-sem }() // Release semaphore

// Notify
if err := a.notifySubscription(ctx, sub); err != nil {
errChan <- struct {
err error
subID string
}{err, sub.SubscriptionID.String()}
}
}(subscription)
}

go func() {
wg.Wait()
close(errChan)
}()

var failedNotifications []string
errCount := 0
for e := range errChan {
errCount++
failedNotifications = append(failedNotifications, fmt.Sprintf("subscription %s: %v", e.subID, e.err))
}

if errCount > 0 {
return fmt.Errorf("failed to notify %d subscriptions: %s", errCount, strings.Join(failedNotifications, "::"))
}

slog.Info("Successfully processed all subscriptions", "count", len(subscriptions))
return nil
}

// notifySubscription For a given subscription, collect all the latest alarms (with possible filter) and update its event cursor
func (a *AlarmsServer) notifySubscription(ctx context.Context, subscription models.AlarmSubscription) error {
subAlarms, err := a.AlarmsRepository.GetAlarmsForSubscription(ctx, subscription)
if err != nil {
return fmt.Errorf("get alarms failed for %s: %w", subscription.SubscriptionID, err)
}

if len(subAlarms) > 0 {
// Call the subscription url with notification payload
notifyPayload := notifier.Notification{
NotificationID: subscription.SubscriptionID,
Payload: models.ConvertAlarmEventRecordModelsToAlarmEventNotifications(subAlarms, subscription, a.GlobalCloudID),
}
if err := notifier.CallUrl(ctx, a.Logger, a.HttpClient, subscription.Callback, notifyPayload); err != nil {
return fmt.Errorf("call url failed for %s: %w", subscription.SubscriptionID, err)
}

// Update the new event cursor with the latest
latestSequence := subAlarms[len(subAlarms)-1].AlarmSequenceNumber
if err := a.AlarmsRepository.UpdateSubscriptionEventCursor(ctx, subscription, latestSequence); err != nil {
return fmt.Errorf("update subscription failed for %s: %w", subscription.SubscriptionID, err)
}

slog.Info("Subscription have been notified", "subscription", subscription.SubscriptionID)
} else {
slog.Info("No alarms to notify", "subscription", subscription.SubscriptionID)
}

return nil
}

func (a *AlarmsServer) HwNotification(ctx context.Context, request api.HwNotificationRequestObject) (api.HwNotificationResponseObject, error) {
// TODO implement me
return nil, fmt.Errorf("not implemented")
12 changes: 10 additions & 2 deletions internal/service/alarms/internal/alertmanager/alertmanager.go
@@ -74,7 +74,6 @@ func Setup(ctx context.Context, cl client.Client) error {
func ConvertAmToAlarmEventRecordModels(am *api.AlertmanagerNotification, aDefinitionRecords []models.AlarmDefinition, clusterIDToObjectTypeID map[uuid.UUID]uuid.UUID) []models.AlarmEventRecord {
records := make([]models.AlarmEventRecord, 0, len(am.Alerts))
for _, alert := range am.Alerts {
slog.Info("Converting Alertmanager alert", "alert name", GetAlertName(*alert.Labels))
record := models.AlarmEventRecord{
AlarmRaisedTime: *alert.StartsAt,
AlarmClearedTime: setTime(*alert.EndsAt),
@@ -184,6 +183,15 @@ func severityToPerceivedSeverity(input string) api.PerceivedSeverity {

// getExtensions extract oran extension from alert. For caas it's basically the labels and annotations from payload.
func getExtensions(labels, annotations map[string]string) map[string]string {
maps.Copy(labels, annotations)
if labels == nil {
labels = make(map[string]string)
}
if annotations == nil {
annotations = make(map[string]string)
}

result := make(map[string]string)
maps.Copy(result, labels)
maps.Copy(result, annotations)
return labels
}
@@ -1,18 +1,12 @@
-- Drop the trigger for updating timestamp updated_at in alarm_event_record
DROP TRIGGER IF EXISTS track_alarm_event_record_change_time ON alarm_event_record;
-- Drop the trigger for managing events
DROP TRIGGER IF EXISTS manage_alarm_event ON alarm_event_record;

-- Drop the function for updating timestamp updated_at in alarm_event_record
DROP FUNCTION IF EXISTS track_alarm_event_record_change_time;
-- Drop the function for managing events
DROP FUNCTION IF EXISTS manage_alarm_event;

-- Remove the default for alarm_sequence_number that uses alarm_sequence_seq
ALTER TABLE alarm_event_record ALTER COLUMN alarm_sequence_number DROP DEFAULT;

-- Drop the trigger for updating alarm_sequence_number in alarm_event_record
DROP TRIGGER IF EXISTS update_alarm_event_sequence ON alarm_event_record;

-- Drop the trigger function for updating alarm_sequence_number
DROP FUNCTION IF EXISTS update_alarm_event_sequence;

-- Drop the sequence for alarm_sequence_number
DROP SEQUENCE IF EXISTS alarm_sequence_seq;

