Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

remove controller watcher #1478

6 changes: 3 additions & 3 deletions cmd/jimmsrv/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -189,9 +189,6 @@ func start(ctx context.Context, s *service.Service) error {
}

isLeader := os.Getenv("JIMM_IS_LEADER") != ""
if isLeader {
s.Go(func() error { return jimmsvc.WatchControllers(ctx) }) // Deletes dead/dying models, updates model config.
}
s.Go(func() error { return jimmsvc.WatchModelSummaries(ctx) })

if isLeader {
Expand All @@ -206,6 +203,9 @@ func start(ctx context.Context, s *service.Service) error {
s.Go(func() error {
return jimmsvc.OpenFGACleanup(ctx, time.NewTicker(6*time.Hour).C)
})
s.Go(func() error {
SimoneDutto marked this conversation as resolved.
Show resolved Hide resolved
return jimmsvc.CleanupDyingModels(ctx, time.NewTicker(time.Minute).C)
})
}

if isLeader {
Expand Down
27 changes: 16 additions & 11 deletions cmd/jimmsrv/service/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -209,17 +209,6 @@ func (s *Service) ServeHTTP(w http.ResponseWriter, req *http.Request) {
s.mux.ServeHTTP(w, req)
}

// WatchControllers connects to all controllers and starts an AllWatcher
// monitoring all changes to models. WatchControllers finishes when the
// given context is canceled, or there is a fatal error watching models.
func (s *Service) WatchControllers(ctx context.Context) error {
w := jimm.Watcher{
Database: s.jimm.Database,
Dialer: s.jimm.Dialer,
}
return w.Watch(ctx, 10*time.Minute)
}

// WatchModelSummaries connects to all controllers and starts a
// ModelSummaryWatcher for all models. WatchModelSummaries finishes when
// the given context is canceled, or there is a fatal error watching model
Expand Down Expand Up @@ -272,6 +261,22 @@ func (s *Service) OpenFGACleanup(ctx context.Context, trigger <-chan time.Time)
}
}

// CleanupDyingModels triggers every `trigger` time and calls the jimm methods to cleanup dying models.
func (s *Service) CleanupDyingModels(ctx context.Context, trigger <-chan time.Time) error {
for {
select {
case <-trigger:
err := s.jimm.CleanupDyingModels(ctx)
if err != nil {
zapctx.Error(ctx, "dying models cleanup", zap.Error(err))
continue
}
case <-ctx.Done():
return nil
}
}
}

// Cleanup cleans up resources that need to be released on shutdown.
func (s *Service) Cleanup() {
// Iterating over clean up function in reverse-order to avoid early clean ups.
Expand Down
46 changes: 0 additions & 46 deletions internal/jimm/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -629,52 +629,6 @@ func (j *JIMM) ImportModel(ctx context.Context, user *openfga.User, controllerNa
return errors.E(op, err)
}

return importer.handleModelDeltas(ctx)
}

func (m *modelImporter) handleModelDeltas(ctx context.Context) error {
const op = errors.Op("jimm.getModelDeltas")

modelAPI, err := m.jimm.dialModel(ctx, &m.model.Controller, m.model.ResourceTag())
if err != nil {
return errors.E(op, err)
}
defer modelAPI.Close()

watcherID, err := modelAPI.WatchAll(ctx)
if err != nil {
return errors.E(op, err)
}
defer func() {
if err := modelAPI.ModelWatcherStop(ctx, watcherID); err != nil {
zapctx.Error(ctx, "failed to stop model watcher", zap.Error(err))
}
}()

deltas, err := modelAPI.ModelWatcherNext(ctx, watcherID)
if err != nil {
return errors.E(op, err)
}

modelIDf := func(uuid string) *modelState {
if uuid == m.model.UUID.String {
return &modelState{
id: m.model.ID,
machines: make(map[string]int64),
units: make(map[string]bool),
}
}
return nil
}

w := &Watcher{
Database: m.jimm.Database,
}
for _, d := range deltas {
if err := w.handleDelta(ctx, modelIDf, d); err != nil {
return errors.E(op, err)
}
}
return nil
}

Expand Down
15 changes: 0 additions & 15 deletions internal/jimm/controller_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -953,21 +953,6 @@ func TestImportModel(t *testing.T) {
c.Run(test.about, func(c *qt.C) {
api := &jimmtest.API{
ModelInfo_: test.modelInfo,
ModelWatcherNext_: func(ctx context.Context, id string) ([]jujuparams.Delta, error) {
if id != test.about {
return nil, errors.E("incorrect id")
}
return test.deltas, nil
},
ModelWatcherStop_: func(ctx context.Context, id string) error {
if id != test.about {
return errors.E("incorrect id")
}
return nil
},
WatchAll_: func(context.Context) (string, error) {
return test.about, nil
},
ListApplicationOffers_: func(ctx context.Context, of []jujuparams.OfferFilter) ([]jujuparams.ApplicationOfferAdminDetailsV5, error) {
return test.offers, nil
},
Expand Down
4 changes: 0 additions & 4 deletions internal/jimm/export_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,10 +24,6 @@ var (
ResolveTag = resolveTag
)

func WatchController(w *Watcher, ctx context.Context, ctl *dbmodel.Controller) error {
return w.watchController(ctx, ctl)
}

func NewWatcherWithControllerUnavailableChan(db db.Database, dialer Dialer, pubsub Publisher, testChannel chan error) *Watcher {
return &Watcher{
Pubsub: pubsub,
Expand Down
20 changes: 0 additions & 20 deletions internal/jimm/jimm.go
Original file line number Diff line number Diff line change
Expand Up @@ -241,13 +241,6 @@ type API interface {
// AddCloud adds a new cloud.
AddCloud(context.Context, names.CloudTag, jujuparams.Cloud, bool) error

// AllModelWatcherNext returns the next set of deltas from an
// all-model watcher.
AllModelWatcherNext(context.Context, string) ([]jujuparams.Delta, error)

// AllModelWatcherStop stops an all-model watcher.
AllModelWatcherStop(context.Context, string) error

// ChangeModelCredential replaces cloud credential for a given model with the provided one.
ChangeModelCredential(context.Context, names.ModelTag, names.CloudCredentialTag) error

Expand Down Expand Up @@ -338,13 +331,6 @@ type API interface {
// ModelSummaryWatcherStop stops a model summary watcher.
ModelSummaryWatcherStop(context.Context, string) error

// ModelWatcherNext receives the next set of results from the model
// watcher with the given id.
ModelWatcherNext(ctx context.Context, id string) ([]jujuparams.Delta, error)

// ModelWatcherStop stops the model watcher with the given id.
ModelWatcherStop(ctx context.Context, id string) error

// Offer creates a new application-offer.
Offer(context.Context, crossmodel.OfferURL, jujuparams.AddApplicationOffer) error

Expand Down Expand Up @@ -387,15 +373,9 @@ type API interface {
// ValidateModelUpgrade validates that a model can be upgraded.
ValidateModelUpgrade(context.Context, names.ModelTag, bool) error

// WatchAll creates a watcher that reports deltas for a specific model.
WatchAll(context.Context) (string, error)

// WatchAllModelSummaries creates a ModelSummaryWatcher.
WatchAllModelSummaries(context.Context) (string, error)

// WatchAllModels creates a megawatcher.
WatchAllModels(context.Context) (string, error)

// ListFilesystems lists filesystems for desired machines.
// If no machines provided, a list of all filesystems is returned.
ListFilesystems(ctx context.Context, machines []string) ([]jujuparams.FilesystemDetailsListResult, error)
Expand Down
16 changes: 11 additions & 5 deletions internal/jimm/model.go
Original file line number Diff line number Diff line change
Expand Up @@ -1165,15 +1165,21 @@ func (j *JIMM) DestroyModel(ctx context.Context, user *openfga.User, mt names.Mo
zapctx.Info(ctx, string(op))

err := j.doModelAdmin(ctx, user, mt, func(m *dbmodel.Model, api API) error {
if err := api.DestroyModel(ctx, mt, destroyStorage, force, maxWait, timeout); err != nil {
return err
}
m.Life = state.Dying.String()
if err := j.Database.UpdateModel(ctx, m); err != nil {
// If the database fails to update don't worry too much the
// monitor should catch it.
zapctx.Error(ctx, "failed to store model change", zaputil.Error(err))
return err
}
if err := api.DestroyModel(ctx, mt, destroyStorage, force, maxWait, timeout); err != nil {
zapctx.Error(ctx, "failed to call destroy juju api", zaputil.Error(err))
SimoneDutto marked this conversation as resolved.
Show resolved Hide resolved
// this is a manual way of restoring the life state to alive if the JUJU api fails.
m.Life = state.Alive.String()
if uerr := j.Database.UpdateModel(ctx, m); uerr != nil {
zapctx.Error(ctx, "failed to store model change", zaputil.Error(uerr))
}
return err
}

return nil
})
if err != nil {
Expand Down
53 changes: 53 additions & 0 deletions internal/jimm/model_cleanup.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,53 @@
// Copyright 2024 Canonical.

package jimm

import (
"context"
"fmt"

jujuparams "github.com/juju/juju/rpc/params"
"github.com/juju/juju/state"
"github.com/juju/zaputil/zapctx"

"github.com/canonical/jimm/v3/internal/dbmodel"
"github.com/canonical/jimm/v3/internal/errors"
)

// CleanupDyingModels loops over dying models, contacting the respective controller.
// And deleting the model from our database if the error is `NotFound` which means the model was successfully deleted.
func (j *JIMM) CleanupDyingModels(ctx context.Context) error {
const op = errors.Op("jimm.CleanupDyingModels")
zapctx.Info(ctx, string(op))

err := j.DB().ForEachModel(ctx, func(m *dbmodel.Model) error {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Instead of for each models, rather list models that are dying.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

yep, i was battled because i feel like creating a method in the database called GetDyingModels() is not very clever. I'd rather add a List(filter) method in our db methods.
But i remember last time there was some opposition to that, and rather just using "ForEach" to keep it consistent.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think foreach is fine honestly and dialing the controllers makes a lot of sense to me. Why would we have a db method?

SimoneDutto marked this conversation as resolved.
Show resolved Hide resolved
if m.Life != state.Dying.String() {
return nil
}
// if the model is dying and not found by querying the controller we can assume it is dead.
// And safely delete the reference from our db.
api, err := j.dialController(ctx, &m.Controller)
if err != nil {
zapctx.Error(ctx, fmt.Sprintf("Cannot dial controller %s: %s\n", m.Controller.UUID, err))
return nil
}
if err := api.ModelInfo(ctx, &jujuparams.ModelInfo{UUID: m.UUID.String}); err != nil {
// Some versions of juju return unauthorized for models that cannot be found.
if errors.ErrorCode(err) == errors.CodeNotFound || errors.ErrorCode(err) == errors.CodeUnauthorized {
if err := j.DB().DeleteModel(ctx, m); err != nil {
zapctx.Error(ctx, fmt.Sprintf("Cannot delete model %s: %s\n", m.UUID.String, err))
} else {
return nil
}
} else {
zapctx.Error(ctx, fmt.Sprintf("Cannot get ModelInfo for model %s: %s\n", m.UUID.String, err))
SimoneDutto marked this conversation as resolved.
Show resolved Hide resolved
return nil
}
}
return nil
})
if err != nil {
return errors.E(op, err)
}
return nil
}
Loading
Loading