From cf2706360da3b004648d71f0ae95dcacea61f803 Mon Sep 17 00:00:00 2001 From: SebastienFelix Date: Sat, 5 Jul 2025 14:33:41 +0200 Subject: [PATCH] feat(syncwaves) - use binary tree ordering for sync waves Signed-off-by: SebastienFelix --- pkg/sync/common/types.go | 4 +- pkg/sync/doc.go | 26 +++ pkg/sync/sync_context.go | 136 ++++++++++--- pkg/sync/sync_context_test.go | 329 +++++++++++++++++++++++++++++-- pkg/sync/sync_task.go | 24 ++- pkg/sync/sync_task_test.go | 8 +- pkg/sync/sync_tasks.go | 110 ++++++++++- pkg/sync/sync_tasks_test.go | 149 +++++++++++++- pkg/sync/syncwaves/waves.go | 11 ++ pkg/sync/syncwaves/waves_test.go | 6 + 10 files changed, 743 insertions(+), 60 deletions(-) diff --git a/pkg/sync/common/types.go b/pkg/sync/common/types.go index d5052ed4a..351f42852 100644 --- a/pkg/sync/common/types.go +++ b/pkg/sync/common/types.go @@ -12,6 +12,8 @@ const ( AnnotationSyncOptions = "argocd.argoproj.io/sync-options" // AnnotationSyncWave indicates which wave of the sync the resource or hook should be in AnnotationSyncWave = "argocd.argoproj.io/sync-wave" + // AnnotationUseBinaryTreeWaveOrdering indicates if the resource or hook's wave should be ordered using a binary tree ordering + AnnotationUseBinaryTreeWaveOrdering = "argocd.argoproj.io/use-binary-tree-wave-ordering" // AnnotationKeyHook contains the hook type of a resource AnnotationKeyHook = "argocd.argoproj.io/hook" // AnnotationKeyHookDeletePolicy is the policy of deleting a hook @@ -58,7 +60,7 @@ type SyncPhase string // SyncWaveHook is a callback function which will be invoked after each sync wave is successfully // applied during a sync operation. The callback indicates which phase and wave it had just // executed, and whether or not that wave was the final one. -type SyncWaveHook func(phase SyncPhase, wave int, final bool) error +type SyncWaveHook func(phase SyncPhase, waves []int, final bool) error const ( SyncPhasePreSync = "PreSync" diff --git a/pkg/sync/doc.go b/pkg/sync/doc.go index f4f5d8725..02bf243e7 100644 --- a/pkg/sync/doc.go +++ b/pkg/sync/doc.go @@ -4,6 +4,7 @@ Package implements Kubernetes resources synchronization and provides the followi - resource pruning - resource hooks - sync waves + - sync waves binary tree ordering - sync options # Basic Syncing @@ -75,6 +76,31 @@ that runs before all other resources. The `argocd.argoproj.io/sync-wave` annotat annotations: argocd.argoproj.io/sync-wave: "5" +# Sync Waves Binary Tree Ordering + +The wave ordering using a binary tree feature allows to run parallel waves of synchronisation where the sync-wave values +correspond to a complete binary tree with root's label equal to 1. A sync-wave value X would be considered less than Y +when using binary tree ordering if and only if there exists integers N and M such that : +Y = X * 2**N + M where 0 <= M < N. + +The `argocd.argoproj.io/use-binary-tree-wave-ordering` annotation define the type of wave's ordering used for a resource's wave: + + metadata: + annotations: + argocd.argoproj.io/sync-wave: "5" + argocd.argoproj.io/use-binary-tree-wave-ordering: "true" + +example of waves ordering using binary tree: + + 1 -----> 2 -----> 4 + \ \----> 5 + \---> 3 -----> 6 + \----> 7 + +Note that a resource using a binary tree ordering for sync waves will always be synced after all resources using a normal ordering. +Note also that all resources using a binary tree ordering and having a sync wave value inferior to 1 will behave like resources using +a normal wave ordering. + # Sync Options The sync options allows customizing the synchronization of selected resources. The options are specified using the diff --git a/pkg/sync/sync_context.go b/pkg/sync/sync_context.go index 8f4d51e4f..a9bad34b7 100644 --- a/pkg/sync/sync_context.go +++ b/pkg/sync/sync_context.go @@ -4,6 +4,8 @@ import ( "context" "encoding/json" "fmt" + "reflect" + "slices" "sort" "strings" "sync" @@ -562,16 +564,19 @@ func (sc *syncContext) Sync() { // remove any tasks not in this wave phase := tasks.phase() - wave := tasks.wave() - finalWave := phase == tasks.lastPhase() && wave == tasks.lastWave() + waves, wavesUseBinaryTreeOrdering := tasks.waves() + lastWaves, lastWavesUseBinaryTreeOrdering := tasks.lastWaves() + finalWaves := phase == tasks.lastPhase() && reflect.DeepEqual(waves, lastWaves) && wavesUseBinaryTreeOrdering == lastWavesUseBinaryTreeOrdering // if it is the last phase/wave and the only remaining tasks are non-hooks, the we are successful // EVEN if those objects subsequently degraded // This handles the common case where neither hooks or waves are used and a sync equates to simply an (asynchronous) kubectl apply of manifests, which succeeds immediately. - remainingTasks := tasks.Filter(func(t *syncTask) bool { return t.phase != phase || wave != t.wave() || t.isHook() }) + remainingTasks := tasks.Filter(func(t *syncTask) bool { return t.phase != phase || !slices.Contains(waves, t.wave()) || t.isHook() }) - sc.log.WithValues("phase", phase, "wave", wave, "tasks", tasks, "syncFailTasks", syncFailTasks).V(1).Info("Filtering tasks in correct phase and wave") - tasks = tasks.Filter(func(t *syncTask) bool { return t.phase == phase && t.wave() == wave }) + sc.log.WithValues("phase", phase, "wave", waves, "tasks", tasks, "syncFailTasks", syncFailTasks).V(1).Info("Filtering tasks in correct phase and wave") + tasks = tasks.Filter(func(t *syncTask) bool { + return t.phase == phase && slices.Contains(waves, t.wave()) && t.waveUseBinaryTreeOrdering() == wavesUseBinaryTreeOrdering + }) sc.setOperationPhase(common.OperationRunning, "one or more tasks are running") @@ -579,7 +584,7 @@ func (sc *syncContext) Sync() { runState := sc.runTasks(tasks, false) if sc.syncWaveHook != nil && runState != failed { - err := sc.syncWaveHook(phase, wave, finalWave) + err := sc.syncWaveHook(phase, waves, finalWaves) if err != nil { sc.deleteHooks(hooksPendingDeletionFailed) sc.setOperationPhase(common.OperationFailed, fmt.Sprintf("SyncWaveHook failed: %v", err)) @@ -899,52 +904,133 @@ func (sc *syncContext) getSyncTasks() (_ syncTasks, successful bool) { } } - // for prune tasks, modify the waves for proper cleanup i.e reverse of sync wave (creation order) - pruneTasks := make(map[int][]*syncTask) + // for prune tasks, modify the waves for proper cleanup i.e reverse of sync wave (creation order). + // if all prune tasks use normal wave ordering, use the legacy method. Otherwise, use a binary tree wave ordering + // on all prune tasks and modify the waves to decreasing power of 2. + // For prune tasks which already use binary tree wave ordering, set an identical syncWave to tasks which + // have the same level in a complete binary tree rooted at 1 where each node n has 2*n and 2*n+1 as children. + + pruntTasksUsingNormalOrdering := make(map[int][]*syncTask) for _, task := range tasks { - if task.isPrune() { - pruneTasks[task.wave()] = append(pruneTasks[task.wave()], task) + if task.isPrune() && task.waveUseBinaryTreeOrdering() == "false" { + pruntTasksUsingNormalOrdering[task.wave()] = append(pruntTasksUsingNormalOrdering[task.wave()], task) } } + var uniquePruneWavesUsingNormalOrdering []int + for k := range pruntTasksUsingNormalOrdering { + uniquePruneWavesUsingNormalOrdering = append(uniquePruneWavesUsingNormalOrdering, k) + } - var uniquePruneWaves []int - for k := range pruneTasks { - uniquePruneWaves = append(uniquePruneWaves, k) + sort.Ints(uniquePruneWavesUsingNormalOrdering) + + pruneTasksUsingBinaryTreeOrdering := make(map[int][]*syncTask) + for _, task := range tasks { + if task.isPrune() && task.waveUseBinaryTreeOrdering() == "true" { + pruneTasksUsingBinaryTreeOrdering[task.wave()] = append(pruneTasksUsingBinaryTreeOrdering[task.wave()], task) + } } - sort.Ints(uniquePruneWaves) - // reorder waves for pruning tasks using symmetric swap on prune waves - n := len(uniquePruneWaves) - for i := 0; i < n/2; i++ { - // waves to swap - startWave := uniquePruneWaves[i] - endWave := uniquePruneWaves[n-1-i] + if len(pruneTasksUsingBinaryTreeOrdering) > 0 { + var uniquePruneWavesUsingBinaryTreeOrdering []int + for k := range pruneTasksUsingBinaryTreeOrdering { + uniquePruneWavesUsingBinaryTreeOrdering = append(uniquePruneWavesUsingBinaryTreeOrdering, k) + } + sort.Ints(uniquePruneWavesUsingBinaryTreeOrdering) - for _, task := range pruneTasks[startWave] { - task.waveOverride = &endWave + pruneTasksWavesValues := []int{0} + for i := 1; i < len(uniquePruneWavesUsingNormalOrdering); i++ { + pruneTasksWavesValues = append(pruneTasksWavesValues, i) + } + nextPotentialWaveValue := len(uniquePruneWavesUsingNormalOrdering) + if len(uniquePruneWavesUsingNormalOrdering) != 0 { + pruneTasksWavesValues = append(pruneTasksWavesValues, nextPotentialWaveValue) + } + for i := 1; i < len(uniquePruneWavesUsingBinaryTreeOrdering); i++ { + currentWaveValue := biggestPowerOf2InferiorThan(uniquePruneWavesUsingBinaryTreeOrdering[i]) + previousWaveValue := biggestPowerOf2InferiorThan(uniquePruneWavesUsingBinaryTreeOrdering[i-1]) + if currentWaveValue == previousWaveValue { + pruneTasksWavesValues = append(pruneTasksWavesValues, nextPotentialWaveValue) + } else { + nextPotentialWaveValue++ + pruneTasksWavesValues = append(pruneTasksWavesValues, nextPotentialWaveValue) + } } - for _, task := range pruneTasks[endWave] { - task.waveOverride = &startWave + pruneTasksWavesNewValues := PowInt(2, pruneTasksWavesValues[len(pruneTasksWavesValues)-1]) + newPruneWaves := []int{pruneTasksWavesNewValues} + for i := 1; i < len(pruneTasksWavesValues); i++ { + if pruneTasksWavesValues[i] == pruneTasksWavesValues[i-1] { + newPruneWaves = append(newPruneWaves, pruneTasksWavesNewValues) + } else { + pruneTasksWavesNewValues /= 2 + newPruneWaves = append(newPruneWaves, pruneTasksWavesNewValues) + } + } + + syncTaskUseBinaryTreeOrdering := "true" + + for i := range uniquePruneWavesUsingNormalOrdering { + // tasks using normal wave ordering to reorder + iWave := uniquePruneWavesUsingNormalOrdering[i] + + for _, task := range pruntTasksUsingNormalOrdering[iWave] { + task.waveOverride = &newPruneWaves[i] + task.waveUseBinaryTreeOrderingOverride = &syncTaskUseBinaryTreeOrdering + } + } + + n := len(uniquePruneWavesUsingNormalOrdering) + for i := range uniquePruneWavesUsingBinaryTreeOrdering { + // tasks using binary tree wave ordering to reorder + iWave := uniquePruneWavesUsingBinaryTreeOrdering[i] + + for _, task := range pruneTasksUsingBinaryTreeOrdering[iWave] { + task.waveOverride = &(newPruneWaves[n+i]) + task.waveUseBinaryTreeOrderingOverride = &syncTaskUseBinaryTreeOrdering + } + } + } else { + // reorder waves for pruning tasks using symmetric swap on prune waves + n := len(uniquePruneWavesUsingNormalOrdering) + for i := 0; i < n/2; i++ { + // waves to swap + startWave := uniquePruneWavesUsingNormalOrdering[i] + endWave := uniquePruneWavesUsingNormalOrdering[n-1-i] + + for _, task := range pruntTasksUsingNormalOrdering[startWave] { + task.waveOverride = &endWave + } + + for _, task := range pruntTasksUsingNormalOrdering[endWave] { + task.waveOverride = &startWave + } } } // for pruneLast tasks, modify the wave to sync phase last wave of tasks + 1 // to ensure proper cleanup, syncPhaseLastWave should also consider prune tasks to determine last wave syncPhaseLastWave := 0 + syncPhaseLastWaveUseBinaryTreeOrdering := "false" for _, task := range tasks { if task.phase == common.SyncPhaseSync { if task.wave() > syncPhaseLastWave { syncPhaseLastWave = task.wave() + syncPhaseLastWaveUseBinaryTreeOrdering = task.waveUseBinaryTreeOrdering() } } } - syncPhaseLastWave = syncPhaseLastWave + 1 + + if syncPhaseLastWaveUseBinaryTreeOrdering == "false" { + syncPhaseLastWave++ + } else { + syncPhaseLastWave *= 2 + } for _, task := range tasks { if task.isPrune() && (sc.pruneLast || resourceutil.HasAnnotationOption(task.liveObj, common.AnnotationSyncOptions, common.SyncOptionPruneLast)) { task.waveOverride = &syncPhaseLastWave + task.waveUseBinaryTreeOrderingOverride = &syncPhaseLastWaveUseBinaryTreeOrdering } } diff --git a/pkg/sync/sync_context_test.go b/pkg/sync/sync_context_test.go index 0e8d01ebb..fd9cc0246 100644 --- a/pkg/sync/sync_context_test.go +++ b/pkg/sync/sync_context_test.go @@ -1631,10 +1631,10 @@ func TestSyncWaveHook(t *testing.T) { syncCtx.hooks = []*unstructured.Unstructured{pod3} called := false - syncCtx.syncWaveHook = func(phase synccommon.SyncPhase, wave int, final bool) error { + syncCtx.syncWaveHook = func(phase synccommon.SyncPhase, waves []int, final bool) error { called = true assert.Equal(t, synccommon.SyncPhaseSync, string(phase)) - assert.Equal(t, -1, wave) + assert.True(t, reflect.DeepEqual([]int{-1}, waves)) assert.False(t, final) return nil } @@ -1644,7 +1644,7 @@ func TestSyncWaveHook(t *testing.T) { // call sync again, it should not invoke the SyncWaveHook callback since we only should be // doing this after an apply, and not every reconciliation called = false - syncCtx.syncWaveHook = func(_ synccommon.SyncPhase, _ int, _ bool) error { + syncCtx.syncWaveHook = func(_ synccommon.SyncPhase, _ []int, _ bool) error { called = true return nil } @@ -1657,10 +1657,10 @@ func TestSyncWaveHook(t *testing.T) { pod1Res.HookPhase = synccommon.OperationSucceeded syncCtx.syncRes[resourceResultKey(pod1Res.ResourceKey, synccommon.SyncPhaseSync)] = pod1Res called = false - syncCtx.syncWaveHook = func(phase synccommon.SyncPhase, wave int, final bool) error { + syncCtx.syncWaveHook = func(phase synccommon.SyncPhase, waves []int, final bool) error { called = true assert.Equal(t, synccommon.SyncPhaseSync, string(phase)) - assert.Equal(t, 0, wave) + assert.True(t, reflect.DeepEqual([]int{0}, waves)) assert.False(t, final) return nil } @@ -1673,10 +1673,10 @@ func TestSyncWaveHook(t *testing.T) { pod2Res.HookPhase = synccommon.OperationSucceeded syncCtx.syncRes[resourceResultKey(pod2Res.ResourceKey, synccommon.SyncPhaseSync)] = pod2Res called = false - syncCtx.syncWaveHook = func(phase synccommon.SyncPhase, wave int, final bool) error { + syncCtx.syncWaveHook = func(phase synccommon.SyncPhase, waves []int, final bool) error { called = true assert.Equal(t, synccommon.SyncPhasePostSync, string(phase)) - assert.Equal(t, 0, wave) + assert.True(t, reflect.DeepEqual([]int{0}, waves)) assert.True(t, final) return nil } @@ -1695,7 +1695,7 @@ func TestSyncWaveHookFail(t *testing.T) { }) called := false - syncCtx.syncWaveHook = func(_ synccommon.SyncPhase, _ int, _ bool) error { + syncCtx.syncWaveHook = func(_ synccommon.SyncPhase, _ []int, _ bool) error { called = true return errors.New("intentional error") } @@ -1728,7 +1728,8 @@ func TestPruneLast(t *testing.T) { assert.True(t, successful) assert.Len(t, tasks, 3) // last wave is the last sync wave for non-prune task + 1 - assert.Equal(t, 1, tasks.lastWave()) + lastWaves, _ := tasks.lastWaves() + assert.True(t, reflect.DeepEqual([]int{1}, lastWaves)) }) t.Run("syncPhaseDifferentWave", func(t *testing.T) { @@ -1744,7 +1745,8 @@ func TestPruneLast(t *testing.T) { assert.True(t, successful) assert.Len(t, tasks, 3) // last wave is the last sync wave for tasks + 1 - assert.Equal(t, 8, tasks.lastWave()) + lastWaves, _ := tasks.lastWaves() + assert.True(t, reflect.DeepEqual([]int{8}, lastWaves)) }) t.Run("pruneLastIndividualResources", func(t *testing.T) { @@ -1762,7 +1764,8 @@ func TestPruneLast(t *testing.T) { assert.True(t, successful) assert.Len(t, tasks, 3) // last wave is the last sync wave for tasks + 1 - assert.Equal(t, 8, tasks.lastWave()) + lastWaves, _ := tasks.lastWaves() + assert.True(t, reflect.DeepEqual([]int{8}, lastWaves)) }) } @@ -2262,3 +2265,307 @@ func TestNeedsClientSideApplyMigration(t *testing.T) { }) } } + +func TestWaveReorderingOfPruneTasksUsingBinaryTreeOrdering(t *testing.T) { + ns := testingutils.NewNamespace() + ns.SetName("ns") + pod1 := testingutils.NewPod() + pod1.SetName("pod-1") + pod2 := testingutils.NewPod() + pod2.SetName("pod-2") + pod3 := testingutils.NewPod() + pod3.SetName("pod-3") + pod4 := testingutils.NewPod() + pod4.SetName("pod-4") + pod5 := testingutils.NewPod() + pod5.SetName("pod-5") + pod6 := testingutils.NewPod() + pod6.SetName("pod-6") + pod7 := testingutils.NewPod() + pod7.SetName("pod-7") + + type Test struct { + name string + target []*unstructured.Unstructured + live []*unstructured.Unstructured + expectedWaveOrder map[string]int + expectedWaveUseBinaryTreeOrdering map[string]string + pruneLast bool + } + runTest := func(test Test) { + t.Run(test.name, func(t *testing.T) { + fmt.Println(test.name) + syncCtx := newTestSyncCtx(nil) + syncCtx.pruneLast = test.pruneLast + syncCtx.resources = groupResources(ReconciliationResult{ + Live: test.live, + Target: test.target, + }) + tasks, successful := syncCtx.getSyncTasks() + + assert.True(t, successful) + assert.Len(t, tasks, len(test.target)) + + for i, task := range tasks { + assert.Equal(t, test.expectedWaveOrder[task.name()], task.wave()) + assert.Equal(t, test.expectedWaveUseBinaryTreeOrdering[task.name()], task.waveUseBinaryTreeOrdering()) + fmt.Println(i) + fmt.Println(test.expectedWaveUseBinaryTreeOrdering[task.name()]) + fmt.Println(task.waveUseBinaryTreeOrdering()) + } + }) + } + + // same wave + sameWaveTests := []Test{ + { + name: "sameWave_noPruneTasks", + live: []*unstructured.Unstructured{nil, nil, nil, nil, nil}, + target: []*unstructured.Unstructured{ns, pod1, pod2, pod3, pod4}, + // no change in wave order + expectedWaveOrder: map[string]int{ns.GetName(): 0, pod1.GetName(): 0, pod2.GetName(): 0, pod3.GetName(): 0, pod4.GetName(): 0}, + expectedWaveUseBinaryTreeOrdering: map[string]string{ns.GetName(): "false", pod1.GetName(): "false", pod2.GetName(): "false", pod3.GetName(): "false", pod4.GetName(): "false"}, + }, + { + name: "sameWave_allPruneTasks", + live: []*unstructured.Unstructured{ns, pod1, pod2, pod3, pod4}, + target: []*unstructured.Unstructured{nil, nil, nil, nil, nil}, + // no change in wave order + expectedWaveOrder: map[string]int{ns.GetName(): 0, pod1.GetName(): 0, pod2.GetName(): 0, pod3.GetName(): 0, pod4.GetName(): 0}, + expectedWaveUseBinaryTreeOrdering: map[string]string{ns.GetName(): "false", pod1.GetName(): "false", pod2.GetName(): "false", pod3.GetName(): "false", pod4.GetName(): "false"}, + }, + { + name: "sameWave_mixedTasks", + live: []*unstructured.Unstructured{ns, pod1, nil, pod3, pod4}, + target: []*unstructured.Unstructured{ns, nil, pod2, nil, nil}, + // no change in wave order + expectedWaveOrder: map[string]int{ns.GetName(): 0, pod1.GetName(): 0, pod2.GetName(): 0, pod3.GetName(): 0, pod4.GetName(): 0}, + expectedWaveUseBinaryTreeOrdering: map[string]string{ns.GetName(): "false", pod1.GetName(): "false", pod2.GetName(): "false", pod3.GetName(): "false", pod4.GetName(): "false"}, + }, + } + + for _, test := range sameWaveTests { + runTest(test) + } + + // different wave + differentWaveTests := []Test{ + { + name: "differentWave_noPruneTasks", + target: []*unstructured.Unstructured{ns, pod1, pod2, pod3, pod4}, + live: []*unstructured.Unstructured{nil, nil, nil, nil, nil}, + // no change in wave order + expectedWaveOrder: map[string]int{ + // new wave // original wave + ns.GetName(): 0, // 0 + pod1.GetName(): 1, // 1 + pod2.GetName(): 2, // 2 + pod3.GetName(): 4, // 4 + pod4.GetName(): 8, // 8 + }, + expectedWaveUseBinaryTreeOrdering: map[string]string{ + ns.GetName(): "true", + pod1.GetName(): "true", + pod2.GetName(): "true", + pod3.GetName(): "true", + pod4.GetName(): "true", + }, + }, + { + name: "differentWave_allPruneTasks", + target: []*unstructured.Unstructured{nil, nil, nil, nil, nil}, + live: []*unstructured.Unstructured{ns, pod1, pod2, pod3, pod4}, + // change in prune wave order + expectedWaveOrder: map[string]int{ + // new wave // original wave + ns.GetName(): 16, // 0 + pod1.GetName(): 8, // 1 + pod2.GetName(): 4, // 2 + pod3.GetName(): 2, // 4 + pod4.GetName(): 1, // 8 + }, + expectedWaveUseBinaryTreeOrdering: map[string]string{ + ns.GetName(): "true", + pod1.GetName(): "true", + pod2.GetName(): "true", + pod3.GetName(): "true", + pod4.GetName(): "true", + }, + }, + { + name: "differentWave_mixedTasks", + target: []*unstructured.Unstructured{ns, nil, pod2, nil, nil}, + live: []*unstructured.Unstructured{ns, pod1, nil, pod3, pod4}, + // change in prune wave order + expectedWaveOrder: map[string]int{ + // new wave // original wave + pod1.GetName(): 4, // 1 + pod3.GetName(): 2, // 4 + pod4.GetName(): 1, // 8 + + // no change since non prune tasks + ns.GetName(): 0, // 0 + pod2.GetName(): 2, // 2 + }, + expectedWaveUseBinaryTreeOrdering: map[string]string{ + ns.GetName(): "true", + pod1.GetName(): "true", + pod2.GetName(): "true", + pod3.GetName(): "true", + pod4.GetName(): "true", + }, + }, + } + + for _, test := range differentWaveTests { + ns.SetAnnotations(map[string]string{synccommon.AnnotationSyncWave: "0", synccommon.AnnotationUseBinaryTreeWaveOrdering: "true"}) + pod1.SetAnnotations(map[string]string{synccommon.AnnotationSyncWave: "1", synccommon.AnnotationUseBinaryTreeWaveOrdering: "true"}) + pod2.SetAnnotations(map[string]string{synccommon.AnnotationSyncWave: "2", synccommon.AnnotationUseBinaryTreeWaveOrdering: "true"}) + pod3.SetAnnotations(map[string]string{synccommon.AnnotationSyncWave: "4", synccommon.AnnotationUseBinaryTreeWaveOrdering: "true"}) + pod4.SetAnnotations(map[string]string{synccommon.AnnotationSyncWave: "8", synccommon.AnnotationUseBinaryTreeWaveOrdering: "true"}) + + runTest(test) + } + + // different wave + differentWaveAndDifferentWaveOrderingTests := []Test{ + { + name: "differentWaveAndDifferentWaveOrdering_AllPruneTasks", + target: []*unstructured.Unstructured{nil, nil, nil, nil, nil}, + live: []*unstructured.Unstructured{ns, pod1, pod2, pod3, pod4}, + // no change in wave order + expectedWaveOrder: map[string]int{ + // new wave // original wave + ns.GetName(): 8, // 0 + pod1.GetName(): 4, // 1 + pod2.GetName(): 2, // 2 + pod3.GetName(): 1, // 4 + pod4.GetName(): 1, // 5 + }, + expectedWaveUseBinaryTreeOrdering: map[string]string{ + ns.GetName(): "true", + pod1.GetName(): "true", + pod2.GetName(): "true", + pod3.GetName(): "true", + pod4.GetName(): "true", + }, + }, + } + + for _, test := range differentWaveAndDifferentWaveOrderingTests { + ns.SetAnnotations(map[string]string{synccommon.AnnotationSyncWave: "0"}) + pod1.SetAnnotations(map[string]string{synccommon.AnnotationSyncWave: "1"}) + pod2.SetAnnotations(map[string]string{synccommon.AnnotationSyncWave: "2", synccommon.AnnotationUseBinaryTreeWaveOrdering: "true"}) + pod3.SetAnnotations(map[string]string{synccommon.AnnotationSyncWave: "4", synccommon.AnnotationUseBinaryTreeWaveOrdering: "true"}) + pod4.SetAnnotations(map[string]string{synccommon.AnnotationSyncWave: "5", synccommon.AnnotationUseBinaryTreeWaveOrdering: "true"}) + + runTest(test) + } + + // prune last + pruneLastTests := []Test{ + { + name: "pruneLast", + pruneLast: true, + live: []*unstructured.Unstructured{ns, pod1, pod2, pod3, pod4}, + target: []*unstructured.Unstructured{ns, nil, nil, nil, nil}, + // change in prune wave order + expectedWaveOrder: map[string]int{ + // new wave // original wave + pod1.GetName(): 16, // 1 + pod2.GetName(): 16, // 2 + pod3.GetName(): 16, // 4 + pod4.GetName(): 16, // 8 + + // no change since non prune tasks + ns.GetName(): 0, // 0 + }, + expectedWaveUseBinaryTreeOrdering: map[string]string{ + ns.GetName(): "true", + pod1.GetName(): "true", + pod2.GetName(): "true", + pod3.GetName(): "true", + pod4.GetName(): "true", + }, + }, + { + name: "pruneLastIndividualResources", + pruneLast: false, + live: []*unstructured.Unstructured{ns, pod1, pod2, pod3, pod4}, + target: []*unstructured.Unstructured{ns, nil, nil, nil, nil}, + // change in wave order + expectedWaveOrder: map[string]int{ + // new wave // original wave + pod1.GetName(): 4, // 1 + pod2.GetName(): 16, // 2 + pod3.GetName(): 2, // 3 + pod4.GetName(): 1, // 4 + + // no change since non prune tasks + ns.GetName(): 0, // 0 + }, + expectedWaveUseBinaryTreeOrdering: map[string]string{ + ns.GetName(): "true", + pod1.GetName(): "true", + pod2.GetName(): "true", + pod3.GetName(): "true", + pod4.GetName(): "true", + }, + }, + } + + for _, test := range pruneLastTests { + ns.SetAnnotations(map[string]string{synccommon.AnnotationSyncWave: "0", synccommon.AnnotationUseBinaryTreeWaveOrdering: "true"}) + pod1.SetAnnotations(map[string]string{synccommon.AnnotationSyncWave: "1", synccommon.AnnotationUseBinaryTreeWaveOrdering: "true"}) + pod2.SetAnnotations(map[string]string{synccommon.AnnotationSyncWave: "2", synccommon.AnnotationSyncOptions: synccommon.SyncOptionPruneLast}) + pod3.SetAnnotations(map[string]string{synccommon.AnnotationSyncWave: "4", synccommon.AnnotationUseBinaryTreeWaveOrdering: "true"}) + pod4.SetAnnotations(map[string]string{synccommon.AnnotationSyncWave: "8", synccommon.AnnotationUseBinaryTreeWaveOrdering: "true"}) + + runTest(test) + } + + // additional test + tests := []Test{ + { + name: "mixedTasks", + target: []*unstructured.Unstructured{ns, nil, pod2, nil, nil, nil, pod6, nil}, + live: []*unstructured.Unstructured{ns, pod1, nil, pod3, pod4, pod5, pod6, pod7}, + // change in prune wave order + expectedWaveOrder: map[string]int{ + // new wave // original wave + pod1.GetName(): 5, // 1 + pod3.GetName(): 4, // 3 + pod4.GetName(): 4, // 3 + pod5.GetName(): 3, // 4 + pod7.GetName(): 1, // 5 + + // no change since non prune tasks + ns.GetName(): -1, // -1 + pod2.GetName(): 3, // 3 + pod6.GetName(): 5, // 5 + }, + expectedWaveUseBinaryTreeOrdering: map[string]string{ + pod1.GetName(): "false", + pod3.GetName(): "false", + pod4.GetName(): "false", + pod5.GetName(): "false", + pod7.GetName(): "false", + ns.GetName(): "false", + pod2.GetName(): "false", + pod6.GetName(): "false", + }, + }, + } + for _, test := range tests { + ns.SetAnnotations(map[string]string{synccommon.AnnotationSyncWave: "-1"}) + pod1.SetAnnotations(map[string]string{synccommon.AnnotationSyncWave: "1"}) + pod2.SetAnnotations(map[string]string{synccommon.AnnotationSyncWave: "3"}) + pod3.SetAnnotations(map[string]string{synccommon.AnnotationSyncWave: "3"}) + pod4.SetAnnotations(map[string]string{synccommon.AnnotationSyncWave: "3"}) + pod5.SetAnnotations(map[string]string{synccommon.AnnotationSyncWave: "4"}) + pod6.SetAnnotations(map[string]string{synccommon.AnnotationSyncWave: "5"}) + pod7.SetAnnotations(map[string]string{synccommon.AnnotationSyncWave: "5"}) + + runTest(test) + } +} diff --git a/pkg/sync/sync_task.go b/pkg/sync/sync_task.go index 81803c309..c57511efb 100644 --- a/pkg/sync/sync_task.go +++ b/pkg/sync/sync_task.go @@ -16,14 +16,15 @@ import ( // indicates the live object needs to be pruned. A liveObj of nil indicates the object has yet to // be deployed type syncTask struct { - phase common.SyncPhase - liveObj *unstructured.Unstructured - targetObj *unstructured.Unstructured - skipDryRun bool - syncStatus common.ResultCode - operationState common.OperationPhase - message string - waveOverride *int + phase common.SyncPhase + liveObj *unstructured.Unstructured + targetObj *unstructured.Unstructured + skipDryRun bool + syncStatus common.ResultCode + operationState common.OperationPhase + message string + waveOverride *int + waveUseBinaryTreeOrderingOverride *string } func ternary(val bool, a, b string) string { @@ -63,6 +64,13 @@ func (t *syncTask) wave() int { return syncwaves.Wave(t.obj()) } +func (t *syncTask) waveUseBinaryTreeOrdering() string { + if t.waveUseBinaryTreeOrderingOverride != nil { + return *t.waveUseBinaryTreeOrderingOverride + } + return syncwaves.UseBinaryTreeWaveOrdering(t.obj()) +} + func (t *syncTask) isHook() bool { return hook.IsHook(t.obj()) } diff --git a/pkg/sync/sync_task_test.go b/pkg/sync/sync_task_test.go index 16825d0b4..1a04c8205 100644 --- a/pkg/sync/sync_task_test.go +++ b/pkg/sync/sync_task_test.go @@ -69,7 +69,11 @@ func Test_syncTask_deleteBeforeCreation(t *testing.T) { assert.True(t, (&syncTask{liveObj: testingutils.Annotate(testingutils.Annotate(testingutils.NewPod(), "argocd.argoproj.io/hook", "Sync"), "argocd.argoproj.io/hook-delete-policy", "BeforeHookCreation")}).deleteBeforeCreation()) } -func Test_syncTask_wave(t *testing.T) { +func TestSyncTaskWave(t *testing.T) { assert.Equal(t, 0, (&syncTask{targetObj: testingutils.NewPod()}).wave()) - assert.Equal(t, 1, (&syncTask{targetObj: testingutils.Annotate(testingutils.NewPod(), "argocd.argoproj.io/sync-wave", "1")}).wave()) + assert.Equal(t, "false", (&syncTask{targetObj: testingutils.NewPod()}).waveUseBinaryTreeOrdering()) + assert.Equal(t, 1, (&syncTask{targetObj: testingutils.Annotate(testingutils.NewPod(), common.AnnotationSyncWave, "1")}).wave()) + assert.Equal(t, "false", (&syncTask{targetObj: testingutils.Annotate(testingutils.NewPod(), common.AnnotationSyncWave, "1")}).waveUseBinaryTreeOrdering()) + assert.Equal(t, 1, (&syncTask{targetObj: testingutils.Annotate(testingutils.Annotate(testingutils.NewPod(), common.AnnotationSyncWave, "1"), common.AnnotationUseBinaryTreeWaveOrdering, "true")}).wave()) + assert.Equal(t, "true", (&syncTask{targetObj: testingutils.Annotate(testingutils.Annotate(testingutils.NewPod(), common.AnnotationSyncWave, "1"), common.AnnotationUseBinaryTreeWaveOrdering, "true")}).waveUseBinaryTreeOrdering()) } diff --git a/pkg/sync/sync_tasks.go b/pkg/sync/sync_tasks.go index 813533a23..482b34ad9 100644 --- a/pkg/sync/sync_tasks.go +++ b/pkg/sync/sync_tasks.go @@ -2,12 +2,14 @@ package sync import ( "fmt" + "reflect" "sort" "strings" "k8s.io/apimachinery/pkg/apis/meta/v1/unstructured" "github.com/argoproj/gitops-engine/pkg/sync/common" + "github.com/argoproj/gitops-engine/pkg/sync/syncwaves" "github.com/argoproj/gitops-engine/pkg/utils/kube" ) @@ -137,6 +139,46 @@ func (s syncTasks) Sort() { }) } +func LessUsingBinaryTreeOrdering(i int, j int) bool { + if (i <= 1) && (j <= 1) { + return i < j + } + if i <= 1 { + return true + } + if j <= 1 { + return false + } + + N := 2 + for j-int(N)*i >= 0 { + if j-N*i < N { + return true + } + N *= 2 + } + return false +} + +func biggestPowerOf2InferiorThan(n int) int { + if n < 1 { + return 0 + } + i := 1 + for i <= n { + i = i * 2 + } + return i / 2 +} + +func PowInt(n int, m int) int { + result := 1 + for i := 1; i <= m; i++ { + result = result * n + } + return result +} + // adjust order of tasks and bubble up tasks which are dependencies of other tasks // (e.g. namespace sync should happen before resources that resides in that namespace) func (s syncTasks) adjustDeps(isDep func(obj *unstructured.Unstructured) (string, bool), doesRefDep func(obj *unstructured.Unstructured) (string, bool)) { @@ -250,11 +292,33 @@ func (s syncTasks) phase() common.SyncPhase { return "" } -func (s syncTasks) wave() int { - if len(s) > 0 { - return s[0].wave() +func (s syncTasks) waves() ([]int, string) { + var wavesValues []int + tasksUsingNormalWaveOrdering := s.Filter(func(t *syncTask) bool { return syncwaves.UseBinaryTreeWaveOrdering(t.obj()) == "false" }) + tasksUsingBinaryTreeWaveOrdering := s.Filter(func(t *syncTask) bool { return syncwaves.UseBinaryTreeWaveOrdering(t.obj()) == "true" }) + if len(tasksUsingNormalWaveOrdering) > 0 { + wavesValues = append(wavesValues, tasksUsingNormalWaveOrdering[0].wave()) + return wavesValues, "false" + } + if len(tasksUsingBinaryTreeWaveOrdering) > 0 { + for iSyncTask := range tasksUsingBinaryTreeWaveOrdering { + candidateTask := tasksUsingBinaryTreeWaveOrdering[iSyncTask] + candidateTaskHasNoAntecedent := true + for jSyncTask := range tasksUsingBinaryTreeWaveOrdering { + task := tasksUsingBinaryTreeWaveOrdering[jSyncTask] + if LessUsingBinaryTreeOrdering(syncwaves.Wave(task.obj()), syncwaves.Wave(candidateTask.obj())) { + candidateTaskHasNoAntecedent = false + break + } + } + if candidateTaskHasNoAntecedent { + wavesValues = append(wavesValues, syncwaves.Wave(candidateTask.obj())) + } + } + return wavesValues, "true" } - return 0 + wavesValues = append(wavesValues, 0) + return wavesValues, "false" } func (s syncTasks) lastPhase() common.SyncPhase { @@ -264,13 +328,41 @@ func (s syncTasks) lastPhase() common.SyncPhase { return "" } -func (s syncTasks) lastWave() int { - if len(s) > 0 { - return s[len(s)-1].wave() +func (s syncTasks) lastWaves() ([]int, string) { + tasksUsingNormalWaveOrdering := s.Filter(func(t *syncTask) bool { return syncwaves.UseBinaryTreeWaveOrdering(t.obj()) == "false" }) + tasksUsingBinaryTreeWaveOrdering := s.Filter(func(t *syncTask) bool { return syncwaves.UseBinaryTreeWaveOrdering(t.obj()) == "true" }) + + var lastWavesValues []int + + if len(tasksUsingBinaryTreeWaveOrdering) > 0 { + for iSyncTask := range tasksUsingBinaryTreeWaveOrdering { + candidateTask := s[iSyncTask] + candidateTaskHasNoSuccessor := true + for jSyncTask := range s { + task := s[jSyncTask] + if LessUsingBinaryTreeOrdering(syncwaves.Wave(candidateTask.obj()), syncwaves.Wave(task.obj())) { + candidateTaskHasNoSuccessor = false + break + } + } + if candidateTaskHasNoSuccessor { + lastWavesValues = append(lastWavesValues, syncwaves.Wave(candidateTask.obj())) + } + } + return lastWavesValues, "true" } - return 0 + + if len(tasksUsingNormalWaveOrdering) > 0 { + lastWavesValues = append(lastWavesValues, tasksUsingNormalWaveOrdering[len(tasksUsingNormalWaveOrdering)-1].wave()) + return lastWavesValues, "false" + } + + lastWavesValues = append(lastWavesValues, 0) + return lastWavesValues, "false" } func (s syncTasks) multiStep() bool { - return s.wave() != s.lastWave() || s.phase() != s.lastPhase() + wavesValues, wavesUseBinaryTreeOrdering := s.waves() + lastWavesValues, lastWavesUseBinaryTreeOrdering := s.lastWaves() + return !reflect.DeepEqual(wavesValues, lastWavesValues) || wavesUseBinaryTreeOrdering != lastWavesUseBinaryTreeOrdering || s.phase() != s.lastPhase() } diff --git a/pkg/sync/sync_tasks_test.go b/pkg/sync/sync_tasks_test.go index 18135beab..1d2a9ee45 100644 --- a/pkg/sync/sync_tasks_test.go +++ b/pkg/sync/sync_tasks_test.go @@ -1,6 +1,7 @@ package sync import ( + "reflect" "sort" "testing" @@ -465,9 +466,11 @@ func Test_syncTasks_multiStep(t *testing.T) { t.Run("Single", func(t *testing.T) { tasks := syncTasks{{liveObj: testingutils.Annotate(testingutils.NewPod(), common.AnnotationSyncWave, "-1"), phase: common.SyncPhaseSync}} assert.Equal(t, common.SyncPhaseSync, string(tasks.phase())) - assert.Equal(t, -1, tasks.wave()) + tasksWaves, _ := tasks.waves() + assert.True(t, reflect.DeepEqual([]int{-1}, tasksWaves)) assert.Equal(t, common.SyncPhaseSync, string(tasks.lastPhase())) - assert.Equal(t, -1, tasks.lastWave()) + tasksLastWaves, _ := tasks.lastWaves() + assert.True(t, reflect.DeepEqual([]int{-1}, tasksLastWaves)) assert.False(t, tasks.multiStep()) }) t.Run("Double", func(t *testing.T) { @@ -476,9 +479,147 @@ func Test_syncTasks_multiStep(t *testing.T) { {liveObj: testingutils.Annotate(testingutils.NewPod(), common.AnnotationSyncWave, "1"), phase: common.SyncPhasePostSync}, } assert.Equal(t, common.SyncPhasePreSync, string(tasks.phase())) - assert.Equal(t, -1, tasks.wave()) + tasksWaves, _ := tasks.waves() + assert.True(t, reflect.DeepEqual([]int{-1}, tasksWaves)) assert.Equal(t, common.SyncPhasePostSync, string(tasks.lastPhase())) - assert.Equal(t, 1, tasks.lastWave()) + tasksLastWaves, _ := tasks.lastWaves() + assert.True(t, reflect.DeepEqual([]int{1}, tasksLastWaves)) assert.True(t, tasks.multiStep()) }) } + +var syncTaskUsingNormalWaveOrdering = syncTasks{ + { + targetObj: &unstructured.Unstructured{ + Object: map[string]any{ + "metadata": map[string]any{ + "annotations": map[string]any{ + common.AnnotationSyncWave: "-1", + }, + }, + }, + }, + }, +} + +func TestSyncTaskUsingNormalWaveOrdering(t *testing.T) { + tasks := syncTaskUsingNormalWaveOrdering + tasksWaves, _ := tasks.waves() + tasksLastWaves, _ := tasks.lastWaves() + assert.True(t, reflect.DeepEqual(tasksWaves, []int{-1})) + assert.True(t, reflect.DeepEqual(tasksLastWaves, []int{-1})) +} + +var syncTasksUsingNormalWaveOrdering = syncTasks{ + { + targetObj: &unstructured.Unstructured{ + Object: map[string]any{ + "metadata": map[string]any{ + "annotations": map[string]any{ + common.AnnotationSyncWave: "-1", + }, + }, + }, + }, + }, + { + targetObj: &unstructured.Unstructured{ + Object: map[string]any{ + "metadata": map[string]any{ + "annotations": map[string]any{ + common.AnnotationSyncWave: "0", + }, + }, + }, + }, + }, +} + +func TestSyncTasksUsingNormalWaveOrdering(t *testing.T) { + tasks := syncTasksUsingNormalWaveOrdering + tasksWaves, _ := tasks.waves() + tasksLastWaves, _ := tasks.lastWaves() + assert.True(t, reflect.DeepEqual(tasksWaves, []int{-1})) + assert.True(t, reflect.DeepEqual(tasksLastWaves, []int{0})) +} + +var syncTasksUsingBinaryTreeWaveOrdering_BothParentTasks = syncTasks{ + { + targetObj: &unstructured.Unstructured{ + Object: map[string]any{ + "metadata": map[string]any{ + "annotations": map[string]any{ + common.AnnotationSyncWave: "2", + common.AnnotationUseBinaryTreeWaveOrdering: "true", + }, + }, + }, + }, + }, + { + targetObj: &unstructured.Unstructured{ + Object: map[string]any{ + "metadata": map[string]any{ + "annotations": map[string]any{ + common.AnnotationSyncWave: "3", + common.AnnotationUseBinaryTreeWaveOrdering: "true", + }, + }, + }, + }, + }, +} + +func TestSyncTasksUsingBinaryTreeWaveOrderingBothParentTasks(t *testing.T) { + tasks := syncTasksUsingBinaryTreeWaveOrdering_BothParentTasks + tasksWaves, _ := tasks.waves() + tasksLastWaves, _ := tasks.lastWaves() + assert.True(t, reflect.DeepEqual(tasksWaves, []int{2, 3})) + assert.True(t, reflect.DeepEqual(tasksLastWaves, []int{2, 3})) +} + +var syncTasksUsingBinaryTreeWaveOrdering_OneParentTasks = syncTasks{ + { + targetObj: &unstructured.Unstructured{ + Object: map[string]any{ + "metadata": map[string]any{ + "annotations": map[string]any{ + common.AnnotationSyncWave: "2", + common.AnnotationUseBinaryTreeWaveOrdering: "true", + }, + }, + }, + }, + }, + { + targetObj: &unstructured.Unstructured{ + Object: map[string]any{ + "metadata": map[string]any{ + "annotations": map[string]any{ + common.AnnotationSyncWave: "4", + common.AnnotationUseBinaryTreeWaveOrdering: "true", + }, + }, + }, + }, + }, +} + +func TestSyncTasksusingBinaryTreeWaveOrderingOneParentTasks(t *testing.T) { + tasks := syncTasksUsingBinaryTreeWaveOrdering_OneParentTasks + tasksWaves, _ := tasks.waves() + tasksLastWaves, _ := tasks.lastWaves() + assert.True(t, reflect.DeepEqual(tasksWaves, []int{2})) + assert.True(t, reflect.DeepEqual(tasksLastWaves, []int{4})) +} + +func TestLessUsingBinaryTreeOrdering(t *testing.T) { + assert.True(t, LessUsingBinaryTreeOrdering(-4, -1)) + assert.True(t, LessUsingBinaryTreeOrdering(-4, 2)) + assert.True(t, LessUsingBinaryTreeOrdering(2, 4)) + assert.True(t, LessUsingBinaryTreeOrdering(2, 8)) + assert.False(t, LessUsingBinaryTreeOrdering(2, 3)) + assert.False(t, LessUsingBinaryTreeOrdering(4, 3)) + assert.False(t, LessUsingBinaryTreeOrdering(2, 6)) + assert.False(t, LessUsingBinaryTreeOrdering(2, -1)) +} diff --git a/pkg/sync/syncwaves/waves.go b/pkg/sync/syncwaves/waves.go index 200433571..9aa24a3f7 100644 --- a/pkg/sync/syncwaves/waves.go +++ b/pkg/sync/syncwaves/waves.go @@ -19,3 +19,14 @@ func Wave(obj *unstructured.Unstructured) int { } return helmhook.Weight(obj) } + +func UseBinaryTreeWaveOrdering(obj *unstructured.Unstructured) string { + text, ok := obj.GetAnnotations()[common.AnnotationUseBinaryTreeWaveOrdering] + if ok { + if text == "true" { + return text + } + return "false" + } + return "false" +} diff --git a/pkg/sync/syncwaves/waves_test.go b/pkg/sync/syncwaves/waves_test.go index 2fadf4fd7..d596d902d 100644 --- a/pkg/sync/syncwaves/waves_test.go +++ b/pkg/sync/syncwaves/waves_test.go @@ -13,3 +13,9 @@ func TestWave(t *testing.T) { assert.Equal(t, 1, Wave(testingutils.Annotate(testingutils.NewPod(), "argocd.argoproj.io/sync-wave", "1"))) assert.Equal(t, 1, Wave(testingutils.Annotate(testingutils.NewPod(), "helm.sh/hook-weight", "1"))) } + +func TestWaveUseBinaryTreeOrdering(t *testing.T) { + assert.Equal(t, "false", UseBinaryTreeWaveOrdering(testingutils.NewPod())) + assert.Equal(t, "false", UseBinaryTreeWaveOrdering(testingutils.Annotate(testingutils.NewPod(), "argocd.argoproj.io/use-binary-tree-wave-ordering", "false"))) + assert.Equal(t, "true", UseBinaryTreeWaveOrdering(testingutils.Annotate(testingutils.NewPod(), "argocd.argoproj.io/use-binary-tree-wave-ordering", "true"))) +}