Skip to content

Commit

Permalink
build PreservedLabelState when triggering evition
Browse files Browse the repository at this point in the history
Signed-off-by: changzhen <[email protected]>
  • Loading branch information
XiShanYongYe-Chang committed Nov 27, 2024
1 parent f168061 commit 2dc5a44
Show file tree
Hide file tree
Showing 3 changed files with 114 additions and 14 deletions.
3 changes: 3 additions & 0 deletions pkg/apis/work/v1alpha2/binding_types.go
Original file line number Diff line number Diff line change
Expand Up @@ -300,6 +300,9 @@ type GracefulEvictionTask struct {
// Populated by the system. Read-only.
// +optional
CreationTimestamp *metav1.Time `json:"creationTimestamp,omitempty"`

// ClusterBeforeFailover records the clusters where running the application before failover.
ClusterBeforeFailover []string `json:"clusterBeforeFailover,omitempty"`
}

// BindingSnapshot is a snapshot of a ResourceBinding or ClusterResourceBinding.
Expand Down
44 changes: 31 additions & 13 deletions pkg/apis/work/v1alpha2/binding_types_helper.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,12 +20,14 @@ import policyv1alpha1 "github.com/karmada-io/karmada/pkg/apis/policy/v1alpha1"

// TaskOptions represents options for GracefulEvictionTasks.
type TaskOptions struct {
purgeMode policyv1alpha1.PurgeMode
producer string
reason string
message string
gracePeriodSeconds *int32
suppressDeletion *bool
purgeMode policyv1alpha1.PurgeMode
producer string
reason string
message string
gracePeriodSeconds *int32
suppressDeletion *bool
preservedLabelState map[string]string
clustersBeforeFailover []string
}

// Option configures a TaskOptions
Expand Down Expand Up @@ -83,6 +85,20 @@ func WithSuppressDeletion(suppressDeletion *bool) Option {
}
}

// WithPreservedLabelState sets the preservedLabelState for TaskOptions
func WithPreservedLabelState(preservedLabelState map[string]string) Option {
return func(o *TaskOptions) {
o.preservedLabelState = preservedLabelState
}
}

// WithClustersBeforeFailover sets the clustersBeforeFailover for TaskOptions
func WithClustersBeforeFailover(clustersBeforeFailover []string) Option {
return func(o *TaskOptions) {
o.clustersBeforeFailover = clustersBeforeFailover
}
}

// TargetContains checks if specific cluster present on the target list.
func (s *ResourceBindingSpec) TargetContains(name string) bool {
for i := range s.Clusters {
Expand Down Expand Up @@ -163,13 +179,15 @@ func (s *ResourceBindingSpec) GracefulEvictCluster(name string, options *TaskOpt
// build eviction task
evictingCluster := evictCluster.DeepCopy()
evictionTask := GracefulEvictionTask{
FromCluster: evictingCluster.Name,
PurgeMode: options.purgeMode,
Reason: options.reason,
Message: options.message,
Producer: options.producer,
GracePeriodSeconds: options.gracePeriodSeconds,
SuppressDeletion: options.suppressDeletion,
FromCluster: evictingCluster.Name,
PurgeMode: options.purgeMode,
Reason: options.reason,
Message: options.message,
Producer: options.producer,
GracePeriodSeconds: options.gracePeriodSeconds,
SuppressDeletion: options.suppressDeletion,
PreservedLabelState: options.preservedLabelState,
ClusterBeforeFailover: options.clustersBeforeFailover,
}
if evictingCluster.Replicas > 0 {
evictionTask.Replicas = &evictingCluster.Replicas
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,9 @@ limitations under the License.
package applicationfailover

import (
"bytes"
"context"
"encoding/json"
"fmt"
"math"
"time"
Expand All @@ -27,6 +29,7 @@ import (
"k8s.io/apimachinery/pkg/types"
"k8s.io/apimachinery/pkg/util/sets"
"k8s.io/client-go/tools/record"
"k8s.io/client-go/util/jsonpath"
"k8s.io/klog/v2"
"k8s.io/utils/ptr"
controllerruntime "sigs.k8s.io/controller-runtime"
Expand Down Expand Up @@ -156,12 +159,25 @@ func (c *RBApplicationFailoverController) evictBinding(binding *workv1alpha2.Res
for _, cluster := range clusters {
switch binding.Spec.Failover.Application.PurgeMode {
case policyv1alpha1.Graciously:
preservedLabelState, err := c.calculatePreservedLabelState(binding, cluster)
if err != nil {
klog.Error(err)
return err
}

clustersBeforeFailover := make([]string, 0, len(binding.Spec.Clusters))
for _, targetCluster := range binding.Spec.Clusters {
clustersBeforeFailover = append(clustersBeforeFailover, targetCluster.Name)
}

if features.FeatureGate.Enabled(features.GracefulEviction) {
binding.Spec.GracefulEvictCluster(cluster, workv1alpha2.NewTaskOptions(
workv1alpha2.WithPurgeMode(policyv1alpha1.Graciously),
workv1alpha2.WithProducer(RBApplicationFailoverControllerName),
workv1alpha2.WithReason(workv1alpha2.EvictionReasonApplicationFailure),
workv1alpha2.WithGracePeriodSeconds(binding.Spec.Failover.Application.GracePeriodSeconds)))
workv1alpha2.WithGracePeriodSeconds(binding.Spec.Failover.Application.GracePeriodSeconds),
workv1alpha2.WithPreservedLabelState(preservedLabelState),
workv1alpha2.WithClustersBeforeFailover(clustersBeforeFailover)))
} else {
err := fmt.Errorf("GracefulEviction featureGate must be enabled when purgeMode is %s", policyv1alpha1.Graciously)
klog.Error(err)
Expand Down Expand Up @@ -272,3 +288,66 @@ func (c *RBApplicationFailoverController) bindingFilter(rb *workv1alpha2.Resourc
}
return true
}

func (c *RBApplicationFailoverController) calculatePreservedLabelState(binding *workv1alpha2.ResourceBinding, cluster string) (map[string]string, error) {
statePreservation := binding.Spec.Failover.Application.StatePreservation
if statePreservation == nil || len(statePreservation.Rules) == 0 {
return nil, nil
}

targetStatusItem, exist := findTargetStatusItemByCluster(binding.Status.AggregatedStatus, cluster)
if !exist || targetStatusItem.Status == nil || targetStatusItem.Status.Raw == nil {
return nil, fmt.Errorf("the status of ResourceBinding(%s/%s) under Cluster(%s) has not been colloected yet ", binding.Namespace, binding.Name, cluster)
}

results := make(map[string]string, len(statePreservation.Rules))
for _, rule := range statePreservation.Rules {
value, err := parseJsonValue(targetStatusItem, rule.JSONPath)
if err != nil {
klog.Errorf("Failed to parse value with jsonPath(%s) from status(%v), error: %v",
rule.JSONPath, targetStatusItem, err)
return nil, err
}
results[rule.AliasLabelName] = value
}

return results, nil
}

func parseJsonValue(targetStatus workv1alpha2.AggregatedStatusItem, jsonPath string) (string, error) {

Check failure on line 317 in pkg/controllers/applicationfailover/rb_application_failover_controller.go

View workflow job for this annotation

GitHub Actions / lint

var-naming: func parseJsonValue should be parseJSONValue (revive)
klog.Infof("[JUSTFORDEBUG] Raw status: %v", targetStatus.Status.Raw)
template := fmt.Sprintf(`{ %s }`, jsonPath)
j := jsonpath.New(jsonPath)
j.AllowMissingKeys(false)
err := j.Parse(template)
if err != nil {
klog.Errorf("[JUSTFORDEBUG] Parse template %s failed. Error: %v.", template, err)
return "", err
}

buf := new(bytes.Buffer)
unmarshalled := make(map[string]interface{})
_ = json.Unmarshal(targetStatus.Status.Raw, &unmarshalled)
klog.Infof("[JUSTFORDEBUG] unmarshalled data: %v", unmarshalled)
err = j.Execute(buf, unmarshalled)
if err != nil {
klog.Errorf("[JUSTFORDEBUG] Execute template %s failed. Error: %v.", template, err)
return "", err
}
klog.Infof("[JUSTFORDEBUG] Get Result: %s", buf.String())
return buf.String(), nil
}

func findTargetStatusItemByCluster(aggregatedStatusItems []workv1alpha2.AggregatedStatusItem, cluster string) (workv1alpha2.AggregatedStatusItem, bool) {
if len(aggregatedStatusItems) == 0 {
return workv1alpha2.AggregatedStatusItem{}, false
}

for index, statusItem := range aggregatedStatusItems {
if statusItem.ClusterName == cluster {
return aggregatedStatusItems[index], true
}
}

return workv1alpha2.AggregatedStatusItem{}, false
}

0 comments on commit 2dc5a44

Please sign in to comment.