Skip to content

Commit

Permalink
[release-1.14] fix: do not build OIDC config unless enabled (#1351)
Browse files Browse the repository at this point in the history
* [release-1.14] fix: do not build OIDC config unless enabled (knative-extensions#4021) (knative-extensions#4056)

* fix: do not build OIDC config unless enabled (knative-extensions#4021)

* fix: do not build OIDC config unless enabled

Signed-off-by: Calum Murray <[email protected]>

* feat: receiver redeploys verticles when oidc feature changes

Signed-off-by: Calum Murray <[email protected]>

* feat: the control plane ensures receiver restarts

When config-features changes, the control plane
sets a annotation on the receiver pods so that
the configmap update is reconciled by k8s

Signed-off-by: Calum Murray <[email protected]>

* mvn spotless:apply

Signed-off-by: Calum Murray <[email protected]>

* cleanup: goimports

Signed-off-by: Calum Murray <[email protected]>

* fix: do not re-deploy verticles

Signed-off-by: Calum Murray <[email protected]>

* fix: features config paths are now correct

Signed-off-by: Calum Murray <[email protected]>

* fix java unit tests

Signed-off-by: Calum Murray <[email protected]>

* mvn spotless:apply

Signed-off-by: Calum Murray <[email protected]>

* address review comments

Signed-off-by: Calum Murray <[email protected]>

---------

Signed-off-by: Calum Murray <[email protected]>

* fix: compilation errors

Signed-off-by: Calum Murray <[email protected]>

* goimports

Signed-off-by: Calum Murray <[email protected]>

---------

Signed-off-by: Calum Murray <[email protected]>

* usage of CopyOnWriteArrayList/AtomicRef

Signed-off-by: Matthias Wessendorf <[email protected]>

* using concurrent hash map and atomic integer for ids

Signed-off-by: Matthias Wessendorf <[email protected]>

---------

Signed-off-by: Calum Murray <[email protected]>
Signed-off-by: Matthias Wessendorf <[email protected]>
Co-authored-by: Calum Murray <[email protected]>
  • Loading branch information
matzew and Cali0707 authored Nov 7, 2024
1 parent 6b5eeca commit af5fd33
Show file tree
Hide file tree
Showing 20 changed files with 221 additions and 91 deletions.
35 changes: 23 additions & 12 deletions control-plane/pkg/reconciler/base/reconciler.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ package base
import (
"context"
"fmt"
"strconv"
"time"

"go.uber.org/zap"
"google.golang.org/protobuf/encoding/protojson"
Expand Down Expand Up @@ -67,6 +67,9 @@ const (
// volume generation annotation data plane pods.
VolumeGenerationAnnotationKey = "volumeGeneration"

// config features update time annotation for data plane pods.
ConfigFeaturesUpdatedAnnotationKey = "configFeaturesUpdatedAt"

Protobuf = "protobuf"
Json = "json"
)
Expand Down Expand Up @@ -281,23 +284,31 @@ func (r *Reconciler) UpdateDataPlaneConfigMap(ctx context.Context, contract *con
return nil
}

func (r *Reconciler) UpdateDispatcherPodsAnnotation(ctx context.Context, logger *zap.Logger, volumeGeneration uint64) error {
func (r *Reconciler) UpdateDispatcherPodsContractGenerationAnnotation(ctx context.Context, logger *zap.Logger, volumeGeneration uint64) error {
pods, errors := r.PodLister.Pods(r.DataPlaneNamespace).List(r.dispatcherSelector())
if errors != nil {
return fmt.Errorf("failed to list dispatcher pods in namespace %s: %w", r.DataPlaneNamespace, errors)
}
return r.UpdatePodsAnnotation(ctx, logger, "dispatcher", volumeGeneration, pods)
return r.UpdatePodsAnnotation(ctx, logger, "dispatcher", VolumeGenerationAnnotationKey, fmt.Sprint(volumeGeneration), pods)
}

func (r *Reconciler) UpdateReceiverPodsAnnotation(ctx context.Context, logger *zap.Logger, volumeGeneration uint64) error {
func (r *Reconciler) UpdateReceiverPodsContractGenerationAnnotation(ctx context.Context, logger *zap.Logger, volumeGeneration uint64) error {
pods, errors := r.PodLister.Pods(r.DataPlaneNamespace).List(r.ReceiverSelector())
if errors != nil {
return fmt.Errorf("failed to list receiver pods in namespace %s: %w", r.DataPlaneNamespace, errors)
}
return r.UpdatePodsAnnotation(ctx, logger, "receiver", volumeGeneration, pods)
return r.UpdatePodsAnnotation(ctx, logger, "receiver", VolumeGenerationAnnotationKey, fmt.Sprint(volumeGeneration), pods)
}

func (r *Reconciler) UpdateReceiverConfigFeaturesUpdatedAnnotation(ctx context.Context, logger *zap.Logger) error {
pods, err := r.PodLister.Pods(r.DataPlaneNamespace).List(r.ReceiverSelector())
if err != nil {
return fmt.Errorf("failed to list receiver pods in namespace %s: %s", r.DataPlaneNamespace, err)
}
return r.UpdatePodsAnnotation(ctx, logger, "receiver", ConfigFeaturesUpdatedAnnotationKey, time.Now().String(), pods)
}

func (r *Reconciler) UpdatePodsAnnotation(ctx context.Context, logger *zap.Logger, component string, volumeGeneration uint64, pods []*corev1.Pod) error {
func (r *Reconciler) UpdatePodsAnnotation(ctx context.Context, logger *zap.Logger, component, annotationKey, annotationValue string, pods []*corev1.Pod) error {

var errors error

Expand All @@ -306,7 +317,7 @@ func (r *Reconciler) UpdatePodsAnnotation(ctx context.Context, logger *zap.Logge
logger.Debug(
"Update "+component+" pod annotation",
zap.String("pod", fmt.Sprintf("%s/%s", pod.Namespace, pod.Name)),
zap.Uint64("volumeGeneration", volumeGeneration),
zap.String(annotationKey, annotationValue),
)

// do not update cache copy
Expand All @@ -318,15 +329,15 @@ func (r *Reconciler) UpdatePodsAnnotation(ctx context.Context, logger *zap.Logge
}

// Check whether pod's annotation is the expected one.
if v, ok := annotations[VolumeGenerationAnnotationKey]; ok {
v, err := strconv.ParseUint(v /* base */, 10 /* bitSize */, 64)
if err == nil && v == volumeGeneration {
// Volume generation already matches the expected volume generation number.
if v, ok := annotations[annotationKey]; ok {
if v == annotationValue {
logger.Debug(component + " pod annotation already up to date")
// annotation is already correct.
continue
}
}

annotations[VolumeGenerationAnnotationKey] = fmt.Sprint(volumeGeneration)
annotations[annotationKey] = annotationValue
pod.SetAnnotations(annotations)

if _, err := r.KubeClient.CoreV1().Pods(pod.Namespace).Update(ctx, pod, metav1.UpdateOptions{}); err != nil {
Expand Down
4 changes: 2 additions & 2 deletions control-plane/pkg/reconciler/base/reconciler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -230,7 +230,7 @@ func TestUpdateReceiverPodAnnotation(t *testing.T) {
ReceiverLabel: base.SinkReceiverLabel,
}

err := r.UpdateReceiverPodsAnnotation(ctx, logging.FromContext(ctx).Desugar(), 1)
err := r.UpdateReceiverPodsContractGenerationAnnotation(ctx, logging.FromContext(ctx).Desugar(), 1)
require.Nil(t, err)
}

Expand All @@ -247,7 +247,7 @@ func TestUpdateDispatcherPodAnnotation(t *testing.T) {
DispatcherLabel: label,
}

err := r.UpdateDispatcherPodsAnnotation(ctx, logging.FromContext(ctx).Desugar(), 1)
err := r.UpdateDispatcherPodsContractGenerationAnnotation(ctx, logging.FromContext(ctx).Desugar(), 1)
require.Nil(t, err)
}

Expand Down
8 changes: 4 additions & 4 deletions control-plane/pkg/reconciler/broker/broker.go
Original file line number Diff line number Diff line change
Expand Up @@ -207,7 +207,7 @@ func (r *Reconciler) reconcileKind(ctx context.Context, broker *eventing.Broker)
// the update even if here eventually means seconds or minutes after the actual update.

// Update volume generation annotation of receiver pods
if err := r.UpdateReceiverPodsAnnotation(ctx, logger, ct.Generation); err != nil {
if err := r.UpdateReceiverPodsContractGenerationAnnotation(ctx, logger, ct.Generation); err != nil {
logger.Error("Failed to update receiver pod annotation", zap.Error(
statusConditionManager.FailedToUpdateReceiverPodsAnnotation(err),
))
Expand All @@ -217,7 +217,7 @@ func (r *Reconciler) reconcileKind(ctx context.Context, broker *eventing.Broker)
logger.Debug("Updated receiver pod annotation")

// Update volume generation annotation of dispatcher pods
if err := r.UpdateDispatcherPodsAnnotation(ctx, logger, ct.Generation); err != nil {
if err := r.UpdateDispatcherPodsContractGenerationAnnotation(ctx, logger, ct.Generation); err != nil {
// Failing to update dispatcher pods annotation leads to config map refresh delayed by several seconds.
// Since the dispatcher side is the consumer side, we don't lose availability, and we can consider the Broker
// ready. So, log out the error and move on to the next step.
Expand Down Expand Up @@ -496,11 +496,11 @@ func (r *Reconciler) deleteResourceFromContractConfigMap(ctx context.Context, lo
// Note: if there aren't changes to be done at the pod annotation level, we just skip the update.

// Update volume generation annotation of receiver pods
if err := r.UpdateReceiverPodsAnnotation(ctx, logger, ct.Generation); err != nil {
if err := r.UpdateReceiverPodsContractGenerationAnnotation(ctx, logger, ct.Generation); err != nil {
return err
}
// Update volume generation annotation of dispatcher pods
if err := r.UpdateDispatcherPodsAnnotation(ctx, logger, ct.Generation); err != nil {
if err := r.UpdateDispatcherPodsContractGenerationAnnotation(ctx, logger, ct.Generation); err != nil {
return err
}

Expand Down
4 changes: 4 additions & 0 deletions control-plane/pkg/reconciler/broker/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -106,6 +106,10 @@ func NewController(ctx context.Context, watcher configmap.Watcher, env *config.E
if globalResync != nil {
globalResync(obj)
}
err = reconciler.UpdateReceiverConfigFeaturesUpdatedAnnotation(ctx, logger.Desugar())
if err != nil {
logger.Warn("config-features updated, but the receiver pods were not successfully annotated. This may lead to features not working as expected.", zap.Error(err))
}
})
featureStore.WatchConfigs(watcher)

Expand Down
8 changes: 4 additions & 4 deletions control-plane/pkg/reconciler/channel/channel.go
Original file line number Diff line number Diff line change
Expand Up @@ -261,7 +261,7 @@ func (r *Reconciler) reconcileKind(ctx context.Context, channel *messagingv1beta
// the update even if here eventually means seconds or minutes after the actual update.

// Update volume generation annotation of receiver pods
if err := r.UpdateReceiverPodsAnnotation(ctx, logger, ct.Generation); err != nil {
if err := r.UpdateReceiverPodsContractGenerationAnnotation(ctx, logger, ct.Generation); err != nil {
logger.Error("Failed to update receiver pod annotation", zap.Error(
statusConditionManager.FailedToUpdateReceiverPodsAnnotation(err),
))
Expand All @@ -270,7 +270,7 @@ func (r *Reconciler) reconcileKind(ctx context.Context, channel *messagingv1beta
logger.Debug("Updated receiver pod annotation")

// Update volume generation annotation of dispatcher pods
if err := r.UpdateDispatcherPodsAnnotation(ctx, logger, ct.Generation); err != nil {
if err := r.UpdateDispatcherPodsContractGenerationAnnotation(ctx, logger, ct.Generation); err != nil {
// Failing to update dispatcher pods annotation leads to config map refresh delayed by several seconds.
// Since the dispatcher side is the consumer side, we don't lose availability, and we can consider the Channel
// ready. So, log out the error and move on to the next step.
Expand Down Expand Up @@ -407,11 +407,11 @@ func (r *Reconciler) finalizeKind(ctx context.Context, channel *messagingv1beta1
// Note: if there aren't changes to be done at the pod annotation level, we just skip the update.

// Update volume generation annotation of receiver pods
if err := r.UpdateReceiverPodsAnnotation(ctx, logger, ct.Generation); err != nil {
if err := r.UpdateReceiverPodsContractGenerationAnnotation(ctx, logger, ct.Generation); err != nil {
return err
}
// Update volume generation annotation of dispatcher pods
if err := r.UpdateDispatcherPodsAnnotation(ctx, logger, ct.Generation); err != nil {
if err := r.UpdateDispatcherPodsContractGenerationAnnotation(ctx, logger, ct.Generation); err != nil {
return err
}

Expand Down
20 changes: 12 additions & 8 deletions control-plane/pkg/reconciler/channel/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -94,14 +94,6 @@ func NewController(ctx context.Context, watcher configmap.Watcher, configs *conf

logger := logging.FromContext(ctx)

var globalResync func(obj interface{})
featureStore := feature.NewStore(logging.FromContext(ctx).Named("feature-config-store"), func(name string, value interface{}) {
if globalResync != nil {
globalResync(nil)
}
})
featureStore.WatchConfigs(watcher)

_, err := reconciler.GetOrCreateDataPlaneConfigMap(ctx)
if err != nil {
logger.Fatal("Failed to get or create data plane config map",
Expand All @@ -110,6 +102,18 @@ func NewController(ctx context.Context, watcher configmap.Watcher, configs *conf
)
}

var globalResync func(obj interface{})
featureStore := feature.NewStore(logger.Named("feature-config-store"), func(name string, value interface{}) {
if globalResync != nil {
globalResync(nil)
}
err = reconciler.UpdateReceiverConfigFeaturesUpdatedAnnotation(ctx, logger.Desugar())
if err != nil {
logger.Warn("config-features updated, but the receiver pods were not successfully annotated. This may lead to features not working as expected.", zap.Error(err))
}
})
featureStore.WatchConfigs(watcher)

impl := kafkachannelreconciler.NewImpl(ctx, reconciler,
func(impl *controller.Impl) controller.Options {
return controller.Options{
Expand Down
4 changes: 2 additions & 2 deletions control-plane/pkg/reconciler/channel/v2/channelv2.go
Original file line number Diff line number Diff line change
Expand Up @@ -253,7 +253,7 @@ func (r *Reconciler) ReconcileKind(ctx context.Context, channel *messagingv1beta
// the update even if here eventually means seconds or minutes after the actual update.

// Update volume generation annotation of receiver pods
if err := r.UpdateReceiverPodsAnnotation(ctx, logger, ct.Generation); err != nil {
if err := r.UpdateReceiverPodsContractGenerationAnnotation(ctx, logger, ct.Generation); err != nil {
logger.Error("Failed to update receiver pod annotation", zap.Error(
statusConditionManager.FailedToUpdateReceiverPodsAnnotation(err),
))
Expand Down Expand Up @@ -407,7 +407,7 @@ func (r *Reconciler) FinalizeKind(ctx context.Context, channel *messagingv1beta1
// Note: if there aren't changes to be done at the pod annotation level, we just skip the update.

// Update volume generation annotation of receiver pods
if err := r.UpdateReceiverPodsAnnotation(ctx, logger, ct.Generation); err != nil {
if err := r.UpdateReceiverPodsContractGenerationAnnotation(ctx, logger, ct.Generation); err != nil {
return err
}

Expand Down
2 changes: 1 addition & 1 deletion control-plane/pkg/reconciler/consumer/consumer.go
Original file line number Diff line number Diff line change
Expand Up @@ -470,7 +470,7 @@ func (r *Reconciler) schedule(ctx context.Context, logger *zap.Logger, c *kafkai
return false, err
}

return true, b.UpdatePodsAnnotation(ctx, logger, "dispatcher" /* component, for logging */, ct.Generation, []*corev1.Pod{p})
return true, b.UpdatePodsAnnotation(ctx, logger, "dispatcher" /* component, for logging */, base.VolumeGenerationAnnotationKey, fmt.Sprint(ct.Generation), []*corev1.Pod{p})
}

func (r *Reconciler) commonReconciler(p *corev1.Pod, cmName string) base.Reconciler {
Expand Down
4 changes: 4 additions & 0 deletions control-plane/pkg/reconciler/sink/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -90,6 +90,10 @@ func NewController(ctx context.Context, watcher configmap.Watcher, configs *conf
if globalResync != nil {
globalResync(obj)
}
err = reconciler.UpdateReceiverConfigFeaturesUpdatedAnnotation(ctx, logger.Desugar())
if err != nil {
logger.Warn("config-features updated, but the receiver pods were not successfully annotated. This may lead to features not working as expected.", zap.Error(err))
}
})
featureStore.WatchConfigs(watcher)

Expand Down
4 changes: 2 additions & 2 deletions control-plane/pkg/reconciler/sink/kafka_sink.go
Original file line number Diff line number Diff line change
Expand Up @@ -237,7 +237,7 @@ func (r *Reconciler) reconcileKind(ctx context.Context, ks *eventing.KafkaSink)
// receivers haven't got the Sink, so update failures to receiver pods is a hard failure.

// Update volume generation annotation of receiver pods
if err := r.UpdateReceiverPodsAnnotation(ctx, logger, ct.Generation); err != nil {
if err := r.UpdateReceiverPodsContractGenerationAnnotation(ctx, logger, ct.Generation); err != nil {
return err
}

Expand Down Expand Up @@ -363,7 +363,7 @@ func (r *Reconciler) finalizeKind(ctx context.Context, ks *eventing.KafkaSink) e
// Note: if there aren't changes to be done at the pod annotation level, we just skip the update.

// Update volume generation annotation of receiver pods
if err := r.UpdateReceiverPodsAnnotation(ctx, logger, ct.Generation); err != nil {
if err := r.UpdateReceiverPodsContractGenerationAnnotation(ctx, logger, ct.Generation); err != nil {
return err
}

Expand Down
4 changes: 2 additions & 2 deletions control-plane/pkg/reconciler/trigger/trigger.go
Original file line number Diff line number Diff line change
Expand Up @@ -198,7 +198,7 @@ func (r *Reconciler) reconcileKind(ctx context.Context, trigger *eventing.Trigge
}

// Update volume generation annotation of dispatcher pods
if err := r.UpdateDispatcherPodsAnnotation(ctx, logger, ct.Generation); err != nil {
if err := r.UpdateDispatcherPodsContractGenerationAnnotation(ctx, logger, ct.Generation); err != nil {
// Failing to update dispatcher pods annotation leads to config map refresh delayed by several seconds.
// Since the dispatcher side is the consumer side, we don't lose availability, and we can consider the Trigger
// ready. So, log out the error and move on to the next step.
Expand Down Expand Up @@ -289,7 +289,7 @@ func (r *Reconciler) finalizeKind(ctx context.Context, trigger *eventing.Trigger
logger.Debug("Updated data plane config map", zap.String("configmap", r.Env.DataPlaneConfigMapAsString()))

// Update volume generation annotation of dispatcher pods
if err := r.UpdateDispatcherPodsAnnotation(ctx, logger, ct.Generation); err != nil {
if err := r.UpdateDispatcherPodsContractGenerationAnnotation(ctx, logger, ct.Generation); err != nil {
// Failing to update dispatcher pods annotation leads to config map refresh delayed by several seconds.
// The delete trigger will eventually be seen by the data plane pods, so log out the error and move on to the
// next step.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,7 @@ public class FileWatcher implements AutoCloseable {
/**
* All args constructor.
*
* @param contractConsumer updates receiver.
* @param triggerFunction is triggered whenever there is a file change.
* @param file file to watch
*/
public FileWatcher(File file, Runnable triggerFunction) {
Expand Down
Loading

0 comments on commit af5fd33

Please sign in to comment.