Skip to content

Commit

Permalink
Merge pull request #4358 from ywk253100/211117_pager
Browse files Browse the repository at this point in the history
[cherry-pick]fix buggy pager func
  • Loading branch information
reasonerjt authored Nov 17, 2021
2 parents 37a712e + bf10709 commit 80b43f8
Show file tree
Hide file tree
Showing 2 changed files with 22 additions and 30 deletions.
1 change: 1 addition & 0 deletions changelogs/unreleased/4358-alaypatel07
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
fix buggy pager func
51 changes: 21 additions & 30 deletions pkg/backup/item_collector.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ import (

"github.com/pkg/errors"
"github.com/sirupsen/logrus"
apierrors "k8s.io/apimachinery/pkg/api/errors"
"k8s.io/apimachinery/pkg/api/meta"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
"k8s.io/apimachinery/pkg/labels"
Expand Down Expand Up @@ -293,58 +293,49 @@ func (r *itemCollector) getResourceItems(log logrus.FieldLogger, gv schema.Group
if selector := r.backupRequest.Spec.LabelSelector; selector != nil {
labelSelector = metav1.FormatLabelSelector(selector)
}
listOptions := metav1.ListOptions{LabelSelector: labelSelector}

log.Info("Listing items")
unstructuredItems := make([]unstructured.Unstructured, 0)

if r.pageSize > 0 {
// If limit is positive, use a pager to split list over multiple requests
// Use Velero's dynamic list function instead of the default
listFunc := pager.SimplePageFunc(func(opts metav1.ListOptions) (runtime.Object, error) {
list, err := resourceClient.List(listOptions)
if err != nil {
return nil, err
}
return list, nil
})
listPager := pager.New(listFunc)
listPager := pager.New(pager.SimplePageFunc(func(opts metav1.ListOptions) (runtime.Object, error) {
return resourceClient.List(opts)
}))
// Use the page size defined in the server config
// TODO allow configuration of page buffer size
listPager.PageSize = int64(r.pageSize)
// Add each item to temporary slice
var items []unstructured.Unstructured
err := listPager.EachListItem(context.Background(), listOptions, func(object runtime.Object) error {
item, isUnstructured := object.(*unstructured.Unstructured)
if !isUnstructured {
// We should never hit this
log.Error("Got type other than Unstructured from pager func")
return nil
list, paginated, err := listPager.List(context.Background(), metav1.ListOptions{LabelSelector: labelSelector})
if err != nil {
log.WithError(errors.WithStack(err)).Error("Error listing resources")
continue
}
if !paginated {
log.Infof("list for groupResource %s was not paginated", gr)
}
err = meta.EachListItem(list, func(object runtime.Object) error {
u, ok := object.(*unstructured.Unstructured)
if !ok {
log.WithError(errors.WithStack(fmt.Errorf("expected *unstructured.Unstructured but got %T", u))).Error("unable to understand entry in the list")
return fmt.Errorf("expected *unstructured.Unstructured but got %T", u)
}
items = append(items, *item)
unstructuredItems = append(unstructuredItems, *u)
return nil
})
if statusError, isStatusError := err.(*apierrors.StatusError); isStatusError && statusError.Status().Reason == metav1.StatusReasonExpired {
log.WithError(errors.WithStack(err)).Error("Error paging item list. Falling back on unpaginated list")
unstructuredList, err := resourceClient.List(listOptions)
if err != nil {
log.WithError(errors.WithStack(err)).Error("Error listing items")
continue
}
items = unstructuredList.Items
} else if err != nil {
log.WithError(errors.WithStack(err)).Error("Error paging item list")
if err != nil {
log.WithError(errors.WithStack(err)).Error("unable to understand paginated list")
continue
}
unstructuredItems = append(unstructuredItems, items...)
} else {
// If limit is not positive, do not use paging. Instead, request all items at once
unstructuredList, err := resourceClient.List(metav1.ListOptions{LabelSelector: labelSelector})
unstructuredItems = append(unstructuredItems, unstructuredList.Items...)
if err != nil {
log.WithError(errors.WithStack(err)).Error("Error listing items")
continue
}
unstructuredItems = append(unstructuredItems, unstructuredList.Items...)
}

log.Infof("Retrieved %d items", len(unstructuredItems))
Expand Down

0 comments on commit 80b43f8

Please sign in to comment.