fix: add max retry mechanism for backup creation
Prevent the backup CR from being stuck in the pending state forever. Backup creation is now retried only up to a maximum retry count, after which the backup is moved to the error state (a minimal sketch of the pattern follows the change summary below).

longhorn-10090

Signed-off-by: Phan Le <[email protected]>
(cherry picked from commit 63a01a7)
PhanLe1010 committed Jan 10, 2025
1 parent 5d1d411 commit fd5e596
Showing 3 changed files with 234 additions and 2 deletions.
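
How the new retry gating works, shown as a minimal standalone Go sketch (hypothetical, simplified names; the real logic lives in checkMonitor and util.TimedCounter in the diff below): each time a creation precondition is not met, a per-backup counter is incremented and the backup stays Pending; once the counter reaches maxCreationRetry, the backup is moved to the Error state and the counter entry is dropped.

package main

import "fmt"

const maxCreationRetry = 5

// retries is a stand-in for the commit's util.TimedCounter (simplified, no expiry or GC).
var retries = map[string]int{}

// nextState mimics the pattern this commit adds to checkMonitor: keep the backup
// Pending while a creation precondition (engine running, snapshot ready) is unmet,
// and move it to Error once the precondition has failed maxCreationRetry times.
func nextState(backupName string, preconditionMet bool) string {
	if preconditionMet {
		delete(retries, backupName) // creation can proceed; clear the retry history
		return "InProgress"
	}
	retries[backupName]++
	if retries[backupName] >= maxCreationRetry {
		delete(retries, backupName)
		return "Error" // no longer stuck in Pending forever
	}
	return "Pending"
}

func main() {
	for attempt := 1; attempt <= maxCreationRetry; attempt++ {
		fmt.Printf("attempt %d: %s\n", attempt, nextState("backup-1", false))
	}
	// attempts 1-4 print Pending, attempt 5 prints Error
}

In the actual controller, the counter is a util.TimedCounter whose entries also expire after creationRetryCounterExpiredDuration and are garbage-collected every creationRetryCounterGCDuration.
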
76 changes: 74 additions & 2 deletions controller/backup_controller.go
@@ -39,11 +39,38 @@ const (
)

const (
WaitForSnapshotMessage = "Waiting for the snapshot %v to be ready"
FailedWaitingForSnapshotMessage = "Failed waiting for the snapshot %v to be ready"
WaitForEngineMessage = "Waiting for the engine %v to be ready"
FailedWaitingForEngineMessage = "Failed waiting for the engine %v to be ready"
WaitForBackupDeletionIsCompleteMessage = "Wait for backup %v to be deleted"
FailedToGetSnapshotMessage = "Failed to get the Snapshot %v"
FailedToDeleteBackupMessage = "Failed to delete the backup %v in the backupstore, err %v"
NoDeletionInProgressRecordMessage = "No deletion in progress record, retry the deletion command"
)

const (
DeletionMinInterval = time.Minute * 1
DeletionMaxInterval = time.Hour * 24

creationRetryCounterExpiredDuration = 10 * time.Minute
creationRetryCounterGCDuration = 45 * time.Second
maxCreationRetry = 5
)

type DeletingStatus struct {
State longhorn.BackupState
ErrorMessage string
}

type BackupController struct {
*baseController

@@ -63,6 +90,17 @@ type BackupController struct
cacheSyncs []cache.InformerSynced

proxyConnCounter util.Counter

// Used to track the result of the deletion command.
// Also used to track whether the controller crashed after the deletion command was triggered.
deletingMapLock sync.Mutex
inProgressDeletingMap map[string]*DeletingStatus

deletingBackoff *flowcontrol.Backoff
creationRetryCounter *util.TimedCounter
}

func NewBackupController(
@@ -96,6 +134,15 @@ func NewBackupController(
eventRecorder: eventBroadcaster.NewRecorder(scheme, corev1.EventSource{Component: "longhorn-backup-controller"}),

proxyConnCounter: proxyConnCounter,

deletingMapLock: sync.Mutex{},
inProgressDeletingMap: map[string]*DeletingStatus{},

deletingBackoff: flowcontrol.NewBackOff(DeletionMinInterval, DeletionMaxInterval),
creationRetryCounter: util.NewTimedCounter(creationRetryCounterExpiredDuration),
}

var err error
@@ -138,6 +185,7 @@ func (bc *BackupController) Run(workers int, stopCh <-chan struct{}) {
for i := 0; i < workers; i++ {
go wait.Until(bc.worker, time.Second, stopCh)
}
go bc.creationRetryCounter.RunGC(creationRetryCounterGCDuration, stopCh)
<-stopCh
}

@@ -317,6 +365,8 @@ func (bc *BackupController) reconcile(backupName string) (err error) {
// Disable monitor regardless of backup state
bc.disableBackupMonitor(backup.Name)

bc.creationRetryCounter.DeleteEntry(backup.Name)

if backup.Status.State == longhorn.BackupStateError || backup.Status.State == longhorn.BackupStateUnknown {
bc.eventRecorder.Eventf(backup, corev1.EventTypeWarning, string(backup.Status.State), "Failed backup %s has been deleted: %s", backup.Name, backup.Status.Error)
}
@@ -740,17 +790,28 @@ func (bc *BackupController) checkMonitor(backup *longhorn.Backup, volume *longho
if engine.Status.CurrentState != longhorn.InstanceStateRunning ||
engine.Spec.DesireState != longhorn.InstanceStateRunning ||
volume.Status.State != longhorn.VolumeStateAttached {
bc.creationRetryCounter.IncreaseCount(backup.Name)
if bc.creationRetryCounter.GetCount(backup.Name) >= maxCreationRetry {
backup.Status.Error = fmt.Sprintf(FailedWaitingForEngineMessage, engine.Name)
backup.Status.State = longhorn.BackupStateError
backup.Status.LastSyncedAt = metav1.Time{Time: time.Now().UTC()}
bc.creationRetryCounter.DeleteEntry(backup.Name)
return nil, fmt.Errorf("failed waiting for the engine %v to be running before enabling backup monitor", engine.Name)
}
backup.Status.State = longhorn.BackupStatePending
backup.Status.Messages[MessageTypeReconcileInfo] = fmt.Sprintf(WaitForEngineMessage, engine.Name)
return nil, fmt.Errorf("waiting for the engine %v to be running before enabling backup monitor", engine.Name)
}

snapshot, err := bc.ds.GetSnapshotRO(backup.Spec.SnapshotName)
if err != nil {
if apierrors.IsNotFound(err) {
backup.Status.Error = fmt.Sprintf("Snapshot %v not found", backup.Spec.SnapshotName)
bc.creationRetryCounter.IncreaseCount(backup.Name)
msg := fmt.Sprintf(FailedToGetSnapshotMessage, backup.Spec.SnapshotName)
if bc.creationRetryCounter.GetCount(backup.Name) >= maxCreationRetry {
backup.Status.Error = msg
backup.Status.State = longhorn.BackupStateError
backup.Status.LastSyncedAt = metav1.Time{Time: time.Now().UTC()}
bc.creationRetryCounter.DeleteEntry(backup.Name)
} else {
backup.Status.State = longhorn.BackupStatePending
backup.Status.Messages[MessageTypeReconcileInfo] = fmt.Sprintf(FailedToGetSnapshotMessage, backup.Spec.SnapshotName)
@@ -759,6 +820,14 @@ func (bc *BackupController) checkMonitor(backup *longhorn.Backup, volume *longho
}
if snapshot != nil {
if !snapshot.Status.ReadyToUse {
bc.creationRetryCounter.IncreaseCount(backup.Name)
if bc.creationRetryCounter.GetCount(backup.Name) >= maxCreationRetry {
backup.Status.Error = fmt.Sprintf(FailedWaitingForSnapshotMessage, backup.Spec.SnapshotName)
backup.Status.State = longhorn.BackupStateError
backup.Status.LastSyncedAt = metav1.Time{Time: time.Now().UTC()}
bc.creationRetryCounter.DeleteEntry(backup.Name)
return nil, fmt.Errorf("failed waiting for the snapshot %v to be ready before enabling backup monitor", backup.Spec.SnapshotName)
}
backup.Status.State = longhorn.BackupStatePending
backup.Status.Messages[MessageTypeReconcileInfo] = fmt.Sprintf(WaitForSnapshotMessage, backup.Spec.SnapshotName)
return nil, fmt.Errorf("waiting for the snapshot %v to be ready before enabling backup monitor", backup.Spec.SnapshotName)
@@ -774,6 +843,9 @@ func (bc *BackupController) checkMonitor(backup *longhorn.Backup, volume *longho
backup.Status.LastSyncedAt = metav1.Time{Time: time.Now().UTC()}
return nil, err
}

// Backup creation succeeded, remove the backup from the counter
bc.creationRetryCounter.DeleteEntry(backup.Name)
return monitor, nil
}

88 changes: 88 additions & 0 deletions util/timed_counter.go
@@ -0,0 +1,88 @@
package util

import (
"sync"
"time"
)

type counterEntry struct {
count int
lastUpdate time.Time
}

type TimedCounter struct {
sync.RWMutex
perItemCounter map[string]*counterEntry

expiredDuration time.Duration
}

func NewTimedCounter(expiredDuration time.Duration) *TimedCounter {
return &TimedCounter{
perItemCounter: map[string]*counterEntry{},
expiredDuration: expiredDuration,
}
}

func (c *TimedCounter) GetCount(id string) int {
c.RLock()
defer c.RUnlock()
entry, ok := c.perItemCounter[id]
if ok {
return entry.count
}
return 0
}

func (c *TimedCounter) IncreaseCount(id string) {
c.Lock()
defer c.Unlock()
entry, ok := c.perItemCounter[id]
if !ok {
entry = c.initEntryUnsafe(id)
}
entry.count += 1
entry.lastUpdate = time.Now()
}

func (c *TimedCounter) DeleteEntry(id string) {
c.Lock()
defer c.Unlock()
delete(c.perItemCounter, id)
}

func (c *TimedCounter) gc() {
c.Lock()
defer c.Unlock()
now := time.Now()
for id, entry := range c.perItemCounter {
if now.Sub(entry.lastUpdate) > c.expiredDuration {
delete(c.perItemCounter, id)
}
}
}

func (c *TimedCounter) RunGC(gcDuration time.Duration, stopCh <-chan struct{}) {
ticker := time.NewTicker(gcDuration)
defer ticker.Stop()
for {
select {
case <-ticker.C:
c.gc()
case <-stopCh:
return
}
}
}

func (c *TimedCounter) GetTotalEntries() int {
c.RLock()
defer c.RUnlock()
return len(c.perItemCounter)
}

func (c *TimedCounter) initEntryUnsafe(id string) *counterEntry {
entry := &counterEntry{count: 0}
c.perItemCounter[id] = entry
return entry
}
72 changes: 72 additions & 0 deletions util/timed_counter_test.go
@@ -0,0 +1,72 @@
package util

import (
"testing"
"time"
)

func TestGetCount(t *testing.T) {
counter := NewTimedCounter(1 * time.Minute)

counter.IncreaseCount("item1")
if count := counter.GetCount("item1"); count != 1 {
t.Errorf("expected count 1, got %d", count)
}

counter.IncreaseCount("item1")
if count := counter.GetCount("item1"); count != 2 {
t.Errorf("expected count 2, got %d", count)
}

if count := counter.GetCount("nonexistent"); count != 0 {
t.Errorf("expected count 0 for nonexistent item, got %d", count)
}
}

func TestDeleteEntry(t *testing.T) {
counter := NewTimedCounter(1 * time.Minute)

counter.IncreaseCount("item1")
counter.DeleteEntry("item1")

if numEntries := counter.GetTotalEntries(); numEntries != 0 {
t.Errorf("expected 0 entry after deletion, got %d", numEntries)
}

if count := counter.GetCount("item1"); count != 0 {
t.Errorf("expected count 0 after deletion, got %d", count)
}
}

func TestGC(t *testing.T) {
counter := NewTimedCounter(200 * time.Millisecond)

counter.IncreaseCount("item1")
counter.IncreaseCount("item2")

time.Sleep(400 * time.Millisecond) // Wait for entries to expire

counter.gc() // Manually trigger garbage collection

if numEntries := counter.GetTotalEntries(); numEntries != 0 {
t.Errorf("expected 0 entry after gc, got %d", numEntries)
}
}

func TestRunGC(t *testing.T) {
counter := NewTimedCounter(200 * time.Millisecond)

counter.IncreaseCount("item1")
counter.IncreaseCount("item2")

stopCh := make(chan struct{})
go counter.RunGC(300*time.Millisecond, stopCh)

time.Sleep(400 * time.Millisecond) // Wait for GC to run

if numEntries := counter.GetTotalEntries(); numEntries != 0 {
t.Errorf("expected 0 entry, got %d", numEntries)
}

close(stopCh)
}
