
Commit

fix: use optimistic concurrency strategy when updating pod status (argoproj#12632)

Signed-off-by: Atsushi Sakai <[email protected]>
sakai-ast authored Feb 26, 2024
1 parent 7a23cb0 commit 8830456
Showing 2 changed files with 32 additions and 56 deletions.
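The fix replaces JSON-patch updates guarded by per-pod mutexes with the standard Kubernetes optimistic-concurrency loop: Get the latest object, mutate a DeepCopy, call Update, and let retry.RetryOnConflict re-run the closure whenever the apiserver answers with a 409 Conflict (stale resourceVersion). A minimal standalone sketch of that loop, assuming nothing from this repo; the helper name markPodDone and the example.com/done label are illustrative, not part of the commit:

package example

import (
	"context"

	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	typedv1 "k8s.io/client-go/kubernetes/typed/core/v1"
	"k8s.io/client-go/util/retry"
)

// markPodDone shows the Get -> DeepCopy -> mutate -> Update loop that
// retry.RetryOnConflict drives; on a Conflict error it runs the closure again.
func markPodDone(ctx context.Context, pods typedv1.PodInterface, podName string) error {
	return retry.RetryOnConflict(retry.DefaultRetry, func() error {
		// Re-read on every attempt so each try starts from the latest version.
		pod, err := pods.Get(ctx, podName, metav1.GetOptions{})
		if err != nil {
			return err
		}
		updated := pod.DeepCopy()
		if updated.Labels == nil {
			updated.Labels = map[string]string{}
		}
		updated.Labels["example.com/done"] = "true" // illustrative label

		// Update fails with a Conflict if another writer won the race,
		// which makes RetryOnConflict try again (up to retry.DefaultRetry).
		_, err = pods.Update(ctx, updated, metav1.UpdateOptions{})
		return err
	})
}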
82 changes: 30 additions & 52 deletions workflow/controller/controller.go
@@ -6,7 +6,6 @@ import (
 	"fmt"
 	"os"
 	"strconv"
-	"strings"
 	gosync "sync"
 	"syscall"
 	"time"
@@ -37,6 +36,7 @@ import (
 	"k8s.io/client-go/rest"
 	"k8s.io/client-go/tools/cache"
 	apiwatch "k8s.io/client-go/tools/watch"
+	"k8s.io/client-go/util/retry"
 	"k8s.io/client-go/util/workqueue"

 	"github.com/argoproj/argo-workflows/v3"
@@ -151,7 +151,6 @@ type WorkflowController struct {
 	executorPlugins map[string]map[string]*spec.Plugin // namespace -> name -> plugin

 	recentCompletions recentCompletions
-	podNameLocks      *gosync.Map
 }

 type PatchOperation struct {
@@ -211,7 +210,6 @@ func NewWorkflowController(ctx context.Context, restConfig *rest.Config, kubecli
 		eventRecorderManager:      events.NewEventRecorderManager(kubeclientset),
 		progressPatchTickDuration: env.LookupEnvDurationOr(common.EnvVarProgressPatchTickDuration, 1*time.Minute),
 		progressFileTickDuration:  env.LookupEnvDurationOr(common.EnvVarProgressFileTickDuration, 3*time.Second),
-		podNameLocks:              &gosync.Map{},
 	}

 	if executorPlugins {
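For context on the field being removed: podNameLocks kept one mutex per pod name in a sync.Map so that only one goroutine at a time could run the Get-then-Patch sequence in the old enablePodForDeletion (see the hunk further down). A standard-library sketch of that per-key locking pattern, reconstructed only from what the removed code shows:

package example

import "sync"

// keyLocks maps a key (e.g. a pod name) to its own *sync.Mutex, mirroring
// the removed podNameLocks field; LoadOrStore allocates it on first use.
var keyLocks sync.Map

// withKeyLock runs fn while holding the mutex for key, then drops the map
// entry so the map does not grow without bound.
func withKeyLock(key string, fn func()) {
	l, _ := keyLocks.LoadOrStore(key, &sync.Mutex{})
	mu := l.(*sync.Mutex)
	mu.Lock()
	defer func() {
		mu.Unlock()
		keyLocks.Delete(key)
	}()
	fn()
}

Note the subtlety: because the entry is deleted right after unlocking, a goroutine that already loaded the old mutex can end up holding a different lock than a later caller that allocated a fresh one, so exclusion per key is not airtight. That fragility is part of why optimistic concurrency is the sturdier fix.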
@@ -552,20 +550,13 @@ func (wfc *WorkflowController) processNextPodCleanupItem(ctx context.Context) bo
 			return err
 		}
 	case labelPodCompleted:
-		// Escape for JSON Pointer https://datatracker.ietf.org/doc/html/rfc6901#section-3
-		escaped := strings.ReplaceAll(common.LabelKeyCompleted, "/", "~1")
-		patch := PatchOperation{
-			Operation: "replace",
-			Path:      fmt.Sprintf("/metadata/labels/%s", escaped),
-			Value:     "true",
-		}
 		pods := wfc.kubeclientset.CoreV1().Pods(namespace)
-		if err := wfc.enablePodForDeletion(ctx, pods, namespace, podName, patch); err != nil {
+		if err := wfc.enablePodForDeletion(ctx, pods, namespace, podName, true); err != nil {
 			return err
 		}
 	case deletePod:
 		pods := wfc.kubeclientset.CoreV1().Pods(namespace)
-		if err := wfc.enablePodForDeletion(ctx, pods, namespace, podName); err != nil {
+		if err := wfc.enablePodForDeletion(ctx, pods, namespace, podName, false); err != nil {
 			return err
 		}
 		propagation := metav1.DeletePropagationBackground
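An aside on the deleted escaping logic: JSON Pointer (RFC 6901) reserves "/" as its path separator, so a label key containing a slash must have it escaped as "~1" before the key can appear in a JSON patch path. A small runnable illustration; the key value is assumed here to be what common.LabelKeyCompleted holds:

package main

import (
	"fmt"
	"strings"
)

func main() {
	// A raw "/metadata/labels/<key>" path would split the key into extra
	// path segments at the slash, so it must be escaped first.
	key := "workflows.argoproj.io/completed" // assumed value of common.LabelKeyCompleted
	escaped := strings.ReplaceAll(key, "/", "~1")
	fmt.Println("/metadata/labels/" + escaped)
	// Output: /metadata/labels/workflows.argoproj.io~1completed
}

Switching to a typed Update, which sends the whole object, makes this escaping unnecessary, which is why the helper disappears.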
@@ -611,55 +602,42 @@ func (wfc *WorkflowController) getPodFromCache(namespace string, podName string)
 	return pod, nil
 }

-func (wfc *WorkflowController) enablePodForDeletion(ctx context.Context, pods typedv1.PodInterface, namespace string, podName string, extraPatches ...PatchOperation) error {
-	podNameLock, _ := wfc.podNameLocks.LoadOrStore(podName, &gosync.Mutex{})
-	podNameMutex := podNameLock.(*gosync.Mutex)
-
-	podNameMutex.Lock()
-	defer func() {
-		podNameMutex.Unlock()
-		wfc.podNameLocks.Delete(podName)
-	}()
-
-	var patches []PatchOperation
-	pod, err := wfc.getPodFromAPI(ctx, namespace, podName)
-	if err != nil {
-		return err
-	}
-	patch := createFinalizerRemovalPatchIfExists(pod, common.FinalizerPodStatus)
-	if patch != nil {
-		patches = append(patches, *patch)
-	}
-	patches = append(patches, extraPatches...)
-	if err := applyPatches(ctx, pods, pod.Name, patches); err != nil {
+func (wfc *WorkflowController) enablePodForDeletion(ctx context.Context, pods typedv1.PodInterface, namespace string, podName string, isCompleted bool) error {
+	// Get current Pod from K8S and update it to remove finalizer, and if the Pod was completed, set the Label
+	// In the case that the Pod changed in between Get and Update, we'll get a conflict error and can try again
+	err := retry.RetryOnConflict(retry.DefaultRetry, func() error {
+		currentPod, err := wfc.getPodFromAPI(ctx, namespace, podName)
+		if err != nil {
+			return err
+		}
+		updatedPod := currentPod.DeepCopy()
+
+		if isCompleted {
+			if updatedPod.Labels == nil {
+				updatedPod.Labels = make(map[string]string)
+			}
+			updatedPod.Labels[common.LabelKeyCompleted] = "true"
+		}
+
+		updatedPod.Finalizers = removeFinalizer(updatedPod.Finalizers, common.FinalizerPodStatus)
+
+		_, err = pods.Update(ctx, updatedPod, metav1.UpdateOptions{})
+		return err
+	})
+	if err != nil {
 		return err
 	}
 	return nil
 }

-func createFinalizerRemovalPatchIfExists(pod *apiv1.Pod, targetFinalizer string) *PatchOperation {
-	i := slices.Index(pod.Finalizers, targetFinalizer)
-	if i >= 0 {
-		return &PatchOperation{
-			Operation: "remove",
-			Path:      fmt.Sprintf("/metadata/finalizers/%d", i),
+func removeFinalizer(finalizers []string, targetFinalizer string) []string {
+	var updatedFinalizers []string
+	for _, finalizer := range finalizers {
+		if finalizer != targetFinalizer {
+			updatedFinalizers = append(updatedFinalizers, finalizer)
 		}
 	}
-	return nil
-}
-
-func applyPatches(ctx context.Context, pods typedv1.PodInterface, podName string, patches []PatchOperation) error {
-	if len(patches) == 0 {
-		log.WithField("podName", podName).Debug("not patching pod")
-		return nil
-	}
-	data, err := json.Marshal(patches)
-	if err != nil {
-		return fmt.Errorf("failed to marshal patch: %w", err)
-	}
-	log.WithFields(log.Fields{"podName": podName, "data": string(data)}).Debug("patching pod")
-	_, err = pods.Patch(ctx, podName, types.JSONPatchType, data, metav1.PatchOptions{})
-	return err
+	return updatedFinalizers
 }

 func (wfc *WorkflowController) signalContainers(namespace string, podName string, sig syscall.Signal) (time.Duration, error) {
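The replacement removeFinalizer filters by value rather than patching by index. That matters because the old JSON patch removed /metadata/finalizers/<i>, and a positional remove deletes whatever sits at that index: if another writer reordered or shrank the slice between Get and Patch, the wrong finalizer could go. A quick standalone check of the value-based behavior, with the function body copied from the hunk above and illustrative finalizer strings:

package main

import "fmt"

// removeFinalizer returns finalizers with every occurrence of
// targetFinalizer filtered out, preserving the order of the rest.
func removeFinalizer(finalizers []string, targetFinalizer string) []string {
	var updatedFinalizers []string
	for _, finalizer := range finalizers {
		if finalizer != targetFinalizer {
			updatedFinalizers = append(updatedFinalizers, finalizer)
		}
	}
	return updatedFinalizers
}

func main() {
	fmt.Println(removeFinalizer(
		[]string{"example.com/other", "workflows.argoproj.io/status"},
		"workflows.argoproj.io/status",
	))
	// Output: [example.com/other]
}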
@@ -1073,7 +1051,7 @@ func (wfc *WorkflowController) addWorkflowInformerHandlers(ctx context.Context)
 				log.WithError(err).Error("Failed to list pods")
 			}
 			for _, p := range podList.Items {
-				if err := wfc.enablePodForDeletion(ctx, pods, p.Namespace, p.Name); err != nil {
+				if err := wfc.enablePodForDeletion(ctx, pods, p.Namespace, p.Name, false); err != nil {
 					log.WithError(err).Error("Failed to enable pod for deletion")
 				}
 			}
6 changes: 2 additions & 4 deletions workflow/controller/controller_test.go
@@ -2,13 +2,12 @@ package controller

 import (
 	"context"
-	gosync "sync"
 	"testing"
 	"time"

 	"k8s.io/apimachinery/pkg/api/resource"

-	syncpkg "github.com/argoproj/pkg/sync"
+	"github.com/argoproj/pkg/sync"
 	"github.com/stretchr/testify/assert"
 	authorizationv1 "k8s.io/api/authorization/v1"
 	apiv1 "k8s.io/api/core/v1"
@@ -284,7 +283,7 @@ func newController(options ...interface{}) (context.CancelFunc, *WorkflowControl
 		kubeclientset:    kube,
 		dynamicInterface: dynamicClient,
 		wfclientset:      wfclientset,
-		workflowKeyLock:  syncpkg.NewKeyLock(),
+		workflowKeyLock:  sync.NewKeyLock(),
 		wfArchive:        sqldb.NullWorkflowArchive,
 		hydrator:         hydratorfake.Noop,
 		estimatorFactory: estimation.DummyEstimatorFactory,
@@ -294,7 +293,6 @@ func newController(options ...interface{}) (context.CancelFunc, *WorkflowControl
 		progressPatchTickDuration: envutil.LookupEnvDurationOr(common.EnvVarProgressPatchTickDuration, 1*time.Minute),
 		progressFileTickDuration:  envutil.LookupEnvDurationOr(common.EnvVarProgressFileTickDuration, 3*time.Second),
 		maxStackDepth:             maxAllowedStackDepth,
-		podNameLocks:              &gosync.Map{},
 	}

 	for _, opt := range options {
