Commit
fix: add max retry mechanism for backup creation
Prevent the backup CR from being stuck in the Pending state forever:
backup creation is now retried only up to a maximum number of attempts,
after which the backup is moved to the Error state.

longhorn-10090

Signed-off-by: Phan Le <[email protected]>
PhanLe1010 committed Jan 10, 2025
1 parent 6ab8076 commit 157e7e8
Showing 3 changed files with 195 additions and 4 deletions.
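
Conceptually, each reconcile that cannot yet start the backup keeps the CR in the Pending state and bumps a per-backup attempt counter; once the counter reaches the limit, the CR is moved to the Error state instead of waiting forever. A minimal sketch of that decision, using a hypothetical map-based tracker (retryTracker, shouldGiveUp, and maxRetries are illustrative names, not identifiers from this commit):

package main

import "fmt"

// retryTracker is a hypothetical stand-in for the commit's util.TimedCounter:
// it counts failed creation attempts per backup name.
type retryTracker map[string]int

const maxRetries = 4 // mirrors the maxCreationRetry constant introduced below

// shouldGiveUp reports whether the backup has exhausted its retry budget.
// When it returns true, the controller would mark the Backup CR as Error;
// otherwise the CR stays Pending and the attempt is recorded.
func shouldGiveUp(t retryTracker, backupName string) bool {
	if t[backupName] >= maxRetries {
		delete(t, backupName) // stop tracking a backup we have given up on
		return true
	}
	t[backupName]++
	return false
}

func main() {
	t := retryTracker{}
	for attempt := 1; attempt <= 6; attempt++ {
		fmt.Printf("attempt %d: give up = %v\n", attempt, shouldGiveUp(t, "backup-1"))
	}
	// Attempts 1-4 keep retrying (false), attempt 5 gives up (true),
	// and attempt 6 starts a fresh budget because the entry was deleted.
}
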
39 changes: 35 additions & 4 deletions controller/backup_controller.go
@@ -41,7 +41,9 @@ const (

const (
WaitForSnapshotMessage = "Waiting for the snapshot %v to be ready"
FailedWaitingForSnapshotMessage = "Failed waiting for the snapshot %v to be ready"
WaitForEngineMessage = "Waiting for the engine %v to be ready"
FailedWaitingForEngineMessage = "Failed waiting for the engine %v to be ready"
WaitForBackupDeletionIsCompleteMessage = "Wait for backup %v to be deleted"
FailedToGetSnapshotMessage = "Failed to get the Snapshot %v"
FailedToDeleteBackupMessage = "Failed to delete the backup %v in the backupstore, err %v"
@@ -51,6 +53,10 @@ const (
const (
DeletionMinInterval = time.Minute * 1
DeletionMaxInterval = time.Hour * 24

creationRetryCounterExpiredDuration = 10 * time.Minute
creationRetryCounterGCDuration = 45 * time.Second
maxCreationRetry = 4
)

type DeletingStatus struct {
@@ -83,7 +89,8 @@ type BackupController struct {
deletingMapLock sync.Mutex
inProgressDeletingMap map[string]*DeletingStatus

deletingBackoff *flowcontrol.Backoff
deletingBackoff *flowcontrol.Backoff
creationRetryCounter *util.TimedCounter
}

func NewBackupController(
@@ -121,7 +128,8 @@ func NewBackupController(
deletingMapLock: sync.Mutex{},
inProgressDeletingMap: map[string]*DeletingStatus{},

deletingBackoff: flowcontrol.NewBackOff(DeletionMinInterval, DeletionMaxInterval),
deletingBackoff: flowcontrol.NewBackOff(DeletionMinInterval, DeletionMaxInterval),
creationRetryCounter: util.NewTimedCounter(creationRetryCounterExpiredDuration),
}

var err error
@@ -164,6 +172,7 @@ func (bc *BackupController) Run(workers int, stopCh <-chan struct{}) {
for i := 0; i < workers; i++ {
go wait.Until(bc.worker, time.Second, stopCh)
}
go bc.creationRetryCounter.RunGC(creationRetryCounterGCDuration, stopCh)
<-stopCh
}
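
For context, the counter's garbage collector runs in its own goroutine next to the worker loops and, like them, exits when stopCh is closed. A small, self-contained illustration of that ticker-plus-stop-channel lifecycle (runEvery and the printed message are illustrative, not repository code):

package main

import (
	"fmt"
	"time"
)

// runEvery calls fn on every tick until stopCh is closed, which is the same
// lifecycle RunGC follows when launched from the controller's Run method.
func runEvery(interval time.Duration, fn func(), stopCh <-chan struct{}) {
	ticker := time.NewTicker(interval)
	defer ticker.Stop()
	for {
		select {
		case <-ticker.C:
			fn()
		case <-stopCh:
			return
		}
	}
}

func main() {
	stopCh := make(chan struct{})
	go runEvery(50*time.Millisecond, func() { fmt.Println("gc tick") }, stopCh)

	time.Sleep(160 * time.Millisecond) // let a few ticks fire
	close(stopCh)                      // stops the goroutine, as closing stopCh does in Run
	time.Sleep(20 * time.Millisecond)  // give it a moment to return
}
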

@@ -813,27 +822,46 @@ func (bc *BackupController) checkMonitor(backup *longhorn.Backup, volume *longho
if engine.Status.CurrentState != longhorn.InstanceStateRunning ||
engine.Spec.DesireState != longhorn.InstanceStateRunning ||
volume.Status.State != longhorn.VolumeStateAttached {
if bc.creationRetryCounter.GetCount(backup.Name) >= maxCreationRetry {
backup.Status.Error = fmt.Sprintf(FailedWaitingForEngineMessage, engine.Name)
backup.Status.State = longhorn.BackupStateError
backup.Status.LastSyncedAt = metav1.Time{Time: time.Now().UTC()}
bc.creationRetryCounter.DeleteEntry(backup.Name)
return nil, fmt.Errorf("failed waiting for the engine %v to be running before enabling backup monitor", engine.Name)
}
backup.Status.State = longhorn.BackupStatePending
backup.Status.Messages[MessageTypeReconcileInfo] = fmt.Sprintf(WaitForEngineMessage, engine.Name)
bc.creationRetryCounter.IncreaseCount(backup.Name)
return nil, fmt.Errorf("waiting for the engine %v to be running before enabling backup monitor", engine.Name)
}

snapshot, err := bc.ds.GetSnapshotRO(backup.Spec.SnapshotName)
if err != nil {
if apierrors.IsNotFound(err) {
backup.Status.Error = fmt.Sprintf("Snapshot %v not found", backup.Spec.SnapshotName)
msg := fmt.Sprintf(FailedToGetSnapshotMessage, backup.Spec.SnapshotName)
if bc.creationRetryCounter.GetCount(backup.Name) >= maxCreationRetry {
backup.Status.Error = msg
backup.Status.State = longhorn.BackupStateError
backup.Status.LastSyncedAt = metav1.Time{Time: time.Now().UTC()}
bc.creationRetryCounter.DeleteEntry(backup.Name)
} else {
backup.Status.State = longhorn.BackupStatePending
backup.Status.Messages[MessageTypeReconcileInfo] = fmt.Sprintf(FailedToGetSnapshotMessage, backup.Spec.SnapshotName)
bc.creationRetryCounter.IncreaseCount(backup.Name)
}
return nil, errors.Wrapf(err, "failed to get the snapshot %v before enabling backup monitor", backup.Spec.SnapshotName)
}
if snapshot != nil {
if !snapshot.Status.ReadyToUse {
if bc.creationRetryCounter.GetCount(backup.Name) >= maxCreationRetry {
backup.Status.Error = fmt.Sprintf(FailedWaitingForSnapshotMessage, backup.Spec.SnapshotName)
backup.Status.State = longhorn.BackupStateError
backup.Status.LastSyncedAt = metav1.Time{Time: time.Now().UTC()}
bc.creationRetryCounter.DeleteEntry(backup.Name)
return nil, fmt.Errorf("failed waiting for the snapshot %v to be ready before enabling backup monitor", backup.Spec.SnapshotName)
}
backup.Status.State = longhorn.BackupStatePending
backup.Status.Messages[MessageTypeReconcileInfo] = fmt.Sprintf(WaitForSnapshotMessage, backup.Spec.SnapshotName)
bc.creationRetryCounter.IncreaseCount(backup.Name)
return nil, fmt.Errorf("waiting for the snapshot %v to be ready before enabling backup monitor", backup.Spec.SnapshotName)
}
}
@@ -859,6 +887,9 @@ func (bc *BackupController) checkMonitor(backup *longhorn.Backup, volume *longho
backup.Status.LastSyncedAt = metav1.Time{Time: time.Now().UTC()}
return nil, err
}

// backup creation succeeded, so remove the backup from the retry counter
bc.creationRetryCounter.DeleteEntry(backup.Name)
return monitor, nil
}

88 changes: 88 additions & 0 deletions util/timed_counter.go
@@ -0,0 +1,88 @@
package util

import (
	"sync"
	"time"
)

type counterEntry struct {
	count      int
	lastUpdate time.Time
}

type TimedCounter struct {
	sync.RWMutex
	perItemCounter map[string]*counterEntry

	expiredDuration time.Duration
}

func NewTimedCounter(expiredDuration time.Duration) *TimedCounter {
	return &TimedCounter{
		perItemCounter:  map[string]*counterEntry{},
		expiredDuration: expiredDuration,
	}
}

func (c *TimedCounter) GetCount(id string) int {
	c.RLock()
	defer c.RUnlock()
	entry, ok := c.perItemCounter[id]
	if ok {
		return entry.count
	}
	return 0
}

func (c *TimedCounter) IncreaseCount(id string) {
	c.Lock()
	defer c.Unlock()
	entry, ok := c.perItemCounter[id]
	if !ok {
		entry = c.initEntryUnsafe(id)
	}
	entry.count += 1
	entry.lastUpdate = time.Now()
}

func (c *TimedCounter) DeleteEntry(id string) {
	c.Lock()
	defer c.Unlock()
	delete(c.perItemCounter, id)
}

func (c *TimedCounter) gc() {
	c.Lock()
	defer c.Unlock()
	now := time.Now()
	for id, entry := range c.perItemCounter {
		if now.Sub(entry.lastUpdate) > c.expiredDuration {
			delete(c.perItemCounter, id)
		}
	}
}

func (c *TimedCounter) RunGC(gcDuration time.Duration, stopCh <-chan struct{}) {
	ticker := time.NewTicker(gcDuration)
	defer ticker.Stop()
	for {
		select {
		case <-ticker.C:
			c.gc()
		case <-stopCh:
			return
		}
	}
}

func (c *TimedCounter) GetTotalEntries() int {
	c.RLock()
	defer c.RUnlock()
	return len(c.perItemCounter)
}

func (c *TimedCounter) initEntryUnsafe(id string) *counterEntry {
	entry := &counterEntry{count: 0}
	c.perItemCounter[id] = entry
	return entry
}
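
A short usage sketch of the exported TimedCounter API, assuming the package is imported as github.com/longhorn/longhorn-manager/util (that import path is an assumption here). It shows the count/check/expire cycle: a stale entry disappears only once the background GC prunes it.

package main

import (
	"fmt"
	"time"

	// Assumed import path for the util package added by this commit.
	"github.com/longhorn/longhorn-manager/util"
)

func main() {
	// Track per-item attempts; entries idle for more than 500ms expire.
	counter := util.NewTimedCounter(500 * time.Millisecond)

	// Prune expired entries every 100ms until stopCh is closed.
	stopCh := make(chan struct{})
	defer close(stopCh)
	go counter.RunGC(100*time.Millisecond, stopCh)

	counter.IncreaseCount("backup-a")
	counter.IncreaseCount("backup-a")
	fmt.Println(counter.GetCount("backup-a")) // 2

	// GetCount does not check lastUpdate itself; the entry is removed by the
	// periodic GC once it has been idle past the expiration, so the count
	// effectively resets to 0.
	time.Sleep(800 * time.Millisecond)
	fmt.Println(counter.GetCount("backup-a")) // 0
}
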
72 changes: 72 additions & 0 deletions util/timed_counter_test.go
@@ -0,0 +1,72 @@
package util

import (
	"testing"
	"time"
)

func TestGetCount(t *testing.T) {
	counter := NewTimedCounter(1 * time.Minute)

	counter.IncreaseCount("item1")
	if count := counter.GetCount("item1"); count != 1 {
		t.Errorf("expected count 1, got %d", count)
	}

	counter.IncreaseCount("item1")
	if count := counter.GetCount("item1"); count != 2 {
		t.Errorf("expected count 2, got %d", count)
	}

	if count := counter.GetCount("nonexistent"); count != 0 {
		t.Errorf("expected count 0 for nonexistent item, got %d", count)
	}
}

func TestDeleteEntry(t *testing.T) {
	counter := NewTimedCounter(1 * time.Minute)

	counter.IncreaseCount("item1")
	counter.DeleteEntry("item1")

	if numEntries := counter.GetTotalEntries(); numEntries != 0 {
		t.Errorf("expected 0 entries after deletion, got %d", numEntries)
	}

	if count := counter.GetCount("item1"); count != 0 {
		t.Errorf("expected count 0 after deletion, got %d", count)
	}
}

func TestGC(t *testing.T) {
	counter := NewTimedCounter(200 * time.Millisecond)

	counter.IncreaseCount("item1")
	counter.IncreaseCount("item2")

	time.Sleep(400 * time.Millisecond) // Wait for entries to expire

	counter.gc() // Manually trigger garbage collection

	if numEntries := counter.GetTotalEntries(); numEntries != 0 {
		t.Errorf("expected 0 entries after gc, got %d", numEntries)
	}
}

func TestRunGC(t *testing.T) {
	counter := NewTimedCounter(200 * time.Millisecond)

	counter.IncreaseCount("item1")
	counter.IncreaseCount("item2")

	stopCh := make(chan struct{})
	go counter.RunGC(300*time.Millisecond, stopCh)

	time.Sleep(400 * time.Millisecond) // Wait for GC to run

	if numEntries := counter.GetTotalEntries(); numEntries != 0 {
		t.Errorf("expected 0 entries, got %d", numEntries)
	}

	close(stopCh)
}
