Skip to content

Commit

Permalink
debug adding nodes
Browse files Browse the repository at this point in the history
  • Loading branch information
dmolik committed Aug 2, 2024
1 parent c3412aa commit 4319a0f
Show file tree
Hide file tree
Showing 3 changed files with 69 additions and 2 deletions.
8 changes: 8 additions & 0 deletions config/rbac/role.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,14 @@ rules:
- patch
- update
- watch
- apiGroups:
- ""
resources:
- pods
verbs:
- get
- list
- watch
- apiGroups:
- ""
resources:
Expand Down
8 changes: 8 additions & 0 deletions dist/install.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -245,6 +245,14 @@ rules:
- patch
- update
- watch
- apiGroups:
- ""
resources:
- pods
verbs:
- get
- list
- watch
- apiGroups:
- ""
resources:
Expand Down
55 changes: 53 additions & 2 deletions internal/controller/valkey_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -84,6 +84,7 @@ var scripts embed.FS
// +kubebuilder:rbac:groups="",resources=services,verbs=get;list;watch;create;update;patch;delete
// +kubebuilder:rbac:groups="",resources=serviceaccounts,verbs=get;list;watch;create;update;patch;delete
// +kubebuilder:rbac:groups="",resources=secrets,verbs=get;list;watch;create;update;patch;delete
// +kubebuilder:rbac:groups="",resources=pods,verbs=get;list;watch
// +kubebuilder:rbac:groups="apps",resources=statefulsets,verbs=get;list;watch;create;update;patch;delete
// +kubebuilder:rbac:groups=core,resources=events,verbs=create;patch

Expand Down Expand Up @@ -443,16 +444,49 @@ func (r *ValkeyReconciler) balanceNodes(ctx context.Context, valkey *hyperv1.Val
r.Recorder.Event(valkey, "Normal", "Updated", fmt.Sprintf("Scaling out cluster nodes %s/%s", valkey.Namespace, valkey.Name))
for i := oldnodes; i < newnodes; i++ { // add nodes
name := fmt.Sprintf("%s-%d", valkey.Name, i)
logger.Info("adding node", "valkey", valkey.Name, "namespace", valkey.Namespace, "node", name)
if err := r.waitForPod(ctx, name, valkey.Namespace); err != nil {
logger.Error(err, "failed to wait for pod", "valkey", valkey.Name, "namespace", valkey.Namespace, "node", name)
return err
}
addr, err := net.LookupHost(fmt.Sprintf("%s.%s-headless.%s.svc", name, valkey.Name, valkey.Namespace))
addr, err := r.getPodIp(ctx, name, valkey.Namespace)
if err != nil {
logger.Error(err, "failed to lookup host", "valkey", valkey.Name, "namespace", valkey.Namespace)
return err
}
if err := vClient.Do(ctx, vClient.B().ClusterMeet().Ip(addr[0]).Port(6379).Build()).Error(); err != nil {
var dial int
for {
network, err := net.Dial("tcp", addr+":6379")
if err != nil {
network.Close()

Check failure on line 461 in internal/controller/valkey_controller.go

View workflow job for this annotation

GitHub Actions / lint

Error return value of `network.Close` is not checked (errcheck)
time.Sleep(time.Second * 2)
dial++
if dial > 60 {
logger.Error(err, "failed to dial", "valkey", valkey.Name, "namespace", valkey.Namespace)
break
}
continue
}
if network != nil {
network.Close()

Check failure on line 471 in internal/controller/valkey_controller.go

View workflow job for this annotation

GitHub Actions / lint

Error return value of `network.Close` is not checked (errcheck)
} else {
time.Sleep(time.Second * 2)
dial++
if dial > 60 {
logger.Error(err, "failed to dial", "valkey", valkey.Name, "namespace", valkey.Namespace)
break
}
continue
}
break
}
if dial > 60 {
logger.Error(err, "failed to dial", "valkey", valkey.Name, "namespace", valkey.Namespace)
continue
}
res, err := vClient.Do(ctx, vClient.B().ClusterMeet().Ip(addr).Port(6379).Build()).ToString()
logger.Info("meeting node "+res, "valkey", valkey.Name, "namespace", valkey.Namespace, "node", name)
if err != nil {
logger.Error(err, "failed to meet node", "valkey", valkey.Name, "namespace", valkey.Namespace, "node", name)
return err
}
Expand All @@ -461,6 +495,23 @@ func (r *ValkeyReconciler) balanceNodes(ctx context.Context, valkey *hyperv1.Val
return nil
}

// getPodIp fetches the Pod identified by name and namespace through the
// reconciler's Get method and returns its Status.PodIP.
//
// An error is returned when the Get call fails, or when the Pod was fetched
// but its Status.PodIP is still empty. Returning "" with a nil error would let
// callers build an address such as ":6379", which net.Dial interprets as the
// local system rather than the intended Pod.
func (r *ValkeyReconciler) getPodIp(ctx context.Context, name, namespace string) (string, error) {
	logger := log.FromContext(ctx)

	pod := &corev1.Pod{
		ObjectMeta: metav1.ObjectMeta{
			Name:      name,
			Namespace: namespace,
		},
	}

	if err := r.Get(ctx, types.NamespacedName{Namespace: namespace, Name: name}, pod); err != nil {
		logger.Error(err, "failed fetching pod", "name", name, "namespace", namespace)
		return "", err
	}

	// A Pod object can be fetched before an IP has been recorded in its
	// status; surface that as an error instead of handing back an empty address.
	if pod.Status.PodIP == "" {
		err := fmt.Errorf("pod %s/%s has no IP assigned", namespace, name)
		logger.Error(err, "failed fetching pod ip", "name", name, "namespace", namespace)
		return "", err
	}
	return pod.Status.PodIP, nil
}

func (r *ValkeyReconciler) waitForPod(ctx context.Context, name, namespace string) error {
logger := log.FromContext(ctx)

Expand Down

0 comments on commit 4319a0f

Please sign in to comment.