diff --git a/core/environment/environment.go b/core/environment/environment.go
index 01a4116e..55171427 100644
--- a/core/environment/environment.go
+++ b/core/environment/environment.go
@@ -88,7 +88,8 @@ type Environment struct {
     callsPendingAwait map[string] /*await expression, trigger only*/ callable.CallsMap
     currentTransition string
 
-    autoStopTimer *time.Timer
+    autoStopTimer     *time.Timer
+    autoStopCancelFcn context.CancelFunc
 }
 
 func (env *Environment) NotifyEvent(e event.DeviceEvent) {
@@ -593,6 +594,7 @@ func newEnvironment(userVars map[string]string, newId uid.ID) (env *Environment,
                 log.WithField("partition", envId.String()).
                     Debug("O2 End Completion time already set before after_GO_ERROR")
             }
+            env.invalidateAutoStopTransition()
         }
 
         errHooks = errors.Join(errHooks, env.handleHooksWithPositiveWeights(env.Workflow(), trigger))
@@ -959,8 +961,11 @@ func (env *Environment) runTasksAsHooks(hooksToTrigger task.Tasks) (errorMap map
 
 func (env *Environment) TryTransition(t Transition) (err error) {
     if !env.transitionMutex.TryLock() {
         log.WithField("partition", env.id.String()).
-            Warnf("environment transition attempt delayed: transition '%s' in progress. waiting for completion or failure", env.currentTransition)
+            Warnf("environment transition '%s' attempt delayed: transition '%s' in progress. waiting for completion or failure", t.eventName(), env.currentTransition)
         env.transitionMutex.Lock()
+        log.WithField("level", infologger.IL_Support).
+            WithField("partition", env.id.String()).
+            Infof("environment transition '%s' attempt resumed", t.eventName())
     }
     defer env.transitionMutex.Unlock()
@@ -1400,6 +1405,8 @@ func (env *Environment) scheduleAutoStopTransition() (scheduled bool, expected t
     }
 
     env.autoStopTimer = time.NewTimer(autoStopDuration)
+    ctx, cancel := context.WithCancel(context.Background())
+    env.autoStopCancelFcn = cancel
     go func() {
         select {
         case <-env.autoStopTimer.C:
@@ -1420,6 +1427,10 @@ func (env *Environment) scheduleAutoStopTransition() (scheduled bool, expected t
                 }
                 return
             }
+        case <-ctx.Done():
+            log.WithField("partition", env.id).
+                WithField("run", env.currentRunNumber).
+                Debugf("Scheduled auto stop transition was cancelled")
         }
     }()
 
@@ -1434,7 +1445,10 @@ func (env *Environment) scheduleAutoStopTransition() (scheduled bool, expected t
 
 func (env *Environment) invalidateAutoStopTransition() {
     // Only try to stop an initialized timer
-    if env.autoStopTimer != nil {
-        env.autoStopTimer.Stop()
+    if env.autoStopTimer == nil {
+        return
+    }
+    if env.autoStopTimer.Stop() && env.autoStopCancelFcn != nil {
+        env.autoStopCancelFcn()
     }
 }
diff --git a/core/environment/manager.go b/core/environment/manager.go
index 950629c1..f4ead298 100644
--- a/core/environment/manager.go
+++ b/core/environment/manager.go
@@ -431,7 +431,7 @@ func (envs *Manager) CreateEnvironment(workflowPath string, userVars map[string]
     goErrorKillDestroy := func(op string) {
         envState := env.CurrentState()
         log.WithField("state", envState).
-            WithField("environment", env.Id().String()).
+            WithField("partition", env.Id().String()).
             WithError(err).
             Warnf("auto-transitioning environment failed %s, cleanup in progress", op)
 
@@ -586,6 +586,9 @@ func (envs *Manager) TeardownEnvironment(environmentId uid.ID, force bool) error
         log.WithField("partition", environmentId.String()).
             Warnf("environment teardown attempt delayed: transition '%s' in progress. waiting for completion or failure", env.currentTransition)
         env.transitionMutex.Lock()
+        log.WithField("level", infologger.IL_Support).
+            WithField("partition", environmentId.String()).
+            Infof("environment teardown attempt resumed")
     }
     defer env.transitionMutex.Unlock()
 
@@ -1212,7 +1215,7 @@ func (envs *Manager) CreateAutoEnvironment(workflowPath string, userVars map[str
     env.Public, env.Description, err = parseWorkflowPublicInfo(workflowPath)
     if err != nil {
         log.WithField("public info", env.Public).
-            WithField("environment", env.Id().String()).
+            WithField("partition", env.Id().String()).
             WithError(err).
             Warn("parse workflow public info failed.")
     }
@@ -1272,7 +1275,7 @@ func (envs *Manager) CreateAutoEnvironment(workflowPath string, userVars map[str
         envState := env.CurrentState()
         env.sendEnvironmentEvent(&event.EnvironmentEvent{Message: fmt.Sprintf("environment is in %s, handling ERROR", envState), EnvironmentID: env.Id().String(), Error: err})
         log.WithField("state", envState).
-            WithField("environment", env.Id().String()).
+            WithField("partition", env.Id().String()).
             WithError(err).
             Warnf("auto-transitioning environment failed %s, cleanup in progress", op)
 
diff --git a/core/task/manager.go b/core/task/manager.go
index 6bf956e7..95b10a26 100644
--- a/core/task/manager.go
+++ b/core/task/manager.go
@@ -100,6 +100,7 @@ type Manager struct {
     schedulerState  *schedulerState
     internalEventCh chan<- event.Event
     ackKilledTasks  *safeAcks
+    killTasksMu     sync.Mutex // to avoid races when attempting to kill the same tasks in different goroutines
 }
 
 func NewManager(shutdown func(), internalEventCh chan<- event.Event) (taskman *Manager, err error) {
@@ -938,6 +939,7 @@ func (m *Manager) updateTaskState(taskId string, state string) {
     taskPtr := m.roster.getByTaskId(taskId)
     if taskPtr == nil {
         log.WithField("taskId", taskId).
+            WithField("state", state).
             Warn("attempted state update of task not in roster")
         return
     }
@@ -1041,7 +1043,7 @@ func (m *Manager) Cleanup() (killed Tasks, running Tasks, err error) {
 // If the task list includes locked tasks, TaskNotFoundError is returned.
 func (m *Manager) KillTasks(taskIds []string) (killed Tasks, running Tasks, err error) {
     taskCanBeKilledFilter := func(t *Task) bool {
-        if t.IsLocked() {
+        if t.IsLocked() || m.ackKilledTasks.contains(t.taskId) {
             return false
         }
         for _, id := range taskIds {
@@ -1052,13 +1054,18 @@ func (m *Manager) KillTasks(taskIds []string) (killed Tasks, running Tasks, err
         return false
     }
 
+    if !m.killTasksMu.TryLock() {
+        log.WithField("level", infologger.IL_Support).Warnf("Scheduling killing tasks was delayed until another goroutine finishes doing so")
+        m.killTasksMu.Lock()
+        log.WithField("level", infologger.IL_Support).Infof("Scheduling killing tasks is resumed")
+    }
     // TODO: use grouping instead of 2 passes of filtering for performance
     toKill := m.roster.filtered(taskCanBeKilledFilter)
-    unkillable := m.roster.filtered(func(t *Task) bool { return !taskCanBeKilledFilter(t) })
 
     if len(toKill) < len(taskIds) {
+        unkillable := m.roster.filtered(func(t *Task) bool { return !taskCanBeKilledFilter(t) })
         log.WithField("taskIds", strings.Join(unkillable.GetTaskIds(), ", ")).
-            Debugf("some tasks cannot be physically killed (already dead?), will instead only be removed from roster")
+            Debugf("some tasks cannot be physically killed (already dead or being killed in another goroutine?), will instead only be removed from roster")
     }
 
     for _, id := range toKill.GetTaskIds() {
@@ -1066,6 +1073,8 @@ func (m *Manager) KillTasks(taskIds []string) (killed Tasks, running Tasks, err
     }
 
     killed, running, err = m.doKillTasks(toKill)
+    m.killTasksMu.Unlock()
+
     for _, id := range killed.GetTaskIds() {
         ack, ok := m.ackKilledTasks.getValue(id)
         if ok {
@@ -1087,6 +1096,7 @@ func (m *Manager) doKillTasks(tasks Tasks) (killed Tasks, running Tasks, err err
     inactiveTasks := tasks.Filtered(func(task *Task) bool {
         return task.status != ACTIVE
     })
 
+    // Remove from the roster the tasks which are also in the inactiveTasks list to delete
     m.roster.updateTasks(m.roster.filtered(func(task *Task) bool {
         return !inactiveTasks.Contains(func(t *Task) bool {