diff --git a/backend/deploy/helm/backend/templates/backend.deployment.yaml b/backend/deploy/helm/backend/templates/backend.deployment.yaml
index 6b74ec0c6..0cce5e086 100644
--- a/backend/deploy/helm/backend/templates/backend.deployment.yaml
+++ b/backend/deploy/helm/backend/templates/backend.deployment.yaml
@@ -6,7 +6,7 @@ metadata:
   name: aro-hcp-backend
 spec:
   progressDeadlineSeconds: 600
-  replicas: 1
+  replicas: 2
   revisionHistoryLimit: 10
   selector:
     matchLabels:
@@ -44,6 +44,10 @@ spec:
             configMapKeyRef:
               name: backend-config
               key: LOCATION
+        - name: NAMESPACE
+          valueFrom:
+            fieldRef:
+              fieldPath: metadata.namespace
         ports:
         - containerPort: 8081
           protocol: TCP
diff --git a/backend/deploy/helm/backend/templates/leader-election.role.yaml b/backend/deploy/helm/backend/templates/leader-election.role.yaml
new file mode 100644
index 000000000..4b7f2b4a3
--- /dev/null
+++ b/backend/deploy/helm/backend/templates/leader-election.role.yaml
@@ -0,0 +1,17 @@
+apiVersion: rbac.authorization.k8s.io/v1
+kind: Role
+metadata:
+  name: leader-election
+  namespace: {{ .Release.Namespace }}
+rules:
+- apiGroups:
+  - coordination.k8s.io
+  resources:
+  - leases
+  verbs:
+  - create
+  - get
+  - list
+  - update
+  - watch
+  - patch
diff --git a/backend/deploy/helm/backend/templates/leader-election.rolebinding.yaml b/backend/deploy/helm/backend/templates/leader-election.rolebinding.yaml
new file mode 100644
index 000000000..cac41f22d
--- /dev/null
+++ b/backend/deploy/helm/backend/templates/leader-election.rolebinding.yaml
@@ -0,0 +1,13 @@
+apiVersion: rbac.authorization.k8s.io/v1
+kind: RoleBinding
+metadata:
+  name: backend-leader-election
+  namespace: {{ .Release.Namespace }}
+roleRef:
+  apiGroup: rbac.authorization.k8s.io
+  kind: Role
+  name: leader-election
+subjects:
+- kind: ServiceAccount
+  name: backend
+  namespace: {{ .Release.Namespace }}
diff --git a/backend/main.go b/backend/main.go
index 88b455aa9..54ac34ce9 100644
--- a/backend/main.go
+++ b/backend/main.go
@@ -11,19 +11,36 @@ import (
 	"os/signal"
 	"path/filepath"
 	"runtime/debug"
+	"sync/atomic"
 	"syscall"
+	"time"
 
 	"github.com/Azure/azure-sdk-for-go/sdk/azcore"
 	"github.com/Azure/azure-sdk-for-go/sdk/azcore/cloud"
 	"github.com/Azure/azure-sdk-for-go/sdk/azidentity"
 	"github.com/Azure/azure-sdk-for-go/sdk/data/azcosmos"
+	"github.com/go-logr/logr"
 	ocmsdk "github.com/openshift-online/ocm-sdk-go"
 	"github.com/spf13/cobra"
+	"k8s.io/client-go/rest"
+	"k8s.io/client-go/tools/clientcmd"
+	"k8s.io/client-go/tools/leaderelection"
+	"k8s.io/client-go/tools/leaderelection/resourcelock"
+	"k8s.io/klog/v2"
 
 	"github.com/Azure/ARO-HCP/internal/database"
 )
 
+const (
+	leaderElectionLockName      = "backend-leader"
+	leaderElectionLeaseDuration = 15 * time.Second
+	leaderElectionRenewDeadline = 10 * time.Second
+	leaderElectionRetryPeriod   = 2 * time.Second
+)
+
 var (
+	argKubeconfig string
+	argNamespace  string
 	argLocation   string
 	argCosmosName string
 	argCosmosURL  string
@@ -54,6 +71,8 @@ var (
 func init() {
 	rootCmd.SetErrPrefix(rootCmd.Short + " error:")
 
+	rootCmd.Flags().StringVar(&argKubeconfig, "kubeconfig", "", "Absolute path to the kubeconfig file")
+	rootCmd.Flags().StringVar(&argNamespace, "namespace", os.Getenv("NAMESPACE"), "Kubernetes namespace")
 	rootCmd.Flags().StringVar(&argLocation, "location", os.Getenv("LOCATION"), "Azure location")
 	rootCmd.Flags().StringVar(&argCosmosName, "cosmos-name", os.Getenv("DB_NAME"), "Cosmos database name")
 	rootCmd.Flags().StringVar(&argCosmosURL, "cosmos-url", os.Getenv("DB_URL"), "Cosmos database URL")
@@ -72,6 +91,14 @@ func init() {
 	}
 }
 
+func newKubeconfig(kubeconfig string) (*rest.Config, error) {
+	loader := clientcmd.NewDefaultClientConfigLoadingRules()
+	if kubeconfig != "" {
+		loader.ExplicitPath = kubeconfig
+	}
+	return clientcmd.NewNonInteractiveDeferredLoadingClientConfig(loader, nil).ClientConfig()
+}
+
 func newCosmosDBClient() (database.DBClient, error) {
 	azcoreClientOptions := azcore.ClientOptions{
 		// FIXME Cloud should be determined by other means.
@@ -105,6 +132,31 @@ func newCosmosDBClient() (database.DBClient, error) {
 func Run(cmd *cobra.Command, args []string) error {
 	handler := slog.NewJSONHandler(os.Stdout, nil)
 	logger := slog.New(handler)
+	klog.SetLogger(logr.FromSlogHandler(handler))
+
+	// Use pod name as the lock identity.
+	hostname, err := os.Hostname()
+	if err != nil {
+		return err
+	}
+
+	kubeconfig, err := newKubeconfig(argKubeconfig)
+	if err != nil {
+		return fmt.Errorf("Failed to create Kubernetes configuration: %w", err)
+	}
+
+	leaderElectionLock, err := resourcelock.NewFromKubeconfig(
+		resourcelock.LeasesResourceLock,
+		argNamespace,
+		leaderElectionLockName,
+		resourcelock.ResourceLockConfig{
+			Identity: hostname,
+		},
+		kubeconfig,
+		leaderElectionRenewDeadline)
+	if err != nil {
+		return fmt.Errorf("Failed to create leader election lock: %w", err)
+	}
 
 	// Create database client
 	dbClient, err := newCosmosDBClient()
@@ -125,17 +177,39 @@ func Run(cmd *cobra.Command, args []string) error {
 
 	operationsScanner := NewOperationsScanner(dbClient, ocmConnection)
 
-	stop := make(chan struct{})
+	ctx, cancel := context.WithCancel(context.Background())
+	defer cancel()
+
 	signalChannel := make(chan os.Signal, 1)
 	signal.Notify(signalChannel, syscall.SIGINT, syscall.SIGTERM)
-
-	go operationsScanner.Run(logger, stop)
-
-	sig := <-signalChannel
-	logger.Info(fmt.Sprintf("caught %s signal", sig))
-	close(stop)
-
-	operationsScanner.Join()
+	go func() {
+		sig := <-signalChannel
+		logger.Info(fmt.Sprintf("Caught %s signal", sig))
+		cancel()
+	}()
+
+	var startedLeading atomic.Bool
+
+	// FIXME Integrate leaderelection.HealthzAdaptor into a /healthz endpoint.
+	leaderelection.RunOrDie(ctx, leaderelection.LeaderElectionConfig{
+		Lock:          leaderElectionLock,
+		LeaseDuration: leaderElectionLeaseDuration,
+		RenewDeadline: leaderElectionRenewDeadline,
+		RetryPeriod:   leaderElectionRetryPeriod,
+		Callbacks: leaderelection.LeaderCallbacks{
+			OnStartedLeading: func(ctx context.Context) {
+				startedLeading.Store(true)
+				go operationsScanner.Run(ctx, logger)
+			},
+			OnStoppedLeading: func() {
+				if startedLeading.Load() {
+					operationsScanner.Join()
+				}
+			},
+		},
+		ReleaseOnCancel: true,
+		Name:            leaderElectionLockName,
+	})
 
 	logger.Info(fmt.Sprintf("%s (%s) stopped", cmd.Short, cmd.Version))
 
diff --git a/backend/operations_scanner.go b/backend/operations_scanner.go
index 00e50076b..bbb2c85c0 100644
--- a/backend/operations_scanner.go
+++ b/backend/operations_scanner.go
@@ -59,7 +59,7 @@ func getInterval(envName string, defaultVal time.Duration, logger *slog.Logger)
 	return defaultVal
 }
 
-func (s *OperationsScanner) Run(logger *slog.Logger, stop <-chan struct{}) {
+func (s *OperationsScanner) Run(ctx context.Context, logger *slog.Logger) {
 	defer close(s.done)
 
 	var interval time.Duration
@@ -72,19 +72,23 @@ func (s *OperationsScanner) Run(logger *slog.Logger, stop <-chan struct{}) {
 	logger.Info("Polling Cluster Service every " + interval.String())
 	pollCSOperationsTicker := time.NewTicker(interval)
 
-	ctx := context.Background()
-
 	// Poll database immediately on startup.
-	s.pollDBOperations(ctx, logger)
+	s.pollDBOperations(logger)
 
+	// The Context is cancelled when the process receives an interrupt signal.
+	// We let the periodic functions use their own Context so they can finish
+	// gracefully before we return.
+loop:
 	for {
 		select {
 		case <-pollDBOperationsTicker.C:
-			s.pollDBOperations(ctx, logger)
+			s.pollDBOperations(logger)
 		case <-pollCSOperationsTicker.C:
-			s.pollCSOperations(ctx, logger, stop)
-		case <-stop:
-			break
+			s.pollCSOperations(logger)
+		case <-ctx.Done():
+			// break alone just breaks out of select.
+			// Use a label to break out of the loop.
+			break loop
 		}
 	}
 }
@@ -93,9 +97,11 @@ func (s *OperationsScanner) Join() {
 	<-s.done
 }
 
-func (s *OperationsScanner) pollDBOperations(ctx context.Context, logger *slog.Logger) {
+func (s *OperationsScanner) pollDBOperations(logger *slog.Logger) {
 	var activeOperations []*database.OperationDocument
 
+	ctx := context.Background()
+
 	iterator := s.dbClient.ListAllOperationDocs(ctx)
 
 	for item := range iterator.Items(ctx) {
@@ -123,35 +129,32 @@ func (s *OperationsScanner) pollDBOperations(ctx context.Context, logger *slog.L
 	}
 }
 
-func (s *OperationsScanner) pollCSOperations(ctx context.Context, logger *slog.Logger, stop <-chan struct{}) {
+func (s *OperationsScanner) pollCSOperations(logger *slog.Logger) {
 	var activeOperations []*database.OperationDocument
 
+	ctx := context.Background()
+
 	for _, doc := range s.activeOperations {
-		select {
-		case <-stop:
-			break
-		default:
-			var requeue bool
-			var err error
-
-			opLogger := logger.With(
-				"operation", doc.Request,
-				"operation_id", doc.ID,
-				"resource_id", doc.ExternalID.String(),
-				"internal_id", doc.InternalID.String())
-
-			switch doc.InternalID.Kind() {
-			case cmv1.ClusterKind:
-				requeue, err = s.pollClusterOperation(ctx, opLogger, doc)
-			case cmv1.NodePoolKind:
-				requeue, err = s.pollNodePoolOperation(ctx, opLogger, doc)
-			}
-			if requeue {
-				activeOperations = append(activeOperations, doc)
-			}
-			if err != nil {
-				opLogger.Error(fmt.Sprintf("Error while polling operation '%s': %s", doc.ID, err.Error()))
-			}
-		}
+		var requeue bool
+		var err error
+
+		opLogger := logger.With(
+			"operation", doc.Request,
+			"operation_id", doc.ID,
+			"resource_id", doc.ExternalID.String(),
+			"internal_id", doc.InternalID.String())
+
+		switch doc.InternalID.Kind() {
+		case cmv1.ClusterKind:
+			requeue, err = s.pollClusterOperation(ctx, opLogger, doc)
+		case cmv1.NodePoolKind:
+			requeue, err = s.pollNodePoolOperation(ctx, opLogger, doc)
+		}
+		if requeue {
+			activeOperations = append(activeOperations, doc)
+		}
+		if err != nil {
+			opLogger.Error(fmt.Sprintf("Error while polling operation '%s': %s", doc.ID, err.Error()))
+		}
 	}
 }
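Note: the main.go changes above use the standard client-go lease-based leader election, so only one of the two backend replicas runs the operations scanner at a time. The sketch below is a minimal, self-contained illustration of that pattern for readers who have not used it before; it is not part of the patch, and the namespace ("default"), lock name ("example-leader"), and timing values are placeholders.

// leaderexample is a standalone sketch of lease-based leader election with
// k8s.io/client-go, mirroring the wiring added to backend/main.go above.
package main

import (
	"context"
	"log"
	"os"
	"time"

	"k8s.io/client-go/tools/clientcmd"
	"k8s.io/client-go/tools/leaderelection"
	"k8s.io/client-go/tools/leaderelection/resourcelock"
)

func main() {
	// Load a kubeconfig the same way the backend does (explicit path, KUBECONFIG,
	// or the default loading rules).
	cfg, err := clientcmd.NewNonInteractiveDeferredLoadingClientConfig(
		clientcmd.NewDefaultClientConfigLoadingRules(), nil).ClientConfig()
	if err != nil {
		log.Fatal(err)
	}

	// The pod name (hostname) identifies this candidate in the Lease object.
	id, _ := os.Hostname()

	lock, err := resourcelock.NewFromKubeconfig(
		resourcelock.LeasesResourceLock,
		"default",        // placeholder namespace
		"example-leader", // placeholder lock name
		resourcelock.ResourceLockConfig{Identity: id},
		cfg,
		10*time.Second) // renew deadline
	if err != nil {
		log.Fatal(err)
	}

	// RunOrDie blocks: it campaigns for the Lease, calls OnStartedLeading once
	// elected, and keeps renewing until the context is cancelled or renewal fails.
	leaderelection.RunOrDie(context.Background(), leaderelection.LeaderElectionConfig{
		Lock:            lock,
		LeaseDuration:   15 * time.Second,
		RenewDeadline:   10 * time.Second,
		RetryPeriod:     2 * time.Second,
		ReleaseOnCancel: true,
		Callbacks: leaderelection.LeaderCallbacks{
			OnStartedLeading: func(ctx context.Context) {
				// Leader-only work goes here; ctx is cancelled when leadership is lost.
				<-ctx.Done()
			},
			OnStoppedLeading: func() {
				log.Print("leadership lost")
			},
			OnNewLeader: func(identity string) {
				log.Printf("current leader: %s", identity)
			},
		},
	})
}

The RBAC Role and RoleBinding in the patch exist precisely so this pattern can create and update the coordination.k8s.io Lease from inside the pod, using the namespace injected via the downward API (NAMESPACE).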