
Commit

fix: pipeline pausing race conditions of draining and terminating source (#2131)

Signed-off-by: Sidhant Kohli <[email protected]>
kohlisid authored Oct 9, 2024
1 parent d340a4e commit 206ff7f
Showing 2 changed files with 53 additions and 28 deletions.
go.mod (2 changes: 1 addition & 1 deletion)
@@ -49,6 +49,7 @@ require (
go.uber.org/multierr v1.11.0
go.uber.org/zap v1.26.0
golang.org/x/crypto v0.27.0
golang.org/x/exp v0.0.0-20240531132922-fd00a4e0eefc
golang.org/x/net v0.29.0
golang.org/x/oauth2 v0.21.0
golang.org/x/sync v0.8.0
@@ -197,7 +198,6 @@ require (
github.com/yudai/golcs v0.0.0-20170316035057-ecda9a501e82 // indirect
go.mongodb.org/mongo-driver v1.15.0 // indirect
golang.org/x/arch v0.7.0 // indirect
golang.org/x/exp v0.0.0-20240531132922-fd00a4e0eefc // indirect
golang.org/x/mod v0.20.0 // indirect
golang.org/x/sys v0.25.0 // indirect
golang.org/x/term v0.24.0 // indirect
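The go.mod change promotes golang.org/x/exp from an indirect to a direct dependency because the controller now imports golang.org/x/exp/maps (see the import hunk below). A minimal sketch of the one call the new helper relies on, maps.Keys, which collects a map's keys into a slice in unspecified order (the vertex names here are made up):

    package main

    import (
        "fmt"
        "strings"

        "golang.org/x/exp/maps"
    )

    func main() {
        // Join the keys of a map of source vertices, the same way the new
        // noSourceVertexPodsRunning helper assembles its label selector.
        sources := map[string]struct{}{"in": {}, "http-source": {}}
        fmt.Println(strings.Join(maps.Keys(sources), ",")) // e.g. "in,http-source" (order not guaranteed)
    }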
pkg/reconciler/pipeline/controller.go (79 changes: 52 additions & 27 deletions)
@@ -24,6 +24,7 @@ import (

"github.com/imdario/mergo"
"go.uber.org/zap"
"golang.org/x/exp/maps"
appv1 "k8s.io/api/apps/v1"
batchv1 "k8s.io/api/batch/v1"
corev1 "k8s.io/api/core/v1"
@@ -181,8 +182,8 @@ func (r *pipelineReconciler) reconcile(ctx context.Context, pl *dfv1.Pipeline) (
}

// check if any changes related to pause/resume lifecycle for the pipeline
if isLifecycleChange(pl) {
oldPhase := pl.Status.Phase
oldPhase := pl.Status.Phase
if isLifecycleChange(pl) && oldPhase != pl.Spec.Lifecycle.GetDesiredPhase() {
requeue, err := r.updateDesiredState(ctx, pl)
if err != nil {
logMsg := fmt.Sprintf("Updated desired pipeline phase failed: %v", zap.Error(err))
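The reordered check above caches the phase before testing for a lifecycle change, and only calls updateDesiredState when the desired phase actually differs from the current one, so a pipeline that has already reached Paused (or Running) is not pushed through the pause/resume path on every reconcile. A minimal sketch of that guard with simplified, hypothetical types (the real check additionally goes through isLifecycleChange):

    package main

    import "fmt"

    type Phase string

    const (
        PhaseRunning Phase = "Running"
        PhasePaused  Phase = "Paused"
    )

    // reconcileLifecycle only triggers a desired-state update when the observed
    // phase and the desired phase differ, avoiding redundant pause/resume work.
    func reconcileLifecycle(observed, desired Phase, update func() error) error {
        if observed == desired {
            return nil // already in the desired phase; nothing to do
        }
        return update()
    }

    func main() {
        err := reconcileLifecycle(PhasePaused, PhasePaused, func() error {
            fmt.Println("updating desired state")
            return nil
        })
        fmt.Println(err) // <nil>, and no update was triggered
    }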
@@ -611,7 +612,7 @@ func buildVertices(pl *dfv1.Pipeline) map[string]dfv1.Vertex {
copyVertexTemplate(pl, vCopy)
copyVertexLimits(pl, vCopy)
replicas := int32(1)
// If the desired phase is paused or we are in the middle of pausing we should not start any vertex replicas
// If the desired phase is paused, or we are in the middle of pausing we should not start any vertex replicas
if isLifecycleChange(pl) {
replicas = int32(0)
} else if v.IsReduceUDF() {
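While a pause or resume is in flight, buildVertices pins every vertex to zero replicas so no pods are started mid-transition. A rough sketch of the replica decision with a hypothetical signature (the assumption that reduce vertices get one replica per partition is mine; the real logic reads the Pipeline and Vertex specs):

    package sketch

    // desiredReplicas mirrors the branch structure shown in the hunk above.
    func desiredReplicas(lifecycleChange, isReduceUDF bool, partitions, specReplicas int32) int32 {
        if lifecycleChange {
            return 0 // pausing or resuming: do not start any vertex pods
        }
        if isReduceUDF {
            return partitions // assumption: one replica per partition for reduce vertices
        }
        return specReplicas
    }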
@@ -830,39 +831,48 @@ func (r *pipelineReconciler) resumePipeline(ctx context.Context, pl *dfv1.Pipeli
}

func (r *pipelineReconciler) pausePipeline(ctx context.Context, pl *dfv1.Pipeline) (bool, error) {
// check that annotations / pause timestamp annotation exist
var (
drainCompleted = false
daemonClient daemonclient.DaemonClient
errWhileDrain error
)
pl.Status.MarkPhasePausing()

if pl.GetAnnotations() == nil || pl.GetAnnotations()[dfv1.KeyPauseTimestamp] == "" {
_, err := r.scaleDownSourceVertices(ctx, pl)
if err != nil {
// If there's an error requeue the request
return true, err
}
patchJson := `{"metadata":{"annotations":{"` + dfv1.KeyPauseTimestamp + `":"` + time.Now().Format(time.RFC3339) + `"}}}`
if err := r.client.Patch(ctx, pl, client.RawPatch(types.MergePatchType, []byte(patchJson))); err != nil && !apierrors.IsNotFound(err) {
if err = r.client.Patch(ctx, pl, client.RawPatch(types.MergePatchType, []byte(patchJson))); err != nil && !apierrors.IsNotFound(err) {
return true, err
}
// This is to give some time to process the new messages,
// otherwise check IsDrained directly may get incorrect information
return true, nil
}

pl.Status.MarkPhasePausing()
updated, err := r.scaleDownSourceVertices(ctx, pl)
if err != nil || updated {
// If there's an error, or scaling down happens, requeue the request
// This is to give some time to process the new messages, otherwise check IsDrained directly may get incorrect information
return updated, err
}

var daemonError error
var drainCompleted = false

// Check if all the source vertex pods have scaled down to zero
sourcePodsTerminated, err := r.noSourceVertexPodsRunning(ctx, pl)
// If the sources have scaled down successfully then check for the buffer information.
// Check for the daemon to obtain the buffer draining information, in case we see an error trying to
// retrieve this we do not exit prematurely to allow honoring the pause timeout for a consistent error
// - In case the timeout has not occurred we would trigger a requeue
// - If the timeout has occurred even after getting the drained error, we will try to pause the pipeline
daemonClient, daemonError := daemonclient.NewGRPCDaemonServiceClient(pl.GetDaemonServiceURL())
if daemonClient != nil {
defer func() {
_ = daemonClient.Close()
}()
drainCompleted, err = daemonClient.IsDrained(ctx, pl.Name)
if err != nil {
daemonError = err
if sourcePodsTerminated {
daemonClient, err = daemonclient.NewGRPCDaemonServiceClient(pl.GetDaemonServiceURL())
if daemonClient != nil {
defer func() {
_ = daemonClient.Close()
}()
drainCompleted, err = daemonClient.IsDrained(ctx, pl.Name)
}
}
if err != nil {
errWhileDrain = err
}

pauseTimestamp, err := time.Parse(time.RFC3339, pl.GetAnnotations()[dfv1.KeyPauseTimestamp])
if err != nil {
return false, err
@@ -874,8 +884,8 @@ func (r *pipelineReconciler) pausePipelin
if err != nil {
return true, err
}
if daemonError != nil {
r.logger.Errorw("Error in fetching Drained status, Pausing due to timeout", zap.Error(daemonError))
if errWhileDrain != nil {
r.logger.Errorw("Errors encountered while pausing, moving to paused after timeout", zap.Error(errWhileDrain))
}
// if the drain completed successfully, then set the DrainedOnPause field to true
if drainCompleted {
@@ -884,7 +894,20 @@ func (r *pipelineReconciler) pausePipelin
pl.Status.MarkPhasePaused()
return false, nil
}
return true, daemonError
return true, err
}
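With this rewrite, pausePipeline only asks the daemon whether the buffers are drained after every source pod is actually gone, and any error collected along the way (errWhileDrain) no longer aborts the flow: it is only logged once the pause grace period expires and the pipeline is force-paused anyway. A small sketch of that timeout decision, assuming a pause timestamp recorded in the annotation and a grace period in seconds (the parameter names are illustrative):

    package main

    import (
        "fmt"
        "time"
    )

    // shouldForcePause: before the grace period elapses we keep requeueing and
    // retrying the drain; after it we pause regardless of drain errors.
    func shouldForcePause(pausedAt time.Time, gracePeriodSeconds int64, now time.Time) bool {
        return now.After(pausedAt.Add(time.Duration(gracePeriodSeconds) * time.Second))
    }

    func main() {
        pausedAt := time.Now().Add(-45 * time.Second)
        fmt.Println(shouldForcePause(pausedAt, 30, time.Now())) // true: the 30s grace period has passed
    }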

// noSourceVertexPodsRunning checks whether any source vertex has running replicas
func (r *pipelineReconciler) noSourceVertexPodsRunning(ctx context.Context, pl *dfv1.Pipeline) (bool, error) {
sources := pl.Spec.GetSourcesByName()
pods := corev1.PodList{}
label := fmt.Sprintf("%s=%s, %s in (%s)", dfv1.KeyPipelineName, pl.Name,
dfv1.KeyVertexName, strings.Join(maps.Keys(sources), ","))
selector, _ := labels.Parse(label)
if err := r.client.List(ctx, &pods, &client.ListOptions{Namespace: pl.Namespace, LabelSelector: selector}); err != nil {
return false, err
}
return len(pods.Items) == 0, nil
}
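This new helper is what closes the race: rather than trusting the scaled-down replica counts, it lists pods matching every source vertex of the pipeline and only reports the sources as terminated when the list comes back empty. A small sketch of how such a selector is parsed and matched using k8s.io/apimachinery (the label keys and values here are illustrative):

    package main

    import (
        "fmt"

        "k8s.io/apimachinery/pkg/labels"
    )

    func main() {
        // Same shape of selector the helper builds: pipeline name plus the set of source vertex names.
        selector, err := labels.Parse("numaflow.numaproj.io/pipeline-name=my-pipeline, numaflow.numaproj.io/vertex-name in (in,http-source)")
        if err != nil {
            panic(err)
        }
        podLabels := labels.Set{
            "numaflow.numaproj.io/pipeline-name": "my-pipeline",
            "numaflow.numaproj.io/vertex-name":   "in",
        }
        fmt.Println(selector.Matches(podLabels)) // true: this pod belongs to a source vertex of the pipeline
    }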

func (r *pipelineReconciler) scaleDownSourceVertices(ctx context.Context, pl *dfv1.Pipeline) (bool, error) {
@@ -965,6 +988,8 @@ func (r *pipelineReconciler) checkChildrenResourceStatus(ctx context.Context, pi
return
}
}
// if all conditions are True, clear the status message.
pipeline.Status.Message = ""
}()

// get the daemon deployment and update the status of it to the pipeline
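The final hunk makes checkChildrenResourceStatus clear Pipeline.Status.Message once every child condition reports True, so a stale error message does not linger after the children recover. A minimal sketch of that clean-up pattern on metav1.Condition values (the helper name and callback are illustrative):

    package main

    import (
        "fmt"

        metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
    )

    // syncStatusMessage keeps the message of the first non-True condition and
    // clears it when everything is healthy again.
    func syncStatusMessage(conds []metav1.Condition, setMessage func(string)) {
        for _, c := range conds {
            if c.Status != metav1.ConditionTrue {
                setMessage(fmt.Sprintf("%s: %s", c.Type, c.Message))
                return
            }
        }
        setMessage("") // all conditions are True: drop any stale message
    }

    func main() {
        msg := "old error"
        conds := []metav1.Condition{{Type: "VerticesHealthy", Status: metav1.ConditionTrue}}
        syncStatusMessage(conds, func(s string) { msg = s })
        fmt.Printf("%q\n", msg) // prints "": the stale message is cleared
    }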
