Skip to content

Commit

Permalink
Fix backup post hook issue
Browse files Browse the repository at this point in the history
Fix backup post hook issue: hook executes before the backup is finished

Signed-off-by: Wenkai Yin(尹文开) <[email protected]>
  • Loading branch information
ywk253100 committed Dec 9, 2024
1 parent aa7ca15 commit 714a8e7
Show file tree
Hide file tree
Showing 22 changed files with 1,442 additions and 1,403 deletions.
1 change: 1 addition & 0 deletions changelogs/unreleased/8490-ywk253100
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
Fix backup post hook issue
159 changes: 159 additions & 0 deletions internal/hook/handler.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,159 @@
package hook

import (
"context"
"sync"
"time"

"github.com/pkg/errors"
"github.com/sirupsen/logrus"
"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"

"github.com/vmware-tanzu/velero/pkg/podexec"
"github.com/vmware-tanzu/velero/pkg/podvolume"
)

// Handler handles all the hooks of one backup or restore.
//
// The pod's backup post hooks cannot execute until the PVBs are processed, Handler leverages the podvolume.Backupper to check this,
// and the podvolume.Backupper is per backup/restore, so one instance of Handler can only handle the hooks for one backup or restore.
//
// Handler only handles the hooks of pods for now, but it can be extended
// to other resources or even the hook defined for backup/restore if needed
type Handler interface {
// HandleResourceHooks handles a group of same type hooks for a specific resource, e.g. handles all backup pre hooks for one pod.
// Because whether to execute the hook may depend on the execution result of previous hooks (e.g. hooks will not execute
// if the previous hook is failed and marked as not continue), this function accepts a hook list as a group to handle.
//
// This function blocks until the hook completed, use "AsyncHandleResourceHooks()" instead if you want to handle the hooks asynchronously.
//
// The execution results are returned and also tracked inside the handler, calling the "WaitAllResourceHooksCompleted()" returns the results.
//
// This function only handles the hooks of pod for now, but it can be extended to other resources easily
HandleResourceHooks(ctx context.Context, log logrus.FieldLogger, resource *unstructured.Unstructured, hooks []*ResourceHook) []*ResourceHookResult
// AsyncHandleResourceHooks is the asynchronous version of "HandleResourceHooks()".
//
// Call "WaitAllHooksCompleted()" to wait all hooks completed and get the results.
AsyncHandleResourceHooks(ctx context.Context, log logrus.FieldLogger, resource *unstructured.Unstructured, hooks []*ResourceHook)
// WaitAllResourceHooksCompleted waits resource hooks completed and returns the execution results
WaitAllResourceHooksCompleted(ctx context.Context, log logrus.FieldLogger) *ResourceHookResults
}

// make sure "handler" implements "Handler" interface
var _ Handler = &handler{}

func NewHandler(podVolumeBackupper podvolume.Backupper, podCommandExecutor podexec.PodCommandExecutor) Handler {
return &handler{
WaitGroup: &sync.WaitGroup{},
results: &ResourceHookResults{
RWMutex: &sync.RWMutex{},
Results: []*ResourceHookResult{},
},
podVolumeBackupper: podVolumeBackupper,
podCommandExecutor: podCommandExecutor,
}
}

type handler struct {
*sync.WaitGroup
results *ResourceHookResults
podVolumeBackupper podvolume.Backupper
podCommandExecutor podexec.PodCommandExecutor
}

func (h *handler) HandleResourceHooks(ctx context.Context, log logrus.FieldLogger, resource *unstructured.Unstructured, hooks []*ResourceHook) []*ResourceHookResult {
if len(hooks) == 0 {
return nil
}

var results []*ResourceHookResult
// make sure the results are tracked inside the handler
defer func() {
for _, result := range results {
h.results.AddResult(result)
}
}()

markHooksFailed := func(hooks []*ResourceHook, err error) []*ResourceHookResult {
now := time.Now()
for _, hook := range hooks {
results = append(results, &ResourceHookResult{
Hook: hook,
Status: StatusFailed,
StartTime: now,
EndTime: now,
Error: err,
})
}
return results
}

resourceHookHandler, err := h.getResourceHookHandler(hooks[0].Type)
if err != nil {
return markHooksFailed(hooks, errors.Wrapf(err, "failed to get the resource hook handler for type %q", hooks[0].Type))
}

if err = resourceHookHandler.WaitUntilReadyToExec(ctx, log, resource); err != nil {
return markHooksFailed(hooks, errors.Wrap(err, "failed to wait ready to execute hook"))
}

Check warning on line 98 in internal/hook/handler.go

View check run for this annotation

Codecov / codecov/patch

internal/hook/handler.go#L97-L98

Added lines #L97 - L98 were not covered by tests

for i, hook := range hooks {
now := time.Now()
result := &ResourceHookResult{
Hook: hook,
StartTime: now,
}

// execution failed
if err = resourceHookHandler.Exec(ctx, log, resource, hook); err != nil {
result.Status = StatusFailed
result.EndTime = time.Now()
result.Error = err
results = append(results, result)
// skip
if !hook.ContinueOnError {
markHooksFailed(hooks[i+1:], errors.New("skip because the previous hook execution failed"))
break
}
continue
}

// execution completed
result.Status = StatusCompleted
result.EndTime = time.Now()
results = append(results, result)
}

return results
}

func (h *handler) AsyncHandleResourceHooks(ctx context.Context, log logrus.FieldLogger, resource *unstructured.Unstructured, hooks []*ResourceHook) {
n := len(hooks)
h.WaitGroup.Add(n)
go func() {
defer func() {
for i := 0; i < n; i++ {
h.Done()
}
}()

results := h.HandleResourceHooks(ctx, log, resource, hooks)
_ = results
}()
}

func (h *handler) WaitAllResourceHooksCompleted(ctx context.Context, log logrus.FieldLogger) *ResourceHookResults {
h.Wait()
return h.results
}

func (h *handler) getResourceHookHandler(hookType string) (ResourceHookHandler, error) {
switch hookType {
case TypePodBackupPreHook:
return NewPodBackupPreHookHandler(h.podCommandExecutor), nil
case TypePodBackupPostHook:
return NewPodBackupPostHookHandler(h.podVolumeBackupper, h.podCommandExecutor), nil
default:
return nil, errors.Errorf("unknown hook type %q", hookType)
}
}
189 changes: 189 additions & 0 deletions internal/hook/handler_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,189 @@
package hook

import (
"context"
"errors"
"testing"

"github.com/sirupsen/logrus"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/mock"
"github.com/stretchr/testify/require"
"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"

velerov1 "github.com/vmware-tanzu/velero/pkg/apis/velero/v1"
mock_podvolume "github.com/vmware-tanzu/velero/pkg/podvolume/mocks"
"github.com/vmware-tanzu/velero/pkg/test"
)

const (
pod = `{
"apiVersion": "v1",
"kind": "Pod",
"metadata": {
"name": "nginx",
"namespace": "nginx",
"labels": {
"app": "nginx"
}
},
"spec": {
"containers": [
{
"name": "nginx",
"image": "nginx:1.14.2",
"ports": [
{
"containerPort": 80
}
]
}
]
}
}
`
)

func TestHandleResourceHooks(t *testing.T) {
podvolumeBackupper := mock_podvolume.NewMockBackupper(t)
podCMDExecutor := &test.MockPodCommandExecutor{}
handler := NewHandler(podvolumeBackupper, podCMDExecutor)
ctx := context.Background()
log := logrus.New()
res := &unstructured.Unstructured{}
err := res.UnmarshalJSON([]byte(pod))
require.NoError(t, err)
var hooks []*ResourceHook

// empty hooks list
results := handler.HandleResourceHooks(ctx, log, res, hooks)
require.Empty(t, results)

// unknown hooks
hooks = []*ResourceHook{
{
Name: "hook01",
Type: "unknown",
},
{
Name: "hook02",
Type: "unknown",
},
}
results = handler.HandleResourceHooks(ctx, log, res, hooks)
require.Len(t, results, 2)
assert.Equal(t, StatusFailed, results[0].Status)
assert.Equal(t, StatusFailed, results[1].Status)

// skip other hooks if the former one failed and marked as not continue
podCMDExecutor.On("ExecutePodCommand", mock.Anything, mock.Anything, mock.Anything,
mock.Anything, mock.Anything, mock.Anything).Return(errors.New("failed to exec command"))
hooks = []*ResourceHook{
{
Name: "hook01",
Type: TypePodBackupPreHook,
Spec: &velerov1.ExecHook{},
Resource: res,
ContinueOnError: true,
},
{
Name: "hook02",
Type: TypePodBackupPreHook,
Spec: &velerov1.ExecHook{},
Resource: res,
ContinueOnError: false,
},
{
Name: "hook03",
Type: TypePodBackupPreHook,
Spec: &velerov1.ExecHook{},
Resource: res,
ContinueOnError: false,
},
}
results = handler.HandleResourceHooks(ctx, log, res, hooks)
require.Len(t, results, 3)
assert.Equal(t, StatusFailed, results[0].Status)
assert.Equal(t, StatusFailed, results[1].Status)
assert.Equal(t, StatusFailed, results[2].Status)

// all completed
podCMDExecutor.On("ExecutePodCommand").Unset()
podCMDExecutor.On("ExecutePodCommand", mock.Anything, mock.Anything, mock.Anything,
mock.Anything, mock.Anything, mock.Anything).Return(nil)
hooks = []*ResourceHook{
{
Name: "hook01",
Type: TypePodBackupPreHook,
Spec: &velerov1.ExecHook{},
Resource: res,
ContinueOnError: true,
},
{
Name: "hook02",
Type: TypePodBackupPreHook,
Spec: &velerov1.ExecHook{},
Resource: res,
ContinueOnError: false,
},
}
results = handler.HandleResourceHooks(ctx, log, res, hooks)
require.Len(t, results, 2)
assert.Equal(t, StatusCompleted, results[0].Status)
assert.Equal(t, StatusCompleted, results[1].Status)
}

func TestAsyncHandleResourceHooksAndWaitAllResourceHooksCompleted(t *testing.T) {
podvolumeBackupper := mock_podvolume.NewMockBackupper(t)
podCMDExecutor := &test.MockPodCommandExecutor{}
handler := NewHandler(podvolumeBackupper, podCMDExecutor)
ctx := context.Background()
log := logrus.New()
res := &unstructured.Unstructured{}
err := res.UnmarshalJSON([]byte(pod))
require.NoError(t, err)

podCMDExecutor.On("ExecutePodCommand", mock.Anything, mock.Anything, mock.Anything,
mock.Anything, mock.Anything, mock.Anything).Return(nil)
hooks := []*ResourceHook{
{
Name: "hook01",
Type: TypePodBackupPreHook,
Spec: &velerov1.ExecHook{},
Resource: res,
ContinueOnError: true,
},
{
Name: "hook02",
Type: TypePodBackupPreHook,
Spec: &velerov1.ExecHook{},
Resource: res,
ContinueOnError: false,
},
}
handler.AsyncHandleResourceHooks(ctx, log, res, hooks)
results := handler.WaitAllResourceHooksCompleted(ctx, log)
require.NotNil(t, results)
require.Equal(t, 2, results.Total)
require.Equal(t, 2, results.Completed)
assert.Equal(t, StatusCompleted, results.Results[0].Status)
assert.Equal(t, StatusCompleted, results.Results[1].Status)
}

func Test_getResourceHookHandler(t *testing.T) {
handler := &handler{}

// pod backup pre hook
resourceHookHandler, err := handler.getResourceHookHandler(TypePodBackupPreHook)
require.NoError(t, err)
assert.NotNil(t, resourceHookHandler)

// pod backup post hook
resourceHookHandler, err = handler.getResourceHookHandler(TypePodBackupPostHook)
require.NoError(t, err)
assert.NotNil(t, resourceHookHandler)

// unknown hook
_, err = handler.getResourceHookHandler("unknown")
require.Error(t, err)
}
Loading

0 comments on commit 714a8e7

Please sign in to comment.