Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account-related emails.

Already on GitHub? Sign in to your account

feat(planstate): make plan propagation safer #474

Closed
wants to merge 2 commits into from
Closed
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
24 changes: 14 additions & 10 deletions internals/cli/cmd_run.go
Original file line number Diff line number Diff line change
Expand Up @@ -199,16 +199,6 @@ func runDaemon(rcmd *cmdRun, ch chan os.Signal, ready chan<- func()) error {
return err
}

if rcmd.Args != nil {
mappedArgs, err := convertArgs(rcmd.Args)
if err != nil {
return err
}
if err := d.SetServiceArgs(mappedArgs); err != nil {
return err
}
}

// Run sanity check now, if anything goes wrong with the
// check we go into "degraded" mode where we always report
// the given error to any client.
Expand All @@ -227,6 +217,20 @@ func runDaemon(rcmd *cmdRun, ch chan os.Signal, ready chan<- func()) error {
return err
}

// When we reach this point, we know every overlord manager has received a
// state engine StartUp() and at least one Ensure() call. This also guarantees
// the plan has been loaded and propagated to every change listener.

if rcmd.Args != nil {
mappedArgs, err := convertArgs(rcmd.Args)
if err != nil {
return err
}
if err := d.SetServiceArgs(mappedArgs); err != nil {
Copy link
Contributor Author

@flotter flotter Aug 5, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

SetServiceArgs requires the service manager plan so we have to move it down slightly. Care must be taken to not make direct Plan Manager calls before the state engine is running, as the lock-less load happens during this time. We could add the lock for the initial load / propagate if really needed.

return err
}
}

watchdog, err := runWatchdog(d)
if err != nil {
return fmt.Errorf("cannot run software watchdog: %v", err)
Expand Down
7 changes: 7 additions & 0 deletions internals/daemon/api_services_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -73,6 +73,7 @@ func (s *apiSuite) TestServicesStart(c *C) {
writeTestLayer(s.pebbleDir, servicesLayer)
d := s.daemon(c)
st := d.overlord.State()
s.startOverlord()

soon := 0
restore := FakeStateEnsureBefore(func(st *state.State, d time.Duration) {
Expand Down Expand Up @@ -120,6 +121,7 @@ func (s *apiSuite) TestServicesStop(c *C) {
writeTestLayer(s.pebbleDir, servicesLayer)
d := s.daemon(c)
st := d.overlord.State()
s.startOverlord()

soon := 0
restore := FakeStateEnsureBefore(func(st *state.State, d time.Duration) {
Expand Down Expand Up @@ -167,6 +169,7 @@ func (s *apiSuite) TestServicesAutoStart(c *C) {
writeTestLayer(s.pebbleDir, servicesLayer)
d := s.daemon(c)
st := d.overlord.State()
s.startOverlord()

soon := 0
restore := FakeStateEnsureBefore(func(st *state.State, d time.Duration) {
Expand Down Expand Up @@ -209,6 +212,7 @@ func (s *apiSuite) TestServicesGet(c *C) {
// Setup
writeTestLayer(s.pebbleDir, servicesLayer)
s.daemon(c)
s.startOverlord()

// Execute
req, err := http.NewRequest("GET", "/v1/services", nil)
Expand Down Expand Up @@ -238,6 +242,7 @@ func (s *apiSuite) TestServicesRestart(c *C) {
writeTestLayer(s.pebbleDir, servicesLayer)
d := s.daemon(c)
st := d.overlord.State()
s.startOverlord()

soon := 0
restore := FakeStateEnsureBefore(func(st *state.State, d time.Duration) {
Expand Down Expand Up @@ -287,6 +292,7 @@ func (s *apiSuite) TestServicesReplan(c *C) {
writeTestLayer(s.pebbleDir, servicesLayer)
d := s.daemon(c)
st := d.overlord.State()
s.startOverlord()

soon := 0
restore := FakeStateEnsureBefore(func(st *state.State, d time.Duration) {
Expand Down Expand Up @@ -334,6 +340,7 @@ services:
`)
d := s.daemon(c)
st := d.overlord.State()
s.startOverlord()
restore := FakeStateEnsureBefore(func(st *state.State, d time.Duration) {})
defer restore()

Expand Down
5 changes: 4 additions & 1 deletion internals/daemon/daemon_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1345,7 +1345,10 @@ func (s *daemonSuite) TestWritesRequireAdminAccess(c *C) {
}

func (s *daemonSuite) TestAPIAccessLevels(c *C) {
_ = s.newDaemon(c)
d := s.newDaemon(c)
d.Init()
c.Assert(d.Start(), IsNil)
defer d.Stop(nil)

tests := []struct {
method string
Expand Down
10 changes: 1 addition & 9 deletions internals/overlord/checkstate/manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,6 @@ import (
"reflect"
"sort"
"sync"
"sync/atomic"

"gopkg.in/tomb.v2"

Expand All @@ -38,8 +37,7 @@ const (

// CheckManager starts and manages the health checks.
type CheckManager struct {
state *state.State
ensureDone atomic.Bool
Copy link
Contributor Author

@flotter flotter Aug 5, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This logic, which would also have been needed in derived project managers, can now safely be removed. The race is prevented in the overlord.

state *state.State

failureHandlers []FailureFunc

Expand Down Expand Up @@ -87,7 +85,6 @@ func NewManager(s *state.State, runner *state.TaskRunner) *CheckManager {
}

// Ensure implements StateManager.Ensure. The check manager performs its work
// via task runner tasks and PlanChanged notifications, so there is nothing to
// do on an ensure pass.
func (m *CheckManager) Ensure() error {
	return nil
}

Expand Down Expand Up @@ -164,11 +161,6 @@ func (m *CheckManager) PlanChanged(newPlan *plan.Plan) {
shouldEnsure = true
}
}
if !m.ensureDone.Load() {
// Can't call EnsureBefore before Overlord.Loop is running (which will
// call m.Ensure for the first time).
return
}
if shouldEnsure {
m.state.EnsureBefore(0) // start new tasks right away
}
Expand Down
44 changes: 28 additions & 16 deletions internals/overlord/overlord.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,6 @@ import (
"os"
"path/filepath"
"sync"
"sync/atomic"
"time"

"github.com/canonical/x-go/randutil"
Expand Down Expand Up @@ -89,7 +88,7 @@ type Overlord struct {
ensureLock sync.Mutex
ensureTimer *time.Timer
ensureNext time.Time
ensureRun int32
ensureRun chan struct{}
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I am extending an existing feature here to also allow for waiting, not only checking if ensure ran already.

pruneTicker *time.Ticker

startOfOperationTime time.Time
Expand All @@ -114,6 +113,7 @@ func New(opts *Options) (*Overlord, error) {
o := &Overlord{
pebbleDir: opts.PebbleDir,
loopTomb: new(tomb.Tomb),
ensureRun: make(chan struct{}),
inited: true,
extension: opts.Extension,
}
Expand Down Expand Up @@ -208,14 +208,6 @@ func New(opts *Options) (*Overlord, error) {
// before it.
o.stateEng.AddManager(o.runner)

// Load the plan from the Pebble layers directory (which may be missing
// or have no layers, resulting in an empty plan), and propagate PlanChanged
// notifications to all notification subscribers.
err = o.planMgr.Load()
if err != nil {
return nil, fmt.Errorf("cannot load plan: %w", err)
}

return o, nil
}

Expand Down Expand Up @@ -379,15 +371,34 @@ func (o *Overlord) Loop() {
}
}
})
o.ensureWaitRun()
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I am also discussing this proposal with @pedronis in a separate thread, to make sure this is compatible with snapd.

}

// ensureDidRun records that StateEngine.Ensure has run at least once by
// closing the ensureRun channel. It is safe to call repeatedly: an already
// closed channel is detected and left alone.
func (o *Overlord) ensureDidRun() {
	select {
	case <-o.ensureRun:
		// Channel already closed: Ensure has run before, nothing to do.
		return
	default:
	}
	close(o.ensureRun)
}

func (o *Overlord) CanStandby() bool {
run := atomic.LoadInt32(&o.ensureRun)
return run != 0
// ensureWaitRun waits until StateEngine.Ensure() was called at least once.
func (o *Overlord) ensureWaitRun() {
flotter marked this conversation as resolved.
Show resolved Hide resolved
select {
case <-o.ensureRun:
case <-o.loopTomb.Dying():
}
}

func (o *Overlord) CanStandby() (ensured bool) {
select {
case <-o.ensureRun:
// Already closed. Ensure already ran at least once.
ensured = true
default:
}
return ensured
flotter marked this conversation as resolved.
Show resolved Hide resolved
}

// Stop stops the ensure loop and the managers under the StateEngine.
Expand Down Expand Up @@ -545,8 +556,9 @@ func Fake() *Overlord {
// testing.
func FakeWithState(handleRestart func(restart.RestartType)) *Overlord {
o := &Overlord{
loopTomb: new(tomb.Tomb),
inited: false,
loopTomb: new(tomb.Tomb),
inited: false,
ensureRun: make(chan struct{}),
}
s := state.New(fakeBackend{o: o})
o.stateEng = NewStateEngine(s)
Expand Down
32 changes: 19 additions & 13 deletions internals/overlord/planstate/manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,8 @@ func (e *LabelExists) Error() string {
}

type PlanManager struct {
layersDir string
layersDir string
stateEngineReady bool

planLock sync.Mutex
plan *plan.Plan
Expand All @@ -48,21 +49,15 @@ func NewManager(layersDir string) (*PlanManager, error) {
return manager, nil
}

// Load reads plan layers from the pebble directory, combines and validates the
// final plan, and finally notifies registered managers of the plan update. In
// the case of a non-existent layers directory, or no layers in the layers
// directory, an empty plan is announced to change subscribers.
func (m *PlanManager) Load() error {
plan, err := plan.ReadDir(m.layersDir)
// load reads plan layers from the layers directory and combines and validates
// the final plan. In the case of a non-existent layers directory, or no layers
// in the layers directory, an empty plan is created.
//
// NOTE(review): m.plan is assigned here without holding planLock; this relies
// on load being called only from StartUp, before the state engine runs and
// before any concurrent access is possible — confirm against overlord.go.
func (m *PlanManager) load() error {
	var err error
	m.plan, err = plan.ReadDir(m.layersDir)
	if err != nil {
		return err
	}
	return nil
}

Expand Down Expand Up @@ -203,9 +198,20 @@ func findLayer(layers []*plan.Layer, label string) (int, *plan.Layer) {

// Ensure implements StateManager.Ensure. On the very first pass it announces
// the loaded plan to all registered change listeners and marks the state
// engine as ready; subsequent passes are no-ops.
func (m *PlanManager) Ensure() error {
	if m.stateEngineReady {
		return nil
	}
	// First Ensure pass. No public call to this package will happen until
	// this pass has completed (see overlord.go), so notifying listeners and
	// flipping the flag here is safe.
	m.callChangeListeners(m.plan)
	m.stateEngineReady = true
	return nil
}

// StartUp implements StateManager.StartUp. It reads and combines the plan
// layers from disk; registered change listeners are not notified here but
// later, on the first Ensure pass (see Ensure).
func (m *PlanManager) StartUp() error {
	return m.load()
}

// SetServiceArgs sets the service arguments provided by "pebble run --args"
// to their respective services. It adds a new layer in the plan, the layer
// consisting of services with commands having their arguments changed.
Expand Down
12 changes: 9 additions & 3 deletions internals/overlord/planstate/manager_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,9 @@ func (ps *planSuite) TestLoadInvalidPebbleDir(c *C) {
ps.planMgr, err = planstate.NewManager("/invalid/path")
c.Assert(err, IsNil)
// Load the plan from the <pebble-dir>/layers directory
err = ps.planMgr.Load()
err = ps.planMgr.StartUp()
c.Assert(err, IsNil)
err = ps.planMgr.Ensure()
c.Assert(err, IsNil)
plan := ps.planMgr.Plan()
out, err := yaml.Marshal(plan)
Expand Down Expand Up @@ -64,7 +66,9 @@ func (ps *planSuite) TestLoadLayers(c *C) {
ps.writeLayer(c, string(reindent(l)))
}
// Load the plan from the <pebble-dir>/layers directory
err = ps.planMgr.Load()
err = ps.planMgr.StartUp()
c.Assert(err, IsNil)
err = ps.planMgr.Ensure()
c.Assert(err, IsNil)
plan := ps.planMgr.Plan()
out, err := yaml.Marshal(plan)
Expand Down Expand Up @@ -350,7 +354,9 @@ services:
override: replace
command: echo svc1
`)
err = manager.Load()
err = manager.StartUp()
c.Assert(err, IsNil)
err = manager.Ensure()
c.Assert(err, IsNil)

layer1 := ps.parseLayer(c, 0, "label1", `
Expand Down
Loading