Skip to content

feat(syncwaves): use binary tree ordering for sync waves #744

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 1 commit into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 3 additions & 1 deletion pkg/sync/common/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,8 @@ const (
AnnotationSyncOptions = "argocd.argoproj.io/sync-options"
// AnnotationSyncWave indicates which wave of the sync the resource or hook should be in
AnnotationSyncWave = "argocd.argoproj.io/sync-wave"
// AnnotationUseBinaryTreeWaveOrdering indicates if the resource or hook's wave should be ordered using a binary tree ordering
AnnotationUseBinaryTreeWaveOrdering = "argocd.argoproj.io/use-binary-tree-wave-ordering"
// AnnotationKeyHook contains the hook type of a resource
AnnotationKeyHook = "argocd.argoproj.io/hook"
// AnnotationKeyHookDeletePolicy is the policy of deleting a hook
Expand Down Expand Up @@ -58,7 +60,7 @@ type SyncPhase string
// SyncWaveHook is a callback function which will be invoked after each sync wave is successfully
// applied during a sync operation. The callback indicates which phase and wave it had just
// executed, and whether or not that wave was the final one.
type SyncWaveHook func(phase SyncPhase, wave int, final bool) error
type SyncWaveHook func(phase SyncPhase, waves []int, final bool) error

const (
SyncPhasePreSync = "PreSync"
Expand Down
26 changes: 26 additions & 0 deletions pkg/sync/doc.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ Package implements Kubernetes resources synchronization and provides the followi
- resource pruning
- resource hooks
- sync waves
- sync waves binary tree ordering
- sync options

# Basic Syncing
Expand Down Expand Up @@ -75,6 +76,31 @@ that runs before all other resources. The `argocd.argoproj.io/sync-wave` annotat
annotations:
argocd.argoproj.io/sync-wave: "5"

# Sync Waves Binary Tree Ordering

The wave ordering using a binary tree feature allows to run parallel waves of synchronisation where the sync-wave values
correspond to a complete binary tree with root's label equal to 1. A sync-wave value X would be considered less than Y
when using binary tree ordering if and only if there exists integers N and M such that :
Y = X * 2**N + M where 0 <= M < N.

The `argocd.argoproj.io/use-binary-tree-wave-ordering` annotation define the type of wave's ordering used for a resource's wave:

metadata:
annotations:
argocd.argoproj.io/sync-wave: "5"
argocd.argoproj.io/use-binary-tree-wave-ordering: "true"

example of waves ordering using binary tree:

1 -----> 2 -----> 4
\ \----> 5
\---> 3 -----> 6
\----> 7

Note that a resource using a binary tree ordering for sync waves will always be synced after all resources using a normal ordering.
Note also that all resources using a binary tree ordering and having a sync wave value inferior to 1 will behave like resources using
a normal wave ordering.

# Sync Options

The sync options allows customizing the synchronization of selected resources. The options are specified using the
Expand Down
136 changes: 111 additions & 25 deletions pkg/sync/sync_context.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,8 @@ import (
"context"
"encoding/json"
"fmt"
"reflect"
"slices"
"sort"
"strings"
"sync"
Expand Down Expand Up @@ -562,24 +564,27 @@ func (sc *syncContext) Sync() {

// remove any tasks not in this wave
phase := tasks.phase()
wave := tasks.wave()
finalWave := phase == tasks.lastPhase() && wave == tasks.lastWave()
waves, wavesUseBinaryTreeOrdering := tasks.waves()
lastWaves, lastWavesUseBinaryTreeOrdering := tasks.lastWaves()
finalWaves := phase == tasks.lastPhase() && reflect.DeepEqual(waves, lastWaves) && wavesUseBinaryTreeOrdering == lastWavesUseBinaryTreeOrdering

// if it is the last phase/wave and the only remaining tasks are non-hooks, the we are successful
// EVEN if those objects subsequently degraded
// This handles the common case where neither hooks or waves are used and a sync equates to simply an (asynchronous) kubectl apply of manifests, which succeeds immediately.
remainingTasks := tasks.Filter(func(t *syncTask) bool { return t.phase != phase || wave != t.wave() || t.isHook() })
remainingTasks := tasks.Filter(func(t *syncTask) bool { return t.phase != phase || !slices.Contains(waves, t.wave()) || t.isHook() })

sc.log.WithValues("phase", phase, "wave", wave, "tasks", tasks, "syncFailTasks", syncFailTasks).V(1).Info("Filtering tasks in correct phase and wave")
tasks = tasks.Filter(func(t *syncTask) bool { return t.phase == phase && t.wave() == wave })
sc.log.WithValues("phase", phase, "wave", waves, "tasks", tasks, "syncFailTasks", syncFailTasks).V(1).Info("Filtering tasks in correct phase and wave")
tasks = tasks.Filter(func(t *syncTask) bool {
return t.phase == phase && slices.Contains(waves, t.wave()) && t.waveUseBinaryTreeOrdering() == wavesUseBinaryTreeOrdering
})

sc.setOperationPhase(common.OperationRunning, "one or more tasks are running")

sc.log.WithValues("tasks", tasks).V(1).Info("Wet-run")
runState := sc.runTasks(tasks, false)

if sc.syncWaveHook != nil && runState != failed {
err := sc.syncWaveHook(phase, wave, finalWave)
err := sc.syncWaveHook(phase, waves, finalWaves)
if err != nil {
sc.deleteHooks(hooksPendingDeletionFailed)
sc.setOperationPhase(common.OperationFailed, fmt.Sprintf("SyncWaveHook failed: %v", err))
Expand Down Expand Up @@ -899,52 +904,133 @@ func (sc *syncContext) getSyncTasks() (_ syncTasks, successful bool) {
}
}

// for prune tasks, modify the waves for proper cleanup i.e reverse of sync wave (creation order)
pruneTasks := make(map[int][]*syncTask)
// for prune tasks, modify the waves for proper cleanup i.e reverse of sync wave (creation order).
// if all prune tasks use normal wave ordering, use the legacy method. Otherwise, use a binary tree wave ordering
// on all prune tasks and modify the waves to decreasing power of 2.
// For prune tasks which already use binary tree wave ordering, set an identical syncWave to tasks which
// have the same level in a complete binary tree rooted at 1 where each node n has 2*n and 2*n+1 as children.

pruntTasksUsingNormalOrdering := make(map[int][]*syncTask)
for _, task := range tasks {
if task.isPrune() {
pruneTasks[task.wave()] = append(pruneTasks[task.wave()], task)
if task.isPrune() && task.waveUseBinaryTreeOrdering() == "false" {
pruntTasksUsingNormalOrdering[task.wave()] = append(pruntTasksUsingNormalOrdering[task.wave()], task)
}
}
var uniquePruneWavesUsingNormalOrdering []int
for k := range pruntTasksUsingNormalOrdering {
uniquePruneWavesUsingNormalOrdering = append(uniquePruneWavesUsingNormalOrdering, k)
}

var uniquePruneWaves []int
for k := range pruneTasks {
uniquePruneWaves = append(uniquePruneWaves, k)
sort.Ints(uniquePruneWavesUsingNormalOrdering)

pruneTasksUsingBinaryTreeOrdering := make(map[int][]*syncTask)
for _, task := range tasks {
if task.isPrune() && task.waveUseBinaryTreeOrdering() == "true" {
pruneTasksUsingBinaryTreeOrdering[task.wave()] = append(pruneTasksUsingBinaryTreeOrdering[task.wave()], task)
}
}
sort.Ints(uniquePruneWaves)

// reorder waves for pruning tasks using symmetric swap on prune waves
n := len(uniquePruneWaves)
for i := 0; i < n/2; i++ {
// waves to swap
startWave := uniquePruneWaves[i]
endWave := uniquePruneWaves[n-1-i]
if len(pruneTasksUsingBinaryTreeOrdering) > 0 {
var uniquePruneWavesUsingBinaryTreeOrdering []int
for k := range pruneTasksUsingBinaryTreeOrdering {
uniquePruneWavesUsingBinaryTreeOrdering = append(uniquePruneWavesUsingBinaryTreeOrdering, k)
}
sort.Ints(uniquePruneWavesUsingBinaryTreeOrdering)

for _, task := range pruneTasks[startWave] {
task.waveOverride = &endWave
pruneTasksWavesValues := []int{0}
for i := 1; i < len(uniquePruneWavesUsingNormalOrdering); i++ {
pruneTasksWavesValues = append(pruneTasksWavesValues, i)
}
nextPotentialWaveValue := len(uniquePruneWavesUsingNormalOrdering)
if len(uniquePruneWavesUsingNormalOrdering) != 0 {
pruneTasksWavesValues = append(pruneTasksWavesValues, nextPotentialWaveValue)
}
for i := 1; i < len(uniquePruneWavesUsingBinaryTreeOrdering); i++ {
currentWaveValue := biggestPowerOf2InferiorThan(uniquePruneWavesUsingBinaryTreeOrdering[i])
previousWaveValue := biggestPowerOf2InferiorThan(uniquePruneWavesUsingBinaryTreeOrdering[i-1])
if currentWaveValue == previousWaveValue {
pruneTasksWavesValues = append(pruneTasksWavesValues, nextPotentialWaveValue)
} else {
nextPotentialWaveValue++
pruneTasksWavesValues = append(pruneTasksWavesValues, nextPotentialWaveValue)
}
}

for _, task := range pruneTasks[endWave] {
task.waveOverride = &startWave
pruneTasksWavesNewValues := PowInt(2, pruneTasksWavesValues[len(pruneTasksWavesValues)-1])
newPruneWaves := []int{pruneTasksWavesNewValues}
for i := 1; i < len(pruneTasksWavesValues); i++ {
if pruneTasksWavesValues[i] == pruneTasksWavesValues[i-1] {
newPruneWaves = append(newPruneWaves, pruneTasksWavesNewValues)
} else {
pruneTasksWavesNewValues /= 2
newPruneWaves = append(newPruneWaves, pruneTasksWavesNewValues)
}
}

syncTaskUseBinaryTreeOrdering := "true"

for i := range uniquePruneWavesUsingNormalOrdering {
// tasks using normal wave ordering to reorder
iWave := uniquePruneWavesUsingNormalOrdering[i]

for _, task := range pruntTasksUsingNormalOrdering[iWave] {
task.waveOverride = &newPruneWaves[i]
task.waveUseBinaryTreeOrderingOverride = &syncTaskUseBinaryTreeOrdering
}
}

n := len(uniquePruneWavesUsingNormalOrdering)
for i := range uniquePruneWavesUsingBinaryTreeOrdering {
// tasks using binary tree wave ordering to reorder
iWave := uniquePruneWavesUsingBinaryTreeOrdering[i]

for _, task := range pruneTasksUsingBinaryTreeOrdering[iWave] {
task.waveOverride = &(newPruneWaves[n+i])
task.waveUseBinaryTreeOrderingOverride = &syncTaskUseBinaryTreeOrdering
}
}
} else {
// reorder waves for pruning tasks using symmetric swap on prune waves
n := len(uniquePruneWavesUsingNormalOrdering)
for i := 0; i < n/2; i++ {
// waves to swap
startWave := uniquePruneWavesUsingNormalOrdering[i]
endWave := uniquePruneWavesUsingNormalOrdering[n-1-i]

for _, task := range pruntTasksUsingNormalOrdering[startWave] {
task.waveOverride = &endWave
}

for _, task := range pruntTasksUsingNormalOrdering[endWave] {
task.waveOverride = &startWave
}
}
}

// for pruneLast tasks, modify the wave to sync phase last wave of tasks + 1
// to ensure proper cleanup, syncPhaseLastWave should also consider prune tasks to determine last wave
syncPhaseLastWave := 0
syncPhaseLastWaveUseBinaryTreeOrdering := "false"
for _, task := range tasks {
if task.phase == common.SyncPhaseSync {
if task.wave() > syncPhaseLastWave {
syncPhaseLastWave = task.wave()
syncPhaseLastWaveUseBinaryTreeOrdering = task.waveUseBinaryTreeOrdering()
}
}
}
syncPhaseLastWave = syncPhaseLastWave + 1

if syncPhaseLastWaveUseBinaryTreeOrdering == "false" {
syncPhaseLastWave++
} else {
syncPhaseLastWave *= 2
}

for _, task := range tasks {
if task.isPrune() &&
(sc.pruneLast || resourceutil.HasAnnotationOption(task.liveObj, common.AnnotationSyncOptions, common.SyncOptionPruneLast)) {
task.waveOverride = &syncPhaseLastWave
task.waveUseBinaryTreeOrderingOverride = &syncPhaseLastWaveUseBinaryTreeOrdering
}
}

Expand Down
Loading