From d3480e7e28b9c1b2c76c1782a37dd12214062233 Mon Sep 17 00:00:00 2001 From: Marcelo Guerrero Date: Mon, 23 Dec 2024 11:48:00 +0100 Subject: [PATCH] Implement sync mechanism for alarms collection This implements a polling mechanism to sync the alarm dictionaries and definitions periodically. It also handles the infrastructure server at a more generic level Signed-off-by: Marcelo Guerrero --- internal/service/alarms/api/server.go | 23 +++-- .../internal/alertmanager/alertmanager.go | 14 ++- .../alertmanager/alertmanager_test.go | 13 ++- .../cluster.go} | 83 ++++++++-------- .../dictionary_collector/collector.go | 83 ++++++++++++++++ .../dictionary_collector_test.go} | 20 ++-- .../suite_test.go | 2 +- .../clusterserver/clusterserver.go | 95 +++++++++++++------ .../clusterserver/generated/client.gen.go | 0 .../clusterserver/generated/generate.go | 2 +- .../clusterserver/generated/oapi-codegen.yaml | 0 .../internal/infrastructure/infrastructure.go | 38 ++++++++ internal/service/alarms/serve.go | 67 ++++--------- 13 files changed, 300 insertions(+), 140 deletions(-) rename internal/service/alarms/internal/{dictionary_definition/dictionary_definition.go => dictionary_collector/cluster.go} (82%) create mode 100644 internal/service/alarms/internal/dictionary_collector/collector.go rename internal/service/alarms/internal/{dictionary_definition/dictionary_definition_test.go => dictionary_collector/dictionary_collector_test.go} (93%) rename internal/service/alarms/internal/{dictionary_definition => dictionary_collector}/suite_test.go (84%) rename internal/service/alarms/internal/{ => infrastructure}/clusterserver/clusterserver.go (60%) rename internal/service/alarms/internal/{ => infrastructure}/clusterserver/generated/client.gen.go (100%) rename internal/service/alarms/internal/{ => infrastructure}/clusterserver/generated/generate.go (50%) rename internal/service/alarms/internal/{ => infrastructure}/clusterserver/generated/oapi-codegen.yaml (100%) create mode 100644 
internal/service/alarms/internal/infrastructure/infrastructure.go diff --git a/internal/service/alarms/api/server.go b/internal/service/alarms/api/server.go index 12209937..87d82167 100644 --- a/internal/service/alarms/api/server.go +++ b/internal/service/alarms/api/server.go @@ -10,17 +10,17 @@ import ( "sync" "time" - "github.com/openshift-kni/oran-o2ims/internal/service/common/notifier" - "github.com/google/uuid" api "github.com/openshift-kni/oran-o2ims/internal/service/alarms/api/generated" "github.com/openshift-kni/oran-o2ims/internal/service/alarms/internal/alertmanager" - "github.com/openshift-kni/oran-o2ims/internal/service/alarms/internal/clusterserver" "github.com/openshift-kni/oran-o2ims/internal/service/alarms/internal/db/models" "github.com/openshift-kni/oran-o2ims/internal/service/alarms/internal/db/repo" + "github.com/openshift-kni/oran-o2ims/internal/service/alarms/internal/infrastructure" + "github.com/openshift-kni/oran-o2ims/internal/service/alarms/internal/infrastructure/clusterserver" api2 "github.com/openshift-kni/oran-o2ims/internal/service/common/api" common "github.com/openshift-kni/oran-o2ims/internal/service/common/api/generated" + "github.com/openshift-kni/oran-o2ims/internal/service/common/notifier" "github.com/openshift-kni/oran-o2ims/internal/service/common/utils" apiresources "github.com/openshift-kni/oran-o2ims/internal/service/resources/api/generated" ) @@ -42,8 +42,8 @@ type AlarmsServer struct { GlobalCloudID uuid.UUID // AlarmsRepository is the repository for the alarms AlarmsRepository *repo.AlarmsRepository - // ClusterServer contains the cluster server client and fetched objects - ClusterServer *clusterserver.ClusterServer + // Infrastructure clients + Infrastructure *infrastructure.Infrastructure // Wg to allow alarm server level background tasks to finish before graceful exit Wg sync.WaitGroup // NotificationProvider to handle new events @@ -508,8 +508,17 @@ func (a *AlarmsServer) AmNotification(ctx context.Context, request 
api.AmNotific return nil, fmt.Errorf("%s: %w", msg, err) } + // Get NodeCluster NodeClusterType mapping + var clusterIDToNodeClusterTypeID map[uuid.UUID]uuid.UUID + for i := range a.Infrastructure.Clients { + if a.Infrastructure.Clients[i].Name() == clusterserver.Name { + clusterIDToNodeClusterTypeID = a.Infrastructure.Clients[i].(*clusterserver.ClusterServer).GetClusterIDToResourceTypeID() + break + } + } + // Get the definition data based on current set of Alert names and managed cluster ID - alarmDefinitions, err := a.AlarmsRepository.GetAlarmDefinitions(ctx, request.Body, a.ClusterServer.ClusterIDToResourceTypeID) + alarmDefinitions, err := a.AlarmsRepository.GetAlarmDefinitions(ctx, request.Body, clusterIDToNodeClusterTypeID) if err != nil { msg := "failed to get AlarmDefinitions" slog.Error(msg, "error", err) @@ -517,7 +526,7 @@ func (a *AlarmsServer) AmNotification(ctx context.Context, request api.AmNotific } // Combine possible definitions with events - aerModels := alertmanager.ConvertAmToAlarmEventRecordModels(request.Body, alarmDefinitions, a.ClusterServer.ClusterIDToResourceTypeID) + aerModels := alertmanager.ConvertAmToAlarmEventRecordModels(request.Body, alarmDefinitions, clusterIDToNodeClusterTypeID) // Insert and update AlarmEventRecord if err := a.AlarmsRepository.UpsertAlarmEventRecord(ctx, aerModels); err != nil { diff --git a/internal/service/alarms/internal/alertmanager/alertmanager.go b/internal/service/alarms/internal/alertmanager/alertmanager.go index b29facfa..dc39fab2 100644 --- a/internal/service/alarms/internal/alertmanager/alertmanager.go +++ b/internal/service/alarms/internal/alertmanager/alertmanager.go @@ -18,6 +18,7 @@ import ( "sigs.k8s.io/controller-runtime/pkg/client" "github.com/openshift-kni/oran-o2ims/internal/controllers/utils" + "github.com/openshift-kni/oran-o2ims/internal/service/common/clients/k8s" ) const ( @@ -31,11 +32,18 @@ const templateName = "alertmanager.yaml" //go:embed alertmanager.yaml.template var 
alertManagerConfig []byte +var getHubClient = k8s.NewClientForHub + // Setup updates the alertmanager config secret with the new configuration -func Setup(ctx context.Context, cl client.Client) error { +func Setup(ctx context.Context) error { + hubClient, err := getHubClient() + if err != nil { + return fmt.Errorf("error creating client for hub: %w", err) + } + // ACM recreates the secret when it is deleted, so we can safely assume it exists var secret corev1.Secret - err := cl.Get(ctx, client.ObjectKey{Namespace: namespace, Name: secretName}, &secret) + err = hubClient.Get(ctx, client.ObjectKey{Namespace: namespace, Name: secretName}, &secret) if err != nil { return fmt.Errorf("failed to get secret %s/%s: %w", namespace, secretName, err) } @@ -61,7 +69,7 @@ func Setup(ctx context.Context, cl client.Client) error { } secret.Data[secretKey] = rendered.Bytes() - err = cl.Update(ctx, &secret) + err = hubClient.Update(ctx, &secret) if err != nil { return fmt.Errorf("failed to update secret %s/%s: %w", namespace, secretName, err) } diff --git a/internal/service/alarms/internal/alertmanager/alertmanager_test.go b/internal/service/alarms/internal/alertmanager/alertmanager_test.go index c65e7540..2511df8d 100644 --- a/internal/service/alarms/internal/alertmanager/alertmanager_test.go +++ b/internal/service/alarms/internal/alertmanager/alertmanager_test.go @@ -22,12 +22,19 @@ var _ = Describe("Alertmanager", func() { ctx context.Context scheme *runtime.Scheme c client.Client + + temp func() (client.Client, error) ) BeforeEach(func() { scheme = runtime.NewScheme() _ = corev1.AddToScheme(scheme) + temp = getHubClient + getHubClient = func() (client.Client, error) { + return c, nil + } + ctx = context.Background() c = fake.NewClientBuilder().WithScheme(scheme).Build() @@ -45,8 +52,12 @@ var _ = Describe("Alertmanager", func() { Expect(err).NotTo(HaveOccurred()) }) + AfterEach(func() { + getHubClient = temp + }) + It("verifies that the alertmanager.yaml key is populated", 
func() { - err := Setup(ctx, c) + err := Setup(ctx) Expect(err).ToNot(HaveOccurred()) secret := &corev1.Secret{} diff --git a/internal/service/alarms/internal/dictionary_definition/dictionary_definition.go b/internal/service/alarms/internal/dictionary_collector/cluster.go similarity index 82% rename from internal/service/alarms/internal/dictionary_definition/dictionary_definition.go rename to internal/service/alarms/internal/dictionary_collector/cluster.go index 52d61132..2061af3c 100644 --- a/internal/service/alarms/internal/dictionary_definition/dictionary_definition.go +++ b/internal/service/alarms/internal/dictionary_collector/cluster.go @@ -1,46 +1,45 @@ -package dictionary_definition +package dictionary_collector import ( "context" "encoding/json" "fmt" + "golang.org/x/sync/errgroup" "log/slog" "sync" "time" "github.com/google/uuid" - "github.com/openshift-kni/oran-o2ims/internal/controllers/utils" - "github.com/openshift-kni/oran-o2ims/internal/service/alarms/internal/clusterserver" monitoringv1 "github.com/prometheus-operator/prometheus-operator/pkg/apis/monitoring/v1" - "golang.org/x/sync/errgroup" "k8s.io/apimachinery/pkg/labels" "k8s.io/apimachinery/pkg/selection" clusterv1 "open-cluster-management.io/api/cluster/v1" crclient "sigs.k8s.io/controller-runtime/pkg/client" + "github.com/openshift-kni/oran-o2ims/internal/controllers/utils" "github.com/openshift-kni/oran-o2ims/internal/service/alarms/internal/db/models" "github.com/openshift-kni/oran-o2ims/internal/service/alarms/internal/db/repo" + "github.com/openshift-kni/oran-o2ims/internal/service/alarms/internal/infrastructure/clusterserver" "github.com/openshift-kni/oran-o2ims/internal/service/common/clients" "github.com/openshift-kni/oran-o2ims/internal/service/common/clients/k8s" ) -type AlarmDictionaryDefinition struct { - Client crclient.Client +type NodeClusterTypeDictionaryService struct { AlarmsRepository *repo.AlarmsRepository - RulesMap map[uuid.UUID][]monitoringv1.Rule + HubClient 
crclient.Client + + RulesMap map[uuid.UUID][]monitoringv1.Rule } -func New(client crclient.Client, ar *repo.AlarmsRepository) *AlarmDictionaryDefinition { - return &AlarmDictionaryDefinition{ - Client: client, - AlarmsRepository: ar, - RulesMap: make(map[uuid.UUID][]monitoringv1.Rule), +// loadNodeClusterTypeDictionaries loads the alarm dictionaries for the given node cluster types +func (r *NodeClusterTypeDictionaryService) loadNodeClusterTypeDictionaries(ctx context.Context, nodeClusterTypes []clusterserver.NodeClusterType) { + slog.Info("loading alarm dictionaries for node cluster types") + + if len(nodeClusterTypes) == 0 { + slog.Info("no node cluster types to process") + return } -} -// Load loads the alarm dictionary and definition -func (r *AlarmDictionaryDefinition) Load(ctx context.Context, nodeClusterTypes *[]clusterserver.NodeClusterType) error { - slog.Info("Loading alarm dictionaries and definitions") type result struct { NodeClusterTypeID uuid.UUID rules []monitoringv1.Rule @@ -49,7 +48,7 @@ func (r *AlarmDictionaryDefinition) Load(ctx context.Context, nodeClusterTypes * wg := sync.WaitGroup{} resultChannel := make(chan result) - for _, nct := range *nodeClusterTypes { + for _, nct := range nodeClusterTypes { wg.Add(1) go func(nodeClusterType clusterserver.NodeClusterType) { var err error @@ -96,16 +95,16 @@ func (r *AlarmDictionaryDefinition) Load(ctx context.Context, nodeClusterTypes * r.RulesMap[res.NodeClusterTypeID] = res.rules slog.Info("loaded rules for node cluster type", "NodeClusterType ID", res.NodeClusterTypeID, "rules count", len(res.rules)) } - err := r.syncDictionaries(ctx, *nodeClusterTypes) + + err := r.syncDictionaries(ctx, nodeClusterTypes) if err != nil { - return fmt.Errorf("failed to sync dictionary and definitions: %w", err) + slog.Error("failed to sync dictionary and definitions", "error", err) } - - return nil } -func (r *AlarmDictionaryDefinition) processHub(ctx context.Context) ([]monitoringv1.Rule, error) { - rules, err := 
r.getRules(ctx, r.Client) +// processHub processes the hub cluster +func (r *NodeClusterTypeDictionaryService) processHub(ctx context.Context) ([]monitoringv1.Rule, error) { + rules, err := r.getRules(ctx, r.HubClient) if err != nil { return nil, err } @@ -114,14 +113,17 @@ func (r *AlarmDictionaryDefinition) processHub(ctx context.Context) ([]monitorin return rules, nil } +// Needed for testing +var getClientForCluster = k8s.NewClientForCluster + // processManagedCluster processes a managed cluster -func (r *AlarmDictionaryDefinition) processManagedCluster(ctx context.Context, version string) ([]monitoringv1.Rule, error) { +func (r *NodeClusterTypeDictionaryService) processManagedCluster(ctx context.Context, version string) ([]monitoringv1.Rule, error) { cluster, err := r.getManagedCluster(ctx, version) if err != nil { return nil, err } - cl, err := getClientForCluster(ctx, r.Client, cluster.Name) + cl, err := getClientForCluster(ctx, r.HubClient, cluster.Name) if err != nil { return nil, err } @@ -135,11 +137,8 @@ func (r *AlarmDictionaryDefinition) processManagedCluster(ctx context.Context, v return rules, nil } -// Needed for testing -var getClientForCluster = k8s.NewClientForCluster - // getManagedCluster finds a single managed cluster with the given version -func (r *AlarmDictionaryDefinition) getManagedCluster(ctx context.Context, version string) (*clusterv1.ManagedCluster, error) { +func (r *NodeClusterTypeDictionaryService) getManagedCluster(ctx context.Context, version string) (*clusterv1.ManagedCluster, error) { // Match managed cluster with the given version and not local cluster selector := labels.NewSelector() versionSelector, _ := labels.NewRequirement(utils.OpenshiftVersionLabelName, selection.Equals, []string{version}) @@ -149,7 +148,7 @@ func (r *AlarmDictionaryDefinition) getManagedCluster(ctx context.Context, versi defer cancel() var managedClusters clusterv1.ManagedClusterList - err := r.Client.List(ctxWithTimeout, &managedClusters, 
&crclient.ListOptions{ + err := r.HubClient.List(ctxWithTimeout, &managedClusters, &crclient.ListOptions{ LabelSelector: selector.Add(*versionSelector).Add(*localClusterRequirement), Limit: 1, }) @@ -164,8 +163,8 @@ func (r *AlarmDictionaryDefinition) getManagedCluster(ctx context.Context, versi return &managedClusters.Items[0], nil } -// getRules gets rules defined within a PrometheusRule resource -func (r *AlarmDictionaryDefinition) getRules(ctx context.Context, cl crclient.Client) ([]monitoringv1.Rule, error) { +// getRules gets rules defined within a PrometheusRule +func (r *NodeClusterTypeDictionaryService) getRules(ctx context.Context, cl crclient.Client) ([]monitoringv1.Rule, error) { ctxWithTimeout, cancel := context.WithTimeout(ctx, clients.ListRequestTimeout) defer cancel() @@ -192,7 +191,7 @@ func (r *AlarmDictionaryDefinition) getRules(ctx context.Context, cl crclient.Cl } // syncDictionaries synchronizes the alarm dictionaries in the database -func (r *AlarmDictionaryDefinition) syncDictionaries(ctx context.Context, nodeClusterTypes []clusterserver.NodeClusterType) error { +func (r *NodeClusterTypeDictionaryService) syncDictionaries(ctx context.Context, nodeClusterTypes []clusterserver.NodeClusterType) error { slog.Info("Synchronizing alarm dictionaries in the database") ctx, cancel := context.WithTimeout(ctx, 5*time.Minute) // let's try to finish init with a set time. 
defer cancel() @@ -210,7 +209,7 @@ func (r *AlarmDictionaryDefinition) syncDictionaries(ctx context.Context, nodeCl } // deleteOutdatedDictionaries remove any dictionary that may not be available anymore -func (r *AlarmDictionaryDefinition) deleteOutdatedDictionaries(ctx context.Context, nodeClusterTypes []clusterserver.NodeClusterType) { +func (r *NodeClusterTypeDictionaryService) deleteOutdatedDictionaries(ctx context.Context, nodeClusterTypes []clusterserver.NodeClusterType) { // Delete Dictionaries that do not have a corresponding NodeClusterType ids := make([]any, 0, len(nodeClusterTypes)) for _, nodeClusterType := range nodeClusterTypes { @@ -225,7 +224,7 @@ func (r *AlarmDictionaryDefinition) deleteOutdatedDictionaries(ctx context.Conte } // processNodeClusterTypes process each nodeClusterType in parallel and return early with error if anything fails -func (r *AlarmDictionaryDefinition) processNodeClusterTypes(ctx context.Context, nodeClusterTypes []clusterserver.NodeClusterType) error { +func (r *NodeClusterTypeDictionaryService) processNodeClusterTypes(ctx context.Context, nodeClusterTypes []clusterserver.NodeClusterType) error { // Validate all rules exist before starting any processing r.validateRulesExist(nodeClusterTypes) @@ -243,7 +242,7 @@ func (r *AlarmDictionaryDefinition) processNodeClusterTypes(ctx context.Context, } // validateRulesExist check if we have rules to work with that will be converted to alarm defs and warn otherwise -func (r *AlarmDictionaryDefinition) validateRulesExist(nodeClusterTypes []clusterserver.NodeClusterType) { +func (r *NodeClusterTypeDictionaryService) validateRulesExist(nodeClusterTypes []clusterserver.NodeClusterType) { var missingRules []uuid.UUID for _, nct := range nodeClusterTypes { if _, ok := r.RulesMap[nct.NodeClusterTypeId]; !ok { @@ -257,7 +256,7 @@ func (r *AlarmDictionaryDefinition) validateRulesExist(nodeClusterTypes []cluste } // processNodeClusterType process dict and def for nodeClusterType -func (r 
*AlarmDictionaryDefinition) processNodeClusterType(ctx context.Context, nodeClusterType clusterserver.NodeClusterType) error { +func (r *NodeClusterTypeDictionaryService) processNodeClusterType(ctx context.Context, nodeClusterType clusterserver.NodeClusterType) error { // Process dictionary_definition dictID, err := r.upsertAlarmDictionary(ctx, nodeClusterType) if err != nil { @@ -276,7 +275,7 @@ func (r *AlarmDictionaryDefinition) processNodeClusterType(ctx context.Context, } // upsertAlarmDictionary process dict for nodeClusterType -func (r *AlarmDictionaryDefinition) upsertAlarmDictionary(ctx context.Context, nodeClusterType clusterserver.NodeClusterType) (models.AlarmDictionary, error) { +func (r *NodeClusterTypeDictionaryService) upsertAlarmDictionary(ctx context.Context, nodeClusterType clusterserver.NodeClusterType) (models.AlarmDictionary, error) { extensions, err := getVendorExtensions(nodeClusterType) if err != nil { return models.AlarmDictionary{}, fmt.Errorf("failed to get extensions for nodeClusterType %s: %w", nodeClusterType.NodeClusterTypeId, err) @@ -304,7 +303,7 @@ func (r *AlarmDictionaryDefinition) upsertAlarmDictionary(ctx context.Context, n } // upsertAlarmDictionary process def for nodeClusterType -func (r *AlarmDictionaryDefinition) processAlarmDefinitions(ctx context.Context, nodeClusterType clusterserver.NodeClusterType, ad models.AlarmDictionary) error { +func (r *NodeClusterTypeDictionaryService) processAlarmDefinitions(ctx context.Context, nodeClusterType clusterserver.NodeClusterType, ad models.AlarmDictionary) error { // Get filtered rules filteredRules := r.getFilteredRules(nodeClusterType.NodeClusterTypeId) @@ -321,7 +320,7 @@ func (r *AlarmDictionaryDefinition) processAlarmDefinitions(ctx context.Context, } // getFilteredRules check to see if rule can potentially be skipped -func (r *AlarmDictionaryDefinition) getFilteredRules(nodeClusterTypeID uuid.UUID) []monitoringv1.Rule { +func (r *NodeClusterTypeDictionaryService) 
getFilteredRules(nodeClusterTypeID uuid.UUID) []monitoringv1.Rule { // Upsert will complain if there are rules with the same Alert and Severity // We need to filter them out. First occurrence wins. type uniqueAlarm struct { @@ -354,7 +353,7 @@ func (r *AlarmDictionaryDefinition) getFilteredRules(nodeClusterTypeID uuid.UUID } // createAlarmDefinitions create new alarm def for each rules -func (r *AlarmDictionaryDefinition) createAlarmDefinitions(rules []monitoringv1.Rule, ad models.AlarmDictionary, nodeClusterType clusterserver.NodeClusterType) []models.AlarmDefinition { +func (r *NodeClusterTypeDictionaryService) createAlarmDefinitions(rules []monitoringv1.Rule, ad models.AlarmDictionary, nodeClusterType clusterserver.NodeClusterType) []models.AlarmDefinition { var records []models.AlarmDefinition for _, rule := range rules { @@ -391,7 +390,7 @@ func (r *AlarmDictionaryDefinition) createAlarmDefinitions(rules []monitoringv1. } // upsertAndCleanupDefinitions insert or update and finally remove defs if possible -func (r *AlarmDictionaryDefinition) upsertAndCleanupDefinitions(ctx context.Context, records []models.AlarmDefinition, nodeClusterTypeID uuid.UUID) error { +func (r *NodeClusterTypeDictionaryService) upsertAndCleanupDefinitions(ctx context.Context, records []models.AlarmDefinition, nodeClusterTypeID uuid.UUID) error { // Upsert Alarm Definitions alarmDefinitionRecords, err := r.AlarmsRepository.UpsertAlarmDefinitions(ctx, records) if err != nil { diff --git a/internal/service/alarms/internal/dictionary_collector/collector.go b/internal/service/alarms/internal/dictionary_collector/collector.go new file mode 100644 index 00000000..1176b158 --- /dev/null +++ b/internal/service/alarms/internal/dictionary_collector/collector.go @@ -0,0 +1,83 @@ +package dictionary_collector + +import ( + "context" + "fmt" + "log/slog" + "time" + + "github.com/go-logr/logr" + "github.com/google/uuid" + monitoringv1 
"github.com/prometheus-operator/prometheus-operator/pkg/apis/monitoring/v1" + crclient "sigs.k8s.io/controller-runtime/pkg/client" + "sigs.k8s.io/controller-runtime/pkg/log" + + "github.com/openshift-kni/oran-o2ims/internal/service/alarms/internal/db/repo" + "github.com/openshift-kni/oran-o2ims/internal/service/alarms/internal/infrastructure" + "github.com/openshift-kni/oran-o2ims/internal/service/alarms/internal/infrastructure/clusterserver" + "github.com/openshift-kni/oran-o2ims/internal/service/common/clients/k8s" +) + +const pollInterval = 10 * time.Minute + +// Collector is the struct that holds the alarms repository and the infrastructure clients +type Collector struct { + AlarmsRepository *repo.AlarmsRepository + Infrastructure *infrastructure.Infrastructure + + hubClient crclient.Client +} + +func New(ar *repo.AlarmsRepository, infra *infrastructure.Infrastructure) (*Collector, error) { + ad := &Collector{ + AlarmsRepository: ar, + Infrastructure: infra, + } + + // To avoid log from eventuallyFulfillRoot controller-runtime + log.SetLogger(logr.Discard()) + + // Create a client for the hub cluster + hubClient, err := k8s.NewClientForHub() + if err != nil { + return nil, fmt.Errorf("failed to create client for hub cluster: %w", err) + } + + ad.hubClient = hubClient + + return ad, nil +} + +// Run starts the alarm dictionary collector +func (r *Collector) Run(ctx context.Context) { + // Currently only the cluster server is supported + for i := range r.Infrastructure.Clients { + if r.Infrastructure.Clients[i].Name() == clusterserver.Name { + r.executeNodeClusterTypeDictionaryService(ctx, r.Infrastructure.Clients[i].(*clusterserver.ClusterServer)) + } + } +} + +// executeNodeClusterTypeDictionaryService starts the NodeClusterTypeDictionaryService +func (r *Collector) executeNodeClusterTypeDictionaryService(ctx context.Context, clusterServerClient *clusterserver.ClusterServer) { + c := &NodeClusterTypeDictionaryService{ + AlarmsRepository: r.AlarmsRepository, + 
HubClient: r.hubClient, + RulesMap: make(map[uuid.UUID][]monitoringv1.Rule), + } + + // First execution + c.loadNodeClusterTypeDictionaries(ctx, clusterServerClient.GetNodeClusterTypes()) + + go func() { + for { + select { + case <-ctx.Done(): + slog.Info("context cancelled, stopping node cluster type dictionary service") + return + case <-time.After(pollInterval): + c.loadNodeClusterTypeDictionaries(ctx, clusterServerClient.GetNodeClusterTypes()) + } + } + }() +} diff --git a/internal/service/alarms/internal/dictionary_definition/dictionary_definition_test.go b/internal/service/alarms/internal/dictionary_collector/dictionary_collector_test.go similarity index 93% rename from internal/service/alarms/internal/dictionary_definition/dictionary_definition_test.go rename to internal/service/alarms/internal/dictionary_collector/dictionary_collector_test.go index b73c464b..a3cc7a71 100644 --- a/internal/service/alarms/internal/dictionary_definition/dictionary_definition_test.go +++ b/internal/service/alarms/internal/dictionary_collector/dictionary_collector_test.go @@ -1,4 +1,4 @@ -package dictionary_definition +package dictionary_collector import ( "context" @@ -21,10 +21,10 @@ const ( version4152 = "4.15.2" ) -var _ = Describe("AlarmDictionaryDefinition", func() { +var _ = Describe("Alarm dictionary collector for node cluster type", func() { Describe("getManagedCluster", func() { var ( - r *AlarmDictionaryDefinition + r *NodeClusterTypeDictionaryService ctx context.Context scheme *runtime.Scheme ) @@ -34,8 +34,8 @@ var _ = Describe("AlarmDictionaryDefinition", func() { _ = clusterv1.AddToScheme(scheme) withWatch := fake.NewClientBuilder().WithScheme(scheme).Build() - r = &AlarmDictionaryDefinition{ - Client: withWatch, + r = &NodeClusterTypeDictionaryService{ + HubClient: withWatch, } ctx = context.Background() @@ -73,7 +73,7 @@ var _ = Describe("AlarmDictionaryDefinition", func() { } for _, cluster := range managedClusters { - err := r.Client.Create(ctx, cluster) + err 
:= r.HubClient.Create(ctx, cluster) Expect(err).NotTo(HaveOccurred()) } }) @@ -92,7 +92,7 @@ var _ = Describe("AlarmDictionaryDefinition", func() { }) Describe("processManagedCluster", func() { var ( - r *AlarmDictionaryDefinition + r *NodeClusterTypeDictionaryService ctx context.Context scheme *runtime.Scheme @@ -104,8 +104,8 @@ var _ = Describe("AlarmDictionaryDefinition", func() { _ = clusterv1.AddToScheme(scheme) withWatch := fake.NewClientBuilder().WithScheme(scheme).Build() - r = &AlarmDictionaryDefinition{ - Client: withWatch, + r = &NodeClusterTypeDictionaryService{ + HubClient: withWatch, } ctx = context.Background() @@ -120,7 +120,7 @@ var _ = Describe("AlarmDictionaryDefinition", func() { }, } - err := r.Client.Create(ctx, managedCluster) + err := r.HubClient.Create(ctx, managedCluster) Expect(err).NotTo(HaveOccurred()) temp = getClientForCluster diff --git a/internal/service/alarms/internal/dictionary_definition/suite_test.go b/internal/service/alarms/internal/dictionary_collector/suite_test.go similarity index 84% rename from internal/service/alarms/internal/dictionary_definition/suite_test.go rename to internal/service/alarms/internal/dictionary_collector/suite_test.go index 5210ddfa..007e8fdd 100644 --- a/internal/service/alarms/internal/dictionary_definition/suite_test.go +++ b/internal/service/alarms/internal/dictionary_collector/suite_test.go @@ -1,4 +1,4 @@ -package dictionary_definition_test +package dictionary_collector_test import ( "testing" diff --git a/internal/service/alarms/internal/clusterserver/clusterserver.go b/internal/service/alarms/internal/infrastructure/clusterserver/clusterserver.go similarity index 60% rename from internal/service/alarms/internal/clusterserver/clusterserver.go rename to internal/service/alarms/internal/infrastructure/clusterserver/clusterserver.go index cef8cc62..f1e76b8f 100644 --- a/internal/service/alarms/internal/clusterserver/clusterserver.go +++ 
b/internal/service/alarms/internal/infrastructure/clusterserver/clusterserver.go @@ -7,17 +7,20 @@ import ( "net/http" "os" "strings" + "sync" "github.com/google/uuid" "github.com/oapi-codegen/oapi-codegen/v2/pkg/securityprovider" "github.com/openshift-kni/oran-o2ims/internal/controllers/utils" - "github.com/openshift-kni/oran-o2ims/internal/service/alarms/internal/clusterserver/generated" + "github.com/openshift-kni/oran-o2ims/internal/service/alarms/internal/infrastructure/clusterserver/generated" "github.com/openshift-kni/oran-o2ims/internal/service/common/clients" ) const ( + Name = "Cluster" + clusterServerURLEnvName = "CLUSTER_SERVER_URL" tokenPathEnvName = "TOKEN_PATH" ) @@ -27,13 +30,20 @@ type NodeClusterType = generated.NodeClusterType type ClusterServer struct { client *generated.ClientWithResponses - NodeClusters *[]NodeCluster - NodeClusterTypes *[]NodeClusterType - ClusterIDToResourceTypeID map[uuid.UUID]uuid.UUID + nodeClusters *[]NodeCluster + nodeClusterTypes *[]NodeClusterType + clusterIDToResourceTypeID map[uuid.UUID]uuid.UUID + + sync.Mutex } -// New creates a new cluster server object -func New() (*ClusterServer, error) { +// Name returns the name of the client +func (r *ClusterServer) Name() string { + return Name +} + +// Setup setups a new client for the cluster server +func (r *ClusterServer) Setup() error { slog.Info("Creating ClusterServer client") url := utils.GetServiceURL(utils.InventoryClusterServerName) @@ -47,7 +57,7 @@ func New() (*ClusterServer, error) { // Set up transport tr, err := utils.GetDefaultBackendTransport() if err != nil { - return nil, fmt.Errorf("failed to create http transport: %w", err) + return fmt.Errorf("failed to create http transport: %w", err) } hc := http.Client{Transport: tr} @@ -63,29 +73,30 @@ func New() (*ClusterServer, error) { // Read token data, err := os.ReadFile(tokenPath) if err != nil { - return nil, fmt.Errorf("failed to read token file: %w", err) + return fmt.Errorf("failed to read token file: 
%w", err) } // Create Bearer token token, err := securityprovider.NewSecurityProviderBearerToken(strings.TrimSpace(string(data))) if err != nil { - return nil, fmt.Errorf("failed to create Bearer token: %w", err) + return fmt.Errorf("failed to create Bearer token: %w", err) } c, err := generated.NewClientWithResponses(url, generated.WithHTTPClient(&hc), generated.WithRequestEditorFn(token.Intercept)) if err != nil { - return nil, fmt.Errorf("failed to create client: %w", err) + return fmt.Errorf("failed to create client: %w", err) } - return &ClusterServer{client: c}, nil + r.client = c + return nil } -// GetAll fetches all necessary data from the cluster server -func (r *ClusterServer) GetAll(ctx context.Context) error { +// FetchAll fetches all necessary data from the cluster server +func (r *ClusterServer) FetchAll(ctx context.Context) error { slog.Info("Getting all objects from the cluster server") // List node clusters - nodeClusters, err := r.GetNodeClusters(ctx) + nodeClusters, err := r.getNodeClusters(ctx) if err != nil { return fmt.Errorf("failed to get node clusters: %w", err) } @@ -94,7 +105,7 @@ func (r *ClusterServer) GetAll(ctx context.Context) error { } // List node cluster types - nodeClusterTypes, err := r.GetNodeClusterTypes(ctx) + nodeClusterTypes, err := r.getNodeClusterTypes(ctx) if err != nil { return fmt.Errorf("failed to get node cluster types: %w", err) } @@ -102,17 +113,19 @@ func (r *ClusterServer) GetAll(ctx context.Context) error { return fmt.Errorf("no node cluster types found: %w", err) } - r.NodeClusters = nodeClusters - r.NodeClusterTypes = nodeClusterTypes + r.Lock() + defer r.Unlock() - // Todo: not concurrency safe - r.ClusterResourceTypeMapping() + r.nodeClusters = nodeClusters + r.nodeClusterTypes = nodeClusterTypes + + r.clusterResourceTypeMapping() return nil } -// GetNodeClusters lists all node clusters -func (r *ClusterServer) GetNodeClusters(ctx context.Context) (*[]NodeCluster, error) { +// getNodeClusters lists all node 
clusters +func (r *ClusterServer) getNodeClusters(ctx context.Context) (*[]NodeCluster, error) { ctxWithTimeout, cancel := context.WithTimeout(ctx, clients.ListRequestTimeout) defer cancel() @@ -130,8 +143,8 @@ func (r *ClusterServer) GetNodeClusters(ctx context.Context) (*[]NodeCluster, er return resp.JSON200, nil } -// GetNodeClusterTypes lists all node cluster types -func (r *ClusterServer) GetNodeClusterTypes(ctx context.Context) (*[]NodeClusterType, error) { +// getNodeClusterTypes lists all node cluster types +func (r *ClusterServer) getNodeClusterTypes(ctx context.Context) (*[]NodeClusterType, error) { ctxWithTimeout, cancel := context.WithTimeout(ctx, clients.ListRequestTimeout) defer cancel() @@ -149,13 +162,41 @@ func (r *ClusterServer) GetNodeClusterTypes(ctx context.Context) (*[]NodeCluster return resp.JSON200, nil } -// ClusterResourceTypeMapping map cluster ID with objectType ID for faster lookup during Caas alerts -func (r *ClusterServer) ClusterResourceTypeMapping() { +// clusterResourceTypeMapping map cluster ID with objectType ID for faster lookup during Caas alerts +func (r *ClusterServer) clusterResourceTypeMapping() { mapping := make(map[uuid.UUID]uuid.UUID) - for _, cluster := range *r.NodeClusters { + for _, cluster := range *r.nodeClusters { mapping[cluster.NodeClusterId] = cluster.NodeClusterTypeId slog.Info("Mapping cluster ID to resource type ID", "ClusterID", cluster.NodeClusterId, "NodeClusterTypeId", cluster.NodeClusterTypeId) } - r.ClusterIDToResourceTypeID = mapping + r.clusterIDToResourceTypeID = mapping +} + +// GetNodeClusterTypes returns a copy of the node cluster types +func (r *ClusterServer) GetNodeClusterTypes() []NodeClusterType { + r.Lock() + defer r.Unlock() + + if r.nodeClusterTypes == nil { + return nil + } + + nodeClusterTypesCopy := make([]NodeClusterType, len(*r.nodeClusterTypes)) + copy(nodeClusterTypesCopy, *r.nodeClusterTypes) + + return nodeClusterTypesCopy +} + +// GetClusterIDToResourceTypeID returns a copy of 
the cluster ID to resource type ID mapping +func (r *ClusterServer) GetClusterIDToResourceTypeID() map[uuid.UUID]uuid.UUID { + r.Lock() + defer r.Unlock() + + c := make(map[uuid.UUID]uuid.UUID) + for k, v := range r.clusterIDToResourceTypeID { + c[k] = v + } + + return c } diff --git a/internal/service/alarms/internal/clusterserver/generated/client.gen.go b/internal/service/alarms/internal/infrastructure/clusterserver/generated/client.gen.go similarity index 100% rename from internal/service/alarms/internal/clusterserver/generated/client.gen.go rename to internal/service/alarms/internal/infrastructure/clusterserver/generated/client.gen.go diff --git a/internal/service/alarms/internal/clusterserver/generated/generate.go b/internal/service/alarms/internal/infrastructure/clusterserver/generated/generate.go similarity index 50% rename from internal/service/alarms/internal/clusterserver/generated/generate.go rename to internal/service/alarms/internal/infrastructure/clusterserver/generated/generate.go index 121de093..5788db2e 100644 --- a/internal/service/alarms/internal/clusterserver/generated/generate.go +++ b/internal/service/alarms/internal/infrastructure/clusterserver/generated/generate.go @@ -1,3 +1,3 @@ package generated -//go:generate go run github.com/oapi-codegen/oapi-codegen/v2/cmd/oapi-codegen -config oapi-codegen.yaml ../../../../cluster/api/openapi.yaml +//go:generate go run github.com/oapi-codegen/oapi-codegen/v2/cmd/oapi-codegen -config oapi-codegen.yaml ../../../../../cluster/api/openapi.yaml diff --git a/internal/service/alarms/internal/clusterserver/generated/oapi-codegen.yaml b/internal/service/alarms/internal/infrastructure/clusterserver/generated/oapi-codegen.yaml similarity index 100% rename from internal/service/alarms/internal/clusterserver/generated/oapi-codegen.yaml rename to internal/service/alarms/internal/infrastructure/clusterserver/generated/oapi-codegen.yaml diff --git a/internal/service/alarms/internal/infrastructure/infrastructure.go 
b/internal/service/alarms/internal/infrastructure/infrastructure.go new file mode 100644 index 00000000..2cb26241 --- /dev/null +++ b/internal/service/alarms/internal/infrastructure/infrastructure.go @@ -0,0 +1,38 @@ +package infrastructure + +import ( + "context" + "fmt" + + "github.com/openshift-kni/oran-o2ims/internal/service/alarms/internal/infrastructure/clusterserver" +) + +// Client is the interface that wraps the basic methods for the infrastructure clients +type Client interface { + Name() string + Setup() error + FetchAll(context.Context) error +} + +// Infrastructure represents the infrastructure clients +type Infrastructure struct { + Clients []Client +} + +// Init sets up the infrastructure clients and fetches all the data +func Init(ctx context.Context) (*Infrastructure, error) { + // Currently only the cluster server is supported + clients := []Client{&clusterserver.ClusterServer{}} + + for _, server := range clients { + if err := server.Setup(); err != nil { + return nil, fmt.Errorf("failed to setup %s: %w", server.Name(), err) + } + + if err := server.FetchAll(ctx); err != nil { + return nil, fmt.Errorf("failed to fetch objects from %s: %w", server.Name(), err) + } + } + + return &Infrastructure{Clients: clients}, nil +} diff --git a/internal/service/alarms/serve.go b/internal/service/alarms/serve.go index 3d1acef3..7897d7b7 100644 --- a/internal/service/alarms/serve.go +++ b/internal/service/alarms/serve.go @@ -11,21 +11,17 @@ import ( "syscall" "time" - "github.com/openshift-kni/oran-o2ims/internal/service/alarms/api" - - "sigs.k8s.io/controller-runtime/pkg/client" - "github.com/google/uuid" "github.com/openshift-kni/oran-o2ims/internal/controllers/utils" + "github.com/openshift-kni/oran-o2ims/internal/service/alarms/api" "github.com/openshift-kni/oran-o2ims/internal/service/alarms/api/generated" "github.com/openshift-kni/oran-o2ims/internal/service/alarms/internal/alertmanager" - 
"github.com/openshift-kni/oran-o2ims/internal/service/alarms/internal/clusterserver" "github.com/openshift-kni/oran-o2ims/internal/service/alarms/internal/db/repo" - "github.com/openshift-kni/oran-o2ims/internal/service/alarms/internal/dictionary_definition" + "github.com/openshift-kni/oran-o2ims/internal/service/alarms/internal/dictionary_collector" + "github.com/openshift-kni/oran-o2ims/internal/service/alarms/internal/infrastructure" "github.com/openshift-kni/oran-o2ims/internal/service/alarms/internal/notifier_provider" common "github.com/openshift-kni/oran-o2ims/internal/service/common/api" - "github.com/openshift-kni/oran-o2ims/internal/service/common/clients/k8s" "github.com/openshift-kni/oran-o2ims/internal/service/common/db" "github.com/openshift-kni/oran-o2ims/internal/service/common/notifier" ) @@ -48,6 +44,7 @@ func Serve(config *api.AlarmsServerConfig) error { signal.Notify(shutdown, os.Interrupt, syscall.SIGTERM) ctx, cancel := context.WithCancel(context.Background()) + defer cancel() go func() { sig := <-shutdown @@ -70,10 +67,10 @@ func Serve(config *api.AlarmsServerConfig) error { pool.Close() }() - // Get client for hub - hubClient, err := k8s.NewClientForHub() + // Init infrastructure clients + infrastructureClients, err := infrastructure.Init(ctx) if err != nil { - return fmt.Errorf("error creating client for hub: %w", err) + return fmt.Errorf("error setting up and collecting objects from infrastructure servers: %w", err) } // Init alarm repository @@ -83,6 +80,15 @@ func Serve(config *api.AlarmsServerConfig) error { // TODO: Launch k8s job for DB remove resolved data + // Load dictionary + alarmDictionaryCollector, err := dictionary_collector.New(alarmRepository, infrastructureClients) + if err != nil { + return fmt.Errorf("error creating alarm dictionary collector: %w", err) + } + + // Run dictionary collector + alarmDictionaryCollector.Run(ctx) + // Parse global cloud id var globalCloudID uuid.UUID if config.GlobalCloudID != 
utils.DefaultOCloudID { @@ -103,10 +109,7 @@ func Serve(config *api.AlarmsServerConfig) error { alarmServer := api.AlarmsServer{ GlobalCloudID: globalCloudID, AlarmsRepository: alarmRepository, - } - - if err := UpdateAlarmDictionaryAndAlarmsDefinitionData(ctx, hubClient, &alarmServer); err != nil { - return fmt.Errorf("error updating alarms definition data: %w", err) + Infrastructure: infrastructureClients, } // start a new notifier @@ -170,11 +173,11 @@ func Serve(config *api.AlarmsServerConfig) error { serverErrors := make(chan error, 1) // Configure AM right before the server starts listening - if err := alertmanager.Setup(ctx, hubClient); err != nil { + if err := alertmanager.Setup(ctx); err != nil { return fmt.Errorf("error configuring alert manager: %w", err) } - // Start server + // Start server go func() { slog.Info(fmt.Sprintf("Listening on %s", srv.Addr)) if err := srv.ListenAndServe(); err != nil && !errors.Is(err, http.ErrServerClosed) { @@ -196,38 +199,6 @@ func Serve(config *api.AlarmsServerConfig) error { return nil } -// UpdateAlarmDictionaryAndAlarmsDefinitionData reach out to cluster server and sync DB with alarm dict and def -func UpdateAlarmDictionaryAndAlarmsDefinitionData(ctx context.Context, hubClient client.Client, a *api.AlarmsServer) error { - // Initialize cluster server client - cs, err := clusterserver.New() - if err != nil { - return fmt.Errorf("error creating cluster server client: %w", err) - } - - // Get all needed objects from the cluster server - err = cs.GetAll(ctx) - if err != nil { - return fmt.Errorf("error getting objects from the cluster server: %w", err) - } - - // set cluster server data to alarms server - a.ClusterServer = cs - - // Get all needed resources from the resource server - if err = cs.GetAll(ctx); err != nil { - slog.Warn("error getting resources from the resource server", "error", err) - } - - // Load dictionary and definition - alarmsDictDef := dictionary_definition.New(hubClient, a.AlarmsRepository) - // 
todo Handle me - if err = alarmsDictDef.Load(ctx, cs.NodeClusterTypes); err != nil { - slog.Warn("error loading dictionary and definition data", "error", err) - } - - return nil -} - // gracefulShutdownWithTasks Server may have background tasks running when SIGTERM is received. Let them continue. func gracefulShutdownWithTasks(srv *http.Server, alarmsServer *api.AlarmsServer) error { done := make(chan struct{})