Skip to content

Commit

Permalink
fix: rework the disruption cron events (#893)
Browse files Browse the repository at this point in the history
* fix: rework the disruption cron events

Remove the broadcaster from the disruption cron webhook validator to the disruption cron reconciler to be able to re-emmit missing disruption cron events and ensure the resource exist during the re-emitting of the event.

Jira: CHAOSPLT-283
  • Loading branch information
aymericDD authored Aug 7, 2024
1 parent 37f5655 commit f4c5261
Show file tree
Hide file tree
Showing 8 changed files with 809 additions and 312 deletions.
6 changes: 4 additions & 2 deletions api/v1beta1/disruption_cron_types.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,9 @@

package v1beta1

import metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
import (
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
)

func init() {
SchemeBuilder.Register(&DisruptionCron{}, &DisruptionCronList{})
Expand Down Expand Up @@ -63,7 +65,7 @@ type DisruptionCronSpec struct {
Reporting *Reporting `json:"reporting,omitempty"`
}

// TargetResource specifies the long-lived resource to be targeted for disruptions.
// TargetResourceSpec specifies the long-lived resource to be targeted for disruptions.
// DisruptionCrons are intended to exist semi-permanently, and thus appropriate targets can only be other long-lived resources,
// such as statefulsets or deployment.
type TargetResourceSpec struct {
Expand Down
31 changes: 28 additions & 3 deletions api/v1beta1/disruption_cron_webhook.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,18 +6,26 @@
package v1beta1

import (
"encoding/json"
"errors"
"strings"
"time"

"github.com/DataDog/chaos-controller/utils"
"go.uber.org/zap"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/client-go/tools/record"
ctrl "sigs.k8s.io/controller-runtime"
"sigs.k8s.io/controller-runtime/pkg/webhook"
"sigs.k8s.io/controller-runtime/pkg/webhook/admission"
)

const (
EventDisruptionCronAnnotation = "disruption_cron"
EventDisruptionAnnotation = "disruption"
)

var (
disruptionCronWebhookLogger *zap.SugaredLogger
disruptionCronWebhookRecorder record.EventRecorder
Expand Down Expand Up @@ -67,7 +75,7 @@ func (d *DisruptionCron) ValidateCreate() (admission.Warnings, error) {
}

// send informative event to disruption cron to broadcast
disruptionCronWebhookRecorder.AnnotatedEventf(d, d.GetAnnotations(), Events[EventDisruptionCronCreated].Type, string(EventDisruptionCronCreated), Events[EventDisruptionCronCreated].OnDisruptionTemplateMessage)
d.emitEvent(EventDisruptionCronCreated)

return nil, nil
}
Expand All @@ -82,7 +90,7 @@ func (d *DisruptionCron) ValidateUpdate(oldObject runtime.Object) (warnings admi
}

// send informative event to disruption cron to broadcast
disruptionCronWebhookRecorder.Event(d, Events[EventDisruptionCronUpdated].Type, string(EventDisruptionCronUpdated), Events[EventDisruptionCronUpdated].OnDisruptionTemplateMessage)
d.emitEvent(EventDisruptionCronUpdated)

return nil, nil
}
Expand All @@ -92,8 +100,25 @@ func (d *DisruptionCron) ValidateDelete() (warnings admission.Warnings, err erro

log.Infow("validating deleted disruption cron", "spec", d.Spec)

// During the validation of the deletion the timestamp does not exist so we need to set it before emitting the event
d.DeletionTimestamp = &metav1.Time{Time: time.Now()}

// send informative event to disruption cron to broadcast
disruptionCronWebhookRecorder.Event(d, Events[EventDisruptionCronDeleted].Type, string(EventDisruptionCronDeleted), Events[EventDisruptionCronDeleted].OnDisruptionTemplateMessage)
d.emitEvent(EventDisruptionCronDeleted)

return nil, nil
}

func (d *DisruptionCron) emitEvent(eventReason EventReason) {
disruptionCronJSON, err := json.Marshal(d)
if err != nil {
disruptionCronWebhookLogger.Errorw("failed to marshal disruption cron", "error", err)
return
}

annotations := map[string]string{
EventDisruptionCronAnnotation: string(disruptionCronJSON),
}

disruptionCronWebhookRecorder.AnnotatedEventf(d, annotations, Events[eventReason].Type, string(eventReason), Events[eventReason].OnDisruptionTemplateMessage)
}
81 changes: 62 additions & 19 deletions api/v1beta1/disruption_cron_webhook_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,11 +6,14 @@
package v1beta1

import (
"encoding/json"

"github.com/DataDog/chaos-controller/mocks"
"github.com/stretchr/testify/mock"
"go.uber.org/zap/zaptest"
authV1 "k8s.io/api/authentication/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime"

. "github.com/onsi/ginkgo/v2"
. "github.com/onsi/gomega"
Expand Down Expand Up @@ -46,13 +49,18 @@ var _ = Describe("DisruptionCron Webhook", func() {
When("the controller is not in delete-only mode", func() {
It("should send an EventDisruptionCronCreated event to the broadcast", func() {
// Arrange
disruptionCron := &DisruptionCron{
Spec: DisruptionCronSpec{},
disruptionCron := makeValidDisruptionCron()

disruptionCronJSON, err := json.Marshal(disruptionCron)
Expect(err).ShouldNot(HaveOccurred())

expectedAnnotation := map[string]string{
EventDisruptionCronAnnotation: string(disruptionCronJSON),
}

By("sending the EventDisruptionCronCreated event to the broadcast")
mockEventRecorder := mocks.NewEventRecorderMock(GinkgoT())
mockEventRecorder.EXPECT().AnnotatedEventf(disruptionCron, mock.Anything, Events[EventDisruptionCronCreated].Type, string(EventDisruptionCronCreated), Events[EventDisruptionCronCreated].OnDisruptionTemplateMessage)
mockEventRecorder.EXPECT().AnnotatedEventf(disruptionCron, expectedAnnotation, Events[EventDisruptionCronCreated].Type, string(EventDisruptionCronCreated), Events[EventDisruptionCronCreated].OnDisruptionTemplateMessage)
disruptionCronWebhookRecorder = mockEventRecorder

// Act
Expand Down Expand Up @@ -82,9 +90,16 @@ var _ = Describe("DisruptionCron Webhook", func() {
Groups: []string{"group1"},
})).To(Succeed())

disruptionCronJSON, err := json.Marshal(disruptionCron)
Expect(err).ShouldNot(HaveOccurred())

expectedAnnotation := map[string]string{
EventDisruptionCronAnnotation: string(disruptionCronJSON),
}

By("sending the EventDisruptionCronCreated event to the broadcast")
mockEventRecorder := mocks.NewEventRecorderMock(GinkgoT())
mockEventRecorder.EXPECT().AnnotatedEventf(disruptionCron, mock.Anything, Events[EventDisruptionCronCreated].Type, string(EventDisruptionCronCreated), Events[EventDisruptionCronCreated].OnDisruptionTemplateMessage)
mockEventRecorder.EXPECT().AnnotatedEventf(disruptionCron, expectedAnnotation, Events[EventDisruptionCronCreated].Type, string(EventDisruptionCronCreated), Events[EventDisruptionCronCreated].OnDisruptionTemplateMessage)
disruptionCronWebhookRecorder = mockEventRecorder

// Act
Expand Down Expand Up @@ -183,9 +198,16 @@ var _ = Describe("DisruptionCron Webhook", func() {
// Arrange
disruptionCron := makeValidDisruptionCron()

disruptionCronJSON, err := json.Marshal(disruptionCron)
Expect(err).ShouldNot(HaveOccurred())

expectedAnnotation := map[string]string{
EventDisruptionCronAnnotation: string(disruptionCronJSON),
}

By("sending the EventDisruptionCronUpdated event to the broadcast")
mockEventRecorder := mocks.NewEventRecorderMock(GinkgoT())
mockEventRecorder.EXPECT().Event(disruptionCron, Events[EventDisruptionCronUpdated].Type, string(EventDisruptionCronUpdated), Events[EventDisruptionCronUpdated].OnDisruptionTemplateMessage)
mockEventRecorder.EXPECT().AnnotatedEventf(disruptionCron, expectedAnnotation, Events[EventDisruptionCronUpdated].Type, string(EventDisruptionCronUpdated), Events[EventDisruptionCronUpdated].OnDisruptionTemplateMessage)
disruptionCronWebhookRecorder = mockEventRecorder

// Act
Expand All @@ -205,9 +227,16 @@ var _ = Describe("DisruptionCron Webhook", func() {
disruptionCronWebhookDeleteOnly = true
disruptionCron := makeValidDisruptionCron()

disruptionCronJSON, err := json.Marshal(disruptionCron)
Expect(err).ShouldNot(HaveOccurred())

expectedAnnotation := map[string]string{
EventDisruptionCronAnnotation: string(disruptionCronJSON),
}

By("sending the EventDisruptionCronUpdated event to the broadcast")
mockEventRecorder := mocks.NewEventRecorderMock(GinkgoT())
mockEventRecorder.EXPECT().Event(disruptionCron, Events[EventDisruptionCronUpdated].Type, string(EventDisruptionCronUpdated), Events[EventDisruptionCronUpdated].OnDisruptionTemplateMessage)
mockEventRecorder.EXPECT().AnnotatedEventf(disruptionCron, expectedAnnotation, Events[EventDisruptionCronUpdated].Type, string(EventDisruptionCronUpdated), Events[EventDisruptionCronUpdated].OnDisruptionTemplateMessage)
disruptionCronWebhookRecorder = mockEventRecorder

// Act
Expand All @@ -234,9 +263,16 @@ var _ = Describe("DisruptionCron Webhook", func() {
oldDisruptionCron := makeValidDisruptionCron()
Expect(oldDisruptionCron.SetUserInfo(userInfo)).To(Succeed())

disruptionCronJSON, err := json.Marshal(disruptionCron)
Expect(err).ShouldNot(HaveOccurred())

expectedAnnotation := map[string]string{
EventDisruptionCronAnnotation: string(disruptionCronJSON),
}

By("emit an event to the broadcast")
mockEventRecorder := mocks.NewEventRecorderMock(GinkgoT())
mockEventRecorder.EXPECT().Event(disruptionCron, Events[EventDisruptionCronUpdated].Type, string(EventDisruptionCronUpdated), Events[EventDisruptionCronUpdated].OnDisruptionTemplateMessage)
mockEventRecorder.EXPECT().AnnotatedEventf(disruptionCron, expectedAnnotation, Events[EventDisruptionCronUpdated].Type, string(EventDisruptionCronUpdated), Events[EventDisruptionCronUpdated].OnDisruptionTemplateMessage)
disruptionCronWebhookRecorder = mockEventRecorder

// Act
Expand Down Expand Up @@ -264,7 +300,7 @@ var _ = Describe("DisruptionCron Webhook", func() {

By("not emit an event to the broadcast")
mockEventRecorder := mocks.NewEventRecorderMock(GinkgoT())
mockEventRecorder.AssertNotCalled(GinkgoT(), "Event", mock.Anything, mock.Anything, mock.Anything, mock.Anything)
mockEventRecorder.AssertNotCalled(GinkgoT(), "AnnotatedEventf", mock.Anything, mock.Anything, mock.Anything, mock.Anything, mock.Anything)
disruptionCronWebhookRecorder = mockEventRecorder

// Act
Expand Down Expand Up @@ -315,15 +351,24 @@ var _ = Describe("DisruptionCron Webhook", func() {
Describe("success cases", func() {
When("the controller is not in delete-only mode", func() {
It("should send an EventDisruptionCronDeleted event to the broadcast", func() {

disruptionCron := &DisruptionCron{
// Arrange
Spec: DisruptionCronSpec{},
}
// Arrange
disruptionCron := makeValidDisruptionCron()

By("sending the EventDisruptionCronDeleted event to the broadcast")
mockEventRecorder := mocks.NewEventRecorderMock(GinkgoT())
mockEventRecorder.EXPECT().Event(disruptionCron, Events[EventDisruptionCronDeleted].Type, string(EventDisruptionCronDeleted), Events[EventDisruptionCronDeleted].OnDisruptionTemplateMessage)
mockEventRecorder.EXPECT().AnnotatedEventf(disruptionCron, mock.Anything, Events[EventDisruptionCronDeleted].Type, string(EventDisruptionCronDeleted), Events[EventDisruptionCronDeleted].OnDisruptionTemplateMessage).RunAndReturn(
func(object runtime.Object, annotations map[string]string, _ string, _ string, _ string, _ ...interface{}) {
inputDisruptionCron := object.(*DisruptionCron)

inputDisruptionCronAnnotationString := annotations[EventDisruptionCronAnnotation]
err := json.Unmarshal([]byte(inputDisruptionCronAnnotationString), inputDisruptionCron)
Expect(err).ShouldNot(HaveOccurred())

inputDisruptionCron.DeletionTimestamp = nil

Expect(inputDisruptionCron).To(Equal(disruptionCron))
},
)
disruptionCronWebhookRecorder = mockEventRecorder

// Act
Expand All @@ -339,15 +384,13 @@ var _ = Describe("DisruptionCron Webhook", func() {

When("the controller is in delete-only mode", func() {
It("should send an EventDisruptionCronDeleted event to the broadcast", func() {
// Arrange
disruptionCronWebhookDeleteOnly = true
disruptionCron := &DisruptionCron{
// Arrange
Spec: DisruptionCronSpec{},
}
disruptionCron := makeValidDisruptionCron()

By("sending the EventDisruptionCronDeleted event to the broadcast")
mockEventRecorder := mocks.NewEventRecorderMock(GinkgoT())
mockEventRecorder.EXPECT().Event(disruptionCron, Events[EventDisruptionCronDeleted].Type, string(EventDisruptionCronDeleted), Events[EventDisruptionCronDeleted].OnDisruptionTemplateMessage)
mockEventRecorder.EXPECT().AnnotatedEventf(disruptionCron, mock.Anything, Events[EventDisruptionCronDeleted].Type, string(EventDisruptionCronDeleted), Events[EventDisruptionCronDeleted].OnDisruptionTemplateMessage)
disruptionCronWebhookRecorder = mockEventRecorder

// Act
Expand Down
41 changes: 26 additions & 15 deletions api/v1beta1/disruption_webhook.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ package v1beta1

import (
"context"
"encoding/json"
"errors"
"fmt"
"strings"
Expand Down Expand Up @@ -119,16 +120,16 @@ var _ webhook.Validator = &Disruption{}

// ValidateCreate implements webhook.Validator so a webhook will be registered for the type
func (r *Disruption) ValidateCreate() (admission.Warnings, error) {
logger := logger.With("disruptionName", r.Name, "disruptionNamespace", r.Namespace)
log := logger.With("disruptionName", r.Name, "disruptionNamespace", r.Namespace)

ctx, err := r.SpanContext(context.Background())
if err != nil {
logger.Errorw("did not find span context", "err", err)
log.Errorw("did not find span context", "err", err)
} else {
logger = logger.With(tracerSink.GetLoggableTraceContext(trace.SpanFromContext(ctx))...)
log = log.With(tracerSink.GetLoggableTraceContext(trace.SpanFromContext(ctx))...)
}

logger.Infow("validating created disruption", "spec", r.Spec)
log.Infow("validating created disruption", "spec", r.Spec)

// delete-only mode, reject everything trying to be created
if deleteOnly {
Expand Down Expand Up @@ -204,7 +205,7 @@ func (r *Disruption) ValidateCreate() (admission.Warnings, error) {

if err := r.Spec.Validate(); err != nil {
if mErr := metricsSink.MetricValidationFailed(r.getMetricsTags()); mErr != nil {
logger.Errorw("error sending a metric", "error", mErr)
log.Errorw("error sending a metric", "error", mErr)
}

return nil, err
Expand Down Expand Up @@ -246,19 +247,29 @@ func (r *Disruption) ValidateCreate() (admission.Warnings, error) {

// send validation metric
if err := metricsSink.MetricValidationCreated(r.getMetricsTags()); err != nil {
logger.Errorw("error sending a metric", "error", err)
log.Errorw("error sending a metric", "error", err)
}

// send informative event to disruption to broadcast
recorder.AnnotatedEventf(r, r.GetAnnotations(), Events[EventDisruptionCreated].Type, string(EventDisruptionCreated), Events[EventDisruptionCreated].OnDisruptionTemplateMessage)
disruptionJSON, err := json.Marshal(r)
if err != nil {
log.Errorw("failed to marshal disruption", "error", err)
return nil, nil
}

annotations := map[string]string{
EventDisruptionAnnotation: string(disruptionJSON),
}

recorder.AnnotatedEventf(r, annotations, Events[EventDisruptionCreated].Type, string(EventDisruptionCreated), Events[EventDisruptionCreated].OnDisruptionTemplateMessage)

return nil, nil
}

// ValidateUpdate implements webhook.Validator so a webhook will be registered for the type
func (r *Disruption) ValidateUpdate(old runtime.Object) (admission.Warnings, error) {
logger := logger.With("disruptionName", r.Name, "disruptionNamespace", r.Namespace)
logger.Debugw("validating updated disruption", "spec", r.Spec)
log := logger.With("disruptionName", r.Name, "disruptionNamespace", r.Namespace)
log.Debugw("validating updated disruption", "spec", r.Spec)

var err error

Expand All @@ -272,7 +283,7 @@ func (r *Disruption) ValidateUpdate(old runtime.Object) (admission.Warnings, err
// we should NOT always prevent finalizer removal because chaos controller reconcile loop will go through this mutating webhook when perfoming updates
// and need to be able to remove the finalizer to enable the disruption to be garbage collected on successful removal
if controllerutil.ContainsFinalizer(oldDisruption, chaostypes.DisruptionFinalizer) && !controllerutil.ContainsFinalizer(r, chaostypes.DisruptionFinalizer) {
oldPods, err := GetChaosPods(context.Background(), logger, chaosNamespace, k8sClient, oldDisruption, nil)
oldPods, err := GetChaosPods(context.Background(), log, chaosNamespace, k8sClient, oldDisruption, nil)
if err != nil {
return nil, fmt.Errorf("error getting disruption pods: %w", err)
}
Expand All @@ -285,7 +296,7 @@ func (r *Disruption) ValidateUpdate(old runtime.Object) (admission.Warnings, err

metricTags := append(r.getMetricsTags(), "prevent_finalizer_removal:true")
if mErr := metricsSink.MetricValidationFailed(metricTags); mErr != nil {
logger.Errorw("error sending a metric", "error", mErr)
log.Errorw("error sending a metric", "error", mErr)
}

return nil, fmt.Errorf(`unable to remove disruption finalizer, disruption '%s/%s' still has associated pods:
Expand Down Expand Up @@ -319,10 +330,10 @@ You first need to remove those chaos pods (and potentially their finalizers) to
}
}

logger.Debugw("comparing disruption spec hashes", "oldHash", oldHash, "newHash", newHash)
log.Debugw("comparing disruption spec hashes", "oldHash", oldHash, "newHash", newHash)

if oldHash != newHash {
logger.Errorw("error when comparing disruption spec hashes", "oldHash", oldHash, "newHash", newHash)
log.Errorw("error when comparing disruption spec hashes", "oldHash", oldHash, "newHash", newHash)

if oldDisruption.Spec.StaticTargeting {
return nil, fmt.Errorf("[StaticTargeting: true] a disruption spec cannot be updated, please delete and recreate it if needed")
Expand All @@ -333,15 +344,15 @@ You first need to remove those chaos pods (and potentially their finalizers) to

if err := r.Spec.Validate(); err != nil {
if mErr := metricsSink.MetricValidationFailed(r.getMetricsTags()); mErr != nil {
logger.Errorw("error sending a metric", "error", mErr)
log.Errorw("error sending a metric", "error", mErr)
}

return nil, err
}

// send validation metric
if err := metricsSink.MetricValidationUpdated(r.getMetricsTags()); err != nil {
logger.Errorw("error sending a metric", "error", err)
log.Errorw("error sending a metric", "error", err)
}

return nil, nil
Expand Down
Loading

0 comments on commit f4c5261

Please sign in to comment.