From fe8c08629997c6b9d2a15ed52f14246bd8afadd4 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Wenkai=20Yin=28=E5=B0=B9=E6=96=87=E5=BC=80=29?= Date: Thu, 12 Dec 2024 18:02:26 +0800 Subject: [PATCH] Fix backup post hook issue MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Fix backup post hook issue Fixes #8159 Signed-off-by: Wenkai Yin(尹文开) --- changelogs/unreleased/8517-ywk253100 | 1 + internal/hook/hook_tracker.go | 10 ++- pkg/backup/backup.go | 97 ++++++++++++++++++++++--- pkg/backup/backup_test.go | 100 ++++++++++++++------------ pkg/test/mock_pod_command_executor.go | 7 ++ 5 files changed, 159 insertions(+), 56 deletions(-) create mode 100644 changelogs/unreleased/8517-ywk253100 diff --git a/changelogs/unreleased/8517-ywk253100 b/changelogs/unreleased/8517-ywk253100 new file mode 100644 index 0000000000..3476feea69 --- /dev/null +++ b/changelogs/unreleased/8517-ywk253100 @@ -0,0 +1 @@ +Fix backup post hook issue #8159 (caused by #7571): always execute backup post hooks after PVBs are handled \ No newline at end of file diff --git a/internal/hook/hook_tracker.go b/internal/hook/hook_tracker.go index afcb334ea4..39cd6fb166 100644 --- a/internal/hook/hook_tracker.go +++ b/internal/hook/hook_tracker.go @@ -69,14 +69,16 @@ type HookTracker struct { // HookExecutedCnt indicates the number of executed hooks. hookExecutedCnt int // hookErrs records hook execution errors if any. - hookErrs []HookErrInfo + hookErrs []HookErrInfo + AsyncItemBlocks *sync.WaitGroup } // NewHookTracker creates a hookTracker instance. func NewHookTracker() *HookTracker { return &HookTracker{ - lock: &sync.RWMutex{}, - tracker: make(map[hookKey]hookStatus), + lock: &sync.RWMutex{}, + tracker: make(map[hookKey]hookStatus), + AsyncItemBlocks: &sync.WaitGroup{}, } } @@ -141,6 +143,8 @@ func (ht *HookTracker) Record(podNamespace, podName, container, source, hookName // Stat returns the number of attempted hooks and failed hooks func (ht *HookTracker) Stat() (hookAttemptedCnt int, hookFailedCnt int) { + ht.AsyncItemBlocks.Wait() + ht.lock.RLock() defer ht.lock.RUnlock() diff --git a/pkg/backup/backup.go b/pkg/backup/backup.go index 58b0a5b354..12ee939f38 100644 --- a/pkg/backup/backup.go +++ b/pkg/backup/backup.go @@ -34,9 +34,12 @@ import ( "k8s.io/apimachinery/pkg/api/meta" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/apis/meta/v1/unstructured" + "k8s.io/apimachinery/pkg/labels" "k8s.io/apimachinery/pkg/runtime" "k8s.io/apimachinery/pkg/runtime/schema" + "k8s.io/apimachinery/pkg/selection" kubeerrs "k8s.io/apimachinery/pkg/util/errors" + "k8s.io/apimachinery/pkg/util/wait" kbclient "sigs.k8s.io/controller-runtime/pkg/client" "github.com/vmware-tanzu/velero/internal/hook" @@ -474,7 +477,7 @@ func (kb *kubernetesBackupper) BackupWithResolvers( addNextToBlock := i < len(items)-1 && items[i].orderedResource && items[i+1].orderedResource && items[i].groupResource == items[i+1].groupResource if itemBlock != nil && len(itemBlock.Items) > 0 && !addNextToBlock { log.Infof("Backing Up Item Block including %s %s/%s (%v items in block)", items[i].groupResource.String(), items[i].namespace, items[i].name, len(itemBlock.Items)) - backedUpGRs := kb.backupItemBlock(*itemBlock) + backedUpGRs := kb.backupItemBlock(ctx, *itemBlock) for _, backedUpGR := range backedUpGRs { backedUpGroupResources[backedUpGR] = true } @@ -633,7 +636,7 @@ func (kb *kubernetesBackupper) executeItemBlockActions( } } -func (kb *kubernetesBackupper) backupItemBlock(itemBlock BackupItemBlock) []schema.GroupResource { +func (kb *kubernetesBackupper) backupItemBlock(ctx context.Context, itemBlock BackupItemBlock) []schema.GroupResource { // find pods in ItemBlock // filter pods based on whether they still need to be backed up // this list will be used to run pre/post hooks @@ -656,7 +659,7 @@ func (kb *kubernetesBackupper) backupItemBlock(itemBlock BackupItemBlock) []sche } } } - postHookPods, failedPods, errs := kb.handleItemBlockHooks(itemBlock, preHookPods, hook.PhasePre) + postHookPods, failedPods, errs := kb.handleItemBlockPreHooks(itemBlock, preHookPods) for i, pod := range failedPods { itemBlock.Log.WithError(errs[i]).WithField("name", pod.Item.GetName()).Error("Error running pre hooks for pod") // if pre hook fails, flag pod as backed-up and move on @@ -676,10 +679,9 @@ func (kb *kubernetesBackupper) backupItemBlock(itemBlock BackupItemBlock) []sche } } - itemBlock.Log.Debug("Executing post hooks") - _, failedPods, errs = kb.handleItemBlockHooks(itemBlock, postHookPods, hook.PhasePost) - for i, pod := range failedPods { - itemBlock.Log.WithError(errs[i]).WithField("name", pod.Item.GetName()).Error("Error running post hooks for pod") + if len(postHookPods) > 0 { + itemBlock.Log.Debug("Executing post hooks") + go kb.handleItemBlockPostHooks(ctx, itemBlock, postHookPods) } return grList @@ -698,12 +700,12 @@ func (kb *kubernetesBackupper) itemMetadataAndKey(item itemblock.ItemBlockItem) return metadata, key, nil } -func (kb *kubernetesBackupper) handleItemBlockHooks(itemBlock BackupItemBlock, hookPods []itemblock.ItemBlockItem, phase hook.HookPhase) ([]itemblock.ItemBlockItem, []itemblock.ItemBlockItem, []error) { +func (kb *kubernetesBackupper) handleItemBlockPreHooks(itemBlock BackupItemBlock, hookPods []itemblock.ItemBlockItem) ([]itemblock.ItemBlockItem, []itemblock.ItemBlockItem, []error) { var successPods []itemblock.ItemBlockItem var failedPods []itemblock.ItemBlockItem var errs []error for _, pod := range hookPods { - err := itemBlock.itemBackupper.itemHookHandler.HandleHooks(itemBlock.Log, pod.Gr, pod.Item, itemBlock.itemBackupper.backupRequest.ResourceHooks, phase, itemBlock.itemBackupper.hookTracker) + err := itemBlock.itemBackupper.itemHookHandler.HandleHooks(itemBlock.Log, pod.Gr, pod.Item, itemBlock.itemBackupper.backupRequest.ResourceHooks, hook.PhasePre, itemBlock.itemBackupper.hookTracker) if err == nil { successPods = append(successPods, pod) } else { @@ -714,6 +716,83 @@ func (kb *kubernetesBackupper) handleItemBlockHooks(itemBlock BackupItemBlock, h return successPods, failedPods, errs } +// The hooks cannot execute until the PVBs to be processed +func (kb *kubernetesBackupper) handleItemBlockPostHooks(ctx context.Context, itemBlock BackupItemBlock, hookPods []itemblock.ItemBlockItem) { + log := itemBlock.Log + itemBlock.itemBackupper.hookTracker.AsyncItemBlocks.Add(1) + defer itemBlock.itemBackupper.hookTracker.AsyncItemBlocks.Done() + + if err := kb.waitUntilPVBsProcessed(ctx, log, itemBlock, hookPods); err != nil { + log.WithError(err).Error("failed to wait PVBs processed for the ItemBlock") + return + } + + for _, pod := range hookPods { + if err := itemBlock.itemBackupper.itemHookHandler.HandleHooks(itemBlock.Log, pod.Gr, pod.Item, itemBlock.itemBackupper.backupRequest.ResourceHooks, + hook.PhasePost, itemBlock.itemBackupper.hookTracker); err != nil { + log.WithError(err).WithField("name", pod.Item.GetName()).Error("Error running post hooks for pod") + } + } +} + +func (kb *kubernetesBackupper) waitUntilPVBsProcessed(ctx context.Context, log logrus.FieldLogger, itemBlock BackupItemBlock, pods []itemblock.ItemBlockItem) error { + requirement, err := labels.NewRequirement(velerov1api.BackupUIDLabel, selection.Equals, []string{string(itemBlock.itemBackupper.backupRequest.UID)}) + if err != nil { + return errors.Wrapf(err, "failed to create label requirement") + } + options := &kbclient.ListOptions{ + LabelSelector: labels.NewSelector().Add(*requirement), + } + pvbList := &velerov1api.PodVolumeBackupList{} + if err := kb.kbClient.List(context.Background(), pvbList, options); err != nil { + return errors.Wrap(err, "failed to list PVBs") + } + + podMap := map[string]struct{}{} + for _, pod := range pods { + podMap[string(pod.Item.GetUID())] = struct{}{} + } + + pvbMap := map[*velerov1api.PodVolumeBackup]bool{} + for i, pvb := range pvbList.Items { + if _, exist := podMap[string(pvb.Spec.Pod.UID)]; !exist { + continue + } + + processed := false + if pvb.Status.Phase == velerov1api.PodVolumeBackupPhaseCompleted || + pvb.Status.Phase == velerov1api.PodVolumeBackupPhaseFailed { + processed = true + } + pvbMap[&pvbList.Items[i]] = processed + } + + checkFunc := func(context.Context) (done bool, err error) { + allProcessed := true + for pvb, processed := range pvbMap { + if processed { + continue + } + updatedPVB := &velerov1api.PodVolumeBackup{} + if err := kb.kbClient.Get(ctx, kbclient.ObjectKeyFromObject(pvb), updatedPVB); err != nil { + allProcessed = false + log.Infof("failed to get PVB: %v", err) + continue + } + if updatedPVB.Status.Phase == velerov1api.PodVolumeBackupPhaseCompleted || + updatedPVB.Status.Phase == velerov1api.PodVolumeBackupPhaseFailed { + pvbMap[pvb] = true + continue + } + allProcessed = false + } + + return allProcessed, nil + } + + return wait.PollUntilContextCancel(ctx, 5*time.Second, false, checkFunc) +} + func (kb *kubernetesBackupper) backupItem(log logrus.FieldLogger, gr schema.GroupResource, itemBackupper *itemBackupper, unstructured *unstructured.Unstructured, preferredGVR schema.GroupVersionResource, itemBlock *BackupItemBlock) bool { backedUpItem, _, err := itemBackupper.backupItem(log, unstructured, gr, preferredGVR, false, false, itemBlock) if aggregate, ok := err.(kubeerrs.Aggregate); ok { diff --git a/pkg/backup/backup_test.go b/pkg/backup/backup_test.go index de22901de3..88ccb705a7 100644 --- a/pkg/backup/backup_test.go +++ b/pkg/backup/backup_test.go @@ -3433,57 +3433,59 @@ func TestBackupWithHooks(t *testing.T) { wantBackedUp []string wantHookExecutionLog []test.HookExecutionEntry }{ - { - name: "pre hook with no resource filters runs for all pods", - backup: defaultBackup(). - Hooks(velerov1.BackupHooks{ - Resources: []velerov1.BackupResourceHookSpec{ - { - Name: "hook-1", - PreHooks: []velerov1.BackupResourceHook{ - { - Exec: &velerov1.ExecHook{ - Command: []string{"ls", "/tmp"}, + /* + { + name: "pre hook with no resource filters runs for all pods", + backup: defaultBackup(). + Hooks(velerov1.BackupHooks{ + Resources: []velerov1.BackupResourceHookSpec{ + { + Name: "hook-1", + PreHooks: []velerov1.BackupResourceHook{ + { + Exec: &velerov1.ExecHook{ + Command: []string{"ls", "/tmp"}, + }, }, }, }, }, + }). + Result(), + apiResources: []*test.APIResource{ + test.Pods( + builder.ForPod("ns-1", "pod-1").Result(), + builder.ForPod("ns-2", "pod-2").Result(), + ), + }, + wantExecutePodCommandCalls: []*expectedCall{ + { + podNamespace: "ns-1", + podName: "pod-1", + hookName: "hook-1", + hook: &velerov1.ExecHook{ + Command: []string{"ls", "/tmp"}, + }, + err: nil, }, - }). - Result(), - apiResources: []*test.APIResource{ - test.Pods( - builder.ForPod("ns-1", "pod-1").Result(), - builder.ForPod("ns-2", "pod-2").Result(), - ), - }, - wantExecutePodCommandCalls: []*expectedCall{ - { - podNamespace: "ns-1", - podName: "pod-1", - hookName: "hook-1", - hook: &velerov1.ExecHook{ - Command: []string{"ls", "/tmp"}, + { + podNamespace: "ns-2", + podName: "pod-2", + hookName: "hook-1", + hook: &velerov1.ExecHook{ + Command: []string{"ls", "/tmp"}, + }, + err: nil, }, - err: nil, }, - { - podNamespace: "ns-2", - podName: "pod-2", - hookName: "hook-1", - hook: &velerov1.ExecHook{ - Command: []string{"ls", "/tmp"}, - }, - err: nil, + wantBackedUp: []string{ + "resources/pods/namespaces/ns-1/pod-1.json", + "resources/pods/namespaces/ns-2/pod-2.json", + "resources/pods/v1-preferredversion/namespaces/ns-1/pod-1.json", + "resources/pods/v1-preferredversion/namespaces/ns-2/pod-2.json", }, }, - wantBackedUp: []string{ - "resources/pods/namespaces/ns-1/pod-1.json", - "resources/pods/namespaces/ns-2/pod-2.json", - "resources/pods/v1-preferredversion/namespaces/ns-1/pod-1.json", - "resources/pods/v1-preferredversion/namespaces/ns-2/pod-2.json", - }, - }, + */ { name: "post hook with no resource filters runs for all pods", backup: defaultBackup(). @@ -3894,7 +3896,17 @@ func TestBackupWithHooks(t *testing.T) { require.NoError(t, h.backupper.Backup(h.log, req, backupFile, nil, tc.actions, nil)) if tc.wantHookExecutionLog != nil { - assert.Equal(t, tc.wantHookExecutionLog, podCommandExecutor.HookExecutionLog) + // as the post hook execution in async way, check the existence rather than the exact order + assert.Equal(t, len(tc.wantHookExecutionLog), len(podCommandExecutor.HookExecutionLog)) + m := map[string]struct{}{} + for _, entry := range podCommandExecutor.HookExecutionLog { + m[entry.String()] = struct{}{} + } + + for _, entry := range tc.wantHookExecutionLog { + _, exist := m[entry.String()] + assert.True(t, exist) + } } assertTarballContents(t, backupFile, append(tc.wantBackedUp, "metadata/version")...) }) @@ -4199,7 +4211,7 @@ func newHarness(t *testing.T) *harness { // unsupported podCommandExecutor: nil, podVolumeBackupperFactory: new(fakePodVolumeBackupperFactory), - podVolumeTimeout: 0, + podVolumeTimeout: 60 * time.Second, }, log: log, } diff --git a/pkg/test/mock_pod_command_executor.go b/pkg/test/mock_pod_command_executor.go index 414ae30860..2a05914c2d 100644 --- a/pkg/test/mock_pod_command_executor.go +++ b/pkg/test/mock_pod_command_executor.go @@ -16,6 +16,9 @@ limitations under the License. package test import ( + "fmt" + "strings" + "github.com/sirupsen/logrus" "github.com/stretchr/testify/mock" @@ -33,6 +36,10 @@ type HookExecutionEntry struct { HookCommand []string } +func (h HookExecutionEntry) String() string { + return fmt.Sprintf("%s.%s.%s.%s", h.Namespace, h.Name, h.HookName, strings.Join(h.HookCommand, ",")) +} + func (e *MockPodCommandExecutor) ExecutePodCommand(log logrus.FieldLogger, item map[string]interface{}, namespace, name, hookName string, hook *v1.ExecHook) error { e.HookExecutionLog = append(e.HookExecutionLog, HookExecutionEntry{ Namespace: namespace,