From 77ba212350fad65f0a6742a6fd38ded35906a818 Mon Sep 17 00:00:00 2001
From: Piotr Konopka
Date: Fri, 13 Sep 2024 12:07:27 +0200
Subject: [PATCH] [core] OCTRL-920 safer concurrency in KillTasks

Parallel attempts to kill tasks were found to be the primary cause of
stuck auto-environments. Specifically, the channels in ackKilledTasks
(and the code handling them) do not expect multiple listeners, so one
of two concurrent kill attempts could block forever sending an
acknowledgment that nobody would receive, while the side waiting for
an acknowledgment would never get one. This left KillTasks stuck
indefinitely, blocking the main auto-environment code path.
---
 core/task/manager.go | 15 ++++++++++++---
 1 file changed, 12 insertions(+), 3 deletions(-)

diff --git a/core/task/manager.go b/core/task/manager.go
index 587f6f7f..bbcd067f 100644
--- a/core/task/manager.go
+++ b/core/task/manager.go
@@ -100,6 +100,7 @@ type Manager struct {
 	schedulerState  *schedulerState
 	internalEventCh chan<- event.Event
 	ackKilledTasks  *safeAcks
+	killTasksMu     sync.Mutex // to avoid races when attempting to kill the same tasks in different goroutines
 }
 
 func NewManager(shutdown func(), internalEventCh chan<- event.Event) (taskman *Manager, err error) {
@@ -1042,7 +1043,7 @@ func (m *Manager) Cleanup() (killed Tasks, running Tasks, err error) {
 // If the task list includes locked tasks, TaskNotFoundError is returned.
 func (m *Manager) KillTasks(taskIds []string) (killed Tasks, running Tasks, err error) {
 	taskCanBeKilledFilter := func(t *Task) bool {
-		if t.IsLocked() {
+		if t.IsLocked() || m.ackKilledTasks.contains(t.taskId) {
 			return false
 		}
 		for _, id := range taskIds {
@@ -1053,13 +1054,18 @@ func (m *Manager) KillTasks(taskIds []string) (killed Tasks, running Tasks, err
 		return false
 	}
 
+	if !m.killTasksMu.TryLock() {
+		log.WithField("level", infologger.IL_Support).Warnf("scheduling tasks to be killed was delayed until another goroutine finished doing so")
+		m.killTasksMu.Lock()
+		log.WithField("level", infologger.IL_Support).Infof("scheduling tasks to be killed has resumed")
+	}
 	// TODO: use grouping instead of 2 passes of filtering for performance
 	toKill := m.roster.filtered(taskCanBeKilledFilter)
-	unkillable := m.roster.filtered(func(t *Task) bool { return !taskCanBeKilledFilter(t) })
 
 	if len(toKill) < len(taskIds) {
+		unkillable := m.roster.filtered(func(t *Task) bool { return !taskCanBeKilledFilter(t) })
 		log.WithField("taskIds", strings.Join(unkillable.GetTaskIds(), ", ")).
-			Debugf("some tasks cannot be physically killed (already dead?), will instead only be removed from roster")
+			Debugf("some tasks cannot be physically killed (already dead or being killed in another goroutine?), will instead only be removed from roster")
 	}
 
 	for _, id := range toKill.GetTaskIds() {
@@ -1067,6 +1073,8 @@ func (m *Manager) KillTasks(taskIds []string) (killed Tasks, running Tasks, err
 	}
 
 	killed, running, err = m.doKillTasks(toKill)
+	m.killTasksMu.Unlock()
+
 	for _, id := range killed.GetTaskIds() {
 		ack, ok := m.ackKilledTasks.getValue(id)
 		if ok {
@@ -1088,6 +1096,7 @@ func (m *Manager) doKillTasks(tasks Tasks) (killed Tasks, running Tasks, err err
 	inactiveTasks := tasks.Filtered(func(task *Task) bool {
 		return task.status != ACTIVE
 	})
+	// remove from the roster the tasks which are in the inactiveTasks list, as they are to be deleted
 	m.roster.updateTasks(m.roster.filtered(func(task *Task) bool {
 		return !inactiveTasks.Contains(func(t *Task) bool {
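
For illustration, a minimal standalone sketch of the failure mode described in
the commit message, assuming an unbuffered per-task ack channel designed for
exactly one sender and one receiver (the names ack, waiter and killer are
hypothetical; this is not the actual ackKilledTasks code). When two goroutines
try to acknowledge the kill of the same task, only one send is ever received
and the other blocks forever, which is the stuck state KillTasks ended up in:

	package main

	import (
		"fmt"
		"time"
	)

	func main() {
		// One unbuffered ack channel per task; the design assumes exactly
		// one killer (sender) and one waiter (receiver).
		ack := make(chan struct{})

		// The waiter expects exactly one acknowledgment.
		go func() {
			<-ack
			fmt.Println("waiter: ack received")
		}()

		// Two parallel KillTasks-like goroutines both acknowledge the same
		// task: only one send is received, the other blocks indefinitely.
		for i := 0; i < 2; i++ {
			go func(n int) {
				ack <- struct{}{} // the second sender is stuck here forever
				fmt.Printf("killer %d: ack delivered\n", n)
			}(i)
		}

		time.Sleep(time.Second)
		fmt.Println("main: one killer goroutine is still blocked sending its ack")
	}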
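
The serialization the patch adds relies on sync.Mutex.TryLock (available since
Go 1.18), used only to detect and log contention before falling back to an
ordinary blocking Lock. A simplified sketch of that pattern follows, with a
placeholder logger and hypothetical names; note that the patch itself unlocks
explicitly after doKillTasks rather than deferring:

	package main

	import (
		"log"
		"sync"
		"time"
	)

	var killTasksMu sync.Mutex

	func killTasks(caller string) {
		// TryLock (Go 1.18+) only detects contention so it can be logged
		// before falling back to a normal blocking Lock.
		if !killTasksMu.TryLock() {
			log.Printf("%s: killing tasks delayed until another goroutine is done", caller)
			killTasksMu.Lock()
			log.Printf("%s: killing tasks resumed", caller)
		}
		defer killTasksMu.Unlock() // simplified; the patch unlocks right after doKillTasks

		time.Sleep(100 * time.Millisecond) // stand-in for the actual kill work
		log.Printf("%s: done", caller)
	}

	func main() {
		var wg sync.WaitGroup
		for _, c := range []string{"envA", "envB"} {
			wg.Add(1)
			go func(c string) {
				defer wg.Done()
				killTasks(c)
			}(c)
		}
		wg.Wait()
	}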