Skip to content

Commit

Permalink
[core] OCTRL-912 Cancel calls pending await upon environment destruction
Browse files Browse the repository at this point in the history
This addresses the problem of stuck calls which wait for something to receive from c.await and thereby outlive their parent environment.
This may happen if the awaited trigger never occurs after the call has been started, e.g. if the trigger is before_START_ACTIVITY and the await is after_START_ACTIVITY, but the environment fails to transition to RUNNING.
  • Loading branch information
knopers8 committed Aug 20, 2024
1 parent e101416 commit 865db66
Show file tree
Hide file tree
Showing 4 changed files with 70 additions and 2 deletions.
1 change: 1 addition & 0 deletions core/environment/environment.go
Original file line number Diff line number Diff line change
Expand Up @@ -706,6 +706,7 @@ func (env *Environment) handleHooks(workflow workflow.Role, trigger string, weig
// respected.

callErrors = pendingCalls.AwaitAll()
delete(env.callsPendingAwait[trigger], weight)
}
}

Expand Down
32 changes: 32 additions & 0 deletions core/environment/hooks_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import (
"github.com/AliceO2Group/Control/common/utils/uid"
"github.com/AliceO2Group/Control/core/task"
"github.com/AliceO2Group/Control/core/workflow"
"github.com/AliceO2Group/Control/core/workflow/callable"
. "github.com/onsi/ginkgo/v2"
. "github.com/onsi/gomega"
)
Expand Down Expand Up @@ -459,4 +460,35 @@ var _ = Describe("calling hooks on FSM events", func() {
Expect(ok).To(BeTrue())
Expect(v).To(Equal("root.call1,root.call2,root.call3"))
})

It("should allow to cancel hooks in case that await trigger never happens", func() {
env.workflow = workflow.NewAggregatorRole("root", []workflow.Role{
workflow.NewCallRole(
"call1", // this call should return immediately and should not be accessible later
task.Traits{Trigger: "before_CONFIGURE", Timeout: "5s", Critical: true, Await: "before_CONFIGURE"},
"testplugin.Test()",
""),
workflow.NewCallRole(
"call2", // this call should not return, but should be cancelled later
task.Traits{Trigger: "before_CONFIGURE", Timeout: "5s", Critical: true, Await: "after_CONFIGURE"},
"testplugin.Test()",
"")})
workflow.LinkChildrenToParents(env.workflow)
env.Sm.SetState("DEPLOYED")

err := env.Sm.Event(context.Background(), "CONFIGURE", NewDummyTransition("CONFIGURE", true))
Expect(err).To(HaveOccurred())

callMapForAwait := env.callsPendingAwait
Expect(callMapForAwait).To(HaveKey("after_CONFIGURE"))
callsForWeight := callMapForAwait["after_CONFIGURE"]
Expect(callsForWeight).To(HaveKey(callable.HookWeight(0)))
calls := callsForWeight[0]
Expect(calls).To(HaveLen(1))
Expect(calls[0]).NotTo(BeNil())
// the first cancel attempt should return "true" to say it was successful
Expect(calls[0].Cancel()).To(BeTrue())
// the subsequent cancel attempts should return "false", because the call was already cancelled
Expect(calls[0].Cancel()).To(BeFalse())
})
})
18 changes: 18 additions & 0 deletions core/environment/manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -811,6 +811,8 @@ func (envs *Manager) TeardownEnvironment(environmentId uid.ID, force bool) error
}
}

envs.cancelCallsPendingAwait(env)

// we remake the pending teardown channel too, because each completed TasksReleasedEvent
// automatically closes it
pendingCh = make(chan *event.TasksReleasedEvent)
Expand Down Expand Up @@ -880,6 +882,22 @@ func (envs *Manager) TeardownEnvironment(environmentId uid.ID, force bool) error
return err
}

// cancelCallsPendingAwait cancels every non-nil call found in
// env.callsPendingAwait, across all await triggers and hook weights. It is
// meant to unblock calls which are stuck waiting for an await trigger that
// never happened. A nil env is a no-op.
func (envs *Manager) cancelCallsPendingAwait(env *Environment) {
	if env == nil {
		return
	}
	for _, pendingByWeight := range env.callsPendingAwait {
		for _, pendingCalls := range pendingByWeight {
			for _, pending := range pendingCalls {
				if pending == nil {
					continue
				}
				// The return value only tells whether this particular attempt
				// did the cancelling, which is irrelevant here.
				pending.Cancel()
			}
		}
	}
}

/*func (envs *Manager) Configuration(environmentId uuid.UUID) EnvironmentCfg {
envs.mu.RLock()
defer envs.mu.RUnlock()
Expand Down
21 changes: 19 additions & 2 deletions core/workflow/callable/call.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
package callable

import (
"context"
"errors"
"fmt"
"strconv"
Expand Down Expand Up @@ -54,7 +55,8 @@ type Call struct {
Traits task.Traits
parentRole ParentRole

await chan error
await chan error
awaitCancel context.CancelFunc
}

type Calls []*Call
Expand Down Expand Up @@ -220,11 +222,17 @@ func (c *Call) Call() error {

// Start runs the call in a new goroutine and sets up the channel from which
// Await receives the call's result.
//
// The goroutine first runs c.Call() to completion and then blocks until either
// a receiver takes the result from the await channel, or the call is cancelled
// through the cancel function stored in c.awaitCancel. Cancellation therefore
// does not interrupt a call which is still running; it only releases a
// goroutine which would otherwise stay blocked because nobody receives its
// result. In both cases the await channel is closed before the goroutine exits.
func (c *Call) Start() {
	// Keep a local handle on the channel, so that the goroutine always sends to
	// and closes the channel it was started with, even if c.await is reassigned
	// in the meantime.
	await := make(chan error)
	c.await = await
	ctx, cancel := context.WithCancel(context.Background())
	c.awaitCancel = cancel
	go func() {
		// Release the context once the goroutine is done, whichever way it exits.
		// Calling a CancelFunc more than once is harmless.
		defer cancel()
		callID := fmt.Sprintf("hook:%s:%s", c.GetTraits().Trigger, c.GetName())
		log.Debugf("%s started", callID)
		defer utils.TimeTrack(time.Now(), callID, log.WithPrefix("callable"))
		// The right-hand side of the send is evaluated upon entering select, so
		// c.Call() has already returned by the time the goroutine waits for
		// either a receiver or a cancellation.
		select {
		case await <- c.Call():
		case <-ctx.Done():
			log.Debugf("%s cancelled", callID)
		}
		close(await)
	}()
}
Expand All @@ -234,6 +242,15 @@ func (c *Call) Await() error {
return <-c.await
}

// Cancel invokes the cancel function stored in c.awaitCancel and clears it.
//
// It returns true if a cancel function was set and has been invoked by this
// attempt, and false if there was none, e.g. because the call was already
// cancelled before.
//
// NOTE(review): c.awaitCancel is read and cleared here without any
// synchronization - confirm that Cancel is never invoked concurrently on the
// same call.
func (c *Call) Cancel() bool {
	if c.awaitCancel == nil {
		return false
	}
	c.awaitCancel()
	// Drop the function so that subsequent attempts report false.
	c.awaitCancel = nil
	return true
}

// GetParentRole returns the parent role of the call as an untyped value.
func (c *Call) GetParentRole() interface{} {
	var parent interface{} = c.parentRole
	return parent
}
Expand Down

0 comments on commit 865db66

Please sign in to comment.