Skip to content

Commit

Permalink
notify subscriber
Browse files Browse the repository at this point in the history
  • Loading branch information
pixelsoccupied committed Jan 2, 2025
1 parent 552b62d commit 8e76100
Show file tree
Hide file tree
Showing 13 changed files with 557 additions and 221 deletions.
198 changes: 99 additions & 99 deletions internal/service/alarms/api/generated/alarms.generated.go

Large diffs are not rendered by default.

15 changes: 8 additions & 7 deletions internal/service/alarms/api/openapi.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -18,8 +18,9 @@ servers:
tags:
- name: alarms
description: Alarm management
- name: service configuration
- name: Alarm Service Configuration
- name: serviceConfiguration
description: Alarm Service Configuration
- name: subscriptions
description: Alarm subscription management
- name: probableCauses
description: Probable cause information
Expand Down Expand Up @@ -227,7 +228,7 @@ paths:
operationId: GetServiceConfiguration
summary: Retrieve the alarm service configuration
tags:
- service configuration
- serviceConfiguration
responses:
'200':
description: Successful response
Expand All @@ -252,7 +253,7 @@ paths:
operationId: UpdateAlarmServiceConfiguration
summary: Modify all fields of the Alarm Service Configuration.
tags:
- service configuration
- serviceConfiguration
requestBody:
required: true
content:
Expand Down Expand Up @@ -289,7 +290,7 @@ paths:
operationId: PatchAlarmServiceConfiguration
summary: Modify individual fields of the Alarm Service Configuration.
tags:
- service configuration
- serviceConfiguration
requestBody:
required: true
content:
Expand Down Expand Up @@ -709,13 +710,13 @@ components:
example: 16d5fc54-cee0-4532-9826-2369f8240e1b
filter:
type: string
enum: [ new, change, clear, acknowledge ]
enum: [ NEW, CHANGE, CLEAR, ACKNOWLEDGE ]
description: |
Criteria for events which do not need to be reported or will be filtered by the subscription
notification service. Therefore, if a filter is not provided then all events are reported.
It can be filtered by criteria based on the type of notification of fields of the
AlarmEventRecord.
example: (eq,perceivedSeverity,0)
example: NEW
callback:
type: string
format: uri
Expand Down
118 changes: 114 additions & 4 deletions internal/service/alarms/api/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,8 +7,11 @@ import (
"log/slog"
"net/http"
"strings"
"sync"
"time"

"github.com/openshift-kni/oran-o2ims/internal/service/common/notifier"

"github.com/google/uuid"

api "github.com/openshift-kni/oran-o2ims/internal/service/alarms/api/generated"
Expand Down Expand Up @@ -40,6 +43,10 @@ type AlarmsServer struct {
AlarmsRepository *repo.AlarmsRepository
// ClusterServer contains the cluster server client and fetched objects
ClusterServer *clusterserver.ClusterServer
// HttpClient HTTP client capable of acquiring an OAuth token used to authorize client requests
HttpClient *http.Client
// Logger used when global logger may not be available
Logger *slog.Logger
}

// AlarmsServer implements StrictServerInterface. This ensures that we've conformed to the `StrictServerInterface` with a compile-time check
Expand All @@ -50,7 +57,7 @@ var baseURL = "/o2ims-infrastructureMonitoring/v1"
var currentVersion = "1.0.0"

// GetAllVersions receives the API request to this endpoint, executes the request, and responds appropriately
func (r *AlarmsServer) GetAllVersions(ctx context.Context, request api.GetAllVersionsRequestObject) (api.GetAllVersionsResponseObject, error) {
func (a *AlarmsServer) GetAllVersions(ctx context.Context, request api.GetAllVersionsRequestObject) (api.GetAllVersionsResponseObject, error) {
// We currently only support a single version
versions := []common.APIVersion{
{
Expand All @@ -65,7 +72,7 @@ func (r *AlarmsServer) GetAllVersions(ctx context.Context, request api.GetAllVer
}

// GetMinorVersions receives the API request to this endpoint, executes the request, and responds appropriately
func (r *AlarmsServer) GetMinorVersions(ctx context.Context, request api.GetMinorVersionsRequestObject) (api.GetMinorVersionsResponseObject, error) {
func (a *AlarmsServer) GetMinorVersions(ctx context.Context, request api.GetMinorVersionsRequestObject) (api.GetMinorVersionsResponseObject, error) {
// We currently only support a single version
versions := []common.APIVersion{
{
Expand Down Expand Up @@ -137,6 +144,7 @@ func (a *AlarmsServer) CreateSubscription(ctx context.Context, request api.Creat
}), nil
}

slog.Info("Successfully created Alarm Subscription", "record", record)
return api.CreateSubscription201JSONResponse(models.ConvertSubscriptionModelToApi(*record)), nil
}

Expand Down Expand Up @@ -479,6 +487,8 @@ func (a *AlarmsServer) GetProbableCause(ctx context.Context, request api.GetProb
// AmNotification handles an API request coming from AlertManager with CaaS alerts. This api is used internally.
// Note: the errors returned can also be view under alertmanager pod logs but also logging here for convenience
func (a *AlarmsServer) AmNotification(ctx context.Context, request api.AmNotificationRequestObject) (api.AmNotificationResponseObject, error) {
// TODO: AM auto retries if it receives 5xx error code. That means any error, even if permanent (e.g postgres syntax or host not found), will be processed the same way. Once we have a better retry mechanism for pg, update all 5xx to 4xx as needed.

// Audit the table with full list of alerts in the current payload. If missing set them to resolve
if err := a.AlarmsRepository.ResolveNotificationIfNotInCurrent(ctx, request.Body); err != nil {
msg := "failed to resolve notification that are not present"
Expand All @@ -504,12 +514,112 @@ func (a *AlarmsServer) AmNotification(ctx context.Context, request api.AmNotific
return nil, fmt.Errorf("%s: %w", msg, err)
}

//TODO: Get subscriber
// Get all subscriptions
subscriptions, err := a.AlarmsRepository.GetAlarmSubscriptions(ctx)
if err != nil {
msg := "failed to get all subscriptions"
slog.Error(msg, "error", err)
return nil, fmt.Errorf("%s: %w", msg, err)
}
// No subscription, noop
if len(subscriptions) == 0 {
slog.Info("No subscriptions to notify")
return api.AmNotification200Response{}, nil
}

// Notify subscriber
slog.Info("Processing subscriptions")
if err := a.processSubscriptions(ctx, subscriptions); err != nil {
msg := "failed to process at least one subscription"
slog.Error(msg, "error", err)
return nil, fmt.Errorf("%s: %w", msg, err)
}

//TODO: Notify subscriber
slog.Info("Successfully handled all alertmanager alerts and notified subscriptions")
return api.AmNotification200Response{}, nil
}

// Max number of subscriptions that can be notified at the same time
const subsMaxConcurrent = 5

// processSubscriptions Process a list of oran subscription concurrently
func (a *AlarmsServer) processSubscriptions(ctx context.Context, subscriptions []models.AlarmSubscription) error {
maxConcurrent := subsMaxConcurrent
sem := make(chan struct{}, maxConcurrent)
errChan := make(chan struct {
err error
subID string
}, len(subscriptions))

var wg sync.WaitGroup

for _, subscription := range subscriptions {
wg.Add(1)
sem <- struct{}{} // Acquire semaphore
go func(sub models.AlarmSubscription) {
defer wg.Done()
defer func() { <-sem }() // Release semaphore

// Notify
if err := a.notifySubscription(ctx, sub); err != nil {
errChan <- struct {
err error
subID string
}{err, sub.SubscriptionID.String()}
}
}(subscription)
}

go func() {
wg.Wait()
close(errChan)
}()

var failedNotifications []string
errCount := 0
for e := range errChan {
errCount++
failedNotifications = append(failedNotifications, fmt.Sprintf("subscription %s: %v", e.subID, e.err))
}

if errCount > 0 {
return fmt.Errorf("failed to notify %d subscriptions: %s", errCount, strings.Join(failedNotifications, "::"))
}

slog.Info("Successfully processed all subscriptions", "count", len(subscriptions))
return nil
}

// notifySubscription For a given subscription, collect all the latest alarms (with possible filter) and update its event cursor
func (a *AlarmsServer) notifySubscription(ctx context.Context, subscription models.AlarmSubscription) error {
subAlarms, err := a.AlarmsRepository.GetAlarmsForSubscription(ctx, subscription)
if err != nil {
return fmt.Errorf("get alarms failed for %s: %w", subscription.SubscriptionID, err)
}

if len(subAlarms) > 0 {
// Call the subscription url with notification payload
notifyPayload := notifier.Notification{
NotificationID: subscription.SubscriptionID,
Payload: models.ConvertAlarmEventRecordModelsToAlarmEventNotifications(subAlarms, subscription, a.GlobalCloudID),
}
if err := notifier.CallUrl(ctx, a.Logger, a.HttpClient, subscription.Callback, notifyPayload); err != nil {
return fmt.Errorf("call url failed for %s: %w", subscription.SubscriptionID, err)
}

// Update the new event cursor with the latest
latestSequence := subAlarms[len(subAlarms)-1].AlarmSequenceNumber
subscription.EventCursor = subAlarms[0].AlarmSequenceNumber
if err := a.AlarmsRepository.UpdateSubscriptionEventCursor(ctx, subscription, latestSequence); err != nil {
return fmt.Errorf("update subscription failed for %s: %w", subscription.SubscriptionID, err)
}

slog.Info("Subscription have been notified", "subscription", subscription.SubscriptionID)
}

return nil
}

func (a *AlarmsServer) HwNotification(ctx context.Context, request api.HwNotificationRequestObject) (api.HwNotificationResponseObject, error) {
// TODO implement me
return nil, fmt.Errorf("not implemented")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -74,7 +74,6 @@ func Setup(ctx context.Context, cl client.Client) error {
func ConvertAmToAlarmEventRecordModels(am *api.AlertmanagerNotification, aDefinitionRecords []models.AlarmDefinition, clusterIDToObjectTypeID map[uuid.UUID]uuid.UUID) []models.AlarmEventRecord {
records := make([]models.AlarmEventRecord, 0, len(am.Alerts))
for _, alert := range am.Alerts {
slog.Info("Converting Alertmanager alert", "alert name", GetAlertName(*alert.Labels))
record := models.AlarmEventRecord{
AlarmRaisedTime: *alert.StartsAt,
AlarmClearedTime: setTime(*alert.EndsAt),
Expand Down
Original file line number Diff line number Diff line change
@@ -1,18 +1,12 @@
-- Drop the trigger for updating timestamp updated_at in alarm_event_record
DROP TRIGGER IF EXISTS track_alarm_event_record_change_time ON alarm_event_record;
-- Drop the trigger for managing events
DROP TRIGGER IF EXISTS manage_alarm_event ON alarm_event_record;

-- Drop the function for updating timestamp updated_at in alarm_event_record
DROP FUNCTION IF EXISTS track_alarm_event_record_change_time;
-- Drop the function for managing events
DROP FUNCTION IF EXISTS manage_alarm_event;

-- Remove the default for alarm_sequence_number that uses alarm_sequence_seq
ALTER TABLE alarm_event_record ALTER COLUMN alarm_sequence_number DROP DEFAULT;

-- Drop the trigger for updating alarm_sequence_number in alarm_event_record
DROP TRIGGER IF EXISTS update_alarm_event_sequence ON alarm_event_record;

-- Drop the trigger function for updating alarm_sequence_number
DROP FUNCTION IF EXISTS update_alarm_event_sequence;

-- Drop the sequence for alarm_sequence_number
DROP SEQUENCE IF EXISTS alarm_sequence_seq;

Expand Down
Loading

0 comments on commit 8e76100

Please sign in to comment.