Skip to content

Commit

Permalink
fix: use resource version while watching (#157)
Browse files Browse the repository at this point in the history
* fix: use resource version while watching

Signed-off-by: Vishal Choudhary <[email protected]>

* fix: dont append

Signed-off-by: Vishal Choudhary <[email protected]>

* fix: deepcopy

Signed-off-by: Vishal Choudhary <[email protected]>

* fix: wrong var

Signed-off-by: Vishal Choudhary <[email protected]>

---------

Signed-off-by: Vishal Choudhary <[email protected]>
  • Loading branch information
vishal-chdhry authored Jul 4, 2024
1 parent 19dc442 commit e28d413
Show file tree
Hide file tree
Showing 5 changed files with 142 additions and 24 deletions.
41 changes: 35 additions & 6 deletions pkg/api/cephr.go
Original file line number Diff line number Diff line change
Expand Up @@ -73,10 +73,8 @@ func (c *cephrStore) List(ctx context.Context, options *metainternalversion.List
// }

cephrList := &reportsv1.ClusterEphemeralReportList{
Items: make([]reportsv1.ClusterEphemeralReport, 0),
ListMeta: metav1.ListMeta{
ResourceVersion: "1",
},
Items: make([]reportsv1.ClusterEphemeralReport, 0),
ListMeta: metav1.ListMeta{},
}
var desiredRv uint64
if len(options.ResourceVersion) == 0 {
Expand Down Expand Up @@ -143,6 +141,7 @@ func (c *cephrStore) Create(ctx context.Context, obj runtime.Object, createValid
}

cephr.Annotations = labelReports(cephr.Annotations)
cephr.Generation += 1
klog.Infof("creating cluster ephemeral reports name=%s", cephr.Name)
if !isDryRun {
r, err := c.createCephr(cephr)
Expand Down Expand Up @@ -201,6 +200,7 @@ func (c *cephrStore) Update(ctx context.Context, name string, objInfo rest.Updat
}

cephr.Annotations = labelReports(cephr.Annotations)
cephr.Generation += 1
klog.Infof("updating cluster ephemeral reports name=%s", cephr.Name)
if !isDryRun {
r, err := c.updateCephr(cephr, oldObj)
Expand Down Expand Up @@ -271,8 +271,37 @@ func (c *cephrStore) DeleteCollection(ctx context.Context, deleteValidation rest
return cephrList, nil
}

func (c *cephrStore) Watch(ctx context.Context, _ *metainternalversion.ListOptions) (watch.Interface, error) {
return c.broadcaster.Watch()
func (c *cephrStore) Watch(ctx context.Context, options *metainternalversion.ListOptions) (watch.Interface, error) {
switch options.ResourceVersion {
case "", "0":
return c.broadcaster.Watch()
default:
break
}
items, err := c.List(ctx, options)
if err != nil {
return nil, err
}
list, ok := items.(*reportsv1.ClusterEphemeralReportList)
if !ok {
return nil, fmt.Errorf("failed to convert runtime object into cluster ephemeral report list")
}
events := make([]watch.Event, len(list.Items))
for i, pol := range list.Items {
report := pol.DeepCopy()
if report.Generation == 1 || report.Generation == 0 {
events[i] = watch.Event{
Type: watch.Added,
Object: report,
}
} else {
events[i] = watch.Event{
Type: watch.Modified,
Object: report,
}
}
}
return c.broadcaster.WatchWithPrefix(events)
}

func (c *cephrStore) ConvertToTable(ctx context.Context, object runtime.Object, tableOptions runtime.Object) (*metav1beta1.Table, error) {
Expand Down
41 changes: 35 additions & 6 deletions pkg/api/cpolr.go
Original file line number Diff line number Diff line change
Expand Up @@ -73,10 +73,8 @@ func (c *cpolrStore) List(ctx context.Context, options *metainternalversion.List
// }

cpolrList := &v1alpha2.ClusterPolicyReportList{
Items: make([]v1alpha2.ClusterPolicyReport, 0),
ListMeta: metav1.ListMeta{
ResourceVersion: "1",
},
Items: make([]v1alpha2.ClusterPolicyReport, 0),
ListMeta: metav1.ListMeta{},
}
var desiredRv uint64
if len(options.ResourceVersion) == 0 {
Expand Down Expand Up @@ -143,6 +141,7 @@ func (c *cpolrStore) Create(ctx context.Context, obj runtime.Object, createValid
}

cpolr.Annotations = labelReports(cpolr.Annotations)
cpolr.Generation = 1
klog.Infof("creating cluster policy report name=%s", cpolr.Name)
if !isDryRun {
r, err := c.createCpolr(cpolr)
Expand Down Expand Up @@ -200,6 +199,7 @@ func (c *cpolrStore) Update(ctx context.Context, name string, objInfo rest.Updat
return nil, false, errors.NewBadRequest("failed to validate cluster policy report")
}
cpolr.Annotations = labelReports(cpolr.Annotations)
cpolr.Generation += 1
klog.Infof("updating cluster policy report name=%s", cpolr.Name)
if !isDryRun {
r, err := c.updateCpolr(cpolr, oldObj)
Expand Down Expand Up @@ -270,8 +270,37 @@ func (c *cpolrStore) DeleteCollection(ctx context.Context, deleteValidation rest
return cpolrList, nil
}

func (c *cpolrStore) Watch(ctx context.Context, _ *metainternalversion.ListOptions) (watch.Interface, error) {
return c.broadcaster.Watch()
func (c *cpolrStore) Watch(ctx context.Context, options *metainternalversion.ListOptions) (watch.Interface, error) {
switch options.ResourceVersion {
case "", "0":
return c.broadcaster.Watch()
default:
break
}
items, err := c.List(ctx, options)
if err != nil {
return nil, err
}
list, ok := items.(*v1alpha2.ClusterPolicyReportList)
if !ok {
return nil, fmt.Errorf("failed to convert runtime object into cluster policy report list")
}
events := make([]watch.Event, len(list.Items))
for i, pol := range list.Items {
report := pol.DeepCopy()
if report.Generation == 1 || report.Generation == 0 {
events[i] = watch.Event{
Type: watch.Added,
Object: report,
}
} else {
events[i] = watch.Event{
Type: watch.Modified,
Object: report,
}
}
}
return c.broadcaster.WatchWithPrefix(events)
}

func (c *cpolrStore) ConvertToTable(ctx context.Context, object runtime.Object, tableOptions runtime.Object) (*metav1beta1.Table, error) {
Expand Down
41 changes: 35 additions & 6 deletions pkg/api/ephr.go
Original file line number Diff line number Diff line change
Expand Up @@ -76,10 +76,8 @@ func (p *ephrStore) List(ctx context.Context, options *metainternalversion.ListO
// }

ephrList := &reportsv1.EphemeralReportList{
Items: make([]reportsv1.EphemeralReport, 0),
ListMeta: metav1.ListMeta{
ResourceVersion: "1",
},
Items: make([]reportsv1.EphemeralReport, 0),
ListMeta: metav1.ListMeta{},
}
var desiredRv uint64
if len(options.ResourceVersion) == 0 {
Expand Down Expand Up @@ -154,6 +152,7 @@ func (p *ephrStore) Create(ctx context.Context, obj runtime.Object, createValida
}

ephr.Annotations = labelReports(ephr.Annotations)
ephr.Generation = 1
klog.Infof("creating ephemeral reports name=%s namespace=%s", ephr.Name, ephr.Namespace)
if !isDryRun {
r, err := p.createEphr(ephr)
Expand Down Expand Up @@ -218,6 +217,7 @@ func (p *ephrStore) Update(ctx context.Context, name string, objInfo rest.Update
}

ephr.Annotations = labelReports(ephr.Annotations)
ephr.Generation += 1
klog.Infof("updating ephemeral reports name=%s namespace=%s", ephr.Name, ephr.Namespace)
if !isDryRun {
r, err := p.updateEphr(ephr, oldObj)
Expand Down Expand Up @@ -291,8 +291,37 @@ func (p *ephrStore) DeleteCollection(ctx context.Context, deleteValidation rest.
return ephrList, nil
}

func (p *ephrStore) Watch(ctx context.Context, _ *metainternalversion.ListOptions) (watch.Interface, error) {
return p.broadcaster.Watch()
func (p *ephrStore) Watch(ctx context.Context, options *metainternalversion.ListOptions) (watch.Interface, error) {
switch options.ResourceVersion {
case "", "0":
return p.broadcaster.Watch()
default:
break
}
items, err := p.List(ctx, options)
if err != nil {
return nil, err
}
list, ok := items.(*reportsv1.EphemeralReportList)
if !ok {
return nil, fmt.Errorf("failed to convert runtime object into ephemeral report list")
}
events := make([]watch.Event, len(list.Items))
for i, pol := range list.Items {
report := pol.DeepCopy()
if report.Generation == 1 || report.Generation == 0 {
events[i] = watch.Event{
Type: watch.Added,
Object: report,
}
} else {
events[i] = watch.Event{
Type: watch.Modified,
Object: report,
}
}
}
return p.broadcaster.WatchWithPrefix(events)
}

func (p *ephrStore) ConvertToTable(ctx context.Context, object runtime.Object, tableOptions runtime.Object) (*metav1beta1.Table, error) {
Expand Down
2 changes: 2 additions & 0 deletions pkg/api/filter.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,8 @@ func allowObjectListWatch(object metav1.ObjectMeta, labelSelector labels.Selecto
if rv != desiredRv {
return false, 0, nil
}
default:
break
}

if labelSelector == nil {
Expand Down
41 changes: 35 additions & 6 deletions pkg/api/polr.go
Original file line number Diff line number Diff line change
Expand Up @@ -76,10 +76,8 @@ func (p *polrStore) List(ctx context.Context, options *metainternalversion.ListO
// }

polrList := &v1alpha2.PolicyReportList{
Items: make([]v1alpha2.PolicyReport, 0),
ListMeta: metav1.ListMeta{
ResourceVersion: "1",
},
Items: make([]v1alpha2.PolicyReport, 0),
ListMeta: metav1.ListMeta{},
}
var desiredRv uint64
if len(options.ResourceVersion) == 0 {
Expand Down Expand Up @@ -154,6 +152,7 @@ func (p *polrStore) Create(ctx context.Context, obj runtime.Object, createValida
}

polr.Annotations = labelReports(polr.Annotations)
polr.Generation = 1
klog.Infof("creating policy reports name=%s namespace=%s", polr.Name, polr.Namespace)
if !isDryRun {
r, err := p.createPolr(polr)
Expand Down Expand Up @@ -218,6 +217,7 @@ func (p *polrStore) Update(ctx context.Context, name string, objInfo rest.Update
}

polr.Annotations = labelReports(polr.Annotations)
polr.Generation += 1
klog.Infof("updating policy reports name=%s namespace=%s", polr.Name, polr.Namespace)
if !isDryRun {
r, err := p.updatePolr(polr, oldObj)
Expand Down Expand Up @@ -291,8 +291,37 @@ func (p *polrStore) DeleteCollection(ctx context.Context, deleteValidation rest.
return polrList, nil
}

func (p *polrStore) Watch(ctx context.Context, _ *metainternalversion.ListOptions) (watch.Interface, error) {
return p.broadcaster.Watch()
func (p *polrStore) Watch(ctx context.Context, options *metainternalversion.ListOptions) (watch.Interface, error) {
switch options.ResourceVersion {
case "", "0":
return p.broadcaster.Watch()
default:
break
}
items, err := p.List(ctx, options)
if err != nil {
return nil, err
}
list, ok := items.(*v1alpha2.PolicyReportList)
if !ok {
return nil, fmt.Errorf("failed to convert runtime object into policy report list")
}
events := make([]watch.Event, len(list.Items))
for i, pol := range list.Items {
report := pol.DeepCopy()
if report.Generation == 1 || report.Generation == 0 {
events[i] = watch.Event{
Type: watch.Added,
Object: report,
}
} else {
events[i] = watch.Event{
Type: watch.Modified,
Object: report,
}
}
}
return p.broadcaster.WatchWithPrefix(events)
}

func (p *polrStore) ConvertToTable(ctx context.Context, object runtime.Object, tableOptions runtime.Object) (*metav1beta1.Table, error) {
Expand Down

0 comments on commit e28d413

Please sign in to comment.