Skip to content

Commit

Permalink
simplify status and perform rollouts in correct order (#376)
Browse files Browse the repository at this point in the history
* simplify status and perform rollouts in correct order

* add tests for deletion timestamps

* cleanup tests and add cache invalidation
  • Loading branch information
agouin authored Oct 25, 2023
1 parent bf763c8 commit cf84f10
Show file tree
Hide file tree
Showing 10 changed files with 397 additions and 399 deletions.
9 changes: 1 addition & 8 deletions api/v1/cosmosfullnode_types.go
Original file line number Diff line number Diff line change
Expand Up @@ -138,21 +138,14 @@ type FullNodeStatus struct {

// Current sync information. Collected every 60s.
// +optional
SyncInfo *SyncInfoStatus `json:"syncInfo,omitempty"`
SyncInfo map[string]*SyncInfoPodStatus `json:"sync,omitempty"`

// Latest height information. Collected when the node starts up and whenever RPC is successfully queried.
// +optional
Height map[string]uint64 `json:"height,omitempty"`
}

// SyncInfoStatus wraps the list of per-pod consensus state entries.
// The "pods" JSON key has no omitempty option, so it is always serialized.
type SyncInfoStatus struct {
// The latest consensus state of pods.
Pods []SyncInfoPodStatus `json:"pods"`
}

type SyncInfoPodStatus struct {
// Pod's name.
Pod string `json:"pod"`
// When consensus information was fetched.
Timestamp metav1.Time `json:"timestamp"`
// Latest height if no error encountered.
Expand Down
36 changes: 12 additions & 24 deletions api/v1/zz_generated.deepcopy.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

53 changes: 21 additions & 32 deletions config/crd/bases/cosmos.strange.love_cosmosfullnodes.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -6009,39 +6009,28 @@ spec:
status:
description: A generic message for the user. May contain errors.
type: string
syncInfo:
sync:
additionalProperties:
properties:
error:
description: Error message if unable to fetch consensus state.
type: string
height:
description: Latest height if no error encountered.
format: int64
type: integer
inSync:
description: If the pod reports itself as in sync with chain
tip.
type: boolean
timestamp:
description: When consensus information was fetched.
format: date-time
type: string
required:
- timestamp
type: object
description: Current sync information. Collected every 60s.
properties:
pods:
description: The latest consensus state of pods.
items:
properties:
error:
description: Error message if unable to fetch consensus
state.
type: string
height:
description: Latest height if no error encountered.
format: int64
type: integer
inSync:
description: If the pod reports itself as in sync with chain
tip.
type: boolean
pod:
description: Pod's name.
type: string
timestamp:
description: When consensus information was fetched.
format: date-time
type: string
required:
- pod
- timestamp
type: object
type: array
required:
- pods
type: object
required:
- observedGeneration
Expand Down
17 changes: 9 additions & 8 deletions controllers/cosmosfullnode_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -118,7 +118,10 @@ func (r *CosmosFullNodeReconciler) Reconcile(ctx context.Context, req ctrl.Reque
reporter := kube.NewEventReporter(logger, r.recorder, crd)

fullnode.ResetStatus(crd)
defer r.updateStatus(ctx, crd)

syncInfo := fullnode.SyncInfoStatus(ctx, crd, r.cacheController)

defer r.updateStatus(ctx, crd, syncInfo)

errs := &kube.ReconcileErrors{}

Expand Down Expand Up @@ -169,7 +172,7 @@ func (r *CosmosFullNodeReconciler) Reconcile(ctx context.Context, req ctrl.Reque
}

// Reconcile pods.
podRequeue, err := r.podControl.Reconcile(ctx, reporter, crd, configCksums)
podRequeue, err := r.podControl.Reconcile(ctx, reporter, crd, configCksums, syncInfo)
if err != nil {
errs.Append(err)
}
Expand Down Expand Up @@ -218,21 +221,19 @@ func (r *CosmosFullNodeReconciler) resultWithErr(crd *cosmosv1.CosmosFullNode, e
return stopResult, err
}

func (r *CosmosFullNodeReconciler) updateStatus(ctx context.Context, crd *cosmosv1.CosmosFullNode) {
consensus := fullnode.SyncInfoStatus(ctx, crd, r.cacheController)

func (r *CosmosFullNodeReconciler) updateStatus(ctx context.Context, crd *cosmosv1.CosmosFullNode, syncInfo map[string]*cosmosv1.SyncInfoPodStatus) {
if err := r.statusClient.SyncUpdate(ctx, client.ObjectKeyFromObject(crd), func(status *cosmosv1.FullNodeStatus) {
status.ObservedGeneration = crd.Status.ObservedGeneration
status.Phase = crd.Status.Phase
status.StatusMessage = crd.Status.StatusMessage
status.Peers = crd.Status.Peers
status.SyncInfo = &consensus
for _, v := range consensus.Pods {
status.SyncInfo = syncInfo
for k, v := range syncInfo {
if v.Height != nil && *v.Height > 0 {
if status.Height == nil {
status.Height = make(map[string]uint64)
}
status.Height[v.Pod] = *v.Height
status.Height[k] = *v.Height
}
}
}); err != nil {
Expand Down
21 changes: 16 additions & 5 deletions internal/cosmos/cache_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -149,6 +149,22 @@ func (c *CacheController) Reconcile(ctx context.Context, req reconcile.Request)
return finishResult, nil
}

// Invalidate marks the cached status of the named pods as stale.
//
// Entries are not deleted from the cache. For every cached item whose pod
// name appears in pods, the comet status is reset to its zero value, the
// error is set to "invalidated", and the timestamp is set to the current
// time. The resulting collection is then written back for controller.
func (c *CacheController) Invalidate(controller client.ObjectKey, pods []string) {
	// The second return value of Get is deliberately ignored.
	// NOTE(review): presumably a cache miss yields an empty collection, making
	// the loop below a no-op - confirm against the cache implementation.
	v, _ := c.cache.Get(controller)
	now := time.Now()
	// Mutate through the index: a range value variable is a per-iteration copy,
	// so assigning to it would leave the collection passed to Update unchanged.
	for i := range v {
		for _, pod := range pods {
			if v[i].Pod.Name == pod {
				v[i].Status = CometStatus{}
				v[i].Err = fmt.Errorf("invalidated")
				v[i].TS = now
				// One match is enough; further duplicates in pods would
				// only repeat the same assignments.
				break
			}
		}
	}
	c.cache.Update(controller, v)
}

// Collect returns a StatusCollection for the given controller. Only returns cached CometStatus.
func (c *CacheController) Collect(ctx context.Context, controller client.ObjectKey) StatusCollection {
pods, err := c.listPods(ctx, controller)
Expand All @@ -168,11 +184,6 @@ func (c *CacheController) SyncedPods(ctx context.Context, controller client.Obje
return kube.AvailablePods(c.Collect(ctx, controller).SyncedPods(), 5*time.Second, time.Now())
}

// PodsWithStatus collects the status items for the object key derived from
// crd and returns each pod paired with its status.
func (c *CacheController) PodsWithStatus(ctx context.Context, crd *cosmosv1.CosmosFullNode) []PodStatus {
	key := client.ObjectKeyFromObject(crd)
	collection := c.Collect(ctx, key)
	return collection.PodsWithStatus(crd)
}

func (c *CacheController) listPods(ctx context.Context, controller client.ObjectKey) ([]corev1.Pod, error) {
var pods corev1.PodList
if err := c.client.List(ctx, &pods,
Expand Down
44 changes: 0 additions & 44 deletions internal/cosmos/status_collection.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,6 @@ import (
"time"

"github.com/samber/lo"
cosmosv1 "github.com/strangelove-ventures/cosmos-operator/api/v1"
"github.com/strangelove-ventures/cosmos-operator/internal/kube"
corev1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/types"
Expand Down Expand Up @@ -114,46 +113,3 @@ func (coll StatusCollection) Synced() StatusCollection {
func (coll StatusCollection) SyncedPods() []*corev1.Pod {
	// Project each item of the Synced() sub-collection onto its pod.
	toPod := func(item StatusItem, _ int) *corev1.Pod {
		return item.GetPod()
	}
	return lo.Map(coll.Synced(), toPod)
}

// PodStatus is the status of a pod.
// Values are produced by StatusCollection.PodsWithStatus.
type PodStatus struct {
// Pod is the pod this status describes.
Pod *corev1.Pod
// RPCReachable is set when the pod's status item carries no error.
RPCReachable bool
// Synced is set when the status item carries no error and its sync info
// does not report the node as catching up.
Synced bool
// AwaitingUpgrade is set when the CRD lists chain versions and the image
// selected for the pod's recorded height is non-empty and differs from the
// image of the pod's first container.
AwaitingUpgrade bool
}

// PodsWithStatus builds one PodStatus per item in the collection, in order.
//
// When the CRD declares chain versions, a pod is flagged as awaiting upgrade
// if the image selected for its recorded height differs from the image of its
// first container. A pod is flagged RPC-reachable when its status item has no
// error, and synced when it additionally is not catching up.
func (coll StatusCollection) PodsWithStatus(crd *cosmosv1.CosmosFullNode) []PodStatus {
	result := make([]PodStatus, 0, len(coll))
	for _, item := range coll {
		entry := PodStatus{Pod: item.GetPod()}

		if versions := crd.Spec.ChainSpec.Versions; versions != nil {
			// A pod without a recorded height reads as zero from the map.
			podHeight := crd.Status.Height[item.Pod.Name]

			// Keep the image of the last version whose upgrade height has been
			// reached; stop at the first version that lies beyond podHeight.
			wantImage := ""
			for _, v := range versions {
				if podHeight < v.UpgradeHeight {
					break
				}
				wantImage = v.Image
			}

			// The container image is only inspected once an image was selected.
			if wantImage != "" && item.Pod.Spec.Containers[0].Image != wantImage {
				entry.AwaitingUpgrade = true
			}
		}

		if item.Err == nil {
			entry.RPCReachable = true
			entry.Synced = !item.Status.Result.SyncInfo.CatchingUp
		}

		result = append(result, entry)
	}
	return result
}
Loading

0 comments on commit cf84f10

Please sign in to comment.