Skip to content

Commit

Permalink
Make BackedUpItems thread safe
Browse files Browse the repository at this point in the history
Signed-off-by: Scott Seago <[email protected]>
  • Loading branch information
sseago committed Dec 3, 2024
1 parent 3c06fc8 commit d18cf3c
Show file tree
Hide file tree
Showing 9 changed files with 154 additions and 44 deletions.
1 change: 1 addition & 0 deletions changelogs/unreleased/8366-sseago
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
Make BackedUpItems thread safe
90 changes: 90 additions & 0 deletions pkg/backup/backed_up_items_map.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,90 @@
/*
Copyright the Velero contributors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package backup

import (
"fmt"
"sort"
"sync"
)

// backedUpItemsMap keeps track of the items already backed up for the current Velero Backup
type backedUpItemsMap struct {
*sync.RWMutex
backedUpItems map[itemKey]struct{}
}

func NewBackedUpItemsMap() *backedUpItemsMap {
return &backedUpItemsMap{
RWMutex: &sync.RWMutex{},
backedUpItems: make(map[itemKey]struct{}),
}
}

func (m *backedUpItemsMap) CopyItemMap() map[itemKey]struct{} {
m.RLock()
defer m.RUnlock()
returnMap := make(map[itemKey]struct{}, len(m.backedUpItems))
for key, val := range m.backedUpItems {
returnMap[key] = val
}
return returnMap
}

// ResourceMap returns a map of the backed up items.
// For each map entry, the key is the resource type,
// and the value is a list of namespaced names for the resource
func (m *backedUpItemsMap) ResourceMap() map[string][]string {
m.RLock()
defer m.RUnlock()

resources := map[string][]string{}
for i := range m.backedUpItems {
entry := i.name
if i.namespace != "" {
entry = fmt.Sprintf("%s/%s", i.namespace, i.name)
}
resources[i.resource] = append(resources[i.resource], entry)
}

// sort namespace/name entries for each GVK
for _, v := range resources {
sort.Strings(v)
}

return resources
}

func (m *backedUpItemsMap) Len() int {
m.RLock()
defer m.RUnlock()
return len(m.backedUpItems)
}

func (m *backedUpItemsMap) Has(key itemKey) bool {
m.RLock()
defer m.RUnlock()

_, exists := m.backedUpItems[key]
return exists
}

func (m *backedUpItemsMap) AddItem(key itemKey) {
m.Lock()
defer m.Unlock()
m.backedUpItems[key] = struct{}{}
}
31 changes: 15 additions & 16 deletions pkg/backup/backup.go
Original file line number Diff line number Diff line change
Expand Up @@ -297,8 +297,6 @@ func (kb *kubernetesBackupper) BackupWithResolvers(
return err
}

backupRequest.BackedUpItems = map[itemKey]struct{}{}

podVolumeTimeout := kb.podVolumeTimeout
if val := backupRequest.Annotations[velerov1api.PodVolumeOperationTimeoutAnnotation]; val != "" {
parsed, err := time.ParseDuration(val)
Expand Down Expand Up @@ -499,20 +497,21 @@ func (kb *kubernetesBackupper) BackupWithResolvers(

// updated total is computed as "how many items we've backed up so far, plus
// how many items we know of that are remaining"
totalItems := len(backupRequest.BackedUpItems) + (len(items) - (i + 1))
backedUpItems := backupRequest.BackedUpItems.Len()
totalItems := backedUpItems + (len(items) - (i + 1))

// send a progress update
update <- progressUpdate{
totalItems: totalItems,
itemsBackedUp: len(backupRequest.BackedUpItems),
itemsBackedUp: backedUpItems,
}

log.WithFields(map[string]interface{}{
"progress": "",
"resource": items[i].groupResource.String(),
"namespace": items[i].namespace,
"name": items[i].name,
}).Infof("Backed up %d items out of an estimated total of %d (estimate will change throughout the backup)", len(backupRequest.BackedUpItems), totalItems)
}).Infof("Backed up %d items out of an estimated total of %d (estimate will change throughout the backup)", backedUpItems, totalItems)
}

// no more progress updates will be sent on the 'update' channel
Expand All @@ -538,8 +537,9 @@ func (kb *kubernetesBackupper) BackupWithResolvers(
if updated.Status.Progress == nil {
updated.Status.Progress = &velerov1api.BackupProgress{}
}
updated.Status.Progress.TotalItems = len(backupRequest.BackedUpItems)
updated.Status.Progress.ItemsBackedUp = len(backupRequest.BackedUpItems)
backedUpItems := backupRequest.BackedUpItems.Len()
updated.Status.Progress.TotalItems = backedUpItems
updated.Status.Progress.ItemsBackedUp = backedUpItems

// update the hooks execution status
if updated.Status.HookStatus == nil {
Expand All @@ -558,8 +558,8 @@ func (kb *kubernetesBackupper) BackupWithResolvers(
log.Infof("Summary for skipped PVs: %s", skippedPVSummary)
}

backupRequest.Status.Progress = &velerov1api.BackupProgress{TotalItems: len(backupRequest.BackedUpItems), ItemsBackedUp: len(backupRequest.BackedUpItems)}
log.WithField("progress", "").Infof("Backed up a total of %d items", len(backupRequest.BackedUpItems))
backupRequest.Status.Progress = &velerov1api.BackupProgress{TotalItems: backedUpItems, ItemsBackedUp: backedUpItems}
log.WithField("progress", "").Infof("Backed up a total of %d items", backedUpItems)

return nil
}
Expand Down Expand Up @@ -667,7 +667,7 @@ func (kb *kubernetesBackupper) backupItemBlock(itemBlock BackupItemBlock) []sche
continue
}
// Don't run hooks if pod has already been backed up
if _, exists := itemBlock.itemBackupper.backupRequest.BackedUpItems[key]; !exists {
if !itemBlock.itemBackupper.backupRequest.BackedUpItems.Has(key) {
preHookPods = append(preHookPods, item)
}
}
Expand All @@ -681,7 +681,7 @@ func (kb *kubernetesBackupper) backupItemBlock(itemBlock BackupItemBlock) []sche
itemBlock.Log.WithError(errors.WithStack(err)).Error("Error accessing pod metadata")
continue
}
itemBlock.itemBackupper.backupRequest.BackedUpItems[key] = struct{}{}
itemBlock.itemBackupper.backupRequest.BackedUpItems.AddItem(key)
}

itemBlock.Log.Debug("Backing up items in BackupItemBlock")
Expand Down Expand Up @@ -861,8 +861,6 @@ func (kb *kubernetesBackupper) FinalizeBackup(
return err
}

backupRequest.BackedUpItems = map[itemKey]struct{}{}

// set up a temp dir for the itemCollector to use to temporarily
// store items as they're scraped from the API.
tempDir, err := os.MkdirTemp("", "")
Expand Down Expand Up @@ -947,14 +945,15 @@ func (kb *kubernetesBackupper) FinalizeBackup(

// updated total is computed as "how many items we've backed up so far, plus
// how many items we know of that are remaining"
totalItems := len(backupRequest.BackedUpItems) + (len(items) - (i + 1))
backedUpItems := backupRequest.BackedUpItems.Len()
totalItems := backedUpItems + (len(items) - (i + 1))

log.WithFields(map[string]interface{}{
"progress": "",
"resource": item.groupResource.String(),
"namespace": item.namespace,
"name": item.name,
}).Infof("Updated %d items out of an estimated total of %d (estimate will change throughout the backup finalizer)", len(backupRequest.BackedUpItems), totalItems)
}).Infof("Updated %d items out of an estimated total of %d (estimate will change throughout the backup finalizer)", backedUpItems, totalItems)
}

volumeInfos, err := backupStore.GetBackupVolumeInfos(backupRequest.Backup.Name)
Expand All @@ -979,7 +978,7 @@ func (kb *kubernetesBackupper) FinalizeBackup(
return err
}

log.WithField("progress", "").Infof("Updated a total of %d items", len(backupRequest.BackedUpItems))
log.WithField("progress", "").Infof("Updated a total of %d items", backupRequest.BackedUpItems.Len())

return nil
}
Expand Down
Loading

0 comments on commit d18cf3c

Please sign in to comment.