From e28d41374a96680cb6aa0db0895c98853a3d262e Mon Sep 17 00:00:00 2001 From: Vishal Choudhary Date: Thu, 4 Jul 2024 20:01:06 +0530 Subject: [PATCH] fix: use resource version while watching (#157) * fix: use resource version while watching Signed-off-by: Vishal Choudhary * fix: dont append Signed-off-by: Vishal Choudhary * fix: deepcopy Signed-off-by: Vishal Choudhary * fix: wrong var Signed-off-by: Vishal Choudhary --------- Signed-off-by: Vishal Choudhary --- pkg/api/cephr.go | 41 +++++++++++++++++++++++++++++++++++------ pkg/api/cpolr.go | 41 +++++++++++++++++++++++++++++++++++------ pkg/api/ephr.go | 41 +++++++++++++++++++++++++++++++++++------ pkg/api/filter.go | 2 ++ pkg/api/polr.go | 41 +++++++++++++++++++++++++++++++++++------ 5 files changed, 142 insertions(+), 24 deletions(-) diff --git a/pkg/api/cephr.go b/pkg/api/cephr.go index 9980d52..bc83bd2 100644 --- a/pkg/api/cephr.go +++ b/pkg/api/cephr.go @@ -73,10 +73,8 @@ func (c *cephrStore) List(ctx context.Context, options *metainternalversion.List // } cephrList := &reportsv1.ClusterEphemeralReportList{ - Items: make([]reportsv1.ClusterEphemeralReport, 0), - ListMeta: metav1.ListMeta{ - ResourceVersion: "1", - }, + Items: make([]reportsv1.ClusterEphemeralReport, 0), + ListMeta: metav1.ListMeta{}, } var desiredRv uint64 if len(options.ResourceVersion) == 0 { @@ -143,6 +141,7 @@ func (c *cephrStore) Create(ctx context.Context, obj runtime.Object, createValid } cephr.Annotations = labelReports(cephr.Annotations) + cephr.Generation += 1 klog.Infof("creating cluster ephemeral reports name=%s", cephr.Name) if !isDryRun { r, err := c.createCephr(cephr) @@ -201,6 +200,7 @@ func (c *cephrStore) Update(ctx context.Context, name string, objInfo rest.Updat } cephr.Annotations = labelReports(cephr.Annotations) + cephr.Generation += 1 klog.Infof("updating cluster ephemeral reports name=%s", cephr.Name) if !isDryRun { r, err := c.updateCephr(cephr, oldObj) @@ -271,8 +271,37 @@ func (c *cephrStore) DeleteCollection(ctx context.Context, deleteValidation rest return cephrList, nil } -func (c *cephrStore) Watch(ctx context.Context, _ *metainternalversion.ListOptions) (watch.Interface, error) { - return c.broadcaster.Watch() +func (c *cephrStore) Watch(ctx context.Context, options *metainternalversion.ListOptions) (watch.Interface, error) { + switch options.ResourceVersion { + case "", "0": + return c.broadcaster.Watch() + default: + break + } + items, err := c.List(ctx, options) + if err != nil { + return nil, err + } + list, ok := items.(*reportsv1.ClusterEphemeralReportList) + if !ok { + return nil, fmt.Errorf("failed to convert runtime object into cluster ephemeral report list") + } + events := make([]watch.Event, len(list.Items)) + for i, pol := range list.Items { + report := pol.DeepCopy() + if report.Generation == 1 || report.Generation == 0 { + events[i] = watch.Event{ + Type: watch.Added, + Object: report, + } + } else { + events[i] = watch.Event{ + Type: watch.Modified, + Object: report, + } + } + } + return c.broadcaster.WatchWithPrefix(events) } func (c *cephrStore) ConvertToTable(ctx context.Context, object runtime.Object, tableOptions runtime.Object) (*metav1beta1.Table, error) { diff --git a/pkg/api/cpolr.go b/pkg/api/cpolr.go index 498e10f..e0d6070 100644 --- a/pkg/api/cpolr.go +++ b/pkg/api/cpolr.go @@ -73,10 +73,8 @@ func (c *cpolrStore) List(ctx context.Context, options *metainternalversion.List // } cpolrList := &v1alpha2.ClusterPolicyReportList{ - Items: make([]v1alpha2.ClusterPolicyReport, 0), - ListMeta: metav1.ListMeta{ - ResourceVersion: "1", - }, + Items: make([]v1alpha2.ClusterPolicyReport, 0), + ListMeta: metav1.ListMeta{}, } var desiredRv uint64 if len(options.ResourceVersion) == 0 { @@ -143,6 +141,7 @@ func (c *cpolrStore) Create(ctx context.Context, obj runtime.Object, createValid } cpolr.Annotations = labelReports(cpolr.Annotations) + cpolr.Generation = 1 klog.Infof("creating cluster policy report name=%s", cpolr.Name) if !isDryRun { r, err := c.createCpolr(cpolr) @@ -200,6 +199,7 @@ func (c *cpolrStore) Update(ctx context.Context, name string, objInfo rest.Updat return nil, false, errors.NewBadRequest("failed to validate cluster policy report") } cpolr.Annotations = labelReports(cpolr.Annotations) + cpolr.Generation += 1 klog.Infof("updating cluster policy report name=%s", cpolr.Name) if !isDryRun { r, err := c.updateCpolr(cpolr, oldObj) @@ -270,8 +270,37 @@ func (c *cpolrStore) DeleteCollection(ctx context.Context, deleteValidation rest return cpolrList, nil } -func (c *cpolrStore) Watch(ctx context.Context, _ *metainternalversion.ListOptions) (watch.Interface, error) { - return c.broadcaster.Watch() +func (c *cpolrStore) Watch(ctx context.Context, options *metainternalversion.ListOptions) (watch.Interface, error) { + switch options.ResourceVersion { + case "", "0": + return c.broadcaster.Watch() + default: + break + } + items, err := c.List(ctx, options) + if err != nil { + return nil, err + } + list, ok := items.(*v1alpha2.ClusterPolicyReportList) + if !ok { + return nil, fmt.Errorf("failed to convert runtime object into cluster policy report list") + } + events := make([]watch.Event, len(list.Items)) + for i, pol := range list.Items { + report := pol.DeepCopy() + if report.Generation == 1 || report.Generation == 0 { + events[i] = watch.Event{ + Type: watch.Added, + Object: report, + } + } else { + events[i] = watch.Event{ + Type: watch.Modified, + Object: report, + } + } + } + return c.broadcaster.WatchWithPrefix(events) } func (c *cpolrStore) ConvertToTable(ctx context.Context, object runtime.Object, tableOptions runtime.Object) (*metav1beta1.Table, error) { diff --git a/pkg/api/ephr.go b/pkg/api/ephr.go index 8c31c5d..66c0759 100644 --- a/pkg/api/ephr.go +++ b/pkg/api/ephr.go @@ -76,10 +76,8 @@ func (p *ephrStore) List(ctx context.Context, options *metainternalversion.ListO // } ephrList := &reportsv1.EphemeralReportList{ - Items: make([]reportsv1.EphemeralReport, 0), - ListMeta: metav1.ListMeta{ - ResourceVersion: "1", - }, + Items: make([]reportsv1.EphemeralReport, 0), + ListMeta: metav1.ListMeta{}, } var desiredRv uint64 if len(options.ResourceVersion) == 0 { @@ -154,6 +152,7 @@ func (p *ephrStore) Create(ctx context.Context, obj runtime.Object, createValida } ephr.Annotations = labelReports(ephr.Annotations) + ephr.Generation = 1 klog.Infof("creating ephemeral reports name=%s namespace=%s", ephr.Name, ephr.Namespace) if !isDryRun { r, err := p.createEphr(ephr) @@ -218,6 +217,7 @@ func (p *ephrStore) Update(ctx context.Context, name string, objInfo rest.Update } ephr.Annotations = labelReports(ephr.Annotations) + ephr.Generation += 1 klog.Infof("updating ephemeral reports name=%s namespace=%s", ephr.Name, ephr.Namespace) if !isDryRun { r, err := p.updateEphr(ephr, oldObj) @@ -291,8 +291,37 @@ func (p *ephrStore) DeleteCollection(ctx context.Context, deleteValidation rest. return ephrList, nil } -func (p *ephrStore) Watch(ctx context.Context, _ *metainternalversion.ListOptions) (watch.Interface, error) { - return p.broadcaster.Watch() +func (p *ephrStore) Watch(ctx context.Context, options *metainternalversion.ListOptions) (watch.Interface, error) { + switch options.ResourceVersion { + case "", "0": + return p.broadcaster.Watch() + default: + break + } + items, err := p.List(ctx, options) + if err != nil { + return nil, err + } + list, ok := items.(*reportsv1.EphemeralReportList) + if !ok { + return nil, fmt.Errorf("failed to convert runtime object into ephemeral report list") + } + events := make([]watch.Event, len(list.Items)) + for i, pol := range list.Items { + report := pol.DeepCopy() + if report.Generation == 1 || report.Generation == 0 { + events[i] = watch.Event{ + Type: watch.Added, + Object: report, + } + } else { + events[i] = watch.Event{ + Type: watch.Modified, + Object: report, + } + } + } + return p.broadcaster.WatchWithPrefix(events) } func (p *ephrStore) ConvertToTable(ctx context.Context, object runtime.Object, tableOptions runtime.Object) (*metav1beta1.Table, error) { diff --git a/pkg/api/filter.go b/pkg/api/filter.go index 58ad5dc..05aa0a1 100644 --- a/pkg/api/filter.go +++ b/pkg/api/filter.go @@ -22,6 +22,8 @@ func allowObjectListWatch(object metav1.ObjectMeta, labelSelector labels.Selecto if rv != desiredRv { return false, 0, nil } + default: + break } if labelSelector == nil { diff --git a/pkg/api/polr.go b/pkg/api/polr.go index e82785d..c46d45a 100644 --- a/pkg/api/polr.go +++ b/pkg/api/polr.go @@ -76,10 +76,8 @@ func (p *polrStore) List(ctx context.Context, options *metainternalversion.ListO // } polrList := &v1alpha2.PolicyReportList{ - Items: make([]v1alpha2.PolicyReport, 0), - ListMeta: metav1.ListMeta{ - ResourceVersion: "1", - }, + Items: make([]v1alpha2.PolicyReport, 0), + ListMeta: metav1.ListMeta{}, } var desiredRv uint64 if len(options.ResourceVersion) == 0 { @@ -154,6 +152,7 @@ func (p *polrStore) Create(ctx context.Context, obj runtime.Object, createValida } polr.Annotations = labelReports(polr.Annotations) + polr.Generation = 1 klog.Infof("creating policy reports name=%s namespace=%s", polr.Name, polr.Namespace) if !isDryRun { r, err := p.createPolr(polr) @@ -218,6 +217,7 @@ func (p *polrStore) Update(ctx context.Context, name string, objInfo rest.Update } polr.Annotations = labelReports(polr.Annotations) + polr.Generation += 1 klog.Infof("updating policy reports name=%s namespace=%s", polr.Name, polr.Namespace) if !isDryRun { r, err := p.updatePolr(polr, oldObj) @@ -291,8 +291,37 @@ func (p *polrStore) DeleteCollection(ctx context.Context, deleteValidation rest. return polrList, nil } -func (p *polrStore) Watch(ctx context.Context, _ *metainternalversion.ListOptions) (watch.Interface, error) { - return p.broadcaster.Watch() +func (p *polrStore) Watch(ctx context.Context, options *metainternalversion.ListOptions) (watch.Interface, error) { + switch options.ResourceVersion { + case "", "0": + return p.broadcaster.Watch() + default: + break + } + items, err := p.List(ctx, options) + if err != nil { + return nil, err + } + list, ok := items.(*v1alpha2.PolicyReportList) + if !ok { + return nil, fmt.Errorf("failed to convert runtime object into policy report list") + } + events := make([]watch.Event, len(list.Items)) + for i, pol := range list.Items { + report := pol.DeepCopy() + if report.Generation == 1 || report.Generation == 0 { + events[i] = watch.Event{ + Type: watch.Added, + Object: report, + } + } else { + events[i] = watch.Event{ + Type: watch.Modified, + Object: report, + } + } + } + return p.broadcaster.WatchWithPrefix(events) } func (p *polrStore) ConvertToTable(ctx context.Context, object runtime.Object, tableOptions runtime.Object) (*metav1beta1.Table, error) {