From b7e28c8819f02bf7ff5ad173a15b8dfbddb21995 Mon Sep 17 00:00:00 2001 From: Fred Lotter Date: Mon, 5 Aug 2024 17:42:57 +0200 Subject: [PATCH 1/2] feat(planstate): make plan propagation safer --- internals/cli/cmd_run.go | 24 ++++++----- internals/daemon/api_services_test.go | 7 ++++ internals/daemon/daemon_test.go | 5 ++- internals/overlord/checkstate/manager.go | 10 +---- internals/overlord/overlord.go | 44 +++++++++++++------- internals/overlord/planstate/manager.go | 32 ++++++++------ internals/overlord/planstate/manager_test.go | 12 ++++-- 7 files changed, 82 insertions(+), 52 deletions(-) diff --git a/internals/cli/cmd_run.go b/internals/cli/cmd_run.go index 0bd2eeca9..00f0eb46b 100644 --- a/internals/cli/cmd_run.go +++ b/internals/cli/cmd_run.go @@ -199,16 +199,6 @@ func runDaemon(rcmd *cmdRun, ch chan os.Signal, ready chan<- func()) error { return err } - if rcmd.Args != nil { - mappedArgs, err := convertArgs(rcmd.Args) - if err != nil { - return err - } - if err := d.SetServiceArgs(mappedArgs); err != nil { - return err - } - } - // Run sanity check now, if anything goes wrong with the // check we go into "degraded" mode where we always report // the given error to any client. @@ -227,6 +217,20 @@ func runDaemon(rcmd *cmdRun, ch chan os.Signal, ready chan<- func()) error { return err } + // When we reach this point, we know every overlord manager has received a + // state engine StartUp() and at least one Ensure() call. This also guarantees + // the plan has been loaded and propagated to every change listener. + + if rcmd.Args != nil { + mappedArgs, err := convertArgs(rcmd.Args) + if err != nil { + return err + } + if err := d.SetServiceArgs(mappedArgs); err != nil { + return err + } + } + watchdog, err := runWatchdog(d) if err != nil { return fmt.Errorf("cannot run software watchdog: %v", err) diff --git a/internals/daemon/api_services_test.go b/internals/daemon/api_services_test.go index 8ad4c9093..a60e1997f 100644 --- a/internals/daemon/api_services_test.go +++ b/internals/daemon/api_services_test.go @@ -73,6 +73,7 @@ func (s *apiSuite) TestServicesStart(c *C) { writeTestLayer(s.pebbleDir, servicesLayer) d := s.daemon(c) st := d.overlord.State() + s.startOverlord() soon := 0 restore := FakeStateEnsureBefore(func(st *state.State, d time.Duration) { @@ -120,6 +121,7 @@ func (s *apiSuite) TestServicesStop(c *C) { writeTestLayer(s.pebbleDir, servicesLayer) d := s.daemon(c) st := d.overlord.State() + s.startOverlord() soon := 0 restore := FakeStateEnsureBefore(func(st *state.State, d time.Duration) { @@ -167,6 +169,7 @@ func (s *apiSuite) TestServicesAutoStart(c *C) { writeTestLayer(s.pebbleDir, servicesLayer) d := s.daemon(c) st := d.overlord.State() + s.startOverlord() soon := 0 restore := FakeStateEnsureBefore(func(st *state.State, d time.Duration) { @@ -209,6 +212,7 @@ func (s *apiSuite) TestServicesGet(c *C) { // Setup writeTestLayer(s.pebbleDir, servicesLayer) s.daemon(c) + s.startOverlord() // Execute req, err := http.NewRequest("GET", "/v1/services", nil) @@ -238,6 +242,7 @@ func (s *apiSuite) TestServicesRestart(c *C) { writeTestLayer(s.pebbleDir, servicesLayer) d := s.daemon(c) st := d.overlord.State() + s.startOverlord() soon := 0 restore := FakeStateEnsureBefore(func(st *state.State, d time.Duration) { @@ -287,6 +292,7 @@ func (s *apiSuite) TestServicesReplan(c *C) { writeTestLayer(s.pebbleDir, servicesLayer) d := s.daemon(c) st := d.overlord.State() + s.startOverlord() soon := 0 restore := FakeStateEnsureBefore(func(st *state.State, d time.Duration) { @@ -334,6 +340,7 @@ services: `) d := s.daemon(c) st := d.overlord.State() + s.startOverlord() restore := FakeStateEnsureBefore(func(st *state.State, d time.Duration) {}) defer restore() diff --git a/internals/daemon/daemon_test.go b/internals/daemon/daemon_test.go index dd190c68a..819ceb38c 100644 --- a/internals/daemon/daemon_test.go +++ b/internals/daemon/daemon_test.go @@ -1345,7 +1345,10 @@ func (s *daemonSuite) TestWritesRequireAdminAccess(c *C) { } func (s *daemonSuite) TestAPIAccessLevels(c *C) { - _ = s.newDaemon(c) + d := s.newDaemon(c) + d.Init() + c.Assert(d.Start(), IsNil) + defer d.Stop(nil) tests := []struct { method string diff --git a/internals/overlord/checkstate/manager.go b/internals/overlord/checkstate/manager.go index 45fcf54d3..81eed861d 100644 --- a/internals/overlord/checkstate/manager.go +++ b/internals/overlord/checkstate/manager.go @@ -20,7 +20,6 @@ import ( "reflect" "sort" "sync" - "sync/atomic" "gopkg.in/tomb.v2" @@ -38,8 +37,7 @@ const ( // CheckManager starts and manages the health checks. type CheckManager struct { - state *state.State - ensureDone atomic.Bool + state *state.State failureHandlers []FailureFunc @@ -87,7 +85,6 @@ func NewManager(s *state.State, runner *state.TaskRunner) *CheckManager { } func (m *CheckManager) Ensure() error { - m.ensureDone.Store(true) return nil } @@ -164,11 +161,6 @@ func (m *CheckManager) PlanChanged(newPlan *plan.Plan) { shouldEnsure = true } } - if !m.ensureDone.Load() { - // Can't call EnsureBefore before Overlord.Loop is running (which will - // call m.Ensure for the first time). - return - } if shouldEnsure { m.state.EnsureBefore(0) // start new tasks right away } diff --git a/internals/overlord/overlord.go b/internals/overlord/overlord.go index fc257e997..fecb1fc23 100644 --- a/internals/overlord/overlord.go +++ b/internals/overlord/overlord.go @@ -22,7 +22,6 @@ import ( "os" "path/filepath" "sync" - "sync/atomic" "time" "github.com/canonical/x-go/randutil" @@ -89,7 +88,7 @@ type Overlord struct { ensureLock sync.Mutex ensureTimer *time.Timer ensureNext time.Time - ensureRun int32 + ensureRun chan struct{} pruneTicker *time.Ticker startOfOperationTime time.Time @@ -114,6 +113,7 @@ func New(opts *Options) (*Overlord, error) { o := &Overlord{ pebbleDir: opts.PebbleDir, loopTomb: new(tomb.Tomb), + ensureRun: make(chan struct{}), inited: true, extension: opts.Extension, } @@ -208,14 +208,6 @@ func New(opts *Options) (*Overlord, error) { // before it. o.stateEng.AddManager(o.runner) - // Load the plan from the Pebble layers directory (which may be missing - // or have no layers, resulting in an empty plan), and propagate PlanChanged - // notifications to all notification subscribers. - err = o.planMgr.Load() - if err != nil { - return nil, fmt.Errorf("cannot load plan: %w", err) - } - return o, nil } @@ -379,15 +371,34 @@ func (o *Overlord) Loop() { } } }) + o.ensureWaitRun() } func (o *Overlord) ensureDidRun() { - atomic.StoreInt32(&o.ensureRun, 1) + select { + case <-o.ensureRun: + // Already closed. Ensure already ran at least once. + default: + close(o.ensureRun) + } } -func (o *Overlord) CanStandby() bool { - run := atomic.LoadInt32(&o.ensureRun) - return run != 0 +// ensureWaitRun waits until StateEngine.Ensure() was called at least once. +func (o *Overlord) ensureWaitRun() { + select { + case <-o.ensureRun: + case <-o.loopTomb.Dying(): + } +} + +func (o *Overlord) CanStandby() (ensured bool) { + select { + case <-o.ensureRun: + // Already closed. Ensure already ran at least once. + ensured = true + default: + } + return ensured } // Stop stops the ensure loop and the managers under the StateEngine. @@ -545,8 +556,9 @@ func Fake() *Overlord { // testing. func FakeWithState(handleRestart func(restart.RestartType)) *Overlord { o := &Overlord{ - loopTomb: new(tomb.Tomb), - inited: false, + loopTomb: new(tomb.Tomb), + inited: false, + ensureRun: make(chan struct{}), } s := state.New(fakeBackend{o: o}) o.stateEng = NewStateEngine(s) diff --git a/internals/overlord/planstate/manager.go b/internals/overlord/planstate/manager.go index 49fb202f5..0c1ec7454 100644 --- a/internals/overlord/planstate/manager.go +++ b/internals/overlord/planstate/manager.go @@ -32,7 +32,8 @@ func (e *LabelExists) Error() string { } type PlanManager struct { - layersDir string + layersDir string + stateEngineReady bool planLock sync.Mutex plan *plan.Plan @@ -48,21 +49,15 @@ func NewManager(layersDir string) (*PlanManager, error) { return manager, nil } -// Load reads plan layers from the pebble directory, combines and validates the -// final plan, and finally notifies registered managers of the plan update. In -// the case of a non-existent layers directory, or no layers in the layers -// directory, an empty plan is announced to change subscribers. -func (m *PlanManager) Load() error { - plan, err := plan.ReadDir(m.layersDir) +// Load reads plan layers from the layers directory and combines and validates +// the final plan. In the case of a non-existent layers directory, or no layers +// in the layers directory, an empty plan is created. +func (m *PlanManager) load() error { + var err error + m.plan, err = plan.ReadDir(m.layersDir) if err != nil { return err } - - m.planLock.Lock() - m.plan = plan - m.planLock.Unlock() - - m.callChangeListeners(plan) return nil } @@ -203,9 +198,20 @@ func findLayer(layers []*plan.Layer, label string) (int, *plan.Layer) { // Ensure implements StateManager.Ensure. func (m *PlanManager) Ensure() error { + if !m.stateEngineReady { + // No public call to this package will happen until the first + // Ensure() pass was completed. See overlord.go. + m.callChangeListeners(m.plan) + m.stateEngineReady = true + } return nil } +// StartUp implements StateManager.StartUp. +func (m *PlanManager) StartUp() error { + return m.load() +} + // SetServiceArgs sets the service arguments provided by "pebble run --args" // to their respective services. It adds a new layer in the plan, the layer // consisting of services with commands having their arguments changed. diff --git a/internals/overlord/planstate/manager_test.go b/internals/overlord/planstate/manager_test.go index 9d84bef17..82867c7a6 100644 --- a/internals/overlord/planstate/manager_test.go +++ b/internals/overlord/planstate/manager_test.go @@ -29,7 +29,9 @@ func (ps *planSuite) TestLoadInvalidPebbleDir(c *C) { ps.planMgr, err = planstate.NewManager("/invalid/path") c.Assert(err, IsNil) // Load the plan from the /layers directory - err = ps.planMgr.Load() + err = ps.planMgr.StartUp() + c.Assert(err, IsNil) + err = ps.planMgr.Ensure() c.Assert(err, IsNil) plan := ps.planMgr.Plan() out, err := yaml.Marshal(plan) @@ -64,7 +66,9 @@ func (ps *planSuite) TestLoadLayers(c *C) { ps.writeLayer(c, string(reindent(l))) } // Load the plan from the /layers directory - err = ps.planMgr.Load() + err = ps.planMgr.StartUp() + c.Assert(err, IsNil) + err = ps.planMgr.Ensure() c.Assert(err, IsNil) plan := ps.planMgr.Plan() out, err := yaml.Marshal(plan) @@ -350,7 +354,9 @@ services: override: replace command: echo svc1 `) - err = manager.Load() + err = manager.StartUp() + c.Assert(err, IsNil) + err = manager.Ensure() c.Assert(err, IsNil) layer1 := ps.parseLayer(c, 0, "label1", ` From b0df183955b8077cfead00d581c3903de1fbb37d Mon Sep 17 00:00:00 2001 From: Fred Lotter Date: Tue, 6 Aug 2024 08:32:52 +0200 Subject: [PATCH 2/2] Review improvements 1 --- internals/overlord/overlord.go | 12 ++++++------ 1 file changed, 6 insertions(+), 6 deletions(-) diff --git a/internals/overlord/overlord.go b/internals/overlord/overlord.go index fecb1fc23..0fe96a162 100644 --- a/internals/overlord/overlord.go +++ b/internals/overlord/overlord.go @@ -371,7 +371,7 @@ func (o *Overlord) Loop() { } } }) - o.ensureWaitRun() + o.waitEnsureRun() } func (o *Overlord) ensureDidRun() { @@ -383,22 +383,22 @@ func (o *Overlord) ensureDidRun() { } } -// ensureWaitRun waits until StateEngine.Ensure() was called at least once. -func (o *Overlord) ensureWaitRun() { +// waitEnsureRun waits until StateEngine.Ensure() was called at least once. +func (o *Overlord) waitEnsureRun() { select { case <-o.ensureRun: case <-o.loopTomb.Dying(): } } -func (o *Overlord) CanStandby() (ensured bool) { +func (o *Overlord) CanStandby() bool { select { case <-o.ensureRun: // Already closed. Ensure already ran at least once. - ensured = true + return true default: + return false } - return ensured } // Stop stops the ensure loop and the managers under the StateEngine.