diff --git a/core/task/manager.go b/core/task/manager.go index 587f6f7f..bbcd067f 100644 --- a/core/task/manager.go +++ b/core/task/manager.go @@ -100,6 +100,7 @@ type Manager struct { schedulerState *schedulerState internalEventCh chan<- event.Event ackKilledTasks *safeAcks + killTasksMu sync.Mutex // to avoid races when attempting to kill the same tasks in different goroutines } func NewManager(shutdown func(), internalEventCh chan<- event.Event) (taskman *Manager, err error) { @@ -1042,7 +1043,7 @@ func (m *Manager) Cleanup() (killed Tasks, running Tasks, err error) { // If the task list includes locked tasks, TaskNotFoundError is returned. func (m *Manager) KillTasks(taskIds []string) (killed Tasks, running Tasks, err error) { taskCanBeKilledFilter := func(t *Task) bool { - if t.IsLocked() { + if t.IsLocked() || m.ackKilledTasks.contains(t.taskId) { return false } for _, id := range taskIds { @@ -1053,13 +1054,18 @@ func (m *Manager) KillTasks(taskIds []string) (killed Tasks, running Tasks, err return false } + if !m.killTasksMu.TryLock() { + log.WithField("level", infologger.IL_Support).Warnf("Scheduling killing tasks was delayed until another goroutine is done doing so") + m.killTasksMu.Lock() + log.WithField("level", infologger.IL_Support).Infof("Scheduling killing tasks is resumed") + } // TODO: use grouping instead of 2 passes of filtering for performance toKill := m.roster.filtered(taskCanBeKilledFilter) - unkillable := m.roster.filtered(func(t *Task) bool { return !taskCanBeKilledFilter(t) }) if len(toKill) < len(taskIds) { + unkillable := m.roster.filtered(func(t *Task) bool { return !taskCanBeKilledFilter(t) }) log.WithField("taskIds", strings.Join(unkillable.GetTaskIds(), ", ")). - Debugf("some tasks cannot be physically killed (already dead?), will instead only be removed from roster") + Debugf("some tasks cannot be physically killed (already dead or being killed in another goroutine?), will instead only be removed from roster") } for _, id := range toKill.GetTaskIds() { @@ -1067,6 +1073,8 @@ func (m *Manager) KillTasks(taskIds []string) (killed Tasks, running Tasks, err } killed, running, err = m.doKillTasks(toKill) + m.killTasksMu.Unlock() + for _, id := range killed.GetTaskIds() { ack, ok := m.ackKilledTasks.getValue(id) if ok { @@ -1088,6 +1096,7 @@ func (m *Manager) doKillTasks(tasks Tasks) (killed Tasks, running Tasks, err err inactiveTasks := tasks.Filtered(func(task *Task) bool { return task.status != ACTIVE }) + // Remove from the roster the tasks which are also in the inactiveTasks list to delete m.roster.updateTasks(m.roster.filtered(func(task *Task) bool { return !inactiveTasks.Contains(func(t *Task) bool {