Implement sync mechanism for alarms collection
This implements a polling mechanism that periodically
syncs the alarm dictionaries and definitions.
It also handles the infrastructure server at a
more generic level.

Signed-off-by: Marcelo Guerrero <[email protected]>
mlguerrero12 committed Jan 10, 2025
1 parent 912f865 commit d3480e7
Showing 13 changed files with 300 additions and 140 deletions.
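
The polling loop itself lives in files not rendered on this page, so the following is only a sketch of the mechanism the commit message describes: a background goroutine, tied to the server's WaitGroup so a graceful exit can wait for it, that re-runs a sync function at a fixed interval. The package name, startSync, and the interval are assumptions, not the commit's actual API.

```go
package sketch

import (
	"context"
	"log/slog"
	"sync"
	"time"
)

// startSync runs syncFn once immediately, then on every tick until ctx is
// cancelled. The WaitGroup mirrors the AlarmsServer.Wg field in the diff,
// which lets background tasks finish before a graceful exit.
// All names here are illustrative, not the commit's real API.
func startSync(ctx context.Context, wg *sync.WaitGroup, interval time.Duration, syncFn func(context.Context) error) {
	wg.Add(1)
	go func() {
		defer wg.Done()
		ticker := time.NewTicker(interval)
		defer ticker.Stop()
		for {
			if err := syncFn(ctx); err != nil {
				// Log and keep polling; a single failed sync should not
				// stop future attempts.
				slog.Error("periodic sync failed", "error", err)
			}
			select {
			case <-ctx.Done():
				return
			case <-ticker.C:
			}
		}
	}()
}
```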
23 changes: 16 additions & 7 deletions internal/service/alarms/api/server.go
@@ -10,17 +10,17 @@ import (
"sync"
"time"

"github.com/openshift-kni/oran-o2ims/internal/service/common/notifier"

"github.com/google/uuid"

api "github.com/openshift-kni/oran-o2ims/internal/service/alarms/api/generated"
"github.com/openshift-kni/oran-o2ims/internal/service/alarms/internal/alertmanager"
"github.com/openshift-kni/oran-o2ims/internal/service/alarms/internal/clusterserver"
"github.com/openshift-kni/oran-o2ims/internal/service/alarms/internal/db/models"
"github.com/openshift-kni/oran-o2ims/internal/service/alarms/internal/db/repo"
"github.com/openshift-kni/oran-o2ims/internal/service/alarms/internal/infrastructure"
"github.com/openshift-kni/oran-o2ims/internal/service/alarms/internal/infrastructure/clusterserver"
api2 "github.com/openshift-kni/oran-o2ims/internal/service/common/api"
common "github.com/openshift-kni/oran-o2ims/internal/service/common/api/generated"
"github.com/openshift-kni/oran-o2ims/internal/service/common/notifier"
"github.com/openshift-kni/oran-o2ims/internal/service/common/utils"
apiresources "github.com/openshift-kni/oran-o2ims/internal/service/resources/api/generated"
)
@@ -42,8 +42,8 @@ type AlarmsServer struct {
GlobalCloudID uuid.UUID
// AlarmsRepository is the repository for the alarms
AlarmsRepository *repo.AlarmsRepository
// ClusterServer contains the cluster server client and fetched objects
ClusterServer *clusterserver.ClusterServer
// Infrastructure clients
Infrastructure *infrastructure.Infrastructure
// Wg to allow alarm server level background tasks to finish before graceful exit
Wg sync.WaitGroup
// NotificationProvider to handle new events
@@ -508,16 +508,25 @@ func (a *AlarmsServer) AmNotification(ctx context.Context, request api.AmNotific
return nil, fmt.Errorf("%s: %w", msg, err)
}

// Get NodeCluster NodeClusterType mapping
var clusterIDToNodeClusterTypeID map[uuid.UUID]uuid.UUID
for i := range a.Infrastructure.Clients {
if a.Infrastructure.Clients[i].Name() == clusterserver.Name {
clusterIDToNodeClusterTypeID = a.Infrastructure.Clients[i].(*clusterserver.ClusterServer).GetClusterIDToResourceTypeID()
break
}
}

// Get the definition data based on current set of Alert names and managed cluster ID
alarmDefinitions, err := a.AlarmsRepository.GetAlarmDefinitions(ctx, request.Body, a.ClusterServer.ClusterIDToResourceTypeID)
alarmDefinitions, err := a.AlarmsRepository.GetAlarmDefinitions(ctx, request.Body, clusterIDToNodeClusterTypeID)
if err != nil {
msg := "failed to get AlarmDefinitions"
slog.Error(msg, "error", err)
return nil, fmt.Errorf("%s: %w", msg, err)
}

// Combine possible definitions with events
aerModels := alertmanager.ConvertAmToAlarmEventRecordModels(request.Body, alarmDefinitions, a.ClusterServer.ClusterIDToResourceTypeID)
aerModels := alertmanager.ConvertAmToAlarmEventRecordModels(request.Body, alarmDefinitions, clusterIDToNodeClusterTypeID)

// Insert and update AlarmEventRecord
if err := a.AlarmsRepository.UpsertAlarmEventRecord(ctx, aerModels); err != nil {
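The hunk above replaces the dedicated a.ClusterServer field with a scan over generic infrastructure clients: match by name, then type-assert to reach the concrete GetClusterIDToResourceTypeID API. A dependency-light sketch of that pattern, in which Client, ClusterServer, and the registered name are stand-ins inferred from the diff rather than the package's real definitions:

```go
package sketch

import "github.com/google/uuid"

// Client approximates the interface the new infrastructure package appears
// to define; only Name() is visible in this diff, so the method set is an
// assumption.
type Client interface {
	Name() string
}

// ClusterServer stands in for clusterserver.ClusterServer.
type ClusterServer struct {
	clusterIDToResourceTypeID map[uuid.UUID]uuid.UUID
}

func (c *ClusterServer) Name() string { return "cluster-server" } // assumed name

func (c *ClusterServer) GetClusterIDToResourceTypeID() map[uuid.UUID]uuid.UUID {
	return c.clusterIDToResourceTypeID
}

// clusterIDMap mirrors the lookup in AmNotification: walk the generic client
// slice, match the cluster server by name, then type-assert to its concrete
// type to reach the mapping.
func clusterIDMap(clients []Client) map[uuid.UUID]uuid.UUID {
	for i := range clients {
		if clients[i].Name() != "cluster-server" {
			continue
		}
		if cs, ok := clients[i].(*ClusterServer); ok {
			return cs.GetClusterIDToResourceTypeID()
		}
	}
	return nil // not registered; callers must tolerate a nil map
}
```

Compared with a typed field, the slice keeps the alarms server agnostic about which infrastructure services exist, at the cost of a runtime type assertion when one client's concrete API is needed.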
14 changes: 11 additions & 3 deletions internal/service/alarms/internal/alertmanager/alertmanager.go
@@ -18,6 +18,7 @@ import (
"sigs.k8s.io/controller-runtime/pkg/client"

"github.com/openshift-kni/oran-o2ims/internal/controllers/utils"
"github.com/openshift-kni/oran-o2ims/internal/service/common/clients/k8s"
)

const (
@@ -31,11 +32,18 @@ const templateName = "alertmanager.yaml"
//go:embed alertmanager.yaml.template
var alertManagerConfig []byte

var getHubClient = k8s.NewClientForHub

// Setup updates the alertmanager config secret with the new configuration
func Setup(ctx context.Context, cl client.Client) error {
func Setup(ctx context.Context) error {
hubClient, err := getHubClient()
if err != nil {
return fmt.Errorf("error creating client for hub: %w", err)
}

// ACM recreates the secret when it is deleted, so we can safely assume it exists
var secret corev1.Secret
err := cl.Get(ctx, client.ObjectKey{Namespace: namespace, Name: secretName}, &secret)
err = hubClient.Get(ctx, client.ObjectKey{Namespace: namespace, Name: secretName}, &secret)
if err != nil {
return fmt.Errorf("failed to get secret %s/%s: %w", namespace, secretName, err)
}
@@ -61,7 +69,7 @@ func Setup(ctx context.Context, cl client.Client) error {
}

secret.Data[secretKey] = rendered.Bytes()
err = cl.Update(ctx, &secret)
err = hubClient.Update(ctx, &secret)
if err != nil {
return fmt.Errorf("failed to update secret %s/%s: %w", namespace, secretName, err)
}
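Setup no longer receives a client.Client; it obtains one through the package-level getHubClient variable, which defaults to k8s.NewClientForHub. That variable is a test seam: production code calls it as a function, and tests reassign it. A dependency-free sketch of the pattern (hubClient, realClient, and setup are illustrative stand-ins, not the package's real types):

```go
package sketch

import "fmt"

// hubClient stands in for controller-runtime's client.Client.
type hubClient interface {
	Ping() error
}

type realClient struct{}

func (realClient) Ping() error { return nil }

// getHubClient is the seam: the default builds the real client, and tests
// swap in a fake, exactly as the test diff below does in BeforeEach.
var getHubClient = func() (hubClient, error) { return realClient{}, nil }

func setup() error {
	cl, err := getHubClient()
	if err != nil {
		return fmt.Errorf("error creating client for hub: %w", err)
	}
	return cl.Ping()
}
```

The trade-off is package-level mutable state: every test that swaps the variable must restore it afterwards, which is what the BeforeEach/AfterEach pair in the next file does.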
@@ -22,12 +22,19 @@ var _ = Describe("Alertmanager", func() {
ctx context.Context
scheme *runtime.Scheme
c client.Client

temp func() (client.Client, error)
)

BeforeEach(func() {
scheme = runtime.NewScheme()
_ = corev1.AddToScheme(scheme)

temp = getHubClient
getHubClient = func() (client.Client, error) {
return c, nil
}

ctx = context.Background()
c = fake.NewClientBuilder().WithScheme(scheme).Build()

@@ -45,8 +52,12 @@
Expect(err).NotTo(HaveOccurred())
})

AfterEach(func() {
getHubClient = temp
})

It("verifies that the alertmanager.yaml key is populated", func() {
err := Setup(ctx, c)
err := Setup(ctx)
Expect(err).ToNot(HaveOccurred())

secret := &corev1.Secret{}
@@ -1,46 +1,45 @@
package dictionary_definition
package dictionary_collector

import (
"context"
"encoding/json"
"fmt"
"golang.org/x/sync/errgroup"
"log/slog"
"sync"
"time"

"github.com/google/uuid"
"github.com/openshift-kni/oran-o2ims/internal/controllers/utils"
"github.com/openshift-kni/oran-o2ims/internal/service/alarms/internal/clusterserver"
monitoringv1 "github.com/prometheus-operator/prometheus-operator/pkg/apis/monitoring/v1"
"golang.org/x/sync/errgroup"
"k8s.io/apimachinery/pkg/labels"
"k8s.io/apimachinery/pkg/selection"
clusterv1 "open-cluster-management.io/api/cluster/v1"
crclient "sigs.k8s.io/controller-runtime/pkg/client"

"github.com/openshift-kni/oran-o2ims/internal/controllers/utils"
"github.com/openshift-kni/oran-o2ims/internal/service/alarms/internal/db/models"
"github.com/openshift-kni/oran-o2ims/internal/service/alarms/internal/db/repo"
"github.com/openshift-kni/oran-o2ims/internal/service/alarms/internal/infrastructure/clusterserver"
"github.com/openshift-kni/oran-o2ims/internal/service/common/clients"
"github.com/openshift-kni/oran-o2ims/internal/service/common/clients/k8s"
)

type AlarmDictionaryDefinition struct {
Client crclient.Client
type NodeClusterTypeDictionaryService struct {
AlarmsRepository *repo.AlarmsRepository
RulesMap map[uuid.UUID][]monitoringv1.Rule
HubClient crclient.Client

RulesMap map[uuid.UUID][]monitoringv1.Rule
}

func New(client crclient.Client, ar *repo.AlarmsRepository) *AlarmDictionaryDefinition {
return &AlarmDictionaryDefinition{
Client: client,
AlarmsRepository: ar,
RulesMap: make(map[uuid.UUID][]monitoringv1.Rule),
// loadNodeClusterTypeDictionaries loads the alarm dictionaries for the given node cluster types
func (r *NodeClusterTypeDictionaryService) loadNodeClusterTypeDictionaries(ctx context.Context, nodeClusterTypes []clusterserver.NodeClusterType) {
slog.Info("loading alarm dictionaries for node cluster types")

if len(nodeClusterTypes) == 0 {
slog.Info("no node cluster types to process")
return
}
}

// Load loads the alarm dictionary and definition
func (r *AlarmDictionaryDefinition) Load(ctx context.Context, nodeClusterTypes *[]clusterserver.NodeClusterType) error {
slog.Info("Loading alarm dictionaries and definitions")
type result struct {
NodeClusterTypeID uuid.UUID
rules []monitoringv1.Rule
@@ -49,7 +48,7 @@ func (r *AlarmDictionaryDefinition) Load(ctx context.Context, nodeClusterTypes *

wg := sync.WaitGroup{}
resultChannel := make(chan result)
for _, nct := range *nodeClusterTypes {
for _, nct := range nodeClusterTypes {
wg.Add(1)
go func(nodeClusterType clusterserver.NodeClusterType) {
var err error
@@ -96,16 +95,16 @@ func (r *AlarmDictionaryDefinition) Load(ctx context.Context, nodeClusterTypes *
r.RulesMap[res.NodeClusterTypeID] = res.rules
slog.Info("loaded rules for node cluster type", "NodeClusterType ID", res.NodeClusterTypeID, "rules count", len(res.rules))
}
err := r.syncDictionaries(ctx, *nodeClusterTypes)

err := r.syncDictionaries(ctx, nodeClusterTypes)
if err != nil {
return fmt.Errorf("failed to sync dictionary and definitions: %w", err)
slog.Error("failed to sync dictionary and definitions", "error", err)
}

return nil
}

func (r *AlarmDictionaryDefinition) processHub(ctx context.Context) ([]monitoringv1.Rule, error) {
rules, err := r.getRules(ctx, r.Client)
// processHub processes the hub cluster
func (r *NodeClusterTypeDictionaryService) processHub(ctx context.Context) ([]monitoringv1.Rule, error) {
rules, err := r.getRules(ctx, r.HubClient)
if err != nil {
return nil, err
}
@@ -114,14 +113,17 @@ func (r *AlarmDictionaryDefinition) processHub(ctx context.Context) ([]monitorin
return rules, nil
}

// Needed for testing
var getClientForCluster = k8s.NewClientForCluster

// processManagedCluster processes a managed cluster
func (r *AlarmDictionaryDefinition) processManagedCluster(ctx context.Context, version string) ([]monitoringv1.Rule, error) {
func (r *NodeClusterTypeDictionaryService) processManagedCluster(ctx context.Context, version string) ([]monitoringv1.Rule, error) {
cluster, err := r.getManagedCluster(ctx, version)
if err != nil {
return nil, err
}

cl, err := getClientForCluster(ctx, r.Client, cluster.Name)
cl, err := getClientForCluster(ctx, r.HubClient, cluster.Name)
if err != nil {
return nil, err
}
@@ -135,11 +137,8 @@ func (r *AlarmDictionaryDefinition) processManagedCluster(ctx context.Context, v
return rules, nil
}

// Needed for testing
var getClientForCluster = k8s.NewClientForCluster

// getManagedCluster finds a single managed cluster with the given version
func (r *AlarmDictionaryDefinition) getManagedCluster(ctx context.Context, version string) (*clusterv1.ManagedCluster, error) {
func (r *NodeClusterTypeDictionaryService) getManagedCluster(ctx context.Context, version string) (*clusterv1.ManagedCluster, error) {
// Match managed cluster with the given version and not local cluster
selector := labels.NewSelector()
versionSelector, _ := labels.NewRequirement(utils.OpenshiftVersionLabelName, selection.Equals, []string{version})
@@ -149,7 +148,7 @@ func (r *AlarmDictionaryDefinition) getManagedCluster(ctx context.Context, versi
defer cancel()

var managedClusters clusterv1.ManagedClusterList
err := r.Client.List(ctxWithTimeout, &managedClusters, &crclient.ListOptions{
err := r.HubClient.List(ctxWithTimeout, &managedClusters, &crclient.ListOptions{
LabelSelector: selector.Add(*versionSelector).Add(*localClusterRequirement),
Limit: 1,
})
@@ -164,8 +163,8 @@ func (r *AlarmDictionaryDefinition) getManagedCluster(ctx context.Context, versi
return &managedClusters.Items[0], nil
}

// getRules gets rules defined within a PrometheusRule resource
func (r *AlarmDictionaryDefinition) getRules(ctx context.Context, cl crclient.Client) ([]monitoringv1.Rule, error) {
// getRules gets rules defined within a PrometheusRule
func (r *NodeClusterTypeDictionaryService) getRules(ctx context.Context, cl crclient.Client) ([]monitoringv1.Rule, error) {
ctxWithTimeout, cancel := context.WithTimeout(ctx, clients.ListRequestTimeout)
defer cancel()

@@ -192,7 +191,7 @@ func (r *AlarmDictionaryDefinition) getRules(ctx context.Context, cl crclient.Cl
}

// syncDictionaries synchronizes the alarm dictionaries in the database
func (r *AlarmDictionaryDefinition) syncDictionaries(ctx context.Context, nodeClusterTypes []clusterserver.NodeClusterType) error {
func (r *NodeClusterTypeDictionaryService) syncDictionaries(ctx context.Context, nodeClusterTypes []clusterserver.NodeClusterType) error {
slog.Info("Synchronizing alarm dictionaries in the database")
ctx, cancel := context.WithTimeout(ctx, 5*time.Minute) // let's try to finish init with a set time.
defer cancel()
@@ -210,7 +209,7 @@ func (r *AlarmDictionaryDefinition) syncDictionaries(ctx context.Context, nodeCl
}

// deleteOutdatedDictionaries remove any dictionary that may not be available anymore
func (r *AlarmDictionaryDefinition) deleteOutdatedDictionaries(ctx context.Context, nodeClusterTypes []clusterserver.NodeClusterType) {
func (r *NodeClusterTypeDictionaryService) deleteOutdatedDictionaries(ctx context.Context, nodeClusterTypes []clusterserver.NodeClusterType) {
// Delete Dictionaries that do not have a corresponding NodeClusterType
ids := make([]any, 0, len(nodeClusterTypes))
for _, nodeClusterType := range nodeClusterTypes {
@@ -225,7 +224,7 @@ func (r *AlarmDictionaryDefinition) deleteOutdatedDictionaries(ctx context.Conte
}

// processNodeClusterTypes process each nodeClusterType in parallel and return early with error if anything fails
func (r *AlarmDictionaryDefinition) processNodeClusterTypes(ctx context.Context, nodeClusterTypes []clusterserver.NodeClusterType) error {
func (r *NodeClusterTypeDictionaryService) processNodeClusterTypes(ctx context.Context, nodeClusterTypes []clusterserver.NodeClusterType) error {
// Validate all rules exist before starting any processing
r.validateRulesExist(nodeClusterTypes)

@@ -243,7 +242,7 @@ func (r *AlarmDictionaryDefinition) processNodeClusterTypes(ctx context.Context,
}

// validateRulesExist check if we have rules to work with that will be converted to alarm defs and warn otherwise
func (r *AlarmDictionaryDefinition) validateRulesExist(nodeClusterTypes []clusterserver.NodeClusterType) {
func (r *NodeClusterTypeDictionaryService) validateRulesExist(nodeClusterTypes []clusterserver.NodeClusterType) {
var missingRules []uuid.UUID
for _, nct := range nodeClusterTypes {
if _, ok := r.RulesMap[nct.NodeClusterTypeId]; !ok {
@@ -257,7 +256,7 @@ func (r *AlarmDictionaryDefinition) validateRulesExist(nodeClusterTypes []cluste
}

// processNodeClusterType process dict and def for nodeClusterType
func (r *AlarmDictionaryDefinition) processNodeClusterType(ctx context.Context, nodeClusterType clusterserver.NodeClusterType) error {
func (r *NodeClusterTypeDictionaryService) processNodeClusterType(ctx context.Context, nodeClusterType clusterserver.NodeClusterType) error {
// Process dictionary_definition
dictID, err := r.upsertAlarmDictionary(ctx, nodeClusterType)
if err != nil {
@@ -276,7 +275,7 @@ func (r *AlarmDictionaryDefinition) processNodeClusterType(ctx context.Context,
}

// upsertAlarmDictionary process dict for nodeClusterType
func (r *AlarmDictionaryDefinition) upsertAlarmDictionary(ctx context.Context, nodeClusterType clusterserver.NodeClusterType) (models.AlarmDictionary, error) {
func (r *NodeClusterTypeDictionaryService) upsertAlarmDictionary(ctx context.Context, nodeClusterType clusterserver.NodeClusterType) (models.AlarmDictionary, error) {
extensions, err := getVendorExtensions(nodeClusterType)
if err != nil {
return models.AlarmDictionary{}, fmt.Errorf("failed to get extensions for nodeClusterType %s: %w", nodeClusterType.NodeClusterTypeId, err)
Expand Down Expand Up @@ -304,7 +303,7 @@ func (r *AlarmDictionaryDefinition) upsertAlarmDictionary(ctx context.Context, n
}

// upsertAlarmDictionary process def for nodeClusterType
func (r *AlarmDictionaryDefinition) processAlarmDefinitions(ctx context.Context, nodeClusterType clusterserver.NodeClusterType, ad models.AlarmDictionary) error {
func (r *NodeClusterTypeDictionaryService) processAlarmDefinitions(ctx context.Context, nodeClusterType clusterserver.NodeClusterType, ad models.AlarmDictionary) error {
// Get filtered rules
filteredRules := r.getFilteredRules(nodeClusterType.NodeClusterTypeId)

@@ -321,7 +320,7 @@ func (r *AlarmDictionaryDefinition) processAlarmDefinitions(ctx context.Context,
}

// getFilteredRules check to see if rule can potentially be skipped
func (r *AlarmDictionaryDefinition) getFilteredRules(nodeClusterTypeID uuid.UUID) []monitoringv1.Rule {
func (r *NodeClusterTypeDictionaryService) getFilteredRules(nodeClusterTypeID uuid.UUID) []monitoringv1.Rule {
// Upsert will complain if there are rules with the same Alert and Severity
// We need to filter them out. First occurrence wins.
type uniqueAlarm struct {
@@ -354,7 +353,7 @@ func (r *AlarmDictionaryDefinition) getFilteredRules(nodeClusterTypeID uuid.UUID
}

// createAlarmDefinitions create new alarm def for each rules
func (r *AlarmDictionaryDefinition) createAlarmDefinitions(rules []monitoringv1.Rule, ad models.AlarmDictionary, nodeClusterType clusterserver.NodeClusterType) []models.AlarmDefinition {
func (r *NodeClusterTypeDictionaryService) createAlarmDefinitions(rules []monitoringv1.Rule, ad models.AlarmDictionary, nodeClusterType clusterserver.NodeClusterType) []models.AlarmDefinition {
var records []models.AlarmDefinition

for _, rule := range rules {
@@ -391,7 +390,7 @@ func (r *AlarmDictionaryDefinition) createAlarmDefinitions(rules []monitoringv1.
}

// upsertAndCleanupDefinitions insert or update and finally remove defs if possible
func (r *AlarmDictionaryDefinition) upsertAndCleanupDefinitions(ctx context.Context, records []models.AlarmDefinition, nodeClusterTypeID uuid.UUID) error {
func (r *NodeClusterTypeDictionaryService) upsertAndCleanupDefinitions(ctx context.Context, records []models.AlarmDefinition, nodeClusterTypeID uuid.UUID) error {
// Upsert Alarm Definitions
alarmDefinitionRecords, err := r.AlarmsRepository.UpsertAlarmDefinitions(ctx, records)
if err != nil {
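processNodeClusterTypes above is documented as processing every nodeClusterType in parallel and returning early if anything fails; its body is collapsed in this view, but the errgroup import at the top of the file points at the standard pattern. A sketch with stand-in names (items and process replace the real NodeClusterType slice and processNodeClusterType method):

```go
package sketch

import (
	"context"
	"fmt"

	"golang.org/x/sync/errgroup"
)

// processAll fans work out with errgroup: the first non-nil error cancels
// the derived context and is the one returned by Wait.
func processAll(ctx context.Context, items []string, process func(context.Context, string) error) error {
	group, ctx := errgroup.WithContext(ctx)
	for _, item := range items {
		item := item // capture per iteration (needed before Go 1.22)
		group.Go(func() error {
			if err := process(ctx, item); err != nil {
				return fmt.Errorf("failed to process %q: %w", item, err)
			}
			return nil
		})
	}
	return group.Wait()
}
```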
(The remaining changed files are not rendered on this page.)
