Skip to content

Commit ab08dfb

Browse files
authored
revisit timeout handling (#254)
* revisit timeout handling * cleanup * cleanup * comments ... * tweak backoff, update status on every reconile * simplify timeout handling * fix comments * set Timeout condition in error situations as well * do not set timeout when deleting * fix commments
1 parent 953b01a commit ab08dfb

File tree

8 files changed

+147
-81
lines changed

8 files changed

+147
-81
lines changed

clm/cmd/apply.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -117,7 +117,7 @@ func newApplyCmd() *cobra.Command {
117117

118118
for {
119119
release.State = component.StateProcessing
120-
ok, err := reconciler.Apply(context.TODO(), &release.Inventory, objects, namespace, ownerId, release.Revision)
120+
ok, err := reconciler.Apply(context.TODO(), &release.Inventory, objects, namespace, ownerId, fmt.Sprintf("%d", release.Revision))
121121
if err != nil {
122122
if !isEphmeralError(err) || errCount >= maxErrCount {
123123
return err

internal/backoff/backoff.go

Lines changed: 8 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -22,16 +22,15 @@ type Backoff struct {
2222
func NewBackoff(maxDelay time.Duration) *Backoff {
2323
return &Backoff{
2424
activities: make(map[any]any),
25-
// resulting per-item backoff is the maximum of a 200-times-50ms-then-maxDelay per-item limiter,
26-
// and an overall 5-per-second-burst-20 bucket limiter;
27-
// as a consequence, we have up to
28-
// - up to 20 almost immediate retries
29-
// - then then a phase of 5 guaranteed retries per seconnd (could be more if burst capacity is refilled
30-
// because of the duration of the reconcile logic execution itself)
31-
// - finally (after 200 iterations) slow retries at the rate given by maxDelay
25+
// resulting per-item backoff is the maximum of a 120-times-100ms-then-maxDelay per-item limiter,
26+
// and an overall 1-per-second-burst-50 bucket limiter;
27+
// as a consequence, we have
28+
// - a phase of 10 retries per second for the first 5 seconds
29+
// - then a phase of 1 retry per second for the next 60 seconds
30+
// - finally slow retries at the rate given by maxDelay
3231
limiter: workqueue.NewMaxOfRateLimiter(
33-
workqueue.NewItemFastSlowRateLimiter(50*time.Millisecond, maxDelay, 200),
34-
&workqueue.BucketRateLimiter{Limiter: rate.NewLimiter(rate.Limit(5), 20)},
32+
workqueue.NewItemFastSlowRateLimiter(100*time.Millisecond, maxDelay, 120),
33+
&workqueue.BucketRateLimiter{Limiter: rate.NewLimiter(rate.Limit(1), 50)},
3534
),
3635
}
3736
}

pkg/component/reconciler.go

Lines changed: 92 additions & 55 deletions
Large diffs are not rendered by default.

pkg/component/reference.go

Lines changed: 32 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -49,6 +49,7 @@ type ConfigMapReference struct {
4949
}
5050

5151
func (r *ConfigMapReference) load(ctx context.Context, clnt client.Client, namespace string, ignoreNotFound bool) error {
52+
// TODO: shouldn't we panic if already loaded?
5253
configMap := &corev1.ConfigMap{}
5354
if err := clnt.Get(ctx, apitypes.NamespacedName{Namespace: namespace, Name: r.Name}, configMap); err != nil {
5455
if apierrors.IsNotFound(err) {
@@ -67,7 +68,7 @@ func (r *ConfigMapReference) load(ctx context.Context, clnt client.Client, names
6768

6869
func (r *ConfigMapReference) digest() string {
6970
if !r.loaded {
70-
// TODO: shouldn't we panic here?
71+
// note: we can't panic here because this might be called in case of not-found situations
7172
return ""
7273
}
7374
return calculateDigest(r.data)
@@ -97,6 +98,7 @@ type ConfigMapKeyReference struct {
9798
}
9899

99100
func (r *ConfigMapKeyReference) load(ctx context.Context, clnt client.Client, namespace string, ignoreNotFound bool, fallbackKeys ...string) error {
101+
// TODO: shouldn't we panic if already loaded?
100102
configMap := &corev1.ConfigMap{}
101103
if err := clnt.Get(ctx, apitypes.NamespacedName{Namespace: namespace, Name: r.Name}, configMap); err != nil {
102104
if apierrors.IsNotFound(err) {
@@ -130,7 +132,7 @@ func (r *ConfigMapKeyReference) load(ctx context.Context, clnt client.Client, na
130132

131133
func (r *ConfigMapKeyReference) digest() string {
132134
if !r.loaded {
133-
// TODO: shouldn't we panic here?
135+
// note: we can't panic here because this might be called in case of not-found situations
134136
return ""
135137
}
136138
return sha256hex([]byte(r.value))
@@ -157,6 +159,7 @@ type SecretReference struct {
157159
}
158160

159161
func (r *SecretReference) load(ctx context.Context, clnt client.Client, namespace string, ignoreNotFound bool) error {
162+
// TODO: shouldn't we panic if already loaded?
160163
secret := &corev1.Secret{}
161164
if err := clnt.Get(ctx, apitypes.NamespacedName{Namespace: namespace, Name: r.Name}, secret); err != nil {
162165
if apierrors.IsNotFound(err) {
@@ -175,7 +178,7 @@ func (r *SecretReference) load(ctx context.Context, clnt client.Client, namespac
175178

176179
func (r *SecretReference) digest() string {
177180
if !r.loaded {
178-
// TODO: shouldn't we panic here?
181+
// note: we can't panic here because this might be called in case of not-found situations
179182
return ""
180183
}
181184
return calculateDigest(r.data)
@@ -205,6 +208,7 @@ type SecretKeyReference struct {
205208
}
206209

207210
func (r *SecretKeyReference) load(ctx context.Context, clnt client.Client, namespace string, ignoreNotFound bool, fallbackKeys ...string) error {
211+
// TODO: shouldn't we panic if already loaded?
208212
secret := &corev1.Secret{}
209213
if err := clnt.Get(ctx, apitypes.NamespacedName{Namespace: namespace, Name: r.Name}, secret); err != nil {
210214
if apierrors.IsNotFound(err) {
@@ -238,7 +242,7 @@ func (r *SecretKeyReference) load(ctx context.Context, clnt client.Client, names
238242

239243
func (r *SecretKeyReference) digest() string {
240244
if !r.loaded {
241-
// TODO: shouldn't we panic here?
245+
// note: we can't panic here because this might be called in case of not-found situations
242246
return ""
243247
}
244248
return sha256hex(r.value)
@@ -253,16 +257,31 @@ func (r *SecretKeyReference) Value() []byte {
253257
return r.value
254258
}
255259

260+
// Generic reference. All occurrences in the component's spec of types implementing this interface are automatically resolved
261+
// by the framework during reconcile by calling the Load() method. The digests returned by the Digest() methods are
262+
// incorporated into the component's digest.
263+
type Reference[T Component] interface {
264+
// Load the referenced content. The framework calls this at most once. So it is ok if implementation
265+
// errors out or even panics if invoked more than once. The implementation may skip loading in certain cases,
266+
// for example if deletion is ongoing.
267+
Load(ctx context.Context, clnt client.Client, component T) error
268+
// Return a digest of the referenced content. This digest is incorporated into the component digest which
269+
// is passed to generators and hooks (per context) and which decides when the processing timer is reset,
270+
// and therefore influences the timeout behavior of the compoment. In case the reference is not loaded,
271+
// the implementation should return the empty string.
272+
Digest() string
273+
}
274+
256275
func resolveReferences[T Component](ctx context.Context, clnt client.Client, component T) (string, error) {
257276
digestData := make(map[string]any)
258277
spec := getSpec(component)
259278
digestData["generation"] = component.GetGeneration()
260279
digestData["annotations"] = component.GetAnnotations()
280+
// TODO: including spec into the digest is actually not required (since generation is included)
261281
digestData["spec"] = spec
262282
if err := walk.Walk(spec, func(x any, path []string, tag reflect.StructTag) error {
263283
// note: this must() is ok because marshalling []string should always work
264284
rawPath := must(json.Marshal(path))
265-
// TODO: allow arbitrary loadable types (with an interface LoadableReference or similar)
266285
switch r := x.(type) {
267286
case *ConfigMapReference:
268287
if r == nil {
@@ -308,6 +327,14 @@ func resolveReferences[T Component](ctx context.Context, clnt client.Client, com
308327
return err
309328
}
310329
digestData["refs:"+string(rawPath)] = r.digest()
330+
case Reference[T]:
331+
if v := reflect.ValueOf(r); r == nil || v.Kind() == reflect.Pointer && v.IsNil() {
332+
return nil
333+
}
334+
if err := r.Load(ctx, clnt, component); err != nil {
335+
return err
336+
}
337+
digestData["refs:"+string(rawPath)] = r.Digest()
311338
}
312339
return nil
313340
}); err != nil {

pkg/component/target.go

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -35,7 +35,7 @@ func newReconcileTarget[T Component](reconcilerName string, reconcilerId string,
3535
}
3636
}
3737

38-
func (t *reconcileTarget[T]) Apply(ctx context.Context, component T, componentDigest string) (bool, string, error) {
38+
func (t *reconcileTarget[T]) Apply(ctx context.Context, component T, componentDigest string) (bool, error) {
3939
//log := log.FromContext(ctx)
4040
namespace := ""
4141
name := ""
@@ -63,12 +63,12 @@ func (t *reconcileTarget[T]) Apply(ctx context.Context, component T, componentDi
6363
WithComponentDigest(componentDigest)
6464
objects, err := t.resourceGenerator.Generate(generateCtx, namespace, name, component.GetSpec())
6565
if err != nil {
66-
return false, "", errors.Wrap(err, "error rendering manifests")
66+
return false, errors.Wrap(err, "error rendering manifests")
6767
}
6868

69-
ok, err := t.reconciler.Apply(ctx, &status.Inventory, objects, namespace, ownerId, component.GetGeneration())
69+
ok, err := t.reconciler.Apply(ctx, &status.Inventory, objects, namespace, ownerId, componentDigest)
7070

71-
return ok, calculateDigest(componentDigest, objects), err
71+
return ok, err
7272
}
7373

7474
func (t *reconcileTarget[T]) Delete(ctx context.Context, component T) (bool, error) {

pkg/reconciler/reconciler.go

Lines changed: 6 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -222,7 +222,7 @@ func NewReconciler(name string, clnt cluster.Client, options ReconcilerOptions)
222222
// Objects which are instances of namespaced types will be placed into the namespace passed to Apply(), if they have no namespace defined in their manifest.
223223
// An update of an existing object will be performed if it is considered to be out of sync; that means:
224224
// - the object's manifest has changed, and the effective reconcile policy is ReconcilePolicyOnObjectChange or ReconcilePolicyOnObjectOrComponentChange or
225-
// - the specified component revision has changed and the effective reconcile policy is ReconcilePolicyOnObjectOrComponentChange or
225+
// - the specified component has changed and the effective reconcile policy is ReconcilePolicyOnObjectOrComponentChange or
226226
// - periodically after forceReapplyPeriod.
227227
//
228228
// The update itself will be done as follows:
@@ -242,10 +242,12 @@ func NewReconciler(name string, clnt cluster.Client, options ReconcilerOptions)
242242
// This method will change the passed inventory (add or remove elements, change elements). If Apply() returns true, then all objects are successfully reconciled;
243243
// otherwise, if it returns false, the caller should re-call it periodically, until it returns true. In any case, the passed inventory should match the state of the
244244
// inventory after the previous invocation of Apply(); usually, the caller saves the inventory after calling Apply(), and loads it before calling Apply().
245-
// The namespace and ownerId arguments should not be changed across subsequent invocations of Apply(); the componentRevision should be incremented only.
245+
// The namespace and ownerId arguments should not be changed across subsequent invocations of Apply(); the supplied componentDigest is included into the
246+
// digest of dependent objects if the effective reconcile policy is ReconcilePolicyOnObjectOrComponentChange (such that in this case, a change of componentDigest
247+
// triggers an immediate reconciliation of all dependent objects).
246248
//
247249
// Also note: it is absolutely crucial that this method returns (true, nil) immediately (on the first call) if everything is already in the right state.
248-
func (r *Reconciler) Apply(ctx context.Context, inventory *[]*InventoryItem, objects []client.Object, namespace string, ownerId string, componentRevision int64) (bool, error) {
250+
func (r *Reconciler) Apply(ctx context.Context, inventory *[]*InventoryItem, objects []client.Object, namespace string, ownerId string, componentDigest string) (bool, error) {
249251
var err error
250252
log := log.FromContext(ctx)
251253

@@ -417,7 +419,7 @@ func (r *Reconciler) Apply(ctx context.Context, inventory *[]*InventoryItem, obj
417419
// calculate object digest
418420
// note: if the effective reconcile policy of an object changes, it will always be reconciled at least one more time;
419421
// this is in particular the case if the policy changes from or to ReconcilePolicyOnce.
420-
digest, err := calculateObjectDigest(object, componentRevision, getReconcilePolicy(object))
422+
digest, err := calculateObjectDigest(object, componentDigest, getReconcilePolicy(object))
421423
if err != nil {
422424
return false, errors.Wrapf(err, "error calculating digest for object %s", types.ObjectKeyToString(object))
423425
}

pkg/reconciler/types.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -56,7 +56,7 @@ const (
5656
// Reconcile the dependent object if its manifest, as produced by the generator, changes.
5757
ReconcilePolicyOnObjectChange ReconcilePolicy = "OnObjectChange"
5858
// Reconcile the dependent object if its manifest, as produced by the generator, changes, or if the owning
59-
// component changes (identified by a change of its metadata.generation).
59+
// component changes (identified by a change of its digest, including references).
6060
ReconcilePolicyOnObjectOrComponentChange ReconcilePolicy = "OnObjectOrComponentChange"
6161
// Reconcile the dependent object only once; afterwards it will never be touched again by the reconciler.
6262
ReconcilePolicyOnce ReconcilePolicy = "Once"

pkg/reconciler/util.go

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -58,7 +58,7 @@ func checkRange(x int, min int, max int) error {
5858
return nil
5959
}
6060

61-
func calculateObjectDigest(obj client.Object, revision int64, reconcilePolicy ReconcilePolicy) (string, error) {
61+
func calculateObjectDigest(obj client.Object, componentDigest string, reconcilePolicy ReconcilePolicy) (string, error) {
6262
if reconcilePolicy == ReconcilePolicyOnce {
6363
return "__once__", nil
6464
}
@@ -79,7 +79,8 @@ func calculateObjectDigest(obj client.Object, revision int64, reconcilePolicy Re
7979
digest := sha256hex(raw)
8080

8181
if reconcilePolicy == ReconcilePolicyOnObjectOrComponentChange {
82-
digest = fmt.Sprintf("%s@%d", digest, revision)
82+
// TODO: this becomes rather long; should we hash it once more?
83+
digest = fmt.Sprintf("%s@%s", digest, componentDigest)
8384
}
8485

8586
return digest, nil

0 commit comments

Comments
 (0)