Skip to content

Commit

Permalink
suggested improvements after PR review
Browse files Browse the repository at this point in the history
Signed-off-by: Josef Karasek <[email protected]>
  • Loading branch information
josefkarasek committed Nov 8, 2023
1 parent 0fe1af5 commit 58db5bc
Show file tree
Hide file tree
Showing 9 changed files with 225 additions and 149 deletions.
28 changes: 28 additions & 0 deletions apis/eventing/v1alpha1/condition_types.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
/*
Copyright 2023 The KEDA Authors
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package v1alpha1

const (
// CloudEventSourceConditionActiveReason defines the active condition reason for CloudEventSource
CloudEventSourceConditionActiveReason = "CloudEventSourceActive"
// CloudEventSourceConditionFailedReason defines the failed condition reason for CloudEventSource
CloudEventSourceConditionFailedReason = "CloudEventSourceFailed"
// CloudEventSourceConditionActiveMessage defines the active condition message for CloudEventSource
CloudEventSourceConditionActiveMessage = "Is configured to send events to the configured destination"
// CloudEventSourceConditionFailedMessage defines the failed condition message for CloudEventSource
CloudEventSourceConditionFailedMessage = "Failed to send events to the configured destination"
)
4 changes: 2 additions & 2 deletions apis/keda/v1alpha1/condition_types.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,8 +37,8 @@ const (
)

const (
// ScaledObjectConditionReadySucccesReason defines the default Reason for correct ScaledObject
ScaledObjectConditionReadySucccesReason = "ScaledObjectReady"
// ScaledObjectConditionReadySuccessReason defines the default Reason for correct ScaledObject
ScaledObjectConditionReadySuccessReason = "ScaledObjectReady"
// ScaledObjectConditionReadySuccessMessage defines the default Message for correct ScaledObject
ScaledObjectConditionReadySuccessMessage = "ScaledObject is defined correctly and is ready for scaling"
// ScaledObjectConditionPausedReason defines the default Reason for paused ScaledObject
Expand Down
10 changes: 5 additions & 5 deletions cmd/operator/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -220,7 +220,7 @@ func main() {
Recorder: eventRecorder,
ScaleClient: scaleClient,
ScaleHandler: scaledHandler,
EventEmitter: *eventEmitter,
EventEmitter: eventEmitter,
}).SetupWithManager(mgr, controller.Options{
MaxConcurrentReconciles: scaledObjectMaxReconciles,
}); err != nil {
Expand Down Expand Up @@ -254,10 +254,10 @@ func main() {
setupLog.Error(err, "unable to create controller", "controller", "ClusterTriggerAuthentication")
os.Exit(1)
}
if err = (&eventingcontrollers.CloudEventSourceReconciler{
Client: mgr.GetClient(),
EventEmitter: *eventEmitter,
}).SetupWithManager(mgr); err != nil {
if err = (eventingcontrollers.NewCloudEventSourceReconciler(
mgr.GetClient(),
eventEmitter,
)).SetupWithManager(mgr); err != nil {
setupLog.Error(err, "unable to create controller", "controller", "CloudEventSource")
os.Exit(1)
}
Expand Down
50 changes: 24 additions & 26 deletions controllers/eventing/cloudeventsource_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,23 +38,22 @@ import (
// CloudEventSourceReconciler reconciles a EventSource object
type CloudEventSourceReconciler struct {
client.Client
EventEmitter eventemitter.EventEmitter
eventEmitter eventemitter.EventHandler

cloudEventSourceGenerations *sync.Map
eventSourcePromMetricsMap map[string]string
eventSourcePromMetricsLock *sync.Mutex
}

type cloudEventSourceMetricsData struct {
namespace string
}

var (
eventSourcePromMetricsMap map[string]cloudEventSourceMetricsData
eventSourcePromMetricsLock *sync.Mutex
)

func init() {
eventSourcePromMetricsMap = make(map[string]cloudEventSourceMetricsData)
eventSourcePromMetricsLock = &sync.Mutex{}
// NewCloudEventSourceReconciler creates a new CloudEventSourceReconciler
func NewCloudEventSourceReconciler(c client.Client, e eventemitter.EventHandler) *CloudEventSourceReconciler {
return &CloudEventSourceReconciler{
Client: c,
eventEmitter: e,
cloudEventSourceGenerations: &sync.Map{},
eventSourcePromMetricsMap: make(map[string]string),
eventSourcePromMetricsLock: &sync.Mutex{},
}
}

// +kubebuilder:rbac:groups=eventing.keda.sh,resources=cloudeventsources;cloudeventsources/status,verbs="*"
Expand Down Expand Up @@ -114,7 +113,6 @@ func (r *CloudEventSourceReconciler) Reconcile(ctx context.Context, req ctrl.Req

// SetupWithManager sets up the controller with the Manager.
func (r *CloudEventSourceReconciler) SetupWithManager(mgr ctrl.Manager) error {
r.cloudEventSourceGenerations = &sync.Map{}
return ctrl.NewControllerManagedBy(mgr).
For(&eventingv1alpha1.CloudEventSource{}, builder.WithPredicates(predicate.GenerationChangedPredicate{})).
Complete(r)
Expand All @@ -130,7 +128,7 @@ func (r *CloudEventSourceReconciler) requestEventLoop(ctx context.Context, logge
return err
}

if err = r.EventEmitter.HandleCloudEventSource(ctx, eventSource); err != nil {
if err = r.eventEmitter.HandleCloudEventSource(ctx, eventSource); err != nil {
return err
}

Expand All @@ -148,7 +146,7 @@ func (r *CloudEventSourceReconciler) stopEventLoop(logger logr.Logger, eventSour
return err
}

if err := r.EventEmitter.DeleteCloudEventSource(eventSource); err != nil {
if err := r.eventEmitter.DeleteCloudEventSource(eventSource); err != nil {
return err
}
// delete CloudEventSource's current Generation
Expand All @@ -175,25 +173,25 @@ func (r *CloudEventSourceReconciler) cloudEventSourceGenerationChanged(logger lo
}

func (r *CloudEventSourceReconciler) updatePromMetrics(eventSource *eventingv1alpha1.CloudEventSource, namespacedName string) {
eventSourcePromMetricsLock.Lock()
defer eventSourcePromMetricsLock.Unlock()
r.eventSourcePromMetricsLock.Lock()
defer r.eventSourcePromMetricsLock.Unlock()

if metricsData, ok := eventSourcePromMetricsMap[namespacedName]; ok {
metricscollector.DecrementCRDTotal(metricscollector.CloudEventSourceResource, metricsData.namespace)
if ns, ok := r.eventSourcePromMetricsMap[namespacedName]; ok {
metricscollector.DecrementCRDTotal(metricscollector.CloudEventSourceResource, ns)
}

metricscollector.IncrementCRDTotal(metricscollector.CloudEventSourceResource, eventSource.Namespace)
eventSourcePromMetricsMap[namespacedName] = cloudEventSourceMetricsData{namespace: eventSource.Namespace}
r.eventSourcePromMetricsMap[namespacedName] = eventSource.Namespace
}

// UpdatePromMetricsOnDelete is idempotent, so it can be called multiple times without side-effects
func (r *CloudEventSourceReconciler) UpdatePromMetricsOnDelete(namespacedName string) {
eventSourcePromMetricsLock.Lock()
defer eventSourcePromMetricsLock.Unlock()
r.eventSourcePromMetricsLock.Lock()
defer r.eventSourcePromMetricsLock.Unlock()

if metricsData, ok := eventSourcePromMetricsMap[namespacedName]; ok {
metricscollector.DecrementCRDTotal(metricscollector.CloudEventSourceResource, metricsData.namespace)
if ns, ok := r.eventSourcePromMetricsMap[namespacedName]; ok {
metricscollector.DecrementCRDTotal(metricscollector.CloudEventSourceResource, ns)
}

delete(eventSourcePromMetricsMap, namespacedName)
delete(r.eventSourcePromMetricsMap, namespacedName)
}
4 changes: 2 additions & 2 deletions controllers/keda/scaledobject_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -69,7 +69,7 @@ type ScaledObjectReconciler struct {
Recorder record.EventRecorder
ScaleClient scale.ScalesGetter
ScaleHandler scaling.ScaleHandler
EventEmitter eventemitter.EventEmitter
EventEmitter eventemitter.EventHandler

restMapper meta.RESTMapper
scaledObjectsGenerations *sync.Map
Expand Down Expand Up @@ -191,7 +191,7 @@ func (r *ScaledObjectReconciler) Reconcile(ctx context.Context, req ctrl.Request
r.EventEmitter.Emit(scaledObject, req.NamespacedName, corev1.EventTypeNormal, eventreason.ScaledObjectReady, "ScaledObject is ready for scaling")
}
reqLogger.V(1).Info(msg)
conditions.SetReadyCondition(metav1.ConditionTrue, kedav1alpha1.ScaledObjectConditionReadySucccesReason, msg)
conditions.SetReadyCondition(metav1.ConditionTrue, kedav1alpha1.ScaledObjectConditionReadySuccessReason, msg)
}

if err := kedastatus.SetStatusConditions(ctx, r.Client, reqLogger, scaledObject, &conditions); err != nil {
Expand Down
2 changes: 1 addition & 1 deletion controllers/keda/suite_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -97,7 +97,7 @@ var _ = BeforeSuite(func() {
Recorder: k8sManager.GetEventRecorderFor("keda-operator"),
ScaleHandler: scaling.NewScaleHandler(k8sManager.GetClient(), scaleClient, k8sManager.GetScheme(), time.Duration(10), k8sManager.GetEventRecorderFor("keda-operator"), nil),
ScaleClient: scaleClient,
EventEmitter: *eventemitter.NewEventEmitter(k8sManager.GetClient(), k8sManager.GetEventRecorderFor("keda-operator"), "kubernetes-default"),
EventEmitter: eventemitter.NewEventEmitter(k8sManager.GetClient(), k8sManager.GetEventRecorderFor("keda-operator"), "kubernetes-default"),
}).SetupWithManager(k8sManager, controller.Options{})
Expect(err).ToNot(HaveOccurred())

Expand Down
40 changes: 22 additions & 18 deletions pkg/eventemitter/cloudevent_http_handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,13 +31,17 @@ import (
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
)

const (
cloudEventSourceType = "com.cloudeventsource.keda"
)

type CloudEventHTTPHandler struct {
Endpoint string
Client cloudevents.Client
ClusterName string
ActiveStatus metav1.ConditionStatus
ctx context.Context
logger logr.Logger
endpoint string
client cloudevents.Client
clusterName string
activeStatus metav1.ConditionStatus
}

func NewCloudEventHTTPHandler(context context.Context, clusterName string, uri string, logger logr.Logger) (*CloudEventHTTPHandler, error) {
Expand All @@ -53,47 +57,47 @@ func NewCloudEventHTTPHandler(context context.Context, clusterName string, uri s

logger.Info("Create new cloudevents http handler with endPoint: " + uri)
return &CloudEventHTTPHandler{
Client: client,
Endpoint: uri,
ClusterName: clusterName,
ActiveStatus: metav1.ConditionTrue,
client: client,
endpoint: uri,
clusterName: clusterName,
activeStatus: metav1.ConditionTrue,
ctx: ctx,
logger: logger,
}, nil
}

func (c *CloudEventHTTPHandler) SetActiveStatus(status metav1.ConditionStatus) {
c.ActiveStatus = status
c.activeStatus = status
}

func (c *CloudEventHTTPHandler) GetActiveStatus() metav1.ConditionStatus {
return c.ActiveStatus
return c.activeStatus
}

func (c *CloudEventHTTPHandler) CloseHandler() {

c.logger.V(1).Info("Closing CloudEvent HTTP handler")
}

func (c *CloudEventHTTPHandler) EmitEvent(eventData EventData, failureFunc func(eventData EventData, err error)) {
source := "/" + c.ClusterName + "/" + eventData.namespace + "/keda"
subject := "/" + c.ClusterName + "/" + eventData.namespace + "/workload/" + eventData.objectName
source := "/" + c.clusterName + "/" + eventData.namespace + "/keda"
subject := "/" + c.clusterName + "/" + eventData.namespace + "/workload/" + eventData.objectName

event := cloudevents.NewEvent()
event.SetSource(source)
event.SetSubject(subject)
event.SetType(CloudEventSourceType)
event.SetType(cloudEventSourceType)

if err := event.SetData(cloudevents.ApplicationJSON, EmitData{Reason: eventData.reason, Message: eventData.message}); err != nil {
c.logger.Error(err, "Failed to set data to cloudevent")
c.logger.Error(err, "Failed to set data to CloudEvents receiver")
return
}

err := c.Client.Send(c.ctx, event)
err := c.client.Send(c.ctx, event)
if protocol.IsNACK(err) || protocol.IsUndelivered(err) {
c.logger.Error(err, "Failed to send event to cloudevent")
c.logger.Error(err, "Failed to send event to CloudEvents receiver")
failureFunc(eventData, err)
return
}

c.logger.V(1).Info("Publish Event to CloudEvents receiver Successfully")
c.logger.V(1).Info("Successfully published event to CloudEvents receiver")
}
Loading

0 comments on commit 58db5bc

Please sign in to comment.