Commit d02b5ec

chore: stage provider-kops progress

austinbrown-okta committed Jul 17, 2024
1 parent 668895c commit d02b5ec
Showing 3 changed files with 215 additions and 110 deletions.
5 changes: 3 additions & 2 deletions apis/v1alpha1/cluster_types.go
@@ -296,8 +296,9 @@ type RollingUpdateSpecInstanceGroupSpec struct {
 type InstanceGroupRole string
 
 const (
-	ControlPlane InstanceGroupRole = "ControlPlane"
-	Node         InstanceGroupRole = "Node"
+	InstanceGroupRoleControlPlane InstanceGroupRole = "ControlPlane"
+	Node                          InstanceGroupRole = "Node"
+	InstanceGroupRoleMaster       InstanceGroupRole = "Master"
 )
 
 // *****
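Not part of the diff: a minimal sketch, assuming it lives in the same apis/v1alpha1 package as the constants above, of how a caller could treat both spellings of the control-plane role uniformly now that a legacy "Master" value exists alongside "ControlPlane". The IsControlPlane helper is hypothetical.

// IsControlPlane reports whether the role denotes a control-plane instance
// group, matching either the "ControlPlane" constant or the "Master"
// constant added in this change.
func (r InstanceGroupRole) IsControlPlane() bool {
	return r == InstanceGroupRoleControlPlane || r == InstanceGroupRoleMaster
}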
169 changes: 100 additions & 69 deletions internal/controller/cluster/cluster.go
@@ -21,7 +21,6 @@ import (
"encoding/json"
"fmt"
"os"
"reflect"
"strings"

"github.com/pkg/errors"
@@ -57,9 +56,11 @@ const (
crossplaneCreateSucceeded = "crossplane.io/external-create-succeeded"

providerKopsCreatePending = "provider-kops.io/external-create-pending"
providerKopsUpdateLocked = "provider-kops.io/external-update-locked"
providerKopsCreateComplete = "provider-kops.io/external-create-complete"
providerKopsReconcilePending = "provider-kops.io/external-reconcile-pending"
providerKopsTriggerUpdate = "provider-kops.io/external-update-trigger"
providerKopsTriggerRollingUpdate = "provider-kops.io/external-rolling-update-trigger"
providerKopsUpdateLocked = "provider-kops.io/external-update-locked"
)

var (
@@ -215,26 +216,58 @@ func (c *external) Observe(ctx context.Context, mg resource.Managed) (managed.Ex
} else {
return managed.ExternalObservation{}, err
}
}
if cluster != nil {
} else if cluster != nil {

// initialize status to a "ready" state and alter this based on annotations & diff
cr.Status.Status = apisv1alpha1.Ready
annotations := cr.GetAnnotations()

// check lock to prevent concurrent updates to state etc..
// _, resourceReconciling := annotations[providerKopsReconcilePending]
// if resourceReconciling {
// // TODO: print log indicating that the resource may not be updated, instruct
// // removal of the lock annotation if needed
// return mo, nil
// } else {
// if err := c.annotateCluster(ctx, cr, map[string]string{providerKopsReconcilePending: ""}); err != nil {
// return mo, err
// }
// }

// check lock to prevent concurrent updates to state etc..
_, resourceLocked := annotations[providerKopsUpdateLocked]
if resourceLocked {
// TODO: print log indicating that the resource may not be updated, instruct
// removal of the lock annotation if needed
cr.SetConditions(xpv1.Unavailable())
return mo, nil
}

// check for initial creation annotations
_, resourceCreating := annotations[providerKopsCreatePending]
_, resourceCreated := annotations[providerKopsCreateComplete]
if resourceCreating && !resourceCreated {
cr.Status.Status = apisv1alpha1.Updating
}

// now run validation w/ fallback for authentication to the cluster
output, err := c.service.validateCluster(ctx, cr, []string{})
if err != nil && strings.Contains(err.Error(), errNoAuth) {
if err := c.service.authenticateToCluster(ctx, cr, []string{}); err != nil {
return managed.ExternalObservation{}, err
}
cr.Status.Status = apisv1alpha1.Updating
}
if len(output.Failures) > 0 || err != nil {
cr.Status.Status = apisv1alpha1.Updating
} else {

if len(output.Failures) == 0 && err == nil {
cr.Status.Status = apisv1alpha1.Ready

changelog, err := c.service.diffCluster(ctx, cr)
if err != nil {
return managed.ExternalObservation{}, err
}
if len(changelog) > 0 {
// set status to prompt update
// cr.Status.Status = apisv1alpha1.Updating
cr.Status.Status = apisv1alpha1.Updating
if changelogjson, err := json.Marshal(changelog); err == nil {
fmt.Printf("\nchangelog:%s\n", string(changelogjson))
}
@@ -249,6 +282,9 @@

switch cr.Status.Status {
case apisv1alpha1.Creating:
if err := c.lockCluster(ctx, cr, []string{providerKopsCreatePending, providerKopsUpdateLocked}); err != nil {
return mo, err
}
mo.ResourceExists = false
cr.SetConditions(xpv1.Creating())
case apisv1alpha1.Deleting:
@@ -258,6 +294,9 @@ func (c *external) Observe(ctx context.Context, mg resource.Managed) (managed.Ex
mo.ResourceExists = true
cr.SetConditions(xpv1.Available())
case apisv1alpha1.Updating:
if err := c.lockCluster(ctx, cr, []string{providerKopsUpdateLocked}); err != nil {
return mo, err
}
mo.ResourceExists = true
mo.ResourceUpToDate = false
// NB! We set the status to `Unavailable` bc the cluster is not
@@ -281,32 +320,23 @@ func (c *external) Create(ctx context.Context, mg resource.Managed) (managed.Ext
return managed.ExternalCreation{}, nil
}

// set a simple lock to prevent future creations and allow for safe re-create ops
err := retry.RetryOnConflict(retry.DefaultRetry, func() error {
fcr, err := c.getCluster(ctx, cr)
if err != nil {
return err
}

fcr.Annotations[providerKopsCreatePending] = ""
err = c.kube.Update(ctx, fcr, []client.UpdateOption{}...)
return err
})
if err != nil {
return managed.ExternalCreation{}, err
}

// fmt.Printf("Creating: %+v", cr)

if err := c.service.createCluster(ctx, cr); err != nil {
return managed.ExternalCreation{}, err
err := c.service.createCluster(ctx, cr)

if err := c.annotateCluster(ctx, cr, map[string]string{providerKopsCreateComplete: ""}); err != nil {
// TODO print error
}
// naive attempt to unlock cluster
if err := c.unlockCluster(ctx, cr, []string{providerKopsCreatePending, providerKopsUpdateLocked}); err != nil {
// TODO print error
}

return managed.ExternalCreation{
// Optionally return any details that may be required to connect to the
// external resource. These will be stored as the connection secret.
ConnectionDetails: managed.ConnectionDetails{},
}, nil
}, err
}

func (c *external) getCluster(ctx context.Context, cr *v1alpha1.Cluster) (*v1alpha1.Cluster, error) {
@@ -329,56 +359,16 @@ func (c *external) Update(ctx context.Context, mg resource.Managed) (managed.Ext

// fmt.Printf("Updating: %+v", cr)

a := instanceGroupYaml{}
b := instanceGroupYaml{}
if reflect.DeepEqual(a, b) {
fmt.Println("fancy that")
}

annotations := cr.GetAnnotations()

// check lock to prevent concurrent updates
if _, ok := annotations[providerKopsUpdateLocked]; ok {
return managed.ExternalUpdate{}, nil
}

// set a simple lock to prevent concurrent updates
err := retry.RetryOnConflict(retry.DefaultRetry, func() error {
fcr, err := c.getCluster(ctx, cr)
if err != nil {
return err
}

fcr.Annotations[providerKopsUpdateLocked] = ""
err = c.kube.Update(ctx, fcr, []client.UpdateOption{}...)
return err
})
if err != nil {
return managed.ExternalUpdate{}, err
}

// don't block when updating the cluster, this takes a while..
go func() {
bgCtx := context.Background()
if err := c.service.updateCluster(bgCtx, cr); err != nil {
fmt.Printf("\nUPDATE ERR: %+v\n", err)
}

// clear the lock
err := retry.RetryOnConflict(retry.DefaultRetry, func() error {
fcr, err := c.getCluster(bgCtx, cr)
if err != nil {
return err
}
fmt.Println("UPDATE DONE")
c.unlockCluster(bgCtx, cr, []string{providerKopsUpdateLocked})

delete(fcr.Annotations, providerKopsUpdateLocked)
delete(fcr.Annotations, providerKopsCreatePending)
err = c.kube.Update(bgCtx, fcr, []client.UpdateOption{}...)
return err
})
if err != nil {
fmt.Printf("\nUPDATE ANNOT ERR: %+v\n", err)
}
}()

return managed.ExternalUpdate{
Expand All @@ -403,3 +393,44 @@ func (c *external) Delete(ctx context.Context, mg resource.Managed) error {

return nil
}

func (c *external) annotateCluster(ctx context.Context, cr *v1alpha1.Cluster, annotations map[string]string) error {
err := retry.RetryOnConflict(retry.DefaultRetry, func() error {
fcr, err := c.getCluster(ctx, cr)
if err != nil {
return err
}

for k, v := range annotations {
fcr.Annotations[k] = v
}
err = c.kube.Update(ctx, fcr, []client.UpdateOption{}...)
return err
})
return err
}

func (c *external) lockCluster(ctx context.Context, cr *v1alpha1.Cluster, lockKeys []string) error {
annotations := map[string]string{}
for _, k := range lockKeys {
annotations[k] = ""
}
return c.annotateCluster(ctx, cr, annotations)
}

func (c *external) unlockCluster(ctx context.Context, cr *v1alpha1.Cluster, lockKeys []string) error {
// clear the lock
err := retry.RetryOnConflict(retry.DefaultRetry, func() error {
fcr, err := c.getCluster(ctx, cr)
if err != nil {
return err
}

for _, k := range lockKeys {
delete(fcr.Annotations, k)
}
err = c.kube.Update(ctx, fcr, []client.UpdateOption{}...)
return err
})
return err
}
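Aside, not part of the diff: the new annotateCluster, lockCluster, and unlockCluster helpers all follow the client-go pattern of re-reading the object inside retry.RetryOnConflict so the annotation write survives resourceVersion conflicts; Observe takes the lock via lockCluster, Create marks completion and unlocks, and the goroutine in Update clears the lock once updateCluster returns. Below is a condensed, standalone sketch of that pattern; the package name, function names, and the ConfigMap stand-in for *v1alpha1.Cluster are illustrative assumptions, not the provider's real types.

package lockexample

import (
	"context"

	corev1 "k8s.io/api/core/v1"
	"k8s.io/client-go/util/retry"
	"sigs.k8s.io/controller-runtime/pkg/client"
)

// setAnnotations re-fetches the object and writes the given annotations,
// retrying the update if another writer bumped the resourceVersion first.
func setAnnotations(ctx context.Context, c client.Client, key client.ObjectKey, ann map[string]string) error {
	return retry.RetryOnConflict(retry.DefaultRetry, func() error {
		obj := &corev1.ConfigMap{} // stand-in kind; the provider operates on *v1alpha1.Cluster
		if err := c.Get(ctx, key, obj); err != nil {
			return err
		}
		if obj.Annotations == nil {
			obj.Annotations = map[string]string{}
		}
		for k, v := range ann {
			obj.Annotations[k] = v
		}
		return c.Update(ctx, obj)
	})
}

// clearAnnotations deletes the given keys with the same conflict-retry loop,
// mirroring unlockCluster above.
func clearAnnotations(ctx context.Context, c client.Client, key client.ObjectKey, keys []string) error {
	return retry.RetryOnConflict(retry.DefaultRetry, func() error {
		obj := &corev1.ConfigMap{}
		if err := c.Get(ctx, key, obj); err != nil {
			return err
		}
		for _, k := range keys {
			delete(obj.Annotations, k)
		}
		return c.Update(ctx, obj)
	})
}

One difference from the helpers in the diff: the sketch guards against a nil Annotations map before writing, whereas annotateCluster assumes the fetched object already carries annotations.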