Skip to content

Commit

Permalink
add scaling logic
Browse files Browse the repository at this point in the history
  • Loading branch information
dmolik committed Aug 2, 2024
1 parent 3a59b85 commit 9478a82
Showing 1 changed file with 136 additions and 0 deletions.
136 changes: 136 additions & 0 deletions internal/controller/valkey_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ import (
"fmt"
"io"
"math/big"
"net"
"strings"
"time"

Expand Down Expand Up @@ -375,6 +376,116 @@ func (r *ValkeyReconciler) upsertServiceAccount(ctx context.Context, valkey *hyp
return nil
}

// removePort returns the part of addr that precedes the first ':'.
// When addr contains no ':' it is returned unchanged.
//
// Because the cut happens at the first colon, this is only suitable for
// "host:port"-style values; an IPv6 literal would be truncated.
func removePort(addr string) string {
	idx := strings.IndexByte(addr, ':')
	if idx < 0 {
		return addr
	}
	return addr[:idx]
}

// balanceNodes adjusts Valkey cluster membership after the number of nodes
// changes from oldnodes to newnodes.
//
// It obtains the password via upsertSecret, connects to
// "<name>.<namespace>.svc:6379", pings the server, and then:
//
//   - oldnodes == newnodes: returns nil without further work.
//   - oldnodes > newnodes: for every ordinal i in [newnodes, oldnodes), from
//     highest to lowest, sends CLUSTER FORGET for the node id mapped to
//     "<name>-<i>". Ordinals with no known node id are logged and skipped.
//   - oldnodes < newnodes: for every ordinal i in [oldnodes, newnodes), waits
//     for pod "<name>-<i>" to be running, resolves
//     "<pod>.<name>-headless.<namespace>.svc" and sends CLUSTER MEET for the
//     first resolved address on port 6379.
//
// The pod-name -> node-id map is built from the CLUSTER NODES reply: the first
// field of each line is the node id, the second is the address, which is
// reverse-resolved and reduced to its first DNS label. Lines that are
// malformed or whose address cannot be reverse-resolved are logged and
// skipped rather than aborting the whole operation.
//
// The first error from the secret, client, ping, CLUSTER NODES, FORGET,
// waitForPod, host lookup or MEET steps is returned.
func (r *ValkeyReconciler) balanceNodes(ctx context.Context, valkey *hyperv1.Valkey, oldnodes, newnodes int32) error {
	logger := log.FromContext(ctx)

	password, err := r.upsertSecret(ctx, valkey, true)
	if err != nil {
		return err
	}

	vClient, err := valkeyClient.NewClient(valkeyClient.ClientOption{InitAddress: []string{valkey.Name + "." + valkey.Namespace + ".svc:6379"}, Password: password})
	if err != nil {
		logger.Error(err, "failed to create valkey client", "valkey", valkey.Name, "namespace", valkey.Namespace)
		return err
	}
	defer vClient.Close()
	if err := vClient.Do(ctx, vClient.B().Ping().Build()).Error(); err != nil {
		logger.Error(err, "failed to ping valkey", "valkey", valkey.Name, "namespace", valkey.Namespace)
		return err
	}

	logger.Info("balancing nodes", "valkey", valkey.Name, "namespace", valkey.Namespace)
	if oldnodes == newnodes {
		return nil
	}
	nodes, err := vClient.Do(ctx, vClient.B().ClusterNodes().Build()).ToString()
	if err != nil {
		logger.Error(err, "failed to get nodes", "valkey", valkey.Name, "namespace", valkey.Namespace)
		return err
	}

	// Map pod name (first DNS label of the reverse lookup) to cluster node id.
	ids := map[string]string{}
	for _, node := range strings.Split(nodes, "\n") {
		fields := strings.Fields(node)
		if len(fields) == 0 {
			continue
		}
		if len(fields) < 2 {
			// Without an address field there is nothing to resolve; indexing
			// fields[1] here would panic.
			logger.Info("skipping malformed cluster nodes line", "valkey", valkey.Name, "namespace", valkey.Namespace, "line", node)
			continue
		}
		// The reply may carry a "txt:" marker in front of the id; strip it.
		id := strings.ReplaceAll(fields[0], "txt:", "")
		addr := removePort(fields[1])
		addrs, err := net.LookupAddr(addr)
		if err != nil {
			logger.Error(err, "failed to lookup addr", "valkey", valkey.Name, "namespace", valkey.Namespace, "addr", addr)
			// addrs is unusable on error; skip this node instead of indexing it.
			continue
		}
		if len(addrs) == 0 {
			logger.Info("no reverse dns names for addr", "valkey", valkey.Name, "namespace", valkey.Namespace, "addr", addr)
			continue
		}
		hostname := strings.Split(addrs[0], ".")[0]
		ids[hostname] = id
	}

	if oldnodes > newnodes {
		for i := oldnodes - 1; i >= newnodes; i-- { // remove nodes
			name := fmt.Sprintf("%s-%d", valkey.Name, i)
			id, ok := ids[name]
			if !ok {
				logger.Info("node not found", "valkey", valkey.Name, "namespace", valkey.Namespace, "node", name)
				continue
			}
			if err := vClient.Do(ctx, vClient.B().ClusterForget().NodeId(id).Build()).Error(); err != nil {
				logger.Error(err, "failed to forget node", "valkey", valkey.Name, "namespace", valkey.Namespace)
				return err
			}
		}
		return nil
	}

	for i := oldnodes; i < newnodes; i++ { // add nodes
		name := fmt.Sprintf("%s-%d", valkey.Name, i)
		if err := r.waitForPod(ctx, name, valkey.Namespace); err != nil {
			logger.Error(err, "failed to wait for pod", "valkey", valkey.Name, "namespace", valkey.Namespace, "node", name)
			return err
		}
		host := fmt.Sprintf("%s.%s-headless.%s.svc", name, valkey.Name, valkey.Namespace)
		addr, err := net.LookupHost(host)
		if err != nil {
			logger.Error(err, "failed to lookup host", "valkey", valkey.Name, "namespace", valkey.Namespace)
			return err
		}
		if len(addr) == 0 {
			err := fmt.Errorf("no addresses found for host %s", host)
			logger.Error(err, "failed to lookup host", "valkey", valkey.Name, "namespace", valkey.Namespace)
			return err
		}
		if err := vClient.Do(ctx, vClient.B().ClusterMeet().Ip(addr[0]).Port(6379).Build()).Error(); err != nil {
			logger.Error(err, "failed to meet node", "valkey", valkey.Name, "namespace", valkey.Namespace, "node", name)
			return err
		}
	}
	return nil
}

// waitForPod polls for the pod namespace/name until its phase is Running.
//
// The pod is fetched up to 16 times with a 3 second pause between attempts.
// A failed fetch (for example because the pod has not been created yet) is
// logged and counted as an attempt, so the loop is always bounded and never
// retries without pausing. The pause is interrupted if ctx is cancelled, in
// which case ctx.Err() is returned.
//
// Returns nil once the pod is Running, or an error stating that the pod is
// not running once the attempts are exhausted; if the final fetch failed,
// that failure is wrapped in the returned error.
func (r *ValkeyReconciler) waitForPod(ctx context.Context, name, namespace string) error {
	logger := log.FromContext(ctx)

	const (
		pollInterval = 3 * time.Second
		maxAttempts  = 16
	)

	pod := &corev1.Pod{}
	key := types.NamespacedName{Namespace: namespace, Name: name}

	var lastErr error
	for attempt := 1; attempt <= maxAttempts; attempt++ {
		lastErr = r.Get(ctx, key, pod)
		if lastErr != nil {
			logger.Error(lastErr, "failed fetching pod", "name", name, "namespace", namespace)
		} else if pod.Status.Phase == corev1.PodRunning {
			return nil
		}

		if attempt == maxAttempts {
			// No point pausing after the final attempt.
			break
		}
		select {
		case <-ctx.Done():
			return ctx.Err()
		case <-time.After(pollInterval):
		}
	}

	if lastErr != nil {
		return fmt.Errorf("pod %s/%s is not running: %w", namespace, name, lastErr)
	}
	return fmt.Errorf("pod %s/%s is not running", namespace, name)
}

func getMasterNodes(valkey *hyperv1.Valkey) string {
var nodes []string
for i := 0; i < int(valkey.Spec.MasterNodes)*(int(valkey.Spec.Replicas)+1); i++ {
Expand Down Expand Up @@ -665,6 +776,31 @@ fi
r.Recorder.Event(valkey, "Normal", "Created",
fmt.Sprintf("StatefulSet %s/%s is created", valkey.Namespace, valkey.Name))
}

err := r.Get(ctx, types.NamespacedName{Namespace: valkey.Namespace, Name: valkey.Name}, sts)
if err != nil && errors.IsNotFound(err) {
if err := r.Create(ctx, sts); err != nil {
logger.Error(err, "failed to update statefulset", "valkey", valkey.Name, "namespace", valkey.Namespace)
return err
}
r.Recorder.Event(valkey, "Normal", "Created",
fmt.Sprintf("StatefulSet %s/%s is created", valkey.Namespace, valkey.Name))
} else if err != nil {
logger.Error(err, "failed fetching statefulset", "valkey", valkey.Name, "namespace", valkey.Namespace)
return err
}

if *sts.Spec.Replicas != valkey.Spec.MasterNodes {
oldnodes := *sts.Spec.Replicas
sts.Spec.Replicas = &valkey.Spec.MasterNodes
if err := r.Update(ctx, sts); err != nil {
logger.Error(err, "failed to update statefulset", "valkey", valkey.Name, "namespace", valkey.Namespace)
return err
}
r.Recorder.Event(valkey, "Normal", "Updated", fmt.Sprintf("StatefulSet %s/%s is updated (replicas)", valkey.Namespace, valkey.Name))
r.balanceNodes(ctx, valkey, oldnodes, *sts.Spec.Replicas)

Check failure on line 801 in internal/controller/valkey_controller.go

View workflow job for this annotation

GitHub Actions / lint

Error return value of `r.balanceNodes` is not checked (errcheck)
}

return nil
}

Expand Down

0 comments on commit 9478a82

Please sign in to comment.