diff --git a/manifests/claudie/kustomization.yaml b/manifests/claudie/kustomization.yaml index 695daac6f..261705c3f 100644 --- a/manifests/claudie/kustomization.yaml +++ b/manifests/claudie/kustomization.yaml @@ -57,18 +57,18 @@ apiVersion: kustomize.config.k8s.io/v1beta1 kind: Kustomization images: - name: ghcr.io/berops/claudie/ansibler - newTag: 6928ace-3165 + newTag: f95dd2b-3176 - name: ghcr.io/berops/claudie/autoscaler-adapter - newTag: 6928ace-3165 + newTag: f95dd2b-3176 - name: ghcr.io/berops/claudie/builder - newTag: 6928ace-3165 + newTag: f95dd2b-3176 - name: ghcr.io/berops/claudie/claudie-operator - newTag: 6928ace-3165 + newTag: f95dd2b-3176 - name: ghcr.io/berops/claudie/kube-eleven - newTag: 6928ace-3165 + newTag: f95dd2b-3176 - name: ghcr.io/berops/claudie/kuber - newTag: 6928ace-3165 + newTag: f95dd2b-3176 - name: ghcr.io/berops/claudie/manager - newTag: 6928ace-3165 + newTag: f95dd2b-3176 - name: ghcr.io/berops/claudie/terraformer - newTag: d06c57d-3175 + newTag: f95dd2b-3176 diff --git a/manifests/testing-framework/kustomization.yaml b/manifests/testing-framework/kustomization.yaml index 034deed37..9f6446d1b 100644 --- a/manifests/testing-framework/kustomization.yaml +++ b/manifests/testing-framework/kustomization.yaml @@ -91,4 +91,4 @@ secretGenerator: images: - name: ghcr.io/berops/claudie/testing-framework - newTag: d06c57d-3175 + newTag: f95dd2b-3176 diff --git a/proto/pb/spec/utils.go b/proto/pb/spec/utils.go index 16ebc140f..2569f0f82 100644 --- a/proto/pb/spec/utils.go +++ b/proto/pb/spec/utils.go @@ -163,3 +163,22 @@ func (r *TemplateRepository) MustExtractTargetPath() string { r.Path, ) } + +func (n *NodePool) Zone() string { + var sn string + + switch { + case n.GetDynamicNodePool() != nil: + sn = n.GetDynamicNodePool().Provider.SpecName + case n.GetStaticNodePool() != nil: + sn = StaticNodepoolInfo_STATIC_PROVIDER.String() + default: + panic("unsupported nodepool type") + } + + if sn == "" { + panic("no zone specified") + } + + return fmt.Sprintf("%s-zone", sn) +} diff --git a/services/kuber/server/domain/usecases/patch_nodes.go b/services/kuber/server/domain/usecases/patch_nodes.go index 5acb9966d..9de866b31 100644 --- a/services/kuber/server/domain/usecases/patch_nodes.go +++ b/services/kuber/server/domain/usecases/patch_nodes.go @@ -21,16 +21,16 @@ func (u *Usecases) PatchNodes(ctx context.Context, request *pb.PatchNodesRequest return nil, fmt.Errorf("error while patching providerID on nodes for %s : %w", clusterID, err) } - if err := patcher.PatchLabels(); err != nil { - logger.Err(err).Msgf("Error while patching node labels") - return nil, fmt.Errorf("error while patching labels on nodes for %s : %w", clusterID, err) - } - if err := patcher.PatchAnnotations(); err != nil { logger.Err(err).Msgf("Error while patching node annotations") return nil, fmt.Errorf("error while patching annotations on nodes for %s : %w", clusterID, err) } + if err := patcher.PatchLabels(); err != nil { + logger.Err(err).Msgf("Error while patching node labels") + return nil, fmt.Errorf("error while patching labels on nodes for %s : %w", clusterID, err) + } + if err := patcher.PatchTaints(); err != nil { logger.Err(err).Msgf("Error while patching node taints") return nil, fmt.Errorf("error while patching taints on nodes for %s : %w", clusterID, err) diff --git a/services/kuber/server/domain/utils/longhorn/longhorn.go b/services/kuber/server/domain/utils/longhorn/longhorn.go index b8c342a5a..b8e3f0ee5 100644 --- a/services/kuber/server/domain/utils/longhorn/longhorn.go +++ 
b/services/kuber/server/domain/utils/longhorn/longhorn.go @@ -18,10 +18,10 @@ import ( "github.com/rs/zerolog/log" ) -// Cluster - k8s cluster where longhorn will be set up -// Directory - directory where to create storage class manifest type Longhorn struct { - Cluster *spec.K8Scluster + // Cluster where longhorn will be set up + Cluster *spec.K8Scluster + // Directory where to create storage class manifest Directory string } @@ -37,146 +37,128 @@ type enableCA struct { const ( longhornYaml = "services/kuber/server/manifests/longhorn.yaml" longhornDefaultsYaml = "services/kuber/server/manifests/claudie-defaults.yaml" - longhornEnableCaTpl = "enable-ca.goyaml" - storageManifestTpl = "storage-class.goyaml" defaultSC = "longhorn" - storageClassLabel = "claudie.io/provider-instance" + storageClassLabel = "claudie.io/storage-class" ) // SetUp function will set up the longhorn on the k8s cluster saved in l.Longhorn func (l *Longhorn) SetUp() error { - kubectl := kubectl.Kubectl{Kubeconfig: l.Cluster.GetKubeconfig(), MaxKubectlRetries: 3} - // apply longhorn.yaml and settings - kubectl.Stdout = comm.GetStdOut(l.Cluster.ClusterInfo.Id()) - kubectl.Stderr = comm.GetStdErr(l.Cluster.ClusterInfo.Id()) - - // Apply longhorn manifests after nodes are annotated. - if err := l.applyManifests(kubectl); err != nil { - return fmt.Errorf("error while applying longhorn manifests in cluster %s : %w", l.Cluster.ClusterInfo.Name, err) + k := kubectl.Kubectl{ + Kubeconfig: l.Cluster.GetKubeconfig(), + MaxKubectlRetries: 3, } + k.Stdout = comm.GetStdOut(l.Cluster.ClusterInfo.Id()) + k.Stderr = comm.GetStdErr(l.Cluster.ClusterInfo.Id()) - //get existing sc so we can delete them if we do not need them any more - existingSC, err := l.getStorageClasses(kubectl) + current, err := l.currentClaudieStorageClasses(k) if err != nil { return fmt.Errorf("error while getting existing storage classes for %s : %w", l.Cluster.ClusterInfo.Name, err) } - //save applied sc so we can find a difference with existing ones and remove the redundant ones - var appliedSC []string - //load the templates - template := templateUtils.Templates{Directory: l.Directory} + if err := k.KubectlApply(longhornYaml); err != nil { + return fmt.Errorf("error while applying longhorn.yaml in %s : %w", l.Directory, err) + } + + if err := k.KubectlApply(longhornDefaultsYaml); err != nil { + return fmt.Errorf("error while applying claudie default settings for longhorn in %s : %w", l.Directory, err) + } + + template := templateUtils.Templates{ + Directory: l.Directory, + } + storageTpl, err := templateUtils.LoadTemplate(templates.StorageClassTemplate) if err != nil { return err } + enableCATpl, err := templateUtils.LoadTemplate(templates.EnableClusterAutoscalerTemplate) if err != nil { return err } - // Apply setting about CA - enableCa := enableCA{fmt.Sprintf("%v", l.Cluster.AnyAutoscaledNodePools())} - if setting, err := template.GenerateToString(enableCATpl, enableCa); err != nil { - return err - } else if err := kubectl.KubectlApplyString(setting); err != nil { - return fmt.Errorf("error while applying CA setting for longhorn in cluster %s : %w", l.Cluster.ClusterInfo.Name, err) + ca := enableCA{ + fmt.Sprintf("%v", l.Cluster.AnyAutoscaledNodePools()), } - // get real nodes names in a case when provider appends some string to the set name - realNodesInfo, err := kubectl.KubectlGetNodeNames() + setting, err := template.GenerateToString(enableCATpl, ca) if err != nil { return err } - realNodeNames := strings.Split(string(realNodesInfo), "\n") - // tag nodes 
based on the zones - for providerInstance, nps := range nodepools.ByProviderSpecName(l.Cluster.ClusterInfo.NodePools) { - zoneName := sanitise.String(fmt.Sprintf("%s-zone", providerInstance)) - storageClassName := fmt.Sprintf("longhorn-%s", zoneName) - //flag to determine whether we need to create storage class or not - isWorkerNodeProvider := false + + if err := k.KubectlApplyString(setting); err != nil { + return fmt.Errorf("error while applying CA setting for longhorn in cluster %s: %w", l.Cluster.ClusterInfo.Name, err) + } + + var desired []string + for provider, nps := range nodepools.ByProviderSpecName(l.Cluster.ClusterInfo.NodePools) { + wk := false for _, np := range nps { - // tag worker nodes from nodepool based on the future zone - // NOTE: the master nodes are by default set to NoSchedule, therefore we do not need to annotate them - // If in the future, we add functionality to allow scheduling on master nodes, the longhorn will need add the annotation if !np.IsControl { - isWorkerNodeProvider = true - for _, node := range np.GetNodes() { - nodeName := strings.TrimPrefix(node.Name, fmt.Sprintf("%s-", l.Cluster.ClusterInfo.Id())) - annotation := fmt.Sprintf("node.longhorn.io/default-node-tags='[\"%s\"]'", zoneName) - - i := slices.Index(realNodeNames, nodeName) - if i < 0 { - log.Warn().Str("cluster", l.Cluster.ClusterInfo.Id()).Msgf("Node %s was not found in cluster %v", nodeName, realNodeNames) - continue - } - // Add tag to the node via kubectl annotate, use --overwrite to avoid getting error of already tagged node - if err := kubectl.KubectlAnnotate("node", realNodeNames[i], annotation, "--overwrite"); err != nil { - return fmt.Errorf("error while annotating the node %s from cluster %s via kubectl annotate : %w", realNodeNames[i], l.Cluster.ClusterInfo.Name, err) - } - } + wk = true + break } } - if isWorkerNodeProvider { - // create storage class manifest based on zones from templates - zoneData := zoneData{ZoneName: zoneName, StorageClassName: storageClassName} - manifest := fmt.Sprintf("%s.yaml", storageClassName) - err := template.Generate(storageTpl, manifest, zoneData) - if err != nil { + + if wk { + zn := sanitise.String(fmt.Sprintf("%s-zone", provider)) + sc := fmt.Sprintf("longhorn-%s", zn) + manifest := fmt.Sprintf("%s.yaml", sc) + data := zoneData{ + ZoneName: zn, + StorageClassName: sc, + } + + if err := template.Generate(storageTpl, manifest, data); err != nil { return fmt.Errorf("error while generating %s manifest : %w", manifest, err) } - //update the kubectl working directory - kubectl.Directory = l.Directory - // apply manifest - err = kubectl.KubectlApply(manifest, "") - if err != nil { + + k.Directory = l.Directory + if err := k.KubectlApply(manifest, ""); err != nil { return fmt.Errorf("error while applying %s manifest : %w", manifest, err) } - appliedSC = append(appliedSC, storageClassName) + desired = append(desired, sc) } } - err = l.deleteOldStorageClasses(existingSC, appliedSC, kubectl) - if err != nil { + if err := l.deleteUnused(current, desired, k); err != nil { return err } - // Clean up if err := os.RemoveAll(l.Directory); err != nil { return fmt.Errorf("error while deleting files %s : %w", l.Directory, err) } + return nil } -// getStorageClasses returns a slice of names of storage classes currently deployed in cluster -// returns slice of storage classes if successful, error otherwise -func (l *Longhorn) getStorageClasses(kc kubectl.Kubectl) (result []string, err error) { - //define output struct +// currentClaudieStorageClasses returns a slice 
of names of claudie related storage classes currently deployed in cluster +func (l *Longhorn) currentClaudieStorageClasses(kc kubectl.Kubectl) (result []string, err error) { type KubectlOutputJSON struct { APIVersion string `json:"apiVersion"` Items []map[string]interface{} `json:"items"` Kind string `json:"kind"` Metadata map[string]interface{} `json:"metadata"` } - //get existing storage classes + out, err := kc.KubectlGet("sc", "-o", "json") if err != nil { return nil, fmt.Errorf("error while getting storage classes from cluster %s : %w", l.Cluster.ClusterInfo.Name, err) } - //no storage class defined yet + if strings.Contains(string(out), "No resources found") { return result, nil } - //parse output + var parsedJSON KubectlOutputJSON - err = json.Unmarshal(out, &parsedJSON) - if err != nil { + if err := json.Unmarshal(out, &parsedJSON); err != nil { return nil, fmt.Errorf("error while unmarshalling kubectl output for cluster %s : %w", l.Cluster.ClusterInfo.Name, err) } - //return name of the storage classes + for _, sc := range parsedJSON.Items { metadata := sc["metadata"].(map[string]interface{}) name := metadata["name"].(string) - //check if storage class has a claudie label + if labels, ok := metadata["labels"]; ok { labelsMap := labels.(map[string]interface{}) if _, ok := labelsMap[storageClassLabel]; ok { @@ -184,44 +166,23 @@ func (l *Longhorn) getStorageClasses(kc kubectl.Kubectl) (result []string, err e } } } + return result, nil } -// deleteOldStorageClasses deletes storage classes, which does not have a worker nodes behind it -func (l *Longhorn) deleteOldStorageClasses(existing, applied []string, kc kubectl.Kubectl) error { +// deleteUnused deleted unused storage classes previously created by claudie. +func (l *Longhorn) deleteUnused(existing, applied []string, kc kubectl.Kubectl) error { for _, ex := range existing { if ex == defaultSC { //ignore the default sc continue } - found := false - for _, app := range applied { - if ex == app { - found = true - break - } - } - //if not found in applied, delete the sc - if !found { - err := kc.KubectlDeleteResource("sc", ex) + if !slices.Contains(applied, ex) { log.Debug().Msgf("Deleting storage class %s", ex) - if err != nil { + if err := kc.KubectlDeleteResource("sc", ex); err != nil { return fmt.Errorf("error while deleting storage class %s due to no nodes backing it : %w", ex, err) } } } return nil } - -// applyManifests applies longhorn manifests to the managed cluster. 
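The reworked SetUp above now reconciles storage classes declaratively: it collects the claudie-managed classes it just applied (desired), lists the ones already labelled claudie.io/storage-class in the cluster (current), and deletes every class that is no longer backed by a worker nodepool, keeping the default "longhorn" class. A minimal, self-contained sketch of that diff-and-delete pattern; the helper name, the delete callback, and the example class names are illustrative stand-ins, not the PR's API:

package main

import (
	"fmt"
	"slices"
)

// reconcileStorageClasses keeps the default "longhorn" class, keeps every
// class that is still desired, and deletes the rest via the supplied callback.
func reconcileStorageClasses(existing, desired []string, del func(name string) error) error {
	for _, sc := range existing {
		if sc == "longhorn" { // default storage class is never touched
			continue
		}
		if !slices.Contains(desired, sc) {
			if err := del(sc); err != nil {
				return fmt.Errorf("deleting unused storage class %s: %w", sc, err)
			}
		}
	}
	return nil
}

func main() {
	existing := []string{"longhorn", "longhorn-hetzner-1-zone", "longhorn-gcp-1-zone"}
	desired := []string{"longhorn-hetzner-1-zone"}
	_ = reconcileStorageClasses(existing, desired, func(name string) error {
		fmt.Println("kubectl delete sc", name) // stand-in for KubectlDeleteResource("sc", name)
		return nil
	})
}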
-func (l *Longhorn) applyManifests(kc kubectl.Kubectl) error { - // Apply longhorn.yaml - if err := kc.KubectlApply(longhornYaml); err != nil { - return fmt.Errorf("error while applying longhorn.yaml in %s : %w", l.Directory, err) - } - // Apply longhorn setting - if err := kc.KubectlApply(longhornDefaultsYaml); err != nil { - return fmt.Errorf("error while applying claudie default settings for longhorn in %s : %w", l.Directory, err) - } - return nil -} diff --git a/services/kuber/server/domain/utils/nodes/delete.go b/services/kuber/server/domain/utils/nodes/delete.go index 4a1e602f6..e68a6816e 100644 --- a/services/kuber/server/domain/utils/nodes/delete.go +++ b/services/kuber/server/domain/utils/nodes/delete.go @@ -1,13 +1,12 @@ package nodes import ( + "errors" "fmt" "slices" "strings" - "time" comm "github.com/berops/claudie/internal/command" - "github.com/berops/claudie/internal/concurrent" "github.com/berops/claudie/internal/kubectl" "github.com/berops/claudie/internal/loggerutils" "github.com/berops/claudie/internal/nodepools" @@ -16,8 +15,7 @@ import ( ) const ( - longhornNamespace = "longhorn-system" - newReplicaCreationTimeout = 10 * time.Second + longhornNamespace = "longhorn-system" ) type etcdPodInfo struct { @@ -85,35 +83,31 @@ func (d *Deleter) DeleteNodes() (*spec.K8Scluster, error) { } } - // Cordon worker nodes to prevent any new pods/volume replicas being scheduled there - err = concurrent.Exec(d.workerNodes, func(_ int, worker string) error { - i := slices.Index(realNodeNames, worker) - if i < 0 { - d.logger.Warn().Msgf("Node name %s not found in cluster.", worker) - return nil - } - return kubectl.KubectlCordon(worker) - }) - if err != nil { - return nil, fmt.Errorf("error while cordoning worker nodes from cluster %s which were marked for deletion : %w", d.clusterPrefix, err) - } - // Remove worker nodes sequentially to minimise risk of fault when replicating PVC + var errDel error for _, worker := range d.workerNodes { - // Assure replication of storage - if err := d.assureReplication(kubectl, worker); err != nil { - return nil, fmt.Errorf("error while making sure storage is replicated before deletion on cluster %s : %w", d.clusterPrefix, err) + if !slices.Contains(realNodeNames, worker) { + d.logger.Warn().Msgf("Node name that contains %s not found in cluster", worker) + continue } - // Delete worker nodes from nodes.longhorn.io - if err := d.deleteFromLonghorn(kubectl, worker); err != nil { - return nil, fmt.Errorf("error while deleting nodes.longhorn.io for %s : %w", d.clusterPrefix, err) + + if err := kubectl.KubectlCordon(worker); err != nil { + errDel = errors.Join(errDel, fmt.Errorf("error while cordon worker node %s from cluster %s: %w", worker, d.clusterPrefix, err)) + continue } - // Delete worker nodes + if err := d.deleteNodesByName(kubectl, worker, realNodeNames); err != nil { - return nil, fmt.Errorf("error while deleting nodes from worker nodes for %s : %w", d.clusterPrefix, err) + errDel = errors.Join(errDel, fmt.Errorf("error while deleting node %s from cluster %s: %w", worker, d.clusterPrefix, err)) + continue } - // NOTE: Might need to manually verify if the volume got detached. - // https://github.com/berops/claudie/issues/784 + + if err := removeReplicasOnDeletedNode(kubectl, worker); err != nil { + // not a fatal error. 
+ d.logger.Warn().Msgf("failed to delete unused replicas from replicas.longhorn.io, after node %s deletion: %s", worker, err) + } + } + if errDel != nil { + return nil, errDel } // Update the current cluster @@ -121,27 +115,22 @@ func (d *Deleter) DeleteNodes() (*spec.K8Scluster, error) { return d.cluster, nil } -// deleteNodesByName deletes node from cluster by performing -// kubectl delete node -// return nil if successful, error otherwise +// deleteNodesByName deletes node from the k8s cluster. func (d *Deleter) deleteNodesByName(kc kubectl.Kubectl, nodeName string, realNodeNames []string) error { - i := slices.Index(realNodeNames, nodeName) - if i < 0 { + if !slices.Contains(realNodeNames, nodeName) { d.logger.Warn().Msgf("Node name that contains %s not found in cluster", nodeName) return nil } - name := realNodeNames[i] - - d.logger.Info().Msgf("Deleting node %s from k8s cluster", name) + d.logger.Info().Msgf("Deleting node %s from k8s cluster", nodeName) //kubectl drain --ignore-daemonsets --delete-emptydir-data - if err := kc.KubectlDrain(name); err != nil { + if err := kc.KubectlDrain(nodeName); err != nil { return fmt.Errorf("error while draining node %s from cluster %s : %w", nodeName, d.clusterPrefix, err) } //kubectl delete node - if err := kc.KubectlDeleteResource("nodes", name); err != nil { + if err := kc.KubectlDeleteResource("nodes", nodeName); err != nil { return fmt.Errorf("error while deleting node %s from cluster %s : %w", nodeName, d.clusterPrefix, err) } @@ -193,69 +182,6 @@ nodes: } } -// deleteFromLonghorn will delete node from nodes.longhorn.io -// return nil if successful, error otherwise -func (d *Deleter) deleteFromLonghorn(kc kubectl.Kubectl, worker string) error { - // check if the resource is present before deleting. - if logs, err := kc.KubectlGet(fmt.Sprintf("nodes.longhorn.io %s", worker), "-n", longhornNamespace); err != nil { - // This is not the ideal path of checking for a NotFound error, this is only done as we shell out to run kubectl. - if strings.Contains(string(logs), "NotFound") { - d.logger.Warn().Msgf("worker node: %s not found, assuming it was deleted.", worker) - return nil - } - } - - d.logger.Info().Msgf("Deleting node %s from nodes.longhorn.io from cluster", worker) - if err := kc.KubectlDeleteResource("nodes.longhorn.io", worker, "-n", longhornNamespace); err != nil { - return fmt.Errorf("error while deleting node %s from nodes.longhorn.io from cluster %s : %w", worker, d.clusterPrefix, err) - } - return nil -} - -// assureReplication tries to assure, that replicas for each longhorn volume are migrated to nodes, which will remain in the cluster. -func (d *Deleter) assureReplication(kc kubectl.Kubectl, worker string) error { - // Get replicas and volumes as they can be scheduled on next node, which will be deleted. - replicas, err := getReplicasMap(kc) - if err != nil { - return fmt.Errorf("error while getting replicas from cluster %s : %w", d.clusterPrefix, err) - } - volumes, err := getVolumes(kc) - if err != nil { - return fmt.Errorf("error while getting volumes from cluster %s : %w", d.clusterPrefix, err) - } - if reps, ok := replicas[worker]; ok { - for _, r := range reps { - // Try to force creation of a new replicas from node, which will be deleted. - if v, ok := volumes[r.Spec.VolumeName]; ok { - // Increase number of replicas in volume. 
- if err := increaseReplicaCount(v, kc); err != nil { - return fmt.Errorf("error while increasing number of replicas in volume %s from cluster %s : %w", v.Metadata.Name, d.clusterPrefix, err) - } - // Wait newReplicaCreationTimeout for Longhorn to create new replica. - d.logger.Info().Msgf("Waiting %.0f seconds for new replicas to be scheduled if possible for node %s of cluster", newReplicaCreationTimeout.Seconds(), worker) - time.Sleep(newReplicaCreationTimeout) - - // Verify all current replicas are running correctly - if err := verifyAllReplicasSetUp(v.Metadata.Name, kc, d.logger); err != nil { - return fmt.Errorf("error while checking if all longhorn replicas for volume %s are running : %w", v.Metadata.Name, err) - } - d.logger.Info().Msgf("Replication for volume %s has been set up", v.Metadata.Name) - - // Decrease number of replicas in volume -> original state. - if err := revertReplicaCount(v, kc); err != nil { - return fmt.Errorf("error while increasing number of replicas in volume %s cluster %s : %w", v.Metadata.Name, d.clusterPrefix, err) - } - // Delete old replica, on to-be-deleted node. - d.logger.Debug().Str("node", r.Status.OwnerID).Msgf("Deleting replica %s from node %s", r.Metadata.Name, r.Status.OwnerID) - if err := deleteReplica(r, kc); err != nil { - return err - } - } - } - return nil -} - // getMainMaster iterates over all control nodes in cluster and returns API EP node. If none defined with this type, // function returns any master node which will not be deleted. // return API EP node if successful, nil otherwise diff --git a/services/kuber/server/domain/utils/nodes/patch.go b/services/kuber/server/domain/utils/nodes/patch.go index 79161a367..926bd2621 100644 --- a/services/kuber/server/domain/utils/nodes/patch.go +++ b/services/kuber/server/domain/utils/nodes/patch.go @@ -2,6 +2,7 @@ package nodes import ( "encoding/json" + "errors" "fmt" "strings" @@ -94,22 +95,64 @@ func (p *Patcher) PatchLabels() error { } func (p *Patcher) PatchAnnotations() error { - var err error + var errAll error for _, np := range p.desiredNodepools { - nodeAnnotations := np.Annotations + annotations := np.Annotations + if annotations == nil { + annotations = make(map[string]string) + } + // annotate worker nodes with provider spec name to match the storage classes + // created in the SetupLonghorn step. + // NOTE: the master nodes are by default set to NoSchedule, therefore we do not need to annotate them + // If, in the future, we add functionality to allow scheduling on master nodes, longhorn will need to add the annotation.
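The block that follows merges the nodepool's zone tag (NodePool.Zone(), i.e. "<provider-spec-name>-zone") into the node.longhorn.io/default-node-tags annotation without duplicating tags that are already present. A standalone sketch of the same merge, using an invented mergeZoneTag helper purely for illustration:

package main

import (
	"encoding/json"
	"fmt"
)

// mergeZoneTag parses the existing default-node-tags value (a JSON array),
// appends the zone if it is not present yet, and returns the re-marshalled array.
func mergeZoneTag(existing, zone string) (string, error) {
	if existing == "" {
		existing = "[]"
	}
	var tags []any
	if err := json.Unmarshal([]byte(existing), &tags); err != nil {
		return "", fmt.Errorf("annotation value is not a JSON array: %w", err)
	}
	for _, t := range tags {
		if s, ok := t.(string); ok && s == zone {
			return existing, nil // zone already tagged, nothing to do
		}
	}
	tags = append(tags, zone)
	out, err := json.Marshal(tags)
	if err != nil {
		return "", err
	}
	return string(out), nil
}

func main() {
	merged, _ := mergeZoneTag(`["ssd"]`, "hetzner-1-zone")
	fmt.Println(merged) // ["ssd","hetzner-1-zone"]
}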
+ if !np.IsControl { + k := "node.longhorn.io/default-node-tags" + tags, ok := annotations[k] + if !ok { + tags = "[]" + } + var v []any + if err := json.Unmarshal([]byte(tags), &v); err != nil { + errAll = errors.Join(errAll, fmt.Errorf("nodepool %s has invalid value for annotation %v, expected value to be of type array: %w", np.Name, k, err)) + continue + } + var found bool + for i := range v { + s, ok := v[i].(string) + if !ok { + continue + } + if s == np.Zone() { + found = true + break + } + } + if !found { + v = append(v, np.Zone()) + } + + b, err := json.Marshal(v) + if err != nil { + errAll = errors.Join(errAll, fmt.Errorf("failed to marshal modified annotations for nodepool %s: %w", np.Name, err)) + continue + } + annotations[k] = string(b) + } + for _, node := range np.Nodes { nodeName := strings.TrimPrefix(node.Name, fmt.Sprintf("%s-", p.clusterID)) - patchPath, err1 := buildJSONAnnotationPatch(nodeAnnotations) - if err1 != nil { - return fmt.Errorf("failed to create annotations patch for %s : %w, %w", np.Name, err, err1) + patch, err := buildJSONAnnotationPatch(annotations) + if err != nil { + errAll = errors.Join(errAll, fmt.Errorf("failed to create annotation for node %s: %w", nodeName, err)) + continue } - if err1 := p.kc.KubectlPatch("node", nodeName, patchPath, "--type", "merge"); err1 != nil { - p.logger.Err(err1).Str("node", nodeName).Msgf("Failed to patch annotations on node with path %s", patchPath) - err = fmt.Errorf("error while patching one or more nodes with annotations") + if err := p.kc.KubectlPatch("node", nodeName, patch, "--type", "merge"); err != nil { + errAll = errors.Join(errAll, fmt.Errorf("error while applying annotations %v for node %s: %w", annotations, nodeName, err)) + continue } } } - return err + return errAll } func (p *Patcher) PatchTaints() error { diff --git a/services/kuber/server/domain/utils/nodes/pvc_replication_utils.go b/services/kuber/server/domain/utils/nodes/pvc_replication_utils.go index 638485bf0..bd55f84f9 100644 --- a/services/kuber/server/domain/utils/nodes/pvc_replication_utils.go +++ b/services/kuber/server/domain/utils/nodes/pvc_replication_utils.go @@ -1,154 +1,63 @@ package nodes import ( - "context" + "errors" "fmt" - "time" "github.com/berops/claudie/internal/kubectl" - "github.com/rs/zerolog" "gopkg.in/yaml.v3" ) -type K8sList[T any] struct { - Items []T `yaml:"items"` +type ReplicaList struct { + Items []LonghornReplica `yaml:"items"` } type LonghornReplica struct { - Metadata Metadata `yaml:"metadata"` - Status Status `yaml:"status"` - Spec ReplicaSpec `yaml:"spec"` -} - -type LonghornVolume struct { - Metadata Metadata `yaml:"metadata"` - Spec VolumeSpec `yaml:"spec"` -} - -type Metadata struct { - Name string `yaml:"name"` -} + Metadata struct { + Name string `yaml:"name"` + } `yaml:"metadata"` -type ReplicaSpec struct { - VolumeName string `yaml:"volumeName"` -} - -type VolumeSpec struct { - NumberOfReplicas int `yaml:"numberOfReplicas"` -} + Status struct { + InstanceManagerName string `yaml:"instanceManagerName"` + CurrentState string `yaml:"currentState"` + Started bool `yaml:"started"` + } `yaml:"status"` -type Status struct { - OwnerID string `yaml:"ownerID"` - CurrentState string `yaml:"currentState"` - Started bool `yaml:"started"` + Spec struct { + NodeID string `yaml:"nodeID"` + FailedAt string `yaml:"failedAt"` + } `yaml:"spec"` } -const ( - replicas = "replicas.longhorn.io" - volumes = "volumes.longhorn.io" - patchNumberOfReplicas = "{\"spec\":{\"numberOfReplicas\":%d}}" - runningState = "running" - 
replicaRunningCheck = 5 * time.Second - pvcReplicationTimeout = 5 * time.Minute -) - -// getVolumes returns a map[volume name]volume of volumes currently in the cluster. -func getVolumes(kc kubectl.Kubectl) (map[string]LonghornVolume, error) { - out, err := kc.KubectlGet(volumes, "-n", longhornNamespace, "-o", "yaml") +func removeReplicasOnDeletedNode(kc kubectl.Kubectl, node string) error { + out, err := kc.KubectlGet("replicas.longhorn.io", "-n", longhornNamespace, "-o", "yaml") if err != nil { - return nil, fmt.Errorf("failed to list all volumes : %w", err) - } - var volumeList K8sList[LonghornVolume] - if err := yaml.Unmarshal(out, &volumeList); err != nil { - return nil, fmt.Errorf("failed unmarshal kubectl output : %w", err) - } - - m := make(map[string]LonghornVolume, len(volumeList.Items)) - for _, v := range volumeList.Items { - m[v.Metadata.Name] = v - } - return m, nil -} - -// getReplicas returns a map of nodes and slice of replicas they contain. -func getReplicasMap(kc kubectl.Kubectl) (map[string][]LonghornReplica, error) { - replicaList, err := getReplicas(kc) - if err != nil { - return nil, err - } - m := make(map[string][]LonghornReplica, len(replicaList.Items)) - for _, r := range replicaList.Items { - m[r.Status.OwnerID] = append(m[r.Status.OwnerID], r) - } - return m, nil -} - -func verifyAllReplicasSetUp(volumeName string, kc kubectl.Kubectl, logger zerolog.Logger) error { - ticker := time.NewTicker(replicaRunningCheck) - ctx, cancel := context.WithTimeout(context.Background(), pvcReplicationTimeout) - defer cancel() - // Check for the replication status - for { - select { - case <-ticker.C: - if ok, err := verifyAllReplicasRunning(volumeName, kc); err != nil { - logger.Warn().Msgf("Got error while checking for replication status of %s volume : %v", volumeName, err) - logger.Info().Msgf("Retrying check for replication status of %s volume", volumeName) - } else { - if ok { - return nil - } else { - logger.Debug().Msgf("Volume replication is not ready yet, retrying check for replication status of %s volume", volumeName) - } - } - case <-ctx.Done(): - return fmt.Errorf("error while checking the status of volume %s replication : %w", volumeName, ctx.Err()) - } + return fmt.Errorf("failed to list all replicas : %w", err) } -} -// deleteReplica deletes a replica from a node. -func deleteReplica(r LonghornReplica, kc kubectl.Kubectl) error { - return kc.KubectlDeleteResource(replicas, r.Metadata.Name, "-n", longhornNamespace) -} - -// increaseReplicaCount increases number of replicas for longhorn volume by 1, via kubectl patch. -func increaseReplicaCount(v LonghornVolume, kc kubectl.Kubectl) error { - return kc.KubectlPatch(volumes, v.Metadata.Name, fmt.Sprintf(patchNumberOfReplicas, v.Spec.NumberOfReplicas+1), "-n", longhornNamespace, "--type", "merge") -} - -// revertReplicaCount sets the number of replicas for longhorn volume to the original value, taken from the v, via kubectl patch. -func revertReplicaCount(v LonghornVolume, kc kubectl.Kubectl) error { - return kc.KubectlPatch(volumes, v.Metadata.Name, fmt.Sprintf(patchNumberOfReplicas, v.Spec.NumberOfReplicas), "-n", longhornNamespace, "--type", "merge") -} - -// getReplicas returns a List of Longhorn replicas currently in cluster. 
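The replica cleanup below works on the output of kubectl get replicas.longhorn.io -o yaml, decoded into the trimmed-down types introduced above. A small illustrative sketch of that decoding step; the struct shape and the sample YAML are assumptions limited to the fields the filter actually reads:

package main

import (
	"fmt"

	"gopkg.in/yaml.v3"
)

// replicaList mirrors the list shape returned by kubectl, keeping only the
// metadata/spec/status fields the cleanup filter needs.
type replicaList struct {
	Items []struct {
		Metadata struct {
			Name string `yaml:"name"`
		} `yaml:"metadata"`
		Spec struct {
			NodeID   string `yaml:"nodeID"`
			FailedAt string `yaml:"failedAt"`
		} `yaml:"spec"`
		Status struct {
			CurrentState string `yaml:"currentState"`
			Started      bool   `yaml:"started"`
		} `yaml:"status"`
	} `yaml:"items"`
}

func main() {
	// Sample data; in the PR this comes from kc.KubectlGet(...).
	out := []byte(`
items:
  - metadata:
      name: pvc-123-r-abc
    spec:
      nodeID: np1-node-2
      failedAt: "2024-01-01T00:00:00Z"
    status:
      currentState: stopped
      started: false
`)
	var rl replicaList
	if err := yaml.Unmarshal(out, &rl); err != nil {
		panic(err)
	}
	fmt.Println(rl.Items[0].Metadata.Name, rl.Items[0].Spec.NodeID) // pvc-123-r-abc np1-node-2
}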
-func getReplicas(kc kubectl.Kubectl) (K8sList[LonghornReplica], error) { - out, err := kc.KubectlGet(replicas, "-n", longhornNamespace, "-o", "yaml") - if err != nil { - return K8sList[LonghornReplica]{}, fmt.Errorf("failed to list all replicas : %w", err) - } - var replicaList K8sList[LonghornReplica] + var replicaList ReplicaList if err := yaml.Unmarshal(out, &replicaList); err != nil { - return K8sList[LonghornReplica]{}, fmt.Errorf("failed unmarshal kubectl output : %w", err) - } - return replicaList, nil -} - -// verifyAllReplicasRunning returns true, if all replicas for specified volume are in running state. -func verifyAllReplicasRunning(volumeName string, kc kubectl.Kubectl) (bool, error) { - replicaList, err := getReplicas(kc) - if err != nil { - return false, err - } - for _, r := range replicaList.Items { - if r.Spec.VolumeName == volumeName { - // Current state not running, return false. - if !(r.Status.CurrentState == runningState && r.Status.Started) { - return false, nil + return fmt.Errorf("failed to unmarshal kubectl output : %w", err) + } + + var errAll error + for _, replica := range replicaList.Items { + // https://github.com/longhorn/longhorn/blob/6cc47ec5e942f33b10f644a5eaf0970b650e27a7/deploy/longhorn.yaml#L3048 + // spec.NodeID is the node the replica is on, this should + // match the deleted node. + del := replica.Spec.NodeID == node + del = del && replica.Status.CurrentState == "stopped" + del = del && !replica.Status.Started + del = del && replica.Status.InstanceManagerName == "" + del = del && replica.Spec.FailedAt != "" + + if del { + err := kc.KubectlDeleteResource("replicas.longhorn.io", replica.Metadata.Name, "-n", longhornNamespace) + if err != nil { + errAll = errors.Join(errAll, fmt.Errorf("failed to delete replica %s: %w", replica.Metadata.Name, err)) } } } - // All replicas for specific volume are running, return true. - return true, nil + + return errAll } diff --git a/services/kuber/server/manifests/claudie-defaults.yaml b/services/kuber/server/manifests/claudie-defaults.yaml index e6a7e2208..91534d934 100644 --- a/services/kuber/server/manifests/claudie-defaults.yaml +++ b/services/kuber/server/manifests/claudie-defaults.yaml @@ -6,6 +6,14 @@ metadata: namespace: longhorn-system value: claudie.io/node-type:compute --- +# Make sure longhorn blocks node drain for replica eviction when the node being deleted holds the last replica. +apiVersion: longhorn.io/v1beta1 +kind: Setting +metadata: + name: node-drain-policy + namespace: longhorn-system +value: block-for-eviction-if-last-replica +--- # Default path to use for storing data on a host apiVersion: longhorn.io/v1beta1 kind: Setting
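For clarity, the condition in removeReplicasOnDeletedNode above deletes only replica objects that sat on the removed node and are already stopped, not started, unmanaged (no instance manager), and marked as failed. A compact sketch of that predicate; the struct and the sample values are illustrative:

package main

import "fmt"

// replica keeps just the fields the cleanup condition inspects.
type replica struct {
	Name                string
	NodeID              string // spec.nodeID (the node the replica was on)
	CurrentState        string // status.currentState
	Started             bool   // status.started
	InstanceManagerName string // status.instanceManagerName
	FailedAt            string // spec.failedAt
}

// leftoverOnDeletedNode mirrors the chained del := ... checks above.
func leftoverOnDeletedNode(r replica, node string) bool {
	return r.NodeID == node &&
		r.CurrentState == "stopped" &&
		!r.Started &&
		r.InstanceManagerName == "" &&
		r.FailedAt != ""
}

func main() {
	r := replica{
		Name:         "pvc-123-r-abc",
		NodeID:       "np1-node-2",
		CurrentState: "stopped",
		FailedAt:     "2024-01-01T00:00:00Z",
	}
	fmt.Println(leftoverOnDeletedNode(r, "np1-node-2")) // true: would be deleted
}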