Skip to content

Commit

Permalink
feat(planstate): make plan propagation safer
Browse files Browse the repository at this point in the history
  • Loading branch information
flotter committed Aug 5, 2024
1 parent d826556 commit 2f30648
Show file tree
Hide file tree
Showing 7 changed files with 84 additions and 57 deletions.
24 changes: 14 additions & 10 deletions internals/cli/cmd_run.go
Original file line number Diff line number Diff line change
Expand Up @@ -199,16 +199,6 @@ func runDaemon(rcmd *cmdRun, ch chan os.Signal, ready chan<- func()) error {
return err
}

if rcmd.Args != nil {
mappedArgs, err := convertArgs(rcmd.Args)
if err != nil {
return err
}
if err := d.SetServiceArgs(mappedArgs); err != nil {
return err
}
}

// Run sanity check now, if anything goes wrong with the
// check we go into "degraded" mode where we always report
// the given error to any client.
Expand All @@ -227,6 +217,20 @@ func runDaemon(rcmd *cmdRun, ch chan os.Signal, ready chan<- func()) error {
return err
}

// When we reach this point, we know every overlord manager has received a
// state engine StartUp() and at least one Ensure() call. This also guarantees
// the plan has been loaded and propagated to every change listener.

if rcmd.Args != nil {
mappedArgs, err := convertArgs(rcmd.Args)
if err != nil {
return err
}
if err := d.SetServiceArgs(mappedArgs); err != nil {
return err
}
}

watchdog, err := runWatchdog(d)
if err != nil {
return fmt.Errorf("cannot run software watchdog: %v", err)
Expand Down
14 changes: 9 additions & 5 deletions internals/daemon/api_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -72,14 +72,18 @@ func (s *apiSuite) daemon(c *check.C) *Daemon {
if s.d != nil {
panic("called daemon() twice")
}
d, err := New(&Options{Dir: s.pebbleDir})
var err error
s.d, err = New(&Options{Dir: s.pebbleDir})
c.Assert(err, check.IsNil)
d.addRoutes()
s.d.addRoutes()

c.Assert(d.overlord.StartUp(), check.IsNil)
// Ensure the plan manager loads and propagates the plan.
err = s.d.overlord.StateEngine().StartUp()
c.Assert(err, check.IsNil)
err = s.d.overlord.StateEngine().Ensure()
c.Assert(err, check.IsNil)

s.d = d
return d
return s.d
}

func (s *apiSuite) startOverlord() {
Expand Down
5 changes: 4 additions & 1 deletion internals/daemon/daemon_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1345,7 +1345,10 @@ func (s *daemonSuite) TestWritesRequireAdminAccess(c *C) {
}

func (s *daemonSuite) TestAPIAccessLevels(c *C) {
_ = s.newDaemon(c)
d := s.newDaemon(c)
d.Init()
c.Assert(d.Start(), IsNil)
defer d.Stop(nil)

tests := []struct {
method string
Expand Down
10 changes: 1 addition & 9 deletions internals/overlord/checkstate/manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,6 @@ import (
"reflect"
"sort"
"sync"
"sync/atomic"

"gopkg.in/tomb.v2"

Expand All @@ -38,8 +37,7 @@ const (

// CheckManager starts and manages the health checks.
type CheckManager struct {
state *state.State
ensureDone atomic.Bool
state *state.State

failureHandlers []FailureFunc

Expand Down Expand Up @@ -87,7 +85,6 @@ func NewManager(s *state.State, runner *state.TaskRunner) *CheckManager {
}

func (m *CheckManager) Ensure() error {
m.ensureDone.Store(true)
return nil
}

Expand Down Expand Up @@ -164,11 +161,6 @@ func (m *CheckManager) PlanChanged(newPlan *plan.Plan) {
shouldEnsure = true
}
}
if !m.ensureDone.Load() {
// Can't call EnsureBefore before Overlord.Loop is running (which will
// call m.Ensure for the first time).
return
}
if shouldEnsure {
m.state.EnsureBefore(0) // start new tasks right away
}
Expand Down
44 changes: 28 additions & 16 deletions internals/overlord/overlord.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,6 @@ import (
"os"
"path/filepath"
"sync"
"sync/atomic"
"time"

"github.com/canonical/x-go/randutil"
Expand Down Expand Up @@ -89,7 +88,7 @@ type Overlord struct {
ensureLock sync.Mutex
ensureTimer *time.Timer
ensureNext time.Time
ensureRun int32
ensureRun chan struct{}
pruneTicker *time.Ticker

startOfOperationTime time.Time
Expand All @@ -114,6 +113,7 @@ func New(opts *Options) (*Overlord, error) {
o := &Overlord{
pebbleDir: opts.PebbleDir,
loopTomb: new(tomb.Tomb),
ensureRun: make(chan struct{}),
inited: true,
extension: opts.Extension,
}
Expand Down Expand Up @@ -208,14 +208,6 @@ func New(opts *Options) (*Overlord, error) {
// before it.
o.stateEng.AddManager(o.runner)

// Load the plan from the Pebble layers directory (which may be missing
// or have no layers, resulting in an empty plan), and propagate PlanChanged
// notifications to all notification subscribers.
err = o.planMgr.Load()
if err != nil {
return nil, fmt.Errorf("cannot load plan: %w", err)
}

return o, nil
}

Expand Down Expand Up @@ -379,15 +371,34 @@ func (o *Overlord) Loop() {
}
}
})
o.ensureWaitRun()
}

func (o *Overlord) ensureDidRun() {
atomic.StoreInt32(&o.ensureRun, 1)
select {
case <-o.ensureRun:
// Already closed. Ensure already ran at least once.
default:
close(o.ensureRun)
}
}

func (o *Overlord) CanStandby() bool {
run := atomic.LoadInt32(&o.ensureRun)
return run != 0
// ensureWaitRun waits until StateEngine.Ensure() was called at least once.
func (o *Overlord) ensureWaitRun() {
select {
case <-o.ensureRun:
case <-o.loopTomb.Dying():
}
}

func (o *Overlord) CanStandby() (ensured bool) {
select {
case <-o.ensureRun:
// Already closed. Ensure already ran at least once.
ensured = true
default:
}
return ensured
}

// Stop stops the ensure loop and the managers under the StateEngine.
Expand Down Expand Up @@ -545,8 +556,9 @@ func Fake() *Overlord {
// testing.
func FakeWithState(handleRestart func(restart.RestartType)) *Overlord {
o := &Overlord{
loopTomb: new(tomb.Tomb),
inited: false,
loopTomb: new(tomb.Tomb),
inited: false,
ensureRun: make(chan struct{}),
}
s := state.New(fakeBackend{o: o})
o.stateEng = NewStateEngine(s)
Expand Down
32 changes: 19 additions & 13 deletions internals/overlord/planstate/manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,8 @@ func (e *LabelExists) Error() string {
}

type PlanManager struct {
layersDir string
layersDir string
stateEngineReady bool

planLock sync.Mutex
plan *plan.Plan
Expand All @@ -48,21 +49,15 @@ func NewManager(layersDir string) (*PlanManager, error) {
return manager, nil
}

// Load reads plan layers from the pebble directory, combines and validates the
// final plan, and finally notifies registered managers of the plan update. In
// the case of a non-existent layers directory, or no layers in the layers
// directory, an empty plan is announced to change subscribers.
func (m *PlanManager) Load() error {
plan, err := plan.ReadDir(m.layersDir)
// Load reads plan layers from the layers directory and combines and validates
// the final plan. In the case of a non-existent layers directory, or no layers
// in the layers directory, an empty plan is created.
func (m *PlanManager) load() error {
var err error
m.plan, err = plan.ReadDir(m.layersDir)
if err != nil {
return err
}

m.planLock.Lock()
m.plan = plan
m.planLock.Unlock()

m.callChangeListeners(plan)
return nil
}

Expand Down Expand Up @@ -203,9 +198,20 @@ func findLayer(layers []*plan.Layer, label string) (int, *plan.Layer) {

// Ensure implements StateManager.Ensure.
func (m *PlanManager) Ensure() error {
if !m.stateEngineReady {
// No public call to this package will happen until the first
// Ensure() pass was completed. See overlord.go.
m.callChangeListeners(m.plan)
m.stateEngineReady = true
}
return nil
}

// StartUp implements StateManager.StartUp.
func (m *PlanManager) StartUp() error {
return m.load()
}

// SetServiceArgs sets the service arguments provided by "pebble run --args"
// to their respective services. It adds a new layer in the plan, the layer
// consisting of services with commands having their arguments changed.
Expand Down
12 changes: 9 additions & 3 deletions internals/overlord/planstate/manager_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,9 @@ func (ps *planSuite) TestLoadInvalidPebbleDir(c *C) {
ps.planMgr, err = planstate.NewManager("/invalid/path")
c.Assert(err, IsNil)
// Load the plan from the <pebble-dir>/layers directory
err = ps.planMgr.Load()
err = ps.planMgr.StartUp()
c.Assert(err, IsNil)
err = ps.planMgr.Ensure()
c.Assert(err, IsNil)
plan := ps.planMgr.Plan()
out, err := yaml.Marshal(plan)
Expand Down Expand Up @@ -64,7 +66,9 @@ func (ps *planSuite) TestLoadLayers(c *C) {
ps.writeLayer(c, string(reindent(l)))
}
// Load the plan from the <pebble-dir>/layers directory
err = ps.planMgr.Load()
err = ps.planMgr.StartUp()
c.Assert(err, IsNil)
err = ps.planMgr.Ensure()
c.Assert(err, IsNil)
plan := ps.planMgr.Plan()
out, err := yaml.Marshal(plan)
Expand Down Expand Up @@ -350,7 +354,9 @@ services:
override: replace
command: echo svc1
`)
err = manager.Load()
err = manager.StartUp()
c.Assert(err, IsNil)
err = manager.Ensure()
c.Assert(err, IsNil)

layer1 := ps.parseLayer(c, 0, "label1", `
Expand Down

0 comments on commit 2f30648

Please sign in to comment.