From 9478a82eeb64ca4dc8415dd4dc4bd7bec3841e39 Mon Sep 17 00:00:00 2001 From: Dan Molik Date: Fri, 2 Aug 2024 07:06:27 -0400 Subject: [PATCH] add scaling logic --- internal/controller/valkey_controller.go | 136 +++++++++++++++++++++++ 1 file changed, 136 insertions(+) diff --git a/internal/controller/valkey_controller.go b/internal/controller/valkey_controller.go index e4b958f..693907c 100644 --- a/internal/controller/valkey_controller.go +++ b/internal/controller/valkey_controller.go @@ -23,6 +23,7 @@ import ( "fmt" "io" "math/big" + "net" "strings" "time" @@ -375,6 +376,116 @@ func (r *ValkeyReconciler) upsertServiceAccount(ctx context.Context, valkey *hyp return nil } +func removePort(addr string) string { + if strings.Contains(addr, ":") { + return strings.Split(addr, ":")[0] + } + return addr +} + +func (r *ValkeyReconciler) balanceNodes(ctx context.Context, valkey *hyperv1.Valkey, oldnodes, newnodes int32) error { + logger := log.FromContext(ctx) + + password, err := r.upsertSecret(ctx, valkey, true) + if err != nil { + return err + } + + vClient, err := valkeyClient.NewClient(valkeyClient.ClientOption{InitAddress: []string{valkey.Name + "." + valkey.Namespace + ".svc:6379"}, Password: password}) + if err != nil { + logger.Error(err, "failed to create valkey client", "valkey", valkey.Name, "namespace", valkey.Namespace) + return err + } + defer vClient.Close() + if err := vClient.Do(ctx, vClient.B().Ping().Build()).Error(); err != nil { + logger.Error(err, "failed to ping valkey", "valkey", valkey.Name, "namespace", valkey.Namespace) + return err + } + + logger.Info("balancing nodes", "valkey", valkey.Name, "namespace", valkey.Namespace) + if oldnodes == newnodes { + return nil + } + nodes, err := vClient.Do(ctx, vClient.B().ClusterNodes().Build()).ToString() + if err != nil { + logger.Error(err, "failed to get nodes", "valkey", valkey.Name, "namespace", valkey.Namespace) + return err + } + ids := map[string]string{} + for _, node := range strings.Split(nodes, "\n") { + if node == "" { + continue + } + line := strings.Split(node, " ") + id := strings.ReplaceAll(line[0], "txt:", "") + addr := removePort(line[1]) + addrs, err := net.LookupAddr(addr) + if err != nil { + logger.Error(err, "failed to lookup addr", "valkey", valkey.Name, "namespace", valkey.Namespace, "addr", addr) + } + hostname := strings.Split(addrs[0], ".")[0] + //namespace := strings.Split(addrs[0], ".")[1] + ids[hostname] = id + } + if oldnodes > newnodes { + for i := oldnodes - 1; i >= newnodes; i-- { // remove nodes + if _, ok := ids[fmt.Sprintf("%s-%d", valkey.Name, i)]; !ok { + logger.Info("node not found", "valkey", valkey.Name, "namespace", valkey.Namespace, "node", fmt.Sprintf("%s-%d", valkey.Name, i)) + continue + } + if err := vClient.Do(ctx, vClient.B().ClusterForget().NodeId(ids[fmt.Sprintf("%s-%d", valkey.Name, i)]).Build()).Error(); err != nil { + logger.Error(err, "failed to forget node", "valkey", valkey.Name, "namespace", valkey.Namespace) + return err + } + } + } else { + for i := oldnodes; i < newnodes; i++ { // add nodes + name := fmt.Sprintf("%s-%d", valkey.Name, i) + if err := r.waitForPod(ctx, name, valkey.Namespace); err != nil { + logger.Error(err, "failed to wait for pod", "valkey", valkey.Name, "namespace", valkey.Namespace, "node", name) + return err + } + addr, err := net.LookupHost(fmt.Sprintf("%s.%s-headless.%s.svc", name, valkey.Name, valkey.Namespace)) + if err != nil { + logger.Error(err, "failed to lookup host", "valkey", valkey.Name, "namespace", valkey.Namespace) + return err + } + if err := vClient.Do(ctx, vClient.B().ClusterMeet().Ip(addr[0]).Port(6379).Build()).Error(); err != nil { + logger.Error(err, "failed to meet node", "valkey", valkey.Name, "namespace", valkey.Namespace, "node", name) + return err + } + } + } + return nil +} + +func (r *ValkeyReconciler) waitForPod(ctx context.Context, name, namespace string) error { + logger := log.FromContext(ctx) + + pod := &corev1.Pod{ + ObjectMeta: metav1.ObjectMeta{ + Name: name, + Namespace: namespace, + }, + } + var tries int + for { + if err := r.Get(ctx, types.NamespacedName{Namespace: namespace, Name: name}, pod); err != nil { + logger.Error(err, "failed fetching pod", "name", name, "namespace", namespace) + continue + } + if pod.Status.Phase == corev1.PodRunning { + break + } + time.Sleep(time.Second * 3) + tries++ + if tries > 15 { + return fmt.Errorf("pod %s/%s is not running", namespace, name) + } + } + return nil +} + func getMasterNodes(valkey *hyperv1.Valkey) string { var nodes []string for i := 0; i < int(valkey.Spec.MasterNodes)*(int(valkey.Spec.Replicas)+1); i++ { @@ -665,6 +776,31 @@ fi r.Recorder.Event(valkey, "Normal", "Created", fmt.Sprintf("StatefulSet %s/%s is created", valkey.Namespace, valkey.Name)) } + + err := r.Get(ctx, types.NamespacedName{Namespace: valkey.Namespace, Name: valkey.Name}, sts) + if err != nil && errors.IsNotFound(err) { + if err := r.Create(ctx, sts); err != nil { + logger.Error(err, "failed to update statefulset", "valkey", valkey.Name, "namespace", valkey.Namespace) + return err + } + r.Recorder.Event(valkey, "Normal", "Created", + fmt.Sprintf("StatefulSet %s/%s is created", valkey.Namespace, valkey.Name)) + } else if err != nil { + logger.Error(err, "failed fetching statefulset", "valkey", valkey.Name, "namespace", valkey.Namespace) + return err + } + + if *sts.Spec.Replicas != valkey.Spec.MasterNodes { + oldnodes := *sts.Spec.Replicas + sts.Spec.Replicas = &valkey.Spec.MasterNodes + if err := r.Update(ctx, sts); err != nil { + logger.Error(err, "failed to update statefulset", "valkey", valkey.Name, "namespace", valkey.Namespace) + return err + } + r.Recorder.Event(valkey, "Normal", "Updated", fmt.Sprintf("StatefulSet %s/%s is updated (replicas)", valkey.Namespace, valkey.Name)) + r.balanceNodes(ctx, valkey, oldnodes, *sts.Spec.Replicas) + } + return nil }