-
Notifications
You must be signed in to change notification settings - Fork 239
Don't heartbeat local activity on non-graceful worker shutdown #1933
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Don't heartbeat local activity on non-graceful worker shutdown #1933
Conversation
internal/internal_task_pollers.go
Outdated
// Non-server initiated cancellations | ||
// Note: We send ErrCanceled as the errType, so we can catch this scenario of cancellation | ||
// when handling the result | ||
return &localActivityResult{err: NewApplicationError(context.Cause(ctx).Error(), ErrCanceled.Error(), false, nil), task: task} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I assumed we wouldn't want to continue heartbeating on any non-server cancel, not just worker shutdown
internal/internal_task_handlers.go
Outdated
|
||
// When local activity is canceled due to non-server cancel, we break loop here | ||
// to avoid heartbeating after local activity is no longer being run | ||
var appErr *ApplicationError | ||
if errors.As(lar.err, &appErr) { | ||
// AppErr with ErrCanceled errType are non-server initiated cancellations. | ||
if appErr.errType == ErrCanceled.Error() { | ||
break processWorkflowLoop | ||
} | ||
} | ||
|
||
response, err = workflowContext.ProcessLocalActivityResult(workflowTask, lar) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I need some help understanding the issue. Why don't we want the workflow to heartbeat until the local activity has completed? Can you help me understand what logic below without this new change was causing ProcessLocalActivityResult
to return both a nil response
and err
causing us to continue to wait for local activity? Wouldn't the retryLocalActivity
be false since this is a canceled error and therefore err
be non-nil?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
retryLocalActivity
is false, but w.hasPendingLocalActivityWork()
in CompleteWorkflowTask
is true, causing response
and err
to be nil and to loop back
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Why is hasPendingLocalActivityWork
still true when the local activity is done? Hrmm, what if hasPendingLocalActivityWork
is legitimately true because another local activity is still pending. My concern now is what if I have two local activities running and only one has responded with cancel and the other is still processing its cleanup logic? This breaks out of the whole loop even though another local activity may finish with success. I wonder if we should just consider this local activity as no longer pending.
I am now generally curious about concurrent local activities with one failing with the cancel and one succeeding what would be recorded in the workflow task? What if the success came before the cancel one but both before any heartbeat occurred, does that mean only the one success marker is captured? Is that safe? (sorry, my heading is getting a bit lost thinking about it).
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Do multiple local activities matter if the worker is shutting down? Isn't it all moot, unless the WFT happens to finish as the worker shuts down?
internal/internal_task_handlers.go
Outdated
var appErr *ApplicationError | ||
if errors.As(lar.err, &appErr) { | ||
// AppErr with ErrCanceled errType are non-server initiated cancellations. | ||
if appErr.errType == ErrCanceled.Error() { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
A user can return this error to no? and a user doesn't have to return this error when the local activity worker is shut down so I am not sure this is the correct way to detect this condition.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Do you think it's a viable fix to somehow pass a custom error through laResultCh
altogether? That seems like the easiest way to communicate that the worker has shutdown and we can stop processing this WFT
a user doesn't have to return this error when the local activity worker is shut down
My understanding is when worker shuts down, it cancels context and the user's code just stops running. Are you saying the user can handle that context cancellation and return their own custom error?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I guess does the workflow worker need to receive the signal from the laResultCh
? shutdown comes from the top down to both workers
My understanding is when worker shuts down, it cancels context and the user's code just stops running.
Kind of, we cancel the context once the shutdown time has elapsed. Even when the context is cancelled no function if Go can just stop executing, it always needs to return something.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
shutdown comes from the top down to both workers
I don't think it does in this scenario. If I don't return to laResultCh
, the heartbeatTimer.C
keeps firing and the test continues to run for the full 15 seconds.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
oh actually, I can probably use workflowContext.laTunnel.stopCh
} | ||
if workflowTask == nil { | ||
select { | ||
case <-workflowContext.laTunnel.stopCh: |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Doesn't this channel close when the worker is shutting down during graceful shutdown? What happens when a local activity competes during graceful shutdown? Will the result be ignored?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
no, this channel should now only close after local activities finish running, from #1875.
// We want a separate stop channel for local activities because when a worker shuts down,
// we need to allow pending local activities to finish running for that workflow task.
// After all pending local activities are handled, we then close the local activity stop channel.
laStopChannel := make(chan struct{})
laParams := params
laParams.WorkerStopChannel = laStopChannel
// laTunnel is the glue that hookup 3 parts
laTunnel := newLocalActivityTunnel(getReadOnlyChannel(laStopChannel))
Also added another integration test validating this behavior
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
isn't the laStopChannel
closed when the workflow worker stop is finished?
func (ww *workflowWorker) Stop() {
close(ww.stopC)
// TODO: remove the stop methods in favor of the workerStopChannel
ww.worker.Stop()
close(ww.localActivityStopC)
ww.localActivityWorker.Stop()
}
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Talked offline this should be fine, maybe just add a comment explaining what is happening here for future readers.
ctx, cancel := context.WithCancel(context.Background()) | ||
defer cancel() | ||
localActivityFn := func(ctx context.Context) error { | ||
time.Sleep(300 * time.Millisecond) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
nit: can we use the worker shutdown channel here to avoid any potential timing race? Basically complete after the shutdown channel is closed so we know the activity completed after the worker was requested to shutdown
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
looks like because of #1875, now the LA worker stop channel is ww.localActivityStopC
, which only closes after pending LAs are complete.
I can work on a separate PR to see if I can fix GetWorkerStopChannel
to return the worker shutdown channel instead of the LA shutdown channel, and in that PR switch these and some prev tests i wrote to use GetWorkerStopChannel instead of a manual wait
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Can you open up an issue for this and add a TODO here? We will see if we can pull it in next sprint.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
yep, added #1963
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Btw, no need to wait for my review/approval as this is a bit too deep for me to dig back into, @Quinn-With-Two-Ns's review is enough IMO
What was changed
Stop local activity from heartbeating after worker has shutdown
Why?
They shouldn't heartbeat after they've been canceled due to non-graceful shutdown
Checklist
Closes ProcessWorkflowTask is not stopped on worker.Stop() #1706
How was this tested:
added integration test