diff --git a/api/v1/receiver_types.go b/api/v1/receiver_types.go
index 9790345d4..9861faa32 100644
--- a/api/v1/receiver_types.go
+++ b/api/v1/receiver_types.go
@@ -67,6 +67,11 @@ type ReceiverSpec struct {
// +required
Resources []CrossNamespaceObjectReference `json:"resources"`
+ // ResourceFilter is an expression that is applied to each Resource
+ // referenced in the Resources. If the expression returns false then the
+ // Resource is discarded and will not be notified.
+ ResourceFilter string `json:"resourceFilter,omitempty"`
+
// SecretRef specifies the Secret containing the token used
// to validate the payload authenticity.
// +required
diff --git a/config/crd/bases/notification.toolkit.fluxcd.io_receivers.yaml b/config/crd/bases/notification.toolkit.fluxcd.io_receivers.yaml
index 239a1a81f..f77e7490c 100644
--- a/config/crd/bases/notification.toolkit.fluxcd.io_receivers.yaml
+++ b/config/crd/bases/notification.toolkit.fluxcd.io_receivers.yaml
@@ -62,6 +62,12 @@ spec:
Secret references.
pattern: ^([0-9]+(\.[0-9]+)?(ms|s|m|h))+$
type: string
+ resourceFilter:
+ description: |-
+ ResourceFilter is an expression that is applied to each Resource
+ referenced in the Resources. If the expression returns false then the
+ Resource is discarded and will not be notified.
+ type: string
resources:
description: A list of resources to be notified about changes.
items:
diff --git a/docs/api/v1/notification.md b/docs/api/v1/notification.md
index 325c17596..130ae5db6 100644
--- a/docs/api/v1/notification.md
+++ b/docs/api/v1/notification.md
@@ -122,6 +122,19 @@ e.g. ‘push’ for GitHub or ‘Push Hook’ for GitLab.
+resourceFilter
+
+string
+
+ |
+
+ ResourceFilter is an expression that is applied to each Resource
+referenced in the Resources. If the expression returns false then the
+Resource is discarded and will not be notified.
+ |
+
+
+
secretRef
@@ -321,6 +334,19 @@ e.g. ‘push’ for GitHub or ‘Push Hook’ for GitLab.
|
+resourceFilter
+
+string
+
+ |
+
+ ResourceFilter is an expression that is applied to each Resource
+referenced in the Resources. If the expression returns false then the
+Resource is discarded and will not be notified.
+ |
+
+
+
secretRef
diff --git a/go.mod b/go.mod
index 9c5ea45e6..c7e41168a 100644
--- a/go.mod
+++ b/go.mod
@@ -25,6 +25,7 @@ require (
github.com/fluxcd/pkg/ssa v0.41.1
github.com/getsentry/sentry-go v0.29.0
github.com/go-logr/logr v1.4.2
+ github.com/google/cel-go v0.20.1
github.com/google/go-github/v64 v64.0.0
github.com/hashicorp/go-retryablehttp v0.7.7
github.com/ktrysmt/go-bitbucket v0.9.80
@@ -74,6 +75,7 @@ require (
github.com/DataDog/zstd v1.5.2 // indirect
github.com/MakeNowJust/heredoc v1.0.0 // indirect
github.com/ProtonMail/go-crypto v1.0.0 // indirect
+ github.com/antlr4-go/antlr/v4 v4.13.0 // indirect
github.com/beorn7/perks v1.0.1 // indirect
github.com/blang/semver/v4 v4.0.0 // indirect
github.com/cespare/xxhash/v2 v2.3.0 // indirect
@@ -159,6 +161,7 @@ require (
github.com/russross/blackfriday/v2 v2.1.0 // indirect
github.com/santhosh-tekuri/jsonschema/v6 v6.0.1 // indirect
github.com/spf13/cobra v1.8.1 // indirect
+ github.com/stoewer/go-strcase v1.2.0 // indirect
github.com/x448/float16 v0.8.4 // indirect
github.com/xlab/treeprint v1.2.0 // indirect
go.opencensus.io v0.24.0 // indirect
diff --git a/go.sum b/go.sum
index a3fcfb21f..fb9392f87 100644
--- a/go.sum
+++ b/go.sum
@@ -73,6 +73,8 @@ github.com/PagerDuty/go-pagerduty v1.8.0 h1:MTFqTffIcAervB83U7Bx6HERzLbyaSPL/+ox
github.com/PagerDuty/go-pagerduty v1.8.0/go.mod h1:nzIeAqyFSJAFkjWKvMzug0JtwDg+V+UoCWjFrfFH5mI=
github.com/ProtonMail/go-crypto v1.0.0 h1:LRuvITjQWX+WIfr930YHG2HNfjR1uOfyf5vE0kC2U78=
github.com/ProtonMail/go-crypto v1.0.0/go.mod h1:EjAoLdwvbIOoOQr3ihjnSoLZRtE8azugULFRteWMNc0=
+github.com/antlr4-go/antlr/v4 v4.13.0 h1:lxCg3LAv+EUK6t1i0y1V6/SLeUi0eKEKdhQAlS8TVTI=
+github.com/antlr4-go/antlr/v4 v4.13.0/go.mod h1:pfChB/xh/Unjila75QW7+VU4TSnWnnk9UTnmpPaOR2g=
github.com/armon/go-socks5 v0.0.0-20160902184237-e75332964ef5 h1:0CwZNZbxp69SHPdPJAN/hZIm0C4OItdklCFmMRWYpio=
github.com/armon/go-socks5 v0.0.0-20160902184237-e75332964ef5/go.mod h1:wHh0iHkYZB8zMSxRWpUBQtwG5a7fFgvEO+odwuTv2gs=
github.com/beorn7/perks v1.0.1 h1:VlbKKnNfV8bJzeqoa4cOKqO6bYr3WgKZxO8Z16+hsOM=
@@ -215,6 +217,8 @@ github.com/golang/protobuf v1.5.4 h1:i7eJL8qZTpSEXOPTxNKhASYpMn+8e5Q6AdndVa1dWek
github.com/golang/protobuf v1.5.4/go.mod h1:lnTiLA8Wa4RWRcIUkrtSVa5nRhsEGBg48fD6rSs7xps=
github.com/google/btree v1.1.2 h1:xf4v41cLI2Z6FxbKm+8Bu+m8ifhj15JuZ9sa0jZCMUU=
github.com/google/btree v1.1.2/go.mod h1:qOPhT0dTNdNzV6Z/lhRX0YXUafgPLFUh+gZMl761Gm4=
+github.com/google/cel-go v0.20.1 h1:nDx9r8S3L4pE61eDdt8igGj8rf5kjYR3ILxWIpWNi84=
+github.com/google/cel-go v0.20.1/go.mod h1:kWcIzTsPX0zmQ+H3TirHstLLf9ep5QTsZBN9u4dOYLg=
github.com/google/gnostic-models v0.6.8 h1:yo/ABAfM5IMRsS1VnXjTBvUb61tFIHozhlYvRgGre9I=
github.com/google/gnostic-models v0.6.8/go.mod h1:5n7qKqH0f5wFt+aWF8CW6pZLLNOfYuF5OpfBSENuI8U=
github.com/google/go-cmp v0.2.0/go.mod h1:oXzfMopK8JAjlY9xF4vHSVASa0yLyX7SntLO5aqRK0M=
@@ -381,12 +385,15 @@ github.com/spf13/cobra v1.8.1 h1:e5/vxKd/rZsfSJMUX1agtjeTDf+qv1/JdBF8gg5k9ZM=
github.com/spf13/cobra v1.8.1/go.mod h1:wHxEcudfqmLYa8iTfL+OuZPbBZkmvliBWKIezN3kD9Y=
github.com/spf13/pflag v1.0.5 h1:iy+VFUOCP1a+8yFto/drg2CJ5u0yRoB7fZw3DKv/JXA=
github.com/spf13/pflag v1.0.5/go.mod h1:McXfInJRrz4CZXVZOBLb0bTZqETkiAhM9Iw0y3An2Bg=
+github.com/stoewer/go-strcase v1.2.0 h1:Z2iHWqGXH00XYgqDmNgQbIBxf3wrNq0F3feEy0ainaU=
+github.com/stoewer/go-strcase v1.2.0/go.mod h1:IBiWB2sKIp3wVVQ3Y035++gc+knqhUQag1KpM8ahLw8=
github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME=
github.com/stretchr/objx v0.4.0/go.mod h1:YvHI0jy2hoMjB+UWwv71VJQ9isScKT/TqJzVSSt89Yw=
github.com/stretchr/objx v0.5.0/go.mod h1:Yh+to48EsGEfYuaHDzXPcE3xhTkx73EhmCGUpEOglKo=
github.com/stretchr/objx v0.5.2 h1:xuMeJ0Sdp5ZMRXx/aWO6RZxdr3beISkG5/G/aIRr3pY=
github.com/stretchr/objx v0.5.2/go.mod h1:FRsXN1f5AsAjCGJKqEizvkpNtU+EGNCLh3NxZ/8L+MA=
github.com/stretchr/testify v1.3.0/go.mod h1:M5WIy9Dh21IEIfnGCwXGc5bZfKNJtfHm1UVUgZn+9EI=
+github.com/stretchr/testify v1.5.1/go.mod h1:5W2xD1RspED5o8YsWQXVCued0rvSQ+mT+I5cxcmMvtA=
github.com/stretchr/testify v1.6.1/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg=
github.com/stretchr/testify v1.7.0/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg=
github.com/stretchr/testify v1.7.1/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg=
@@ -591,6 +598,7 @@ gopkg.in/evanphx/json-patch.v4 v4.12.0 h1:n6jtcsulIzXPJaxegRbvFNNrZDjbij7ny3gmSP
gopkg.in/evanphx/json-patch.v4 v4.12.0/go.mod h1:p8EYWUEYMpynmqDbY58zCKCFZw8pRWMG4EsWvDvM72M=
gopkg.in/inf.v0 v0.9.1 h1:73M5CoZyi3ZLMOyDlQh031Cx6N9NDJ2Vvfl76EDAgDc=
gopkg.in/inf.v0 v0.9.1/go.mod h1:cWUDdTG/fYaXco+Dcufb5Vnc6Gp2YChqWtbxRZE0mXw=
+gopkg.in/yaml.v2 v2.2.2/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI=
gopkg.in/yaml.v2 v2.2.8/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI=
gopkg.in/yaml.v2 v2.4.0 h1:D8xgwECY7CYvx+Y2n4sBz93Jn9JRvxdiyyo8CTfuKaY=
gopkg.in/yaml.v2 v2.4.0/go.mod h1:RDklbk79AGWmwhnvt/jBztapEOGDOx6ZbXqjP6csGnQ=
diff --git a/internal/server/cel.go b/internal/server/cel.go
new file mode 100644
index 000000000..f3ee26215
--- /dev/null
+++ b/internal/server/cel.go
@@ -0,0 +1,172 @@
+package server
+
+import (
+ "encoding/json"
+ "fmt"
+ "mime"
+ "net/http"
+ "strings"
+
+ "github.com/google/cel-go/cel"
+ "github.com/google/cel-go/checker/decls"
+ "github.com/google/cel-go/common/types"
+ "github.com/google/cel-go/common/types/ref"
+ "github.com/google/cel-go/common/types/traits"
+ celext "github.com/google/cel-go/ext"
+ "sigs.k8s.io/controller-runtime/pkg/client"
+)
+
+func newCELEvaluator(expr string, req *http.Request) (resourcePredicate, error) {
+ env, err := makeCELEnv()
+ if err != nil {
+ return nil, err
+ }
+ parsed, issues := env.Parse(expr)
+ if issues != nil && issues.Err() != nil {
+ return nil, fmt.Errorf("failed to parse expression %v: %w", expr, issues.Err())
+ }
+
+ checked, issues := env.Check(parsed)
+ if issues != nil && issues.Err() != nil {
+ return nil, fmt.Errorf("expression %v check failed: %w", expr, issues.Err())
+ }
+
+ prg, err := env.Program(checked, cel.EvalOptions(cel.OptOptimize))
+ if err != nil {
+ return nil, fmt.Errorf("expression %v failed to create a Program: %w", expr, err)
+ }
+
+ body := map[string]any{}
+ // Only decodes the body for the expression if the body is JSON.
+ // Technically you could generate several resources without any body.
+ if isJSONContent(req) {
+ if err := json.NewDecoder(req.Body).Decode(&body); err != nil {
+ return nil, fmt.Errorf("failed to parse request body as JSON: %s", err)
+ }
+ }
+
+ return func(obj client.Object) (*bool, error) {
+ data, err := clientObjectToMap(obj)
+ if err != nil {
+ return nil, err
+ }
+
+ out, _, err := prg.Eval(map[string]any{
+ "resource": data,
+ "request": body,
+ })
+ if err != nil {
+ return nil, fmt.Errorf("expression %v failed to evaluate: %w", expr, err)
+ }
+
+ v, ok := out.(types.Bool)
+ if !ok {
+ return nil, fmt.Errorf("expression %q did not return a boolean value", expr)
+ }
+
+ result := v.Value().(bool)
+
+ return &result, nil
+ }, nil
+}
+
+func makeCELEnv() (*cel.Env, error) {
+ mapStrDyn := decls.NewMapType(decls.String, decls.Dyn)
+ return cel.NewEnv(
+ celext.Strings(),
+ celext.Encoders(),
+ notifications(),
+ cel.Declarations(
+ decls.NewVar("resource", mapStrDyn),
+ decls.NewVar("request", mapStrDyn),
+ ))
+}
+
+func isJSONContent(r *http.Request) bool {
+ contentType := r.Header.Get("Content-type")
+ for _, v := range strings.Split(contentType, ",") {
+ t, _, err := mime.ParseMediaType(v)
+ if err != nil {
+ break
+ }
+ if t == "application/json" {
+ return true
+ }
+ }
+
+ return false
+}
+
+func notifications() cel.EnvOption {
+ r, err := types.NewRegistry()
+ if err != nil {
+ panic(err) // TODO: Do something better?
+ }
+
+ return cel.Lib(¬ificationsLib{registry: r})
+}
+
+type notificationsLib struct {
+ registry *types.Registry
+}
+
+// LibraryName implements the SingletonLibrary interface method.
+func (*notificationsLib) LibraryName() string {
+ return "flux.notifications.lib"
+}
+
+// CompileOptions implements the Library interface method.
+func (l *notificationsLib) CompileOptions() []cel.EnvOption {
+ listStrDyn := cel.ListType(cel.DynType)
+ opts := []cel.EnvOption{
+ cel.Function("first",
+ cel.MemberOverload("first_list", []*cel.Type{listStrDyn}, cel.DynType,
+ cel.UnaryBinding(listFirst))),
+ cel.Function("last",
+ cel.MemberOverload("last_list", []*cel.Type{listStrDyn}, cel.DynType,
+ cel.UnaryBinding(listLast))),
+ }
+
+ return opts
+}
+
+// ProgramOptions implements the Library interface method.
+func (*notificationsLib) ProgramOptions() []cel.ProgramOption {
+ return []cel.ProgramOption{}
+}
+
+func listLast(val ref.Val) ref.Val {
+ l := val.(traits.Lister)
+ sz := l.Size().Value().(int64)
+
+ if sz == 0 {
+ return types.NullValue
+ }
+
+ return l.Get(types.Int(sz - 1))
+}
+
+func listFirst(val ref.Val) ref.Val {
+ l := val.(traits.Lister)
+ sz := l.Size().Value().(int64)
+
+ if sz == 0 {
+ return types.NullValue
+ }
+
+ return l.Get(types.Int(0))
+}
+
+func clientObjectToMap(v client.Object) (map[string]any, error) {
+ b, err := json.Marshal(v)
+ if err != nil {
+ return nil, fmt.Errorf("failed to marshal PartialObjectMetadata from resource for CEL expression: %w", err)
+ }
+
+ var result map[string]any
+ if err := json.Unmarshal(b, &result); err != nil {
+ return nil, fmt.Errorf("failed to unmarshal PartialObjectMetadata from resource for CEL expression: %w", err)
+ }
+
+ return result, nil
+}
diff --git a/internal/server/receiver_handler_test.go b/internal/server/receiver_handler_test.go
index b555dd02a..8908749b6 100644
--- a/internal/server/receiver_handler_test.go
+++ b/internal/server/receiver_handler_test.go
@@ -760,6 +760,213 @@ func Test_handlePayload(t *testing.T) {
expectedResourcesAnnotated: 1,
expectedResponseCode: http.StatusOK,
},
+ {
+ name: "resources filtered with CEL expressions",
+ headers: map[string]string{
+ "Content-Type": "application/json; charset=utf-8",
+ },
+ payload: map[string]interface{}{
+ "action": "INSERT",
+ "digest": "us-east1-docker.pkg.dev/my-project/my-repo/hello-world@sha256:6ec128e26cd5...",
+ "tag": "us-east1-docker.pkg.dev/my-project/my-repo/hello-world:1.1",
+ },
+ receiver: &apiv1.Receiver{
+ ObjectMeta: metav1.ObjectMeta{
+ Name: "receiver",
+ },
+ Spec: apiv1.ReceiverSpec{
+ Type: apiv1.GenericReceiver,
+ SecretRef: meta.LocalObjectReference{
+ Name: "token",
+ },
+ Resources: []apiv1.CrossNamespaceObjectReference{
+ {
+ APIVersion: apiv1.GroupVersion.String(),
+ Kind: apiv1.ReceiverKind,
+ Name: "*",
+ MatchLabels: map[string]string{
+ "label": "production",
+ },
+ },
+ },
+ ResourceFilter: `has(resource.metadata.annotations) && request.tag.split('/').last().split(":").first() == resource.metadata.annotations['update-image']`,
+ },
+ Status: apiv1.ReceiverStatus{
+ WebhookPath: apiv1.ReceiverWebhookPath,
+ Conditions: []metav1.Condition{{Type: meta.ReadyCondition, Status: metav1.ConditionTrue}},
+ },
+ },
+ secret: &corev1.Secret{
+ ObjectMeta: metav1.ObjectMeta{
+ Name: "token",
+ },
+ Data: map[string][]byte{
+ "token": []byte("token"),
+ },
+ },
+ resources: []client.Object{
+ &apiv1.Receiver{
+ TypeMeta: metav1.TypeMeta{
+ Kind: apiv1.ReceiverKind,
+ APIVersion: apiv1.GroupVersion.String(),
+ },
+ ObjectMeta: metav1.ObjectMeta{
+ Name: "test-resource-1",
+ Annotations: map[string]string{
+ "update-image": "hello-world",
+ },
+ Labels: map[string]string{
+ "label": "production",
+ },
+ },
+ },
+ &apiv1.Receiver{
+ TypeMeta: metav1.TypeMeta{
+ Kind: apiv1.ReceiverKind,
+ APIVersion: apiv1.GroupVersion.String(),
+ },
+ ObjectMeta: metav1.ObjectMeta{
+ Name: "test-resource-2",
+ Namespace: "tested",
+ Labels: map[string]string{
+ "label": "production",
+ },
+ Annotations: map[string]string{
+ "update-image": "other-image",
+ },
+ },
+ },
+ &apiv1.Receiver{
+ TypeMeta: metav1.TypeMeta{
+ Kind: apiv1.ReceiverKind,
+ APIVersion: apiv1.GroupVersion.String(),
+ },
+ ObjectMeta: metav1.ObjectMeta{
+ Name: "test-resource-3",
+ Labels: map[string]string{
+ "label": "production",
+ },
+ },
+ },
+ },
+ expectedResourcesAnnotated: 1, // TODO: This should really check more than just the count.
+ expectedResponseCode: http.StatusOK,
+ },
+ {
+ name: "filtering out a single named resource with CEL",
+ headers: map[string]string{
+ "Content-Type": "application/json; charset=utf-8",
+ },
+ payload: map[string]interface{}{
+ "action": "INSERT",
+ "digest": "us-east1-docker.pkg.dev/my-project/my-repo/hello-world@sha256:6ec128e26cd5...",
+ "tag": "us-east1-docker.pkg.dev/my-project/my-repo/hello-world:1.1",
+ },
+ receiver: &apiv1.Receiver{
+ ObjectMeta: metav1.ObjectMeta{
+ Name: "receiver",
+ },
+ Spec: apiv1.ReceiverSpec{
+ Type: apiv1.GenericReceiver,
+ SecretRef: meta.LocalObjectReference{
+ Name: "token",
+ },
+ Resources: []apiv1.CrossNamespaceObjectReference{
+ {
+ APIVersion: apiv1.GroupVersion.String(),
+ Kind: apiv1.ReceiverKind,
+ Name: "test-resource",
+ },
+ },
+ ResourceFilter: `has(resource.metadata.annotations) && request.tag.split('/').last().split(":").first() == resource.metadata.annotations['update-image']`,
+ },
+ Status: apiv1.ReceiverStatus{
+ WebhookPath: apiv1.ReceiverWebhookPath,
+ Conditions: []metav1.Condition{{Type: meta.ReadyCondition, Status: metav1.ConditionTrue}},
+ },
+ },
+ secret: &corev1.Secret{
+ ObjectMeta: metav1.ObjectMeta{
+ Name: "token",
+ },
+ Data: map[string][]byte{
+ "token": []byte("token"),
+ },
+ },
+ resources: []client.Object{
+ &apiv1.Receiver{
+ TypeMeta: metav1.TypeMeta{
+ Kind: apiv1.ReceiverKind,
+ APIVersion: apiv1.GroupVersion.String(),
+ },
+ ObjectMeta: metav1.ObjectMeta{
+ Name: "test-resource",
+ Annotations: map[string]string{
+ "update-image": "not-hello-world",
+ },
+ },
+ },
+ },
+ expectedResourcesAnnotated: 0,
+ expectedResponseCode: http.StatusOK,
+ },
+
+ {
+ name: "handling errors when parsing the CEL expression results",
+ headers: map[string]string{
+ "Content-Type": "application/json; charset=utf-8",
+ },
+ receiver: &apiv1.Receiver{
+ ObjectMeta: metav1.ObjectMeta{
+ Name: "receiver",
+ },
+ Spec: apiv1.ReceiverSpec{
+ Type: apiv1.GenericReceiver,
+ SecretRef: meta.LocalObjectReference{
+ Name: "token",
+ },
+ Resources: []apiv1.CrossNamespaceObjectReference{
+ {
+ APIVersion: apiv1.GroupVersion.String(),
+ Kind: apiv1.ReceiverKind,
+ Name: "*",
+ MatchLabels: map[string]string{
+ "label": "production",
+ },
+ },
+ },
+ ResourceFilter: `resource.name == "test-resource-1"`,
+ },
+ Status: apiv1.ReceiverStatus{
+ WebhookPath: apiv1.ReceiverWebhookPath,
+ Conditions: []metav1.Condition{{Type: meta.ReadyCondition, Status: metav1.ConditionTrue}},
+ },
+ },
+ secret: &corev1.Secret{
+ ObjectMeta: metav1.ObjectMeta{
+ Name: "token",
+ },
+ Data: map[string][]byte{
+ "token": []byte("token"),
+ },
+ },
+ resources: []client.Object{
+ &apiv1.Receiver{
+ TypeMeta: metav1.TypeMeta{
+ Kind: apiv1.ReceiverKind,
+ APIVersion: apiv1.GroupVersion.String(),
+ },
+ ObjectMeta: metav1.ObjectMeta{
+ Name: "test-resource-1",
+ Labels: map[string]string{
+ "label": "production",
+ },
+ },
+ },
+ },
+ expectedResourcesAnnotated: 0, // TODO: This should really check more than just the count.
+ expectedResponseCode: http.StatusInternalServerError,
+ },
}
scheme := runtime.NewScheme()
@@ -809,8 +1016,7 @@ func Test_handlePayload(t *testing.T) {
}
rr := httptest.NewRecorder()
- handler := s.handlePayload()
- handler(rr, req)
+ s.handlePayload(rr, req)
g.Expect(rr.Result().StatusCode).To(gomega.Equal(tt.expectedResponseCode))
var allReceivers apiv1.ReceiverList
@@ -827,3 +1033,14 @@ func Test_handlePayload(t *testing.T) {
})
}
}
+
+func buildTestClient(objs ...client.Object) client.Client {
+ scheme := runtime.NewScheme()
+ apiv1.AddToScheme(scheme)
+ corev1.AddToScheme(scheme)
+
+ return fake.NewClientBuilder().
+ WithScheme(scheme).
+ WithObjects(objs...).
+ WithIndex(&apiv1.Receiver{}, WebhookPathIndexKey, IndexReceiverWebhookPath).Build()
+}
diff --git a/internal/server/receiver_handlers.go b/internal/server/receiver_handlers.go
index 5a054cec7..dbcc105b9 100644
--- a/internal/server/receiver_handlers.go
+++ b/internal/server/receiver_handlers.go
@@ -70,65 +70,72 @@ func IndexReceiverWebhookPath(o client.Object) []string {
return nil
}
-func (s *ReceiverServer) handlePayload() func(w http.ResponseWriter, r *http.Request) {
- return func(w http.ResponseWriter, r *http.Request) {
- ctx := context.Background()
- digest := url.PathEscape(strings.TrimPrefix(r.RequestURI, apiv1.ReceiverWebhookPath))
+func (s *ReceiverServer) handlePayload(w http.ResponseWriter, r *http.Request) {
+ ctx := r.Context()
+ digest := url.PathEscape(strings.TrimPrefix(r.RequestURI, apiv1.ReceiverWebhookPath))
- s.logger.Info(fmt.Sprintf("handling request: %s", digest))
+ s.logger.Info(fmt.Sprintf("handling request: %s", digest))
- var allReceivers apiv1.ReceiverList
- err := s.kubeClient.List(ctx, &allReceivers, client.MatchingFields{
- WebhookPathIndexKey: r.RequestURI,
- }, client.Limit(1))
- if err != nil {
- s.logger.Error(err, "unable to list receivers")
- w.WriteHeader(http.StatusInternalServerError)
- return
- }
+ var allReceivers apiv1.ReceiverList
+ err := s.kubeClient.List(ctx, &allReceivers, client.MatchingFields{
+ WebhookPathIndexKey: r.RequestURI,
+ }, client.Limit(1))
+ if err != nil {
+ s.logger.Error(err, "unable to list receivers")
+ w.WriteHeader(http.StatusInternalServerError)
+ return
+ }
- if len(allReceivers.Items) == 0 {
- w.WriteHeader(http.StatusNotFound)
- return
- }
+ if len(allReceivers.Items) == 0 {
+ w.WriteHeader(http.StatusNotFound)
+ return
+ }
- receiver := allReceivers.Items[0]
- logger := s.logger.WithValues(
- "reconciler kind", apiv1.ReceiverKind,
- "name", receiver.Name,
- "namespace", receiver.Namespace)
+ receiver := allReceivers.Items[0]
+ logger := s.logger.WithValues(
+ "reconciler kind", apiv1.ReceiverKind,
+ "name", receiver.Name,
+ "namespace", receiver.Namespace)
- if receiver.Spec.Suspend || !conditions.IsReady(&receiver) {
- err := errors.New("unable to process request")
- if receiver.Spec.Suspend {
- logger.Error(err, "receiver is suspended")
- } else {
- logger.Error(err, "receiver is not ready")
- }
- w.WriteHeader(http.StatusServiceUnavailable)
- return
+ if receiver.Spec.Suspend || !conditions.IsReady(&receiver) {
+ err := errors.New("unable to process request")
+ if receiver.Spec.Suspend {
+ logger.Error(err, "receiver is suspended")
+ } else {
+ logger.Error(err, "receiver is not ready")
}
+ w.WriteHeader(http.StatusServiceUnavailable)
+ return
+ }
- if err := s.validate(ctx, receiver, r); err != nil {
- logger.Error(err, "unable to validate payload")
- w.WriteHeader(http.StatusBadRequest)
- return
- }
+ if err := s.validate(ctx, receiver, r); err != nil {
+ logger.Error(err, "unable to validate payload")
+ w.WriteHeader(http.StatusBadRequest)
+ return
+ }
- var withErrors bool
- for _, resource := range receiver.Spec.Resources {
- if err := s.requestReconciliation(ctx, logger, resource, receiver.Namespace); err != nil {
- logger.Error(err, "unable to request reconciliation")
- withErrors = true
- }
+ var evaluator func(client.Object) (*bool, error)
+ if receiver.Spec.ResourceFilter != "" {
+ evaluator, err = newCELEvaluator(receiver.Spec.ResourceFilter, r)
+ if err != nil {
+ logger.Error(err, "unable to create CEL evaluator")
+ w.WriteHeader(http.StatusInternalServerError)
}
+ }
- if withErrors {
- w.WriteHeader(http.StatusInternalServerError)
- } else {
- w.WriteHeader(http.StatusOK)
+ var withErrors bool
+ for _, resource := range receiver.Spec.Resources {
+ if err := s.requestReconciliation(ctx, logger, resource, receiver.Namespace, evaluator); err != nil {
+ logger.Error(err, "unable to request reconciliation")
+ withErrors = true
}
}
+
+ if withErrors {
+ w.WriteHeader(http.StatusInternalServerError)
+ } else {
+ w.WriteHeader(http.StatusOK)
+ }
}
func (s *ReceiverServer) validate(ctx context.Context, receiver apiv1.Receiver, r *http.Request) error {
@@ -397,8 +404,10 @@ func (s *ReceiverServer) token(ctx context.Context, receiver apiv1.Receiver) (st
return token, nil
}
+type resourcePredicate func(client.Object) (*bool, error)
+
// requestReconciliation requests reconciliation of all the resources matching the given CrossNamespaceObjectReference by annotating them accordingly.
-func (s *ReceiverServer) requestReconciliation(ctx context.Context, logger logr.Logger, resource apiv1.CrossNamespaceObjectReference, defaultNamespace string) error {
+func (s *ReceiverServer) requestReconciliation(ctx context.Context, logger logr.Logger, resource apiv1.CrossNamespaceObjectReference, defaultNamespace string, resourcePredicate resourcePredicate) error {
namespace := defaultNamespace
if resource.Namespace != "" {
namespace = resource.Namespace
@@ -443,6 +452,17 @@ func (s *ReceiverServer) requestReconciliation(ctx context.Context, logger logr.
}
for i, resource := range resources.Items {
+ if resourcePredicate != nil {
+ accept, err := resourcePredicate(&resource)
+ if err != nil {
+ return err
+ }
+ if !*accept {
+ logger.Info(fmt.Sprintf("resource '%s/%s.%s' NOT annotated because CEL expression returned false", resource.Kind, resource.Name, namespace))
+ continue
+ }
+ }
+
if err := s.annotate(ctx, &resources.Items[i]); err != nil {
return fmt.Errorf("failed to annotate resource: '%s/%s.%s': %w", resource.Kind, resource.Name, namespace, err)
} else {
@@ -470,6 +490,16 @@ func (s *ReceiverServer) requestReconciliation(ctx context.Context, logger logr.
return fmt.Errorf("unable to read %s '%s' error: %w", resource.Kind, objectKey, err)
}
+ if resourcePredicate != nil {
+ accept, err := resourcePredicate(u)
+ if err != nil {
+ return err
+ }
+ if !*accept {
+ logger.Info(fmt.Sprintf("resource '%s/%s.%s' NOT annotated because CEL expression returned false", resource.Kind, resource.Name, namespace))
+ return nil
+ }
+ }
err := s.annotate(ctx, u)
if err != nil {
return fmt.Errorf("failed to annotate resource: '%s/%s.%s': %w", resource.Kind, resource.Name, namespace, err)
diff --git a/internal/server/receiver_server.go b/internal/server/receiver_server.go
index 2eb9c5760..e71fd9b19 100644
--- a/internal/server/receiver_server.go
+++ b/internal/server/receiver_server.go
@@ -51,7 +51,7 @@ func NewReceiverServer(port string, logger logr.Logger, kubeClient client.Client
// ListenAndServe starts the HTTP server on the specified port
func (s *ReceiverServer) ListenAndServe(stopCh <-chan struct{}, mdlw middleware.Middleware) {
mux := http.NewServeMux()
- mux.Handle(apiv1.ReceiverWebhookPath, http.HandlerFunc(s.handlePayload()))
+ mux.Handle(apiv1.ReceiverWebhookPath, http.HandlerFunc(s.handlePayload))
handlerID := apiv1.ReceiverWebhookPath
if s.exportHTTPPathMetrics {
handlerID = ""
|