diff --git a/api/v1/receiver_types.go b/api/v1/receiver_types.go index 9790345d4..046fc0a4a 100644 --- a/api/v1/receiver_types.go +++ b/api/v1/receiver_types.go @@ -63,10 +63,22 @@ type ReceiverSpec struct { // +optional Events []string `json:"events,omitempty"` + // TODO: Validate one or other (or both?) + // A list of resources to be notified about changes. // +required Resources []CrossNamespaceObjectReference `json:"resources"` + // ResourceExpressions is a list of CEL expressions that will be parsed to + // determine resources to be notified about changes. + // The expressions must evaluate to CEL values that contain the keys "name", + // "kind", "apiVersion" and optionally "namespace". + // These values will be parsed to CrossNamespaceObjectReferences. + // e.g. {"name": "test-resource-1", "kind": "Receiver", "apiVersion": + // "notification.toolkit.fluxcd.io/v1"}. + // +optional + ResourceExpressions []string `json:"resourceExpressions,omitempty"` + // SecretRef specifies the Secret containing the token used // to validate the payload authenticity. // +required diff --git a/api/v1/zz_generated.deepcopy.go b/api/v1/zz_generated.deepcopy.go index 81a647ce8..8e0bbd010 100644 --- a/api/v1/zz_generated.deepcopy.go +++ b/api/v1/zz_generated.deepcopy.go @@ -126,6 +126,11 @@ func (in *ReceiverSpec) DeepCopyInto(out *ReceiverSpec) { (*in)[i].DeepCopyInto(&(*out)[i]) } } + if in.ResourceExpressions != nil { + in, out := &in.ResourceExpressions, &out.ResourceExpressions + *out = make([]string, len(*in)) + copy(*out, *in) + } out.SecretRef = in.SecretRef } diff --git a/config/crd/bases/notification.toolkit.fluxcd.io_receivers.yaml b/config/crd/bases/notification.toolkit.fluxcd.io_receivers.yaml index 239a1a81f..bb80b7177 100644 --- a/config/crd/bases/notification.toolkit.fluxcd.io_receivers.yaml +++ b/config/crd/bases/notification.toolkit.fluxcd.io_receivers.yaml @@ -62,6 +62,18 @@ spec: Secret references. pattern: ^([0-9]+(\.[0-9]+)?(ms|s|m|h))+$ type: string + resourceExpressions: + description: |- + ResourceExpressions is a list of CEL expressions that will be parsed to + determine resources to be notified about changes. + The expressions must evaluate to CEL values that contain the keys "name", + "kind", "apiVersion" and optionally "namespace". + These values will be parsed to CrossNamespaceObjectReferences. + e.g. {"name": "test-resource-1", "kind": "Receiver", "apiVersion": + "notification.toolkit.fluxcd.io/v1"}. + items: + type: string + type: array resources: description: A list of resources to be notified about changes. items: diff --git a/docs/api/v1/notification.md b/docs/api/v1/notification.md index 325c17596..697fa6ed4 100644 --- a/docs/api/v1/notification.md +++ b/docs/api/v1/notification.md @@ -122,6 +122,24 @@ e.g. ‘push’ for GitHub or ‘Push Hook’ for GitLab.

+resourceExpressions
+ +[]string + + + +(Optional) +

ResourceExpressions is a list of CEL expressions that will be parsed to +determine resources to be notified about changes. +The expressions must evaluate to CEL values that contain the keys “name”, +“kind”, “apiVersion” and optionally “namespace”. +These values will be parsed to CrossNamespaceObjectReferences. +e.g. {“name”: “test-resource-1”, “kind”: “Receiver”, “apiVersion”: +“notification.toolkit.fluxcd.io/v1”}.

+ + + + secretRef
@@ -321,6 +339,24 @@ e.g. ‘push’ for GitHub or ‘Push Hook’ for GitLab.

+resourceExpressions
+ +[]string + + + +(Optional) +

ResourceExpressions is a list of CEL expressions that will be parsed to +determine resources to be notified about changes. +The expressions must evaluate to CEL values that contain the keys “name”, +“kind”, “apiVersion” and optionally “namespace”. +These values will be parsed to CrossNamespaceObjectReferences. +e.g. {“name”: “test-resource-1”, “kind”: “Receiver”, “apiVersion”: +“notification.toolkit.fluxcd.io/v1”}.

+ + + + secretRef
diff --git a/go.mod b/go.mod index 9c5ea45e6..7f1392a3c 100644 --- a/go.mod +++ b/go.mod @@ -25,6 +25,7 @@ require ( github.com/fluxcd/pkg/ssa v0.41.1 github.com/getsentry/sentry-go v0.29.0 github.com/go-logr/logr v1.4.2 + github.com/google/cel-go v0.20.1 github.com/google/go-github/v64 v64.0.0 github.com/hashicorp/go-retryablehttp v0.7.7 github.com/ktrysmt/go-bitbucket v0.9.80 @@ -39,6 +40,7 @@ require ( golang.org/x/oauth2 v0.23.0 golang.org/x/text v0.18.0 google.golang.org/api v0.199.0 + google.golang.org/protobuf v1.34.2 k8s.io/api v0.31.1 k8s.io/apimachinery v0.31.1 k8s.io/client-go v0.31.1 @@ -74,6 +76,7 @@ require ( github.com/DataDog/zstd v1.5.2 // indirect github.com/MakeNowJust/heredoc v1.0.0 // indirect github.com/ProtonMail/go-crypto v1.0.0 // indirect + github.com/antlr4-go/antlr/v4 v4.13.0 // indirect github.com/beorn7/perks v1.0.1 // indirect github.com/blang/semver/v4 v4.0.0 // indirect github.com/cespare/xxhash/v2 v2.3.0 // indirect @@ -159,6 +162,7 @@ require ( github.com/russross/blackfriday/v2 v2.1.0 // indirect github.com/santhosh-tekuri/jsonschema/v6 v6.0.1 // indirect github.com/spf13/cobra v1.8.1 // indirect + github.com/stoewer/go-strcase v1.2.0 // indirect github.com/x448/float16 v0.8.4 // indirect github.com/xlab/treeprint v1.2.0 // indirect go.opencensus.io v0.24.0 // indirect @@ -183,7 +187,6 @@ require ( google.golang.org/genproto/googleapis/api v0.0.0-20240903143218-8af14fe29dc1 // indirect google.golang.org/genproto/googleapis/rpc v0.0.0-20240903143218-8af14fe29dc1 // indirect google.golang.org/grpc v1.67.0 // indirect - google.golang.org/protobuf v1.34.2 // indirect gopkg.in/evanphx/json-patch.v4 v4.12.0 // indirect gopkg.in/inf.v0 v0.9.1 // indirect gopkg.in/yaml.v2 v2.4.0 // indirect diff --git a/go.sum b/go.sum index a3fcfb21f..fb9392f87 100644 --- a/go.sum +++ b/go.sum @@ -73,6 +73,8 @@ github.com/PagerDuty/go-pagerduty v1.8.0 h1:MTFqTffIcAervB83U7Bx6HERzLbyaSPL/+ox github.com/PagerDuty/go-pagerduty v1.8.0/go.mod h1:nzIeAqyFSJAFkjWKvMzug0JtwDg+V+UoCWjFrfFH5mI= github.com/ProtonMail/go-crypto v1.0.0 h1:LRuvITjQWX+WIfr930YHG2HNfjR1uOfyf5vE0kC2U78= github.com/ProtonMail/go-crypto v1.0.0/go.mod h1:EjAoLdwvbIOoOQr3ihjnSoLZRtE8azugULFRteWMNc0= +github.com/antlr4-go/antlr/v4 v4.13.0 h1:lxCg3LAv+EUK6t1i0y1V6/SLeUi0eKEKdhQAlS8TVTI= +github.com/antlr4-go/antlr/v4 v4.13.0/go.mod h1:pfChB/xh/Unjila75QW7+VU4TSnWnnk9UTnmpPaOR2g= github.com/armon/go-socks5 v0.0.0-20160902184237-e75332964ef5 h1:0CwZNZbxp69SHPdPJAN/hZIm0C4OItdklCFmMRWYpio= github.com/armon/go-socks5 v0.0.0-20160902184237-e75332964ef5/go.mod h1:wHh0iHkYZB8zMSxRWpUBQtwG5a7fFgvEO+odwuTv2gs= github.com/beorn7/perks v1.0.1 h1:VlbKKnNfV8bJzeqoa4cOKqO6bYr3WgKZxO8Z16+hsOM= @@ -215,6 +217,8 @@ github.com/golang/protobuf v1.5.4 h1:i7eJL8qZTpSEXOPTxNKhASYpMn+8e5Q6AdndVa1dWek github.com/golang/protobuf v1.5.4/go.mod h1:lnTiLA8Wa4RWRcIUkrtSVa5nRhsEGBg48fD6rSs7xps= github.com/google/btree v1.1.2 h1:xf4v41cLI2Z6FxbKm+8Bu+m8ifhj15JuZ9sa0jZCMUU= github.com/google/btree v1.1.2/go.mod h1:qOPhT0dTNdNzV6Z/lhRX0YXUafgPLFUh+gZMl761Gm4= +github.com/google/cel-go v0.20.1 h1:nDx9r8S3L4pE61eDdt8igGj8rf5kjYR3ILxWIpWNi84= +github.com/google/cel-go v0.20.1/go.mod h1:kWcIzTsPX0zmQ+H3TirHstLLf9ep5QTsZBN9u4dOYLg= github.com/google/gnostic-models v0.6.8 h1:yo/ABAfM5IMRsS1VnXjTBvUb61tFIHozhlYvRgGre9I= github.com/google/gnostic-models v0.6.8/go.mod h1:5n7qKqH0f5wFt+aWF8CW6pZLLNOfYuF5OpfBSENuI8U= github.com/google/go-cmp v0.2.0/go.mod h1:oXzfMopK8JAjlY9xF4vHSVASa0yLyX7SntLO5aqRK0M= @@ -381,12 +385,15 @@ github.com/spf13/cobra v1.8.1 h1:e5/vxKd/rZsfSJMUX1agtjeTDf+qv1/JdBF8gg5k9ZM= github.com/spf13/cobra v1.8.1/go.mod h1:wHxEcudfqmLYa8iTfL+OuZPbBZkmvliBWKIezN3kD9Y= github.com/spf13/pflag v1.0.5 h1:iy+VFUOCP1a+8yFto/drg2CJ5u0yRoB7fZw3DKv/JXA= github.com/spf13/pflag v1.0.5/go.mod h1:McXfInJRrz4CZXVZOBLb0bTZqETkiAhM9Iw0y3An2Bg= +github.com/stoewer/go-strcase v1.2.0 h1:Z2iHWqGXH00XYgqDmNgQbIBxf3wrNq0F3feEy0ainaU= +github.com/stoewer/go-strcase v1.2.0/go.mod h1:IBiWB2sKIp3wVVQ3Y035++gc+knqhUQag1KpM8ahLw8= github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME= github.com/stretchr/objx v0.4.0/go.mod h1:YvHI0jy2hoMjB+UWwv71VJQ9isScKT/TqJzVSSt89Yw= github.com/stretchr/objx v0.5.0/go.mod h1:Yh+to48EsGEfYuaHDzXPcE3xhTkx73EhmCGUpEOglKo= github.com/stretchr/objx v0.5.2 h1:xuMeJ0Sdp5ZMRXx/aWO6RZxdr3beISkG5/G/aIRr3pY= github.com/stretchr/objx v0.5.2/go.mod h1:FRsXN1f5AsAjCGJKqEizvkpNtU+EGNCLh3NxZ/8L+MA= github.com/stretchr/testify v1.3.0/go.mod h1:M5WIy9Dh21IEIfnGCwXGc5bZfKNJtfHm1UVUgZn+9EI= +github.com/stretchr/testify v1.5.1/go.mod h1:5W2xD1RspED5o8YsWQXVCued0rvSQ+mT+I5cxcmMvtA= github.com/stretchr/testify v1.6.1/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg= github.com/stretchr/testify v1.7.0/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg= github.com/stretchr/testify v1.7.1/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg= @@ -591,6 +598,7 @@ gopkg.in/evanphx/json-patch.v4 v4.12.0 h1:n6jtcsulIzXPJaxegRbvFNNrZDjbij7ny3gmSP gopkg.in/evanphx/json-patch.v4 v4.12.0/go.mod h1:p8EYWUEYMpynmqDbY58zCKCFZw8pRWMG4EsWvDvM72M= gopkg.in/inf.v0 v0.9.1 h1:73M5CoZyi3ZLMOyDlQh031Cx6N9NDJ2Vvfl76EDAgDc= gopkg.in/inf.v0 v0.9.1/go.mod h1:cWUDdTG/fYaXco+Dcufb5Vnc6Gp2YChqWtbxRZE0mXw= +gopkg.in/yaml.v2 v2.2.2/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI= gopkg.in/yaml.v2 v2.2.8/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI= gopkg.in/yaml.v2 v2.4.0 h1:D8xgwECY7CYvx+Y2n4sBz93Jn9JRvxdiyyo8CTfuKaY= gopkg.in/yaml.v2 v2.4.0/go.mod h1:RDklbk79AGWmwhnvt/jBztapEOGDOx6ZbXqjP6csGnQ= diff --git a/internal/server/receiver_handler_test.go b/internal/server/receiver_handler_test.go index b555dd02a..19a02de0d 100644 --- a/internal/server/receiver_handler_test.go +++ b/internal/server/receiver_handler_test.go @@ -27,6 +27,7 @@ import ( "net/http/httptest" "testing" + "github.com/go-logr/logr" "github.com/google/go-github/v64/github" "github.com/onsi/gomega" corev1 "k8s.io/api/core/v1" @@ -760,6 +761,140 @@ func Test_handlePayload(t *testing.T) { expectedResourcesAnnotated: 1, expectedResponseCode: http.StatusOK, }, + { + name: "resources determined by CEL expressions", + headers: map[string]string{ + "Content-Type": "application/json; charset=utf-8", + }, + receiver: &apiv1.Receiver{ + ObjectMeta: metav1.ObjectMeta{ + Name: "receiver", + }, + Spec: apiv1.ReceiverSpec{ + Type: apiv1.GenericReceiver, + SecretRef: meta.LocalObjectReference{ + Name: "token", + }, + ResourceExpressions: []string{ + `{"name": "test-resource-1", "kind": "Receiver", "apiVersion": "notification.toolkit.fluxcd.io/v1"}`, + `[{"name": body.image.split(':',2)[0] + '-2', "namespace": "tested", "kind": "Receiver", "apiVersion": "notification.toolkit.fluxcd.io/v1"}]`, + `body.resources.map(r, {"name": r, "kind": "Receiver", "apiVersion": "notification.toolkit.fluxcd.io/v1"})`, + }, + }, + Status: apiv1.ReceiverStatus{ + WebhookPath: apiv1.ReceiverWebhookPath, + Conditions: []metav1.Condition{{Type: meta.ReadyCondition, Status: metav1.ConditionTrue}}, + }, + }, + secret: &corev1.Secret{ + ObjectMeta: metav1.ObjectMeta{ + Name: "token", + }, + Data: map[string][]byte{ + "token": []byte("token"), + }, + }, + payload: map[string]interface{}{ + "image": "test-resource:1.2.1", + "resources": []string{ + "test-resource-3", + "test-resource-4", + }, + }, + resources: []client.Object{ + &apiv1.Receiver{ + TypeMeta: metav1.TypeMeta{ + Kind: apiv1.ReceiverKind, + APIVersion: apiv1.GroupVersion.String(), + }, + ObjectMeta: metav1.ObjectMeta{ + Name: "test-resource-1", + }, + }, + &apiv1.Receiver{ + TypeMeta: metav1.TypeMeta{ + Kind: apiv1.ReceiverKind, + APIVersion: apiv1.GroupVersion.String(), + }, + ObjectMeta: metav1.ObjectMeta{ + Name: "test-resource-2", + Namespace: "tested", + }, + }, + &apiv1.Receiver{ + TypeMeta: metav1.TypeMeta{ + Kind: apiv1.ReceiverKind, + APIVersion: apiv1.GroupVersion.String(), + }, + ObjectMeta: metav1.ObjectMeta{ + Name: "test-resource-3", + }, + }, + &apiv1.Receiver{ + TypeMeta: metav1.TypeMeta{ + Kind: apiv1.ReceiverKind, + APIVersion: apiv1.GroupVersion.String(), + }, + ObjectMeta: metav1.ObjectMeta{ + Name: "test-resource-4", + }, + }, + }, + expectedResourcesAnnotated: 4, // TODO: This should really check more than just the count. + expectedResponseCode: http.StatusOK, + }, + { + name: "handling errors when parsing the CEL expression results", + headers: map[string]string{ + "Content-Type": "application/json; charset=utf-8", + }, + receiver: &apiv1.Receiver{ + ObjectMeta: metav1.ObjectMeta{ + Name: "receiver", + }, + Spec: apiv1.ReceiverSpec{ + Type: apiv1.GenericReceiver, + SecretRef: meta.LocalObjectReference{ + Name: "token", + }, + ResourceExpressions: []string{ + `{"name": ["test-resource-1"], "kind": "Receiver", "apiVersion": "notification.toolkit.fluxcd.io/v1"}`, + }, + }, + Status: apiv1.ReceiverStatus{ + WebhookPath: apiv1.ReceiverWebhookPath, + Conditions: []metav1.Condition{{Type: meta.ReadyCondition, Status: metav1.ConditionTrue}}, + }, + }, + secret: &corev1.Secret{ + ObjectMeta: metav1.ObjectMeta{ + Name: "token", + }, + Data: map[string][]byte{ + "token": []byte("token"), + }, + }, + payload: map[string]interface{}{ + "image": "test-resource:1.2.1", + "resources": []string{ + "test-resource-3", + "test-resource-4", + }, + }, + resources: []client.Object{ + &apiv1.Receiver{ + TypeMeta: metav1.TypeMeta{ + Kind: apiv1.ReceiverKind, + APIVersion: apiv1.GroupVersion.String(), + }, + ObjectMeta: metav1.ObjectMeta{ + Name: "test-resource-1", + }, + }, + }, + expectedResourcesAnnotated: 0, // TODO: This should really check more than just the count. + expectedResponseCode: http.StatusBadRequest, + }, } scheme := runtime.NewScheme() @@ -785,11 +920,11 @@ func Test_handlePayload(t *testing.T) { } client := builder.Build() - s := ReceiverServer{ - port: "", - logger: logger.NewLogger(logger.Options{}), - kubeClient: client, - } + s := newReceiverHandler( + logger.NewLogger(logger.Options{}), + client, + false, + ) data, err := json.Marshal(tt.payload) if err != nil { @@ -809,8 +944,7 @@ func Test_handlePayload(t *testing.T) { } rr := httptest.NewRecorder() - handler := s.handlePayload() - handler(rr, req) + s.handlePayload(rr, req) g.Expect(rr.Result().StatusCode).To(gomega.Equal(tt.expectedResponseCode)) var allReceivers apiv1.ReceiverList @@ -827,3 +961,80 @@ func Test_handlePayload(t *testing.T) { }) } } + +func TestReceiverServer(t *testing.T) { + receiver := &apiv1.Receiver{ + ObjectMeta: metav1.ObjectMeta{ + Name: "test-receiver", + Namespace: "default", + }, + Spec: apiv1.ReceiverSpec{ + Type: apiv1.GenericReceiver, + SecretRef: meta.LocalObjectReference{ + Name: "token", + }, + ResourceExpressions: []string{ + `{"name": "test-receiver", "kind": "Receiver", "apiVersion": "notification.toolkit.fluxcd.io/v1"}`, + }, + }, + Status: apiv1.ReceiverStatus{ + WebhookPath: apiv1.ReceiverWebhookPath, + Conditions: []metav1.Condition{ + { + Type: meta.ReadyCondition, + Status: metav1.ConditionTrue, + }, + }, + }, + } + secret := &corev1.Secret{ + ObjectMeta: metav1.ObjectMeta{ + Name: "token", + Namespace: "default", + }, + Data: map[string][]byte{ + "token": []byte("token"), + }, + } + + k8sClient := buildTestClient(receiver, secret) + + rs := newReceiverHandler(logr.Discard(), k8sClient, false) + srv := httptest.NewServer(rs) + defer srv.Close() + + payload := map[string]any{ + "image": "test-resource:1.2.1", + } + + body, err := json.Marshal(payload) + if err != nil { + t.Fatal(err) + } + req, err := http.NewRequest(http.MethodPost, srv.URL+apiv1.ReceiverWebhookPath, bytes.NewBuffer(body)) + if err != nil { + t.Fatal(err) + } + req.Header.Set("Content-Type", "application/json; charset=utf-8") + + resp, err := srv.Client().Do(req) + if err != nil { + t.Fatal(err) + } + + if resp.StatusCode != http.StatusOK { + t.Errorf("got StatusCode %v, want %v", resp.StatusCode, http.StatusOK) + } +} + +func buildTestClient(objs ...client.Object) client.Client { + scheme := runtime.NewScheme() + apiv1.AddToScheme(scheme) + corev1.AddToScheme(scheme) + + return fake.NewClientBuilder(). + WithScheme(scheme). + WithObjects(objs...). + WithStatusSubresource(&apiv1.Receiver{}). + WithIndex(&apiv1.Receiver{}, WebhookPathIndexKey, IndexReceiverWebhookPath).Build() +} diff --git a/internal/server/receiver_handlers.go b/internal/server/receiver_handlers.go index 5a054cec7..dea1e1b0e 100644 --- a/internal/server/receiver_handlers.go +++ b/internal/server/receiver_handlers.go @@ -26,8 +26,10 @@ import ( "errors" "fmt" "io" + "mime" "net/http" "net/url" + "reflect" "strings" "time" @@ -36,7 +38,12 @@ import ( "github.com/fluxcd/pkg/apis/meta" "github.com/fluxcd/pkg/runtime/conditions" "github.com/go-logr/logr" + "github.com/google/cel-go/cel" + "github.com/google/cel-go/checker/decls" + "github.com/google/cel-go/common/types/traits" + celext "github.com/google/cel-go/ext" "github.com/google/go-github/v64/github" + "google.golang.org/protobuf/types/known/structpb" corev1 "k8s.io/api/core/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/runtime/schema" @@ -70,68 +77,100 @@ func IndexReceiverWebhookPath(o client.Object) []string { return nil } -func (s *ReceiverServer) handlePayload() func(w http.ResponseWriter, r *http.Request) { - return func(w http.ResponseWriter, r *http.Request) { - ctx := context.Background() - digest := url.PathEscape(strings.TrimPrefix(r.RequestURI, apiv1.ReceiverWebhookPath)) +func newReceiverHandler(logger logr.Logger, kubeClient client.Client, exportHTTPPathMetrics bool) *receiverHandler { + h := &receiverHandler{ + logger: logger.WithName("receiver-server"), + kubeClient: kubeClient, + exportHTTPPathMetrics: exportHTTPPathMetrics, + ServeMux: http.NewServeMux(), + } + h.ServeMux.Handle(apiv1.ReceiverWebhookPath, http.HandlerFunc(h.handlePayload)) - s.logger.Info(fmt.Sprintf("handling request: %s", digest)) + return h +} - var allReceivers apiv1.ReceiverList - err := s.kubeClient.List(ctx, &allReceivers, client.MatchingFields{ - WebhookPathIndexKey: r.RequestURI, - }, client.Limit(1)) - if err != nil { - s.logger.Error(err, "unable to list receivers") - w.WriteHeader(http.StatusInternalServerError) - return - } +type receiverHandler struct { + logger logr.Logger + kubeClient client.Client + exportHTTPPathMetrics bool + *http.ServeMux +} - if len(allReceivers.Items) == 0 { - w.WriteHeader(http.StatusNotFound) - return - } +func (s *receiverHandler) handlePayload(w http.ResponseWriter, r *http.Request) { + ctx := r.Context() + digest := url.PathEscape(strings.TrimPrefix(r.RequestURI, apiv1.ReceiverWebhookPath)) - receiver := allReceivers.Items[0] - logger := s.logger.WithValues( - "reconciler kind", apiv1.ReceiverKind, - "name", receiver.Name, - "namespace", receiver.Namespace) + s.logger.Info(fmt.Sprintf("handling request: %s", digest)) - if receiver.Spec.Suspend || !conditions.IsReady(&receiver) { - err := errors.New("unable to process request") - if receiver.Spec.Suspend { - logger.Error(err, "receiver is suspended") - } else { - logger.Error(err, "receiver is not ready") - } - w.WriteHeader(http.StatusServiceUnavailable) - return - } + var allReceivers apiv1.ReceiverList + err := s.kubeClient.List(ctx, &allReceivers, client.MatchingFields{ + WebhookPathIndexKey: r.RequestURI, + }, client.Limit(1)) + + if err != nil { + s.logger.Error(err, "unable to list receivers") + w.WriteHeader(http.StatusInternalServerError) + return + } + + if len(allReceivers.Items) == 0 { + w.WriteHeader(http.StatusNotFound) + return + } - if err := s.validate(ctx, receiver, r); err != nil { - logger.Error(err, "unable to validate payload") - w.WriteHeader(http.StatusBadRequest) - return + receiver := allReceivers.Items[0] + logger := s.logger.WithValues( + "reconciler kind", apiv1.ReceiverKind, + "name", receiver.Name, + "namespace", receiver.Namespace) + + if receiver.Spec.Suspend || !conditions.IsReady(&receiver) { + err := errors.New("unable to process request") + if receiver.Spec.Suspend { + logger.Error(err, "receiver is suspended") + } else { + logger.Error(err, "receiver is not ready") } + w.WriteHeader(http.StatusServiceUnavailable) + return + } - var withErrors bool - for _, resource := range receiver.Spec.Resources { - if err := s.requestReconciliation(ctx, logger, resource, receiver.Namespace); err != nil { - logger.Error(err, "unable to request reconciliation") - withErrors = true - } + if err := s.validate(ctx, receiver, r); err != nil { + logger.Error(err, "unable to validate payload") + w.WriteHeader(http.StatusBadRequest) + return + } + + var withErrors bool + for _, resource := range receiver.Spec.Resources { + if err := s.requestReconciliation(ctx, logger, resource, receiver.Namespace); err != nil { + logger.Error(err, "unable to request reconciliation") + withErrors = true } + } - if withErrors { - w.WriteHeader(http.StatusInternalServerError) - } else { - w.WriteHeader(http.StatusOK) + evaluatedResources, err := s.evaluateResourceExpressions(r, receiver) + if err != nil { + logger.Error(err, "unable to evaluate resource expressions") + w.WriteHeader(http.StatusBadRequest) + return + } + + for _, resource := range evaluatedResources { + if err := s.requestReconciliation(ctx, logger, resource, receiver.Namespace); err != nil { + logger.Error(err, "unable to request reconciliation") + withErrors = true } } + + if withErrors { + w.WriteHeader(http.StatusInternalServerError) + } else { + w.WriteHeader(http.StatusOK) + } } -func (s *ReceiverServer) validate(ctx context.Context, receiver apiv1.Receiver, r *http.Request) error { +func (s *receiverHandler) validate(ctx context.Context, receiver apiv1.Receiver, r *http.Request) error { token, err := s.token(ctx, receiver) if err != nil { return fmt.Errorf("unable to read token, error: %w", err) @@ -375,7 +414,7 @@ func (s *ReceiverServer) validate(ctx context.Context, receiver apiv1.Receiver, return fmt.Errorf("recevier type '%s' not supported", receiver.Spec.Type) } -func (s *ReceiverServer) token(ctx context.Context, receiver apiv1.Receiver) (string, error) { +func (s *receiverHandler) token(ctx context.Context, receiver apiv1.Receiver) (string, error) { token := "" secretName := types.NamespacedName{ Namespace: receiver.GetNamespace(), @@ -398,7 +437,7 @@ func (s *ReceiverServer) token(ctx context.Context, receiver apiv1.Receiver) (st } // requestReconciliation requests reconciliation of all the resources matching the given CrossNamespaceObjectReference by annotating them accordingly. -func (s *ReceiverServer) requestReconciliation(ctx context.Context, logger logr.Logger, resource apiv1.CrossNamespaceObjectReference, defaultNamespace string) error { +func (s *receiverHandler) requestReconciliation(ctx context.Context, logger logr.Logger, resource apiv1.CrossNamespaceObjectReference, defaultNamespace string) error { namespace := defaultNamespace if resource.Namespace != "" { namespace = resource.Namespace @@ -481,7 +520,7 @@ func (s *ReceiverServer) requestReconciliation(ctx context.Context, logger logr. return nil } -func (s *ReceiverServer) annotate(ctx context.Context, resource *metav1.PartialObjectMetadata) error { +func (s *receiverHandler) annotate(ctx context.Context, resource *metav1.PartialObjectMetadata) error { patch := client.MergeFrom(resource.DeepCopy()) sourceAnnotations := resource.GetAnnotations() @@ -545,3 +584,135 @@ func getGroupVersion(s string) (string, string) { return slice[0], slice[1] } + +func (s *receiverHandler) evaluateResourceExpressions(r *http.Request, receiver apiv1.Receiver) ([]apiv1.CrossNamespaceObjectReference, error) { + if len(receiver.Spec.ResourceExpressions) == 0 { + return nil, nil + } + + body := map[string]any{} + // Only decodes the body for the expression if the body is JSON. + // Technically you could generate several resources without any body. + if isJSONContent(r) { + if err := json.NewDecoder(r.Body).Decode(&body); err != nil { + return nil, fmt.Errorf("failed to parse request body as JSON: %s", err) + } + } + + env, err := makeCELEnv() + if err != nil { + return nil, fmt.Errorf("failed to setup CEL environment: %s", err) + } + + var combinedResources []apiv1.CrossNamespaceObjectReference + + var returnErr error + for _, expr := range receiver.Spec.ResourceExpressions { + evaluated, evalErr := evaluate(expr, env, map[string]interface{}{ + "body": body, + }) + if evalErr != nil { + returnErr = errors.Join(returnErr, evalErr) + continue + } + combinedResources = append(combinedResources, evaluated...) + } + + return combinedResources, returnErr +} + +func evaluate(expr string, env *cel.Env, data map[string]any) ([]apiv1.CrossNamespaceObjectReference, error) { + parsed, issues := env.Parse(expr) + if issues != nil && issues.Err() != nil { + return nil, fmt.Errorf("failed to parse expression %v: %w", expr, issues.Err()) + } + + checked, issues := env.Check(parsed) + if issues != nil && issues.Err() != nil { + return nil, fmt.Errorf("expression %v check failed: %w", expr, issues.Err()) + } + + prg, err := env.Program(checked, cel.EvalOptions(cel.OptOptimize)) + if err != nil { + return nil, fmt.Errorf("expression %v failed to create a Program: %w", expr, err) + } + + out, _, err := prg.Eval(data) + if err != nil { + return nil, fmt.Errorf("expression %v failed to evaluate: %w", expr, err) + } + + switch v := out.(type) { + case traits.Lister: + return parseList(v) + case traits.Mapper: + ref, err := mapperToReference(v) + if err != nil { + return nil, err + } + return []apiv1.CrossNamespaceObjectReference{*ref}, nil + // TODO Log out other types? + } + + return nil, nil +} + +func parseList(l traits.Lister) ([]apiv1.CrossNamespaceObjectReference, error) { + var resources []apiv1.CrossNamespaceObjectReference + it := l.Iterator() + for it.HasNext().Value() == true { + switch element := it.Next().(type) { + case traits.Mapper: + cno, err := mapperToReference(element) + if err != nil { + return nil, err + } + resources = append(resources, *cno) + } + } + + return resources, nil +} + +func mapperToReference(v traits.Mapper) (*apiv1.CrossNamespaceObjectReference, error) { + raw, err := v.ConvertToNative(reflect.TypeOf(&structpb.Value{})) + if err != nil { + return nil, fmt.Errorf("failed to convert to resource reference: %s", err) + } + cno := apiv1.CrossNamespaceObjectReference{} + b, err := raw.(*structpb.Value).MarshalJSON() + if err != nil { + return nil, fmt.Errorf("converting object references to JSON: %w", err) + } + err = json.Unmarshal(b, &cno) + if err != nil { + return nil, fmt.Errorf("parsing object reference: %w", err) + } + + return &cno, err +} + +func makeCELEnv() (*cel.Env, error) { + mapStrDyn := decls.NewMapType(decls.String, decls.Dyn) + return cel.NewEnv( + celext.Strings(), + celext.Encoders(), + cel.Declarations( + decls.NewVar("body", mapStrDyn), + )) +} + +func isJSONContent(r *http.Request) bool { + contentType := r.Header.Get("Content-type") + for _, v := range strings.Split(contentType, ",") { + t, _, err := mime.ParseMediaType(v) + if err != nil { + break + } + if t == "application/json" { + return true + } + } + + return false +} diff --git a/internal/server/receiver_server.go b/internal/server/receiver_server.go index 2eb9c5760..c01c8f8bd 100644 --- a/internal/server/receiver_server.go +++ b/internal/server/receiver_server.go @@ -32,31 +32,25 @@ import ( // ReceiverServer handles webhook POST requests type ReceiverServer struct { - port string - logger logr.Logger - kubeClient client.Client - exportHTTPPathMetrics bool + port string + *receiverHandler } // NewReceiverServer returns an HTTP server that handles webhooks func NewReceiverServer(port string, logger logr.Logger, kubeClient client.Client, exportHTTPPathMetrics bool) *ReceiverServer { return &ReceiverServer{ - port: port, - logger: logger.WithName("receiver-server"), - kubeClient: kubeClient, - exportHTTPPathMetrics: exportHTTPPathMetrics, + port: port, + receiverHandler: newReceiverHandler(logger, kubeClient, exportHTTPPathMetrics), } } // ListenAndServe starts the HTTP server on the specified port func (s *ReceiverServer) ListenAndServe(stopCh <-chan struct{}, mdlw middleware.Middleware) { - mux := http.NewServeMux() - mux.Handle(apiv1.ReceiverWebhookPath, http.HandlerFunc(s.handlePayload())) handlerID := apiv1.ReceiverWebhookPath if s.exportHTTPPathMetrics { handlerID = "" } - h := std.Handler(handlerID, mdlw, mux) + h := std.Handler(handlerID, mdlw, s.receiverHandler) srv := &http.Server{ Addr: s.port, Handler: h,