Add leader election to the backend #847

Open · wants to merge 2 commits into base: main
@@ -6,7 +6,7 @@ metadata:
name: aro-hcp-backend
spec:
progressDeadlineSeconds: 600
replicas: 1
replicas: 2
revisionHistoryLimit: 10
selector:
matchLabels:
@@ -44,6 +44,10 @@ spec:
configMapKeyRef:
name: backend-config
key: LOCATION
- name: NAMESPACE
valueFrom:
fieldRef:
fieldPath: metadata.namespace
ports:
- containerPort: 8081
protocol: TCP
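The deployment change above bumps the backend to two replicas and injects the pod's namespace through the downward API, which the new --namespace flag in main.go picks up via the NAMESPACE environment variable. As an aside (not part of this PR), a process can also fall back to the standard service account mount when that variable is missing; the snippet below is a minimal, self-contained sketch, and the currentNamespace helper name is made up:

package main

import (
	"fmt"
	"os"
	"strings"
)

// currentNamespace prefers the NAMESPACE env var (injected via the downward
// API in the deployment above) and falls back to the in-cluster service
// account mount.
func currentNamespace() (string, error) {
	if ns := os.Getenv("NAMESPACE"); ns != "" {
		return ns, nil
	}
	data, err := os.ReadFile("/var/run/secrets/kubernetes.io/serviceaccount/namespace")
	if err != nil {
		return "", fmt.Errorf("NAMESPACE unset and service account mount unreadable: %w", err)
	}
	return strings.TrimSpace(string(data)), nil
}

func main() {
	ns, err := currentNamespace()
	if err != nil {
		fmt.Fprintln(os.Stderr, err)
		os.Exit(1)
	}
	fmt.Println("running in namespace:", ns)
}
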
17 changes: 17 additions & 0 deletions backend/deploy/helm/backend/templates/leader-election.role.yaml
@@ -0,0 +1,17 @@
apiVersion: rbac.authorization.k8s.io/v1
kind: Role
metadata:
name: leader-election
namespace: {{ .Release.Namespace }}
rules:
- apiGroups:
- coordination.k8s.io
resources:
- leases
verbs:
- create
- get
- list
- update
- watch
- patch
@@ -0,0 +1,13 @@
apiVersion: rbac.authorization.k8s.io/v1
kind: RoleBinding
metadata:
name: backend-leader-election
namespace: {{ .Release.Namespace }}
roleRef:
apiGroup: rbac.authorization.k8s.io
kind: Role
name: leader-election
subjects:
- kind: ServiceAccount
name: backend
namespace: {{ .Release.Namespace }}
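The Role above grants lease permissions in the release namespace, and this RoleBinding attaches it to the backend service account, which is all that client-go's lease-based resource lock needs. As a rough illustration (not part of this PR), the resulting Lease can be read back with client-go to see which pod currently holds the lock. The "aro-hcp" namespace below is a placeholder, while "backend-leader" matches the leaderElectionLockName constant introduced in main.go:

package main

import (
	"context"
	"fmt"

	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	"k8s.io/client-go/kubernetes"
	"k8s.io/client-go/tools/clientcmd"
)

func main() {
	// Load whatever kubeconfig the default loading rules find.
	config, err := clientcmd.NewNonInteractiveDeferredLoadingClientConfig(
		clientcmd.NewDefaultClientConfigLoadingRules(), nil).ClientConfig()
	if err != nil {
		panic(err)
	}
	clientset, err := kubernetes.NewForConfig(config)
	if err != nil {
		panic(err)
	}

	// "aro-hcp" is a placeholder namespace; "backend-leader" matches the
	// lock name used by the backend.
	lease, err := clientset.CoordinationV1().Leases("aro-hcp").Get(
		context.Background(), "backend-leader", metav1.GetOptions{})
	if err != nil {
		panic(err)
	}
	if lease.Spec.HolderIdentity != nil {
		fmt.Println("current leader:", *lease.Spec.HolderIdentity)
	}
}

Lease.Spec.HolderIdentity carries the identity the elector was configured with, which in this PR is the pod's hostname.
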
92 changes: 83 additions & 9 deletions backend/main.go
@@ -11,19 +11,36 @@ import (
"os/signal"
"path/filepath"
"runtime/debug"
"sync/atomic"
"syscall"
"time"

"github.com/Azure/azure-sdk-for-go/sdk/azcore"
"github.com/Azure/azure-sdk-for-go/sdk/azcore/cloud"
"github.com/Azure/azure-sdk-for-go/sdk/azidentity"
"github.com/Azure/azure-sdk-for-go/sdk/data/azcosmos"
"github.com/go-logr/logr"
ocmsdk "github.com/openshift-online/ocm-sdk-go"
"github.com/spf13/cobra"
"k8s.io/client-go/rest"
"k8s.io/client-go/tools/clientcmd"
"k8s.io/client-go/tools/leaderelection"
"k8s.io/client-go/tools/leaderelection/resourcelock"
"k8s.io/klog/v2"

"github.com/Azure/ARO-HCP/internal/database"
)

const (
leaderElectionLockName = "backend-leader"
leaderElectionLeaseDuration = 15 * time.Second
leaderElectionRenewDeadline = 10 * time.Second
leaderElectionRetryPeriod = 2 * time.Second
)

var (
argKubeconfig string
argNamespace string
argLocation string
argCosmosName string
argCosmosURL string
@@ -54,6 +71,8 @@ var (
func init() {
rootCmd.SetErrPrefix(rootCmd.Short + " error:")

rootCmd.Flags().StringVar(&argKubeconfig, "kubeconfig", "", "Absolute path to the kubeconfig file")
rootCmd.Flags().StringVar(&argNamespace, "namespace", os.Getenv("NAMESPACE"), "Kubernetes namespace")
rootCmd.Flags().StringVar(&argLocation, "location", os.Getenv("LOCATION"), "Azure location")
rootCmd.Flags().StringVar(&argCosmosName, "cosmos-name", os.Getenv("DB_NAME"), "Cosmos database name")
rootCmd.Flags().StringVar(&argCosmosURL, "cosmos-url", os.Getenv("DB_URL"), "Cosmos database URL")
@@ -72,6 +91,14 @@ func init() {
}
}

func newKubeconfig(kubeconfig string) (*rest.Config, error) {
loader := clientcmd.NewDefaultClientConfigLoadingRules()
if kubeconfig != "" {
loader.ExplicitPath = kubeconfig
}
return clientcmd.NewNonInteractiveDeferredLoadingClientConfig(loader, nil).ClientConfig()
}

func newCosmosDBClient() (database.DBClient, error) {
azcoreClientOptions := azcore.ClientOptions{
// FIXME Cloud should be determined by other means.
@@ -105,6 +132,31 @@ func newCosmosDBClient() (database.DBClient, error) {
func Run(cmd *cobra.Command, args []string) error {
handler := slog.NewJSONHandler(os.Stdout, nil)
logger := slog.New(handler)
klog.SetLogger(logr.FromSlogHandler(handler))

// Use pod name as the lock identity.
hostname, err := os.Hostname()
if err != nil {
return err
}

kubeconfig, err := newKubeconfig(argKubeconfig)
if err != nil {
return fmt.Errorf("Failed to create Kubernetes configuration: %w", err)
}

leaderElectionLock, err := resourcelock.NewFromKubeconfig(
resourcelock.LeasesResourceLock,
argNamespace,
leaderElectionLockName,
resourcelock.ResourceLockConfig{
Identity: hostname,
},
kubeconfig,
leaderElectionRenewDeadline)
if err != nil {
return fmt.Errorf("Failed to create leader election lock: %w", err)
}

// Create database client
dbClient, err := newCosmosDBClient()
@@ -125,17 +177,39 @@ func Run(cmd *cobra.Command, args []string) error {

operationsScanner := NewOperationsScanner(dbClient, ocmConnection)

stop := make(chan struct{})
ctx, cancel := context.WithCancel(context.Background())
defer cancel()

signalChannel := make(chan os.Signal, 1)
signal.Notify(signalChannel, syscall.SIGINT, syscall.SIGTERM)

go operationsScanner.Run(logger, stop)

sig := <-signalChannel
logger.Info(fmt.Sprintf("caught %s signal", sig))
close(stop)

operationsScanner.Join()
go func() {
sig := <-signalChannel
Contributor: Please just use signal.NotifyContext on this thing? Didn't we already talk about this?

Collaborator (author): I want to log the signal received, along with a timestamp, so I can observe how much longer the process takes to shut down. I don't see any big advantage to signal.NotifyContext; it's a convenience wrapper that does the same thing I'm doing.

Contributor: Is signalChannel just for your logger? Then the following is simpler:

ctx, cancel := signal.NotifyContext(context.Background(), syscall.SIGINT, syscall.SIGTERM)
defer cancel()

go func() {
    <-ctx.Done()
    logger.Info("Caught interrupt signal")
}()

The only difference is that you can't tell whether you got SIGINT or SIGTERM, and why do you care? In production the kubelet only ever sends SIGTERM or SIGKILL (ref), so this seems like a distinction without much meaning, and the code is much less Rube Goldberg. I respect that you think logging the specific signal is important, but in a decade of writing k8s-native applications it has never been useful or required.

Contributor: As for the "big advantage": using a more complex cascade of handlers and goroutines when it isn't necessary is a downside in itself. Making the simple choice every time you can helps keep this code maintainable ten years down the line.

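For what it's worth, the two positions above are not mutually exclusive. A minimal sketch (an editorial illustration, not code from either reviewer or from the PR) that keeps signal.NotifyContext for cancellation while still logging which signal arrived, relying on the fact that os/signal delivers each signal to every registered channel:

ctx, cancel := signal.NotifyContext(context.Background(), syscall.SIGINT, syscall.SIGTERM)
defer cancel()

// A second channel registered for the same signals; os/signal delivers to
// every registered channel, so this fires alongside NotifyContext.
sigCh := make(chan os.Signal, 1)
signal.Notify(sigCh, syscall.SIGINT, syscall.SIGTERM)
go func() {
    select {
    case sig := <-sigCh:
        logger.Info(fmt.Sprintf("Caught %s signal", sig))
    case <-ctx.Done():
    }
}()
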
logger.Info(fmt.Sprintf("Caught %s signal", sig))
cancel()
}()

var startedLeading atomic.Bool

// FIXME Integrate leaderelection.HealthzAdaptor into a /healthz endpoint.
leaderelection.RunOrDie(ctx, leaderelection.LeaderElectionConfig{
Lock: leaderElectionLock,
LeaseDuration: leaderElectionLeaseDuration,
RenewDeadline: leaderElectionRenewDeadline,
RetryPeriod: leaderElectionRetryPeriod,
Callbacks: leaderelection.LeaderCallbacks{
OnStartedLeading: func(ctx context.Context) {
startedLeading.Store(true)
go operationsScanner.Run(ctx, logger)
},
OnStoppedLeading: func() {
if startedLeading.Load() {
operationsScanner.Join()
}
},
},
ReleaseOnCancel: true,
Name: leaderElectionLockName,
})

logger.Info(fmt.Sprintf("%s (%s) stopped", cmd.Short, cmd.Version))

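The FIXME above leaves leaderelection.HealthzAdaptor integration for later. Below is a minimal, self-contained sketch of what that wiring could look like; the :8093 port and the serveHealthz helper are assumptions, not anything this PR configures:

package main

import (
	"log/slog"
	"net/http"
	"os"
	"time"

	"k8s.io/client-go/tools/leaderelection"
)

// serveHealthz exposes the leader-election watchdog on /healthz.
// The port is a placeholder; this PR does not configure one.
func serveHealthz(watchdog *leaderelection.HealthzAdaptor, logger *slog.Logger) {
	mux := http.NewServeMux()
	mux.HandleFunc("/healthz", func(w http.ResponseWriter, r *http.Request) {
		// Check fails only if this process thinks it holds the lease but
		// has not managed to renew it within the adaptor's grace period.
		if err := watchdog.Check(r); err != nil {
			http.Error(w, err.Error(), http.StatusInternalServerError)
			return
		}
		w.WriteHeader(http.StatusOK)
	})
	go func() {
		if err := http.ListenAndServe(":8093", mux); err != nil {
			logger.Error(err.Error())
		}
	}()
}

func main() {
	logger := slog.New(slog.NewJSONHandler(os.Stdout, nil))

	// The adaptor would be passed as WatchDog in the
	// leaderelection.LeaderElectionConfig shown in the diff above.
	watchdog := leaderelection.NewLeaderHealthzAdaptor(20 * time.Second)
	serveHealthz(watchdog, logger)

	select {} // placeholder for leaderelection.RunOrDie(...)
}

client-go attaches the elector to the adaptor inside RunOrDie when WatchDog is set, so Check returns success until an election is running and only fails once this process believes it is the leader but has stopped renewing the lease.
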
73 changes: 38 additions & 35 deletions backend/operations_scanner.go
@@ -59,7 +59,7 @@ func getInterval(envName string, defaultVal time.Duration, logger *slog.Logger)
return defaultVal
}

func (s *OperationsScanner) Run(logger *slog.Logger, stop <-chan struct{}) {
func (s *OperationsScanner) Run(ctx context.Context, logger *slog.Logger) {
defer close(s.done)

var interval time.Duration
@@ -72,19 +72,23 @@ func (s *OperationsScanner) Run(logger *slog.Logger, stop <-chan struct{}) {
logger.Info("Polling Cluster Service every " + interval.String())
pollCSOperationsTicker := time.NewTicker(interval)

ctx := context.Background()

// Poll database immediately on startup.
s.pollDBOperations(ctx, logger)
s.pollDBOperations(logger)

// The Context is cancelled when the process receives an interrupt signal.
// We let the periodic functions use their own Context so they can finish
// gracefully before we return.
loop:
for {
select {
case <-pollDBOperationsTicker.C:
s.pollDBOperations(ctx, logger)
s.pollDBOperations(logger)
case <-pollCSOperationsTicker.C:
s.pollCSOperations(ctx, logger, stop)
case <-stop:
break
s.pollCSOperations(logger)
case <-ctx.Done():
// break alone just breaks out of select.
// Use a label to break out of the loop.
break loop
}
}
}
@@ -93,9 +97,11 @@ func (s *OperationsScanner) Join() {
<-s.done
}

func (s *OperationsScanner) pollDBOperations(ctx context.Context, logger *slog.Logger) {
func (s *OperationsScanner) pollDBOperations(logger *slog.Logger) {
var activeOperations []*database.OperationDocument

ctx := context.Background()

iterator := s.dbClient.ListAllOperationDocs(ctx)

for item := range iterator.Items(ctx) {
@@ -123,35 +129,32 @@ func (s *OperationsScanner) pollDBOperations(ctx context.Context, logger *slog.L
}
}

func (s *OperationsScanner) pollCSOperations(ctx context.Context, logger *slog.Logger, stop <-chan struct{}) {
func (s *OperationsScanner) pollCSOperations(logger *slog.Logger) {
var activeOperations []*database.OperationDocument

ctx := context.Background()

for _, doc := range s.activeOperations {
select {
case <-stop:
break
default:
var requeue bool
var err error

opLogger := logger.With(
"operation", doc.Request,
"operation_id", doc.ID,
"resource_id", doc.ExternalID.String(),
"internal_id", doc.InternalID.String())

switch doc.InternalID.Kind() {
case cmv1.ClusterKind:
requeue, err = s.pollClusterOperation(ctx, opLogger, doc)
case cmv1.NodePoolKind:
requeue, err = s.pollNodePoolOperation(ctx, opLogger, doc)
}
if requeue {
activeOperations = append(activeOperations, doc)
}
if err != nil {
opLogger.Error(fmt.Sprintf("Error while polling operation '%s': %s", doc.ID, err.Error()))
}
var requeue bool
var err error

opLogger := logger.With(
"operation", doc.Request,
"operation_id", doc.ID,
"resource_id", doc.ExternalID.String(),
"internal_id", doc.InternalID.String())

switch doc.InternalID.Kind() {
case cmv1.ClusterKind:
requeue, err = s.pollClusterOperation(ctx, opLogger, doc)
case cmv1.NodePoolKind:
requeue, err = s.pollNodePoolOperation(ctx, opLogger, doc)
}
if requeue {
activeOperations = append(activeOperations, doc)
}
if err != nil {
opLogger.Error(fmt.Sprintf("Error while polling operation '%s': %s", doc.ID, err.Error()))
}
}

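The operations_scanner.go changes swap the stop channel for the Context that leader election provides: the polling loop exits when that Context is cancelled, Run closes its done channel on the way out, and Join lets the caller block until in-flight work has wrapped up. A stripped-down, self-contained sketch of that run/join shape (the worker type, interval, and log lines are illustrative, not the actual OperationsScanner):

package main

import (
	"context"
	"fmt"
	"os/signal"
	"syscall"
	"time"
)

// worker mirrors the Run/Join shape used by OperationsScanner, reduced to
// the essentials.
type worker struct {
	done chan struct{}
}

func newWorker() *worker {
	return &worker{done: make(chan struct{})}
}

func (w *worker) Run(ctx context.Context) {
	defer close(w.done)
	ticker := time.NewTicker(500 * time.Millisecond)
	defer ticker.Stop()
	for {
		select {
		case <-ticker.C:
			fmt.Println("tick: poll work here")
		case <-ctx.Done():
			// Leader election (or a signal) cancelled the context.
			return
		}
	}
}

// Join blocks until Run has fully returned.
func (w *worker) Join() {
	<-w.done
}

func main() {
	ctx, cancel := signal.NotifyContext(context.Background(), syscall.SIGINT, syscall.SIGTERM)
	defer cancel()

	w := newWorker()
	go w.Run(ctx)

	<-ctx.Done()
	w.Join()
	fmt.Println("stopped")
}

Keeping the done channel separate from the Context is what lets OnStoppedLeading wait for the scanner to finish even though the Context driving it has already been cancelled.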