Skip to content

Commit

Permalink
Create configmap for starting dispatcher pods (knative-extensions#4027)…
Browse files Browse the repository at this point in the history
… (#1299)

Signed-off-by: Pierangelo Di Pilato <[email protected]>
  • Loading branch information
pierDipi authored Oct 1, 2024
1 parent 4146637 commit 1e7271f
Show file tree
Hide file tree
Showing 10 changed files with 159 additions and 16 deletions.
3 changes: 2 additions & 1 deletion control-plane/cmd/kafka-controller/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ import (
"knative.dev/eventing/pkg/auth"
"knative.dev/eventing/pkg/eventingtls"

"knative.dev/eventing-kafka-broker/control-plane/pkg/apis/internals/kafka/eventing"
"knative.dev/eventing-kafka-broker/control-plane/pkg/config"
"knative.dev/eventing-kafka-broker/control-plane/pkg/kafka/clientpool"
"knative.dev/eventing-kafka-broker/control-plane/pkg/reconciler/broker"
Expand Down Expand Up @@ -70,7 +71,7 @@ func main() {
ctx = filteredFactory.WithSelectors(ctx,
eventingtls.TrustBundleLabelSelector,
auth.OIDCLabelSelector,
"app.kubernetes.io/kind=kafka-dispatcher",
eventing.DispatcherLabelSelectorStr,
)

if v := os.Getenv("ENABLE_SARAMA_LOGGER"); strings.EqualFold(v, "true") {
Expand Down
5 changes: 5 additions & 0 deletions control-plane/pkg/apis/internals/kafka/eventing/data_plane.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,11 @@ const (
ConfigMapVolumeName = "kafka-resources"

DispatcherVolumeName = "contract-resources"

DataPlanePodKindLabelKey = "app.kubernetes.io/kind"
DispatcherPodKindLabelValue = "kafka-dispatcher"

DispatcherLabelSelectorStr = DataPlanePodKindLabelKey + "=" + DispatcherPodKindLabelValue
)

func ConfigMapNameFromPod(p *corev1.Pod) (string, error) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,8 @@
package v1alpha1

import (
"strings"

metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/runtime/schema"
Expand Down Expand Up @@ -75,3 +77,16 @@ func IsKnownStatefulSet(name string) bool {
name == ChannelStatefulSetName ||
name == BrokerStatefulSetName
}

func GetOwnerKindFromStatefulSetPrefix(name string) (string, bool) {
if strings.HasPrefix(name, SourceStatefulSetName) {
return "KafkaSource", true
}
if strings.HasPrefix(name, ChannelStatefulSetName) {
return "KafkaChannel", true
}
if strings.HasPrefix(name, BrokerStatefulSetName) {
return "Trigger", true
}
return "", false
}

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

43 changes: 43 additions & 0 deletions control-plane/pkg/reconciler/consumergroup/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,13 +27,15 @@ import (
"go.uber.org/multierr"
"go.uber.org/zap"
appsv1 "k8s.io/api/apps/v1"
corev1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/labels"
"k8s.io/apimachinery/pkg/types"
"k8s.io/apiserver/pkg/storage/names"
corelisters "k8s.io/client-go/listers/core/v1"
"k8s.io/client-go/tools/cache"

"knative.dev/eventing-kafka-broker/control-plane/pkg/apis/internals/kafka/eventing"
"knative.dev/eventing-kafka-broker/control-plane/pkg/kafka/clientpool"
"knative.dev/eventing-kafka-broker/control-plane/pkg/kafka/offset"
"knative.dev/eventing-kafka-broker/control-plane/pkg/prober"
Expand Down Expand Up @@ -203,6 +205,47 @@ func NewController(ctx context.Context, watcher configmap.Watcher) *controller.I

ResyncOnStatefulSetChange(ctx, globalResync)

dispatcherPodInformer.Informer().AddEventHandler(controller.HandleAll(func(obj interface{}) {
pod, ok := obj.(*corev1.Pod)
if !ok {
return
}

kind, ok := kafkainternals.GetOwnerKindFromStatefulSetPrefix(pod.Name)
if !ok {
return
}

cmName, err := eventing.ConfigMapNameFromPod(pod)
if err != nil {
logger.Warnw("Failed to get ConfigMap name from pod", zap.String("pod", pod.Name), zap.Error(err))
return
}
if err := r.ensureContractConfigMapExists(ctx, pod, cmName); err != nil {
logger.Warnw("Failed to ensure ConfigMap for pod exists", zap.String("pod", pod.Name), zap.String("configmap", cmName), zap.Error(err))
return
}

impl.FilteredGlobalResync(
func(obj interface{}) bool {
cg, ok := obj.(*kafkainternals.ConsumerGroup)
if !ok {
return false
}

uf := cg.GetUserFacingResourceRef()
if uf == nil {
return false
}
if strings.EqualFold(kind, uf.Kind) {
return true
}
return false
},
consumerGroupInformer.Informer(),
)
}))

//Todo: ScaledObject informer when KEDA is installed

return impl
Expand Down
16 changes: 13 additions & 3 deletions control-plane/pkg/reconciler/consumergroup/controller_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,24 +17,30 @@
package consumergroup

import (
"context"
"testing"

"github.com/stretchr/testify/require"
corev1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/types"
_ "knative.dev/eventing-kafka-broker/control-plane/pkg/client/injection/informers/sources/v1beta1/kafkasource/fake"
"knative.dev/eventing-kafka-broker/control-plane/pkg/kafka/clientpool"
_ "knative.dev/pkg/client/injection/ducks/duck/v1/addressable/fake"
kubeclient "knative.dev/pkg/client/injection/kube/client/fake"
_ "knative.dev/pkg/client/injection/kube/informers/apps/v1/statefulset/fake"
_ "knative.dev/pkg/client/injection/kube/informers/core/v1/configmap/fake"
_ "knative.dev/pkg/client/injection/kube/informers/core/v1/node/fake"
_ "knative.dev/pkg/client/injection/kube/informers/core/v1/pod/fake"
_ "knative.dev/pkg/client/injection/kube/informers/core/v1/pod/filtered/fake"
_ "knative.dev/pkg/client/injection/kube/informers/core/v1/secret/fake"
filteredFactory "knative.dev/pkg/client/injection/kube/informers/factory/filtered"
_ "knative.dev/pkg/client/injection/kube/informers/factory/filtered/fake"
"knative.dev/pkg/configmap"
reconcilertesting "knative.dev/pkg/reconciler/testing"

"knative.dev/eventing-kafka-broker/control-plane/pkg/apis/internals/kafka/eventing"
_ "knative.dev/eventing-kafka-broker/control-plane/pkg/client/injection/informers/sources/v1beta1/kafkasource/fake"
"knative.dev/eventing-kafka-broker/control-plane/pkg/kafka/clientpool"

kafkainternals "knative.dev/eventing-kafka-broker/control-plane/pkg/apis/internals/kafka/eventing/v1alpha1"
_ "knative.dev/eventing-kafka-broker/control-plane/pkg/client/internals/kafka/injection/informers/eventing/v1alpha1/consumer/fake"
_ "knative.dev/eventing-kafka-broker/control-plane/pkg/client/internals/kafka/injection/informers/eventing/v1alpha1/consumergroup/fake"
Expand All @@ -55,7 +61,11 @@ const (
)

func TestNewController(t *testing.T) {
ctx, _ := reconcilertesting.SetupFakeContext(t)
ctx, _ := reconcilertesting.SetupFakeContext(t, func(ctx context.Context) context.Context {
return filteredFactory.WithSelectors(ctx,
eventing.DispatcherLabelSelectorStr,
)
})
ctx, _ = kedaclient.With(ctx)

t.Setenv("SYSTEM_NAMESPACE", systemNamespace)
Expand Down
25 changes: 17 additions & 8 deletions control-plane/pkg/reconciler/consumergroup/evictor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
package consumergroup

import (
"context"
"testing"

"github.com/stretchr/testify/require"
Expand All @@ -26,21 +27,23 @@ import (
"k8s.io/utils/pointer"
eventingduckv1alpha1 "knative.dev/eventing/pkg/apis/duck/v1alpha1"
kubeclient "knative.dev/pkg/client/injection/kube/client/fake"
filteredFactory "knative.dev/pkg/client/injection/kube/informers/factory/filtered"
reconcilertesting "knative.dev/pkg/reconciler/testing"

"knative.dev/eventing-kafka-broker/control-plane/pkg/apis/internals/kafka/eventing"
kafkainternals "knative.dev/eventing-kafka-broker/control-plane/pkg/apis/internals/kafka/eventing/v1alpha1"
_ "knative.dev/eventing-kafka-broker/control-plane/pkg/client/internals/kafka/injection/client/fake"
kafkainternalsclient "knative.dev/eventing-kafka-broker/control-plane/pkg/client/internals/kafka/injection/client/fake"
)

func TestNewEvictor(t *testing.T) {
ctx, _ := reconcilertesting.SetupFakeContext(t)
ctx, _ := reconcilertesting.SetupFakeContext(t, withFilteredSelectors)

require.NotPanics(t, func() { newEvictor(ctx, zap.String("k", "n")) })
}

func TestEvictorNilPodNoPanic(t *testing.T) {
ctx, _ := reconcilertesting.SetupFakeContext(t)
ctx, _ := reconcilertesting.SetupFakeContext(t, withFilteredSelectors)

var pod *corev1.Pod

Expand Down Expand Up @@ -74,7 +77,7 @@ func TestEvictorNilPodNoPanic(t *testing.T) {
}

func TestEvictorEvictSuccess(t *testing.T) {
ctx, _ := reconcilertesting.SetupFakeContext(t)
ctx, _ := reconcilertesting.SetupFakeContext(t, withFilteredSelectors)

pod := &corev1.Pod{
ObjectMeta: metav1.ObjectMeta{Namespace: "ns", Name: "name"},
Expand Down Expand Up @@ -111,7 +114,7 @@ func TestEvictorEvictSuccess(t *testing.T) {
}

func TestEvictorNoEvictionEmptyPlacement(t *testing.T) {
ctx, _ := reconcilertesting.SetupFakeContext(t)
ctx, _ := reconcilertesting.SetupFakeContext(t, withFilteredSelectors)

pod := &corev1.Pod{
ObjectMeta: metav1.ObjectMeta{Namespace: "ns", Name: "name"},
Expand Down Expand Up @@ -145,7 +148,7 @@ func TestEvictorNoEvictionEmptyPlacement(t *testing.T) {
}

func TestEvictorNoEviction(t *testing.T) {
ctx, _ := reconcilertesting.SetupFakeContext(t)
ctx, _ := reconcilertesting.SetupFakeContext(t, withFilteredSelectors)

pod := &corev1.Pod{
ObjectMeta: metav1.ObjectMeta{Namespace: "ns", Name: "name"},
Expand Down Expand Up @@ -181,7 +184,7 @@ func TestEvictorNoEviction(t *testing.T) {
}

func TestEvictorEvictSuccessConsumerGroupSchedulingInProgress(t *testing.T) {
ctx, _ := reconcilertesting.SetupFakeContext(t)
ctx, _ := reconcilertesting.SetupFakeContext(t, withFilteredSelectors)

pod := &corev1.Pod{
ObjectMeta: metav1.ObjectMeta{Namespace: "ns", Name: "name"},
Expand Down Expand Up @@ -212,7 +215,7 @@ func TestEvictorEvictSuccessConsumerGroupSchedulingInProgress(t *testing.T) {
}

func TestEvictorEvictPodNotFound(t *testing.T) {
ctx, _ := reconcilertesting.SetupFakeContext(t)
ctx, _ := reconcilertesting.SetupFakeContext(t, withFilteredSelectors)

pod := &corev1.Pod{
ObjectMeta: metav1.ObjectMeta{Namespace: "ns", Name: "name"},
Expand All @@ -238,7 +241,7 @@ func TestEvictorEvictPodNotFound(t *testing.T) {
require.Nil(t, err)
}
func TestEvictorEvictConsumerGroupNotFound(t *testing.T) {
ctx, _ := reconcilertesting.SetupFakeContext(t)
ctx, _ := reconcilertesting.SetupFakeContext(t, withFilteredSelectors)

pod := &corev1.Pod{
ObjectMeta: metav1.ObjectMeta{Namespace: "ns", Name: "name"},
Expand All @@ -263,3 +266,9 @@ func TestEvictorEvictConsumerGroupNotFound(t *testing.T) {

require.Nil(t, err)
}

func withFilteredSelectors(ctx context.Context) context.Context {
return filteredFactory.WithSelectors(ctx,
eventing.DispatcherLabelSelectorStr,
)
}
1 change: 1 addition & 0 deletions openshift/ci-operator/build-image/Dockerfile
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ RUN wget https://raw.githubusercontent.com/helm/helm/main/scripts/get-helm-3 &&
RUN ./get-helm-3 --version v3.11.3 --no-sudo && helm version

RUN GOFLAGS='' go install github.com/mikefarah/yq/v3@latest
RUN GOFLAGS='' go install -tags="exclude_graphdriver_btrfs containers_image_openpgp" github.com/containers/skopeo/cmd/[email protected]

# go install creates $GOPATH/.cache with root permissions, we delete it here
# to avoid permission issues with the runtime users
Expand Down

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions vendor/modules.txt
Original file line number Diff line number Diff line change
Expand Up @@ -1527,6 +1527,7 @@ knative.dev/pkg/client/injection/kube/informers/core/v1/node/fake
knative.dev/pkg/client/injection/kube/informers/core/v1/pod
knative.dev/pkg/client/injection/kube/informers/core/v1/pod/fake
knative.dev/pkg/client/injection/kube/informers/core/v1/pod/filtered
knative.dev/pkg/client/injection/kube/informers/core/v1/pod/filtered/fake
knative.dev/pkg/client/injection/kube/informers/core/v1/secret
knative.dev/pkg/client/injection/kube/informers/core/v1/secret/fake
knative.dev/pkg/client/injection/kube/informers/core/v1/service
Expand Down

0 comments on commit 1e7271f

Please sign in to comment.