Skip to content

Commit

Permalink
reconcile: wait for apiserver to respond before trying rolling-update
Browse files Browse the repository at this point in the history
The rolling-update requires the apiserver (when called without --cloudonly),
so reconcile should wait for the apiserver to start responding.

Implement this by reusing "validate cluster", but filtering to only the instance groups
and pods that we expect to be online.
justinsb committed Jan 13, 2025

Verified

This commit was created on GitHub.com and signed with GitHub’s verified signature.
1 parent 2b133b2 commit f2d4eeb
Showing 9 changed files with 125 additions and 51 deletions.
2 changes: 1 addition & 1 deletion cmd/kops/delete_instance.go
Original file line number Diff line number Diff line change
@@ -258,7 +258,7 @@ func RunDeleteInstance(ctx context.Context, f *util.Factory, out io.Writer, opti

var clusterValidator validation.ClusterValidator
if !options.CloudOnly {
clusterValidator, err = validation.NewClusterValidator(cluster, cloud, list, restConfig, k8sClient)
clusterValidator, err = validation.NewClusterValidator(cluster, cloud, list, nil, nil, restConfig, k8sClient)
if err != nil {
return fmt.Errorf("cannot create cluster validator: %v", err)
}
26 changes: 26 additions & 0 deletions cmd/kops/reconcile_cluster.go
Original file line number Diff line number Diff line change
@@ -20,8 +20,10 @@ import (
"context"
"fmt"
"io"
"time"

"github.com/spf13/cobra"
v1 "k8s.io/api/core/v1"
"k8s.io/kops/cmd/kops/util"
"k8s.io/kops/pkg/apis/kops"
"k8s.io/kops/pkg/commands/commandutils"
@@ -134,6 +136,30 @@ func RunReconcileCluster(ctx context.Context, f *util.Factory, out io.Writer, c
}
}

// Particularly for a new cluster, we need to wait for the control plane to be answering requests
// before we can do a rolling update.
fmt.Fprintf(out, "Waiting for the kubernetes API to be served\n")
{
opt := &ValidateClusterOptions{}
opt.InitDefaults()
opt.ClusterName = c.ClusterName
opt.wait = 10 * time.Minute

// filter the instance group to only include the control plane
opt.filterInstanceGroups = func(ig *kops.InstanceGroup) bool {
return ig.Spec.Role == kops.InstanceGroupRoleAPIServer || ig.Spec.Role == kops.InstanceGroupRoleControlPlane
}

// Ignore all pods, we just want to check the control plane is responding
opt.filterPodsForValidation = func(pod *v1.Pod) bool {
return false
}

if _, err := RunValidateCluster(ctx, f, out, opt); err != nil {
return fmt.Errorf("waiting for kubernetes API to be served: %w", err)
}
}

fmt.Fprintf(out, "Doing rolling-update for control plane\n")
{
opt := &RollingUpdateOptions{}
2 changes: 1 addition & 1 deletion cmd/kops/rolling-update_cluster.go
Original file line number Diff line number Diff line change
@@ -453,7 +453,7 @@ func RunRollingUpdateCluster(ctx context.Context, f *util.Factory, out io.Writer
return fmt.Errorf("getting rest config: %w", err)
}

clusterValidator, err = validation.NewClusterValidator(cluster, cloud, list, restConfig, k8sClient)
clusterValidator, err = validation.NewClusterValidator(cluster, cloud, list, nil, nil, restConfig, k8sClient)
if err != nil {
return fmt.Errorf("cannot create cluster validator: %v", err)
}
24 changes: 16 additions & 8 deletions cmd/kops/validate_cluster.go
Original file line number Diff line number Diff line change
@@ -36,6 +36,7 @@ import (
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/klog/v2"
"k8s.io/kops/cmd/kops/util"
"k8s.io/kops/pkg/apis/kops"
kopsapi "k8s.io/kops/pkg/apis/kops"
"k8s.io/kops/pkg/validation"
"k8s.io/kops/util/pkg/tables"
@@ -61,12 +62,19 @@ var (
)

type ValidateClusterOptions struct {
ClusterName string
output string
wait time.Duration
count int
interval time.Duration
kubeconfig string
ClusterName string
InstanceGroupRoles []kops.InstanceGroupRole
output string
wait time.Duration
count int
interval time.Duration
kubeconfig string

// filterInstanceGroups is a function that returns true if the instance group should be validated
filterInstanceGroups func(ig *kops.InstanceGroup) bool

// filterPodsForValidation is a function that returns true if the pod should be validated
filterPodsForValidation func(pod *v1.Pod) bool
}

func (o *ValidateClusterOptions) InitDefaults() {
@@ -164,7 +172,7 @@ func RunValidateCluster(ctx context.Context, f *util.Factory, out io.Writer, opt

timeout := time.Now().Add(options.wait)

validator, err := validation.NewClusterValidator(cluster, cloud, list, restConfig, k8sClient)
validator, err := validation.NewClusterValidator(cluster, cloud, list, options.filterInstanceGroups, options.filterPodsForValidation, restConfig, k8sClient)
if err != nil {
return nil, fmt.Errorf("unexpected error creating validatior: %v", err)
}
@@ -175,7 +183,7 @@ func RunValidateCluster(ctx context.Context, f *util.Factory, out io.Writer, opt
return nil, fmt.Errorf("wait time exceeded during validation")
}

result, err := validator.Validate()
result, err := validator.Validate(ctx)
if err != nil {
consecutive = 0
if options.wait > 0 {
2 changes: 1 addition & 1 deletion pkg/instancegroups/instancegroups.go
Original file line number Diff line number Diff line change
@@ -537,7 +537,7 @@ func (c *RollingUpdateCluster) validateClusterWithTimeout(validateCount int, gro

for {
// Note that we validate at least once before checking the timeout, in case the cluster is healthy with a short timeout
result, err := c.ClusterValidator.Validate()
result, err := c.ClusterValidator.Validate(ctx)
if err == nil && !hasFailureRelevantToGroup(result.Failures, group) {
successCount++
if successCount >= validateCount {
22 changes: 10 additions & 12 deletions pkg/instancegroups/rollingupdate_test.go
Original file line number Diff line number Diff line change
@@ -84,13 +84,13 @@ func getTestSetup() (*RollingUpdateCluster, *awsup.MockAWSCloud) {

type successfulClusterValidator struct{}

func (*successfulClusterValidator) Validate() (*validation.ValidationCluster, error) {
func (*successfulClusterValidator) Validate(ctx context.Context) (*validation.ValidationCluster, error) {
return &validation.ValidationCluster{}, nil
}

type failingClusterValidator struct{}

func (*failingClusterValidator) Validate() (*validation.ValidationCluster, error) {
func (*failingClusterValidator) Validate(ctx context.Context) (*validation.ValidationCluster, error) {
return &validation.ValidationCluster{
Failures: []*validation.ValidationError{
{
@@ -104,7 +104,7 @@ func (*failingClusterValidator) Validate() (*validation.ValidationCluster, error

type erroringClusterValidator struct{}

func (*erroringClusterValidator) Validate() (*validation.ValidationCluster, error) {
func (*erroringClusterValidator) Validate(ctx context.Context) (*validation.ValidationCluster, error) {
return nil, errors.New("testing validation error")
}

@@ -113,7 +113,7 @@ type instanceGroupNodeSpecificErrorClusterValidator struct {
InstanceGroup *kopsapi.InstanceGroup
}

func (igErrorValidator *instanceGroupNodeSpecificErrorClusterValidator) Validate() (*validation.ValidationCluster, error) {
func (igErrorValidator *instanceGroupNodeSpecificErrorClusterValidator) Validate(ctx context.Context) (*validation.ValidationCluster, error) {
return &validation.ValidationCluster{
Failures: []*validation.ValidationError{
{
@@ -130,7 +130,7 @@ type assertNotCalledClusterValidator struct {
T *testing.T
}

func (v *assertNotCalledClusterValidator) Validate() (*validation.ValidationCluster, error) {
func (v *assertNotCalledClusterValidator) Validate(ctx context.Context) (*validation.ValidationCluster, error) {
v.T.Fatal("validator called unexpectedly")
return nil, errors.New("validator called unexpectedly")
}
@@ -425,8 +425,7 @@ type failAfterOneNodeClusterValidator struct {
ReturnError bool
}

func (v *failAfterOneNodeClusterValidator) Validate() (*validation.ValidationCluster, error) {
ctx := context.TODO()
func (v *failAfterOneNodeClusterValidator) Validate(ctx context.Context) (*validation.ValidationCluster, error) {
asgGroups, _ := v.Cloud.Autoscaling().DescribeAutoScalingGroups(ctx, &autoscaling.DescribeAutoScalingGroupsInput{
AutoScalingGroupNames: []string{v.Group},
})
@@ -648,8 +647,7 @@ type flappingClusterValidator struct {
invocationCount int
}

func (v *flappingClusterValidator) Validate() (*validation.ValidationCluster, error) {
ctx := context.TODO()
func (v *flappingClusterValidator) Validate(ctx context.Context) (*validation.ValidationCluster, error) {
asgGroups, _ := v.Cloud.Autoscaling().DescribeAutoScalingGroups(ctx, &autoscaling.DescribeAutoScalingGroupsInput{
AutoScalingGroupNames: []string{"master-1"},
})
@@ -706,7 +704,7 @@ type failThreeTimesClusterValidator struct {
invocationCount int
}

func (v *failThreeTimesClusterValidator) Validate() (*validation.ValidationCluster, error) {
func (v *failThreeTimesClusterValidator) Validate(ctx context.Context) (*validation.ValidationCluster, error) {
v.invocationCount++
if v.invocationCount <= 3 {
return &validation.ValidationCluster{
@@ -1060,7 +1058,7 @@ type concurrentTest struct {
detached map[string]bool
}

func (c *concurrentTest) Validate() (*validation.ValidationCluster, error) {
func (c *concurrentTest) Validate(ctx context.Context) (*validation.ValidationCluster, error) {
c.mutex.Lock()
defer c.mutex.Unlock()

@@ -1441,7 +1439,7 @@ type alreadyDetachedTest struct {
detached map[string]bool
}

func (t *alreadyDetachedTest) Validate() (*validation.ValidationCluster, error) {
func (t *alreadyDetachedTest) Validate(ctx context.Context) (*validation.ValidationCluster, error) {
t.mutex.Lock()
defer t.mutex.Unlock()

2 changes: 1 addition & 1 deletion pkg/instancegroups/rollingupdate_warmpool_test.go
Original file line number Diff line number Diff line change
@@ -75,7 +75,7 @@ type countingValidator struct {
numValidations int
}

func (c *countingValidator) Validate() (*validation.ValidationCluster, error) {
func (c *countingValidator) Validate(ctx context.Context) (*validation.ValidationCluster, error) {
c.numValidations++
return &validation.ValidationCluster{}, nil
}
83 changes: 60 additions & 23 deletions pkg/validation/validate_cluster.go
Original file line number Diff line number Diff line change
@@ -56,15 +56,23 @@ type ValidationError struct {

type ClusterValidator interface {
// Validate validates a k8s cluster
Validate() (*ValidationCluster, error)
Validate(ctx context.Context) (*ValidationCluster, error)
}

type clusterValidatorImpl struct {
cluster *kops.Cluster
cloud fi.Cloud
instanceGroups []*kops.InstanceGroup
restConfig *rest.Config
k8sClient kubernetes.Interface
cluster *kops.Cluster
cloud fi.Cloud
restConfig *rest.Config
k8sClient kubernetes.Interface

// allInstanceGroups is the list of all instance groups in the cluster
allInstanceGroups []*kops.InstanceGroup

// filterInstanceGroups is a function that returns true if the instance group should be validated
filterInstanceGroups func(ig *kops.InstanceGroup) bool

// filterPodsForValidation is a function that returns true if the pod should be validated
filterPodsForValidation func(pod *v1.Pod) bool
}

func (v *ValidationCluster) addError(failure *ValidationError) {
@@ -101,30 +109,44 @@ func hasPlaceHolderIP(host string) (string, error) {
return "", nil
}

func NewClusterValidator(cluster *kops.Cluster, cloud fi.Cloud, instanceGroupList *kops.InstanceGroupList, restConfig *rest.Config, k8sClient kubernetes.Interface) (ClusterValidator, error) {
var instanceGroups []*kops.InstanceGroup
func NewClusterValidator(cluster *kops.Cluster, cloud fi.Cloud, instanceGroupList *kops.InstanceGroupList, filterInstanceGroups func(ig *kops.InstanceGroup) bool, filterPodsForValidation func(pod *v1.Pod) bool, restConfig *rest.Config, k8sClient kubernetes.Interface) (ClusterValidator, error) {
var allInstanceGroups []*kops.InstanceGroup

for i := range instanceGroupList.Items {
ig := &instanceGroupList.Items[i]
instanceGroups = append(instanceGroups, ig)
allInstanceGroups = append(allInstanceGroups, ig)
}

if len(instanceGroups) == 0 {
if len(allInstanceGroups) == 0 {
return nil, fmt.Errorf("no InstanceGroup objects found")
}

// If no filter is provided, validate all instance groups
if filterInstanceGroups == nil {
filterInstanceGroups = func(ig *kops.InstanceGroup) bool {
return true
}
}

// If no filter is provided, validate all pods
if filterPodsForValidation == nil {
filterPodsForValidation = func(pod *v1.Pod) bool {
return true
}
}

return &clusterValidatorImpl{
cluster: cluster,
cloud: cloud,
instanceGroups: instanceGroups,
restConfig: restConfig,
k8sClient: k8sClient,
cluster: cluster,
cloud: cloud,
allInstanceGroups: allInstanceGroups,
restConfig: restConfig,
k8sClient: k8sClient,
filterInstanceGroups: filterInstanceGroups,
filterPodsForValidation: filterPodsForValidation,
}, nil
}

func (v *clusterValidatorImpl) Validate() (*ValidationCluster, error) {
ctx := context.TODO()

func (v *clusterValidatorImpl) Validate(ctx context.Context) (*ValidationCluster, error) {
validation := &ValidationCluster{}

// Do not use if we are running gossip or without dns
@@ -161,13 +183,14 @@ func (v *clusterValidatorImpl) Validate() (*ValidationCluster, error) {
}

warnUnmatched := false
cloudGroups, err := v.cloud.GetCloudGroups(v.cluster, v.instanceGroups, warnUnmatched, nodeList.Items)
cloudGroups, err := v.cloud.GetCloudGroups(v.cluster, v.allInstanceGroups, warnUnmatched, nodeList.Items)
if err != nil {
return nil, err
}
readyNodes, nodeInstanceGroupMapping := validation.validateNodes(cloudGroups, v.instanceGroups)

if err := validation.collectPodFailures(ctx, v.k8sClient, readyNodes, nodeInstanceGroupMapping); err != nil {
readyNodes, nodeInstanceGroupMapping := validation.validateNodes(cloudGroups, v.allInstanceGroups, v.filterInstanceGroups)

if err := validation.collectPodFailures(ctx, v.k8sClient, readyNodes, nodeInstanceGroupMapping, v.filterPodsForValidation); err != nil {
return nil, fmt.Errorf("cannot get pod health for %q: %v", v.cluster.Name, err)
}

@@ -181,7 +204,7 @@ var masterStaticPods = []string{
}

func (v *ValidationCluster) collectPodFailures(ctx context.Context, client kubernetes.Interface, nodes []v1.Node,
nodeInstanceGroupMapping map[string]*kops.InstanceGroup,
nodeInstanceGroupMapping map[string]*kops.InstanceGroup, podValidationFilter func(pod *v1.Pod) bool,
) error {
masterWithoutPod := map[string]map[string]bool{}
nodeByAddress := map[string]string{}
@@ -210,10 +233,16 @@ func (v *ValidationCluster) collectPodFailures(ctx context.Context, client kuber
delete(masterWithoutPod[nodeByAddress[pod.Status.HostIP]], app)
}

// Ignore pods that we don't want to validate
if !podValidationFilter(pod) {
return nil
}

priority := pod.Spec.PriorityClassName
if priority != "system-cluster-critical" && priority != "system-node-critical" {
return nil
}

if pod.Status.Phase == v1.PodSucceeded {
return nil
}
@@ -275,12 +304,16 @@ func (v *ValidationCluster) collectPodFailures(ctx context.Context, client kuber
return nil
}

func (v *ValidationCluster) validateNodes(cloudGroups map[string]*cloudinstances.CloudInstanceGroup, groups []*kops.InstanceGroup) ([]v1.Node, map[string]*kops.InstanceGroup) {
func (v *ValidationCluster) validateNodes(cloudGroups map[string]*cloudinstances.CloudInstanceGroup, groups []*kops.InstanceGroup, shouldValidateInstanceGroup func(ig *kops.InstanceGroup) bool) ([]v1.Node, map[string]*kops.InstanceGroup) {
var readyNodes []v1.Node
groupsSeen := map[string]bool{}
nodeInstanceGroupMapping := map[string]*kops.InstanceGroup{}

for _, cloudGroup := range cloudGroups {
if cloudGroup.InstanceGroup != nil && !shouldValidateInstanceGroup(cloudGroup.InstanceGroup) {
continue
}

var allMembers []*cloudinstances.CloudInstance
allMembers = append(allMembers, cloudGroup.Ready...)
allMembers = append(allMembers, cloudGroup.NeedUpdate...)
@@ -372,6 +405,10 @@ func (v *ValidationCluster) validateNodes(cloudGroups map[string]*cloudinstances
}

for _, ig := range groups {
if !shouldValidateInstanceGroup(ig) {
continue
}

if !groupsSeen[ig.Name] {
v.addError(&ValidationError{
Kind: "InstanceGroup",
13 changes: 9 additions & 4 deletions pkg/validation/validate_cluster_test.go
Original file line number Diff line number Diff line change
@@ -17,6 +17,7 @@ limitations under the License.
package validation

import (
"context"
"fmt"
"testing"

@@ -70,6 +71,8 @@ func (c *MockCloud) GetCloudGroups(cluster *kopsapi.Cluster, instancegroups []*k
}

func testValidate(t *testing.T, groups map[string]*cloudinstances.CloudInstanceGroup, objects []runtime.Object) (*ValidationCluster, error) {
ctx := context.TODO()

cluster := &kopsapi.Cluster{
ObjectMeta: metav1.ObjectMeta{Name: "testcluster.k8s.local"},
Spec: kopsapi.ClusterSpec{
@@ -130,14 +133,16 @@ func testValidate(t *testing.T, groups map[string]*cloudinstances.CloudInstanceG
restConfig := &rest.Config{
Host: "https://api.testcluster.k8s.local",
}
validator, err := NewClusterValidator(cluster, mockcloud, &kopsapi.InstanceGroupList{Items: instanceGroups}, restConfig, fake.NewSimpleClientset(objects...))
validator, err := NewClusterValidator(cluster, mockcloud, &kopsapi.InstanceGroupList{Items: instanceGroups}, nil, nil, restConfig, fake.NewSimpleClientset(objects...))
if err != nil {
return nil, err
}
return validator.Validate()
return validator.Validate(ctx)
}

func Test_ValidateCloudGroupMissing(t *testing.T) {
ctx := context.TODO()

cluster := &kopsapi.Cluster{
ObjectMeta: metav1.ObjectMeta{Name: "testcluster.k8s.local"},
Spec: kopsapi.ClusterSpec{
@@ -163,9 +168,9 @@ func Test_ValidateCloudGroupMissing(t *testing.T) {
restConfig := &rest.Config{
Host: "https://api.testcluster.k8s.local",
}
validator, err := NewClusterValidator(cluster, mockcloud, &kopsapi.InstanceGroupList{Items: instanceGroups}, restConfig, fake.NewSimpleClientset())
validator, err := NewClusterValidator(cluster, mockcloud, &kopsapi.InstanceGroupList{Items: instanceGroups}, nil, nil, restConfig, fake.NewSimpleClientset())
require.NoError(t, err)
v, err := validator.Validate()
v, err := validator.Validate(ctx)
require.NoError(t, err)
if !assert.Len(t, v.Failures, 1) ||
!assert.Equal(t, &ValidationError{

0 comments on commit f2d4eeb

Please sign in to comment.